Skip to main content

txn_lock/
map.rs

1//! A futures-aware read-write lock on a [`HashMap`] which supports transactional versioning.
2//!
3//! Example usage:
4//! ```
5//! use std::collections::HashMap;
6//! use std::sync::Arc;
7//!
8//! use futures::executor::block_on;
9//!
10//! use txn_lock::map::*;
11//! use txn_lock::Error;
12//!
13//! let one = "one";
14//! let two = "two";
15//!
16//! let map = TxnMapLock::<u64, String, f32>::new(1);
17//!
18//! assert_eq!(block_on(map.insert(1, one.to_string(), 1.0)).expect("insert"), None);
19//!
20//! let value = block_on(map.get(1, one)).expect("read").expect("value");
21//! assert_eq!(value, 1.0);
22//!
23//! assert_eq!(map.try_insert(1, one.to_string(), 2.0).unwrap_err(), Error::WouldBlock);
24//!
25//! std::mem::drop(value);
26//!
27//! let (state_at_txn_1, deltas) = block_on(map.read_and_commit(1));
28//! assert_eq!(deltas.expect("deltas").len(), 1);
29//! assert_eq!(state_at_txn_1.len(), 1);
30//!
31//! let mut value = map.try_get_mut(2, one).expect("read").expect("value");
32//! assert_eq!(value, 1.0);
33//! *value = 2.0;
34//!
35//! assert_eq!(map.try_remove(2, one).unwrap_err(), Error::WouldBlock);
36//! std::mem::drop(value);
37//!
38//! let value = block_on(map.remove(2, one)).expect("remove").expect("value");
39//! assert_eq!(*value, 2.0);
40//!
41//! assert_eq!(map.try_insert(2, two.to_string(), 1.0).expect("insert"), None);
42//!
43//! assert!(map.try_remove(2, one).expect("remove").is_none());
44//!
45//! assert_eq!(map.try_insert(2, two.to_string(), 2.0).expect("insert"), Some(1.0.into()));
46//!
47//! map.rollback(&2);
48//!
49//! let value = map.try_get(1, one).expect("read");
50//! assert_eq!(*(value.expect("guard")), 1.0);
51//!
52//! assert!(map.try_remove(3, two).expect("remove").is_none());
53//!
54//! map.finalize(2);
55//!
56//! assert_eq!(map.try_get(1, one).unwrap_err(), Error::Outdated);
57//!
58//! let value = map.try_get(3, one).expect("read");
59//! assert_eq!(*(value.expect("guard")), 1.0);
60//!
61//! map.commit(3);
62//!
63//! let extension = [
64//!     ("one".to_string(), 1.0),
65//!     ("two".to_string(), 2.0),
66//!     ("three".to_string(), 3.0),
67//!     ("four".to_string(), 4.0),
68//!     ("five".to_string(), 5.0),
69//! ];
70//!
71//! map.try_extend(4, extension).expect("extend");
72//!
73//! for (key, mut value) in map.try_iter_mut(4).expect("iter") {
74//!     *value *= 2.;
75//! }
76//!
77//! let expected = HashMap::<String, f32>::from_iter([
78//!     ("two".to_string(), 4.0),
79//!     ("three".to_string(), 6.0),
80//!     ("one".to_string(), 2.0),
81//!     ("four".to_string(), 8.0),
82//!     ("five".to_string(), 10.0),
83//! ]);
84//!
85//! let actual = map.try_iter(4).expect("iter").collect::<Vec<_>>();
86//! assert_eq!(actual.len(), expected.len());
87//!
88//! for (k, v) in actual {
89//!     assert_eq!(expected.get(&*k), Some(&*v));
90//! }
91//!
92//! let actual = map.try_clear(4).expect("clear");
93//! assert_eq!(actual, expected.into_iter().map(|(k, v)| (k.into(), v.into())).collect());
94//!
95//! ```
96
97use std::borrow::Borrow;
98use std::cmp::Ordering;
99use std::collections::hash_map::{self, HashMap};
100use std::collections::HashSet;
101use std::hash::Hash;
102use std::ops::{Deref, DerefMut};
103use std::sync::{Arc, RwLock as RwLockInner};
104use std::task::Poll;
105use std::{fmt, iter};
106
107use collate::Collator;
108use ds_ext::{OrdHashMap, OrdHashSet};
109use futures::TryFutureExt;
110use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
111
112use super::guard::{TxnReadGuard, TxnReadGuardMap, TxnWriteGuard};
113use super::semaphore::*;
114use super::{Error, Result};
115
116pub use super::range::{Key, Range};
117
118type Canon<K, V> = HashMap<Key<K>, Arc<V>>;
119type Delta<K, V> = HashMap<Key<K>, Option<Arc<V>>>;
120type Deltas<I, K, V> = OrdHashMap<I, Delta<K, V>>;
121type Pending<K, V> = HashMap<Key<K>, Option<Arc<RwLock<V>>>>;
122
123/// A read guard on a value in a [`TxnMapLock`]
124pub type TxnMapValueReadGuard<K, V> = TxnReadGuard<Range<K>, V>;
125
126/// A mapped read guard on a value in a [`TxnMapLock`]
127pub type TxnMapValueReadGuardMap<K, V> = TxnReadGuardMap<Range<K>, V>;
128
129/// A write guard on a value in a [`TxnMapLock`]
130pub type TxnMapValueWriteGuard<K, V> = TxnWriteGuard<Range<K>, V>;
131
132#[derive(Debug)]
133enum PendingValue<V> {
134    Committed(Arc<V>),
135    Pending(OwnedRwLockReadGuard<V>),
136}
137
138struct State<I, K, V> {
139    canon: Canon<K, V>,
140    commits: OrdHashSet<I>,
141    deltas: Deltas<I, K, V>,
142    pending: OrdHashMap<I, Pending<K, V>>,
143    finalized: Option<I>,
144}
145
146impl<I: Ord + Hash + fmt::Debug, K, V> State<I, K, V> {
147    #[inline]
148    fn new(txn_id: I, version: Pending<K, V>) -> Self {
149        Self {
150            canon: Canon::new(),
151            commits: OrdHashSet::new(),
152            deltas: OrdHashMap::new(),
153            pending: OrdHashMap::from_iter(iter::once((txn_id, version))),
154            finalized: None,
155        }
156    }
157}
158
159impl<I, K, V> State<I, K, V>
160where
161    I: Copy + Hash + Ord + fmt::Debug,
162    K: Hash + Ord,
163    V: fmt::Debug,
164{
165    #[inline]
166    fn check_committed(&self, txn_id: &I) -> Result<bool> {
167        match self.finalized.as_ref().cmp(&Some(txn_id)) {
168            Ordering::Greater => Err(Error::Outdated),
169            Ordering::Equal => Ok(true),
170            Ordering::Less => Ok(self.commits.contains(txn_id)),
171        }
172    }
173
174    #[inline]
175    fn check_pending(&self, txn_id: &I) -> Result<()> {
176        if self.finalized.as_ref() > Some(txn_id) {
177            Err(Error::Outdated)
178        } else if self.commits.contains(txn_id) {
179            Err(Error::Committed)
180        } else {
181            Ok(())
182        }
183    }
184
185    #[inline]
186    fn canon(&self, txn_id: &I) -> Canon<K, V> {
187        let mut canon = self.canon.clone();
188
189        let deltas = self
190            .deltas
191            .iter()
192            .take_while(|(id, _)| *id <= txn_id)
193            .map(|(_id, version)| version);
194
195        for version in deltas {
196            for (key, delta) in version {
197                if let Some(value) = delta {
198                    canon.insert(key.clone(), value.clone());
199                } else {
200                    canon.remove(key);
201                }
202            }
203        }
204
205        canon
206    }
207
208    #[inline]
209    fn clear(&mut self, txn_id: I) -> Canon<K, V> {
210        let mut map = self.canon(&txn_id);
211
212        if let Some(version) = self.pending.remove(&txn_id) {
213            for (key, delta) in version {
214                if let Some(value) = delta {
215                    let value = Arc::try_unwrap(value).expect("value");
216                    map.insert(key, Arc::new(value.into_inner()));
217                } else {
218                    map.remove(&key);
219                }
220            }
221        }
222
223        let version = map.keys().cloned().map(|key| (key, None)).collect();
224        self.pending.insert(txn_id, version);
225
226        map
227    }
228
229    #[inline]
230    fn commit_version(&mut self, txn_id: &I) -> Option<Delta<K, V>> {
231        self.pending.remove(txn_id).map(|version| {
232            version
233                .into_iter()
234                .map(|(key, delta)| {
235                    let value = if let Some(present) = delta {
236                        if let Ok(value) = Arc::try_unwrap(present) {
237                            Some(Arc::new(value.into_inner()))
238                        } else {
239                            panic!("a value at {:?} is still locked", txn_id);
240                        }
241                    } else {
242                        None
243                    };
244
245                    (key, value)
246                })
247                .collect()
248        })
249    }
250
251    #[inline]
252    fn commit(&mut self, txn_id: I) {
253        if self.commits.contains(&txn_id) {
254            #[cfg(feature = "logging")]
255            log::warn!("duplicate commit at {:?}", txn_id);
256        } else if let Some(version) = self.commit_version(&txn_id) {
257            self.deltas.insert(txn_id, version);
258        }
259
260        self.commits.insert(txn_id);
261    }
262
263    #[inline]
264    fn contains_canon<Q>(&self, txn_id: &I, key: &Q) -> bool
265    where
266        Q: Eq + Hash + ?Sized,
267        Key<K>: Borrow<Q>,
268    {
269        contains_canon(&self.canon, &self.deltas, txn_id, key)
270    }
271
272    #[inline]
273    fn contains_committed<Q>(&self, txn_id: &I, key: &Q) -> Poll<Result<bool>>
274    where
275        Q: Eq + Hash + ?Sized,
276        Key<K>: Borrow<Q>,
277    {
278        match self.finalized.as_ref().cmp(&Some(txn_id)) {
279            Ordering::Greater => Poll::Ready(Err(Error::Outdated)),
280            Ordering::Equal => Poll::Ready(Ok(self.contains_canon(txn_id, key))),
281            Ordering::Less => {
282                if self.commits.contains(txn_id) {
283                    Poll::Ready(Ok(self.contains_canon(txn_id, key)))
284                } else {
285                    Poll::Pending
286                }
287            }
288        }
289    }
290
291    #[inline]
292    fn contains_pending<Q>(&self, txn_id: &I, key: &Q) -> bool
293    where
294        Q: Eq + Hash + ?Sized,
295        Key<K>: Borrow<Q>,
296    {
297        if let Some(delta) = self.pending.get(txn_id) {
298            if let Some(key_state) = delta.get(key) {
299                return key_state.is_some();
300            }
301        }
302
303        self.contains_canon(txn_id, key)
304    }
305
306    #[inline]
307    fn extend<Q, E>(&mut self, txn_id: I, other: E)
308    where
309        Q: Into<Key<K>>,
310        E: IntoIterator<Item = (Q, V)>,
311    {
312        let entries = other
313            .into_iter()
314            .map(|(key, value)| (key.into(), Some(Arc::new(RwLock::new(value)))));
315
316        if let Some(version) = self.pending.get_mut(&txn_id) {
317            version.extend(entries);
318        } else {
319            self.pending.insert(txn_id, entries.collect());
320        }
321    }
322
323    #[inline]
324    fn finalize(&mut self, txn_id: I) -> Option<&Canon<K, V>> {
325        if self.finalized > Some(txn_id) {
326            return None;
327        }
328
329        while let Some(version_id) = self.pending.keys().next().copied() {
330            if version_id <= txn_id {
331                self.pending.pop_first();
332            } else {
333                break;
334            }
335        }
336
337        while let Some(version_id) = self.commits.first().copied() {
338            if version_id <= txn_id {
339                self.commits.pop_first();
340            } else {
341                break;
342            }
343        }
344
345        while let Some(version_id) = self.deltas.keys().next().copied() {
346            if version_id <= txn_id {
347                let version = self.deltas.pop_first().expect("version");
348                merge_owned(&mut self.canon, version);
349            } else {
350                break;
351            }
352        }
353
354        self.finalized = Some(txn_id);
355
356        Some(&self.canon)
357    }
358
359    #[inline]
360    fn get_canon<Q>(&self, txn_id: &I, key: &Q) -> Option<Arc<V>>
361    where
362        Q: Hash + Eq + ?Sized,
363        Key<K>: Borrow<Q>,
364    {
365        get_canon(&self.canon, &self.deltas, txn_id, key).cloned()
366    }
367
368    #[inline]
369    fn get_committed<Q>(
370        &self,
371        txn_id: &I,
372        key: &Q,
373    ) -> Poll<Result<Option<TxnMapValueReadGuard<K, V>>>>
374    where
375        Q: Hash + Eq + ?Sized,
376        Key<K>: Borrow<Q>,
377    {
378        if self.finalized.as_ref() > Some(txn_id) {
379            Poll::Ready(Err(Error::Outdated))
380        } else if self.commits.contains(txn_id) {
381            debug_assert!(!self.pending.contains_key(txn_id));
382            let value = self.get_canon(txn_id, key);
383            Poll::Ready(Ok(value.map(TxnMapValueReadGuard::committed)))
384        } else {
385            Poll::Pending
386        }
387    }
388
389    #[inline]
390    fn get_pending<Q>(&self, txn_id: &I, key: &Q) -> Option<PendingValue<V>>
391    where
392        Q: Eq + Hash + ?Sized,
393        Key<K>: Borrow<Q>,
394    {
395        if let Some(version) = self.pending.get(txn_id) {
396            if let Some(delta) = version.get(key) {
397                return if let Some(value) = delta {
398                    // the permit means it's safe to call try_read_owned().expect()
399                    let guard = value.clone().try_read_owned().expect("read version");
400                    Some(PendingValue::Pending(guard))
401                } else {
402                    None
403                };
404            }
405        }
406
407        self.get_canon(txn_id, key)
408            .map(|value| PendingValue::Committed(value))
409    }
410
411    #[inline]
412    fn insert(&mut self, txn_id: I, key: Key<K>, value: V) -> Option<Arc<V>> {
413        let value = Arc::new(RwLock::new(value));
414
415        if let Some(deltas) = self.pending.get_mut(&txn_id) {
416            match deltas.entry(key) {
417                hash_map::Entry::Occupied(mut delta) => {
418                    if let Some(prior) = delta.insert(Some(value)) {
419                        let lock = Arc::try_unwrap(prior).expect("prior value");
420                        let prior_value = Arc::new(lock.into_inner());
421                        Some(prior_value)
422                    } else {
423                        get_canon(&self.canon, &self.deltas, &txn_id, delta.key()).cloned()
424                    }
425                }
426                hash_map::Entry::Vacant(delta) => {
427                    let prior = get_canon(&self.canon, &self.deltas, &txn_id, delta.key()).cloned();
428
429                    delta.insert(Some(value));
430                    prior
431                }
432            }
433        } else {
434            let prior = get_canon(&self.canon, &self.deltas, &txn_id, &key).cloned();
435
436            let pending = iter::once((key, Some(value))).collect();
437            self.pending.insert(txn_id, pending);
438
439            prior
440        }
441    }
442
443    #[inline]
444    fn key<Q>(&self, txn_id: &I, key: &Q) -> Option<&Key<K>>
445    where
446        Q: Eq + Hash + ?Sized,
447        Key<K>: Borrow<Q>,
448    {
449        if let Some((key, _)) = self.canon.get_key_value(key) {
450            Some(key)
451        } else if let Some(deltas) = self.pending.get(txn_id) {
452            deltas.get_key_value(key).map(|(key, _delta)| key)
453        } else {
454            None
455        }
456    }
457
458    #[inline]
459    fn keys_committed(&self, txn_id: &I) -> HashSet<Key<K>> {
460        let mut keys = self.canon.keys().cloned().collect();
461
462        let deltas = self
463            .deltas
464            .iter()
465            .take_while(|(id, _)| *id <= txn_id)
466            .map(|(_, version)| version);
467
468        for version in deltas {
469            merge_keys(&mut keys, version);
470        }
471
472        keys
473    }
474
475    #[inline]
476    fn keys_pending(&self, txn_id: I) -> HashSet<Key<K>> {
477        let mut keys = self.keys_committed(&txn_id);
478
479        if let Some(pending) = self.pending.get(&txn_id) {
480            merge_keys(&mut keys, pending);
481        }
482
483        keys
484    }
485
486    #[inline]
487    fn remove(&mut self, txn_id: I, key: Key<K>) -> Option<Arc<V>> {
488        if let Some(pending) = self.pending.get_mut(&txn_id) {
489            match pending.entry(key) {
490                hash_map::Entry::Occupied(mut entry) => {
491                    if let Some(lock) = entry.insert(None) {
492                        let lock = Arc::try_unwrap(lock).expect("removed value");
493                        Some(Arc::new(lock.into_inner()))
494                    } else {
495                        None
496                    }
497                }
498                hash_map::Entry::Vacant(entry) => {
499                    if let Some(prior) =
500                        get_canon(&self.canon, &self.deltas, &txn_id, entry.key()).cloned()
501                    {
502                        entry.insert(None);
503                        Some(prior)
504                    } else {
505                        None
506                    }
507                }
508            }
509        } else if let Some(prior) = get_canon(&self.canon, &self.deltas, &txn_id, &key).cloned() {
510            let pending = iter::once((key, None)).collect();
511            self.pending.insert(txn_id, pending);
512            Some(prior)
513        } else {
514            None
515        }
516    }
517}
518
519#[inline]
520fn contains_canon<I, K, V, Q>(
521    canon: &Canon<K, V>,
522    deltas: &OrdHashMap<I, Delta<K, V>>,
523    txn_id: &I,
524    key: &Q,
525) -> bool
526where
527    I: Hash + Ord + fmt::Debug,
528    K: Hash + Ord,
529    Q: Eq + Hash + ?Sized,
530    Key<K>: Borrow<Q>,
531{
532    let deltas = deltas
533        .iter()
534        .rev()
535        .skip_while(|(id, _)| *id > txn_id)
536        .map(|(_, version)| version);
537
538    for delta in deltas {
539        if let Some(key_state) = delta.get(key) {
540            return key_state.is_some();
541        }
542    }
543
544    canon.contains_key(key)
545}
546
547impl<I, K, V> State<I, K, V>
548where
549    I: Copy + Hash + Ord + fmt::Debug,
550    K: Hash + Ord,
551    V: Clone + fmt::Debug,
552{
553    #[inline]
554    fn get_mut(&mut self, txn_id: I, key: Key<K>) -> Option<OwnedRwLockWriteGuard<V>> {
555        #[inline]
556        fn new_value<V: Clone>(canon: &V) -> (Arc<RwLock<V>>, OwnedRwLockWriteGuard<V>) {
557            let value = V::clone(canon);
558            let value = Arc::new(RwLock::new(value));
559            let guard = value.clone().try_write_owned().expect("write version");
560            (value, guard)
561        }
562
563        if let Some(pending) = self.pending.get_mut(&txn_id) {
564            match pending.entry(key) {
565                hash_map::Entry::Occupied(delta) => {
566                    let value = delta.get().as_ref()?;
567
568                    value
569                        .clone()
570                        .try_write_owned()
571                        .map(Some)
572                        .expect("write version")
573                }
574                hash_map::Entry::Vacant(delta) => {
575                    let canon = get_canon(&self.canon, &self.deltas, &txn_id, delta.key())?;
576                    let (value, guard) = new_value(&**canon);
577
578                    delta.insert(Some(value));
579
580                    Some(guard)
581                }
582            }
583        } else {
584            let canon = get_canon(&self.canon, &self.deltas, &txn_id, &key)?;
585            let (value, guard) = new_value(&**canon);
586
587            let version = iter::once((key, Some(value))).collect();
588            self.pending.insert(txn_id, version);
589
590            Some(guard)
591        }
592    }
593
594    #[inline]
595    fn insert_new(&mut self, txn_id: I, key: Key<K>, value: V) -> OwnedRwLockWriteGuard<V> {
596        let value = Arc::new(RwLock::new(value));
597        let guard = value.clone().try_write_owned().expect("value");
598
599        if let Some(version) = self.pending.get_mut(&txn_id) {
600            assert!(version.insert(key, Some(value)).is_none());
601        } else {
602            let version = iter::once((key, Some(value))).collect();
603            self.pending.insert(txn_id, version);
604        }
605
606        guard
607    }
608}
609
610#[inline]
611fn get_canon<'a, I, K, V, Q>(
612    canon: &'a Canon<K, V>,
613    deltas: &'a Deltas<I, K, V>,
614    txn_id: &'a I,
615    key: &'a Q,
616) -> Option<&'a Arc<V>>
617where
618    I: Hash + Ord + fmt::Debug,
619    K: Hash + Ord,
620    Q: Eq + Hash + ?Sized,
621    Key<K>: Borrow<Q>,
622{
623    let committed = deltas
624        .iter()
625        .rev()
626        .skip_while(|(id, _)| *id > txn_id)
627        .map(|(_, version)| version);
628
629    for version in committed {
630        if let Some(delta) = version.get(key) {
631            return delta.as_ref();
632        }
633    }
634
635    canon.get(key)
636}
637
638#[inline]
639fn merge_keys<K: Hash + Ord, V>(keys: &mut HashSet<Key<K>>, deltas: &Delta<K, V>) {
640    for (key, delta) in deltas {
641        if delta.is_some() {
642            keys.insert(key.clone());
643        } else {
644            keys.remove(key);
645        }
646    }
647}
648
649/// An occupied entry in a [`TxnMapLock`]
650pub struct EntryOccupied<K, V> {
651    key: Key<K>,
652    value: TxnMapValueWriteGuard<K, V>,
653}
654
655impl<K, V> EntryOccupied<K, V> {
656    /// Borrow this entry's value.
657    pub fn get(&self) -> &V {
658        self.value.deref()
659    }
660
661    /// Borrow this entry's value mutably.
662    pub fn get_mut(&mut self) -> &mut V {
663        self.value.deref_mut()
664    }
665
666    /// Borrow this entry's key.
667    pub fn key(&self) -> &K {
668        &self.key
669    }
670}
671
672/// A vacant entry in a [`TxnMapLock`]
673pub struct EntryVacant<I, K, V> {
674    permit: PermitWrite<Range<K>>,
675    txn_id: I,
676    key: Key<K>,
677    map_state: Arc<RwLockInner<State<I, K, V>>>,
678}
679
680impl<I, K, V> EntryVacant<I, K, V>
681where
682    I: Hash + Ord + Copy + fmt::Debug,
683    K: Hash + Ord + Clone + fmt::Debug,
684    V: Clone + fmt::Debug,
685{
686    /// Insert a new value at this [`Entry`].
687    pub fn insert(self, value: V) -> TxnMapValueWriteGuard<K, V> {
688        let mut map_state = self.map_state.write().expect("lock state");
689        let value = map_state.insert_new(self.txn_id, self.key, value);
690        TxnMapValueWriteGuard::new(self.permit, value)
691    }
692
693    /// Borrow this entry's key.
694    pub fn key(&self) -> &K {
695        &self.key
696    }
697}
698
699/// An entry in a [`TxnMapLock`]
700pub enum Entry<I, K, V> {
701    Occupied(EntryOccupied<K, V>),
702    Vacant(EntryVacant<I, K, V>),
703}
704
705impl<I, K, V> Entry<I, K, V> {
706    pub fn key(&self) -> &K {
707        todo!()
708    }
709}
710
711/// A futures-aware read-write lock on a [`HashMap`] which supports transactional versioning
712///
713/// The `get_mut` and `try_get_mut` methods require the value type `V` to implement [`Clone`]
714/// in order to support multiple transactions with different versions.
715pub struct TxnMapLock<I, K, V> {
716    state: Arc<RwLockInner<State<I, K, V>>>,
717    semaphore: Semaphore<I, Collator<K>, Range<K>>,
718}
719
720impl<I, K, V> Clone for TxnMapLock<I, K, V> {
721    fn clone(&self) -> Self {
722        Self {
723            state: self.state.clone(),
724            semaphore: self.semaphore.clone(),
725        }
726    }
727}
728
729impl<I, K, V> TxnMapLock<I, K, V> {
730    #[inline]
731    fn state(&self) -> impl Deref<Target = State<I, K, V>> + '_ {
732        self.state.read().expect("lock state")
733    }
734
735    #[inline]
736    fn state_mut(&self) -> impl DerefMut<Target = State<I, K, V>> + '_ {
737        self.state.write().expect("lock state")
738    }
739}
740
741impl<I: Ord + Hash + fmt::Debug, K, V> TxnMapLock<I, K, V> {
742    /// Construct a new [`TxnMapLock`].
743    pub fn new(txn_id: I) -> Self {
744        let collator = Collator::<K>::default();
745
746        Self {
747            state: Arc::new(RwLockInner::new(State::new(txn_id, Pending::new()))),
748            semaphore: Semaphore::new(collator),
749        }
750    }
751}
752
753impl<I, K, V> TxnMapLock<I, K, V>
754where
755    I: Copy + Hash + Ord + fmt::Debug,
756    K: Eq + Hash + Ord + fmt::Debug + Send + Sync,
757    V: fmt::Debug,
758{
759    /// Construct a new [`TxnMapLock`] with the given `contents`.
760    pub fn with_contents<KV: IntoIterator<Item = (K, V)>>(txn_id: I, contents: KV) -> Self {
761        let version = contents
762            .into_iter()
763            .map(|(key, value)| (Key::new(key), Some(Arc::new(RwLock::new(value)))))
764            .collect();
765
766        let collator = Collator::<K>::default();
767
768        Self {
769            state: Arc::new(RwLockInner::new(State::new(txn_id, version))),
770            semaphore: Semaphore::with_reservation(txn_id, collator, Range::All),
771        }
772    }
773
774    /// Remove and return all entries from this [`TxnMapLock`] at `txn_id`.
775    pub async fn clear(&self, txn_id: I) -> Result<Canon<K, V>> {
776        #[cfg(feature = "logging")]
777        log::trace!("clear {self:?} at {txn_id:?}");
778
779        let _permit = self.semaphore.write(txn_id, Range::All).await?;
780
781        let mut state = self.state_mut();
782        state.check_pending(&txn_id)?;
783
784        Ok(state.clear(txn_id))
785    }
786
787    /// Remove and return all entries from this [`TxnMapLock`] at `txn_id`.
788    pub fn try_clear(&self, txn_id: I) -> Result<Canon<K, V>> {
789        #[cfg(feature = "logging")]
790        log::trace!("try_clear {self:?} at {txn_id:?}");
791
792        let mut state = self.state_mut();
793        state.check_pending(&txn_id)?;
794
795        let _permit = self.semaphore.try_write(txn_id, Range::All)?;
796
797        Ok(state.clear(txn_id))
798    }
799
800    /// Return `true` if this [`TxnMapLock`] has an entry at `key` at `txn_id`.
801    pub async fn contains_key<Q>(&self, txn_id: I, key: &Q) -> Result<bool>
802    where
803        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
804        Key<K>: Borrow<Q>,
805    {
806        #[cfg(feature = "logging")]
807        log::trace!("contains_key? {self:?} at {txn_id:?}");
808
809        let range: Range<K> = {
810            let state = self.state();
811            if let Poll::Ready(result) = state.contains_committed(&txn_id, key) {
812                return result;
813            } else {
814                Key::<K>::from((key, state.key(&txn_id, key))).into()
815            }
816        };
817
818        let _permit = self.semaphore.read(txn_id, range).await?;
819        Ok(self.state().contains_pending(&txn_id, key))
820    }
821
822    /// Synchronously check whether the given `key` is present in this [`TxnMapLock`]s, if possible.
823    pub fn try_contains_key<Q>(&self, txn_id: I, key: &Q) -> Result<bool>
824    where
825        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
826        Key<K>: Borrow<Q>,
827    {
828        #[cfg(feature = "logging")]
829        log::trace!("try_contains_key {self:?} at {txn_id:?}");
830
831        let state = self.state();
832        if let Poll::Ready(result) = state.contains_committed(&txn_id, key) {
833            return result;
834        }
835
836        let range = Key::<K>::from((key, state.key(&txn_id, key))).into();
837        let _permit = self.semaphore.try_read(txn_id, range)?;
838        Ok(state.contains_pending(&txn_id, key))
839    }
840
841    /// Insert the entries from `other` [`TxnMapLock`] at `txn_id`.
842    pub async fn extend<Q, E>(&self, txn_id: I, other: E) -> Result<()>
843    where
844        Q: Into<Key<K>>,
845        E: IntoIterator<Item = (Q, V)>,
846    {
847        #[cfg(feature = "logging")]
848        log::trace!("extend {self:?} at {txn_id:?}");
849
850        let _permit = self.semaphore.write(txn_id, Range::All).await?;
851
852        let mut state = self.state_mut();
853        state.check_pending(&txn_id)?;
854
855        state.extend(txn_id, other);
856        Ok(())
857    }
858
859    /// Insert the entries from `other` [`TxnMapLock`] at `txn_id` synchronously, if possible.
860    pub fn try_extend<Q, E>(&self, txn_id: I, other: E) -> Result<()>
861    where
862        Q: Into<Key<K>>,
863        E: IntoIterator<Item = (Q, V)>,
864    {
865        #[cfg(feature = "logging")]
866        log::trace!("try_extend {self:?} at {txn_id:?}");
867
868        let mut state = self.state_mut();
869        state.check_pending(&txn_id)?;
870
871        let _permit = self.semaphore.try_write(txn_id, Range::All)?;
872
873        state.extend(txn_id, other);
874        Ok(())
875    }
876
877    /// Read a value from this [`TxnMapLock`] at `txn_id`.
878    pub async fn get<Q>(&self, txn_id: I, key: &Q) -> Result<Option<TxnMapValueReadGuard<K, V>>>
879    where
880        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
881        Key<K>: Borrow<Q>,
882    {
883        #[cfg(feature = "logging")]
884        log::trace!("get from {self:?} at {txn_id:?}");
885
886        let range: Range<K> = {
887            let state = self.state();
888
889            if let Poll::Ready(result) = state.get_committed(&txn_id, key) {
890                return result;
891            }
892
893            Key::<K>::from((key, state.key(&txn_id, key))).into()
894        };
895
896        let permit = self.semaphore.read(txn_id, range).await?;
897
898        let state = self.state();
899
900        if let Poll::Ready(result) = state.get_committed(&txn_id, key) {
901            return result;
902        }
903
904        let value = state.get_pending(&txn_id, key).map(|value| match value {
905            PendingValue::Committed(value) => TxnMapValueReadGuard::committed(value),
906            PendingValue::Pending(value) => TxnMapValueReadGuard::pending_write(permit, value),
907        });
908
909        Ok(value)
910    }
911
912    /// Read a value from this [`TxnMapLock`] at `txn_id` synchronously, if possible.
913    pub fn try_get<Q>(&self, txn_id: I, key: &Q) -> Result<Option<TxnMapValueReadGuard<K, V>>>
914    where
915        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
916        Key<K>: Borrow<Q>,
917    {
918        #[cfg(feature = "logging")]
919        log::trace!("try_get from {self:?} at {txn_id:?}");
920
921        let state = self.state();
922
923        if let Poll::Ready(result) = state.get_committed(&txn_id, key) {
924            #[cfg(feature = "log")]
925            log::trace!(
926                "{self:?} is already committed at {txn_id:?}, no need for a read permit..."
927            );
928            return result;
929        } else {
930            #[cfg(feature = "log")]
931            log::trace!("{self:?} is not yet committed at {txn_id:?}, getting a read permit...");
932        }
933
934        let range = Key::<K>::from((key, state.key(&txn_id, key))).into();
935        let permit = self.semaphore.try_read(txn_id, range)?;
936        let value = state.get_pending(&txn_id, key).map(|value| match value {
937            PendingValue::Committed(value) => TxnMapValueReadGuard::committed(value),
938            PendingValue::Pending(value) => TxnMapValueReadGuard::pending_write(permit, value),
939        });
940
941        Ok(value)
942    }
943
944    /// Construct an iterator over the entries in this [`TxnMapLock`] at `txn_id`.
945    pub async fn iter(&self, txn_id: I) -> Result<Iter<I, K, V>> {
946        #[cfg(feature = "logging")]
947        log::trace!("iter over {self:?} at {txn_id:?}");
948
949        let permit = self.semaphore.read(txn_id, Range::All).await?;
950
951        let state = self.state();
952
953        if state.check_committed(&txn_id)? {
954            let keys = state.keys_committed(&txn_id);
955            Ok(Iter::new(self.state.clone(), txn_id, None, keys))
956        } else {
957            let keys = self.state().keys_pending(txn_id);
958            Ok(Iter::new(self.state.clone(), txn_id, Some(permit), keys))
959        }
960    }
961
962    /// Construct an iterator over the entries in this [`TxnMapLock`] at `txn_id` synchronously.
963    pub fn try_iter(&self, txn_id: I) -> Result<Iter<I, K, V>> {
964        #[cfg(feature = "logging")]
965        log::trace!("try_iter over {self:?} at {txn_id:?}");
966
967        let state = self.state();
968
969        if state.check_committed(&txn_id)? {
970            let keys = state.keys_committed(&txn_id);
971            Ok(Iter::new(self.state.clone(), txn_id, None, keys))
972        } else {
973            let permit = self.semaphore.try_read(txn_id, Range::All)?;
974            let keys = state.keys_pending(txn_id);
975            Ok(Iter::new(self.state.clone(), txn_id, Some(permit), keys))
976        }
977    }
978
979    /// Insert a new entry into this [`TxnMapLock`] at `txn_id` and return the prior value, if any.
980    pub async fn insert<Q: Into<Key<K>>>(
981        &self,
982        txn_id: I,
983        key: Q,
984        value: V,
985    ) -> Result<Option<Arc<V>>> {
986        #[cfg(feature = "logging")]
987        log::trace!("insert into {self:?} at {txn_id:?}");
988
989        let key: Key<K> = key.into();
990        let _permit = self.semaphore.write(txn_id, key.clone().into()).await?;
991
992        let mut state = self.state_mut();
993        state.check_pending(&txn_id)?;
994
995        Ok(state.insert(txn_id, key, value))
996    }
997
998    /// Insert a new entry into this [`TxnMapLock`] at `txn_id` synchronously, if possible.
999    pub fn try_insert<Q: Into<Key<K>>>(
1000        &self,
1001        txn_id: I,
1002        key: Q,
1003        value: V,
1004    ) -> Result<Option<Arc<V>>> {
1005        #[cfg(feature = "logging")]
1006        log::trace!("try_insert into {self:?} at {txn_id:?}");
1007
1008        let mut state = self.state_mut();
1009        state.check_pending(&txn_id)?;
1010
1011        let key = key.into();
1012        let _permit = self.semaphore.try_write(txn_id, key.clone().into())?;
1013
1014        Ok(state.insert(txn_id, key, value))
1015    }
1016
1017    /// Return `true` if this [`TxnMapLock`] is empty at the given `txn_id`.
1018    pub async fn is_empty(&self, txn_id: I) -> Result<bool> {
1019        #[cfg(feature = "logging")]
1020        log::trace!("is_empty? {self:?} at {txn_id:?}");
1021
1022        self.len(txn_id).map_ok(|len| len == 0).await
1023    }
1024
1025    /// Get the size of this [`TxnMapLock`] at the given `txn_id`.
1026    pub async fn len(&self, txn_id: I) -> Result<usize> {
1027        #[cfg(feature = "logging")]
1028        log::trace!("len of {self:?} at {txn_id:?}");
1029
1030        let _permit = self.semaphore.read(txn_id, Range::All).await?;
1031
1032        let state = self.state();
1033
1034        if state.check_committed(&txn_id)? {
1035            let keys = state.keys_committed(&txn_id);
1036            Ok(keys.len())
1037        } else {
1038            let keys = state.keys_pending(txn_id);
1039            Ok(keys.len())
1040        }
1041    }
1042
1043    /// Remove and return the value at `key` from this [`TxnMapLock`] at `txn_id`, if present.
1044    pub async fn remove<Q>(&self, txn_id: I, key: &Q) -> Result<Option<Arc<V>>>
1045    where
1046        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
1047        Key<K>: Borrow<Q>,
1048    {
1049        #[cfg(feature = "logging")]
1050        log::trace!("remove from {self:?} at {txn_id:?}");
1051
1052        let key: Key<K> = {
1053            let state = self.state();
1054            state.check_pending(&txn_id)?;
1055            (key, state.key(&txn_id, key)).into()
1056        };
1057
1058        let _permit = self.semaphore.write(txn_id, key.clone().into()).await?;
1059
1060        let mut state = self.state_mut();
1061        state.check_pending(&txn_id)?;
1062
1063        Ok(state.remove(txn_id, key))
1064    }
1065
1066    /// Remove and return the value at `key` from this [`TxnMapLock`] at `txn_id`, if present.
1067    pub fn try_remove<Q>(&self, txn_id: I, key: &Q) -> Result<Option<Arc<V>>>
1068    where
1069        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
1070        Key<K>: Borrow<Q>,
1071    {
1072        #[cfg(feature = "logging")]
1073        log::trace!("try_remove from {self:?} at {txn_id:?}");
1074
1075        let mut state = self.state_mut();
1076        state.check_pending(&txn_id)?;
1077
1078        let key: Key<K> = (key, state.key(&txn_id, key)).into();
1079        let _permit = self.semaphore.try_write(txn_id, key.clone().into())?;
1080
1081        Ok(state.remove(txn_id, key))
1082    }
1083}
1084
1085impl<I, K, V> TxnMapLock<I, K, V>
1086where
1087    I: Copy + Hash + Ord + fmt::Debug,
1088    K: Hash + Ord + fmt::Debug + Send + Sync,
1089    V: Clone + fmt::Debug,
1090{
1091    /// Borrow an [`Entry`] mutably for writing at `txn_id`.
1092    pub async fn entry<Q: Into<Key<K>>>(&self, txn_id: I, key: Q) -> Result<Entry<I, K, V>> {
1093        #[cfg(feature = "logging")]
1094        log::trace!("get entry in {self:?} at {txn_id:?}");
1095
1096        // before acquiring a permit, check if this version has already been committed
1097        self.state().check_pending(&txn_id)?;
1098
1099        let key: Key<K> = key.into();
1100        let range = key.clone().into();
1101        let permit = self.semaphore.write(txn_id, range).await?;
1102
1103        if let Some(value) = self.state_mut().get_mut(txn_id, key.clone()) {
1104            Ok(Entry::Occupied(EntryOccupied {
1105                key,
1106                value: TxnMapValueWriteGuard::new(permit, value),
1107            }))
1108        } else {
1109            Ok(Entry::Vacant(EntryVacant {
1110                permit,
1111                key,
1112                txn_id,
1113                map_state: self.state.clone(),
1114            }))
1115        }
1116    }
1117
1118    /// Read a mutable value from this [`TxnMapLock`] at `txn_id`.
1119    pub async fn get_mut<Q: Into<Key<K>>>(
1120        &self,
1121        txn_id: I,
1122        key: Q,
1123    ) -> Result<Option<TxnMapValueWriteGuard<K, V>>> {
1124        #[cfg(feature = "logging")]
1125        log::trace!("get_mut {self:?} at {txn_id:?}");
1126
1127        // before acquiring a permit, check if this version has already been committed
1128        self.state().check_pending(&txn_id)?;
1129
1130        let key = key.into();
1131        let permit = self.semaphore.write(txn_id, key.clone().into()).await?;
1132
1133        if let Some(value) = self.state_mut().get_mut(txn_id, key) {
1134            Ok(Some(TxnMapValueWriteGuard::new(permit, value)))
1135        } else {
1136            Ok(None)
1137        }
1138    }
1139
1140    /// Read a mutable value from this [`TxnMapLock`] at `txn_id`.
1141    pub fn try_get_mut<Q>(&self, txn_id: I, key: &Q) -> Result<Option<TxnMapValueWriteGuard<K, V>>>
1142    where
1143        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
1144        Key<K>: Borrow<Q>,
1145    {
1146        #[cfg(feature = "logging")]
1147        log::trace!("try_get_mut {self:?} at {txn_id:?}");
1148
1149        let mut state = self.state_mut();
1150
1151        // before acquiring a permit, check if this version has already been committed
1152        state.check_pending(&txn_id)?;
1153
1154        let maybe_key = state.key(&txn_id, key);
1155        let key = Key::<K>::from((key, maybe_key));
1156        let permit = self.semaphore.try_write(txn_id, key.clone().into())?;
1157
1158        if let Some(value) = state.get_mut(txn_id, key) {
1159            Ok(Some(TxnMapValueWriteGuard::new(permit, value)))
1160        } else {
1161            Ok(None)
1162        }
1163    }
1164
1165    /// Construct a mutable iterator over the entries in this [`TxnMapLock`] at `txn_id`.
1166    pub async fn iter_mut(&self, txn_id: I) -> Result<IterMut<I, K, V>> {
1167        #[cfg(feature = "logging")]
1168        log::trace!("iter_mut {self:?} at {txn_id:?}");
1169
1170        // before acquiring a permit, check if this version has already been committed
1171        self.state().check_pending(&txn_id)?;
1172
1173        let permit = self.semaphore.write(txn_id, Range::All).await?;
1174        let keys = self.state().keys_pending(txn_id);
1175        Ok(IterMut::new(self.state.clone(), txn_id, permit, keys))
1176    }
1177
1178    /// Construct a mutable iterator over the entries in this [`TxnMapLock`] at `txn_id`,
1179    /// synchronously if possible.
1180    pub fn try_iter_mut(&self, txn_id: I) -> Result<IterMut<I, K, V>> {
1181        #[cfg(feature = "logging")]
1182        log::trace!("try_iter_mut {self:?} at {txn_id:?}");
1183
1184        let state = self.state();
1185
1186        // before acquiring a permit, check if this version has already been committed
1187        state.check_pending(&txn_id)?;
1188
1189        let permit = self.semaphore.try_write(txn_id, Range::All)?;
1190        let keys = state.keys_pending(txn_id);
1191        Ok(IterMut::new(self.state.clone(), txn_id, permit, keys))
1192    }
1193}
1194
1195impl<I, K, V> TxnMapLock<I, K, V>
1196where
1197    I: Copy + Hash + Ord + fmt::Debug,
1198    K: Eq + Hash + Ord + fmt::Debug + Send + Sync,
1199    V: fmt::Debug,
1200{
1201    /// Commit the state of this [`TxnMapLock`] at `txn_id`.
1202    /// Panics:
1203    ///  - if this [`TxnMapLock`] has already been finalized at `txn_id`
1204    ///  - if any new value to commit is still locked (for reading or writing)
1205    pub fn commit(&self, txn_id: I) {
1206        #[cfg(feature = "logging")]
1207        log::trace!("commit {self:?} at {txn_id:?}");
1208
1209        let mut state = self.state_mut();
1210
1211        if state.finalized.as_ref() >= Some(&txn_id) {
1212            panic!("tried to commit already-finalized version {:?}", txn_id);
1213        }
1214
1215        state.commit(txn_id);
1216
1217        self.semaphore.finalize(&txn_id, false);
1218    }
1219
1220    /// Read and commit this [`TxnMapLock`] in a single operation.
1221    /// Also returns the set of changes committed, if any.
1222    ///
1223    /// Panics:
1224    ///  - if this [`TxnMapLock`] has already been finalized at `txn_id`
1225    ///  - if any new value to commit is still locked (for reading or writing)
1226    pub async fn read_and_commit(&self, txn_id: I) -> (Canon<K, V>, Option<Delta<K, V>>) {
1227        #[cfg(feature = "logging")]
1228        log::trace!("read and commit {self:?} at {txn_id:?}");
1229
1230        let _permit = self
1231            .semaphore
1232            .read(txn_id, Range::All)
1233            .await
1234            .expect("permit");
1235
1236        let (version, deltas) = {
1237            let mut state = self.state_mut();
1238
1239            if state.finalized > Some(txn_id) {
1240                panic!("tried to commit already-finalized version {:?}", txn_id);
1241            }
1242
1243            state.commit(txn_id);
1244
1245            (state.canon(&txn_id), state.deltas.get(&txn_id).cloned())
1246        };
1247
1248        self.semaphore.finalize(&txn_id, false);
1249
1250        (version, deltas)
1251    }
1252
1253    /// Roll back the state of this [`TxnMapLock`] at `txn_id`.
1254    ///
1255    /// Panics: if this [`TxnMapLock`] has already been committed or finalized at `txn_id`
1256    pub fn rollback(&self, txn_id: &I) {
1257        self.semaphore.finalize(txn_id, false);
1258
1259        let mut state = self.state_mut();
1260
1261        assert!(
1262            !state.commits.contains(txn_id),
1263            "cannot roll back committed transaction {:?}",
1264            txn_id
1265        );
1266
1267        if state.finalized.as_ref() > Some(txn_id) {
1268            panic!("tried to roll back finalized version at {:?}", txn_id);
1269        }
1270
1271        state.pending.remove(txn_id);
1272
1273        self.semaphore.finalize(txn_id, false);
1274    }
1275
1276    /// Read and roll back this [`TxnMapLock`] in a single operation.
1277    ///
1278    /// Panics:
1279    ///  - if this [`TxnMapLock`] has already been committed or finalized at `txn_id`
1280    ///  - if any updated value is still locked for reading or writing
1281    pub async fn read_and_rollback(&self, txn_id: I) -> (Canon<K, V>, Option<Delta<K, V>>) {
1282        let _permit = self
1283            .semaphore
1284            .read(txn_id, Range::All)
1285            .await
1286            .expect("permit");
1287
1288        let (version, deltas) = {
1289            let mut state = self.state_mut();
1290
1291            assert!(
1292                !state.commits.contains(&txn_id),
1293                "cannot roll back committed transaction {:?}",
1294                txn_id
1295            );
1296
1297            if state.finalized > Some(txn_id) {
1298                panic!("tried to roll back finalized version at {:?}", txn_id);
1299            }
1300
1301            // note: this removes the pending version, i.e. it returns the version that WOULD be committed
1302            let deltas = state.commit_version(&txn_id);
1303
1304            let mut version = state.canon(&txn_id);
1305
1306            if let Some(deltas) = &deltas {
1307                merge(&mut version, deltas);
1308            }
1309
1310            (version, deltas)
1311        };
1312
1313        self.semaphore.finalize(&txn_id, false);
1314
1315        (version, deltas)
1316    }
1317
1318    /// Finalize the state of this [`TxnMapLock`] at `txn_id`.
1319    /// This will finalize commits and prevent further reads of all versions earlier than `txn_id`.
1320    pub fn finalize(&self, txn_id: I) {
1321        self.semaphore.finalize(&txn_id, true);
1322        self.state_mut().finalize(txn_id);
1323    }
1324
1325    /// Read and finalize the state of this [`TxnMapLock`] at `txn_id`, if not already finalized.
1326    /// This will finalize commits and prevent further reads of all versions earlier than `txn_id`.
1327    pub fn read_and_finalize(&self, txn_id: I) -> Option<Canon<K, V>> {
1328        self.semaphore.finalize(&txn_id, true);
1329        self.state_mut().finalize(txn_id).cloned()
1330    }
1331}
1332
1333impl<I, K, V> fmt::Debug for TxnMapLock<I, K, V> {
1334    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1335        f.write_str("a transactional lock on a map of keys to values")
1336    }
1337}
1338
1339/// A guard on a value in an [`Iter`]
1340#[derive(Debug)]
1341pub struct TxnMapLockIterGuard<V> {
1342    value: PendingValue<V>,
1343}
1344
1345impl<V> From<PendingValue<V>> for TxnMapLockIterGuard<V> {
1346    fn from(value: PendingValue<V>) -> Self {
1347        Self { value }
1348    }
1349}
1350
1351impl<V> Deref for TxnMapLockIterGuard<V> {
1352    type Target = V;
1353
1354    fn deref(&self) -> &Self::Target {
1355        match &self.value {
1356            PendingValue::Committed(value) => value.deref(),
1357            PendingValue::Pending(value) => value.deref(),
1358        }
1359    }
1360}
1361
1362impl<V: PartialEq> PartialEq<V> for TxnMapLockIterGuard<V> {
1363    fn eq(&self, other: &V) -> bool {
1364        self.deref().eq(other)
1365    }
1366}
1367
1368impl<V: PartialOrd> PartialOrd<V> for TxnMapLockIterGuard<V> {
1369    fn partial_cmp(&self, other: &V) -> Option<Ordering> {
1370        self.deref().partial_cmp(other)
1371    }
1372}
1373
1374/// An iterator over the entries in a [`TxnMapLock`] as of a specific transaction
1375pub struct Iter<I, K, V> {
1376    lock_state: Arc<RwLockInner<State<I, K, V>>>,
1377    txn_id: I,
1378    permit: Option<PermitRead<Range<K>>>,
1379    keys: <HashSet<Key<K>> as IntoIterator>::IntoIter,
1380}
1381
1382impl<I, K, V> Iter<I, K, V> {
1383    fn new(
1384        lock_state: Arc<RwLockInner<State<I, K, V>>>,
1385        txn_id: I,
1386        permit: Option<PermitRead<Range<K>>>,
1387        keys: HashSet<Key<K>>,
1388    ) -> Self {
1389        Self {
1390            lock_state,
1391            txn_id,
1392            permit,
1393            keys: keys.into_iter(),
1394        }
1395    }
1396}
1397
1398impl<I, K, V> Iterator for Iter<I, K, V>
1399where
1400    I: Copy + Hash + Ord + fmt::Debug,
1401    K: Hash + Ord,
1402    V: fmt::Debug,
1403{
1404    type Item = (Key<K>, TxnMapLockIterGuard<V>);
1405
1406    fn next(&mut self) -> Option<Self::Item> {
1407        let state = self.lock_state.read().expect("lock state");
1408
1409        loop {
1410            let key = self.keys.next()?;
1411            let value = get_key(&state, &self.txn_id, &key, self.permit.is_none());
1412            if let Some(value) = value {
1413                return Some((key, value.into()));
1414            }
1415        }
1416    }
1417
1418    fn size_hint(&self) -> (usize, Option<usize>) {
1419        self.keys.size_hint()
1420    }
1421}
1422
1423#[inline]
1424fn get_key<I, K, V, Q>(
1425    state: &State<I, K, V>,
1426    txn_id: &I,
1427    key: &Q,
1428    committed: bool,
1429) -> Option<PendingValue<V>>
1430where
1431    I: Copy + Hash + Ord + fmt::Debug,
1432    K: Hash + Ord,
1433    V: fmt::Debug,
1434    Q: Eq + Hash + ?Sized,
1435    Key<K>: Borrow<Q>,
1436{
1437    if committed {
1438        state.get_canon(txn_id, key).map(PendingValue::Committed)
1439    } else {
1440        state.get_pending(txn_id, key)
1441    }
1442}
1443
1444/// A guard on a mutable value in an [`IterMut`]
1445#[derive(Debug)]
1446pub struct TxnMapLockIterMutGuard<V> {
1447    guard: OwnedRwLockWriteGuard<V>,
1448}
1449
1450impl<V> Deref for TxnMapLockIterMutGuard<V> {
1451    type Target = V;
1452
1453    fn deref(&self) -> &Self::Target {
1454        self.guard.deref()
1455    }
1456}
1457
1458impl<V> DerefMut for TxnMapLockIterMutGuard<V> {
1459    fn deref_mut(&mut self) -> &mut Self::Target {
1460        self.guard.deref_mut()
1461    }
1462}
1463
1464impl<V: PartialEq> PartialEq<V> for TxnMapLockIterMutGuard<V> {
1465    fn eq(&self, other: &V) -> bool {
1466        self.deref().eq(other)
1467    }
1468}
1469
1470impl<V: PartialOrd> PartialOrd<V> for TxnMapLockIterMutGuard<V> {
1471    fn partial_cmp(&self, other: &V) -> Option<Ordering> {
1472        self.deref().partial_cmp(other)
1473    }
1474}
1475
1476impl<V> From<OwnedRwLockWriteGuard<V>> for TxnMapLockIterMutGuard<V> {
1477    fn from(guard: OwnedRwLockWriteGuard<V>) -> Self {
1478        Self { guard }
1479    }
1480}
1481
1482/// An iterator over the keys and mutable values in a [`TxnMapLock`]
1483pub struct IterMut<I, K, V> {
1484    lock_state: Arc<RwLockInner<State<I, K, V>>>,
1485    txn_id: I,
1486
1487    #[allow(unused)]
1488    permit: PermitWrite<Range<K>>,
1489    keys: <HashSet<Key<K>> as IntoIterator>::IntoIter,
1490}
1491
1492impl<I, K, V> IterMut<I, K, V> {
1493    fn new(
1494        lock_state: Arc<RwLockInner<State<I, K, V>>>,
1495        txn_id: I,
1496        permit: PermitWrite<Range<K>>,
1497        keys: HashSet<Key<K>>,
1498    ) -> Self {
1499        Self {
1500            lock_state,
1501            txn_id,
1502            permit,
1503            keys: keys.into_iter(),
1504        }
1505    }
1506}
1507
1508impl<I, K, V> Iterator for IterMut<I, K, V>
1509where
1510    I: Copy + Hash + Ord + fmt::Debug,
1511    K: Hash + Ord,
1512    V: Clone + fmt::Debug,
1513{
1514    type Item = (Arc<K>, TxnMapLockIterMutGuard<V>);
1515
1516    fn next(&mut self) -> Option<Self::Item> {
1517        let mut state = self.lock_state.write().expect("lock state");
1518
1519        loop {
1520            let key = self.keys.next()?;
1521            if let Some(guard) = state.get_mut(self.txn_id, key.clone()) {
1522                return Some((key.into(), TxnMapLockIterMutGuard::from(guard)));
1523            }
1524        }
1525    }
1526
1527    fn size_hint(&self) -> (usize, Option<usize>) {
1528        self.keys.size_hint()
1529    }
1530}
1531
1532#[inline]
1533fn merge<K: Eq + Hash, V>(version: &mut Canon<K, V>, deltas: &Delta<K, V>) {
1534    for (key, delta) in deltas {
1535        match delta {
1536            Some(value) => version.insert(key.clone(), value.clone()),
1537            None => version.remove(key),
1538        };
1539    }
1540}
1541
1542#[inline]
1543fn merge_owned<K: Eq + Hash, V>(version: &mut Canon<K, V>, deltas: Delta<K, V>) {
1544    for (key, delta) in deltas {
1545        match delta {
1546            Some(value) => version.insert(key, value),
1547            None => version.remove(&key),
1548        };
1549    }
1550}