Skip to main content

sochdb_core/
tbp.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! TOON Binary Protocol (TBP) - Zero-Copy Binary Wire Format
19//!
20//! From mm.md Task 3.1: Zero-Copy Binary Wire Format
21//!
22//! ## Problem
23//!
24//! Current TOON format is text-based with parsing overhead:
25//! - O(n) string allocations per row
26//! - UTF-8 validation on every parse
27//! - No random access (must scan from start)
28//! - Variable-length encoding requires sequential parsing
29//!
30//! ## Solution
31//!
32//! Binary protocol enables:
33//! - O(1) field access via row index + column offset
34//! - Zero-copy reads from mmap'd files
35//! - Null bitmap for efficient NULL handling
36//! - LLM-friendly text emission on demand
37//!
38//! ## Layout
39//!
40//! ```text
41//! TBP Layout (Little-Endian, 32-byte header):
42//! ┌─────────────────────────────────────────────────────┐
43//! │ magic: u32 = 0x544F4F4E ("TOON")                    │
44//! │ version: u16, flags: u16                            │
45//! │ schema_id: u64 (hash for validation)                │
//! │ row_count: u32, column_count: u16, reserved: u16    │
//! │ null_bitmap_offset: u32, row_index_offset: u32      │
//! │ data offset: derived (header + bitmap + row index)  │
49//! ├─────────────────────────────────────────────────────┤
50//! │ Null Bitmap: ceil(rows × cols / 8) bytes            │
51//! │ Row Index: [u32; row_count] offsets                 │
//! │ Data Section (rows stored back to back)             │
53//! └─────────────────────────────────────────────────────┘
54//!
55//! Access complexity:
56//! - Row access: O(1) via row_index[row]
57//! - Field access: O(1) via column_type + fixed_offset
58//! - Null check: O(1) via bitmap[row * cols + col]
59//! ```
60
61use std::io::{self, Write};
62
63use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64
/// Magic number identifying a TBP buffer: the ASCII bytes `"TOON"` interpreted
/// as a big-endian `u32`, i.e. `0x544F_4F4E`.
pub const TBP_MAGIC: u32 = u32::from_be_bytes(*b"TOON");

/// Current TBP version
pub const TBP_VERSION: u16 = 1;

/// TBP header size in bytes, spelled out field by field:
/// magic (4) + version (2) + flags (2) + schema id (8) + row count (4)
/// + column count (2) + reserved (2) + null-bitmap offset (4)
/// + row-index offset (4).
pub const TBP_HEADER_SIZE: usize = 4 + 2 + 2 + 8 + 4 + 2 + 2 + 4 + 4;
73
/// Bit flags carried in the `flags` field of the TBP header.
///
/// The wrapped `u16` is the raw flag word; each associated constant names one
/// bit of it.
#[derive(Debug, Clone, Copy, Default)]
pub struct TbpFlags(pub u16);

impl TbpFlags {
    /// Null bitmap is present
    pub const HAS_NULLS: u16 = 0x0001;
    /// Row index is present (for variable-length data)
    pub const HAS_ROW_INDEX: u16 = 0x0002;
    /// Data is compressed
    pub const COMPRESSED: u16 = 0x0004;
    /// Schema is embedded in the file
    pub const EMBEDDED_SCHEMA: u16 = 0x0008;

    /// Reports whether any bit of `mask` is set in the raw flag word.
    #[inline]
    fn contains(&self, mask: u16) -> bool {
        (self.0 & mask) != 0
    }

    /// `true` when the [`Self::HAS_NULLS`] bit is set.
    pub fn has_nulls(&self) -> bool {
        self.contains(Self::HAS_NULLS)
    }

    /// `true` when the [`Self::HAS_ROW_INDEX`] bit is set.
    pub fn has_row_index(&self) -> bool {
        self.contains(Self::HAS_ROW_INDEX)
    }

    /// `true` when the [`Self::COMPRESSED`] bit is set.
    pub fn is_compressed(&self) -> bool {
        self.contains(Self::COMPRESSED)
    }

    /// `true` when the [`Self::EMBEDDED_SCHEMA`] bit is set.
    pub fn has_embedded_schema(&self) -> bool {
        self.contains(Self::EMBEDDED_SCHEMA)
    }
}
104
/// Column type for TBP.
///
/// The discriminant of each variant is its one-byte tag
/// (see [`TbpColumnType::from_byte`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TbpColumnType {
    /// Null (no data)
    Null = 0,
    /// Boolean (1 byte, 0 or 1)
    Bool = 1,
    /// Signed 8-bit integer
    Int8 = 2,
    /// Unsigned 8-bit integer
    UInt8 = 3,
    /// Signed 16-bit integer
    Int16 = 4,
    /// Unsigned 16-bit integer
    UInt16 = 5,
    /// Signed 32-bit integer
    Int32 = 6,
    /// Unsigned 32-bit integer
    UInt32 = 7,
    /// Signed 64-bit integer
    Int64 = 8,
    /// Unsigned 64-bit integer
    UInt64 = 9,
    /// 32-bit float
    Float32 = 10,
    /// 64-bit float
    Float64 = 11,
    /// Variable-length string (UTF-8)
    String = 12,
    /// Variable-length binary
    Binary = 13,
    /// Timestamp (microseconds since epoch)
    Timestamp = 14,
    /// Fixed-size binary (e.g., UUIDs)
    FixedBinary = 15,
}

impl TbpColumnType {
    /// Every variant, ordered so that a variant's index equals its tag byte.
    const BY_TAG: [Self; 16] = [
        Self::Null,
        Self::Bool,
        Self::Int8,
        Self::UInt8,
        Self::Int16,
        Self::UInt16,
        Self::Int32,
        Self::UInt32,
        Self::Int64,
        Self::UInt64,
        Self::Float32,
        Self::Float64,
        Self::String,
        Self::Binary,
        Self::Timestamp,
        Self::FixedBinary,
    ];

    /// Width in bytes of a value of this type, or `None` when the type alone
    /// does not determine it.
    ///
    /// `String` and `Binary` are variable-length. `FixedBinary` also yields
    /// `None` here because its width is specified per column, not per type.
    pub fn fixed_size(&self) -> Option<usize> {
        let width = match *self {
            Self::String | Self::Binary | Self::FixedBinary => return None,
            Self::Null => 0,
            Self::Bool | Self::Int8 | Self::UInt8 => 1,
            Self::Int16 | Self::UInt16 => 2,
            Self::Int32 | Self::UInt32 | Self::Float32 => 4,
            Self::Int64 | Self::UInt64 | Self::Float64 | Self::Timestamp => 8,
        };
        Some(width)
    }

    /// Whether [`fixed_size`](Self::fixed_size) is `None` for this type.
    ///
    /// Note that this is therefore `true` for `FixedBinary` as well as for
    /// `String` and `Binary`.
    pub fn is_variable(&self) -> bool {
        matches!(self.fixed_size(), None)
    }

    /// Maps a tag byte back to its column type; tags above 15 yield `None`.
    pub fn from_byte(b: u8) -> Option<Self> {
        Self::BY_TAG.get(usize::from(b)).copied()
    }
}
188
189/// Column definition in TBP schema
190#[derive(Debug, Clone)]
191pub struct TbpColumn {
192    /// Column name
193    pub name: String,
194    /// Column type
195    pub col_type: TbpColumnType,
196    /// Fixed size for FixedBinary type
197    pub fixed_size: Option<u16>,
198    /// Column is nullable
199    pub nullable: bool,
200}
201
202impl TbpColumn {
203    pub fn new(name: impl Into<String>, col_type: TbpColumnType) -> Self {
204        Self {
205            name: name.into(),
206            col_type,
207            fixed_size: None,
208            nullable: true,
209        }
210    }
211
212    pub fn with_fixed_size(mut self, size: u16) -> Self {
213        self.fixed_size = Some(size);
214        self
215    }
216
217    pub fn not_null(mut self) -> Self {
218        self.nullable = false;
219        self
220    }
221}
222
223/// TBP schema
224#[derive(Debug, Clone)]
225pub struct TbpSchema {
226    /// Table name
227    pub name: String,
228    /// Columns
229    pub columns: Vec<TbpColumn>,
230    /// Schema ID (hash for validation)
231    pub schema_id: u64,
232}
233
234impl TbpSchema {
235    pub fn new(name: impl Into<String>, columns: Vec<TbpColumn>) -> Self {
236        let name = name.into();
237        let schema_id = Self::compute_schema_id(&name, &columns);
238        Self {
239            name,
240            columns,
241            schema_id,
242        }
243    }
244
245    /// Compute a hash of the schema for validation
246    fn compute_schema_id(name: &str, columns: &[TbpColumn]) -> u64 {
247        use std::collections::hash_map::DefaultHasher;
248        use std::hash::{Hash, Hasher};
249
250        let mut hasher = DefaultHasher::new();
251        name.hash(&mut hasher);
252        for col in columns {
253            col.name.hash(&mut hasher);
254            (col.col_type as u8).hash(&mut hasher);
255            col.fixed_size.hash(&mut hasher);
256            col.nullable.hash(&mut hasher);
257        }
258        hasher.finish()
259    }
260
261    /// Check if schema has any variable-length columns
262    pub fn has_variable_columns(&self) -> bool {
263        self.columns.iter().any(|c| c.col_type.is_variable())
264    }
265
266    /// Check if schema has any nullable columns
267    pub fn has_nullable_columns(&self) -> bool {
268        self.columns.iter().any(|c| c.nullable)
269    }
270
271    /// Get the fixed row size (if all columns are fixed-size)
272    pub fn fixed_row_size(&self) -> Option<usize> {
273        if self.has_variable_columns() {
274            return None;
275        }
276
277        let mut size = 0;
278        for col in &self.columns {
279            match col.col_type {
280                TbpColumnType::FixedBinary => {
281                    size += col.fixed_size.unwrap_or(0) as usize;
282                }
283                _ => {
284                    size += col.col_type.fixed_size()?;
285                }
286            }
287        }
288        Some(size)
289    }
290}
291
292/// TBP header (32 bytes)
293#[derive(Debug, Clone)]
294pub struct TbpHeader {
295    /// Magic number (should be TBP_MAGIC)
296    pub magic: u32,
297    /// Version number
298    pub version: u16,
299    /// Flags
300    pub flags: TbpFlags,
301    /// Schema ID for validation
302    pub schema_id: u64,
303    /// Number of rows
304    pub row_count: u32,
305    /// Number of columns
306    pub column_count: u16,
307    /// Reserved
308    pub reserved: u16,
309    /// Offset to null bitmap (0 if no nulls)
310    pub null_bitmap_offset: u32,
311    /// Offset to row index (0 if fixed-size rows)
312    pub row_index_offset: u32,
313}
314
315impl TbpHeader {
316    /// Write header to a buffer
317    pub fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
318        w.write_u32::<LittleEndian>(self.magic)?;
319        w.write_u16::<LittleEndian>(self.version)?;
320        w.write_u16::<LittleEndian>(self.flags.0)?;
321        w.write_u64::<LittleEndian>(self.schema_id)?;
322        w.write_u32::<LittleEndian>(self.row_count)?;
323        w.write_u16::<LittleEndian>(self.column_count)?;
324        w.write_u16::<LittleEndian>(self.reserved)?;
325        w.write_u32::<LittleEndian>(self.null_bitmap_offset)?;
326        w.write_u32::<LittleEndian>(self.row_index_offset)?;
327        Ok(())
328    }
329
330    /// Read header from a buffer
331    pub fn read(data: &[u8]) -> io::Result<Self> {
332        if data.len() < TBP_HEADER_SIZE {
333            return Err(io::Error::new(
334                io::ErrorKind::UnexpectedEof,
335                "Header too short",
336            ));
337        }
338
339        let mut cursor = std::io::Cursor::new(data);
340        let magic = cursor.read_u32::<LittleEndian>()?;
341        if magic != TBP_MAGIC {
342            return Err(io::Error::new(
343                io::ErrorKind::InvalidData,
344                "Invalid TBP magic",
345            ));
346        }
347
348        let header = Self {
349            magic,
350            version: cursor.read_u16::<LittleEndian>()?,
351            flags: TbpFlags(cursor.read_u16::<LittleEndian>()?),
352            schema_id: cursor.read_u64::<LittleEndian>()?,
353            row_count: cursor.read_u32::<LittleEndian>()?,
354            column_count: cursor.read_u16::<LittleEndian>()?,
355            reserved: cursor.read_u16::<LittleEndian>()?,
356            null_bitmap_offset: cursor.read_u32::<LittleEndian>()?,
357            row_index_offset: cursor.read_u32::<LittleEndian>()?,
358        };
359
360        let data_len = data.len() as u64;
361
362        // Validate offsets are within the buffer to prevent OOB access
363        // from malformed or adversarial TBP payloads.
364        // Only validate when the buffer contains more than just the header
365        // (header-only buffers are used for serialization roundtrip tests).
366        if data_len > TBP_HEADER_SIZE as u64 {
367            if header.null_bitmap_offset != 0 && (header.null_bitmap_offset as u64) >= data_len {
368                return Err(io::Error::new(
369                    io::ErrorKind::InvalidData,
370                    format!(
371                        "null_bitmap_offset ({}) exceeds data length ({})",
372                        header.null_bitmap_offset, data_len
373                    ),
374                ));
375            }
376            if header.row_index_offset != 0 && (header.row_index_offset as u64) >= data_len {
377                return Err(io::Error::new(
378                    io::ErrorKind::InvalidData,
379                    format!(
380                        "row_index_offset ({}) exceeds data length ({})",
381                        header.row_index_offset, data_len
382                    ),
383                ));
384            }
385        }
386
387        Ok(header)
388    }
389}
390
/// Read-only view over a null bitmap.
///
/// Cell `(row, col)` is stored at bit index `row * columns + col`, least
/// significant bit first within each byte.
#[derive(Debug, Clone, Copy)]
pub struct NullBitmap<'a> {
    data: &'a [u8],
    columns: usize,
}

impl<'a> NullBitmap<'a> {
    /// Wraps `data` as a bitmap for rows of `columns` cells each.
    pub fn new(data: &'a [u8], columns: usize) -> Self {
        Self { data, columns }
    }

    /// Check if a cell is null - O(1)
    ///
    /// Returns `false` for any cell outside the bitmap: a `col` that is not
    /// below the column count, a bit index that does not fit in `usize`, or a
    /// bit beyond the end of the backing bytes.
    #[inline]
    pub fn is_null(&self, row: usize, col: usize) -> bool {
        // Without this guard an out-of-range column would read a bit that
        // belongs to a later row.
        if col >= self.columns {
            return false;
        }

        let bit_idx = match row
            .checked_mul(self.columns)
            .and_then(|bits| bits.checked_add(col))
        {
            Some(idx) => idx,
            None => return false,
        };

        match self.data.get(bit_idx / 8) {
            Some(byte) => *byte & (1u8 << (bit_idx % 8)) != 0,
            None => false,
        }
    }

    /// Number of bytes needed to hold one bit per cell, rounded up.
    ///
    /// The cell count saturates at `usize::MAX` instead of overflowing.
    pub fn required_size(rows: usize, cols: usize) -> usize {
        let bits = rows.saturating_mul(cols);
        bits / 8 + usize::from(bits % 8 != 0)
    }
}
422
423/// Mutable null bitmap for writing
424pub struct NullBitmapMut {
425    data: Vec<u8>,
426    columns: usize,
427}
428
429impl NullBitmapMut {
430    pub fn new(rows: usize, columns: usize) -> Self {
431        let size = NullBitmap::required_size(rows, columns);
432        Self {
433            data: vec![0; size],
434            columns,
435        }
436    }
437
438    /// Set a cell as null
439    #[inline]
440    pub fn set_null(&mut self, row: usize, col: usize) {
441        let bit_idx = row * self.columns + col;
442        let byte_idx = bit_idx / 8;
443        let bit_pos = bit_idx % 8;
444
445        if byte_idx < self.data.len() {
446            self.data[byte_idx] |= 1 << bit_pos;
447        }
448    }
449
450    /// Get the raw bitmap data
451    pub fn as_bytes(&self) -> &[u8] {
452        &self.data
453    }
454
455    /// Into raw data
456    pub fn into_bytes(self) -> Vec<u8> {
457        self.data
458    }
459}
460
461/// Zero-copy row view into TBP data
462#[derive(Debug, Clone)]
463pub struct RowView<'a> {
464    /// Schema reference
465    schema: &'a TbpSchema,
466    /// Raw row data
467    data: &'a [u8],
468    /// Null bitmap reference
469    null_bitmap: Option<&'a NullBitmap<'a>>,
470    /// Row index for null bitmap access
471    row_idx: usize,
472}
473
474impl<'a> RowView<'a> {
475    pub fn new(
476        schema: &'a TbpSchema,
477        data: &'a [u8],
478        null_bitmap: Option<&'a NullBitmap<'a>>,
479        row_idx: usize,
480    ) -> Self {
481        Self {
482            schema,
483            data,
484            null_bitmap,
485            row_idx,
486        }
487    }
488
489    /// Check if column is null - O(1)
490    #[inline]
491    pub fn is_null(&self, col: usize) -> bool {
492        self.null_bitmap
493            .map(|b| b.is_null(self.row_idx, col))
494            .unwrap_or(false)
495    }
496
497    /// Get column offset for fixed-size columns
498    fn column_offset(&self, col: usize) -> usize {
499        let mut offset = 0;
500        for c in &self.schema.columns[..col] {
501            offset += match c.col_type {
502                TbpColumnType::FixedBinary => c.fixed_size.unwrap_or(0) as usize,
503                _ => c.col_type.fixed_size().unwrap_or(0),
504            };
505        }
506        offset
507    }
508
509    /// Read a boolean column - O(1)
510    pub fn read_bool(&self, col: usize) -> Option<bool> {
511        if self.is_null(col) {
512            return None;
513        }
514        let offset = self.column_offset(col);
515        Some(self.data.get(offset).copied().unwrap_or(0) != 0)
516    }
517
518    /// Read an i64 column - O(1)
519    pub fn read_i64(&self, col: usize) -> Option<i64> {
520        if self.is_null(col) {
521            return None;
522        }
523        let offset = self.column_offset(col);
524        if offset + 8 > self.data.len() {
525            return None;
526        }
527        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
528        Some(i64::from_le_bytes(bytes))
529    }
530
531    /// Read a u64 column - O(1)
532    pub fn read_u64(&self, col: usize) -> Option<u64> {
533        if self.is_null(col) {
534            return None;
535        }
536        let offset = self.column_offset(col);
537        if offset + 8 > self.data.len() {
538            return None;
539        }
540        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
541        Some(u64::from_le_bytes(bytes))
542    }
543
544    /// Read an f64 column - O(1)
545    pub fn read_f64(&self, col: usize) -> Option<f64> {
546        if self.is_null(col) {
547            return None;
548        }
549        let offset = self.column_offset(col);
550        if offset + 8 > self.data.len() {
551            return None;
552        }
553        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
554        Some(f64::from_le_bytes(bytes))
555    }
556
557    /// Read an i32 column - O(1)
558    pub fn read_i32(&self, col: usize) -> Option<i32> {
559        if self.is_null(col) {
560            return None;
561        }
562        let offset = self.column_offset(col);
563        if offset + 4 > self.data.len() {
564            return None;
565        }
566        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
567        Some(i32::from_le_bytes(bytes))
568    }
569
570    /// Read an f32 column - O(1)
571    pub fn read_f32(&self, col: usize) -> Option<f32> {
572        if self.is_null(col) {
573            return None;
574        }
575        let offset = self.column_offset(col);
576        if offset + 4 > self.data.len() {
577            return None;
578        }
579        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
580        Some(f32::from_le_bytes(bytes))
581    }
582
583    /// Get raw row data
584    pub fn raw_data(&self) -> &[u8] {
585        self.data
586    }
587}
588
589/// TBP writer for creating binary tables
590pub struct TbpWriter {
591    schema: TbpSchema,
592    null_bitmap: NullBitmapMut,
593    row_index: Vec<u32>,
594    data: Vec<u8>,
595    row_count: usize,
596}
597
598impl TbpWriter {
599    pub fn new(schema: TbpSchema, estimated_rows: usize) -> Self {
600        Self {
601            null_bitmap: NullBitmapMut::new(estimated_rows, schema.columns.len()),
602            row_index: Vec::with_capacity(estimated_rows),
603            data: Vec::with_capacity(estimated_rows * schema.fixed_row_size().unwrap_or(64)),
604            row_count: 0,
605            schema,
606        }
607    }
608
609    /// Start a new row and return a row writer
610    pub fn start_row(&mut self) -> TbpRowWriter<'_> {
611        let offset = self.data.len() as u32;
612        self.row_index.push(offset);
613        TbpRowWriter {
614            writer: self,
615            col_idx: 0,
616        }
617    }
618
619    /// Mark a cell as null
620    fn set_null(&mut self, row: usize, col: usize) {
621        self.null_bitmap.set_null(row, col);
622    }
623
624    /// Finish writing and produce the final buffer
625    pub fn finish(self) -> Vec<u8> {
626        let has_nulls = self.schema.has_nullable_columns();
627        let has_variable = self.schema.has_variable_columns();
628
629        let mut flags = TbpFlags(0);
630        if has_nulls {
631            flags.0 |= TbpFlags::HAS_NULLS;
632        }
633        if has_variable {
634            flags.0 |= TbpFlags::HAS_ROW_INDEX;
635        }
636
637        // Calculate offsets
638        let null_bitmap_offset = if has_nulls { TBP_HEADER_SIZE as u32 } else { 0 };
639        let null_bitmap_size = if has_nulls {
640            NullBitmap::required_size(self.row_count, self.schema.columns.len())
641        } else {
642            0
643        };
644
645        let row_index_offset = if has_variable {
646            (TBP_HEADER_SIZE + null_bitmap_size) as u32
647        } else {
648            0
649        };
650        let row_index_size = if has_variable { self.row_count * 4 } else { 0 };
651
652        let data_offset = TBP_HEADER_SIZE + null_bitmap_size + row_index_size;
653
654        let header = TbpHeader {
655            magic: TBP_MAGIC,
656            version: TBP_VERSION,
657            flags,
658            schema_id: self.schema.schema_id,
659            row_count: self.row_count as u32,
660            column_count: self.schema.columns.len() as u16,
661            reserved: 0,
662            null_bitmap_offset,
663            row_index_offset,
664        };
665
666        let total_size = data_offset + self.data.len();
667        let mut buffer = Vec::with_capacity(total_size);
668
669        // Write header
670        header.write(&mut buffer).unwrap();
671
672        // Write null bitmap
673        if has_nulls {
674            let required = NullBitmap::required_size(self.row_count, self.schema.columns.len());
675            buffer.extend_from_slice(&self.null_bitmap.as_bytes()[..required]);
676        }
677
678        // Write row index
679        if has_variable {
680            for offset in &self.row_index {
681                buffer
682                    .write_u32::<LittleEndian>(*offset + data_offset as u32)
683                    .unwrap();
684            }
685        }
686
687        // Write data
688        buffer.extend_from_slice(&self.data);
689
690        buffer
691    }
692}
693
694/// Row writer for TBP
695pub struct TbpRowWriter<'a> {
696    writer: &'a mut TbpWriter,
697    col_idx: usize,
698}
699
700impl<'a> TbpRowWriter<'a> {
701    /// Write a null value
702    pub fn write_null(mut self) -> Self {
703        self.writer.set_null(self.writer.row_count, self.col_idx);
704        self.col_idx += 1;
705        self
706    }
707
708    /// Write a boolean
709    pub fn write_bool(mut self, value: bool) -> Self {
710        self.writer.data.push(if value { 1 } else { 0 });
711        self.col_idx += 1;
712        self
713    }
714
715    /// Write an i64
716    pub fn write_i64(mut self, value: i64) -> Self {
717        self.writer.data.extend_from_slice(&value.to_le_bytes());
718        self.col_idx += 1;
719        self
720    }
721
722    /// Write a u64
723    pub fn write_u64(mut self, value: u64) -> Self {
724        self.writer.data.extend_from_slice(&value.to_le_bytes());
725        self.col_idx += 1;
726        self
727    }
728
729    /// Write an f64
730    pub fn write_f64(mut self, value: f64) -> Self {
731        self.writer.data.extend_from_slice(&value.to_le_bytes());
732        self.col_idx += 1;
733        self
734    }
735
736    /// Write an i32
737    pub fn write_i32(mut self, value: i32) -> Self {
738        self.writer.data.extend_from_slice(&value.to_le_bytes());
739        self.col_idx += 1;
740        self
741    }
742
743    /// Write an f32
744    pub fn write_f32(mut self, value: f32) -> Self {
745        self.writer.data.extend_from_slice(&value.to_le_bytes());
746        self.col_idx += 1;
747        self
748    }
749
750    /// Write a string (variable length)
751    pub fn write_string(mut self, value: &str) -> Self {
752        let bytes = value.as_bytes();
753        self.writer
754            .data
755            .write_u32::<LittleEndian>(bytes.len() as u32)
756            .unwrap();
757        self.writer.data.extend_from_slice(bytes);
758        self.col_idx += 1;
759        self
760    }
761
762    /// Write binary data (variable length)
763    pub fn write_binary(mut self, value: &[u8]) -> Self {
764        self.writer
765            .data
766            .write_u32::<LittleEndian>(value.len() as u32)
767            .unwrap();
768        self.writer.data.extend_from_slice(value);
769        self.col_idx += 1;
770        self
771    }
772
773    /// Finish the row
774    pub fn finish(self) {
775        self.writer.row_count += 1;
776    }
777}
778
779/// TBP reader for zero-copy access
780pub struct TbpReader<'a> {
781    data: &'a [u8],
782    header: TbpHeader,
783    schema: &'a TbpSchema,
784}
785
786impl<'a> TbpReader<'a> {
787    /// Create a new reader
788    pub fn new(data: &'a [u8], schema: &'a TbpSchema) -> io::Result<Self> {
789        let header = TbpHeader::read(data)?;
790
791        if header.schema_id != schema.schema_id {
792            return Err(io::Error::new(
793                io::ErrorKind::InvalidData,
794                "Schema ID mismatch",
795            ));
796        }
797
798        Ok(Self {
799            data,
800            header,
801            schema,
802        })
803    }
804
805    /// Get number of rows
806    pub fn row_count(&self) -> usize {
807        self.header.row_count as usize
808    }
809
810    /// Get a row by index - O(1)
811    pub fn get_row(&self, row: usize) -> Option<RowView<'_>> {
812        if row >= self.row_count() {
813            return None;
814        }
815
816        // Get row offset
817        let row_offset = if self.header.flags.has_row_index() {
818            let idx_offset = self.header.row_index_offset as usize + row * 4;
819            if idx_offset + 4 > self.data.len() {
820                return None;
821            }
822            let bytes: [u8; 4] = self.data[idx_offset..idx_offset + 4].try_into().ok()?;
823            u32::from_le_bytes(bytes) as usize
824        } else {
825            // Fixed-size rows
826            let row_size = self.schema.fixed_row_size()?;
827            let null_bitmap_size = if self.header.flags.has_nulls() {
828                NullBitmap::required_size(self.row_count(), self.schema.columns.len())
829            } else {
830                0
831            };
832            TBP_HEADER_SIZE + null_bitmap_size + row * row_size
833        };
834
835        let row_data = &self.data[row_offset..];
836
837        // TODO: properly construct null bitmap reference
838        Some(RowView::new(self.schema, row_data, None, row))
839    }
840
841    /// Iterate over all rows - zero allocation per row
842    pub fn iter(&'a self) -> impl Iterator<Item = RowView<'a>> {
843        (0..self.row_count()).filter_map(move |i| self.get_row(i))
844    }
845}
846
#[cfg(test)]
mod tests {
    use super::*;

    /// A header serializes to exactly `TBP_HEADER_SIZE` bytes and parses back.
    #[test]
    fn test_header_roundtrip() {
        let original = TbpHeader {
            magic: TBP_MAGIC,
            version: TBP_VERSION,
            flags: TbpFlags(TbpFlags::HAS_NULLS | TbpFlags::HAS_ROW_INDEX),
            schema_id: 12345678,
            row_count: 100,
            column_count: 5,
            reserved: 0,
            null_bitmap_offset: 32,
            row_index_offset: 48,
        };

        let mut encoded = Vec::new();
        original.write(&mut encoded).unwrap();
        assert_eq!(encoded.len(), TBP_HEADER_SIZE);

        let decoded = TbpHeader::read(&encoded).unwrap();
        assert_eq!(
            (
                decoded.magic,
                decoded.version,
                decoded.row_count,
                decoded.column_count
            ),
            (TBP_MAGIC, TBP_VERSION, 100, 5)
        );
    }

    /// Bits set through the mutable bitmap are visible through the read view,
    /// and neighbouring cells stay clear.
    #[test]
    fn test_null_bitmap() {
        const COLUMNS: usize = 5;
        let marked = [(0, 0), (5, 3), (9, 4)];
        let untouched = [(0, 1), (9, 3)];

        let mut bitmap = NullBitmapMut::new(10, COLUMNS);
        for &(row, col) in &marked {
            bitmap.set_null(row, col);
        }

        let view = NullBitmap::new(bitmap.as_bytes(), COLUMNS);
        for &(row, col) in &marked {
            assert!(view.is_null(row, col));
        }
        for &(row, col) in &untouched {
            assert!(!view.is_null(row, col));
        }
    }

    /// Rows written with `TbpWriter` can be read back with `TbpReader`.
    #[test]
    fn test_writer_reader_roundtrip() {
        let columns = vec![
            TbpColumn::new("id", TbpColumnType::Int64).not_null(),
            TbpColumn::new("value", TbpColumnType::Float64),
        ];
        let schema = TbpSchema::new("test_table", columns);

        let mut writer = TbpWriter::new(schema.clone(), 100);
        for id in 0..10 {
            let row = writer.start_row();
            row.write_i64(id).write_f64(id as f64 * 1.5).finish();
        }
        let encoded = writer.finish();

        let reader = TbpReader::new(&encoded, &schema).unwrap();
        assert_eq!(reader.row_count(), 10);

        let fifth = reader.get_row(5).unwrap();
        assert_eq!(fifth.read_i64(0), Some(5));
        assert_eq!(fifth.read_f64(1), Some(7.5));
    }
}
924}