lsm_tree/range.rs
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{
    key::InternalKey,
    memtable::Memtable,
    merge::Merger,
    mvcc_stream::MvccStream,
    run_reader::RunReader,
    value::{SeqNo, UserKey},
    version::Version,
    BoxedIterator, InternalValue,
};
use self_cell::self_cell;
use std::{
    ops::{Bound, RangeBounds},
    sync::Arc,
};

/// Returns `true` if an item with the given seqno is visible to a snapshot at `seqno`,
/// i.e. the item was written strictly before the snapshot.
#[must_use]
pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool {
    item_seqno < seqno
}

/// Computes the exclusive upper bound of a prefix scan by incrementing the last
/// byte that is not 255 and truncating everything after it.
///
/// If every byte is 255, there is no finite upper bound, so `Unbounded` is returned.
pub(crate) fn prefix_upper_range(prefix: &[u8]) -> Bound<UserKey> {
    use std::ops::Bound::{Excluded, Unbounded};

    if prefix.is_empty() {
        return Unbounded;
    }

    let mut end = prefix.to_vec();

    // Walk backwards to find the last byte that can be incremented
    for idx in (0..end.len()).rev() {
        // NOTE: idx is always in bounds
        #[allow(clippy::expect_used)]
        let byte = end.get_mut(idx).expect("should be in bounds");

        if *byte < 255 {
            *byte += 1;
            end.truncate(idx + 1);
            return Excluded(end.into());
        }
    }

    Unbounded
}

/// Converts a prefix to range bounds.
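///
/// For example (mirroring the tests at the bottom of this file): `b"abc"` maps to
/// `(Included("abc"), Excluded("abd"))`, a prefix ending in `0xFF` such as `[0, 255]`
/// maps to `(Included([0, 255]), Excluded([1]))`, and an empty prefix maps to
/// `(Unbounded, Unbounded)`.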
#[must_use]
#[allow(clippy::module_name_repetitions)]
pub fn prefix_to_range(prefix: &[u8]) -> (Bound<UserKey>, Bound<UserKey>) {
    use std::ops::Bound::{Included, Unbounded};

    if prefix.is_empty() {
        return (Unbounded, Unbounded);
    }

    (Included(prefix.into()), prefix_upper_range(prefix))
}

/// The iterator state holds (shared references to) the memtables that were live when
/// the range was opened, so they stay alive for as long as the range is open.
///
/// Because the merged iterator borrows from this state, the two are tied together
/// using `self_cell`, see below.
pub struct IterState {
    pub(crate) active: Arc<Memtable>,
    pub(crate) sealed: Vec<Arc<Memtable>>,
    pub(crate) ephemeral: Option<Arc<Memtable>>,

    // NOTE: Hold the version so tables cannot be unlinked
    #[allow(unused)]
    pub(crate) version: Version,
}

type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'a>;

// TODO: maybe we can add a lifetime to TreeIter and then use InternalKeyRef everywhere
// to bound the lifetime of the iterators (no need to construct an InternalKey then,
// we can just use the range directly)
self_cell!(
    pub struct TreeIter {
        owner: IterState,

        #[covariant]
        dependent: BoxedMerge,
    }
);

impl Iterator for TreeIter {
    type Item = crate::Result<InternalValue>;

    fn next(&mut self) -> Option<Self::Item> {
        self.with_dependent_mut(|_, iter| iter.next())
    }
}

impl DoubleEndedIterator for TreeIter {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.with_dependent_mut(|_, iter| iter.next_back())
    }
}

/* fn collect_disjoint_tree_with_range<'a>(
    level_manifest: &LevelManifest,
    bounds: &(Bound<UserKey>, Bound<UserKey>),
) -> MultiReader<RunReader<'a>> {
    todo!()

    /* debug_assert!(level_manifest.is_disjoint());

    let mut levels = level_manifest
        .levels
        .iter()
        .filter(|x| !x.is_empty())
        .cloned()
        .collect::<Vec<_>>();

    // TODO: save key range per level, makes key range sorting easier
    // and can remove levels not needed

    // NOTE: We know the levels are disjoint to each other, so we can just sort
    // them by comparing the first segment
    //
    // NOTE: Also, we filter out levels that are empty, so expect is fine
    #[allow(clippy::expect_used)]
    levels.sort_by(|a, b| {
        a.segments
            .first()
            .expect("level should not be empty")
            .metadata
            .key_range
            .min()
            .cmp(
                b.segments
                    .first()
                    .expect("level should not be empty")
                    .metadata
                    .key_range
                    .min(),
            )
    });

    let readers = levels
        .into_iter()
        .filter_map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write))
        .collect();

    MultiReader::new(readers) */
} */

impl TreeIter {
    #[must_use]
    #[allow(clippy::too_many_lines)]
    pub fn create_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        guard: IterState,
        range: R,
        seqno: SeqNo,
        version: &Version,
    ) -> Self {
        Self::new(guard, |lock| {
            // NOTE: Internal keys are sorted by user key ascending, then by seqno
            // descending, so to cover *all* versions of the start key we begin at
            // SeqNo::MAX (see memtable.rs for the full range explanation)
            let lo = match range.start_bound() {
                Bound::Included(key) => Bound::Included(InternalKey::new(
                    key.as_ref(),
                    SeqNo::MAX,
                    crate::ValueType::Tombstone,
                )),
                Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
                    key.as_ref(),
                    0,
                    crate::ValueType::Tombstone,
                )),
                Bound::Unbounded => Bound::Unbounded,
            };

            let hi = match range.end_bound() {
                // NOTE: See memtable.rs for range explanation, this is the reverse case
                // where we need to go all the way to the last seqno of an item
                //
                // Example: We search for (Unbounded..Excluded("abcdef"))
                //
                // key    -> seqno
                //
                // a      -> 7 <<< This is the lowest key that matches the range
                // abc    -> 5
                // abc    -> 4
                // abc    -> 3 <<< This is the highest key that matches the range
                // abcdef -> 6
                // abcdef -> 5
                //
                Bound::Included(key) => {
                    Bound::Included(InternalKey::new(key.as_ref(), 0, crate::ValueType::Value))
                }
                Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
                    key.as_ref(),
                    SeqNo::MAX,
                    crate::ValueType::Value,
                )),
                Bound::Unbounded => Bound::Unbounded,
            };

            let range = (lo, hi);

            let mut iters: Vec<BoxedIterator<'_>> = Vec::with_capacity(5);

            /* // TODO: Optimize disjoint trees (e.g. timeseries) to only use a single MultiReader.
            if level_manifest.is_disjoint() {
                let reader = collect_disjoint_tree_with_range(&level_manifest, &bounds);

                if let Some(seqno) = seqno {
                    iters.push(Box::new(reader.filter(move |item| match item {
                        Ok(item) => seqno_filter(item.key.seqno, seqno),
                        Err(_) => true,
                    })));
                } else {
                    iters.push(Box::new(reader));
                }
            } else { */

            // };

            #[allow(clippy::needless_continue)]
            for run in version.iter_levels().flat_map(|lvl| lvl.iter()) {
                match run.len() {
                    0 => continue,
                    1 => {
                        // NOTE: We checked for length
                        #[allow(clippy::expect_used)]
                        let table = run.first().expect("should exist");

                        if table.check_key_range_overlap(&(
                            range.start_bound().map(|x| &*x.user_key),
                            range.end_bound().map(|x| &*x.user_key),
                        )) {
                            let reader = table.range((
                                range.start_bound().map(|x| &x.user_key).cloned(),
                                range.end_bound().map(|x| &x.user_key).cloned(),
                            ));

                            iters.push(Box::new(reader.filter(move |item| match item {
                                Ok(item) => seqno_filter(item.key.seqno, seqno),
                                Err(_) => true,
                            })));
                        }
                    }
                    _ => {
                        if let Some(reader) = RunReader::new(
                            run.clone(),
                            (
                                range.start_bound().map(|x| &x.user_key).cloned(),
                                range.end_bound().map(|x| &x.user_key).cloned(),
                            ),
                        ) {
                            iters.push(Box::new(reader.filter(move |item| match item {
                                Ok(item) => seqno_filter(item.key.seqno, seqno),
                                Err(_) => true,
                            })));
                        }
                    }
                }

                /* if level.is_disjoint {
                    if !level.is_empty() {
                        if let Some(reader) =
                            LevelReader::new(level.clone(), &bounds, CachePolicy::Write)
                        {
                            if let Some(seqno) = seqno {
                                iters.push(Box::new(reader.filter(move |item| match item {
                                    Ok(item) => seqno_filter(item.key.seqno, seqno),
                                    Err(_) => true,
                                })));
                            } else {
                                iters.push(Box::new(reader));
                            }
                        }
                    }
                } else {
                    for segment in &level.segments {
                        if segment.check_key_range_overlap(&bounds) {
                            let reader = segment.range(bounds.clone());

                            if let Some(seqno) = seqno {
                                iters.push(Box::new(reader.filter(move |item| match item {
                                    Ok(item) => seqno_filter(item.key.seqno, seqno),
                                    Err(_) => true,
                                })));
                            } else {
                                iters.push(Box::new(reader));
                            }
                        }
                    }
                } */
            }

            // Sealed memtables
            for memtable in &lock.sealed {
                let iter = memtable.range(range.clone());

                iters.push(Box::new(
                    iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
                        .map(Ok),
                ));
            }

            // Active memtable
            {
                let iter = lock.active.range(range.clone());

                iters.push(Box::new(
                    iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
                        .map(Ok),
                ));
            }

            // Ephemeral memtable, not filtered by seqno
            if let Some(index) = &lock.ephemeral {
                let iter = Box::new(index.range(range).map(Ok));
                iters.push(iter);
            }

            let merged = Merger::new(iters);
            let iter = MvccStream::new(merged);

            // Hide tombstones from the user-facing iterator
            Box::new(iter.filter(|x| match x {
                Ok(value) => !value.key.is_tombstone(),
                Err(_) => true,
            }))
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::Slice;
    use std::ops::Bound::{Excluded, Included, Unbounded};
    use test_log::test;

    fn test_prefix(prefix: &[u8], upper_bound: Bound<&[u8]>) {
        let range = prefix_to_range(prefix);
        assert_eq!(
            range,
            (
                match prefix {
                    _ if prefix.is_empty() => Unbounded,
                    _ => Included(Slice::from(prefix)),
                },
                upper_bound.map(Slice::from),
            ),
        );
    }

    #[test]
    fn prefix_to_range_basic() {
        test_prefix(b"abc", Excluded(b"abd"));
    }

    #[test]
    fn prefix_to_range_empty() {
        test_prefix(b"", Unbounded);
    }

    #[test]
    fn prefix_to_range_single_char() {
        test_prefix(b"a", Excluded(b"b"));
    }

    #[test]
    fn prefix_to_range_1() {
        test_prefix(&[0, 250], Excluded(&[0, 251]));
    }

    #[test]
    fn prefix_to_range_2() {
        test_prefix(&[0, 250, 50], Excluded(&[0, 250, 51]));
    }

    #[test]
    fn prefix_to_range_3() {
        test_prefix(&[255, 255, 255], Unbounded);
    }

    #[test]
    fn prefix_to_range_char_max() {
        test_prefix(&[0, 255], Excluded(&[1]));
    }

    #[test]
    fn prefix_to_range_char_max_2() {
        test_prefix(&[0, 2, 255], Excluded(&[0, 3]));
    }
388 }
389}