rocksdb_fileformat/
iterator.rs

1use crate::data_block::DataBlockReader;
2use crate::error::Result;
3use crate::index_block::IndexBlock;
4use crate::sst_reader::SstReader;
5use crate::types::CompressionType;
6
7pub trait SstIterator {
8    fn seek_to_first(&mut self) -> Result<()>;
9    fn seek_to_last(&mut self) -> Result<()>;
10    fn seek(&mut self, key: &[u8]) -> Result<()>;
11    fn next(&mut self) -> Result<bool>;
12    fn prev(&mut self) -> Result<bool>;
13    fn valid(&self) -> bool;
14    fn key(&self) -> Option<&[u8]>;
15    fn value(&self) -> Option<&[u8]>;
16}
17
18pub struct SstTableIterator {
19    sst_reader: SstReader,
20    index_block: IndexBlock,
21    current_data_block: Option<DataBlockReader>,
22    current_block_index: usize,
23    all_block_handles: Vec<crate::block_handle::BlockHandle>,
24    compression_type: CompressionType,
25    valid: bool,
26}
27
28impl SstTableIterator {
29    pub fn new(mut sst_reader: SstReader, compression_type: CompressionType) -> Result<Self> {
30        let footer = sst_reader.get_footer();
31        let index_data = sst_reader.read_block(footer.index_handle.clone())?;
32        let index_block = IndexBlock::new(&index_data, CompressionType::None)?;
33        let all_block_handles = index_block.get_all_block_handles()?;
34
35        Ok(SstTableIterator {
36            sst_reader,
37            index_block,
38            current_data_block: None,
39            current_block_index: 0,
40            all_block_handles,
41            compression_type,
42            valid: false,
43        })
44    }
45
46    fn load_data_block(&mut self, block_index: usize) -> Result<()> {
47        if block_index >= self.all_block_handles.len() {
48            self.current_data_block = None;
49            self.valid = false;
50            return Ok(());
51        }
52
53        let block_handle = self.all_block_handles[block_index].clone();
54        let data_block_reader = self
55            .sst_reader
56            .read_data_block_reader(block_handle, self.compression_type)?;
57
58        self.current_data_block = Some(data_block_reader);
59        self.current_block_index = block_index;
60        Ok(())
61    }
62
63    pub fn entries_count(&self) -> usize {
64        match &self.current_data_block {
65            Some(reader) => reader.entries().len(),
66            None => 0,
67        }
68    }
69
70    pub fn block_count(&self) -> usize {
71        self.all_block_handles.len()
72    }
73}
74
75impl SstIterator for SstTableIterator {
76    fn seek_to_first(&mut self) -> Result<()> {
77        if self.all_block_handles.is_empty() {
78            self.valid = false;
79            return Ok(());
80        }
81
82        self.load_data_block(0)?;
83
84        if let Some(ref mut data_block) = self.current_data_block {
85            data_block.seek_to_first();
86            self.valid = data_block.valid();
87        } else {
88            self.valid = false;
89        }
90
91        Ok(())
92    }
93
94    fn seek_to_last(&mut self) -> Result<()> {
95        if self.all_block_handles.is_empty() {
96            self.valid = false;
97            return Ok(());
98        }
99
100        let last_block_index = self.all_block_handles.len() - 1;
101        self.load_data_block(last_block_index)?;
102
103        if let Some(ref mut data_block) = self.current_data_block {
104            while data_block.next().is_some() {}
105            self.valid = false;
106
107            let entries_len = data_block.entries().len();
108            if entries_len > 0 {
109                data_block.seek_to_first();
110                for _ in 1..entries_len {
111                    data_block.next();
112                }
113                self.valid = data_block.valid();
114            }
115        } else {
116            self.valid = false;
117        }
118
119        Ok(())
120    }
121
122    fn seek(&mut self, target_key: &[u8]) -> Result<()> {
123        let block_handle = self.index_block.find_block_for_key(target_key)?;
124
125        if let Some(handle) = block_handle {
126            if let Some(block_index) = self
127                .all_block_handles
128                .iter()
129                .position(|h| h.offset == handle.offset && h.size == handle.size)
130            {
131                self.load_data_block(block_index)?;
132
133                if let Some(ref mut data_block) = self.current_data_block {
134                    self.valid = data_block.seek(target_key);
135                } else {
136                    self.valid = false;
137                }
138            } else {
139                self.valid = false;
140            }
141        } else {
142            self.valid = false;
143        }
144
145        Ok(())
146    }
147
148    fn next(&mut self) -> Result<bool> {
149        if !self.valid {
150            return Ok(false);
151        }
152
153        if let Some(ref mut data_block) = self.current_data_block {
154            if data_block.next().is_some() && data_block.valid() {
155                return Ok(true);
156            }
157
158            let next_block_index = self.current_block_index + 1;
159            if next_block_index < self.all_block_handles.len() {
160                self.load_data_block(next_block_index)?;
161
162                if let Some(ref mut new_data_block) = self.current_data_block {
163                    new_data_block.seek_to_first();
164                    self.valid = new_data_block.valid();
165                    return Ok(self.valid);
166                }
167            }
168        }
169
170        self.valid = false;
171        Ok(false)
172    }
173
174    fn prev(&mut self) -> Result<bool> {
175        if !self.valid {
176            return Ok(false);
177        }
178
179        if self.current_block_index > 0 {
180            let prev_block_index = self.current_block_index - 1;
181            self.load_data_block(prev_block_index)?;
182
183            if let Some(ref mut data_block) = self.current_data_block {
184                data_block.seek_to_first();
185                while data_block.next().is_some() {}
186
187                let entries_len = data_block.entries().len();
188                if entries_len > 0 {
189                    data_block.seek_to_first();
190                    for _ in 1..entries_len {
191                        data_block.next();
192                    }
193                    self.valid = data_block.valid();
194                    return Ok(self.valid);
195                }
196            }
197        }
198
199        self.valid = false;
200        Ok(false)
201    }
202
203    fn valid(&self) -> bool {
204        self.valid
205    }
206
207    fn key(&self) -> Option<&[u8]> {
208        if self.valid {
209            self.current_data_block.as_ref()?.key()
210        } else {
211            None
212        }
213    }
214
215    fn value(&self) -> Option<&[u8]> {
216        if self.valid {
217            self.current_data_block.as_ref()?.value()
218        } else {
219            None
220        }
221    }
222}
223
224pub struct SstEntryIterator {
225    iterator: SstTableIterator,
226}
227
228impl SstEntryIterator {
229    pub fn new(sst_reader: SstReader, compression_type: CompressionType) -> Result<Self> {
230        let iterator = SstTableIterator::new(sst_reader, compression_type)?;
231        Ok(SstEntryIterator { iterator })
232    }
233
234    pub fn collect_all(&mut self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
235        let mut entries = Vec::new();
236        self.iterator.seek_to_first()?;
237
238        while self.iterator.valid() {
239            if let (Some(key), Some(value)) = (self.iterator.key(), self.iterator.value()) {
240                entries.push((key.to_vec(), value.to_vec()));
241            }
242            if !self.iterator.next()? {
243                break;
244            }
245        }
246
247        Ok(entries)
248    }
249
250    pub fn find(&mut self, target_key: &[u8]) -> Result<Option<Vec<u8>>> {
251        self.iterator.seek(target_key)?;
252
253        if self.iterator.valid() {
254            if let Some(key) = self.iterator.key() {
255                if key == target_key {
256                    return Ok(self.iterator.value().map(|v| v.to_vec()));
257                }
258            }
259        }
260
261        Ok(None)
262    }
263
264    pub fn entries_count(&self) -> usize {
265        self.iterator.entries_count()
266    }
267
268    pub fn block_count(&self) -> usize {
269        self.iterator.block_count()
270    }
271}
272
273impl Iterator for SstEntryIterator {
274    type Item = Result<(Vec<u8>, Vec<u8>)>;
275
276    fn next(&mut self) -> Option<Self::Item> {
277        if self.iterator.valid() {
278            let result = match (self.iterator.key(), self.iterator.value()) {
279                (Some(key), Some(value)) => {
280                    let entry = Ok((key.to_vec(), value.to_vec()));
281                    match self.iterator.next() {
282                        Ok(_) => Some(entry),
283                        Err(e) => Some(Err(e)),
284                    }
285                }
286                _ => None,
287            };
288
289            result
290        } else {
291            None
292        }
293    }
294}