use std::io::{self, Write};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
/// File magic: the ASCII bytes `TOON` read as a big-endian `u32` (0x544F_4F4E).
/// The header is serialized little-endian, so the first four bytes on disk are `NOOT`.
pub const TBP_MAGIC: u32 = u32::from_be_bytes(*b"TOON");
/// Format version stamped into every header produced by `TbpWriter`.
pub const TBP_VERSION: u16 = 1;
/// Serialized header length in bytes: magic(4) + version(2) + flags(2) + schema_id(8)
/// + row_count(4) + column_count(2) + reserved(2) + null_bitmap_offset(4) + row_index_offset(4).
pub const TBP_HEADER_SIZE: usize = 4 + 2 + 2 + 8 + 4 + 2 + 2 + 4 + 4;
/// Header flag word.
///
/// Individual bits are the associated `u16` constants. They can be OR-ed together
/// into the public field, e.g. `TbpFlags(TbpFlags::HAS_NULLS | TbpFlags::HAS_ROW_INDEX)`.
#[derive(Debug, Clone, Copy, Default)]
pub struct TbpFlags(pub u16);

impl TbpFlags {
    /// Set by `TbpWriter::finish` when the schema has nullable columns (a null bitmap follows the header).
    pub const HAS_NULLS: u16 = 1 << 0;
    /// Set by `TbpWriter::finish` when the schema has variable-width columns (a row index is present).
    pub const HAS_ROW_INDEX: u16 = 1 << 1;
    /// Compression bit; `TbpWriter::finish` never sets it.
    pub const COMPRESSED: u16 = 1 << 2;
    /// Embedded-schema bit; `TbpWriter::finish` never sets it.
    pub const EMBEDDED_SCHEMA: u16 = 1 << 3;

    /// Returns whether the single flag `bit` is set in this word.
    #[inline]
    fn is_set(self, bit: u16) -> bool {
        (self.0 & bit) != 0
    }

    /// Whether [`Self::HAS_NULLS`] is set.
    pub fn has_nulls(&self) -> bool {
        self.is_set(Self::HAS_NULLS)
    }

    /// Whether [`Self::HAS_ROW_INDEX`] is set.
    pub fn has_row_index(&self) -> bool {
        self.is_set(Self::HAS_ROW_INDEX)
    }

    /// Whether [`Self::COMPRESSED`] is set.
    pub fn is_compressed(&self) -> bool {
        self.is_set(Self::COMPRESSED)
    }

    /// Whether [`Self::EMBEDDED_SCHEMA`] is set.
    pub fn has_embedded_schema(&self) -> bool {
        self.is_set(Self::EMBEDDED_SCHEMA)
    }
}
/// Physical storage type of a column.
///
/// The `repr(u8)` discriminant is the type's one-byte tag, decoded by
/// [`TbpColumnType::from_byte`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TbpColumnType {
    Null = 0,
    Bool = 1,
    Int8 = 2,
    UInt8 = 3,
    Int16 = 4,
    UInt16 = 5,
    Int32 = 6,
    UInt32 = 7,
    Int64 = 8,
    UInt64 = 9,
    Float32 = 10,
    Float64 = 11,
    String = 12,
    Binary = 13,
    Timestamp = 14,
    FixedBinary = 15,
}

impl TbpColumnType {
    /// Every variant ordered by discriminant, so a tag byte indexes its own variant.
    const BY_TAG: [TbpColumnType; 16] = [
        TbpColumnType::Null,
        TbpColumnType::Bool,
        TbpColumnType::Int8,
        TbpColumnType::UInt8,
        TbpColumnType::Int16,
        TbpColumnType::UInt16,
        TbpColumnType::Int32,
        TbpColumnType::UInt32,
        TbpColumnType::Int64,
        TbpColumnType::UInt64,
        TbpColumnType::Float32,
        TbpColumnType::Float64,
        TbpColumnType::String,
        TbpColumnType::Binary,
        TbpColumnType::Timestamp,
        TbpColumnType::FixedBinary,
    ];

    /// Width in bytes of one value, or `None` when the type alone does not determine it.
    ///
    /// `String` and `Binary` are variable-length. `FixedBinary` also yields `None`,
    /// because its width is carried by the column (`TbpColumn::fixed_size`), not by the type.
    pub fn fixed_size(&self) -> Option<usize> {
        let width = match self {
            Self::Null => 0,
            Self::Bool | Self::Int8 | Self::UInt8 => 1,
            Self::Int16 | Self::UInt16 => 2,
            Self::Int32 | Self::UInt32 | Self::Float32 => 4,
            Self::Int64 | Self::UInt64 | Self::Float64 | Self::Timestamp => 8,
            Self::String | Self::Binary | Self::FixedBinary => return None,
        };
        Some(width)
    }

    /// `true` when [`fixed_size`](Self::fixed_size) is `None`; note this includes `FixedBinary`.
    pub fn is_variable(&self) -> bool {
        self.fixed_size().is_none()
    }

    /// Decodes a one-byte type tag; any byte above 15 yields `None`.
    pub fn from_byte(b: u8) -> Option<Self> {
        Self::BY_TAG.get(usize::from(b)).copied()
    }
}
/// Declaration of one column: its name, physical type, optional explicit width, and nullability.
#[derive(Debug, Clone)]
pub struct TbpColumn {
    pub name: String,
    pub col_type: TbpColumnType,
    /// Explicit byte width. Layout code in this module only consults it for
    /// `FixedBinary` columns; it is also part of the schema id.
    pub fixed_size: Option<u16>,
    /// Whether values may be null; `TbpColumn::new` defaults this to `true`.
    pub nullable: bool,
}

impl TbpColumn {
    /// Creates a nullable column with no explicit fixed size.
    pub fn new(name: impl Into<String>, col_type: TbpColumnType) -> Self {
        let name: String = name.into();
        Self {
            name,
            col_type,
            nullable: true,
            fixed_size: None,
        }
    }

    /// Builder: records an explicit byte width for this column.
    pub fn with_fixed_size(self, size: u16) -> Self {
        Self {
            fixed_size: Some(size),
            ..self
        }
    }

    /// Builder: marks this column as non-nullable.
    pub fn not_null(self) -> Self {
        Self {
            nullable: false,
            ..self
        }
    }
}
#[derive(Debug, Clone)]
pub struct TbpSchema {
pub name: String,
pub columns: Vec<TbpColumn>,
pub schema_id: u64,
}
impl TbpSchema {
pub fn new(name: impl Into<String>, columns: Vec<TbpColumn>) -> Self {
let name = name.into();
let schema_id = Self::compute_schema_id(&name, &columns);
Self {
name,
columns,
schema_id,
}
}
fn compute_schema_id(name: &str, columns: &[TbpColumn]) -> u64 {
use std::hash::{Hash, Hasher};
use std::collections::hash_map::DefaultHasher;
let mut hasher = DefaultHasher::new();
name.hash(&mut hasher);
for col in columns {
col.name.hash(&mut hasher);
(col.col_type as u8).hash(&mut hasher);
col.fixed_size.hash(&mut hasher);
col.nullable.hash(&mut hasher);
}
hasher.finish()
}
pub fn has_variable_columns(&self) -> bool {
self.columns.iter().any(|c| c.col_type.is_variable())
}
pub fn has_nullable_columns(&self) -> bool {
self.columns.iter().any(|c| c.nullable)
}
pub fn fixed_row_size(&self) -> Option<usize> {
if self.has_variable_columns() {
return None;
}
let mut size = 0;
for col in &self.columns {
match col.col_type {
TbpColumnType::FixedBinary => {
size += col.fixed_size.unwrap_or(0) as usize;
}
_ => {
size += col.col_type.fixed_size()?;
}
}
}
Some(size)
}
}
#[derive(Debug, Clone)]
pub struct TbpHeader {
pub magic: u32,
pub version: u16,
pub flags: TbpFlags,
pub schema_id: u64,
pub row_count: u32,
pub column_count: u16,
pub reserved: u16,
pub null_bitmap_offset: u32,
pub row_index_offset: u32,
}
impl TbpHeader {
pub fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
w.write_u32::<LittleEndian>(self.magic)?;
w.write_u16::<LittleEndian>(self.version)?;
w.write_u16::<LittleEndian>(self.flags.0)?;
w.write_u64::<LittleEndian>(self.schema_id)?;
w.write_u32::<LittleEndian>(self.row_count)?;
w.write_u16::<LittleEndian>(self.column_count)?;
w.write_u16::<LittleEndian>(self.reserved)?;
w.write_u32::<LittleEndian>(self.null_bitmap_offset)?;
w.write_u32::<LittleEndian>(self.row_index_offset)?;
Ok(())
}
pub fn read(data: &[u8]) -> io::Result<Self> {
if data.len() < TBP_HEADER_SIZE {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Header too short"));
}
let mut cursor = std::io::Cursor::new(data);
let magic = cursor.read_u32::<LittleEndian>()?;
if magic != TBP_MAGIC {
return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid TBP magic"));
}
let header = Self {
magic,
version: cursor.read_u16::<LittleEndian>()?,
flags: TbpFlags(cursor.read_u16::<LittleEndian>()?),
schema_id: cursor.read_u64::<LittleEndian>()?,
row_count: cursor.read_u32::<LittleEndian>()?,
column_count: cursor.read_u16::<LittleEndian>()?,
reserved: cursor.read_u16::<LittleEndian>()?,
null_bitmap_offset: cursor.read_u32::<LittleEndian>()?,
row_index_offset: cursor.read_u32::<LittleEndian>()?,
};
let data_len = data.len() as u64;
if data_len > TBP_HEADER_SIZE as u64 {
if header.null_bitmap_offset != 0 && (header.null_bitmap_offset as u64) >= data_len {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"null_bitmap_offset ({}) exceeds data length ({})",
header.null_bitmap_offset, data_len
),
));
}
if header.row_index_offset != 0 && (header.row_index_offset as u64) >= data_len {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"row_index_offset ({}) exceeds data length ({})",
header.row_index_offset, data_len
),
));
}
}
Ok(header)
}
}
/// Read-only view over a row-major null bitmap.
///
/// Cell `(row, col)` is bit `row * columns + col`, stored least-significant-bit
/// first within each byte. A set bit means the cell is null.
#[derive(Debug, Clone, Copy)]
pub struct NullBitmap<'a> {
    data: &'a [u8],
    columns: usize,
}

impl<'a> NullBitmap<'a> {
    /// Wraps `data` as the bitmap of a table with `columns` columns.
    pub fn new(data: &'a [u8], columns: usize) -> Self {
        Self { data, columns }
    }

    /// Maps cell `(row, col)` to `(byte index, bit mask)`.
    ///
    /// Returns `None` when `col` is not below `columns`, which would otherwise
    /// alias onto a cell of the next row, or when the bit index overflows.
    #[inline]
    fn bit_location(row: usize, col: usize, columns: usize) -> Option<(usize, u8)> {
        if col >= columns {
            return None;
        }
        let bit_idx = row.checked_mul(columns)?.checked_add(col)?;
        Some((bit_idx / 8, 1u8 << (bit_idx % 8)))
    }

    /// Returns `true` if cell `(row, col)` is marked null.
    ///
    /// Cells outside the bitmap report `false` (not null). That covers a column
    /// index not below `columns`, and a bit beyond the end of `data`.
    #[inline]
    pub fn is_null(&self, row: usize, col: usize) -> bool {
        Self::bit_location(row, col, self.columns)
            .and_then(|(byte, mask)| self.data.get(byte).map(|&b| b & mask != 0))
            .unwrap_or(false)
    }

    /// Number of bytes needed to hold `rows * cols` bits, rounded up to whole bytes.
    pub fn required_size(rows: usize, cols: usize) -> usize {
        (rows * cols + 7) / 8
    }
}

/// Growable, writable null bitmap with the same layout as [`NullBitmap`].
pub struct NullBitmapMut {
    data: Vec<u8>,
    columns: usize,
}

impl NullBitmapMut {
    /// Creates an all-clear bitmap pre-sized for `rows` rows of `columns` columns.
    ///
    /// `rows` is only the initial size: [`set_null`](Self::set_null) grows the
    /// buffer when a later row is marked.
    pub fn new(rows: usize, columns: usize) -> Self {
        let size = NullBitmap::required_size(rows, columns);
        Self {
            data: vec![0; size],
            columns,
        }
    }

    /// Marks cell `(row, col)` as null.
    ///
    /// If `row` lies past the current size, the buffer is zero-extended first so
    /// the null is not lost. A `col` not below the column count is ignored rather
    /// than setting a bit that belongs to the next row.
    #[inline]
    pub fn set_null(&mut self, row: usize, col: usize) {
        if let Some((byte, mask)) = NullBitmap::bit_location(row, col, self.columns) {
            if byte >= self.data.len() {
                self.data.resize(byte + 1, 0);
            }
            self.data[byte] |= mask;
        }
    }

    /// The bitmap bytes.
    ///
    /// The length is at least the initial size, and more if rows beyond it were marked.
    pub fn as_bytes(&self) -> &[u8] {
        &self.data
    }

    /// Consumes the bitmap and returns its bytes.
    pub fn into_bytes(self) -> Vec<u8> {
        self.data
    }
}
/// Borrowed, zero-copy view of a single row.
///
/// `data` begins at the row's first byte and may run on past the row's end.
/// Cells are laid out in schema order:
/// - fixed-width columns take their type's width (`fixed_size` for `FixedBinary`);
/// - `String`/`Binary` cells are a little-endian `u32` length followed by that many bytes.
///
/// Every `read_*` returns `None` when the cell is null, `col` is out of range,
/// or the needed bytes are missing. Fixed-width readers do not check the
/// column's declared type.
#[derive(Debug, Clone)]
pub struct RowView<'a> {
    schema: &'a TbpSchema,
    data: &'a [u8],
    null_bitmap: Option<&'a NullBitmap<'a>>,
    row_idx: usize,
}

impl<'a> RowView<'a> {
    /// Creates a view of row number `row_idx`, whose bytes start at `data[0]`.
    ///
    /// `null_bitmap`, when given, is consulted using `row_idx`.
    pub fn new(
        schema: &'a TbpSchema,
        data: &'a [u8],
        null_bitmap: Option<&'a NullBitmap<'a>>,
        row_idx: usize,
    ) -> Self {
        Self {
            schema,
            data,
            null_bitmap,
            row_idx,
        }
    }

    /// `true` if the null bitmap marks column `col` of this row as null.
    ///
    /// Always `false` when the view has no bitmap.
    #[inline]
    pub fn is_null(&self, col: usize) -> bool {
        self.null_bitmap
            .map_or(false, |bitmap| bitmap.is_null(self.row_idx, col))
    }

    /// Copies `N` bytes starting at `offset`, or `None` if they are not all within `data`.
    fn bytes_at<const N: usize>(&self, offset: usize) -> Option<[u8; N]> {
        let end = offset.checked_add(N)?;
        self.data.get(offset..end)?.try_into().ok()
    }

    /// Byte offset of column `col` within the row.
    ///
    /// Returns `None` if `col` is out of range or a preceding length prefix
    /// cannot be read. Variable-width columns are skipped by decoding their
    /// `u32` length prefix, so columns after a `String`/`Binary` resolve correctly.
    fn column_offset(&self, col: usize) -> Option<usize> {
        if col >= self.schema.columns.len() {
            return None;
        }
        self.schema.columns[..col]
            .iter()
            .try_fold(0usize, |offset, c| {
                let width = match c.col_type {
                    TbpColumnType::FixedBinary => usize::from(c.fixed_size.unwrap_or(0)),
                    TbpColumnType::String | TbpColumnType::Binary => {
                        let len = u32::from_le_bytes(self.bytes_at::<4>(offset)?);
                        usize::try_from(len).ok()?.checked_add(4)?
                    }
                    other => other.fixed_size().unwrap_or(0),
                };
                offset.checked_add(width)
            })
    }

    /// Raw bytes of the non-null, fixed-width cell `col`.
    fn cell<const N: usize>(&self, col: usize) -> Option<[u8; N]> {
        if self.is_null(col) {
            return None;
        }
        let offset = self.column_offset(col)?;
        self.bytes_at(offset)
    }

    /// Reads column `col` as a bool; any non-zero byte is `true`.
    pub fn read_bool(&self, col: usize) -> Option<bool> {
        self.cell::<1>(col).map(|[byte]| byte != 0)
    }

    /// Reads column `col` as a little-endian `i64`.
    pub fn read_i64(&self, col: usize) -> Option<i64> {
        self.cell(col).map(i64::from_le_bytes)
    }

    /// Reads column `col` as a little-endian `u64`.
    pub fn read_u64(&self, col: usize) -> Option<u64> {
        self.cell(col).map(u64::from_le_bytes)
    }

    /// Reads column `col` as a little-endian `f64`.
    pub fn read_f64(&self, col: usize) -> Option<f64> {
        self.cell(col).map(f64::from_le_bytes)
    }

    /// Reads column `col` as a little-endian `i32`.
    pub fn read_i32(&self, col: usize) -> Option<i32> {
        self.cell(col).map(i32::from_le_bytes)
    }

    /// Reads column `col` as a little-endian `f32`.
    pub fn read_f32(&self, col: usize) -> Option<f32> {
        self.cell(col).map(f32::from_le_bytes)
    }

    /// Returns the payload of a non-null `String` or `Binary` cell, without its length prefix.
    ///
    /// Returns `None` for columns of any other type.
    pub fn read_binary(&self, col: usize) -> Option<&'a [u8]> {
        let column = self.schema.columns.get(col)?;
        if !matches!(column.col_type, TbpColumnType::String | TbpColumnType::Binary)
            || self.is_null(col)
        {
            return None;
        }
        let offset = self.column_offset(col)?;
        let len = usize::try_from(u32::from_le_bytes(self.bytes_at::<4>(offset)?)).ok()?;
        let start = offset.checked_add(4)?;
        self.data.get(start..start.checked_add(len)?)
    }

    /// Like [`read_binary`](Self::read_binary), but returns `None` unless the payload is valid UTF-8.
    pub fn read_str(&self, col: usize) -> Option<&'a str> {
        std::str::from_utf8(self.read_binary(col)?).ok()
    }

    /// The bytes this view was created with (starting at the row, possibly extending past it).
    pub fn raw_data(&self) -> &[u8] {
        self.data
    }
}
/// Accumulates rows for one schema and serializes them with [`TbpWriter::finish`].
pub struct TbpWriter {
    schema: TbpSchema,
    /// Null bits; pre-sized from the row estimate and grown on demand by `set_null`.
    null_bitmap: NullBitmapMut,
    /// Start of each started row within `data` (relative to the data section).
    row_index: Vec<u32>,
    /// Concatenated row bytes.
    data: Vec<u8>,
    /// Rows committed via `TbpRowWriter::finish`.
    row_count: usize,
}

impl TbpWriter {
    /// Creates a writer for `schema`.
    ///
    /// `estimated_rows` only pre-sizes internal buffers; any number of rows may be written.
    pub fn new(schema: TbpSchema, estimated_rows: usize) -> Self {
        Self {
            null_bitmap: NullBitmapMut::new(estimated_rows, schema.columns.len()),
            row_index: Vec::with_capacity(estimated_rows),
            data: Vec::with_capacity(estimated_rows * schema.fixed_row_size().unwrap_or(64)),
            row_count: 0,
            schema,
        }
    }

    /// Begins a new row; values are then written through the returned builder.
    ///
    /// If a previously started row was never committed with `TbpRowWriter::finish`,
    /// it is discarded first so that it cannot corrupt the row layout.
    pub fn start_row(&mut self) -> TbpRowWriter<'_> {
        self.discard_unfinished_row();
        let offset = self.data.len() as u32;
        self.row_index.push(offset);
        TbpRowWriter {
            writer: self,
            col_idx: 0,
        }
    }

    /// Removes the bytes, index entry, and null bits of a started-but-uncommitted row.
    fn discard_unfinished_row(&mut self) {
        if self.row_index.len() <= self.row_count {
            return;
        }
        let start = self.row_index[self.row_count] as usize;
        self.data.truncate(start);
        self.row_index.truncate(self.row_count);
        // The next row reuses this row number, so it must not inherit stale null bits.
        let columns = self.schema.columns.len();
        let first_bit = self.row_count * columns;
        for bit in first_bit..first_bit + columns {
            if let Some(byte) = self.null_bitmap.data.get_mut(bit / 8) {
                *byte &= !(1u8 << (bit % 8));
            }
        }
    }

    /// Marks `(row, col)` as null.
    ///
    /// The bitmap is grown first if `row` exceeds the initial estimate, so the
    /// null is not silently dropped. Columns outside the schema are ignored.
    fn set_null(&mut self, row: usize, col: usize) {
        let columns = self.schema.columns.len();
        if col >= columns {
            return;
        }
        let needed = NullBitmap::required_size(row + 1, columns);
        if self.null_bitmap.data.len() < needed {
            self.null_bitmap.data.resize(needed, 0);
        }
        self.null_bitmap.set_null(row, col);
    }

    /// Serializes all committed rows into a complete TBP buffer.
    ///
    /// Layout, in order:
    /// 1. the header;
    /// 2. if any column is nullable, the null bitmap;
    /// 3. if any column is variable-width, one little-endian `u32` absolute offset per row;
    /// 4. the row data.
    ///
    /// A started row that was never committed is dropped.
    ///
    /// NOTE(review): counts and offsets are stored as `u32`; buffers larger than
    /// `u32::MAX` bytes are not detected and would be truncated.
    pub fn finish(mut self) -> Vec<u8> {
        self.discard_unfinished_row();

        let column_count = self.schema.columns.len();
        let has_nulls = self.schema.has_nullable_columns();
        let has_variable = self.schema.has_variable_columns();

        let mut flags = TbpFlags(0);
        if has_nulls {
            flags.0 |= TbpFlags::HAS_NULLS;
        }
        if has_variable {
            flags.0 |= TbpFlags::HAS_ROW_INDEX;
        }

        let null_bitmap_size = if has_nulls {
            NullBitmap::required_size(self.row_count, column_count)
        } else {
            0
        };
        let row_index_size = if has_variable { self.row_count * 4 } else { 0 };
        let null_bitmap_offset = if has_nulls { TBP_HEADER_SIZE as u32 } else { 0 };
        let row_index_offset = if has_variable {
            (TBP_HEADER_SIZE + null_bitmap_size) as u32
        } else {
            0
        };
        let data_offset = TBP_HEADER_SIZE + null_bitmap_size + row_index_size;

        let header = TbpHeader {
            magic: TBP_MAGIC,
            version: TBP_VERSION,
            flags,
            schema_id: self.schema.schema_id,
            row_count: self.row_count as u32,
            column_count: column_count as u16,
            reserved: 0,
            null_bitmap_offset,
            row_index_offset,
        };

        let mut buffer = Vec::with_capacity(data_offset + self.data.len());
        header
            .write(&mut buffer)
            .expect("writing to a Vec<u8> cannot fail");

        if has_nulls {
            // The bitmap may be shorter than required (later rows had no nulls and
            // exceeded the estimate) or longer (pre-sized for more rows), so emit
            // exactly `null_bitmap_size` bytes.
            let bits = self.null_bitmap.as_bytes();
            let copied = bits.len().min(null_bitmap_size);
            buffer.extend_from_slice(&bits[..copied]);
            buffer.resize(buffer.len() + (null_bitmap_size - copied), 0);
        }
        if has_variable {
            // Row offsets are stored relative to the data section; convert to absolute file offsets.
            for &offset in &self.row_index {
                buffer.extend_from_slice(&(offset + data_offset as u32).to_le_bytes());
            }
        }
        buffer.extend_from_slice(&self.data);
        buffer
    }
}
pub struct TbpRowWriter<'a> {
writer: &'a mut TbpWriter,
col_idx: usize,
}
impl<'a> TbpRowWriter<'a> {
pub fn write_null(mut self) -> Self {
self.writer.set_null(self.writer.row_count, self.col_idx);
self.col_idx += 1;
self
}
pub fn write_bool(mut self, value: bool) -> Self {
self.writer.data.push(if value { 1 } else { 0 });
self.col_idx += 1;
self
}
pub fn write_i64(mut self, value: i64) -> Self {
self.writer.data.extend_from_slice(&value.to_le_bytes());
self.col_idx += 1;
self
}
pub fn write_u64(mut self, value: u64) -> Self {
self.writer.data.extend_from_slice(&value.to_le_bytes());
self.col_idx += 1;
self
}
pub fn write_f64(mut self, value: f64) -> Self {
self.writer.data.extend_from_slice(&value.to_le_bytes());
self.col_idx += 1;
self
}
pub fn write_i32(mut self, value: i32) -> Self {
self.writer.data.extend_from_slice(&value.to_le_bytes());
self.col_idx += 1;
self
}
pub fn write_f32(mut self, value: f32) -> Self {
self.writer.data.extend_from_slice(&value.to_le_bytes());
self.col_idx += 1;
self
}
pub fn write_string(mut self, value: &str) -> Self {
let bytes = value.as_bytes();
self.writer.data.write_u32::<LittleEndian>(bytes.len() as u32).unwrap();
self.writer.data.extend_from_slice(bytes);
self.col_idx += 1;
self
}
pub fn write_binary(mut self, value: &[u8]) -> Self {
self.writer.data.write_u32::<LittleEndian>(value.len() as u32).unwrap();
self.writer.data.extend_from_slice(value);
self.col_idx += 1;
self
}
pub fn finish(self) {
self.writer.row_count += 1;
}
}
pub struct TbpReader<'a> {
data: &'a [u8],
header: TbpHeader,
schema: &'a TbpSchema,
}
impl<'a> TbpReader<'a> {
pub fn new(data: &'a [u8], schema: &'a TbpSchema) -> io::Result<Self> {
let header = TbpHeader::read(data)?;
if header.schema_id != schema.schema_id {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Schema ID mismatch",
));
}
Ok(Self {
data,
header,
schema,
})
}
pub fn row_count(&self) -> usize {
self.header.row_count as usize
}
pub fn get_row(&self, row: usize) -> Option<RowView<'_>> {
if row >= self.row_count() {
return None;
}
let row_offset = if self.header.flags.has_row_index() {
let idx_offset = self.header.row_index_offset as usize + row * 4;
if idx_offset + 4 > self.data.len() {
return None;
}
let bytes: [u8; 4] = self.data[idx_offset..idx_offset + 4].try_into().ok()?;
u32::from_le_bytes(bytes) as usize
} else {
let row_size = self.schema.fixed_row_size()?;
let null_bitmap_size = if self.header.flags.has_nulls() {
NullBitmap::required_size(self.row_count(), self.schema.columns.len())
} else {
0
};
TBP_HEADER_SIZE + null_bitmap_size + row * row_size
};
let row_data = &self.data[row_offset..];
Some(RowView::new(self.schema, row_data, None, row))
}
pub fn iter(&'a self) -> impl Iterator<Item = RowView<'a>> {
(0..self.row_count()).filter_map(move |i| self.get_row(i))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A serialized header is exactly `TBP_HEADER_SIZE` bytes and parses back to the same values.
    #[test]
    fn test_header_roundtrip() {
        let original = TbpHeader {
            magic: TBP_MAGIC,
            version: TBP_VERSION,
            flags: TbpFlags(TbpFlags::HAS_NULLS | TbpFlags::HAS_ROW_INDEX),
            schema_id: 12345678,
            row_count: 100,
            column_count: 5,
            reserved: 0,
            null_bitmap_offset: 32,
            row_index_offset: 48,
        };
        let mut bytes = Vec::new();
        original.write(&mut bytes).unwrap();
        assert_eq!(bytes.len(), TBP_HEADER_SIZE);

        let decoded = TbpHeader::read(&bytes).unwrap();
        assert_eq!(
            (decoded.magic, decoded.version, decoded.row_count, decoded.column_count),
            (TBP_MAGIC, TBP_VERSION, 100, 5)
        );
    }

    /// Bits set through `NullBitmapMut` are visible through `NullBitmap`, and only those bits.
    #[test]
    fn test_null_bitmap() {
        const COLUMNS: usize = 5;
        let mut builder = NullBitmapMut::new(10, COLUMNS);
        for &(row, col) in &[(0, 0), (5, 3), (9, 4)] {
            builder.set_null(row, col);
        }

        let view = NullBitmap::new(builder.as_bytes(), COLUMNS);
        let expectations = [
            ((0, 0), true),
            ((0, 1), false),
            ((5, 3), true),
            ((9, 4), true),
            ((9, 3), false),
        ];
        for &((row, col), expected) in &expectations {
            assert_eq!(view.is_null(row, col), expected, "cell ({}, {})", row, col);
        }
    }

    /// Rows written by `TbpWriter` are read back unchanged by `TbpReader`.
    #[test]
    fn test_writer_reader_roundtrip() {
        let schema = TbpSchema::new(
            "test_table",
            vec![
                TbpColumn::new("id", TbpColumnType::Int64).not_null(),
                TbpColumn::new("value", TbpColumnType::Float64),
            ],
        );

        let mut writer = TbpWriter::new(schema.clone(), 100);
        (0..10i64).for_each(|i| {
            writer
                .start_row()
                .write_i64(i)
                .write_f64(i as f64 * 1.5)
                .finish();
        });
        let encoded = writer.finish();

        let reader = TbpReader::new(&encoded, &schema).unwrap();
        assert_eq!(reader.row_count(), 10);
        let row5 = reader.get_row(5).unwrap();
        assert_eq!(row5.read_i64(0), Some(5));
        assert_eq!(row5.read_f64(1), Some(7.5));
    }
}