raindb/versioning/file_iterators.rs

1// Copyright (c) 2022 Google LLC
2//
3// Use of this source code is governed by an MIT-style
4// license that can be found in the LICENSE file or at
5// https://opensource.org/licenses/MIT.
6
/*!
This module contains iterators and utilities for creating iterators that work over a set of table
file metadata (i.e. [`FileMetadata`]).
*/
11
12use std::sync::Arc;
13
14use crate::errors::{RainDBError, RainDBResult};
15use crate::iterator::CachingIterator;
16use crate::key::InternalKey;
17use crate::table_cache::TableCache;
18use crate::tables::table::TwoLevelIterator;
19use crate::tables::Table;
20use crate::{RainDbIterator, ReadOptions};
21
22use super::file_metadata::FileMetadata;
23
/**
Iterates over the entries in an ordered list of table files.

This iterator is the RainDB analog of an index iterator in a two-level iteration scheme. In
LevelDB, an index iterator yields file information that a second level of iterators uses to
retrieve more concrete values; here, both levels are rolled into one iterator that yields the
entries themselves.

# Legacy

This iterator combines LevelDB's `Version::LevelFileNumIterator`, `leveldb::GetFileIterator`, and
`Version::NewConcatenatingIterator`. This is because I don't think it is worth the trouble of
making a generic "two-level iterator" like LevelDB's `leveldb::TwoLevelIterator`. Just thinking
about the potential lifetime and closure issues is mind-numbing.
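
# Example

A minimal sketch of driving this iterator directly. It assumes a populated `level_files` value of
type `Vec<Arc<FileMetadata>>` for a single level and a shared `table_cache`, both built elsewhere
(the tests at the bottom of this module construct them from a `Version` and a [`TableCache`]), and
a surrounding function that returns a `RainDBResult<()>`.

```ignore
let mut iter = FilesEntryIterator::new(
    level_files,
    Arc::clone(&table_cache),
    ReadOptions::default(),
);

// Position at the first entry of the first non-empty file and walk forward.
iter.seek_to_first()?;
while iter.is_valid() {
    if let Some((key, value)) = iter.current() {
        println!("{:?} => {} bytes", key, value.len());
    }
    iter.next();
}
```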
*/
37pub(crate) struct FilesEntryIterator {
38    /// The ordered list of files to iterate.
39    file_list: Vec<Arc<FileMetadata>>,
40
41    /**
42    The current index into the file list that the cursor is at.
43
44    This value needs to stay in sync with the current table iterator. To ensure this, we only set
45    this value in `FilesEntryIterator::set_table_iter`.
46    */
47    current_file_index: usize,
48
49    /// The current table being iterated for entries.
50    current_table_iter: Option<TwoLevelIterator>,
51
52    /// The table cache to retrieve table files from.
53    table_cache: Arc<TableCache>,
54
55    /// Options to configure behavior when reading from table files.
56    read_options: ReadOptions,
57}
58
59/// Crate-only methods
60impl FilesEntryIterator {
61    /// Create a new instance of [`FilesEntryIterator`].
62    pub(crate) fn new(
63        file_list: Vec<Arc<FileMetadata>>,
64        table_cache: Arc<TableCache>,
65        read_options: ReadOptions,
66    ) -> Self {
67        Self {
68            file_list,
69            current_file_index: 0,
70            current_table_iter: None,
71            table_cache,
72            read_options,
73        }
74    }
75}
76
77/// Private methods
78impl FilesEntryIterator {
79    /// Set the table iterator to be used for iteration.
80    fn set_table_iter(&mut self, maybe_new_index: Option<usize>) -> RainDBResult<()> {
81        if maybe_new_index.is_none() || maybe_new_index.unwrap() == self.file_list.len() {
82            self.current_file_index = self.file_list.len();
83            self.current_table_iter = None;
84            return Ok(());
85        }
86
87        if let Some(new_index) = maybe_new_index {
88            if new_index == self.current_file_index && self.current_table_iter.is_some() {
89                // The file we need to iterate is the same as the current one. No update necessary
90                return Ok(());
91            }
92
93            let table = self
94                .table_cache
95                .find_table(self.file_list[new_index].file_number())?;
96            self.current_table_iter = Some(Table::iter_with(table, self.read_options.clone()));
97            self.current_file_index = new_index;
98        }
99        Ok(())
100    }
101
102    /// Move forward through any empty files.
103    fn skip_empty_table_files_forward(&mut self) -> RainDBResult<()> {
104        while self.current_table_iter.is_none()
105            || !self.current_table_iter.as_mut().unwrap().is_valid()
106        {
107            if self.current_file_index == self.file_list.len() {
108                // We've reached the end of the file list so there are no more tables to iterate
109                self.current_table_iter = None;
110                return Ok(());
111            }
112
            // Move to the next file in the list so we can check it for entries
114            self.set_table_iter(Some(self.current_file_index + 1))?;
115
116            if self.current_table_iter.is_some() {
117                self.current_table_iter.as_mut().unwrap().seek_to_first()?;
118            }
119        }
120
121        Ok(())
122    }
123
124    /// Move the file index and table iterator backward until we find a non-empty file.
125    fn skip_empty_table_files_backward(&mut self) -> RainDBResult<()> {
126        while self.current_table_iter.is_none()
127            || !self.current_table_iter.as_mut().unwrap().is_valid()
128        {
129            if self.current_file_index == 0 {
130                // We've reached the start of the file list so there are no more tables to iterate
131                self.current_table_iter = None;
132                return Ok(());
133            }
134
            // Move to the previous file in the list so we can check it for entries
136            self.set_table_iter(Some(self.current_file_index - 1))?;
137
138            if self.current_table_iter.is_some() {
139                self.current_table_iter.as_mut().unwrap().seek_to_last()?;
140            }
141        }
142
143        Ok(())
144    }
145}
146
147impl RainDbIterator for FilesEntryIterator {
148    type Key = InternalKey;
149
150    type Error = RainDBError;
151
152    fn is_valid(&self) -> bool {
153        self.current_table_iter.is_some() && self.current_table_iter.as_ref().unwrap().is_valid()
154    }
155
156    fn seek(&mut self, target: &Self::Key) -> Result<(), Self::Error> {
157        let maybe_new_index =
158            super::utils::find_file_with_upper_bound_range(&self.file_list, target);
159        self.set_table_iter(maybe_new_index)?;
160
161        if self.current_table_iter.is_some() {
162            self.current_table_iter.as_mut().unwrap().seek(target)?;
163        }
164
165        self.skip_empty_table_files_forward()?;
166
167        Ok(())
168    }
169
170    fn seek_to_first(&mut self) -> Result<(), Self::Error> {
171        let new_file_index = 0;
172        self.set_table_iter(Some(new_file_index))?;
173
174        if self.current_table_iter.is_some() {
175            self.current_table_iter.as_mut().unwrap().seek_to_first()?;
176        }
177
178        self.skip_empty_table_files_forward()?;
179
180        Ok(())
181    }
182
183    fn seek_to_last(&mut self) -> Result<(), Self::Error> {
184        let new_file_index = if self.file_list.is_empty() {
185            0
186        } else {
187            self.file_list.len() - 1
188        };
189        self.set_table_iter(Some(new_file_index))?;
190
191        if self.current_table_iter.is_some() {
192            self.current_table_iter.as_mut().unwrap().seek_to_last()?;
193        }
194
195        self.skip_empty_table_files_backward()?;
196
197        Ok(())
198    }
199
200    fn next(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
201        if !self.is_valid() {
202            return None;
203        }
204
205        if self.current_table_iter.as_mut().unwrap().next().is_none() {
206            if let Err(error) = self.skip_empty_table_files_forward() {
207                log::error!(
208                    "There was an error skipping forward. Original error: {}",
209                    error
210                );
211                return None;
212            }
213        }
214
215        if self.is_valid() {
216            return self.current_table_iter.as_mut().unwrap().current();
217        }
218
219        None
220    }
221
222    fn prev(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
223        if !self.is_valid() {
224            return None;
225        }
226
227        if self.current_table_iter.as_mut().unwrap().prev().is_none() {
228            if let Err(error) = self.skip_empty_table_files_backward() {
229                log::error!(
230                    "There was an error skipping backward. Original error: {}",
231                    error
232                );
233                return None;
234            }
235        }
236
237        if self.is_valid() {
238            return self.current_table_iter.as_mut().unwrap().current();
239        }
240
241        None
242    }
243
244    fn current(&self) -> Option<(&Self::Key, &Vec<u8>)> {
245        if !self.is_valid() {
246            return None;
247        }
248
249        self.current_table_iter.as_ref().unwrap().current()
250    }
251}
252
253/// Enum for indicating the direction of iteration.
254enum IterationDirection {
255    Forward,
256    Backward,
257}
258
/**
An iterator that merges the output of a list of iterators in sorted order.

This iterator does not do any sort of deduplication or filtering of sequence numbers.

# Legacy

As in LevelDB, a heap is not used to merge the inputs because the number of iterators that would be
merged by the heap/priority queue is small. The only level where multiple iterators are created is
level 0, which has a maximum of 12 files. This means that at most 12 + 1 iterators (the level 0
files plus one iterator from the parent level) need to be merged.
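
# Example

A rough sketch of the intended usage, assuming a `version` whose `get_representative_iterators`
method (used the same way in the tests below) supplies the child iterators, inside a function that
returns a `RainDBResult<()>`.

```ignore
let child_iterators = version.get_representative_iterators(&ReadOptions::default())?;
let mut merged = MergingIterator::new(child_iterators);

merged.seek_to_first()?;
while merged.is_valid() {
    if let Some((key, value)) = merged.current() {
        println!("{:?} => {} bytes", key, value.len());
    }
    merged.next();
}

// Child iterator errors are saved rather than returned mid-iteration, so check once at the end.
if let Some(error) = merged.get_error() {
    return Err(error);
}
```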
*/
271pub(crate) struct MergingIterator {
272    /// The underlying iterators.
273    iterators: Vec<CachingIterator>,
274
275    /// The current direction of iteration.
276    direction: IterationDirection,
277
278    /// The index of the iterator we are currently reading.
279    current_iterator_index: Option<usize>,
280
281    /**
282    Store errors encountered during iteration. The index of the error maps to the index of the
283    iterator that encountered the error. Only one error is stored per iterator.
284
285    LevelDB doesn't throw errors when errors are encountered during iteration. A client needs to
286    explicitly check for an error status by calling `Iterator::status()`. This vector emulates
287    this behavior.
288    */
289    errors: Vec<Option<RainDBError>>,
290
291    /// Functions called to perform cleanup tasks when the iterator is dropped.
292    cleanup_callbacks: Vec<Box<dyn FnOnce()>>,
293}
294
295/// Crate-only methods
296impl MergingIterator {
    /// Create a new instance of [`MergingIterator`].
298    pub(crate) fn new(
299        iterators: Vec<Box<dyn RainDbIterator<Key = InternalKey, Error = RainDBError>>>,
300    ) -> Self {
301        let wrapped_iterators: Vec<CachingIterator> = iterators
302            .into_iter()
            .map(CachingIterator::new)
304            .collect();
305        let errors = vec![None; wrapped_iterators.len()];
306
307        Self {
308            iterators: wrapped_iterators,
309            direction: IterationDirection::Forward,
310            current_iterator_index: None,
311            errors,
312            cleanup_callbacks: vec![],
313        }
314    }
315
    /**
    Get the first error if any. This will take ownership of the error, leaving a `None` in its
    place.
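
    For example, after a scan has finished (a sketch; `merged` is a [`MergingIterator`] that has
    been iterated to completion inside a function returning a `RainDBResult<()>`):

    ```ignore
    if let Some(error) = merged.get_error() {
        // Some child iterator failed during the scan; surface the first saved error.
        return Err(error);
    }
    ```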
    */
320    pub(crate) fn get_error(&mut self) -> Option<RainDBError> {
321        for maybe_error in self.errors.iter_mut() {
322            if maybe_error.is_some() {
323                return maybe_error.take();
324            }
325        }
326
327        None
328    }
329
    /// Register a closure that is called when the iterator is dropped.
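    ///
    /// For example (a sketch; the closure body and the `pinned_version` value are hypothetical
    /// stand-ins for whatever resource needs to be released):
    ///
    /// ```ignore
    /// merging_iter.register_cleanup_method(Box::new(move || {
    ///     drop(pinned_version);
    /// }));
    /// ```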
331    pub(crate) fn register_cleanup_method(&mut self, cleanup: Box<dyn FnOnce()>) {
332        self.cleanup_callbacks.push(cleanup);
333    }
334}
335
336/// Private methods
337impl MergingIterator {
338    /// Find the iterator with the currently smallest key and update the merging iterator state.
339    fn find_smallest(&mut self) {
340        if self.iterators.is_empty() {
341            return;
342        }
343
344        let mut maybe_smallest_iterator_index: Option<usize> = None;
345        for (index, iter) in self.iterators.iter().enumerate() {
346            if !iter.is_valid() {
347                continue;
348            }
349
350            if let Some((key, _)) = iter.current() {
351                if maybe_smallest_iterator_index.is_none() {
352                    maybe_smallest_iterator_index = Some(index);
353                } else if let Some(smallest_iterator_index) = maybe_smallest_iterator_index {
354                    let current_smallest_key =
355                        self.iterators[smallest_iterator_index].current().unwrap().0;
356
357                    if key < current_smallest_key {
358                        maybe_smallest_iterator_index = Some(index);
359                    }
360                }
361            }
362        }
363
364        self.current_iterator_index = maybe_smallest_iterator_index;
365    }
366
367    /// Find the iterator with the currently largest key and update the merging iterator state.
368    fn find_largest(&mut self) {
369        if self.iterators.is_empty() {
370            return;
371        }
372
373        let mut maybe_largest_iterator_index: Option<usize> = None;
        for (index, iter) in self.iterators.iter().enumerate().rev() {
            if !iter.is_valid() {
                continue;
            }

            if let Some((key, _)) = iter.current() {
                if maybe_largest_iterator_index.is_none() {
                    maybe_largest_iterator_index = Some(index);
                } else if let Some(largest_iterator_index) = maybe_largest_iterator_index {
                    let current_largest_key =
                        self.iterators[largest_iterator_index].current().unwrap().0;

                    if key > current_largest_key {
                        maybe_largest_iterator_index = Some(index);
                    }
                }
            }
        }
392
393        self.current_iterator_index = maybe_largest_iterator_index;
394    }
395
396    /// Store the error at the specified index.
397    fn save_error(&mut self, iterator_index: usize, error: RainDBError) {
398        log::error!(
399            "An error occurred during a merge iteration. Error: {}",
400            &error
401        );
402        self.errors[iterator_index] = Some(error);
403    }
404
405    /// Move the current iterator to the next entry.
406    fn advance_current_iterator(&mut self) {
407        if let Some(current_iter_index) = self.current_iterator_index {
408            let current_iter = &mut self.iterators[current_iter_index];
409            current_iter.next();
410        }
411    }
412
    /// Move the current iterator to the previous entry.
414    fn reverse_current_iterator(&mut self) {
415        if let Some(current_iter_index) = self.current_iterator_index {
416            let current_iter = &mut self.iterators[current_iter_index];
417            current_iter.prev();
418        }
419    }
420}
421
422impl RainDbIterator for MergingIterator {
423    type Key = InternalKey;
424
425    type Error = RainDBError;
426
427    fn is_valid(&self) -> bool {
428        self.current_iterator_index.is_some()
429    }
430
431    fn seek(&mut self, target: &Self::Key) -> Result<(), Self::Error> {
432        for index in 0..self.iterators.len() {
433            let iter = &mut self.iterators[index];
434            let seek_result = iter.seek(target);
435            if let Err(error) = seek_result {
436                self.save_error(index, error);
437            }
438        }
439
440        self.find_smallest();
441        self.direction = IterationDirection::Forward;
442
443        Ok(())
444    }
445
446    fn seek_to_first(&mut self) -> Result<(), Self::Error> {
447        for index in 0..self.iterators.len() {
448            let iter = &mut self.iterators[index];
449            let seek_result = iter.seek_to_first();
450            if let Err(error) = seek_result {
451                self.save_error(index, error);
452            }
453        }
454
455        self.find_smallest();
456        self.direction = IterationDirection::Forward;
457
458        Ok(())
459    }
460
461    fn seek_to_last(&mut self) -> Result<(), Self::Error> {
462        for index in 0..self.iterators.len() {
463            let iter = &mut self.iterators[index];
464            let seek_result = iter.seek_to_last();
465            if let Err(error) = seek_result {
466                self.save_error(index, error);
467            }
468        }
469
470        self.find_largest();
471        self.direction = IterationDirection::Backward;
472
473        Ok(())
474    }
475
    fn next(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        if !self.is_valid() {
            return None;
        }

477        /*
478        Ensure that all child iterators are positioned after the current key.
479        If we are already moving in the forward direction, this is implicitly true for all
480        non-current iterators since the current iterator is the smallest iterator. Otherwise, we
481        explicitly position the non-current iterators.
482        */
483        if let IterationDirection::Backward = self.direction {
484            let current_key = self.current().unwrap().0.clone();
485            for index in 0..self.iterators.len() {
486                /*
487                We are forced to do this obtuse error saving because Rust can't handle multiple
488                mutable borrows to fields we use disjointly. It is also because of this that we
489                use indexes instead of `.iter_mut` to iterate the child iterators.
490                */
491                let mut maybe_error: Option<RainDBError> = None;
492                if let Some(current_index) = self.current_iterator_index {
493                    if index == current_index {
494                        continue;
495                    }
496                }
497
498                let iter = &mut self.iterators[index];
499                let seek_result = iter.seek(&current_key);
500                if let Err(error) = seek_result {
501                    maybe_error = Some(error);
502                }
503
504                if iter.is_valid() && (*iter.current().unwrap().0) == current_key {
505                    iter.next();
506                }
507
508                if let Some(error) = maybe_error {
509                    self.save_error(index, error);
510                }
511            }
512
513            self.direction = IterationDirection::Forward;
514        }
515
516        self.advance_current_iterator();
517        self.find_smallest();
518
519        self.current()
520    }
521
    fn prev(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        if !self.is_valid() {
            return None;
        }

523        /*
524        Ensure that all child iterators are positioned before the current key.
525        If we are already moving in the backward direction, this is implicitly true for all
526        non-current iterators since the current iterator is the largest iterator. Otherwise, we
527        explicitly position the non-current iterators.
528        */
        if let IterationDirection::Forward = self.direction {
            // The current key does not change while the other child iterators are repositioned,
            // so clone it once up front.
            let current_key = self.current().unwrap().0.clone();
            for index in 0..self.iterators.len() {
532                let mut maybe_error: Option<RainDBError> = None;
533                if let Some(current_index) = self.current_iterator_index {
534                    if index == current_index {
535                        continue;
536                    }
537                }
538
539                let iter = &mut self.iterators[index];
540                let seek_result = iter.seek(&current_key);
541                if let Err(error) = seek_result {
542                    maybe_error = Some(error);
543                }
544
545                if iter.is_valid() {
                    // The seek positioned the child iterator at its first entry >= the current
                    // key. Step back one so that it is positioned before the current key.
548                    iter.prev();
549                } else {
550                    // The child iterator has no entries with keys >= the current key. Position at
551                    // the last entry.
552                    if let Err(error) = iter.seek_to_last() {
553                        maybe_error = Some(error);
554                    }
555                }
556
557                if let Some(error) = maybe_error {
558                    self.save_error(index, error);
559                }
560            }
561
562            self.direction = IterationDirection::Backward;
563        }
564
565        self.reverse_current_iterator();
566        self.find_largest();
567
568        self.current()
569    }
570
571    fn current(&self) -> Option<(&Self::Key, &Vec<u8>)> {
572        if let Some(current_iter_index) = self.current_iterator_index {
573            let current_iter = &self.iterators[current_iter_index];
574            return current_iter.current();
575        }
576
577        None
578    }
579}
580
581impl Drop for MergingIterator {
582    fn drop(&mut self) {
583        // Call cleanup closures
584        for callback in self.cleanup_callbacks.drain(..) {
585            callback();
586        }
587    }
588}
589
590#[cfg(test)]
591mod files_entry_iterator_tests {
592    use pretty_assertions::assert_eq;
593    use std::rc::Rc;
594
595    use crate::tables::TableBuilder;
596    use crate::versioning::version::Version;
597    use crate::{DbOptions, Operation};
598
599    use super::*;
600
601    #[test]
602    fn files_entry_iterator_with_an_empty_file_list_does_not_become_valid() {
603        let options = DbOptions::with_memory_env();
604        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
605        let version = create_test_version(options, &table_cache);
606        let mut iter = FilesEntryIterator::new(
607            version.files[4].clone(),
608            Arc::clone(&table_cache),
609            ReadOptions::default(),
610        );
611
612        assert!(!iter.is_valid());
613        assert!(iter.next().is_none());
614        assert!(iter.prev().is_none());
615        assert!(iter
616            .seek(&InternalKey::new(
617                "a".as_bytes().to_vec(),
618                100,
619                Operation::Put
620            ))
621            .is_ok());
622        assert!(iter.current().is_none());
623        assert!(iter.seek_to_first().is_ok());
624        assert!(iter.current().is_none());
625        assert!(iter.seek_to_last().is_ok());
626        assert!(iter.current().is_none());
627    }
628
629    #[test]
630    fn files_entry_iterator_can_seek_to_specific_targets() {
631        let options = DbOptions::with_memory_env();
632        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
633        let version = create_test_version(options, &table_cache);
634        let mut iter = FilesEntryIterator::new(
635            version.files[1].clone(),
636            Arc::clone(&table_cache),
637            ReadOptions::default(),
638        );
639
640        assert!(
641            !iter.is_valid(),
642            "The iterator should not be valid until its first seek"
643        );
644        assert!(iter
645            .seek(&InternalKey::new(
646                "h".as_bytes().to_vec(),
647                100,
648                Operation::Put
649            ))
650            .is_ok());
651        let (actual_key, actual_val) = iter.current().unwrap();
652        assert_eq!(
653            actual_key,
654            &InternalKey::new("h".as_bytes().to_vec(), 86, Operation::Put)
655        );
656        assert_eq!(actual_val, "h".as_bytes());
657
658        assert!(iter
659            .seek(&InternalKey::new(
660                "w".as_bytes().to_vec(),
661                20,
662                Operation::Put
663            ))
664            .is_ok());
665        let (actual_key, actual_val) = iter.current().unwrap();
666        assert_eq!(
667            actual_key,
668            &InternalKey::new("x".as_bytes().to_vec(), 78, Operation::Delete)
669        );
670        assert_eq!(actual_val, &[]);
671
672        let (actual_key, actual_val) = iter.next().unwrap();
673        assert_eq!(
674            actual_key,
675            &InternalKey::new("y".as_bytes().to_vec(), 79, Operation::Put)
676        );
677        assert_eq!(actual_val, "y".as_bytes());
678
679        assert!(iter.seek_to_first().is_ok());
680        let (actual_key, actual_val) = iter.current().unwrap();
681        assert_eq!(
682            actual_key,
683            &InternalKey::new("g".as_bytes().to_vec(), 85, Operation::Put)
684        );
685        assert_eq!(actual_val, "g".as_bytes());
686
687        assert!(iter.seek_to_last().is_ok());
688        let (actual_key, actual_val) = iter.current().unwrap();
689        assert_eq!(
690            actual_key,
691            &InternalKey::new("y".as_bytes().to_vec(), 79, Operation::Put)
692        );
693        assert_eq!(actual_val, "y".as_bytes());
694    }
695
696    #[test]
697    fn files_entry_iterator_can_be_iterated_forward_completely() {
698        let options = DbOptions::with_memory_env();
699        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
700        let version = create_test_version(options, &table_cache);
701        let mut iter = FilesEntryIterator::new(
702            version.files[1].clone(),
703            Arc::clone(&table_cache),
704            ReadOptions::default(),
705        );
706
707        assert!(iter.seek_to_first().is_ok());
708        let (actual_key, _) = iter.current().unwrap();
709        assert_eq!(
710            actual_key,
711            &InternalKey::new("g".as_bytes().to_vec(), 85, Operation::Put)
712        );
713
714        while iter.next().is_some() {
715            assert!(
716                iter.current().is_some(),
717                "Iteration did not yield a value but one was expected."
718            );
719        }
720
721        assert!(
722            iter.next().is_none(),
723            "Calling `next` after consuming all the values should not return a value"
724        );
725        assert!(
726            !iter.is_valid(),
            "The iterator should not be valid after moving past the end of its range"
728        );
729    }
730
731    #[test]
732    fn files_entry_iterator_can_be_iterated_backward_completely() {
733        let options = DbOptions::with_memory_env();
734        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
735        let version = create_test_version(options, &table_cache);
736        let mut iter = FilesEntryIterator::new(
737            version.files[1].clone(),
738            Arc::clone(&table_cache),
739            ReadOptions::default(),
740        );
741
742        assert!(iter.seek_to_last().is_ok());
743        let (actual_key, _) = iter.current().unwrap();
744        assert_eq!(
745            actual_key,
746            &InternalKey::new("y".as_bytes().to_vec(), 79, Operation::Put)
747        );
748
749        while iter.prev().is_some() {
750            assert!(
751                iter.current().is_some(),
752                "Iteration did not yield a value but one was expected."
753            );
754        }
755
756        assert!(
757            iter.prev().is_none(),
758            "Calling `prev` after consuming all the values should not return a value"
759        );
760        assert!(
761            !iter.is_valid(),
            "The iterator should not be valid after moving past the beginning of its range"
763        );
764    }
765
    /// Creates a version used to hold files for testing.
767    fn create_test_version(db_options: DbOptions, table_cache: &Arc<TableCache>) -> Version {
768        let mut version = Version::new(db_options.clone(), table_cache, 200, 30);
769
770        // Level 1
771        let entries = vec![
772            (
773                ("g".as_bytes().to_vec(), Operation::Put),
774                ("g".as_bytes().to_vec()),
775            ),
776            (
777                ("h".as_bytes().to_vec(), Operation::Put),
778                ("h".as_bytes().to_vec()),
779            ),
780            (
781                ("i".as_bytes().to_vec(), Operation::Put),
782                ("i".as_bytes().to_vec()),
783            ),
784            (
785                ("j".as_bytes().to_vec(), Operation::Put),
786                ("j".as_bytes().to_vec()),
787            ),
788        ];
789        let table_file_meta = create_table(db_options.clone(), entries, 85, 59);
790        version.files[1].push(Arc::new(table_file_meta));
791
792        let entries = vec![
793            (
794                ("o".as_bytes().to_vec(), Operation::Put),
795                ("o".as_bytes().to_vec()),
796            ),
797            (
798                ("r".as_bytes().to_vec(), Operation::Put),
799                ("r".as_bytes().to_vec()),
800            ),
801            (
802                ("s".as_bytes().to_vec(), Operation::Put),
803                ("s".as_bytes().to_vec()),
804            ),
805            (
806                ("t".as_bytes().to_vec(), Operation::Put),
807                ("t".as_bytes().to_vec()),
808            ),
809            (
810                ("u".as_bytes().to_vec(), Operation::Put),
811                ("u".as_bytes().to_vec()),
812            ),
813        ];
814        let table_file_meta = create_table(db_options.clone(), entries, 80, 58);
815        version.files[1].push(Arc::new(table_file_meta));
816
817        let entries = vec![
818            (
819                ("v".as_bytes().to_vec(), Operation::Put),
820                ("v".as_bytes().to_vec()),
821            ),
822            (
823                ("w".as_bytes().to_vec(), Operation::Put),
824                ("w".as_bytes().to_vec()),
825            ),
826            (("x".as_bytes().to_vec(), Operation::Delete), vec![]),
827            (
828                ("y".as_bytes().to_vec(), Operation::Put),
829                ("y".as_bytes().to_vec()),
830            ),
831        ];
832        let table_file_meta = create_table(db_options, entries, 76, 57);
833        version.files[1].push(Arc::new(table_file_meta));
834
835        version
836    }
837
838    /**
    Create a table from the provided entries (key-value pairs), assigning sequence numbers starting
    from the provided start point.
841    */
842    fn create_table(
843        db_options: DbOptions,
844        entries: Vec<((Vec<u8>, Operation), Vec<u8>)>,
845        starting_sequence_num: u64,
846        file_number: u64,
847    ) -> FileMetadata {
848        let smallest_key = InternalKey::new(
849            entries.first().unwrap().0 .0.clone(),
850            starting_sequence_num,
851            entries.first().unwrap().0 .1,
852        );
853        let largest_key = InternalKey::new(
854            entries.last().unwrap().0 .0.clone(),
855            starting_sequence_num + (entries.len() as u64) - 1,
856            entries.last().unwrap().0 .1,
857        );
858
859        let mut table_builder = TableBuilder::new(db_options, file_number).unwrap();
860        let mut curr_sequence_num = starting_sequence_num;
861        for ((user_key, operation), value) in entries {
862            table_builder
863                .add_entry(
864                    Rc::new(InternalKey::new(user_key, curr_sequence_num, operation)),
865                    &value,
866                )
867                .unwrap();
868            curr_sequence_num += 1;
869        }
870
871        table_builder.finalize().unwrap();
872
873        let mut file_meta = FileMetadata::new(file_number);
874        file_meta.set_smallest_key(Some(smallest_key));
875        file_meta.set_largest_key(Some(largest_key));
876        file_meta.set_file_size(table_builder.file_size());
877
878        file_meta
879    }
880}
881
882#[cfg(test)]
883mod merging_iterator_tests {
884    use pretty_assertions::assert_eq;
885    use std::cell::RefCell;
886    use std::rc::Rc;
887
888    use crate::tables::TableBuilder;
889    use crate::versioning::version::Version;
890    use crate::{DbOptions, Operation};
891
892    use super::*;
893
894    #[test]
895    fn with_an_empty_list_of_iterators_does_not_become_valid() {
896        let mut iter = MergingIterator::new(vec![]);
897
898        assert!(!iter.is_valid());
899        assert!(iter.next().is_none());
900        assert!(iter.prev().is_none());
901        assert!(iter
902            .seek(&InternalKey::new(
903                "a".as_bytes().to_vec(),
904                100,
905                Operation::Put
906            ))
907            .is_ok());
908        assert!(iter.current().is_none());
909        assert!(iter.seek_to_first().is_ok());
910        assert!(iter.current().is_none());
911        assert!(iter.seek_to_last().is_ok());
912        assert!(iter.current().is_none());
913        assert!(!iter.is_valid());
914    }
915
916    #[test]
917    fn can_be_iterated_forward_completely() {
918        let options = DbOptions::with_memory_env();
919        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
920        let version = create_test_version(options, &table_cache);
921        let version_iterators = version
922            .get_representative_iterators(&ReadOptions::default())
923            .unwrap();
924        let mut iter = MergingIterator::new(version_iterators);
925
926        assert!(iter.seek_to_first().is_ok());
927        let (actual_key, _) = iter.current().unwrap();
928        assert_eq!(
929            actual_key,
930            &InternalKey::new("a".as_bytes().to_vec(), 90, Operation::Put)
931        );
932
933        let mut expected_value_counter: usize = 2;
934        while iter.next().is_some() {
935            let (current_key, current_val) = iter.current().unwrap();
936            if current_key.get_operation() == Operation::Put {
937                assert_eq!(
938                    current_val,
939                    expected_value_counter.to_string().as_bytes(),
940                    "Expecting value to be {} but got {}",
941                    expected_value_counter.to_string(),
942                    String::from_utf8_lossy(current_val)
943                );
944            }
945
946            expected_value_counter += 1;
947        }
948
949        assert!(
950            iter.next().is_none(),
951            "Calling `next` after consuming all the values should not return a value"
952        );
953        assert!(
954            !iter.is_valid(),
            "The iterator should not be valid after moving past the end of its range"
956        );
957    }
958
959    #[test]
960    fn can_be_iterated_backward_completely() {
961        let options = DbOptions::with_memory_env();
962        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
963        let version = create_test_version(options, &table_cache);
964        let version_iterators = version
965            .get_representative_iterators(&ReadOptions::default())
966            .unwrap();
967        let mut iter = MergingIterator::new(version_iterators);
968
969        assert!(iter.seek_to_last().is_ok());
970        let (actual_key, _) = iter.current().unwrap();
971        assert_eq!(
972            actual_key,
973            &InternalKey::new("z7".as_bytes().to_vec(), 51, Operation::Put)
974        );
975
976        let mut expected_value_counter: usize = 43;
977        while iter.prev().is_some() {
978            let (current_key, current_val) = iter.current().unwrap();
979            if current_key.get_operation() == Operation::Put {
980                assert_eq!(
981                    current_val,
982                    expected_value_counter.to_string().as_bytes(),
983                    "Expecting value to be {} but got {}",
984                    expected_value_counter.to_string(),
985                    String::from_utf8_lossy(current_val)
986                );
987            }
988
989            expected_value_counter -= 1;
990        }
991
992        assert!(
993            iter.prev().is_none(),
994            "Calling `prev` after consuming all the values should not return a value"
995        );
996        assert!(
997            !iter.is_valid(),
            "The iterator should not be valid after moving past the beginning of its range"
999        );
1000    }
1001
1002    #[test]
1003    fn can_seek_randomly() {
1004        let options = DbOptions::with_memory_env();
1005        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
1006        let version = create_test_version(options, &table_cache);
1007        let version_iterators = version
1008            .get_representative_iterators(&ReadOptions::default())
1009            .unwrap();
1010        let mut iter = MergingIterator::new(version_iterators);
1011
1012        assert!(
1013            !iter.is_valid(),
1014            "The iterator should not be valid until its first seek"
1015        );
1016        assert!(iter
1017            .seek(&InternalKey::new(
1018                "h".as_bytes().to_vec(),
1019                100,
1020                Operation::Put
1021            ))
1022            .is_ok());
1023        let (actual_key, actual_val) = iter.current().unwrap();
1024        assert_eq!(
1025            actual_key,
1026            &InternalKey::new("h".as_bytes().to_vec(), 86, Operation::Put)
1027        );
1028        assert_eq!(actual_val, "14".as_bytes());
1029
1030        assert!(
1031            iter.next().is_some(),
1032            "Expected to seek to next after a random seek"
1033        );
1034        let (actual_key, _) = iter.current().unwrap();
1035        assert_eq!(
1036            actual_key,
1037            &InternalKey::new("i".as_bytes().to_vec(), 87, Operation::Put)
1038        );
1039
1040        assert!(
1041            iter.seek(&InternalKey::new(
1042                "z4".as_bytes().to_vec(),
1043                50,
1044                Operation::Put
1045            ))
1046            .is_ok(),
1047            "Expected to seek after random iteration"
1048        );
1049        let (actual_key, actual_val) = iter.current().unwrap();
1050        assert_eq!(
1051            actual_key,
1052            &InternalKey::new("z4".as_bytes().to_vec(), 48, Operation::Put)
1053        );
1054        assert_eq!(actual_val, "41".as_bytes());
1055    }
1056
1057    #[test]
1058    fn can_iterate_backwards_after_iterating_forward() {
1059        let options = DbOptions::with_memory_env();
1060        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
1061        let version = create_test_version(options, &table_cache);
1062        let version_iterators = version
1063            .get_representative_iterators(&ReadOptions::default())
1064            .unwrap();
1065        let mut iter = MergingIterator::new(version_iterators);
1066
1067        assert!(iter
1068            .seek(&InternalKey::new(
1069                "f1".as_bytes().to_vec(),
1070                200,
1071                Operation::Put
1072            ))
1073            .is_ok());
1074        let (actual_key, _) = iter.current().unwrap();
1075        assert_eq!(
1076            actual_key,
1077            &InternalKey::new("f1".as_bytes().to_vec(), 106, Operation::Put)
1078        );
1079
1080        assert!(iter.next().is_some());
1081        let (actual_key, _) = iter.current().unwrap();
1082        assert_eq!(
1083            actual_key,
1084            &InternalKey::new("f2".as_bytes().to_vec(), 107, Operation::Put),
1085            "Expected to seek to next after a random seek"
1086        );
1087
1088        assert!(iter.prev().is_some());
1089        let (actual_key, _) = iter.current().unwrap();
1090        assert_eq!(
1091            actual_key,
1092            &InternalKey::new("f1".as_bytes().to_vec(), 106, Operation::Put),
1093            "Expected to seek to previous from an iterator that was iterating forward"
1094        );
1095
1096        assert!(iter.prev().is_some());
1097        let (actual_key, _) = iter.current().unwrap();
1098        assert_eq!(
1099            actual_key,
1100            &InternalKey::new("f".as_bytes().to_vec(), 103, Operation::Put),
1101            "Expected other child iterators to be correctly set after changing iteration direction"
1102        );
1103    }
1104
1105    #[test]
1106    fn can_iterate_forwards_after_iterating_backward() {
1107        let options = DbOptions::with_memory_env();
1108        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
1109        let version = create_test_version(options, &table_cache);
1110        let version_iterators = version
1111            .get_representative_iterators(&ReadOptions::default())
1112            .unwrap();
1113        let mut iter = MergingIterator::new(version_iterators);
1114
1115        assert!(iter
1116            .seek(&InternalKey::new(
1117                "y".as_bytes().to_vec(),
1118                58,
1119                Operation::Put
1120            ))
1121            .is_ok());
1122        let (actual_key, _) = iter.current().unwrap();
1123        assert_eq!(
1124            actual_key,
1125            &InternalKey::new("y".as_bytes().to_vec(), 58, Operation::Put)
1126        );
1127
1128        assert!(iter.prev().is_some());
1129        let (actual_key, _) = iter.current().unwrap();
1130        assert_eq!(
1131            actual_key,
1132            &InternalKey::new("y".as_bytes().to_vec(), 79, Operation::Put),
1133            "Expected to seek to prev after a random seek"
1134        );
1135
1136        assert!(iter.next().is_some());
1137        let (actual_key, _) = iter.current().unwrap();
1138        assert_eq!(
1139            actual_key,
1140            &InternalKey::new("y".as_bytes().to_vec(), 58, Operation::Put),
            "Expected to move to the next entry from an iterator that was iterating backward"
1142        );
1143
1144        assert!(iter.next().is_some());
1145        let (actual_key, _) = iter.current().unwrap();
1146        assert_eq!(
1147            actual_key,
1148            &InternalKey::new("z1".as_bytes().to_vec(), 45, Operation::Put),
1149            "Expected other child iterators to be correctly set after changing iteration direction"
1150        );
1151    }
1152
1153    #[test]
1154    fn calls_cleanup_callbacks_when_dropped() {
1155        let options = DbOptions::with_memory_env();
1156        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
1157        let version = create_test_version(options, &table_cache);
1158        let version_iterators = version
1159            .get_representative_iterators(&ReadOptions::default())
1160            .unwrap();
1161        let num_times_callback_called: Rc<RefCell<usize>> = Rc::new(RefCell::new(0));
1162        let cloned_counter = Rc::clone(&num_times_callback_called);
1163        let callback = move || {
1164            *cloned_counter.borrow_mut() += 1;
1165        };
1166        let mut iter = MergingIterator::new(version_iterators);
1167
1168        iter.register_cleanup_method(Box::new(callback));
1169        drop(iter);
1170
1171        assert_eq!(
1172            *num_times_callback_called.borrow(),
1173            1,
1174            "Expected cleanup callbacks to have been called once."
1175        );
1176    }
1177
    /// Creates a version used to hold files for testing.
1179    fn create_test_version(db_options: DbOptions, table_cache: &Arc<TableCache>) -> Version {
1180        let mut version = Version::new(db_options.clone(), table_cache, 200, 30);
1181
1182        // Level 0 allows overlapping files
1183        let entries = vec![
1184            (
1185                ("a".as_bytes().to_vec(), Operation::Put),
1186                ("1".as_bytes().to_vec()),
1187            ),
1188            (
1189                ("b".as_bytes().to_vec(), Operation::Put),
1190                ("2".as_bytes().to_vec()),
1191            ),
1192            (
1193                ("c".as_bytes().to_vec(), Operation::Put),
1194                ("4".as_bytes().to_vec()),
1195            ),
1196            (
1197                ("d".as_bytes().to_vec(), Operation::Put),
1198                ("6".as_bytes().to_vec()),
1199            ),
1200        ];
1201        let table_file_meta = create_table(db_options.clone(), entries, 90, 60);
1202        version.files[0].push(Arc::new(table_file_meta));
1203
1204        let entries = vec![
1205            (
1206                ("c".as_bytes().to_vec(), Operation::Put),
1207                ("3".as_bytes().to_vec()),
1208            ),
1209            (
1210                ("d".as_bytes().to_vec(), Operation::Put),
1211                ("5".as_bytes().to_vec()),
1212            ),
1213            (
1214                ("e".as_bytes().to_vec(), Operation::Put),
1215                ("7".as_bytes().to_vec()),
1216            ),
1217            (
1218                ("f".as_bytes().to_vec(), Operation::Put),
1219                ("9".as_bytes().to_vec()),
1220            ),
1221        ];
1222        let table_file_meta = create_table(db_options.clone(), entries, 100, 61);
1223        version.files[0].push(Arc::new(table_file_meta));
1224
1225        let entries = vec![
1226            (
1227                ("f".as_bytes().to_vec(), Operation::Put),
1228                ("8".as_bytes().to_vec()),
1229            ),
1230            (
1231                ("f1".as_bytes().to_vec(), Operation::Put),
1232                ("10".as_bytes().to_vec()),
1233            ),
1234            (
1235                ("f2".as_bytes().to_vec(), Operation::Put),
1236                ("11".as_bytes().to_vec()),
1237            ),
1238            (
1239                ("f3".as_bytes().to_vec(), Operation::Put),
1240                ("12".as_bytes().to_vec()),
1241            ),
1242        ];
1243        let table_file_meta = create_table(db_options.clone(), entries, 105, 62);
1244        version.files[0].push(Arc::new(table_file_meta));
1245
1246        // Level 1
1247        let entries = vec![
1248            (
1249                ("g".as_bytes().to_vec(), Operation::Put),
1250                ("13".as_bytes().to_vec()),
1251            ),
1252            (
1253                ("h".as_bytes().to_vec(), Operation::Put),
1254                ("14".as_bytes().to_vec()),
1255            ),
1256            (
1257                ("i".as_bytes().to_vec(), Operation::Put),
1258                ("15".as_bytes().to_vec()),
1259            ),
1260            (
1261                ("j".as_bytes().to_vec(), Operation::Put),
1262                ("16".as_bytes().to_vec()),
1263            ),
1264        ];
1265        let table_file_meta = create_table(db_options.clone(), entries, 85, 59);
1266        version.files[1].push(Arc::new(table_file_meta));
1267
1268        let entries = vec![
1269            (
1270                ("o".as_bytes().to_vec(), Operation::Put),
1271                ("21".as_bytes().to_vec()),
1272            ),
1273            (
1274                ("r".as_bytes().to_vec(), Operation::Put),
1275                ("24".as_bytes().to_vec()),
1276            ),
1277            (
1278                ("s".as_bytes().to_vec(), Operation::Put),
1279                ("26".as_bytes().to_vec()),
1280            ),
1281            (
1282                ("t".as_bytes().to_vec(), Operation::Put),
1283                ("28".as_bytes().to_vec()),
1284            ),
1285            (
1286                ("u".as_bytes().to_vec(), Operation::Put),
1287                ("29".as_bytes().to_vec()),
1288            ),
1289        ];
1290        let table_file_meta = create_table(db_options.clone(), entries, 80, 58);
1291        version.files[1].push(Arc::new(table_file_meta));
1292
1293        let entries = vec![
1294            (
1295                ("v".as_bytes().to_vec(), Operation::Put),
1296                ("30".as_bytes().to_vec()),
1297            ),
1298            (
1299                ("w".as_bytes().to_vec(), Operation::Put),
1300                ("32".as_bytes().to_vec()),
1301            ),
1302            (("x".as_bytes().to_vec(), Operation::Delete), vec![]),
1303            (
1304                ("y".as_bytes().to_vec(), Operation::Put),
1305                ("36".as_bytes().to_vec()),
1306            ),
1307        ];
1308        let table_file_meta = create_table(db_options.clone(), entries, 76, 57);
1309        version.files[1].push(Arc::new(table_file_meta));
1310
1311        // Level 2
1312        let entries = vec![
1313            (
1314                ("k".as_bytes().to_vec(), Operation::Put),
1315                ("17".as_bytes().to_vec()),
1316            ),
1317            (
1318                ("l".as_bytes().to_vec(), Operation::Put),
1319                ("18".as_bytes().to_vec()),
1320            ),
1321            (
1322                ("m".as_bytes().to_vec(), Operation::Put),
1323                ("19".as_bytes().to_vec()),
1324            ),
1325            (
1326                ("n".as_bytes().to_vec(), Operation::Put),
1327                ("20".as_bytes().to_vec()),
1328            ),
1329        ];
1330        let table_file_meta = create_table(db_options.clone(), entries, 65, 55);
1331        version.files[2].push(Arc::new(table_file_meta));
1332
1333        let entries = vec![
1334            (
1335                ("o".as_bytes().to_vec(), Operation::Put),
1336                ("22".as_bytes().to_vec()),
1337            ),
1338            (
1339                ("p".as_bytes().to_vec(), Operation::Put),
1340                ("23".as_bytes().to_vec()),
1341            ),
1342            (
1343                ("r".as_bytes().to_vec(), Operation::Put),
1344                ("25".as_bytes().to_vec()),
1345            ),
1346            (
1347                ("s".as_bytes().to_vec(), Operation::Put),
1348                ("27".as_bytes().to_vec()),
1349            ),
1350        ];
1351        let table_file_meta = create_table(db_options.clone(), entries, 60, 54);
1352        version.files[2].push(Arc::new(table_file_meta));
1353
1354        let entries = vec![
1355            (
1356                ("v".as_bytes().to_vec(), Operation::Put),
1357                ("31".as_bytes().to_vec()),
1358            ),
1359            (
1360                ("w".as_bytes().to_vec(), Operation::Put),
1361                ("33".as_bytes().to_vec()),
1362            ),
1363            (
1364                ("x".as_bytes().to_vec(), Operation::Put),
1365                ("35".as_bytes().to_vec()),
1366            ),
1367            (
1368                ("y".as_bytes().to_vec(), Operation::Put),
1369                ("37".as_bytes().to_vec()),
1370            ),
1371        ];
1372        let table_file_meta = create_table(db_options.clone(), entries, 55, 53);
1373        version.files[2].push(Arc::new(table_file_meta));
1374
1375        // Level 3
1376        let entries = vec![
1377            (
1378                ("z1".as_bytes().to_vec(), Operation::Put),
1379                ("38".as_bytes().to_vec()),
1380            ),
1381            (
1382                ("z2".as_bytes().to_vec(), Operation::Put),
1383                ("39".as_bytes().to_vec()),
1384            ),
1385        ];
1386        let table_file_meta = create_table(db_options.clone(), entries, 45, 52);
1387        version.files[3].push(Arc::new(table_file_meta));
1388
1389        let entries = vec![
1390            (
1391                ("z3".as_bytes().to_vec(), Operation::Put),
1392                ("40".as_bytes().to_vec()),
1393            ),
1394            (
1395                ("z4".as_bytes().to_vec(), Operation::Put),
1396                ("41".as_bytes().to_vec()),
1397            ),
1398        ];
1399        let table_file_meta = create_table(db_options.clone(), entries, 47, 51);
1400        version.files[3].push(Arc::new(table_file_meta));
1401
1402        let entries = vec![
1403            (
1404                ("z5".as_bytes().to_vec(), Operation::Put),
1405                ("42".as_bytes().to_vec()),
1406            ),
1407            (
1408                ("z6".as_bytes().to_vec(), Operation::Put),
1409                ("43".as_bytes().to_vec()),
1410            ),
1411            (
1412                ("z7".as_bytes().to_vec(), Operation::Put),
1413                ("44".as_bytes().to_vec()),
1414            ),
1415        ];
1416        let table_file_meta = create_table(db_options, entries, 49, 50);
1417        version.files[3].push(Arc::new(table_file_meta));
1418
1419        version
1420    }
1421
1422    /**
    Create a table from the provided entries (key-value pairs), assigning sequence numbers starting
    from the provided start point.
1425    */
1426    fn create_table(
1427        db_options: DbOptions,
1428        entries: Vec<((Vec<u8>, Operation), Vec<u8>)>,
1429        starting_sequence_num: u64,
1430        file_number: u64,
1431    ) -> FileMetadata {
1432        let smallest_key = InternalKey::new(
1433            entries.first().unwrap().0 .0.clone(),
1434            starting_sequence_num,
1435            entries.first().unwrap().0 .1,
1436        );
1437        let largest_key = InternalKey::new(
1438            entries.last().unwrap().0 .0.clone(),
1439            starting_sequence_num + (entries.len() as u64) - 1,
1440            entries.last().unwrap().0 .1,
1441        );
1442
1443        let mut table_builder = TableBuilder::new(db_options, file_number).unwrap();
1444        let mut curr_sequence_num = starting_sequence_num;
1445        for ((user_key, operation), value) in entries {
1446            table_builder
1447                .add_entry(
1448                    Rc::new(InternalKey::new(user_key, curr_sequence_num, operation)),
1449                    &value,
1450                )
1451                .unwrap();
1452            curr_sequence_num += 1;
1453        }
1454
1455        table_builder.finalize().unwrap();
1456
1457        let mut file_meta = FileMetadata::new(file_number);
1458        file_meta.set_smallest_key(Some(smallest_key));
1459        file_meta.set_largest_key(Some(largest_key));
1460        file_meta.set_file_size(table_builder.file_size());
1461
1462        file_meta
1463    }
1464}