1use crate::memtable::MemtableEntry;
19use rustlite_core::{Error, Result};
20use serde::{Deserialize, Serialize};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24
/// Magic number stored in every footer and compared when a table is opened.
/// The seven low bytes spell ASCII "SSTBLIT".
const SSTABLE_MAGIC: u64 = 0x53_53_54_42_4C_49_54;

/// Format version written into the footer; `SSTableReader::open` rejects any
/// file whose footer carries a different value.
const SSTABLE_FORMAT_VERSION: u16 = 1;

/// Block buffer size in bytes used by `SSTableWriter::new`; a block is
/// flushed once its buffer reaches this length.
const DEFAULT_BLOCK_SIZE: usize = 4096;

/// `SSTableEntry::entry_type` tag for an entry that carries a value.
const ENTRY_TYPE_VALUE: u8 = 0;
/// `SSTableEntry::entry_type` tag for a tombstone entry.
const ENTRY_TYPE_TOMBSTONE: u8 = 1;
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct SSTableEntry {
42 pub key: Vec<u8>,
44 pub entry_type: u8,
46 pub value: Vec<u8>,
48}
49
50impl SSTableEntry {
51 pub fn value(key: Vec<u8>, value: Vec<u8>) -> Self {
53 Self {
54 key,
55 entry_type: ENTRY_TYPE_VALUE,
56 value,
57 }
58 }
59
60 pub fn tombstone(key: Vec<u8>) -> Self {
62 Self {
63 key,
64 entry_type: ENTRY_TYPE_TOMBSTONE,
65 value: Vec::new(),
66 }
67 }
68
69 pub fn is_tombstone(&self) -> bool {
71 self.entry_type == ENTRY_TYPE_TOMBSTONE
72 }
73}
74
/// Index record locating one data block inside an SSTable file.
///
/// The writer pushes one of these per flushed block and serializes the whole
/// list after the last block.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexEntry {
    /// Key of the first entry that was added to the block.
    pub first_key: Vec<u8>,
    /// Byte offset of the start of the block within the file.
    pub offset: u64,
    /// Length of the block in bytes, including its trailing 4-byte CRC.
    pub size: u32,
}
85
/// Footer written after the index; it is followed by a 4-byte little-endian
/// length so a reader can locate it from the end of the file.
///
/// NOTE(review): the footer is serialized with bincode, so reordering or
/// retyping fields presumably changes the on-disk format — confirm before
/// editing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SSTableFooter {
    /// Format version; must equal `SSTABLE_FORMAT_VERSION` for a file to open.
    pub format_version: u16,
    /// Byte offset at which the serialized index starts.
    pub index_offset: u64,
    /// Length in bytes of the serialized index.
    pub index_size: u32,
    /// Number of entries added to the table.
    pub entry_count: u64,
    /// Key of the first entry added (empty when no entry was added).
    pub min_key: Vec<u8>,
    /// Key of the last entry added (empty when no entry was added).
    pub max_key: Vec<u8>,
    /// Must equal `SSTABLE_MAGIC` for a file to open.
    pub magic: u64,
    /// CRC32 of the serialized footer computed with this field set to zero.
    pub crc: u32,
}
106
/// In-memory description of an SSTable file, returned by
/// `SSTableWriter::finish` and `SSTableReader::metadata`.
#[derive(Debug, Clone)]
pub struct SSTableMeta {
    /// Location of the SSTable file.
    pub path: PathBuf,
    /// Key of the first entry in the table (empty for an empty table).
    pub min_key: Vec<u8>,
    /// Key of the last entry in the table (empty for an empty table).
    pub max_key: Vec<u8>,
    /// Number of entries stored in the table.
    pub entry_count: u64,
    /// Total size of the file in bytes.
    pub file_size: u64,
    /// Always 0 as produced in this file; presumably the LSM level, assigned
    /// by the caller — TODO confirm.
    pub level: u32,
    /// Always 0 as produced in this file; presumably an ordering sequence
    /// number assigned by the caller — TODO confirm.
    pub sequence: u64,
}
125
/// Streaming writer that packs entries into CRC-protected blocks and then
/// appends an index and a footer.
pub struct SSTableWriter {
    /// Destination file path, handed back in the resulting `SSTableMeta`.
    path: PathBuf,
    /// Buffered handle to the destination file.
    writer: BufWriter<File>,
    /// Number of bytes written to the file so far (blocks and index).
    position: u64,
    /// One index record per block flushed so far.
    index: Vec<IndexEntry>,
    /// Length-prefixed encoded entries of the block currently being built.
    block_buffer: Vec<u8>,
    /// Buffer length at which the current block is flushed.
    block_size: usize,
    /// Key of the first entry in the current block, if the block is non-empty.
    current_block_first_key: Option<Vec<u8>>,
    /// Total number of entries added.
    entry_count: u64,
    /// Key of the first entry added, if any.
    min_key: Option<Vec<u8>>,
    /// Key of the most recently added entry, if any.
    max_key: Option<Vec<u8>>,
}
149
150impl SSTableWriter {
151 pub fn new(path: impl AsRef<Path>) -> Result<Self> {
153 Self::with_block_size(path, DEFAULT_BLOCK_SIZE)
154 }
155
156 pub fn with_block_size(path: impl AsRef<Path>, block_size: usize) -> Result<Self> {
158 let path = path.as_ref().to_path_buf();
159 let file = File::create(&path)?;
160
161 Ok(Self {
162 path,
163 writer: BufWriter::new(file),
164 position: 0,
165 index: Vec::new(),
166 block_buffer: Vec::with_capacity(block_size),
167 block_size,
168 current_block_first_key: None,
169 entry_count: 0,
170 min_key: None,
171 max_key: None,
172 })
173 }
174
175 pub fn add(&mut self, entry: SSTableEntry) -> Result<()> {
177 if self.min_key.is_none() {
179 self.min_key = Some(entry.key.clone());
180 }
181 self.max_key = Some(entry.key.clone());
182
183 if self.current_block_first_key.is_none() {
185 self.current_block_first_key = Some(entry.key.clone());
186 }
187
188 let encoded =
190 bincode::serialize(&entry).map_err(|e| Error::Serialization(e.to_string()))?;
191
192 let len = encoded.len() as u32;
194 self.block_buffer.extend_from_slice(&len.to_le_bytes());
195 self.block_buffer.extend_from_slice(&encoded);
196
197 self.entry_count += 1;
198
199 if self.block_buffer.len() >= self.block_size {
201 self.flush_block()?;
202 }
203
204 Ok(())
205 }
206
207 fn flush_block(&mut self) -> Result<()> {
209 if self.block_buffer.is_empty() {
210 return Ok(());
211 }
212
213 let crc = crc32fast::hash(&self.block_buffer);
215
216 if let Some(first_key) = self.current_block_first_key.take() {
218 self.index.push(IndexEntry {
219 first_key,
220 offset: self.position,
221 size: self.block_buffer.len() as u32 + 4, });
223 }
224
225 self.writer.write_all(&self.block_buffer)?;
227 self.position += self.block_buffer.len() as u64;
228
229 self.writer.write_all(&crc.to_le_bytes())?;
231 self.position += 4;
232
233 self.block_buffer.clear();
234
235 Ok(())
236 }
237
238 pub fn finish(mut self) -> Result<SSTableMeta> {
240 self.flush_block()?;
242
243 let index_offset = self.position;
245 let index_encoded =
246 bincode::serialize(&self.index).map_err(|e| Error::Serialization(e.to_string()))?;
247 let index_size = index_encoded.len() as u32;
248
249 self.writer.write_all(&index_encoded)?;
250 self.position += index_size as u64;
251
252 let min_key = self.min_key.clone().unwrap_or_default();
254 let max_key = self.max_key.clone().unwrap_or_default();
255
256 let footer_data = SSTableFooter {
257 format_version: SSTABLE_FORMAT_VERSION,
258 index_offset,
259 index_size,
260 entry_count: self.entry_count,
261 min_key: min_key.clone(),
262 max_key: max_key.clone(),
263 magic: SSTABLE_MAGIC,
264 crc: 0, };
266
267 let footer_encoded =
268 bincode::serialize(&footer_data).map_err(|e| Error::Serialization(e.to_string()))?;
269 let footer_crc = crc32fast::hash(&footer_encoded);
270
271 let final_footer = SSTableFooter {
273 crc: footer_crc,
274 ..footer_data
275 };
276 let final_footer_encoded =
277 bincode::serialize(&final_footer).map_err(|e| Error::Serialization(e.to_string()))?;
278
279 let footer_len = final_footer_encoded.len() as u32;
281 self.writer.write_all(&final_footer_encoded)?;
282 self.writer.write_all(&footer_len.to_le_bytes())?;
283
284 self.writer.flush()?;
285
286 let file_size = self.position + final_footer_encoded.len() as u64 + 4;
287
288 Ok(SSTableMeta {
289 path: self.path,
290 min_key,
291 max_key,
292 entry_count: self.entry_count,
293 file_size,
294 level: 0,
295 sequence: 0,
296 })
297 }
298
299 pub fn from_memtable<I>(path: impl AsRef<Path>, iter: I) -> Result<SSTableMeta>
301 where
302 I: Iterator<Item = (Vec<u8>, MemtableEntry)>,
303 {
304 let mut writer = SSTableWriter::new(path)?;
305
306 for (key, entry) in iter {
307 let sstable_entry = match entry {
308 MemtableEntry::Value(v) => SSTableEntry::value(key, v),
309 MemtableEntry::Tombstone => SSTableEntry::tombstone(key),
310 };
311 writer.add(sstable_entry)?;
312 }
313
314 writer.finish()
315 }
316}
317
/// Read handle for an SSTable file with its index and footer held in memory.
pub struct SSTableReader {
    /// Path the table was opened from.
    path: PathBuf,
    /// Buffered handle used to read data blocks.
    file: BufReader<File>,
    /// Block index loaded from the file when it was opened.
    index: Vec<IndexEntry>,
    /// Footer loaded from the file when it was opened.
    footer: SSTableFooter,
    /// Size of the file in bytes, measured when it was opened.
    file_size: u64,
}
331
332impl SSTableReader {
333 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
335 let path = path.as_ref().to_path_buf();
336 let mut file = File::open(&path)?;
337
338 let file_size = file.metadata()?.len();
340
341 if file_size < 4 {
342 return Err(Error::Corruption("SSTable too small".into()));
343 }
344
345 file.seek(SeekFrom::End(-4))?;
347 let mut footer_len_buf = [0u8; 4];
348 file.read_exact(&mut footer_len_buf)?;
349 let footer_len = u32::from_le_bytes(footer_len_buf) as i64;
350
351 file.seek(SeekFrom::End(-4 - footer_len))?;
353 let mut footer_buf = vec![0u8; footer_len as usize];
354 file.read_exact(&mut footer_buf)?;
355
356 let footer: SSTableFooter =
357 bincode::deserialize(&footer_buf).map_err(|e| Error::Serialization(e.to_string()))?;
358
359 if footer.magic != SSTABLE_MAGIC {
361 return Err(Error::Corruption("Invalid SSTable magic number".into()));
362 }
363
364 if footer.format_version != SSTABLE_FORMAT_VERSION {
366 return Err(Error::Corruption(format!(
367 "Unsupported SSTable format version: {} (expected {})",
368 footer.format_version, SSTABLE_FORMAT_VERSION
369 )));
370 }
371
372 file.seek(SeekFrom::Start(footer.index_offset))?;
374 let mut index_buf = vec![0u8; footer.index_size as usize];
375 file.read_exact(&mut index_buf)?;
376
377 let index: Vec<IndexEntry> =
378 bincode::deserialize(&index_buf).map_err(|e| Error::Serialization(e.to_string()))?;
379
380 Ok(Self {
381 path,
382 file: BufReader::new(file.try_clone()?),
383 index,
384 footer,
385 file_size,
386 })
387 }
388
389 pub fn get(&mut self, key: &[u8]) -> Result<Option<SSTableEntry>> {
391 let block_idx = self
393 .index
394 .partition_point(|entry| entry.first_key.as_slice() <= key);
395
396 if block_idx == 0 {
398 if key < self.footer.min_key.as_slice() {
400 return Ok(None);
401 }
402 }
403
404 let block_idx = if block_idx > 0 { block_idx - 1 } else { 0 };
406
407 if block_idx >= self.index.len() {
408 return Ok(None);
409 }
410
411 let block = self.read_block(block_idx)?;
413
414 for entry in block {
415 if entry.key.as_slice() == key {
416 return Ok(Some(entry));
417 }
418 if entry.key.as_slice() > key {
419 break;
420 }
421 }
422
423 Ok(None)
424 }
425
426 fn read_block(&mut self, block_idx: usize) -> Result<Vec<SSTableEntry>> {
428 let index_entry = &self.index[block_idx];
429
430 self.file.seek(SeekFrom::Start(index_entry.offset))?;
431
432 let data_size = index_entry.size as usize - 4; let mut data_buf = vec![0u8; data_size];
434 self.file.read_exact(&mut data_buf)?;
435
436 let mut crc_buf = [0u8; 4];
438 self.file.read_exact(&mut crc_buf)?;
439 let stored_crc = u32::from_le_bytes(crc_buf);
440 let computed_crc = crc32fast::hash(&data_buf);
441
442 if stored_crc != computed_crc {
443 return Err(Error::Corruption("Block CRC mismatch".into()));
444 }
445
446 let mut entries = Vec::new();
448 let mut offset = 0;
449
450 while offset < data_buf.len() {
451 if offset + 4 > data_buf.len() {
452 break;
453 }
454
455 let len = u32::from_le_bytes([
456 data_buf[offset],
457 data_buf[offset + 1],
458 data_buf[offset + 2],
459 data_buf[offset + 3],
460 ]) as usize;
461 offset += 4;
462
463 if offset + len > data_buf.len() {
464 break;
465 }
466
467 let entry: SSTableEntry = bincode::deserialize(&data_buf[offset..offset + len])
468 .map_err(|e| Error::Serialization(e.to_string()))?;
469 entries.push(entry);
470 offset += len;
471 }
472
473 Ok(entries)
474 }
475
476 pub fn metadata(&self) -> SSTableMeta {
478 SSTableMeta {
479 path: self.path.clone(),
480 min_key: self.footer.min_key.clone(),
481 max_key: self.footer.max_key.clone(),
482 entry_count: self.footer.entry_count,
483 file_size: self.file_size,
484 level: 0,
485 sequence: 0,
486 }
487 }
488
489 pub fn might_contain(&self, key: &[u8]) -> bool {
491 key >= self.footer.min_key.as_slice() && key <= self.footer.max_key.as_slice()
492 }
493
494 pub fn iter(&mut self) -> Result<SSTableIterator<'_>> {
496 Ok(SSTableIterator {
497 reader: self,
498 block_idx: 0,
499 block_entries: Vec::new(),
500 entry_idx: 0,
501 })
502 }
503}
504
505pub struct SSTableIterator<'a> {
507 reader: &'a mut SSTableReader,
508 block_idx: usize,
509 block_entries: Vec<SSTableEntry>,
510 entry_idx: usize,
511}
512
513impl SSTableIterator<'_> {
514 pub fn next_entry(&mut self) -> Result<Option<SSTableEntry>> {
516 loop {
517 if self.entry_idx < self.block_entries.len() {
519 let entry = self.block_entries[self.entry_idx].clone();
520 self.entry_idx += 1;
521 return Ok(Some(entry));
522 }
523
524 if self.block_idx >= self.reader.index.len() {
526 return Ok(None);
527 }
528
529 self.block_entries = self.reader.read_block(self.block_idx)?;
530 self.block_idx += 1;
531 self.entry_idx = 0;
532 }
533 }
534}
535
536pub fn delete_sstable(path: impl AsRef<Path>) -> Result<()> {
538 fs::remove_file(path)?;
539 Ok(())
540}
541
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Writes three sorted pairs, then checks the returned metadata and that
    /// every key reads back while a missing key yields `None`.
    #[test]
    fn test_sstable_write_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");
        let pairs = [("a", "1"), ("b", "2"), ("c", "3")];

        let mut writer = SSTableWriter::new(&path).unwrap();
        for (key, value) in pairs.iter() {
            let entry = SSTableEntry::value(key.as_bytes().to_vec(), value.as_bytes().to_vec());
            writer.add(entry).unwrap();
        }
        let meta = writer.finish().unwrap();

        assert_eq!(meta.entry_count, 3);
        assert_eq!(meta.min_key, b"a".to_vec());
        assert_eq!(meta.max_key, b"c".to_vec());

        let mut reader = SSTableReader::open(&path).unwrap();
        for (key, value) in pairs.iter() {
            let entry = reader.get(key.as_bytes()).unwrap().unwrap();
            assert_eq!(entry.value, value.as_bytes().to_vec());
        }

        assert!(reader.get(b"d").unwrap().is_none());
    }

    /// A tombstone and a value stored side by side keep their entry types.
    #[test]
    fn test_sstable_tombstone() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut writer = SSTableWriter::new(&path).unwrap();
        let entries = vec![
            SSTableEntry::tombstone(b"deleted".to_vec()),
            SSTableEntry::value(b"key".to_vec(), b"value".to_vec()),
        ];
        for entry in entries {
            writer.add(entry).unwrap();
        }
        writer.finish().unwrap();

        let mut reader = SSTableReader::open(&path).unwrap();

        let live = reader.get(b"key").unwrap().unwrap();
        assert!(!live.is_tombstone());

        let dead = reader.get(b"deleted").unwrap().unwrap();
        assert!(dead.is_tombstone());
    }

    /// The iterator yields exactly as many entries as were written.
    #[test]
    fn test_sstable_iterator() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut writer = SSTableWriter::new(&path).unwrap();
        (0..100).for_each(|i| {
            let key = format!("key{:03}", i).into_bytes();
            let value = format!("value{}", i).into_bytes();
            writer.add(SSTableEntry::value(key, value)).unwrap();
        });
        writer.finish().unwrap();

        let mut reader = SSTableReader::open(&path).unwrap();
        let mut iter = reader.iter().unwrap();

        let mut seen = 0;
        loop {
            match iter.next_entry().unwrap() {
                Some(_) => seen += 1,
                None => break,
            }
        }

        assert_eq!(seen, 100);
    }

    /// Draining a memtable into an SSTable preserves values and tombstones.
    #[test]
    fn test_sstable_from_memtable() {
        use crate::memtable::Memtable;

        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut mt = Memtable::new();
        mt.put(b"a".to_vec(), b"1".to_vec());
        mt.put(b"b".to_vec(), b"2".to_vec());
        mt.delete(b"c".to_vec());

        let meta = SSTableWriter::from_memtable(&path, mt.drain()).unwrap();
        assert_eq!(meta.entry_count, 3);

        let mut reader = SSTableReader::open(&path).unwrap();

        let stored = reader.get(b"a").unwrap().unwrap();
        assert_eq!(stored.value, b"1".to_vec());

        let removed = reader.get(b"c").unwrap().unwrap();
        assert!(removed.is_tombstone());
    }

    /// `might_contain` is an inclusive range check on `[min_key, max_key]`.
    #[test]
    fn test_sstable_might_contain() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut writer = SSTableWriter::new(&path).unwrap();
        for (key, value) in [("b", "2"), ("d", "4")].iter() {
            let entry = SSTableEntry::value(key.as_bytes().to_vec(), value.as_bytes().to_vec());
            writer.add(entry).unwrap();
        }
        writer.finish().unwrap();

        let reader = SSTableReader::open(&path).unwrap();

        let expectations = [
            ("a", false),
            ("b", true),
            ("c", true),
            ("d", true),
            ("e", false),
        ];
        for (key, expected) in expectations.iter() {
            assert_eq!(reader.might_contain(key.as_bytes()), *expected);
        }
    }
}