// exon_cram/async_batch_stream.rs

// Copyright 2024 WHERE TRUE Technologies.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::sync::Arc;
16
17use arrow::{
18    error::{ArrowError, Result as ArrowResult},
19    record_batch::RecordBatch,
20};
21use exon_common::{ExonArrayBuilder, DEFAULT_BATCH_SIZE};
22use futures::Stream;
23use noodles::cram::AsyncReader;
24use object_store::ObjectStore;
25use tokio::io::AsyncBufRead;
26
27use crate::{array_builder::CRAMArrayBuilder, CRAMConfig, ObjectStoreFastaRepositoryAdapter};
28
29pub struct AsyncBatchStream<R>
30where
31    R: AsyncBufRead + Unpin,
32{
33    reader: AsyncReader<R>,
34
35    /// The header.
36    header: noodles::sam::Header,
37
38    /// The CRAM config.
39    config: Arc<CRAMConfig>,
40
41    /// The reference repository
42    reference_sequence_repository: noodles::fasta::Repository,
43}
44
45impl<R> AsyncBatchStream<R>
46where
47    R: AsyncBufRead + Unpin,
48{
49    pub async fn try_new(
50        reader: AsyncReader<R>,
51        object_store: Arc<dyn ObjectStore>,
52        header: noodles::sam::Header,
53        config: Arc<CRAMConfig>,
54    ) -> ArrowResult<Self> {
55        let reference_sequence_repository = match &config.fasta_reference {
56            Some(reference) => {
57                let object_store_adapter = ObjectStoreFastaRepositoryAdapter::try_new(
58                    object_store.clone(),
59                    reference.clone(),
60                )
61                .await?;
62
63                noodles::fasta::Repository::new(object_store_adapter)
64            }
65            None => noodles::fasta::Repository::default(),
66        };
67
68        Ok(Self {
69            reader,
70            header,
71            config,
72            reference_sequence_repository,
73        })
74    }
75
76    async fn read_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
77        let mut array_builder =
78            CRAMArrayBuilder::new(self.header.clone(), DEFAULT_BATCH_SIZE, &self.config);
79
80        if let Some(container) = self.reader.read_data_container().await? {
81            let records = container
82                .slices()
83                .iter()
84                .map(|slice| {
85                    let compression_header = container.compression_header();
86
87                    slice.records(compression_header).and_then(|mut records| {
88                        slice.resolve_records(
89                            &self.reference_sequence_repository,
90                            &self.header,
91                            compression_header,
92                            &mut records,
93                        )?;
94
95                        Ok(records)
96                    })
97                })
98                .collect::<Result<Vec<_>, _>>()?
99                .into_iter()
100                .flatten();
101
102            // iterate through the records and append them to the array builder
103            for record in records {
104                array_builder.append(record)?;
105            }
106        } else {
107            return Ok(None);
108        }
109
110        let schema = self.config.projected_schema();
111        let batch = array_builder.try_into_record_batch(schema)?;
112
113        Ok(Some(batch))
114    }
115
116    pub fn into_stream(self) -> impl Stream<Item = ArrowResult<RecordBatch>> {
117        futures::stream::unfold(self, |mut reader| async move {
118            match reader.read_batch().await {
119                Ok(Some(batch)) => Some((Ok(batch), reader)),
120                Ok(None) => None,
121                Err(e) => Some((Err(ArrowError::ExternalError(Box::new(e))), reader)),
122            }
123        })
124    }
125}