1use crate::{
6 key::InternalKey,
7 memtable::Memtable,
8 merge::Merger,
9 mvcc_stream::MvccStream,
10 run_reader::RunReader,
11 value::{SeqNo, UserKey},
12 version::SuperVersion,
13 BoxedIterator, InternalValue,
14};
15use self_cell::self_cell;
16use std::{
17 ops::{Bound, RangeBounds},
18 sync::Arc,
19};
20
21#[must_use]
22pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool {
23 item_seqno < seqno
24}
25
26pub(crate) fn prefix_upper_range(prefix: &[u8]) -> Bound<UserKey> {
27 use std::ops::Bound::{Excluded, Unbounded};
28
29 if prefix.is_empty() {
30 return Unbounded;
31 }
32
33 let mut end = prefix.to_vec();
34 let len = end.len();
35
36 for (idx, byte) in end.iter_mut().rev().enumerate() {
37 let idx = len - 1 - idx;
38
39 if *byte < 255 {
40 *byte += 1;
41 end.truncate(idx + 1);
42 return Excluded(end.into());
43 }
44 }
45
46 Unbounded
47}
48
49#[must_use]
51#[expect(clippy::module_name_repetitions)]
52pub fn prefix_to_range(prefix: &[u8]) -> (Bound<UserKey>, Bound<UserKey>) {
53 use std::ops::Bound::{Included, Unbounded};
54
55 if prefix.is_empty() {
56 return (Unbounded, Unbounded);
57 }
58
59 (Included(prefix.into()), prefix_upper_range(prefix))
60}
61
62pub struct IterState {
66 pub(crate) version: SuperVersion,
67 pub(crate) ephemeral: Option<(Arc<Memtable>, SeqNo)>,
68}
69
70type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send + 'a>;
71
72self_cell!(
73 pub struct TreeIter {
74 owner: IterState,
75
76 #[covariant]
77 dependent: BoxedMerge,
78 }
79);
80
81impl Iterator for TreeIter {
82 type Item = crate::Result<InternalValue>;
83
84 fn next(&mut self) -> Option<Self::Item> {
85 self.with_dependent_mut(|_, iter| iter.next())
86 }
87}
88
89impl DoubleEndedIterator for TreeIter {
90 fn next_back(&mut self) -> Option<Self::Item> {
91 self.with_dependent_mut(|_, iter| iter.next_back())
92 }
93}
94
95impl TreeIter {
96 pub fn create_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
97 guard: IterState,
98 range: R,
99 seqno: SeqNo,
100 ) -> Self {
101 Self::new(guard, |lock| {
102 let lo = match range.start_bound() {
103 Bound::Included(key) => Bound::Included(InternalKey::new(
105 key.as_ref(),
106 SeqNo::MAX,
107 crate::ValueType::Tombstone,
108 )),
109 Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
110 key.as_ref(),
111 0,
112 crate::ValueType::Tombstone,
113 )),
114 Bound::Unbounded => Bound::Unbounded,
115 };
116
117 let hi = match range.end_bound() {
118 Bound::Included(key) => {
133 Bound::Included(InternalKey::new(key.as_ref(), 0, crate::ValueType::Value))
134 }
135 Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
136 key.as_ref(),
137 SeqNo::MAX,
138 crate::ValueType::Value,
139 )),
140 Bound::Unbounded => Bound::Unbounded,
141 };
142
143 let range = (lo, hi);
144
145 let mut iters: Vec<BoxedIterator<'_>> = Vec::with_capacity(5);
146
147 for run in lock
148 .version
149 .version
150 .iter_levels()
151 .flat_map(|lvl| lvl.iter())
152 {
153 match run.len() {
154 0 => {
155 }
157 1 => {
158 #[expect(clippy::expect_used, reason = "we checked for length")]
159 let table = run.first().expect("should exist");
160
161 if table.check_key_range_overlap(&(
162 range.start_bound().map(|x| &*x.user_key),
163 range.end_bound().map(|x| &*x.user_key),
164 )) {
165 let reader = table
166 .range((
167 range.start_bound().map(|x| &x.user_key).cloned(),
168 range.end_bound().map(|x| &x.user_key).cloned(),
169 ))
170 .filter(move |item| match item {
171 Ok(item) => seqno_filter(item.key.seqno, seqno),
172 Err(_) => true,
173 });
174
175 iters.push(Box::new(reader));
176 }
177 }
178 _ => {
179 if let Some(reader) = RunReader::new(
180 run.clone(),
181 (
182 range.start_bound().map(|x| &x.user_key).cloned(),
183 range.end_bound().map(|x| &x.user_key).cloned(),
184 ),
185 ) {
186 iters.push(Box::new(reader.filter(move |item| match item {
187 Ok(item) => seqno_filter(item.key.seqno, seqno),
188 Err(_) => true,
189 })));
190 }
191 }
192 }
193 }
194
195 for memtable in lock.version.sealed_memtables.iter() {
197 let iter = memtable.range(range.clone());
198
199 iters.push(Box::new(
200 iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
201 .map(Ok),
202 ));
203 }
204
205 {
207 let iter = lock.version.active_memtable.range(range.clone());
208
209 iters.push(Box::new(
210 iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
211 .map(Ok),
212 ));
213 }
214
215 if let Some((mt, seqno)) = &lock.ephemeral {
216 let iter = Box::new(
217 mt.range(range)
218 .filter(move |item| seqno_filter(item.key.seqno, *seqno))
219 .map(Ok),
220 );
221 iters.push(iter);
222 }
223
224 let merged = Merger::new(iters);
225 let iter = MvccStream::new(merged);
226
227 Box::new(iter.filter(|x| match x {
228 Ok(value) => !value.key.is_tombstone(),
229 Err(_) => true,
230 }))
231 })
232 }
233}
234
235#[cfg(test)]
236mod tests {
237 use super::*;
238 use crate::Slice;
239 use std::ops::Bound::{Excluded, Included, Unbounded};
240 use test_log::test;
241
242 fn test_prefix(prefix: &[u8], upper_bound: Bound<&[u8]>) {
243 let range = prefix_to_range(prefix);
244 assert_eq!(
245 range,
246 (
247 match prefix {
248 _ if prefix.is_empty() => Unbounded,
249 _ => Included(Slice::from(prefix)),
250 },
251 upper_bound.map(Slice::from),
252 ),
253 );
254 }
255
256 #[test]
257 fn prefix_to_range_basic() {
258 test_prefix(b"abc", Excluded(b"abd"));
259 }
260
261 #[test]
262 fn prefix_to_range_empty() {
263 test_prefix(b"", Unbounded);
264 }
265
266 #[test]
267 fn prefix_to_range_single_char() {
268 test_prefix(b"a", Excluded(b"b"));
269 }
270
271 #[test]
272 fn prefix_to_range_1() {
273 test_prefix(&[0, 250], Excluded(&[0, 251]));
274 }
275
276 #[test]
277 fn prefix_to_range_2() {
278 test_prefix(&[0, 250, 50], Excluded(&[0, 250, 51]));
279 }
280
281 #[test]
282 fn prefix_to_range_3() {
283 test_prefix(&[255, 255, 255], Unbounded);
284 }
285
286 #[test]
287 fn prefix_to_range_char_max() {
288 test_prefix(&[0, 255], Excluded(&[1]));
289 }
290
291 #[test]
292 fn prefix_to_range_char_max_2() {
293 test_prefix(&[0, 2, 255], Excluded(&[0, 3]));
294 }
295}