fgumi 0.2.0

High-performance tools for UMI-tagged sequencing data: extraction, grouping, and consensus calling
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
//! Optimized Chase-Bottleneck scheduler.
//!
//! An enhanced version of chase-bottleneck with optimizations based on profiling:
//!
//! 1. **Compress-biased prioritization**: Compress is consistently the bottleneck
//!    (~40% of work time), so it's always in the top priorities for parallel threads.
//!
//! 2. **Exclusive step avoidance**: Non-specialized threads deprioritize exclusive
//!    steps (Read, `FindBoundaries`, Group, Write) to reduce contention.
//!
//! 3. **Bottleneck stickiness**: Extra sticky on Compress/Serialize - don't wander
//!    away from these high-value steps even on temporary failures.
//!
//! 4. **Contention-aware backoff**: After failing due to contention on an exclusive
//!    step, temporarily avoid it to let the specialized thread work.

use super::{BackpressureState, Direction, Scheduler};
use crate::unified_pipeline::base::{ActiveSteps, PipelineStep};

/// Optimized chase-bottleneck scheduler.
pub struct OptimizedChaseScheduler {
    /// Thread ID.
    thread_id: usize,
    /// Total number of threads.
    #[allow(dead_code)]
    num_threads: usize,
    /// Current step this thread is focused on.
    current_step: PipelineStep,
    /// Current direction of movement.
    direction: Direction,
    /// Priority buffer for returning all steps in adaptive order.
    priority_buffer: [PipelineStep; 9],
    /// Consecutive successes on bottleneck steps (for extra stickiness).
    bottleneck_streak: u8,
    /// Backoff counter for exclusive steps after contention.
    exclusive_backoff: u8,
    /// Whether this thread is specialized for an exclusive step.
    is_exclusive_specialist: bool,
    /// The exclusive step this thread specializes in (if any).
    exclusive_specialty: Option<PipelineStep>,
    /// Active steps in the pipeline.
    active_steps: ActiveSteps,
}

impl OptimizedChaseScheduler {
    /// Exclusive steps that only one thread can execute at a time.
    const EXCLUSIVE_STEPS: [PipelineStep; 4] = [
        PipelineStep::Read,
        PipelineStep::FindBoundaries,
        PipelineStep::Group,
        PipelineStep::Write,
    ];

    /// Bottleneck steps that deserve extra stickiness.
    const BOTTLENECK_STEPS: [PipelineStep; 2] = [PipelineStep::Compress, PipelineStep::Serialize];

    /// How many failures before leaving a bottleneck step.
    const BOTTLENECK_STICKY_THRESHOLD: u8 = 3;

    /// How many cycles to avoid exclusive steps after contention.
    const EXCLUSIVE_BACKOFF_CYCLES: u8 = 5;

    /// Create a new optimized chase scheduler.
    #[must_use]
    pub fn new(thread_id: usize, num_threads: usize, active_steps: ActiveSteps) -> Self {
        let (current_step, is_exclusive_specialist, exclusive_specialty) =
            Self::determine_role(thread_id, num_threads);

        Self {
            thread_id,
            num_threads,
            current_step,
            direction: Direction::Forward,
            priority_buffer: PipelineStep::all(),
            bottleneck_streak: 0,
            exclusive_backoff: 0,
            is_exclusive_specialist,
            exclusive_specialty,
            active_steps,
        }
    }

    /// Determine thread role and initial step.
    fn determine_role(
        thread_id: usize,
        num_threads: usize,
    ) -> (PipelineStep, bool, Option<PipelineStep>) {
        use PipelineStep::{Compress, FindBoundaries, Group, Read, Serialize, Write};

        if thread_id == 0 {
            // Thread 0: Reader specialist
            (Read, true, Some(Read))
        } else if thread_id == num_threads - 1 && num_threads > 1 {
            // Last thread: Writer specialist
            (Write, true, Some(Write))
        } else if thread_id == 1 && num_threads > 2 {
            // Thread 1: Boundary specialist
            (FindBoundaries, true, Some(FindBoundaries))
        } else if thread_id == num_threads - 2 && num_threads > 3 {
            // Second-to-last: Group specialist
            (Group, true, Some(Group))
        } else {
            // Middle threads: Start on bottleneck steps (Compress/Serialize)
            // Spread them out to maximize parallelism
            let bottleneck_idx = (thread_id - 2) % 2;
            let step = if bottleneck_idx == 0 { Compress } else { Serialize };
            (step, false, None)
        }
    }

    /// Check if a step is exclusive.
    fn is_exclusive(step: PipelineStep) -> bool {
        Self::EXCLUSIVE_STEPS.contains(&step)
    }

    /// Check if a step is a bottleneck.
    fn is_bottleneck(step: PipelineStep) -> bool {
        Self::BOTTLENECK_STEPS.contains(&step)
    }

    /// Build optimized priority list.
    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap, clippy::cast_sign_loss)]
    fn build_priorities(&mut self, _bp: BackpressureState) {
        let mut priorities = Vec::with_capacity(9);
        let current_idx = self.current_step.index();

        // 1. Current step first (if not backing off from exclusive)
        let skip_current = Self::is_exclusive(self.current_step)
            && !self.is_exclusive_specialist
            && self.exclusive_backoff > 0;

        if !skip_current {
            priorities.push(self.current_step);
        }

        // 2. For exclusive specialists, always include their specialty second
        if let Some(specialty) = self.exclusive_specialty {
            if specialty != self.current_step {
                priorities.push(specialty);
            }
        }

        // 3. Bottleneck boost: Compress and Serialize always in top priorities
        //    for non-exclusive-specialist threads
        if !self.is_exclusive_specialist {
            for &step in &Self::BOTTLENECK_STEPS {
                if !priorities.contains(&step) {
                    priorities.push(step);
                }
            }
        }

        // 4. Build remaining priorities based on direction
        let (first_dir, second_dir): (i32, i32) = match self.direction {
            Direction::Forward => (1, -1),  // downstream first
            Direction::Backward => (-1, 1), // upstream first
        };

        // Expand from current position
        for distance in 1..=8 {
            // Primary direction
            let idx1 = current_idx as i32 + first_dir * distance;
            if (0..9).contains(&idx1) {
                let step = PipelineStep::all()[idx1 as usize];
                if !priorities.contains(&step) && self.should_include_step(step) {
                    priorities.push(step);
                }
            }

            // Secondary direction
            let idx2 = current_idx as i32 + second_dir * distance;
            if (0..9).contains(&idx2) {
                let step = PipelineStep::all()[idx2 as usize];
                if !priorities.contains(&step) && self.should_include_step(step) {
                    priorities.push(step);
                }
            }
        }

        // 5. Fill any remaining steps (shouldn't happen, but safety)
        for &step in &PipelineStep::all() {
            if !priorities.contains(&step) {
                priorities.push(step);
            }
        }

        // Copy to buffer
        for (i, &step) in priorities.iter().take(9).enumerate() {
            self.priority_buffer[i] = step;
        }
    }

    /// Check if this thread should include a step in its priorities.
    fn should_include_step(&self, step: PipelineStep) -> bool {
        if !Self::is_exclusive(step) {
            // Always include parallel steps
            return true;
        }

        // For exclusive steps:
        if self.is_exclusive_specialist {
            // Specialists include all exclusive steps (they might help others)
            true
        } else {
            // Non-specialists only include exclusive steps if not backing off
            self.exclusive_backoff == 0
        }
    }
}

impl Scheduler for OptimizedChaseScheduler {
    fn get_priorities(&mut self, bp: BackpressureState) -> &[PipelineStep] {
        // Decrement backoff counter
        if self.exclusive_backoff > 0 {
            self.exclusive_backoff -= 1;
        }

        // Apply global backpressure hints
        if bp.output_high {
            self.direction = Direction::Forward;
        } else if bp.input_low && !bp.read_done {
            self.direction = Direction::Backward;
        }

        self.build_priorities(bp);
        let n = self.active_steps.filter_in_place(&mut self.priority_buffer);
        &self.priority_buffer[..n]
    }

    fn record_outcome(&mut self, step: PipelineStep, success: bool, was_contention: bool) {
        if success {
            // Success: stay sticky
            self.current_step = step;

            // Track bottleneck streak
            if Self::is_bottleneck(step) {
                self.bottleneck_streak = self.bottleneck_streak.saturating_add(1);
            } else {
                self.bottleneck_streak = 0;
            }

            // Reset backoff on any success
            self.exclusive_backoff = 0;
        } else {
            // Failure handling

            // Contention on exclusive step: back off if not specialist
            if was_contention && Self::is_exclusive(step) && !self.is_exclusive_specialist {
                self.exclusive_backoff = Self::EXCLUSIVE_BACKOFF_CYCLES;
            }

            // Extra stickiness for bottleneck steps
            if Self::is_bottleneck(self.current_step)
                && self.bottleneck_streak < Self::BOTTLENECK_STICKY_THRESHOLD
            {
                // Stay on bottleneck step despite failure
                self.bottleneck_streak = self.bottleneck_streak.saturating_add(1);
                return;
            }

            // Normal movement: advance in current direction
            let idx = self.current_step.index();
            self.current_step = match self.direction {
                Direction::Forward => {
                    if idx < 8 {
                        PipelineStep::all()[idx + 1]
                    } else {
                        // Wrap to first parallel step, not Read
                        PipelineStep::Decompress
                    }
                }
                Direction::Backward => {
                    if idx > 1 {
                        PipelineStep::all()[idx - 1]
                    } else {
                        // Wrap to last parallel step, not Write
                        PipelineStep::Compress
                    }
                }
            };

            // Reset bottleneck streak when moving
            self.bottleneck_streak = 0;
        }
    }

    fn thread_id(&self) -> usize {
        self.thread_id
    }

    fn num_threads(&self) -> usize {
        self.num_threads
    }

    fn active_steps(&self) -> &ActiveSteps {
        &self.active_steps
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_thread_0_is_reader_specialist() {
        let scheduler = OptimizedChaseScheduler::new(0, 8, ActiveSteps::all());
        assert_eq!(scheduler.current_step, PipelineStep::Read);
        assert!(scheduler.is_exclusive_specialist);
        assert_eq!(scheduler.exclusive_specialty, Some(PipelineStep::Read));
    }

    #[test]
    fn test_thread_7_is_writer_specialist() {
        let scheduler = OptimizedChaseScheduler::new(7, 8, ActiveSteps::all());
        assert_eq!(scheduler.current_step, PipelineStep::Write);
        assert!(scheduler.is_exclusive_specialist);
        assert_eq!(scheduler.exclusive_specialty, Some(PipelineStep::Write));
    }

    #[test]
    fn test_thread_1_is_boundary_specialist() {
        let scheduler = OptimizedChaseScheduler::new(1, 8, ActiveSteps::all());
        assert_eq!(scheduler.current_step, PipelineStep::FindBoundaries);
        assert!(scheduler.is_exclusive_specialist);
    }

    #[test]
    fn test_thread_6_is_group_specialist() {
        let scheduler = OptimizedChaseScheduler::new(6, 8, ActiveSteps::all());
        assert_eq!(scheduler.current_step, PipelineStep::Group);
        assert!(scheduler.is_exclusive_specialist);
    }

    #[test]
    fn test_middle_threads_start_on_bottleneck() {
        let scheduler = OptimizedChaseScheduler::new(2, 8, ActiveSteps::all());
        assert!(
            scheduler.current_step == PipelineStep::Compress
                || scheduler.current_step == PipelineStep::Serialize
        );
        assert!(!scheduler.is_exclusive_specialist);
    }

    #[test]
    fn test_bottleneck_steps_in_priorities() {
        let mut scheduler = OptimizedChaseScheduler::new(3, 8, ActiveSteps::all());
        let bp = BackpressureState::default();
        let priorities = scheduler.get_priorities(bp);

        // Compress and Serialize should be in top 4 for non-specialist
        let compress_pos = priorities.iter().position(|&s| s == PipelineStep::Compress);
        let serialize_pos = priorities.iter().position(|&s| s == PipelineStep::Serialize);

        assert!(compress_pos.expect("compress position should be Some") < 4);
        assert!(serialize_pos.expect("serialize position should be Some") < 4);
    }

    #[test]
    fn test_exclusive_backoff_after_contention() {
        let mut scheduler = OptimizedChaseScheduler::new(3, 8, ActiveSteps::all()); // Non-specialist
        scheduler.current_step = PipelineStep::Group;

        // Fail with contention on exclusive step
        scheduler.record_outcome(PipelineStep::Group, false, true);

        assert_eq!(scheduler.exclusive_backoff, OptimizedChaseScheduler::EXCLUSIVE_BACKOFF_CYCLES);
    }

    #[test]
    fn test_specialist_no_backoff() {
        let mut scheduler = OptimizedChaseScheduler::new(0, 8, ActiveSteps::all()); // Reader specialist

        // Fail with contention on Read
        scheduler.record_outcome(PipelineStep::Read, false, true);

        // Specialist should not back off from their specialty
        assert_eq!(scheduler.exclusive_backoff, 0);
    }

    #[test]
    fn test_bottleneck_stickiness() {
        let mut scheduler = OptimizedChaseScheduler::new(3, 8, ActiveSteps::all());
        scheduler.current_step = PipelineStep::Compress;

        // First failure: stay sticky
        scheduler.record_outcome(PipelineStep::Compress, false, false);
        assert_eq!(scheduler.current_step, PipelineStep::Compress);

        // Second failure: still sticky
        scheduler.record_outcome(PipelineStep::Compress, false, false);
        assert_eq!(scheduler.current_step, PipelineStep::Compress);

        // Third failure: still sticky (at threshold)
        scheduler.record_outcome(PipelineStep::Compress, false, false);
        assert_eq!(scheduler.current_step, PipelineStep::Compress);

        // Fourth failure: now move
        scheduler.record_outcome(PipelineStep::Compress, false, false);
        assert_ne!(scheduler.current_step, PipelineStep::Compress);
    }

    #[test]
    fn test_wrap_to_parallel_steps() {
        let mut scheduler = OptimizedChaseScheduler::new(3, 8, ActiveSteps::all());
        scheduler.current_step = PipelineStep::Write;
        scheduler.direction = Direction::Forward;
        scheduler.bottleneck_streak = 10; // Past threshold

        scheduler.record_outcome(PipelineStep::Write, false, false);

        // Should wrap to Decompress, not Read
        assert_eq!(scheduler.current_step, PipelineStep::Decompress);
    }

    #[test]
    fn test_priorities_returns_all_steps() {
        let mut scheduler = OptimizedChaseScheduler::new(3, 8, ActiveSteps::all());
        let bp = BackpressureState::default();
        let priorities = scheduler.get_priorities(bp);
        assert_eq!(priorities.len(), 9);
    }
}