1use crate::memtable::MemtableEntry;
19use rustlite_core::{Error, Result};
20use serde::{Deserialize, Serialize};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24
25const SSTABLE_MAGIC_HEADER: [u8; 4] = *b"RSSL";
27
28const SSTABLE_MAGIC: u64 = 0x53_53_54_42_4C_49_54;
30
31const SSTABLE_FORMAT_VERSION: u16 = 1;
34
35const DEFAULT_BLOCK_SIZE: usize = 4096;
37
38const ENTRY_TYPE_VALUE: u8 = 0;
40const ENTRY_TYPE_TOMBSTONE: u8 = 1;
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct SSTableEntry {
45 pub key: Vec<u8>,
47 pub entry_type: u8,
49 pub value: Vec<u8>,
51}
52
53impl SSTableEntry {
54 pub fn value(key: Vec<u8>, value: Vec<u8>) -> Self {
56 Self {
57 key,
58 entry_type: ENTRY_TYPE_VALUE,
59 value,
60 }
61 }
62
63 pub fn tombstone(key: Vec<u8>) -> Self {
65 Self {
66 key,
67 entry_type: ENTRY_TYPE_TOMBSTONE,
68 value: Vec::new(),
69 }
70 }
71
72 pub fn is_tombstone(&self) -> bool {
74 self.entry_type == ENTRY_TYPE_TOMBSTONE
75 }
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct IndexEntry {
81 pub first_key: Vec<u8>,
83 pub offset: u64,
85 pub size: u32,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct SSTableFooter {
92 pub format_version: u16,
94 pub index_offset: u64,
96 pub index_size: u32,
98 pub entry_count: u64,
100 pub min_key: Vec<u8>,
102 pub max_key: Vec<u8>,
104 pub magic: u64,
106 pub crc: u32,
108}
109
110#[derive(Debug, Clone)]
112pub struct SSTableHeader {
113 pub magic: [u8; 4],
115 pub version: u16,
117}
118
119impl SSTableHeader {
120 pub const SIZE: usize = 6; pub fn new() -> Self {
125 Self {
126 magic: SSTABLE_MAGIC_HEADER,
127 version: SSTABLE_FORMAT_VERSION,
128 }
129 }
130
131 pub fn write_to<W: Write>(&self, writer: &mut W) -> Result<()> {
133 writer.write_all(&self.magic)?;
134 writer.write_all(&self.version.to_le_bytes())?;
135 Ok(())
136 }
137
138 pub fn read_from<R: Read>(reader: &mut R) -> Result<Self> {
140 let mut magic = [0u8; 4];
141 reader.read_exact(&mut magic)?;
142
143 if magic != SSTABLE_MAGIC_HEADER {
144 return Err(Error::Corruption(format!(
145 "Invalid SSTable magic: expected {:?}, got {:?}",
146 SSTABLE_MAGIC_HEADER, magic
147 )));
148 }
149
150 let mut version_bytes = [0u8; 2];
151 reader.read_exact(&mut version_bytes)?;
152 let version = u16::from_le_bytes(version_bytes);
153
154 if version > SSTABLE_FORMAT_VERSION {
155 return Err(Error::Corruption(format!(
156 "Unsupported SSTable version: {} (current: {})",
157 version, SSTABLE_FORMAT_VERSION
158 )));
159 }
160
161 Ok(Self { magic, version })
162 }
163}
164
165#[derive(Debug, Clone)]
167pub struct SSTableMeta {
168 pub path: PathBuf,
170 pub min_key: Vec<u8>,
172 pub max_key: Vec<u8>,
174 pub entry_count: u64,
176 pub file_size: u64,
178 pub level: u32,
180 pub sequence: u64,
182}
183
184pub struct SSTableWriter {
186 path: PathBuf,
188 writer: BufWriter<File>,
190 position: u64,
192 index: Vec<IndexEntry>,
194 block_buffer: Vec<u8>,
196 block_size: usize,
198 current_block_first_key: Option<Vec<u8>>,
200 entry_count: u64,
202 min_key: Option<Vec<u8>>,
204 max_key: Option<Vec<u8>>,
206}
207
208impl SSTableWriter {
209 pub fn new(path: impl AsRef<Path>) -> Result<Self> {
211 Self::with_block_size(path, DEFAULT_BLOCK_SIZE)
212 }
213
214 pub fn with_block_size(path: impl AsRef<Path>, block_size: usize) -> Result<Self> {
216 let path = path.as_ref().to_path_buf();
217 let file = File::create(&path)?;
218 let mut writer = BufWriter::new(file);
219
220 let header = SSTableHeader::new();
222 header.write_to(&mut writer)?;
223 let header_size = SSTableHeader::SIZE as u64;
224
225 Ok(Self {
226 path,
227 writer,
228 position: header_size,
229 index: Vec::new(),
230 block_buffer: Vec::with_capacity(block_size),
231 block_size,
232 current_block_first_key: None,
233 entry_count: 0,
234 min_key: None,
235 max_key: None,
236 })
237 }
238
239 pub fn add(&mut self, entry: SSTableEntry) -> Result<()> {
241 if self.min_key.is_none() {
243 self.min_key = Some(entry.key.clone());
244 }
245 self.max_key = Some(entry.key.clone());
246
247 if self.current_block_first_key.is_none() {
249 self.current_block_first_key = Some(entry.key.clone());
250 }
251
252 let encoded =
254 bincode::serialize(&entry).map_err(|e| Error::Serialization(e.to_string()))?;
255
256 let len = encoded.len() as u32;
258 self.block_buffer.extend_from_slice(&len.to_le_bytes());
259 self.block_buffer.extend_from_slice(&encoded);
260
261 self.entry_count += 1;
262
263 if self.block_buffer.len() >= self.block_size {
265 self.flush_block()?;
266 }
267
268 Ok(())
269 }
270
271 fn flush_block(&mut self) -> Result<()> {
273 if self.block_buffer.is_empty() {
274 return Ok(());
275 }
276
277 let crc = crc32fast::hash(&self.block_buffer);
279
280 if let Some(first_key) = self.current_block_first_key.take() {
282 self.index.push(IndexEntry {
283 first_key,
284 offset: self.position,
285 size: self.block_buffer.len() as u32 + 4, });
287 }
288
289 self.writer.write_all(&self.block_buffer)?;
291 self.position += self.block_buffer.len() as u64;
292
293 self.writer.write_all(&crc.to_le_bytes())?;
295 self.position += 4;
296
297 self.block_buffer.clear();
298
299 Ok(())
300 }
301
302 pub fn finish(mut self) -> Result<SSTableMeta> {
304 self.flush_block()?;
306
307 let index_offset = self.position;
309 let index_encoded =
310 bincode::serialize(&self.index).map_err(|e| Error::Serialization(e.to_string()))?;
311 let index_size = index_encoded.len() as u32;
312
313 self.writer.write_all(&index_encoded)?;
314 self.position += index_size as u64;
315
316 let min_key = self.min_key.clone().unwrap_or_default();
318 let max_key = self.max_key.clone().unwrap_or_default();
319
320 let footer_data = SSTableFooter {
321 format_version: SSTABLE_FORMAT_VERSION,
322 index_offset,
323 index_size,
324 entry_count: self.entry_count,
325 min_key: min_key.clone(),
326 max_key: max_key.clone(),
327 magic: SSTABLE_MAGIC,
328 crc: 0, };
330
331 let footer_encoded =
332 bincode::serialize(&footer_data).map_err(|e| Error::Serialization(e.to_string()))?;
333 let footer_crc = crc32fast::hash(&footer_encoded);
334
335 let final_footer = SSTableFooter {
337 crc: footer_crc,
338 ..footer_data
339 };
340 let final_footer_encoded =
341 bincode::serialize(&final_footer).map_err(|e| Error::Serialization(e.to_string()))?;
342
343 let footer_len = final_footer_encoded.len() as u32;
345 self.writer.write_all(&final_footer_encoded)?;
346 self.writer.write_all(&footer_len.to_le_bytes())?;
347
348 self.writer.flush()?;
349
350 let file_size = self.position + final_footer_encoded.len() as u64 + 4;
351
352 Ok(SSTableMeta {
353 path: self.path,
354 min_key,
355 max_key,
356 entry_count: self.entry_count,
357 file_size,
358 level: 0,
359 sequence: 0,
360 })
361 }
362
363 pub fn from_memtable<I>(path: impl AsRef<Path>, iter: I) -> Result<SSTableMeta>
365 where
366 I: Iterator<Item = (Vec<u8>, MemtableEntry)>,
367 {
368 let mut writer = SSTableWriter::new(path)?;
369
370 for (key, entry) in iter {
371 let sstable_entry = match entry {
372 MemtableEntry::Value(v) => SSTableEntry::value(key, v),
373 MemtableEntry::Tombstone => SSTableEntry::tombstone(key),
374 };
375 writer.add(sstable_entry)?;
376 }
377
378 writer.finish()
379 }
380}
381
382pub struct SSTableReader {
384 path: PathBuf,
386 file: BufReader<File>,
388 index: Vec<IndexEntry>,
390 footer: SSTableFooter,
392 file_size: u64,
394 header_offset: u64,
396}
397
398impl SSTableReader {
399 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
401 let path = path.as_ref().to_path_buf();
402 let mut file = File::open(&path)?;
403
404 let file_size = file.metadata()?.len();
406
407 if file_size < 4 {
408 return Err(Error::Corruption("SSTable too small".into()));
409 }
410
411 let header_offset = if file_size >= SSTableHeader::SIZE as u64 {
414 file.seek(SeekFrom::Start(0))?;
415 match SSTableHeader::read_from(&mut file) {
416 Ok(header) => {
417 tracing::debug!("Opened SSTable with format version {}", header.version);
419 SSTableHeader::SIZE as u64
420 }
421 Err(_) => {
422 tracing::debug!("Opened legacy SSTable (pre-v1.0)");
424 0
425 }
426 }
427 } else {
428 0
430 };
431
432 file.seek(SeekFrom::End(-4))?;
434 let mut footer_len_buf = [0u8; 4];
435 file.read_exact(&mut footer_len_buf)?;
436 let footer_len = u32::from_le_bytes(footer_len_buf) as i64;
437
438 file.seek(SeekFrom::End(-4 - footer_len))?;
440 let mut footer_buf = vec![0u8; footer_len as usize];
441 file.read_exact(&mut footer_buf)?;
442
443 let footer: SSTableFooter =
444 bincode::deserialize(&footer_buf).map_err(|e| Error::Serialization(e.to_string()))?;
445
446 if footer.magic != SSTABLE_MAGIC {
448 return Err(Error::Corruption("Invalid SSTable magic number".into()));
449 }
450
451 if footer.format_version > SSTABLE_FORMAT_VERSION {
453 return Err(Error::Corruption(format!(
454 "Unsupported SSTable format version: {} (current: {})",
455 footer.format_version, SSTABLE_FORMAT_VERSION
456 )));
457 }
458
459 let index_offset = if header_offset > 0 {
461 footer.index_offset
463 } else {
464 footer.index_offset
466 };
467 file.seek(SeekFrom::Start(index_offset))?;
468 let mut index_buf = vec![0u8; footer.index_size as usize];
469 file.read_exact(&mut index_buf)?;
470
471 let index: Vec<IndexEntry> =
472 bincode::deserialize(&index_buf).map_err(|e| Error::Serialization(e.to_string()))?;
473
474 Ok(Self {
475 path,
476 file: BufReader::new(file.try_clone()?),
477 index,
478 footer,
479 file_size,
480 header_offset,
481 })
482 }
483
484 pub fn get(&mut self, key: &[u8]) -> Result<Option<SSTableEntry>> {
486 let block_idx = self
488 .index
489 .partition_point(|entry| entry.first_key.as_slice() <= key);
490
491 if block_idx == 0 {
493 if key < self.footer.min_key.as_slice() {
495 return Ok(None);
496 }
497 }
498
499 let block_idx = if block_idx > 0 { block_idx - 1 } else { 0 };
501
502 if block_idx >= self.index.len() {
503 return Ok(None);
504 }
505
506 let block = self.read_block(block_idx)?;
508
509 for entry in block {
510 if entry.key.as_slice() == key {
511 return Ok(Some(entry));
512 }
513 if entry.key.as_slice() > key {
514 break;
515 }
516 }
517
518 Ok(None)
519 }
520
521 fn read_block(&mut self, block_idx: usize) -> Result<Vec<SSTableEntry>> {
523 let index_entry = &self.index[block_idx];
524
525 let absolute_offset = if self.header_offset > 0 {
528 index_entry.offset } else {
530 index_entry.offset };
532 self.file.seek(SeekFrom::Start(absolute_offset))?;
533
534 let data_size = index_entry.size as usize - 4; let mut data_buf = vec![0u8; data_size];
536 self.file.read_exact(&mut data_buf)?;
537
538 let mut crc_buf = [0u8; 4];
540 self.file.read_exact(&mut crc_buf)?;
541 let stored_crc = u32::from_le_bytes(crc_buf);
542 let computed_crc = crc32fast::hash(&data_buf);
543
544 if stored_crc != computed_crc {
545 return Err(Error::Corruption("Block CRC mismatch".into()));
546 }
547
548 let mut entries = Vec::new();
550 let mut offset = 0;
551
552 while offset < data_buf.len() {
553 if offset + 4 > data_buf.len() {
554 break;
555 }
556
557 let len = u32::from_le_bytes([
558 data_buf[offset],
559 data_buf[offset + 1],
560 data_buf[offset + 2],
561 data_buf[offset + 3],
562 ]) as usize;
563 offset += 4;
564
565 if offset + len > data_buf.len() {
566 break;
567 }
568
569 let entry: SSTableEntry = bincode::deserialize(&data_buf[offset..offset + len])
570 .map_err(|e| Error::Serialization(e.to_string()))?;
571 entries.push(entry);
572 offset += len;
573 }
574
575 Ok(entries)
576 }
577
578 pub fn metadata(&self) -> SSTableMeta {
580 SSTableMeta {
581 path: self.path.clone(),
582 min_key: self.footer.min_key.clone(),
583 max_key: self.footer.max_key.clone(),
584 entry_count: self.footer.entry_count,
585 file_size: self.file_size,
586 level: 0,
587 sequence: 0,
588 }
589 }
590
591 pub fn might_contain(&self, key: &[u8]) -> bool {
593 key >= self.footer.min_key.as_slice() && key <= self.footer.max_key.as_slice()
594 }
595
596 pub fn iter(&mut self) -> Result<SSTableIterator<'_>> {
598 Ok(SSTableIterator {
599 reader: self,
600 block_idx: 0,
601 block_entries: Vec::new(),
602 entry_idx: 0,
603 })
604 }
605}
606
607pub struct SSTableIterator<'a> {
609 reader: &'a mut SSTableReader,
610 block_idx: usize,
611 block_entries: Vec<SSTableEntry>,
612 entry_idx: usize,
613}
614
615impl SSTableIterator<'_> {
616 pub fn next_entry(&mut self) -> Result<Option<SSTableEntry>> {
618 loop {
619 if self.entry_idx < self.block_entries.len() {
621 let entry = self.block_entries[self.entry_idx].clone();
622 self.entry_idx += 1;
623 return Ok(Some(entry));
624 }
625
626 if self.block_idx >= self.reader.index.len() {
628 return Ok(None);
629 }
630
631 self.block_entries = self.reader.read_block(self.block_idx)?;
632 self.block_idx += 1;
633 self.entry_idx = 0;
634 }
635 }
636}
637
638pub fn delete_sstable(path: impl AsRef<Path>) -> Result<()> {
640 fs::remove_file(path)?;
641 Ok(())
642}
643
644#[cfg(test)]
645mod tests {
646 use super::*;
647 use tempfile::tempdir;
648
649 #[test]
650 fn test_sstable_write_read() {
651 let dir = tempdir().unwrap();
652 let path = dir.path().join("test.sst");
653
654 let mut writer = SSTableWriter::new(&path).unwrap();
656 writer
657 .add(SSTableEntry::value(b"a".to_vec(), b"1".to_vec()))
658 .unwrap();
659 writer
660 .add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec()))
661 .unwrap();
662 writer
663 .add(SSTableEntry::value(b"c".to_vec(), b"3".to_vec()))
664 .unwrap();
665 let meta = writer.finish().unwrap();
666
667 assert_eq!(meta.entry_count, 3);
668 assert_eq!(meta.min_key, b"a".to_vec());
669 assert_eq!(meta.max_key, b"c".to_vec());
670
671 let mut reader = SSTableReader::open(&path).unwrap();
673
674 let entry = reader.get(b"a").unwrap().unwrap();
675 assert_eq!(entry.value, b"1".to_vec());
676
677 let entry = reader.get(b"b").unwrap().unwrap();
678 assert_eq!(entry.value, b"2".to_vec());
679
680 let entry = reader.get(b"c").unwrap().unwrap();
681 assert_eq!(entry.value, b"3".to_vec());
682
683 assert!(reader.get(b"d").unwrap().is_none());
684 }
685
686 #[test]
687 fn test_sstable_tombstone() {
688 let dir = tempdir().unwrap();
689 let path = dir.path().join("test.sst");
690
691 let mut writer = SSTableWriter::new(&path).unwrap();
693 writer
694 .add(SSTableEntry::tombstone(b"deleted".to_vec()))
695 .unwrap();
696 writer
697 .add(SSTableEntry::value(b"key".to_vec(), b"value".to_vec()))
698 .unwrap();
699 writer.finish().unwrap();
700
701 let mut reader = SSTableReader::open(&path).unwrap();
702
703 let entry = reader.get(b"key").unwrap().unwrap();
704 assert!(!entry.is_tombstone());
705
706 let entry = reader.get(b"deleted").unwrap().unwrap();
707 assert!(entry.is_tombstone());
708 }
709
710 #[test]
711 fn test_sstable_iterator() {
712 let dir = tempdir().unwrap();
713 let path = dir.path().join("test.sst");
714
715 let mut writer = SSTableWriter::new(&path).unwrap();
716 for i in 0..100 {
717 let key = format!("key{:03}", i);
718 let value = format!("value{}", i);
719 writer
720 .add(SSTableEntry::value(key.into_bytes(), value.into_bytes()))
721 .unwrap();
722 }
723 writer.finish().unwrap();
724
725 let mut reader = SSTableReader::open(&path).unwrap();
726 let mut iter = reader.iter().unwrap();
727
728 let mut count = 0;
729 while let Some(_entry) = iter.next_entry().unwrap() {
730 count += 1;
731 }
732
733 assert_eq!(count, 100);
734 }
735
736 #[test]
737 fn test_sstable_from_memtable() {
738 use crate::memtable::Memtable;
739
740 let dir = tempdir().unwrap();
741 let path = dir.path().join("test.sst");
742
743 let mut mt = Memtable::new();
744 mt.put(b"a".to_vec(), b"1".to_vec());
745 mt.put(b"b".to_vec(), b"2".to_vec());
746 mt.delete(b"c".to_vec());
747
748 let meta = SSTableWriter::from_memtable(&path, mt.drain()).unwrap();
749
750 assert_eq!(meta.entry_count, 3);
751
752 let mut reader = SSTableReader::open(&path).unwrap();
753 assert_eq!(reader.get(b"a").unwrap().unwrap().value, b"1".to_vec());
754 assert!(reader.get(b"c").unwrap().unwrap().is_tombstone());
755 }
756
757 #[test]
758 fn test_sstable_might_contain() {
759 let dir = tempdir().unwrap();
760 let path = dir.path().join("test.sst");
761
762 let mut writer = SSTableWriter::new(&path).unwrap();
763 writer
764 .add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec()))
765 .unwrap();
766 writer
767 .add(SSTableEntry::value(b"d".to_vec(), b"4".to_vec()))
768 .unwrap();
769 writer.finish().unwrap();
770
771 let reader = SSTableReader::open(&path).unwrap();
772
773 assert!(!reader.might_contain(b"a")); assert!(reader.might_contain(b"b")); assert!(reader.might_contain(b"c")); assert!(reader.might_contain(b"d")); assert!(!reader.might_contain(b"e")); }
779}