use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;

#[cfg(feature = "native")]
use super::sstable_index::FstBlockIndex;
use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice, OwnedBytes};

/// Magic number identifying the SSTable file format ("STB4").
pub const SSTABLE_MAGIC: u32 = 0x53544234;

/// Target uncompressed block size; the writer flushes once the buffer
/// reaches this size.
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Default compression dictionary size in bytes.
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Bloom filter bits per key (roughly 1% false positives with 7 hashes).
pub const BLOOM_BITS_PER_KEY: usize = 10;

/// Number of hash probes per key in the bloom filter.
pub const BLOOM_HASH_COUNT: usize = 7;
/// Bloom filter over SSTable keys, used to skip lookups for absent keys.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    bits: BloomBits,
    num_bits: usize,
    num_hashes: usize,
}

/// Bit storage for a bloom filter: owned words while building, zero-copy
/// little-endian bytes when loaded from a file.
#[derive(Debug, Clone)]
enum BloomBits {
    /// Mutable words used while building.
    Vec(Vec<u64>),
    /// Read-only file-backed words.
    Bytes(OwnedBytes),
}
impl BloomBits {
    #[inline]
    fn len(&self) -> usize {
        match self {
            BloomBits::Vec(v) => v.len(),
            BloomBits::Bytes(b) => b.len() / 8,
        }
    }

    #[inline]
    fn get(&self, word_idx: usize) -> u64 {
        match self {
            BloomBits::Vec(v) => v[word_idx],
            BloomBits::Bytes(b) => {
                let off = word_idx * 8;
                u64::from_le_bytes([
                    b[off],
                    b[off + 1],
                    b[off + 2],
                    b[off + 3],
                    b[off + 4],
                    b[off + 5],
                    b[off + 6],
                    b[off + 7],
                ])
            }
        }
    }

    #[inline]
    fn set_bit(&mut self, word_idx: usize, bit_idx: usize) {
        match self {
            BloomBits::Vec(v) => v[word_idx] |= 1u64 << bit_idx,
            BloomBits::Bytes(_) => panic!("cannot mutate read-only bloom filter"),
        }
    }

    fn size_bytes(&self) -> usize {
        match self {
            BloomBits::Vec(v) => v.len() * 8,
            BloomBits::Bytes(b) => b.len(),
        }
    }
}

impl BloomFilter {
    /// Creates an empty filter sized for `expected_keys` at `bits_per_key`.
    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: BloomBits::Vec(vec![0u64; num_words]),
            num_bits,
            num_hashes: BLOOM_HASH_COUNT,
        }
    }

    /// Loads a filter from its serialized form without copying the bit array.
    pub fn from_owned_bytes(data: OwnedBytes) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let d = data.as_slice();
        let num_bits = u32::from_le_bytes([d[0], d[1], d[2], d[3]]) as usize;
        let num_hashes = u32::from_le_bytes([d[4], d[5], d[6], d[7]]) as usize;
        let num_words = u32::from_le_bytes([d[8], d[9], d[10], d[11]]) as usize;

        if d.len() < 12 + num_words * 8 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let bits_bytes = data.slice(12..12 + num_words * 8);

        Ok(Self {
            bits: BloomBits::Bytes(bits_bytes),
            num_bits,
            num_hashes,
        })
    }

    /// Serializes the filter as `num_bits`, `num_hashes`, `num_words`
    /// (u32 LE each) followed by the words themselves.
    pub fn to_bytes(&self) -> Vec<u8> {
        let num_words = self.bits.len();
        let mut data = Vec::with_capacity(12 + num_words * 8);
        data.write_u32::<LittleEndian>(self.num_bits as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.num_hashes as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(num_words as u32).unwrap();
        for i in 0..num_words {
            data.write_u64::<LittleEndian>(self.bits.get(i)).unwrap();
        }
        data
    }

    /// Inserts a key. Panics if the filter is file-backed (read-only).
    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits.set_bit(word_idx, bit_idx);
            }
        }
    }

    /// Returns `false` if the key is definitely absent; `true` means the key
    /// may be present (with a small false-positive probability).
    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx >= self.bits.len() || (self.bits.get(word_idx) & (1u64 << bit_idx)) == 0 {
                return false;
            }
        }
        true
    }

    /// Serialized size in bytes (12-byte header plus the bit array).
    pub fn size_bytes(&self) -> usize {
        12 + self.bits.size_bytes()
    }

    /// Inserts a key from a precomputed `bloom_hash_pair` result.
    pub fn insert_hashed(&mut self, h1: u64, h2: u64) {
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits.set_bit(word_idx, bit_idx);
            }
        }
    }

    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
        // Delegates to the free function so the writer and the filter use
        // the exact same hashes.
        bloom_hash_pair(key)
    }

    /// Double hashing: probe `i` maps to `(h1 + i * h2) mod num_bits`.
    #[inline]
    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
    }
}

/// Computes the FNV-1a based hash pair used by `BloomFilter`. Exposed as a
/// free function so the writer can hash keys as they arrive and size the
/// filter exactly in `finish` without re-reading the keys.
fn bloom_hash_pair(key: &[u8]) -> (u64, u64) {
    let mut h1: u64 = 0xcbf29ce484222325;
    for &byte in key {
        h1 ^= byte as u64;
        h1 = h1.wrapping_mul(0x100000001b3);
    }
    let mut h2: u64 = 0x84222325cbf29ce4;
    for &byte in key {
        h2 = h2.wrapping_mul(0x100000001b3);
        h2 ^= byte as u64;
    }
    (h1, h2)
}
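
// A minimal sketch of the double-hashing scheme used above: probe `i` lands
// on bit `(h1 + i * h2) mod num_bits` (Kirsch-Mitzenmacher), so one hash
// pair stands in for `BLOOM_HASH_COUNT` independent hash functions. The
// module below is illustrative only and mirrors `BloomFilter::get_bit_pos`.
#[cfg(test)]
mod double_hashing_example {
    use super::*;

    #[test]
    fn probes_match_filter_bit_positions() {
        let num_bits = 1024u64;
        let (h1, h2) = bloom_hash_pair(b"example-key");
        // The i-th probe position, exactly as `get_bit_pos` computes it.
        let probes: Vec<u64> = (0..BLOOM_HASH_COUNT as u64)
            .map(|i| h1.wrapping_add(i.wrapping_mul(h2)) % num_bits)
            .collect();
        assert_eq!(probes.len(), BLOOM_HASH_COUNT);
        // Every probe falls inside the bit array.
        assert!(probes.iter().all(|&p| p < num_bits));
    }
}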

/// Values that can be stored in an SSTable, serialized inline after each key.
pub trait SSTableValue: Clone + Send + Sync {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}
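
// A sketch of implementing `SSTableValue` for a custom type. `DocStats` is a
// hypothetical value type, not part of this crate; it just shows the
// expected pattern: varint-encode fields in `serialize` and read them back
// in the same order in `deserialize`.
#[cfg(test)]
mod sstable_value_example {
    use super::*;

    #[derive(Debug, Clone, PartialEq)]
    struct DocStats {
        doc_count: u64,
        total_len: u64,
    }

    impl SSTableValue for DocStats {
        fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
            write_vint(writer, self.doc_count)?;
            write_vint(writer, self.total_len)
        }

        fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
            let doc_count = read_vint(reader)?;
            let total_len = read_vint(reader)?;
            Ok(Self { doc_count, total_len })
        }
    }

    #[test]
    fn roundtrip() {
        let stats = DocStats { doc_count: 42, total_len: 1000 };
        let mut buf = Vec::new();
        stats.serialize(&mut buf).unwrap();
        assert_eq!(DocStats::deserialize(&mut buf.as_slice()).unwrap(), stats);
    }
}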

/// Location (offset and length) of a sparse dimension's data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SparseDimInfo {
    /// Start offset of the dimension's data.
    pub offset: u64,
    /// Length of the data in bytes.
    pub length: u32,
}

impl SparseDimInfo {
    pub fn new(offset: u64, length: u32) -> Self {
        Self { offset, length }
    }
}

impl SSTableValue for SparseDimInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.offset)?;
        write_vint(writer, self.length as u64)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let offset = read_vint(reader)?;
        let length = read_vint(reader)? as u32;
        Ok(Self { offset, length })
    }
}

/// Maximum number of postings that can be inlined in a `TermInfo`.
pub const MAX_INLINE_POSTINGS: usize = 3;

/// Where a term's postings live: inlined into the dictionary entry itself
/// (for terms appearing in at most `MAX_INLINE_POSTINGS` documents) or in
/// the external postings file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    Inline {
        /// Number of documents (at most `MAX_INLINE_POSTINGS`).
        doc_freq: u8,
        /// Delta-encoded `(doc_id, term_freq)` varint pairs.
        data: [u8; 16],
        /// Number of valid bytes in `data`.
        data_len: u8,
    },
    External {
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        /// Offset of position data, if present.
        position_offset: u64,
        /// Length of position data; zero when positions are absent.
        position_len: u32,
    },
}

impl TermInfo {
    /// External postings without position data.
    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset: 0,
            position_len: 0,
        }
    }

    /// External postings with an accompanying position range.
    pub fn external_with_positions(
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        position_offset: u64,
        position_len: u32,
    ) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset,
            position_len,
        }
    }

    /// Attempts to inline the given postings; returns `None` if there are
    /// too many documents or the encoded form does not fit in 16 bytes.
    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (i, &doc_id) in doc_ids.iter().enumerate() {
            let delta = doc_id - prev_doc_id;
            // Writes past the 16-byte buffer fail, so oversized postings
            // fall back to the external representation.
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;
        if data_len > 16 {
            return None;
        }

        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    /// Iterator variant of [`try_inline`](Self::try_inline) for callers that
    /// do not have the postings in slices.
    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
        if count > MAX_INLINE_POSTINGS || count == 0 {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (doc_id, tf) in iter {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, tf as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;

        Some(TermInfo::Inline {
            doc_freq: count as u8,
            data,
            data_len,
        })
    }

    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    /// Returns `(posting_offset, posting_len)` for external postings.
    pub fn external_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    /// Returns `(position_offset, position_len)` when position data exists.
    pub fn position_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                position_offset,
                position_len,
                ..
            } if *position_len > 0 => Some((*position_offset, *position_len)),
            _ => None,
        }
    }

    /// Decodes inline postings back into `(doc_ids, term_freqs)`.
    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;

                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }

                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}
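
// Sketch: a term with at most `MAX_INLINE_POSTINGS` postings round-trips
// through the inline representation without touching the postings file.
#[cfg(test)]
mod term_info_inline_example {
    use super::*;

    #[test]
    fn inline_roundtrip() {
        let doc_ids = [3u32, 7, 12];
        let term_freqs = [1u32, 2, 1];
        let info = TermInfo::try_inline(&doc_ids, &term_freqs).expect("fits inline");
        assert!(info.is_inline());
        assert_eq!(info.doc_freq(), 3);
        let (ids, tfs) = info.decode_inline().unwrap();
        assert_eq!(ids, doc_ids);
        assert_eq!(tfs, term_freqs);
    }
}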

impl SSTableValue for TermInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                // Tag 0xFF marks an inline entry.
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            } => {
                // Tag 0x01: external with positions; 0x00: external without.
                if *position_len > 0 {
                    writer.write_u8(0x01)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                    write_vint(writer, *position_offset)?;
                    write_vint(writer, *position_len as u64)?;
                } else {
                    writer.write_u8(0x00)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                }
            }
        }
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;

        if tag == 0xFF {
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            // Validate before slicing so corrupt input errors instead of
            // panicking.
            if data_len as usize > 16 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "inline TermInfo data too long",
                ));
            }
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset: 0,
                position_len: 0,
            })
        } else if tag == 0x01 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            let position_offset = read_vint(reader)?;
            let position_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}

/// Writes a value as an LEB128-style varint (7 bits per byte, low bits
/// first, high bit set on all but the final byte).
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            return Ok(());
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
}

/// Reads a varint written by [`write_vint`].
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}
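
// Worked example: 300 = 0b1_0010_1100. The low seven bits 010_1100 (0x2C)
// are emitted first with the continuation bit set (0xAC); the remaining
// bits 10 (0x02) follow without it.
#[cfg(test)]
mod vint_example {
    use super::*;

    #[test]
    fn encodes_300_as_two_bytes() {
        let mut buf = Vec::new();
        write_vint(&mut buf, 300).unwrap();
        assert_eq!(buf, [0xAC, 0x02]);
        assert_eq!(read_vint(&mut buf.as_slice()).unwrap(), 300);
    }
}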

/// Length of the longest common prefix of `a` and `b`.
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}
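
// Sketch of the block entry layout used by `SSTableWriter::insert` and
// `search_block`: each entry is `(shared_prefix_len, suffix_len, suffix,
// value)`, all varint-framed, with the prefix shared against the previous
// key in the same block.
#[cfg(test)]
mod prefix_encoding_example {
    use super::*;

    #[test]
    fn second_key_stores_only_its_suffix() {
        let (prev, next) = (b"apple".as_slice(), b"apply".as_slice());
        let shared = common_prefix_len(prev, next);
        assert_eq!(shared, 4); // "appl"

        // Encode `next` relative to `prev`, exactly as `insert` does.
        let mut buf = Vec::new();
        write_vint(&mut buf, shared as u64).unwrap();
        write_vint(&mut buf, (next.len() - shared) as u64).unwrap();
        buf.extend_from_slice(&next[shared..]);
        // Two varint bytes plus one suffix byte instead of the 5-byte key.
        assert_eq!(buf, [4, 1, b'y']);
    }
}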

/// Summary statistics for an open SSTable.
#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    pub num_sparse_entries: usize,
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    pub bloom_filter_size: usize,
    pub dictionary_size: usize,
}

/// Tuning knobs for `SSTableWriter`.
#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    /// Compression level applied to each block.
    pub compression_level: CompressionLevel,
    /// Whether to compress blocks against a shared dictionary.
    pub use_dictionary: bool,
    /// Target size of the shared dictionary.
    pub dict_size: usize,
    /// Whether to build a bloom filter over all keys.
    pub use_bloom_filter: bool,
    /// Bloom filter bits per key.
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}

impl SSTableWriterConfig {
    /// Derives a config from the index-wide optimization goal.
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        match optimization {
            IndexOptimization::Adaptive => Self {
                compression_level: CompressionLevel::BETTER,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: false,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::SizeOptimized => Self {
                compression_level: CompressionLevel::MAX,
                use_dictionary: true,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::PerformanceOptimized => Self {
                compression_level: CompressionLevel::FAST,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
        }
    }

    /// Shorthand for `PerformanceOptimized`.
    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    /// Shorthand for `SizeOptimized`.
    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}

/// Streaming writer producing a block-compressed, prefix-encoded SSTable.
/// Keys must be inserted in ascending byte order.
pub struct SSTableWriter<W: Write, V: SSTableValue> {
    writer: W,
    block_buffer: Vec<u8>,
    prev_key: Vec<u8>,
    index: Vec<BlockIndexEntry>,
    current_offset: u64,
    num_entries: u64,
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    dictionary: Option<CompressionDict>,
    /// Hash pairs of all inserted keys, buffered so the bloom filter can be
    /// sized exactly in `finish`.
    bloom_hashes: Vec<(u64, u64)>,
    _phantom: std::marker::PhantomData<V>,
}
impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
    pub fn new(writer: W) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }

    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            bloom_hashes: Vec::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Creates a writer that compresses every block against `dictionary`.
    pub fn with_dictionary(
        writer: W,
        config: SSTableWriterConfig,
        dictionary: CompressionDict,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: Some(dictionary),
            bloom_hashes: Vec::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Appends a key/value pair. Keys must arrive in ascending order.
    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        if self.config.use_bloom_filter {
            self.bloom_hashes.push(bloom_hash_pair(key));
        }

        // Prefix-compress against the previous key in this block.
        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }

    /// Compresses and writes out the buffered block, recording its address
    /// and first key in the block index.
    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        // Prefix compression restarts at each block boundary.
        self.prev_key.clear();

        Ok(())
    }

    /// Flushes the final block, then writes the block index, the optional
    /// bloom filter and dictionary sections, and the 37-byte footer.
    pub fn finish(mut self) -> io::Result<W> {
        self.flush_block()?;

        let bloom_filter = if self.config.use_bloom_filter && !self.bloom_hashes.is_empty() {
            // Sized exactly to the number of keys actually inserted.
            let mut bloom =
                BloomFilter::new(self.bloom_hashes.len(), self.config.bloom_bits_per_key);
            for (h1, h2) in &self.bloom_hashes {
                bloom.insert_hashed(*h1, *h2);
            }
            Some(bloom)
        } else {
            None
        };

        let data_end_offset = self.current_offset;

        let entries: Vec<(Vec<u8>, BlockAddr)> = self
            .index
            .iter()
            .map(|e| {
                (
                    e.first_key.clone(),
                    BlockAddr {
                        offset: e.offset,
                        length: e.length,
                    },
                )
            })
            .collect();

        #[cfg(feature = "native")]
        let index_bytes = FstBlockIndex::build(&entries)?;
        #[cfg(not(feature = "native"))]
        let index_bytes = MmapBlockIndex::build(&entries)?;

        self.writer
            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
        self.writer.write_all(&index_bytes)?;
        self.current_offset += 4 + index_bytes.len() as u64;

        let bloom_offset = if let Some(ref bloom) = bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };

        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };

        // Footer: data_end (8) + num_entries (8) + bloom_offset (8) +
        // dict_offset (8) + compression level (1) + magic (4) = 37 bytes.
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(self.writer)
    }
}
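
// A minimal end-to-end sketch of the writer. Keys must be inserted in
// ascending order; the finished buffer carries the compressed blocks, the
// index, any bloom/dictionary sections, and the fixed 37-byte footer whose
// last four bytes are the magic.
#[cfg(test)]
mod writer_example {
    use super::*;

    #[test]
    fn writes_sorted_keys_into_a_buffer() {
        let mut writer: SSTableWriter<Vec<u8>, u64> =
            SSTableWriter::with_config(Vec::new(), SSTableWriterConfig::fast());
        writer.insert(b"alpha", &1).unwrap();
        writer.insert(b"beta", &2).unwrap();
        writer.insert(b"gamma", &3).unwrap();
        let bytes = writer.finish().unwrap();
        // At minimum the footer must be present, ending in the magic.
        assert!(bytes.len() > 37);
        let magic = u32::from_le_bytes(bytes[bytes.len() - 4..].try_into().unwrap());
        assert_eq!(magic, SSTABLE_MAGIC);
    }
}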

/// In-memory index entry recorded per flushed block.
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

/// Lazily-loading SSTable reader with an LRU cache of decompressed blocks.
pub struct AsyncSSTableReader<V: SSTableValue> {
    /// Slice covering only the data blocks (everything before the index).
    data_slice: LazyFileSlice,
    block_index: BlockIndex,
    num_entries: u64,
    cache: RwLock<BlockCache>,
    bloom_filter: Option<BloomFilter>,
    dictionary: Option<CompressionDict>,
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

/// LRU cache of decompressed blocks, keyed by block offset.
struct BlockCache {
    blocks: FxHashMap<u64, Arc<[u8]>>,
    lru_order: std::collections::VecDeque<u64>,
    max_blocks: usize,
}

impl BlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    fn get(&mut self, offset: u64) -> Option<Arc<[u8]>> {
        if self.blocks.contains_key(&offset) {
            self.promote(offset);
            self.blocks.get(&offset).map(Arc::clone)
        } else {
            None
        }
    }

    fn insert(&mut self, offset: u64, block: Arc<[u8]>) {
        if self.blocks.contains_key(&offset) {
            self.promote(offset);
            return;
        }
        // Evict least-recently-used blocks until there is room.
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict_offset) = self.lru_order.pop_front() {
                self.blocks.remove(&evict_offset);
            } else {
                break;
            }
        }
        self.blocks.insert(offset, block);
        self.lru_order.push_back(offset);
    }

    /// Moves `offset` to the most-recently-used position. Linear in the
    /// cache size, which stays small in practice.
    fn promote(&mut self, offset: u64) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == offset) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(offset);
        }
    }
}

impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Opens an SSTable, reading the footer, block index, and optional bloom
    /// filter and dictionary sections. Block data stays on disk and is
    /// loaded on demand into a cache of `cache_blocks` decompressed blocks.
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // Fixed-size footer written by `SSTableWriter::finish`.
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        // The index section sits between the data blocks and the footer.
        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;

        let mut idx_reader = index_bytes.as_slice();
        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;

        if index_len > idx_reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Index data truncated",
            ));
        }

        let index_data = index_bytes.slice(4..4 + index_len);

        #[cfg(feature = "native")]
        let block_index = match FstBlockIndex::load(index_data.clone()) {
            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
        };
        #[cfg(not(feature = "native"))]
        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);

        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            // Read the 12-byte header first to learn the word count.
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_owned_bytes(bloom_data)?)
        } else {
            None
        };

        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_owned_bytes(dict_data))
        } else {
            None
        };

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            block_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.block_index.len(),
            num_sparse_entries: 0,
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    /// Point lookup: consult the bloom filter, locate the candidate block
    /// via the index, then scan that block.
    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}",
            key.len(),
            self.block_index.len()
        );

        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                log::debug!("SSTable::get key not found (before first block)");
                return Ok(None);
            }
        };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batched lookup that loads each touched block only once.
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // Map each key to its candidate block; `usize::MAX` marks keys that
        // are definitely absent (bloom negative or before the first block).
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX));
                continue;
            }

            match self.block_index.locate(key) {
                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
                None => key_to_block.push((key_idx, usize::MAX)),
            }
        }

        // Load each distinct block once, in ascending order.
        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            // Hits the cache thanks to the preload loop above.
            let block_data = self.load_block(block_idx).await?;
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Loads and caches every block, one read per block.
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.block_index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Fetches the whole data section in a single read, then decompresses
    /// and caches every block. Cheaper than `preload_all_blocks` when the
    /// backing store has high per-request latency.
    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
        let num_blocks = self.block_index.len();
        if num_blocks == 0 {
            return Ok(());
        }

        let mut max_end: u64 = 0;
        for i in 0..num_blocks {
            if let Some(addr) = self.block_index.get_addr(i) {
                max_end = max_end.max(addr.offset + addr.length as u64);
            }
        }

        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
        let buf = all_data.as_slice();

        let mut cache = self.cache.write();
        // Grow the cache so the full table fits.
        cache.max_blocks = cache.max_blocks.max(num_blocks);
        for i in 0..num_blocks {
            let addr = self.block_index.get_addr(i).unwrap();
            if cache.get(addr.offset).is_some() {
                continue;
            }
            let compressed =
                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
            let decompressed = if let Some(ref dict) = self.dictionary {
                crate::compression::decompress_with_dict(compressed, dict)?
            } else {
                crate::compression::decompress(compressed)?
            };
            cache.insert(addr.offset, Arc::from(decompressed));
        }

        Ok(())
    }

    /// Returns the decompressed block, consulting the cache first.
    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        {
            // Write lock: `get` mutates the LRU order even on a hit.
            if let Some(block) = self.cache.write().get(addr.offset) {
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            addr.offset,
            addr.offset + addr.length as u64
        );

        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range(range).await?;

        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block: Arc<[u8]> = Arc::from(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    /// Scans a decompressed block for `target_key`. Entries are sorted, so
    /// the scan stops as soon as a greater key is seen.
    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            // Rebuild the key: shared prefix plus this entry's suffix.
            current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            current_key.extend_from_slice(&suffix);

            // The value must be decoded to advance past it in the stream.
            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    /// Warms the cache with every block that may hold keys in
    /// `[start_key, end_key]`.
    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        let start_block = self.block_index.locate(start_key).unwrap_or(0);
        let end_block = self
            .block_index
            .locate(end_key)
            .unwrap_or(self.block_index.len().saturating_sub(1));

        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    /// Returns an iterator over all entries in key order.
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Decodes every entry into memory.
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.block_index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = &block_data[..];
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                current_key.truncate(common_prefix_len);
                let mut suffix = vec![0u8; suffix_len];
                reader.read_exact(&mut suffix)?;
                current_key.extend_from_slice(&suffix);

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}
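
// A sketch of reading values back. Obtaining a `LazyFileHandle` depends on
// the directory implementation in `crate::directories`, so the handle is
// taken as a parameter here; everything else uses the reader API above.
#[allow(dead_code)]
async fn reader_example(handle: LazyFileHandle) -> io::Result<Option<u64>> {
    // Cache up to 32 decompressed blocks.
    let reader = AsyncSSTableReader::<u64>::open(handle, 32).await?;
    // Point lookup: bloom filter, then block index, then in-block scan.
    if let Some(value) = reader.get(b"alpha").await? {
        return Ok(Some(value));
    }
    // Batch lookup dedupes candidate blocks and loads each one once.
    let _results = reader
        .get_batch(&[b"beta".as_slice(), b"gamma".as_slice()])
        .await?;
    // Sequential scan over all entries in key order.
    let mut iter = reader.iter();
    while let Some((_key, _value)) = iter.next().await? {}
    Ok(None)
}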

/// Streaming iterator over an `AsyncSSTableReader`, yielding entries in key
/// order one block at a time.
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    current_block: usize,
    block_data: Option<Arc<[u8]>>,
    block_offset: usize,
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.block_index.is_empty(),
        }
    }

    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.block_index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Returns the next entry, or `None` once the table is exhausted.
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            // Advance by however many bytes this entry consumed.
            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_owned_bytes(OwnedBytes::new(bytes)).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        // No false negatives: every inserted key must be reported present.
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        // Roughly 1% expected at 10 bits/key; allow headroom up to 3%.
        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9); // BETTER
        assert!(!config.use_bloom_filter);
        assert!(!config.use_dictionary);

        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(!adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22); // MAX
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1); // FAST
        assert!(perf.use_bloom_filter);
        assert!(!perf.use_dictionary);

        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }

    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
}