1use crate::{
6 key::InternalKey,
7 memtable::Memtable,
8 merge::Merger,
9 mvcc_stream::MvccStream,
10 run_reader::RunReader,
11 value::{SeqNo, UserKey},
12 version::SuperVersion,
13 BoxedIterator, InternalValue,
14};
15use self_cell::self_cell;
16use std::{
17 ops::{Bound, RangeBounds},
18 sync::Arc,
19};
20
21#[must_use]
22pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool {
23 item_seqno < seqno
24}
25
26pub(crate) fn prefix_upper_range(prefix: &[u8]) -> Bound<UserKey> {
27 use std::ops::Bound::{Excluded, Unbounded};
28
29 if prefix.is_empty() {
30 return Unbounded;
31 }
32
33 let mut end = prefix.to_vec();
34 let len = end.len();
35
36 for (idx, byte) in end.iter_mut().rev().enumerate() {
37 let idx = len - 1 - idx;
38
39 if *byte < 255 {
40 *byte += 1;
41 end.truncate(idx + 1);
42 return Excluded(end.into());
43 }
44 }
45
46 Unbounded
47}
48
49#[must_use]
51#[expect(clippy::module_name_repetitions)]
52pub fn prefix_to_range(prefix: &[u8]) -> (Bound<UserKey>, Bound<UserKey>) {
53 use std::ops::Bound::{Included, Unbounded};
54
55 if prefix.is_empty() {
56 return (Unbounded, Unbounded);
57 }
58
59 (Included(prefix.into()), prefix_upper_range(prefix))
60}
61
62pub struct IterState {
66 pub(crate) version: SuperVersion,
67 pub(crate) ephemeral: Option<(Arc<Memtable>, SeqNo)>,
68}
69
70type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send + 'a>;
71
72self_cell!(
74 pub struct TreeIter {
75 owner: IterState,
76
77 #[covariant]
78 dependent: BoxedMerge,
79 }
80);
81
82impl Iterator for TreeIter {
83 type Item = crate::Result<InternalValue>;
84
85 fn next(&mut self) -> Option<Self::Item> {
86 self.with_dependent_mut(|_, iter| iter.next())
87 }
88}
89
90impl DoubleEndedIterator for TreeIter {
91 fn next_back(&mut self) -> Option<Self::Item> {
92 self.with_dependent_mut(|_, iter| iter.next_back())
93 }
94}
95
96impl TreeIter {
97 pub fn create_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
98 guard: IterState,
99 range: R,
100 seqno: SeqNo,
101 ) -> Self {
102 Self::new(guard, |lock| {
103 let lo = match range.start_bound() {
104 Bound::Included(key) => Bound::Included(InternalKey::new(
106 key.as_ref(),
107 SeqNo::MAX,
108 crate::ValueType::Tombstone,
109 )),
110 Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
111 key.as_ref(),
112 0,
113 crate::ValueType::Tombstone,
114 )),
115 Bound::Unbounded => Bound::Unbounded,
116 };
117
118 let hi = match range.end_bound() {
119 Bound::Included(key) => {
134 Bound::Included(InternalKey::new(key.as_ref(), 0, crate::ValueType::Value))
135 }
136 Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
137 key.as_ref(),
138 SeqNo::MAX,
139 crate::ValueType::Value,
140 )),
141 Bound::Unbounded => Bound::Unbounded,
142 };
143
144 let range = (lo, hi);
145
146 let mut iters: Vec<BoxedIterator<'_>> = Vec::with_capacity(5);
147
148 for run in lock
149 .version
150 .version
151 .iter_levels()
152 .flat_map(|lvl| lvl.iter())
153 {
154 match run.len() {
155 0 => {
156 }
158 1 => {
159 #[expect(clippy::expect_used, reason = "we checked for length")]
160 let table = run.first().expect("should exist");
161
162 if table.check_key_range_overlap(&(
163 range.start_bound().map(|x| &*x.user_key),
164 range.end_bound().map(|x| &*x.user_key),
165 )) {
166 let reader = table.range((
167 range.start_bound().map(|x| &x.user_key).cloned(),
168 range.end_bound().map(|x| &x.user_key).cloned(),
169 ));
170
171 iters.push(Box::new(reader.filter(move |item| match item {
172 Ok(item) => seqno_filter(item.key.seqno, seqno),
173 Err(_) => true,
174 })));
175 }
176 }
177 _ => {
178 if let Some(reader) = RunReader::new(
179 run.clone(),
180 (
181 range.start_bound().map(|x| &x.user_key).cloned(),
182 range.end_bound().map(|x| &x.user_key).cloned(),
183 ),
184 ) {
185 iters.push(Box::new(reader.filter(move |item| match item {
186 Ok(item) => seqno_filter(item.key.seqno, seqno),
187 Err(_) => true,
188 })));
189 }
190 }
191 }
192 }
193
194 for memtable in lock.version.sealed_memtables.iter() {
196 let iter = memtable.range(range.clone());
197
198 iters.push(Box::new(
199 iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
200 .map(Ok),
201 ));
202 }
203
204 {
206 let iter = lock.version.active_memtable.range(range.clone());
207
208 iters.push(Box::new(
209 iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
210 .map(Ok),
211 ));
212 }
213
214 if let Some((mt, seqno)) = &lock.ephemeral {
215 let iter = Box::new(
216 mt.range(range)
217 .filter(move |item| seqno_filter(item.key.seqno, *seqno))
218 .map(Ok),
219 );
220 iters.push(iter);
221 }
222
223 let merged = Merger::new(iters);
224 let iter = MvccStream::new(merged);
225
226 Box::new(iter.filter(|x| match x {
227 Ok(value) => !value.key.is_tombstone(),
228 Err(_) => true,
229 }))
230 })
231 }
232}
233
234#[cfg(test)]
235mod tests {
236 use super::*;
237 use crate::Slice;
238 use std::ops::Bound::{Excluded, Included, Unbounded};
239 use test_log::test;
240
241 fn test_prefix(prefix: &[u8], upper_bound: Bound<&[u8]>) {
242 let range = prefix_to_range(prefix);
243 assert_eq!(
244 range,
245 (
246 match prefix {
247 _ if prefix.is_empty() => Unbounded,
248 _ => Included(Slice::from(prefix)),
249 },
250 upper_bound.map(Slice::from),
251 ),
252 );
253 }
254
255 #[test]
256 fn prefix_to_range_basic() {
257 test_prefix(b"abc", Excluded(b"abd"));
258 }
259
260 #[test]
261 fn prefix_to_range_empty() {
262 test_prefix(b"", Unbounded);
263 }
264
265 #[test]
266 fn prefix_to_range_single_char() {
267 test_prefix(b"a", Excluded(b"b"));
268 }
269
270 #[test]
271 fn prefix_to_range_1() {
272 test_prefix(&[0, 250], Excluded(&[0, 251]));
273 }
274
275 #[test]
276 fn prefix_to_range_2() {
277 test_prefix(&[0, 250, 50], Excluded(&[0, 250, 51]));
278 }
279
280 #[test]
281 fn prefix_to_range_3() {
282 test_prefix(&[255, 255, 255], Unbounded);
283 }
284
285 #[test]
286 fn prefix_to_range_char_max() {
287 test_prefix(&[0, 255], Excluded(&[1]));
288 }
289
290 #[test]
291 fn prefix_to_range_char_max_2() {
292 test_prefix(&[0, 2, 255], Excluded(&[0, 3]));
293 }
294}