exon_fasta/
batch_reader.rs

1// Copyright 2023 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use arrow::record_batch::RecordBatch;
18use exon_common::ExonArrayBuilder;
19use futures::Stream;
20
21use tokio::io::AsyncBufRead;
22
23use crate::{error::ExonFASTAResult, ExonFASTAError};
24
25use super::{array_builder::FASTAArrayBuilder, config::FASTAConfig};
26
27/// A FASTA batch reader.
28pub struct BatchReader<R> {
29    /// The underlying FASTA reader.
30    reader: noodles::fasta::AsyncReader<R>,
31
32    /// The FASTA configuration.
33    config: Arc<FASTAConfig>,
34
35    /// Internal buffer for the sequence.
36    sequence_buffer: Vec<u8>,
37
38    /// Internal buffer for the definition.
39    buf: String,
40}
41
42impl<R> BatchReader<R>
43where
44    R: AsyncBufRead + Unpin + Send,
45{
46    /// Creates a FASTA batch reader.
47    pub fn new(inner: R, config: Arc<FASTAConfig>) -> Self {
48        let buffer_size = config.fasta_sequence_buffer_capacity;
49
50        Self {
51            reader: noodles::fasta::AsyncReader::new(inner),
52            config,
53            buf: String::with_capacity(50),
54            sequence_buffer: Vec::with_capacity(buffer_size),
55        }
56    }
57
58    async fn read_record(&mut self) -> ExonFASTAResult<Option<()>> {
59        self.buf.clear();
60        if self.reader.read_definition(&mut self.buf).await? == 0 {
61            return Ok(None);
62        }
63
64        self.sequence_buffer.clear();
65        if self.reader.read_sequence(&mut self.sequence_buffer).await? == 0 {
66            return Err(ExonFASTAError::ParseError("invalid sequence".to_string()));
67        }
68
69        Ok(Some(()))
70    }
71
72    async fn read_batch(&mut self) -> ExonFASTAResult<Option<RecordBatch>> {
73        let mut array_builder = FASTAArrayBuilder::create(
74            self.config.file_schema.clone(),
75            self.config.projection.clone(),
76            self.config.batch_size,
77            &self.config.sequence_data_type,
78        )?;
79
80        for _ in 0..self.config.batch_size {
81            self.buf.clear();
82            self.sequence_buffer.clear();
83
84            if (self.read_record().await?).is_some() {
85                array_builder.append(&self.buf, &self.sequence_buffer)?;
86            } else {
87                break;
88            }
89        }
90
91        if array_builder.is_empty() {
92            return Ok(None);
93        }
94
95        let schema = self.config.projected_schema()?;
96        let batch = array_builder.try_into_record_batch(schema)?;
97
98        Ok(Some(batch))
99    }
100
101    /// Converts the reader into a stream of batches.
102    pub fn into_stream(self) -> impl Stream<Item = ExonFASTAResult<RecordBatch>> {
103        futures::stream::unfold(self, |mut reader| async move {
104            match reader.read_batch().await {
105                Ok(Some(batch)) => Some((Ok(batch), reader)),
106                Ok(None) => None,
107                Err(e) => Some((Err(e), reader)),
108            }
109        })
110    }
111}