Skip to main content

cqlite_core/storage/sstable/bti/
parser.rs

1//! BTI (Big Trie Index) parser implementation
2//!
3//! This module provides parsing capabilities for BTI format components:
4//! - Partitions.db BTI index for partition lookups
5//! - Rows.db BTI index for clustering key lookups within partitions
6//!
7//! ## Node-type encoding (TrieNode.java, Cassandra 5.0)
8//!
9//! The high nibble (bits 7-4) of every node's first byte is a 4-bit ordinal that
10//! selects one of up to 16 concrete trie-node subtypes:
11//!
12//! | Nibble | Java class          | Rust category    | Pointer size |
13//! |--------|---------------------|------------------|--------------|
14//! | 0      | PayloadOnly         | `PayloadOnly`    | —            |
15//! | 1      | SingleNoPayload4    | `Single`         | 4-bit delta  |
16//! | 2      | Single8             | `Single`         | 1 byte       |
17//! | 3      | SingleNoPayload12   | `Single`         | 12-bit delta |
18//! | 4      | Single16            | `Single`         | 2 bytes      |
19//! | 5      | Sparse8             | `Sparse`         | 1 byte       |
20//! | 6      | Sparse12            | `Sparse`         | 12-bit       |
21//! | 7      | Sparse16            | `Sparse`         | 2 bytes      |
22//! | 8      | Sparse24            | `Sparse`         | 3 bytes      |
23//! | 9      | Sparse40            | `Sparse`         | 5 bytes      |
24//! | 10     | Dense12             | `Dense`          | 12-bit       |
25//! | 11     | Dense16             | `Dense`          | 2 bytes      |
26//! | 12     | Dense24             | `Dense`          | 3 bytes      |
27//! | 13     | Dense32             | `Dense`          | 4 bytes      |
28//! | 14     | Dense40             | `Dense`          | 5 bytes      |
29//! | 15     | LongDense           | `Dense`          | 8 bytes      |
30//!
31//! The low nibble (bits 3-0) carries payload flags; a non-zero value means a
32//! payload follows immediately after the node's fixed-size header.
33//!
34//! All transition deltas are stored as *backward* distances (the child always
35//! appears earlier in the file), so the absolute child position is:
36//!   `child_pos = parent_pos - delta`
37//!
38//! The "no-payload" Single variants (nibbles 1 and 3) encode the delta in the
39//! low nibble / low 12 bits of the first two bytes respectively, and cannot
40//! carry a payload (their payload-flag nibble is always 0).
41
42use crate::{
43    error::Error,
44    storage::sstable::bti::{
45        encoder::ByteComparableEncoder,
46        node::{
47            BtiNode, BtiNodeData, BtiNodeType, BtiResult, PayloadRef, SizedPointer, Transition,
48            TrieNavigator,
49        },
50    },
51    types::Value,
52};
53use std::collections::HashMap;
54use std::io::{Read, Seek, SeekFrom};
55
56// ---------------------------------------------------------------------------
57// Shared node-parsing helpers
58// ---------------------------------------------------------------------------
59
60/// Classify the high nibble of a BTI node's first byte into one of the four
61/// Rust-level node categories.
62///
63/// The mapping follows the 16-entry `Types.values[]` array in `TrieNode.java`:
64/// - nibble 0               → `PayloadOnly`
65/// - nibbles 1-4 (Singles)  → `Single`
66/// - nibbles 5-9 (Sparses)  → `Sparse`
67/// - nibbles 10-15 (Denses) → `Dense`
68fn classify_node_nibble(nibble: u8) -> BtiResult<BtiNodeType> {
69    match nibble {
70        0 => Ok(BtiNodeType::PayloadOnly),
71        1..=4 => Ok(BtiNodeType::Single),
72        5..=9 => Ok(BtiNodeType::Sparse),
73        10..=15 => Ok(BtiNodeType::Dense),
74        other => Err(Error::Parse(format!(
75            "Invalid BTI node type nibble: {}",
76            other
77        ))),
78    }
79}
80
81/// Return the number of bytes used to store each child pointer for this node,
82/// given the raw high nibble (`ordinal`) from the first byte.
83///
84/// Matches the `bytesPerPointer` field of each `TrieNode` subtype in
85/// `TrieNode.java`.  Fractional (12-bit) encodings return `0` as a sentinel;
86/// callers that need to handle those cases do so explicitly.
87fn pointer_bytes_for_ordinal(ordinal: u8) -> u8 {
88    match ordinal {
89        0 => 0,  // PayloadOnly — no pointers
90        1 => 0,  // SingleNoPayload4  — 4-bit delta in low nibble, handled specially
91        2 => 1,  // Single8
92        3 => 0,  // SingleNoPayload12 — 12-bit delta across first two bytes
93        4 => 2,  // Single16
94        5 => 1,  // Sparse8
95        6 => 0,  // Sparse12 — 12-bit packed
96        7 => 2,  // Sparse16
97        8 => 3,  // Sparse24
98        9 => 5,  // Sparse40
99        10 => 0, // Dense12  — 12-bit packed
100        11 => 2, // Dense16
101        12 => 3, // Dense24
102        13 => 4, // Dense32
103        14 => 5, // Dense40
104        15 => 8, // LongDense
105        _ => 0,
106    }
107}
108
109/// Parse a BTI trie node from a raw byte slice.
110///
111/// This is the **single shared implementation** used by both
112/// [`PartitionsParser`] and [`RowsParser`].  It correctly dispatches all 16
113/// node-type ordinals defined in `TrieNode.java` and produces the appropriate
114/// [`BtiNode`] variant.
115///
116/// # Arguments
117/// * `data`   – raw bytes starting at the node's first byte
118/// * `offset` – absolute file offset of this node (used to compute child
119///   positions; see the module-level doc for the sign convention)
120///
121/// # Errors
122/// Returns a [`crate::error::Error::Parse`] if the data is too short, the
123/// node-type nibble is out of range, or any other structural invariant is
124/// violated.
125fn parse_bti_node(data: &[u8], offset: u64) -> BtiResult<BtiNode> {
126    if data.is_empty() {
127        return Err(Error::Parse("Empty BTI node data".to_string()));
128    }
129
130    let header_byte = data[0];
131    let ordinal = (header_byte >> 4) & 0x0F;
132    let payload_flags = header_byte & 0x0F;
133    let has_payload = payload_flags != 0;
134    let node_type = classify_node_nibble(ordinal)?;
135
136    match node_type {
137        BtiNodeType::PayloadOnly => {
138            // Layout: [header:1] [payload if has_payload]
139            // PayloadOnly MUST have a payload (it is a leaf node).
140            if !has_payload {
141                return Err(Error::Parse(
142                    "PayloadOnly node has no payload flags set".to_string(),
143                ));
144            }
145            let payload = parse_payload_ref(&data[1..])?;
146            Ok(BtiNode {
147                node_type,
148                level: 0,
149                key_prefix: Vec::new(),
150                data: BtiNodeData::PayloadOnly { payload },
151            })
152        }
153
154        BtiNodeType::Single => {
155            // Three concrete layouts depending on ordinal:
156            //
157            //   ordinal 1 (SingleNoPayload4):
158            //     byte 0: [1|delta_high4]  (delta is low 4 bits of byte 0)
159            //     byte 1: transition byte
160            //     NO payload
161            //
162            //   ordinal 3 (SingleNoPayload12):
163            //     byte 0: [3|delta_high4]
164            //     byte 1: delta_low8        → delta = (low4(byte0) << 8) | byte1
165            //     byte 2: transition byte
166            //     NO payload
167            //
168            //   ordinals 2, 4 (Single8, Single16):
169            //     byte 0: [ordinal|payload_flags]
170            //     byte 1: transition byte
171            //     bytes 2..(2+ptr_bytes): backward delta (unsigned big-endian)
172            //     [payload if has_payload]
173            match ordinal {
174                1 => {
175                    // SingleNoPayload4
176                    if data.len() < 2 {
177                        return Err(Error::Parse(
178                            "SingleNoPayload4 node data too short".to_string(),
179                        ));
180                    }
181                    let delta = (header_byte & 0x0F) as u64;
182                    let transition_byte = data[1];
183                    let child_offset = offset.saturating_sub(delta);
184                    Ok(BtiNode {
185                        node_type,
186                        level: 1,
187                        key_prefix: Vec::new(),
188                        data: BtiNodeData::Single {
189                            transition: Transition::new(
190                                transition_byte,
191                                SizedPointer::new(child_offset),
192                            ),
193                        },
194                    })
195                }
196                3 => {
197                    // SingleNoPayload12
198                    if data.len() < 3 {
199                        return Err(Error::Parse(
200                            "SingleNoPayload12 node data too short".to_string(),
201                        ));
202                    }
203                    let delta = (((header_byte & 0x0F) as u64) << 8) | (data[1] as u64);
204                    let transition_byte = data[2];
205                    let child_offset = offset.saturating_sub(delta);
206                    Ok(BtiNode {
207                        node_type,
208                        level: 1,
209                        key_prefix: Vec::new(),
210                        data: BtiNodeData::Single {
211                            transition: Transition::new(
212                                transition_byte,
213                                SizedPointer::new(child_offset),
214                            ),
215                        },
216                    })
217                }
218                _ => {
219                    // Single8 (ordinal 2) or Single16 (ordinal 4)
220                    let ptr_bytes = pointer_bytes_for_ordinal(ordinal) as usize;
221                    let needed = 2 + ptr_bytes;
222                    if data.len() < needed {
223                        return Err(Error::Parse(format!(
224                            "Single node (ordinal {}) data too short: need {} bytes, have {}",
225                            ordinal,
226                            needed,
227                            data.len()
228                        )));
229                    }
230                    let transition_byte = data[1];
231                    let delta = read_be_unsigned(&data[2..2 + ptr_bytes]);
232                    let child_offset = offset.saturating_sub(delta);
233                    let transition =
234                        Transition::new(transition_byte, SizedPointer::new(child_offset));
235                    Ok(BtiNode {
236                        node_type,
237                        level: 1,
238                        key_prefix: Vec::new(),
239                        data: BtiNodeData::Single { transition },
240                    })
241                }
242            }
243        }
244
245        BtiNodeType::Sparse => {
246            // Layout (ordinals 5, 7, 8, 9 — full-byte pointers):
247            //   byte 0: [ordinal|payload_flags]
248            //   byte 1: count (number of transitions, 1-255)
249            //   bytes 2..(2+count): transition bytes (sorted)
250            //   then count × ptr_bytes: backward deltas
251            //   [payload if has_payload]
252            //
253            // ordinal 6 (Sparse12) packs two 12-bit deltas into 3 bytes;
254            // that encoding is rare in practice and we decode it correctly
255            // but store the absolute child offsets in the same SizedPointer.
256            if data.len() < 2 {
257                return Err(Error::Parse("Sparse node data too short".to_string()));
258            }
259            let count = data[1] as usize;
260            if count == 0 {
261                return Err(Error::Parse(
262                    "Sparse node must have at least one transition".to_string(),
263                ));
264            }
265
266            let bytes_start = 2;
267            let pointers_start = bytes_start + count;
268
269            if ordinal == 6 {
270                // Sparse12: each pair of pointers packed into 3 bytes
271                let packed_len = (count * 3).div_ceil(2); // ceil(count * 12 / 8) = ceil(count * 3 / 2)
272                let needed = pointers_start + packed_len;
273                if data.len() < needed {
274                    return Err(Error::Parse(format!(
275                        "Sparse12 node data too short: need {}, have {}",
276                        needed,
277                        data.len()
278                    )));
279                }
280                let mut transitions = Vec::with_capacity(count);
281                for i in 0..count {
282                    let t_byte = data[bytes_start + i];
283                    let delta = read_12bit_packed(&data[pointers_start..], i);
284                    let child_offset = offset.saturating_sub(delta);
285                    transitions.push(Transition::new(t_byte, SizedPointer::new(child_offset)));
286                }
287                Ok(BtiNode {
288                    node_type,
289                    level: 1,
290                    key_prefix: Vec::new(),
291                    data: BtiNodeData::Sparse { transitions },
292                })
293            } else {
294                let ptr_bytes = pointer_bytes_for_ordinal(ordinal) as usize;
295                let needed = pointers_start + count * ptr_bytes;
296                if data.len() < needed {
297                    return Err(Error::Parse(format!(
298                        "Sparse node (ordinal {}) data too short: need {}, have {}",
299                        ordinal,
300                        needed,
301                        data.len()
302                    )));
303                }
304                let mut transitions = Vec::with_capacity(count);
305                for i in 0..count {
306                    let t_byte = data[bytes_start + i];
307                    let ptr_off = pointers_start + i * ptr_bytes;
308                    let delta = read_be_unsigned(&data[ptr_off..ptr_off + ptr_bytes]);
309                    let child_offset = offset.saturating_sub(delta);
310                    transitions.push(Transition::new(t_byte, SizedPointer::new(child_offset)));
311                }
312                Ok(BtiNode {
313                    node_type,
314                    level: 1,
315                    key_prefix: Vec::new(),
316                    data: BtiNodeData::Sparse { transitions },
317                })
318            }
319        }
320
321        BtiNodeType::Dense => {
322            // Layout (ordinals 11-14 — full-byte pointers):
323            //   byte 0: [ordinal|payload_flags]
324            //   byte 1: start byte (first transition character)
325            //   byte 2: (end - start), i.e. (range_len - 1)  → range_len = byte2+1
326            //   then range_len × ptr_bytes: backward deltas; 0 means "no transition"
327            //   [payload if has_payload]
328            //
329            // ordinal 10 (Dense12): packed 12-bit deltas, similar to Sparse12.
330            // ordinal 15 (LongDense): 8-byte deltas.
331            if data.len() < 3 {
332                return Err(Error::Parse("Dense node data too short".to_string()));
333            }
334            let start_byte = data[1];
335            let range_len = data[2] as usize + 1; // byte2 is (len-1)
336
337            if ordinal == 10 {
338                // Dense12: ceil(range_len * 12 / 8) = (range_len * 3 + 1) / 2
339                let packed_len = (range_len * 3).div_ceil(2);
340                let needed = 3 + packed_len;
341                if data.len() < needed {
342                    return Err(Error::Parse(format!(
343                        "Dense12 node data too short: need {}, have {}",
344                        needed,
345                        data.len()
346                    )));
347                }
348                let mut children = Vec::with_capacity(range_len);
349                for i in 0..range_len {
350                    let delta = read_12bit_packed(&data[3..], i);
351                    // delta == 0 means "no transition" for Dense nodes
352                    let child_offset = if delta == 0 {
353                        0
354                    } else {
355                        offset.saturating_sub(delta)
356                    };
357                    children.push(SizedPointer::new(child_offset));
358                }
359                Ok(BtiNode {
360                    node_type,
361                    level: 1,
362                    key_prefix: Vec::new(),
363                    data: BtiNodeData::Dense {
364                        start_byte,
365                        children,
366                    },
367                })
368            } else {
369                let ptr_bytes = pointer_bytes_for_ordinal(ordinal) as usize;
370                let needed = 3 + range_len * ptr_bytes;
371                if data.len() < needed {
372                    return Err(Error::Parse(format!(
373                        "Dense node (ordinal {}) data too short: need {}, have {}",
374                        ordinal,
375                        needed,
376                        data.len()
377                    )));
378                }
379                let mut children = Vec::with_capacity(range_len);
380                for i in 0..range_len {
381                    let ptr_off = 3 + i * ptr_bytes;
382                    let delta = read_be_unsigned(&data[ptr_off..ptr_off + ptr_bytes]);
383                    // delta == 0 means "no transition" for Dense nodes
384                    let child_offset = if delta == 0 {
385                        0
386                    } else {
387                        offset.saturating_sub(delta)
388                    };
389                    children.push(SizedPointer::new(child_offset));
390                }
391                Ok(BtiNode {
392                    node_type,
393                    level: 1,
394                    key_prefix: Vec::new(),
395                    data: BtiNodeData::Dense {
396                        start_byte,
397                        children,
398                    },
399                })
400            }
401        }
402    }
403}
404
405/// Read a big-endian unsigned integer of 0-8 bytes from `data`.
406///
407/// Returns 0 for an empty slice; panics if `data.len() > 8` (caller is
408/// responsible for bounds-checking first).
409fn read_be_unsigned(data: &[u8]) -> u64 {
410    let mut result = 0u64;
411    for &byte in data {
412        result = (result << 8) | (byte as u64);
413    }
414    result
415}
416
417/// Read a 12-bit value stored in the packed format used by Sparse12 / Dense12.
418///
419/// The packing (from `TrieNode.java#read12Bits`):
420/// ```text
421/// word = getShort(base + (3 * index) / 2)
422/// if (index & 1) == 0:  value = word >> 4
423/// else:                  value = word & 0xFFF
424/// ```
425fn read_12bit_packed(data: &[u8], index: usize) -> u64 {
426    let byte_offset = (3 * index) / 2;
427    if byte_offset + 1 >= data.len() {
428        return 0;
429    }
430    let word = ((data[byte_offset] as u16) << 8) | (data[byte_offset + 1] as u16);
431    let value = if (index & 1) == 0 {
432        word >> 4
433    } else {
434        word & 0x0FFF
435    };
436    value as u64
437}
438
439/// Parse a [`PayloadRef`] from the bytes immediately following a node header.
440///
441/// Format (matches `RowIndexReader.readPayload` / `PartitionIndex`):
442///   8 bytes big-endian: data file offset
443///   4 bytes big-endian: payload byte length
444fn parse_payload_ref(data: &[u8]) -> BtiResult<PayloadRef> {
445    if data.len() < 12 {
446        return Err(Error::Parse(format!(
447            "PayloadRef data too short: need 12 bytes, have {}",
448            data.len()
449        )));
450    }
451    let offset = u64::from_be_bytes([
452        data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
453    ]);
454    let length = u32::from_be_bytes([data[8], data[9], data[10], data[11]]);
455    Ok(PayloadRef::new(offset, length))
456}
457
458/// BTI header structure for index files
459#[derive(Debug, Clone)]
460pub struct BtiHeader {
461    /// BTI format magic number
462    pub magic: u32,
463    /// Format version
464    pub version: u16,
465    /// Format flags
466    pub flags: u16,
467    /// Offset to root node
468    pub root_offset: u64,
469    /// Number of entries in the index
470    pub entry_count: u64,
471    /// Additional metadata size
472    pub metadata_size: u32,
473}
474
475impl BtiHeader {
476    /// BTI magic number
477    pub const MAGIC: u32 = 0x6461_0000; // 'da\0\0'
478
479    /// Current BTI version
480    pub const VERSION: u16 = 0x0001;
481
482    /// Parse BTI header from bytes
483    pub fn parse(data: &[u8]) -> BtiResult<(Self, usize)> {
484        if data.len() < 24 {
485            return Err(Error::Parse("BTI header too short".to_string()));
486        }
487
488        let magic = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
489        if magic != Self::MAGIC {
490            return Err(Error::Parse(format!(
491                "Invalid BTI magic: 0x{:08x}, expected 0x{:08x}",
492                magic,
493                Self::MAGIC
494            )));
495        }
496
497        let version = u16::from_be_bytes([data[4], data[5]]);
498        if version != Self::VERSION {
499            return Err(Error::Parse(format!(
500                "Unsupported BTI version: 0x{:04x}, expected 0x{:04x}",
501                version,
502                Self::VERSION
503            )));
504        }
505
506        let flags = u16::from_be_bytes([data[6], data[7]]);
507        let root_offset = u64::from_be_bytes([
508            data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
509        ]);
510        let entry_count = u64::from_be_bytes([
511            data[16], data[17], data[18], data[19], data[20], data[21], data[22], data[23],
512        ]);
513
514        let metadata_size = if data.len() >= 28 {
515            u32::from_be_bytes([data[24], data[25], data[26], data[27]])
516        } else {
517            0
518        };
519
520        let header = BtiHeader {
521            magic,
522            version,
523            flags,
524            root_offset,
525            entry_count,
526            metadata_size,
527        };
528
529        let header_size = if metadata_size > 0 { 28 } else { 24 };
530        Ok((header, header_size))
531    }
532
533    /// Serialize header to bytes
534    pub fn to_bytes(&self) -> Vec<u8> {
535        let mut bytes = Vec::with_capacity(28);
536
537        bytes.extend_from_slice(&self.magic.to_be_bytes());
538        bytes.extend_from_slice(&self.version.to_be_bytes());
539        bytes.extend_from_slice(&self.flags.to_be_bytes());
540        bytes.extend_from_slice(&self.root_offset.to_be_bytes());
541        bytes.extend_from_slice(&self.entry_count.to_be_bytes());
542
543        if self.metadata_size > 0 {
544            bytes.extend_from_slice(&self.metadata_size.to_be_bytes());
545        }
546
547        bytes
548    }
549}
550
551/// Parser for Partitions.db BTI index
552pub struct PartitionsParser<R: Read + Seek> {
553    /// Input reader
554    reader: R,
555    /// BTI header
556    header: BtiHeader,
557    /// Byte-comparable encoder for key encoding
558    encoder: ByteComparableEncoder,
559    /// Node cache for performance
560    node_cache: HashMap<u64, BtiNode>,
561}
562
563impl<R: Read + Seek> PartitionsParser<R> {
564    /// Create new partitions parser
565    pub fn new(mut reader: R) -> BtiResult<Self> {
566        // Read and parse header
567        reader.seek(SeekFrom::Start(0))?;
568        let mut header_data = vec![0u8; 28];
569        reader.read_exact(&mut header_data)?;
570
571        let (header, _) = BtiHeader::parse(&header_data)?;
572
573        Ok(Self {
574            reader,
575            header,
576            encoder: ByteComparableEncoder::new(),
577            node_cache: HashMap::new(),
578        })
579    }
580
581    /// Lookup partition by key
582    pub fn lookup_partition(&mut self, partition_key: &[Value]) -> BtiResult<Option<PayloadRef>> {
583        // Encode partition key for lookup
584        let encoded_key = self.encoder.encode_composite_key(partition_key)?;
585
586        // Navigate trie to find the partition
587        let mut navigator = TrieNavigator::new(self.header.root_offset);
588
589        self.lookup_in_trie(&mut navigator, &encoded_key)
590    }
591
592    /// Navigate trie to find encoded key
593    fn lookup_in_trie(
594        &mut self,
595        navigator: &mut TrieNavigator,
596        encoded_key: &[u8],
597    ) -> BtiResult<Option<PayloadRef>> {
598        let mut key_pos = 0;
599
600        loop {
601            // Load current node
602            let current_node = self.load_node(navigator.current_offset)?;
603
604            // If this is a payload-only node (leaf), return its payload
605            if current_node.is_leaf() {
606                return Ok(current_node.get_payload().cloned());
607            }
608
609            // Check if we have a payload at this level (for prefix matches)
610            if let Some(payload) = current_node.get_payload() {
611                if key_pos >= encoded_key.len() {
612                    return Ok(Some(payload.clone()));
613                }
614            }
615
616            // If we've consumed all key bytes, return any payload we have
617            if key_pos >= encoded_key.len() {
618                return Ok(current_node.get_payload().cloned());
619            }
620
621            // Find transition for next byte
622            let next_byte = encoded_key[key_pos];
623            if let Some(child_pointer) = current_node.find_child(next_byte) {
624                navigator.navigate_to_child(next_byte, child_pointer)?;
625                key_pos += 1;
626            } else {
627                // No transition found - key doesn't exist
628                return Ok(None);
629            }
630        }
631    }
632
633    /// Load node from file
634    fn load_node(&mut self, offset: u64) -> BtiResult<BtiNode> {
635        if let Some(cached_node) = self.node_cache.get(&offset) {
636            return Ok(cached_node.clone());
637        }
638
639        // Read node from file
640        self.reader.seek(SeekFrom::Start(offset))?;
641        let mut node_data = vec![0u8; 4096]; // Read up to 4KB for node
642        let bytes_read = self.reader.read(&mut node_data)?;
643        node_data.truncate(bytes_read);
644
645        // Parse node
646        let node = self.parse_node_data(&node_data, offset)?;
647
648        // Cache the node
649        self.node_cache.insert(offset, node.clone());
650        Ok(node)
651    }
652
653    /// Parse node data from bytes.
654    ///
655    /// Delegates to the module-level [`parse_bti_node`] helper which handles
656    /// all 16 BTI node-type ordinals defined in `TrieNode.java`.
657    fn parse_node_data(&self, data: &[u8], offset: u64) -> BtiResult<BtiNode> {
658        parse_bti_node(data, offset)
659    }
660
661    /// Iterator over all partitions in the index
662    pub fn iterate_partitions(&mut self) -> BtiResult<PartitionIterator<'_, R>> {
663        PartitionIterator::new(self)
664    }
665
666    /// Get header information
667    pub fn header(&self) -> &BtiHeader {
668        &self.header
669    }
670
671    /// Get statistics about the index
672    pub fn get_stats(&self) -> BtiIndexStats {
673        BtiIndexStats {
674            entry_count: self.header.entry_count,
675            root_offset: self.header.root_offset,
676            cached_nodes: self.node_cache.len(),
677        }
678    }
679}
680
681/// Parser for Rows.db BTI index (clustering keys within a partition)
682pub struct RowsParser<R: Read + Seek> {
683    /// Input reader
684    reader: R,
685    /// BTI header
686    header: BtiHeader,
687    /// Byte-comparable encoder for key encoding
688    encoder: ByteComparableEncoder,
689    /// Node cache for performance
690    node_cache: HashMap<u64, BtiNode>,
691}
692
693impl<R: Read + Seek> RowsParser<R> {
694    /// Create new rows parser
695    pub fn new(mut reader: R) -> BtiResult<Self> {
696        // Read and parse header
697        reader.seek(SeekFrom::Start(0))?;
698        let mut header_data = vec![0u8; 28];
699        reader.read_exact(&mut header_data)?;
700
701        let (header, _) = BtiHeader::parse(&header_data)?;
702
703        Ok(Self {
704            reader,
705            header,
706            encoder: ByteComparableEncoder::new(),
707            node_cache: HashMap::new(),
708        })
709    }
710
711    /// Lookup row by clustering key
712    pub fn lookup_row(&mut self, clustering_key: &[Value]) -> BtiResult<Option<PayloadRef>> {
713        // Encode clustering key for lookup
714        let encoded_key = self.encoder.encode_composite_key(clustering_key)?;
715
716        // Navigate trie to find the row
717        let mut navigator = TrieNavigator::new(self.header.root_offset);
718
719        self.lookup_in_trie(&mut navigator, &encoded_key)
720    }
721
722    /// Navigate trie to find encoded key (similar to partitions parser)
723    fn lookup_in_trie(
724        &mut self,
725        navigator: &mut TrieNavigator,
726        encoded_key: &[u8],
727    ) -> BtiResult<Option<PayloadRef>> {
728        let mut key_pos = 0;
729
730        loop {
731            // Load current node
732            let current_node = self.load_node(navigator.current_offset)?;
733
734            // Check if we have a payload at this level
735            if let Some(payload) = current_node.get_payload() {
736                if key_pos >= encoded_key.len() {
737                    return Ok(Some(payload.clone()));
738                }
739            }
740
741            // If we've consumed all key bytes and this is a leaf, we found it
742            if key_pos >= encoded_key.len() {
743                return Ok(current_node.get_payload().cloned());
744            }
745
746            // Find transition for next byte
747            let next_byte = encoded_key[key_pos];
748            if let Some(child_pointer) = current_node.find_child(next_byte) {
749                navigator.navigate_to_child(next_byte, child_pointer)?;
750                key_pos += 1;
751            } else {
752                // No transition found - key doesn't exist
753                return Ok(None);
754            }
755        }
756    }
757
758    /// Load node from file (similar to partitions parser)
759    fn load_node(&mut self, offset: u64) -> BtiResult<BtiNode> {
760        if let Some(cached_node) = self.node_cache.get(&offset) {
761            return Ok(cached_node.clone());
762        }
763
764        // Read node from file
765        self.reader.seek(SeekFrom::Start(offset))?;
766        let mut node_data = vec![0u8; 4096]; // Read up to 4KB for node
767        let bytes_read = self.reader.read(&mut node_data)?;
768        node_data.truncate(bytes_read);
769
770        // Parse node
771        let node = self.parse_node_data(&node_data, offset)?;
772
773        // Cache the node
774        self.node_cache.insert(offset, node.clone());
775        Ok(node)
776    }
777
778    /// Parse node data from bytes.
779    ///
780    /// Delegates to the module-level [`parse_bti_node`] helper which handles
781    /// all 16 BTI node-type ordinals defined in `TrieNode.java`.
782    ///
783    /// Previously this was a stub that always returned `BtiNodeType::PayloadOnly`
784    /// regardless of the actual node type encoded in the header byte (#647).
785    fn parse_node_data(&self, data: &[u8], offset: u64) -> BtiResult<BtiNode> {
786        parse_bti_node(data, offset)
787    }
788
789    /// Range query for clustering keys
790    pub fn range_query(
791        &mut self,
792        start_key: &[Value],
793        end_key: &[Value],
794    ) -> BtiResult<Vec<PayloadRef>> {
795        let _encoded_start = self.encoder.encode_composite_key(start_key)?;
796        let _encoded_end = self.encoder.encode_composite_key(end_key)?;
797
798        let results = Vec::new();
799
800        // Navigate to start position
801        let _navigator = TrieNavigator::new(self.header.root_offset);
802
803        // For now, just return empty results - full implementation would traverse range
804        // TODO: Implement proper range traversal
805
806        Ok(results)
807    }
808
809    /// Iterator over all rows in the index
810    pub fn iterate_rows(&mut self) -> BtiResult<RowIterator<'_, R>> {
811        RowIterator::new(self)
812    }
813
814    /// Get header information
815    pub fn header(&self) -> &BtiHeader {
816        &self.header
817    }
818}
819
820/// Iterator over partitions in BTI index
821pub struct PartitionIterator<'a, R: Read + Seek> {
822    #[allow(dead_code)]
823    parser: &'a mut PartitionsParser<R>,
824    #[allow(dead_code)]
825    current_position: u64,
826    finished: bool,
827}
828
829impl<'a, R: Read + Seek> PartitionIterator<'a, R> {
830    fn new(parser: &'a mut PartitionsParser<R>) -> BtiResult<Self> {
831        let root_offset = parser.header.root_offset;
832        Ok(Self {
833            parser,
834            current_position: root_offset,
835            finished: false,
836        })
837    }
838}
839
840impl<'a, R: Read + Seek> Iterator for PartitionIterator<'a, R> {
841    type Item = BtiResult<(Vec<u8>, PayloadRef)>;
842
843    fn next(&mut self) -> Option<Self::Item> {
844        if self.finished {
845            return None;
846        }
847
848        // TODO: Implement proper trie traversal for iteration
849        // For now, just mark as finished
850        self.finished = true;
851        None
852    }
853}
854
855/// Iterator over rows in BTI index
856pub struct RowIterator<'a, R: Read + Seek> {
857    #[allow(dead_code)]
858    parser: &'a mut RowsParser<R>,
859    #[allow(dead_code)]
860    current_position: u64,
861    finished: bool,
862}
863
864impl<'a, R: Read + Seek> RowIterator<'a, R> {
865    fn new(parser: &'a mut RowsParser<R>) -> BtiResult<Self> {
866        let root_offset = parser.header.root_offset;
867        Ok(Self {
868            parser,
869            current_position: root_offset,
870            finished: false,
871        })
872    }
873}
874
875impl<'a, R: Read + Seek> Iterator for RowIterator<'a, R> {
876    type Item = BtiResult<(Vec<u8>, PayloadRef)>;
877
878    fn next(&mut self) -> Option<Self::Item> {
879        if self.finished {
880            return None;
881        }
882
883        // TODO: Implement proper trie traversal for iteration
884        // For now, just mark as finished
885        self.finished = true;
886        None
887    }
888}
889
890/// Statistics about BTI index
891#[derive(Debug, Clone)]
892pub struct BtiIndexStats {
893    /// Number of entries in the index
894    pub entry_count: u64,
895    /// Root node offset
896    pub root_offset: u64,
897    /// Number of cached nodes
898    pub cached_nodes: usize,
899}
900
901#[cfg(test)]
902mod tests {
903    use super::*;
904    use std::io::Cursor;
905
906    // -----------------------------------------------------------------------
907    // Helper: build a minimal valid BTI file with a given root node payload
908    // -----------------------------------------------------------------------
909
910    fn make_bti_file(root_node_bytes: Vec<u8>) -> Vec<u8> {
911        let root_offset: u64 = 64; // place root after header + padding
912        let mut data = Vec::new();
913        data.extend_from_slice(&BtiHeader::MAGIC.to_be_bytes());
914        data.extend_from_slice(&BtiHeader::VERSION.to_be_bytes());
915        data.extend_from_slice(&0u16.to_be_bytes()); // flags
916        data.extend_from_slice(&root_offset.to_be_bytes());
917        data.extend_from_slice(&1u64.to_be_bytes()); // entry_count
918        data.extend_from_slice(&0u32.to_be_bytes()); // metadata_size
919        while data.len() < root_offset as usize {
920            data.push(0);
921        }
922        data.extend(root_node_bytes);
923        data
924    }
925
926    // -----------------------------------------------------------------------
927    // Crafted node bytes per TrieNode.java
928    // -----------------------------------------------------------------------
929
930    /// PayloadOnly (ordinal 0) with a non-zero payload flag (nibble = 1).
931    /// Layout: [0x01] [8-byte data offset] [4-byte length]
932    fn payload_only_node(data_offset: u64, length: u32) -> Vec<u8> {
933        let mut v = vec![0x01u8]; // ordinal=0, payload_flags=1
934        v.extend_from_slice(&data_offset.to_be_bytes());
935        v.extend_from_slice(&length.to_be_bytes());
936        v
937    }
938
939    /// Single8 (ordinal 2): [0x20|pf] [transition_byte] [1-byte backward delta]
940    fn single8_node(payload_flags: u8, transition: u8, delta: u8) -> Vec<u8> {
941        vec![0x20 | (payload_flags & 0x0F), transition, delta]
942    }
943
944    /// SingleNoPayload4 (ordinal 1): delta in low 4 bits of first byte, no payload.
945    /// Layout: [0x10 | delta4] [transition_byte]
946    fn single_nopayload4_node(delta4: u8, transition: u8) -> Vec<u8> {
947        vec![0x10 | (delta4 & 0x0F), transition]
948    }
949
950    /// SingleNoPayload12 (ordinal 3): 12-bit delta across first 2 bytes, no payload.
951    /// Layout: [0x30 | delta_high4] [delta_low8] [transition_byte]
952    fn single_nopayload12_node(delta: u16, transition: u8) -> Vec<u8> {
953        vec![
954            0x30 | ((delta >> 8) as u8 & 0x0F),
955            (delta & 0xFF) as u8,
956            transition,
957        ]
958    }
959
960    /// Single16 (ordinal 4): [0x40|pf] [transition_byte] [2-byte big-endian delta]
961    fn single16_node(payload_flags: u8, transition: u8, delta: u16) -> Vec<u8> {
962        let mut v = vec![0x40 | (payload_flags & 0x0F), transition];
963        v.extend_from_slice(&delta.to_be_bytes());
964        v
965    }
966
967    /// Sparse8 (ordinal 5): [0x50|pf] [count] [count transition bytes] [count 1-byte deltas]
968    fn sparse8_node(payload_flags: u8, pairs: &[(u8, u8)]) -> Vec<u8> {
969        let mut v = vec![0x50 | (payload_flags & 0x0F), pairs.len() as u8];
970        for &(t, _) in pairs {
971            v.push(t);
972        }
973        for &(_, d) in pairs {
974            v.push(d);
975        }
976        v
977    }
978
979    /// Dense16 (ordinal 11): [0xB0|pf] [start] [len-1] [range * 2-byte deltas]
980    fn dense16_node(payload_flags: u8, start: u8, deltas: &[u16]) -> Vec<u8> {
981        let len = deltas.len() as u8;
982        let mut v = vec![0xB0 | (payload_flags & 0x0F), start, len - 1];
983        for &d in deltas {
984            v.extend_from_slice(&d.to_be_bytes());
985        }
986        v
987    }
988
989    /// LongDense (ordinal 15): [0xF0|pf] [start] [len-1] [range * 8-byte deltas]
990    fn long_dense_node(payload_flags: u8, start: u8, deltas: &[u64]) -> Vec<u8> {
991        let len = deltas.len() as u8;
992        let mut v = vec![0xF0 | (payload_flags & 0x0F), start, len - 1];
993        for &d in deltas {
994            v.extend_from_slice(&d.to_be_bytes());
995        }
996        v
997    }
998
999    /// Sparse12 (ordinal 6): packed 12-bit pointers.
1000    ///
1001    /// Layout per TrieNode.java Sparse12.serialize():
1002    ///   [0x60|pf] [count] [count transition bytes]
1003    ///   [ceil(count*3/2) packed-pointer bytes]
1004    ///
1005    /// Packing: for each even/odd pair (p0, p1) → 3 bytes [p0>>4, (p0<<4)|(p1>>8), p1&0xFF].
1006    /// An odd trailing pointer → 2 bytes [(pd << 4) as short big-endian].
1007    ///
1008    /// Total node bytes = 2 + count + ceil(count*3/2).
1009    /// count=1 → 5 bytes; count=2 → 7 bytes.
1010    fn sparse12_node(payload_flags: u8, pairs: &[(u8, u16)]) -> Vec<u8> {
1011        let count = pairs.len();
1012        let mut v = vec![0x60 | (payload_flags & 0x0F), count as u8];
1013        for &(t, _) in pairs {
1014            v.push(t);
1015        }
1016        // Pack pointers: process pairs, then trailing odd entry
1017        let mut i = 0;
1018        while i + 2 <= count {
1019            let p0 = pairs[i].1 as u32;
1020            let p1 = pairs[i + 1].1 as u32;
1021            v.push((p0 >> 4) as u8);
1022            v.push(((p0 << 4) | (p1 >> 8)) as u8);
1023            v.push((p1 & 0xFF) as u8);
1024            i += 2;
1025        }
1026        if i < count {
1027            // Trailing odd pointer: writeShort((short)(pd << 4)) big-endian
1028            let pd = pairs[i].1 as u32;
1029            let s = (pd << 4) as u16;
1030            v.extend_from_slice(&s.to_be_bytes());
1031        }
1032        v
1033    }
1034
1035    /// Sparse24 (ordinal 8): [0x80|pf] [count] [count transition bytes] [count * 3-byte big-endian deltas]
1036    fn sparse24_node(payload_flags: u8, pairs: &[(u8, u32)]) -> Vec<u8> {
1037        let count = pairs.len();
1038        let mut v = vec![0x80 | (payload_flags & 0x0F), count as u8];
1039        for &(t, _) in pairs {
1040            v.push(t);
1041        }
1042        for &(_, d) in pairs {
1043            // 3-byte big-endian
1044            v.push(((d >> 16) & 0xFF) as u8);
1045            v.push(((d >> 8) & 0xFF) as u8);
1046            v.push((d & 0xFF) as u8);
1047        }
1048        v
1049    }
1050
1051    /// Sparse40 (ordinal 9): [0x90|pf] [count] [count transition bytes] [count * 5-byte big-endian deltas]
1052    fn sparse40_node(payload_flags: u8, pairs: &[(u8, u64)]) -> Vec<u8> {
1053        let count = pairs.len();
1054        let mut v = vec![0x90 | (payload_flags & 0x0F), count as u8];
1055        for &(t, _) in pairs {
1056            v.push(t);
1057        }
1058        for &(_, d) in pairs {
1059            // 5-byte big-endian
1060            v.push(((d >> 32) & 0xFF) as u8);
1061            v.push(((d >> 24) & 0xFF) as u8);
1062            v.push(((d >> 16) & 0xFF) as u8);
1063            v.push(((d >> 8) & 0xFF) as u8);
1064            v.push((d & 0xFF) as u8);
1065        }
1066        v
1067    }
1068
1069    /// Dense12 (ordinal 10): packed 12-bit pointers for a contiguous byte range.
1070    ///
1071    /// Layout per TrieNode.java Dense12.serialize():
1072    ///   [0xA0|pf] [start_byte] [range_len - 1] [ceil(range_len*3/2) packed bytes]
1073    ///
1074    /// Packing matches write12Bits(): even index → [val>>4, carry=val<<4]; odd index → [carry|(val>>8), val&0xFF].
1075    fn dense12_node(payload_flags: u8, start: u8, deltas: &[u16]) -> Vec<u8> {
1076        let range_len = deltas.len();
1077        let mut v = vec![0xA0 | (payload_flags & 0x0F), start, (range_len - 1) as u8];
1078        let mut carry: u8 = 0;
1079        for (i, &d) in deltas.iter().enumerate() {
1080            let val = d as u32;
1081            if (i & 1) == 0 {
1082                v.push((val >> 4) as u8);
1083                carry = (val << 4) as u8;
1084            } else {
1085                v.push(carry | (val >> 8) as u8);
1086                v.push((val & 0xFF) as u8);
1087                carry = 0;
1088            }
1089        }
1090        // If odd number of entries, flush carry byte
1091        if (range_len & 1) == 1 {
1092            v.push(carry);
1093        }
1094        v
1095    }
1096
1097    // -----------------------------------------------------------------------
1098    // REGRESSION TEST — proves the pre-fix stub misbehaviour
1099    //
1100    // Before the fix, RowsParser::parse_node_data always returned a
1101    // BtiNodeType::PayloadOnly node regardless of the actual header nibble.
1102    // This test crafts a Single8 node (ordinal 2, high nibble = 0x2) and
1103    // verifies that parse_bti_node correctly identifies it as Single, NOT
1104    // PayloadOnly.  On the old code this assertion would fail.
1105    // -----------------------------------------------------------------------
1106    #[test]
1107    fn regression_rows_parser_single_node_not_mislabeled_as_payload_only() {
1108        // Craft a Single8 (ordinal 2) node: nibble = 0x2 → must NOT be PayloadOnly.
1109        // The old stub parsed the header byte and then threw away the result,
1110        // always constructing BtiNodeType::PayloadOnly.
1111        //
1112        // Node layout (Single8, no payload, delta=5):
1113        //   byte 0: 0x20  (ordinal=2, payload_flags=0)
1114        //   byte 1: 0x61  ('a' transition)
1115        //   byte 2: 0x05  (backward delta = 5)
1116        let node_bytes = single8_node(0, b'a', 5);
1117        let offset: u64 = 100;
1118
1119        let node = parse_bti_node(&node_bytes, offset)
1120            .expect("parse_bti_node must succeed for a valid Single8 node");
1121
1122        // Core regression assertion: the stub returned PayloadOnly here.
1123        assert_eq!(
1124            node.node_type,
1125            BtiNodeType::Single,
1126            "Single8 node (nibble 0x2) was mislabeled as {:?} — regression from #647 stub",
1127            node.node_type,
1128        );
1129
1130        // Structural check: child offset = parent - delta = 100 - 5 = 95
1131        match &node.data {
1132            BtiNodeData::Single { transition } => {
1133                assert_eq!(transition.byte, b'a');
1134                assert_eq!(
1135                    transition.child.distance, 95,
1136                    "child offset should be parent(100) - delta(5) = 95"
1137                );
1138            }
1139            other => panic!("Expected BtiNodeData::Single, got {:?}", other),
1140        }
1141    }
1142
1143    // -----------------------------------------------------------------------
1144    // parse_bti_node: PayloadOnly (ordinal 0)
1145    // -----------------------------------------------------------------------
1146
1147    #[test]
1148    fn parse_bti_node_payload_only_correct_type_and_offsets() {
1149        // ordinal=0, payload_flags=1 → PayloadOnly with payload
1150        let node = payload_only_node(0xDEAD_BEEF_0000_1234, 42);
1151        let parsed = parse_bti_node(&node, 0).unwrap();
1152        assert_eq!(parsed.node_type, BtiNodeType::PayloadOnly);
1153        match &parsed.data {
1154            BtiNodeData::PayloadOnly { payload } => {
1155                assert_eq!(payload.offset, 0xDEAD_BEEF_0000_1234);
1156                assert_eq!(payload.length, 42);
1157            }
1158            other => panic!("Expected PayloadOnly, got {:?}", other),
1159        }
1160    }
1161
1162    #[test]
1163    fn parse_bti_node_payload_only_no_payload_flags_is_error() {
1164        // ordinal=0, payload_flags=0 → error (leaf must have a payload)
1165        let node_bytes = vec![0x00u8]; // nibble=0, flags=0
1166        let err = parse_bti_node(&node_bytes, 0);
1167        assert!(err.is_err(), "PayloadOnly with flags=0 should be an error");
1168    }
1169
1170    // -----------------------------------------------------------------------
1171    // parse_bti_node: Single family (ordinals 1-4)
1172    // -----------------------------------------------------------------------
1173
1174    #[test]
1175    fn parse_bti_node_single_nopayload4_ordinal1() {
1176        // delta=3 encoded in low nibble; transition = b'x'; parent offset = 50
1177        let node_bytes = single_nopayload4_node(3, b'x');
1178        let parsed = parse_bti_node(&node_bytes, 50).unwrap();
1179        assert_eq!(parsed.node_type, BtiNodeType::Single);
1180        match &parsed.data {
1181            BtiNodeData::Single { transition } => {
1182                assert_eq!(transition.byte, b'x');
1183                assert_eq!(transition.child.distance, 47); // 50 - 3
1184            }
1185            other => panic!("Expected Single, got {:?}", other),
1186        }
1187    }
1188
1189    #[test]
1190    fn parse_bti_node_single8_ordinal2() {
1191        let node_bytes = single8_node(0, b'z', 10);
1192        let parsed = parse_bti_node(&node_bytes, 200).unwrap();
1193        assert_eq!(parsed.node_type, BtiNodeType::Single);
1194        match &parsed.data {
1195            BtiNodeData::Single { transition } => {
1196                assert_eq!(transition.byte, b'z');
1197                assert_eq!(transition.child.distance, 190); // 200 - 10
1198            }
1199            other => panic!("Expected Single, got {:?}", other),
1200        }
1201    }
1202
1203    #[test]
1204    fn parse_bti_node_single_nopayload12_ordinal3() {
1205        // delta = 0x123 (291): high 4 bits in byte0 low nibble, low 8 in byte1
1206        let node_bytes = single_nopayload12_node(0x123, b'k');
1207        let parsed = parse_bti_node(&node_bytes, 1000).unwrap();
1208        assert_eq!(parsed.node_type, BtiNodeType::Single);
1209        match &parsed.data {
1210            BtiNodeData::Single { transition } => {
1211                assert_eq!(transition.byte, b'k');
1212                assert_eq!(transition.child.distance, 1000 - 0x123);
1213            }
1214            other => panic!("Expected Single, got {:?}", other),
1215        }
1216    }
1217
1218    #[test]
1219    fn parse_bti_node_single16_ordinal4() {
1220        // delta = 0x0400 (1024)
1221        let node_bytes = single16_node(0, b'm', 0x0400);
1222        let parsed = parse_bti_node(&node_bytes, 2048).unwrap();
1223        assert_eq!(parsed.node_type, BtiNodeType::Single);
1224        match &parsed.data {
1225            BtiNodeData::Single { transition } => {
1226                assert_eq!(transition.byte, b'm');
1227                assert_eq!(transition.child.distance, 2048 - 1024);
1228            }
1229            other => panic!("Expected Single, got {:?}", other),
1230        }
1231    }
1232
1233    // -----------------------------------------------------------------------
1234    // parse_bti_node: Sparse family (ordinals 5, 7, 8, 9)
1235    // -----------------------------------------------------------------------
1236
1237    #[test]
1238    fn parse_bti_node_sparse8_ordinal5_two_transitions() {
1239        // Two transitions: (b'a', delta=10), (b'b', delta=20); parent offset=100
1240        let node_bytes = sparse8_node(0, &[(b'a', 10), (b'b', 20)]);
1241        let parsed = parse_bti_node(&node_bytes, 100).unwrap();
1242        assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1243        match &parsed.data {
1244            BtiNodeData::Sparse { transitions } => {
1245                assert_eq!(transitions.len(), 2);
1246                assert_eq!(transitions[0].byte, b'a');
1247                assert_eq!(transitions[0].child.distance, 90); // 100 - 10
1248                assert_eq!(transitions[1].byte, b'b');
1249                assert_eq!(transitions[1].child.distance, 80); // 100 - 20
1250            }
1251            other => panic!("Expected Sparse, got {:?}", other),
1252        }
1253    }
1254
1255    #[test]
1256    fn parse_bti_node_sparse16_ordinal7_three_transitions() {
1257        // Sparse16: ordinal=7, 2-byte deltas
1258        // Layout: [0x70|pf] [count] [count bytes] [count * 2 byte deltas]
1259        let payload_flags = 0u8;
1260        let pairs: &[(u8, u16)] = &[(b'x', 0x0010), (b'y', 0x0020), (b'z', 0x0030)];
1261        let mut node_bytes = vec![0x70 | payload_flags, pairs.len() as u8];
1262        for &(t, _) in pairs {
1263            node_bytes.push(t);
1264        }
1265        for &(_, d) in pairs {
1266            node_bytes.extend_from_slice(&d.to_be_bytes());
1267        }
1268        let parsed = parse_bti_node(&node_bytes, 0x100).unwrap();
1269        assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1270        match &parsed.data {
1271            BtiNodeData::Sparse { transitions } => {
1272                assert_eq!(transitions.len(), 3);
1273                assert_eq!(transitions[0].byte, b'x');
1274                assert_eq!(transitions[0].child.distance, 0x100 - 0x0010);
1275                assert_eq!(transitions[2].byte, b'z');
1276                assert_eq!(transitions[2].child.distance, 0x100 - 0x0030);
1277            }
1278            other => panic!("Expected Sparse, got {:?}", other),
1279        }
1280    }
1281
1282    // -----------------------------------------------------------------------
1283    // parse_bti_node: Sparse12 (ordinal 6) — exact-minimal size tests
1284    //
1285    // Per TrieNode.java Sparse12.payloadPosition:
1286    //   total = position + 2 + (5*count+1)/2
1287    //         = 2 + count_transition_bytes_region + ceil(count*3/2) pointer_region
1288    // count=1 → 5 bytes; count=2 → 7 bytes.
1289    // The old formula used (count*5).div_ceil(2) for the pointer region alone,
1290    // which over-counted by `count` bytes; this was fixed to (count*3).div_ceil(2).
1291    // -----------------------------------------------------------------------
1292
1293    #[test]
1294    fn parse_bti_node_sparse12_ordinal6_count1_exact_minimal_5_bytes() {
1295        // count=1: exact-minimal node is 5 bytes.
1296        // Layout: [0x60] [0x01] [transition_byte] [2-byte packed 12-bit pointer]
1297        // delta = 0xABC (2748), packed as big-endian short (0xABC << 4 = 0xABC0).
1298        // Parent offset = 0x1000; child = 0x1000 - 0xABC = 0x544.
1299        let node_bytes = sparse12_node(0, &[(b'a', 0xABC)]);
1300        assert_eq!(
1301            node_bytes.len(),
1302            5,
1303            "Sparse12 count=1 must be exactly 5 bytes (was over-counted as 6 with old formula)"
1304        );
1305        let offset: u64 = 0x1000;
1306        let parsed = parse_bti_node(&node_bytes, offset)
1307            .expect("exact-minimal Sparse12 count=1 must parse successfully");
1308        assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1309        match &parsed.data {
1310            BtiNodeData::Sparse { transitions } => {
1311                assert_eq!(transitions.len(), 1);
1312                assert_eq!(transitions[0].byte, b'a');
1313                assert_eq!(
1314                    transitions[0].child.distance,
1315                    offset - 0xABC,
1316                    "child offset = parent(0x1000) - delta(0xABC) = 0x544"
1317                );
1318            }
1319            other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
1320        }
1321    }
1322
1323    #[test]
1324    fn parse_bti_node_sparse12_ordinal6_count2_exact_minimal_7_bytes() {
1325        // count=2: exact-minimal node is 7 bytes.
1326        // Layout: [0x60] [0x02] [t0] [t1] [3-byte packed for two 12-bit pointers]
1327        // p0=0x100 (256), p1=0x200 (512); packed: [0x10, 0x02, 0x00].
1328        // Parent offset = 0x800.
1329        let node_bytes = sparse12_node(0, &[(b'x', 0x100), (b'y', 0x200)]);
1330        assert_eq!(
1331            node_bytes.len(),
1332            7,
1333            "Sparse12 count=2 must be exactly 7 bytes"
1334        );
1335        let offset: u64 = 0x800;
1336        let parsed = parse_bti_node(&node_bytes, offset)
1337            .expect("exact-minimal Sparse12 count=2 must parse successfully");
1338        assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1339        match &parsed.data {
1340            BtiNodeData::Sparse { transitions } => {
1341                assert_eq!(transitions.len(), 2);
1342                assert_eq!(transitions[0].byte, b'x');
1343                assert_eq!(transitions[0].child.distance, offset - 0x100);
1344                assert_eq!(transitions[1].byte, b'y');
1345                assert_eq!(transitions[1].child.distance, offset - 0x200);
1346            }
1347            other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
1348        }
1349    }
1350
1351    // -----------------------------------------------------------------------
1352    // parse_bti_node: Sparse24 (ordinal 8) — exact-minimal size test
1353    //
1354    // Per TrieNode.java Sparse.payloadPosition:
1355    //   total = 2 + (bytesPerPointer + 1) * count = 2 + 4*count
1356    // count=1 → 6 bytes: [header][count][transition][3-byte delta]
1357    // -----------------------------------------------------------------------
1358
1359    #[test]
1360    fn parse_bti_node_sparse24_ordinal8_count1_exact_minimal_6_bytes() {
1361        // delta = 0x010203 (66051)
1362        let node_bytes = sparse24_node(0, &[(b'p', 0x010203)]);
1363        assert_eq!(
1364            node_bytes.len(),
1365            6,
1366            "Sparse24 count=1 must be exactly 6 bytes"
1367        );
1368        let offset: u64 = 0x20000;
1369        let parsed = parse_bti_node(&node_bytes, offset)
1370            .expect("exact-minimal Sparse24 count=1 must parse successfully");
1371        assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1372        match &parsed.data {
1373            BtiNodeData::Sparse { transitions } => {
1374                assert_eq!(transitions.len(), 1);
1375                assert_eq!(transitions[0].byte, b'p');
1376                assert_eq!(transitions[0].child.distance, offset - 0x010203);
1377            }
1378            other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
1379        }
1380    }
1381
1382    // -----------------------------------------------------------------------
1383    // parse_bti_node: Sparse40 (ordinal 9) — exact-minimal size test
1384    //
1385    // Per TrieNode.java Sparse.payloadPosition:
1386    //   total = 2 + (bytesPerPointer + 1) * count = 2 + 6*count
1387    // count=1 → 8 bytes: [header][count][transition][5-byte delta]
1388    // -----------------------------------------------------------------------
1389
1390    #[test]
1391    fn parse_bti_node_sparse40_ordinal9_count1_exact_minimal_8_bytes() {
1392        // delta = 0x0000_0001_0000 (65536)
1393        let delta: u64 = 0x0000_0001_0000;
1394        let node_bytes = sparse40_node(0, &[(b'q', delta)]);
1395        assert_eq!(
1396            node_bytes.len(),
1397            8,
1398            "Sparse40 count=1 must be exactly 8 bytes"
1399        );
1400        let offset: u64 = 0x0010_0000;
1401        let parsed = parse_bti_node(&node_bytes, offset)
1402            .expect("exact-minimal Sparse40 count=1 must parse successfully");
1403        assert_eq!(parsed.node_type, BtiNodeType::Sparse);
1404        match &parsed.data {
1405            BtiNodeData::Sparse { transitions } => {
1406                assert_eq!(transitions.len(), 1);
1407                assert_eq!(transitions[0].byte, b'q');
1408                assert_eq!(transitions[0].child.distance, offset - delta);
1409            }
1410            other => panic!("Expected BtiNodeData::Sparse, got {:?}", other),
1411        }
1412    }
1413
1414    // -----------------------------------------------------------------------
1415    // parse_bti_node: Dense12 (ordinal 10) — exact-minimal size test
1416    //
1417    // Per TrieNode.java Dense12.payloadPosition:
1418    //   total = 3 + (range_len*3 + 1)/2 = 3 + ceil(range_len*3/2)
1419    // range_len=1 → 5 bytes: [header][start][0x00][2-byte packed 12-bit pointer]
1420    // range_len=2 → 6 bytes: [header][start][0x01][3-byte packed for two 12-bit values]
1421    // The existing Dense12 bounds formula is correct; this test pins it.
1422    // -----------------------------------------------------------------------
1423
1424    #[test]
1425    fn parse_bti_node_dense12_ordinal10_range1_exact_minimal_5_bytes() {
1426        // range_len=1 (start=b'A', end=b'A'): delta=0x123 (291).
1427        let node_bytes = dense12_node(0, b'A', &[0x123]);
1428        assert_eq!(
1429            node_bytes.len(),
1430            5,
1431            "Dense12 range_len=1 must be exactly 5 bytes"
1432        );
1433        let offset: u64 = 0x500;
1434        let parsed = parse_bti_node(&node_bytes, offset)
1435            .expect("exact-minimal Dense12 range=1 must parse successfully");
1436        assert_eq!(parsed.node_type, BtiNodeType::Dense);
1437        match &parsed.data {
1438            BtiNodeData::Dense {
1439                start_byte,
1440                children,
1441            } => {
1442                assert_eq!(*start_byte, b'A');
1443                assert_eq!(children.len(), 1);
1444                assert_eq!(children[0].distance, offset - 0x123);
1445            }
1446            other => panic!("Expected BtiNodeData::Dense, got {:?}", other),
1447        }
1448    }
1449
1450    #[test]
1451    fn parse_bti_node_dense12_ordinal10_range2_exact_minimal_6_bytes() {
1452        // range_len=2 (start=b'A', spans 'A' and 'B'):
1453        // delta[0]=0x100, delta[1]=0x200 (0 = no-child for Dense).
1454        let node_bytes = dense12_node(0, b'A', &[0x100, 0x200]);
1455        assert_eq!(
1456            node_bytes.len(),
1457            6,
1458            "Dense12 range_len=2 must be exactly 6 bytes"
1459        );
1460        let offset: u64 = 0x800;
1461        let parsed = parse_bti_node(&node_bytes, offset)
1462            .expect("exact-minimal Dense12 range=2 must parse successfully");
1463        assert_eq!(parsed.node_type, BtiNodeType::Dense);
1464        match &parsed.data {
1465            BtiNodeData::Dense {
1466                start_byte,
1467                children,
1468            } => {
1469                assert_eq!(*start_byte, b'A');
1470                assert_eq!(children.len(), 2);
1471                assert_eq!(children[0].distance, offset - 0x100);
1472                assert_eq!(children[1].distance, offset - 0x200);
1473            }
1474            other => panic!("Expected BtiNodeData::Dense, got {:?}", other),
1475        }
1476    }
1477
1478    // -----------------------------------------------------------------------
1479    // parse_bti_node: Dense family (ordinals 11-15)
1480    // -----------------------------------------------------------------------
1481
1482    #[test]
1483    fn parse_bti_node_dense16_ordinal11_three_children() {
1484        // Dense16: ordinal=11 (0xB), start=b'a', range=['a','b','c'], deltas
1485        // 0 means "no child" for Dense nodes.
1486        let node_bytes = dense16_node(0, b'a', &[0x0010, 0x0000, 0x0030]);
1487        let parsed = parse_bti_node(&node_bytes, 0x200).unwrap();
1488        assert_eq!(parsed.node_type, BtiNodeType::Dense);
1489        match &parsed.data {
1490            BtiNodeData::Dense {
1491                start_byte,
1492                children,
1493            } => {
1494                assert_eq!(*start_byte, b'a');
1495                assert_eq!(children.len(), 3);
1496                // child 0 (b'a'): offset = 0x200 - 0x0010 = 0x1F0
1497                assert_eq!(children[0].distance, 0x200 - 0x0010);
1498                // child 1 (b'b'): delta=0 → no child, offset=0
1499                assert_eq!(children[1].distance, 0);
1500                // child 2 (b'c'): offset = 0x200 - 0x0030 = 0x1D0
1501                assert_eq!(children[2].distance, 0x200 - 0x0030);
1502            }
1503            other => panic!("Expected Dense, got {:?}", other),
1504        }
1505    }
1506
1507    #[test]
1508    fn parse_bti_node_long_dense_ordinal15_two_children() {
1509        // LongDense: ordinal=15 (0xF), 8-byte deltas
1510        let node_bytes = long_dense_node(0, b'A', &[0x0000_0000_0000_0100, 0x0000_0000_0000_0200]);
1511        let parsed = parse_bti_node(&node_bytes, 0x10000).unwrap();
1512        assert_eq!(parsed.node_type, BtiNodeType::Dense);
1513        match &parsed.data {
1514            BtiNodeData::Dense {
1515                start_byte,
1516                children,
1517            } => {
1518                assert_eq!(*start_byte, b'A');
1519                assert_eq!(children.len(), 2);
1520                assert_eq!(children[0].distance, 0x10000 - 0x100);
1521                assert_eq!(children[1].distance, 0x10000 - 0x200);
1522            }
1523            other => panic!("Expected Dense, got {:?}", other),
1524        }
1525    }
1526
1527    // -----------------------------------------------------------------------
1528    // classify_node_nibble: all 16 nibbles map to the right category
1529    // -----------------------------------------------------------------------
1530
1531    #[test]
1532    fn classify_node_nibble_all_ordinals() {
1533        // Ordinal 0 → PayloadOnly
1534        assert_eq!(classify_node_nibble(0).unwrap(), BtiNodeType::PayloadOnly);
1535        // Ordinals 1-4 → Single
1536        for n in 1u8..=4 {
1537            assert_eq!(
1538                classify_node_nibble(n).unwrap(),
1539                BtiNodeType::Single,
1540                "ordinal {} should be Single",
1541                n
1542            );
1543        }
1544        // Ordinals 5-9 → Sparse
1545        for n in 5u8..=9 {
1546            assert_eq!(
1547                classify_node_nibble(n).unwrap(),
1548                BtiNodeType::Sparse,
1549                "ordinal {} should be Sparse",
1550                n
1551            );
1552        }
1553        // Ordinals 10-15 → Dense
1554        for n in 10u8..=15 {
1555            assert_eq!(
1556                classify_node_nibble(n).unwrap(),
1557                BtiNodeType::Dense,
1558                "ordinal {} should be Dense",
1559                n
1560            );
1561        }
1562    }
1563
1564    // -----------------------------------------------------------------------
1565    // RowsParser: non-PayloadOnly node is correctly parsed (was broken before)
1566    // -----------------------------------------------------------------------
1567
1568    /// Integration test: embed a Sparse8 node as the root of a Rows.db file
1569    /// and verify RowsParser reads it as Sparse (not PayloadOnly).
1570    #[test]
1571    fn rows_parser_sparse_root_node_not_mislabeled() {
1572        // Build a Rows.db file whose root is a Sparse8 node.
1573        // The stub would have returned PayloadOnly; real code must return Sparse.
1574        let root_node = sparse8_node(0, &[(b'a', 5), (b'b', 10)]);
1575        let data = make_bti_file(root_node);
1576        let cursor = Cursor::new(data);
1577        let mut parser = RowsParser::new(cursor).unwrap();
1578
1579        // Force the root node to be loaded and parsed.
1580        let root_offset = parser.header.root_offset;
1581        let node = parser.load_node(root_offset).unwrap();
1582
1583        assert_eq!(
1584            node.node_type,
1585            BtiNodeType::Sparse,
1586            "RowsParser returned {:?} for a Sparse8 root node — regression from #647",
1587            node.node_type
1588        );
1589        assert_eq!(node.child_count(), 2);
1590    }
1591
1592    /// Integration test: embed a Dense16 node as the root of a Rows.db file.
1593    #[test]
1594    fn rows_parser_dense_root_node_not_mislabeled() {
1595        let root_node = dense16_node(0, b'0', &[0x0020, 0x0000, 0x0040]);
1596        let data = make_bti_file(root_node);
1597        let cursor = Cursor::new(data);
1598        let mut parser = RowsParser::new(cursor).unwrap();
1599        let root_offset = parser.header.root_offset;
1600        let node = parser.load_node(root_offset).unwrap();
1601
1602        assert_eq!(
1603            node.node_type,
1604            BtiNodeType::Dense,
1605            "RowsParser returned {:?} for a Dense16 root node",
1606            node.node_type
1607        );
1608    }
1609
1610    /// Integration test: embed a SingleNoPayload4 node as the root of a Rows.db file.
1611    #[test]
1612    fn rows_parser_single_nopayload4_root_node_not_mislabeled() {
1613        let root_offset_val: u64 = 64;
1614        // delta=3: child is at 64-3=61, but 61 < 64 so saturating_sub keeps it valid
1615        let root_node = single_nopayload4_node(3, b'q');
1616        let data = make_bti_file(root_node);
1617        let cursor = Cursor::new(data);
1618        let mut parser = RowsParser::new(cursor).unwrap();
1619        let root_offset = parser.header.root_offset;
1620        assert_eq!(root_offset, root_offset_val);
1621        let node = parser.load_node(root_offset).unwrap();
1622
1623        assert_eq!(
1624            node.node_type,
1625            BtiNodeType::Single,
1626            "RowsParser returned {:?} for a SingleNoPayload4 root node",
1627            node.node_type
1628        );
1629        match &node.data {
1630            BtiNodeData::Single { transition } => {
1631                assert_eq!(transition.byte, b'q');
1632                assert_eq!(transition.child.distance, root_offset_val - 3);
1633            }
1634            other => panic!("Expected Single data, got {:?}", other),
1635        }
1636    }
1637
1638    // -----------------------------------------------------------------------
1639    // Existing tests (preserved from before the fix)
1640    // -----------------------------------------------------------------------
1641
1642    #[test]
1643    fn test_bti_header_parsing() {
1644        let mut header_data = Vec::new();
1645        header_data.extend_from_slice(&BtiHeader::MAGIC.to_be_bytes());
1646        header_data.extend_from_slice(&BtiHeader::VERSION.to_be_bytes());
1647        header_data.extend_from_slice(&0u16.to_be_bytes()); // flags
1648        header_data.extend_from_slice(&1024u64.to_be_bytes()); // root_offset
1649        header_data.extend_from_slice(&100u64.to_be_bytes()); // entry_count
1650
1651        let (header, size) = BtiHeader::parse(&header_data).unwrap();
1652        assert_eq!(header.magic, BtiHeader::MAGIC);
1653        assert_eq!(header.version, BtiHeader::VERSION);
1654        assert_eq!(header.root_offset, 1024);
1655        assert_eq!(header.entry_count, 100);
1656        assert_eq!(size, 24);
1657    }
1658
1659    #[test]
1660    fn test_partitions_parser_creation() {
1661        let data = make_bti_file(payload_only_node(1000, 50));
1662        let cursor = Cursor::new(data);
1663        let _parser = PartitionsParser::new(cursor).unwrap();
1664    }
1665
1666    #[test]
1667    fn test_rows_parser_creation() {
1668        let data = make_bti_file(payload_only_node(1000, 50));
1669        let cursor = Cursor::new(data);
1670        let _parser = RowsParser::new(cursor).unwrap();
1671    }
1672
1673    #[test]
1674    fn test_partition_lookup() {
1675        let data = make_bti_file(payload_only_node(1000, 50));
1676        let cursor = Cursor::new(data);
1677        let mut parser = PartitionsParser::new(cursor).unwrap();
1678
1679        // Test lookup with simple key — PayloadOnly root returns its payload immediately
1680        let partition_key = vec![Value::Text("test_partition".to_string())];
1681        let result = parser.lookup_partition(&partition_key).unwrap();
1682        assert!(result.is_some());
1683    }
1684
1685    #[test]
1686    fn test_header_serialization_round_trip() {
1687        let original_header = BtiHeader {
1688            magic: BtiHeader::MAGIC,
1689            version: BtiHeader::VERSION,
1690            flags: 0x1234,
1691            root_offset: 0x123456789ABCDEF0,
1692            entry_count: 0xFEDCBA9876543210,
1693            metadata_size: 0x12345678,
1694        };
1695
1696        let serialized = original_header.to_bytes();
1697        let (parsed_header, _) = BtiHeader::parse(&serialized).unwrap();
1698
1699        assert_eq!(original_header.magic, parsed_header.magic);
1700        assert_eq!(original_header.version, parsed_header.version);
1701        assert_eq!(original_header.flags, parsed_header.flags);
1702        assert_eq!(original_header.root_offset, parsed_header.root_offset);
1703        assert_eq!(original_header.entry_count, parsed_header.entry_count);
1704        assert_eq!(original_header.metadata_size, parsed_header.metadata_size);
1705    }
1706
1707    // -----------------------------------------------------------------------
1708    // Helper functions used in tests above (these are tested implicitly)
1709    // -----------------------------------------------------------------------
1710
1711    #[test]
1712    fn read_be_unsigned_edge_cases() {
1713        assert_eq!(read_be_unsigned(&[]), 0);
1714        assert_eq!(read_be_unsigned(&[0xFF]), 255);
1715        assert_eq!(read_be_unsigned(&[0x01, 0x00]), 256);
1716        assert_eq!(read_be_unsigned(&[0xFF, 0xFF, 0xFF, 0xFF]), 0xFFFF_FFFF);
1717    }
1718
1719    #[test]
1720    fn read_12bit_packed_even_and_odd_indices() {
1721        // Pack two values: index 0 = 0xABC, index 1 = 0x123
1722        // Bytes:  [0xAB, 0xC1, 0x23]  → word0 = 0xABC1 >> 4 = 0xABC; word1 = 0xC123 & 0xFFF = 0x123
1723        let data: &[u8] = &[0xAB, 0xC1, 0x23];
1724        assert_eq!(read_12bit_packed(data, 0), 0xABC);
1725        assert_eq!(read_12bit_packed(data, 1), 0x123);
1726    }
1727}