1use either::Either;
2use txn_core::sync::{Cm, Marker};
3
4use super::*;
5
6use core::cmp;
7use crossbeam_skiplist::map::Range as MapRange;
8
/// Iterator over a key range of committed entries, read as of a fixed `version`.
///
/// The `Iterator` impl in this file looks up, for every key in the range, the
/// newest stored version that is `<= version` and skips the key when no such
/// version exists or when that version holds `None`.
pub struct Range<'a, Q, R, K, V>
where
    K: Ord + Borrow<Q>,
    R: RangeBounds<Q>,
    Q: Ord + ?Sized,
{
    /// Underlying skip-list range; each entry maps a key to its `Values<V>`
    /// (queried by version through `upper_bound` in `next`).
    pub(crate) range: MapRange<'a, Q, R, K, Values<V>>,
    /// Inclusive upper bound on the versions this iterator may observe.
    pub(crate) version: u64,
}
19
20impl<'a, Q, R, K, V> Iterator for Range<'a, Q, R, K, V>
21where
22 K: Ord + Borrow<Q>,
23 R: RangeBounds<Q>,
24 Q: Ord + ?Sized,
25{
26 type Item = Ref<'a, K, V>;
27
28 fn next(&mut self) -> Option<Self::Item> {
29 loop {
30 let ent = self.range.next()?;
31 if let Some(version) = ent
32 .value()
33 .upper_bound(Bound::Included(&self.version))
34 .and_then(|ent| {
35 if ent.value().is_some() {
36 Some(*ent.key())
37 } else {
38 None
39 }
40 })
41 {
42 return Some(CommittedRef { version, ent }.into());
43 }
44 }
45 }
46}
47
/// Iterator that merges a range of pending entries with a range of committed
/// entries, in ascending key order.
///
/// When both sides hold the same key, the `Iterator` impl in this file drops
/// the committed entry and keeps the pending one. A pending entry whose value
/// is `None` is consumed without being yielded.
pub struct TransactionRange<'a, Q, R, K, V, C>
where
    K: Ord + Borrow<Q>,
    R: RangeBounds<Q> + 'a,
    Q: Ord + ?Sized,
{
    /// Source of committed entries.
    pub(crate) committed: Range<'a, Q, R, K, V>,
    /// Source of pending entries.
    // NOTE(review): `BTreeMapRange` is declared outside this file — presumably
    // an alias for a `BTreeMap` range iterator; confirm.
    pub(crate) pendings: BTreeMapRange<'a, K, EntryValue<V>>,
    /// One-item look-ahead pulled from `pendings`.
    next_pending: Option<(&'a K, &'a EntryValue<V>)>,
    /// One-item look-ahead pulled from `committed`.
    next_committed: Option<Ref<'a, K, V>>,
    /// Key of the most recently consumed pending entry (`Left`, recorded even
    /// when that entry's value is `None`) or the most recently yielded
    /// committed item (`Right`); compared against a committed key before it is yielded.
    last_yielded_key: Option<Either<&'a K, Ref<'a, K, V>>>,
    /// When present, `mark` is called with the key of every committed item
    /// loaded into `next_committed`.
    // NOTE(review): presumably feeds read tracking for conflict detection — confirm
    // against `txn_core::sync::Marker`.
    marker: Option<Marker<'a, C>>,
}
62
63impl<'a, Q, R, K, V, C> TransactionRange<'a, Q, R, K, V, C>
64where
65 K: Ord + Borrow<Q>,
66 Q: Ord + ?Sized,
67 R: RangeBounds<Q> + 'a,
68 C: Cm<Key = K>,
69{
70 fn advance_pending(&mut self) {
71 self.next_pending = self.pendings.next();
72 }
73
74 fn advance_committed(&mut self) {
75 self.next_committed = self.committed.next();
76 if let (Some(item), Some(marker)) = (&self.next_committed, &mut self.marker) {
77 marker.mark(item.key());
78 }
79 }
80
81 pub fn new(
82 pendings: BTreeMapRange<'a, K, EntryValue<V>>,
83 committed: Range<'a, Q, R, K, V>,
84 marker: Option<Marker<'a, C>>,
85 ) -> Self {
86 let mut iterator = TransactionRange {
87 pendings,
88 committed,
89 next_pending: None,
90 next_committed: None,
91 last_yielded_key: None,
92 marker,
93 };
94
95 iterator.advance_pending();
96 iterator.advance_committed();
97
98 iterator
99 }
100}
101
102impl<'a, Q, R, K, V, C> Iterator for TransactionRange<'a, Q, R, K, V, C>
103where
104 K: Ord + Borrow<Q>,
105 Q: Ord + ?Sized,
106 R: RangeBounds<Q> + 'a,
107 C: Cm<Key = K>,
108{
109 type Item = Ref<'a, K, V>;
110
111 fn next(&mut self) -> Option<Self::Item> {
112 loop {
113 match (self.next_pending, &self.next_committed) {
114 (Some((pending_key, _)), Some(committed)) => {
116 match pending_key.cmp(committed.key()) {
117 cmp::Ordering::Less => {
119 let (key, value) = self.next_pending.take().unwrap();
120 self.advance_pending();
121 self.last_yielded_key = Some(Either::Left(key));
122 let version = value.version;
123 match &value.value {
124 Some(value) => return Some((version, key, value).into()),
125 None => continue,
126 }
127 }
128 cmp::Ordering::Equal => {
130 self.advance_committed();
132 continue;
134 }
135 cmp::Ordering::Greater => {
137 let committed = self.next_committed.take().unwrap();
138 self.advance_committed(); if self.last_yielded_key.as_ref().map_or(true, |k| match k {
141 Either::Left(k) => *k != committed.key(),
142 Either::Right(item) => item.key() != committed.key(),
143 }) {
144 self.last_yielded_key = Some(Either::Right(committed.clone()));
145 return Some(committed);
146 }
147 }
148 }
149 }
150 (Some((_, _)), None) => {
152 let (key, value) = self.next_pending.take().unwrap();
153 self.advance_pending(); self.last_yielded_key = Some(Either::Left(key)); let version = value.version;
156 match &value.value {
157 Some(value) => return Some((version, key, value).into()),
158 None => continue,
159 }
160 }
161 (None, Some(committed)) => {
163 if self.last_yielded_key.as_ref().map_or(true, |k| match k {
164 Either::Left(k) => *k != committed.key(),
165 Either::Right(item) => item.key() != committed.key(),
166 }) {
167 let committed = self.next_committed.take().unwrap();
168 self.advance_committed(); self.last_yielded_key = Some(Either::Right(committed.clone()));
170 return Some(committed);
171 } else {
172 self.advance_committed();
174 continue;
176 }
177 }
178 (None, None) => return None,
180 }
181 }
182 }
183}