1use std::cmp::Ordering;
28use std::collections::HashMap;
29use std::fs::File;
30use std::path::{Path, PathBuf};
31use std::sync::Arc;
32
33use memmap2::{Mmap, MmapOptions};
34use parking_lot::RwLock;
35
36use super::block::{Block, BlockHandle, BlockType};
37use super::filter::FilterReader;
38use super::format::{Footer, HEADER_SIZE, Header, SectionType};
39
/// A single block as stored in a [`BlockCache`].
///
/// Holds both the bytes as they appear in the table file and the bytes after
/// decompression, together with the tag that says how the block was encoded.
pub struct CachedBlock {
    /// Block payload exactly as it was read from the file.
    pub data: Vec<u8>,
    /// Encoding tag that accompanied the payload.
    pub block_type: BlockType,
    /// Payload after decompression; for an uncompressed block this is a copy
    /// of `data`.
    pub decompressed: Vec<u8>,
}
49
/// In-memory cache of blocks keyed by `(file_id, block offset)`.
///
/// The map sits behind a reader-writer lock, so a cache can be shared between
/// tables through an `Arc`.
pub struct BlockCache {
    /// Cached blocks; the key is `(file_id, offset)`.
    entries: RwLock<HashMap<(u64, u64), Arc<CachedBlock>>>,
    /// Entry-count threshold consulted by `insert` before it adds a block.
    capacity: usize,
}
57
58impl BlockCache {
59 pub fn new(capacity: usize) -> Self {
61 Self {
62 entries: RwLock::new(HashMap::with_capacity(capacity)),
63 capacity,
64 }
65 }
66
67 pub fn get(&self, file_id: u64, offset: u64) -> Option<Arc<CachedBlock>> {
69 self.entries.read().get(&(file_id, offset)).cloned()
70 }
71
72 pub fn insert(&self, file_id: u64, offset: u64, block: CachedBlock) -> Arc<CachedBlock> {
74 let block = Arc::new(block);
75 let mut entries = self.entries.write();
76
77 if entries.len() >= self.capacity {
79 entries.clear();
80 }
81
82 entries.insert((file_id, offset), block.clone());
83 block
84 }
85}
86
/// Switches that tune a single read against a table.
#[derive(Debug, Clone)]
pub struct ReadOptions {
    /// Compare each block's stored checksum with a freshly computed one.
    pub verify_checksums: bool,
    /// Add blocks read from the file to the block cache, when one is attached.
    pub fill_cache: bool,
    /// Consult the table's filter before searching for a key.
    pub use_filter: bool,
}

impl Default for ReadOptions {
    /// Returns options with every switch turned on.
    fn default() -> Self {
        ReadOptions {
            verify_checksums: true,
            fill_cache: true,
            use_filter: true,
        }
    }
}
107
/// A table file opened for reading.
///
/// The file is memory-mapped when it is opened; the index section is parsed
/// up front, while data blocks are read from the mapping on demand.
pub struct SSTable {
    /// Path the table was opened from.
    path: PathBuf,
    /// Hash of `path`; used as the file component of block-cache keys.
    file_id: u64,
    /// Memory map covering the table file.
    mmap: Mmap,
    /// Header decoded from the first `HEADER_SIZE` bytes of the file.
    header: Header,
    /// Footer decoded at the offset recorded in the header.
    footer: Footer,
    /// Owned copy of the raw index section.
    index: Vec<u8>,
    /// Parsed index: one entry per data block, in file order.
    index_entries: Vec<IndexEntry>,
    /// Filter built from the filter section, when the table has a usable one.
    filter: Option<FilterReader>,
    /// Summary information gathered while opening the table.
    metadata: TableMetadata,
    /// Block cache to consult and fill, if the caller supplied one.
    cache: Option<Arc<BlockCache>>,
}
131
/// One parsed record of the index section, describing a single data block.
#[derive(Debug, Clone)]
struct IndexEntry {
    /// Key stored in the index for this block; lookups treat it as an upper
    /// bound on the keys the block contains.
    largest_key: Vec<u8>,
    /// Location of the block inside the file.
    handle: BlockHandle,
}
140
/// Summary information about a table, populated when the table is opened.
#[derive(Debug, Clone)]
pub struct TableMetadata {
    /// Size of the table file in bytes, as reported by the file system.
    pub file_size: u64,
    /// Number of data blocks listed in the index.
    pub num_data_blocks: usize,
    /// Key reported as the low end of the table's key range, if one is known;
    /// how it is derived is decided by `SSTable::open_with_cache`.
    pub smallest_key: Option<Vec<u8>>,
    /// Key reported as the high end of the table's key range (taken from the
    /// last index entry), or `None` when the index is empty.
    pub largest_key: Option<Vec<u8>>,
}
153
154impl SSTable {
155 pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
157 Self::open_with_cache(path, None)
158 }
159
160 pub fn open_with_cache<P: AsRef<Path>>(
162 path: P,
163 cache: Option<Arc<BlockCache>>,
164 ) -> std::io::Result<Self> {
165 let path = path.as_ref();
166 let file = File::open(path)?;
167 let file_size = file.metadata()?.len();
168
169 let mmap = unsafe { MmapOptions::new().map(&file)? };
171
172 let file_id = {
174 use std::hash::{Hash, Hasher};
175 let mut hasher = std::collections::hash_map::DefaultHasher::new();
176 path.hash(&mut hasher);
177 hasher.finish()
178 };
179
180 if mmap.len() < HEADER_SIZE {
182 return Err(std::io::Error::new(
183 std::io::ErrorKind::InvalidData,
184 "File too small for SSTable header",
185 ));
186 }
187
188 let header = Header::decode(&mmap[..HEADER_SIZE]).ok_or_else(|| {
189 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable header")
190 })?;
191
192 let footer_offset = header.footer_offset as usize;
194 if footer_offset >= mmap.len() {
195 return Err(std::io::Error::new(
196 std::io::ErrorKind::InvalidData,
197 "Footer offset beyond file",
198 ));
199 }
200
201 let footer =
202 Footer::decode(&mmap[footer_offset..], header.num_sections).ok_or_else(|| {
203 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable footer")
204 })?;
205
206 let index_section = footer
208 .sections
209 .iter()
210 .find(|s| s.section_type == SectionType::Index)
211 .ok_or_else(|| {
212 std::io::Error::new(std::io::ErrorKind::InvalidData, "Missing index section")
213 })?;
214
215 let index_start = index_section.offset as usize;
216 let index_end = index_start + index_section.size as usize;
217 let index = mmap[index_start..index_end].to_vec();
218
219 let index_entries = Self::parse_index(&index)?;
221
222 let filter = footer
224 .sections
225 .iter()
226 .find(|s| s.section_type == SectionType::Filter)
227 .and_then(|section| {
228 let start = section.offset as usize;
229 let end = start + section.size as usize;
230 FilterReader::from_bytes(&mmap[start..end])
231 });
232
233 let metadata = TableMetadata {
235 file_size,
236 num_data_blocks: index_entries.len(),
237 smallest_key: index_entries.first().map(|e| e.largest_key.clone()),
238 largest_key: index_entries.last().map(|e| e.largest_key.clone()),
239 };
240
241 Ok(Self {
242 path: path.to_path_buf(),
243 file_id,
244 mmap,
245 header,
246 footer,
247 index,
248 index_entries,
249 filter,
250 metadata,
251 cache,
252 })
253 }
254
255 fn parse_index(data: &[u8]) -> std::io::Result<Vec<IndexEntry>> {
257 let mut entries = Vec::new();
258 let block = Block::new(data.to_vec()).ok_or_else(|| {
259 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid index block")
260 })?;
261 let mut iter = block.iter();
262
263 while iter.valid() {
264 let key = iter.key().to_vec();
265 let value = iter.value();
266
267 let (handle, _bytes_read) = BlockHandle::decode(value).ok_or_else(|| {
268 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid block handle")
269 })?;
270
271 entries.push(IndexEntry {
272 largest_key: key,
273 handle,
274 });
275
276 iter.next();
277 }
278
279 Ok(entries)
280 }
281
282 pub fn get(&self, key: &[u8], options: &ReadOptions) -> std::io::Result<Option<Vec<u8>>> {
284 if options.use_filter {
286 if let Some(ref filter) = self.filter {
287 if !filter.may_contain(key) {
288 return Ok(None);
289 }
290 }
291 }
292
293 let block_idx = self.find_block_for_key(key);
295 if block_idx >= self.index_entries.len() {
296 return Ok(None);
297 }
298
299 let block_data = self.read_block(&self.index_entries[block_idx].handle, options)?;
301 let block = Block::new(block_data).ok_or_else(|| {
302 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid data block")
303 })?;
304
305 let iter = block.seek(key);
306 if iter.valid() && iter.key() == key {
307 Ok(Some(iter.value().to_vec()))
308 } else {
309 Ok(None)
310 }
311 }
312
313 fn find_block_for_key(&self, key: &[u8]) -> usize {
315 self.index_entries
317 .binary_search_by(|entry| {
318 if entry.largest_key.as_slice() < key {
319 Ordering::Less
320 } else {
321 Ordering::Greater
322 }
323 })
324 .unwrap_or_else(|i| i)
325 }
326
327 fn read_block(&self, handle: &BlockHandle, options: &ReadOptions) -> std::io::Result<Vec<u8>> {
329 let offset = handle.offset();
330 let size = handle.size();
331
332 if let Some(ref cache) = self.cache {
334 if let Some(block) = cache.get(self.file_id, offset) {
335 return Ok(block.decompressed.clone());
336 }
337 }
338
339 let start = offset as usize;
341 let end = start + size as usize;
342
343 if end + 5 > self.mmap.len() {
344 return Err(std::io::Error::new(
345 std::io::ErrorKind::InvalidData,
346 "Block extends beyond file",
347 ));
348 }
349
350 let block_data = &self.mmap[start..end];
351 let block_type = BlockType::from_u8(self.mmap[end]);
352 let stored_checksum = u32::from_le_bytes([
353 self.mmap[end + 1],
354 self.mmap[end + 2],
355 self.mmap[end + 3],
356 self.mmap[end + 4],
357 ]);
358
359 if options.verify_checksums {
361 let computed_checksum = crc32fast::hash(block_data);
362 if computed_checksum != stored_checksum {
363 return Err(std::io::Error::new(
364 std::io::ErrorKind::InvalidData,
365 "Block checksum mismatch",
366 ));
367 }
368 }
369
370 let decompressed = match block_type {
372 BlockType::Uncompressed => block_data.to_vec(),
373 BlockType::Lz4 => lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
374 std::io::Error::new(std::io::ErrorKind::InvalidData, format!("LZ4 error: {}", e))
375 })?,
376 BlockType::Zstd => zstd::decode_all(block_data).map_err(|e| {
377 std::io::Error::new(
378 std::io::ErrorKind::InvalidData,
379 format!("Zstd error: {}", e),
380 )
381 })?,
382 BlockType::Snappy => {
383 return Err(std::io::Error::new(
384 std::io::ErrorKind::InvalidData,
385 "Snappy not supported",
386 ));
387 }
388 };
389
390 if options.fill_cache {
392 if let Some(ref cache) = self.cache {
393 cache.insert(
394 self.file_id,
395 offset,
396 CachedBlock {
397 data: block_data.to_vec(),
398 block_type,
399 decompressed: decompressed.clone(),
400 },
401 );
402 }
403 }
404
405 Ok(decompressed)
406 }
407
408 pub fn iter(&self) -> SSTableIterator<'_> {
410 SSTableIterator::new(self)
411 }
412
413 pub fn range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> RangeIterator<'_> {
415 RangeIterator::new(self, start, end)
416 }
417
418 pub fn metadata(&self) -> &TableMetadata {
420 &self.metadata
421 }
422
423 pub fn path(&self) -> &Path {
425 &self.path
426 }
427
428 pub fn num_blocks(&self) -> usize {
430 self.index_entries.len()
431 }
432
433 pub fn may_contain(&self, key: &[u8]) -> bool {
435 self.filter
436 .as_ref()
437 .map(|f| f.may_contain(key))
438 .unwrap_or(true)
439 }
440}
441
/// Forward cursor over every entry of an [`SSTable`].
///
/// One block at a time is decoded into owned key/value pairs; the cursor then
/// walks those pairs and loads the next block when they run out.
pub struct SSTableIterator<'a> {
    /// Table being scanned.
    table: &'a SSTable,
    /// Index of the block whose entries are currently loaded.
    block_idx: usize,
    /// Owned `(key, value)` pairs of the currently loaded block.
    entries: Vec<(Vec<u8>, Vec<u8>)>,
    /// Position of the cursor within `entries`.
    entry_idx: usize,
    /// Options used for every block read made by this iterator.
    options: ReadOptions,
    /// Whether the cursor currently points at an entry.
    valid: bool,
}
461
462impl<'a> SSTableIterator<'a> {
463 fn new(table: &'a SSTable) -> Self {
464 let mut iter = Self {
465 table,
466 block_idx: 0,
467 entries: Vec::new(),
468 entry_idx: 0,
469 options: ReadOptions::default(),
470 valid: false,
471 };
472 iter.load_block(0);
473 iter
474 }
475
476 fn load_block(&mut self, block_idx: usize) {
479 self.block_idx = block_idx;
480 self.entries.clear();
481 self.entry_idx = 0;
482 self.valid = false;
483
484 while self.block_idx < self.table.index_entries.len() {
485 let handle = &self.table.index_entries[self.block_idx].handle;
486 match self.table.read_block(handle, &self.options) {
487 Ok(data) => {
488 if let Some(block) = Block::new(data) {
489 let mut bi = block.iter();
491 while bi.valid() {
492 self.entries.push((bi.key().to_vec(), bi.value().to_vec()));
493 bi.next();
494 }
495 if !self.entries.is_empty() {
496 self.entry_idx = 0;
497 self.valid = true;
498 return;
499 }
500 }
502 }
503 Err(_) => {
504 }
506 }
507 self.block_idx += 1;
508 }
509 }
510
511 pub fn valid(&self) -> bool {
513 self.valid
514 }
515
516 pub fn key(&self) -> Option<&[u8]> {
518 if self.valid {
519 Some(&self.entries[self.entry_idx].0)
520 } else {
521 None
522 }
523 }
524
525 pub fn value(&self) -> Option<&[u8]> {
527 if self.valid {
528 Some(&self.entries[self.entry_idx].1)
529 } else {
530 None
531 }
532 }
533
534 pub fn next(&mut self) {
537 if !self.valid {
538 return;
539 }
540
541 self.entry_idx += 1;
542 if self.entry_idx < self.entries.len() {
543 return; }
545
546 self.load_block(self.block_idx + 1);
548 }
549
550 pub fn seek(&mut self, target: &[u8]) {
552 let start_block = self.table.find_block_for_key(target);
554 self.load_block(start_block);
555
556 while self.valid {
558 if self.entries[self.entry_idx].0.as_slice() >= target {
559 return;
560 }
561 self.next();
562 }
563 }
564
565 pub fn seek_to_first(&mut self) {
567 self.load_block(0);
568 }
569}
570
/// Cursor over the entries of a table whose keys fall below an optional,
/// exclusive end key.
pub struct RangeIterator<'a> {
    /// Underlying whole-table cursor.
    inner: SSTableIterator<'a>,
    /// Exclusive upper bound on keys; `None` means no upper bound.
    end: Option<Vec<u8>>,
    /// Set once the inner cursor runs out or reaches `end`.
    exhausted: bool,
}
579
580impl<'a> RangeIterator<'a> {
581 fn new(table: &'a SSTable, start: Option<&[u8]>, end: Option<&[u8]>) -> Self {
582 let mut inner = SSTableIterator::new(table);
583
584 if let Some(start_key) = start {
586 inner.seek(start_key);
587 }
588
589 let mut ri = Self {
590 inner,
591 end: end.map(|e| e.to_vec()),
592 exhausted: false,
593 };
594
595 ri.check_end();
597 ri
598 }
599
600 fn check_end(&mut self) {
602 if self.exhausted {
603 return;
604 }
605 if !self.inner.valid() {
606 self.exhausted = true;
607 return;
608 }
609 if let Some(ref end_key) = self.end {
610 if let Some(key) = self.inner.key() {
611 if key >= end_key.as_slice() {
612 self.exhausted = true;
613 }
614 }
615 }
616 }
617
618 pub fn exhausted(&self) -> bool {
620 self.exhausted
621 }
622
623 pub fn valid(&self) -> bool {
625 !self.exhausted && self.inner.valid()
626 }
627
628 pub fn key(&self) -> Option<&[u8]> {
630 if self.exhausted {
631 None
632 } else {
633 self.inner.key()
634 }
635 }
636
637 pub fn value(&self) -> Option<&[u8]> {
639 if self.exhausted {
640 None
641 } else {
642 self.inner.value()
643 }
644 }
645
646 pub fn next(&mut self) {
648 if self.exhausted {
649 return;
650 }
651 self.inner.next();
652 self.check_end();
653 }
654}
655
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::builder::{SSTableBuilder, SSTableBuilderOptions};
    use tempfile::tempdir;

    /// Writes a table at `path` containing `key{id:05}` -> `value{id:05}` for
    /// every `id` in `ids`, added in the order given.
    fn write_table(
        path: &Path,
        options: SSTableBuilderOptions,
        ids: impl IntoIterator<Item = usize>,
    ) {
        let mut builder = SSTableBuilder::new(path, options).unwrap();
        for id in ids {
            let key = format!("key{:05}", id);
            let value = format!("value{:05}", id);
            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
        }
        builder.finish().unwrap();
    }

    /// A freshly written table reports as many blocks as its metadata does.
    #[test]
    fn test_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");
        let options = SSTableBuilderOptions {
            block_size: 256,
            filter_policy: None,
            ..Default::default()
        };
        write_table(&path, options, 0..100);

        let table = SSTable::open(&path).unwrap();

        assert_eq!(table.num_blocks(), table.metadata.num_data_blocks);
    }

    /// Point lookups find stored keys and miss absent ones.
    #[test]
    fn test_get() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_get.sst");
        let options = SSTableBuilderOptions {
            block_size: 256,
            filter_policy: None,
            ..Default::default()
        };
        write_table(&path, options, 0..100);

        let table = SSTable::open(&path).unwrap();
        let read_opts = ReadOptions::default();

        let hit = table.get(b"key00050", &read_opts).unwrap();
        assert!(hit.is_some());
        assert_eq!(hit.unwrap(), b"value00050");

        let miss = table.get(b"nonexistent", &read_opts).unwrap();
        assert!(miss.is_none());
    }

    /// The cache returns what was inserted and nothing for other offsets.
    #[test]
    fn test_block_cache() {
        let cache = BlockCache::new(100);

        cache.insert(
            1,
            0,
            CachedBlock {
                data: vec![1, 2, 3],
                block_type: BlockType::Uncompressed,
                decompressed: vec![1, 2, 3],
            },
        );

        let cached = cache.get(1, 0);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().data, vec![1, 2, 3]);

        let missing = cache.get(1, 100);
        assert!(missing.is_none());
    }

    /// A full scan over a multi-block table yields every entry, in order.
    #[test]
    fn test_sstable_iterator_full_scan() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_iter.sst");
        // Small blocks force the scan to cross block boundaries.
        let options = SSTableBuilderOptions {
            block_size: 64,
            filter_policy: None,
            ..Default::default()
        };

        let n = 200;
        write_table(&path, options, 0..n);

        let table = SSTable::open(&path).unwrap();
        eprintln!("num_blocks = {}", table.num_blocks());
        assert!(table.num_blocks() > 1, "Need multiple blocks for this test");

        let mut iter = table.iter();
        let mut count = 0;
        let mut prev_key: Option<Vec<u8>> = None;

        while iter.valid() {
            let key = iter.key().unwrap().to_vec();
            let value = iter.value().unwrap().to_vec();

            assert_eq!(
                String::from_utf8_lossy(&key),
                format!("key{:05}", count),
                "key mismatch at entry {}",
                count
            );
            assert_eq!(
                String::from_utf8_lossy(&value),
                format!("value{:05}", count),
                "value mismatch at entry {}",
                count
            );

            if let Some(ref pk) = prev_key {
                assert!(key > *pk, "keys not in order at {}", count);
            }
            prev_key = Some(key);

            count += 1;
            iter.next();
        }

        assert_eq!(count, n, "iterator did not return all {} entries", n);
    }

    /// `seek` lands on exact keys, on the next key after a gap, and past the end.
    #[test]
    fn test_sstable_iterator_seek() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_seek.sst");
        let options = SSTableBuilderOptions {
            block_size: 64,
            filter_policy: None,
            ..Default::default()
        };
        // Only even ids are stored, so odd targets fall into gaps.
        write_table(&path, options, (0..100).step_by(2));

        let table = SSTable::open(&path).unwrap();
        let mut iter = table.iter();

        iter.seek(b"key00010");
        assert!(iter.valid());
        assert_eq!(iter.key().unwrap(), b"key00010");

        iter.seek(b"key00011");
        assert!(iter.valid());
        assert_eq!(iter.key().unwrap(), b"key00012");

        iter.seek(b"key99999");
        assert!(!iter.valid());

        iter.seek_to_first();
        assert!(iter.valid());
        assert_eq!(iter.key().unwrap(), b"key00000");
    }

    /// Bounded ranges are half-open; an unbounded range covers the whole table.
    #[test]
    fn test_range_iterator() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_range.sst");
        let options = SSTableBuilderOptions {
            block_size: 64,
            filter_policy: None,
            ..Default::default()
        };
        write_table(&path, options, 0..100);

        let table = SSTable::open(&path).unwrap();

        let mut range = table.range(Some(b"key00010"), Some(b"key00020"));
        let mut count = 0;

        while range.valid() {
            let key = range.key().unwrap();
            assert!(key >= b"key00010".as_slice());
            assert!(key < b"key00020".as_slice());
            count += 1;
            range.next();
        }

        assert_eq!(count, 10, "expected 10 keys in range [10, 20)");
        assert!(range.exhausted());

        let mut range = table.range(None, None);
        let mut total = 0;
        while range.valid() {
            total += 1;
            range.next();
        }
        assert_eq!(total, 100);
    }
}