use std::sync::Arc;

use crate::errors::{RainDBError, RainDBResult};
use crate::iterator::CachingIterator;
use crate::key::InternalKey;
use crate::table_cache::TableCache;
use crate::tables::table::TwoLevelIterator;
use crate::tables::Table;
use crate::{RainDbIterator, ReadOptions};

use super::file_metadata::FileMetadata;

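/// Iterates the entries in a list of sorted table files as if they were a single, contiguous
/// sorted run.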
pub(crate) struct FilesEntryIterator {
    file_list: Vec<Arc<FileMetadata>>,

    current_file_index: usize,

    current_table_iter: Option<TwoLevelIterator>,

    table_cache: Arc<TableCache>,

    read_options: ReadOptions,
}

impl FilesEntryIterator {
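    /// Create a new instance of [`FilesEntryIterator`].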
    pub(crate) fn new(
        file_list: Vec<Arc<FileMetadata>>,
        table_cache: Arc<TableCache>,
        read_options: ReadOptions,
    ) -> Self {
        Self {
            file_list,
            current_file_index: 0,
            current_table_iter: None,
            table_cache,
            read_options,
        }
    }
}

impl FilesEntryIterator {
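    /// Point the iterator at the table file at the specified index, clearing the current table
    /// iterator if the index is `None` or one past the end of the file list.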
    fn set_table_iter(&mut self, maybe_new_index: Option<usize>) -> RainDBResult<()> {
        if maybe_new_index.is_none() || maybe_new_index.unwrap() == self.file_list.len() {
            self.current_file_index = self.file_list.len();
            self.current_table_iter = None;
            return Ok(());
        }

        if let Some(new_index) = maybe_new_index {
            if new_index == self.current_file_index && self.current_table_iter.is_some() {
                return Ok(());
            }

            let table = self
                .table_cache
                .find_table(self.file_list[new_index].file_number())?;
            self.current_table_iter = Some(Table::iter_with(table, self.read_options.clone()));
            self.current_file_index = new_index;
        }
        Ok(())
    }

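    /// Move forward through the file list until the current table iterator is positioned at a
    /// valid entry or the end of the file list is reached.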
    fn skip_empty_table_files_forward(&mut self) -> RainDBResult<()> {
        while self.current_table_iter.is_none()
            || !self.current_table_iter.as_mut().unwrap().is_valid()
        {
            if self.current_file_index == self.file_list.len() {
                self.current_table_iter = None;
                return Ok(());
            }

            self.set_table_iter(Some(self.current_file_index + 1))?;

            if self.current_table_iter.is_some() {
                self.current_table_iter.as_mut().unwrap().seek_to_first()?;
            }
        }

        Ok(())
    }

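    /// Move backward through the file list until the current table iterator is positioned at a
    /// valid entry or the start of the file list is reached.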
    fn skip_empty_table_files_backward(&mut self) -> RainDBResult<()> {
        while self.current_table_iter.is_none()
            || !self.current_table_iter.as_mut().unwrap().is_valid()
        {
            if self.current_file_index == 0 {
                self.current_table_iter = None;
                return Ok(());
            }

            self.set_table_iter(Some(self.current_file_index - 1))?;

            if self.current_table_iter.is_some() {
                self.current_table_iter.as_mut().unwrap().seek_to_last()?;
            }
        }

        Ok(())
    }
}

impl RainDbIterator for FilesEntryIterator {
    type Key = InternalKey;

    type Error = RainDBError;

    fn is_valid(&self) -> bool {
        self.current_table_iter.is_some() && self.current_table_iter.as_ref().unwrap().is_valid()
    }

    fn seek(&mut self, target: &Self::Key) -> Result<(), Self::Error> {
        let maybe_new_index =
            super::utils::find_file_with_upper_bound_range(&self.file_list, target);
        self.set_table_iter(maybe_new_index)?;

        if self.current_table_iter.is_some() {
            self.current_table_iter.as_mut().unwrap().seek(target)?;
        }

        self.skip_empty_table_files_forward()?;

        Ok(())
    }

    fn seek_to_first(&mut self) -> Result<(), Self::Error> {
        let new_file_index = 0;
        self.set_table_iter(Some(new_file_index))?;

        if self.current_table_iter.is_some() {
            self.current_table_iter.as_mut().unwrap().seek_to_first()?;
        }

        self.skip_empty_table_files_forward()?;

        Ok(())
    }

    fn seek_to_last(&mut self) -> Result<(), Self::Error> {
        let new_file_index = if self.file_list.is_empty() {
            0
        } else {
            self.file_list.len() - 1
        };
        self.set_table_iter(Some(new_file_index))?;

        if self.current_table_iter.is_some() {
            self.current_table_iter.as_mut().unwrap().seek_to_last()?;
        }

        self.skip_empty_table_files_backward()?;

        Ok(())
    }

    fn next(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        if !self.is_valid() {
            return None;
        }

        if self.current_table_iter.as_mut().unwrap().next().is_none() {
            if let Err(error) = self.skip_empty_table_files_forward() {
                log::error!(
                    "There was an error skipping forward. Original error: {}",
                    error
                );
                return None;
            }
        }

        if self.is_valid() {
            return self.current_table_iter.as_mut().unwrap().current();
        }

        None
    }

    fn prev(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        if !self.is_valid() {
            return None;
        }

        if self.current_table_iter.as_mut().unwrap().prev().is_none() {
            if let Err(error) = self.skip_empty_table_files_backward() {
                log::error!(
                    "There was an error skipping backward. Original error: {}",
                    error
                );
                return None;
            }
        }

        if self.is_valid() {
            return self.current_table_iter.as_mut().unwrap().current();
        }

        None
    }

    fn current(&self) -> Option<(&Self::Key, &Vec<u8>)> {
        if !self.is_valid() {
            return None;
        }

        self.current_table_iter.as_ref().unwrap().current()
    }
}

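/// The direction that a [`MergingIterator`] is currently iterating in.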
enum IterationDirection {
    Forward,
    Backward,
}

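/// An iterator that merges the entries of multiple child iterators into a single stream, yielding
/// them in sorted key order.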
pub(crate) struct MergingIterator {
    iterators: Vec<CachingIterator>,

    direction: IterationDirection,

    current_iterator_index: Option<usize>,

    errors: Vec<Option<RainDBError>>,

    cleanup_callbacks: Vec<Box<dyn FnOnce()>>,
}

impl MergingIterator {
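    /// Create a new instance of [`MergingIterator`] from the provided child iterators.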
    pub(crate) fn new(
        iterators: Vec<Box<dyn RainDbIterator<Key = InternalKey, Error = RainDBError>>>,
    ) -> Self {
        let wrapped_iterators: Vec<CachingIterator> =
            iterators.into_iter().map(CachingIterator::new).collect();
        let errors = vec![None; wrapped_iterators.len()];

        Self {
            iterators: wrapped_iterators,
            direction: IterationDirection::Forward,
            current_iterator_index: None,
            errors,
            cleanup_callbacks: vec![],
        }
    }

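    /// Return the first error encountered by any of the child iterators, removing it from the
    /// stored errors.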
    pub(crate) fn get_error(&mut self) -> Option<RainDBError> {
        for maybe_error in self.errors.iter_mut() {
            if maybe_error.is_some() {
                return maybe_error.take();
            }
        }

        None
    }

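    /// Register a callback to run when the iterator is dropped.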
    pub(crate) fn register_cleanup_method(&mut self, cleanup: Box<dyn FnOnce()>) {
        self.cleanup_callbacks.push(cleanup);
    }
}

impl MergingIterator {
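    /// Set the current iterator to the child iterator whose current entry has the smallest key.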
    fn find_smallest(&mut self) {
        if self.iterators.is_empty() {
            return;
        }

        let mut maybe_smallest_iterator_index: Option<usize> = None;
        for (index, iter) in self.iterators.iter().enumerate() {
            if !iter.is_valid() {
                continue;
            }

            if let Some((key, _)) = iter.current() {
                if maybe_smallest_iterator_index.is_none() {
                    maybe_smallest_iterator_index = Some(index);
                } else if let Some(smallest_iterator_index) = maybe_smallest_iterator_index {
                    let current_smallest_key =
                        self.iterators[smallest_iterator_index].current().unwrap().0;

                    if key < current_smallest_key {
                        maybe_smallest_iterator_index = Some(index);
                    }
                }
            }
        }

        self.current_iterator_index = maybe_smallest_iterator_index;
    }

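    /// Set the current iterator to the child iterator whose current entry has the largest key.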
    fn find_largest(&mut self) {
        if self.iterators.is_empty() {
            return;
        }

        let mut maybe_largest_iterator_index: Option<usize> = None;
        for (index, iter) in self.iterators.iter().rev().enumerate() {
            if !iter.is_valid() {
                continue;
            }

            if let Some((key, _)) = iter.current() {
                if maybe_largest_iterator_index.is_none() {
                    maybe_largest_iterator_index = Some(self.iterators.len() - index - 1);
                } else if let Some(largest_iterator_index) = maybe_largest_iterator_index {
                    let current_largest_key =
                        self.iterators[largest_iterator_index].current().unwrap().0;

                    if key > current_largest_key {
                        maybe_largest_iterator_index = Some(self.iterators.len() - index - 1);
                    }
                }
            }
        }

        self.current_iterator_index = maybe_largest_iterator_index;
    }

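    /// Log an error from the child iterator at the specified index and store it for later
    /// retrieval via [`MergingIterator::get_error`].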
    fn save_error(&mut self, iterator_index: usize, error: RainDBError) {
        log::error!(
            "An error occurred during a merge iteration. Error: {}",
            &error
        );
        self.errors[iterator_index] = Some(error);
    }

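    /// Move the child iterator currently being read from to its next entry.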
    fn advance_current_iterator(&mut self) {
        if let Some(current_iter_index) = self.current_iterator_index {
            let current_iter = &mut self.iterators[current_iter_index];
            current_iter.next();
        }
    }

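    /// Move the child iterator currently being read from to its previous entry.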
    fn reverse_current_iterator(&mut self) {
        if let Some(current_iter_index) = self.current_iterator_index {
            let current_iter = &mut self.iterators[current_iter_index];
            current_iter.prev();
        }
    }
}

impl RainDbIterator for MergingIterator {
    type Key = InternalKey;

    type Error = RainDBError;

    fn is_valid(&self) -> bool {
        self.current_iterator_index.is_some()
    }

    fn seek(&mut self, target: &Self::Key) -> Result<(), Self::Error> {
        for index in 0..self.iterators.len() {
            let iter = &mut self.iterators[index];
            let seek_result = iter.seek(target);
            if let Err(error) = seek_result {
                self.save_error(index, error);
            }
        }

        self.find_smallest();
        self.direction = IterationDirection::Forward;

        Ok(())
    }

    fn seek_to_first(&mut self) -> Result<(), Self::Error> {
        for index in 0..self.iterators.len() {
            let iter = &mut self.iterators[index];
            let seek_result = iter.seek_to_first();
            if let Err(error) = seek_result {
                self.save_error(index, error);
            }
        }

        self.find_smallest();
        self.direction = IterationDirection::Forward;

        Ok(())
    }

    fn seek_to_last(&mut self) -> Result<(), Self::Error> {
        for index in 0..self.iterators.len() {
            let iter = &mut self.iterators[index];
            let seek_result = iter.seek_to_last();
            if let Err(error) = seek_result {
                self.save_error(index, error);
            }
        }

        self.find_largest();
        self.direction = IterationDirection::Backward;

        Ok(())
    }

    fn next(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        if let IterationDirection::Backward = self.direction {
            // The iterator was moving backward, so the other child iterators are positioned at or
            // before the current key. Re-position them to the first entry after the current key
            // before resuming forward iteration.
            let current_key = self.current().unwrap().0.clone();
            for index in 0..self.iterators.len() {
                let mut maybe_error: Option<RainDBError> = None;
                // Skip the child iterator that is currently being read from.
                if let Some(current_index) = self.current_iterator_index {
                    if index == current_index {
                        continue;
                    }
                }

                let iter = &mut self.iterators[index];
                let seek_result = iter.seek(&current_key);
                if let Err(error) = seek_result {
                    maybe_error = Some(error);
                }

                if iter.is_valid() && (*iter.current().unwrap().0) == current_key {
                    iter.next();
                }

                if let Some(error) = maybe_error {
                    self.save_error(index, error);
                }
            }

            self.direction = IterationDirection::Forward;
        }

        self.advance_current_iterator();
        self.find_smallest();

        self.current()
    }

    fn prev(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        if let IterationDirection::Forward = self.direction {
            // The iterator was moving forward, so the other child iterators are positioned after
            // the current key. Re-position them to the last entry before the current key before
            // resuming backward iteration.
            for index in 0..self.iterators.len() {
                let current_key = self.current().unwrap().0.clone();
                let mut maybe_error: Option<RainDBError> = None;
                // Skip the child iterator that is currently being read from.
                if let Some(current_index) = self.current_iterator_index {
                    if index == current_index {
                        continue;
                    }
                }

                let iter = &mut self.iterators[index];
                let seek_result = iter.seek(&current_key);
                if let Err(error) = seek_result {
                    maybe_error = Some(error);
                }

                if iter.is_valid() {
                    // The child iterator is at the first entry at or after the current key, so
                    // step back once to land on the last entry before it.
                    iter.prev();
                } else {
                    // The child iterator has no entries at or after the current key, so all of
                    // its entries come before it. Position it at its last entry.
                    if let Err(error) = iter.seek_to_last() {
                        maybe_error = Some(error);
                    }
                }

                if let Some(error) = maybe_error {
                    self.save_error(index, error);
                }
            }

            self.direction = IterationDirection::Backward;
        }

        self.reverse_current_iterator();
        self.find_largest();

        self.current()
    }

    fn current(&self) -> Option<(&Self::Key, &Vec<u8>)> {
        if let Some(current_iter_index) = self.current_iterator_index {
            let current_iter = &self.iterators[current_iter_index];
            return current_iter.current();
        }

        None
    }
}

impl Drop for MergingIterator {
    fn drop(&mut self) {
        for callback in self.cleanup_callbacks.drain(..) {
            callback();
        }
    }
}

#[cfg(test)]
mod files_entry_iterator_tests {
    use pretty_assertions::assert_eq;
    use std::rc::Rc;

    use crate::tables::TableBuilder;
    use crate::versioning::version::Version;
    use crate::{DbOptions, Operation};

    use super::*;

    #[test]
    fn files_entry_iterator_with_an_empty_file_list_does_not_become_valid() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let version = create_test_version(options, &table_cache);
        let mut iter = FilesEntryIterator::new(
            version.files[4].clone(),
            Arc::clone(&table_cache),
            ReadOptions::default(),
        );

        assert!(!iter.is_valid());
        assert!(iter.next().is_none());
        assert!(iter.prev().is_none());
        assert!(iter
            .seek(&InternalKey::new(
                "a".as_bytes().to_vec(),
                100,
                Operation::Put
            ))
            .is_ok());
        assert!(iter.current().is_none());
        assert!(iter.seek_to_first().is_ok());
        assert!(iter.current().is_none());
        assert!(iter.seek_to_last().is_ok());
        assert!(iter.current().is_none());
    }

    #[test]
    fn files_entry_iterator_can_seek_to_specific_targets() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let version = create_test_version(options, &table_cache);
        let mut iter = FilesEntryIterator::new(
            version.files[1].clone(),
            Arc::clone(&table_cache),
            ReadOptions::default(),
        );

        assert!(
            !iter.is_valid(),
            "The iterator should not be valid until its first seek"
        );
        assert!(iter
            .seek(&InternalKey::new(
                "h".as_bytes().to_vec(),
                100,
                Operation::Put
            ))
            .is_ok());
        let (actual_key, actual_val) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("h".as_bytes().to_vec(), 86, Operation::Put)
        );
        assert_eq!(actual_val, "h".as_bytes());

        assert!(iter
            .seek(&InternalKey::new(
                "w".as_bytes().to_vec(),
                20,
                Operation::Put
            ))
            .is_ok());
        let (actual_key, actual_val) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("x".as_bytes().to_vec(), 78, Operation::Delete)
        );
        assert_eq!(actual_val, &[]);

        let (actual_key, actual_val) = iter.next().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("y".as_bytes().to_vec(), 79, Operation::Put)
        );
        assert_eq!(actual_val, "y".as_bytes());

        assert!(iter.seek_to_first().is_ok());
        let (actual_key, actual_val) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("g".as_bytes().to_vec(), 85, Operation::Put)
        );
        assert_eq!(actual_val, "g".as_bytes());

        assert!(iter.seek_to_last().is_ok());
        let (actual_key, actual_val) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("y".as_bytes().to_vec(), 79, Operation::Put)
        );
        assert_eq!(actual_val, "y".as_bytes());
    }

    #[test]
    fn files_entry_iterator_can_be_iterated_forward_completely() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let version = create_test_version(options, &table_cache);
        let mut iter = FilesEntryIterator::new(
            version.files[1].clone(),
            Arc::clone(&table_cache),
            ReadOptions::default(),
        );

        assert!(iter.seek_to_first().is_ok());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("g".as_bytes().to_vec(), 85, Operation::Put)
        );

        while iter.next().is_some() {
            assert!(
                iter.current().is_some(),
                "Iteration did not yield a value but one was expected."
            );
        }

        assert!(
            iter.next().is_none(),
            "Calling `next` after consuming all the values should not return a value"
        );
        assert!(
            !iter.is_valid(),
            "The iterator should not be valid after moving past the last entry"
        );
    }

    #[test]
    fn files_entry_iterator_can_be_iterated_backward_completely() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let version = create_test_version(options, &table_cache);
        let mut iter = FilesEntryIterator::new(
            version.files[1].clone(),
            Arc::clone(&table_cache),
            ReadOptions::default(),
        );

        assert!(iter.seek_to_last().is_ok());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("y".as_bytes().to_vec(), 79, Operation::Put)
        );

        while iter.prev().is_some() {
            assert!(
                iter.current().is_some(),
                "Iteration did not yield a value but one was expected."
            );
        }

        assert!(
            iter.prev().is_none(),
            "Calling `prev` after consuming all the values should not return a value"
        );
        assert!(
            !iter.is_valid(),
            "The iterator should not be valid after moving past the first entry"
        );
    }

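    /// Create a [`Version`] populated with a few table files for testing.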
    fn create_test_version(db_options: DbOptions, table_cache: &Arc<TableCache>) -> Version {
        let mut version = Version::new(db_options.clone(), table_cache, 200, 30);

        let entries = vec![
            (
                ("g".as_bytes().to_vec(), Operation::Put),
                ("g".as_bytes().to_vec()),
            ),
            (
                ("h".as_bytes().to_vec(), Operation::Put),
                ("h".as_bytes().to_vec()),
            ),
            (
                ("i".as_bytes().to_vec(), Operation::Put),
                ("i".as_bytes().to_vec()),
            ),
            (
                ("j".as_bytes().to_vec(), Operation::Put),
                ("j".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 85, 59);
        version.files[1].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("o".as_bytes().to_vec(), Operation::Put),
                ("o".as_bytes().to_vec()),
            ),
            (
                ("r".as_bytes().to_vec(), Operation::Put),
                ("r".as_bytes().to_vec()),
            ),
            (
                ("s".as_bytes().to_vec(), Operation::Put),
                ("s".as_bytes().to_vec()),
            ),
            (
                ("t".as_bytes().to_vec(), Operation::Put),
                ("t".as_bytes().to_vec()),
            ),
            (
                ("u".as_bytes().to_vec(), Operation::Put),
                ("u".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 80, 58);
        version.files[1].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("v".as_bytes().to_vec(), Operation::Put),
                ("v".as_bytes().to_vec()),
            ),
            (
                ("w".as_bytes().to_vec(), Operation::Put),
                ("w".as_bytes().to_vec()),
            ),
            (("x".as_bytes().to_vec(), Operation::Delete), vec![]),
            (
                ("y".as_bytes().to_vec(), Operation::Put),
                ("y".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options, entries, 76, 57);
        version.files[1].push(Arc::new(table_file_meta));

        version
    }

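    /// Build a table file from the provided entries and return its [`FileMetadata`].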
    fn create_table(
        db_options: DbOptions,
        entries: Vec<((Vec<u8>, Operation), Vec<u8>)>,
        starting_sequence_num: u64,
        file_number: u64,
    ) -> FileMetadata {
        let smallest_key = InternalKey::new(
            entries.first().unwrap().0 .0.clone(),
            starting_sequence_num,
            entries.first().unwrap().0 .1,
        );
        let largest_key = InternalKey::new(
            entries.last().unwrap().0 .0.clone(),
            starting_sequence_num + (entries.len() as u64) - 1,
            entries.last().unwrap().0 .1,
        );

        let mut table_builder = TableBuilder::new(db_options, file_number).unwrap();
        let mut curr_sequence_num = starting_sequence_num;
        for ((user_key, operation), value) in entries {
            table_builder
                .add_entry(
                    Rc::new(InternalKey::new(user_key, curr_sequence_num, operation)),
                    &value,
                )
                .unwrap();
            curr_sequence_num += 1;
        }

        table_builder.finalize().unwrap();

        let mut file_meta = FileMetadata::new(file_number);
        file_meta.set_smallest_key(Some(smallest_key));
        file_meta.set_largest_key(Some(largest_key));
        file_meta.set_file_size(table_builder.file_size());

        file_meta
    }
}

#[cfg(test)]
mod merging_iterator_tests {
    use pretty_assertions::assert_eq;
    use std::cell::RefCell;
    use std::rc::Rc;

    use crate::tables::TableBuilder;
    use crate::versioning::version::Version;
    use crate::{DbOptions, Operation};

    use super::*;

    #[test]
    fn with_an_empty_list_of_iterators_does_not_become_valid() {
        let mut iter = MergingIterator::new(vec![]);

        assert!(!iter.is_valid());
        assert!(iter.next().is_none());
        assert!(iter.prev().is_none());
        assert!(iter
            .seek(&InternalKey::new(
                "a".as_bytes().to_vec(),
                100,
                Operation::Put
            ))
            .is_ok());
        assert!(iter.current().is_none());
        assert!(iter.seek_to_first().is_ok());
        assert!(iter.current().is_none());
        assert!(iter.seek_to_last().is_ok());
        assert!(iter.current().is_none());
        assert!(!iter.is_valid());
    }

    #[test]
    fn can_be_iterated_forward_completely() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let version = create_test_version(options, &table_cache);
        let version_iterators = version
            .get_representative_iterators(&ReadOptions::default())
            .unwrap();
        let mut iter = MergingIterator::new(version_iterators);

        assert!(iter.seek_to_first().is_ok());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("a".as_bytes().to_vec(), 90, Operation::Put)
        );

        let mut expected_value_counter: usize = 2;
        while iter.next().is_some() {
            let (current_key, current_val) = iter.current().unwrap();
            if current_key.get_operation() == Operation::Put {
                assert_eq!(
                    current_val,
                    expected_value_counter.to_string().as_bytes(),
                    "Expecting value to be {} but got {}",
                    expected_value_counter.to_string(),
                    String::from_utf8_lossy(current_val)
                );
            }

            expected_value_counter += 1;
        }

        assert!(
            iter.next().is_none(),
            "Calling `next` after consuming all the values should not return a value"
        );
        assert!(
            !iter.is_valid(),
            "The iterator should not be valid after moving past the last entry"
        );
    }

    #[test]
    fn can_be_iterated_backward_completely() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let version = create_test_version(options, &table_cache);
        let version_iterators = version
            .get_representative_iterators(&ReadOptions::default())
            .unwrap();
        let mut iter = MergingIterator::new(version_iterators);

        assert!(iter.seek_to_last().is_ok());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("z7".as_bytes().to_vec(), 51, Operation::Put)
        );

        let mut expected_value_counter: usize = 43;
        while iter.prev().is_some() {
            let (current_key, current_val) = iter.current().unwrap();
            if current_key.get_operation() == Operation::Put {
                assert_eq!(
                    current_val,
                    expected_value_counter.to_string().as_bytes(),
                    "Expecting value to be {} but got {}",
                    expected_value_counter.to_string(),
                    String::from_utf8_lossy(current_val)
                );
            }

            expected_value_counter -= 1;
        }

        assert!(
            iter.prev().is_none(),
            "Calling `prev` after consuming all the values should not return a value"
        );
        assert!(
            !iter.is_valid(),
            "The iterator should not be valid after moving past the first entry"
        );
    }

    #[test]
    fn can_seek_randomly() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let version = create_test_version(options, &table_cache);
        let version_iterators = version
            .get_representative_iterators(&ReadOptions::default())
            .unwrap();
        let mut iter = MergingIterator::new(version_iterators);

        assert!(
            !iter.is_valid(),
            "The iterator should not be valid until its first seek"
        );
        assert!(iter
            .seek(&InternalKey::new(
                "h".as_bytes().to_vec(),
                100,
                Operation::Put
            ))
            .is_ok());
        let (actual_key, actual_val) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("h".as_bytes().to_vec(), 86, Operation::Put)
        );
        assert_eq!(actual_val, "14".as_bytes());

        assert!(
            iter.next().is_some(),
            "Expected to seek to next after a random seek"
        );
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("i".as_bytes().to_vec(), 87, Operation::Put)
        );

        assert!(
            iter.seek(&InternalKey::new(
                "z4".as_bytes().to_vec(),
                50,
                Operation::Put
            ))
            .is_ok(),
            "Expected to seek after random iteration"
        );
        let (actual_key, actual_val) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("z4".as_bytes().to_vec(), 48, Operation::Put)
        );
        assert_eq!(actual_val, "41".as_bytes());
    }

    #[test]
    fn can_iterate_backwards_after_iterating_forward() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let version = create_test_version(options, &table_cache);
        let version_iterators = version
            .get_representative_iterators(&ReadOptions::default())
            .unwrap();
        let mut iter = MergingIterator::new(version_iterators);

        assert!(iter
            .seek(&InternalKey::new(
                "f1".as_bytes().to_vec(),
                200,
                Operation::Put
            ))
            .is_ok());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("f1".as_bytes().to_vec(), 106, Operation::Put)
        );

        assert!(iter.next().is_some());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("f2".as_bytes().to_vec(), 107, Operation::Put),
            "Expected to seek to next after a random seek"
        );

        assert!(iter.prev().is_some());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("f1".as_bytes().to_vec(), 106, Operation::Put),
            "Expected to seek to previous from an iterator that was iterating forward"
        );

        assert!(iter.prev().is_some());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("f".as_bytes().to_vec(), 103, Operation::Put),
            "Expected other child iterators to be correctly set after changing iteration direction"
        );
    }

    #[test]
    fn can_iterate_forwards_after_iterating_backward() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let version = create_test_version(options, &table_cache);
        let version_iterators = version
            .get_representative_iterators(&ReadOptions::default())
            .unwrap();
        let mut iter = MergingIterator::new(version_iterators);

        assert!(iter
            .seek(&InternalKey::new(
                "y".as_bytes().to_vec(),
                58,
                Operation::Put
            ))
            .is_ok());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("y".as_bytes().to_vec(), 58, Operation::Put)
        );

        assert!(iter.prev().is_some());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("y".as_bytes().to_vec(), 79, Operation::Put),
            "Expected to seek to prev after a random seek"
        );

        assert!(iter.next().is_some());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("y".as_bytes().to_vec(), 58, Operation::Put),
            "Expected to seek to next from an iterator that was iterating backward"
        );

        assert!(iter.next().is_some());
        let (actual_key, _) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new("z1".as_bytes().to_vec(), 45, Operation::Put),
            "Expected other child iterators to be correctly set after changing iteration direction"
        );
    }

    #[test]
    fn calls_cleanup_callbacks_when_dropped() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let version = create_test_version(options, &table_cache);
        let version_iterators = version
            .get_representative_iterators(&ReadOptions::default())
            .unwrap();
        let num_times_callback_called: Rc<RefCell<usize>> = Rc::new(RefCell::new(0));
        let cloned_counter = Rc::clone(&num_times_callback_called);
        let callback = move || {
            *cloned_counter.borrow_mut() += 1;
        };
        let mut iter = MergingIterator::new(version_iterators);

        iter.register_cleanup_method(Box::new(callback));
        drop(iter);

        assert_eq!(
            *num_times_callback_called.borrow(),
            1,
            "Expected cleanup callbacks to have been called once."
        );
    }

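    /// Create a [`Version`] with table files at multiple levels for testing.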
    fn create_test_version(db_options: DbOptions, table_cache: &Arc<TableCache>) -> Version {
        let mut version = Version::new(db_options.clone(), table_cache, 200, 30);

        let entries = vec![
            (
                ("a".as_bytes().to_vec(), Operation::Put),
                ("1".as_bytes().to_vec()),
            ),
            (
                ("b".as_bytes().to_vec(), Operation::Put),
                ("2".as_bytes().to_vec()),
            ),
            (
                ("c".as_bytes().to_vec(), Operation::Put),
                ("4".as_bytes().to_vec()),
            ),
            (
                ("d".as_bytes().to_vec(), Operation::Put),
                ("6".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 90, 60);
        version.files[0].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("c".as_bytes().to_vec(), Operation::Put),
                ("3".as_bytes().to_vec()),
            ),
            (
                ("d".as_bytes().to_vec(), Operation::Put),
                ("5".as_bytes().to_vec()),
            ),
            (
                ("e".as_bytes().to_vec(), Operation::Put),
                ("7".as_bytes().to_vec()),
            ),
            (
                ("f".as_bytes().to_vec(), Operation::Put),
                ("9".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 100, 61);
        version.files[0].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("f".as_bytes().to_vec(), Operation::Put),
                ("8".as_bytes().to_vec()),
            ),
            (
                ("f1".as_bytes().to_vec(), Operation::Put),
                ("10".as_bytes().to_vec()),
            ),
            (
                ("f2".as_bytes().to_vec(), Operation::Put),
                ("11".as_bytes().to_vec()),
            ),
            (
                ("f3".as_bytes().to_vec(), Operation::Put),
                ("12".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 105, 62);
        version.files[0].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("g".as_bytes().to_vec(), Operation::Put),
                ("13".as_bytes().to_vec()),
            ),
            (
                ("h".as_bytes().to_vec(), Operation::Put),
                ("14".as_bytes().to_vec()),
            ),
            (
                ("i".as_bytes().to_vec(), Operation::Put),
                ("15".as_bytes().to_vec()),
            ),
            (
                ("j".as_bytes().to_vec(), Operation::Put),
                ("16".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 85, 59);
        version.files[1].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("o".as_bytes().to_vec(), Operation::Put),
                ("21".as_bytes().to_vec()),
            ),
            (
                ("r".as_bytes().to_vec(), Operation::Put),
                ("24".as_bytes().to_vec()),
            ),
            (
                ("s".as_bytes().to_vec(), Operation::Put),
                ("26".as_bytes().to_vec()),
            ),
            (
                ("t".as_bytes().to_vec(), Operation::Put),
                ("28".as_bytes().to_vec()),
            ),
            (
                ("u".as_bytes().to_vec(), Operation::Put),
                ("29".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 80, 58);
        version.files[1].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("v".as_bytes().to_vec(), Operation::Put),
                ("30".as_bytes().to_vec()),
            ),
            (
                ("w".as_bytes().to_vec(), Operation::Put),
                ("32".as_bytes().to_vec()),
            ),
            (("x".as_bytes().to_vec(), Operation::Delete), vec![]),
            (
                ("y".as_bytes().to_vec(), Operation::Put),
                ("36".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 76, 57);
        version.files[1].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("k".as_bytes().to_vec(), Operation::Put),
                ("17".as_bytes().to_vec()),
            ),
            (
                ("l".as_bytes().to_vec(), Operation::Put),
                ("18".as_bytes().to_vec()),
            ),
            (
                ("m".as_bytes().to_vec(), Operation::Put),
                ("19".as_bytes().to_vec()),
            ),
            (
                ("n".as_bytes().to_vec(), Operation::Put),
                ("20".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 65, 55);
        version.files[2].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("o".as_bytes().to_vec(), Operation::Put),
                ("22".as_bytes().to_vec()),
            ),
            (
                ("p".as_bytes().to_vec(), Operation::Put),
                ("23".as_bytes().to_vec()),
            ),
            (
                ("r".as_bytes().to_vec(), Operation::Put),
                ("25".as_bytes().to_vec()),
            ),
            (
                ("s".as_bytes().to_vec(), Operation::Put),
                ("27".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 60, 54);
        version.files[2].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("v".as_bytes().to_vec(), Operation::Put),
                ("31".as_bytes().to_vec()),
            ),
            (
                ("w".as_bytes().to_vec(), Operation::Put),
                ("33".as_bytes().to_vec()),
            ),
            (
                ("x".as_bytes().to_vec(), Operation::Put),
                ("35".as_bytes().to_vec()),
            ),
            (
                ("y".as_bytes().to_vec(), Operation::Put),
                ("37".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 55, 53);
        version.files[2].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("z1".as_bytes().to_vec(), Operation::Put),
                ("38".as_bytes().to_vec()),
            ),
            (
                ("z2".as_bytes().to_vec(), Operation::Put),
                ("39".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 45, 52);
        version.files[3].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("z3".as_bytes().to_vec(), Operation::Put),
                ("40".as_bytes().to_vec()),
            ),
            (
                ("z4".as_bytes().to_vec(), Operation::Put),
                ("41".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 47, 51);
        version.files[3].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("z5".as_bytes().to_vec(), Operation::Put),
                ("42".as_bytes().to_vec()),
            ),
            (
                ("z6".as_bytes().to_vec(), Operation::Put),
                ("43".as_bytes().to_vec()),
            ),
            (
                ("z7".as_bytes().to_vec(), Operation::Put),
                ("44".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options, entries, 49, 50);
        version.files[3].push(Arc::new(table_file_meta));

        version
    }

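    /// Build a table file from the provided entries and return its [`FileMetadata`].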
    fn create_table(
        db_options: DbOptions,
        entries: Vec<((Vec<u8>, Operation), Vec<u8>)>,
        starting_sequence_num: u64,
        file_number: u64,
    ) -> FileMetadata {
        let smallest_key = InternalKey::new(
            entries.first().unwrap().0 .0.clone(),
            starting_sequence_num,
            entries.first().unwrap().0 .1,
        );
        let largest_key = InternalKey::new(
            entries.last().unwrap().0 .0.clone(),
            starting_sequence_num + (entries.len() as u64) - 1,
            entries.last().unwrap().0 .1,
        );

        let mut table_builder = TableBuilder::new(db_options, file_number).unwrap();
        let mut curr_sequence_num = starting_sequence_num;
        for ((user_key, operation), value) in entries {
            table_builder
                .add_entry(
                    Rc::new(InternalKey::new(user_key, curr_sequence_num, operation)),
                    &value,
                )
                .unwrap();
            curr_sequence_num += 1;
        }

        table_builder.finalize().unwrap();

        let mut file_meta = FileMetadata::new(file_number);
        file_meta.set_smallest_key(Some(smallest_key));
        file_meta.set_largest_key(Some(largest_key));
        file_meta.set_file_size(table_builder.file_size());

        file_meta
    }
}