fgumi 0.2.0

High-performance tools for UMI-tagged sequencing data: extraction, grouping, and consensus calling
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
//! Balanced Chase Drain scheduler.
//!
//! A variant of balanced-chase that adds "drain mode" - when Serialize
//! fails due to output queue full, temporarily prioritize Compress to
//! drain the queue before retrying Serialize.
//!
//! The key insight is that when Q6 (serialized queue) is full, threads
//! should focus on Compress to drain it rather than immediately retrying
//! Serialize (which will fail again).
//!
//! Drain mode is backpressure-driven: once entered, it stays active until
//! the output queue drops below the high-water mark (`output_high` = false).

use super::{BackpressureState, Direction, Scheduler};
use crate::unified_pipeline::base::{ActiveSteps, PipelineStep};

/// Balanced chase scheduler with drain mode for output backpressure.
pub struct BalancedChaseDrainScheduler {
    /// Thread ID.
    thread_id: usize,
    /// Total number of threads.
    #[allow(dead_code)]
    num_threads: usize,
    /// Current preferred step.
    current_step: PipelineStep,
    /// Current direction of movement.
    direction: Direction,
    /// Priority buffer.
    priority_buffer: [PipelineStep; 9],
    /// Which exclusive step this thread is responsible for (if any).
    exclusive_role: Option<PipelineStep>,
    /// Drain mode flag - when true, prioritize Compress over Serialize.
    /// Activated when Serialize fails, deactivated when `output_high` clears.
    drain_mode: bool,
    /// Memory drain mode - when true, prioritize Process over Group.
    /// Activated when Group fails due to memory pressure, deactivated when `memory_high` clears.
    memory_drain_mode: bool,
    /// Active steps in the pipeline.
    active_steps: ActiveSteps,
    /// Whether this thread prefers Compress over Serialize in normal mode.
    /// Set for half the non-exclusive threads when `num_threads >= 8`.
    /// Provides soft cache affinity without OS-level thread pinning.
    compress_preferred: bool,
}

impl BalancedChaseDrainScheduler {
    /// Create a new balanced chase drain scheduler.
    ///
    /// Role assignment (starting step plus optional exclusive step) is
    /// delegated to the shared balanced-chase logic; Compress affinity is
    /// derived from the thread id and thread count.
    #[must_use]
    pub fn new(thread_id: usize, num_threads: usize, active_steps: ActiveSteps) -> Self {
        let (current_step, exclusive_role) = Self::determine_role(thread_id, num_threads);
        let compress_preferred =
            Self::is_compress_preferred(thread_id, num_threads, exclusive_role);

        Self {
            thread_id,
            num_threads,
            current_step,
            direction: Direction::Forward,
            priority_buffer: PipelineStep::all(),
            exclusive_role,
            drain_mode: false,
            memory_drain_mode: false,
            active_steps,
            compress_preferred,
        }
    }

    /// Whether this thread should prefer Compress over Serialize in normal mode.
    /// For 8+ threads, non-exclusive even-indexed middle threads prefer Compress.
    fn is_compress_preferred(
        thread_id: usize,
        num_threads: usize,
        exclusive_role: Option<PipelineStep>,
    ) -> bool {
        // Affinity only pays off with enough threads, and exclusive threads
        // already have a fixed primary responsibility.
        num_threads >= 8 && exclusive_role.is_none() && thread_id.is_multiple_of(2)
    }

    /// Determine thread role - same as balanced-chase.
    fn determine_role(
        thread_id: usize,
        num_threads: usize,
    ) -> (PipelineStep, Option<PipelineStep>) {
        super::balanced_chase_determine_role(thread_id, num_threads)
    }

    /// Build priority list with drain mode support.
    ///
    /// Priority order:
    /// 1. exclusive role (if any) - tried first, but not sticky after success
    /// 2. Process, when memory drain mode or `memory_high` is active
    /// 3. bottleneck steps: in drain mode (or when `output_high`) Compress
    ///    only, with Serialize deferred to step 7; otherwise Compress/Serialize
    ///    ordered by this thread's affinity
    /// 4. the current chase step
    /// 5. remaining parallel steps in direction order
    /// 6. exclusive steps (Read, FindBoundaries, Group, Write)
    /// 7. in drain mode, Serialize is re-added at the very end so it remains
    ///    reachable, just maximally deprioritized
    fn build_priorities(&mut self, bp: BackpressureState) {
        use PipelineStep::{
            Compress, Decode, Decompress, FindBoundaries, Group, Process, Read, Serialize, Write,
        };

        /// Append `step` unless it is already in the list (keeps first-seen order).
        fn push_unique(list: &mut Vec<PipelineStep>, step: PipelineStep) {
            if !list.contains(&step) {
                list.push(step);
            }
        }

        let mut priorities = Vec::with_capacity(9);

        // 1. If we have an exclusive role, always try it first
        //    (but we won't be sticky on it after success).
        if let Some(role) = self.exclusive_role {
            priorities.push(role);
        }

        // 2. Memory drain mode - prioritize Process to drain q4_groups.
        if self.memory_drain_mode || bp.memory_high {
            push_unique(&mut priorities, Process);
        }

        // 3. Bottleneck steps - with drain mode logic.
        //    In drain mode OR when output is high, prioritize Compress over
        //    Serialize to drain Q6 (the serialized queue).
        let draining = self.drain_mode || bp.output_high;
        if draining {
            // DRAIN MODE: Compress first; Serialize is handled in step 7.
            push_unique(&mut priorities, Compress);
        } else if self.compress_preferred {
            // Compress-preferred threads: Compress then Serialize.
            push_unique(&mut priorities, Compress);
            push_unique(&mut priorities, Serialize);
        } else {
            // Default: Serialize then Compress.
            push_unique(&mut priorities, Serialize);
            push_unique(&mut priorities, Compress);
        }

        // 4. Current step (if not already added).
        push_unique(&mut priorities, self.current_step);

        // 5. Other parallel steps based on direction.
        let parallel_order: &[PipelineStep] = match self.direction {
            Direction::Forward => &[Process, Decode, Decompress],
            Direction::Backward => &[Decompress, Decode, Process],
        };
        for &step in parallel_order {
            push_unique(&mut priorities, step);
        }

        // 6. Exclusive steps last (except our role which is first).
        for step in [Read, FindBoundaries, Group, Write] {
            push_unique(&mut priorities, step);
        }

        // 7. Add Serialize at the end if in drain mode (so it's still in the
        //    list, just deprioritized).
        //    NOTE(review): when only `bp.output_high` is set (drain mode not
        //    yet triggered by a Serialize failure), Serialize is left out of
        //    the list entirely - this preserves the original behavior; confirm
        //    it is intentional rather than an oversight.
        if self.drain_mode {
            push_unique(&mut priorities, Serialize);
        }

        // Copy into the fixed-size buffer consumed by `get_priorities`.
        for (i, &step) in priorities.iter().take(9).enumerate() {
            self.priority_buffer[i] = step;
        }
    }
}

impl Scheduler for BalancedChaseDrainScheduler {
    fn get_priorities(&mut self, bp: BackpressureState) -> &[PipelineStep] {
        // Re-evaluate chase direction from the current backpressure picture.
        if bp.output_high || bp.memory_high {
            self.direction = Direction::Forward;
        } else if bp.input_low && !bp.read_done {
            self.direction = Direction::Backward;
        }

        // Drain mode ends as soon as the output queue (Q6) has space again,
        // i.e. the moment `output_high` clears.
        self.drain_mode = self.drain_mode && bp.output_high;

        // Memory drain mode ends once memory has drained below 50%
        // (hysteresis), preventing thrash right at the limit boundary.
        self.memory_drain_mode = self.memory_drain_mode && !bp.memory_drained;

        self.build_priorities(bp);
        let count = self.active_steps.filter_in_place(&mut self.priority_buffer);
        &self.priority_buffer[..count]
    }

    fn record_outcome(&mut self, step: PipelineStep, success: bool, _was_contention: bool) {
        if !success {
            // DRAIN MODE TRIGGER: a failed Serialize means the output queue
            // is full - switch to draining it via Compress.
            self.drain_mode |= step == PipelineStep::Serialize;

            // MEMORY DRAIN MODE TRIGGER: a failed Group indicates memory
            // pressure - prioritize Process to drain q4_groups and free memory.
            self.memory_drain_mode |= step == PipelineStep::Group;

            // Movement logic from balanced-chase, biased toward the
            // bottleneck pair: Serialize (index 6) and Compress (index 7).
            let at = self.current_step.index();
            self.current_step = match self.direction {
                Direction::Forward if at < 7 => PipelineStep::all()[at + 1],
                Direction::Forward => PipelineStep::Compress,
                Direction::Backward if at > 1 && at != 7 && at != 6 => {
                    PipelineStep::all()[at - 1]
                }
                // Stay near the bottleneck.
                Direction::Backward => PipelineStep::Serialize,
            };
            return;
        }

        // Pivot logic from balanced-chase: after completing exclusive work,
        // pivot to the bottleneck; otherwise stay on the successful step.
        self.current_step = if self.exclusive_role == Some(step) {
            PipelineStep::Compress
        } else {
            step
        };
    }

    fn thread_id(&self) -> usize {
        self.thread_id
    }

    fn num_threads(&self) -> usize {
        self.num_threads
    }

    fn active_steps(&self) -> &ActiveSteps {
        &self.active_steps
    }
}

#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    fn all() -> ActiveSteps {
        ActiveSteps::all()
    }

    /// Backpressure snapshot with only the output queue above its
    /// high-water mark (keeps drain mode active).
    fn output_high_bp() -> BackpressureState {
        BackpressureState {
            output_high: true,
            input_low: false,
            read_done: false,
            memory_high: false,
            memory_drained: true,
        }
    }

    /// Locate two steps in a priority slice, panicking if either is absent.
    fn positions(
        priorities: &[PipelineStep],
        first: PipelineStep,
        second: PipelineStep,
    ) -> (usize, usize) {
        let locate = |target: PipelineStep| {
            priorities
                .iter()
                .position(|&s| s == target)
                .expect("step should be present in priorities")
        };
        (locate(first), locate(second))
    }

    #[test]
    fn test_reader_starts_on_compress() {
        let scheduler = BalancedChaseDrainScheduler::new(0, 8, all());
        assert_eq!(scheduler.exclusive_role, Some(PipelineStep::Read));
        assert_eq!(scheduler.current_step, PipelineStep::Compress);
        assert!(!scheduler.drain_mode);
    }

    #[test]
    fn test_serialize_failure_triggers_drain_mode() {
        let mut scheduler = BalancedChaseDrainScheduler::new(3, 8, all());

        // A failed Serialize attempt must activate drain mode.
        scheduler.record_outcome(PipelineStep::Serialize, false, false);
        assert!(scheduler.drain_mode);
    }

    #[test]
    fn test_drain_mode_exits_when_backpressure_clears() {
        let mut scheduler = BalancedChaseDrainScheduler::new(3, 8, all());
        scheduler.drain_mode = true;

        // Output still above the high-water mark: drain mode persists.
        scheduler.get_priorities(output_high_bp());
        assert!(scheduler.drain_mode);

        // Default state has output_high = false: drain mode ends.
        scheduler.get_priorities(BackpressureState::default());
        assert!(!scheduler.drain_mode);
    }

    #[test]
    fn test_drain_mode_prioritizes_compress() {
        let mut scheduler = BalancedChaseDrainScheduler::new(3, 8, all());
        scheduler.drain_mode = true;

        // Keep output_high so drain mode is not exited.
        let priorities = scheduler.get_priorities(output_high_bp());
        let (compress, serialize) =
            positions(priorities, PipelineStep::Compress, PipelineStep::Serialize);
        assert!(compress < serialize);
    }

    #[test]
    fn test_normal_mode_serializes_first() {
        // Thread 3 of 8 is an odd-indexed middle thread: not compress-preferred.
        let mut scheduler = BalancedChaseDrainScheduler::new(3, 8, all());
        assert!(!scheduler.drain_mode);
        assert!(!scheduler.compress_preferred);

        let priorities = scheduler.get_priorities(BackpressureState::default());
        let (compress, serialize) =
            positions(priorities, PipelineStep::Compress, PipelineStep::Serialize);
        assert!(serialize < compress);
    }

    #[rstest]
    #[case(0, 4)]
    #[case(1, 4)]
    #[case(2, 4)]
    #[case(3, 4)]
    #[case(0, 2)]
    #[case(1, 2)]
    fn test_compress_preferred_below_8_threads(
        #[case] thread_id: usize,
        #[case] num_threads: usize,
    ) {
        let (_, role) = BalancedChaseDrainScheduler::determine_role(thread_id, num_threads);
        let preferred =
            BalancedChaseDrainScheduler::is_compress_preferred(thread_id, num_threads, role);
        assert!(
            !preferred,
            "thread {thread_id} of {num_threads} should not be compress-preferred"
        );
    }

    #[test]
    fn test_compress_preferred_at_8_threads() {
        // At 8 threads: T0=Read, T1=FindBoundaries, T6=Group, T7=Write (exclusive).
        // Of the middle threads T2..=T5, the even-indexed (T2, T4) prefer Compress.
        let expectations = [false, false, true, false, true, false, false, false];
        for (tid, &expected) in expectations.iter().enumerate() {
            let (_, role) = BalancedChaseDrainScheduler::determine_role(tid, 8);
            assert_eq!(
                BalancedChaseDrainScheduler::is_compress_preferred(tid, 8, role),
                expected,
                "thread {tid} of 8"
            );
        }
    }

    #[test]
    fn test_compress_preferred_priorities() {
        // Thread 2 of 8 is compress-preferred.
        let mut scheduler = BalancedChaseDrainScheduler::new(2, 8, all());
        assert!(scheduler.compress_preferred);

        let priorities = scheduler.get_priorities(BackpressureState::default());
        let (compress, serialize) =
            positions(priorities, PipelineStep::Compress, PipelineStep::Serialize);
        assert!(
            compress < serialize,
            "Compress-preferred thread should try Compress before Serialize"
        );
    }

    #[test]
    fn test_compress_preferred_drain_mode() {
        // In drain mode Compress comes first regardless of affinity.
        let mut scheduler = BalancedChaseDrainScheduler::new(2, 8, all());
        scheduler.drain_mode = true;

        let priorities = scheduler.get_priorities(output_high_bp());
        let (compress, serialize) =
            positions(priorities, PipelineStep::Compress, PipelineStep::Serialize);
        assert!(compress < serialize);
    }

    #[test]
    fn test_memory_high_prioritizes_process() {
        let mut scheduler = BalancedChaseDrainScheduler::new(3, 8, all());

        // Memory high: Process should be pulled forward to drain q4_groups.
        let bp = BackpressureState {
            output_high: false,
            input_low: false,
            read_done: false,
            memory_high: true,
            memory_drained: false,
        };
        let priorities = scheduler.get_priorities(bp);
        let process = priorities
            .iter()
            .position(|&s| s == PipelineStep::Process)
            .expect("process position should be Some");
        assert!(
            process <= 2,
            "Process should be prioritized when memory is high"
        );
    }
}