exon_vcf/
async_batch_stream.rs1use std::sync::Arc;
16
17use arrow::{
18 error::{ArrowError, Result as ArrowResult},
19 record_batch::RecordBatch,
20};
21use exon_common::ExonArrayBuilder;
22use futures::Stream;
23use tokio::io::AsyncBufRead;
24
25use super::{array_builder::LazyVCFArrayBuilder, config::VCFConfig};
26
27pub struct AsyncBatchStream<R>
29where
30 R: AsyncBufRead + Unpin,
31{
32 reader: noodles::vcf::AsyncReader<R>,
34
35 config: Arc<VCFConfig>,
37
38 header: Arc<noodles::vcf::Header>,
40}
41
42impl<R> AsyncBatchStream<R>
43where
44 R: AsyncBufRead + Unpin,
45{
46 pub fn new(
48 reader: noodles::vcf::AsyncReader<R>,
49 config: Arc<VCFConfig>,
50 header: Arc<noodles::vcf::Header>,
51 ) -> Self {
52 Self {
53 reader,
54 config,
55 header,
56 }
57 }
58
59 async fn read_record(&mut self) -> std::io::Result<Option<noodles::vcf::Record>> {
60 let mut record = noodles::vcf::Record::default();
61
62 match self.reader.read_record(&mut record).await {
63 Ok(0) => Ok(None),
64 Ok(_) => Ok(Some(record)),
65 Err(e) => Err(e),
66 }
67 }
68
69 pub fn into_stream(self) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
71 futures::stream::unfold(self, |mut reader| async move {
72 match reader.read_batch().await {
73 Ok(Some(batch)) => Some((Ok(batch), reader)),
74 Ok(None) => None,
75 Err(e) => Some((Err(ArrowError::ExternalError(Box::new(e))), reader)),
76 }
77 })
78 }
79
80 async fn read_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
81 let mut array_builder = LazyVCFArrayBuilder::create(
82 self.config.file_schema.clone(),
83 self.config.batch_size,
84 self.config.projection.clone(),
85 self.header.clone(),
86 )?;
87
88 while array_builder.len() < self.config.batch_size {
89 let record = self.read_record().await?;
90
91 match record {
92 Some(record) => {
93 array_builder.append(record)?;
94 }
95 None => {
96 break;
97 }
98 }
99 }
100
101 if array_builder.is_empty() {
102 return Ok(None);
103 }
104
105 let schema = self.config.projected_schema();
106 let batch = array_builder.try_into_record_batch(schema)?;
107
108 Ok(Some(batch))
109 }
110}