//! csvsc/row_stream.rs
1use std::vec;
2
3use crate::{
4 add_with, error,
5
6 add::Add, add::ColSpec, flush::Flush, headers::Headers, inspect::Inspect,
7 reduce::Reduce, Row, RowResult, add_with::AddWith, del::Del,
8 adjacent_group::AdjacentGroup, MockStream, rename::Rename, select::Select,
9 reduce::aggregate::Aggregate,
10
11 map::{MapRow, MapCol},
12
13 group::{Group, GroupCriteria},
14
15 filter::{FilterCol, FilterRow},
16};
17
18/// This trait describes de behaviour of every component in the CSV transformation
19/// chain. Functions provided by this trait help construct the chain and can be
20/// _chained_.
21///
22/// Implement this trait to extend `csvsc` with your own processors.
/// This trait describes the behaviour of every component in the CSV transformation
/// chain. Functions provided by this trait help construct the chain and can be
/// _chained_.
///
/// Implement this trait to extend `csvsc` with your own processors.
pub trait RowStream: IntoIterator<Item = RowResult> {

    /// Must return headers as they are at this point of the chain. For example
    /// if an implementor adds a column, its `headers()` function must return the
    /// new headers including the one just added.
    fn headers(&self) -> &Headers;

    /// Add a column to each row of the stream.
    ///
    /// New columns can be built arbitrarily from previous columns or from a
    /// specific column using a regular expression.
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    ///     .unwrap().build().unwrap()
    ///     .add(
    ///         Column::with_name("new column")
    ///             .from_all_previous()
    ///             .definition("{old col1} - {old col2}")
    ///     ).unwrap();
    /// ```
    ///
    /// See [`Column`](struct.Column.html) for options.
    ///
    /// If you want to add a constant value or have some other requirement take
    /// a look at `.add_with()`.
    #[inline]
    fn add(self, column: ColSpec) -> error::Result<Add<Self>>
    where
        Self: Sized,
    {
        Add::new(self, column)
    }

    /// Deletes the specified columns from each row of the stream. If you have
    /// too many columns to delete perhaps instead use [`RowStream::select`].
    #[inline]
    fn del(self, columns: Vec<&str>) -> Del<Self>
    where
        Self: Sized,
    {
        Del::new(self, columns)
    }

    /// Outputs only the selected columns, ignoring the rest.
    ///
    /// The returned rows contain its values in the order corresponding to the
    /// order in which the headers were given to this function. That means that
    /// this function can be used to reorder the headers.
    ///
    /// If you only want to delete specific columns take a look at
    /// [`RowStream::del`].
    #[inline]
    fn select(self, columns: Vec<&str>) -> Select<Self>
    where
        Self: Sized,
    {
        Select::new(self, columns)
    }

    /// Adds a column to each row of the stream using a closure to compute its
    /// value.
    ///
    /// You can also use this to add a constant value.
    ///
    /// ## Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    ///     .unwrap().build().unwrap()
    ///     .add_with("new col", |headers, row| {
    ///         Ok("value".into())
    ///     }).unwrap();
    /// ```
    #[inline]
    fn add_with<F>(self, colname: &str, f: F) -> Result<AddWith<Self, F>, add_with::BuildError>
    where
        Self: Sized,
        F: FnMut(&Headers, &Row) -> error::Result<String>,
    {
        AddWith::new(self, colname, f)
    }

    /// Reduce all the incoming stream into one row, computing some aggregates
    /// in the way. All the stream collapses into one row.
    ///
    /// The final row contains only the result of reducers and no other column
    /// but you might preserve a column using the `.last()` aggregate.
    ///
    /// You'll likely be using this inside a `.group()` or `.adjacent_group()`.
    ///
    /// ## Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/chicken_north.csv"])
    ///     .unwrap().build().unwrap()
    ///     .group(["month"], |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("avg").of_column("eggs per week").average().unwrap(),
    ///             ]).unwrap()
    ///     });
    /// ```
    ///
    /// See [`Reducer`](./struct.Reducer.html) for built-in aggregates.
    #[inline]
    fn reduce(
        self,
        columns: Vec<Box<dyn Aggregate>>,
    ) -> error::Result<Reduce<Self>>
    where
        Self: Sized,
    {
        Reduce::new(self, columns)
    }

    /// Groups rows by the given criteria, but assuming a "group" is a set of
    /// adjacent rows.
    ///
    /// This means that sets of rows that meet the same criteria but are not
    /// adjacent will not be grouped together. Only use it if you are sure that
    /// your data follows this pattern and you want to take advantage of it.
    ///
    /// An interesting advantage of using this is that only one group is kept in
    /// memory at a time.
    ///
    /// See [`RowStream::group`] for more details.
    ///
    /// ## Example
    ///
    /// Consider a file `test/assets/groups.csv` with this contents. Notice that
    /// there are four adjacent groups that have the same value for column
    /// `name`: two with value `a` and two with `b`.
    ///
    /// ```text
    /// name,value
    /// a,1
    /// a,1
    /// b,2
    /// b,2
    /// a,3
    /// a,3
    /// b,4
    /// b,4
    /// ```
    ///
    /// Then the following code works as expected, generating an average for all
    /// of the four adjacent groups that have the same value for column `name`.
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    ///     .adjacent_group(["name"], |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("name").of_column("name").last("").unwrap(),
    ///                 Reducer::with_name("avg").of_column("value").average().unwrap(),
    ///             ]).unwrap()
    ///     })
    ///     .into_iter();
    ///
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["a", "1"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["b", "2"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["a", "3"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["b", "4"]));
    /// ```
    #[inline]
    fn adjacent_group<F, R, G>(
        self,
        grouping: G,
        f: F,
    ) -> AdjacentGroup<Self, F, G>
    where
        F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
        R: RowStream,
        G: GroupCriteria,
        Self: Sized,
    {
        AdjacentGroup::new(self, f, grouping)
    }

    /// Groups rows by the given criteria. You'll be given a RowStream instance
    /// as the first argument of a closure that you can use to further process
    /// the grouped rows.
    ///
    /// The first argument is the group criteria and it can be any of:
    ///
    /// * A slice of `&str`: `&["foo", "bar"]`,
    /// * an array of `&str`: `["foo", "bar"]`,
    /// * a closure `Fn(&Headers, &Row) -> Hash`,
    /// * any type that implements
    ///   [`GroupCriteria`](crate::group::GroupCriteria)
    ///
    /// In the first two cases the `&str`s are treated as column names. Rows
    /// having the same values for the specified columns will belong to the same
    /// group. Strings that don't match any column name will be ignored.
    ///
    /// In the closure case you'll be given the headers and every row and you
    /// must return a hashable type that identifies the group where that row
    /// belongs.
    ///
    /// [`GroupCriteria`](crate::group::GroupCriteria) is a trait you can
    /// implement for your own types if you want to use them as grouping
    /// criteria.
    ///
    /// ## Example
    ///
    /// Consider the following file:
    ///
    /// ```text
    /// name,value
    /// a,1
    /// a,1
    /// b,2
    /// b,2
    /// a,3
    /// a,3
    /// b,4
    /// b,4
    /// ```
    ///
    /// Then we can group for example using the column `name` and get the
    /// following results:
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows: Vec<_> = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    ///     .group(["name"], |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("name").of_column("name").last("").unwrap(),
    ///                 Reducer::with_name("avg").of_column("value").average().unwrap(),
    ///             ]).unwrap()
    ///     })
    ///     .into_iter()
    ///     .filter_map(|r| r.ok())
    ///     .collect();
    ///
    /// rows.sort_by_key(|row| row.get(0).unwrap().to_string());
    ///
    /// assert_eq!(rows[0], Row::from(vec!["a", "2"]));
    /// assert_eq!(rows[1], Row::from(vec!["b", "3"]));
    /// ```
    ///
    /// ## Grouping by closure
    ///
    /// If you decide that you need an arbitrary grouping criteria you can use a
    /// closure that returns a hashable type like this:
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows: Vec<_> = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    ///     .group(|headers: &Headers, row: &Row| {
    ///         headers.get_field(row, "name").unwrap().to_string()
    ///     }, |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("name").of_column("name").last("").unwrap(),
    ///                 Reducer::with_name("avg").of_column("value").average().unwrap(),
    ///             ]).unwrap()
    ///     })
    ///     .into_iter()
    ///     .filter_map(|r| r.ok())
    ///     .collect();
    ///
    /// rows.sort_by_key(|row| row.get(0).unwrap().to_string());
    ///
    /// assert_eq!(rows[0], Row::from(vec!["a", "2"]));
    /// assert_eq!(rows[1], Row::from(vec!["b", "3"]));
    /// ```
    #[inline]
    fn group<F, R, G>(
        self,
        grouping: G,
        f: F,
    ) -> Group<Self, F, G>
    where
        F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
        R: RowStream,
        G: GroupCriteria,
        Self: Sized,
    {
        Group::new(self, f, grouping)
    }

    /// When consumed, writes to the destination specified by the column given
    /// in the first argument. Other than that this behaves like an `id(x)`
    /// function so you can specify more links in the chain and even more
    /// flushers.
    #[inline]
    fn flush<T>(self, target: T) -> error::Result<Flush<Self, T>>
    where
        Self: Sized,
    {
        Flush::new(self, target)
    }

    /// Mostly for debugging, calls a closure on each element. Behaves like the
    /// identity function on the stream returning each row untouched.
    #[inline]
    fn review<F>(self, f: F) -> Inspect<Self, F>
    where
        Self: Sized,
        F: FnMut(&Headers, &RowResult),
    {
        Inspect::new(self, f)
    }

    /// Renames a column: rows keep their values, but the header `old_name`
    /// is replaced by `new_name`.
    #[inline]
    fn rename(self, old_name: &str, new_name: &str) -> Rename<Self>
    where
        Self: Sized,
    {
        Rename::new(self, old_name, new_name)
    }

    /// Apply a function to every row and use the return values to build the
    /// row stream.
    ///
    /// This method accepts a closure that must return an iterator over
    /// RowResult values, this means that you can create more rows out of a
    /// single one.
    ///
    /// You're responsible for providing the new headers and for that you need
    /// to use the second closure, that maps the old headers to the new ones.
    ///
    /// # Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    ///     .unwrap().build().unwrap()
    ///     .map_row(|_headers, row| {
    ///         // Go creative here in the creation of your new row(s)
    ///         Ok(vec![
    ///             Ok(row.clone())
    ///         ].into_iter())
    ///     }, |old_headers| {
    ///         // be responsible and provide proper headers from the old ones
    ///         old_headers.clone()
    ///     })
    ///     .into_iter();
    /// ```
    #[inline]
    fn map_row<F, H, R>(self, f: F, header_map: H) -> MapRow<Self, F>
    where
        Self: Sized,
        F: Fn(&Headers, &Row) -> error::Result<R>,
        H: Fn(&Headers) -> Headers,
        R: Iterator<Item = RowResult>,
    {
        // Compute the new headers eagerly from the current ones so MapRow can
        // report them from this point of the chain onwards.
        let new_headers = (header_map)(self.headers());

        MapRow::new(self, f, new_headers)
    }

    /// Apply a function to a single column of the stream; this function doesn't
    /// fail if the column doesn't exist.
    #[inline]
    fn map_col<F>(self, col: &str, f: F) -> MapCol<Self, F>
    where
        Self: Sized,
        F: Fn(&str) -> error::Result<String>,
    {
        MapCol::new(self, col.into(), f)
    }

    /// Filter entire rows out depending on one column's value and a
    /// condition, leaving errored rows untouched.
    #[inline]
    fn filter_col<F>(self, col: &str, f: F) -> error::Result<FilterCol<Self, F>>
    where
        Self: Sized,
        F: Fn(&str) -> bool,
    {
        FilterCol::new(self, col.into(), f)
    }

    /// Filter entire rows out using a closure that receives the headers and
    /// each row, leaving errored rows untouched.
    #[inline]
    fn filter_row<F>(self, f: F) -> FilterRow<Self, F>
    where
        Self: Sized,
        F: Fn(&Headers, &Row) -> bool,
    {
        FilterRow::new(self, f)
    }
}