rustsim_io/bridge.rs
//! Bridge between data collection and Arrow batch building.
//!
//! [`CollectArrowBridge`] wraps an [`ArrowBatchBuilder`] and automatically
//! flushes the buffered rows into a completed batch whenever the configured
//! `batch_size` is reached. This decouples the per-row collection loop from
//! batch management.
//!
//! [`ArrowBatchBuilder`]: crate::arrow::ArrowBatchBuilder
8
9use crate::arrow::{ArrowBatchBuilder, ArrowValue};
10use arrow_schema::{ArrowError, SchemaRef};
11use thiserror::Error;
12
13/// Errors that can occur when configuring or flushing a [`CollectArrowBridge`].
14#[derive(Debug, Error)]
15pub enum BridgeError {
16 /// Arrow serialization or schema error.
17 #[error("arrow error: {0}")]
18 Arrow(#[from] ArrowError),
19 /// Batch size must be strictly positive.
20 #[error("batch_size must be positive")]
21 InvalidBatchSize,
22}
23
/// Point-in-time batching counters reported by [`CollectArrowBridge`].
///
/// This is plain copyable data: once captured, the values do not follow
/// later changes to the bridge.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct BridgeMetrics {
    /// Row threshold that triggers an automatic flush.
    pub batch_size: usize,
    /// Rows sitting in the builder that have not been flushed into a batch yet.
    pub pending_rows: usize,
    /// Count of finished batches still waiting to be drained.
    pub completed_batches: usize,
    /// Sum of rows across the finished batches still waiting to be drained.
    pub completed_rows: usize,
    /// Every row currently held by the bridge, i.e. `pending_rows + completed_rows`.
    pub total_rows: usize,
}
38
39/// Auto-flushing bridge from row-level data collection to Arrow batches.
40///
41/// Rows are pushed via [`push_row`](Self::push_row). When the internal builder
42/// reaches `batch_size`, it is automatically flushed to the internal batch
43/// buffer. Call [`take_batches`](Self::take_batches) to retrieve all completed
44/// batches (including any partial data), or [`drain_completed`](Self::drain_completed)
45/// to retrieve only full batches without flushing partial data.
46pub struct CollectArrowBridge {
47 builder: ArrowBatchBuilder,
48 batch_size: usize,
49 batches: Vec<arrow_array::RecordBatch>,
50}
51
52impl CollectArrowBridge {
53 /// Create a new bridge.
54 ///
55 /// # Arguments
56 ///
57 /// - `schema` - Arrow schema for the batches.
58 /// - `batch_size` - number of rows per batch before auto-flush.
59 pub fn new(schema: SchemaRef, batch_size: usize) -> Result<Self, BridgeError> {
60 if batch_size == 0 {
61 return Err(BridgeError::InvalidBatchSize);
62 }
63 Ok(Self {
64 builder: ArrowBatchBuilder::new(schema)?,
65 batch_size,
66 batches: Vec::new(),
67 })
68 }
69
70 /// Append a row. Auto-flushes to a new batch when `batch_size` is reached.
71 pub fn push_row(&mut self, values: &[ArrowValue]) -> Result<(), BridgeError> {
72 self.builder.push_row(values)?;
73 if self.builder.len() >= self.batch_size {
74 self.flush()?;
75 }
76 Ok(())
77 }
78
79 /// Flush any partial data in the builder to a batch.
80 pub fn flush(&mut self) -> Result<(), BridgeError> {
81 if !self.builder.is_empty() {
82 let batch = self.builder.finish()?;
83 self.batches.push(batch);
84 }
85 Ok(())
86 }
87
88 /// Flush and return all accumulated batches (including partial data).
89 pub fn take_batches(&mut self) -> Result<Vec<arrow_array::RecordBatch>, BridgeError> {
90 self.flush()?;
91 Ok(std::mem::take(&mut self.batches))
92 }
93
94 /// Take only batches that have already been auto-flushed, without flushing partial data.
95 pub fn drain_completed(&mut self) -> Vec<arrow_array::RecordBatch> {
96 std::mem::take(&mut self.batches)
97 }
98
99 /// Restore completed batches back into the bridge in their original order.
100 ///
101 /// This is used when a downstream delivery stage fails after batches have
102 /// already been drained from the bridge but before they were durably handed
103 /// off to the next stage.
104 pub fn restore_completed(&mut self, mut batches: Vec<arrow_array::RecordBatch>) {
105 if batches.is_empty() {
106 return;
107 }
108 batches.append(&mut self.batches);
109 self.batches = batches;
110 }
111
112 /// Borrow the accumulated (auto-flushed) batches.
113 pub fn batches(&self) -> &[arrow_array::RecordBatch] {
114 &self.batches
115 }
116
117 /// Number of rows currently buffered but not yet flushed.
118 pub fn pending_rows(&self) -> usize {
119 self.builder.len()
120 }
121
122 /// Number of completed batches waiting to be drained.
123 pub fn completed_batches(&self) -> usize {
124 self.batches.len()
125 }
126
127 /// Number of rows contained in completed batches waiting to be drained.
128 pub fn completed_rows(&self) -> usize {
129 self.batches.iter().map(|b| b.num_rows()).sum()
130 }
131
132 /// Configured auto-flush batch size.
133 pub fn batch_size(&self) -> usize {
134 self.batch_size
135 }
136
137 /// Snapshot of current bridge metrics.
138 pub fn metrics(&self) -> BridgeMetrics {
139 let pending_rows = self.pending_rows();
140 let completed_rows = self.completed_rows();
141 BridgeMetrics {
142 batch_size: self.batch_size,
143 pending_rows,
144 completed_batches: self.completed_batches(),
145 completed_rows,
146 total_rows: pending_rows + completed_rows,
147 }
148 }
149
150 /// Total row count (flushed + pending).
151 pub fn total_rows(&self) -> usize {
152 let flushed: usize = self.batches.iter().map(|b| b.num_rows()).sum();
153 flushed + self.builder.len()
154 }
155}