1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Read, Write};
16use std::sync::Arc;
17
18use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
19
/// Magic number ("STB2" in ASCII) stored in the footer to validate
/// that a file really is an SSTable of this format.
pub const SSTABLE_MAGIC: u32 = 0x53544232;

/// Target uncompressed size of a data block; a block is flushed once its
/// buffer reaches this size.
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Every N-th block gets an entry in the sparse index, which readers use to
/// narrow binary searches over the full block index.
pub const SPARSE_INDEX_INTERVAL: usize = 16;
30
/// A value type that can be stored in an SSTable.
///
/// Implementors define the on-disk encoding used inside data blocks;
/// `deserialize` must consume exactly the bytes that `serialize` produced.
pub trait SSTableValue: Clone + Send + Sync {
    /// Writes `self` to `writer` in the value's on-disk format.
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    /// Reads back a value previously written by [`serialize`](Self::serialize).
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}
36
37impl SSTableValue for u64 {
39 fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
40 write_vint(writer, *self)
41 }
42
43 fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
44 read_vint(reader)
45 }
46}
47
48impl SSTableValue for Vec<u8> {
50 fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
51 write_vint(writer, self.len() as u64)?;
52 writer.write_all(self)
53 }
54
55 fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
56 let len = read_vint(reader)? as usize;
57 let mut data = vec![0u8; len];
58 reader.read_exact(&mut data)?;
59 Ok(data)
60 }
61}
62
/// Maximum number of postings that `TermInfo::try_inline` will embed
/// directly in the term entry; longer lists are stored externally.
pub const MAX_INLINE_POSTINGS: usize = 3;
65
/// Per-term metadata stored as the value in the term-dictionary SSTable.
///
/// Small posting lists are embedded directly in the entry (`Inline`);
/// larger ones are referenced by offset/length into posting data stored
/// elsewhere (`External`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    /// Posting list small enough to live inside the term entry itself.
    Inline {
        /// Number of documents containing the term (at most MAX_INLINE_POSTINGS).
        doc_freq: u8,
        /// Delta+vint encoded (doc_id, term_freq) pairs.
        data: [u8; 16],
        /// Number of valid bytes in `data`.
        data_len: u8,
    },
    /// Posting list stored outside the term dictionary.
    External {
        /// Byte offset of the encoded posting list.
        posting_offset: u64,
        /// Length in bytes of the encoded posting list.
        posting_len: u32,
        /// Number of documents containing the term.
        doc_freq: u32,
    },
}
93
94impl TermInfo {
95 pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
97 TermInfo::External {
98 posting_offset,
99 posting_len,
100 doc_freq,
101 }
102 }
103
104 pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
107 if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
108 return None;
109 }
110
111 let mut data = [0u8; 16];
112 let mut cursor = std::io::Cursor::new(&mut data[..]);
113 let mut prev_doc_id = 0u32;
114
115 for (i, &doc_id) in doc_ids.iter().enumerate() {
116 let delta = doc_id - prev_doc_id;
117 if write_vint(&mut cursor, delta as u64).is_err() {
118 return None;
119 }
120 if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
121 return None;
122 }
123 prev_doc_id = doc_id;
124 }
125
126 let data_len = cursor.position() as u8;
127 if data_len > 16 {
128 return None;
129 }
130
131 Some(TermInfo::Inline {
132 doc_freq: doc_ids.len() as u8,
133 data,
134 data_len,
135 })
136 }
137
138 pub fn doc_freq(&self) -> u32 {
140 match self {
141 TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
142 TermInfo::External { doc_freq, .. } => *doc_freq,
143 }
144 }
145
146 pub fn is_inline(&self) -> bool {
148 matches!(self, TermInfo::Inline { .. })
149 }
150
151 pub fn external_info(&self) -> Option<(u64, u32)> {
153 match self {
154 TermInfo::External {
155 posting_offset,
156 posting_len,
157 ..
158 } => Some((*posting_offset, *posting_len)),
159 TermInfo::Inline { .. } => None,
160 }
161 }
162
163 pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
166 match self {
167 TermInfo::Inline {
168 doc_freq,
169 data,
170 data_len,
171 } => {
172 let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
173 let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
174 let mut reader = &data[..*data_len as usize];
175 let mut prev_doc_id = 0u32;
176
177 for _ in 0..*doc_freq {
178 let delta = read_vint(&mut reader).ok()? as u32;
179 let tf = read_vint(&mut reader).ok()? as u32;
180 let doc_id = prev_doc_id + delta;
181 doc_ids.push(doc_id);
182 term_freqs.push(tf);
183 prev_doc_id = doc_id;
184 }
185
186 Some((doc_ids, term_freqs))
187 }
188 TermInfo::External { .. } => None,
189 }
190 }
191}
192
193impl SSTableValue for TermInfo {
194 fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
195 match self {
196 TermInfo::Inline {
197 doc_freq,
198 data,
199 data_len,
200 } => {
201 writer.write_u8(0xFF)?;
203 writer.write_u8(*doc_freq)?;
204 writer.write_u8(*data_len)?;
205 writer.write_all(&data[..*data_len as usize])?;
206 }
207 TermInfo::External {
208 posting_offset,
209 posting_len,
210 doc_freq,
211 } => {
212 writer.write_u8(0x00)?;
214 write_vint(writer, *doc_freq as u64)?;
215 write_vint(writer, *posting_offset)?;
216 write_vint(writer, *posting_len as u64)?;
217 }
218 }
219 Ok(())
220 }
221
222 fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
223 let tag = reader.read_u8()?;
224
225 if tag == 0xFF {
226 let doc_freq = reader.read_u8()?;
228 let data_len = reader.read_u8()?;
229 let mut data = [0u8; 16];
230 reader.read_exact(&mut data[..data_len as usize])?;
231 Ok(TermInfo::Inline {
232 doc_freq,
233 data,
234 data_len,
235 })
236 } else if tag == 0x00 {
237 let doc_freq = read_vint(reader)? as u32;
239 let posting_offset = read_vint(reader)?;
240 let posting_len = read_vint(reader)? as u32;
241 Ok(TermInfo::External {
242 posting_offset,
243 posting_len,
244 doc_freq,
245 })
246 } else {
247 Err(io::Error::new(
248 io::ErrorKind::InvalidData,
249 format!("Invalid TermInfo tag: {}", tag),
250 ))
251 }
252 }
253}
254
/// Writes `value` as an LEB128-style variable-length integer: 7 data bits
/// per byte, least-significant group first, with the high bit set on every
/// byte except the last. Values below 128 take a single byte.
///
/// Uses plain `Write::write_all` so the function depends only on std, not
/// on byteorder's `WriteBytesExt` extension trait.
pub fn write_vint<W: Write>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // Final byte: continuation bit clear.
            return writer.write_all(&[byte]);
        }
        // More significant bits remain: set the continuation bit.
        writer.write_all(&[byte | 0x80])?;
    }
}
268
/// Reads a variable-length integer written by `write_vint`.
///
/// Returns `InvalidData` once more than ten continuation bytes would be
/// needed (a u64 fits in at most ten 7-bit groups). Uses `read_exact` on a
/// one-byte buffer so the function depends only on std, not on byteorder's
/// `ReadBytesExt` extension trait.
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let mut buf = [0u8; 1];
        reader.read_exact(&mut buf)?;
        let byte = buf[0];
        // NOTE: at shift 63 only the low bit of the group contributes; any
        // higher bits are silently discarded (lenient overlong decoding,
        // matching the original behavior).
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}
289
/// Returns the number of leading bytes shared by `a` and `b`.
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    let max = a.len().min(b.len());
    let mut i = 0;
    while i < max && a[i] == b[i] {
        i += 1;
    }
    i
}
294
/// One entry of the sparse index: the first key of every
/// `SPARSE_INDEX_INTERVAL`-th block, used to narrow binary searches over
/// the full block index.
#[derive(Debug, Clone)]
struct SparseIndexEntry {
    /// First key stored in the referenced block.
    first_key: Vec<u8>,
    /// Position of that block within the full block index.
    block_idx: u32,
}
303
/// Summary statistics about an SSTable's structure, for diagnostics.
#[derive(Debug, Clone)]
pub struct SSTableStats {
    /// Number of compressed data blocks.
    pub num_blocks: usize,
    /// Number of entries in the sparse index.
    pub num_sparse_entries: usize,
    /// Total number of key/value entries in the table.
    pub num_entries: u64,
}
311
/// Streaming writer that builds an SSTable from keys inserted in ascending
/// order, buffering entries into prefix-compressed blocks.
pub struct SSTableWriter<'a, V: SSTableValue> {
    /// Destination for compressed blocks, indexes and footer.
    writer: &'a mut dyn Write,
    /// Uncompressed encoding of the block currently being filled.
    block_buffer: Vec<u8>,
    /// Previous key within the current block (prefix-compression baseline).
    prev_key: Vec<u8>,
    /// One entry per flushed block, written out in `finish`.
    index: Vec<BlockIndexEntry>,
    /// Bytes written so far; becomes the next block's offset.
    current_offset: u64,
    /// Total number of inserted entries.
    num_entries: u64,
    /// First key of the block being filled (None until its first insert).
    block_first_key: Option<Vec<u8>>,
    /// Ties the writer to the value type it encodes.
    _phantom: std::marker::PhantomData<V>,
}
323
324impl<'a, V: SSTableValue> SSTableWriter<'a, V> {
325 pub fn new(writer: &'a mut dyn Write) -> Self {
326 Self {
327 writer,
328 block_buffer: Vec::with_capacity(BLOCK_SIZE),
329 prev_key: Vec::new(),
330 index: Vec::new(),
331 current_offset: 0,
332 num_entries: 0,
333 block_first_key: None,
334 _phantom: std::marker::PhantomData,
335 }
336 }
337
338 pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
339 if self.block_first_key.is_none() {
340 self.block_first_key = Some(key.to_vec());
341 }
342
343 let prefix_len = common_prefix_len(&self.prev_key, key);
344 let suffix = &key[prefix_len..];
345
346 write_vint(&mut self.block_buffer, prefix_len as u64)?;
347 write_vint(&mut self.block_buffer, suffix.len() as u64)?;
348 self.block_buffer.extend_from_slice(suffix);
349 value.serialize(&mut self.block_buffer)?;
350
351 self.prev_key.clear();
352 self.prev_key.extend_from_slice(key);
353 self.num_entries += 1;
354
355 if self.block_buffer.len() >= BLOCK_SIZE {
356 self.flush_block()?;
357 }
358
359 Ok(())
360 }
361
362 fn flush_block(&mut self) -> io::Result<()> {
363 if self.block_buffer.is_empty() {
364 return Ok(());
365 }
366
367 let compressed = crate::compression::compress(
368 &self.block_buffer[..],
369 crate::compression::CompressionLevel(3),
370 )?;
371
372 if let Some(first_key) = self.block_first_key.take() {
373 self.index.push(BlockIndexEntry {
374 first_key,
375 offset: self.current_offset,
376 length: compressed.len() as u32,
377 });
378 }
379
380 self.writer.write_all(&compressed)?;
381 self.current_offset += compressed.len() as u64;
382
383 self.block_buffer.clear();
384 self.prev_key.clear();
385
386 Ok(())
387 }
388
389 pub fn finish(mut self) -> io::Result<()> {
390 self.flush_block()?;
391
392 let data_end_offset = self.current_offset;
393
394 let sparse_index: Vec<SparseIndexEntry> = self
396 .index
397 .iter()
398 .enumerate()
399 .filter(|(i, _)| *i % SPARSE_INDEX_INTERVAL == 0)
400 .map(|(i, entry)| SparseIndexEntry {
401 first_key: entry.first_key.clone(),
402 block_idx: i as u32,
403 })
404 .collect();
405
406 self.writer
408 .write_u32::<LittleEndian>(self.index.len() as u32)?;
409 for entry in &self.index {
410 self.writer
411 .write_u16::<LittleEndian>(entry.first_key.len() as u16)?;
412 self.writer.write_all(&entry.first_key)?;
413 self.writer.write_u64::<LittleEndian>(entry.offset)?;
414 self.writer.write_u32::<LittleEndian>(entry.length)?;
415 }
416
417 self.writer
419 .write_u32::<LittleEndian>(sparse_index.len() as u32)?;
420 for entry in &sparse_index {
421 self.writer
422 .write_u16::<LittleEndian>(entry.first_key.len() as u16)?;
423 self.writer.write_all(&entry.first_key)?;
424 self.writer.write_u32::<LittleEndian>(entry.block_idx)?;
425 }
426
427 self.writer.write_u64::<LittleEndian>(data_end_offset)?;
429 self.writer.write_u64::<LittleEndian>(self.num_entries)?;
430 self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;
431
432 Ok(())
433 }
434}
435
/// Index entry describing one compressed data block.
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    /// First key stored in the block.
    first_key: Vec<u8>,
    /// Byte offset of the compressed block within the data region.
    offset: u64,
    /// Compressed length of the block in bytes.
    length: u32,
}
443
/// Async reader for SSTables produced by `SSTableWriter`.
///
/// Holds both indexes in memory; data blocks are read lazily and kept in a
/// small LRU cache of decompressed blocks.
pub struct AsyncSSTableReader<V: SSTableValue> {
    /// Slice covering only the data region (blocks), excluding indexes/footer.
    data_slice: LazyFileSlice,
    /// Full block index loaded at open time.
    index: Vec<BlockIndexEntry>,
    /// Sparse index used to narrow searches over `index`.
    sparse_index: Vec<SparseIndexEntry>,
    /// Total entry count from the footer.
    num_entries: u64,
    /// LRU cache of decompressed blocks keyed by block offset.
    cache: RwLock<BlockCache>,
    /// Ties the reader to the value type it decodes.
    _phantom: std::marker::PhantomData<V>,
}
457
458struct BlockCache {
460 blocks: FxHashMap<u64, Arc<Vec<u8>>>,
461 access_order: Vec<u64>,
462 max_blocks: usize,
463}
464
465impl BlockCache {
466 fn new(max_blocks: usize) -> Self {
467 Self {
468 blocks: FxHashMap::default(),
469 access_order: Vec::new(),
470 max_blocks,
471 }
472 }
473
474 fn get(&mut self, offset: u64) -> Option<Arc<Vec<u8>>> {
475 if let Some(block) = self.blocks.get(&offset) {
476 if let Some(pos) = self.access_order.iter().position(|&o| o == offset) {
477 self.access_order.remove(pos);
478 self.access_order.push(offset);
479 }
480 Some(Arc::clone(block))
481 } else {
482 None
483 }
484 }
485
486 fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
487 while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
488 let evict_offset = self.access_order.remove(0);
489 self.blocks.remove(&evict_offset);
490 }
491 self.blocks.insert(offset, block);
492 self.access_order.push(offset);
493 }
494}
495
496impl<V: SSTableValue> AsyncSSTableReader<V> {
497 pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
500 let file_len = file_handle.len();
501 if file_len < 20 {
502 return Err(io::Error::new(
503 io::ErrorKind::InvalidData,
504 "SSTable too small",
505 ));
506 }
507
508 let footer_bytes = file_handle
510 .read_bytes_range(file_len - 20..file_len)
511 .await?;
512 let mut footer_reader = footer_bytes.as_slice();
513
514 let data_end_offset = footer_reader.read_u64::<LittleEndian>()?;
515 let num_entries = footer_reader.read_u64::<LittleEndian>()?;
516 let magic = footer_reader.read_u32::<LittleEndian>()?;
517
518 if magic != SSTABLE_MAGIC {
519 return Err(io::Error::new(
520 io::ErrorKind::InvalidData,
521 "Invalid SSTable magic",
522 ));
523 }
524
525 let index_start = data_end_offset as usize;
527 let index_end = file_len - 20;
528 let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
529 let mut reader = index_bytes.as_slice();
530
531 let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
533 let mut index = Vec::with_capacity(num_blocks);
534
535 for _ in 0..num_blocks {
536 let key_len = reader.read_u16::<LittleEndian>()? as usize;
537 let mut first_key = vec![0u8; key_len];
538 reader.read_exact(&mut first_key)?;
539 let offset = reader.read_u64::<LittleEndian>()?;
540 let length = reader.read_u32::<LittleEndian>()?;
541
542 index.push(BlockIndexEntry {
543 first_key,
544 offset,
545 length,
546 });
547 }
548
549 let num_sparse = reader.read_u32::<LittleEndian>()? as usize;
551 let mut sparse_index = Vec::with_capacity(num_sparse);
552
553 for _ in 0..num_sparse {
554 let key_len = reader.read_u16::<LittleEndian>()? as usize;
555 let mut first_key = vec![0u8; key_len];
556 reader.read_exact(&mut first_key)?;
557 let block_idx = reader.read_u32::<LittleEndian>()?;
558
559 sparse_index.push(SparseIndexEntry {
560 first_key,
561 block_idx,
562 });
563 }
564
565 let data_slice = file_handle.slice(0..data_end_offset as usize);
567
568 Ok(Self {
569 data_slice,
570 index,
571 sparse_index,
572 num_entries,
573 cache: RwLock::new(BlockCache::new(cache_blocks)),
574 _phantom: std::marker::PhantomData,
575 })
576 }
577
578 pub fn num_entries(&self) -> u64 {
580 self.num_entries
581 }
582
583 pub fn stats(&self) -> SSTableStats {
585 SSTableStats {
586 num_blocks: self.index.len(),
587 num_sparse_entries: self.sparse_index.len(),
588 num_entries: self.num_entries,
589 }
590 }
591
592 pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
597 log::debug!(
598 "SSTable::get called, key_len={}, total_blocks={}, sparse_entries={}",
599 key.len(),
600 self.index.len(),
601 self.sparse_index.len()
602 );
603
604 let (start_block, end_block) = self.find_block_range(key);
606 log::debug!("SSTable::get sparse_range=[{}, {}]", start_block, end_block);
607
608 let search_range = &self.index[start_block..=end_block];
610 let block_idx =
611 match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
612 Ok(idx) => start_block + idx,
613 Err(0) => {
614 if start_block == 0 {
615 log::debug!("SSTable::get key not found (before first block)");
616 return Ok(None);
617 }
618 start_block
619 }
620 Err(idx) => start_block + idx - 1,
621 };
622
623 log::debug!("SSTable::get loading block_idx={}", block_idx);
624
625 let block_data = self.load_block(block_idx).await?;
627 self.search_block(&block_data, key)
628 }
629
630 pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
635 if keys.is_empty() {
636 return Ok(Vec::new());
637 }
638
639 let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
641 for (key_idx, key) in keys.iter().enumerate() {
642 let (start_block, end_block) = self.find_block_range(key);
643 let search_range = &self.index[start_block..=end_block];
644 let block_idx =
645 match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
646 Ok(idx) => start_block + idx,
647 Err(0) => {
648 if start_block == 0 {
649 key_to_block.push((key_idx, usize::MAX)); continue;
651 }
652 start_block
653 }
654 Err(idx) => start_block + idx - 1,
655 };
656 key_to_block.push((key_idx, block_idx));
657 }
658
659 let mut blocks_to_load: Vec<usize> = key_to_block
661 .iter()
662 .filter(|(_, b)| *b != usize::MAX)
663 .map(|(_, b)| *b)
664 .collect();
665 blocks_to_load.sort_unstable();
666 blocks_to_load.dedup();
667
668 for &block_idx in &blocks_to_load {
670 let _ = self.load_block(block_idx).await?;
671 }
672
673 let mut results = vec![None; keys.len()];
675 for (key_idx, block_idx) in key_to_block {
676 if block_idx == usize::MAX {
677 continue;
678 }
679 let block_data = self.load_block(block_idx).await?; results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
681 }
682
683 Ok(results)
684 }
685
686 fn find_block_range(&self, key: &[u8]) -> (usize, usize) {
690 if self.sparse_index.is_empty() {
691 return (0, self.index.len().saturating_sub(1));
692 }
693
694 let sparse_pos = match self
696 .sparse_index
697 .binary_search_by(|entry| entry.first_key.as_slice().cmp(key))
698 {
699 Ok(idx) => idx,
700 Err(0) => 0,
701 Err(idx) => idx - 1,
702 };
703
704 let start_block = self.sparse_index[sparse_pos].block_idx as usize;
705 let end_block = if sparse_pos + 1 < self.sparse_index.len() {
706 (self.sparse_index[sparse_pos + 1].block_idx as usize).saturating_sub(1)
708 } else {
709 self.index.len().saturating_sub(1)
711 };
712
713 (start_block, end_block.max(start_block))
714 }
715
716 pub async fn preload_all_blocks(&self) -> io::Result<()> {
721 for block_idx in 0..self.index.len() {
722 self.load_block(block_idx).await?;
723 }
724 Ok(())
725 }
726
727 async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
729 let entry = &self.index[block_idx];
730
731 {
733 let mut cache = self.cache.write();
734 if let Some(block) = cache.get(entry.offset) {
735 log::debug!("SSTable::load_block idx={} CACHE HIT", block_idx);
736 return Ok(block);
737 }
738 }
739
740 log::debug!(
741 "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
742 block_idx,
743 entry.offset,
744 entry.offset + entry.length as u64
745 );
746
747 let start = entry.offset as usize;
749 let end = start + entry.length as usize;
750 let compressed = self.data_slice.read_bytes_range(start..end).await?;
751
752 let decompressed = crate::compression::decompress(compressed.as_slice())?;
753
754 let block = Arc::new(decompressed);
755
756 {
758 let mut cache = self.cache.write();
759 cache.insert(entry.offset, Arc::clone(&block));
760 }
761
762 Ok(block)
763 }
764
765 fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
766 let mut reader = block_data;
767 let mut current_key = Vec::new();
768
769 while !reader.is_empty() {
770 let common_prefix_len = read_vint(&mut reader)? as usize;
771 let suffix_len = read_vint(&mut reader)? as usize;
772
773 current_key.truncate(common_prefix_len);
774 let mut suffix = vec![0u8; suffix_len];
775 reader.read_exact(&mut suffix)?;
776 current_key.extend_from_slice(&suffix);
777
778 let value = V::deserialize(&mut reader)?;
779
780 match current_key.as_slice().cmp(target_key) {
781 std::cmp::Ordering::Equal => return Ok(Some(value)),
782 std::cmp::Ordering::Greater => return Ok(None),
783 std::cmp::Ordering::Less => continue,
784 }
785 }
786
787 Ok(None)
788 }
789
790 pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
792 let start_block = match self
793 .index
794 .binary_search_by(|e| e.first_key.as_slice().cmp(start_key))
795 {
796 Ok(idx) => idx,
797 Err(0) => 0,
798 Err(idx) => idx - 1,
799 };
800
801 let end_block = match self
802 .index
803 .binary_search_by(|e| e.first_key.as_slice().cmp(end_key))
804 {
805 Ok(idx) => idx,
806 Err(idx) if idx >= self.index.len() => self.index.len().saturating_sub(1),
807 Err(idx) => idx,
808 };
809
810 for block_idx in start_block..=end_block.min(self.index.len().saturating_sub(1)) {
811 let _ = self.load_block(block_idx).await?;
812 }
813
814 Ok(())
815 }
816
817 pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
819 AsyncSSTableIterator::new(self)
820 }
821
822 pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
824 let mut results = Vec::new();
825
826 for block_idx in 0..self.index.len() {
827 let block_data = self.load_block(block_idx).await?;
828 let mut reader = block_data.as_slice();
829 let mut current_key = Vec::new();
830
831 while !reader.is_empty() {
832 let common_prefix_len = read_vint(&mut reader)? as usize;
833 let suffix_len = read_vint(&mut reader)? as usize;
834
835 current_key.truncate(common_prefix_len);
836 let mut suffix = vec![0u8; suffix_len];
837 reader.read_exact(&mut suffix)?;
838 current_key.extend_from_slice(&suffix);
839
840 let value = V::deserialize(&mut reader)?;
841 results.push((current_key.clone(), value));
842 }
843 }
844
845 Ok(results)
846 }
847}
848
/// Streaming iterator over all entries of an `AsyncSSTableReader`, in key
/// order. Driven manually via `next().await` (not the `Iterator` trait,
/// since block loads are async).
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    /// Table being iterated.
    reader: &'a AsyncSSTableReader<V>,
    /// Index of the next block to load.
    current_block: usize,
    /// Decompressed contents of the block currently being decoded.
    block_data: Option<Arc<Vec<u8>>>,
    /// Byte position of the next entry within `block_data`.
    block_offset: usize,
    /// Last decoded key (prefix-compression baseline for the next entry).
    current_key: Vec<u8>,
    /// True once all blocks are exhausted.
    finished: bool,
}
858
impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    /// Creates an iterator positioned before the first entry. A table with
    /// no blocks starts out already finished.
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.index.is_empty(),
        }
    }

    /// Advances to the next block, resetting per-block decode state.
    /// Returns `false` (and marks the iterator finished) when no blocks remain.
    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        // Prefix compression restarts at each block boundary.
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Returns the next `(key, value)` pair in key order, or `None` once the
    /// table is exhausted.
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        // Lazily load the first block on the initial call.
        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                // Current block exhausted; move on to the next one.
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            // Decode one prefix-compressed entry.
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            // Advance by however many bytes this entry consumed.
            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}
921}