lsm_tree/
range.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    key::InternalKey,
7    memtable::Memtable,
8    merge::Merger,
9    mvcc_stream::MvccStream,
10    run_reader::RunReader,
11    value::{SeqNo, UserKey},
12    version::SuperVersion,
13    BoxedIterator, InternalValue,
14};
15use self_cell::self_cell;
16use std::{
17    ops::{Bound, RangeBounds},
18    sync::Arc,
19};
20
21#[must_use]
22pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool {
23    item_seqno < seqno
24}
25
26pub(crate) fn prefix_upper_range(prefix: &[u8]) -> Bound<UserKey> {
27    use std::ops::Bound::{Excluded, Unbounded};
28
29    if prefix.is_empty() {
30        return Unbounded;
31    }
32
33    let mut end = prefix.to_vec();
34    let len = end.len();
35
36    for (idx, byte) in end.iter_mut().rev().enumerate() {
37        let idx = len - 1 - idx;
38
39        if *byte < 255 {
40            *byte += 1;
41            end.truncate(idx + 1);
42            return Excluded(end.into());
43        }
44    }
45
46    Unbounded
47}
48
49/// Converts a prefix to range bounds.
50#[must_use]
51#[expect(clippy::module_name_repetitions)]
52pub fn prefix_to_range(prefix: &[u8]) -> (Bound<UserKey>, Bound<UserKey>) {
53    use std::ops::Bound::{Included, Unbounded};
54
55    if prefix.is_empty() {
56        return (Unbounded, Unbounded);
57    }
58
59    (Included(prefix.into()), prefix_upper_range(prefix))
60}
61
62/// The iter state references the memtables used while the range is open
63///
64/// Because of Rust rules, the state is referenced using `self_cell`, see below.
65pub struct IterState {
66    pub(crate) version: SuperVersion,
67    pub(crate) ephemeral: Option<Arc<Memtable>>,
68}
69
70type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send + 'a>;
71
72// TODO: maybe we can lifetime TreeIter and then use InternalKeyRef everywhere to bound lifetime of iterators (no need to construct InternalKey then, can just use range)
73self_cell!(
74    pub struct TreeIter {
75        owner: IterState,
76
77        #[covariant]
78        dependent: BoxedMerge,
79    }
80);
81
82impl Iterator for TreeIter {
83    type Item = crate::Result<InternalValue>;
84
85    fn next(&mut self) -> Option<Self::Item> {
86        self.with_dependent_mut(|_, iter| iter.next())
87    }
88}
89
90impl DoubleEndedIterator for TreeIter {
91    fn next_back(&mut self) -> Option<Self::Item> {
92        self.with_dependent_mut(|_, iter| iter.next_back())
93    }
94}
95
96impl TreeIter {
97    pub fn create_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
98        guard: IterState,
99        range: R,
100        seqno: SeqNo,
101    ) -> Self {
102        Self::new(guard, |lock| {
103            let lo = match range.start_bound() {
104                // NOTE: See memtable.rs for range explanation
105                Bound::Included(key) => Bound::Included(InternalKey::new(
106                    key.as_ref(),
107                    SeqNo::MAX,
108                    crate::ValueType::Tombstone,
109                )),
110                Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
111                    key.as_ref(),
112                    0,
113                    crate::ValueType::Tombstone,
114                )),
115                Bound::Unbounded => Bound::Unbounded,
116            };
117
118            let hi = match range.end_bound() {
119                // NOTE: See memtable.rs for range explanation, this is the reverse case
120                // where we need to go all the way to the last seqno of an item
121                //
122                // Example: We search for (Unbounded..Excluded(abdef))
123                //
124                // key -> seqno
125                //
126                // a   -> 7 <<< This is the lowest key that matches the range
127                // abc -> 5
128                // abc -> 4
129                // abc -> 3 <<< This is the highest key that matches the range
130                // abcdef -> 6
131                // abcdef -> 5
132                //
133                Bound::Included(key) => {
134                    Bound::Included(InternalKey::new(key.as_ref(), 0, crate::ValueType::Value))
135                }
136                Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
137                    key.as_ref(),
138                    SeqNo::MAX,
139                    crate::ValueType::Value,
140                )),
141                Bound::Unbounded => Bound::Unbounded,
142            };
143
144            let range = (lo, hi);
145
146            let mut iters: Vec<BoxedIterator<'_>> = Vec::with_capacity(5);
147
148            for run in lock
149                .version
150                .version
151                .iter_levels()
152                .flat_map(|lvl| lvl.iter())
153            {
154                match run.len() {
155                    0 => {
156                        // Do nothing
157                    }
158                    1 => {
159                        #[expect(clippy::expect_used, reason = "we checked for length")]
160                        let table = run.first().expect("should exist");
161
162                        if table.check_key_range_overlap(&(
163                            range.start_bound().map(|x| &*x.user_key),
164                            range.end_bound().map(|x| &*x.user_key),
165                        )) {
166                            let reader = table.range((
167                                range.start_bound().map(|x| &x.user_key).cloned(),
168                                range.end_bound().map(|x| &x.user_key).cloned(),
169                            ));
170
171                            iters.push(Box::new(reader.filter(move |item| match item {
172                                Ok(item) => seqno_filter(item.key.seqno, seqno),
173                                Err(_) => true,
174                            })));
175                        }
176                    }
177                    _ => {
178                        if let Some(reader) = RunReader::new(
179                            run.clone(),
180                            (
181                                range.start_bound().map(|x| &x.user_key).cloned(),
182                                range.end_bound().map(|x| &x.user_key).cloned(),
183                            ),
184                        ) {
185                            iters.push(Box::new(reader.filter(move |item| match item {
186                                Ok(item) => seqno_filter(item.key.seqno, seqno),
187                                Err(_) => true,
188                            })));
189                        }
190                    }
191                }
192            }
193
194            // Sealed memtables
195            for (_, memtable) in lock.version.sealed_memtables.iter() {
196                let iter = memtable.range(range.clone());
197
198                iters.push(Box::new(
199                    iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
200                        .map(Ok),
201                ));
202            }
203
204            // Active memtable
205            {
206                let iter = lock.version.active_memtable.range(range.clone());
207
208                iters.push(Box::new(
209                    iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
210                        .map(Ok),
211                ));
212            }
213
214            if let Some(index) = &lock.ephemeral {
215                let iter = Box::new(index.range(range).map(Ok));
216                iters.push(iter);
217            }
218
219            let merged = Merger::new(iters);
220            let iter = MvccStream::new(merged);
221
222            Box::new(iter.filter(|x| match x {
223                Ok(value) => !value.key.is_tombstone(),
224                Err(_) => true,
225            }))
226        })
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Slice;
    use std::ops::Bound::{Excluded, Included, Unbounded};
    use test_log::test;

    /// Checks that `prefix_to_range(prefix)` starts at the prefix itself
    /// (or is unbounded for an empty prefix) and ends at `upper_bound`.
    fn test_prefix(prefix: &[u8], upper_bound: Bound<&[u8]>) {
        let expected_lower = if prefix.is_empty() {
            Unbounded
        } else {
            Included(Slice::from(prefix))
        };
        let expected = (expected_lower, upper_bound.map(Slice::from));

        assert_eq!(prefix_to_range(prefix), expected);
    }

    // Last byte is simply incremented
    #[test]
    fn prefix_to_range_basic() {
        test_prefix(b"abc", Excluded(b"abd"));
    }

    // Empty prefix covers the whole key space
    #[test]
    fn prefix_to_range_empty() {
        test_prefix(b"", Unbounded);
    }

    #[test]
    fn prefix_to_range_single_char() {
        test_prefix(b"a", Excluded(b"b"));
    }

    #[test]
    fn prefix_to_range_1() {
        test_prefix(&[0, 250], Excluded(&[0, 251]));
    }

    #[test]
    fn prefix_to_range_2() {
        test_prefix(&[0, 250, 50], Excluded(&[0, 250, 51]));
    }

    // No byte can be incremented, so there is no upper bound
    #[test]
    fn prefix_to_range_3() {
        test_prefix(&[255, 255, 255], Unbounded);
    }

    // Trailing 0xFF bytes are cut off and the byte before them is incremented
    #[test]
    fn prefix_to_range_char_max() {
        test_prefix(&[0, 255], Excluded(&[1]));
    }

    #[test]
    fn prefix_to_range_char_max_2() {
        test_prefix(&[0, 2, 255], Excluded(&[0, 3]));
    }
}