Skip to main content

exon_bigwig/
value_batch_reader.rs

1// Copyright 2024 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bigtools::{utils::reopen::ReopenableFile, BigWigRead, ChromInfo, Value};
18
19use arrow::{
20    array::RecordBatch,
21    error::{ArrowError, Result as ArrowResult},
22};
23use noodles::core::Region;
24
25mod array_builder;
26mod config;
27
28use self::{array_builder::ValueArrayBuilder, config::ValueReadType};
29
30pub use self::config::BigWigValueConfig;
31pub use self::config::SchemaBuilder;
32
33pub struct ValueRecordBatchReader {
34    config: Arc<BigWigValueConfig>,
35
36    scanner: ValueScanner,
37}
38
39impl ValueRecordBatchReader {
40    pub fn try_new(file_path: &str, config: Arc<BigWigValueConfig>) -> ArrowResult<Self> {
41        let reader = BigWigRead::open_file(file_path).map_err(|e| {
42            ArrowError::IoError(
43                "failed to open bigwig file".to_string(),
44                std::io::Error::new(std::io::ErrorKind::Other, e),
45            )
46        })?;
47
48        let interval = match config.read_type {
49            ValueReadType::Interval(ref interval) => Some(interval.clone()),
50            _ => None,
51        };
52
53        let scanner = ValueScanner::try_new(reader, interval)?;
54
55        Ok(Self { config, scanner })
56    }
57
58    pub fn read_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
59        let mut record_batch = ValueArrayBuilder::with_capacity(self.config.batch_size);
60
61        for val in self.scanner.by_ref().take(self.config.batch_size) {
62            let (chrom, record) = val?;
63            record_batch.append(&chrom, record);
64        }
65
66        if record_batch.is_empty() {
67            return Ok(None);
68        }
69
70        let arrays = record_batch.finish();
71
72        let batch = RecordBatch::try_new(self.config.file_schema.clone(), arrays)?;
73
74        match &self.config.projection {
75            Some(projection) => {
76                let projected_batch = batch.project(projection)?;
77
78                Ok(Some(projected_batch))
79            }
80            None => Ok(Some(batch)),
81        }
82    }
83
84    /// Return a stream of record_batches.
85    pub fn into_stream(self) -> impl futures::Stream<Item = ArrowResult<RecordBatch>> {
86        futures::stream::unfold(self, |mut reader| async move {
87            match reader.read_batch() {
88                Ok(Some(batch)) => Some((Ok(batch), reader)),
89                Ok(None) => None,
90                Err(e) => Some((Err(e), reader)),
91            }
92        })
93    }
94}
95
96struct ValueScanner {
97    reader: BigWigRead<ReopenableFile>,
98    chroms: Vec<ChromInfo>,
99    chrom_position: usize,
100    current_records: Vec<Value>,
101    within_batch_position: usize,
102}
103
104impl ValueScanner {
105    pub fn try_new(
106        mut reader: BigWigRead<ReopenableFile>,
107        interval: Option<Region>,
108    ) -> ArrowResult<Self> {
109        if let Some(region) = interval {
110            let name = std::str::from_utf8(region.name())?;
111
112            let chroms = reader
113                .chroms()
114                .iter()
115                .filter(|c| c.name == name)
116                .cloned()
117                .collect::<Vec<_>>();
118
119            let chrom = chroms.first().ok_or_else(|| {
120                ArrowError::InvalidArgumentError(format!("chromosome {} not found", name))
121            })?;
122
123            let start = region.interval().start().map_or(0, |s| s.get() as u32);
124            let end = region
125                .interval()
126                .end()
127                .map_or(chrom.length, |e| e.get() as u32);
128
129            let inter = reader.get_interval(name, start, end).map_err(|e| {
130                ArrowError::IoError(
131                    e.to_string(),
132                    std::io::Error::new(std::io::ErrorKind::Other, e),
133                )
134            })?;
135
136            let records = inter.collect::<Result<Vec<Value>, _>>().map_err(|e| {
137                ArrowError::IoError(
138                    "failed to read bigwig records".to_string(),
139                    std::io::Error::new(std::io::ErrorKind::Other, e),
140                )
141            })?;
142
143            Ok(Self {
144                reader,
145                chroms,
146                current_records: records,
147                chrom_position: 0,
148                within_batch_position: 0,
149            })
150        } else {
151            let chroms = reader.chroms().to_vec();
152
153            if chroms.is_empty() {
154                return Err(ArrowError::InvalidArgumentError(
155                    "no chromosomes found in bigwig file".to_string(),
156                ));
157            }
158
159            let c = &chroms[0];
160            let chrom_name = &c.name;
161            let start = 0;
162            let end = c.length;
163
164            let inter = reader.get_interval(chrom_name, start, end).map_err(|e| {
165                ArrowError::IoError(
166                    e.to_string(),
167                    std::io::Error::new(std::io::ErrorKind::Other, e),
168                )
169            })?;
170
171            let records = inter.collect::<Result<Vec<Value>, _>>().map_err(|e| {
172                ArrowError::IoError(
173                    "failed to read bigwig records".to_string(),
174                    std::io::Error::new(std::io::ErrorKind::Other, e),
175                )
176            })?;
177
178            Ok(Self {
179                reader,
180                current_records: records,
181                chroms,
182                chrom_position: 0,
183                within_batch_position: 0,
184            })
185        }
186    }
187
188    pub fn chrom_name(&self) -> &str {
189        self.chroms[self.chrom_position].name.as_str()
190    }
191}
192
193impl Iterator for ValueScanner {
194    type Item = ArrowResult<(String, Value)>;
195
196    fn next(&mut self) -> Option<Self::Item> {
197        if self.within_batch_position >= self.current_records.len() {
198            self.chrom_position += 1;
199            if self.chrom_position >= self.chroms.len() {
200                return None;
201            }
202
203            let c = &self.chroms[self.chrom_position];
204            let i = self
205                .reader
206                .get_interval(&c.name, 0, c.length)
207                .map_err(|e| {
208                    ArrowError::IoError(
209                        e.to_string(),
210                        std::io::Error::new(std::io::ErrorKind::Other, e),
211                    )
212                })
213                .ok()?
214                .collect::<Result<Vec<Value>, _>>()
215                .unwrap();
216
217            self.current_records = i;
218            self.within_batch_position = 0;
219        }
220
221        let record = self.current_records[self.within_batch_position];
222        self.within_batch_position += 1;
223
224        Some(Ok((self.chrom_name().to_string(), record)))
225    }
226}
227
#[cfg(test)]
mod tests {
    use std::{str::FromStr, sync::Arc};

    use noodles::core::Region;
    use object_store::local::LocalFileSystem;

    use crate::value_batch_reader::{config::BigWigValueConfig, ValueRecordBatchReader};

    /// Reads the first batch for region `1:1-1000` of the bundled `test.bw`
    /// file and checks its row and column counts.
    #[tokio::test]
    async fn test_read_bigwig() -> Result<(), Box<dyn std::error::Error>> {
        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
        let bigwig_path = format!(
            "{}/../../exon/exon-core/test-data/datasources/bigwig/test.bw",
            manifest_dir
        );

        let store = Arc::new(LocalFileSystem::default());
        let query = Region::from_str("1:1-1000")?;
        let config = Arc::new(BigWigValueConfig::new(store).with_some_interval(Some(query)));

        let mut reader = ValueRecordBatchReader::try_new(&bigwig_path, config)?;
        let first_batch = reader.read_batch()?.ok_or("no batch")?;

        assert_eq!(first_batch.num_rows(), 4);
        assert_eq!(first_batch.num_columns(), 4);

        Ok(())
    }
}