csvsc 2.2.1

Build processing chains for CSV files
Documentation
use std::vec;
use std::iter::Peekable;
use crate::{
    RowStream, Headers, RowResult,
    mock::MockStream,
    group::GroupCriteria,
    error,
};

/// Groups adjacent rows that share the same values in a set of columns.
///
/// Each group is handed to the user closure as a [`MockStream`] that yields
/// the group's rows under the input headers; the closure returns a
/// [`RowStream`] whose rows replace the group in the output. A new group is
/// started as soon as the grouping columns change value in the stream. It is
/// called *adjacent* because it does not group globally: several groups with
/// the same grouping key can appear if their rows are not contiguous.
///
/// If the closure adds or removes columns it is responsible for adjusting the
/// headers of the stream it returns accordingly (the input headers are
/// available from the stream it receives). Every group's output must carry the
/// same headers the closure produced when it was probed at construction time;
/// otherwise an `InconsistentHeaders` error is yielded for that group.
pub struct AdjacentGroup<I, F, G> {
    /// Upstream row source.
    iter: I,
    /// Closure applied to every group of adjacent rows.
    f: F,
    /// Headers of the closure's output, i.e. the headers of this stream.
    headers: Headers,
    /// Headers of the upstream source; the rows handed to `f` follow these.
    old_headers: Headers,
    /// Criteria used to compute each row's grouping key.
    group_by: G,
}

impl<I, F, R, G> AdjacentGroup<I, F, G>
where
    I: RowStream,
    F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
    R: RowStream,
{
    /// Wraps `iter` so that adjacent rows with equal grouping keys (as decided
    /// by `grouping`) are processed together by `f`.
    ///
    /// `f` is invoked once right away with a stream built from a single row
    /// holding the input headers, and the headers of the stream it returns
    /// become the headers of this `AdjacentGroup`.
    ///
    /// # Panics
    ///
    /// Panics if `MockStream::from_rows` fails on that header-only row list.
    pub fn new(iter: I, f: F, grouping: G) -> AdjacentGroup<I, F, G> {
        let old_headers = iter.headers().clone();

        // Probe the closure with a header-only stream so the output headers
        // are known before any data row is processed.
        let probe_rows: Vec<RowResult> = vec![Ok(old_headers.clone().into())];
        let probe = MockStream::from_rows(probe_rows.into_iter()).unwrap();
        let headers = f(probe).headers().clone();

        Self {
            iter,
            f,
            headers,
            old_headers,
            group_by: grouping,
        }
    }
}

/// Iterator returned by `AdjacentGroup::into_iter`.
///
/// Collects runs of adjacent rows that share a grouping key, feeds each run to
/// the user closure and yields the rows of the stream the closure returns.
pub struct IntoIter<
    I: Iterator<Item = RowResult>,
    F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
    R: RowStream,
    G,
> {
    /// Upstream rows; peekable so a group can end without consuming the first
    /// row of the next one.
    iter: Peekable<I>,
    /// Output of the group currently being drained, if any.
    current_group: Option<R::IntoIter>,
    /// Closure applied to each group.
    f: F,
    /// Criteria used to compute each row's grouping key.
    group_by: G,
    /// Headers of the upstream rows.
    old_headers: Headers,
    /// Headers every group's output is expected to carry.
    headers: Headers,
}

impl<I, F, R, G> Iterator for IntoIter<I, F, R, G>
where
    I: Iterator<Item = RowResult>,
    F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
    R: RowStream,
    G: GroupCriteria,
{
    type Item = RowResult;

    /// Returns the next output row.
    ///
    /// * While a processed group is pending, its rows are yielded one by one.
    /// * Otherwise the next run of adjacent `Ok` rows with an equal grouping
    ///   key is collected from the input and passed to the closure. The
    ///   closure's output becomes the pending group.
    /// * An `Err` read from the input is yielded as-is. It also ends the run
    ///   being collected, so errors never become part of a group.
    /// * If the closure's output headers differ from the expected headers,
    ///   `Error::InconsistentHeaders` is yielded instead of that group's rows.
    fn next(&mut self) -> Option<Self::Item> {
        // Loop instead of recursing: if the closure returns empty streams for
        // many consecutive groups, one stack frame per group could overflow
        // the stack on large inputs.
        loop {
            if let Some(group) = self.current_group.as_mut() {
                if let Some(row) = group.next() {
                    return Some(row);
                }

                // Pending group exhausted; fall through and build the next one.
                self.current_group = None;
            }

            let first_row = match self.iter.next()? {
                Ok(row) => row,
                // Errors are forwarded untouched and never start a group.
                Err(err) => return Some(Err(err)),
            };

            // Input rows follow the *input* headers, so the grouping key must
            // be computed against `old_headers`. The closure's output headers
            // would select the wrong column (or none) whenever the closure
            // removes or reorders columns.
            let group_key = self.group_by.group_for(&self.old_headers, &first_row);
            let mut rows = vec![Ok(first_row)];

            // Peek so the first row of the next group (or an error) stays in
            // the input for the following iteration.
            while let Some(Ok(next_row)) = self.iter.peek() {
                if self.group_by.group_for(&self.old_headers, next_row) != group_key {
                    break;
                }

                rows.push(self.iter.next().expect("row was just peeked"));
            }

            let output = (self.f)(MockStream::new(rows.into_iter(), self.old_headers.clone()));

            if *output.headers() != self.headers {
                return Some(Err(error::Error::InconsistentHeaders));
            }

            self.current_group = Some(output.into_iter());
        }
    }
}

impl<I, F, R, G> IntoIterator for AdjacentGroup<I, F, G>
where
    I: RowStream,
    F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
    R: RowStream,
    G: GroupCriteria,
{
    type Item = RowResult;

    type IntoIter = IntoIter<I::IntoIter, F, R, G>;

    fn into_iter(self) -> Self::IntoIter {
        Self::IntoIter {
            iter: self.iter.into_iter().peekable(),
            f: self.f,
            headers: self.headers,
            old_headers: self.old_headers,
            group_by: self.group_by,
            current_group: None,
        }
    }
}

impl<I, F, R, G> RowStream for AdjacentGroup<I, F, G>
where
    I: RowStream,
    F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
    R: RowStream,
    G: GroupCriteria,
{
    fn headers(&self) -> &Headers {
        &self.headers
    }
}

#[cfg(test)]
mod tests {
    use super::AdjacentGroup;
    use crate::{
        add::ColSpec,
        error::Error,
        mock::MockStream,
        Row, RowStream,
    };

    /// Groups by `name` and appends a `sum` column with the total of the
    /// group's `value` column to every row of that group.
    #[test]
    fn test_adjacent_group() {
        let input = vec![
            Ok(Row::from(vec!["name", "value"])),
            Ok(Row::from(vec!["a", "1"])),
            Ok(Row::from(vec!["a", "2"])),
            Ok(Row::from(vec!["a", "3"])),
            Ok(Row::from(vec!["b", "1"])),
            Ok(Row::from(vec!["b", "1"])),
            Ok(Row::from(vec!["b", "1"])),
        ];
        let stream = MockStream::from_rows(input.into_iter()).unwrap();

        let grouped = AdjacentGroup::new(
            stream,
            |group| {
                let headers = group.headers().clone();
                let rows: Vec<_> = group.into_iter().collect();

                let total = rows.iter().fold(0.0_f64, |acc, row| {
                    let value: f64 = headers
                        .get_field(row.as_ref().unwrap(), "value")
                        .unwrap()
                        .parse()
                        .unwrap();

                    acc + value
                });

                MockStream::new(rows.into_iter(), headers)
                    .add(ColSpec::Mix {
                        colname: "sum".to_string(),
                        coldef: total.to_string(),
                    })
                    .unwrap()
            },
            ["name"],
        );

        assert_eq!(
            *grouped.headers(),
            Row::from(vec!["name", "value", "sum"]).into(),
        );

        let expected = vec![
            Row::from(vec!["a", "1", "6"]),
            Row::from(vec!["a", "2", "6"]),
            Row::from(vec!["a", "3", "6"]),
            Row::from(vec!["b", "1", "3"]),
            Row::from(vec!["b", "1", "3"]),
            Row::from(vec!["b", "1", "3"]),
        ];
        let actual: Vec<Row> = grouped.into_iter().map(Result::unwrap).collect();

        assert_eq!(actual, expected);
    }

    /// Errors in the input are forwarded untouched, in order, between the
    /// rows of the surrounding groups.
    // `cmp_owned`: comparing against a freshly built `Row` keeps the
    // assertions readable; the extra allocation is irrelevant in a test.
    #[allow(clippy::cmp_owned)]
    #[test]
    fn test_some_errs_in_stream() {
        let input = vec![
            Ok(Row::from(vec!["name", "value"])),
            Ok(Row::from(vec!["a", "1"])),
            Err(Error::InconsistentHeaders),
            Ok(Row::from(vec!["b", "1"])),
        ];
        let stream = MockStream::from_rows(input.into_iter()).unwrap();

        // Identity closure: every group is passed through unchanged.
        let grouped = AdjacentGroup::new(stream, |group| group, ["name"]);

        assert_eq!(
            *grouped.headers(),
            Row::from(vec!["name", "value"]).into(),
        );

        let mut rows = grouped.into_iter();

        assert!(matches!(rows.next(), Some(Ok(ref row)) if *row == Row::from(vec!["a", "1"])));
        assert!(matches!(rows.next(), Some(Err(Error::InconsistentHeaders))));
        assert!(matches!(rows.next(), Some(Ok(ref row)) if *row == Row::from(vec!["b", "1"])));
        assert!(rows.next().is_none());
    }
}