1use crate::{
43 error::Error,
44 storage::sstable::bti::{
45 encoder::ByteComparableEncoder,
46 node::{
47 BtiNode, BtiNodeData, BtiNodeType, BtiResult, PayloadRef, SizedPointer, Transition,
48 TrieNavigator,
49 },
50 },
51 types::Value,
52};
53use std::collections::HashMap;
54use std::io::{Read, Seek, SeekFrom};
55
56fn classify_node_nibble(nibble: u8) -> BtiResult<BtiNodeType> {
69 match nibble {
70 0 => Ok(BtiNodeType::PayloadOnly),
71 1..=4 => Ok(BtiNodeType::Single),
72 5..=9 => Ok(BtiNodeType::Sparse),
73 10..=15 => Ok(BtiNodeType::Dense),
74 other => Err(Error::Parse(format!(
75 "Invalid BTI node type nibble: {}",
76 other
77 ))),
78 }
79}
80
/// Returns the number of whole bytes used per child pointer for a node-type
/// ordinal.
///
/// A result of 0 means the ordinal has no byte-aligned pointer: the node either
/// has no child, keeps pointer bits in the header byte, or uses the packed
/// 12-bit layout. Ordinals above 15 also yield 0.
fn pointer_bytes_for_ordinal(ordinal: u8) -> u8 {
    // Indexed by ordinal 0..=15.
    const POINTER_WIDTHS: [u8; 16] = [0, 0, 1, 0, 2, 1, 0, 2, 3, 5, 0, 2, 3, 4, 5, 8];
    POINTER_WIDTHS
        .get(usize::from(ordinal))
        .copied()
        .unwrap_or(0)
}
108
109fn parse_bti_node(data: &[u8], offset: u64) -> BtiResult<BtiNode> {
126 if data.is_empty() {
127 return Err(Error::Parse("Empty BTI node data".to_string()));
128 }
129
130 let header_byte = data[0];
131 let ordinal = (header_byte >> 4) & 0x0F;
132 let payload_flags = header_byte & 0x0F;
133 let has_payload = payload_flags != 0;
134 let node_type = classify_node_nibble(ordinal)?;
135
136 match node_type {
137 BtiNodeType::PayloadOnly => {
138 if !has_payload {
141 return Err(Error::Parse(
142 "PayloadOnly node has no payload flags set".to_string(),
143 ));
144 }
145 let payload = parse_payload_ref(&data[1..])?;
146 Ok(BtiNode {
147 node_type,
148 level: 0,
149 key_prefix: Vec::new(),
150 data: BtiNodeData::PayloadOnly { payload },
151 })
152 }
153
154 BtiNodeType::Single => {
155 match ordinal {
174 1 => {
175 if data.len() < 2 {
177 return Err(Error::Parse(
178 "SingleNoPayload4 node data too short".to_string(),
179 ));
180 }
181 let delta = (header_byte & 0x0F) as u64;
182 let transition_byte = data[1];
183 let child_offset = offset.saturating_sub(delta);
184 Ok(BtiNode {
185 node_type,
186 level: 1,
187 key_prefix: Vec::new(),
188 data: BtiNodeData::Single {
189 transition: Transition::new(
190 transition_byte,
191 SizedPointer::new(child_offset),
192 ),
193 },
194 })
195 }
196 3 => {
197 if data.len() < 3 {
199 return Err(Error::Parse(
200 "SingleNoPayload12 node data too short".to_string(),
201 ));
202 }
203 let delta = (((header_byte & 0x0F) as u64) << 8) | (data[1] as u64);
204 let transition_byte = data[2];
205 let child_offset = offset.saturating_sub(delta);
206 Ok(BtiNode {
207 node_type,
208 level: 1,
209 key_prefix: Vec::new(),
210 data: BtiNodeData::Single {
211 transition: Transition::new(
212 transition_byte,
213 SizedPointer::new(child_offset),
214 ),
215 },
216 })
217 }
218 _ => {
219 let ptr_bytes = pointer_bytes_for_ordinal(ordinal) as usize;
221 let needed = 2 + ptr_bytes;
222 if data.len() < needed {
223 return Err(Error::Parse(format!(
224 "Single node (ordinal {}) data too short: need {} bytes, have {}",
225 ordinal,
226 needed,
227 data.len()
228 )));
229 }
230 let transition_byte = data[1];
231 let delta = read_be_unsigned(&data[2..2 + ptr_bytes]);
232 let child_offset = offset.saturating_sub(delta);
233 let transition =
234 Transition::new(transition_byte, SizedPointer::new(child_offset));
235 Ok(BtiNode {
236 node_type,
237 level: 1,
238 key_prefix: Vec::new(),
239 data: BtiNodeData::Single { transition },
240 })
241 }
242 }
243 }
244
245 BtiNodeType::Sparse => {
246 if data.len() < 2 {
257 return Err(Error::Parse("Sparse node data too short".to_string()));
258 }
259 let count = data[1] as usize;
260 if count == 0 {
261 return Err(Error::Parse(
262 "Sparse node must have at least one transition".to_string(),
263 ));
264 }
265
266 let bytes_start = 2;
267 let pointers_start = bytes_start + count;
268
269 if ordinal == 6 {
270 let packed_len = (count * 3).div_ceil(2); let needed = pointers_start + packed_len;
273 if data.len() < needed {
274 return Err(Error::Parse(format!(
275 "Sparse12 node data too short: need {}, have {}",
276 needed,
277 data.len()
278 )));
279 }
280 let mut transitions = Vec::with_capacity(count);
281 for i in 0..count {
282 let t_byte = data[bytes_start + i];
283 let delta = read_12bit_packed(&data[pointers_start..], i);
284 let child_offset = offset.saturating_sub(delta);
285 transitions.push(Transition::new(t_byte, SizedPointer::new(child_offset)));
286 }
287 Ok(BtiNode {
288 node_type,
289 level: 1,
290 key_prefix: Vec::new(),
291 data: BtiNodeData::Sparse { transitions },
292 })
293 } else {
294 let ptr_bytes = pointer_bytes_for_ordinal(ordinal) as usize;
295 let needed = pointers_start + count * ptr_bytes;
296 if data.len() < needed {
297 return Err(Error::Parse(format!(
298 "Sparse node (ordinal {}) data too short: need {}, have {}",
299 ordinal,
300 needed,
301 data.len()
302 )));
303 }
304 let mut transitions = Vec::with_capacity(count);
305 for i in 0..count {
306 let t_byte = data[bytes_start + i];
307 let ptr_off = pointers_start + i * ptr_bytes;
308 let delta = read_be_unsigned(&data[ptr_off..ptr_off + ptr_bytes]);
309 let child_offset = offset.saturating_sub(delta);
310 transitions.push(Transition::new(t_byte, SizedPointer::new(child_offset)));
311 }
312 Ok(BtiNode {
313 node_type,
314 level: 1,
315 key_prefix: Vec::new(),
316 data: BtiNodeData::Sparse { transitions },
317 })
318 }
319 }
320
321 BtiNodeType::Dense => {
322 if data.len() < 3 {
332 return Err(Error::Parse("Dense node data too short".to_string()));
333 }
334 let start_byte = data[1];
335 let range_len = data[2] as usize + 1; if ordinal == 10 {
338 let packed_len = (range_len * 3).div_ceil(2);
340 let needed = 3 + packed_len;
341 if data.len() < needed {
342 return Err(Error::Parse(format!(
343 "Dense12 node data too short: need {}, have {}",
344 needed,
345 data.len()
346 )));
347 }
348 let mut children = Vec::with_capacity(range_len);
349 for i in 0..range_len {
350 let delta = read_12bit_packed(&data[3..], i);
351 let child_offset = if delta == 0 {
353 0
354 } else {
355 offset.saturating_sub(delta)
356 };
357 children.push(SizedPointer::new(child_offset));
358 }
359 Ok(BtiNode {
360 node_type,
361 level: 1,
362 key_prefix: Vec::new(),
363 data: BtiNodeData::Dense {
364 start_byte,
365 children,
366 },
367 })
368 } else {
369 let ptr_bytes = pointer_bytes_for_ordinal(ordinal) as usize;
370 let needed = 3 + range_len * ptr_bytes;
371 if data.len() < needed {
372 return Err(Error::Parse(format!(
373 "Dense node (ordinal {}) data too short: need {}, have {}",
374 ordinal,
375 needed,
376 data.len()
377 )));
378 }
379 let mut children = Vec::with_capacity(range_len);
380 for i in 0..range_len {
381 let ptr_off = 3 + i * ptr_bytes;
382 let delta = read_be_unsigned(&data[ptr_off..ptr_off + ptr_bytes]);
383 let child_offset = if delta == 0 {
385 0
386 } else {
387 offset.saturating_sub(delta)
388 };
389 children.push(SizedPointer::new(child_offset));
390 }
391 Ok(BtiNode {
392 node_type,
393 level: 1,
394 key_prefix: Vec::new(),
395 data: BtiNodeData::Dense {
396 start_byte,
397 children,
398 },
399 })
400 }
401 }
402 }
403}
404
/// Interprets `data` as a big-endian unsigned integer.
///
/// An empty slice yields 0. If more than eight bytes are supplied, the leading
/// bytes are shifted out and only the last eight contribute.
fn read_be_unsigned(data: &[u8]) -> u64 {
    data.iter()
        .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte))
}
416
/// Extracts the `index`-th 12-bit value from a packed table in which two values
/// share three bytes.
///
/// Even indices occupy the upper 12 bits of the two-byte window starting at
/// `3 * index / 2`; odd indices occupy the lower 12 bits. Returns 0 when the
/// two-byte window does not fit inside `data`.
fn read_12bit_packed(data: &[u8], index: usize) -> u64 {
    let start = index * 3 / 2;
    let Some(window) = data.get(start..start + 2) else {
        return 0;
    };
    let word = u16::from_be_bytes([window[0], window[1]]);
    let twelve_bits = if index % 2 == 0 {
        word >> 4
    } else {
        word & 0x0FFF
    };
    u64::from(twelve_bits)
}
438
439fn parse_payload_ref(data: &[u8]) -> BtiResult<PayloadRef> {
445 if data.len() < 12 {
446 return Err(Error::Parse(format!(
447 "PayloadRef data too short: need 12 bytes, have {}",
448 data.len()
449 )));
450 }
451 let offset = u64::from_be_bytes([
452 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
453 ]);
454 let length = u32::from_be_bytes([data[8], data[9], data[10], data[11]]);
455 Ok(PayloadRef::new(offset, length))
456}
457
/// Fixed-size header found at the start of a BTI index file.
///
/// All fields are stored big-endian in declaration order. `PartialEq`/`Eq` are
/// derived so headers can be compared directly, for example in round-trip
/// checks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BtiHeader {
    /// Format magic number; must equal `BtiHeader::MAGIC` to parse.
    pub magic: u32,
    /// Format version; must equal `BtiHeader::VERSION` to parse.
    pub version: u16,
    /// Header flag bits (not interpreted by the parser in this file).
    pub flags: u16,
    /// File offset of the trie root node, where lookups start.
    pub root_offset: u64,
    /// Entry count as recorded in the header.
    pub entry_count: u64,
    /// Optional trailing field; 0 when absent from the serialized header.
    pub metadata_size: u32,
}
474
475impl BtiHeader {
476 pub const MAGIC: u32 = 0x6461_0000; pub const VERSION: u16 = 0x0001;
481
482 pub fn parse(data: &[u8]) -> BtiResult<(Self, usize)> {
484 if data.len() < 24 {
485 return Err(Error::Parse("BTI header too short".to_string()));
486 }
487
488 let magic = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
489 if magic != Self::MAGIC {
490 return Err(Error::Parse(format!(
491 "Invalid BTI magic: 0x{:08x}, expected 0x{:08x}",
492 magic,
493 Self::MAGIC
494 )));
495 }
496
497 let version = u16::from_be_bytes([data[4], data[5]]);
498 if version != Self::VERSION {
499 return Err(Error::Parse(format!(
500 "Unsupported BTI version: 0x{:04x}, expected 0x{:04x}",
501 version,
502 Self::VERSION
503 )));
504 }
505
506 let flags = u16::from_be_bytes([data[6], data[7]]);
507 let root_offset = u64::from_be_bytes([
508 data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
509 ]);
510 let entry_count = u64::from_be_bytes([
511 data[16], data[17], data[18], data[19], data[20], data[21], data[22], data[23],
512 ]);
513
514 let metadata_size = if data.len() >= 28 {
515 u32::from_be_bytes([data[24], data[25], data[26], data[27]])
516 } else {
517 0
518 };
519
520 let header = BtiHeader {
521 magic,
522 version,
523 flags,
524 root_offset,
525 entry_count,
526 metadata_size,
527 };
528
529 let header_size = if metadata_size > 0 { 28 } else { 24 };
530 Ok((header, header_size))
531 }
532
533 pub fn to_bytes(&self) -> Vec<u8> {
535 let mut bytes = Vec::with_capacity(28);
536
537 bytes.extend_from_slice(&self.magic.to_be_bytes());
538 bytes.extend_from_slice(&self.version.to_be_bytes());
539 bytes.extend_from_slice(&self.flags.to_be_bytes());
540 bytes.extend_from_slice(&self.root_offset.to_be_bytes());
541 bytes.extend_from_slice(&self.entry_count.to_be_bytes());
542
543 if self.metadata_size > 0 {
544 bytes.extend_from_slice(&self.metadata_size.to_be_bytes());
545 }
546
547 bytes
548 }
549}
550
551pub struct PartitionsParser<R: Read + Seek> {
553 reader: R,
555 header: BtiHeader,
557 encoder: ByteComparableEncoder,
559 node_cache: HashMap<u64, BtiNode>,
561}
562
563impl<R: Read + Seek> PartitionsParser<R> {
564 pub fn new(mut reader: R) -> BtiResult<Self> {
566 reader.seek(SeekFrom::Start(0))?;
568 let mut header_data = vec![0u8; 28];
569 reader.read_exact(&mut header_data)?;
570
571 let (header, _) = BtiHeader::parse(&header_data)?;
572
573 Ok(Self {
574 reader,
575 header,
576 encoder: ByteComparableEncoder::new(),
577 node_cache: HashMap::new(),
578 })
579 }
580
581 pub fn lookup_partition(&mut self, partition_key: &[Value]) -> BtiResult<Option<PayloadRef>> {
583 let encoded_key = self.encoder.encode_composite_key(partition_key)?;
585
586 let mut navigator = TrieNavigator::new(self.header.root_offset);
588
589 self.lookup_in_trie(&mut navigator, &encoded_key)
590 }
591
592 fn lookup_in_trie(
594 &mut self,
595 navigator: &mut TrieNavigator,
596 encoded_key: &[u8],
597 ) -> BtiResult<Option<PayloadRef>> {
598 let mut key_pos = 0;
599
600 loop {
601 let current_node = self.load_node(navigator.current_offset)?;
603
604 if current_node.is_leaf() {
606 return Ok(current_node.get_payload().cloned());
607 }
608
609 if let Some(payload) = current_node.get_payload() {
611 if key_pos >= encoded_key.len() {
612 return Ok(Some(payload.clone()));
613 }
614 }
615
616 if key_pos >= encoded_key.len() {
618 return Ok(current_node.get_payload().cloned());
619 }
620
621 let next_byte = encoded_key[key_pos];
623 if let Some(child_pointer) = current_node.find_child(next_byte) {
624 navigator.navigate_to_child(next_byte, child_pointer)?;
625 key_pos += 1;
626 } else {
627 return Ok(None);
629 }
630 }
631 }
632
633 fn load_node(&mut self, offset: u64) -> BtiResult<BtiNode> {
635 if let Some(cached_node) = self.node_cache.get(&offset) {
636 return Ok(cached_node.clone());
637 }
638
639 self.reader.seek(SeekFrom::Start(offset))?;
641 let mut node_data = vec![0u8; 4096]; let bytes_read = self.reader.read(&mut node_data)?;
643 node_data.truncate(bytes_read);
644
645 let node = self.parse_node_data(&node_data, offset)?;
647
648 self.node_cache.insert(offset, node.clone());
650 Ok(node)
651 }
652
653 fn parse_node_data(&self, data: &[u8], offset: u64) -> BtiResult<BtiNode> {
658 parse_bti_node(data, offset)
659 }
660
661 pub fn iterate_partitions(&mut self) -> BtiResult<PartitionIterator<'_, R>> {
663 PartitionIterator::new(self)
664 }
665
666 pub fn header(&self) -> &BtiHeader {
668 &self.header
669 }
670
671 pub fn get_stats(&self) -> BtiIndexStats {
673 BtiIndexStats {
674 entry_count: self.header.entry_count,
675 root_offset: self.header.root_offset,
676 cached_nodes: self.node_cache.len(),
677 }
678 }
679}
680
681pub struct RowsParser<R: Read + Seek> {
683 reader: R,
685 header: BtiHeader,
687 encoder: ByteComparableEncoder,
689 node_cache: HashMap<u64, BtiNode>,
691}
692
693impl<R: Read + Seek> RowsParser<R> {
694 pub fn new(mut reader: R) -> BtiResult<Self> {
696 reader.seek(SeekFrom::Start(0))?;
698 let mut header_data = vec![0u8; 28];
699 reader.read_exact(&mut header_data)?;
700
701 let (header, _) = BtiHeader::parse(&header_data)?;
702
703 Ok(Self {
704 reader,
705 header,
706 encoder: ByteComparableEncoder::new(),
707 node_cache: HashMap::new(),
708 })
709 }
710
711 pub fn lookup_row(&mut self, clustering_key: &[Value]) -> BtiResult<Option<PayloadRef>> {
713 let encoded_key = self.encoder.encode_composite_key(clustering_key)?;
715
716 let mut navigator = TrieNavigator::new(self.header.root_offset);
718
719 self.lookup_in_trie(&mut navigator, &encoded_key)
720 }
721
722 fn lookup_in_trie(
724 &mut self,
725 navigator: &mut TrieNavigator,
726 encoded_key: &[u8],
727 ) -> BtiResult<Option<PayloadRef>> {
728 let mut key_pos = 0;
729
730 loop {
731 let current_node = self.load_node(navigator.current_offset)?;
733
734 if let Some(payload) = current_node.get_payload() {
736 if key_pos >= encoded_key.len() {
737 return Ok(Some(payload.clone()));
738 }
739 }
740
741 if key_pos >= encoded_key.len() {
743 return Ok(current_node.get_payload().cloned());
744 }
745
746 let next_byte = encoded_key[key_pos];
748 if let Some(child_pointer) = current_node.find_child(next_byte) {
749 navigator.navigate_to_child(next_byte, child_pointer)?;
750 key_pos += 1;
751 } else {
752 return Ok(None);
754 }
755 }
756 }
757
758 fn load_node(&mut self, offset: u64) -> BtiResult<BtiNode> {
760 if let Some(cached_node) = self.node_cache.get(&offset) {
761 return Ok(cached_node.clone());
762 }
763
764 self.reader.seek(SeekFrom::Start(offset))?;
766 let mut node_data = vec![0u8; 4096]; let bytes_read = self.reader.read(&mut node_data)?;
768 node_data.truncate(bytes_read);
769
770 let node = self.parse_node_data(&node_data, offset)?;
772
773 self.node_cache.insert(offset, node.clone());
775 Ok(node)
776 }
777
778 fn parse_node_data(&self, data: &[u8], offset: u64) -> BtiResult<BtiNode> {
786 parse_bti_node(data, offset)
787 }
788
789 pub fn range_query(
791 &mut self,
792 start_key: &[Value],
793 end_key: &[Value],
794 ) -> BtiResult<Vec<PayloadRef>> {
795 let _encoded_start = self.encoder.encode_composite_key(start_key)?;
796 let _encoded_end = self.encoder.encode_composite_key(end_key)?;
797
798 let results = Vec::new();
799
800 let _navigator = TrieNavigator::new(self.header.root_offset);
802
803 Ok(results)
807 }
808
809 pub fn iterate_rows(&mut self) -> BtiResult<RowIterator<'_, R>> {
811 RowIterator::new(self)
812 }
813
814 pub fn header(&self) -> &BtiHeader {
816 &self.header
817 }
818}
819
820pub struct PartitionIterator<'a, R: Read + Seek> {
822 #[allow(dead_code)]
823 parser: &'a mut PartitionsParser<R>,
824 #[allow(dead_code)]
825 current_position: u64,
826 finished: bool,
827}
828
829impl<'a, R: Read + Seek> PartitionIterator<'a, R> {
830 fn new(parser: &'a mut PartitionsParser<R>) -> BtiResult<Self> {
831 let root_offset = parser.header.root_offset;
832 Ok(Self {
833 parser,
834 current_position: root_offset,
835 finished: false,
836 })
837 }
838}
839
840impl<'a, R: Read + Seek> Iterator for PartitionIterator<'a, R> {
841 type Item = BtiResult<(Vec<u8>, PayloadRef)>;
842
843 fn next(&mut self) -> Option<Self::Item> {
844 if self.finished {
845 return None;
846 }
847
848 self.finished = true;
851 None
852 }
853}
854
855pub struct RowIterator<'a, R: Read + Seek> {
857 #[allow(dead_code)]
858 parser: &'a mut RowsParser<R>,
859 #[allow(dead_code)]
860 current_position: u64,
861 finished: bool,
862}
863
864impl<'a, R: Read + Seek> RowIterator<'a, R> {
865 fn new(parser: &'a mut RowsParser<R>) -> BtiResult<Self> {
866 let root_offset = parser.header.root_offset;
867 Ok(Self {
868 parser,
869 current_position: root_offset,
870 finished: false,
871 })
872 }
873}
874
875impl<'a, R: Read + Seek> Iterator for RowIterator<'a, R> {
876 type Item = BtiResult<(Vec<u8>, PayloadRef)>;
877
878 fn next(&mut self) -> Option<Self::Item> {
879 if self.finished {
880 return None;
881 }
882
883 self.finished = true;
886 None
887 }
888}
889
/// Point-in-time statistics reported by `PartitionsParser::get_stats`.
///
/// `PartialEq`/`Eq` are derived so two snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BtiIndexStats {
    /// Entry count copied from the index header.
    pub entry_count: u64,
    /// Root node offset copied from the index header.
    pub root_offset: u64,
    /// Number of decoded nodes held in the parser's cache.
    pub cached_nodes: usize,
}
900
901#[cfg(test)]
902mod tests {
903 use super::*;
904 use std::io::Cursor;
905
906 fn make_bti_file(root_node_bytes: Vec<u8>) -> Vec<u8> {
911 let root_offset: u64 = 64; let mut data = Vec::new();
913 data.extend_from_slice(&BtiHeader::MAGIC.to_be_bytes());
914 data.extend_from_slice(&BtiHeader::VERSION.to_be_bytes());
915 data.extend_from_slice(&0u16.to_be_bytes()); data.extend_from_slice(&root_offset.to_be_bytes());
917 data.extend_from_slice(&1u64.to_be_bytes()); data.extend_from_slice(&0u32.to_be_bytes()); while data.len() < root_offset as usize {
920 data.push(0);
921 }
922 data.extend(root_node_bytes);
923 data
924 }
925
926 fn payload_only_node(data_offset: u64, length: u32) -> Vec<u8> {
933 let mut v = vec![0x01u8]; v.extend_from_slice(&data_offset.to_be_bytes());
935 v.extend_from_slice(&length.to_be_bytes());
936 v
937 }
938
939 fn single8_node(payload_flags: u8, transition: u8, delta: u8) -> Vec<u8> {
941 vec![0x20 | (payload_flags & 0x0F), transition, delta]
942 }
943
944 fn single_nopayload4_node(delta4: u8, transition: u8) -> Vec<u8> {
947 vec![0x10 | (delta4 & 0x0F), transition]
948 }
949
950 fn single_nopayload12_node(delta: u16, transition: u8) -> Vec<u8> {
953 vec![
954 0x30 | ((delta >> 8) as u8 & 0x0F),
955 (delta & 0xFF) as u8,
956 transition,
957 ]
958 }
959
960 fn single16_node(payload_flags: u8, transition: u8, delta: u16) -> Vec<u8> {
962 let mut v = vec![0x40 | (payload_flags & 0x0F), transition];
963 v.extend_from_slice(&delta.to_be_bytes());
964 v
965 }
966
967 fn sparse8_node(payload_flags: u8, pairs: &[(u8, u8)]) -> Vec<u8> {
969 let mut v = vec![0x50 | (payload_flags & 0x0F), pairs.len() as u8];
970 for &(t, _) in pairs {
971 v.push(t);
972 }
973 for &(_, d) in pairs {
974 v.push(d);
975 }
976 v
977 }
978
979 fn dense16_node(payload_flags: u8, start: u8, deltas: &[u16]) -> Vec<u8> {
981 let len = deltas.len() as u8;
982 let mut v = vec![0xB0 | (payload_flags & 0x0F), start, len - 1];
983 for &d in deltas {
984 v.extend_from_slice(&d.to_be_bytes());
985 }
986 v
987 }
988
989 fn long_dense_node(payload_flags: u8, start: u8, deltas: &[u64]) -> Vec<u8> {
991 let len = deltas.len() as u8;
992 let mut v = vec![0xF0 | (payload_flags & 0x0F), start, len - 1];
993 for &d in deltas {
994 v.extend_from_slice(&d.to_be_bytes());
995 }
996 v
997 }
998
999 fn sparse12_node(payload_flags: u8, pairs: &[(u8, u16)]) -> Vec<u8> {
1011 let count = pairs.len();
1012 let mut v = vec![0x60 | (payload_flags & 0x0F), count as u8];
1013 for &(t, _) in pairs {
1014 v.push(t);
1015 }
1016 let mut i = 0;
1018 while i + 2 <= count {
1019 let p0 = pairs[i].1 as u32;
1020 let p1 = pairs[i + 1].1 as u32;
1021 v.push((p0 >> 4) as u8);
1022 v.push(((p0 << 4) | (p1 >> 8)) as u8);
1023 v.push((p1 & 0xFF) as u8);
1024 i += 2;
1025 }
1026 if i < count {
1027 let pd = pairs[i].1 as u32;
1029 let s = (pd << 4) as u16;
1030 v.extend_from_slice(&s.to_be_bytes());
1031 }
1032 v
1033 }
1034
1035 fn sparse24_node(payload_flags: u8, pairs: &[(u8, u32)]) -> Vec<u8> {
1037 let count = pairs.len();
1038 let mut v = vec![0x80 | (payload_flags & 0x0F), count as u8];
1039 for &(t, _) in pairs {
1040 v.push(t);
1041 }
1042 for &(_, d) in pairs {
1043 v.push(((d >> 16) & 0xFF) as u8);
1045 v.push(((d >> 8) & 0xFF) as u8);
1046 v.push((d & 0xFF) as u8);
1047 }
1048 v
1049 }
1050
1051 fn sparse40_node(payload_flags: u8, pairs: &[(u8, u64)]) -> Vec<u8> {
1053 let count = pairs.len();
1054 let mut v = vec![0x90 | (payload_flags & 0x0F), count as u8];
1055 for &(t, _) in pairs {
1056 v.push(t);
1057 }
1058 for &(_, d) in pairs {
1059 v.push(((d >> 32) & 0xFF) as u8);
1061 v.push(((d >> 24) & 0xFF) as u8);
1062 v.push(((d >> 16) & 0xFF) as u8);
1063 v.push(((d >> 8) & 0xFF) as u8);
1064 v.push((d & 0xFF) as u8);
1065 }
1066 v
1067 }
1068
1069 fn dense12_node(payload_flags: u8, start: u8, deltas: &[u16]) -> Vec<u8> {
1076 let range_len = deltas.len();
1077 let mut v = vec![0xA0 | (payload_flags & 0x0F), start, (range_len - 1) as u8];
1078 let mut carry: u8 = 0;
1079 for (i, &d) in deltas.iter().enumerate() {
1080 let val = d as u32;
1081 if (i & 1) == 0 {
1082 v.push((val >> 4) as u8);
1083 carry = (val << 4) as u8;
1084 } else {
1085 v.push(carry | (val >> 8) as u8);
1086 v.push((val & 0xFF) as u8);
1087 carry = 0;
1088 }
1089 }
1090 if (range_len & 1) == 1 {
1092 v.push(carry);
1093 }
1094 v
1095 }
1096
1097 #[test]
1107 fn regression_rows_parser_single_node_not_mislabeled_as_payload_only() {
1108 let node_bytes = single8_node(0, b'a', 5);
1117 let offset: u64 = 100;
1118
1119 let node = parse_bti_node(&node_bytes, offset)
1120 .expect("parse_bti_node must succeed for a valid Single8 node");
1121
1122 assert_eq!(
1124 node.node_type,
1125 BtiNodeType::Single,
1126 "Single8 node (nibble 0x2) was mislabeled as {:?} — regression from #647 stub",
1127 node.node_type,
1128 );
1129
1130 match &node.data {
1132 BtiNodeData::Single { transition } => {
1133 assert_eq!(transition.byte, b'a');
1134 assert_eq!(
1135 transition.child.distance, 95,
1136 "child offset should be parent(100) - delta(5) = 95"
1137 );
1138 }
1139 other => panic!("Expected BtiNodeData::Single, got {:?}", other),
1140 }
1141 }
1142
1143 #[test]
1148 fn parse_bti_node_payload_only_correct_type_and_offsets() {
1149 let node = payload_only_node(0xDEAD_BEEF_0000_1234, 42);
1151 let parsed = parse_bti_node(&node, 0).unwrap();
1152 assert_eq!(parsed.node_type, BtiNodeType::PayloadOnly);
1153 match &parsed.data {
1154 BtiNodeData::PayloadOnly { payload } => {
1155 assert_eq!(payload.offset, 0xDEAD_BEEF_0000_1234);
1156 assert_eq!(payload.length, 42);
1157 }
1158 other => panic!("Expected PayloadOnly, got {:?}", other),
1159 }
1160 }
1161
1162 #[test]
1163 fn parse_bti_node_payload_only_no_payload_flags_is_error() {
1164 let node_bytes = vec![0x00u8]; let err = parse_bti_node(&node_bytes, 0);
1167 assert!(err.is_err(), "PayloadOnly with flags=0 should be an error");
1168 }
1169
1170 #[test]
1175 fn parse_bti_node_single_nopayload4_ordinal1() {
1176 let node_bytes = single_nopayload4_node(3, b'x');
1178 let parsed = parse_bti_node(&node_bytes, 50).unwrap();
1179 assert_eq!(parsed.node_type, BtiNodeType::Single);
1180 match &parsed.data {
1181 BtiNodeData::Single { transition } => {
1182 assert_eq!(transition.byte, b'x');
1183 assert_eq!(transition.child.distance, 47); }
1185 other => panic!("Expected Single, got {:?}", other),
1186 }
1187 }
1188
1189 #[test]
1190 fn parse_bti_node_single8_ordinal2() {
1191 let node_bytes = single8_node(0, b'z', 10);
1192 let parsed = parse_bti_node(&node_bytes, 200).unwrap();
1193 assert_eq!(parsed.node_type, BtiNodeType::Single);
1194 match &parsed.data {
1195 BtiNodeData::Single { transition } => {
1196 assert_eq!(transition.byte, b'z');
1197 assert_eq!(transition.child.distance, 190); }
1199 other => panic!("Expected Single, got {:?}", other),
1200 }
1201 }
1202
1203 #[test]
1204 fn parse_bti_node_single_nopayload12_ordinal3() {
1205 let node_bytes = single_nopayload12_node(0x123, b'k');
1207 let parsed = parse_bti_node(&node_bytes, 1000).unwrap();
1208 assert_eq!(parsed.node_type, BtiNodeType::Single);
1209 match &parsed.data {
1210 BtiNodeData::Single { transition } => {
1211 assert_eq!(transition.byte, b'k');
1212 assert_eq!(transition.child.distance, 1000 - 0x123);
1213 }
1214 other => panic!("Expected Single, got {:?}", other),
1215 }
1216 }
1217
1218 #[test]
1219 fn parse_bti_node_single16_ordinal4() {
1220 let node_bytes = single16_node(0, b'm', 0x0400);
1222 let parsed = parse_bti_node(&node_bytes, 2048).unwrap();
1223 assert_eq!(parsed.node_type, BtiNodeType::Single);
1224 match &parsed.data {
1225 BtiNodeData::Single { transition } => {
1226 assert_eq!(transition.byte, b'm');
1227 assert_eq!(transition.child.distance, 2048 - 1024);
1228 }
1229 other => panic!("Expected Single, got {:?}", other),
1230 }
1231 }
1232
1233 #[test]
1238 fn parse_bti_node_sparse8_ordinal5_two_transitions() {
1239 let node_bytes = sparse8_node(0, &[(b'a', 10), (b'b', 20)]);
1241 let parsed = parse_bti_node(&node_bytes, 100).unwrap();
1242 assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1243 match &parsed.data {
1244 BtiNodeData::Sparse { transitions } => {
1245 assert_eq!(transitions.len(), 2);
1246 assert_eq!(transitions[0].byte, b'a');
1247 assert_eq!(transitions[0].child.distance, 90); assert_eq!(transitions[1].byte, b'b');
1249 assert_eq!(transitions[1].child.distance, 80); }
1251 other => panic!("Expected Sparse, got {:?}", other),
1252 }
1253 }
1254
1255 #[test]
1256 fn parse_bti_node_sparse16_ordinal7_three_transitions() {
1257 let payload_flags = 0u8;
1260 let pairs: &[(u8, u16)] = &[(b'x', 0x0010), (b'y', 0x0020), (b'z', 0x0030)];
1261 let mut node_bytes = vec![0x70 | payload_flags, pairs.len() as u8];
1262 for &(t, _) in pairs {
1263 node_bytes.push(t);
1264 }
1265 for &(_, d) in pairs {
1266 node_bytes.extend_from_slice(&d.to_be_bytes());
1267 }
1268 let parsed = parse_bti_node(&node_bytes, 0x100).unwrap();
1269 assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1270 match &parsed.data {
1271 BtiNodeData::Sparse { transitions } => {
1272 assert_eq!(transitions.len(), 3);
1273 assert_eq!(transitions[0].byte, b'x');
1274 assert_eq!(transitions[0].child.distance, 0x100 - 0x0010);
1275 assert_eq!(transitions[2].byte, b'z');
1276 assert_eq!(transitions[2].child.distance, 0x100 - 0x0030);
1277 }
1278 other => panic!("Expected Sparse, got {:?}", other),
1279 }
1280 }
1281
1282 #[test]
1294 fn parse_bti_node_sparse12_ordinal6_count1_exact_minimal_5_bytes() {
1295 let node_bytes = sparse12_node(0, &[(b'a', 0xABC)]);
1300 assert_eq!(
1301 node_bytes.len(),
1302 5,
1303 "Sparse12 count=1 must be exactly 5 bytes (was over-counted as 6 with old formula)"
1304 );
1305 let offset: u64 = 0x1000;
1306 let parsed = parse_bti_node(&node_bytes, offset)
1307 .expect("exact-minimal Sparse12 count=1 must parse successfully");
1308 assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1309 match &parsed.data {
1310 BtiNodeData::Sparse { transitions } => {
1311 assert_eq!(transitions.len(), 1);
1312 assert_eq!(transitions[0].byte, b'a');
1313 assert_eq!(
1314 transitions[0].child.distance,
1315 offset - 0xABC,
1316 "child offset = parent(0x1000) - delta(0xABC) = 0x544"
1317 );
1318 }
1319 other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
1320 }
1321 }
1322
1323 #[test]
1324 fn parse_bti_node_sparse12_ordinal6_count2_exact_minimal_7_bytes() {
1325 let node_bytes = sparse12_node(0, &[(b'x', 0x100), (b'y', 0x200)]);
1330 assert_eq!(
1331 node_bytes.len(),
1332 7,
1333 "Sparse12 count=2 must be exactly 7 bytes"
1334 );
1335 let offset: u64 = 0x800;
1336 let parsed = parse_bti_node(&node_bytes, offset)
1337 .expect("exact-minimal Sparse12 count=2 must parse successfully");
1338 assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1339 match &parsed.data {
1340 BtiNodeData::Sparse { transitions } => {
1341 assert_eq!(transitions.len(), 2);
1342 assert_eq!(transitions[0].byte, b'x');
1343 assert_eq!(transitions[0].child.distance, offset - 0x100);
1344 assert_eq!(transitions[1].byte, b'y');
1345 assert_eq!(transitions[1].child.distance, offset - 0x200);
1346 }
1347 other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
1348 }
1349 }
1350
1351 #[test]
1360 fn parse_bti_node_sparse24_ordinal8_count1_exact_minimal_6_bytes() {
1361 let node_bytes = sparse24_node(0, &[(b'p', 0x010203)]);
1363 assert_eq!(
1364 node_bytes.len(),
1365 6,
1366 "Sparse24 count=1 must be exactly 6 bytes"
1367 );
1368 let offset: u64 = 0x20000;
1369 let parsed = parse_bti_node(&node_bytes, offset)
1370 .expect("exact-minimal Sparse24 count=1 must parse successfully");
1371 assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1372 match &parsed.data {
1373 BtiNodeData::Sparse { transitions } => {
1374 assert_eq!(transitions.len(), 1);
1375 assert_eq!(transitions[0].byte, b'p');
1376 assert_eq!(transitions[0].child.distance, offset - 0x010203);
1377 }
1378 other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
1379 }
1380 }
1381
1382 #[test]
1391 fn parse_bti_node_sparse40_ordinal9_count1_exact_minimal_8_bytes() {
1392 let delta: u64 = 0x0000_0001_0000;
1394 let node_bytes = sparse40_node(0, &[(b'q', delta)]);
1395 assert_eq!(
1396 node_bytes.len(),
1397 8,
1398 "Sparse40 count=1 must be exactly 8 bytes"
1399 );
1400 let offset: u64 = 0x0010_0000;
1401 let parsed = parse_bti_node(&node_bytes, offset)
1402 .expect("exact-minimal Sparse40 count=1 must parse successfully");
1403 assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1404 match &parsed.data {
1405 BtiNodeData::Sparse { transitions } => {
1406 assert_eq!(transitions.len(), 1);
1407 assert_eq!(transitions[0].byte, b'q');
1408 assert_eq!(transitions[0].child.distance, offset - delta);
1409 }
1410 other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
1411 }
1412 }
1413
1414 #[test]
1425 fn parse_bti_node_dense12_ordinal10_range1_exact_minimal_5_bytes() {
1426 let node_bytes = dense12_node(0, b'A', &[0x123]);
1428 assert_eq!(
1429 node_bytes.len(),
1430 5,
1431 "Dense12 range_len=1 must be exactly 5 bytes"
1432 );
1433 let offset: u64 = 0x500;
1434 let parsed = parse_bti_node(&node_bytes, offset)
1435 .expect("exact-minimal Dense12 range=1 must parse successfully");
1436 assert_eq!(parsed.node_type, BtiNodeType::Dense);
1437 match &parsed.data {
1438 BtiNodeData::Dense {
1439 start_byte,
1440 children,
1441 } => {
1442 assert_eq!(*start_byte, b'A');
1443 assert_eq!(children.len(), 1);
1444 assert_eq!(children[0].distance, offset - 0x123);
1445 }
1446 other => panic!("Expected BtiNodeData::Dense, got {:?}", other),
1447 }
1448 }
1449
1450 #[test]
1451 fn parse_bti_node_dense12_ordinal10_range2_exact_minimal_6_bytes() {
1452 let node_bytes = dense12_node(0, b'A', &[0x100, 0x200]);
1455 assert_eq!(
1456 node_bytes.len(),
1457 6,
1458 "Dense12 range_len=2 must be exactly 6 bytes"
1459 );
1460 let offset: u64 = 0x800;
1461 let parsed = parse_bti_node(&node_bytes, offset)
1462 .expect("exact-minimal Dense12 range=2 must parse successfully");
1463 assert_eq!(parsed.node_type, BtiNodeType::Dense);
1464 match &parsed.data {
1465 BtiNodeData::Dense {
1466 start_byte,
1467 children,
1468 } => {
1469 assert_eq!(*start_byte, b'A');
1470 assert_eq!(children.len(), 2);
1471 assert_eq!(children[0].distance, offset - 0x100);
1472 assert_eq!(children[1].distance, offset - 0x200);
1473 }
1474 other => panic!("Expected BtiNodeData::Dense, got {:?}", other),
1475 }
1476 }
1477
/// Parses a node built by `dense16_node` with start byte `b'a'` and the deltas
/// 0x0010, 0x0000 and 0x0030 at offset 0x200.
///
/// The first and third children must resolve to the node offset minus their
/// delta; the middle entry, whose delta is zero, is expected to come back with
/// a distance of 0.
#[test]
fn parse_bti_node_dense16_ordinal11_three_children() {
    let base: u64 = 0x200;
    let node = parse_bti_node(&dense16_node(0, b'a', &[0x0010, 0x0000, 0x0030]), base).unwrap();
    assert_eq!(node.node_type, BtiNodeType::Dense);

    if let BtiNodeData::Dense {
        start_byte,
        children,
    } = &node.data
    {
        assert_eq!(*start_byte, b'a');
        assert_eq!(children.len(), 3);
        assert_eq!(children[0].distance, base - 0x0010);
        // Zero delta: the parsed distance is 0, not the node offset.
        assert_eq!(children[1].distance, 0);
        assert_eq!(children[2].distance, base - 0x0030);
    } else {
        panic!("Expected Dense, got {:?}", node.data);
    }
}
1506
/// Parses a node built by `long_dense_node` with start byte `b'A'` and the
/// deltas 0x100 and 0x200 at offset 0x10000.
///
/// The result must be a Dense node with two children whose distances are the
/// node offset minus each delta.
#[test]
fn parse_bti_node_long_dense_ordinal15_two_children() {
    let base: u64 = 0x10000;
    let encoded = long_dense_node(0, b'A', &[0x0000_0000_0000_0100, 0x0000_0000_0000_0200]);
    let node = parse_bti_node(&encoded, base).unwrap();
    assert_eq!(node.node_type, BtiNodeType::Dense);

    let (first_byte, kids) = match &node.data {
        BtiNodeData::Dense {
            start_byte,
            children,
        } => (*start_byte, children),
        other => panic!("Expected Dense, got {:?}", other),
    };
    assert_eq!(first_byte, b'A');
    assert_eq!(kids.len(), 2);
    assert_eq!(kids[0].distance, base - 0x100);
    assert_eq!(kids[1].distance, base - 0x200);
}
1526
/// Checks `classify_node_nibble` over its whole input domain.
///
/// Every valid nibble (0..=15) must map to its node family, and any value that
/// does not fit in a nibble must be rejected with an error rather than being
/// classified.
#[test]
fn classify_node_nibble_all_ordinals() {
    // (ordinal range, expected node family, family name used in the message)
    let families = [
        (0u8..=0, BtiNodeType::PayloadOnly, "PayloadOnly"),
        (1u8..=4, BtiNodeType::Single, "Single"),
        (5u8..=9, BtiNodeType::Sparse, "Sparse"),
        (10u8..=15, BtiNodeType::Dense, "Dense"),
    ];
    for (ordinals, expected, label) in families.iter() {
        for n in ordinals.clone() {
            assert_eq!(
                classify_node_nibble(n).unwrap(),
                *expected,
                "ordinal {} should be {}",
                n,
                label
            );
        }
    }

    // Values above 15 are not nibbles; the classifier must return an error
    // instead of silently picking a node type.
    for invalid in [16u8, 0x80, u8::MAX].iter() {
        assert!(
            classify_node_nibble(*invalid).is_err(),
            "nibble {} is out of range and must be rejected",
            invalid
        );
    }
}
1563
/// Loads the root node of a BTI file whose root was built by `sparse8_node`
/// with two entries (`b'a'` and `b'b'`) and checks that `RowsParser` reports
/// it as a Sparse node with two children.
///
/// The assertion message marks this as a regression guard for issue #647.
#[test]
fn rows_parser_sparse_root_node_not_mislabeled() {
    let file_bytes = make_bti_file(sparse8_node(0, &[(b'a', 5), (b'b', 10)]));
    let mut parser = RowsParser::new(Cursor::new(file_bytes)).unwrap();

    // Read the offset before the mutable `load_node` call.
    let root_at = parser.header.root_offset;
    let root = parser.load_node(root_at).unwrap();

    assert_eq!(
        root.node_type,
        BtiNodeType::Sparse,
        "RowsParser returned {:?} for a Sparse8 root node — regression from #647",
        root.node_type
    );
    assert_eq!(root.child_count(), 2);
}
1591
/// Loads the root node of a BTI file whose root was built by `dense16_node`
/// (start byte `b'0'`, three deltas) and checks that `RowsParser` reports it
/// as a Dense node.
#[test]
fn rows_parser_dense_root_node_not_mislabeled() {
    let file_bytes = make_bti_file(dense16_node(0, b'0', &[0x0020, 0x0000, 0x0040]));
    let mut parser = RowsParser::new(Cursor::new(file_bytes)).unwrap();

    // Read the offset before the mutable `load_node` call.
    let root_at = parser.header.root_offset;
    let root = parser.load_node(root_at).unwrap();

    assert_eq!(
        root.node_type,
        BtiNodeType::Dense,
        "RowsParser returned {:?} for a Dense16 root node",
        root.node_type
    );
}
1609
/// Loads the root node of a BTI file whose root was built by
/// `single_nopayload4_node(3, b'q')` and checks that `RowsParser` reports it
/// as a Single node.
///
/// The header's root offset is expected to be 64, and the single transition
/// must carry byte `b'q'` with a child distance of the root offset minus 3.
#[test]
fn rows_parser_single_nopayload4_root_node_not_mislabeled() {
    let expected_root_at: u64 = 64;
    let file_bytes = make_bti_file(single_nopayload4_node(3, b'q'));
    let mut parser = RowsParser::new(Cursor::new(file_bytes)).unwrap();

    let root_at = parser.header.root_offset;
    assert_eq!(root_at, expected_root_at);
    let root = parser.load_node(root_at).unwrap();

    assert_eq!(
        root.node_type,
        BtiNodeType::Single,
        "RowsParser returned {:?} for a SingleNoPayload4 root node",
        root.node_type
    );
    if let BtiNodeData::Single { transition } = &root.data {
        assert_eq!(transition.byte, b'q');
        assert_eq!(transition.child.distance, expected_root_at - 3);
    } else {
        panic!("Expected Single data, got {:?}", root.data);
    }
}
1637
/// Builds a header image by hand and checks that `BtiHeader::parse` decodes it.
///
/// The image is the big-endian bytes of `BtiHeader::MAGIC`,
/// `BtiHeader::VERSION`, a zero `u16` (presumably the flags field — confirm
/// against `BtiHeader::parse`), then `1024u64` and `100u64`. Parsing must
/// return those values as `root_offset` and `entry_count` and report a size
/// of 24.
#[test]
fn test_bti_header_parsing() {
    let header_data: Vec<u8> = [
        &BtiHeader::MAGIC.to_be_bytes()[..],
        &BtiHeader::VERSION.to_be_bytes()[..],
        &0u16.to_be_bytes()[..],
        &1024u64.to_be_bytes()[..],
        &100u64.to_be_bytes()[..],
    ]
    .concat();

    let (parsed, consumed) = BtiHeader::parse(&header_data).unwrap();

    assert_eq!(parsed.magic, BtiHeader::MAGIC);
    assert_eq!(parsed.version, BtiHeader::VERSION);
    assert_eq!(parsed.root_offset, 1024);
    assert_eq!(parsed.entry_count, 100);
    assert_eq!(consumed, 24);
}
1658
/// Smoke test: `PartitionsParser::new` must succeed on a BTI file whose root
/// is the node produced by `payload_only_node(1000, 50)`.
#[test]
fn test_partitions_parser_creation() {
    let file_bytes = make_bti_file(payload_only_node(1000, 50));
    let _parser = PartitionsParser::new(Cursor::new(file_bytes)).unwrap();
}
1665
/// Smoke test: `RowsParser::new` must succeed on a BTI file whose root is the
/// node produced by `payload_only_node(1000, 50)`.
#[test]
fn test_rows_parser_creation() {
    let file_bytes = make_bti_file(payload_only_node(1000, 50));
    let _parser = RowsParser::new(Cursor::new(file_bytes)).unwrap();
}
1672
/// Looks up the text partition key `"test_partition"` in a BTI file whose root
/// is the node produced by `payload_only_node(1000, 50)`; the lookup must
/// succeed and return `Some`.
#[test]
fn test_partition_lookup() {
    let file_bytes = make_bti_file(payload_only_node(1000, 50));
    let mut parser = PartitionsParser::new(Cursor::new(file_bytes)).unwrap();

    let key = vec![Value::Text(String::from("test_partition"))];
    let found = parser.lookup_partition(&key).unwrap();

    assert!(found.is_some());
}
1684
/// Round-trips a `BtiHeader` through `to_bytes` and `BtiHeader::parse` and
/// checks that every field survives unchanged.
///
/// Each field is given a distinct non-trivial bit pattern and compared one by
/// one after the round trip.
#[test]
fn test_header_serialization_round_trip() {
    let expected = BtiHeader {
        magic: BtiHeader::MAGIC,
        version: BtiHeader::VERSION,
        flags: 0x1234,
        root_offset: 0x123456789ABCDEF0,
        entry_count: 0xFEDCBA9876543210,
        metadata_size: 0x12345678,
    };

    let wire = expected.to_bytes();
    // The consumed-size half of the result is not checked here.
    let (decoded, _) = BtiHeader::parse(&wire).unwrap();

    assert_eq!(expected.magic, decoded.magic);
    assert_eq!(expected.version, decoded.version);
    assert_eq!(expected.flags, decoded.flags);
    assert_eq!(expected.root_offset, decoded.root_offset);
    assert_eq!(expected.entry_count, decoded.entry_count);
    assert_eq!(expected.metadata_size, decoded.metadata_size);
}
1706
/// Checks `read_be_unsigned` on boundary inputs: an empty slice decodes to 0,
/// a single 0xFF byte to 255, the two bytes 01 00 to 256, and four 0xFF bytes
/// to 0xFFFF_FFFF.
#[test]
fn read_be_unsigned_edge_cases() {
    let from_empty = read_be_unsigned(&[]);
    let from_one_byte = read_be_unsigned(&[0xFF]);
    let from_two_bytes = read_be_unsigned(&[0x01, 0x00]);
    let from_four_bytes = read_be_unsigned(&[0xFF, 0xFF, 0xFF, 0xFF]);

    assert_eq!(from_empty, 0);
    assert_eq!(from_one_byte, 255);
    assert_eq!(from_two_bytes, 256);
    assert_eq!(from_four_bytes, 0xFFFF_FFFF);
}
1718
/// Checks `read_12bit_packed` on the three bytes AB C1 23: index 0 must
/// decode to 0xABC and index 1 to 0x123.
#[test]
fn read_12bit_packed_even_and_odd_indices() {
    let packed: &[u8] = &[0xAB, 0xC1, 0x23];
    // (entry index, expected 12-bit value)
    let expectations = [(0, 0xABC), (1, 0x123)];
    for &(index, expected) in expectations.iter() {
        assert_eq!(read_12bit_packed(packed, index), expected);
    }
}
1727}