amaters_core/storage/
sstable.rs

1//! SSTable (Sorted String Table) implementation
2//!
3//! SSTables are immutable, on-disk sorted key-value stores used in LSM-Tree.
4//! They store memtable snapshots persistently with efficient read access.
5
6use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::storage::{BloomFilter, BloomFilterConfig, BloomFilterMetadata};
8use crate::types::{CipherBlob, Key};
9use crate::utils::{calculate_checksum, verify_checksum};
10use std::collections::BTreeMap;
11use std::fs::{File, OpenOptions};
12use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
/// Magic number identifying an SSTable file: the ASCII tag "SSTA" read as a
/// big-endian `u32` (0x53535441).
const SSTABLE_MAGIC: u32 = u32::from_be_bytes(*b"SSTA");

/// On-disk format revision. Revision 2 introduced the bloom filter section.
const SSTABLE_VERSION: u32 = 2;

/// Block size used when the caller does not pick one (4 KiB).
const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
24
25/// SSTable configuration
26#[derive(Debug, Clone)]
27pub struct SSTableConfig {
28    /// Block size in bytes
29    pub block_size: usize,
30    /// Enable compression (future feature)
31    pub enable_compression: bool,
32}
33
34impl Default for SSTableConfig {
35    fn default() -> Self {
36        Self {
37            block_size: DEFAULT_BLOCK_SIZE,
38            enable_compression: false,
39        }
40    }
41}
42
/// Index entry pointing to a data block.
///
/// The writer records one entry each time it starts a new block, and the
/// reader binary-searches these entries by `key` to pick the block that may
/// hold a looked-up key.
#[derive(Debug, Clone)]
struct IndexEntry {
    /// First key in the block
    key: Key,
    /// Offset of the block in the file
    offset: u64,
}
51
52/// Data block containing key-value pairs
53#[derive(Debug, Clone)]
54struct DataBlock {
55    entries: Vec<(Key, CipherBlob)>,
56    size: usize,
57}
58
59impl DataBlock {
60    fn new() -> Self {
61        Self {
62            entries: Vec::new(),
63            size: 0,
64        }
65    }
66
67    fn add_entry(&mut self, key: Key, value: CipherBlob) {
68        let entry_size = 8 + key.as_bytes().len() + value.as_bytes().len();
69        self.entries.push((key, value));
70        self.size += entry_size;
71    }
72
73    fn is_full(&self, block_size: usize) -> bool {
74        self.size >= block_size
75    }
76
77    fn encode(&self) -> Result<Vec<u8>> {
78        let mut bytes = Vec::with_capacity(self.size + 8);
79
80        // Number of entries (4 bytes)
81        bytes.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
82
83        // Entries
84        for (key, value) in &self.entries {
85            let key_bytes = key.as_bytes();
86            let value_bytes = value.as_bytes();
87
88            // Key length (4 bytes) + Key
89            bytes.extend_from_slice(&(key_bytes.len() as u32).to_le_bytes());
90            bytes.extend_from_slice(key_bytes);
91
92            // Value length (4 bytes) + Value
93            bytes.extend_from_slice(&(value_bytes.len() as u32).to_le_bytes());
94            bytes.extend_from_slice(value_bytes);
95        }
96
97        // Checksum (4 bytes)
98        let checksum = calculate_checksum(&bytes);
99        bytes.extend_from_slice(&checksum.to_le_bytes());
100
101        Ok(bytes)
102    }
103
104    fn decode(bytes: &[u8]) -> Result<Self> {
105        if bytes.len() < 8 {
106            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
107                "Data block too small".to_string(),
108            )));
109        }
110
111        // Verify checksum
112        let data_len = bytes.len() - 4;
113        let checksum_bytes = &bytes[data_len..];
114        let expected_checksum = u32::from_le_bytes([
115            checksum_bytes[0],
116            checksum_bytes[1],
117            checksum_bytes[2],
118            checksum_bytes[3],
119        ]);
120        verify_checksum(&bytes[..data_len], expected_checksum)?;
121
122        let mut cursor = 0;
123        let num_entries = u32::from_le_bytes([
124            bytes[cursor],
125            bytes[cursor + 1],
126            bytes[cursor + 2],
127            bytes[cursor + 3],
128        ]) as usize;
129        cursor += 4;
130
131        let mut block = DataBlock::new();
132
133        for _ in 0..num_entries {
134            // Read key
135            if cursor + 4 > data_len {
136                return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
137                    "Incomplete key length".to_string(),
138                )));
139            }
140            let key_len = u32::from_le_bytes([
141                bytes[cursor],
142                bytes[cursor + 1],
143                bytes[cursor + 2],
144                bytes[cursor + 3],
145            ]) as usize;
146            cursor += 4;
147
148            if cursor + key_len > data_len {
149                return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
150                    "Incomplete key data".to_string(),
151                )));
152            }
153            let key = Key::from_slice(&bytes[cursor..cursor + key_len]);
154            cursor += key_len;
155
156            // Read value
157            if cursor + 4 > data_len {
158                return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
159                    "Incomplete value length".to_string(),
160                )));
161            }
162            let value_len = u32::from_le_bytes([
163                bytes[cursor],
164                bytes[cursor + 1],
165                bytes[cursor + 2],
166                bytes[cursor + 3],
167            ]) as usize;
168            cursor += 4;
169
170            if cursor + value_len > data_len {
171                return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
172                    "Incomplete value data".to_string(),
173                )));
174            }
175            let value = CipherBlob::new(bytes[cursor..cursor + value_len].to_vec());
176            cursor += value_len;
177
178            block.add_entry(key, value);
179        }
180
181        Ok(block)
182    }
183}
184
185/// SSTable footer containing metadata
186#[derive(Debug, Clone)]
187struct Footer {
188    magic: u32,
189    version: u32,
190    index_offset: u64,
191    bloom_filter_offset: u64,
192    block_size: u32,
193    num_blocks: u32,
194    checksum: u32,
195}
196
197impl Footer {
198    fn new(index_offset: u64, bloom_filter_offset: u64, block_size: u32, num_blocks: u32) -> Self {
199        let mut footer = Self {
200            magic: SSTABLE_MAGIC,
201            version: SSTABLE_VERSION,
202            index_offset,
203            bloom_filter_offset,
204            block_size,
205            num_blocks,
206            checksum: 0,
207        };
208
209        // Calculate checksum of footer (excluding checksum field)
210        let mut bytes = Vec::new();
211        bytes.extend_from_slice(&footer.magic.to_le_bytes());
212        bytes.extend_from_slice(&footer.version.to_le_bytes());
213        bytes.extend_from_slice(&footer.index_offset.to_le_bytes());
214        bytes.extend_from_slice(&footer.bloom_filter_offset.to_le_bytes());
215        bytes.extend_from_slice(&footer.block_size.to_le_bytes());
216        bytes.extend_from_slice(&footer.num_blocks.to_le_bytes());
217        footer.checksum = calculate_checksum(&bytes);
218
219        footer
220    }
221
222    fn encode(&self) -> Vec<u8> {
223        let mut bytes = Vec::with_capacity(36);
224        bytes.extend_from_slice(&self.magic.to_le_bytes());
225        bytes.extend_from_slice(&self.version.to_le_bytes());
226        bytes.extend_from_slice(&self.index_offset.to_le_bytes());
227        bytes.extend_from_slice(&self.bloom_filter_offset.to_le_bytes());
228        bytes.extend_from_slice(&self.block_size.to_le_bytes());
229        bytes.extend_from_slice(&self.num_blocks.to_le_bytes());
230        bytes.extend_from_slice(&self.checksum.to_le_bytes());
231        bytes
232    }
233
234    fn decode(bytes: &[u8]) -> Result<Self> {
235        if bytes.len() < 36 {
236            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
237                "Footer too small".to_string(),
238            )));
239        }
240
241        let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
242        let version = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
243        let index_offset = u64::from_le_bytes([
244            bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
245        ]);
246        let bloom_filter_offset = u64::from_le_bytes([
247            bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], bytes[23],
248        ]);
249        let block_size = u32::from_le_bytes([bytes[24], bytes[25], bytes[26], bytes[27]]);
250        let num_blocks = u32::from_le_bytes([bytes[28], bytes[29], bytes[30], bytes[31]]);
251        let checksum = u32::from_le_bytes([bytes[32], bytes[33], bytes[34], bytes[35]]);
252
253        if magic != SSTABLE_MAGIC {
254            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
255                "Invalid SSTable magic: expected {}, got {}",
256                SSTABLE_MAGIC, magic
257            ))));
258        }
259
260        if version != SSTABLE_VERSION {
261            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
262                "Unsupported SSTable version: {}",
263                version
264            ))));
265        }
266
267        // Verify checksum
268        let mut verify_bytes = Vec::new();
269        verify_bytes.extend_from_slice(&magic.to_le_bytes());
270        verify_bytes.extend_from_slice(&version.to_le_bytes());
271        verify_bytes.extend_from_slice(&index_offset.to_le_bytes());
272        verify_bytes.extend_from_slice(&bloom_filter_offset.to_le_bytes());
273        verify_bytes.extend_from_slice(&block_size.to_le_bytes());
274        verify_bytes.extend_from_slice(&num_blocks.to_le_bytes());
275        verify_checksum(&verify_bytes, checksum)?;
276
277        Ok(Self {
278            magic,
279            version,
280            index_offset,
281            bloom_filter_offset,
282            block_size,
283            num_blocks,
284            checksum,
285        })
286    }
287}
288
289/// SSTable writer - builds SSTable from sorted entries
290pub struct SSTableWriter {
291    path: PathBuf,
292    config: SSTableConfig,
293    writer: Option<BufWriter<File>>,
294    current_block: DataBlock,
295    index: Vec<IndexEntry>,
296    current_offset: u64,
297    bloom_filter: BloomFilter,
298}
299
300impl SSTableWriter {
301    /// Create a new SSTable writer
302    pub fn new<P: AsRef<Path>>(path: P, config: SSTableConfig) -> Result<Self> {
303        let file = OpenOptions::new()
304            .write(true)
305            .create(true)
306            .truncate(true)
307            .open(path.as_ref())
308            .map_err(|e| {
309                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
310                    "Failed to create SSTable file: {}",
311                    e
312                )))
313            })?;
314
315        // Create bloom filter with default configuration
316        let bloom_filter = BloomFilter::new(BloomFilterConfig {
317            expected_elements: 10000,  // Default estimate
318            false_positive_rate: 0.01, // 1%
319        });
320
321        Ok(Self {
322            path: path.as_ref().to_path_buf(),
323            config,
324            writer: Some(BufWriter::new(file)),
325            current_block: DataBlock::new(),
326            index: Vec::new(),
327            current_offset: 0,
328            bloom_filter,
329        })
330    }
331
332    /// Add a key-value pair (must be in sorted order)
333    pub fn add(&mut self, key: Key, value: CipherBlob) -> Result<()> {
334        // If adding this entry would exceed block size, flush current block
335        let entry_size = 8 + key.as_bytes().len() + value.as_bytes().len();
336        if self.current_block.size + entry_size > self.config.block_size
337            && !self.current_block.entries.is_empty()
338        {
339            self.flush_block()?;
340        }
341
342        // If this is the first entry in the block, add to index
343        if self.current_block.entries.is_empty() {
344            self.index.push(IndexEntry {
345                key: key.clone(),
346                offset: self.current_offset,
347            });
348        }
349
350        // Insert key into bloom filter
351        self.bloom_filter.insert(&key);
352
353        self.current_block.add_entry(key, value);
354        Ok(())
355    }
356
357    /// Flush current block to disk
358    fn flush_block(&mut self) -> Result<()> {
359        if self.current_block.entries.is_empty() {
360            return Ok(());
361        }
362
363        let writer = self.writer.as_mut().ok_or_else(|| {
364            AmateRSError::StorageIntegrity(ErrorContext::new(
365                "SSTable writer already finalized".to_string(),
366            ))
367        })?;
368
369        let block_bytes = self.current_block.encode()?;
370        writer.write_all(&block_bytes).map_err(|e| {
371            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
372                "Failed to write block: {}",
373                e
374            )))
375        })?;
376
377        self.current_offset += block_bytes.len() as u64;
378        self.current_block = DataBlock::new();
379
380        Ok(())
381    }
382
383    /// Finalize the SSTable (write index and footer)
384    pub fn finish(mut self) -> Result<()> {
385        // Flush remaining block
386        self.flush_block()?;
387
388        let writer = self.writer.as_mut().ok_or_else(|| {
389            AmateRSError::StorageIntegrity(ErrorContext::new(
390                "SSTable writer already finalized".to_string(),
391            ))
392        })?;
393
394        // Write index block
395        let index_offset = self.current_offset;
396        let mut index_bytes = Vec::new();
397
398        // Number of index entries
399        index_bytes.extend_from_slice(&(self.index.len() as u32).to_le_bytes());
400
401        for entry in &self.index {
402            let key_bytes = entry.key.as_bytes();
403            index_bytes.extend_from_slice(&(key_bytes.len() as u32).to_le_bytes());
404            index_bytes.extend_from_slice(key_bytes);
405            index_bytes.extend_from_slice(&entry.offset.to_le_bytes());
406        }
407
408        let index_checksum = calculate_checksum(&index_bytes);
409        index_bytes.extend_from_slice(&index_checksum.to_le_bytes());
410
411        writer.write_all(&index_bytes).map_err(|e| {
412            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
413                "Failed to write index: {}",
414                e
415            )))
416        })?;
417        self.current_offset += index_bytes.len() as u64;
418
419        // Write bloom filter
420        let bloom_filter_offset = self.current_offset;
421
422        // Write bloom filter metadata
423        let bloom_metadata = self.bloom_filter.metadata();
424        let metadata_bytes = bloom_metadata.to_bytes();
425        writer.write_all(&metadata_bytes).map_err(|e| {
426            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
427                "Failed to write bloom filter metadata: {}",
428                e
429            )))
430        })?;
431        self.current_offset += metadata_bytes.len() as u64;
432
433        // Write bloom filter data
434        let bloom_data = self.bloom_filter.as_bytes();
435        writer.write_all(bloom_data).map_err(|e| {
436            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
437                "Failed to write bloom filter data: {}",
438                e
439            )))
440        })?;
441        self.current_offset += bloom_data.len() as u64;
442
443        // Write footer
444        let footer = Footer::new(
445            index_offset,
446            bloom_filter_offset,
447            self.config.block_size as u32,
448            self.index.len() as u32,
449        );
450        let footer_bytes = footer.encode();
451        writer.write_all(&footer_bytes).map_err(|e| {
452            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
453                "Failed to write footer: {}",
454                e
455            )))
456        })?;
457
458        // Flush and sync
459        writer.flush().map_err(|e| {
460            AmateRSError::StorageIntegrity(ErrorContext::new(format!("Failed to flush: {}", e)))
461        })?;
462
463        writer.get_ref().sync_all().map_err(|e| {
464            AmateRSError::StorageIntegrity(ErrorContext::new(format!("Failed to sync: {}", e)))
465        })?;
466
467        self.writer = None;
468
469        Ok(())
470    }
471}
472
473/// SSTable reader - provides read access to SSTable
474pub struct SSTableReader {
475    path: PathBuf,
476    file: Arc<File>,
477    footer: Footer,
478    index: Vec<IndexEntry>,
479    bloom_filter: BloomFilter,
480}
481
482impl SSTableReader {
483    /// Open an existing SSTable for reading
484    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
485        let file = File::open(path.as_ref()).map_err(|e| {
486            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
487                "Failed to open SSTable: {}",
488                e
489            )))
490        })?;
491
492        // Read footer
493        let file_size = file
494            .metadata()
495            .map_err(|e| {
496                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
497                    "Failed to get file metadata: {}",
498                    e
499                )))
500            })?
501            .len();
502
503        if file_size < 36 {
504            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
505                "SSTable file too small".to_string(),
506            )));
507        }
508
509        let mut reader = BufReader::new(&file);
510        reader.seek(SeekFrom::End(-36)).map_err(|e| {
511            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
512                "Failed to seek to footer: {}",
513                e
514            )))
515        })?;
516
517        let mut footer_bytes = [0u8; 36];
518        reader.read_exact(&mut footer_bytes).map_err(|e| {
519            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
520                "Failed to read footer: {}",
521                e
522            )))
523        })?;
524
525        let footer = Footer::decode(&footer_bytes)?;
526
527        // Read index
528        reader
529            .seek(SeekFrom::Start(footer.index_offset))
530            .map_err(|e| {
531                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
532                    "Failed to seek to index: {}",
533                    e
534                )))
535            })?;
536
537        // Calculate index size (between index_offset and bloom_filter_offset)
538        let index_size = footer.bloom_filter_offset - footer.index_offset;
539        let mut index_bytes = vec![0u8; index_size as usize];
540        reader.read_exact(&mut index_bytes).map_err(|e| {
541            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
542                "Failed to read index: {}",
543                e
544            )))
545        })?;
546
547        // Verify checksum
548        let data_len = index_bytes.len() - 4;
549        let checksum_bytes = &index_bytes[data_len..];
550        let expected_checksum = u32::from_le_bytes([
551            checksum_bytes[0],
552            checksum_bytes[1],
553            checksum_bytes[2],
554            checksum_bytes[3],
555        ]);
556        verify_checksum(&index_bytes[..data_len], expected_checksum)?;
557
558        // Parse index
559        let mut cursor = 0;
560        let num_entries = u32::from_le_bytes([
561            index_bytes[cursor],
562            index_bytes[cursor + 1],
563            index_bytes[cursor + 2],
564            index_bytes[cursor + 3],
565        ]) as usize;
566        cursor += 4;
567
568        let mut index = Vec::with_capacity(num_entries);
569
570        for _ in 0..num_entries {
571            let key_len = u32::from_le_bytes([
572                index_bytes[cursor],
573                index_bytes[cursor + 1],
574                index_bytes[cursor + 2],
575                index_bytes[cursor + 3],
576            ]) as usize;
577            cursor += 4;
578
579            let key = Key::from_slice(&index_bytes[cursor..cursor + key_len]);
580            cursor += key_len;
581
582            let offset = u64::from_le_bytes([
583                index_bytes[cursor],
584                index_bytes[cursor + 1],
585                index_bytes[cursor + 2],
586                index_bytes[cursor + 3],
587                index_bytes[cursor + 4],
588                index_bytes[cursor + 5],
589                index_bytes[cursor + 6],
590                index_bytes[cursor + 7],
591            ]);
592            cursor += 8;
593
594            index.push(IndexEntry { key, offset });
595        }
596
597        // Read bloom filter
598        reader
599            .seek(SeekFrom::Start(footer.bloom_filter_offset))
600            .map_err(|e| {
601                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
602                    "Failed to seek to bloom filter: {}",
603                    e
604                )))
605            })?;
606
607        // Read bloom filter metadata
608        let mut metadata_bytes = [0u8; 24];
609        reader.read_exact(&mut metadata_bytes).map_err(|e| {
610            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
611                "Failed to read bloom filter metadata: {}",
612                e
613            )))
614        })?;
615
616        let bloom_metadata = BloomFilterMetadata::from_bytes(&metadata_bytes)?;
617
618        // Read bloom filter data
619        let bloom_size = (bloom_metadata.num_bits + 7) / 8;
620        let mut bloom_data = vec![0u8; bloom_size];
621        reader.read_exact(&mut bloom_data).map_err(|e| {
622            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
623                "Failed to read bloom filter data: {}",
624                e
625            )))
626        })?;
627
628        let bloom_filter = BloomFilter::from_bytes(
629            bloom_data,
630            bloom_metadata.num_bits,
631            bloom_metadata.num_hash_functions,
632            bloom_metadata.num_elements,
633        )?;
634
635        Ok(Self {
636            path: path.as_ref().to_path_buf(),
637            file: Arc::new(file),
638            footer,
639            index,
640            bloom_filter,
641        })
642    }
643
644    /// Check if a key may be in the SSTable (using bloom filter)
645    ///
646    /// Returns:
647    /// - true: key MAY be in the SSTable (should check with get())
648    /// - false: key is DEFINITELY NOT in the SSTable
649    pub fn may_contain(&self, key: &Key) -> bool {
650        self.bloom_filter.may_contain(key)
651    }
652
653    /// Get a value by key
654    pub fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
655        // Check bloom filter first for fast negative lookups
656        if !self.may_contain(key) {
657            return Ok(None);
658        }
659
660        // Find the block that might contain this key
661        let Some(block_index) = self.find_block_index(key) else {
662            return Ok(None);
663        };
664        let block = self.read_block(block_index)?;
665
666        // Search for key in block
667        for (k, v) in &block.entries {
668            if k == key {
669                return Ok(Some(v.clone()));
670            }
671        }
672
673        Ok(None)
674    }
675
676    /// Find the block index that might contain the key
677    fn find_block_index(&self, key: &Key) -> Option<usize> {
678        // Binary search in index
679        match self.index.binary_search_by(|entry| entry.key.cmp(key)) {
680            Ok(idx) => Some(idx),
681            Err(idx) => {
682                if idx == 0 {
683                    None
684                } else {
685                    Some(idx - 1)
686                }
687            }
688        }
689    }
690
691    /// Read a block from disk
692    fn read_block(&self, block_index: usize) -> Result<DataBlock> {
693        if block_index >= self.index.len() {
694            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
695                "Block index out of bounds".to_string(),
696            )));
697        }
698
699        let offset = self.index[block_index].offset;
700        let next_offset = if block_index + 1 < self.index.len() {
701            self.index[block_index + 1].offset
702        } else {
703            self.footer.index_offset
704        };
705
706        let block_size = (next_offset - offset) as usize;
707        let mut block_bytes = vec![0u8; block_size];
708
709        let mut reader = BufReader::new(self.file.as_ref());
710        reader.seek(SeekFrom::Start(offset)).map_err(|e| {
711            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
712                "Failed to seek to block: {}",
713                e
714            )))
715        })?;
716
717        reader.read_exact(&mut block_bytes).map_err(|e| {
718            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
719                "Failed to read block: {}",
720                e
721            )))
722        })?;
723
724        DataBlock::decode(&block_bytes)
725    }
726
727    /// Get all entries in the SSTable (for iteration)
728    pub fn iter(&self) -> Result<Vec<(Key, CipherBlob)>> {
729        let mut entries = Vec::new();
730
731        for i in 0..self.index.len() {
732            let block = self.read_block(i)?;
733            entries.extend(block.entries);
734        }
735
736        Ok(entries)
737    }
738
739    /// Get SSTable metadata (min_key, max_key, num_entries)
740    pub fn metadata(&self) -> Result<(Key, Key, usize)> {
741        if self.index.is_empty() {
742            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
743                "SSTable has no entries".to_string(),
744            )));
745        }
746
747        // Get all entries to find min/max keys
748        let entries = self.iter()?;
749
750        if entries.is_empty() {
751            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
752                "SSTable has no data entries".to_string(),
753            )));
754        }
755
756        let min_key = entries
757            .first()
758            .ok_or_else(|| {
759                AmateRSError::StorageIntegrity(ErrorContext::new(
760                    "Failed to get first entry".to_string(),
761                ))
762            })?
763            .0
764            .clone();
765
766        let max_key = entries
767            .last()
768            .ok_or_else(|| {
769                AmateRSError::StorageIntegrity(ErrorContext::new(
770                    "Failed to get last entry".to_string(),
771                ))
772            })?
773            .0
774            .clone();
775
776        Ok((min_key, max_key, entries.len()))
777    }
778}
779
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Path in the system temp directory that is deleted on drop, so a
    /// failing assertion does not leave a stale `.sst` file behind.
    struct TempSst(PathBuf);

    impl TempSst {
        fn new(name: &str) -> Self {
            // The process id keeps concurrent test runs from sharing a file.
            Self(env::temp_dir().join(format!("{}_{}.sst", name, std::process::id())))
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempSst {
        fn drop(&mut self) {
            std::fs::remove_file(&self.0).ok();
        }
    }

    /// Key used by every test: `key_000`, `key_001`, ...
    fn numbered_key(i: usize) -> Key {
        Key::from_str(&format!("key_{:03}", i))
    }

    /// Writes `count` entries whose values are `value_len` copies of `i as u8`.
    fn write_table(
        path: &Path,
        config: SSTableConfig,
        count: usize,
        value_len: usize,
    ) -> Result<()> {
        let mut writer = SSTableWriter::new(path, config)?;
        for i in 0..count {
            writer.add(numbered_key(i), CipherBlob::new(vec![i as u8; value_len]))?;
        }
        writer.finish()
    }

    #[test]
    fn test_sstable_basic_write_read() -> Result<()> {
        let table = TempSst::new("test_sstable_basic");
        write_table(table.path(), SSTableConfig::default(), 10, 100)?;

        let reader = SSTableReader::open(table.path())?;

        // Check we can read all keys
        for i in 0..10 {
            let value = reader
                .get(&numbered_key(i))?
                .expect("Value should exist in SSTable");
            assert_eq!(value.as_bytes()[0], i as u8);
        }

        // Non-existent key
        let value = reader.get(&Key::from_str("nonexistent"))?;
        assert!(value.is_none());

        Ok(())
    }

    #[test]
    fn test_sstable_multiple_blocks() -> Result<()> {
        let table = TempSst::new("test_sstable_blocks");

        // Small block size to force multiple blocks
        let config = SSTableConfig {
            block_size: 256,
            enable_compression: false,
        };
        write_table(table.path(), config, 100, 50)?;

        let reader = SSTableReader::open(table.path())?;
        for i in 0..100 {
            let value = reader.get(&numbered_key(i))?;
            assert!(value.is_some());
        }

        Ok(())
    }

    #[test]
    fn test_sstable_iteration() -> Result<()> {
        let table = TempSst::new("test_sstable_iter");
        write_table(table.path(), SSTableConfig::default(), 50, 100)?;

        let reader = SSTableReader::open(table.path())?;
        let entries = reader.iter()?;

        assert_eq!(entries.len(), 50);

        // Check ordering
        for pair in entries.windows(2) {
            assert!(pair[0].0 < pair[1].0);
        }

        Ok(())
    }

    #[test]
    fn test_sstable_empty() -> Result<()> {
        let table = TempSst::new("test_sstable_empty");
        write_table(table.path(), SSTableConfig::default(), 0, 0)?;

        let reader = SSTableReader::open(table.path())?;
        let entries = reader.iter()?;
        assert_eq!(entries.len(), 0);

        let value = reader.get(&Key::from_str("any_key"))?;
        assert!(value.is_none());

        Ok(())
    }

    #[test]
    fn test_sstable_large_values() -> Result<()> {
        let table = TempSst::new("test_sstable_large");

        // 10KB values: each entry is larger than the default block size
        write_table(table.path(), SSTableConfig::default(), 10, 10000)?;

        let reader = SSTableReader::open(table.path())?;
        for i in 0..10 {
            let value = reader
                .get(&numbered_key(i))?
                .expect("Value should exist in SSTable");
            assert_eq!(value.as_bytes().len(), 10000);
        }

        Ok(())
    }

    #[test]
    fn test_sstable_metadata() -> Result<()> {
        let table = TempSst::new("test_sstable_metadata");
        let config = SSTableConfig {
            block_size: 256,
            enable_compression: false,
        };
        write_table(table.path(), config, 100, 50)?;

        let reader = SSTableReader::open(table.path())?;
        let (min_key, max_key, num_entries) = reader.metadata()?;

        assert!(min_key == numbered_key(0));
        assert!(max_key == numbered_key(99));
        assert_eq!(num_entries, 100);

        Ok(())
    }

    #[test]
    fn test_sstable_corruption_detection() -> Result<()> {
        let table = TempSst::new("test_sstable_corrupt");
        write_table(table.path(), SSTableConfig::default(), 10, 100)?;

        // The footer is the last 36 bytes of the file and its final 4 bytes
        // are the footer checksum; overwrite those.
        {
            let mut file = OpenOptions::new().write(true).open(table.path())?;
            file.seek(SeekFrom::End(-4))?;
            file.write_all(&[0xFF, 0xFF, 0xFF, 0xFF])?;
        }

        // Try to read - should detect corruption
        let result = SSTableReader::open(table.path());
        assert!(result.is_err());

        Ok(())
    }
}