use ahash::AHasher;
use crossbeam_channel::Sender;
use csv::Reader;
use std::collections::HashSet;
use std::hash::Hasher;
use std::io::{Read, Seek};
use crate::csv::Csv;
use crate::csv_parse_result::{
CsvLeftRightParseResult, CsvParseResult, CsvParseResultLeft, CsvParseResultRight, Position,
RecordHash,
};
/// Builds parse results tagged with the `Left` variant of [`CsvLeftRightParseResult`].
impl CsvParseResult<CsvLeftRightParseResult, RecordHash> for CsvParseResultLeft {
    /// Wraps `record_hash` in `CsvLeftRightParseResult::Left`.
    #[inline]
    fn new(record_hash: RecordHash) -> Self {
        let csv_left_right_parse_result = CsvLeftRightParseResult::Left(record_hash);
        Self {
            csv_left_right_parse_result,
        }
    }

    /// Consumes the wrapper and returns the tagged payload.
    #[inline]
    fn into_payload(self) -> CsvLeftRightParseResult {
        let Self {
            csv_left_right_parse_result: payload,
        } = self;
        payload
    }
}
/// Builds parse results tagged with the `Right` variant of [`CsvLeftRightParseResult`].
impl CsvParseResult<CsvLeftRightParseResult, RecordHash> for CsvParseResultRight {
    /// Wraps `record_hash` in `CsvLeftRightParseResult::Right`.
    #[inline]
    fn new(record_hash: RecordHash) -> Self {
        let csv_left_right_parse_result = CsvLeftRightParseResult::Right(record_hash);
        Self {
            csv_left_right_parse_result,
        }
    }

    /// Consumes the wrapper and returns the tagged payload.
    #[inline]
    fn into_payload(self) -> CsvLeftRightParseResult {
        let Self {
            csv_left_right_parse_result: payload,
        } = self;
        payload
    }
}
/// Inline capacity of [`StackVec`]. `CsvParserHasherSender::parse_and_hash` also uses it
/// as the number of records per batch sent over the channel.
pub(crate) const STACK_SIZE_VEC: usize = 8;
/// Small vector that stores up to [`STACK_SIZE_VEC`] items inline and spills to the heap
/// beyond that.
pub(crate) type StackVec<T> = smallvec::SmallVec<[T; STACK_SIZE_VEC]>;
/// Parses a CSV input, hashes its records and forwards the results over channels.
pub(crate) struct CsvParserHasherSender<T> {
    /// Destination for batches of per-record results (filled by `parse_and_hash`).
    sender: Sender<StackVec<T>>,
    /// Destination for the line total reported by `parse_and_hash`.
    sender_total_lines: Sender<u64>,
}
impl CsvParserHasherSender<CsvLeftRightParseResult> {
pub fn new(
sender: Sender<StackVec<CsvLeftRightParseResult>>,
sender_total_lines: Sender<u64>,
) -> Self {
Self {
sender,
sender_total_lines,
}
}
pub fn parse_and_hash<
R: Read + Seek + Send,
T: CsvParseResult<CsvLeftRightParseResult, RecordHash>,
>(
&mut self,
csv: Csv<R>,
primary_key_columns: &HashSet<usize>,
) -> csv::Result<csv::Reader<R>> {
let mut csv_reader: Reader<R> = csv.into();
let mut csv_record = csv::ByteRecord::new();
if csv_reader.read_byte_record(&mut csv_record)? {
let csv_record_right_first = std::mem::take(&mut csv_record);
let fields_as_key: Vec<_> = primary_key_columns.iter().collect();
let mut records_buff: StackVec<CsvLeftRightParseResult> = smallvec::SmallVec::new();
let mut hasher = AHasher::default();
let record = csv_record_right_first;
let key_fields: Vec<_> = fields_as_key
.iter()
.filter_map(|k_idx| record.get(**k_idx))
.collect();
if !key_fields.is_empty() {
for key_field in key_fields {
hasher.write(key_field);
}
let key = hasher.finish();
hasher.write(record.as_slice());
let pos = record.position().expect("a record position");
records_buff.push(
T::new(RecordHash::new(
key,
hasher.finish(),
Position::new(pos.byte(), pos.line()),
))
.into_payload(),
);
let mut line = 2;
while csv_reader.read_byte_record(&mut csv_record)? {
let mut hasher = AHasher::default();
let key_fields = fields_as_key
.iter()
.filter_map(|k_idx| csv_record.get(**k_idx));
for key_field in key_fields {
hasher.write(key_field);
}
let key = hasher.finish();
hasher.write(csv_record.as_slice());
{
let pos = csv_record.position().expect("a record position");
records_buff.push(
T::new(RecordHash::new(
key,
hasher.finish(),
Position::new(pos.byte(), pos.line()),
))
.into_payload(),
);
if line % STACK_SIZE_VEC as u64 == 0 {
let records_buff_full: StackVec<CsvLeftRightParseResult> =
smallvec::SmallVec::from_slice(records_buff.as_slice());
self.sender.send(records_buff_full).unwrap();
records_buff.clear();
}
}
line += 1;
}
if !records_buff.is_empty() {
self.sender.send(records_buff).unwrap();
}
self.sender_total_lines.send(line).unwrap();
}
} else {
self.sender_total_lines.send(0).unwrap();
}
Ok(csv_reader)
}
}
/// Per-key state value. Presumably stored in a hash map keyed by a record's key hash
/// while the left and right inputs are compared. It is not used in the code shown here,
/// so confirm against its users.
#[derive(Debug)]
pub(crate) enum HashMapValue {
    /// Holds a `u64` (presumably the record hash) and a record position. Presumably a
    /// record seen on one side that has not been matched yet; TODO confirm.
    Initial(u64, Position),
    /// Presumably: records with the same key and equal hash were found on both sides.
    Equal,
    /// Holds two positions. Presumably the left and right records that share a key but
    /// differ in hash; verify the order against the comparer.
    Modified(Position, Position),
}