skipdb_core/
range.rs

1use either::Either;
2use txn_core::sync::{Cm, Marker};
3
4use super::*;
5
6use core::cmp;
7use crossbeam_skiplist::map::Range as MapRange;
8
9/// An iterator over a subset of entries of the database.
10pub struct Range<'a, Q, R, K, V>
11where
12  K: Ord + Borrow<Q>,
13  R: RangeBounds<Q>,
14  Q: Ord + ?Sized,
15{
16  pub(crate) range: MapRange<'a, Q, R, K, Values<V>>,
17  pub(crate) version: u64,
18}
19
20impl<'a, Q, R, K, V> Iterator for Range<'a, Q, R, K, V>
21where
22  K: Ord + Borrow<Q>,
23  R: RangeBounds<Q>,
24  Q: Ord + ?Sized,
25{
26  type Item = Ref<'a, K, V>;
27
28  fn next(&mut self) -> Option<Self::Item> {
29    loop {
30      let ent = self.range.next()?;
31      if let Some(version) = ent
32        .value()
33        .upper_bound(Bound::Included(&self.version))
34        .and_then(|ent| {
35          if ent.value().is_some() {
36            Some(*ent.key())
37          } else {
38            None
39          }
40        })
41      {
42        return Some(CommittedRef { version, ent }.into());
43      }
44    }
45  }
46}
47
48/// An iterator over a subset of entries of the database.
49pub struct TransactionRange<'a, Q, R, K, V, C>
50where
51  K: Ord + Borrow<Q>,
52  R: RangeBounds<Q> + 'a,
53  Q: Ord + ?Sized,
54{
55  pub(crate) committed: Range<'a, Q, R, K, V>,
56  pub(crate) pendings: BTreeMapRange<'a, K, EntryValue<V>>,
57  next_pending: Option<(&'a K, &'a EntryValue<V>)>,
58  next_committed: Option<Ref<'a, K, V>>,
59  last_yielded_key: Option<Either<&'a K, Ref<'a, K, V>>>,
60  marker: Option<Marker<'a, C>>,
61}
62
63impl<'a, Q, R, K, V, C> TransactionRange<'a, Q, R, K, V, C>
64where
65  K: Ord + Borrow<Q>,
66  Q: Ord + ?Sized,
67  R: RangeBounds<Q> + 'a,
68  C: Cm<Key = K>,
69{
70  fn advance_pending(&mut self) {
71    self.next_pending = self.pendings.next();
72  }
73
74  fn advance_committed(&mut self) {
75    self.next_committed = self.committed.next();
76    if let (Some(item), Some(marker)) = (&self.next_committed, &mut self.marker) {
77      marker.mark(item.key());
78    }
79  }
80
81  pub fn new(
82    pendings: BTreeMapRange<'a, K, EntryValue<V>>,
83    committed: Range<'a, Q, R, K, V>,
84    marker: Option<Marker<'a, C>>,
85  ) -> Self {
86    let mut iterator = TransactionRange {
87      pendings,
88      committed,
89      next_pending: None,
90      next_committed: None,
91      last_yielded_key: None,
92      marker,
93    };
94
95    iterator.advance_pending();
96    iterator.advance_committed();
97
98    iterator
99  }
100}
101
102impl<'a, Q, R, K, V, C> Iterator for TransactionRange<'a, Q, R, K, V, C>
103where
104  K: Ord + Borrow<Q>,
105  Q: Ord + ?Sized,
106  R: RangeBounds<Q> + 'a,
107  C: Cm<Key = K>,
108{
109  type Item = Ref<'a, K, V>;
110
111  fn next(&mut self) -> Option<Self::Item> {
112    loop {
113      match (self.next_pending, &self.next_committed) {
114        // Both pending and committed iterators have items to yield.
115        (Some((pending_key, _)), Some(committed)) => {
116          match pending_key.cmp(committed.key()) {
117            // Pending item has a smaller key, so yield this one.
118            cmp::Ordering::Less => {
119              let (key, value) = self.next_pending.take().unwrap();
120              self.advance_pending();
121              self.last_yielded_key = Some(Either::Left(key));
122              let version = value.version;
123              match &value.value {
124                Some(value) => return Some((version, key, value).into()),
125                None => continue,
126              }
127            }
128            // Keys are equal, so we prefer the pending item and skip the committed one.
129            cmp::Ordering::Equal => {
130              // Skip committed if it has the same key as pending
131              self.advance_committed();
132              // Loop again to check the next item without yielding anything this time.
133              continue;
134            }
135            // Committed item has a smaller key, so we consider yielding this one.
136            cmp::Ordering::Greater => {
137              let committed = self.next_committed.take().unwrap();
138              self.advance_committed(); // Prepare the next committed item for future iterations.
139                                        // Yield the committed item if it has not been yielded before.
140              if self.last_yielded_key.as_ref().map_or(true, |k| match k {
141                Either::Left(k) => *k != committed.key(),
142                Either::Right(item) => item.key() != committed.key(),
143              }) {
144                self.last_yielded_key = Some(Either::Right(committed.clone()));
145                return Some(committed);
146              }
147            }
148          }
149        }
150        // Only pending items are left, so yield the next pending item.
151        (Some((_, _)), None) => {
152          let (key, value) = self.next_pending.take().unwrap();
153          self.advance_pending(); // Advance the pending iterator for the next iteration.
154          self.last_yielded_key = Some(Either::Left(key)); // Update the last yielded key.
155          let version = value.version;
156          match &value.value {
157            Some(value) => return Some((version, key, value).into()),
158            None => continue,
159          }
160        }
161        // Only committed items are left, so yield the next committed item if it hasn't been yielded already.
162        (None, Some(committed)) => {
163          if self.last_yielded_key.as_ref().map_or(true, |k| match k {
164            Either::Left(k) => *k != committed.key(),
165            Either::Right(item) => item.key() != committed.key(),
166          }) {
167            let committed = self.next_committed.take().unwrap();
168            self.advance_committed(); // Advance the committed iterator for the next iteration.
169            self.last_yielded_key = Some(Either::Right(committed.clone()));
170            return Some(committed);
171          } else {
172            // The key has already been yielded, so move to the next.
173            self.advance_committed();
174            // Loop again to check the next item without yielding anything this time.
175            continue;
176          }
177        }
178        // Both iterators have no items left to yield.
179        (None, None) => return None,
180      }
181    }
182  }
183}