1use std::borrow::Borrow;
98use std::cmp::Ordering;
99use std::collections::hash_map::{self, HashMap};
100use std::collections::HashSet;
101use std::hash::Hash;
102use std::ops::{Deref, DerefMut};
103use std::sync::{Arc, RwLock as RwLockInner};
104use std::task::Poll;
105use std::{fmt, iter};
106
107use collate::Collator;
108use ds_ext::{OrdHashMap, OrdHashSet};
109use futures::TryFutureExt;
110use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
111
112use super::guard::{TxnReadGuard, TxnReadGuardMap, TxnWriteGuard};
113use super::semaphore::*;
114use super::{Error, Result};
115
116pub use super::range::{Key, Range};
117
// The finalized contents of the map, into which `State::finalize` merges committed deltas.
type Canon<K, V> = HashMap<Key<K>, Arc<V>>;
// The changes made by a single committed version; a `None` value marks a removed key.
type Delta<K, V> = HashMap<Key<K>, Option<Arc<V>>>;
// Committed deltas which have not yet been merged into the canon, keyed by transaction ID.
type Deltas<I, K, V> = OrdHashMap<I, Delta<K, V>>;
// The uncommitted changes of a single transaction; a `None` value marks a removed key.
type Pending<K, V> = HashMap<Key<K>, Option<Arc<RwLock<V>>>>;
122
/// A read guard on a value in a [`TxnMapLock`]
pub type TxnMapValueReadGuard<K, V> = TxnReadGuard<Range<K>, V>;

/// A mapped read guard on a value in a [`TxnMapLock`]
pub type TxnMapValueReadGuardMap<K, V> = TxnReadGuardMap<Range<K>, V>;

/// A write guard on a value in a [`TxnMapLock`]
pub type TxnMapValueWriteGuard<K, V> = TxnWriteGuard<Range<K>, V>;
131
// A value read at a specific transaction ID, taken from either committed or pending state.
#[derive(Debug)]
enum PendingValue<V> {
    // A committed value, shared by reference count
    Committed(Arc<V>),
    // A read guard on a value in the uncommitted version of a transaction
    Pending(OwnedRwLockReadGuard<V>),
}
137
// The versioned state of a `TxnMapLock`.
struct State<I, K, V> {
    // the finalized contents of the map
    canon: Canon<K, V>,
    // the IDs of committed transactions which have not been finalized
    commits: OrdHashSet<I>,
    // the changes made by each committed (but not finalized) transaction
    deltas: Deltas<I, K, V>,
    // the changes made by each uncommitted transaction
    pending: OrdHashMap<I, Pending<K, V>>,
    // the ID of the most recent finalized transaction, if any
    finalized: Option<I>,
}
145
146impl<I: Ord + Hash + fmt::Debug, K, V> State<I, K, V> {
147 #[inline]
148 fn new(txn_id: I, version: Pending<K, V>) -> Self {
149 Self {
150 canon: Canon::new(),
151 commits: OrdHashSet::new(),
152 deltas: OrdHashMap::new(),
153 pending: OrdHashMap::from_iter(iter::once((txn_id, version))),
154 finalized: None,
155 }
156 }
157}
158
159impl<I, K, V> State<I, K, V>
160where
161 I: Copy + Hash + Ord + fmt::Debug,
162 K: Hash + Ord,
163 V: fmt::Debug,
164{
165 #[inline]
166 fn check_committed(&self, txn_id: &I) -> Result<bool> {
167 match self.finalized.as_ref().cmp(&Some(txn_id)) {
168 Ordering::Greater => Err(Error::Outdated),
169 Ordering::Equal => Ok(true),
170 Ordering::Less => Ok(self.commits.contains(txn_id)),
171 }
172 }
173
174 #[inline]
175 fn check_pending(&self, txn_id: &I) -> Result<()> {
176 if self.finalized.as_ref() > Some(txn_id) {
177 Err(Error::Outdated)
178 } else if self.commits.contains(txn_id) {
179 Err(Error::Committed)
180 } else {
181 Ok(())
182 }
183 }
184
185 #[inline]
186 fn canon(&self, txn_id: &I) -> Canon<K, V> {
187 let mut canon = self.canon.clone();
188
189 let deltas = self
190 .deltas
191 .iter()
192 .take_while(|(id, _)| *id <= txn_id)
193 .map(|(_id, version)| version);
194
195 for version in deltas {
196 for (key, delta) in version {
197 if let Some(value) = delta {
198 canon.insert(key.clone(), value.clone());
199 } else {
200 canon.remove(key);
201 }
202 }
203 }
204
205 canon
206 }
207
208 #[inline]
209 fn clear(&mut self, txn_id: I) -> Canon<K, V> {
210 let mut map = self.canon(&txn_id);
211
212 if let Some(version) = self.pending.remove(&txn_id) {
213 for (key, delta) in version {
214 if let Some(value) = delta {
215 let value = Arc::try_unwrap(value).expect("value");
216 map.insert(key, Arc::new(value.into_inner()));
217 } else {
218 map.remove(&key);
219 }
220 }
221 }
222
223 let version = map.keys().cloned().map(|key| (key, None)).collect();
224 self.pending.insert(txn_id, version);
225
226 map
227 }
228
229 #[inline]
230 fn commit_version(&mut self, txn_id: &I) -> Option<Delta<K, V>> {
231 self.pending.remove(txn_id).map(|version| {
232 version
233 .into_iter()
234 .map(|(key, delta)| {
235 let value = if let Some(present) = delta {
236 if let Ok(value) = Arc::try_unwrap(present) {
237 Some(Arc::new(value.into_inner()))
238 } else {
239 panic!("a value at {:?} is still locked", txn_id);
240 }
241 } else {
242 None
243 };
244
245 (key, value)
246 })
247 .collect()
248 })
249 }
250
251 #[inline]
252 fn commit(&mut self, txn_id: I) {
253 if self.commits.contains(&txn_id) {
254 #[cfg(feature = "logging")]
255 log::warn!("duplicate commit at {:?}", txn_id);
256 } else if let Some(version) = self.commit_version(&txn_id) {
257 self.deltas.insert(txn_id, version);
258 }
259
260 self.commits.insert(txn_id);
261 }
262
263 #[inline]
264 fn contains_canon<Q>(&self, txn_id: &I, key: &Q) -> bool
265 where
266 Q: Eq + Hash + ?Sized,
267 Key<K>: Borrow<Q>,
268 {
269 contains_canon(&self.canon, &self.deltas, txn_id, key)
270 }
271
272 #[inline]
273 fn contains_committed<Q>(&self, txn_id: &I, key: &Q) -> Poll<Result<bool>>
274 where
275 Q: Eq + Hash + ?Sized,
276 Key<K>: Borrow<Q>,
277 {
278 match self.finalized.as_ref().cmp(&Some(txn_id)) {
279 Ordering::Greater => Poll::Ready(Err(Error::Outdated)),
280 Ordering::Equal => Poll::Ready(Ok(self.contains_canon(txn_id, key))),
281 Ordering::Less => {
282 if self.commits.contains(txn_id) {
283 Poll::Ready(Ok(self.contains_canon(txn_id, key)))
284 } else {
285 Poll::Pending
286 }
287 }
288 }
289 }
290
291 #[inline]
292 fn contains_pending<Q>(&self, txn_id: &I, key: &Q) -> bool
293 where
294 Q: Eq + Hash + ?Sized,
295 Key<K>: Borrow<Q>,
296 {
297 if let Some(delta) = self.pending.get(txn_id) {
298 if let Some(key_state) = delta.get(key) {
299 return key_state.is_some();
300 }
301 }
302
303 self.contains_canon(txn_id, key)
304 }
305
306 #[inline]
307 fn extend<Q, E>(&mut self, txn_id: I, other: E)
308 where
309 Q: Into<Key<K>>,
310 E: IntoIterator<Item = (Q, V)>,
311 {
312 let entries = other
313 .into_iter()
314 .map(|(key, value)| (key.into(), Some(Arc::new(RwLock::new(value)))));
315
316 if let Some(version) = self.pending.get_mut(&txn_id) {
317 version.extend(entries);
318 } else {
319 self.pending.insert(txn_id, entries.collect());
320 }
321 }
322
323 #[inline]
324 fn finalize(&mut self, txn_id: I) -> Option<&Canon<K, V>> {
325 if self.finalized > Some(txn_id) {
326 return None;
327 }
328
329 while let Some(version_id) = self.pending.keys().next().copied() {
330 if version_id <= txn_id {
331 self.pending.pop_first();
332 } else {
333 break;
334 }
335 }
336
337 while let Some(version_id) = self.commits.first().copied() {
338 if version_id <= txn_id {
339 self.commits.pop_first();
340 } else {
341 break;
342 }
343 }
344
345 while let Some(version_id) = self.deltas.keys().next().copied() {
346 if version_id <= txn_id {
347 let version = self.deltas.pop_first().expect("version");
348 merge_owned(&mut self.canon, version);
349 } else {
350 break;
351 }
352 }
353
354 self.finalized = Some(txn_id);
355
356 Some(&self.canon)
357 }
358
359 #[inline]
360 fn get_canon<Q>(&self, txn_id: &I, key: &Q) -> Option<Arc<V>>
361 where
362 Q: Hash + Eq + ?Sized,
363 Key<K>: Borrow<Q>,
364 {
365 get_canon(&self.canon, &self.deltas, txn_id, key).cloned()
366 }
367
368 #[inline]
369 fn get_committed<Q>(
370 &self,
371 txn_id: &I,
372 key: &Q,
373 ) -> Poll<Result<Option<TxnMapValueReadGuard<K, V>>>>
374 where
375 Q: Hash + Eq + ?Sized,
376 Key<K>: Borrow<Q>,
377 {
378 if self.finalized.as_ref() > Some(txn_id) {
379 Poll::Ready(Err(Error::Outdated))
380 } else if self.commits.contains(txn_id) {
381 debug_assert!(!self.pending.contains_key(txn_id));
382 let value = self.get_canon(txn_id, key);
383 Poll::Ready(Ok(value.map(TxnMapValueReadGuard::committed)))
384 } else {
385 Poll::Pending
386 }
387 }
388
389 #[inline]
390 fn get_pending<Q>(&self, txn_id: &I, key: &Q) -> Option<PendingValue<V>>
391 where
392 Q: Eq + Hash + ?Sized,
393 Key<K>: Borrow<Q>,
394 {
395 if let Some(version) = self.pending.get(txn_id) {
396 if let Some(delta) = version.get(key) {
397 return if let Some(value) = delta {
398 let guard = value.clone().try_read_owned().expect("read version");
400 Some(PendingValue::Pending(guard))
401 } else {
402 None
403 };
404 }
405 }
406
407 self.get_canon(txn_id, key)
408 .map(|value| PendingValue::Committed(value))
409 }
410
411 #[inline]
412 fn insert(&mut self, txn_id: I, key: Key<K>, value: V) -> Option<Arc<V>> {
413 let value = Arc::new(RwLock::new(value));
414
415 if let Some(deltas) = self.pending.get_mut(&txn_id) {
416 match deltas.entry(key) {
417 hash_map::Entry::Occupied(mut delta) => {
418 if let Some(prior) = delta.insert(Some(value)) {
419 let lock = Arc::try_unwrap(prior).expect("prior value");
420 let prior_value = Arc::new(lock.into_inner());
421 Some(prior_value)
422 } else {
423 get_canon(&self.canon, &self.deltas, &txn_id, delta.key()).cloned()
424 }
425 }
426 hash_map::Entry::Vacant(delta) => {
427 let prior = get_canon(&self.canon, &self.deltas, &txn_id, delta.key()).cloned();
428
429 delta.insert(Some(value));
430 prior
431 }
432 }
433 } else {
434 let prior = get_canon(&self.canon, &self.deltas, &txn_id, &key).cloned();
435
436 let pending = iter::once((key, Some(value))).collect();
437 self.pending.insert(txn_id, pending);
438
439 prior
440 }
441 }
442
443 #[inline]
444 fn key<Q>(&self, txn_id: &I, key: &Q) -> Option<&Key<K>>
445 where
446 Q: Eq + Hash + ?Sized,
447 Key<K>: Borrow<Q>,
448 {
449 if let Some((key, _)) = self.canon.get_key_value(key) {
450 Some(key)
451 } else if let Some(deltas) = self.pending.get(txn_id) {
452 deltas.get_key_value(key).map(|(key, _delta)| key)
453 } else {
454 None
455 }
456 }
457
458 #[inline]
459 fn keys_committed(&self, txn_id: &I) -> HashSet<Key<K>> {
460 let mut keys = self.canon.keys().cloned().collect();
461
462 let deltas = self
463 .deltas
464 .iter()
465 .take_while(|(id, _)| *id <= txn_id)
466 .map(|(_, version)| version);
467
468 for version in deltas {
469 merge_keys(&mut keys, version);
470 }
471
472 keys
473 }
474
475 #[inline]
476 fn keys_pending(&self, txn_id: I) -> HashSet<Key<K>> {
477 let mut keys = self.keys_committed(&txn_id);
478
479 if let Some(pending) = self.pending.get(&txn_id) {
480 merge_keys(&mut keys, pending);
481 }
482
483 keys
484 }
485
486 #[inline]
487 fn remove(&mut self, txn_id: I, key: Key<K>) -> Option<Arc<V>> {
488 if let Some(pending) = self.pending.get_mut(&txn_id) {
489 match pending.entry(key) {
490 hash_map::Entry::Occupied(mut entry) => {
491 if let Some(lock) = entry.insert(None) {
492 let lock = Arc::try_unwrap(lock).expect("removed value");
493 Some(Arc::new(lock.into_inner()))
494 } else {
495 None
496 }
497 }
498 hash_map::Entry::Vacant(entry) => {
499 if let Some(prior) =
500 get_canon(&self.canon, &self.deltas, &txn_id, entry.key()).cloned()
501 {
502 entry.insert(None);
503 Some(prior)
504 } else {
505 None
506 }
507 }
508 }
509 } else if let Some(prior) = get_canon(&self.canon, &self.deltas, &txn_id, &key).cloned() {
510 let pending = iter::once((key, None)).collect();
511 self.pending.insert(txn_id, pending);
512 Some(prior)
513 } else {
514 None
515 }
516 }
517}
518
519#[inline]
520fn contains_canon<I, K, V, Q>(
521 canon: &Canon<K, V>,
522 deltas: &OrdHashMap<I, Delta<K, V>>,
523 txn_id: &I,
524 key: &Q,
525) -> bool
526where
527 I: Hash + Ord + fmt::Debug,
528 K: Hash + Ord,
529 Q: Eq + Hash + ?Sized,
530 Key<K>: Borrow<Q>,
531{
532 let deltas = deltas
533 .iter()
534 .rev()
535 .skip_while(|(id, _)| *id > txn_id)
536 .map(|(_, version)| version);
537
538 for delta in deltas {
539 if let Some(key_state) = delta.get(key) {
540 return key_state.is_some();
541 }
542 }
543
544 canon.contains_key(key)
545}
546
547impl<I, K, V> State<I, K, V>
548where
549 I: Copy + Hash + Ord + fmt::Debug,
550 K: Hash + Ord,
551 V: Clone + fmt::Debug,
552{
553 #[inline]
554 fn get_mut(&mut self, txn_id: I, key: Key<K>) -> Option<OwnedRwLockWriteGuard<V>> {
555 #[inline]
556 fn new_value<V: Clone>(canon: &V) -> (Arc<RwLock<V>>, OwnedRwLockWriteGuard<V>) {
557 let value = V::clone(canon);
558 let value = Arc::new(RwLock::new(value));
559 let guard = value.clone().try_write_owned().expect("write version");
560 (value, guard)
561 }
562
563 if let Some(pending) = self.pending.get_mut(&txn_id) {
564 match pending.entry(key) {
565 hash_map::Entry::Occupied(delta) => {
566 let value = delta.get().as_ref()?;
567
568 value
569 .clone()
570 .try_write_owned()
571 .map(Some)
572 .expect("write version")
573 }
574 hash_map::Entry::Vacant(delta) => {
575 let canon = get_canon(&self.canon, &self.deltas, &txn_id, delta.key())?;
576 let (value, guard) = new_value(&**canon);
577
578 delta.insert(Some(value));
579
580 Some(guard)
581 }
582 }
583 } else {
584 let canon = get_canon(&self.canon, &self.deltas, &txn_id, &key)?;
585 let (value, guard) = new_value(&**canon);
586
587 let version = iter::once((key, Some(value))).collect();
588 self.pending.insert(txn_id, version);
589
590 Some(guard)
591 }
592 }
593
594 #[inline]
595 fn insert_new(&mut self, txn_id: I, key: Key<K>, value: V) -> OwnedRwLockWriteGuard<V> {
596 let value = Arc::new(RwLock::new(value));
597 let guard = value.clone().try_write_owned().expect("value");
598
599 if let Some(version) = self.pending.get_mut(&txn_id) {
600 assert!(version.insert(key, Some(value)).is_none());
601 } else {
602 let version = iter::once((key, Some(value))).collect();
603 self.pending.insert(txn_id, version);
604 }
605
606 guard
607 }
608}
609
610#[inline]
611fn get_canon<'a, I, K, V, Q>(
612 canon: &'a Canon<K, V>,
613 deltas: &'a Deltas<I, K, V>,
614 txn_id: &'a I,
615 key: &'a Q,
616) -> Option<&'a Arc<V>>
617where
618 I: Hash + Ord + fmt::Debug,
619 K: Hash + Ord,
620 Q: Eq + Hash + ?Sized,
621 Key<K>: Borrow<Q>,
622{
623 let committed = deltas
624 .iter()
625 .rev()
626 .skip_while(|(id, _)| *id > txn_id)
627 .map(|(_, version)| version);
628
629 for version in committed {
630 if let Some(delta) = version.get(key) {
631 return delta.as_ref();
632 }
633 }
634
635 canon.get(key)
636}
637
638#[inline]
639fn merge_keys<K: Hash + Ord, V>(keys: &mut HashSet<Key<K>>, deltas: &Delta<K, V>) {
640 for (key, delta) in deltas {
641 if delta.is_some() {
642 keys.insert(key.clone());
643 } else {
644 keys.remove(key);
645 }
646 }
647}
648
649pub struct EntryOccupied<K, V> {
651 key: Key<K>,
652 value: TxnMapValueWriteGuard<K, V>,
653}
654
655impl<K, V> EntryOccupied<K, V> {
656 pub fn get(&self) -> &V {
658 self.value.deref()
659 }
660
661 pub fn get_mut(&mut self) -> &mut V {
663 self.value.deref_mut()
664 }
665
666 pub fn key(&self) -> &K {
668 &self.key
669 }
670}
671
672pub struct EntryVacant<I, K, V> {
674 permit: PermitWrite<Range<K>>,
675 txn_id: I,
676 key: Key<K>,
677 map_state: Arc<RwLockInner<State<I, K, V>>>,
678}
679
680impl<I, K, V> EntryVacant<I, K, V>
681where
682 I: Hash + Ord + Copy + fmt::Debug,
683 K: Hash + Ord + Clone + fmt::Debug,
684 V: Clone + fmt::Debug,
685{
686 pub fn insert(self, value: V) -> TxnMapValueWriteGuard<K, V> {
688 let mut map_state = self.map_state.write().expect("lock state");
689 let value = map_state.insert_new(self.txn_id, self.key, value);
690 TxnMapValueWriteGuard::new(self.permit, value)
691 }
692
693 pub fn key(&self) -> &K {
695 &self.key
696 }
697}
698
699pub enum Entry<I, K, V> {
701 Occupied(EntryOccupied<K, V>),
702 Vacant(EntryVacant<I, K, V>),
703}
704
705impl<I, K, V> Entry<I, K, V> {
706 pub fn key(&self) -> &K {
707 todo!()
708 }
709}
710
/// A futures-aware read-write lock on a map of keys to values which supports
/// transactional versioning
pub struct TxnMapLock<I, K, V> {
    // the versioned contents of the map, shared by all clones of this lock
    state: Arc<RwLockInner<State<I, K, V>>>,
    // issues read and write permits on ranges of keys, per transaction
    semaphore: Semaphore<I, Collator<K>, Range<K>>,
}
719
720impl<I, K, V> Clone for TxnMapLock<I, K, V> {
721 fn clone(&self) -> Self {
722 Self {
723 state: self.state.clone(),
724 semaphore: self.semaphore.clone(),
725 }
726 }
727}
728
729impl<I, K, V> TxnMapLock<I, K, V> {
730 #[inline]
731 fn state(&self) -> impl Deref<Target = State<I, K, V>> + '_ {
732 self.state.read().expect("lock state")
733 }
734
735 #[inline]
736 fn state_mut(&self) -> impl DerefMut<Target = State<I, K, V>> + '_ {
737 self.state.write().expect("lock state")
738 }
739}
740
741impl<I: Ord + Hash + fmt::Debug, K, V> TxnMapLock<I, K, V> {
742 pub fn new(txn_id: I) -> Self {
744 let collator = Collator::<K>::default();
745
746 Self {
747 state: Arc::new(RwLockInner::new(State::new(txn_id, Pending::new()))),
748 semaphore: Semaphore::new(collator),
749 }
750 }
751}
752
753impl<I, K, V> TxnMapLock<I, K, V>
754where
755 I: Copy + Hash + Ord + fmt::Debug,
756 K: Eq + Hash + Ord + fmt::Debug + Send + Sync,
757 V: fmt::Debug,
758{
759 pub fn with_contents<KV: IntoIterator<Item = (K, V)>>(txn_id: I, contents: KV) -> Self {
761 let version = contents
762 .into_iter()
763 .map(|(key, value)| (Key::new(key), Some(Arc::new(RwLock::new(value)))))
764 .collect();
765
766 let collator = Collator::<K>::default();
767
768 Self {
769 state: Arc::new(RwLockInner::new(State::new(txn_id, version))),
770 semaphore: Semaphore::with_reservation(txn_id, collator, Range::All),
771 }
772 }
773
774 pub async fn clear(&self, txn_id: I) -> Result<Canon<K, V>> {
776 #[cfg(feature = "logging")]
777 log::trace!("clear {self:?} at {txn_id:?}");
778
779 let _permit = self.semaphore.write(txn_id, Range::All).await?;
780
781 let mut state = self.state_mut();
782 state.check_pending(&txn_id)?;
783
784 Ok(state.clear(txn_id))
785 }
786
787 pub fn try_clear(&self, txn_id: I) -> Result<Canon<K, V>> {
789 #[cfg(feature = "logging")]
790 log::trace!("try_clear {self:?} at {txn_id:?}");
791
792 let mut state = self.state_mut();
793 state.check_pending(&txn_id)?;
794
795 let _permit = self.semaphore.try_write(txn_id, Range::All)?;
796
797 Ok(state.clear(txn_id))
798 }
799
800 pub async fn contains_key<Q>(&self, txn_id: I, key: &Q) -> Result<bool>
802 where
803 Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
804 Key<K>: Borrow<Q>,
805 {
806 #[cfg(feature = "logging")]
807 log::trace!("contains_key? {self:?} at {txn_id:?}");
808
809 let range: Range<K> = {
810 let state = self.state();
811 if let Poll::Ready(result) = state.contains_committed(&txn_id, key) {
812 return result;
813 } else {
814 Key::<K>::from((key, state.key(&txn_id, key))).into()
815 }
816 };
817
818 let _permit = self.semaphore.read(txn_id, range).await?;
819 Ok(self.state().contains_pending(&txn_id, key))
820 }
821
822 pub fn try_contains_key<Q>(&self, txn_id: I, key: &Q) -> Result<bool>
824 where
825 Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
826 Key<K>: Borrow<Q>,
827 {
828 #[cfg(feature = "logging")]
829 log::trace!("try_contains_key {self:?} at {txn_id:?}");
830
831 let state = self.state();
832 if let Poll::Ready(result) = state.contains_committed(&txn_id, key) {
833 return result;
834 }
835
836 let range = Key::<K>::from((key, state.key(&txn_id, key))).into();
837 let _permit = self.semaphore.try_read(txn_id, range)?;
838 Ok(state.contains_pending(&txn_id, key))
839 }
840
841 pub async fn extend<Q, E>(&self, txn_id: I, other: E) -> Result<()>
843 where
844 Q: Into<Key<K>>,
845 E: IntoIterator<Item = (Q, V)>,
846 {
847 #[cfg(feature = "logging")]
848 log::trace!("extend {self:?} at {txn_id:?}");
849
850 let _permit = self.semaphore.write(txn_id, Range::All).await?;
851
852 let mut state = self.state_mut();
853 state.check_pending(&txn_id)?;
854
855 state.extend(txn_id, other);
856 Ok(())
857 }
858
859 pub fn try_extend<Q, E>(&self, txn_id: I, other: E) -> Result<()>
861 where
862 Q: Into<Key<K>>,
863 E: IntoIterator<Item = (Q, V)>,
864 {
865 #[cfg(feature = "logging")]
866 log::trace!("try_extend {self:?} at {txn_id:?}");
867
868 let mut state = self.state_mut();
869 state.check_pending(&txn_id)?;
870
871 let _permit = self.semaphore.try_write(txn_id, Range::All)?;
872
873 state.extend(txn_id, other);
874 Ok(())
875 }
876
877 pub async fn get<Q>(&self, txn_id: I, key: &Q) -> Result<Option<TxnMapValueReadGuard<K, V>>>
879 where
880 Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
881 Key<K>: Borrow<Q>,
882 {
883 #[cfg(feature = "logging")]
884 log::trace!("get from {self:?} at {txn_id:?}");
885
886 let range: Range<K> = {
887 let state = self.state();
888
889 if let Poll::Ready(result) = state.get_committed(&txn_id, key) {
890 return result;
891 }
892
893 Key::<K>::from((key, state.key(&txn_id, key))).into()
894 };
895
896 let permit = self.semaphore.read(txn_id, range).await?;
897
898 let state = self.state();
899
900 if let Poll::Ready(result) = state.get_committed(&txn_id, key) {
901 return result;
902 }
903
904 let value = state.get_pending(&txn_id, key).map(|value| match value {
905 PendingValue::Committed(value) => TxnMapValueReadGuard::committed(value),
906 PendingValue::Pending(value) => TxnMapValueReadGuard::pending_write(permit, value),
907 });
908
909 Ok(value)
910 }
911
912 pub fn try_get<Q>(&self, txn_id: I, key: &Q) -> Result<Option<TxnMapValueReadGuard<K, V>>>
914 where
915 Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
916 Key<K>: Borrow<Q>,
917 {
918 #[cfg(feature = "logging")]
919 log::trace!("try_get from {self:?} at {txn_id:?}");
920
921 let state = self.state();
922
923 if let Poll::Ready(result) = state.get_committed(&txn_id, key) {
924 #[cfg(feature = "log")]
925 log::trace!(
926 "{self:?} is already committed at {txn_id:?}, no need for a read permit..."
927 );
928 return result;
929 } else {
930 #[cfg(feature = "log")]
931 log::trace!("{self:?} is not yet committed at {txn_id:?}, getting a read permit...");
932 }
933
934 let range = Key::<K>::from((key, state.key(&txn_id, key))).into();
935 let permit = self.semaphore.try_read(txn_id, range)?;
936 let value = state.get_pending(&txn_id, key).map(|value| match value {
937 PendingValue::Committed(value) => TxnMapValueReadGuard::committed(value),
938 PendingValue::Pending(value) => TxnMapValueReadGuard::pending_write(permit, value),
939 });
940
941 Ok(value)
942 }
943
944 pub async fn iter(&self, txn_id: I) -> Result<Iter<I, K, V>> {
946 #[cfg(feature = "logging")]
947 log::trace!("iter over {self:?} at {txn_id:?}");
948
949 let permit = self.semaphore.read(txn_id, Range::All).await?;
950
951 let state = self.state();
952
953 if state.check_committed(&txn_id)? {
954 let keys = state.keys_committed(&txn_id);
955 Ok(Iter::new(self.state.clone(), txn_id, None, keys))
956 } else {
957 let keys = self.state().keys_pending(txn_id);
958 Ok(Iter::new(self.state.clone(), txn_id, Some(permit), keys))
959 }
960 }
961
962 pub fn try_iter(&self, txn_id: I) -> Result<Iter<I, K, V>> {
964 #[cfg(feature = "logging")]
965 log::trace!("try_iter over {self:?} at {txn_id:?}");
966
967 let state = self.state();
968
969 if state.check_committed(&txn_id)? {
970 let keys = state.keys_committed(&txn_id);
971 Ok(Iter::new(self.state.clone(), txn_id, None, keys))
972 } else {
973 let permit = self.semaphore.try_read(txn_id, Range::All)?;
974 let keys = state.keys_pending(txn_id);
975 Ok(Iter::new(self.state.clone(), txn_id, Some(permit), keys))
976 }
977 }
978
979 pub async fn insert<Q: Into<Key<K>>>(
981 &self,
982 txn_id: I,
983 key: Q,
984 value: V,
985 ) -> Result<Option<Arc<V>>> {
986 #[cfg(feature = "logging")]
987 log::trace!("insert into {self:?} at {txn_id:?}");
988
989 let key: Key<K> = key.into();
990 let _permit = self.semaphore.write(txn_id, key.clone().into()).await?;
991
992 let mut state = self.state_mut();
993 state.check_pending(&txn_id)?;
994
995 Ok(state.insert(txn_id, key, value))
996 }
997
998 pub fn try_insert<Q: Into<Key<K>>>(
1000 &self,
1001 txn_id: I,
1002 key: Q,
1003 value: V,
1004 ) -> Result<Option<Arc<V>>> {
1005 #[cfg(feature = "logging")]
1006 log::trace!("try_insert into {self:?} at {txn_id:?}");
1007
1008 let mut state = self.state_mut();
1009 state.check_pending(&txn_id)?;
1010
1011 let key = key.into();
1012 let _permit = self.semaphore.try_write(txn_id, key.clone().into())?;
1013
1014 Ok(state.insert(txn_id, key, value))
1015 }
1016
1017 pub async fn is_empty(&self, txn_id: I) -> Result<bool> {
1019 #[cfg(feature = "logging")]
1020 log::trace!("is_empty? {self:?} at {txn_id:?}");
1021
1022 self.len(txn_id).map_ok(|len| len == 0).await
1023 }
1024
1025 pub async fn len(&self, txn_id: I) -> Result<usize> {
1027 #[cfg(feature = "logging")]
1028 log::trace!("len of {self:?} at {txn_id:?}");
1029
1030 let _permit = self.semaphore.read(txn_id, Range::All).await?;
1031
1032 let state = self.state();
1033
1034 if state.check_committed(&txn_id)? {
1035 let keys = state.keys_committed(&txn_id);
1036 Ok(keys.len())
1037 } else {
1038 let keys = state.keys_pending(txn_id);
1039 Ok(keys.len())
1040 }
1041 }
1042
1043 pub async fn remove<Q>(&self, txn_id: I, key: &Q) -> Result<Option<Arc<V>>>
1045 where
1046 Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
1047 Key<K>: Borrow<Q>,
1048 {
1049 #[cfg(feature = "logging")]
1050 log::trace!("remove from {self:?} at {txn_id:?}");
1051
1052 let key: Key<K> = {
1053 let state = self.state();
1054 state.check_pending(&txn_id)?;
1055 (key, state.key(&txn_id, key)).into()
1056 };
1057
1058 let _permit = self.semaphore.write(txn_id, key.clone().into()).await?;
1059
1060 let mut state = self.state_mut();
1061 state.check_pending(&txn_id)?;
1062
1063 Ok(state.remove(txn_id, key))
1064 }
1065
1066 pub fn try_remove<Q>(&self, txn_id: I, key: &Q) -> Result<Option<Arc<V>>>
1068 where
1069 Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
1070 Key<K>: Borrow<Q>,
1071 {
1072 #[cfg(feature = "logging")]
1073 log::trace!("try_remove from {self:?} at {txn_id:?}");
1074
1075 let mut state = self.state_mut();
1076 state.check_pending(&txn_id)?;
1077
1078 let key: Key<K> = (key, state.key(&txn_id, key)).into();
1079 let _permit = self.semaphore.try_write(txn_id, key.clone().into())?;
1080
1081 Ok(state.remove(txn_id, key))
1082 }
1083}
1084
1085impl<I, K, V> TxnMapLock<I, K, V>
1086where
1087 I: Copy + Hash + Ord + fmt::Debug,
1088 K: Hash + Ord + fmt::Debug + Send + Sync,
1089 V: Clone + fmt::Debug,
1090{
1091 pub async fn entry<Q: Into<Key<K>>>(&self, txn_id: I, key: Q) -> Result<Entry<I, K, V>> {
1093 #[cfg(feature = "logging")]
1094 log::trace!("get entry in {self:?} at {txn_id:?}");
1095
1096 self.state().check_pending(&txn_id)?;
1098
1099 let key: Key<K> = key.into();
1100 let range = key.clone().into();
1101 let permit = self.semaphore.write(txn_id, range).await?;
1102
1103 if let Some(value) = self.state_mut().get_mut(txn_id, key.clone()) {
1104 Ok(Entry::Occupied(EntryOccupied {
1105 key,
1106 value: TxnMapValueWriteGuard::new(permit, value),
1107 }))
1108 } else {
1109 Ok(Entry::Vacant(EntryVacant {
1110 permit,
1111 key,
1112 txn_id,
1113 map_state: self.state.clone(),
1114 }))
1115 }
1116 }
1117
1118 pub async fn get_mut<Q: Into<Key<K>>>(
1120 &self,
1121 txn_id: I,
1122 key: Q,
1123 ) -> Result<Option<TxnMapValueWriteGuard<K, V>>> {
1124 #[cfg(feature = "logging")]
1125 log::trace!("get_mut {self:?} at {txn_id:?}");
1126
1127 self.state().check_pending(&txn_id)?;
1129
1130 let key = key.into();
1131 let permit = self.semaphore.write(txn_id, key.clone().into()).await?;
1132
1133 if let Some(value) = self.state_mut().get_mut(txn_id, key) {
1134 Ok(Some(TxnMapValueWriteGuard::new(permit, value)))
1135 } else {
1136 Ok(None)
1137 }
1138 }
1139
1140 pub fn try_get_mut<Q>(&self, txn_id: I, key: &Q) -> Result<Option<TxnMapValueWriteGuard<K, V>>>
1142 where
1143 Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
1144 Key<K>: Borrow<Q>,
1145 {
1146 #[cfg(feature = "logging")]
1147 log::trace!("try_get_mut {self:?} at {txn_id:?}");
1148
1149 let mut state = self.state_mut();
1150
1151 state.check_pending(&txn_id)?;
1153
1154 let maybe_key = state.key(&txn_id, key);
1155 let key = Key::<K>::from((key, maybe_key));
1156 let permit = self.semaphore.try_write(txn_id, key.clone().into())?;
1157
1158 if let Some(value) = state.get_mut(txn_id, key) {
1159 Ok(Some(TxnMapValueWriteGuard::new(permit, value)))
1160 } else {
1161 Ok(None)
1162 }
1163 }
1164
1165 pub async fn iter_mut(&self, txn_id: I) -> Result<IterMut<I, K, V>> {
1167 #[cfg(feature = "logging")]
1168 log::trace!("iter_mut {self:?} at {txn_id:?}");
1169
1170 self.state().check_pending(&txn_id)?;
1172
1173 let permit = self.semaphore.write(txn_id, Range::All).await?;
1174 let keys = self.state().keys_pending(txn_id);
1175 Ok(IterMut::new(self.state.clone(), txn_id, permit, keys))
1176 }
1177
1178 pub fn try_iter_mut(&self, txn_id: I) -> Result<IterMut<I, K, V>> {
1181 #[cfg(feature = "logging")]
1182 log::trace!("try_iter_mut {self:?} at {txn_id:?}");
1183
1184 let state = self.state();
1185
1186 state.check_pending(&txn_id)?;
1188
1189 let permit = self.semaphore.try_write(txn_id, Range::All)?;
1190 let keys = state.keys_pending(txn_id);
1191 Ok(IterMut::new(self.state.clone(), txn_id, permit, keys))
1192 }
1193}
1194
1195impl<I, K, V> TxnMapLock<I, K, V>
1196where
1197 I: Copy + Hash + Ord + fmt::Debug,
1198 K: Eq + Hash + Ord + fmt::Debug + Send + Sync,
1199 V: fmt::Debug,
1200{
1201 pub fn commit(&self, txn_id: I) {
1206 #[cfg(feature = "logging")]
1207 log::trace!("commit {self:?} at {txn_id:?}");
1208
1209 let mut state = self.state_mut();
1210
1211 if state.finalized.as_ref() >= Some(&txn_id) {
1212 panic!("tried to commit already-finalized version {:?}", txn_id);
1213 }
1214
1215 state.commit(txn_id);
1216
1217 self.semaphore.finalize(&txn_id, false);
1218 }
1219
1220 pub async fn read_and_commit(&self, txn_id: I) -> (Canon<K, V>, Option<Delta<K, V>>) {
1227 #[cfg(feature = "logging")]
1228 log::trace!("read and commit {self:?} at {txn_id:?}");
1229
1230 let _permit = self
1231 .semaphore
1232 .read(txn_id, Range::All)
1233 .await
1234 .expect("permit");
1235
1236 let (version, deltas) = {
1237 let mut state = self.state_mut();
1238
1239 if state.finalized > Some(txn_id) {
1240 panic!("tried to commit already-finalized version {:?}", txn_id);
1241 }
1242
1243 state.commit(txn_id);
1244
1245 (state.canon(&txn_id), state.deltas.get(&txn_id).cloned())
1246 };
1247
1248 self.semaphore.finalize(&txn_id, false);
1249
1250 (version, deltas)
1251 }
1252
1253 pub fn rollback(&self, txn_id: &I) {
1257 self.semaphore.finalize(txn_id, false);
1258
1259 let mut state = self.state_mut();
1260
1261 assert!(
1262 !state.commits.contains(txn_id),
1263 "cannot roll back committed transaction {:?}",
1264 txn_id
1265 );
1266
1267 if state.finalized.as_ref() > Some(txn_id) {
1268 panic!("tried to roll back finalized version at {:?}", txn_id);
1269 }
1270
1271 state.pending.remove(txn_id);
1272
1273 self.semaphore.finalize(txn_id, false);
1274 }
1275
1276 pub async fn read_and_rollback(&self, txn_id: I) -> (Canon<K, V>, Option<Delta<K, V>>) {
1282 let _permit = self
1283 .semaphore
1284 .read(txn_id, Range::All)
1285 .await
1286 .expect("permit");
1287
1288 let (version, deltas) = {
1289 let mut state = self.state_mut();
1290
1291 assert!(
1292 !state.commits.contains(&txn_id),
1293 "cannot roll back committed transaction {:?}",
1294 txn_id
1295 );
1296
1297 if state.finalized > Some(txn_id) {
1298 panic!("tried to roll back finalized version at {:?}", txn_id);
1299 }
1300
1301 let deltas = state.commit_version(&txn_id);
1303
1304 let mut version = state.canon(&txn_id);
1305
1306 if let Some(deltas) = &deltas {
1307 merge(&mut version, deltas);
1308 }
1309
1310 (version, deltas)
1311 };
1312
1313 self.semaphore.finalize(&txn_id, false);
1314
1315 (version, deltas)
1316 }
1317
1318 pub fn finalize(&self, txn_id: I) {
1321 self.semaphore.finalize(&txn_id, true);
1322 self.state_mut().finalize(txn_id);
1323 }
1324
1325 pub fn read_and_finalize(&self, txn_id: I) -> Option<Canon<K, V>> {
1328 self.semaphore.finalize(&txn_id, true);
1329 self.state_mut().finalize(txn_id).cloned()
1330 }
1331}
1332
1333impl<I, K, V> fmt::Debug for TxnMapLock<I, K, V> {
1334 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1335 f.write_str("a transactional lock on a map of keys to values")
1336 }
1337}
1338
1339#[derive(Debug)]
1341pub struct TxnMapLockIterGuard<V> {
1342 value: PendingValue<V>,
1343}
1344
1345impl<V> From<PendingValue<V>> for TxnMapLockIterGuard<V> {
1346 fn from(value: PendingValue<V>) -> Self {
1347 Self { value }
1348 }
1349}
1350
1351impl<V> Deref for TxnMapLockIterGuard<V> {
1352 type Target = V;
1353
1354 fn deref(&self) -> &Self::Target {
1355 match &self.value {
1356 PendingValue::Committed(value) => value.deref(),
1357 PendingValue::Pending(value) => value.deref(),
1358 }
1359 }
1360}
1361
1362impl<V: PartialEq> PartialEq<V> for TxnMapLockIterGuard<V> {
1363 fn eq(&self, other: &V) -> bool {
1364 self.deref().eq(other)
1365 }
1366}
1367
1368impl<V: PartialOrd> PartialOrd<V> for TxnMapLockIterGuard<V> {
1369 fn partial_cmp(&self, other: &V) -> Option<Ordering> {
1370 self.deref().partial_cmp(other)
1371 }
1372}
1373
1374pub struct Iter<I, K, V> {
1376 lock_state: Arc<RwLockInner<State<I, K, V>>>,
1377 txn_id: I,
1378 permit: Option<PermitRead<Range<K>>>,
1379 keys: <HashSet<Key<K>> as IntoIterator>::IntoIter,
1380}
1381
1382impl<I, K, V> Iter<I, K, V> {
1383 fn new(
1384 lock_state: Arc<RwLockInner<State<I, K, V>>>,
1385 txn_id: I,
1386 permit: Option<PermitRead<Range<K>>>,
1387 keys: HashSet<Key<K>>,
1388 ) -> Self {
1389 Self {
1390 lock_state,
1391 txn_id,
1392 permit,
1393 keys: keys.into_iter(),
1394 }
1395 }
1396}
1397
1398impl<I, K, V> Iterator for Iter<I, K, V>
1399where
1400 I: Copy + Hash + Ord + fmt::Debug,
1401 K: Hash + Ord,
1402 V: fmt::Debug,
1403{
1404 type Item = (Key<K>, TxnMapLockIterGuard<V>);
1405
1406 fn next(&mut self) -> Option<Self::Item> {
1407 let state = self.lock_state.read().expect("lock state");
1408
1409 loop {
1410 let key = self.keys.next()?;
1411 let value = get_key(&state, &self.txn_id, &key, self.permit.is_none());
1412 if let Some(value) = value {
1413 return Some((key, value.into()));
1414 }
1415 }
1416 }
1417
1418 fn size_hint(&self) -> (usize, Option<usize>) {
1419 self.keys.size_hint()
1420 }
1421}
1422
1423#[inline]
1424fn get_key<I, K, V, Q>(
1425 state: &State<I, K, V>,
1426 txn_id: &I,
1427 key: &Q,
1428 committed: bool,
1429) -> Option<PendingValue<V>>
1430where
1431 I: Copy + Hash + Ord + fmt::Debug,
1432 K: Hash + Ord,
1433 V: fmt::Debug,
1434 Q: Eq + Hash + ?Sized,
1435 Key<K>: Borrow<Q>,
1436{
1437 if committed {
1438 state.get_canon(txn_id, key).map(PendingValue::Committed)
1439 } else {
1440 state.get_pending(txn_id, key)
1441 }
1442}
1443
1444#[derive(Debug)]
1446pub struct TxnMapLockIterMutGuard<V> {
1447 guard: OwnedRwLockWriteGuard<V>,
1448}
1449
1450impl<V> Deref for TxnMapLockIterMutGuard<V> {
1451 type Target = V;
1452
1453 fn deref(&self) -> &Self::Target {
1454 self.guard.deref()
1455 }
1456}
1457
1458impl<V> DerefMut for TxnMapLockIterMutGuard<V> {
1459 fn deref_mut(&mut self) -> &mut Self::Target {
1460 self.guard.deref_mut()
1461 }
1462}
1463
1464impl<V: PartialEq> PartialEq<V> for TxnMapLockIterMutGuard<V> {
1465 fn eq(&self, other: &V) -> bool {
1466 self.deref().eq(other)
1467 }
1468}
1469
1470impl<V: PartialOrd> PartialOrd<V> for TxnMapLockIterMutGuard<V> {
1471 fn partial_cmp(&self, other: &V) -> Option<Ordering> {
1472 self.deref().partial_cmp(other)
1473 }
1474}
1475
1476impl<V> From<OwnedRwLockWriteGuard<V>> for TxnMapLockIterMutGuard<V> {
1477 fn from(guard: OwnedRwLockWriteGuard<V>) -> Self {
1478 Self { guard }
1479 }
1480}
1481
1482pub struct IterMut<I, K, V> {
1484 lock_state: Arc<RwLockInner<State<I, K, V>>>,
1485 txn_id: I,
1486
1487 #[allow(unused)]
1488 permit: PermitWrite<Range<K>>,
1489 keys: <HashSet<Key<K>> as IntoIterator>::IntoIter,
1490}
1491
1492impl<I, K, V> IterMut<I, K, V> {
1493 fn new(
1494 lock_state: Arc<RwLockInner<State<I, K, V>>>,
1495 txn_id: I,
1496 permit: PermitWrite<Range<K>>,
1497 keys: HashSet<Key<K>>,
1498 ) -> Self {
1499 Self {
1500 lock_state,
1501 txn_id,
1502 permit,
1503 keys: keys.into_iter(),
1504 }
1505 }
1506}
1507
1508impl<I, K, V> Iterator for IterMut<I, K, V>
1509where
1510 I: Copy + Hash + Ord + fmt::Debug,
1511 K: Hash + Ord,
1512 V: Clone + fmt::Debug,
1513{
1514 type Item = (Arc<K>, TxnMapLockIterMutGuard<V>);
1515
1516 fn next(&mut self) -> Option<Self::Item> {
1517 let mut state = self.lock_state.write().expect("lock state");
1518
1519 loop {
1520 let key = self.keys.next()?;
1521 if let Some(guard) = state.get_mut(self.txn_id, key.clone()) {
1522 return Some((key.into(), TxnMapLockIterMutGuard::from(guard)));
1523 }
1524 }
1525 }
1526
1527 fn size_hint(&self) -> (usize, Option<usize>) {
1528 self.keys.size_hint()
1529 }
1530}
1531
1532#[inline]
1533fn merge<K: Eq + Hash, V>(version: &mut Canon<K, V>, deltas: &Delta<K, V>) {
1534 for (key, delta) in deltas {
1535 match delta {
1536 Some(value) => version.insert(key.clone(), value.clone()),
1537 None => version.remove(key),
1538 };
1539 }
1540}
1541
1542#[inline]
1543fn merge_owned<K: Eq + Hash, V>(version: &mut Canon<K, V>, deltas: Delta<K, V>) {
1544 for (key, delta) in deltas {
1545 match delta {
1546 Some(value) => version.insert(key, value),
1547 None => version.remove(&key),
1548 };
1549 }
1550}