// lsm_tree/range.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

5use crate::{
6    key::InternalKey,
7    level_manifest::{level::Level, LevelManifest},
8    level_reader::LevelReader,
9    memtable::Memtable,
10    merge::{BoxedIterator, Merger},
11    multi_reader::MultiReader,
12    mvcc_stream::MvccStream,
13    segment::value_block::CachePolicy,
14    value::{SeqNo, UserKey},
15    InternalValue,
16};
17use guardian::ArcRwLockReadGuardian;
18use self_cell::self_cell;
19use std::{ops::Bound, sync::Arc};
20
21#[must_use]
22pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool {
23    item_seqno < seqno
24}
25
26#[must_use]
27#[allow(clippy::module_name_repetitions)]
28pub fn prefix_to_range(prefix: &[u8]) -> (Bound<UserKey>, Bound<UserKey>) {
29    use std::ops::Bound::{Excluded, Included, Unbounded};
30
31    if prefix.is_empty() {
32        return (Unbounded, Unbounded);
33    }
34
35    let mut end = prefix.to_vec();
36    let len = end.len();
37
38    for (idx, byte) in end.iter_mut().rev().enumerate() {
39        let idx = len - 1 - idx;
40
41        if *byte < 255 {
42            *byte += 1;
43            end.truncate(idx + 1);
44            return (Included(prefix.into()), Excluded(end.into()));
45        }
46    }
47
48    (Included(prefix.into()), Unbounded)
49}
50
/// State a range iterator reads from, kept alive for as long as the iterator is open.
///
/// The merged iterator built in [`TreeIter::create_range`] borrows from these
/// memtables. Because of Rust's borrowing rules, the state is therefore stored
/// together with that iterator in the self-referential [`TreeIter`] (via `self_cell`).
pub struct IterState {
    /// The active memtable; merged into every range read.
    pub(crate) active: Arc<Memtable>,

    /// Sealed memtables; each one is merged into the range read.
    pub(crate) sealed: Vec<Arc<Memtable>>,

    /// Optional ephemeral memtable. If present, it is merged into the range read
    /// without applying the seqno filter.
    pub(crate) ephemeral: Option<Arc<Memtable>>,

    // NOTE: Monkey patch to keep segments referenced until range read drops
    // Otherwise segment files can get deleted too early
    // (because once we create the range iterator, it does not hold onto segments normally)
    // TODO: we need a Version system
    #[allow(unused)]
    pub(crate) levels: Vec<Arc<Level>>,
}

/// Boxed, double-ended iterator over tree items that may borrow for `'a`.
///
/// Used as the dependent (borrowing) half of [`TreeIter`].
type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'a>;

self_cell!(
    /// Range iterator over the tree.
    ///
    /// Owns the [`IterState`] together with the boxed iterator that borrows from it,
    /// so the referenced memtables (and levels) stay alive as long as the iterator does.
    pub struct TreeIter {
        owner: IterState,

        // `BoxedMerge<'a>` is a `Box<dyn ... + 'a>`, which is covariant in `'a`.
        #[covariant]
        dependent: BoxedMerge,
    }
);

78impl Iterator for TreeIter {
79    type Item = crate::Result<InternalValue>;
80
81    fn next(&mut self) -> Option<Self::Item> {
82        self.with_dependent_mut(|_, iter| iter.next())
83    }
84}
85
86impl DoubleEndedIterator for TreeIter {
87    fn next_back(&mut self) -> Option<Self::Item> {
88        self.with_dependent_mut(|_, iter| iter.next_back())
89    }
90}
91
92fn collect_disjoint_tree_with_range(
93    level_manifest: &LevelManifest,
94    bounds: &(Bound<UserKey>, Bound<UserKey>),
95) -> MultiReader<LevelReader> {
96    debug_assert!(level_manifest.is_disjoint());
97
98    let mut levels = level_manifest
99        .levels
100        .iter()
101        .filter(|x| !x.is_empty())
102        .cloned()
103        .collect::<Vec<_>>();
104
105    // TODO: save key range per level, makes key range sorting easier
106    // and can remove levels not needed
107
108    // NOTE: We know the levels are disjoint to each other, so we can just sort
109    // them by comparing the first segment
110    //
111    // NOTE: Also, we filter out levels that are empty, so expect is fine
112    #[allow(clippy::expect_used)]
113    levels.sort_by(|a, b| {
114        a.segments
115            .first()
116            .expect("level should not be empty")
117            .metadata
118            .key_range
119            .min()
120            .cmp(
121                b.segments
122                    .first()
123                    .expect("level should not be empty")
124                    .metadata
125                    .key_range
126                    .min(),
127            )
128    });
129
130    let readers = levels
131        .into_iter()
132        .filter_map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write))
133        .collect();
134
135    MultiReader::new(readers)
136}
137
impl TreeIter {
    /// Creates an iterator over all items of the tree whose user key lies in `bounds`.
    ///
    /// The following sources are combined with [`Merger`] and then wrapped in
    /// [`MvccStream`]:
    /// - the segment/level readers selected from `level_manifest`,
    /// - every sealed memtable of `guard`,
    /// - its active memtable,
    /// - its ephemeral memtable, if present.
    ///
    /// Tombstones are removed from the final stream; errors are passed through to the caller.
    ///
    /// * `guard` — memtable (and level) state; owned by the returned iterator.
    /// * `bounds` — user-key range to read.
    /// * `seqno` — if `Some`, items from segments and from the sealed/active
    ///   memtables are kept only when [`seqno_filter`] accepts them (item seqno
    ///   strictly below `seqno`). The ephemeral memtable is never filtered.
    /// * `level_manifest` — read guard on the level manifest; it is dropped as
    ///   soon as the segment/level readers have been created.
    #[must_use]
    #[allow(clippy::too_many_lines)]
    pub fn create_range(
        guard: IterState,
        bounds: (Bound<UserKey>, Bound<UserKey>),
        seqno: Option<SeqNo>,
        level_manifest: ArcRwLockReadGuardian<LevelManifest>,
    ) -> Self {
        Self::new(guard, |lock| {
            let lo = match &bounds.0 {
                // NOTE: See memtable.rs for range explanation
                //
                // Versions of one user key are ordered by descending seqno (see the
                // example for `hi` below). So `(key, SeqNo::MAX)` sorts at or before,
                // and `(key, 0)` at or after, every stored version of `key`. An inclusive
                // start therefore covers all versions of `key`; an exclusive start skips
                // all of them.
                Bound::Included(key) => Bound::Included(InternalKey::new(
                    key.clone(),
                    SeqNo::MAX,
                    crate::value::ValueType::Tombstone,
                )),
                Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
                    key.clone(),
                    0,
                    crate::value::ValueType::Tombstone,
                )),
                Bound::Unbounded => Bound::Unbounded,
            };

            let hi = match &bounds.1 {
                // NOTE: See memtable.rs for range explanation, this is the reverse case
                // where we need to go all the way to the last seqno of an item
                //
                // Example: We search for (Unbounded..Excluded(abcdef))
                //
                // key -> seqno
                //
                // a   -> 7 <<< This is the lowest key that matches the range
                // abc -> 5
                // abc -> 4
                // abc -> 3 <<< This is the highest key that matches the range
                // abcdef -> 6
                // abcdef -> 5
                //
                Bound::Included(key) => Bound::Included(InternalKey::new(
                    key.clone(),
                    0,
                    crate::value::ValueType::Value,
                )),
                Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
                    key.clone(),
                    SeqNo::MAX,
                    crate::value::ValueType::Value,
                )),
                Bound::Unbounded => Bound::Unbounded,
            };

            // Internal-key bounds, used for the memtables below; segments and
            // levels are queried with the user-key `bounds` directly.
            let range = (lo, hi);

            let mut iters: Vec<BoxedIterator<'_>> = Vec::with_capacity(5);

            // NOTE: Optimize disjoint trees (e.g. timeseries) to only use a single MultiReader.
            if level_manifest.is_disjoint() {
                let reader = collect_disjoint_tree_with_range(&level_manifest, &bounds);

                if let Some(seqno) = seqno {
                    iters.push(Box::new(reader.filter(move |item| match item {
                        Ok(item) => seqno_filter(item.key.seqno, seqno),
                        Err(_) => true,
                    })));
                } else {
                    iters.push(Box::new(reader));
                }
            } else {
                for level in &level_manifest.levels {
                    if level.is_disjoint {
                        // Disjoint level: one reader covers the whole level.
                        if !level.is_empty() {
                            if let Some(reader) =
                                LevelReader::new(level.clone(), &bounds, CachePolicy::Write)
                            {
                                if let Some(seqno) = seqno {
                                    iters.push(Box::new(reader.filter(move |item| match item {
                                        Ok(item) => seqno_filter(item.key.seqno, seqno),
                                        Err(_) => true,
                                    })));
                                } else {
                                    iters.push(Box::new(reader));
                                }
                            }
                        }
                    } else {
                        // Overlapping level: read every segment whose key range
                        // intersects `bounds`.
                        for segment in &level.segments {
                            if segment.check_key_range_overlap(&bounds) {
                                let reader = segment.range(bounds.clone());

                                if let Some(seqno) = seqno {
                                    iters.push(Box::new(reader.filter(move |item| match item {
                                        Ok(item) => seqno_filter(item.key.seqno, seqno),
                                        Err(_) => true,
                                    })));
                                } else {
                                    iters.push(Box::new(reader));
                                }
                            }
                        }
                    }
                }
            }

            // All segment/level readers have been created, so drop the manifest read
            // guard before the memtables are read. The segments themselves stay
            // referenced through `IterState::levels` (see the note on that field).
            drop(level_manifest);

            // Sealed memtables
            for memtable in lock.sealed.iter() {
                let iter = memtable.range(range.clone());

                if let Some(seqno) = seqno {
                    iters.push(Box::new(
                        iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
                            .map(Ok),
                    ));
                } else {
                    iters.push(Box::new(iter.map(Ok)));
                }
            }

            // Active memtable
            {
                let iter = lock.active.range(range.clone());

                if let Some(seqno) = seqno {
                    iters.push(Box::new(
                        iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
                            .map(Ok),
                    ));
                } else {
                    iters.push(Box::new(iter.map(Ok)));
                }
            }

            // Ephemeral memtable: merged WITHOUT the seqno filter.
            // NOTE(review): presumably intentional (its entries should always be
            // visible to this read) — confirm.
            if let Some(index) = &lock.ephemeral {
                let iter = Box::new(index.range(range).map(Ok));
                iters.push(iter);
            }

            let merged = Merger::new(iters);
            let iter = MvccStream::new(merged);

            // Hide deleted keys by dropping tombstones; errors are still surfaced.
            Box::new(iter.filter(|x| match x {
                Ok(value) => !value.key.is_tombstone(),
                Err(_) => true,
            }))
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::Slice;
    use std::ops::Bound::{Excluded, Included, Unbounded};
    use test_log::test;

    /// Asserts that `prefix_to_range(prefix)` returns:
    /// - lower bound: `Included(prefix)`, or `Unbounded` for an empty prefix;
    /// - upper bound: `upper_bound`.
    fn test_prefix(prefix: &[u8], upper_bound: Bound<&[u8]>) {
        let expected_lower = if prefix.is_empty() {
            Unbounded
        } else {
            Included(Slice::from(prefix))
        };

        // NOTE: Manual `Bound::map` (only stable since Rust 1.77)
        let expected_upper = match upper_bound {
            Unbounded => Unbounded,
            Included(x) => Included(Slice::from(x)),
            Excluded(x) => Excluded(Slice::from(x)),
        };

        assert_eq!(prefix_to_range(prefix), (expected_lower, expected_upper));
    }

    #[test]
    fn prefix_to_range_basic() {
        test_prefix(b"abc", Excluded(b"abd"));
    }

    #[test]
    fn prefix_to_range_empty() {
        test_prefix(b"", Unbounded);
    }

    #[test]
    fn prefix_to_range_single_char() {
        test_prefix(b"a", Excluded(b"b"));
    }

    #[test]
    fn prefix_to_range_1() {
        test_prefix(&[0, 250], Excluded(&[0, 251]));
    }

    #[test]
    fn prefix_to_range_2() {
        test_prefix(&[0, 250, 50], Excluded(&[0, 250, 51]));
    }

    #[test]
    fn prefix_to_range_3() {
        test_prefix(&[255, 255, 255], Unbounded);
    }

    #[test]
    fn prefix_to_range_char_max() {
        test_prefix(&[0, 255], Excluded(&[1]));
    }

    #[test]
    fn prefix_to_range_char_max_2() {
        test_prefix(&[0, 2, 255], Excluded(&[0, 3]));
    }

    #[test]
    fn prefix_to_range_single_char_max() {
        test_prefix(&[255], Unbounded);
    }

    #[test]
    fn prefix_to_range_trailing_char_max_run() {
        test_prefix(&[1, 255, 255], Excluded(&[2]));
    }

    #[test]
    fn prefix_to_range_char_max_in_middle() {
        test_prefix(&[0, 255, 0], Excluded(&[0, 255, 1]));
    }
}