use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;

#[cfg(feature = "native")]
use super::sstable_index::FstBlockIndex;
use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice, OwnedBytes};

/// Magic number identifying the SSTable file format ("STB4").
pub const SSTABLE_MAGIC: u32 = 0x53544234;

/// Target uncompressed size of a data block.
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Default size of the shared compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Bits allocated per key in the Bloom filter.
pub const BLOOM_BITS_PER_KEY: usize = 10;

/// Number of hash probes per key (near-optimal for 10 bits per key).
pub const BLOOM_HASH_COUNT: usize = 7;

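/// Bloom filter over all keys in the table, used to skip lookups for keys
/// that are definitely absent. Uses double hashing (`h1 + i * h2`) over two
/// FNV-style hashes, so membership tests may yield false positives but never
/// false negatives.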
#[derive(Debug, Clone)]
pub struct BloomFilter {
    bits: Vec<u64>,
    num_bits: usize,
    num_hashes: usize,
}

impl BloomFilter {
    /// Creates a filter sized for `expected_keys` at `bits_per_key` bits each.
    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: vec![0u64; num_words],
            num_bits,
            num_hashes: BLOOM_HASH_COUNT,
        }
    }

    /// Deserializes a filter from the layout produced by [`Self::to_bytes`].
    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let mut reader = data;
        let num_bits = reader.read_u32::<LittleEndian>()? as usize;
        let num_hashes = reader.read_u32::<LittleEndian>()? as usize;
        let num_words = reader.read_u32::<LittleEndian>()? as usize;

        if reader.len() < num_words * 8 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let mut bits = Vec::with_capacity(num_words);
        for _ in 0..num_words {
            bits.push(reader.read_u64::<LittleEndian>()?);
        }

        Ok(Self {
            bits,
            num_bits,
            num_hashes,
        })
    }

    /// Serializes the filter as a 12-byte header (`num_bits`, `num_hashes`,
    /// `num_words`, all little-endian u32) followed by the bit words.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
        data.write_u32::<LittleEndian>(self.num_bits as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.num_hashes as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.bits.len() as u32)
            .unwrap();
        for &word in &self.bits {
            data.write_u64::<LittleEndian>(word).unwrap();
        }
        data
    }

    /// Sets the `num_hashes` probe bits for `key`.
    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits[word_idx] |= 1u64 << bit_idx;
            }
        }
    }

    /// Returns `false` if `key` is definitely absent; `true` means the key
    /// may be present (subject to the false-positive rate).
    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
                return false;
            }
        }
        true
    }

    /// Serialized size in bytes (header plus bit words).
    pub fn size_bytes(&self) -> usize {
        12 + self.bits.len() * 8
    }

    /// Computes two independent hashes of `key` for double hashing. Both are
    /// FNV-1a-style folds with different seeds and mix orders.
    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
        let mut h1: u64 = 0xcbf29ce484222325;
        for &byte in key {
            h1 ^= byte as u64;
            h1 = h1.wrapping_mul(0x100000001b3);
        }

        let mut h2: u64 = 0x84222325cbf29ce4;
        for &byte in key {
            h2 = h2.wrapping_mul(0x100000001b3);
            h2 ^= byte as u64;
        }

        (h1, h2)
    }

    #[inline]
    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
    }
}

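/// A value type that can be stored in an SSTable. Implementors define their
/// own wire encoding via `serialize`/`deserialize`.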
pub trait SSTableValue: Clone + Send + Sync {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}

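/// Location of a sparse dimension's posting data: a byte offset and length
/// within the postings region.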
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SparseDimInfo {
    pub offset: u64,
    pub length: u32,
}

impl SparseDimInfo {
    pub fn new(offset: u64, length: u32) -> Self {
        Self { offset, length }
    }
}

impl SSTableValue for SparseDimInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.offset)?;
        write_vint(writer, self.length as u64)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let offset = read_vint(reader)?;
        let length = read_vint(reader)? as u32;
        Ok(Self { offset, length })
    }
}

/// Maximum number of postings that can be inlined directly in a [`TermInfo`].
pub const MAX_INLINE_POSTINGS: usize = 3;

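/// Per-term metadata. Terms with very few postings are stored inline as
/// delta-encoded varints (`Inline`); larger terms point into external posting
/// (and optionally position) data (`External`).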
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    Inline {
        doc_freq: u8,
        data: [u8; 16],
        data_len: u8,
    },
    External {
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        position_offset: u64,
        position_len: u32,
    },
}

impl TermInfo {
    /// Creates an `External` entry with no position data.
    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset: 0,
            position_len: 0,
        }
    }

    /// Creates an `External` entry that also references position data.
    pub fn external_with_positions(
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        position_offset: u64,
        position_len: u32,
    ) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset,
            position_len,
        }
    }

    /// Attempts to inline the postings as delta-encoded `(doc_id, tf)` varint
    /// pairs. Returns `None` if there are too many postings or the encoding
    /// does not fit in 16 bytes.
    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (i, &doc_id) in doc_ids.iter().enumerate() {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;
        if data_len > 16 {
            return None;
        }

        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    /// Iterator-based variant of [`Self::try_inline`]; `count` must match the
    /// number of items yielded by `iter`.
    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
        if count > MAX_INLINE_POSTINGS || count == 0 {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (doc_id, tf) in iter {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, tf as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;

        Some(TermInfo::Inline {
            doc_freq: count as u8,
            data,
            data_len,
        })
    }

    /// Number of documents containing the term.
    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    /// Returns `(posting_offset, posting_len)` for external entries.
    pub fn external_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    /// Returns `(position_offset, position_len)` when position data exists.
    pub fn position_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                position_offset,
                position_len,
                ..
            } if *position_len > 0 => Some((*position_offset, *position_len)),
            _ => None,
        }
    }

    /// Decodes inline postings back into `(doc_ids, term_freqs)`.
    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;

                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }

                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}

impl SSTableValue for TermInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                // Tag 0xFF marks an inline entry.
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            } => {
                // Tag 0x01: external with positions; tag 0x00: external without.
                if *position_len > 0 {
                    writer.write_u8(0x01)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                    write_vint(writer, *position_offset)?;
                    write_vint(writer, *position_len as u64)?;
                } else {
                    writer.write_u8(0x00)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                }
            }
        }
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;

        if tag == 0xFF {
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset: 0,
                position_len: 0,
            })
        } else if tag == 0x01 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            let position_offset = read_vint(reader)?;
            let position_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}

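/// Writes `value` as a LEB128-style varint: 7 bits per byte, high bit set on
/// continuation bytes.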
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            return Ok(());
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
}

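/// Reads a varint written by [`write_vint`], rejecting encodings longer than
/// ten bytes (more than 64 bits).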
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}

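/// Length of the longest common prefix of `a` and `b`, used for prefix
/// compression of keys within a block.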
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}

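/// Summary statistics for an opened SSTable.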
#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    pub num_sparse_entries: usize,
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    pub bloom_filter_size: usize,
    pub dictionary_size: usize,
}

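/// Tuning knobs for [`SSTableWriter`]: compression level, optional shared
/// dictionary, and optional Bloom filter.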
#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    pub compression_level: CompressionLevel,
    pub use_dictionary: bool,
    pub dict_size: usize,
    pub use_bloom_filter: bool,
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}

impl SSTableWriterConfig {
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        match optimization {
            IndexOptimization::Adaptive => Self {
                compression_level: CompressionLevel::BETTER,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: false,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::SizeOptimized => Self {
                compression_level: CompressionLevel::MAX,
                use_dictionary: true,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::PerformanceOptimized => Self {
                compression_level: CompressionLevel::FAST,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
        }
    }

    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}

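/// Streaming writer that emits prefix-compressed, block-compressed key/value
/// data followed by a block index, optional Bloom filter, optional
/// dictionary, and a fixed-size footer. Keys must be inserted in ascending
/// order.
///
/// A minimal usage sketch (assuming a `Vec<u8>` sink):
///
/// ```ignore
/// let mut writer = SSTableWriter::<_, u64>::new(Vec::new());
/// writer.insert(b"alpha", &1)?;
/// writer.insert(b"beta", &2)?;
/// let bytes = writer.finish()?;
/// ```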
pub struct SSTableWriter<W: Write, V: SSTableValue> {
    writer: W,
    block_buffer: Vec<u8>,
    prev_key: Vec<u8>,
    index: Vec<BlockIndexEntry>,
    current_offset: u64,
    num_entries: u64,
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    dictionary: Option<CompressionDict>,
    all_keys: Vec<Vec<u8>>,
    bloom_filter: Option<BloomFilter>,
    _phantom: std::marker::PhantomData<V>,
}

impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
    pub fn new(writer: W) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }

    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    pub fn with_dictionary(
        writer: W,
        config: SSTableWriterConfig,
        dictionary: CompressionDict,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: Some(dictionary),
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Appends a key/value pair. Keys must arrive in ascending order; each
    /// entry is stored as a prefix-compressed suffix against the previous key.
    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        if self.config.use_bloom_filter {
            self.all_keys.push(key.to_vec());
        }

        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }

    /// Compresses and writes the current block, recording its first key in
    /// the block index.
    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        self.prev_key.clear();

        Ok(())
    }

    /// Flushes the last block, then writes the block index, optional Bloom
    /// filter, optional dictionary, and the footer.
    pub fn finish(mut self) -> io::Result<W> {
        self.flush_block()?;

        if self.config.use_bloom_filter && !self.all_keys.is_empty() {
            let mut bloom = BloomFilter::new(self.all_keys.len(), self.config.bloom_bits_per_key);
            for key in &self.all_keys {
                bloom.insert(key);
            }
            self.bloom_filter = Some(bloom);
        }

        let data_end_offset = self.current_offset;

        let entries: Vec<(Vec<u8>, BlockAddr)> = self
            .index
            .iter()
            .map(|e| {
                (
                    e.first_key.clone(),
                    BlockAddr {
                        offset: e.offset,
                        length: e.length,
                    },
                )
            })
            .collect();

        #[cfg(feature = "native")]
        let index_bytes = FstBlockIndex::build(&entries)?;
        #[cfg(not(feature = "native"))]
        let index_bytes = MmapBlockIndex::build(&entries)?;

        self.writer
            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
        self.writer.write_all(&index_bytes)?;
        self.current_offset += 4 + index_bytes.len() as u64;

        let bloom_offset = if let Some(ref bloom) = self.bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };

        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };

        // Footer: data_end (u64), num_entries (u64), bloom_offset (u64),
        // dict_offset (u64), compression level (u8), magic (u32) = 37 bytes.
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(self.writer)
    }
}

#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

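/// Async reader over an SSTable written by [`SSTableWriter`]. Blocks are
/// fetched lazily, decompressed, and kept in a bounded cache; the Bloom
/// filter (if present) short-circuits lookups for absent keys.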
pub struct AsyncSSTableReader<V: SSTableValue> {
    data_slice: LazyFileSlice,
    block_index: BlockIndex,
    num_entries: u64,
    cache: RwLock<BlockCache>,
    bloom_filter: Option<BloomFilter>,
    dictionary: Option<CompressionDict>,
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

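/// Bounded cache of decompressed blocks, keyed by block offset and evicted
/// in FIFO insertion order.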
struct BlockCache {
    blocks: FxHashMap<u64, Arc<Vec<u8>>>,
    insert_order: std::collections::VecDeque<u64>,
    max_blocks: usize,
}

impl BlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            insert_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    fn get(&self, offset: u64) -> Option<Arc<Vec<u8>>> {
        self.blocks.get(&offset).map(Arc::clone)
    }

    fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
        if self.blocks.contains_key(&offset) {
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict_offset) = self.insert_order.pop_front() {
                self.blocks.remove(&evict_offset);
            } else {
                break;
            }
        }
        self.blocks.insert(offset, block);
        self.insert_order.push_back(offset);
    }
}

impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Opens a reader by parsing the footer, block index, and any Bloom
    /// filter or dictionary sections. `cache_blocks` bounds the number of
    /// decompressed blocks kept in memory.
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // The footer is the last 37 bytes (see `SSTableWriter::finish`).
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;

        let mut idx_reader = index_bytes.as_slice();
        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;

        if index_len > idx_reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Index data truncated",
            ));
        }

        let index_data = OwnedBytes::new(idx_reader[..index_len].to_vec());

        #[cfg(feature = "native")]
        let block_index = match FstBlockIndex::load(index_data.clone()) {
            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
        };
        #[cfg(not(feature = "native"))]
        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);

        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            // Read the 12-byte header first to learn the word count, then
            // fetch the full filter.
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_bytes(&bloom_data)?)
        } else {
            None
        };

        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_bytes(dict_data.to_vec()))
        } else {
            None
        };

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            block_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.block_index.len(),
            num_sparse_entries: 0,
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    /// Point lookup: consults the Bloom filter, locates the candidate block,
    /// and scans it for an exact key match.
    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}",
            key.len(),
            self.block_index.len()
        );

        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                log::debug!("SSTable::get key not found (before first block)");
                return Ok(None);
            }
        };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batched lookup: resolves each key to a block, deduplicates and warms
    /// the needed blocks, then searches each key in its (now cached) block.
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // `usize::MAX` marks keys that cannot be present (Bloom-negative or
        // ordered before the first block).
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX));
                continue;
            }

            match self.block_index.locate(key) {
                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
                None => key_to_block.push((key_idx, usize::MAX)),
            }
        }

        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            let block_data = self.load_block(block_idx).await?;
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Loads and caches every block, one read per block.
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.block_index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Fetches the entire data region in one read, then decompresses every
    /// block into the cache (growing the cache bound if needed).
    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
        let num_blocks = self.block_index.len();
        if num_blocks == 0 {
            return Ok(());
        }

        let mut max_end: u64 = 0;
        for i in 0..num_blocks {
            if let Some(addr) = self.block_index.get_addr(i) {
                max_end = max_end.max(addr.offset + addr.length as u64);
            }
        }

        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
        let buf = all_data.as_slice();

        let mut cache = self.cache.write();
        cache.max_blocks = cache.max_blocks.max(num_blocks);
        for i in 0..num_blocks {
            let addr = self.block_index.get_addr(i).unwrap();
            if cache.get(addr.offset).is_some() {
                continue;
            }
            let compressed =
                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
            let decompressed = if let Some(ref dict) = self.dictionary {
                crate::compression::decompress_with_dict(compressed, dict)?
            } else {
                crate::compression::decompress(compressed)?
            };
            cache.insert(addr.offset, Arc::new(decompressed));
        }

        Ok(())
    }

    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        {
            if let Some(block) = self.cache.read().get(addr.offset) {
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            addr.offset,
            addr.offset + addr.length as u64
        );

        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range(range).await?;

        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    /// Linear scan of a decompressed block. Entries are sorted, so the scan
    /// stops as soon as a key greater than the target is seen.
    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    /// Warms the cache for all blocks that may contain keys in
    /// `[start_key, end_key]`.
    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        let start_block = self.block_index.locate(start_key).unwrap_or(0);
        let end_block = self
            .block_index
            .locate(end_key)
            .unwrap_or(self.block_index.len().saturating_sub(1));

        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Decodes every entry in the table into memory.
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.block_index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = block_data.as_slice();
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                current_key.truncate(common_prefix_len);
                let mut suffix = vec![0u8; suffix_len];
                reader.read_exact(&mut suffix)?;
                current_key.extend_from_slice(&suffix);

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}

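/// Streaming iterator over all entries in key order, loading one block at a
/// time. Not a `std::iter::Iterator` because advancing requires `await`.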
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    current_block: usize,
    block_data: Option<Arc<Vec<u8>>>,
    block_offset: usize,
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.block_index.is_empty(),
        }
    }

    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.block_index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Returns the next `(key, value)` pair, or `None` at end of table.
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            // Advance by the number of bytes consumed from the block.
            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_bytes(&bytes).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9);
        assert!(!config.use_bloom_filter);
        assert!(!config.use_dictionary);

        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(!adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22);
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1);
        assert!(perf.use_bloom_filter);
        assert!(!perf.use_dictionary);

        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }

    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
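
    // Additional sanity checks for the TermInfo and SparseDimInfo encodings
    // defined above.
    #[test]
    fn test_term_info_inline_roundtrip() {
        // Three postings fit within MAX_INLINE_POSTINGS and 16 data bytes.
        let info = TermInfo::try_inline(&[3, 7, 10], &[1, 2, 1]).expect("should inline");
        assert!(info.is_inline());
        assert_eq!(info.doc_freq(), 3);
        let (doc_ids, tfs) = info.decode_inline().unwrap();
        assert_eq!(doc_ids, vec![3, 7, 10]);
        assert_eq!(tfs, vec![1, 2, 1]);

        // Too many postings to inline.
        assert!(TermInfo::try_inline(&[1, 2, 3, 4], &[1, 1, 1, 1]).is_none());
    }

    #[test]
    fn test_term_info_serialization_roundtrip() {
        let inline = TermInfo::try_inline(&[3, 7, 10], &[1, 2, 1]).unwrap();
        let external = TermInfo::external(1024, 256, 50);
        let with_pos = TermInfo::external_with_positions(1024, 256, 50, 2048, 128);

        for info in [inline, external, with_pos] {
            let mut buf = Vec::new();
            info.serialize(&mut buf).unwrap();
            let mut reader = buf.as_slice();
            let decoded = TermInfo::deserialize(&mut reader).unwrap();
            assert_eq!(info, decoded);
        }
    }

    #[test]
    fn test_sparse_dim_info_roundtrip() {
        let info = SparseDimInfo::new(12345, 678);
        let mut buf = Vec::new();
        info.serialize(&mut buf).unwrap();
        let mut reader = buf.as_slice();
        let decoded = SparseDimInfo::deserialize(&mut reader).unwrap();
        assert_eq!(info, decoded);
    }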
}