1use crate::double_ended_peekable::{DoubleEndedPeekable, DoubleEndedPeekableExt};
6use crate::{InternalValue, UserKey};
7
8pub struct MvccStream<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> {
12 inner: DoubleEndedPeekable<crate::Result<InternalValue>, I>,
13}
14
15impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> MvccStream<I> {
16 #[must_use]
18 pub fn new(iter: I) -> Self {
19 Self {
20 inner: iter.double_ended_peekable(),
21 }
22 }
23
24 fn drain_key_min(&mut self, key: &UserKey) -> crate::Result<()> {
26 loop {
27 let Some(next) = self.inner.next_if(|kv| {
28 if let Ok(kv) = kv {
29 kv.key.user_key == key
30 } else {
31 true
32 }
33 }) else {
34 return Ok(());
35 };
36
37 next?;
38 }
39 }
40}
41
42impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> Iterator for MvccStream<I> {
43 type Item = crate::Result<InternalValue>;
44
45 fn next(&mut self) -> Option<Self::Item> {
46 let head = fail_iter!(self.inner.next()?);
47
48 fail_iter!(self.drain_key_min(&head.key.user_key));
50
51 Some(Ok(head))
52 }
53}
54
55impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> DoubleEndedIterator
56 for MvccStream<I>
57{
58 fn next_back(&mut self) -> Option<Self::Item> {
59 loop {
60 let tail = fail_iter!(self.inner.next_back()?);
61
62 let prev = match self.inner.peek_back() {
63 Some(Ok(prev)) => prev,
64 Some(Err(_)) => {
65 #[expect(
66 clippy::expect_used,
67 reason = "we just asserted, the peeked value is an error"
68 )]
69 return Some(Err(self
70 .inner
71 .next_back()
72 .expect("should exist")
73 .expect_err("should be error")));
74 }
75 None => {
76 return Some(Ok(tail));
77 }
78 };
79
80 if prev.key.user_key < tail.key.user_key {
81 return Some(Ok(tail));
82 }
83 }
84 }
85}
86
87#[cfg(test)]
88#[expect(clippy::string_lit_as_bytes)]
89mod tests {
90 use super::*;
91 use crate::{value::InternalValue, ValueType};
92 use test_log::test;
93
// Builds a `Vec<InternalValue>` from `key, sub_key, value_type` triples.
//
// `value_type` is "V" (value), "T" (tombstone) or "W" (weak tombstone).
// The numeric component starts at 999 for the first occurrence of a key and
// decreases by one for every further occurrence of that key.
macro_rules! stream {
    ($($key:expr, $sub_key:expr, $value_type:expr),* $(,)?) => {{
        let mut items = Vec::new();
        let mut next_number = std::collections::HashMap::new();

        $(
            let key_bytes = $key.as_bytes();
            let sub_key_bytes = $sub_key.as_bytes();

            let kind = match $value_type {
                "V" => ValueType::Value,
                "T" => ValueType::Tombstone,
                "W" => ValueType::WeakTombstone,
                _ => panic!("Unknown value type"),
            };

            // First sighting of a key gets 999, each later sighting one less
            let number = next_number
                .entry($key)
                .and_modify(|n| *n -= 1)
                .or_insert(999);

            items.push(InternalValue::from_components(
                key_bytes,
                sub_key_bytes,
                *number,
                kind,
            ));
        )*

        items
    }};
}
116
// Asserts that the iterator yields nothing more from either end.
macro_rules! iter_closed {
    ($iter:expr) => {{
        let from_front = $iter.next();
        assert!(from_front.is_none(), "iterator should be closed (done)");

        let from_back = $iter.next_back();
        assert!(from_back.is_none(), "iterator should be closed (done)");
    }};
}
126
// Asserts that reading the stream backwards yields exactly the reverse of
// reading it forwards. Error items are skipped by `flatten` in both directions.
macro_rules! test_reverse {
    ($v:expr) => {
        let mut reversed_forward_pass: Vec<_> =
            MvccStream::new(Box::new($v.iter().cloned().map(Ok)))
                .flatten()
                .collect();
        reversed_forward_pass.reverse();

        let backward_pass: Vec<_> = MvccStream::new(Box::new($v.iter().cloned().map(Ok)))
            .rev()
            .flatten()
            .collect();

        assert_eq!(reversed_forward_pass, backward_pass);
    };
}
142
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_stream_error() -> crate::Result<()> {
    // Produces a fresh fixture per direction: one good entry followed by an I/O error
    let fixture = || {
        [
            Ok(InternalValue::from_components(
                "a",
                "new",
                999,
                ValueType::Value,
            )),
            Err(crate::Error::Io(std::io::Error::other("test error"))),
        ]
    };

    // Front to back: draining after the first entry runs into the error, so the
    // error is what comes out first and nothing is left afterwards
    {
        let mut stream = MvccStream::new(Box::new(fixture().into_iter()));

        let first = stream.next().unwrap();
        assert!(matches!(first, Err(crate::Error::Io(_))));
        iter_closed!(stream);
    }

    // Back to front: the error sits at the very end, the entry follows it
    {
        let mut stream = MvccStream::new(Box::new(fixture().into_iter()));

        let last = stream.next_back().unwrap();
        assert!(matches!(last, Err(crate::Error::Io(_))));

        let entry = stream.next_back().unwrap()?;
        assert_eq!(
            InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
            entry,
        );
        iter_closed!(stream);
    }

    Ok(())
}
193
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_queue_reverse_almost_gone() -> crate::Result<()> {
    // `a` has a lone value; `b` through `e` each have a tombstone in front of a value
    let input = [
        InternalValue::from_components("a", "a", 0, ValueType::Value),
        InternalValue::from_components("b", "", 1, ValueType::Tombstone),
        InternalValue::from_components("b", "b", 0, ValueType::Value),
        InternalValue::from_components("c", "", 1, ValueType::Tombstone),
        InternalValue::from_components("c", "c", 0, ValueType::Value),
        InternalValue::from_components("d", "", 1, ValueType::Tombstone),
        InternalValue::from_components("d", "d", 0, ValueType::Value),
        InternalValue::from_components("e", "", 1, ValueType::Tombstone),
        InternalValue::from_components("e", "e", 0, ValueType::Value),
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    assert_eq!(
        InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
        stream.next().unwrap()?,
    );

    // Only the leading tombstone of each remaining key is emitted
    for key in [*b"b", *b"c", *b"d", *b"e"] {
        assert_eq!(
            InternalValue::from_components(key, *b"", 1, ValueType::Tombstone),
            stream.next().unwrap()?,
        );
    }
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
239
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_queue_almost_gone_2() -> crate::Result<()> {
    // Every key occurs exactly once: a value for `a`, tombstones for `b` through `e`
    let input = [
        InternalValue::from_components("a", "a", 0, ValueType::Value),
        InternalValue::from_components("b", "", 1, ValueType::Tombstone),
        InternalValue::from_components("c", "", 1, ValueType::Tombstone),
        InternalValue::from_components("d", "", 1, ValueType::Tombstone),
        InternalValue::from_components("e", "", 1, ValueType::Tombstone),
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    assert_eq!(
        InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
        stream.next().unwrap()?,
    );

    // With no duplicate keys, every tombstone passes through unchanged
    for key in [*b"b", *b"c", *b"d", *b"e"] {
        assert_eq!(
            InternalValue::from_components(key, *b"", 1, ValueType::Tombstone),
            stream.next().unwrap()?,
        );
    }
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
281
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_queue() -> crate::Result<()> {
    // `a` through `d` have a lone value; `e` has a tombstone in front of a value
    let input = [
        InternalValue::from_components("a", "a", 0, ValueType::Value),
        InternalValue::from_components("b", "b", 0, ValueType::Value),
        InternalValue::from_components("c", "c", 0, ValueType::Value),
        InternalValue::from_components("d", "d", 0, ValueType::Value),
        InternalValue::from_components("e", "", 1, ValueType::Tombstone),
        InternalValue::from_components("e", "e", 0, ValueType::Value),
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    // The single-entry keys come out as they went in (key and value are the same bytes)
    for key in [*b"a", *b"b", *b"c", *b"d"] {
        assert_eq!(
            InternalValue::from_components(key, key, 0, ValueType::Value),
            stream.next().unwrap()?,
        );
    }

    // For `e` only the leading tombstone is emitted
    assert_eq!(
        InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
        stream.next().unwrap()?,
    );
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
324
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_queue_weak_almost_gone() -> crate::Result<()> {
    // `a` has a lone value; `b` through `e` each have a weak tombstone in front of a value
    let input = [
        InternalValue::from_components("a", "a", 0, ValueType::Value),
        InternalValue::from_components("b", "", 1, ValueType::WeakTombstone),
        InternalValue::from_components("b", "b", 0, ValueType::Value),
        InternalValue::from_components("c", "", 1, ValueType::WeakTombstone),
        InternalValue::from_components("c", "c", 0, ValueType::Value),
        InternalValue::from_components("d", "", 1, ValueType::WeakTombstone),
        InternalValue::from_components("d", "d", 0, ValueType::Value),
        InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
        InternalValue::from_components("e", "e", 0, ValueType::Value),
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    assert_eq!(
        InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
        stream.next().unwrap()?,
    );

    // Only the leading weak tombstone of each remaining key is emitted
    for key in [*b"b", *b"c", *b"d", *b"e"] {
        assert_eq!(
            InternalValue::from_components(key, *b"", 1, ValueType::WeakTombstone),
            stream.next().unwrap()?,
        );
    }
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
370
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_queue_weak_almost_gone_2() -> crate::Result<()> {
    // Every key occurs exactly once: a value for `a`, weak tombstones for `b` through `e`
    let input = [
        InternalValue::from_components("a", "a", 0, ValueType::Value),
        InternalValue::from_components("b", "", 1, ValueType::WeakTombstone),
        InternalValue::from_components("c", "", 1, ValueType::WeakTombstone),
        InternalValue::from_components("d", "", 1, ValueType::WeakTombstone),
        InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    assert_eq!(
        InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
        stream.next().unwrap()?,
    );

    // With no duplicate keys, every weak tombstone passes through unchanged
    for key in [*b"b", *b"c", *b"d", *b"e"] {
        assert_eq!(
            InternalValue::from_components(key, *b"", 1, ValueType::WeakTombstone),
            stream.next().unwrap()?,
        );
    }
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
412
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_queue_weak_reverse() -> crate::Result<()> {
    // `a` through `d` have a lone value; `e` has a weak tombstone in front of a value
    let input = [
        InternalValue::from_components("a", "a", 0, ValueType::Value),
        InternalValue::from_components("b", "b", 0, ValueType::Value),
        InternalValue::from_components("c", "c", 0, ValueType::Value),
        InternalValue::from_components("d", "d", 0, ValueType::Value),
        InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
        InternalValue::from_components("e", "e", 0, ValueType::Value),
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    // The single-entry keys come out as they went in (key and value are the same bytes)
    for key in [*b"a", *b"b", *b"c", *b"d"] {
        assert_eq!(
            InternalValue::from_components(key, key, 0, ValueType::Value),
            stream.next().unwrap()?,
        );
    }

    // For `e` only the leading weak tombstone is emitted
    assert_eq!(
        InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
        stream.next().unwrap()?,
    );
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
455
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_stream_simple() -> crate::Result<()> {
    // Two versions of the same key
    #[rustfmt::skip]
    let input = stream![
        "a", "new", "V",
        "a", "old", "V",
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    // Only the first version of `a` is emitted
    let survivor = stream.next().unwrap()?;
    assert_eq!(
        InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
        survivor,
    );
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
479
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_stream_simple_multi_keys() -> crate::Result<()> {
    // Several versions for each of three keys
    #[rustfmt::skip]
    let input = stream![
        "a", "new", "V",
        "a", "old", "V",
        "b", "new", "V",
        "b", "old", "V",
        "c", "newnew", "V",
        "c", "new", "V",
        "c", "old", "V",
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    // One entry per key: the first version listed for it
    let survivors = [
        InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
        InternalValue::from_components(*b"b", *b"new", 999, ValueType::Value),
        InternalValue::from_components(*b"c", *b"newnew", 999, ValueType::Value),
    ];
    for survivor in survivors {
        assert_eq!(survivor, stream.next().unwrap()?);
    }
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
516
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_stream_tombstone() -> crate::Result<()> {
    // A tombstone in front of an older value of the same key
    #[rustfmt::skip]
    let input = stream![
        "a", "", "T",
        "a", "old", "V",
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    // The tombstone is emitted, the value behind it is dropped
    let survivor = stream.next().unwrap()?;
    assert_eq!(
        InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
        survivor,
    );
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
540
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_stream_tombstone_multi_keys() -> crate::Result<()> {
    // Each key starts with a tombstone; `c` even has two of them
    #[rustfmt::skip]
    let input = stream![
        "a", "", "T",
        "a", "old", "V",
        "b", "", "T",
        "b", "old", "V",
        "c", "", "T",
        "c", "", "T",
        "c", "old", "V",
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    // One tombstone per key, each being the first entry listed for that key
    for key in [*b"a", *b"b", *b"c"] {
        assert_eq!(
            InternalValue::from_components(key, *b"", 999, ValueType::Tombstone),
            stream.next().unwrap()?,
        );
    }
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
577
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_stream_weak_tombstone_simple() -> crate::Result<()> {
    // A weak tombstone in front of an older value of the same key
    #[rustfmt::skip]
    let input = stream![
        "a", "", "W",
        "a", "old", "V",
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    // The weak tombstone is emitted, the value behind it is dropped
    let survivor = stream.next().unwrap()?;
    assert_eq!(
        InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
        survivor,
    );
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
601
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_stream_weak_tombstone_resurrection() -> crate::Result<()> {
    // A weak tombstone in front of two older values of the same key
    #[rustfmt::skip]
    let input = stream![
        "a", "", "W",
        "a", "new", "V",
        "a", "old", "V",
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    // Neither value behind the weak tombstone is emitted by this stream
    let survivor = stream.next().unwrap()?;
    assert_eq!(
        InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
        survivor,
    );
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
626
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_stream_weak_tombstone_priority() -> crate::Result<()> {
    // A tombstone in front of a weak tombstone and two values, all for the same key
    #[rustfmt::skip]
    let input = stream![
        "a", "", "T",
        "a", "", "W",
        "a", "new", "V",
        "a", "old", "V",
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    // The leading tombstone is the only entry emitted for `a`
    let survivor = stream.next().unwrap()?;
    assert_eq!(
        InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
        survivor,
    );
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
652
#[test]
#[expect(clippy::unwrap_used)]
fn mvcc_stream_weak_tombstone_multi_keys() -> crate::Result<()> {
    // Each of the three keys has a weak tombstone in front of an older value
    #[rustfmt::skip]
    let input = stream![
        "a", "", "W",
        "a", "old", "V",
        "b", "", "W",
        "b", "old", "V",
        "c", "", "W",
        "c", "old", "V",
    ];

    let mut stream = MvccStream::new(Box::new(input.iter().cloned().map(Ok)));

    // One weak tombstone per key, the values behind them are dropped
    for key in [*b"a", *b"b", *b"c"] {
        assert_eq!(
            InternalValue::from_components(key, *b"", 999, ValueType::WeakTombstone),
            stream.next().unwrap()?,
        );
    }
    iter_closed!(stream);

    test_reverse!(input);

    Ok(())
}
688}