1use crate::{InternalValue, UserKey};
6use double_ended_peekable::{DoubleEndedPeekable, DoubleEndedPeekableExt};
7
8#[allow(clippy::module_name_repetitions)]
12pub struct MvccStream<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> {
13 inner: DoubleEndedPeekable<I>,
14}
15
16impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> MvccStream<I> {
17 #[must_use]
19 pub fn new(iter: I) -> Self {
20 let iter = iter.double_ended_peekable();
21 Self { inner: iter }
22 }
23
24 fn drain_key_min(&mut self, key: &UserKey) -> crate::Result<()> {
25 loop {
26 let Some(next) = self.inner.peek() else {
27 return Ok(());
28 };
29
30 let Ok(next) = next else {
31 #[allow(clippy::expect_used)]
33 return Err(self
34 .inner
35 .next()
36 .expect("should exist")
37 .expect_err("should be error"));
38 };
39
40 if next.key.user_key == key {
42 #[allow(clippy::expect_used)]
44 self.inner.next().expect("should not be empty")?;
45 } else {
46 return Ok(());
47 }
48 }
49 }
50}
51
52impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> Iterator for MvccStream<I> {
53 type Item = crate::Result<InternalValue>;
54
55 fn next(&mut self) -> Option<Self::Item> {
56 let head = fail_iter!(self.inner.next()?);
57
58 fail_iter!(self.drain_key_min(&head.key.user_key));
60
61 Some(Ok(head))
62 }
63}
64
65impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> DoubleEndedIterator
66 for MvccStream<I>
67{
68 fn next_back(&mut self) -> Option<Self::Item> {
69 loop {
70 let tail = fail_iter!(self.inner.next_back()?);
71
72 let prev = match self.inner.peek_back() {
73 Some(Ok(prev)) => prev,
74 Some(Err(_)) => {
75 #[allow(clippy::expect_used)]
77 return Some(Err(self
78 .inner
79 .next_back()
80 .expect("should exist")
81 .expect_err("should be error")));
82 }
83 None => {
84 return Some(Ok(tail));
85 }
86 };
87
88 if prev.key.user_key < tail.key.user_key {
89 return Some(Ok(tail));
90 }
91 }
92 }
93}
94
95#[cfg(test)]
96#[allow(clippy::string_lit_as_bytes)]
97mod tests {
98 use super::*;
99 use crate::value::{InternalValue, ValueType};
100 use test_log::test;
101
102 macro_rules! stream {
103 ($($key:expr, $sub_key:expr, $value_type:expr),* $(,)?) => {{
104 let mut values = Vec::new();
105 let mut counters = std::collections::HashMap::new();
106
107 $(
108 let key = $key.as_bytes();
109 let sub_key = $sub_key.as_bytes();
110 let value_type = match $value_type {
111 "V" => ValueType::Value,
112 "T" => ValueType::Tombstone,
113 "W" => ValueType::WeakTombstone,
114 _ => panic!("Unknown value type"),
115 };
116
117 let counter = counters.entry($key).and_modify(|x| { *x -= 1 }).or_insert(999);
118 values.push(InternalValue::from_components(key, sub_key, *counter, value_type));
119 )*
120
121 values
122 }};
123 }
124
125 macro_rules! iter_closed {
126 ($iter:expr) => {
127 assert!($iter.next().is_none(), "iterator should be closed (done)");
128 assert!(
129 $iter.next_back().is_none(),
130 "iterator should be closed (done)"
131 );
132 };
133 }
134
135 macro_rules! test_reverse {
137 ($v:expr) => {
138 let iter = Box::new($v.iter().cloned().map(Ok));
139 let iter = MvccStream::new(iter);
140 let mut forwards = iter.flatten().collect::<Vec<_>>();
141 forwards.reverse();
142
143 let iter = Box::new($v.iter().cloned().map(Ok));
144 let iter = MvccStream::new(iter);
145 let backwards = iter.rev().flatten().collect::<Vec<_>>();
146
147 assert_eq!(forwards, backwards);
148 };
149 }
150
151 #[test]
152 #[allow(clippy::unwrap_used)]
153 fn mvcc_queue_reverse_almost_gone() -> crate::Result<()> {
154 let vec = [
155 InternalValue::from_components("a", "a", 0, ValueType::Value),
156 InternalValue::from_components("b", "", 1, ValueType::Tombstone),
157 InternalValue::from_components("b", "b", 0, ValueType::Value),
158 InternalValue::from_components("c", "", 1, ValueType::Tombstone),
159 InternalValue::from_components("c", "c", 0, ValueType::Value),
160 InternalValue::from_components("d", "", 1, ValueType::Tombstone),
161 InternalValue::from_components("d", "d", 0, ValueType::Value),
162 InternalValue::from_components("e", "", 1, ValueType::Tombstone),
163 InternalValue::from_components("e", "e", 0, ValueType::Value),
164 ];
165
166 let iter = Box::new(vec.iter().cloned().map(Ok));
167
168 let mut iter = MvccStream::new(iter);
169
170 assert_eq!(
171 InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
172 iter.next().unwrap()?,
173 );
174 assert_eq!(
175 InternalValue::from_components(*b"b", *b"", 1, ValueType::Tombstone),
176 iter.next().unwrap()?,
177 );
178 assert_eq!(
179 InternalValue::from_components(*b"c", *b"", 1, ValueType::Tombstone),
180 iter.next().unwrap()?,
181 );
182 assert_eq!(
183 InternalValue::from_components(*b"d", *b"", 1, ValueType::Tombstone),
184 iter.next().unwrap()?,
185 );
186 assert_eq!(
187 InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
188 iter.next().unwrap()?,
189 );
190 iter_closed!(iter);
191
192 test_reverse!(vec);
193
194 Ok(())
195 }
196
197 #[test]
198 #[allow(clippy::unwrap_used)]
199 fn mvcc_queue_almost_gone_2() -> crate::Result<()> {
200 let vec = [
201 InternalValue::from_components("a", "a", 0, ValueType::Value),
202 InternalValue::from_components("b", "", 1, ValueType::Tombstone),
203 InternalValue::from_components("c", "", 1, ValueType::Tombstone),
204 InternalValue::from_components("d", "", 1, ValueType::Tombstone),
205 InternalValue::from_components("e", "", 1, ValueType::Tombstone),
206 ];
207
208 let iter = Box::new(vec.iter().cloned().map(Ok));
209
210 let mut iter = MvccStream::new(iter);
211
212 assert_eq!(
213 InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
214 iter.next().unwrap()?,
215 );
216 assert_eq!(
217 InternalValue::from_components(*b"b", *b"", 1, ValueType::Tombstone),
218 iter.next().unwrap()?,
219 );
220 assert_eq!(
221 InternalValue::from_components(*b"c", *b"", 1, ValueType::Tombstone),
222 iter.next().unwrap()?,
223 );
224 assert_eq!(
225 InternalValue::from_components(*b"d", *b"", 1, ValueType::Tombstone),
226 iter.next().unwrap()?,
227 );
228 assert_eq!(
229 InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
230 iter.next().unwrap()?,
231 );
232 iter_closed!(iter);
233
234 test_reverse!(vec);
235
236 Ok(())
237 }
238
239 #[test]
240 #[allow(clippy::unwrap_used)]
241 fn mvcc_queue() -> crate::Result<()> {
242 let vec = [
243 InternalValue::from_components("a", "a", 0, ValueType::Value),
244 InternalValue::from_components("b", "b", 0, ValueType::Value),
245 InternalValue::from_components("c", "c", 0, ValueType::Value),
246 InternalValue::from_components("d", "d", 0, ValueType::Value),
247 InternalValue::from_components("e", "", 1, ValueType::Tombstone),
248 InternalValue::from_components("e", "e", 0, ValueType::Value),
249 ];
250
251 let iter = Box::new(vec.iter().cloned().map(Ok));
252
253 let mut iter = MvccStream::new(iter);
254
255 assert_eq!(
256 InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
257 iter.next().unwrap()?,
258 );
259 assert_eq!(
260 InternalValue::from_components(*b"b", *b"b", 0, ValueType::Value),
261 iter.next().unwrap()?,
262 );
263 assert_eq!(
264 InternalValue::from_components(*b"c", *b"c", 0, ValueType::Value),
265 iter.next().unwrap()?,
266 );
267 assert_eq!(
268 InternalValue::from_components(*b"d", *b"d", 0, ValueType::Value),
269 iter.next().unwrap()?,
270 );
271 assert_eq!(
272 InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
273 iter.next().unwrap()?,
274 );
275 iter_closed!(iter);
276
277 test_reverse!(vec);
278
279 Ok(())
280 }
281
282 #[test]
283 #[allow(clippy::unwrap_used)]
284 fn mvcc_queue_weak_almost_gone() -> crate::Result<()> {
285 let vec = [
286 InternalValue::from_components("a", "a", 0, ValueType::Value),
287 InternalValue::from_components("b", "", 1, ValueType::WeakTombstone),
288 InternalValue::from_components("b", "b", 0, ValueType::Value),
289 InternalValue::from_components("c", "", 1, ValueType::WeakTombstone),
290 InternalValue::from_components("c", "c", 0, ValueType::Value),
291 InternalValue::from_components("d", "", 1, ValueType::WeakTombstone),
292 InternalValue::from_components("d", "d", 0, ValueType::Value),
293 InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
294 InternalValue::from_components("e", "e", 0, ValueType::Value),
295 ];
296
297 let iter = Box::new(vec.iter().cloned().map(Ok));
298
299 let mut iter = MvccStream::new(iter);
300
301 assert_eq!(
302 InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
303 iter.next().unwrap()?,
304 );
305 assert_eq!(
306 InternalValue::from_components(*b"b", *b"", 1, ValueType::WeakTombstone),
307 iter.next().unwrap()?,
308 );
309 assert_eq!(
310 InternalValue::from_components(*b"c", *b"", 1, ValueType::WeakTombstone),
311 iter.next().unwrap()?,
312 );
313 assert_eq!(
314 InternalValue::from_components(*b"d", *b"", 1, ValueType::WeakTombstone),
315 iter.next().unwrap()?,
316 );
317 assert_eq!(
318 InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
319 iter.next().unwrap()?,
320 );
321 iter_closed!(iter);
322
323 test_reverse!(vec);
324
325 Ok(())
326 }
327
328 #[test]
329 #[allow(clippy::unwrap_used)]
330 fn mvcc_queue_weak_almost_gone_2() -> crate::Result<()> {
331 let vec = [
332 InternalValue::from_components("a", "a", 0, ValueType::Value),
333 InternalValue::from_components("b", "", 1, ValueType::WeakTombstone),
334 InternalValue::from_components("c", "", 1, ValueType::WeakTombstone),
335 InternalValue::from_components("d", "", 1, ValueType::WeakTombstone),
336 InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
337 ];
338
339 let iter = Box::new(vec.iter().cloned().map(Ok));
340
341 let mut iter = MvccStream::new(iter);
342
343 assert_eq!(
344 InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
345 iter.next().unwrap()?,
346 );
347 assert_eq!(
348 InternalValue::from_components(*b"b", *b"", 1, ValueType::WeakTombstone),
349 iter.next().unwrap()?,
350 );
351 assert_eq!(
352 InternalValue::from_components(*b"c", *b"", 1, ValueType::WeakTombstone),
353 iter.next().unwrap()?,
354 );
355 assert_eq!(
356 InternalValue::from_components(*b"d", *b"", 1, ValueType::WeakTombstone),
357 iter.next().unwrap()?,
358 );
359 assert_eq!(
360 InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
361 iter.next().unwrap()?,
362 );
363 iter_closed!(iter);
364
365 test_reverse!(vec);
366
367 Ok(())
368 }
369
370 #[test]
371 #[allow(clippy::unwrap_used)]
372 fn mvcc_queue_weak_reverse() -> crate::Result<()> {
373 let vec = [
374 InternalValue::from_components("a", "a", 0, ValueType::Value),
375 InternalValue::from_components("b", "b", 0, ValueType::Value),
376 InternalValue::from_components("c", "c", 0, ValueType::Value),
377 InternalValue::from_components("d", "d", 0, ValueType::Value),
378 InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
379 InternalValue::from_components("e", "e", 0, ValueType::Value),
380 ];
381
382 let iter = Box::new(vec.iter().cloned().map(Ok));
383
384 let mut iter = MvccStream::new(iter);
385
386 assert_eq!(
387 InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
388 iter.next().unwrap()?,
389 );
390 assert_eq!(
391 InternalValue::from_components(*b"b", *b"b", 0, ValueType::Value),
392 iter.next().unwrap()?,
393 );
394 assert_eq!(
395 InternalValue::from_components(*b"c", *b"c", 0, ValueType::Value),
396 iter.next().unwrap()?,
397 );
398 assert_eq!(
399 InternalValue::from_components(*b"d", *b"d", 0, ValueType::Value),
400 iter.next().unwrap()?,
401 );
402 assert_eq!(
403 InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
404 iter.next().unwrap()?,
405 );
406 iter_closed!(iter);
407
408 test_reverse!(vec);
409
410 Ok(())
411 }
412
413 #[test]
414 #[allow(clippy::unwrap_used)]
415 fn mvcc_stream_simple() -> crate::Result<()> {
416 #[rustfmt::skip]
417 let vec = stream![
418 "a", "new", "V",
419 "a", "old", "V",
420 ];
421
422 let iter = Box::new(vec.iter().cloned().map(Ok));
423
424 let mut iter = MvccStream::new(iter);
425
426 assert_eq!(
427 InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
428 iter.next().unwrap()?,
429 );
430 iter_closed!(iter);
431
432 test_reverse!(vec);
433
434 Ok(())
435 }
436
437 #[test]
438 #[allow(clippy::unwrap_used)]
439 fn mvcc_stream_simple_multi_keys() -> crate::Result<()> {
440 #[rustfmt::skip]
441 let vec = stream![
442 "a", "new", "V",
443 "a", "old", "V",
444 "b", "new", "V",
445 "b", "old", "V",
446 "c", "newnew", "V",
447 "c", "new", "V",
448 "c", "old", "V",
449 ];
450
451 let iter = Box::new(vec.iter().cloned().map(Ok));
452
453 let mut iter = MvccStream::new(iter);
454
455 assert_eq!(
456 InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
457 iter.next().unwrap()?,
458 );
459 assert_eq!(
460 InternalValue::from_components(*b"b", *b"new", 999, ValueType::Value),
461 iter.next().unwrap()?,
462 );
463 assert_eq!(
464 InternalValue::from_components(*b"c", *b"newnew", 999, ValueType::Value),
465 iter.next().unwrap()?,
466 );
467 iter_closed!(iter);
468
469 test_reverse!(vec);
470
471 Ok(())
472 }
473
474 #[test]
475 #[allow(clippy::unwrap_used)]
476 fn mvcc_stream_tombstone() -> crate::Result<()> {
477 #[rustfmt::skip]
478 let vec = stream![
479 "a", "", "T",
480 "a", "old", "V",
481 ];
482
483 let iter = Box::new(vec.iter().cloned().map(Ok));
484
485 let mut iter = MvccStream::new(iter);
486
487 assert_eq!(
488 InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
489 iter.next().unwrap()?,
490 );
491 iter_closed!(iter);
492
493 test_reverse!(vec);
494
495 Ok(())
496 }
497
498 #[test]
499 #[allow(clippy::unwrap_used)]
500 fn mvcc_stream_tombstone_multi_keys() -> crate::Result<()> {
501 #[rustfmt::skip]
502 let vec = stream![
503 "a", "", "T",
504 "a", "old", "V",
505 "b", "", "T",
506 "b", "old", "V",
507 "c", "", "T",
508 "c", "", "T",
509 "c", "old", "V",
510 ];
511
512 let iter = Box::new(vec.iter().cloned().map(Ok));
513
514 let mut iter = MvccStream::new(iter);
515
516 assert_eq!(
517 InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
518 iter.next().unwrap()?,
519 );
520 assert_eq!(
521 InternalValue::from_components(*b"b", *b"", 999, ValueType::Tombstone),
522 iter.next().unwrap()?,
523 );
524 assert_eq!(
525 InternalValue::from_components(*b"c", *b"", 999, ValueType::Tombstone),
526 iter.next().unwrap()?,
527 );
528 iter_closed!(iter);
529
530 test_reverse!(vec);
531
532 Ok(())
533 }
534
535 #[test]
536 #[allow(clippy::unwrap_used)]
537 fn mvcc_stream_weak_tombstone_simple() -> crate::Result<()> {
538 #[rustfmt::skip]
539 let vec = stream![
540 "a", "", "W",
541 "a", "old", "V",
542 ];
543
544 let iter = Box::new(vec.iter().cloned().map(Ok));
545
546 let mut iter = MvccStream::new(iter);
547
548 assert_eq!(
549 InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
550 iter.next().unwrap()?,
551 );
552 iter_closed!(iter);
553
554 test_reverse!(vec);
555
556 Ok(())
557 }
558
559 #[test]
560 #[allow(clippy::unwrap_used)]
561 fn mvcc_stream_weak_tombstone_resurrection() -> crate::Result<()> {
562 #[rustfmt::skip]
563 let vec = stream![
564 "a", "", "W",
565 "a", "new", "V",
566 "a", "old", "V",
567 ];
568
569 let iter = Box::new(vec.iter().cloned().map(Ok));
570
571 let mut iter = MvccStream::new(iter);
572
573 assert_eq!(
574 InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
575 iter.next().unwrap()?,
576 );
577 iter_closed!(iter);
578
579 test_reverse!(vec);
580
581 Ok(())
582 }
583
584 #[test]
585 #[allow(clippy::unwrap_used)]
586 fn mvcc_stream_weak_tombstone_priority() -> crate::Result<()> {
587 #[rustfmt::skip]
588 let vec = stream![
589 "a", "", "T",
590 "a", "", "W",
591 "a", "new", "V",
592 "a", "old", "V",
593 ];
594
595 let iter = Box::new(vec.iter().cloned().map(Ok));
596
597 let mut iter = MvccStream::new(iter);
598
599 assert_eq!(
600 InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
601 iter.next().unwrap()?,
602 );
603 iter_closed!(iter);
604
605 test_reverse!(vec);
606
607 Ok(())
608 }
609
610 #[test]
611 #[allow(clippy::unwrap_used)]
612 fn mvcc_stream_weak_tombstone_multi_keys() -> crate::Result<()> {
613 #[rustfmt::skip]
614 let vec = stream![
615 "a", "", "W",
616 "a", "old", "V",
617 "b", "", "W",
618 "b", "old", "V",
619 "c", "", "W",
620 "c", "old", "V",
621 ];
622
623 let iter = Box::new(vec.iter().cloned().map(Ok));
624
625 let mut iter = MvccStream::new(iter);
626
627 assert_eq!(
628 InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
629 iter.next().unwrap()?,
630 );
631 assert_eq!(
632 InternalValue::from_components(*b"b", *b"", 999, ValueType::WeakTombstone),
633 iter.next().unwrap()?,
634 );
635 assert_eq!(
636 InternalValue::from_components(*b"c", *b"", 999, ValueType::WeakTombstone),
637 iter.next().unwrap()?,
638 );
639 iter_closed!(iter);
640
641 test_reverse!(vec);
642
643 Ok(())
644 }
645}