csv_diff/
csv_hash_comparer.rs

1use crate::csv_headers::*;
2use crate::csv_parse_result::{CsvLeftRightParseResult, RecordHash, RecordHashWithPosition};
3use crate::csv_parser_hasher::HashMapValue;
4use crate::diff_result::*;
5use crate::diff_row::*;
6use ahash::AHashMap as HashMap;
7use std::cmp::max;
8use std::io::Read;
9use std::io::Seek;
10
11pub(crate) struct CsvHashComparer<R: Read + Seek> {
12    csv_records_left_map: CsvHashValueMap,
13    csv_records_right_map: CsvHashValueMap,
14    intermediate_left_map: CsvHashValueMap,
15    intermediate_right_map: CsvHashValueMap,
16    max_capacity_left_map: usize,
17    max_capacity_right_map: usize,
18    csv_seek_left_reader: csv::Reader<R>,
19    csv_seek_right_reader: csv::Reader<R>,
20    diff_records: Vec<DiffByteRecord>,
21}
22
23trait ParseRecord {
24    fn parse_record<R: Read + Seek>(
25        self,
26        reader_left: &mut csv::Reader<R>,
27        reader_right: &mut csv::Reader<R>,
28    ) -> csv::Result<csv::ByteRecord>;
29}
30
31impl ParseRecord for &CsvLeftRightParseResult<RecordHashWithPosition> {
32    fn parse_record<R: Read + Seek>(
33        self,
34        reader_left: &mut csv::Reader<R>,
35        reader_right: &mut csv::Reader<R>,
36    ) -> csv::Result<csv::ByteRecord> {
37        match self {
38            CsvLeftRightParseResult::Left(RecordHashWithPosition { pos: pos_left, .. }) => {
39                reader_left
40                    .seek(pos_left.clone().into())
41                    .expect("must find the given position");
42                let mut left_byte_record = csv::ByteRecord::new();
43                reader_left.read_byte_record(&mut left_byte_record)?;
44                Ok(left_byte_record)
45            }
46            CsvLeftRightParseResult::Right(RecordHashWithPosition { pos: pos_right, .. }) => {
47                reader_right
48                    .seek(pos_right.clone().into())
49                    .expect("must find the given position");
50                let mut right_byte_record = csv::ByteRecord::new();
51                reader_right.read_byte_record(&mut right_byte_record)?;
52                Ok(right_byte_record)
53            }
54        }
55    }
56}
57
58impl<R: Read + std::io::Seek> CsvHashComparer<R> {
59    // TODO: maybe we can simplify this to only take one capacity and use it for both?
60    // But keep in mind, we would loose on flexibility (one csv is very small and one very big?)
61    pub fn with_capacity_and_reader(
62        left_capacity: usize,
63        right_capacity: usize,
64        left_reader: csv::Reader<R>,
65        right_reader: csv::Reader<R>,
66    ) -> Self {
67        Self {
68            csv_records_left_map: HashMap::with_capacity(left_capacity),
69            csv_records_right_map: HashMap::with_capacity(right_capacity),
70            intermediate_left_map: HashMap::new(),
71            intermediate_right_map: HashMap::new(),
72            max_capacity_left_map: left_capacity,
73            max_capacity_right_map: right_capacity,
74            csv_seek_left_reader: left_reader,
75            csv_seek_right_reader: right_reader,
76            diff_records: Vec::new(),
77        }
78    }
79
80    pub fn compare_csv_left_right_parse_result(
81        &mut self,
82        csv_left_right_parse_results: impl IntoIterator<
83            Item = CsvLeftRightParseResult<RecordHashWithPosition>,
84        >,
85    ) -> csv::Result<DiffByteRecords> {
86        let (csv_seek_left_reader, csv_seek_right_reader) = (
87            &mut self.csv_seek_left_reader,
88            &mut self.csv_seek_right_reader,
89        );
90        let headers_parsed: HeadersParsed = (
91            csv_seek_left_reader
92                .has_headers()
93                .then(|| csv_seek_left_reader.byte_headers().cloned()),
94            csv_seek_right_reader
95                .has_headers()
96                .then(|| csv_seek_right_reader.byte_headers().cloned()),
97        )
98            .into();
99
100        let headers: Headers = headers_parsed.try_into()?;
101
102        let mut csv_left_right_parse_results = csv_left_right_parse_results.into_iter();
103
104        let (num_columns, first_few) = match headers.max_num_cols() {
105            nc @ Some(_) => (nc, Default::default()),
106            None => match (
107                csv_left_right_parse_results.next(),
108                csv_left_right_parse_results.next(),
109            ) {
110                (Some(first), Some(second)) => {
111                    let (first_num_cols, second_num_cols) = (
112                        first
113                            .parse_record(
114                                &mut self.csv_seek_left_reader,
115                                &mut self.csv_seek_right_reader,
116                            )
117                            .map(|r| r.len())
118                            .ok(),
119                        second
120                            .parse_record(
121                                &mut self.csv_seek_left_reader,
122                                &mut self.csv_seek_right_reader,
123                            )
124                            .map(|r| r.len())
125                            .ok(),
126                    );
127                    (
128                        max(first_num_cols, second_num_cols),
129                        [Some(first), Some(second)],
130                    )
131                }
132                (None, Some(first)) | (Some(first), None) => (
133                    first
134                        .parse_record(
135                            &mut self.csv_seek_left_reader,
136                            &mut self.csv_seek_right_reader,
137                        )
138                        .map(|c| c.len())
139                        .ok(),
140                    [Some(first), None],
141                ),
142                (None, None) => (None, [None, None]),
143            },
144        };
145
146        for csv_left_right_parse_result in first_few
147            .into_iter()
148            .flatten()
149            .chain(csv_left_right_parse_results)
150        {
151            match csv_left_right_parse_result {
152                CsvLeftRightParseResult::Left(left_record_res) => {
153                    let pos_left = left_record_res.pos;
154                    let key = left_record_res.key();
155                    let record_hash_left = left_record_res.record_hash_num();
156                    match self.csv_records_right_map.get_mut(&key) {
157                        Some(hash_map_val) => {
158                            if let HashMapValue::Initial(ref record_hash_right, ref pos_right) =
159                                hash_map_val
160                            {
161                                if record_hash_left != *record_hash_right {
162                                    *hash_map_val = HashMapValue::Modified(pos_left, *pos_right);
163                                } else {
164                                    *hash_map_val = HashMapValue::Equal(
165                                        left_record_res.record_hash,
166                                        RecordHash::new(key, *record_hash_right),
167                                    );
168                                }
169                            }
170                        }
171                        None => {
172                            self.csv_records_left_map
173                                .insert(key, HashMapValue::Initial(record_hash_left, pos_left));
174                        }
175                    }
176                    if self.max_capacity_right_map > 0
177                        && pos_left.line % self.max_capacity_right_map as u64 == 0
178                    {
179                        for (k, v) in self.csv_records_right_map.drain() {
180                            match v {
181                                HashMapValue::Equal(..) => {
182                                    // nothing to do - will be removed
183                                }
184                                HashMapValue::Initial(_hash, _pos) => {
185                                    // put it back, because we don't know what to do with this value yet
186                                    self.intermediate_right_map.insert(k, v);
187                                }
188                                HashMapValue::Modified(pos_left, pos_right) => {
189                                    self.csv_seek_left_reader
190                                        .seek(pos_left.into())
191                                        .expect("must find the given position");
192                                    self.csv_seek_right_reader
193                                        .seek(pos_right.into())
194                                        .expect("must find the given position");
195                                    let mut left_byte_record = csv::ByteRecord::new();
196                                    // TODO: proper error handling (although we are safe here)
197                                    self.csv_seek_left_reader
198                                        .read_byte_record(&mut left_byte_record)
199                                        .expect("can be read");
200                                    let mut right_byte_record = csv::ByteRecord::new();
201                                    // TODO: proper error handling (although we are safe here)
202                                    self.csv_seek_right_reader
203                                        .read_byte_record(&mut right_byte_record)
204                                        .expect("can be read");
205                                    let fields_modified = left_byte_record
206                                        .iter()
207                                        .enumerate()
208                                        .zip(right_byte_record.iter())
209                                        .fold(
210                                            Vec::new(),
211                                            |mut acc, ((idx, field_left), field_right)| {
212                                                if field_left != field_right {
213                                                    acc.push(idx);
214                                                }
215                                                acc
216                                            },
217                                        );
218                                    self.diff_records.push(DiffByteRecord::Modify {
219                                        add: ByteRecordLineInfo::new(
220                                            right_byte_record,
221                                            pos_right.line,
222                                        ),
223                                        delete: ByteRecordLineInfo::new(
224                                            left_byte_record,
225                                            pos_left.line,
226                                        ),
227                                        field_indices: fields_modified,
228                                    });
229                                }
230                            }
231                        }
232
233                        std::mem::swap(
234                            &mut self.intermediate_right_map,
235                            &mut self.csv_records_right_map,
236                        );
237                    }
238                }
239                CsvLeftRightParseResult::Right(right_record_res) => {
240                    let pos_right = right_record_res.pos;
241                    let key = right_record_res.key();
242                    let record_hash_right = right_record_res.record_hash_num();
243                    match self.csv_records_left_map.get_mut(&key) {
244                        Some(hash_map_val) => {
245                            if let HashMapValue::Initial(ref record_hash_left, ref pos_left) =
246                                hash_map_val
247                            {
248                                if *record_hash_left != record_hash_right {
249                                    *hash_map_val = HashMapValue::Modified(*pos_left, pos_right);
250                                } else {
251                                    *hash_map_val = HashMapValue::Equal(
252                                        RecordHash::new(key, *record_hash_left),
253                                        right_record_res.record_hash,
254                                    );
255                                }
256                            }
257                        }
258                        None => {
259                            self.csv_records_right_map
260                                .insert(key, HashMapValue::Initial(record_hash_right, pos_right));
261                        }
262                    }
263                    if self.max_capacity_left_map > 0
264                        && pos_right.line % self.max_capacity_left_map as u64 == 0
265                    {
266                        for (k, v) in self.csv_records_left_map.drain() {
267                            match v {
268                                HashMapValue::Equal(..) => {
269                                    // nothing to do - will be removed
270                                }
271                                HashMapValue::Initial(_hash, _pos) => {
272                                    // put it back, because we don't know what to do with this value yet
273                                    self.intermediate_left_map.insert(k, v);
274                                }
275                                HashMapValue::Modified(pos_left, pos_right) => {
276                                    self.csv_seek_left_reader
277                                        .seek(pos_left.into())
278                                        .expect("must find the given position");
279                                    self.csv_seek_right_reader
280                                        .seek(pos_right.into())
281                                        .expect("must find the given position");
282                                    let mut left_byte_record = csv::ByteRecord::new();
283                                    // TODO: proper error handling (although we are safe here)
284                                    self.csv_seek_left_reader
285                                        .read_byte_record(&mut left_byte_record)
286                                        .expect("can be read");
287                                    let mut right_byte_record = csv::ByteRecord::new();
288                                    // TODO: proper error handling (although we are safe here)
289                                    self.csv_seek_right_reader
290                                        .read_byte_record(&mut right_byte_record)
291                                        .expect("can be read");
292                                    let fields_modified = left_byte_record
293                                        .iter()
294                                        .enumerate()
295                                        .zip(right_byte_record.iter())
296                                        .fold(
297                                            Vec::new(),
298                                            |mut acc, ((idx, field_left), field_right)| {
299                                                if field_left != field_right {
300                                                    acc.push(idx);
301                                                }
302                                                acc
303                                            },
304                                        );
305                                    self.diff_records.push(DiffByteRecord::Modify {
306                                        add: ByteRecordLineInfo::new(
307                                            right_byte_record,
308                                            pos_right.line,
309                                        ),
310                                        delete: ByteRecordLineInfo::new(
311                                            left_byte_record,
312                                            pos_left.line,
313                                        ),
314                                        field_indices: fields_modified,
315                                    });
316                                }
317                            }
318                        }
319                        std::mem::swap(
320                            &mut self.intermediate_left_map,
321                            &mut self.csv_records_left_map,
322                        );
323                    }
324                }
325            }
326        }
327
328        let mut diff_records = std::mem::take(&mut self.diff_records);
329        diff_records.extend(
330            std::mem::take(&mut self.csv_records_left_map)
331                .into_iter()
332                .filter_map(|(_, v)| match v {
333                    HashMapValue::Initial(_hash, pos) => {
334                        self.csv_seek_left_reader
335                            .seek(pos.into())
336                            .expect("must be found");
337                        let mut byte_record = csv::ByteRecord::new();
338                        self.csv_seek_left_reader
339                            .read_byte_record(&mut byte_record)
340                            .expect("can be read");
341                        Some(DiffByteRecord::Delete(ByteRecordLineInfo::new(
342                            byte_record,
343                            pos.line,
344                        )))
345                    }
346                    HashMapValue::Modified(pos_left, pos_right) => {
347                        self.csv_seek_left_reader
348                            .seek(pos_left.into())
349                            .expect("must find the given position");
350                        self.csv_seek_right_reader
351                            .seek(pos_right.into())
352                            .expect("must find the given position");
353                        let mut left_byte_record = csv::ByteRecord::new();
354                        // TODO: proper error handling (although we are safe here)
355                        self.csv_seek_left_reader
356                            .read_byte_record(&mut left_byte_record)
357                            .expect("can be read");
358                        let mut right_byte_record = csv::ByteRecord::new();
359                        // TODO: proper error handling (although we are safe here)
360                        self.csv_seek_right_reader
361                            .read_byte_record(&mut right_byte_record)
362                            .expect("can be read");
363                        let fields_modified = left_byte_record
364                            .iter()
365                            .enumerate()
366                            .zip(right_byte_record.iter())
367                            .fold(Vec::new(), |mut acc, ((idx, field_left), field_right)| {
368                                if field_left != field_right {
369                                    acc.push(idx);
370                                }
371                                acc
372                            });
373                        Some(DiffByteRecord::Modify {
374                            add: ByteRecordLineInfo::new(right_byte_record, pos_right.line),
375                            delete: ByteRecordLineInfo::new(left_byte_record, pos_left.line),
376                            field_indices: fields_modified,
377                        })
378                    }
379                    _ => None,
380                }),
381        );
382
383        diff_records.extend(
384            std::mem::take(&mut self.csv_records_right_map)
385                .into_iter()
386                .filter_map(|(_, v)| match v {
387                    HashMapValue::Initial(_hash, pos) => {
388                        self.csv_seek_right_reader
389                            .seek(pos.into())
390                            .expect("must be found");
391                        let mut byte_record = csv::ByteRecord::new();
392                        self.csv_seek_right_reader
393                            .read_byte_record(&mut byte_record)
394                            .expect("can be read");
395                        Some(DiffByteRecord::Add(ByteRecordLineInfo::new(
396                            byte_record,
397                            pos.line,
398                        )))
399                    }
400                    HashMapValue::Modified(pos_left, pos_right) => {
401                        self.csv_seek_left_reader
402                            .seek(pos_left.into())
403                            .expect("must find the given position");
404                        self.csv_seek_right_reader
405                            .seek(pos_right.into())
406                            .expect("must find the given position");
407                        let mut left_byte_record = csv::ByteRecord::new();
408                        // TODO: proper error handling (although we are safe here)
409                        self.csv_seek_left_reader
410                            .read_byte_record(&mut left_byte_record)
411                            .expect("can be read");
412                        let mut right_byte_record = csv::ByteRecord::new();
413                        // TODO: proper error handling (although we are safe here)
414                        self.csv_seek_right_reader
415                            .read_byte_record(&mut right_byte_record)
416                            .expect("can be read");
417                        let fields_modified = left_byte_record
418                            .iter()
419                            .enumerate()
420                            .zip(right_byte_record.iter())
421                            .fold(Vec::new(), |mut acc, ((idx, field_left), field_right)| {
422                                if field_left != field_right {
423                                    acc.push(idx);
424                                }
425                                acc
426                            });
427                        Some(DiffByteRecord::Modify {
428                            add: ByteRecordLineInfo::new(right_byte_record, pos_right.line),
429                            delete: ByteRecordLineInfo::new(left_byte_record, pos_left.line),
430                            field_indices: fields_modified,
431                        })
432                    }
433                    _ => None,
434                }),
435        );
436
437        Ok(DiffByteRecords::new(diff_records, headers, num_columns))
438    }
439}