exon_fasta/
batch_reader.rs1use std::sync::Arc;
16
17use arrow::record_batch::RecordBatch;
18use exon_common::ExonArrayBuilder;
19use futures::Stream;
20
21use tokio::io::AsyncBufRead;
22
23use crate::{error::ExonFASTAResult, ExonFASTAError};
24
25use super::{array_builder::FASTAArrayBuilder, config::FASTAConfig};
26
27pub struct BatchReader<R> {
29 reader: noodles::fasta::AsyncReader<R>,
31
32 config: Arc<FASTAConfig>,
34
35 sequence_buffer: Vec<u8>,
37
38 buf: String,
40}
41
42impl<R> BatchReader<R>
43where
44 R: AsyncBufRead + Unpin + Send,
45{
46 pub fn new(inner: R, config: Arc<FASTAConfig>) -> Self {
48 let buffer_size = config.fasta_sequence_buffer_capacity;
49
50 Self {
51 reader: noodles::fasta::AsyncReader::new(inner),
52 config,
53 buf: String::with_capacity(50),
54 sequence_buffer: Vec::with_capacity(buffer_size),
55 }
56 }
57
58 async fn read_record(&mut self) -> ExonFASTAResult<Option<()>> {
59 self.buf.clear();
60 if self.reader.read_definition(&mut self.buf).await? == 0 {
61 return Ok(None);
62 }
63
64 self.sequence_buffer.clear();
65 if self.reader.read_sequence(&mut self.sequence_buffer).await? == 0 {
66 return Err(ExonFASTAError::ParseError("invalid sequence".to_string()));
67 }
68
69 Ok(Some(()))
70 }
71
72 async fn read_batch(&mut self) -> ExonFASTAResult<Option<RecordBatch>> {
73 let mut array_builder = FASTAArrayBuilder::create(
74 self.config.file_schema.clone(),
75 self.config.projection.clone(),
76 self.config.batch_size,
77 &self.config.sequence_data_type,
78 )?;
79
80 for _ in 0..self.config.batch_size {
81 self.buf.clear();
82 self.sequence_buffer.clear();
83
84 if (self.read_record().await?).is_some() {
85 array_builder.append(&self.buf, &self.sequence_buffer)?;
86 } else {
87 break;
88 }
89 }
90
91 if array_builder.is_empty() {
92 return Ok(None);
93 }
94
95 let schema = self.config.projected_schema()?;
96 let batch = array_builder.try_into_record_batch(schema)?;
97
98 Ok(Some(batch))
99 }
100
101 pub fn into_stream(self) -> impl Stream<Item = ExonFASTAResult<RecordBatch>> {
103 futures::stream::unfold(self, |mut reader| async move {
104 match reader.read_batch().await {
105 Ok(Some(batch)) => Some((Ok(batch), reader)),
106 Ok(None) => None,
107 Err(e) => Some((Err(e), reader)),
108 }
109 })
110 }
111}