use arrow::array::RecordBatch;
use arrow::datatypes::Schema as ArrowSchema;
use std::sync::Arc;
use copybook_core::schema::FieldKind;
use crate::builders::{ColumnAccumulator, create_accumulator};
use crate::decode_direct::decode_record_to_columns;
use crate::options::ArrowOptions;
use crate::{ArrowError, Result};
pub struct RecordBatchBuilder {
arrow_schema: Arc<ArrowSchema>,
cobol_schema: copybook_core::Schema,
accumulators: Vec<Box<dyn ColumnAccumulator>>,
options: ArrowOptions,
current_count: usize,
}
impl RecordBatchBuilder {
#[inline]
pub fn new(
arrow_schema: Arc<ArrowSchema>,
cobol_schema: ©book_core::Schema,
options: &ArrowOptions,
) -> Result<Self> {
let accumulators = create_accumulators(cobol_schema, options)?;
Ok(Self {
arrow_schema,
cobol_schema: cobol_schema.clone(),
accumulators,
options: options.clone(),
current_count: 0,
})
}
#[inline]
pub fn append_record(&mut self, record: &[u8]) -> Result<Option<RecordBatch>> {
decode_record_to_columns(
&self.cobol_schema,
record,
&mut self.accumulators,
&self.options,
)?;
self.current_count += 1;
if self.current_count >= self.options.batch_size {
let batch = self.build_batch()?;
self.reset_accumulators()?;
return Ok(Some(batch));
}
Ok(None)
}
#[inline]
pub fn flush(&mut self) -> Result<Option<RecordBatch>> {
if self.current_count == 0 {
return Ok(None);
}
let batch = self.build_batch()?;
self.reset_accumulators()?;
Ok(Some(batch))
}
fn build_batch(&mut self) -> Result<RecordBatch> {
let columns: Vec<_> = self.accumulators.iter_mut().map(|a| a.finish()).collect();
RecordBatch::try_new(self.arrow_schema.clone(), columns)
.map_err(|e| ArrowError::ColumnBuild(format!("RecordBatch build failed: {e}")))
}
fn reset_accumulators(&mut self) -> Result<()> {
self.accumulators = create_accumulators(&self.cobol_schema, &self.options)?;
self.current_count = 0;
Ok(())
}
}
fn create_accumulators(
schema: ©book_core::Schema,
options: &ArrowOptions,
) -> Result<Vec<Box<dyn ColumnAccumulator>>> {
let mut accumulators = Vec::new();
for field in &schema.fields {
collect_accumulators(field, options, &mut accumulators)?;
}
Ok(accumulators)
}
fn collect_accumulators(
field: ©book_core::schema::Field,
options: &ArrowOptions,
output: &mut Vec<Box<dyn ColumnAccumulator>>,
) -> Result<()> {
if matches!(
field.kind,
FieldKind::Condition { .. } | FieldKind::Renames { .. }
) {
return Ok(());
}
if (field.name.starts_with("_filler_") || field.name.eq_ignore_ascii_case("FILLER"))
&& !options.emit_filler
{
return Ok(());
}
if matches!(field.kind, FieldKind::Group) {
if options.flatten_groups {
for child in &field.children {
collect_accumulators(child, options, output)?;
}
}
return Ok(());
}
let acc = create_accumulator(&field.kind, field.len, options)?;
output.push(acc);
Ok(())
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::schema_convert::cobol_schema_to_arrow;
use copybook_core::schema::{Field, FieldKind, Schema};
fn make_field(name: &str, kind: FieldKind, offset: u32, len: u32) -> Field {
let mut f = Field::with_kind(5, name.to_string(), kind);
f.path = name.to_string();
f.offset = offset;
f.len = len;
f
}
#[test]
fn test_batch_builder_flush() {
let schema = Schema::from_fields(vec![make_field(
"AMOUNT",
FieldKind::PackedDecimal {
digits: 5,
scale: 2,
signed: true,
},
0,
3,
)]);
let opts = ArrowOptions::default();
let arrow_schema = cobol_schema_to_arrow(&schema, &opts).unwrap();
let mut builder = RecordBatchBuilder::new(Arc::new(arrow_schema), &schema, &opts).unwrap();
let record = [0x12, 0x34, 0x5C];
let batch = builder.append_record(&record).unwrap();
assert!(batch.is_none());
let batch = builder.flush().unwrap();
assert!(batch.is_some());
let batch = batch.unwrap();
assert_eq!(batch.num_rows(), 1);
assert_eq!(batch.num_columns(), 1);
}
#[test]
fn test_batch_builder_auto_flush() {
let schema = Schema::from_fields(vec![make_field(
"NAME",
FieldKind::Alphanum { len: 4 },
0,
4,
)]);
let opts = ArrowOptions {
batch_size: 2, codepage: copybook_codec::Codepage::ASCII,
..ArrowOptions::default()
};
let arrow_schema = cobol_schema_to_arrow(&schema, &opts).unwrap();
let mut builder = RecordBatchBuilder::new(Arc::new(arrow_schema), &schema, &opts).unwrap();
let r1 = b"ABCD";
let r2 = b"EFGH";
assert!(builder.append_record(r1).unwrap().is_none());
let batch = builder.append_record(r2).unwrap();
assert!(batch.is_some());
assert_eq!(batch.unwrap().num_rows(), 2);
}
}