1use std::{
2 collections::HashSet,
3 io::{Read, Seek},
4 ops::Deref,
5 sync::Arc,
6};
7
8use crate::csv_parse_result::{
9 CsvLeftRightParseResult, CsvParseResult, CsvParseResultLeft, CsvParseResultRight,
10};
11#[cfg(feature = "crossbeam-threads")]
12use crate::thread_scope_strategy::CrossbeamScope;
13#[cfg(feature = "rayon-threads")]
14use crate::thread_scope_strategy::RayonScope;
15use crate::{
16 csv::Csv,
17 csv_hash_receiver_comparer::CsvHashReceiverStreamComparer,
18 csv_parse_result::{CsvByteRecordWithHash, RecordHashWithPosition},
19 csv_parser_hasher::{CsvParserHasherLinesSender, CsvParserHasherSender},
20 diff_result::DiffByteRecordsIterator,
21 thread_scope_strategy::ThreadScoper,
22};
23use crossbeam_channel::{bounded, Receiver, Sender};
24use csv::Reader;
25
26pub struct CsvHashTaskSenderWithRecycleReceiver<R: Read> {
27 sender: Sender<CsvLeftRightParseResult<CsvByteRecordWithHash>>,
28 csv: Csv<R>,
29 receiver_recycle_csv: Receiver<csv::ByteRecord>,
30}
31
32impl<R: Read> CsvHashTaskSenderWithRecycleReceiver<R> {
33 pub(crate) fn new(
34 sender: Sender<CsvLeftRightParseResult<CsvByteRecordWithHash>>,
35 csv: Csv<R>,
36 receiver_recycle_csv: Receiver<csv::ByteRecord>,
37 ) -> Self {
38 Self {
39 sender,
40 csv,
41 receiver_recycle_csv,
42 }
43 }
44}
45
46pub struct CsvHashTaskLineSenders<R: Read> {
47 sender: Sender<CsvLeftRightParseResult<RecordHashWithPosition>>,
48 sender_total_lines: Sender<u64>,
49 sender_csv_reader: Sender<csv::Result<Reader<R>>>,
50 csv: Csv<R>,
51}
52
53impl<R: Read> CsvHashTaskLineSenders<R> {
54 pub(crate) fn new(
55 sender: Sender<CsvLeftRightParseResult<RecordHashWithPosition>>,
56 sender_total_lines: Sender<u64>,
57 sender_csv_reader: Sender<csv::Result<Reader<R>>>,
58 csv: Csv<R>,
59 ) -> Self {
60 Self {
61 sender,
62 sender_total_lines,
63 sender_csv_reader,
64 csv,
65 }
66 }
67}
68
69pub trait CsvHashTaskSpawner {
70 fn spawn_hashing_tasks_and_send_result<R: Read + Send + 'static>(
71 self,
72 csv_hash_task_sender_left: CsvHashTaskSenderWithRecycleReceiver<R>,
73 csv_hash_task_sender_right: CsvHashTaskSenderWithRecycleReceiver<R>,
74 csv_hash_receiver_comparer: CsvHashReceiverStreamComparer,
75 primary_key_columns: HashSet<usize>,
76 ) -> (Self, Receiver<DiffByteRecordsIterator>)
77 where
78 Self: Sized;
81
82 fn parse_hash_and_send_for_compare<R, P>(
83 csv_hash_task_sender: CsvHashTaskSenderWithRecycleReceiver<R>,
84 primary_key_columns: HashSet<usize>,
85 ) where
86 R: Read + Send,
87 P: CsvParseResult<CsvLeftRightParseResult<CsvByteRecordWithHash>, CsvByteRecordWithHash>,
88 {
89 let mut csv_parser_hasher: CsvParserHasherSender<
90 CsvLeftRightParseResult<CsvByteRecordWithHash>,
91 > = CsvParserHasherSender::new(csv_hash_task_sender.sender);
92 csv_parser_hasher.parse_and_hash::<R, P>(
93 csv_hash_task_sender.csv,
94 &primary_key_columns,
95 csv_hash_task_sender.receiver_recycle_csv,
96 )
97 }
98}
99
/// [`CsvHashTaskSpawner`] that runs its tasks on a rayon thread pool.
///
/// The pool is either owned by the spawner or shared through an `Arc`.
#[cfg(feature = "rayon-threads")]
#[derive(Debug)]
pub struct CsvHashTaskSpawnerRayon {
    thread_pool: OwnOrArc<rayon::ThreadPool>,
}
105
/// Either an owned `T` or a shared `Arc<T>`; dereferences to `&T` in both cases.
///
/// Lets a holder keep a value it owns outright or one shared with other
/// owners, behind a single type.
// Only the `rayon-threads` code in this module constructs this type, so the
// dead-code lint is silenced when that feature is off instead of warning.
#[cfg_attr(not(feature = "rayon-threads"), allow(dead_code))]
#[derive(Debug)]
enum OwnOrArc<T> {
    /// Shared ownership through a reference-counted pointer.
    Arced(Arc<T>),
    /// Exclusive ownership of the value itself.
    Owned(T),
}

impl<T> Deref for OwnOrArc<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        match self {
            // `t` is `&Arc<T>`; deref coercion turns it into `&T`. The former
            // `&*t` only re-borrowed the same `&Arc<T>`.
            Self::Arced(t) => t,
            Self::Owned(t) => t,
        }
    }
}
122
#[cfg(feature = "rayon-threads")]
impl CsvHashTaskSpawnerRayon {
    /// Creates a spawner that shares `thread_pool` with other holders of the `Arc`.
    pub fn with_thread_pool_arc(thread_pool: Arc<rayon::ThreadPool>) -> Self {
        let thread_pool = OwnOrArc::Arced(thread_pool);
        Self { thread_pool }
    }

    /// Creates a spawner that takes sole ownership of `thread_pool`.
    pub fn with_thread_pool_owned(thread_pool: rayon::ThreadPool) -> Self {
        let thread_pool = OwnOrArc::Owned(thread_pool);
        Self { thread_pool }
    }
}
137
#[cfg(feature = "rayon-threads")]
impl CsvHashTaskSpawner for CsvHashTaskSpawnerRayon {
    /// Spawns the comparer and both hashing tasks on the rayon pool.
    ///
    /// The comparer's [`DiffByteRecordsIterator`] is delivered on the returned
    /// receiver. If that receiver has already been dropped, the result is
    /// discarded. Panicking instead would reach rayon's panic handler, which by
    /// default aborts the whole process.
    ///
    /// NOTE(review): the comparer presumably blocks while waiting for hashes
    /// from the two parser tasks. With a pool of fewer than three threads the
    /// tasks may be unable to make progress; confirm.
    fn spawn_hashing_tasks_and_send_result<R: Read + Send + 'static>(
        self,
        csv_hash_task_sender_left: CsvHashTaskSenderWithRecycleReceiver<R>,
        csv_hash_task_sender_right: CsvHashTaskSenderWithRecycleReceiver<R>,
        csv_hash_receiver_comparer: CsvHashReceiverStreamComparer,
        primary_key_columns: HashSet<usize>,
    ) -> (Self, Receiver<DiffByteRecordsIterator>) {
        // Exactly one result is sent, so a capacity of 1 means `send` never blocks.
        let (sender, receiver) = bounded(1);

        // Each parser task needs its own copy of the key columns.
        let prim_key_columns_clone = primary_key_columns.clone();

        self.thread_pool.spawn(move || {
            // The only failure is a disconnected receiver, meaning nobody is
            // waiting for the diff any more, so dropping the result is correct.
            let _ = sender.send(csv_hash_receiver_comparer.recv_hashes_and_compare());
        });

        self.thread_pool.spawn(move || {
            Self::parse_hash_and_send_for_compare::<R, CsvParseResultLeft<CsvByteRecordWithHash>>(
                csv_hash_task_sender_left,
                primary_key_columns,
            );
        });

        self.thread_pool.spawn(move || {
            Self::parse_hash_and_send_for_compare::<R, CsvParseResultRight<CsvByteRecordWithHash>>(
                csv_hash_task_sender_right,
                prim_key_columns_clone,
            );
        });

        (self, receiver)
    }
}
174
/// [`CsvHashTaskSpawner`] that starts each task on its own `std::thread`.
#[derive(Debug, Default)]
pub struct CsvHashTaskSpawnerStdThreads;

impl CsvHashTaskSpawnerStdThreads {
    /// Creates the spawner. It carries no state, so this is the same as `default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
183
184impl CsvHashTaskSpawner for CsvHashTaskSpawnerStdThreads {
185 fn spawn_hashing_tasks_and_send_result<R: Read + Send + 'static>(
186 self,
187 csv_hash_task_sender_left: CsvHashTaskSenderWithRecycleReceiver<R>,
188 csv_hash_task_sender_right: CsvHashTaskSenderWithRecycleReceiver<R>,
189 csv_hash_receiver_comparer: CsvHashReceiverStreamComparer,
190 primary_key_columns: HashSet<usize>,
191 ) -> (Self, Receiver<DiffByteRecordsIterator>)
192 where
193 Self: Sized,
194 {
195 let (sender, receiver) = bounded(1);
196
197 let prim_key_columns_clone = primary_key_columns.clone();
198
199 std::thread::spawn(move || {
200 sender
201 .send(csv_hash_receiver_comparer.recv_hashes_and_compare())
202 .unwrap();
203 });
204
205 std::thread::spawn(move || {
206 Self::parse_hash_and_send_for_compare::<R, CsvParseResultLeft<CsvByteRecordWithHash>>(
207 csv_hash_task_sender_left,
208 primary_key_columns,
209 );
210 });
211
212 std::thread::spawn(move || {
213 Self::parse_hash_and_send_for_compare::<R, CsvParseResultRight<CsvByteRecordWithHash>>(
214 csv_hash_task_sender_right,
215 prim_key_columns_clone,
216 );
217 });
218
219 (self, receiver)
220 }
221}
222
223pub trait CsvHashTaskSpawnerBuilder<T> {
224 fn build(self) -> T;
225}
226
227#[derive(Debug, Default)]
228pub struct CsvHashTaskSpawnerBuilderStdThreads;
229
230impl CsvHashTaskSpawnerBuilderStdThreads {
231 pub fn new() -> Self {
232 Self
233 }
234}
235
236impl CsvHashTaskSpawnerBuilder<CsvHashTaskSpawnerStdThreads>
237 for CsvHashTaskSpawnerBuilderStdThreads
238{
239 fn build(self) -> CsvHashTaskSpawnerStdThreads {
240 CsvHashTaskSpawnerStdThreads::new()
241 }
242}
243
244pub trait CsvHashTaskSpawnerLocal {
245 fn spawn_hashing_tasks_and_send_result<R: Read + Seek + Send>(
246 &self,
247 csv_hash_task_senders_left: CsvHashTaskLineSenders<R>,
248 csv_hash_task_senders_right: CsvHashTaskLineSenders<R>,
249 primary_key_columns: &HashSet<usize>,
250 );
251
252 fn parse_hash_and_send_for_compare<R, P>(
253 csv_hash_task_senders: CsvHashTaskLineSenders<R>,
254 primary_key_columns: &HashSet<usize>,
255 ) where
256 R: Read + Seek + Send,
257 P: CsvParseResult<CsvLeftRightParseResult<RecordHashWithPosition>, RecordHashWithPosition>,
258 {
259 let mut csv_parser_hasher: CsvParserHasherLinesSender<
260 CsvLeftRightParseResult<RecordHashWithPosition>,
261 > = CsvParserHasherLinesSender::new(
262 csv_hash_task_senders.sender,
263 csv_hash_task_senders.sender_total_lines,
264 );
265 csv_hash_task_senders
266 .sender_csv_reader
267 .send(
268 csv_parser_hasher
269 .parse_and_hash::<R, P>(csv_hash_task_senders.csv, primary_key_columns),
270 )
271 .unwrap();
272 }
273}
274
/// [`CsvHashTaskSpawnerLocal`] that runs its tasks inside a [`RayonScope`].
///
/// The type is parameterised by the scope's lifetime `'tp`.
#[cfg(feature = "rayon-threads")]
#[derive(Debug)]
pub struct CsvHashTaskSpawnerLocalRayon<'tp> {
    thread_scoper: RayonScope<'tp>,
}

#[cfg(feature = "rayon-threads")]
impl<'tp> CsvHashTaskSpawnerLocalRayon<'tp> {
    /// Wraps an existing [`RayonScope`].
    pub(crate) fn new(thread_scoper: RayonScope<'tp>) -> Self {
        CsvHashTaskSpawnerLocalRayon { thread_scoper }
    }
}
287
/// Runs the left and right line-hashing tasks concurrently inside the wrapped
/// rayon scope.
#[cfg(feature = "rayon-threads")]
impl CsvHashTaskSpawnerLocal for CsvHashTaskSpawnerLocalRayon<'_> {
    fn spawn_hashing_tasks_and_send_result<R>(
        &self,
        csv_hash_task_senders_left: CsvHashTaskLineSenders<R>,
        csv_hash_task_senders_right: CsvHashTaskLineSenders<R>,
        primary_key_columns: &HashSet<usize>,
    ) where
        R: Read + Seek + Send,
    {
        // The scope is what allows the tasks to capture the borrowed
        // `primary_key_columns` and a non-`'static` `R`.
        // NOTE(review): presumably `scope` returns only after every spawned task
        // has finished; confirm in `RayonScope`.
        self.thread_scoper.scope(move |s| {
            // NOTE(review): this outer task only spawns the two parser tasks.
            // Spawning both directly on `s` looks equivalent and would save a
            // task; confirm before simplifying.
            s.spawn(move |inner_scope| {
                // Left side; results are tagged via `CsvParseResultLeft`.
                inner_scope.spawn(move |_s1| {
                    Self::parse_hash_and_send_for_compare::<
                        R,
                        CsvParseResultLeft<RecordHashWithPosition>,
                    >(csv_hash_task_senders_left, primary_key_columns);
                });
                // Right side; results are tagged via `CsvParseResultRight`.
                inner_scope.spawn(move |_s2| {
                    Self::parse_hash_and_send_for_compare::<
                        R,
                        CsvParseResultRight<RecordHashWithPosition>,
                    >(csv_hash_task_senders_right, primary_key_columns);
                });
            });
        });
    }
}
316
/// [`CsvHashTaskSpawnerLocal`] that runs its tasks inside a [`CrossbeamScope`].
#[cfg(feature = "crossbeam-threads")]
#[derive(Debug)]
pub struct CsvHashTaskSpawnerLocalCrossbeam {
    thread_scoper: CrossbeamScope,
}

#[cfg(feature = "crossbeam-threads")]
impl CsvHashTaskSpawnerLocalCrossbeam {
    /// Wraps an existing [`CrossbeamScope`].
    pub(crate) fn new(thread_scoper: CrossbeamScope) -> Self {
        CsvHashTaskSpawnerLocalCrossbeam { thread_scoper }
    }
}
329
/// Runs the left and right line-hashing tasks concurrently inside the wrapped
/// crossbeam scope.
#[cfg(feature = "crossbeam-threads")]
impl CsvHashTaskSpawnerLocal for CsvHashTaskSpawnerLocalCrossbeam {
    fn spawn_hashing_tasks_and_send_result<R>(
        &self,
        csv_hash_task_senders_left: CsvHashTaskLineSenders<R>,
        csv_hash_task_senders_right: CsvHashTaskLineSenders<R>,
        primary_key_columns: &HashSet<usize>,
    ) where
        R: Read + Seek + Send,
    {
        // The scope is what allows the tasks to capture the borrowed
        // `primary_key_columns` and a non-`'static` `R`.
        // NOTE(review): presumably `scope` joins all spawned threads before
        // returning; confirm in `CrossbeamScope`.
        self.thread_scoper.scope(move |s| {
            // NOTE(review): this outer task only spawns the two parser tasks.
            // With OS threads that is one extra thread; spawning both directly
            // on `s` looks equivalent. Confirm before simplifying.
            s.spawn(move |inner_scope| {
                // Left side; results are tagged via `CsvParseResultLeft`.
                inner_scope.spawn(move |_s1| {
                    Self::parse_hash_and_send_for_compare::<
                        R,
                        CsvParseResultLeft<RecordHashWithPosition>,
                    >(csv_hash_task_senders_left, primary_key_columns);
                });
                // Right side; results are tagged via `CsvParseResultRight`.
                inner_scope.spawn(move |_s2| {
                    Self::parse_hash_and_send_for_compare::<
                        R,
                        CsvParseResultRight<RecordHashWithPosition>,
                    >(csv_hash_task_senders_right, primary_key_columns);
                });
            });
        });
    }
}
358
/// Builds a value of type `T`; in this module, a scoped task spawner.
pub trait CsvHashTaskSpawnerLocalBuilder<T> {
    /// Consumes the builder and returns what it builds.
    fn build(self) -> T;
}

/// Builder for [`CsvHashTaskSpawnerLocalRayon`] that borrows an existing rayon
/// thread pool for `'tp`.
// `Debug` is derived for consistency with the other public spawner and
// builder types in this module (`rayon::ThreadPool` implements `Debug`).
#[cfg(feature = "rayon-threads")]
#[derive(Debug)]
pub struct CsvHashTaskSpawnerLocalBuilderRayon<'tp> {
    thread_pool: &'tp rayon::ThreadPool,
}

#[cfg(feature = "rayon-threads")]
impl<'tp> CsvHashTaskSpawnerLocalBuilderRayon<'tp> {
    /// Creates a builder around `thread_pool`.
    pub fn new(thread_pool: &'tp rayon::ThreadPool) -> Self {
        Self { thread_pool }
    }
}

#[cfg(feature = "rayon-threads")]
impl<'tp> CsvHashTaskSpawnerLocalBuilder<CsvHashTaskSpawnerLocalRayon<'tp>>
    for CsvHashTaskSpawnerLocalBuilderRayon<'tp>
{
    /// Wraps the borrowed pool in a [`RayonScope`] via
    /// `RayonScope::with_thread_pool_ref` and builds the spawner around it.
    fn build(self) -> CsvHashTaskSpawnerLocalRayon<'tp> {
        CsvHashTaskSpawnerLocalRayon::new(RayonScope::with_thread_pool_ref(self.thread_pool))
    }
}
383
/// Builder for [`CsvHashTaskSpawnerLocalCrossbeam`]; it carries no configuration.
// `Default` accompanies the public zero-argument `new` (clippy
// `new_without_default`). `Debug` matches the other public types here,
// e.g. `CsvHashTaskSpawnerBuilderStdThreads`.
#[cfg(feature = "crossbeam-threads")]
#[derive(Debug, Default)]
pub struct CsvHashTaskSpawnerLocalBuilderCrossbeam;

#[cfg(feature = "crossbeam-threads")]
impl CsvHashTaskSpawnerLocalBuilderCrossbeam {
    /// Creates the builder. This is the same as `default()`.
    pub fn new() -> Self {
        Self
    }
}

#[cfg(feature = "crossbeam-threads")]
impl CsvHashTaskSpawnerLocalBuilder<CsvHashTaskSpawnerLocalCrossbeam>
    for CsvHashTaskSpawnerLocalBuilderCrossbeam
{
    /// Builds a spawner around a freshly created [`CrossbeamScope`].
    fn build(self) -> CsvHashTaskSpawnerLocalCrossbeam {
        CsvHashTaskSpawnerLocalCrossbeam::new(CrossbeamScope::new())
    }
}