1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
22use parking_lot::RwLock;
23use rustc_hash::FxHashMap;
24use std::io::{self, Read, Write};
25use std::sync::Arc;
26
27#[cfg(feature = "native")]
28use super::sstable_index::FstBlockIndex;
29use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
30use crate::compression::{CompressionDict, CompressionLevel};
31use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice, OwnedBytes};
32
/// Magic number identifying the SSTable format ("STB4" in ASCII).
pub const SSTABLE_MAGIC: u32 = 0x53544234;

/// Uncompressed size threshold at which a data block is flushed.
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Default size budget (bytes) for a shared compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Default bloom-filter sizing, in bits per inserted key.
pub const BLOOM_BITS_PER_KEY: usize = 10;

/// Number of hash probes per bloom-filter operation.
pub const BLOOM_HASH_COUNT: usize = 7;

/// Bloom filter over the full key set of one SSTable.
///
/// Uses double hashing (two FNV-1a style hashes combined as
/// `h1 + i * h2`) to derive `num_hashes` probe positions per key.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    /// Bit array packed into 64-bit words.
    bits: Vec<u64>,
    /// Logical number of bits; probes are taken modulo this value.
    num_bits: usize,
    /// Number of probe positions per key.
    num_hashes: usize,
}

impl BloomFilter {
    /// Creates a filter sized for `expected_keys` at `bits_per_key` bits
    /// each (minimum 64 bits), using [`BLOOM_HASH_COUNT`] probes.
    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: vec![0u64; num_words],
            num_bits,
            num_hashes: BLOOM_HASH_COUNT,
        }
    }

    /// Deserializes a filter from the layout produced by [`Self::to_bytes`]:
    /// three little-endian `u32` header fields (`num_bits`, `num_hashes`,
    /// `num_words`) followed by `num_words` little-endian `u64` words.
    ///
    /// # Errors
    /// Returns `InvalidData` when the input is shorter than the header, the
    /// word count is inconsistent with the payload, or it overflows `usize`.
    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let num_bits = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
        let num_hashes = u32::from_le_bytes(data[4..8].try_into().unwrap()) as usize;
        let num_words = u32::from_le_bytes(data[8..12].try_into().unwrap()) as usize;

        // Checked arithmetic: a corrupt word count must not overflow `usize`
        // (relevant on 32-bit targets) and sneak past the length check below.
        let payload_len = num_words.checked_mul(8).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter word count overflow",
            )
        })?;
        if data.len() - 12 < payload_len {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let bits: Vec<u64> = data[12..12 + payload_len]
            .chunks_exact(8)
            .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
            .collect();

        Ok(Self {
            bits,
            num_bits,
            num_hashes,
        })
    }

    /// Serializes the filter: header (`num_bits`, `num_hashes`, word count
    /// as LE `u32`s) followed by the words as LE `u64`s.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
        data.extend_from_slice(&(self.num_bits as u32).to_le_bytes());
        data.extend_from_slice(&(self.num_hashes as u32).to_le_bytes());
        data.extend_from_slice(&(self.bits.len() as u32).to_le_bytes());
        for &word in &self.bits {
            data.extend_from_slice(&word.to_le_bytes());
        }
        data
    }

    /// Sets the probe bits for `key`. Out-of-range words (possible only with
    /// an inconsistent deserialized filter) are silently skipped.
    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits[word_idx] |= 1u64 << bit_idx;
            }
        }
    }

    /// Returns `false` if `key` was definitely never inserted; `true` means
    /// "possibly present" (bloom filters may false-positive).
    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
                return false;
            }
        }
        true
    }

    /// Serialized size in bytes (header + packed words).
    pub fn size_bytes(&self) -> usize {
        12 + self.bits.len() * 8
    }

    /// Computes two independent 64-bit hashes of `key` (FNV-1a, plus a
    /// variant with a different seed and swapped multiply/xor order).
    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
        let mut h1: u64 = 0xcbf29ce484222325;
        for &byte in key {
            h1 ^= byte as u64;
            h1 = h1.wrapping_mul(0x100000001b3);
        }

        let mut h2: u64 = 0x84222325cbf29ce4;
        for &byte in key {
            h2 = h2.wrapping_mul(0x100000001b3);
            h2 ^= byte as u64;
        }

        (h1, h2)
    }

    /// Double-hashing probe: position of the `i`-th bit for hashes `h1`/`h2`.
    #[inline]
    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
    }
}
178
/// A value type that can be stored in an SSTable.
pub trait SSTableValue: Clone + Send + Sync {
    /// Writes this value's binary encoding to `writer`.
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    /// Reads a value previously written by [`Self::serialize`].
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}
184
/// `u64` values are stored as varints (see [`write_vint`] / [`read_vint`]).
impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}
195
196impl SSTableValue for Vec<u8> {
198 fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
199 write_vint(writer, self.len() as u64)?;
200 writer.write_all(self)
201 }
202
203 fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
204 let len = read_vint(reader)? as usize;
205 let mut data = vec![0u8; len];
206 reader.read_exact(&mut data)?;
207 Ok(data)
208 }
209}
210
/// Byte range (`offset`, `length`) describing where one sparse dimension's
/// data lives. The exact target region is determined by the caller —
/// presumably a postings/data section; confirm against call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SparseDimInfo {
    /// Byte offset of the dimension's data.
    pub offset: u64,
    /// Byte length of the dimension's data.
    pub length: u32,
}

impl SparseDimInfo {
    /// Creates an entry for the given byte range.
    pub fn new(offset: u64, length: u32) -> Self {
        Self { offset, length }
    }
}
226
227impl SSTableValue for SparseDimInfo {
228 fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
229 write_vint(writer, self.offset)?;
230 write_vint(writer, self.length as u64)
231 }
232
233 fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
234 let offset = read_vint(reader)?;
235 let length = read_vint(reader)? as u32;
236 Ok(Self { offset, length })
237 }
238}
239
/// Maximum number of postings that may be packed into a [`TermInfo::Inline`].
pub const MAX_INLINE_POSTINGS: usize = 3;

/// Per-term metadata stored in the term dictionary: either a tiny posting
/// list inlined into the entry itself, or a reference to out-of-line
/// posting (and optionally position) data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    /// Posting list packed directly into the entry (see `try_inline`).
    Inline {
        /// Number of documents (at most `MAX_INLINE_POSTINGS`).
        doc_freq: u8,
        /// Delta-encoded (doc_id, term_freq) varint pairs.
        data: [u8; 16],
        /// Number of valid bytes in `data`.
        data_len: u8,
    },
    /// Posting list stored elsewhere, referenced by offset/length.
    External {
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        /// Offset of position data; meaningful only when `position_len > 0`.
        position_offset: u64,
        /// Length of position data; 0 means "no positions".
        position_len: u32,
    },
}
274
275impl TermInfo {
276 pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
278 TermInfo::External {
279 posting_offset,
280 posting_len,
281 doc_freq,
282 position_offset: 0,
283 position_len: 0,
284 }
285 }
286
287 pub fn external_with_positions(
289 posting_offset: u64,
290 posting_len: u32,
291 doc_freq: u32,
292 position_offset: u64,
293 position_len: u32,
294 ) -> Self {
295 TermInfo::External {
296 posting_offset,
297 posting_len,
298 doc_freq,
299 position_offset,
300 position_len,
301 }
302 }
303
304 pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
307 if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
308 return None;
309 }
310
311 let mut data = [0u8; 16];
312 let mut cursor = std::io::Cursor::new(&mut data[..]);
313 let mut prev_doc_id = 0u32;
314
315 for (i, &doc_id) in doc_ids.iter().enumerate() {
316 let delta = doc_id - prev_doc_id;
317 if write_vint(&mut cursor, delta as u64).is_err() {
318 return None;
319 }
320 if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
321 return None;
322 }
323 prev_doc_id = doc_id;
324 }
325
326 let data_len = cursor.position() as u8;
327 if data_len > 16 {
328 return None;
329 }
330
331 Some(TermInfo::Inline {
332 doc_freq: doc_ids.len() as u8,
333 data,
334 data_len,
335 })
336 }
337
338 pub fn doc_freq(&self) -> u32 {
340 match self {
341 TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
342 TermInfo::External { doc_freq, .. } => *doc_freq,
343 }
344 }
345
346 pub fn is_inline(&self) -> bool {
348 matches!(self, TermInfo::Inline { .. })
349 }
350
351 pub fn external_info(&self) -> Option<(u64, u32)> {
353 match self {
354 TermInfo::External {
355 posting_offset,
356 posting_len,
357 ..
358 } => Some((*posting_offset, *posting_len)),
359 TermInfo::Inline { .. } => None,
360 }
361 }
362
363 pub fn position_info(&self) -> Option<(u64, u32)> {
365 match self {
366 TermInfo::External {
367 position_offset,
368 position_len,
369 ..
370 } if *position_len > 0 => Some((*position_offset, *position_len)),
371 _ => None,
372 }
373 }
374
375 pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
378 match self {
379 TermInfo::Inline {
380 doc_freq,
381 data,
382 data_len,
383 } => {
384 let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
385 let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
386 let mut reader = &data[..*data_len as usize];
387 let mut prev_doc_id = 0u32;
388
389 for _ in 0..*doc_freq {
390 let delta = read_vint(&mut reader).ok()? as u32;
391 let tf = read_vint(&mut reader).ok()? as u32;
392 let doc_id = prev_doc_id + delta;
393 doc_ids.push(doc_id);
394 term_freqs.push(tf);
395 prev_doc_id = doc_id;
396 }
397
398 Some((doc_ids, term_freqs))
399 }
400 TermInfo::External { .. } => None,
401 }
402 }
403}
404
405impl SSTableValue for TermInfo {
406 fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
407 match self {
408 TermInfo::Inline {
409 doc_freq,
410 data,
411 data_len,
412 } => {
413 writer.write_u8(0xFF)?;
415 writer.write_u8(*doc_freq)?;
416 writer.write_u8(*data_len)?;
417 writer.write_all(&data[..*data_len as usize])?;
418 }
419 TermInfo::External {
420 posting_offset,
421 posting_len,
422 doc_freq,
423 position_offset,
424 position_len,
425 } => {
426 if *position_len > 0 {
429 writer.write_u8(0x01)?;
430 write_vint(writer, *doc_freq as u64)?;
431 write_vint(writer, *posting_offset)?;
432 write_vint(writer, *posting_len as u64)?;
433 write_vint(writer, *position_offset)?;
434 write_vint(writer, *position_len as u64)?;
435 } else {
436 writer.write_u8(0x00)?;
437 write_vint(writer, *doc_freq as u64)?;
438 write_vint(writer, *posting_offset)?;
439 write_vint(writer, *posting_len as u64)?;
440 }
441 }
442 }
443 Ok(())
444 }
445
446 fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
447 let tag = reader.read_u8()?;
448
449 if tag == 0xFF {
450 let doc_freq = reader.read_u8()?;
452 let data_len = reader.read_u8()?;
453 let mut data = [0u8; 16];
454 reader.read_exact(&mut data[..data_len as usize])?;
455 Ok(TermInfo::Inline {
456 doc_freq,
457 data,
458 data_len,
459 })
460 } else if tag == 0x00 {
461 let doc_freq = read_vint(reader)? as u32;
463 let posting_offset = read_vint(reader)?;
464 let posting_len = read_vint(reader)? as u32;
465 Ok(TermInfo::External {
466 posting_offset,
467 posting_len,
468 doc_freq,
469 position_offset: 0,
470 position_len: 0,
471 })
472 } else if tag == 0x01 {
473 let doc_freq = read_vint(reader)? as u32;
475 let posting_offset = read_vint(reader)?;
476 let posting_len = read_vint(reader)? as u32;
477 let position_offset = read_vint(reader)?;
478 let position_len = read_vint(reader)? as u32;
479 Ok(TermInfo::External {
480 posting_offset,
481 posting_len,
482 doc_freq,
483 position_offset,
484 position_len,
485 })
486 } else {
487 Err(io::Error::new(
488 io::ErrorKind::InvalidData,
489 format!("Invalid TermInfo tag: {}", tag),
490 ))
491 }
492 }
493}
494
/// Writes `value` as an LEB128-style varint: 7 data bits per byte, with the
/// high bit set on every byte except the last.
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    // Emit continuation bytes until the remaining value fits in 7 bits.
    while value >= 0x80 {
        writer.write_all(&[(value as u8 & 0x7F) | 0x80])?;
        value >>= 7;
    }
    writer.write_all(&[value as u8])
}
508
/// Reads a varint written by [`write_vint`]. Fails with `InvalidData` once
/// more than 64 bits of payload have been seen (i.e. a continuation bit on
/// the tenth byte), and with `UnexpectedEof` on a truncated stream.
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut value = 0u64;
    let mut shift = 0u32;
    let mut byte = [0u8; 1];

    loop {
        reader.read_exact(&mut byte)?;
        value |= u64::from(byte[0] & 0x7F) << shift;
        if byte[0] & 0x80 == 0 {
            break Ok(value);
        }
        shift += 7;
        if shift >= 64 {
            break Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}
529
/// Returns the number of leading bytes shared by `a` and `b`.
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    let limit = a.len().min(b.len());
    let mut n = 0;
    while n < limit && a[n] == b[n] {
        n += 1;
    }
    n
}
534
/// Summary statistics about an opened SSTable (see
/// [`AsyncSSTableReader::stats`]).
#[derive(Debug, Clone)]
pub struct SSTableStats {
    /// Number of data blocks in the block index.
    pub num_blocks: usize,
    /// Currently always 0 in `stats()`; kept for interface compatibility.
    pub num_sparse_entries: usize,
    /// Total number of key/value entries.
    pub num_entries: u64,
    /// Whether a bloom filter section is present.
    pub has_bloom_filter: bool,
    /// Whether a compression dictionary section is present.
    pub has_dictionary: bool,
    /// Serialized bloom filter size in bytes (0 when absent).
    pub bloom_filter_size: usize,
    /// Dictionary size in bytes (0 when absent).
    pub dictionary_size: usize,
}

/// Tunables controlling how an [`SSTableWriter`] compresses and indexes data.
#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    /// Compression level applied to each data block.
    pub compression_level: CompressionLevel,
    /// Whether blocks should be compressed with a shared dictionary
    /// (the dictionary itself is supplied via `with_dictionary`).
    pub use_dictionary: bool,
    /// Size budget for the dictionary, in bytes.
    pub dict_size: usize,
    /// Whether to build a bloom filter over all inserted keys.
    pub use_bloom_filter: bool,
    /// Bloom filter sizing, in bits per key.
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    /// Uses the profile of the default `IndexOptimization`.
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}
567
568impl SSTableWriterConfig {
569 pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
571 use crate::structures::IndexOptimization;
572 match optimization {
573 IndexOptimization::Adaptive => Self {
574 compression_level: CompressionLevel::BETTER, use_dictionary: false,
576 dict_size: DEFAULT_DICT_SIZE,
577 use_bloom_filter: false,
578 bloom_bits_per_key: BLOOM_BITS_PER_KEY,
579 },
580 IndexOptimization::SizeOptimized => Self {
581 compression_level: CompressionLevel::MAX, use_dictionary: true,
583 dict_size: DEFAULT_DICT_SIZE,
584 use_bloom_filter: true,
585 bloom_bits_per_key: BLOOM_BITS_PER_KEY,
586 },
587 IndexOptimization::PerformanceOptimized => Self {
588 compression_level: CompressionLevel::FAST, use_dictionary: false,
590 dict_size: DEFAULT_DICT_SIZE,
591 use_bloom_filter: true, bloom_bits_per_key: BLOOM_BITS_PER_KEY,
593 },
594 }
595 }
596
597 pub fn fast() -> Self {
599 Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
600 }
601
602 pub fn max_compression() -> Self {
604 Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
605 }
606}
607
/// Streaming writer producing the SSTable format read by
/// [`AsyncSSTableReader`]. Keys must be inserted in ascending order, since
/// entries are prefix-compressed and the reader's block search assumes
/// sorted keys.
pub struct SSTableWriter<'a, V: SSTableValue> {
    /// Destination for all output bytes.
    writer: &'a mut dyn Write,
    /// Uncompressed encoding of the block currently being built.
    block_buffer: Vec<u8>,
    /// Previously inserted key (prefix-compression baseline).
    prev_key: Vec<u8>,
    /// One entry per flushed block: first key + file location.
    index: Vec<BlockIndexEntry>,
    /// Number of bytes written to `writer` so far.
    current_offset: u64,
    /// Total entries inserted.
    num_entries: u64,
    /// First key of the in-progress block (None until its first insert).
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    /// Optional dictionary used to compress blocks.
    dictionary: Option<CompressionDict>,
    /// Every inserted key; only populated when a bloom filter is requested.
    all_keys: Vec<Vec<u8>>,
    /// Built from `all_keys` during `finish`.
    bloom_filter: Option<BloomFilter>,
    _phantom: std::marker::PhantomData<V>,
}
630
631impl<'a, V: SSTableValue> SSTableWriter<'a, V> {
632 pub fn new(writer: &'a mut dyn Write) -> Self {
634 Self::with_config(writer, SSTableWriterConfig::default())
635 }
636
637 pub fn with_config(writer: &'a mut dyn Write, config: SSTableWriterConfig) -> Self {
639 Self {
640 writer,
641 block_buffer: Vec::with_capacity(BLOCK_SIZE),
642 prev_key: Vec::new(),
643 index: Vec::new(),
644 current_offset: 0,
645 num_entries: 0,
646 block_first_key: None,
647 config,
648 dictionary: None,
649 all_keys: Vec::new(),
650 bloom_filter: None,
651 _phantom: std::marker::PhantomData,
652 }
653 }
654
655 pub fn with_dictionary(
657 writer: &'a mut dyn Write,
658 config: SSTableWriterConfig,
659 dictionary: CompressionDict,
660 ) -> Self {
661 Self {
662 writer,
663 block_buffer: Vec::with_capacity(BLOCK_SIZE),
664 prev_key: Vec::new(),
665 index: Vec::new(),
666 current_offset: 0,
667 num_entries: 0,
668 block_first_key: None,
669 config,
670 dictionary: Some(dictionary),
671 all_keys: Vec::new(),
672 bloom_filter: None,
673 _phantom: std::marker::PhantomData,
674 }
675 }
676
677 pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
678 if self.block_first_key.is_none() {
679 self.block_first_key = Some(key.to_vec());
680 }
681
682 if self.config.use_bloom_filter {
684 self.all_keys.push(key.to_vec());
685 }
686
687 let prefix_len = common_prefix_len(&self.prev_key, key);
688 let suffix = &key[prefix_len..];
689
690 write_vint(&mut self.block_buffer, prefix_len as u64)?;
691 write_vint(&mut self.block_buffer, suffix.len() as u64)?;
692 self.block_buffer.extend_from_slice(suffix);
693 value.serialize(&mut self.block_buffer)?;
694
695 self.prev_key.clear();
696 self.prev_key.extend_from_slice(key);
697 self.num_entries += 1;
698
699 if self.block_buffer.len() >= BLOCK_SIZE {
700 self.flush_block()?;
701 }
702
703 Ok(())
704 }
705
706 fn flush_block(&mut self) -> io::Result<()> {
708 if self.block_buffer.is_empty() {
709 return Ok(());
710 }
711
712 let compressed = if let Some(ref dict) = self.dictionary {
714 crate::compression::compress_with_dict(
715 &self.block_buffer,
716 self.config.compression_level,
717 dict,
718 )?
719 } else {
720 crate::compression::compress(&self.block_buffer, self.config.compression_level)?
721 };
722
723 if let Some(first_key) = self.block_first_key.take() {
724 self.index.push(BlockIndexEntry {
725 first_key,
726 offset: self.current_offset,
727 length: compressed.len() as u32,
728 });
729 }
730
731 self.writer.write_all(&compressed)?;
732 self.current_offset += compressed.len() as u64;
733 self.block_buffer.clear();
734 self.prev_key.clear();
735
736 Ok(())
737 }
738
739 pub fn finish(mut self) -> io::Result<()> {
740 self.flush_block()?;
742
743 if self.config.use_bloom_filter && !self.all_keys.is_empty() {
745 let mut bloom = BloomFilter::new(self.all_keys.len(), self.config.bloom_bits_per_key);
746 for key in &self.all_keys {
747 bloom.insert(key);
748 }
749 self.bloom_filter = Some(bloom);
750 }
751
752 let data_end_offset = self.current_offset;
753
754 let entries: Vec<(Vec<u8>, BlockAddr)> = self
757 .index
758 .iter()
759 .map(|e| {
760 (
761 e.first_key.clone(),
762 BlockAddr {
763 offset: e.offset,
764 length: e.length,
765 },
766 )
767 })
768 .collect();
769
770 #[cfg(feature = "native")]
772 let index_bytes = FstBlockIndex::build(&entries)?;
773 #[cfg(not(feature = "native"))]
774 let index_bytes = MmapBlockIndex::build(&entries)?;
775
776 self.writer
778 .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
779 self.writer.write_all(&index_bytes)?;
780 self.current_offset += 4 + index_bytes.len() as u64;
781
782 let bloom_offset = if let Some(ref bloom) = self.bloom_filter {
784 let bloom_data = bloom.to_bytes();
785 let offset = self.current_offset;
786 self.writer.write_all(&bloom_data)?;
787 self.current_offset += bloom_data.len() as u64;
788 offset
789 } else {
790 0
791 };
792
793 let dict_offset = if let Some(ref dict) = self.dictionary {
795 let dict_bytes = dict.as_bytes();
796 let offset = self.current_offset;
797 self.writer
798 .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
799 self.writer.write_all(dict_bytes)?;
800 self.current_offset += 4 + dict_bytes.len() as u64;
801 offset
802 } else {
803 0
804 };
805
806 self.writer.write_u64::<LittleEndian>(data_end_offset)?;
808 self.writer.write_u64::<LittleEndian>(self.num_entries)?;
809 self.writer.write_u64::<LittleEndian>(bloom_offset)?; self.writer.write_u64::<LittleEndian>(dict_offset)?; self.writer
812 .write_u8(self.config.compression_level.0 as u8)?;
813 self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;
814
815 Ok(())
816 }
817}
818
/// Writer-side record of one flushed block: its first key plus the
/// compressed block's position in the file.
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

/// Async, cache-backed reader for SSTable files produced by
/// [`SSTableWriter`].
pub struct AsyncSSTableReader<V: SSTableValue> {
    /// File slice covering only the data-block region (before the index).
    data_slice: LazyFileSlice,
    /// Maps keys to candidate block indices/addresses.
    block_index: BlockIndex,
    /// Entry count read from the footer.
    num_entries: u64,
    /// LRU cache of decompressed blocks, keyed by block file offset.
    cache: RwLock<BlockCache>,
    /// Bloom filter over all keys, when the writer emitted one.
    bloom_filter: Option<BloomFilter>,
    /// Dictionary for block decompression, when the writer emitted one.
    dictionary: Option<CompressionDict>,
    /// Compression level from the footer; unused at read time.
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

/// Bounded LRU cache of decompressed blocks keyed by block file offset.
struct BlockCache {
    blocks: FxHashMap<u64, Arc<Vec<u8>>>,
    /// Offsets ordered from least- to most-recently used.
    access_order: Vec<u64>,
    max_blocks: usize,
}
857
858impl BlockCache {
859 fn new(max_blocks: usize) -> Self {
860 Self {
861 blocks: FxHashMap::default(),
862 access_order: Vec::new(),
863 max_blocks,
864 }
865 }
866
867 fn get(&mut self, offset: u64) -> Option<Arc<Vec<u8>>> {
868 if let Some(block) = self.blocks.get(&offset) {
869 if let Some(pos) = self.access_order.iter().position(|&o| o == offset) {
870 self.access_order.remove(pos);
871 self.access_order.push(offset);
872 }
873 Some(Arc::clone(block))
874 } else {
875 None
876 }
877 }
878
879 fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
880 while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
881 let evict_offset = self.access_order.remove(0);
882 self.blocks.remove(&evict_offset);
883 }
884 self.blocks.insert(offset, block);
885 self.access_order.push(offset);
886 }
887}
888
impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Opens an SSTable by parsing the fixed footer, the block index, and
    /// the optional bloom-filter and dictionary sections. `cache_blocks`
    /// caps the LRU block cache.
    ///
    /// # Errors
    /// `InvalidData` when the file is shorter than the footer, the magic
    /// number mismatches, or any section is truncated.
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // Footer layout (37 bytes): data_end u64, num_entries u64,
        // bloom_offset u64, dict_offset u64, compression level u8, magic u32.
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        // The index section begins right after the data blocks. This range
        // also spans the bloom/dict sections, but only the length-prefixed
        // index portion is consumed below.
        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;

        let mut idx_reader = index_bytes.as_slice();
        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;

        if index_len > idx_reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Index data truncated",
            ));
        }

        let index_data = OwnedBytes::new(idx_reader[..index_len].to_vec());

        // Prefer the FST index when available; fall back to the mmap-style
        // index if the bytes do not parse as an FST.
        #[cfg(feature = "native")]
        let block_index = match FstBlockIndex::load(index_data.clone()) {
            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
        };
        #[cfg(not(feature = "native"))]
        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);

        // Bloom section: offset 0 means "absent". It carries no length
        // prefix; the size is derived from the word count in its header.
        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            // Header bytes 8..12 hold the u64 word count as a LE u32.
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_bytes(&bloom_data)?)
        } else {
            None
        };

        // Dictionary section: u32 length prefix followed by the raw dict.
        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_bytes(dict_data.to_vec()))
        } else {
            None
        };

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            block_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    /// Total number of key/value entries in the table.
    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    /// Summary statistics for diagnostics.
    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.block_index.len(),
            num_sparse_entries: 0, num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    /// Point lookup: consults the bloom filter, locates the candidate block
    /// via the index, then scans that block for an exact key match.
    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}",
            key.len(),
            self.block_index.len()
        );

        // The bloom filter can prove absence without any block I/O.
        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                log::debug!("SSTable::get key not found (before first block)");
                return Ok(None);
            }
        };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batched lookup. Each key is first resolved to a block index
    /// (`usize::MAX` marks "definitely absent"); each distinct block is
    /// warmed once, then keys are searched against the cached blocks.
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // (key index, block index) pairs; usize::MAX = skip this key.
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX)); continue;
            }

            match self.block_index.locate(key) {
                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
                None => key_to_block.push((key_idx, usize::MAX)), }
        }

        // Load each needed block exactly once (sorted + deduped) so the
        // per-key lookups below are cache hits.
        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            // Served from cache thanks to the warm-up pass above.
            let block_data = self.load_block(block_idx).await?; results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Eagerly loads every block into the cache (useful for small tables).
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.block_index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Fetches one block through the LRU cache, reading and decompressing
    /// it on a miss (with the shared dictionary when present).
    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        {
            // Write lock even on the read path: `BlockCache::get` mutates
            // the LRU ordering.
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(addr.offset) {
                log::debug!("SSTable::load_block idx={} CACHE HIT", block_idx);
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            addr.offset,
            addr.offset + addr.length as u64
        );

        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range(range).await?;

        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            // Lock was released during I/O; another task may have inserted
            // the same block in the meantime.
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    /// Linear scan of one decompressed block for `target_key`. Entries are
    /// prefix-compressed and sorted, so the scan stops at the first key
    /// greater than the target.
    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            // Rebuild the key: keep the shared prefix, append the suffix.
            current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    /// Warms the cache with every block that may contain keys in
    /// `start_key..=end_key`.
    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        let start_block = self.block_index.locate(start_key).unwrap_or(0);
        let end_block = self
            .block_index
            .locate(end_key)
            .unwrap_or(self.block_index.len().saturating_sub(1));

        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    /// Returns a sequential iterator over all entries, in key order.
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Decodes and returns every (key, value) pair in the table, in order.
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.block_index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = block_data.as_slice();
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                current_key.truncate(common_prefix_len);
                let mut suffix = vec![0u8; suffix_len];
                reader.read_exact(&mut suffix)?;
                current_key.extend_from_slice(&suffix);

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}
1249
/// Streaming iterator over an [`AsyncSSTableReader`], yielding entries in
/// key order one block at a time.
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    /// Backing reader; blocks are fetched through its cache.
    reader: &'a AsyncSSTableReader<V>,
    /// Index of the next block to load.
    current_block: usize,
    /// Currently loaded (decompressed) block, if any.
    block_data: Option<Arc<Vec<u8>>>,
    /// Byte position of the next entry within `block_data`.
    block_offset: usize,
    /// Last decoded key (prefix-compression baseline).
    current_key: Vec<u8>,
    /// True once all blocks are exhausted.
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            // An empty table is exhausted before the first `next` call.
            finished: reader.block_index.is_empty(),
        }
    }

    /// Loads the next block, resetting the intra-block cursor and the
    /// prefix-compression baseline. Returns `false` when no blocks remain.
    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.block_index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        // Keys are delta-encoded per block, so the baseline resets here.
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Returns the next (key, value) entry, or `None` when exhausted.
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                // Current block exhausted; advance to the next one.
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            // Remember the slice length to compute how many bytes this
            // entry consumed once decoding is done.
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}
1323
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        // Inserted keys must always report as (possibly) present.
        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        // These specific keys are known not to false-positive with this
        // deterministic hash function and filter size.
        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        // Round-trip through the byte representation.
        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_bytes(&bytes).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        // No false negatives are ever allowed.
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        // ~1% is the expected rate at 10 bits/key with 7 hashes; 3% gives
        // a comfortable margin.
        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        // The default config carries the same values asserted for the
        // Adaptive profile below (level 9, no bloom, no dictionary).
        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9); assert!(!config.use_bloom_filter);
        assert!(!config.use_dictionary);

        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(!adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22); assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1); assert!(perf.use_bloom_filter); assert!(!perf.use_dictionary);

        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }

    #[test]
    fn test_vint_roundtrip() {
        // Values chosen to straddle the 1-, 2-, and 3-byte varint
        // boundaries, plus the 10-byte maximum.
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
}