skipdb_core/
iter.rs

1use either::Either;
2use txn_core::sync::{Cm, Marker};
3
4use super::*;
5
6use core::cmp;
7use crossbeam_skiplist::map::Iter as MapIter;
8
/// An iterator over the entries of the database.
///
/// Walks the underlying skip-list map in its iteration order and, for each key,
/// resolves the value visible at [`version`](Self::version). Keys whose resolved
/// value is absent are skipped by the `Iterator` implementation.
pub struct Iter<'a, K, V> {
  /// Underlying `crossbeam_skiplist` map iterator; each entry pairs a key with
  /// its `Values<V>` (the per-key collection that is queried by version).
  pub(crate) iter: MapIter<'a, K, Values<V>>,
  /// Read version, used as an inclusive upper bound when looking up the
  /// versioned value of every key.
  pub(crate) version: u64,
}
14
15impl<'a, K, V> Iterator for Iter<'a, K, V>
16where
17  K: Ord,
18{
19  type Item = Ref<'a, K, V>;
20
21  fn next(&mut self) -> Option<Self::Item> {
22    loop {
23      let ent = self.iter.next()?;
24      if let Some(version) = ent
25        .value()
26        .upper_bound(Bound::Included(&self.version))
27        .and_then(|ent| {
28          if ent.value().is_some() {
29            Some(*ent.key())
30          } else {
31            None
32          }
33        })
34      {
35        return Some(CommittedRef { version, ent }.into());
36      }
37    }
38  }
39}
40
/// Iterator over the entries of the write transaction.
///
/// Merges two sources by comparing keys: the transaction's pending entries and
/// the committed entries. When both sources hold the same key, the pending entry
/// is preferred and the committed one is skipped.
pub struct TransactionIter<'a, K, V, C> {
  /// Source of committed entries.
  committed: Iter<'a, K, V>,
  /// Source of the transaction's pending entries.
  pendings: BTreeMapIter<'a, K, EntryValue<V>>,
  /// Look-ahead slot: the next pending entry that has not been consumed yet.
  next_pending: Option<(&'a K, &'a EntryValue<V>)>,
  /// Look-ahead slot: the next committed entry that has not been consumed yet.
  next_committed: Option<Ref<'a, K, V>>,
  /// Key of the entry most recently consumed for yielding: `Left` holds a pending
  /// key, `Right` holds a committed entry. Consulted before yielding a committed
  /// entry so the same key is not produced twice.
  last_yielded_key: Option<Either<&'a K, Ref<'a, K, V>>>,
  /// Optional marker; the key of every committed entry loaded into the
  /// look-ahead slot is passed to `Marker::mark`.
  /// NOTE(review): presumably this records reads for conflict detection —
  /// confirm against `txn_core`.
  marker: Option<Marker<'a, C>>,
}
50
51impl<'a, K, V, C> TransactionIter<'a, K, V, C>
52where
53  C: Cm<Key = K>,
54  K: Ord,
55{
56  fn advance_pending(&mut self) {
57    self.next_pending = self.pendings.next();
58  }
59
60  fn advance_committed(&mut self) {
61    self.next_committed = self.committed.next();
62    if let (Some(item), Some(marker)) = (&self.next_committed, &mut self.marker) {
63      marker.mark(item.key());
64    }
65  }
66
67  pub fn new(
68    pendings: BTreeMapIter<'a, K, EntryValue<V>>,
69    committed: Iter<'a, K, V>,
70    marker: Option<Marker<'a, C>>,
71  ) -> Self {
72    let mut iterator = TransactionIter {
73      pendings,
74      committed,
75      next_pending: None,
76      next_committed: None,
77      last_yielded_key: None,
78      marker,
79    };
80
81    iterator.advance_pending();
82    iterator.advance_committed();
83
84    iterator
85  }
86}
87
88impl<'a, K, V, C> Iterator for TransactionIter<'a, K, V, C>
89where
90  K: Ord,
91  C: Cm<Key = K>,
92{
93  type Item = Ref<'a, K, V>;
94
95  fn next(&mut self) -> Option<Self::Item> {
96    loop {
97      match (self.next_pending, &self.next_committed) {
98        // Both pending and committed iterators have items to yield.
99        (Some((pending_key, _)), Some(committed)) => {
100          match pending_key.cmp(committed.key()) {
101            // Pending item has a smaller key, so yield this one.
102            cmp::Ordering::Less => {
103              let (key, value) = self.next_pending.take().unwrap();
104              self.advance_pending();
105              self.last_yielded_key = Some(Either::Left(key));
106              let version = value.version;
107              match &value.value {
108                Some(value) => return Some((version, key, value).into()),
109                None => continue,
110              }
111            }
112            // Keys are equal, so we prefer the pending item and skip the committed one.
113            cmp::Ordering::Equal => {
114              // Skip committed if it has the same key as pending
115              self.advance_committed();
116              // Loop again to check the next item without yielding anything this time.
117              continue;
118            }
119            // Committed item has a smaller key, so we consider yielding this one.
120            cmp::Ordering::Greater => {
121              let committed = self.next_committed.take().unwrap();
122              self.advance_committed(); // Prepare the next committed item for future iterations.
123                                        // Yield the committed item if it has not been yielded before.
124              if self.last_yielded_key.as_ref().map_or(true, |k| match k {
125                Either::Left(k) => *k != committed.key(),
126                Either::Right(item) => item.key() != committed.key(),
127              }) {
128                self.last_yielded_key = Some(Either::Right(committed.clone()));
129                return Some(committed);
130              }
131            }
132          }
133        }
134        // Only pending items are left, so yield the next pending item.
135        (Some((_, _)), None) => {
136          let (key, value) = self.next_pending.take().unwrap();
137          self.advance_pending(); // Advance the pending iterator for the next iteration.
138          self.last_yielded_key = Some(Either::Left(key)); // Update the last yielded key.
139          let version = value.version;
140          match &value.value {
141            Some(value) => return Some((version, key, value).into()),
142            None => continue,
143          }
144        }
145        // Only committed items are left, so yield the next committed item if it hasn't been yielded already.
146        (None, Some(committed)) => {
147          if self.last_yielded_key.as_ref().map_or(true, |k| match k {
148            Either::Left(k) => *k != committed.key(),
149            Either::Right(item) => item.key() != committed.key(),
150          }) {
151            let committed = self.next_committed.take().unwrap();
152            self.advance_committed(); // Advance the committed iterator for the next iteration.
153            self.last_yielded_key = Some(Either::Right(committed.clone()));
154            return Some(committed);
155          } else {
156            // The key has already been yielded, so move to the next.
157            self.advance_committed();
158            // Loop again to check the next item without yielding anything this time.
159            continue;
160          }
161        }
162        // Both iterators have no items left to yield.
163        (None, None) => return None,
164      }
165    }
166  }
167}