use crate::arrow::{ArrowBatchBuilder, ArrowValue};
use arrow_schema::{ArrowError, SchemaRef};
use thiserror::Error;
/// Errors produced by the collect bridge in this module.
#[derive(Debug, Error)]
pub enum BridgeError {
    /// An [`ArrowError`] surfaced while working with Arrow data; any
    /// `ArrowError` converts into this variant through the generated `From`
    /// impl, so `?` can be used on Arrow results directly.
    #[error("arrow error: {0}")]
    Arrow(#[from] ArrowError),

    /// The requested batch size was not a positive number.
    #[error("batch_size must be positive")]
    InvalidBatchSize,
}
/// Plain-data snapshot of row and batch counters.
///
/// All fields are simple `usize` counters; the type is `Copy` and its
/// `Default` value has every counter set to zero.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BridgeMetrics {
    /// Configured batch size (rows per batch).
    pub batch_size: usize,
    /// Rows that have been pushed but are not yet part of a completed batch.
    pub pending_rows: usize,
    /// Number of completed batches.
    pub completed_batches: usize,
    /// Number of rows contained in the completed batches.
    pub completed_rows: usize,
    /// Combined row count: pending rows plus completed rows.
    pub total_rows: usize,
}
/// Accumulates rows through an [`ArrowBatchBuilder`] and keeps the resulting
/// `RecordBatch`es in memory until the caller collects them.
pub struct CollectArrowBridge {
    /// Builder that receives rows which are not yet part of a stored batch.
    builder: ArrowBatchBuilder,
    /// Pending-row threshold at which `push_row` triggers a flush; validated
    /// to be non-zero at construction time.
    batch_size: usize,
    /// Batches finished so far, in the order they were produced.
    batches: Vec<arrow_array::RecordBatch>,
}
impl CollectArrowBridge {
    /// Creates a bridge for `schema` that flushes once `batch_size` rows are
    /// pending.
    ///
    /// # Errors
    ///
    /// Returns [`BridgeError::InvalidBatchSize`] when `batch_size` is zero,
    /// and propagates any error from `ArrowBatchBuilder::new`.
    pub fn new(schema: SchemaRef, batch_size: usize) -> Result<Self, BridgeError> {
        // Validate before touching the builder so a bad size never constructs one.
        if batch_size == 0 {
            return Err(BridgeError::InvalidBatchSize);
        }
        let builder = ArrowBatchBuilder::new(schema)?;
        Ok(Self {
            builder,
            batch_size,
            batches: Vec::new(),
        })
    }

    /// Hands one row to the builder and flushes when the number of pending
    /// rows has reached the configured batch size.
    ///
    /// # Errors
    ///
    /// Propagates errors from the builder's `push_row` and from [`Self::flush`].
    pub fn push_row(&mut self, values: &[ArrowValue]) -> Result<(), BridgeError> {
        self.builder.push_row(values)?;
        let batch_is_full = self.builder.len() >= self.batch_size;
        if batch_is_full {
            self.flush()
        } else {
            Ok(())
        }
    }

    /// Finishes the builder's pending rows into a batch and stores it.
    ///
    /// Does nothing when the builder reports that it is empty, so no
    /// zero-row batch is stored by this method.
    ///
    /// # Errors
    ///
    /// Propagates errors from the builder's `finish`.
    pub fn flush(&mut self) -> Result<(), BridgeError> {
        if self.builder.is_empty() {
            return Ok(());
        }
        let finished = self.builder.finish()?;
        self.batches.push(finished);
        Ok(())
    }

    /// Flushes pending rows, then hands every stored batch to the caller,
    /// leaving the bridge with no stored batches.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`Self::flush`]; in that case the stored
    /// batches are left in place.
    pub fn take_batches(&mut self) -> Result<Vec<arrow_array::RecordBatch>, BridgeError> {
        self.flush()?;
        Ok(self.drain_completed())
    }

    /// Removes and returns the stored batches without flushing; rows still
    /// pending in the builder are left untouched.
    pub fn drain_completed(&mut self) -> Vec<arrow_array::RecordBatch> {
        std::mem::take(&mut self.batches)
    }

    /// Puts `batches` back in front of whatever batches are currently stored.
    ///
    /// An empty `batches` leaves the bridge unchanged.
    pub fn restore_completed(&mut self, batches: Vec<arrow_array::RecordBatch>) {
        if batches.is_empty() {
            return;
        }
        // Install the restored batches first, then re-attach the batches that
        // were stored in the meantime so they follow the restored ones.
        let stored_since = std::mem::replace(&mut self.batches, batches);
        self.batches.extend(stored_since);
    }

    /// Borrows the stored batches.
    pub fn batches(&self) -> &[arrow_array::RecordBatch] {
        self.batches.as_slice()
    }

    /// Number of rows the builder currently reports as its length.
    pub fn pending_rows(&self) -> usize {
        self.builder.len()
    }

    /// Number of stored batches.
    pub fn completed_batches(&self) -> usize {
        self.batches.len()
    }

    /// Sum of `num_rows()` over the stored batches.
    pub fn completed_rows(&self) -> usize {
        self.batches
            .iter()
            .map(arrow_array::RecordBatch::num_rows)
            .sum()
    }

    /// Configured batch size.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Captures the current counters in a [`BridgeMetrics`] value.
    pub fn metrics(&self) -> BridgeMetrics {
        let completed_rows = self.completed_rows();
        let pending_rows = self.pending_rows();
        BridgeMetrics {
            batch_size: self.batch_size(),
            pending_rows,
            completed_batches: self.completed_batches(),
            completed_rows,
            total_rows: pending_rows + completed_rows,
        }
    }

    /// Rows in stored batches plus rows still pending in the builder.
    pub fn total_rows(&self) -> usize {
        self.completed_rows() + self.pending_rows()
    }
}