use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use exon_common::ExonArrayBuilder;
use futures::Stream;
use tokio::io::AsyncBufRead;
use crate::{error::ExonFastaResult, ExonFastaError};
use super::{array_builder::FASTAArrayBuilder, config::FASTAConfig};
/// Reads FASTA records from an asynchronous source and groups them into
/// Arrow [`RecordBatch`]es.
///
/// The two buffers are owned by the reader so that they can be reused from
/// one record to the next instead of being reallocated per record.
pub struct BatchReader<R> {
    /// The underlying noodles FASTA reader wrapping the input.
    reader: noodles::fasta::AsyncReader<R>,

    /// Shared configuration (schema, projection, batch size, ...).
    config: Arc<FASTAConfig>,

    /// Scratch buffer holding the definition line of the current record.
    buf: String,

    /// Scratch buffer holding the sequence bytes of the current record.
    sequence_buffer: Vec<u8>,
}
impl<R> BatchReader<R>
where
    R: AsyncBufRead + Unpin + Send,
{
    /// Creates a batch reader over `inner`, driven by `config`.
    ///
    /// The sequence scratch buffer is pre-allocated with
    /// `config.fasta_sequence_buffer_capacity` bytes.
    pub fn new(inner: R, config: Arc<FASTAConfig>) -> Self {
        let buffer_size = config.fasta_sequence_buffer_capacity;

        Self {
            reader: noodles::fasta::AsyncReader::new(inner),
            config,
            // Initial capacity only; the string grows if a definition is longer.
            buf: String::with_capacity(50),
            sequence_buffer: Vec::with_capacity(buffer_size),
        }
    }

    /// Reads the next record into `self.buf` (definition) and
    /// `self.sequence_buffer` (sequence).
    ///
    /// Each buffer is cleared immediately before it is filled, so callers do
    /// not need to reset them between calls.
    ///
    /// Returns `Ok(Some(()))` when a record was read and `Ok(None)` when the
    /// definition read reports zero bytes, which is treated as end of input.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying reader, and returns
    /// [`ExonFastaError::ParseError`] when a definition was read but the
    /// sequence read reports zero bytes.
    async fn read_record(&mut self) -> ExonFastaResult<Option<()>> {
        self.buf.clear();
        if self.reader.read_definition(&mut self.buf).await? == 0 {
            return Ok(None);
        }

        self.sequence_buffer.clear();
        if self.reader.read_sequence(&mut self.sequence_buffer).await? == 0 {
            return Err(ExonFastaError::ParseError("invalid sequence".to_string()));
        }

        Ok(Some(()))
    }

    /// Reads up to `config.batch_size` records and assembles them into a
    /// single [`RecordBatch`].
    ///
    /// Returns `Ok(None)` when no record could be appended (for example, the
    /// input is exhausted).
    ///
    /// # Errors
    ///
    /// Propagates errors from creating the array builder, reading a record,
    /// appending a record, resolving the projected schema, or building the
    /// final batch. Records appended before the failure are discarded along
    /// with the builder.
    async fn read_batch(&mut self) -> ExonFastaResult<Option<RecordBatch>> {
        let mut array_builder = FASTAArrayBuilder::create(
            self.config.file_schema.clone(),
            self.config.projection.clone(),
            self.config.batch_size,
            &self.config.sequence_data_type,
        )?;

        for _ in 0..self.config.batch_size {
            // `read_record` clears both scratch buffers before filling them,
            // so no reset is needed here.
            if self.read_record().await?.is_none() {
                break;
            }

            array_builder.append(&self.buf, &self.sequence_buffer)?;
        }

        if array_builder.is_empty() {
            return Ok(None);
        }

        let schema = self.config.projected_schema()?;
        let batch = array_builder.try_into_record_batch(schema)?;

        Ok(Some(batch))
    }

    /// Consumes the reader and exposes it as a stream of record batches.
    ///
    /// The stream ends when `read_batch` reports that no more records are
    /// available. An error is yielded as an `Err` item; the stream is *not*
    /// terminated afterwards, so polling again attempts another read.
    /// Consumers that want to stop on the first error should do so themselves.
    pub fn into_stream(self) -> impl Stream<Item = ExonFastaResult<RecordBatch>> {
        futures::stream::unfold(self, |mut reader| async move {
            match reader.read_batch().await {
                Ok(Some(batch)) => Some((Ok(batch), reader)),
                Ok(None) => None,
                Err(e) => Some((Err(e), reader)),
            }
        })
    }
}