pub mod add;
mod add_with;
mod group;
mod adjacent_group;
mod del;
pub mod error;
mod flush;
mod headers;
pub mod input;
mod inspect;
mod mock;
mod reduce;
mod rename;
mod row_stream;
mod map;
mod filter;
mod select;
pub mod prelude;
pub use crate::input::InputStream;
pub use crate::add::Column;
pub use crate::flush::Target;
pub use crate::row_stream::RowStream;
pub use crate::headers::Headers;
pub use crate::reduce::reducer::Reducer;
pub use crate::reduce::aggregate::Aggregate;
pub use crate::group::GroupCriteria;
pub use crate::error::{Error, RowResult};
pub use crate::mock::MockStream;
/// The record type carried by the streams in this crate; an alias for
/// [`csv::StringRecord`].
pub type Row = csv::StringRecord;

/// Builds a [`Row`] out of a [`Headers`] value by delegating to
/// `Headers::into_row`, consuming the headers.
impl From<Headers> for Row {
    fn from(source: Headers) -> Self {
        source.into_row()
    }
}
#[cfg(test)]
mod tests {
    //! API smoke tests for the stream builder.
    //!
    //! Most tests assemble a pipeline over the fixtures in `test/assets/` and
    //! then drop the resulting iterator without consuming it. They therefore
    //! check mainly that the builder API type-checks and that the
    //! construction-time `unwrap()`s succeed. NOTE(review): this presumes the
    //! streams are lazy, so the closures passed in are never executed here;
    //! confirm against the stream implementations. Only `test_from_paths_api`
    //! and `test_flush_api` actually pull rows and assert on them.

    use std::f64;
    use crate::prelude::*;

    /// Chaining two input files yields the first data row of `1.csv` first.
    #[test]
    fn test_from_paths_api() {
        let mut chain = InputStreamBuilder::from_paths(&[
            "test/assets/1.csv",
            "test/assets/2.csv",
        ]).unwrap().build().unwrap().into_iter();
        assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["1", "3"]));
    }

    /// Exercises the three ways of adding a column: a templated definition
    /// over all previous columns, a regex-based definition over one column,
    /// and a closure via `add_with`.
    #[test]
    fn test_add_api() {
        InputStreamBuilder::from_paths(&["test/assets/1.csv"]).unwrap().build().unwrap()
            .add(
                Column::with_name("_target")
                    .from_all_previous()
                    .definition("data/add/output/{a}.csv")
            ).unwrap()
            .add(
                Column::with_name("new_col")
                    .from_column("old_column")
                    .with_regex("regex").unwrap()
                    .definition("a definition")
            ).unwrap()
            .add_with("new_col2", |_headers, _row| {
                Ok("new_value".into())
            }).unwrap()
            .into_iter();
    }

    /// Exercises every built-in reducer plus a custom `Aggregate`.
    #[test]
    fn test_reduce_api() {
        /// Minimal custom aggregate. Its `update` is left unimplemented; it is
        /// expected not to run because the reduced stream is never iterated
        /// here (NOTE(review): relies on the stream being lazy).
        #[derive(Debug)]
        struct Foo {
            colname: String,
        }

        impl Aggregate for Foo {
            fn update(&mut self, _headers: &Headers, _rs: &Row) -> crate::error::Result<()> {
                unimplemented!()
            }

            fn colname(&self) -> &str {
                &self.colname
            }

            fn value(&self) -> String {
                "-".into()
            }
        }

        InputStreamBuilder::from_paths(vec!["test/assets/1.csv"]).unwrap().build().unwrap()
            .reduce(vec![
                Reducer::with_name("rows").count(),
                Reducer::with_name("avg").of_column("col").average().unwrap(),
                Reducer::with_name("last").of_column("col").last("-").unwrap(),
                Reducer::with_name("max").of_column("col").max(f64::NEG_INFINITY).unwrap(),
                Reducer::with_name("min").of_column("col").min(f64::INFINITY).unwrap(),
                Reducer::with_name("sum").of_column("col").sum(0.0).unwrap(),
                Reducer::with_name("mul").of_column("col").product(1.0).unwrap(),
                Reducer::with_name("closure").of_column("col").with_closure(|acc, cur| {
                    Ok(acc * cur.parse::<i32>().unwrap())
                }, 1).unwrap(),
                Reducer::custom(Foo { colname: String::from("custom") }),
            ]).unwrap()
            .into_iter();
    }

    /// Exercises column-level and row-level filtering.
    #[test]
    fn test_filter_api() {
        InputStreamBuilder::from_paths(vec!["test/assets/1.csv"]).unwrap().build().unwrap()
            .filter_col("b", |value| {
                // Keep values that are neither empty nor the literal "NaN".
                // Without the negation the `!= "NaN"` clause would be dead:
                // an empty string can never equal "NaN".
                !value.is_empty() && value != "NaN"
            }).unwrap()
            .filter_row(|headers, row| {
                headers.get_field(row, "column").unwrap().is_empty()
            })
            .into_iter();
    }

    /// Exercises both grouping flavours: by adjacent equal key columns, and by
    /// an arbitrary key closure. Each group's sub-stream is passed through.
    #[test]
    fn test_group_api() {
        InputStreamBuilder::from_paths(vec!["test/assets/1.csv"]).unwrap().build().unwrap()
            .adjacent_group(["a", "b"], |stream| {
                stream
            })
            .group(|headers: &Headers, row: &Row| {
                headers.get_field(row, "b").unwrap().to_string()
            }, |stream| {
                stream
            })
            .into_iter();
    }

    /// Exercises `review`, which hands each row result to a callback.
    #[test]
    fn test_inspect_api() {
        InputStreamBuilder::from_paths(vec!["test/assets/1.csv"]).unwrap().build().unwrap()
            .review(|headers, rs| {
                if let Ok(row) = rs {
                    println!("Name: {:?}", headers.get_field(row, "name"));
                }
            })
            .into_iter();
    }

    /// Exercises renaming a single column.
    #[test]
    fn test_rename_api() {
        InputStreamBuilder::from_paths(vec!["test/assets/1.csv"]).unwrap().build().unwrap()
            .rename("old_name", "new_name")
            .into_iter();
    }

    /// Exercises per-column value mapping and per-row expansion. The `map_row`
    /// closure emits each row once, unchanged, and the headers closure returns
    /// a copy of the input headers.
    #[test]
    fn test_map_api() {
        InputStreamBuilder::from_paths(vec!["test/assets/1.csv"]).unwrap().build().unwrap()
            .map_col("colname", |value| {
                Ok(value.into())
            })
            .map_row(|_headers, row| {
                Ok(vec![
                    Ok(row.clone())
                ].into_iter())
            }, |headers| {
                headers.clone()
            })
            .into_iter();
    }

    /// Exercises column deletion followed by column selection.
    #[test]
    fn test_del_api() {
        InputStreamBuilder::from_paths(vec!["test/assets/1.csv"]).unwrap().build().unwrap()
            .del(vec!["b"])
            .select(vec!["a"])
            .into_iter();
    }

    /// Rows pass through every flush target. After selecting `a` and `b`, the
    /// stream still yields exactly the two fixture rows.
    ///
    /// NOTE(review): the targets include `data/{a}.csv` and `data/a_path`,
    /// presumably relative to the working directory. If flushing writes files,
    /// this test has filesystem side effects; consider a temp dir. It also
    /// flushes to stdout and stderr.
    #[test]
    fn test_flush_api() {
        let mut chain = InputStreamBuilder::from_paths(vec!["test/assets/1.csv"]).unwrap().build().unwrap()
            .add(Column::with_name("_target").from_all_previous().definition("data/{a}.csv")).unwrap()
            .flush(
                Target::from_column("_target")
            ).unwrap()
            .select(vec!["a", "b"])
            .flush(
                Target::path("data/a_path")
            ).unwrap()
            .flush(
                Target::stdout()
            ).unwrap()
            .flush(
                Target::stderr()
            ).unwrap()
            .into_iter();

        assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["1", "3"]));
        assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["5", "2"]));
        assert!(chain.next().is_none());
    }
}