use crate::{Headers, RowResult, RowStream, Error};
use std::rc::Rc;
use std::vec;
pub mod aggregate;
pub mod group;
use aggregate::{Aggregate, AggregateError};
use group::Group;
/// Errors reported while constructing a [`Reduce`].
#[derive(Debug)]
pub enum BuildError {
    /// Registering an aggregate column's name in the output headers failed.
    ///
    /// Carries the name of the offending column.
    DuplicatedHeader(String),
}
/// Stream adapter that folds every row of an upstream stream into a single
/// aggregated row.
pub struct Reduce<I> {
    /// Upstream source of rows; consumed when the reducer is iterated.
    iter: I,
    /// Aggregates evaluated for the output row: one `Last` aggregate per
    /// upstream header, followed by the aggregates supplied by the caller.
    columns: Vec<Box<dyn Aggregate>>,
    /// Upstream headers extended with the name of each caller-supplied
    /// aggregate column.
    headers: Headers,
}
impl<I> Reduce<I>
where
    I: RowStream,
{
    /// Builds a reducer over `iter` that appends the given aggregate
    /// `columns` to the stream's existing columns.
    ///
    /// Each existing header gets an [`aggregate::Last`] aggregate of its own,
    /// placed before the caller-supplied aggregates, so the output row keeps
    /// the original column layout.
    ///
    /// # Errors
    ///
    /// Returns [`BuildError::DuplicatedHeader`] carrying the column name when
    /// adding an aggregate's column name to the headers fails.
    pub fn new(
        iter: I,
        columns: Vec<Box<dyn Aggregate>>,
    ) -> Result<Reduce<I>, BuildError> {
        let mut headers = iter.headers().clone();

        // Start with one pass-through aggregate per upstream column. This runs
        // before any new header is registered, so only upstream columns are
        // covered here.
        let mut all_columns: Vec<Box<dyn Aggregate>> =
            Vec::with_capacity(headers.len() + columns.len());
        all_columns.extend(headers.iter().map(|header| -> Box<dyn Aggregate> {
            let source = Rc::new(header.to_string());
            Box::new(aggregate::Last::new(header, &source))
        }));

        // Register the name of every requested aggregate, bailing out on the
        // first one the headers refuse.
        for extra in &columns {
            if headers.add(extra.colname()).is_err() {
                return Err(BuildError::DuplicatedHeader(extra.colname().to_string()));
            }
        }

        all_columns.extend(columns);

        Ok(Reduce {
            iter,
            columns: all_columns,
            headers,
        })
    }
}
/// Iterator returned by `Reduce::into_iter`.
///
/// The reduction is computed up front, so this only walks a buffered list of
/// results.
pub struct IntoIter {
    /// Buffered results waiting to be handed out.
    iter: vec::IntoIter<RowResult>,
}
impl Iterator for IntoIter {
    type Item = RowResult;

    /// Hands out the next buffered result, or `None` once all were consumed.
    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next()
    }

    /// Forwards the exact bounds known by the underlying buffer.
    ///
    /// Without this override the default `(0, None)` would hide the length,
    /// preventing consumers such as `collect` from preallocating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}
impl<I> IntoIterator for Reduce<I>
where
I: RowStream,
{
type Item = RowResult;
type IntoIter = IntoIter;
fn into_iter(self) -> Self::IntoIter {
let mut errors = vec![];
let mut onlygroup = Group::from(self.columns);
for item in self.iter {
match item {
Ok(row) => match onlygroup.update(&self.headers, &row) {
Err(e) => errors.push(Err(Error::AggregateError(e))),
_ => {},
},
Err(e) => errors.push(Err(e)),
}
}
errors.push(Ok(onlygroup.as_row()));
IntoIter {
iter: errors.into_iter(),
}
}
}
impl<I> RowStream for Reduce<I>
where
    Reduce<I>: IntoIterator<Item = RowResult>,
{
    /// Headers of the reduced output: the upstream headers followed by the
    /// names of the aggregate columns registered in `Reduce::new`.
    fn headers(&self) -> &Headers {
        &self.headers
    }
}
#[cfg(test)]
mod tests {
    use super::{
        aggregate::{Avg, Max, Min, Sum},
        Reduce,
    };
    use crate::{col, mock::MockStream, Error, Row};

    /// Header row `a, b` followed by the four numeric rows shared by the
    /// aggregate tests.
    fn numeric_rows() -> Vec<Result<Row, Error>> {
        vec![
            Ok(Row::from(vec!["a", "b"])),
            Ok(Row::from(vec!["1", "2"])),
            Ok(Row::from(vec!["1", "4"])),
            Ok(Row::from(vec!["2", "7"])),
            Ok(Row::from(vec!["2", "9"])),
        ]
    }

    /// Unwraps every remaining result (panicking on an error) and returns the
    /// rows sorted by their contents.
    fn unwrap_sorted(results: impl Iterator<Item = Result<Row, Error>>) -> Vec<Row> {
        let mut rows: Vec<Row> = results.map(|item| item.unwrap()).collect();
        rows.sort_by(|a, b| a.as_slice().cmp(b.as_slice()));
        rows
    }

    /// Without extra aggregates the reducer yields the values of the last row.
    #[test]
    fn test_reduce_id_function() {
        let stream = MockStream::from_rows(
            vec![
                Ok(Row::from(vec!["name", "_target"])),
                Ok(Row::from(vec!["a", "a"])),
                Ok(Row::from(vec!["b", "a"])),
                Ok(Row::from(vec!["c", "a"])),
            ]
            .into_iter(),
        )
        .unwrap();

        let reduced = Reduce::new(stream, Vec::new()).unwrap().into_iter();

        assert_eq!(unwrap_sorted(reduced), vec![Row::from(vec!["c", "a"])]);
    }

    #[test]
    fn test_reduce_avg() {
        let stream = MockStream::from_rows(numeric_rows().into_iter()).unwrap();

        let reduced = Reduce::new(stream, vec![Box::new(Avg::new("new", "b"))])
            .unwrap()
            .into_iter();

        assert_eq!(
            unwrap_sorted(reduced),
            vec![Row::from(vec!["2", "9", "5.5"])]
        );
    }

    #[test]
    fn test_reduce_min() {
        let stream = MockStream::from_rows(numeric_rows().into_iter()).unwrap();

        let reduced = Reduce::new(stream, vec![Box::new(Min::new("new", "b"))])
            .unwrap()
            .into_iter();

        assert_eq!(
            unwrap_sorted(reduced),
            vec![Row::from(vec!["2", "9", "2"])]
        );
    }

    #[test]
    fn test_reduce_max() {
        let stream = MockStream::from_rows(numeric_rows().into_iter()).unwrap();

        let reduced = Reduce::new(stream, vec![Box::new(Max::new("new", "b"))])
            .unwrap()
            .into_iter();

        assert_eq!(
            unwrap_sorted(reduced),
            vec![Row::from(vec!["2", "9", "9"])]
        );
    }

    #[test]
    fn test_reduce_sum() {
        let stream = MockStream::from_rows(numeric_rows().into_iter()).unwrap();

        let reduced = Reduce::new(stream, vec![Box::new(Sum::new("new", "b"))])
            .unwrap()
            .into_iter();

        assert_eq!(
            unwrap_sorted(reduced),
            vec![Row::from(vec!["2", "9", "22"])]
        );
    }

    /// An upstream error is reported first; the reduced row still follows.
    #[test]
    fn test_reduce_error() {
        // Same fixture as the other tests, with a failing item placed right
        // after the header row.
        let mut rows = numeric_rows();
        rows.insert(
            1,
            Err(Error::ColBuildError(col::BuildError::InvalidFormat)),
        );
        let stream = MockStream::from_rows(rows.into_iter()).unwrap();

        let mut reduced = Reduce::new(stream, vec![Box::new(Sum::new("new", "b"))])
            .unwrap()
            .into_iter();

        match reduced.next().unwrap().unwrap_err() {
            Error::ColBuildError(col::BuildError::InvalidFormat) => {}
            _ => panic!("didn't expect this"),
        }

        assert_eq!(
            unwrap_sorted(reduced),
            vec![Row::from(vec!["2", "9", "22"])]
        );
    }
}