1use crate::{
6 key::InternalKey,
7 memtable::Memtable,
8 merge::Merger,
9 mvcc_stream::MvccStream,
10 run_reader::RunReader,
11 value::{SeqNo, UserKey},
12 version::Version,
13 BoxedIterator, InternalValue,
14};
15use self_cell::self_cell;
16use std::{
17 ops::{Bound, RangeBounds},
18 sync::Arc,
19};
20
21#[must_use]
22pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool {
23 item_seqno < seqno
24}
25
26pub(crate) fn prefix_upper_range(prefix: &[u8]) -> Bound<UserKey> {
27 use std::ops::Bound::{Excluded, Unbounded};
28
29 if prefix.is_empty() {
30 return Unbounded;
31 }
32
33 let mut end = prefix.to_vec();
34 let len = end.len();
35
36 for (idx, byte) in end.iter_mut().rev().enumerate() {
37 let idx = len - 1 - idx;
38
39 if *byte < 255 {
40 *byte += 1;
41 end.truncate(idx + 1);
42 return Excluded(end.into());
43 }
44 }
45
46 Unbounded
47}
48
49#[must_use]
51#[expect(clippy::module_name_repetitions)]
52pub fn prefix_to_range(prefix: &[u8]) -> (Bound<UserKey>, Bound<UserKey>) {
53 use std::ops::Bound::{Included, Unbounded};
54
55 if prefix.is_empty() {
56 return (Unbounded, Unbounded);
57 }
58
59 (Included(prefix.into()), prefix_upper_range(prefix))
60}
61
62pub struct IterState {
66 pub(crate) active: Arc<Memtable>,
67 pub(crate) sealed: Vec<Arc<Memtable>>,
68 pub(crate) ephemeral: Option<Arc<Memtable>>,
69
70 #[expect(unused, reason = "version is held so tables cannot be unlinked")]
71 pub(crate) version: Version,
72}
73
74type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'a>;
75
76self_cell!(
78 pub struct TreeIter {
79 owner: IterState,
80
81 #[covariant]
82 dependent: BoxedMerge,
83 }
84);
85
86impl Iterator for TreeIter {
87 type Item = crate::Result<InternalValue>;
88
89 fn next(&mut self) -> Option<Self::Item> {
90 self.with_dependent_mut(|_, iter| iter.next())
91 }
92}
93
94impl DoubleEndedIterator for TreeIter {
95 fn next_back(&mut self) -> Option<Self::Item> {
96 self.with_dependent_mut(|_, iter| iter.next_back())
97 }
98}
99
100impl TreeIter {
101 pub fn create_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
102 guard: IterState,
103 range: R,
104 seqno: SeqNo,
105 version: &Version,
106 ) -> Self {
107 Self::new(guard, |lock| {
108 let lo = match range.start_bound() {
109 Bound::Included(key) => Bound::Included(InternalKey::new(
111 key.as_ref(),
112 SeqNo::MAX,
113 crate::ValueType::Tombstone,
114 )),
115 Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
116 key.as_ref(),
117 0,
118 crate::ValueType::Tombstone,
119 )),
120 Bound::Unbounded => Bound::Unbounded,
121 };
122
123 let hi = match range.end_bound() {
124 Bound::Included(key) => {
139 Bound::Included(InternalKey::new(key.as_ref(), 0, crate::ValueType::Value))
140 }
141 Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
142 key.as_ref(),
143 SeqNo::MAX,
144 crate::ValueType::Value,
145 )),
146 Bound::Unbounded => Bound::Unbounded,
147 };
148
149 let range = (lo, hi);
150
151 let mut iters: Vec<BoxedIterator<'_>> = Vec::with_capacity(5);
152
153 for run in version.iter_levels().flat_map(|lvl| lvl.iter()) {
154 match run.len() {
155 0 => {
156 }
158 1 => {
159 #[expect(clippy::expect_used, reason = "we checked for length")]
160 let table = run.first().expect("should exist");
161
162 if table.check_key_range_overlap(&(
163 range.start_bound().map(|x| &*x.user_key),
164 range.end_bound().map(|x| &*x.user_key),
165 )) {
166 let reader = table.range((
167 range.start_bound().map(|x| &x.user_key).cloned(),
168 range.end_bound().map(|x| &x.user_key).cloned(),
169 ));
170
171 iters.push(Box::new(reader.filter(move |item| match item {
172 Ok(item) => seqno_filter(item.key.seqno, seqno),
173 Err(_) => true,
174 })));
175 }
176 }
177 _ => {
178 if let Some(reader) = RunReader::new(
179 run.clone(),
180 (
181 range.start_bound().map(|x| &x.user_key).cloned(),
182 range.end_bound().map(|x| &x.user_key).cloned(),
183 ),
184 ) {
185 iters.push(Box::new(reader.filter(move |item| match item {
186 Ok(item) => seqno_filter(item.key.seqno, seqno),
187 Err(_) => true,
188 })));
189 }
190 }
191 }
192 }
193
194 for memtable in &lock.sealed {
196 let iter = memtable.range(range.clone());
197
198 iters.push(Box::new(
199 iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
200 .map(Ok),
201 ));
202 }
203
204 {
206 let iter = lock.active.range(range.clone());
207
208 iters.push(Box::new(
209 iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
210 .map(Ok),
211 ));
212 }
213
214 if let Some(index) = &lock.ephemeral {
215 let iter = Box::new(index.range(range).map(Ok));
216 iters.push(iter);
217 }
218
219 let merged = Merger::new(iters);
220 let iter = MvccStream::new(merged);
221
222 Box::new(iter.filter(|x| match x {
223 Ok(value) => !value.key.is_tombstone(),
224 Err(_) => true,
225 }))
226 })
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Slice;
    use std::ops::Bound::{Excluded, Included, Unbounded};
    use test_log::test;

    /// Asserts that `prefix_to_range(prefix)` produces the expected bounds:
    /// an inclusive lower bound equal to `prefix` (or `Unbounded` for an
    /// empty prefix) and the given `upper_bound`.
    fn test_prefix(prefix: &[u8], upper_bound: Bound<&[u8]>) {
        let expected_lower = if prefix.is_empty() {
            Unbounded
        } else {
            Included(Slice::from(prefix))
        };
        let expected = (expected_lower, upper_bound.map(Slice::from));

        assert_eq!(prefix_to_range(prefix), expected);
    }

    #[test]
    fn prefix_to_range_basic() {
        test_prefix(b"abc", Excluded(b"abd"));
    }

    #[test]
    fn prefix_to_range_empty() {
        test_prefix(b"", Unbounded);
    }

    #[test]
    fn prefix_to_range_single_char() {
        test_prefix(b"a", Excluded(b"b"));
    }

    #[test]
    fn prefix_to_range_1() {
        test_prefix(&[0, 250], Excluded(&[0, 251]));
    }

    #[test]
    fn prefix_to_range_2() {
        test_prefix(&[0, 250, 50], Excluded(&[0, 250, 51]));
    }

    // A prefix made up only of 0xFF bytes has no finite upper bound.
    #[test]
    fn prefix_to_range_3() {
        test_prefix(&[255, 255, 255], Unbounded);
    }

    // A trailing 0xFF byte is dropped and the byte before it is incremented.
    #[test]
    fn prefix_to_range_char_max() {
        test_prefix(&[0, 255], Excluded(&[1]));
    }

    #[test]
    fn prefix_to_range_char_max_2() {
        test_prefix(&[0, 2, 255], Excluded(&[0, 3]));
    }
}