1use super::headers::Headers;
2use crate::pipeline_iterators::{
3 AddCol, Filter, FilterCol, Flush, MapCol, MapRow, PipelinesChain, Select, TransformInto,
4 Validate, ValidateCol,
5};
6use crate::target::{StringTarget, Target};
7use crate::transform::Transform;
8use crate::{Error, PlError, Row, RowResult};
9use csv::{Reader, ReaderBuilder, StringRecordsIntoIter};
10use linked_hash_map::LinkedHashMap;
11use std::borrow::BorrowMut;
12use std::io;
13use std::path::Path;
14
15pub struct Pipeline<'a> {
17 pub headers: Headers,
18 pub(crate) source: usize,
19 iterator: Box<dyn Iterator<Item = RowResult> + 'a>,
20}
21
22impl<'a> Pipeline<'a> {
23 pub fn from_reader<R: io::Read + 'a>(mut reader: Reader<R>) -> Result<Self, PlError> {
24 let headers_row = reader.headers().unwrap().clone();
25 let row_iterator = RowIter::from_records(0, reader.into_records());
26 Ok(Pipeline {
27 headers: match Headers::from_row(headers_row) {
28 Ok(headers) => headers,
29 Err(duplicated_col) => {
30 return Err(Error::DuplicateColumn(duplicated_col).at_source(0))
31 }
32 },
33 source: 0,
34 iterator: Box::new(row_iterator),
35 })
36 }
37
38 pub fn from_path<P: AsRef<Path>>(file_path: P) -> Result<Self, PlError> {
40 let ext = file_path.as_ref().extension().unwrap_or_default();
41 let delimiter = match ext.to_string_lossy().as_ref() {
42 "tsv" => b'\t',
43 "csv" => b',',
44 _ => panic!("Unsupported file {}", file_path.as_ref().display()),
45 };
46 let reader_result = ReaderBuilder::new()
47 .delimiter(delimiter)
48 .from_path(file_path);
49 match reader_result {
50 Ok(reader) => Self::from_reader(reader),
51 Err(e) => Err(Error::Csv(e).at_source(0)),
52 }
53 }
54
55 pub fn from_rows<I: IntoIterator<Item = Row>>(records: I) -> Result<Self, PlError>
56 where
57 <I as IntoIterator>::IntoIter: 'a,
58 {
59 let mut records = records.into_iter();
60 let headers_row = records.next().unwrap();
61 let row_iterator = records.map(|row| -> RowResult {
62 return Ok(row);
63 });
64 Ok(Pipeline {
65 headers: match Headers::from_row(headers_row) {
66 Ok(headers) => headers,
67 Err(duplicated_col) => {
68 return Err(Error::DuplicateColumn(duplicated_col).at_source(0))
69 }
70 },
71 source: 0,
72 iterator: Box::new(row_iterator),
73 })
74 }
75
76 pub fn from_pipelines<I>(pipelines: I) -> Self
93 where
94 I: IntoIterator<Item = Pipeline<'a>>,
95 <I as IntoIterator>::IntoIter: 'a,
96 {
97 let mut pipelines = pipelines.into_iter();
98 let current = pipelines.next();
99 let headers = match current {
100 Some(ref pipeline) => pipeline.headers.clone(),
101 None => Headers::new(),
102 };
103 Pipeline {
104 headers: headers.clone(),
105 source: 0,
106 iterator: Box::new(PipelinesChain {
107 pipelines,
108 current: current.map(|p| p.build()),
109 index: 0,
110 headers,
111 }),
112 }
113 }
114
115 pub fn add_col<F>(mut self, name: &str, get_value: F) -> Self
133 where
134 F: FnMut(&Headers, &Row) -> Result<String, Error> + 'a,
135 {
136 self.headers.push_field(name);
137 self.iterator = Box::new(AddCol {
138 iterator: self.iterator,
139 f: get_value,
140 source: self.source,
141 headers: self.headers.clone(),
142 });
143 self
144 }
145
146 pub fn map<F>(mut self, get_row: F) -> Self
165 where
166 F: FnMut(&Headers, Row) -> Result<Row, Error> + 'a,
167 {
168 self.iterator = Box::new(MapRow {
169 iterator: self.iterator,
170 f: get_row,
171 source: self.source,
172 headers: self.headers.clone(),
173 });
174 self
175 }
176
177 pub fn map_col<F>(mut self, col: &str, get_value: F) -> Self
198 where
199 F: FnMut(&str) -> Result<String, Error> + 'a,
200 {
201 self.iterator = Box::new(MapCol {
202 iterator: self.iterator,
203 f: get_value,
204 name: col.to_string(),
205 source: self.source,
206 index: self.headers.get_index(col),
207 });
208 self
209 }
210
211 pub fn filter<F>(mut self, get_row: F) -> Self
234 where
235 F: FnMut(&Headers, &Row) -> bool + 'a,
236 {
237 self.iterator = Box::new(Filter {
238 iterator: self.iterator,
239 f: get_row,
240 source: self.source,
241 headers: self.headers.clone(),
242 });
243 self
244 }
245
246 pub fn filter_col<F>(mut self, name: &str, get_row: F) -> Self
266 where
267 F: FnMut(&str) -> bool + 'a,
268 {
269 self.iterator = Box::new(FilterCol {
270 name: name.to_string(),
271 iterator: self.iterator,
272 f: get_row,
273 source: self.source,
274 headers: self.headers.clone(),
275 });
276 self
277 }
278
279 pub fn select(mut self, columns: Vec<&str>) -> Self {
295 let new_header_row = Row::from(columns.clone());
296 self.iterator = Box::new(Select {
297 iterator: self.iterator,
298 columns: columns.into_iter().map(String::from).collect(),
299 source: self.source,
300 headers: self.headers.clone(),
301 });
302 self.headers = Headers::from_row(new_header_row).unwrap();
303 self
304 }
305
306 pub fn rename_col(mut self, from: &str, to: &str) -> Self {
322 match self.headers.rename(from, to) {
323 Ok(()) => (),
324 Err(e) => panic!("Error renaming column in source {}: {}", self.source, e),
325 };
326 self
327 }
328
329 pub fn rename_cols<R>(mut self, mut get_name: R) -> Self
350 where
351 R: FnMut(usize, &str) -> &str,
352 {
353 let mut new_headers = Headers::new();
354 for (i, name) in self.headers.into_iter().enumerate().borrow_mut() {
355 let new_name = get_name(i, name);
356 match new_headers.push_field(new_name) {
357 true => (),
358 false => panic!("New column name already exists"),
359 }
360 }
361 self.headers = new_headers;
362 self
363 }
364
365 pub fn transform_into<T>(self, mut get_transformers: T) -> Self
386 where
387 T: FnMut() -> Vec<Box<dyn Transform>> + 'a,
388 {
389 let hashers = get_transformers();
390 let names: Vec<_> = hashers.iter().map(|hasher| hasher.name()).collect();
391 Pipeline {
392 headers: Headers::from_row(Row::from(names)).unwrap(),
393 source: self.source,
394 iterator: Box::new(TransformInto {
395 iterator: self.iterator,
396 groups: LinkedHashMap::new(),
397 hashers: get_transformers(),
398 get_transformers,
399 source: self.source,
400 headers: self.headers.clone(),
401 }),
402 }
403 }
404
405 pub fn validate<F>(mut self, f: F) -> Self
407 where
408 F: FnMut(&Headers, &Row) -> Result<(), Error> + 'a,
409 {
410 self.iterator = Box::new(Validate {
411 iterator: self.iterator,
412 f,
413 source: self.source,
414 headers: self.headers.clone(),
415 });
416 self
417 }
418
419 pub fn validate_col<F>(mut self, name: &str, f: F) -> Self
421 where
422 F: FnMut(&str) -> Result<(), Error> + 'a,
423 {
424 self.iterator = Box::new(ValidateCol {
425 name: name.to_string(),
426 iterator: self.iterator,
427 f,
428 source: self.source,
429 headers: self.headers.clone(),
430 });
431 self
432 }
433
434 pub fn flush(mut self, target: impl Target + 'a) -> Self {
451 let flush = Flush::new(self.iterator, target, self.source, self.headers.clone());
452 self.iterator = Box::new(flush);
453 self
454 }
455
456 pub fn build(self) -> PipelineIter<'a> {
459 PipelineIter {
460 headers: self.headers,
461 iterator: Box::new(self.iterator),
462 }
463 }
464
465 pub fn run(self) -> Result<(), PlError> {
467 self.build().run()
468 }
469
470 pub fn collect_into_rows(self) -> Result<Vec<Row>, PlError> {
471 let pipeline_iter = self.build();
472 let header_row = pipeline_iter.headers.get_row().clone();
473 let records: Result<Vec<Row>, PlError> = pipeline_iter.map(|record| record).collect();
474 let rows = vec![header_row]
475 .into_iter()
476 .chain(records?.into_iter())
477 .collect();
478 Ok(rows)
479 }
480
481 pub fn collect_into_string(self) -> Result<String, PlError> {
482 let mut csv = String::new();
483 self.flush(StringTarget::new(&mut csv)).run()?;
484 Ok(csv)
485 }
486}
487impl<'a> IntoIterator for Pipeline<'a> {
488 type Item = RowResult;
489 type IntoIter = PipelineIter<'a>;
490
491 fn into_iter(self) -> Self::IntoIter {
492 self.build()
493 }
494}
495
496pub struct PipelineIter<'a> {
498 pub headers: Headers,
499 pub iterator: Box<dyn Iterator<Item = RowResult> + 'a>,
500}
501
502impl<'a> PipelineIter<'a> {
503 pub fn next_error(&mut self) -> Option<PlError> {
507 while let Some(item) = self.next() {
508 if let Err(err) = item {
509 return Some(err);
510 }
511 }
512 None
513 }
514
515 pub fn run(&mut self) -> Result<(), PlError> {
517 while let Some(item) = self.next() {
518 item?;
519 }
520 Ok(())
521 }
522}
523impl<'a> Iterator for PipelineIter<'a> {
524 type Item = RowResult;
525
526 fn next(&mut self) -> Option<Self::Item> {
527 self.iterator.next()
528 }
529}
530
531pub struct RowIter<R: io::Read> {
532 inner: StringRecordsIntoIter<R>,
533 source: usize,
534}
535impl<R: io::Read> RowIter<R> {
536 pub fn from_records(source: usize, records: StringRecordsIntoIter<R>) -> Self {
537 RowIter {
538 source,
539 inner: records,
540 }
541 }
542}
543impl<R: io::Read> Iterator for RowIter<R> {
544 type Item = RowResult;
545
546 fn next(&mut self) -> Option<Self::Item> {
547 self.inner.next().map(|result| {
548 result.map_err(|err| {
549 return Error::Csv(err).at_source(self.source);
550 })
551 })
552 }
553}
554
/// Chaining pipelines whose headers differ must fail with
/// `MismatchedHeaders`, reported against the offending pipeline (index 2).
#[test]
fn from_pipelines_mismatch() {
    let sources = vec![
        Pipeline::from_path("test/AB.csv").unwrap(),
        Pipeline::from_path("test/AB.csv").unwrap(),
        Pipeline::from_path("test/Countries.csv").unwrap(),
    ];
    let err = Pipeline::from_pipelines(sources)
        .collect_into_string()
        .unwrap_err();

    assert_eq!(err.source, 2);
    if let Error::MismatchedHeaders(h1, h2) = err.error {
        assert_eq!(h1, Row::from(vec!["A", "B"]));
        assert_eq!(h2, Row::from(vec!["ID", "Country"]));
    } else {
        panic!("Expected MismatchedHeaders");
    }
}