1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
63use std::cmp::Ordering;
64use std::io::{Cursor, Read, Write};
65
/// Restart interval passed to `BlockBuilder::new` by `BlockBuilder::default()`.
66pub const DEFAULT_RESTART_INTERVAL: usize = 16;
68
/// Factor multiplied by the entry count to size the hash-index bucket table
/// (see `BlockBuilder::build_hash_index`).
69pub const DEFAULT_HASH_BUCKET_RATIO: f64 = 0.75;
71
/// One-byte tag attached to a block.
///
/// The variant names correspond to compression codecs; presumably the tag
/// records how the block payload was compressed (the code that applies it is
/// not part of this file section).
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockType {
    Uncompressed = 0,
    Snappy = 1,
    Lz4 = 2,
    Zstd = 3,
}

impl TryFrom<u8> for BlockType {
    type Error = ();

    /// Maps a raw tag byte to its variant, or `Err(())` for any byte above 3.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        // Indexed by discriminant, so position `i` must hold the variant whose tag is `i`.
        const BY_TAG: [BlockType; 4] = [
            BlockType::Uncompressed,
            BlockType::Snappy,
            BlockType::Lz4,
            BlockType::Zstd,
        ];
        BY_TAG.get(usize::from(value)).copied().ok_or(())
    }
}

impl BlockType {
    /// Lenient conversion: unknown tag bytes fall back to `Uncompressed`
    /// instead of reporting an error.
    pub fn from_u8(value: u8) -> Self {
        match Self::try_from(value) {
            Ok(kind) => kind,
            Err(()) => BlockType::Uncompressed,
        }
    }
}
106
107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
109pub struct BlockHandle {
110 pub offset: u64,
112 pub size: u64,
114}
115
116impl BlockHandle {
117 pub fn new(offset: u64, size: u64) -> Self {
118 Self { offset, size }
119 }
120
121 pub fn offset(&self) -> u64 {
123 self.offset
124 }
125
126 pub fn size(&self) -> u64 {
128 self.size
129 }
130
131 pub fn encode(&self) -> Vec<u8> {
133 let mut buf = Vec::with_capacity(20);
134 encode_varint(&mut buf, self.offset);
135 encode_varint(&mut buf, self.size);
136 buf
137 }
138
139 pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
141 let mut cursor = Cursor::new(data);
142 let offset = decode_varint(&mut cursor)?;
143 let size = decode_varint(&mut cursor)?;
144 Some((Self { offset, size }, cursor.position() as usize))
145 }
146}
147
/// Wrapper around a single `u32` offset.
///
/// NOTE(review): nothing in the visible code constructs or reads this type
/// (`BlockBuilder` stores restart offsets as plain `u32`s) — confirm whether
/// it is still needed.
148#[derive(Debug, Clone)]
150struct RestartPoint {
 // Presumably the byte offset of a restart entry inside a block — TODO confirm.
151 offset: u32,
153}
154
/// Wrapper around a single `u8` restart index.
///
/// NOTE(review): nothing in the visible code constructs or reads this type
/// (`BlockBuilder::build_hash_index` works on a raw `Vec<u8>`) — confirm
/// whether it is still needed.
155#[derive(Debug, Clone)]
157struct HashBucket {
 // Presumably the index into the restart array that a bucket points at — TODO confirm.
158 restart_index: u8,
160}
161
162pub struct BlockBuilder {
168 buffer: Vec<u8>,
170 restarts: Vec<u32>,
172 entries_since_restart: usize,
174 restart_interval: usize,
176 last_key: Vec<u8>,
178 entry_count: usize,
180 use_hash_index: bool,
182 keys_for_hash: Vec<Vec<u8>>,
184}
185
186impl BlockBuilder {
187 pub fn new(restart_interval: usize) -> Self {
189 Self {
190 buffer: Vec::with_capacity(4096),
191 restarts: vec![0], entries_since_restart: 0,
193 restart_interval,
194 last_key: Vec::new(),
195 entry_count: 0,
196 use_hash_index: false,
197 keys_for_hash: Vec::new(),
198 }
199 }
200
201 pub fn with_hash_index(restart_interval: usize) -> Self {
203 Self {
204 use_hash_index: true,
205 ..Self::new(restart_interval)
206 }
207 }
208
209 pub fn add(&mut self, key: &[u8], value: &[u8]) {
213 debug_assert!(
214 self.entry_count == 0 || key > self.last_key.as_slice(),
215 "Keys must be added in sorted order"
216 );
217
218 let shared = if self.entries_since_restart >= self.restart_interval {
220 self.restarts.push(self.buffer.len() as u32);
222 self.entries_since_restart = 0;
223 0
224 } else {
225 self.shared_prefix_len(&self.last_key, key)
227 };
228
229 let non_shared = key.len() - shared;
230 let value_len = value.len();
231
232 encode_varint(&mut self.buffer, shared as u64);
234 encode_varint(&mut self.buffer, non_shared as u64);
235 encode_varint(&mut self.buffer, value_len as u64);
236 self.buffer.extend_from_slice(&key[shared..]);
237 self.buffer.extend_from_slice(value);
238
239 self.last_key.clear();
241 self.last_key.extend_from_slice(key);
242 self.entries_since_restart += 1;
243 self.entry_count += 1;
244
245 if self.use_hash_index {
247 self.keys_for_hash.push(key.to_vec());
248 }
249 }
250
251 fn shared_prefix_len(&self, a: &[u8], b: &[u8]) -> usize {
253 let mut shared = 0;
254 let min_len = a.len().min(b.len());
255 while shared < min_len && a[shared] == b[shared] {
256 shared += 1;
257 }
258 shared
259 }
260
261 pub fn finish(&mut self) -> Vec<u8> {
265 let mut result = std::mem::take(&mut self.buffer);
266
267 if self.use_hash_index && self.entry_count > 0 {
269 self.build_hash_index(&mut result);
270 }
271
272 for restart in &self.restarts {
274 result.write_u32::<LittleEndian>(*restart).unwrap();
275 }
276 result
277 .write_u32::<LittleEndian>(self.restarts.len() as u32)
278 .unwrap();
279
280 result
281 }
282
283 fn build_hash_index(&self, data: &mut Vec<u8>) {
285 let num_buckets = ((self.entry_count as f64 * DEFAULT_HASH_BUCKET_RATIO) as usize).max(1);
287 let mut buckets = vec![0xFFu8; num_buckets]; for (key_idx, key) in self.keys_for_hash.iter().enumerate() {
291 let restart_idx = key_idx / self.restart_interval;
292 let bucket = Self::hash_key(key) as usize % num_buckets;
293
294 let mut probe = bucket;
296 for _ in 0..num_buckets {
297 if buckets[probe] == 0xFF {
298 buckets[probe] = restart_idx as u8;
299 break;
300 }
301 probe = (probe + 1) % num_buckets;
302 }
303 }
304
305 data.extend_from_slice(&buckets);
307 data.write_u32::<LittleEndian>(num_buckets as u32).unwrap();
308 }
309
310 fn hash_key(key: &[u8]) -> u32 {
312 twox_hash::xxh3::hash64(key) as u32
314 }
315
316 pub fn is_empty(&self) -> bool {
318 self.entry_count == 0
319 }
320
321 pub fn estimated_size(&self) -> usize {
323 self.buffer.len() + self.restarts.len() * 4 + 4
324 }
325
326 pub fn reset(&mut self) {
328 self.buffer.clear();
329 self.restarts.clear();
330 self.restarts.push(0);
331 self.entries_since_restart = 0;
332 self.last_key.clear();
333 self.entry_count = 0;
334 self.keys_for_hash.clear();
335 }
336}
337
338impl Default for BlockBuilder {
339 fn default() -> Self {
340 Self::new(DEFAULT_RESTART_INTERVAL)
341 }
342}
343
344pub struct Block {
350 data: Vec<u8>,
352 restarts_offset: usize,
354 num_restarts: usize,
356 num_hash_buckets: usize,
358 hash_index_offset: usize,
360}
361
362impl Block {
363 pub fn new(data: Vec<u8>) -> Option<Self> {
365 if data.len() < 4 {
366 return None;
367 }
368
369 let num_restarts = {
371 let mut cursor = Cursor::new(&data[data.len() - 4..]);
372 cursor.read_u32::<LittleEndian>().ok()? as usize
373 };
374
375 if num_restarts == 0 || data.len() < 4 + num_restarts * 4 {
376 return None;
377 }
378
379 let restarts_offset = data.len() - 4 - num_restarts * 4;
380
381 let num_hash_buckets = 0;
384 let hash_index_offset = restarts_offset;
385
386 Some(Self {
387 data,
388 restarts_offset,
389 num_restarts,
390 num_hash_buckets,
391 hash_index_offset,
392 })
393 }
394
395 fn restart_offset(&self, index: usize) -> u32 {
397 debug_assert!(index < self.num_restarts);
398 let offset = self.restarts_offset + index * 4;
399 let mut cursor = Cursor::new(&self.data[offset..offset + 4]);
400 cursor.read_u32::<LittleEndian>().unwrap()
401 }
402
403 pub fn seek(&self, target: &[u8]) -> BlockIterator<'_> {
407 let mut left = 0;
409 let mut right = self.num_restarts;
410
411 while left < right {
412 let mid = left + (right - left) / 2;
413 let offset = self.restart_offset(mid) as usize;
414
415 let key = self.read_key_at(offset);
417
418 match key.as_slice().cmp(target) {
419 Ordering::Less => left = mid + 1,
420 Ordering::Greater => right = mid,
421 Ordering::Equal => {
422 return BlockIterator::new(self, offset);
423 }
424 }
425 }
426
427 let start_restart = if left > 0 { left - 1 } else { 0 };
430 let start_offset = self.restart_offset(start_restart) as usize;
431
432 let mut iter = BlockIterator::new(self, start_offset);
433 while iter.valid() {
434 if iter.key() >= target {
435 break;
436 }
437 iter.next();
438 }
439 iter
440 }
441
442 pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
446 let mut iter = self.seek(key);
450 if iter.valid() && iter.key() == key {
451 Some(iter.value().to_vec())
452 } else {
453 None
454 }
455 }
456
457 fn read_key_at(&self, offset: usize) -> Vec<u8> {
459 let mut cursor = Cursor::new(&self.data[offset..self.restarts_offset]);
460
461 let shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
462 let non_shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
463 let _value_len = decode_varint(&mut cursor);
464
465 debug_assert_eq!(shared, 0, "Expected restart point (shared = 0)");
466
467 let pos = cursor.position() as usize;
468 self.data[offset + pos..offset + pos + non_shared].to_vec()
469 }
470
471 pub fn iter(&self) -> BlockIterator<'_> {
473 BlockIterator::new(self, 0)
474 }
475
476 pub fn data(&self) -> &[u8] {
478 &self.data
479 }
480}
481
482pub struct BlockIterator<'a> {
488 block: &'a Block,
489 offset: usize,
491 key: Vec<u8>,
493 value_start: usize,
495 value_len: usize,
496 valid: bool,
498}
499
500impl<'a> BlockIterator<'a> {
501 pub fn new(block: &'a Block, offset: usize) -> Self {
503 let mut iter = Self {
504 block,
505 offset,
506 key: Vec::new(),
507 value_start: 0,
508 value_len: 0,
509 valid: false,
510 };
511 iter.parse_entry();
512 iter
513 }
514
515 pub fn valid(&self) -> bool {
517 self.valid
518 }
519
520 pub fn key(&self) -> &[u8] {
522 &self.key
523 }
524
525 pub fn value(&self) -> &[u8] {
527 &self.block.data[self.value_start..self.value_start + self.value_len]
528 }
529
530 pub fn next(&mut self) {
532 if !self.valid {
533 return;
534 }
535
536 self.offset = self.value_start + self.value_len;
538 self.parse_entry();
539 }
540
541 fn parse_entry(&mut self) {
543 if self.offset >= self.block.restarts_offset {
544 self.valid = false;
545 return;
546 }
547
548 let mut cursor = Cursor::new(&self.block.data[self.offset..self.block.restarts_offset]);
549
550 let shared = match decode_varint(&mut cursor) {
552 Some(v) => v as usize,
553 None => {
554 self.valid = false;
555 return;
556 }
557 };
558 let non_shared = match decode_varint(&mut cursor) {
559 Some(v) => v as usize,
560 None => {
561 self.valid = false;
562 return;
563 }
564 };
565 let value_len = match decode_varint(&mut cursor) {
566 Some(v) => v as usize,
567 None => {
568 self.valid = false;
569 return;
570 }
571 };
572
573 let header_len = cursor.position() as usize;
574 let data_start = self.offset + header_len;
575
576 if data_start + non_shared + value_len > self.block.restarts_offset {
578 self.valid = false;
579 return;
580 }
581
582 self.key.truncate(shared);
584 self.key
585 .extend_from_slice(&self.block.data[data_start..data_start + non_shared]);
586
587 self.value_start = data_start + non_shared;
589 self.value_len = value_len;
590 self.valid = true;
591 }
592
593 pub fn seek(&mut self, target: &[u8]) {
595 let new_iter = self.block.seek(target);
597 self.offset = new_iter.offset;
598 self.key = new_iter.key;
599 self.value_start = new_iter.value_start;
600 self.value_len = new_iter.value_len;
601 self.valid = new_iter.valid;
602 }
603
604 pub fn seek_to_first(&mut self) {
606 self.offset = 0;
607 self.key.clear();
608 self.parse_entry();
609 }
610}
611
/// Appends `value` to `buf` as a base-128 varint: seven payload bits per
/// byte, least-significant group first, with the high bit set on every byte
/// except the last.
fn encode_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let group = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // Final group: high bit clear marks the end of the varint.
            buf.push(group);
            return;
        }
        buf.push(group | 0x80);
    }
}
624
/// Reads one base-128 varint (as written by `encode_varint`) from `reader`.
///
/// Returns `None` if the reader runs out of bytes before the varint ends,
/// if the varint is longer than ten bytes, or if its value does not fit in
/// a `u64`.
fn decode_varint<R: Read>(reader: &mut R) -> Option<u64> {
    let mut result: u64 = 0;
    let mut shift: u32 = 0;

    loop {
        let mut buf = [0u8; 1];
        reader.read_exact(&mut buf).ok()?;
        let byte = buf[0];
        let payload = u64::from(byte & 0x7F);

        // The tenth byte starts at bit 63, so only its lowest payload bit
        // fits. Anything larger would be shifted out silently and yield a
        // wrong value.
        if shift == 63 && payload > 1 {
            return None;
        }

        result |= payload << shift;
        if byte & 0x80 == 0 {
            return Some(result);
        }

        shift += 7;
        if shift >= 64 {
            // Continuation bit set on the tenth byte: too long for a u64.
            return None;
        }
    }
}
644
#[cfg(test)]
mod tests {
    use super::*;

    /// Adds `entries` (already in sorted key order) to a fresh builder with
    /// the given restart interval and parses the finished bytes back.
    fn build_block(restart_interval: usize, entries: &[(&str, &str)]) -> Block {
        let mut builder = BlockBuilder::new(restart_interval);
        for (key, value) in entries {
            builder.add(key.as_bytes(), value.as_bytes());
        }
        Block::new(builder.finish()).unwrap()
    }

    /// A single stored entry is found; an absent key is not.
    #[test]
    fn test_block_builder_single_entry() {
        let block = build_block(16, &[("key1", "value1")]);

        assert_eq!(block.get(b"key1"), Some(b"value1".to_vec()));
        assert_eq!(block.get(b"key2"), None);
    }

    /// Every entry is retrievable when the block spans several restart points.
    #[test]
    fn test_block_builder_multiple_entries() {
        let pairs: Vec<(String, String)> = (0..20)
            .map(|i| (format!("key{:02}", i), format!("value{:02}", i)))
            .collect();

        let mut builder = BlockBuilder::new(4);
        for (key, value) in &pairs {
            builder.add(key.as_bytes(), value.as_bytes());
        }
        let block = Block::new(builder.finish()).unwrap();

        for (key, value) in pairs {
            assert_eq!(block.get(key.as_bytes()), Some(value.into_bytes()));
        }
    }

    /// Iterating from the start visits each entry exactly once.
    #[test]
    fn test_block_iterator() {
        let block = build_block(
            4,
            &[("apple", "1"), ("banana", "2"), ("cherry", "3"), ("date", "4")],
        );

        let mut cursor = block.iter();
        let mut seen = 0;
        while cursor.valid() {
            seen += 1;
            cursor.next();
        }
        assert_eq!(seen, 4);
    }

    /// `seek` lands on an exact match, on the next larger key, or past the end.
    #[test]
    fn test_block_seek() {
        let block = build_block(2, &[("a", "1"), ("c", "2"), ("e", "3"), ("g", "4")]);

        let exact = block.seek(b"c");
        assert!(exact.valid());
        assert_eq!(exact.key(), b"c");

        let between = block.seek(b"d");
        assert!(between.valid());
        assert_eq!(between.key(), b"e");

        let past_end = block.seek(b"z");
        assert!(!past_end.valid());
    }

    /// Keys with long common prefixes stay compact and remain retrievable.
    #[test]
    fn test_prefix_compression() {
        let entries = [
            ("user:1000:age", "30"),
            ("user:1000:email", "alice@example.com"),
            ("user:1000:name", "Alice"),
            ("user:1001:name", "Bob"),
        ];

        let mut builder = BlockBuilder::new(16);
        for (key, value) in &entries {
            builder.add(key.as_bytes(), value.as_bytes());
        }
        let data = builder.finish();

        let uncompressed_size: usize = entries
            .iter()
            .map(|(key, value)| key.len() + value.len())
            .sum();
        assert!(data.len() < uncompressed_size + 50);

        let block = Block::new(data).unwrap();
        for (key, value) in &entries {
            assert_eq!(block.get(key.as_bytes()), Some(value.as_bytes().to_vec()));
        }
    }

    /// Varints round-trip at byte-length boundaries and at `u64::MAX`.
    #[test]
    fn test_varint_encoding() {
        for value in [0, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX] {
            let mut encoded = Vec::new();
            encode_varint(&mut encoded, value);

            let decoded = decode_varint(&mut Cursor::new(&encoded)).unwrap();

            assert_eq!(value, decoded, "Failed for value {}", value);
        }
    }

    /// A handle round-trips and reports how many bytes it consumed.
    #[test]
    fn test_block_handle() {
        let original = BlockHandle::new(12345, 67890);
        let bytes = original.encode();

        let (decoded, consumed) = BlockHandle::decode(&bytes).unwrap();
        assert_eq!(original, decoded);
        assert_eq!(consumed, bytes.len());
    }
}