csvsc 2.2.1

Build processing chains for CSV files
Documentation
use crate::{Headers, RowResult, RowStream, error};
use std::vec;

pub mod aggregate;
pub mod group;
pub mod reducer;

use aggregate::Aggregate;
use group::Group;

/// Reduces a whole row stream to a single row of aggregated values.
///
/// Iterating a `Reduce` consumes every input row and feeds it to each
/// aggregate. It then yields any errors met along the way, followed by one
/// final row. The output headers are built solely from the aggregates'
/// column names; the input columns are not carried over.
pub struct Reduce<I> {
    // Upstream stream whose rows are folded into the aggregates.
    iter: I,
    // Aggregates to compute; each one's `colname()` becomes an output header.
    columns: Vec<Box<dyn Aggregate>>,
    // Headers of the input stream, passed along with every row when the
    // aggregates are updated.
    old_headers: Headers,
    // Headers of the output stream (the aggregates' column names).
    headers: Headers,
}

impl<I> Reduce<I>
where
    I: RowStream,
{
    /// Creates a new `Reduce` that folds every row of `iter` into the given
    /// aggregates.
    ///
    /// The resulting stream's headers are built only from the aggregates'
    /// column names (one per aggregate); the columns of `iter` are not
    /// carried over to the output.
    ///
    /// # Errors
    ///
    /// Returns `error::Error::DuplicatedColumn` if two aggregates share the
    /// same column name.
    pub fn new(
        iter: I,
        columns: Vec<Box<dyn Aggregate>>,
    ) -> error::Result<Reduce<I>> {
        // The input headers are kept so they can be handed to the aggregates
        // together with each row while reducing.
        let old_headers = iter.headers().clone();
        let mut headers = Headers::new();

        // Only validate here; the aggregates themselves are moved into the
        // struct unchanged, with no intermediate copy of the vector.
        for col in &columns {
            if !headers.add(col.colname()) {
                return Err(error::Error::DuplicatedColumn(col.colname().to_string()));
            }
        }

        Ok(Reduce {
            iter,
            columns,
            headers,
            old_headers,
        })
    }
}

/// Iterator over the results of a [`Reduce`].
///
/// By the time this value exists the whole input has already been consumed.
/// It replays a buffer of results that was computed up front.
pub struct IntoIter {
    iter: vec::IntoIter<RowResult>,
}

impl Iterator for IntoIter {
    type Item = RowResult;

    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next()
    }

    /// The buffer's length is known exactly, so report it instead of the
    /// default `(0, None)`; this lets consumers such as `collect` preallocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}

// Both hold because the inner `vec::IntoIter` is exact-size and fused.
impl ExactSizeIterator for IntoIter {}

impl std::iter::FusedIterator for IntoIter {}

impl<I> IntoIterator for Reduce<I>
where
    I: RowStream,
{
    type Item = RowResult;

    type IntoIter = IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        let mut errors = vec![];
        let mut onlygroup = Group::from(self.columns);

        for rs in self.iter {
            match rs {
                Ok(row) => if let Err(e) = onlygroup.update(&self.old_headers, &row) {
                    errors.push(Err(e));
                }
                Err(e) => errors.push(Err(e))
            }
        }

        errors.push(Ok(onlygroup.into_row()));

        IntoIter {
            iter: errors.into_iter(),
        }
    }
}

impl<I> RowStream for Reduce<I>
where
    Reduce<I>: IntoIterator<Item = RowResult>,
{
    fn headers(&self) -> &Headers {
        &self.headers
    }
}

#[cfg(test)]
mod tests {
    use std::f64;

    use super::{Reduce, aggregate::{Aggregate, Avg, Sum, Max, Min}};
    use crate::{Row, Error, RowStream, mock::MockStream};

    /// Builds the shared fixture. It has a header row `a,b`, then the given
    /// error if any, then four data rows whose `b` values are 2, 4, 7 and 9.
    fn input(leading_error: Option<Error>) -> impl RowStream {
        let mut rows = vec![Ok(Row::from(vec!["a", "b"]))];

        if let Some(e) = leading_error {
            rows.push(Err(e));
        }

        rows.extend(vec![
            Ok(Row::from(vec!["1", "2"])),
            Ok(Row::from(vec!["1", "4"])),
            Ok(Row::from(vec!["2", "7"])),
            Ok(Row::from(vec!["2", "9"])),
        ]);

        MockStream::from_rows(rows.into_iter()).unwrap()
    }

    /// Reduces the error-free fixture with a single aggregate and collects
    /// the output, panicking on any error.
    fn reduce_with(aggregate: Box<dyn Aggregate>) -> Vec<Row> {
        Reduce::new(input(None), vec![aggregate])
            .unwrap()
            .into_iter()
            .map(|result| result.unwrap())
            .collect()
    }

    #[test]
    fn test_reduce_avg() {
        assert_eq!(
            reduce_with(Box::new(Avg::new("new".into(), "b".into()))),
            vec![Row::from(vec!["5.5"])]
        );
    }

    #[test]
    fn test_reduce_min() {
        assert_eq!(
            reduce_with(Box::new(Min::new("new".into(), "b".into(), f64::INFINITY))),
            vec![Row::from(vec!["2"])]
        );
    }

    #[test]
    fn test_reduce_max() {
        assert_eq!(
            reduce_with(Box::new(Max::new("new".into(), "b".into(), f64::NEG_INFINITY))),
            vec![Row::from(vec!["9"])]
        );
    }

    #[test]
    fn test_reduce_sum() {
        assert_eq!(
            reduce_with(Box::new(Sum::new("new".into(), "b".into(), 0.0))),
            vec![Row::from(vec!["22"])]
        );
    }

    #[test]
    fn test_reduce_error() {
        let mut r = Reduce::new(
            input(Some(Error::InconsistentHeaders)),
            vec![Box::new(Sum::new("new".into(), "b".into(), 0.0))],
        )
        .unwrap()
        .into_iter();

        // Upstream errors are yielded before the aggregated row.
        match r.next().unwrap().unwrap_err() {
            Error::InconsistentHeaders => {}
            _ => panic!("didn't expect this"),
        }

        let results: Vec<Row> = r.map(|i| i.unwrap()).collect();

        assert_eq!(results, vec![Row::from(vec!["22"])]);
    }

    #[test]
    fn test_reduce_duplicated_column() {
        let aggregates: Vec<Box<dyn Aggregate>> = vec![
            Box::new(Sum::new("new".into(), "b".into(), 0.0)),
            Box::new(Avg::new("new".into(), "b".into())),
        ];

        match Reduce::new(input(None), aggregates) {
            Err(crate::error::Error::DuplicatedColumn(name)) => assert_eq!(name, "new"),
            _ => panic!("expected a DuplicatedColumn error"),
        }
    }
}