1use std::io::{self, Write};
62
63use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64
/// File magic: the ASCII bytes `TOON` interpreted as a big-endian `u32` (`0x544F_4F4E`).
///
/// Headers store it little-endian, so the first four bytes of a file are `NOOT`.
pub const TBP_MAGIC: u32 = u32::from_be_bytes(*b"TOON");

/// Format version that [`TbpWriter`] stamps into every header.
pub const TBP_VERSION: u16 = 1;

/// Serialized header size in bytes: magic (4) + version (2) + flags (2) + schema_id (8)
/// + row_count (4) + column_count (2) + reserved (2) + null_bitmap_offset (4)
/// + row_index_offset (4).
pub const TBP_HEADER_SIZE: usize = 4 + 2 + 2 + 8 + 4 + 2 + 2 + 4 + 4;
73
/// Bit set stored in the header's `flags` field.
#[derive(Debug, Clone, Copy, Default)]
pub struct TbpFlags(pub u16);

impl TbpFlags {
    /// A null bitmap follows the header. `TbpWriter` sets this when the schema has any
    /// nullable column.
    pub const HAS_NULLS: u16 = 0b0001;
    /// A row index of per-row offsets is present. `TbpWriter` sets this when the schema has
    /// a variable-width column.
    pub const HAS_ROW_INDEX: u16 = 0b0010;
    /// Compression flag. NOTE(review): not set by `TbpWriter` and not acted on by the reader
    /// in this file.
    pub const COMPRESSED: u16 = 0b0100;
    /// Embedded-schema flag. NOTE(review): not set by `TbpWriter` and not acted on by the
    /// reader in this file.
    pub const EMBEDDED_SCHEMA: u16 = 0b1000;

    /// Returns `true` when every bit in `mask` is set.
    fn is_set(&self, mask: u16) -> bool {
        self.0 & mask == mask
    }

    /// Whether [`TbpFlags::HAS_NULLS`] is set.
    pub fn has_nulls(&self) -> bool {
        self.is_set(Self::HAS_NULLS)
    }

    /// Whether [`TbpFlags::HAS_ROW_INDEX`] is set.
    pub fn has_row_index(&self) -> bool {
        self.is_set(Self::HAS_ROW_INDEX)
    }

    /// Whether [`TbpFlags::COMPRESSED`] is set.
    pub fn is_compressed(&self) -> bool {
        self.is_set(Self::COMPRESSED)
    }

    /// Whether [`TbpFlags::EMBEDDED_SCHEMA`] is set.
    pub fn has_embedded_schema(&self) -> bool {
        self.is_set(Self::EMBEDDED_SCHEMA)
    }
}
104
/// Physical type tag of a column. Each discriminant is the byte accepted by
/// [`TbpColumnType::from_byte`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TbpColumnType {
    Null = 0,
    Bool = 1,
    Int8 = 2,
    UInt8 = 3,
    Int16 = 4,
    UInt16 = 5,
    Int32 = 6,
    UInt32 = 7,
    Int64 = 8,
    UInt64 = 9,
    Float32 = 10,
    Float64 = 11,
    String = 12,
    Binary = 13,
    Timestamp = 14,
    FixedBinary = 15,
}

impl TbpColumnType {
    /// Every variant, ordered by discriminant, so `BY_TAG[t as usize] == t`.
    const BY_TAG: [TbpColumnType; 16] = [
        TbpColumnType::Null,
        TbpColumnType::Bool,
        TbpColumnType::Int8,
        TbpColumnType::UInt8,
        TbpColumnType::Int16,
        TbpColumnType::UInt16,
        TbpColumnType::Int32,
        TbpColumnType::UInt32,
        TbpColumnType::Int64,
        TbpColumnType::UInt64,
        TbpColumnType::Float32,
        TbpColumnType::Float64,
        TbpColumnType::String,
        TbpColumnType::Binary,
        TbpColumnType::Timestamp,
        TbpColumnType::FixedBinary,
    ];

    /// Byte width implied by the type alone, or `None` when the type does not determine it.
    ///
    /// `String` and `Binary` values carry their own length. `FixedBinary` gets its width
    /// from `TbpColumn::fixed_size`, so it also reports `None` here.
    pub fn fixed_size(&self) -> Option<usize> {
        let width = match self {
            Self::String | Self::Binary | Self::FixedBinary => return None,
            Self::Null => 0,
            Self::Bool | Self::Int8 | Self::UInt8 => 1,
            Self::Int16 | Self::UInt16 => 2,
            Self::Int32 | Self::UInt32 | Self::Float32 => 4,
            Self::Int64 | Self::UInt64 | Self::Float64 | Self::Timestamp => 8,
        };
        Some(width)
    }

    /// `true` when [`TbpColumnType::fixed_size`] is `None`. This includes `FixedBinary`.
    pub fn is_variable(&self) -> bool {
        self.fixed_size().is_none()
    }

    /// Decodes a type tag byte, returning `None` for bytes above 15.
    pub fn from_byte(b: u8) -> Option<Self> {
        Self::BY_TAG.get(usize::from(b)).copied()
    }
}
185
186#[derive(Debug, Clone)]
188pub struct TbpColumn {
189 pub name: String,
191 pub col_type: TbpColumnType,
193 pub fixed_size: Option<u16>,
195 pub nullable: bool,
197}
198
199impl TbpColumn {
200 pub fn new(name: impl Into<String>, col_type: TbpColumnType) -> Self {
201 Self {
202 name: name.into(),
203 col_type,
204 fixed_size: None,
205 nullable: true,
206 }
207 }
208
209 pub fn with_fixed_size(mut self, size: u16) -> Self {
210 self.fixed_size = Some(size);
211 self
212 }
213
214 pub fn not_null(mut self) -> Self {
215 self.nullable = false;
216 self
217 }
218}
219
220#[derive(Debug, Clone)]
222pub struct TbpSchema {
223 pub name: String,
225 pub columns: Vec<TbpColumn>,
227 pub schema_id: u64,
229}
230
231impl TbpSchema {
232 pub fn new(name: impl Into<String>, columns: Vec<TbpColumn>) -> Self {
233 let name = name.into();
234 let schema_id = Self::compute_schema_id(&name, &columns);
235 Self {
236 name,
237 columns,
238 schema_id,
239 }
240 }
241
242 fn compute_schema_id(name: &str, columns: &[TbpColumn]) -> u64 {
244 use std::hash::{Hash, Hasher};
245 use std::collections::hash_map::DefaultHasher;
246
247 let mut hasher = DefaultHasher::new();
248 name.hash(&mut hasher);
249 for col in columns {
250 col.name.hash(&mut hasher);
251 (col.col_type as u8).hash(&mut hasher);
252 col.fixed_size.hash(&mut hasher);
253 col.nullable.hash(&mut hasher);
254 }
255 hasher.finish()
256 }
257
258 pub fn has_variable_columns(&self) -> bool {
260 self.columns.iter().any(|c| c.col_type.is_variable())
261 }
262
263 pub fn has_nullable_columns(&self) -> bool {
265 self.columns.iter().any(|c| c.nullable)
266 }
267
268 pub fn fixed_row_size(&self) -> Option<usize> {
270 if self.has_variable_columns() {
271 return None;
272 }
273
274 let mut size = 0;
275 for col in &self.columns {
276 match col.col_type {
277 TbpColumnType::FixedBinary => {
278 size += col.fixed_size.unwrap_or(0) as usize;
279 }
280 _ => {
281 size += col.col_type.fixed_size()?;
282 }
283 }
284 }
285 Some(size)
286 }
287}
288
289#[derive(Debug, Clone)]
291pub struct TbpHeader {
292 pub magic: u32,
294 pub version: u16,
296 pub flags: TbpFlags,
298 pub schema_id: u64,
300 pub row_count: u32,
302 pub column_count: u16,
304 pub reserved: u16,
306 pub null_bitmap_offset: u32,
308 pub row_index_offset: u32,
310}
311
312impl TbpHeader {
313 pub fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
315 w.write_u32::<LittleEndian>(self.magic)?;
316 w.write_u16::<LittleEndian>(self.version)?;
317 w.write_u16::<LittleEndian>(self.flags.0)?;
318 w.write_u64::<LittleEndian>(self.schema_id)?;
319 w.write_u32::<LittleEndian>(self.row_count)?;
320 w.write_u16::<LittleEndian>(self.column_count)?;
321 w.write_u16::<LittleEndian>(self.reserved)?;
322 w.write_u32::<LittleEndian>(self.null_bitmap_offset)?;
323 w.write_u32::<LittleEndian>(self.row_index_offset)?;
324 Ok(())
325 }
326
327 pub fn read(data: &[u8]) -> io::Result<Self> {
329 if data.len() < TBP_HEADER_SIZE {
330 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Header too short"));
331 }
332
333 let mut cursor = std::io::Cursor::new(data);
334 let magic = cursor.read_u32::<LittleEndian>()?;
335 if magic != TBP_MAGIC {
336 return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid TBP magic"));
337 }
338
339 let header = Self {
340 magic,
341 version: cursor.read_u16::<LittleEndian>()?,
342 flags: TbpFlags(cursor.read_u16::<LittleEndian>()?),
343 schema_id: cursor.read_u64::<LittleEndian>()?,
344 row_count: cursor.read_u32::<LittleEndian>()?,
345 column_count: cursor.read_u16::<LittleEndian>()?,
346 reserved: cursor.read_u16::<LittleEndian>()?,
347 null_bitmap_offset: cursor.read_u32::<LittleEndian>()?,
348 row_index_offset: cursor.read_u32::<LittleEndian>()?,
349 };
350
351 let data_len = data.len() as u64;
352
353 if data_len > TBP_HEADER_SIZE as u64 {
358 if header.null_bitmap_offset != 0 && (header.null_bitmap_offset as u64) >= data_len {
359 return Err(io::Error::new(
360 io::ErrorKind::InvalidData,
361 format!(
362 "null_bitmap_offset ({}) exceeds data length ({})",
363 header.null_bitmap_offset, data_len
364 ),
365 ));
366 }
367 if header.row_index_offset != 0 && (header.row_index_offset as u64) >= data_len {
368 return Err(io::Error::new(
369 io::ErrorKind::InvalidData,
370 format!(
371 "row_index_offset ({}) exceeds data length ({})",
372 header.row_index_offset, data_len
373 ),
374 ));
375 }
376 }
377
378 Ok(header)
379 }
380}
381
/// Read-only view over a packed null bitmap.
///
/// There is one bit per (row, column) cell, in row-major order, least-significant bit
/// first within each byte. A set bit means the cell is null.
#[derive(Debug, Clone, Copy)]
pub struct NullBitmap<'a> {
    data: &'a [u8],
    columns: usize,
}

impl<'a> NullBitmap<'a> {
    /// Wraps `data` as a bitmap whose rows are `columns` bits wide.
    pub fn new(data: &'a [u8], columns: usize) -> Self {
        NullBitmap { columns, data }
    }

    /// Returns whether cell (`row`, `col`) is marked null.
    ///
    /// Cells whose bit lies beyond the end of the buffer read as not null.
    #[inline]
    pub fn is_null(&self, row: usize, col: usize) -> bool {
        let bit = row * self.columns + col;
        match self.data.get(bit / 8) {
            Some(&byte) => (byte >> (bit % 8)) & 1 == 1,
            None => false,
        }
    }

    /// Number of bytes needed to hold `rows * cols` bits.
    pub fn required_size(rows: usize, cols: usize) -> usize {
        let bits = rows * cols;
        (bits + 7) / 8
    }
}
413
414pub struct NullBitmapMut {
416 data: Vec<u8>,
417 columns: usize,
418}
419
420impl NullBitmapMut {
421 pub fn new(rows: usize, columns: usize) -> Self {
422 let size = NullBitmap::required_size(rows, columns);
423 Self {
424 data: vec![0; size],
425 columns,
426 }
427 }
428
429 #[inline]
431 pub fn set_null(&mut self, row: usize, col: usize) {
432 let bit_idx = row * self.columns + col;
433 let byte_idx = bit_idx / 8;
434 let bit_pos = bit_idx % 8;
435
436 if byte_idx < self.data.len() {
437 self.data[byte_idx] |= 1 << bit_pos;
438 }
439 }
440
441 pub fn as_bytes(&self) -> &[u8] {
443 &self.data
444 }
445
446 pub fn into_bytes(self) -> Vec<u8> {
448 self.data
449 }
450}
451
452#[derive(Debug, Clone)]
454pub struct RowView<'a> {
455 schema: &'a TbpSchema,
457 data: &'a [u8],
459 null_bitmap: Option<&'a NullBitmap<'a>>,
461 row_idx: usize,
463}
464
465impl<'a> RowView<'a> {
466 pub fn new(
467 schema: &'a TbpSchema,
468 data: &'a [u8],
469 null_bitmap: Option<&'a NullBitmap<'a>>,
470 row_idx: usize,
471 ) -> Self {
472 Self {
473 schema,
474 data,
475 null_bitmap,
476 row_idx,
477 }
478 }
479
480 #[inline]
482 pub fn is_null(&self, col: usize) -> bool {
483 self.null_bitmap
484 .map(|b| b.is_null(self.row_idx, col))
485 .unwrap_or(false)
486 }
487
488 fn column_offset(&self, col: usize) -> usize {
490 let mut offset = 0;
491 for c in &self.schema.columns[..col] {
492 offset += match c.col_type {
493 TbpColumnType::FixedBinary => c.fixed_size.unwrap_or(0) as usize,
494 _ => c.col_type.fixed_size().unwrap_or(0),
495 };
496 }
497 offset
498 }
499
500 pub fn read_bool(&self, col: usize) -> Option<bool> {
502 if self.is_null(col) {
503 return None;
504 }
505 let offset = self.column_offset(col);
506 Some(self.data.get(offset).copied().unwrap_or(0) != 0)
507 }
508
509 pub fn read_i64(&self, col: usize) -> Option<i64> {
511 if self.is_null(col) {
512 return None;
513 }
514 let offset = self.column_offset(col);
515 if offset + 8 > self.data.len() {
516 return None;
517 }
518 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
519 Some(i64::from_le_bytes(bytes))
520 }
521
522 pub fn read_u64(&self, col: usize) -> Option<u64> {
524 if self.is_null(col) {
525 return None;
526 }
527 let offset = self.column_offset(col);
528 if offset + 8 > self.data.len() {
529 return None;
530 }
531 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
532 Some(u64::from_le_bytes(bytes))
533 }
534
535 pub fn read_f64(&self, col: usize) -> Option<f64> {
537 if self.is_null(col) {
538 return None;
539 }
540 let offset = self.column_offset(col);
541 if offset + 8 > self.data.len() {
542 return None;
543 }
544 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
545 Some(f64::from_le_bytes(bytes))
546 }
547
548 pub fn read_i32(&self, col: usize) -> Option<i32> {
550 if self.is_null(col) {
551 return None;
552 }
553 let offset = self.column_offset(col);
554 if offset + 4 > self.data.len() {
555 return None;
556 }
557 let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
558 Some(i32::from_le_bytes(bytes))
559 }
560
561 pub fn read_f32(&self, col: usize) -> Option<f32> {
563 if self.is_null(col) {
564 return None;
565 }
566 let offset = self.column_offset(col);
567 if offset + 4 > self.data.len() {
568 return None;
569 }
570 let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
571 Some(f32::from_le_bytes(bytes))
572 }
573
574 pub fn raw_data(&self) -> &[u8] {
576 self.data
577 }
578}
579
580pub struct TbpWriter {
582 schema: TbpSchema,
583 null_bitmap: NullBitmapMut,
584 row_index: Vec<u32>,
585 data: Vec<u8>,
586 row_count: usize,
587}
588
589impl TbpWriter {
590 pub fn new(schema: TbpSchema, estimated_rows: usize) -> Self {
591 Self {
592 null_bitmap: NullBitmapMut::new(estimated_rows, schema.columns.len()),
593 row_index: Vec::with_capacity(estimated_rows),
594 data: Vec::with_capacity(estimated_rows * schema.fixed_row_size().unwrap_or(64)),
595 row_count: 0,
596 schema,
597 }
598 }
599
600 pub fn start_row(&mut self) -> TbpRowWriter<'_> {
602 let offset = self.data.len() as u32;
603 self.row_index.push(offset);
604 TbpRowWriter {
605 writer: self,
606 col_idx: 0,
607 }
608 }
609
610 fn set_null(&mut self, row: usize, col: usize) {
612 self.null_bitmap.set_null(row, col);
613 }
614
615 pub fn finish(self) -> Vec<u8> {
617 let has_nulls = self.schema.has_nullable_columns();
618 let has_variable = self.schema.has_variable_columns();
619
620 let mut flags = TbpFlags(0);
621 if has_nulls {
622 flags.0 |= TbpFlags::HAS_NULLS;
623 }
624 if has_variable {
625 flags.0 |= TbpFlags::HAS_ROW_INDEX;
626 }
627
628 let null_bitmap_offset = if has_nulls { TBP_HEADER_SIZE as u32 } else { 0 };
630 let null_bitmap_size = if has_nulls {
631 NullBitmap::required_size(self.row_count, self.schema.columns.len())
632 } else {
633 0
634 };
635
636 let row_index_offset = if has_variable {
637 (TBP_HEADER_SIZE + null_bitmap_size) as u32
638 } else {
639 0
640 };
641 let row_index_size = if has_variable {
642 self.row_count * 4
643 } else {
644 0
645 };
646
647 let data_offset = TBP_HEADER_SIZE + null_bitmap_size + row_index_size;
648
649 let header = TbpHeader {
650 magic: TBP_MAGIC,
651 version: TBP_VERSION,
652 flags,
653 schema_id: self.schema.schema_id,
654 row_count: self.row_count as u32,
655 column_count: self.schema.columns.len() as u16,
656 reserved: 0,
657 null_bitmap_offset,
658 row_index_offset,
659 };
660
661 let total_size = data_offset + self.data.len();
662 let mut buffer = Vec::with_capacity(total_size);
663
664 header.write(&mut buffer).unwrap();
666
667 if has_nulls {
669 let required = NullBitmap::required_size(self.row_count, self.schema.columns.len());
670 buffer.extend_from_slice(&self.null_bitmap.as_bytes()[..required]);
671 }
672
673 if has_variable {
675 for offset in &self.row_index {
676 buffer.write_u32::<LittleEndian>(*offset + data_offset as u32).unwrap();
677 }
678 }
679
680 buffer.extend_from_slice(&self.data);
682
683 buffer
684 }
685}
686
687pub struct TbpRowWriter<'a> {
689 writer: &'a mut TbpWriter,
690 col_idx: usize,
691}
692
693impl<'a> TbpRowWriter<'a> {
694 pub fn write_null(mut self) -> Self {
696 self.writer.set_null(self.writer.row_count, self.col_idx);
697 self.col_idx += 1;
698 self
699 }
700
701 pub fn write_bool(mut self, value: bool) -> Self {
703 self.writer.data.push(if value { 1 } else { 0 });
704 self.col_idx += 1;
705 self
706 }
707
708 pub fn write_i64(mut self, value: i64) -> Self {
710 self.writer.data.extend_from_slice(&value.to_le_bytes());
711 self.col_idx += 1;
712 self
713 }
714
715 pub fn write_u64(mut self, value: u64) -> Self {
717 self.writer.data.extend_from_slice(&value.to_le_bytes());
718 self.col_idx += 1;
719 self
720 }
721
722 pub fn write_f64(mut self, value: f64) -> Self {
724 self.writer.data.extend_from_slice(&value.to_le_bytes());
725 self.col_idx += 1;
726 self
727 }
728
729 pub fn write_i32(mut self, value: i32) -> Self {
731 self.writer.data.extend_from_slice(&value.to_le_bytes());
732 self.col_idx += 1;
733 self
734 }
735
736 pub fn write_f32(mut self, value: f32) -> Self {
738 self.writer.data.extend_from_slice(&value.to_le_bytes());
739 self.col_idx += 1;
740 self
741 }
742
743 pub fn write_string(mut self, value: &str) -> Self {
745 let bytes = value.as_bytes();
746 self.writer.data.write_u32::<LittleEndian>(bytes.len() as u32).unwrap();
747 self.writer.data.extend_from_slice(bytes);
748 self.col_idx += 1;
749 self
750 }
751
752 pub fn write_binary(mut self, value: &[u8]) -> Self {
754 self.writer.data.write_u32::<LittleEndian>(value.len() as u32).unwrap();
755 self.writer.data.extend_from_slice(value);
756 self.col_idx += 1;
757 self
758 }
759
760 pub fn finish(self) {
762 self.writer.row_count += 1;
763 }
764}
765
766pub struct TbpReader<'a> {
768 data: &'a [u8],
769 header: TbpHeader,
770 schema: &'a TbpSchema,
771}
772
773impl<'a> TbpReader<'a> {
774 pub fn new(data: &'a [u8], schema: &'a TbpSchema) -> io::Result<Self> {
776 let header = TbpHeader::read(data)?;
777
778 if header.schema_id != schema.schema_id {
779 return Err(io::Error::new(
780 io::ErrorKind::InvalidData,
781 "Schema ID mismatch",
782 ));
783 }
784
785 Ok(Self {
786 data,
787 header,
788 schema,
789 })
790 }
791
792 pub fn row_count(&self) -> usize {
794 self.header.row_count as usize
795 }
796
797 pub fn get_row(&self, row: usize) -> Option<RowView<'_>> {
799 if row >= self.row_count() {
800 return None;
801 }
802
803 let row_offset = if self.header.flags.has_row_index() {
805 let idx_offset = self.header.row_index_offset as usize + row * 4;
806 if idx_offset + 4 > self.data.len() {
807 return None;
808 }
809 let bytes: [u8; 4] = self.data[idx_offset..idx_offset + 4].try_into().ok()?;
810 u32::from_le_bytes(bytes) as usize
811 } else {
812 let row_size = self.schema.fixed_row_size()?;
814 let null_bitmap_size = if self.header.flags.has_nulls() {
815 NullBitmap::required_size(self.row_count(), self.schema.columns.len())
816 } else {
817 0
818 };
819 TBP_HEADER_SIZE + null_bitmap_size + row * row_size
820 };
821
822 let row_data = &self.data[row_offset..];
823
824 Some(RowView::new(self.schema, row_data, None, row))
826 }
827
828 pub fn iter(&'a self) -> impl Iterator<Item = RowView<'a>> {
830 (0..self.row_count()).filter_map(move |i| self.get_row(i))
831 }
832}
833
#[cfg(test)]
mod tests {
    use super::*;

    /// A header survives a write/read round trip.
    #[test]
    fn test_header_roundtrip() {
        let flags = TbpFlags(TbpFlags::HAS_NULLS | TbpFlags::HAS_ROW_INDEX);
        let original = TbpHeader {
            magic: TBP_MAGIC,
            version: TBP_VERSION,
            flags,
            schema_id: 12_345_678,
            row_count: 100,
            column_count: 5,
            reserved: 0,
            null_bitmap_offset: 32,
            row_index_offset: 48,
        };

        let mut encoded = Vec::new();
        original.write(&mut encoded).unwrap();
        assert_eq!(encoded.len(), TBP_HEADER_SIZE);

        // The buffer is exactly header-sized, so `read` skips offset validation.
        let decoded = TbpHeader::read(&encoded).unwrap();
        assert_eq!(decoded.magic, TBP_MAGIC);
        assert_eq!(decoded.version, TBP_VERSION);
        assert_eq!(decoded.row_count, 100);
        assert_eq!(decoded.column_count, 5);
    }

    /// Bits set through `NullBitmapMut` are visible through `NullBitmap`.
    #[test]
    fn test_null_bitmap() {
        let mut writer = NullBitmapMut::new(10, 5);
        for &(row, col) in &[(0, 0), (5, 3), (9, 4)] {
            writer.set_null(row, col);
        }

        let reader = NullBitmap::new(writer.as_bytes(), 5);
        let expectations = [
            (0, 0, true),
            (0, 1, false),
            (5, 3, true),
            (9, 4, true),
            (9, 3, false),
        ];
        for &(row, col, expected) in &expectations {
            assert_eq!(reader.is_null(row, col), expected, "cell ({}, {})", row, col);
        }
    }

    /// Fixed-width rows written by `TbpWriter` read back through `TbpReader`.
    #[test]
    fn test_writer_reader_roundtrip() {
        let columns = vec![
            TbpColumn::new("id", TbpColumnType::Int64).not_null(),
            TbpColumn::new("value", TbpColumnType::Float64),
        ];
        let schema = TbpSchema::new("test_table", columns);

        let mut writer = TbpWriter::new(schema.clone(), 100);
        (0..10).for_each(|i| {
            writer
                .start_row()
                .write_i64(i)
                .write_f64(i as f64 * 1.5)
                .finish();
        });

        let encoded = writer.finish();
        let reader = TbpReader::new(&encoded, &schema).unwrap();
        assert_eq!(reader.row_count(), 10);

        let fifth = reader.get_row(5).unwrap();
        assert_eq!(fifth.read_i64(0), Some(5));
        assert_eq!(fifth.read_f64(1), Some(7.5));
    }
}
911}