use crate::{Headers, RowResult, RowStream, error};
use std::vec;
pub mod aggregate;
pub mod group;
pub mod reducer;
use aggregate::Aggregate;
use group::Group;
pub struct Reduce<I> {
iter: I,
columns: Vec<Box<dyn Aggregate>>,
old_headers: Headers,
headers: Headers,
}
impl<I> Reduce<I>
where
I: RowStream,
{
pub fn new(
iter: I,
columns: Vec<Box<dyn Aggregate>>,
) -> error::Result<Reduce<I>> {
let old_headers = iter.headers().clone();
let mut headers = Headers::new();
let mut whole_columns: Vec<Box<dyn Aggregate>> = Vec::with_capacity(columns.len());
for col in columns {
if !headers.add(col.colname()) {
return Err(error::Error::DuplicatedColumn(col.colname().to_string()));
}
whole_columns.push(col);
}
Ok(Reduce {
iter,
columns: whole_columns,
headers,
old_headers,
})
}
}
pub struct IntoIter {
iter: vec::IntoIter<RowResult>,
}
impl Iterator for IntoIter
{
type Item = RowResult;
fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}
}
impl<I> IntoIterator for Reduce<I>
where
I: RowStream,
{
type Item = RowResult;
type IntoIter = IntoIter;
fn into_iter(self) -> Self::IntoIter {
let mut errors = vec![];
let mut onlygroup = Group::from(self.columns);
for rs in self.iter {
match rs {
Ok(row) => if let Err(e) = onlygroup.update(&self.old_headers, &row) {
errors.push(Err(e));
}
Err(e) => errors.push(Err(e))
}
}
errors.push(Ok(onlygroup.into_row()));
IntoIter {
iter: errors.into_iter(),
}
}
}
impl<I> RowStream for Reduce<I>
where
Reduce<I>: IntoIterator<Item = RowResult>,
{
fn headers(&self) -> &Headers {
&self.headers
}
}
#[cfg(test)]
mod tests {
use std::f64;
use super::{Reduce, aggregate::{Avg, Sum, Max, Min}};
use crate::{Row, Error, mock::MockStream};
#[test]
fn test_reduce_avg() {
let iter = MockStream::from_rows(
vec![
Ok(Row::from(vec!["a", "b"])),
Ok(Row::from(vec!["1", "2"])),
Ok(Row::from(vec!["1", "4"])),
Ok(Row::from(vec!["2", "7"])),
Ok(Row::from(vec!["2", "9"])),
]
.into_iter(),
)
.unwrap();
let r = Reduce::new(iter, vec![Box::new(Avg::new("new".into(), "b".into()))])
.unwrap()
.into_iter();
let mut results: Vec<Row> = r.map(|i| i.unwrap()).collect();
results.sort_by(|a, b| a.as_slice().cmp(b.as_slice()));
assert_eq!(
results,
vec![
Row::from(vec!["5.5"]),
]
);
}
#[test]
fn test_reduce_min() {
let iter = MockStream::from_rows(
vec![
Ok(Row::from(vec!["a", "b"])),
Ok(Row::from(vec!["1", "2"])),
Ok(Row::from(vec!["1", "4"])),
Ok(Row::from(vec!["2", "7"])),
Ok(Row::from(vec!["2", "9"])),
]
.into_iter(),
)
.unwrap();
let r = Reduce::new(iter, vec![Box::new(Min::new("new".into(), "b".into(), f64::INFINITY))])
.unwrap()
.into_iter();
let mut results: Vec<Row> = r.map(|i| i.unwrap()).collect();
results.sort_by(|a, b| a.as_slice().cmp(b.as_slice()));
assert_eq!(
results,
vec![
Row::from(vec!["2"]),
]
);
}
#[test]
fn test_reduce_max() {
let iter = MockStream::from_rows(
vec![
Ok(Row::from(vec!["a", "b"])),
Ok(Row::from(vec!["1", "2"])),
Ok(Row::from(vec!["1", "4"])),
Ok(Row::from(vec!["2", "7"])),
Ok(Row::from(vec!["2", "9"])),
]
.into_iter(),
)
.unwrap();
let r = Reduce::new(iter, vec![Box::new(Max::new("new".into(), "b".into(), f64::NEG_INFINITY))])
.unwrap()
.into_iter();
let mut results: Vec<Row> = r.map(|i| i.unwrap()).collect();
results.sort_by(|a, b| a.as_slice().cmp(b.as_slice()));
assert_eq!(
results,
vec![
Row::from(vec!["9"]),
]
);
}
#[test]
fn test_reduce_sum() {
let iter = MockStream::from_rows(
vec![
Ok(Row::from(vec!["a", "b"])),
Ok(Row::from(vec!["1", "2"])),
Ok(Row::from(vec!["1", "4"])),
Ok(Row::from(vec!["2", "7"])),
Ok(Row::from(vec!["2", "9"])),
]
.into_iter(),
)
.unwrap();
let r = Reduce::new(iter, vec![Box::new(Sum::new("new".into(), "b".into(), 0.0))])
.unwrap()
.into_iter();
let mut results: Vec<Row> = r.map(|i| i.unwrap()).collect();
results.sort_by(|a, b| a.as_slice().cmp(b.as_slice()));
assert_eq!(
results,
vec![
Row::from(vec!["22"]),
]
);
}
#[test]
fn test_reduce_error() {
let iter = MockStream::from_rows(
vec![
Ok(Row::from(vec!["a", "b"])),
Err(Error::InconsistentHeaders),
Ok(Row::from(vec!["1", "2"])),
Ok(Row::from(vec!["1", "4"])),
Ok(Row::from(vec!["2", "7"])),
Ok(Row::from(vec!["2", "9"])),
]
.into_iter(),
)
.unwrap();
let mut r = Reduce::new(iter, vec![Box::new(Sum::new("new".into(), "b".into(), 0.0))])
.unwrap()
.into_iter();
match r.next().unwrap().unwrap_err() {
Error::InconsistentHeaders => {}
_ => panic!("didn't expect this"),
}
let mut results: Vec<Row> = r.map(|i| i.unwrap()).collect();
results.sort_by(|a, b| a.as_slice().cmp(b.as_slice()));
assert_eq!(
results,
vec![
Row::from(vec!["22"]),
]
);
}
}