1use either::Either;
2use txn_core::sync::{Cm, Marker};
3
4use super::*;
5
6use core::cmp;
7use crossbeam_skiplist::map::Iter as MapIter;
8
/// Iterator over the committed entries of the skip map as seen at a fixed
/// version.
///
/// Its `Iterator` implementation walks `iter` key by key and only yields a key
/// when the version looked up with `version` as an inclusive upper bound holds
/// a value.
pub struct Iter<'a, K, V> {
    /// Underlying skip-map iterator; each key maps to its `Values<V>`
    /// (a per-key collection that is queried by version number).
    pub(crate) iter: MapIter<'a, K, Values<V>>,
    /// Inclusive upper bound used when selecting which version of a key is
    /// visible to this iterator.
    pub(crate) version: u64,
}
14
15impl<'a, K, V> Iterator for Iter<'a, K, V>
16where
17 K: Ord,
18{
19 type Item = Ref<'a, K, V>;
20
21 fn next(&mut self) -> Option<Self::Item> {
22 loop {
23 let ent = self.iter.next()?;
24 if let Some(version) = ent
25 .value()
26 .upper_bound(Bound::Included(&self.version))
27 .and_then(|ent| {
28 if ent.value().is_some() {
29 Some(*ent.key())
30 } else {
31 None
32 }
33 })
34 {
35 return Some(CommittedRef { version, ent }.into());
36 }
37 }
38 }
39}
40
/// Iterator that merges a transaction's pending entries with the committed
/// entries into a single stream of `Ref`s.
///
/// Both sources are consumed through one-item lookahead slots
/// (`next_pending` / `next_committed`) so their keys can be compared before
/// deciding which side to yield from.
pub struct TransactionIter<'a, K, V, C> {
    /// Committed entries, already filtered to a fixed version by `Iter`.
    committed: Iter<'a, K, V>,
    /// Pending (not yet committed) entries of the transaction.
    // NOTE(review): `BTreeMapIter` is presumably an alias for
    // `btree_map::Iter`, i.e. key-ordered — confirm against its definition.
    pendings: BTreeMapIter<'a, K, EntryValue<V>>,
    /// Lookahead slot: the next pending entry, refilled by `advance_pending`.
    next_pending: Option<(&'a K, &'a EntryValue<V>)>,
    /// Lookahead slot: the next committed entry, refilled by
    /// `advance_committed`.
    next_committed: Option<Ref<'a, K, V>>,
    /// Key of the entry most recently taken from either source (`Left` for a
    /// pending key, `Right` for a committed item); compared against committed
    /// items so the same key is not yielded twice.
    last_yielded_key: Option<Either<&'a K, Ref<'a, K, V>>>,
    /// When present, `mark` is called with the key of every committed item
    /// loaded into `next_committed`.
    // NOTE(review): presumably used for read-conflict tracking — confirm in
    // `txn_core::sync::Marker`.
    marker: Option<Marker<'a, C>>,
}
50
51impl<'a, K, V, C> TransactionIter<'a, K, V, C>
52where
53 C: Cm<Key = K>,
54 K: Ord,
55{
56 fn advance_pending(&mut self) {
57 self.next_pending = self.pendings.next();
58 }
59
60 fn advance_committed(&mut self) {
61 self.next_committed = self.committed.next();
62 if let (Some(item), Some(marker)) = (&self.next_committed, &mut self.marker) {
63 marker.mark(item.key());
64 }
65 }
66
67 pub fn new(
68 pendings: BTreeMapIter<'a, K, EntryValue<V>>,
69 committed: Iter<'a, K, V>,
70 marker: Option<Marker<'a, C>>,
71 ) -> Self {
72 let mut iterator = TransactionIter {
73 pendings,
74 committed,
75 next_pending: None,
76 next_committed: None,
77 last_yielded_key: None,
78 marker,
79 };
80
81 iterator.advance_pending();
82 iterator.advance_committed();
83
84 iterator
85 }
86}
87
88impl<'a, K, V, C> Iterator for TransactionIter<'a, K, V, C>
89where
90 K: Ord,
91 C: Cm<Key = K>,
92{
93 type Item = Ref<'a, K, V>;
94
95 fn next(&mut self) -> Option<Self::Item> {
96 loop {
97 match (self.next_pending, &self.next_committed) {
98 (Some((pending_key, _)), Some(committed)) => {
100 match pending_key.cmp(committed.key()) {
101 cmp::Ordering::Less => {
103 let (key, value) = self.next_pending.take().unwrap();
104 self.advance_pending();
105 self.last_yielded_key = Some(Either::Left(key));
106 let version = value.version;
107 match &value.value {
108 Some(value) => return Some((version, key, value).into()),
109 None => continue,
110 }
111 }
112 cmp::Ordering::Equal => {
114 self.advance_committed();
116 continue;
118 }
119 cmp::Ordering::Greater => {
121 let committed = self.next_committed.take().unwrap();
122 self.advance_committed(); if self.last_yielded_key.as_ref().map_or(true, |k| match k {
125 Either::Left(k) => *k != committed.key(),
126 Either::Right(item) => item.key() != committed.key(),
127 }) {
128 self.last_yielded_key = Some(Either::Right(committed.clone()));
129 return Some(committed);
130 }
131 }
132 }
133 }
134 (Some((_, _)), None) => {
136 let (key, value) = self.next_pending.take().unwrap();
137 self.advance_pending(); self.last_yielded_key = Some(Either::Left(key)); let version = value.version;
140 match &value.value {
141 Some(value) => return Some((version, key, value).into()),
142 None => continue,
143 }
144 }
145 (None, Some(committed)) => {
147 if self.last_yielded_key.as_ref().map_or(true, |k| match k {
148 Either::Left(k) => *k != committed.key(),
149 Either::Right(item) => item.key() != committed.key(),
150 }) {
151 let committed = self.next_committed.take().unwrap();
152 self.advance_committed(); self.last_yielded_key = Some(Either::Right(committed.clone()));
154 return Some(committed);
155 } else {
156 self.advance_committed();
158 continue;
160 }
161 }
162 (None, None) => return None,
164 }
165 }
166 }
167}