1use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::storage::compression::{self, CompressionType};
8use crate::storage::{BloomFilter, BloomFilterConfig, BloomFilterMetadata};
9use crate::types::{CipherBlob, Key};
10use crate::utils::{calculate_checksum, verify_checksum};
11use std::collections::BTreeMap;
12use std::fs::{File, OpenOptions};
13use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16
/// Magic number identifying an SSTable file; the bytes spell "SSTA" in ASCII.
const SSTABLE_MAGIC: u32 = 0x53535441;

/// On-disk format version; `Footer::decode` rejects any other value.
const SSTABLE_VERSION: u32 = 3;

/// Default target uncompressed size (bytes) of a data block before it is flushed.
const DEFAULT_BLOCK_SIZE: usize = 4096;
25
/// Tuning parameters used when writing an SSTable.
#[derive(Debug, Clone)]
pub struct SSTableConfig {
    /// Target uncompressed size of a data block in bytes; the writer flushes
    /// the current block once adding another entry would exceed this.
    pub block_size: usize,
    /// Compression applied to each encoded data block before it is written.
    pub compression_type: CompressionType,
}
34
35impl Default for SSTableConfig {
36 fn default() -> Self {
37 Self {
38 block_size: DEFAULT_BLOCK_SIZE,
39 compression_type: CompressionType::None,
40 }
41 }
42}
43
/// Sparse index entry: pairs the first key of a data block with the block's
/// byte offset in the file.
#[derive(Debug, Clone)]
struct IndexEntry {
    /// First key stored in the referenced block (the reader's binary search
    /// assumes entries were added in ascending key order).
    key: Key,
    /// Absolute file offset of the block's 8-byte size header.
    offset: u64,
}
52
/// In-memory buffer of key/value entries destined for one on-disk data block.
#[derive(Debug, Clone)]
struct DataBlock {
    /// Entries in insertion order.
    entries: Vec<(Key, CipherBlob)>,
    /// Running encoded-size estimate: two 4-byte length prefixes plus key and
    /// value payloads per entry (excludes the entry count and checksum).
    size: usize,
}
59
60impl DataBlock {
61 fn new() -> Self {
62 Self {
63 entries: Vec::new(),
64 size: 0,
65 }
66 }
67
68 fn add_entry(&mut self, key: Key, value: CipherBlob) {
69 let entry_size = 8 + key.as_bytes().len() + value.as_bytes().len();
70 self.entries.push((key, value));
71 self.size += entry_size;
72 }
73
74 fn is_full(&self, block_size: usize) -> bool {
75 self.size >= block_size
76 }
77
78 fn encode(&self) -> Result<Vec<u8>> {
79 let mut bytes = Vec::with_capacity(self.size + 8);
80
81 bytes.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
83
84 for (key, value) in &self.entries {
86 let key_bytes = key.as_bytes();
87 let value_bytes = value.as_bytes();
88
89 bytes.extend_from_slice(&(key_bytes.len() as u32).to_le_bytes());
91 bytes.extend_from_slice(key_bytes);
92
93 bytes.extend_from_slice(&(value_bytes.len() as u32).to_le_bytes());
95 bytes.extend_from_slice(value_bytes);
96 }
97
98 let checksum = calculate_checksum(&bytes);
100 bytes.extend_from_slice(&checksum.to_le_bytes());
101
102 Ok(bytes)
103 }
104
105 fn decode(bytes: &[u8]) -> Result<Self> {
106 if bytes.len() < 8 {
107 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
108 "Data block too small".to_string(),
109 )));
110 }
111
112 let data_len = bytes.len() - 4;
114 let checksum_bytes = &bytes[data_len..];
115 let expected_checksum = u32::from_le_bytes([
116 checksum_bytes[0],
117 checksum_bytes[1],
118 checksum_bytes[2],
119 checksum_bytes[3],
120 ]);
121 verify_checksum(&bytes[..data_len], expected_checksum)?;
122
123 let mut cursor = 0;
124 let num_entries = u32::from_le_bytes([
125 bytes[cursor],
126 bytes[cursor + 1],
127 bytes[cursor + 2],
128 bytes[cursor + 3],
129 ]) as usize;
130 cursor += 4;
131
132 let mut block = DataBlock::new();
133
134 for _ in 0..num_entries {
135 if cursor + 4 > data_len {
137 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
138 "Incomplete key length".to_string(),
139 )));
140 }
141 let key_len = u32::from_le_bytes([
142 bytes[cursor],
143 bytes[cursor + 1],
144 bytes[cursor + 2],
145 bytes[cursor + 3],
146 ]) as usize;
147 cursor += 4;
148
149 if cursor + key_len > data_len {
150 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
151 "Incomplete key data".to_string(),
152 )));
153 }
154 let key = Key::from_slice(&bytes[cursor..cursor + key_len]);
155 cursor += key_len;
156
157 if cursor + 4 > data_len {
159 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
160 "Incomplete value length".to_string(),
161 )));
162 }
163 let value_len = u32::from_le_bytes([
164 bytes[cursor],
165 bytes[cursor + 1],
166 bytes[cursor + 2],
167 bytes[cursor + 3],
168 ]) as usize;
169 cursor += 4;
170
171 if cursor + value_len > data_len {
172 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
173 "Incomplete value data".to_string(),
174 )));
175 }
176 let value = CipherBlob::new(bytes[cursor..cursor + value_len].to_vec());
177 cursor += value_len;
178
179 block.add_entry(key, value);
180 }
181
182 Ok(block)
183 }
184}
185
/// Encoded footer length in bytes: 4 + 4 + 8 + 8 + 4 + 4 + 1 + 4.
const FOOTER_SIZE: usize = 37;

/// Fixed-size trailer at the end of every SSTable file; locates the index
/// and bloom filter regions and records format metadata.
#[derive(Debug, Clone)]
struct Footer {
    /// File-type magic; must equal `SSTABLE_MAGIC`.
    magic: u32,
    /// Format version; must equal `SSTABLE_VERSION`.
    version: u32,
    /// Absolute offset of the index region.
    index_offset: u64,
    /// Absolute offset of the bloom filter region (also the end of the index).
    bloom_filter_offset: u64,
    /// Configured block size the table was written with.
    block_size: u32,
    /// Number of data blocks (equals the number of index entries).
    num_blocks: u32,
    /// Compression applied to data blocks.
    compression_type: CompressionType,
    /// Checksum over all preceding footer fields.
    checksum: u32,
}
201
202impl Footer {
203 fn new(
204 index_offset: u64,
205 bloom_filter_offset: u64,
206 block_size: u32,
207 num_blocks: u32,
208 compression_type: CompressionType,
209 ) -> Self {
210 let mut footer = Self {
211 magic: SSTABLE_MAGIC,
212 version: SSTABLE_VERSION,
213 index_offset,
214 bloom_filter_offset,
215 block_size,
216 num_blocks,
217 compression_type,
218 checksum: 0,
219 };
220
221 footer.checksum = footer.compute_checksum();
222 footer
223 }
224
225 fn compute_checksum(&self) -> u32 {
226 let mut bytes = Vec::new();
227 bytes.extend_from_slice(&self.magic.to_le_bytes());
228 bytes.extend_from_slice(&self.version.to_le_bytes());
229 bytes.extend_from_slice(&self.index_offset.to_le_bytes());
230 bytes.extend_from_slice(&self.bloom_filter_offset.to_le_bytes());
231 bytes.extend_from_slice(&self.block_size.to_le_bytes());
232 bytes.extend_from_slice(&self.num_blocks.to_le_bytes());
233 bytes.push(self.compression_type.to_byte());
234 calculate_checksum(&bytes)
235 }
236
237 fn encode(&self) -> Vec<u8> {
238 let mut bytes = Vec::with_capacity(FOOTER_SIZE);
239 bytes.extend_from_slice(&self.magic.to_le_bytes());
240 bytes.extend_from_slice(&self.version.to_le_bytes());
241 bytes.extend_from_slice(&self.index_offset.to_le_bytes());
242 bytes.extend_from_slice(&self.bloom_filter_offset.to_le_bytes());
243 bytes.extend_from_slice(&self.block_size.to_le_bytes());
244 bytes.extend_from_slice(&self.num_blocks.to_le_bytes());
245 bytes.push(self.compression_type.to_byte());
246 bytes.extend_from_slice(&self.checksum.to_le_bytes());
247 bytes
248 }
249
250 fn decode(bytes: &[u8]) -> Result<Self> {
251 if bytes.len() < FOOTER_SIZE {
252 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
253 "Footer too small".to_string(),
254 )));
255 }
256
257 let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
258 let version = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
259 let index_offset = u64::from_le_bytes([
260 bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
261 ]);
262 let bloom_filter_offset = u64::from_le_bytes([
263 bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], bytes[23],
264 ]);
265 let block_size = u32::from_le_bytes([bytes[24], bytes[25], bytes[26], bytes[27]]);
266 let num_blocks = u32::from_le_bytes([bytes[28], bytes[29], bytes[30], bytes[31]]);
267 let compression_type = CompressionType::from_byte(bytes[32])?;
268 let checksum = u32::from_le_bytes([bytes[33], bytes[34], bytes[35], bytes[36]]);
269
270 if magic != SSTABLE_MAGIC {
271 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
272 "Invalid SSTable magic: expected {}, got {}",
273 SSTABLE_MAGIC, magic
274 ))));
275 }
276
277 if version != SSTABLE_VERSION {
278 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
279 "Unsupported SSTable version: {}",
280 version
281 ))));
282 }
283
284 let footer = Self {
285 magic,
286 version,
287 index_offset,
288 bloom_filter_offset,
289 block_size,
290 num_blocks,
291 compression_type,
292 checksum,
293 };
294
295 let expected = footer.compute_checksum();
297 if checksum != expected {
298 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
299 "Footer checksum mismatch: expected {}, got {}",
300 expected, checksum
301 ))));
302 }
303
304 Ok(footer)
305 }
306}
307
/// Streaming writer that builds an SSTable file block by block.
pub struct SSTableWriter {
    /// Destination path of the table being written.
    path: PathBuf,
    /// Block-size and compression settings.
    config: SSTableConfig,
    /// Buffered file handle; `None` after `finish` completes.
    writer: Option<BufWriter<File>>,
    /// Entries accumulated for the block currently being built.
    current_block: DataBlock,
    /// One sparse-index entry (first key + file offset) per data block.
    index: Vec<IndexEntry>,
    /// File offset where the next write will land.
    current_offset: u64,
    /// Membership filter over every key added so far.
    bloom_filter: BloomFilter,
}
318
impl SSTableWriter {
    /// Creates a writer over a fresh file at `path` (truncating any existing
    /// file) with the given configuration.
    ///
    /// # Errors
    /// Returns `StorageIntegrity` if the file cannot be created.
    pub fn new<P: AsRef<Path>>(path: P, config: SSTableConfig) -> Result<Self> {
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(path.as_ref())
            .map_err(|e| {
                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                    "Failed to create SSTable file: {}",
                    e
                )))
            })?;

        // NOTE(review): bloom sizing is hard-coded; tables with far more than
        // 10_000 keys will see a higher false-positive rate than 1% — consider
        // making this configurable via SSTableConfig.
        let bloom_filter = BloomFilter::new(BloomFilterConfig {
            expected_elements: 10000,
            false_positive_rate: 0.01,
        });

        Ok(Self {
            path: path.as_ref().to_path_buf(),
            config,
            writer: Some(BufWriter::new(file)),
            current_block: DataBlock::new(),
            index: Vec::new(),
            current_offset: 0,
            bloom_filter,
        })
    }

    /// Adds one key/value entry, flushing the current block first if this
    /// entry would push it past the configured block size.
    ///
    /// NOTE(review): keys are expected in ascending order (the reader's index
    /// binary search depends on it) but this is not enforced here — an
    /// out-of-order `add` silently produces a table with broken lookups.
    ///
    /// # Errors
    /// Propagates block-flush I/O failures.
    pub fn add(&mut self, key: Key, value: CipherBlob) -> Result<()> {
        // Mirror of DataBlock::add_entry's size accounting: 8 bytes of
        // length prefixes plus the payloads.
        let entry_size = 8 + key.as_bytes().len() + value.as_bytes().len();
        if self.current_block.size + entry_size > self.config.block_size
            && !self.current_block.entries.is_empty()
        {
            self.flush_block()?;
        }

        // First entry of a (possibly just-flushed) block: record its key and
        // the offset the block will be written at. This must happen AFTER the
        // flush above so current_offset is up to date.
        if self.current_block.entries.is_empty() {
            self.index.push(IndexEntry {
                key: key.clone(),
                offset: self.current_offset,
            });
        }

        self.bloom_filter.insert(&key);

        self.current_block.add_entry(key, value);
        Ok(())
    }

    /// Encodes, compresses, and writes the current block as
    /// `[original_size: u32][compressed_size: u32][compressed payload]`,
    /// then resets the block buffer. No-op when the block is empty.
    ///
    /// # Errors
    /// Returns `StorageIntegrity` if the writer was already finalized or any
    /// write fails.
    fn flush_block(&mut self) -> Result<()> {
        if self.current_block.entries.is_empty() {
            return Ok(());
        }

        let writer = self.writer.as_mut().ok_or_else(|| {
            AmateRSError::StorageIntegrity(ErrorContext::new(
                "SSTable writer already finalized".to_string(),
            ))
        })?;

        let block_bytes = self.current_block.encode()?;
        let original_size = block_bytes.len() as u32;

        let compressed = compression::compress_block(&block_bytes, self.config.compression_type)?;
        let compressed_size = compressed.len() as u32;

        writer
            .write_all(&original_size.to_le_bytes())
            .map_err(|e| {
                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                    "Failed to write block original size: {}",
                    e
                )))
            })?;
        writer
            .write_all(&compressed_size.to_le_bytes())
            .map_err(|e| {
                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                    "Failed to write block compressed size: {}",
                    e
                )))
            })?;
        writer.write_all(&compressed).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to write compressed block: {}",
                e
            )))
        })?;

        // 8 header bytes (two u32 sizes) plus the compressed payload.
        self.current_offset += 8 + compressed.len() as u64;
        self.current_block = DataBlock::new();

        Ok(())
    }

    /// Flushes the final block, then writes the index region, the bloom
    /// filter (metadata followed by bit data), and the footer; finally
    /// flushes and fsyncs the file. Consumes the writer.
    ///
    /// # Errors
    /// Returns `StorageIntegrity` on any write, flush, or sync failure.
    pub fn finish(mut self) -> Result<()> {
        self.flush_block()?;

        let writer = self.writer.as_mut().ok_or_else(|| {
            AmateRSError::StorageIntegrity(ErrorContext::new(
                "SSTable writer already finalized".to_string(),
            ))
        })?;

        // Index region layout:
        // [num_entries: u32] ([key_len: u32][key][offset: u64])* [checksum: u32]
        let index_offset = self.current_offset;
        let mut index_bytes = Vec::new();

        index_bytes.extend_from_slice(&(self.index.len() as u32).to_le_bytes());

        for entry in &self.index {
            let key_bytes = entry.key.as_bytes();
            index_bytes.extend_from_slice(&(key_bytes.len() as u32).to_le_bytes());
            index_bytes.extend_from_slice(key_bytes);
            index_bytes.extend_from_slice(&entry.offset.to_le_bytes());
        }

        let index_checksum = calculate_checksum(&index_bytes);
        index_bytes.extend_from_slice(&index_checksum.to_le_bytes());

        writer.write_all(&index_bytes).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to write index: {}",
                e
            )))
        })?;
        self.current_offset += index_bytes.len() as u64;

        let bloom_filter_offset = self.current_offset;

        let bloom_metadata = self.bloom_filter.metadata();
        let metadata_bytes = bloom_metadata.to_bytes();
        writer.write_all(&metadata_bytes).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to write bloom filter metadata: {}",
                e
            )))
        })?;
        self.current_offset += metadata_bytes.len() as u64;

        let bloom_data = self.bloom_filter.as_bytes();
        writer.write_all(bloom_data).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to write bloom filter data: {}",
                e
            )))
        })?;
        self.current_offset += bloom_data.len() as u64;

        // num_blocks == number of index entries (one index entry per block).
        let footer = Footer::new(
            index_offset,
            bloom_filter_offset,
            self.config.block_size as u32,
            self.index.len() as u32,
            self.config.compression_type,
        );
        let footer_bytes = footer.encode();
        writer.write_all(&footer_bytes).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to write footer: {}",
                e
            )))
        })?;

        writer.flush().map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!("Failed to flush: {}", e)))
        })?;

        // fsync so the table survives a crash once finish() returns Ok.
        writer.get_ref().sync_all().map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!("Failed to sync: {}", e)))
        })?;

        self.writer = None;

        Ok(())
    }
}
523
/// Read-only handle to a finished SSTable file.
pub struct SSTableReader {
    /// Path the table was opened from.
    path: PathBuf,
    /// Shared file handle; block reads seek on this handle.
    file: Arc<File>,
    /// Decoded and checksum-verified footer describing the file layout.
    footer: Footer,
    /// Sparse index: first key and file offset of every data block.
    index: Vec<IndexEntry>,
    /// Bloom filter loaded from the file, used to skip lookups of absent keys.
    bloom_filter: BloomFilter,
    /// Compression of data blocks (copied out of the footer for convenience).
    compression_type: CompressionType,
}
533
impl SSTableReader {
    /// Opens an SSTable: validates the footer, loads and checksum-verifies
    /// the index region, and loads the bloom filter into memory. Data blocks
    /// are read lazily on demand.
    ///
    /// NOTE(review): index parsing below trusts the length fields once the
    /// region checksum passes. A deliberately crafted file with a valid
    /// checksum over bogus lengths (or an index region shorter than 4 bytes,
    /// making `index_bytes.len() - 4` underflow) can cause a slice panic
    /// rather than a clean error — acceptable for trusted storage, worth
    /// hardening if files can be adversarial.
    ///
    /// # Errors
    /// Returns `StorageIntegrity` on I/O failure, truncated file, or any
    /// footer/index/bloom-filter validation failure.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = File::open(path.as_ref()).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to open SSTable: {}",
                e
            )))
        })?;

        let file_size = file
            .metadata()
            .map_err(|e| {
                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                    "Failed to get file metadata: {}",
                    e
                )))
            })?
            .len();

        // A valid table is at least one footer long.
        if file_size < FOOTER_SIZE as u64 {
            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
                "SSTable file too small".to_string(),
            )));
        }

        // The footer lives in the last FOOTER_SIZE bytes of the file.
        let mut reader = BufReader::new(&file);
        reader
            .seek(SeekFrom::End(-(FOOTER_SIZE as i64)))
            .map_err(|e| {
                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                    "Failed to seek to footer: {}",
                    e
                )))
            })?;

        let mut footer_bytes = [0u8; FOOTER_SIZE];
        reader.read_exact(&mut footer_bytes).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to read footer: {}",
                e
            )))
        })?;

        let footer = Footer::decode(&footer_bytes)?;

        reader
            .seek(SeekFrom::Start(footer.index_offset))
            .map_err(|e| {
                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                    "Failed to seek to index: {}",
                    e
                )))
            })?;

        // The index region spans from index_offset up to the bloom filter.
        let index_size = footer.bloom_filter_offset - footer.index_offset;
        let mut index_bytes = vec![0u8; index_size as usize];
        reader.read_exact(&mut index_bytes).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to read index: {}",
                e
            )))
        })?;

        // Verify the region checksum (last 4 bytes) before parsing entries.
        let data_len = index_bytes.len() - 4;
        let checksum_bytes = &index_bytes[data_len..];
        let expected_checksum = u32::from_le_bytes([
            checksum_bytes[0],
            checksum_bytes[1],
            checksum_bytes[2],
            checksum_bytes[3],
        ]);
        verify_checksum(&index_bytes[..data_len], expected_checksum)?;

        let mut cursor = 0;
        let num_entries = u32::from_le_bytes([
            index_bytes[cursor],
            index_bytes[cursor + 1],
            index_bytes[cursor + 2],
            index_bytes[cursor + 3],
        ]) as usize;
        cursor += 4;

        let mut index = Vec::with_capacity(num_entries);

        // Each entry: [key_len: u32][key][offset: u64], little-endian.
        for _ in 0..num_entries {
            let key_len = u32::from_le_bytes([
                index_bytes[cursor],
                index_bytes[cursor + 1],
                index_bytes[cursor + 2],
                index_bytes[cursor + 3],
            ]) as usize;
            cursor += 4;

            let key = Key::from_slice(&index_bytes[cursor..cursor + key_len]);
            cursor += key_len;

            let offset = u64::from_le_bytes([
                index_bytes[cursor],
                index_bytes[cursor + 1],
                index_bytes[cursor + 2],
                index_bytes[cursor + 3],
                index_bytes[cursor + 4],
                index_bytes[cursor + 5],
                index_bytes[cursor + 6],
                index_bytes[cursor + 7],
            ]);
            cursor += 8;

            index.push(IndexEntry { key, offset });
        }

        reader
            .seek(SeekFrom::Start(footer.bloom_filter_offset))
            .map_err(|e| {
                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                    "Failed to seek to bloom filter: {}",
                    e
                )))
            })?;

        // Fixed-size bloom metadata header, followed by the raw bit array.
        let mut metadata_bytes = [0u8; 24];
        reader.read_exact(&mut metadata_bytes).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to read bloom filter metadata: {}",
                e
            )))
        })?;

        let bloom_metadata = BloomFilterMetadata::from_bytes(&metadata_bytes)?;

        // Bit count rounded up to whole bytes.
        let bloom_size = bloom_metadata.num_bits.div_ceil(8);
        let mut bloom_data = vec![0u8; bloom_size];
        reader.read_exact(&mut bloom_data).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to read bloom filter data: {}",
                e
            )))
        })?;

        let bloom_filter = BloomFilter::from_bytes(
            bloom_data,
            bloom_metadata.num_bits,
            bloom_metadata.num_hash_functions,
            bloom_metadata.num_elements,
        )?;

        let compression_type = footer.compression_type;

        Ok(Self {
            path: path.as_ref().to_path_buf(),
            file: Arc::new(file),
            footer,
            index,
            bloom_filter,
            compression_type,
        })
    }

    /// Bloom-filter membership test: `false` means the key is definitely
    /// absent; `true` means it may be present (false positives possible).
    pub fn may_contain(&self, key: &Key) -> bool {
        self.bloom_filter.may_contain(key)
    }

    /// Looks up `key`, returning its value if present.
    ///
    /// Consults the bloom filter first, then binary-searches the sparse
    /// index for the one candidate block and scans it linearly.
    ///
    /// NOTE(review): `read_block` seeks on the shared `File` cursor, so
    /// concurrent `get` calls on the same reader from multiple threads could
    /// interleave seek/read — confirm single-threaded use or clone handles.
    ///
    /// # Errors
    /// Propagates block read/decompress/decode failures.
    pub fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
        if !self.may_contain(key) {
            return Ok(None);
        }

        let Some(block_index) = self.find_block_index(key) else {
            return Ok(None);
        };
        let block = self.read_block(block_index)?;

        for (k, v) in &block.entries {
            if k == key {
                return Ok(Some(v.clone()));
            }
        }

        Ok(None)
    }

    /// Returns the index of the single block that could contain `key`:
    /// the block whose first key is the greatest one <= `key`. `None` when
    /// `key` sorts before the first block's first key (or the table is empty).
    fn find_block_index(&self, key: &Key) -> Option<usize> {
        match self.index.binary_search_by(|entry| entry.key.cmp(key)) {
            Ok(idx) => Some(idx),
            Err(idx) => {
                if idx == 0 {
                    None
                } else {
                    Some(idx - 1)
                }
            }
        }
    }

    /// Reads, decompresses, and decodes the data block at `block_index`.
    ///
    /// # Errors
    /// Returns `StorageIntegrity` on an out-of-range index, I/O failure,
    /// decompression failure, or block checksum mismatch.
    fn read_block(&self, block_index: usize) -> Result<DataBlock> {
        if block_index >= self.index.len() {
            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
                "Block index out of bounds".to_string(),
            )));
        }

        let offset = self.index[block_index].offset;

        let mut reader = BufReader::new(self.file.as_ref());
        reader.seek(SeekFrom::Start(offset)).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to seek to block: {}",
                e
            )))
        })?;

        // 8-byte header written by flush_block: original size, compressed size.
        let mut header = [0u8; 8];
        reader.read_exact(&mut header).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to read block header: {}",
                e
            )))
        })?;

        let original_size =
            u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
        let compressed_size =
            u32::from_le_bytes([header[4], header[5], header[6], header[7]]) as usize;

        let mut compressed_data = vec![0u8; compressed_size];
        reader.read_exact(&mut compressed_data).map_err(|e| {
            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                "Failed to read compressed block data: {}",
                e
            )))
        })?;

        let block_bytes =
            compression::decompress_block(&compressed_data, self.compression_type, original_size)?;

        DataBlock::decode(&block_bytes)
    }

    /// Returns every entry in the table, in on-disk (block, then insertion)
    /// order. Reads and decodes all blocks — O(table size) in time and memory.
    ///
    /// # Errors
    /// Propagates any block read failure.
    pub fn iter(&self) -> Result<Vec<(Key, CipherBlob)>> {
        let mut entries = Vec::new();

        for i in 0..self.index.len() {
            let block = self.read_block(i)?;
            entries.extend(block.entries);
        }

        Ok(entries)
    }

    /// Returns `(min_key, max_key, entry_count)` for the table. Note: reads
    /// every block (via `iter`), so this is as expensive as a full scan.
    /// Min/max here are the first and last stored entries, which equal the
    /// true min/max only if entries were written in sorted order.
    ///
    /// # Errors
    /// Returns `StorageIntegrity` when the table is empty, and propagates
    /// block read failures.
    pub fn metadata(&self) -> Result<(Key, Key, usize)> {
        if self.index.is_empty() {
            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
                "SSTable has no entries".to_string(),
            )));
        }

        let entries = self.iter()?;

        if entries.is_empty() {
            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
                "SSTable has no data entries".to_string(),
            )));
        }

        let min_key = entries
            .first()
            .ok_or_else(|| {
                AmateRSError::StorageIntegrity(ErrorContext::new(
                    "Failed to get first entry".to_string(),
                ))
            })?
            .0
            .clone();

        let max_key = entries
            .last()
            .ok_or_else(|| {
                AmateRSError::StorageIntegrity(ErrorContext::new(
                    "Failed to get last entry".to_string(),
                ))
            })?
            .0
            .clone();

        Ok((min_key, max_key, entries.len()))
    }
}
853
#[cfg(test)]
mod tests {
    //! Round-trip and integrity tests for the SSTable writer/reader pair.
    //! All tests write into `std::env::temp_dir()` and clean up after
    //! themselves with a best-effort `remove_file`.
    use super::*;
    use std::env;

    // Write 10 small entries, read each one back, and confirm a missing key
    // returns None.
    #[test]
    fn test_sstable_basic_write_read() -> Result<()> {
        let dir = env::temp_dir();
        let path = dir.join("test_sstable_basic.sst");

        {
            let config = SSTableConfig::default();
            let mut writer = SSTableWriter::new(&path, config)?;

            for i in 0..10 {
                let key = Key::from_str(&format!("key_{:03}", i));
                let value = CipherBlob::new(vec![i as u8; 100]);
                writer.add(key, value)?;
            }

            writer.finish()?;
        }

        {
            let reader = SSTableReader::open(&path)?;

            for i in 0..10 {
                let key = Key::from_str(&format!("key_{:03}", i));
                let value = reader.get(&key)?;
                assert!(value.is_some());
                let value = value.expect("Value should exist in SSTable");
                assert_eq!(value.as_bytes()[0], i as u8);
            }

            let key = Key::from_str("nonexistent");
            let value = reader.get(&key)?;
            assert!(value.is_none());
        }

        std::fs::remove_file(&path).ok();

        Ok(())
    }

    // A small block size (256 B) forces many blocks; every key must still be
    // retrievable across block boundaries.
    #[test]
    fn test_sstable_multiple_blocks() -> Result<()> {
        let dir = env::temp_dir();
        let path = dir.join("test_sstable_blocks.sst");

        {
            let config = SSTableConfig {
                block_size: 256,
                compression_type: CompressionType::None,
            };
            let mut writer = SSTableWriter::new(&path, config)?;

            for i in 0..100 {
                let key = Key::from_str(&format!("key_{:03}", i));
                let value = CipherBlob::new(vec![i as u8; 50]);
                writer.add(key, value)?;
            }

            writer.finish()?;
        }

        {
            let reader = SSTableReader::open(&path)?;

            for i in 0..100 {
                let key = Key::from_str(&format!("key_{:03}", i));
                let value = reader.get(&key)?;
                assert!(value.is_some());
            }
        }

        std::fs::remove_file(&path).ok();

        Ok(())
    }

    // iter() must return every entry in strictly ascending key order.
    #[test]
    fn test_sstable_iteration() -> Result<()> {
        let dir = env::temp_dir();
        let path = dir.join("test_sstable_iter.sst");

        {
            let config = SSTableConfig::default();
            let mut writer = SSTableWriter::new(&path, config)?;

            for i in 0..50 {
                let key = Key::from_str(&format!("key_{:03}", i));
                let value = CipherBlob::new(vec![i as u8; 100]);
                writer.add(key, value)?;
            }

            writer.finish()?;
        }

        {
            let reader = SSTableReader::open(&path)?;
            let entries = reader.iter()?;

            assert_eq!(entries.len(), 50);

            for i in 0..49 {
                assert!(entries[i].0 < entries[i + 1].0);
            }
        }

        std::fs::remove_file(&path).ok();

        Ok(())
    }

    // An SSTable with zero entries must open cleanly and answer lookups
    // with None.
    #[test]
    fn test_sstable_empty() -> Result<()> {
        let dir = env::temp_dir();
        let path = dir.join("test_sstable_empty.sst");

        {
            let config = SSTableConfig::default();
            let writer = SSTableWriter::new(&path, config)?;
            writer.finish()?;
        }

        {
            let reader = SSTableReader::open(&path)?;
            let entries = reader.iter()?;
            assert_eq!(entries.len(), 0);

            let key = Key::from_str("any_key");
            let value = reader.get(&key)?;
            assert!(value.is_none());
        }

        std::fs::remove_file(&path).ok();

        Ok(())
    }

    // Values larger than the default block size (10 KB vs 4 KB) must still
    // round-trip intact.
    #[test]
    fn test_sstable_large_values() -> Result<()> {
        let dir = env::temp_dir();
        let path = dir.join("test_sstable_large.sst");

        {
            let config = SSTableConfig::default();
            let mut writer = SSTableWriter::new(&path, config)?;

            for i in 0..10 {
                let key = Key::from_str(&format!("key_{:03}", i));
                let value = CipherBlob::new(vec![i as u8; 10000]);
                writer.add(key, value)?;
            }

            writer.finish()?;
        }

        {
            let reader = SSTableReader::open(&path)?;

            for i in 0..10 {
                let key = Key::from_str(&format!("key_{:03}", i));
                let value = reader.get(&key)?;
                assert!(value.is_some());
                let value = value.expect("Value should exist in SSTable");
                assert_eq!(value.as_bytes().len(), 10000);
            }
        }

        std::fs::remove_file(&path).ok();

        Ok(())
    }

    // Overwriting the final 4 bytes (the footer checksum) must make open()
    // fail with an integrity error.
    #[test]
    fn test_sstable_corruption_detection() -> Result<()> {
        let dir = env::temp_dir();
        let path = dir.join("test_sstable_corrupt.sst");

        {
            let config = SSTableConfig::default();
            let mut writer = SSTableWriter::new(&path, config)?;

            for i in 0..10 {
                let key = Key::from_str(&format!("key_{:03}", i));
                let value = CipherBlob::new(vec![i as u8; 100]);
                writer.add(key, value)?;
            }

            writer.finish()?;
        }

        {
            // Clobber the footer checksum field (last 4 bytes of the file).
            let mut file = OpenOptions::new().write(true).open(&path)?;
            file.seek(SeekFrom::End(-4))?;
            file.write_all(&[0xFF, 0xFF, 0xFF, 0xFF])?;
        }

        let result = SSTableReader::open(&path);
        assert!(result.is_err());

        std::fs::remove_file(&path).ok();

        Ok(())
    }

    // Shared harness: write `num_entries` deterministic values with the given
    // compression and block size, then verify every byte on read-back, a
    // negative lookup, and the total entry count.
    fn write_read_roundtrip(
        filename: &str,
        compression_type: CompressionType,
        num_entries: usize,
        value_size: usize,
        block_size: usize,
    ) -> Result<()> {
        let dir = env::temp_dir();
        let path = dir.join(filename);

        {
            let config = SSTableConfig {
                block_size,
                compression_type,
            };
            let mut writer = SSTableWriter::new(&path, config)?;

            for i in 0..num_entries {
                let key = Key::from_str(&format!("key_{:06}", i));
                // Deterministic per-entry payload: byte j is (i + j) mod 256.
                let mut value_data = Vec::with_capacity(value_size);
                for j in 0..value_size {
                    value_data.push(((i + j) % 256) as u8);
                }
                let value = CipherBlob::new(value_data);
                writer.add(key, value)?;
            }

            writer.finish()?;
        }

        {
            let reader = SSTableReader::open(&path)?;

            for i in 0..num_entries {
                let key = Key::from_str(&format!("key_{:06}", i));
                let value = reader.get(&key)?.ok_or_else(|| {
                    AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                        "Missing key {} with {:?} compression",
                        i, compression_type
                    )))
                })?;

                assert_eq!(value.as_bytes().len(), value_size);
                for j in 0..value_size {
                    assert_eq!(
                        value.as_bytes()[j],
                        ((i + j) % 256) as u8,
                        "Value mismatch at key={}, byte={}",
                        i,
                        j
                    );
                }
            }

            let missing = Key::from_str("nonexistent_key");
            assert!(reader.get(&missing)?.is_none());

            let entries = reader.iter()?;
            assert_eq!(entries.len(), num_entries);
        }

        std::fs::remove_file(&path).ok();
        Ok(())
    }

    // LZ4 round-trip, single default-size block regime.
    #[test]
    fn test_sstable_compressed_lz4_basic() -> Result<()> {
        write_read_roundtrip(
            "test_sstable_lz4_basic.sst",
            CompressionType::Lz4,
            20,
            200,
            DEFAULT_BLOCK_SIZE,
        )
    }

    // Deflate round-trip, single default-size block regime.
    #[test]
    fn test_sstable_compressed_deflate_basic() -> Result<()> {
        write_read_roundtrip(
            "test_sstable_deflate_basic.sst",
            CompressionType::Deflate,
            20,
            200,
            DEFAULT_BLOCK_SIZE,
        )
    }

    // LZ4 round-trip with a tiny block size to force many compressed blocks.
    #[test]
    fn test_sstable_compressed_lz4_multiple_blocks() -> Result<()> {
        write_read_roundtrip(
            "test_sstable_lz4_multiblock.sst",
            CompressionType::Lz4,
            100,
            100,
            256,
        )
    }

    // Deflate round-trip with a tiny block size to force many compressed blocks.
    #[test]
    fn test_sstable_compressed_deflate_multiple_blocks() -> Result<()> {
        write_read_roundtrip(
            "test_sstable_deflate_multiblock.sst",
            CompressionType::Deflate,
            100,
            100,
            256,
        )
    }

    // Highly repetitive values must compress: both LZ4 and Deflate files
    // should be strictly smaller than the uncompressed file, and all three
    // must still read back the full entry set.
    #[test]
    fn test_sstable_compression_ratio() -> Result<()> {
        let dir = env::temp_dir();
        let path_none = dir.join("test_sstable_ratio_none.sst");
        let path_lz4 = dir.join("test_sstable_ratio_lz4.sst");
        let path_deflate = dir.join("test_sstable_ratio_deflate.sst");

        let num_entries = 200;
        let value_size = 500;

        for (path, ct) in [
            (&path_none, CompressionType::None),
            (&path_lz4, CompressionType::Lz4),
            (&path_deflate, CompressionType::Deflate),
        ] {
            let config = SSTableConfig {
                block_size: DEFAULT_BLOCK_SIZE,
                compression_type: ct,
            };
            let mut writer = SSTableWriter::new(path, config)?;

            for i in 0..num_entries {
                let key = Key::from_str(&format!("key_{:06}", i));
                // Constant-byte values compress extremely well.
                let value = CipherBlob::new(vec![(i % 10) as u8; value_size]);
                writer.add(key, value)?;
            }

            writer.finish()?;
        }

        let size_none = std::fs::metadata(&path_none)
            .map_err(|e| {
                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                    "Failed to get file size: {}",
                    e
                )))
            })?
            .len();
        let size_lz4 = std::fs::metadata(&path_lz4)
            .map_err(|e| {
                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                    "Failed to get file size: {}",
                    e
                )))
            })?
            .len();
        let size_deflate = std::fs::metadata(&path_deflate)
            .map_err(|e| {
                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                    "Failed to get file size: {}",
                    e
                )))
            })?
            .len();

        assert!(
            size_lz4 < size_none,
            "LZ4 ({}) should be smaller than None ({})",
            size_lz4,
            size_none
        );
        assert!(
            size_deflate < size_none,
            "Deflate ({}) should be smaller than None ({})",
            size_deflate,
            size_none
        );

        for path in [&path_none, &path_lz4, &path_deflate] {
            let reader = SSTableReader::open(path)?;
            let entries = reader.iter()?;
            assert_eq!(entries.len(), num_entries);
        }

        std::fs::remove_file(&path_none).ok();
        std::fs::remove_file(&path_lz4).ok();
        std::fs::remove_file(&path_deflate).ok();

        Ok(())
    }

    // Large values inside a large (64 KB) block must round-trip under LZ4.
    #[test]
    fn test_sstable_large_block_compression() -> Result<()> {
        write_read_roundtrip(
            "test_sstable_large_block_comp.sst",
            CompressionType::Lz4,
            10,
            10000,
            65536,
        )
    }

    // Empty tables must round-trip under every compression type.
    #[test]
    fn test_sstable_compressed_empty() -> Result<()> {
        let dir = env::temp_dir();

        for ct in [CompressionType::Lz4, CompressionType::Deflate] {
            let filename = format!("test_sstable_empty_{:?}.sst", ct);
            let path = dir.join(&filename);

            {
                let config = SSTableConfig {
                    block_size: DEFAULT_BLOCK_SIZE,
                    compression_type: ct,
                };
                let writer = SSTableWriter::new(&path, config)?;
                writer.finish()?;
            }

            {
                let reader = SSTableReader::open(&path)?;
                let entries = reader.iter()?;
                assert_eq!(entries.len(), 0);

                let key = Key::from_str("any_key");
                assert!(reader.get(&key)?.is_none());
            }

            std::fs::remove_file(&path).ok();
        }

        Ok(())
    }

    // Compression must not disturb iteration order across many small blocks.
    #[test]
    fn test_sstable_compressed_iteration_order() -> Result<()> {
        let dir = env::temp_dir();
        let path = dir.join("test_sstable_comp_iter_order.sst");

        {
            let config = SSTableConfig {
                block_size: 256,
                compression_type: CompressionType::Deflate,
            };
            let mut writer = SSTableWriter::new(&path, config)?;

            for i in 0..50 {
                let key = Key::from_str(&format!("key_{:06}", i));
                let value = CipherBlob::new(vec![i as u8; 100]);
                writer.add(key, value)?;
            }

            writer.finish()?;
        }

        {
            let reader = SSTableReader::open(&path)?;
            let entries = reader.iter()?;

            assert_eq!(entries.len(), 50);

            for i in 0..49 {
                assert!(
                    entries[i].0 < entries[i + 1].0,
                    "Order violation at index {}",
                    i
                );
            }
        }

        std::fs::remove_file(&path).ok();
        Ok(())
    }

    // metadata() must report the first key, last key, and entry count of a
    // compressed table.
    #[test]
    fn test_sstable_compressed_metadata() -> Result<()> {
        let dir = env::temp_dir();
        let path = dir.join("test_sstable_comp_metadata.sst");

        {
            let config = SSTableConfig {
                block_size: DEFAULT_BLOCK_SIZE,
                compression_type: CompressionType::Lz4,
            };
            let mut writer = SSTableWriter::new(&path, config)?;

            for i in 0..25 {
                let key = Key::from_str(&format!("key_{:06}", i));
                let value = CipherBlob::new(vec![i as u8; 50]);
                writer.add(key, value)?;
            }

            writer.finish()?;
        }

        {
            let reader = SSTableReader::open(&path)?;
            let (min_key, max_key, count) = reader.metadata()?;

            assert_eq!(min_key, Key::from_str("key_000000"));
            assert_eq!(max_key, Key::from_str("key_000024"));
            assert_eq!(count, 25);
        }

        std::fs::remove_file(&path).ok();
        Ok(())
    }

    // The persisted bloom filter must have no false negatives for present
    // keys and reject the vast majority (>90%) of absent keys.
    #[test]
    fn test_sstable_compressed_bloom_filter() -> Result<()> {
        let dir = env::temp_dir();
        let path = dir.join("test_sstable_comp_bloom.sst");

        {
            let config = SSTableConfig {
                block_size: DEFAULT_BLOCK_SIZE,
                compression_type: CompressionType::Deflate,
            };
            let mut writer = SSTableWriter::new(&path, config)?;

            for i in 0..100 {
                let key = Key::from_str(&format!("existing_{:06}", i));
                let value = CipherBlob::new(vec![i as u8; 30]);
                writer.add(key, value)?;
            }

            writer.finish()?;
        }

        {
            let reader = SSTableReader::open(&path)?;

            for i in 0..100 {
                let key = Key::from_str(&format!("existing_{:06}", i));
                assert!(reader.may_contain(&key));
            }

            let mut rejected = 0;
            for i in 0..1000 {
                let key = Key::from_str(&format!("missing_{:06}", i));
                if !reader.may_contain(&key) {
                    rejected += 1;
                }
            }
            assert!(
                rejected > 900,
                "Bloom filter rejected only {} of 1000 non-existent keys",
                rejected
            );
        }

        std::fs::remove_file(&path).ok();
        Ok(())
    }
}