lsm_tree/
range.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    key::InternalKey,
7    memtable::Memtable,
8    merge::Merger,
9    mvcc_stream::MvccStream,
10    run_reader::RunReader,
11    value::{SeqNo, UserKey},
12    version::SuperVersion,
13    BoxedIterator, InternalValue,
14};
15use self_cell::self_cell;
16use std::{
17    ops::{Bound, RangeBounds},
18    sync::Arc,
19};
20
21#[must_use]
22pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool {
23    item_seqno < seqno
24}
25
26pub(crate) fn prefix_upper_range(prefix: &[u8]) -> Bound<UserKey> {
27    use std::ops::Bound::{Excluded, Unbounded};
28
29    if prefix.is_empty() {
30        return Unbounded;
31    }
32
33    let mut end = prefix.to_vec();
34    let len = end.len();
35
36    for (idx, byte) in end.iter_mut().rev().enumerate() {
37        let idx = len - 1 - idx;
38
39        if *byte < 255 {
40            *byte += 1;
41            end.truncate(idx + 1);
42            return Excluded(end.into());
43        }
44    }
45
46    Unbounded
47}
48
49/// Converts a prefix to range bounds.
50#[must_use]
51#[expect(clippy::module_name_repetitions)]
52pub fn prefix_to_range(prefix: &[u8]) -> (Bound<UserKey>, Bound<UserKey>) {
53    use std::ops::Bound::{Included, Unbounded};
54
55    if prefix.is_empty() {
56        return (Unbounded, Unbounded);
57    }
58
59    (Included(prefix.into()), prefix_upper_range(prefix))
60}
61
/// State that must stay alive while a range iterator is open.
///
/// The merged iterator built in [`TreeIter::create_range`] borrows from this
/// state: the version's disk levels, sealed memtables and active memtable, plus
/// the optional ephemeral memtable. Safe Rust does not allow a value to borrow
/// from a sibling value in the same struct. The state (owner) and the iterator
/// (dependent) are therefore bundled with `self_cell` in [`TreeIter`].
pub struct IterState {
    /// Tree version (disk levels, sealed memtables and active memtable) that
    /// the range reads from.
    pub(crate) version: SuperVersion,

    /// Optional additional memtable merged into the range. It comes with the
    /// seqno used to filter its items; that seqno replaces the range's own
    /// seqno for this memtable.
    pub(crate) ephemeral: Option<(Arc<Memtable>, SeqNo)>,
}
69
/// Type-erased, double-ended, `Send` stream of internal values (or read errors)
/// borrowing data for `'a`; used as the dependent half of [`TreeIter`].
type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send + 'a>;
71
self_cell!(
    /// Range iterator over the tree.
    ///
    /// Owns the [`IterState`] together with the merged iterator that borrows
    /// from it, so the borrowed tables and memtables stay alive for as long as
    /// the iterator does.
    pub struct TreeIter {
        owner: IterState,

        #[covariant]
        dependent: BoxedMerge,
    }
);
80
81impl Iterator for TreeIter {
82    type Item = crate::Result<InternalValue>;
83
84    fn next(&mut self) -> Option<Self::Item> {
85        self.with_dependent_mut(|_, iter| iter.next())
86    }
87}
88
89impl DoubleEndedIterator for TreeIter {
90    fn next_back(&mut self) -> Option<Self::Item> {
91        self.with_dependent_mut(|_, iter| iter.next_back())
92    }
93}
94
95impl TreeIter {
96    pub fn create_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
97        guard: IterState,
98        range: R,
99        seqno: SeqNo,
100    ) -> Self {
101        Self::new(guard, |lock| {
102            let lo = match range.start_bound() {
103                // NOTE: See memtable.rs for range explanation
104                Bound::Included(key) => Bound::Included(InternalKey::new(
105                    key.as_ref(),
106                    SeqNo::MAX,
107                    crate::ValueType::Tombstone,
108                )),
109                Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
110                    key.as_ref(),
111                    0,
112                    crate::ValueType::Tombstone,
113                )),
114                Bound::Unbounded => Bound::Unbounded,
115            };
116
117            let hi = match range.end_bound() {
118                // NOTE: See memtable.rs for range explanation, this is the reverse case
119                // where we need to go all the way to the last seqno of an item
120                //
121                // Example: We search for (Unbounded..Excluded(abdef))
122                //
123                // key -> seqno
124                //
125                // a   -> 7 <<< This is the lowest key that matches the range
126                // abc -> 5
127                // abc -> 4
128                // abc -> 3 <<< This is the highest key that matches the range
129                // abcdef -> 6
130                // abcdef -> 5
131                //
132                Bound::Included(key) => {
133                    Bound::Included(InternalKey::new(key.as_ref(), 0, crate::ValueType::Value))
134                }
135                Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
136                    key.as_ref(),
137                    SeqNo::MAX,
138                    crate::ValueType::Value,
139                )),
140                Bound::Unbounded => Bound::Unbounded,
141            };
142
143            let range = (lo, hi);
144
145            let mut iters: Vec<BoxedIterator<'_>> = Vec::with_capacity(5);
146
147            for run in lock
148                .version
149                .version
150                .iter_levels()
151                .flat_map(|lvl| lvl.iter())
152            {
153                match run.len() {
154                    0 => {
155                        // Do nothing
156                    }
157                    1 => {
158                        #[expect(clippy::expect_used, reason = "we checked for length")]
159                        let table = run.first().expect("should exist");
160
161                        if table.check_key_range_overlap(&(
162                            range.start_bound().map(|x| &*x.user_key),
163                            range.end_bound().map(|x| &*x.user_key),
164                        )) {
165                            let reader = table
166                                .range((
167                                    range.start_bound().map(|x| &x.user_key).cloned(),
168                                    range.end_bound().map(|x| &x.user_key).cloned(),
169                                ))
170                                .filter(move |item| match item {
171                                    Ok(item) => seqno_filter(item.key.seqno, seqno),
172                                    Err(_) => true,
173                                });
174
175                            iters.push(Box::new(reader));
176                        }
177                    }
178                    _ => {
179                        if let Some(reader) = RunReader::new(
180                            run.clone(),
181                            (
182                                range.start_bound().map(|x| &x.user_key).cloned(),
183                                range.end_bound().map(|x| &x.user_key).cloned(),
184                            ),
185                        ) {
186                            iters.push(Box::new(reader.filter(move |item| match item {
187                                Ok(item) => seqno_filter(item.key.seqno, seqno),
188                                Err(_) => true,
189                            })));
190                        }
191                    }
192                }
193            }
194
195            // Sealed memtables
196            for memtable in lock.version.sealed_memtables.iter() {
197                let iter = memtable.range(range.clone());
198
199                iters.push(Box::new(
200                    iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
201                        .map(Ok),
202                ));
203            }
204
205            // Active memtable
206            {
207                let iter = lock.version.active_memtable.range(range.clone());
208
209                iters.push(Box::new(
210                    iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
211                        .map(Ok),
212                ));
213            }
214
215            if let Some((mt, seqno)) = &lock.ephemeral {
216                let iter = Box::new(
217                    mt.range(range)
218                        .filter(move |item| seqno_filter(item.key.seqno, *seqno))
219                        .map(Ok),
220                );
221                iters.push(iter);
222            }
223
224            let merged = Merger::new(iters);
225            let iter = MvccStream::new(merged);
226
227            Box::new(iter.filter(|x| match x {
228                Ok(value) => !value.key.is_tombstone(),
229                Err(_) => true,
230            }))
231        })
232    }
233}
234
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Slice;
    use std::ops::Bound::{Excluded, Included, Unbounded};
    use test_log::test;

    /// Asserts the bounds that `prefix_to_range(prefix)` should return.
    ///
    /// The expected lower bound is `Included(prefix)`, or `Unbounded` for an
    /// empty prefix. The expected upper bound is `upper_bound`.
    fn test_prefix(prefix: &[u8], upper_bound: Bound<&[u8]>) {
        let expected_lower = if prefix.is_empty() {
            Unbounded
        } else {
            Included(Slice::from(prefix))
        };

        assert_eq!(
            prefix_to_range(prefix),
            (expected_lower, upper_bound.map(Slice::from)),
        );
    }

    #[test]
    fn prefix_to_range_basic() {
        test_prefix(b"abc", Excluded(b"abd"));
    }

    #[test]
    fn prefix_to_range_empty() {
        test_prefix(b"", Unbounded);
    }

    #[test]
    fn prefix_to_range_single_char() {
        test_prefix(b"a", Excluded(b"b"));
    }

    #[test]
    fn prefix_to_range_1() {
        test_prefix(&[0, 250], Excluded(&[0, 251]));
    }

    #[test]
    fn prefix_to_range_2() {
        test_prefix(&[0, 250, 50], Excluded(&[0, 250, 51]));
    }

    #[test]
    fn prefix_to_range_3() {
        test_prefix(&[255, 255, 255], Unbounded);
    }

    #[test]
    fn prefix_to_range_char_max() {
        test_prefix(&[0, 255], Excluded(&[1]));
    }

    #[test]
    fn prefix_to_range_char_max_2() {
        test_prefix(&[0, 2, 255], Excluded(&[0, 3]));
    }

    #[test]
    fn prefix_to_range_single_char_max() {
        test_prefix(&[255], Unbounded);
    }

    #[test]
    fn prefix_to_range_multiple_trailing_char_max() {
        test_prefix(&[0, 255, 255], Excluded(&[1]));
    }

    #[test]
    fn seqno_filter_is_strict() {
        assert!(seqno_filter(4, 5));
        assert!(!seqno_filter(5, 5));
        assert!(!seqno_filter(6, 5));
    }
}