1use std::io::{self, Write};
62
63use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64
/// File magic: the ASCII bytes `TOON` read as a big-endian `u32` (`0x544F_4F4E`).
pub const TBP_MAGIC: u32 = u32::from_be_bytes(*b"TOON");

/// Format version stamped into every header produced by the writer.
pub const TBP_VERSION: u16 = 1;

/// Serialized header length in bytes: magic (4) + version (2) + flags (2) +
/// schema_id (8) + row_count (4) + column_count (2) + reserved (2) +
/// null_bitmap_offset (4) + row_index_offset (4).
pub const TBP_HEADER_SIZE: usize = 4 + 2 + 2 + 8 + 4 + 2 + 2 + 4 + 4;
73
/// Feature bits stored in the header's `flags` field.
#[derive(Debug, Clone, Copy, Default)]
pub struct TbpFlags(pub u16);

impl TbpFlags {
    /// Set by the writer when the schema has nullable columns; a null bitmap follows the header.
    pub const HAS_NULLS: u16 = 0b0001;
    /// Set by the writer when the schema has variable-width columns; a table of
    /// per-row `u32` offsets follows the null bitmap.
    pub const HAS_ROW_INDEX: u16 = 0b0010;
    /// Reserved bit; not produced or consumed by the writer/reader shown here.
    pub const COMPRESSED: u16 = 0b0100;
    /// Reserved bit; not produced or consumed by the writer/reader shown here.
    pub const EMBEDDED_SCHEMA: u16 = 0b1000;

    /// Tests whether any bit of `mask` is set.
    fn has_bit(&self, mask: u16) -> bool {
        (self.0 & mask) != 0
    }

    /// Returns `true` if [`TbpFlags::HAS_NULLS`] is set.
    pub fn has_nulls(&self) -> bool {
        self.has_bit(Self::HAS_NULLS)
    }

    /// Returns `true` if [`TbpFlags::HAS_ROW_INDEX`] is set.
    pub fn has_row_index(&self) -> bool {
        self.has_bit(Self::HAS_ROW_INDEX)
    }

    /// Returns `true` if [`TbpFlags::COMPRESSED`] is set.
    pub fn is_compressed(&self) -> bool {
        self.has_bit(Self::COMPRESSED)
    }

    /// Returns `true` if [`TbpFlags::EMBEDDED_SCHEMA`] is set.
    pub fn has_embedded_schema(&self) -> bool {
        self.has_bit(Self::EMBEDDED_SCHEMA)
    }
}
104
/// Physical type tag of a column, stored as a single byte (see [`TbpColumnType::from_byte`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TbpColumnType {
    /// Occupies zero bytes per value.
    Null = 0,
    /// One byte; the reader treats any non-zero byte as `true`.
    Bool = 1,
    /// 1-byte signed integer.
    Int8 = 2,
    /// 1-byte unsigned integer.
    UInt8 = 3,
    /// 2-byte signed integer.
    Int16 = 4,
    /// 2-byte unsigned integer.
    UInt16 = 5,
    /// 4-byte signed integer.
    Int32 = 6,
    /// 4-byte unsigned integer.
    UInt32 = 7,
    /// 8-byte signed integer.
    Int64 = 8,
    /// 8-byte unsigned integer.
    UInt64 = 9,
    /// 4-byte IEEE-754 float.
    Float32 = 10,
    /// 8-byte IEEE-754 float.
    Float64 = 11,
    /// Variable length: little-endian `u32` byte length followed by UTF-8 bytes.
    String = 12,
    /// Variable length: little-endian `u32` byte length followed by raw bytes.
    Binary = 13,
    /// 8-byte value. NOTE(review): epoch/unit are not defined in this file — confirm.
    Timestamp = 14,
    /// Raw bytes whose width is declared per column via `TbpColumn::fixed_size`.
    FixedBinary = 15,
}

impl TbpColumnType {
    /// Per-value byte width implied by the type alone.
    ///
    /// Returns `None` when the type does not determine the width: the
    /// variable-length `String`/`Binary`, and `FixedBinary`, whose width lives
    /// on the column (`TbpColumn::fixed_size`) rather than on the type.
    pub fn fixed_size(&self) -> Option<usize> {
        match self {
            TbpColumnType::Null => Some(0),
            TbpColumnType::Bool => Some(1),
            TbpColumnType::Int8 | TbpColumnType::UInt8 => Some(1),
            TbpColumnType::Int16 | TbpColumnType::UInt16 => Some(2),
            TbpColumnType::Int32 | TbpColumnType::UInt32 | TbpColumnType::Float32 => Some(4),
            TbpColumnType::Int64
            | TbpColumnType::UInt64
            | TbpColumnType::Float64
            | TbpColumnType::Timestamp => Some(8),
            TbpColumnType::String | TbpColumnType::Binary => None,
            // Width is per-column, not per-type.
            TbpColumnType::FixedBinary => None,
        }
    }

    /// Returns `true` for length-prefixed, variable-width types (`String`, `Binary`).
    ///
    /// `FixedBinary` is *not* variable. Its width is fixed for a given column, so
    /// schemas containing it can still use the fixed-stride row layout. Deriving
    /// this from `fixed_size().is_none()` wrongly lumped it in with `String`/`Binary`.
    pub fn is_variable(&self) -> bool {
        matches!(self, TbpColumnType::String | TbpColumnType::Binary)
    }

    /// Decodes a stored type tag, returning `None` for unknown tags (> 15).
    pub fn from_byte(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Null),
            1 => Some(Self::Bool),
            2 => Some(Self::Int8),
            3 => Some(Self::UInt8),
            4 => Some(Self::Int16),
            5 => Some(Self::UInt16),
            6 => Some(Self::Int32),
            7 => Some(Self::UInt32),
            8 => Some(Self::Int64),
            9 => Some(Self::UInt64),
            10 => Some(Self::Float32),
            11 => Some(Self::Float64),
            12 => Some(Self::String),
            13 => Some(Self::Binary),
            14 => Some(Self::Timestamp),
            15 => Some(Self::FixedBinary),
            _ => None,
        }
    }
}
185
186#[derive(Debug, Clone)]
188pub struct TbpColumn {
189 pub name: String,
191 pub col_type: TbpColumnType,
193 pub fixed_size: Option<u16>,
195 pub nullable: bool,
197}
198
199impl TbpColumn {
200 pub fn new(name: impl Into<String>, col_type: TbpColumnType) -> Self {
201 Self {
202 name: name.into(),
203 col_type,
204 fixed_size: None,
205 nullable: true,
206 }
207 }
208
209 pub fn with_fixed_size(mut self, size: u16) -> Self {
210 self.fixed_size = Some(size);
211 self
212 }
213
214 pub fn not_null(mut self) -> Self {
215 self.nullable = false;
216 self
217 }
218}
219
220#[derive(Debug, Clone)]
222pub struct TbpSchema {
223 pub name: String,
225 pub columns: Vec<TbpColumn>,
227 pub schema_id: u64,
229}
230
231impl TbpSchema {
232 pub fn new(name: impl Into<String>, columns: Vec<TbpColumn>) -> Self {
233 let name = name.into();
234 let schema_id = Self::compute_schema_id(&name, &columns);
235 Self {
236 name,
237 columns,
238 schema_id,
239 }
240 }
241
242 fn compute_schema_id(name: &str, columns: &[TbpColumn]) -> u64 {
244 use std::hash::{Hash, Hasher};
245 use std::collections::hash_map::DefaultHasher;
246
247 let mut hasher = DefaultHasher::new();
248 name.hash(&mut hasher);
249 for col in columns {
250 col.name.hash(&mut hasher);
251 (col.col_type as u8).hash(&mut hasher);
252 col.fixed_size.hash(&mut hasher);
253 col.nullable.hash(&mut hasher);
254 }
255 hasher.finish()
256 }
257
258 pub fn has_variable_columns(&self) -> bool {
260 self.columns.iter().any(|c| c.col_type.is_variable())
261 }
262
263 pub fn has_nullable_columns(&self) -> bool {
265 self.columns.iter().any(|c| c.nullable)
266 }
267
268 pub fn fixed_row_size(&self) -> Option<usize> {
270 if self.has_variable_columns() {
271 return None;
272 }
273
274 let mut size = 0;
275 for col in &self.columns {
276 match col.col_type {
277 TbpColumnType::FixedBinary => {
278 size += col.fixed_size.unwrap_or(0) as usize;
279 }
280 _ => {
281 size += col.col_type.fixed_size()?;
282 }
283 }
284 }
285 Some(size)
286 }
287}
288
289#[derive(Debug, Clone)]
291pub struct TbpHeader {
292 pub magic: u32,
294 pub version: u16,
296 pub flags: TbpFlags,
298 pub schema_id: u64,
300 pub row_count: u32,
302 pub column_count: u16,
304 pub reserved: u16,
306 pub null_bitmap_offset: u32,
308 pub row_index_offset: u32,
310}
311
312impl TbpHeader {
313 pub fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
315 w.write_u32::<LittleEndian>(self.magic)?;
316 w.write_u16::<LittleEndian>(self.version)?;
317 w.write_u16::<LittleEndian>(self.flags.0)?;
318 w.write_u64::<LittleEndian>(self.schema_id)?;
319 w.write_u32::<LittleEndian>(self.row_count)?;
320 w.write_u16::<LittleEndian>(self.column_count)?;
321 w.write_u16::<LittleEndian>(self.reserved)?;
322 w.write_u32::<LittleEndian>(self.null_bitmap_offset)?;
323 w.write_u32::<LittleEndian>(self.row_index_offset)?;
324 Ok(())
325 }
326
327 pub fn read(data: &[u8]) -> io::Result<Self> {
329 if data.len() < TBP_HEADER_SIZE {
330 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Header too short"));
331 }
332
333 let mut cursor = std::io::Cursor::new(data);
334 let magic = cursor.read_u32::<LittleEndian>()?;
335 if magic != TBP_MAGIC {
336 return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid TBP magic"));
337 }
338
339 Ok(Self {
340 magic,
341 version: cursor.read_u16::<LittleEndian>()?,
342 flags: TbpFlags(cursor.read_u16::<LittleEndian>()?),
343 schema_id: cursor.read_u64::<LittleEndian>()?,
344 row_count: cursor.read_u32::<LittleEndian>()?,
345 column_count: cursor.read_u16::<LittleEndian>()?,
346 reserved: cursor.read_u16::<LittleEndian>()?,
347 null_bitmap_offset: cursor.read_u32::<LittleEndian>()?,
348 row_index_offset: cursor.read_u32::<LittleEndian>()?,
349 })
350 }
351}
352
/// Read-only view over a packed, row-major null bitmap.
///
/// Cell (`row`, `col`) is bit `row * columns + col`, counted LSB-first within
/// each byte; a set bit means null.
#[derive(Debug, Clone, Copy)]
pub struct NullBitmap<'a> {
    data: &'a [u8],
    columns: usize,
}

impl<'a> NullBitmap<'a> {
    /// Wraps `data` as a bitmap with `columns` bits per row.
    pub fn new(data: &'a [u8], columns: usize) -> Self {
        NullBitmap { columns, data }
    }

    /// Reports whether cell (`row`, `col`) is marked null.
    ///
    /// Bits lying past the end of the backing slice read as "not null".
    #[inline]
    pub fn is_null(&self, row: usize, col: usize) -> bool {
        let bit = row * self.columns + col;
        match self.data.get(bit / 8) {
            Some(&byte) => ((byte >> (bit % 8)) & 1) == 1,
            None => false,
        }
    }

    /// Number of bytes needed to hold `rows * cols` bits, rounded up.
    pub fn required_size(rows: usize, cols: usize) -> usize {
        let bits = rows * cols;
        (bits + 7) / 8
    }
}
384
385pub struct NullBitmapMut {
387 data: Vec<u8>,
388 columns: usize,
389}
390
391impl NullBitmapMut {
392 pub fn new(rows: usize, columns: usize) -> Self {
393 let size = NullBitmap::required_size(rows, columns);
394 Self {
395 data: vec![0; size],
396 columns,
397 }
398 }
399
400 #[inline]
402 pub fn set_null(&mut self, row: usize, col: usize) {
403 let bit_idx = row * self.columns + col;
404 let byte_idx = bit_idx / 8;
405 let bit_pos = bit_idx % 8;
406
407 if byte_idx < self.data.len() {
408 self.data[byte_idx] |= 1 << bit_pos;
409 }
410 }
411
412 pub fn as_bytes(&self) -> &[u8] {
414 &self.data
415 }
416
417 pub fn into_bytes(self) -> Vec<u8> {
419 self.data
420 }
421}
422
423#[derive(Debug, Clone)]
425pub struct RowView<'a> {
426 schema: &'a TbpSchema,
428 data: &'a [u8],
430 null_bitmap: Option<&'a NullBitmap<'a>>,
432 row_idx: usize,
434}
435
436impl<'a> RowView<'a> {
437 pub fn new(
438 schema: &'a TbpSchema,
439 data: &'a [u8],
440 null_bitmap: Option<&'a NullBitmap<'a>>,
441 row_idx: usize,
442 ) -> Self {
443 Self {
444 schema,
445 data,
446 null_bitmap,
447 row_idx,
448 }
449 }
450
451 #[inline]
453 pub fn is_null(&self, col: usize) -> bool {
454 self.null_bitmap
455 .map(|b| b.is_null(self.row_idx, col))
456 .unwrap_or(false)
457 }
458
459 fn column_offset(&self, col: usize) -> usize {
461 let mut offset = 0;
462 for c in &self.schema.columns[..col] {
463 offset += match c.col_type {
464 TbpColumnType::FixedBinary => c.fixed_size.unwrap_or(0) as usize,
465 _ => c.col_type.fixed_size().unwrap_or(0),
466 };
467 }
468 offset
469 }
470
471 pub fn read_bool(&self, col: usize) -> Option<bool> {
473 if self.is_null(col) {
474 return None;
475 }
476 let offset = self.column_offset(col);
477 Some(self.data.get(offset).copied().unwrap_or(0) != 0)
478 }
479
480 pub fn read_i64(&self, col: usize) -> Option<i64> {
482 if self.is_null(col) {
483 return None;
484 }
485 let offset = self.column_offset(col);
486 if offset + 8 > self.data.len() {
487 return None;
488 }
489 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
490 Some(i64::from_le_bytes(bytes))
491 }
492
493 pub fn read_u64(&self, col: usize) -> Option<u64> {
495 if self.is_null(col) {
496 return None;
497 }
498 let offset = self.column_offset(col);
499 if offset + 8 > self.data.len() {
500 return None;
501 }
502 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
503 Some(u64::from_le_bytes(bytes))
504 }
505
506 pub fn read_f64(&self, col: usize) -> Option<f64> {
508 if self.is_null(col) {
509 return None;
510 }
511 let offset = self.column_offset(col);
512 if offset + 8 > self.data.len() {
513 return None;
514 }
515 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
516 Some(f64::from_le_bytes(bytes))
517 }
518
519 pub fn read_i32(&self, col: usize) -> Option<i32> {
521 if self.is_null(col) {
522 return None;
523 }
524 let offset = self.column_offset(col);
525 if offset + 4 > self.data.len() {
526 return None;
527 }
528 let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
529 Some(i32::from_le_bytes(bytes))
530 }
531
532 pub fn read_f32(&self, col: usize) -> Option<f32> {
534 if self.is_null(col) {
535 return None;
536 }
537 let offset = self.column_offset(col);
538 if offset + 4 > self.data.len() {
539 return None;
540 }
541 let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
542 Some(f32::from_le_bytes(bytes))
543 }
544
545 pub fn raw_data(&self) -> &[u8] {
547 self.data
548 }
549}
550
551pub struct TbpWriter {
553 schema: TbpSchema,
554 null_bitmap: NullBitmapMut,
555 row_index: Vec<u32>,
556 data: Vec<u8>,
557 row_count: usize,
558}
559
560impl TbpWriter {
561 pub fn new(schema: TbpSchema, estimated_rows: usize) -> Self {
562 Self {
563 null_bitmap: NullBitmapMut::new(estimated_rows, schema.columns.len()),
564 row_index: Vec::with_capacity(estimated_rows),
565 data: Vec::with_capacity(estimated_rows * schema.fixed_row_size().unwrap_or(64)),
566 row_count: 0,
567 schema,
568 }
569 }
570
571 pub fn start_row(&mut self) -> TbpRowWriter<'_> {
573 let offset = self.data.len() as u32;
574 self.row_index.push(offset);
575 TbpRowWriter {
576 writer: self,
577 col_idx: 0,
578 }
579 }
580
581 fn set_null(&mut self, row: usize, col: usize) {
583 self.null_bitmap.set_null(row, col);
584 }
585
586 pub fn finish(self) -> Vec<u8> {
588 let has_nulls = self.schema.has_nullable_columns();
589 let has_variable = self.schema.has_variable_columns();
590
591 let mut flags = TbpFlags(0);
592 if has_nulls {
593 flags.0 |= TbpFlags::HAS_NULLS;
594 }
595 if has_variable {
596 flags.0 |= TbpFlags::HAS_ROW_INDEX;
597 }
598
599 let null_bitmap_offset = if has_nulls { TBP_HEADER_SIZE as u32 } else { 0 };
601 let null_bitmap_size = if has_nulls {
602 NullBitmap::required_size(self.row_count, self.schema.columns.len())
603 } else {
604 0
605 };
606
607 let row_index_offset = if has_variable {
608 (TBP_HEADER_SIZE + null_bitmap_size) as u32
609 } else {
610 0
611 };
612 let row_index_size = if has_variable {
613 self.row_count * 4
614 } else {
615 0
616 };
617
618 let data_offset = TBP_HEADER_SIZE + null_bitmap_size + row_index_size;
619
620 let header = TbpHeader {
621 magic: TBP_MAGIC,
622 version: TBP_VERSION,
623 flags,
624 schema_id: self.schema.schema_id,
625 row_count: self.row_count as u32,
626 column_count: self.schema.columns.len() as u16,
627 reserved: 0,
628 null_bitmap_offset,
629 row_index_offset,
630 };
631
632 let total_size = data_offset + self.data.len();
633 let mut buffer = Vec::with_capacity(total_size);
634
635 header.write(&mut buffer).unwrap();
637
638 if has_nulls {
640 let required = NullBitmap::required_size(self.row_count, self.schema.columns.len());
641 buffer.extend_from_slice(&self.null_bitmap.as_bytes()[..required]);
642 }
643
644 if has_variable {
646 for offset in &self.row_index {
647 buffer.write_u32::<LittleEndian>(*offset + data_offset as u32).unwrap();
648 }
649 }
650
651 buffer.extend_from_slice(&self.data);
653
654 buffer
655 }
656}
657
658pub struct TbpRowWriter<'a> {
660 writer: &'a mut TbpWriter,
661 col_idx: usize,
662}
663
664impl<'a> TbpRowWriter<'a> {
665 pub fn write_null(mut self) -> Self {
667 self.writer.set_null(self.writer.row_count, self.col_idx);
668 self.col_idx += 1;
669 self
670 }
671
672 pub fn write_bool(mut self, value: bool) -> Self {
674 self.writer.data.push(if value { 1 } else { 0 });
675 self.col_idx += 1;
676 self
677 }
678
679 pub fn write_i64(mut self, value: i64) -> Self {
681 self.writer.data.extend_from_slice(&value.to_le_bytes());
682 self.col_idx += 1;
683 self
684 }
685
686 pub fn write_u64(mut self, value: u64) -> Self {
688 self.writer.data.extend_from_slice(&value.to_le_bytes());
689 self.col_idx += 1;
690 self
691 }
692
693 pub fn write_f64(mut self, value: f64) -> Self {
695 self.writer.data.extend_from_slice(&value.to_le_bytes());
696 self.col_idx += 1;
697 self
698 }
699
700 pub fn write_i32(mut self, value: i32) -> Self {
702 self.writer.data.extend_from_slice(&value.to_le_bytes());
703 self.col_idx += 1;
704 self
705 }
706
707 pub fn write_f32(mut self, value: f32) -> Self {
709 self.writer.data.extend_from_slice(&value.to_le_bytes());
710 self.col_idx += 1;
711 self
712 }
713
714 pub fn write_string(mut self, value: &str) -> Self {
716 let bytes = value.as_bytes();
717 self.writer.data.write_u32::<LittleEndian>(bytes.len() as u32).unwrap();
718 self.writer.data.extend_from_slice(bytes);
719 self.col_idx += 1;
720 self
721 }
722
723 pub fn write_binary(mut self, value: &[u8]) -> Self {
725 self.writer.data.write_u32::<LittleEndian>(value.len() as u32).unwrap();
726 self.writer.data.extend_from_slice(value);
727 self.col_idx += 1;
728 self
729 }
730
731 pub fn finish(self) {
733 self.writer.row_count += 1;
734 }
735}
736
737pub struct TbpReader<'a> {
739 data: &'a [u8],
740 header: TbpHeader,
741 schema: &'a TbpSchema,
742}
743
744impl<'a> TbpReader<'a> {
745 pub fn new(data: &'a [u8], schema: &'a TbpSchema) -> io::Result<Self> {
747 let header = TbpHeader::read(data)?;
748
749 if header.schema_id != schema.schema_id {
750 return Err(io::Error::new(
751 io::ErrorKind::InvalidData,
752 "Schema ID mismatch",
753 ));
754 }
755
756 Ok(Self {
757 data,
758 header,
759 schema,
760 })
761 }
762
763 pub fn row_count(&self) -> usize {
765 self.header.row_count as usize
766 }
767
768 pub fn get_row(&self, row: usize) -> Option<RowView<'_>> {
770 if row >= self.row_count() {
771 return None;
772 }
773
774 let row_offset = if self.header.flags.has_row_index() {
776 let idx_offset = self.header.row_index_offset as usize + row * 4;
777 if idx_offset + 4 > self.data.len() {
778 return None;
779 }
780 let bytes: [u8; 4] = self.data[idx_offset..idx_offset + 4].try_into().ok()?;
781 u32::from_le_bytes(bytes) as usize
782 } else {
783 let row_size = self.schema.fixed_row_size()?;
785 let null_bitmap_size = if self.header.flags.has_nulls() {
786 NullBitmap::required_size(self.row_count(), self.schema.columns.len())
787 } else {
788 0
789 };
790 TBP_HEADER_SIZE + null_bitmap_size + row * row_size
791 };
792
793 let row_data = &self.data[row_offset..];
794
795 Some(RowView::new(self.schema, row_data, None, row))
797 }
798
799 pub fn iter(&'a self) -> impl Iterator<Item = RowView<'a>> {
801 (0..self.row_count()).filter_map(move |i| self.get_row(i))
802 }
803}
804
#[cfg(test)]
mod tests {
    use super::*;

    /// Header with every field set to a distinct, recognizable value.
    fn sample_header() -> TbpHeader {
        TbpHeader {
            magic: TBP_MAGIC,
            version: TBP_VERSION,
            flags: TbpFlags(TbpFlags::HAS_NULLS | TbpFlags::HAS_ROW_INDEX),
            schema_id: 12345678,
            row_count: 100,
            column_count: 5,
            reserved: 0,
            null_bitmap_offset: 32,
            row_index_offset: 48,
        }
    }

    #[test]
    fn test_header_roundtrip() {
        let header = sample_header();

        let mut buffer = Vec::new();
        header.write(&mut buffer).unwrap();
        assert_eq!(buffer.len(), TBP_HEADER_SIZE);

        // Every field must survive the round trip, not just a sample of them.
        let parsed = TbpHeader::read(&buffer).unwrap();
        assert_eq!(parsed.magic, TBP_MAGIC);
        assert_eq!(parsed.version, TBP_VERSION);
        assert_eq!(parsed.flags.0, header.flags.0);
        assert!(parsed.flags.has_nulls());
        assert!(parsed.flags.has_row_index());
        assert_eq!(parsed.schema_id, 12345678);
        assert_eq!(parsed.row_count, 100);
        assert_eq!(parsed.column_count, 5);
        assert_eq!(parsed.reserved, 0);
        assert_eq!(parsed.null_bitmap_offset, 32);
        assert_eq!(parsed.row_index_offset, 48);
    }

    #[test]
    fn test_header_rejects_short_input_and_bad_magic() {
        let mut buffer = Vec::new();
        sample_header().write(&mut buffer).unwrap();

        let short = TbpHeader::read(&buffer[..TBP_HEADER_SIZE - 1]).unwrap_err();
        assert_eq!(short.kind(), std::io::ErrorKind::UnexpectedEof);

        buffer[0] ^= 0xFF;
        let bad_magic = TbpHeader::read(&buffer).unwrap_err();
        assert_eq!(bad_magic.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_flags_accessors() {
        let flags = TbpFlags(TbpFlags::HAS_ROW_INDEX | TbpFlags::EMBEDDED_SCHEMA);
        assert!(!flags.has_nulls());
        assert!(flags.has_row_index());
        assert!(!flags.is_compressed());
        assert!(flags.has_embedded_schema());
        assert_eq!(TbpFlags::default().0, 0);
    }

    #[test]
    fn test_column_type_byte_roundtrip() {
        for tag in 0u8..=15 {
            let ty = TbpColumnType::from_byte(tag).expect("tags 0..=15 are defined");
            assert_eq!(ty as u8, tag);
        }
        assert_eq!(TbpColumnType::from_byte(16), None);
        assert_eq!(TbpColumnType::from_byte(u8::MAX), None);
    }

    #[test]
    fn test_null_bitmap() {
        let mut bitmap = NullBitmapMut::new(10, 5);
        bitmap.set_null(0, 0);
        bitmap.set_null(5, 3);
        bitmap.set_null(9, 4);

        let data = bitmap.as_bytes();
        let reader = NullBitmap::new(data, 5);

        assert!(reader.is_null(0, 0));
        assert!(!reader.is_null(0, 1));
        assert!(reader.is_null(5, 3));
        assert!(reader.is_null(9, 4));
        assert!(!reader.is_null(9, 3));
    }

    #[test]
    fn test_writer_reader_roundtrip() {
        let schema = TbpSchema::new(
            "test_table",
            vec![
                TbpColumn::new("id", TbpColumnType::Int64).not_null(),
                TbpColumn::new("value", TbpColumnType::Float64),
            ],
        );

        let mut writer = TbpWriter::new(schema.clone(), 100);

        for i in 0..10 {
            writer
                .start_row()
                .write_i64(i)
                .write_f64(i as f64 * 1.5)
                .finish();
        }

        let data = writer.finish();

        let reader = TbpReader::new(&data, &schema).unwrap();
        assert_eq!(reader.row_count(), 10);

        let row = reader.get_row(5).unwrap();
        assert_eq!(row.read_i64(0), Some(5));
        assert_eq!(row.read_f64(1), Some(7.5));
        assert!(!row.is_null(1));

        assert!(reader.get_row(10).is_none());
        assert_eq!(reader.iter().count(), 10);

        // A schema with a different name hashes to a different ID and is rejected.
        let other = TbpSchema::new("other_table", schema.columns.clone());
        assert!(TbpReader::new(&data, &other).is_err());
    }
}