csv_diff/
csv_parser_hasher.rs1use crossbeam_channel::{Receiver, Sender};
2use csv::Reader;
3use std::collections::HashSet;
4use std::hash::Hasher;
5use std::io::{Read, Seek};
6use xxhash_rust::xxh3::{xxh3_128, Xxh3};
7
8use crate::csv::Csv;
9use crate::csv_hasher::CsvHasherExt;
10use crate::csv_parse_result::{
11 CsvByteRecordWithHash, CsvLeftRightParseResult, CsvParseResult, CsvParseResultLeft,
12 CsvParseResultRight, Position, RecordHash, RecordHashWithPosition,
13};
14
15impl<R> CsvParseResult<CsvLeftRightParseResult<R>, R> for CsvParseResultLeft<R> {
16 #[inline]
17 fn new(record_hash: R) -> Self {
18 Self {
19 csv_left_right_parse_result: CsvLeftRightParseResult::Left(record_hash),
20 }
21 }
22 #[inline]
23 fn into_payload(self) -> CsvLeftRightParseResult<R> {
24 self.csv_left_right_parse_result
25 }
26}
27
28impl<R> CsvParseResult<CsvLeftRightParseResult<R>, R> for CsvParseResultRight<R> {
29 #[inline]
30 fn new(record_hash: R) -> Self {
31 Self {
32 csv_left_right_parse_result: CsvLeftRightParseResult::Right(record_hash),
33 }
34 }
35 #[inline]
36 fn into_payload(self) -> CsvLeftRightParseResult<R> {
37 self.csv_left_right_parse_result
38 }
39}
40
41pub(crate) struct CsvParserHasherLinesSender<T> {
42 sender: Sender<T>,
43 sender_total_lines: Sender<u64>,
44}
45
46impl CsvParserHasherLinesSender<CsvLeftRightParseResult<RecordHashWithPosition>> {
47 pub fn new(
48 sender: Sender<CsvLeftRightParseResult<RecordHashWithPosition>>,
49 sender_total_lines: Sender<u64>,
50 ) -> Self {
51 Self {
52 sender,
53 sender_total_lines,
54 }
55 }
56 pub fn parse_and_hash<
57 R: Read + Seek + Send,
58 T: CsvParseResult<CsvLeftRightParseResult<RecordHashWithPosition>, RecordHashWithPosition>,
59 >(
60 &mut self,
61 csv: Csv<R>,
62 primary_key_columns: &HashSet<usize>,
63 ) -> csv::Result<csv::Reader<R>> {
64 let mut csv_reader: Reader<R> = csv.into_csv_reader();
65 let mut csv_record = csv::ByteRecord::new();
66 if csv_reader.read_byte_record(&mut csv_record)? {
68 let csv_record_first = std::mem::take(&mut csv_record);
69 let fields_as_key: Vec<_> = primary_key_columns.iter().copied().collect();
70 let record = csv_record_first;
80 let key_fields_iter = fields_as_key.iter().filter_map(|k_idx| record.get(*k_idx));
81 if key_fields_iter.peekable().peek().is_some() {
82 let key = record.hash_key_fields(fields_as_key.as_slice());
83 let hash_record = record.hash_record();
85 let pos = record.position().expect("a record position");
86 self.sender
87 .send(
88 T::new(RecordHashWithPosition::new(
89 key,
90 hash_record,
91 Position::new(pos.byte(), pos.line()),
92 ))
93 .into_payload(),
94 )
95 .unwrap();
96 let mut line = 2;
97 while csv_reader.read_byte_record(&mut csv_record)? {
98 let key = csv_record.hash_key_fields(fields_as_key.as_slice());
99 let hash_record = csv_record.hash_record();
100 {
101 let pos = csv_record.position().expect("a record position");
102 self.sender
103 .send(
104 T::new(RecordHashWithPosition::new(
105 key,
106 hash_record,
107 Position::new(pos.byte(), pos.line()),
108 ))
109 .into_payload(),
110 )
111 .unwrap();
112 }
113 line += 1;
114 }
115 self.sender_total_lines.send(line).unwrap();
116 }
117 } else {
118 self.sender_total_lines.send(0).unwrap();
119 }
120 Ok(csv_reader)
121 }
122}
123
124pub(crate) struct CsvParserHasherSender<T> {
125 sender: Sender<T>,
126}
127
128impl CsvParserHasherSender<CsvLeftRightParseResult<CsvByteRecordWithHash>> {
129 pub fn new(sender: Sender<CsvLeftRightParseResult<CsvByteRecordWithHash>>) -> Self {
130 Self { sender }
131 }
132 pub fn parse_and_hash<
133 R: Read + Send,
134 T: CsvParseResult<CsvLeftRightParseResult<CsvByteRecordWithHash>, CsvByteRecordWithHash>,
135 >(
136 &mut self,
137 csv: Csv<R>,
138 primary_key_columns: &HashSet<usize>,
139 receiver_csv_recycle: Receiver<csv::ByteRecord>,
140 ) {
141 let mut csv_reader: Reader<R> = csv.into_csv_reader();
142 let mut csv_record = csv::ByteRecord::new();
143 match csv_reader.read_byte_record(&mut csv_record) {
145 Ok(true) => {
146 let record = std::mem::take(&mut csv_record);
147 let fields_as_key: Vec<_> = primary_key_columns.iter().copied().collect();
148 let mut hasher = Xxh3::new();
158 let mut key_fields_iter = fields_as_key
159 .iter()
160 .filter_map(|k_idx| record.get(*k_idx))
161 .peekable();
162 if key_fields_iter.peek().is_some() {
163 for key_field in key_fields_iter {
165 hasher.write(key_field);
166 }
167 let key = hasher.digest128();
168 let hash_record = xxh3_128(record.as_slice());
170 let _ = self.sender.send(
172 T::new(CsvByteRecordWithHash::new(
173 Ok(record),
174 RecordHash::new(key, hash_record),
175 ))
176 .into_payload(),
177 );
178
179 loop {
180 let mut csv_record = receiver_csv_recycle
181 .try_recv()
182 .unwrap_or_else(|_| csv::ByteRecord::new());
183
184 match csv_reader.read_byte_record(&mut csv_record) {
185 Ok(true) => {
186 hasher.reset();
187 let key_fields = fields_as_key
188 .iter()
189 .filter_map(|k_idx| csv_record.get(*k_idx));
190 for key_field in key_fields {
192 hasher.write(key_field);
193 }
194 let key = hasher.digest128();
195 let hash_record = xxh3_128(csv_record.as_slice());
199 if self
200 .sender
201 .send(
202 T::new(CsvByteRecordWithHash::new(
203 Ok(csv_record),
204 RecordHash::new(key, hash_record),
205 ))
206 .into_payload(),
207 )
208 .is_err()
209 {
210 break;
212 }
213 }
214 Ok(false) => break,
215 Err(e) => {
216 if self
217 .sender
218 .send(
219 T::new(CsvByteRecordWithHash::new(
220 Err(e),
221 RecordHash::new(0, 0),
222 ))
223 .into_payload(),
224 )
225 .is_err()
226 {
227 break;
229 }
230 break;
231 }
232 }
233 }
234 }
235 }
236 Ok(false) => { }
237 Err(e) => self
238 .sender
239 .send(
240 T::new(CsvByteRecordWithHash::new(Err(e), RecordHash::new(0, 0)))
241 .into_payload(),
242 )
243 .unwrap(),
244 }
245 }
246}
247
/// A three-state value, presumably stored in a hash map while pairing up left and right
/// records.
///
/// NOTE(review): the variant meanings below are inferred from the names and payload types
/// only; confirm against the code that builds and matches on this enum. `TEq` defaults to
/// `T`, letting the `Equal` state carry a different payload type if needed.
#[derive(Debug)]
pub(crate) enum HashMapValue<T, TEq = T> {
    /// A single entry: a `u128` (presumably a record hash) and its value.
    Initial(u128, T),
    /// A pair of entries, presumably found to be equal.
    Equal(TEq, TEq),
    /// A pair of entries, presumably found to differ.
    Modified(T, T),
}