Skip to main content

amaters_core/storage/
sstable.rs

1//! SSTable (Sorted String Table) implementation
2//!
3//! SSTables are immutable, on-disk sorted key-value stores used in LSM-Tree.
4//! They store memtable snapshots persistently with efficient read access.
5
6use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::storage::compression::{self, CompressionType};
8use crate::storage::{BloomFilter, BloomFilterConfig, BloomFilterMetadata};
9use crate::types::{CipherBlob, Key};
10use crate::utils::{calculate_checksum, verify_checksum};
11use std::collections::BTreeMap;
12use std::fs::{File, OpenOptions};
13use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16
/// Magic number identifying an SSTable file: the ASCII bytes "SSTA".
const SSTABLE_MAGIC: u32 = 0x5353_5441;

/// On-disk format version; version 3 adds block compression.
const SSTABLE_VERSION: u32 = 3;

/// Default block size in bytes (4 KiB).
const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
25
26/// SSTable configuration
27#[derive(Debug, Clone)]
28pub struct SSTableConfig {
29    /// Block size in bytes (uncompressed target size)
30    pub block_size: usize,
31    /// Compression algorithm for data blocks
32    pub compression_type: CompressionType,
33}
34
35impl Default for SSTableConfig {
36    fn default() -> Self {
37        Self {
38            block_size: DEFAULT_BLOCK_SIZE,
39            compression_type: CompressionType::None,
40        }
41    }
42}
43
44/// Index entry pointing to a data block
45#[derive(Debug, Clone)]
46struct IndexEntry {
47    /// First key in the block
48    key: Key,
49    /// Offset of the block in the file
50    offset: u64,
51}
52
53/// Data block containing key-value pairs
54#[derive(Debug, Clone)]
55struct DataBlock {
56    entries: Vec<(Key, CipherBlob)>,
57    size: usize,
58}
59
60impl DataBlock {
61    fn new() -> Self {
62        Self {
63            entries: Vec::new(),
64            size: 0,
65        }
66    }
67
68    fn add_entry(&mut self, key: Key, value: CipherBlob) {
69        let entry_size = 8 + key.as_bytes().len() + value.as_bytes().len();
70        self.entries.push((key, value));
71        self.size += entry_size;
72    }
73
74    fn is_full(&self, block_size: usize) -> bool {
75        self.size >= block_size
76    }
77
78    fn encode(&self) -> Result<Vec<u8>> {
79        let mut bytes = Vec::with_capacity(self.size + 8);
80
81        // Number of entries (4 bytes)
82        bytes.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
83
84        // Entries
85        for (key, value) in &self.entries {
86            let key_bytes = key.as_bytes();
87            let value_bytes = value.as_bytes();
88
89            // Key length (4 bytes) + Key
90            bytes.extend_from_slice(&(key_bytes.len() as u32).to_le_bytes());
91            bytes.extend_from_slice(key_bytes);
92
93            // Value length (4 bytes) + Value
94            bytes.extend_from_slice(&(value_bytes.len() as u32).to_le_bytes());
95            bytes.extend_from_slice(value_bytes);
96        }
97
98        // Checksum (4 bytes)
99        let checksum = calculate_checksum(&bytes);
100        bytes.extend_from_slice(&checksum.to_le_bytes());
101
102        Ok(bytes)
103    }
104
105    fn decode(bytes: &[u8]) -> Result<Self> {
106        if bytes.len() < 8 {
107            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
108                "Data block too small".to_string(),
109            )));
110        }
111
112        // Verify checksum
113        let data_len = bytes.len() - 4;
114        let checksum_bytes = &bytes[data_len..];
115        let expected_checksum = u32::from_le_bytes([
116            checksum_bytes[0],
117            checksum_bytes[1],
118            checksum_bytes[2],
119            checksum_bytes[3],
120        ]);
121        verify_checksum(&bytes[..data_len], expected_checksum)?;
122
123        let mut cursor = 0;
124        let num_entries = u32::from_le_bytes([
125            bytes[cursor],
126            bytes[cursor + 1],
127            bytes[cursor + 2],
128            bytes[cursor + 3],
129        ]) as usize;
130        cursor += 4;
131
132        let mut block = DataBlock::new();
133
134        for _ in 0..num_entries {
135            // Read key
136            if cursor + 4 > data_len {
137                return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
138                    "Incomplete key length".to_string(),
139                )));
140            }
141            let key_len = u32::from_le_bytes([
142                bytes[cursor],
143                bytes[cursor + 1],
144                bytes[cursor + 2],
145                bytes[cursor + 3],
146            ]) as usize;
147            cursor += 4;
148
149            if cursor + key_len > data_len {
150                return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
151                    "Incomplete key data".to_string(),
152                )));
153            }
154            let key = Key::from_slice(&bytes[cursor..cursor + key_len]);
155            cursor += key_len;
156
157            // Read value
158            if cursor + 4 > data_len {
159                return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
160                    "Incomplete value length".to_string(),
161                )));
162            }
163            let value_len = u32::from_le_bytes([
164                bytes[cursor],
165                bytes[cursor + 1],
166                bytes[cursor + 2],
167                bytes[cursor + 3],
168            ]) as usize;
169            cursor += 4;
170
171            if cursor + value_len > data_len {
172                return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
173                    "Incomplete value data".to_string(),
174                )));
175            }
176            let value = CipherBlob::new(bytes[cursor..cursor + value_len].to_vec());
177            cursor += value_len;
178
179            block.add_entry(key, value);
180        }
181
182        Ok(block)
183    }
184}
185
/// Footer size in bytes: magic (4) + version (4) + index offset (8) +
/// bloom filter offset (8) + block size (4) + block count (4) +
/// compression type (1) + checksum (4).
const FOOTER_SIZE: usize = 4 + 4 + 8 + 8 + 4 + 4 + 1 + 4;
188
189/// SSTable footer containing metadata
190#[derive(Debug, Clone)]
191struct Footer {
192    magic: u32,
193    version: u32,
194    index_offset: u64,
195    bloom_filter_offset: u64,
196    block_size: u32,
197    num_blocks: u32,
198    compression_type: CompressionType,
199    checksum: u32,
200}
201
202impl Footer {
203    fn new(
204        index_offset: u64,
205        bloom_filter_offset: u64,
206        block_size: u32,
207        num_blocks: u32,
208        compression_type: CompressionType,
209    ) -> Self {
210        let mut footer = Self {
211            magic: SSTABLE_MAGIC,
212            version: SSTABLE_VERSION,
213            index_offset,
214            bloom_filter_offset,
215            block_size,
216            num_blocks,
217            compression_type,
218            checksum: 0,
219        };
220
221        footer.checksum = footer.compute_checksum();
222        footer
223    }
224
225    fn compute_checksum(&self) -> u32 {
226        let mut bytes = Vec::new();
227        bytes.extend_from_slice(&self.magic.to_le_bytes());
228        bytes.extend_from_slice(&self.version.to_le_bytes());
229        bytes.extend_from_slice(&self.index_offset.to_le_bytes());
230        bytes.extend_from_slice(&self.bloom_filter_offset.to_le_bytes());
231        bytes.extend_from_slice(&self.block_size.to_le_bytes());
232        bytes.extend_from_slice(&self.num_blocks.to_le_bytes());
233        bytes.push(self.compression_type.to_byte());
234        calculate_checksum(&bytes)
235    }
236
237    fn encode(&self) -> Vec<u8> {
238        let mut bytes = Vec::with_capacity(FOOTER_SIZE);
239        bytes.extend_from_slice(&self.magic.to_le_bytes());
240        bytes.extend_from_slice(&self.version.to_le_bytes());
241        bytes.extend_from_slice(&self.index_offset.to_le_bytes());
242        bytes.extend_from_slice(&self.bloom_filter_offset.to_le_bytes());
243        bytes.extend_from_slice(&self.block_size.to_le_bytes());
244        bytes.extend_from_slice(&self.num_blocks.to_le_bytes());
245        bytes.push(self.compression_type.to_byte());
246        bytes.extend_from_slice(&self.checksum.to_le_bytes());
247        bytes
248    }
249
250    fn decode(bytes: &[u8]) -> Result<Self> {
251        if bytes.len() < FOOTER_SIZE {
252            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
253                "Footer too small".to_string(),
254            )));
255        }
256
257        let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
258        let version = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
259        let index_offset = u64::from_le_bytes([
260            bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
261        ]);
262        let bloom_filter_offset = u64::from_le_bytes([
263            bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], bytes[23],
264        ]);
265        let block_size = u32::from_le_bytes([bytes[24], bytes[25], bytes[26], bytes[27]]);
266        let num_blocks = u32::from_le_bytes([bytes[28], bytes[29], bytes[30], bytes[31]]);
267        let compression_type = CompressionType::from_byte(bytes[32])?;
268        let checksum = u32::from_le_bytes([bytes[33], bytes[34], bytes[35], bytes[36]]);
269
270        if magic != SSTABLE_MAGIC {
271            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
272                "Invalid SSTable magic: expected {}, got {}",
273                SSTABLE_MAGIC, magic
274            ))));
275        }
276
277        if version != SSTABLE_VERSION {
278            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
279                "Unsupported SSTable version: {}",
280                version
281            ))));
282        }
283
284        let footer = Self {
285            magic,
286            version,
287            index_offset,
288            bloom_filter_offset,
289            block_size,
290            num_blocks,
291            compression_type,
292            checksum,
293        };
294
295        // Verify checksum
296        let expected = footer.compute_checksum();
297        if checksum != expected {
298            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
299                "Footer checksum mismatch: expected {}, got {}",
300                expected, checksum
301            ))));
302        }
303
304        Ok(footer)
305    }
306}
307
308/// SSTable writer - builds SSTable from sorted entries
309pub struct SSTableWriter {
310    path: PathBuf,
311    config: SSTableConfig,
312    writer: Option<BufWriter<File>>,
313    current_block: DataBlock,
314    index: Vec<IndexEntry>,
315    current_offset: u64,
316    bloom_filter: BloomFilter,
317}
318
319impl SSTableWriter {
320    /// Create a new SSTable writer
321    pub fn new<P: AsRef<Path>>(path: P, config: SSTableConfig) -> Result<Self> {
322        let file = OpenOptions::new()
323            .write(true)
324            .create(true)
325            .truncate(true)
326            .open(path.as_ref())
327            .map_err(|e| {
328                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
329                    "Failed to create SSTable file: {}",
330                    e
331                )))
332            })?;
333
334        // Create bloom filter with default configuration
335        let bloom_filter = BloomFilter::new(BloomFilterConfig {
336            expected_elements: 10000,  // Default estimate
337            false_positive_rate: 0.01, // 1%
338        });
339
340        Ok(Self {
341            path: path.as_ref().to_path_buf(),
342            config,
343            writer: Some(BufWriter::new(file)),
344            current_block: DataBlock::new(),
345            index: Vec::new(),
346            current_offset: 0,
347            bloom_filter,
348        })
349    }
350
351    /// Add a key-value pair (must be in sorted order)
352    pub fn add(&mut self, key: Key, value: CipherBlob) -> Result<()> {
353        // If adding this entry would exceed block size, flush current block
354        let entry_size = 8 + key.as_bytes().len() + value.as_bytes().len();
355        if self.current_block.size + entry_size > self.config.block_size
356            && !self.current_block.entries.is_empty()
357        {
358            self.flush_block()?;
359        }
360
361        // If this is the first entry in the block, add to index
362        if self.current_block.entries.is_empty() {
363            self.index.push(IndexEntry {
364                key: key.clone(),
365                offset: self.current_offset,
366            });
367        }
368
369        // Insert key into bloom filter
370        self.bloom_filter.insert(&key);
371
372        self.current_block.add_entry(key, value);
373        Ok(())
374    }
375
376    /// Flush current block to disk
377    ///
378    /// Block on-disk format (with compression):
379    /// ```text
380    /// [original_size: 4 bytes LE][compressed_size: 4 bytes LE][compressed_data: N bytes]
381    /// ```
382    ///
383    /// When compression is `None`, `compressed_data` is the raw encoded block
384    /// and `original_size == compressed_size`.
385    fn flush_block(&mut self) -> Result<()> {
386        if self.current_block.entries.is_empty() {
387            return Ok(());
388        }
389
390        let writer = self.writer.as_mut().ok_or_else(|| {
391            AmateRSError::StorageIntegrity(ErrorContext::new(
392                "SSTable writer already finalized".to_string(),
393            ))
394        })?;
395
396        let block_bytes = self.current_block.encode()?;
397        let original_size = block_bytes.len() as u32;
398
399        let compressed = compression::compress_block(&block_bytes, self.config.compression_type)?;
400        let compressed_size = compressed.len() as u32;
401
402        // Write block envelope: original_size + compressed_size + data
403        writer
404            .write_all(&original_size.to_le_bytes())
405            .map_err(|e| {
406                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
407                    "Failed to write block original size: {}",
408                    e
409                )))
410            })?;
411        writer
412            .write_all(&compressed_size.to_le_bytes())
413            .map_err(|e| {
414                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
415                    "Failed to write block compressed size: {}",
416                    e
417                )))
418            })?;
419        writer.write_all(&compressed).map_err(|e| {
420            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
421                "Failed to write compressed block: {}",
422                e
423            )))
424        })?;
425
426        // 8 bytes header + compressed data
427        self.current_offset += 8 + compressed.len() as u64;
428        self.current_block = DataBlock::new();
429
430        Ok(())
431    }
432
433    /// Finalize the SSTable (write index and footer)
434    pub fn finish(mut self) -> Result<()> {
435        // Flush remaining block
436        self.flush_block()?;
437
438        let writer = self.writer.as_mut().ok_or_else(|| {
439            AmateRSError::StorageIntegrity(ErrorContext::new(
440                "SSTable writer already finalized".to_string(),
441            ))
442        })?;
443
444        // Write index block
445        let index_offset = self.current_offset;
446        let mut index_bytes = Vec::new();
447
448        // Number of index entries
449        index_bytes.extend_from_slice(&(self.index.len() as u32).to_le_bytes());
450
451        for entry in &self.index {
452            let key_bytes = entry.key.as_bytes();
453            index_bytes.extend_from_slice(&(key_bytes.len() as u32).to_le_bytes());
454            index_bytes.extend_from_slice(key_bytes);
455            index_bytes.extend_from_slice(&entry.offset.to_le_bytes());
456        }
457
458        let index_checksum = calculate_checksum(&index_bytes);
459        index_bytes.extend_from_slice(&index_checksum.to_le_bytes());
460
461        writer.write_all(&index_bytes).map_err(|e| {
462            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
463                "Failed to write index: {}",
464                e
465            )))
466        })?;
467        self.current_offset += index_bytes.len() as u64;
468
469        // Write bloom filter
470        let bloom_filter_offset = self.current_offset;
471
472        // Write bloom filter metadata
473        let bloom_metadata = self.bloom_filter.metadata();
474        let metadata_bytes = bloom_metadata.to_bytes();
475        writer.write_all(&metadata_bytes).map_err(|e| {
476            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
477                "Failed to write bloom filter metadata: {}",
478                e
479            )))
480        })?;
481        self.current_offset += metadata_bytes.len() as u64;
482
483        // Write bloom filter data
484        let bloom_data = self.bloom_filter.as_bytes();
485        writer.write_all(bloom_data).map_err(|e| {
486            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
487                "Failed to write bloom filter data: {}",
488                e
489            )))
490        })?;
491        self.current_offset += bloom_data.len() as u64;
492
493        // Write footer
494        let footer = Footer::new(
495            index_offset,
496            bloom_filter_offset,
497            self.config.block_size as u32,
498            self.index.len() as u32,
499            self.config.compression_type,
500        );
501        let footer_bytes = footer.encode();
502        writer.write_all(&footer_bytes).map_err(|e| {
503            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
504                "Failed to write footer: {}",
505                e
506            )))
507        })?;
508
509        // Flush and sync
510        writer.flush().map_err(|e| {
511            AmateRSError::StorageIntegrity(ErrorContext::new(format!("Failed to flush: {}", e)))
512        })?;
513
514        writer.get_ref().sync_all().map_err(|e| {
515            AmateRSError::StorageIntegrity(ErrorContext::new(format!("Failed to sync: {}", e)))
516        })?;
517
518        self.writer = None;
519
520        Ok(())
521    }
522}
523
524/// SSTable reader - provides read access to SSTable
525pub struct SSTableReader {
526    path: PathBuf,
527    file: Arc<File>,
528    footer: Footer,
529    index: Vec<IndexEntry>,
530    bloom_filter: BloomFilter,
531    compression_type: CompressionType,
532}
533
534impl SSTableReader {
535    /// Open an existing SSTable for reading
536    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
537        let file = File::open(path.as_ref()).map_err(|e| {
538            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
539                "Failed to open SSTable: {}",
540                e
541            )))
542        })?;
543
544        // Read footer
545        let file_size = file
546            .metadata()
547            .map_err(|e| {
548                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
549                    "Failed to get file metadata: {}",
550                    e
551                )))
552            })?
553            .len();
554
555        if file_size < FOOTER_SIZE as u64 {
556            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
557                "SSTable file too small".to_string(),
558            )));
559        }
560
561        let mut reader = BufReader::new(&file);
562        reader
563            .seek(SeekFrom::End(-(FOOTER_SIZE as i64)))
564            .map_err(|e| {
565                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
566                    "Failed to seek to footer: {}",
567                    e
568                )))
569            })?;
570
571        let mut footer_bytes = [0u8; FOOTER_SIZE];
572        reader.read_exact(&mut footer_bytes).map_err(|e| {
573            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
574                "Failed to read footer: {}",
575                e
576            )))
577        })?;
578
579        let footer = Footer::decode(&footer_bytes)?;
580
581        // Read index
582        reader
583            .seek(SeekFrom::Start(footer.index_offset))
584            .map_err(|e| {
585                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
586                    "Failed to seek to index: {}",
587                    e
588                )))
589            })?;
590
591        // Calculate index size (between index_offset and bloom_filter_offset)
592        let index_size = footer.bloom_filter_offset - footer.index_offset;
593        let mut index_bytes = vec![0u8; index_size as usize];
594        reader.read_exact(&mut index_bytes).map_err(|e| {
595            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
596                "Failed to read index: {}",
597                e
598            )))
599        })?;
600
601        // Verify checksum
602        let data_len = index_bytes.len() - 4;
603        let checksum_bytes = &index_bytes[data_len..];
604        let expected_checksum = u32::from_le_bytes([
605            checksum_bytes[0],
606            checksum_bytes[1],
607            checksum_bytes[2],
608            checksum_bytes[3],
609        ]);
610        verify_checksum(&index_bytes[..data_len], expected_checksum)?;
611
612        // Parse index
613        let mut cursor = 0;
614        let num_entries = u32::from_le_bytes([
615            index_bytes[cursor],
616            index_bytes[cursor + 1],
617            index_bytes[cursor + 2],
618            index_bytes[cursor + 3],
619        ]) as usize;
620        cursor += 4;
621
622        let mut index = Vec::with_capacity(num_entries);
623
624        for _ in 0..num_entries {
625            let key_len = u32::from_le_bytes([
626                index_bytes[cursor],
627                index_bytes[cursor + 1],
628                index_bytes[cursor + 2],
629                index_bytes[cursor + 3],
630            ]) as usize;
631            cursor += 4;
632
633            let key = Key::from_slice(&index_bytes[cursor..cursor + key_len]);
634            cursor += key_len;
635
636            let offset = u64::from_le_bytes([
637                index_bytes[cursor],
638                index_bytes[cursor + 1],
639                index_bytes[cursor + 2],
640                index_bytes[cursor + 3],
641                index_bytes[cursor + 4],
642                index_bytes[cursor + 5],
643                index_bytes[cursor + 6],
644                index_bytes[cursor + 7],
645            ]);
646            cursor += 8;
647
648            index.push(IndexEntry { key, offset });
649        }
650
651        // Read bloom filter
652        reader
653            .seek(SeekFrom::Start(footer.bloom_filter_offset))
654            .map_err(|e| {
655                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
656                    "Failed to seek to bloom filter: {}",
657                    e
658                )))
659            })?;
660
661        // Read bloom filter metadata
662        let mut metadata_bytes = [0u8; 24];
663        reader.read_exact(&mut metadata_bytes).map_err(|e| {
664            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
665                "Failed to read bloom filter metadata: {}",
666                e
667            )))
668        })?;
669
670        let bloom_metadata = BloomFilterMetadata::from_bytes(&metadata_bytes)?;
671
672        // Read bloom filter data
673        let bloom_size = bloom_metadata.num_bits.div_ceil(8);
674        let mut bloom_data = vec![0u8; bloom_size];
675        reader.read_exact(&mut bloom_data).map_err(|e| {
676            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
677                "Failed to read bloom filter data: {}",
678                e
679            )))
680        })?;
681
682        let bloom_filter = BloomFilter::from_bytes(
683            bloom_data,
684            bloom_metadata.num_bits,
685            bloom_metadata.num_hash_functions,
686            bloom_metadata.num_elements,
687        )?;
688
689        let compression_type = footer.compression_type;
690
691        Ok(Self {
692            path: path.as_ref().to_path_buf(),
693            file: Arc::new(file),
694            footer,
695            index,
696            bloom_filter,
697            compression_type,
698        })
699    }
700
701    /// Check if a key may be in the SSTable (using bloom filter)
702    ///
703    /// Returns:
704    /// - true: key MAY be in the SSTable (should check with get())
705    /// - false: key is DEFINITELY NOT in the SSTable
706    pub fn may_contain(&self, key: &Key) -> bool {
707        self.bloom_filter.may_contain(key)
708    }
709
710    /// Get a value by key
711    pub fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
712        // Check bloom filter first for fast negative lookups
713        if !self.may_contain(key) {
714            return Ok(None);
715        }
716
717        // Find the block that might contain this key
718        let Some(block_index) = self.find_block_index(key) else {
719            return Ok(None);
720        };
721        let block = self.read_block(block_index)?;
722
723        // Search for key in block
724        for (k, v) in &block.entries {
725            if k == key {
726                return Ok(Some(v.clone()));
727            }
728        }
729
730        Ok(None)
731    }
732
733    /// Find the block index that might contain the key
734    fn find_block_index(&self, key: &Key) -> Option<usize> {
735        // Binary search in index
736        match self.index.binary_search_by(|entry| entry.key.cmp(key)) {
737            Ok(idx) => Some(idx),
738            Err(idx) => {
739                if idx == 0 {
740                    None
741                } else {
742                    Some(idx - 1)
743                }
744            }
745        }
746    }
747
748    /// Read a block from disk, decompressing if necessary
749    ///
750    /// On-disk format per block:
751    /// ```text
752    /// [original_size: 4 bytes LE][compressed_size: 4 bytes LE][compressed_data: N bytes]
753    /// ```
754    fn read_block(&self, block_index: usize) -> Result<DataBlock> {
755        if block_index >= self.index.len() {
756            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
757                "Block index out of bounds".to_string(),
758            )));
759        }
760
761        let offset = self.index[block_index].offset;
762
763        let mut reader = BufReader::new(self.file.as_ref());
764        reader.seek(SeekFrom::Start(offset)).map_err(|e| {
765            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
766                "Failed to seek to block: {}",
767                e
768            )))
769        })?;
770
771        // Read block envelope header (8 bytes)
772        let mut header = [0u8; 8];
773        reader.read_exact(&mut header).map_err(|e| {
774            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
775                "Failed to read block header: {}",
776                e
777            )))
778        })?;
779
780        let original_size =
781            u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
782        let compressed_size =
783            u32::from_le_bytes([header[4], header[5], header[6], header[7]]) as usize;
784
785        // Read compressed data
786        let mut compressed_data = vec![0u8; compressed_size];
787        reader.read_exact(&mut compressed_data).map_err(|e| {
788            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
789                "Failed to read compressed block data: {}",
790                e
791            )))
792        })?;
793
794        // Decompress
795        let block_bytes =
796            compression::decompress_block(&compressed_data, self.compression_type, original_size)?;
797
798        DataBlock::decode(&block_bytes)
799    }
800
801    /// Get all entries in the SSTable (for iteration)
802    pub fn iter(&self) -> Result<Vec<(Key, CipherBlob)>> {
803        let mut entries = Vec::new();
804
805        for i in 0..self.index.len() {
806            let block = self.read_block(i)?;
807            entries.extend(block.entries);
808        }
809
810        Ok(entries)
811    }
812
813    /// Get SSTable metadata (min_key, max_key, num_entries)
814    pub fn metadata(&self) -> Result<(Key, Key, usize)> {
815        if self.index.is_empty() {
816            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
817                "SSTable has no entries".to_string(),
818            )));
819        }
820
821        // Get all entries to find min/max keys
822        let entries = self.iter()?;
823
824        if entries.is_empty() {
825            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
826                "SSTable has no data entries".to_string(),
827            )));
828        }
829
830        let min_key = entries
831            .first()
832            .ok_or_else(|| {
833                AmateRSError::StorageIntegrity(ErrorContext::new(
834                    "Failed to get first entry".to_string(),
835                ))
836            })?
837            .0
838            .clone();
839
840        let max_key = entries
841            .last()
842            .ok_or_else(|| {
843                AmateRSError::StorageIntegrity(ErrorContext::new(
844                    "Failed to get last entry".to_string(),
845                ))
846            })?
847            .0
848            .clone();
849
850        Ok((min_key, max_key, entries.len()))
851    }
852}
853
854#[cfg(test)]
855mod tests {
856    use super::*;
857    use std::env;
858
859    #[test]
860    fn test_sstable_basic_write_read() -> Result<()> {
861        let dir = env::temp_dir();
862        let path = dir.join("test_sstable_basic.sst");
863
864        // Write SSTable
865        {
866            let config = SSTableConfig::default();
867            let mut writer = SSTableWriter::new(&path, config)?;
868
869            for i in 0..10 {
870                let key = Key::from_str(&format!("key_{:03}", i));
871                let value = CipherBlob::new(vec![i as u8; 100]);
872                writer.add(key, value)?;
873            }
874
875            writer.finish()?;
876        }
877
878        // Read SSTable
879        {
880            let reader = SSTableReader::open(&path)?;
881
882            // Check we can read all keys
883            for i in 0..10 {
884                let key = Key::from_str(&format!("key_{:03}", i));
885                let value = reader.get(&key)?;
886                assert!(value.is_some());
887                let value = value.expect("Value should exist in SSTable");
888                assert_eq!(value.as_bytes()[0], i as u8);
889            }
890
891            // Non-existent key
892            let key = Key::from_str("nonexistent");
893            let value = reader.get(&key)?;
894            assert!(value.is_none());
895        }
896
897        // Cleanup
898        std::fs::remove_file(&path).ok();
899
900        Ok(())
901    }
902
903    #[test]
904    fn test_sstable_multiple_blocks() -> Result<()> {
905        let dir = env::temp_dir();
906        let path = dir.join("test_sstable_blocks.sst");
907
908        // Write with small block size to force multiple blocks
909        {
910            let config = SSTableConfig {
911                block_size: 256,
912                compression_type: CompressionType::None,
913            };
914            let mut writer = SSTableWriter::new(&path, config)?;
915
916            for i in 0..100 {
917                let key = Key::from_str(&format!("key_{:03}", i));
918                let value = CipherBlob::new(vec![i as u8; 50]);
919                writer.add(key, value)?;
920            }
921
922            writer.finish()?;
923        }
924
925        // Read and verify
926        {
927            let reader = SSTableReader::open(&path)?;
928
929            for i in 0..100 {
930                let key = Key::from_str(&format!("key_{:03}", i));
931                let value = reader.get(&key)?;
932                assert!(value.is_some());
933            }
934        }
935
936        std::fs::remove_file(&path).ok();
937
938        Ok(())
939    }
940
941    #[test]
942    fn test_sstable_iteration() -> Result<()> {
943        let dir = env::temp_dir();
944        let path = dir.join("test_sstable_iter.sst");
945
946        // Write
947        {
948            let config = SSTableConfig::default();
949            let mut writer = SSTableWriter::new(&path, config)?;
950
951            for i in 0..50 {
952                let key = Key::from_str(&format!("key_{:03}", i));
953                let value = CipherBlob::new(vec![i as u8; 100]);
954                writer.add(key, value)?;
955            }
956
957            writer.finish()?;
958        }
959
960        // Iterate
961        {
962            let reader = SSTableReader::open(&path)?;
963            let entries = reader.iter()?;
964
965            assert_eq!(entries.len(), 50);
966
967            // Check ordering
968            for i in 0..49 {
969                assert!(entries[i].0 < entries[i + 1].0);
970            }
971        }
972
973        std::fs::remove_file(&path).ok();
974
975        Ok(())
976    }
977
978    #[test]
979    fn test_sstable_empty() -> Result<()> {
980        let dir = env::temp_dir();
981        let path = dir.join("test_sstable_empty.sst");
982
983        // Write empty SSTable
984        {
985            let config = SSTableConfig::default();
986            let writer = SSTableWriter::new(&path, config)?;
987            writer.finish()?;
988        }
989
990        // Read
991        {
992            let reader = SSTableReader::open(&path)?;
993            let entries = reader.iter()?;
994            assert_eq!(entries.len(), 0);
995
996            let key = Key::from_str("any_key");
997            let value = reader.get(&key)?;
998            assert!(value.is_none());
999        }
1000
1001        std::fs::remove_file(&path).ok();
1002
1003        Ok(())
1004    }
1005
1006    #[test]
1007    fn test_sstable_large_values() -> Result<()> {
1008        let dir = env::temp_dir();
1009        let path = dir.join("test_sstable_large.sst");
1010
1011        // Write with large values
1012        {
1013            let config = SSTableConfig::default();
1014            let mut writer = SSTableWriter::new(&path, config)?;
1015
1016            for i in 0..10 {
1017                let key = Key::from_str(&format!("key_{:03}", i));
1018                let value = CipherBlob::new(vec![i as u8; 10000]); // 10KB values
1019                writer.add(key, value)?;
1020            }
1021
1022            writer.finish()?;
1023        }
1024
1025        // Read
1026        {
1027            let reader = SSTableReader::open(&path)?;
1028
1029            for i in 0..10 {
1030                let key = Key::from_str(&format!("key_{:03}", i));
1031                let value = reader.get(&key)?;
1032                assert!(value.is_some());
1033                let value = value.expect("Value should exist in SSTable");
1034                assert_eq!(value.as_bytes().len(), 10000);
1035            }
1036        }
1037
1038        std::fs::remove_file(&path).ok();
1039
1040        Ok(())
1041    }
1042
1043    #[test]
1044    fn test_sstable_corruption_detection() -> Result<()> {
1045        let dir = env::temp_dir();
1046        let path = dir.join("test_sstable_corrupt.sst");
1047
1048        // Write valid SSTable
1049        {
1050            let config = SSTableConfig::default();
1051            let mut writer = SSTableWriter::new(&path, config)?;
1052
1053            for i in 0..10 {
1054                let key = Key::from_str(&format!("key_{:03}", i));
1055                let value = CipherBlob::new(vec![i as u8; 100]);
1056                writer.add(key, value)?;
1057            }
1058
1059            writer.finish()?;
1060        }
1061
1062        // Corrupt the footer (last 28 bytes contain the footer)
1063        {
1064            let mut file = OpenOptions::new().write(true).open(&path)?;
1065            // Corrupt the checksum bytes in the footer
1066            file.seek(SeekFrom::End(-4))?;
1067            file.write_all(&[0xFF, 0xFF, 0xFF, 0xFF])?;
1068        }
1069
1070        // Try to read - should detect corruption
1071        let result = SSTableReader::open(&path);
1072        assert!(result.is_err());
1073
1074        std::fs::remove_file(&path).ok();
1075
1076        Ok(())
1077    }
1078
1079    /// Helper to write and read back an SSTable with the given compression type
1080    fn write_read_roundtrip(
1081        filename: &str,
1082        compression_type: CompressionType,
1083        num_entries: usize,
1084        value_size: usize,
1085        block_size: usize,
1086    ) -> Result<()> {
1087        let dir = env::temp_dir();
1088        let path = dir.join(filename);
1089
1090        // Write
1091        {
1092            let config = SSTableConfig {
1093                block_size,
1094                compression_type,
1095            };
1096            let mut writer = SSTableWriter::new(&path, config)?;
1097
1098            for i in 0..num_entries {
1099                let key = Key::from_str(&format!("key_{:06}", i));
1100                // Create somewhat compressible data (repeated byte patterns)
1101                let mut value_data = Vec::with_capacity(value_size);
1102                for j in 0..value_size {
1103                    value_data.push(((i + j) % 256) as u8);
1104                }
1105                let value = CipherBlob::new(value_data);
1106                writer.add(key, value)?;
1107            }
1108
1109            writer.finish()?;
1110        }
1111
1112        // Read and verify
1113        {
1114            let reader = SSTableReader::open(&path)?;
1115
1116            for i in 0..num_entries {
1117                let key = Key::from_str(&format!("key_{:06}", i));
1118                let value = reader.get(&key)?.ok_or_else(|| {
1119                    AmateRSError::StorageIntegrity(ErrorContext::new(format!(
1120                        "Missing key {} with {:?} compression",
1121                        i, compression_type
1122                    )))
1123                })?;
1124
1125                assert_eq!(value.as_bytes().len(), value_size);
1126                for j in 0..value_size {
1127                    assert_eq!(
1128                        value.as_bytes()[j],
1129                        ((i + j) % 256) as u8,
1130                        "Value mismatch at key={}, byte={}",
1131                        i,
1132                        j
1133                    );
1134                }
1135            }
1136
1137            // Verify non-existent key
1138            let missing = Key::from_str("nonexistent_key");
1139            assert!(reader.get(&missing)?.is_none());
1140
1141            // Verify iteration
1142            let entries = reader.iter()?;
1143            assert_eq!(entries.len(), num_entries);
1144        }
1145
1146        std::fs::remove_file(&path).ok();
1147        Ok(())
1148    }
1149
1150    #[test]
1151    fn test_sstable_compressed_lz4_basic() -> Result<()> {
1152        write_read_roundtrip(
1153            "test_sstable_lz4_basic.sst",
1154            CompressionType::Lz4,
1155            20,
1156            200,
1157            DEFAULT_BLOCK_SIZE,
1158        )
1159    }
1160
1161    #[test]
1162    fn test_sstable_compressed_deflate_basic() -> Result<()> {
1163        write_read_roundtrip(
1164            "test_sstable_deflate_basic.sst",
1165            CompressionType::Deflate,
1166            20,
1167            200,
1168            DEFAULT_BLOCK_SIZE,
1169        )
1170    }
1171
1172    #[test]
1173    fn test_sstable_compressed_lz4_multiple_blocks() -> Result<()> {
1174        write_read_roundtrip(
1175            "test_sstable_lz4_multiblock.sst",
1176            CompressionType::Lz4,
1177            100,
1178            100,
1179            256, // Small block size forces many blocks
1180        )
1181    }
1182
1183    #[test]
1184    fn test_sstable_compressed_deflate_multiple_blocks() -> Result<()> {
1185        write_read_roundtrip(
1186            "test_sstable_deflate_multiblock.sst",
1187            CompressionType::Deflate,
1188            100,
1189            100,
1190            256,
1191        )
1192    }
1193
1194    #[test]
1195    fn test_sstable_compression_ratio() -> Result<()> {
1196        let dir = env::temp_dir();
1197        let path_none = dir.join("test_sstable_ratio_none.sst");
1198        let path_lz4 = dir.join("test_sstable_ratio_lz4.sst");
1199        let path_deflate = dir.join("test_sstable_ratio_deflate.sst");
1200
1201        // Write highly compressible data (repeated patterns)
1202        let num_entries = 200;
1203        let value_size = 500;
1204
1205        for (path, ct) in [
1206            (&path_none, CompressionType::None),
1207            (&path_lz4, CompressionType::Lz4),
1208            (&path_deflate, CompressionType::Deflate),
1209        ] {
1210            let config = SSTableConfig {
1211                block_size: DEFAULT_BLOCK_SIZE,
1212                compression_type: ct,
1213            };
1214            let mut writer = SSTableWriter::new(path, config)?;
1215
1216            for i in 0..num_entries {
1217                let key = Key::from_str(&format!("key_{:06}", i));
1218                // Highly repetitive data
1219                let value = CipherBlob::new(vec![(i % 10) as u8; value_size]);
1220                writer.add(key, value)?;
1221            }
1222
1223            writer.finish()?;
1224        }
1225
1226        let size_none = std::fs::metadata(&path_none)
1227            .map_err(|e| {
1228                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
1229                    "Failed to get file size: {}",
1230                    e
1231                )))
1232            })?
1233            .len();
1234        let size_lz4 = std::fs::metadata(&path_lz4)
1235            .map_err(|e| {
1236                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
1237                    "Failed to get file size: {}",
1238                    e
1239                )))
1240            })?
1241            .len();
1242        let size_deflate = std::fs::metadata(&path_deflate)
1243            .map_err(|e| {
1244                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
1245                    "Failed to get file size: {}",
1246                    e
1247                )))
1248            })?
1249            .len();
1250
1251        // Compressed files should be smaller than uncompressed
1252        assert!(
1253            size_lz4 < size_none,
1254            "LZ4 ({}) should be smaller than None ({})",
1255            size_lz4,
1256            size_none
1257        );
1258        assert!(
1259            size_deflate < size_none,
1260            "Deflate ({}) should be smaller than None ({})",
1261            size_deflate,
1262            size_none
1263        );
1264
1265        // Verify all three can be read back correctly
1266        for path in [&path_none, &path_lz4, &path_deflate] {
1267            let reader = SSTableReader::open(path)?;
1268            let entries = reader.iter()?;
1269            assert_eq!(entries.len(), num_entries);
1270        }
1271
1272        std::fs::remove_file(&path_none).ok();
1273        std::fs::remove_file(&path_lz4).ok();
1274        std::fs::remove_file(&path_deflate).ok();
1275
1276        Ok(())
1277    }
1278
1279    #[test]
1280    fn test_sstable_large_block_compression() -> Result<()> {
1281        // 64KB block with large values
1282        write_read_roundtrip(
1283            "test_sstable_large_block_comp.sst",
1284            CompressionType::Lz4,
1285            10,
1286            10000,
1287            65536,
1288        )
1289    }
1290
1291    #[test]
1292    fn test_sstable_compressed_empty() -> Result<()> {
1293        let dir = env::temp_dir();
1294
1295        for ct in [CompressionType::Lz4, CompressionType::Deflate] {
1296            let filename = format!("test_sstable_empty_{:?}.sst", ct);
1297            let path = dir.join(&filename);
1298
1299            {
1300                let config = SSTableConfig {
1301                    block_size: DEFAULT_BLOCK_SIZE,
1302                    compression_type: ct,
1303                };
1304                let writer = SSTableWriter::new(&path, config)?;
1305                writer.finish()?;
1306            }
1307
1308            {
1309                let reader = SSTableReader::open(&path)?;
1310                let entries = reader.iter()?;
1311                assert_eq!(entries.len(), 0);
1312
1313                let key = Key::from_str("any_key");
1314                assert!(reader.get(&key)?.is_none());
1315            }
1316
1317            std::fs::remove_file(&path).ok();
1318        }
1319
1320        Ok(())
1321    }
1322
1323    #[test]
1324    fn test_sstable_compressed_iteration_order() -> Result<()> {
1325        let dir = env::temp_dir();
1326        let path = dir.join("test_sstable_comp_iter_order.sst");
1327
1328        {
1329            let config = SSTableConfig {
1330                block_size: 256,
1331                compression_type: CompressionType::Deflate,
1332            };
1333            let mut writer = SSTableWriter::new(&path, config)?;
1334
1335            for i in 0..50 {
1336                let key = Key::from_str(&format!("key_{:06}", i));
1337                let value = CipherBlob::new(vec![i as u8; 100]);
1338                writer.add(key, value)?;
1339            }
1340
1341            writer.finish()?;
1342        }
1343
1344        {
1345            let reader = SSTableReader::open(&path)?;
1346            let entries = reader.iter()?;
1347
1348            assert_eq!(entries.len(), 50);
1349
1350            // Verify sorted order is preserved through compression
1351            for i in 0..49 {
1352                assert!(
1353                    entries[i].0 < entries[i + 1].0,
1354                    "Order violation at index {}",
1355                    i
1356                );
1357            }
1358        }
1359
1360        std::fs::remove_file(&path).ok();
1361        Ok(())
1362    }
1363
1364    #[test]
1365    fn test_sstable_compressed_metadata() -> Result<()> {
1366        let dir = env::temp_dir();
1367        let path = dir.join("test_sstable_comp_metadata.sst");
1368
1369        {
1370            let config = SSTableConfig {
1371                block_size: DEFAULT_BLOCK_SIZE,
1372                compression_type: CompressionType::Lz4,
1373            };
1374            let mut writer = SSTableWriter::new(&path, config)?;
1375
1376            for i in 0..25 {
1377                let key = Key::from_str(&format!("key_{:06}", i));
1378                let value = CipherBlob::new(vec![i as u8; 50]);
1379                writer.add(key, value)?;
1380            }
1381
1382            writer.finish()?;
1383        }
1384
1385        {
1386            let reader = SSTableReader::open(&path)?;
1387            let (min_key, max_key, count) = reader.metadata()?;
1388
1389            assert_eq!(min_key, Key::from_str("key_000000"));
1390            assert_eq!(max_key, Key::from_str("key_000024"));
1391            assert_eq!(count, 25);
1392        }
1393
1394        std::fs::remove_file(&path).ok();
1395        Ok(())
1396    }
1397
1398    #[test]
1399    fn test_sstable_compressed_bloom_filter() -> Result<()> {
1400        let dir = env::temp_dir();
1401        let path = dir.join("test_sstable_comp_bloom.sst");
1402
1403        {
1404            let config = SSTableConfig {
1405                block_size: DEFAULT_BLOCK_SIZE,
1406                compression_type: CompressionType::Deflate,
1407            };
1408            let mut writer = SSTableWriter::new(&path, config)?;
1409
1410            for i in 0..100 {
1411                let key = Key::from_str(&format!("existing_{:06}", i));
1412                let value = CipherBlob::new(vec![i as u8; 30]);
1413                writer.add(key, value)?;
1414            }
1415
1416            writer.finish()?;
1417        }
1418
1419        {
1420            let reader = SSTableReader::open(&path)?;
1421
1422            // Existing keys should pass bloom filter
1423            for i in 0..100 {
1424                let key = Key::from_str(&format!("existing_{:06}", i));
1425                assert!(reader.may_contain(&key));
1426            }
1427
1428            // Non-existent keys should mostly be rejected (bloom filter may have FPs)
1429            let mut rejected = 0;
1430            for i in 0..1000 {
1431                let key = Key::from_str(&format!("missing_{:06}", i));
1432                if !reader.may_contain(&key) {
1433                    rejected += 1;
1434                }
1435            }
1436            // With 1% FP rate, at least 900 of 1000 should be rejected
1437            assert!(
1438                rejected > 900,
1439                "Bloom filter rejected only {} of 1000 non-existent keys",
1440                rejected
1441            );
1442        }
1443
1444        std::fs::remove_file(&path).ok();
1445        Ok(())
1446    }
1447}