sochdb_core/
tbp.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! TOON Binary Protocol (TBP) - Zero-Copy Binary Wire Format
16//!
17//! From mm.md Task 3.1: Zero-Copy Binary Wire Format
18//!
19//! ## Problem
20//!
21//! Current TOON format is text-based with parsing overhead:
22//! - O(n) string allocations per row
23//! - UTF-8 validation on every parse
24//! - No random access (must scan from start)
25//! - Variable-length encoding requires sequential parsing
26//!
27//! ## Solution
28//!
29//! Binary protocol enables:
30//! - O(1) field access via row index + column offset
31//! - Zero-copy reads from mmap'd files
32//! - Null bitmap for efficient NULL handling
33//! - LLM-friendly text emission on demand
34//!
35//! ## Layout
36//!
37//! ```text
38//! TBP Layout (Little-Endian, 32-byte header):
39//! ┌─────────────────────────────────────────────────────┐
40//! │ magic: u32 = 0x544F4F4E ("TOON")                    │
41//! │ version: u16, flags: u16                            │
42//! │ schema_id: u64 (hash for validation)                │
43//! │ row_count: u32, column_count: u16                   │
44//! │ null_bitmap_offset: u32, row_index_offset: u32      │
45//! │ data_offset: u32                                    │
46//! ├─────────────────────────────────────────────────────┤
47//! │ Null Bitmap: ceil(rows × cols / 8) bytes            │
48//! │ Row Index: [u32; row_count] offsets                 │
49//! │ Data Section (columnar within blocks)               │
50//! └─────────────────────────────────────────────────────┘
51//!
52//! Access complexity:
53//! - Row access: O(1) via row_index[row]
54//! - Field access: O(1) via column_type + fixed_offset
55//! - Null check: O(1) via bitmap[row * cols + col]
56//! ```
57
58use std::io::{self, Write};
59
60use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
61
62/// TBP magic number: "TOON" in ASCII
63pub const TBP_MAGIC: u32 = 0x544F_4F4E;
64
65/// Current TBP version
66pub const TBP_VERSION: u16 = 1;
67
68/// TBP header size in bytes
69pub const TBP_HEADER_SIZE: usize = 32;
70
71/// TBP flags
72#[derive(Debug, Clone, Copy, Default)]
73pub struct TbpFlags(pub u16);
74
75impl TbpFlags {
76    /// Null bitmap is present
77    pub const HAS_NULLS: u16 = 1 << 0;
78    /// Row index is present (for variable-length data)
79    pub const HAS_ROW_INDEX: u16 = 1 << 1;
80    /// Data is compressed
81    pub const COMPRESSED: u16 = 1 << 2;
82    /// Schema is embedded in the file
83    pub const EMBEDDED_SCHEMA: u16 = 1 << 3;
84
85    pub fn has_nulls(&self) -> bool {
86        self.0 & Self::HAS_NULLS != 0
87    }
88
89    pub fn has_row_index(&self) -> bool {
90        self.0 & Self::HAS_ROW_INDEX != 0
91    }
92
93    pub fn is_compressed(&self) -> bool {
94        self.0 & Self::COMPRESSED != 0
95    }
96
97    pub fn has_embedded_schema(&self) -> bool {
98        self.0 & Self::EMBEDDED_SCHEMA != 0
99    }
100}
101
102/// Column type for TBP
103#[derive(Debug, Clone, Copy, PartialEq, Eq)]
104#[repr(u8)]
105pub enum TbpColumnType {
106    /// Null (no data)
107    Null = 0,
108    /// Boolean (1 byte, 0 or 1)
109    Bool = 1,
110    /// Signed 8-bit integer
111    Int8 = 2,
112    /// Unsigned 8-bit integer
113    UInt8 = 3,
114    /// Signed 16-bit integer
115    Int16 = 4,
116    /// Unsigned 16-bit integer
117    UInt16 = 5,
118    /// Signed 32-bit integer
119    Int32 = 6,
120    /// Unsigned 32-bit integer
121    UInt32 = 7,
122    /// Signed 64-bit integer
123    Int64 = 8,
124    /// Unsigned 64-bit integer
125    UInt64 = 9,
126    /// 32-bit float
127    Float32 = 10,
128    /// 64-bit float
129    Float64 = 11,
130    /// Variable-length string (UTF-8)
131    String = 12,
132    /// Variable-length binary
133    Binary = 13,
134    /// Timestamp (microseconds since epoch)
135    Timestamp = 14,
136    /// Fixed-size binary (e.g., UUIDs)
137    FixedBinary = 15,
138}
139
140impl TbpColumnType {
141    /// Get the fixed size of this type, or None for variable-length types
142    pub fn fixed_size(&self) -> Option<usize> {
143        match self {
144            TbpColumnType::Null => Some(0),
145            TbpColumnType::Bool => Some(1),
146            TbpColumnType::Int8 | TbpColumnType::UInt8 => Some(1),
147            TbpColumnType::Int16 | TbpColumnType::UInt16 => Some(2),
148            TbpColumnType::Int32 | TbpColumnType::UInt32 | TbpColumnType::Float32 => Some(4),
149            TbpColumnType::Int64 | TbpColumnType::UInt64 | TbpColumnType::Float64 | TbpColumnType::Timestamp => Some(8),
150            TbpColumnType::String | TbpColumnType::Binary => None,
151            TbpColumnType::FixedBinary => None, // Size specified per column
152        }
153    }
154
155    /// Check if this type is variable-length
156    pub fn is_variable(&self) -> bool {
157        self.fixed_size().is_none()
158    }
159
160    pub fn from_byte(b: u8) -> Option<Self> {
161        match b {
162            0 => Some(Self::Null),
163            1 => Some(Self::Bool),
164            2 => Some(Self::Int8),
165            3 => Some(Self::UInt8),
166            4 => Some(Self::Int16),
167            5 => Some(Self::UInt16),
168            6 => Some(Self::Int32),
169            7 => Some(Self::UInt32),
170            8 => Some(Self::Int64),
171            9 => Some(Self::UInt64),
172            10 => Some(Self::Float32),
173            11 => Some(Self::Float64),
174            12 => Some(Self::String),
175            13 => Some(Self::Binary),
176            14 => Some(Self::Timestamp),
177            15 => Some(Self::FixedBinary),
178            _ => None,
179        }
180    }
181}
182
183/// Column definition in TBP schema
184#[derive(Debug, Clone)]
185pub struct TbpColumn {
186    /// Column name
187    pub name: String,
188    /// Column type
189    pub col_type: TbpColumnType,
190    /// Fixed size for FixedBinary type
191    pub fixed_size: Option<u16>,
192    /// Column is nullable
193    pub nullable: bool,
194}
195
196impl TbpColumn {
197    pub fn new(name: impl Into<String>, col_type: TbpColumnType) -> Self {
198        Self {
199            name: name.into(),
200            col_type,
201            fixed_size: None,
202            nullable: true,
203        }
204    }
205
206    pub fn with_fixed_size(mut self, size: u16) -> Self {
207        self.fixed_size = Some(size);
208        self
209    }
210
211    pub fn not_null(mut self) -> Self {
212        self.nullable = false;
213        self
214    }
215}
216
217/// TBP schema
218#[derive(Debug, Clone)]
219pub struct TbpSchema {
220    /// Table name
221    pub name: String,
222    /// Columns
223    pub columns: Vec<TbpColumn>,
224    /// Schema ID (hash for validation)
225    pub schema_id: u64,
226}
227
228impl TbpSchema {
229    pub fn new(name: impl Into<String>, columns: Vec<TbpColumn>) -> Self {
230        let name = name.into();
231        let schema_id = Self::compute_schema_id(&name, &columns);
232        Self {
233            name,
234            columns,
235            schema_id,
236        }
237    }
238
239    /// Compute a hash of the schema for validation
240    fn compute_schema_id(name: &str, columns: &[TbpColumn]) -> u64 {
241        use std::hash::{Hash, Hasher};
242        use std::collections::hash_map::DefaultHasher;
243
244        let mut hasher = DefaultHasher::new();
245        name.hash(&mut hasher);
246        for col in columns {
247            col.name.hash(&mut hasher);
248            (col.col_type as u8).hash(&mut hasher);
249            col.fixed_size.hash(&mut hasher);
250            col.nullable.hash(&mut hasher);
251        }
252        hasher.finish()
253    }
254
255    /// Check if schema has any variable-length columns
256    pub fn has_variable_columns(&self) -> bool {
257        self.columns.iter().any(|c| c.col_type.is_variable())
258    }
259
260    /// Check if schema has any nullable columns
261    pub fn has_nullable_columns(&self) -> bool {
262        self.columns.iter().any(|c| c.nullable)
263    }
264
265    /// Get the fixed row size (if all columns are fixed-size)
266    pub fn fixed_row_size(&self) -> Option<usize> {
267        if self.has_variable_columns() {
268            return None;
269        }
270
271        let mut size = 0;
272        for col in &self.columns {
273            match col.col_type {
274                TbpColumnType::FixedBinary => {
275                    size += col.fixed_size.unwrap_or(0) as usize;
276                }
277                _ => {
278                    size += col.col_type.fixed_size()?;
279                }
280            }
281        }
282        Some(size)
283    }
284}
285
286/// TBP header (32 bytes)
287#[derive(Debug, Clone)]
288pub struct TbpHeader {
289    /// Magic number (should be TBP_MAGIC)
290    pub magic: u32,
291    /// Version number
292    pub version: u16,
293    /// Flags
294    pub flags: TbpFlags,
295    /// Schema ID for validation
296    pub schema_id: u64,
297    /// Number of rows
298    pub row_count: u32,
299    /// Number of columns
300    pub column_count: u16,
301    /// Reserved
302    pub reserved: u16,
303    /// Offset to null bitmap (0 if no nulls)
304    pub null_bitmap_offset: u32,
305    /// Offset to row index (0 if fixed-size rows)
306    pub row_index_offset: u32,
307}
308
309impl TbpHeader {
310    /// Write header to a buffer
311    pub fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
312        w.write_u32::<LittleEndian>(self.magic)?;
313        w.write_u16::<LittleEndian>(self.version)?;
314        w.write_u16::<LittleEndian>(self.flags.0)?;
315        w.write_u64::<LittleEndian>(self.schema_id)?;
316        w.write_u32::<LittleEndian>(self.row_count)?;
317        w.write_u16::<LittleEndian>(self.column_count)?;
318        w.write_u16::<LittleEndian>(self.reserved)?;
319        w.write_u32::<LittleEndian>(self.null_bitmap_offset)?;
320        w.write_u32::<LittleEndian>(self.row_index_offset)?;
321        Ok(())
322    }
323
324    /// Read header from a buffer
325    pub fn read(data: &[u8]) -> io::Result<Self> {
326        if data.len() < TBP_HEADER_SIZE {
327            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Header too short"));
328        }
329
330        let mut cursor = std::io::Cursor::new(data);
331        let magic = cursor.read_u32::<LittleEndian>()?;
332        if magic != TBP_MAGIC {
333            return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid TBP magic"));
334        }
335
336        Ok(Self {
337            magic,
338            version: cursor.read_u16::<LittleEndian>()?,
339            flags: TbpFlags(cursor.read_u16::<LittleEndian>()?),
340            schema_id: cursor.read_u64::<LittleEndian>()?,
341            row_count: cursor.read_u32::<LittleEndian>()?,
342            column_count: cursor.read_u16::<LittleEndian>()?,
343            reserved: cursor.read_u16::<LittleEndian>()?,
344            null_bitmap_offset: cursor.read_u32::<LittleEndian>()?,
345            row_index_offset: cursor.read_u32::<LittleEndian>()?,
346        })
347    }
348}
349
350/// Null bitmap for efficient null checking
351#[derive(Debug, Clone, Copy)]
352pub struct NullBitmap<'a> {
353    data: &'a [u8],
354    columns: usize,
355}
356
357impl<'a> NullBitmap<'a> {
358    pub fn new(data: &'a [u8], columns: usize) -> Self {
359        Self { data, columns }
360    }
361
362    /// Check if a cell is null - O(1)
363    #[inline]
364    pub fn is_null(&self, row: usize, col: usize) -> bool {
365        let bit_idx = row * self.columns + col;
366        let byte_idx = bit_idx / 8;
367        let bit_pos = bit_idx % 8;
368
369        if byte_idx >= self.data.len() {
370            return false;
371        }
372
373        self.data[byte_idx] & (1 << bit_pos) != 0
374    }
375
376    /// Calculate required size for bitmap
377    pub fn required_size(rows: usize, cols: usize) -> usize {
378        (rows * cols + 7) / 8
379    }
380}
381
382/// Mutable null bitmap for writing
383pub struct NullBitmapMut {
384    data: Vec<u8>,
385    columns: usize,
386}
387
388impl NullBitmapMut {
389    pub fn new(rows: usize, columns: usize) -> Self {
390        let size = NullBitmap::required_size(rows, columns);
391        Self {
392            data: vec![0; size],
393            columns,
394        }
395    }
396
397    /// Set a cell as null
398    #[inline]
399    pub fn set_null(&mut self, row: usize, col: usize) {
400        let bit_idx = row * self.columns + col;
401        let byte_idx = bit_idx / 8;
402        let bit_pos = bit_idx % 8;
403
404        if byte_idx < self.data.len() {
405            self.data[byte_idx] |= 1 << bit_pos;
406        }
407    }
408
409    /// Get the raw bitmap data
410    pub fn as_bytes(&self) -> &[u8] {
411        &self.data
412    }
413
414    /// Into raw data
415    pub fn into_bytes(self) -> Vec<u8> {
416        self.data
417    }
418}
419
420/// Zero-copy row view into TBP data
421#[derive(Debug, Clone)]
422pub struct RowView<'a> {
423    /// Schema reference
424    schema: &'a TbpSchema,
425    /// Raw row data
426    data: &'a [u8],
427    /// Null bitmap reference
428    null_bitmap: Option<&'a NullBitmap<'a>>,
429    /// Row index for null bitmap access
430    row_idx: usize,
431}
432
433impl<'a> RowView<'a> {
434    pub fn new(
435        schema: &'a TbpSchema,
436        data: &'a [u8],
437        null_bitmap: Option<&'a NullBitmap<'a>>,
438        row_idx: usize,
439    ) -> Self {
440        Self {
441            schema,
442            data,
443            null_bitmap,
444            row_idx,
445        }
446    }
447
448    /// Check if column is null - O(1)
449    #[inline]
450    pub fn is_null(&self, col: usize) -> bool {
451        self.null_bitmap
452            .map(|b| b.is_null(self.row_idx, col))
453            .unwrap_or(false)
454    }
455
456    /// Get column offset for fixed-size columns
457    fn column_offset(&self, col: usize) -> usize {
458        let mut offset = 0;
459        for c in &self.schema.columns[..col] {
460            offset += match c.col_type {
461                TbpColumnType::FixedBinary => c.fixed_size.unwrap_or(0) as usize,
462                _ => c.col_type.fixed_size().unwrap_or(0),
463            };
464        }
465        offset
466    }
467
468    /// Read a boolean column - O(1)
469    pub fn read_bool(&self, col: usize) -> Option<bool> {
470        if self.is_null(col) {
471            return None;
472        }
473        let offset = self.column_offset(col);
474        Some(self.data.get(offset).copied().unwrap_or(0) != 0)
475    }
476
477    /// Read an i64 column - O(1)
478    pub fn read_i64(&self, col: usize) -> Option<i64> {
479        if self.is_null(col) {
480            return None;
481        }
482        let offset = self.column_offset(col);
483        if offset + 8 > self.data.len() {
484            return None;
485        }
486        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
487        Some(i64::from_le_bytes(bytes))
488    }
489
490    /// Read a u64 column - O(1)
491    pub fn read_u64(&self, col: usize) -> Option<u64> {
492        if self.is_null(col) {
493            return None;
494        }
495        let offset = self.column_offset(col);
496        if offset + 8 > self.data.len() {
497            return None;
498        }
499        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
500        Some(u64::from_le_bytes(bytes))
501    }
502
503    /// Read an f64 column - O(1)
504    pub fn read_f64(&self, col: usize) -> Option<f64> {
505        if self.is_null(col) {
506            return None;
507        }
508        let offset = self.column_offset(col);
509        if offset + 8 > self.data.len() {
510            return None;
511        }
512        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
513        Some(f64::from_le_bytes(bytes))
514    }
515
516    /// Read an i32 column - O(1)
517    pub fn read_i32(&self, col: usize) -> Option<i32> {
518        if self.is_null(col) {
519            return None;
520        }
521        let offset = self.column_offset(col);
522        if offset + 4 > self.data.len() {
523            return None;
524        }
525        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
526        Some(i32::from_le_bytes(bytes))
527    }
528
529    /// Read an f32 column - O(1)
530    pub fn read_f32(&self, col: usize) -> Option<f32> {
531        if self.is_null(col) {
532            return None;
533        }
534        let offset = self.column_offset(col);
535        if offset + 4 > self.data.len() {
536            return None;
537        }
538        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
539        Some(f32::from_le_bytes(bytes))
540    }
541
542    /// Get raw row data
543    pub fn raw_data(&self) -> &[u8] {
544        self.data
545    }
546}
547
548/// TBP writer for creating binary tables
549pub struct TbpWriter {
550    schema: TbpSchema,
551    null_bitmap: NullBitmapMut,
552    row_index: Vec<u32>,
553    data: Vec<u8>,
554    row_count: usize,
555}
556
557impl TbpWriter {
558    pub fn new(schema: TbpSchema, estimated_rows: usize) -> Self {
559        Self {
560            null_bitmap: NullBitmapMut::new(estimated_rows, schema.columns.len()),
561            row_index: Vec::with_capacity(estimated_rows),
562            data: Vec::with_capacity(estimated_rows * schema.fixed_row_size().unwrap_or(64)),
563            row_count: 0,
564            schema,
565        }
566    }
567
568    /// Start a new row and return a row writer
569    pub fn start_row(&mut self) -> TbpRowWriter<'_> {
570        let offset = self.data.len() as u32;
571        self.row_index.push(offset);
572        TbpRowWriter {
573            writer: self,
574            col_idx: 0,
575        }
576    }
577
578    /// Mark a cell as null
579    fn set_null(&mut self, row: usize, col: usize) {
580        self.null_bitmap.set_null(row, col);
581    }
582
583    /// Finish writing and produce the final buffer
584    pub fn finish(self) -> Vec<u8> {
585        let has_nulls = self.schema.has_nullable_columns();
586        let has_variable = self.schema.has_variable_columns();
587
588        let mut flags = TbpFlags(0);
589        if has_nulls {
590            flags.0 |= TbpFlags::HAS_NULLS;
591        }
592        if has_variable {
593            flags.0 |= TbpFlags::HAS_ROW_INDEX;
594        }
595
596        // Calculate offsets
597        let null_bitmap_offset = if has_nulls { TBP_HEADER_SIZE as u32 } else { 0 };
598        let null_bitmap_size = if has_nulls {
599            NullBitmap::required_size(self.row_count, self.schema.columns.len())
600        } else {
601            0
602        };
603
604        let row_index_offset = if has_variable {
605            (TBP_HEADER_SIZE + null_bitmap_size) as u32
606        } else {
607            0
608        };
609        let row_index_size = if has_variable {
610            self.row_count * 4
611        } else {
612            0
613        };
614
615        let data_offset = TBP_HEADER_SIZE + null_bitmap_size + row_index_size;
616
617        let header = TbpHeader {
618            magic: TBP_MAGIC,
619            version: TBP_VERSION,
620            flags,
621            schema_id: self.schema.schema_id,
622            row_count: self.row_count as u32,
623            column_count: self.schema.columns.len() as u16,
624            reserved: 0,
625            null_bitmap_offset,
626            row_index_offset,
627        };
628
629        let total_size = data_offset + self.data.len();
630        let mut buffer = Vec::with_capacity(total_size);
631
632        // Write header
633        header.write(&mut buffer).unwrap();
634
635        // Write null bitmap
636        if has_nulls {
637            let required = NullBitmap::required_size(self.row_count, self.schema.columns.len());
638            buffer.extend_from_slice(&self.null_bitmap.as_bytes()[..required]);
639        }
640
641        // Write row index
642        if has_variable {
643            for offset in &self.row_index {
644                buffer.write_u32::<LittleEndian>(*offset + data_offset as u32).unwrap();
645            }
646        }
647
648        // Write data
649        buffer.extend_from_slice(&self.data);
650
651        buffer
652    }
653}
654
655/// Row writer for TBP
656pub struct TbpRowWriter<'a> {
657    writer: &'a mut TbpWriter,
658    col_idx: usize,
659}
660
661impl<'a> TbpRowWriter<'a> {
662    /// Write a null value
663    pub fn write_null(mut self) -> Self {
664        self.writer.set_null(self.writer.row_count, self.col_idx);
665        self.col_idx += 1;
666        self
667    }
668
669    /// Write a boolean
670    pub fn write_bool(mut self, value: bool) -> Self {
671        self.writer.data.push(if value { 1 } else { 0 });
672        self.col_idx += 1;
673        self
674    }
675
676    /// Write an i64
677    pub fn write_i64(mut self, value: i64) -> Self {
678        self.writer.data.extend_from_slice(&value.to_le_bytes());
679        self.col_idx += 1;
680        self
681    }
682
683    /// Write a u64
684    pub fn write_u64(mut self, value: u64) -> Self {
685        self.writer.data.extend_from_slice(&value.to_le_bytes());
686        self.col_idx += 1;
687        self
688    }
689
690    /// Write an f64
691    pub fn write_f64(mut self, value: f64) -> Self {
692        self.writer.data.extend_from_slice(&value.to_le_bytes());
693        self.col_idx += 1;
694        self
695    }
696
697    /// Write an i32
698    pub fn write_i32(mut self, value: i32) -> Self {
699        self.writer.data.extend_from_slice(&value.to_le_bytes());
700        self.col_idx += 1;
701        self
702    }
703
704    /// Write an f32
705    pub fn write_f32(mut self, value: f32) -> Self {
706        self.writer.data.extend_from_slice(&value.to_le_bytes());
707        self.col_idx += 1;
708        self
709    }
710
711    /// Write a string (variable length)
712    pub fn write_string(mut self, value: &str) -> Self {
713        let bytes = value.as_bytes();
714        self.writer.data.write_u32::<LittleEndian>(bytes.len() as u32).unwrap();
715        self.writer.data.extend_from_slice(bytes);
716        self.col_idx += 1;
717        self
718    }
719
720    /// Write binary data (variable length)
721    pub fn write_binary(mut self, value: &[u8]) -> Self {
722        self.writer.data.write_u32::<LittleEndian>(value.len() as u32).unwrap();
723        self.writer.data.extend_from_slice(value);
724        self.col_idx += 1;
725        self
726    }
727
728    /// Finish the row
729    pub fn finish(self) {
730        self.writer.row_count += 1;
731    }
732}
733
734/// TBP reader for zero-copy access
735pub struct TbpReader<'a> {
736    data: &'a [u8],
737    header: TbpHeader,
738    schema: &'a TbpSchema,
739}
740
741impl<'a> TbpReader<'a> {
742    /// Create a new reader
743    pub fn new(data: &'a [u8], schema: &'a TbpSchema) -> io::Result<Self> {
744        let header = TbpHeader::read(data)?;
745
746        if header.schema_id != schema.schema_id {
747            return Err(io::Error::new(
748                io::ErrorKind::InvalidData,
749                "Schema ID mismatch",
750            ));
751        }
752
753        Ok(Self {
754            data,
755            header,
756            schema,
757        })
758    }
759
760    /// Get number of rows
761    pub fn row_count(&self) -> usize {
762        self.header.row_count as usize
763    }
764
765    /// Get a row by index - O(1)
766    pub fn get_row(&self, row: usize) -> Option<RowView<'_>> {
767        if row >= self.row_count() {
768            return None;
769        }
770
771        // Get row offset
772        let row_offset = if self.header.flags.has_row_index() {
773            let idx_offset = self.header.row_index_offset as usize + row * 4;
774            if idx_offset + 4 > self.data.len() {
775                return None;
776            }
777            let bytes: [u8; 4] = self.data[idx_offset..idx_offset + 4].try_into().ok()?;
778            u32::from_le_bytes(bytes) as usize
779        } else {
780            // Fixed-size rows
781            let row_size = self.schema.fixed_row_size()?;
782            let null_bitmap_size = if self.header.flags.has_nulls() {
783                NullBitmap::required_size(self.row_count(), self.schema.columns.len())
784            } else {
785                0
786            };
787            TBP_HEADER_SIZE + null_bitmap_size + row * row_size
788        };
789
790        let row_data = &self.data[row_offset..];
791
792        // TODO: properly construct null bitmap reference
793        Some(RowView::new(self.schema, row_data, None, row))
794    }
795
796    /// Iterate over all rows - zero allocation per row
797    pub fn iter(&'a self) -> impl Iterator<Item = RowView<'a>> {
798        (0..self.row_count()).filter_map(move |i| self.get_row(i))
799    }
800}
801
802#[cfg(test)]
803mod tests {
804    use super::*;
805
806    #[test]
807    fn test_header_roundtrip() {
808        let header = TbpHeader {
809            magic: TBP_MAGIC,
810            version: TBP_VERSION,
811            flags: TbpFlags(TbpFlags::HAS_NULLS | TbpFlags::HAS_ROW_INDEX),
812            schema_id: 12345678,
813            row_count: 100,
814            column_count: 5,
815            reserved: 0,
816            null_bitmap_offset: 32,
817            row_index_offset: 48,
818        };
819
820        let mut buffer = Vec::new();
821        header.write(&mut buffer).unwrap();
822        assert_eq!(buffer.len(), TBP_HEADER_SIZE);
823
824        let parsed = TbpHeader::read(&buffer).unwrap();
825        assert_eq!(parsed.magic, TBP_MAGIC);
826        assert_eq!(parsed.version, TBP_VERSION);
827        assert_eq!(parsed.row_count, 100);
828        assert_eq!(parsed.column_count, 5);
829    }
830
831    #[test]
832    fn test_null_bitmap() {
833        let mut bitmap = NullBitmapMut::new(10, 5);
834        bitmap.set_null(0, 0);
835        bitmap.set_null(5, 3);
836        bitmap.set_null(9, 4);
837
838        let data = bitmap.as_bytes();
839        let reader = NullBitmap::new(data, 5);
840
841        assert!(reader.is_null(0, 0));
842        assert!(!reader.is_null(0, 1));
843        assert!(reader.is_null(5, 3));
844        assert!(reader.is_null(9, 4));
845        assert!(!reader.is_null(9, 3));
846    }
847
848    #[test]
849    fn test_writer_reader_roundtrip() {
850        let schema = TbpSchema::new(
851            "test_table",
852            vec![
853                TbpColumn::new("id", TbpColumnType::Int64).not_null(),
854                TbpColumn::new("value", TbpColumnType::Float64),
855            ],
856        );
857
858        let mut writer = TbpWriter::new(schema.clone(), 100);
859
860        // Write some rows
861        for i in 0..10 {
862            writer
863                .start_row()
864                .write_i64(i)
865                .write_f64(i as f64 * 1.5)
866                .finish();
867        }
868
869        let data = writer.finish();
870
871        // Read back
872        let reader = TbpReader::new(&data, &schema).unwrap();
873        assert_eq!(reader.row_count(), 10);
874
875        let row = reader.get_row(5).unwrap();
876        assert_eq!(row.read_i64(0), Some(5));
877        assert_eq!(row.read_f64(1), Some(7.5));
878    }
879}