exon_sdf/
batch_reader.rs

1// Copyright 2024 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use arrow::{array::RecordBatch, error::ArrowError};
18use exon_common::ExonArrayBuilder;
19use tokio::io::AsyncBufRead;
20
21use crate::config::SDFConfig;
22
/// Reads SDF records from an underlying reader and groups them into Arrow
/// [`RecordBatch`]es according to an [`SDFConfig`].
pub struct BatchReader<R> {
    /// Record-level reader wrapping the underlying input.
    reader: crate::io::Reader<R>,
    /// Number of records emitted in batches so far; compared against the
    /// configured limit to decide when to stop reading.
    n_records: usize,
    /// Shared configuration supplying the limit, file schema, batch size and
    /// projected schema used while building batches.
    config: Arc<crate::config::SDFConfig>,
}
28
29impl<R> BatchReader<R>
30where
31    R: AsyncBufRead + Unpin,
32{
33    pub fn new(inner: R, config: Arc<SDFConfig>) -> Self {
34        BatchReader {
35            reader: crate::io::Reader::new(inner),
36            config,
37            n_records: 0,
38        }
39    }
40
41    pub async fn read_batch(&mut self) -> crate::Result<Option<RecordBatch>> {
42        if self.n_records >= self.config.limit.unwrap_or(usize::MAX) {
43            return Ok(None);
44        }
45
46        let file_schema = self.config.file_schema.clone();
47        let mut array_builder = crate::array_builder::SDFArrayBuilder::new(
48            file_schema.fields().clone(),
49            self.config.clone(),
50        )?;
51
52        for _ in 0..self.config.effective_batch_size() {
53            match self.reader.read_record().await? {
54                Some(record) => array_builder.append_value(record)?,
55                None => break,
56            }
57        }
58
59        self.n_records += array_builder.len();
60
61        if array_builder.is_empty() {
62            Ok(None)
63        } else {
64            // let finished_builder = array_builder.finish();
65
66            let schema = self.config.projected_schema()?;
67            let rb = array_builder.try_into_record_batch(schema)?;
68
69            Ok(Some(rb))
70        }
71    }
72
73    pub fn into_stream(self) -> impl futures::Stream<Item = Result<RecordBatch, ArrowError>> {
74        futures::stream::unfold(self, |mut reader| async move {
75            match reader.read_batch().await {
76                Ok(Some(batch)) => Some((Ok(batch), reader)),
77                Ok(None) => None,
78                Err(e) => {
79                    let arrow_error = e.into();
80                    Some((Err(arrow_error), reader))
81                }
82            }
83        })
84    }
85}