//! csv_diff/csv_parser_hasher.rs — parsing-and-hashing workers for the CSV diff pipeline.

1use crossbeam_channel::{Receiver, Sender};
2use csv::Reader;
3use std::collections::HashSet;
4use std::hash::Hasher;
5use std::io::{Read, Seek};
6use xxhash_rust::xxh3::{xxh3_128, Xxh3};
7
8use crate::csv::Csv;
9use crate::csv_hasher::CsvHasherExt;
10use crate::csv_parse_result::{
11    CsvByteRecordWithHash, CsvLeftRightParseResult, CsvParseResult, CsvParseResultLeft,
12    CsvParseResultRight, Position, RecordHash, RecordHashWithPosition,
13};
14
15impl<R> CsvParseResult<CsvLeftRightParseResult<R>, R> for CsvParseResultLeft<R> {
16    #[inline]
17    fn new(record_hash: R) -> Self {
18        Self {
19            csv_left_right_parse_result: CsvLeftRightParseResult::Left(record_hash),
20        }
21    }
22    #[inline]
23    fn into_payload(self) -> CsvLeftRightParseResult<R> {
24        self.csv_left_right_parse_result
25    }
26}
27
28impl<R> CsvParseResult<CsvLeftRightParseResult<R>, R> for CsvParseResultRight<R> {
29    #[inline]
30    fn new(record_hash: R) -> Self {
31        Self {
32            csv_left_right_parse_result: CsvLeftRightParseResult::Right(record_hash),
33        }
34    }
35    #[inline]
36    fn into_payload(self) -> CsvLeftRightParseResult<R> {
37        self.csv_left_right_parse_result
38    }
39}
40
/// Worker-side handle that streams hashed CSV records plus a final total
/// line count to the consuming side of the diff pipeline.
pub(crate) struct CsvParserHasherLinesSender<T> {
    // Channel over which each parsed-and-hashed record is sent.
    sender: Sender<T>,
    // Channel over which the total number of processed lines is sent exactly
    // once when parsing finishes (see `parse_and_hash`).
    sender_total_lines: Sender<u64>,
}
45
46impl CsvParserHasherLinesSender<CsvLeftRightParseResult<RecordHashWithPosition>> {
47    pub fn new(
48        sender: Sender<CsvLeftRightParseResult<RecordHashWithPosition>>,
49        sender_total_lines: Sender<u64>,
50    ) -> Self {
51        Self {
52            sender,
53            sender_total_lines,
54        }
55    }
56    pub fn parse_and_hash<
57        R: Read + Seek + Send,
58        T: CsvParseResult<CsvLeftRightParseResult<RecordHashWithPosition>, RecordHashWithPosition>,
59    >(
60        &mut self,
61        csv: Csv<R>,
62        primary_key_columns: &HashSet<usize>,
63    ) -> csv::Result<csv::Reader<R>> {
64        let mut csv_reader: Reader<R> = csv.into_csv_reader();
65        let mut csv_record = csv::ByteRecord::new();
66        // read first record in order to get the number of fields
67        if csv_reader.read_byte_record(&mut csv_record)? {
68            let csv_record_first = std::mem::take(&mut csv_record);
69            let fields_as_key: Vec<_> = primary_key_columns.iter().copied().collect();
70            // TODO: maybe use this in order to only hash fields that are values and not act
71            // as primary keys. We should probably only do this, if primary key field indices are
72            // contiguous, because otherwise we will have multiple calls to our hashing function,
73            // which could hurt performance.
74            // let num_of_fields = csv_record_first.len();
75            // let fields_as_value: Vec<_> = (0..num_of_fields)
76            //     .filter(|x| !primary_key_columns.contains(x))
77            //     .collect();
78
79            let record = csv_record_first;
80            let key_fields_iter = fields_as_key.iter().filter_map(|k_idx| record.get(*k_idx));
81            if key_fields_iter.peekable().peek().is_some() {
82                let key = record.hash_key_fields(fields_as_key.as_slice());
83                // TODO: don't hash all of it -> exclude the key fields (see below)
84                let hash_record = record.hash_record();
85                let pos = record.position().expect("a record position");
86                self.sender
87                    .send(
88                        T::new(RecordHashWithPosition::new(
89                            key,
90                            hash_record,
91                            Position::new(pos.byte(), pos.line()),
92                        ))
93                        .into_payload(),
94                    )
95                    .unwrap();
96                let mut line = 2;
97                while csv_reader.read_byte_record(&mut csv_record)? {
98                    let key = csv_record.hash_key_fields(fields_as_key.as_slice());
99                    let hash_record = csv_record.hash_record();
100                    {
101                        let pos = csv_record.position().expect("a record position");
102                        self.sender
103                            .send(
104                                T::new(RecordHashWithPosition::new(
105                                    key,
106                                    hash_record,
107                                    Position::new(pos.byte(), pos.line()),
108                                ))
109                                .into_payload(),
110                            )
111                            .unwrap();
112                    }
113                    line += 1;
114                }
115                self.sender_total_lines.send(line).unwrap();
116            }
117        } else {
118            self.sender_total_lines.send(0).unwrap();
119        }
120        Ok(csv_reader)
121    }
122}
123
/// Worker-side handle that streams hashed CSV byte records (including the
/// owned record itself) to the consuming side of the diff pipeline.
pub(crate) struct CsvParserHasherSender<T> {
    // Channel over which each parsed-and-hashed record is sent.
    sender: Sender<T>,
}
127
128impl CsvParserHasherSender<CsvLeftRightParseResult<CsvByteRecordWithHash>> {
129    pub fn new(sender: Sender<CsvLeftRightParseResult<CsvByteRecordWithHash>>) -> Self {
130        Self { sender }
131    }
132    pub fn parse_and_hash<
133        R: Read + Send,
134        T: CsvParseResult<CsvLeftRightParseResult<CsvByteRecordWithHash>, CsvByteRecordWithHash>,
135    >(
136        &mut self,
137        csv: Csv<R>,
138        primary_key_columns: &HashSet<usize>,
139        receiver_csv_recycle: Receiver<csv::ByteRecord>,
140    ) {
141        let mut csv_reader: Reader<R> = csv.into_csv_reader();
142        let mut csv_record = csv::ByteRecord::new();
143        // read first record in order to get the number of fields
144        match csv_reader.read_byte_record(&mut csv_record) {
145            Ok(true) => {
146                let record = std::mem::take(&mut csv_record);
147                let fields_as_key: Vec<_> = primary_key_columns.iter().copied().collect();
148                // TODO: maybe use this in order to only hash fields that are values and not act
149                // as primary keys. We should probably only do this, if primary key field indices are
150                // contiguous, because otherwise we will have multiple calls to our hashing function,
151                // which could hurt performance.
152                // let num_of_fields = record.len();
153                // let fields_as_value: Vec<_> = (0..num_of_fields)
154                //     .filter(|x| !primary_key_columns.contains(x))
155                //     .collect();
156
157                let mut hasher = Xxh3::new();
158                let mut key_fields_iter = fields_as_key
159                    .iter()
160                    .filter_map(|k_idx| record.get(*k_idx))
161                    .peekable();
162                if key_fields_iter.peek().is_some() {
163                    // TODO: try to do it with as few calls to `write` as possible (see below)
164                    for key_field in key_fields_iter {
165                        hasher.write(key_field);
166                    }
167                    let key = hasher.digest128();
168                    // TODO: don't hash all of it -> exclude the key fields (see below)
169                    let hash_record = xxh3_128(record.as_slice());
170                    // we ignore any sending errors
171                    let _ = self.sender.send(
172                        T::new(CsvByteRecordWithHash::new(
173                            Ok(record),
174                            RecordHash::new(key, hash_record),
175                        ))
176                        .into_payload(),
177                    );
178
179                    loop {
180                        let mut csv_record = receiver_csv_recycle
181                            .try_recv()
182                            .unwrap_or_else(|_| csv::ByteRecord::new());
183
184                        match csv_reader.read_byte_record(&mut csv_record) {
185                            Ok(true) => {
186                                hasher.reset();
187                                let key_fields = fields_as_key
188                                    .iter()
189                                    .filter_map(|k_idx| csv_record.get(*k_idx));
190                                // TODO: try to do it with as few calls to `write` as possible (see below)
191                                for key_field in key_fields {
192                                    hasher.write(key_field);
193                                }
194                                let key = hasher.digest128();
195                                // TODO: don't hash all of it -> exclude the key fields
196                                // in order to still be efficient and do as few `write` calls as possible
197                                // consider using `csv_record.range(...)` method
198                                let hash_record = xxh3_128(csv_record.as_slice());
199                                if self
200                                    .sender
201                                    .send(
202                                        T::new(CsvByteRecordWithHash::new(
203                                            Ok(csv_record),
204                                            RecordHash::new(key, hash_record),
205                                        ))
206                                        .into_payload(),
207                                    )
208                                    .is_err()
209                                {
210                                    // when the receiver is gone, it doesn't make sense to continue here
211                                    break;
212                                }
213                            }
214                            Ok(false) => break,
215                            Err(e) => {
216                                if self
217                                    .sender
218                                    .send(
219                                        T::new(CsvByteRecordWithHash::new(
220                                            Err(e),
221                                            RecordHash::new(0, 0),
222                                        ))
223                                        .into_payload(),
224                                    )
225                                    .is_err()
226                                {
227                                    // when the receiver is gone, it doesn't make sense to continue here
228                                    break;
229                                }
230                                break;
231                            }
232                        }
233                    }
234                }
235            }
236            Ok(false) => { /* Do nothing, we have reached EOF */ }
237            Err(e) => self
238                .sender
239                .send(
240                    T::new(CsvByteRecordWithHash::new(Err(e), RecordHash::new(0, 0)))
241                        .into_payload(),
242                )
243                .unwrap(),
244        }
245    }
246}
247
/// Per-key state while matching records of the two CSVs against each other.
#[derive(Debug)]
pub(crate) enum HashMapValue<T, TEq = T> {
    // Seen on one side only so far: full-record hash plus payload `T`.
    Initial(u128, T),
    // Both sides matched with equal record hashes.
    Equal(TEq, TEq),
    // Both sides matched but the records differ — presumably (left, right)
    // payloads; TODO confirm ordering against the consuming code.
    Modified(T, T),