1use std::cmp::Ordering;
28use std::collections::HashMap;
29use std::fs::File;
30use std::io::{Read, Seek, SeekFrom};
31use std::path::{Path, PathBuf};
32use std::sync::Arc;
33
34use memmap2::{Mmap, MmapOptions};
35use parking_lot::RwLock;
36
37use super::block::{Block, BlockHandle, BlockIterator, BlockType};
38use super::filter::FilterReader;
39use super::format::{Footer, Header, Section, SectionType, SSTableFormat, HEADER_SIZE};
40
41pub struct CachedBlock {
43 pub data: Vec<u8>,
45 pub block_type: BlockType,
47 pub decompressed: Vec<u8>,
49}
50
51pub struct BlockCache {
53 entries: RwLock<HashMap<(u64, u64), Arc<CachedBlock>>>,
55 capacity: usize,
57}
58
59impl BlockCache {
60 pub fn new(capacity: usize) -> Self {
62 Self {
63 entries: RwLock::new(HashMap::with_capacity(capacity)),
64 capacity,
65 }
66 }
67
68 pub fn get(&self, file_id: u64, offset: u64) -> Option<Arc<CachedBlock>> {
70 self.entries.read().get(&(file_id, offset)).cloned()
71 }
72
73 pub fn insert(&self, file_id: u64, offset: u64, block: CachedBlock) -> Arc<CachedBlock> {
75 let block = Arc::new(block);
76 let mut entries = self.entries.write();
77
78 if entries.len() >= self.capacity {
80 entries.clear();
81 }
82
83 entries.insert((file_id, offset), block.clone());
84 block
85 }
86}
87
/// Knobs controlling how a single read touches the table.
#[derive(Debug, Clone)]
pub struct ReadOptions {
    /// Check each block's stored CRC32 against its payload before decoding it.
    pub verify_checksums: bool,
    /// Insert blocks read from the file into the table's block cache, when it has one.
    pub fill_cache: bool,
    /// Ask the table's filter (when present) whether a key may exist before reading blocks.
    pub use_filter: bool,
}

impl Default for ReadOptions {
    /// Everything switched on: checksums verified, cache filled, filter consulted.
    fn default() -> Self {
        let on = true;
        ReadOptions {
            verify_checksums: on,
            fill_cache: on,
            use_filter: on,
        }
    }
}
108
109pub struct SSTable {
111 path: PathBuf,
113 file_id: u64,
115 mmap: Mmap,
117 header: Header,
119 footer: Footer,
121 index: Vec<u8>,
123 index_entries: Vec<IndexEntry>,
125 filter: Option<FilterReader>,
127 metadata: TableMetadata,
129 cache: Option<Arc<BlockCache>>,
131}
132
133#[derive(Debug, Clone)]
135struct IndexEntry {
136 largest_key: Vec<u8>,
138 handle: BlockHandle,
140}
141
/// Summary facts about an opened table.
#[derive(Debug, Clone)]
pub struct TableMetadata {
    /// Size of the table file in bytes.
    pub file_size: u64,
    /// Number of data blocks listed in the index.
    pub num_data_blocks: usize,
    /// Key reported as the table's lower bound (set when the table is opened);
    /// `None` when the index lists no blocks.
    pub smallest_key: Option<Vec<u8>>,
    /// Key reported as the table's upper bound (set when the table is opened);
    /// `None` when the index lists no blocks.
    pub largest_key: Option<Vec<u8>>,
}
154
155impl SSTable {
156 pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
158 Self::open_with_cache(path, None)
159 }
160
161 pub fn open_with_cache<P: AsRef<Path>>(
163 path: P,
164 cache: Option<Arc<BlockCache>>,
165 ) -> std::io::Result<Self> {
166 let path = path.as_ref();
167 let file = File::open(path)?;
168 let file_size = file.metadata()?.len();
169
170 let mmap = unsafe { MmapOptions::new().map(&file)? };
172
173 let file_id = {
175 use std::hash::{Hash, Hasher};
176 let mut hasher = std::collections::hash_map::DefaultHasher::new();
177 path.hash(&mut hasher);
178 hasher.finish()
179 };
180
181 if mmap.len() < HEADER_SIZE {
183 return Err(std::io::Error::new(
184 std::io::ErrorKind::InvalidData,
185 "File too small for SSTable header",
186 ));
187 }
188
189 let header = Header::decode(&mmap[..HEADER_SIZE]).ok_or_else(|| {
190 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable header")
191 })?;
192
193 let footer_offset = header.footer_offset as usize;
195 if footer_offset >= mmap.len() {
196 return Err(std::io::Error::new(
197 std::io::ErrorKind::InvalidData,
198 "Footer offset beyond file",
199 ));
200 }
201
202 let footer = Footer::decode(&mmap[footer_offset..], header.num_sections).ok_or_else(|| {
203 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable footer")
204 })?;
205
206 let index_section = footer
208 .sections
209 .iter()
210 .find(|s| s.section_type == SectionType::Index)
211 .ok_or_else(|| {
212 std::io::Error::new(std::io::ErrorKind::InvalidData, "Missing index section")
213 })?;
214
215 let index_start = index_section.offset as usize;
216 let index_end = index_start + index_section.size as usize;
217 let index = mmap[index_start..index_end].to_vec();
218
219 let index_entries = Self::parse_index(&index)?;
221
222 let filter = footer
224 .sections
225 .iter()
226 .find(|s| s.section_type == SectionType::Filter)
227 .and_then(|section| {
228 let start = section.offset as usize;
229 let end = start + section.size as usize;
230 FilterReader::from_bytes(&mmap[start..end])
231 });
232
233 let metadata = TableMetadata {
235 file_size,
236 num_data_blocks: index_entries.len(),
237 smallest_key: index_entries.first().map(|e| e.largest_key.clone()),
238 largest_key: index_entries.last().map(|e| e.largest_key.clone()),
239 };
240
241 Ok(Self {
242 path: path.to_path_buf(),
243 file_id,
244 mmap,
245 header,
246 footer,
247 index,
248 index_entries,
249 filter,
250 metadata,
251 cache,
252 })
253 }
254
255 fn parse_index(data: &[u8]) -> std::io::Result<Vec<IndexEntry>> {
257 let mut entries = Vec::new();
258 let block = Block::new(data.to_vec()).ok_or_else(|| {
259 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid index block")
260 })?;
261 let mut iter = block.iter();
262
263 while iter.valid() {
264 let key = iter.key().to_vec();
265 let value = iter.value();
266
267 let (handle, _bytes_read) = BlockHandle::decode(value).ok_or_else(|| {
268 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid block handle")
269 })?;
270
271 entries.push(IndexEntry {
272 largest_key: key,
273 handle,
274 });
275
276 iter.next();
277 }
278
279 Ok(entries)
280 }
281
282 pub fn get(&self, key: &[u8], options: &ReadOptions) -> std::io::Result<Option<Vec<u8>>> {
284 if options.use_filter {
286 if let Some(ref filter) = self.filter {
287 if !filter.may_contain(key) {
288 return Ok(None);
289 }
290 }
291 }
292
293 let block_idx = self.find_block_for_key(key);
295 if block_idx >= self.index_entries.len() {
296 return Ok(None);
297 }
298
299 let block_data = self.read_block(&self.index_entries[block_idx].handle, options)?;
301 let block = Block::new(block_data).ok_or_else(|| {
302 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid data block")
303 })?;
304
305 let iter = block.seek(key);
306 if iter.valid() && iter.key() == key {
307 Ok(Some(iter.value().to_vec()))
308 } else {
309 Ok(None)
310 }
311 }
312
313 fn find_block_for_key(&self, key: &[u8]) -> usize {
315 self.index_entries
317 .binary_search_by(|entry| {
318 if entry.largest_key.as_slice() < key {
319 Ordering::Less
320 } else {
321 Ordering::Greater
322 }
323 })
324 .unwrap_or_else(|i| i)
325 }
326
327 fn read_block(
329 &self,
330 handle: &BlockHandle,
331 options: &ReadOptions,
332 ) -> std::io::Result<Vec<u8>> {
333 let offset = handle.offset();
334 let size = handle.size();
335
336 if let Some(ref cache) = self.cache {
338 if let Some(block) = cache.get(self.file_id, offset) {
339 return Ok(block.decompressed.clone());
340 }
341 }
342
343 let start = offset as usize;
345 let end = start + size as usize;
346
347 if end + 5 > self.mmap.len() {
348 return Err(std::io::Error::new(
349 std::io::ErrorKind::InvalidData,
350 "Block extends beyond file",
351 ));
352 }
353
354 let block_data = &self.mmap[start..end];
355 let block_type = BlockType::from_u8(self.mmap[end]);
356 let stored_checksum = u32::from_le_bytes([
357 self.mmap[end + 1],
358 self.mmap[end + 2],
359 self.mmap[end + 3],
360 self.mmap[end + 4],
361 ]);
362
363 if options.verify_checksums {
365 let computed_checksum = crc32fast::hash(block_data);
366 if computed_checksum != stored_checksum {
367 return Err(std::io::Error::new(
368 std::io::ErrorKind::InvalidData,
369 "Block checksum mismatch",
370 ));
371 }
372 }
373
374 let decompressed = match block_type {
376 BlockType::Uncompressed => block_data.to_vec(),
377 BlockType::Lz4 => lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
378 std::io::Error::new(std::io::ErrorKind::InvalidData, format!("LZ4 error: {}", e))
379 })?,
380 BlockType::Zstd => zstd::decode_all(block_data).map_err(|e| {
381 std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Zstd error: {}", e))
382 })?,
383 BlockType::Snappy => {
384 return Err(std::io::Error::new(
385 std::io::ErrorKind::InvalidData,
386 "Snappy not supported",
387 ))
388 }
389 };
390
391 if options.fill_cache {
393 if let Some(ref cache) = self.cache {
394 cache.insert(
395 self.file_id,
396 offset,
397 CachedBlock {
398 data: block_data.to_vec(),
399 block_type,
400 decompressed: decompressed.clone(),
401 },
402 );
403 }
404 }
405
406 Ok(decompressed)
407 }
408
409 pub fn iter(&self) -> SSTableIterator {
411 SSTableIterator::new(self)
412 }
413
414 pub fn range(
416 &self,
417 start: Option<&[u8]>,
418 end: Option<&[u8]>,
419 ) -> RangeIterator {
420 RangeIterator::new(self, start, end)
421 }
422
423 pub fn metadata(&self) -> &TableMetadata {
425 &self.metadata
426 }
427
428 pub fn path(&self) -> &Path {
430 &self.path
431 }
432
433 pub fn num_blocks(&self) -> usize {
435 self.index_entries.len()
436 }
437
438 pub fn may_contain(&self, key: &[u8]) -> bool {
440 self.filter
441 .as_ref()
442 .map(|f| f.may_contain(key))
443 .unwrap_or(true)
444 }
445}
446
447pub struct SSTableIterator<'a> {
449 table: &'a SSTable,
450 block_idx: usize,
452 block_data: Option<Vec<u8>>,
454 block_iter: Option<BlockIterator<'a>>,
456 options: ReadOptions,
458 valid: bool,
460}
461
462impl<'a> SSTableIterator<'a> {
463 fn new(table: &'a SSTable) -> Self {
464 let mut iter = Self {
465 table,
466 block_idx: 0,
467 block_data: None,
468 block_iter: None,
469 options: ReadOptions::default(),
470 valid: false,
471 };
472 iter.load_block();
473 iter
474 }
475
476 fn load_block(&mut self) {
478 if self.block_idx >= self.table.index_entries.len() {
479 self.valid = false;
480 return;
481 }
482
483 let handle = &self.table.index_entries[self.block_idx].handle;
484 match self.table.read_block(handle, &self.options) {
485 Ok(data) => {
486 self.block_data = Some(data);
487 self.valid = true;
488 }
489 Err(_) => {
490 self.valid = false;
491 }
492 }
493 }
494
495 pub fn valid(&self) -> bool {
497 self.valid
498 }
499
500 pub fn key(&self) -> Option<&[u8]> {
502 if !self.valid {
503 return None;
504 }
505 self.block_data.as_ref().map(|_| &b""[..])
508 }
509
510 pub fn value(&self) -> Option<&[u8]> {
512 if !self.valid {
513 return None;
514 }
515 self.block_data.as_ref().map(|_| &b""[..])
516 }
517
518 pub fn next(&mut self) {
520 self.block_idx += 1;
524 self.load_block();
525 }
526
527 pub fn seek(&mut self, target: &[u8]) {
529 self.block_idx = self.table.find_block_for_key(target);
531 self.load_block();
532 }
534}
535
536pub struct RangeIterator<'a> {
538 table: &'a SSTable,
539 start: Option<Vec<u8>>,
540 end: Option<Vec<u8>>,
541 current_block: usize,
542 exhausted: bool,
543}
544
545impl<'a> RangeIterator<'a> {
546 fn new(table: &'a SSTable, start: Option<&[u8]>, end: Option<&[u8]>) -> Self {
547 let start_block = start
548 .map(|k| table.find_block_for_key(k))
549 .unwrap_or(0);
550
551 Self {
552 table,
553 start: start.map(|s| s.to_vec()),
554 end: end.map(|e| e.to_vec()),
555 current_block: start_block,
556 exhausted: false,
557 }
558 }
559
560 pub fn exhausted(&self) -> bool {
562 self.exhausted
563 }
564}
565
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::builder::{SSTableBuilder, SSTableBuilderOptions};
    use tempfile::tempdir;

    /// Number of entries written by [`build_table`].
    const NUM_ENTRIES: usize = 100;

    fn entry_key(i: usize) -> String {
        format!("key{:05}", i)
    }

    fn entry_value(i: usize) -> String {
        format!("value{:05}", i)
    }

    /// Writes `NUM_ENTRIES` pairs `keyNNNNN -> valueNNNNN` in ascending key order to `path`.
    ///
    /// Uses 256-byte blocks and no filter, so the data spans several blocks.
    fn build_table(path: &Path) {
        let options = SSTableBuilderOptions {
            // Small blocks so the table spans several of them.
            block_size: 256,
            filter_policy: None,
            ..Default::default()
        };

        let mut builder = SSTableBuilder::new(path, options).unwrap();
        for i in 0..NUM_ENTRIES {
            builder
                .add(entry_key(i).as_bytes(), entry_value(i).as_bytes())
                .unwrap();
        }
        builder.finish().unwrap();
    }

    #[test]
    fn test_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");
        build_table(&path);

        let table = SSTable::open(&path).unwrap();
        let read_opts = ReadOptions::default();

        // ~18 bytes of key+value per entry cannot fit 100 entries into one 256-byte block.
        assert!(table.num_blocks() > 1);
        assert_eq!(table.num_blocks(), table.metadata().num_data_blocks);

        for i in 0..NUM_ENTRIES {
            let key = entry_key(i);
            let value = entry_value(i);
            let found = table.get(key.as_bytes(), &read_opts).unwrap();
            assert_eq!(found.as_deref(), Some(value.as_bytes()), "lookup of {}", key);
        }
    }

    #[test]
    fn test_get() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_get.sst");
        build_table(&path);

        let table = SSTable::open(&path).unwrap();
        let read_opts = ReadOptions::default();

        let result = table.get(b"key00050", &read_opts).unwrap();
        assert_eq!(result.as_deref(), Some(&b"value00050"[..]));

        // Absent keys sorting after every key, before every key, and between two keys.
        assert!(table.get(b"nonexistent", &read_opts).unwrap().is_none());
        assert!(table.get(b"a", &read_opts).unwrap().is_none());
        assert!(table.get(b"key00050a", &read_opts).unwrap().is_none());
    }

    #[test]
    fn test_get_with_cache() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_cache.sst");
        build_table(&path);

        let cache = Arc::new(BlockCache::new(16));
        let table = SSTable::open_with_cache(&path, Some(Arc::clone(&cache))).unwrap();
        let read_opts = ReadOptions::default();

        // The repeated lookup may be served from the cache and must agree with the first.
        for _ in 0..2 {
            let result = table.get(b"key00042", &read_opts).unwrap();
            assert_eq!(result.as_deref(), Some(&b"value00042"[..]));
        }
    }

    #[test]
    fn test_block_cache() {
        let cache = BlockCache::new(100);

        let block = CachedBlock {
            data: vec![1, 2, 3],
            block_type: BlockType::Uncompressed,
            decompressed: vec![1, 2, 3],
        };

        cache.insert(1, 0, block);

        let cached = cache.get(1, 0);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().data, vec![1, 2, 3]);

        let missing = cache.get(1, 100);
        assert!(missing.is_none());
    }
}