Skip to main content

lsm_tree/
mvcc_stream.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::double_ended_peekable::{DoubleEndedPeekable, DoubleEndedPeekableExt};
6use crate::{InternalValue, UserKey};
7
8/// Consumes a stream of KVs and emits a new stream according to MVCC and tombstone rules
9///
10/// This iterator is used for read operations.
11pub struct MvccStream<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> {
12    inner: DoubleEndedPeekable<crate::Result<InternalValue>, I>,
13}
14
15impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> MvccStream<I> {
16    /// Initializes a new multi-version-aware iterator.
17    #[must_use]
18    pub fn new(iter: I) -> Self {
19        Self {
20            inner: iter.double_ended_peekable(),
21        }
22    }
23
24    // Drains all entries for the given user key from the front of the iterator.
25    fn drain_key_min(&mut self, key: &UserKey) -> crate::Result<()> {
26        loop {
27            let Some(next) = self.inner.next_if(|kv| {
28                if let Ok(kv) = kv {
29                    kv.key.user_key == key
30                } else {
31                    true
32                }
33            }) else {
34                return Ok(());
35            };
36
37            next?;
38        }
39    }
40}
41
42impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> Iterator for MvccStream<I> {
43    type Item = crate::Result<InternalValue>;
44
45    fn next(&mut self) -> Option<Self::Item> {
46        let head = fail_iter!(self.inner.next()?);
47
48        // As long as items are the same key, ignore them
49        fail_iter!(self.drain_key_min(&head.key.user_key));
50
51        Some(Ok(head))
52    }
53}
54
55impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> DoubleEndedIterator
56    for MvccStream<I>
57{
58    fn next_back(&mut self) -> Option<Self::Item> {
59        loop {
60            let tail = fail_iter!(self.inner.next_back()?);
61
62            let prev = match self.inner.peek_back() {
63                Some(Ok(prev)) => prev,
64                Some(Err(_)) => {
65                    #[expect(
66                        clippy::expect_used,
67                        reason = "we just asserted, the peeked value is an error"
68                    )]
69                    return Some(Err(self
70                        .inner
71                        .next_back()
72                        .expect("should exist")
73                        .expect_err("should be error")));
74                }
75                None => {
76                    return Some(Ok(tail));
77                }
78            };
79
80            if prev.key.user_key < tail.key.user_key {
81                return Some(Ok(tail));
82            }
83        }
84    }
85}
86
87#[cfg(test)]
88#[expect(clippy::string_lit_as_bytes)]
89mod tests {
90    use super::*;
91    use crate::{value::InternalValue, ValueType};
92    use test_log::test;
93
94    macro_rules! stream {
95      ($($key:expr, $sub_key:expr, $value_type:expr),* $(,)?) => {{
96          let mut values = Vec::new();
97          let mut counters = std::collections::HashMap::new();
98
99          $(
100              let key = $key.as_bytes();
101              let sub_key = $sub_key.as_bytes();
102              let value_type = match $value_type {
103                  "V" => ValueType::Value,
104                  "T" => ValueType::Tombstone,
105                  "W" => ValueType::WeakTombstone,
106                  _ => panic!("Unknown value type"),
107              };
108
109              let counter = counters.entry($key).and_modify(|x| { *x -= 1 }).or_insert(999);
110              values.push(InternalValue::from_components(key, sub_key, *counter, value_type));
111          )*
112
113          values
114      }};
115    }
116
117    macro_rules! iter_closed {
118        ($iter:expr) => {
119            assert!($iter.next().is_none(), "iterator should be closed (done)");
120            assert!(
121                $iter.next_back().is_none(),
122                "iterator should be closed (done)"
123            );
124        };
125    }
126
127    /// Tests that the iterator emit the same stuff forwards and backwards, just in reverse
128    macro_rules! test_reverse {
129        ($v:expr) => {
130            let iter = Box::new($v.iter().cloned().map(Ok));
131            let iter = MvccStream::new(iter);
132            let mut forwards = iter.flatten().collect::<Vec<_>>();
133            forwards.reverse();
134
135            let iter = Box::new($v.iter().cloned().map(Ok));
136            let iter = MvccStream::new(iter);
137            let backwards = iter.rev().flatten().collect::<Vec<_>>();
138
139            assert_eq!(forwards, backwards);
140        };
141    }
142
143    #[test]
144    #[expect(clippy::unwrap_used)]
145    fn mvcc_stream_error() -> crate::Result<()> {
146        {
147            let vec = [
148                Ok(InternalValue::from_components(
149                    "a",
150                    "new",
151                    999,
152                    ValueType::Value,
153                )),
154                Err(crate::Error::Io(std::io::Error::other("test error"))),
155            ];
156
157            let iter = Box::new(vec.into_iter());
158            let mut iter = MvccStream::new(iter);
159
160            // Because next calls drain_key_min, the error is immediately first, even though
161            // the first item is technically Ok
162            assert!(matches!(iter.next().unwrap(), Err(crate::Error::Io(_))));
163            iter_closed!(iter);
164        }
165
166        {
167            let vec = [
168                Ok(InternalValue::from_components(
169                    "a",
170                    "new",
171                    999,
172                    ValueType::Value,
173                )),
174                Err(crate::Error::Io(std::io::Error::other("test error"))),
175            ];
176
177            let iter = Box::new(vec.into_iter());
178            let mut iter = MvccStream::new(iter);
179
180            assert!(matches!(
181                iter.next_back().unwrap(),
182                Err(crate::Error::Io(_))
183            ));
184            assert_eq!(
185                InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
186                iter.next_back().unwrap()?,
187            );
188            iter_closed!(iter);
189        }
190
191        Ok(())
192    }
193
194    #[test]
195    #[expect(clippy::unwrap_used)]
196    fn mvcc_queue_reverse_almost_gone() -> crate::Result<()> {
197        let vec = [
198            InternalValue::from_components("a", "a", 0, ValueType::Value),
199            InternalValue::from_components("b", "", 1, ValueType::Tombstone),
200            InternalValue::from_components("b", "b", 0, ValueType::Value),
201            InternalValue::from_components("c", "", 1, ValueType::Tombstone),
202            InternalValue::from_components("c", "c", 0, ValueType::Value),
203            InternalValue::from_components("d", "", 1, ValueType::Tombstone),
204            InternalValue::from_components("d", "d", 0, ValueType::Value),
205            InternalValue::from_components("e", "", 1, ValueType::Tombstone),
206            InternalValue::from_components("e", "e", 0, ValueType::Value),
207        ];
208
209        let iter = Box::new(vec.iter().cloned().map(Ok));
210
211        let mut iter = MvccStream::new(iter);
212
213        assert_eq!(
214            InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
215            iter.next().unwrap()?,
216        );
217        assert_eq!(
218            InternalValue::from_components(*b"b", *b"", 1, ValueType::Tombstone),
219            iter.next().unwrap()?,
220        );
221        assert_eq!(
222            InternalValue::from_components(*b"c", *b"", 1, ValueType::Tombstone),
223            iter.next().unwrap()?,
224        );
225        assert_eq!(
226            InternalValue::from_components(*b"d", *b"", 1, ValueType::Tombstone),
227            iter.next().unwrap()?,
228        );
229        assert_eq!(
230            InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
231            iter.next().unwrap()?,
232        );
233        iter_closed!(iter);
234
235        test_reverse!(vec);
236
237        Ok(())
238    }
239
240    #[test]
241    #[expect(clippy::unwrap_used)]
242    fn mvcc_queue_almost_gone_2() -> crate::Result<()> {
243        let vec = [
244            InternalValue::from_components("a", "a", 0, ValueType::Value),
245            InternalValue::from_components("b", "", 1, ValueType::Tombstone),
246            InternalValue::from_components("c", "", 1, ValueType::Tombstone),
247            InternalValue::from_components("d", "", 1, ValueType::Tombstone),
248            InternalValue::from_components("e", "", 1, ValueType::Tombstone),
249        ];
250
251        let iter = Box::new(vec.iter().cloned().map(Ok));
252
253        let mut iter = MvccStream::new(iter);
254
255        assert_eq!(
256            InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
257            iter.next().unwrap()?,
258        );
259        assert_eq!(
260            InternalValue::from_components(*b"b", *b"", 1, ValueType::Tombstone),
261            iter.next().unwrap()?,
262        );
263        assert_eq!(
264            InternalValue::from_components(*b"c", *b"", 1, ValueType::Tombstone),
265            iter.next().unwrap()?,
266        );
267        assert_eq!(
268            InternalValue::from_components(*b"d", *b"", 1, ValueType::Tombstone),
269            iter.next().unwrap()?,
270        );
271        assert_eq!(
272            InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
273            iter.next().unwrap()?,
274        );
275        iter_closed!(iter);
276
277        test_reverse!(vec);
278
279        Ok(())
280    }
281
282    #[test]
283    #[expect(clippy::unwrap_used)]
284    fn mvcc_queue() -> crate::Result<()> {
285        let vec = [
286            InternalValue::from_components("a", "a", 0, ValueType::Value),
287            InternalValue::from_components("b", "b", 0, ValueType::Value),
288            InternalValue::from_components("c", "c", 0, ValueType::Value),
289            InternalValue::from_components("d", "d", 0, ValueType::Value),
290            InternalValue::from_components("e", "", 1, ValueType::Tombstone),
291            InternalValue::from_components("e", "e", 0, ValueType::Value),
292        ];
293
294        let iter = Box::new(vec.iter().cloned().map(Ok));
295
296        let mut iter = MvccStream::new(iter);
297
298        assert_eq!(
299            InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
300            iter.next().unwrap()?,
301        );
302        assert_eq!(
303            InternalValue::from_components(*b"b", *b"b", 0, ValueType::Value),
304            iter.next().unwrap()?,
305        );
306        assert_eq!(
307            InternalValue::from_components(*b"c", *b"c", 0, ValueType::Value),
308            iter.next().unwrap()?,
309        );
310        assert_eq!(
311            InternalValue::from_components(*b"d", *b"d", 0, ValueType::Value),
312            iter.next().unwrap()?,
313        );
314        assert_eq!(
315            InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
316            iter.next().unwrap()?,
317        );
318        iter_closed!(iter);
319
320        test_reverse!(vec);
321
322        Ok(())
323    }
324
325    #[test]
326    #[expect(clippy::unwrap_used)]
327    fn mvcc_queue_weak_almost_gone() -> crate::Result<()> {
328        let vec = [
329            InternalValue::from_components("a", "a", 0, ValueType::Value),
330            InternalValue::from_components("b", "", 1, ValueType::WeakTombstone),
331            InternalValue::from_components("b", "b", 0, ValueType::Value),
332            InternalValue::from_components("c", "", 1, ValueType::WeakTombstone),
333            InternalValue::from_components("c", "c", 0, ValueType::Value),
334            InternalValue::from_components("d", "", 1, ValueType::WeakTombstone),
335            InternalValue::from_components("d", "d", 0, ValueType::Value),
336            InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
337            InternalValue::from_components("e", "e", 0, ValueType::Value),
338        ];
339
340        let iter = Box::new(vec.iter().cloned().map(Ok));
341
342        let mut iter = MvccStream::new(iter);
343
344        assert_eq!(
345            InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
346            iter.next().unwrap()?,
347        );
348        assert_eq!(
349            InternalValue::from_components(*b"b", *b"", 1, ValueType::WeakTombstone),
350            iter.next().unwrap()?,
351        );
352        assert_eq!(
353            InternalValue::from_components(*b"c", *b"", 1, ValueType::WeakTombstone),
354            iter.next().unwrap()?,
355        );
356        assert_eq!(
357            InternalValue::from_components(*b"d", *b"", 1, ValueType::WeakTombstone),
358            iter.next().unwrap()?,
359        );
360        assert_eq!(
361            InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
362            iter.next().unwrap()?,
363        );
364        iter_closed!(iter);
365
366        test_reverse!(vec);
367
368        Ok(())
369    }
370
371    #[test]
372    #[expect(clippy::unwrap_used)]
373    fn mvcc_queue_weak_almost_gone_2() -> crate::Result<()> {
374        let vec = [
375            InternalValue::from_components("a", "a", 0, ValueType::Value),
376            InternalValue::from_components("b", "", 1, ValueType::WeakTombstone),
377            InternalValue::from_components("c", "", 1, ValueType::WeakTombstone),
378            InternalValue::from_components("d", "", 1, ValueType::WeakTombstone),
379            InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
380        ];
381
382        let iter = Box::new(vec.iter().cloned().map(Ok));
383
384        let mut iter = MvccStream::new(iter);
385
386        assert_eq!(
387            InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
388            iter.next().unwrap()?,
389        );
390        assert_eq!(
391            InternalValue::from_components(*b"b", *b"", 1, ValueType::WeakTombstone),
392            iter.next().unwrap()?,
393        );
394        assert_eq!(
395            InternalValue::from_components(*b"c", *b"", 1, ValueType::WeakTombstone),
396            iter.next().unwrap()?,
397        );
398        assert_eq!(
399            InternalValue::from_components(*b"d", *b"", 1, ValueType::WeakTombstone),
400            iter.next().unwrap()?,
401        );
402        assert_eq!(
403            InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
404            iter.next().unwrap()?,
405        );
406        iter_closed!(iter);
407
408        test_reverse!(vec);
409
410        Ok(())
411    }
412
413    #[test]
414    #[expect(clippy::unwrap_used)]
415    fn mvcc_queue_weak_reverse() -> crate::Result<()> {
416        let vec = [
417            InternalValue::from_components("a", "a", 0, ValueType::Value),
418            InternalValue::from_components("b", "b", 0, ValueType::Value),
419            InternalValue::from_components("c", "c", 0, ValueType::Value),
420            InternalValue::from_components("d", "d", 0, ValueType::Value),
421            InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
422            InternalValue::from_components("e", "e", 0, ValueType::Value),
423        ];
424
425        let iter = Box::new(vec.iter().cloned().map(Ok));
426
427        let mut iter = MvccStream::new(iter);
428
429        assert_eq!(
430            InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
431            iter.next().unwrap()?,
432        );
433        assert_eq!(
434            InternalValue::from_components(*b"b", *b"b", 0, ValueType::Value),
435            iter.next().unwrap()?,
436        );
437        assert_eq!(
438            InternalValue::from_components(*b"c", *b"c", 0, ValueType::Value),
439            iter.next().unwrap()?,
440        );
441        assert_eq!(
442            InternalValue::from_components(*b"d", *b"d", 0, ValueType::Value),
443            iter.next().unwrap()?,
444        );
445        assert_eq!(
446            InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
447            iter.next().unwrap()?,
448        );
449        iter_closed!(iter);
450
451        test_reverse!(vec);
452
453        Ok(())
454    }
455
456    #[test]
457    #[expect(clippy::unwrap_used)]
458    fn mvcc_stream_simple() -> crate::Result<()> {
459        #[rustfmt::skip]
460        let vec = stream![
461          "a", "new", "V",
462          "a", "old", "V",
463        ];
464
465        let iter = Box::new(vec.iter().cloned().map(Ok));
466
467        let mut iter = MvccStream::new(iter);
468
469        assert_eq!(
470            InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
471            iter.next().unwrap()?,
472        );
473        iter_closed!(iter);
474
475        test_reverse!(vec);
476
477        Ok(())
478    }
479
480    #[test]
481    #[expect(clippy::unwrap_used)]
482    fn mvcc_stream_simple_multi_keys() -> crate::Result<()> {
483        #[rustfmt::skip]
484        let vec = stream![
485          "a", "new", "V",
486          "a", "old", "V",
487          "b", "new", "V",
488          "b", "old", "V",
489          "c", "newnew", "V",
490          "c", "new", "V",
491          "c", "old", "V",
492        ];
493
494        let iter = Box::new(vec.iter().cloned().map(Ok));
495
496        let mut iter = MvccStream::new(iter);
497
498        assert_eq!(
499            InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
500            iter.next().unwrap()?,
501        );
502        assert_eq!(
503            InternalValue::from_components(*b"b", *b"new", 999, ValueType::Value),
504            iter.next().unwrap()?,
505        );
506        assert_eq!(
507            InternalValue::from_components(*b"c", *b"newnew", 999, ValueType::Value),
508            iter.next().unwrap()?,
509        );
510        iter_closed!(iter);
511
512        test_reverse!(vec);
513
514        Ok(())
515    }
516
517    #[test]
518    #[expect(clippy::unwrap_used)]
519    fn mvcc_stream_tombstone() -> crate::Result<()> {
520        #[rustfmt::skip]
521        let vec = stream![
522          "a", "", "T",
523          "a", "old", "V",
524        ];
525
526        let iter = Box::new(vec.iter().cloned().map(Ok));
527
528        let mut iter = MvccStream::new(iter);
529
530        assert_eq!(
531            InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
532            iter.next().unwrap()?,
533        );
534        iter_closed!(iter);
535
536        test_reverse!(vec);
537
538        Ok(())
539    }
540
541    #[test]
542    #[expect(clippy::unwrap_used)]
543    fn mvcc_stream_tombstone_multi_keys() -> crate::Result<()> {
544        #[rustfmt::skip]
545        let vec = stream![
546          "a", "", "T",
547          "a", "old", "V",
548          "b", "", "T",
549          "b", "old", "V",
550          "c", "", "T",
551          "c", "", "T",
552          "c", "old", "V",
553        ];
554
555        let iter = Box::new(vec.iter().cloned().map(Ok));
556
557        let mut iter = MvccStream::new(iter);
558
559        assert_eq!(
560            InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
561            iter.next().unwrap()?,
562        );
563        assert_eq!(
564            InternalValue::from_components(*b"b", *b"", 999, ValueType::Tombstone),
565            iter.next().unwrap()?,
566        );
567        assert_eq!(
568            InternalValue::from_components(*b"c", *b"", 999, ValueType::Tombstone),
569            iter.next().unwrap()?,
570        );
571        iter_closed!(iter);
572
573        test_reverse!(vec);
574
575        Ok(())
576    }
577
578    #[test]
579    #[expect(clippy::unwrap_used)]
580    fn mvcc_stream_weak_tombstone_simple() -> crate::Result<()> {
581        #[rustfmt::skip]
582        let vec = stream![
583          "a", "", "W",
584          "a", "old", "V",
585        ];
586
587        let iter = Box::new(vec.iter().cloned().map(Ok));
588
589        let mut iter = MvccStream::new(iter);
590
591        assert_eq!(
592            InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
593            iter.next().unwrap()?,
594        );
595        iter_closed!(iter);
596
597        test_reverse!(vec);
598
599        Ok(())
600    }
601
602    #[test]
603    #[expect(clippy::unwrap_used)]
604    fn mvcc_stream_weak_tombstone_resurrection() -> crate::Result<()> {
605        #[rustfmt::skip]
606        let vec = stream![
607          "a", "", "W",
608          "a", "new", "V",
609          "a", "old", "V",
610        ];
611
612        let iter = Box::new(vec.iter().cloned().map(Ok));
613
614        let mut iter = MvccStream::new(iter);
615
616        assert_eq!(
617            InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
618            iter.next().unwrap()?,
619        );
620        iter_closed!(iter);
621
622        test_reverse!(vec);
623
624        Ok(())
625    }
626
627    #[test]
628    #[expect(clippy::unwrap_used)]
629    fn mvcc_stream_weak_tombstone_priority() -> crate::Result<()> {
630        #[rustfmt::skip]
631        let vec = stream![
632          "a", "", "T",  
633          "a", "", "W",
634          "a", "new", "V",
635          "a", "old", "V",
636        ];
637
638        let iter = Box::new(vec.iter().cloned().map(Ok));
639
640        let mut iter = MvccStream::new(iter);
641
642        assert_eq!(
643            InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
644            iter.next().unwrap()?,
645        );
646        iter_closed!(iter);
647
648        test_reverse!(vec);
649
650        Ok(())
651    }
652
653    #[test]
654    #[expect(clippy::unwrap_used)]
655    fn mvcc_stream_weak_tombstone_multi_keys() -> crate::Result<()> {
656        #[rustfmt::skip]
657        let vec = stream![
658          "a", "", "W",
659          "a", "old", "V",
660          "b", "", "W",
661          "b", "old", "V",
662          "c", "", "W",
663          "c", "old", "V",
664        ];
665
666        let iter = Box::new(vec.iter().cloned().map(Ok));
667
668        let mut iter = MvccStream::new(iter);
669
670        assert_eq!(
671            InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
672            iter.next().unwrap()?,
673        );
674        assert_eq!(
675            InternalValue::from_components(*b"b", *b"", 999, ValueType::WeakTombstone),
676            iter.next().unwrap()?,
677        );
678        assert_eq!(
679            InternalValue::from_components(*b"c", *b"", 999, ValueType::WeakTombstone),
680            iter.next().unwrap()?,
681        );
682        iter_closed!(iter);
683
684        test_reverse!(vec);
685
686        Ok(())
687    }
688}