1use std::io::{self, BufRead, BufReader, Read};
13
14use thiserror::Error;
15
16use crate::bbi::{BedEntry, Value};
17use crate::utils::streaming_linereader::StreamingLineReader;
18
19pub fn parse_bed<'a>(s: &'a str) -> Option<Result<(&'a str, BedEntry), BedValueError>> {
20 let mut split = s.trim_end().splitn(4, '\t');
21 let chrom = match split.next() {
22 Some(chrom) => chrom,
23 None => return None,
24 };
25 let res = (|| {
26 let s = split
27 .next()
28 .ok_or_else(|| BedValueError::InvalidInput(format!("Missing start: {:}", s)))?;
29 let start = s
30 .parse::<u32>()
31 .map_err(|_| BedValueError::InvalidInput(format!("Invalid start: {:}", s)))?;
32 let s = split
33 .next()
34 .ok_or_else(|| BedValueError::InvalidInput(format!("Missing end: {:}", s)))?;
35 let end = s
36 .parse::<u32>()
37 .map_err(|_| BedValueError::InvalidInput(format!("Invalid end: {:}", s)))?;
38 let rest = split.next().unwrap_or("").to_string();
39 Ok((start, end, rest))
40 })();
41 match res {
42 Err(e) => Some(Err(e)),
43 Ok((start, end, rest)) => Some(Ok((chrom, BedEntry { start, end, rest }))),
44 }
45}
46
47pub fn parse_bedgraph<'a>(s: &'a str) -> Option<Result<(&'a str, Value), BedValueError>> {
48 let mut split = s.trim_end().splitn(5, '\t');
49 let chrom = match split.next() {
50 Some(chrom) => chrom,
51 None => return None,
52 };
53 let res = (|| {
54 let s = split
55 .next()
56 .ok_or_else(|| BedValueError::InvalidInput(format!("Missing start: {:}", s)))?;
57 let start = s
58 .parse::<u32>()
59 .map_err(|_| BedValueError::InvalidInput(format!("Invalid start: {:}", s)))?;
60 let s = split
61 .next()
62 .ok_or_else(|| BedValueError::InvalidInput(format!("Missing end: {:}", s)))?;
63 let end = s
64 .parse::<u32>()
65 .map_err(|_| BedValueError::InvalidInput(format!("Invalid end: {:}", s)))?;
66 let s = split
67 .next()
68 .ok_or_else(|| BedValueError::InvalidInput(format!("Missing value: {:}", s)))?;
69 let value = s
70 .parse::<f32>()
71 .map_err(|_| BedValueError::InvalidInput(format!("Invalid value: {:}", s)))?;
72 Ok((start, end, value))
73 })();
74 match res {
75 Err(e) => Some(Err(e)),
76 Ok((start, end, value)) => Some(Ok((chrom, Value { start, end, value }))),
77 }
78}
79
/// A fallible stream of `(name, value)` pairs.
///
/// This is not an [`Iterator`]: the `&str` returned by `next` borrows from the
/// stream itself (its lifetime is elided from `&mut self`), so each item has
/// to be released before `next` can be called again.
pub trait StreamingBedValues {
    /// The value yielded alongside each name.
    type Value;

    /// Advances the stream by one item.
    ///
    /// Returns `Some(Ok((name, value)))` for the next item, `Some(Err(_))` if
    /// producing it failed, and `None` when there is nothing further to yield.
    fn next(&mut self) -> Option<Result<(&str, Self::Value), BedValueError>>;
}
88
/// Errors reported while reading or parsing streamed bed-like values.
#[derive(Error, Debug)]
pub enum BedValueError {
    /// The input was malformed. The payload is the complete message and is
    /// displayed as-is.
    #[error("{}", .0)]
    InvalidInput(String),
    /// An I/O operation failed. `#[from]` lets an [`io::Error`] be converted
    /// with `?` or `.into()`.
    #[error("Error occurred: {}", .0)]
    IoError(#[from] io::Error),
}
96
/// Signature of a line parser such as [`parse_bed`] or [`parse_bedgraph`]:
/// given one line, it yields a name slice borrowed from that line together
/// with a parsed `V`, or an error describing why the line was rejected.
pub type Parser<V> = for<'a> fn(&'a str) -> Option<Result<(&'a str, V), BedValueError>>;
98
/// A stream of values of type `V` obtained by reading lines from `B` and
/// running each one through `parse`.
pub struct BedFileStream<V, B> {
    /// The line source.
    pub bed: StreamingLineReader<B>,
    /// The function applied to every line read from `bed`.
    pub parse: Parser<V>,
}
104
105impl<R: Read> BedFileStream<BedEntry, BufReader<R>> {
106 pub fn from_bed_file(file: R) -> BedFileStream<BedEntry, BufReader<R>> {
107 BedFileStream {
108 bed: StreamingLineReader::new(BufReader::new(file)),
109 parse: parse_bed,
110 }
111 }
112}
113
114impl<R: Read> BedFileStream<Value, BufReader<R>> {
115 pub fn from_bedgraph_file(file: R) -> BedFileStream<Value, BufReader<R>> {
116 BedFileStream {
117 bed: StreamingLineReader::new(BufReader::new(file)),
118 parse: parse_bedgraph,
119 }
120 }
121}
122
123impl<V, B: BufRead> StreamingBedValues for BedFileStream<V, B> {
124 type Value = V;
125
126 fn next(&mut self) -> Option<Result<(&str, Self::Value), BedValueError>> {
127 let line = match self.bed.read()? {
128 Ok(line) => line.trim_end(),
129 Err(e) => return Some(Err(e.into())),
130 };
131 match (self.parse)(line) {
132 None => None,
133 Some(Ok(v)) => Some(Ok(v)),
134 Some(Err(e)) => Some(Err(e.into())),
135 }
136 }
137}
138
/// Adapts an iterator of `Result<(name, value), error>` items into a
/// [`StreamingBedValues`] stream.
pub struct BedIteratorStream<V, I> {
    /// The underlying iterator.
    pub(crate) iter: I,
    /// The most recently yielded item. The `String` backs the `&str` returned
    /// from `next`.
    pub(crate) curr: Option<(String, V)>,
}
144
145impl<
146 V: Clone,
147 E: Into<BedValueError>,
148 C: Into<String> + for<'a> PartialEq<&'a str>,
149 I: Iterator<Item = Result<(C, V), E>>,
150 > StreamingBedValues for BedIteratorStream<V, I>
151{
152 type Value = V;
153
154 fn next(&mut self) -> Option<Result<(&str, V), BedValueError>> {
155 use std::ops::Deref;
156 self.curr = match (self.curr.take(), self.iter.next()?) {
157 (_, Err(e)) => return Some(Err(e.into())),
158 (Some(c), Ok(v)) => {
159 if v.0 == &c.0 {
160 Some((c.0, v.1))
161 } else {
162 Some((v.0.into(), v.1))
163 }
164 }
165 (None, Ok(v)) => Some((v.0.into(), v.1)),
166 };
167 self.curr.as_ref().map(|v| Ok((v.0.deref(), v.1.clone())))
168 }
169}
170
/// Adapts an iterator of plain `(name, value)` items into a
/// [`StreamingBedValues`] stream.
pub struct BedInfallibleIteratorStream<V, I> {
    /// The underlying iterator.
    pub(crate) iter: I,
    /// The most recently yielded item. The `String` backs the `&str` returned
    /// from `next`.
    pub(crate) curr: Option<(String, V)>,
}
176
177impl<V: Clone, C: Into<String> + for<'a> PartialEq<&'a str>, I: Iterator<Item = (C, V)>>
178 StreamingBedValues for BedInfallibleIteratorStream<V, I>
179{
180 type Value = V;
181
182 fn next(&mut self) -> Option<Result<(&str, V), BedValueError>> {
183 use std::ops::Deref;
184 self.curr = match (self.curr.take(), self.iter.next()?) {
185 (Some(c), v) => {
186 if v.0 == &c.0 {
187 Some((c.0, v.1))
188 } else {
189 Some((v.0.into(), v.1))
190 }
191 }
192 (None, v) => Some((v.0.into(), v.1)),
193 };
194 self.curr.as_ref().map(|v| Ok((v.0.deref(), v.1.clone())))
195 }
196}