use arrow_array::RecordBatch;
use iceberg::spec::Schema as IcebergSchema;
use super::arrow_convert::{ArrowToTbf, iceberg_schema_to_tbf};
use crate::tbf::TableSchema;
/// Configuration for a [`TbfFileWriter`]: the target TBF schema to encode into.
#[derive(Clone)]
pub struct TbfWriterConfig {
    /// Schema that every written batch is encoded against.
    pub tbf_schema: TableSchema,
}

impl TbfWriterConfig {
    /// Builds a config by converting an Iceberg schema into its TBF equivalent.
    pub fn from_iceberg_schema(schema: &IcebergSchema) -> Self {
        let tbf_schema = iceberg_schema_to_tbf(schema);
        Self { tbf_schema }
    }

    /// Builds a config from an already-converted TBF schema.
    pub fn with_tbf_schema(tbf_schema: TableSchema) -> Self {
        Self { tbf_schema }
    }
}
/// In-memory writer that encodes Arrow [`RecordBatch`]es into a single TBF byte buffer.
pub struct TbfFileWriter {
    /// Schema each incoming batch is encoded against.
    tbf_schema: TableSchema,
    /// Accumulated encoded bytes for all batches written so far.
    buffer: Vec<u8>,
    /// Total rows written across all batches.
    row_count: usize,
}

impl TbfFileWriter {
    /// Creates an empty writer from `config`.
    pub fn new(config: TbfWriterConfig) -> Self {
        Self {
            tbf_schema: config.tbf_schema,
            buffer: Vec::new(),
            row_count: 0,
        }
    }

    /// Encodes `batch` to TBF bytes and appends them to the internal buffer.
    pub fn write(&mut self, batch: &RecordBatch) {
        let encoded = batch.encode_to_tbf(&self.tbf_schema);
        if self.buffer.is_empty() {
            // First batch: take ownership of the encoded bytes instead of copying.
            self.buffer = encoded;
        } else {
            self.buffer.extend_from_slice(&encoded);
        }
        self.row_count += batch.num_rows();
    }

    /// Number of rows written so far.
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Current size of the accumulated buffer, in bytes.
    pub fn buffer_size(&self) -> usize {
        self.buffer.len()
    }

    /// Consumes the writer and returns the finished file data.
    pub fn finish(self) -> TbfFileData {
        let Self { buffer, row_count, .. } = self;
        TbfFileData {
            data: buffer,
            row_count,
        }
    }
}
/// Finished TBF file contents, produced by [`TbfFileWriter::finish`].
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so callers can log, duplicate,
/// and compare file data (e.g. in assertions) — all fields are plain
/// `Vec<u8>`/`usize`, so the derives are free and backward-compatible.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TbfFileData {
    /// Raw encoded TBF bytes.
    pub data: Vec<u8>,
    /// Total number of rows encoded in `data`.
    pub row_count: usize,
}

impl TbfFileData {
    /// Borrows the encoded bytes.
    pub fn as_bytes(&self) -> &[u8] {
        &self.data
    }

    /// Consumes `self` and returns the encoded bytes without copying.
    pub fn into_bytes(self) -> Vec<u8> {
        self.data
    }

    /// Size of the encoded file, in bytes.
    pub fn file_size(&self) -> usize {
        self.data.len()
    }
}
/// Fluent builder for [`TbfFileWriter`]; a schema must be supplied before `build`.
#[derive(Clone)]
pub struct TbfFileWriterBuilder {
    /// Writer configuration; `None` until a schema-setting method is called.
    config: Option<TbfWriterConfig>,
}

impl TbfFileWriterBuilder {
    /// Creates a builder with no schema configured yet.
    pub fn new() -> Self {
        Self { config: None }
    }

    /// Sets the writer schema by converting from an Iceberg schema.
    pub fn with_iceberg_schema(mut self, schema: &IcebergSchema) -> Self {
        let config = TbfWriterConfig::from_iceberg_schema(schema);
        self.config = Some(config);
        self
    }

    /// Sets an already-converted TBF schema directly.
    pub fn with_tbf_schema(mut self, schema: TableSchema) -> Self {
        let config = TbfWriterConfig::with_tbf_schema(schema);
        self.config = Some(config);
        self
    }

    /// Consumes the builder and constructs the writer.
    ///
    /// # Panics
    ///
    /// Panics if neither [`with_iceberg_schema`](Self::with_iceberg_schema)
    /// nor [`with_tbf_schema`](Self::with_tbf_schema) was called first.
    pub fn build(self) -> TbfFileWriter {
        let config = self.config.expect("Schema must be set before building");
        TbfFileWriter::new(config)
    }
}

impl Default for TbfFileWriterBuilder {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int32Array, StringArray};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
    use std::sync::Arc;

    /// Builds the two-column (id: int, name: string) Iceberg schema shared by both tests.
    fn sample_iceberg_schema() -> Schema {
        Schema::builder()
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "id",
                    Type::Primitive(PrimitiveType::Int),
                )),
                Arc::new(NestedField::required(
                    2,
                    "name",
                    Type::Primitive(PrimitiveType::String),
                )),
            ])
            .build()
            .unwrap()
    }

    #[test]
    fn test_writer_builder() {
        let writer = TbfFileWriterBuilder::new()
            .with_iceberg_schema(&sample_iceberg_schema())
            .build();
        // A freshly built writer holds no rows and no bytes.
        assert_eq!(writer.row_count(), 0);
        assert_eq!(writer.buffer_size(), 0);
    }

    #[test]
    fn test_write_and_finish() {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let mut writer = TbfFileWriterBuilder::new()
            .with_iceberg_schema(&sample_iceberg_schema())
            .build();

        let ids: Int32Array = vec![1, 2, 3].into();
        let names: StringArray = vec!["Alice", "Bob", "Carol"].into();
        let batch = RecordBatch::try_new(arrow_schema, vec![Arc::new(ids), Arc::new(names)])
            .unwrap();

        writer.write(&batch);
        assert_eq!(writer.row_count(), 3);
        assert!(writer.buffer_size() > 0);

        let data = writer.finish();
        assert_eq!(data.row_count, 3);
        assert!(data.file_size() > 0);
    }
}