ibu/parallel.rs
1//! Parallel processing traits for high-throughput record processing.
2//!
3//! This module provides traits for processing IBU records in parallel across multiple threads.
4//! The design separates concerns between processors (which define how to handle records) and
5//! readers (which provide the data and coordinate parallel execution).
6//!
7//! # Architecture
8//!
//! The parallel processing system uses a statically partitioned approach where:
10//! 1. The input data is divided into chunks across available CPU cores
11//! 2. Each thread gets its own clone of the processor
12//! 3. Records are processed in batches to minimize synchronization overhead
13//! 4. Each processor can maintain thread-local state and periodically sync with global state
14//!
15//! # Performance Considerations
16//!
17//! - Batch processing reduces lock contention and improves cache locality
18//! - Thread-local accumulators minimize shared memory access during processing
19//! - The `on_batch_complete` callback allows efficient aggregation of results
20//! - Memory-mapped files enable zero-copy access to records across threads
21
22use crate::{Record, Result};
23
24/// Trait for types that can process records in parallel.
25///
26/// This trait defines how individual records should be processed and how thread-local
27/// results should be aggregated. Implementors must be `Send + Clone` to enable
28/// distribution across threads.
29///
30/// The processing model follows a batch-oriented approach:
31/// 1. Each thread receives its own clone of the processor
32/// 2. Records are processed individually via `process_record`
33/// 3. After processing a batch, `on_batch_complete` is called for aggregation
34/// 4. This cycle repeats until all records are processed
35///
36/// # Thread Safety
37///
38/// Processors must be `Send` and `Clone`. Each thread gets its own clone, so no
39/// explicit synchronization is needed within `process_record`. However, if you need
40/// to aggregate results across threads, use shared state (like `Arc<Mutex<T>>`) and
41/// update it in `on_batch_complete`.
42///
43/// # Examples
44///
45/// ## Simple Record Counter
46///
47/// ```rust
48/// use ibu::{ParallelProcessor, Record};
49/// use std::sync::{Arc, Mutex};
50///
51/// #[derive(Clone, Default)]
52/// struct RecordCounter {
53/// local_count: u64,
54/// global_count: Arc<Mutex<u64>>,
55/// }
56///
57/// impl ParallelProcessor for RecordCounter {
58/// fn process_record(&mut self, _record: Record) -> ibu::Result<()> {
59/// self.local_count += 1;
60/// Ok(())
61/// }
62///
63/// fn on_batch_complete(&mut self) -> ibu::Result<()> {
64/// let mut guard = self.global_count.lock().unwrap();
65/// *guard += self.local_count;
66/// self.local_count = 0;
67/// Ok(())
68/// }
69/// }
70/// ```
71///
72/// ## Barcode Analysis
73///
74/// ```rust
75/// use ibu::{ParallelProcessor, Record};
76/// use std::collections::HashMap;
77/// use std::sync::{Arc, Mutex};
78///
79/// #[derive(Clone)]
80/// struct BarcodeAnalyzer {
81/// local_stats: HashMap<u64, u64>,
82/// global_stats: Arc<Mutex<HashMap<u64, u64>>>,
83/// }
84///
85/// impl ParallelProcessor for BarcodeAnalyzer {
86/// fn process_record(&mut self, record: Record) -> ibu::Result<()> {
87/// *self.local_stats.entry(record.barcode).or_insert(0) += 1;
88/// Ok(())
89/// }
90///
91/// fn on_batch_complete(&mut self) -> ibu::Result<()> {
92/// let mut global = self.global_stats.lock().unwrap();
93/// for (barcode, count) in self.local_stats.drain() {
94/// *global.entry(barcode).or_insert(0) += count;
95/// }
96/// Ok(())
97/// }
98/// }
99/// ```
100pub trait ParallelProcessor: Send + Clone {
101 /// Processes a single record.
102 ///
103 /// This method is called for every record in the dataset. It should be efficient
104 /// and avoid heavy synchronization, as it's called millions of times in typical
105 /// genomics workflows.
106 ///
107 /// Thread-local state can be accumulated here and flushed in `on_batch_complete`.
108 ///
109 /// # Arguments
110 ///
111 /// * `record` - The record to process
112 ///
113 /// # Errors
114 ///
115 /// Should return an error if processing fails. This will stop the entire
116 /// parallel processing operation.
117 fn process_record(&mut self, record: Record) -> Result<()>;
118
119 /// Called when a thread finishes processing a batch of records.
120 ///
121 /// This is the appropriate place to:
122 /// - Flush thread-local accumulators to shared state
123 /// - Perform expensive operations that don't need to happen per-record
124 /// - Update progress indicators
125 ///
126 /// The default implementation does nothing.
127 ///
128 /// # Errors
129 ///
130 /// Should return an error if aggregation fails. This will stop the entire
131 /// parallel processing operation.
132 ///
133 /// # Examples
134 ///
135 /// ```rust
136 /// use ibu::{ParallelProcessor, Record};
137 /// use std::sync::{Arc, Mutex};
138 ///
139 /// #[derive(Clone)]
140 /// struct StatCollector {
141 /// local_sum: u64,
142 /// global_sum: Arc<Mutex<u64>>,
143 /// }
144 ///
145 /// impl ParallelProcessor for StatCollector {
146 /// fn process_record(&mut self, record: Record) -> ibu::Result<()> {
147 /// self.local_sum += record.index;
148 /// Ok(())
149 /// }
150 ///
151 /// fn on_batch_complete(&mut self) -> ibu::Result<()> {
152 /// if self.local_sum > 0 {
153 /// let mut guard = self.global_sum.lock().unwrap();
154 /// *guard += self.local_sum;
155 /// self.local_sum = 0;
156 /// }
157 /// Ok(())
158 /// }
159 /// }
160 /// ```
161 #[allow(unused_variables)]
162 fn on_batch_complete(&mut self) -> Result<()> {
163 Ok(())
164 }
165
166 /// Sets the thread ID for this processor instance.
167 ///
168 /// Called once per thread before processing begins. Can be useful for:
169 /// - Thread-specific logging or debugging
170 /// - Implementing thread-aware algorithms
171 /// - Performance profiling per thread
172 ///
173 /// The default implementation does nothing.
174 ///
175 /// # Arguments
176 ///
177 /// * `tid` - Thread ID (0-based index)
178 #[allow(unused_variables)]
179 fn set_tid(&mut self, tid: usize) {
180 // Default implementation does nothing
181 }
182
183 /// Returns the thread ID for this processor instance.
184 ///
185 /// Returns `None` by default. Implement this if you store the thread ID
186 /// in `set_tid`.
187 fn get_tid(&self) -> Option<usize> {
188 None
189 }
190}
191
192/// Trait for IBU readers that can process records in parallel.
193///
194/// This trait is implemented by readers that can efficiently distribute records
195/// across multiple threads for parallel processing. Currently implemented by
196/// [`MmapReader`](crate::MmapReader) for memory-mapped file access.
197///
198/// # Threading Model
199///
200/// The parallel processing uses a divide-and-conquer approach:
201/// 1. The total number of records is divided evenly across threads
202/// 2. Each thread processes its assigned range independently
203/// 3. Within each thread, records are processed in batches for efficiency
204/// 4. Results are aggregated through the processor's `on_batch_complete` method
205///
206/// # Performance
207///
208/// Parallel processing typically scales linearly with the number of CPU cores
209/// for CPU-bound operations. For I/O-bound operations, the benefits depend on
210/// the underlying storage system.
211///
212/// # Examples
213///
214/// ```rust,no_run
215/// use ibu::{MmapReader, ParallelProcessor, ParallelReader, Record};
216/// use std::sync::{Arc, Mutex};
217///
218/// #[derive(Clone, Default)]
219/// struct SimpleCounter {
220/// local: u64,
221/// global: Arc<Mutex<u64>>,
222/// }
223///
224/// impl ParallelProcessor for SimpleCounter {
225/// fn process_record(&mut self, _record: Record) -> ibu::Result<()> {
226/// self.local += 1;
227/// Ok(())
228/// }
229///
230/// fn on_batch_complete(&mut self) -> ibu::Result<()> {
231/// *self.global.lock().unwrap() += self.local;
232/// self.local = 0;
233/// Ok(())
234/// }
235/// }
236///
237/// # fn main() -> ibu::Result<()> {
238/// let reader = MmapReader::new("data.ibu")?;
239/// let counter = SimpleCounter::default();
240///
241/// // Process with 4 threads
242/// reader.process_parallel(counter.clone(), 4)?;
243///
244/// // Check results
245/// let total = *counter.global.lock().unwrap();
246/// println!("Processed {} records", total);
247/// # Ok(())
248/// # }
249/// ```
250pub trait ParallelReader {
251 /// Processes all records in parallel using the specified processor.
252 ///
253 /// Divides the records across the specified number of threads and processes
254 /// them in parallel. Each thread gets its own clone of the processor.
255 ///
256 /// # Arguments
257 ///
258 /// * `processor` - The processor to use for handling records
259 /// * `num_threads` - Number of threads to use (0 = use all available cores)
260 ///
261 /// # Errors
262 ///
263 /// Returns an error if:
264 /// - Any thread encounters a processing error
265 /// - Thread creation or coordination fails
266 /// - The processor returns an error from `process_record` or `on_batch_complete`
267 ///
268 /// # Examples
269 ///
270 /// ```rust,no_run
271 /// use ibu::{MmapReader, ParallelProcessor, ParallelReader, Record};
272 ///
273 /// #[derive(Clone, Default)]
274 /// struct NoOpProcessor;
275 ///
276 /// impl ParallelProcessor for NoOpProcessor {
277 /// fn process_record(&mut self, _record: Record) -> ibu::Result<()> {
278 /// Ok(()) // Do nothing
279 /// }
280 /// }
281 ///
282 /// # fn main() -> ibu::Result<()> {
283 /// let reader = MmapReader::new("data.ibu")?;
284 /// let processor = NoOpProcessor::default();
285 ///
286 /// // Use all available cores
287 /// reader.process_parallel(processor, 0)?;
288 /// # Ok(())
289 /// # }
290 /// ```
291 fn process_parallel<P: ParallelProcessor + Clone + 'static>(
292 &self,
293 processor: P,
294 num_threads: usize,
295 ) -> Result<()>;
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    /// Accumulates per-record counts/sums locally and flushes them into
    /// shared atomics whenever a batch completes.
    #[derive(Clone, Default)]
    struct TestProcessor {
        local_count: u64,
        local_sum: u64,
        global_count: Arc<AtomicU64>,
        global_sum: Arc<AtomicU64>,
        tid: Option<usize>,
    }

    impl ParallelProcessor for TestProcessor {
        fn process_record(&mut self, record: Record) -> Result<()> {
            self.local_sum += record.barcode + record.umi + record.index;
            self.local_count += 1;
            Ok(())
        }

        fn on_batch_complete(&mut self) -> Result<()> {
            self.global_count
                .fetch_add(self.local_count, Ordering::Relaxed);
            self.global_sum.fetch_add(self.local_sum, Ordering::Relaxed);
            self.local_count = 0;
            self.local_sum = 0;
            Ok(())
        }

        fn set_tid(&mut self, tid: usize) {
            self.tid = Some(tid);
        }

        fn get_tid(&self) -> Option<usize> {
            self.tid
        }
    }

    /// Fails deterministically on the record whose index matches the trigger.
    #[derive(Clone)]
    struct ErrorProcessor {
        fail_on_record: u64,
        current_record: u64,
    }

    impl ParallelProcessor for ErrorProcessor {
        fn process_record(&mut self, record: Record) -> Result<()> {
            self.current_record += 1;
            if record.index == self.fail_on_record {
                return Err(crate::IbuError::Process("Test error".into()));
            }
            Ok(())
        }
    }

    #[test]
    fn test_processor_basic_functionality() {
        let shared = TestProcessor::default();
        let mut worker = shared.clone();

        // The thread id round-trips through set_tid/get_tid.
        assert_eq!(worker.get_tid(), None);
        worker.set_tid(42);
        assert_eq!(worker.get_tid(), Some(42));

        // Feed two records and verify the thread-local accumulators.
        worker.process_record(Record::new(1, 2, 3)).unwrap();
        worker.process_record(Record::new(4, 5, 6)).unwrap();
        assert_eq!(worker.local_count, 2);
        assert_eq!(worker.local_sum, 1 + 2 + 3 + 4 + 5 + 6);

        // Completing the batch flushes locals into the shared atomics
        // and resets the thread-local state.
        worker.on_batch_complete().unwrap();
        assert_eq!(worker.local_count, 0);
        assert_eq!(worker.local_sum, 0);
        assert_eq!(shared.global_count.load(Ordering::Relaxed), 2);
        assert_eq!(shared.global_sum.load(Ordering::Relaxed), 21);
    }

    #[test]
    fn test_processor_thread_safety() {
        let base = TestProcessor::default();

        // Compile-time checks: the processor must be Send + Clone.
        fn is_send<T: Send>() {}
        fn is_clone<T: Clone>() {}
        is_send::<TestProcessor>();
        is_clone::<TestProcessor>();

        // Clones carry independent thread-local state...
        let mut first = base.clone();
        let mut second = base.clone();
        first.set_tid(1);
        second.set_tid(2);
        assert_eq!(first.get_tid(), Some(1));
        assert_eq!(second.get_tid(), Some(2));

        // ...while pointing at the same shared accumulators.
        assert!(Arc::ptr_eq(&first.global_count, &second.global_count));
        assert!(Arc::ptr_eq(&first.global_sum, &second.global_sum));
    }

    #[test]
    fn test_error_handling() {
        let mut processor = ErrorProcessor {
            fail_on_record: 5,
            current_record: 0,
        };

        // Records whose index differs from the trigger succeed.
        assert!(processor.process_record(Record::new(1, 2, 3)).is_ok());
        assert!(processor.process_record(Record::new(1, 2, 4)).is_ok());

        // Index 5 trips the failure path with a Process error.
        let outcome = processor.process_record(Record::new(1, 2, 5));
        assert!(outcome.is_err());
        match outcome {
            Err(crate::IbuError::Process(_)) => {} // Expected
            other => panic!("Expected Process error, got: {:?}", other),
        }
    }

    #[test]
    fn test_default_implementations() {
        #[derive(Clone)]
        struct MinimalProcessor;

        impl ParallelProcessor for MinimalProcessor {
            fn process_record(&mut self, _record: Record) -> Result<()> {
                Ok(())
            }
        }

        let mut minimal = MinimalProcessor;

        // Default hooks are no-ops that always succeed.
        assert!(minimal.on_batch_complete().is_ok());
        assert_eq!(minimal.get_tid(), None);

        // The default set_tid discards its argument, so get_tid stays None.
        minimal.set_tid(123);
        assert_eq!(minimal.get_tid(), None);
    }

    #[test]
    fn test_multiple_batch_completions() {
        let shared = TestProcessor::default();
        let mut worker = shared.clone();

        // First batch: a single record.
        worker.process_record(Record::new(1, 0, 0)).unwrap();
        worker.on_batch_complete().unwrap();

        // Second batch: two more records.
        worker.process_record(Record::new(2, 0, 0)).unwrap();
        worker.process_record(Record::new(3, 0, 0)).unwrap();
        worker.on_batch_complete().unwrap();

        // Totals accumulate across batch boundaries.
        assert_eq!(shared.global_count.load(Ordering::Relaxed), 3);
        assert_eq!(shared.global_sum.load(Ordering::Relaxed), 1 + 2 + 3);
    }
}
484}