use crate::{
error::Error,
storage::sstable::bti::{
encoder::ByteComparableEncoder,
node::{
BtiNode, BtiNodeData, BtiNodeType, BtiResult, PayloadRef, SizedPointer, Transition,
TrieNavigator,
},
},
types::Value,
};
use std::collections::HashMap;
use std::io::{Read, Seek, SeekFrom};
fn classify_node_nibble(nibble: u8) -> BtiResult<BtiNodeType> {
match nibble {
0 => Ok(BtiNodeType::PayloadOnly),
1..=4 => Ok(BtiNodeType::Single),
5..=9 => Ok(BtiNodeType::Sparse),
10..=15 => Ok(BtiNodeType::Dense),
other => Err(Error::Parse(format!(
"Invalid BTI node type nibble: {}",
other
))),
}
}
/// Returns the number of whole bytes used per child pointer for a node ordinal.
///
/// The table is indexed by ordinal. Ordinals 0, 1, 3, 6 and 10 map to 0: the
/// node parser in this file handles those layouts without a byte width (no
/// pointer, a delta embedded in the header, or 12-bit packed deltas). Any
/// ordinal above 15 also yields 0.
fn pointer_bytes_for_ordinal(ordinal: u8) -> u8 {
    const POINTER_WIDTHS: [u8; 16] = [0, 0, 1, 0, 2, 1, 0, 2, 3, 5, 0, 2, 3, 4, 5, 8];
    POINTER_WIDTHS
        .get(usize::from(ordinal))
        .copied()
        .unwrap_or(0)
}
fn parse_bti_node(data: &[u8], offset: u64) -> BtiResult<BtiNode> {
if data.is_empty() {
return Err(Error::Parse("Empty BTI node data".to_string()));
}
let header_byte = data[0];
let ordinal = (header_byte >> 4) & 0x0F;
let payload_flags = header_byte & 0x0F;
let has_payload = payload_flags != 0;
let node_type = classify_node_nibble(ordinal)?;
match node_type {
BtiNodeType::PayloadOnly => {
if !has_payload {
return Err(Error::Parse(
"PayloadOnly node has no payload flags set".to_string(),
));
}
let payload = parse_payload_ref(&data[1..])?;
Ok(BtiNode {
node_type,
level: 0,
key_prefix: Vec::new(),
data: BtiNodeData::PayloadOnly { payload },
})
}
BtiNodeType::Single => {
match ordinal {
1 => {
if data.len() < 2 {
return Err(Error::Parse(
"SingleNoPayload4 node data too short".to_string(),
));
}
let delta = (header_byte & 0x0F) as u64;
let transition_byte = data[1];
let child_offset = offset.saturating_sub(delta);
Ok(BtiNode {
node_type,
level: 1,
key_prefix: Vec::new(),
data: BtiNodeData::Single {
transition: Transition::new(
transition_byte,
SizedPointer::new(child_offset),
),
},
})
}
3 => {
if data.len() < 3 {
return Err(Error::Parse(
"SingleNoPayload12 node data too short".to_string(),
));
}
let delta = (((header_byte & 0x0F) as u64) << 8) | (data[1] as u64);
let transition_byte = data[2];
let child_offset = offset.saturating_sub(delta);
Ok(BtiNode {
node_type,
level: 1,
key_prefix: Vec::new(),
data: BtiNodeData::Single {
transition: Transition::new(
transition_byte,
SizedPointer::new(child_offset),
),
},
})
}
_ => {
let ptr_bytes = pointer_bytes_for_ordinal(ordinal) as usize;
let needed = 2 + ptr_bytes;
if data.len() < needed {
return Err(Error::Parse(format!(
"Single node (ordinal {}) data too short: need {} bytes, have {}",
ordinal,
needed,
data.len()
)));
}
let transition_byte = data[1];
let delta = read_be_unsigned(&data[2..2 + ptr_bytes]);
let child_offset = offset.saturating_sub(delta);
let transition =
Transition::new(transition_byte, SizedPointer::new(child_offset));
Ok(BtiNode {
node_type,
level: 1,
key_prefix: Vec::new(),
data: BtiNodeData::Single { transition },
})
}
}
}
BtiNodeType::Sparse => {
if data.len() < 2 {
return Err(Error::Parse("Sparse node data too short".to_string()));
}
let count = data[1] as usize;
if count == 0 {
return Err(Error::Parse(
"Sparse node must have at least one transition".to_string(),
));
}
let bytes_start = 2;
let pointers_start = bytes_start + count;
if ordinal == 6 {
let packed_len = (count * 3).div_ceil(2); let needed = pointers_start + packed_len;
if data.len() < needed {
return Err(Error::Parse(format!(
"Sparse12 node data too short: need {}, have {}",
needed,
data.len()
)));
}
let mut transitions = Vec::with_capacity(count);
for i in 0..count {
let t_byte = data[bytes_start + i];
let delta = read_12bit_packed(&data[pointers_start..], i);
let child_offset = offset.saturating_sub(delta);
transitions.push(Transition::new(t_byte, SizedPointer::new(child_offset)));
}
Ok(BtiNode {
node_type,
level: 1,
key_prefix: Vec::new(),
data: BtiNodeData::Sparse { transitions },
})
} else {
let ptr_bytes = pointer_bytes_for_ordinal(ordinal) as usize;
let needed = pointers_start + count * ptr_bytes;
if data.len() < needed {
return Err(Error::Parse(format!(
"Sparse node (ordinal {}) data too short: need {}, have {}",
ordinal,
needed,
data.len()
)));
}
let mut transitions = Vec::with_capacity(count);
for i in 0..count {
let t_byte = data[bytes_start + i];
let ptr_off = pointers_start + i * ptr_bytes;
let delta = read_be_unsigned(&data[ptr_off..ptr_off + ptr_bytes]);
let child_offset = offset.saturating_sub(delta);
transitions.push(Transition::new(t_byte, SizedPointer::new(child_offset)));
}
Ok(BtiNode {
node_type,
level: 1,
key_prefix: Vec::new(),
data: BtiNodeData::Sparse { transitions },
})
}
}
BtiNodeType::Dense => {
if data.len() < 3 {
return Err(Error::Parse("Dense node data too short".to_string()));
}
let start_byte = data[1];
let range_len = data[2] as usize + 1;
if ordinal == 10 {
let packed_len = (range_len * 3).div_ceil(2);
let needed = 3 + packed_len;
if data.len() < needed {
return Err(Error::Parse(format!(
"Dense12 node data too short: need {}, have {}",
needed,
data.len()
)));
}
let mut children = Vec::with_capacity(range_len);
for i in 0..range_len {
let delta = read_12bit_packed(&data[3..], i);
let child_offset = if delta == 0 {
0
} else {
offset.saturating_sub(delta)
};
children.push(SizedPointer::new(child_offset));
}
Ok(BtiNode {
node_type,
level: 1,
key_prefix: Vec::new(),
data: BtiNodeData::Dense {
start_byte,
children,
},
})
} else {
let ptr_bytes = pointer_bytes_for_ordinal(ordinal) as usize;
let needed = 3 + range_len * ptr_bytes;
if data.len() < needed {
return Err(Error::Parse(format!(
"Dense node (ordinal {}) data too short: need {}, have {}",
ordinal,
needed,
data.len()
)));
}
let mut children = Vec::with_capacity(range_len);
for i in 0..range_len {
let ptr_off = 3 + i * ptr_bytes;
let delta = read_be_unsigned(&data[ptr_off..ptr_off + ptr_bytes]);
let child_offset = if delta == 0 {
0
} else {
offset.saturating_sub(delta)
};
children.push(SizedPointer::new(child_offset));
}
Ok(BtiNode {
node_type,
level: 1,
key_prefix: Vec::new(),
data: BtiNodeData::Dense {
start_byte,
children,
},
})
}
}
}
}
/// Interprets `data` as a big-endian unsigned integer.
///
/// An empty slice yields 0. With more than eight bytes the leading bytes are
/// shifted out, so only the last eight contribute to the result.
fn read_be_unsigned(data: &[u8]) -> u64 {
    data.iter()
        .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte))
}
/// Reads the `index`-th 12-bit value from a packed big-endian pointer area.
///
/// Two values share three bytes: an even index occupies the high 12 bits of the
/// two bytes starting at `3 * index / 2`, an odd index the low 12 bits.
/// Returns 0 when those two bytes are not both inside `data`.
fn read_12bit_packed(data: &[u8], index: usize) -> u64 {
    let start = (3 * index) / 2;
    let (Some(&high), Some(&low)) = (data.get(start), data.get(start + 1)) else {
        return 0;
    };
    let word = u16::from_be_bytes([high, low]);
    let value = if index % 2 == 0 {
        word >> 4
    } else {
        word & 0x0FFF
    };
    u64::from(value)
}
/// Decodes a payload reference: an 8-byte big-endian offset followed by a
/// 4-byte big-endian length. Bytes beyond the first twelve are ignored.
///
/// # Errors
/// Returns `Error::Parse` when fewer than 12 bytes are available.
fn parse_payload_ref(data: &[u8]) -> BtiResult<PayloadRef> {
    const ENCODED_LEN: usize = 12;

    if data.len() < ENCODED_LEN {
        return Err(Error::Parse(format!(
            "PayloadRef data too short: need 12 bytes, have {}",
            data.len()
        )));
    }

    let (offset_part, rest) = data.split_at(8);
    let mut offset_bytes = [0u8; 8];
    offset_bytes.copy_from_slice(offset_part);
    let mut length_bytes = [0u8; 4];
    length_bytes.copy_from_slice(&rest[..4]);

    Ok(PayloadRef::new(
        u64::from_be_bytes(offset_bytes),
        u32::from_be_bytes(length_bytes),
    ))
}
/// Fixed header found at the start of a BTI index file.
///
/// All fields are stored big-endian, in declaration order. The trailing
/// `metadata_size` field is optional on disk.
#[derive(Debug, Clone)]
pub struct BtiHeader {
/// File magic (bytes 0..4); must equal `BtiHeader::MAGIC` to parse.
pub magic: u32,
/// Format version (bytes 4..6); must equal `BtiHeader::VERSION` to parse.
pub version: u16,
/// Flag bits (bytes 6..8); stored as read, not interpreted in this file.
pub flags: u16,
/// Offset of the trie root node (bytes 8..16); lookups start here.
pub root_offset: u64,
/// Entry count (bytes 16..24); reported through the index statistics.
pub entry_count: u64,
/// Optional metadata size (bytes 24..28); 0 when the field is absent.
pub metadata_size: u32,
}
impl BtiHeader {
    /// Magic number expected in the first four header bytes.
    pub const MAGIC: u32 = 0x6461_0000;
    /// The only header version accepted by [`BtiHeader::parse`].
    pub const VERSION: u16 = 0x0001;

    /// Parses a header from the start of `data`.
    ///
    /// Returns the header and the number of bytes it occupies: 28 when a
    /// non-zero `metadata_size` was read from bytes 24..28, otherwise 24.
    /// `metadata_size` is only read when at least 28 bytes are supplied.
    ///
    /// # Errors
    /// Returns `Error::Parse` when `data` has fewer than 24 bytes, when the
    /// magic does not equal [`BtiHeader::MAGIC`], or when the version does not
    /// equal [`BtiHeader::VERSION`].
    pub fn parse(data: &[u8]) -> BtiResult<(Self, usize)> {
        const FIXED_LEN: usize = 24;
        const EXTENDED_LEN: usize = 28;

        if data.len() < FIXED_LEN {
            return Err(Error::Parse("BTI header too short".to_string()));
        }

        // Big-endian field readers; callers only pass positions already
        // covered by the length checks.
        let be_u16 = |at: usize| u16::from_be_bytes([data[at], data[at + 1]]);
        let be_u32 =
            |at: usize| u32::from_be_bytes([data[at], data[at + 1], data[at + 2], data[at + 3]]);
        let be_u64 = |at: usize| (u64::from(be_u32(at)) << 32) | u64::from(be_u32(at + 4));

        let magic = be_u32(0);
        if magic != Self::MAGIC {
            return Err(Error::Parse(format!(
                "Invalid BTI magic: 0x{:08x}, expected 0x{:08x}",
                magic,
                Self::MAGIC
            )));
        }

        let version = be_u16(4);
        if version != Self::VERSION {
            return Err(Error::Parse(format!(
                "Unsupported BTI version: 0x{:04x}, expected 0x{:04x}",
                version,
                Self::VERSION
            )));
        }

        let metadata_size = if data.len() >= EXTENDED_LEN {
            be_u32(24)
        } else {
            0
        };
        // A zero metadata size is treated the same as an absent field.
        let header_size = if metadata_size > 0 {
            EXTENDED_LEN
        } else {
            FIXED_LEN
        };

        let header = BtiHeader {
            magic,
            version,
            flags: be_u16(6),
            root_offset: be_u64(8),
            entry_count: be_u64(16),
            metadata_size,
        };
        Ok((header, header_size))
    }

    /// Serialises the header big-endian in field order.
    ///
    /// The result is 28 bytes when `metadata_size` is non-zero and 24 bytes
    /// otherwise, mirroring the sizes reported by [`BtiHeader::parse`].
    pub fn to_bytes(&self) -> Vec<u8> {
        let magic = self.magic.to_be_bytes();
        let version = self.version.to_be_bytes();
        let flags = self.flags.to_be_bytes();
        let root_offset = self.root_offset.to_be_bytes();
        let entry_count = self.entry_count.to_be_bytes();

        let mut bytes = [
            &magic[..],
            &version[..],
            &flags[..],
            &root_offset[..],
            &entry_count[..],
        ]
        .concat();
        if self.metadata_size > 0 {
            bytes.extend_from_slice(&self.metadata_size.to_be_bytes());
        }
        bytes
    }
}
/// Looks up partition keys in a BTI trie index read from a seekable source.
pub struct PartitionsParser<R: Read + Seek> {
/// Source the header and trie nodes are read from.
reader: R,
/// Header parsed from the start of `reader` at construction time.
header: BtiHeader,
/// Encoder used to turn key values into the byte string walked through the trie.
encoder: ByteComparableEncoder,
/// Parsed nodes keyed by the offset they were loaded from.
node_cache: HashMap<u64, BtiNode>,
}
impl<R: Read + Seek> PartitionsParser<R> {
pub fn new(mut reader: R) -> BtiResult<Self> {
reader.seek(SeekFrom::Start(0))?;
let mut header_data = vec![0u8; 28];
reader.read_exact(&mut header_data)?;
let (header, _) = BtiHeader::parse(&header_data)?;
Ok(Self {
reader,
header,
encoder: ByteComparableEncoder::new(),
node_cache: HashMap::new(),
})
}
pub fn lookup_partition(&mut self, partition_key: &[Value]) -> BtiResult<Option<PayloadRef>> {
let encoded_key = self.encoder.encode_composite_key(partition_key)?;
let mut navigator = TrieNavigator::new(self.header.root_offset);
self.lookup_in_trie(&mut navigator, &encoded_key)
}
fn lookup_in_trie(
&mut self,
navigator: &mut TrieNavigator,
encoded_key: &[u8],
) -> BtiResult<Option<PayloadRef>> {
let mut key_pos = 0;
loop {
let current_node = self.load_node(navigator.current_offset)?;
if current_node.is_leaf() {
return Ok(current_node.get_payload().cloned());
}
if let Some(payload) = current_node.get_payload() {
if key_pos >= encoded_key.len() {
return Ok(Some(payload.clone()));
}
}
if key_pos >= encoded_key.len() {
return Ok(current_node.get_payload().cloned());
}
let next_byte = encoded_key[key_pos];
if let Some(child_pointer) = current_node.find_child(next_byte) {
navigator.navigate_to_child(next_byte, child_pointer)?;
key_pos += 1;
} else {
return Ok(None);
}
}
}
fn load_node(&mut self, offset: u64) -> BtiResult<BtiNode> {
if let Some(cached_node) = self.node_cache.get(&offset) {
return Ok(cached_node.clone());
}
self.reader.seek(SeekFrom::Start(offset))?;
let mut node_data = vec![0u8; 4096]; let bytes_read = self.reader.read(&mut node_data)?;
node_data.truncate(bytes_read);
let node = self.parse_node_data(&node_data, offset)?;
self.node_cache.insert(offset, node.clone());
Ok(node)
}
fn parse_node_data(&self, data: &[u8], offset: u64) -> BtiResult<BtiNode> {
parse_bti_node(data, offset)
}
pub fn iterate_partitions(&mut self) -> BtiResult<PartitionIterator<'_, R>> {
PartitionIterator::new(self)
}
pub fn header(&self) -> &BtiHeader {
&self.header
}
pub fn get_stats(&self) -> BtiIndexStats {
BtiIndexStats {
entry_count: self.header.entry_count,
root_offset: self.header.root_offset,
cached_nodes: self.node_cache.len(),
}
}
}
/// Looks up clustering keys in a BTI trie index read from a seekable source.
pub struct RowsParser<R: Read + Seek> {
/// Source the header and trie nodes are read from.
reader: R,
/// Header parsed from the start of `reader` at construction time.
header: BtiHeader,
/// Encoder used to turn key values into the byte string walked through the trie.
encoder: ByteComparableEncoder,
/// Parsed nodes keyed by the offset they were loaded from.
node_cache: HashMap<u64, BtiNode>,
}
impl<R: Read + Seek> RowsParser<R> {
pub fn new(mut reader: R) -> BtiResult<Self> {
reader.seek(SeekFrom::Start(0))?;
let mut header_data = vec![0u8; 28];
reader.read_exact(&mut header_data)?;
let (header, _) = BtiHeader::parse(&header_data)?;
Ok(Self {
reader,
header,
encoder: ByteComparableEncoder::new(),
node_cache: HashMap::new(),
})
}
pub fn lookup_row(&mut self, clustering_key: &[Value]) -> BtiResult<Option<PayloadRef>> {
let encoded_key = self.encoder.encode_composite_key(clustering_key)?;
let mut navigator = TrieNavigator::new(self.header.root_offset);
self.lookup_in_trie(&mut navigator, &encoded_key)
}
fn lookup_in_trie(
&mut self,
navigator: &mut TrieNavigator,
encoded_key: &[u8],
) -> BtiResult<Option<PayloadRef>> {
let mut key_pos = 0;
loop {
let current_node = self.load_node(navigator.current_offset)?;
if let Some(payload) = current_node.get_payload() {
if key_pos >= encoded_key.len() {
return Ok(Some(payload.clone()));
}
}
if key_pos >= encoded_key.len() {
return Ok(current_node.get_payload().cloned());
}
let next_byte = encoded_key[key_pos];
if let Some(child_pointer) = current_node.find_child(next_byte) {
navigator.navigate_to_child(next_byte, child_pointer)?;
key_pos += 1;
} else {
return Ok(None);
}
}
}
fn load_node(&mut self, offset: u64) -> BtiResult<BtiNode> {
if let Some(cached_node) = self.node_cache.get(&offset) {
return Ok(cached_node.clone());
}
self.reader.seek(SeekFrom::Start(offset))?;
let mut node_data = vec![0u8; 4096]; let bytes_read = self.reader.read(&mut node_data)?;
node_data.truncate(bytes_read);
let node = self.parse_node_data(&node_data, offset)?;
self.node_cache.insert(offset, node.clone());
Ok(node)
}
fn parse_node_data(&self, data: &[u8], offset: u64) -> BtiResult<BtiNode> {
parse_bti_node(data, offset)
}
pub fn range_query(
&mut self,
start_key: &[Value],
end_key: &[Value],
) -> BtiResult<Vec<PayloadRef>> {
let _encoded_start = self.encoder.encode_composite_key(start_key)?;
let _encoded_end = self.encoder.encode_composite_key(end_key)?;
let results = Vec::new();
let _navigator = TrieNavigator::new(self.header.root_offset);
Ok(results)
}
pub fn iterate_rows(&mut self) -> BtiResult<RowIterator<'_, R>> {
RowIterator::new(self)
}
pub fn header(&self) -> &BtiHeader {
&self.header
}
}
/// Iterator handle over the partitions of a `PartitionsParser`.
pub struct PartitionIterator<'a, R: Read + Seek> {
// Stored but not read anywhere in this file, hence the lint allowance.
#[allow(dead_code)]
parser: &'a mut PartitionsParser<R>,
// Initialised to the trie root offset; not read anywhere in this file.
#[allow(dead_code)]
current_position: u64,
/// Set once `next` has been called; the iterator yields nothing afterwards.
finished: bool,
}
impl<'a, R: Read + Seek> PartitionIterator<'a, R> {
    /// Creates an iterator positioned at the root offset recorded in the parser's header.
    fn new(parser: &'a mut PartitionsParser<R>) -> BtiResult<Self> {
        let current_position = parser.header.root_offset;
        Ok(Self {
            parser,
            current_position,
            finished: false,
        })
    }
}
impl<'a, R: Read + Seek> Iterator for PartitionIterator<'a, R> {
    type Item = BtiResult<(Vec<u8>, PayloadRef)>;

    /// Placeholder traversal: marks the iterator as finished and yields
    /// nothing, on the first call and on every later one.
    fn next(&mut self) -> Option<Self::Item> {
        self.finished = true;
        None
    }
}
/// Iterator handle over the rows of a `RowsParser`.
pub struct RowIterator<'a, R: Read + Seek> {
// Stored but not read anywhere in this file, hence the lint allowance.
#[allow(dead_code)]
parser: &'a mut RowsParser<R>,
// Initialised to the trie root offset; not read anywhere in this file.
#[allow(dead_code)]
current_position: u64,
/// Set once `next` has been called; the iterator yields nothing afterwards.
finished: bool,
}
impl<'a, R: Read + Seek> RowIterator<'a, R> {
    /// Creates an iterator positioned at the root offset recorded in the parser's header.
    fn new(parser: &'a mut RowsParser<R>) -> BtiResult<Self> {
        let current_position = parser.header.root_offset;
        Ok(Self {
            parser,
            current_position,
            finished: false,
        })
    }
}
impl<'a, R: Read + Seek> Iterator for RowIterator<'a, R> {
    type Item = BtiResult<(Vec<u8>, PayloadRef)>;

    /// Placeholder traversal: marks the iterator as finished and yields
    /// nothing, on the first call and on every later one.
    fn next(&mut self) -> Option<Self::Item> {
        self.finished = true;
        None
    }
}
/// Snapshot of index statistics as produced by `PartitionsParser::get_stats`.
#[derive(Debug, Clone)]
pub struct BtiIndexStats {
/// Entry count copied from the index header.
pub entry_count: u64,
/// Trie root offset copied from the index header.
pub root_offset: u64,
/// Number of parsed nodes held in the parser's cache when the snapshot was taken.
pub cached_nodes: usize,
}
// Unit tests for node decoding, header parsing and the two parsers.
// The helper functions below hand-encode node bytes for each layout.
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
// Builds an in-memory index: a 28-byte header (root offset 64, entry count 1,
// metadata size 0), zero padding up to offset 64, then the root node bytes.
fn make_bti_file(root_node_bytes: Vec<u8>) -> Vec<u8> {
let root_offset: u64 = 64; let mut data = Vec::new();
data.extend_from_slice(&BtiHeader::MAGIC.to_be_bytes());
data.extend_from_slice(&BtiHeader::VERSION.to_be_bytes());
data.extend_from_slice(&0u16.to_be_bytes()); data.extend_from_slice(&root_offset.to_be_bytes());
data.extend_from_slice(&1u64.to_be_bytes()); data.extend_from_slice(&0u32.to_be_bytes()); while data.len() < root_offset as usize {
data.push(0);
}
data.extend(root_node_bytes);
data
}
// Payload-only node: header 0x01 (ordinal 0, payload flag 1), 8-byte offset, 4-byte length.
fn payload_only_node(data_offset: u64, length: u32) -> Vec<u8> {
let mut v = vec![0x01u8]; v.extend_from_slice(&data_offset.to_be_bytes());
v.extend_from_slice(&length.to_be_bytes());
v
}
// Ordinal 2: header, transition byte, 1-byte delta.
fn single8_node(payload_flags: u8, transition: u8, delta: u8) -> Vec<u8> {
vec![0x20 | (payload_flags & 0x0F), transition, delta]
}
// Ordinal 1: 4-bit delta in the header's low nibble, then the transition byte.
fn single_nopayload4_node(delta4: u8, transition: u8) -> Vec<u8> {
vec![0x10 | (delta4 & 0x0F), transition]
}
// Ordinal 3: 12-bit delta split across the header's low nibble and the next byte,
// then the transition byte.
fn single_nopayload12_node(delta: u16, transition: u8) -> Vec<u8> {
vec![
0x30 | ((delta >> 8) as u8 & 0x0F),
(delta & 0xFF) as u8,
transition,
]
}
// Ordinal 4: header, transition byte, 2-byte big-endian delta.
fn single16_node(payload_flags: u8, transition: u8, delta: u16) -> Vec<u8> {
let mut v = vec![0x40 | (payload_flags & 0x0F), transition];
v.extend_from_slice(&delta.to_be_bytes());
v
}
// Ordinal 5: header, count, all transition bytes, then all 1-byte deltas.
fn sparse8_node(payload_flags: u8, pairs: &[(u8, u8)]) -> Vec<u8> {
let mut v = vec![0x50 | (payload_flags & 0x0F), pairs.len() as u8];
for &(t, _) in pairs {
v.push(t);
}
for &(_, d) in pairs {
v.push(d);
}
v
}
// Ordinal 11: header, start byte, range length minus one, then 2-byte deltas.
// `deltas` must be non-empty because of the `len - 1`.
fn dense16_node(payload_flags: u8, start: u8, deltas: &[u16]) -> Vec<u8> {
let len = deltas.len() as u8;
let mut v = vec![0xB0 | (payload_flags & 0x0F), start, len - 1];
for &d in deltas {
v.extend_from_slice(&d.to_be_bytes());
}
v
}
// Ordinal 15: like dense16_node but with 8-byte deltas.
// `deltas` must be non-empty because of the `len - 1`.
fn long_dense_node(payload_flags: u8, start: u8, deltas: &[u64]) -> Vec<u8> {
let len = deltas.len() as u8;
let mut v = vec![0xF0 | (payload_flags & 0x0F), start, len - 1];
for &d in deltas {
v.extend_from_slice(&d.to_be_bytes());
}
v
}
// Ordinal 6: header, count, transition bytes, then 12-bit deltas packed two per
// three bytes; an odd trailing delta is written left-aligned in two bytes.
fn sparse12_node(payload_flags: u8, pairs: &[(u8, u16)]) -> Vec<u8> {
let count = pairs.len();
let mut v = vec![0x60 | (payload_flags & 0x0F), count as u8];
for &(t, _) in pairs {
v.push(t);
}
let mut i = 0;
while i + 2 <= count {
let p0 = pairs[i].1 as u32;
let p1 = pairs[i + 1].1 as u32;
v.push((p0 >> 4) as u8);
v.push(((p0 << 4) | (p1 >> 8)) as u8);
v.push((p1 & 0xFF) as u8);
i += 2;
}
if i < count {
let pd = pairs[i].1 as u32;
let s = (pd << 4) as u16;
v.extend_from_slice(&s.to_be_bytes());
}
v
}
// Ordinal 8: header, count, transition bytes, then 3-byte big-endian deltas.
fn sparse24_node(payload_flags: u8, pairs: &[(u8, u32)]) -> Vec<u8> {
let count = pairs.len();
let mut v = vec![0x80 | (payload_flags & 0x0F), count as u8];
for &(t, _) in pairs {
v.push(t);
}
for &(_, d) in pairs {
v.push(((d >> 16) & 0xFF) as u8);
v.push(((d >> 8) & 0xFF) as u8);
v.push((d & 0xFF) as u8);
}
v
}
// Ordinal 9: header, count, transition bytes, then 5-byte big-endian deltas.
fn sparse40_node(payload_flags: u8, pairs: &[(u8, u64)]) -> Vec<u8> {
let count = pairs.len();
let mut v = vec![0x90 | (payload_flags & 0x0F), count as u8];
for &(t, _) in pairs {
v.push(t);
}
for &(_, d) in pairs {
v.push(((d >> 32) & 0xFF) as u8);
v.push(((d >> 24) & 0xFF) as u8);
v.push(((d >> 16) & 0xFF) as u8);
v.push(((d >> 8) & 0xFF) as u8);
v.push((d & 0xFF) as u8);
}
v
}
// Ordinal 10: header, start byte, range length minus one, then 12-bit packed
// deltas; `carry` holds the low nibble of an even-indexed delta until the
// next delta (or the final padding byte) is written.
fn dense12_node(payload_flags: u8, start: u8, deltas: &[u16]) -> Vec<u8> {
let range_len = deltas.len();
let mut v = vec![0xA0 | (payload_flags & 0x0F), start, (range_len - 1) as u8];
let mut carry: u8 = 0;
for (i, &d) in deltas.iter().enumerate() {
let val = d as u32;
if (i & 1) == 0 {
v.push((val >> 4) as u8);
carry = (val << 4) as u8;
} else {
v.push(carry | (val >> 8) as u8);
v.push((val & 0xFF) as u8);
carry = 0;
}
}
if (range_len & 1) == 1 {
v.push(carry);
}
v
}
// A Single8 node must decode as Single with the child at parent offset minus delta.
#[test]
fn regression_rows_parser_single_node_not_mislabeled_as_payload_only() {
let node_bytes = single8_node(0, b'a', 5);
let offset: u64 = 100;
let node = parse_bti_node(&node_bytes, offset)
.expect("parse_bti_node must succeed for a valid Single8 node");
assert_eq!(
node.node_type,
BtiNodeType::Single,
"Single8 node (nibble 0x2) was mislabeled as {:?} — regression from #647 stub",
node.node_type,
);
match &node.data {
BtiNodeData::Single { transition } => {
assert_eq!(transition.byte, b'a');
assert_eq!(
transition.child.distance, 95,
"child offset should be parent(100) - delta(5) = 95"
);
}
other => panic!("Expected BtiNodeData::Single, got {:?}", other),
}
}
// Payload-only nodes expose the encoded offset and length unchanged.
#[test]
fn parse_bti_node_payload_only_correct_type_and_offsets() {
let node = payload_only_node(0xDEAD_BEEF_0000_1234, 42);
let parsed = parse_bti_node(&node, 0).unwrap();
assert_eq!(parsed.node_type, BtiNodeType::PayloadOnly);
match &parsed.data {
BtiNodeData::PayloadOnly { payload } => {
assert_eq!(payload.offset, 0xDEAD_BEEF_0000_1234);
assert_eq!(payload.length, 42);
}
other => panic!("Expected PayloadOnly, got {:?}", other),
}
}
// Header byte 0x00 (ordinal 0 with no payload flags) is rejected.
#[test]
fn parse_bti_node_payload_only_no_payload_flags_is_error() {
let node_bytes = vec![0x00u8]; let err = parse_bti_node(&node_bytes, 0);
assert!(err.is_err(), "PayloadOnly with flags=0 should be an error");
}
// Ordinal 1: child = 50 - 3.
#[test]
fn parse_bti_node_single_nopayload4_ordinal1() {
let node_bytes = single_nopayload4_node(3, b'x');
let parsed = parse_bti_node(&node_bytes, 50).unwrap();
assert_eq!(parsed.node_type, BtiNodeType::Single);
match &parsed.data {
BtiNodeData::Single { transition } => {
assert_eq!(transition.byte, b'x');
assert_eq!(transition.child.distance, 47); }
other => panic!("Expected Single, got {:?}", other),
}
}
// Ordinal 2: child = 200 - 10.
#[test]
fn parse_bti_node_single8_ordinal2() {
let node_bytes = single8_node(0, b'z', 10);
let parsed = parse_bti_node(&node_bytes, 200).unwrap();
assert_eq!(parsed.node_type, BtiNodeType::Single);
match &parsed.data {
BtiNodeData::Single { transition } => {
assert_eq!(transition.byte, b'z');
assert_eq!(transition.child.distance, 190); }
other => panic!("Expected Single, got {:?}", other),
}
}
// Ordinal 3: 12-bit delta reassembled from the header nibble and the next byte.
#[test]
fn parse_bti_node_single_nopayload12_ordinal3() {
let node_bytes = single_nopayload12_node(0x123, b'k');
let parsed = parse_bti_node(&node_bytes, 1000).unwrap();
assert_eq!(parsed.node_type, BtiNodeType::Single);
match &parsed.data {
BtiNodeData::Single { transition } => {
assert_eq!(transition.byte, b'k');
assert_eq!(transition.child.distance, 1000 - 0x123);
}
other => panic!("Expected Single, got {:?}", other),
}
}
// Ordinal 4: 2-byte big-endian delta.
#[test]
fn parse_bti_node_single16_ordinal4() {
let node_bytes = single16_node(0, b'm', 0x0400);
let parsed = parse_bti_node(&node_bytes, 2048).unwrap();
assert_eq!(parsed.node_type, BtiNodeType::Single);
match &parsed.data {
BtiNodeData::Single { transition } => {
assert_eq!(transition.byte, b'm');
assert_eq!(transition.child.distance, 2048 - 1024);
}
other => panic!("Expected Single, got {:?}", other),
}
}
// Ordinal 5: two transitions with 1-byte deltas.
#[test]
fn parse_bti_node_sparse8_ordinal5_two_transitions() {
let node_bytes = sparse8_node(0, &[(b'a', 10), (b'b', 20)]);
let parsed = parse_bti_node(&node_bytes, 100).unwrap();
assert_eq!(parsed.node_type, BtiNodeType::Sparse);
match &parsed.data {
BtiNodeData::Sparse { transitions } => {
assert_eq!(transitions.len(), 2);
assert_eq!(transitions[0].byte, b'a');
assert_eq!(transitions[0].child.distance, 90); assert_eq!(transitions[1].byte, b'b');
assert_eq!(transitions[1].child.distance, 80); }
other => panic!("Expected Sparse, got {:?}", other),
}
}
// Ordinal 7: node bytes are built inline with 2-byte deltas.
#[test]
fn parse_bti_node_sparse16_ordinal7_three_transitions() {
let payload_flags = 0u8;
let pairs: &[(u8, u16)] = &[(b'x', 0x0010), (b'y', 0x0020), (b'z', 0x0030)];
let mut node_bytes = vec![0x70 | payload_flags, pairs.len() as u8];
for &(t, _) in pairs {
node_bytes.push(t);
}
for &(_, d) in pairs {
node_bytes.extend_from_slice(&d.to_be_bytes());
}
let parsed = parse_bti_node(&node_bytes, 0x100).unwrap();
assert_eq!(parsed.node_type, BtiNodeType::Sparse);
match &parsed.data {
BtiNodeData::Sparse { transitions } => {
assert_eq!(transitions.len(), 3);
assert_eq!(transitions[0].byte, b'x');
assert_eq!(transitions[0].child.distance, 0x100 - 0x0010);
assert_eq!(transitions[2].byte, b'z');
assert_eq!(transitions[2].child.distance, 0x100 - 0x0030);
}
other => panic!("Expected Sparse, got {:?}", other),
}
}
// Ordinal 6 with one transition: the node has no spare bytes, so the length
// check must accept exactly 5 bytes.
#[test]
fn parse_bti_node_sparse12_ordinal6_count1_exact_minimal_5_bytes() {
let node_bytes = sparse12_node(0, &[(b'a', 0xABC)]);
assert_eq!(
node_bytes.len(),
5,
"Sparse12 count=1 must be exactly 5 bytes (was over-counted as 6 with old formula)"
);
let offset: u64 = 0x1000;
let parsed = parse_bti_node(&node_bytes, offset)
.expect("exact-minimal Sparse12 count=1 must parse successfully");
assert_eq!(parsed.node_type, BtiNodeType::Sparse);
match &parsed.data {
BtiNodeData::Sparse { transitions } => {
assert_eq!(transitions.len(), 1);
assert_eq!(transitions[0].byte, b'a');
assert_eq!(
transitions[0].child.distance,
offset - 0xABC,
"child offset = parent(0x1000) - delta(0xABC) = 0x544"
);
}
other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
}
}
// Ordinal 6 with two transitions packed into three pointer bytes.
#[test]
fn parse_bti_node_sparse12_ordinal6_count2_exact_minimal_7_bytes() {
let node_bytes = sparse12_node(0, &[(b'x', 0x100), (b'y', 0x200)]);
assert_eq!(
node_bytes.len(),
7,
"Sparse12 count=2 must be exactly 7 bytes"
);
let offset: u64 = 0x800;
let parsed = parse_bti_node(&node_bytes, offset)
.expect("exact-minimal Sparse12 count=2 must parse successfully");
assert_eq!(parsed.node_type, BtiNodeType::Sparse);
match &parsed.data {
BtiNodeData::Sparse { transitions } => {
assert_eq!(transitions.len(), 2);
assert_eq!(transitions[0].byte, b'x');
assert_eq!(transitions[0].child.distance, offset - 0x100);
assert_eq!(transitions[1].byte, b'y');
assert_eq!(transitions[1].child.distance, offset - 0x200);
}
other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
}
}
// Ordinal 8 with a single 3-byte delta and no spare bytes.
#[test]
fn parse_bti_node_sparse24_ordinal8_count1_exact_minimal_6_bytes() {
let node_bytes = sparse24_node(0, &[(b'p', 0x010203)]);
assert_eq!(
node_bytes.len(),
6,
"Sparse24 count=1 must be exactly 6 bytes"
);
let offset: u64 = 0x20000;
let parsed = parse_bti_node(&node_bytes, offset)
.expect("exact-minimal Sparse24 count=1 must parse successfully");
assert_eq!(parsed.node_type, BtiNodeType::Sparse);
match &parsed.data {
BtiNodeData::Sparse { transitions } => {
assert_eq!(transitions.len(), 1);
assert_eq!(transitions[0].byte, b'p');
assert_eq!(transitions[0].child.distance, offset - 0x010203);
}
other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
}
}
// Ordinal 9 with a single 5-byte delta and no spare bytes.
#[test]
fn parse_bti_node_sparse40_ordinal9_count1_exact_minimal_8_bytes() {
let delta: u64 = 0x0000_0001_0000;
let node_bytes = sparse40_node(0, &[(b'q', delta)]);
assert_eq!(
node_bytes.len(),
8,
"Sparse40 count=1 must be exactly 8 bytes"
);
let offset: u64 = 0x0010_0000;
let parsed = parse_bti_node(&node_bytes, offset)
.expect("exact-minimal Sparse40 count=1 must parse successfully");
assert_eq!(parsed.node_type, BtiNodeType::Sparse);
match &parsed.data {
BtiNodeData::Sparse { transitions } => {
assert_eq!(transitions.len(), 1);
assert_eq!(transitions[0].byte, b'q');
assert_eq!(transitions[0].child.distance, offset - delta);
}
other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
}
}
// Ordinal 10 with a one-byte range: a single packed delta in two bytes.
#[test]
fn parse_bti_node_dense12_ordinal10_range1_exact_minimal_5_bytes() {
let node_bytes = dense12_node(0, b'A', &[0x123]);
assert_eq!(
node_bytes.len(),
5,
"Dense12 range_len=1 must be exactly 5 bytes"
);
let offset: u64 = 0x500;
let parsed = parse_bti_node(&node_bytes, offset)
.expect("exact-minimal Dense12 range=1 must parse successfully");
assert_eq!(parsed.node_type, BtiNodeType::Dense);
match &parsed.data {
BtiNodeData::Dense {
start_byte,
children,
} => {
assert_eq!(*start_byte, b'A');
assert_eq!(children.len(), 1);
assert_eq!(children[0].distance, offset - 0x123);
}
other => panic!("Expected BtiNodeData::Dense, got {:?}", other),
}
}
// Ordinal 10 with a two-byte range: two packed deltas in three bytes.
#[test]
fn parse_bti_node_dense12_ordinal10_range2_exact_minimal_6_bytes() {
let node_bytes = dense12_node(0, b'A', &[0x100, 0x200]);
assert_eq!(
node_bytes.len(),
6,
"Dense12 range_len=2 must be exactly 6 bytes"
);
let offset: u64 = 0x800;
let parsed = parse_bti_node(&node_bytes, offset)
.expect("exact-minimal Dense12 range=2 must parse successfully");
assert_eq!(parsed.node_type, BtiNodeType::Dense);
match &parsed.data {
BtiNodeData::Dense {
start_byte,
children,
} => {
assert_eq!(*start_byte, b'A');
assert_eq!(children.len(), 2);
assert_eq!(children[0].distance, offset - 0x100);
assert_eq!(children[1].distance, offset - 0x200);
}
other => panic!("Expected BtiNodeData::Dense, got {:?}", other),
}
}
// Ordinal 11: a zero delta in the middle slot must decode to child offset 0.
#[test]
fn parse_bti_node_dense16_ordinal11_three_children() {
let node_bytes = dense16_node(0, b'a', &[0x0010, 0x0000, 0x0030]);
let parsed = parse_bti_node(&node_bytes, 0x200).unwrap();
assert_eq!(parsed.node_type, BtiNodeType::Dense);
match &parsed.data {
BtiNodeData::Dense {
start_byte,
children,
} => {
assert_eq!(*start_byte, b'a');
assert_eq!(children.len(), 3);
assert_eq!(children[0].distance, 0x200 - 0x0010);
assert_eq!(children[1].distance, 0);
assert_eq!(children[2].distance, 0x200 - 0x0030);
}
other => panic!("Expected Dense, got {:?}", other),
}
}
// Ordinal 15: 8-byte deltas.
#[test]
fn parse_bti_node_long_dense_ordinal15_two_children() {
let node_bytes = long_dense_node(0, b'A', &[0x0000_0000_0000_0100, 0x0000_0000_0000_0200]);
let parsed = parse_bti_node(&node_bytes, 0x10000).unwrap();
assert_eq!(parsed.node_type, BtiNodeType::Dense);
match &parsed.data {
BtiNodeData::Dense {
start_byte,
children,
} => {
assert_eq!(*start_byte, b'A');
assert_eq!(children.len(), 2);
assert_eq!(children[0].distance, 0x10000 - 0x100);
assert_eq!(children[1].distance, 0x10000 - 0x200);
}
other => panic!("Expected Dense, got {:?}", other),
}
}
// Every ordinal 0..=15 maps to its expected node family.
#[test]
fn classify_node_nibble_all_ordinals() {
assert_eq!(classify_node_nibble(0).unwrap(), BtiNodeType::PayloadOnly);
for n in 1u8..=4 {
assert_eq!(
classify_node_nibble(n).unwrap(),
BtiNodeType::Single,
"ordinal {} should be Single",
n
);
}
for n in 5u8..=9 {
assert_eq!(
classify_node_nibble(n).unwrap(),
BtiNodeType::Sparse,
"ordinal {} should be Sparse",
n
);
}
for n in 10u8..=15 {
assert_eq!(
classify_node_nibble(n).unwrap(),
BtiNodeType::Dense,
"ordinal {} should be Dense",
n
);
}
}
// RowsParser::load_node must decode a Sparse8 root read from a file image.
#[test]
fn rows_parser_sparse_root_node_not_mislabeled() {
let root_node = sparse8_node(0, &[(b'a', 5), (b'b', 10)]);
let data = make_bti_file(root_node);
let cursor = Cursor::new(data);
let mut parser = RowsParser::new(cursor).unwrap();
let root_offset = parser.header.root_offset;
let node = parser.load_node(root_offset).unwrap();
assert_eq!(
node.node_type,
BtiNodeType::Sparse,
"RowsParser returned {:?} for a Sparse8 root node — regression from #647",
node.node_type
);
assert_eq!(node.child_count(), 2);
}
// RowsParser::load_node must decode a Dense16 root read from a file image.
#[test]
fn rows_parser_dense_root_node_not_mislabeled() {
let root_node = dense16_node(0, b'0', &[0x0020, 0x0000, 0x0040]);
let data = make_bti_file(root_node);
let cursor = Cursor::new(data);
let mut parser = RowsParser::new(cursor).unwrap();
let root_offset = parser.header.root_offset;
let node = parser.load_node(root_offset).unwrap();
assert_eq!(
node.node_type,
BtiNodeType::Dense,
"RowsParser returned {:?} for a Dense16 root node",
node.node_type
);
}
// RowsParser::load_node must resolve the child of a SingleNoPayload4 root
// relative to the root offset (64).
#[test]
fn rows_parser_single_nopayload4_root_node_not_mislabeled() {
let root_offset_val: u64 = 64;
let root_node = single_nopayload4_node(3, b'q');
let data = make_bti_file(root_node);
let cursor = Cursor::new(data);
let mut parser = RowsParser::new(cursor).unwrap();
let root_offset = parser.header.root_offset;
assert_eq!(root_offset, root_offset_val);
let node = parser.load_node(root_offset).unwrap();
assert_eq!(
node.node_type,
BtiNodeType::Single,
"RowsParser returned {:?} for a SingleNoPayload4 root node",
node.node_type
);
match &node.data {
BtiNodeData::Single { transition } => {
assert_eq!(transition.byte, b'q');
assert_eq!(transition.child.distance, root_offset_val - 3);
}
other => panic!("Expected Single data, got {:?}", other),
}
}
// A 24-byte header (no metadata field) parses and reports a size of 24.
#[test]
fn test_bti_header_parsing() {
let mut header_data = Vec::new();
header_data.extend_from_slice(&BtiHeader::MAGIC.to_be_bytes());
header_data.extend_from_slice(&BtiHeader::VERSION.to_be_bytes());
header_data.extend_from_slice(&0u16.to_be_bytes()); header_data.extend_from_slice(&1024u64.to_be_bytes()); header_data.extend_from_slice(&100u64.to_be_bytes());
let (header, size) = BtiHeader::parse(&header_data).unwrap();
assert_eq!(header.magic, BtiHeader::MAGIC);
assert_eq!(header.version, BtiHeader::VERSION);
assert_eq!(header.root_offset, 1024);
assert_eq!(header.entry_count, 100);
assert_eq!(size, 24);
}
// Construction succeeds on a well-formed file image.
#[test]
fn test_partitions_parser_creation() {
let data = make_bti_file(payload_only_node(1000, 50));
let cursor = Cursor::new(data);
let _parser = PartitionsParser::new(cursor).unwrap();
}
// Construction succeeds on a well-formed file image.
#[test]
fn test_rows_parser_creation() {
let data = make_bti_file(payload_only_node(1000, 50));
let cursor = Cursor::new(data);
let _parser = RowsParser::new(cursor).unwrap();
}
// With a payload-only root, a partition lookup is expected to return a payload.
#[test]
fn test_partition_lookup() {
let data = make_bti_file(payload_only_node(1000, 50));
let cursor = Cursor::new(data);
let mut parser = PartitionsParser::new(cursor).unwrap();
let partition_key = vec![Value::Text("test_partition".to_string())];
let result = parser.lookup_partition(&partition_key).unwrap();
assert!(result.is_some());
}
// to_bytes followed by parse must reproduce every header field.
#[test]
fn test_header_serialization_round_trip() {
let original_header = BtiHeader {
magic: BtiHeader::MAGIC,
version: BtiHeader::VERSION,
flags: 0x1234,
root_offset: 0x123456789ABCDEF0,
entry_count: 0xFEDCBA9876543210,
metadata_size: 0x12345678,
};
let serialized = original_header.to_bytes();
let (parsed_header, _) = BtiHeader::parse(&serialized).unwrap();
assert_eq!(original_header.magic, parsed_header.magic);
assert_eq!(original_header.version, parsed_header.version);
assert_eq!(original_header.flags, parsed_header.flags);
assert_eq!(original_header.root_offset, parsed_header.root_offset);
assert_eq!(original_header.entry_count, parsed_header.entry_count);
assert_eq!(original_header.metadata_size, parsed_header.metadata_size);
}
// Empty, one-, two- and four-byte big-endian inputs.
#[test]
fn read_be_unsigned_edge_cases() {
assert_eq!(read_be_unsigned(&[]), 0);
assert_eq!(read_be_unsigned(&[0xFF]), 255);
assert_eq!(read_be_unsigned(&[0x01, 0x00]), 256);
assert_eq!(read_be_unsigned(&[0xFF, 0xFF, 0xFF, 0xFF]), 0xFFFF_FFFF);
}
// Three bytes hold two 12-bit values: 0xABC (index 0) and 0x123 (index 1).
#[test]
fn read_12bit_packed_even_and_odd_indices() {
let data: &[u8] = &[0xAB, 0xC1, 0x23];
assert_eq!(read_12bit_packed(data, 0), 0xABC);
assert_eq!(read_12bit_packed(data, 1), 0x123);
}
}