use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;

use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};

/// Magic number identifying the SSTable format (ASCII "STB3").
pub const SSTABLE_MAGIC: u32 = 0x53544233;

/// Uncompressed size at which a data block is cut and flushed.
pub const BLOCK_SIZE: usize = 16 * 1024;

/// One sparse index entry is kept per this many blocks.
pub const SPARSE_INDEX_INTERVAL: usize = 16;

/// Default size, in bytes, of a trained compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Bloom filter budget; 10 bits/key gives roughly a 1% false positive rate.
pub const BLOOM_BITS_PER_KEY: usize = 10;

/// Number of hash probes per key (k ≈ 0.69 · bits_per_key is optimal).
pub const BLOOM_HASH_COUNT: usize = 7;

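/// In-memory Bloom filter backing negative lookups. Probes use double
/// hashing: probe i tests bit (h1 + i·h2) mod num_bits.
///
/// A hedged usage sketch (illustrative, not compiled as a doc test):
///
/// ```ignore
/// let mut bloom = BloomFilter::new(1_000, BLOOM_BITS_PER_KEY);
/// bloom.insert(b"rust");
/// assert!(bloom.may_contain(b"rust")); // never a false negative
/// // Absent keys usually miss, but may rarely report a false positive.
/// ```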
#[derive(Debug, Clone)]
pub struct BloomFilter {
    bits: Vec<u64>,
    num_bits: usize,
    num_hashes: usize,
}

impl BloomFilter {
    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: vec![0u64; num_words],
            num_bits,
            num_hashes: BLOOM_HASH_COUNT,
        }
    }

    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let mut reader = data;
        let num_bits = reader.read_u32::<LittleEndian>()? as usize;
        let num_hashes = reader.read_u32::<LittleEndian>()? as usize;
        let num_words = reader.read_u32::<LittleEndian>()? as usize;

        if reader.len() < num_words * 8 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let mut bits = Vec::with_capacity(num_words);
        for _ in 0..num_words {
            bits.push(reader.read_u64::<LittleEndian>()?);
        }

        Ok(Self {
            bits,
            num_bits,
            num_hashes,
        })
    }

    pub fn to_bytes(&self) -> Vec<u8> {
        // Writes to a Vec cannot fail, so the unwraps below are safe.
        let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
        data.write_u32::<LittleEndian>(self.num_bits as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.num_hashes as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.bits.len() as u32)
            .unwrap();
        for &word in &self.bits {
            data.write_u64::<LittleEndian>(word).unwrap();
        }
        data
    }

    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits[word_idx] |= 1u64 << bit_idx;
            }
        }
    }

    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
                return false;
            }
        }
        true
    }

    /// Serialized size: 12-byte header plus one u64 per word.
    pub fn size_bytes(&self) -> usize {
        12 + self.bits.len() * 8
    }

    /// Two independent 64-bit hashes (FNV-1a and a mix-order variant) that
    /// drive the double hashing in `get_bit_pos`.
    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
        let mut h1: u64 = 0xcbf29ce484222325;
        for &byte in key {
            h1 ^= byte as u64;
            h1 = h1.wrapping_mul(0x100000001b3);
        }

        let mut h2: u64 = 0x84222325cbf29ce4;
        for &byte in key {
            h2 = h2.wrapping_mul(0x100000001b3);
            h2 ^= byte as u64;
        }

        (h1, h2)
    }

    #[inline]
    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
    }
}

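/// Serialization contract for values stored in an SSTable; keys are handled
/// by the table itself.
///
/// A hedged sketch of a custom implementor (`Score` is illustrative, not part
/// of this crate):
///
/// ```ignore
/// #[derive(Clone)]
/// struct Score(u32);
///
/// impl SSTableValue for Score {
///     fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
///         write_vint(writer, self.0 as u64)
///     }
///     fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
///         Ok(Score(read_vint(reader)? as u32))
///     }
/// }
/// ```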
pub trait SSTableValue: Clone + Send + Sync {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}

/// Posting lists with at most this many documents are stored inline in the
/// term dictionary entry itself.
pub const MAX_INLINE_POSTINGS: usize = 3;

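/// Dictionary entry for a term: either a tiny posting list stored inline or a
/// pointer into the external posting (and optional position) data.
///
/// Hedged sketch of the inline fast path:
///
/// ```ignore
/// let info = TermInfo::try_inline(&[3, 7, 42], &[1, 2, 5]).unwrap();
/// assert!(info.is_inline());
/// let (doc_ids, term_freqs) = info.decode_inline().unwrap();
/// assert_eq!(doc_ids, vec![3, 7, 42]);
/// ```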
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    Inline {
        /// Number of documents; at most `MAX_INLINE_POSTINGS`.
        doc_freq: u8,
        /// Varint-packed (doc_id delta, term_freq) pairs.
        data: [u8; 16],
        /// Number of meaningful bytes in `data`.
        data_len: u8,
    },
    External {
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        /// Offset of the position data; unused when `position_len` is 0.
        position_offset: u64,
        /// Length of the position data; 0 means no positions were stored.
        position_len: u32,
    },
}

impl TermInfo {
    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset: 0,
            position_len: 0,
        }
    }

    pub fn external_with_positions(
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        position_offset: u64,
        position_len: u32,
    ) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset,
            position_len,
        }
    }

    /// Attempts to pack a small posting list inline; returns `None` if there
    /// are too many postings or the encoding does not fit in 16 bytes.
    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (i, &doc_id) in doc_ids.iter().enumerate() {
            // Delta-encode doc ids; a write error means the 16 bytes are full.
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;
        if data_len > 16 {
            return None;
        }

        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    pub fn external_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    pub fn position_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                position_offset,
                position_len,
                ..
            } if *position_len > 0 => Some((*position_offset, *position_len)),
            _ => None,
        }
    }

    /// Decodes the inline payload back into doc ids and term frequencies;
    /// returns `None` for external entries.
    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;

                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }

                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}

impl SSTableValue for TermInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                // Tag 0xFF marks an inline entry.
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            } => {
                if *position_len > 0 {
                    // Tag 0x01: external entry with position data.
                    writer.write_u8(0x01)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                    write_vint(writer, *position_offset)?;
                    write_vint(writer, *position_len as u64)?;
                } else {
                    // Tag 0x00: external entry without position data.
                    writer.write_u8(0x00)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                }
            }
        }
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;

        if tag == 0xFF {
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset: 0,
                position_len: 0,
            })
        } else if tag == 0x01 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            let position_offset = read_vint(reader)?;
            let position_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}

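/// Writes `value` as a LEB128-style varint: 7 payload bits per byte, high bit
/// set on every byte except the last. Hedged sketch (not compiled as a doc
/// test): 300 (0b1_0010_1100) encodes to two bytes.
///
/// ```ignore
/// let mut buf = Vec::new();
/// write_vint(&mut buf, 300).unwrap();
/// assert_eq!(buf, [0xAC, 0x02]);
/// ```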
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            return Ok(());
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
}

/// Reads a varint written by `write_vint`; rejects encodings that would
/// overflow 64 bits.
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}

/// Length of the longest common prefix of `a` and `b`.
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}

/// Every `SPARSE_INDEX_INTERVAL`-th block's first key, kept in memory to
/// narrow binary searches over the full block index.
#[derive(Debug, Clone)]
struct SparseIndexEntry {
    /// First key of the referenced block.
    first_key: Vec<u8>,
    /// Index of that block in the full block index.
    block_idx: u32,
}

/// Summary statistics reported by an open SSTable reader.
#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    pub num_sparse_entries: usize,
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    pub bloom_filter_size: usize,
    pub dictionary_size: usize,
}

#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    /// Compression level applied to data blocks.
    pub compression_level: CompressionLevel,
    /// Whether blocks are compressed with a shared dictionary.
    pub use_dictionary: bool,
    /// Dictionary size in bytes (only meaningful with `use_dictionary`).
    pub dict_size: usize,
    /// Whether a bloom filter is built over all inserted keys.
    pub use_bloom_filter: bool,
    /// Bloom filter bits per key (only meaningful with `use_bloom_filter`).
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}

impl SSTableWriterConfig {
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        match optimization {
            IndexOptimization::Adaptive => Self {
                compression_level: CompressionLevel::BETTER,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: false,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::SizeOptimized => Self {
                compression_level: CompressionLevel::MAX,
                use_dictionary: true,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::PerformanceOptimized => Self {
                compression_level: CompressionLevel::FAST,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
        }
    }

    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}

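/// Streaming writer producing a prefix-compressed, block-based SSTable.
///
/// Keys must be inserted in ascending byte order; both the per-block prefix
/// compression and the block index rely on it. Hedged usage sketch:
///
/// ```ignore
/// let mut out = Vec::new(); // any `Write` sink
/// let mut writer: SSTableWriter<u64> =
///     SSTableWriter::with_config(&mut out, SSTableWriterConfig::fast());
/// writer.insert(b"alpha", &1)?;
/// writer.insert(b"beta", &2)?; // sorted order required
/// writer.finish()?;            // appends index, bloom filter, footer
/// ```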
pub struct SSTableWriter<'a, V: SSTableValue> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    prev_key: Vec<u8>,
    index: Vec<BlockIndexEntry>,
    current_offset: u64,
    num_entries: u64,
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    /// Shared compression dictionary, if dictionary compression is enabled.
    dictionary: Option<CompressionDict>,
    /// All inserted keys, retained only to build the bloom filter at finish.
    all_keys: Vec<Vec<u8>>,
    /// Built in `finish` from `all_keys`.
    bloom_filter: Option<BloomFilter>,
    _phantom: std::marker::PhantomData<V>,
}

impl<'a, V: SSTableValue> SSTableWriter<'a, V> {
    pub fn new(writer: &'a mut dyn Write) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }

    pub fn with_config(writer: &'a mut dyn Write, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    pub fn with_dictionary(
        writer: &'a mut dyn Write,
        config: SSTableWriterConfig,
        dictionary: CompressionDict,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: Some(dictionary),
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        if self.config.use_bloom_filter {
            self.all_keys.push(key.to_vec());
        }

        // Prefix-compress against the previous key; callers must insert keys
        // in ascending byte order for this (and the block index) to be valid.
        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }

    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        // Reset prefix compression so every block decodes independently.
        self.prev_key.clear();

        Ok(())
    }

    pub fn finish(mut self) -> io::Result<()> {
        self.flush_block()?;

        if self.config.use_bloom_filter && !self.all_keys.is_empty() {
            let mut bloom = BloomFilter::new(self.all_keys.len(), self.config.bloom_bits_per_key);
            for key in &self.all_keys {
                bloom.insert(key);
            }
            self.bloom_filter = Some(bloom);
        }

        let data_end_offset = self.current_offset;

        // Every SPARSE_INDEX_INTERVAL-th block gets a sparse entry so readers
        // can narrow the search range before touching the full block index.
        let sparse_index: Vec<SparseIndexEntry> = self
            .index
            .iter()
            .enumerate()
            .filter(|(i, _)| *i % SPARSE_INDEX_INTERVAL == 0)
            .map(|(i, entry)| SparseIndexEntry {
                first_key: entry.first_key.clone(),
                block_idx: i as u32,
            })
            .collect();

        let index_clone = self.index.clone();
        self.write_block_index_compressed(&index_clone)?;

        // Buffer the sparse index so current_offset keeps tracking the
        // absolute file position; the bloom and dictionary offsets recorded
        // in the footer depend on it.
        let mut sparse_buf = Vec::new();
        sparse_buf.write_u32::<LittleEndian>(sparse_index.len() as u32)?;
        for entry in &sparse_index {
            sparse_buf.write_u16::<LittleEndian>(entry.first_key.len() as u16)?;
            sparse_buf.write_all(&entry.first_key)?;
            sparse_buf.write_u32::<LittleEndian>(entry.block_idx)?;
        }
        self.writer.write_all(&sparse_buf)?;
        self.current_offset += sparse_buf.len() as u64;

        let bloom_offset = if let Some(ref bloom) = self.bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };

        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };

        // Fixed 37-byte footer, mirrored by the reader's `open`.
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(())
    }

    fn write_block_index_compressed(&mut self, index: &[BlockIndexEntry]) -> io::Result<()> {
        // Buffer the index so its size is known and current_offset stays an
        // absolute file offset for the sections written after it.
        let mut buf = Vec::new();
        buf.write_u32::<LittleEndian>(index.len() as u32)?;

        let mut prev_key: Vec<u8> = Vec::new();
        for entry in index {
            // Prefix-compress consecutive first keys, like data blocks.
            let prefix_len = common_prefix_len(&prev_key, &entry.first_key);
            let suffix = &entry.first_key[prefix_len..];

            write_vint(&mut buf, prefix_len as u64)?;
            write_vint(&mut buf, suffix.len() as u64)?;
            buf.write_all(suffix)?;
            write_vint(&mut buf, entry.offset)?;
            write_vint(&mut buf, entry.length as u64)?;

            prev_key.clear();
            prev_key.extend_from_slice(&entry.first_key);
        }

        self.writer.write_all(&buf)?;
        self.current_offset += buf.len() as u64;
        Ok(())
    }
}

#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

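/// Async SSTable reader: the footer, block index, sparse index, bloom filter,
/// and dictionary are loaded once at `open`; data blocks are fetched lazily
/// through a small LRU cache. Hedged usage sketch (assumes a `LazyFileHandle`
/// from the directory layer):
///
/// ```ignore
/// let reader: AsyncSSTableReader<u64> = AsyncSSTableReader::open(handle, 64).await?;
/// if let Some(value) = reader.get(b"alpha").await? {
///     println!("found: {value}");
/// }
/// ```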
pub struct AsyncSSTableReader<V: SSTableValue> {
    /// Slice covering only the data-block region of the file.
    data_slice: LazyFileSlice,
    /// Full block index, one entry per block.
    index: Vec<BlockIndexEntry>,
    /// Every `SPARSE_INDEX_INTERVAL`-th block's first key.
    sparse_index: Vec<SparseIndexEntry>,
    num_entries: u64,
    /// LRU cache of decompressed blocks, keyed by block offset.
    cache: RwLock<BlockCache>,
    bloom_filter: Option<BloomFilter>,
    dictionary: Option<CompressionDict>,
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

/// Simple LRU cache; `access_order` is scanned linearly, which is fine for
/// the small block counts kept here.
struct BlockCache {
    blocks: FxHashMap<u64, Arc<Vec<u8>>>,
    access_order: Vec<u64>,
    max_blocks: usize,
}

impl BlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, offset: u64) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&offset) {
            // Move the touched offset to the back (most recently used).
            if let Some(pos) = self.access_order.iter().position(|&o| o == offset) {
                self.access_order.remove(pos);
                self.access_order.push(offset);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
        // Evict from the front (least recently used) until there is room.
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict_offset = self.access_order.remove(0);
            self.blocks.remove(&evict_offset);
        }
        self.blocks.insert(offset, block);
        self.access_order.push(offset);
    }
}

impl<V: SSTableValue> AsyncSSTableReader<V> {
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // Fixed 37-byte footer: data_end (u64), num_entries (u64),
        // bloom_offset (u64), dict_offset (u64), level (u8), magic (u32).
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        // The block index and sparse index sit between the data region and
        // the footer.
        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);
        let mut prev_key: Vec<u8> = Vec::new();

        for _ in 0..num_blocks {
            let prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;

            // Reconstruct the prefix-compressed first key.
            let mut first_key = prev_key[..prefix_len.min(prev_key.len())].to_vec();
            first_key.extend_from_slice(&suffix);

            let offset = read_vint(&mut reader)?;
            let length = read_vint(&mut reader)? as u32;

            prev_key = first_key.clone();
            index.push(BlockIndexEntry {
                first_key,
                offset,
                length,
            });
        }

        let num_sparse = reader.read_u32::<LittleEndian>()? as usize;
        let mut sparse_index = Vec::with_capacity(num_sparse);

        for _ in 0..num_sparse {
            let key_len = reader.read_u16::<LittleEndian>()? as usize;
            let mut first_key = vec![0u8; key_len];
            reader.read_exact(&mut first_key)?;
            let block_idx = reader.read_u32::<LittleEndian>()?;

            sparse_index.push(SparseIndexEntry {
                first_key,
                block_idx,
            });
        }

        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            // Read the 12-byte header first to learn the filter's word count.
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_bytes(&bloom_data)?)
        } else {
            None
        };

        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_bytes(dict_data.to_vec()))
        } else {
            None
        };

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            sparse_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.index.len(),
            num_sparse_entries: self.sparse_index.len(),
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}, sparse_entries={}",
            key.len(),
            self.index.len(),
            self.sparse_index.len()
        );

        // An empty table has no blocks to search.
        if self.index.is_empty() {
            return Ok(None);
        }

        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        let (start_block, end_block) = self.find_block_range(key);
        log::debug!("SSTable::get sparse_range=[{}, {}]", start_block, end_block);

        // Binary search for the last block whose first key is <= `key`.
        let search_range = &self.index[start_block..=end_block];
        let block_idx =
            match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
                Ok(idx) => start_block + idx,
                Err(0) => {
                    if start_block == 0 {
                        log::debug!("SSTable::get key not found (before first block)");
                        return Ok(None);
                    }
                    start_block
                }
                Err(idx) => start_block + idx - 1,
            };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }
        if self.index.is_empty() {
            return Ok(vec![None; keys.len()]);
        }

        // Resolve each key to its candidate block; usize::MAX marks keys that
        // cannot match (bloom-negative or ordered before the first block).
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX));
                continue;
            }

            let (start_block, end_block) = self.find_block_range(key);
            let search_range = &self.index[start_block..=end_block];
            let block_idx =
                match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
                    Ok(idx) => start_block + idx,
                    Err(0) => {
                        if start_block == 0 {
                            key_to_block.push((key_idx, usize::MAX));
                            continue;
                        }
                        start_block
                    }
                    Err(idx) => start_block + idx - 1,
                };
            key_to_block.push((key_idx, block_idx));
        }

        // Warm the cache once per distinct block, in ascending order.
        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            // This second load hits the freshly warmed cache.
            let block_data = self.load_block(block_idx).await?;
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Returns the inclusive range of blocks that may contain `key`, using
    /// the sparse index to avoid binary-searching the full block index.
    fn find_block_range(&self, key: &[u8]) -> (usize, usize) {
        if self.sparse_index.is_empty() {
            return (0, self.index.len().saturating_sub(1));
        }

        let sparse_pos = match self
            .sparse_index
            .binary_search_by(|entry| entry.first_key.as_slice().cmp(key))
        {
            Ok(idx) => idx,
            Err(0) => 0,
            Err(idx) => idx - 1,
        };

        let start_block = self.sparse_index[sparse_pos].block_idx as usize;
        let end_block = if sparse_pos + 1 < self.sparse_index.len() {
            // Candidates end just before the next sparse entry's block.
            (self.sparse_index[sparse_pos + 1].block_idx as usize).saturating_sub(1)
        } else {
            self.index.len().saturating_sub(1)
        };

        (start_block, end_block.max(start_block))
    }

    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
        let entry = &self.index[block_idx];

        {
            // `get` reorders the LRU list, so a write lock is needed even on
            // the hit path.
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.offset) {
                log::debug!("SSTable::load_block idx={} CACHE HIT", block_idx);
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            entry.offset,
            entry.offset + entry.length as u64
        );

        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    /// Sequentially decodes one block; entries are sorted, so the scan stops
    /// as soon as it moves past `target_key`.
    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        if self.index.is_empty() {
            return Ok(());
        }

        let start_block = match self
            .index
            .binary_search_by(|e| e.first_key.as_slice().cmp(start_key))
        {
            Ok(idx) => idx,
            Err(0) => 0,
            Err(idx) => idx - 1,
        };

        let end_block = match self
            .index
            .binary_search_by(|e| e.first_key.as_slice().cmp(end_key))
        {
            Ok(idx) => idx,
            Err(idx) if idx >= self.index.len() => self.index.len().saturating_sub(1),
            Err(idx) => idx,
        };

        for block_idx in start_block..=end_block.min(self.index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Decodes every block and returns all entries in key order.
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = block_data.as_slice();
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                current_key.truncate(common_prefix_len);
                let mut suffix = vec![0u8; suffix_len];
                reader.read_exact(&mut suffix)?;
                current_key.extend_from_slice(&suffix);

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}

pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    current_block: usize,
    block_data: Option<Arc<Vec<u8>>>,
    block_offset: usize,
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.index.is_empty(),
        }
    }

    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_bytes(&bytes).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        // Every inserted key must be reported present (no false negatives).
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        // Theory predicts (1 - e^(-k·n/m))^k ≈ 0.8% for k = 7, m/n = 10;
        // the 3% bound leaves headroom for hash-quality variance.
        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        // Default (Adaptive): balanced compression, no bloom or dictionary.
        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9);
        assert!(!config.use_bloom_filter);
        assert!(!config.use_dictionary);

        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(!adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22);
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1);
        assert!(perf.use_bloom_filter);
        assert!(!perf.use_dictionary);

        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }

    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }