1use std::sync::Arc;
16
17use arrow::record_batch::RecordBatch;
18
19use exon_common::ExonArrayBuilder;
20use futures::Stream;
21use tokio::io::AsyncBufRead;
22
23use super::error::Result;
24
25use super::{array_builder::GFFArrayBuilder, GFFConfig};
26
/// Reads GFF lines from an asynchronous source and assembles the records
/// into Arrow `RecordBatch`es.
pub struct BatchReader<R> {
    /// The noodles asynchronous GFF reader wrapping the underlying input.
    reader: noodles::gff::AsyncReader<R>,

    /// Shared configuration; supplies the file schema, the projection and the
    /// projected schema used when building batches.
    config: Arc<GFFConfig>,

    /// Optional region filter. When set, only records whose reference
    /// sequence name equals the region name and whose start position lies in
    /// the region's interval are kept.
    region: Option<Arc<noodles::core::Region>>,
}
38
39impl<R> BatchReader<R>
40where
41 R: AsyncBufRead + Unpin + Send,
42{
43 pub fn new(reader: R, config: Arc<GFFConfig>) -> Self {
44 Self {
45 reader: noodles::gff::AsyncReader::new(reader),
46 config,
47 region: None,
48 }
49 }
50
51 pub fn with_region(mut self, region: Arc<noodles::core::Region>) -> Self {
52 self.region = Some(region);
53 self
54 }
55
56 pub fn into_stream(self) -> impl Stream<Item = Result<RecordBatch>> {
57 futures::stream::unfold(self, |mut reader| async move {
58 match reader.read_batch().await {
59 Ok(Some(batch)) => Some((Ok(batch), reader)),
60 Ok(None) => None,
61 Err(e) => Some((Err(e), reader)),
62 }
63 })
64 }
65
66 async fn read_line(&mut self) -> Result<Option<noodles::gff::Line>> {
67 let mut line = noodles::gff::Line::default();
68
69 match self.reader.read_line(&mut line).await {
70 Ok(0) => Ok(None),
71 Ok(_) => Ok(Some(line)),
72 Err(e) => Err(e.into()),
73 }
74 }
75
76 fn filter(&self, record: &noodles::gff::Record) -> Result<bool> {
77 let chrom = record.reference_sequence_name();
78
79 match &self.region {
80 Some(region) => {
81 let region_name = std::str::from_utf8(region.name())?;
82
83 if chrom != region_name {
84 return Ok(false);
85 }
86
87 let start = record.start()?;
88
89 if !region.interval().contains(start) {
90 return Ok(false);
91 }
92
93 Ok(true)
94 }
95 None => Ok(true),
96 }
97 }
98
99 async fn read_batch(&mut self) -> Result<Option<RecordBatch>> {
100 let mut gff_array_builder = GFFArrayBuilder::new(
101 self.config.file_schema.clone(),
102 self.config.projection.clone(),
103 );
104
105 loop {
106 match self.read_line().await? {
107 None => break,
108 Some(line) => match line.as_record() {
109 Some(Ok(record)) => {
110 if !self.filter(&record)? {
111 continue;
112 }
113
114 gff_array_builder.append(&record)?;
115 }
116 Some(Err(e)) => return Err(e.into()),
117 None => {}
118 },
119 }
120 }
121
122 if gff_array_builder.is_empty() {
123 return Ok(None);
124 }
125
126 let schema = self.config.projected_schema()?;
127 let batch = gff_array_builder.try_into_record_batch(schema)?;
128
129 Ok(Some(batch))
130 }
131}