exon_vcf/
async_batch_stream.rs

1// Copyright 2023 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use arrow::{
18    error::{ArrowError, Result as ArrowResult},
19    record_batch::RecordBatch,
20};
21use exon_common::ExonArrayBuilder;
22use futures::Stream;
23use tokio::io::AsyncBufRead;
24
25use super::{array_builder::LazyVCFArrayBuilder, config::VCFConfig};
26
27/// A VCF record batch reader.
28pub struct AsyncBatchStream<R>
29where
30    R: AsyncBufRead + Unpin,
31{
32    /// The underlying reader.
33    reader: noodles::vcf::AsyncReader<R>,
34
35    /// The VCF configuration.
36    config: Arc<VCFConfig>,
37
38    /// The VCF header.
39    header: Arc<noodles::vcf::Header>,
40}
41
42impl<R> AsyncBatchStream<R>
43where
44    R: AsyncBufRead + Unpin,
45{
46    /// Create a new VCF record batch reader.
47    pub fn new(
48        reader: noodles::vcf::AsyncReader<R>,
49        config: Arc<VCFConfig>,
50        header: Arc<noodles::vcf::Header>,
51    ) -> Self {
52        Self {
53            reader,
54            config,
55            header,
56        }
57    }
58
59    async fn read_record(&mut self) -> std::io::Result<Option<noodles::vcf::Record>> {
60        let mut record = noodles::vcf::Record::default();
61
62        match self.reader.read_record(&mut record).await {
63            Ok(0) => Ok(None),
64            Ok(_) => Ok(Some(record)),
65            Err(e) => Err(e),
66        }
67    }
68
69    /// Stream the record batches from the VCF file.
70    pub fn into_stream(self) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
71        futures::stream::unfold(self, |mut reader| async move {
72            match reader.read_batch().await {
73                Ok(Some(batch)) => Some((Ok(batch), reader)),
74                Ok(None) => None,
75                Err(e) => Some((Err(ArrowError::ExternalError(Box::new(e))), reader)),
76            }
77        })
78    }
79
80    async fn read_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
81        let mut array_builder = LazyVCFArrayBuilder::create(
82            self.config.file_schema.clone(),
83            self.config.batch_size,
84            self.config.projection.clone(),
85            self.header.clone(),
86        )?;
87
88        while array_builder.len() < self.config.batch_size {
89            let record = self.read_record().await?;
90
91            match record {
92                Some(record) => {
93                    array_builder.append(record)?;
94                }
95                None => {
96                    break;
97                }
98            }
99        }
100
101        if array_builder.is_empty() {
102            return Ok(None);
103        }
104
105        let schema = self.config.projected_schema();
106        let batch = array_builder.try_into_record_batch(schema)?;
107
108        Ok(Some(batch))
109    }
110}