1use std::io::{self, Write};
59
60use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
61
/// Magic number identifying a TBP buffer: the ASCII bytes `TOON` interpreted as
/// a big-endian `u32`, i.e. `0x544F_4F4E`.
///
/// The header serializer writes this value little-endian, so the first four
/// bytes of an encoded buffer are `N`, `O`, `O`, `T`.
pub const TBP_MAGIC: u32 = u32::from_be_bytes(*b"TOON");

/// Format version stamped into every header produced by this module.
pub const TBP_VERSION: u16 = 1;

/// Size in bytes of a serialized header, spelled out as the sum of its fields
/// in wire order: magic (4) + version (2) + flags (2) + schema id (8) +
/// row count (4) + column count (2) + reserved (2) + null bitmap offset (4) +
/// row index offset (4).
pub const TBP_HEADER_SIZE: usize = 4 + 2 + 2 + 8 + 4 + 2 + 2 + 4 + 4;
70
/// Bit set stored in the `flags` field of a TBP header.
///
/// The wrapped `u16` is public, so callers combine flags with plain bitwise
/// operators on the mask constants below.
#[derive(Debug, Clone, Copy, Default)]
pub struct TbpFlags(pub u16);

impl TbpFlags {
    /// Bit 0, tested by [`TbpFlags::has_nulls`].
    pub const HAS_NULLS: u16 = 0x0001;
    /// Bit 1, tested by [`TbpFlags::has_row_index`].
    pub const HAS_ROW_INDEX: u16 = 0x0002;
    /// Bit 2, tested by [`TbpFlags::is_compressed`].
    pub const COMPRESSED: u16 = 0x0004;
    /// Bit 3, tested by [`TbpFlags::has_embedded_schema`].
    pub const EMBEDDED_SCHEMA: u16 = 0x0008;

    /// Returns `true` when at least one bit of `mask` is present in the raw
    /// flag word.
    #[inline]
    fn any_set(self, mask: u16) -> bool {
        (self.0 & mask) != 0
    }

    /// Whether the [`HAS_NULLS`](Self::HAS_NULLS) bit is set.
    pub fn has_nulls(&self) -> bool {
        self.any_set(Self::HAS_NULLS)
    }

    /// Whether the [`HAS_ROW_INDEX`](Self::HAS_ROW_INDEX) bit is set.
    pub fn has_row_index(&self) -> bool {
        self.any_set(Self::HAS_ROW_INDEX)
    }

    /// Whether the [`COMPRESSED`](Self::COMPRESSED) bit is set.
    pub fn is_compressed(&self) -> bool {
        self.any_set(Self::COMPRESSED)
    }

    /// Whether the [`EMBEDDED_SCHEMA`](Self::EMBEDDED_SCHEMA) bit is set.
    pub fn has_embedded_schema(&self) -> bool {
        self.any_set(Self::EMBEDDED_SCHEMA)
    }
}
101
/// Column type tag; the discriminant is the one-byte code accepted by
/// [`TbpColumnType::from_byte`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TbpColumnType {
    Null = 0,
    Bool = 1,
    Int8 = 2,
    UInt8 = 3,
    Int16 = 4,
    UInt16 = 5,
    Int32 = 6,
    UInt32 = 7,
    Int64 = 8,
    UInt64 = 9,
    Float32 = 10,
    Float64 = 11,
    String = 12,
    Binary = 13,
    Timestamp = 14,
    FixedBinary = 15,
}

impl TbpColumnType {
    /// Encoded width in bytes that is intrinsic to the type, or `None` when the
    /// type alone does not determine it.
    ///
    /// `FixedBinary` reports `None` here: its width is carried by the column
    /// definition rather than by the type tag.
    pub fn fixed_size(&self) -> Option<usize> {
        let width = match self {
            Self::String | Self::Binary | Self::FixedBinary => return None,
            Self::Null => 0,
            Self::Bool | Self::Int8 | Self::UInt8 => 1,
            Self::Int16 | Self::UInt16 => 2,
            Self::Int32 | Self::UInt32 | Self::Float32 => 4,
            Self::Int64 | Self::UInt64 | Self::Float64 | Self::Timestamp => 8,
        };
        Some(width)
    }

    /// `true` exactly when [`fixed_size`](Self::fixed_size) is `None`.
    ///
    /// NOTE(review): this means `FixedBinary` is also reported as variable.
    pub fn is_variable(&self) -> bool {
        matches!(self.fixed_size(), None)
    }

    /// Decodes a type tag byte; values above 15 yield `None`.
    pub fn from_byte(b: u8) -> Option<Self> {
        // Indexed by discriminant, so the order must mirror the enum definition.
        const BY_TAG: [TbpColumnType; 16] = [
            TbpColumnType::Null,
            TbpColumnType::Bool,
            TbpColumnType::Int8,
            TbpColumnType::UInt8,
            TbpColumnType::Int16,
            TbpColumnType::UInt16,
            TbpColumnType::Int32,
            TbpColumnType::UInt32,
            TbpColumnType::Int64,
            TbpColumnType::UInt64,
            TbpColumnType::Float32,
            TbpColumnType::Float64,
            TbpColumnType::String,
            TbpColumnType::Binary,
            TbpColumnType::Timestamp,
            TbpColumnType::FixedBinary,
        ];
        BY_TAG.get(usize::from(b)).copied()
    }
}
182
183#[derive(Debug, Clone)]
185pub struct TbpColumn {
186 pub name: String,
188 pub col_type: TbpColumnType,
190 pub fixed_size: Option<u16>,
192 pub nullable: bool,
194}
195
196impl TbpColumn {
197 pub fn new(name: impl Into<String>, col_type: TbpColumnType) -> Self {
198 Self {
199 name: name.into(),
200 col_type,
201 fixed_size: None,
202 nullable: true,
203 }
204 }
205
206 pub fn with_fixed_size(mut self, size: u16) -> Self {
207 self.fixed_size = Some(size);
208 self
209 }
210
211 pub fn not_null(mut self) -> Self {
212 self.nullable = false;
213 self
214 }
215}
216
217#[derive(Debug, Clone)]
219pub struct TbpSchema {
220 pub name: String,
222 pub columns: Vec<TbpColumn>,
224 pub schema_id: u64,
226}
227
228impl TbpSchema {
229 pub fn new(name: impl Into<String>, columns: Vec<TbpColumn>) -> Self {
230 let name = name.into();
231 let schema_id = Self::compute_schema_id(&name, &columns);
232 Self {
233 name,
234 columns,
235 schema_id,
236 }
237 }
238
239 fn compute_schema_id(name: &str, columns: &[TbpColumn]) -> u64 {
241 use std::hash::{Hash, Hasher};
242 use std::collections::hash_map::DefaultHasher;
243
244 let mut hasher = DefaultHasher::new();
245 name.hash(&mut hasher);
246 for col in columns {
247 col.name.hash(&mut hasher);
248 (col.col_type as u8).hash(&mut hasher);
249 col.fixed_size.hash(&mut hasher);
250 col.nullable.hash(&mut hasher);
251 }
252 hasher.finish()
253 }
254
255 pub fn has_variable_columns(&self) -> bool {
257 self.columns.iter().any(|c| c.col_type.is_variable())
258 }
259
260 pub fn has_nullable_columns(&self) -> bool {
262 self.columns.iter().any(|c| c.nullable)
263 }
264
265 pub fn fixed_row_size(&self) -> Option<usize> {
267 if self.has_variable_columns() {
268 return None;
269 }
270
271 let mut size = 0;
272 for col in &self.columns {
273 match col.col_type {
274 TbpColumnType::FixedBinary => {
275 size += col.fixed_size.unwrap_or(0) as usize;
276 }
277 _ => {
278 size += col.col_type.fixed_size()?;
279 }
280 }
281 }
282 Some(size)
283 }
284}
285
286#[derive(Debug, Clone)]
288pub struct TbpHeader {
289 pub magic: u32,
291 pub version: u16,
293 pub flags: TbpFlags,
295 pub schema_id: u64,
297 pub row_count: u32,
299 pub column_count: u16,
301 pub reserved: u16,
303 pub null_bitmap_offset: u32,
305 pub row_index_offset: u32,
307}
308
309impl TbpHeader {
310 pub fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
312 w.write_u32::<LittleEndian>(self.magic)?;
313 w.write_u16::<LittleEndian>(self.version)?;
314 w.write_u16::<LittleEndian>(self.flags.0)?;
315 w.write_u64::<LittleEndian>(self.schema_id)?;
316 w.write_u32::<LittleEndian>(self.row_count)?;
317 w.write_u16::<LittleEndian>(self.column_count)?;
318 w.write_u16::<LittleEndian>(self.reserved)?;
319 w.write_u32::<LittleEndian>(self.null_bitmap_offset)?;
320 w.write_u32::<LittleEndian>(self.row_index_offset)?;
321 Ok(())
322 }
323
324 pub fn read(data: &[u8]) -> io::Result<Self> {
326 if data.len() < TBP_HEADER_SIZE {
327 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Header too short"));
328 }
329
330 let mut cursor = std::io::Cursor::new(data);
331 let magic = cursor.read_u32::<LittleEndian>()?;
332 if magic != TBP_MAGIC {
333 return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid TBP magic"));
334 }
335
336 Ok(Self {
337 magic,
338 version: cursor.read_u16::<LittleEndian>()?,
339 flags: TbpFlags(cursor.read_u16::<LittleEndian>()?),
340 schema_id: cursor.read_u64::<LittleEndian>()?,
341 row_count: cursor.read_u32::<LittleEndian>()?,
342 column_count: cursor.read_u16::<LittleEndian>()?,
343 reserved: cursor.read_u16::<LittleEndian>()?,
344 null_bitmap_offset: cursor.read_u32::<LittleEndian>()?,
345 row_index_offset: cursor.read_u32::<LittleEndian>()?,
346 })
347 }
348}
349
/// Read-only view over a packed null bitmap.
///
/// Bits are stored row-major: the bit for `(row, col)` has index
/// `row * columns + col`, least-significant bit first within each byte.
#[derive(Debug, Clone, Copy)]
pub struct NullBitmap<'a> {
    data: &'a [u8],
    columns: usize,
}

impl<'a> NullBitmap<'a> {
    /// Wraps `data` as a bitmap with `columns` bits per row.
    pub fn new(data: &'a [u8], columns: usize) -> Self {
        Self { data, columns }
    }

    /// Returns `true` when the bit for `(row, col)` is set.
    ///
    /// Positions outside the bitmap report `false`. That includes
    /// `col >= columns`, which would otherwise alias a bit belonging to the
    /// following row.
    #[inline]
    pub fn is_null(&self, row: usize, col: usize) -> bool {
        match Self::bit_index(row, col, self.columns) {
            Some(bit) => self
                .data
                .get(bit / 8)
                .map_or(false, |byte| (byte & (1u8 << (bit % 8))) != 0),
            None => false,
        }
    }

    /// Number of bytes needed to hold `rows * cols` bits.
    pub fn required_size(rows: usize, cols: usize) -> usize {
        (rows * cols + 7) / 8
    }

    /// Flat bit index of `(row, col)`, or `None` if `col` is not a valid column
    /// or the index does not fit in `usize`.
    fn bit_index(row: usize, col: usize, columns: usize) -> Option<usize> {
        if col >= columns {
            return None;
        }
        row.checked_mul(columns)?.checked_add(col)
    }
}

/// Owned, growable null bitmap used while rows are being written.
pub struct NullBitmapMut {
    data: Vec<u8>,
    columns: usize,
}

impl NullBitmapMut {
    /// Creates a cleared bitmap pre-sized for `rows` rows of `columns` columns.
    ///
    /// `rows` is only an initial size: [`set_null`](Self::set_null) grows the
    /// storage when a later row is marked.
    pub fn new(rows: usize, columns: usize) -> Self {
        let size = NullBitmap::required_size(rows, columns);
        Self {
            data: vec![0; size],
            columns,
        }
    }

    /// Marks `(row, col)` as null, growing the bitmap if `row` lies beyond the
    /// currently allocated rows.
    ///
    /// A `col` outside `0..columns` is ignored so that it cannot flip a bit
    /// that belongs to another row.
    #[inline]
    pub fn set_null(&mut self, row: usize, col: usize) {
        let bit = match NullBitmap::bit_index(row, col, self.columns) {
            Some(bit) => bit,
            None => return,
        };
        let byte_idx = bit / 8;

        // Previously a row past the initial estimate was silently dropped.
        if byte_idx >= self.data.len() {
            self.data.resize(byte_idx + 1, 0);
        }
        self.data[byte_idx] |= 1u8 << (bit % 8);
    }

    /// Borrows the packed bitmap bytes.
    pub fn as_bytes(&self) -> &[u8] {
        &self.data
    }

    /// Consumes the bitmap and returns the packed bytes.
    pub fn into_bytes(self) -> Vec<u8> {
        self.data
    }
}
419
420#[derive(Debug, Clone)]
422pub struct RowView<'a> {
423 schema: &'a TbpSchema,
425 data: &'a [u8],
427 null_bitmap: Option<&'a NullBitmap<'a>>,
429 row_idx: usize,
431}
432
433impl<'a> RowView<'a> {
434 pub fn new(
435 schema: &'a TbpSchema,
436 data: &'a [u8],
437 null_bitmap: Option<&'a NullBitmap<'a>>,
438 row_idx: usize,
439 ) -> Self {
440 Self {
441 schema,
442 data,
443 null_bitmap,
444 row_idx,
445 }
446 }
447
448 #[inline]
450 pub fn is_null(&self, col: usize) -> bool {
451 self.null_bitmap
452 .map(|b| b.is_null(self.row_idx, col))
453 .unwrap_or(false)
454 }
455
456 fn column_offset(&self, col: usize) -> usize {
458 let mut offset = 0;
459 for c in &self.schema.columns[..col] {
460 offset += match c.col_type {
461 TbpColumnType::FixedBinary => c.fixed_size.unwrap_or(0) as usize,
462 _ => c.col_type.fixed_size().unwrap_or(0),
463 };
464 }
465 offset
466 }
467
468 pub fn read_bool(&self, col: usize) -> Option<bool> {
470 if self.is_null(col) {
471 return None;
472 }
473 let offset = self.column_offset(col);
474 Some(self.data.get(offset).copied().unwrap_or(0) != 0)
475 }
476
477 pub fn read_i64(&self, col: usize) -> Option<i64> {
479 if self.is_null(col) {
480 return None;
481 }
482 let offset = self.column_offset(col);
483 if offset + 8 > self.data.len() {
484 return None;
485 }
486 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
487 Some(i64::from_le_bytes(bytes))
488 }
489
490 pub fn read_u64(&self, col: usize) -> Option<u64> {
492 if self.is_null(col) {
493 return None;
494 }
495 let offset = self.column_offset(col);
496 if offset + 8 > self.data.len() {
497 return None;
498 }
499 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
500 Some(u64::from_le_bytes(bytes))
501 }
502
503 pub fn read_f64(&self, col: usize) -> Option<f64> {
505 if self.is_null(col) {
506 return None;
507 }
508 let offset = self.column_offset(col);
509 if offset + 8 > self.data.len() {
510 return None;
511 }
512 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
513 Some(f64::from_le_bytes(bytes))
514 }
515
516 pub fn read_i32(&self, col: usize) -> Option<i32> {
518 if self.is_null(col) {
519 return None;
520 }
521 let offset = self.column_offset(col);
522 if offset + 4 > self.data.len() {
523 return None;
524 }
525 let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
526 Some(i32::from_le_bytes(bytes))
527 }
528
529 pub fn read_f32(&self, col: usize) -> Option<f32> {
531 if self.is_null(col) {
532 return None;
533 }
534 let offset = self.column_offset(col);
535 if offset + 4 > self.data.len() {
536 return None;
537 }
538 let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
539 Some(f32::from_le_bytes(bytes))
540 }
541
542 pub fn raw_data(&self) -> &[u8] {
544 self.data
545 }
546}
547
548pub struct TbpWriter {
550 schema: TbpSchema,
551 null_bitmap: NullBitmapMut,
552 row_index: Vec<u32>,
553 data: Vec<u8>,
554 row_count: usize,
555}
556
557impl TbpWriter {
558 pub fn new(schema: TbpSchema, estimated_rows: usize) -> Self {
559 Self {
560 null_bitmap: NullBitmapMut::new(estimated_rows, schema.columns.len()),
561 row_index: Vec::with_capacity(estimated_rows),
562 data: Vec::with_capacity(estimated_rows * schema.fixed_row_size().unwrap_or(64)),
563 row_count: 0,
564 schema,
565 }
566 }
567
568 pub fn start_row(&mut self) -> TbpRowWriter<'_> {
570 let offset = self.data.len() as u32;
571 self.row_index.push(offset);
572 TbpRowWriter {
573 writer: self,
574 col_idx: 0,
575 }
576 }
577
578 fn set_null(&mut self, row: usize, col: usize) {
580 self.null_bitmap.set_null(row, col);
581 }
582
583 pub fn finish(self) -> Vec<u8> {
585 let has_nulls = self.schema.has_nullable_columns();
586 let has_variable = self.schema.has_variable_columns();
587
588 let mut flags = TbpFlags(0);
589 if has_nulls {
590 flags.0 |= TbpFlags::HAS_NULLS;
591 }
592 if has_variable {
593 flags.0 |= TbpFlags::HAS_ROW_INDEX;
594 }
595
596 let null_bitmap_offset = if has_nulls { TBP_HEADER_SIZE as u32 } else { 0 };
598 let null_bitmap_size = if has_nulls {
599 NullBitmap::required_size(self.row_count, self.schema.columns.len())
600 } else {
601 0
602 };
603
604 let row_index_offset = if has_variable {
605 (TBP_HEADER_SIZE + null_bitmap_size) as u32
606 } else {
607 0
608 };
609 let row_index_size = if has_variable {
610 self.row_count * 4
611 } else {
612 0
613 };
614
615 let data_offset = TBP_HEADER_SIZE + null_bitmap_size + row_index_size;
616
617 let header = TbpHeader {
618 magic: TBP_MAGIC,
619 version: TBP_VERSION,
620 flags,
621 schema_id: self.schema.schema_id,
622 row_count: self.row_count as u32,
623 column_count: self.schema.columns.len() as u16,
624 reserved: 0,
625 null_bitmap_offset,
626 row_index_offset,
627 };
628
629 let total_size = data_offset + self.data.len();
630 let mut buffer = Vec::with_capacity(total_size);
631
632 header.write(&mut buffer).unwrap();
634
635 if has_nulls {
637 let required = NullBitmap::required_size(self.row_count, self.schema.columns.len());
638 buffer.extend_from_slice(&self.null_bitmap.as_bytes()[..required]);
639 }
640
641 if has_variable {
643 for offset in &self.row_index {
644 buffer.write_u32::<LittleEndian>(*offset + data_offset as u32).unwrap();
645 }
646 }
647
648 buffer.extend_from_slice(&self.data);
650
651 buffer
652 }
653}
654
655pub struct TbpRowWriter<'a> {
657 writer: &'a mut TbpWriter,
658 col_idx: usize,
659}
660
661impl<'a> TbpRowWriter<'a> {
662 pub fn write_null(mut self) -> Self {
664 self.writer.set_null(self.writer.row_count, self.col_idx);
665 self.col_idx += 1;
666 self
667 }
668
669 pub fn write_bool(mut self, value: bool) -> Self {
671 self.writer.data.push(if value { 1 } else { 0 });
672 self.col_idx += 1;
673 self
674 }
675
676 pub fn write_i64(mut self, value: i64) -> Self {
678 self.writer.data.extend_from_slice(&value.to_le_bytes());
679 self.col_idx += 1;
680 self
681 }
682
683 pub fn write_u64(mut self, value: u64) -> Self {
685 self.writer.data.extend_from_slice(&value.to_le_bytes());
686 self.col_idx += 1;
687 self
688 }
689
690 pub fn write_f64(mut self, value: f64) -> Self {
692 self.writer.data.extend_from_slice(&value.to_le_bytes());
693 self.col_idx += 1;
694 self
695 }
696
697 pub fn write_i32(mut self, value: i32) -> Self {
699 self.writer.data.extend_from_slice(&value.to_le_bytes());
700 self.col_idx += 1;
701 self
702 }
703
704 pub fn write_f32(mut self, value: f32) -> Self {
706 self.writer.data.extend_from_slice(&value.to_le_bytes());
707 self.col_idx += 1;
708 self
709 }
710
711 pub fn write_string(mut self, value: &str) -> Self {
713 let bytes = value.as_bytes();
714 self.writer.data.write_u32::<LittleEndian>(bytes.len() as u32).unwrap();
715 self.writer.data.extend_from_slice(bytes);
716 self.col_idx += 1;
717 self
718 }
719
720 pub fn write_binary(mut self, value: &[u8]) -> Self {
722 self.writer.data.write_u32::<LittleEndian>(value.len() as u32).unwrap();
723 self.writer.data.extend_from_slice(value);
724 self.col_idx += 1;
725 self
726 }
727
728 pub fn finish(self) {
730 self.writer.row_count += 1;
731 }
732}
733
734pub struct TbpReader<'a> {
736 data: &'a [u8],
737 header: TbpHeader,
738 schema: &'a TbpSchema,
739}
740
741impl<'a> TbpReader<'a> {
742 pub fn new(data: &'a [u8], schema: &'a TbpSchema) -> io::Result<Self> {
744 let header = TbpHeader::read(data)?;
745
746 if header.schema_id != schema.schema_id {
747 return Err(io::Error::new(
748 io::ErrorKind::InvalidData,
749 "Schema ID mismatch",
750 ));
751 }
752
753 Ok(Self {
754 data,
755 header,
756 schema,
757 })
758 }
759
760 pub fn row_count(&self) -> usize {
762 self.header.row_count as usize
763 }
764
765 pub fn get_row(&self, row: usize) -> Option<RowView<'_>> {
767 if row >= self.row_count() {
768 return None;
769 }
770
771 let row_offset = if self.header.flags.has_row_index() {
773 let idx_offset = self.header.row_index_offset as usize + row * 4;
774 if idx_offset + 4 > self.data.len() {
775 return None;
776 }
777 let bytes: [u8; 4] = self.data[idx_offset..idx_offset + 4].try_into().ok()?;
778 u32::from_le_bytes(bytes) as usize
779 } else {
780 let row_size = self.schema.fixed_row_size()?;
782 let null_bitmap_size = if self.header.flags.has_nulls() {
783 NullBitmap::required_size(self.row_count(), self.schema.columns.len())
784 } else {
785 0
786 };
787 TBP_HEADER_SIZE + null_bitmap_size + row * row_size
788 };
789
790 let row_data = &self.data[row_offset..];
791
792 Some(RowView::new(self.schema, row_data, None, row))
794 }
795
796 pub fn iter(&'a self) -> impl Iterator<Item = RowView<'a>> {
798 (0..self.row_count()).filter_map(move |i| self.get_row(i))
799 }
800}
801
#[cfg(test)]
mod tests {
    use super::*;

    /// Every header field must survive a write/read cycle, not just a subset.
    #[test]
    fn test_header_roundtrip() {
        let header = TbpHeader {
            magic: TBP_MAGIC,
            version: TBP_VERSION,
            flags: TbpFlags(TbpFlags::HAS_NULLS | TbpFlags::HAS_ROW_INDEX),
            schema_id: 12345678,
            row_count: 100,
            column_count: 5,
            reserved: 0,
            null_bitmap_offset: 32,
            row_index_offset: 48,
        };

        let mut buffer = Vec::new();
        header.write(&mut buffer).unwrap();
        assert_eq!(buffer.len(), TBP_HEADER_SIZE);

        let parsed = TbpHeader::read(&buffer).unwrap();
        assert_eq!(parsed.magic, TBP_MAGIC);
        assert_eq!(parsed.version, TBP_VERSION);
        assert_eq!(
            parsed.flags.0,
            TbpFlags::HAS_NULLS | TbpFlags::HAS_ROW_INDEX
        );
        assert!(parsed.flags.has_nulls());
        assert!(parsed.flags.has_row_index());
        assert!(!parsed.flags.is_compressed());
        assert!(!parsed.flags.has_embedded_schema());
        assert_eq!(parsed.schema_id, 12345678);
        assert_eq!(parsed.row_count, 100);
        assert_eq!(parsed.column_count, 5);
        assert_eq!(parsed.reserved, 0);
        assert_eq!(parsed.null_bitmap_offset, 32);
        assert_eq!(parsed.row_index_offset, 48);
    }

    /// Short buffers and wrong magic numbers are reported as errors.
    #[test]
    fn test_header_rejects_bad_input() {
        let too_short = TbpHeader::read(&[0u8; TBP_HEADER_SIZE - 1]).unwrap_err();
        assert_eq!(too_short.kind(), std::io::ErrorKind::UnexpectedEof);

        let bad_magic = TbpHeader::read(&[0u8; TBP_HEADER_SIZE]).unwrap_err();
        assert_eq!(bad_magic.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_null_bitmap() {
        assert_eq!(NullBitmap::required_size(10, 5), 7);

        let mut bitmap = NullBitmapMut::new(10, 5);
        bitmap.set_null(0, 0);
        bitmap.set_null(5, 3);
        bitmap.set_null(9, 4);

        let data = bitmap.as_bytes();
        assert_eq!(data.len(), 7);
        let reader = NullBitmap::new(data, 5);

        assert!(reader.is_null(0, 0));
        assert!(!reader.is_null(0, 1));
        assert!(reader.is_null(5, 3));
        assert!(reader.is_null(9, 4));
        assert!(!reader.is_null(9, 3));
    }

    /// Writes ten fixed-width rows and reads every one of them back.
    #[test]
    fn test_writer_reader_roundtrip() {
        let schema = TbpSchema::new(
            "test_table",
            vec![
                TbpColumn::new("id", TbpColumnType::Int64).not_null(),
                TbpColumn::new("value", TbpColumnType::Float64),
            ],
        );

        let mut writer = TbpWriter::new(schema.clone(), 100);

        for i in 0..10 {
            writer
                .start_row()
                .write_i64(i)
                .write_f64(i as f64 * 1.5)
                .finish();
        }

        let data = writer.finish();

        let reader = TbpReader::new(&data, &schema).unwrap();
        assert_eq!(reader.row_count(), 10);

        for i in 0..10usize {
            let row = reader.get_row(i).unwrap();
            assert_eq!(row.read_i64(0), Some(i as i64));
            assert_eq!(row.read_f64(1), Some(i as f64 * 1.5));
        }

        assert!(reader.get_row(10).is_none());
        assert_eq!(reader.iter().count(), 10);
    }
}