Skip to main content

rustsim_io/
bridge.rs

//! Bridge between data collection and Arrow batch building.
//!
//! [`CollectArrowBridge`] wraps an [`ArrowBatchBuilder`] and automatically flushes
//! completed batches once the configured `batch_size` is reached. This
//! decouples the per-row collection loop from batch management.
//!
//! [`ArrowBatchBuilder`]: crate::arrow::ArrowBatchBuilder
8
9use crate::arrow::{ArrowBatchBuilder, ArrowValue};
10use arrow_schema::{ArrowError, SchemaRef};
11use thiserror::Error;
12
13/// Errors that can occur when configuring or flushing a [`CollectArrowBridge`].
14#[derive(Debug, Error)]
15pub enum BridgeError {
16    /// Arrow serialization or schema error.
17    #[error("arrow error: {0}")]
18    Arrow(#[from] ArrowError),
19    /// Batch size must be strictly positive.
20    #[error("batch_size must be positive")]
21    InvalidBatchSize,
22}
23
/// Point-in-time snapshot of how many rows and batches a
/// [`CollectArrowBridge`] is holding.
///
/// All fields are plain counters; the default value is all zeros.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct BridgeMetrics {
    /// Row threshold at which the bridge auto-flushes its builder.
    pub batch_size: usize,
    /// Rows sitting in the builder that have not been turned into a batch yet.
    pub pending_rows: usize,
    /// Finished batches queued in the bridge and not yet drained.
    pub completed_batches: usize,
    /// Sum of the row counts of all queued, finished batches.
    pub completed_rows: usize,
    /// Every row the bridge currently holds: `pending_rows + completed_rows`.
    pub total_rows: usize,
}
38
39/// Auto-flushing bridge from row-level data collection to Arrow batches.
40///
41/// Rows are pushed via [`push_row`](Self::push_row). When the internal builder
42/// reaches `batch_size`, it is automatically flushed to the internal batch
43/// buffer. Call [`take_batches`](Self::take_batches) to retrieve all completed
44/// batches (including any partial data), or [`drain_completed`](Self::drain_completed)
45/// to retrieve only full batches without flushing partial data.
46pub struct CollectArrowBridge {
47    builder: ArrowBatchBuilder,
48    batch_size: usize,
49    batches: Vec<arrow_array::RecordBatch>,
50}
51
52impl CollectArrowBridge {
53    /// Create a new bridge.
54    ///
55    /// # Arguments
56    ///
57    /// - `schema` - Arrow schema for the batches.
58    /// - `batch_size` - number of rows per batch before auto-flush.
59    pub fn new(schema: SchemaRef, batch_size: usize) -> Result<Self, BridgeError> {
60        if batch_size == 0 {
61            return Err(BridgeError::InvalidBatchSize);
62        }
63        Ok(Self {
64            builder: ArrowBatchBuilder::new(schema)?,
65            batch_size,
66            batches: Vec::new(),
67        })
68    }
69
70    /// Append a row. Auto-flushes to a new batch when `batch_size` is reached.
71    pub fn push_row(&mut self, values: &[ArrowValue]) -> Result<(), BridgeError> {
72        self.builder.push_row(values)?;
73        if self.builder.len() >= self.batch_size {
74            self.flush()?;
75        }
76        Ok(())
77    }
78
79    /// Flush any partial data in the builder to a batch.
80    pub fn flush(&mut self) -> Result<(), BridgeError> {
81        if !self.builder.is_empty() {
82            let batch = self.builder.finish()?;
83            self.batches.push(batch);
84        }
85        Ok(())
86    }
87
88    /// Flush and return all accumulated batches (including partial data).
89    pub fn take_batches(&mut self) -> Result<Vec<arrow_array::RecordBatch>, BridgeError> {
90        self.flush()?;
91        Ok(std::mem::take(&mut self.batches))
92    }
93
94    /// Take only batches that have already been auto-flushed, without flushing partial data.
95    pub fn drain_completed(&mut self) -> Vec<arrow_array::RecordBatch> {
96        std::mem::take(&mut self.batches)
97    }
98
99    /// Restore completed batches back into the bridge in their original order.
100    ///
101    /// This is used when a downstream delivery stage fails after batches have
102    /// already been drained from the bridge but before they were durably handed
103    /// off to the next stage.
104    pub fn restore_completed(&mut self, mut batches: Vec<arrow_array::RecordBatch>) {
105        if batches.is_empty() {
106            return;
107        }
108        batches.append(&mut self.batches);
109        self.batches = batches;
110    }
111
112    /// Borrow the accumulated (auto-flushed) batches.
113    pub fn batches(&self) -> &[arrow_array::RecordBatch] {
114        &self.batches
115    }
116
117    /// Number of rows currently buffered but not yet flushed.
118    pub fn pending_rows(&self) -> usize {
119        self.builder.len()
120    }
121
122    /// Number of completed batches waiting to be drained.
123    pub fn completed_batches(&self) -> usize {
124        self.batches.len()
125    }
126
127    /// Number of rows contained in completed batches waiting to be drained.
128    pub fn completed_rows(&self) -> usize {
129        self.batches.iter().map(|b| b.num_rows()).sum()
130    }
131
132    /// Configured auto-flush batch size.
133    pub fn batch_size(&self) -> usize {
134        self.batch_size
135    }
136
137    /// Snapshot of current bridge metrics.
138    pub fn metrics(&self) -> BridgeMetrics {
139        let pending_rows = self.pending_rows();
140        let completed_rows = self.completed_rows();
141        BridgeMetrics {
142            batch_size: self.batch_size,
143            pending_rows,
144            completed_batches: self.completed_batches(),
145            completed_rows,
146            total_rows: pending_rows + completed_rows,
147        }
148    }
149
150    /// Total row count (flushed + pending).
151    pub fn total_rows(&self) -> usize {
152        let flushed: usize = self.batches.iter().map(|b| b.num_rows()).sum();
153        flushed + self.builder.len()
154    }
155}