1use crate::memtable::MemtableEntry;
19use rustlite_core::{Error, Result};
20use serde::{Deserialize, Serialize};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24
25const SSTABLE_MAGIC: u64 = 0x535354_424C_4954; const DEFAULT_BLOCK_SIZE: usize = 4096;
30
31const ENTRY_TYPE_VALUE: u8 = 0;
33const ENTRY_TYPE_TOMBSTONE: u8 = 1;
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct SSTableEntry {
38 pub key: Vec<u8>,
40 pub entry_type: u8,
42 pub value: Vec<u8>,
44}
45
46impl SSTableEntry {
47 pub fn value(key: Vec<u8>, value: Vec<u8>) -> Self {
49 Self {
50 key,
51 entry_type: ENTRY_TYPE_VALUE,
52 value,
53 }
54 }
55
56 pub fn tombstone(key: Vec<u8>) -> Self {
58 Self {
59 key,
60 entry_type: ENTRY_TYPE_TOMBSTONE,
61 value: Vec::new(),
62 }
63 }
64
65 pub fn is_tombstone(&self) -> bool {
67 self.entry_type == ENTRY_TYPE_TOMBSTONE
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct IndexEntry {
74 pub first_key: Vec<u8>,
76 pub offset: u64,
78 pub size: u32,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct SSTableFooter {
85 pub index_offset: u64,
87 pub index_size: u32,
89 pub entry_count: u64,
91 pub min_key: Vec<u8>,
93 pub max_key: Vec<u8>,
95 pub magic: u64,
97 pub crc: u32,
99}
100
101#[derive(Debug, Clone)]
103pub struct SSTableMeta {
104 pub path: PathBuf,
106 pub min_key: Vec<u8>,
108 pub max_key: Vec<u8>,
110 pub entry_count: u64,
112 pub file_size: u64,
114 pub level: u32,
116 pub sequence: u64,
118}
119
120pub struct SSTableWriter {
122 path: PathBuf,
124 writer: BufWriter<File>,
126 position: u64,
128 index: Vec<IndexEntry>,
130 block_buffer: Vec<u8>,
132 block_size: usize,
134 current_block_first_key: Option<Vec<u8>>,
136 entry_count: u64,
138 min_key: Option<Vec<u8>>,
140 max_key: Option<Vec<u8>>,
142}
143
144impl SSTableWriter {
145 pub fn new(path: impl AsRef<Path>) -> Result<Self> {
147 Self::with_block_size(path, DEFAULT_BLOCK_SIZE)
148 }
149
150 pub fn with_block_size(path: impl AsRef<Path>, block_size: usize) -> Result<Self> {
152 let path = path.as_ref().to_path_buf();
153 let file = File::create(&path)?;
154
155 Ok(Self {
156 path,
157 writer: BufWriter::new(file),
158 position: 0,
159 index: Vec::new(),
160 block_buffer: Vec::with_capacity(block_size),
161 block_size,
162 current_block_first_key: None,
163 entry_count: 0,
164 min_key: None,
165 max_key: None,
166 })
167 }
168
169 pub fn add(&mut self, entry: SSTableEntry) -> Result<()> {
171 if self.min_key.is_none() {
173 self.min_key = Some(entry.key.clone());
174 }
175 self.max_key = Some(entry.key.clone());
176
177 if self.current_block_first_key.is_none() {
179 self.current_block_first_key = Some(entry.key.clone());
180 }
181
182 let encoded = bincode::serialize(&entry)
184 .map_err(|e| Error::Serialization(e.to_string()))?;
185
186 let len = encoded.len() as u32;
188 self.block_buffer.extend_from_slice(&len.to_le_bytes());
189 self.block_buffer.extend_from_slice(&encoded);
190
191 self.entry_count += 1;
192
193 if self.block_buffer.len() >= self.block_size {
195 self.flush_block()?;
196 }
197
198 Ok(())
199 }
200
201 fn flush_block(&mut self) -> Result<()> {
203 if self.block_buffer.is_empty() {
204 return Ok(());
205 }
206
207 let crc = crc32fast::hash(&self.block_buffer);
209
210 if let Some(first_key) = self.current_block_first_key.take() {
212 self.index.push(IndexEntry {
213 first_key,
214 offset: self.position,
215 size: self.block_buffer.len() as u32 + 4, });
217 }
218
219 self.writer.write_all(&self.block_buffer)?;
221 self.position += self.block_buffer.len() as u64;
222
223 self.writer.write_all(&crc.to_le_bytes())?;
225 self.position += 4;
226
227 self.block_buffer.clear();
228
229 Ok(())
230 }
231
232 pub fn finish(mut self) -> Result<SSTableMeta> {
234 self.flush_block()?;
236
237 let index_offset = self.position;
239 let index_encoded = bincode::serialize(&self.index)
240 .map_err(|e| Error::Serialization(e.to_string()))?;
241 let index_size = index_encoded.len() as u32;
242
243 self.writer.write_all(&index_encoded)?;
244 self.position += index_size as u64;
245
246 let min_key = self.min_key.clone().unwrap_or_default();
248 let max_key = self.max_key.clone().unwrap_or_default();
249
250 let footer_data = SSTableFooter {
251 index_offset,
252 index_size,
253 entry_count: self.entry_count,
254 min_key: min_key.clone(),
255 max_key: max_key.clone(),
256 magic: SSTABLE_MAGIC,
257 crc: 0, };
259
260 let footer_encoded = bincode::serialize(&footer_data)
261 .map_err(|e| Error::Serialization(e.to_string()))?;
262 let footer_crc = crc32fast::hash(&footer_encoded);
263
264 let final_footer = SSTableFooter {
266 crc: footer_crc,
267 ..footer_data
268 };
269 let final_footer_encoded = bincode::serialize(&final_footer)
270 .map_err(|e| Error::Serialization(e.to_string()))?;
271
272 let footer_len = final_footer_encoded.len() as u32;
274 self.writer.write_all(&final_footer_encoded)?;
275 self.writer.write_all(&footer_len.to_le_bytes())?;
276
277 self.writer.flush()?;
278
279 let file_size = self.position + final_footer_encoded.len() as u64 + 4;
280
281 Ok(SSTableMeta {
282 path: self.path,
283 min_key,
284 max_key,
285 entry_count: self.entry_count,
286 file_size,
287 level: 0,
288 sequence: 0,
289 })
290 }
291
292 pub fn from_memtable<I>(path: impl AsRef<Path>, iter: I) -> Result<SSTableMeta>
294 where
295 I: Iterator<Item = (Vec<u8>, MemtableEntry)>,
296 {
297 let mut writer = SSTableWriter::new(path)?;
298
299 for (key, entry) in iter {
300 let sstable_entry = match entry {
301 MemtableEntry::Value(v) => SSTableEntry::value(key, v),
302 MemtableEntry::Tombstone => SSTableEntry::tombstone(key),
303 };
304 writer.add(sstable_entry)?;
305 }
306
307 writer.finish()
308 }
309}
310
311pub struct SSTableReader {
313 path: PathBuf,
315 file: BufReader<File>,
317 index: Vec<IndexEntry>,
319 footer: SSTableFooter,
321 file_size: u64,
323}
324
325impl SSTableReader {
326 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
328 let path = path.as_ref().to_path_buf();
329 let mut file = File::open(&path)?;
330
331 let file_size = file.metadata()?.len();
333
334 if file_size < 4 {
335 return Err(Error::Corruption("SSTable too small".into()));
336 }
337
338 file.seek(SeekFrom::End(-4))?;
340 let mut footer_len_buf = [0u8; 4];
341 file.read_exact(&mut footer_len_buf)?;
342 let footer_len = u32::from_le_bytes(footer_len_buf) as i64;
343
344 file.seek(SeekFrom::End(-4 - footer_len))?;
346 let mut footer_buf = vec![0u8; footer_len as usize];
347 file.read_exact(&mut footer_buf)?;
348
349 let footer: SSTableFooter = bincode::deserialize(&footer_buf)
350 .map_err(|e| Error::Serialization(e.to_string()))?;
351
352 if footer.magic != SSTABLE_MAGIC {
354 return Err(Error::Corruption("Invalid SSTable magic number".into()));
355 }
356
357 file.seek(SeekFrom::Start(footer.index_offset))?;
359 let mut index_buf = vec![0u8; footer.index_size as usize];
360 file.read_exact(&mut index_buf)?;
361
362 let index: Vec<IndexEntry> = bincode::deserialize(&index_buf)
363 .map_err(|e| Error::Serialization(e.to_string()))?;
364
365 Ok(Self {
366 path,
367 file: BufReader::new(file.try_clone()?),
368 index,
369 footer,
370 file_size,
371 })
372 }
373
374 pub fn get(&mut self, key: &[u8]) -> Result<Option<SSTableEntry>> {
376 let block_idx = self.index.partition_point(|entry| entry.first_key.as_slice() <= key);
378
379 if block_idx == 0 {
381 if key < self.footer.min_key.as_slice() {
383 return Ok(None);
384 }
385 }
386
387 let block_idx = if block_idx > 0 { block_idx - 1 } else { 0 };
389
390 if block_idx >= self.index.len() {
391 return Ok(None);
392 }
393
394 let block = self.read_block(block_idx)?;
396
397 for entry in block {
398 if entry.key.as_slice() == key {
399 return Ok(Some(entry));
400 }
401 if entry.key.as_slice() > key {
402 break;
403 }
404 }
405
406 Ok(None)
407 }
408
409 fn read_block(&mut self, block_idx: usize) -> Result<Vec<SSTableEntry>> {
411 let index_entry = &self.index[block_idx];
412
413 self.file.seek(SeekFrom::Start(index_entry.offset))?;
414
415 let data_size = index_entry.size as usize - 4; let mut data_buf = vec![0u8; data_size];
417 self.file.read_exact(&mut data_buf)?;
418
419 let mut crc_buf = [0u8; 4];
421 self.file.read_exact(&mut crc_buf)?;
422 let stored_crc = u32::from_le_bytes(crc_buf);
423 let computed_crc = crc32fast::hash(&data_buf);
424
425 if stored_crc != computed_crc {
426 return Err(Error::Corruption("Block CRC mismatch".into()));
427 }
428
429 let mut entries = Vec::new();
431 let mut offset = 0;
432
433 while offset < data_buf.len() {
434 if offset + 4 > data_buf.len() {
435 break;
436 }
437
438 let len = u32::from_le_bytes([
439 data_buf[offset],
440 data_buf[offset + 1],
441 data_buf[offset + 2],
442 data_buf[offset + 3],
443 ]) as usize;
444 offset += 4;
445
446 if offset + len > data_buf.len() {
447 break;
448 }
449
450 let entry: SSTableEntry = bincode::deserialize(&data_buf[offset..offset + len])
451 .map_err(|e| Error::Serialization(e.to_string()))?;
452 entries.push(entry);
453 offset += len;
454 }
455
456 Ok(entries)
457 }
458
459 pub fn metadata(&self) -> SSTableMeta {
461 SSTableMeta {
462 path: self.path.clone(),
463 min_key: self.footer.min_key.clone(),
464 max_key: self.footer.max_key.clone(),
465 entry_count: self.footer.entry_count,
466 file_size: self.file_size,
467 level: 0,
468 sequence: 0,
469 }
470 }
471
472 pub fn might_contain(&self, key: &[u8]) -> bool {
474 key >= self.footer.min_key.as_slice() && key <= self.footer.max_key.as_slice()
475 }
476
477 pub fn iter(&mut self) -> Result<SSTableIterator<'_>> {
479 Ok(SSTableIterator {
480 reader: self,
481 block_idx: 0,
482 block_entries: Vec::new(),
483 entry_idx: 0,
484 })
485 }
486}
487
488pub struct SSTableIterator<'a> {
490 reader: &'a mut SSTableReader,
491 block_idx: usize,
492 block_entries: Vec<SSTableEntry>,
493 entry_idx: usize,
494}
495
496impl<'a> SSTableIterator<'a> {
497 pub fn next_entry(&mut self) -> Result<Option<SSTableEntry>> {
499 loop {
500 if self.entry_idx < self.block_entries.len() {
502 let entry = self.block_entries[self.entry_idx].clone();
503 self.entry_idx += 1;
504 return Ok(Some(entry));
505 }
506
507 if self.block_idx >= self.reader.index.len() {
509 return Ok(None);
510 }
511
512 self.block_entries = self.reader.read_block(self.block_idx)?;
513 self.block_idx += 1;
514 self.entry_idx = 0;
515 }
516 }
517}
518
519pub fn delete_sstable(path: impl AsRef<Path>) -> Result<()> {
521 fs::remove_file(path)?;
522 Ok(())
523}
524
525#[cfg(test)]
526mod tests {
527 use super::*;
528 use tempfile::tempdir;
529
530 #[test]
531 fn test_sstable_write_read() {
532 let dir = tempdir().unwrap();
533 let path = dir.path().join("test.sst");
534
535 let mut writer = SSTableWriter::new(&path).unwrap();
537 writer.add(SSTableEntry::value(b"a".to_vec(), b"1".to_vec())).unwrap();
538 writer.add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec())).unwrap();
539 writer.add(SSTableEntry::value(b"c".to_vec(), b"3".to_vec())).unwrap();
540 let meta = writer.finish().unwrap();
541
542 assert_eq!(meta.entry_count, 3);
543 assert_eq!(meta.min_key, b"a".to_vec());
544 assert_eq!(meta.max_key, b"c".to_vec());
545
546 let mut reader = SSTableReader::open(&path).unwrap();
548
549 let entry = reader.get(b"a").unwrap().unwrap();
550 assert_eq!(entry.value, b"1".to_vec());
551
552 let entry = reader.get(b"b").unwrap().unwrap();
553 assert_eq!(entry.value, b"2".to_vec());
554
555 let entry = reader.get(b"c").unwrap().unwrap();
556 assert_eq!(entry.value, b"3".to_vec());
557
558 assert!(reader.get(b"d").unwrap().is_none());
559 }
560
561 #[test]
562 fn test_sstable_tombstone() {
563 let dir = tempdir().unwrap();
564 let path = dir.path().join("test.sst");
565
566 let mut writer = SSTableWriter::new(&path).unwrap();
568 writer
569 .add(SSTableEntry::tombstone(b"deleted".to_vec()))
570 .unwrap();
571 writer
572 .add(SSTableEntry::value(b"key".to_vec(), b"value".to_vec()))
573 .unwrap();
574 writer.finish().unwrap();
575
576 let mut reader = SSTableReader::open(&path).unwrap();
577
578 let entry = reader.get(b"key").unwrap().unwrap();
579 assert!(!entry.is_tombstone());
580
581 let entry = reader.get(b"deleted").unwrap().unwrap();
582 assert!(entry.is_tombstone());
583 }
584
585 #[test]
586 fn test_sstable_iterator() {
587 let dir = tempdir().unwrap();
588 let path = dir.path().join("test.sst");
589
590 let mut writer = SSTableWriter::new(&path).unwrap();
591 for i in 0..100 {
592 let key = format!("key{:03}", i);
593 let value = format!("value{}", i);
594 writer.add(SSTableEntry::value(key.into_bytes(), value.into_bytes())).unwrap();
595 }
596 writer.finish().unwrap();
597
598 let mut reader = SSTableReader::open(&path).unwrap();
599 let mut iter = reader.iter().unwrap();
600
601 let mut count = 0;
602 while let Some(_entry) = iter.next_entry().unwrap() {
603 count += 1;
604 }
605
606 assert_eq!(count, 100);
607 }
608
609 #[test]
610 fn test_sstable_from_memtable() {
611 use crate::memtable::Memtable;
612
613 let dir = tempdir().unwrap();
614 let path = dir.path().join("test.sst");
615
616 let mut mt = Memtable::new();
617 mt.put(b"a".to_vec(), b"1".to_vec());
618 mt.put(b"b".to_vec(), b"2".to_vec());
619 mt.delete(b"c".to_vec());
620
621 let meta = SSTableWriter::from_memtable(&path, mt.into_iter()).unwrap();
622
623 assert_eq!(meta.entry_count, 3);
624
625 let mut reader = SSTableReader::open(&path).unwrap();
626 assert_eq!(reader.get(b"a").unwrap().unwrap().value, b"1".to_vec());
627 assert!(reader.get(b"c").unwrap().unwrap().is_tombstone());
628 }
629
630 #[test]
631 fn test_sstable_might_contain() {
632 let dir = tempdir().unwrap();
633 let path = dir.path().join("test.sst");
634
635 let mut writer = SSTableWriter::new(&path).unwrap();
636 writer.add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec())).unwrap();
637 writer.add(SSTableEntry::value(b"d".to_vec(), b"4".to_vec())).unwrap();
638 writer.finish().unwrap();
639
640 let reader = SSTableReader::open(&path).unwrap();
641
642 assert!(!reader.might_contain(b"a")); assert!(reader.might_contain(b"b")); assert!(reader.might_contain(b"c")); assert!(reader.might_contain(b"d")); assert!(!reader.might_contain(b"e")); }
648}