1use super::headers::Headers;
2use crate::target::Target;
3use crate::transform::{compute_hash, Transform};
4use crate::{Error, Pipeline, PipelineIter, Row, RowResult};
5use linked_hash_map::{Entry, LinkedHashMap};
6
7pub struct PipelinesChain<'a, P> {
8 pub pipelines: P,
9 pub current: Option<PipelineIter<'a>>,
10 pub index: usize,
11 pub headers: Headers,
12}
13impl<'a, P> Iterator for PipelinesChain<'a, P>
14where
15 P: Iterator<Item = Pipeline<'a>>,
16{
17 type Item = RowResult;
18
19 fn next(&mut self) -> Option<Self::Item> {
20 match self.current.as_mut()?.next() {
22 Some(mut row) => {
23 if let Err(e) = row.as_mut() {
24 e.source = self.index;
25 }
26 return Some(row);
27 }
28 None => {}
29 };
30 match self.pipelines.next() {
32 Some(pipeline) => {
33 self.index += 1;
34 self.current = Some(pipeline.build());
35 let current = self.current.as_mut().unwrap();
36 if current.headers.get_row() != self.headers.get_row() {
37 return Some(Err(Error::MismatchedHeaders(
38 self.headers.get_row().to_owned(),
39 current.headers.get_row().to_owned(),
40 )
41 .at_source(self.index)));
42 }
43 }
44 None => {
45 self.current = None;
46 return None;
47 }
48 }
49 self.next()
50 }
51}
52
53pub struct AddCol<I, F: FnMut(&Headers, &Row) -> Result<String, Error>> {
54 pub iterator: I,
55 pub f: F,
56 pub source: usize,
57 pub headers: Headers,
58}
59impl<I, F> Iterator for AddCol<I, F>
60where
61 I: Iterator<Item = RowResult>,
62 F: FnMut(&Headers, &Row) -> Result<String, Error>,
63{
64 type Item = RowResult;
65
66 fn next(&mut self) -> Option<Self::Item> {
67 let mut row = match self.iterator.next()? {
68 Ok(row) => row,
69 Err(e) => return Some(Err(e)),
70 };
71 match (self.f)(&self.headers, &row) {
72 Ok(value) => {
73 row.push_field(&value);
74 Some(Ok(row))
75 }
76 Err(e) => Some(Err(e.at_source(self.source))),
77 }
78 }
79}
80
81pub struct MapRow<I, F: FnMut(&Headers, Row) -> Result<Row, Error>> {
82 pub iterator: I,
83 pub f: F,
84 pub source: usize,
85 pub headers: Headers,
86}
87impl<I, F> Iterator for MapRow<I, F>
88where
89 I: Iterator<Item = RowResult>,
90 F: FnMut(&Headers, Row) -> Result<Row, Error>,
91{
92 type Item = RowResult;
93
94 fn next(&mut self) -> Option<Self::Item> {
95 let row = match self.iterator.next()? {
96 Ok(row) => row,
97 Err(e) => return Some(Err(e)),
98 };
99 match (self.f)(&self.headers, row) {
100 Ok(value) => Some(Ok(value)),
101 Err(e) => Some(Err(e.at_source(self.source))),
102 }
103 }
104}
105
106pub struct MapCol<I, F: FnMut(&str) -> Result<String, Error>> {
107 pub iterator: I,
108 pub f: F,
109 pub name: String,
110 pub source: usize,
111 pub index: Option<usize>,
112}
113impl<I, F> Iterator for MapCol<I, F>
114where
115 I: Iterator<Item = RowResult>,
116 F: FnMut(&str) -> Result<String, Error>,
117{
118 type Item = RowResult;
119
120 fn next(&mut self) -> Option<Self::Item> {
121 let row = match self.iterator.next()? {
122 Ok(row) => row,
123 Err(e) => return Some(Err(e)),
124 };
125 let mut row_vec: Vec<_> = row.into_iter().collect();
126 let index = match self.index {
127 Some(index) => index,
128 None => {
129 return Some(Err(
130 Error::MissingColumn(self.name.clone()).at_source(self.source)
131 ))
132 }
133 };
134 let field = match row_vec.get_mut(index) {
135 Some(field) => field,
136 None => {
137 return Some(Err(
138 Error::MissingColumn(self.name.clone()).at_source(self.source)
139 ))
140 }
141 };
142 let new_value = match (self.f)(field) {
143 Ok(value) => value,
144 Err(e) => return Some(Err(e.at_source(self.source))),
145 };
146 *field = &new_value;
147 Some(Ok(row_vec.into()))
148 }
149}
150
151pub struct Filter<I, F: FnMut(&Headers, &Row) -> bool> {
152 pub iterator: I,
153 pub f: F,
154 pub source: usize,
155 pub headers: Headers,
156}
157impl<I, F> Iterator for Filter<I, F>
158where
159 I: Iterator<Item = RowResult>,
160 F: FnMut(&Headers, &Row) -> bool,
161{
162 type Item = RowResult;
163
164 fn next(&mut self) -> Option<Self::Item> {
165 loop {
166 let row = match self.iterator.next()? {
167 Ok(row) => row,
168 Err(e) => return Some(Err(e)),
169 };
170 let pass_filter = (self.f)(&self.headers, &row);
171 if pass_filter {
172 return Some(Ok(row));
173 }
174 }
175 }
176}
177
178pub struct FilterCol<I, F: FnMut(&str) -> bool> {
179 pub name: String,
180 pub iterator: I,
181 pub f: F,
182 pub source: usize,
183 pub headers: Headers,
184}
185impl<I, F> Iterator for FilterCol<I, F>
186where
187 I: Iterator<Item = RowResult>,
188 F: FnMut(&str) -> bool,
189{
190 type Item = RowResult;
191
192 fn next(&mut self) -> Option<Self::Item> {
193 loop {
194 let row = match self.iterator.next()? {
195 Ok(row) => row,
196 Err(e) => return Some(Err(e)),
197 };
198 let field = match self.headers.get_field(&row, &self.name) {
199 Some(field) => field,
200 None => {
201 return Some(Err(
202 Error::MissingColumn(self.name.clone()).at_source(self.source)
203 ))
204 }
205 };
206 let pass_filter = (self.f)(field);
207 if pass_filter {
208 return Some(Ok(row));
209 }
210 }
211 }
212}
213
214pub struct Select<I> {
215 pub iterator: I,
216 pub columns: Vec<String>,
217 pub source: usize,
218 pub headers: Headers,
219}
220impl<I> Iterator for Select<I>
221where
222 I: Iterator<Item = RowResult>,
223{
224 type Item = RowResult;
225
226 fn next(&mut self) -> Option<Self::Item> {
227 let row = match self.iterator.next()? {
228 Ok(row) => row,
229 Err(e) => return Some(Err(e)),
230 };
231 let mut selection = Vec::with_capacity(self.columns.len());
232 for col in &self.columns {
233 let field = match self.headers.get_field(&row, col) {
234 Some(field) => field,
235 None => return Some(Err(Error::MissingColumn(col.clone()).at_source(self.source))),
236 };
237 selection.push(field);
238 }
239 Some(Ok(selection.into()))
240 }
241}
242
243pub struct TransformInto<I, F>
244where
245 F: FnMut() -> Vec<Box<dyn Transform>>,
246{
247 pub iterator: I,
248 pub groups: LinkedHashMap<u64, Vec<Box<dyn Transform>>>,
249 pub hashers: Vec<Box<dyn Transform>>,
250 pub get_transformers: F,
251 pub source: usize,
252 pub headers: Headers,
253}
254impl<I, F> Iterator for TransformInto<I, F>
255where
256 I: Iterator<Item = RowResult>,
257 F: FnMut() -> Vec<Box<dyn Transform>>,
258{
259 type Item = RowResult;
260
261 fn next(&mut self) -> Option<Self::Item> {
262 while let Some(row_result) = self.iterator.next() {
264 let row = match row_result {
266 Ok(row) => row,
267 Err(e) => return Some(Err(e)),
268 };
269 let hash = match compute_hash(&self.hashers, &self.headers, &row) {
270 Ok(hash) => hash,
271 Err(e) => return Some(Err(e.at_source(self.source))),
272 };
273
274 match self.groups.entry(hash) {
275 Entry::Occupied(_) => {}
276 Entry::Vacant(entry) => {
277 let transformers = (self.get_transformers)();
278 entry.insert(transformers);
279 }
280 }
281
282 let group_row = self.groups.get_mut(&hash).unwrap();
283 for reducer in group_row {
284 let result = reducer.add_row(&self.headers, &row);
285 if let Err(e) = result {
286 return Some(Err(e.at_source(self.source)));
287 }
288 }
289 }
290 if let Some(key) = self.groups.keys().next().copied() {
292 let reducers = self.groups.remove(&key).unwrap();
293 let fields: Vec<_> = reducers.iter().map(|reducer| reducer.value()).collect();
294 let row = Row::from(fields);
295 Some(Ok(row))
296 } else {
297 None
298 }
299 }
300}
301
302pub struct Validate<I, F> {
303 pub iterator: I,
304 pub f: F,
305 pub source: usize,
306 pub headers: Headers,
307}
308impl<I, F> Iterator for Validate<I, F>
309where
310 I: Iterator<Item = RowResult>,
311 F: FnMut(&Headers, &Row) -> Result<(), Error>,
312{
313 type Item = RowResult;
314
315 fn next(&mut self) -> Option<Self::Item> {
316 let row = match self.iterator.next()? {
317 Ok(row) => row,
318 Err(e) => return Some(Err(e)),
319 };
320 match (self.f)(&self.headers, &row) {
321 Ok(()) => Some(Ok(row)),
322 Err(e) => Some(Err(e.at_source(self.source))),
323 }
324 }
325}
326
327pub struct ValidateCol<I, F> {
328 pub name: String,
329 pub iterator: I,
330 pub f: F,
331 pub source: usize,
332 pub headers: Headers,
333}
334impl<I, F> Iterator for ValidateCol<I, F>
335where
336 I: Iterator<Item = RowResult>,
337 F: FnMut(&str) -> Result<(), Error>,
338{
339 type Item = RowResult;
340
341 fn next(&mut self) -> Option<Self::Item> {
342 let row = match self.iterator.next()? {
343 Ok(row) => row,
344 Err(e) => return Some(Err(e)),
345 };
346 let field = match self.headers.get_field(&row, &self.name) {
347 Some(field) => field,
348 None => {
349 return Some(Err(
350 Error::MissingColumn(self.name.clone()).at_source(self.source)
351 ))
352 }
353 };
354 match (self.f)(&field) {
355 Ok(()) => Some(Ok(row)),
356 Err(e) => Some(Err(e.at_source(self.source))),
357 }
358 }
359}
360
361pub struct Flush<I, T> {
362 pub iterator: I,
363 pub target: T,
364 pub source: usize,
365 headers: Option<Headers>,
367}
368impl<I, T> Flush<I, T> {
369 pub fn new(iterator: I, target: T, source: usize, headers: Headers) -> Self {
370 Self {
371 iterator,
372 target,
373 source,
374 headers: Some(headers),
375 }
376 }
377}
378impl<I, T> Iterator for Flush<I, T>
379where
380 I: Iterator<Item = RowResult>,
381 T: Target,
382{
383 type Item = RowResult;
384
385 fn next(&mut self) -> Option<Self::Item> {
386 if let Some(headers) = &self.headers {
387 match self.target.write_headers(headers) {
388 Ok(()) => self.headers = None,
389 Err(e) => return Some(Err(Error::Csv(e).at_source(self.source))),
390 }
391 }
392
393 let row = match self.iterator.next()? {
394 Ok(row) => row,
395 Err(e) => return Some(Err(e)),
396 };
397 let r = match self.target.write_row(&row) {
398 Ok(()) => Some(Ok(row)),
399 Err(e) => return Some(Err(Error::Csv(e).at_source(self.source))),
400 };
401 r
402 }
403}