1use crate::csv_headers::*;
2use crate::csv_parse_result::{CsvLeftRightParseResult, RecordHash, RecordHashWithPosition};
3use crate::csv_parser_hasher::HashMapValue;
4use crate::diff_result::*;
5use crate::diff_row::*;
6use ahash::AHashMap as HashMap;
7use std::cmp::max;
8use std::io::Read;
9use std::io::Seek;
10
11pub(crate) struct CsvHashComparer<R: Read + Seek> {
12 csv_records_left_map: CsvHashValueMap,
13 csv_records_right_map: CsvHashValueMap,
14 intermediate_left_map: CsvHashValueMap,
15 intermediate_right_map: CsvHashValueMap,
16 max_capacity_left_map: usize,
17 max_capacity_right_map: usize,
18 csv_seek_left_reader: csv::Reader<R>,
19 csv_seek_right_reader: csv::Reader<R>,
20 diff_records: Vec<DiffByteRecord>,
21}
22
23trait ParseRecord {
24 fn parse_record<R: Read + Seek>(
25 self,
26 reader_left: &mut csv::Reader<R>,
27 reader_right: &mut csv::Reader<R>,
28 ) -> csv::Result<csv::ByteRecord>;
29}
30
31impl ParseRecord for &CsvLeftRightParseResult<RecordHashWithPosition> {
32 fn parse_record<R: Read + Seek>(
33 self,
34 reader_left: &mut csv::Reader<R>,
35 reader_right: &mut csv::Reader<R>,
36 ) -> csv::Result<csv::ByteRecord> {
37 match self {
38 CsvLeftRightParseResult::Left(RecordHashWithPosition { pos: pos_left, .. }) => {
39 reader_left
40 .seek(pos_left.clone().into())
41 .expect("must find the given position");
42 let mut left_byte_record = csv::ByteRecord::new();
43 reader_left.read_byte_record(&mut left_byte_record)?;
44 Ok(left_byte_record)
45 }
46 CsvLeftRightParseResult::Right(RecordHashWithPosition { pos: pos_right, .. }) => {
47 reader_right
48 .seek(pos_right.clone().into())
49 .expect("must find the given position");
50 let mut right_byte_record = csv::ByteRecord::new();
51 reader_right.read_byte_record(&mut right_byte_record)?;
52 Ok(right_byte_record)
53 }
54 }
55 }
56}
57
58impl<R: Read + std::io::Seek> CsvHashComparer<R> {
59 pub fn with_capacity_and_reader(
62 left_capacity: usize,
63 right_capacity: usize,
64 left_reader: csv::Reader<R>,
65 right_reader: csv::Reader<R>,
66 ) -> Self {
67 Self {
68 csv_records_left_map: HashMap::with_capacity(left_capacity),
69 csv_records_right_map: HashMap::with_capacity(right_capacity),
70 intermediate_left_map: HashMap::new(),
71 intermediate_right_map: HashMap::new(),
72 max_capacity_left_map: left_capacity,
73 max_capacity_right_map: right_capacity,
74 csv_seek_left_reader: left_reader,
75 csv_seek_right_reader: right_reader,
76 diff_records: Vec::new(),
77 }
78 }
79
80 pub fn compare_csv_left_right_parse_result(
81 &mut self,
82 csv_left_right_parse_results: impl IntoIterator<
83 Item = CsvLeftRightParseResult<RecordHashWithPosition>,
84 >,
85 ) -> csv::Result<DiffByteRecords> {
86 let (csv_seek_left_reader, csv_seek_right_reader) = (
87 &mut self.csv_seek_left_reader,
88 &mut self.csv_seek_right_reader,
89 );
90 let headers_parsed: HeadersParsed = (
91 csv_seek_left_reader
92 .has_headers()
93 .then(|| csv_seek_left_reader.byte_headers().cloned()),
94 csv_seek_right_reader
95 .has_headers()
96 .then(|| csv_seek_right_reader.byte_headers().cloned()),
97 )
98 .into();
99
100 let headers: Headers = headers_parsed.try_into()?;
101
102 let mut csv_left_right_parse_results = csv_left_right_parse_results.into_iter();
103
104 let (num_columns, first_few) = match headers.max_num_cols() {
105 nc @ Some(_) => (nc, Default::default()),
106 None => match (
107 csv_left_right_parse_results.next(),
108 csv_left_right_parse_results.next(),
109 ) {
110 (Some(first), Some(second)) => {
111 let (first_num_cols, second_num_cols) = (
112 first
113 .parse_record(
114 &mut self.csv_seek_left_reader,
115 &mut self.csv_seek_right_reader,
116 )
117 .map(|r| r.len())
118 .ok(),
119 second
120 .parse_record(
121 &mut self.csv_seek_left_reader,
122 &mut self.csv_seek_right_reader,
123 )
124 .map(|r| r.len())
125 .ok(),
126 );
127 (
128 max(first_num_cols, second_num_cols),
129 [Some(first), Some(second)],
130 )
131 }
132 (None, Some(first)) | (Some(first), None) => (
133 first
134 .parse_record(
135 &mut self.csv_seek_left_reader,
136 &mut self.csv_seek_right_reader,
137 )
138 .map(|c| c.len())
139 .ok(),
140 [Some(first), None],
141 ),
142 (None, None) => (None, [None, None]),
143 },
144 };
145
146 for csv_left_right_parse_result in first_few
147 .into_iter()
148 .flatten()
149 .chain(csv_left_right_parse_results)
150 {
151 match csv_left_right_parse_result {
152 CsvLeftRightParseResult::Left(left_record_res) => {
153 let pos_left = left_record_res.pos;
154 let key = left_record_res.key();
155 let record_hash_left = left_record_res.record_hash_num();
156 match self.csv_records_right_map.get_mut(&key) {
157 Some(hash_map_val) => {
158 if let HashMapValue::Initial(ref record_hash_right, ref pos_right) =
159 hash_map_val
160 {
161 if record_hash_left != *record_hash_right {
162 *hash_map_val = HashMapValue::Modified(pos_left, *pos_right);
163 } else {
164 *hash_map_val = HashMapValue::Equal(
165 left_record_res.record_hash,
166 RecordHash::new(key, *record_hash_right),
167 );
168 }
169 }
170 }
171 None => {
172 self.csv_records_left_map
173 .insert(key, HashMapValue::Initial(record_hash_left, pos_left));
174 }
175 }
176 if self.max_capacity_right_map > 0
177 && pos_left.line % self.max_capacity_right_map as u64 == 0
178 {
179 for (k, v) in self.csv_records_right_map.drain() {
180 match v {
181 HashMapValue::Equal(..) => {
182 }
184 HashMapValue::Initial(_hash, _pos) => {
185 self.intermediate_right_map.insert(k, v);
187 }
188 HashMapValue::Modified(pos_left, pos_right) => {
189 self.csv_seek_left_reader
190 .seek(pos_left.into())
191 .expect("must find the given position");
192 self.csv_seek_right_reader
193 .seek(pos_right.into())
194 .expect("must find the given position");
195 let mut left_byte_record = csv::ByteRecord::new();
196 self.csv_seek_left_reader
198 .read_byte_record(&mut left_byte_record)
199 .expect("can be read");
200 let mut right_byte_record = csv::ByteRecord::new();
201 self.csv_seek_right_reader
203 .read_byte_record(&mut right_byte_record)
204 .expect("can be read");
205 let fields_modified = left_byte_record
206 .iter()
207 .enumerate()
208 .zip(right_byte_record.iter())
209 .fold(
210 Vec::new(),
211 |mut acc, ((idx, field_left), field_right)| {
212 if field_left != field_right {
213 acc.push(idx);
214 }
215 acc
216 },
217 );
218 self.diff_records.push(DiffByteRecord::Modify {
219 add: ByteRecordLineInfo::new(
220 right_byte_record,
221 pos_right.line,
222 ),
223 delete: ByteRecordLineInfo::new(
224 left_byte_record,
225 pos_left.line,
226 ),
227 field_indices: fields_modified,
228 });
229 }
230 }
231 }
232
233 std::mem::swap(
234 &mut self.intermediate_right_map,
235 &mut self.csv_records_right_map,
236 );
237 }
238 }
239 CsvLeftRightParseResult::Right(right_record_res) => {
240 let pos_right = right_record_res.pos;
241 let key = right_record_res.key();
242 let record_hash_right = right_record_res.record_hash_num();
243 match self.csv_records_left_map.get_mut(&key) {
244 Some(hash_map_val) => {
245 if let HashMapValue::Initial(ref record_hash_left, ref pos_left) =
246 hash_map_val
247 {
248 if *record_hash_left != record_hash_right {
249 *hash_map_val = HashMapValue::Modified(*pos_left, pos_right);
250 } else {
251 *hash_map_val = HashMapValue::Equal(
252 RecordHash::new(key, *record_hash_left),
253 right_record_res.record_hash,
254 );
255 }
256 }
257 }
258 None => {
259 self.csv_records_right_map
260 .insert(key, HashMapValue::Initial(record_hash_right, pos_right));
261 }
262 }
263 if self.max_capacity_left_map > 0
264 && pos_right.line % self.max_capacity_left_map as u64 == 0
265 {
266 for (k, v) in self.csv_records_left_map.drain() {
267 match v {
268 HashMapValue::Equal(..) => {
269 }
271 HashMapValue::Initial(_hash, _pos) => {
272 self.intermediate_left_map.insert(k, v);
274 }
275 HashMapValue::Modified(pos_left, pos_right) => {
276 self.csv_seek_left_reader
277 .seek(pos_left.into())
278 .expect("must find the given position");
279 self.csv_seek_right_reader
280 .seek(pos_right.into())
281 .expect("must find the given position");
282 let mut left_byte_record = csv::ByteRecord::new();
283 self.csv_seek_left_reader
285 .read_byte_record(&mut left_byte_record)
286 .expect("can be read");
287 let mut right_byte_record = csv::ByteRecord::new();
288 self.csv_seek_right_reader
290 .read_byte_record(&mut right_byte_record)
291 .expect("can be read");
292 let fields_modified = left_byte_record
293 .iter()
294 .enumerate()
295 .zip(right_byte_record.iter())
296 .fold(
297 Vec::new(),
298 |mut acc, ((idx, field_left), field_right)| {
299 if field_left != field_right {
300 acc.push(idx);
301 }
302 acc
303 },
304 );
305 self.diff_records.push(DiffByteRecord::Modify {
306 add: ByteRecordLineInfo::new(
307 right_byte_record,
308 pos_right.line,
309 ),
310 delete: ByteRecordLineInfo::new(
311 left_byte_record,
312 pos_left.line,
313 ),
314 field_indices: fields_modified,
315 });
316 }
317 }
318 }
319 std::mem::swap(
320 &mut self.intermediate_left_map,
321 &mut self.csv_records_left_map,
322 );
323 }
324 }
325 }
326 }
327
328 let mut diff_records = std::mem::take(&mut self.diff_records);
329 diff_records.extend(
330 std::mem::take(&mut self.csv_records_left_map)
331 .into_iter()
332 .filter_map(|(_, v)| match v {
333 HashMapValue::Initial(_hash, pos) => {
334 self.csv_seek_left_reader
335 .seek(pos.into())
336 .expect("must be found");
337 let mut byte_record = csv::ByteRecord::new();
338 self.csv_seek_left_reader
339 .read_byte_record(&mut byte_record)
340 .expect("can be read");
341 Some(DiffByteRecord::Delete(ByteRecordLineInfo::new(
342 byte_record,
343 pos.line,
344 )))
345 }
346 HashMapValue::Modified(pos_left, pos_right) => {
347 self.csv_seek_left_reader
348 .seek(pos_left.into())
349 .expect("must find the given position");
350 self.csv_seek_right_reader
351 .seek(pos_right.into())
352 .expect("must find the given position");
353 let mut left_byte_record = csv::ByteRecord::new();
354 self.csv_seek_left_reader
356 .read_byte_record(&mut left_byte_record)
357 .expect("can be read");
358 let mut right_byte_record = csv::ByteRecord::new();
359 self.csv_seek_right_reader
361 .read_byte_record(&mut right_byte_record)
362 .expect("can be read");
363 let fields_modified = left_byte_record
364 .iter()
365 .enumerate()
366 .zip(right_byte_record.iter())
367 .fold(Vec::new(), |mut acc, ((idx, field_left), field_right)| {
368 if field_left != field_right {
369 acc.push(idx);
370 }
371 acc
372 });
373 Some(DiffByteRecord::Modify {
374 add: ByteRecordLineInfo::new(right_byte_record, pos_right.line),
375 delete: ByteRecordLineInfo::new(left_byte_record, pos_left.line),
376 field_indices: fields_modified,
377 })
378 }
379 _ => None,
380 }),
381 );
382
383 diff_records.extend(
384 std::mem::take(&mut self.csv_records_right_map)
385 .into_iter()
386 .filter_map(|(_, v)| match v {
387 HashMapValue::Initial(_hash, pos) => {
388 self.csv_seek_right_reader
389 .seek(pos.into())
390 .expect("must be found");
391 let mut byte_record = csv::ByteRecord::new();
392 self.csv_seek_right_reader
393 .read_byte_record(&mut byte_record)
394 .expect("can be read");
395 Some(DiffByteRecord::Add(ByteRecordLineInfo::new(
396 byte_record,
397 pos.line,
398 )))
399 }
400 HashMapValue::Modified(pos_left, pos_right) => {
401 self.csv_seek_left_reader
402 .seek(pos_left.into())
403 .expect("must find the given position");
404 self.csv_seek_right_reader
405 .seek(pos_right.into())
406 .expect("must find the given position");
407 let mut left_byte_record = csv::ByteRecord::new();
408 self.csv_seek_left_reader
410 .read_byte_record(&mut left_byte_record)
411 .expect("can be read");
412 let mut right_byte_record = csv::ByteRecord::new();
413 self.csv_seek_right_reader
415 .read_byte_record(&mut right_byte_record)
416 .expect("can be read");
417 let fields_modified = left_byte_record
418 .iter()
419 .enumerate()
420 .zip(right_byte_record.iter())
421 .fold(Vec::new(), |mut acc, ((idx, field_left), field_right)| {
422 if field_left != field_right {
423 acc.push(idx);
424 }
425 acc
426 });
427 Some(DiffByteRecord::Modify {
428 add: ByteRecordLineInfo::new(right_byte_record, pos_right.line),
429 delete: ByteRecordLineInfo::new(left_byte_record, pos_left.line),
430 field_indices: fields_modified,
431 })
432 }
433 _ => None,
434 }),
435 );
436
437 Ok(DiffByteRecords::new(diff_records, headers, num_columns))
438 }
439}