rocksdb_fileformat/
iterator.rs1use crate::data_block::DataBlockReader;
2use crate::error::Result;
3use crate::index_block::IndexBlock;
4use crate::sst_reader::SstReader;
5use crate::types::CompressionType;
6
7pub trait SstIterator {
8 fn seek_to_first(&mut self) -> Result<()>;
9 fn seek_to_last(&mut self) -> Result<()>;
10 fn seek(&mut self, key: &[u8]) -> Result<()>;
11 fn next(&mut self) -> Result<bool>;
12 fn prev(&mut self) -> Result<bool>;
13 fn valid(&self) -> bool;
14 fn key(&self) -> Option<&[u8]>;
15 fn value(&self) -> Option<&[u8]>;
16}
17
18pub struct SstTableIterator {
19 sst_reader: SstReader,
20 index_block: IndexBlock,
21 current_data_block: Option<DataBlockReader>,
22 current_block_index: usize,
23 all_block_handles: Vec<crate::block_handle::BlockHandle>,
24 compression_type: CompressionType,
25 valid: bool,
26}
27
28impl SstTableIterator {
29 pub fn new(mut sst_reader: SstReader, compression_type: CompressionType) -> Result<Self> {
30 let footer = sst_reader.get_footer();
31 let index_data = sst_reader.read_block(footer.index_handle.clone())?;
32 let index_block = IndexBlock::new(&index_data, CompressionType::None)?;
33 let all_block_handles = index_block.get_all_block_handles()?;
34
35 Ok(SstTableIterator {
36 sst_reader,
37 index_block,
38 current_data_block: None,
39 current_block_index: 0,
40 all_block_handles,
41 compression_type,
42 valid: false,
43 })
44 }
45
46 fn load_data_block(&mut self, block_index: usize) -> Result<()> {
47 if block_index >= self.all_block_handles.len() {
48 self.current_data_block = None;
49 self.valid = false;
50 return Ok(());
51 }
52
53 let block_handle = self.all_block_handles[block_index].clone();
54 let data_block_reader = self
55 .sst_reader
56 .read_data_block_reader(block_handle, self.compression_type)?;
57
58 self.current_data_block = Some(data_block_reader);
59 self.current_block_index = block_index;
60 Ok(())
61 }
62
63 pub fn entries_count(&self) -> usize {
64 match &self.current_data_block {
65 Some(reader) => reader.entries().len(),
66 None => 0,
67 }
68 }
69
70 pub fn block_count(&self) -> usize {
71 self.all_block_handles.len()
72 }
73}
74
75impl SstIterator for SstTableIterator {
76 fn seek_to_first(&mut self) -> Result<()> {
77 if self.all_block_handles.is_empty() {
78 self.valid = false;
79 return Ok(());
80 }
81
82 self.load_data_block(0)?;
83
84 if let Some(ref mut data_block) = self.current_data_block {
85 data_block.seek_to_first();
86 self.valid = data_block.valid();
87 } else {
88 self.valid = false;
89 }
90
91 Ok(())
92 }
93
94 fn seek_to_last(&mut self) -> Result<()> {
95 if self.all_block_handles.is_empty() {
96 self.valid = false;
97 return Ok(());
98 }
99
100 let last_block_index = self.all_block_handles.len() - 1;
101 self.load_data_block(last_block_index)?;
102
103 if let Some(ref mut data_block) = self.current_data_block {
104 while data_block.next().is_some() {}
105 self.valid = false;
106
107 let entries_len = data_block.entries().len();
108 if entries_len > 0 {
109 data_block.seek_to_first();
110 for _ in 1..entries_len {
111 data_block.next();
112 }
113 self.valid = data_block.valid();
114 }
115 } else {
116 self.valid = false;
117 }
118
119 Ok(())
120 }
121
122 fn seek(&mut self, target_key: &[u8]) -> Result<()> {
123 let block_handle = self.index_block.find_block_for_key(target_key)?;
124
125 if let Some(handle) = block_handle {
126 if let Some(block_index) = self
127 .all_block_handles
128 .iter()
129 .position(|h| h.offset == handle.offset && h.size == handle.size)
130 {
131 self.load_data_block(block_index)?;
132
133 if let Some(ref mut data_block) = self.current_data_block {
134 self.valid = data_block.seek(target_key);
135 } else {
136 self.valid = false;
137 }
138 } else {
139 self.valid = false;
140 }
141 } else {
142 self.valid = false;
143 }
144
145 Ok(())
146 }
147
148 fn next(&mut self) -> Result<bool> {
149 if !self.valid {
150 return Ok(false);
151 }
152
153 if let Some(ref mut data_block) = self.current_data_block {
154 if data_block.next().is_some() && data_block.valid() {
155 return Ok(true);
156 }
157
158 let next_block_index = self.current_block_index + 1;
159 if next_block_index < self.all_block_handles.len() {
160 self.load_data_block(next_block_index)?;
161
162 if let Some(ref mut new_data_block) = self.current_data_block {
163 new_data_block.seek_to_first();
164 self.valid = new_data_block.valid();
165 return Ok(self.valid);
166 }
167 }
168 }
169
170 self.valid = false;
171 Ok(false)
172 }
173
174 fn prev(&mut self) -> Result<bool> {
175 if !self.valid {
176 return Ok(false);
177 }
178
179 if self.current_block_index > 0 {
180 let prev_block_index = self.current_block_index - 1;
181 self.load_data_block(prev_block_index)?;
182
183 if let Some(ref mut data_block) = self.current_data_block {
184 data_block.seek_to_first();
185 while data_block.next().is_some() {}
186
187 let entries_len = data_block.entries().len();
188 if entries_len > 0 {
189 data_block.seek_to_first();
190 for _ in 1..entries_len {
191 data_block.next();
192 }
193 self.valid = data_block.valid();
194 return Ok(self.valid);
195 }
196 }
197 }
198
199 self.valid = false;
200 Ok(false)
201 }
202
203 fn valid(&self) -> bool {
204 self.valid
205 }
206
207 fn key(&self) -> Option<&[u8]> {
208 if self.valid {
209 self.current_data_block.as_ref()?.key()
210 } else {
211 None
212 }
213 }
214
215 fn value(&self) -> Option<&[u8]> {
216 if self.valid {
217 self.current_data_block.as_ref()?.value()
218 } else {
219 None
220 }
221 }
222}
223
224pub struct SstEntryIterator {
225 iterator: SstTableIterator,
226}
227
228impl SstEntryIterator {
229 pub fn new(sst_reader: SstReader, compression_type: CompressionType) -> Result<Self> {
230 let iterator = SstTableIterator::new(sst_reader, compression_type)?;
231 Ok(SstEntryIterator { iterator })
232 }
233
234 pub fn collect_all(&mut self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
235 let mut entries = Vec::new();
236 self.iterator.seek_to_first()?;
237
238 while self.iterator.valid() {
239 if let (Some(key), Some(value)) = (self.iterator.key(), self.iterator.value()) {
240 entries.push((key.to_vec(), value.to_vec()));
241 }
242 if !self.iterator.next()? {
243 break;
244 }
245 }
246
247 Ok(entries)
248 }
249
250 pub fn find(&mut self, target_key: &[u8]) -> Result<Option<Vec<u8>>> {
251 self.iterator.seek(target_key)?;
252
253 if self.iterator.valid() {
254 if let Some(key) = self.iterator.key() {
255 if key == target_key {
256 return Ok(self.iterator.value().map(|v| v.to_vec()));
257 }
258 }
259 }
260
261 Ok(None)
262 }
263
264 pub fn entries_count(&self) -> usize {
265 self.iterator.entries_count()
266 }
267
268 pub fn block_count(&self) -> usize {
269 self.iterator.block_count()
270 }
271}
272
273impl Iterator for SstEntryIterator {
274 type Item = Result<(Vec<u8>, Vec<u8>)>;
275
276 fn next(&mut self) -> Option<Self::Item> {
277 if self.iterator.valid() {
278 let result = match (self.iterator.key(), self.iterator.value()) {
279 (Some(key), Some(value)) => {
280 let entry = Ok((key.to_vec(), value.to_vec()));
281 match self.iterator.next() {
282 Ok(_) => Some(entry),
283 Err(e) => Some(Err(e)),
284 }
285 }
286 _ => None,
287 };
288
289 result
290 } else {
291 None
292 }
293 }
294}