Skip to main content

copybook_arrow/
streaming.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Streaming record reader that produces Arrow `RecordBatch` objects
3//!
4//! Reads fixed-length records from a byte reader and feeds them through
5//! the `RecordBatchBuilder` to produce batches.
6
7use arrow::array::RecordBatch;
8use arrow::datatypes::Schema as ArrowSchema;
9use std::io::Read;
10use std::sync::Arc;
11
12use crate::batch_builder::RecordBatchBuilder;
13use crate::options::ArrowOptions;
14use crate::schema_convert::cobol_schema_to_arrow;
15use crate::{ArrowError, Result};
16
17/// Stream binary records into Arrow `RecordBatch` objects.
18///
19/// Reads fixed-length records from `reader` (using the schema's `lrecl_fixed`)
20/// and returns a vector of `RecordBatch` objects.
21///
22/// # Errors
23///
24/// Returns an error if schema conversion, I/O, or decoding fails.
25#[inline]
26pub fn stream_to_batches<R: Read>(
27    mut reader: R,
28    cobol_schema: &copybook_core::Schema,
29    options: &ArrowOptions,
30) -> Result<Vec<RecordBatch>> {
31    let arrow_schema = cobol_schema_to_arrow(cobol_schema, options)?;
32    let arrow_schema = Arc::new(arrow_schema);
33
34    let record_len = cobol_schema.lrecl_fixed.ok_or_else(|| {
35        ArrowError::SchemaConversion(
36            "Fixed record length (lrecl_fixed) required for streaming".to_string(),
37        )
38    })? as usize;
39
40    let mut builder = RecordBatchBuilder::new(Arc::clone(&arrow_schema), cobol_schema, options)?;
41    let mut batches = Vec::new();
42    let mut buf = vec![0u8; record_len];
43
44    loop {
45        match reader.read_exact(&mut buf) {
46            Ok(()) => {
47                if let Some(batch) = builder.append_record(&buf)? {
48                    batches.push(batch);
49                }
50            }
51            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
52            Err(e) => return Err(ArrowError::Io(e)),
53        }
54    }
55
56    // Flush remaining records
57    if let Some(batch) = builder.flush()? {
58        batches.push(batch);
59    }
60
61    Ok(batches)
62}
63
64/// Stream binary records into Arrow `RecordBatch` objects, returning the Arrow schema as well.
65///
66/// Convenience wrapper around [`stream_to_batches`] that also returns the generated
67/// Arrow schema for use in downstream writers.
68///
69/// # Errors
70///
71/// Returns an error if schema conversion, I/O, or decoding fails.
72#[inline]
73pub fn stream_to_batches_with_schema<R: Read>(
74    reader: R,
75    cobol_schema: &copybook_core::Schema,
76    options: &ArrowOptions,
77) -> Result<(ArrowSchema, Vec<RecordBatch>)> {
78    let arrow_schema = cobol_schema_to_arrow(cobol_schema, options)?;
79    let batches = stream_to_batches(reader, cobol_schema, options)?;
80    Ok((arrow_schema, batches))
81}