copybook_arrow/
streaming.rs1use arrow::array::RecordBatch;
8use arrow::datatypes::Schema as ArrowSchema;
9use std::io::Read;
10use std::sync::Arc;
11
12use crate::batch_builder::RecordBatchBuilder;
13use crate::options::ArrowOptions;
14use crate::schema_convert::cobol_schema_to_arrow;
15use crate::{ArrowError, Result};
16
17#[inline]
26pub fn stream_to_batches<R: Read>(
27 mut reader: R,
28 cobol_schema: ©book_core::Schema,
29 options: &ArrowOptions,
30) -> Result<Vec<RecordBatch>> {
31 let arrow_schema = cobol_schema_to_arrow(cobol_schema, options)?;
32 let arrow_schema = Arc::new(arrow_schema);
33
34 let record_len = cobol_schema.lrecl_fixed.ok_or_else(|| {
35 ArrowError::SchemaConversion(
36 "Fixed record length (lrecl_fixed) required for streaming".to_string(),
37 )
38 })? as usize;
39
40 let mut builder = RecordBatchBuilder::new(Arc::clone(&arrow_schema), cobol_schema, options)?;
41 let mut batches = Vec::new();
42 let mut buf = vec![0u8; record_len];
43
44 loop {
45 match reader.read_exact(&mut buf) {
46 Ok(()) => {
47 if let Some(batch) = builder.append_record(&buf)? {
48 batches.push(batch);
49 }
50 }
51 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
52 Err(e) => return Err(ArrowError::Io(e)),
53 }
54 }
55
56 if let Some(batch) = builder.flush()? {
58 batches.push(batch);
59 }
60
61 Ok(batches)
62}
63
64#[inline]
73pub fn stream_to_batches_with_schema<R: Read>(
74 reader: R,
75 cobol_schema: ©book_core::Schema,
76 options: &ArrowOptions,
77) -> Result<(ArrowSchema, Vec<RecordBatch>)> {
78 let arrow_schema = cobol_schema_to_arrow(cobol_schema, options)?;
79 let batches = stream_to_batches(reader, cobol_schema, options)?;
80 Ok((arrow_schema, batches))
81}