rustsim-io 0.0.1

Arrow batch builders, CSV bridge, and ClickHouse writer for rustsim
Documentation
//! Arrow batch building utilities.
//!
//! Provides [`ArrowBatchBuilder`] for incrementally constructing Arrow
//! [`RecordBatch`] values from dynamically-typed [`ArrowValue`] rows,
//! and [`RecordBatchCollector`] for accumulating completed batches.

use arrow_array::builder::{
    ArrayBuilder, BooleanBuilder, Float64Builder, Int64Builder, StringBuilder,
};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
use std::sync::Arc;

/// A dynamically-typed cell value for one column of an Arrow row.
///
/// Used with [`ArrowBatchBuilder::push_row`] to append rows without
/// requiring a compile-time schema. Each variant corresponds to exactly one
/// supported Arrow column type (`Int64`, `Float64`, `Boolean`, `Utf8`).
///
/// `PartialEq` is derived so rows can be compared (e.g. in tests); `Eq` is
/// intentionally absent because `f64` is not totally ordered.
#[derive(Debug, Clone, PartialEq)]
pub enum ArrowValue {
    /// 64-bit signed integer.
    Int64(i64),
    /// 64-bit float.
    Float64(f64),
    /// Boolean.
    Boolean(bool),
    /// UTF-8 string.
    Utf8(String),
}

/// Incremental Arrow [`RecordBatch`] builder.
///
/// Rows are pushed one at a time via [`push_row`](Self::push_row). Call
/// [`finish`](Self::finish) to produce a `RecordBatch` and reset the builder.
///
/// The builder validates that each row's length matches the schema.
pub struct ArrowBatchBuilder {
    schema: SchemaRef,
    builders: Vec<Box<dyn ArrayBuilder>>,
    len: usize,
}

impl ArrowBatchBuilder {
    /// Create a new builder for the given schema.
    ///
    /// Returns an error if the schema contains unsupported data types.
    pub fn new(schema: SchemaRef) -> Result<Self, ArrowError> {
        let builders = build_builders(&schema)?;
        Ok(Self {
            schema,
            builders,
            len: 0,
        })
    }

    /// The schema this builder produces.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Number of rows appended since the last `finish`.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if no rows have been appended.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Append a row of values.
    ///
    /// Returns [`ArrowError::SchemaError`] if the row length doesn't match
    /// the schema, or if a value type doesn't match its column's data type.
    pub fn push_row(&mut self, values: &[ArrowValue]) -> Result<(), ArrowError> {
        if values.len() != self.builders.len() {
            return Err(ArrowError::SchemaError(
                "row length does not match schema".to_string(),
            ));
        }

        for (value, builder) in values.iter().zip(self.builders.iter_mut()) {
            append_value(builder.as_mut(), value)?;
        }

        self.len += 1;
        Ok(())
    }

    /// Consume the accumulated rows and produce a [`RecordBatch`].
    ///
    /// The builder is reset and can be reused for the next batch.
    pub fn finish(&mut self) -> Result<RecordBatch, ArrowError> {
        let arrays = self
            .builders
            .iter_mut()
            .map(|builder| builder.finish())
            .collect::<Vec<ArrayRef>>();

        let batch = RecordBatch::try_new(self.schema.clone(), arrays)?;
        self.builders = build_builders(&self.schema)?;
        self.len = 0;
        Ok(batch)
    }
}

/// Collects finished [`RecordBatch`] values in the order they are pushed.
#[derive(Debug, Default)]
pub struct RecordBatchCollector {
    /// Batches pushed so far, oldest first.
    batches: Vec<RecordBatch>,
}

impl RecordBatchCollector {
    /// Build a collector that holds no batches yet.
    pub fn new() -> Self {
        Self::default()
    }

    /// Append `batch` after all previously pushed batches.
    pub fn push(&mut self, batch: RecordBatch) {
        let store = &mut self.batches;
        store.push(batch);
    }

    /// Move every collected batch out, leaving this collector empty.
    pub fn take(&mut self) -> Vec<RecordBatch> {
        let drained = std::mem::take(&mut self.batches);
        drained
    }

    /// View the batches collected so far without removing them.
    pub fn batches(&self) -> &[RecordBatch] {
        self.batches.as_slice()
    }
}

/// Create an Arrow schema from a list of fields.
pub fn schema_from_fields(fields: Vec<Field>) -> SchemaRef {
    Arc::new(Schema::new(fields))
}

/// Create one empty array builder per field of `schema`, in field order.
///
/// Returns the first error produced by `builder_for_field`, without
/// creating builders for the remaining fields.
fn build_builders(schema: &SchemaRef) -> Result<Vec<Box<dyn ArrayBuilder>>, ArrowError> {
    let fields = schema.fields();
    let mut out = Vec::with_capacity(fields.len());
    for field in fields.iter() {
        out.push(builder_for_field(field)?);
    }
    Ok(out)
}

/// Return an empty, boxed array builder matching `field`'s data type.
///
/// Only `Int64`, `Float64`, `Boolean`, and `Utf8` are supported; any other
/// type yields [`ArrowError::SchemaError`].
fn builder_for_field(field: &Field) -> Result<Box<dyn ArrayBuilder>, ArrowError> {
    let boxed: Box<dyn ArrayBuilder> = match field.data_type() {
        DataType::Int64 => Box::new(Int64Builder::new()),
        DataType::Float64 => Box::new(Float64Builder::new()),
        DataType::Boolean => Box::new(BooleanBuilder::new()),
        DataType::Utf8 => Box::new(StringBuilder::new()),
        other => {
            return Err(ArrowError::SchemaError(format!(
                "unsupported data type {other:?}"
            )));
        }
    };
    Ok(boxed)
}

/// Append `value` to `builder`, which must be the concrete builder type
/// matching the value's variant (e.g. `Int64Builder` for `ArrowValue::Int64`).
///
/// Returns [`ArrowError::SchemaError`] when the builder's concrete type does
/// not correspond to the value's variant; nothing is appended in that case.
fn append_value(builder: &mut dyn ArrayBuilder, value: &ArrowValue) -> Result<(), ArrowError> {
    let any = builder.as_any_mut();
    let appended = match value {
        ArrowValue::Int64(v) => any
            .downcast_mut::<Int64Builder>()
            .map(|b| b.append_value(*v)),
        ArrowValue::Float64(v) => any
            .downcast_mut::<Float64Builder>()
            .map(|b| b.append_value(*v)),
        ArrowValue::Boolean(v) => any
            .downcast_mut::<BooleanBuilder>()
            .map(|b| b.append_value(*v)),
        ArrowValue::Utf8(v) => any
            .downcast_mut::<StringBuilder>()
            .map(|b| b.append_value(v)),
    };

    appended.ok_or_else(|| {
        ArrowError::SchemaError("value does not match builder type".to_string())
    })
}