exon_fastq/
batch_reader.rs

1// Copyright 2023 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use exon_common::ExonArrayBuilder;
18
19use arrow::record_batch::RecordBatch;
20use noodles::fastq;
21
22use tokio::io::AsyncBufRead;
23
24use crate::error::ExonFastqResult;
25
26use super::{array_builder::FASTQArrayBuilder, FASTQConfig};
27
28pub struct BatchReader<R> {
29    /// The underlying FASTQ reader.
30    reader: noodles::fastq::AsyncReader<R>,
31    /// The FASTQ configuration.
32    config: Arc<FASTQConfig>,
33}
34
35impl<R> BatchReader<R>
36where
37    R: AsyncBufRead + Unpin + Send,
38{
39    pub fn new(inner: R, config: Arc<FASTQConfig>) -> Self {
40        Self {
41            reader: noodles::fastq::AsyncReader::new(inner),
42            config,
43        }
44    }
45
46    /// Stream built `RecordBatch`es from the underlying FASTQ reader.
47    pub fn into_stream(self) -> impl futures::Stream<Item = ExonFastqResult<RecordBatch>> {
48        futures::stream::try_unfold(self, |mut reader| async move {
49            match reader.read_batch(reader.config.batch_size).await? {
50                Some(batch) => Ok(Some((batch, reader))),
51                None => Ok(None),
52            }
53        })
54    }
55
56    async fn read_record(&mut self, record: &mut fastq::Record) -> ExonFastqResult<Option<()>> {
57        match self.reader.read_record(record).await? {
58            0 => Ok(None),
59            _ => Ok(Some(())),
60        }
61    }
62
63    async fn read_batch(&mut self, batch_size: usize) -> ExonFastqResult<Option<RecordBatch>> {
64        let mut array = FASTQArrayBuilder::with_capacity(batch_size, self.config.projection());
65        let mut record = fastq::Record::default(); // Allocate once
66
67        for _ in 0..batch_size {
68            match self.read_record(&mut record).await? {
69                Some(_) => array.append(&record)?,
70                None => break,
71            }
72        }
73
74        if array.len() == 0 {
75            return Ok(None);
76        }
77
78        let schema = self.config.projected_schema()?;
79        let batch = array.try_into_record_batch(schema)?;
80
81        Ok(Some(batch))
82    }
83}