1use crate::memtable::MemtableEntry;
19use rustlite_core::{Error, Result};
20use serde::{Deserialize, Serialize};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24
/// Magic number stored in every footer; its seven bytes spell ASCII `SSTBLIT`.
const SSTABLE_MAGIC: u64 = 0x53_53_54_42_4C_49_54;

/// Block size, in bytes, used by [`SSTableWriter::new`].
const DEFAULT_BLOCK_SIZE: usize = 4096;

/// `entry_type` tag of an entry that carries a value.
const ENTRY_TYPE_VALUE: u8 = 0;

/// `entry_type` tag of an entry that marks its key as deleted.
const ENTRY_TYPE_TOMBSTONE: u8 = 1;
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct SSTableEntry {
38 pub key: Vec<u8>,
40 pub entry_type: u8,
42 pub value: Vec<u8>,
44}
45
46impl SSTableEntry {
47 pub fn value(key: Vec<u8>, value: Vec<u8>) -> Self {
49 Self {
50 key,
51 entry_type: ENTRY_TYPE_VALUE,
52 value,
53 }
54 }
55
56 pub fn tombstone(key: Vec<u8>) -> Self {
58 Self {
59 key,
60 entry_type: ENTRY_TYPE_TOMBSTONE,
61 value: Vec::new(),
62 }
63 }
64
65 pub fn is_tombstone(&self) -> bool {
67 self.entry_type == ENTRY_TYPE_TOMBSTONE
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct IndexEntry {
74 pub first_key: Vec<u8>,
76 pub offset: u64,
78 pub size: u32,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct SSTableFooter {
85 pub index_offset: u64,
87 pub index_size: u32,
89 pub entry_count: u64,
91 pub min_key: Vec<u8>,
93 pub max_key: Vec<u8>,
95 pub magic: u64,
97 pub crc: u32,
99}
100
/// In-memory description of an SSTable file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SSTableMeta {
    /// Location of the SSTable file.
    pub path: PathBuf,
    /// Smallest key recorded for the table.
    pub min_key: Vec<u8>,
    /// Largest key recorded for the table.
    pub max_key: Vec<u8>,
    /// Number of entries stored in the table.
    pub entry_count: u64,
    /// Total size of the file in bytes.
    pub file_size: u64,
    /// Level number; the writer and reader in this file always report 0.
    pub level: u32,
    /// Sequence number; the writer and reader in this file always report 0.
    pub sequence: u64,
}
119
120pub struct SSTableWriter {
122 path: PathBuf,
124 writer: BufWriter<File>,
126 position: u64,
128 index: Vec<IndexEntry>,
130 block_buffer: Vec<u8>,
132 block_size: usize,
134 current_block_first_key: Option<Vec<u8>>,
136 entry_count: u64,
138 min_key: Option<Vec<u8>>,
140 max_key: Option<Vec<u8>>,
142}
143
144impl SSTableWriter {
145 pub fn new(path: impl AsRef<Path>) -> Result<Self> {
147 Self::with_block_size(path, DEFAULT_BLOCK_SIZE)
148 }
149
150 pub fn with_block_size(path: impl AsRef<Path>, block_size: usize) -> Result<Self> {
152 let path = path.as_ref().to_path_buf();
153 let file = File::create(&path)?;
154
155 Ok(Self {
156 path,
157 writer: BufWriter::new(file),
158 position: 0,
159 index: Vec::new(),
160 block_buffer: Vec::with_capacity(block_size),
161 block_size,
162 current_block_first_key: None,
163 entry_count: 0,
164 min_key: None,
165 max_key: None,
166 })
167 }
168
169 pub fn add(&mut self, entry: SSTableEntry) -> Result<()> {
171 if self.min_key.is_none() {
173 self.min_key = Some(entry.key.clone());
174 }
175 self.max_key = Some(entry.key.clone());
176
177 if self.current_block_first_key.is_none() {
179 self.current_block_first_key = Some(entry.key.clone());
180 }
181
182 let encoded =
184 bincode::serialize(&entry).map_err(|e| Error::Serialization(e.to_string()))?;
185
186 let len = encoded.len() as u32;
188 self.block_buffer.extend_from_slice(&len.to_le_bytes());
189 self.block_buffer.extend_from_slice(&encoded);
190
191 self.entry_count += 1;
192
193 if self.block_buffer.len() >= self.block_size {
195 self.flush_block()?;
196 }
197
198 Ok(())
199 }
200
201 fn flush_block(&mut self) -> Result<()> {
203 if self.block_buffer.is_empty() {
204 return Ok(());
205 }
206
207 let crc = crc32fast::hash(&self.block_buffer);
209
210 if let Some(first_key) = self.current_block_first_key.take() {
212 self.index.push(IndexEntry {
213 first_key,
214 offset: self.position,
215 size: self.block_buffer.len() as u32 + 4, });
217 }
218
219 self.writer.write_all(&self.block_buffer)?;
221 self.position += self.block_buffer.len() as u64;
222
223 self.writer.write_all(&crc.to_le_bytes())?;
225 self.position += 4;
226
227 self.block_buffer.clear();
228
229 Ok(())
230 }
231
232 pub fn finish(mut self) -> Result<SSTableMeta> {
234 self.flush_block()?;
236
237 let index_offset = self.position;
239 let index_encoded =
240 bincode::serialize(&self.index).map_err(|e| Error::Serialization(e.to_string()))?;
241 let index_size = index_encoded.len() as u32;
242
243 self.writer.write_all(&index_encoded)?;
244 self.position += index_size as u64;
245
246 let min_key = self.min_key.clone().unwrap_or_default();
248 let max_key = self.max_key.clone().unwrap_or_default();
249
250 let footer_data = SSTableFooter {
251 index_offset,
252 index_size,
253 entry_count: self.entry_count,
254 min_key: min_key.clone(),
255 max_key: max_key.clone(),
256 magic: SSTABLE_MAGIC,
257 crc: 0, };
259
260 let footer_encoded =
261 bincode::serialize(&footer_data).map_err(|e| Error::Serialization(e.to_string()))?;
262 let footer_crc = crc32fast::hash(&footer_encoded);
263
264 let final_footer = SSTableFooter {
266 crc: footer_crc,
267 ..footer_data
268 };
269 let final_footer_encoded =
270 bincode::serialize(&final_footer).map_err(|e| Error::Serialization(e.to_string()))?;
271
272 let footer_len = final_footer_encoded.len() as u32;
274 self.writer.write_all(&final_footer_encoded)?;
275 self.writer.write_all(&footer_len.to_le_bytes())?;
276
277 self.writer.flush()?;
278
279 let file_size = self.position + final_footer_encoded.len() as u64 + 4;
280
281 Ok(SSTableMeta {
282 path: self.path,
283 min_key,
284 max_key,
285 entry_count: self.entry_count,
286 file_size,
287 level: 0,
288 sequence: 0,
289 })
290 }
291
292 pub fn from_memtable<I>(path: impl AsRef<Path>, iter: I) -> Result<SSTableMeta>
294 where
295 I: Iterator<Item = (Vec<u8>, MemtableEntry)>,
296 {
297 let mut writer = SSTableWriter::new(path)?;
298
299 for (key, entry) in iter {
300 let sstable_entry = match entry {
301 MemtableEntry::Value(v) => SSTableEntry::value(key, v),
302 MemtableEntry::Tombstone => SSTableEntry::tombstone(key),
303 };
304 writer.add(sstable_entry)?;
305 }
306
307 writer.finish()
308 }
309}
310
311pub struct SSTableReader {
313 path: PathBuf,
315 file: BufReader<File>,
317 index: Vec<IndexEntry>,
319 footer: SSTableFooter,
321 file_size: u64,
323}
324
325impl SSTableReader {
326 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
328 let path = path.as_ref().to_path_buf();
329 let mut file = File::open(&path)?;
330
331 let file_size = file.metadata()?.len();
333
334 if file_size < 4 {
335 return Err(Error::Corruption("SSTable too small".into()));
336 }
337
338 file.seek(SeekFrom::End(-4))?;
340 let mut footer_len_buf = [0u8; 4];
341 file.read_exact(&mut footer_len_buf)?;
342 let footer_len = u32::from_le_bytes(footer_len_buf) as i64;
343
344 file.seek(SeekFrom::End(-4 - footer_len))?;
346 let mut footer_buf = vec![0u8; footer_len as usize];
347 file.read_exact(&mut footer_buf)?;
348
349 let footer: SSTableFooter =
350 bincode::deserialize(&footer_buf).map_err(|e| Error::Serialization(e.to_string()))?;
351
352 if footer.magic != SSTABLE_MAGIC {
354 return Err(Error::Corruption("Invalid SSTable magic number".into()));
355 }
356
357 file.seek(SeekFrom::Start(footer.index_offset))?;
359 let mut index_buf = vec![0u8; footer.index_size as usize];
360 file.read_exact(&mut index_buf)?;
361
362 let index: Vec<IndexEntry> =
363 bincode::deserialize(&index_buf).map_err(|e| Error::Serialization(e.to_string()))?;
364
365 Ok(Self {
366 path,
367 file: BufReader::new(file.try_clone()?),
368 index,
369 footer,
370 file_size,
371 })
372 }
373
374 pub fn get(&mut self, key: &[u8]) -> Result<Option<SSTableEntry>> {
376 let block_idx = self
378 .index
379 .partition_point(|entry| entry.first_key.as_slice() <= key);
380
381 if block_idx == 0 {
383 if key < self.footer.min_key.as_slice() {
385 return Ok(None);
386 }
387 }
388
389 let block_idx = if block_idx > 0 { block_idx - 1 } else { 0 };
391
392 if block_idx >= self.index.len() {
393 return Ok(None);
394 }
395
396 let block = self.read_block(block_idx)?;
398
399 for entry in block {
400 if entry.key.as_slice() == key {
401 return Ok(Some(entry));
402 }
403 if entry.key.as_slice() > key {
404 break;
405 }
406 }
407
408 Ok(None)
409 }
410
411 fn read_block(&mut self, block_idx: usize) -> Result<Vec<SSTableEntry>> {
413 let index_entry = &self.index[block_idx];
414
415 self.file.seek(SeekFrom::Start(index_entry.offset))?;
416
417 let data_size = index_entry.size as usize - 4; let mut data_buf = vec![0u8; data_size];
419 self.file.read_exact(&mut data_buf)?;
420
421 let mut crc_buf = [0u8; 4];
423 self.file.read_exact(&mut crc_buf)?;
424 let stored_crc = u32::from_le_bytes(crc_buf);
425 let computed_crc = crc32fast::hash(&data_buf);
426
427 if stored_crc != computed_crc {
428 return Err(Error::Corruption("Block CRC mismatch".into()));
429 }
430
431 let mut entries = Vec::new();
433 let mut offset = 0;
434
435 while offset < data_buf.len() {
436 if offset + 4 > data_buf.len() {
437 break;
438 }
439
440 let len = u32::from_le_bytes([
441 data_buf[offset],
442 data_buf[offset + 1],
443 data_buf[offset + 2],
444 data_buf[offset + 3],
445 ]) as usize;
446 offset += 4;
447
448 if offset + len > data_buf.len() {
449 break;
450 }
451
452 let entry: SSTableEntry = bincode::deserialize(&data_buf[offset..offset + len])
453 .map_err(|e| Error::Serialization(e.to_string()))?;
454 entries.push(entry);
455 offset += len;
456 }
457
458 Ok(entries)
459 }
460
461 pub fn metadata(&self) -> SSTableMeta {
463 SSTableMeta {
464 path: self.path.clone(),
465 min_key: self.footer.min_key.clone(),
466 max_key: self.footer.max_key.clone(),
467 entry_count: self.footer.entry_count,
468 file_size: self.file_size,
469 level: 0,
470 sequence: 0,
471 }
472 }
473
474 pub fn might_contain(&self, key: &[u8]) -> bool {
476 key >= self.footer.min_key.as_slice() && key <= self.footer.max_key.as_slice()
477 }
478
479 pub fn iter(&mut self) -> Result<SSTableIterator<'_>> {
481 Ok(SSTableIterator {
482 reader: self,
483 block_idx: 0,
484 block_entries: Vec::new(),
485 entry_idx: 0,
486 })
487 }
488}
489
490pub struct SSTableIterator<'a> {
492 reader: &'a mut SSTableReader,
493 block_idx: usize,
494 block_entries: Vec<SSTableEntry>,
495 entry_idx: usize,
496}
497
498impl SSTableIterator<'_> {
499 pub fn next_entry(&mut self) -> Result<Option<SSTableEntry>> {
501 loop {
502 if self.entry_idx < self.block_entries.len() {
504 let entry = self.block_entries[self.entry_idx].clone();
505 self.entry_idx += 1;
506 return Ok(Some(entry));
507 }
508
509 if self.block_idx >= self.reader.index.len() {
511 return Ok(None);
512 }
513
514 self.block_entries = self.reader.read_block(self.block_idx)?;
515 self.block_idx += 1;
516 self.entry_idx = 0;
517 }
518 }
519}
520
521pub fn delete_sstable(path: impl AsRef<Path>) -> Result<()> {
523 fs::remove_file(path)?;
524 Ok(())
525}
526
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Writes `entries` in order to a fresh SSTable at `path` and returns its metadata.
    fn write_table(path: &Path, entries: Vec<SSTableEntry>) -> SSTableMeta {
        let mut writer = SSTableWriter::new(path).unwrap();
        for entry in entries {
            writer.add(entry).unwrap();
        }
        writer.finish().unwrap()
    }

    /// Shorthand for a live entry built from byte-string literals.
    fn kv(key: &[u8], value: &[u8]) -> SSTableEntry {
        SSTableEntry::value(key.to_vec(), value.to_vec())
    }

    #[test]
    fn test_sstable_write_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let meta = write_table(&path, vec![kv(b"a", b"1"), kv(b"b", b"2"), kv(b"c", b"3")]);

        assert_eq!(meta.entry_count, 3);
        assert_eq!(meta.min_key, b"a".to_vec());
        assert_eq!(meta.max_key, b"c".to_vec());

        let mut reader = SSTableReader::open(&path).unwrap();

        let expected: [(&[u8], &[u8]); 3] = [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")];
        for (key, value) in expected.iter() {
            let entry = reader.get(key).unwrap().unwrap();
            assert_eq!(entry.value, value.to_vec());
        }

        assert!(reader.get(b"d").unwrap().is_none());
    }

    #[test]
    fn test_sstable_tombstone() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        write_table(
            &path,
            vec![
                SSTableEntry::tombstone(b"deleted".to_vec()),
                kv(b"key", b"value"),
            ],
        );

        let mut reader = SSTableReader::open(&path).unwrap();

        let live = reader.get(b"key").unwrap().unwrap();
        assert!(!live.is_tombstone());

        let deleted = reader.get(b"deleted").unwrap().unwrap();
        assert!(deleted.is_tombstone());
    }

    #[test]
    fn test_sstable_iterator() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let entries = (0..100)
            .map(|i| {
                SSTableEntry::value(
                    format!("key{:03}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                )
            })
            .collect();
        write_table(&path, entries);

        let mut reader = SSTableReader::open(&path).unwrap();
        let mut iter = reader.iter().unwrap();

        let mut count = 0;
        while iter.next_entry().unwrap().is_some() {
            count += 1;
        }

        assert_eq!(count, 100);
    }

    #[test]
    fn test_sstable_from_memtable() {
        use crate::memtable::Memtable;

        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut mt = Memtable::new();
        mt.put(b"a".to_vec(), b"1".to_vec());
        mt.put(b"b".to_vec(), b"2".to_vec());
        mt.delete(b"c".to_vec());

        let meta = SSTableWriter::from_memtable(&path, mt.drain()).unwrap();
        assert_eq!(meta.entry_count, 3);

        let mut reader = SSTableReader::open(&path).unwrap();
        assert_eq!(reader.get(b"a").unwrap().unwrap().value, b"1".to_vec());
        assert!(reader.get(b"c").unwrap().unwrap().is_tombstone());
    }

    #[test]
    fn test_sstable_might_contain() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        write_table(&path, vec![kv(b"b", b"2"), kv(b"d", b"4")]);

        let reader = SSTableReader::open(&path).unwrap();

        // Keys outside [b, d] are rejected; everything inside the range may be present.
        assert!(!reader.might_contain(b"a"));
        assert!(reader.might_contain(b"b"));
        assert!(reader.might_contain(b"c"));
        assert!(reader.might_contain(b"d"));
        assert!(!reader.might_contain(b"e"));
    }
}