use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;

#[cfg(feature = "native")]
use super::sstable_index::FstBlockIndex;
use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice, OwnedBytes};

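// On-disk layout, as derived from `SSTableWriter::finish` and
// `AsyncSSTableReader::open` below:
//
//   [compressed data blocks]
//   [index len: u32 LE][block index bytes]
//   [bloom filter bytes]                 (optional; located via footer offset)
//   [dict len: u32 LE][dictionary bytes] (optional; located via footer offset)
//   [footer: data_end u64 | num_entries u64 | bloom_offset u64 |
//            dict_offset u64 | compression_level u8 | magic u32]  = 37 bytes
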
/// Magic number identifying the SSTable format ("STB4").
pub const SSTABLE_MAGIC: u32 = 0x53544234;

/// Target uncompressed size of a data block before it is flushed.
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Default size budget for a trained compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Bloom filter bits allocated per key (roughly 1% false positives).
pub const BLOOM_BITS_PER_KEY: usize = 10;

/// Number of hash probes per key in the Bloom filter.
pub const BLOOM_HASH_COUNT: usize = 7;

/// A standard Bloom filter over byte-string keys, used to skip point
/// lookups for keys that are definitely absent from the table.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    bits: Vec<u64>,
    num_bits: usize,
    num_hashes: usize,
}

impl BloomFilter {
    /// Creates a filter sized for `expected_keys` at `bits_per_key` bits each.
    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: vec![0u64; num_words],
            num_bits,
            num_hashes: BLOOM_HASH_COUNT,
        }
    }

    /// Deserializes a filter from the layout produced by [`Self::to_bytes`].
    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let mut reader = data;
        let num_bits = reader.read_u32::<LittleEndian>()? as usize;
        let num_hashes = reader.read_u32::<LittleEndian>()? as usize;
        let num_words = reader.read_u32::<LittleEndian>()? as usize;

        if reader.len() < num_words * 8 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let mut bits = Vec::with_capacity(num_words);
        for _ in 0..num_words {
            bits.push(reader.read_u64::<LittleEndian>()?);
        }

        Ok(Self {
            bits,
            num_bits,
            num_hashes,
        })
    }

    /// Serializes as `[num_bits u32][num_hashes u32][num_words u32][words u64...]`.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
        data.write_u32::<LittleEndian>(self.num_bits as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.num_hashes as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.bits.len() as u32)
            .unwrap();
        for &word in &self.bits {
            data.write_u64::<LittleEndian>(word).unwrap();
        }
        data
    }

    /// Inserts a key into the filter.
    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_pair(key);
        self.insert_hashed(h1, h2);
    }

    /// Returns `false` if the key is definitely absent; `true` means "maybe present".
    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
                return false;
            }
        }
        true
    }

    /// Serialized size in bytes (12-byte header plus the bit words).
    pub fn size_bytes(&self) -> usize {
        12 + self.bits.len() * 8
    }

    /// Inserts a key whose hash pair was precomputed with [`bloom_hash_pair`].
    pub fn insert_hashed(&mut self, h1: u64, h2: u64) {
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits[word_idx] |= 1u64 << bit_idx;
            }
        }
    }

    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
        bloom_hash_pair(key)
    }

    /// Double hashing: probe `i` lands at `(h1 + i * h2) mod num_bits`.
    #[inline]
    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
    }
}

/// Computes the 64-bit hash pair used for double hashing: FNV-1a for `h1`
/// and a mul-then-xor variant for `h2`. The writer precomputes these so the
/// filter can be sized and built after all keys are known.
fn bloom_hash_pair(key: &[u8]) -> (u64, u64) {
    let mut h1: u64 = 0xcbf29ce484222325;
    for &byte in key {
        h1 ^= byte as u64;
        h1 = h1.wrapping_mul(0x100000001b3);
    }
    let mut h2: u64 = 0x84222325cbf29ce4;
    for &byte in key {
        h2 = h2.wrapping_mul(0x100000001b3);
        h2 ^= byte as u64;
    }
    (h1, h2)
}

/// A value type that can be stored in an SSTable.
pub trait SSTableValue: Clone + Send + Sync {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}

/// Offset and length of a sparse dimension's data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SparseDimInfo {
    /// Byte offset of the data for this dimension.
    pub offset: u64,
    /// Length in bytes of the data for this dimension.
    pub length: u32,
}

impl SparseDimInfo {
    pub fn new(offset: u64, length: u32) -> Self {
        Self { offset, length }
    }
}

impl SSTableValue for SparseDimInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.offset)?;
        write_vint(writer, self.length as u64)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let offset = read_vint(reader)?;
        let length = read_vint(reader)? as u32;
        Ok(Self { offset, length })
    }
}

/// Maximum number of postings that can be inlined directly in a [`TermInfo`].
pub const MAX_INLINE_POSTINGS: usize = 3;

/// Posting metadata for a term: either a few postings inlined into the term
/// dictionary entry itself, or offsets into external posting (and optionally
/// position) data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    Inline {
        /// Number of documents (1..=MAX_INLINE_POSTINGS).
        doc_freq: u8,
        /// Delta-encoded (doc_id, term_freq) vint pairs.
        data: [u8; 16],
        /// Number of meaningful bytes in `data`.
        data_len: u8,
    },
    External {
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        /// Offset of position data, if any.
        position_offset: u64,
        /// Length of position data; zero means no positions.
        position_len: u32,
    },
}

impl TermInfo {
    /// External postings without position data.
    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset: 0,
            position_len: 0,
        }
    }

    /// External postings with an accompanying position block.
    pub fn external_with_positions(
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        position_offset: u64,
        position_len: u32,
    ) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset,
            position_len,
        }
    }

    /// Tries to inline the postings; returns `None` if there are too many
    /// entries or the vint encoding does not fit in 16 bytes. `doc_ids`
    /// must be sorted ascending.
    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (i, &doc_id) in doc_ids.iter().enumerate() {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;
        if data_len > 16 {
            return None;
        }

        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    /// Iterator variant of [`Self::try_inline`]; `count` must match the
    /// number of items yielded, and doc ids must arrive sorted ascending.
    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
        if count > MAX_INLINE_POSTINGS || count == 0 {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (doc_id, tf) in iter {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, tf as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;

        Some(TermInfo::Inline {
            doc_freq: count as u8,
            data,
            data_len,
        })
    }

    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    /// Returns `(posting_offset, posting_len)` for external postings.
    pub fn external_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    /// Returns `(position_offset, position_len)` when position data exists.
    pub fn position_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                position_offset,
                position_len,
                ..
            } if *position_len > 0 => Some((*position_offset, *position_len)),
            _ => None,
        }
    }

    /// Decodes inline postings back into `(doc_ids, term_freqs)`.
    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;

                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }

                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}

impl SSTableValue for TermInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                // Tag 0xFF marks an inline entry.
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            } => {
                // Tag 0x01 = external with positions, 0x00 = external without.
                if *position_len > 0 {
                    writer.write_u8(0x01)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                    write_vint(writer, *position_offset)?;
                    write_vint(writer, *position_len as u64)?;
                } else {
                    writer.write_u8(0x00)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                }
            }
        }
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;

        if tag == 0xFF {
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            // Validate before slicing: a corrupt length would otherwise panic.
            if data_len as usize > 16 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Inline TermInfo data too long",
                ));
            }
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset: 0,
                position_len: 0,
            })
        } else if tag == 0x01 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            let position_offset = read_vint(reader)?;
            let position_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}

/// Writes `value` as a LEB128-style varint (7 bits per byte, high bit = continue).
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            return Ok(());
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
}

/// Reads a varint written by [`write_vint`].
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}

/// Length of the longest common prefix of `a` and `b`, in bytes.
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}

/// Summary statistics for an opened SSTable.
#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    pub num_sparse_entries: usize,
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    pub bloom_filter_size: usize,
    pub dictionary_size: usize,
}

/// Tuning knobs for [`SSTableWriter`].
#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    /// Compression level applied to each block.
    pub compression_level: CompressionLevel,
    /// Whether blocks are compressed with a shared trained dictionary.
    pub use_dictionary: bool,
    /// Size budget for the trained dictionary, in bytes.
    pub dict_size: usize,
    /// Whether to build a Bloom filter over all inserted keys.
    pub use_bloom_filter: bool,
    /// Bloom filter bits per key (see [`BLOOM_BITS_PER_KEY`]).
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}

impl SSTableWriterConfig {
    /// Derives a writer config from the index-wide optimization profile.
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        match optimization {
            IndexOptimization::Adaptive => Self {
                compression_level: CompressionLevel::BETTER,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: false,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::SizeOptimized => Self {
                compression_level: CompressionLevel::MAX,
                use_dictionary: true,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::PerformanceOptimized => Self {
                compression_level: CompressionLevel::FAST,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
        }
    }

    /// Shorthand for the performance-optimized profile.
    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    /// Shorthand for the size-optimized profile.
    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}

/// Streaming writer that emits prefix-compressed, block-compressed, sorted
/// key/value data followed by a block index and footer. Keys must be
/// inserted in ascending order.
pub struct SSTableWriter<W: Write, V: SSTableValue> {
    writer: W,
    block_buffer: Vec<u8>,
    prev_key: Vec<u8>,
    index: Vec<BlockIndexEntry>,
    current_offset: u64,
    num_entries: u64,
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    dictionary: Option<CompressionDict>,
    /// Precomputed hash pairs; the Bloom filter is materialized in `finish`.
    bloom_hashes: Vec<(u64, u64)>,
    _phantom: std::marker::PhantomData<V>,
}

impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
    pub fn new(writer: W) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }

    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            bloom_hashes: Vec::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Like [`Self::with_config`], but compresses blocks with a pre-trained dictionary.
    pub fn with_dictionary(
        writer: W,
        config: SSTableWriterConfig,
        dictionary: CompressionDict,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: Some(dictionary),
            bloom_hashes: Vec::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Appends a key/value pair. Keys must arrive in ascending order.
    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        if self.config.use_bloom_filter {
            self.bloom_hashes.push(bloom_hash_pair(key));
        }

        // Prefix-compress against the previous key within the block.
        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }

    /// Compresses and writes the buffered block, recording its index entry.
    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        // Prefix compression restarts at each block boundary.
        self.prev_key.clear();

        Ok(())
    }

    /// Flushes the final block and writes the index, Bloom filter,
    /// dictionary, and footer. Returns the underlying writer.
    pub fn finish(mut self) -> io::Result<W> {
        self.flush_block()?;

        let bloom_filter = if self.config.use_bloom_filter && !self.bloom_hashes.is_empty() {
            let mut bloom =
                BloomFilter::new(self.bloom_hashes.len(), self.config.bloom_bits_per_key);
            for (h1, h2) in &self.bloom_hashes {
                bloom.insert_hashed(*h1, *h2);
            }
            Some(bloom)
        } else {
            None
        };

        let data_end_offset = self.current_offset;

        let entries: Vec<(Vec<u8>, BlockAddr)> = self
            .index
            .iter()
            .map(|e| {
                (
                    e.first_key.clone(),
                    BlockAddr {
                        offset: e.offset,
                        length: e.length,
                    },
                )
            })
            .collect();

        #[cfg(feature = "native")]
        let index_bytes = FstBlockIndex::build(&entries)?;
        #[cfg(not(feature = "native"))]
        let index_bytes = MmapBlockIndex::build(&entries)?;

        self.writer
            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
        self.writer.write_all(&index_bytes)?;
        self.current_offset += 4 + index_bytes.len() as u64;

        let bloom_offset = if let Some(ref bloom) = bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };

        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };

        // 37-byte footer; a zero bloom/dict offset means "not present".
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(self.writer)
    }
}

/// In-memory index entry buffered by the writer before the block index is built.
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

/// Async reader with lazy block loading, an LRU block cache, and optional
/// Bloom filter and compression dictionary loaded from the footer.
pub struct AsyncSSTableReader<V: SSTableValue> {
    /// Slice covering only the compressed data blocks.
    data_slice: LazyFileSlice,
    block_index: BlockIndex,
    num_entries: u64,
    cache: RwLock<BlockCache>,
    bloom_filter: Option<BloomFilter>,
    dictionary: Option<CompressionDict>,
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

/// A small LRU cache of decompressed blocks, keyed by block offset.
struct BlockCache {
    blocks: FxHashMap<u64, Arc<[u8]>>,
    lru_order: std::collections::VecDeque<u64>,
    max_blocks: usize,
}

impl BlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    fn get(&mut self, offset: u64) -> Option<Arc<[u8]>> {
        if self.blocks.contains_key(&offset) {
            self.promote(offset);
            self.blocks.get(&offset).map(Arc::clone)
        } else {
            None
        }
    }

    fn insert(&mut self, offset: u64, block: Arc<[u8]>) {
        if self.blocks.contains_key(&offset) {
            self.promote(offset);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict_offset) = self.lru_order.pop_front() {
                self.blocks.remove(&evict_offset);
            } else {
                break;
            }
        }
        self.blocks.insert(offset, block);
        self.lru_order.push_back(offset);
    }

    /// Moves `offset` to the back (most recently used). O(n) in cache size,
    /// which is acceptable for the small block counts cached here.
    fn promote(&mut self, offset: u64) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == offset) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(offset);
        }
    }
}

impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Opens an SSTable, reading the footer, block index, and (if present)
    /// the Bloom filter and dictionary. `cache_blocks` bounds the LRU cache.
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // Footer: data_end u64 | num_entries u64 | bloom_offset u64 |
        // dict_offset u64 | compression_level u8 | magic u32 = 37 bytes.
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;

        let mut idx_reader = index_bytes.as_slice();
        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;

        if index_len > idx_reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Index data truncated",
            ));
        }

        let index_data = OwnedBytes::new(idx_reader[..index_len].to_vec());

        // Prefer the FST index when available; fall back to the mmap format.
        #[cfg(feature = "native")]
        let block_index = match FstBlockIndex::load(index_data.clone()) {
            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
        };
        #[cfg(not(feature = "native"))]
        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);

        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            // Read the 12-byte header first to learn the word count.
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_bytes(&bloom_data)?)
        } else {
            None
        };

        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_bytes(dict_data.to_vec()))
        } else {
            None
        };

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            block_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    /// Returns summary statistics for this table.
    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.block_index.len(),
            // Sparse entries are not tracked by this reader.
            num_sparse_entries: 0,
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    /// Number of blocks currently resident in the cache.
    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    /// Point lookup: consults the Bloom filter, locates the candidate block
    /// via the index, then scans that block.
    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}",
            key.len(),
            self.block_index.len()
        );

        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                log::debug!("SSTable::get key not found (before first block)");
                return Ok(None);
            }
        };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batched lookup that deduplicates block loads across keys.
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // Map each key to its candidate block; usize::MAX marks "definitely absent".
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX));
                continue;
            }

            match self.block_index.locate(key) {
                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
                None => key_to_block.push((key_idx, usize::MAX)),
            }
        }

        // Load each distinct block once so it is warm in the cache.
        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            // Served from cache after the warm-up pass above.
            let block_data = self.load_block(block_idx).await?;
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Loads and caches every block, one read per block.
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.block_index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Fetches the entire data region in a single read, then decompresses
    /// and caches every block. Cheaper than [`Self::preload_all_blocks`]
    /// when the backing store has high per-request latency.
    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
        let num_blocks = self.block_index.len();
        if num_blocks == 0 {
            return Ok(());
        }

        let mut max_end: u64 = 0;
        for i in 0..num_blocks {
            if let Some(addr) = self.block_index.get_addr(i) {
                max_end = max_end.max(addr.offset + addr.length as u64);
            }
        }

        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
        let buf = all_data.as_slice();

        let mut cache = self.cache.write();
        // Grow the cache if needed so every block fits.
        cache.max_blocks = cache.max_blocks.max(num_blocks);
        for i in 0..num_blocks {
            let addr = self.block_index.get_addr(i).unwrap();
            if cache.get(addr.offset).is_some() {
                continue;
            }
            let compressed =
                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
            let decompressed = if let Some(ref dict) = self.dictionary {
                crate::compression::decompress_with_dict(compressed, dict)?
            } else {
                crate::compression::decompress(compressed)?
            };
            cache.insert(addr.offset, Arc::from(decompressed));
        }

        Ok(())
    }

    /// Returns the decompressed block, loading and caching it on a miss.
    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        {
            // `get` promotes the entry in LRU order, so it needs a write lock.
            if let Some(block) = self.cache.write().get(addr.offset) {
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            addr.offset,
            addr.offset + addr.length as u64
        );

        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range(range).await?;

        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block: Arc<[u8]> = Arc::from(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    /// Linear scan of a decompressed block, reconstructing prefix-compressed
    /// keys as it goes. Entries are sorted, so the scan stops early once the
    /// current key exceeds the target.
    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    /// Warms the cache for all blocks that may contain keys in `[start_key, end_key]`.
    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        let start_block = self.block_index.locate(start_key).unwrap_or(0);
        let end_block = self
            .block_index
            .locate(end_key)
            .unwrap_or(self.block_index.len().saturating_sub(1));

        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    /// Returns an async iterator over all entries in key order.
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Decodes every entry in the table into memory, in key order.
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.block_index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = &block_data[..];
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                current_key.truncate(common_prefix_len);
                let mut suffix = vec![0u8; suffix_len];
                reader.read_exact(&mut suffix)?;
                current_key.extend_from_slice(&suffix);

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}

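// A rough usage sketch (the `handle` below is hypothetical; obtaining a
// `LazyFileHandle` depends on the crate's directory layer):
//
//     let reader: AsyncSSTableReader<u64> = AsyncSSTableReader::open(handle, 256).await?;
//     if let Some(value) = reader.get(b"term").await? {
//         // Bloom filter -> block index -> cached block scan.
//     }
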
/// Async iterator over a reader's entries in key order.
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    current_block: usize,
    block_data: Option<Arc<[u8]>>,
    block_offset: usize,
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.block_index.is_empty(),
        }
    }

    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.block_index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Returns the next `(key, value)` pair, or `None` when exhausted.
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            // Advance by however many bytes this entry consumed.
            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        // False positives are possible in principle, but at this filter
        // size these specific misses are reliably rejected.
        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }
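
    // Consistency sketch for the writer's deferred-bloom path: feeding
    // `insert_hashed` the output of `bloom_hash_pair` must set exactly the
    // same bits as a direct `insert` of the key.
    #[test]
    fn test_bloom_filter_insert_hashed_matches_insert() {
        let mut direct = BloomFilter::new(100, 10);
        let mut replayed = BloomFilter::new(100, 10);

        direct.insert(b"hello");
        let (h1, h2) = bloom_hash_pair(b"hello");
        replayed.insert_hashed(h1, h2);

        assert_eq!(direct.bits, replayed.bits);
    }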

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_bytes(&bytes).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }
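
    // Corrupt-input sketch: `from_bytes` should fail cleanly both when the
    // header promises more words than the buffer holds and when even the
    // 12-byte header is missing.
    #[test]
    fn test_bloom_filter_from_bytes_rejects_truncated_input() {
        let bloom = BloomFilter::new(100, 10);
        let bytes = bloom.to_bytes();

        assert!(BloomFilter::from_bytes(&bytes[..12]).is_err());
        assert!(BloomFilter::from_bytes(&[0u8; 4]).is_err());
    }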

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }
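
    // Sketch of the inline-postings fast path: up to MAX_INLINE_POSTINGS
    // (doc_id, term_freq) pairs should survive try_inline / decode_inline
    // unchanged, and a fourth posting should force the external path.
    #[test]
    fn test_term_info_inline_roundtrip() {
        let doc_ids = vec![3u32, 7, 42];
        let term_freqs = vec![1u32, 2, 5];

        let info = TermInfo::try_inline(&doc_ids, &term_freqs).expect("three postings fit inline");
        assert!(info.is_inline());
        assert_eq!(info.doc_freq(), 3);

        let (ids, tfs) = info.decode_inline().unwrap();
        assert_eq!(ids, doc_ids);
        assert_eq!(tfs, term_freqs);

        assert!(TermInfo::try_inline(&[1, 2, 3, 4], &[1, 1, 1, 1]).is_none());
    }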

    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9);
        assert!(!config.use_bloom_filter);
        assert!(!config.use_dictionary);

        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(!adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22);
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1);
        assert!(perf.use_bloom_filter);
        assert!(!perf.use_dictionary);

        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }
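
    // Minimal end-to-end sketch of the writer path: insert sorted keys,
    // finish, and check that the file ends with the footer magic. Block
    // contents are compressed, so only the footer is inspected here.
    #[test]
    fn test_sstable_writer_footer_magic() {
        let mut writer: SSTableWriter<Vec<u8>, u64> = SSTableWriter::new(Vec::new());
        writer.insert(b"alpha", &1).unwrap();
        writer.insert(b"beta", &2).unwrap();
        let bytes = writer.finish().unwrap();

        assert!(bytes.len() > 37);
        let magic = u32::from_le_bytes(bytes[bytes.len() - 4..].try_into().unwrap());
        assert_eq!(magic, SSTABLE_MAGIC);
    }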

    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }
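
    // Encoded-size sketch for the varint above: 7 payload bits per byte,
    // so u64::MAX needs ceil(64 / 7) = 10 bytes.
    #[test]
    fn test_vint_encoded_lengths() {
        let cases: [(u64, usize); 5] = [(0, 1), (127, 1), (128, 2), (16383, 2), (u64::MAX, 10)];
        for (val, expected_len) in cases {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            assert_eq!(buf.len(), expected_len, "value {}", val);
        }
    }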

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
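
    // Hedged roundtrip sketches for the remaining SSTableValue impls in
    // this file; the offsets and counts below are arbitrary.
    #[test]
    fn test_term_info_serialization_roundtrip() {
        let infos = vec![
            TermInfo::external(1024, 256, 17),
            TermInfo::external_with_positions(1024, 256, 17, 9000, 512),
            TermInfo::try_inline(&[5, 6], &[1, 3]).unwrap(),
        ];
        for info in infos {
            let mut buf = Vec::new();
            info.serialize(&mut buf).unwrap();
            let mut reader = buf.as_slice();
            let decoded = TermInfo::deserialize(&mut reader).unwrap();
            assert_eq!(info, decoded);
            assert!(reader.is_empty(), "trailing bytes after decode");
        }
    }

    #[test]
    fn test_sparse_dim_info_roundtrip() {
        let info = SparseDimInfo::new(123_456, 789);
        let mut buf = Vec::new();
        info.serialize(&mut buf).unwrap();
        let mut reader = buf.as_slice();
        assert_eq!(SparseDimInfo::deserialize(&mut reader).unwrap(), info);
    }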
}