rustsim-io 0.0.1

Arrow batch builders, CSV bridge, and ClickHouse writer for rustsim
Documentation
//! Bridge between data collection and Arrow batch building.
//!
//! [`CollectArrowBridge`] wraps an [`ArrowBatchBuilder`] and auto-flushes
//! completed batches when the configured `batch_size` is reached. This
//! decouples the per-row collection loop from batch management.
//!
//! [`ArrowBatchBuilder`]: crate::arrow::ArrowBatchBuilder

use crate::arrow::{ArrowBatchBuilder, ArrowValue};
use arrow_schema::{ArrowError, SchemaRef};
use thiserror::Error;

/// Errors that can occur when configuring or flushing a [`CollectArrowBridge`].
///
/// The `#[from]` attribute on [`BridgeError::Arrow`] lets `?` convert an
/// [`ArrowError`] into this type automatically.
#[derive(Debug, Error)]
pub enum BridgeError {
    /// Arrow serialization or schema error.
    ///
    /// Displayed as `arrow error: <inner error>`.
    #[error("arrow error: {0}")]
    Arrow(#[from] ArrowError),
    /// Batch size must be strictly positive.
    ///
    /// Returned by [`CollectArrowBridge::new`] when `batch_size == 0`.
    #[error("batch_size must be positive")]
    InvalidBatchSize,
}

/// Lightweight batching metrics for [`CollectArrowBridge`].
///
/// A point-in-time snapshot produced by [`CollectArrowBridge::metrics`]. It is
/// `Copy`, so it can be stored or compared freely without borrowing the bridge.
/// `Default` yields all-zero counters.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BridgeMetrics {
    /// Configured auto-flush batch size.
    pub batch_size: usize,
    /// Number of rows currently buffered but not yet flushed.
    pub pending_rows: usize,
    /// Number of completed batches waiting to be drained.
    pub completed_batches: usize,
    /// Number of rows contained in completed batches waiting to be drained.
    pub completed_rows: usize,
    /// Total rows tracked by the bridge (`pending_rows + completed_rows`).
    pub total_rows: usize,
}

/// Auto-flushing bridge from row-level data collection to Arrow batches.
///
/// Rows are pushed via [`push_row`](Self::push_row). When the internal builder
/// reaches `batch_size`, it is automatically flushed to the internal batch
/// buffer. Call [`take_batches`](Self::take_batches) to retrieve all completed
/// batches (including any partial data), or [`drain_completed`](Self::drain_completed)
/// to retrieve only full batches without flushing partial data.
pub struct CollectArrowBridge {
    /// Accumulates rows for the batch currently being built.
    builder: ArrowBatchBuilder,
    /// Row threshold at which `push_row` triggers a flush; guaranteed non-zero
    /// by `new`.
    batch_size: usize,
    /// Completed batches awaiting retrieval, kept in oldest-first order.
    batches: Vec<arrow_array::RecordBatch>,
}

impl CollectArrowBridge {
    /// Create a new bridge.
    ///
    /// # Arguments
    ///
    /// - `schema` - Arrow schema for the batches.
    /// - `batch_size` - number of rows per batch before auto-flush.
    ///
    /// # Errors
    ///
    /// - [`BridgeError::InvalidBatchSize`] if `batch_size` is zero. This is
    ///   checked before the underlying builder is constructed.
    /// - Any error returned while constructing the underlying builder for
    ///   `schema`.
    pub fn new(schema: SchemaRef, batch_size: usize) -> Result<Self, BridgeError> {
        if batch_size == 0 {
            return Err(BridgeError::InvalidBatchSize);
        }
        Ok(Self {
            builder: ArrowBatchBuilder::new(schema)?,
            batch_size,
            batches: Vec::new(),
        })
    }

    /// Append a row. Auto-flushes to a new batch when `batch_size` is reached.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying builder. This covers both
    /// appending `values` and finishing the batch when the push triggers an
    /// auto-flush.
    pub fn push_row(&mut self, values: &[ArrowValue]) -> Result<(), BridgeError> {
        self.builder.push_row(values)?;
        // `>=` rather than `==` keeps the check robust even if the builder
        // were ever left over-full (e.g. after a failed flush).
        if self.builder.len() >= self.batch_size {
            self.flush()?;
        }
        Ok(())
    }

    /// Flush any partial data in the builder to a batch.
    ///
    /// Does nothing when no rows are pending, so empty batches are never
    /// produced.
    ///
    /// NOTE(review): auto-flush relies on `ArrowBatchBuilder::finish` leaving
    /// the builder empty afterwards; confirm against its implementation.
    ///
    /// # Errors
    ///
    /// Propagates any error from finishing the builder's current batch.
    pub fn flush(&mut self) -> Result<(), BridgeError> {
        if !self.builder.is_empty() {
            let batch = self.builder.finish()?;
            self.batches.push(batch);
        }
        Ok(())
    }

    /// Flush and return all accumulated batches (including partial data).
    ///
    /// # Errors
    ///
    /// Propagates any error from [`flush`](Self::flush). In that case nothing
    /// is taken: previously completed batches remain in the bridge.
    pub fn take_batches(&mut self) -> Result<Vec<arrow_array::RecordBatch>, BridgeError> {
        self.flush()?;
        Ok(std::mem::take(&mut self.batches))
    }

    /// Take only batches that have already been auto-flushed, without flushing partial data.
    pub fn drain_completed(&mut self) -> Vec<arrow_array::RecordBatch> {
        std::mem::take(&mut self.batches)
    }

    /// Restore completed batches back into the bridge in their original order.
    ///
    /// This is used when a downstream delivery stage fails after batches have
    /// already been drained from the bridge but before they were durably handed
    /// off to the next stage.
    ///
    /// The restored batches are placed *ahead of* any batches completed since
    /// they were drained, so overall oldest-first ordering is preserved.
    pub fn restore_completed(&mut self, mut batches: Vec<arrow_array::RecordBatch>) {
        if batches.is_empty() {
            return;
        }
        // Move the newer batches after the restored (older) ones.
        batches.append(&mut self.batches);
        self.batches = batches;
    }

    /// Borrow the accumulated (auto-flushed) batches.
    pub fn batches(&self) -> &[arrow_array::RecordBatch] {
        &self.batches
    }

    /// Number of rows currently buffered but not yet flushed.
    pub fn pending_rows(&self) -> usize {
        self.builder.len()
    }

    /// Number of completed batches waiting to be drained.
    pub fn completed_batches(&self) -> usize {
        self.batches.len()
    }

    /// Number of rows contained in completed batches waiting to be drained.
    pub fn completed_rows(&self) -> usize {
        self.batches.iter().map(|b| b.num_rows()).sum()
    }

    /// Configured auto-flush batch size.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Snapshot of current bridge metrics.
    pub fn metrics(&self) -> BridgeMetrics {
        let pending_rows = self.pending_rows();
        let completed_rows = self.completed_rows();
        BridgeMetrics {
            batch_size: self.batch_size,
            pending_rows,
            completed_batches: self.completed_batches(),
            completed_rows,
            total_rows: pending_rows + completed_rows,
        }
    }

    /// Total row count (flushed + pending).
    ///
    /// Uses the same helpers as [`metrics`](Self::metrics), so this always
    /// equals `self.metrics().total_rows`.
    pub fn total_rows(&self) -> usize {
        self.completed_rows() + self.pending_rows()
    }
}