// exon_cram/async_batch_stream.rs

use std::sync::Arc;

use arrow::{
    error::{ArrowError, Result as ArrowResult},
    record_batch::RecordBatch,
};
use exon_common::{ExonArrayBuilder, DEFAULT_BATCH_SIZE};
use futures::Stream;
use noodles::cram::AsyncReader;
use object_store::ObjectStore;
use tokio::io::AsyncBufRead;

use crate::{array_builder::CRAMArrayBuilder, CRAMConfig, ObjectStoreFastaRepositoryAdapter};
/// An async stream adapter that decodes CRAM data containers into Arrow
/// [`RecordBatch`]es.
pub struct AsyncBatchStream<R>
where
    R: AsyncBufRead + Unpin,
{
    /// The underlying async CRAM reader.
    reader: AsyncReader<R>,

    /// The SAM header for the CRAM file; cloned into each array builder and
    /// used when resolving slice records.
    header: noodles::sam::Header,

    /// Shared configuration; supplies the optional FASTA reference path
    /// (`fasta_reference`) and the projected output schema.
    config: Arc<CRAMConfig>,

    /// Repository of reference sequences used to resolve CRAM records.
    /// Backed by an object store when a FASTA reference is configured,
    /// otherwise the default (empty) repository.
    reference_sequence_repository: noodles::fasta::Repository,
}
44
45impl<R> AsyncBatchStream<R>
46where
47 R: AsyncBufRead + Unpin,
48{
49 pub async fn try_new(
50 reader: AsyncReader<R>,
51 object_store: Arc<dyn ObjectStore>,
52 header: noodles::sam::Header,
53 config: Arc<CRAMConfig>,
54 ) -> ArrowResult<Self> {
55 let reference_sequence_repository = match &config.fasta_reference {
56 Some(reference) => {
57 let object_store_adapter = ObjectStoreFastaRepositoryAdapter::try_new(
58 object_store.clone(),
59 reference.clone(),
60 )
61 .await?;
62
63 noodles::fasta::Repository::new(object_store_adapter)
64 }
65 None => noodles::fasta::Repository::default(),
66 };
67
68 Ok(Self {
69 reader,
70 header,
71 config,
72 reference_sequence_repository,
73 })
74 }
75
76 async fn read_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
77 let mut array_builder =
78 CRAMArrayBuilder::new(self.header.clone(), DEFAULT_BATCH_SIZE, &self.config);
79
80 if let Some(container) = self.reader.read_data_container().await? {
81 let records = container
82 .slices()
83 .iter()
84 .map(|slice| {
85 let compression_header = container.compression_header();
86
87 slice.records(compression_header).and_then(|mut records| {
88 slice.resolve_records(
89 &self.reference_sequence_repository,
90 &self.header,
91 compression_header,
92 &mut records,
93 )?;
94
95 Ok(records)
96 })
97 })
98 .collect::<Result<Vec<_>, _>>()?
99 .into_iter()
100 .flatten();
101
102 for record in records {
104 array_builder.append(record)?;
105 }
106 } else {
107 return Ok(None);
108 }
109
110 let schema = self.config.projected_schema();
111 let batch = array_builder.try_into_record_batch(schema)?;
112
113 Ok(Some(batch))
114 }
115
116 pub fn into_stream(self) -> impl Stream<Item = ArrowResult<RecordBatch>> {
117 futures::stream::unfold(self, |mut reader| async move {
118 match reader.read_batch().await {
119 Ok(Some(batch)) => Some((Ok(batch), reader)),
120 Ok(None) => None,
121 Err(e) => Some((Err(ArrowError::ExternalError(Box::new(e))), reader)),
122 }
123 })
124 }
125}