exon_sam/
batch_reader.rs

1// Copyright 2023 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use arrow::{error::ArrowError, record_batch::RecordBatch};
18
19use exon_common::ExonArrayBuilder;
20use futures::Stream;
21use noodles::sam::alignment::RecordBuf;
22use tokio::io::{AsyncBufRead, AsyncRead};
23
24use super::{array_builder::SAMArrayBuilder, config::SAMConfig};
25
26/// A batch reader for SAM files.
27pub struct BatchReader<R>
28where
29    R: AsyncRead,
30{
31    /// The underlying SAM reader.
32    reader: noodles::sam::AsyncReader<R>,
33
34    /// The configuration for this reader.
35    config: Arc<SAMConfig>,
36
37    /// The header of the SAM file.
38    header: noodles::sam::Header,
39}
40
41impl<R> BatchReader<R>
42where
43    R: AsyncBufRead + Unpin + Send + AsyncRead,
44{
45    pub async fn new(inner: R, config: Arc<SAMConfig>) -> std::io::Result<Self> {
46        let mut reader = noodles::sam::AsyncReader::new(inner);
47
48        let header = reader.read_header().await?;
49
50        Ok(Self {
51            reader,
52            header,
53            config,
54        })
55    }
56
57    pub fn into_stream(self) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
58        futures::stream::unfold(self, |mut reader| async move {
59            match reader.read_batch().await {
60                Ok(Some(batch)) => Some((Ok(batch), reader)),
61                Ok(None) => None,
62                Err(e) => Some((Err(ArrowError::ExternalError(Box::new(e))), reader)),
63            }
64        })
65    }
66
67    async fn read_record(&mut self) -> std::io::Result<Option<RecordBuf>> {
68        let mut record = RecordBuf::default();
69
70        match self
71            .reader
72            .read_record_buf(&self.header, &mut record)
73            .await?
74        {
75            0 => Ok(None),
76            _ => Ok(Some(record)),
77        }
78    }
79
80    async fn read_batch(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
81        let mut array_builder = SAMArrayBuilder::create(self.header.clone(), self.config.clone());
82
83        for _ in 0..self.config.batch_size {
84            match self.read_record().await? {
85                Some(record) => array_builder.append(&record).map_err(|e| {
86                    std::io::Error::new(
87                        std::io::ErrorKind::InvalidData,
88                        format!("invalid record: {e}"),
89                    )
90                })?,
91                None => break,
92            }
93        }
94
95        if array_builder.is_empty() {
96            return Ok(None);
97        }
98
99        let schema = self.config.projected_schema()?;
100        let batch = array_builder.try_into_record_batch(schema)?;
101
102        Ok(Some(batch))
103    }
104}