Skip to main content

sochdb_core/
tbp.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! TOON Binary Protocol (TBP) - Zero-Copy Binary Wire Format
19//!
20//! From mm.md Task 3.1: Zero-Copy Binary Wire Format
21//!
22//! ## Problem
23//!
24//! Current TOON format is text-based with parsing overhead:
25//! - O(n) string allocations per row
26//! - UTF-8 validation on every parse
27//! - No random access (must scan from start)
28//! - Variable-length encoding requires sequential parsing
29//!
30//! ## Solution
31//!
32//! Binary protocol enables:
33//! - O(1) field access via row index + column offset
34//! - Zero-copy reads from mmap'd files
35//! - Null bitmap for efficient NULL handling
36//! - LLM-friendly text emission on demand
37//!
38//! ## Layout
39//!
40//! ```text
41//! TBP Layout (Little-Endian, 32-byte header):
42//! ┌─────────────────────────────────────────────────────┐
43//! │ magic: u32 = 0x544F4F4E ("TOON")                    │
44//! │ version: u16, flags: u16                            │
45//! │ schema_id: u64 (hash for validation)                │
46//! │ row_count: u32, column_count: u16                   │
47//! │ null_bitmap_offset: u32, row_index_offset: u32      │
48//! │ data_offset: u32                                    │
49//! ├─────────────────────────────────────────────────────┤
50//! │ Null Bitmap: ceil(rows × cols / 8) bytes            │
51//! │ Row Index: [u32; row_count] offsets                 │
52//! │ Data Section (columnar within blocks)               │
53//! └─────────────────────────────────────────────────────┘
54//!
55//! Access complexity:
56//! - Row access: O(1) via row_index[row]
57//! - Field access: O(1) via column_type + fixed_offset
58//! - Null check: O(1) via bitmap[row * cols + col]
59//! ```
60
61use std::io::{self, Write};
62
63use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64
65/// TBP magic number: "TOON" in ASCII
66pub const TBP_MAGIC: u32 = 0x544F_4F4E;
67
68/// Current TBP version
69pub const TBP_VERSION: u16 = 1;
70
71/// TBP header size in bytes
72pub const TBP_HEADER_SIZE: usize = 32;
73
74/// TBP flags
75#[derive(Debug, Clone, Copy, Default)]
76pub struct TbpFlags(pub u16);
77
78impl TbpFlags {
79    /// Null bitmap is present
80    pub const HAS_NULLS: u16 = 1 << 0;
81    /// Row index is present (for variable-length data)
82    pub const HAS_ROW_INDEX: u16 = 1 << 1;
83    /// Data is compressed
84    pub const COMPRESSED: u16 = 1 << 2;
85    /// Schema is embedded in the file
86    pub const EMBEDDED_SCHEMA: u16 = 1 << 3;
87
88    pub fn has_nulls(&self) -> bool {
89        self.0 & Self::HAS_NULLS != 0
90    }
91
92    pub fn has_row_index(&self) -> bool {
93        self.0 & Self::HAS_ROW_INDEX != 0
94    }
95
96    pub fn is_compressed(&self) -> bool {
97        self.0 & Self::COMPRESSED != 0
98    }
99
100    pub fn has_embedded_schema(&self) -> bool {
101        self.0 & Self::EMBEDDED_SCHEMA != 0
102    }
103}
104
105/// Column type for TBP
106#[derive(Debug, Clone, Copy, PartialEq, Eq)]
107#[repr(u8)]
108pub enum TbpColumnType {
109    /// Null (no data)
110    Null = 0,
111    /// Boolean (1 byte, 0 or 1)
112    Bool = 1,
113    /// Signed 8-bit integer
114    Int8 = 2,
115    /// Unsigned 8-bit integer
116    UInt8 = 3,
117    /// Signed 16-bit integer
118    Int16 = 4,
119    /// Unsigned 16-bit integer
120    UInt16 = 5,
121    /// Signed 32-bit integer
122    Int32 = 6,
123    /// Unsigned 32-bit integer
124    UInt32 = 7,
125    /// Signed 64-bit integer
126    Int64 = 8,
127    /// Unsigned 64-bit integer
128    UInt64 = 9,
129    /// 32-bit float
130    Float32 = 10,
131    /// 64-bit float
132    Float64 = 11,
133    /// Variable-length string (UTF-8)
134    String = 12,
135    /// Variable-length binary
136    Binary = 13,
137    /// Timestamp (microseconds since epoch)
138    Timestamp = 14,
139    /// Fixed-size binary (e.g., UUIDs)
140    FixedBinary = 15,
141}
142
143impl TbpColumnType {
144    /// Get the fixed size of this type, or None for variable-length types
145    pub fn fixed_size(&self) -> Option<usize> {
146        match self {
147            TbpColumnType::Null => Some(0),
148            TbpColumnType::Bool => Some(1),
149            TbpColumnType::Int8 | TbpColumnType::UInt8 => Some(1),
150            TbpColumnType::Int16 | TbpColumnType::UInt16 => Some(2),
151            TbpColumnType::Int32 | TbpColumnType::UInt32 | TbpColumnType::Float32 => Some(4),
152            TbpColumnType::Int64 | TbpColumnType::UInt64 | TbpColumnType::Float64 | TbpColumnType::Timestamp => Some(8),
153            TbpColumnType::String | TbpColumnType::Binary => None,
154            TbpColumnType::FixedBinary => None, // Size specified per column
155        }
156    }
157
158    /// Check if this type is variable-length
159    pub fn is_variable(&self) -> bool {
160        self.fixed_size().is_none()
161    }
162
163    pub fn from_byte(b: u8) -> Option<Self> {
164        match b {
165            0 => Some(Self::Null),
166            1 => Some(Self::Bool),
167            2 => Some(Self::Int8),
168            3 => Some(Self::UInt8),
169            4 => Some(Self::Int16),
170            5 => Some(Self::UInt16),
171            6 => Some(Self::Int32),
172            7 => Some(Self::UInt32),
173            8 => Some(Self::Int64),
174            9 => Some(Self::UInt64),
175            10 => Some(Self::Float32),
176            11 => Some(Self::Float64),
177            12 => Some(Self::String),
178            13 => Some(Self::Binary),
179            14 => Some(Self::Timestamp),
180            15 => Some(Self::FixedBinary),
181            _ => None,
182        }
183    }
184}
185
186/// Column definition in TBP schema
187#[derive(Debug, Clone)]
188pub struct TbpColumn {
189    /// Column name
190    pub name: String,
191    /// Column type
192    pub col_type: TbpColumnType,
193    /// Fixed size for FixedBinary type
194    pub fixed_size: Option<u16>,
195    /// Column is nullable
196    pub nullable: bool,
197}
198
199impl TbpColumn {
200    pub fn new(name: impl Into<String>, col_type: TbpColumnType) -> Self {
201        Self {
202            name: name.into(),
203            col_type,
204            fixed_size: None,
205            nullable: true,
206        }
207    }
208
209    pub fn with_fixed_size(mut self, size: u16) -> Self {
210        self.fixed_size = Some(size);
211        self
212    }
213
214    pub fn not_null(mut self) -> Self {
215        self.nullable = false;
216        self
217    }
218}
219
220/// TBP schema
221#[derive(Debug, Clone)]
222pub struct TbpSchema {
223    /// Table name
224    pub name: String,
225    /// Columns
226    pub columns: Vec<TbpColumn>,
227    /// Schema ID (hash for validation)
228    pub schema_id: u64,
229}
230
231impl TbpSchema {
232    pub fn new(name: impl Into<String>, columns: Vec<TbpColumn>) -> Self {
233        let name = name.into();
234        let schema_id = Self::compute_schema_id(&name, &columns);
235        Self {
236            name,
237            columns,
238            schema_id,
239        }
240    }
241
242    /// Compute a hash of the schema for validation
243    fn compute_schema_id(name: &str, columns: &[TbpColumn]) -> u64 {
244        use std::hash::{Hash, Hasher};
245        use std::collections::hash_map::DefaultHasher;
246
247        let mut hasher = DefaultHasher::new();
248        name.hash(&mut hasher);
249        for col in columns {
250            col.name.hash(&mut hasher);
251            (col.col_type as u8).hash(&mut hasher);
252            col.fixed_size.hash(&mut hasher);
253            col.nullable.hash(&mut hasher);
254        }
255        hasher.finish()
256    }
257
258    /// Check if schema has any variable-length columns
259    pub fn has_variable_columns(&self) -> bool {
260        self.columns.iter().any(|c| c.col_type.is_variable())
261    }
262
263    /// Check if schema has any nullable columns
264    pub fn has_nullable_columns(&self) -> bool {
265        self.columns.iter().any(|c| c.nullable)
266    }
267
268    /// Get the fixed row size (if all columns are fixed-size)
269    pub fn fixed_row_size(&self) -> Option<usize> {
270        if self.has_variable_columns() {
271            return None;
272        }
273
274        let mut size = 0;
275        for col in &self.columns {
276            match col.col_type {
277                TbpColumnType::FixedBinary => {
278                    size += col.fixed_size.unwrap_or(0) as usize;
279                }
280                _ => {
281                    size += col.col_type.fixed_size()?;
282                }
283            }
284        }
285        Some(size)
286    }
287}
288
289/// TBP header (32 bytes)
290#[derive(Debug, Clone)]
291pub struct TbpHeader {
292    /// Magic number (should be TBP_MAGIC)
293    pub magic: u32,
294    /// Version number
295    pub version: u16,
296    /// Flags
297    pub flags: TbpFlags,
298    /// Schema ID for validation
299    pub schema_id: u64,
300    /// Number of rows
301    pub row_count: u32,
302    /// Number of columns
303    pub column_count: u16,
304    /// Reserved
305    pub reserved: u16,
306    /// Offset to null bitmap (0 if no nulls)
307    pub null_bitmap_offset: u32,
308    /// Offset to row index (0 if fixed-size rows)
309    pub row_index_offset: u32,
310}
311
312impl TbpHeader {
313    /// Write header to a buffer
314    pub fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
315        w.write_u32::<LittleEndian>(self.magic)?;
316        w.write_u16::<LittleEndian>(self.version)?;
317        w.write_u16::<LittleEndian>(self.flags.0)?;
318        w.write_u64::<LittleEndian>(self.schema_id)?;
319        w.write_u32::<LittleEndian>(self.row_count)?;
320        w.write_u16::<LittleEndian>(self.column_count)?;
321        w.write_u16::<LittleEndian>(self.reserved)?;
322        w.write_u32::<LittleEndian>(self.null_bitmap_offset)?;
323        w.write_u32::<LittleEndian>(self.row_index_offset)?;
324        Ok(())
325    }
326
327    /// Read header from a buffer
328    pub fn read(data: &[u8]) -> io::Result<Self> {
329        if data.len() < TBP_HEADER_SIZE {
330            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Header too short"));
331        }
332
333        let mut cursor = std::io::Cursor::new(data);
334        let magic = cursor.read_u32::<LittleEndian>()?;
335        if magic != TBP_MAGIC {
336            return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid TBP magic"));
337        }
338
339        let header = Self {
340            magic,
341            version: cursor.read_u16::<LittleEndian>()?,
342            flags: TbpFlags(cursor.read_u16::<LittleEndian>()?),
343            schema_id: cursor.read_u64::<LittleEndian>()?,
344            row_count: cursor.read_u32::<LittleEndian>()?,
345            column_count: cursor.read_u16::<LittleEndian>()?,
346            reserved: cursor.read_u16::<LittleEndian>()?,
347            null_bitmap_offset: cursor.read_u32::<LittleEndian>()?,
348            row_index_offset: cursor.read_u32::<LittleEndian>()?,
349        };
350
351        let data_len = data.len() as u64;
352
353        // Validate offsets are within the buffer to prevent OOB access
354        // from malformed or adversarial TBP payloads.
355        // Only validate when the buffer contains more than just the header
356        // (header-only buffers are used for serialization roundtrip tests).
357        if data_len > TBP_HEADER_SIZE as u64 {
358            if header.null_bitmap_offset != 0 && (header.null_bitmap_offset as u64) >= data_len {
359                return Err(io::Error::new(
360                    io::ErrorKind::InvalidData,
361                    format!(
362                        "null_bitmap_offset ({}) exceeds data length ({})",
363                        header.null_bitmap_offset, data_len
364                    ),
365                ));
366            }
367            if header.row_index_offset != 0 && (header.row_index_offset as u64) >= data_len {
368                return Err(io::Error::new(
369                    io::ErrorKind::InvalidData,
370                    format!(
371                        "row_index_offset ({}) exceeds data length ({})",
372                        header.row_index_offset, data_len
373                    ),
374                ));
375            }
376        }
377
378        Ok(header)
379    }
380}
381
382/// Null bitmap for efficient null checking
383#[derive(Debug, Clone, Copy)]
384pub struct NullBitmap<'a> {
385    data: &'a [u8],
386    columns: usize,
387}
388
389impl<'a> NullBitmap<'a> {
390    pub fn new(data: &'a [u8], columns: usize) -> Self {
391        Self { data, columns }
392    }
393
394    /// Check if a cell is null - O(1)
395    #[inline]
396    pub fn is_null(&self, row: usize, col: usize) -> bool {
397        let bit_idx = row * self.columns + col;
398        let byte_idx = bit_idx / 8;
399        let bit_pos = bit_idx % 8;
400
401        if byte_idx >= self.data.len() {
402            return false;
403        }
404
405        self.data[byte_idx] & (1 << bit_pos) != 0
406    }
407
408    /// Calculate required size for bitmap
409    pub fn required_size(rows: usize, cols: usize) -> usize {
410        (rows * cols + 7) / 8
411    }
412}
413
414/// Mutable null bitmap for writing
415pub struct NullBitmapMut {
416    data: Vec<u8>,
417    columns: usize,
418}
419
420impl NullBitmapMut {
421    pub fn new(rows: usize, columns: usize) -> Self {
422        let size = NullBitmap::required_size(rows, columns);
423        Self {
424            data: vec![0; size],
425            columns,
426        }
427    }
428
429    /// Set a cell as null
430    #[inline]
431    pub fn set_null(&mut self, row: usize, col: usize) {
432        let bit_idx = row * self.columns + col;
433        let byte_idx = bit_idx / 8;
434        let bit_pos = bit_idx % 8;
435
436        if byte_idx < self.data.len() {
437            self.data[byte_idx] |= 1 << bit_pos;
438        }
439    }
440
441    /// Get the raw bitmap data
442    pub fn as_bytes(&self) -> &[u8] {
443        &self.data
444    }
445
446    /// Into raw data
447    pub fn into_bytes(self) -> Vec<u8> {
448        self.data
449    }
450}
451
452/// Zero-copy row view into TBP data
453#[derive(Debug, Clone)]
454pub struct RowView<'a> {
455    /// Schema reference
456    schema: &'a TbpSchema,
457    /// Raw row data
458    data: &'a [u8],
459    /// Null bitmap reference
460    null_bitmap: Option<&'a NullBitmap<'a>>,
461    /// Row index for null bitmap access
462    row_idx: usize,
463}
464
465impl<'a> RowView<'a> {
466    pub fn new(
467        schema: &'a TbpSchema,
468        data: &'a [u8],
469        null_bitmap: Option<&'a NullBitmap<'a>>,
470        row_idx: usize,
471    ) -> Self {
472        Self {
473            schema,
474            data,
475            null_bitmap,
476            row_idx,
477        }
478    }
479
480    /// Check if column is null - O(1)
481    #[inline]
482    pub fn is_null(&self, col: usize) -> bool {
483        self.null_bitmap
484            .map(|b| b.is_null(self.row_idx, col))
485            .unwrap_or(false)
486    }
487
488    /// Get column offset for fixed-size columns
489    fn column_offset(&self, col: usize) -> usize {
490        let mut offset = 0;
491        for c in &self.schema.columns[..col] {
492            offset += match c.col_type {
493                TbpColumnType::FixedBinary => c.fixed_size.unwrap_or(0) as usize,
494                _ => c.col_type.fixed_size().unwrap_or(0),
495            };
496        }
497        offset
498    }
499
500    /// Read a boolean column - O(1)
501    pub fn read_bool(&self, col: usize) -> Option<bool> {
502        if self.is_null(col) {
503            return None;
504        }
505        let offset = self.column_offset(col);
506        Some(self.data.get(offset).copied().unwrap_or(0) != 0)
507    }
508
509    /// Read an i64 column - O(1)
510    pub fn read_i64(&self, col: usize) -> Option<i64> {
511        if self.is_null(col) {
512            return None;
513        }
514        let offset = self.column_offset(col);
515        if offset + 8 > self.data.len() {
516            return None;
517        }
518        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
519        Some(i64::from_le_bytes(bytes))
520    }
521
522    /// Read a u64 column - O(1)
523    pub fn read_u64(&self, col: usize) -> Option<u64> {
524        if self.is_null(col) {
525            return None;
526        }
527        let offset = self.column_offset(col);
528        if offset + 8 > self.data.len() {
529            return None;
530        }
531        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
532        Some(u64::from_le_bytes(bytes))
533    }
534
535    /// Read an f64 column - O(1)
536    pub fn read_f64(&self, col: usize) -> Option<f64> {
537        if self.is_null(col) {
538            return None;
539        }
540        let offset = self.column_offset(col);
541        if offset + 8 > self.data.len() {
542            return None;
543        }
544        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
545        Some(f64::from_le_bytes(bytes))
546    }
547
548    /// Read an i32 column - O(1)
549    pub fn read_i32(&self, col: usize) -> Option<i32> {
550        if self.is_null(col) {
551            return None;
552        }
553        let offset = self.column_offset(col);
554        if offset + 4 > self.data.len() {
555            return None;
556        }
557        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
558        Some(i32::from_le_bytes(bytes))
559    }
560
561    /// Read an f32 column - O(1)
562    pub fn read_f32(&self, col: usize) -> Option<f32> {
563        if self.is_null(col) {
564            return None;
565        }
566        let offset = self.column_offset(col);
567        if offset + 4 > self.data.len() {
568            return None;
569        }
570        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
571        Some(f32::from_le_bytes(bytes))
572    }
573
574    /// Get raw row data
575    pub fn raw_data(&self) -> &[u8] {
576        self.data
577    }
578}
579
580/// TBP writer for creating binary tables
581pub struct TbpWriter {
582    schema: TbpSchema,
583    null_bitmap: NullBitmapMut,
584    row_index: Vec<u32>,
585    data: Vec<u8>,
586    row_count: usize,
587}
588
589impl TbpWriter {
590    pub fn new(schema: TbpSchema, estimated_rows: usize) -> Self {
591        Self {
592            null_bitmap: NullBitmapMut::new(estimated_rows, schema.columns.len()),
593            row_index: Vec::with_capacity(estimated_rows),
594            data: Vec::with_capacity(estimated_rows * schema.fixed_row_size().unwrap_or(64)),
595            row_count: 0,
596            schema,
597        }
598    }
599
600    /// Start a new row and return a row writer
601    pub fn start_row(&mut self) -> TbpRowWriter<'_> {
602        let offset = self.data.len() as u32;
603        self.row_index.push(offset);
604        TbpRowWriter {
605            writer: self,
606            col_idx: 0,
607        }
608    }
609
610    /// Mark a cell as null
611    fn set_null(&mut self, row: usize, col: usize) {
612        self.null_bitmap.set_null(row, col);
613    }
614
615    /// Finish writing and produce the final buffer
616    pub fn finish(self) -> Vec<u8> {
617        let has_nulls = self.schema.has_nullable_columns();
618        let has_variable = self.schema.has_variable_columns();
619
620        let mut flags = TbpFlags(0);
621        if has_nulls {
622            flags.0 |= TbpFlags::HAS_NULLS;
623        }
624        if has_variable {
625            flags.0 |= TbpFlags::HAS_ROW_INDEX;
626        }
627
628        // Calculate offsets
629        let null_bitmap_offset = if has_nulls { TBP_HEADER_SIZE as u32 } else { 0 };
630        let null_bitmap_size = if has_nulls {
631            NullBitmap::required_size(self.row_count, self.schema.columns.len())
632        } else {
633            0
634        };
635
636        let row_index_offset = if has_variable {
637            (TBP_HEADER_SIZE + null_bitmap_size) as u32
638        } else {
639            0
640        };
641        let row_index_size = if has_variable {
642            self.row_count * 4
643        } else {
644            0
645        };
646
647        let data_offset = TBP_HEADER_SIZE + null_bitmap_size + row_index_size;
648
649        let header = TbpHeader {
650            magic: TBP_MAGIC,
651            version: TBP_VERSION,
652            flags,
653            schema_id: self.schema.schema_id,
654            row_count: self.row_count as u32,
655            column_count: self.schema.columns.len() as u16,
656            reserved: 0,
657            null_bitmap_offset,
658            row_index_offset,
659        };
660
661        let total_size = data_offset + self.data.len();
662        let mut buffer = Vec::with_capacity(total_size);
663
664        // Write header
665        header.write(&mut buffer).unwrap();
666
667        // Write null bitmap
668        if has_nulls {
669            let required = NullBitmap::required_size(self.row_count, self.schema.columns.len());
670            buffer.extend_from_slice(&self.null_bitmap.as_bytes()[..required]);
671        }
672
673        // Write row index
674        if has_variable {
675            for offset in &self.row_index {
676                buffer.write_u32::<LittleEndian>(*offset + data_offset as u32).unwrap();
677            }
678        }
679
680        // Write data
681        buffer.extend_from_slice(&self.data);
682
683        buffer
684    }
685}
686
687/// Row writer for TBP
688pub struct TbpRowWriter<'a> {
689    writer: &'a mut TbpWriter,
690    col_idx: usize,
691}
692
693impl<'a> TbpRowWriter<'a> {
694    /// Write a null value
695    pub fn write_null(mut self) -> Self {
696        self.writer.set_null(self.writer.row_count, self.col_idx);
697        self.col_idx += 1;
698        self
699    }
700
701    /// Write a boolean
702    pub fn write_bool(mut self, value: bool) -> Self {
703        self.writer.data.push(if value { 1 } else { 0 });
704        self.col_idx += 1;
705        self
706    }
707
708    /// Write an i64
709    pub fn write_i64(mut self, value: i64) -> Self {
710        self.writer.data.extend_from_slice(&value.to_le_bytes());
711        self.col_idx += 1;
712        self
713    }
714
715    /// Write a u64
716    pub fn write_u64(mut self, value: u64) -> Self {
717        self.writer.data.extend_from_slice(&value.to_le_bytes());
718        self.col_idx += 1;
719        self
720    }
721
722    /// Write an f64
723    pub fn write_f64(mut self, value: f64) -> Self {
724        self.writer.data.extend_from_slice(&value.to_le_bytes());
725        self.col_idx += 1;
726        self
727    }
728
729    /// Write an i32
730    pub fn write_i32(mut self, value: i32) -> Self {
731        self.writer.data.extend_from_slice(&value.to_le_bytes());
732        self.col_idx += 1;
733        self
734    }
735
736    /// Write an f32
737    pub fn write_f32(mut self, value: f32) -> Self {
738        self.writer.data.extend_from_slice(&value.to_le_bytes());
739        self.col_idx += 1;
740        self
741    }
742
743    /// Write a string (variable length)
744    pub fn write_string(mut self, value: &str) -> Self {
745        let bytes = value.as_bytes();
746        self.writer.data.write_u32::<LittleEndian>(bytes.len() as u32).unwrap();
747        self.writer.data.extend_from_slice(bytes);
748        self.col_idx += 1;
749        self
750    }
751
752    /// Write binary data (variable length)
753    pub fn write_binary(mut self, value: &[u8]) -> Self {
754        self.writer.data.write_u32::<LittleEndian>(value.len() as u32).unwrap();
755        self.writer.data.extend_from_slice(value);
756        self.col_idx += 1;
757        self
758    }
759
760    /// Finish the row
761    pub fn finish(self) {
762        self.writer.row_count += 1;
763    }
764}
765
766/// TBP reader for zero-copy access
767pub struct TbpReader<'a> {
768    data: &'a [u8],
769    header: TbpHeader,
770    schema: &'a TbpSchema,
771}
772
773impl<'a> TbpReader<'a> {
774    /// Create a new reader
775    pub fn new(data: &'a [u8], schema: &'a TbpSchema) -> io::Result<Self> {
776        let header = TbpHeader::read(data)?;
777
778        if header.schema_id != schema.schema_id {
779            return Err(io::Error::new(
780                io::ErrorKind::InvalidData,
781                "Schema ID mismatch",
782            ));
783        }
784
785        Ok(Self {
786            data,
787            header,
788            schema,
789        })
790    }
791
792    /// Get number of rows
793    pub fn row_count(&self) -> usize {
794        self.header.row_count as usize
795    }
796
797    /// Get a row by index - O(1)
798    pub fn get_row(&self, row: usize) -> Option<RowView<'_>> {
799        if row >= self.row_count() {
800            return None;
801        }
802
803        // Get row offset
804        let row_offset = if self.header.flags.has_row_index() {
805            let idx_offset = self.header.row_index_offset as usize + row * 4;
806            if idx_offset + 4 > self.data.len() {
807                return None;
808            }
809            let bytes: [u8; 4] = self.data[idx_offset..idx_offset + 4].try_into().ok()?;
810            u32::from_le_bytes(bytes) as usize
811        } else {
812            // Fixed-size rows
813            let row_size = self.schema.fixed_row_size()?;
814            let null_bitmap_size = if self.header.flags.has_nulls() {
815                NullBitmap::required_size(self.row_count(), self.schema.columns.len())
816            } else {
817                0
818            };
819            TBP_HEADER_SIZE + null_bitmap_size + row * row_size
820        };
821
822        let row_data = &self.data[row_offset..];
823
824        // TODO: properly construct null bitmap reference
825        Some(RowView::new(self.schema, row_data, None, row))
826    }
827
828    /// Iterate over all rows - zero allocation per row
829    pub fn iter(&'a self) -> impl Iterator<Item = RowView<'a>> {
830        (0..self.row_count()).filter_map(move |i| self.get_row(i))
831    }
832}
833
834#[cfg(test)]
835mod tests {
836    use super::*;
837
838    #[test]
839    fn test_header_roundtrip() {
840        let header = TbpHeader {
841            magic: TBP_MAGIC,
842            version: TBP_VERSION,
843            flags: TbpFlags(TbpFlags::HAS_NULLS | TbpFlags::HAS_ROW_INDEX),
844            schema_id: 12345678,
845            row_count: 100,
846            column_count: 5,
847            reserved: 0,
848            null_bitmap_offset: 32,
849            row_index_offset: 48,
850        };
851
852        let mut buffer = Vec::new();
853        header.write(&mut buffer).unwrap();
854        assert_eq!(buffer.len(), TBP_HEADER_SIZE);
855
856        let parsed = TbpHeader::read(&buffer).unwrap();
857        assert_eq!(parsed.magic, TBP_MAGIC);
858        assert_eq!(parsed.version, TBP_VERSION);
859        assert_eq!(parsed.row_count, 100);
860        assert_eq!(parsed.column_count, 5);
861    }
862
863    #[test]
864    fn test_null_bitmap() {
865        let mut bitmap = NullBitmapMut::new(10, 5);
866        bitmap.set_null(0, 0);
867        bitmap.set_null(5, 3);
868        bitmap.set_null(9, 4);
869
870        let data = bitmap.as_bytes();
871        let reader = NullBitmap::new(data, 5);
872
873        assert!(reader.is_null(0, 0));
874        assert!(!reader.is_null(0, 1));
875        assert!(reader.is_null(5, 3));
876        assert!(reader.is_null(9, 4));
877        assert!(!reader.is_null(9, 3));
878    }
879
880    #[test]
881    fn test_writer_reader_roundtrip() {
882        let schema = TbpSchema::new(
883            "test_table",
884            vec![
885                TbpColumn::new("id", TbpColumnType::Int64).not_null(),
886                TbpColumn::new("value", TbpColumnType::Float64),
887            ],
888        );
889
890        let mut writer = TbpWriter::new(schema.clone(), 100);
891
892        // Write some rows
893        for i in 0..10 {
894            writer
895                .start_row()
896                .write_i64(i)
897                .write_f64(i as f64 * 1.5)
898                .finish();
899        }
900
901        let data = writer.finish();
902
903        // Read back
904        let reader = TbpReader::new(&data, &schema).unwrap();
905        assert_eq!(reader.row_count(), 10);
906
907        let row = reader.get_row(5).unwrap();
908        assert_eq!(row.read_i64(0), Some(5));
909        assert_eq!(row.read_f64(1), Some(7.5));
910    }
911}