use std::vec;
use std::iter::Peekable;
use crate::{
RowStream, Headers, RowResult,
mock::MockStream,
group::GroupCriteria,
error,
};
/// Row stream adapter that splits its input into runs of consecutive rows
/// sharing the same grouping key and passes each run through the closure `f`.
///
/// The rows produced by `f` for every run are what the adapter yields once it
/// is turned into an iterator.
pub struct AdjacentGroup<I, F, G> {
/// Upstream row stream whose rows are grouped.
iter: I,
/// Closure applied to each group; receives the group as a `MockStream`.
f: F,
/// Headers reported by this adapter, obtained in `new` by running `f` on a
/// probe stream built from the input headers only.
headers: Headers,
/// Headers of the upstream stream, cloned in `new`.
old_headers: Headers,
/// Criteria used to compute the grouping key of each row.
group_by: G,
}
impl<I, F, R, G> AdjacentGroup<I, F, G>
where
    I: RowStream,
    F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
    R: RowStream,
{
    /// Creates an adapter that groups adjacent rows of `iter` according to
    /// `grouping` and transforms every group with `f`.
    ///
    /// To learn the headers of the resulting stream, `f` is invoked once here
    /// on a probe stream built from a single row: the input headers.
    ///
    /// # Panics
    ///
    /// Panics if `MockStream::from_rows` returns an error for the probe
    /// stream.
    pub fn new(
        iter: I,
        f: F,
        grouping: G,
    ) -> AdjacentGroup<I, F, G> {
        let old_headers = iter.headers().clone();

        // Feed `f` a stream made of the header row alone and keep whatever
        // headers it reports back.
        let header_only: Vec<RowResult> = vec![Ok(old_headers.clone().into())];
        let probe = MockStream::from_rows(header_only.into_iter()).unwrap();
        let headers = f(probe).headers().clone();

        Self {
            iter,
            f,
            headers,
            old_headers,
            group_by: grouping,
        }
    }
}
/// Iterator created from an `AdjacentGroup`.
///
/// It collects consecutive rows with the same grouping key, hands them to `f`
/// and then yields the rows of the stream `f` returned.
pub struct IntoIter<I, F, R, G>
where
I: Iterator<Item=RowResult>,
F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
R: RowStream,
{
/// Upstream rows; peekable so the row after a group can be inspected
/// without consuming it.
iter: Peekable<I>,
/// Closure applied to each collected group.
f: F,
/// Headers every stream returned by `f` is compared against.
headers: Headers,
/// Headers of the upstream rows; attached to each group passed to `f`.
old_headers: Headers,
/// Rows of the group currently being emitted, `None` between groups.
current_group: Option<R::IntoIter>,
/// Criteria used to compute the grouping key of each row.
group_by: G,
}
impl<I, F, R, G> Iterator for IntoIter<I, F, R, G>
where
    I: Iterator<Item = RowResult>,
    F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
    R: RowStream,
    G: GroupCriteria,
{
    type Item = RowResult;

    /// Yields the next output row.
    ///
    /// Rows of the group currently being emitted are returned first. Once it
    /// is exhausted, the next run of consecutive `Ok` rows sharing a grouping
    /// key is collected from the input, passed to `f`, and the resulting
    /// stream becomes the new current group.
    ///
    /// An `Err` row in the input ends the group being collected and is
    /// forwarded unchanged. If the stream returned by `f` reports headers
    /// different from the ones computed at construction time, a single
    /// `Error::InconsistentHeaders` is yielded in place of that group's rows.
    fn next(&mut self) -> Option<Self::Item> {
        // A loop instead of recursion: groups for which `f` yields no rows
        // would otherwise add one stack frame each.
        loop {
            if let Some(group) = self.current_group.as_mut() {
                if let Some(item) = group.next() {
                    return Some(item);
                }
                self.current_group = None;
            }

            // End of input ends the iteration; input errors pass through.
            let first_row = match self.iter.next()? {
                Ok(row) => row,
                Err(err) => return Some(Err(err)),
            };

            // Input rows are laid out according to the *input* headers, so
            // the grouping key must be computed against `old_headers`, not
            // against the headers of the stream `f` produces.
            let current_hash = self.group_by.group_for(&self.old_headers, &first_row);
            let mut current_group: Vec<RowResult> = vec![Ok(first_row)];

            while let Some(Ok(next_row)) = self.iter.peek() {
                let next_hash = self.group_by.group_for(&self.old_headers, next_row);
                if next_hash == current_hash {
                    // The peeked row belongs to this group: consume it.
                    current_group.extend(self.iter.next());
                } else {
                    break;
                }
            }

            let output_stream = (self.f)(
                MockStream::new(current_group.into_iter(), self.old_headers.clone())
            );

            if *output_stream.headers() != self.headers {
                return Some(Err(error::Error::InconsistentHeaders));
            }

            // Loop around to start draining the freshly built group.
            self.current_group = Some(output_stream.into_iter());
        }
    }
}
impl<I, F, R, G> IntoIterator for AdjacentGroup<I, F, G>
where
I: RowStream,
F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
R: RowStream,
G: GroupCriteria,
{
type Item = RowResult;
type IntoIter = IntoIter<I::IntoIter, F, R, G>;
fn into_iter(self) -> Self::IntoIter {
Self::IntoIter {
iter: self.iter.into_iter().peekable(),
f: self.f,
headers: self.headers,
old_headers: self.old_headers,
group_by: self.group_by,
current_group: None,
}
}
}
impl<I, F, R, G> RowStream for AdjacentGroup<I, F, G>
where
I: RowStream,
F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
R: RowStream,
G: GroupCriteria,
{
/// Returns the headers stored at construction time, i.e. the ones reported
/// by the stream `f` returned for the header-only probe.
fn headers(&self) -> &Headers {
&self.headers
}
}
#[cfg(test)]
mod tests {
    use crate::{
        add::ColSpec,
        Row, RowStream,
        mock::MockStream,
        error::Error,
    };
    use super::AdjacentGroup;

    /// Each run of equal `name` values gets a `sum` column holding the total
    /// of the run's `value` column.
    #[test]
    fn test_adjacent_group() {
        let source = MockStream::from_rows(
            vec![
                Ok(Row::from(vec!["name", "value"])),
                Ok(Row::from(vec!["a", "1"])),
                Ok(Row::from(vec!["a", "2"])),
                Ok(Row::from(vec!["a", "3"])),
                Ok(Row::from(vec!["b", "1"])),
                Ok(Row::from(vec!["b", "1"])),
                Ok(Row::from(vec!["b", "1"])),
            ]
            .into_iter(),
        )
        .unwrap();

        let grouped = AdjacentGroup::new(source, |group| {
            let group_headers = group.headers().clone();
            let buffered: Vec<_> = group.into_iter().collect();

            // Total of the `value` column over the rows of this group.
            let total = buffered.iter().fold(0.0_f64, |acc, row| {
                let value: f64 = group_headers
                    .get_field(row.as_ref().unwrap(), "value")
                    .unwrap()
                    .parse()
                    .unwrap();
                acc + value
            });

            let sum_column = ColSpec::Mix {
                colname: "sum".to_string(),
                coldef: total.to_string(),
            };

            MockStream::new(buffered.into_iter(), group_headers)
                .add(sum_column)
                .unwrap()
        }, ["name"]);

        assert_eq!(
            *grouped.headers(),
            Row::from(vec!["name", "value", "sum"]).into(),
        );

        let results: Vec<Row> = grouped.into_iter().map(Result::unwrap).collect();

        let expected: Vec<Row> = [
            ["a", "1", "6"],
            ["a", "2", "6"],
            ["a", "3", "6"],
            ["b", "1", "3"],
            ["b", "1", "3"],
            ["b", "1", "3"],
        ]
        .iter()
        .map(|fields| Row::from(fields.to_vec()))
        .collect();

        assert_eq!(results, expected);
    }

    /// An error in the middle of the input is forwarded in place and splits
    /// the surrounding rows into separate groups.
    #[allow(clippy::cmp_owned)]
    #[test]
    fn test_some_errs_in_stream() {
        let source = MockStream::from_rows(
            vec![
                Ok(Row::from(vec!["name", "value"])),
                Ok(Row::from(vec!["a", "1"])),
                Err(Error::InconsistentHeaders),
                Ok(Row::from(vec!["b", "1"])),
            ]
            .into_iter(),
        )
        .unwrap();

        // Identity transformation: every group is emitted untouched.
        let grouped = AdjacentGroup::new(source, |group| group, ["name"]);

        assert_eq!(
            *grouped.headers(),
            Row::from(vec!["name", "value"]).into(),
        );

        let output: Vec<_> = grouped.into_iter().collect();

        // Exactly three items: row, error, row.
        assert_eq!(output.len(), 3);
        assert!(matches!(&output[0], Ok(row) if *row == Row::from(vec!["a", "1"])));
        assert!(matches!(&output[1], Err(Error::InconsistentHeaders)));
        assert!(matches!(&output[2], Ok(row) if *row == Row::from(vec!["b", "1"])));
    }
}