exon_bigwig/
value_batch_reader.rs1use std::sync::Arc;
16
17use bigtools::{utils::reopen::ReopenableFile, BigWigRead, ChromInfo, Value};
18
19use arrow::{
20 array::RecordBatch,
21 error::{ArrowError, Result as ArrowResult},
22};
23use noodles::core::Region;
24
25mod array_builder;
26mod config;
27
28use self::{array_builder::ValueArrayBuilder, config::ValueReadType};
29
30pub use self::config::BigWigValueConfig;
31pub use self::config::SchemaBuilder;
32
33pub struct ValueRecordBatchReader {
34 config: Arc<BigWigValueConfig>,
35
36 scanner: ValueScanner,
37}
38
39impl ValueRecordBatchReader {
40 pub fn try_new(file_path: &str, config: Arc<BigWigValueConfig>) -> ArrowResult<Self> {
41 let reader = BigWigRead::open_file(file_path).map_err(|e| {
42 ArrowError::IoError(
43 "failed to open bigwig file".to_string(),
44 std::io::Error::new(std::io::ErrorKind::Other, e),
45 )
46 })?;
47
48 let interval = match config.read_type {
49 ValueReadType::Interval(ref interval) => Some(interval.clone()),
50 _ => None,
51 };
52
53 let scanner = ValueScanner::try_new(reader, interval)?;
54
55 Ok(Self { config, scanner })
56 }
57
58 pub fn read_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
59 let mut record_batch = ValueArrayBuilder::with_capacity(self.config.batch_size);
60
61 for val in self.scanner.by_ref().take(self.config.batch_size) {
62 let (chrom, record) = val?;
63 record_batch.append(&chrom, record);
64 }
65
66 if record_batch.is_empty() {
67 return Ok(None);
68 }
69
70 let arrays = record_batch.finish();
71
72 let batch = RecordBatch::try_new(self.config.file_schema.clone(), arrays)?;
73
74 match &self.config.projection {
75 Some(projection) => {
76 let projected_batch = batch.project(projection)?;
77
78 Ok(Some(projected_batch))
79 }
80 None => Ok(Some(batch)),
81 }
82 }
83
84 pub fn into_stream(self) -> impl futures::Stream<Item = ArrowResult<RecordBatch>> {
86 futures::stream::unfold(self, |mut reader| async move {
87 match reader.read_batch() {
88 Ok(Some(batch)) => Some((Ok(batch), reader)),
89 Ok(None) => None,
90 Err(e) => Some((Err(e), reader)),
91 }
92 })
93 }
94}
95
96struct ValueScanner {
97 reader: BigWigRead<ReopenableFile>,
98 chroms: Vec<ChromInfo>,
99 chrom_position: usize,
100 current_records: Vec<Value>,
101 within_batch_position: usize,
102}
103
104impl ValueScanner {
105 pub fn try_new(
106 mut reader: BigWigRead<ReopenableFile>,
107 interval: Option<Region>,
108 ) -> ArrowResult<Self> {
109 if let Some(region) = interval {
110 let name = std::str::from_utf8(region.name())?;
111
112 let chroms = reader
113 .chroms()
114 .iter()
115 .filter(|c| c.name == name)
116 .cloned()
117 .collect::<Vec<_>>();
118
119 let chrom = chroms.first().ok_or_else(|| {
120 ArrowError::InvalidArgumentError(format!("chromosome {} not found", name))
121 })?;
122
123 let start = region.interval().start().map_or(0, |s| s.get() as u32);
124 let end = region
125 .interval()
126 .end()
127 .map_or(chrom.length, |e| e.get() as u32);
128
129 let inter = reader.get_interval(name, start, end).map_err(|e| {
130 ArrowError::IoError(
131 e.to_string(),
132 std::io::Error::new(std::io::ErrorKind::Other, e),
133 )
134 })?;
135
136 let records = inter.collect::<Result<Vec<Value>, _>>().map_err(|e| {
137 ArrowError::IoError(
138 "failed to read bigwig records".to_string(),
139 std::io::Error::new(std::io::ErrorKind::Other, e),
140 )
141 })?;
142
143 Ok(Self {
144 reader,
145 chroms,
146 current_records: records,
147 chrom_position: 0,
148 within_batch_position: 0,
149 })
150 } else {
151 let chroms = reader.chroms().to_vec();
152
153 if chroms.is_empty() {
154 return Err(ArrowError::InvalidArgumentError(
155 "no chromosomes found in bigwig file".to_string(),
156 ));
157 }
158
159 let c = &chroms[0];
160 let chrom_name = &c.name;
161 let start = 0;
162 let end = c.length;
163
164 let inter = reader.get_interval(chrom_name, start, end).map_err(|e| {
165 ArrowError::IoError(
166 e.to_string(),
167 std::io::Error::new(std::io::ErrorKind::Other, e),
168 )
169 })?;
170
171 let records = inter.collect::<Result<Vec<Value>, _>>().map_err(|e| {
172 ArrowError::IoError(
173 "failed to read bigwig records".to_string(),
174 std::io::Error::new(std::io::ErrorKind::Other, e),
175 )
176 })?;
177
178 Ok(Self {
179 reader,
180 current_records: records,
181 chroms,
182 chrom_position: 0,
183 within_batch_position: 0,
184 })
185 }
186 }
187
188 pub fn chrom_name(&self) -> &str {
189 self.chroms[self.chrom_position].name.as_str()
190 }
191}
192
193impl Iterator for ValueScanner {
194 type Item = ArrowResult<(String, Value)>;
195
196 fn next(&mut self) -> Option<Self::Item> {
197 if self.within_batch_position >= self.current_records.len() {
198 self.chrom_position += 1;
199 if self.chrom_position >= self.chroms.len() {
200 return None;
201 }
202
203 let c = &self.chroms[self.chrom_position];
204 let i = self
205 .reader
206 .get_interval(&c.name, 0, c.length)
207 .map_err(|e| {
208 ArrowError::IoError(
209 e.to_string(),
210 std::io::Error::new(std::io::ErrorKind::Other, e),
211 )
212 })
213 .ok()?
214 .collect::<Result<Vec<Value>, _>>()
215 .unwrap();
216
217 self.current_records = i;
218 self.within_batch_position = 0;
219 }
220
221 let record = self.current_records[self.within_batch_position];
222 self.within_batch_position += 1;
223
224 Some(Ok((self.chrom_name().to_string(), record)))
225 }
226}
227
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use noodles::core::Region;
    use object_store::local::LocalFileSystem;

    use super::{BigWigValueConfig, ValueRecordBatchReader};

    /// Reads the `1:1-1000` region of the bundled test file and checks the
    /// dimensions of the first batch.
    #[tokio::test]
    async fn test_read_bigwig() -> Result<(), Box<dyn std::error::Error>> {
        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
        let bigwig_path = format!(
            "{}/../../exon/exon-core/test-data/datasources/bigwig/test.bw",
            manifest_dir
        );

        let store = Arc::new(LocalFileSystem::default());
        let region: Region = "1:1-1000".parse()?;
        let config = Arc::new(BigWigValueConfig::new(store).with_some_interval(Some(region)));

        let mut reader = ValueRecordBatchReader::try_new(&bigwig_path, config)?;
        let first_batch = reader.read_batch()?.ok_or("no batch")?;

        assert_eq!(first_batch.num_rows(), 4);
        assert_eq!(first_batch.num_columns(), 4);

        Ok(())
    }
}