use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;

#[cfg(feature = "native")]
use super::sstable_index::FstBlockIndex;
use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{FileHandle, OwnedBytes};

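// On-disk layout (written by `SSTableWriter::finish`, parsed by `AsyncSSTableReader::open`):
//   [compressed data blocks]
//   [u32 index_len][block index]
//   [optional bloom filter][optional u32 dict_len + dictionary]
//   [footer: u64 data_end, u64 num_entries, u64 bloom_offset, u64 dict_offset,
//    u8 compression_level, u32 SSTABLE_MAGIC] (37 bytes)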
pub const SSTABLE_MAGIC: u32 = 0x53544234;

pub const BLOCK_SIZE: usize = 16 * 1024;

pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

pub const BLOOM_BITS_PER_KEY: usize = 10;

pub const BLOOM_HASH_COUNT: usize = 7;

#[derive(Debug, Clone)]
pub struct BloomFilter {
    bits: BloomBits,
    num_bits: usize,
    num_hashes: usize,
}

#[derive(Debug, Clone)]
enum BloomBits {
    Vec(Vec<u64>),
    Bytes(OwnedBytes),
}

impl BloomBits {
    #[inline]
    fn len(&self) -> usize {
        match self {
            BloomBits::Vec(v) => v.len(),
            BloomBits::Bytes(b) => b.len() / 8,
        }
    }

    #[inline]
    fn get(&self, word_idx: usize) -> u64 {
        match self {
            BloomBits::Vec(v) => v[word_idx],
            BloomBits::Bytes(b) => {
                let off = word_idx * 8;
                u64::from_le_bytes([
                    b[off],
                    b[off + 1],
                    b[off + 2],
                    b[off + 3],
                    b[off + 4],
                    b[off + 5],
                    b[off + 6],
                    b[off + 7],
                ])
            }
        }
    }

    #[inline]
    fn set_bit(&mut self, word_idx: usize, bit_idx: usize) {
        match self {
            BloomBits::Vec(v) => v[word_idx] |= 1u64 << bit_idx,
            BloomBits::Bytes(_) => panic!("cannot mutate read-only bloom filter"),
        }
    }

    fn size_bytes(&self) -> usize {
        match self {
            BloomBits::Vec(v) => v.len() * 8,
            BloomBits::Bytes(b) => b.len(),
        }
    }
}

impl BloomFilter {
    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: BloomBits::Vec(vec![0u64; num_words]),
            num_bits,
            num_hashes: BLOOM_HASH_COUNT,
        }
    }

    pub fn from_bytes_mutable(data: &[u8]) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let num_bits = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
        let num_hashes = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
        let num_words = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;

        if data.len() < 12 + num_words * 8 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let mut vec = vec![0u64; num_words];
        for (i, v) in vec.iter_mut().enumerate() {
            let off = 12 + i * 8;
            *v = u64::from_le_bytes(data[off..off + 8].try_into().unwrap());
        }

        Ok(Self {
            bits: BloomBits::Vec(vec),
            num_bits,
            num_hashes,
        })
    }

    pub fn from_owned_bytes(data: OwnedBytes) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let d = data.as_slice();
        let num_bits = u32::from_le_bytes([d[0], d[1], d[2], d[3]]) as usize;
        let num_hashes = u32::from_le_bytes([d[4], d[5], d[6], d[7]]) as usize;
        let num_words = u32::from_le_bytes([d[8], d[9], d[10], d[11]]) as usize;

        if d.len() < 12 + num_words * 8 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let bits_bytes = data.slice(12..12 + num_words * 8);

        Ok(Self {
            bits: BloomBits::Bytes(bits_bytes),
            num_bits,
            num_hashes,
        })
    }

    pub fn to_bytes(&self) -> Vec<u8> {
        let num_words = self.bits.len();
        let mut data = Vec::with_capacity(12 + num_words * 8);
        data.write_u32::<LittleEndian>(self.num_bits as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.num_hashes as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(num_words as u32).unwrap();
        for i in 0..num_words {
            data.write_u64::<LittleEndian>(self.bits.get(i)).unwrap();
        }
        data
    }

    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits.set_bit(word_idx, bit_idx);
            }
        }
    }

    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx >= self.bits.len() || (self.bits.get(word_idx) & (1u64 << bit_idx)) == 0 {
                return false;
            }
        }
        true
    }

    pub fn size_bytes(&self) -> usize {
        12 + self.bits.size_bytes()
    }

    pub fn insert_hashed(&mut self, h1: u64, h2: u64) {
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits.set_bit(word_idx, bit_idx);
            }
        }
    }

    #[inline]
    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
        let mut h1: u64 = 0xcbf29ce484222325;
        let mut h2: u64 = 0x84222325cbf29ce4;
        for &byte in key {
            h1 ^= byte as u64;
            h1 = h1.wrapping_mul(0x100000001b3);
            h2 = h2.wrapping_mul(0x100000001b3);
            h2 ^= byte as u64;
        }
        (h1, h2)
    }

    #[inline]
    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
    }
}

#[inline]
fn bloom_hash_pair(key: &[u8]) -> (u64, u64) {
    let mut h1: u64 = 0xcbf29ce484222325;
    let mut h2: u64 = 0x84222325cbf29ce4;
    for &byte in key {
        h1 ^= byte as u64;
        h1 = h1.wrapping_mul(0x100000001b3);
        h2 = h2.wrapping_mul(0x100000001b3);
        h2 ^= byte as u64;
    }
    (h1, h2)
}

pub trait SSTableValue: Clone + Send + Sync {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SparseDimInfo {
    pub offset: u64,
    pub length: u32,
}

impl SparseDimInfo {
    pub fn new(offset: u64, length: u32) -> Self {
        Self { offset, length }
    }
}

impl SSTableValue for SparseDimInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.offset)?;
        write_vint(writer, self.length as u64)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let offset = read_vint(reader)?;
        let length = read_vint(reader)? as u32;
        Ok(Self { offset, length })
    }
}

pub const MAX_INLINE_POSTINGS: usize = 3;

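// Per-term metadata stored as the SSTable value in the term dictionary. Small
// posting lists (up to MAX_INLINE_POSTINGS entries) are delta-encoded directly
// into a 16-byte inline buffer; larger ones point at a byte range of external
// posting data, optionally with a positions range.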
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    Inline {
        doc_freq: u8,
        data: [u8; 16],
        data_len: u8,
    },
    External {
        posting_offset: u64,
        posting_len: u64,
        doc_freq: u32,
        position_offset: u64,
        position_len: u64,
    },
}

impl TermInfo {
    pub fn external(posting_offset: u64, posting_len: u64, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset: 0,
            position_len: 0,
        }
    }

    pub fn external_with_positions(
        posting_offset: u64,
        posting_len: u64,
        doc_freq: u32,
        position_offset: u64,
        position_len: u64,
    ) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset,
            position_len,
        }
    }

    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (i, &doc_id) in doc_ids.iter().enumerate() {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;
        if data_len > 16 {
            return None;
        }

        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
        if count > MAX_INLINE_POSTINGS || count == 0 {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (doc_id, tf) in iter {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, tf as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;

        Some(TermInfo::Inline {
            doc_freq: count as u8,
            data,
            data_len,
        })
    }

    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    pub fn external_info(&self) -> Option<(u64, u64)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    pub fn position_info(&self) -> Option<(u64, u64)> {
        match self {
            TermInfo::External {
                position_offset,
                position_len,
                ..
            } if *position_len > 0 => Some((*position_offset, *position_len)),
            _ => None,
        }
    }

    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;

                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }

                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}

impl SSTableValue for TermInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            } => {
                if *position_len > 0 {
                    writer.write_u8(0x01)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len)?;
                    write_vint(writer, *position_offset)?;
                    write_vint(writer, *position_len)?;
                } else {
                    writer.write_u8(0x00)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len)?;
                }
            }
        }
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;

        if tag == 0xFF {
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)?;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset: 0,
                position_len: 0,
            })
        } else if tag == 0x01 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)?;
            let position_offset = read_vint(reader)?;
            let position_len = read_vint(reader)?;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}

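// LEB128-style varints: 7 payload bits per byte, least-significant group first,
// with the high bit set on every byte except the last.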
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            return Ok(());
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
}

pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}

pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}

#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    pub num_sparse_entries: usize,
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    pub bloom_filter_size: usize,
    pub dictionary_size: usize,
}

#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    pub compression_level: CompressionLevel,
    pub use_dictionary: bool,
    pub dict_size: usize,
    pub use_bloom_filter: bool,
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}

impl SSTableWriterConfig {
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        match optimization {
            IndexOptimization::Adaptive => Self {
                compression_level: CompressionLevel::BETTER,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::SizeOptimized => Self {
                compression_level: CompressionLevel::MAX,
                use_dictionary: true,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::PerformanceOptimized => Self {
                compression_level: CompressionLevel::FAST,
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
        }
    }

    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}

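// Streaming SSTable writer. Entries are expected in ascending key order; each is
// prefix-compressed against the previous key as
// [vint prefix_len][vint suffix_len][suffix][value], and the current block is
// compressed and flushed once it reaches BLOCK_SIZE.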
pub struct SSTableWriter<W: Write, V: SSTableValue> {
    writer: W,
    block_buffer: Vec<u8>,
    prev_key: Vec<u8>,
    index: Vec<BlockIndexEntry>,
    current_offset: u64,
    num_entries: u64,
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    dictionary: Option<CompressionDict>,
    bloom_hashes: Vec<(u64, u64)>,
    _phantom: std::marker::PhantomData<V>,
}

impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
    pub fn new(writer: W) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }

    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            bloom_hashes: Vec::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    pub fn with_dictionary(
        writer: W,
        config: SSTableWriterConfig,
        dictionary: CompressionDict,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: Some(dictionary),
            bloom_hashes: Vec::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        if self.config.use_bloom_filter {
            self.bloom_hashes.push(bloom_hash_pair(key));
        }

        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }

    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        self.prev_key.clear();

        Ok(())
    }

    pub fn finish(mut self) -> io::Result<W> {
        self.flush_block()?;

        let bloom_filter = if self.config.use_bloom_filter && !self.bloom_hashes.is_empty() {
            let mut bloom =
                BloomFilter::new(self.bloom_hashes.len(), self.config.bloom_bits_per_key);
            for (h1, h2) in &self.bloom_hashes {
                bloom.insert_hashed(*h1, *h2);
            }
            Some(bloom)
        } else {
            None
        };

        let data_end_offset = self.current_offset;

        let entries: Vec<(Vec<u8>, BlockAddr)> = self
            .index
            .iter()
            .map(|e| {
                (
                    e.first_key.clone(),
                    BlockAddr {
                        offset: e.offset,
                        length: e.length,
                    },
                )
            })
            .collect();

        #[cfg(feature = "native")]
        let index_bytes = FstBlockIndex::build(&entries)?;
        #[cfg(not(feature = "native"))]
        let index_bytes = MmapBlockIndex::build(&entries)?;

        self.writer
            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
        self.writer.write_all(&index_bytes)?;
        self.current_offset += 4 + index_bytes.len() as u64;

        let bloom_offset = if let Some(ref bloom) = bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };

        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(self.writer)
    }
}

#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

pub struct AsyncSSTableReader<V: SSTableValue> {
    data_slice: FileHandle,
    block_index: BlockIndex,
    num_entries: u64,
    cache: RwLock<BlockCache>,
    bloom_filter: Option<BloomFilter>,
    dictionary: Option<CompressionDict>,
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

struct BlockCache {
    blocks: FxHashMap<u64, Arc<[u8]>>,
    lru_order: std::collections::VecDeque<u64>,
    max_blocks: usize,
}

impl BlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    fn get(&mut self, offset: u64) -> Option<Arc<[u8]>> {
        if self.blocks.contains_key(&offset) {
            self.promote(offset);
            self.blocks.get(&offset).map(Arc::clone)
        } else {
            None
        }
    }

    fn peek(&self, offset: u64) -> Option<Arc<[u8]>> {
        self.blocks.get(&offset).map(Arc::clone)
    }

    fn insert(&mut self, offset: u64, block: Arc<[u8]>) {
        if self.blocks.contains_key(&offset) {
            self.promote(offset);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict_offset) = self.lru_order.pop_front() {
                self.blocks.remove(&evict_offset);
            } else {
                break;
            }
        }
        self.blocks.insert(offset, block);
        self.lru_order.push_back(offset);
    }

    fn promote(&mut self, offset: u64) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == offset) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(offset);
        }
    }
}

impl<V: SSTableValue> AsyncSSTableReader<V> {
    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;

        let mut idx_reader = index_bytes.as_slice();
        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;

        if index_len > idx_reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Index data truncated",
            ));
        }

        let index_data = index_bytes.slice(4..4 + index_len);

        #[cfg(feature = "native")]
        let block_index = match FstBlockIndex::load(index_data.clone()) {
            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
        };
        #[cfg(not(feature = "native"))]
        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);

        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_owned_bytes(bloom_data)?)
        } else {
            None
        };

        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_owned_bytes(dict_data))
        } else {
            None
        };

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            block_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.block_index.len(),
            num_sparse_entries: 0,
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}",
            key.len(),
            self.block_index.len()
        );

        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                log::debug!("SSTable::get key not found (before first block)");
                return Ok(None);
            }
        };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX));
                continue;
            }

            match self.block_index.locate(key) {
                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
                None => key_to_block.push((key_idx, usize::MAX)),
            }
        }

        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            let block_data = self.load_block(block_idx).await?;
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.block_index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
        let num_blocks = self.block_index.len();
        if num_blocks == 0 {
            return Ok(());
        }

        let mut max_end: u64 = 0;
        for i in 0..num_blocks {
            if let Some(addr) = self.block_index.get_addr(i) {
                max_end = max_end.max(addr.offset + addr.length as u64);
            }
        }

        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
        let buf = all_data.as_slice();

        let mut cache = self.cache.write();
        cache.max_blocks = cache.max_blocks.max(num_blocks);
        for i in 0..num_blocks {
            let addr = self.block_index.get_addr(i).unwrap();
            if cache.get(addr.offset).is_some() {
                continue;
            }
            let compressed =
                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
            let decompressed = if let Some(ref dict) = self.dictionary {
                crate::compression::decompress_with_dict(compressed, dict)?
            } else {
                crate::compression::decompress(compressed)?
            };
            cache.insert(addr.offset, Arc::from(decompressed));
        }

        Ok(())
    }

    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        {
            if let Some(block) = self.cache.read().peek(addr.offset) {
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            addr.offset,
            addr.offset + addr.length as u64
        );

        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range(range).await?;

        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block: Arc<[u8]> = Arc::from(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    #[cfg(feature = "sync")]
    fn load_block_sync(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        {
            if let Some(block) = self.cache.read().peek(addr.offset) {
                return Ok(block);
            }
        }

        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range_sync(range)?;

        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block: Arc<[u8]> = Arc::from(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    #[cfg(feature = "sync")]
    pub fn get_sync(&self, key: &[u8]) -> io::Result<Option<V>> {
        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            return Ok(None);
        }

        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                return Ok(None);
            }
        };

        let block_data = self.load_block_sync(block_idx)?;
        self.search_block(&block_data, key)
    }

    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            if suffix_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "SSTable block suffix truncated",
                ));
            }
            current_key.truncate(common_prefix_len);
            current_key.extend_from_slice(&reader[..suffix_len]);
            reader = &reader[suffix_len..];

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        let start_block = self.block_index.locate(start_key).unwrap_or(0);
        let end_block = self
            .block_index
            .locate(end_key)
            .unwrap_or(self.block_index.len().saturating_sub(1));

        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.block_index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = &block_data[..];
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                if suffix_len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "SSTable block suffix truncated",
                    ));
                }
                current_key.truncate(common_prefix_len);
                current_key.extend_from_slice(&reader[..suffix_len]);
                reader = &reader[suffix_len..];

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}

pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    current_block: usize,
    block_data: Option<Arc<[u8]>>,
    block_offset: usize,
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.block_index.is_empty(),
        }
    }

    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.block_index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            if suffix_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "SSTable block suffix truncated",
                ));
            }
            self.current_key.truncate(common_prefix_len);
            self.current_key.extend_from_slice(&reader[..suffix_len]);
            reader = &reader[suffix_len..];

            let value = V::deserialize(&mut reader)?;

            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_owned_bytes(OwnedBytes::new(bytes)).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }
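
    // Sketch of the mutable deserialization path: `from_bytes_mutable` should
    // accept `to_bytes` output and, being Vec-backed, keep accepting inserts.
    #[test]
    fn test_bloom_filter_from_bytes_mutable() {
        let mut bloom = BloomFilter::new(100, BLOOM_BITS_PER_KEY);
        bloom.insert(b"alpha");
        bloom.insert(b"beta");

        let bytes = bloom.to_bytes();
        let mut restored = BloomFilter::from_bytes_mutable(&bytes).unwrap();
        assert!(restored.may_contain(b"alpha"));
        assert!(restored.may_contain(b"beta"));
        assert!(!restored.may_contain(b"gamma"));

        // The restored filter is Vec-backed, so it can still be updated in place.
        restored.insert(b"gamma");
        assert!(restored.may_contain(b"gamma"));
    }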

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9);
        assert!(config.use_bloom_filter);
        assert!(!config.use_dictionary);

        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22);
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1);
        assert!(perf.use_bloom_filter);
        assert!(!perf.use_dictionary);

        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }
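
    // Sketch of the inline TermInfo path: a few postings are delta-encoded into
    // the 16-byte inline buffer, decode back, and survive a serialize round-trip.
    #[test]
    fn test_term_info_inline_roundtrip() {
        let doc_ids = vec![3u32, 7, 12];
        let term_freqs = vec![1u32, 2, 5];

        let info = TermInfo::try_inline(&doc_ids, &term_freqs).expect("postings should fit inline");
        assert!(info.is_inline());
        assert_eq!(info.doc_freq(), 3);

        let (decoded_ids, decoded_tfs) = info.decode_inline().unwrap();
        assert_eq!(decoded_ids, doc_ids);
        assert_eq!(decoded_tfs, term_freqs);

        let mut buf = Vec::new();
        info.serialize(&mut buf).unwrap();
        let restored = TermInfo::deserialize(&mut buf.as_slice()).unwrap();
        assert_eq!(restored, info);

        // More than MAX_INLINE_POSTINGS falls back to the external representation.
        assert!(TermInfo::try_inline(&[1, 2, 3, 4], &[1, 1, 1, 1]).is_none());
    }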

    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }
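
    // Sketch of the SSTableValue round-trips for the simple value types defined
    // in this module (u64, Vec<u8>, SparseDimInfo).
    #[test]
    fn test_sstable_value_roundtrips() {
        // u64 values are stored as varints.
        let mut buf = Vec::new();
        42u64.serialize(&mut buf).unwrap();
        assert_eq!(u64::deserialize(&mut buf.as_slice()).unwrap(), 42);

        // Byte-string values are length-prefixed.
        let payload: Vec<u8> = b"hello sstable".to_vec();
        let mut buf = Vec::new();
        payload.serialize(&mut buf).unwrap();
        assert_eq!(Vec::<u8>::deserialize(&mut buf.as_slice()).unwrap(), payload);

        // SparseDimInfo stores an (offset, length) pair.
        let dim = SparseDimInfo::new(1024, 77);
        let mut buf = Vec::new();
        dim.serialize(&mut buf).unwrap();
        assert_eq!(SparseDimInfo::deserialize(&mut buf.as_slice()).unwrap(), dim);
    }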

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
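
    // Sketch of the internal BlockCache LRU behaviour: at capacity the least
    // recently used offset is evicted, and `get` refreshes recency.
    #[test]
    fn test_block_cache_lru() {
        let mut cache = BlockCache::new(2);
        let block: Arc<[u8]> = Arc::from(vec![0u8; 4]);

        cache.insert(0, Arc::clone(&block));
        cache.insert(100, Arc::clone(&block));

        // Touch offset 0 so that offset 100 becomes the eviction candidate.
        assert!(cache.get(0).is_some());

        cache.insert(200, Arc::clone(&block));
        assert!(cache.get(0).is_some());
        assert!(cache.get(100).is_none());
        assert!(cache.get(200).is_some());
    }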
}