//! Unified thread pool pipeline for `--threads N` mode.
//!
//! This module provides a true unified thread pool where all N threads can perform
//! any type of work: reading, processing, or writing. This ensures `--threads N`
//! is a strict thread cap with no separate I/O thread pools.
//!
//! # Architecture
//!
//! ```text
//! ┌────────────────────────────────────────────────────────────────┐
//! │                UNIFIED THREAD POOL (N threads)                 │
//! │                                                                │
//! │  All threads can do ANY work: READ → PROCESS → WRITE           │
//! │                                                                │
//! │  Priority scheduling based on queue depths:                    │
//! │    - Input queue low   → prioritize reading                    │
//! │    - Output queue high → prioritize writing                    │
//! │    - Otherwise         → prioritize processing                 │
//! │                                                                │
//! │  File access via parking_lot::Mutex with try_lock()            │
//! │  Lock-free queues via crossbeam ArrayQueue                     │
//! └────────────────────────────────────────────────────────────────┘
//! ```
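//!
//! The queue-depth heuristic above can be sketched roughly as follows;
//! the names and thresholds here are illustrative, not this crate's API:
//!
//! ```rust
//! #[derive(Debug, PartialEq)]
//! enum Work { Read, Process, Write }
//!
//! /// Pick the next kind of work from the current queue depths.
//! fn pick_work(in_len: usize, in_cap: usize, out_len: usize, out_cap: usize) -> Work {
//!     if in_len < in_cap / 4 {
//!         Work::Read // input queue low: keep the pipeline fed
//!     } else if out_len > (3 * out_cap) / 4 {
//!         Work::Write // output queue high: drain before it backs up
//!     } else {
//!         Work::Process // otherwise: make forward progress
//!     }
//! }
//!
//! fn main() {
//!     assert_eq!(pick_work(1, 64, 10, 64), Work::Read);
//!     assert_eq!(pick_work(32, 64, 60, 64), Work::Write);
//!     assert_eq!(pick_work(32, 64, 10, 64), Work::Process);
//!     println!("ok");
//! }
//! ```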
//!
//! # Pipeline Steps
//!
//! Both BAM and FASTQ pipelines use a 9-step structure:
//!
//! ```text
//! Read → Decompress → FindBoundaries → Parse → Group → Process → Serialize → Compress → Write
//! [seq]  [par]        [seq]            [par]   [seq]   [par]     [par]       [par]      [seq]
//! ```
//!
//! - **Read**: Read raw bytes from input (sequential, requires file lock)
//! - **Decompress**: Decompress BGZF blocks (parallel)
//! - **`FindBoundaries`**: Find record boundaries in decompressed data (sequential)
//! - **Parse**: Construct record objects from boundaries (parallel - key optimization!)
//! - **Group**: Group records by position/UMI/template (sequential)
//! - **Process**: Apply domain-specific processing (parallel)
//! - **Serialize**: Convert output to bytes (parallel)
//! - **Compress**: Compress to BGZF blocks (parallel)
//! - **Write**: Write to output file (sequential, requires file lock)
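//!
//! The step order and its sequential/parallel split can be captured in a
//! small sketch (an illustrative enum, not this crate's real types):
//!
//! ```rust
//! #[derive(Debug, Clone, Copy, PartialEq)]
//! enum Step {
//!     Read, Decompress, FindBoundaries, Parse, Group,
//!     Process, Serialize, Compress, Write,
//! }
//!
//! impl Step {
//!     /// Steps that several workers may run at once; the rest are
//!     /// sequential because they need the file lock or global order.
//!     fn is_parallel(self) -> bool {
//!         !matches!(
//!             self,
//!             Step::Read | Step::FindBoundaries | Step::Group | Step::Write
//!         )
//!     }
//! }
//!
//! fn main() {
//!     let order = [
//!         Step::Read, Step::Decompress, Step::FindBoundaries, Step::Parse,
//!         Step::Group, Step::Process, Step::Serialize, Step::Compress, Step::Write,
//!     ];
//!     let parallel: Vec<bool> = order.iter().map(|s| s.is_parallel()).collect();
//!     assert_eq!(parallel, [false, true, false, true, false, true, true, true, false]);
//!     println!("ok");
//! }
//! ```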
//!
//! # FASTQ Parallel Parse Optimization
//!
//! The FASTQ pipeline includes a parallel Parse step that can improve
//! thread scaling, especially when going from 4 to 8 threads. It was added
//! after profiling showed that 99.6% of the Group step's time was spent
//! parsing FASTQ records.
//!
//! When `FastqPipelineConfig::use_parallel_parse` is true:
//! - `FindBoundaries` step scans for FASTQ record boundaries (fast, O(N) scan)
//! - Parse step constructs `FastqRecord` objects in parallel
//! - Group step receives pre-parsed records (no parsing under lock!)
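//!
//! As a toy illustration of the `FindBoundaries`/Parse split, a single O(N)
//! pass can record where each 4-line FASTQ record starts, so parsing can
//! later proceed in parallel. This is a hypothetical helper, not the real
//! implementation; handling of records split across chunks is omitted:
//!
//! ```rust
//! /// Record the byte offset where each 4-line FASTQ record begins.
//! fn find_record_starts(buf: &[u8]) -> Vec<usize> {
//!     let mut starts = vec![0];
//!     let mut newlines = 0;
//!     for (i, &b) in buf.iter().enumerate() {
//!         if b == b'\n' {
//!             newlines += 1;
//!             // The next record begins right after every 4th newline.
//!             if newlines % 4 == 0 && i + 1 < buf.len() {
//!                 starts.push(i + 1);
//!             }
//!         }
//!     }
//!     starts
//! }
//!
//! fn main() {
//!     let data = b"@r1\nACGT\n+\nIIII\n@r2\nTTTT\n+\nIIII\n";
//!     let starts = find_record_starts(data);
//!     assert_eq!(starts, vec![0, 16]);
//!     println!("{:?}", starts);
//! }
//! ```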
//!
//! # Module Structure
//!
//! - `base`: Core infrastructure, traits, and shared types
//! - `bam`: BAM pipeline implementation
//! - `fastq`: FASTQ pipeline with multi-stream grouping and parallel Parse
//! - `scheduler`: Thread scheduling strategies
//! - `deadlock`: Deadlock detection and recovery
//! - `queue`: Queue implementations
//! - `rebalancer`: Dynamic memory rebalancing
//!
//! # Parallel Ordered Batch Processing Pattern
//!
//! Both BAM and FASTQ pipelines use a common pattern for steps that require
//! ordered output but can do work in parallel:
//!
//! 1. **Per-worker held state**: Each worker holds its result if output queue is full
//! 2. **Brief reorder lock**: Lock held only for insert/pop, not during work
//! 3. **Work outside lock**: Actual processing happens without holding locks
//! 4. **Priority advancement**: Always try to push held items first
//!
//! This pattern is implemented in:
//! - `bam.rs`: `try_step_find_boundaries()` with `WorkerState.held_boundaries`
//! - `fastq.rs`: `fastq_try_step_find_boundaries()` with `FastqWorkerState.held_boundaries`
//!
//! When modifying either implementation, ensure the pattern stays in sync.
//! The `HasHeldBoundaries` trait in `base.rs` documents the interface.
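//!
//! A minimal sketch of the pattern, with invented names: the reorder lock
//! is held only to insert/drain, and a full output queue hands the result
//! back as per-worker held state instead of blocking:
//!
//! ```rust
//! use std::collections::BTreeMap;
//! use std::sync::Mutex;
//!
//! struct Reorder {
//!     next_seq: u64,
//!     pending: BTreeMap<u64, String>,
//! }
//!
//! /// Try to emit `item` in sequence order; hand it back as "held" state
//! /// instead of blocking when the output queue is full.
//! fn try_emit(
//!     reorder: &Mutex<Reorder>,
//!     out: &Mutex<Vec<String>>,
//!     out_cap: usize,
//!     seq: u64,
//!     item: String,
//! ) -> Option<(u64, String)> {
//!     let mut o = out.lock().unwrap();
//!     if o.len() >= out_cap {
//!         return Some((seq, item)); // output full: hold, don't block
//!     }
//!     let mut r = reorder.lock().unwrap();
//!     r.pending.insert(seq, item);
//!     // Drain every batch that is now in order.
//!     loop {
//!         let wanted = r.next_seq;
//!         match r.pending.remove(&wanted) {
//!             Some(v) => { o.push(v); r.next_seq += 1; }
//!             None => break,
//!         }
//!     }
//!     None // locks drop here; the actual "work" happened before this call
//! }
//!
//! fn main() {
//!     let reorder = Mutex::new(Reorder { next_seq: 0, pending: BTreeMap::new() });
//!     let out = Mutex::new(Vec::new());
//!     // Batch 1 arrives before batch 0 and is buffered until 0 shows up.
//!     assert!(try_emit(&reorder, &out, 8, 1, "b1".into()).is_none());
//!     assert!(try_emit(&reorder, &out, 8, 0, "b0".into()).is_none());
//!     assert_eq!(*out.lock().unwrap(), ["b0", "b1"]);
//!     // A full output queue hands the item back instead of blocking.
//!     assert!(try_emit(&reorder, &out, 2, 2, "b2".into()).is_some());
//!     println!("ok");
//! }
//! ```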
//!
//! # Adding a New Pipeline Type
//!
//! To add a new pipeline (e.g., for a new input format):
//!
//! 1. **Define your pipeline state struct** implementing:
//! - [`PipelineLifecycle`] — completion, error, drain mode, validation
//! - [`MonitorableState`] — if using the shared monitor loop
//! - [`OutputPipelineState`] — if writing BAM/BGZF output
//! - [`ProcessPipelineState`] — for the process step
//! - [`SerializePipelineState`] — for the serialize step
//! - [`WritePipelineState`] — for the write step
//!
//! 2. **Define your worker state struct** implementing:
//! - [`WorkerStateCommon`] + [`HasWorkerCore`] — required for all workers
//! - `HasHeld*` traits — one per non-blocking step your pipeline uses
//! - [`HasCompressor`] + [`HasRecycledBuffers`] — if writing compressed output
//!
//! 3. **Implement [`StepContext`]** to plug into `generic_worker_loop`, or write
//! a custom worker loop.
//!
//! 4. **Reuse shared step functions** (`shared_try_step_compress`, etc.) where
//! possible — they handle non-blocking held-item logic correctly.
//!
//! See `bam.rs` and `fastq.rs` for complete examples of this pattern.
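//!
//! The wiring can be visualized with stand-in traits; these one-method
//! stubs are invented for illustration only, and the real traits in
//! `base.rs` have different, richer signatures:
//!
//! ```rust
//! // Invented stand-ins, purely to show the shape of the wiring.
//! trait PipelineLifecycle { fn is_complete(&self) -> bool; }
//! trait WorkerStateCommon { fn worker_id(&self) -> usize; }
//!
//! struct MyPipelineState { done: bool }
//! impl PipelineLifecycle for MyPipelineState {
//!     fn is_complete(&self) -> bool { self.done }
//! }
//!
//! struct MyWorkerState { id: usize }
//! impl WorkerStateCommon for MyWorkerState {
//!     fn worker_id(&self) -> usize { self.id }
//! }
//!
//! fn main() {
//!     let state = MyPipelineState { done: false };
//!     let worker = MyWorkerState { id: 0 };
//!     assert!(!state.is_complete());
//!     assert_eq!(worker.worker_id(), 0);
//!     println!("ok");
//! }
//! ```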
// Re-export everything from base
pub use base::*;
// Re-export everything from bam
pub use bam::*;
// Re-export everything from fastq
pub use fastq::*;
// Re-export queue types
pub use queue::*;
// Re-export rebalancer types
pub use rebalancer::*;
// Re-export scheduler types
pub use scheduler::*;
// Re-export deadlock detection types
pub use deadlock::*;