csv_pipeline/
pipeline_iterators.rs

1use super::headers::Headers;
2use crate::target::Target;
3use crate::transform::{compute_hash, Transform};
4use crate::{Error, Pipeline, PipelineIter, Row, RowResult};
5use linked_hash_map::{Entry, LinkedHashMap};
6
7pub struct PipelinesChain<'a, P> {
8	pub pipelines: P,
9	pub current: Option<PipelineIter<'a>>,
10	pub index: usize,
11	pub headers: Headers,
12}
13impl<'a, P> Iterator for PipelinesChain<'a, P>
14where
15	P: Iterator<Item = Pipeline<'a>>,
16{
17	type Item = RowResult;
18
19	fn next(&mut self) -> Option<Self::Item> {
20		// If current is None, iteration is done
21		match self.current.as_mut()?.next() {
22			Some(mut row) => {
23				if let Err(e) = row.as_mut() {
24					e.source = self.index;
25				}
26				return Some(row);
27			}
28			None => {}
29		};
30		// If current was done, go to the next pipeline
31		match self.pipelines.next() {
32			Some(pipeline) => {
33				self.index += 1;
34				self.current = Some(pipeline.build());
35				let current = self.current.as_mut().unwrap();
36				if current.headers.get_row() != self.headers.get_row() {
37					return Some(Err(Error::MismatchedHeaders(
38						self.headers.get_row().to_owned(),
39						current.headers.get_row().to_owned(),
40					)
41					.at_source(self.index)));
42				}
43			}
44			None => {
45				self.current = None;
46				return None;
47			}
48		}
49		self.next()
50	}
51}
52
53pub struct AddCol<I, F: FnMut(&Headers, &Row) -> Result<String, Error>> {
54	pub iterator: I,
55	pub f: F,
56	pub source: usize,
57	pub headers: Headers,
58}
59impl<I, F> Iterator for AddCol<I, F>
60where
61	I: Iterator<Item = RowResult>,
62	F: FnMut(&Headers, &Row) -> Result<String, Error>,
63{
64	type Item = RowResult;
65
66	fn next(&mut self) -> Option<Self::Item> {
67		let mut row = match self.iterator.next()? {
68			Ok(row) => row,
69			Err(e) => return Some(Err(e)),
70		};
71		match (self.f)(&self.headers, &row) {
72			Ok(value) => {
73				row.push_field(&value);
74				Some(Ok(row))
75			}
76			Err(e) => Some(Err(e.at_source(self.source))),
77		}
78	}
79}
80
81pub struct MapRow<I, F: FnMut(&Headers, Row) -> Result<Row, Error>> {
82	pub iterator: I,
83	pub f: F,
84	pub source: usize,
85	pub headers: Headers,
86}
87impl<I, F> Iterator for MapRow<I, F>
88where
89	I: Iterator<Item = RowResult>,
90	F: FnMut(&Headers, Row) -> Result<Row, Error>,
91{
92	type Item = RowResult;
93
94	fn next(&mut self) -> Option<Self::Item> {
95		let row = match self.iterator.next()? {
96			Ok(row) => row,
97			Err(e) => return Some(Err(e)),
98		};
99		match (self.f)(&self.headers, row) {
100			Ok(value) => Some(Ok(value)),
101			Err(e) => Some(Err(e.at_source(self.source))),
102		}
103	}
104}
105
106pub struct MapCol<I, F: FnMut(&str) -> Result<String, Error>> {
107	pub iterator: I,
108	pub f: F,
109	pub name: String,
110	pub source: usize,
111	pub index: Option<usize>,
112}
113impl<I, F> Iterator for MapCol<I, F>
114where
115	I: Iterator<Item = RowResult>,
116	F: FnMut(&str) -> Result<String, Error>,
117{
118	type Item = RowResult;
119
120	fn next(&mut self) -> Option<Self::Item> {
121		let row = match self.iterator.next()? {
122			Ok(row) => row,
123			Err(e) => return Some(Err(e)),
124		};
125		let mut row_vec: Vec<_> = row.into_iter().collect();
126		let index = match self.index {
127			Some(index) => index,
128			None => {
129				return Some(Err(
130					Error::MissingColumn(self.name.clone()).at_source(self.source)
131				))
132			}
133		};
134		let field = match row_vec.get_mut(index) {
135			Some(field) => field,
136			None => {
137				return Some(Err(
138					Error::MissingColumn(self.name.clone()).at_source(self.source)
139				))
140			}
141		};
142		let new_value = match (self.f)(field) {
143			Ok(value) => value,
144			Err(e) => return Some(Err(e.at_source(self.source))),
145		};
146		*field = &new_value;
147		Some(Ok(row_vec.into()))
148	}
149}
150
151pub struct Filter<I, F: FnMut(&Headers, &Row) -> bool> {
152	pub iterator: I,
153	pub f: F,
154	pub source: usize,
155	pub headers: Headers,
156}
157impl<I, F> Iterator for Filter<I, F>
158where
159	I: Iterator<Item = RowResult>,
160	F: FnMut(&Headers, &Row) -> bool,
161{
162	type Item = RowResult;
163
164	fn next(&mut self) -> Option<Self::Item> {
165		loop {
166			let row = match self.iterator.next()? {
167				Ok(row) => row,
168				Err(e) => return Some(Err(e)),
169			};
170			let pass_filter = (self.f)(&self.headers, &row);
171			if pass_filter {
172				return Some(Ok(row));
173			}
174		}
175	}
176}
177
178pub struct FilterCol<I, F: FnMut(&str) -> bool> {
179	pub name: String,
180	pub iterator: I,
181	pub f: F,
182	pub source: usize,
183	pub headers: Headers,
184}
185impl<I, F> Iterator for FilterCol<I, F>
186where
187	I: Iterator<Item = RowResult>,
188	F: FnMut(&str) -> bool,
189{
190	type Item = RowResult;
191
192	fn next(&mut self) -> Option<Self::Item> {
193		loop {
194			let row = match self.iterator.next()? {
195				Ok(row) => row,
196				Err(e) => return Some(Err(e)),
197			};
198			let field = match self.headers.get_field(&row, &self.name) {
199				Some(field) => field,
200				None => {
201					return Some(Err(
202						Error::MissingColumn(self.name.clone()).at_source(self.source)
203					))
204				}
205			};
206			let pass_filter = (self.f)(field);
207			if pass_filter {
208				return Some(Ok(row));
209			}
210		}
211	}
212}
213
214pub struct Select<I> {
215	pub iterator: I,
216	pub columns: Vec<String>,
217	pub source: usize,
218	pub headers: Headers,
219}
220impl<I> Iterator for Select<I>
221where
222	I: Iterator<Item = RowResult>,
223{
224	type Item = RowResult;
225
226	fn next(&mut self) -> Option<Self::Item> {
227		let row = match self.iterator.next()? {
228			Ok(row) => row,
229			Err(e) => return Some(Err(e)),
230		};
231		let mut selection = Vec::with_capacity(self.columns.len());
232		for col in &self.columns {
233			let field = match self.headers.get_field(&row, col) {
234				Some(field) => field,
235				None => return Some(Err(Error::MissingColumn(col.clone()).at_source(self.source))),
236			};
237			selection.push(field);
238		}
239		Some(Ok(selection.into()))
240	}
241}
242
243pub struct TransformInto<I, F>
244where
245	F: FnMut() -> Vec<Box<dyn Transform>>,
246{
247	pub iterator: I,
248	pub groups: LinkedHashMap<u64, Vec<Box<dyn Transform>>>,
249	pub hashers: Vec<Box<dyn Transform>>,
250	pub get_transformers: F,
251	pub source: usize,
252	pub headers: Headers,
253}
254impl<I, F> Iterator for TransformInto<I, F>
255where
256	I: Iterator<Item = RowResult>,
257	F: FnMut() -> Vec<Box<dyn Transform>>,
258{
259	type Item = RowResult;
260
261	fn next(&mut self) -> Option<Self::Item> {
262		// If any error rows are found, they are returned first
263		while let Some(row_result) = self.iterator.next() {
264			// First run iterator into LinkedHashMap
265			let row = match row_result {
266				Ok(row) => row,
267				Err(e) => return Some(Err(e)),
268			};
269			let hash = match compute_hash(&self.hashers, &self.headers, &row) {
270				Ok(hash) => hash,
271				Err(e) => return Some(Err(e.at_source(self.source))),
272			};
273
274			match self.groups.entry(hash) {
275				Entry::Occupied(_) => {}
276				Entry::Vacant(entry) => {
277					let transformers = (self.get_transformers)();
278					entry.insert(transformers);
279				}
280			}
281
282			let group_row = self.groups.get_mut(&hash).unwrap();
283			for reducer in group_row {
284				let result = reducer.add_row(&self.headers, &row);
285				if let Err(e) = result {
286					return Some(Err(e.at_source(self.source)));
287				}
288			}
289		}
290		// Finally, return rows from the LinkedHashMap
291		if let Some(key) = self.groups.keys().next().copied() {
292			let reducers = self.groups.remove(&key).unwrap();
293			let fields: Vec<_> = reducers.iter().map(|reducer| reducer.value()).collect();
294			let row = Row::from(fields);
295			Some(Ok(row))
296		} else {
297			None
298		}
299	}
300}
301
302pub struct Validate<I, F> {
303	pub iterator: I,
304	pub f: F,
305	pub source: usize,
306	pub headers: Headers,
307}
308impl<I, F> Iterator for Validate<I, F>
309where
310	I: Iterator<Item = RowResult>,
311	F: FnMut(&Headers, &Row) -> Result<(), Error>,
312{
313	type Item = RowResult;
314
315	fn next(&mut self) -> Option<Self::Item> {
316		let row = match self.iterator.next()? {
317			Ok(row) => row,
318			Err(e) => return Some(Err(e)),
319		};
320		match (self.f)(&self.headers, &row) {
321			Ok(()) => Some(Ok(row)),
322			Err(e) => Some(Err(e.at_source(self.source))),
323		}
324	}
325}
326
327pub struct ValidateCol<I, F> {
328	pub name: String,
329	pub iterator: I,
330	pub f: F,
331	pub source: usize,
332	pub headers: Headers,
333}
334impl<I, F> Iterator for ValidateCol<I, F>
335where
336	I: Iterator<Item = RowResult>,
337	F: FnMut(&str) -> Result<(), Error>,
338{
339	type Item = RowResult;
340
341	fn next(&mut self) -> Option<Self::Item> {
342		let row = match self.iterator.next()? {
343			Ok(row) => row,
344			Err(e) => return Some(Err(e)),
345		};
346		let field = match self.headers.get_field(&row, &self.name) {
347			Some(field) => field,
348			None => {
349				return Some(Err(
350					Error::MissingColumn(self.name.clone()).at_source(self.source)
351				))
352			}
353		};
354		match (self.f)(&field) {
355			Ok(()) => Some(Ok(row)),
356			Err(e) => Some(Err(e.at_source(self.source))),
357		}
358	}
359}
360
361pub struct Flush<I, T> {
362	pub iterator: I,
363	pub target: T,
364	pub source: usize,
365	/// `None` if headers have been written, `Some` otherwise
366	headers: Option<Headers>,
367}
368impl<I, T> Flush<I, T> {
369	pub fn new(iterator: I, target: T, source: usize, headers: Headers) -> Self {
370		Self {
371			iterator,
372			target,
373			source,
374			headers: Some(headers),
375		}
376	}
377}
378impl<I, T> Iterator for Flush<I, T>
379where
380	I: Iterator<Item = RowResult>,
381	T: Target,
382{
383	type Item = RowResult;
384
385	fn next(&mut self) -> Option<Self::Item> {
386		if let Some(headers) = &self.headers {
387			match self.target.write_headers(headers) {
388				Ok(()) => self.headers = None,
389				Err(e) => return Some(Err(Error::Csv(e).at_source(self.source))),
390			}
391		}
392
393		let row = match self.iterator.next()? {
394			Ok(row) => row,
395			Err(e) => return Some(Err(e)),
396		};
397		let r = match self.target.write_row(&row) {
398			Ok(()) => Some(Ok(row)),
399			Err(e) => return Some(Err(Error::Csv(e).at_source(self.source))),
400		};
401		r
402	}
403}