use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use exon_common::ExonArrayBuilder;
use futures::Stream;
use tokio::io::AsyncBufRead;
use super::error::Result;
use super::{array_builder::GFFArrayBuilder, GFFConfig};
pub struct BatchReader<R> {
reader: noodles::gff::AsyncReader<R>,
config: Arc<GFFConfig>,
region: Option<Arc<noodles::core::Region>>,
}
impl<R> BatchReader<R>
where
R: AsyncBufRead + Unpin + Send,
{
pub fn new(reader: R, config: Arc<GFFConfig>) -> Self {
Self {
reader: noodles::gff::AsyncReader::new(reader),
config,
region: None,
}
}
pub fn with_region(mut self, region: Arc<noodles::core::Region>) -> Self {
self.region = Some(region);
self
}
pub fn into_stream(self) -> impl Stream<Item = Result<RecordBatch>> {
futures::stream::unfold(self, |mut reader| async move {
match reader.read_batch().await {
Ok(Some(batch)) => Some((Ok(batch), reader)),
Ok(None) => None,
Err(e) => Some((Err(e), reader)),
}
})
}
async fn read_line(&mut self) -> Result<Option<noodles::gff::Line>> {
let mut line = noodles::gff::Line::default();
match self.reader.read_line(&mut line).await {
Ok(0) => Ok(None),
Ok(_) => Ok(Some(line)),
Err(e) => Err(e.into()),
}
}
fn filter(&self, record: &noodles::gff::Record) -> Result<bool> {
let chrom = record.reference_sequence_name();
match &self.region {
Some(region) => {
let region_name = std::str::from_utf8(region.name())?;
if chrom != region_name {
return Ok(false);
}
let start = record.start()?;
if !region.interval().contains(start) {
return Ok(false);
}
Ok(true)
}
None => Ok(true),
}
}
async fn read_batch(&mut self) -> Result<Option<RecordBatch>> {
let mut gff_array_builder = GFFArrayBuilder::new(
self.config.file_schema.clone(),
self.config.projection.clone(),
);
loop {
match self.read_line().await? {
None => break,
Some(line) => match line.as_record() {
Some(Ok(record)) => {
if !self.filter(&record)? {
continue;
}
gff_array_builder.append(&record)?;
}
Some(Err(e)) => return Err(e.into()),
None => {}
},
}
}
if gff_array_builder.is_empty() {
return Ok(None);
}
let schema = self.config.projected_schema()?;
let batch = gff_array_builder.try_into_record_batch(schema)?;
Ok(Some(batch))
}
}