exon_fastq/
batch_reader.rs1use std::sync::Arc;
16
17use exon_common::ExonArrayBuilder;
18
19use arrow::record_batch::RecordBatch;
20use noodles::fastq;
21
22use tokio::io::AsyncBufRead;
23
24use crate::error::ExonFastqResult;
25
26use super::{array_builder::FASTQArrayBuilder, FASTQConfig};
27
28pub struct BatchReader<R> {
29 reader: noodles::fastq::AsyncReader<R>,
31 config: Arc<FASTQConfig>,
33}
34
35impl<R> BatchReader<R>
36where
37 R: AsyncBufRead + Unpin + Send,
38{
39 pub fn new(inner: R, config: Arc<FASTQConfig>) -> Self {
40 Self {
41 reader: noodles::fastq::AsyncReader::new(inner),
42 config,
43 }
44 }
45
46 pub fn into_stream(self) -> impl futures::Stream<Item = ExonFastqResult<RecordBatch>> {
48 futures::stream::try_unfold(self, |mut reader| async move {
49 match reader.read_batch(reader.config.batch_size).await? {
50 Some(batch) => Ok(Some((batch, reader))),
51 None => Ok(None),
52 }
53 })
54 }
55
56 async fn read_record(&mut self, record: &mut fastq::Record) -> ExonFastqResult<Option<()>> {
57 match self.reader.read_record(record).await? {
58 0 => Ok(None),
59 _ => Ok(Some(())),
60 }
61 }
62
63 async fn read_batch(&mut self, batch_size: usize) -> ExonFastqResult<Option<RecordBatch>> {
64 let mut array = FASTQArrayBuilder::with_capacity(batch_size, self.config.projection());
65 let mut record = fastq::Record::default(); for _ in 0..batch_size {
68 match self.read_record(&mut record).await? {
69 Some(_) => array.append(&record)?,
70 None => break,
71 }
72 }
73
74 if array.len() == 0 {
75 return Ok(None);
76 }
77
78 let schema = self.config.projected_schema()?;
79 let batch = array.try_into_record_batch(schema)?;
80
81 Ok(Some(batch))
82 }
83}