// csv_diff/csv_hash_task_spawner.rs

1use std::{
2    collections::HashSet,
3    io::{Read, Seek},
4    ops::Deref,
5    sync::Arc,
6};
7
8use crate::csv_parse_result::{
9    CsvLeftRightParseResult, CsvParseResult, CsvParseResultLeft, CsvParseResultRight,
10};
11#[cfg(feature = "crossbeam-threads")]
12use crate::thread_scope_strategy::CrossbeamScope;
13#[cfg(feature = "rayon-threads")]
14use crate::thread_scope_strategy::RayonScope;
15use crate::{
16    csv::Csv,
17    csv_hash_receiver_comparer::CsvHashReceiverStreamComparer,
18    csv_parse_result::{CsvByteRecordWithHash, RecordHashWithPosition},
19    csv_parser_hasher::{CsvParserHasherLinesSender, CsvParserHasherSender},
20    diff_result::DiffByteRecordsIterator,
21    thread_scope_strategy::ThreadScoper,
22};
23use crossbeam_channel::{bounded, Receiver, Sender};
24use csv::Reader;
25
26pub struct CsvHashTaskSenderWithRecycleReceiver<R: Read> {
27    sender: Sender<CsvLeftRightParseResult<CsvByteRecordWithHash>>,
28    csv: Csv<R>,
29    receiver_recycle_csv: Receiver<csv::ByteRecord>,
30}
31
32impl<R: Read> CsvHashTaskSenderWithRecycleReceiver<R> {
33    pub(crate) fn new(
34        sender: Sender<CsvLeftRightParseResult<CsvByteRecordWithHash>>,
35        csv: Csv<R>,
36        receiver_recycle_csv: Receiver<csv::ByteRecord>,
37    ) -> Self {
38        Self {
39            sender,
40            csv,
41            receiver_recycle_csv,
42        }
43    }
44}
45
46pub struct CsvHashTaskLineSenders<R: Read> {
47    sender: Sender<CsvLeftRightParseResult<RecordHashWithPosition>>,
48    sender_total_lines: Sender<u64>,
49    sender_csv_reader: Sender<csv::Result<Reader<R>>>,
50    csv: Csv<R>,
51}
52
53impl<R: Read> CsvHashTaskLineSenders<R> {
54    pub(crate) fn new(
55        sender: Sender<CsvLeftRightParseResult<RecordHashWithPosition>>,
56        sender_total_lines: Sender<u64>,
57        sender_csv_reader: Sender<csv::Result<Reader<R>>>,
58        csv: Csv<R>,
59    ) -> Self {
60        Self {
61            sender,
62            sender_total_lines,
63            sender_csv_reader,
64            csv,
65        }
66    }
67}
68
69pub trait CsvHashTaskSpawner {
70    fn spawn_hashing_tasks_and_send_result<R: Read + Send + 'static>(
71        self,
72        csv_hash_task_sender_left: CsvHashTaskSenderWithRecycleReceiver<R>,
73        csv_hash_task_sender_right: CsvHashTaskSenderWithRecycleReceiver<R>,
74        csv_hash_receiver_comparer: CsvHashReceiverStreamComparer,
75        primary_key_columns: HashSet<usize>,
76    ) -> (Self, Receiver<DiffByteRecordsIterator>)
77    where
78        // TODO: this bound is only necessary, because we are returning `self` here;
79        // maybe we can do it differently
80        Self: Sized;
81
82    fn parse_hash_and_send_for_compare<R, P>(
83        csv_hash_task_sender: CsvHashTaskSenderWithRecycleReceiver<R>,
84        primary_key_columns: HashSet<usize>,
85    ) where
86        R: Read + Send,
87        P: CsvParseResult<CsvLeftRightParseResult<CsvByteRecordWithHash>, CsvByteRecordWithHash>,
88    {
89        let mut csv_parser_hasher: CsvParserHasherSender<
90            CsvLeftRightParseResult<CsvByteRecordWithHash>,
91        > = CsvParserHasherSender::new(csv_hash_task_sender.sender);
92        csv_parser_hasher.parse_and_hash::<R, P>(
93            csv_hash_task_sender.csv,
94            &primary_key_columns,
95            csv_hash_task_sender.receiver_recycle_csv,
96        )
97    }
98}
99
/// [`CsvHashTaskSpawner`] that spawns its tasks on a `rayon::ThreadPool`,
/// which is either owned by the spawner or shared through an `Arc`.
#[cfg(feature = "rayon-threads")]
#[derive(Debug)]
pub struct CsvHashTaskSpawnerRayon {
    /// The pool that all tasks are spawned on.
    thread_pool: OwnOrArc<rayon::ThreadPool>,
}
105
/// A value that is either held directly or shared through an [`Arc`].
///
/// Both variants dereference to the inner `T`, so users do not need to care
/// which one they hold.
#[derive(Debug)]
enum OwnOrArc<T> {
    /// The value is shared and reference counted.
    Arced(Arc<T>),
    /// The value is held directly.
    Owned(T),
}

impl<T> Deref for OwnOrArc<T> {
    type Target = T;

    /// Borrows the inner value, regardless of how it is held.
    fn deref(&self) -> &T {
        match self {
            // `shared` is `&Arc<T>`: peel off the reference, then the `Arc`.
            OwnOrArc::Arced(shared) => &**shared,
            OwnOrArc::Owned(value) => value,
        }
    }
}
122
#[cfg(feature = "rayon-threads")]
impl CsvHashTaskSpawnerRayon {
    /// Creates a spawner that uses a thread pool shared through an `Arc`.
    pub fn with_thread_pool_arc(thread_pool: Arc<rayon::ThreadPool>) -> Self {
        let thread_pool = OwnOrArc::Arced(thread_pool);
        Self { thread_pool }
    }

    /// Creates a spawner that takes ownership of the given thread pool.
    pub fn with_thread_pool_owned(thread_pool: rayon::ThreadPool) -> Self {
        let thread_pool = OwnOrArc::Owned(thread_pool);
        Self { thread_pool }
    }
}
137
#[cfg(feature = "rayon-threads")]
impl CsvHashTaskSpawner for CsvHashTaskSpawnerRayon {
    /// Spawns three tasks on the thread pool: one that receives the hashes and
    /// compares them, and one each for parsing and hashing the left and the
    /// right CSV.
    ///
    /// Returns the spawner itself together with the receiving end of a
    /// channel of capacity 1, on which the comparing task delivers its
    /// `DiffByteRecordsIterator`.
    fn spawn_hashing_tasks_and_send_result<R: Read + Send + 'static>(
        self,
        csv_hash_task_sender_left: CsvHashTaskSenderWithRecycleReceiver<R>,
        csv_hash_task_sender_right: CsvHashTaskSenderWithRecycleReceiver<R>,
        csv_hash_receiver_comparer: CsvHashReceiverStreamComparer,
        primary_key_columns: HashSet<usize>,
    ) -> (Self, Receiver<DiffByteRecordsIterator>) {
        let (sender, receiver) = bounded(1);

        // Each hashing task needs its own owned set, because the tasks may
        // outlive this call.
        let prim_key_columns_clone = primary_key_columns.clone();

        self.thread_pool.spawn(move || {
            // Sending only fails when the receiver has been dropped, i.e. when
            // nobody is interested in the result anymore. Panicking in that
            // case would escalate to rayon's panic handler (which aborts the
            // process by default), so the result is discarded instead.
            let _ = sender.send(csv_hash_receiver_comparer.recv_hashes_and_compare());
        });

        self.thread_pool.spawn(move || {
            Self::parse_hash_and_send_for_compare::<R, CsvParseResultLeft<CsvByteRecordWithHash>>(
                csv_hash_task_sender_left,
                primary_key_columns,
            );
        });

        self.thread_pool.spawn(move || {
            Self::parse_hash_and_send_for_compare::<R, CsvParseResultRight<CsvByteRecordWithHash>>(
                csv_hash_task_sender_right,
                prim_key_columns_clone,
            );
        });

        (self, receiver)
    }
}
174
/// Task spawner that runs every task on its own `std::thread`.
///
/// The type carries no state.
#[derive(Debug, Default)]
pub struct CsvHashTaskSpawnerStdThreads;

impl CsvHashTaskSpawnerStdThreads {
    /// Creates the (stateless) spawner.
    pub fn new() -> Self {
        CsvHashTaskSpawnerStdThreads
    }
}
183
184impl CsvHashTaskSpawner for CsvHashTaskSpawnerStdThreads {
185    fn spawn_hashing_tasks_and_send_result<R: Read + Send + 'static>(
186        self,
187        csv_hash_task_sender_left: CsvHashTaskSenderWithRecycleReceiver<R>,
188        csv_hash_task_sender_right: CsvHashTaskSenderWithRecycleReceiver<R>,
189        csv_hash_receiver_comparer: CsvHashReceiverStreamComparer,
190        primary_key_columns: HashSet<usize>,
191    ) -> (Self, Receiver<DiffByteRecordsIterator>)
192    where
193        Self: Sized,
194    {
195        let (sender, receiver) = bounded(1);
196
197        let prim_key_columns_clone = primary_key_columns.clone();
198
199        std::thread::spawn(move || {
200            sender
201                .send(csv_hash_receiver_comparer.recv_hashes_and_compare())
202                .unwrap();
203        });
204
205        std::thread::spawn(move || {
206            Self::parse_hash_and_send_for_compare::<R, CsvParseResultLeft<CsvByteRecordWithHash>>(
207                csv_hash_task_sender_left,
208                primary_key_columns,
209            );
210        });
211
212        std::thread::spawn(move || {
213            Self::parse_hash_and_send_for_compare::<R, CsvParseResultRight<CsvByteRecordWithHash>>(
214                csv_hash_task_sender_right,
215                prim_key_columns_clone,
216            );
217        });
218
219        (self, receiver)
220    }
221}
222
/// Builder that produces a task spawner of type `T`.
pub trait CsvHashTaskSpawnerBuilder<T> {
    /// Consumes the builder and returns the finished `T`.
    fn build(self) -> T;
}
226
/// Builder type for the `std::thread` based task spawner.
///
/// The type carries no state.
#[derive(Debug, Default)]
pub struct CsvHashTaskSpawnerBuilderStdThreads;

impl CsvHashTaskSpawnerBuilderStdThreads {
    /// Creates the (stateless) builder.
    pub fn new() -> Self {
        CsvHashTaskSpawnerBuilderStdThreads
    }
}
235
236impl CsvHashTaskSpawnerBuilder<CsvHashTaskSpawnerStdThreads>
237    for CsvHashTaskSpawnerBuilderStdThreads
238{
239    fn build(self) -> CsvHashTaskSpawnerStdThreads {
240        CsvHashTaskSpawnerStdThreads::new()
241    }
242}
243
244pub trait CsvHashTaskSpawnerLocal {
245    fn spawn_hashing_tasks_and_send_result<R: Read + Seek + Send>(
246        &self,
247        csv_hash_task_senders_left: CsvHashTaskLineSenders<R>,
248        csv_hash_task_senders_right: CsvHashTaskLineSenders<R>,
249        primary_key_columns: &HashSet<usize>,
250    );
251
252    fn parse_hash_and_send_for_compare<R, P>(
253        csv_hash_task_senders: CsvHashTaskLineSenders<R>,
254        primary_key_columns: &HashSet<usize>,
255    ) where
256        R: Read + Seek + Send,
257        P: CsvParseResult<CsvLeftRightParseResult<RecordHashWithPosition>, RecordHashWithPosition>,
258    {
259        let mut csv_parser_hasher: CsvParserHasherLinesSender<
260            CsvLeftRightParseResult<RecordHashWithPosition>,
261        > = CsvParserHasherLinesSender::new(
262            csv_hash_task_senders.sender,
263            csv_hash_task_senders.sender_total_lines,
264        );
265        csv_hash_task_senders
266            .sender_csv_reader
267            .send(
268                csv_parser_hasher
269                    .parse_and_hash::<R, P>(csv_hash_task_senders.csv, primary_key_columns),
270            )
271            .unwrap();
272    }
273}
274
/// [`CsvHashTaskSpawnerLocal`] that runs its tasks through a `RayonScope`.
#[cfg(feature = "rayon-threads")]
#[derive(Debug)]
pub struct CsvHashTaskSpawnerLocalRayon<'tp> {
    /// The scope provider that the hashing tasks are spawned in.
    thread_scoper: RayonScope<'tp>,
}

#[cfg(feature = "rayon-threads")]
impl<'tp> CsvHashTaskSpawnerLocalRayon<'tp> {
    /// Wraps the given scope provider.
    pub(crate) fn new(thread_scoper: RayonScope<'tp>) -> Self {
        CsvHashTaskSpawnerLocalRayon { thread_scoper }
    }
}
287
#[cfg(feature = "rayon-threads")]
impl CsvHashTaskSpawnerLocal for CsvHashTaskSpawnerLocalRayon<'_> {
    /// Opens a scope on the wrapped `RayonScope`, spawns one task in it and
    /// lets that task spawn the hashing of the left and of the right CSV as
    /// two further tasks.
    fn spawn_hashing_tasks_and_send_result<R>(
        &self,
        csv_hash_task_senders_left: CsvHashTaskLineSenders<R>,
        csv_hash_task_senders_right: CsvHashTaskLineSenders<R>,
        primary_key_columns: &HashSet<usize>,
    ) where
        R: Read + Seek + Send,
    {
        type LeftResult = CsvParseResultLeft<RecordHashWithPosition>;
        type RightResult = CsvParseResultRight<RecordHashWithPosition>;

        self.thread_scoper.scope(move |outer| {
            outer.spawn(move |tasks| {
                // Left side.
                tasks.spawn(move |_| {
                    Self::parse_hash_and_send_for_compare::<R, LeftResult>(
                        csv_hash_task_senders_left,
                        primary_key_columns,
                    );
                });
                // Right side.
                tasks.spawn(move |_| {
                    Self::parse_hash_and_send_for_compare::<R, RightResult>(
                        csv_hash_task_senders_right,
                        primary_key_columns,
                    );
                });
            });
        });
    }
}
316
/// [`CsvHashTaskSpawnerLocal`] that runs its tasks through a `CrossbeamScope`.
#[cfg(feature = "crossbeam-threads")]
#[derive(Debug)]
pub struct CsvHashTaskSpawnerLocalCrossbeam {
    /// The scope provider that the hashing tasks are spawned in.
    thread_scoper: CrossbeamScope,
}

#[cfg(feature = "crossbeam-threads")]
impl CsvHashTaskSpawnerLocalCrossbeam {
    /// Wraps the given scope provider.
    pub(crate) fn new(thread_scoper: CrossbeamScope) -> Self {
        CsvHashTaskSpawnerLocalCrossbeam { thread_scoper }
    }
}
329
#[cfg(feature = "crossbeam-threads")]
impl CsvHashTaskSpawnerLocal for CsvHashTaskSpawnerLocalCrossbeam {
    /// Opens a scope on the wrapped `CrossbeamScope`, spawns one task in it
    /// and lets that task spawn the hashing of the left and of the right CSV
    /// as two further tasks.
    fn spawn_hashing_tasks_and_send_result<R>(
        &self,
        csv_hash_task_senders_left: CsvHashTaskLineSenders<R>,
        csv_hash_task_senders_right: CsvHashTaskLineSenders<R>,
        primary_key_columns: &HashSet<usize>,
    ) where
        R: Read + Seek + Send,
    {
        type LeftResult = CsvParseResultLeft<RecordHashWithPosition>;
        type RightResult = CsvParseResultRight<RecordHashWithPosition>;

        self.thread_scoper.scope(move |outer| {
            outer.spawn(move |tasks| {
                // Left side.
                tasks.spawn(move |_| {
                    Self::parse_hash_and_send_for_compare::<R, LeftResult>(
                        csv_hash_task_senders_left,
                        primary_key_columns,
                    );
                });
                // Right side.
                tasks.spawn(move |_| {
                    Self::parse_hash_and_send_for_compare::<R, RightResult>(
                        csv_hash_task_senders_right,
                        primary_key_columns,
                    );
                });
            });
        });
    }
}
358
/// Builder that produces a local (scope based) task spawner of type `T`.
pub trait CsvHashTaskSpawnerLocalBuilder<T> {
    /// Consumes the builder and returns the finished `T`.
    fn build(self) -> T;
}
362
/// Builder for [`CsvHashTaskSpawnerLocalRayon`] that borrows the
/// `rayon::ThreadPool` the spawner is going to use.
// `Debug` is derived for parity with the other spawner and builder types in
// this module; `rayon::ThreadPool` implements `Debug`.
#[cfg(feature = "rayon-threads")]
#[derive(Debug)]
pub struct CsvHashTaskSpawnerLocalBuilderRayon<'tp> {
    /// The borrowed thread pool handed on to the spawner in `build`.
    thread_pool: &'tp rayon::ThreadPool,
}

#[cfg(feature = "rayon-threads")]
impl<'tp> CsvHashTaskSpawnerLocalBuilderRayon<'tp> {
    /// Creates a builder that borrows `thread_pool` for the lifetime `'tp`.
    pub fn new(thread_pool: &'tp rayon::ThreadPool) -> Self {
        Self { thread_pool }
    }
}
374
#[cfg(feature = "rayon-threads")]
impl<'tp> CsvHashTaskSpawnerLocalBuilder<CsvHashTaskSpawnerLocalRayon<'tp>>
    for CsvHashTaskSpawnerLocalBuilderRayon<'tp>
{
    /// Builds a `CsvHashTaskSpawnerLocalRayon` whose scope is created from
    /// the borrowed thread pool.
    fn build(self) -> CsvHashTaskSpawnerLocalRayon<'tp> {
        let thread_scoper = RayonScope::with_thread_pool_ref(self.thread_pool);
        CsvHashTaskSpawnerLocalRayon::new(thread_scoper)
    }
}
383
/// Builder for [`CsvHashTaskSpawnerLocalCrossbeam`].
///
/// The type carries no state.
// `Debug` and `Default` are derived for parity with
// `CsvHashTaskSpawnerBuilderStdThreads`; a type with an argument-less `new`
// is expected to implement `Default` as well.
#[cfg(feature = "crossbeam-threads")]
#[derive(Debug, Default)]
pub struct CsvHashTaskSpawnerLocalBuilderCrossbeam;

#[cfg(feature = "crossbeam-threads")]
impl CsvHashTaskSpawnerLocalBuilderCrossbeam {
    /// Creates the (stateless) builder.
    pub fn new() -> Self {
        Self
    }
}
393
#[cfg(feature = "crossbeam-threads")]
impl CsvHashTaskSpawnerLocalBuilder<CsvHashTaskSpawnerLocalCrossbeam>
    for CsvHashTaskSpawnerLocalBuilderCrossbeam
{
    /// Builds a `CsvHashTaskSpawnerLocalCrossbeam` around a freshly created
    /// `CrossbeamScope`.
    fn build(self) -> CsvHashTaskSpawnerLocalCrossbeam {
        let thread_scoper = CrossbeamScope::new();
        CsvHashTaskSpawnerLocalCrossbeam::new(thread_scoper)
    }
}