use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;

#[cfg(feature = "native")]
use super::sstable_index::FstBlockIndex;
use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{FileHandle, OwnedBytes};

/// Magic number identifying the SSTable format ("STB4").
pub const SSTABLE_MAGIC: u32 = 0x53544234;

/// Uncompressed size threshold at which a block is flushed.
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Default compression dictionary size, in bytes.
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Bloom filter sizing: bits per key (roughly 1% false positives at 10).
pub const BLOOM_BITS_PER_KEY: usize = 10;

/// Number of hash probes per key in the bloom filter.
pub const BLOOM_HASH_COUNT: usize = 7;

/// Bloom filter used to skip SSTable lookups for keys that are
/// definitely absent.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    bits: BloomBits,
    num_bits: usize,
    num_hashes: usize,
}

/// Bit storage: owned words while building, zero-copy bytes when loaded
/// from disk.
#[derive(Debug, Clone)]
enum BloomBits {
    Vec(Vec<u64>),
    Bytes(OwnedBytes),
}

impl BloomBits {
    #[inline]
    fn len(&self) -> usize {
        match self {
            BloomBits::Vec(v) => v.len(),
            BloomBits::Bytes(b) => b.len() / 8,
        }
    }

    #[inline]
    fn get(&self, word_idx: usize) -> u64 {
        match self {
            BloomBits::Vec(v) => v[word_idx],
            BloomBits::Bytes(b) => {
                let off = word_idx * 8;
                u64::from_le_bytes(b.as_slice()[off..off + 8].try_into().unwrap())
            }
        }
    }

    #[inline]
    fn set_bit(&mut self, word_idx: usize, bit_idx: usize) {
        match self {
            BloomBits::Vec(v) => v[word_idx] |= 1u64 << bit_idx,
            BloomBits::Bytes(_) => panic!("cannot mutate read-only bloom filter"),
        }
    }

    fn size_bytes(&self) -> usize {
        match self {
            BloomBits::Vec(v) => v.len() * 8,
            BloomBits::Bytes(b) => b.len(),
        }
    }
}

impl BloomFilter {
    /// Creates an empty filter sized for `expected_keys` at `bits_per_key`.
    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: BloomBits::Vec(vec![0u64; num_words]),
            num_bits,
            num_hashes: BLOOM_HASH_COUNT,
        }
    }

    /// Loads a filter from bytes produced by [`Self::to_bytes`] without
    /// copying the bit array.
    pub fn from_owned_bytes(data: OwnedBytes) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let d = data.as_slice();
        let num_bits = u32::from_le_bytes([d[0], d[1], d[2], d[3]]) as usize;
        let num_hashes = u32::from_le_bytes([d[4], d[5], d[6], d[7]]) as usize;
        let num_words = u32::from_le_bytes([d[8], d[9], d[10], d[11]]) as usize;

        if d.len() < 12 + num_words * 8 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let bits_bytes = data.slice(12..12 + num_words * 8);

        Ok(Self {
            bits: BloomBits::Bytes(bits_bytes),
            num_bits,
            num_hashes,
        })
    }

    /// Serializes as `num_bits: u32 | num_hashes: u32 | num_words: u32`
    /// followed by the bit words, all little-endian.
    pub fn to_bytes(&self) -> Vec<u8> {
        let num_words = self.bits.len();
        let mut data = Vec::with_capacity(12 + num_words * 8);
        data.write_u32::<LittleEndian>(self.num_bits as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.num_hashes as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(num_words as u32).unwrap();
        for i in 0..num_words {
            data.write_u64::<LittleEndian>(self.bits.get(i)).unwrap();
        }
        data
    }

    /// Inserts a key by setting `num_hashes` bits derived from its hashes.
    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits.set_bit(word_idx, bit_idx);
            }
        }
    }

    /// Returns `false` if `key` is definitely absent; `true` means the key
    /// may be present (subject to the false-positive rate).
    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx >= self.bits.len() || (self.bits.get(word_idx) & (1u64 << bit_idx)) == 0 {
                return false;
            }
        }
        true
    }

    /// Serialized size in bytes (12-byte header plus bit words).
    pub fn size_bytes(&self) -> usize {
        12 + self.bits.size_bytes()
    }

    /// Inserts a key from a precomputed [`bloom_hash_pair`], so callers can
    /// hash once up front and build the filter later.
    pub fn insert_hashed(&mut self, h1: u64, h2: u64) {
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits.set_bit(word_idx, bit_idx);
            }
        }
    }

    #[inline]
    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
        bloom_hash_pair(key)
    }

    /// Double hashing: probe `i` lands at `(h1 + i * h2) mod num_bits`.
    #[inline]
    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
    }
}

/// FNV-1a-style hash pair for bloom probing, kept as a free function so
/// keys can be hashed before any filter exists.
#[inline]
fn bloom_hash_pair(key: &[u8]) -> (u64, u64) {
    let mut h1: u64 = 0xcbf29ce484222325;
    let mut h2: u64 = 0x84222325cbf29ce4;
    for &byte in key {
        h1 ^= byte as u64;
        h1 = h1.wrapping_mul(0x100000001b3);
        h2 = h2.wrapping_mul(0x100000001b3);
        h2 ^= byte as u64;
    }
    (h1, h2)
}
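
// A minimal sketch of the deferred-bloom pattern used by
// `SSTableWriter::finish`: hash each key up front, then size and populate
// the filter once the final key count is known.
//
//     let keys: [&[u8]; 2] = [b"alpha", b"beta"];
//     let hashes: Vec<(u64, u64)> = keys.iter().map(|k| bloom_hash_pair(k)).collect();
//     let mut bloom = BloomFilter::new(hashes.len(), BLOOM_BITS_PER_KEY);
//     for (h1, h2) in hashes {
//         bloom.insert_hashed(h1, h2);
//     }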

pub trait SSTableValue: Clone + Send + Sync {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}

/// Location of one sparse dimension's postings: byte offset and length
/// within the postings region.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SparseDimInfo {
    pub offset: u64,
    pub length: u32,
}

impl SparseDimInfo {
    pub fn new(offset: u64, length: u32) -> Self {
        Self { offset, length }
    }
}

impl SSTableValue for SparseDimInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.offset)?;
        write_vint(writer, self.length as u64)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let offset = read_vint(reader)?;
        let length = read_vint(reader)? as u32;
        Ok(Self { offset, length })
    }
}

/// Maximum number of postings that may be inlined into a `TermInfo`.
pub const MAX_INLINE_POSTINGS: usize = 3;

/// Per-term metadata stored as the SSTable value.
///
/// Very short posting lists are inlined into the term dictionary itself;
/// longer lists point into an external postings file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    Inline {
        doc_freq: u8,
        /// Delta-encoded `(doc_id, term_freq)` varint pairs.
        data: [u8; 16],
        data_len: u8,
    },
    External {
        posting_offset: u64,
        posting_len: u64,
        doc_freq: u32,
        /// Offset/length of position data; `position_len == 0` means none.
        position_offset: u64,
        position_len: u64,
    },
}

impl TermInfo {
    /// External postings without positions.
    pub fn external(posting_offset: u64, posting_len: u64, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset: 0,
            position_len: 0,
        }
    }

    /// External postings with an accompanying position block.
    pub fn external_with_positions(
        posting_offset: u64,
        posting_len: u64,
        doc_freq: u32,
        position_offset: u64,
        position_len: u64,
    ) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset,
            position_len,
        }
    }

    /// Tries to inline a posting list. Returns `None` if there are more than
    /// `MAX_INLINE_POSTINGS` entries or the varint encoding exceeds 16 bytes.
    /// `doc_ids` must be ascending and the same length as `term_freqs`.
    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (i, &doc_id) in doc_ids.iter().enumerate() {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                // Ran out of the 16-byte budget.
                return None;
            }
            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;
        if data_len > 16 {
            return None;
        }

        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    /// Iterator variant of [`Self::try_inline`]; `iter` must yield exactly
    /// `count` ascending `(doc_id, term_freq)` pairs.
    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
        if count > MAX_INLINE_POSTINGS || count == 0 {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (doc_id, tf) in iter {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, tf as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;

        Some(TermInfo::Inline {
            doc_freq: count as u8,
            data,
            data_len,
        })
    }

    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    /// `(posting_offset, posting_len)` for external postings, else `None`.
    pub fn external_info(&self) -> Option<(u64, u64)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    /// `(position_offset, position_len)` when position data exists.
    pub fn position_info(&self) -> Option<(u64, u64)> {
        match self {
            TermInfo::External {
                position_offset,
                position_len,
                ..
            } if *position_len > 0 => Some((*position_offset, *position_len)),
            _ => None,
        }
    }

    /// Decodes inline postings back into `(doc_ids, term_freqs)`.
    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;

                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }

                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}
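
// Inline roundtrip sketch: two postings fit well within the 16-byte budget.
//
//     let info = TermInfo::try_inline(&[3, 7], &[1, 2]).expect("fits inline");
//     assert_eq!(info.doc_freq(), 2);
//     let (doc_ids, term_freqs) = info.decode_inline().unwrap();
//     assert_eq!(doc_ids, vec![3, 7]);
//     assert_eq!(term_freqs, vec![1, 2]);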

impl SSTableValue for TermInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                // Tag 0xFF: inline postings.
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            } => {
                if *position_len > 0 {
                    // Tag 0x01: external postings with positions.
                    writer.write_u8(0x01)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len)?;
                    write_vint(writer, *position_offset)?;
                    write_vint(writer, *position_len)?;
                } else {
                    // Tag 0x00: external postings without positions.
                    writer.write_u8(0x00)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len)?;
                }
            }
        }
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;

        if tag == 0xFF {
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            if data_len as usize > 16 {
                // Defensive: corrupt input must not panic the slice below.
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "inline TermInfo data_len exceeds 16",
                ));
            }
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)?;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset: 0,
                position_len: 0,
            })
        } else if tag == 0x01 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)?;
            let position_offset = read_vint(reader)?;
            let position_len = read_vint(reader)?;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}

/// Writes `value` as an LEB128-style varint: 7 bits per byte, least
/// significant group first, high bit set on continuation bytes.
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            return Ok(());
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
}

/// Reads a varint written by [`write_vint`].
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}

/// Length of the longest common prefix of `a` and `b`.
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}
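
// Block entry layout sketch, as produced by `SSTableWriter::insert` and
// consumed by `search_block`: each key is prefix-compressed against the
// previous key in the same block.
//
//     prev_key = b"apple", key = b"apply"
//     common_prefix_len(prev_key, key) == 4, suffix == b"y"
//     written: vint(4) | vint(1) | b"y" | V::serialize(value)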

/// Summary statistics for an opened SSTable.
#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    pub num_sparse_entries: usize,
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    pub bloom_filter_size: usize,
    pub dictionary_size: usize,
}

/// Tuning knobs for [`SSTableWriter`].
#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    pub compression_level: CompressionLevel,
    pub use_dictionary: bool,
    pub dict_size: usize,
    pub use_bloom_filter: bool,
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}

impl SSTableWriterConfig {
    /// Derives a writer config from the index-wide optimization profile.
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        match optimization {
            IndexOptimization::Adaptive => Self {
                compression_level: CompressionLevel::BETTER,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: false,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::SizeOptimized => Self {
                compression_level: CompressionLevel::MAX,
                use_dictionary: true,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::PerformanceOptimized => Self {
                compression_level: CompressionLevel::FAST,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
        }
    }

    /// Shorthand for the performance-optimized profile.
    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    /// Shorthand for the size-optimized profile.
    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}
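
// Profile sketch, matching the assertions in the tests below:
// Adaptive -> BETTER (9); SizeOptimized -> MAX (22) plus dictionary and
// bloom filter; PerformanceOptimized -> FAST (1) plus bloom filter.
//
//     assert_eq!(SSTableWriterConfig::fast().compression_level.0, 1);
//     assert_eq!(SSTableWriterConfig::max_compression().compression_level.0, 22);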

/// Streaming SSTable writer.
///
/// Keys must be inserted in ascending byte order. Keys are
/// prefix-compressed within a block, and each block is compressed as a
/// unit once its uncompressed size reaches `BLOCK_SIZE`.
pub struct SSTableWriter<W: Write, V: SSTableValue> {
    writer: W,
    block_buffer: Vec<u8>,
    prev_key: Vec<u8>,
    index: Vec<BlockIndexEntry>,
    current_offset: u64,
    num_entries: u64,
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    dictionary: Option<CompressionDict>,
    /// Hash pairs for all inserted keys; the bloom filter itself is built
    /// in `finish`, once the exact key count is known.
    bloom_hashes: Vec<(u64, u64)>,
    _phantom: std::marker::PhantomData<V>,
}

impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
    pub fn new(writer: W) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }

    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            bloom_hashes: Vec::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Like `with_config`, but compresses blocks with a pre-trained dictionary.
    pub fn with_dictionary(
        writer: W,
        config: SSTableWriterConfig,
        dictionary: CompressionDict,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: Some(dictionary),
            bloom_hashes: Vec::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Appends a key/value pair. Keys must arrive in ascending order.
    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        if self.config.use_bloom_filter {
            self.bloom_hashes.push(bloom_hash_pair(key));
        }

        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }

    /// Compresses and writes the current block, recording its first key
    /// and address in the block index.
    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        // Prefix compression restarts at each block boundary.
        self.prev_key.clear();

        Ok(())
    }

    /// Flushes the final block, then appends the block index, optional
    /// bloom filter, optional dictionary, and a fixed 37-byte footer.
    pub fn finish(mut self) -> io::Result<W> {
        self.flush_block()?;

        // Build the bloom filter now that the exact key count is known.
        let bloom_filter = if self.config.use_bloom_filter && !self.bloom_hashes.is_empty() {
            let mut bloom =
                BloomFilter::new(self.bloom_hashes.len(), self.config.bloom_bits_per_key);
            for (h1, h2) in &self.bloom_hashes {
                bloom.insert_hashed(*h1, *h2);
            }
            Some(bloom)
        } else {
            None
        };

        let data_end_offset = self.current_offset;

        let entries: Vec<(Vec<u8>, BlockAddr)> = self
            .index
            .iter()
            .map(|e| {
                (
                    e.first_key.clone(),
                    BlockAddr {
                        offset: e.offset,
                        length: e.length,
                    },
                )
            })
            .collect();

        #[cfg(feature = "native")]
        let index_bytes = FstBlockIndex::build(&entries)?;
        #[cfg(not(feature = "native"))]
        let index_bytes = MmapBlockIndex::build(&entries)?;

        self.writer
            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
        self.writer.write_all(&index_bytes)?;
        self.current_offset += 4 + index_bytes.len() as u64;

        let bloom_offset = if let Some(ref bloom) = bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };

        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };

        // Footer (37 bytes): data_end | num_entries | bloom_offset |
        // dict_offset (u64 each) | compression level (u8) | magic (u32).
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(self.writer)
    }
}
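
// Writer usage sketch: any `io::Write` sink works; a `Vec<u8>` is shown.
// Keys must be inserted in ascending byte order.
//
//     let mut writer = SSTableWriter::<_, u64>::new(Vec::new());
//     writer.insert(b"alpha", &1)?;
//     writer.insert(b"beta", &2)?;
//     let bytes: Vec<u8> = writer.finish()?;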

/// Per-block entry recorded while writing, before the index is serialized.
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

/// Async SSTable reader with bloom-filter pre-checks and an LRU cache of
/// decompressed blocks.
pub struct AsyncSSTableReader<V: SSTableValue> {
    /// File handle restricted to the data region (blocks only).
    data_slice: FileHandle,
    block_index: BlockIndex,
    num_entries: u64,
    cache: RwLock<BlockCache>,
    bloom_filter: Option<BloomFilter>,
    dictionary: Option<CompressionDict>,
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

/// LRU cache of decompressed blocks, keyed by block offset.
struct BlockCache {
    blocks: FxHashMap<u64, Arc<[u8]>>,
    lru_order: std::collections::VecDeque<u64>,
    max_blocks: usize,
}

impl BlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    fn get(&mut self, offset: u64) -> Option<Arc<[u8]>> {
        if self.blocks.contains_key(&offset) {
            self.promote(offset);
            self.blocks.get(&offset).map(Arc::clone)
        } else {
            None
        }
    }

    /// Lookup without touching LRU order (usable under a read lock).
    fn peek(&self, offset: u64) -> Option<Arc<[u8]>> {
        self.blocks.get(&offset).map(Arc::clone)
    }

    fn insert(&mut self, offset: u64, block: Arc<[u8]>) {
        if self.blocks.contains_key(&offset) {
            self.promote(offset);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict_offset) = self.lru_order.pop_front() {
                self.blocks.remove(&evict_offset);
            } else {
                break;
            }
        }
        self.blocks.insert(offset, block);
        self.lru_order.push_back(offset);
    }

    /// Moves `offset` to the back (most recently used) position. Linear
    /// scan over the queue; acceptable for the small caches used here.
    fn promote(&mut self, offset: u64) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == offset) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(offset);
        }
    }
}

impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Opens an SSTable: parses the 37-byte footer, loads the block index,
    /// and reads the optional bloom filter and compression dictionary.
    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // Footer: data_end | num_entries | bloom_offset | dict_offset
        // (u64 each) | compression level (u8) | magic (u32).
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;

        let mut idx_reader = index_bytes.as_slice();
        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;

        if index_len > idx_reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Index data truncated",
            ));
        }

        let index_data = index_bytes.slice(4..4 + index_len);

        // Prefer the FST index when available, falling back to the mmap
        // layout if the bytes do not parse as an FST.
        #[cfg(feature = "native")]
        let block_index = match FstBlockIndex::load(index_data.clone()) {
            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
        };
        #[cfg(not(feature = "native"))]
        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);

        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            // Read the 12-byte header first to learn the word count.
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_owned_bytes(bloom_data)?)
        } else {
            None
        };

        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_owned_bytes(dict_data))
        } else {
            None
        };

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            block_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.block_index.len(),
            num_sparse_entries: 0,
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}",
            key.len(),
            self.block_index.len()
        );

        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                log::debug!("SSTable::get key not found (before first block)");
                return Ok(None);
            }
        };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batched lookup: resolves each key to a block, deduplicates and warms
    /// the needed blocks, then answers every key from the cache.
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // Map each key to its candidate block; usize::MAX marks keys that
        // are definitely absent.
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX));
                continue;
            }

            match self.block_index.locate(key) {
                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
                None => key_to_block.push((key_idx, usize::MAX)),
            }
        }

        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        // Warm the cache once per distinct block.
        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            // Served from the cache warmed above.
            let block_data = self.load_block(block_idx).await?;
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Loads and caches every block, one read per block.
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.block_index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Fetches the whole data region in one read, then decompresses every
    /// block into the cache; the cache is grown to hold all of them.
    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
        let num_blocks = self.block_index.len();
        if num_blocks == 0 {
            return Ok(());
        }

        let mut max_end: u64 = 0;
        for i in 0..num_blocks {
            if let Some(addr) = self.block_index.get_addr(i) {
                max_end = max_end.max(addr.offset + addr.length as u64);
            }
        }

        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
        let buf = all_data.as_slice();

        let mut cache = self.cache.write();
        cache.max_blocks = cache.max_blocks.max(num_blocks);
        for i in 0..num_blocks {
            let addr = self.block_index.get_addr(i).unwrap();
            if cache.get(addr.offset).is_some() {
                continue;
            }
            let compressed =
                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
            let decompressed = if let Some(ref dict) = self.dictionary {
                crate::compression::decompress_with_dict(compressed, dict)?
            } else {
                crate::compression::decompress(compressed)?
            };
            cache.insert(addr.offset, Arc::from(decompressed));
        }

        Ok(())
    }

    /// Returns the decompressed block, reading and caching it on a miss.
    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        {
            if let Some(block) = self.cache.read().peek(addr.offset) {
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            addr.offset,
            addr.offset + addr.length as u64
        );

        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range(range).await?;

        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block: Arc<[u8]> = Arc::from(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    /// Synchronous twin of `load_block`.
    #[cfg(feature = "sync")]
    fn load_block_sync(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        {
            if let Some(block) = self.cache.read().peek(addr.offset) {
                return Ok(block);
            }
        }

        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range_sync(range)?;

        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block: Arc<[u8]> = Arc::from(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    /// Synchronous twin of `get`.
    #[cfg(feature = "sync")]
    pub fn get_sync(&self, key: &[u8]) -> io::Result<Option<V>> {
        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            return Ok(None);
        }

        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                return Ok(None);
            }
        };

        let block_data = self.load_block_sync(block_idx)?;
        self.search_block(&block_data, key)
    }

    /// Scans a decompressed block for `target_key`, reconstructing each
    /// prefix-compressed key as it goes. Entries are sorted, so the scan
    /// stops as soon as a greater key is seen.
    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            if suffix_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "SSTable block suffix truncated",
                ));
            }
            current_key.truncate(common_prefix_len);
            current_key.extend_from_slice(&reader[..suffix_len]);
            reader = &reader[suffix_len..];

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    /// Warms the cache for every block that may hold keys in
    /// `[start_key, end_key]`.
    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        let start_block = self.block_index.locate(start_key).unwrap_or(0);
        let end_block = self
            .block_index
            .locate(end_key)
            .unwrap_or(self.block_index.len().saturating_sub(1));

        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    /// Returns a streaming iterator over all entries in key order.
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Decodes every entry into memory; mainly useful for tests and merges.
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.block_index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = &block_data[..];
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                if suffix_len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "SSTable block suffix truncated",
                    ));
                }
                current_key.truncate(common_prefix_len);
                current_key.extend_from_slice(&reader[..suffix_len]);
                reader = &reader[suffix_len..];

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}
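
// Reader usage sketch (illustrative; `handle` stands in for a `FileHandle`
// obtained from one of the crate's directories):
//
//     let reader = AsyncSSTableReader::<u64>::open(handle, 256).await?;
//     if let Some(value) = reader.get(b"alpha").await? {
//         // exact match found
//     }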

/// Streaming iterator over an [`AsyncSSTableReader`], yielding entries in
/// ascending key order.
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    current_block: usize,
    block_data: Option<Arc<[u8]>>,
    block_offset: usize,
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.block_index.is_empty(),
        }
    }

    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.block_index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Returns the next `(key, value)` pair, or `None` once exhausted.
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            if suffix_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "SSTable block suffix truncated",
                ));
            }
            self.current_key.truncate(common_prefix_len);
            self.current_key.extend_from_slice(&reader[..suffix_len]);
            reader = &reader[suffix_len..];

            let value = V::deserialize(&mut reader)?;

            // Advance by however many bytes this entry consumed.
            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}
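
// Iteration sketch: entries stream back in ascending key order, one
// decompressed block at a time through the shared cache.
//
//     let mut it = reader.iter();
//     while let Some((key, value)) = it.next().await? {
//         // ...
//     }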

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        // False positives are possible in principle, but vanishingly
        // unlikely at this sizing.
        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_owned_bytes(OwnedBytes::new(bytes)).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        // No false negatives allowed.
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        // ~1% expected at 10 bits/key with 7 hashes; allow headroom.
        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        // Default (Adaptive): BETTER compression, no bloom, no dictionary.
        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9);
        assert!(!config.use_bloom_filter);
        assert!(!config.use_dictionary);

        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(!adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22);
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1);
        assert!(perf.use_bloom_filter);
        assert!(!perf.use_dictionary);

        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }

    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
}
1699}