1use std::cmp::Ordering;
25use std::collections::HashMap;
26use std::fs::File;
27use std::io::{Read, Seek, SeekFrom};
28use std::path::{Path, PathBuf};
29use std::sync::Arc;
30
31use memmap2::{Mmap, MmapOptions};
32use parking_lot::RwLock;
33
34use super::block::{Block, BlockHandle, BlockIterator, BlockType};
35use super::filter::FilterReader;
36use super::format::{Footer, Header, Section, SectionType, SSTableFormat, HEADER_SIZE};
37
38pub struct CachedBlock {
40 pub data: Vec<u8>,
42 pub block_type: BlockType,
44 pub decompressed: Vec<u8>,
46}
47
48pub struct BlockCache {
50 entries: RwLock<HashMap<(u64, u64), Arc<CachedBlock>>>,
52 capacity: usize,
54}
55
56impl BlockCache {
57 pub fn new(capacity: usize) -> Self {
59 Self {
60 entries: RwLock::new(HashMap::with_capacity(capacity)),
61 capacity,
62 }
63 }
64
65 pub fn get(&self, file_id: u64, offset: u64) -> Option<Arc<CachedBlock>> {
67 self.entries.read().get(&(file_id, offset)).cloned()
68 }
69
70 pub fn insert(&self, file_id: u64, offset: u64, block: CachedBlock) -> Arc<CachedBlock> {
72 let block = Arc::new(block);
73 let mut entries = self.entries.write();
74
75 if entries.len() >= self.capacity {
77 entries.clear();
78 }
79
80 entries.insert((file_id, offset), block.clone());
81 block
82 }
83}
84
/// Switches that control how a single read is carried out.
#[derive(Debug, Clone)]
pub struct ReadOptions {
    /// Compare each block's stored checksum against a freshly computed one
    /// when the block is read from the file.
    pub verify_checksums: bool,
    /// Put blocks read from the file into the block cache, when the table
    /// has one.
    pub fill_cache: bool,
    /// Consult the table's filter, when present, before looking at any
    /// data block.
    pub use_filter: bool,
}

impl Default for ReadOptions {
    /// Returns options with every switch turned on.
    fn default() -> Self {
        ReadOptions {
            use_filter: true,
            fill_cache: true,
            verify_checksums: true,
        }
    }
}
105
106pub struct SSTable {
108 path: PathBuf,
110 file_id: u64,
112 mmap: Mmap,
114 header: Header,
116 footer: Footer,
118 index: Vec<u8>,
120 index_entries: Vec<IndexEntry>,
122 filter: Option<FilterReader>,
124 metadata: TableMetadata,
126 cache: Option<Arc<BlockCache>>,
128}
129
130#[derive(Debug, Clone)]
132struct IndexEntry {
133 largest_key: Vec<u8>,
135 handle: BlockHandle,
137}
138
/// Summary information about an opened table.
#[derive(Debug, Clone)]
pub struct TableMetadata {
    /// Size of the underlying file in bytes.
    pub file_size: u64,
    /// Number of data blocks listed in the index.
    pub num_data_blocks: usize,
    /// Intended to hold the smallest key stored in the table; `None` when
    /// it is not known (for example, for a table without data blocks).
    pub smallest_key: Option<Vec<u8>>,
    /// Key of the last index entry; `None` for a table without data blocks.
    pub largest_key: Option<Vec<u8>>,
}
151
152impl SSTable {
153 pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
155 Self::open_with_cache(path, None)
156 }
157
158 pub fn open_with_cache<P: AsRef<Path>>(
160 path: P,
161 cache: Option<Arc<BlockCache>>,
162 ) -> std::io::Result<Self> {
163 let path = path.as_ref();
164 let file = File::open(path)?;
165 let file_size = file.metadata()?.len();
166
167 let mmap = unsafe { MmapOptions::new().map(&file)? };
169
170 let file_id = {
172 use std::hash::{Hash, Hasher};
173 let mut hasher = std::collections::hash_map::DefaultHasher::new();
174 path.hash(&mut hasher);
175 hasher.finish()
176 };
177
178 if mmap.len() < HEADER_SIZE {
180 return Err(std::io::Error::new(
181 std::io::ErrorKind::InvalidData,
182 "File too small for SSTable header",
183 ));
184 }
185
186 let header = Header::decode(&mmap[..HEADER_SIZE]).ok_or_else(|| {
187 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable header")
188 })?;
189
190 let footer_offset = header.footer_offset as usize;
192 if footer_offset >= mmap.len() {
193 return Err(std::io::Error::new(
194 std::io::ErrorKind::InvalidData,
195 "Footer offset beyond file",
196 ));
197 }
198
199 let footer = Footer::decode(&mmap[footer_offset..], header.num_sections).ok_or_else(|| {
200 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable footer")
201 })?;
202
203 let index_section = footer
205 .sections
206 .iter()
207 .find(|s| s.section_type == SectionType::Index)
208 .ok_or_else(|| {
209 std::io::Error::new(std::io::ErrorKind::InvalidData, "Missing index section")
210 })?;
211
212 let index_start = index_section.offset as usize;
213 let index_end = index_start + index_section.size as usize;
214 let index = mmap[index_start..index_end].to_vec();
215
216 let index_entries = Self::parse_index(&index)?;
218
219 let filter = footer
221 .sections
222 .iter()
223 .find(|s| s.section_type == SectionType::Filter)
224 .and_then(|section| {
225 let start = section.offset as usize;
226 let end = start + section.size as usize;
227 FilterReader::from_bytes(&mmap[start..end])
228 });
229
230 let metadata = TableMetadata {
232 file_size,
233 num_data_blocks: index_entries.len(),
234 smallest_key: index_entries.first().map(|e| e.largest_key.clone()),
235 largest_key: index_entries.last().map(|e| e.largest_key.clone()),
236 };
237
238 Ok(Self {
239 path: path.to_path_buf(),
240 file_id,
241 mmap,
242 header,
243 footer,
244 index,
245 index_entries,
246 filter,
247 metadata,
248 cache,
249 })
250 }
251
252 fn parse_index(data: &[u8]) -> std::io::Result<Vec<IndexEntry>> {
254 let mut entries = Vec::new();
255 let block = Block::new(data.to_vec()).ok_or_else(|| {
256 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid index block")
257 })?;
258 let mut iter = block.iter();
259
260 while iter.valid() {
261 let key = iter.key().to_vec();
262 let value = iter.value();
263
264 let (handle, _bytes_read) = BlockHandle::decode(value).ok_or_else(|| {
265 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid block handle")
266 })?;
267
268 entries.push(IndexEntry {
269 largest_key: key,
270 handle,
271 });
272
273 iter.next();
274 }
275
276 Ok(entries)
277 }
278
279 pub fn get(&self, key: &[u8], options: &ReadOptions) -> std::io::Result<Option<Vec<u8>>> {
281 if options.use_filter {
283 if let Some(ref filter) = self.filter {
284 if !filter.may_contain(key) {
285 return Ok(None);
286 }
287 }
288 }
289
290 let block_idx = self.find_block_for_key(key);
292 if block_idx >= self.index_entries.len() {
293 return Ok(None);
294 }
295
296 let block_data = self.read_block(&self.index_entries[block_idx].handle, options)?;
298 let block = Block::new(block_data).ok_or_else(|| {
299 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid data block")
300 })?;
301
302 let iter = block.seek(key);
303 if iter.valid() && iter.key() == key {
304 Ok(Some(iter.value().to_vec()))
305 } else {
306 Ok(None)
307 }
308 }
309
310 fn find_block_for_key(&self, key: &[u8]) -> usize {
312 self.index_entries
314 .binary_search_by(|entry| {
315 if entry.largest_key.as_slice() < key {
316 Ordering::Less
317 } else {
318 Ordering::Greater
319 }
320 })
321 .unwrap_or_else(|i| i)
322 }
323
324 fn read_block(
326 &self,
327 handle: &BlockHandle,
328 options: &ReadOptions,
329 ) -> std::io::Result<Vec<u8>> {
330 let offset = handle.offset();
331 let size = handle.size();
332
333 if let Some(ref cache) = self.cache {
335 if let Some(block) = cache.get(self.file_id, offset) {
336 return Ok(block.decompressed.clone());
337 }
338 }
339
340 let start = offset as usize;
342 let end = start + size as usize;
343
344 if end + 5 > self.mmap.len() {
345 return Err(std::io::Error::new(
346 std::io::ErrorKind::InvalidData,
347 "Block extends beyond file",
348 ));
349 }
350
351 let block_data = &self.mmap[start..end];
352 let block_type = BlockType::from_u8(self.mmap[end]);
353 let stored_checksum = u32::from_le_bytes([
354 self.mmap[end + 1],
355 self.mmap[end + 2],
356 self.mmap[end + 3],
357 self.mmap[end + 4],
358 ]);
359
360 if options.verify_checksums {
362 let computed_checksum = crc32fast::hash(block_data);
363 if computed_checksum != stored_checksum {
364 return Err(std::io::Error::new(
365 std::io::ErrorKind::InvalidData,
366 "Block checksum mismatch",
367 ));
368 }
369 }
370
371 let decompressed = match block_type {
373 BlockType::Uncompressed => block_data.to_vec(),
374 BlockType::Lz4 => lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
375 std::io::Error::new(std::io::ErrorKind::InvalidData, format!("LZ4 error: {}", e))
376 })?,
377 BlockType::Zstd => zstd::decode_all(block_data).map_err(|e| {
378 std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Zstd error: {}", e))
379 })?,
380 BlockType::Snappy => {
381 return Err(std::io::Error::new(
382 std::io::ErrorKind::InvalidData,
383 "Snappy not supported",
384 ))
385 }
386 };
387
388 if options.fill_cache {
390 if let Some(ref cache) = self.cache {
391 cache.insert(
392 self.file_id,
393 offset,
394 CachedBlock {
395 data: block_data.to_vec(),
396 block_type,
397 decompressed: decompressed.clone(),
398 },
399 );
400 }
401 }
402
403 Ok(decompressed)
404 }
405
406 pub fn iter(&self) -> SSTableIterator {
408 SSTableIterator::new(self)
409 }
410
411 pub fn range(
413 &self,
414 start: Option<&[u8]>,
415 end: Option<&[u8]>,
416 ) -> RangeIterator {
417 RangeIterator::new(self, start, end)
418 }
419
420 pub fn metadata(&self) -> &TableMetadata {
422 &self.metadata
423 }
424
425 pub fn path(&self) -> &Path {
427 &self.path
428 }
429
430 pub fn num_blocks(&self) -> usize {
432 self.index_entries.len()
433 }
434
435 pub fn may_contain(&self, key: &[u8]) -> bool {
437 self.filter
438 .as_ref()
439 .map(|f| f.may_contain(key))
440 .unwrap_or(true)
441 }
442}
443
444pub struct SSTableIterator<'a> {
446 table: &'a SSTable,
447 block_idx: usize,
449 block_data: Option<Vec<u8>>,
451 block_iter: Option<BlockIterator<'a>>,
453 options: ReadOptions,
455 valid: bool,
457}
458
459impl<'a> SSTableIterator<'a> {
460 fn new(table: &'a SSTable) -> Self {
461 let mut iter = Self {
462 table,
463 block_idx: 0,
464 block_data: None,
465 block_iter: None,
466 options: ReadOptions::default(),
467 valid: false,
468 };
469 iter.load_block();
470 iter
471 }
472
473 fn load_block(&mut self) {
475 if self.block_idx >= self.table.index_entries.len() {
476 self.valid = false;
477 return;
478 }
479
480 let handle = &self.table.index_entries[self.block_idx].handle;
481 match self.table.read_block(handle, &self.options) {
482 Ok(data) => {
483 self.block_data = Some(data);
484 self.valid = true;
485 }
486 Err(_) => {
487 self.valid = false;
488 }
489 }
490 }
491
492 pub fn valid(&self) -> bool {
494 self.valid
495 }
496
497 pub fn key(&self) -> Option<&[u8]> {
499 if !self.valid {
500 return None;
501 }
502 self.block_data.as_ref().map(|_| &b""[..])
505 }
506
507 pub fn value(&self) -> Option<&[u8]> {
509 if !self.valid {
510 return None;
511 }
512 self.block_data.as_ref().map(|_| &b""[..])
513 }
514
515 pub fn next(&mut self) {
517 self.block_idx += 1;
521 self.load_block();
522 }
523
524 pub fn seek(&mut self, target: &[u8]) {
526 self.block_idx = self.table.find_block_for_key(target);
528 self.load_block();
529 }
531}
532
533pub struct RangeIterator<'a> {
535 table: &'a SSTable,
536 start: Option<Vec<u8>>,
537 end: Option<Vec<u8>>,
538 current_block: usize,
539 exhausted: bool,
540}
541
542impl<'a> RangeIterator<'a> {
543 fn new(table: &'a SSTable, start: Option<&[u8]>, end: Option<&[u8]>) -> Self {
544 let start_block = start
545 .map(|k| table.find_block_for_key(k))
546 .unwrap_or(0);
547
548 Self {
549 table,
550 start: start.map(|s| s.to_vec()),
551 end: end.map(|e| e.to_vec()),
552 current_block: start_block,
553 exhausted: false,
554 }
555 }
556
557 pub fn exhausted(&self) -> bool {
559 self.exhausted
560 }
561}
562
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::builder::{SSTableBuilder, SSTableBuilderOptions};
    use tempfile::tempdir;

    /// Writes a table named `file_name` into `dir` holding 100 entries
    /// `keyNNNNN` -> `valueNNNNN`, using small blocks and no filter, and
    /// returns its path.
    fn build_sample_table(dir: &tempfile::TempDir, file_name: &str) -> PathBuf {
        let path = dir.path().join(file_name);

        let builder_options = SSTableBuilderOptions {
            block_size: 256,
            filter_policy: None,
            ..Default::default()
        };
        let mut builder = SSTableBuilder::new(&path, builder_options).unwrap();

        for n in 0..100 {
            let k = format!("key{:05}", n);
            let v = format!("value{:05}", n);
            builder.add(k.as_bytes(), v.as_bytes()).unwrap();
        }
        builder.finish().unwrap();

        path
    }

    /// A freshly built table opens and reports a consistent block count.
    #[test]
    fn test_roundtrip() {
        let dir = tempdir().unwrap();
        let path = build_sample_table(&dir, "test.sst");

        let table = SSTable::open(&path).unwrap();

        assert_eq!(table.num_blocks(), table.metadata.num_data_blocks);
    }

    /// Point lookups find stored keys and miss absent ones.
    #[test]
    fn test_get() {
        let dir = tempdir().unwrap();
        let path = build_sample_table(&dir, "test_get.sst");

        let table = SSTable::open(&path).unwrap();
        let read_opts = ReadOptions::default();

        let hit = table.get(b"key00050", &read_opts).unwrap();
        assert_eq!(hit.as_deref(), Some(&b"value00050"[..]));

        let miss = table.get(b"nonexistent", &read_opts).unwrap();
        assert!(miss.is_none());
    }

    /// The cache returns what was inserted and nothing for unknown keys.
    #[test]
    fn test_block_cache() {
        let cache = BlockCache::new(100);

        cache.insert(
            1,
            0,
            CachedBlock {
                data: vec![1, 2, 3],
                block_type: BlockType::Uncompressed,
                decompressed: vec![1, 2, 3],
            },
        );

        let found = cache.get(1, 0);
        assert!(found.is_some());
        assert_eq!(found.unwrap().data, vec![1, 2, 3]);

        assert!(cache.get(1, 100).is_none());
    }
}