// ibu/parallel.rs

//! Parallel processing traits for high-throughput record processing.
//!
//! This module provides traits for processing IBU records in parallel across multiple threads.
//! The design separates concerns between processors (which define how to handle records) and
//! readers (which provide the data and coordinate parallel execution).
//!
//! # Architecture
//!
//! The parallel processing system uses a work-stealing approach where:
//! 1. The input data is divided into chunks across available CPU cores
//! 2. Each thread gets its own clone of the processor
//! 3. Records are processed in batches to minimize synchronization overhead
//! 4. Each processor can maintain thread-local state and periodically sync with global state
//!
//! # Performance Considerations
//!
//! - Batch processing reduces lock contention and improves cache locality
//! - Thread-local accumulators minimize shared memory access during processing
//! - The `on_batch_complete` callback allows efficient aggregation of results
//! - Memory-mapped files enable zero-copy access to records across threads
use crate::{Record, Result};

24/// Trait for types that can process records in parallel.
25///
26/// This trait defines how individual records should be processed and how thread-local
27/// results should be aggregated. Implementors must be `Send + Clone` to enable
28/// distribution across threads.
29///
30/// The processing model follows a batch-oriented approach:
31/// 1. Each thread receives its own clone of the processor
32/// 2. Records are processed individually via `process_record`
33/// 3. After processing a batch, `on_batch_complete` is called for aggregation
34/// 4. This cycle repeats until all records are processed
35///
36/// # Thread Safety
37///
38/// Processors must be `Send` and `Clone`. Each thread gets its own clone, so no
39/// explicit synchronization is needed within `process_record`. However, if you need
40/// to aggregate results across threads, use shared state (like `Arc<Mutex<T>>`) and
41/// update it in `on_batch_complete`.
42///
43/// # Examples
44///
45/// ## Simple Record Counter
46///
47/// ```rust
48/// use ibu::{ParallelProcessor, Record};
49/// use std::sync::{Arc, Mutex};
50///
51/// #[derive(Clone, Default)]
52/// struct RecordCounter {
53///     local_count: u64,
54///     global_count: Arc<Mutex<u64>>,
55/// }
56///
57/// impl ParallelProcessor for RecordCounter {
58///     fn process_record(&mut self, _record: Record) -> ibu::Result<()> {
59///         self.local_count += 1;
60///         Ok(())
61///     }
62///
63///     fn on_batch_complete(&mut self) -> ibu::Result<()> {
64///         let mut guard = self.global_count.lock().unwrap();
65///         *guard += self.local_count;
66///         self.local_count = 0;
67///         Ok(())
68///     }
69/// }
70/// ```
71///
72/// ## Barcode Analysis
73///
74/// ```rust
75/// use ibu::{ParallelProcessor, Record};
76/// use std::collections::HashMap;
77/// use std::sync::{Arc, Mutex};
78///
79/// #[derive(Clone)]
80/// struct BarcodeAnalyzer {
81///     local_stats: HashMap<u64, u64>,
82///     global_stats: Arc<Mutex<HashMap<u64, u64>>>,
83/// }
84///
85/// impl ParallelProcessor for BarcodeAnalyzer {
86///     fn process_record(&mut self, record: Record) -> ibu::Result<()> {
87///         *self.local_stats.entry(record.barcode).or_insert(0) += 1;
88///         Ok(())
89///     }
90///
91///     fn on_batch_complete(&mut self) -> ibu::Result<()> {
92///         let mut global = self.global_stats.lock().unwrap();
93///         for (barcode, count) in self.local_stats.drain() {
94///             *global.entry(barcode).or_insert(0) += count;
95///         }
96///         Ok(())
97///     }
98/// }
99/// ```
100pub trait ParallelProcessor: Send + Clone {
101    /// Processes a single record.
102    ///
103    /// This method is called for every record in the dataset. It should be efficient
104    /// and avoid heavy synchronization, as it's called millions of times in typical
105    /// genomics workflows.
106    ///
107    /// Thread-local state can be accumulated here and flushed in `on_batch_complete`.
108    ///
109    /// # Arguments
110    ///
111    /// * `record` - The record to process
112    ///
113    /// # Errors
114    ///
115    /// Should return an error if processing fails. This will stop the entire
116    /// parallel processing operation.
117    fn process_record(&mut self, record: Record) -> Result<()>;
118
119    /// Called when a thread finishes processing a batch of records.
120    ///
121    /// This is the appropriate place to:
122    /// - Flush thread-local accumulators to shared state
123    /// - Perform expensive operations that don't need to happen per-record
124    /// - Update progress indicators
125    ///
126    /// The default implementation does nothing.
127    ///
128    /// # Errors
129    ///
130    /// Should return an error if aggregation fails. This will stop the entire
131    /// parallel processing operation.
132    ///
133    /// # Examples
134    ///
135    /// ```rust
136    /// use ibu::{ParallelProcessor, Record};
137    /// use std::sync::{Arc, Mutex};
138    ///
139    /// #[derive(Clone)]
140    /// struct StatCollector {
141    ///     local_sum: u64,
142    ///     global_sum: Arc<Mutex<u64>>,
143    /// }
144    ///
145    /// impl ParallelProcessor for StatCollector {
146    ///     fn process_record(&mut self, record: Record) -> ibu::Result<()> {
147    ///         self.local_sum += record.index;
148    ///         Ok(())
149    ///     }
150    ///
151    ///     fn on_batch_complete(&mut self) -> ibu::Result<()> {
152    ///         if self.local_sum > 0 {
153    ///             let mut guard = self.global_sum.lock().unwrap();
154    ///             *guard += self.local_sum;
155    ///             self.local_sum = 0;
156    ///         }
157    ///         Ok(())
158    ///     }
159    /// }
160    /// ```
161    #[allow(unused_variables)]
162    fn on_batch_complete(&mut self) -> Result<()> {
163        Ok(())
164    }
165
166    /// Sets the thread ID for this processor instance.
167    ///
168    /// Called once per thread before processing begins. Can be useful for:
169    /// - Thread-specific logging or debugging
170    /// - Implementing thread-aware algorithms
171    /// - Performance profiling per thread
172    ///
173    /// The default implementation does nothing.
174    ///
175    /// # Arguments
176    ///
177    /// * `tid` - Thread ID (0-based index)
178    #[allow(unused_variables)]
179    fn set_tid(&mut self, tid: usize) {
180        // Default implementation does nothing
181    }
182
183    /// Returns the thread ID for this processor instance.
184    ///
185    /// Returns `None` by default. Implement this if you store the thread ID
186    /// in `set_tid`.
187    fn get_tid(&self) -> Option<usize> {
188        None
189    }
190}
191
192/// Trait for IBU readers that can process records in parallel.
193///
194/// This trait is implemented by readers that can efficiently distribute records
195/// across multiple threads for parallel processing. Currently implemented by
196/// [`MmapReader`](crate::MmapReader) for memory-mapped file access.
197///
198/// # Threading Model
199///
200/// The parallel processing uses a divide-and-conquer approach:
201/// 1. The total number of records is divided evenly across threads
202/// 2. Each thread processes its assigned range independently
203/// 3. Within each thread, records are processed in batches for efficiency
204/// 4. Results are aggregated through the processor's `on_batch_complete` method
205///
206/// # Performance
207///
208/// Parallel processing typically scales linearly with the number of CPU cores
209/// for CPU-bound operations. For I/O-bound operations, the benefits depend on
210/// the underlying storage system.
211///
212/// # Examples
213///
214/// ```rust,no_run
215/// use ibu::{MmapReader, ParallelProcessor, ParallelReader, Record};
216/// use std::sync::{Arc, Mutex};
217///
218/// #[derive(Clone, Default)]
219/// struct SimpleCounter {
220///     local: u64,
221///     global: Arc<Mutex<u64>>,
222/// }
223///
224/// impl ParallelProcessor for SimpleCounter {
225///     fn process_record(&mut self, _record: Record) -> ibu::Result<()> {
226///         self.local += 1;
227///         Ok(())
228///     }
229///
230///     fn on_batch_complete(&mut self) -> ibu::Result<()> {
231///         *self.global.lock().unwrap() += self.local;
232///         self.local = 0;
233///         Ok(())
234///     }
235/// }
236///
237/// # fn main() -> ibu::Result<()> {
238/// let reader = MmapReader::new("data.ibu")?;
239/// let counter = SimpleCounter::default();
240///
241/// // Process with 4 threads
242/// reader.process_parallel(counter.clone(), 4)?;
243///
244/// // Check results
245/// let total = *counter.global.lock().unwrap();
246/// println!("Processed {} records", total);
247/// # Ok(())
248/// # }
249/// ```
250pub trait ParallelReader {
251    /// Processes all records in parallel using the specified processor.
252    ///
253    /// Divides the records across the specified number of threads and processes
254    /// them in parallel. Each thread gets its own clone of the processor.
255    ///
256    /// # Arguments
257    ///
258    /// * `processor` - The processor to use for handling records
259    /// * `num_threads` - Number of threads to use (0 = use all available cores)
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if:
264    /// - Any thread encounters a processing error
265    /// - Thread creation or coordination fails
266    /// - The processor returns an error from `process_record` or `on_batch_complete`
267    ///
268    /// # Examples
269    ///
270    /// ```rust,no_run
271    /// use ibu::{MmapReader, ParallelProcessor, ParallelReader, Record};
272    ///
273    /// #[derive(Clone, Default)]
274    /// struct NoOpProcessor;
275    ///
276    /// impl ParallelProcessor for NoOpProcessor {
277    ///     fn process_record(&mut self, _record: Record) -> ibu::Result<()> {
278    ///         Ok(()) // Do nothing
279    ///     }
280    /// }
281    ///
282    /// # fn main() -> ibu::Result<()> {
283    /// let reader = MmapReader::new("data.ibu")?;
284    /// let processor = NoOpProcessor::default();
285    ///
286    /// // Use all available cores
287    /// reader.process_parallel(processor, 0)?;
288    /// # Ok(())
289    /// # }
290    /// ```
291    fn process_parallel<P: ParallelProcessor + Clone + 'static>(
292        &self,
293        processor: P,
294        num_threads: usize,
295    ) -> Result<()>;
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    /// Exercises the full trait surface: accumulates a thread-local record
    /// count and field sum, then flushes both into shared atomics when a
    /// batch completes.
    #[derive(Clone, Default)]
    struct TestProcessor {
        local_count: u64,
        local_sum: u64,
        global_count: Arc<AtomicU64>,
        global_sum: Arc<AtomicU64>,
        tid: Option<usize>,
    }

    impl ParallelProcessor for TestProcessor {
        fn process_record(&mut self, record: Record) -> Result<()> {
            self.local_count += 1;
            self.local_sum += record.barcode + record.umi + record.index;
            Ok(())
        }

        fn on_batch_complete(&mut self) -> Result<()> {
            // Drain the thread-local accumulators into the shared totals.
            let count = std::mem::take(&mut self.local_count);
            let sum = std::mem::take(&mut self.local_sum);
            self.global_count.fetch_add(count, Ordering::Relaxed);
            self.global_sum.fetch_add(sum, Ordering::Relaxed);
            Ok(())
        }

        fn set_tid(&mut self, tid: usize) {
            self.tid = Some(tid);
        }

        fn get_tid(&self) -> Option<usize> {
            self.tid
        }
    }

    /// Fails deliberately on one record index; used to verify that errors
    /// propagate out of `process_record`.
    #[derive(Clone)]
    struct ErrorProcessor {
        fail_on_record: u64,
        current_record: u64,
    }

    impl ParallelProcessor for ErrorProcessor {
        fn process_record(&mut self, record: Record) -> Result<()> {
            self.current_record += 1;
            if record.index != self.fail_on_record {
                Ok(())
            } else {
                Err(crate::IbuError::Process("Test error".into()))
            }
        }
    }

    #[test]
    fn test_processor_basic_functionality() {
        let shared = TestProcessor::default();
        let mut worker = shared.clone();

        // Thread-id round trip.
        assert_eq!(worker.get_tid(), None);
        worker.set_tid(42);
        assert_eq!(worker.get_tid(), Some(42));

        // Local accumulation across two records.
        worker.process_record(Record::new(1, 2, 3)).unwrap();
        worker.process_record(Record::new(4, 5, 6)).unwrap();
        assert_eq!(worker.local_count, 2);
        assert_eq!(worker.local_sum, 1 + 2 + 3 + 4 + 5 + 6);

        // Batch completion flushes locals into the shared atomics.
        worker.on_batch_complete().unwrap();
        assert_eq!(worker.local_count, 0);
        assert_eq!(worker.local_sum, 0);
        assert_eq!(shared.global_count.load(Ordering::Relaxed), 2);
        assert_eq!(shared.global_sum.load(Ordering::Relaxed), 21);
    }

    #[test]
    fn test_processor_thread_safety() {
        // Compile-time proof that the processor satisfies Send + Clone.
        fn assert_send<T: Send>() {}
        fn assert_clone<T: Clone>() {}
        assert_send::<TestProcessor>();
        assert_clone::<TestProcessor>();

        let template = TestProcessor::default();
        let mut first = template.clone();
        let mut second = template.clone();

        // Local state is independent per clone...
        first.set_tid(1);
        second.set_tid(2);
        assert_eq!(first.get_tid(), Some(1));
        assert_eq!(second.get_tid(), Some(2));

        // ...while the global accumulators are shared between clones.
        assert!(Arc::ptr_eq(&first.global_count, &second.global_count));
        assert!(Arc::ptr_eq(&first.global_sum, &second.global_sum));
    }

    #[test]
    fn test_error_handling() {
        let mut failing = ErrorProcessor {
            fail_on_record: 5,
            current_record: 0,
        };

        // Records whose index does not match the trigger succeed.
        assert!(failing.process_record(Record::new(1, 2, 3)).is_ok());
        assert!(failing.process_record(Record::new(1, 2, 4)).is_ok());

        // Index 5 produces the configured Process error.
        let outcome = failing.process_record(Record::new(1, 2, 5));
        assert!(matches!(outcome, Err(crate::IbuError::Process(_))));
    }

    #[test]
    fn test_default_implementations() {
        #[derive(Clone)]
        struct MinimalProcessor;

        impl ParallelProcessor for MinimalProcessor {
            fn process_record(&mut self, _record: Record) -> Result<()> {
                Ok(())
            }
        }

        let mut minimal = MinimalProcessor;

        // Defaults: no-op batch hook, no stored thread id.
        assert!(minimal.on_batch_complete().is_ok());
        assert_eq!(minimal.get_tid(), None);

        // The default set_tid discards its argument without panicking.
        minimal.set_tid(123);
        assert_eq!(minimal.get_tid(), None);
    }

    #[test]
    fn test_multiple_batch_completions() {
        let shared = TestProcessor::default();
        let mut worker = shared.clone();

        // First batch: a single record.
        worker.process_record(Record::new(1, 0, 0)).unwrap();
        worker.on_batch_complete().unwrap();

        // Second batch: two more records.
        for barcode in [2, 3] {
            worker.process_record(Record::new(barcode, 0, 0)).unwrap();
        }
        worker.on_batch_complete().unwrap();

        // Totals accumulate across batches.
        assert_eq!(shared.global_count.load(Ordering::Relaxed), 3);
        assert_eq!(shared.global_sum.load(Ordering::Relaxed), 1 + 2 + 3);
    }
}