1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
63use std::cmp::Ordering;
64use std::io::{Cursor, Read};
65
/// Restart interval used by `BlockBuilder::default()`: the number of entries
/// `BlockBuilder::add` writes before it records a new restart point.
pub const DEFAULT_RESTART_INTERVAL: usize = 16;

/// Ratio that `BlockBuilder::build_hash_index` combines with the entry count
/// to size the bucket array of the optional hash index.
pub const DEFAULT_HASH_BUCKET_RATIO: f64 = 0.75;
71
/// One-byte tag (`#[repr(u8)]`) describing how a block is encoded.
///
/// NOTE(review): the variant names suggest compression codecs, but nothing in
/// this file compresses or decompresses data — confirm against the callers.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockType {
    /// Tag value `0`.
    Uncompressed = 0,
    /// Tag value `1`.
    Snappy = 1,
    /// Tag value `2`.
    Lz4 = 2,
    /// Tag value `3`.
    Zstd = 3,
}

impl TryFrom<u8> for BlockType {
    type Error = ();

    /// Strict conversion from the on-disk tag byte.
    ///
    /// Returns `Err(())` for any value that is not one of the declared
    /// discriminants (`0..=3`).
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        // Indexed by discriminant; the order must mirror the enum declaration.
        const BY_TAG: [BlockType; 4] = [
            BlockType::Uncompressed,
            BlockType::Snappy,
            BlockType::Lz4,
            BlockType::Zstd,
        ];

        BY_TAG.get(usize::from(value)).copied().ok_or(())
    }
}

impl BlockType {
    /// Lenient conversion from a tag byte: unknown values fall back to
    /// [`BlockType::Uncompressed`] instead of reporting an error.
    ///
    /// Use `BlockType::try_from` when an unknown tag must be detected.
    pub fn from_u8(value: u8) -> Self {
        match Self::try_from(value) {
            Ok(block_type) => block_type,
            Err(()) => BlockType::Uncompressed,
        }
    }
}
106
107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
109pub struct BlockHandle {
110 pub offset: u64,
112 pub size: u64,
114}
115
116impl BlockHandle {
117 pub fn new(offset: u64, size: u64) -> Self {
118 Self { offset, size }
119 }
120
121 pub fn offset(&self) -> u64 {
123 self.offset
124 }
125
126 pub fn size(&self) -> u64 {
128 self.size
129 }
130
131 pub fn encode(&self) -> Vec<u8> {
133 let mut buf = Vec::with_capacity(20);
134 encode_varint(&mut buf, self.offset);
135 encode_varint(&mut buf, self.size);
136 buf
137 }
138
139 pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
141 let mut cursor = Cursor::new(data);
142 let offset = decode_varint(&mut cursor)?;
143 let size = decode_varint(&mut cursor)?;
144 Some((Self { offset, size }, cursor.position() as usize))
145 }
146}
147
/// A single `u32` offset wrapped in a struct.
///
/// NOTE(review): not referenced by any code visible in this file —
/// `BlockBuilder` keeps its restart offsets as plain `u32` values. Confirm
/// whether this type is still needed.
#[derive(Debug, Clone)]
struct RestartPoint {
    /// Presumably the byte offset of a restart entry — TODO confirm.
    offset: u32,
}
154
/// A single `u8` restart index wrapped in a struct.
///
/// NOTE(review): not referenced by any code visible in this file —
/// `BlockBuilder::build_hash_index` writes raw `u8` bucket values. Confirm
/// whether this type is still needed.
#[derive(Debug, Clone)]
struct HashBucket {
    /// Presumably the restart point a hashed key belongs to — TODO confirm.
    restart_index: u8,
}
161
162pub struct BlockBuilder {
168 buffer: Vec<u8>,
170 restarts: Vec<u32>,
172 entries_since_restart: usize,
174 restart_interval: usize,
176 last_key: Vec<u8>,
178 entry_count: usize,
180 use_hash_index: bool,
182 keys_for_hash: Vec<Vec<u8>>,
184}
185
186impl BlockBuilder {
187 pub fn new(restart_interval: usize) -> Self {
189 Self {
190 buffer: Vec::with_capacity(4096),
191 restarts: vec![0], entries_since_restart: 0,
193 restart_interval,
194 last_key: Vec::new(),
195 entry_count: 0,
196 use_hash_index: false,
197 keys_for_hash: Vec::new(),
198 }
199 }
200
201 pub fn with_hash_index(restart_interval: usize) -> Self {
203 Self {
204 use_hash_index: true,
205 ..Self::new(restart_interval)
206 }
207 }
208
209 pub fn add(&mut self, key: &[u8], value: &[u8]) {
213 debug_assert!(
214 self.entry_count == 0 || key > self.last_key.as_slice(),
215 "Keys must be added in sorted order"
216 );
217
218 let shared = if self.entries_since_restart >= self.restart_interval {
220 self.restarts.push(self.buffer.len() as u32);
222 self.entries_since_restart = 0;
223 0
224 } else {
225 self.shared_prefix_len(&self.last_key, key)
227 };
228
229 let non_shared = key.len() - shared;
230 let value_len = value.len();
231
232 encode_varint(&mut self.buffer, shared as u64);
234 encode_varint(&mut self.buffer, non_shared as u64);
235 encode_varint(&mut self.buffer, value_len as u64);
236 self.buffer.extend_from_slice(&key[shared..]);
237 self.buffer.extend_from_slice(value);
238
239 self.last_key.clear();
241 self.last_key.extend_from_slice(key);
242 self.entries_since_restart += 1;
243 self.entry_count += 1;
244
245 if self.use_hash_index {
247 self.keys_for_hash.push(key.to_vec());
248 }
249 }
250
251 fn shared_prefix_len(&self, a: &[u8], b: &[u8]) -> usize {
253 let mut shared = 0;
254 let min_len = a.len().min(b.len());
255 while shared < min_len && a[shared] == b[shared] {
256 shared += 1;
257 }
258 shared
259 }
260
261 pub fn finish(&mut self) -> Vec<u8> {
265 let mut result = std::mem::take(&mut self.buffer);
266
267 if self.use_hash_index && self.entry_count > 0 {
269 self.build_hash_index(&mut result);
270 }
271
272 for restart in &self.restarts {
274 result.write_u32::<LittleEndian>(*restart).unwrap();
275 }
276 result
277 .write_u32::<LittleEndian>(self.restarts.len() as u32)
278 .unwrap();
279
280 result
281 }
282
283 fn build_hash_index(&self, data: &mut Vec<u8>) {
285 let num_buckets = ((self.entry_count as f64 * DEFAULT_HASH_BUCKET_RATIO) as usize).max(1);
287 let mut buckets = vec![0xFFu8; num_buckets]; for (key_idx, key) in self.keys_for_hash.iter().enumerate() {
291 let restart_idx = key_idx / self.restart_interval;
292 let bucket = Self::hash_key(key) as usize % num_buckets;
293
294 let mut probe = bucket;
296 for _ in 0..num_buckets {
297 if buckets[probe] == 0xFF {
298 buckets[probe] = restart_idx as u8;
299 break;
300 }
301 probe = (probe + 1) % num_buckets;
302 }
303 }
304
305 data.extend_from_slice(&buckets);
307 data.write_u32::<LittleEndian>(num_buckets as u32).unwrap();
308 }
309
310 fn hash_key(key: &[u8]) -> u32 {
312 twox_hash::xxh3::hash64(key) as u32
314 }
315
316 pub fn is_empty(&self) -> bool {
318 self.entry_count == 0
319 }
320
321 pub fn estimated_size(&self) -> usize {
323 self.buffer.len() + self.restarts.len() * 4 + 4
324 }
325
326 pub fn reset(&mut self) {
328 self.buffer.clear();
329 self.restarts.clear();
330 self.restarts.push(0);
331 self.entries_since_restart = 0;
332 self.last_key.clear();
333 self.entry_count = 0;
334 self.keys_for_hash.clear();
335 }
336}
337
338impl Default for BlockBuilder {
339 fn default() -> Self {
340 Self::new(DEFAULT_RESTART_INTERVAL)
341 }
342}
343
344pub struct Block {
350 data: Vec<u8>,
352 restarts_offset: usize,
354 num_restarts: usize,
356 num_hash_buckets: usize,
358 hash_index_offset: usize,
360}
361
362impl Block {
363 pub fn new(data: Vec<u8>) -> Option<Self> {
365 if data.len() < 4 {
366 return None;
367 }
368
369 let num_restarts = {
371 let mut cursor = Cursor::new(&data[data.len() - 4..]);
372 cursor.read_u32::<LittleEndian>().ok()? as usize
373 };
374
375 if num_restarts == 0 || data.len() < 4 + num_restarts * 4 {
376 return None;
377 }
378
379 let restarts_offset = data.len() - 4 - num_restarts * 4;
380
381 let (num_hash_buckets, hash_index_offset) =
391 Self::detect_hash_index(&data, restarts_offset, num_restarts);
392
393 Some(Self {
394 data,
395 restarts_offset,
396 num_restarts,
397 num_hash_buckets,
398 hash_index_offset,
399 })
400 }
401
402 fn detect_hash_index(
407 data: &[u8],
408 restarts_offset: usize,
409 num_restarts: usize,
410 ) -> (usize, usize) {
411 if restarts_offset < 4 {
413 return (0, restarts_offset);
414 }
415
416 let nb_offset = restarts_offset - 4;
418 let candidate = u32::from_le_bytes([
419 data[nb_offset],
420 data[nb_offset + 1],
421 data[nb_offset + 2],
422 data[nb_offset + 3],
423 ]) as usize;
424
425 if candidate == 0 || candidate > nb_offset {
427 return (0, restarts_offset);
428 }
429
430 let hash_start = nb_offset - candidate;
431
432 let all_valid = data[hash_start..nb_offset]
437 .iter()
438 .all(|&b| b == 0xFF || (b as usize) < num_restarts);
439
440 if all_valid {
441 (candidate, hash_start)
442 } else {
443 (0, restarts_offset)
444 }
445 }
446
447 fn restart_offset(&self, index: usize) -> u32 {
449 debug_assert!(index < self.num_restarts);
450 let offset = self.restarts_offset + index * 4;
451 let mut cursor = Cursor::new(&self.data[offset..offset + 4]);
452 cursor.read_u32::<LittleEndian>().unwrap()
453 }
454
455 pub fn seek(&self, target: &[u8]) -> BlockIterator<'_> {
459 let mut left = 0;
461 let mut right = self.num_restarts;
462
463 while left < right {
464 let mid = left + (right - left) / 2;
465 let offset = self.restart_offset(mid) as usize;
466
467 let key = self.read_key_at(offset);
469
470 match key.as_slice().cmp(target) {
471 Ordering::Less => left = mid + 1,
472 Ordering::Greater => right = mid,
473 Ordering::Equal => {
474 return BlockIterator::new(self, offset);
475 }
476 }
477 }
478
479 let start_restart = if left > 0 { left - 1 } else { 0 };
482 let start_offset = self.restart_offset(start_restart) as usize;
483
484 let mut iter = BlockIterator::new(self, start_offset);
485 while iter.valid() {
486 if iter.key() >= target {
487 break;
488 }
489 iter.next();
490 }
491 iter
492 }
493
494 pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
498 let iter = self.seek(key);
502 if iter.valid() && iter.key() == key {
503 Some(iter.value().to_vec())
504 } else {
505 None
506 }
507 }
508
509 fn read_key_at(&self, offset: usize) -> Vec<u8> {
511 let mut cursor = Cursor::new(&self.data[offset..self.hash_index_offset]);
512
513 let shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
514 let non_shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
515 let _value_len = decode_varint(&mut cursor);
516
517 debug_assert_eq!(shared, 0, "Expected restart point (shared = 0)");
518
519 let pos = cursor.position() as usize;
520 self.data[offset + pos..offset + pos + non_shared].to_vec()
521 }
522
523 pub fn iter(&self) -> BlockIterator<'_> {
525 BlockIterator::new(self, 0)
526 }
527
528 pub fn data(&self) -> &[u8] {
530 &self.data
531 }
532}
533
534pub struct BlockIterator<'a> {
540 block: &'a Block,
541 offset: usize,
543 key: Vec<u8>,
545 value_start: usize,
547 value_len: usize,
548 valid: bool,
550}
551
552impl<'a> BlockIterator<'a> {
553 pub fn new(block: &'a Block, offset: usize) -> Self {
555 let mut iter = Self {
556 block,
557 offset,
558 key: Vec::new(),
559 value_start: 0,
560 value_len: 0,
561 valid: false,
562 };
563 iter.parse_entry();
564 iter
565 }
566
567 pub fn valid(&self) -> bool {
569 self.valid
570 }
571
572 pub fn key(&self) -> &[u8] {
574 &self.key
575 }
576
577 pub fn value(&self) -> &[u8] {
579 &self.block.data[self.value_start..self.value_start + self.value_len]
580 }
581
582 pub fn next(&mut self) {
584 if !self.valid {
585 return;
586 }
587
588 self.offset = self.value_start + self.value_len;
590 self.parse_entry();
591 }
592
593 fn parse_entry(&mut self) {
595 let entries_end = self.block.hash_index_offset;
598
599 if self.offset >= entries_end {
600 self.valid = false;
601 return;
602 }
603
604 let mut cursor = Cursor::new(&self.block.data[self.offset..entries_end]);
605
606 let shared = match decode_varint(&mut cursor) {
608 Some(v) => v as usize,
609 None => {
610 self.valid = false;
611 return;
612 }
613 };
614 let non_shared = match decode_varint(&mut cursor) {
615 Some(v) => v as usize,
616 None => {
617 self.valid = false;
618 return;
619 }
620 };
621 let value_len = match decode_varint(&mut cursor) {
622 Some(v) => v as usize,
623 None => {
624 self.valid = false;
625 return;
626 }
627 };
628
629 let header_len = cursor.position() as usize;
630 let data_start = self.offset + header_len;
631
632 if data_start + non_shared + value_len > entries_end {
634 self.valid = false;
635 return;
636 }
637
638 self.key.truncate(shared);
640 self.key
641 .extend_from_slice(&self.block.data[data_start..data_start + non_shared]);
642
643 self.value_start = data_start + non_shared;
645 self.value_len = value_len;
646 self.valid = true;
647 }
648
649 pub fn seek(&mut self, target: &[u8]) {
651 let new_iter = self.block.seek(target);
653 self.offset = new_iter.offset;
654 self.key = new_iter.key;
655 self.value_start = new_iter.value_start;
656 self.value_len = new_iter.value_len;
657 self.valid = new_iter.valid;
658 }
659
660 pub fn seek_to_first(&mut self) {
662 self.offset = 0;
663 self.key.clear();
664 self.parse_entry();
665 }
666}
667
/// Appends `value` to `buf` as a little-endian base-128 varint.
///
/// Each byte carries seven payload bits, least-significant group first; the
/// high bit is set on every byte except the last. A `u64` takes 1 to 10 bytes.
fn encode_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let group = (value & 0x7F) as u8;
        value >>= 7;

        if value == 0 {
            // Final group: no continuation bit.
            buf.push(group);
            return;
        }
        buf.push(group | 0x80);
    }
}
680
/// Reads one little-endian base-128 varint from `reader`.
///
/// Returns `None` when the input ends before the terminating byte, when the
/// encoding is longer than ten bytes, or when the tenth byte carries bits that
/// do not fit in a `u64` (such input used to be truncated silently).
/// On success the reader is left positioned just past the varint.
fn decode_varint<R: Read>(reader: &mut R) -> Option<u64> {
    let mut result: u64 = 0;
    let mut shift: u32 = 0;
    let mut byte = [0u8; 1];

    loop {
        reader.read_exact(&mut byte).ok()?;
        let payload = u64::from(byte[0] & 0x7F);

        // At shift 63 only the lowest payload bit still fits in a u64.
        if shift == 63 && payload > 1 {
            return None;
        }
        result |= payload << shift;

        if byte[0] & 0x80 == 0 {
            return Some(result);
        }

        shift += 7;
        if shift >= 64 {
            // More than ten bytes cannot encode a u64.
            return None;
        }
    }
}
700
/// Unit tests for the block builder, the block reader, the iterator and the
/// varint helpers.
#[cfg(test)]
mod tests {
    use super::*;

    /// A single entry can be stored and read back; a missing key yields `None`.
    #[test]
    fn test_block_builder_single_entry() {
        let mut builder = BlockBuilder::new(16);
        builder.add(b"key1", b"value1");

        let data = builder.finish();
        let block = Block::new(data).unwrap();

        assert_eq!(block.get(b"key1"), Some(b"value1".to_vec()));
        assert_eq!(block.get(b"key2"), None);
    }

    /// Lookups work across several restart runs.
    #[test]
    fn test_block_builder_multiple_entries() {
        let mut builder = BlockBuilder::new(4);

        for i in 0..20 {
            let key = format!("key{:02}", i);
            let value = format!("value{:02}", i);
            builder.add(key.as_bytes(), value.as_bytes());
        }

        let data = builder.finish();
        let block = Block::new(data).unwrap();

        for i in 0..20 {
            let key = format!("key{:02}", i);
            let expected_value = format!("value{:02}", i);
            assert_eq!(block.get(key.as_bytes()), Some(expected_value.into_bytes()));
        }
    }

    /// Iterating from the start visits every entry exactly once.
    #[test]
    fn test_block_iterator() {
        let mut builder = BlockBuilder::new(4);

        builder.add(b"apple", b"1");
        builder.add(b"banana", b"2");
        builder.add(b"cherry", b"3");
        builder.add(b"date", b"4");

        let data = builder.finish();
        let block = Block::new(data).unwrap();

        let mut iter = block.iter();
        let mut count = 0;
        while iter.valid() {
            count += 1;
            iter.next();
        }
        assert_eq!(count, 4);
    }

    /// `seek` lands on an exact match, on the next larger key, or nowhere.
    #[test]
    fn test_block_seek() {
        let mut builder = BlockBuilder::new(2);

        builder.add(b"a", b"1");
        builder.add(b"c", b"2");
        builder.add(b"e", b"3");
        builder.add(b"g", b"4");

        let data = builder.finish();
        let block = Block::new(data).unwrap();

        // Exact match.
        let iter = block.seek(b"c");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"c");

        // Between two keys: the next larger key is returned.
        let iter = block.seek(b"d");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"e");

        // Past the last key.
        let iter = block.seek(b"z");
        assert!(!iter.valid());
    }

    /// Keys with long common prefixes round-trip and the block stays small.
    #[test]
    fn test_prefix_compression() {
        let mut builder = BlockBuilder::new(16);

        builder.add(b"user:1000:age", b"30");
        builder.add(b"user:1000:email", b"alice@example.com");
        builder.add(b"user:1000:name", b"Alice");
        builder.add(b"user:1001:name", b"Bob");

        let data = builder.finish();

        let uncompressed_size = b"user:1000:age".len()
            + b"30".len()
            + b"user:1000:email".len()
            + b"alice@example.com".len()
            + b"user:1000:name".len()
            + b"Alice".len()
            + b"user:1001:name".len()
            + b"Bob".len();

        assert!(data.len() < uncompressed_size + 50);

        let block = Block::new(data).unwrap();
        assert_eq!(block.get(b"user:1000:age"), Some(b"30".to_vec()));
        assert_eq!(
            block.get(b"user:1000:email"),
            Some(b"alice@example.com".to_vec())
        );
        assert_eq!(block.get(b"user:1000:name"), Some(b"Alice".to_vec()));
        assert_eq!(block.get(b"user:1001:name"), Some(b"Bob".to_vec()));
    }

    /// Varints round-trip at the interesting size boundaries.
    #[test]
    fn test_varint_encoding() {
        let test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &value in &test_values {
            let mut buf = Vec::new();
            encode_varint(&mut buf, value);

            let mut cursor = Cursor::new(&buf);
            let decoded = decode_varint(&mut cursor).unwrap();

            assert_eq!(value, decoded, "Failed for value {}", value);
        }
    }

    /// A varint whose terminating byte is missing is rejected.
    #[test]
    fn test_varint_truncated_input() {
        let mut cursor = Cursor::new(&[0x80u8][..]);
        assert_eq!(decode_varint(&mut cursor), None);

        let mut cursor = Cursor::new(&[][..]);
        assert_eq!(decode_varint(&mut cursor), None);
    }

    /// A handle round-trips and reports how many bytes it consumed.
    #[test]
    fn test_block_handle() {
        let handle = BlockHandle::new(12345, 67890);
        let encoded = handle.encode();

        let (decoded, len) = BlockHandle::decode(&encoded).unwrap();
        assert_eq!(handle, decoded);
        assert_eq!(len, encoded.len());
    }

    /// A block built without entries is readable and simply contains nothing.
    #[test]
    fn test_empty_block() {
        let mut builder = BlockBuilder::default();
        assert!(builder.is_empty());

        let block = Block::new(builder.finish()).unwrap();

        assert!(!block.iter().valid());
        assert!(!block.seek(b"anything").valid());
        assert_eq!(block.get(b"anything"), None);
    }

    /// Malformed trailers are rejected instead of producing a block.
    #[test]
    fn test_block_new_rejects_malformed_trailer() {
        // Too short to hold the restart count.
        assert!(Block::new(vec![]).is_none());
        assert!(Block::new(vec![0, 0, 0]).is_none());
        // Restart count of zero.
        assert!(Block::new(vec![0, 0, 0, 0]).is_none());
        // Claims two restart offsets but has no room for them.
        assert!(Block::new(vec![2, 0, 0, 0]).is_none());
    }

    /// Blocks written with a hash index are read back like ordinary blocks.
    #[test]
    fn test_hash_index_round_trip() {
        let mut builder = BlockBuilder::with_hash_index(4);

        for i in 0..20 {
            let key = format!("key{:02}", i);
            let value = format!("value{:02}", i);
            builder.add(key.as_bytes(), value.as_bytes());
        }

        let block = Block::new(builder.finish()).unwrap();

        for i in 0..20 {
            let key = format!("key{:02}", i);
            let expected_value = format!("value{:02}", i);
            assert_eq!(block.get(key.as_bytes()), Some(expected_value.into_bytes()));
        }
        assert_eq!(block.get(b"key99"), None);

        let mut iter = block.iter();
        let mut count = 0;
        while iter.valid() {
            count += 1;
            iter.next();
        }
        assert_eq!(count, 20);
    }

    /// Without a hash index the size estimate matches the finished block.
    #[test]
    fn test_estimated_size_without_hash_index() {
        let mut builder = BlockBuilder::new(2);
        builder.add(b"a", b"1");
        builder.add(b"b", b"2");
        builder.add(b"c", b"3");

        let estimate = builder.estimated_size();
        assert_eq!(builder.finish().len(), estimate);
    }

    /// After `reset` the builder produces a block with only the new entries.
    #[test]
    fn test_reset_allows_reuse() {
        let mut builder = BlockBuilder::new(2);
        builder.add(b"a", b"1");
        builder.add(b"b", b"2");
        builder.add(b"c", b"3");
        let _ = builder.finish();

        builder.reset();
        assert!(builder.is_empty());

        builder.add(b"x", b"9");
        let block = Block::new(builder.finish()).unwrap();

        assert_eq!(block.get(b"x"), Some(b"9".to_vec()));
        assert_eq!(block.get(b"a"), None);
    }

    /// An existing iterator can be repositioned with `seek` and rewound.
    #[test]
    fn test_iterator_seek_and_seek_to_first() {
        let mut builder = BlockBuilder::new(2);
        builder.add(b"a", b"1");
        builder.add(b"c", b"2");
        builder.add(b"e", b"3");

        let block = Block::new(builder.finish()).unwrap();
        let mut iter = block.iter();

        iter.seek(b"d");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"e");
        assert_eq!(iter.value(), b"3");

        iter.seek(b"z");
        assert!(!iter.valid());

        iter.seek_to_first();
        assert!(iter.valid());
        assert_eq!(iter.key(), b"a");
        assert_eq!(iter.value(), b"1");
    }
}