csv_pipeline/
pipeline.rs

1use super::headers::Headers;
2use crate::pipeline_iterators::{
3	AddCol, Filter, FilterCol, Flush, MapCol, MapRow, PipelinesChain, Select, TransformInto,
4	Validate, ValidateCol,
5};
6use crate::target::{StringTarget, Target};
7use crate::transform::Transform;
8use crate::{Error, PlError, Row, RowResult};
9use csv::{Reader, ReaderBuilder, StringRecordsIntoIter};
10use linked_hash_map::LinkedHashMap;
11use std::borrow::BorrowMut;
12use std::io;
13use std::path::Path;
14
15/// The main thing
16pub struct Pipeline<'a> {
17	pub headers: Headers,
18	pub(crate) source: usize,
19	iterator: Box<dyn Iterator<Item = RowResult> + 'a>,
20}
21
22impl<'a> Pipeline<'a> {
23	pub fn from_reader<R: io::Read + 'a>(mut reader: Reader<R>) -> Result<Self, PlError> {
24		let headers_row = reader.headers().unwrap().clone();
25		let row_iterator = RowIter::from_records(0, reader.into_records());
26		Ok(Pipeline {
27			headers: match Headers::from_row(headers_row) {
28				Ok(headers) => headers,
29				Err(duplicated_col) => {
30					return Err(Error::DuplicateColumn(duplicated_col).at_source(0))
31				}
32			},
33			source: 0,
34			iterator: Box::new(row_iterator),
35		})
36	}
37
38	/// Create a pipeline from a CSV or TSV file.
39	pub fn from_path<P: AsRef<Path>>(file_path: P) -> Result<Self, PlError> {
40		let ext = file_path.as_ref().extension().unwrap_or_default();
41		let delimiter = match ext.to_string_lossy().as_ref() {
42			"tsv" => b'\t',
43			"csv" => b',',
44			_ => panic!("Unsupported file {}", file_path.as_ref().display()),
45		};
46		let reader_result = ReaderBuilder::new()
47			.delimiter(delimiter)
48			.from_path(file_path);
49		match reader_result {
50			Ok(reader) => Self::from_reader(reader),
51			Err(e) => Err(Error::Csv(e).at_source(0)),
52		}
53	}
54
55	pub fn from_rows<I: IntoIterator<Item = Row>>(records: I) -> Result<Self, PlError>
56	where
57		<I as IntoIterator>::IntoIter: 'a,
58	{
59		let mut records = records.into_iter();
60		let headers_row = records.next().unwrap();
61		let row_iterator = records.map(|row| -> RowResult {
62			return Ok(row);
63		});
64		Ok(Pipeline {
65			headers: match Headers::from_row(headers_row) {
66				Ok(headers) => headers,
67				Err(duplicated_col) => {
68					return Err(Error::DuplicateColumn(duplicated_col).at_source(0))
69				}
70			},
71			source: 0,
72			iterator: Box::new(row_iterator),
73		})
74	}
75
76	/// Merge multiple source pipelines into one. The source pipelines must have identical headers, otherwise the pipelie will return a [`MismatchedHeaders`](Error::MismatchedHeaders) error  returned.
77	///
78	/// ## Example
79	///
80	/// ```
81	/// use csv_pipeline::Pipeline;
82	///
83	/// let csv = Pipeline::from_pipelines(vec![
84	///   Pipeline::from_path("test/AB.csv").unwrap(),
85	///   Pipeline::from_path("test/AB.csv").unwrap(),
86	/// ])
87	///   .collect_into_string()
88	///   .unwrap();
89	///
90	/// assert_eq!(csv, "A,B\n1,2\n1,2\n");
91	/// ```
92	pub fn from_pipelines<I>(pipelines: I) -> Self
93	where
94		I: IntoIterator<Item = Pipeline<'a>>,
95		<I as IntoIterator>::IntoIter: 'a,
96	{
97		let mut pipelines = pipelines.into_iter();
98		let current = pipelines.next();
99		let headers = match current {
100			Some(ref pipeline) => pipeline.headers.clone(),
101			None => Headers::new(),
102		};
103		Pipeline {
104			headers: headers.clone(),
105			source: 0,
106			iterator: Box::new(PipelinesChain {
107				pipelines,
108				current: current.map(|p| p.build()),
109				index: 0,
110				headers,
111			}),
112		}
113	}
114
115	/// Adds a column with values computed from the closure for each row.
116	///
117	/// ## Example
118	///
119	/// ```
120	/// use csv_pipeline::Pipeline;
121	///
122	/// let csv = Pipeline::from_path("test/AB.csv")
123	///   .unwrap()
124	///   .add_col("C", |headers, row| {
125	///     Ok("3".to_string())
126	///   })
127	///   .collect_into_string()
128	///   .unwrap();
129	///
130	/// assert_eq!(csv, "A,B,C\n1,2,3\n");
131	/// ```
132	pub fn add_col<F>(mut self, name: &str, get_value: F) -> Self
133	where
134		F: FnMut(&Headers, &Row) -> Result<String, Error> + 'a,
135	{
136		self.headers.push_field(name);
137		self.iterator = Box::new(AddCol {
138			iterator: self.iterator,
139			f: get_value,
140			source: self.source,
141			headers: self.headers.clone(),
142		});
143		self
144	}
145
146	/// Maps each row.
147	///
148	/// ## Example
149	///
150	/// ```
151	/// use csv_pipeline::Pipeline;
152	///
153	/// let csv = Pipeline::from_path("test/AB.csv")
154	///   .unwrap()
155	///   .map(|headers, row| {
156	///     Ok(row.into_iter().map(|field| field.to_string() + "0").collect())
157	///   })
158	///   .collect_into_string()
159	///   .unwrap();
160	///
161	/// assert_eq!(csv, "A,B\n10,20\n"
162	/// );
163	/// ```
164	pub fn map<F>(mut self, get_row: F) -> Self
165	where
166		F: FnMut(&Headers, Row) -> Result<Row, Error> + 'a,
167	{
168		self.iterator = Box::new(MapRow {
169			iterator: self.iterator,
170			f: get_row,
171			source: self.source,
172			headers: self.headers.clone(),
173		});
174		self
175	}
176
177	/// Maps each field of a column.
178	///
179	/// ## Example
180	///
181	/// ```
182	/// use csv_pipeline::Pipeline;
183	///
184	/// let csv = Pipeline::from_path("test/Countries.csv")
185	///   .unwrap()
186	///   .map_col("Country", |field| Ok(field.to_uppercase()))
187	///   .collect_into_string()
188	///   .unwrap();
189	///
190	/// assert_eq!(
191	///   csv,
192	///   "ID,Country\n\
193	///     1,NORWAY\n\
194	///     2,TUVALU\n"
195	/// );
196	/// ```
197	pub fn map_col<F>(mut self, col: &str, get_value: F) -> Self
198	where
199		F: FnMut(&str) -> Result<String, Error> + 'a,
200	{
201		self.iterator = Box::new(MapCol {
202			iterator: self.iterator,
203			f: get_value,
204			name: col.to_string(),
205			source: self.source,
206			index: self.headers.get_index(col),
207		});
208		self
209	}
210
211	/// Filter rows using the provided closure.
212	///
213	/// ## Example
214	///
215	/// ```
216	/// use csv_pipeline::Pipeline;
217	///
218	/// let csv = Pipeline::from_path("test/Countries.csv")
219	///   .unwrap()
220	///   .filter(|headers, row| {
221	///     let country = headers.get_field(&row, "Country").unwrap();
222	///     country == "Tuvalu"
223	///   })
224	///   .collect_into_string()
225	///   .unwrap();
226	///
227	/// assert_eq!(
228	///   csv,
229	///   "ID,Country\n\
230	///     2,Tuvalu\n"
231	/// );
232	/// ```
233	pub fn filter<F>(mut self, get_row: F) -> Self
234	where
235		F: FnMut(&Headers, &Row) -> bool + 'a,
236	{
237		self.iterator = Box::new(Filter {
238			iterator: self.iterator,
239			f: get_row,
240			source: self.source,
241			headers: self.headers.clone(),
242		});
243		self
244	}
245
246	/// Filter rows based on the field of the specified column, using the provided closure.
247	///
248	/// ## Example
249	///
250	/// ```
251	/// use csv_pipeline::Pipeline;
252	///
253	/// let csv = Pipeline::from_path("test/Countries.csv")
254	///   .unwrap()
255	///   .filter_col("Country", |country| country == "Tuvalu")
256	///   .collect_into_string()
257	///   .unwrap();
258	///
259	/// assert_eq!(
260	///   csv,
261	///   "ID,Country\n\
262	///     2,Tuvalu\n"
263	/// );
264	/// ```
265	pub fn filter_col<F>(mut self, name: &str, get_row: F) -> Self
266	where
267		F: FnMut(&str) -> bool + 'a,
268	{
269		self.iterator = Box::new(FilterCol {
270			name: name.to_string(),
271			iterator: self.iterator,
272			f: get_row,
273			source: self.source,
274			headers: self.headers.clone(),
275		});
276		self
277	}
278
279	/// Pick which columns to output, in the specified order. Panics if duplicate colums are specified.
280	///
281	/// ## Example
282	///
283	/// ```
284	/// use csv_pipeline::Pipeline;
285	///
286	/// let csv = Pipeline::from_path("test/AB.csv")
287	///   .unwrap()
288	///   .select(vec!["B"])
289	///   .collect_into_string()
290	///   .unwrap();
291	///
292	/// assert_eq!(csv, "B\n2\n");
293	/// ```
294	pub fn select(mut self, columns: Vec<&str>) -> Self {
295		let new_header_row = Row::from(columns.clone());
296		self.iterator = Box::new(Select {
297			iterator: self.iterator,
298			columns: columns.into_iter().map(String::from).collect(),
299			source: self.source,
300			headers: self.headers.clone(),
301		});
302		self.headers = Headers::from_row(new_header_row).unwrap();
303		self
304	}
305
306	/// Panics if a new name already exists
307	///
308	/// ## Example
309	///
310	/// ```
311	/// use csv_pipeline::Pipeline;
312	///
313	/// let csv = Pipeline::from_path("test/AB.csv")
314	///   .unwrap()
315	///   .rename_col("A", "X")
316	///   .collect_into_string()
317	///   .unwrap();
318	///
319	/// assert_eq!(csv, "X,B\n1,2\n");
320	/// ```
321	pub fn rename_col(mut self, from: &str, to: &str) -> Self {
322		match self.headers.rename(from, to) {
323			Ok(()) => (),
324			Err(e) => panic!("Error renaming column in source {}: {}", self.source, e),
325		};
326		self
327	}
328
329	/// Panics if a new name already exists
330	///
331	/// ## Example
332	///
333	/// ```
334	/// use csv_pipeline::Pipeline;
335	///
336	/// let csv = Pipeline::from_path("test/AB.csv")
337	///   .unwrap()
338	///   .rename_cols(|i, name| {
339	///     match name {
340	///       "A" => "X",
341	///       name => name,
342	///     }
343	///   })
344	///   .collect_into_string()
345	///   .unwrap();
346	///
347	/// assert_eq!(csv, "X,B\n1,2\n");
348	/// ```
349	pub fn rename_cols<R>(mut self, mut get_name: R) -> Self
350	where
351		R: FnMut(usize, &str) -> &str,
352	{
353		let mut new_headers = Headers::new();
354		for (i, name) in self.headers.into_iter().enumerate().borrow_mut() {
355			let new_name = get_name(i, name);
356			match new_headers.push_field(new_name) {
357				true => (),
358				false => panic!("New column name already exists"),
359			}
360		}
361		self.headers = new_headers;
362		self
363	}
364
365	/// Group and reduce rows into the provided format. Panics if the transform results in duplicate column names.
366	///
367	/// ## Example
368	///
369	/// ```
370	/// use csv_pipeline::Pipeline;
371	///
372	/// let csv = Pipeline::from_path("test/AB.csv")
373	///   .unwrap()
374	///   .rename_cols(|i, name| {
375	///     match name {
376	///       "A" => "X",
377	///       name => name,
378	///     }
379	///   })
380	///   .collect_into_string()
381	///   .unwrap();
382	///
383	/// assert_eq!(csv, "X,B\n1,2\n");
384	/// ```
385	pub fn transform_into<T>(self, mut get_transformers: T) -> Self
386	where
387		T: FnMut() -> Vec<Box<dyn Transform>> + 'a,
388	{
389		let hashers = get_transformers();
390		let names: Vec<_> = hashers.iter().map(|hasher| hasher.name()).collect();
391		Pipeline {
392			headers: Headers::from_row(Row::from(names)).unwrap(),
393			source: self.source,
394			iterator: Box::new(TransformInto {
395				iterator: self.iterator,
396				groups: LinkedHashMap::new(),
397				hashers: get_transformers(),
398				get_transformers,
399				source: self.source,
400				headers: self.headers.clone(),
401			}),
402		}
403	}
404
405	/// Do your own validation on each row.
406	pub fn validate<F>(mut self, f: F) -> Self
407	where
408		F: FnMut(&Headers, &Row) -> Result<(), Error> + 'a,
409	{
410		self.iterator = Box::new(Validate {
411			iterator: self.iterator,
412			f,
413			source: self.source,
414			headers: self.headers.clone(),
415		});
416		self
417	}
418
419	/// Do your own validation on the fields in a column.
420	pub fn validate_col<F>(mut self, name: &str, f: F) -> Self
421	where
422		F: FnMut(&str) -> Result<(), Error> + 'a,
423	{
424		self.iterator = Box::new(ValidateCol {
425			name: name.to_string(),
426			iterator: self.iterator,
427			f,
428			source: self.source,
429			headers: self.headers.clone(),
430		});
431		self
432	}
433
434	/// Write to the specified [`Target`].
435	///
436	/// ## Example
437	///
438	/// ```
439	/// use csv_pipeline::{Pipeline, Target};
440	///
441	/// let mut csv = String::new();
442	/// Pipeline::from_path("test/AB.csv")
443	///   .unwrap()
444	///   .flush(Target::string(&mut csv))
445	///   .run()
446	///   .unwrap();
447	///
448	/// assert_eq!(csv, "A,B\n1,2\n");
449	/// ```
450	pub fn flush(mut self, target: impl Target + 'a) -> Self {
451		let flush = Flush::new(self.iterator, target, self.source, self.headers.clone());
452		self.iterator = Box::new(flush);
453		self
454	}
455
456	/// Turn the pipeline into an iterator.
457	/// You can also do this using `pipeline.into_iter()`.
458	pub fn build(self) -> PipelineIter<'a> {
459		PipelineIter {
460			headers: self.headers,
461			iterator: Box::new(self.iterator),
462		}
463	}
464
465	/// Shorthand for `.build().run()`.
466	pub fn run(self) -> Result<(), PlError> {
467		self.build().run()
468	}
469
470	pub fn collect_into_rows(self) -> Result<Vec<Row>, PlError> {
471		let pipeline_iter = self.build();
472		let header_row = pipeline_iter.headers.get_row().clone();
473		let records: Result<Vec<Row>, PlError> = pipeline_iter.map(|record| record).collect();
474		let rows = vec![header_row]
475			.into_iter()
476			.chain(records?.into_iter())
477			.collect();
478		Ok(rows)
479	}
480
481	pub fn collect_into_string(self) -> Result<String, PlError> {
482		let mut csv = String::new();
483		self.flush(StringTarget::new(&mut csv)).run()?;
484		Ok(csv)
485	}
486}
487impl<'a> IntoIterator for Pipeline<'a> {
488	type Item = RowResult;
489	type IntoIter = PipelineIter<'a>;
490
491	fn into_iter(self) -> Self::IntoIter {
492		self.build()
493	}
494}
495
496/// A pipeline you can iterate through. You can get one using [`Pipeline::build`].
497pub struct PipelineIter<'a> {
498	pub headers: Headers,
499	pub iterator: Box<dyn Iterator<Item = RowResult> + 'a>,
500}
501
502impl<'a> PipelineIter<'a> {
503	/// Advances the iterator until an error is found.
504	///
505	/// Returns `None` when the iterator is finished.
506	pub fn next_error(&mut self) -> Option<PlError> {
507		while let Some(item) = self.next() {
508			if let Err(err) = item {
509				return Some(err);
510			}
511		}
512		None
513	}
514
515	/// Run through the whole iterator. Returns the first error found, if any
516	pub fn run(&mut self) -> Result<(), PlError> {
517		while let Some(item) = self.next() {
518			item?;
519		}
520		Ok(())
521	}
522}
523impl<'a> Iterator for PipelineIter<'a> {
524	type Item = RowResult;
525
526	fn next(&mut self) -> Option<Self::Item> {
527		self.iterator.next()
528	}
529}
530
531pub struct RowIter<R: io::Read> {
532	inner: StringRecordsIntoIter<R>,
533	source: usize,
534}
535impl<R: io::Read> RowIter<R> {
536	pub fn from_records(source: usize, records: StringRecordsIntoIter<R>) -> Self {
537		RowIter {
538			source,
539			inner: records,
540		}
541	}
542}
543impl<R: io::Read> Iterator for RowIter<R> {
544	type Item = RowResult;
545
546	fn next(&mut self) -> Option<Self::Item> {
547		self.inner.next().map(|result| {
548			result.map_err(|err| {
549				return Error::Csv(err).at_source(self.source);
550			})
551		})
552	}
553}
554
#[test]
fn from_pipelines_mismatch() {
	let sources = vec![
		Pipeline::from_path("test/AB.csv").unwrap(),
		Pipeline::from_path("test/AB.csv").unwrap(),
		Pipeline::from_path("test/Countries.csv").unwrap(),
	];
	let err = Pipeline::from_pipelines(sources)
		.collect_into_string()
		.unwrap_err();

	// The third pipeline (index 2) is the first whose headers differ from the first.
	assert_eq!(err.source, 2);
	if let Error::MismatchedHeaders(expected, found) = err.error {
		assert_eq!(expected, Row::from(vec!["A", "B"]));
		assert_eq!(found, Row::from(vec!["ID", "Country"]));
	} else {
		panic!("Expected MismatchedHeaders");
	}
}