use std::vec;
use std::collections::{HashMap, hash_map};
use crate::{
RowStream, Headers, RowResult,
mock::MockStream,
error,
};
mod by;
pub use by::GroupCriteria;
/// Stream adapter that partitions upstream rows into groups and transforms
/// each group with a user-supplied closure.
///
/// Constructed with `Group::new`; upstream rows are only consumed when the
/// adapter is turned into an iterator.
pub struct Group<I, F, G> {
/// Upstream stream whose rows are grouped.
iter: I,
/// Closure applied to each group, which it receives as a `MockStream`.
f: F,
/// Headers of the rows produced by `f` (reported as this stream's headers).
headers: Headers,
/// Headers of the upstream stream `iter`.
old_headers: Headers,
/// Criteria deciding which group each row belongs to.
group_by: G,
}
impl<I, F, R, G> Group<I, F, G>
where
    I: RowStream,
    F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
    R: RowStream,
{
    /// Creates a grouping adapter over `iter`.
    ///
    /// * `iter` – upstream stream; only its headers are read here.
    /// * `f` – transformation applied to each group, which it receives as a
    ///   `MockStream` carrying the upstream headers.
    /// * `grouping` – criteria deciding which group each row belongs to.
    ///
    /// The output headers are discovered eagerly. `f` is invoked once on a
    /// stream whose only row is the upstream header row, and the headers of
    /// whatever it returns become this adapter's headers.
    ///
    /// # Panics
    ///
    /// Panics if `MockStream::from_rows` fails on that one-row input.
    pub fn new(iter: I, f: F, grouping: G) -> Group<I, F, G> {
        let old_headers = iter.headers().clone();

        // Probe stream used only to learn which headers `f` produces.
        let probe = MockStream::from_rows(vec![Ok(old_headers.clone().into())].into_iter())
            .unwrap();
        let headers = f(probe).headers().clone();

        Group {
            group_by: grouping,
            old_headers,
            headers,
            f,
            iter,
        }
    }
}
// NOTE(review): `Rows` carries the `u64` returned by `GroupCriteria::group_for`
// (defined in `by`, not visible here). If that value is a hash of the grouping
// columns, two distinct groups whose hashes collide would share one bucket and
// be merged. Confirm against the `by` module.
/// Bucket key used while partitioning the input rows into groups.
#[derive(Hash,PartialEq,Eq)]
enum GroupKey {
/// Rows for which `group_for` returned this value.
Rows(u64),
/// Every `Err` item from the input stream, collected together.
Errors,
}
/// Iterator produced by `Group`'s `IntoIterator` implementation.
///
/// Holds every input row already partitioned into groups; `Err` items form a
/// group of their own. Each group is handed to `f` as a separate stream, and
/// the rows `f` produces are yielded in sequence. Groups are visited in the
/// order in which their first row appeared in the input.
pub struct IntoIter<F, R>
where
    F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
    R: RowStream,
{
    /// Buffered groups that have not been handed to `f` yet.
    iter: vec::IntoIter<Vec<RowResult>>,
    /// Per-group transformation supplied to `Group::new`.
    f: F,
    /// Headers every group's output must have.
    headers: Headers,
    /// Headers of the upstream stream; attached to each group's `MockStream`.
    old_headers: Headers,
    /// Output of the group currently being drained, if any.
    current_group: Option<R::IntoIter>,
}

impl<F, R> Iterator for IntoIter<F, R>
where
    F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
    R: RowStream,
{
    type Item = RowResult;

    /// Yields the next row produced by the per-group closure.
    ///
    /// The active group's output is drained first. When it is exhausted, the
    /// next buffered group is passed to `f` and its output becomes active.
    /// If a group's output headers differ from the expected headers,
    /// `Err(InconsistentHeaders)` is yielded in place of that group's rows and
    /// iteration moves on to the following group.
    ///
    /// This is a loop rather than recursion, so a long run of groups whose
    /// output is empty cannot grow the call stack.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(group) = self.current_group.as_mut() {
                match group.next() {
                    Some(item) => return Some(item),
                    None => self.current_group = None,
                }
            }

            // No active group: load the next one, or finish when none remain.
            let rows = self.iter.next()?;
            let output_stream =
                (self.f)(MockStream::new(rows.into_iter(), self.old_headers.clone()));

            if *output_stream.headers() != self.headers {
                return Some(Err(error::Error::InconsistentHeaders));
            }

            self.current_group = Some(output_stream.into_iter());
        }
    }
}

impl<I, F, R, G> IntoIterator for Group<I, F, G>
where
    I: RowStream,
    F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
    R: RowStream,
    G: GroupCriteria,
{
    type Item = RowResult;
    type IntoIter = IntoIter<F, R>;

    /// Consumes the whole input stream and partitions its items into groups.
    ///
    /// `Ok` rows are keyed by `GroupCriteria::group_for`; every `Err` item goes
    /// into a single shared error group. Groups, and the rows within each
    /// group, keep the order in which they first appeared in the input. As a
    /// result the output order is reproducible from run to run, whereas
    /// iterating a std `HashMap` would give a randomized order.
    fn into_iter(self) -> Self::IntoIter {
        // Index of each group's bucket inside `groups`.
        let mut slots: HashMap<GroupKey, usize> = HashMap::new();
        let mut groups: Vec<Vec<RowResult>> = Vec::new();

        for result in self.iter {
            let key = match &result {
                Ok(row) => GroupKey::Rows(self.group_by.group_for(&self.old_headers, row)),
                Err(_) => GroupKey::Errors,
            };

            let slot = match slots.entry(key) {
                hash_map::Entry::Occupied(entry) => *entry.get(),
                hash_map::Entry::Vacant(entry) => {
                    groups.push(Vec::new());
                    *entry.insert(groups.len() - 1)
                }
            };

            groups[slot].push(result);
        }

        IntoIter {
            iter: groups.into_iter(),
            f: self.f,
            headers: self.headers,
            old_headers: self.old_headers,
            current_group: None,
        }
    }
}
impl<I, F, R, G> RowStream for Group<I, F, G>
where
I: RowStream,
F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
R: RowStream,
G: GroupCriteria,
{
fn headers(&self) -> &Headers {
&self.headers
}
}
// Unit tests for the `Group` adapter: grouping by column names, grouping by a
// closure, error propagation, and integration with the CSV input/reduce APIs.
#[cfg(test)]
mod tests {
use crate::{
Row, RowStream, Headers,
mock::MockStream,
error::Error,
add::ColSpec, input::InputStreamBuilder,
add::Column,
reduce::reducer::Reducer,
};
use super::Group;
// Groups contiguous rows by `name`. Each group gets a `sum` column holding the
// total of its `value` column.
#[test]
fn test_group() {
let iter = MockStream::from_rows(
vec![
Ok(Row::from(vec!["name", "value"])),
Ok(Row::from(vec!["a", "1"])),
Ok(Row::from(vec!["a", "2"])),
Ok(Row::from(vec!["a", "3"])),
Ok(Row::from(vec!["b", "1"])),
Ok(Row::from(vec!["b", "1"])),
Ok(Row::from(vec!["b", "1"])),
]
.into_iter(),
)
.unwrap();
// Per-group closure: buffer the group's rows, sum `value`, then re-emit the
// rows with the total appended as a new `sum` column.
let re = Group::new(iter, |row_stream| {
let headers = row_stream.headers().clone();
let rows: Vec<_> = row_stream.into_iter().collect();
let mut sum = 0.0;
for row in rows.iter() {
let value: f64 = headers.get_field(
row.as_ref().unwrap(),
"value"
).unwrap().parse().unwrap();
sum += value;
}
MockStream::new(rows.into_iter(), headers)
.add(ColSpec::Mix {
colname: "sum".to_string(),
coldef: sum.to_string(),
})
.unwrap()
}, ["name"]);
let headers = Row::from(vec!["name", "value", "sum"]).into();
assert_eq!(
*re.headers(),
headers,
);
let r = re.into_iter();
let mut results: Vec<Row> = r.map(|i| i.unwrap()).collect();
// Sort by `sum` so the assertion does not depend on the order in which
// groups are emitted. The sort is stable, so rows within a group keep their
// relative order.
results.sort_by(|a, b| {
headers.get_field(a, "sum").unwrap().cmp(headers.get_field(b, "sum").unwrap())
});
assert_eq!(
results,
vec![
Row::from(vec!["b", "1", "3"]),
Row::from(vec!["b", "1", "3"]),
Row::from(vec!["b", "1", "3"]),
Row::from(vec!["a", "1", "6"]),
Row::from(vec!["a", "2", "6"]),
Row::from(vec!["a", "3", "6"]),
]
);
}
// Same as `test_group`, but rows of the same group are interleaved in the
// input. They must still be collected into a single group.
#[test]
fn test_group_noadjacent() {
let iter = MockStream::from_rows(
vec![
Ok(Row::from(vec!["name", "value"])),
Ok(Row::from(vec!["a", "1"])),
Ok(Row::from(vec!["a", "2"])),
Ok(Row::from(vec!["b", "1"])),
Ok(Row::from(vec!["a", "3"])),
Ok(Row::from(vec!["b", "1"])),
Ok(Row::from(vec!["b", "1"])),
]
.into_iter(),
)
.unwrap();
let re = Group::new(iter, |row_stream| {
let headers = row_stream.headers().clone();
let rows: Vec<_> = row_stream.into_iter().collect();
let mut sum = 0.0;
for row in rows.iter() {
let value: f64 = headers.get_field(
row.as_ref().unwrap(),
"value"
).unwrap().parse().unwrap();
sum += value;
}
MockStream::new(rows.into_iter(), headers)
.add(ColSpec::Mix {
colname: "sum".to_string(),
coldef: sum.to_string(),
})
.unwrap()
}, ["name"]);
let headers = Row::from(vec!["name", "value", "sum"]).into();
assert_eq!(
*re.headers(),
headers,
);
let r = re.into_iter();
let mut results: Vec<Row> = r.map(|i| i.unwrap()).collect();
// Order-independent comparison, as in `test_group`.
results.sort_by(|a, b| {
headers.get_field(a, "sum").unwrap().cmp(headers.get_field(b, "sum").unwrap())
});
assert_eq!(
results,
vec![
Row::from(vec!["b", "1", "3"]),
Row::from(vec!["b", "1", "3"]),
Row::from(vec!["b", "1", "3"]),
Row::from(vec!["a", "1", "6"]),
Row::from(vec!["a", "2", "6"]),
Row::from(vec!["a", "3", "6"]),
]
);
}
// An `Err` item in the input must surface in the grouped output. The identity
// closure passes every group, including the error group, through unchanged.
#[test]
fn test_some_errs_in_stream() {
let iter = MockStream::from_rows(
vec![
Ok(Row::from(vec!["name", "value"])),
Ok(Row::from(vec!["a", "1"])),
Err(Error::InconsistentHeaders),
Ok(Row::from(vec!["b", "1"])),
]
.into_iter(),
)
.unwrap();
let re = Group::new(iter, |row_stream| row_stream, ["name"]);
assert_eq!(
*re.headers(),
Row::from(vec!["name", "value"]).into(),
);
let err = re.into_iter()
.find(|item| matches!(item, Err(Error::InconsistentHeaders)))
.unwrap();
match err {
Err(Error::InconsistentHeaders) => {},
_ => unreachable!(),
}
}
// End-to-end: read two CSV fixtures, drop rows with no usable
// `eggs per week`, derive `region` from the source file name, then group by
// (`region`, `month`) and reduce each group. Asserts 6 output rows.
#[test]
fn test_new_group_api() {
let data = InputStreamBuilder::from_paths(vec![
"test/assets/chicken_north.csv",
"test/assets/chicken_south.csv",
]).unwrap().with_source_col("_source").build().unwrap()
.filter_col("eggs per week", |value| {
!value.is_empty() && value != "NaN"
}).unwrap()
.add(
Column::with_name("region")
.from_column("_source")
.with_regex("_([a-z]+).csv").unwrap()
.definition("$1")
).unwrap()
.group(["region", "month"], |row_stream| {
row_stream.reduce(vec![
Reducer::with_name("region").of_column("region").last("").unwrap(),
Reducer::with_name("month").of_column("month").last("").unwrap(),
Reducer::with_name("avg").of_column("eggs per week").average().unwrap(),
Reducer::with_name("sum").of_column("eggs per week").sum(0.0).unwrap(),
]).unwrap()
})
.into_iter().map(|row| row.unwrap());
assert_eq!(data.count(), 6);
}
// Same pipeline, but the group key is computed by a closure returning the
// `region` field. Asserts 2 output rows (one per region present).
#[test]
fn test_group_by_closure() {
let data = InputStreamBuilder::from_paths(vec![
"test/assets/chicken_north.csv",
"test/assets/chicken_south.csv",
]).unwrap().with_source_col("_source").build().unwrap()
.filter_col("eggs per week", |value| {
!value.is_empty() && value != "NaN"
}).unwrap()
.add(
Column::with_name("region")
.from_column("_source")
.with_regex("_([a-z]+).csv").unwrap()
.definition("$1")
).unwrap()
.group(|headers: &Headers, row: &Row| {
headers.get_field(row, "region").unwrap().to_string()
}, |row_stream| {
row_stream.reduce(vec![
Reducer::with_name("region").of_column("region").last("").unwrap(),
Reducer::with_name("avg").of_column("eggs per week").average().unwrap(),
Reducer::with_name("sum").of_column("eggs per week").sum(0.0).unwrap(),
]).unwrap()
}).into_iter().map(|row| row.unwrap());
assert_eq!(data.count(), 2);
}
}