1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
63use std::cmp::Ordering;
64use std::io::{Cursor, Read, Write};
65
/// Default number of entries written between two consecutive restart points.
///
/// This is the interval `BlockBuilder::default()` passes to `BlockBuilder::new`.
pub const DEFAULT_RESTART_INTERVAL: usize = 16;

/// Ratio of hash buckets to entries used when sizing a block's hash index.
///
/// The builder multiplies its entry count by this factor, truncates the result
/// and uses at least one bucket.
pub const DEFAULT_HASH_BUCKET_RATIO: f64 = 0.75;
71
/// One-byte tag identifying how a block's payload is stored.
///
/// The discriminants are explicit, so `kind as u8` yields the tag value and
/// [`TryFrom<u8>`] performs the reverse mapping.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockType {
    /// Tag `0`: the payload is stored as-is.
    Uncompressed = 0,
    /// Tag `1`: named for the Snappy codec.
    Snappy = 1,
    /// Tag `2`: named for the LZ4 codec.
    Lz4 = 2,
    /// Tag `3`: named for the Zstandard codec.
    Zstd = 3,
}

impl TryFrom<u8> for BlockType {
    type Error = ();

    /// Maps a raw tag byte back to its variant.
    ///
    /// Returns `Err(())` for any byte that is not the discriminant of a
    /// variant (currently everything above `3`).
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        const KNOWN: [BlockType; 4] = [
            BlockType::Uncompressed,
            BlockType::Snappy,
            BlockType::Lz4,
            BlockType::Zstd,
        ];

        KNOWN
            .iter()
            .copied()
            .find(|kind| *kind as u8 == value)
            .ok_or(())
    }
}

impl BlockType {
    /// Lenient counterpart of [`TryFrom<u8>`].
    ///
    /// Unknown tag bytes are mapped to [`BlockType::Uncompressed`] instead of
    /// being reported; use `BlockType::try_from` when an unknown tag must be
    /// detected.
    pub fn from_u8(value: u8) -> Self {
        match Self::try_from(value) {
            Ok(kind) => kind,
            Err(()) => BlockType::Uncompressed,
        }
    }
}
106
/// Location of a block, expressed as a byte offset and a byte length.
///
/// The handle is a small `Copy` value. It derives `Hash` and `Eq` so it can be
/// used directly as a key in hash maps and sets, and `Default` yields the
/// all-zero handle.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct BlockHandle {
    /// Byte offset at which the block starts.
    pub offset: u64,
    /// Length of the block in bytes.
    pub size: u64,
}
115
116impl BlockHandle {
117 pub fn new(offset: u64, size: u64) -> Self {
118 Self { offset, size }
119 }
120
121 pub fn offset(&self) -> u64 {
123 self.offset
124 }
125
126 pub fn size(&self) -> u64 {
128 self.size
129 }
130
131 pub fn encode(&self) -> Vec<u8> {
133 let mut buf = Vec::with_capacity(20);
134 encode_varint(&mut buf, self.offset);
135 encode_varint(&mut buf, self.size);
136 buf
137 }
138
139 pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
141 let mut cursor = Cursor::new(data);
142 let offset = decode_varint(&mut cursor)?;
143 let size = decode_varint(&mut cursor)?;
144 Some((Self { offset, size }, cursor.position() as usize))
145 }
146}
147
/// A restart point described by a single 32-bit offset.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type (the builder stores restart offsets as plain `u32`s) — confirm
/// whether it is still needed.
#[derive(Debug, Clone)]
struct RestartPoint {
    // Presumably the byte offset of the restart entry inside the block — verify against users.
    offset: u32,
}
154
/// A hash-index bucket holding a one-byte restart index.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type (the builder writes buckets as raw `u8`s) — confirm whether it is
/// still needed.
#[derive(Debug, Clone)]
struct HashBucket {
    // Presumably the index of the restart point a key hashes to — verify against users.
    restart_index: u8,
}
161
/// Incrementally builds a prefix-compressed block of sorted key/value entries.
///
/// Entries are appended to an internal buffer; every `restart_interval`
/// entries a restart point is recorded at which the key is stored in full.
/// When the hash index is enabled, a copy of every key is retained so the
/// index can be computed when the block is finished.
#[derive(Debug, Clone)]
pub struct BlockBuilder {
    /// Serialized entries written so far.
    buffer: Vec<u8>,
    /// Offsets into `buffer` of the entries that start a restart run; the
    /// first element is always `0`.
    restarts: Vec<u32>,
    /// Number of entries written since the most recent restart point.
    entries_since_restart: usize,
    /// Number of entries between two restart points.
    restart_interval: usize,
    /// The most recently added key, used for prefix sharing and for the
    /// sorted-order debug check.
    last_key: Vec<u8>,
    /// Total number of entries added since construction or the last reset.
    entry_count: usize,
    /// Whether a hash index is appended when the block is finished.
    use_hash_index: bool,
    /// Owned copy of every added key; only populated when `use_hash_index` is set.
    keys_for_hash: Vec<Vec<u8>>,
}
185
186impl BlockBuilder {
187 pub fn new(restart_interval: usize) -> Self {
189 Self {
190 buffer: Vec::with_capacity(4096),
191 restarts: vec![0], entries_since_restart: 0,
193 restart_interval,
194 last_key: Vec::new(),
195 entry_count: 0,
196 use_hash_index: false,
197 keys_for_hash: Vec::new(),
198 }
199 }
200
201 pub fn with_hash_index(restart_interval: usize) -> Self {
203 Self {
204 use_hash_index: true,
205 ..Self::new(restart_interval)
206 }
207 }
208
209 pub fn add(&mut self, key: &[u8], value: &[u8]) {
213 debug_assert!(
214 self.entry_count == 0 || key > self.last_key.as_slice(),
215 "Keys must be added in sorted order"
216 );
217
218 let shared = if self.entries_since_restart >= self.restart_interval {
220 self.restarts.push(self.buffer.len() as u32);
222 self.entries_since_restart = 0;
223 0
224 } else {
225 self.shared_prefix_len(&self.last_key, key)
227 };
228
229 let non_shared = key.len() - shared;
230 let value_len = value.len();
231
232 encode_varint(&mut self.buffer, shared as u64);
234 encode_varint(&mut self.buffer, non_shared as u64);
235 encode_varint(&mut self.buffer, value_len as u64);
236 self.buffer.extend_from_slice(&key[shared..]);
237 self.buffer.extend_from_slice(value);
238
239 self.last_key.clear();
241 self.last_key.extend_from_slice(key);
242 self.entries_since_restart += 1;
243 self.entry_count += 1;
244
245 if self.use_hash_index {
247 self.keys_for_hash.push(key.to_vec());
248 }
249 }
250
251 fn shared_prefix_len(&self, a: &[u8], b: &[u8]) -> usize {
253 let mut shared = 0;
254 let min_len = a.len().min(b.len());
255 while shared < min_len && a[shared] == b[shared] {
256 shared += 1;
257 }
258 shared
259 }
260
261 pub fn finish(&mut self) -> Vec<u8> {
265 let mut result = std::mem::take(&mut self.buffer);
266
267 if self.use_hash_index && self.entry_count > 0 {
269 self.build_hash_index(&mut result);
270 }
271
272 for restart in &self.restarts {
274 result.write_u32::<LittleEndian>(*restart).unwrap();
275 }
276 result
277 .write_u32::<LittleEndian>(self.restarts.len() as u32)
278 .unwrap();
279
280 result
281 }
282
283 fn build_hash_index(&self, data: &mut Vec<u8>) {
285 let num_buckets = ((self.entry_count as f64 * DEFAULT_HASH_BUCKET_RATIO) as usize).max(1);
287 let mut buckets = vec![0xFFu8; num_buckets]; for (key_idx, key) in self.keys_for_hash.iter().enumerate() {
291 let restart_idx = key_idx / self.restart_interval;
292 let bucket = Self::hash_key(key) as usize % num_buckets;
293
294 let mut probe = bucket;
296 for _ in 0..num_buckets {
297 if buckets[probe] == 0xFF {
298 buckets[probe] = restart_idx as u8;
299 break;
300 }
301 probe = (probe + 1) % num_buckets;
302 }
303 }
304
305 data.extend_from_slice(&buckets);
307 data.write_u32::<LittleEndian>(num_buckets as u32).unwrap();
308 }
309
310 fn hash_key(key: &[u8]) -> u32 {
312 twox_hash::xxh3::hash64(key) as u32
314 }
315
316 pub fn is_empty(&self) -> bool {
318 self.entry_count == 0
319 }
320
321 pub fn estimated_size(&self) -> usize {
323 self.buffer.len() + self.restarts.len() * 4 + 4
324 }
325
326 pub fn reset(&mut self) {
328 self.buffer.clear();
329 self.restarts.clear();
330 self.restarts.push(0);
331 self.entries_since_restart = 0;
332 self.last_key.clear();
333 self.entry_count = 0;
334 self.keys_for_hash.clear();
335 }
336}
337
338impl Default for BlockBuilder {
339 fn default() -> Self {
340 Self::new(DEFAULT_RESTART_INTERVAL)
341 }
342}
343
/// A parsed, read-only view over the serialized bytes of one block.
///
/// The struct owns the raw bytes and caches where the entry region ends and
/// where the restart array (and optional hash index) live.
#[derive(Debug, Clone)]
pub struct Block {
    /// The complete serialized block, including index and trailer.
    data: Vec<u8>,
    /// Offset in `data` of the first restart offset.
    restarts_offset: usize,
    /// Number of restart points recorded in the trailer.
    num_restarts: usize,
    /// Number of hash buckets, or `0` when no hash index was detected.
    num_hash_buckets: usize,
    /// Offset in `data` where the entry region ends: the start of the hash
    /// index if one was detected, otherwise equal to `restarts_offset`.
    hash_index_offset: usize,
}
361
362impl Block {
363 pub fn new(data: Vec<u8>) -> Option<Self> {
365 if data.len() < 4 {
366 return None;
367 }
368
369 let num_restarts = {
371 let mut cursor = Cursor::new(&data[data.len() - 4..]);
372 cursor.read_u32::<LittleEndian>().ok()? as usize
373 };
374
375 if num_restarts == 0 || data.len() < 4 + num_restarts * 4 {
376 return None;
377 }
378
379 let restarts_offset = data.len() - 4 - num_restarts * 4;
380
381 let (num_hash_buckets, hash_index_offset) = Self::detect_hash_index(&data, restarts_offset, num_restarts);
391
392 Some(Self {
393 data,
394 restarts_offset,
395 num_restarts,
396 num_hash_buckets,
397 hash_index_offset,
398 })
399 }
400
401 fn detect_hash_index(data: &[u8], restarts_offset: usize, num_restarts: usize) -> (usize, usize) {
406 if restarts_offset < 4 {
408 return (0, restarts_offset);
409 }
410
411 let nb_offset = restarts_offset - 4;
413 let candidate = u32::from_le_bytes([
414 data[nb_offset],
415 data[nb_offset + 1],
416 data[nb_offset + 2],
417 data[nb_offset + 3],
418 ]) as usize;
419
420 if candidate == 0 || candidate > nb_offset {
422 return (0, restarts_offset);
423 }
424
425 let hash_start = nb_offset - candidate;
426
427 let all_valid = data[hash_start..nb_offset]
432 .iter()
433 .all(|&b| b == 0xFF || (b as usize) < num_restarts);
434
435 if all_valid {
436 (candidate, hash_start)
437 } else {
438 (0, restarts_offset)
439 }
440 }
441
442 fn restart_offset(&self, index: usize) -> u32 {
444 debug_assert!(index < self.num_restarts);
445 let offset = self.restarts_offset + index * 4;
446 let mut cursor = Cursor::new(&self.data[offset..offset + 4]);
447 cursor.read_u32::<LittleEndian>().unwrap()
448 }
449
450 pub fn seek(&self, target: &[u8]) -> BlockIterator<'_> {
454 let mut left = 0;
456 let mut right = self.num_restarts;
457
458 while left < right {
459 let mid = left + (right - left) / 2;
460 let offset = self.restart_offset(mid) as usize;
461
462 let key = self.read_key_at(offset);
464
465 match key.as_slice().cmp(target) {
466 Ordering::Less => left = mid + 1,
467 Ordering::Greater => right = mid,
468 Ordering::Equal => {
469 return BlockIterator::new(self, offset);
470 }
471 }
472 }
473
474 let start_restart = if left > 0 { left - 1 } else { 0 };
477 let start_offset = self.restart_offset(start_restart) as usize;
478
479 let mut iter = BlockIterator::new(self, start_offset);
480 while iter.valid() {
481 if iter.key() >= target {
482 break;
483 }
484 iter.next();
485 }
486 iter
487 }
488
489 pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
493 let mut iter = self.seek(key);
497 if iter.valid() && iter.key() == key {
498 Some(iter.value().to_vec())
499 } else {
500 None
501 }
502 }
503
504 fn read_key_at(&self, offset: usize) -> Vec<u8> {
506 let mut cursor = Cursor::new(&self.data[offset..self.hash_index_offset]);
507
508 let shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
509 let non_shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
510 let _value_len = decode_varint(&mut cursor);
511
512 debug_assert_eq!(shared, 0, "Expected restart point (shared = 0)");
513
514 let pos = cursor.position() as usize;
515 self.data[offset + pos..offset + pos + non_shared].to_vec()
516 }
517
518 pub fn iter(&self) -> BlockIterator<'_> {
520 BlockIterator::new(self, 0)
521 }
522
523 pub fn data(&self) -> &[u8] {
525 &self.data
526 }
527}
528
/// Cursor over the entries of a [`Block`].
///
/// The iterator keeps the fully reconstructed key of the current entry and
/// the location of its value inside the block's bytes.
pub struct BlockIterator<'a> {
    // Block whose entries are being read.
    block: &'a Block,
    // Byte offset in the block data of the entry parsed most recently (or to be parsed next).
    offset: usize,
    // Full key of the current entry, rebuilt from the shared prefix and the stored suffix.
    key: Vec<u8>,
    // Byte offset in the block data at which the current value starts.
    value_start: usize,
    // Length in bytes of the current value.
    value_len: usize,
    // Whether the iterator currently points at a successfully parsed entry.
    valid: bool,
}
546
547impl<'a> BlockIterator<'a> {
548 pub fn new(block: &'a Block, offset: usize) -> Self {
550 let mut iter = Self {
551 block,
552 offset,
553 key: Vec::new(),
554 value_start: 0,
555 value_len: 0,
556 valid: false,
557 };
558 iter.parse_entry();
559 iter
560 }
561
562 pub fn valid(&self) -> bool {
564 self.valid
565 }
566
567 pub fn key(&self) -> &[u8] {
569 &self.key
570 }
571
572 pub fn value(&self) -> &[u8] {
574 &self.block.data[self.value_start..self.value_start + self.value_len]
575 }
576
577 pub fn next(&mut self) {
579 if !self.valid {
580 return;
581 }
582
583 self.offset = self.value_start + self.value_len;
585 self.parse_entry();
586 }
587
588 fn parse_entry(&mut self) {
590 let entries_end = self.block.hash_index_offset;
593
594 if self.offset >= entries_end {
595 self.valid = false;
596 return;
597 }
598
599 let mut cursor = Cursor::new(&self.block.data[self.offset..entries_end]);
600
601 let shared = match decode_varint(&mut cursor) {
603 Some(v) => v as usize,
604 None => {
605 self.valid = false;
606 return;
607 }
608 };
609 let non_shared = match decode_varint(&mut cursor) {
610 Some(v) => v as usize,
611 None => {
612 self.valid = false;
613 return;
614 }
615 };
616 let value_len = match decode_varint(&mut cursor) {
617 Some(v) => v as usize,
618 None => {
619 self.valid = false;
620 return;
621 }
622 };
623
624 let header_len = cursor.position() as usize;
625 let data_start = self.offset + header_len;
626
627 if data_start + non_shared + value_len > entries_end {
629 self.valid = false;
630 return;
631 }
632
633 self.key.truncate(shared);
635 self.key
636 .extend_from_slice(&self.block.data[data_start..data_start + non_shared]);
637
638 self.value_start = data_start + non_shared;
640 self.value_len = value_len;
641 self.valid = true;
642 }
643
644 pub fn seek(&mut self, target: &[u8]) {
646 let new_iter = self.block.seek(target);
648 self.offset = new_iter.offset;
649 self.key = new_iter.key;
650 self.value_start = new_iter.value_start;
651 self.value_len = new_iter.value_len;
652 self.valid = new_iter.valid;
653 }
654
655 pub fn seek_to_first(&mut self) {
657 self.offset = 0;
658 self.key.clear();
659 self.parse_entry();
660 }
661}
662
/// Appends `value` to `buf` as a little-endian base-128 varint.
///
/// Each output byte carries seven payload bits, least-significant group
/// first; the high bit is set on every byte except the last.
fn encode_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let low_bits = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // Final group: the continuation bit stays clear.
            buf.push(low_bits);
            return;
        }
        buf.push(low_bits | 0x80);
    }
}
675
/// Reads one little-endian base-128 varint from `reader`.
///
/// Returns `None` when the input ends before the varint is complete, when the
/// varint is longer than ten bytes, or when the encoded value does not fit in
/// a `u64`. Only the bytes belonging to the varint are consumed on success.
fn decode_varint<R: Read>(reader: &mut R) -> Option<u64> {
    let mut result: u64 = 0;
    let mut byte = [0u8; 1];

    // A u64 needs at most ten 7-bit groups: shifts 0, 7, ..., 63.
    for shift in (0u32..64).step_by(7) {
        reader.read_exact(&mut byte).ok()?;
        let payload = u64::from(byte[0] & 0x7F);

        // Only one payload bit of the tenth group fits in a u64; anything
        // more would be shifted out silently and yield a wrong value.
        if shift == 63 && payload > 1 {
            return None;
        }

        result |= payload << shift;
        if byte[0] & 0x80 == 0 {
            return Some(result);
        }
    }

    // The tenth byte still had its continuation bit set.
    None
}
695
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `entries` (already sorted by key) into raw block bytes.
    fn build_raw(restart_interval: usize, entries: &[(&str, &str)]) -> Vec<u8> {
        let mut builder = BlockBuilder::new(restart_interval);
        for &(key, value) in entries {
            builder.add(key.as_bytes(), value.as_bytes());
        }
        builder.finish()
    }

    /// Builds a block from `entries` and parses it back.
    fn build_block(restart_interval: usize, entries: &[(&str, &str)]) -> Block {
        Block::new(build_raw(restart_interval, entries)).unwrap()
    }

    #[test]
    fn test_block_builder_single_entry() {
        let block = build_block(16, &[("key1", "value1")]);

        assert_eq!(block.get(b"key1"), Some(b"value1".to_vec()));
        assert_eq!(block.get(b"key2"), None);
    }

    #[test]
    fn test_block_builder_multiple_entries() {
        let pairs: Vec<(String, String)> = (0..20)
            .map(|i| (format!("key{:02}", i), format!("value{:02}", i)))
            .collect();
        let borrowed: Vec<(&str, &str)> = pairs
            .iter()
            .map(|(key, value)| (key.as_str(), value.as_str()))
            .collect();

        // Twenty entries with an interval of four span several restart runs.
        let block = build_block(4, &borrowed);

        for (key, value) in &pairs {
            assert_eq!(block.get(key.as_bytes()), Some(value.clone().into_bytes()));
        }
    }

    #[test]
    fn test_block_iterator() {
        let block = build_block(
            4,
            &[("apple", "1"), ("banana", "2"), ("cherry", "3"), ("date", "4")],
        );

        let mut iter = block.iter();
        let mut visited = 0;
        loop {
            if !iter.valid() {
                break;
            }
            visited += 1;
            iter.next();
        }
        assert_eq!(visited, 4);
    }

    #[test]
    fn test_block_seek() {
        let block = build_block(2, &[("a", "1"), ("c", "2"), ("e", "3"), ("g", "4")]);

        // Exact hit.
        let exact = block.seek(b"c");
        assert!(exact.valid());
        assert_eq!(exact.key(), b"c");

        // Target between two keys lands on the next larger key.
        let between = block.seek(b"d");
        assert!(between.valid());
        assert_eq!(between.key(), b"e");

        // Target past the last key yields an invalid iterator.
        let past_end = block.seek(b"z");
        assert!(!past_end.valid());
    }

    #[test]
    fn test_prefix_compression() {
        let entries = [
            ("user:1000:age", "30"),
            ("user:1000:email", "alice@example.com"),
            ("user:1000:name", "Alice"),
            ("user:1001:name", "Bob"),
        ];

        let data = build_raw(16, &entries);

        let uncompressed_size: usize = entries
            .iter()
            .map(|(key, value)| key.len() + value.len())
            .sum();
        assert!(data.len() < uncompressed_size + 50);

        let block = Block::new(data).unwrap();
        for (key, value) in entries.iter() {
            assert_eq!(block.get(key.as_bytes()), Some(value.as_bytes().to_vec()));
        }
    }

    #[test]
    fn test_varint_encoding() {
        let test_values: [u64; 9] = [0, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for value in test_values.iter().copied() {
            let mut encoded = Vec::new();
            encode_varint(&mut encoded, value);

            let mut reader = Cursor::new(&encoded);
            let decoded = decode_varint(&mut reader).unwrap();

            assert_eq!(value, decoded, "Failed for value {}", value);
        }
    }

    #[test]
    fn test_block_handle() {
        let handle = BlockHandle::new(12345, 67890);
        let encoded = handle.encode();

        let (decoded, consumed) = BlockHandle::decode(&encoded).unwrap();
        assert_eq!(handle, decoded);
        assert_eq!(consumed, encoded.len());
    }
}