hermes_core/structures/sstable.rs

//! Async SSTable with lazy loading via FileSlice
//!
//! Only the index is loaded into memory; blocks are loaded on demand.
//!
//! ## Sparse Index Optimization
//!
//! A sparse top-level index samples every Nth block's first key. Both the
//! block index and the sparse index are held in memory, so a lookup narrows
//! the candidate range with the sparse index, binary-searches only that
//! range, and then issues a single block read, the only I/O on the lookup path.

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;

use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};

/// SSTable magic number
pub const SSTABLE_MAGIC: u32 = 0x53544232; // "STB2"

/// Block size for SSTable (16KB default)
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Sparse index sampling interval - sample every Nth block
/// The block index is already fully in memory; the sparse index provides
/// an additional level for very large SSTables.
pub const SPARSE_INDEX_INTERVAL: usize = 16;

/// SSTable value trait
pub trait SSTableValue: Clone + Send + Sync {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

/// u64 value implementation
impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

/// Vec<u8> value implementation
impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}

/// Maximum number of postings that can be inlined in TermInfo
pub const MAX_INLINE_POSTINGS: usize = 3;

/// Term info for posting list references
///
/// Supports two modes:
/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
/// - **External**: Larger posting lists stored in a separate .post file
///
/// This eliminates a separate I/O read for rare/unique terms.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
    /// Each entry is a delta-encoded (doc_id, term_freq) pair
    Inline {
        /// Number of postings (1-3)
        doc_freq: u8,
        /// Inline data: delta-encoded (doc_id, term_freq) pairs
        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
        data: [u8; 16],
        /// Actual length of data used
        data_len: u8,
    },
    /// Reference to external posting list in .post file
    External {
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
    },
}

impl TermInfo {
    /// Create an external reference
    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
        }
    }

    /// Try to create an inline TermInfo from posting data.
    /// Returns None if the posting list is too large to inline.
    /// Assumes `doc_ids` is sorted ascending (the deltas would underflow otherwise).
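    ///
    /// # Example
    ///
    /// A minimal sketch: two postings fit inline, so looking this term up
    /// later needs no extra read from the .post file.
    ///
    /// ```ignore
    /// let info = TermInfo::try_inline(&[5, 9], &[2, 1]).unwrap();
    /// assert_eq!(info.doc_freq(), 2);
    /// assert_eq!(info.decode_inline(), Some((vec![5, 9], vec![2, 1])));
    /// ```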
    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (i, &doc_id) in doc_ids.iter().enumerate() {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;
        if data_len > 16 {
            return None;
        }

        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    /// Get document frequency
    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    /// Check if this is an inline posting list
    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    /// Get external posting info (offset, len); returns None for inline
    pub fn external_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    /// Decode inline postings into (doc_ids, term_freqs).
    /// Returns None if this is an external reference.
    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;

                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }

                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}

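// On-disk encoding used by the SSTableValue impl below:
//   Inline:   [0xFF][doc_freq: u8][data_len: u8][data_len bytes of postings]
//   External: [0x00][doc_freq: vint][posting_offset: vint][posting_len: vint]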
impl SSTableValue for TermInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                // Tag byte 0xFF = inline marker
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
            } => {
                // Tag byte 0x00 = external marker
                writer.write_u8(0x00)?;
                write_vint(writer, *doc_freq as u64)?;
                write_vint(writer, *posting_offset)?;
                write_vint(writer, *posting_len as u64)?;
            }
        }
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;

        if tag == 0xFF {
            // Inline
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            // Guard against corrupt input: a data_len over 16 would panic below
            if data_len as usize > 16 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "TermInfo inline data too long",
                ));
            }
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            // External
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}

/// Write variable-length integer
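///
/// LEB128-style: 7 data bits per byte, least-significant group first; the
/// high bit marks continuation. A minimal round-trip sketch:
///
/// ```ignore
/// let mut buf = Vec::new();
/// write_vint(&mut buf, 300).unwrap(); // encodes as [0xAC, 0x02]
/// assert_eq!(read_vint(&mut buf.as_slice()).unwrap(), 300);
/// ```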
pub fn write_vint<W: Write>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            return Ok(());
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
}

/// Read variable-length integer
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}

/// Compute common prefix length
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}

/// Sparse index entry - samples every Nth block for fast range narrowing
#[derive(Debug, Clone)]
struct SparseIndexEntry {
    /// First key of the sampled block
    first_key: Vec<u8>,
    /// Index into the full block index
    block_idx: u32,
}

/// SSTable statistics for debugging
#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    pub num_sparse_entries: usize,
    pub num_entries: u64,
}

/// SSTable writer
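///
/// Keys must be inserted in ascending order; within a block each key is
/// prefix-compressed against the previous one.
///
/// # Example
///
/// A minimal sketch writing into an in-memory buffer (any `Write` sink works):
///
/// ```ignore
/// let mut buf: Vec<u8> = Vec::new();
/// let mut writer = SSTableWriter::<u64>::new(&mut buf);
/// writer.insert(b"alpha", &1)?;
/// writer.insert(b"beta", &2)?;
/// writer.finish()?; // flushes the last block, then writes index and footer
/// ```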
pub struct SSTableWriter<'a, V: SSTableValue> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    prev_key: Vec<u8>,
    index: Vec<BlockIndexEntry>,
    current_offset: u64,
    num_entries: u64,
    block_first_key: Option<Vec<u8>>,
    _phantom: std::marker::PhantomData<V>,
}

impl<'a, V: SSTableValue> SSTableWriter<'a, V> {
    pub fn new(writer: &'a mut dyn Write) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            _phantom: std::marker::PhantomData,
        }
    }

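    /// Append one entry. Entries are encoded as
    /// `[shared_prefix_len: vint][suffix_len: vint][suffix bytes][value]`,
    /// where the prefix is shared with the previous key in the block.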
    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }

    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let compressed = crate::compression::compress(
            &self.block_buffer[..],
            crate::compression::CompressionLevel(3),
        )?;

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;

        self.block_buffer.clear();
        self.prev_key.clear();

        Ok(())
    }

    pub fn finish(mut self) -> io::Result<()> {
        self.flush_block()?;

        let data_end_offset = self.current_offset;

        // Build sparse index - sample every SPARSE_INDEX_INTERVAL blocks
        let sparse_index: Vec<SparseIndexEntry> = self
            .index
            .iter()
            .enumerate()
            .filter(|(i, _)| *i % SPARSE_INDEX_INTERVAL == 0)
            .map(|(i, entry)| SparseIndexEntry {
                first_key: entry.first_key.clone(),
                block_idx: i as u32,
            })
            .collect();

        // Write block index
        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer
                .write_u16::<LittleEndian>(entry.first_key.len() as u16)?;
            self.writer.write_all(&entry.first_key)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
        }

        // Write sparse index
        self.writer
            .write_u32::<LittleEndian>(sparse_index.len() as u32)?;
        for entry in &sparse_index {
            self.writer
                .write_u16::<LittleEndian>(entry.first_key.len() as u16)?;
            self.writer.write_all(&entry.first_key)?;
            self.writer.write_u32::<LittleEndian>(entry.block_idx)?;
        }

        // Write footer (20 bytes: data_end u64 + num_entries u64 + magic u32)
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(())
    }
}

/// Block index entry
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

/// Async SSTable reader - loads blocks on demand via LazyFileSlice
pub struct AsyncSSTableReader<V: SSTableValue> {
    /// LazyFileSlice for the data portion (blocks only) - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Block index (loaded into memory - small)
    index: Vec<BlockIndexEntry>,
    /// Sparse index for fast range narrowing (samples every Nth block)
    sparse_index: Vec<SparseIndexEntry>,
    num_entries: u64,
    /// Hot cache for decompressed blocks
    cache: RwLock<BlockCache>,
    _phantom: std::marker::PhantomData<V>,
}

/// LRU-style block cache
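///
/// Recency is tracked with a plain `Vec`, so `get` and `insert` are O(n) in
/// the number of cached blocks; acceptable for the small capacities used here.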
struct BlockCache {
    blocks: FxHashMap<u64, Arc<Vec<u8>>>,
    access_order: Vec<u64>,
    max_blocks: usize,
}

impl BlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, offset: u64) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&offset) {
            if let Some(pos) = self.access_order.iter().position(|&o| o == offset) {
                self.access_order.remove(pos);
                self.access_order.push(offset);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict_offset = self.access_order.remove(0);
            self.blocks.remove(&evict_offset);
        }
        self.blocks.insert(offset, block);
        self.access_order.push(offset);
    }
}

impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Open an SSTable from a LazyFileHandle.
    /// Only the footer and index are loaded into memory; data blocks are fetched on demand.
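    ///
    /// A minimal usage sketch (`dir.open_lazy(...)` is a hypothetical way to
    /// obtain a `LazyFileHandle`; `256` is the block-cache capacity):
    ///
    /// ```ignore
    /// let handle: LazyFileHandle = dir.open_lazy("segment.sst")?;
    /// let reader = AsyncSSTableReader::<u64>::open(handle, 256).await?;
    /// let value = reader.get(b"alpha").await?;
    /// ```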
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 20 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // Read footer (last 20 bytes)
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 20..file_len)
            .await?;
        let mut footer_reader = footer_bytes.as_slice();

        let data_end_offset = footer_reader.read_u64::<LittleEndian>()?;
        let num_entries = footer_reader.read_u64::<LittleEndian>()?;
        let magic = footer_reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid SSTable magic",
            ));
        }

        // Read index (from data_end_offset to footer)
        let index_start = data_end_offset as usize;
        let index_end = file_len - 20;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        // Read block index
        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let key_len = reader.read_u16::<LittleEndian>()? as usize;
            let mut first_key = vec![0u8; key_len];
            reader.read_exact(&mut first_key)?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;

            index.push(BlockIndexEntry {
                first_key,
                offset,
                length,
            });
        }

        // Read sparse index
        let num_sparse = reader.read_u32::<LittleEndian>()? as usize;
        let mut sparse_index = Vec::with_capacity(num_sparse);

        for _ in 0..num_sparse {
            let key_len = reader.read_u16::<LittleEndian>()? as usize;
            let mut first_key = vec![0u8; key_len];
            reader.read_exact(&mut first_key)?;
            let block_idx = reader.read_u32::<LittleEndian>()?;

            sparse_index.push(SparseIndexEntry {
                first_key,
                block_idx,
            });
        }

        // Create a lazy slice for just the data portion
        let data_slice = file_handle.slice(0..data_end_offset as usize);

        Ok(Self {
            data_slice,
            index,
            sparse_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            _phantom: std::marker::PhantomData,
        })
    }

    /// Number of entries
    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    /// Get stats about this SSTable for debugging
    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.index.len(),
            num_sparse_entries: self.sparse_index.len(),
            num_entries: self.num_entries,
        }
    }

    /// Look up a key (async; may need to load a block)
    ///
    /// Uses the sparse index to narrow the search range before the in-memory
    /// binary search, so a lookup costs at most one block read.
    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}, sparse_entries={}",
            key.len(),
            self.index.len(),
            self.sparse_index.len()
        );

        // Guard against an empty table: slicing an empty index below would panic
        if self.index.is_empty() {
            return Ok(None);
        }

        // Use sparse index to find the block range to search
        let (start_block, end_block) = self.find_block_range(key);
        log::debug!("SSTable::get sparse_range=[{}, {}]", start_block, end_block);

        // Binary search within the narrowed range (in-memory, no I/O)
        let search_range = &self.index[start_block..=end_block];
        let block_idx =
            match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
                Ok(idx) => start_block + idx,
                Err(0) => {
                    if start_block == 0 {
                        log::debug!("SSTable::get key not found (before first block)");
                        return Ok(None);
                    }
                    start_block
                }
                Err(idx) => start_block + idx - 1,
            };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        // Now we know exactly which block to load - single I/O
        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batch lookup of multiple keys with optimized I/O
    ///
    /// Groups keys by block and loads each block only once, reducing I/O
    /// from one read per key to one read per distinct block.
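    ///
    /// ```ignore
    /// // Minimal sketch; `reader` as constructed in `open`'s example.
    /// let keys: &[&[u8]] = &[b"alpha", b"beta"];
    /// let values = reader.get_batch(keys).await?;
    /// assert_eq!(values.len(), 2);
    /// ```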
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }
        // Guard against an empty table: slicing an empty index below would panic
        if self.index.is_empty() {
            return Ok(vec![None; keys.len()]);
        }

        // Map each key to its block index
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            let (start_block, end_block) = self.find_block_range(key);
            let search_range = &self.index[start_block..=end_block];
            let block_idx =
                match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
                    Ok(idx) => start_block + idx,
                    Err(0) => {
                        if start_block == 0 {
                            key_to_block.push((key_idx, usize::MAX)); // Mark as not found
                            continue;
                        }
                        start_block
                    }
                    Err(idx) => start_block + idx - 1,
                };
            key_to_block.push((key_idx, block_idx));
        }

        // Group keys by block
        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        // Load all needed blocks (this is where I/O happens)
        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        // Now search each key in its block (all blocks are cached)
        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            let block_data = self.load_block(block_idx).await?; // Will hit cache
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Find the block range that could contain the key, using the sparse index.
    ///
    /// Returns an inclusive (start_block_idx, end_block_idx) range.
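    ///
    /// Example: with `SPARSE_INDEX_INTERVAL = 16` and 100 blocks, the sparse
    /// index covers blocks 0, 16, 32, ..., 96; a key landing under the sparse
    /// entry for block 32 narrows the search to blocks 32..=47.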
    fn find_block_range(&self, key: &[u8]) -> (usize, usize) {
        if self.sparse_index.is_empty() {
            return (0, self.index.len().saturating_sub(1));
        }

        // Binary search in sparse index (in-memory, no I/O)
        let sparse_pos = match self
            .sparse_index
            .binary_search_by(|entry| entry.first_key.as_slice().cmp(key))
        {
            Ok(idx) => idx,
            Err(0) => 0,
            Err(idx) => idx - 1,
        };

        let start_block = self.sparse_index[sparse_pos].block_idx as usize;
        let end_block = if sparse_pos + 1 < self.sparse_index.len() {
            // Stop one block before the next sparse entry's block
            (self.sparse_index[sparse_pos + 1].block_idx as usize).saturating_sub(1)
        } else {
            // Last sparse entry - search to end of index
            self.index.len().saturating_sub(1)
        };

        (start_block, end_block.max(start_block))
    }

    /// Preload all data blocks into memory
    ///
    /// Call this after open() to eliminate all I/O during subsequent lookups.
    /// Useful when the SSTable is small enough to fit in memory.
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Load a block (checks cache first, then loads from FileSlice)
    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
        let entry = &self.index[block_idx];

        // Check cache first (write lock: `get` updates the recency order)
        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.offset) {
                log::debug!("SSTable::load_block idx={} CACHE HIT", block_idx);
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            entry.offset,
            entry.offset + entry.length as u64
        );

        // Load from FileSlice
        let start = entry.offset as usize;
        let end = start + entry.length as usize;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = crate::compression::decompress(compressed.as_slice())?;

        let block = Arc::new(decompressed);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(entry.offset, Arc::clone(&block));
        }

        Ok(block)
    }

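    /// Linear scan within a decompressed block: prefix compression means
    /// entries can only be decoded in order, so we walk forward until the
    /// target key is found or overshot.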
    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    /// Prefetch blocks for a key range
    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        // Nothing to prefetch in an empty table (also avoids load_block(0) panicking)
        if self.index.is_empty() {
            return Ok(());
        }

        let start_block = match self
            .index
            .binary_search_by(|e| e.first_key.as_slice().cmp(start_key))
        {
            Ok(idx) => idx,
            Err(0) => 0,
            Err(idx) => idx - 1,
        };

        let end_block = match self
            .index
            .binary_search_by(|e| e.first_key.as_slice().cmp(end_key))
        {
            Ok(idx) => idx,
            Err(idx) if idx >= self.index.len() => self.index.len().saturating_sub(1),
            Err(idx) => idx,
        };

        for block_idx in start_block..=end_block.min(self.index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    /// Iterate over all entries (loads blocks as needed)
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Get all entries as a vector (for merging)
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = block_data.as_slice();
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                current_key.truncate(common_prefix_len);
                let mut suffix = vec![0u8; suffix_len];
                reader.read_exact(&mut suffix)?;
                current_key.extend_from_slice(&suffix);

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}

/// Async iterator over SSTable entries
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    current_block: usize,
    block_data: Option<Arc<Vec<u8>>>,
    block_offset: usize,
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.index.is_empty(),
        }
    }

    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Advance to the next entry (async)
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            // Advance by the number of bytes consumed from the block
            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}