1use std::sync::Arc;
16
17use arrow::{error::ArrowError, record_batch::RecordBatch};
18
19use exon_common::ExonArrayBuilder;
20use futures::Stream;
21use noodles::sam::alignment::RecordBuf;
22use tokio::io::{AsyncBufRead, AsyncRead};
23
24use super::{array_builder::SAMArrayBuilder, config::SAMConfig};
25
26pub struct BatchReader<R>
28where
29 R: AsyncRead,
30{
31 reader: noodles::sam::AsyncReader<R>,
33
34 config: Arc<SAMConfig>,
36
37 header: noodles::sam::Header,
39}
40
41impl<R> BatchReader<R>
42where
43 R: AsyncBufRead + Unpin + Send + AsyncRead,
44{
45 pub async fn new(inner: R, config: Arc<SAMConfig>) -> std::io::Result<Self> {
46 let mut reader = noodles::sam::AsyncReader::new(inner);
47
48 let header = reader.read_header().await?;
49
50 Ok(Self {
51 reader,
52 header,
53 config,
54 })
55 }
56
57 pub fn into_stream(self) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
58 futures::stream::unfold(self, |mut reader| async move {
59 match reader.read_batch().await {
60 Ok(Some(batch)) => Some((Ok(batch), reader)),
61 Ok(None) => None,
62 Err(e) => Some((Err(ArrowError::ExternalError(Box::new(e))), reader)),
63 }
64 })
65 }
66
67 async fn read_record(&mut self) -> std::io::Result<Option<RecordBuf>> {
68 let mut record = RecordBuf::default();
69
70 match self
71 .reader
72 .read_record_buf(&self.header, &mut record)
73 .await?
74 {
75 0 => Ok(None),
76 _ => Ok(Some(record)),
77 }
78 }
79
80 async fn read_batch(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
81 let mut array_builder = SAMArrayBuilder::create(self.header.clone(), self.config.clone());
82
83 for _ in 0..self.config.batch_size {
84 match self.read_record().await? {
85 Some(record) => array_builder.append(&record).map_err(|e| {
86 std::io::Error::new(
87 std::io::ErrorKind::InvalidData,
88 format!("invalid record: {e}"),
89 )
90 })?,
91 None => break,
92 }
93 }
94
95 if array_builder.is_empty() {
96 return Ok(None);
97 }
98
99 let schema = self.config.projected_schema()?;
100 let batch = array_builder.try_into_record_batch(schema)?;
101
102 Ok(Some(batch))
103 }
104}