//! csvsc 2.2.1
//!
//! Build processing chains for CSV files.
//!
//! Documentation
use std::vec;

use crate::{
    add_with, error,

    add::Add, add::ColSpec, flush::Flush, headers::Headers, inspect::Inspect,
    reduce::Reduce, Row, RowResult, add_with::AddWith, del::Del,
    adjacent_group::AdjacentGroup, MockStream, rename::Rename, select::Select,
    reduce::aggregate::Aggregate,

    map::{MapRow, MapCol},

    group::{Group, GroupCriteria},

    filter::{FilterCol, FilterRow},
};

/// This trait describes the behaviour of every component in the CSV transformation
/// chain. Functions provided by this trait help construct the chain and can be
/// _chained_.
///
/// Implement this trait to extend `csvsc` with your own processors.
pub trait RowStream: IntoIterator<Item = RowResult> {

    /// Must return the headers as they are at this point of the chain. For
    /// example if an implementor adds a column, its `headers()` function must
    /// return the new headers including the one just added.
    fn headers(&self) -> &Headers;

    /// Add a column to each row of the stream.
    ///
    /// New columns can be built arbitrarily from previous columns or from a
    /// specific column using a regular expression.
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    ///     .unwrap().build().unwrap()
    ///     .add(
    ///         Column::with_name("new column")
    ///             .from_all_previous()
    ///             .definition("{old col1} - {old col2}")
    ///     ).unwrap();
    /// ```
    ///
    /// See [`Column`](struct.Column.html) for options.
    ///
    /// If you want to add a constant value or have some other requirement take
    /// a look at `.add_with()`.
    #[inline]
    fn add(self, column: ColSpec) -> error::Result<Add<Self>>
    where
        Self: Sized,
    {
        Add::new(self, column)
    }

    /// Deletes the specified columns from each row of the stream. If you have
    /// too many columns to delete perhaps instead use [`RowStream::select`].
    #[inline]
    fn del(self, columns: Vec<&str>) -> Del<Self>
    where
        Self: Sized,
    {
        Del::new(self, columns)
    }

    /// Outputs only the selected columns, ignoring the rest.
    ///
    /// The returned rows contain their values in the order corresponding to the
    /// order in which the headers were given to this function. That means that
    /// this function can be used to reorder the headers.
    ///
    /// If you only want to delete specific columns take a look at
    /// [`RowStream::del`].
    #[inline]
    fn select(self, columns: Vec<&str>) -> Select<Self>
    where
        Self: Sized,
    {
        Select::new(self, columns)
    }

    /// Adds a column to each row of the stream using a closure to compute its
    /// value.
    ///
    /// You can also use this to add a constant value.
    ///
    /// ## Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    ///     .unwrap().build().unwrap()
    ///     .add_with("new col", |headers, row| {
    ///         Ok("value".into())
    ///     }).unwrap();
    /// ```
    #[inline]
    fn add_with<F>(self, colname: &str, f: F) -> Result<AddWith<Self, F>, add_with::BuildError>
    where
        Self: Sized,
        F: FnMut(&Headers, &Row) -> error::Result<String>,
    {
        AddWith::new(self, colname, f)
    }

    /// Reduce all the incoming stream into one row, computing some aggregates
    /// in the way. All the stream collapses into one row.
    ///
    /// The final row contains only the result of reducers and no other column,
    /// but you might preserve a column using the `.last()` aggregate.
    ///
    /// You'll likely be using this inside a `.group()` or `.adjacent_group()`.
    ///
    /// ## Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/chicken_north.csv"])
    ///     .unwrap().build().unwrap()
    ///     .group(["month"], |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("avg").of_column("eggs per week").average().unwrap(),
    ///             ]).unwrap()
    ///     });
    /// ```
    ///
    /// See [`Reducer`](./struct.Reducer.html) for built-in aggregates.
    #[inline]
    fn reduce(
        self,
        columns: Vec<Box<dyn Aggregate>>,
    ) -> error::Result<Reduce<Self>>
    where
        Self: Sized,
    {
        Reduce::new(self, columns)
    }

    /// Groups rows by the given criteria, but assuming a "group" is a set of
    /// adjacent rows.
    ///
    /// This means that sets of rows that meet the same criteria but are not
    /// adjacent will not be grouped together. Only use it if you are sure that
    /// your data follows this pattern and you want to take advantage of it.
    ///
    /// An interesting advantage of using this is that only one group is kept in
    /// memory at a time.
    ///
    /// See [`RowStream::group`] for more details.
    ///
    /// ## Example
    ///
    /// Consider a file `test/assets/groups.csv` with these contents. Notice that
    /// there are four adjacent groups that have the same value for column
    /// `name`: two with value `a` and two with `b`.
    ///
    /// ```text
    /// name,value
    /// a,1
    /// a,1
    /// b,2
    /// b,2
    /// a,3
    /// a,3
    /// b,4
    /// b,4
    /// ```
    ///
    /// Then the following code works as expected, generating an average for all
    /// of the four adjacent groups that have the same value for column `name`.
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    ///     .adjacent_group(["name"], |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("name").of_column("name").last("").unwrap(),
    ///                 Reducer::with_name("avg").of_column("value").average().unwrap(),
    ///             ]).unwrap()
    ///     })
    ///     .into_iter();
    ///
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["a", "1"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["b", "2"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["a", "3"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["b", "4"]));
    /// ```
    #[inline]
    fn adjacent_group<F, R, G>(
        self,
        grouping: G,
        f: F,
    ) -> AdjacentGroup<Self, F, G>
    where
        F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
        R: RowStream,
        G: GroupCriteria,
        Self: Sized,
    {
        // Note: the constructor takes the closure before the criteria, the
        // reverse of this method's argument order.
        AdjacentGroup::new(self, f, grouping)
    }

    /// Groups rows by the given criteria. You'll be given a RowStream instance
    /// as the first argument of a closure that you can use to further process
    /// the grouped rows.
    ///
    /// The first argument is the group criteria and it can be any of:
    ///
    /// * A slice of `&str`: `&["foo", "bar"]`,
    /// * an array of `&str`: `["foo", "bar"]`,
    /// * a closure `Fn(&Headers, &Row) -> Hash`,
    /// * any type that implements
    /// [`GroupCriteria`](crate::group::GroupCriteria)
    ///
    /// In the first two cases the `&str`s are treated as column names. Rows
    /// having the same values for the specified columns will belong to the same
    /// group. Strings that don't match any column name will be ignored.
    ///
    /// In the closure case you'll be given the headers and every row and you
    /// must return a hashable type that identifies the group where that row
    /// belongs.
    ///
    /// [`GroupCriteria`](crate::group::GroupCriteria) is a trait you can
    /// implement for your own types if you want to use them as grouping
    /// criteria.
    ///
    /// ## Example
    ///
    /// Consider the following file:
    ///
    /// ```text
    /// name,value
    /// a,1
    /// a,1
    /// b,2
    /// b,2
    /// a,3
    /// a,3
    /// b,4
    /// b,4
    /// ```
    ///
    /// Then we can group for example using the column `name` and get the
    /// following results:
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows: Vec<_> = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    ///     .group(["name"], |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("name").of_column("name").last("").unwrap(),
    ///                 Reducer::with_name("avg").of_column("value").average().unwrap(),
    ///             ]).unwrap()
    ///     })
    ///     .into_iter()
    ///     .filter_map(|r| r.ok())
    ///     .collect();
    ///
    /// rows.sort_by_key(|row| row.get(0).unwrap().to_string());
    ///
    /// assert_eq!(rows[0], Row::from(vec!["a", "2"]));
    /// assert_eq!(rows[1], Row::from(vec!["b", "3"]));
    /// ```
    ///
    /// ## Grouping by closure
    ///
    /// If you decide that you need an arbitrary grouping criteria you can use a
    /// closure that returns a hashable type like this:
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows: Vec<_> = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    ///     .group(|headers: &Headers, row: &Row| {
    ///         headers.get_field(row, "name").unwrap().to_string()
    ///     }, |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("name").of_column("name").last("").unwrap(),
    ///                 Reducer::with_name("avg").of_column("value").average().unwrap(),
    ///             ]).unwrap()
    ///     })
    ///     .into_iter()
    ///     .filter_map(|r| r.ok())
    ///     .collect();
    ///
    /// rows.sort_by_key(|row| row.get(0).unwrap().to_string());
    ///
    /// assert_eq!(rows[0], Row::from(vec!["a", "2"]));
    /// assert_eq!(rows[1], Row::from(vec!["b", "3"]));
    /// ```
    #[inline]
    fn group<F, R, G>(
        self,
        grouping: G,
        f: F,
    ) -> Group<Self, F, G>
    where
        F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
        R: RowStream,
        G: GroupCriteria,
        Self: Sized,
    {
        // Note: the constructor takes the closure before the criteria, the
        // reverse of this method's argument order.
        Group::new(self, f, grouping)
    }

    /// When consumed, writes to the destination specified by the `target` given
    /// as the first argument. Other than that this behaves like an `id(x)`
    /// function so you can specify more links in the chain and even more
    /// flushers.
    #[inline]
    fn flush<T>(self, target: T) -> error::Result<Flush<Self, T>>
    where
        Self: Sized,
    {
        Flush::new(self, target)
    }

    /// Mostly for debugging, calls a closure on each element. Behaves like the
    /// identity function on the stream returning each row untouched.
    #[inline]
    fn review<F>(self, f: F) -> Inspect<Self, F>
    where
        Self: Sized,
        F: FnMut(&Headers, &RowResult),
    {
        Inspect::new(self, f)
    }

    /// Renames a column: occurrences of the header `old_name` become
    /// `new_name`; every other column is left untouched.
    #[inline]
    fn rename(self, old_name: &str, new_name: &str) -> Rename<Self>
    where
        Self: Sized,
    {
        Rename::new(self, old_name, new_name)
    }

    /// Apply a function to every row and use the return values to build the
    /// row stream.
    ///
    /// This method accepts a closure that must return an iterator over
    /// RowResult values, this means that you can create more rows out of a
    /// single one.
    ///
    /// You're responsible for providing the new headers and for that you need
    /// to use the second closure, which maps the old headers to the new ones.
    ///
    /// # Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    ///     .unwrap().build().unwrap()
    ///     .map_row(|_headers, row| {
    ///         // Go creative here in the creation of your new row(s)
    ///         Ok(vec![
    ///             Ok(row.clone())
    ///         ].into_iter())
    ///     }, |old_headers| {
    ///         // be responsible and provide proper headers from the old ones
    ///         old_headers.clone()
    ///     })
    ///     .into_iter();
    /// ```
    #[inline]
    fn map_row<F, H, R>(self, f: F, header_map: H) -> MapRow<Self, F>
    where
        Self: Sized,
        F: Fn(&Headers, &Row) -> error::Result<R>,
        H: Fn(&Headers) -> Headers,
        R: Iterator<Item = RowResult>,
    {
        // The new headers are computed eagerly, before the stream is consumed.
        let new_headers = (header_map)(self.headers());

        MapRow::new(self, f, new_headers)
    }

    /// Apply a function to a single column of the stream. This function doesn't
    /// fail if the column doesn't exist.
    #[inline]
    fn map_col<F>(self, col: &str, f: F) -> MapCol<Self, F>
    where
        Self: Sized,
        F: Fn(&str) -> error::Result<String>,
    {
        MapCol::new(self, col.into(), f)
    }

    /// Filters entire rows out depending on one column's value and a
    /// condition, leaving errored rows untouched.
    #[inline]
    fn filter_col<F>(self, col: &str, f: F) -> error::Result<FilterCol<Self, F>>
    where
        Self: Sized,
        F: Fn(&str) -> bool,
    {
        FilterCol::new(self, col.into(), f)
    }

    /// Filters entire rows out using a predicate that is given the headers and
    /// each row, leaving errored rows untouched.
    #[inline]
    fn filter_row<F>(self, f: F) -> FilterRow<Self, F>
    where
        Self: Sized,
        F: Fn(&Headers, &Row) -> bool,
    {
        FilterRow::new(self, f)
    }
}