1use std::io::{self, Write};
62
63use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64
/// Magic number expected at the start of every TBP header.
///
/// Its value is `0x544F_4F4E`: the ASCII bytes `TOON`, most significant byte
/// first.
pub const TBP_MAGIC: u32 = u32::from_be_bytes(*b"TOON");

/// Format version that `TbpWriter` records in the header.
pub const TBP_VERSION: u16 = 1;

/// Size in bytes of a serialised `TbpHeader` (32), spelled out field by field.
pub const TBP_HEADER_SIZE: usize = 4 // magic
    + 2 // version
    + 2 // flags
    + 8 // schema_id
    + 4 // row_count
    + 2 // column_count
    + 2 // reserved
    + 4 // null_bitmap_offset
    + 4; // row_index_offset
73
/// Raw 16-bit flag word stored in a TBP header.
///
/// The wrapped value is public; the `has_*`/`is_*` helpers test individual
/// bits.
#[derive(Debug, Clone, Copy, Default)]
pub struct TbpFlags(pub u16);

impl TbpFlags {
    /// Bit 0.
    pub const HAS_NULLS: u16 = 0x0001;
    /// Bit 1.
    pub const HAS_ROW_INDEX: u16 = 0x0002;
    /// Bit 2.
    pub const COMPRESSED: u16 = 0x0004;
    /// Bit 3.
    pub const EMBEDDED_SCHEMA: u16 = 0x0008;

    /// Returns `true` when any bit of `mask` is set in the flag word.
    #[inline]
    fn any_set(self, mask: u16) -> bool {
        (self.0 & mask) != 0
    }

    /// Whether the `HAS_NULLS` bit is set.
    pub fn has_nulls(&self) -> bool {
        self.any_set(Self::HAS_NULLS)
    }

    /// Whether the `HAS_ROW_INDEX` bit is set.
    pub fn has_row_index(&self) -> bool {
        self.any_set(Self::HAS_ROW_INDEX)
    }

    /// Whether the `COMPRESSED` bit is set.
    pub fn is_compressed(&self) -> bool {
        self.any_set(Self::COMPRESSED)
    }

    /// Whether the `EMBEDDED_SCHEMA` bit is set.
    pub fn has_embedded_schema(&self) -> bool {
        self.any_set(Self::EMBEDDED_SCHEMA)
    }
}
104
/// Column data types, each identified by a one-byte tag (the discriminant).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TbpColumnType {
    Null = 0,
    Bool = 1,
    Int8 = 2,
    UInt8 = 3,
    Int16 = 4,
    UInt16 = 5,
    Int32 = 6,
    UInt32 = 7,
    Int64 = 8,
    UInt64 = 9,
    Float32 = 10,
    Float64 = 11,
    String = 12,
    Binary = 13,
    Timestamp = 14,
    FixedBinary = 15,
}

impl TbpColumnType {
    /// Width in bytes of a value of this type, when the type alone fixes it.
    ///
    /// Returns `None` for `String`, `Binary` and `FixedBinary`; for
    /// `FixedBinary` the width is carried by `TbpColumn::fixed_size` instead.
    pub fn fixed_size(&self) -> Option<usize> {
        let width = match *self {
            Self::String | Self::Binary | Self::FixedBinary => return None,
            Self::Null => 0,
            Self::Bool | Self::Int8 | Self::UInt8 => 1,
            Self::Int16 | Self::UInt16 => 2,
            Self::Int32 | Self::UInt32 | Self::Float32 => 4,
            Self::Int64 | Self::UInt64 | Self::Float64 | Self::Timestamp => 8,
        };
        Some(width)
    }

    /// Whether [`fixed_size`](Self::fixed_size) has no answer for this type.
    ///
    /// Note that this is `true` for `FixedBinary` as well as for `String` and
    /// `Binary`, because the type by itself does not know its width.
    pub fn is_variable(&self) -> bool {
        matches!(self.fixed_size(), None)
    }

    /// Decodes a type tag; returns `None` for tags above 15.
    pub fn from_byte(b: u8) -> Option<Self> {
        // Indexed by tag value, so the order must follow the discriminants.
        const BY_TAG: [TbpColumnType; 16] = [
            TbpColumnType::Null,
            TbpColumnType::Bool,
            TbpColumnType::Int8,
            TbpColumnType::UInt8,
            TbpColumnType::Int16,
            TbpColumnType::UInt16,
            TbpColumnType::Int32,
            TbpColumnType::UInt32,
            TbpColumnType::Int64,
            TbpColumnType::UInt64,
            TbpColumnType::Float32,
            TbpColumnType::Float64,
            TbpColumnType::String,
            TbpColumnType::Binary,
            TbpColumnType::Timestamp,
            TbpColumnType::FixedBinary,
        ];
        BY_TAG.get(usize::from(b)).copied()
    }
}
188
189#[derive(Debug, Clone)]
191pub struct TbpColumn {
192 pub name: String,
194 pub col_type: TbpColumnType,
196 pub fixed_size: Option<u16>,
198 pub nullable: bool,
200}
201
202impl TbpColumn {
203 pub fn new(name: impl Into<String>, col_type: TbpColumnType) -> Self {
204 Self {
205 name: name.into(),
206 col_type,
207 fixed_size: None,
208 nullable: true,
209 }
210 }
211
212 pub fn with_fixed_size(mut self, size: u16) -> Self {
213 self.fixed_size = Some(size);
214 self
215 }
216
217 pub fn not_null(mut self) -> Self {
218 self.nullable = false;
219 self
220 }
221}
222
223#[derive(Debug, Clone)]
225pub struct TbpSchema {
226 pub name: String,
228 pub columns: Vec<TbpColumn>,
230 pub schema_id: u64,
232}
233
234impl TbpSchema {
235 pub fn new(name: impl Into<String>, columns: Vec<TbpColumn>) -> Self {
236 let name = name.into();
237 let schema_id = Self::compute_schema_id(&name, &columns);
238 Self {
239 name,
240 columns,
241 schema_id,
242 }
243 }
244
245 fn compute_schema_id(name: &str, columns: &[TbpColumn]) -> u64 {
247 use std::collections::hash_map::DefaultHasher;
248 use std::hash::{Hash, Hasher};
249
250 let mut hasher = DefaultHasher::new();
251 name.hash(&mut hasher);
252 for col in columns {
253 col.name.hash(&mut hasher);
254 (col.col_type as u8).hash(&mut hasher);
255 col.fixed_size.hash(&mut hasher);
256 col.nullable.hash(&mut hasher);
257 }
258 hasher.finish()
259 }
260
261 pub fn has_variable_columns(&self) -> bool {
263 self.columns.iter().any(|c| c.col_type.is_variable())
264 }
265
266 pub fn has_nullable_columns(&self) -> bool {
268 self.columns.iter().any(|c| c.nullable)
269 }
270
271 pub fn fixed_row_size(&self) -> Option<usize> {
273 if self.has_variable_columns() {
274 return None;
275 }
276
277 let mut size = 0;
278 for col in &self.columns {
279 match col.col_type {
280 TbpColumnType::FixedBinary => {
281 size += col.fixed_size.unwrap_or(0) as usize;
282 }
283 _ => {
284 size += col.col_type.fixed_size()?;
285 }
286 }
287 }
288 Some(size)
289 }
290}
291
292#[derive(Debug, Clone)]
294pub struct TbpHeader {
295 pub magic: u32,
297 pub version: u16,
299 pub flags: TbpFlags,
301 pub schema_id: u64,
303 pub row_count: u32,
305 pub column_count: u16,
307 pub reserved: u16,
309 pub null_bitmap_offset: u32,
311 pub row_index_offset: u32,
313}
314
315impl TbpHeader {
316 pub fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
318 w.write_u32::<LittleEndian>(self.magic)?;
319 w.write_u16::<LittleEndian>(self.version)?;
320 w.write_u16::<LittleEndian>(self.flags.0)?;
321 w.write_u64::<LittleEndian>(self.schema_id)?;
322 w.write_u32::<LittleEndian>(self.row_count)?;
323 w.write_u16::<LittleEndian>(self.column_count)?;
324 w.write_u16::<LittleEndian>(self.reserved)?;
325 w.write_u32::<LittleEndian>(self.null_bitmap_offset)?;
326 w.write_u32::<LittleEndian>(self.row_index_offset)?;
327 Ok(())
328 }
329
330 pub fn read(data: &[u8]) -> io::Result<Self> {
332 if data.len() < TBP_HEADER_SIZE {
333 return Err(io::Error::new(
334 io::ErrorKind::UnexpectedEof,
335 "Header too short",
336 ));
337 }
338
339 let mut cursor = std::io::Cursor::new(data);
340 let magic = cursor.read_u32::<LittleEndian>()?;
341 if magic != TBP_MAGIC {
342 return Err(io::Error::new(
343 io::ErrorKind::InvalidData,
344 "Invalid TBP magic",
345 ));
346 }
347
348 let header = Self {
349 magic,
350 version: cursor.read_u16::<LittleEndian>()?,
351 flags: TbpFlags(cursor.read_u16::<LittleEndian>()?),
352 schema_id: cursor.read_u64::<LittleEndian>()?,
353 row_count: cursor.read_u32::<LittleEndian>()?,
354 column_count: cursor.read_u16::<LittleEndian>()?,
355 reserved: cursor.read_u16::<LittleEndian>()?,
356 null_bitmap_offset: cursor.read_u32::<LittleEndian>()?,
357 row_index_offset: cursor.read_u32::<LittleEndian>()?,
358 };
359
360 let data_len = data.len() as u64;
361
362 if data_len > TBP_HEADER_SIZE as u64 {
367 if header.null_bitmap_offset != 0 && (header.null_bitmap_offset as u64) >= data_len {
368 return Err(io::Error::new(
369 io::ErrorKind::InvalidData,
370 format!(
371 "null_bitmap_offset ({}) exceeds data length ({})",
372 header.null_bitmap_offset, data_len
373 ),
374 ));
375 }
376 if header.row_index_offset != 0 && (header.row_index_offset as u64) >= data_len {
377 return Err(io::Error::new(
378 io::ErrorKind::InvalidData,
379 format!(
380 "row_index_offset ({}) exceeds data length ({})",
381 header.row_index_offset, data_len
382 ),
383 ));
384 }
385 }
386
387 Ok(header)
388 }
389}
390
/// Read-only view over a packed null bitmap.
///
/// The flag for `(row, col)` is bit number `row * columns + col`, counted
/// from the least significant bit of the first byte.
#[derive(Debug, Clone, Copy)]
pub struct NullBitmap<'a> {
    data: &'a [u8],
    columns: usize,
}

impl<'a> NullBitmap<'a> {
    /// Wraps `data` as a bitmap for rows that are `columns` wide.
    pub fn new(data: &'a [u8], columns: usize) -> Self {
        NullBitmap { columns, data }
    }

    /// Whether the bit for `(row, col)` is set.
    ///
    /// Positions beyond the end of the backing bytes read as not null.
    #[inline]
    pub fn is_null(&self, row: usize, col: usize) -> bool {
        let bit = row * self.columns + col;
        let mask = 1u8 << (bit % 8);
        match self.data.get(bit / 8) {
            Some(byte) => byte & mask != 0,
            None => false,
        }
    }

    /// Number of bytes needed to hold `rows * cols` bits, rounded up.
    pub fn required_size(rows: usize, cols: usize) -> usize {
        let bits = rows * cols;
        (bits + 7) >> 3
    }
}
422
423pub struct NullBitmapMut {
425 data: Vec<u8>,
426 columns: usize,
427}
428
429impl NullBitmapMut {
430 pub fn new(rows: usize, columns: usize) -> Self {
431 let size = NullBitmap::required_size(rows, columns);
432 Self {
433 data: vec![0; size],
434 columns,
435 }
436 }
437
438 #[inline]
440 pub fn set_null(&mut self, row: usize, col: usize) {
441 let bit_idx = row * self.columns + col;
442 let byte_idx = bit_idx / 8;
443 let bit_pos = bit_idx % 8;
444
445 if byte_idx < self.data.len() {
446 self.data[byte_idx] |= 1 << bit_pos;
447 }
448 }
449
450 pub fn as_bytes(&self) -> &[u8] {
452 &self.data
453 }
454
455 pub fn into_bytes(self) -> Vec<u8> {
457 self.data
458 }
459}
460
461#[derive(Debug, Clone)]
463pub struct RowView<'a> {
464 schema: &'a TbpSchema,
466 data: &'a [u8],
468 null_bitmap: Option<&'a NullBitmap<'a>>,
470 row_idx: usize,
472}
473
474impl<'a> RowView<'a> {
475 pub fn new(
476 schema: &'a TbpSchema,
477 data: &'a [u8],
478 null_bitmap: Option<&'a NullBitmap<'a>>,
479 row_idx: usize,
480 ) -> Self {
481 Self {
482 schema,
483 data,
484 null_bitmap,
485 row_idx,
486 }
487 }
488
489 #[inline]
491 pub fn is_null(&self, col: usize) -> bool {
492 self.null_bitmap
493 .map(|b| b.is_null(self.row_idx, col))
494 .unwrap_or(false)
495 }
496
497 fn column_offset(&self, col: usize) -> usize {
499 let mut offset = 0;
500 for c in &self.schema.columns[..col] {
501 offset += match c.col_type {
502 TbpColumnType::FixedBinary => c.fixed_size.unwrap_or(0) as usize,
503 _ => c.col_type.fixed_size().unwrap_or(0),
504 };
505 }
506 offset
507 }
508
509 pub fn read_bool(&self, col: usize) -> Option<bool> {
511 if self.is_null(col) {
512 return None;
513 }
514 let offset = self.column_offset(col);
515 Some(self.data.get(offset).copied().unwrap_or(0) != 0)
516 }
517
518 pub fn read_i64(&self, col: usize) -> Option<i64> {
520 if self.is_null(col) {
521 return None;
522 }
523 let offset = self.column_offset(col);
524 if offset + 8 > self.data.len() {
525 return None;
526 }
527 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
528 Some(i64::from_le_bytes(bytes))
529 }
530
531 pub fn read_u64(&self, col: usize) -> Option<u64> {
533 if self.is_null(col) {
534 return None;
535 }
536 let offset = self.column_offset(col);
537 if offset + 8 > self.data.len() {
538 return None;
539 }
540 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
541 Some(u64::from_le_bytes(bytes))
542 }
543
544 pub fn read_f64(&self, col: usize) -> Option<f64> {
546 if self.is_null(col) {
547 return None;
548 }
549 let offset = self.column_offset(col);
550 if offset + 8 > self.data.len() {
551 return None;
552 }
553 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
554 Some(f64::from_le_bytes(bytes))
555 }
556
557 pub fn read_i32(&self, col: usize) -> Option<i32> {
559 if self.is_null(col) {
560 return None;
561 }
562 let offset = self.column_offset(col);
563 if offset + 4 > self.data.len() {
564 return None;
565 }
566 let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
567 Some(i32::from_le_bytes(bytes))
568 }
569
570 pub fn read_f32(&self, col: usize) -> Option<f32> {
572 if self.is_null(col) {
573 return None;
574 }
575 let offset = self.column_offset(col);
576 if offset + 4 > self.data.len() {
577 return None;
578 }
579 let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
580 Some(f32::from_le_bytes(bytes))
581 }
582
583 pub fn raw_data(&self) -> &[u8] {
585 self.data
586 }
587}
588
589pub struct TbpWriter {
591 schema: TbpSchema,
592 null_bitmap: NullBitmapMut,
593 row_index: Vec<u32>,
594 data: Vec<u8>,
595 row_count: usize,
596}
597
598impl TbpWriter {
599 pub fn new(schema: TbpSchema, estimated_rows: usize) -> Self {
600 Self {
601 null_bitmap: NullBitmapMut::new(estimated_rows, schema.columns.len()),
602 row_index: Vec::with_capacity(estimated_rows),
603 data: Vec::with_capacity(estimated_rows * schema.fixed_row_size().unwrap_or(64)),
604 row_count: 0,
605 schema,
606 }
607 }
608
609 pub fn start_row(&mut self) -> TbpRowWriter<'_> {
611 let offset = self.data.len() as u32;
612 self.row_index.push(offset);
613 TbpRowWriter {
614 writer: self,
615 col_idx: 0,
616 }
617 }
618
619 fn set_null(&mut self, row: usize, col: usize) {
621 self.null_bitmap.set_null(row, col);
622 }
623
624 pub fn finish(self) -> Vec<u8> {
626 let has_nulls = self.schema.has_nullable_columns();
627 let has_variable = self.schema.has_variable_columns();
628
629 let mut flags = TbpFlags(0);
630 if has_nulls {
631 flags.0 |= TbpFlags::HAS_NULLS;
632 }
633 if has_variable {
634 flags.0 |= TbpFlags::HAS_ROW_INDEX;
635 }
636
637 let null_bitmap_offset = if has_nulls { TBP_HEADER_SIZE as u32 } else { 0 };
639 let null_bitmap_size = if has_nulls {
640 NullBitmap::required_size(self.row_count, self.schema.columns.len())
641 } else {
642 0
643 };
644
645 let row_index_offset = if has_variable {
646 (TBP_HEADER_SIZE + null_bitmap_size) as u32
647 } else {
648 0
649 };
650 let row_index_size = if has_variable { self.row_count * 4 } else { 0 };
651
652 let data_offset = TBP_HEADER_SIZE + null_bitmap_size + row_index_size;
653
654 let header = TbpHeader {
655 magic: TBP_MAGIC,
656 version: TBP_VERSION,
657 flags,
658 schema_id: self.schema.schema_id,
659 row_count: self.row_count as u32,
660 column_count: self.schema.columns.len() as u16,
661 reserved: 0,
662 null_bitmap_offset,
663 row_index_offset,
664 };
665
666 let total_size = data_offset + self.data.len();
667 let mut buffer = Vec::with_capacity(total_size);
668
669 header.write(&mut buffer).unwrap();
671
672 if has_nulls {
674 let required = NullBitmap::required_size(self.row_count, self.schema.columns.len());
675 buffer.extend_from_slice(&self.null_bitmap.as_bytes()[..required]);
676 }
677
678 if has_variable {
680 for offset in &self.row_index {
681 buffer
682 .write_u32::<LittleEndian>(*offset + data_offset as u32)
683 .unwrap();
684 }
685 }
686
687 buffer.extend_from_slice(&self.data);
689
690 buffer
691 }
692}
693
694pub struct TbpRowWriter<'a> {
696 writer: &'a mut TbpWriter,
697 col_idx: usize,
698}
699
700impl<'a> TbpRowWriter<'a> {
701 pub fn write_null(mut self) -> Self {
703 self.writer.set_null(self.writer.row_count, self.col_idx);
704 self.col_idx += 1;
705 self
706 }
707
708 pub fn write_bool(mut self, value: bool) -> Self {
710 self.writer.data.push(if value { 1 } else { 0 });
711 self.col_idx += 1;
712 self
713 }
714
715 pub fn write_i64(mut self, value: i64) -> Self {
717 self.writer.data.extend_from_slice(&value.to_le_bytes());
718 self.col_idx += 1;
719 self
720 }
721
722 pub fn write_u64(mut self, value: u64) -> Self {
724 self.writer.data.extend_from_slice(&value.to_le_bytes());
725 self.col_idx += 1;
726 self
727 }
728
729 pub fn write_f64(mut self, value: f64) -> Self {
731 self.writer.data.extend_from_slice(&value.to_le_bytes());
732 self.col_idx += 1;
733 self
734 }
735
736 pub fn write_i32(mut self, value: i32) -> Self {
738 self.writer.data.extend_from_slice(&value.to_le_bytes());
739 self.col_idx += 1;
740 self
741 }
742
743 pub fn write_f32(mut self, value: f32) -> Self {
745 self.writer.data.extend_from_slice(&value.to_le_bytes());
746 self.col_idx += 1;
747 self
748 }
749
750 pub fn write_string(mut self, value: &str) -> Self {
752 let bytes = value.as_bytes();
753 self.writer
754 .data
755 .write_u32::<LittleEndian>(bytes.len() as u32)
756 .unwrap();
757 self.writer.data.extend_from_slice(bytes);
758 self.col_idx += 1;
759 self
760 }
761
762 pub fn write_binary(mut self, value: &[u8]) -> Self {
764 self.writer
765 .data
766 .write_u32::<LittleEndian>(value.len() as u32)
767 .unwrap();
768 self.writer.data.extend_from_slice(value);
769 self.col_idx += 1;
770 self
771 }
772
773 pub fn finish(self) {
775 self.writer.row_count += 1;
776 }
777}
778
779pub struct TbpReader<'a> {
781 data: &'a [u8],
782 header: TbpHeader,
783 schema: &'a TbpSchema,
784}
785
786impl<'a> TbpReader<'a> {
787 pub fn new(data: &'a [u8], schema: &'a TbpSchema) -> io::Result<Self> {
789 let header = TbpHeader::read(data)?;
790
791 if header.schema_id != schema.schema_id {
792 return Err(io::Error::new(
793 io::ErrorKind::InvalidData,
794 "Schema ID mismatch",
795 ));
796 }
797
798 Ok(Self {
799 data,
800 header,
801 schema,
802 })
803 }
804
805 pub fn row_count(&self) -> usize {
807 self.header.row_count as usize
808 }
809
810 pub fn get_row(&self, row: usize) -> Option<RowView<'_>> {
812 if row >= self.row_count() {
813 return None;
814 }
815
816 let row_offset = if self.header.flags.has_row_index() {
818 let idx_offset = self.header.row_index_offset as usize + row * 4;
819 if idx_offset + 4 > self.data.len() {
820 return None;
821 }
822 let bytes: [u8; 4] = self.data[idx_offset..idx_offset + 4].try_into().ok()?;
823 u32::from_le_bytes(bytes) as usize
824 } else {
825 let row_size = self.schema.fixed_row_size()?;
827 let null_bitmap_size = if self.header.flags.has_nulls() {
828 NullBitmap::required_size(self.row_count(), self.schema.columns.len())
829 } else {
830 0
831 };
832 TBP_HEADER_SIZE + null_bitmap_size + row * row_size
833 };
834
835 let row_data = &self.data[row_offset..];
836
837 Some(RowView::new(self.schema, row_data, None, row))
839 }
840
841 pub fn iter(&'a self) -> impl Iterator<Item = RowView<'a>> {
843 (0..self.row_count()).filter_map(move |i| self.get_row(i))
844 }
845}
846
#[cfg(test)]
mod tests {
    use super::*;

    /// A header survives a write/read cycle and occupies `TBP_HEADER_SIZE`
    /// bytes.
    #[test]
    fn test_header_roundtrip() {
        let original = TbpHeader {
            magic: TBP_MAGIC,
            version: TBP_VERSION,
            flags: TbpFlags(TbpFlags::HAS_NULLS | TbpFlags::HAS_ROW_INDEX),
            schema_id: 12345678,
            row_count: 100,
            column_count: 5,
            reserved: 0,
            null_bitmap_offset: 32,
            row_index_offset: 48,
        };

        let mut bytes = Vec::<u8>::with_capacity(TBP_HEADER_SIZE);
        original.write(&mut bytes).unwrap();
        assert_eq!(bytes.len(), TBP_HEADER_SIZE);

        let decoded = TbpHeader::read(&bytes).unwrap();
        assert_eq!(decoded.magic, TBP_MAGIC);
        assert_eq!(decoded.version, TBP_VERSION);
        assert_eq!(decoded.row_count, 100);
        assert_eq!(decoded.column_count, 5);
    }

    /// Bits set through `NullBitmapMut` are visible through `NullBitmap`.
    #[test]
    fn test_null_bitmap() {
        const COLUMNS: usize = 5;

        let mut builder = NullBitmapMut::new(10, COLUMNS);
        for &(row, col) in [(0, 0), (5, 3), (9, 4)].iter() {
            builder.set_null(row, col);
        }

        let view = NullBitmap::new(builder.as_bytes(), COLUMNS);

        let expectations = [
            (0, 0, true),
            (0, 1, false),
            (5, 3, true),
            (9, 4, true),
            (9, 3, false),
        ];
        for &(row, col, expected) in expectations.iter() {
            assert_eq!(view.is_null(row, col), expected);
        }
    }

    /// Rows written by `TbpWriter` can be read back through `TbpReader`.
    #[test]
    fn test_writer_reader_roundtrip() {
        let columns = vec![
            TbpColumn::new("id", TbpColumnType::Int64).not_null(),
            TbpColumn::new("value", TbpColumnType::Float64),
        ];
        let schema = TbpSchema::new("test_table", columns);

        let mut writer = TbpWriter::new(schema.clone(), 100);
        (0..10).for_each(|i: i64| {
            writer
                .start_row()
                .write_i64(i)
                .write_f64(i as f64 * 1.5)
                .finish();
        });
        let encoded = writer.finish();

        let reader = TbpReader::new(&encoded, &schema).unwrap();
        assert_eq!(reader.row_count(), 10);

        let sixth = reader.get_row(5).unwrap();
        assert_eq!(sixth.read_i64(0), Some(5));
        assert_eq!(sixth.read_f64(1), Some(7.5));
    }
}