use std::vec;
use crate::{
add_with, error,
add::Add, add::ColSpec, flush::Flush, headers::Headers, inspect::Inspect,
reduce::Reduce, Row, RowResult, add_with::AddWith, del::Del,
adjacent_group::AdjacentGroup, MockStream, rename::Rename, select::Select,
reduce::aggregate::Aggregate,
map::{MapRow, MapCol},
group::{Group, GroupCriteria},
filter::{FilterCol, FilterRow},
};
/// This trait describes the behaviour of every component in the CSV transformation
/// chain. Functions provided by this trait help construct the chain and can be
/// _chained_.
///
/// Implement this trait to extend `csvsc` with your own processors.
pub trait RowStream: IntoIterator<Item = RowResult> {
    /// Must return the headers as they are at this point of the chain. For example,
    /// if an implementor adds a column, its `headers()` function must return the
    /// new headers including the one just added.
    fn headers(&self) -> &Headers;

    /// Add a column to each row of the stream.
    ///
    /// New columns can be built arbitrarily from previous columns or from a
    /// specific column using a regular expression.
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    /// .unwrap().build().unwrap()
    /// .add(
    /// Column::with_name("new column")
    /// .from_all_previous()
    /// .definition("{old col1} - {old col2}")
    /// ).unwrap();
    /// ```
    ///
    /// See [`Column`](struct.Column.html) for options.
    ///
    /// If you want to add a constant value or have some other requirement take
    /// a look at `.add_with()`.
    #[inline]
    fn add(self, column: ColSpec) -> error::Result<Add<Self>>
    where
        Self: Sized,
    {
        Add::new(self, column)
    }

    /// Deletes the specified columns from each row of the stream. If you have
    /// too many columns to delete, consider using [`RowStream::select`] instead.
    #[inline]
    fn del(self, columns: Vec<&str>) -> Del<Self>
    where
        Self: Sized,
    {
        Del::new(self, columns)
    }

    /// Outputs only the selected columns, ignoring the rest.
    ///
    /// The returned rows contain their values in the order corresponding to the
    /// order in which the headers were given to this function. That means that
    /// this function can be used to reorder the headers.
    ///
    /// If you only want to delete specific columns take a look at
    /// [`RowStream::del`].
    #[inline]
    fn select(self, columns: Vec<&str>) -> Select<Self>
    where
        Self: Sized,
    {
        Select::new(self, columns)
    }

    /// Adds a column to each row of the stream using a closure to compute its
    /// value.
    ///
    /// You can also use this to add a constant value.
    ///
    /// ## Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    /// .unwrap().build().unwrap()
    /// .add_with("new col", |headers, row| {
    /// Ok("value".into())
    /// }).unwrap();
    /// ```
    #[inline]
    fn add_with<F>(self, colname: &str, f: F) -> Result<AddWith<Self, F>, add_with::BuildError>
    where
        Self: Sized,
        F: FnMut(&Headers, &Row) -> error::Result<String>,
    {
        AddWith::new(self, colname, f)
    }

    /// Reduces the entire incoming stream into one row, computing some
    /// aggregates along the way.
    ///
    /// The final row contains only the result of the reducers and no other
    /// column, but you can preserve a column using the `.last()` aggregate.
    ///
    /// You'll likely be using this inside a `.group()` or `.adjacent_group()`.
    ///
    /// ## Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/chicken_north.csv"])
    /// .unwrap().build().unwrap()
    /// .group(["month"], |row_stream| {
    /// row_stream
    /// .reduce(vec![
    /// Reducer::with_name("avg").of_column("eggs per week").average().unwrap(),
    /// ]).unwrap()
    /// });
    /// ```
    ///
    /// See [`Reducer`](./struct.Reducer.html) for built-in aggregates.
    #[inline]
    fn reduce(
        self,
        columns: Vec<Box<dyn Aggregate>>,
    ) -> error::Result<Reduce<Self>>
    where
        Self: Sized,
    {
        Reduce::new(self, columns)
    }

    /// Groups rows by the given criteria, but assuming a "group" is a set of
    /// adjacent rows.
    ///
    /// This means that sets of rows that meet the same criteria but are not
    /// adjacent will not be grouped together. Only use it if you are sure that
    /// your data follows this pattern and you want to take advantage of it.
    ///
    /// An interesting advantage of using this is that only one group is kept in
    /// memory at a time.
    ///
    /// See [`RowStream::group`] for more details.
    ///
    /// ## Example
    ///
    /// Consider a file `test/assets/groups.csv` with these contents. Notice that
    /// there are four adjacent groups that have the same value for column
    /// `name`: two with value `a` and two with `b`.
    ///
    /// ```text
    /// name,value
    /// a,1
    /// a,1
    /// b,2
    /// b,2
    /// a,3
    /// a,3
    /// b,4
    /// b,4
    /// ```
    ///
    /// Then the following code works as expected, generating an average for all
    /// of the four adjacent groups that have the same value for column `name`.
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    /// .adjacent_group(["name"], |row_stream| {
    /// row_stream
    /// .reduce(vec![
    /// Reducer::with_name("name").of_column("name").last("").unwrap(),
    /// Reducer::with_name("avg").of_column("value").average().unwrap(),
    /// ]).unwrap()
    /// })
    /// .into_iter();
    ///
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["a", "1"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["b", "2"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["a", "3"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["b", "4"]));
    /// ```
    #[inline]
    fn adjacent_group<F, R, G>(
        self,
        grouping: G,
        f: F,
    ) -> AdjacentGroup<Self, F, G>
    where
        F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
        R: RowStream,
        G: GroupCriteria,
        Self: Sized,
    {
        AdjacentGroup::new(self, f, grouping)
    }

    /// Groups rows by the given criteria. You'll be given a RowStream instance
    /// as the first argument of a closure that you can use to further process
    /// the grouped rows.
    ///
    /// The first argument is the group criteria and it can be any of:
    ///
    /// * A slice of `&str`: `&["foo", "bar"]`,
    /// * an array of `&str`: `["foo", "bar"]`,
    /// * a closure `Fn(&Headers, &Row) -> Hash`,
    /// * any type that implements
    /// [`GroupCriteria`](crate::group::GroupCriteria)
    ///
    /// In the first two cases the `&str`s are treated as column names. Rows
    /// having the same values for the specified columns will belong to the same
    /// group. Strings that don't match any column name will be ignored.
    ///
    /// In the closure case you'll be given the headers and every row and you
    /// must return a hashable type that identifies the group where that row
    /// belongs.
    ///
    /// [`GroupCriteria`](crate::group::GroupCriteria) is a trait you can
    /// implement for your own types if you want to use them as grouping
    /// criteria.
    ///
    /// ## Example
    ///
    /// Consider the following file:
    ///
    /// ```text
    /// name,value
    /// a,1
    /// a,1
    /// b,2
    /// b,2
    /// a,3
    /// a,3
    /// b,4
    /// b,4
    /// ```
    ///
    /// Then we can group for example using the column `name` and get the
    /// following results:
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows: Vec<_> = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    /// .group(["name"], |row_stream| {
    /// row_stream
    /// .reduce(vec![
    /// Reducer::with_name("name").of_column("name").last("").unwrap(),
    /// Reducer::with_name("avg").of_column("value").average().unwrap(),
    /// ]).unwrap()
    /// })
    /// .into_iter()
    /// .filter_map(|r| r.ok())
    /// .collect();
    ///
    /// rows.sort_by_key(|row| row.get(0).unwrap().to_string());
    ///
    /// assert_eq!(rows[0], Row::from(vec!["a", "2"]));
    /// assert_eq!(rows[1], Row::from(vec!["b", "3"]));
    /// ```
    ///
    /// ## Grouping by closure
    ///
    /// If you decide that you need an arbitrary grouping criteria you can use a
    /// closure that returns a hashable type like this:
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows: Vec<_> = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    /// .group(|headers: &Headers, row: &Row| {
    /// headers.get_field(row, "name").unwrap().to_string()
    /// }, |row_stream| {
    /// row_stream
    /// .reduce(vec![
    /// Reducer::with_name("name").of_column("name").last("").unwrap(),
    /// Reducer::with_name("avg").of_column("value").average().unwrap(),
    /// ]).unwrap()
    /// })
    /// .into_iter()
    /// .filter_map(|r| r.ok())
    /// .collect();
    ///
    /// rows.sort_by_key(|row| row.get(0).unwrap().to_string());
    ///
    /// assert_eq!(rows[0], Row::from(vec!["a", "2"]));
    /// assert_eq!(rows[1], Row::from(vec!["b", "3"]));
    /// ```
    #[inline]
    fn group<F, R, G>(
        self,
        grouping: G,
        f: F,
    ) -> Group<Self, F, G>
    where
        F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
        R: RowStream,
        G: GroupCriteria,
        Self: Sized,
    {
        Group::new(self, f, grouping)
    }

    /// When consumed, writes to the destination specified by the target given
    /// in the first argument. Other than that this behaves like an identity
    /// function on the stream, so you can specify more links in the chain and
    /// even more flushers.
    #[inline]
    fn flush<T>(self, target: T) -> error::Result<Flush<Self, T>>
    where
        Self: Sized,
    {
        Flush::new(self, target)
    }

    /// Mostly for debugging, calls a closure on each element. Behaves like the
    /// identity function on the stream, returning each row untouched.
    #[inline]
    fn review<F>(self, f: F) -> Inspect<Self, F>
    where
        Self: Sized,
        F: FnMut(&Headers, &RowResult),
    {
        Inspect::new(self, f)
    }

    /// Renames a column: rows keep their values, but the header `old_name` is
    /// replaced by `new_name`.
    #[inline]
    fn rename(self, old_name: &str, new_name: &str) -> Rename<Self>
    where
        Self: Sized,
    {
        Rename::new(self, old_name, new_name)
    }

    /// Apply a function to every row and use the return values to build the
    /// row stream.
    ///
    /// This method accepts a closure that must return an iterator over
    /// RowResult values, which means that you can create more rows out of a
    /// single one.
    ///
    /// You're responsible for providing the new headers, and for that you need
    /// to use the second closure, which maps the old headers to the new ones.
    ///
    /// # Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    /// .unwrap().build().unwrap()
    /// .map_row(|_headers, row| {
    /// // Go creative here in the creation of your new row(s)
    /// Ok(vec![
    /// Ok(row.clone())
    /// ].into_iter())
    /// }, |old_headers| {
    /// // be responsible and provide proper headers from the old ones
    /// old_headers.clone()
    /// })
    /// .into_iter();
    /// ```
    #[inline]
    fn map_row<F, H, R>(self, f: F, header_map: H) -> MapRow<Self, F>
    where
        Self: Sized,
        F: Fn(&Headers, &Row) -> error::Result<R>,
        H: Fn(&Headers) -> Headers,
        R: Iterator<Item = RowResult>,
    {
        // Compute the new headers eagerly from the current ones so MapRow can
        // report them through its own `headers()`.
        let new_headers = (header_map)(self.headers());

        MapRow::new(self, f, new_headers)
    }

    /// Apply a function to a single column of the stream. This function doesn't
    /// fail if the column doesn't exist.
    #[inline]
    fn map_col<F>(self, col: &str, f: F) -> MapCol<Self, F>
    where
        Self: Sized,
        F: Fn(&str) -> error::Result<String>,
    {
        MapCol::new(self, col.into(), f)
    }

    /// Filters entire rows out depending on one column's value and a
    /// condition, leaving errored rows untouched.
    #[inline]
    fn filter_col<F>(self, col: &str, f: F) -> error::Result<FilterCol<Self, F>>
    where
        Self: Sized,
        F: Fn(&str) -> bool,
    {
        FilterCol::new(self, col.into(), f)
    }

    /// Filters entire rows out depending on a condition computed from the
    /// headers and the whole row, leaving errored rows untouched.
    #[inline]
    fn filter_row<F>(self, f: F) -> FilterRow<Self, F>
    where
        Self: Sized,
        F: Fn(&Headers, &Row) -> bool,
    {
        FilterRow::new(self, f)
    }
}