// csv_pipeline/transform.rs

use core::fmt::Display;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::ops::AddAssign;
use std::str::FromStr;

use crate::{Error, Headers, Row};

8/// For grouping and reducing rows.
9pub trait Transform {
10	/// Add the row to the hasher to group this row separately from others
11	fn hash(
12		&self,
13		_hasher: &mut DefaultHasher,
14		_headers: &Headers,
15		_row: &Row,
16	) -> Result<(), Error> {
17		Ok(())
18	}
19
20	/// Get the resulting column name
21	fn name(&self) -> String;
22
23	/// Combine the row with the value
24	fn add_row(&mut self, headers: &Headers, row: &Row) -> Result<(), Error>;
25
26	/// Turn the current value to a string
27	fn value(&self) -> String;
28}
29
30/// A struct for building a [`Transform`], which you can use with [`Pipeline::transform_into`](crate::Pipeline::transform_into).
31pub struct Transformer {
32	pub name: String,
33	pub from_col: String,
34}
35impl Transformer {
36	pub fn new(col_name: &str) -> Self {
37		Self {
38			name: col_name.to_string(),
39			from_col: col_name.to_string(),
40		}
41	}
42	/// Specify which column the transform should be based on
43	pub fn from_col(mut self, col_name: &str) -> Self {
44		self.from_col = col_name.to_string();
45		self
46	}
47	/// Keep the unique values from this column
48	pub fn keep_unique(self) -> Box<dyn Transform> {
49		Box::new(KeepUnique {
50			name: self.name,
51			from_col: self.from_col,
52			value: "".to_string(),
53		})
54	}
55	/// Sum the values in this column.
56	pub fn sum<'a, N>(self, init: N) -> Box<dyn Transform + 'a>
57	where
58		N: Display + AddAssign + FromStr + Clone + 'a,
59	{
60		Box::new(Sum {
61			name: self.name,
62			from_col: self.from_col,
63			value: init,
64		})
65	}
66	/// Reduce the values from this column into a single value using a closure.
67	pub fn reduce<'a, R, V>(self, reduce: R, init: V) -> Box<dyn Transform + 'a>
68	where
69		R: FnMut(V, &str) -> Result<V, Error> + 'a,
70		V: Display + Clone + 'a,
71	{
72		Box::new(Reduce {
73			name: self.name,
74			from_col: self.from_col,
75			reduce,
76			value: init,
77		})
78	}
79
80	/// Count the rows that were reduced into this row.
81	pub fn count(self) -> Box<dyn Transform> {
82		Box::new(Count {
83			name: self.name,
84			value: 0,
85		})
86	}
87}
88
89struct KeepUnique {
90	name: String,
91	from_col: String,
92	value: String,
93}
94impl Transform for KeepUnique {
95	fn hash(&self, hasher: &mut DefaultHasher, headers: &Headers, row: &Row) -> Result<(), Error> {
96		let field = headers
97			.get_field(row, &self.from_col)
98			.ok_or(Error::MissingColumn(self.from_col.clone()))?;
99		field.hash(hasher);
100		Ok(())
101	}
102
103	fn name(&self) -> String {
104		self.name.clone()
105	}
106
107	fn add_row(&mut self, headers: &Headers, row: &Row) -> Result<(), Error> {
108		self.value = headers
109			.get_field(row, &self.from_col)
110			.ok_or(Error::MissingColumn(self.from_col.clone()))?
111			.to_string();
112		Ok(())
113	}
114
115	fn value(&self) -> String {
116		self.value.clone()
117	}
118}
119
120pub(crate) fn compute_hash<'a>(
121	transformers: &Vec<Box<dyn Transform + 'a>>,
122	headers: &Headers,
123	row: &Row,
124) -> Result<u64, Error> {
125	let mut hasher = DefaultHasher::new();
126	for transformer in transformers {
127		let result = transformer.hash(&mut hasher, &headers, &row);
128		if let Err(e) = result {
129			return Err(e);
130		}
131	}
132	Ok(hasher.finish())
133}
134
135struct Reduce<F, V> {
136	name: String,
137	from_col: String,
138	reduce: F,
139	value: V,
140}
141impl<F, V> Transform for Reduce<F, V>
142where
143	F: FnMut(V, &str) -> Result<V, Error>,
144	V: Display + Clone,
145{
146	fn add_row(&mut self, headers: &Headers, row: &Row) -> Result<(), Error> {
147		let field = headers
148			.get_field(row, &self.from_col)
149			.ok_or(Error::MissingColumn(self.from_col.clone()))?
150			.to_string();
151		self.value = (self.reduce)(self.value.clone(), &field)?;
152		Ok(())
153	}
154
155	fn value(&self) -> String {
156		self.value.to_string()
157	}
158
159	fn name(&self) -> String {
160		self.name.clone()
161	}
162}
163
164struct Sum<N> {
165	name: String,
166	from_col: String,
167	value: N,
168}
169impl<V> Transform for Sum<V>
170where
171	V: Display + AddAssign + FromStr + Clone,
172{
173	fn add_row(&mut self, headers: &Headers, row: &Row) -> Result<(), Error> {
174		let field = headers
175			.get_field(row, &self.from_col)
176			.ok_or(Error::MissingColumn(self.from_col.clone()))?
177			.to_string();
178		let new: V = match field.parse() {
179			Ok(v) => v,
180			Err(_) => return Err(Error::InvalidField(field)),
181		};
182		self.value += new;
183		Ok(())
184	}
185
186	fn value(&self) -> String {
187		self.value.to_string()
188	}
189	fn name(&self) -> String {
190		self.name.clone()
191	}
192}
193
194struct Count {
195	name: String,
196	value: u128,
197}
198impl Transform for Count {
199	fn add_row(&mut self, _headers: &Headers, _row: &Row) -> Result<(), Error> {
200		self.value += 1;
201		Ok(())
202	}
203
204	fn value(&self) -> String {
205		self.value.to_string()
206	}
207	fn name(&self) -> String {
208		self.name.clone()
209	}
210}