//! SSTable: immutable sorted string table with prefix-compressed,
//! block-compressed data, a two-level (sparse + full) block index, and an
//! optional bloom filter and compression dictionary.

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;

use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};

/// Magic number written at the very end of every SSTable file ("STB3").
pub const SSTABLE_MAGIC: u32 = 0x53544233;

/// Target uncompressed size of a data block.
pub const BLOCK_SIZE: usize = 16 * 1024;

/// One sparse index entry is kept for every `SPARSE_INDEX_INTERVAL` blocks.
pub const SPARSE_INDEX_INTERVAL: usize = 16;

/// Default size of a trained compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Bloom filter sizing: bits allocated per key.
pub const BLOOM_BITS_PER_KEY: usize = 10;

/// Number of hash probes per key. With `b` bits per key the false-positive
/// rate (1 - e^(-k/b))^k is minimized at k = b * ln 2; for b = 10 that is
/// k ≈ 6.93, so 7 probes give roughly a 0.8% false-positive rate.
pub const BLOOM_HASH_COUNT: usize = 7;

/// A Bloom filter over byte-string keys, using double hashing.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    bits: Vec<u64>,
    num_bits: usize,
    num_hashes: usize,
}

impl BloomFilter {
    /// Create an empty filter sized for `expected_keys` at `bits_per_key`.
    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: vec![0u64; num_words],
            num_bits,
            num_hashes: BLOOM_HASH_COUNT,
        }
    }

    /// Deserialize from the layout produced by [`BloomFilter::to_bytes`].
    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let mut reader = data;
        let num_bits = reader.read_u32::<LittleEndian>()? as usize;
        let num_hashes = reader.read_u32::<LittleEndian>()? as usize;
        let num_words = reader.read_u32::<LittleEndian>()? as usize;

        if reader.len() < num_words * 8 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let mut bits = Vec::with_capacity(num_words);
        for _ in 0..num_words {
            bits.push(reader.read_u64::<LittleEndian>()?);
        }

        Ok(Self {
            bits,
            num_bits,
            num_hashes,
        })
    }

    /// Serialize as: num_bits (u32), num_hashes (u32), num_words (u32),
    /// followed by the bit words as little-endian u64s.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
        data.write_u32::<LittleEndian>(self.num_bits as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.num_hashes as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.bits.len() as u32)
            .unwrap();
        for &word in &self.bits {
            data.write_u64::<LittleEndian>(word).unwrap();
        }
        data
    }

    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits[word_idx] |= 1u64 << bit_idx;
            }
        }
    }

    /// Returns `false` if the key is definitely absent; `true` means "maybe".
    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
                return false;
            }
        }
        true
    }

    pub fn size_bytes(&self) -> usize {
        12 + self.bits.len() * 8
    }

    /// Two independent 64-bit hashes: FNV-1a, plus an FNV-style variant that
    /// multiplies before xor-ing and uses a rotated offset basis.
    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
        let mut h1: u64 = 0xcbf29ce484222325;
        for &byte in key {
            h1 ^= byte as u64;
            h1 = h1.wrapping_mul(0x100000001b3);
        }

        let mut h2: u64 = 0x84222325cbf29ce4;
        for &byte in key {
            h2 = h2.wrapping_mul(0x100000001b3);
            h2 ^= byte as u64;
        }

        (h1, h2)
    }

    /// Double hashing: probe `i` uses h1 + i * h2 (mod num_bits).
    #[inline]
    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
    }
}

/// Values stored in an SSTable; must round-trip through serialize/deserialize.
pub trait SSTableValue: Clone + Send + Sync {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}

/// Posting lists with at most this many documents are stored inline in the
/// term dictionary instead of the external postings file.
pub const MAX_INLINE_POSTINGS: usize = 3;

/// Dictionary entry for a term: either a tiny posting list encoded inline,
/// or a reference into the external postings file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    Inline {
        /// Number of documents (at most `MAX_INLINE_POSTINGS`).
        doc_freq: u8,
        /// (doc_id delta, term_freq) pairs, vint-encoded.
        data: [u8; 16],
        /// Number of meaningful bytes in `data`.
        data_len: u8,
    },
    External {
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
    },
}

impl TermInfo {
    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
        }
    }

    /// Try to encode a small posting list inline. `doc_ids` must be sorted
    /// ascending and parallel to `term_freqs`; returns `None` when the list
    /// is empty, too long, mismatched, or does not fit in 16 bytes.
    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS
            || doc_ids.is_empty()
            || doc_ids.len() != term_freqs.len()
        {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (i, &doc_id) in doc_ids.iter().enumerate() {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;
        if data_len > 16 {
            return None;
        }

        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    /// Document frequency, regardless of representation.
    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    /// `(posting_offset, posting_len)` for external entries, else `None`.
    pub fn external_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    /// Decode inline postings back into `(doc_ids, term_freqs)`.
    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;

                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }

                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}

impl SSTableValue for TermInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                // Tag 0xFF marks an inline entry.
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
            } => {
                // Tag 0x00 marks an external posting-list reference.
                writer.write_u8(0x00)?;
                write_vint(writer, *doc_freq as u64)?;
                write_vint(writer, *posting_offset)?;
                write_vint(writer, *posting_len as u64)?;
            }
        }
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;

        if tag == 0xFF {
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}

/// Write an LEB128-style varint: 7 bits per byte, high bit = continuation.
/// For example, 300 encodes as [0xAC, 0x02].
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            return Ok(());
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
}

/// Read a varint written by [`write_vint`]; errors if the encoding claims
/// more than 64 bits of payload.
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}

/// Length of the longest common prefix of `a` and `b`.
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}

/// One entry per `SPARSE_INDEX_INTERVAL` blocks, used to narrow binary
/// searches over the full block index.
#[derive(Debug, Clone)]
struct SparseIndexEntry {
    first_key: Vec<u8>,
    block_idx: u32,
}

/// Summary statistics for an open SSTable.
#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    pub num_sparse_entries: usize,
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    pub bloom_filter_size: usize,
    pub dictionary_size: usize,
}

#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    pub compression_level: CompressionLevel,
    pub use_dictionary: bool,
    pub dict_size: usize,
    pub use_bloom_filter: bool,
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}

impl SSTableWriterConfig {
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        match optimization {
            IndexOptimization::Adaptive => Self {
                compression_level: CompressionLevel::BETTER,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: false,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::SizeOptimized => Self {
                compression_level: CompressionLevel::MAX,
                use_dictionary: true,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::PerformanceOptimized => Self {
                compression_level: CompressionLevel::FAST,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
        }
    }

    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}

/// Streaming SSTable writer. Entries must be inserted in ascending key
/// order; data is buffered into prefix-compressed blocks that are
/// compressed and flushed at `BLOCK_SIZE` boundaries.
pub struct SSTableWriter<'a, V: SSTableValue> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    prev_key: Vec<u8>,
    index: Vec<BlockIndexEntry>,
    current_offset: u64,
    num_entries: u64,
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    dictionary: Option<CompressionDict>,
    all_keys: Vec<Vec<u8>>,
    bloom_filter: Option<BloomFilter>,
    _phantom: std::marker::PhantomData<V>,
}

impl<'a, V: SSTableValue> SSTableWriter<'a, V> {
    pub fn new(writer: &'a mut dyn Write) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }

    pub fn with_config(writer: &'a mut dyn Write, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    pub fn with_dictionary(
        writer: &'a mut dyn Write,
        config: SSTableWriterConfig,
        dictionary: CompressionDict,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: Some(dictionary),
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Add an entry; keys must be inserted in ascending order.
    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        if self.config.use_bloom_filter {
            self.all_keys.push(key.to_vec());
        }

        // Prefix-compress the key against the previous one in this block.
        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }

    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        // Prefix delta encoding restarts at each block boundary.
        self.prev_key.clear();

        Ok(())
    }

    /// Flush remaining data and write the index sections and footer.
    pub fn finish(mut self) -> io::Result<()> {
        self.flush_block()?;

        if self.config.use_bloom_filter && !self.all_keys.is_empty() {
            let mut bloom = BloomFilter::new(self.all_keys.len(), self.config.bloom_bits_per_key);
            for key in &self.all_keys {
                bloom.insert(key);
            }
            self.bloom_filter = Some(bloom);
        }

        let data_end_offset = self.current_offset;

        let sparse_index: Vec<SparseIndexEntry> = self
            .index
            .iter()
            .enumerate()
            .filter(|(i, _)| *i % SPARSE_INDEX_INTERVAL == 0)
            .map(|(i, entry)| SparseIndexEntry {
                first_key: entry.first_key.clone(),
                block_idx: i as u32,
            })
            .collect();

        let index_clone = self.index.clone();
        self.write_block_index_compressed(&index_clone)?;

        // Keep `current_offset` in sync while writing the sparse index so
        // the bloom/dictionary offsets recorded in the footer point at the
        // right file positions.
        self.writer
            .write_u32::<LittleEndian>(sparse_index.len() as u32)?;
        self.current_offset += 4;
        for entry in &sparse_index {
            self.writer
                .write_u16::<LittleEndian>(entry.first_key.len() as u16)?;
            self.writer.write_all(&entry.first_key)?;
            self.writer.write_u32::<LittleEndian>(entry.block_idx)?;
            self.current_offset += 2 + entry.first_key.len() as u64 + 4;
        }

        let bloom_offset = if let Some(ref bloom) = self.bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };

        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };

        // Fixed 37-byte footer: data_end (u64), num_entries (u64),
        // bloom_offset (u64), dict_offset (u64), compression level (u8),
        // magic (u32).
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(())
    }

    fn write_block_index_compressed(&mut self, index: &[BlockIndexEntry]) -> io::Result<()> {
        // Serialize into a buffer first so the number of bytes written is
        // known and `current_offset` stays accurate for the footer offsets.
        let mut buf: Vec<u8> = Vec::new();
        buf.write_u32::<LittleEndian>(index.len() as u32)?;

        let mut prev_key: Vec<u8> = Vec::new();
        for entry in index {
            let prefix_len = common_prefix_len(&prev_key, &entry.first_key);
            let suffix = &entry.first_key[prefix_len..];

            write_vint(&mut buf, prefix_len as u64)?;
            write_vint(&mut buf, suffix.len() as u64)?;
            buf.write_all(suffix)?;
            write_vint(&mut buf, entry.offset)?;
            write_vint(&mut buf, entry.length as u64)?;

            prev_key.clear();
            prev_key.extend_from_slice(&entry.first_key);
        }

        self.writer.write_all(&buf)?;
        self.current_offset += buf.len() as u64;
        Ok(())
    }
}

#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

/// Async reader over an immutable SSTable, with an LRU cache of
/// decompressed blocks.
pub struct AsyncSSTableReader<V: SSTableValue> {
    data_slice: LazyFileSlice,
    index: Vec<BlockIndexEntry>,
    sparse_index: Vec<SparseIndexEntry>,
    num_entries: u64,
    cache: RwLock<BlockCache>,
    bloom_filter: Option<BloomFilter>,
    dictionary: Option<CompressionDict>,
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

/// LRU cache of decompressed blocks, keyed by file offset. Recency is
/// tracked in a Vec, which is adequate for small capacities.
struct BlockCache {
    blocks: FxHashMap<u64, Arc<Vec<u8>>>,
    access_order: Vec<u64>,
    max_blocks: usize,
}

impl BlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    /// Look up a block by file offset, refreshing its LRU position.
    fn get(&mut self, offset: u64) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&offset) {
            if let Some(pos) = self.access_order.iter().position(|&o| o == offset) {
                self.access_order.remove(pos);
                self.access_order.push(offset);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    /// Insert a block, evicting least-recently-used entries while full.
    fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict_offset = self.access_order.remove(0);
            self.blocks.remove(&evict_offset);
        }
        self.blocks.insert(offset, block);
        self.access_order.push(offset);
    }
}

impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Open an SSTable, parsing the footer and both index sections.
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // Fixed 37-byte footer (see `SSTableWriter::finish`).
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        // The index sections sit between the data blocks and the footer.
        let index_start = data_end_offset as usize;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        // Block index: prefix-compressed first keys plus (offset, length).
        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);
        let mut prev_key: Vec<u8> = Vec::new();

        for _ in 0..num_blocks {
            let prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;

            let mut first_key = prev_key[..prefix_len.min(prev_key.len())].to_vec();
            first_key.extend_from_slice(&suffix);

            let offset = read_vint(&mut reader)?;
            let length = read_vint(&mut reader)? as u32;

            prev_key = first_key.clone();
            index.push(BlockIndexEntry {
                first_key,
                offset,
                length,
            });
        }

        // Sparse index: uncompressed keys for fast range narrowing.
        let num_sparse = reader.read_u32::<LittleEndian>()? as usize;
        let mut sparse_index = Vec::with_capacity(num_sparse);

        for _ in 0..num_sparse {
            let key_len = reader.read_u16::<LittleEndian>()? as usize;
            let mut first_key = vec![0u8; key_len];
            reader.read_exact(&mut first_key)?;
            let block_idx = reader.read_u32::<LittleEndian>()?;

            sparse_index.push(SparseIndexEntry {
                first_key,
                block_idx,
            });
        }

        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset as usize;
            // Read the 12-byte header first to learn the filter size.
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as usize;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_bytes(&bloom_data)?)
        } else {
            None
        };

        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset as usize;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as usize;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_bytes(dict_data.to_vec()))
        } else {
            None
        };

        let data_slice = file_handle.slice(0..data_end_offset as usize);

        Ok(Self {
            data_slice,
            index,
            sparse_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.index.len(),
            num_sparse_entries: self.sparse_index.len(),
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    /// Point lookup. Returns `Ok(None)` if the key is absent.
    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}, sparse_entries={}",
            key.len(),
            self.index.len(),
            self.sparse_index.len()
        );

        // Guard against indexing into an empty block index below.
        if self.index.is_empty() {
            return Ok(None);
        }

        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        let (start_block, end_block) = self.find_block_range(key);
        log::debug!("SSTable::get sparse_range=[{}, {}]", start_block, end_block);

        // Find the block whose first key is <= `key`.
        let search_range = &self.index[start_block..=end_block];
        let block_idx =
            match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
                Ok(idx) => start_block + idx,
                Err(0) => {
                    if start_block == 0 {
                        log::debug!("SSTable::get key not found (before first block)");
                        return Ok(None);
                    }
                    start_block
                }
                Err(idx) => start_block + idx - 1,
            };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batched point lookups; each needed block is loaded into the cache
    /// once, even when several keys fall into the same block.
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }
        if self.index.is_empty() {
            return Ok(vec![None; keys.len()]);
        }

        // Map each key to its candidate block; usize::MAX marks keys that
        // are already ruled out.
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX));
                continue;
            }

            let (start_block, end_block) = self.find_block_range(key);
            let search_range = &self.index[start_block..=end_block];
            let block_idx =
                match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
                    Ok(idx) => start_block + idx,
                    Err(0) => {
                        if start_block == 0 {
                            key_to_block.push((key_idx, usize::MAX));
                            continue;
                        }
                        start_block
                    }
                    Err(idx) => start_block + idx - 1,
                };
            key_to_block.push((key_idx, block_idx));
        }

        // Warm the cache with each needed block exactly once.
        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            let block_data = self.load_block(block_idx).await?;
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Use the sparse index to narrow the full-index binary search to an
    /// inclusive block range that may contain `key`.
    fn find_block_range(&self, key: &[u8]) -> (usize, usize) {
        if self.sparse_index.is_empty() {
            return (0, self.index.len().saturating_sub(1));
        }

        let sparse_pos = match self
            .sparse_index
            .binary_search_by(|entry| entry.first_key.as_slice().cmp(key))
        {
            Ok(idx) => idx,
            Err(0) => 0,
            Err(idx) => idx - 1,
        };

        let start_block = self.sparse_index[sparse_pos].block_idx as usize;
        let end_block = if sparse_pos + 1 < self.sparse_index.len() {
            (self.sparse_index[sparse_pos + 1].block_idx as usize).saturating_sub(1)
        } else {
            self.index.len().saturating_sub(1)
        };

        (start_block, end_block.max(start_block))
    }

    /// Load every block into the cache (useful for small, hot tables).
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
        let entry = &self.index[block_idx];

        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.offset) {
                log::debug!("SSTable::load_block idx={} CACHE HIT", block_idx);
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            entry.offset,
            entry.offset + entry.length as u64
        );

        let start = entry.offset as usize;
        let end = start + entry.length as usize;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        if self.index.is_empty() {
            return Ok(());
        }

        let start_block = match self
            .index
            .binary_search_by(|e| e.first_key.as_slice().cmp(start_key))
        {
            Ok(idx) => idx,
            Err(0) => 0,
            Err(idx) => idx - 1,
        };

        let end_block = match self
            .index
            .binary_search_by(|e| e.first_key.as_slice().cmp(end_key))
        {
            Ok(idx) => idx,
            Err(idx) if idx >= self.index.len() => self.index.len().saturating_sub(1),
            Err(idx) => idx,
        };

        for block_idx in start_block..=end_block.min(self.index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    /// An async iterator over all entries in key order.
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Decode every entry in the table into memory.
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = block_data.as_slice();
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                current_key.truncate(common_prefix_len);
                let mut suffix = vec![0u8; suffix_len];
                reader.read_exact(&mut suffix)?;
                current_key.extend_from_slice(&suffix);

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}

/// Streaming iterator over all entries of an [`AsyncSSTableReader`] in key
/// order. Advance with the async `next` method.
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    current_block: usize,
    block_data: Option<Arc<Vec<u8>>>,
    block_offset: usize,
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.index.is_empty(),
        }
    }

    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_bytes(&bytes).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }
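
    // Extra coverage (added alongside the original tests): `from_bytes`
    // should reject malformed input with an error rather than panic.
    #[test]
    fn test_bloom_filter_rejects_malformed_input() {
        // Shorter than the 12-byte header.
        assert!(BloomFilter::from_bytes(&[0u8; 4]).is_err());

        // A header that claims more words than the payload provides.
        let mut bytes = Vec::new();
        bytes.write_u32::<LittleEndian>(64).unwrap(); // num_bits
        bytes.write_u32::<LittleEndian>(7).unwrap(); // num_hashes
        bytes.write_u32::<LittleEndian>(10).unwrap(); // num_words, no payload
        assert!(BloomFilter::from_bytes(&bytes).is_err());
    }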

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9);
        assert!(!config.use_bloom_filter);
        assert!(!config.use_dictionary);

        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(!adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22);
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1);
        assert!(perf.use_bloom_filter);
        assert!(!perf.use_dictionary);

        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }
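
    // Sketch test for the private `BlockCache` (reachable from this child
    // module): with capacity 2, inserting a third block must evict the
    // least-recently-used entry, not the most recently touched one.
    #[test]
    fn test_block_cache_lru_eviction() {
        let mut cache = BlockCache::new(2);
        cache.insert(0, Arc::new(vec![0u8]));
        cache.insert(1, Arc::new(vec![1u8]));

        // Touch offset 0 so offset 1 becomes the eviction candidate.
        assert!(cache.get(0).is_some());
        cache.insert(2, Arc::new(vec![2u8]));

        assert!(cache.get(1).is_none(), "LRU entry should have been evicted");
        assert!(cache.get(0).is_some());
        assert!(cache.get(2).is_some());
    }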

    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }
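
    // Roundtrip sketches for `TermInfo`: inline postings should survive
    // serialize, deserialize, and decode; external references should
    // roundtrip field-for-field.
    #[test]
    fn test_term_info_inline_roundtrip() {
        let doc_ids = vec![1u32, 5, 7];
        let term_freqs = vec![2u32, 1, 3];
        let info = TermInfo::try_inline(&doc_ids, &term_freqs).expect("should fit inline");
        assert!(info.is_inline());
        assert_eq!(info.doc_freq(), 3);

        let mut buf = Vec::new();
        info.serialize(&mut buf).unwrap();
        let restored = TermInfo::deserialize(&mut buf.as_slice()).unwrap();
        assert_eq!(info, restored);

        let (ids, tfs) = restored.decode_inline().expect("inline decode");
        assert_eq!(ids, doc_ids);
        assert_eq!(tfs, term_freqs);
    }

    #[test]
    fn test_term_info_external_roundtrip() {
        let info = TermInfo::external(1_234, 56, 78);
        assert!(!info.is_inline());
        assert_eq!(info.external_info(), Some((1_234, 56)));

        let mut buf = Vec::new();
        info.serialize(&mut buf).unwrap();
        let restored = TermInfo::deserialize(&mut buf.as_slice()).unwrap();
        assert_eq!(info, restored);

        // Posting lists longer than MAX_INLINE_POSTINGS can never inline.
        assert!(TermInfo::try_inline(&[1, 2, 3, 4], &[1, 1, 1, 1]).is_none());
    }

    // A sketch of the block entry layout shared by `SSTableWriter::insert`
    // and the readers: each entry is (prefix_len vint, suffix_len vint,
    // suffix bytes, value), with the key prefix taken from the previous key.
    #[test]
    fn test_block_entry_prefix_encoding() {
        let entries: [(&[u8], u64); 3] = [(b"apple", 1), (b"apply", 2), (b"banana", 3)];

        // Encode the way the writer does.
        let mut block = Vec::new();
        let mut prev_key: Vec<u8> = Vec::new();
        for (key, value) in entries {
            let prefix_len = common_prefix_len(&prev_key, key);
            write_vint(&mut block, prefix_len as u64).unwrap();
            write_vint(&mut block, (key.len() - prefix_len) as u64).unwrap();
            block.extend_from_slice(&key[prefix_len..]);
            value.serialize(&mut block).unwrap();
            prev_key = key.to_vec();
        }

        // Decode the way `search_block`/`all_entries` do.
        let mut reader = block.as_slice();
        let mut current_key = Vec::new();
        let mut decoded = Vec::new();
        while !reader.is_empty() {
            let prefix_len = read_vint(&mut reader).unwrap() as usize;
            let suffix_len = read_vint(&mut reader).unwrap() as usize;
            current_key.truncate(prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix).unwrap();
            current_key.extend_from_slice(&suffix);
            decoded.push((current_key.clone(), u64::deserialize(&mut reader).unwrap()));
        }

        assert_eq!(
            decoded,
            vec![
                (b"apple".to_vec(), 1),
                (b"apply".to_vec(), 2),
                (b"banana".to_vec(), 3)
            ]
        );
    }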

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
}