Skip to main content

fgumi_lib/unified_pipeline/
base.rs

1//! Core infrastructure, traits, and shared types for the unified pipeline.
2//!
3//! This module contains:
4//! - Batch buffer types (`RawBlockBatch`, `CompressedBlockBatch`, etc.)
5//! - BGZF compression types
6//! - `PipelineConfig` and `PipelineStats`
7//! - `OutputPipelineState` trait for shared step functions
8//! - `HasCompressor` trait
9//! - `BatchWeight` trait for template-based batching
10//! - Shared step functions (`shared_try_step_compress`, `shared_try_step_write`)
11
12use crossbeam_queue::ArrayQueue;
13use log::info;
14use parking_lot::Mutex;
15use std::io::{self, Read, Write};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use crate::progress::ProgressTracker;
22
23use super::deadlock::{DeadlockState, QueueSnapshot, check_deadlock_and_restore};
24
25use crate::bgzf_reader::{RawBgzfBlock, decompress_block_into, read_raw_blocks};
26use crate::read_info::LibraryIndex;
27use crate::reorder_buffer::ReorderBuffer;
28use noodles::sam::alignment::RecordBuf;
29use noodles::sam::alignment::record::data::field::Tag;
30
31use super::scheduler::{BackpressureState, Scheduler, SchedulerStrategy, create_scheduler};
32
33// ============================================================================
34// Memory Breakdown Structure
35// ============================================================================
36
/// Point-in-time memory report used for debugging.
///
/// Every value is an `f64`; the suffix of each field name (`_gb` or `_mb`)
/// gives its unit.
#[cfg(feature = "memory-debug")]
#[derive(Clone, Debug)]
pub struct MemoryBreakdown {
    /// Resident set size reported by the system, in GB.
    pub system_rss_gb: f64,
    /// Sum of all tracked memory, in GB.
    pub tracked_total_gb: f64,
    /// Memory that tracking does not account for (allocator overhead, etc.), in GB.
    pub untracked_gb: f64,

    // ---- Queue memory ----
    /// Q1 memory, in MB.
    pub q1_mb: f64,
    /// Q2 memory, in MB.
    pub q2_mb: f64,
    /// Q3 memory, in MB.
    pub q3_mb: f64,
    /// Q4 memory, in GB.
    pub q4_gb: f64,
    /// Q5 memory, in GB.
    pub q5_gb: f64,
    /// Q6 memory, in MB.
    pub q6_mb: f64,
    /// Q7 memory, in MB.
    pub q7_mb: f64,

    // ---- Processing memory ----
    /// Position-group memory, in GB.
    pub position_groups_gb: f64,
    /// Template memory, in GB.
    pub templates_gb: f64,
    /// Reorder-buffer memory, in MB.
    pub reorder_buffers_mb: f64,
    /// Grouper memory, in MB.
    pub grouper_mb: f64,
    /// Worker-local memory, in MB.
    pub worker_local_mb: f64,

    // ---- Infrastructure memory ----
    /// Decompressor memory, in MB.
    pub decompressors_mb: f64,
    /// Compressor memory, in MB.
    pub compressors_mb: f64,
    /// Worker buffer memory, in MB.
    pub worker_buffers_mb: f64,
    /// I/O buffer memory, in MB.
    pub io_buffers_mb: f64,
    /// Thread stack memory, in MB.
    pub thread_stacks_mb: f64,
    /// Queue capacity (pre-allocation) memory, in MB.
    pub queue_capacity_mb: f64,
    /// Total of the infrastructure fields above, in GB.
    pub infrastructure_gb: f64,
}
74
/// Byte counters backing the `memory-debug` diagnostics.
///
/// The counters live in their own struct so that `PipelineStats` stays
/// readable. Each field is an [`AtomicU64`] holding a size in bytes, which
/// lets any thread update it without taking a lock.
#[cfg(feature = "memory-debug")]
#[derive(Debug)]
pub struct MemoryDebugStats {
    // ---- Per-queue counters ----
    /// Bytes held in Q1 (raw BGZF blocks).
    pub q1_memory_bytes: AtomicU64,
    /// Bytes held in Q2 (decompressed blocks).
    pub q2_memory_bytes: AtomicU64,
    /// Bytes held in Q3 (decoded records).
    pub q3_memory_bytes: AtomicU64,
    /// Bytes held in Q4 (position groups) - likely the largest consumer.
    pub q4_memory_bytes: AtomicU64,
    /// Bytes held in Q5 (processed groups).
    pub q5_memory_bytes: AtomicU64,
    /// Bytes held in Q6 (serialized data).
    pub q6_memory_bytes: AtomicU64,
    /// Bytes held in Q7 (compressed output blocks).
    pub q7_memory_bytes: AtomicU64,

    // ---- Processing counters ----
    /// Bytes held by position groups while they are being processed.
    pub position_group_processing_bytes: AtomicU64,
    /// Bytes held by templates during operations.
    pub template_processing_bytes: AtomicU64,
    /// Bytes held in reorder buffers.
    pub reorder_buffer_bytes: AtomicU64,
    /// Bytes held by grouper state.
    pub grouper_memory_bytes: AtomicU64,
    /// Bytes held by worker-local allocations.
    pub worker_local_memory_bytes: AtomicU64,

    // ---- Infrastructure counters (known constants, set once at pipeline startup) ----
    /// Bytes used by the per-thread decompressor instances (libdeflater).
    pub decompressor_memory_bytes: AtomicU64,
    /// Bytes used by the per-thread compressor instances (InlineBgzfCompressor).
    pub compressor_memory_bytes: AtomicU64,
    /// Bytes used by the per-thread serialization + decompression buffers.
    pub worker_buffer_memory_bytes: AtomicU64,
    /// Bytes used by I/O buffers (BufReader + BufWriter).
    pub io_buffer_memory_bytes: AtomicU64,
    /// Bytes used by thread stacks (2MB per thread default).
    pub thread_stack_memory_bytes: AtomicU64,
    /// Bytes of ArrayQueue pre-allocation overhead.
    pub queue_capacity_memory_bytes: AtomicU64,

    // ---- System counter ----
    /// Actual system RSS in bytes (from /proc/self/status or sysinfo).
    pub system_rss_bytes: AtomicU64,
}

#[cfg(feature = "memory-debug")]
impl MemoryDebugStats {
    /// Build a collector in which every counter starts at zero.
    #[must_use]
    pub fn new() -> Self {
        // One place that defines what a "fresh" counter looks like.
        let zero = || AtomicU64::new(0);

        Self {
            // Queues
            q1_memory_bytes: zero(),
            q2_memory_bytes: zero(),
            q3_memory_bytes: zero(),
            q4_memory_bytes: zero(),
            q5_memory_bytes: zero(),
            q6_memory_bytes: zero(),
            q7_memory_bytes: zero(),
            // Processing
            position_group_processing_bytes: zero(),
            template_processing_bytes: zero(),
            reorder_buffer_bytes: zero(),
            grouper_memory_bytes: zero(),
            worker_local_memory_bytes: zero(),
            // Infrastructure
            decompressor_memory_bytes: zero(),
            compressor_memory_bytes: zero(),
            worker_buffer_memory_bytes: zero(),
            io_buffer_memory_bytes: zero(),
            thread_stack_memory_bytes: zero(),
            queue_capacity_memory_bytes: zero(),
            // System
            system_rss_bytes: zero(),
        }
    }
}
155
156// ============================================================================
157// Thread-Local Memory Tracking
158// ============================================================================
159
/// Marker stored in `THREAD_ID` while the thread has not been given an ID yet.
#[cfg(feature = "memory-debug")]
const THREAD_ID_UNSET: usize = usize::MAX;

#[cfg(feature = "memory-debug")]
thread_local! {
    // Per-thread cache of the assigned ID; starts out as the "unset" marker.
    static THREAD_ID: std::cell::Cell<usize> = std::cell::Cell::new(THREAD_ID_UNSET);
}

/// Return this thread's memory-tracking ID, assigning one on first use.
///
/// IDs fall in the range `0..MAX_THREADS`. Once more than `MAX_THREADS`
/// distinct threads have called this function the IDs wrap around, so
/// later threads share an ID (and therefore per-thread data) with earlier ones.
#[cfg(feature = "memory-debug")]
pub fn get_or_assign_thread_id() -> usize {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Process-wide source of fresh IDs, shared by all threads.
    static NEXT_SLOT: AtomicUsize = AtomicUsize::new(0);

    THREAD_ID.with(|slot| match slot.get() {
        THREAD_ID_UNSET => {
            // First call on this thread: claim the next ID and cache it.
            let assigned = NEXT_SLOT.fetch_add(1, Ordering::Relaxed) % MAX_THREADS;
            slot.set(assigned);
            assigned
        }
        assigned => assigned,
    })
}
188
189// ============================================================================
190// System Memory Utilities
191// ============================================================================
192
/// Return the resident set size (RSS) of the current process in bytes.
///
/// Sources, tried in order:
/// 1. The `VmRSS:` line of `/proc/self/status` (value in kB, scaled by 1024).
/// 2. The `sysinfo` crate, through a lazily created `System` instance that is
///    cached for later calls.
///
/// If `/proc/self/status` can be read but holds no parsable `VmRSS:` value,
/// the lookup falls through to source 2 rather than giving up.
///
/// Returns `None` when neither source yields a value.
#[cfg(feature = "memory-debug")]
pub fn get_process_rss_bytes() -> Option<u64> {
    use std::sync::Mutex;
    use sysinfo::{ProcessRefreshKind, RefreshKind, System};

    /// Extract the `VmRSS:` value from `/proc/self/status` text and convert kB to bytes.
    fn parse_vm_rss_bytes(status: &str) -> Option<u64> {
        status
            .lines()
            .find(|line| line.starts_with("VmRSS:"))?
            .split_whitespace()
            .nth(1)?
            .parse::<u64>()
            .ok()?
            .checked_mul(1024)
    }

    static RSS_SYSTEM: std::sync::OnceLock<Mutex<System>> = std::sync::OnceLock::new();

    // Try Linux-style /proc/self/status first (most accurate). A read or parse
    // failure is not fatal: it only means the fallback below is used instead.
    let from_proc = std::fs::read_to_string("/proc/self/status")
        .ok()
        .as_deref()
        .and_then(parse_vm_rss_bytes);
    if from_proc.is_some() {
        return from_proc;
    }

    // Cross-platform fallback using sysinfo (cached instance).
    let sys = RSS_SYSTEM.get_or_init(|| {
        Mutex::new(System::new_with_specifics(
            RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing().with_memory()),
        ))
    });

    // A poisoned lock is reported as "no value" instead of panicking.
    let mut sys_guard = sys.lock().ok()?;
    sys_guard.refresh_processes_specifics(
        sysinfo::ProcessesToUpdate::All,
        false,
        ProcessRefreshKind::nothing().with_memory(),
    );

    // Look up the current process in the refreshed table.
    let pid = sysinfo::get_current_pid().ok()?;
    let process = sys_guard.process(pid)?;
    Some(process.memory()) // sysinfo returns bytes
}
232
/// Refresh the RSS reading and log a one-line memory summary.
///
/// When an RSS value is available the summary also shows tracked memory as a
/// percentage of RSS, and a second line is logged if more than 1 GB is
/// untracked. Without an RSS value only the tracked figures are logged.
#[cfg(feature = "memory-debug")]
pub fn log_comprehensive_memory_stats(stats: &PipelineStats) {
    // Update RSS
    if let Some(rss) = get_process_rss_bytes() {
        stats.update_system_rss(rss);
    }

    // Queue memory integration deferred — reading actual queue sizes requires a pipeline
    // reference that is not available here.  Queue memory is tracked through estimates only.

    let breakdown = stats.get_memory_breakdown();
    // Convert GB to MB for display
    let infra_mb = breakdown.infrastructure_gb * 1e3;

    let rss_available = breakdown.system_rss_gb > 0.0;
    if !rss_available {
        // No RSS reading: report tracked memory only and stop here.
        log::info!(
            "MEMORY: Tracked={:.1}GB | Queue: Q1:{:.0}MB Q2:{:.0}MB Q3:{:.0}MB Q4:{:.1}GB Q5:{:.1}GB Q6:{:.0}MB Q7:{:.0}MB | Proc: Pos={:.1}GB Tmpl={:.1}GB | Infra={:.0}MB",
            breakdown.tracked_total_gb,
            breakdown.q1_mb,
            breakdown.q2_mb,
            breakdown.q3_mb,
            breakdown.q4_gb,
            breakdown.q5_gb,
            breakdown.q6_mb,
            breakdown.q7_mb,
            breakdown.position_groups_gb,
            breakdown.templates_gb,
            infra_mb,
        );
        return;
    }

    // Main memory line with RSS vs tracked accuracy
    let tracked_pct = (breakdown.tracked_total_gb / breakdown.system_rss_gb * 100.0) as u32;
    log::info!(
        "MEMORY: RSS={:.1}GB Tracked={:.1}GB ({}%) | Queue: Q1:{:.0}MB Q2:{:.0}MB Q3:{:.0}MB Q4:{:.1}GB Q5:{:.1}GB Q6:{:.0}MB Q7:{:.0}MB | Proc: Pos={:.1}GB Tmpl={:.1}GB | Infra={:.0}MB",
        breakdown.system_rss_gb,
        breakdown.tracked_total_gb,
        tracked_pct,
        breakdown.q1_mb,
        breakdown.q2_mb,
        breakdown.q3_mb,
        breakdown.q4_gb,
        breakdown.q5_gb,
        breakdown.q6_mb,
        breakdown.q7_mb,
        breakdown.position_groups_gb,
        breakdown.templates_gb,
        infra_mb,
    );

    // Untracked memory details, only worth a line once it exceeds 1 GB
    if breakdown.untracked_gb > 1.0 {
        let untracked_pct = ((breakdown.untracked_gb / breakdown.system_rss_gb) * 100.0) as u32;
        log::info!(
            "   Untracked: {:.1}GB ({}%) = allocator fragmentation + noodles internals",
            breakdown.untracked_gb,
            untracked_pct,
        );
    }
}
294
/// Spawn a background thread that logs memory statistics periodically.
///
/// The thread wakes every 100 ms, logs a report once `report_interval_secs`
/// have passed since the previous one, and exits after `shutdown_signal`
/// becomes `true`. The first time a report shows RSS lower than the previous
/// report, after a peak above 4 GB was seen, mimalloc's statistics are printed
/// once.
///
/// Returns the handle of the spawned thread.
#[cfg(feature = "memory-debug")]
pub fn start_memory_monitor(
    stats: Arc<PipelineStats>,
    shutdown_signal: Arc<AtomicBool>,
    report_interval_secs: u64,
) -> thread::JoinHandle<()> {
    /// How often the thread wakes to check the shutdown flag and the report timer.
    const POLL_INTERVAL: Duration = Duration::from_millis(100);
    /// Peak RSS (bytes) that must have been exceeded before allocator stats are printed.
    const ALLOCATOR_DUMP_MIN_PEAK_BYTES: u64 = 4_000_000_000;

    thread::spawn(move || {
        let report_interval = Duration::from_secs(report_interval_secs);
        let mut last_report = Instant::now();

        let mut previous_rss = 0_u64;
        let mut peak_rss = 0_u64;
        let mut allocator_stats_printed = false;

        while !shutdown_signal.load(Ordering::Relaxed) {
            if last_report.elapsed() >= report_interval {
                log_comprehensive_memory_stats(&stats);
                let current_rss = stats.memory.system_rss_bytes.load(Ordering::Relaxed);

                // Print mimalloc stats when RSS starts declining (just past peak).
                // Both readings must be non-zero, i.e. real measurements.
                let rss_declined = previous_rss > 0 && current_rss > 0 && current_rss < previous_rss;
                if rss_declined
                    && !allocator_stats_printed
                    && peak_rss > ALLOCATOR_DUMP_MIN_PEAK_BYTES
                {
                    log::info!("=== MIMALLOC STATS AT PEAK (no mi_collect) ===");
                    // SAFETY: `mi_stats_print_out(None, null_mut())` prints mimalloc allocator
                    // statistics to stderr using the default output handler. mimalloc internally
                    // synchronizes stats collection, making this safe to call concurrently with
                    // allocation/deallocation on other threads.
                    unsafe {
                        libmimalloc_sys::mi_stats_print_out(None, std::ptr::null_mut());
                    }
                    allocator_stats_printed = true;
                }

                // The peak is updated after the check, so the check above only
                // ever sees peaks from earlier reports.
                peak_rss = peak_rss.max(current_rss);
                previous_rss = current_rss;
                last_report = Instant::now();
            }
            thread::sleep(POLL_INTERVAL);
        }
    })
}
340
341// ============================================================================
342// Batch Weight Trait (for template-based batching)
343// ============================================================================
344
/// Implemented by groups that can say how much they "weigh" when batched.
///
/// A group's weight is typically its template count. Batching on summed
/// weight instead of on the number of groups keeps batch sizes more even
/// across datasets whose templates-per-group ratio varies.
///
/// # Example
///
/// A position group holding 10 templates reports `batch_weight() == 10`.
/// The pipeline keeps adding groups to a batch until the summed weight hits
/// a threshold (e.g., 500 templates) and then flushes the batch.
pub trait BatchWeight {
    /// Weight of this group for batching purposes.
    ///
    /// For position groups this is typically the number of templates.
    fn batch_weight(&self) -> usize;
}
362
363// ============================================================================
364// Memory Estimate Trait (for memory-based queue limits)
365// ============================================================================
366
/// Implemented by types that can estimate how much heap memory they own.
///
/// Memory-bounded queues use the estimate to account for the items they
/// hold. The figure covers heap allocations (Vec contents, String data,
/// etc.) and leaves out the size of the struct itself.
///
/// # Example
///
/// A batch that owns a `Vec<u8>` of 1000 bytes reports roughly 1000 from
/// `estimate_heap_size()`, which is the Vec's heap allocation.
pub trait MemoryEstimate {
    /// Estimated heap memory owned by this value, in bytes.
    ///
    /// Count:
    /// - Vec/String heap allocations (capacity, not just len)
    /// - heap allocations of nested structs
    ///
    /// Do NOT count:
    /// - the struct's own (stack) size
    /// - shared/reference-counted data per reference (it is counted once)
    fn estimate_heap_size(&self) -> usize;
}
389
390// ============================================================================
391// Memory Tracking for Queues
392// ============================================================================
393
394/// Tracks memory usage across pipeline queues.
395///
396/// This tracker uses atomic operations to enable lock-free memory accounting.
397/// It's designed to provide backpressure when queue memory exceeds a limit,
398/// preventing unbounded memory growth in threaded pipelines.
399///
400/// # Usage
401///
402/// ```ignore
403/// let tracker = MemoryTracker::new(1_000_000_000); // 1 GB limit
404///
405/// // Before pushing to a queue
406/// let item_size = item.estimate_heap_size();
407/// if tracker.try_add(item_size) {
408///     queue.push(item);
409/// } else {
410///     // Queue is at memory limit - apply backpressure
411/// }
412///
413/// // After popping from a queue
414/// let item = queue.pop();
415/// let item_size = item.estimate_heap_size();
416/// tracker.remove(item_size);
417/// ```
418#[derive(Debug)]
419#[allow(clippy::struct_field_names)]
420pub struct MemoryTracker {
421    /// Current memory usage in bytes.
422    current_bytes: AtomicU64,
423    /// Peak memory usage in bytes (for stats).
424    peak_bytes: AtomicU64,
425    /// Maximum allowed memory in bytes. 0 = no limit.
426    limit_bytes: u64,
427}
428
429impl MemoryTracker {
430    /// Create a new memory tracker with the specified limit.
431    ///
432    /// # Arguments
433    /// * `limit_bytes` - Maximum allowed memory in bytes. Use 0 for no limit.
434    #[must_use]
435    pub fn new(limit_bytes: u64) -> Self {
436        Self { current_bytes: AtomicU64::new(0), peak_bytes: AtomicU64::new(0), limit_bytes }
437    }
438
439    /// Create a new memory tracker with no limit.
440    #[must_use]
441    pub fn unlimited() -> Self {
442        Self::new(0)
443    }
444
445    /// Try to add bytes to the tracker.
446    ///
447    /// Returns `true` if the memory was added, `false` if we're already at or
448    /// above the limit. The key behavior is: if we're currently under the limit,
449    /// any single addition succeeds (even if it would exceed the limit). We only
450    /// reject additions when we're already at or above capacity.
451    ///
452    /// Note: When the limit is 0 (no limit), this always returns `true`.
453    pub fn try_add(&self, bytes: usize) -> bool {
454        if self.limit_bytes == 0 {
455            // No limit - just track without checking
456            let new_total =
457                self.current_bytes.fetch_add(bytes as u64, Ordering::Relaxed) + bytes as u64;
458            self.update_peak(new_total);
459            return true;
460        }
461
462        // Use compare-and-swap loop to atomically check and add
463        loop {
464            let current = self.current_bytes.load(Ordering::Relaxed);
465
466            // If we're already at or above the limit, reject the addition
467            if current >= self.limit_bytes {
468                return false;
469            }
470
471            // We're under the limit, so allow this addition (even if it exceeds)
472            let new_total = current + bytes as u64;
473
474            // Try to update atomically
475            if self
476                .current_bytes
477                .compare_exchange_weak(current, new_total, Ordering::Relaxed, Ordering::Relaxed)
478                .is_ok()
479            {
480                self.update_peak(new_total);
481                return true;
482            }
483            // CAS failed - another thread modified, retry
484        }
485    }
486
487    /// Update peak memory if `new_total` is higher.
488    #[inline]
489    fn update_peak(&self, new_total: u64) {
490        let mut current_peak = self.peak_bytes.load(Ordering::Relaxed);
491        while new_total > current_peak {
492            match self.peak_bytes.compare_exchange_weak(
493                current_peak,
494                new_total,
495                Ordering::Relaxed,
496                Ordering::Relaxed,
497            ) {
498                Ok(_) => break,
499                Err(actual) => current_peak = actual,
500            }
501        }
502    }
503
504    /// Remove bytes from the tracker (call when item is consumed from queue).
505    /// Uses saturating subtraction to prevent underflow from estimation mismatches.
506    pub fn remove(&self, bytes: usize) {
507        let bytes = bytes as u64;
508        let mut current = self.current_bytes.load(Ordering::Relaxed);
509        loop {
510            let new_val = current.saturating_sub(bytes);
511            match self.current_bytes.compare_exchange_weak(
512                current,
513                new_val,
514                Ordering::Relaxed,
515                Ordering::Relaxed,
516            ) {
517                Ok(_) => break,
518                Err(actual) => current = actual,
519            }
520        }
521    }
522
523    /// Add bytes unconditionally (for tracking after a successful push).
524    /// Unlike `try_add`, this always adds and doesn't check the limit.
525    pub fn add(&self, bytes: usize) {
526        let new_total =
527            self.current_bytes.fetch_add(bytes as u64, Ordering::Relaxed) + bytes as u64;
528        self.update_peak(new_total);
529    }
530
531    /// Get the current memory usage in bytes.
532    #[must_use]
533    pub fn current(&self) -> u64 {
534        self.current_bytes.load(Ordering::Relaxed)
535    }
536
537    /// Get the memory limit in bytes. Returns 0 if no limit.
538    #[must_use]
539    pub fn limit(&self) -> u64 {
540        self.limit_bytes
541    }
542
543    /// Get the peak memory usage in bytes.
544    #[must_use]
545    pub fn peak(&self) -> u64 {
546        self.peak_bytes.load(Ordering::Relaxed)
547    }
548
549    /// Check if we're at or above the backpressure threshold.
550    ///
551    /// For optimal performance, we apply backpressure at a capped threshold
552    /// (512MB) regardless of the configured limit. This keeps the pipeline
553    /// lean with good cache behavior. The configured limit acts as a safety
554    /// cap but doesn't delay backpressure.
555    ///
556    /// Even with no limit (`limit_bytes` == 0), we still apply backpressure
557    /// at 512MB to maintain optimal throughput.
558    #[must_use]
559    pub fn is_at_limit(&self) -> bool {
560        // Always apply backpressure at 512MB for optimal cache behavior.
561        // If a lower limit is configured, use that instead.
562        let backpressure_threshold = if self.limit_bytes == 0 {
563            BACKPRESSURE_THRESHOLD_BYTES
564        } else {
565            self.limit_bytes.min(BACKPRESSURE_THRESHOLD_BYTES)
566        };
567        self.current() >= backpressure_threshold
568    }
569
570    /// Check if we've drained below the low-water mark.
571    ///
572    /// This is used for hysteresis to prevent thrashing: we enter drain mode
573    /// when at the backpressure threshold, but only exit when we've drained
574    /// to half that threshold.
575    ///
576    /// The threshold is 256MB (half of the 512MB backpressure cap), or half
577    /// of a lower configured limit if one is set.
578    #[must_use]
579    pub fn is_below_drain_threshold(&self) -> bool {
580        // Drain threshold is half of the backpressure threshold
581        let backpressure_threshold = if self.limit_bytes == 0 {
582            BACKPRESSURE_THRESHOLD_BYTES
583        } else {
584            self.limit_bytes.min(BACKPRESSURE_THRESHOLD_BYTES)
585        };
586        self.current() < backpressure_threshold / 2
587    }
588}
589
590impl Default for MemoryTracker {
591    fn default() -> Self {
592        Self::unlimited()
593    }
594}
595
596// ============================================================================
597// Step Result Enum
598// ============================================================================
599
/// Outcome of one attempt at a pipeline step.
///
/// Step functions are non-blocking: when they cannot make progress they say
/// why, and the scheduler uses that to decide which step to try next.
///
/// # Deadlock Prevention
///
/// Because a step returns `OutputFull` or `InputEmpty` immediately instead
/// of blocking, a worker stays free to try other steps (especially Write,
/// which drains queues). That avoids the deadlock that occurs when all
/// threads block on push.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepResult {
    /// An item was processed and moved forward.
    Success,
    /// The output queue is full - try downstream steps to drain it.
    OutputFull,
    /// The input queue is empty - try upstream steps to fill it.
    InputEmpty,
}

impl StepResult {
    /// Returns true if the step made progress.
    #[inline]
    #[must_use]
    pub fn is_success(self) -> bool {
        self == Self::Success
    }
}
629
/// Progress logging interval for the 7-step pipeline.
pub const PROGRESS_LOG_INTERVAL: u64 = 1_000_000;

/// Memory level at which backpressure is signalled to the scheduler (512 MB).
///
/// When tracked memory reaches this constant the pipeline reports memory
/// pressure, and the scheduler reacts by prioritizing downstream steps
/// (Process, Serialize, Compress, Write) so buffered data drains before more
/// input is admitted.
///
/// # Architecture
///
/// Memory is tracked at the reorder buffer BEFORE the Group step:
/// - **BAM pipeline**: `q3_reorder_heap_bytes` (after Decode, before Group)
/// - **FASTQ pipeline**: `q2_reorder_heap_bytes` (after Decompress, before Group)
///
/// The placement matters. Tracking after the exclusive Group step would
/// release memory from the pre-Group buffer before it is known whether the
/// post-Group queue can take the data, which could leave data sitting in an
/// untracked intermediate buffer.
///
/// # Threshold Behavior
///
/// - Tracked memory >= `BACKPRESSURE_THRESHOLD_BYTES`: `is_memory_high()` returns true
/// - The scheduler enters "drain mode" and prioritizes output steps
/// - Tracked memory < `BACKPRESSURE_THRESHOLD_BYTES / 2`: `is_memory_drained()` returns true
/// - The scheduler exits drain mode (hysteresis prevents thrashing)
///
/// # Relationship to Queue Memory Limit
///
/// `queue_memory_limit` (default 4GB) is a separate setting: it is the hard
/// limit used by `can_decompress_proceed()` / `can_decode_proceed()`. The
/// backpressure threshold is deliberately lower so the pipeline slows down
/// gradually before it reaches a hard limit.
///
/// The effective threshold is `min(queue_memory_limit, BACKPRESSURE_THRESHOLD_BYTES)`,
/// so a smaller configured memory limit makes backpressure start at that limit.
pub const BACKPRESSURE_THRESHOLD_BYTES: u64 = 512 << 20; // 512 MB

/// Backpressure threshold for Q5 (the processed queue).
///
/// It is lower than the Q3 threshold (256 MB vs 512 MB) because items in Q5
/// are typically larger (e.g., `SimplexProcessedBatch` with `RecordBuf` vectors).
/// Once Q5 memory exceeds this threshold the Process step pauses so that the
/// downstream steps (Serialize, Compress, Write) can catch up.
pub const Q5_BACKPRESSURE_THRESHOLD_BYTES: u64 = 256 << 20; // 256 MB
674
675// ============================================================================
676// Input-Half Reorder Buffer State (shared between BAM and FASTQ pipelines)
677// ============================================================================
678
679/// Atomic state for tracking a reorder buffer's memory and sequence position.
680///
681/// This struct encapsulates the atomic counters needed for lock-free backpressure
682/// decisions on reorder buffers. Both BAM and FASTQ pipelines use this pattern
683/// for their pre-Group reorder buffers (Q3 in BAM, Q2 in FASTQ).
684///
685/// # Usage Pattern
686///
687/// ```ignore
688/// // Producer (Decompress/Decode step):
689/// if state.can_proceed(serial, memory_limit) {
690///     // Push to reorder buffer and update heap bytes
691///     state.add_heap_bytes(item_size);
692/// }
693///
694/// // Consumer (Group step):
695/// if let Some(item) = reorder_buffer.try_pop_next() {
696///     state.update_next_seq(new_seq);
697///     state.sub_heap_bytes(item_size);
698/// }
699/// ```
700#[derive(Debug)]
701pub struct ReorderBufferState {
702    /// Next sequence number the reorder buffer needs to make progress.
703    /// Updated by the consumer (Group step) after popping from the buffer.
704    pub next_seq: AtomicU64,
705    /// Current heap bytes tracked in the reorder buffer.
706    /// Updated by producer (add) and consumer (sub).
707    pub heap_bytes: AtomicU64,
708    /// Memory limit for backpressure. 0 means use default threshold.
709    memory_limit: u64,
710}
711
712impl ReorderBufferState {
713    /// Create a new reorder buffer state with the given memory limit.
714    ///
715    /// # Arguments
716    /// * `memory_limit` - Memory limit in bytes. Use 0 for default threshold.
717    #[must_use]
718    pub fn new(memory_limit: u64) -> Self {
719        Self { next_seq: AtomicU64::new(0), heap_bytes: AtomicU64::new(0), memory_limit }
720    }
721
722    /// Check if a producer can proceed with the given serial number.
723    ///
724    /// Always allows the serial that the consumer needs (`next_seq`) to proceed,
725    /// even if over memory limit. This ensures the consumer can always make progress.
726    /// For other serials, applies backpressure if over 50% of the limit.
727    #[must_use]
728    pub fn can_proceed(&self, serial: u64) -> bool {
729        let limit = self.effective_limit();
730        let next_seq = self.next_seq.load(Ordering::Acquire);
731        let heap_bytes = self.heap_bytes.load(Ordering::Acquire);
732
733        // Always accept the serial the reorder buffer needs to make progress.
734        // This fills gaps and allows the consumer to pop items, reducing memory.
735        if serial == next_seq {
736            return true;
737        }
738
739        // Apply backpressure if over 50% of limit.
740        let effective_limit = limit / 2;
741        heap_bytes < effective_limit
742    }
743
744    /// Check if memory is at the backpressure threshold.
745    ///
746    /// Returns true when tracked memory >= threshold, signaling that the
747    /// scheduler should enter drain mode and prioritize output steps.
748    #[must_use]
749    pub fn is_memory_high(&self) -> bool {
750        let threshold = self.effective_limit();
751        self.heap_bytes.load(Ordering::Acquire) >= threshold
752    }
753
754    /// Check if memory has drained below the low-water mark.
755    ///
756    /// Provides hysteresis to prevent thrashing: enter drain mode at backpressure
757    /// threshold, only exit when drained to half that threshold.
758    #[must_use]
759    pub fn is_memory_drained(&self) -> bool {
760        let threshold = self.effective_limit();
761        self.heap_bytes.load(Ordering::Acquire) < threshold / 2
762    }
763
764    /// Get the effective memory limit (respecting default threshold).
765    #[inline]
766    #[must_use]
767    fn effective_limit(&self) -> u64 {
768        if self.memory_limit == 0 {
769            BACKPRESSURE_THRESHOLD_BYTES
770        } else {
771            self.memory_limit.min(BACKPRESSURE_THRESHOLD_BYTES)
772        }
773    }
774
775    /// Add heap bytes to the tracker (after pushing to reorder buffer).
776    #[inline]
777    pub fn add_heap_bytes(&self, bytes: u64) {
778        self.heap_bytes.fetch_add(bytes, Ordering::AcqRel);
779    }
780
781    /// Subtract heap bytes from the tracker (after popping from reorder buffer).
782    #[inline]
783    pub fn sub_heap_bytes(&self, bytes: u64) {
784        self.heap_bytes.fetch_sub(bytes, Ordering::AcqRel);
785    }
786
787    /// Update the next sequence number (after consumer advances).
788    #[inline]
789    pub fn update_next_seq(&self, new_seq: u64) {
790        self.next_seq.store(new_seq, Ordering::Release);
791    }
792
793    /// Get the current next sequence number.
794    #[inline]
795    #[must_use]
796    pub fn get_next_seq(&self) -> u64 {
797        self.next_seq.load(Ordering::Acquire)
798    }
799
800    /// Get the current heap bytes.
801    #[inline]
802    #[must_use]
803    pub fn get_heap_bytes(&self) -> u64 {
804        self.heap_bytes.load(Ordering::Acquire)
805    }
806
807    /// Get the configured memory limit.
808    #[inline]
809    #[must_use]
810    pub fn get_memory_limit(&self) -> u64 {
811        self.memory_limit
812    }
813}
814
815// ============================================================================
816// BGZF Batch Buffer Types
817// ============================================================================
818
819use crate::bgzf_writer::{CompressedBlock, InlineBgzfCompressor};
820
821/// Batch of raw BGZF blocks (input side).
822///
823/// This is the input buffer type for the unified pipeline when processing
824/// BAM files. It holds multiple raw (compressed) BGZF blocks that will be
825/// decompressed and processed by workers.
826#[derive(Default)]
827pub struct RawBlockBatch {
828    /// The raw BGZF blocks in this batch.
829    pub blocks: Vec<RawBgzfBlock>,
830}
831
832impl RawBlockBatch {
833    /// Create a new empty batch.
834    #[must_use]
835    pub fn new() -> Self {
836        Self { blocks: Vec::new() }
837    }
838
839    /// Create a batch with pre-allocated capacity.
840    #[must_use]
841    pub fn with_capacity(capacity: usize) -> Self {
842        Self { blocks: Vec::with_capacity(capacity) }
843    }
844
845    /// Number of blocks in this batch.
846    #[must_use]
847    pub fn len(&self) -> usize {
848        self.blocks.len()
849    }
850
851    /// Returns true if the batch contains no blocks.
852    #[must_use]
853    pub fn is_empty(&self) -> bool {
854        self.blocks.is_empty()
855    }
856
857    /// Total uncompressed size of all blocks.
858    #[must_use]
859    pub fn total_uncompressed_size(&self) -> usize {
860        self.blocks.iter().map(RawBgzfBlock::uncompressed_size).sum()
861    }
862
863    /// Total compressed size of all blocks (raw bytes read from file).
864    #[must_use]
865    pub fn total_compressed_size(&self) -> usize {
866        self.blocks.iter().map(RawBgzfBlock::len).sum()
867    }
868
869    /// Clear all blocks from this batch.
870    pub fn clear(&mut self) {
871        self.blocks.clear();
872    }
873}
874
875impl MemoryEstimate for RawBlockBatch {
876    fn estimate_heap_size(&self) -> usize {
877        // Vec overhead + sum of block data sizes
878        self.blocks.iter().map(|b| b.data.capacity()).sum::<usize>()
879            + self.blocks.capacity() * std::mem::size_of::<RawBgzfBlock>()
880    }
881}
882
883/// Batch of compressed BGZF blocks (output side).
884///
885/// This is the output buffer type for the unified pipeline. It holds
886/// compressed blocks ready to be written to the output file.
887#[derive(Default)]
888pub struct CompressedBlockBatch {
889    /// The compressed blocks in this batch.
890    pub blocks: Vec<CompressedBlock>,
891    /// Number of records/templates in this batch (for progress logging).
892    pub record_count: u64,
893    /// Optional secondary data passed through without compression.
894    /// Used for dual-output pipelines (e.g., rejected records).
895    pub secondary_data: Option<Vec<u8>>,
896}
897
898impl CompressedBlockBatch {
899    /// Create a new empty batch.
900    #[must_use]
901    pub fn new() -> Self {
902        Self { blocks: Vec::new(), record_count: 0, secondary_data: None }
903    }
904
905    /// Create a batch with pre-allocated capacity.
906    #[must_use]
907    pub fn with_capacity(capacity: usize) -> Self {
908        Self { blocks: Vec::with_capacity(capacity), record_count: 0, secondary_data: None }
909    }
910
911    /// Number of blocks in this batch.
912    #[must_use]
913    pub fn len(&self) -> usize {
914        self.blocks.len()
915    }
916
917    /// Returns true if the batch contains no blocks.
918    #[must_use]
919    pub fn is_empty(&self) -> bool {
920        self.blocks.is_empty()
921    }
922
923    /// Total size of all compressed blocks.
924    #[must_use]
925    pub fn total_size(&self) -> usize {
926        self.blocks.iter().map(|b| b.data.len()).sum()
927    }
928
929    /// Clear all blocks from this batch.
930    pub fn clear(&mut self) {
931        self.blocks.clear();
932        self.record_count = 0;
933        self.secondary_data = None;
934    }
935}
936
937impl MemoryEstimate for CompressedBlockBatch {
938    fn estimate_heap_size(&self) -> usize {
939        // Vec overhead + sum of block data sizes
940        self.blocks.iter().map(|b| b.data.capacity()).sum::<usize>()
941            + self.blocks.capacity() * std::mem::size_of::<CompressedBlock>()
942            + self.secondary_data.as_ref().map_or(0, Vec::capacity)
943    }
944}
945
/// Settings that control BGZF batch processing.
#[derive(Debug, Clone)]
pub struct BgzfBatchConfig {
    /// How many raw blocks are read into each batch.
    pub blocks_per_batch: usize,
    /// Output compression level (0-12, default 6).
    pub compression_level: u32,
}

impl Default for BgzfBatchConfig {
    /// Defaults: 16 blocks per batch, compression level 6.
    fn default() -> Self {
        let blocks_per_batch = 16;
        let compression_level = 6;
        Self { blocks_per_batch, compression_level }
    }
}

impl BgzfBatchConfig {
    /// Build a configuration with the given batch size and the default
    /// compression level.
    #[must_use]
    pub fn new(blocks_per_batch: usize) -> Self {
        let defaults = Self::default();
        Self { blocks_per_batch, compression_level: defaults.compression_level }
    }

    /// Return the configuration with its compression level replaced by `level`.
    #[must_use]
    pub fn with_compression_level(self, level: u32) -> Self {
        Self { compression_level: level, ..self }
    }
}
975
976/// Read a batch of raw BGZF blocks into a buffer.
977///
978/// This is the read function for the unified pipeline when processing BAM files.
979/// Returns `Ok(true)` if blocks were read, `Ok(false)` at EOF.
980///
981/// # Errors
982///
983/// Returns an I/O error if reading from the underlying reader fails.
984pub fn read_raw_block_batch(
985    reader: &mut dyn Read,
986    buffer: &mut RawBlockBatch,
987    blocks_per_batch: usize,
988) -> io::Result<bool> {
989    buffer.clear();
990    let blocks = read_raw_blocks(reader, blocks_per_batch)?;
991    if blocks.is_empty() {
992        return Ok(false);
993    }
994    buffer.blocks = blocks;
995    Ok(true)
996}
997
998/// Write a batch of compressed blocks to output.
999///
1000/// This is the write function for the unified pipeline.
1001///
1002/// # Errors
1003///
1004/// Returns an I/O error if writing to the underlying writer fails.
1005pub fn write_compressed_batch(
1006    writer: &mut dyn Write,
1007    batch: &CompressedBlockBatch,
1008) -> io::Result<()> {
1009    for block in &batch.blocks {
1010        writer.write_all(&block.data)?;
1011    }
1012    Ok(())
1013}
1014
1015/// Per-worker state for BGZF processing.
1016///
1017/// Each worker thread has its own decompressor and compressor instances
1018/// to avoid synchronization overhead.
1019pub struct BgzfWorkerState {
1020    /// Decompressor for input blocks.
1021    pub decompressor: libdeflater::Decompressor,
1022    /// Compressor for output blocks.
1023    pub compressor: InlineBgzfCompressor,
1024}
1025
1026impl BgzfWorkerState {
1027    /// Create new worker state with the given compression level.
1028    #[must_use]
1029    pub fn new(compression_level: u32) -> Self {
1030        Self {
1031            decompressor: libdeflater::Decompressor::new(),
1032            compressor: InlineBgzfCompressor::new(compression_level),
1033        }
1034    }
1035
1036    /// Decompress a batch of raw blocks into uncompressed data.
1037    ///
1038    /// Returns the concatenated uncompressed data from all blocks.
1039    ///
1040    /// # Errors
1041    ///
1042    /// Returns an I/O error if decompression fails.
1043    pub fn decompress_batch(&mut self, batch: &RawBlockBatch) -> io::Result<Vec<u8>> {
1044        let total_size = batch.total_uncompressed_size();
1045        let mut result = Vec::with_capacity(total_size);
1046
1047        for block in &batch.blocks {
1048            decompress_block_into(block, &mut self.decompressor, &mut result)?;
1049        }
1050
1051        Ok(result)
1052    }
1053}
1054
1055// ============================================================================
1056// 9-Step Pipeline Infrastructure
1057// ============================================================================
1058//
1059// This section implements the 9-step unified pipeline where any thread
1060// can execute any step, but some steps are exclusive (only one thread at a time).
1061//
1062// Steps:
1063// 1. Read (exclusive) - Read raw BGZF blocks from input file
1064// 2. Decompress (parallel) - Decompress blocks using libdeflater
1065// 3. FindBoundaries (exclusive, FAST) - Find record boundaries in decompressed data
1066// 4. Decode (parallel) - Decode BAM records at known boundaries
1067// 5. Group (exclusive) - Group decoded records
1068// 6. Process (parallel) - Command-specific processing
1069// 7. Serialize (parallel) - Encode output records to BAM bytes
1070// 8. Compress (parallel) - Compress to BGZF blocks
1071// 9. Write (exclusive) - Write blocks to output file
1072//
1073// Queues: Q1(1→2), Q2(2→3, reorder), Q2b(3→4), Q3(4→5, reorder), Q4(5→6), Q5(6→7), Q6(7→8), Q7(8→9, reorder)
1074
/// The nine steps of the unified pipeline, declared in pipeline order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineStep {
    /// Step 1: read raw BGZF blocks from input (exclusive - file mutex).
    Read,
    /// Step 2: decompress blocks (parallel).
    Decompress,
    /// Step 3: find record boundaries in decompressed data (exclusive, FAST ~0.1μs/block).
    FindBoundaries,
    /// Step 4: decode BAM records at known boundaries (parallel).
    Decode,
    /// Step 5: group decoded records (exclusive - grouper state).
    Group,
    /// Step 6: process groups - command-specific work (parallel).
    Process,
    /// Step 7: serialize output records to bytes (parallel).
    Serialize,
    /// Step 8: compress bytes to BGZF blocks (parallel).
    Compress,
    /// Step 9: write blocks to output file (exclusive - file mutex).
    Write,
}

impl PipelineStep {
    /// Whether only one thread at a time may run this step.
    #[must_use]
    pub const fn is_exclusive(&self) -> bool {
        match self {
            Self::Read | Self::FindBoundaries | Self::Group | Self::Write => true,
            Self::Decompress
            | Self::Decode
            | Self::Process
            | Self::Serialize
            | Self::Compress => false,
        }
    }

    /// Every step, listed in pipeline order.
    #[must_use]
    pub const fn all() -> [PipelineStep; 9] {
        [
            Self::Read,
            Self::Decompress,
            Self::FindBoundaries,
            Self::Decode,
            Self::Group,
            Self::Process,
            Self::Serialize,
            Self::Compress,
            Self::Write,
        ]
    }

    /// Map a 0-based step index back to its `PipelineStep`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is greater than 8.
    #[must_use]
    pub const fn from_index(idx: usize) -> PipelineStep {
        // Steps in pipeline order, so position `i` holds the step with index `i`.
        const ORDERED: [PipelineStep; 9] = PipelineStep::all();

        if idx < ORDERED.len() {
            ORDERED[idx]
        } else {
            panic!("PipelineStep::from_index: invalid index (must be 0..=8)")
        }
    }

    /// The 0-based position of this step (Read=0, ..., Write=8).
    #[must_use]
    pub const fn index(&self) -> usize {
        // Variants carry implicit discriminants 0..=8 in declaration order.
        *self as usize
    }

    /// Two-letter abbreviation used for display.
    #[must_use]
    pub const fn short_name(&self) -> &'static str {
        match self {
            Self::Read => "Rd",
            Self::Decompress => "Dc",
            Self::FindBoundaries => "Fb",
            Self::Decode => "De",
            Self::Group => "Gr",
            Self::Process => "Pr",
            Self::Serialize => "Se",
            Self::Compress => "Co",
            Self::Write => "Wr",
        }
    }
}
1174
1175// ============================================================================
1176// ActiveSteps - Configurable set of active pipeline steps
1177// ============================================================================
1178
1179/// Tracks which pipeline steps are active for a given pipeline configuration.
1180///
1181/// Some steps may be skipped depending on input type and mode:
1182/// - Gzip+synchronized: skips `Decompress`, `FindBoundaries`, `Group`
1183/// - BGZF+synchronized: skips `Group`
1184/// - BAM (all active): all 9 steps
1185#[derive(Debug, Clone)]
1186pub struct ActiveSteps {
1187    /// Active steps in pipeline order.
1188    steps: Vec<PipelineStep>,
1189    /// Fast lookup: `active[step.index()]` is true iff step is active.
1190    active: [bool; 9],
1191}
1192
1193impl ActiveSteps {
1194    /// Create from a list of active steps (must be in pipeline order, unique).
1195    ///
1196    /// # Panics
1197    ///
1198    /// Panics if steps are not in ascending pipeline order or contain duplicates.
1199    #[must_use]
1200    pub fn new(steps: &[PipelineStep]) -> Self {
1201        assert!(
1202            steps.windows(2).all(|w| w[0].index() < w[1].index()),
1203            "ActiveSteps must be unique and in pipeline order"
1204        );
1205        let mut active = [false; 9];
1206        for &step in steps {
1207            active[step.index()] = true;
1208        }
1209        Self { steps: steps.to_vec(), active }
1210    }
1211
1212    /// All 9 steps active (BAM pipeline, default).
1213    #[must_use]
1214    pub fn all() -> Self {
1215        Self::new(&PipelineStep::all())
1216    }
1217
1218    /// Check if a step is active.
1219    #[must_use]
1220    pub fn is_active(&self, step: PipelineStep) -> bool {
1221        self.active[step.index()]
1222    }
1223
1224    /// Get the active steps in pipeline order.
1225    #[must_use]
1226    pub fn steps(&self) -> &[PipelineStep] {
1227        &self.steps
1228    }
1229
1230    /// Get only the active exclusive steps in pipeline order.
1231    #[must_use]
1232    pub fn exclusive_steps(&self) -> Vec<PipelineStep> {
1233        self.steps.iter().copied().filter(PipelineStep::is_exclusive).collect()
1234    }
1235
1236    /// Number of active steps.
1237    #[must_use]
1238    pub fn len(&self) -> usize {
1239        self.steps.len()
1240    }
1241
1242    /// Returns true if no steps are active.
1243    #[must_use]
1244    pub fn is_empty(&self) -> bool {
1245        self.steps.is_empty()
1246    }
1247
1248    /// Filter a priority buffer in-place, keeping only active steps.
1249    /// Returns the number of active steps remaining.
1250    pub fn filter_in_place(&self, buffer: &mut [PipelineStep; 9]) -> usize {
1251        let mut write = 0;
1252        for read in 0..9 {
1253            if self.active[buffer[read].index()] {
1254                buffer[write] = buffer[read];
1255                write += 1;
1256            }
1257        }
1258        write
1259    }
1260}
1261
1262// ============================================================================
1263// GroupKey - Pre-computed grouping key for fast comparison in Group step
1264// ============================================================================
1265
/// Grouping key computed ahead of time so the Group step can compare cheaply.
///
/// Every field is an integer or a hash, giving O(1) comparison. The key is
/// built during the parallel Decode step so that the serial Group step only
/// performs integer comparisons.
///
/// Paired-end reads have their two positions normalized so the lower one
/// comes first. Single-end reads fill the mate fields with the `UNKNOWN_*`
/// sentinel values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GroupKey {
    // Position info (normalized: lower position first)
    /// Reference sequence index of position 1 (the lower one).
    pub ref_id1: i32,
    /// Unclipped 5' position of position 1.
    pub pos1: i32,
    /// Strand of position 1 (0=forward, 1=reverse).
    pub strand1: u8,
    /// Reference sequence index of position 2 (the higher one, or the mate).
    pub ref_id2: i32,
    /// Unclipped 5' position of position 2.
    pub pos2: i32,
    /// Strand of position 2.
    pub strand2: u8,

    // Grouping metadata
    /// Library index (pre-computed from RG tag via header lookup).
    pub library_idx: u16,
    /// Hash of the cell barcode (0 if none).
    pub cell_hash: u64,

    // For name-based grouping within position groups
    /// Hash of QNAME, used for fast name comparison.
    pub name_hash: u64,
}

impl GroupKey {
    /// Sentinel for an unknown reference ID (unpaired reads).
    pub const UNKNOWN_REF: i32 = i32::MAX;
    /// Sentinel for an unknown position (unpaired reads).
    pub const UNKNOWN_POS: i32 = i32::MAX;
    /// Sentinel for an unknown strand (unpaired reads).
    pub const UNKNOWN_STRAND: u8 = u8::MAX;

    /// Build the key for a paired-end read from its own and its mate's position.
    ///
    /// The two `(ref_id, pos, strand)` triples are ordered so that the
    /// smaller one ends up in the `*1` fields and the other in the `*2` fields.
    #[must_use]
    #[allow(clippy::too_many_arguments)]
    pub fn paired(
        ref_id: i32,
        pos: i32,
        strand: u8,
        mate_ref_id: i32,
        mate_pos: i32,
        mate_strand: u8,
        library_idx: u16,
        cell_hash: u64,
        name_hash: u64,
    ) -> Self {
        let own = (ref_id, pos, strand);
        let mate = (mate_ref_id, mate_pos, mate_strand);

        // Normalize: lower triple first (matching ReadInfo behavior).
        let ((ref_id1, pos1, strand1), (ref_id2, pos2, strand2)) =
            if own <= mate { (own, mate) } else { (mate, own) };

        Self { ref_id1, pos1, strand1, ref_id2, pos2, strand2, library_idx, cell_hash, name_hash }
    }

    /// Build the key for a single-end/unpaired read.
    ///
    /// The mate fields are set to the `UNKNOWN_*` sentinels.
    #[must_use]
    pub fn single(
        ref_id: i32,
        pos: i32,
        strand: u8,
        library_idx: u16,
        cell_hash: u64,
        name_hash: u64,
    ) -> Self {
        // `default()` supplies the sentinel mate fields; everything else is overridden.
        Self {
            ref_id1: ref_id,
            pos1: pos,
            strand1: strand,
            library_idx,
            cell_hash,
            name_hash,
            ..Self::default()
        }
    }

    /// The part of the key that identifies a genomic position group.
    ///
    /// `RecordPositionGrouper` uses this to decide whether records belong to
    /// the same position group; the name hash is deliberately left out.
    #[must_use]
    pub fn position_key(&self) -> (i32, i32, u8, i32, i32, u8, u16, u64) {
        let Self {
            ref_id1,
            pos1,
            strand1,
            ref_id2,
            pos2,
            strand2,
            library_idx,
            cell_hash,
            name_hash: _,
        } = *self;
        (ref_id1, pos1, strand1, ref_id2, pos2, strand2, library_idx, cell_hash)
    }
}

impl PartialOrd for GroupKey {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for GroupKey {
    /// Order by position key first, then by name hash.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        match self.position_key().cmp(&other.position_key()) {
            std::cmp::Ordering::Equal => self.name_hash.cmp(&other.name_hash),
            by_position => by_position,
        }
    }
}

impl Default for GroupKey {
    /// All position fields set to the `UNKNOWN_*` sentinels, everything else zero.
    fn default() -> Self {
        let (ref_id, pos, strand) = (Self::UNKNOWN_REF, Self::UNKNOWN_POS, Self::UNKNOWN_STRAND);
        Self {
            ref_id1: ref_id,
            pos1: pos,
            strand1: strand,
            ref_id2: ref_id,
            pos2: pos,
            strand2: strand,
            library_idx: 0,
            cell_hash: 0,
            name_hash: 0,
        }
    }
}
1406
1407// ============================================================================
1408// DecodedRecord - Record with pre-computed grouping key
1409// ============================================================================
1410
1411/// A decoded BAM record with its pre-computed grouping key.
1412///
1413/// This is the output of the Decode step and input to the Group step.
1414/// The key is computed during the parallel Decode step so that the
1415/// serial Group step only needs to do fast integer comparisons.
1416///
1417/// Uses an enum for the record data to avoid carrying both `RecordBuf` and
1418/// `Vec<u8>` — saving ~24 bytes per record in parsed mode and ~200 bytes
1419/// in raw mode.
1420#[derive(Debug)]
1421pub struct DecodedRecord {
1422    /// Pre-computed grouping key.
1423    pub key: GroupKey,
1424    /// The record data — either a parsed `RecordBuf` or raw bytes.
1425    pub(crate) data: DecodedRecordData,
1426}
1427
1428/// Record data: either a parsed noodles `RecordBuf` or raw BAM bytes.
1429#[derive(Debug)]
1430pub enum DecodedRecordData {
1431    Parsed(RecordBuf),
1432    Raw(Vec<u8>),
1433}
1434
1435impl DecodedRecord {
1436    /// Create a new decoded record with its grouping key.
1437    #[must_use]
1438    pub fn new(record: RecordBuf, key: GroupKey) -> Self {
1439        Self { key, data: DecodedRecordData::Parsed(record) }
1440    }
1441
1442    /// Create a decoded record from raw bytes, skipping noodles decode.
1443    #[must_use]
1444    pub fn from_raw_bytes(raw: Vec<u8>, key: GroupKey) -> Self {
1445        Self { key, data: DecodedRecordData::Raw(raw) }
1446    }
1447
1448    /// Returns the raw bytes if this is a raw-mode record.
1449    #[must_use]
1450    pub fn raw_bytes(&self) -> Option<&[u8]> {
1451        match &self.data {
1452            DecodedRecordData::Raw(v) => Some(v),
1453            DecodedRecordData::Parsed(_) => None,
1454        }
1455    }
1456
1457    /// Takes the raw bytes out if this is a raw-mode record.
1458    #[must_use]
1459    pub fn into_raw_bytes(self) -> Option<Vec<u8>> {
1460        match self.data {
1461            DecodedRecordData::Raw(v) => Some(v),
1462            DecodedRecordData::Parsed(_) => None,
1463        }
1464    }
1465
1466    /// Returns a reference to the `RecordBuf` if this is a parsed-mode record.
1467    #[must_use]
1468    pub fn record(&self) -> Option<&RecordBuf> {
1469        match &self.data {
1470            DecodedRecordData::Parsed(r) => Some(r),
1471            DecodedRecordData::Raw(_) => None,
1472        }
1473    }
1474
1475    /// Takes the `RecordBuf` out if this is a parsed-mode record.
1476    #[must_use]
1477    pub fn into_record(self) -> Option<RecordBuf> {
1478        match self.data {
1479            DecodedRecordData::Parsed(r) => Some(r),
1480            DecodedRecordData::Raw(_) => None,
1481        }
1482    }
1483}
1484
1485impl MemoryEstimate for DecodedRecord {
1486    fn estimate_heap_size(&self) -> usize {
1487        match &self.data {
1488            DecodedRecordData::Parsed(record) => {
1489                crate::template::estimate_record_buf_heap_size(record)
1490            }
1491            DecodedRecordData::Raw(raw) => raw.capacity(),
1492        }
1493    }
1494}
1495
1496impl MemoryEstimate for Vec<DecodedRecord> {
1497    fn estimate_heap_size(&self) -> usize {
1498        self.iter().map(MemoryEstimate::estimate_heap_size).sum::<usize>()
1499            + self.capacity() * std::mem::size_of::<DecodedRecord>()
1500    }
1501}
1502
1503impl MemoryEstimate for RecordBuf {
1504    fn estimate_heap_size(&self) -> usize {
1505        // Delegate to the detailed estimation in template.rs (single source of truth)
1506        crate::template::estimate_record_buf_heap_size(self)
1507    }
1508}
1509
1510impl MemoryEstimate for Vec<RecordBuf> {
1511    fn estimate_heap_size(&self) -> usize {
1512        self.iter().map(MemoryEstimate::estimate_heap_size).sum::<usize>()
1513            + self.capacity() * std::mem::size_of::<RecordBuf>()
1514    }
1515}
1516
1517impl MemoryEstimate for Vec<u8> {
1518    fn estimate_heap_size(&self) -> usize {
1519        self.capacity()
1520    }
1521}
1522
1523// ============================================================================
1524// GroupKeyConfig - Configuration for computing `GroupKey` during Decode
1525// ============================================================================
1526
1527/// Configuration for computing `GroupKey` during the Decode step.
1528///
1529/// When this is provided to the pipeline, the Decode step will compute
1530/// full `GroupKey` values for each record. This moves expensive computations
1531/// (CIGAR parsing, tag extraction) from the serial Group step to the parallel
1532/// Decode step.
1533#[derive(Debug, Clone)]
1534pub struct GroupKeyConfig {
1535    /// Library index for fast RG → library lookup.
1536    pub library_index: Arc<LibraryIndex>,
1537    /// Tag used for cell barcode extraction. None skips cell extraction.
1538    pub cell_tag: Option<Tag>,
1539    /// When true, skip noodles decode and work with raw BAM bytes.
1540    pub raw_byte_mode: bool,
1541}
1542
1543impl GroupKeyConfig {
1544    /// Create a new `GroupKeyConfig`.
1545    #[must_use]
1546    pub fn new(library_index: LibraryIndex, cell_tag: Tag) -> Self {
1547        Self {
1548            library_index: Arc::new(library_index),
1549            cell_tag: Some(cell_tag),
1550            raw_byte_mode: false,
1551        }
1552    }
1553
1554    /// Create a `GroupKeyConfig` for raw-byte mode.
1555    #[must_use]
1556    pub fn new_raw(library_index: LibraryIndex, cell_tag: Tag) -> Self {
1557        Self {
1558            library_index: Arc::new(library_index),
1559            cell_tag: Some(cell_tag),
1560            raw_byte_mode: true,
1561        }
1562    }
1563
1564    /// Create a `GroupKeyConfig` for raw-byte mode without cell barcode extraction.
1565    #[must_use]
1566    pub fn new_raw_no_cell(library_index: LibraryIndex) -> Self {
1567        Self { library_index: Arc::new(library_index), cell_tag: None, raw_byte_mode: true }
1568    }
1569}
1570
1571impl Default for GroupKeyConfig {
1572    fn default() -> Self {
1573        Self {
1574            library_index: Arc::new(LibraryIndex::default()),
1575            cell_tag: Some(Tag::from([b'C', b'B'])), // Default cell barcode tag
1576            raw_byte_mode: false,
1577        }
1578    }
1579}
1580
/// A batch of decompressed data (output of Step 2, input to Step 3).
#[derive(Default)]
pub struct DecompressedBatch {
    /// Decompressed bytes of multiple BGZF blocks, concatenated.
    pub data: Vec<u8>,
}

impl DecompressedBatch {
    /// Build a batch that holds no data.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Build an empty batch with `capacity` bytes reserved.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        let data = Vec::with_capacity(capacity);
        Self { data }
    }

    /// Whether the batch holds no data.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Remove all bytes from the batch.
    pub fn clear(&mut self) {
        self.data.clear();
    }
}
1612
1613impl MemoryEstimate for DecompressedBatch {
1614    fn estimate_heap_size(&self) -> usize {
1615        self.data.capacity()
1616    }
1617}
1618
/// A batch of serialized data (output of the Serialize step, Step 7; input to
/// the Compress step, Step 8).
#[derive(Default)]
pub struct SerializedBatch {
    /// Encoded BAM bytes, ready to be compressed.
    pub data: Vec<u8>,
    /// How many records/templates this batch represents (for progress logging).
    pub record_count: u64,
    /// Optional secondary output data (e.g., rejected records).
    /// Passed through compression unchanged and written to a secondary output file.
    pub secondary_data: Option<Vec<u8>>,
}

impl SerializedBatch {
    /// Build a batch with no data, a zero record count and no secondary data.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Whether the primary data buffer is empty.
    ///
    /// Only `data` is inspected; `secondary_data` does not affect the result.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Reset the batch: remove all bytes, zero the record count and discard
    /// any secondary data.
    pub fn clear(&mut self) {
        self.secondary_data = None;
        self.record_count = 0;
        self.data.clear();
    }
}
1651
1652impl MemoryEstimate for SerializedBatch {
1653    fn estimate_heap_size(&self) -> usize {
1654        self.data.capacity() + self.secondary_data.as_ref().map_or(0, Vec::capacity)
1655    }
1656}
1657
1658// ============================================================================
1659// OutputPipelineQueues - Shared output-half state for both pipelines
1660// ============================================================================
1661
/// Shared output-half state for both BAM and FASTQ pipelines.
///
/// Contains all queues and state from Group step onwards:
/// Group → Process → Serialize → Compress → Write
///
/// Every stage hand-off is a fixed-capacity `ArrayQueue` whose items are paired
/// with a `u64` key; on the write side that key is fed to `write_reorder`, which
/// releases batches in order.
///
/// # Type Parameters
/// - `G`: Group/template type (input to Process step)
/// - `P`: Processed type (output of Process step)
pub struct OutputPipelineQueues<G, P: MemoryEstimate> {
    // ========== Queue: Group → Process ==========
    /// Batches of groups/templates waiting to be processed.
    pub groups: ArrayQueue<(u64, Vec<G>)>,
    /// Current heap bytes in groups queue (stats/reporting only, not used for backpressure).
    /// Mutations are gated behind the `memory-debug` feature; reads are zero without the feature.
    pub groups_heap_bytes: AtomicU64,

    // ========== Queue: Process → Serialize ==========
    /// Batches of processed data waiting for serialization.
    pub processed: ArrayQueue<(u64, Vec<P>)>,
    /// Current heap bytes in processed queue.
    ///
    /// This is the counter that `is_processed_memory_high` compares against
    /// `Q5_BACKPRESSURE_THRESHOLD_BYTES`.
    pub processed_heap_bytes: AtomicU64,

    // ========== Queue: Serialize → Compress ==========
    /// Serialized bytes waiting for compression.
    pub serialized: ArrayQueue<(u64, SerializedBatch)>,
    /// Current heap bytes in serialized queue.
    pub serialized_heap_bytes: AtomicU64,

    // ========== Queue: Compress → Write (with reorder) ==========
    /// Compressed blocks waiting to be written.
    pub compressed: ArrayQueue<(u64, CompressedBlockBatch)>,
    /// Current heap bytes in compressed queue.
    pub compressed_heap_bytes: AtomicU64,
    /// Reorder buffer to ensure blocks are written in order.
    pub write_reorder: Mutex<ReorderBuffer<CompressedBlockBatch>>,

    // ========== Output ==========
    /// Output file, mutex-protected for exclusive access.
    ///
    /// Initialised to `Some(writer)` by `new`.
    pub output: Mutex<Option<Box<dyn Write + Send>>>,
    /// Optional secondary output writer for dual-output pipelines (e.g., reject BAM).
    ///
    /// `None` until `set_secondary_output` is called.
    pub secondary_output: Option<Mutex<Option<crate::bam_io::RawBamWriter>>>,

    // ========== Completion and Error Tracking ==========
    /// Flag indicating an error occurred.
    pub error_flag: AtomicBool,
    /// Storage for the first error; later errors do not overwrite it.
    pub error: Mutex<Option<io::Error>>,
    /// Count of items written (groups for BAM, templates for FASTQ).
    pub items_written: AtomicU64,
    /// Flag indicating pipeline is draining (input complete, finishing up).
    ///
    /// While set, `should_apply_process_backpressure` ignores the memory threshold.
    pub draining: AtomicBool,
    /// Progress tracker for logging.
    pub progress: ProgressTracker,

    // ========== Performance Statistics ==========
    /// Optional performance statistics collector.
    pub stats: Option<Arc<PipelineStats>>,
}
1720
1721impl<G: Send, P: Send + MemoryEstimate> OutputPipelineQueues<G, P> {
1722    /// Create new output pipeline queues.
1723    ///
1724    /// # Arguments
1725    /// - `queue_capacity`: Capacity for all `ArrayQueue`s
1726    /// - `output`: Output writer (will be wrapped in Mutex)
1727    /// - `stats`: Optional performance statistics collector
1728    /// - `progress_name`: Name for the progress tracker (e.g., "Processed records")
1729    #[must_use]
1730    pub fn new(
1731        queue_capacity: usize,
1732        output: Box<dyn Write + Send>,
1733        stats: Option<Arc<PipelineStats>>,
1734        progress_name: &str,
1735    ) -> Self {
1736        Self {
1737            groups: ArrayQueue::new(queue_capacity),
1738            groups_heap_bytes: AtomicU64::new(0),
1739            processed: ArrayQueue::new(queue_capacity),
1740            processed_heap_bytes: AtomicU64::new(0),
1741            serialized: ArrayQueue::new(queue_capacity),
1742            serialized_heap_bytes: AtomicU64::new(0),
1743            compressed: ArrayQueue::new(queue_capacity),
1744            compressed_heap_bytes: AtomicU64::new(0),
1745            write_reorder: Mutex::new(ReorderBuffer::new()),
1746            output: Mutex::new(Some(output)),
1747            secondary_output: None,
1748            error_flag: AtomicBool::new(false),
1749            error: Mutex::new(None),
1750            items_written: AtomicU64::new(0),
1751            draining: AtomicBool::new(false),
1752            progress: ProgressTracker::new(progress_name).with_interval(PROGRESS_LOG_INTERVAL),
1753            stats,
1754        }
1755    }
1756
1757    /// Set a secondary output writer for dual-output pipelines.
1758    ///
1759    /// Must be called before the pipeline is started.
1760    pub fn set_secondary_output(&mut self, writer: crate::bam_io::RawBamWriter) {
1761        self.secondary_output = Some(Mutex::new(Some(writer)));
1762    }
1763
1764    // ========== Error Handling ==========
1765
1766    /// Record an error and signal threads to stop.
1767    pub fn set_error(&self, error: io::Error) {
1768        self.error_flag.store(true, Ordering::SeqCst);
1769        let mut guard = self.error.lock();
1770        if guard.is_none() {
1771            *guard = Some(error);
1772        }
1773    }
1774
1775    /// Check if an error has occurred.
1776    #[must_use]
1777    pub fn has_error(&self) -> bool {
1778        self.error_flag.load(Ordering::Relaxed)
1779    }
1780
1781    /// Take the stored error.
1782    pub fn take_error(&self) -> Option<io::Error> {
1783        self.error.lock().take()
1784    }
1785
1786    // ========== Memory Backpressure ==========
1787
1788    /// Check if processed queue memory is at the backpressure threshold.
1789    #[must_use]
1790    pub fn is_processed_memory_high(&self) -> bool {
1791        self.processed_heap_bytes.load(Ordering::Acquire) >= Q5_BACKPRESSURE_THRESHOLD_BYTES
1792    }
1793
1794    /// Check if pipeline is in drain mode (bypasses memory backpressure).
1795    #[must_use]
1796    pub fn is_draining(&self) -> bool {
1797        self.draining.load(Ordering::Relaxed)
1798    }
1799
1800    /// Set the draining flag.
1801    pub fn set_draining(&self, value: bool) {
1802        self.draining.store(value, Ordering::Relaxed);
1803    }
1804
1805    /// Check backpressure for Process step (queue full OR memory high, unless draining).
1806    #[must_use]
1807    pub fn should_apply_process_backpressure(&self) -> bool {
1808        self.processed.is_full() || (!self.is_draining() && self.is_processed_memory_high())
1809    }
1810
1811    // ========== Queue Depths ==========
1812
1813    /// Get current lengths of all output queues.
1814    #[must_use]
1815    pub fn queue_depths(&self) -> OutputQueueDepths {
1816        OutputQueueDepths {
1817            groups: self.groups.len(),
1818            processed: self.processed.len(),
1819            serialized: self.serialized.len(),
1820            compressed: self.compressed.len(),
1821        }
1822    }
1823
1824    /// Check if all output queues are empty.
1825    #[must_use]
1826    pub fn are_queues_empty(&self) -> bool {
1827        self.groups.is_empty()
1828            && self.processed.is_empty()
1829            && self.serialized.is_empty()
1830            && self.compressed.is_empty()
1831            && self.write_reorder.lock().is_empty()
1832    }
1833}
1834
/// Snapshot of output queue depths.
///
/// Each field is the number of batches in the corresponding output queue at
/// the moment it was sampled. `Default` yields an all-zero snapshot, and
/// snapshots can be compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OutputQueueDepths {
    /// Batches waiting in the Group → Process queue.
    pub groups: usize,
    /// Batches waiting in the Process → Serialize queue.
    pub processed: usize,
    /// Batches waiting in the Serialize → Compress queue.
    pub serialized: usize,
    /// Batches waiting in the Compress → Write queue.
    pub compressed: usize,
}
1843
1844// ============================================================================
1845// WorkerCoreState - Shared worker state for both pipelines
1846// ============================================================================
1847
/// Minimum backoff duration in microseconds.
///
/// Workers start at this value and return to it via `reset_backoff`; at or
/// below it, `sleep_backoff` yields instead of sleeping.
pub const MIN_BACKOFF_US: u64 = 10;
/// Maximum backoff duration in microseconds (1ms).
///
/// Upper bound applied by `increase_backoff` when doubling.
pub const MAX_BACKOFF_US: u64 = 1000;
/// Default serialization buffer capacity in bytes (256KB).
///
/// Used as the initial capacity of `WorkerCoreState::serialization_buffer`.
pub const SERIALIZATION_BUFFER_CAPACITY: usize = 256 * 1024;
1854
/// Core state shared by all worker threads in both BAM and FASTQ pipelines.
///
/// This struct contains the common fields that every pipeline worker needs:
/// compressor, scheduler, serialization buffer, and adaptive backoff state.
///
/// Pipeline-specific worker states (like held items for different queue stages)
/// can embed this struct and add their own fields.
///
/// # Example
///
/// ```ignore
/// pub struct MyPipelineWorkerState<P: Send> {
///     pub core: WorkerCoreState,
///     pub held_processed: Option<(u64, Vec<P>, usize)>,
///     // ... pipeline-specific held items
/// }
/// ```
pub struct WorkerCoreState {
    /// Compressor for output BGZF blocks.
    pub compressor: InlineBgzfCompressor,
    /// Scheduler for step selection (strategy-based).
    pub scheduler: Box<dyn Scheduler>,
    /// Reusable buffer for serialization.
    /// Swapped out each batch to avoid per-batch allocation.
    /// Created with `SERIALIZATION_BUFFER_CAPACITY` bytes of capacity.
    pub serialization_buffer: Vec<u8>,
    /// Reusable buffer for secondary serialization (e.g., reject output).
    /// Only allocated when a secondary serialize function is used;
    /// it starts out as an empty, unallocated `Vec`.
    pub secondary_serialization_buffer: Vec<u8>,
    /// Current backoff duration in microseconds (for adaptive backoff).
    /// Starts at `MIN_BACKOFF_US`.
    pub backoff_us: u64,
    /// Recycled serialization buffers to avoid repeated allocation.
    /// Populated by Compress step after consuming `SerializedBatch` data.
    /// Kept small (max 2) to limit memory overhead.
    pub recycled_buffers: Vec<Vec<u8>>,
}
1890
1891impl WorkerCoreState {
1892    /// Create new worker core state.
1893    ///
1894    /// # Arguments
1895    /// * `compression_level` - BGZF compression level (0-12)
1896    /// * `thread_id` - Thread index (0-based)
1897    /// * `num_threads` - Total number of worker threads
1898    /// * `scheduler_strategy` - Strategy for step scheduling
1899    /// * `active_steps` - Which pipeline steps are active
1900    #[must_use]
1901    pub fn new(
1902        compression_level: u32,
1903        thread_id: usize,
1904        num_threads: usize,
1905        scheduler_strategy: SchedulerStrategy,
1906        active_steps: ActiveSteps,
1907    ) -> Self {
1908        Self {
1909            compressor: InlineBgzfCompressor::new(compression_level),
1910            scheduler: create_scheduler(scheduler_strategy, thread_id, num_threads, active_steps),
1911            serialization_buffer: Vec::with_capacity(SERIALIZATION_BUFFER_CAPACITY),
1912            secondary_serialization_buffer: Vec::new(),
1913            backoff_us: MIN_BACKOFF_US,
1914            recycled_buffers: Vec::with_capacity(2),
1915        }
1916    }
1917
1918    /// Reset backoff to minimum (after successful work).
1919    #[inline]
1920    pub fn reset_backoff(&mut self) {
1921        self.backoff_us = MIN_BACKOFF_US;
1922    }
1923
1924    /// Increase backoff exponentially (after no work done).
1925    #[inline]
1926    pub fn increase_backoff(&mut self) {
1927        self.backoff_us = (self.backoff_us * 2).min(MAX_BACKOFF_US);
1928    }
1929
1930    /// Sleep for the current backoff duration with jitter.
1931    ///
1932    /// Uses `yield_now()` for minimum backoff to avoid sleep syscall overhead.
1933    /// Adds jitter (±25%) to prevent thundering herd synchronization.
1934    #[inline]
1935    pub fn sleep_backoff(&self) {
1936        if self.backoff_us <= MIN_BACKOFF_US {
1937            // At minimum backoff, yield is cheaper than sleep
1938            std::thread::yield_now();
1939        } else {
1940            // Add jitter to prevent thundering herd (±25%)
1941            // Use thread ID hash + time for simple pseudo-random jitter
1942            let jitter_range = self.backoff_us / 4;
1943            let jitter_seed = u64::from(
1944                std::time::SystemTime::now()
1945                    .duration_since(std::time::UNIX_EPOCH)
1946                    .map(|d| d.subsec_nanos())
1947                    .unwrap_or(0),
1948            );
1949            // Simple hash: mix in some bits to vary across threads
1950            let jitter_offset = (jitter_seed % (jitter_range * 2)).saturating_sub(jitter_range);
1951            let actual_us = self.backoff_us.saturating_add(jitter_offset).max(MIN_BACKOFF_US);
1952            std::thread::sleep(std::time::Duration::from_micros(actual_us));
1953        }
1954    }
1955
1956    /// Take a recycled buffer or allocate a new one with the given capacity.
1957    #[inline]
1958    pub fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8> {
1959        if let Some(mut buf) = self.recycled_buffers.pop() {
1960            buf.clear();
1961            // After clear(), len=0, so reserve(capacity) guarantees
1962            // buf.capacity() >= capacity.
1963            buf.reserve(capacity);
1964            buf
1965        } else {
1966            Vec::with_capacity(capacity)
1967        }
1968    }
1969
1970    /// Return a buffer for recycling. Keeps at most 2 buffers to limit memory.
1971    #[inline]
1972    pub fn recycle_buffer(&mut self, buf: Vec<u8>) {
1973        if self.recycled_buffers.len() < 2 {
1974            self.recycled_buffers.push(buf);
1975        }
1976    }
1977}
1978
1979impl HasCompressor for WorkerCoreState {
1980    fn compressor_mut(&mut self) -> &mut InlineBgzfCompressor {
1981        &mut self.compressor
1982    }
1983}
1984
1985impl HasRecycledBuffers for WorkerCoreState {
1986    fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8> {
1987        self.take_or_alloc_buffer(capacity)
1988    }
1989
1990    fn recycle_buffer(&mut self, buf: Vec<u8>) {
1991        self.recycle_buffer(buf);
1992    }
1993}
1994
1995// ============================================================================
1996// Pipeline Validation - Post-shutdown data loss detection
1997// ============================================================================
1998
/// Diagnostic error produced when post-run validation finds signs of data
/// loss or inconsistent bookkeeping.
///
/// It records which queues still held data, which counters disagreed, and how
/// many heap bytes were still being tracked.
#[derive(Debug, Clone)]
pub struct PipelineValidationError {
    /// Names of queues that are not empty at completion.
    pub non_empty_queues: Vec<String>,
    /// Descriptions of counter mismatches.
    pub counter_mismatches: Vec<String>,
    /// Total heap bytes still tracked (should be 0 at completion).
    pub leaked_heap_bytes: u64,
}

impl std::fmt::Display for PipelineValidationError {
    /// Render a multi-line report: a fixed headline followed by one section
    /// per non-empty diagnostic category. Empty categories are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Pipeline validation failed - potential data loss detected:")?;

        let stuck_queues = self.non_empty_queues.as_slice();
        if !stuck_queues.is_empty() {
            writeln!(f, "  Non-empty queues: {}", stuck_queues.join(", "))?;
        }

        // Peek first so the section header is only printed when there is
        // at least one mismatch to list underneath it.
        let mut mismatches = self.counter_mismatches.iter().peekable();
        if mismatches.peek().is_some() {
            writeln!(f, "  Counter mismatches:")?;
            mismatches.try_for_each(|mismatch| writeln!(f, "    - {mismatch}"))?;
        }

        match self.leaked_heap_bytes {
            0 => Ok(()),
            _ => writeln!(f, "  Leaked heap bytes: {}", self.leaked_heap_bytes),
        }
    }
}

impl std::error::Error for PipelineValidationError {}
2034
2035// ============================================================================
2036// PipelineLifecycle Trait - Common patterns for pipeline state management
2037// ============================================================================
2038
/// Trait for common pipeline lifecycle operations.
///
/// This trait captures the patterns shared between BAM and FASTQ pipelines:
/// - Completion checking
/// - Error handling
/// - Drain mode management
/// - Output queue access
///
/// Implementing this trait allows sharing monitor thread logic, worker loop
/// exit conditions, and pipeline finalization code.
pub trait PipelineLifecycle {
    /// Check if the pipeline has completed all work.
    ///
    /// Returns true when:
    /// 1. Input is fully consumed (`read_done`, `group_done`)
    /// 2. All queues are empty
    /// 3. All reorder buffers are empty
    fn is_complete(&self) -> bool;

    /// Check if an error has occurred.
    fn has_error(&self) -> bool;

    /// Take the stored error (if any).
    ///
    /// `finalize_pipeline` calls this first and returns the error unchanged
    /// when one is present.
    fn take_error(&self) -> Option<io::Error>;

    /// Set an error on the pipeline.
    fn set_error(&self, error: io::Error);

    /// Check if the pipeline is in drain mode.
    ///
    /// When draining, memory-based backpressure is bypassed to prevent
    /// deadlock during pipeline completion.
    fn is_draining(&self) -> bool;

    /// Set the draining flag.
    fn set_draining(&self, value: bool);

    /// Get reference to optional pipeline statistics.
    ///
    /// `None` means statistics collection is disabled.
    fn stats(&self) -> Option<&PipelineStats>;

    /// Get reference to the progress tracker.
    fn progress(&self) -> &ProgressTracker;

    /// Get the number of items written.
    ///
    /// This is the value `finalize_pipeline` returns on success.
    fn items_written(&self) -> u64;

    /// Flush the output writer and finalize.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if flushing fails.
    fn flush_output(&self) -> io::Result<()>;

    /// Validate pipeline completion to detect data loss.
    ///
    /// This method should be called after the pipeline has completed to verify:
    /// 1. All queues are empty (no data stuck in transit)
    /// 2. All batch counters match (no batches lost between stages)
    /// 3. Internal buffers are empty (grouper state, boundary leftovers)
    ///
    /// Note: Heap byte tracking is reported in `PipelineValidationError::leaked_heap_bytes`
    /// but is currently advisory only (implementations set it to 0) because estimation
    /// can be imprecise. Only queue emptiness and counter checks cause validation failure.
    ///
    /// Returns `Ok(())` if validation passes, or `Err(PipelineValidationError)`
    /// with detailed diagnostics if any issues are detected.
    ///
    /// # Errors
    ///
    /// Returns `PipelineValidationError` if validation detects data loss or inconsistency.
    fn validate_completion(&self) -> Result<(), PipelineValidationError>;
}
2111
2112/// Monitor thread exit condition - returns true when pipeline should stop.
2113///
2114/// This helper function encapsulates the common exit condition check for
2115/// monitor threads in both pipelines.
2116#[inline]
2117pub fn should_monitor_exit<P: PipelineLifecycle>(pipeline: &P) -> bool {
2118    pipeline.is_complete() || pipeline.has_error()
2119}
2120
/// Trait for states that support monitor thread operations.
///
/// This trait extends `PipelineLifecycle` with deadlock detection capabilities,
/// allowing shared monitor thread logic between BAM and FASTQ pipelines.
/// `run_monitor_loop` is generic over this trait.
pub trait MonitorableState: PipelineLifecycle {
    /// Get reference to the deadlock state.
    ///
    /// `run_monitor_loop` consults its `is_enabled()` on every sample and
    /// passes it to `check_deadlock_and_restore` when a check is due.
    fn deadlock_state(&self) -> &DeadlockState;

    /// Build a queue snapshot for deadlock detection.
    ///
    /// This method captures the current state of all queues and reorder buffers
    /// for deadlock analysis. The implementation is pipeline-specific since
    /// BAM and FASTQ have different queue structures.
    fn build_queue_snapshot(&self) -> QueueSnapshot;
}
2136
2137/// Run monitor loop until pipeline completes or errors.
2138///
2139/// This function provides the common monitor thread pattern:
2140/// 1. Sleep for the sample interval
2141/// 2. Check exit condition (complete or error)
2142/// 3. Call the pipeline-specific sampling callback
2143/// 4. Periodically perform deadlock detection
2144///
2145/// # Arguments
2146/// - `state`: Arc-wrapped pipeline state implementing `MonitorableState`
2147/// - `sample_interval_ms`: Milliseconds between samples (typically 100)
2148/// - `deadlock_check_samples`: Number of samples between deadlock checks (10 = ~1 second)
2149/// - `on_sample`: Callback for pipeline-specific per-sample work (stats collection, logging)
2150///
2151/// # Example
2152/// ```ignore
2153/// run_monitor_loop(&state, 100, 10, |s| {
2154///     if let Some(ref stats) = s.stats() {
2155///         stats.add_queue_sample(...);
2156///     }
2157/// });
2158/// ```
2159pub fn run_monitor_loop<S, F>(
2160    state: &Arc<S>,
2161    sample_interval_ms: u64,
2162    deadlock_check_samples: u32,
2163    on_sample: F,
2164) where
2165    S: MonitorableState,
2166    F: Fn(&S),
2167{
2168    let mut deadlock_counter = 0u32;
2169    loop {
2170        thread::sleep(Duration::from_millis(sample_interval_ms));
2171
2172        if should_monitor_exit(state.as_ref()) {
2173            break;
2174        }
2175
2176        // Call pipeline-specific sampling function
2177        on_sample(state.as_ref());
2178
2179        // Deadlock check at specified interval
2180        if state.deadlock_state().is_enabled() {
2181            deadlock_counter += 1;
2182            if deadlock_counter >= deadlock_check_samples {
2183                deadlock_counter = 0;
2184                let snapshot = state.build_queue_snapshot();
2185                check_deadlock_and_restore(state.deadlock_state(), &snapshot);
2186            }
2187        }
2188    }
2189}
2190
2191// ============================================================================
2192// Panic Handling Helpers
2193// ============================================================================
2194
/// Turn a panic payload into a printable message.
///
/// Payloads of type `&str` and `String` are returned as text; any other
/// payload type yields the fixed fallback `"Unknown panic"`.
#[must_use]
#[allow(clippy::needless_pass_by_value)]
pub fn extract_panic_message(panic_info: Box<dyn std::any::Any + Send>) -> String {
    let payload = &*panic_info;
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "Unknown panic".to_string())
}
2211
2212/// Handle a worker thread panic by setting an error on the pipeline state.
2213///
2214/// This combines panic message extraction with error setting for cleaner
2215/// worker thread spawning code.
2216pub fn handle_worker_panic<S: PipelineLifecycle>(
2217    state: &S,
2218    thread_id: usize,
2219    panic_info: Box<dyn std::any::Any + Send>,
2220) {
2221    let msg = extract_panic_message(panic_info);
2222    state.set_error(io::Error::other(format!("Worker thread {thread_id} panicked: {msg}")));
2223}
2224
2225// ============================================================================
2226// Pipeline Finalization Helpers
2227// ============================================================================
2228
/// Join all worker threads, waiting for completion.
///
/// Every handle is joined, even if an earlier thread panicked, so no worker
/// is still running (and no `JoinHandle` is silently detached) when this
/// function returns.
///
/// # Errors
///
/// Returns an I/O error if any worker thread panicked.
pub fn join_worker_threads(handles: Vec<thread::JoinHandle<()>>) -> io::Result<()> {
    let mut any_panicked = false;
    for handle in handles {
        // Keep joining after a panic: returning early would drop the
        // remaining handles and leave their threads running unobserved.
        if handle.join().is_err() {
            any_panicked = true;
        }
    }

    if any_panicked {
        Err(io::Error::other("Worker thread panicked"))
    } else {
        Ok(())
    }
}
2242
/// Wait for the monitor thread to finish, if one was started.
///
/// Does nothing when `handle` is `None`. A panic in the monitor thread is
/// deliberately ignored.
pub fn join_monitor_thread(handle: Option<thread::JoinHandle<()>>) {
    let Some(monitor) = handle else {
        return;
    };
    let _ = monitor.join();
}
2249
2250/// Finalize a pipeline after all threads have completed.
2251///
2252/// This function:
2253/// 1. Checks for errors that occurred during processing
2254/// 2. Flushes the output writer
2255/// 3. Logs pipeline statistics if enabled
2256/// 4. Returns the count of items written
2257///
2258/// # Arguments
2259/// - `state`: The pipeline state implementing `PipelineLifecycle`
2260///
2261/// # Returns
2262/// - `Ok(items_written)` on success
2263/// - `Err(error)` if an error occurred during processing
2264///
2265/// # Errors
2266///
2267/// Returns an I/O error if an error occurred during processing or output flush fails.
2268pub fn finalize_pipeline<S: PipelineLifecycle>(state: &S) -> io::Result<u64> {
2269    // Check for errors
2270    if let Some(error) = state.take_error() {
2271        return Err(error);
2272    }
2273
2274    // Validate pipeline completion to detect data loss
2275    state.validate_completion().map_err(io::Error::other)?;
2276
2277    // Flush output
2278    state.flush_output()?;
2279
2280    // Log pipeline statistics if enabled
2281    if let Some(stats) = state.stats() {
2282        stats.log_summary();
2283    }
2284
2285    Ok(state.items_written())
2286}
2287
2288// ============================================================================
2289// Trait Implementations for OutputPipelineQueues
2290// ============================================================================
2291
2292impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> ProcessPipelineState<G, P>
2293    for OutputPipelineQueues<G, P>
2294{
2295    fn process_input_pop(&self) -> Option<(u64, Vec<G>)> {
2296        self.groups.pop()
2297    }
2298
2299    fn process_output_is_full(&self) -> bool {
2300        self.processed.is_full()
2301    }
2302
2303    fn process_output_push(&self, item: (u64, Vec<P>)) -> Result<(), (u64, Vec<P>)> {
2304        let heap_size: usize = item.1.iter().map(MemoryEstimate::estimate_heap_size).sum();
2305        let result = self.processed.push(item);
2306        if result.is_ok() {
2307            self.processed_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
2308        }
2309        result
2310    }
2311
2312    fn has_error(&self) -> bool {
2313        self.error_flag.load(Ordering::Acquire)
2314    }
2315
2316    fn set_error(&self, error: io::Error) {
2317        OutputPipelineQueues::set_error(self, error);
2318    }
2319
2320    fn should_apply_process_backpressure(&self) -> bool {
2321        OutputPipelineQueues::should_apply_process_backpressure(self)
2322    }
2323
2324    fn is_draining(&self) -> bool {
2325        OutputPipelineQueues::is_draining(self)
2326    }
2327}
2328
2329impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> SerializePipelineState<P>
2330    for OutputPipelineQueues<G, P>
2331{
2332    fn serialize_input_pop(&self) -> Option<(u64, Vec<P>)> {
2333        let result = self.processed.pop();
2334        if let Some((_, ref batch)) = result {
2335            let heap_size: usize = batch.iter().map(MemoryEstimate::estimate_heap_size).sum();
2336            self.processed_heap_bytes.fetch_sub(heap_size as u64, Ordering::AcqRel);
2337        }
2338        result
2339    }
2340
2341    fn serialize_output_is_full(&self) -> bool {
2342        self.serialized.is_full()
2343    }
2344
2345    fn serialize_output_push(
2346        &self,
2347        item: (u64, SerializedBatch),
2348    ) -> Result<(), (u64, SerializedBatch)> {
2349        let heap_size = item.1.estimate_heap_size();
2350        let result = self.serialized.push(item);
2351        if result.is_ok() {
2352            self.serialized_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
2353        }
2354        result
2355    }
2356
2357    fn has_error(&self) -> bool {
2358        self.error_flag.load(Ordering::Acquire)
2359    }
2360
2361    fn set_error(&self, error: io::Error) {
2362        OutputPipelineQueues::set_error(self, error);
2363    }
2364
2365    fn record_serialized_bytes(&self, bytes: u64) {
2366        if let Some(ref stats) = self.stats {
2367            stats.serialized_bytes.fetch_add(bytes, Ordering::Relaxed);
2368        }
2369    }
2370}
2371
2372impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> WritePipelineState
2373    for OutputPipelineQueues<G, P>
2374{
2375    fn write_input_queue(&self) -> &ArrayQueue<(u64, CompressedBlockBatch)> {
2376        &self.compressed
2377    }
2378
2379    fn write_reorder_buffer(&self) -> &Mutex<ReorderBuffer<CompressedBlockBatch>> {
2380        &self.write_reorder
2381    }
2382
2383    fn write_output(&self) -> &Mutex<Option<Box<dyn Write + Send>>> {
2384        &self.output
2385    }
2386
2387    fn has_error(&self) -> bool {
2388        self.error_flag.load(Ordering::Acquire)
2389    }
2390
2391    fn set_error(&self, error: io::Error) {
2392        OutputPipelineQueues::set_error(self, error);
2393    }
2394
2395    fn record_written(&self, count: u64) {
2396        self.items_written.fetch_add(count, Ordering::Release);
2397    }
2398
2399    fn stats(&self) -> Option<&PipelineStats> {
2400        self.stats.as_deref()
2401    }
2402}
2403
2404impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> OutputPipelineState
2405    for OutputPipelineQueues<G, P>
2406{
2407    type Processed = P;
2408
2409    fn has_error(&self) -> bool {
2410        self.error_flag.load(Ordering::Acquire)
2411    }
2412
2413    fn set_error(&self, error: io::Error) {
2414        OutputPipelineQueues::set_error(self, error);
2415    }
2416
2417    fn q5_pop(&self) -> Option<(u64, SerializedBatch)> {
2418        self.serialized.pop()
2419    }
2420
2421    fn q5_push(&self, item: (u64, SerializedBatch)) -> Result<(), (u64, SerializedBatch)> {
2422        self.serialized.push(item)
2423    }
2424
2425    fn q5_is_full(&self) -> bool {
2426        self.serialized.is_full()
2427    }
2428
2429    fn q5_track_pop(&self, heap_size: u64) {
2430        self.serialized_heap_bytes.fetch_sub(heap_size, Ordering::AcqRel);
2431    }
2432
2433    fn q6_pop(&self) -> Option<(u64, CompressedBlockBatch)> {
2434        self.compressed.pop()
2435    }
2436
2437    fn q6_push(
2438        &self,
2439        item: (u64, CompressedBlockBatch),
2440    ) -> Result<(), (u64, CompressedBlockBatch)> {
2441        self.compressed.push(item)
2442    }
2443
2444    fn q6_is_full(&self) -> bool {
2445        self.compressed.is_full()
2446    }
2447
2448    fn q6_reorder_insert(&self, serial: u64, batch: CompressedBlockBatch) {
2449        self.write_reorder.lock().insert(serial, batch);
2450    }
2451
2452    fn q6_reorder_try_pop_next(&self) -> Option<CompressedBlockBatch> {
2453        self.write_reorder.lock().try_pop_next()
2454    }
2455
2456    fn output_try_lock(
2457        &self,
2458    ) -> Option<parking_lot::MutexGuard<'_, Option<Box<dyn Write + Send>>>> {
2459        self.output.try_lock()
2460    }
2461
2462    fn increment_written(&self) -> u64 {
2463        self.items_written.fetch_add(1, Ordering::Release)
2464    }
2465
2466    fn record_compressed_bytes_out(&self, bytes: u64) {
2467        if let Some(ref stats) = self.stats {
2468            stats.compressed_bytes_out.fetch_add(bytes, Ordering::Relaxed);
2469        }
2470    }
2471
2472    fn record_q6_pop_progress(&self) {
2473        // Deadlock tracking handled by caller if needed
2474    }
2475
2476    fn record_q7_push_progress(&self) {
2477        // Deadlock tracking handled by caller if needed
2478    }
2479
2480    fn stats(&self) -> Option<&PipelineStats> {
2481        self.stats.as_deref()
2482    }
2483}
2484
/// Unit type always has zero heap size (used in tests).
///
/// This lets `()` stand in for the processed-item type wherever a
/// `MemoryEstimate` bound is required.
impl MemoryEstimate for () {
    /// Always returns 0: `()` carries no data and owns no heap allocation.
    fn estimate_heap_size(&self) -> usize {
        0
    }
}
2491
/// Configuration for the BAM pipeline.
///
/// Build one with `PipelineConfig::new` (fixed defaults) or
/// `PipelineConfig::auto_tuned` (defaults scaled by thread count), then refine
/// it with the `with_*` builder methods. All fields are public, so values
/// assigned directly bypass any clamping the constructors/builders apply.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Number of threads in the pool.
    ///
    /// Both constructors clamp this to at least 1.
    pub num_threads: usize,
    /// Capacity for each queue.
    pub queue_capacity: usize,
    /// Low water mark - prioritize reading when Q1 is below this.
    pub input_low_water: usize,
    /// High water mark - prioritize writing when Q6 is above this.
    pub output_high_water: usize,
    /// Compression level for BGZF output (0-12).
    pub compression_level: u32,
    /// Number of raw BGZF blocks to read per batch.
    pub blocks_per_read_batch: usize,
    /// Whether to collect pipeline performance statistics.
    pub collect_stats: bool,
    /// Number of groups to batch before processing (1 = no batching).
    ///
    /// This is critical for performance when work units are small. Each work unit
    /// goes through compress → write individually, and the compress step flushes
    /// after each unit, creating BGZF blocks. Small work units (~100-500 bytes)
    /// create tiny blocks instead of optimal 64KB blocks.
    ///
    /// - **`batch_size=1`**: For commands with naturally large work units
    ///   (e.g., `group` with `RawPositionGroups` containing many records).
    /// - **`batch_size>1`**: For commands with small work units
    ///   (e.g., `filter` with single records). Use ~400 for ~60KB batches.
    ///
    /// `with_batch_size` clamps this to at least 1.
    pub batch_size: usize,
    /// Target templates per batch for weight-based batching.
    ///
    /// When set to non-zero, groups are batched by total template count rather
    /// than group count. This provides consistent batch sizes across datasets
    /// with varying templates-per-group ratios.
    ///
    /// - **`target_templates_per_batch=0`**: Use `batch_size` for group-count batching.
    /// - **`target_templates_per_batch>0`**: Accumulate groups until total templates >= this value.
    ///
    /// Recommended value: 500 templates per batch for good performance
    /// (this is what `auto_tuned` sets; `new` leaves it at 0).
    pub target_templates_per_batch: usize,
    /// Whether the BAM header has already been read from the input stream.
    ///
    /// When true, the boundary finder will skip header parsing since the header
    /// has already been consumed by the caller (e.g., for streaming from stdin).
    /// This enables streaming input support where the header must be read once
    /// and passed separately.
    pub header_already_read: bool,
    /// Scheduler strategy for thread work assignment.
    ///
    /// - `FixedPriority` (default): Thread roles are fixed (reader, writer, workers).
    /// - `ChaseBottleneck`: Threads dynamically follow work through the pipeline.
    pub scheduler_strategy: SchedulerStrategy,
    /// Memory limit for queue contents in bytes. 0 = no limit.
    ///
    /// When set, the pipeline will apply backpressure (refuse to push to queues)
    /// when total queue memory exceeds this limit. This prevents unbounded memory
    /// growth in threaded pipelines processing large datasets.
    ///
    /// Recommended values:
    /// - 0: No limit (default, uses count-based queue capacity only)
    /// - `500_000_000`: 500 MB limit
    /// - `1_000_000_000`: 1 GB limit
    /// - `2_000_000_000`: 2 GB limit
    pub queue_memory_limit: u64,
    /// Deadlock detection timeout in seconds (default 10, 0 = disabled).
    ///
    /// When no progress is made for this duration, a warning is logged with
    /// diagnostic info (queue depths, memory usage, per-queue timestamps).
    pub deadlock_timeout_secs: u64,
    /// Whether automatic deadlock recovery is enabled (default false).
    ///
    /// When enabled, uses progressive doubling: 2x -> 4x -> 8x -> unbind,
    /// then restores toward original limits after 30s of sustained progress.
    pub deadlock_recover_enabled: bool,
    /// Shared statistics instance for external memory monitoring.
    /// When provided, the pipeline will use this instead of creating its own.
    /// `None` by default; set via `with_shared_stats`, which also turns on
    /// `collect_stats`.
    pub shared_stats: Option<Arc<PipelineStats>>,
}
2570
2571impl PipelineConfig {
2572    /// Create a new configuration with the specified thread count and compression level.
2573    #[must_use]
2574    pub fn new(num_threads: usize, compression_level: u32) -> Self {
2575        Self {
2576            num_threads: num_threads.max(1),
2577            queue_capacity: 64,
2578            input_low_water: 8,
2579            output_high_water: 32,
2580            compression_level,
2581            blocks_per_read_batch: 16,
2582            collect_stats: false,
2583            batch_size: 1,
2584            target_templates_per_batch: 0, // 0 = use batch_size instead
2585            header_already_read: false,
2586            scheduler_strategy: SchedulerStrategy::default(),
2587            queue_memory_limit: 0,           // No limit by default
2588            deadlock_timeout_secs: 10,       // Default 10 second timeout
2589            deadlock_recover_enabled: false, // Detection only by default
2590            shared_stats: None,              // No shared stats by default
2591        }
2592    }
2593
2594    /// Set the compression level.
2595    #[must_use]
2596    pub fn with_compression_level(mut self, level: u32) -> Self {
2597        self.compression_level = level;
2598        self
2599    }
2600
2601    /// Set blocks per read batch.
2602    #[must_use]
2603    pub fn with_blocks_per_batch(mut self, blocks: usize) -> Self {
2604        self.blocks_per_read_batch = blocks;
2605        self
2606    }
2607
2608    /// Enable or disable performance statistics collection.
2609    #[must_use]
2610    pub fn with_stats(mut self, collect: bool) -> Self {
2611        self.collect_stats = collect;
2612        self
2613    }
2614
2615    /// Set custom statistics instance for memory debugging.
2616    /// This allows external monitoring to access the same stats used by the pipeline.
2617    #[must_use]
2618    pub fn with_shared_stats(mut self, stats: Arc<PipelineStats>) -> Self {
2619        self.collect_stats = true; // Enable stats collection
2620        self.shared_stats = Some(stats);
2621        self
2622    }
2623
2624    /// Set the batch size for grouping work units before processing.
2625    #[must_use]
2626    pub fn with_batch_size(mut self, size: usize) -> Self {
2627        self.batch_size = size.max(1);
2628        self
2629    }
2630
2631    /// Set the target templates per batch for weight-based batching.
2632    ///
2633    /// When set to non-zero, groups are batched by total template count rather
2634    /// than group count. Set to 0 to use `batch_size` instead.
2635    #[must_use]
2636    pub fn with_target_templates_per_batch(mut self, count: usize) -> Self {
2637        self.target_templates_per_batch = count;
2638        self
2639    }
2640
2641    /// Create a configuration auto-tuned for the given thread count.
2642    ///
2643    /// This adjusts queue capacity and batch sizes based on the number of threads
2644    /// to optimize throughput and reduce contention.
2645    #[must_use]
2646    pub fn auto_tuned(num_threads: usize, compression_level: u32) -> Self {
2647        let num_threads = num_threads.max(1);
2648
2649        // Scale queue capacity with thread count (min 64, max 256)
2650        let queue_capacity = (num_threads * 16).clamp(64, 256);
2651
2652        // Low water mark: trigger read refill when below thread count
2653        let input_low_water = num_threads.max(4);
2654
2655        // High water mark: trigger write when output is above 4x threads
2656        let output_high_water = (num_threads * 4).max(32);
2657
2658        // Scale BAM block batch size with thread count to reduce queue ops
2659        let blocks_per_read_batch = match num_threads {
2660            1..=3 => 16,
2661            4..=7 => 32,
2662            8..=15 => 48,
2663            _ => 64,
2664        };
2665
2666        Self {
2667            num_threads,
2668            queue_capacity,
2669            input_low_water,
2670            output_high_water,
2671            compression_level,
2672            blocks_per_read_batch,
2673            collect_stats: false,
2674            batch_size: 1,
2675            // Template-based batching: accumulate groups until ~500 templates
2676            // This provides consistent batch sizes regardless of templates-per-group ratio
2677            target_templates_per_batch: 500,
2678            header_already_read: false,
2679            scheduler_strategy: SchedulerStrategy::default(),
2680            queue_memory_limit: 0,           // No limit by default
2681            deadlock_timeout_secs: 10,       // Default 10 second timeout
2682            deadlock_recover_enabled: false, // Detection only by default
2683            shared_stats: None,              // No shared stats by default
2684        }
2685    }
2686
2687    /// Set the scheduler strategy.
2688    #[must_use]
2689    pub fn with_scheduler_strategy(mut self, strategy: SchedulerStrategy) -> Self {
2690        self.scheduler_strategy = strategy;
2691        self
2692    }
2693
2694    /// Set the queue memory limit.
2695    ///
2696    /// When set to a non-zero value, the pipeline will apply backpressure
2697    /// (refuse to push to queues) when total queue memory exceeds this limit.
2698    ///
2699    /// # Arguments
2700    /// * `limit_bytes` - Maximum allowed queue memory in bytes. Use 0 for no limit.
2701    #[must_use]
2702    pub fn with_queue_memory_limit(mut self, limit_bytes: u64) -> Self {
2703        self.queue_memory_limit = limit_bytes;
2704        self
2705    }
2706
2707    /// Set the deadlock detection timeout.
2708    ///
2709    /// # Arguments
2710    /// * `timeout_secs` - Timeout in seconds (0 = disabled, default 10)
2711    #[must_use]
2712    pub fn with_deadlock_timeout(mut self, timeout_secs: u64) -> Self {
2713        self.deadlock_timeout_secs = timeout_secs;
2714        self
2715    }
2716
2717    /// Enable or disable deadlock recovery.
2718    ///
2719    /// When enabled, the pipeline will double memory limits when a deadlock
2720    /// is detected, and restore toward original limits after sustained progress.
2721    #[must_use]
2722    pub fn with_deadlock_recovery(mut self, enabled: bool) -> Self {
2723        self.deadlock_recover_enabled = enabled;
2724        self
2725    }
2726}
2727
2728// ============================================================================
2729// Pipeline Statistics
2730// ============================================================================
2731
/// Maximum number of threads supported for per-thread statistics.
///
/// The per-thread arrays in `PipelineStats` have exactly this many slots;
/// the recording methods skip per-thread updates for thread ids at or above it.
pub const MAX_THREADS: usize = 32;

/// Number of pipeline steps for array sizing
/// (Read, Decompress, FindBoundaries, Decode, Group, Process, Serialize,
/// Compress, Write).
const NUM_STEPS: usize = 9;
2737
/// A snapshot of queue sizes at a point in time.
///
/// Samples are accumulated in `PipelineStats::queue_samples` for timeline analysis.
#[derive(Debug, Clone)]
pub struct QueueSample {
    /// Time since pipeline start in milliseconds.
    pub time_ms: u64,
    /// Size of each queue: `[Q1, Q2, Q2b, Q3, Q4, Q5, Q6, Q7]`.
    pub queue_sizes: [usize; 8],
    /// Size of reorder buffers: `[Q2_reorder, Q3_reorder, Q7_reorder]`.
    pub reorder_sizes: [usize; 3],
    /// Estimated memory usage per queue in bytes: `[Q1, Q2, Q2b, Q3, Q4, Q5, Q6, Q7]`.
    /// Zero means not measured.
    pub queue_memory_bytes: [u64; 8],
    /// Estimated memory in reorder buffers in bytes: `[Q2_reorder, Q3_reorder, Q7_reorder]`.
    pub reorder_memory_bytes: [u64; 3],
    /// Current step each thread is on (0-8, or 255 for idle), one entry per thread.
    pub thread_steps: Vec<u8>,
}
2755
/// Statistics collected during pipeline execution.
///
/// These metrics help identify bottlenecks and optimization opportunities.
/// All counters are atomic to allow lock-free updates from multiple threads;
/// the one exception is `queue_samples`, which is guarded by a mutex.
///
/// Per-thread arrays hold `MAX_THREADS` slots. The recording methods ignore
/// per-thread updates for thread ids beyond that range.
#[derive(Debug)]
pub struct PipelineStats {
    // Per-step timing (nanoseconds)
    /// Total time spent in Step 1: Read
    pub step_read_ns: AtomicU64,
    /// Total time spent in Step 2: Decompress
    pub step_decompress_ns: AtomicU64,
    /// Total time spent in Step 3: `FindBoundaries`
    pub step_find_boundaries_ns: AtomicU64,
    /// Total time spent in Step 4: Decode
    pub step_decode_ns: AtomicU64,
    /// Total time spent in Step 5: Group
    pub step_group_ns: AtomicU64,
    /// Total time spent in Step 6: Process
    pub step_process_ns: AtomicU64,
    /// Total time spent in Step 7: Serialize
    pub step_serialize_ns: AtomicU64,
    /// Total time spent in Step 8: Compress
    pub step_compress_ns: AtomicU64,
    /// Total time spent in Step 9: Write
    pub step_write_ns: AtomicU64,

    // Per-step success counts
    /// Number of successful Read operations
    pub step_read_count: AtomicU64,
    /// Number of successful Decompress operations
    pub step_decompress_count: AtomicU64,
    /// Number of successful `FindBoundaries` operations
    pub step_find_boundaries_count: AtomicU64,
    /// Number of successful Decode operations
    pub step_decode_count: AtomicU64,
    /// Number of successful Group operations
    pub step_group_count: AtomicU64,
    /// Number of successful Process operations
    pub step_process_count: AtomicU64,
    /// Number of successful Serialize operations
    pub step_serialize_count: AtomicU64,
    /// Number of successful Compress operations
    pub step_compress_count: AtomicU64,
    /// Number of successful Write operations
    pub step_write_count: AtomicU64,

    // Contention metrics
    /// Number of failed `try_lock` attempts on input file
    pub read_contention: AtomicU64,
    /// Number of failed `try_lock` attempts on boundary state
    pub boundary_contention: AtomicU64,
    /// Number of failed `try_lock` attempts on group state
    pub group_contention: AtomicU64,
    /// Number of failed `try_lock` attempts on output file
    pub write_contention: AtomicU64,

    // Queue metrics
    /// Number of times Q1 was empty when trying to decompress
    pub q1_empty: AtomicU64,
    /// Number of times Q2 was empty when trying to find boundaries
    pub q2_empty: AtomicU64,
    /// Number of times Q2b was empty when trying to decode
    pub q2b_empty: AtomicU64,
    /// Number of times Q3 was empty when trying to group
    pub q3_empty: AtomicU64,
    /// Number of times Q4 was empty when trying to process
    pub q4_empty: AtomicU64,
    /// Number of times Q5 was empty when trying to serialize
    pub q5_empty: AtomicU64,
    /// Number of times Q6 was empty when trying to compress
    pub q6_empty: AtomicU64,
    /// Number of times Q7 was empty when trying to write
    pub q7_empty: AtomicU64,

    // Idle tracking
    /// Number of `yield_now()` calls (no work available)
    pub idle_yields: AtomicU64,

    // ========================================================================
    // Per-thread statistics for bottleneck analysis
    // ========================================================================
    /// Per-thread step counts (successes): `[thread_id][step_index]`
    /// Tracks how many times each thread successfully executed each step.
    pub per_thread_step_counts: Box<[[AtomicU64; NUM_STEPS]; MAX_THREADS]>,

    /// Per-thread step attempts: `[thread_id][step_index]`
    /// Tracks how many times each thread attempted each step (success + failure).
    pub per_thread_step_attempts: Box<[[AtomicU64; NUM_STEPS]; MAX_THREADS]>,

    /// Per-thread idle time in nanoseconds.
    /// Time spent in `yield_now()` when no work was available.
    pub per_thread_idle_ns: Box<[AtomicU64; MAX_THREADS]>,

    /// Current step each thread is working on (for activity snapshots).
    /// Value is step index (0-8) or 255 for idle; every slot starts at 255.
    pub per_thread_current_step: Box<[AtomicU8; MAX_THREADS]>,

    // Queue wait time (time spent waiting for exclusive step locks)
    /// Total time spent waiting for `FindBoundaries` lock (nanoseconds).
    pub boundary_wait_ns: AtomicU64,
    /// Total time spent waiting for Group lock (nanoseconds).
    pub group_wait_ns: AtomicU64,
    /// Total time spent waiting for Write lock (nanoseconds).
    pub write_wait_ns: AtomicU64,

    // Batch size tracking
    /// Sum of all batch sizes (for computing average).
    pub batch_size_sum: AtomicU64,
    /// Number of batches processed.
    pub batch_count: AtomicU64,
    /// Minimum batch size seen (`u64::MAX` until the first batch is recorded).
    pub batch_size_min: AtomicU64,
    /// Maximum batch size seen (0 until the first batch is recorded).
    pub batch_size_max: AtomicU64,

    /// Number of threads configured (for display).
    pub num_threads: AtomicU64,

    // ========================================================================
    // Throughput metrics (bytes/records processed per step)
    // ========================================================================
    /// Total bytes read from input file.
    pub bytes_read: AtomicU64,
    /// Total bytes written to output file.
    pub bytes_written: AtomicU64,
    /// Compressed bytes input to decompress step.
    pub compressed_bytes_in: AtomicU64,
    /// Decompressed bytes output from decompress step.
    pub decompressed_bytes: AtomicU64,
    /// Serialized bytes input to compress step.
    pub serialized_bytes: AtomicU64,
    /// Compressed bytes output from compress step.
    pub compressed_bytes_out: AtomicU64,
    /// Number of records decoded.
    pub records_decoded: AtomicU64,
    /// Number of groups produced by grouper.
    pub groups_produced: AtomicU64,

    // ========================================================================
    // Queue monitoring samples (periodic snapshots of queue sizes)
    // ========================================================================
    /// Collected queue size samples for timeline analysis.
    /// Protected by Mutex since samples are collected periodically from monitor thread.
    pub queue_samples: Mutex<Vec<QueueSample>>,

    // ========================================================================
    // Memory limiting statistics
    // ========================================================================
    /// Number of times memory drain mode was activated.
    pub memory_drain_activations: AtomicU64,
    /// Number of times Group step was rejected due to memory limit.
    pub group_memory_rejects: AtomicU64,
    /// Peak memory usage in the reorder buffer (bytes).
    pub peak_memory_bytes: AtomicU64,

    // ========================================================================
    // COMPREHENSIVE MEMORY TRACKING SYSTEM (behind memory-debug feature)
    // ========================================================================
    /// Comprehensive memory tracking stats, only present with `memory-debug` feature.
    #[cfg(feature = "memory-debug")]
    pub memory: MemoryDebugStats,
}
2918
/// Builds a heap-allocated `[AtomicU64; N]` with every element set to zero.
///
/// The elements are produced directly on the heap (boxed slice first, then
/// converted to a boxed array) so a large `N` never materialises on the stack.
#[allow(clippy::unnecessary_box_returns)]
fn new_atomic_array<const N: usize>() -> Box<[AtomicU64; N]> {
    let zeroed: Box<[AtomicU64]> =
        std::iter::repeat_with(|| AtomicU64::new(0)).take(N).collect();
    // The slice holds exactly N elements, so the conversion cannot fail.
    zeroed.try_into().unwrap()
}
2927
/// Builds a heap-allocated `R x C` grid of `AtomicU64`, every cell set to zero.
///
/// Rows are collected into a boxed slice and then converted to a boxed array,
/// so the full grid is never placed on the stack.
#[allow(clippy::unnecessary_box_returns)]
fn new_atomic_2d_array<const R: usize, const C: usize>() -> Box<[[AtomicU64; C]; R]> {
    let zero_row = || -> [AtomicU64; C] { std::array::from_fn(|_| AtomicU64::new(0)) };
    let rows: Box<[[AtomicU64; C]]> = std::iter::repeat_with(zero_row).take(R).collect();
    // The slice holds exactly R rows, so the conversion cannot fail.
    rows.try_into().unwrap()
}
2936
/// Builds a heap-allocated `[AtomicU8; N]` with every element set to `init`.
#[allow(clippy::unnecessary_box_returns)]
fn new_atomic_u8_array<const N: usize>(init: u8) -> Box<[AtomicU8; N]> {
    let filled: Box<[AtomicU8]> =
        std::iter::repeat_with(|| AtomicU8::new(init)).take(N).collect();
    // The slice holds exactly N elements, so the conversion cannot fail.
    filled.try_into().unwrap()
}
2943
2944impl Default for PipelineStats {
2945    fn default() -> Self {
2946        Self::new()
2947    }
2948}
2949
2950impl PipelineStats {
2951    /// Create a new stats collector.
2952    #[must_use]
2953    pub fn new() -> Self {
2954        Self {
2955            step_read_ns: AtomicU64::new(0),
2956            step_decompress_ns: AtomicU64::new(0),
2957            step_find_boundaries_ns: AtomicU64::new(0),
2958            step_decode_ns: AtomicU64::new(0),
2959            step_group_ns: AtomicU64::new(0),
2960            step_process_ns: AtomicU64::new(0),
2961            step_serialize_ns: AtomicU64::new(0),
2962            step_compress_ns: AtomicU64::new(0),
2963            step_write_ns: AtomicU64::new(0),
2964            step_read_count: AtomicU64::new(0),
2965            step_decompress_count: AtomicU64::new(0),
2966            step_find_boundaries_count: AtomicU64::new(0),
2967            step_decode_count: AtomicU64::new(0),
2968            step_group_count: AtomicU64::new(0),
2969            step_process_count: AtomicU64::new(0),
2970            step_serialize_count: AtomicU64::new(0),
2971            step_compress_count: AtomicU64::new(0),
2972            step_write_count: AtomicU64::new(0),
2973            read_contention: AtomicU64::new(0),
2974            boundary_contention: AtomicU64::new(0),
2975            group_contention: AtomicU64::new(0),
2976            write_contention: AtomicU64::new(0),
2977            q1_empty: AtomicU64::new(0),
2978            q2_empty: AtomicU64::new(0),
2979            q2b_empty: AtomicU64::new(0),
2980            q3_empty: AtomicU64::new(0),
2981            q4_empty: AtomicU64::new(0),
2982            q5_empty: AtomicU64::new(0),
2983            q6_empty: AtomicU64::new(0),
2984            q7_empty: AtomicU64::new(0),
2985            idle_yields: AtomicU64::new(0),
2986            // New per-thread stats
2987            per_thread_step_counts: new_atomic_2d_array::<MAX_THREADS, NUM_STEPS>(),
2988            per_thread_step_attempts: new_atomic_2d_array::<MAX_THREADS, NUM_STEPS>(),
2989            per_thread_idle_ns: new_atomic_array::<MAX_THREADS>(),
2990            per_thread_current_step: new_atomic_u8_array::<MAX_THREADS>(255), // 255 = idle
2991            boundary_wait_ns: AtomicU64::new(0),
2992            group_wait_ns: AtomicU64::new(0),
2993            write_wait_ns: AtomicU64::new(0),
2994            batch_size_sum: AtomicU64::new(0),
2995            batch_count: AtomicU64::new(0),
2996            batch_size_min: AtomicU64::new(u64::MAX),
2997            batch_size_max: AtomicU64::new(0),
2998            num_threads: AtomicU64::new(0),
2999            // Throughput metrics
3000            bytes_read: AtomicU64::new(0),
3001            bytes_written: AtomicU64::new(0),
3002            compressed_bytes_in: AtomicU64::new(0),
3003            decompressed_bytes: AtomicU64::new(0),
3004            serialized_bytes: AtomicU64::new(0),
3005            compressed_bytes_out: AtomicU64::new(0),
3006            records_decoded: AtomicU64::new(0),
3007            groups_produced: AtomicU64::new(0),
3008            // Queue monitoring
3009            queue_samples: Mutex::new(Vec::new()),
3010            // Memory limiting stats
3011            memory_drain_activations: AtomicU64::new(0),
3012            group_memory_rejects: AtomicU64::new(0),
3013            peak_memory_bytes: AtomicU64::new(0),
3014
3015            #[cfg(feature = "memory-debug")]
3016            memory: MemoryDebugStats::new(),
3017        }
3018    }
3019
3020    /// Set the number of threads for display purposes.
3021    pub fn set_num_threads(&self, n: usize) {
3022        self.num_threads.store(n as u64, Ordering::Relaxed);
3023    }
3024
3025    /// Record time and success for a step (without per-thread tracking).
3026    #[inline]
3027    pub fn record_step(&self, step: PipelineStep, elapsed_ns: u64) {
3028        self.record_step_for_thread(step, elapsed_ns, None);
3029    }
3030
3031    /// Record time and success for a step with per-thread tracking.
3032    #[inline]
3033    pub fn record_step_for_thread(
3034        &self,
3035        step: PipelineStep,
3036        elapsed_ns: u64,
3037        thread_id: Option<usize>,
3038    ) {
3039        // Record global stats
3040        match step {
3041            PipelineStep::Read => {
3042                self.step_read_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3043                self.step_read_count.fetch_add(1, Ordering::Relaxed);
3044            }
3045            PipelineStep::Decompress => {
3046                self.step_decompress_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3047                self.step_decompress_count.fetch_add(1, Ordering::Relaxed);
3048            }
3049            PipelineStep::FindBoundaries => {
3050                self.step_find_boundaries_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3051                self.step_find_boundaries_count.fetch_add(1, Ordering::Relaxed);
3052            }
3053            PipelineStep::Decode => {
3054                self.step_decode_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3055                self.step_decode_count.fetch_add(1, Ordering::Relaxed);
3056            }
3057            PipelineStep::Group => {
3058                self.step_group_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3059                self.step_group_count.fetch_add(1, Ordering::Relaxed);
3060            }
3061            PipelineStep::Process => {
3062                self.step_process_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3063                self.step_process_count.fetch_add(1, Ordering::Relaxed);
3064            }
3065            PipelineStep::Serialize => {
3066                self.step_serialize_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3067                self.step_serialize_count.fetch_add(1, Ordering::Relaxed);
3068            }
3069            PipelineStep::Compress => {
3070                self.step_compress_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3071                self.step_compress_count.fetch_add(1, Ordering::Relaxed);
3072            }
3073            PipelineStep::Write => {
3074                self.step_write_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3075                self.step_write_count.fetch_add(1, Ordering::Relaxed);
3076            }
3077        }
3078
3079        // Record per-thread stats if thread_id provided
3080        if let Some(tid) = thread_id {
3081            if tid < MAX_THREADS {
3082                let step_idx = step as usize;
3083                self.per_thread_step_counts[tid][step_idx].fetch_add(1, Ordering::Relaxed);
3084            }
3085        }
3086    }
3087
3088    /// Record lock contention for a mutex.
3089    #[inline]
3090    pub fn record_contention(&self, step: PipelineStep) {
3091        match step {
3092            PipelineStep::Read => {
3093                self.read_contention.fetch_add(1, Ordering::Relaxed);
3094            }
3095            PipelineStep::FindBoundaries => {
3096                self.boundary_contention.fetch_add(1, Ordering::Relaxed);
3097            }
3098            PipelineStep::Group => {
3099                self.group_contention.fetch_add(1, Ordering::Relaxed);
3100            }
3101            PipelineStep::Write => {
3102                self.write_contention.fetch_add(1, Ordering::Relaxed);
3103            }
3104            _ => {}
3105        }
3106    }
3107
3108    /// Record time spent waiting for a lock (exclusive step contention time).
3109    #[inline]
3110    pub fn record_wait_time(&self, step: PipelineStep, wait_ns: u64) {
3111        match step {
3112            PipelineStep::FindBoundaries => {
3113                self.boundary_wait_ns.fetch_add(wait_ns, Ordering::Relaxed);
3114            }
3115            PipelineStep::Group => {
3116                self.group_wait_ns.fetch_add(wait_ns, Ordering::Relaxed);
3117            }
3118            PipelineStep::Write => {
3119                self.write_wait_ns.fetch_add(wait_ns, Ordering::Relaxed);
3120            }
3121            _ => {}
3122        }
3123    }
3124
3125    /// Record empty queue poll.
3126    ///
3127    /// Queue numbers: 1=raw, 2=decompressed, 25=boundaries (Q2b), 3=decoded,
3128    /// 4=groups, 5=processed, 6=serialized, 7=compressed
3129    #[inline]
3130    pub fn record_queue_empty(&self, queue_num: usize) {
3131        match queue_num {
3132            1 => self.q1_empty.fetch_add(1, Ordering::Relaxed),
3133            2 => self.q2_empty.fetch_add(1, Ordering::Relaxed),
3134            25 => self.q2b_empty.fetch_add(1, Ordering::Relaxed), // Q2b (boundaries)
3135            3 => self.q3_empty.fetch_add(1, Ordering::Relaxed),
3136            4 => self.q4_empty.fetch_add(1, Ordering::Relaxed),
3137            5 => self.q5_empty.fetch_add(1, Ordering::Relaxed),
3138            6 => self.q6_empty.fetch_add(1, Ordering::Relaxed),
3139            7 => self.q7_empty.fetch_add(1, Ordering::Relaxed),
3140            _ => 0,
3141        };
3142    }
3143
3144    /// Record an idle yield (without timing).
3145    #[inline]
3146    pub fn record_idle(&self) {
3147        self.idle_yields.fetch_add(1, Ordering::Relaxed);
3148    }
3149
3150    /// Record idle time for a specific thread.
3151    #[inline]
3152    pub fn record_idle_for_thread(&self, thread_id: usize, idle_ns: u64) {
3153        self.idle_yields.fetch_add(1, Ordering::Relaxed);
3154        if thread_id < MAX_THREADS {
3155            self.per_thread_idle_ns[thread_id].fetch_add(idle_ns, Ordering::Relaxed);
3156        }
3157    }
3158
3159    /// Record a step attempt for a specific thread (called before attempting).
3160    /// This tracks total attempts regardless of success/failure.
3161    #[inline]
3162    #[allow(clippy::cast_possible_truncation)]
3163    pub fn record_step_attempt(&self, thread_id: usize, step: PipelineStep) {
3164        if thread_id < MAX_THREADS {
3165            let step_idx = step as usize;
3166            self.per_thread_step_attempts[thread_id][step_idx].fetch_add(1, Ordering::Relaxed);
3167            // Also update current step
3168            self.per_thread_current_step[thread_id].store(step_idx as u8, Ordering::Relaxed);
3169        }
3170    }
3171
3172    /// Set the current step a thread is working on.
3173    #[inline]
3174    pub fn set_current_step(&self, thread_id: usize, step: PipelineStep) {
3175        if thread_id < MAX_THREADS {
3176            self.per_thread_current_step[thread_id].store(step as u8, Ordering::Relaxed);
3177        }
3178    }
3179
3180    /// Clear the current step (thread is idle or between steps).
3181    #[inline]
3182    pub fn clear_current_step(&self, thread_id: usize) {
3183        if thread_id < MAX_THREADS {
3184            self.per_thread_current_step[thread_id].store(255, Ordering::Relaxed);
3185        }
3186    }
3187
3188    /// Get current step for all threads (for activity snapshot).
3189    #[allow(clippy::cast_possible_truncation)]
3190    pub fn get_thread_activity(&self, num_threads: usize) -> Vec<Option<PipelineStep>> {
3191        (0..num_threads.min(MAX_THREADS))
3192            .map(|tid| {
3193                let step_idx = self.per_thread_current_step[tid].load(Ordering::Relaxed);
3194                if step_idx < NUM_STEPS as u8 {
3195                    Some(PipelineStep::from_index(step_idx as usize))
3196                } else {
3197                    None // idle
3198                }
3199            })
3200            .collect()
3201    }
3202
3203    /// Record a batch size for batch size distribution tracking.
3204    #[inline]
3205    pub fn record_batch_size(&self, size: usize) {
3206        let size = size as u64;
3207        self.batch_size_sum.fetch_add(size, Ordering::Relaxed);
3208        self.batch_count.fetch_add(1, Ordering::Relaxed);
3209
3210        // Update min (using compare-exchange loop)
3211        let mut current_min = self.batch_size_min.load(Ordering::Relaxed);
3212        while size < current_min {
3213            match self.batch_size_min.compare_exchange_weak(
3214                current_min,
3215                size,
3216                Ordering::Relaxed,
3217                Ordering::Relaxed,
3218            ) {
3219                Ok(_) => break,
3220                Err(actual) => current_min = actual,
3221            }
3222        }
3223
3224        // Update max
3225        let mut current_max = self.batch_size_max.load(Ordering::Relaxed);
3226        while size > current_max {
3227            match self.batch_size_max.compare_exchange_weak(
3228                current_max,
3229                size,
3230                Ordering::Relaxed,
3231                Ordering::Relaxed,
3232            ) {
3233                Ok(_) => break,
3234                Err(actual) => current_max = actual,
3235            }
3236        }
3237    }
3238
3239    /// Add a queue size sample for periodic monitoring.
3240    /// Called from a background monitor thread every ~100ms.
3241    pub fn add_queue_sample(&self, sample: QueueSample) {
3242        self.queue_samples.lock().push(sample);
3243    }
3244
3245    /// Get the collected queue samples for analysis.
3246    pub fn get_queue_samples(&self) -> Vec<QueueSample> {
3247        self.queue_samples.lock().clone()
3248    }
3249
3250    /// Record a memory drain mode activation.
3251    #[inline]
3252    pub fn record_memory_drain_activation(&self) {
3253        self.memory_drain_activations.fetch_add(1, Ordering::Relaxed);
3254    }
3255
3256    /// Record a Group step rejection due to memory limit.
3257    #[inline]
3258    pub fn record_group_memory_reject(&self) {
3259        self.group_memory_rejects.fetch_add(1, Ordering::Relaxed);
3260    }
3261
3262    /// Update peak memory usage if current is higher.
3263    #[inline]
3264    pub fn record_memory_usage(&self, bytes: u64) {
3265        let mut current_peak = self.peak_memory_bytes.load(Ordering::Relaxed);
3266        while bytes > current_peak {
3267            match self.peak_memory_bytes.compare_exchange_weak(
3268                current_peak,
3269                bytes,
3270                Ordering::Relaxed,
3271                Ordering::Relaxed,
3272            ) {
3273                Ok(_) => break,
3274                Err(actual) => current_peak = actual,
3275            }
3276        }
3277    }
3278
3279    /// Format statistics as a human-readable summary.
3280    #[allow(clippy::similar_names)] // Intentional: xxx_ns vs xxx_ms for nanoseconds vs milliseconds
3281    #[allow(
3282        clippy::too_many_lines,
3283        clippy::cast_precision_loss,
3284        clippy::cast_possible_truncation,
3285        clippy::cast_sign_loss
3286    )]
3287    pub fn format_summary(&self) -> String {
3288        use std::fmt::Write;
3289
3290        let mut s = String::new();
3291        writeln!(s, "Pipeline Statistics:").unwrap();
3292        writeln!(s).unwrap();
3293
3294        // Helper to format step stats
3295        #[allow(clippy::uninlined_format_args)]
3296        let format_step = |name: &str, ns: u64, count: u64| -> String {
3297            if count == 0 {
3298                format!("  {:<20} {:>10} ops, {:>12}", name, 0, "-")
3299            } else {
3300                let total_ms = ns as f64 / 1_000_000.0;
3301                let avg_us = (ns as f64 / count as f64) / 1_000.0;
3302                format!(
3303                    "  {:<20} {:>10} ops, {:>10.1}ms total, {:>8.1}µs avg",
3304                    name, count, total_ms, avg_us
3305                )
3306            }
3307        };
3308
3309        writeln!(s, "Step Timing:").unwrap();
3310        writeln!(
3311            s,
3312            "{}",
3313            format_step(
3314                "Read",
3315                self.step_read_ns.load(Ordering::Relaxed),
3316                self.step_read_count.load(Ordering::Relaxed)
3317            )
3318        )
3319        .unwrap();
3320        writeln!(
3321            s,
3322            "{}",
3323            format_step(
3324                "Decompress",
3325                self.step_decompress_ns.load(Ordering::Relaxed),
3326                self.step_decompress_count.load(Ordering::Relaxed)
3327            )
3328        )
3329        .unwrap();
3330        writeln!(
3331            s,
3332            "{}",
3333            format_step(
3334                "FindBoundaries",
3335                self.step_find_boundaries_ns.load(Ordering::Relaxed),
3336                self.step_find_boundaries_count.load(Ordering::Relaxed)
3337            )
3338        )
3339        .unwrap();
3340        writeln!(
3341            s,
3342            "{}",
3343            format_step(
3344                "Decode",
3345                self.step_decode_ns.load(Ordering::Relaxed),
3346                self.step_decode_count.load(Ordering::Relaxed)
3347            )
3348        )
3349        .unwrap();
3350        writeln!(
3351            s,
3352            "{}",
3353            format_step(
3354                "Group",
3355                self.step_group_ns.load(Ordering::Relaxed),
3356                self.step_group_count.load(Ordering::Relaxed)
3357            )
3358        )
3359        .unwrap();
3360        writeln!(
3361            s,
3362            "{}",
3363            format_step(
3364                "Process",
3365                self.step_process_ns.load(Ordering::Relaxed),
3366                self.step_process_count.load(Ordering::Relaxed)
3367            )
3368        )
3369        .unwrap();
3370        writeln!(
3371            s,
3372            "{}",
3373            format_step(
3374                "Serialize",
3375                self.step_serialize_ns.load(Ordering::Relaxed),
3376                self.step_serialize_count.load(Ordering::Relaxed)
3377            )
3378        )
3379        .unwrap();
3380        writeln!(
3381            s,
3382            "{}",
3383            format_step(
3384                "Compress",
3385                self.step_compress_ns.load(Ordering::Relaxed),
3386                self.step_compress_count.load(Ordering::Relaxed)
3387            )
3388        )
3389        .unwrap();
3390        writeln!(
3391            s,
3392            "{}",
3393            format_step(
3394                "Write",
3395                self.step_write_ns.load(Ordering::Relaxed),
3396                self.step_write_count.load(Ordering::Relaxed)
3397            )
3398        )
3399        .unwrap();
3400
3401        writeln!(s).unwrap();
3402        writeln!(s, "Contention:").unwrap();
3403        writeln!(
3404            s,
3405            "  Read lock:     {:>10} failed attempts",
3406            self.read_contention.load(Ordering::Relaxed)
3407        )
3408        .unwrap();
3409        writeln!(
3410            s,
3411            "  Boundary lock: {:>10} failed attempts",
3412            self.boundary_contention.load(Ordering::Relaxed)
3413        )
3414        .unwrap();
3415        writeln!(
3416            s,
3417            "  Group lock:    {:>10} failed attempts",
3418            self.group_contention.load(Ordering::Relaxed)
3419        )
3420        .unwrap();
3421        writeln!(
3422            s,
3423            "  Write lock:    {:>10} failed attempts",
3424            self.write_contention.load(Ordering::Relaxed)
3425        )
3426        .unwrap();
3427
3428        writeln!(s).unwrap();
3429        writeln!(s, "Queue Empty Polls:").unwrap();
3430        writeln!(s, "  Q1 (raw):        {:>10}", self.q1_empty.load(Ordering::Relaxed)).unwrap();
3431        writeln!(s, "  Q2 (decomp):     {:>10}", self.q2_empty.load(Ordering::Relaxed)).unwrap();
3432        writeln!(s, "  Q2b (boundary):  {:>10}", self.q2b_empty.load(Ordering::Relaxed)).unwrap();
3433        writeln!(s, "  Q3 (decoded):    {:>10}", self.q3_empty.load(Ordering::Relaxed)).unwrap();
3434        writeln!(s, "  Q4 (groups):     {:>10}", self.q4_empty.load(Ordering::Relaxed)).unwrap();
3435        writeln!(s, "  Q5 (processed):  {:>10}", self.q5_empty.load(Ordering::Relaxed)).unwrap();
3436        writeln!(s, "  Q6 (serialized): {:>10}", self.q6_empty.load(Ordering::Relaxed)).unwrap();
3437        writeln!(s, "  Q7 (compressed): {:>10}", self.q7_empty.load(Ordering::Relaxed)).unwrap();
3438
3439        writeln!(s).unwrap();
3440        writeln!(s, "Idle Yields: {:>10}", self.idle_yields.load(Ordering::Relaxed)).unwrap();
3441
3442        // NEW: Wait time for exclusive steps
3443        let boundary_wait = self.boundary_wait_ns.load(Ordering::Relaxed);
3444        let group_wait = self.group_wait_ns.load(Ordering::Relaxed);
3445        let write_wait = self.write_wait_ns.load(Ordering::Relaxed);
3446        if boundary_wait > 0 || group_wait > 0 || write_wait > 0 {
3447            writeln!(s).unwrap();
3448            writeln!(s, "Lock Wait Time:").unwrap();
3449            writeln!(s, "  Boundary lock: {:>10.1}ms", boundary_wait as f64 / 1_000_000.0).unwrap();
3450            writeln!(s, "  Group lock:    {:>10.1}ms", group_wait as f64 / 1_000_000.0).unwrap();
3451            writeln!(s, "  Write lock:    {:>10.1}ms", write_wait as f64 / 1_000_000.0).unwrap();
3452        }
3453
3454        // NEW: Batch size statistics
3455        let batch_count = self.batch_count.load(Ordering::Relaxed);
3456        if batch_count > 0 {
3457            let batch_sum = self.batch_size_sum.load(Ordering::Relaxed);
3458            let batch_min = self.batch_size_min.load(Ordering::Relaxed);
3459            let batch_max = self.batch_size_max.load(Ordering::Relaxed);
3460            let batch_avg = batch_sum as f64 / batch_count as f64;
3461
3462            writeln!(s).unwrap();
3463            writeln!(s, "Batch Size (records per batch):").unwrap();
3464            writeln!(s, "  Count:   {batch_count:>10}").unwrap();
3465            writeln!(s, "  Min:     {batch_min:>10}").unwrap();
3466            writeln!(s, "  Max:     {batch_max:>10}").unwrap();
3467            writeln!(s, "  Average: {batch_avg:>10.1}").unwrap();
3468        }
3469
3470        // NEW: Per-thread work distribution
3471        let num_threads = self.num_threads.load(Ordering::Relaxed) as usize;
3472        if num_threads > 0 {
3473            writeln!(s).unwrap();
3474            writeln!(s, "Per-Thread Work Distribution:").unwrap();
3475
3476            // Step names for the header
3477            let step_names = ["Rd", "Dc", "Fb", "De", "Gr", "Pr", "Se", "Co", "Wr"];
3478
3479            // Header
3480            write!(s, "  Thread ").unwrap();
3481            for name in &step_names {
3482                write!(s, " {name:>6}").unwrap();
3483            }
3484            writeln!(s, "    Idle ms").unwrap();
3485
3486            // Per-thread rows
3487            for tid in 0..num_threads.min(MAX_THREADS) {
3488                write!(s, "  T{tid:<5} ").unwrap();
3489                for step_idx in 0..NUM_STEPS {
3490                    let count = self.per_thread_step_counts[tid][step_idx].load(Ordering::Relaxed);
3491                    write!(s, " {count:>6}").unwrap();
3492                }
3493                let idle_ns = self.per_thread_idle_ns[tid].load(Ordering::Relaxed);
3494                writeln!(s, " {:>10.1}", idle_ns as f64 / 1_000_000.0).unwrap();
3495            }
3496
3497            // Total row
3498            write!(s, "  Total  ").unwrap();
3499            for step_idx in 0..NUM_STEPS {
3500                let mut total = 0u64;
3501                for tid in 0..num_threads.min(MAX_THREADS) {
3502                    total += self.per_thread_step_counts[tid][step_idx].load(Ordering::Relaxed);
3503                }
3504                write!(s, " {total:>6}").unwrap();
3505            }
3506            let total_idle: u64 = (0..num_threads.min(MAX_THREADS))
3507                .map(|tid| self.per_thread_idle_ns[tid].load(Ordering::Relaxed))
3508                .sum();
3509            writeln!(s, " {:>10.1}", total_idle as f64 / 1_000_000.0).unwrap();
3510
3511            // Per-thread attempt statistics (shows success rate)
3512            writeln!(s).unwrap();
3513            writeln!(s, "Per-Thread Attempt Success Rate:").unwrap();
3514
3515            // Header
3516            write!(s, "  Thread ").unwrap();
3517            for name in &step_names {
3518                write!(s, " {name:>6}").unwrap();
3519            }
3520            writeln!(s, "   Total%").unwrap();
3521
3522            // Per-thread rows with success rates
3523            for tid in 0..num_threads.min(MAX_THREADS) {
3524                write!(s, "  T{tid:<5} ").unwrap();
3525                let mut thread_attempts = 0u64;
3526                let mut thread_successes = 0u64;
3527                for step_idx in 0..NUM_STEPS {
3528                    let attempts =
3529                        self.per_thread_step_attempts[tid][step_idx].load(Ordering::Relaxed);
3530                    let successes =
3531                        self.per_thread_step_counts[tid][step_idx].load(Ordering::Relaxed);
3532                    thread_attempts += attempts;
3533                    thread_successes += successes;
3534                    if attempts == 0 {
3535                        write!(s, "   -  ").unwrap();
3536                    } else {
3537                        let rate = (successes as f64 / attempts as f64) * 100.0;
3538                        write!(s, " {rate:>5.0}%").unwrap();
3539                    }
3540                }
3541                if thread_attempts == 0 {
3542                    writeln!(s, "      -").unwrap();
3543                } else {
3544                    let total_rate = (thread_successes as f64 / thread_attempts as f64) * 100.0;
3545                    writeln!(s, "  {total_rate:>5.1}%").unwrap();
3546                }
3547            }
3548        }
3549
3550        // Thread utilization summary
3551        let total_work_ns = self.step_read_ns.load(Ordering::Relaxed)
3552            + self.step_decompress_ns.load(Ordering::Relaxed)
3553            + self.step_find_boundaries_ns.load(Ordering::Relaxed)
3554            + self.step_decode_ns.load(Ordering::Relaxed)
3555            + self.step_group_ns.load(Ordering::Relaxed)
3556            + self.step_process_ns.load(Ordering::Relaxed)
3557            + self.step_serialize_ns.load(Ordering::Relaxed)
3558            + self.step_compress_ns.load(Ordering::Relaxed)
3559            + self.step_write_ns.load(Ordering::Relaxed);
3560
3561        let total_idle_ns: u64 = (0..num_threads.min(MAX_THREADS))
3562            .map(|tid| self.per_thread_idle_ns[tid].load(Ordering::Relaxed))
3563            .sum();
3564
3565        let total_contention = self.read_contention.load(Ordering::Relaxed)
3566            + self.boundary_contention.load(Ordering::Relaxed)
3567            + self.group_contention.load(Ordering::Relaxed)
3568            + self.write_contention.load(Ordering::Relaxed);
3569
3570        if total_work_ns > 0 {
3571            writeln!(s).unwrap();
3572            writeln!(s, "Thread Utilization:").unwrap();
3573
3574            let work_ms = total_work_ns as f64 / 1_000_000.0;
3575            let idle_ms = total_idle_ns as f64 / 1_000_000.0;
3576            let total_thread_ms = work_ms + idle_ms;
3577
3578            if total_thread_ms > 0.0 {
3579                let utilization = (work_ms / total_thread_ms) * 100.0;
3580                writeln!(s, "  Work time:       {work_ms:>10.1}ms").unwrap();
3581                writeln!(s, "  Idle time:       {idle_ms:>10.1}ms").unwrap();
3582                writeln!(s, "  Utilization:     {utilization:>10.1}%").unwrap();
3583                writeln!(s, "  Contention attempts: {total_contention:>7}").unwrap();
3584            }
3585        }
3586
3587        // ========== Throughput Metrics ==========
3588        let bytes_read = self.bytes_read.load(Ordering::Relaxed);
3589        let bytes_written = self.bytes_written.load(Ordering::Relaxed);
3590        let compressed_bytes_in = self.compressed_bytes_in.load(Ordering::Relaxed);
3591        let decompressed_bytes = self.decompressed_bytes.load(Ordering::Relaxed);
3592        let serialized_bytes = self.serialized_bytes.load(Ordering::Relaxed);
3593        let compressed_bytes_out = self.compressed_bytes_out.load(Ordering::Relaxed);
3594        let records_decoded = self.records_decoded.load(Ordering::Relaxed);
3595        let groups_produced = self.groups_produced.load(Ordering::Relaxed);
3596
3597        // Only show throughput section if we have data
3598        if bytes_read > 0 || bytes_written > 0 {
3599            writeln!(s).unwrap();
3600            writeln!(s, "Throughput:").unwrap();
3601
3602            // Helper function to format bytes nicely
3603            let format_bytes = |bytes: u64| -> String {
3604                if bytes >= 1_000_000_000 {
3605                    format!("{:.2} GB", bytes as f64 / 1_000_000_000.0)
3606                } else if bytes >= 1_000_000 {
3607                    format!("{:.1} MB", bytes as f64 / 1_000_000.0)
3608                } else if bytes >= 1_000 {
3609                    format!("{:.1} KB", bytes as f64 / 1_000.0)
3610                } else {
3611                    format!("{bytes} B")
3612                }
3613            };
3614
3615            // Helper to format count with K/M suffix
3616            let format_count = |count: u64| -> String {
3617                if count >= 1_000_000 {
3618                    format!("{:.2}M", count as f64 / 1_000_000.0)
3619                } else if count >= 1_000 {
3620                    format!("{:.1}K", count as f64 / 1_000.0)
3621                } else {
3622                    format!("{count}")
3623                }
3624            };
3625
3626            // Read throughput
3627            let read_ns = self.step_read_ns.load(Ordering::Relaxed);
3628            if bytes_read > 0 && read_ns > 0 {
3629                let read_ms = read_ns as f64 / 1_000_000.0;
3630                let read_mb_s = (bytes_read as f64 / 1_000_000.0) / (read_ms / 1000.0);
3631                writeln!(
3632                    s,
3633                    "  Read:        {:>10} in {:>8.1}ms = {:>8.1} MB/s",
3634                    format_bytes(bytes_read),
3635                    read_ms,
3636                    read_mb_s
3637                )
3638                .unwrap();
3639            }
3640
3641            // Decompress throughput (input → output with expansion ratio)
3642            let decompress_ns = self.step_decompress_ns.load(Ordering::Relaxed);
3643            if compressed_bytes_in > 0 && decompressed_bytes > 0 && decompress_ns > 0 {
3644                let decompress_ms = decompress_ns as f64 / 1_000_000.0;
3645                let in_mb_s = (compressed_bytes_in as f64 / 1_000_000.0) / (decompress_ms / 1000.0);
3646                let out_mb_s = (decompressed_bytes as f64 / 1_000_000.0) / (decompress_ms / 1000.0);
3647                let expansion = decompressed_bytes as f64 / compressed_bytes_in as f64;
3648                writeln!(
3649                    s,
3650                    "  Decompress:  {:>10} → {:>10} = {:>6.1} → {:>6.1} MB/s ({:.2}x expansion)",
3651                    format_bytes(compressed_bytes_in),
3652                    format_bytes(decompressed_bytes),
3653                    in_mb_s,
3654                    out_mb_s,
3655                    expansion
3656                )
3657                .unwrap();
3658            }
3659
3660            // Decode throughput (records per second)
3661            let decode_ns = self.step_decode_ns.load(Ordering::Relaxed);
3662            if records_decoded > 0 && decode_ns > 0 {
3663                let decode_ms = decode_ns as f64 / 1_000_000.0;
3664                let records_per_s = records_decoded as f64 / (decode_ms / 1000.0);
3665                writeln!(
3666                    s,
3667                    "  Decode:      {:>10} records     = {:>8} records/s",
3668                    format_count(records_decoded),
3669                    format_count(records_per_s as u64)
3670                )
3671                .unwrap();
3672            }
3673
3674            // Group throughput (records in → groups out)
3675            let group_ns = self.step_group_ns.load(Ordering::Relaxed);
3676            if records_decoded > 0 && groups_produced > 0 && group_ns > 0 {
3677                let group_ms = group_ns as f64 / 1_000_000.0;
3678                let records_in_per_s = records_decoded as f64 / (group_ms / 1000.0);
3679                let groups_out_per_s = groups_produced as f64 / (group_ms / 1000.0);
3680                writeln!(
3681                    s,
3682                    "  Group:       {:>10} → {:>10} = {:>6} records/s in, {:>6} groups/s out",
3683                    format_count(records_decoded),
3684                    format_count(groups_produced),
3685                    format_count(records_in_per_s as u64),
3686                    format_count(groups_out_per_s as u64)
3687                )
3688                .unwrap();
3689            }
3690
3691            // Process throughput (groups per second)
3692            let process_ns = self.step_process_ns.load(Ordering::Relaxed);
3693            if groups_produced > 0 && process_ns > 0 {
3694                let process_ms = process_ns as f64 / 1_000_000.0;
3695                let groups_per_s = groups_produced as f64 / (process_ms / 1000.0);
3696                writeln!(
3697                    s,
3698                    "  Process:     {:>10} groups      = {:>8} groups/s",
3699                    format_count(groups_produced),
3700                    format_count(groups_per_s as u64)
3701                )
3702                .unwrap();
3703            }
3704
3705            // Serialize throughput
3706            let serialize_ns = self.step_serialize_ns.load(Ordering::Relaxed);
3707            if serialized_bytes > 0 && serialize_ns > 0 {
3708                let serialize_ms = serialize_ns as f64 / 1_000_000.0;
3709                let mb_per_s = (serialized_bytes as f64 / 1_000_000.0) / (serialize_ms / 1000.0);
3710                writeln!(
3711                    s,
3712                    "  Serialize:   {:>10}             = {:>8.1} MB/s",
3713                    format_bytes(serialized_bytes),
3714                    mb_per_s
3715                )
3716                .unwrap();
3717            }
3718
3719            // Compress throughput (input → output with compression ratio)
3720            let compress_ns = self.step_compress_ns.load(Ordering::Relaxed);
3721            if serialized_bytes > 0 && compressed_bytes_out > 0 && compress_ns > 0 {
3722                let compress_ms = compress_ns as f64 / 1_000_000.0;
3723                let in_mb_s = (serialized_bytes as f64 / 1_000_000.0) / (compress_ms / 1000.0);
3724                let out_mb_s = (compressed_bytes_out as f64 / 1_000_000.0) / (compress_ms / 1000.0);
3725                let compression = serialized_bytes as f64 / compressed_bytes_out as f64;
3726                writeln!(
3727                    s,
3728                    "  Compress:    {:>10} → {:>10} = {:>6.1} → {:>6.1} MB/s ({:.2}x compression)",
3729                    format_bytes(serialized_bytes),
3730                    format_bytes(compressed_bytes_out),
3731                    in_mb_s,
3732                    out_mb_s,
3733                    compression
3734                )
3735                .unwrap();
3736            }
3737
3738            // Write throughput
3739            let write_ns = self.step_write_ns.load(Ordering::Relaxed);
3740            if bytes_written > 0 && write_ns > 0 {
3741                let write_ms = write_ns as f64 / 1_000_000.0;
3742                let write_mb_s = (bytes_written as f64 / 1_000_000.0) / (write_ms / 1000.0);
3743                writeln!(
3744                    s,
3745                    "  Write:       {:>10} in {:>8.1}ms = {:>8.1} MB/s",
3746                    format_bytes(bytes_written),
3747                    write_ms,
3748                    write_mb_s
3749                )
3750                .unwrap();
3751            }
3752        }
3753
3754        // Queue sample summary if we have any
3755        let samples = self.queue_samples.lock();
3756        if !samples.is_empty() {
3757            writeln!(s).unwrap();
3758            writeln!(s, "Queue Size Timeline ({} samples at ~100ms intervals):", samples.len())
3759                .unwrap();
3760            writeln!(
3761                s,
3762                "  Time   Q1   Q2  Q2b   Q3   Q4   Q5   Q6   Q7 | R2  R3  R7 |  R3_MB  Threads"
3763            )
3764            .unwrap();
3765
3766            // Show all samples
3767            for sample in samples.iter() {
3768                let r3_mb = sample.reorder_memory_bytes[1] as f64 / 1_048_576.0;
3769                write!(
3770                    s,
3771                    "  {:>4}  {:>3}  {:>3}  {:>3}  {:>3}  {:>3}  {:>3}  {:>3}  {:>3} | {:>3} {:>3} {:>3} | {:>6.1}  ",
3772                    sample.time_ms,
3773                    sample.queue_sizes[0],
3774                    sample.queue_sizes[1],
3775                    sample.queue_sizes[2],
3776                    sample.queue_sizes[3],
3777                    sample.queue_sizes[4],
3778                    sample.queue_sizes[5],
3779                    sample.queue_sizes[6],
3780                    sample.queue_sizes[7],
3781                    sample.reorder_sizes[0],
3782                    sample.reorder_sizes[1],
3783                    sample.reorder_sizes[2],
3784                    r3_mb,
3785                )
3786                .unwrap();
3787                // Show thread activity as compact string
3788                for &step_idx in &sample.thread_steps {
3789                    if step_idx < NUM_STEPS as u8 {
3790                        let short = match step_idx {
3791                            0 => "R",
3792                            1 => "D",
3793                            2 => "F",
3794                            3 => "d",
3795                            4 => "G",
3796                            5 => "P",
3797                            6 => "S",
3798                            7 => "C",
3799                            8 => "W",
3800                            _ => "?",
3801                        };
3802                        write!(s, "{short}").unwrap();
3803                    } else {
3804                        write!(s, ".").unwrap();
3805                    }
3806                }
3807                writeln!(s).unwrap();
3808            }
3809
3810            // Summary of peak reorder buffer usage
3811            let peak_r3_items = samples.iter().map(|s| s.reorder_sizes[1]).max().unwrap_or(0);
3812            let peak_r3_bytes =
3813                samples.iter().map(|s| s.reorder_memory_bytes[1]).max().unwrap_or(0);
3814            let peak_r3_mb = peak_r3_bytes as f64 / 1_048_576.0;
3815            writeln!(s).unwrap();
3816            writeln!(s, "Peak Q3 Reorder Buffer: {peak_r3_items} items, {peak_r3_mb:.1} MB")
3817                .unwrap();
3818        }
3819
3820        // Memory limiting statistics
3821        let group_rejects = self.group_memory_rejects.load(Ordering::Relaxed);
3822        let peak_memory = self.peak_memory_bytes.load(Ordering::Relaxed);
3823
3824        if group_rejects > 0 || peak_memory > 0 {
3825            writeln!(s).unwrap();
3826            writeln!(s, "Memory Limiting:").unwrap();
3827            if group_rejects > 0 {
3828                writeln!(s, "  Group rejects (memory): {group_rejects:>10}").unwrap();
3829            }
3830            if peak_memory > 0 {
3831                let peak_mb = peak_memory as f64 / 1_048_576.0;
3832                writeln!(s, "  Peak memory usage:      {peak_mb:>10.1} MB").unwrap();
3833            }
3834        }
3835
3836        s
3837    }
3838
3839    /// Log statistics using the log crate at info level.
3840    pub fn log_summary(&self) {
3841        for line in self.format_summary().lines() {
3842            info!("{line}");
3843        }
3844    }
3845}
3846
3847// ============================================================================
3848// Comprehensive Memory Tracking Methods (behind memory-debug feature)
3849// ============================================================================
3850
3851#[cfg(feature = "memory-debug")]
3852impl PipelineStats {
3853    /// Track queue memory addition (queue memory is separate from processing memory)
3854    pub fn track_queue_memory_add(&self, queue_name: &str, size: usize) {
3855        let m = &self.memory;
3856        let size_u64 = size as u64;
3857        match queue_name {
3858            "q1" => m.q1_memory_bytes.fetch_add(size_u64, Ordering::Relaxed),
3859            "q2" => m.q2_memory_bytes.fetch_add(size_u64, Ordering::Relaxed),
3860            "q3" => m.q3_memory_bytes.fetch_add(size_u64, Ordering::Relaxed),
3861            "q4" => m.q4_memory_bytes.fetch_add(size_u64, Ordering::Relaxed),
3862            "q5" => m.q5_memory_bytes.fetch_add(size_u64, Ordering::Relaxed),
3863            "q6" => m.q6_memory_bytes.fetch_add(size_u64, Ordering::Relaxed),
3864            "q7" => m.q7_memory_bytes.fetch_add(size_u64, Ordering::Relaxed),
3865            _ => 0,
3866        };
3867    }
3868
3869    /// Track queue memory removal (queue memory is separate from processing memory).
3870    /// Uses saturating subtraction to prevent u64 underflow from estimation mismatches.
3871    pub fn track_queue_memory_remove(&self, queue_name: &str, size: usize) {
3872        let m = &self.memory;
3873        let counter = match queue_name {
3874            "q1" => &m.q1_memory_bytes,
3875            "q2" => &m.q2_memory_bytes,
3876            "q3" => &m.q3_memory_bytes,
3877            "q4" => &m.q4_memory_bytes,
3878            "q5" => &m.q5_memory_bytes,
3879            "q6" => &m.q6_memory_bytes,
3880            "q7" => &m.q7_memory_bytes,
3881            _ => return,
3882        };
3883        let size_u64 = size as u64;
3884        let mut current = counter.load(Ordering::Relaxed);
3885        loop {
3886            let new_val = current.saturating_sub(size_u64);
3887            match counter.compare_exchange_weak(
3888                current,
3889                new_val,
3890                Ordering::Relaxed,
3891                Ordering::Relaxed,
3892            ) {
3893                Ok(_) => break,
3894                Err(actual) => current = actual,
3895            }
3896        }
3897    }
3898
3899    /// Track position group processing memory (separate from queue memory).
3900    /// Uses saturating subtraction to prevent u64 underflow from estimation mismatches.
3901    pub fn track_position_group_memory(&self, size: usize, is_allocation: bool) {
3902        let counter = &self.memory.position_group_processing_bytes;
3903        let size_u64 = size as u64;
3904        if is_allocation {
3905            counter.fetch_add(size_u64, Ordering::Relaxed);
3906        } else {
3907            let mut current = counter.load(Ordering::Relaxed);
3908            loop {
3909                let new_val = current.saturating_sub(size_u64);
3910                match counter.compare_exchange_weak(
3911                    current,
3912                    new_val,
3913                    Ordering::Relaxed,
3914                    Ordering::Relaxed,
3915                ) {
3916                    Ok(_) => break,
3917                    Err(actual) => current = actual,
3918                }
3919            }
3920        }
3921    }
3922
3923    /// Track template processing memory (separate from queue memory).
3924    /// Uses saturating subtraction to prevent u64 underflow from estimation mismatches.
3925    pub fn track_template_memory(&self, size: usize, is_allocation: bool) {
3926        let counter = &self.memory.template_processing_bytes;
3927        let size_u64 = size as u64;
3928        if is_allocation {
3929            counter.fetch_add(size_u64, Ordering::Relaxed);
3930        } else {
3931            let mut current = counter.load(Ordering::Relaxed);
3932            loop {
3933                let new_val = current.saturating_sub(size_u64);
3934                match counter.compare_exchange_weak(
3935                    current,
3936                    new_val,
3937                    Ordering::Relaxed,
3938                    Ordering::Relaxed,
3939                ) {
3940                    Ok(_) => break,
3941                    Err(actual) => current = actual,
3942                }
3943            }
3944        }
3945    }
3946
3947    /// Update system RSS.
3948    pub fn update_system_rss(&self, rss_bytes: u64) {
3949        self.memory.system_rss_bytes.store(rss_bytes, Ordering::Relaxed);
3950    }
3951
3952    /// Set infrastructure memory estimates (call once at pipeline startup).
3953    pub fn set_infrastructure_memory(&self, num_threads: usize, queue_capacity: usize) {
3954        let m = &self.memory;
3955        m.decompressor_memory_bytes.store(num_threads as u64 * 32 * 1024, Ordering::Relaxed);
3956        m.compressor_memory_bytes.store(num_threads as u64 * 280 * 1024, Ordering::Relaxed);
3957        m.worker_buffer_memory_bytes.store(num_threads as u64 * 512 * 1024, Ordering::Relaxed);
3958        m.io_buffer_memory_bytes.store(16u64 * 1024 * 1024, Ordering::Relaxed);
3959        m.thread_stack_memory_bytes
3960            .store((num_threads as u64 + 1) * 2 * 1024 * 1024, Ordering::Relaxed);
3961        m.queue_capacity_memory_bytes.store(7u64 * queue_capacity as u64 * 128, Ordering::Relaxed);
3962    }
3963
3964    /// Update queue memory stats from actual queues (call periodically during monitoring)
3965    pub fn update_queue_memory_from_external(&self, queue_stats: &[(&str, u64)]) {
3966        let m = &self.memory;
3967        for (queue_name, current_bytes) in queue_stats {
3968            match *queue_name {
3969                "q1" => m.q1_memory_bytes.store(*current_bytes, Ordering::Relaxed),
3970                "q2" => m.q2_memory_bytes.store(*current_bytes, Ordering::Relaxed),
3971                "q3" => m.q3_memory_bytes.store(*current_bytes, Ordering::Relaxed),
3972                "q4" => m.q4_memory_bytes.store(*current_bytes, Ordering::Relaxed),
3973                "q5" => m.q5_memory_bytes.store(*current_bytes, Ordering::Relaxed),
3974                "q6" => m.q6_memory_bytes.store(*current_bytes, Ordering::Relaxed),
3975                "q7" => m.q7_memory_bytes.store(*current_bytes, Ordering::Relaxed),
3976                _ => {}
3977            }
3978        }
3979    }
3980
3981    /// Get current memory breakdown
3982    pub fn get_memory_breakdown(&self) -> MemoryBreakdown {
3983        let m = &self.memory;
3984
3985        // Load each counter once to avoid divergence under contention
3986        let q1 = m.q1_memory_bytes.load(Ordering::Relaxed);
3987        let q2 = m.q2_memory_bytes.load(Ordering::Relaxed);
3988        let q3 = m.q3_memory_bytes.load(Ordering::Relaxed);
3989        let q4 = m.q4_memory_bytes.load(Ordering::Relaxed);
3990        let q5 = m.q5_memory_bytes.load(Ordering::Relaxed);
3991        let q6 = m.q6_memory_bytes.load(Ordering::Relaxed);
3992        let q7 = m.q7_memory_bytes.load(Ordering::Relaxed);
3993        let queue_total = q1 + q2 + q3 + q4 + q5 + q6 + q7;
3994
3995        let pos_groups = m.position_group_processing_bytes.load(Ordering::Relaxed);
3996        let templates = m.template_processing_bytes.load(Ordering::Relaxed);
3997        let reorder = m.reorder_buffer_bytes.load(Ordering::Relaxed);
3998        let grouper = m.grouper_memory_bytes.load(Ordering::Relaxed);
3999        let worker_local = m.worker_local_memory_bytes.load(Ordering::Relaxed);
4000        let processing_total = pos_groups + templates + reorder + grouper + worker_local;
4001
4002        let infra_decompressors = m.decompressor_memory_bytes.load(Ordering::Relaxed);
4003        let infra_compressors = m.compressor_memory_bytes.load(Ordering::Relaxed);
4004        let infra_buffers = m.worker_buffer_memory_bytes.load(Ordering::Relaxed);
4005        let infra_io = m.io_buffer_memory_bytes.load(Ordering::Relaxed);
4006        let infra_stacks = m.thread_stack_memory_bytes.load(Ordering::Relaxed);
4007        let infra_queues = m.queue_capacity_memory_bytes.load(Ordering::Relaxed);
4008        let infra_total = infra_decompressors
4009            + infra_compressors
4010            + infra_buffers
4011            + infra_io
4012            + infra_stacks
4013            + infra_queues;
4014
4015        let tracked_total = queue_total + processing_total + infra_total;
4016        let system_rss = m.system_rss_bytes.load(Ordering::Relaxed);
4017        let untracked = system_rss.saturating_sub(tracked_total);
4018
4019        MemoryBreakdown {
4020            system_rss_gb: system_rss as f64 / 1e9,
4021            tracked_total_gb: tracked_total as f64 / 1e9,
4022            untracked_gb: untracked as f64 / 1e9,
4023
4024            q1_mb: q1 as f64 / 1e6,
4025            q2_mb: q2 as f64 / 1e6,
4026            q3_mb: q3 as f64 / 1e6,
4027            q4_gb: q4 as f64 / 1e9,
4028            q5_gb: q5 as f64 / 1e9,
4029            q6_mb: q6 as f64 / 1e6,
4030            q7_mb: q7 as f64 / 1e6,
4031
4032            position_groups_gb: pos_groups as f64 / 1e9,
4033            templates_gb: templates as f64 / 1e9,
4034            reorder_buffers_mb: reorder as f64 / 1e6,
4035            grouper_mb: grouper as f64 / 1e6,
4036            worker_local_mb: worker_local as f64 / 1e6,
4037
4038            decompressors_mb: infra_decompressors as f64 / 1e6,
4039            compressors_mb: infra_compressors as f64 / 1e6,
4040            worker_buffers_mb: infra_buffers as f64 / 1e6,
4041            io_buffers_mb: infra_io as f64 / 1e6,
4042            thread_stacks_mb: infra_stacks as f64 / 1e6,
4043            queue_capacity_mb: infra_queues as f64 / 1e6,
4044            infrastructure_gb: infra_total as f64 / 1e9,
4045        }
4046    }
4047}
4048
4049// ============================================================================
4050// Shared Traits for Steps 4-7 (used by both BAM and FASTQ pipelines)
4051// ============================================================================
4052
4053/// Trait for accessing shared pipeline state needed by steps 5-7.
4054///
4055/// This allows the serialize, compress, and write steps to be generic over
4056/// both BAM and FASTQ pipelines, reducing code duplication.
4057pub trait OutputPipelineState: Send + Sync {
4058    /// The type of processed items (output of process step).
4059    type Processed: Send;
4060
4061    // Error handling
4062    fn has_error(&self) -> bool;
4063    fn set_error(&self, error: io::Error);
4064
4065    // Queue 5 access (Serialize → Compress)
4066    fn q5_pop(&self) -> Option<(u64, SerializedBatch)>;
4067    /// # Errors
4068    ///
4069    /// Returns the item if the queue is full.
4070    fn q5_push(&self, item: (u64, SerializedBatch)) -> Result<(), (u64, SerializedBatch)>;
4071    fn q5_is_full(&self) -> bool;
4072    /// Track memory released when popping from Q5.
4073    fn q5_track_pop(&self, _heap_size: u64) {}
4074
4075    // Queue 6 access (Compress → Write)
4076    fn q6_pop(&self) -> Option<(u64, CompressedBlockBatch)>;
4077    /// # Errors
4078    ///
4079    /// Returns the item if the queue is full.
4080    fn q6_push(&self, item: (u64, CompressedBlockBatch))
4081    -> Result<(), (u64, CompressedBlockBatch)>;
4082    fn q6_is_full(&self) -> bool;
4083    /// Track memory released when popping from Q6.
4084    fn q6_track_pop(&self, _heap_size: u64) {}
4085
4086    // Reorder buffer for Q6
4087    fn q6_reorder_insert(&self, serial: u64, batch: CompressedBlockBatch);
4088    fn q6_reorder_try_pop_next(&self) -> Option<CompressedBlockBatch>;
4089
4090    // Output access
4091    fn output_try_lock(&self)
4092    -> Option<parking_lot::MutexGuard<'_, Option<Box<dyn Write + Send>>>>;
4093
4094    // Completion tracking
4095    fn increment_written(&self) -> u64;
4096
4097    // Throughput metrics (optional, with default no-op implementation)
4098    /// Record compressed bytes output from compress step.
4099    fn record_compressed_bytes_out(&self, _bytes: u64) {}
4100
4101    // Deadlock detection progress tracking (optional, with default no-op)
4102    /// Record progress when popping from serialized queue (Q6).
4103    fn record_q6_pop_progress(&self) {}
4104    /// Record progress when pushing to compressed queue (Q7).
4105    fn record_q7_push_progress(&self) {}
4106
4107    // Stats access (optional, with default no-op implementation)
4108    /// Get reference to pipeline stats for recording metrics.
4109    fn stats(&self) -> Option<&PipelineStats> {
4110        None
4111    }
4112}
4113
4114/// Trait for types that have a BGZF compressor.
4115pub trait HasCompressor {
4116    fn compressor_mut(&mut self) -> &mut InlineBgzfCompressor;
4117}
4118
/// Buffer recycling for worker states.
///
/// The Serialize and Compress steps use this to reuse `Vec<u8>` buffers rather than
/// allocating a fresh one for every batch: Compress hands back the data buffer of each
/// `SerializedBatch` it has consumed, and Serialize takes a recycled buffer instead of
/// calling `Vec::with_capacity`.
pub trait HasRecycledBuffers {
    /// Hand out a recycled buffer, or allocate a new one with the given capacity when none
    /// is available.
    fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8>;
    /// Give a buffer back for later reuse. At most 2 buffers are kept.
    fn recycle_buffer(&mut self, buf: Vec<u8>);
}
4131
/// Common behaviour for worker states that can carry items across loop iterations.
///
/// A worker that fails to push into a full queue does not block: it "holds" the item and
/// returns. On its next iteration it first tries to advance the held item before taking on
/// new work.
///
/// **CRITICAL**: a worker loop MUST consult `has_any_held_items()` before exiting. Any item
/// still held when the worker exits is lost. The correct exit condition is:
///
/// ```ignore
/// if state.is_complete() && !worker.has_any_held_items() {
///     break;
/// }
/// ```
///
/// Sharing this trait keeps the exit logic identical in the BAM and FASTQ pipelines, which
/// prevents the race where held data is dropped on exit.
pub trait WorkerStateCommon {
    /// Report whether the worker still holds any item that has not been pushed.
    ///
    /// Returns `true` when any `held_*` field contains data. Worker loops use this to avoid
    /// exiting early, since exiting while holding data would lose it.
    fn has_any_held_items(&self) -> bool;

    /// Drop every held item (for error recovery or cleanup).
    ///
    /// **WARNING**: the data is discarded. Call this only when the pipeline is already in an
    /// error state and losing the data is acceptable.
    fn clear_held_items(&mut self);
}
4164
4165/// Trait for workers that have a `WorkerCoreState`.
4166///
4167/// This enables shared worker loop functions to access scheduler and backoff state.
4168pub trait HasWorkerCore {
4169    fn core(&self) -> &WorkerCoreState;
4170    fn core_mut(&mut self) -> &mut WorkerCoreState;
4171}
4172
4173/// Handle worker backoff with optional idle time tracking.
4174///
4175/// This consolidates the common backoff pattern used in both BAM and FASTQ worker loops:
4176/// - If work was done, reset backoff
4177/// - If no work, mark thread as idle, sleep and record idle time (if stats enabled), then increase backoff
4178#[inline]
4179#[allow(clippy::cast_possible_truncation)]
4180pub fn handle_worker_backoff<W: HasWorkerCore>(
4181    worker: &mut W,
4182    stats: Option<&PipelineStats>,
4183    did_work: bool,
4184) {
4185    if did_work {
4186        worker.core_mut().reset_backoff();
4187    } else {
4188        if let Some(stats) = stats {
4189            let tid = worker.core().scheduler.thread_id();
4190            // Mark thread as idle before sleeping so activity snapshots are accurate
4191            stats.clear_current_step(tid);
4192            let idle_start = Instant::now();
4193            worker.core_mut().sleep_backoff();
4194            stats.record_idle_for_thread(tid, idle_start.elapsed().as_nanos() as u64);
4195        } else {
4196            worker.core_mut().sleep_backoff();
4197        }
4198        worker.core_mut().increase_backoff();
4199    }
4200}
4201
4202// ============================================================================
4203// Generic Worker Loop (Consolidated Implementation)
4204// ============================================================================
4205
4206/// Trait for pipeline step execution context.
4207///
4208/// This trait abstracts over the differences between BAM and FASTQ pipelines,
4209/// allowing a single `generic_worker_loop` implementation to handle both.
4210/// Each pipeline provides a context struct that implements this trait.
4211pub trait StepContext {
4212    /// The worker type for this pipeline (e.g., `WorkerState<P>` or `FastqWorkerState<P>`)
4213    type Worker: WorkerStateCommon + HasWorkerCore;
4214
4215    /// Execute a pipeline step, returning `(success, was_contention)`.
4216    fn execute_step(&self, worker: &mut Self::Worker, step: PipelineStep) -> (bool, bool);
4217
4218    /// Compute backpressure state for the scheduler.
4219    fn get_backpressure(&self, worker: &Self::Worker) -> BackpressureState;
4220
4221    /// Check and set drain mode if appropriate (called each iteration).
4222    fn check_drain_mode(&self);
4223
4224    /// Check if the pipeline has encountered an error.
4225    fn has_error(&self) -> bool;
4226
4227    /// Check if the pipeline is complete.
4228    fn is_complete(&self) -> bool;
4229
4230    /// Get stats for recording (if enabled).
4231    fn stats(&self) -> Option<&PipelineStats>;
4232
4233    /// Whether this context should skip the Read step.
4234    /// Returns `true` for non-reader worker threads.
4235    fn skip_read(&self) -> bool;
4236
4237    /// Whether to check completion at end of loop iteration (original BAM behavior).
4238    /// If false (default), checks at start (original FASTQ behavior).
4239    fn check_completion_at_end(&self) -> bool {
4240        false
4241    }
4242
4243    // -------------------------------------------------------------------------
4244    // Sticky Read Methods (consolidated)
4245    // -------------------------------------------------------------------------
4246
4247    /// Fast check before entering sticky read loop.
4248    /// BAM uses this to skip the loop entirely when `read_done` is true.
4249    /// Default returns false (no sticky read).
4250    fn should_attempt_sticky_read(&self) -> bool {
4251        false
4252    }
4253
4254    /// Condition for continuing the sticky read loop.
4255    /// Called before each read attempt.
4256    /// - BAM: `!error && !read_done && q1.len() < capacity`
4257    /// - FASTQ: `true` (relies on `execute_read_step` returning false)
4258    fn sticky_read_should_continue(&self) -> bool {
4259        false
4260    }
4261
4262    /// Execute the read step. Returns true if read succeeded.
4263    fn execute_read_step(&self, _worker: &mut Self::Worker) -> bool {
4264        false
4265    }
4266
4267    /// Get drain mode flag for exclusive step relaxation.
4268    /// Default is false.
4269    fn is_drain_mode(&self) -> bool {
4270        false
4271    }
4272
4273    /// Check if worker should attempt an exclusive step.
4274    /// Default implementation always returns true (no ownership checks).
4275    fn should_attempt_step(
4276        &self,
4277        _worker: &Self::Worker,
4278        _step: PipelineStep,
4279        _drain_mode: bool,
4280    ) -> bool {
4281        true
4282    }
4283
4284    /// Get the exclusive step owned by this worker (if any).
4285    /// Used by BAM to prioritize owned steps before the normal priority loop.
4286    /// Default returns None (no exclusive step ownership).
4287    fn exclusive_step_owned(&self, _worker: &Self::Worker) -> Option<PipelineStep> {
4288        None
4289    }
4290}
4291
4292/// Generic worker loop implementation used by both BAM and FASTQ pipelines.
4293///
4294/// This consolidates the duplicated worker loop logic into a single function.
4295/// The `StepContext` trait provides pipeline-specific behavior.
4296#[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
4297pub fn generic_worker_loop<C: StepContext>(ctx: &C, worker: &mut C::Worker) {
4298    let collect_stats = ctx.stats().is_some();
4299    let check_completion_at_end = ctx.check_completion_at_end();
4300
4301    loop {
4302        // Check for errors
4303        if ctx.has_error() {
4304            break;
4305        }
4306
4307        // Check for completion at start (FASTQ behavior, default)
4308        // CRITICAL: Don't exit while holding items - they would be lost!
4309        if !check_completion_at_end && ctx.is_complete() && !worker.has_any_held_items() {
4310            break;
4311        }
4312
4313        let mut did_work = false;
4314
4315        // Sticky read for reader threads (fills Q1 before doing other work)
4316        // Uses outer guard to skip when not applicable (BAM optimization)
4317        if ctx.should_attempt_sticky_read() {
4318            while ctx.sticky_read_should_continue() {
4319                // Record attempt before executing
4320                if let Some(stats) = ctx.stats() {
4321                    stats.record_step_attempt(
4322                        worker.core().scheduler.thread_id(),
4323                        PipelineStep::Read,
4324                    );
4325                }
4326
4327                let success = if collect_stats {
4328                    let start = Instant::now();
4329                    let success = ctx.execute_read_step(worker);
4330                    if success {
4331                        if let Some(stats) = ctx.stats() {
4332                            stats.record_step_for_thread(
4333                                PipelineStep::Read,
4334                                start.elapsed().as_nanos() as u64,
4335                                Some(worker.core().scheduler.thread_id()),
4336                            );
4337                        }
4338                    }
4339                    success
4340                } else {
4341                    ctx.execute_read_step(worker)
4342                };
4343
4344                if success {
4345                    did_work = true;
4346                } else {
4347                    break;
4348                }
4349            }
4350        }
4351
4352        // Check/set drain mode
4353        ctx.check_drain_mode();
4354
4355        // Get step priorities from scheduler
4356        let backpressure = ctx.get_backpressure(worker);
4357        let priorities_slice = worker.core_mut().scheduler.get_priorities(backpressure);
4358        let priority_count = priorities_slice.len().min(9);
4359        let mut priorities = [PipelineStep::Read; 9];
4360        priorities[..priority_count].copy_from_slice(&priorities_slice[..priority_count]);
4361
4362        let drain_mode = ctx.is_drain_mode();
4363
4364        // BAM optimization: try owned exclusive step first (before priority loop)
4365        // This prevents starvation: since only this thread can do the step, prioritize it
4366        let owned_step = ctx.exclusive_step_owned(worker);
4367        if let Some(step) = owned_step {
4368            if step != PipelineStep::Read && !ctx.has_error() {
4369                // Record attempt
4370                if let Some(stats) = ctx.stats() {
4371                    stats.record_step_attempt(worker.core().scheduler.thread_id(), step);
4372                }
4373
4374                let (success, elapsed_ns, was_contention) = if collect_stats {
4375                    let start = Instant::now();
4376                    let (success, was_contention) = ctx.execute_step(worker, step);
4377                    (success, start.elapsed().as_nanos() as u64, was_contention)
4378                } else {
4379                    let (success, was_contention) = ctx.execute_step(worker, step);
4380                    (success, 0, was_contention)
4381                };
4382
4383                worker.core_mut().scheduler.record_outcome(step, success, was_contention);
4384
4385                if success {
4386                    if let Some(stats) = ctx.stats() {
4387                        stats.record_step_for_thread(
4388                            step,
4389                            elapsed_ns,
4390                            Some(worker.core().scheduler.thread_id()),
4391                        );
4392                    }
4393                    did_work = true;
4394                }
4395            }
4396        }
4397
4398        // Execute steps in priority order (if owned step didn't succeed)
4399        if !did_work {
4400            for &step in &priorities[..priority_count] {
4401                if ctx.has_error() {
4402                    break;
4403                }
4404
4405                // Skip Read step for non-reader workers
4406                if ctx.skip_read() && step == PipelineStep::Read {
4407                    continue;
4408                }
4409
4410                // Skip the owned step (already tried above)
4411                if Some(step) == owned_step {
4412                    continue;
4413                }
4414
4415                // Skip exclusive steps this worker doesn't own (BAM optimization)
4416                if !ctx.should_attempt_step(worker, step, drain_mode) {
4417                    continue;
4418                }
4419
4420                // Record attempt
4421                if let Some(stats) = ctx.stats() {
4422                    stats.record_step_attempt(worker.core().scheduler.thread_id(), step);
4423                }
4424
4425                // Execute with timing
4426                let (success, elapsed_ns, was_contention) = if collect_stats {
4427                    let start = Instant::now();
4428                    let (success, was_contention) = ctx.execute_step(worker, step);
4429                    (success, start.elapsed().as_nanos() as u64, was_contention)
4430                } else {
4431                    let (success, was_contention) = ctx.execute_step(worker, step);
4432                    (success, 0, was_contention)
4433                };
4434
4435                // Record outcome for adaptive scheduler
4436                worker.core_mut().scheduler.record_outcome(step, success, was_contention);
4437
4438                if success {
4439                    if let Some(stats) = ctx.stats() {
4440                        stats.record_step_for_thread(
4441                            step,
4442                            elapsed_ns,
4443                            Some(worker.core().scheduler.thread_id()),
4444                        );
4445                    }
4446                    did_work = true;
4447                    break; // Restart priority evaluation
4448                }
4449            }
4450        }
4451
4452        // Check for completion at end (original BAM behavior)
4453        // CRITICAL: Don't exit while holding items - they would be lost!
4454        if check_completion_at_end && ctx.is_complete() && !worker.has_any_held_items() {
4455            break;
4456        }
4457
4458        // Backoff handling
4459        handle_worker_backoff(worker, ctx.stats(), did_work);
4460    }
4461}
4462
4463/// Trait for workers that can hold compressed batches when output queue is full.
4464///
4465/// This trait enables non-blocking compress steps. When the output queue is full,
4466/// instead of blocking, the worker holds the compressed batch and returns
4467/// `StepResult::OutputFull`. The next time the compress step runs, it first
4468/// tries to advance the held batch before processing new work.
4469pub trait HasHeldCompressed {
4470    /// Get mutable reference to the held compressed batch.
4471    /// Includes `heap_size` for memory tracking.
4472    fn held_compressed_mut(&mut self) -> &mut Option<(u64, CompressedBlockBatch, usize)>;
4473}
4474
/// Implemented by workers that can park a boundary batch while the output queue is full.
///
/// IMPORTANT: the pattern below must stay in sync between the BAM and FASTQ pipelines.
/// See: bam.rs `try_step_find_boundaries()` and fastq.rs `fastq_try_step_find_boundaries()`
///
/// The pattern:
/// 1. Check/advance the held item first (priority 1)
/// 2. Acquire the ordering lock (BAM: `boundary_state`, FASTQ: `boundary_lock`)
/// 3. Take a brief lock for the reorder buffer insert/pop
/// 4. Do the boundary work (under the ordering lock, so processing stays sequential)
/// 5. Push the result, or hold it if the output queue is full
///
/// Note: FASTQ needs strict ordering because of its per-stream leftover state, so it uses a
/// separate `boundary_lock`. BAM uses `boundary_state`, which is both the ordering lock and
/// the container of the boundary-finding state.
pub trait HasHeldBoundaries<B> {
    /// Mutable access to the slot for the held `(serial, batch)` pair.
    fn held_boundaries_mut(&mut self) -> &mut Option<(u64, B)>;
}
4493
4494// ============================================================================
4495// Shared Step Functions (used by both BAM and FASTQ pipelines)
4496// ============================================================================
4497
4498/// Shared Step: Compress serialized data to BGZF blocks (non-blocking).
4499///
4500/// This step is parallel - multiple threads can compress concurrently.
4501/// It takes data from Q5, compresses it using the worker's compressor,
4502/// and pushes the resulting blocks to Q6.
4503///
4504/// # Non-Blocking Behavior
4505///
4506/// Instead of blocking when Q6 is full, this function:
4507/// 1. First tries to advance any held compressed batch from a previous attempt
4508/// 2. If held batch can't be advanced, returns `OutputFull` immediately
4509/// 3. Otherwise, processes new work and tries to push the result
4510/// 4. If push fails, holds the result for the next attempt
4511///
4512/// This prevents deadlock by ensuring workers can always return and try
4513/// other steps (especially Write to drain queues).
4514pub fn shared_try_step_compress<S, W>(state: &S, worker: &mut W) -> StepResult
4515where
4516    S: OutputPipelineState,
4517    W: HasCompressor + HasHeldCompressed + HasRecycledBuffers,
4518{
4519    // =========================================================================
4520    // Priority 1: Try to advance any held compressed batch
4521    // =========================================================================
4522    if let Some((serial, held, _heap_size)) = worker.held_compressed_mut().take() {
4523        match state.q6_push((serial, held)) {
4524            Ok(()) => {
4525                // Successfully advanced held item, continue to process more
4526                state.record_q7_push_progress();
4527            }
4528            Err((serial, held)) => {
4529                // Still can't push - put it back and signal output full
4530                let heap_size = held.estimate_heap_size();
4531                *worker.held_compressed_mut() = Some((serial, held, heap_size));
4532                return StepResult::OutputFull;
4533            }
4534        }
4535    }
4536
4537    // =========================================================================
4538    // Priority 2: Check if output queue has space (soft check)
4539    // =========================================================================
4540    if state.q6_is_full() {
4541        return StepResult::OutputFull;
4542    }
4543
4544    // =========================================================================
4545    // Priority 3: Pop from Q5
4546    // =========================================================================
4547    let Some((serial, serialized)) = state.q5_pop() else {
4548        if let Some(stats) = state.stats() {
4549            stats.record_queue_empty(6);
4550        }
4551        return StepResult::InputEmpty;
4552    };
4553    state.record_q6_pop_progress();
4554
4555    // Track memory released from Q5
4556    let q5_heap_size = serialized.estimate_heap_size() as u64;
4557    state.q5_track_pop(q5_heap_size);
4558
4559    // =========================================================================
4560    // Priority 4: Compress the data
4561    // =========================================================================
4562    let SerializedBatch { data, record_count, secondary_data } = serialized;
4563
4564    // Scope the compressor borrow so we can recycle the buffer afterwards
4565    let blocks = {
4566        let compressor = worker.compressor_mut();
4567
4568        if let Err(e) = compressor.write_all(&data) {
4569            state.set_error(e);
4570            return StepResult::InputEmpty;
4571        }
4572        if let Err(e) = compressor.flush() {
4573            state.set_error(e);
4574            return StepResult::InputEmpty;
4575        }
4576
4577        // Take the compressed blocks, preserving the record count
4578        compressor.take_blocks()
4579    };
4580
4581    // Recycle the serialized data buffer for reuse in Serialize step
4582    worker.recycle_buffer(data);
4583
4584    // Record compressed bytes for throughput metrics
4585    let compressed_bytes: u64 = blocks.iter().map(|b| b.data.len() as u64).sum();
4586    state.record_compressed_bytes_out(compressed_bytes);
4587
4588    let batch = CompressedBlockBatch { blocks, record_count, secondary_data };
4589
4590    // =========================================================================
4591    // Priority 5: Try to push result
4592    // =========================================================================
4593    match state.q6_push((serial, batch)) {
4594        Ok(()) => {
4595            state.record_q7_push_progress();
4596            StepResult::Success
4597        }
4598        Err((serial, batch)) => {
4599            // Output full - hold the result for next attempt
4600            let heap_size = batch.estimate_heap_size();
4601            *worker.held_compressed_mut() = Some((serial, batch, heap_size));
4602            StepResult::OutputFull
4603        }
4604    }
4605}
4606
4607/// Shared Step: Write compressed blocks to output.
4608///
4609/// This step is exclusive - only one thread at a time can write.
4610/// It drains Q6 into a reorder buffer, then writes batches in order.
4611///
4612/// Note: Currently unused as both pipelines have their own write functions
4613/// with specific features (BAM: stats/progress logging, FASTQ: similar).
4614/// Kept for future unification in Phase 2b.
4615#[allow(dead_code)]
4616fn shared_try_step_write<S: OutputPipelineState>(state: &S) -> bool {
4617    // Try to acquire exclusive access to output file FIRST
4618    let Some(mut guard) = state.output_try_lock() else {
4619        return false; // Another thread is writing
4620    };
4621
4622    let Some(ref mut writer) = *guard else {
4623        return false; // File already closed
4624    };
4625
4626    // Drain Q6 into reorder buffer
4627    while let Some((serial, batch)) = state.q6_pop() {
4628        let q7_heap = batch.estimate_heap_size() as u64;
4629        state.q6_track_pop(q7_heap);
4630        state.q6_reorder_insert(serial, batch);
4631    }
4632
4633    // Write all ready batches in order
4634    let mut wrote_any = false;
4635    while let Some(batch) = state.q6_reorder_try_pop_next() {
4636        // Write all blocks in the batch
4637        for block in &batch.blocks {
4638            if let Err(e) = writer.write_all(&block.data) {
4639                state.set_error(e);
4640                return false;
4641            }
4642        }
4643        state.increment_written();
4644        wrote_any = true;
4645    }
4646
4647    wrote_any
4648}
4649
4650// ============================================================================
4651// Held-Item Pattern Helpers
4652// ============================================================================
4653
4654/// Try to advance a held item to a queue.
4655///
4656/// Returns true if the item was successfully pushed (or nothing was held).
4657/// Returns false if the queue is still full and the item is still held.
4658///
4659/// This is a helper for the non-blocking held-item pattern used to prevent deadlock.
4660#[inline]
4661pub fn try_advance_held<T>(queue: &ArrayQueue<(u64, T)>, held: &mut Option<(u64, T)>) -> bool {
4662    if let Some((serial, item)) = held.take() {
4663        match queue.push((serial, item)) {
4664            Ok(()) => true,
4665            Err(returned) => {
4666                *held = Some(returned);
4667                false
4668            }
4669        }
4670    } else {
4671        true // Nothing held
4672    }
4673}
4674
4675/// Try to push an item to a queue, holding it if the queue is full.
4676///
4677/// Returns `Success` if pushed, `OutputFull` if held for later.
4678#[inline]
4679pub fn try_push_or_hold<T>(
4680    queue: &ArrayQueue<(u64, T)>,
4681    serial: u64,
4682    item: T,
4683    held: &mut Option<(u64, T)>,
4684) -> StepResult {
4685    match queue.push((serial, item)) {
4686        Ok(()) => StepResult::Success,
4687        Err(returned) => {
4688            *held = Some(returned);
4689            StepResult::OutputFull
4690        }
4691    }
4692}
4693
4694// ============================================================================
4695// Process Step Traits (Step 4)
4696// ============================================================================
4697
/// Queue and error access required by the Process step.
///
/// Implemented by pipeline states so that `shared_try_step_process` can run
/// against both the BAM and FASTQ pipelines without knowing their queue layout.
/// `G` is the input item type (groups/templates), `P` the processed output type.
pub trait ProcessPipelineState<G, P>: Send + Sync {
    /// Take the next `(serial, batch)` from the input queue, if one is available.
    fn process_input_pop(&self) -> Option<(u64, Vec<G>)>;

    /// Report whether the output queue currently has no free slot.
    fn process_output_is_full(&self) -> bool;

    /// Enqueue a processed `(serial, batch)` on the output queue.
    ///
    /// # Errors
    ///
    /// Hands the item back unchanged when the queue is full.
    fn process_output_push(&self, item: (u64, Vec<P>)) -> Result<(), (u64, Vec<P>)>;

    /// Report whether the pipeline has recorded an error.
    fn has_error(&self) -> bool;

    /// Record an error on the pipeline.
    fn set_error(&self, e: io::Error);

    /// Decide whether the Process step should hold off on taking new work.
    ///
    /// Implementations may return true when the queue is full OR memory is
    /// high (unless draining). The default only looks at output queue
    /// capacity, which keeps existing implementors working unchanged.
    fn should_apply_process_backpressure(&self) -> bool {
        self.process_output_is_full()
    }

    /// Report whether the pipeline is draining (finishing up after input is exhausted).
    ///
    /// While draining, memory backpressure is bypassed to prevent deadlock.
    /// The default is `false`, which keeps existing implementors working unchanged.
    fn is_draining(&self) -> bool {
        false
    }
}
4736
/// Worker-side storage for a processed batch that could not be enqueued yet.
///
/// This is what makes the Process step non-blocking: when the output queue is
/// full, the worker parks its result here and the step returns `OutputFull`
/// rather than waiting.
pub trait HasHeldProcessed<P> {
    /// Mutable access to the held slot: `(serial, batch, heap_size)`.
    ///
    /// The trailing `usize` carries the batch's `heap_size` for memory tracking.
    fn held_processed_mut(&mut self) -> &mut Option<(u64, Vec<P>, usize)>;
}
4746
4747/// Shared Process step - pops batch, applies `process_fn`, pushes results.
4748///
4749/// # Non-Blocking Behavior
4750///
4751/// 1. First tries to advance any held batch from a previous attempt
4752/// 2. If held batch can't be advanced, returns `OutputFull`
4753/// 3. Otherwise processes new work and tries to push
4754/// 4. If push fails, holds the result for next attempt
4755#[inline]
4756pub fn shared_try_step_process<S, W, G, P, F>(
4757    state: &S,
4758    worker: &mut W,
4759    process_fn: F,
4760) -> StepResult
4761where
4762    S: ProcessPipelineState<G, P>,
4763    W: HasHeldProcessed<P>,
4764    P: MemoryEstimate,
4765    F: Fn(G) -> io::Result<P>,
4766{
4767    // Priority 1: Advance held item
4768    let held = worker.held_processed_mut();
4769    if let Some((serial, items, _heap_size)) = held.take() {
4770        match state.process_output_push((serial, items)) {
4771            Ok(()) => {
4772                // Successfully pushed - memory tracking handled by trait impl
4773            }
4774            Err((serial, items)) => {
4775                // Re-calculate heap_size for accurate memory tracking
4776                let heap_size: usize = items.iter().map(MemoryEstimate::estimate_heap_size).sum();
4777                *held = Some((serial, items, heap_size));
4778                return StepResult::OutputFull;
4779            }
4780        }
4781    }
4782
4783    // Priority 2: Check errors
4784    if state.has_error() {
4785        return StepResult::InputEmpty;
4786    }
4787
4788    // Priority 3: Check backpressure (queue capacity AND memory)
4789    if state.should_apply_process_backpressure() {
4790        return StepResult::OutputFull;
4791    }
4792
4793    // Priority 4: Pop input
4794    let Some((serial, batch)) = state.process_input_pop() else {
4795        return StepResult::InputEmpty;
4796    };
4797
4798    // Priority 5: Process items
4799    let mut results = Vec::with_capacity(batch.len());
4800    for item in batch {
4801        match process_fn(item) {
4802            Ok(processed) => results.push(processed),
4803            Err(e) => {
4804                state.set_error(e);
4805                return StepResult::InputEmpty;
4806            }
4807        }
4808    }
4809
4810    // Priority 6: Push result
4811    match state.process_output_push((serial, results)) {
4812        Ok(()) => StepResult::Success,
4813        Err((serial, results)) => {
4814            // Calculate heap_size for accurate memory tracking
4815            let heap_size: usize = results.iter().map(MemoryEstimate::estimate_heap_size).sum();
4816            *worker.held_processed_mut() = Some((serial, results, heap_size));
4817            StepResult::OutputFull
4818        }
4819    }
4820}
4821
4822// ============================================================================
4823// Serialize Step Traits (Step 5)
4824// ============================================================================
4825
4826/// Trait for pipeline states that support the Serialize step.
4827///
4828/// This trait abstracts over the queue access patterns for the serialize step,
4829/// allowing `shared_try_step_serialize` to work with both BAM and FASTQ pipelines.
4830pub trait SerializePipelineState<P>: Send + Sync {
4831    /// Pop a batch from the input queue (processed items).
4832    fn serialize_input_pop(&self) -> Option<(u64, Vec<P>)>;
4833
4834    /// Check if output queue is full.
4835    fn serialize_output_is_full(&self) -> bool;
4836
4837    /// Push serialized batch to output queue.
4838    ///
4839    /// # Errors
4840    ///
4841    /// Returns the item if the queue is full.
4842    fn serialize_output_push(
4843        &self,
4844        item: (u64, SerializedBatch),
4845    ) -> Result<(), (u64, SerializedBatch)>;
4846
4847    /// Check if an error has occurred.
4848    fn has_error(&self) -> bool;
4849
4850    /// Set an error.
4851    fn set_error(&self, e: io::Error);
4852
4853    /// Record serialized bytes for throughput metrics (optional).
4854    fn record_serialized_bytes(&self, _bytes: u64) {}
4855
4856    /// Record serialized record count for completion tracking (optional).
4857    fn record_serialized_records(&self, _count: u64) {}
4858}
4859
4860/// Trait for workers that can hold serialized batches.
4861pub trait HasHeldSerialized {
4862    /// Get mutable reference to held serialized batch.
4863    /// Includes `heap_size` for memory tracking.
4864    fn held_serialized_mut(&mut self) -> &mut Option<(u64, SerializedBatch, usize)>;
4865
4866    /// Get mutable reference to serialization buffer (for reuse).
4867    fn serialization_buffer_mut(&mut self) -> &mut Vec<u8>;
4868
4869    /// Get the capacity for the serialization buffer.
4870    /// BAM uses 64KB, FASTQ uses 256KB.
4871    fn serialization_buffer_capacity(&self) -> usize;
4872}
4873
4874/// Shared Serialize step - pops batch, serializes items, pushes concatenated result.
4875///
4876/// Uses a buffer-based serialize function signature for efficiency: the `serialize_fn`
4877/// writes directly into the provided buffer, avoiding intermediate allocations.
4878///
4879/// # Non-Blocking Behavior
4880///
4881/// Same pattern as `shared_try_step_process` - holds result if output full.
4882#[inline]
4883pub fn shared_try_step_serialize<S, W, P, F>(
4884    state: &S,
4885    worker: &mut W,
4886    mut serialize_fn: F,
4887) -> StepResult
4888where
4889    S: SerializePipelineState<P>,
4890    W: HasHeldSerialized + HasRecycledBuffers,
4891    F: FnMut(P, &mut Vec<u8>) -> io::Result<u64>,
4892{
4893    // Priority 1: Advance held item
4894    if let Some((serial, held_batch, _heap_size)) = worker.held_serialized_mut().take() {
4895        match state.serialize_output_push((serial, held_batch)) {
4896            Ok(()) => {
4897                // Note: Memory tracking would be added here when needed
4898            }
4899            Err((serial, held_batch)) => {
4900                let heap_size = held_batch.estimate_heap_size();
4901                *worker.held_serialized_mut() = Some((serial, held_batch, heap_size));
4902                return StepResult::OutputFull;
4903            }
4904        }
4905    }
4906
4907    // Priority 2: Check errors
4908    if state.has_error() {
4909        return StepResult::InputEmpty;
4910    }
4911
4912    // Priority 3: Check output space
4913    if state.serialize_output_is_full() {
4914        return StepResult::OutputFull;
4915    }
4916
4917    // Priority 4: Pop input
4918    let Some((serial, batch)) = state.serialize_input_pop() else {
4919        return StepResult::InputEmpty;
4920    };
4921
4922    // Get capacity first (before mutable borrow of buffer)
4923    let capacity = worker.serialization_buffer_capacity();
4924
4925    // Priority 5: Serialize directly into buffer (no intermediate allocation)
4926    let total_records = {
4927        let buffer = worker.serialization_buffer_mut();
4928        buffer.clear();
4929        let mut total_records: u64 = 0;
4930
4931        for item in batch {
4932            match serialize_fn(item, buffer) {
4933                Ok(record_count) => {
4934                    total_records += record_count;
4935                }
4936                Err(e) => {
4937                    state.set_error(e);
4938                    return StepResult::InputEmpty;
4939                }
4940            }
4941        }
4942        total_records
4943    };
4944
4945    // Swap buffer - try recycled buffer first, fall back to fresh allocation
4946    let replacement = worker.take_or_alloc_buffer(capacity);
4947    let buffer = worker.serialization_buffer_mut();
4948    let data = std::mem::replace(buffer, replacement);
4949    state.record_serialized_bytes(data.len() as u64);
4950    state.record_serialized_records(total_records);
4951
4952    let result_batch = SerializedBatch { data, record_count: total_records, secondary_data: None };
4953
4954    // Priority 6: Push result
4955    match state.serialize_output_push((serial, result_batch)) {
4956        Ok(()) => StepResult::Success,
4957        Err((serial, result_batch)) => {
4958            let heap_size = result_batch.estimate_heap_size();
4959            *worker.held_serialized_mut() = Some((serial, result_batch, heap_size));
4960            StepResult::OutputFull
4961        }
4962    }
4963}
4964
4965// ============================================================================
4966// Write Step Traits (Step 7)
4967// ============================================================================
4968
4969/// Trait for pipeline states that support the Write step.
4970///
4971/// This trait abstracts over the write step's needs for queue access,
4972/// reorder buffer, and output file.
4973pub trait WritePipelineState: Send + Sync {
4974    /// Get reference to the input queue for write step.
4975    fn write_input_queue(&self) -> &ArrayQueue<(u64, CompressedBlockBatch)>;
4976
4977    /// Get reference to the reorder buffer for maintaining output order.
4978    fn write_reorder_buffer(&self) -> &Mutex<ReorderBuffer<CompressedBlockBatch>>;
4979
4980    /// Get reference to the output writer.
4981    fn write_output(&self) -> &Mutex<Option<Box<dyn Write + Send>>>;
4982
4983    /// Check if an error has occurred.
4984    fn has_error(&self) -> bool;
4985
4986    /// Set an error.
4987    fn set_error(&self, e: io::Error);
4988
4989    /// Record that records were written.
4990    fn record_written(&self, count: u64);
4991
4992    /// Get reference to stats for recording contention (optional).
4993    fn stats(&self) -> Option<&PipelineStats>;
4994}
4995
4996/// Shared Write step - drains queue to reorder buffer, writes in order.
4997///
4998/// Uses a two-phase approach to reduce contention:
4999/// - **Phase 1**: Drain Q7 into the reorder buffer (only needs reorder lock).
5000///   Any thread can do this, reducing the frequency of output lock contention.
5001/// - **Phase 2**: Write ready batches to output (needs output lock).
5002///   Re-drains under both locks to catch items arriving between phases.
5003///
5004/// Returns `Success` if any data was written, `InputEmpty` otherwise.
5005pub fn shared_try_step_write_new<S: WritePipelineState>(state: &S) -> StepResult {
5006    if state.has_error() {
5007        return StepResult::InputEmpty;
5008    }
5009
5010    // Phase 1: Drain Q7 into reorder buffer (only needs reorder lock)
5011    {
5012        let mut reorder = state.write_reorder_buffer().lock();
5013        let queue = state.write_input_queue();
5014        while let Some((serial, batch)) = queue.pop() {
5015            reorder.insert(serial, batch);
5016        }
5017    }
5018    // Reorder lock released here
5019
5020    // Phase 2: Write ready batches (needs output lock)
5021    let Some(mut output_guard) = state.write_output().try_lock() else {
5022        if let Some(stats) = state.stats() {
5023            stats.record_contention(PipelineStep::Write);
5024        }
5025        // Q7 drain already happened (phase 1), so contention here
5026        // only means another thread is writing, not that Q7 is blocked
5027        return StepResult::InputEmpty;
5028    };
5029
5030    let Some(ref mut output) = *output_guard else {
5031        return StepResult::InputEmpty;
5032    };
5033
5034    let mut wrote_any = false;
5035    {
5036        let mut reorder = state.write_reorder_buffer().lock();
5037        let queue = state.write_input_queue();
5038
5039        // Drain any additional items that arrived since phase 1
5040        while let Some((serial, batch)) = queue.pop() {
5041            reorder.insert(serial, batch);
5042        }
5043
5044        // Write all ready batches in sequence order
5045        while let Some(batch) = reorder.try_pop_next() {
5046            for block in &batch.blocks {
5047                if let Err(e) = output.write_all(&block.data) {
5048                    state.set_error(e);
5049                    return StepResult::InputEmpty;
5050                }
5051            }
5052            state.record_written(batch.record_count);
5053            wrote_any = true;
5054        }
5055    }
5056
5057    if wrote_any { StepResult::Success } else { StepResult::InputEmpty }
5058}
5059
5060// ============================================================================
5061// Tests
5062// ============================================================================
5063
5064#[cfg(test)]
5065mod tests {
5066    use super::*;
5067
5068    #[test]
5069    fn test_stats_record_step_timing() {
5070        let stats = PipelineStats::new();
5071
5072        // Record some step timing
5073        stats.record_step(PipelineStep::Decompress, 1_000_000); // 1ms
5074        stats.record_step(PipelineStep::Decompress, 2_000_000); // 2ms
5075
5076        assert_eq!(stats.step_decompress_ns.load(Ordering::Relaxed), 3_000_000);
5077        assert_eq!(stats.step_decompress_count.load(Ordering::Relaxed), 2);
5078    }
5079
5080    #[test]
5081    fn test_stats_record_step_for_thread() {
5082        let stats = PipelineStats::new();
5083
5084        stats.record_step_for_thread(PipelineStep::Read, 500_000, Some(0));
5085        stats.record_step_for_thread(PipelineStep::Read, 500_000, Some(1));
5086
5087        assert_eq!(stats.step_read_ns.load(Ordering::Relaxed), 1_000_000);
5088        assert_eq!(stats.step_read_count.load(Ordering::Relaxed), 2);
5089        assert_eq!(stats.per_thread_step_counts[0][0].load(Ordering::Relaxed), 1);
5090        assert_eq!(stats.per_thread_step_counts[1][0].load(Ordering::Relaxed), 1);
5091    }
5092
5093    #[test]
5094    fn test_stats_record_queue_empty() {
5095        let stats = PipelineStats::new();
5096
5097        stats.record_queue_empty(1);
5098        stats.record_queue_empty(1);
5099        stats.record_queue_empty(2);
5100        stats.record_queue_empty(25); // Q2b
5101        stats.record_queue_empty(7);
5102
5103        assert_eq!(stats.q1_empty.load(Ordering::Relaxed), 2);
5104        assert_eq!(stats.q2_empty.load(Ordering::Relaxed), 1);
5105        assert_eq!(stats.q2b_empty.load(Ordering::Relaxed), 1);
5106        assert_eq!(stats.q7_empty.load(Ordering::Relaxed), 1);
5107        assert_eq!(stats.q3_empty.load(Ordering::Relaxed), 0);
5108    }
5109
5110    #[test]
5111    fn test_stats_record_idle_for_thread() {
5112        let stats = PipelineStats::new();
5113
5114        stats.record_idle_for_thread(0, 100_000);
5115        stats.record_idle_for_thread(0, 200_000);
5116        stats.record_idle_for_thread(1, 50_000);
5117
5118        assert_eq!(stats.idle_yields.load(Ordering::Relaxed), 3);
5119        assert_eq!(stats.per_thread_idle_ns[0].load(Ordering::Relaxed), 300_000);
5120        assert_eq!(stats.per_thread_idle_ns[1].load(Ordering::Relaxed), 50_000);
5121    }
5122
5123    // ========================================================================
5124    // MemoryTracker tests
5125    // ========================================================================
5126
5127    #[test]
5128    fn test_memory_tracker_new() {
5129        let tracker = MemoryTracker::new(1000);
5130        assert_eq!(tracker.current(), 0);
5131        assert_eq!(tracker.peak(), 0);
5132        assert_eq!(tracker.limit(), 1000);
5133    }
5134
5135    #[test]
5136    fn test_memory_tracker_unlimited() {
5137        let tracker = MemoryTracker::unlimited();
5138        assert_eq!(tracker.limit(), 0);
5139        // Unlimited tracker should always succeed
5140        assert!(tracker.try_add(1_000_000));
5141        assert!(tracker.try_add(1_000_000_000));
5142        assert_eq!(tracker.current(), 1_001_000_000);
5143    }
5144
5145    #[test]
5146    fn test_memory_tracker_try_add_under_limit() {
5147        let tracker = MemoryTracker::new(1000);
5148        assert!(tracker.try_add(500));
5149        assert_eq!(tracker.current(), 500);
5150    }
5151
5152    #[test]
5153    fn test_memory_tracker_try_add_at_limit() {
5154        let tracker = MemoryTracker::new(1000);
5155        assert!(tracker.try_add(1000));
5156        // Now at the limit, next add should be rejected
5157        assert!(!tracker.try_add(1));
5158    }
5159
5160    #[test]
5161    fn test_memory_tracker_try_add_single_exceeds() {
5162        // Key behavior: if currently under limit, a single addition that
5163        // would exceed the limit still succeeds.
5164        let tracker = MemoryTracker::new(1000);
5165        assert!(tracker.try_add(500)); // under limit
5166        assert!(tracker.try_add(600)); // would exceed, but we're under limit so it succeeds
5167        assert_eq!(tracker.current(), 1100);
5168        // Now over limit, next add should be rejected
5169        assert!(!tracker.try_add(1));
5170    }
5171
5172    #[test]
5173    fn test_memory_tracker_remove_saturating() {
5174        let tracker = MemoryTracker::new(1000);
5175        tracker.try_add(100);
5176        // Remove more than current -> saturates to 0
5177        tracker.remove(200);
5178        assert_eq!(tracker.current(), 0);
5179    }
5180
5181    #[test]
5182    fn test_memory_tracker_peak_tracking() {
5183        let tracker = MemoryTracker::new(0); // unlimited
5184        tracker.try_add(100);
5185        tracker.try_add(200);
5186        assert_eq!(tracker.peak(), 300);
5187        tracker.remove(250);
5188        assert_eq!(tracker.current(), 50);
5189        // Peak should still reflect the high-water mark
5190        assert_eq!(tracker.peak(), 300);
5191    }
5192
5193    #[test]
5194    fn test_memory_tracker_is_at_limit() {
5195        // Backpressure threshold is min(limit, 512MB).
5196        // With a limit of 1000, backpressure threshold = 1000.
5197        let tracker = MemoryTracker::new(1000);
5198        assert!(!tracker.is_at_limit());
5199        tracker.try_add(999);
5200        assert!(!tracker.is_at_limit());
5201        tracker.try_add(1);
5202        assert!(tracker.is_at_limit());
5203    }
5204
5205    #[test]
5206    fn test_memory_tracker_drain_threshold() {
5207        // Drain threshold is half of backpressure threshold.
5208        // With limit=1000, backpressure=1000, drain=500.
5209        let tracker = MemoryTracker::new(1000);
5210        tracker.try_add(1000);
5211        assert!(!tracker.is_below_drain_threshold()); // at 1000, threshold is 500
5212        tracker.remove(501);
5213        assert!(tracker.is_below_drain_threshold()); // at 499, below 500
5214    }
5215
5216    #[test]
5217    fn test_memory_tracker_default_is_unlimited() {
5218        let tracker = MemoryTracker::default();
5219        assert_eq!(tracker.limit(), 0);
5220        // Should behave like unlimited
5221        assert!(tracker.try_add(1_000_000));
5222    }
5223
5224    // ========================================================================
5225    // ReorderBufferState tests
5226    // ========================================================================
5227
5228    #[test]
5229    fn test_reorder_buffer_state_new() {
5230        let state = ReorderBufferState::new(1000);
5231        assert_eq!(state.get_next_seq(), 0);
5232        assert_eq!(state.get_heap_bytes(), 0);
5233        assert_eq!(state.get_memory_limit(), 1000);
5234    }
5235
5236    #[test]
5237    fn test_reorder_buffer_state_can_proceed_next_seq() {
5238        // Always allows the next_seq serial, even if over memory limit
5239        let state = ReorderBufferState::new(100);
5240        state.add_heap_bytes(10_000); // way over limit
5241        // next_seq is 0, so serial 0 should always proceed
5242        assert!(state.can_proceed(0));
5243    }
5244
5245    #[test]
5246    fn test_reorder_buffer_state_can_proceed_over_limit() {
5247        // Blocks non-next serials when heap_bytes >= effective_limit / 2
5248        let state = ReorderBufferState::new(1000);
5249        // effective_limit = min(1000, 512MB) = 1000
5250        // backpressure at 50% = 500
5251        state.add_heap_bytes(500);
5252        // Serial 1 is not next_seq (0), and heap_bytes >= 500, so should block
5253        assert!(!state.can_proceed(1));
5254        // Serial 0 is next_seq, should still proceed
5255        assert!(state.can_proceed(0));
5256    }
5257
5258    #[test]
5259    fn test_reorder_buffer_state_is_memory_high() {
5260        let state = ReorderBufferState::new(1000);
5261        assert!(!state.is_memory_high());
5262        state.add_heap_bytes(1000);
5263        assert!(state.is_memory_high());
5264    }
5265
5266    #[test]
5267    fn test_reorder_buffer_state_is_memory_drained() {
5268        // Hysteresis: enter drain at threshold, exit at half threshold
5269        let state = ReorderBufferState::new(1000);
5270        state.add_heap_bytes(1000);
5271        assert!(!state.is_memory_drained()); // at 1000, threshold/2 = 500
5272        state.sub_heap_bytes(501);
5273        assert!(state.is_memory_drained()); // at 499, below 500
5274    }
5275
5276    #[test]
5277    fn test_reorder_buffer_state_effective_limit() {
5278        // 0 uses BACKPRESSURE_THRESHOLD_BYTES, non-zero uses min
5279        let state_zero = ReorderBufferState::new(0);
5280        // is_memory_high checks heap_bytes >= effective_limit
5281        // With 0 limit, effective_limit = BACKPRESSURE_THRESHOLD_BYTES (512MB)
5282        assert!(!state_zero.is_memory_high()); // 0 < 512MB
5283
5284        let state_small = ReorderBufferState::new(100);
5285        state_small.add_heap_bytes(100);
5286        assert!(state_small.is_memory_high()); // 100 >= min(100, 512MB) = 100
5287    }
5288
5289    #[test]
5290    fn test_reorder_buffer_state_add_sub_heap_bytes() {
5291        let state = ReorderBufferState::new(0);
5292        state.add_heap_bytes(100);
5293        assert_eq!(state.get_heap_bytes(), 100);
5294        state.add_heap_bytes(50);
5295        assert_eq!(state.get_heap_bytes(), 150);
5296        state.sub_heap_bytes(30);
5297        assert_eq!(state.get_heap_bytes(), 120);
5298    }
5299
5300    // ========================================================================
5301    // GroupKey tests
5302    // ========================================================================
5303
5304    #[test]
5305    fn test_group_key_single() {
5306        let key = GroupKey::single(1, 100, 0, 5, 0, 42);
5307        assert_eq!(key.ref_id1, 1);
5308        assert_eq!(key.pos1, 100);
5309        assert_eq!(key.strand1, 0);
5310        assert_eq!(key.ref_id2, GroupKey::UNKNOWN_REF);
5311        assert_eq!(key.pos2, GroupKey::UNKNOWN_POS);
5312        assert_eq!(key.strand2, GroupKey::UNKNOWN_STRAND);
5313        assert_eq!(key.library_idx, 5);
5314        assert_eq!(key.cell_hash, 0);
5315        assert_eq!(key.name_hash, 42);
5316    }
5317
5318    #[test]
5319    fn test_group_key_paired() {
5320        // When (ref_id, pos, strand) <= (mate_ref_id, mate_pos, mate_strand),
5321        // the positions stay as given.
5322        let key = GroupKey::paired(1, 100, 0, 2, 200, 1, 3, 0, 99);
5323        assert_eq!(key.ref_id1, 1);
5324        assert_eq!(key.pos1, 100);
5325        assert_eq!(key.strand1, 0);
5326        assert_eq!(key.ref_id2, 2);
5327        assert_eq!(key.pos2, 200);
5328        assert_eq!(key.strand2, 1);
5329    }
5330
5331    #[test]
5332    fn test_group_key_paired_swap() {
5333        // When (ref_id, pos, strand) > (mate_ref_id, mate_pos, mate_strand),
5334        // the positions are swapped so lower comes first.
5335        let key = GroupKey::paired(5, 500, 1, 1, 100, 0, 3, 0, 99);
5336        assert_eq!(key.ref_id1, 1);
5337        assert_eq!(key.pos1, 100);
5338        assert_eq!(key.strand1, 0);
5339        assert_eq!(key.ref_id2, 5);
5340        assert_eq!(key.pos2, 500);
5341        assert_eq!(key.strand2, 1);
5342    }
5343
5344    #[test]
5345    fn test_group_key_position_key() {
5346        let key = GroupKey::single(1, 100, 0, 5, 7, 42);
5347        let pk = key.position_key();
5348        // position_key returns tuple without name_hash
5349        assert_eq!(
5350            pk,
5351            (
5352                1,
5353                100,
5354                0,
5355                GroupKey::UNKNOWN_REF,
5356                GroupKey::UNKNOWN_POS,
5357                GroupKey::UNKNOWN_STRAND,
5358                5,
5359                7
5360            )
5361        );
5362    }
5363
5364    #[test]
5365    fn test_group_key_ord_by_position() {
5366        let key_a = GroupKey::single(1, 100, 0, 0, 0, 0);
5367        let key_b = GroupKey::single(2, 50, 0, 0, 0, 0);
5368        // key_a has lower ref_id1, so it should come first
5369        assert!(key_a < key_b);
5370    }
5371
5372    #[test]
5373    fn test_group_key_ord_tiebreak_name_hash() {
5374        let key_a = GroupKey::single(1, 100, 0, 0, 0, 10);
5375        let key_b = GroupKey::single(1, 100, 0, 0, 0, 20);
5376        // Same position, name_hash breaks tie
5377        assert!(key_a < key_b);
5378    }
5379
5380    #[test]
5381    fn test_group_key_default() {
5382        let key = GroupKey::default();
5383        assert_eq!(key.ref_id1, GroupKey::UNKNOWN_REF);
5384        assert_eq!(key.pos1, GroupKey::UNKNOWN_POS);
5385        assert_eq!(key.strand1, GroupKey::UNKNOWN_STRAND);
5386        assert_eq!(key.ref_id2, GroupKey::UNKNOWN_REF);
5387        assert_eq!(key.pos2, GroupKey::UNKNOWN_POS);
5388        assert_eq!(key.strand2, GroupKey::UNKNOWN_STRAND);
5389        assert_eq!(key.library_idx, 0);
5390        assert_eq!(key.cell_hash, 0);
5391        assert_eq!(key.name_hash, 0);
5392    }
5393
5394    #[test]
5395    fn test_group_key_eq() {
5396        let key_a = GroupKey::single(1, 100, 0, 5, 0, 42);
5397        let key_b = GroupKey::single(1, 100, 0, 5, 0, 42);
5398        assert_eq!(key_a, key_b);
5399    }
5400
5401    #[test]
5402    fn test_group_key_paired_same_position() {
5403        // Same ref_id and pos, different strand normalizes correctly
5404        let key = GroupKey::paired(1, 100, 1, 1, 100, 0, 0, 0, 0);
5405        // (1,100,0) < (1,100,1) so mate comes first
5406        assert_eq!(key.ref_id1, 1);
5407        assert_eq!(key.pos1, 100);
5408        assert_eq!(key.strand1, 0);
5409        assert_eq!(key.strand2, 1);
5410    }
5411
5412    #[test]
5413    fn test_group_key_hash() {
5414        use std::collections::HashSet;
5415        let key_a = GroupKey::single(1, 100, 0, 0, 0, 42);
5416        let key_b = GroupKey::single(1, 100, 0, 0, 0, 42);
5417        let key_c = GroupKey::single(2, 200, 1, 0, 0, 99);
5418        let mut set = HashSet::new();
5419        set.insert(key_a);
5420        assert!(set.contains(&key_b));
5421        set.insert(key_c);
5422        assert_eq!(set.len(), 2);
5423    }
5424
5425    // ========================================================================
5426    // PipelineStep tests
5427    // ========================================================================
5428
5429    #[test]
5430    fn test_pipeline_step_is_exclusive() {
5431        assert!(PipelineStep::Read.is_exclusive());
5432        assert!(!PipelineStep::Decompress.is_exclusive());
5433        assert!(PipelineStep::FindBoundaries.is_exclusive());
5434        assert!(!PipelineStep::Decode.is_exclusive());
5435        assert!(PipelineStep::Group.is_exclusive());
5436        assert!(!PipelineStep::Process.is_exclusive());
5437        assert!(!PipelineStep::Serialize.is_exclusive());
5438        assert!(!PipelineStep::Compress.is_exclusive());
5439        assert!(PipelineStep::Write.is_exclusive());
5440    }
5441
5442    #[test]
5443    fn test_pipeline_step_all() {
5444        let all = PipelineStep::all();
5445        assert_eq!(all.len(), 9);
5446        assert_eq!(all[0], PipelineStep::Read);
5447        assert_eq!(all[1], PipelineStep::Decompress);
5448        assert_eq!(all[2], PipelineStep::FindBoundaries);
5449        assert_eq!(all[3], PipelineStep::Decode);
5450        assert_eq!(all[4], PipelineStep::Group);
5451        assert_eq!(all[5], PipelineStep::Process);
5452        assert_eq!(all[6], PipelineStep::Serialize);
5453        assert_eq!(all[7], PipelineStep::Compress);
5454        assert_eq!(all[8], PipelineStep::Write);
5455    }
5456
5457    #[test]
5458    fn test_pipeline_step_from_index() {
5459        assert_eq!(PipelineStep::from_index(0), PipelineStep::Read);
5460        assert_eq!(PipelineStep::from_index(1), PipelineStep::Decompress);
5461        assert_eq!(PipelineStep::from_index(2), PipelineStep::FindBoundaries);
5462        assert_eq!(PipelineStep::from_index(3), PipelineStep::Decode);
5463        assert_eq!(PipelineStep::from_index(4), PipelineStep::Group);
5464        assert_eq!(PipelineStep::from_index(5), PipelineStep::Process);
5465        assert_eq!(PipelineStep::from_index(6), PipelineStep::Serialize);
5466        assert_eq!(PipelineStep::from_index(7), PipelineStep::Compress);
5467        assert_eq!(PipelineStep::from_index(8), PipelineStep::Write);
5468    }
5469
5470    #[test]
5471    fn test_pipeline_step_short_name() {
5472        assert_eq!(PipelineStep::Read.short_name(), "Rd");
5473        assert_eq!(PipelineStep::Decompress.short_name(), "Dc");
5474        assert_eq!(PipelineStep::FindBoundaries.short_name(), "Fb");
5475        assert_eq!(PipelineStep::Decode.short_name(), "De");
5476        assert_eq!(PipelineStep::Group.short_name(), "Gr");
5477        assert_eq!(PipelineStep::Process.short_name(), "Pr");
5478        assert_eq!(PipelineStep::Serialize.short_name(), "Se");
5479        assert_eq!(PipelineStep::Compress.short_name(), "Co");
5480        assert_eq!(PipelineStep::Write.short_name(), "Wr");
5481        // Verify each is exactly 2 chars
5482        for step in PipelineStep::all() {
5483            assert_eq!(step.short_name().len(), 2);
5484        }
5485    }
5486
5487    // ========================================================================
5488    // StepResult tests
5489    // ========================================================================
5490
5491    #[test]
5492    fn test_step_result_is_success() {
5493        assert!(StepResult::Success.is_success());
5494        assert!(!StepResult::OutputFull.is_success());
5495        assert!(!StepResult::InputEmpty.is_success());
5496    }
5497
5498    #[test]
5499    fn test_step_result_variants() {
5500        // All three variants exist and are distinct
5501        let s = StepResult::Success;
5502        let o = StepResult::OutputFull;
5503        let i = StepResult::InputEmpty;
5504        assert_ne!(s, o);
5505        assert_ne!(s, i);
5506        assert_ne!(o, i);
5507    }
5508
5509    // ========================================================================
5510    // Batch type tests
5511    // ========================================================================
5512
5513    #[test]
5514    fn test_raw_block_batch_new_empty() {
5515        let batch = RawBlockBatch::new();
5516        assert!(batch.is_empty());
5517        assert_eq!(batch.len(), 0);
5518    }
5519
5520    #[test]
5521    fn test_raw_block_batch_with_capacity() {
5522        let batch = RawBlockBatch::with_capacity(32);
5523        assert!(batch.is_empty());
5524        assert!(batch.blocks.capacity() >= 32);
5525    }
5526
5527    #[test]
5528    fn test_compressed_block_batch_new() {
5529        let batch = CompressedBlockBatch::new();
5530        assert!(batch.is_empty());
5531        assert_eq!(batch.len(), 0);
5532        assert_eq!(batch.record_count, 0);
5533    }
5534
5535    #[test]
5536    fn test_compressed_block_batch_clear() {
5537        let mut batch = CompressedBlockBatch::new();
5538        batch.record_count = 42;
5539        batch.secondary_data = Some(vec![1, 2, 3]);
5540        batch.clear();
5541        assert!(batch.is_empty());
5542        assert_eq!(batch.record_count, 0);
5543        assert!(batch.secondary_data.is_none());
5544    }
5545
5546    #[test]
5547    fn test_bgzf_batch_config_default() {
5548        let config = BgzfBatchConfig::default();
5549        assert_eq!(config.blocks_per_batch, 16);
5550        assert_eq!(config.compression_level, 6);
5551    }
5552
5553    #[test]
5554    fn test_bgzf_batch_config_new() {
5555        let config = BgzfBatchConfig::new(64);
5556        assert_eq!(config.blocks_per_batch, 64);
5557        // compression_level should still be default
5558        assert_eq!(config.compression_level, 6);
5559    }
5560
5561    #[test]
5562    fn test_decompressed_batch_new_empty() {
5563        let batch = DecompressedBatch::new();
5564        assert!(batch.is_empty());
5565        assert!(batch.data.is_empty());
5566    }
5567
5568    #[test]
5569    fn test_serialized_batch_clear() {
5570        let mut batch = SerializedBatch::new();
5571        batch.data.extend_from_slice(&[1, 2, 3]);
5572        batch.record_count = 10;
5573        batch.secondary_data = Some(vec![4, 5, 6]);
5574        batch.clear();
5575        assert!(batch.is_empty());
5576        assert_eq!(batch.record_count, 0);
5577        assert!(batch.secondary_data.is_none());
5578    }
5579
5580    // ========================================================================
5581    // PipelineConfig tests
5582    // ========================================================================
5583
5584    #[test]
5585    fn test_pipeline_config_new_defaults() {
5586        let config = PipelineConfig::new(4, 6);
5587        assert_eq!(config.num_threads, 4);
5588        assert_eq!(config.compression_level, 6);
5589        assert_eq!(config.queue_capacity, 64);
5590        assert_eq!(config.batch_size, 1);
5591        assert_eq!(config.queue_memory_limit, 0);
5592        assert!(!config.collect_stats);
5593    }
5594
5595    #[test]
5596    fn test_pipeline_config_builder_chain() {
5597        let config = PipelineConfig::new(4, 6)
5598            .with_compression_level(9)
5599            .with_batch_size(100)
5600            .with_stats(true)
5601            .with_queue_memory_limit(1_000_000);
5602        assert_eq!(config.compression_level, 9);
5603        assert_eq!(config.batch_size, 100);
5604        assert!(config.collect_stats);
5605        assert_eq!(config.queue_memory_limit, 1_000_000);
5606    }
5607
5608    #[test]
5609    fn test_pipeline_config_auto_tuned_1_thread() {
5610        let config = PipelineConfig::auto_tuned(1, 6);
5611        assert_eq!(config.num_threads, 1);
5612        // queue_capacity = (1*16).clamp(64, 256) = 64
5613        assert_eq!(config.queue_capacity, 64);
5614    }
5615
5616    #[test]
5617    fn test_pipeline_config_auto_tuned_8_threads() {
5618        let config = PipelineConfig::auto_tuned(8, 6);
5619        assert_eq!(config.num_threads, 8);
5620        // queue_capacity = (8*16).clamp(64, 256) = 128
5621        assert_eq!(config.queue_capacity, 128);
5622        // blocks_per_read_batch = 48 for 8..=15 threads
5623        assert_eq!(config.blocks_per_read_batch, 48);
5624    }
5625
5626    #[test]
5627    fn test_pipeline_config_auto_tuned_32_threads() {
5628        let config = PipelineConfig::auto_tuned(32, 6);
5629        assert_eq!(config.num_threads, 32);
5630        // queue_capacity = (32*16).clamp(64, 256) = 256 (capped)
5631        assert_eq!(config.queue_capacity, 256);
5632    }
5633
5634    #[test]
5635    fn test_pipeline_config_with_compression_level() {
5636        let config = PipelineConfig::new(4, 6).with_compression_level(12);
5637        assert_eq!(config.compression_level, 12);
5638    }
5639
5640    #[test]
5641    fn test_pipeline_config_with_batch_size_min_1() {
5642        let config = PipelineConfig::new(4, 6).with_batch_size(0);
5643        // batch_size of 0 gets clamped to 1
5644        assert_eq!(config.batch_size, 1);
5645    }
5646
5647    #[test]
5648    fn test_pipeline_config_with_queue_memory_limit() {
5649        let config = PipelineConfig::new(4, 6).with_queue_memory_limit(500_000_000);
5650        assert_eq!(config.queue_memory_limit, 500_000_000);
5651    }
5652
5653    // ========================================================================
5654    // PipelineValidationError tests
5655    // ========================================================================
5656
5657    #[test]
5658    fn test_pipeline_validation_error_display_empty() {
5659        let err = PipelineValidationError {
5660            non_empty_queues: vec![],
5661            counter_mismatches: vec![],
5662            leaked_heap_bytes: 0,
5663        };
5664        let display = format!("{err}");
5665        // Even empty error prints the header
5666        assert!(display.contains("Pipeline validation failed"));
5667    }
5668
5669    #[test]
5670    fn test_pipeline_validation_error_display_full() {
5671        let err = PipelineValidationError {
5672            non_empty_queues: vec!["Q1".to_string(), "Q2".to_string()],
5673            counter_mismatches: vec!["read_count != write_count".to_string()],
5674            leaked_heap_bytes: 1024,
5675        };
5676        let display = format!("{err}");
5677        assert!(display.contains("Pipeline validation failed"));
5678        assert!(display.contains("Q1"));
5679        assert!(display.contains("Q2"));
5680        assert!(display.contains("read_count != write_count"));
5681        assert!(display.contains("1024"));
5682    }
5683
5684    // ========================================================================
5685    // extract_panic_message tests
5686    // ========================================================================
5687
5688    #[test]
5689    fn test_extract_panic_message_str() {
5690        let payload: Box<dyn std::any::Any + Send> = Box::new("something went wrong");
5691        let msg = extract_panic_message(payload);
5692        assert_eq!(msg, "something went wrong");
5693    }
5694
5695    #[test]
5696    fn test_extract_panic_message_string() {
5697        let payload: Box<dyn std::any::Any + Send> = Box::new(String::from("an error occurred"));
5698        let msg = extract_panic_message(payload);
5699        assert_eq!(msg, "an error occurred");
5700    }
5701
5702    #[test]
5703    fn test_extract_panic_message_other() {
5704        let payload: Box<dyn std::any::Any + Send> = Box::new(42_i32);
5705        let msg = extract_panic_message(payload);
5706        assert_eq!(msg, "Unknown panic");
5707    }
5708
5709    // ========================================================================
5710    // WorkerCoreState tests
5711    // ========================================================================
5712
5713    #[test]
5714    fn test_worker_core_state_initial_values() {
5715        use super::super::scheduler::SchedulerStrategy;
5716        let state = WorkerCoreState::new(6, 0, 4, SchedulerStrategy::default(), ActiveSteps::all());
5717        assert_eq!(state.backoff_us, MIN_BACKOFF_US);
5718    }
5719
5720    #[test]
5721    fn test_worker_core_state_reset_backoff() {
5722        use super::super::scheduler::SchedulerStrategy;
5723        let mut state =
5724            WorkerCoreState::new(6, 0, 4, SchedulerStrategy::default(), ActiveSteps::all());
5725        state.increase_backoff();
5726        assert!(state.backoff_us > MIN_BACKOFF_US);
5727        state.reset_backoff();
5728        assert_eq!(state.backoff_us, MIN_BACKOFF_US);
5729    }
5730
5731    #[test]
5732    fn test_worker_core_state_increase_backoff() {
5733        use super::super::scheduler::SchedulerStrategy;
5734        let mut state =
5735            WorkerCoreState::new(6, 0, 4, SchedulerStrategy::default(), ActiveSteps::all());
5736        assert_eq!(state.backoff_us, MIN_BACKOFF_US); // 10
5737        state.increase_backoff();
5738        assert_eq!(state.backoff_us, MIN_BACKOFF_US * 2); // 20
5739        state.increase_backoff();
5740        assert_eq!(state.backoff_us, MIN_BACKOFF_US * 4); // 40
5741        // Keep increasing until we hit the cap
5742        for _ in 0..20 {
5743            state.increase_backoff();
5744        }
5745        assert_eq!(state.backoff_us, MAX_BACKOFF_US);
5746    }
5747
5748    // ========================================================================
5749    // OutputPipelineQueues tests
5750    // ========================================================================
5751
5752    struct TestProcessed {
5753        size: usize,
5754    }
5755
5756    impl MemoryEstimate for TestProcessed {
5757        fn estimate_heap_size(&self) -> usize {
5758            self.size
5759        }
5760    }
5761
5762    #[test]
5763    fn test_output_queues_new() {
5764        let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
5765        let queues: OutputPipelineQueues<(), TestProcessed> =
5766            OutputPipelineQueues::new(16, output, None, "test");
5767        assert!(queues.groups.is_empty());
5768        assert!(queues.processed.is_empty());
5769        assert!(queues.serialized.is_empty());
5770        assert!(queues.compressed.is_empty());
5771    }
5772
5773    #[test]
5774    fn test_output_queues_set_take_error() {
5775        let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
5776        let queues: OutputPipelineQueues<(), TestProcessed> =
5777            OutputPipelineQueues::new(16, output, None, "test");
5778        assert!(!queues.has_error());
5779        queues.set_error(io::Error::other("test error"));
5780        assert!(queues.has_error());
5781        let err = queues.take_error();
5782        assert!(err.is_some());
5783        assert_eq!(err.unwrap().to_string(), "test error");
5784    }
5785
5786    #[test]
5787    fn test_output_queues_draining() {
5788        let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
5789        let queues: OutputPipelineQueues<(), TestProcessed> =
5790            OutputPipelineQueues::new(16, output, None, "test");
5791        assert!(!queues.is_draining());
5792        queues.set_draining(true);
5793        assert!(queues.is_draining());
5794    }
5795
5796    #[test]
5797    fn test_output_queues_queue_depths_empty() {
5798        let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
5799        let queues: OutputPipelineQueues<(), TestProcessed> =
5800            OutputPipelineQueues::new(16, output, None, "test");
5801        let depths = queues.queue_depths();
5802        assert_eq!(depths.groups, 0);
5803        assert_eq!(depths.processed, 0);
5804        assert_eq!(depths.serialized, 0);
5805        assert_eq!(depths.compressed, 0);
5806    }
5807
5808    #[test]
5809    fn test_output_queues_are_queues_empty() {
5810        let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
5811        let queues: OutputPipelineQueues<(), TestProcessed> =
5812            OutputPipelineQueues::new(16, output, None, "test");
5813        assert!(queues.are_queues_empty());
5814    }
5815
5816    // ========================================================================
5817    // MemoryEstimate impl tests
5818    // ========================================================================
5819
5820    #[test]
5821    fn test_memory_estimate_unit() {
5822        let unit = ();
5823        assert_eq!(unit.estimate_heap_size(), 0);
5824    }
5825
5826    #[test]
5827    fn test_decoded_record_record_accessor() {
5828        let rec = RecordBuf::default();
5829        let parsed = DecodedRecord::new(rec, GroupKey::default());
5830        assert!(parsed.record().is_some());
5831        assert!(parsed.raw_bytes().is_none());
5832
5833        let raw = DecodedRecord::from_raw_bytes(vec![0u8; 32], GroupKey::default());
5834        assert!(raw.record().is_none());
5835        assert!(raw.raw_bytes().is_some());
5836    }
5837
5838    #[test]
5839    fn test_memory_estimate_serialized_batch() {
5840        let mut batch = SerializedBatch::new();
5841        batch.data.reserve(1024);
5842        assert!(batch.estimate_heap_size() >= 1024);
5843    }
5844
5845    #[test]
5846    fn test_memory_estimate_decompressed_batch() {
5847        let mut batch = DecompressedBatch::new();
5848        batch.data.reserve(2048);
5849        assert!(batch.estimate_heap_size() >= 2048);
5850    }
5851
5852    #[test]
5853    fn test_memory_estimate_vec_record_buf() {
5854        use crate::sam::builder::RecordBuilder;
5855        use noodles::sam::alignment::record_buf::RecordBuf;
5856
5857        let record = RecordBuilder::new().sequence("ACGT").qualities(&[30, 30, 30, 30]).build();
5858        let mut records: Vec<RecordBuf> = Vec::with_capacity(10);
5859        records.push(record);
5860
5861        let estimate = records.estimate_heap_size();
5862        // Should include Vec overhead for capacity * size_of::<RecordBuf>()
5863        let vec_overhead = 10 * std::mem::size_of::<RecordBuf>();
5864        assert!(
5865            estimate >= vec_overhead,
5866            "estimate {estimate} should include Vec<RecordBuf> overhead {vec_overhead}"
5867        );
5868    }
5869
5870    #[test]
5871    fn test_serialized_batch_memory_estimate_with_secondary() {
5872        let batch = SerializedBatch {
5873            data: vec![0u8; 100],
5874            record_count: 5,
5875            secondary_data: Some(vec![0u8; 50]),
5876        };
5877        let estimate = batch.estimate_heap_size();
5878        assert!(
5879            estimate >= 150,
5880            "Should include both primary ({}) and secondary data, got {estimate}",
5881            batch.data.capacity()
5882        );
5883    }
5884
5885    #[test]
5886    fn test_serialized_batch_memory_estimate_without_secondary() {
5887        let batch = SerializedBatch { data: vec![0u8; 100], record_count: 5, secondary_data: None };
5888        let estimate = batch.estimate_heap_size();
5889        assert!(estimate >= 100);
5890        assert!(estimate < 150, "Should not include phantom secondary data, got {estimate}");
5891    }
5892
5893    #[test]
5894    fn test_compressed_block_batch_memory_with_secondary() {
5895        let batch = CompressedBlockBatch {
5896            blocks: vec![],
5897            record_count: 0,
5898            secondary_data: Some(vec![0u8; 200]),
5899        };
5900        assert!(
5901            batch.estimate_heap_size() >= 200,
5902            "Should include secondary data, got {}",
5903            batch.estimate_heap_size()
5904        );
5905    }
5906}