mini_lsm/iterators/
concat_iterator.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4
5use crate::{
6    key::KeySlice,
7    table::{SsTable, SsTableIterator},
8};
9
10use super::StorageIterator;
11
12/// Concat multiple iterators ordered in key order and their key ranges do not overlap. We do not want to create the
13/// iterators when initializing this iterator to reduce the overhead of seeking.
14pub struct SstConcatIterator {
15    current: Option<SsTableIterator>,
16    next_sst_idx: usize,
17    sstables: Vec<Arc<SsTable>>,
18}
19
20impl SstConcatIterator {
21    fn check_sst_valid(sstables: &[Arc<SsTable>]) {
22        for sst in sstables {
23            assert!(sst.first_key() <= sst.last_key());
24        }
25        if !sstables.is_empty() {
26            for i in 0..(sstables.len() - 1) {
27                assert!(sstables[i].last_key() < sstables[i + 1].first_key());
28            }
29        }
30    }
31
32    pub fn create_and_seek_to_first(sstables: Vec<Arc<SsTable>>) -> Result<Self> {
33        Self::check_sst_valid(&sstables);
34        if sstables.is_empty() {
35            return Ok(Self {
36                current: None,
37                next_sst_idx: 0,
38                sstables,
39            });
40        }
41        let mut iter = Self {
42            current: Some(SsTableIterator::create_and_seek_to_first(
43                sstables[0].clone(),
44            )?),
45            next_sst_idx: 1,
46            sstables,
47        };
48        iter.move_until_valid()?;
49        Ok(iter)
50    }
51
52    pub fn create_and_seek_to_key(sstables: Vec<Arc<SsTable>>, key: KeySlice) -> Result<Self> {
53        Self::check_sst_valid(&sstables);
54        let idx: usize = sstables
55            .partition_point(|table| table.first_key().as_key_slice() <= key)
56            .saturating_sub(1);
57        if idx >= sstables.len() {
58            return Ok(Self {
59                current: None,
60                next_sst_idx: sstables.len(),
61                sstables,
62            });
63        }
64        let mut iter = Self {
65            current: Some(SsTableIterator::create_and_seek_to_key(
66                sstables[idx].clone(),
67                key,
68            )?),
69            next_sst_idx: idx + 1,
70            sstables,
71        };
72        iter.move_until_valid()?;
73        Ok(iter)
74    }
75
76    fn move_until_valid(&mut self) -> Result<()> {
77        while let Some(iter) = self.current.as_mut() {
78            if iter.is_valid() {
79                break;
80            }
81            if self.next_sst_idx >= self.sstables.len() {
82                self.current = None;
83            } else {
84                self.current = Some(SsTableIterator::create_and_seek_to_first(
85                    self.sstables[self.next_sst_idx].clone(),
86                )?);
87                self.next_sst_idx += 1;
88            }
89        }
90        Ok(())
91    }
92}
93
94impl StorageIterator for SstConcatIterator {
95    type KeyType<'a> = KeySlice<'a>;
96
97    fn key(&self) -> KeySlice {
98        self.current.as_ref().unwrap().key()
99    }
100
101    fn value(&self) -> &[u8] {
102        self.current.as_ref().unwrap().value()
103    }
104
105    fn is_valid(&self) -> bool {
106        if let Some(current) = &self.current {
107            assert!(current.is_valid());
108            true
109        } else {
110            false
111        }
112    }
113
114    fn next(&mut self) -> Result<()> {
115        self.current.as_mut().unwrap().next()?;
116        self.move_until_valid()?;
117        Ok(())
118    }
119
120    fn num_active_iterators(&self) -> usize {
121        1
122    }
123}