1use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::storage::{BloomFilter, BloomFilterConfig, BloomFilterMetadata};
8use crate::types::{CipherBlob, Key};
9use crate::utils::{calculate_checksum, verify_checksum};
10use std::collections::BTreeMap;
11use std::fs::{File, OpenOptions};
12use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
/// Magic number stored at the start of every footer; read most-significant
/// byte first, its bytes are the ASCII characters "SSTA".
const SSTABLE_MAGIC: u32 = 0x5353_5441;

/// On-disk format version written into, and required from, every footer.
const SSTABLE_VERSION: u32 = 2;

/// Block size in bytes used by `SSTableConfig::default()`.
const DEFAULT_BLOCK_SIZE: usize = 4096;
24
/// Tunable parameters for writing an SSTable.
#[derive(Debug, Clone)]
pub struct SSTableConfig {
    /// Target size of a data block in bytes. The writer starts a new block
    /// when adding an entry would push a non-empty block past this size.
    pub block_size: usize,
    /// Compression toggle.
    /// NOTE(review): none of the visible writer/reader code reads this flag —
    /// confirm whether compression is implemented elsewhere.
    pub enable_compression: bool,
}
33
34impl Default for SSTableConfig {
35 fn default() -> Self {
36 Self {
37 block_size: DEFAULT_BLOCK_SIZE,
38 enable_compression: false,
39 }
40 }
41}
42
/// One index record: locates a data block by the first key it holds.
#[derive(Debug, Clone)]
struct IndexEntry {
    /// First key stored in the block (recorded by the writer when the block
    /// receives its first entry).
    key: Key,
    /// Byte offset of the encoded block, measured from the start of the file.
    offset: u64,
}
51
/// In-memory data block: a batch of key/value entries that is encoded and
/// checksummed as a single unit.
#[derive(Debug, Clone)]
struct DataBlock {
    /// Entries in the order they were added.
    entries: Vec<(Key, CipherBlob)>,
    /// Running total of encoded entry bytes: per entry, 8 bytes of length
    /// prefixes plus the key and value bytes. Excludes the entry count and
    /// the trailing checksum.
    size: usize,
}
58
59impl DataBlock {
60 fn new() -> Self {
61 Self {
62 entries: Vec::new(),
63 size: 0,
64 }
65 }
66
67 fn add_entry(&mut self, key: Key, value: CipherBlob) {
68 let entry_size = 8 + key.as_bytes().len() + value.as_bytes().len();
69 self.entries.push((key, value));
70 self.size += entry_size;
71 }
72
73 fn is_full(&self, block_size: usize) -> bool {
74 self.size >= block_size
75 }
76
77 fn encode(&self) -> Result<Vec<u8>> {
78 let mut bytes = Vec::with_capacity(self.size + 8);
79
80 bytes.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
82
83 for (key, value) in &self.entries {
85 let key_bytes = key.as_bytes();
86 let value_bytes = value.as_bytes();
87
88 bytes.extend_from_slice(&(key_bytes.len() as u32).to_le_bytes());
90 bytes.extend_from_slice(key_bytes);
91
92 bytes.extend_from_slice(&(value_bytes.len() as u32).to_le_bytes());
94 bytes.extend_from_slice(value_bytes);
95 }
96
97 let checksum = calculate_checksum(&bytes);
99 bytes.extend_from_slice(&checksum.to_le_bytes());
100
101 Ok(bytes)
102 }
103
104 fn decode(bytes: &[u8]) -> Result<Self> {
105 if bytes.len() < 8 {
106 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
107 "Data block too small".to_string(),
108 )));
109 }
110
111 let data_len = bytes.len() - 4;
113 let checksum_bytes = &bytes[data_len..];
114 let expected_checksum = u32::from_le_bytes([
115 checksum_bytes[0],
116 checksum_bytes[1],
117 checksum_bytes[2],
118 checksum_bytes[3],
119 ]);
120 verify_checksum(&bytes[..data_len], expected_checksum)?;
121
122 let mut cursor = 0;
123 let num_entries = u32::from_le_bytes([
124 bytes[cursor],
125 bytes[cursor + 1],
126 bytes[cursor + 2],
127 bytes[cursor + 3],
128 ]) as usize;
129 cursor += 4;
130
131 let mut block = DataBlock::new();
132
133 for _ in 0..num_entries {
134 if cursor + 4 > data_len {
136 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
137 "Incomplete key length".to_string(),
138 )));
139 }
140 let key_len = u32::from_le_bytes([
141 bytes[cursor],
142 bytes[cursor + 1],
143 bytes[cursor + 2],
144 bytes[cursor + 3],
145 ]) as usize;
146 cursor += 4;
147
148 if cursor + key_len > data_len {
149 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
150 "Incomplete key data".to_string(),
151 )));
152 }
153 let key = Key::from_slice(&bytes[cursor..cursor + key_len]);
154 cursor += key_len;
155
156 if cursor + 4 > data_len {
158 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
159 "Incomplete value length".to_string(),
160 )));
161 }
162 let value_len = u32::from_le_bytes([
163 bytes[cursor],
164 bytes[cursor + 1],
165 bytes[cursor + 2],
166 bytes[cursor + 3],
167 ]) as usize;
168 cursor += 4;
169
170 if cursor + value_len > data_len {
171 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
172 "Incomplete value data".to_string(),
173 )));
174 }
175 let value = CipherBlob::new(bytes[cursor..cursor + value_len].to_vec());
176 cursor += value_len;
177
178 block.add_entry(key, value);
179 }
180
181 Ok(block)
182 }
183}
184
/// Fixed-size trailer of an SSTable file; it encodes to 36 bytes and the
/// reader loads it from the last 36 bytes of the file.
#[derive(Debug, Clone)]
struct Footer {
    /// Format marker; decoding requires it to equal `SSTABLE_MAGIC`.
    magic: u32,
    /// Format version; decoding requires it to equal `SSTABLE_VERSION`.
    version: u32,
    /// Byte offset at which the index section starts.
    index_offset: u64,
    /// Byte offset at which the bloom filter section starts.
    bloom_filter_offset: u64,
    /// Block size the writer was configured with.
    block_size: u32,
    /// Number of data blocks (the writer stores its index length here).
    num_blocks: u32,
    /// Checksum over the encoded form of all preceding fields.
    checksum: u32,
}
196
197impl Footer {
198 fn new(index_offset: u64, bloom_filter_offset: u64, block_size: u32, num_blocks: u32) -> Self {
199 let mut footer = Self {
200 magic: SSTABLE_MAGIC,
201 version: SSTABLE_VERSION,
202 index_offset,
203 bloom_filter_offset,
204 block_size,
205 num_blocks,
206 checksum: 0,
207 };
208
209 let mut bytes = Vec::new();
211 bytes.extend_from_slice(&footer.magic.to_le_bytes());
212 bytes.extend_from_slice(&footer.version.to_le_bytes());
213 bytes.extend_from_slice(&footer.index_offset.to_le_bytes());
214 bytes.extend_from_slice(&footer.bloom_filter_offset.to_le_bytes());
215 bytes.extend_from_slice(&footer.block_size.to_le_bytes());
216 bytes.extend_from_slice(&footer.num_blocks.to_le_bytes());
217 footer.checksum = calculate_checksum(&bytes);
218
219 footer
220 }
221
222 fn encode(&self) -> Vec<u8> {
223 let mut bytes = Vec::with_capacity(36);
224 bytes.extend_from_slice(&self.magic.to_le_bytes());
225 bytes.extend_from_slice(&self.version.to_le_bytes());
226 bytes.extend_from_slice(&self.index_offset.to_le_bytes());
227 bytes.extend_from_slice(&self.bloom_filter_offset.to_le_bytes());
228 bytes.extend_from_slice(&self.block_size.to_le_bytes());
229 bytes.extend_from_slice(&self.num_blocks.to_le_bytes());
230 bytes.extend_from_slice(&self.checksum.to_le_bytes());
231 bytes
232 }
233
234 fn decode(bytes: &[u8]) -> Result<Self> {
235 if bytes.len() < 36 {
236 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
237 "Footer too small".to_string(),
238 )));
239 }
240
241 let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
242 let version = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
243 let index_offset = u64::from_le_bytes([
244 bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
245 ]);
246 let bloom_filter_offset = u64::from_le_bytes([
247 bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], bytes[23],
248 ]);
249 let block_size = u32::from_le_bytes([bytes[24], bytes[25], bytes[26], bytes[27]]);
250 let num_blocks = u32::from_le_bytes([bytes[28], bytes[29], bytes[30], bytes[31]]);
251 let checksum = u32::from_le_bytes([bytes[32], bytes[33], bytes[34], bytes[35]]);
252
253 if magic != SSTABLE_MAGIC {
254 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
255 "Invalid SSTable magic: expected {}, got {}",
256 SSTABLE_MAGIC, magic
257 ))));
258 }
259
260 if version != SSTABLE_VERSION {
261 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
262 "Unsupported SSTable version: {}",
263 version
264 ))));
265 }
266
267 let mut verify_bytes = Vec::new();
269 verify_bytes.extend_from_slice(&magic.to_le_bytes());
270 verify_bytes.extend_from_slice(&version.to_le_bytes());
271 verify_bytes.extend_from_slice(&index_offset.to_le_bytes());
272 verify_bytes.extend_from_slice(&bloom_filter_offset.to_le_bytes());
273 verify_bytes.extend_from_slice(&block_size.to_le_bytes());
274 verify_bytes.extend_from_slice(&num_blocks.to_le_bytes());
275 verify_checksum(&verify_bytes, checksum)?;
276
277 Ok(Self {
278 magic,
279 version,
280 index_offset,
281 bloom_filter_offset,
282 block_size,
283 num_blocks,
284 checksum,
285 })
286 }
287}
288
/// Streams key/value entries into a new SSTable file.
///
/// Entries are grouped into data blocks as they are added; `finish` then
/// appends the index, the bloom filter and the footer.
pub struct SSTableWriter {
    /// Destination path, as passed to `new`.
    path: PathBuf,
    /// Settings supplied by the caller.
    config: SSTableConfig,
    /// Buffered output file; `finish` sets it to `None` after syncing.
    writer: Option<BufWriter<File>>,
    /// Block currently being filled.
    current_block: DataBlock,
    /// One entry per block started so far.
    index: Vec<IndexEntry>,
    /// Number of bytes written to the file so far.
    current_offset: u64,
    /// Filter that receives every key passed to `add`.
    bloom_filter: BloomFilter,
}
299
300impl SSTableWriter {
301 pub fn new<P: AsRef<Path>>(path: P, config: SSTableConfig) -> Result<Self> {
303 let file = OpenOptions::new()
304 .write(true)
305 .create(true)
306 .truncate(true)
307 .open(path.as_ref())
308 .map_err(|e| {
309 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
310 "Failed to create SSTable file: {}",
311 e
312 )))
313 })?;
314
315 let bloom_filter = BloomFilter::new(BloomFilterConfig {
317 expected_elements: 10000, false_positive_rate: 0.01, });
320
321 Ok(Self {
322 path: path.as_ref().to_path_buf(),
323 config,
324 writer: Some(BufWriter::new(file)),
325 current_block: DataBlock::new(),
326 index: Vec::new(),
327 current_offset: 0,
328 bloom_filter,
329 })
330 }
331
332 pub fn add(&mut self, key: Key, value: CipherBlob) -> Result<()> {
334 let entry_size = 8 + key.as_bytes().len() + value.as_bytes().len();
336 if self.current_block.size + entry_size > self.config.block_size
337 && !self.current_block.entries.is_empty()
338 {
339 self.flush_block()?;
340 }
341
342 if self.current_block.entries.is_empty() {
344 self.index.push(IndexEntry {
345 key: key.clone(),
346 offset: self.current_offset,
347 });
348 }
349
350 self.bloom_filter.insert(&key);
352
353 self.current_block.add_entry(key, value);
354 Ok(())
355 }
356
357 fn flush_block(&mut self) -> Result<()> {
359 if self.current_block.entries.is_empty() {
360 return Ok(());
361 }
362
363 let writer = self.writer.as_mut().ok_or_else(|| {
364 AmateRSError::StorageIntegrity(ErrorContext::new(
365 "SSTable writer already finalized".to_string(),
366 ))
367 })?;
368
369 let block_bytes = self.current_block.encode()?;
370 writer.write_all(&block_bytes).map_err(|e| {
371 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
372 "Failed to write block: {}",
373 e
374 )))
375 })?;
376
377 self.current_offset += block_bytes.len() as u64;
378 self.current_block = DataBlock::new();
379
380 Ok(())
381 }
382
383 pub fn finish(mut self) -> Result<()> {
385 self.flush_block()?;
387
388 let writer = self.writer.as_mut().ok_or_else(|| {
389 AmateRSError::StorageIntegrity(ErrorContext::new(
390 "SSTable writer already finalized".to_string(),
391 ))
392 })?;
393
394 let index_offset = self.current_offset;
396 let mut index_bytes = Vec::new();
397
398 index_bytes.extend_from_slice(&(self.index.len() as u32).to_le_bytes());
400
401 for entry in &self.index {
402 let key_bytes = entry.key.as_bytes();
403 index_bytes.extend_from_slice(&(key_bytes.len() as u32).to_le_bytes());
404 index_bytes.extend_from_slice(key_bytes);
405 index_bytes.extend_from_slice(&entry.offset.to_le_bytes());
406 }
407
408 let index_checksum = calculate_checksum(&index_bytes);
409 index_bytes.extend_from_slice(&index_checksum.to_le_bytes());
410
411 writer.write_all(&index_bytes).map_err(|e| {
412 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
413 "Failed to write index: {}",
414 e
415 )))
416 })?;
417 self.current_offset += index_bytes.len() as u64;
418
419 let bloom_filter_offset = self.current_offset;
421
422 let bloom_metadata = self.bloom_filter.metadata();
424 let metadata_bytes = bloom_metadata.to_bytes();
425 writer.write_all(&metadata_bytes).map_err(|e| {
426 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
427 "Failed to write bloom filter metadata: {}",
428 e
429 )))
430 })?;
431 self.current_offset += metadata_bytes.len() as u64;
432
433 let bloom_data = self.bloom_filter.as_bytes();
435 writer.write_all(bloom_data).map_err(|e| {
436 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
437 "Failed to write bloom filter data: {}",
438 e
439 )))
440 })?;
441 self.current_offset += bloom_data.len() as u64;
442
443 let footer = Footer::new(
445 index_offset,
446 bloom_filter_offset,
447 self.config.block_size as u32,
448 self.index.len() as u32,
449 );
450 let footer_bytes = footer.encode();
451 writer.write_all(&footer_bytes).map_err(|e| {
452 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
453 "Failed to write footer: {}",
454 e
455 )))
456 })?;
457
458 writer.flush().map_err(|e| {
460 AmateRSError::StorageIntegrity(ErrorContext::new(format!("Failed to flush: {}", e)))
461 })?;
462
463 writer.get_ref().sync_all().map_err(|e| {
464 AmateRSError::StorageIntegrity(ErrorContext::new(format!("Failed to sync: {}", e)))
465 })?;
466
467 self.writer = None;
468
469 Ok(())
470 }
471}
472
/// Read-only handle on an SSTable file; `open` loads the footer, the block
/// index and the bloom filter, while data blocks are read on demand.
pub struct SSTableReader {
    /// Path the table was opened from.
    path: PathBuf,
    /// Open file handle used for block reads.
    file: Arc<File>,
    /// Footer decoded from the end of the file.
    footer: Footer,
    /// Index entries in the order they appear in the file.
    index: Vec<IndexEntry>,
    /// Bloom filter loaded from the file; consulted before any block read.
    bloom_filter: BloomFilter,
}
481
482impl SSTableReader {
483 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
485 let file = File::open(path.as_ref()).map_err(|e| {
486 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
487 "Failed to open SSTable: {}",
488 e
489 )))
490 })?;
491
492 let file_size = file
494 .metadata()
495 .map_err(|e| {
496 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
497 "Failed to get file metadata: {}",
498 e
499 )))
500 })?
501 .len();
502
503 if file_size < 36 {
504 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
505 "SSTable file too small".to_string(),
506 )));
507 }
508
509 let mut reader = BufReader::new(&file);
510 reader.seek(SeekFrom::End(-36)).map_err(|e| {
511 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
512 "Failed to seek to footer: {}",
513 e
514 )))
515 })?;
516
517 let mut footer_bytes = [0u8; 36];
518 reader.read_exact(&mut footer_bytes).map_err(|e| {
519 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
520 "Failed to read footer: {}",
521 e
522 )))
523 })?;
524
525 let footer = Footer::decode(&footer_bytes)?;
526
527 reader
529 .seek(SeekFrom::Start(footer.index_offset))
530 .map_err(|e| {
531 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
532 "Failed to seek to index: {}",
533 e
534 )))
535 })?;
536
537 let index_size = footer.bloom_filter_offset - footer.index_offset;
539 let mut index_bytes = vec![0u8; index_size as usize];
540 reader.read_exact(&mut index_bytes).map_err(|e| {
541 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
542 "Failed to read index: {}",
543 e
544 )))
545 })?;
546
547 let data_len = index_bytes.len() - 4;
549 let checksum_bytes = &index_bytes[data_len..];
550 let expected_checksum = u32::from_le_bytes([
551 checksum_bytes[0],
552 checksum_bytes[1],
553 checksum_bytes[2],
554 checksum_bytes[3],
555 ]);
556 verify_checksum(&index_bytes[..data_len], expected_checksum)?;
557
558 let mut cursor = 0;
560 let num_entries = u32::from_le_bytes([
561 index_bytes[cursor],
562 index_bytes[cursor + 1],
563 index_bytes[cursor + 2],
564 index_bytes[cursor + 3],
565 ]) as usize;
566 cursor += 4;
567
568 let mut index = Vec::with_capacity(num_entries);
569
570 for _ in 0..num_entries {
571 let key_len = u32::from_le_bytes([
572 index_bytes[cursor],
573 index_bytes[cursor + 1],
574 index_bytes[cursor + 2],
575 index_bytes[cursor + 3],
576 ]) as usize;
577 cursor += 4;
578
579 let key = Key::from_slice(&index_bytes[cursor..cursor + key_len]);
580 cursor += key_len;
581
582 let offset = u64::from_le_bytes([
583 index_bytes[cursor],
584 index_bytes[cursor + 1],
585 index_bytes[cursor + 2],
586 index_bytes[cursor + 3],
587 index_bytes[cursor + 4],
588 index_bytes[cursor + 5],
589 index_bytes[cursor + 6],
590 index_bytes[cursor + 7],
591 ]);
592 cursor += 8;
593
594 index.push(IndexEntry { key, offset });
595 }
596
597 reader
599 .seek(SeekFrom::Start(footer.bloom_filter_offset))
600 .map_err(|e| {
601 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
602 "Failed to seek to bloom filter: {}",
603 e
604 )))
605 })?;
606
607 let mut metadata_bytes = [0u8; 24];
609 reader.read_exact(&mut metadata_bytes).map_err(|e| {
610 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
611 "Failed to read bloom filter metadata: {}",
612 e
613 )))
614 })?;
615
616 let bloom_metadata = BloomFilterMetadata::from_bytes(&metadata_bytes)?;
617
618 let bloom_size = (bloom_metadata.num_bits + 7) / 8;
620 let mut bloom_data = vec![0u8; bloom_size];
621 reader.read_exact(&mut bloom_data).map_err(|e| {
622 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
623 "Failed to read bloom filter data: {}",
624 e
625 )))
626 })?;
627
628 let bloom_filter = BloomFilter::from_bytes(
629 bloom_data,
630 bloom_metadata.num_bits,
631 bloom_metadata.num_hash_functions,
632 bloom_metadata.num_elements,
633 )?;
634
635 Ok(Self {
636 path: path.as_ref().to_path_buf(),
637 file: Arc::new(file),
638 footer,
639 index,
640 bloom_filter,
641 })
642 }
643
644 pub fn may_contain(&self, key: &Key) -> bool {
650 self.bloom_filter.may_contain(key)
651 }
652
653 pub fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
655 if !self.may_contain(key) {
657 return Ok(None);
658 }
659
660 let Some(block_index) = self.find_block_index(key) else {
662 return Ok(None);
663 };
664 let block = self.read_block(block_index)?;
665
666 for (k, v) in &block.entries {
668 if k == key {
669 return Ok(Some(v.clone()));
670 }
671 }
672
673 Ok(None)
674 }
675
676 fn find_block_index(&self, key: &Key) -> Option<usize> {
678 match self.index.binary_search_by(|entry| entry.key.cmp(key)) {
680 Ok(idx) => Some(idx),
681 Err(idx) => {
682 if idx == 0 {
683 None
684 } else {
685 Some(idx - 1)
686 }
687 }
688 }
689 }
690
691 fn read_block(&self, block_index: usize) -> Result<DataBlock> {
693 if block_index >= self.index.len() {
694 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
695 "Block index out of bounds".to_string(),
696 )));
697 }
698
699 let offset = self.index[block_index].offset;
700 let next_offset = if block_index + 1 < self.index.len() {
701 self.index[block_index + 1].offset
702 } else {
703 self.footer.index_offset
704 };
705
706 let block_size = (next_offset - offset) as usize;
707 let mut block_bytes = vec![0u8; block_size];
708
709 let mut reader = BufReader::new(self.file.as_ref());
710 reader.seek(SeekFrom::Start(offset)).map_err(|e| {
711 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
712 "Failed to seek to block: {}",
713 e
714 )))
715 })?;
716
717 reader.read_exact(&mut block_bytes).map_err(|e| {
718 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
719 "Failed to read block: {}",
720 e
721 )))
722 })?;
723
724 DataBlock::decode(&block_bytes)
725 }
726
727 pub fn iter(&self) -> Result<Vec<(Key, CipherBlob)>> {
729 let mut entries = Vec::new();
730
731 for i in 0..self.index.len() {
732 let block = self.read_block(i)?;
733 entries.extend(block.entries);
734 }
735
736 Ok(entries)
737 }
738
739 pub fn metadata(&self) -> Result<(Key, Key, usize)> {
741 if self.index.is_empty() {
742 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
743 "SSTable has no entries".to_string(),
744 )));
745 }
746
747 let entries = self.iter()?;
749
750 if entries.is_empty() {
751 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
752 "SSTable has no data entries".to_string(),
753 )));
754 }
755
756 let min_key = entries
757 .first()
758 .ok_or_else(|| {
759 AmateRSError::StorageIntegrity(ErrorContext::new(
760 "Failed to get first entry".to_string(),
761 ))
762 })?
763 .0
764 .clone();
765
766 let max_key = entries
767 .last()
768 .ok_or_else(|| {
769 AmateRSError::StorageIntegrity(ErrorContext::new(
770 "Failed to get last entry".to_string(),
771 ))
772 })?
773 .0
774 .clone();
775
776 Ok((min_key, max_key, entries.len()))
777 }
778}
779
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Location of a scratch file inside the system temp directory.
    fn scratch_path(file_name: &str) -> PathBuf {
        env::temp_dir().join(file_name)
    }

    /// Key used for entry number `i`: `key_000`, `key_001`, ...
    fn numbered_key(i: usize) -> Key {
        Key::from_str(&format!("key_{:03}", i))
    }

    /// Writes `count` numbered entries to `path`; each value is `value_len`
    /// copies of the entry number as a byte.
    fn write_numbered_entries(
        path: &Path,
        config: SSTableConfig,
        count: usize,
        value_len: usize,
    ) -> Result<()> {
        let mut writer = SSTableWriter::new(path, config)?;

        for i in 0..count {
            let value = CipherBlob::new(vec![i as u8; value_len]);
            writer.add(numbered_key(i), value)?;
        }

        writer.finish()
    }

    /// Entries written to a table can be read back; unknown keys are absent.
    #[test]
    fn test_sstable_basic_write_read() -> Result<()> {
        let path = scratch_path("test_sstable_basic.sst");

        write_numbered_entries(&path, SSTableConfig::default(), 10, 100)?;

        {
            let reader = SSTableReader::open(&path)?;

            for i in 0..10 {
                let found = reader.get(&numbered_key(i))?;
                assert!(found.is_some());
                let found = found.expect("Value should exist in SSTable");
                assert_eq!(found.as_bytes()[0], i as u8);
            }

            let missing = reader.get(&Key::from_str("nonexistent"))?;
            assert!(missing.is_none());
        }

        std::fs::remove_file(&path).ok();

        Ok(())
    }

    /// A small block size forces many blocks; every key must still be found.
    #[test]
    fn test_sstable_multiple_blocks() -> Result<()> {
        let path = scratch_path("test_sstable_blocks.sst");

        let config = SSTableConfig {
            block_size: 256,
            enable_compression: false,
        };
        write_numbered_entries(&path, config, 100, 50)?;

        {
            let reader = SSTableReader::open(&path)?;

            for i in 0..100 {
                let found = reader.get(&numbered_key(i))?;
                assert!(found.is_some());
            }
        }

        std::fs::remove_file(&path).ok();

        Ok(())
    }

    /// Iteration yields every entry in strictly ascending key order.
    #[test]
    fn test_sstable_iteration() -> Result<()> {
        let path = scratch_path("test_sstable_iter.sst");

        write_numbered_entries(&path, SSTableConfig::default(), 50, 100)?;

        {
            let reader = SSTableReader::open(&path)?;
            let entries = reader.iter()?;

            assert_eq!(entries.len(), 50);

            for pair in entries.windows(2) {
                assert!(pair[0].0 < pair[1].0);
            }
        }

        std::fs::remove_file(&path).ok();

        Ok(())
    }

    /// A table with no entries opens, iterates to nothing and finds no key.
    #[test]
    fn test_sstable_empty() -> Result<()> {
        let path = scratch_path("test_sstable_empty.sst");

        write_numbered_entries(&path, SSTableConfig::default(), 0, 0)?;

        {
            let reader = SSTableReader::open(&path)?;
            let entries = reader.iter()?;
            assert_eq!(entries.len(), 0);

            let found = reader.get(&Key::from_str("any_key"))?;
            assert!(found.is_none());
        }

        std::fs::remove_file(&path).ok();

        Ok(())
    }

    /// Values larger than the block size round-trip at full length.
    #[test]
    fn test_sstable_large_values() -> Result<()> {
        let path = scratch_path("test_sstable_large.sst");

        write_numbered_entries(&path, SSTableConfig::default(), 10, 10000)?;

        {
            let reader = SSTableReader::open(&path)?;

            for i in 0..10 {
                let found = reader.get(&numbered_key(i))?;
                assert!(found.is_some());
                let found = found.expect("Value should exist in SSTable");
                assert_eq!(found.as_bytes().len(), 10000);
            }
        }

        std::fs::remove_file(&path).ok();

        Ok(())
    }

    /// Overwriting the footer checksum must make `open` fail.
    #[test]
    fn test_sstable_corruption_detection() -> Result<()> {
        let path = scratch_path("test_sstable_corrupt.sst");

        write_numbered_entries(&path, SSTableConfig::default(), 10, 100)?;

        {
            // The last four bytes of the file are the footer checksum.
            let mut file = OpenOptions::new().write(true).open(&path)?;
            file.seek(SeekFrom::End(-4))?;
            file.write_all(&[0xFF, 0xFF, 0xFF, 0xFF])?;
        }

        let result = SSTableReader::open(&path);
        assert!(result.is_err());

        std::fs::remove_file(&path).ok();

        Ok(())
    }
}