1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
60use std::cmp::Ordering;
61use std::io::{Cursor, Read, Write};
62
/// Restart interval used by `BlockBuilder::default()`: the number of entries
/// written before a new restart point is started.
pub const DEFAULT_RESTART_INTERVAL: usize = 16;

/// Ratio of hash-index buckets to entries.
///
/// `BlockBuilder::build_hash_index` multiplies the entry count by this value,
/// truncates the product to an integer and uses at least one bucket.
pub const DEFAULT_HASH_BUCKET_RATIO: f64 = 0.75;
68
/// One-byte tag attached to a block.
///
/// NOTE(review): the variant names suggest the compression codec applied to
/// the block payload, but nothing in this file compresses or decompresses
/// data — confirm against the code that consumes the tag.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BlockType {
    /// Tag byte `0`.
    Uncompressed = 0,
    /// Tag byte `1`.
    Snappy = 1,
    /// Tag byte `2`.
    Lz4 = 2,
    /// Tag byte `3`.
    Zstd = 3,
}

impl TryFrom<u8> for BlockType {
    type Error = ();

    /// Strict conversion from a raw tag byte.
    ///
    /// Returns `Err(())` for any byte that is not the discriminant of a
    /// variant (i.e. anything above `3`).
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        // Indexed by discriminant, so position `n` holds the variant tagged `n`.
        const BY_TAG: [BlockType; 4] = [
            BlockType::Uncompressed,
            BlockType::Snappy,
            BlockType::Lz4,
            BlockType::Zstd,
        ];
        BY_TAG.get(usize::from(value)).copied().ok_or(())
    }
}

impl BlockType {
    /// Lenient conversion from a raw tag byte.
    ///
    /// Unlike the `TryFrom<u8>` impl this never fails: an unrecognised byte is
    /// mapped to [`BlockType::Uncompressed`].
    pub fn from_u8(value: u8) -> Self {
        match Self::try_from(value) {
            Ok(kind) => kind,
            Err(()) => Self::Uncompressed,
        }
    }
}
103
/// A pair of `u64` values, `offset` and `size`, that is serialized as two
/// varints by `BlockHandle::encode`.
///
/// NOTE(review): presumably the byte position and byte length of a block
/// inside a larger file — confirm against the callers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BlockHandle {
    /// First component of the handle; encoded first.
    pub offset: u64,
    /// Second component of the handle; encoded second.
    pub size: u64,
}
112
113impl BlockHandle {
114 pub fn new(offset: u64, size: u64) -> Self {
115 Self { offset, size }
116 }
117
118 pub fn offset(&self) -> u64 {
120 self.offset
121 }
122
123 pub fn size(&self) -> u64 {
125 self.size
126 }
127
128 pub fn encode(&self) -> Vec<u8> {
130 let mut buf = Vec::with_capacity(20);
131 encode_varint(&mut buf, self.offset);
132 encode_varint(&mut buf, self.size);
133 buf
134 }
135
136 pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
138 let mut cursor = Cursor::new(data);
139 let offset = decode_varint(&mut cursor)?;
140 let size = decode_varint(&mut cursor)?;
141 Some((Self { offset, size }, cursor.position() as usize))
142 }
143}
144
/// Private record wrapping a single `u32` offset.
///
/// Not referenced by any other code visible in this file; `BlockBuilder`
/// keeps its restart offsets as plain `u32` values instead.
#[derive(Clone, Debug)]
struct RestartPoint {
    /// The wrapped offset value.
    offset: u32,
}
151
/// Private record wrapping a single `u8` restart index.
///
/// Not referenced by any other code visible in this file;
/// `BlockBuilder::build_hash_index` stores its buckets as raw `u8` values.
#[derive(Clone, Debug)]
struct HashBucket {
    /// The wrapped restart index.
    restart_index: u8,
}
158
/// Accumulates sorted key/value pairs into a prefix-compressed block.
///
/// Entries are appended with `add` and the encoded bytes are produced by
/// `finish`.
pub struct BlockBuilder {
    /// Encoded entries written so far.
    buffer: Vec<u8>,
    /// Offsets into `buffer` at which a restart point (an entry stored with
    /// no shared prefix) begins. Always starts with `0`.
    restarts: Vec<u32>,
    /// Number of entries added since the most recent restart point.
    entries_since_restart: usize,
    /// Number of entries after which a new restart point is started.
    restart_interval: usize,
    /// Full key of the most recently added entry.
    last_key: Vec<u8>,
    /// Total number of entries added.
    entry_count: usize,
    /// Whether `finish` appends a hash index after the entries.
    use_hash_index: bool,
    /// Copies of every added key; only populated when `use_hash_index` is set.
    keys_for_hash: Vec<Vec<u8>>,
}
182
183impl BlockBuilder {
184 pub fn new(restart_interval: usize) -> Self {
186 Self {
187 buffer: Vec::with_capacity(4096),
188 restarts: vec![0], entries_since_restart: 0,
190 restart_interval,
191 last_key: Vec::new(),
192 entry_count: 0,
193 use_hash_index: false,
194 keys_for_hash: Vec::new(),
195 }
196 }
197
198 pub fn with_hash_index(restart_interval: usize) -> Self {
200 Self {
201 use_hash_index: true,
202 ..Self::new(restart_interval)
203 }
204 }
205
206 pub fn add(&mut self, key: &[u8], value: &[u8]) {
210 debug_assert!(
211 self.entry_count == 0 || key > self.last_key.as_slice(),
212 "Keys must be added in sorted order"
213 );
214
215 let shared = if self.entries_since_restart >= self.restart_interval {
217 self.restarts.push(self.buffer.len() as u32);
219 self.entries_since_restart = 0;
220 0
221 } else {
222 self.shared_prefix_len(&self.last_key, key)
224 };
225
226 let non_shared = key.len() - shared;
227 let value_len = value.len();
228
229 encode_varint(&mut self.buffer, shared as u64);
231 encode_varint(&mut self.buffer, non_shared as u64);
232 encode_varint(&mut self.buffer, value_len as u64);
233 self.buffer.extend_from_slice(&key[shared..]);
234 self.buffer.extend_from_slice(value);
235
236 self.last_key.clear();
238 self.last_key.extend_from_slice(key);
239 self.entries_since_restart += 1;
240 self.entry_count += 1;
241
242 if self.use_hash_index {
244 self.keys_for_hash.push(key.to_vec());
245 }
246 }
247
248 fn shared_prefix_len(&self, a: &[u8], b: &[u8]) -> usize {
250 let mut shared = 0;
251 let min_len = a.len().min(b.len());
252 while shared < min_len && a[shared] == b[shared] {
253 shared += 1;
254 }
255 shared
256 }
257
258 pub fn finish(&mut self) -> Vec<u8> {
262 let mut result = std::mem::take(&mut self.buffer);
263
264 if self.use_hash_index && self.entry_count > 0 {
266 self.build_hash_index(&mut result);
267 }
268
269 for restart in &self.restarts {
271 result.write_u32::<LittleEndian>(*restart).unwrap();
272 }
273 result
274 .write_u32::<LittleEndian>(self.restarts.len() as u32)
275 .unwrap();
276
277 result
278 }
279
280 fn build_hash_index(&self, data: &mut Vec<u8>) {
282 let num_buckets = ((self.entry_count as f64 * DEFAULT_HASH_BUCKET_RATIO) as usize).max(1);
284 let mut buckets = vec![0xFFu8; num_buckets]; for (key_idx, key) in self.keys_for_hash.iter().enumerate() {
288 let restart_idx = key_idx / self.restart_interval;
289 let bucket = Self::hash_key(key) as usize % num_buckets;
290
291 let mut probe = bucket;
293 for _ in 0..num_buckets {
294 if buckets[probe] == 0xFF {
295 buckets[probe] = restart_idx as u8;
296 break;
297 }
298 probe = (probe + 1) % num_buckets;
299 }
300 }
301
302 data.extend_from_slice(&buckets);
304 data.write_u32::<LittleEndian>(num_buckets as u32).unwrap();
305 }
306
307 fn hash_key(key: &[u8]) -> u32 {
309 twox_hash::xxh3::hash64(key) as u32
311 }
312
313 pub fn is_empty(&self) -> bool {
315 self.entry_count == 0
316 }
317
318 pub fn estimated_size(&self) -> usize {
320 self.buffer.len() + self.restarts.len() * 4 + 4
321 }
322
323 pub fn reset(&mut self) {
325 self.buffer.clear();
326 self.restarts.clear();
327 self.restarts.push(0);
328 self.entries_since_restart = 0;
329 self.last_key.clear();
330 self.entry_count = 0;
331 self.keys_for_hash.clear();
332 }
333}
334
335impl Default for BlockBuilder {
336 fn default() -> Self {
337 Self::new(DEFAULT_RESTART_INTERVAL)
338 }
339}
340
/// Read-only view over an encoded block.
pub struct Block {
    /// The complete encoded block, including the trailing restart array and
    /// restart count.
    data: Vec<u8>,
    /// Index in `data` where the restart array begins; entries are read from
    /// `data[..restarts_offset]`.
    restarts_offset: usize,
    /// Number of restart offsets, read from the last four bytes of `data`.
    num_restarts: usize,
    /// Always set to 0 by `Block::new`; not read by any visible code.
    num_hash_buckets: usize,
    /// Set equal to `restarts_offset` by `Block::new`; not read by any
    /// visible code.
    hash_index_offset: usize,
}
358
359impl Block {
360 pub fn new(data: Vec<u8>) -> Option<Self> {
362 if data.len() < 4 {
363 return None;
364 }
365
366 let num_restarts = {
368 let mut cursor = Cursor::new(&data[data.len() - 4..]);
369 cursor.read_u32::<LittleEndian>().ok()? as usize
370 };
371
372 if num_restarts == 0 || data.len() < 4 + num_restarts * 4 {
373 return None;
374 }
375
376 let restarts_offset = data.len() - 4 - num_restarts * 4;
377
378 let num_hash_buckets = 0;
381 let hash_index_offset = restarts_offset;
382
383 Some(Self {
384 data,
385 restarts_offset,
386 num_restarts,
387 num_hash_buckets,
388 hash_index_offset,
389 })
390 }
391
392 fn restart_offset(&self, index: usize) -> u32 {
394 debug_assert!(index < self.num_restarts);
395 let offset = self.restarts_offset + index * 4;
396 let mut cursor = Cursor::new(&self.data[offset..offset + 4]);
397 cursor.read_u32::<LittleEndian>().unwrap()
398 }
399
400 pub fn seek(&self, target: &[u8]) -> BlockIterator<'_> {
404 let mut left = 0;
406 let mut right = self.num_restarts;
407
408 while left < right {
409 let mid = left + (right - left) / 2;
410 let offset = self.restart_offset(mid) as usize;
411
412 let key = self.read_key_at(offset);
414
415 match key.as_slice().cmp(target) {
416 Ordering::Less => left = mid + 1,
417 Ordering::Greater => right = mid,
418 Ordering::Equal => {
419 return BlockIterator::new(self, offset);
420 }
421 }
422 }
423
424 let start_restart = if left > 0 { left - 1 } else { 0 };
427 let start_offset = self.restart_offset(start_restart) as usize;
428
429 let mut iter = BlockIterator::new(self, start_offset);
430 while iter.valid() {
431 if iter.key() >= target {
432 break;
433 }
434 iter.next();
435 }
436 iter
437 }
438
439 pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
443 let mut iter = self.seek(key);
447 if iter.valid() && iter.key() == key {
448 Some(iter.value().to_vec())
449 } else {
450 None
451 }
452 }
453
454 fn read_key_at(&self, offset: usize) -> Vec<u8> {
456 let mut cursor = Cursor::new(&self.data[offset..self.restarts_offset]);
457
458 let shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
459 let non_shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
460 let _value_len = decode_varint(&mut cursor);
461
462 debug_assert_eq!(shared, 0, "Expected restart point (shared = 0)");
463
464 let pos = cursor.position() as usize;
465 self.data[offset + pos..offset + pos + non_shared].to_vec()
466 }
467
468 pub fn iter(&self) -> BlockIterator<'_> {
470 BlockIterator::new(self, 0)
471 }
472
473 pub fn data(&self) -> &[u8] {
475 &self.data
476 }
477}
478
479pub struct BlockIterator<'a> {
485 block: &'a Block,
486 offset: usize,
488 key: Vec<u8>,
490 value_start: usize,
492 value_len: usize,
493 valid: bool,
495}
496
497impl<'a> BlockIterator<'a> {
498 pub fn new(block: &'a Block, offset: usize) -> Self {
500 let mut iter = Self {
501 block,
502 offset,
503 key: Vec::new(),
504 value_start: 0,
505 value_len: 0,
506 valid: false,
507 };
508 iter.parse_entry();
509 iter
510 }
511
512 pub fn valid(&self) -> bool {
514 self.valid
515 }
516
517 pub fn key(&self) -> &[u8] {
519 &self.key
520 }
521
522 pub fn value(&self) -> &[u8] {
524 &self.block.data[self.value_start..self.value_start + self.value_len]
525 }
526
527 pub fn next(&mut self) {
529 if !self.valid {
530 return;
531 }
532
533 self.offset = self.value_start + self.value_len;
535 self.parse_entry();
536 }
537
538 fn parse_entry(&mut self) {
540 if self.offset >= self.block.restarts_offset {
541 self.valid = false;
542 return;
543 }
544
545 let mut cursor = Cursor::new(&self.block.data[self.offset..self.block.restarts_offset]);
546
547 let shared = match decode_varint(&mut cursor) {
549 Some(v) => v as usize,
550 None => {
551 self.valid = false;
552 return;
553 }
554 };
555 let non_shared = match decode_varint(&mut cursor) {
556 Some(v) => v as usize,
557 None => {
558 self.valid = false;
559 return;
560 }
561 };
562 let value_len = match decode_varint(&mut cursor) {
563 Some(v) => v as usize,
564 None => {
565 self.valid = false;
566 return;
567 }
568 };
569
570 let header_len = cursor.position() as usize;
571 let data_start = self.offset + header_len;
572
573 if data_start + non_shared + value_len > self.block.restarts_offset {
575 self.valid = false;
576 return;
577 }
578
579 self.key.truncate(shared);
581 self.key
582 .extend_from_slice(&self.block.data[data_start..data_start + non_shared]);
583
584 self.value_start = data_start + non_shared;
586 self.value_len = value_len;
587 self.valid = true;
588 }
589
590 pub fn seek(&mut self, target: &[u8]) {
592 let new_iter = self.block.seek(target);
594 self.offset = new_iter.offset;
595 self.key = new_iter.key;
596 self.value_start = new_iter.value_start;
597 self.value_len = new_iter.value_len;
598 self.valid = new_iter.valid;
599 }
600
601 pub fn seek_to_first(&mut self) {
603 self.offset = 0;
604 self.key.clear();
605 self.parse_entry();
606 }
607}
608
/// Appends `value` to `buf` as a little-endian base-128 varint.
///
/// Each output byte carries seven bits of the value, least significant group
/// first; the high bit is set on every byte except the last.
fn encode_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let low_bits = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // Final group: continuation bit stays clear.
            buf.push(low_bits);
            return;
        }
        buf.push(low_bits | 0x80);
    }
}
621
/// Reads one little-endian base-128 varint from `reader`.
///
/// Each byte contributes its low seven bits, least significant group first;
/// a clear high bit marks the final byte.
///
/// Returns `None` if the reader runs out of bytes before the final byte, if
/// the encoding continues past ten bytes, or if the payload does not fit in
/// 64 bits (a tenth byte carrying anything above the single remaining bit).
/// Exactly the bytes of the varint are consumed on success.
fn decode_varint<R: Read>(reader: &mut R) -> Option<u64> {
    let mut result: u64 = 0;
    let mut shift: u32 = 0;

    loop {
        let mut byte = [0u8; 1];
        reader.read_exact(&mut byte).ok()?;
        let byte = byte[0];

        let payload = u64::from(byte & 0x7F);
        // Reject payload bits that would be shifted out of the u64 instead of
        // silently dropping them. Only the tenth byte (shift 63) can trip
        // this: it has room for a single bit.
        if payload > (u64::MAX >> shift) {
            return None;
        }
        result |= payload << shift;

        if byte & 0x80 == 0 {
            return Some(result);
        }

        shift += 7;
        if shift >= 64 {
            // An eleventh byte can never be valid for a 64-bit value.
            return None;
        }
    }
}
641
#[cfg(test)]
mod tests {
    use super::*;

    /// Finishes `builder` and parses the produced bytes back into a `Block`.
    fn finish_into_block(builder: &mut BlockBuilder) -> Block {
        Block::new(builder.finish()).unwrap()
    }

    /// A single entry can be looked up; a missing key yields `None`.
    #[test]
    fn test_block_builder_single_entry() {
        let mut builder = BlockBuilder::new(16);
        builder.add(b"key1", b"value1");

        let block = finish_into_block(&mut builder);

        assert_eq!(block.get(b"key1"), Some(b"value1".to_vec()));
        assert_eq!(block.get(b"key2"), None);
    }

    /// Twenty entries spread over several restart runs are all retrievable.
    #[test]
    fn test_block_builder_multiple_entries() {
        let pairs: Vec<(String, String)> = (0..20)
            .map(|i| (format!("key{:02}", i), format!("value{:02}", i)))
            .collect();

        let mut builder = BlockBuilder::new(4);
        for (key, value) in &pairs {
            builder.add(key.as_bytes(), value.as_bytes());
        }

        let block = finish_into_block(&mut builder);

        for (key, value) in &pairs {
            assert_eq!(block.get(key.as_bytes()), Some(value.clone().into_bytes()));
        }
    }

    /// Iterating from the start visits every entry exactly once.
    #[test]
    fn test_block_iterator() {
        let mut builder = BlockBuilder::new(4);
        builder.add(b"apple", b"1");
        builder.add(b"banana", b"2");
        builder.add(b"cherry", b"3");
        builder.add(b"date", b"4");

        let block = finish_into_block(&mut builder);

        let mut count = 0;
        let mut cursor = block.iter();
        loop {
            if !cursor.valid() {
                break;
            }
            count += 1;
            cursor.next();
        }
        assert_eq!(count, 4);
    }

    /// `seek` lands on an exact match, on the next larger key, or past the end.
    #[test]
    fn test_block_seek() {
        let mut builder = BlockBuilder::new(2);
        builder.add(b"a", b"1");
        builder.add(b"c", b"2");
        builder.add(b"e", b"3");
        builder.add(b"g", b"4");

        let block = finish_into_block(&mut builder);

        // Exact match.
        let exact = block.seek(b"c");
        assert!(exact.valid());
        assert_eq!(exact.key(), b"c");

        // Between two keys: lands on the next larger one.
        let between = block.seek(b"d");
        assert!(between.valid());
        assert_eq!(between.key(), b"e");

        // Beyond the last key.
        let past_end = block.seek(b"z");
        assert!(!past_end.valid());
    }

    /// Keys sharing long prefixes stay retrievable and the block stays small.
    #[test]
    fn test_prefix_compression() {
        let entries = [
            ("user:1000:age", "30"),
            ("user:1000:email", "alice@example.com"),
            ("user:1000:name", "Alice"),
            ("user:1001:name", "Bob"),
        ];

        let mut builder = BlockBuilder::new(16);
        for (key, value) in entries.iter() {
            builder.add(key.as_bytes(), value.as_bytes());
        }

        let data = builder.finish();

        let uncompressed_size: usize = entries
            .iter()
            .map(|(key, value)| key.len() + value.len())
            .sum();

        assert!(data.len() < uncompressed_size + 50);

        let block = Block::new(data).unwrap();
        for (key, value) in entries.iter() {
            assert_eq!(block.get(key.as_bytes()), Some(value.as_bytes().to_vec()));
        }
    }

    /// Varints round-trip across byte-length boundaries and at `u64::MAX`.
    #[test]
    fn test_varint_encoding() {
        let test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for value in test_values.iter().copied() {
            let mut buf = Vec::new();
            encode_varint(&mut buf, value);

            let decoded = decode_varint(&mut Cursor::new(&buf)).unwrap();

            assert_eq!(value, decoded, "Failed for value {}", value);
        }
    }

    /// A handle round-trips through `encode` / `decode`, consuming every byte.
    #[test]
    fn test_block_handle() {
        let handle = BlockHandle::new(12345, 67890);
        let encoded = handle.encode();

        let (decoded, len) = BlockHandle::decode(&encoded).unwrap();
        assert_eq!(handle, decoded);
        assert_eq!(len, encoded.len());
    }
}