1use std::sync::Arc;
16
17use arrow::{array::RecordBatch, error::ArrowError};
18use exon_common::ExonArrayBuilder;
19use tokio::io::AsyncBufRead;
20
21use crate::config::SDFConfig;
22
23pub struct BatchReader<R> {
24 reader: crate::io::Reader<R>,
25 n_records: usize,
26 config: Arc<crate::config::SDFConfig>,
27}
28
29impl<R> BatchReader<R>
30where
31 R: AsyncBufRead + Unpin,
32{
33 pub fn new(inner: R, config: Arc<SDFConfig>) -> Self {
34 BatchReader {
35 reader: crate::io::Reader::new(inner),
36 config,
37 n_records: 0,
38 }
39 }
40
41 pub async fn read_batch(&mut self) -> crate::Result<Option<RecordBatch>> {
42 if self.n_records >= self.config.limit.unwrap_or(usize::MAX) {
43 return Ok(None);
44 }
45
46 let file_schema = self.config.file_schema.clone();
47 let mut array_builder = crate::array_builder::SDFArrayBuilder::new(
48 file_schema.fields().clone(),
49 self.config.clone(),
50 )?;
51
52 for _ in 0..self.config.effective_batch_size() {
53 match self.reader.read_record().await? {
54 Some(record) => array_builder.append_value(record)?,
55 None => break,
56 }
57 }
58
59 self.n_records += array_builder.len();
60
61 if array_builder.is_empty() {
62 Ok(None)
63 } else {
64 let schema = self.config.projected_schema()?;
67 let rb = array_builder.try_into_record_batch(schema)?;
68
69 Ok(Some(rb))
70 }
71 }
72
73 pub fn into_stream(self) -> impl futures::Stream<Item = Result<RecordBatch, ArrowError>> {
74 futures::stream::unfold(self, |mut reader| async move {
75 match reader.read_batch().await {
76 Ok(Some(batch)) => Some((Ok(batch), reader)),
77 Ok(None) => None,
78 Err(e) => {
79 let arrow_error = e.into();
80 Some((Err(arrow_error), reader))
81 }
82 }
83 })
84 }
85}