use std::{str::FromStr, sync::Arc};
use arrow::record_batch::RecordBatch;
use futures::Stream;
use noodles::fasta::{
record::{Definition, Sequence},
Record,
};
use tokio::io::AsyncBufRead;
use crate::{error::ExonFastaResult, ExonFastaError};
use super::{array_builder::FASTAArrayBuilder, config::FASTAConfig};
/// Reads FASTA records from an async buffered source and groups them into
/// Arrow `RecordBatch`es according to a shared `FASTAConfig`.
pub struct BatchReader<R> {
/// Underlying noodles FASTA reader wrapping the input source.
reader: noodles::fasta::AsyncReader<R>,
/// Shared configuration; the reader uses its batch size, file schema,
/// optional projection and sequence buffer capacity.
config: Arc<FASTAConfig>,
/// Scratch buffer for sequence bytes; cleared and refilled for every record.
sequence_buffer: Vec<u8>,
/// Scratch buffer for the definition line; cleared and refilled for every record.
buf: String,
}
impl<R> BatchReader<R>
where
    R: AsyncBufRead + Unpin + Send,
{
    /// Creates a batch reader over `inner`.
    ///
    /// The sequence scratch buffer is pre-allocated with
    /// `config.fasta_sequence_buffer_capacity` bytes so that typical records do
    /// not trigger reallocation while reading.
    pub fn new(inner: R, config: Arc<FASTAConfig>) -> Self {
        // Initial capacity of the definition-line buffer; the `String` grows on
        // demand if a definition line is longer than this.
        const DEFINITION_BUFFER_CAPACITY: usize = 50;

        // Read the capacity before `config` is moved into the struct.
        let sequence_buffer = Vec::with_capacity(config.fasta_sequence_buffer_capacity);

        Self {
            reader: noodles::fasta::AsyncReader::new(inner),
            config,
            buf: String::with_capacity(DEFINITION_BUFFER_CAPACITY),
            sequence_buffer,
        }
    }

    /// Reads the next FASTA record from the underlying reader.
    ///
    /// Returns `Ok(None)` when reading the definition line yields zero bytes,
    /// which is treated as end of input.
    ///
    /// # Errors
    ///
    /// Propagates any error from reading the definition or sequence, and any
    /// error from parsing the definition line. Returns
    /// `ExonFastaError::ParseError` when a definition is followed by zero
    /// sequence bytes.
    async fn read_record(&mut self) -> ExonFastaResult<Option<noodles::fasta::Record>> {
        // Both scratch buffers are reused between records to avoid a fresh
        // allocation per record.
        self.buf.clear();
        if self.reader.read_definition(&mut self.buf).await? == 0 {
            return Ok(None);
        }
        let definition = Definition::from_str(&self.buf)?;

        self.sequence_buffer.clear();
        if self.reader.read_sequence(&mut self.sequence_buffer).await? == 0 {
            return Err(ExonFastaError::ParseError("invalid sequence".to_string()));
        }

        // The record must own its bytes because the scratch buffer is cleared
        // on the next call.
        let sequence = Sequence::from_iter(self.sequence_buffer.iter().copied());

        Ok(Some(Record::new(definition, sequence)))
    }

    /// Reads up to `config.batch_size` records and assembles them into a
    /// `RecordBatch` using `config.file_schema`.
    ///
    /// Returns `Ok(None)` when no record could be read. When
    /// `config.projection` is set, the returned batch contains only the
    /// projected columns.
    ///
    /// # Errors
    ///
    /// Propagates errors from reading records, appending to the array builder,
    /// building the batch and projecting it. Records already accumulated for
    /// the current batch are discarded when an error occurs.
    async fn read_batch(&mut self) -> ExonFastaResult<Option<RecordBatch>> {
        let mut record_batch =
            FASTAArrayBuilder::create(self.config.file_schema.clone(), self.config.batch_size)?;

        for _ in 0..self.config.batch_size {
            match self.read_record().await? {
                Some(record) => record_batch.append(&record)?,
                // Input exhausted: emit whatever has been collected so far.
                None => break,
            }
        }

        if record_batch.is_empty() {
            return Ok(None);
        }

        let batch = RecordBatch::try_new(self.config.file_schema.clone(), record_batch.finish())?;

        match &self.config.projection {
            Some(projection) => Ok(Some(batch.project(projection)?)),
            None => Ok(Some(batch)),
        }
    }

    /// Converts the reader into a stream of record batches.
    ///
    /// The stream yields one item per successfully read batch and ends when
    /// the input is exhausted. If reading a batch fails, the error is yielded
    /// once and the stream then terminates: after a failure the reader's
    /// position is no longer trustworthy, and polling it again could yield an
    /// unbounded sequence of errors.
    pub fn into_stream(self) -> impl Stream<Item = ExonFastaResult<RecordBatch>> {
        // The state is `Some(reader)` while the stream is live and `None` once
        // an error has been emitted.
        futures::stream::unfold(Some(self), |state| async move {
            let mut reader = match state {
                Some(reader) => reader,
                None => return None,
            };

            match reader.read_batch().await {
                Ok(Some(batch)) => Some((Ok(batch), Some(reader))),
                Ok(None) => None,
                // Drop the reader so the next poll ends the stream.
                Err(e) => Some((Err(e), None)),
            }
        })
    }
}