lsm_tree/
range.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    key::InternalKey,
7    memtable::Memtable,
8    merge::Merger,
9    mvcc_stream::MvccStream,
10    run_reader::RunReader,
11    value::{SeqNo, UserKey},
12    version::Version,
13    BoxedIterator, InternalValue,
14};
15use self_cell::self_cell;
16use std::{
17    ops::{Bound, RangeBounds},
18    sync::Arc,
19};
20
21#[must_use]
22pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool {
23    item_seqno < seqno
24}
25
26pub(crate) fn prefix_upper_range(prefix: &[u8]) -> Bound<UserKey> {
27    use std::ops::Bound::{Excluded, Unbounded};
28
29    if prefix.is_empty() {
30        return Unbounded;
31    }
32
33    let mut end = prefix.to_vec();
34    let len = end.len();
35
36    for (idx, byte) in end.iter_mut().rev().enumerate() {
37        let idx = len - 1 - idx;
38
39        if *byte < 255 {
40            *byte += 1;
41            end.truncate(idx + 1);
42            return Excluded(end.into());
43        }
44    }
45
46    Unbounded
47}
48
49/// Converts a prefix to range bounds.
50#[must_use]
51#[allow(clippy::module_name_repetitions)]
52pub fn prefix_to_range(prefix: &[u8]) -> (Bound<UserKey>, Bound<UserKey>) {
53    use std::ops::Bound::{Included, Unbounded};
54
55    if prefix.is_empty() {
56        return (Unbounded, Unbounded);
57    }
58
59    (Included(prefix.into()), prefix_upper_range(prefix))
60}
61
/// The iter state owns the memtables (and the version) used while the range is open.
///
/// The merged iterator borrows from this state, so state and iterator are bundled
/// into a single self-referential value using `self_cell` (see `TreeIter`).
pub struct IterState {
    /// The active memtable; its items are seqno-filtered when ranged over.
    pub(crate) active: Arc<Memtable>,
    /// Sealed memtables; their items are seqno-filtered when ranged over.
    pub(crate) sealed: Vec<Arc<Memtable>>,
    /// Optional ephemeral memtable; its items are *not* seqno-filtered.
    pub(crate) ephemeral: Option<Arc<Memtable>>,

    // NOTE: Hold the version so tables cannot be unlinked while the iterator is alive.
    // The field itself is never read here (`TreeIter::create_range` takes the tables
    // from its `version` argument), hence `#[allow(unused)]`.
    #[allow(unused)]
    pub(crate) version: Version,
}
74
/// Type-erased, double-ended iterator over internal values.
///
/// `'a` is the lifetime of the borrowed `IterState`, i.e. the `self_cell` owner of `TreeIter`.
type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'a>;
76
// TODO: maybe we can add a lifetime parameter to TreeIter and then use InternalKeyRef
// everywhere to bound the lifetime of iterators (then there is no need to construct
// owned InternalKeys for the bounds, we could just use the range)
//
// `TreeIter` bundles the `IterState` (owner) with the merged iterator that borrows
// from it (dependent). The dependent is declared `#[covariant]`; `BoxedMerge<'a>` is
// a `Box<dyn ... + 'a>`, which is covariant in `'a`.
self_cell!(
    pub struct TreeIter {
        owner: IterState,

        #[covariant]
        dependent: BoxedMerge,
    }
);
86
87impl Iterator for TreeIter {
88    type Item = crate::Result<InternalValue>;
89
90    fn next(&mut self) -> Option<Self::Item> {
91        self.with_dependent_mut(|_, iter| iter.next())
92    }
93}
94
95impl DoubleEndedIterator for TreeIter {
96    fn next_back(&mut self) -> Option<Self::Item> {
97        self.with_dependent_mut(|_, iter| iter.next_back())
98    }
99}
100
101/* fn collect_disjoint_tree_with_range<'a>(
102    level_manifest: &LevelManifest,
103    bounds: &(Bound<UserKey>, Bound<UserKey>),
104) -> MultiReader<RunReader<'a>> {
105    todo!()
106
107    /* debug_assert!(level_manifest.is_disjoint());
108
109    let mut levels = level_manifest
110        .levels
111        .iter()
112        .filter(|x| !x.is_empty())
113        .cloned()
114        .collect::<Vec<_>>();
115
116    // TODO: save key range per level, makes key range sorting easier
117    // and can remove levels not needed
118
119    // NOTE: We know the levels are disjoint to each other, so we can just sort
120    // them by comparing the first segment
121    //
122    // NOTE: Also, we filter out levels that are empty, so expect is fine
123    #[allow(clippy::expect_used)]
124    levels.sort_by(|a, b| {
125        a.segments
126            .first()
127            .expect("level should not be empty")
128            .metadata
129            .key_range
130            .min()
131            .cmp(
132                b.segments
133                    .first()
134                    .expect("level should not be empty")
135                    .metadata
136                    .key_range
137                    .min(),
138            )
139    });
140
141    let readers = levels
142        .into_iter()
143        .filter_map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write))
144        .collect();
145
146    MultiReader::new(readers) */
147} */
148
149impl TreeIter {
150    #[must_use]
151    #[allow(clippy::too_many_lines)]
152    pub fn create_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
153        guard: IterState,
154        range: R,
155        seqno: SeqNo,
156        version: &Version,
157    ) -> Self {
158        Self::new(guard, |lock| {
159            let lo = match range.start_bound() {
160                // NOTE: See memtable.rs for range explanation
161                Bound::Included(key) => Bound::Included(InternalKey::new(
162                    key.as_ref(),
163                    SeqNo::MAX,
164                    crate::ValueType::Tombstone,
165                )),
166                Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
167                    key.as_ref(),
168                    0,
169                    crate::ValueType::Tombstone,
170                )),
171                Bound::Unbounded => Bound::Unbounded,
172            };
173
174            let hi = match range.end_bound() {
175                // NOTE: See memtable.rs for range explanation, this is the reverse case
176                // where we need to go all the way to the last seqno of an item
177                //
178                // Example: We search for (Unbounded..Excluded(abdef))
179                //
180                // key -> seqno
181                //
182                // a   -> 7 <<< This is the lowest key that matches the range
183                // abc -> 5
184                // abc -> 4
185                // abc -> 3 <<< This is the highest key that matches the range
186                // abcdef -> 6
187                // abcdef -> 5
188                //
189                Bound::Included(key) => {
190                    Bound::Included(InternalKey::new(key.as_ref(), 0, crate::ValueType::Value))
191                }
192                Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
193                    key.as_ref(),
194                    SeqNo::MAX,
195                    crate::ValueType::Value,
196                )),
197                Bound::Unbounded => Bound::Unbounded,
198            };
199
200            let range = (lo, hi);
201
202            let mut iters: Vec<BoxedIterator<'_>> = Vec::with_capacity(5);
203
204            /* // TODO: Optimize disjoint trees (e.g. timeseries) to only use a single MultiReader.
205            if level_manifest.is_disjoint() {
206                let reader = collect_disjoint_tree_with_range(&level_manifest, &bounds);
207
208                if let Some(seqno) = seqno {
209                    iters.push(Box::new(reader.filter(move |item| match item {
210                        Ok(item) => seqno_filter(item.key.seqno, seqno),
211                        Err(_) => true,
212                    })));
213                } else {
214                    iters.push(Box::new(reader));
215                }
216            } else { */
217
218            // };
219
220            #[allow(clippy::needless_continue)]
221            for run in version.iter_levels().flat_map(|lvl| lvl.iter()) {
222                match run.len() {
223                    0 => continue,
224                    1 => {
225                        // NOTE: We checked for length
226                        #[allow(clippy::expect_used)]
227                        let table = run.first().expect("should exist");
228
229                        if table.check_key_range_overlap(&(
230                            range.start_bound().map(|x| &*x.user_key),
231                            range.end_bound().map(|x| &*x.user_key),
232                        )) {
233                            let reader = table.range((
234                                range.start_bound().map(|x| &x.user_key).cloned(),
235                                range.end_bound().map(|x| &x.user_key).cloned(),
236                            ));
237
238                            iters.push(Box::new(reader.filter(move |item| match item {
239                                Ok(item) => seqno_filter(item.key.seqno, seqno),
240                                Err(_) => true,
241                            })));
242                        }
243                    }
244                    _ => {
245                        if let Some(reader) = RunReader::new(
246                            run.clone(),
247                            (
248                                range.start_bound().map(|x| &x.user_key).cloned(),
249                                range.end_bound().map(|x| &x.user_key).cloned(),
250                            ),
251                        ) {
252                            iters.push(Box::new(reader.filter(move |item| match item {
253                                Ok(item) => seqno_filter(item.key.seqno, seqno),
254                                Err(_) => true,
255                            })));
256                        }
257                    }
258                }
259
260                /* if level.is_disjoint {
261                    if !level.is_empty() {
262                        if let Some(reader) =
263                            LevelReader::new(level.clone(), &bounds, CachePolicy::Write)
264                        {
265                            if let Some(seqno) = seqno {
266                                iters.push(Box::new(reader.filter(move |item| match item {
267                                    Ok(item) => seqno_filter(item.key.seqno, seqno),
268                                    Err(_) => true,
269                                })));
270                            } else {
271                                iters.push(Box::new(reader));
272                            }
273                        }
274                    }
275                } else {
276                    for segment in &level.segments {
277                        if segment.check_key_range_overlap(&bounds) {
278                            let reader = segment.range(bounds.clone());
279
280                            if let Some(seqno) = seqno {
281                                iters.push(Box::new(reader.filter(move |item| match item {
282                                    Ok(item) => seqno_filter(item.key.seqno, seqno),
283                                    Err(_) => true,
284                                })));
285                            } else {
286                                iters.push(Box::new(reader));
287                            }
288                        }
289                    }
290                } */
291            }
292
293            // Sealed memtables
294            for memtable in &lock.sealed {
295                let iter = memtable.range(range.clone());
296
297                iters.push(Box::new(
298                    iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
299                        .map(Ok),
300                ));
301            }
302
303            // Active memtable
304            {
305                let iter = lock.active.range(range.clone());
306
307                iters.push(Box::new(
308                    iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
309                        .map(Ok),
310                ));
311            }
312
313            if let Some(index) = &lock.ephemeral {
314                let iter = Box::new(index.range(range).map(Ok));
315                iters.push(iter);
316            }
317
318            let merged = Merger::new(iters);
319            let iter = MvccStream::new(merged);
320
321            Box::new(iter.filter(|x| match x {
322                Ok(value) => !value.key.is_tombstone(),
323                Err(_) => true,
324            }))
325        })
326    }
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Slice;
    use std::ops::Bound::{Excluded, Included, Unbounded};
    use test_log::test;

    /// Asserts that `prefix_to_range(prefix)` starts at the prefix itself
    /// (unbounded for an empty prefix) and ends at `upper_bound`.
    fn test_prefix(prefix: &[u8], upper_bound: Bound<&[u8]>) {
        let lower_bound = if prefix.is_empty() {
            Unbounded
        } else {
            Included(Slice::from(prefix))
        };

        let actual = prefix_to_range(prefix);
        let expected = (lower_bound, upper_bound.map(Slice::from));
        assert_eq!(actual, expected);
    }

    #[test]
    fn prefix_to_range_basic() {
        test_prefix(b"abc", Excluded(b"abd"));
    }

    #[test]
    fn prefix_to_range_empty() {
        test_prefix(b"", Unbounded);
    }

    #[test]
    fn prefix_to_range_single_char() {
        test_prefix(b"a", Excluded(b"b"));
    }

    #[test]
    fn prefix_to_range_1() {
        test_prefix(&[0, 250], Excluded(&[0, 251]));
    }

    #[test]
    fn prefix_to_range_2() {
        test_prefix(&[0, 250, 50], Excluded(&[0, 250, 51]));
    }

    // Only 0xFF bytes: no finite upper bound exists
    #[test]
    fn prefix_to_range_3() {
        test_prefix(&[255, 255, 255], Unbounded);
    }

    // Trailing 0xFF is dropped, the byte before it is incremented
    #[test]
    fn prefix_to_range_char_max() {
        test_prefix(&[0, 255], Excluded(&[1]));
    }

    #[test]
    fn prefix_to_range_char_max_2() {
        test_prefix(&[0, 2, 255], Excluded(&[0, 3]));
    }
}