exon_gff/
batch_reader.rs

1// Copyright 2023 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use arrow::record_batch::RecordBatch;
18
19use exon_common::ExonArrayBuilder;
20use futures::Stream;
21use tokio::io::AsyncBufRead;
22
23use super::error::Result;
24
25use super::{array_builder::GFFArrayBuilder, GFFConfig};
26
27/// Reads a GFF file into arrow record batches.
28pub struct BatchReader<R> {
29    /// The reader to read from.
30    reader: noodles::gff::AsyncReader<R>,
31
32    /// The configuration for this reader.
33    config: Arc<GFFConfig>,
34
35    /// A region to filter on.
36    region: Option<Arc<noodles::core::Region>>,
37}
38
39impl<R> BatchReader<R>
40where
41    R: AsyncBufRead + Unpin + Send,
42{
43    pub fn new(reader: R, config: Arc<GFFConfig>) -> Self {
44        Self {
45            reader: noodles::gff::AsyncReader::new(reader),
46            config,
47            region: None,
48        }
49    }
50
51    pub fn with_region(mut self, region: Arc<noodles::core::Region>) -> Self {
52        self.region = Some(region);
53        self
54    }
55
56    pub fn into_stream(self) -> impl Stream<Item = Result<RecordBatch>> {
57        futures::stream::unfold(self, |mut reader| async move {
58            match reader.read_batch().await {
59                Ok(Some(batch)) => Some((Ok(batch), reader)),
60                Ok(None) => None,
61                Err(e) => Some((Err(e), reader)),
62            }
63        })
64    }
65
66    async fn read_line(&mut self) -> Result<Option<noodles::gff::Line>> {
67        let mut line = noodles::gff::Line::default();
68
69        match self.reader.read_line(&mut line).await {
70            Ok(0) => Ok(None),
71            Ok(_) => Ok(Some(line)),
72            Err(e) => Err(e.into()),
73        }
74    }
75
76    fn filter(&self, record: &noodles::gff::Record) -> Result<bool> {
77        let chrom = record.reference_sequence_name();
78
79        match &self.region {
80            Some(region) => {
81                let region_name = std::str::from_utf8(region.name())?;
82
83                if chrom != region_name {
84                    return Ok(false);
85                }
86
87                let start = record.start()?;
88
89                if !region.interval().contains(start) {
90                    return Ok(false);
91                }
92
93                Ok(true)
94            }
95            None => Ok(true),
96        }
97    }
98
99    async fn read_batch(&mut self) -> Result<Option<RecordBatch>> {
100        let mut gff_array_builder = GFFArrayBuilder::new(
101            self.config.file_schema.clone(),
102            self.config.projection.clone(),
103        );
104
105        loop {
106            match self.read_line().await? {
107                None => break,
108                Some(line) => match line.as_record() {
109                    Some(Ok(record)) => {
110                        if !self.filter(&record)? {
111                            continue;
112                        }
113
114                        gff_array_builder.append(&record)?;
115                    }
116                    Some(Err(e)) => return Err(e.into()),
117                    None => {}
118                },
119            }
120        }
121
122        if gff_array_builder.is_empty() {
123            return Ok(None);
124        }
125
126        let schema = self.config.projected_schema()?;
127        let batch = gff_array_builder.try_into_record_batch(schema)?;
128
129        Ok(Some(batch))
130    }
131}