//! csvsc/row_stream.rs

1use std::vec;
2
3use crate::{
4    add_with, error,
5
6    add::Add, add::ColSpec, flush::Flush, headers::Headers, inspect::Inspect,
7    reduce::Reduce, Row, RowResult, add_with::AddWith, del::Del,
8    adjacent_group::AdjacentGroup, MockStream, rename::Rename, select::Select,
9    reduce::aggregate::Aggregate,
10
11    map::{MapRow, MapCol},
12
13    group::{Group, GroupCriteria},
14
15    filter::{FilterCol, FilterRow},
16};
17
/// This trait describes the behaviour of every component in the CSV transformation
/// chain. Functions provided by this trait help construct the chain and can be
/// _chained_.
///
/// Implement this trait to extend `csvsc` with your own processors.
pub trait RowStream: IntoIterator<Item = RowResult> {

    /// Must return headers as they are at this point of the chain. For example
    /// if an implementor adds a column, its `headers()` function must return the
    /// new headers including the one just added.
    fn headers(&self) -> &Headers;

    /// Add a column to each row of the stream.
    ///
    /// New columns can be built arbitrarily from previous columns or from a
    /// specific column using a regular expression.
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    ///     .unwrap().build().unwrap()
    ///     .add(
    ///         Column::with_name("new column")
    ///             .from_all_previous()
    ///             .definition("{old col1} - {old col2}")
    ///     ).unwrap();
    /// ```
    ///
    /// See [`Column`](struct.Column.html) for options.
    ///
    /// If you want to add a constant value or have some other requirement take
    /// a look at `.add_with()`.
    #[inline]
    fn add(self, column: ColSpec) -> error::Result<Add<Self>>
    where
        Self: Sized,
    {
        Add::new(self, column)
    }

    /// Deletes the specified columns from each row of the stream. If you have
    /// too many columns to delete perhaps instead use [`RowStream::select`].
    #[inline]
    fn del(self, columns: Vec<&str>) -> Del<Self>
    where
        Self: Sized,
    {
        Del::new(self, columns)
    }

    /// Outputs only the selected columns, ignoring the rest.
    ///
    /// The returned rows contain their values in the order corresponding to the
    /// order in which the headers were given to this function. That means that
    /// this function can be used to reorder the headers.
    ///
    /// If you only want to delete specific columns take a look at
    /// [`RowStream::del`].
    #[inline]
    fn select(self, columns: Vec<&str>) -> Select<Self>
    where
        Self: Sized,
    {
        Select::new(self, columns)
    }

    /// Adds a column to each row of the stream using a closure to compute its
    /// value.
    ///
    /// You can also use this to add a constant value.
    ///
    /// ## Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    ///     .unwrap().build().unwrap()
    ///     .add_with("new col", |headers, row| {
    ///         Ok("value".into())
    ///     }).unwrap();
    /// ```
    #[inline]
    fn add_with<F>(self, colname: &str, f: F) -> Result<AddWith<Self, F>, add_with::BuildError>
    where
        Self: Sized,
        F: FnMut(&Headers, &Row) -> error::Result<String>,
    {
        AddWith::new(self, colname, f)
    }

    /// Reduce all the incoming stream into one row, computing some aggregates
    /// along the way. All the stream collapses into one row.
    ///
    /// The final row contains only the result of reducers and no other column,
    /// but you might preserve a column using the `.last()` aggregate.
    ///
    /// You'll likely be using this inside a `.group()` or `.adjacent_group()`.
    ///
    /// ## Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// let mut chain = InputStreamBuilder::from_paths(&["test/assets/chicken_north.csv"])
    ///     .unwrap().build().unwrap()
    ///     .group(["month"], |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("avg").of_column("eggs per week").average().unwrap(),
    ///             ]).unwrap()
    ///     });
    /// ```
    ///
    /// See [`Reducer`](./struct.Reducer.html) for built-in aggregates.
    #[inline]
    fn reduce(
        self,
        columns: Vec<Box<dyn Aggregate>>,
    ) -> error::Result<Reduce<Self>>
    where
        Self: Sized,
    {
        Reduce::new(self, columns)
    }

    /// Groups rows by the given criteria, but assuming a "group" is a set of
    /// adjacent rows.
    ///
    /// This means that sets of rows that meet the same criteria but are not
    /// adjacent will not be grouped together. Only use it if you are sure that
    /// your data follows this pattern and you want to take advantage of it.
    ///
    /// An interesting advantage of using this is that only one group is kept in
    /// memory at a time.
    ///
    /// See [`RowStream::group`] for more details.
    ///
    /// ## Example
    ///
    /// Consider a file `test/assets/groups.csv` with this contents. Notice that
    /// there are four adjacent groups that have the same value for column
    /// `name`: two with value `a` and two with `b`.
    ///
    /// ```text
    /// name,value
    /// a,1
    /// a,1
    /// b,2
    /// b,2
    /// a,3
    /// a,3
    /// b,4
    /// b,4
    /// ```
    ///
    /// Then the following code works as expected, generating an average for all
    /// of the four adjacent groups that have the same value for column `name`.
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    ///     .adjacent_group(["name"], |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("name").of_column("name").last("").unwrap(),
    ///                 Reducer::with_name("avg").of_column("value").average().unwrap(),
    ///             ]).unwrap()
    ///     })
    ///     .into_iter();
    ///
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["a", "1"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["b", "2"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["a", "3"]));
    /// assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["b", "4"]));
    /// ```
    #[inline]
    fn adjacent_group<F, R, G>(
        self,
        grouping: G,
        f: F,
    ) -> AdjacentGroup<Self, F, G>
    where
        F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
        R: RowStream,
        G: GroupCriteria,
        Self: Sized,
    {
        AdjacentGroup::new(self, f, grouping)
    }

    /// Groups rows by the given criteria. You'll be given a RowStream instance
    /// as the first argument of a closure that you can use to further process
    /// the grouped rows.
    ///
    /// The first argument is the group criteria and it can be any of:
    ///
    /// * A slice of `&str`: `&["foo", "bar"]`,
    /// * an array of `&str`: `["foo", "bar"]`,
    /// * a closure `Fn(&Headers, &Row) -> Hash`,
    /// * any type that implements
    /// [`GroupCriteria`](crate::group::GroupCriteria)
    ///
    /// In the first two cases the `&str`s are treated as column names. Rows
    /// having the same values for the specified columns will belong to the same
    /// group. Strings that don't match any column name will be ignored.
    ///
    /// In the closure case you'll be given the headers and every row and you
    /// must return a hashable type that identifies the group where that row
    /// belongs.
    ///
    /// [`GroupCriteria`](crate::group::GroupCriteria) is a trait you can
    /// implement for your own types if you want to use them as grouping
    /// criteria.
    ///
    /// ## Example
    ///
    /// Consider the following file:
    ///
    /// ```text
    /// name,value
    /// a,1
    /// a,1
    /// b,2
    /// b,2
    /// a,3
    /// a,3
    /// b,4
    /// b,4
    /// ```
    ///
    /// Then we can group for example using the column `name` and get the
    /// following results:
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows: Vec<_> = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    ///     .group(["name"], |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("name").of_column("name").last("").unwrap(),
    ///                 Reducer::with_name("avg").of_column("value").average().unwrap(),
    ///             ]).unwrap()
    ///     })
    ///     .into_iter()
    ///     .filter_map(|r| r.ok())
    ///     .collect();
    ///
    /// rows.sort_by_key(|row| row.get(0).unwrap().to_string());
    ///
    /// assert_eq!(rows[0], Row::from(vec!["a", "2"]));
    /// assert_eq!(rows[1], Row::from(vec!["b", "3"]));
    /// ```
    ///
    /// ## Grouping by closure
    ///
    /// If you decide that you need an arbitrary grouping criteria you can use a
    /// closure that returns a hashable type like this:
    ///
    /// ```
    /// use csvsc::prelude::*;
    ///
    /// let mut rows: Vec<_> = InputStreamBuilder::from_paths(&["test/assets/groups.csv"]).unwrap().build().unwrap()
    ///     .group(|headers: &Headers, row: &Row| {
    ///         headers.get_field(row, "name").unwrap().to_string()
    ///     }, |row_stream| {
    ///         row_stream
    ///             .reduce(vec![
    ///                 Reducer::with_name("name").of_column("name").last("").unwrap(),
    ///                 Reducer::with_name("avg").of_column("value").average().unwrap(),
    ///             ]).unwrap()
    ///     })
    ///     .into_iter()
    ///     .filter_map(|r| r.ok())
    ///     .collect();
    ///
    /// rows.sort_by_key(|row| row.get(0).unwrap().to_string());
    ///
    /// assert_eq!(rows[0], Row::from(vec!["a", "2"]));
    /// assert_eq!(rows[1], Row::from(vec!["b", "3"]));
    /// ```
    #[inline]
    fn group<F, R, G>(
        self,
        grouping: G,
        f: F,
    ) -> Group<Self, F, G>
    where
        F: Fn(MockStream<vec::IntoIter<RowResult>>) -> R,
        R: RowStream,
        G: GroupCriteria,
        Self: Sized,
    {
        Group::new(self, f, grouping)
    }

    /// When consumed, writes the rows to the destination described by
    /// `target`. Other than that this behaves like an `id(x)` function so you
    /// can specify more links in the chain and even more flushers.
    #[inline]
    fn flush<T>(self, target: T) -> error::Result<Flush<Self, T>>
    where
        Self: Sized,
    {
        Flush::new(self, target)
    }

    /// Mostly for debugging, calls a closure on each element. Behaves like the
    /// identity function on the stream, returning each row untouched.
    #[inline]
    fn review<F>(self, f: F) -> Inspect<Self, F>
    where
        Self: Sized,
        F: FnMut(&Headers, &RowResult),
    {
        Inspect::new(self, f)
    }

    /// Renames a column of the stream from `old_name` to `new_name`. Chain
    /// multiple calls to rename more than one column.
    #[inline]
    fn rename(self, old_name: &str, new_name: &str) -> Rename<Self>
    where
        Self: Sized,
    {
        Rename::new(self, old_name, new_name)
    }

    /// Apply a function to every row and use the return values to build the
    /// row stream.
    ///
    /// This method accepts a closure that must return an iterator over
    /// RowResult values, this means that you can create more rows out of a
    /// single one.
    ///
    /// You're responsible for providing the new headers and for that you need
    /// to use the second closure, that maps the old headers to the new ones.
    ///
    /// # Example
    ///
    /// ```
    /// use csvsc::prelude::*;
    /// use encoding::all::UTF_8;
    ///
    /// InputStreamBuilder::from_paths(&["test/assets/1.csv"])
    ///     .unwrap().build().unwrap()
    ///     .map_row(|_headers, row| {
    ///         // Go creative here in the creation of your new row(s)
    ///         Ok(vec![
    ///             Ok(row.clone())
    ///         ].into_iter())
    ///     }, |old_headers| {
    ///         // be responsible and provide proper headers from the old ones
    ///         old_headers.clone()
    ///     })
    ///     .into_iter();
    /// ```
    #[inline]
    fn map_row<F, H, R>(self, f: F, header_map: H) -> MapRow<Self, F>
    where
        Self: Sized,
        F: Fn(&Headers, &Row) -> error::Result<R>,
        H: Fn(&Headers) -> Headers,
        R: Iterator<Item = RowResult>,
    {
        // The new headers are computed eagerly, before the original stream is
        // wrapped, because `self` is moved into `MapRow::new` afterwards.
        let new_headers = (header_map)(self.headers());

        MapRow::new(self, f, new_headers)
    }

    /// Apply a function to a single column of the stream; this function doesn't
    /// fail if the column doesn't exist.
    #[inline]
    fn map_col<F>(self, col: &str, f: F) -> MapCol<Self, F>
    where
        Self: Sized,
        F: Fn(&str) -> error::Result<String>,
    {
        MapCol::new(self, col.into(), f)
    }

    /// Filter entire rows out depending on one column's value and a
    /// condition, leaving errored rows untouched.
    ///
    /// NOTE(review): construction returns a `Result`; presumably it errs when
    /// `col` is not among the headers — confirm against `FilterCol::new`.
    #[inline]
    fn filter_col<F>(self, col: &str, f: F) -> error::Result<FilterCol<Self, F>>
    where
        Self: Sized,
        F: Fn(&str) -> bool,
    {
        FilterCol::new(self, col.into(), f)
    }

    /// Filter entire rows out depending on a predicate applied to the whole
    /// row (given the headers), leaving errored rows untouched.
    #[inline]
    fn filter_row<F>(self, f: F) -> FilterRow<Self, F>
    where
        Self: Sized,
        F: Fn(&Headers, &Row) -> bool,
    {
        FilterRow::new(self, f)
    }
}