lsm_tree/
mvcc_stream.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)
4
5use crate::{InternalValue, UserKey};
6use double_ended_peekable::{DoubleEndedPeekable, DoubleEndedPeekableExt};
7
8/// Consumes a stream of KVs and emits a new stream according to MVCC and tombstone rules
9///
10/// This iterator is used for read operations.
11#[allow(clippy::module_name_repetitions)]
12pub struct MvccStream<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> {
13    inner: DoubleEndedPeekable<I>,
14}
15
16impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> MvccStream<I> {
17    /// Initializes a new merge iterator
18    #[must_use]
19    pub fn new(iter: I) -> Self {
20        let iter = iter.double_ended_peekable();
21        Self { inner: iter }
22    }
23
24    fn drain_key_min(&mut self, key: &UserKey) -> crate::Result<()> {
25        loop {
26            let Some(next) = self.inner.peek() else {
27                return Ok(());
28            };
29
30            let Ok(next) = next else {
31                // NOTE: We just asserted, the peeked value is an error
32                #[allow(clippy::expect_used)]
33                return Err(self
34                    .inner
35                    .next()
36                    .expect("should exist")
37                    .expect_err("should be error"));
38            };
39
40            // Consume version
41            if next.key.user_key == key {
42                // NOTE: We know the next value is not empty, because we just peeked it
43                #[allow(clippy::expect_used)]
44                self.inner.next().expect("should not be empty")?;
45            } else {
46                return Ok(());
47            }
48        }
49    }
50}
51
52impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> Iterator for MvccStream<I> {
53    type Item = crate::Result<InternalValue>;
54
55    fn next(&mut self) -> Option<Self::Item> {
56        let head = fail_iter!(self.inner.next()?);
57
58        // As long as items are the same key, ignore them
59        fail_iter!(self.drain_key_min(&head.key.user_key));
60
61        Some(Ok(head))
62    }
63}
64
65impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> DoubleEndedIterator
66    for MvccStream<I>
67{
68    fn next_back(&mut self) -> Option<Self::Item> {
69        loop {
70            let tail = fail_iter!(self.inner.next_back()?);
71
72            let prev = match self.inner.peek_back() {
73                Some(Ok(prev)) => prev,
74                Some(Err(_)) => {
75                    // NOTE: We just asserted, the peeked value is an error
76                    #[allow(clippy::expect_used)]
77                    return Some(Err(self
78                        .inner
79                        .next_back()
80                        .expect("should exist")
81                        .expect_err("should be error")));
82                }
83                None => {
84                    return Some(Ok(tail));
85                }
86            };
87
88            if prev.key.user_key < tail.key.user_key {
89                return Some(Ok(tail));
90            }
91        }
92    }
93}
94
#[cfg(test)]
#[allow(clippy::string_lit_as_bytes)]
mod tests {
    use super::*;
    use crate::value::{InternalValue, ValueType};
    use test_log::test;

    // Builds a `Vec<InternalValue>` from `key, value, type` triples, where type is
    // "V" (value), "T" (tombstone) or "W" (weak tombstone).
    //
    // The first occurrence of a key gets seqno 999, every repetition one less.
    macro_rules! stream {
        ($($key:expr, $sub_key:expr, $value_type:expr),* $(,)?) => {{
            let mut seqnos = std::collections::HashMap::new();
            let mut items = Vec::new();

            $(
                let value_type = match $value_type {
                    "V" => ValueType::Value,
                    "T" => ValueType::Tombstone,
                    "W" => ValueType::WeakTombstone,
                    _ => panic!("Unknown value type"),
                };

                let seqno = *seqnos.entry($key).and_modify(|n| *n -= 1).or_insert(999);

                items.push(InternalValue::from_components(
                    $key.as_bytes(),
                    $sub_key.as_bytes(),
                    seqno,
                    value_type,
                ));
            )*

            items
        }};
    }

    /// Feeds `input` through an [`MvccStream`] and asserts that it yields exactly
    /// `expected` from the front, in order, and is closed in both directions
    /// afterwards.
    #[track_caller]
    #[allow(clippy::unwrap_used)]
    fn assert_forward(input: &[InternalValue], expected: &[InternalValue]) -> crate::Result<()> {
        let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

        for want in expected {
            assert_eq!(*want, stream.next().unwrap()?);
        }

        assert!(stream.next().is_none(), "iterator should be closed (done)");
        assert!(
            stream.next_back().is_none(),
            "iterator should be closed (done)"
        );

        Ok(())
    }

    /// Asserts that the stream emits the same items forwards and backwards,
    /// just in reverse order.
    ///
    /// Errors would be dropped silently by `flatten`; the inputs used in this
    /// module only contain `Ok` items.
    #[track_caller]
    fn assert_reverse(input: &[InternalValue]) {
        let mut forwards = MvccStream::new(Box::new(input.iter().cloned().map(Ok)))
            .flatten()
            .collect::<Vec<_>>();
        forwards.reverse();

        let backwards = MvccStream::new(Box::new(input.iter().cloned().map(Ok)))
            .rev()
            .flatten()
            .collect::<Vec<_>>();

        assert_eq!(forwards, backwards);
    }

    #[test]
    fn mvcc_queue_reverse_almost_gone() -> crate::Result<()> {
        let input = [
            InternalValue::from_components("a", "a", 0, ValueType::Value),
            InternalValue::from_components("b", "", 1, ValueType::Tombstone),
            InternalValue::from_components("b", "b", 0, ValueType::Value),
            InternalValue::from_components("c", "", 1, ValueType::Tombstone),
            InternalValue::from_components("c", "c", 0, ValueType::Value),
            InternalValue::from_components("d", "", 1, ValueType::Tombstone),
            InternalValue::from_components("d", "d", 0, ValueType::Value),
            InternalValue::from_components("e", "", 1, ValueType::Tombstone),
            InternalValue::from_components("e", "e", 0, ValueType::Value),
        ];

        assert_forward(
            &input,
            &[
                InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
                InternalValue::from_components(*b"b", *b"", 1, ValueType::Tombstone),
                InternalValue::from_components(*b"c", *b"", 1, ValueType::Tombstone),
                InternalValue::from_components(*b"d", *b"", 1, ValueType::Tombstone),
                InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
            ],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_queue_almost_gone_2() -> crate::Result<()> {
        let input = [
            InternalValue::from_components("a", "a", 0, ValueType::Value),
            InternalValue::from_components("b", "", 1, ValueType::Tombstone),
            InternalValue::from_components("c", "", 1, ValueType::Tombstone),
            InternalValue::from_components("d", "", 1, ValueType::Tombstone),
            InternalValue::from_components("e", "", 1, ValueType::Tombstone),
        ];

        assert_forward(
            &input,
            &[
                InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
                InternalValue::from_components(*b"b", *b"", 1, ValueType::Tombstone),
                InternalValue::from_components(*b"c", *b"", 1, ValueType::Tombstone),
                InternalValue::from_components(*b"d", *b"", 1, ValueType::Tombstone),
                InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
            ],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_queue() -> crate::Result<()> {
        let input = [
            InternalValue::from_components("a", "a", 0, ValueType::Value),
            InternalValue::from_components("b", "b", 0, ValueType::Value),
            InternalValue::from_components("c", "c", 0, ValueType::Value),
            InternalValue::from_components("d", "d", 0, ValueType::Value),
            InternalValue::from_components("e", "", 1, ValueType::Tombstone),
            InternalValue::from_components("e", "e", 0, ValueType::Value),
        ];

        assert_forward(
            &input,
            &[
                InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
                InternalValue::from_components(*b"b", *b"b", 0, ValueType::Value),
                InternalValue::from_components(*b"c", *b"c", 0, ValueType::Value),
                InternalValue::from_components(*b"d", *b"d", 0, ValueType::Value),
                InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
            ],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_queue_weak_almost_gone() -> crate::Result<()> {
        let input = [
            InternalValue::from_components("a", "a", 0, ValueType::Value),
            InternalValue::from_components("b", "", 1, ValueType::WeakTombstone),
            InternalValue::from_components("b", "b", 0, ValueType::Value),
            InternalValue::from_components("c", "", 1, ValueType::WeakTombstone),
            InternalValue::from_components("c", "c", 0, ValueType::Value),
            InternalValue::from_components("d", "", 1, ValueType::WeakTombstone),
            InternalValue::from_components("d", "d", 0, ValueType::Value),
            InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
            InternalValue::from_components("e", "e", 0, ValueType::Value),
        ];

        assert_forward(
            &input,
            &[
                InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
                InternalValue::from_components(*b"b", *b"", 1, ValueType::WeakTombstone),
                InternalValue::from_components(*b"c", *b"", 1, ValueType::WeakTombstone),
                InternalValue::from_components(*b"d", *b"", 1, ValueType::WeakTombstone),
                InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
            ],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_queue_weak_almost_gone_2() -> crate::Result<()> {
        let input = [
            InternalValue::from_components("a", "a", 0, ValueType::Value),
            InternalValue::from_components("b", "", 1, ValueType::WeakTombstone),
            InternalValue::from_components("c", "", 1, ValueType::WeakTombstone),
            InternalValue::from_components("d", "", 1, ValueType::WeakTombstone),
            InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
        ];

        assert_forward(
            &input,
            &[
                InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
                InternalValue::from_components(*b"b", *b"", 1, ValueType::WeakTombstone),
                InternalValue::from_components(*b"c", *b"", 1, ValueType::WeakTombstone),
                InternalValue::from_components(*b"d", *b"", 1, ValueType::WeakTombstone),
                InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
            ],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_queue_weak_reverse() -> crate::Result<()> {
        let input = [
            InternalValue::from_components("a", "a", 0, ValueType::Value),
            InternalValue::from_components("b", "b", 0, ValueType::Value),
            InternalValue::from_components("c", "c", 0, ValueType::Value),
            InternalValue::from_components("d", "d", 0, ValueType::Value),
            InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
            InternalValue::from_components("e", "e", 0, ValueType::Value),
        ];

        assert_forward(
            &input,
            &[
                InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
                InternalValue::from_components(*b"b", *b"b", 0, ValueType::Value),
                InternalValue::from_components(*b"c", *b"c", 0, ValueType::Value),
                InternalValue::from_components(*b"d", *b"d", 0, ValueType::Value),
                InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
            ],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_stream_simple() -> crate::Result<()> {
        #[rustfmt::skip]
        let input = stream![
          "a", "new", "V",
          "a", "old", "V",
        ];

        assert_forward(
            &input,
            &[InternalValue::from_components(
                *b"a",
                *b"new",
                999,
                ValueType::Value,
            )],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_stream_simple_multi_keys() -> crate::Result<()> {
        #[rustfmt::skip]
        let input = stream![
          "a", "new", "V",
          "a", "old", "V",
          "b", "new", "V",
          "b", "old", "V",
          "c", "newnew", "V",
          "c", "new", "V",
          "c", "old", "V",
        ];

        assert_forward(
            &input,
            &[
                InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
                InternalValue::from_components(*b"b", *b"new", 999, ValueType::Value),
                InternalValue::from_components(*b"c", *b"newnew", 999, ValueType::Value),
            ],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_stream_tombstone() -> crate::Result<()> {
        #[rustfmt::skip]
        let input = stream![
          "a", "", "T",
          "a", "old", "V",
        ];

        assert_forward(
            &input,
            &[InternalValue::from_components(
                *b"a",
                *b"",
                999,
                ValueType::Tombstone,
            )],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_stream_tombstone_multi_keys() -> crate::Result<()> {
        #[rustfmt::skip]
        let input = stream![
          "a", "", "T",
          "a", "old", "V",
          "b", "", "T",
          "b", "old", "V",
          "c", "", "T",
          "c", "", "T",
          "c", "old", "V",
        ];

        assert_forward(
            &input,
            &[
                InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
                InternalValue::from_components(*b"b", *b"", 999, ValueType::Tombstone),
                InternalValue::from_components(*b"c", *b"", 999, ValueType::Tombstone),
            ],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_stream_weak_tombstone_simple() -> crate::Result<()> {
        #[rustfmt::skip]
        let input = stream![
          "a", "", "W",
          "a", "old", "V",
        ];

        assert_forward(
            &input,
            &[InternalValue::from_components(
                *b"a",
                *b"",
                999,
                ValueType::WeakTombstone,
            )],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_stream_weak_tombstone_resurrection() -> crate::Result<()> {
        #[rustfmt::skip]
        let input = stream![
          "a", "", "W",
          "a", "new", "V",
          "a", "old", "V",
        ];

        assert_forward(
            &input,
            &[InternalValue::from_components(
                *b"a",
                *b"",
                999,
                ValueType::WeakTombstone,
            )],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_stream_weak_tombstone_priority() -> crate::Result<()> {
        #[rustfmt::skip]
        let input = stream![
          "a", "", "T",
          "a", "", "W",
          "a", "new", "V",
          "a", "old", "V",
        ];

        assert_forward(
            &input,
            &[InternalValue::from_components(
                *b"a",
                *b"",
                999,
                ValueType::Tombstone,
            )],
        )?;
        assert_reverse(&input);

        Ok(())
    }

    #[test]
    fn mvcc_stream_weak_tombstone_multi_keys() -> crate::Result<()> {
        #[rustfmt::skip]
        let input = stream![
          "a", "", "W",
          "a", "old", "V",
          "b", "", "W",
          "b", "old", "V",
          "c", "", "W",
          "c", "old", "V",
        ];

        assert_forward(
            &input,
            &[
                InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
                InternalValue::from_components(*b"b", *b"", 999, ValueType::WeakTombstone),
                InternalValue::from_components(*b"c", *b"", 999, ValueType::WeakTombstone),
            ],
        )?;
        assert_reverse(&input);

        Ok(())
    }
}