Skip to main content

sochdb_core/
tbp.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! TOON Binary Protocol (TBP) - Zero-Copy Binary Wire Format
19//!
20//! From mm.md Task 3.1: Zero-Copy Binary Wire Format
21//!
22//! ## Problem
23//!
24//! Current TOON format is text-based with parsing overhead:
25//! - O(n) string allocations per row
26//! - UTF-8 validation on every parse
27//! - No random access (must scan from start)
28//! - Variable-length encoding requires sequential parsing
29//!
30//! ## Solution
31//!
32//! Binary protocol enables:
33//! - O(1) field access via row index + column offset
34//! - Zero-copy reads from mmap'd files
35//! - Null bitmap for efficient NULL handling
36//! - LLM-friendly text emission on demand
37//!
38//! ## Layout
39//!
40//! ```text
41//! TBP Layout (Little-Endian, 32-byte header):
42//! ┌─────────────────────────────────────────────────────┐
43//! │ magic: u32 = 0x544F4F4E ("TOON")                    │
44//! │ version: u16, flags: u16                            │
45//! │ schema_id: u64 (hash for validation)                │
46//! │ row_count: u32, column_count: u16                   │
47//! │ null_bitmap_offset: u32, row_index_offset: u32      │
48//! │ data_offset: u32                                    │
49//! ├─────────────────────────────────────────────────────┤
50//! │ Null Bitmap: ceil(rows × cols / 8) bytes            │
51//! │ Row Index: [u32; row_count] offsets                 │
52//! │ Data Section (columnar within blocks)               │
53//! └─────────────────────────────────────────────────────┘
54//!
55//! Access complexity:
56//! - Row access: O(1) via row_index[row]
57//! - Field access: O(1) via column_type + fixed_offset
58//! - Null check: O(1) via bitmap[row * cols + col]
59//! ```
60
61use std::io::{self, Write};
62
63use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64
65/// TBP magic number: "TOON" in ASCII
66pub const TBP_MAGIC: u32 = 0x544F_4F4E;
67
68/// Current TBP version
69pub const TBP_VERSION: u16 = 1;
70
71/// TBP header size in bytes
72pub const TBP_HEADER_SIZE: usize = 32;
73
74/// TBP flags
75#[derive(Debug, Clone, Copy, Default)]
76pub struct TbpFlags(pub u16);
77
78impl TbpFlags {
79    /// Null bitmap is present
80    pub const HAS_NULLS: u16 = 1 << 0;
81    /// Row index is present (for variable-length data)
82    pub const HAS_ROW_INDEX: u16 = 1 << 1;
83    /// Data is compressed
84    pub const COMPRESSED: u16 = 1 << 2;
85    /// Schema is embedded in the file
86    pub const EMBEDDED_SCHEMA: u16 = 1 << 3;
87
88    pub fn has_nulls(&self) -> bool {
89        self.0 & Self::HAS_NULLS != 0
90    }
91
92    pub fn has_row_index(&self) -> bool {
93        self.0 & Self::HAS_ROW_INDEX != 0
94    }
95
96    pub fn is_compressed(&self) -> bool {
97        self.0 & Self::COMPRESSED != 0
98    }
99
100    pub fn has_embedded_schema(&self) -> bool {
101        self.0 & Self::EMBEDDED_SCHEMA != 0
102    }
103}
104
105/// Column type for TBP
106#[derive(Debug, Clone, Copy, PartialEq, Eq)]
107#[repr(u8)]
108pub enum TbpColumnType {
109    /// Null (no data)
110    Null = 0,
111    /// Boolean (1 byte, 0 or 1)
112    Bool = 1,
113    /// Signed 8-bit integer
114    Int8 = 2,
115    /// Unsigned 8-bit integer
116    UInt8 = 3,
117    /// Signed 16-bit integer
118    Int16 = 4,
119    /// Unsigned 16-bit integer
120    UInt16 = 5,
121    /// Signed 32-bit integer
122    Int32 = 6,
123    /// Unsigned 32-bit integer
124    UInt32 = 7,
125    /// Signed 64-bit integer
126    Int64 = 8,
127    /// Unsigned 64-bit integer
128    UInt64 = 9,
129    /// 32-bit float
130    Float32 = 10,
131    /// 64-bit float
132    Float64 = 11,
133    /// Variable-length string (UTF-8)
134    String = 12,
135    /// Variable-length binary
136    Binary = 13,
137    /// Timestamp (microseconds since epoch)
138    Timestamp = 14,
139    /// Fixed-size binary (e.g., UUIDs)
140    FixedBinary = 15,
141}
142
143impl TbpColumnType {
144    /// Get the fixed size of this type, or None for variable-length types
145    pub fn fixed_size(&self) -> Option<usize> {
146        match self {
147            TbpColumnType::Null => Some(0),
148            TbpColumnType::Bool => Some(1),
149            TbpColumnType::Int8 | TbpColumnType::UInt8 => Some(1),
150            TbpColumnType::Int16 | TbpColumnType::UInt16 => Some(2),
151            TbpColumnType::Int32 | TbpColumnType::UInt32 | TbpColumnType::Float32 => Some(4),
152            TbpColumnType::Int64 | TbpColumnType::UInt64 | TbpColumnType::Float64 | TbpColumnType::Timestamp => Some(8),
153            TbpColumnType::String | TbpColumnType::Binary => None,
154            TbpColumnType::FixedBinary => None, // Size specified per column
155        }
156    }
157
158    /// Check if this type is variable-length
159    pub fn is_variable(&self) -> bool {
160        self.fixed_size().is_none()
161    }
162
163    pub fn from_byte(b: u8) -> Option<Self> {
164        match b {
165            0 => Some(Self::Null),
166            1 => Some(Self::Bool),
167            2 => Some(Self::Int8),
168            3 => Some(Self::UInt8),
169            4 => Some(Self::Int16),
170            5 => Some(Self::UInt16),
171            6 => Some(Self::Int32),
172            7 => Some(Self::UInt32),
173            8 => Some(Self::Int64),
174            9 => Some(Self::UInt64),
175            10 => Some(Self::Float32),
176            11 => Some(Self::Float64),
177            12 => Some(Self::String),
178            13 => Some(Self::Binary),
179            14 => Some(Self::Timestamp),
180            15 => Some(Self::FixedBinary),
181            _ => None,
182        }
183    }
184}
185
186/// Column definition in TBP schema
187#[derive(Debug, Clone)]
188pub struct TbpColumn {
189    /// Column name
190    pub name: String,
191    /// Column type
192    pub col_type: TbpColumnType,
193    /// Fixed size for FixedBinary type
194    pub fixed_size: Option<u16>,
195    /// Column is nullable
196    pub nullable: bool,
197}
198
199impl TbpColumn {
200    pub fn new(name: impl Into<String>, col_type: TbpColumnType) -> Self {
201        Self {
202            name: name.into(),
203            col_type,
204            fixed_size: None,
205            nullable: true,
206        }
207    }
208
209    pub fn with_fixed_size(mut self, size: u16) -> Self {
210        self.fixed_size = Some(size);
211        self
212    }
213
214    pub fn not_null(mut self) -> Self {
215        self.nullable = false;
216        self
217    }
218}
219
220/// TBP schema
221#[derive(Debug, Clone)]
222pub struct TbpSchema {
223    /// Table name
224    pub name: String,
225    /// Columns
226    pub columns: Vec<TbpColumn>,
227    /// Schema ID (hash for validation)
228    pub schema_id: u64,
229}
230
231impl TbpSchema {
232    pub fn new(name: impl Into<String>, columns: Vec<TbpColumn>) -> Self {
233        let name = name.into();
234        let schema_id = Self::compute_schema_id(&name, &columns);
235        Self {
236            name,
237            columns,
238            schema_id,
239        }
240    }
241
242    /// Compute a hash of the schema for validation
243    fn compute_schema_id(name: &str, columns: &[TbpColumn]) -> u64 {
244        use std::hash::{Hash, Hasher};
245        use std::collections::hash_map::DefaultHasher;
246
247        let mut hasher = DefaultHasher::new();
248        name.hash(&mut hasher);
249        for col in columns {
250            col.name.hash(&mut hasher);
251            (col.col_type as u8).hash(&mut hasher);
252            col.fixed_size.hash(&mut hasher);
253            col.nullable.hash(&mut hasher);
254        }
255        hasher.finish()
256    }
257
258    /// Check if schema has any variable-length columns
259    pub fn has_variable_columns(&self) -> bool {
260        self.columns.iter().any(|c| c.col_type.is_variable())
261    }
262
263    /// Check if schema has any nullable columns
264    pub fn has_nullable_columns(&self) -> bool {
265        self.columns.iter().any(|c| c.nullable)
266    }
267
268    /// Get the fixed row size (if all columns are fixed-size)
269    pub fn fixed_row_size(&self) -> Option<usize> {
270        if self.has_variable_columns() {
271            return None;
272        }
273
274        let mut size = 0;
275        for col in &self.columns {
276            match col.col_type {
277                TbpColumnType::FixedBinary => {
278                    size += col.fixed_size.unwrap_or(0) as usize;
279                }
280                _ => {
281                    size += col.col_type.fixed_size()?;
282                }
283            }
284        }
285        Some(size)
286    }
287}
288
289/// TBP header (32 bytes)
290#[derive(Debug, Clone)]
291pub struct TbpHeader {
292    /// Magic number (should be TBP_MAGIC)
293    pub magic: u32,
294    /// Version number
295    pub version: u16,
296    /// Flags
297    pub flags: TbpFlags,
298    /// Schema ID for validation
299    pub schema_id: u64,
300    /// Number of rows
301    pub row_count: u32,
302    /// Number of columns
303    pub column_count: u16,
304    /// Reserved
305    pub reserved: u16,
306    /// Offset to null bitmap (0 if no nulls)
307    pub null_bitmap_offset: u32,
308    /// Offset to row index (0 if fixed-size rows)
309    pub row_index_offset: u32,
310}
311
312impl TbpHeader {
313    /// Write header to a buffer
314    pub fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
315        w.write_u32::<LittleEndian>(self.magic)?;
316        w.write_u16::<LittleEndian>(self.version)?;
317        w.write_u16::<LittleEndian>(self.flags.0)?;
318        w.write_u64::<LittleEndian>(self.schema_id)?;
319        w.write_u32::<LittleEndian>(self.row_count)?;
320        w.write_u16::<LittleEndian>(self.column_count)?;
321        w.write_u16::<LittleEndian>(self.reserved)?;
322        w.write_u32::<LittleEndian>(self.null_bitmap_offset)?;
323        w.write_u32::<LittleEndian>(self.row_index_offset)?;
324        Ok(())
325    }
326
327    /// Read header from a buffer
328    pub fn read(data: &[u8]) -> io::Result<Self> {
329        if data.len() < TBP_HEADER_SIZE {
330            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Header too short"));
331        }
332
333        let mut cursor = std::io::Cursor::new(data);
334        let magic = cursor.read_u32::<LittleEndian>()?;
335        if magic != TBP_MAGIC {
336            return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid TBP magic"));
337        }
338
339        Ok(Self {
340            magic,
341            version: cursor.read_u16::<LittleEndian>()?,
342            flags: TbpFlags(cursor.read_u16::<LittleEndian>()?),
343            schema_id: cursor.read_u64::<LittleEndian>()?,
344            row_count: cursor.read_u32::<LittleEndian>()?,
345            column_count: cursor.read_u16::<LittleEndian>()?,
346            reserved: cursor.read_u16::<LittleEndian>()?,
347            null_bitmap_offset: cursor.read_u32::<LittleEndian>()?,
348            row_index_offset: cursor.read_u32::<LittleEndian>()?,
349        })
350    }
351}
352
353/// Null bitmap for efficient null checking
354#[derive(Debug, Clone, Copy)]
355pub struct NullBitmap<'a> {
356    data: &'a [u8],
357    columns: usize,
358}
359
360impl<'a> NullBitmap<'a> {
361    pub fn new(data: &'a [u8], columns: usize) -> Self {
362        Self { data, columns }
363    }
364
365    /// Check if a cell is null - O(1)
366    #[inline]
367    pub fn is_null(&self, row: usize, col: usize) -> bool {
368        let bit_idx = row * self.columns + col;
369        let byte_idx = bit_idx / 8;
370        let bit_pos = bit_idx % 8;
371
372        if byte_idx >= self.data.len() {
373            return false;
374        }
375
376        self.data[byte_idx] & (1 << bit_pos) != 0
377    }
378
379    /// Calculate required size for bitmap
380    pub fn required_size(rows: usize, cols: usize) -> usize {
381        (rows * cols + 7) / 8
382    }
383}
384
385/// Mutable null bitmap for writing
386pub struct NullBitmapMut {
387    data: Vec<u8>,
388    columns: usize,
389}
390
391impl NullBitmapMut {
392    pub fn new(rows: usize, columns: usize) -> Self {
393        let size = NullBitmap::required_size(rows, columns);
394        Self {
395            data: vec![0; size],
396            columns,
397        }
398    }
399
400    /// Set a cell as null
401    #[inline]
402    pub fn set_null(&mut self, row: usize, col: usize) {
403        let bit_idx = row * self.columns + col;
404        let byte_idx = bit_idx / 8;
405        let bit_pos = bit_idx % 8;
406
407        if byte_idx < self.data.len() {
408            self.data[byte_idx] |= 1 << bit_pos;
409        }
410    }
411
412    /// Get the raw bitmap data
413    pub fn as_bytes(&self) -> &[u8] {
414        &self.data
415    }
416
417    /// Into raw data
418    pub fn into_bytes(self) -> Vec<u8> {
419        self.data
420    }
421}
422
423/// Zero-copy row view into TBP data
424#[derive(Debug, Clone)]
425pub struct RowView<'a> {
426    /// Schema reference
427    schema: &'a TbpSchema,
428    /// Raw row data
429    data: &'a [u8],
430    /// Null bitmap reference
431    null_bitmap: Option<&'a NullBitmap<'a>>,
432    /// Row index for null bitmap access
433    row_idx: usize,
434}
435
436impl<'a> RowView<'a> {
437    pub fn new(
438        schema: &'a TbpSchema,
439        data: &'a [u8],
440        null_bitmap: Option<&'a NullBitmap<'a>>,
441        row_idx: usize,
442    ) -> Self {
443        Self {
444            schema,
445            data,
446            null_bitmap,
447            row_idx,
448        }
449    }
450
451    /// Check if column is null - O(1)
452    #[inline]
453    pub fn is_null(&self, col: usize) -> bool {
454        self.null_bitmap
455            .map(|b| b.is_null(self.row_idx, col))
456            .unwrap_or(false)
457    }
458
459    /// Get column offset for fixed-size columns
460    fn column_offset(&self, col: usize) -> usize {
461        let mut offset = 0;
462        for c in &self.schema.columns[..col] {
463            offset += match c.col_type {
464                TbpColumnType::FixedBinary => c.fixed_size.unwrap_or(0) as usize,
465                _ => c.col_type.fixed_size().unwrap_or(0),
466            };
467        }
468        offset
469    }
470
471    /// Read a boolean column - O(1)
472    pub fn read_bool(&self, col: usize) -> Option<bool> {
473        if self.is_null(col) {
474            return None;
475        }
476        let offset = self.column_offset(col);
477        Some(self.data.get(offset).copied().unwrap_or(0) != 0)
478    }
479
480    /// Read an i64 column - O(1)
481    pub fn read_i64(&self, col: usize) -> Option<i64> {
482        if self.is_null(col) {
483            return None;
484        }
485        let offset = self.column_offset(col);
486        if offset + 8 > self.data.len() {
487            return None;
488        }
489        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
490        Some(i64::from_le_bytes(bytes))
491    }
492
493    /// Read a u64 column - O(1)
494    pub fn read_u64(&self, col: usize) -> Option<u64> {
495        if self.is_null(col) {
496            return None;
497        }
498        let offset = self.column_offset(col);
499        if offset + 8 > self.data.len() {
500            return None;
501        }
502        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
503        Some(u64::from_le_bytes(bytes))
504    }
505
506    /// Read an f64 column - O(1)
507    pub fn read_f64(&self, col: usize) -> Option<f64> {
508        if self.is_null(col) {
509            return None;
510        }
511        let offset = self.column_offset(col);
512        if offset + 8 > self.data.len() {
513            return None;
514        }
515        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
516        Some(f64::from_le_bytes(bytes))
517    }
518
519    /// Read an i32 column - O(1)
520    pub fn read_i32(&self, col: usize) -> Option<i32> {
521        if self.is_null(col) {
522            return None;
523        }
524        let offset = self.column_offset(col);
525        if offset + 4 > self.data.len() {
526            return None;
527        }
528        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
529        Some(i32::from_le_bytes(bytes))
530    }
531
532    /// Read an f32 column - O(1)
533    pub fn read_f32(&self, col: usize) -> Option<f32> {
534        if self.is_null(col) {
535            return None;
536        }
537        let offset = self.column_offset(col);
538        if offset + 4 > self.data.len() {
539            return None;
540        }
541        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
542        Some(f32::from_le_bytes(bytes))
543    }
544
545    /// Get raw row data
546    pub fn raw_data(&self) -> &[u8] {
547        self.data
548    }
549}
550
551/// TBP writer for creating binary tables
552pub struct TbpWriter {
553    schema: TbpSchema,
554    null_bitmap: NullBitmapMut,
555    row_index: Vec<u32>,
556    data: Vec<u8>,
557    row_count: usize,
558}
559
560impl TbpWriter {
561    pub fn new(schema: TbpSchema, estimated_rows: usize) -> Self {
562        Self {
563            null_bitmap: NullBitmapMut::new(estimated_rows, schema.columns.len()),
564            row_index: Vec::with_capacity(estimated_rows),
565            data: Vec::with_capacity(estimated_rows * schema.fixed_row_size().unwrap_or(64)),
566            row_count: 0,
567            schema,
568        }
569    }
570
571    /// Start a new row and return a row writer
572    pub fn start_row(&mut self) -> TbpRowWriter<'_> {
573        let offset = self.data.len() as u32;
574        self.row_index.push(offset);
575        TbpRowWriter {
576            writer: self,
577            col_idx: 0,
578        }
579    }
580
581    /// Mark a cell as null
582    fn set_null(&mut self, row: usize, col: usize) {
583        self.null_bitmap.set_null(row, col);
584    }
585
586    /// Finish writing and produce the final buffer
587    pub fn finish(self) -> Vec<u8> {
588        let has_nulls = self.schema.has_nullable_columns();
589        let has_variable = self.schema.has_variable_columns();
590
591        let mut flags = TbpFlags(0);
592        if has_nulls {
593            flags.0 |= TbpFlags::HAS_NULLS;
594        }
595        if has_variable {
596            flags.0 |= TbpFlags::HAS_ROW_INDEX;
597        }
598
599        // Calculate offsets
600        let null_bitmap_offset = if has_nulls { TBP_HEADER_SIZE as u32 } else { 0 };
601        let null_bitmap_size = if has_nulls {
602            NullBitmap::required_size(self.row_count, self.schema.columns.len())
603        } else {
604            0
605        };
606
607        let row_index_offset = if has_variable {
608            (TBP_HEADER_SIZE + null_bitmap_size) as u32
609        } else {
610            0
611        };
612        let row_index_size = if has_variable {
613            self.row_count * 4
614        } else {
615            0
616        };
617
618        let data_offset = TBP_HEADER_SIZE + null_bitmap_size + row_index_size;
619
620        let header = TbpHeader {
621            magic: TBP_MAGIC,
622            version: TBP_VERSION,
623            flags,
624            schema_id: self.schema.schema_id,
625            row_count: self.row_count as u32,
626            column_count: self.schema.columns.len() as u16,
627            reserved: 0,
628            null_bitmap_offset,
629            row_index_offset,
630        };
631
632        let total_size = data_offset + self.data.len();
633        let mut buffer = Vec::with_capacity(total_size);
634
635        // Write header
636        header.write(&mut buffer).unwrap();
637
638        // Write null bitmap
639        if has_nulls {
640            let required = NullBitmap::required_size(self.row_count, self.schema.columns.len());
641            buffer.extend_from_slice(&self.null_bitmap.as_bytes()[..required]);
642        }
643
644        // Write row index
645        if has_variable {
646            for offset in &self.row_index {
647                buffer.write_u32::<LittleEndian>(*offset + data_offset as u32).unwrap();
648            }
649        }
650
651        // Write data
652        buffer.extend_from_slice(&self.data);
653
654        buffer
655    }
656}
657
658/// Row writer for TBP
659pub struct TbpRowWriter<'a> {
660    writer: &'a mut TbpWriter,
661    col_idx: usize,
662}
663
664impl<'a> TbpRowWriter<'a> {
665    /// Write a null value
666    pub fn write_null(mut self) -> Self {
667        self.writer.set_null(self.writer.row_count, self.col_idx);
668        self.col_idx += 1;
669        self
670    }
671
672    /// Write a boolean
673    pub fn write_bool(mut self, value: bool) -> Self {
674        self.writer.data.push(if value { 1 } else { 0 });
675        self.col_idx += 1;
676        self
677    }
678
679    /// Write an i64
680    pub fn write_i64(mut self, value: i64) -> Self {
681        self.writer.data.extend_from_slice(&value.to_le_bytes());
682        self.col_idx += 1;
683        self
684    }
685
686    /// Write a u64
687    pub fn write_u64(mut self, value: u64) -> Self {
688        self.writer.data.extend_from_slice(&value.to_le_bytes());
689        self.col_idx += 1;
690        self
691    }
692
693    /// Write an f64
694    pub fn write_f64(mut self, value: f64) -> Self {
695        self.writer.data.extend_from_slice(&value.to_le_bytes());
696        self.col_idx += 1;
697        self
698    }
699
700    /// Write an i32
701    pub fn write_i32(mut self, value: i32) -> Self {
702        self.writer.data.extend_from_slice(&value.to_le_bytes());
703        self.col_idx += 1;
704        self
705    }
706
707    /// Write an f32
708    pub fn write_f32(mut self, value: f32) -> Self {
709        self.writer.data.extend_from_slice(&value.to_le_bytes());
710        self.col_idx += 1;
711        self
712    }
713
714    /// Write a string (variable length)
715    pub fn write_string(mut self, value: &str) -> Self {
716        let bytes = value.as_bytes();
717        self.writer.data.write_u32::<LittleEndian>(bytes.len() as u32).unwrap();
718        self.writer.data.extend_from_slice(bytes);
719        self.col_idx += 1;
720        self
721    }
722
723    /// Write binary data (variable length)
724    pub fn write_binary(mut self, value: &[u8]) -> Self {
725        self.writer.data.write_u32::<LittleEndian>(value.len() as u32).unwrap();
726        self.writer.data.extend_from_slice(value);
727        self.col_idx += 1;
728        self
729    }
730
731    /// Finish the row
732    pub fn finish(self) {
733        self.writer.row_count += 1;
734    }
735}
736
737/// TBP reader for zero-copy access
738pub struct TbpReader<'a> {
739    data: &'a [u8],
740    header: TbpHeader,
741    schema: &'a TbpSchema,
742}
743
744impl<'a> TbpReader<'a> {
745    /// Create a new reader
746    pub fn new(data: &'a [u8], schema: &'a TbpSchema) -> io::Result<Self> {
747        let header = TbpHeader::read(data)?;
748
749        if header.schema_id != schema.schema_id {
750            return Err(io::Error::new(
751                io::ErrorKind::InvalidData,
752                "Schema ID mismatch",
753            ));
754        }
755
756        Ok(Self {
757            data,
758            header,
759            schema,
760        })
761    }
762
763    /// Get number of rows
764    pub fn row_count(&self) -> usize {
765        self.header.row_count as usize
766    }
767
768    /// Get a row by index - O(1)
769    pub fn get_row(&self, row: usize) -> Option<RowView<'_>> {
770        if row >= self.row_count() {
771            return None;
772        }
773
774        // Get row offset
775        let row_offset = if self.header.flags.has_row_index() {
776            let idx_offset = self.header.row_index_offset as usize + row * 4;
777            if idx_offset + 4 > self.data.len() {
778                return None;
779            }
780            let bytes: [u8; 4] = self.data[idx_offset..idx_offset + 4].try_into().ok()?;
781            u32::from_le_bytes(bytes) as usize
782        } else {
783            // Fixed-size rows
784            let row_size = self.schema.fixed_row_size()?;
785            let null_bitmap_size = if self.header.flags.has_nulls() {
786                NullBitmap::required_size(self.row_count(), self.schema.columns.len())
787            } else {
788                0
789            };
790            TBP_HEADER_SIZE + null_bitmap_size + row * row_size
791        };
792
793        let row_data = &self.data[row_offset..];
794
795        // TODO: properly construct null bitmap reference
796        Some(RowView::new(self.schema, row_data, None, row))
797    }
798
799    /// Iterate over all rows - zero allocation per row
800    pub fn iter(&'a self) -> impl Iterator<Item = RowView<'a>> {
801        (0..self.row_count()).filter_map(move |i| self.get_row(i))
802    }
803}
804
805#[cfg(test)]
806mod tests {
807    use super::*;
808
809    #[test]
810    fn test_header_roundtrip() {
811        let header = TbpHeader {
812            magic: TBP_MAGIC,
813            version: TBP_VERSION,
814            flags: TbpFlags(TbpFlags::HAS_NULLS | TbpFlags::HAS_ROW_INDEX),
815            schema_id: 12345678,
816            row_count: 100,
817            column_count: 5,
818            reserved: 0,
819            null_bitmap_offset: 32,
820            row_index_offset: 48,
821        };
822
823        let mut buffer = Vec::new();
824        header.write(&mut buffer).unwrap();
825        assert_eq!(buffer.len(), TBP_HEADER_SIZE);
826
827        let parsed = TbpHeader::read(&buffer).unwrap();
828        assert_eq!(parsed.magic, TBP_MAGIC);
829        assert_eq!(parsed.version, TBP_VERSION);
830        assert_eq!(parsed.row_count, 100);
831        assert_eq!(parsed.column_count, 5);
832    }
833
834    #[test]
835    fn test_null_bitmap() {
836        let mut bitmap = NullBitmapMut::new(10, 5);
837        bitmap.set_null(0, 0);
838        bitmap.set_null(5, 3);
839        bitmap.set_null(9, 4);
840
841        let data = bitmap.as_bytes();
842        let reader = NullBitmap::new(data, 5);
843
844        assert!(reader.is_null(0, 0));
845        assert!(!reader.is_null(0, 1));
846        assert!(reader.is_null(5, 3));
847        assert!(reader.is_null(9, 4));
848        assert!(!reader.is_null(9, 3));
849    }
850
851    #[test]
852    fn test_writer_reader_roundtrip() {
853        let schema = TbpSchema::new(
854            "test_table",
855            vec![
856                TbpColumn::new("id", TbpColumnType::Int64).not_null(),
857                TbpColumn::new("value", TbpColumnType::Float64),
858            ],
859        );
860
861        let mut writer = TbpWriter::new(schema.clone(), 100);
862
863        // Write some rows
864        for i in 0..10 {
865            writer
866                .start_row()
867                .write_i64(i)
868                .write_f64(i as f64 * 1.5)
869                .finish();
870        }
871
872        let data = writer.finish();
873
874        // Read back
875        let reader = TbpReader::new(&data, &schema).unwrap();
876        assert_eq!(reader.row_count(), 10);
877
878        let row = reader.get_row(5).unwrap();
879        assert_eq!(row.read_i64(0), Some(5));
880        assert_eq!(row.read_f64(1), Some(7.5));
881    }
882}