1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
22use parking_lot::RwLock;
23use rustc_hash::FxHashMap;
24use std::io::{self, Read, Write};
25use std::sync::Arc;
26
27#[cfg(feature = "native")]
28use super::sstable_index::FstBlockIndex;
29use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
30use crate::compression::{CompressionDict, CompressionLevel};
31use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice, OwnedBytes};
32
/// Magic number written at the very end of the footer ("STB4" in ASCII).
pub const SSTABLE_MAGIC: u32 = 0x53544234;

/// Target uncompressed size of a data block before it is flushed.
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Default size budget for the shared compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Bloom filter sizing: bits allocated per inserted key.
pub const BLOOM_BITS_PER_KEY: usize = 10;

/// Number of hash probes performed per key in the bloom filter.
pub const BLOOM_HASH_COUNT: usize = 7;
49#[derive(Debug, Clone)]
55pub struct BloomFilter {
56 bits: Vec<u64>,
57 num_bits: usize,
58 num_hashes: usize,
59}
60
61impl BloomFilter {
62 pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
64 let num_bits = (expected_keys * bits_per_key).max(64);
65 let num_words = num_bits.div_ceil(64);
66 Self {
67 bits: vec![0u64; num_words],
68 num_bits,
69 num_hashes: BLOOM_HASH_COUNT,
70 }
71 }
72
73 pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
75 if data.len() < 12 {
76 return Err(io::Error::new(
77 io::ErrorKind::InvalidData,
78 "Bloom filter data too short",
79 ));
80 }
81 let mut reader = data;
82 let num_bits = reader.read_u32::<LittleEndian>()? as usize;
83 let num_hashes = reader.read_u32::<LittleEndian>()? as usize;
84 let num_words = reader.read_u32::<LittleEndian>()? as usize;
85
86 if reader.len() < num_words * 8 {
87 return Err(io::Error::new(
88 io::ErrorKind::InvalidData,
89 "Bloom filter data truncated",
90 ));
91 }
92
93 let mut bits = Vec::with_capacity(num_words);
94 for _ in 0..num_words {
95 bits.push(reader.read_u64::<LittleEndian>()?);
96 }
97
98 Ok(Self {
99 bits,
100 num_bits,
101 num_hashes,
102 })
103 }
104
105 pub fn to_bytes(&self) -> Vec<u8> {
107 let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
108 data.write_u32::<LittleEndian>(self.num_bits as u32)
109 .unwrap();
110 data.write_u32::<LittleEndian>(self.num_hashes as u32)
111 .unwrap();
112 data.write_u32::<LittleEndian>(self.bits.len() as u32)
113 .unwrap();
114 for &word in &self.bits {
115 data.write_u64::<LittleEndian>(word).unwrap();
116 }
117 data
118 }
119
120 pub fn insert(&mut self, key: &[u8]) {
122 let (h1, h2) = self.hash_pair(key);
123 for i in 0..self.num_hashes {
124 let bit_pos = self.get_bit_pos(h1, h2, i);
125 let word_idx = bit_pos / 64;
126 let bit_idx = bit_pos % 64;
127 if word_idx < self.bits.len() {
128 self.bits[word_idx] |= 1u64 << bit_idx;
129 }
130 }
131 }
132
133 pub fn may_contain(&self, key: &[u8]) -> bool {
136 let (h1, h2) = self.hash_pair(key);
137 for i in 0..self.num_hashes {
138 let bit_pos = self.get_bit_pos(h1, h2, i);
139 let word_idx = bit_pos / 64;
140 let bit_idx = bit_pos % 64;
141 if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
142 return false;
143 }
144 }
145 true
146 }
147
148 pub fn size_bytes(&self) -> usize {
150 12 + self.bits.len() * 8
151 }
152
153 fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
155 let mut h1: u64 = 0xcbf29ce484222325;
157 for &byte in key {
158 h1 ^= byte as u64;
159 h1 = h1.wrapping_mul(0x100000001b3);
160 }
161
162 let mut h2: u64 = 0x84222325cbf29ce4;
164 for &byte in key {
165 h2 = h2.wrapping_mul(0x100000001b3);
166 h2 ^= byte as u64;
167 }
168
169 (h1, h2)
170 }
171
172 #[inline]
174 fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
175 (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
176 }
177}
178
/// A value type storable in an SSTable.
///
/// Implementations define their own on-disk encoding; the table format only
/// requires that `deserialize` consume exactly the bytes `serialize` wrote.
pub trait SSTableValue: Clone + Send + Sync {
    /// Encodes `self` into `writer`.
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    /// Decodes one value from `reader`, consuming exactly its encoding.
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

/// `u64` values are stored as variable-length integers.
impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

/// Byte blobs are stored length-prefixed: vint length, then raw bytes.
impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}
210
/// Maximum number of postings that may be embedded inline in a `TermInfo`.
pub const MAX_INLINE_POSTINGS: usize = 3;

/// Dictionary entry for a term: either the posting list itself (inlined for
/// very rare terms) or a pointer into an external postings section.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    /// Posting list small enough to live directly in the term dictionary,
    /// encoded as delta-coded (doc_id, term_freq) vint pairs.
    Inline {
        // Number of documents; at most MAX_INLINE_POSTINGS.
        doc_freq: u8,
        // Encoded postings; only the first `data_len` bytes are meaningful.
        data: [u8; 16],
        // Count of valid bytes in `data`.
        data_len: u8,
    },
    /// Posting list stored out-of-line.
    External {
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        // Offset/length of position data; both 0 when positions are absent.
        position_offset: u64,
        position_len: u32,
    },
}
245
246impl TermInfo {
247 pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
249 TermInfo::External {
250 posting_offset,
251 posting_len,
252 doc_freq,
253 position_offset: 0,
254 position_len: 0,
255 }
256 }
257
258 pub fn external_with_positions(
260 posting_offset: u64,
261 posting_len: u32,
262 doc_freq: u32,
263 position_offset: u64,
264 position_len: u32,
265 ) -> Self {
266 TermInfo::External {
267 posting_offset,
268 posting_len,
269 doc_freq,
270 position_offset,
271 position_len,
272 }
273 }
274
275 pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
278 if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
279 return None;
280 }
281
282 let mut data = [0u8; 16];
283 let mut cursor = std::io::Cursor::new(&mut data[..]);
284 let mut prev_doc_id = 0u32;
285
286 for (i, &doc_id) in doc_ids.iter().enumerate() {
287 let delta = doc_id - prev_doc_id;
288 if write_vint(&mut cursor, delta as u64).is_err() {
289 return None;
290 }
291 if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
292 return None;
293 }
294 prev_doc_id = doc_id;
295 }
296
297 let data_len = cursor.position() as u8;
298 if data_len > 16 {
299 return None;
300 }
301
302 Some(TermInfo::Inline {
303 doc_freq: doc_ids.len() as u8,
304 data,
305 data_len,
306 })
307 }
308
309 pub fn doc_freq(&self) -> u32 {
311 match self {
312 TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
313 TermInfo::External { doc_freq, .. } => *doc_freq,
314 }
315 }
316
317 pub fn is_inline(&self) -> bool {
319 matches!(self, TermInfo::Inline { .. })
320 }
321
322 pub fn external_info(&self) -> Option<(u64, u32)> {
324 match self {
325 TermInfo::External {
326 posting_offset,
327 posting_len,
328 ..
329 } => Some((*posting_offset, *posting_len)),
330 TermInfo::Inline { .. } => None,
331 }
332 }
333
334 pub fn position_info(&self) -> Option<(u64, u32)> {
336 match self {
337 TermInfo::External {
338 position_offset,
339 position_len,
340 ..
341 } if *position_len > 0 => Some((*position_offset, *position_len)),
342 _ => None,
343 }
344 }
345
346 pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
349 match self {
350 TermInfo::Inline {
351 doc_freq,
352 data,
353 data_len,
354 } => {
355 let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
356 let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
357 let mut reader = &data[..*data_len as usize];
358 let mut prev_doc_id = 0u32;
359
360 for _ in 0..*doc_freq {
361 let delta = read_vint(&mut reader).ok()? as u32;
362 let tf = read_vint(&mut reader).ok()? as u32;
363 let doc_id = prev_doc_id + delta;
364 doc_ids.push(doc_id);
365 term_freqs.push(tf);
366 prev_doc_id = doc_id;
367 }
368
369 Some((doc_ids, term_freqs))
370 }
371 TermInfo::External { .. } => None,
372 }
373 }
374}
375
376impl SSTableValue for TermInfo {
377 fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
378 match self {
379 TermInfo::Inline {
380 doc_freq,
381 data,
382 data_len,
383 } => {
384 writer.write_u8(0xFF)?;
386 writer.write_u8(*doc_freq)?;
387 writer.write_u8(*data_len)?;
388 writer.write_all(&data[..*data_len as usize])?;
389 }
390 TermInfo::External {
391 posting_offset,
392 posting_len,
393 doc_freq,
394 position_offset,
395 position_len,
396 } => {
397 if *position_len > 0 {
400 writer.write_u8(0x01)?;
401 write_vint(writer, *doc_freq as u64)?;
402 write_vint(writer, *posting_offset)?;
403 write_vint(writer, *posting_len as u64)?;
404 write_vint(writer, *position_offset)?;
405 write_vint(writer, *position_len as u64)?;
406 } else {
407 writer.write_u8(0x00)?;
408 write_vint(writer, *doc_freq as u64)?;
409 write_vint(writer, *posting_offset)?;
410 write_vint(writer, *posting_len as u64)?;
411 }
412 }
413 }
414 Ok(())
415 }
416
417 fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
418 let tag = reader.read_u8()?;
419
420 if tag == 0xFF {
421 let doc_freq = reader.read_u8()?;
423 let data_len = reader.read_u8()?;
424 let mut data = [0u8; 16];
425 reader.read_exact(&mut data[..data_len as usize])?;
426 Ok(TermInfo::Inline {
427 doc_freq,
428 data,
429 data_len,
430 })
431 } else if tag == 0x00 {
432 let doc_freq = read_vint(reader)? as u32;
434 let posting_offset = read_vint(reader)?;
435 let posting_len = read_vint(reader)? as u32;
436 Ok(TermInfo::External {
437 posting_offset,
438 posting_len,
439 doc_freq,
440 position_offset: 0,
441 position_len: 0,
442 })
443 } else if tag == 0x01 {
444 let doc_freq = read_vint(reader)? as u32;
446 let posting_offset = read_vint(reader)?;
447 let posting_len = read_vint(reader)? as u32;
448 let position_offset = read_vint(reader)?;
449 let position_len = read_vint(reader)? as u32;
450 Ok(TermInfo::External {
451 posting_offset,
452 posting_len,
453 doc_freq,
454 position_offset,
455 position_len,
456 })
457 } else {
458 Err(io::Error::new(
459 io::ErrorKind::InvalidData,
460 format!("Invalid TermInfo tag: {}", tag),
461 ))
462 }
463 }
464}
465
/// Writes `value` as a LEB128-style varint: 7 payload bits per byte, low
/// bits first, with the high bit set on every byte except the last.
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    // Emit continuation bytes while more than 7 bits remain.
    while value >= 0x80 {
        writer.write_all(&[(value as u8 & 0x7F) | 0x80])?;
        value >>= 7;
    }
    // Final byte: high bit clear terminates the varint.
    writer.write_all(&[value as u8])
}
479
/// Reads a varint previously written by `write_vint`.
///
/// Accepts at most 10 bytes; if the continuation bit is still set once the
/// shift would exceed 63, the input is rejected as `InvalidData`.
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut value = 0u64;
    let mut shift = 0u32;

    loop {
        let mut buf = [0u8; 1];
        reader.read_exact(&mut buf)?;
        let byte = buf[0];

        value |= u64::from(byte & 0x7F) << shift;
        if byte < 0x80 {
            // Continuation bit clear: this was the final byte.
            return Ok(value);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}
500
/// Length in bytes of the longest common prefix of `a` and `b`.
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    let limit = a.len().min(b.len());
    let mut n = 0;
    while n < limit && a[n] == b[n] {
        n += 1;
    }
    n
}
505
/// Summary statistics for an opened SSTable, intended for diagnostics.
#[derive(Debug, Clone)]
pub struct SSTableStats {
    // Number of compressed data blocks.
    pub num_blocks: usize,
    // Sparse-index entry count; the current reader always reports 0.
    pub num_sparse_entries: usize,
    // Total number of key/value entries in the table.
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    // Serialized bloom filter size in bytes (0 when absent).
    pub bloom_filter_size: usize,
    // Compression dictionary size in bytes (0 when absent).
    pub dictionary_size: usize,
}

/// Tuning knobs for [`SSTableWriter`].
#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    // Compression level applied to each data block.
    pub compression_level: CompressionLevel,
    // Whether blocks are compressed with a shared dictionary.
    pub use_dictionary: bool,
    // Size budget for that dictionary, in bytes.
    pub dict_size: usize,
    // Whether to build a bloom filter over all inserted keys.
    pub use_bloom_filter: bool,
    // Bloom filter sizing (bits per key).
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    // Defaults track the index-wide default optimization profile.
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}
538
539impl SSTableWriterConfig {
540 pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
542 use crate::structures::IndexOptimization;
543 match optimization {
544 IndexOptimization::Adaptive => Self {
545 compression_level: CompressionLevel::BETTER, use_dictionary: false,
547 dict_size: DEFAULT_DICT_SIZE,
548 use_bloom_filter: false,
549 bloom_bits_per_key: BLOOM_BITS_PER_KEY,
550 },
551 IndexOptimization::SizeOptimized => Self {
552 compression_level: CompressionLevel::MAX, use_dictionary: true,
554 dict_size: DEFAULT_DICT_SIZE,
555 use_bloom_filter: true,
556 bloom_bits_per_key: BLOOM_BITS_PER_KEY,
557 },
558 IndexOptimization::PerformanceOptimized => Self {
559 compression_level: CompressionLevel::FAST, use_dictionary: false,
561 dict_size: DEFAULT_DICT_SIZE,
562 use_bloom_filter: true, bloom_bits_per_key: BLOOM_BITS_PER_KEY,
564 },
565 }
566 }
567
568 pub fn fast() -> Self {
570 Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
571 }
572
573 pub fn max_compression() -> Self {
575 Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
576 }
577}
578
/// Streaming SSTable writer.
///
/// Entries are expected in ascending key order; each key is prefix-compressed
/// against the previous one and appended to an in-memory block that is
/// compressed and flushed once it reaches `BLOCK_SIZE`. `finish()` then
/// appends the block index, optional bloom filter / dictionary sections, and
/// a fixed 37-byte footer ending in `SSTABLE_MAGIC`.
pub struct SSTableWriter<'a, V: SSTableValue> {
    writer: &'a mut dyn Write,
    // Uncompressed encoding of the block currently being built.
    block_buffer: Vec<u8>,
    // Previous inserted key; prefix base (cleared at block boundaries).
    prev_key: Vec<u8>,
    // One entry per flushed block: first key + compressed extent.
    index: Vec<BlockIndexEntry>,
    // Total bytes written to `writer` so far.
    current_offset: u64,
    // Running count of inserted entries.
    num_entries: u64,
    // First key of the in-progress block, if one has been started.
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    // Optional shared dictionary used to compress every block.
    dictionary: Option<CompressionDict>,
    // All inserted keys; retained only when a bloom filter is requested.
    all_keys: Vec<Vec<u8>>,
    // Built during finish() from `all_keys`, when configured.
    bloom_filter: Option<BloomFilter>,
    _phantom: std::marker::PhantomData<V>,
}
601
602impl<'a, V: SSTableValue> SSTableWriter<'a, V> {
603 pub fn new(writer: &'a mut dyn Write) -> Self {
605 Self::with_config(writer, SSTableWriterConfig::default())
606 }
607
608 pub fn with_config(writer: &'a mut dyn Write, config: SSTableWriterConfig) -> Self {
610 Self {
611 writer,
612 block_buffer: Vec::with_capacity(BLOCK_SIZE),
613 prev_key: Vec::new(),
614 index: Vec::new(),
615 current_offset: 0,
616 num_entries: 0,
617 block_first_key: None,
618 config,
619 dictionary: None,
620 all_keys: Vec::new(),
621 bloom_filter: None,
622 _phantom: std::marker::PhantomData,
623 }
624 }
625
626 pub fn with_dictionary(
628 writer: &'a mut dyn Write,
629 config: SSTableWriterConfig,
630 dictionary: CompressionDict,
631 ) -> Self {
632 Self {
633 writer,
634 block_buffer: Vec::with_capacity(BLOCK_SIZE),
635 prev_key: Vec::new(),
636 index: Vec::new(),
637 current_offset: 0,
638 num_entries: 0,
639 block_first_key: None,
640 config,
641 dictionary: Some(dictionary),
642 all_keys: Vec::new(),
643 bloom_filter: None,
644 _phantom: std::marker::PhantomData,
645 }
646 }
647
648 pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
649 if self.block_first_key.is_none() {
650 self.block_first_key = Some(key.to_vec());
651 }
652
653 if self.config.use_bloom_filter {
655 self.all_keys.push(key.to_vec());
656 }
657
658 let prefix_len = common_prefix_len(&self.prev_key, key);
659 let suffix = &key[prefix_len..];
660
661 write_vint(&mut self.block_buffer, prefix_len as u64)?;
662 write_vint(&mut self.block_buffer, suffix.len() as u64)?;
663 self.block_buffer.extend_from_slice(suffix);
664 value.serialize(&mut self.block_buffer)?;
665
666 self.prev_key.clear();
667 self.prev_key.extend_from_slice(key);
668 self.num_entries += 1;
669
670 if self.block_buffer.len() >= BLOCK_SIZE {
671 self.flush_block()?;
672 }
673
674 Ok(())
675 }
676
677 fn flush_block(&mut self) -> io::Result<()> {
679 if self.block_buffer.is_empty() {
680 return Ok(());
681 }
682
683 let compressed = if let Some(ref dict) = self.dictionary {
685 crate::compression::compress_with_dict(
686 &self.block_buffer,
687 self.config.compression_level,
688 dict,
689 )?
690 } else {
691 crate::compression::compress(&self.block_buffer, self.config.compression_level)?
692 };
693
694 if let Some(first_key) = self.block_first_key.take() {
695 self.index.push(BlockIndexEntry {
696 first_key,
697 offset: self.current_offset,
698 length: compressed.len() as u32,
699 });
700 }
701
702 self.writer.write_all(&compressed)?;
703 self.current_offset += compressed.len() as u64;
704 self.block_buffer.clear();
705 self.prev_key.clear();
706
707 Ok(())
708 }
709
710 pub fn finish(mut self) -> io::Result<()> {
711 self.flush_block()?;
713
714 if self.config.use_bloom_filter && !self.all_keys.is_empty() {
716 let mut bloom = BloomFilter::new(self.all_keys.len(), self.config.bloom_bits_per_key);
717 for key in &self.all_keys {
718 bloom.insert(key);
719 }
720 self.bloom_filter = Some(bloom);
721 }
722
723 let data_end_offset = self.current_offset;
724
725 let entries: Vec<(Vec<u8>, BlockAddr)> = self
728 .index
729 .iter()
730 .map(|e| {
731 (
732 e.first_key.clone(),
733 BlockAddr {
734 offset: e.offset,
735 length: e.length,
736 },
737 )
738 })
739 .collect();
740
741 #[cfg(feature = "native")]
743 let index_bytes = FstBlockIndex::build(&entries)?;
744 #[cfg(not(feature = "native"))]
745 let index_bytes = MmapBlockIndex::build(&entries)?;
746
747 self.writer
749 .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
750 self.writer.write_all(&index_bytes)?;
751 self.current_offset += 4 + index_bytes.len() as u64;
752
753 let bloom_offset = if let Some(ref bloom) = self.bloom_filter {
755 let bloom_data = bloom.to_bytes();
756 let offset = self.current_offset;
757 self.writer.write_all(&bloom_data)?;
758 self.current_offset += bloom_data.len() as u64;
759 offset
760 } else {
761 0
762 };
763
764 let dict_offset = if let Some(ref dict) = self.dictionary {
766 let dict_bytes = dict.as_bytes();
767 let offset = self.current_offset;
768 self.writer
769 .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
770 self.writer.write_all(dict_bytes)?;
771 self.current_offset += 4 + dict_bytes.len() as u64;
772 offset
773 } else {
774 0
775 };
776
777 self.writer.write_u64::<LittleEndian>(data_end_offset)?;
779 self.writer.write_u64::<LittleEndian>(self.num_entries)?;
780 self.writer.write_u64::<LittleEndian>(bloom_offset)?; self.writer.write_u64::<LittleEndian>(dict_offset)?; self.writer
783 .write_u8(self.config.compression_level.0 as u8)?;
784 self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;
785
786 Ok(())
787 }
788}
789
/// Per-block record captured while writing, used to build the block index.
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    // First key stored in the block.
    first_key: Vec<u8>,
    // Offset of the compressed block within the data section.
    offset: u64,
    // Compressed length of the block in bytes.
    length: u32,
}

/// Async SSTable reader that lazily fetches and caches decompressed blocks.
pub struct AsyncSSTableReader<V: SSTableValue> {
    // Slice covering only the compressed data section (before the index).
    data_slice: LazyFileSlice,
    // Maps a key to the index of the block that may contain it.
    block_index: BlockIndex,
    // Total entry count, read from the footer.
    num_entries: u64,
    // LRU cache of decompressed blocks, keyed by block file offset.
    cache: RwLock<BlockCache>,
    // Optional bloom filter for cheap negative lookups.
    bloom_filter: Option<BloomFilter>,
    // Optional dictionary required to decompress blocks.
    dictionary: Option<CompressionDict>,
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

/// LRU cache of decompressed blocks keyed by file offset.
struct BlockCache {
    blocks: FxHashMap<u64, Arc<Vec<u8>>>,
    // Offsets ordered least- to most-recently used.
    access_order: Vec<u64>,
    max_blocks: usize,
}
828
829impl BlockCache {
830 fn new(max_blocks: usize) -> Self {
831 Self {
832 blocks: FxHashMap::default(),
833 access_order: Vec::new(),
834 max_blocks,
835 }
836 }
837
838 fn get(&mut self, offset: u64) -> Option<Arc<Vec<u8>>> {
839 if let Some(block) = self.blocks.get(&offset) {
840 if let Some(pos) = self.access_order.iter().position(|&o| o == offset) {
841 self.access_order.remove(pos);
842 self.access_order.push(offset);
843 }
844 Some(Arc::clone(block))
845 } else {
846 None
847 }
848 }
849
850 fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
851 while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
852 let evict_offset = self.access_order.remove(0);
853 self.blocks.remove(&evict_offset);
854 }
855 self.blocks.insert(offset, block);
856 self.access_order.push(offset);
857 }
858}
859
impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Opens an SSTable from `file_handle`: parses the fixed 37-byte footer,
    /// loads the block index, and reads the optional bloom filter and
    /// compression dictionary sections.
    ///
    /// `cache_blocks` bounds the number of decompressed blocks kept in the
    /// LRU cache.
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // Footer layout (little-endian), mirroring SSTableWriter::finish:
        // data_end:u64 | num_entries:u64 | bloom_off:u64 | dict_off:u64 |
        // level:u8 | magic:u32  = 37 bytes.
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        // The u32-length-prefixed block index sits right after the data
        // section; everything up to the footer belongs to it (plus optional
        // bloom/dict sections located via their footer offsets).
        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;

        let mut idx_reader = index_bytes.as_slice();
        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;

        if index_len > idx_reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Index data truncated",
            ));
        }

        let index_data = OwnedBytes::new(idx_reader[..index_len].to_vec());

        // Prefer the FST index when the native feature is enabled; fall back
        // to the mmap format if the bytes do not parse as an FST.
        #[cfg(feature = "native")]
        let block_index = match FstBlockIndex::load(index_data.clone()) {
            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
        };
        #[cfg(not(feature = "native"))]
        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);

        // A zero offset in the footer means "section absent".
        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            // Read the 12-byte header first to learn the word count, then
            // fetch the exact serialized size in a second read.
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_bytes(&bloom_data)?)
        } else {
            None
        };

        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            // Dictionary section: u32 length prefix, then raw bytes.
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_bytes(dict_data.to_vec()))
        } else {
            None
        };

        // Restrict the data slice to the compressed blocks only.
        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            block_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    /// Total number of entries in the table (from the footer).
    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    /// Returns summary statistics for diagnostics/logging.
    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.block_index.len(),
            num_sparse_entries: 0, num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    /// Point lookup: returns the value stored for `key`, or `None`.
    ///
    /// Checks the bloom filter first, then locates the single candidate
    /// block via the block index and scans it.
    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}",
            key.len(),
            self.block_index.len()
        );

        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                log::debug!("SSTable::get key not found (before first block)");
                return Ok(None);
            }
        };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batched lookup: resolves every key, deduplicating block loads so each
    /// required block is fetched and decompressed at most once.
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // First pass: map each key to its candidate block index;
        // usize::MAX marks keys already known to be absent.
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX)); continue;
            }

            match self.block_index.locate(key) {
                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
                None => key_to_block.push((key_idx, usize::MAX)), }
        }

        // Warm the cache once per distinct block.
        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        // Second pass: blocks are now cached, so load_block is a cache hit.
        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            let block_data = self.load_block(block_idx).await?; results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Loads every block into the cache (useful for small, hot tables).
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.block_index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Returns the decompressed block at `block_idx`, reading and caching it
    /// on a cache miss.
    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        {
            // Write lock even for the lookup: BlockCache::get mutates the
            // LRU order.
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(addr.offset) {
                log::debug!("SSTable::load_block idx={} CACHE HIT", block_idx);
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            addr.offset,
            addr.offset + addr.length as u64
        );

        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range(range).await?;

        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            // NOTE(review): two tasks can race past the miss check and both
            // decompress the same block; the later insert wins. Redundant
            // work, but not incorrect.
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    /// Linear scan of one decompressed block for `target_key`.
    ///
    /// Entries are prefix-compressed and sorted, so the scan can stop as
    /// soon as a key greater than the target appears.
    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            // Rebuild the full key: keep the shared prefix, append suffix.
            current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    /// Warms the block cache for all blocks spanning [start_key, end_key].
    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        let start_block = self.block_index.locate(start_key).unwrap_or(0);
        let end_block = self
            .block_index
            .locate(end_key)
            .unwrap_or(self.block_index.len().saturating_sub(1));

        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    /// Returns a streaming iterator over all entries in key order.
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Decodes and returns every entry in the table, in key order.
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.block_index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = block_data.as_slice();
            // Prefix base resets at each block boundary.
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                current_key.truncate(common_prefix_len);
                let mut suffix = vec![0u8; suffix_len];
                reader.read_exact(&mut suffix)?;
                current_key.extend_from_slice(&suffix);

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}
1220
/// Streaming, in-order iterator over an [`AsyncSSTableReader`].
///
/// Decodes one block at a time, maintaining the rolling `current_key`
/// needed to undo prefix compression.
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    // Index of the next block to load.
    current_block: usize,
    // Currently loaded (decompressed) block, if any.
    block_data: Option<Arc<Vec<u8>>>,
    // Byte offset of the next entry within `block_data`.
    block_offset: usize,
    // Last decoded key; prefix base for the next entry.
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            // An empty table is exhausted from the start.
            finished: reader.block_index.is_empty(),
        }
    }

    /// Loads the next block and resets per-block decode state.
    /// Returns `false` when all blocks are exhausted.
    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.block_index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        // Prefix compression restarts at each block boundary.
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Returns the next (key, value) pair, or `None` at end of table.
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            // Current block exhausted: advance to the next one.
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            // Rebuild the full key from the shared prefix plus the suffix.
            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            // Advance by exactly the bytes this entry consumed.
            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}
1294
#[cfg(test)]
mod tests {
    use super::*;

    // Basic insert/lookup behavior, including definite negatives.
    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        // With 100 slots for 3 keys, these should not collide.
        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    // to_bytes/from_bytes must round-trip membership results.
    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_bytes(&bytes).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }

    // No false negatives; false-positive rate stays near the design target.
    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        // Every inserted key must be reported present.
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        // Probe keys that were never inserted and count false positives.
        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        // 10 bits/key with 7 hashes should land well under 3%.
        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    // Each optimization profile maps to the expected knob settings.
    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9); assert!(!config.use_bloom_filter);
        assert!(!config.use_dictionary);

        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(!adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22); assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1); assert!(perf.use_bloom_filter); assert!(!perf.use_dictionary);

        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }

    // write_vint/read_vint invert each other across boundary values.
    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
}