// csvsc 2.2.1
//
// Build processing chains for CSV files
// Documentation
use std::fmt;
use std::cmp::PartialOrd;
use std::str::FromStr;
use std::ops::{AddAssign, MulAssign};

use crate::reduce::aggregate::{
    Aggregate, Count, Avg, Last, Max, Min, Sum, Mul, Closure,
};
use crate::error;

/// An uncomplicated builder of arguments for `RowStream::reduce()`.
///
/// `RowStream::reduce()` accepts a vector of Aggregates. Start with
/// `Reducer::with_name()`, optionally choose a source column with
/// `of_column()`, and finish with one of the aggregate-producing methods
/// listed below.
///
/// # Example
///
/// ```
/// use csvsc::prelude::*;
///
/// InputStreamBuilder::from_paths(&["test/assets/1.csv"]).unwrap().build().unwrap()
///     .reduce(vec![
///         Reducer::with_name("rows").count(),
///     ])
///     .into_iter();
/// ```
#[derive(Debug, Clone)]
pub struct Reducer {
    /// Name of the output column that receives the aggregated value.
    name: String,
    /// Name of the column the aggregate reads from, if one has been chosen.
    source: Option<String>,
}

impl Reducer {
    /// Begin describing an aggregate by naming its output column. This column
    /// will be present in the output row and will contain the aggregated
    /// value.
    ///
    /// No source column is selected at this point; use `of_column()` for the
    /// aggregates that need one.
    pub fn with_name(name: &str) -> Reducer {
        Self {
            name: String::from(name),
            source: None,
        }
    }

    /// Creates an aggregate that counts all Ok rows of the stream.
    ///
    /// Only the output column name is used, so no source column is required.
    ///
    /// # Example
    ///
    /// ```
    /// # use csvsc::prelude::*;
    /// Reducer::with_name("rows").count();
    /// ```
    pub fn count(self) -> Box<dyn Aggregate> {
        let counter = Count::new(self.name);

        Box::new(counter)
    }

    /// Box a custom reducer. This method is not strictly needed because you
    /// can just pass `Box::new(YourReducer::new())` to `.reduce()`, but it
    /// exists for consistency.
    ///
    /// # Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// #[derive(Debug)]
    /// struct MyAggregate {
    ///     colname: String,
    ///     state: (f64, f64),
    /// }
    ///
    /// impl Aggregate for MyAggregate {
    ///     fn update(&mut self, headers: &Headers, row: &Row) -> csvsc::error::Result<()> {
    ///         let x: f64 = headers.get_field(row, "x").unwrap().parse().unwrap();
    ///         let y: f64 = headers.get_field(row, "y").unwrap().parse().unwrap();
    ///
    ///         self.state.0 += x;
    ///         self.state.1 += y;
    ///
    ///         Ok(())
    ///     }
    ///
    ///     fn colname(&self) -> &str {
    ///         &self.colname
    ///     }
    ///
    ///     fn value(&self) -> String {
    ///         format!("{},{}", self.state.0, self.state.1)
    ///     }
    /// }
    ///
    /// Reducer::custom(MyAggregate {
    ///     colname: String::from("custom"),
    ///     state: (0.0, 0.0),
    /// });
    /// ```
    pub fn custom<'a, A>(reducer: A) -> Box<dyn Aggregate + 'a>
    where
        A: Aggregate + 'a,
    {
        let boxed: Box<dyn Aggregate + 'a> = Box::new(reducer);

        boxed
    }

    /// Select the source column. Most aggregates need one; `name` is the
    /// name of the column they will read from.
    ///
    /// Calling this again replaces the previously selected column.
    ///
    /// # Example
    ///
    /// ```
    /// # use csvsc::prelude::*;
    /// Reducer::with_name("richest").of_column("income").max(0.0);
    /// ```
    pub fn of_column(mut self, name: &str) -> Reducer {
        let column = String::from(name);
        self.source = Some(column);

        self
    }

    /// Compute the average of the values in the specified column, if a value
    /// cannot be parsed to f64 returns an error row.
    ///
    /// Returns `None` when no source column was selected with `of_column()`.
    ///
    /// # Example
    ///
    /// ```
    /// # use csvsc::prelude::*;
    /// Reducer::with_name("average").of_column("income").average();
    /// ```
    pub fn average(self) -> Option<Box<dyn Aggregate>> {
        let column = self.source?;
        let aggregate = Avg::new(self.name, column);

        Some(Box::new(aggregate))
    }

    /// A simple reducer that takes the last value it sees or uses the argument
    /// given as default.
    ///
    /// Useful for just keeping a column used for grouping in an aggregate.
    ///
    /// Returns `None` when no source column was selected with `of_column()`.
    ///
    /// # Example
    ///
    /// ```
    /// # use csvsc::prelude::*;
    /// Reducer::with_name("name").of_column("name").last("-");
    /// ```
    pub fn last(self, init: &str) -> Option<Box<dyn Aggregate>> {
        let column = self.source?;
        let aggregate = Last::new(self.name, column, String::from(init));

        Some(Box::new(aggregate))
    }

    /// Return the maximum of the specified column's values and use `init` as
    /// default if the value cannot be parsed to T.
    ///
    /// `init` is both used to specify the type to parse to and the default.
    /// A sensible value for it would be a number smaller than all the values
    /// you expect from your data.
    ///
    /// Returns `None` when no source column was selected with `of_column()`.
    ///
    /// # Example
    ///
    /// ```
    /// # use csvsc::prelude::*;
    /// use std::f64;
    ///
    /// Reducer::with_name("max").of_column("height").max(f64::NEG_INFINITY);
    /// ```
    pub fn max<'a, T>(self, init: T) -> Option<Box<dyn Aggregate + 'a>>
    where
        T: fmt::Display + fmt::Debug + PartialOrd + FromStr + Copy + 'a,
        <T as FromStr>::Err: fmt::Debug,
    {
        let column = self.source?;
        let aggregate = Max::new(self.name, column, init);

        Some(Box::new(aggregate))
    }

    /// Return the minimum of the specified column's values and use `init` as
    /// default if the value cannot be parsed to T.
    ///
    /// `init` is both used to specify the type to parse to and the default.
    /// A sensible value for it would be a number higher than all the values
    /// you expect from your data.
    ///
    /// Returns `None` when no source column was selected with `of_column()`.
    ///
    /// # Example
    ///
    /// ```
    /// # use csvsc::prelude::*;
    /// use std::f64;
    ///
    /// Reducer::with_name("min").of_column("height").min(f64::INFINITY);
    /// ```
    pub fn min<'a, T>(self, init: T) -> Option<Box<dyn Aggregate + 'a>>
    where
        T: fmt::Display + fmt::Debug + PartialOrd + FromStr + Copy + 'a,
        <T as FromStr>::Err: fmt::Debug,
    {
        let column = self.source?;
        let aggregate = Min::new(self.name, column, init);

        Some(Box::new(aggregate))
    }

    /// Return the sum of the specified column's values and use `init` as
    /// default if the value cannot be parsed to T.
    ///
    /// `init` is both used to specify the type to parse to and the default.
    /// A sensible value for it would be 0.0
    ///
    /// Returns `None` when no source column was selected with `of_column()`.
    ///
    /// # Example
    ///
    /// ```
    /// # use csvsc::prelude::*;
    /// Reducer::with_name("sum").of_column("height").sum(0.0);
    /// ```
    pub fn sum<'a, T>(self, init: T) -> Option<Box<dyn Aggregate + 'a>>
    where
        T: fmt::Display + fmt::Debug + AddAssign + FromStr + Copy + 'a,
        <T as FromStr>::Err: fmt::Debug,
    {
        let column = self.source?;
        let aggregate = Sum::new(self.name, column, init);

        Some(Box::new(aggregate))
    }

    /// Return the product of the specified column's values and use `init` as
    /// default if the value cannot be parsed to T.
    ///
    /// `init` is both used to specify the type to parse to and the default.
    /// A sensible value for it would be 1.0
    ///
    /// Returns `None` when no source column was selected with `of_column()`.
    ///
    /// # Example
    ///
    /// ```
    /// # use csvsc::prelude::*;
    /// Reducer::with_name("product").of_column("height").product(1.0);
    /// ```
    pub fn product<'a, T>(self, init: T) -> Option<Box<dyn Aggregate + 'a>>
    where
        T: fmt::Display + fmt::Debug + MulAssign + FromStr + Copy + 'a,
        <T as FromStr>::Err: fmt::Debug,
    {
        let column = self.source?;
        let aggregate = Mul::new(self.name, column, init);

        Some(Box::new(aggregate))
    }

    /// Use a closure to compute an aggregate for the specified column.
    ///
    /// Here you have absolute freedom.
    ///
    /// The first argument of the closure is the accumulator, the second is the
    /// current row's value. `init` will be used as starter.
    ///
    /// Returns `None` when no source column was selected with `of_column()`.
    ///
    /// # Example
    ///
    /// ```
    /// # use csvsc::prelude::*;
    /// Reducer::with_name("closure").of_column("col").with_closure(|acc, cur| {
    ///     Ok(acc + cur.parse::<i32>().unwrap())
    /// }, 0).unwrap();
    /// ```
    pub fn with_closure<'a, F, C>(self, f: F, init: C) -> Option<Box<dyn Aggregate + 'a>>
    where
        F: FnMut(C, &str) -> error::Result<C> + 'a,
        C: fmt::Display + 'a,
    {
        let column = self.source?;
        let aggregate = Closure::new(self.name, column, f, init);

        Some(Box::new(aggregate))
    }
}