use crossbeam_queue::ArrayQueue;
use log::info;
use parking_lot::Mutex;
use std::io::{self, Read, Write};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant};
use crate::progress::ProgressTracker;
use super::deadlock::{DeadlockAction, DeadlockState, QueueSnapshot, check_deadlock_and_restore};
use crate::bgzf_reader::{RawBgzfBlock, decompress_block_into, read_raw_blocks};
use crate::read_info::LibraryIndex;
use crate::reorder_buffer::ReorderBuffer;
use crate::sam::SamTag;
use fgumi_raw_bam::RawRecord;
use noodles::sam::alignment::RecordBuf;
use noodles::sam::alignment::record::data::field::Tag;
use super::scheduler::{BackpressureState, Scheduler, SchedulerStrategy, create_scheduler};
#[cfg(feature = "memory-debug")]
/// Point-in-time memory usage report, one field per tracked consumer.
///
/// Units are encoded in the field suffix (`_gb` / `_mb`). Field names mirror
/// the atomic counters in `MemoryDebugStats`.
#[derive(Debug, Clone)]
pub struct MemoryBreakdown {
// Whole-process resident set size and the sum of all tracked components.
pub system_rss_gb: f64,
pub tracked_total_gb: f64,
// RSS not attributable to any tracked component.
pub untracked_gb: f64,
// Per-queue payload sizes (Q1..Q7 are the inter-stage pipeline queues).
pub q1_mb: f64,
pub q2_mb: f64,
pub q3_mb: f64,
pub q4_gb: f64,
pub q5_gb: f64,
pub q6_mb: f64,
pub q7_mb: f64,
// Bytes held by in-flight processing work.
pub position_groups_gb: f64,
pub templates_gb: f64,
pub reorder_buffers_mb: f64,
pub grouper_mb: f64,
pub worker_local_mb: f64,
// Infrastructure: codec state, scratch buffers, I/O buffers, stacks, queues.
pub decompressors_mb: f64,
pub compressors_mb: f64,
pub worker_buffers_mb: f64,
pub io_buffers_mb: f64,
pub thread_stacks_mb: f64,
pub queue_capacity_mb: f64,
pub infrastructure_gb: f64,
}
#[cfg(feature = "memory-debug")]
/// Atomic byte counters, one per tracked memory consumer; these feed the
/// derived `MemoryBreakdown` report.
#[derive(Debug)]
pub struct MemoryDebugStats {
// Payload bytes currently resident in each inter-stage queue (Q1..Q7).
pub q1_memory_bytes: AtomicU64,
pub q2_memory_bytes: AtomicU64,
pub q3_memory_bytes: AtomicU64,
pub q4_memory_bytes: AtomicU64,
pub q5_memory_bytes: AtomicU64,
pub q6_memory_bytes: AtomicU64,
pub q7_memory_bytes: AtomicU64,
// Bytes held by in-flight processing work.
pub position_group_processing_bytes: AtomicU64,
pub template_processing_bytes: AtomicU64,
pub reorder_buffer_bytes: AtomicU64,
pub grouper_memory_bytes: AtomicU64,
pub worker_local_memory_bytes: AtomicU64,
// Infrastructure allocations (codecs, scratch buffers, I/O, stacks, queues).
pub decompressor_memory_bytes: AtomicU64,
pub compressor_memory_bytes: AtomicU64,
pub worker_buffer_memory_bytes: AtomicU64,
pub io_buffer_memory_bytes: AtomicU64,
pub thread_stack_memory_bytes: AtomicU64,
pub queue_capacity_memory_bytes: AtomicU64,
// Most recently sampled whole-process RSS (see get_process_rss_bytes).
pub system_rss_bytes: AtomicU64,
}
#[cfg(feature = "memory-debug")]
impl MemoryDebugStats {
    /// Creates a stats block with every counter zeroed.
    #[must_use]
    pub fn new() -> Self {
        Self {
            q1_memory_bytes: AtomicU64::new(0),
            q2_memory_bytes: AtomicU64::new(0),
            q3_memory_bytes: AtomicU64::new(0),
            q4_memory_bytes: AtomicU64::new(0),
            q5_memory_bytes: AtomicU64::new(0),
            q6_memory_bytes: AtomicU64::new(0),
            q7_memory_bytes: AtomicU64::new(0),
            position_group_processing_bytes: AtomicU64::new(0),
            template_processing_bytes: AtomicU64::new(0),
            reorder_buffer_bytes: AtomicU64::new(0),
            grouper_memory_bytes: AtomicU64::new(0),
            worker_local_memory_bytes: AtomicU64::new(0),
            decompressor_memory_bytes: AtomicU64::new(0),
            compressor_memory_bytes: AtomicU64::new(0),
            worker_buffer_memory_bytes: AtomicU64::new(0),
            io_buffer_memory_bytes: AtomicU64::new(0),
            thread_stack_memory_bytes: AtomicU64::new(0),
            queue_capacity_memory_bytes: AtomicU64::new(0),
            system_rss_bytes: AtomicU64::new(0),
        }
    }
}

// A public `new()` taking no arguments should be accompanied by `Default`
// (clippy::new_without_default); callers can now use `..Default::default()`.
#[cfg(feature = "memory-debug")]
impl Default for MemoryDebugStats {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(feature = "memory-debug")]
// Sentinel meaning "this thread has not been assigned a debug id yet".
const THREAD_ID_UNSET: usize = usize::MAX;
#[cfg(feature = "memory-debug")]
thread_local! {
// Per-thread slot holding the id handed out by `get_or_assign_thread_id`.
static THREAD_ID: std::cell::Cell<usize> = std::cell::Cell::new(THREAD_ID_UNSET);
}
#[cfg(feature = "memory-debug")]
/// Returns this thread's stable debug id, assigning one on first call.
///
/// Ids come from a process-wide counter taken modulo `MAX_THREADS`, so two
/// threads can in principle share an id once the counter wraps.
pub fn get_or_assign_thread_id() -> usize {
    THREAD_ID.with(|slot| {
        let assigned = slot.get();
        if assigned != THREAD_ID_UNSET {
            return assigned;
        }
        use std::sync::atomic::{AtomicUsize, Ordering};
        static THREAD_COUNTER: AtomicUsize = AtomicUsize::new(0);
        let fresh = THREAD_COUNTER.fetch_add(1, Ordering::Relaxed) % MAX_THREADS;
        slot.set(fresh);
        fresh
    })
}
#[cfg(feature = "memory-debug")]
/// Returns the process resident-set size in bytes, if it can be determined.
///
/// Tries the cheap Linux path first (`VmRSS:` line of `/proc/self/status`,
/// reported in kB). If that file is unavailable *or* cannot be parsed, falls
/// back to `sysinfo`, refreshing only per-process memory statistics.
pub fn get_process_rss_bytes() -> Option<u64> {
    // Fast path: parse VmRSS out of /proc/self/status (Linux only).
    // BUGFIX: the previous version used `?` mid-chain here, which returned
    // `None` from the whole function when the VmRSS line was missing or
    // malformed instead of falling through to the sysinfo path below.
    if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
        let vm_rss_kb = status
            .lines()
            .find(|line| line.starts_with("VmRSS:"))
            .and_then(|line| line.split_whitespace().nth(1))
            .and_then(|field| field.parse::<u64>().ok());
        if let Some(kb) = vm_rss_kb {
            return Some(kb * 1024);
        }
    }
    // Slow path: query sysinfo, caching the System handle in a global so the
    // expensive initialization happens once per process.
    use std::sync::Mutex;
    use sysinfo::{ProcessRefreshKind, RefreshKind, System};
    static RSS_SYSTEM: std::sync::OnceLock<Mutex<System>> = std::sync::OnceLock::new();
    let sys = RSS_SYSTEM.get_or_init(|| {
        Mutex::new(System::new_with_specifics(
            RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing().with_memory()),
        ))
    });
    let mut sys_guard = sys.lock().ok()?;
    sys_guard.refresh_processes_specifics(
        sysinfo::ProcessesToUpdate::All,
        false,
        ProcessRefreshKind::nothing().with_memory(),
    );
    let pid = sysinfo::get_current_pid().ok()?;
    let process = sys_guard.process(pid)?;
    Some(process.memory())
}
#[cfg(feature = "memory-debug")]
/// Samples process RSS, then logs a one-line memory breakdown, plus an
/// "Untracked" follow-up line when more than 1 GB of RSS is unaccounted for.
pub fn log_comprehensive_memory_stats(stats: &PipelineStats) {
// Refresh the RSS counter first so the percentages below are current.
if let Some(rss) = get_process_rss_bytes() {
stats.update_system_rss(rss);
}
let breakdown = stats.get_memory_breakdown();
if breakdown.system_rss_gb > 0.0 {
// RSS known: include the tracked/RSS coverage percentage.
let pct = (breakdown.tracked_total_gb / breakdown.system_rss_gb * 100.0) as u32;
log::info!(
"MEMORY: RSS={:.1}GB Tracked={:.1}GB ({}%) | Queue: Q1:{:.0}MB Q2:{:.0}MB Q3:{:.0}MB Q4:{:.1}GB Q5:{:.1}GB Q6:{:.0}MB Q7:{:.0}MB | Proc: Pos={:.1}GB Tmpl={:.1}GB | Infra={:.0}MB",
breakdown.system_rss_gb,
breakdown.tracked_total_gb,
pct,
breakdown.q1_mb,
breakdown.q2_mb,
breakdown.q3_mb,
breakdown.q4_gb,
breakdown.q5_gb,
breakdown.q6_mb,
breakdown.q7_mb,
breakdown.position_groups_gb,
breakdown.templates_gb,
breakdown.infrastructure_gb * 1e3, );
} else {
// RSS sampling failed: log tracked numbers only.
log::info!(
"MEMORY: Tracked={:.1}GB | Queue: Q1:{:.0}MB Q2:{:.0}MB Q3:{:.0}MB Q4:{:.1}GB Q5:{:.1}GB Q6:{:.0}MB Q7:{:.0}MB | Proc: Pos={:.1}GB Tmpl={:.1}GB | Infra={:.0}MB",
breakdown.tracked_total_gb,
breakdown.q1_mb,
breakdown.q2_mb,
breakdown.q3_mb,
breakdown.q4_gb,
breakdown.q5_gb,
breakdown.q6_mb,
breakdown.q7_mb,
breakdown.position_groups_gb,
breakdown.templates_gb,
breakdown.infrastructure_gb * 1e3,
);
}
if breakdown.system_rss_gb > 0.0 {
let untracked_pct = ((breakdown.untracked_gb / breakdown.system_rss_gb) * 100.0) as u32;
// Only worth a separate line when the gap is large (> 1 GB).
if breakdown.untracked_gb > 1.0 {
log::info!(
" Untracked: {:.1}GB ({}%) = allocator fragmentation + noodles internals",
breakdown.untracked_gb,
untracked_pct,
);
}
}
}
#[cfg(feature = "memory-debug")]
/// Spawns a background thread that logs the memory breakdown every
/// `report_interval_secs` seconds until `shutdown_signal` becomes true.
///
/// Also prints mimalloc stats once: the first time RSS is observed to drop
/// after having exceeded 4 GB (an approximation of "just past the peak").
pub fn start_memory_monitor(
stats: Arc<PipelineStats>,
shutdown_signal: Arc<AtomicBool>,
report_interval_secs: u64,
) -> thread::JoinHandle<()> {
thread::spawn(move || {
let mut last_report = Instant::now();
let report_interval = Duration::from_secs(report_interval_secs);
let mut last_rss: u64 = 0;
let mut peak_rss: u64 = 0;
// Ensures the mimalloc dump happens at most once per run.
let mut stats_printed = false;
while !shutdown_signal.load(Ordering::Relaxed) {
if last_report.elapsed() >= report_interval {
log_comprehensive_memory_stats(&stats);
let current_rss = stats.memory.system_rss_bytes.load(Ordering::Relaxed);
// First RSS decrease after a >4GB peak: assume we just passed
// the high-water mark and snapshot allocator stats.
if !stats_printed
&& current_rss > 0
&& last_rss > 0
&& current_rss < last_rss
&& peak_rss > 4_000_000_000
{
log::info!("=== MIMALLOC STATS AT PEAK (no mi_collect) ===");
crate::sort::memory_probe::print_mi_stats();
stats_printed = true;
}
if current_rss > peak_rss {
peak_rss = current_rss;
}
last_rss = current_rss;
last_report = Instant::now();
}
// Poll at 100ms so shutdown is noticed promptly between reports.
thread::sleep(Duration::from_millis(100));
}
})
}
/// Batch types that report a scheduling "weight"; the exact semantics are
/// defined by each implementor.
pub trait BatchWeight {
fn batch_weight(&self) -> usize;
}
/// Types that can report an estimate of the heap bytes they own; used for
/// queue memory accounting throughout this module.
pub trait MemoryEstimate {
fn estimate_heap_size(&self) -> usize;
}
/// Lock-free byte accounting with an optional soft limit and peak tracking.
///
/// A `limit_bytes` of 0 means "unlimited": `try_add` always succeeds, while
/// the backpressure helpers fall back to the global
/// `BACKPRESSURE_THRESHOLD_BYTES` constant.
#[derive(Debug)]
#[allow(clippy::struct_field_names)]
pub struct MemoryTracker {
    // Bytes currently accounted for across all callers.
    current_bytes: AtomicU64,
    // High-water mark of `current_bytes` since construction.
    peak_bytes: AtomicU64,
    // Soft cap in bytes; 0 disables the cap check in `try_add`.
    limit_bytes: u64,
}
impl MemoryTracker {
    /// Creates a tracker with the given soft limit (0 = unlimited).
    #[must_use]
    pub fn new(limit_bytes: u64) -> Self {
        Self { current_bytes: AtomicU64::new(0), peak_bytes: AtomicU64::new(0), limit_bytes }
    }
    /// Creates a tracker with no limit.
    #[must_use]
    pub fn unlimited() -> Self {
        Self::new(0)
    }
    /// Attempts to account for `bytes`; returns `false` when already at or
    /// over the limit.
    ///
    /// This is a *soft* limit: the addition that crosses the limit is still
    /// admitted — only subsequent calls are refused.
    pub fn try_add(&self, bytes: usize) -> bool {
        if self.limit_bytes == 0 {
            let new_total =
                self.current_bytes.fetch_add(bytes as u64, Ordering::Relaxed) + bytes as u64;
            self.update_peak(new_total);
            return true;
        }
        // CAS loop: admit only if we were below the limit when we looked.
        loop {
            let current = self.current_bytes.load(Ordering::Relaxed);
            if current >= self.limit_bytes {
                return false;
            }
            let new_total = current + bytes as u64;
            if self
                .current_bytes
                .compare_exchange_weak(current, new_total, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                self.update_peak(new_total);
                return true;
            }
        }
    }
    /// Raises the recorded peak to `new_total` if it is higher.
    #[inline]
    fn update_peak(&self, new_total: u64) {
        // `fetch_max` performs the same monotonic-max update the previous
        // hand-written compare-exchange loop implemented.
        self.peak_bytes.fetch_max(new_total, Ordering::Relaxed);
    }
    /// Releases `bytes`, saturating at zero so an over-release cannot wrap
    /// the counter.
    pub fn remove(&self, bytes: usize) {
        let bytes = bytes as u64;
        // `fetch_update` with a `Some(..)`-returning closure cannot fail.
        let _ = self.current_bytes.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            Some(current.saturating_sub(bytes))
        });
    }
    /// Unconditionally accounts for `bytes`, ignoring the limit.
    pub fn add(&self, bytes: usize) {
        let new_total =
            self.current_bytes.fetch_add(bytes as u64, Ordering::Relaxed) + bytes as u64;
        self.update_peak(new_total);
    }
    /// Bytes currently accounted for.
    #[must_use]
    pub fn current(&self) -> u64 {
        self.current_bytes.load(Ordering::Relaxed)
    }
    /// The configured soft limit (0 = unlimited).
    #[must_use]
    pub fn limit(&self) -> u64 {
        self.limit_bytes
    }
    /// The highest value `current()` has reached.
    #[must_use]
    pub fn peak(&self) -> u64 {
        self.peak_bytes.load(Ordering::Relaxed)
    }
    /// True when usage has reached the backpressure threshold.
    #[must_use]
    pub fn is_at_limit(&self) -> bool {
        self.current() >= self.backpressure_threshold()
    }
    /// True when usage has drained below half the backpressure threshold
    /// (the hysteresis partner of `is_at_limit`).
    #[must_use]
    pub fn is_below_drain_threshold(&self) -> bool {
        self.current() < self.backpressure_threshold() / 2
    }
    /// Effective threshold: the global default, clamped by the configured
    /// limit when one is set. (Shared by the two checks above, which
    /// previously duplicated this computation.)
    #[inline]
    fn backpressure_threshold(&self) -> u64 {
        if self.limit_bytes == 0 {
            BACKPRESSURE_THRESHOLD_BYTES
        } else {
            self.limit_bytes.min(BACKPRESSURE_THRESHOLD_BYTES)
        }
    }
}
impl Default for MemoryTracker {
    /// Defaults to an unlimited tracker.
    fn default() -> Self {
        Self::unlimited()
    }
}
/// Outcome of one attempt to run a pipeline step.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepResult {
    /// The step made progress.
    Success,
    /// The step's output queue was full; retry later.
    OutputFull,
    /// The step's input queue was empty; nothing to do.
    InputEmpty,
}
impl StepResult {
    /// True only for [`StepResult::Success`].
    #[inline]
    #[must_use]
    pub fn is_success(self) -> bool {
        match self {
            StepResult::Success => true,
            StepResult::OutputFull | StepResult::InputEmpty => false,
        }
    }
}
// Emit a progress log line every this many items.
pub const PROGRESS_LOG_INTERVAL: u64 = 1_000_000;
// Default backpressure ceiling (512 MiB) used when no explicit limit is set.
pub const BACKPRESSURE_THRESHOLD_BYTES: u64 = 512 * 1024 * 1024;
// Tighter ceiling (256 MiB) applied to the processed ("Q5") queue payload.
pub const Q5_BACKPRESSURE_THRESHOLD_BYTES: u64 = 256 * 1024 * 1024;
/// Shared state for a sequence-numbered reorder stage.
///
/// Producers call `can_proceed` before buffering out-of-order items so that
/// the reorder buffer's heap usage stays bounded.
#[derive(Debug)]
pub struct ReorderBufferState {
// Sequence number of the next item expected to be released in order.
pub next_seq: AtomicU64,
// Bytes currently buffered while waiting for earlier sequence numbers.
pub heap_bytes: AtomicU64,
// Configured cap; 0 means "use the global default" (see effective_limit).
memory_limit: u64,
}
impl ReorderBufferState {
#[must_use]
pub fn new(memory_limit: u64) -> Self {
Self { next_seq: AtomicU64::new(0), heap_bytes: AtomicU64::new(0), memory_limit }
}
/// True when an item with sequence `serial` may be buffered now: either it
/// is the next in-order item, or buffered bytes are under half the limit.
#[must_use]
pub fn can_proceed(&self, serial: u64) -> bool {
let limit = self.effective_limit();
let next_seq = self.next_seq.load(Ordering::Acquire);
let heap_bytes = self.heap_bytes.load(Ordering::Acquire);
// The in-order item is always admitted; it never increases buffering.
if serial == next_seq {
return true;
}
// Out-of-order items only get half the limit, leaving headroom for the
// in-order stream to catch up.
let effective_limit = limit / 2;
heap_bytes < effective_limit
}
/// True when buffered bytes have reached the limit (backpressure on).
#[must_use]
pub fn is_memory_high(&self) -> bool {
let threshold = self.effective_limit();
self.heap_bytes.load(Ordering::Acquire) >= threshold
}
/// True when buffered bytes dropped below half the limit (hysteresis
/// partner of `is_memory_high`).
#[must_use]
pub fn is_memory_drained(&self) -> bool {
let threshold = self.effective_limit();
self.heap_bytes.load(Ordering::Acquire) < threshold / 2
}
// The working limit: global default, clamped by the configured limit.
#[inline]
#[must_use]
fn effective_limit(&self) -> u64 {
if self.memory_limit == 0 {
BACKPRESSURE_THRESHOLD_BYTES
} else {
self.memory_limit.min(BACKPRESSURE_THRESHOLD_BYTES)
}
}
#[inline]
pub fn add_heap_bytes(&self, bytes: u64) {
self.heap_bytes.fetch_add(bytes, Ordering::AcqRel);
}
#[inline]
pub fn sub_heap_bytes(&self, bytes: u64) {
self.heap_bytes.fetch_sub(bytes, Ordering::AcqRel);
}
#[inline]
pub fn update_next_seq(&self, new_seq: u64) {
self.next_seq.store(new_seq, Ordering::Release);
}
#[inline]
#[must_use]
pub fn get_next_seq(&self) -> u64 {
self.next_seq.load(Ordering::Acquire)
}
#[inline]
#[must_use]
pub fn get_heap_bytes(&self) -> u64 {
self.heap_bytes.load(Ordering::Acquire)
}
#[inline]
#[must_use]
pub fn get_memory_limit(&self) -> u64 {
self.memory_limit
}
}
use crate::bgzf_writer::{CompressedBlock, InlineBgzfCompressor};
/// An owned batch of still-compressed BGZF blocks read from input.
#[derive(Default)]
pub struct RawBlockBatch {
    /// The raw (compressed) blocks, in input order.
    pub blocks: Vec<RawBgzfBlock>,
}
impl RawBlockBatch {
    /// Creates an empty batch with no preallocated storage.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Creates an empty batch with room for `capacity` blocks.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self { blocks: Vec::with_capacity(capacity) }
    }
    /// Number of blocks currently held.
    #[must_use]
    pub fn len(&self) -> usize {
        self.blocks.len()
    }
    /// True when no blocks are held.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.blocks.is_empty()
    }
    /// Sum of the blocks' declared decompressed sizes.
    #[must_use]
    pub fn total_uncompressed_size(&self) -> usize {
        self.blocks.iter().map(|block| block.uncompressed_size()).sum()
    }
    /// Sum of the blocks' compressed (on-disk) sizes.
    #[must_use]
    pub fn total_compressed_size(&self) -> usize {
        self.blocks.iter().map(|block| block.len()).sum()
    }
    /// Drops all blocks, retaining the vector's capacity for reuse.
    pub fn clear(&mut self) {
        self.blocks.clear();
    }
}
impl MemoryEstimate for RawBlockBatch {
    /// Heap bytes: each block's buffer capacity plus the vector's own storage.
    fn estimate_heap_size(&self) -> usize {
        let payload: usize = self.blocks.iter().map(|b| b.data.capacity()).sum();
        payload + self.blocks.capacity() * std::mem::size_of::<RawBgzfBlock>()
    }
}
/// A batch of BGZF-compressed output blocks plus write-side bookkeeping.
#[derive(Default)]
pub struct CompressedBlockBatch {
    /// Compressed blocks, in output order.
    pub blocks: Vec<CompressedBlock>,
    /// Number of records represented by this batch.
    pub record_count: u64,
    /// Optional pre-serialized payload destined for the secondary output.
    pub secondary_data: Option<Vec<u8>>,
}
impl CompressedBlockBatch {
    /// Creates an empty batch with no preallocated storage.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Creates an empty batch with room for `capacity` blocks.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self { blocks: Vec::with_capacity(capacity), ..Self::default() }
    }
    /// Number of blocks currently held.
    #[must_use]
    pub fn len(&self) -> usize {
        self.blocks.len()
    }
    /// True when no blocks are held.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.blocks.is_empty()
    }
    /// Total compressed bytes across all blocks.
    #[must_use]
    pub fn total_size(&self) -> usize {
        self.blocks.iter().map(|block| block.data.len()).sum()
    }
    /// Resets the batch for reuse (block vector keeps its capacity).
    pub fn clear(&mut self) {
        self.blocks.clear();
        self.record_count = 0;
        self.secondary_data = None;
    }
}
impl MemoryEstimate for CompressedBlockBatch {
    /// Heap bytes: block buffers, the vector's storage, and the optional
    /// secondary payload.
    fn estimate_heap_size(&self) -> usize {
        let blocks_heap: usize = self.blocks.iter().map(|b| b.data.capacity()).sum();
        let vec_heap = self.blocks.capacity() * std::mem::size_of::<CompressedBlock>();
        let secondary = self.secondary_data.as_ref().map_or(0, Vec::capacity);
        blocks_heap + vec_heap + secondary
    }
}
/// Tuning knobs for batched BGZF compression.
#[derive(Debug, Clone)]
pub struct BgzfBatchConfig {
    /// Number of BGZF blocks handled per batch.
    pub blocks_per_batch: usize,
    /// Deflate compression level passed through to the compressor.
    pub compression_level: u32,
}
impl Default for BgzfBatchConfig {
    /// 16 blocks per batch at compression level 6.
    fn default() -> Self {
        BgzfBatchConfig { blocks_per_batch: 16, compression_level: 6 }
    }
}
impl BgzfBatchConfig {
    /// Builds a config with `blocks_per_batch` and the default level.
    #[must_use]
    pub fn new(blocks_per_batch: usize) -> Self {
        let mut config = Self::default();
        config.blocks_per_batch = blocks_per_batch;
        config
    }
    /// Builder-style setter for the compression level.
    #[must_use]
    pub fn with_compression_level(mut self, level: u32) -> Self {
        self.compression_level = level;
        self
    }
}
/// Reads up to `blocks_per_batch` raw BGZF blocks into `buffer`.
///
/// Returns `Ok(true)` when at least one block was read, `Ok(false)` at end
/// of input (the buffer is left cleared in that case).
///
/// # Errors
/// Propagates any I/O error from the underlying reader.
pub fn read_raw_block_batch(
    reader: &mut dyn Read,
    buffer: &mut RawBlockBatch,
    blocks_per_batch: usize,
) -> io::Result<bool> {
    buffer.clear();
    let fetched = read_raw_blocks(reader, blocks_per_batch)?;
    if fetched.is_empty() {
        return Ok(false);
    }
    buffer.blocks = fetched;
    Ok(true)
}
/// Writes every compressed block in `batch` to `writer`, in order.
///
/// # Errors
/// Returns the first I/O error encountered; later blocks are not written.
pub fn write_compressed_batch(
    writer: &mut dyn Write,
    batch: &CompressedBlockBatch,
) -> io::Result<()> {
    batch.blocks.iter().try_for_each(|block| writer.write_all(&block.data))
}
/// Per-worker BGZF codec state: a libdeflate decompressor plus an inline
/// BGZF compressor.
pub struct BgzfWorkerState {
pub decompressor: libdeflater::Decompressor,
pub compressor: InlineBgzfCompressor,
}
impl BgzfWorkerState {
/// Creates fresh codec state; `compression_level` configures the compressor.
#[must_use]
pub fn new(compression_level: u32) -> Self {
Self {
decompressor: libdeflater::Decompressor::new(),
compressor: InlineBgzfCompressor::new(compression_level),
}
}
/// Decompresses every block in `batch` into one contiguous buffer,
/// pre-sized from the batch's declared total uncompressed size.
///
/// # Errors
/// Propagates failures from `decompress_block_into`.
pub fn decompress_batch(&mut self, batch: &RawBlockBatch) -> io::Result<Vec<u8>> {
let total_size = batch.total_uncompressed_size();
let mut result = Vec::with_capacity(total_size);
for block in &batch.blocks {
decompress_block_into(block, &mut self.decompressor, &mut result)?;
}
Ok(result)
}
}
/// The nine pipeline stages, declared in execution order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineStep {
    Read,
    Decompress,
    FindBoundaries,
    Decode,
    Group,
    Process,
    Serialize,
    Compress,
    Write,
}
impl PipelineStep {
    /// Steps that must run on at most one thread at a time.
    #[must_use]
    pub const fn is_exclusive(&self) -> bool {
        matches!(self, Self::Read | Self::FindBoundaries | Self::Group | Self::Write)
    }
    /// Every step in pipeline order; slot `i` holds the step whose
    /// `index()` is `i`.
    #[must_use]
    pub const fn all() -> [PipelineStep; 9] {
        [
            Self::Read,
            Self::Decompress,
            Self::FindBoundaries,
            Self::Decode,
            Self::Group,
            Self::Process,
            Self::Serialize,
            Self::Compress,
            Self::Write,
        ]
    }
    /// Inverse of [`PipelineStep::index`].
    ///
    /// # Panics
    /// Panics when `idx` is not in `0..=8`.
    #[must_use]
    pub const fn from_index(idx: usize) -> PipelineStep {
        match idx {
            0 => Self::Read,
            1 => Self::Decompress,
            2 => Self::FindBoundaries,
            3 => Self::Decode,
            4 => Self::Group,
            5 => Self::Process,
            6 => Self::Serialize,
            7 => Self::Compress,
            8 => Self::Write,
            _ => panic!("PipelineStep::from_index: invalid index (must be 0..=8)"),
        }
    }
    /// This step's 0-based position in pipeline order.
    #[must_use]
    pub const fn index(&self) -> usize {
        // Fieldless enum with default discriminants: the cast yields the
        // declaration position, exactly as the match table it replaces did.
        *self as usize
    }
    /// Two-letter abbreviation used in compact log lines.
    #[must_use]
    pub const fn short_name(&self) -> &'static str {
        match self {
            Self::Read => "Rd",
            Self::Decompress => "Dc",
            Self::FindBoundaries => "Fb",
            Self::Decode => "De",
            Self::Group => "Gr",
            Self::Process => "Pr",
            Self::Serialize => "Se",
            Self::Compress => "Co",
            Self::Write => "Wr",
        }
    }
}
/// An ordered subset of pipeline steps with an O(1) membership mask.
#[derive(Debug, Clone)]
pub struct ActiveSteps {
    /// Enabled steps, in pipeline order.
    steps: Vec<PipelineStep>,
    /// `active[i]` is true iff the step whose `index()` is `i` is enabled.
    active: [bool; 9],
}
impl ActiveSteps {
    /// Builds the set from `steps`.
    ///
    /// # Panics
    /// Panics unless `steps` is strictly increasing by `index()` (i.e. the
    /// steps are unique and in pipeline order).
    #[must_use]
    pub fn new(steps: &[PipelineStep]) -> Self {
        assert!(
            steps.windows(2).all(|w| w[0].index() < w[1].index()),
            "ActiveSteps must be unique and in pipeline order"
        );
        let mut active = [false; 9];
        for step in steps {
            active[step.index()] = true;
        }
        ActiveSteps { steps: steps.to_vec(), active }
    }
    /// The full pipeline: every step enabled.
    #[must_use]
    pub fn all() -> Self {
        Self::new(&PipelineStep::all())
    }
    /// True when `step` belongs to this set.
    #[must_use]
    pub fn is_active(&self, step: PipelineStep) -> bool {
        self.active[step.index()]
    }
    /// The enabled steps, in pipeline order.
    #[must_use]
    pub fn steps(&self) -> &[PipelineStep] {
        &self.steps
    }
    /// Only the enabled steps that require exclusive execution.
    #[must_use]
    pub fn exclusive_steps(&self) -> Vec<PipelineStep> {
        self.steps.iter().filter(|step| step.is_exclusive()).copied().collect()
    }
    /// Number of enabled steps.
    #[must_use]
    pub fn len(&self) -> usize {
        self.steps.len()
    }
    /// True when no steps are enabled.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.steps.is_empty()
    }
    /// Compacts the enabled entries of `buffer` to its front (preserving
    /// order) and returns how many were kept; entries past that count are
    /// stale leftovers.
    pub fn filter_in_place(&self, buffer: &mut [PipelineStep; 9]) -> usize {
        let mut kept = 0;
        for slot in 0..buffer.len() {
            if self.is_active(buffer[slot]) {
                buffer[kept] = buffer[slot];
                kept += 1;
            }
        }
        kept
    }
}
/// Grouping key for records: canonicalized pair coordinates plus library,
/// cell and name hashes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GroupKey {
    pub ref_id1: i32,
    pub pos1: i32,
    pub strand1: u8,
    pub ref_id2: i32,
    pub pos2: i32,
    pub strand2: u8,
    pub library_idx: u16,
    pub cell_hash: u64,
    pub name_hash: u64,
}
impl GroupKey {
    /// Sentinel values used for the "mate" slots of unpaired keys.
    pub const UNKNOWN_REF: i32 = i32::MAX;
    pub const UNKNOWN_POS: i32 = i32::MAX;
    pub const UNKNOWN_STRAND: u8 = u8::MAX;
    /// Builds a key for a read pair, normalizing end order so that both
    /// reads of a pair produce the same key regardless of which end is
    /// passed first.
    #[must_use]
    #[allow(clippy::too_many_arguments)]
    pub fn paired(
        ref_id: i32,
        pos: i32,
        strand: u8,
        mate_ref_id: i32,
        mate_pos: i32,
        mate_strand: u8,
        library_idx: u16,
        cell_hash: u64,
        name_hash: u64,
    ) -> Self {
        let this_end = (ref_id, pos, strand);
        let mate_end = (mate_ref_id, mate_pos, mate_strand);
        // Lexicographic tuple order picks the canonical "first" end.
        let (lo, hi) =
            if this_end <= mate_end { (this_end, mate_end) } else { (mate_end, this_end) };
        Self {
            ref_id1: lo.0,
            pos1: lo.1,
            strand1: lo.2,
            ref_id2: hi.0,
            pos2: hi.1,
            strand2: hi.2,
            library_idx,
            cell_hash,
            name_hash,
        }
    }
    /// Builds a key for an unpaired read; the mate slots hold sentinels.
    #[must_use]
    pub fn single(
        ref_id: i32,
        pos: i32,
        strand: u8,
        library_idx: u16,
        cell_hash: u64,
        name_hash: u64,
    ) -> Self {
        Self {
            ref_id1: ref_id,
            pos1: pos,
            strand1: strand,
            ref_id2: Self::UNKNOWN_REF,
            pos2: Self::UNKNOWN_POS,
            strand2: Self::UNKNOWN_STRAND,
            library_idx,
            cell_hash,
            name_hash,
        }
    }
    /// Every field except `name_hash`, as a tuple — the "same position"
    /// component of the ordering.
    #[must_use]
    pub fn position_key(&self) -> (i32, i32, u8, i32, i32, u8, u16, u64) {
        let Self { ref_id1, pos1, strand1, ref_id2, pos2, strand2, library_idx, cell_hash, .. } =
            *self;
        (ref_id1, pos1, strand1, ref_id2, pos2, strand2, library_idx, cell_hash)
    }
}
impl PartialOrd for GroupKey {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for GroupKey {
    /// Orders by position first, breaking ties on `name_hash`; together
    /// these cover every field, so the ordering is consistent with the
    /// derived `Eq`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        match self.position_key().cmp(&other.position_key()) {
            std::cmp::Ordering::Equal => self.name_hash.cmp(&other.name_hash),
            unequal => unequal,
        }
    }
}
impl Default for GroupKey {
    /// All coordinates set to sentinels; hashes and library index zero.
    fn default() -> Self {
        Self {
            ref_id1: Self::UNKNOWN_REF,
            pos1: Self::UNKNOWN_POS,
            strand1: Self::UNKNOWN_STRAND,
            ref_id2: Self::UNKNOWN_REF,
            pos2: Self::UNKNOWN_POS,
            strand2: Self::UNKNOWN_STRAND,
            library_idx: 0,
            cell_hash: 0,
            name_hash: 0,
        }
    }
}
/// A decoded BAM record: its grouping key plus the raw record bytes.
#[derive(Debug)]
pub struct DecodedRecord {
pub key: GroupKey,
pub(crate) data: RawRecord,
}
impl DecodedRecord {
/// Wraps raw record bytes together with their precomputed grouping key.
#[must_use]
pub fn from_raw_bytes(raw: impl Into<RawRecord>, key: GroupKey) -> Self {
Self { key, data: raw.into() }
}
/// Borrows the underlying record bytes.
#[must_use]
pub fn raw_bytes(&self) -> &[u8] {
self.data.as_ref()
}
/// Consumes `self`, returning the owned raw record.
#[must_use]
pub fn into_raw_bytes(self) -> RawRecord {
self.data
}
}
/// Rough heap-size estimate for a decoded `RecordBuf`, in bytes.
///
/// The data-field constants below (48-byte entries, 16-byte buckets, ~15%
/// slack, 16 bytes of per-value spill) are heuristics modelling the internal
/// tag map — presumably matched to the noodles version in use; verify there
/// if precision matters.
pub(crate) fn estimate_record_buf_heap_size(record: &RecordBuf) -> usize {
let name_size = record.name().map_or(0, |n| n.len());
let seq_len = record.sequence().len();
let qual_len = record.quality_scores().len();
let cigar_ops = record.cigar().as_ref().len();
// 4 bytes per CIGAR op (packed encoding — TODO confirm against noodles).
let cigar_size = cigar_ops * 4;
let data_fields = record.data().iter().count();
// Assumed map geometry: ~15% spare capacity plus one slot.
let entry_capacity = (data_fields * 115) / 100 + 1;
let entries_size = data_fields * 48;
let hash_table_size = entry_capacity * 16;
let value_heap_size = data_fields * 16;
let data_size = entries_size + hash_table_size + value_heap_size;
name_size + seq_len + qual_len + cigar_size + data_size
}
impl MemoryEstimate for DecodedRecord {
// Heap usage is just the raw record buffer's capacity.
fn estimate_heap_size(&self) -> usize {
self.data.capacity()
}
}
impl MemoryEstimate for Vec<DecodedRecord> {
// Elements' own heap plus the vector's backing storage.
fn estimate_heap_size(&self) -> usize {
self.iter().map(MemoryEstimate::estimate_heap_size).sum::<usize>()
+ self.capacity() * std::mem::size_of::<DecodedRecord>()
}
}
impl MemoryEstimate for RecordBuf {
// Delegates to the module-level heuristic estimator.
fn estimate_heap_size(&self) -> usize {
estimate_record_buf_heap_size(self)
}
}
impl MemoryEstimate for Vec<RecordBuf> {
// Elements' own heap plus the vector's backing storage.
fn estimate_heap_size(&self) -> usize {
self.iter().map(MemoryEstimate::estimate_heap_size).sum::<usize>()
+ self.capacity() * std::mem::size_of::<RecordBuf>()
}
}
impl MemoryEstimate for Vec<u8> {
// Capacity, not len: reserved-but-unused bytes are still owned heap.
fn estimate_heap_size(&self) -> usize {
self.capacity()
}
}
/// Inputs needed to derive grouping keys from records: the library lookup
/// table plus (optionally) which SAM tag carries the cell barcode.
#[derive(Debug, Clone)]
pub struct GroupKeyConfig {
pub library_index: Arc<LibraryIndex>,
// `None` means no cell tag is consulted (see `new_raw_no_cell`).
pub cell_tag: Option<Tag>,
}
impl GroupKeyConfig {
/// Config with an explicit cell-barcode tag.
#[must_use]
pub fn new(library_index: LibraryIndex, cell_tag: Tag) -> Self {
Self { library_index: Arc::new(library_index), cell_tag: Some(cell_tag) }
}
/// Config that ignores cell barcodes entirely.
#[must_use]
pub fn new_raw_no_cell(library_index: LibraryIndex) -> Self {
Self { library_index: Arc::new(library_index), cell_tag: None }
}
}
impl Default for GroupKeyConfig {
/// Default library index with the `CB` cell-barcode tag.
fn default() -> Self {
Self {
library_index: Arc::new(LibraryIndex::default()),
cell_tag: Some(Tag::from(SamTag::CB)), }
}
}
/// A buffer of decompressed BGZF payload bytes.
#[derive(Default)]
pub struct DecompressedBatch {
    /// The concatenated uncompressed data.
    pub data: Vec<u8>,
}
impl DecompressedBatch {
    /// Creates an empty batch with no preallocated storage.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Creates an empty batch with `capacity` bytes reserved.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self { data: Vec::with_capacity(capacity) }
    }
    /// True when the batch holds no bytes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
    /// Empties the batch, retaining the buffer's capacity for reuse.
    pub fn clear(&mut self) {
        self.data.clear();
    }
}
impl MemoryEstimate for DecompressedBatch {
// Capacity, not len: reserved-but-unused bytes are still owned heap.
fn estimate_heap_size(&self) -> usize {
self.data.capacity()
}
}
/// BAM-serialized record bytes awaiting compression, plus bookkeeping.
#[derive(Default)]
pub struct SerializedBatch {
    /// Serialized record bytes.
    pub data: Vec<u8>,
    /// Number of records serialized into `data`.
    pub record_count: u64,
    /// Optional payload destined for the secondary output.
    pub secondary_data: Option<Vec<u8>>,
}
impl SerializedBatch {
    /// Creates an empty batch with no preallocated storage.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// True when no serialized bytes are held (`record_count` is ignored).
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
    /// Resets the batch for reuse (the primary buffer keeps its capacity).
    pub fn clear(&mut self) {
        self.data.clear();
        self.record_count = 0;
        self.secondary_data = None;
    }
}
impl MemoryEstimate for SerializedBatch {
// Primary buffer capacity plus the optional secondary payload's capacity.
fn estimate_heap_size(&self) -> usize {
self.data.capacity() + self.secondary_data.as_ref().map_or(0, Vec::capacity)
}
}
/// Shared state for the output half of the pipeline: four inter-stage
/// queues (groups -> processed -> serialized -> compressed), the write-side
/// reorder buffer, the output sinks, and error/progress bookkeeping.
pub struct OutputPipelineQueues<G, P: MemoryEstimate> {
// Each stage queue is paired with an atomic count of its payload heap bytes.
pub groups: ArrayQueue<(u64, Vec<G>)>,
pub groups_heap_bytes: AtomicU64,
pub processed: ArrayQueue<(u64, Vec<P>)>,
pub processed_heap_bytes: AtomicU64,
pub serialized: ArrayQueue<(u64, SerializedBatch)>,
pub serialized_heap_bytes: AtomicU64,
pub compressed: ArrayQueue<(u64, CompressedBlockBatch)>,
pub compressed_heap_bytes: AtomicU64,
// Restores sequence order before compressed batches reach the writer.
pub write_reorder: Mutex<ReorderBuffer<CompressedBlockBatch>>,
pub write_reorder_state: ReorderBufferState,
// Primary sink; `Option` so it can be taken at shutdown.
pub output: Mutex<Option<Box<dyn Write + Send>>>,
// Optional second BAM sink (installed via `set_secondary_output`).
pub secondary_output: Option<Mutex<Option<crate::bam_io::RawBamWriter>>>,
// Sticky error: flag for cheap polling, mutex for the first error's details.
pub error_flag: AtomicBool,
pub error: Mutex<Option<io::Error>>,
pub items_written: AtomicU64,
// Drain-mode flag toggled via `set_draining`.
pub draining: AtomicBool,
pub progress: ProgressTracker,
pub stats: Option<Arc<PipelineStats>>,
}
impl<G: Send, P: Send + MemoryEstimate> OutputPipelineQueues<G, P> {
/// Builds the queue set with `queue_capacity` slots per stage and one
/// primary output sink; `queue_memory_limit` caps the write reorder buffer.
#[must_use]
pub fn new(
queue_capacity: usize,
output: Box<dyn Write + Send>,
stats: Option<Arc<PipelineStats>>,
progress_name: &str,
queue_memory_limit: u64,
) -> Self {
Self {
groups: ArrayQueue::new(queue_capacity),
groups_heap_bytes: AtomicU64::new(0),
processed: ArrayQueue::new(queue_capacity),
processed_heap_bytes: AtomicU64::new(0),
serialized: ArrayQueue::new(queue_capacity),
serialized_heap_bytes: AtomicU64::new(0),
compressed: ArrayQueue::new(queue_capacity),
compressed_heap_bytes: AtomicU64::new(0),
write_reorder: Mutex::new(ReorderBuffer::new()),
write_reorder_state: ReorderBufferState::new(queue_memory_limit),
output: Mutex::new(Some(output)),
secondary_output: None,
error_flag: AtomicBool::new(false),
error: Mutex::new(None),
items_written: AtomicU64::new(0),
draining: AtomicBool::new(false),
progress: ProgressTracker::new(progress_name).with_interval(PROGRESS_LOG_INTERVAL),
stats,
}
}
/// Installs an optional secondary BAM writer (takes `&mut self`, so this
/// happens during setup before the state is shared).
pub fn set_secondary_output(&mut self, writer: crate::bam_io::RawBamWriter) {
self.secondary_output = Some(Mutex::new(Some(writer)));
}
/// Records `error`, keeping only the first one. The flag is set before
/// the details so pollers observe the failure as early as possible.
pub fn set_error(&self, error: io::Error) {
self.error_flag.store(true, Ordering::SeqCst);
let mut guard = self.error.lock();
if guard.is_none() {
*guard = Some(error);
}
}
/// Cheap lock-free error poll.
#[must_use]
pub fn has_error(&self) -> bool {
self.error_flag.load(Ordering::Relaxed)
}
/// Removes and returns the stored error, if any (the flag stays set).
pub fn take_error(&self) -> Option<io::Error> {
self.error.lock().take()
}
/// True when the processed queue's payload exceeds its byte threshold.
#[must_use]
pub fn is_processed_memory_high(&self) -> bool {
self.processed_heap_bytes.load(Ordering::Acquire) >= Q5_BACKPRESSURE_THRESHOLD_BYTES
}
#[must_use]
pub fn is_draining(&self) -> bool {
self.draining.load(Ordering::Relaxed)
}
pub fn set_draining(&self, value: bool) {
self.draining.store(value, Ordering::Relaxed);
}
/// Backpressure check for the Process stage: slot-full OR byte-full.
#[must_use]
pub fn should_apply_process_backpressure(&self) -> bool {
self.processed.is_full() || self.is_processed_memory_high()
}
/// Snapshot of the four queue lengths (approximate under concurrency).
#[must_use]
pub fn queue_depths(&self) -> OutputQueueDepths {
OutputQueueDepths {
groups: self.groups.len(),
processed: self.processed.len(),
serialized: self.serialized.len(),
compressed: self.compressed.len(),
}
}
/// True when every stage queue and the write reorder buffer are empty.
#[must_use]
pub fn are_queues_empty(&self) -> bool {
self.groups.is_empty()
&& self.processed.is_empty()
&& self.serialized.is_empty()
&& self.compressed.is_empty()
&& self.write_reorder.lock().is_empty()
}
}
/// A point-in-time sample of the four output queue lengths, for monitoring.
#[derive(Debug, Clone, Copy)]
pub struct OutputQueueDepths {
pub groups: usize,
pub processed: usize,
pub serialized: usize,
pub compressed: usize,
}
// Exponential-backoff bounds for idle workers, in microseconds.
pub const MIN_BACKOFF_US: u64 = 10;
pub const MAX_BACKOFF_US: u64 = 1000;
// Initial capacity for per-worker serialization scratch buffers (256 KiB).
pub const SERIALIZATION_BUFFER_CAPACITY: usize = 256 * 1024;
/// Per-worker-thread mutable state: compressor, scheduler, scratch buffers,
/// and the exponential-backoff counter used while queues are empty or full.
pub struct WorkerCoreState {
pub compressor: InlineBgzfCompressor,
pub scheduler: Box<dyn Scheduler>,
pub serialization_buffer: Vec<u8>,
pub secondary_serialization_buffer: Vec<u8>,
// Current sleep length in microseconds; see reset_backoff/increase_backoff.
pub backoff_us: u64,
// Small cache (at most 2 entries) of byte buffers for reuse.
pub recycled_buffers: Vec<Vec<u8>>,
}
impl WorkerCoreState {
    /// Creates per-worker state: a compressor at `compression_level`, a
    /// scheduler built from `scheduler_strategy`, and preallocated scratch.
    #[must_use]
    pub fn new(
        compression_level: u32,
        thread_id: usize,
        num_threads: usize,
        scheduler_strategy: SchedulerStrategy,
        active_steps: ActiveSteps,
    ) -> Self {
        Self {
            compressor: InlineBgzfCompressor::new(compression_level),
            scheduler: create_scheduler(scheduler_strategy, thread_id, num_threads, active_steps),
            serialization_buffer: Vec::with_capacity(SERIALIZATION_BUFFER_CAPACITY),
            secondary_serialization_buffer: Vec::new(),
            backoff_us: MIN_BACKOFF_US,
            recycled_buffers: Vec::with_capacity(2),
        }
    }
    /// Resets the backoff after useful work was done.
    #[inline]
    pub fn reset_backoff(&mut self) {
        self.backoff_us = MIN_BACKOFF_US;
    }
    /// Doubles the backoff, capped at `MAX_BACKOFF_US`.
    #[inline]
    pub fn increase_backoff(&mut self) {
        self.backoff_us = (self.backoff_us * 2).min(MAX_BACKOFF_US);
    }
    /// Sleeps for the current backoff with roughly +/-25% jitter; at the
    /// minimum backoff it just yields the CPU instead of sleeping.
    #[inline]
    pub fn sleep_backoff(&self) {
        if self.backoff_us <= MIN_BACKOFF_US {
            std::thread::yield_now();
            return;
        }
        // `.max(1)` guards the modulo below against zero even if the
        // backoff invariants (doubling from MIN) ever change.
        let jitter_range = (self.backoff_us / 4).max(1);
        // Cheap jitter seed from the sub-second wall clock.
        let jitter_seed = u64::from(
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.subsec_nanos())
                .unwrap_or(0),
        );
        // BUGFIX: the previous `saturating_sub` on u64 clamped negative
        // jitter to zero, so jitter was only ever additive despite the
        // `.max(MIN_BACKOFF_US)` below implying symmetric intent. Compute a
        // signed offset in [-jitter_range, +jitter_range) instead.
        // (jitter_range <= MAX_BACKOFF_US / 4, so the cast cannot overflow.)
        let jitter = (jitter_seed % (jitter_range * 2)) as i64 - jitter_range as i64;
        let actual_us = if jitter >= 0 {
            self.backoff_us.saturating_add(jitter as u64)
        } else {
            self.backoff_us.saturating_sub(jitter.unsigned_abs())
        }
        .max(MIN_BACKOFF_US);
        std::thread::sleep(std::time::Duration::from_micros(actual_us));
    }
    /// Pops a recycled buffer (cleared, with at least `capacity` reserved)
    /// or allocates a fresh one.
    #[inline]
    pub fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8> {
        match self.recycled_buffers.pop() {
            Some(mut buf) => {
                buf.clear();
                buf.reserve(capacity);
                buf
            }
            None => Vec::with_capacity(capacity),
        }
    }
    /// Stores `buf` for reuse, keeping at most two buffers to bound memory.
    #[inline]
    pub fn recycle_buffer(&mut self, buf: Vec<u8>) {
        if self.recycled_buffers.len() < 2 {
            self.recycled_buffers.push(buf);
        }
    }
}
// Exposes the worker's compressor through the shared `HasCompressor`
// abstraction (trait declared elsewhere in this module tree).
impl HasCompressor for WorkerCoreState {
fn compressor_mut(&mut self) -> &mut InlineBgzfCompressor {
&mut self.compressor
}
}
// Routes the buffer-recycling trait to the inherent methods of the same name
// (inherent methods win method resolution, so this does not recurse).
impl HasRecycledBuffers for WorkerCoreState {
fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8> {
self.take_or_alloc_buffer(capacity)
}
fn recycle_buffer(&mut self, buf: Vec<u8>) {
self.recycle_buffer(buf);
}
}
/// Diagnostic payload for end-of-run validation failures: queues that still
/// held items, counters that disagree, and unreleased tracked heap bytes.
#[derive(Debug, Clone)]
pub struct PipelineValidationError {
    /// Names of queues that were not empty at shutdown.
    pub non_empty_queues: Vec<String>,
    /// Human-readable descriptions of counter disagreements.
    pub counter_mismatches: Vec<String>,
    /// Tracked heap bytes never released back to the tracker.
    pub leaked_heap_bytes: u64,
}
impl std::fmt::Display for PipelineValidationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Pipeline validation failed - potential data loss detected:")?;
        if !self.non_empty_queues.is_empty() {
            writeln!(f, " Non-empty queues: {}", self.non_empty_queues.join(", "))?;
        }
        if !self.counter_mismatches.is_empty() {
            writeln!(f, " Counter mismatches:")?;
            self.counter_mismatches
                .iter()
                .try_for_each(|mismatch| writeln!(f, " - {mismatch}"))?;
        }
        if self.leaked_heap_bytes > 0 {
            writeln!(f, " Leaked heap bytes: {}", self.leaked_heap_bytes)?;
        }
        Ok(())
    }
}
impl std::error::Error for PipelineValidationError {}
/// Lifecycle contract shared by pipeline state types so the generic
/// monitor/finalize helpers below can drive any pipeline flavor.
pub trait PipelineLifecycle {
/// True once all work has been fully written out.
fn is_complete(&self) -> bool;
/// True if any stage has recorded an error.
fn has_error(&self) -> bool;
/// Takes (clears) the recorded error, if any.
fn take_error(&self) -> Option<io::Error>;
/// Records an error.
fn set_error(&self, error: io::Error);
fn is_draining(&self) -> bool;
fn set_draining(&self, value: bool);
fn stats(&self) -> Option<&PipelineStats>;
fn progress(&self) -> &ProgressTracker;
fn items_written(&self) -> u64;
/// Flushes buffered output to the final sink.
fn flush_output(&self) -> io::Result<()>;
/// Verifies queues drained and counters balance after shutdown.
fn validate_completion(&self) -> Result<(), PipelineValidationError>;
}
/// True when the monitor loop should stop: the pipeline either finished or
/// recorded an error.
#[inline]
pub fn should_monitor_exit<P: PipelineLifecycle>(pipeline: &P) -> bool {
    if pipeline.is_complete() {
        return true;
    }
    pipeline.has_error()
}
/// Extends the lifecycle with the hooks the monitor loop needs for
/// deadlock detection.
pub trait MonitorableState: PipelineLifecycle {
fn deadlock_state(&self) -> &DeadlockState;
/// A snapshot of queue depths for the deadlock heuristics.
fn build_queue_snapshot(&self) -> QueueSnapshot;
}
/// Generic monitor loop: sleeps `sample_interval_ms` between iterations,
/// calls `on_sample` each time, and every `deadlock_check_samples` samples
/// runs deadlock detection (when enabled). Exits when the pipeline
/// completes, errors out, or an unrecoverable deadlock is detected.
pub fn run_monitor_loop<S, F>(
state: &Arc<S>,
sample_interval_ms: u64,
deadlock_check_samples: u32,
on_sample: F,
) where
S: MonitorableState,
F: Fn(&S),
{
let mut deadlock_counter = 0u32;
loop {
thread::sleep(Duration::from_millis(sample_interval_ms));
if should_monitor_exit(state.as_ref()) {
break;
}
on_sample(state.as_ref());
if state.deadlock_state().is_enabled() {
deadlock_counter += 1;
if deadlock_counter >= deadlock_check_samples {
deadlock_counter = 0;
let snapshot = state.build_queue_snapshot();
// `Detected` means recovery is disabled (per the message below),
// so surface the deadlock as a pipeline error and stop.
if let DeadlockAction::Detected =
check_deadlock_and_restore(state.deadlock_state(), &snapshot)
{
state.set_error(io::Error::new(
io::ErrorKind::TimedOut,
"pipeline deadlock detected with recovery disabled; \
use --deadlock-recover to enable automatic recovery",
));
break;
}
}
}
}
}
/// Converts a panic payload (from `JoinHandle::join` / `catch_unwind`) into
/// a human-readable string.
///
/// Handles the two payload types `panic!` produces in practice (`&str` and
/// `String`); anything else becomes `"Unknown panic"`.
#[must_use]
#[allow(clippy::needless_pass_by_value)]
pub fn extract_panic_message(panic_info: Box<dyn std::any::Any + Send>) -> String {
    panic_info
        .downcast_ref::<&str>()
        .map(|s| (*s).to_string())
        .or_else(|| panic_info.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "Unknown panic".to_string())
}
/// Records a worker panic on the pipeline state as an `io::Error` that names
/// the offending thread and carries the extracted panic message.
pub fn handle_worker_panic<S: PipelineLifecycle>(
    state: &S,
    thread_id: usize,
    panic_info: Box<dyn std::any::Any + Send>,
) {
    let reason = extract_panic_message(panic_info);
    let error = io::Error::other(format!("Worker thread {thread_id} panicked: {reason}"));
    state.set_error(error);
}
/// Joins every worker thread, surfacing the first panic as an I/O error.
/// Handles after a failed join are dropped without being joined.
pub fn join_worker_threads(handles: Vec<thread::JoinHandle<()>>) -> io::Result<()> {
    handles.into_iter().try_for_each(|handle| {
        handle.join().map_err(|_| io::Error::other("Worker thread panicked"))
    })
}
/// Joins the optional monitor thread, discarding its result (a panicked
/// monitor is deliberately ignored here).
pub fn join_monitor_thread(handle: Option<thread::JoinHandle<()>>) {
    let Some(monitor) = handle else { return };
    let _ = monitor.join();
}
/// Final pipeline teardown: surfaces any recorded error, validates that all
/// work completed, flushes buffered output, logs stats when collected, and
/// returns the total number of items written.
pub fn finalize_pipeline<S: PipelineLifecycle>(state: &S) -> io::Result<u64> {
    match state.take_error() {
        Some(err) => Err(err),
        None => {
            state.validate_completion().map_err(io::Error::other)?;
            state.flush_output()?;
            if let Some(stats) = state.stats() {
                stats.log_summary();
            }
            Ok(state.items_written())
        }
    }
}
// Process-stage view of the shared queues: pops grouped work and pushes
// processed batches, keeping `processed_heap_bytes` in sync so memory
// backpressure can see the processed queue's heap footprint.
impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> ProcessPipelineState<G, P>
for OutputPipelineQueues<G, P>
{
fn process_input_pop(&self) -> Option<(u64, Vec<G>)> {
self.groups.pop()
}
fn process_output_is_full(&self) -> bool {
self.processed.is_full()
}
// Heap size is computed before the push because `item` is moved into the
// queue; the counter is only bumped when the push actually succeeds.
fn process_output_push(&self, item: (u64, Vec<P>)) -> Result<(), (u64, Vec<P>)> {
let heap_size: usize = item.1.iter().map(MemoryEstimate::estimate_heap_size).sum();
let result = self.processed.push(item);
if result.is_ok() {
self.processed_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
}
result
}
fn has_error(&self) -> bool {
self.error_flag.load(Ordering::Acquire)
}
// Fully-qualified call selects the inherent `set_error`, not this trait method.
fn set_error(&self, error: io::Error) {
OutputPipelineQueues::set_error(self, error);
}
fn should_apply_process_backpressure(&self) -> bool {
OutputPipelineQueues::should_apply_process_backpressure(self)
}
fn is_draining(&self) -> bool {
OutputPipelineQueues::is_draining(self)
}
}
// Serialize-stage view: pops processed batches (releasing the heap bytes
// tracked by `process_output_push`) and pushes serialized batches (tracking
// their heap bytes in `serialized_heap_bytes`).
impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> SerializePipelineState<P>
for OutputPipelineQueues<G, P>
{
// Subtracts the popped batch's estimated heap size from the running total,
// mirroring the addition done when the batch was pushed.
fn serialize_input_pop(&self) -> Option<(u64, Vec<P>)> {
let result = self.processed.pop();
if let Some((_, ref batch)) = result {
let heap_size: usize = batch.iter().map(MemoryEstimate::estimate_heap_size).sum();
self.processed_heap_bytes.fetch_sub(heap_size as u64, Ordering::AcqRel);
}
result
}
fn serialize_output_is_full(&self) -> bool {
self.serialized.is_full()
}
// Heap size is captured before the push because `item` is moved into the
// queue; the counter is only bumped on a successful push.
fn serialize_output_push(
&self,
item: (u64, SerializedBatch),
) -> Result<(), (u64, SerializedBatch)> {
let heap_size = item.1.estimate_heap_size();
let result = self.serialized.push(item);
if result.is_ok() {
self.serialized_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
}
result
}
fn has_error(&self) -> bool {
self.error_flag.load(Ordering::Acquire)
}
// Fully-qualified call selects the inherent `set_error`, not this trait method.
fn set_error(&self, error: io::Error) {
OutputPipelineQueues::set_error(self, error);
}
// Stats are optional; bytes are only recorded when collection is enabled.
fn record_serialized_bytes(&self, bytes: u64) {
if let Some(ref stats) = self.stats {
stats.serialized_bytes.fetch_add(bytes, Ordering::Relaxed);
}
}
}
// Write-stage view: exposes the compressed-output queue, its reorder buffer
// (for restoring serial order), and the output sink, plus error and
// written-count bookkeeping.
impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> WritePipelineState
for OutputPipelineQueues<G, P>
{
fn write_input_queue(&self) -> &ArrayQueue<(u64, CompressedBlockBatch)> {
&self.compressed
}
fn write_reorder_buffer(&self) -> &Mutex<ReorderBuffer<CompressedBlockBatch>> {
&self.write_reorder
}
fn write_reorder_state(&self) -> &ReorderBufferState {
&self.write_reorder_state
}
fn write_output(&self) -> &Mutex<Option<Box<dyn Write + Send>>> {
&self.output
}
fn has_error(&self) -> bool {
self.error_flag.load(Ordering::Acquire)
}
// Fully-qualified call selects the inherent `set_error`, not this trait method.
fn set_error(&self, error: io::Error) {
OutputPipelineQueues::set_error(self, error);
}
fn record_written(&self, count: u64) {
self.items_written.fetch_add(count, Ordering::Release);
}
fn stats(&self) -> Option<&PipelineStats> {
self.stats.as_deref()
}
}
// Combined output-side view (Q5 serialized -> Q6 compressed -> writer) used
// by workers that service multiple output stages.
impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> OutputPipelineState
for OutputPipelineQueues<G, P>
{
type Processed = P;
fn has_error(&self) -> bool {
self.error_flag.load(Ordering::Acquire)
}
// Fully-qualified call selects the inherent `set_error`, not this trait method.
fn set_error(&self, error: io::Error) {
OutputPipelineQueues::set_error(self, error);
}
// Note: heap-byte accounting for Q5 pops is done separately via
// `q5_track_pop`, not here.
fn q5_pop(&self) -> Option<(u64, SerializedBatch)> {
self.serialized.pop()
}
fn q5_push(&self, item: (u64, SerializedBatch)) -> Result<(), (u64, SerializedBatch)> {
self.serialized.push(item)
}
fn q5_is_full(&self) -> bool {
self.serialized.is_full()
}
// Releases heap bytes previously added when the batch entered Q5.
fn q5_track_pop(&self, heap_size: u64) {
self.serialized_heap_bytes.fetch_sub(heap_size, Ordering::AcqRel);
}
fn q6_pop(&self) -> Option<(u64, CompressedBlockBatch)> {
self.compressed.pop()
}
fn q6_push(
&self,
item: (u64, CompressedBlockBatch),
) -> Result<(), (u64, CompressedBlockBatch)> {
self.compressed.push(item)
}
fn q6_is_full(&self) -> bool {
self.compressed.is_full()
}
// Reorder-buffer access: insert out-of-order batches, pop the next one in
// serial order when available.
fn q6_reorder_insert(&self, serial: u64, batch: CompressedBlockBatch) {
self.write_reorder.lock().insert(serial, batch);
}
fn q6_reorder_try_pop_next(&self) -> Option<CompressedBlockBatch> {
self.write_reorder.lock().try_pop_next()
}
// Non-blocking lock attempt so a busy writer doesn't stall other workers.
fn output_try_lock(
&self,
) -> Option<parking_lot::MutexGuard<'_, Option<Box<dyn Write + Send>>>> {
self.output.try_lock()
}
fn increment_written(&self) -> u64 {
self.items_written.fetch_add(1, Ordering::Release)
}
fn record_compressed_bytes_out(&self, bytes: u64) {
if let Some(ref stats) = self.stats {
stats.compressed_bytes_out.fetch_add(bytes, Ordering::Relaxed);
}
}
// Progress hooks are intentionally no-ops for this implementation.
fn record_q6_pop_progress(&self) {
}
fn record_q7_push_progress(&self) {
}
fn write_reorder_is_memory_high(&self) -> bool {
self.write_reorder_state.is_memory_high()
}
fn stats(&self) -> Option<&PipelineStats> {
self.stats.as_deref()
}
}
// The unit type owns no heap allocations, so its estimate is always zero.
impl MemoryEstimate for () {
fn estimate_heap_size(&self) -> usize {
0
}
}
/// Tuning knobs for the multi-threaded pipeline.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
/// Number of worker threads; constructors clamp this to at least 1.
pub num_threads: usize,
/// Capacity of the inter-stage queues.
pub queue_capacity: usize,
/// Input-side low-water mark (threshold semantics defined by the scheduler).
pub input_low_water: usize,
/// Output-side high-water mark (threshold semantics defined by the scheduler).
pub output_high_water: usize,
/// Compression level passed to the output compressor.
pub compression_level: u32,
/// Number of raw blocks fetched per read batch.
pub blocks_per_read_batch: usize,
/// Whether to collect [`PipelineStats`] during the run.
pub collect_stats: bool,
/// Work-batch size; setters clamp this to at least 1.
pub batch_size: usize,
/// Target templates per batch; 0 in `new()`, 500 in `auto_tuned()`.
pub target_templates_per_batch: usize,
/// True if the caller already consumed the input header.
pub header_already_read: bool,
/// Work-scheduling strategy for the worker threads.
pub scheduler_strategy: SchedulerStrategy,
/// Queue memory budget in bytes; defaults to 0 (presumably "no limit" — confirm against consumer).
pub queue_memory_limit: u64,
/// Seconds without progress before the monitor flags a deadlock.
pub deadlock_timeout_secs: u64,
/// Whether automatic deadlock recovery is enabled.
pub deadlock_recover_enabled: bool,
/// Externally shared stats sink; setting one also enables collection.
pub shared_stats: Option<Arc<PipelineStats>>,
}
impl PipelineConfig {
    /// Creates a configuration with fixed defaults; `num_threads` is clamped
    /// to at least 1.
    #[must_use]
    pub fn new(num_threads: usize, compression_level: u32) -> Self {
        Self {
            num_threads: num_threads.max(1),
            queue_capacity: 64,
            input_low_water: 8,
            output_high_water: 32,
            compression_level,
            blocks_per_read_batch: 16,
            collect_stats: false,
            batch_size: 1,
            target_templates_per_batch: 0,
            header_already_read: false,
            scheduler_strategy: SchedulerStrategy::default(),
            queue_memory_limit: 0,
            deadlock_timeout_secs: 10,
            deadlock_recover_enabled: false,
            shared_stats: None,
        }
    }
    /// Sets the output compression level.
    #[must_use]
    pub fn with_compression_level(mut self, level: u32) -> Self {
        self.compression_level = level;
        self
    }
    /// Sets how many raw blocks are fetched per read batch.
    #[must_use]
    pub fn with_blocks_per_batch(mut self, blocks: usize) -> Self {
        self.blocks_per_read_batch = blocks;
        self
    }
    /// Enables or disables stats collection.
    #[must_use]
    pub fn with_stats(mut self, collect: bool) -> Self {
        self.collect_stats = collect;
        self
    }
    /// Installs an externally shared stats sink and enables collection.
    #[must_use]
    pub fn with_shared_stats(mut self, stats: Arc<PipelineStats>) -> Self {
        self.collect_stats = true;
        self.shared_stats = Some(stats);
        self
    }
    /// Sets the work-batch size, clamped to at least 1.
    #[must_use]
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size.max(1);
        self
    }
    /// Sets the target number of templates per batch.
    #[must_use]
    pub fn with_target_templates_per_batch(mut self, count: usize) -> Self {
        self.target_templates_per_batch = count;
        self
    }
    /// Derives queue sizes and batching parameters from the thread count;
    /// all other fields keep the `new()` defaults.
    #[must_use]
    pub fn auto_tuned(num_threads: usize, compression_level: u32) -> Self {
        let num_threads = num_threads.max(1);
        // Larger thread counts benefit from bigger read batches.
        let blocks_per_read_batch = match num_threads {
            1..=3 => 16,
            4..=7 => 32,
            8..=15 => 48,
            _ => 64,
        };
        Self {
            queue_capacity: (num_threads * 16).clamp(64, 256),
            input_low_water: num_threads.max(4),
            output_high_water: (num_threads * 4).max(32),
            blocks_per_read_batch,
            target_templates_per_batch: 500,
            ..Self::new(num_threads, compression_level)
        }
    }
    /// Sets the worker scheduling strategy.
    #[must_use]
    pub fn with_scheduler_strategy(mut self, strategy: SchedulerStrategy) -> Self {
        self.scheduler_strategy = strategy;
        self
    }
    /// Sets the queue memory budget in bytes.
    #[must_use]
    pub fn with_queue_memory_limit(mut self, limit_bytes: u64) -> Self {
        self.queue_memory_limit = limit_bytes;
        self
    }
    /// Sets the deadlock-detection timeout in seconds.
    #[must_use]
    pub fn with_deadlock_timeout(mut self, timeout_secs: u64) -> Self {
        self.deadlock_timeout_secs = timeout_secs;
        self
    }
    /// Enables or disables automatic deadlock recovery.
    #[must_use]
    pub fn with_deadlock_recovery(mut self, enabled: bool) -> Self {
        self.deadlock_recover_enabled = enabled;
        self
    }
}
/// Upper bound on worker threads tracked by the per-thread stats arrays.
pub const MAX_THREADS: usize = 32;
/// Number of pipeline steps (Read..Write) used to size per-step arrays.
const NUM_STEPS: usize = 9;
/// One periodic sample of queue depths, reorder-buffer sizes, memory usage,
/// and the step each thread was executing at sample time.
#[derive(Debug, Clone)]
pub struct QueueSample {
/// Sample timestamp in milliseconds (epoch defined by the sampler — confirm against caller).
pub time_ms: u64,
/// Occupancy of the 8 inter-stage queues.
pub queue_sizes: [usize; 8],
/// Occupancy of the 3 reorder buffers.
pub reorder_sizes: [usize; 3],
/// Estimated heap bytes held by each of the 8 queues.
pub queue_memory_bytes: [u64; 8],
/// Estimated heap bytes held by each reorder buffer.
pub reorder_memory_bytes: [u64; 3],
/// Per-thread current step index (255 = idle sentinel).
pub thread_steps: Vec<u8>,
}
/// Aggregated runtime metrics for the pipeline, updated atomically by worker
/// threads (mostly with relaxed ordering) and summarized at shutdown.
#[derive(Debug)]
pub struct PipelineStats {
// Cumulative nanoseconds spent in each pipeline step.
pub step_read_ns: AtomicU64,
pub step_decompress_ns: AtomicU64,
pub step_find_boundaries_ns: AtomicU64,
pub step_decode_ns: AtomicU64,
pub step_group_ns: AtomicU64,
pub step_process_ns: AtomicU64,
pub step_serialize_ns: AtomicU64,
pub step_compress_ns: AtomicU64,
pub step_write_ns: AtomicU64,
// Number of times each step executed.
pub step_read_count: AtomicU64,
pub step_decompress_count: AtomicU64,
pub step_find_boundaries_count: AtomicU64,
pub step_decode_count: AtomicU64,
pub step_group_count: AtomicU64,
pub step_process_count: AtomicU64,
pub step_serialize_count: AtomicU64,
pub step_compress_count: AtomicU64,
pub step_write_count: AtomicU64,
// Failed lock-acquisition attempts per contended resource.
pub read_contention: AtomicU64,
pub boundary_contention: AtomicU64,
pub group_contention: AtomicU64,
pub write_contention: AtomicU64,
// Empty-poll counters for each queue (Q2b sits between Q2 and Q3).
pub q1_empty: AtomicU64,
pub q2_empty: AtomicU64,
pub q2b_empty: AtomicU64,
pub q3_empty: AtomicU64,
pub q4_empty: AtomicU64,
pub q5_empty: AtomicU64,
pub q6_empty: AtomicU64,
pub q7_empty: AtomicU64,
// Times a worker yielded with no work available.
pub idle_yields: AtomicU64,
// Per-thread arrays, indexed [thread][step]; sized by MAX_THREADS/NUM_STEPS.
pub per_thread_step_counts: Box<[[AtomicU64; NUM_STEPS]; MAX_THREADS]>,
pub per_thread_step_attempts: Box<[[AtomicU64; NUM_STEPS]; MAX_THREADS]>,
pub per_thread_idle_ns: Box<[AtomicU64; MAX_THREADS]>,
// Current step per thread; 255 is the "idle / no step" sentinel.
pub per_thread_current_step: Box<[AtomicU8; MAX_THREADS]>,
// Nanoseconds spent waiting on the contended locks.
pub boundary_wait_ns: AtomicU64,
pub group_wait_ns: AtomicU64,
pub write_wait_ns: AtomicU64,
// Batch-size distribution (sum/count for mean, plus observed min/max).
pub batch_size_sum: AtomicU64,
pub batch_count: AtomicU64,
pub batch_size_min: AtomicU64,
pub batch_size_max: AtomicU64,
pub num_threads: AtomicU64,
// Byte counters at each transformation boundary.
pub bytes_read: AtomicU64,
pub bytes_written: AtomicU64,
pub compressed_bytes_in: AtomicU64,
pub decompressed_bytes: AtomicU64,
pub serialized_bytes: AtomicU64,
pub compressed_bytes_out: AtomicU64,
pub records_decoded: AtomicU64,
pub groups_produced: AtomicU64,
// Periodic queue-occupancy samples collected by the monitor thread.
pub queue_samples: Mutex<Vec<QueueSample>>,
// Memory-pressure events and the peak observed memory footprint.
pub memory_drain_activations: AtomicU64,
pub group_memory_rejects: AtomicU64,
pub peak_memory_bytes: AtomicU64,
#[cfg(feature = "memory-debug")]
pub memory: MemoryDebugStats,
}
/// Builds a boxed array of `N` zero-initialized `AtomicU64` counters.
#[allow(clippy::unnecessary_box_returns)]
fn new_atomic_array<const N: usize>() -> Box<[AtomicU64; N]> {
    let cells: Vec<AtomicU64> = std::iter::repeat_with(|| AtomicU64::new(0)).take(N).collect();
    cells.into_boxed_slice().try_into().expect("Vec length matches const N")
}
/// Builds a boxed `R x C` matrix of zero-initialized `AtomicU64` counters.
#[allow(clippy::unnecessary_box_returns)]
fn new_atomic_2d_array<const R: usize, const C: usize>() -> Box<[[AtomicU64; C]; R]> {
    let rows: Vec<[AtomicU64; C]> =
        std::iter::repeat_with(|| std::array::from_fn(|_| AtomicU64::new(0))).take(R).collect();
    rows.into_boxed_slice().try_into().expect("Vec length matches const R")
}
/// Builds a boxed array of `N` `AtomicU8` cells, each initialized to `init`.
#[allow(clippy::unnecessary_box_returns)]
fn new_atomic_u8_array<const N: usize>(init: u8) -> Box<[AtomicU8; N]> {
    let cells: Vec<AtomicU8> = std::iter::repeat_with(|| AtomicU8::new(init)).take(N).collect();
    cells.into_boxed_slice().try_into().expect("Vec length matches const N")
}
impl Default for PipelineStats {
/// Equivalent to [`PipelineStats::new`].
fn default() -> Self {
Self::new()
}
}
impl PipelineStats {
/// Creates a zeroed stats instance. Min-trackers start at `u64::MAX` so the
/// first observation always wins, and per-thread "current step" slots start
/// at the 255 idle sentinel.
#[must_use]
pub fn new() -> Self {
Self {
step_read_ns: AtomicU64::new(0),
step_decompress_ns: AtomicU64::new(0),
step_find_boundaries_ns: AtomicU64::new(0),
step_decode_ns: AtomicU64::new(0),
step_group_ns: AtomicU64::new(0),
step_process_ns: AtomicU64::new(0),
step_serialize_ns: AtomicU64::new(0),
step_compress_ns: AtomicU64::new(0),
step_write_ns: AtomicU64::new(0),
step_read_count: AtomicU64::new(0),
step_decompress_count: AtomicU64::new(0),
step_find_boundaries_count: AtomicU64::new(0),
step_decode_count: AtomicU64::new(0),
step_group_count: AtomicU64::new(0),
step_process_count: AtomicU64::new(0),
step_serialize_count: AtomicU64::new(0),
step_compress_count: AtomicU64::new(0),
step_write_count: AtomicU64::new(0),
read_contention: AtomicU64::new(0),
boundary_contention: AtomicU64::new(0),
group_contention: AtomicU64::new(0),
write_contention: AtomicU64::new(0),
q1_empty: AtomicU64::new(0),
q2_empty: AtomicU64::new(0),
q2b_empty: AtomicU64::new(0),
q3_empty: AtomicU64::new(0),
q4_empty: AtomicU64::new(0),
q5_empty: AtomicU64::new(0),
q6_empty: AtomicU64::new(0),
q7_empty: AtomicU64::new(0),
idle_yields: AtomicU64::new(0),
per_thread_step_counts: new_atomic_2d_array::<MAX_THREADS, NUM_STEPS>(),
per_thread_step_attempts: new_atomic_2d_array::<MAX_THREADS, NUM_STEPS>(),
per_thread_idle_ns: new_atomic_array::<MAX_THREADS>(),
// 255 = "no current step" sentinel (see clear_current_step / get_thread_activity).
per_thread_current_step: new_atomic_u8_array::<MAX_THREADS>(255), boundary_wait_ns: AtomicU64::new(0),
group_wait_ns: AtomicU64::new(0),
write_wait_ns: AtomicU64::new(0),
batch_size_sum: AtomicU64::new(0),
batch_count: AtomicU64::new(0),
// u64::MAX so the first recorded batch size becomes the minimum.
batch_size_min: AtomicU64::new(u64::MAX),
batch_size_max: AtomicU64::new(0),
num_threads: AtomicU64::new(0),
bytes_read: AtomicU64::new(0),
bytes_written: AtomicU64::new(0),
compressed_bytes_in: AtomicU64::new(0),
decompressed_bytes: AtomicU64::new(0),
serialized_bytes: AtomicU64::new(0),
compressed_bytes_out: AtomicU64::new(0),
records_decoded: AtomicU64::new(0),
groups_produced: AtomicU64::new(0),
queue_samples: Mutex::new(Vec::new()),
memory_drain_activations: AtomicU64::new(0),
group_memory_rejects: AtomicU64::new(0),
peak_memory_bytes: AtomicU64::new(0),
#[cfg(feature = "memory-debug")]
memory: MemoryDebugStats::new(),
}
}
/// Stores the worker-thread count used by per-thread reporting.
pub fn set_num_threads(&self, n: usize) {
self.num_threads.store(n as u64, Ordering::Relaxed);
}
#[inline]
/// Records a step timing without per-thread attribution.
pub fn record_step(&self, step: PipelineStep, elapsed_ns: u64) {
self.record_step_for_thread(step, elapsed_ns, None);
}
/// Accumulates elapsed time and an occurrence count for `step`, and bumps
/// the matching per-thread counter when a valid `thread_id` is supplied.
#[inline]
pub fn record_step_for_thread(
    &self,
    step: PipelineStep,
    elapsed_ns: u64,
    thread_id: Option<usize>,
) {
    // Resolve the (time, count) counter pair once, then update both.
    let (total_ns, occurrences) = match step {
        PipelineStep::Read => (&self.step_read_ns, &self.step_read_count),
        PipelineStep::Decompress => (&self.step_decompress_ns, &self.step_decompress_count),
        PipelineStep::FindBoundaries => {
            (&self.step_find_boundaries_ns, &self.step_find_boundaries_count)
        }
        PipelineStep::Decode => (&self.step_decode_ns, &self.step_decode_count),
        PipelineStep::Group => (&self.step_group_ns, &self.step_group_count),
        PipelineStep::Process => (&self.step_process_ns, &self.step_process_count),
        PipelineStep::Serialize => (&self.step_serialize_ns, &self.step_serialize_count),
        PipelineStep::Compress => (&self.step_compress_ns, &self.step_compress_count),
        PipelineStep::Write => (&self.step_write_ns, &self.step_write_count),
    };
    total_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
    occurrences.fetch_add(1, Ordering::Relaxed);
    // Out-of-range thread ids are silently ignored, as in the rest of this type.
    if let Some(tid) = thread_id {
        if tid < MAX_THREADS {
            self.per_thread_step_counts[tid][step as usize].fetch_add(1, Ordering::Relaxed);
        }
    }
}
/// Counts a failed lock acquisition for the steps that contend on shared
/// locks; other steps are ignored.
#[inline]
pub fn record_contention(&self, step: PipelineStep) {
    let counter = match step {
        PipelineStep::Read => &self.read_contention,
        PipelineStep::FindBoundaries => &self.boundary_contention,
        PipelineStep::Group => &self.group_contention,
        PipelineStep::Write => &self.write_contention,
        _ => return,
    };
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Accumulates lock-wait nanoseconds for the steps that wait on shared
/// locks; other steps are ignored.
#[inline]
pub fn record_wait_time(&self, step: PipelineStep, wait_ns: u64) {
    let sink = match step {
        PipelineStep::FindBoundaries => &self.boundary_wait_ns,
        PipelineStep::Group => &self.group_wait_ns,
        PipelineStep::Write => &self.write_wait_ns,
        _ => return,
    };
    sink.fetch_add(wait_ns, Ordering::Relaxed);
}
/// Counts an empty-poll of the numbered queue; callers pass `25` for Q2b.
/// Unknown queue numbers are ignored.
#[inline]
pub fn record_queue_empty(&self, queue_num: usize) {
    let counter = match queue_num {
        1 => &self.q1_empty,
        2 => &self.q2_empty,
        25 => &self.q2b_empty,
        3 => &self.q3_empty,
        4 => &self.q4_empty,
        5 => &self.q5_empty,
        6 => &self.q6_empty,
        7 => &self.q7_empty,
        _ => return,
    };
    counter.fetch_add(1, Ordering::Relaxed);
}
#[inline]
/// Counts an idle yield without per-thread attribution.
pub fn record_idle(&self) {
self.idle_yields.fetch_add(1, Ordering::Relaxed);
}
#[inline]
/// Counts an idle yield and attributes the idle time to `thread_id`
/// (out-of-range ids are ignored for the per-thread part).
pub fn record_idle_for_thread(&self, thread_id: usize, idle_ns: u64) {
self.idle_yields.fetch_add(1, Ordering::Relaxed);
if thread_id < MAX_THREADS {
self.per_thread_idle_ns[thread_id].fetch_add(idle_ns, Ordering::Relaxed);
}
}
#[inline]
#[allow(clippy::cast_possible_truncation)]
/// Counts an attempt at `step` by `thread_id` and marks it as that thread's
/// current step.
pub fn record_step_attempt(&self, thread_id: usize, step: PipelineStep) {
if thread_id < MAX_THREADS {
let step_idx = step as usize;
self.per_thread_step_attempts[thread_id][step_idx].fetch_add(1, Ordering::Relaxed);
self.per_thread_current_step[thread_id].store(step_idx as u8, Ordering::Relaxed);
}
}
#[inline]
/// Marks `step` as the thread's current step without counting an attempt.
pub fn set_current_step(&self, thread_id: usize, step: PipelineStep) {
if thread_id < MAX_THREADS {
self.per_thread_current_step[thread_id].store(step as u8, Ordering::Relaxed);
}
}
#[inline]
/// Resets the thread's current step to the 255 idle sentinel.
pub fn clear_current_step(&self, thread_id: usize) {
if thread_id < MAX_THREADS {
self.per_thread_current_step[thread_id].store(255, Ordering::Relaxed);
}
}
/// Snapshot of which step each thread is currently attempting; `None` means
/// the stored index is out of range (e.g. the 255 idle sentinel).
#[allow(clippy::cast_possible_truncation)]
pub fn get_thread_activity(&self, num_threads: usize) -> Vec<Option<PipelineStep>> {
    let tracked = num_threads.min(MAX_THREADS);
    (0..tracked)
        .map(|tid| {
            let idx = self.per_thread_current_step[tid].load(Ordering::Relaxed) as usize;
            (idx < NUM_STEPS).then(|| PipelineStep::from_index(idx))
        })
        .collect()
}
/// Records one batch's record count: updates the running sum/count used for
/// the mean, and the observed min/max batch sizes.
#[inline]
pub fn record_batch_size(&self, size: usize) {
    let size = size as u64;
    self.batch_size_sum.fetch_add(size, Ordering::Relaxed);
    self.batch_count.fetch_add(1, Ordering::Relaxed);
    // AtomicU64::fetch_min/fetch_max replace the hand-rolled
    // compare-exchange loops; semantics are identical (atomically keep the
    // smaller/larger of the stored value and `size`).
    self.batch_size_min.fetch_min(size, Ordering::Relaxed);
    self.batch_size_max.fetch_max(size, Ordering::Relaxed);
}
/// Appends one monitor-thread sample to the sample log.
pub fn add_queue_sample(&self, sample: QueueSample) {
self.queue_samples.lock().push(sample);
}
/// Returns a clone of all collected samples.
pub fn get_queue_samples(&self) -> Vec<QueueSample> {
self.queue_samples.lock().clone()
}
#[inline]
/// Counts an activation of memory-pressure drain mode.
pub fn record_memory_drain_activation(&self) {
self.memory_drain_activations.fetch_add(1, Ordering::Relaxed);
}
#[inline]
/// Counts a group rejected due to the memory limit.
pub fn record_group_memory_reject(&self) {
self.group_memory_rejects.fetch_add(1, Ordering::Relaxed);
}
/// Records a memory-usage observation, retaining the peak value seen.
#[inline]
pub fn record_memory_usage(&self, bytes: u64) {
    // AtomicU64::fetch_max replaces the hand-rolled compare-exchange loop;
    // it atomically keeps the larger of the stored peak and `bytes`.
    self.peak_memory_bytes.fetch_max(bytes, Ordering::Relaxed);
}
#[allow(clippy::similar_names)] #[allow(
clippy::too_many_lines,
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)]
pub fn format_summary(&self) -> String {
use std::fmt::Write;
let mut s = String::new();
writeln!(s, "Pipeline Statistics:").expect("write to String");
writeln!(s).expect("write to String");
#[allow(clippy::uninlined_format_args)]
let format_step = |name: &str, ns: u64, count: u64| -> String {
if count == 0 {
format!(" {:<20} {:>10} ops, {:>12}", name, 0, "-")
} else {
let total_ms = ns as f64 / 1_000_000.0;
let avg_us = (ns as f64 / count as f64) / 1_000.0;
format!(
" {:<20} {:>10} ops, {:>10.1}ms total, {:>8.1}µs avg",
name, count, total_ms, avg_us
)
}
};
writeln!(s, "Step Timing:").expect("write to String");
writeln!(
s,
"{}",
format_step(
"Read",
self.step_read_ns.load(Ordering::Relaxed),
self.step_read_count.load(Ordering::Relaxed)
)
)
.expect("write to String");
writeln!(
s,
"{}",
format_step(
"Decompress",
self.step_decompress_ns.load(Ordering::Relaxed),
self.step_decompress_count.load(Ordering::Relaxed)
)
)
.expect("write to String");
writeln!(
s,
"{}",
format_step(
"FindBoundaries",
self.step_find_boundaries_ns.load(Ordering::Relaxed),
self.step_find_boundaries_count.load(Ordering::Relaxed)
)
)
.expect("write to String");
writeln!(
s,
"{}",
format_step(
"Decode",
self.step_decode_ns.load(Ordering::Relaxed),
self.step_decode_count.load(Ordering::Relaxed)
)
)
.expect("write to String");
writeln!(
s,
"{}",
format_step(
"Group",
self.step_group_ns.load(Ordering::Relaxed),
self.step_group_count.load(Ordering::Relaxed)
)
)
.expect("write to String");
writeln!(
s,
"{}",
format_step(
"Process",
self.step_process_ns.load(Ordering::Relaxed),
self.step_process_count.load(Ordering::Relaxed)
)
)
.expect("write to String");
writeln!(
s,
"{}",
format_step(
"Serialize",
self.step_serialize_ns.load(Ordering::Relaxed),
self.step_serialize_count.load(Ordering::Relaxed)
)
)
.expect("write to String");
writeln!(
s,
"{}",
format_step(
"Compress",
self.step_compress_ns.load(Ordering::Relaxed),
self.step_compress_count.load(Ordering::Relaxed)
)
)
.expect("write to String");
writeln!(
s,
"{}",
format_step(
"Write",
self.step_write_ns.load(Ordering::Relaxed),
self.step_write_count.load(Ordering::Relaxed)
)
)
.expect("write to String");
writeln!(s).expect("write to String");
writeln!(s, "Contention:").expect("write to String");
writeln!(
s,
" Read lock: {:>10} failed attempts",
self.read_contention.load(Ordering::Relaxed)
)
.expect("write to String");
writeln!(
s,
" Boundary lock: {:>10} failed attempts",
self.boundary_contention.load(Ordering::Relaxed)
)
.expect("write to String");
writeln!(
s,
" Group lock: {:>10} failed attempts",
self.group_contention.load(Ordering::Relaxed)
)
.expect("write to String");
writeln!(
s,
" Write lock: {:>10} failed attempts",
self.write_contention.load(Ordering::Relaxed)
)
.expect("write to String");
writeln!(s).expect("write to String");
writeln!(s, "Queue Empty Polls:").expect("write to String");
writeln!(s, " Q1 (raw): {:>10}", self.q1_empty.load(Ordering::Relaxed))
.expect("write to String");
writeln!(s, " Q2 (decomp): {:>10}", self.q2_empty.load(Ordering::Relaxed))
.expect("write to String");
writeln!(s, " Q2b (boundary): {:>10}", self.q2b_empty.load(Ordering::Relaxed))
.expect("write to String");
writeln!(s, " Q3 (decoded): {:>10}", self.q3_empty.load(Ordering::Relaxed))
.expect("write to String");
writeln!(s, " Q4 (groups): {:>10}", self.q4_empty.load(Ordering::Relaxed))
.expect("write to String");
writeln!(s, " Q5 (processed): {:>10}", self.q5_empty.load(Ordering::Relaxed))
.expect("write to String");
writeln!(s, " Q6 (serialized): {:>10}", self.q6_empty.load(Ordering::Relaxed))
.expect("write to String");
writeln!(s, " Q7 (compressed): {:>10}", self.q7_empty.load(Ordering::Relaxed))
.expect("write to String");
writeln!(s).expect("write to String");
writeln!(s, "Idle Yields: {:>10}", self.idle_yields.load(Ordering::Relaxed))
.expect("write to String");
let boundary_wait = self.boundary_wait_ns.load(Ordering::Relaxed);
let group_wait = self.group_wait_ns.load(Ordering::Relaxed);
let write_wait = self.write_wait_ns.load(Ordering::Relaxed);
if boundary_wait > 0 || group_wait > 0 || write_wait > 0 {
writeln!(s).expect("write to String");
writeln!(s, "Lock Wait Time:").expect("write to String");
writeln!(s, " Boundary lock: {:>10.1}ms", boundary_wait as f64 / 1_000_000.0)
.expect("write to String");
writeln!(s, " Group lock: {:>10.1}ms", group_wait as f64 / 1_000_000.0)
.expect("write to String");
writeln!(s, " Write lock: {:>10.1}ms", write_wait as f64 / 1_000_000.0)
.expect("write to String");
}
let batch_count = self.batch_count.load(Ordering::Relaxed);
if batch_count > 0 {
let batch_sum = self.batch_size_sum.load(Ordering::Relaxed);
let batch_min = self.batch_size_min.load(Ordering::Relaxed);
let batch_max = self.batch_size_max.load(Ordering::Relaxed);
let batch_avg = batch_sum as f64 / batch_count as f64;
writeln!(s).expect("write to String");
writeln!(s, "Batch Size (records per batch):").expect("write to String");
writeln!(s, " Count: {batch_count:>10}").expect("write to String");
writeln!(s, " Min: {batch_min:>10}").expect("write to String");
writeln!(s, " Max: {batch_max:>10}").expect("write to String");
writeln!(s, " Average: {batch_avg:>10.1}").expect("write to String");
}
let num_threads = self.num_threads.load(Ordering::Relaxed) as usize;
if num_threads > 0 {
writeln!(s).expect("write to String");
writeln!(s, "Per-Thread Work Distribution:").expect("write to String");
let step_names = ["Rd", "Dc", "Fb", "De", "Gr", "Pr", "Se", "Co", "Wr"];
write!(s, " Thread ").expect("write to String");
for name in &step_names {
write!(s, " {name:>6}").expect("write to String");
}
writeln!(s, " Idle ms").expect("write to String");
for tid in 0..num_threads.min(MAX_THREADS) {
write!(s, " T{tid:<5} ").expect("write to String");
for step_idx in 0..NUM_STEPS {
let count = self.per_thread_step_counts[tid][step_idx].load(Ordering::Relaxed);
write!(s, " {count:>6}").expect("write to String");
}
let idle_ns = self.per_thread_idle_ns[tid].load(Ordering::Relaxed);
writeln!(s, " {:>10.1}", idle_ns as f64 / 1_000_000.0).expect("write to String");
}
write!(s, " Total ").expect("write to String");
for step_idx in 0..NUM_STEPS {
let mut total = 0u64;
for tid in 0..num_threads.min(MAX_THREADS) {
total += self.per_thread_step_counts[tid][step_idx].load(Ordering::Relaxed);
}
write!(s, " {total:>6}").expect("write to String");
}
let total_idle: u64 = (0..num_threads.min(MAX_THREADS))
.map(|tid| self.per_thread_idle_ns[tid].load(Ordering::Relaxed))
.sum();
writeln!(s, " {:>10.1}", total_idle as f64 / 1_000_000.0).expect("write to String");
writeln!(s).expect("write to String");
writeln!(s, "Per-Thread Attempt Success Rate:").expect("write to String");
write!(s, " Thread ").expect("write to String");
for name in &step_names {
write!(s, " {name:>6}").expect("write to String");
}
writeln!(s, " Total%").expect("write to String");
for tid in 0..num_threads.min(MAX_THREADS) {
write!(s, " T{tid:<5} ").expect("write to String");
let mut thread_attempts = 0u64;
let mut thread_successes = 0u64;
for step_idx in 0..NUM_STEPS {
let attempts =
self.per_thread_step_attempts[tid][step_idx].load(Ordering::Relaxed);
let successes =
self.per_thread_step_counts[tid][step_idx].load(Ordering::Relaxed);
thread_attempts += attempts;
thread_successes += successes;
if attempts == 0 {
write!(s, " - ").expect("write to String");
} else {
let rate = (successes as f64 / attempts as f64) * 100.0;
write!(s, " {rate:>5.0}%").expect("write to String");
}
}
if thread_attempts == 0 {
writeln!(s, " -").expect("write to String");
} else {
let total_rate = (thread_successes as f64 / thread_attempts as f64) * 100.0;
writeln!(s, " {total_rate:>5.1}%").expect("write to String");
}
}
}
let total_work_ns = self.step_read_ns.load(Ordering::Relaxed)
+ self.step_decompress_ns.load(Ordering::Relaxed)
+ self.step_find_boundaries_ns.load(Ordering::Relaxed)
+ self.step_decode_ns.load(Ordering::Relaxed)
+ self.step_group_ns.load(Ordering::Relaxed)
+ self.step_process_ns.load(Ordering::Relaxed)
+ self.step_serialize_ns.load(Ordering::Relaxed)
+ self.step_compress_ns.load(Ordering::Relaxed)
+ self.step_write_ns.load(Ordering::Relaxed);
let total_idle_ns: u64 = (0..num_threads.min(MAX_THREADS))
.map(|tid| self.per_thread_idle_ns[tid].load(Ordering::Relaxed))
.sum();
let total_contention = self.read_contention.load(Ordering::Relaxed)
+ self.boundary_contention.load(Ordering::Relaxed)
+ self.group_contention.load(Ordering::Relaxed)
+ self.write_contention.load(Ordering::Relaxed);
if total_work_ns > 0 {
writeln!(s).expect("write to String");
writeln!(s, "Thread Utilization:").expect("write to String");
let work_ms = total_work_ns as f64 / 1_000_000.0;
let idle_ms = total_idle_ns as f64 / 1_000_000.0;
let total_thread_ms = work_ms + idle_ms;
if total_thread_ms > 0.0 {
let utilization = (work_ms / total_thread_ms) * 100.0;
writeln!(s, " Work time: {work_ms:>10.1}ms").expect("write to String");
writeln!(s, " Idle time: {idle_ms:>10.1}ms").expect("write to String");
writeln!(s, " Utilization: {utilization:>10.1}%").expect("write to String");
writeln!(s, " Contention attempts: {total_contention:>7}")
.expect("write to String");
}
}
let bytes_read = self.bytes_read.load(Ordering::Relaxed);
let bytes_written = self.bytes_written.load(Ordering::Relaxed);
let compressed_bytes_in = self.compressed_bytes_in.load(Ordering::Relaxed);
let decompressed_bytes = self.decompressed_bytes.load(Ordering::Relaxed);
let serialized_bytes = self.serialized_bytes.load(Ordering::Relaxed);
let compressed_bytes_out = self.compressed_bytes_out.load(Ordering::Relaxed);
let records_decoded = self.records_decoded.load(Ordering::Relaxed);
let groups_produced = self.groups_produced.load(Ordering::Relaxed);
if bytes_read > 0 || bytes_written > 0 {
writeln!(s).expect("write to String");
writeln!(s, "Throughput:").expect("write to String");
let format_bytes = |bytes: u64| -> String {
if bytes >= 1_000_000_000 {
format!("{:.2} GB", bytes as f64 / 1_000_000_000.0)
} else if bytes >= 1_000_000 {
format!("{:.1} MB", bytes as f64 / 1_000_000.0)
} else if bytes >= 1_000 {
format!("{:.1} KB", bytes as f64 / 1_000.0)
} else {
format!("{bytes} B")
}
};
let format_count = |count: u64| -> String {
if count >= 1_000_000 {
format!("{:.2}M", count as f64 / 1_000_000.0)
} else if count >= 1_000 {
format!("{:.1}K", count as f64 / 1_000.0)
} else {
format!("{count}")
}
};
let read_ns = self.step_read_ns.load(Ordering::Relaxed);
if bytes_read > 0 && read_ns > 0 {
let read_ms = read_ns as f64 / 1_000_000.0;
let read_mb_s = (bytes_read as f64 / 1_000_000.0) / (read_ms / 1000.0);
writeln!(
s,
" Read: {:>10} in {:>8.1}ms = {:>8.1} MB/s",
format_bytes(bytes_read),
read_ms,
read_mb_s
)
.expect("write to String");
}
let decompress_ns = self.step_decompress_ns.load(Ordering::Relaxed);
if compressed_bytes_in > 0 && decompressed_bytes > 0 && decompress_ns > 0 {
let decompress_ms = decompress_ns as f64 / 1_000_000.0;
let in_mb_s = (compressed_bytes_in as f64 / 1_000_000.0) / (decompress_ms / 1000.0);
let out_mb_s = (decompressed_bytes as f64 / 1_000_000.0) / (decompress_ms / 1000.0);
let expansion = decompressed_bytes as f64 / compressed_bytes_in as f64;
writeln!(
s,
" Decompress: {:>10} → {:>10} = {:>6.1} → {:>6.1} MB/s ({:.2}x expansion)",
format_bytes(compressed_bytes_in),
format_bytes(decompressed_bytes),
in_mb_s,
out_mb_s,
expansion
)
.expect("write to String");
}
let decode_ns = self.step_decode_ns.load(Ordering::Relaxed);
if records_decoded > 0 && decode_ns > 0 {
let decode_ms = decode_ns as f64 / 1_000_000.0;
let records_per_s = records_decoded as f64 / (decode_ms / 1000.0);
writeln!(
s,
" Decode: {:>10} records = {:>8} records/s",
format_count(records_decoded),
format_count(records_per_s as u64)
)
.expect("write to String");
}
let group_ns = self.step_group_ns.load(Ordering::Relaxed);
if records_decoded > 0 && groups_produced > 0 && group_ns > 0 {
let group_ms = group_ns as f64 / 1_000_000.0;
let records_in_per_s = records_decoded as f64 / (group_ms / 1000.0);
let groups_out_per_s = groups_produced as f64 / (group_ms / 1000.0);
writeln!(
s,
" Group: {:>10} → {:>10} = {:>6} records/s in, {:>6} groups/s out",
format_count(records_decoded),
format_count(groups_produced),
format_count(records_in_per_s as u64),
format_count(groups_out_per_s as u64)
)
.expect("write to String");
}
let process_ns = self.step_process_ns.load(Ordering::Relaxed);
if groups_produced > 0 && process_ns > 0 {
let process_ms = process_ns as f64 / 1_000_000.0;
let groups_per_s = groups_produced as f64 / (process_ms / 1000.0);
writeln!(
s,
" Process: {:>10} groups = {:>8} groups/s",
format_count(groups_produced),
format_count(groups_per_s as u64)
)
.expect("write to String");
}
let serialize_ns = self.step_serialize_ns.load(Ordering::Relaxed);
if serialized_bytes > 0 && serialize_ns > 0 {
let serialize_ms = serialize_ns as f64 / 1_000_000.0;
let mb_per_s = (serialized_bytes as f64 / 1_000_000.0) / (serialize_ms / 1000.0);
writeln!(
s,
" Serialize: {:>10} = {:>8.1} MB/s",
format_bytes(serialized_bytes),
mb_per_s
)
.expect("write to String");
}
let compress_ns = self.step_compress_ns.load(Ordering::Relaxed);
if serialized_bytes > 0 && compressed_bytes_out > 0 && compress_ns > 0 {
let compress_ms = compress_ns as f64 / 1_000_000.0;
let in_mb_s = (serialized_bytes as f64 / 1_000_000.0) / (compress_ms / 1000.0);
let out_mb_s = (compressed_bytes_out as f64 / 1_000_000.0) / (compress_ms / 1000.0);
let compression = serialized_bytes as f64 / compressed_bytes_out as f64;
writeln!(
s,
" Compress: {:>10} → {:>10} = {:>6.1} → {:>6.1} MB/s ({:.2}x compression)",
format_bytes(serialized_bytes),
format_bytes(compressed_bytes_out),
in_mb_s,
out_mb_s,
compression
)
.expect("write to String");
}
let write_ns = self.step_write_ns.load(Ordering::Relaxed);
if bytes_written > 0 && write_ns > 0 {
let write_ms = write_ns as f64 / 1_000_000.0;
let write_mb_s = (bytes_written as f64 / 1_000_000.0) / (write_ms / 1000.0);
writeln!(
s,
" Write: {:>10} in {:>8.1}ms = {:>8.1} MB/s",
format_bytes(bytes_written),
write_ms,
write_mb_s
)
.expect("write to String");
}
}
let samples = self.queue_samples.lock();
if !samples.is_empty() {
writeln!(s).expect("write to String");
writeln!(s, "Queue Size Timeline ({} samples at ~100ms intervals):", samples.len())
.expect("write to String");
writeln!(
s,
" Time Q1 Q2 Q2b Q3 Q4 Q5 Q6 Q7 | R2 R3 R7 | R3_MB Threads"
)
.expect("write to String");
for sample in samples.iter() {
let r3_mb = sample.reorder_memory_bytes[1] as f64 / 1_048_576.0;
write!(
s,
" {:>4} {:>3} {:>3} {:>3} {:>3} {:>3} {:>3} {:>3} {:>3} | {:>3} {:>3} {:>3} | {:>6.1} ",
sample.time_ms,
sample.queue_sizes[0],
sample.queue_sizes[1],
sample.queue_sizes[2],
sample.queue_sizes[3],
sample.queue_sizes[4],
sample.queue_sizes[5],
sample.queue_sizes[6],
sample.queue_sizes[7],
sample.reorder_sizes[0],
sample.reorder_sizes[1],
sample.reorder_sizes[2],
r3_mb,
)
.expect("write to String");
for &step_idx in &sample.thread_steps {
if step_idx < NUM_STEPS as u8 {
let short = match step_idx {
0 => "R",
1 => "D",
2 => "F",
3 => "d",
4 => "G",
5 => "P",
6 => "S",
7 => "C",
8 => "W",
_ => "?",
};
write!(s, "{short}").expect("write to String");
} else {
write!(s, ".").expect("write to String");
}
}
writeln!(s).expect("write to String");
}
let peak_r3_items = samples.iter().map(|s| s.reorder_sizes[1]).max().unwrap_or(0);
let peak_r3_bytes =
samples.iter().map(|s| s.reorder_memory_bytes[1]).max().unwrap_or(0);
let peak_r3_mb = peak_r3_bytes as f64 / 1_048_576.0;
writeln!(s).expect("write to String");
writeln!(s, "Peak Q3 Reorder Buffer: {peak_r3_items} items, {peak_r3_mb:.1} MB")
.expect("write to String");
}
let group_rejects = self.group_memory_rejects.load(Ordering::Relaxed);
let peak_memory = self.peak_memory_bytes.load(Ordering::Relaxed);
if group_rejects > 0 || peak_memory > 0 {
writeln!(s).expect("write to String");
writeln!(s, "Memory Limiting:").expect("write to String");
if group_rejects > 0 {
writeln!(s, " Group rejects (memory): {group_rejects:>10}")
.expect("write to String");
}
if peak_memory > 0 {
let peak_mb = peak_memory as f64 / 1_048_576.0;
writeln!(s, " Peak memory usage: {peak_mb:>10.1} MB")
.expect("write to String");
}
}
s
}
/// Logs the formatted summary at `info` level, one log record per line so
/// each line carries its own logger prefix.
pub fn log_summary(&self) {
    self.format_summary().lines().for_each(|line| info!("{line}"));
}
}
#[cfg(feature = "memory-debug")]
impl PipelineStats {
    /// Maps a queue name ("q1".."q7") to its tracked-memory counter.
    /// Returns `None` for unrecognized names so callers can ignore them.
    fn queue_memory_counter(&self, queue_name: &str) -> Option<&AtomicU64> {
        let m = &self.memory;
        match queue_name {
            "q1" => Some(&m.q1_memory_bytes),
            "q2" => Some(&m.q2_memory_bytes),
            "q3" => Some(&m.q3_memory_bytes),
            "q4" => Some(&m.q4_memory_bytes),
            "q5" => Some(&m.q5_memory_bytes),
            "q6" => Some(&m.q6_memory_bytes),
            "q7" => Some(&m.q7_memory_bytes),
            _ => None,
        }
    }

    /// Atomically subtracts `amount` from `counter`, clamping at zero instead
    /// of wrapping. Relaxed ordering suffices: these counters are advisory
    /// debug metrics, not used for synchronization.
    fn saturating_sub_atomic(counter: &AtomicU64, amount: u64) {
        // The closure always returns Some, so fetch_update cannot fail.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
            Some(cur.saturating_sub(amount))
        });
    }

    /// Adds `size` bytes to the named queue's tracked memory.
    /// Unknown queue names are silently ignored (matches prior behavior).
    pub fn track_queue_memory_add(&self, queue_name: &str, size: usize) {
        if let Some(counter) = self.queue_memory_counter(queue_name) {
            counter.fetch_add(size as u64, Ordering::Relaxed);
        }
    }

    /// Removes `size` bytes from the named queue's tracked memory, clamping
    /// at zero. Unknown queue names are silently ignored.
    pub fn track_queue_memory_remove(&self, queue_name: &str, size: usize) {
        if let Some(counter) = self.queue_memory_counter(queue_name) {
            Self::saturating_sub_atomic(counter, size as u64);
        }
    }

    /// Tracks memory used by in-flight position groups; `is_allocation`
    /// selects add vs. saturating remove.
    pub fn track_position_group_memory(&self, size: usize, is_allocation: bool) {
        let counter = &self.memory.position_group_processing_bytes;
        if is_allocation {
            counter.fetch_add(size as u64, Ordering::Relaxed);
        } else {
            Self::saturating_sub_atomic(counter, size as u64);
        }
    }

    /// Tracks memory used by in-flight templates; `is_allocation` selects
    /// add vs. saturating remove.
    pub fn track_template_memory(&self, size: usize, is_allocation: bool) {
        let counter = &self.memory.template_processing_bytes;
        if is_allocation {
            counter.fetch_add(size as u64, Ordering::Relaxed);
        } else {
            Self::saturating_sub_atomic(counter, size as u64);
        }
    }

    /// Records the most recent system RSS sample, in bytes.
    pub fn update_system_rss(&self, rss_bytes: u64) {
        self.memory.system_rss_bytes.store(rss_bytes, Ordering::Relaxed);
    }

    /// Stores fixed infrastructure memory estimates. The constants are
    /// sizing assumptions (32 KiB per decompressor, 280 KiB per compressor,
    /// 512 KiB of worker buffers, 16 MiB of I/O buffers, 2 MiB per thread
    /// stack, ~128 bytes per queue slot across 7 queues), not measurements.
    pub fn set_infrastructure_memory(&self, num_threads: usize, queue_capacity: usize) {
        let m = &self.memory;
        m.decompressor_memory_bytes.store(num_threads as u64 * 32 * 1024, Ordering::Relaxed);
        m.compressor_memory_bytes.store(num_threads as u64 * 280 * 1024, Ordering::Relaxed);
        m.worker_buffer_memory_bytes.store(num_threads as u64 * 512 * 1024, Ordering::Relaxed);
        m.io_buffer_memory_bytes.store(16u64 * 1024 * 1024, Ordering::Relaxed);
        // +1 accounts for one extra (main) thread beyond the workers.
        m.thread_stack_memory_bytes
            .store((num_threads as u64 + 1) * 2 * 1024 * 1024, Ordering::Relaxed);
        m.queue_capacity_memory_bytes.store(7u64 * queue_capacity as u64 * 128, Ordering::Relaxed);
    }

    /// Overwrites per-queue memory counters from externally measured values.
    /// Entries with unknown queue names are ignored.
    pub fn update_queue_memory_from_external(&self, queue_stats: &[(&str, u64)]) {
        for (queue_name, current_bytes) in queue_stats {
            if let Some(counter) = self.queue_memory_counter(queue_name) {
                counter.store(*current_bytes, Ordering::Relaxed);
            }
        }
    }

    /// Builds a point-in-time memory breakdown converted to decimal MB/GB
    /// for display. `untracked_gb` is whatever part of system RSS the
    /// explicit counters do not account for.
    pub fn get_memory_breakdown(&self) -> MemoryBreakdown {
        let m = &self.memory;
        let q1 = m.q1_memory_bytes.load(Ordering::Relaxed);
        let q2 = m.q2_memory_bytes.load(Ordering::Relaxed);
        let q3 = m.q3_memory_bytes.load(Ordering::Relaxed);
        let q4 = m.q4_memory_bytes.load(Ordering::Relaxed);
        let q5 = m.q5_memory_bytes.load(Ordering::Relaxed);
        let q6 = m.q6_memory_bytes.load(Ordering::Relaxed);
        let q7 = m.q7_memory_bytes.load(Ordering::Relaxed);
        let queue_total = q1 + q2 + q3 + q4 + q5 + q6 + q7;
        let pos_groups = m.position_group_processing_bytes.load(Ordering::Relaxed);
        let templates = m.template_processing_bytes.load(Ordering::Relaxed);
        let reorder = m.reorder_buffer_bytes.load(Ordering::Relaxed);
        let grouper = m.grouper_memory_bytes.load(Ordering::Relaxed);
        let worker_local = m.worker_local_memory_bytes.load(Ordering::Relaxed);
        let processing_total = pos_groups + templates + reorder + grouper + worker_local;
        let infra_decompressors = m.decompressor_memory_bytes.load(Ordering::Relaxed);
        let infra_compressors = m.compressor_memory_bytes.load(Ordering::Relaxed);
        let infra_buffers = m.worker_buffer_memory_bytes.load(Ordering::Relaxed);
        let infra_io = m.io_buffer_memory_bytes.load(Ordering::Relaxed);
        let infra_stacks = m.thread_stack_memory_bytes.load(Ordering::Relaxed);
        let infra_queues = m.queue_capacity_memory_bytes.load(Ordering::Relaxed);
        let infra_total = infra_decompressors
            + infra_compressors
            + infra_buffers
            + infra_io
            + infra_stacks
            + infra_queues;
        let tracked_total = queue_total + processing_total + infra_total;
        let system_rss = m.system_rss_bytes.load(Ordering::Relaxed);
        // Saturating: counters are sampled independently, so tracked_total
        // can momentarily exceed the RSS sample.
        let untracked = system_rss.saturating_sub(tracked_total);
        MemoryBreakdown {
            system_rss_gb: system_rss as f64 / 1e9,
            tracked_total_gb: tracked_total as f64 / 1e9,
            untracked_gb: untracked as f64 / 1e9,
            q1_mb: q1 as f64 / 1e6,
            q2_mb: q2 as f64 / 1e6,
            q3_mb: q3 as f64 / 1e6,
            q4_gb: q4 as f64 / 1e9,
            q5_gb: q5 as f64 / 1e9,
            q6_mb: q6 as f64 / 1e6,
            q7_mb: q7 as f64 / 1e6,
            position_groups_gb: pos_groups as f64 / 1e9,
            templates_gb: templates as f64 / 1e9,
            reorder_buffers_mb: reorder as f64 / 1e6,
            grouper_mb: grouper as f64 / 1e6,
            worker_local_mb: worker_local as f64 / 1e6,
            decompressors_mb: infra_decompressors as f64 / 1e6,
            compressors_mb: infra_compressors as f64 / 1e6,
            worker_buffers_mb: infra_buffers as f64 / 1e6,
            io_buffers_mb: infra_io as f64 / 1e6,
            thread_stacks_mb: infra_stacks as f64 / 1e6,
            queue_capacity_mb: infra_queues as f64 / 1e6,
            infrastructure_gb: infra_total as f64 / 1e9,
        }
    }
}
/// Shared state for the output half of the pipeline (compress and write
/// steps): queue 5 carries serialized batches, queue 6 carries compressed
/// block batches, and a reorder buffer restores serial order before writing.
/// Default method bodies are no-ops so implementors only override what they
/// track.
pub trait OutputPipelineState: Send + Sync {
/// Item type produced by the processing stage for this pipeline.
type Processed: Send;
/// True once any worker has recorded a fatal error.
fn has_error(&self) -> bool;
/// Records a fatal error; workers observe it via `has_error`.
fn set_error(&self, error: io::Error);
/// Pops the next serialized batch (with its serial number) from queue 5.
fn q5_pop(&self) -> Option<(u64, SerializedBatch)>;
/// Pushes onto queue 5; on a full queue the item is handed back in `Err`.
fn q5_push(&self, item: (u64, SerializedBatch)) -> Result<(), (u64, SerializedBatch)>;
fn q5_is_full(&self) -> bool;
/// Hook for memory accounting when a queue-5 item is popped (no-op default).
fn q5_track_pop(&self, _heap_size: u64) {}
/// Pops the next compressed batch (with its serial number) from queue 6.
fn q6_pop(&self) -> Option<(u64, CompressedBlockBatch)>;
/// Pushes onto queue 6; on a full queue the item is handed back in `Err`.
fn q6_push(&self, item: (u64, CompressedBlockBatch))
-> Result<(), (u64, CompressedBlockBatch)>;
fn q6_is_full(&self) -> bool;
/// Hook for memory accounting when a queue-6 item is popped (no-op default).
fn q6_track_pop(&self, _heap_size: u64) {}
/// Inserts a compressed batch into the write-side reorder buffer.
fn q6_reorder_insert(&self, serial: u64, batch: CompressedBlockBatch);
/// Pops the next in-order batch from the reorder buffer, if available.
fn q6_reorder_try_pop_next(&self) -> Option<CompressedBlockBatch>;
/// Non-blocking attempt to take the output writer lock; `None` on contention.
fn output_try_lock(&self)
-> Option<parking_lot::MutexGuard<'_, Option<Box<dyn Write + Send>>>>;
/// Bumps the written-batch counter, returning the new count.
fn increment_written(&self) -> u64;
/// Stats hook: bytes produced by compression (no-op default).
fn record_compressed_bytes_out(&self, _bytes: u64) {}
/// Progress hooks used for stall detection (no-op defaults).
fn record_q6_pop_progress(&self) {}
fn record_q7_push_progress(&self) {}
/// Whether the write-side reorder buffer may accept `serial`
/// (default: always).
fn write_reorder_can_proceed(&self, _serial: u64) -> bool {
true
}
/// Whether the write-side reorder buffer is over its memory threshold
/// (default: never).
fn write_reorder_is_memory_high(&self) -> bool {
false
}
/// Optional pipeline statistics; `None` disables stats collection.
fn stats(&self) -> Option<&PipelineStats> {
None
}
}
/// Worker capability: exposes the worker-local BGZF compressor.
pub trait HasCompressor {
fn compressor_mut(&mut self) -> &mut InlineBgzfCompressor;
}
/// Worker capability: a small pool of reusable byte buffers so hot paths can
/// avoid fresh allocations.
pub trait HasRecycledBuffers {
/// Returns a recycled buffer, or allocates one with at least `capacity`.
fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8>;
/// Returns a buffer to the pool for later reuse.
fn recycle_buffer(&mut self, buf: Vec<u8>);
}
/// Common worker-state queries used by the generic loop: "held" items are
/// batches that could not be pushed to a full queue and must be retried
/// before the worker may exit.
pub trait WorkerStateCommon {
fn has_any_held_items(&self) -> bool;
fn clear_held_items(&mut self);
}
/// Worker capability: access to the shared core state (scheduler, backoff).
pub trait HasWorkerCore {
fn core(&self) -> &WorkerCoreState;
fn core_mut(&mut self) -> &mut WorkerCoreState;
}
/// Adjusts a worker's backoff after one loop iteration: productive
/// iterations reset the backoff; idle ones sleep (attributing the idle time
/// to the thread's stats when stats are enabled) and then widen the window.
#[inline]
#[allow(clippy::cast_possible_truncation)]
pub fn handle_worker_backoff<W: HasWorkerCore>(
    worker: &mut W,
    stats: Option<&PipelineStats>,
    did_work: bool,
) {
    if did_work {
        worker.core_mut().reset_backoff();
        return;
    }
    match stats {
        Some(stats) => {
            let tid = worker.core().scheduler.thread_id();
            // Mark the thread as idle (no current step) while it sleeps.
            stats.clear_current_step(tid);
            let idle_started = Instant::now();
            worker.core_mut().sleep_backoff();
            stats.record_idle_for_thread(tid, idle_started.elapsed().as_nanos() as u64);
        }
        None => worker.core_mut().sleep_backoff(),
    }
    worker.core_mut().increase_backoff();
}
/// Context driving `generic_worker_loop`: supplies step execution, scheduling
/// inputs, and completion/error signals. Most methods have conservative
/// defaults so simple pipelines implement only the core set.
pub trait StepContext {
/// The worker-local state threaded through the loop.
type Worker: WorkerStateCommon + HasWorkerCore;
/// Runs one attempt of `step`; returns `(made_progress, was_contention)`.
fn execute_step(&self, worker: &mut Self::Worker, step: PipelineStep) -> (bool, bool);
/// Current backpressure snapshot used to prioritize steps.
fn get_backpressure(&self, worker: &Self::Worker) -> BackpressureState;
/// Gives the context a chance to enter/exit drain mode.
fn check_drain_mode(&self);
fn has_error(&self) -> bool;
fn is_complete(&self) -> bool;
/// Optional pipeline statistics; `None` disables timing collection.
fn stats(&self) -> Option<&PipelineStats>;
/// True when this worker must not attempt the Read step.
fn skip_read(&self) -> bool;
/// When true, the completion check runs at the end of each iteration
/// (after step attempts) instead of at the start.
fn check_completion_at_end(&self) -> bool {
false
}
/// Whether this worker should try consecutive ("sticky") reads first.
fn should_attempt_sticky_read(&self) -> bool {
false
}
/// Whether another sticky read should be attempted this iteration.
fn sticky_read_should_continue(&self) -> bool {
false
}
/// Runs one read attempt for the sticky-read path; true on progress.
fn execute_read_step(&self, _worker: &mut Self::Worker) -> bool {
false
}
fn is_drain_mode(&self) -> bool {
false
}
/// Filter hook: returning false skips `_step` this iteration.
fn should_attempt_step(
&self,
_worker: &Self::Worker,
_step: PipelineStep,
_drain_mode: bool,
) -> bool {
true
}
/// A step exclusively owned by this worker, tried before the priority
/// list (and excluded from it). `None` means no exclusive step.
fn exclusive_step_owned(&self, _worker: &Self::Worker) -> Option<PipelineStep> {
None
}
}
/// Main loop executed by every pipeline worker thread.
///
/// Each iteration: (1) bail on error or completion; (2) optionally perform
/// consecutive "sticky" reads; (3) refresh drain mode and fetch the
/// scheduler's step priorities for the current backpressure; (4) run the
/// worker's exclusively-owned step, if any; (5) otherwise walk the priority
/// list until one step makes progress; (6) apply backoff via
/// `handle_worker_backoff`. Step timings are only measured when stats are
/// enabled (`collect_stats`).
#[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
pub fn generic_worker_loop<C: StepContext>(ctx: &C, worker: &mut C::Worker) {
let collect_stats = ctx.stats().is_some();
let check_completion_at_end = ctx.check_completion_at_end();
loop {
// A fatal error anywhere in the pipeline stops this worker immediately.
if ctx.has_error() {
break;
}
// Front-of-loop completion check; a worker may not exit while it still
// holds items that failed a previous queue push.
if !check_completion_at_end && ctx.is_complete() && !worker.has_any_held_items() {
break;
}
let mut did_work = false;
// Sticky-read phase: keep reading while the context says to, so one
// worker can batch consecutive reads without releasing the reader.
if ctx.should_attempt_sticky_read() {
while ctx.sticky_read_should_continue() {
if let Some(stats) = ctx.stats() {
stats.record_step_attempt(
worker.core().scheduler.thread_id(),
PipelineStep::Read,
);
}
// Timing is only taken on success; a failed attempt records nothing.
let success = if collect_stats {
let start = Instant::now();
let success = ctx.execute_read_step(worker);
if success {
if let Some(stats) = ctx.stats() {
stats.record_step_for_thread(
PipelineStep::Read,
start.elapsed().as_nanos() as u64,
Some(worker.core().scheduler.thread_id()),
);
}
}
success
} else {
ctx.execute_read_step(worker)
};
if success {
did_work = true;
} else {
break;
}
}
}
ctx.check_drain_mode();
// Ask the scheduler for step priorities under current backpressure and
// copy them into a fixed-size local array (at most 9 steps).
let backpressure = ctx.get_backpressure(worker);
let priorities_slice = worker.core_mut().scheduler.get_priorities(backpressure);
let priority_count = priorities_slice.len().min(9);
let mut priorities = [PipelineStep::Read; 9];
priorities[..priority_count].copy_from_slice(&priorities_slice[..priority_count]);
let drain_mode = ctx.is_drain_mode();
// Exclusive step: tried first, before the shared priority list.
let owned_step = ctx.exclusive_step_owned(worker);
if let Some(step) = owned_step {
if step != PipelineStep::Read && !ctx.has_error() {
if let Some(stats) = ctx.stats() {
stats.record_step_attempt(worker.core().scheduler.thread_id(), step);
}
let (success, elapsed_ns, was_contention) = if collect_stats {
let start = Instant::now();
let (success, was_contention) = ctx.execute_step(worker, step);
(success, start.elapsed().as_nanos() as u64, was_contention)
} else {
let (success, was_contention) = ctx.execute_step(worker, step);
(success, 0, was_contention)
};
// The scheduler learns from every outcome, success or not.
worker.core_mut().scheduler.record_outcome(step, success, was_contention);
if success {
if let Some(stats) = ctx.stats() {
stats.record_step_for_thread(
step,
elapsed_ns,
Some(worker.core().scheduler.thread_id()),
);
}
did_work = true;
}
}
}
// Priority-list phase: only entered if the owned step (if any) made no
// progress; stops at the first step that succeeds.
if !did_work {
for &step in &priorities[..priority_count] {
if ctx.has_error() {
break;
}
if ctx.skip_read() && step == PipelineStep::Read {
continue;
}
// The owned step was already attempted above.
if Some(step) == owned_step {
continue;
}
if !ctx.should_attempt_step(worker, step, drain_mode) {
continue;
}
if let Some(stats) = ctx.stats() {
stats.record_step_attempt(worker.core().scheduler.thread_id(), step);
}
let (success, elapsed_ns, was_contention) = if collect_stats {
let start = Instant::now();
let (success, was_contention) = ctx.execute_step(worker, step);
(success, start.elapsed().as_nanos() as u64, was_contention)
} else {
let (success, was_contention) = ctx.execute_step(worker, step);
(success, 0, was_contention)
};
worker.core_mut().scheduler.record_outcome(step, success, was_contention);
if success {
if let Some(stats) = ctx.stats() {
stats.record_step_for_thread(
step,
elapsed_ns,
Some(worker.core().scheduler.thread_id()),
);
}
did_work = true;
break; }
}
}
// End-of-loop completion check (alternative placement; see
// `check_completion_at_end`).
if check_completion_at_end && ctx.is_complete() && !worker.has_any_held_items() {
break;
}
handle_worker_backoff(worker, ctx.stats(), did_work);
}
}
/// Worker capability: slot for a compressed batch that failed a queue push
/// and is held for retry, stored as `(serial, batch, heap_size)`.
pub trait HasHeldCompressed {
fn held_compressed_mut(&mut self) -> &mut Option<(u64, CompressedBlockBatch, usize)>;
}
/// Worker capability: slot for a held boundary item of type `B`, stored as
/// `(serial, item)` for retry after a failed push.
pub trait HasHeldBoundaries<B> {
fn held_boundaries_mut(&mut self) -> &mut Option<(u64, B)>;
}
/// One attempt of the Compress step: flush any held batch, then pop a
/// serialized batch from queue 5, BGZF-compress it, and push the result to
/// queue 6. Advancing a previously held batch counts as progress even when
/// no new batch can be processed this call.
pub fn shared_try_step_compress<S, W>(state: &S, worker: &mut W) -> StepResult
where
    S: OutputPipelineState,
    W: HasCompressor + HasHeldCompressed + HasRecycledBuffers,
{
    // Phase 1: retry the batch held over from a previous failed push.
    let mut flushed_held = false;
    if let Some((serial, held_batch, _)) = worker.held_compressed_mut().take() {
        if let Err((serial, held_batch)) = state.q6_push((serial, held_batch)) {
            // Still full: put it back (re-measuring its heap size) and bail.
            let size = held_batch.estimate_heap_size();
            *worker.held_compressed_mut() = Some((serial, held_batch, size));
            return StepResult::OutputFull;
        }
        state.record_q7_push_progress();
        flushed_held = true;
    }
    // Phase 2: don't start new work if the output queue has no room.
    if state.q6_is_full() {
        return if flushed_held { StepResult::Success } else { StepResult::OutputFull };
    }
    let (serial, serialized) = match state.q5_pop() {
        Some(pair) => pair,
        None => {
            if let Some(stats) = state.stats() {
                stats.record_queue_empty(6);
            }
            return if flushed_held { StepResult::Success } else { StepResult::InputEmpty };
        }
    };
    state.record_q6_pop_progress();
    state.q5_track_pop(serialized.estimate_heap_size() as u64);
    let SerializedBatch { data, record_count, secondary_data } = serialized;
    // Phase 3: compress; a write/flush failure is a fatal pipeline error.
    let blocks = {
        let bgzf = worker.compressor_mut();
        match bgzf.write_all(&data).and_then(|()| bgzf.flush()) {
            Ok(()) => bgzf.take_blocks(),
            Err(e) => {
                state.set_error(e);
                return StepResult::InputEmpty;
            }
        }
    };
    // The input buffer is done; return it to the pool.
    worker.recycle_buffer(data);
    let out_bytes: u64 = blocks.iter().map(|b| b.data.len() as u64).sum();
    state.record_compressed_bytes_out(out_bytes);
    let compressed = CompressedBlockBatch { blocks, record_count, secondary_data };
    // Phase 4: push the result; on a full queue, hold it for the next call.
    match state.q6_push((serial, compressed)) {
        Ok(()) => {
            state.record_q7_push_progress();
            StepResult::Success
        }
        Err((serial, compressed)) => {
            let size = compressed.estimate_heap_size();
            *worker.held_compressed_mut() = Some((serial, compressed, size));
            if flushed_held { StepResult::Success } else { StepResult::OutputFull }
        }
    }
}
/// One attempt of the (legacy) Write step: requires the writer lock without
/// blocking, drains queue 6 into the reorder buffer, then writes batches in
/// serial order. Returns true if anything was written.
#[allow(dead_code)]
fn shared_try_step_write<S: OutputPipelineState>(state: &S) -> bool {
    // Non-blocking: another worker already writing means we do nothing.
    let Some(mut guard) = state.output_try_lock() else { return false };
    let Some(writer) = guard.as_mut() else { return false };
    // Move everything currently queued into the reorder buffer.
    while let Some((serial, batch)) = state.q6_pop() {
        state.q6_track_pop(batch.estimate_heap_size() as u64);
        state.q6_reorder_insert(serial, batch);
    }
    // Emit in-order batches for as long as the next serial is available.
    let mut wrote_any = false;
    while let Some(batch) = state.q6_reorder_try_pop_next() {
        for block in &batch.blocks {
            if let Err(e) = writer.write_all(&block.data) {
                state.set_error(e);
                return false;
            }
        }
        state.increment_written();
        wrote_any = true;
    }
    wrote_any
}
/// Attempts to push a previously held item back onto `queue`. Returns true
/// when nothing is held or the push succeeded; false when the queue is still
/// full (the item stays held for the next attempt).
#[inline]
pub fn try_advance_held<T>(queue: &ArrayQueue<(u64, T)>, held: &mut Option<(u64, T)>) -> bool {
    // Nothing held: nothing blocks progress.
    let Some(entry) = held.take() else { return true };
    match queue.push(entry) {
        Ok(()) => true,
        Err(rejected) => {
            // ArrayQueue hands the item back on a full queue; keep holding it.
            *held = Some(rejected);
            false
        }
    }
}
/// Pushes `(serial, item)` onto `queue`; on a full queue the rejected item is
/// stashed in `held` so the caller can retry later without losing data.
#[inline]
pub fn try_push_or_hold<T>(
    queue: &ArrayQueue<(u64, T)>,
    serial: u64,
    item: T,
    held: &mut Option<(u64, T)>,
) -> StepResult {
    if let Err(rejected) = queue.push((serial, item)) {
        *held = Some(rejected);
        return StepResult::OutputFull;
    }
    StepResult::Success
}
/// Shared state for the Process step: pops batches of `G` from the input
/// queue and pushes batches of processed `P` to the output queue.
pub trait ProcessPipelineState<G, P>: Send + Sync {
/// Pops the next input batch (with its serial number), if any.
fn process_input_pop(&self) -> Option<(u64, Vec<G>)>;
fn process_output_is_full(&self) -> bool;
/// Pushes a processed batch; on a full queue the item is handed back.
fn process_output_push(&self, item: (u64, Vec<P>)) -> Result<(), (u64, Vec<P>)>;
fn has_error(&self) -> bool;
fn set_error(&self, e: io::Error);
/// Backpressure gate before starting new work (default: output full).
fn should_apply_process_backpressure(&self) -> bool {
self.process_output_is_full()
}
/// Whether the pipeline is draining (default: no).
fn is_draining(&self) -> bool {
false
}
}
/// Worker capability: slot for a processed batch that failed a queue push,
/// stored as `(serial, items, heap_size)` for retry.
pub trait HasHeldProcessed<P> {
fn held_processed_mut(&mut self) -> &mut Option<(u64, Vec<P>, usize)>;
}
/// One attempt of the Process step: flush any held batch, then pop an input
/// batch, run `process_fn` over each item, and push the results.
///
/// Consistency fix: like `shared_try_step_compress`, advancing a previously
/// held batch now counts as progress (`Success`) even when no new input can
/// be processed this call, so the scheduler does not back off after a
/// productive flush.
#[inline]
pub fn shared_try_step_process<S, W, G, P, F>(
    state: &S,
    worker: &mut W,
    process_fn: F,
) -> StepResult
where
    S: ProcessPipelineState<G, P>,
    W: HasHeldProcessed<P>,
    P: MemoryEstimate,
    F: Fn(G) -> io::Result<P>,
{
    // Phase 1: retry the batch held over from a previous failed push.
    let mut advanced_held = false;
    let held = worker.held_processed_mut();
    if let Some((serial, items, _heap_size)) = held.take() {
        match state.process_output_push((serial, items)) {
            Ok(()) => advanced_held = true,
            Err((serial, items)) => {
                // Still full: re-measure and keep holding.
                let heap_size: usize =
                    items.iter().map(MemoryEstimate::estimate_heap_size).sum();
                *held = Some((serial, items, heap_size));
                return StepResult::OutputFull;
            }
        }
    }
    if state.has_error() {
        return StepResult::InputEmpty;
    }
    // Phase 2: don't start new work under backpressure.
    if state.should_apply_process_backpressure() {
        return if advanced_held { StepResult::Success } else { StepResult::OutputFull };
    }
    let Some((serial, batch)) = state.process_input_pop() else {
        return if advanced_held { StepResult::Success } else { StepResult::InputEmpty };
    };
    // Phase 3: process every item; any error is fatal for the pipeline.
    let mut results = Vec::with_capacity(batch.len());
    for item in batch {
        match process_fn(item) {
            Ok(processed) => results.push(processed),
            Err(e) => {
                state.set_error(e);
                return StepResult::InputEmpty;
            }
        }
    }
    // Phase 4: push the results; on a full queue, hold them for retry.
    match state.process_output_push((serial, results)) {
        Ok(()) => StepResult::Success,
        Err((serial, results)) => {
            let heap_size: usize = results.iter().map(MemoryEstimate::estimate_heap_size).sum();
            *worker.held_processed_mut() = Some((serial, results, heap_size));
            if advanced_held { StepResult::Success } else { StepResult::OutputFull }
        }
    }
}
/// Shared state for the Serialize step: pops batches of processed `P` and
/// pushes `SerializedBatch`es. Stats hooks default to no-ops.
pub trait SerializePipelineState<P>: Send + Sync {
/// Pops the next processed batch (with its serial number), if any.
fn serialize_input_pop(&self) -> Option<(u64, Vec<P>)>;
fn serialize_output_is_full(&self) -> bool;
/// Pushes a serialized batch; on a full queue the item is handed back.
fn serialize_output_push(
&self,
item: (u64, SerializedBatch),
) -> Result<(), (u64, SerializedBatch)>;
fn has_error(&self) -> bool;
fn set_error(&self, e: io::Error);
/// Stats hook: bytes produced by serialization (no-op default).
fn record_serialized_bytes(&self, _bytes: u64) {}
/// Stats hook: records serialized (no-op default).
fn record_serialized_records(&self, _count: u64) {}
}
/// Worker capability for the Serialize step: a held-batch retry slot plus a
/// reusable serialization buffer (and its preferred capacity for
/// replacements).
pub trait HasHeldSerialized {
fn held_serialized_mut(&mut self) -> &mut Option<(u64, SerializedBatch, usize)>;
fn serialization_buffer_mut(&mut self) -> &mut Vec<u8>;
fn serialization_buffer_capacity(&self) -> usize;
}
/// One attempt of the Serialize step: flush any held batch, then pop a
/// processed batch, serialize each item into the worker's reusable buffer,
/// and push the resulting `SerializedBatch`.
///
/// Consistency fix: like `shared_try_step_compress`, advancing a previously
/// held batch now counts as progress (`Success`) even when no new input can
/// be serialized this call, so the scheduler does not back off after a
/// productive flush.
#[inline]
pub fn shared_try_step_serialize<S, W, P, F>(
    state: &S,
    worker: &mut W,
    mut serialize_fn: F,
) -> StepResult
where
    S: SerializePipelineState<P>,
    W: HasHeldSerialized + HasRecycledBuffers,
    F: FnMut(P, &mut Vec<u8>) -> io::Result<u64>,
{
    // Phase 1: retry the batch held over from a previous failed push.
    let mut advanced_held = false;
    if let Some((serial, held_batch, _heap_size)) = worker.held_serialized_mut().take() {
        match state.serialize_output_push((serial, held_batch)) {
            Ok(()) => advanced_held = true,
            Err((serial, held_batch)) => {
                // Still full: re-measure and keep holding.
                let heap_size = held_batch.estimate_heap_size();
                *worker.held_serialized_mut() = Some((serial, held_batch, heap_size));
                return StepResult::OutputFull;
            }
        }
    }
    if state.has_error() {
        return StepResult::InputEmpty;
    }
    // Phase 2: don't start new work if the output queue has no room.
    if state.serialize_output_is_full() {
        return if advanced_held { StepResult::Success } else { StepResult::OutputFull };
    }
    let Some((serial, batch)) = state.serialize_input_pop() else {
        return if advanced_held { StepResult::Success } else { StepResult::InputEmpty };
    };
    // Phase 3: serialize into the reusable buffer; errors are fatal.
    let capacity = worker.serialization_buffer_capacity();
    let total_records = {
        let buffer = worker.serialization_buffer_mut();
        buffer.clear();
        let mut total_records: u64 = 0;
        for item in batch {
            match serialize_fn(item, buffer) {
                Ok(record_count) => {
                    total_records += record_count;
                }
                Err(e) => {
                    state.set_error(e);
                    return StepResult::InputEmpty;
                }
            }
        }
        total_records
    };
    // Swap the filled buffer out, installing a recycled/fresh replacement so
    // the next call starts with an owned buffer of the preferred capacity.
    let replacement = worker.take_or_alloc_buffer(capacity);
    let buffer = worker.serialization_buffer_mut();
    let data = std::mem::replace(buffer, replacement);
    state.record_serialized_bytes(data.len() as u64);
    state.record_serialized_records(total_records);
    let result_batch = SerializedBatch { data, record_count: total_records, secondary_data: None };
    // Phase 4: push the result; on a full queue, hold it for retry.
    match state.serialize_output_push((serial, result_batch)) {
        Ok(()) => StepResult::Success,
        Err((serial, result_batch)) => {
            let heap_size = result_batch.estimate_heap_size();
            *worker.held_serialized_mut() = Some((serial, result_batch, heap_size));
            if advanced_held { StepResult::Success } else { StepResult::OutputFull }
        }
    }
}
/// Shared state for the final Write step: an input queue of compressed
/// batches, a serial-ordering reorder buffer with memory accounting, and the
/// (optional) output writer behind a mutex.
pub trait WritePipelineState: Send + Sync {
fn write_input_queue(&self) -> &ArrayQueue<(u64, CompressedBlockBatch)>;
fn write_reorder_buffer(&self) -> &Mutex<ReorderBuffer<CompressedBlockBatch>>;
/// `None` inside the mutex means no writer is installed.
fn write_output(&self) -> &Mutex<Option<Box<dyn Write + Send>>>;
fn write_reorder_state(&self) -> &ReorderBufferState;
fn has_error(&self) -> bool;
fn set_error(&self, e: io::Error);
/// Records `count` written records.
fn record_written(&self, count: u64);
/// Optional pipeline statistics; `None` disables stats collection.
fn stats(&self) -> Option<&PipelineStats>;
}
/// Moves every batch currently waiting on the write input queue into the
/// reorder buffer, accounting each batch's heap usage.
fn drain_write_input_into_reorder<S: WritePipelineState>(state: &S) {
    let mut reorder = state.write_reorder_buffer().lock();
    let queue = state.write_input_queue();
    while let Some((serial, batch)) = queue.pop() {
        let heap_size = batch.estimate_heap_size();
        reorder.insert_with_size(serial, batch, heap_size);
        state.write_reorder_state().add_heap_bytes(heap_size as u64);
    }
}

/// One attempt of the Write step: drain the input queue into the reorder
/// buffer, then (if the writer lock is free) write batches in serial order,
/// updating reorder-buffer memory accounting and written-record counts.
pub fn shared_try_step_write_new<S: WritePipelineState>(state: &S) -> StepResult {
    if state.has_error() {
        return StepResult::InputEmpty;
    }
    // Drain even when we can't take the writer lock below, so producers'
    // pushes keep flowing out of the bounded queue.
    drain_write_input_into_reorder(state);
    let Some(mut output_guard) = state.write_output().try_lock() else {
        if let Some(stats) = state.stats() {
            stats.record_contention(PipelineStep::Write);
        }
        return StepResult::InputEmpty;
    };
    let Some(ref mut output) = *output_guard else {
        return StepResult::InputEmpty;
    };
    // Drain again now that we hold the writer: batches may have arrived while
    // we were acquiring the lock.
    drain_write_input_into_reorder(state);
    let mut wrote_any = false;
    {
        let mut reorder = state.write_reorder_buffer().lock();
        while let Some((batch, heap_size)) = reorder.try_pop_next_with_size() {
            for block in &batch.blocks {
                if let Err(e) = output.write_all(&block.data) {
                    state.set_error(e);
                    return StepResult::InputEmpty;
                }
            }
            state.write_reorder_state().sub_heap_bytes(heap_size as u64);
            state.write_reorder_state().update_next_seq(reorder.next_seq());
            state.record_written(batch.record_count);
            wrote_any = true;
        }
    }
    if wrote_any { StepResult::Success } else { StepResult::InputEmpty }
}
#[cfg(test)]
mod tests {
use super::*;
// Step timings accumulate both total nanoseconds and an invocation count.
#[test]
fn test_stats_record_step_timing() {
let stats = PipelineStats::new();
stats.record_step(PipelineStep::Decompress, 1_000_000); stats.record_step(PipelineStep::Decompress, 2_000_000);
assert_eq!(stats.step_decompress_ns.load(Ordering::Relaxed), 3_000_000);
assert_eq!(stats.step_decompress_count.load(Ordering::Relaxed), 2);
}
// Per-thread recording updates both the global totals and the per-thread
// step counters.
#[test]
fn test_stats_record_step_for_thread() {
let stats = PipelineStats::new();
stats.record_step_for_thread(PipelineStep::Read, 500_000, Some(0));
stats.record_step_for_thread(PipelineStep::Read, 500_000, Some(1));
assert_eq!(stats.step_read_ns.load(Ordering::Relaxed), 1_000_000);
assert_eq!(stats.step_read_count.load(Ordering::Relaxed), 2);
assert_eq!(stats.per_thread_step_counts[0][0].load(Ordering::Relaxed), 1);
assert_eq!(stats.per_thread_step_counts[1][0].load(Ordering::Relaxed), 1);
}
// Queue-empty events route by queue id; 25 is the sentinel id for q2b, as
// the q2b_empty assertion below demonstrates.
#[test]
fn test_stats_record_queue_empty() {
let stats = PipelineStats::new();
stats.record_queue_empty(1);
stats.record_queue_empty(1);
stats.record_queue_empty(2);
stats.record_queue_empty(25); stats.record_queue_empty(7);
assert_eq!(stats.q1_empty.load(Ordering::Relaxed), 2);
assert_eq!(stats.q2_empty.load(Ordering::Relaxed), 1);
assert_eq!(stats.q2b_empty.load(Ordering::Relaxed), 1);
assert_eq!(stats.q7_empty.load(Ordering::Relaxed), 1);
assert_eq!(stats.q3_empty.load(Ordering::Relaxed), 0);
}
// Idle time accumulates per thread; every call also bumps the shared
// idle_yields counter.
#[test]
fn test_stats_record_idle_for_thread() {
let stats = PipelineStats::new();
stats.record_idle_for_thread(0, 100_000);
stats.record_idle_for_thread(0, 200_000);
stats.record_idle_for_thread(1, 50_000);
assert_eq!(stats.idle_yields.load(Ordering::Relaxed), 3);
assert_eq!(stats.per_thread_idle_ns[0].load(Ordering::Relaxed), 300_000);
assert_eq!(stats.per_thread_idle_ns[1].load(Ordering::Relaxed), 50_000);
}
// A fresh tracker starts empty with the configured limit.
#[test]
fn test_memory_tracker_new() {
let tracker = MemoryTracker::new(1000);
assert_eq!(tracker.current(), 0);
assert_eq!(tracker.peak(), 0);
assert_eq!(tracker.limit(), 1000);
}
// A limit of 0 means "unlimited": every try_add succeeds.
#[test]
fn test_memory_tracker_unlimited() {
let tracker = MemoryTracker::unlimited();
assert_eq!(tracker.limit(), 0);
assert!(tracker.try_add(1_000_000));
assert!(tracker.try_add(1_000_000_000));
assert_eq!(tracker.current(), 1_001_000_000);
}
#[test]
fn test_memory_tracker_try_add_under_limit() {
let tracker = MemoryTracker::new(1000);
assert!(tracker.try_add(500));
assert_eq!(tracker.current(), 500);
}
// Filling exactly to the limit succeeds; the next byte is rejected.
#[test]
fn test_memory_tracker_try_add_at_limit() {
let tracker = MemoryTracker::new(1000);
assert!(tracker.try_add(1000));
assert!(!tracker.try_add(1));
}
// An add that crosses the limit is still accepted (soft limit: usage may
// overshoot once); only subsequent adds are rejected.
#[test]
fn test_memory_tracker_try_add_single_exceeds() {
let tracker = MemoryTracker::new(1000);
assert!(tracker.try_add(500)); assert!(tracker.try_add(600)); assert_eq!(tracker.current(), 1100);
assert!(!tracker.try_add(1));
}
// Removing more than is tracked clamps at zero rather than underflowing.
#[test]
fn test_memory_tracker_remove_saturating() {
let tracker = MemoryTracker::new(1000);
tracker.try_add(100);
tracker.remove(200);
assert_eq!(tracker.current(), 0);
}
// Peak is a high-water mark: it persists after usage drops.
#[test]
fn test_memory_tracker_peak_tracking() {
let tracker = MemoryTracker::new(0); tracker.try_add(100);
tracker.try_add(200);
assert_eq!(tracker.peak(), 300);
tracker.remove(250);
assert_eq!(tracker.current(), 50);
assert_eq!(tracker.peak(), 300);
}
#[test]
fn test_memory_tracker_is_at_limit() {
let tracker = MemoryTracker::new(1000);
assert!(!tracker.is_at_limit());
tracker.try_add(999);
assert!(!tracker.is_at_limit());
tracker.try_add(1);
assert!(tracker.is_at_limit());
}
// The drain threshold sits at half the limit: 500/1000 is not below it,
// 499/1000 is.
#[test]
fn test_memory_tracker_drain_threshold() {
let tracker = MemoryTracker::new(1000);
tracker.try_add(1000);
assert!(!tracker.is_below_drain_threshold()); tracker.remove(501);
assert!(tracker.is_below_drain_threshold()); }
#[test]
fn test_memory_tracker_default_is_unlimited() {
let tracker = MemoryTracker::default();
assert_eq!(tracker.limit(), 0);
assert!(tracker.try_add(1_000_000));
}
#[test]
fn test_reorder_buffer_state_new() {
let state = ReorderBufferState::new(1000);
assert_eq!(state.get_next_seq(), 0);
assert_eq!(state.get_heap_bytes(), 0);
assert_eq!(state.get_memory_limit(), 1000);
}
// The next expected serial may always proceed, even when memory is far over
// the limit — otherwise the pipeline would deadlock on serial 0.
#[test]
fn test_reorder_buffer_state_can_proceed_next_seq() {
let state = ReorderBufferState::new(100);
state.add_heap_bytes(10_000); assert!(state.can_proceed(0));
}
// Out-of-order serials are blocked while memory is high; the in-order
// serial still proceeds.
#[test]
fn test_reorder_buffer_state_can_proceed_over_limit() {
let state = ReorderBufferState::new(1000);
state.add_heap_bytes(500);
assert!(!state.can_proceed(1));
assert!(state.can_proceed(0));
}
#[test]
fn test_reorder_buffer_state_is_memory_high() {
let state = ReorderBufferState::new(1000);
assert!(!state.is_memory_high());
state.add_heap_bytes(1000);
assert!(state.is_memory_high());
}
// Drained means usage fell below half the limit (499/1000 qualifies,
// 500/1000 does not... the assertions below pin the exact boundary).
#[test]
fn test_reorder_buffer_state_is_memory_drained() {
let state = ReorderBufferState::new(1000);
state.add_heap_bytes(1000);
assert!(!state.is_memory_drained()); state.sub_heap_bytes(501);
assert!(state.is_memory_drained()); }
// A zero limit disables the memory-high check entirely; a small nonzero
// limit still triggers it once reached.
#[test]
fn test_reorder_buffer_state_effective_limit() {
let state_zero = ReorderBufferState::new(0);
assert!(!state_zero.is_memory_high());
let state_small = ReorderBufferState::new(100);
state_small.add_heap_bytes(100);
assert!(state_small.is_memory_high()); }
#[test]
fn test_reorder_buffer_state_add_sub_heap_bytes() {
let state = ReorderBufferState::new(0);
state.add_heap_bytes(100);
assert_eq!(state.get_heap_bytes(), 100);
state.add_heap_bytes(50);
assert_eq!(state.get_heap_bytes(), 150);
state.sub_heap_bytes(30);
assert_eq!(state.get_heap_bytes(), 120);
}
// A single-end key leaves the mate fields at their UNKNOWN_* sentinels.
#[test]
fn test_group_key_single() {
let key = GroupKey::single(1, 100, 0, 5, 0, 42);
assert_eq!(key.ref_id1, 1);
assert_eq!(key.pos1, 100);
assert_eq!(key.strand1, 0);
assert_eq!(key.ref_id2, GroupKey::UNKNOWN_REF);
assert_eq!(key.pos2, GroupKey::UNKNOWN_POS);
assert_eq!(key.strand2, GroupKey::UNKNOWN_STRAND);
assert_eq!(key.library_idx, 5);
assert_eq!(key.cell_hash, 0);
assert_eq!(key.name_hash, 42);
}
#[test]
fn test_group_key_paired() {
let key = GroupKey::paired(1, 100, 0, 2, 200, 1, 3, 0, 99);
assert_eq!(key.ref_id1, 1);
assert_eq!(key.pos1, 100);
assert_eq!(key.strand1, 0);
assert_eq!(key.ref_id2, 2);
assert_eq!(key.pos2, 200);
assert_eq!(key.strand2, 1);
}
// paired() canonicalizes: when the second end sorts before the first, the
// ends are swapped so equivalent pairs produce identical keys.
#[test]
fn test_group_key_paired_swap() {
let key = GroupKey::paired(5, 500, 1, 1, 100, 0, 3, 0, 99);
assert_eq!(key.ref_id1, 1);
assert_eq!(key.pos1, 100);
assert_eq!(key.strand1, 0);
assert_eq!(key.ref_id2, 5);
assert_eq!(key.pos2, 500);
assert_eq!(key.strand2, 1);
}
// position_key() drops name_hash but keeps coordinates, library, and cell.
#[test]
fn test_group_key_position_key() {
let key = GroupKey::single(1, 100, 0, 5, 7, 42);
let pk = key.position_key();
assert_eq!(
pk,
(
1,
100,
0,
GroupKey::UNKNOWN_REF,
GroupKey::UNKNOWN_POS,
GroupKey::UNKNOWN_STRAND,
5,
7
)
);
}
// Ordering compares reference id before position.
#[test]
fn test_group_key_ord_by_position() {
let key_a = GroupKey::single(1, 100, 0, 0, 0, 0);
let key_b = GroupKey::single(2, 50, 0, 0, 0, 0);
assert!(key_a < key_b);
}
// name_hash breaks ties between otherwise-identical keys.
#[test]
fn test_group_key_ord_tiebreak_name_hash() {
let key_a = GroupKey::single(1, 100, 0, 0, 0, 10);
let key_b = GroupKey::single(1, 100, 0, 0, 0, 20);
assert!(key_a < key_b);
}
// Default keys use the UNKNOWN_* sentinels for all positional fields.
#[test]
fn test_group_key_default() {
let key = GroupKey::default();
assert_eq!(key.ref_id1, GroupKey::UNKNOWN_REF);
assert_eq!(key.pos1, GroupKey::UNKNOWN_POS);
assert_eq!(key.strand1, GroupKey::UNKNOWN_STRAND);
assert_eq!(key.ref_id2, GroupKey::UNKNOWN_REF);
assert_eq!(key.pos2, GroupKey::UNKNOWN_POS);
assert_eq!(key.strand2, GroupKey::UNKNOWN_STRAND);
assert_eq!(key.library_idx, 0);
assert_eq!(key.cell_hash, 0);
assert_eq!(key.name_hash, 0);
}
#[test]
fn test_group_key_eq() {
let key_a = GroupKey::single(1, 100, 0, 5, 0, 42);
let key_b = GroupKey::single(1, 100, 0, 5, 0, 42);
assert_eq!(key_a, key_b);
}
// Same coordinates, opposite strands: canonicalization orders by strand so
// strand 0 lands in slot 1.
#[test]
fn test_group_key_paired_same_position() {
let key = GroupKey::paired(1, 100, 1, 1, 100, 0, 0, 0, 0);
assert_eq!(key.ref_id1, 1);
assert_eq!(key.pos1, 100);
assert_eq!(key.strand1, 0);
assert_eq!(key.strand2, 1);
}
#[test]
fn test_group_key_hash() {
use std::collections::HashSet;
let key_a = GroupKey::single(1, 100, 0, 0, 0, 42);
let key_b = GroupKey::single(1, 100, 0, 0, 0, 42);
let key_c = GroupKey::single(2, 200, 1, 0, 0, 99);
let mut set = HashSet::new();
set.insert(key_a);
assert!(set.contains(&key_b));
set.insert(key_c);
assert_eq!(set.len(), 2);
}
#[test]
fn test_pipeline_step_is_exclusive() {
    // Exactly Read, FindBoundaries, Group, and Write report exclusive.
    let cases = [
        (PipelineStep::Read, true),
        (PipelineStep::Decompress, false),
        (PipelineStep::FindBoundaries, true),
        (PipelineStep::Decode, false),
        (PipelineStep::Group, true),
        (PipelineStep::Process, false),
        (PipelineStep::Serialize, false),
        (PipelineStep::Compress, false),
        (PipelineStep::Write, true),
    ];
    for (step, expected) in cases {
        assert_eq!(step.is_exclusive(), expected);
    }
}
#[test]
fn test_pipeline_step_all() {
    // all() enumerates the nine steps in pipeline order.
    let expected = [
        PipelineStep::Read,
        PipelineStep::Decompress,
        PipelineStep::FindBoundaries,
        PipelineStep::Decode,
        PipelineStep::Group,
        PipelineStep::Process,
        PipelineStep::Serialize,
        PipelineStep::Compress,
        PipelineStep::Write,
    ];
    let all = PipelineStep::all();
    assert_eq!(all.len(), 9);
    for (i, step) in expected.iter().enumerate() {
        assert_eq!(all[i], *step);
    }
}
#[test]
fn test_pipeline_step_from_index() {
    // from_index maps 0..=8 back onto the same ordering as all().
    let cases = [
        (0, PipelineStep::Read),
        (1, PipelineStep::Decompress),
        (2, PipelineStep::FindBoundaries),
        (3, PipelineStep::Decode),
        (4, PipelineStep::Group),
        (5, PipelineStep::Process),
        (6, PipelineStep::Serialize),
        (7, PipelineStep::Compress),
        (8, PipelineStep::Write),
    ];
    for (idx, expected) in cases {
        assert_eq!(PipelineStep::from_index(idx), expected);
    }
}
#[test]
fn test_pipeline_step_short_name() {
    let cases = [
        (PipelineStep::Read, "Rd"),
        (PipelineStep::Decompress, "Dc"),
        (PipelineStep::FindBoundaries, "Fb"),
        (PipelineStep::Decode, "De"),
        (PipelineStep::Group, "Gr"),
        (PipelineStep::Process, "Pr"),
        (PipelineStep::Serialize, "Se"),
        (PipelineStep::Compress, "Co"),
        (PipelineStep::Write, "Wr"),
    ];
    for (step, expected) in cases {
        assert_eq!(step.short_name(), expected);
    }
    // Every short name is exactly two characters long.
    for step in PipelineStep::all() {
        assert_eq!(step.short_name().len(), 2);
    }
}
#[test]
fn test_step_result_is_success() {
    // Only Success reports success.
    assert!(StepResult::Success.is_success());
    assert!(!StepResult::OutputFull.is_success());
    assert!(!StepResult::InputEmpty.is_success());
}
#[test]
fn test_step_result_variants() {
    // The three variants are pairwise distinct.
    assert_ne!(StepResult::Success, StepResult::OutputFull);
    assert_ne!(StepResult::Success, StepResult::InputEmpty);
    assert_ne!(StepResult::OutputFull, StepResult::InputEmpty);
}
#[test]
fn test_raw_block_batch_new_empty() {
    // A new batch holds nothing.
    let fresh = RawBlockBatch::new();
    assert!(fresh.is_empty());
    assert_eq!(fresh.len(), 0);
}
#[test]
fn test_raw_block_batch_with_capacity() {
    // with_capacity preallocates the block Vec but adds no blocks.
    let fresh = RawBlockBatch::with_capacity(32);
    assert!(fresh.is_empty());
    assert!(fresh.blocks.capacity() >= 32);
}
#[test]
fn test_compressed_block_batch_new() {
    let fresh = CompressedBlockBatch::new();
    assert!(fresh.is_empty());
    assert_eq!(fresh.len(), 0);
    assert_eq!(fresh.record_count, 0);
}
#[test]
fn test_compressed_block_batch_clear() {
    // clear() must reset the record counter and drop any secondary payload.
    let mut batch = CompressedBlockBatch::new();
    batch.record_count = 42;
    batch.secondary_data = Some(vec![1, 2, 3]);
    batch.clear();
    assert!(batch.is_empty());
    assert_eq!(batch.record_count, 0);
    assert!(batch.secondary_data.is_none());
}
#[test]
fn test_bgzf_batch_config_default() {
    // Defaults: 16 blocks per batch at compression level 6.
    let cfg = BgzfBatchConfig::default();
    assert_eq!(cfg.blocks_per_batch, 16);
    assert_eq!(cfg.compression_level, 6);
}
#[test]
fn test_bgzf_batch_config_new() {
    // new(n) overrides the batch size but keeps the default compression level.
    let cfg = BgzfBatchConfig::new(64);
    assert_eq!(cfg.blocks_per_batch, 64);
    assert_eq!(cfg.compression_level, 6);
}
#[test]
fn test_decompressed_batch_new_empty() {
    let fresh = DecompressedBatch::new();
    assert!(fresh.is_empty());
    assert!(fresh.data.is_empty());
}
#[test]
fn test_serialized_batch_clear() {
    // clear() empties the data buffer, zeroes the counter, and drops the
    // secondary payload.
    let mut batch = SerializedBatch::new();
    batch.data.extend_from_slice(&[1, 2, 3]);
    batch.record_count = 10;
    batch.secondary_data = Some(vec![4, 5, 6]);
    batch.clear();
    assert!(batch.is_empty());
    assert_eq!(batch.record_count, 0);
    assert!(batch.secondary_data.is_none());
}
#[test]
fn test_pipeline_config_new_defaults() {
    // new() fixes defaults for everything beyond threads and compression level.
    let cfg = PipelineConfig::new(4, 6);
    assert_eq!(cfg.num_threads, 4);
    assert_eq!(cfg.compression_level, 6);
    assert_eq!(cfg.queue_capacity, 64);
    assert_eq!(cfg.batch_size, 1);
    assert_eq!(cfg.queue_memory_limit, 0);
    assert!(!cfg.collect_stats);
}
#[test]
fn test_pipeline_config_builder_chain() {
    // Builder methods chain; each overrides exactly its own field.
    let cfg = PipelineConfig::new(4, 6)
        .with_compression_level(9)
        .with_batch_size(100)
        .with_stats(true)
        .with_queue_memory_limit(1_000_000);
    assert_eq!(cfg.compression_level, 9);
    assert_eq!(cfg.batch_size, 100);
    assert!(cfg.collect_stats);
    assert_eq!(cfg.queue_memory_limit, 1_000_000);
}
#[test]
fn test_pipeline_config_auto_tuned_1_thread() {
    let cfg = PipelineConfig::auto_tuned(1, 6);
    assert_eq!(cfg.num_threads, 1);
    assert_eq!(cfg.queue_capacity, 64);
}
#[test]
fn test_pipeline_config_auto_tuned_8_threads() {
    let cfg = PipelineConfig::auto_tuned(8, 6);
    assert_eq!(cfg.num_threads, 8);
    assert_eq!(cfg.queue_capacity, 128);
    assert_eq!(cfg.blocks_per_read_batch, 48);
}
#[test]
fn test_pipeline_config_auto_tuned_32_threads() {
    // Queue capacity keeps scaling with thread count.
    let cfg = PipelineConfig::auto_tuned(32, 6);
    assert_eq!(cfg.num_threads, 32);
    assert_eq!(cfg.queue_capacity, 256);
}
#[test]
fn test_pipeline_config_with_compression_level() {
    let cfg = PipelineConfig::new(4, 6).with_compression_level(12);
    assert_eq!(cfg.compression_level, 12);
}
#[test]
fn test_pipeline_config_with_batch_size_min_1() {
    // A zero batch size is clamped up to 1.
    let cfg = PipelineConfig::new(4, 6).with_batch_size(0);
    assert_eq!(cfg.batch_size, 1);
}
#[test]
fn test_pipeline_config_with_queue_memory_limit() {
    let cfg = PipelineConfig::new(4, 6).with_queue_memory_limit(500_000_000);
    assert_eq!(cfg.queue_memory_limit, 500_000_000);
}
#[test]
fn test_pipeline_validation_error_display_empty() {
    // Even a fully-clean error value renders the standard header line.
    let err = PipelineValidationError {
        non_empty_queues: vec![],
        counter_mismatches: vec![],
        leaked_heap_bytes: 0,
    };
    let rendered = format!("{err}");
    assert!(rendered.contains("Pipeline validation failed"));
}
#[test]
fn test_pipeline_validation_error_display_full() {
    // Every reported queue, mismatch, and the leaked byte count must appear
    // in the rendered message.
    let err = PipelineValidationError {
        non_empty_queues: vec!["Q1".to_string(), "Q2".to_string()],
        counter_mismatches: vec!["read_count != write_count".to_string()],
        leaked_heap_bytes: 1024,
    };
    let rendered = format!("{err}");
    for needle in
        ["Pipeline validation failed", "Q1", "Q2", "read_count != write_count", "1024"]
    {
        assert!(rendered.contains(needle));
    }
}
#[test]
fn test_extract_panic_message_str() {
    // A &str payload is passed through verbatim.
    let payload: Box<dyn std::any::Any + Send> = Box::new("something went wrong");
    assert_eq!(extract_panic_message(payload), "something went wrong");
}
#[test]
fn test_extract_panic_message_string() {
    // An owned String payload is passed through verbatim as well.
    let payload: Box<dyn std::any::Any + Send> = Box::new(String::from("an error occurred"));
    assert_eq!(extract_panic_message(payload), "an error occurred");
}
#[test]
fn test_extract_panic_message_other() {
    // Anything that is neither &str nor String falls back to a placeholder.
    let payload: Box<dyn std::any::Any + Send> = Box::new(42_i32);
    assert_eq!(extract_panic_message(payload), "Unknown panic");
}
#[test]
fn test_worker_core_state_initial_values() {
    use super::super::scheduler::SchedulerStrategy;
    // A new worker starts at the minimum backoff.
    let state = WorkerCoreState::new(6, 0, 4, SchedulerStrategy::default(), ActiveSteps::all());
    assert_eq!(state.backoff_us, MIN_BACKOFF_US);
}
#[test]
fn test_worker_core_state_reset_backoff() {
    use super::super::scheduler::SchedulerStrategy;
    let mut state =
        WorkerCoreState::new(6, 0, 4, SchedulerStrategy::default(), ActiveSteps::all());
    // Grow the backoff, then verify reset snaps it back to the floor.
    state.increase_backoff();
    assert!(state.backoff_us > MIN_BACKOFF_US);
    state.reset_backoff();
    assert_eq!(state.backoff_us, MIN_BACKOFF_US);
}
#[test]
fn test_worker_core_state_increase_backoff() {
    use super::super::scheduler::SchedulerStrategy;
    let mut state =
        WorkerCoreState::new(6, 0, 4, SchedulerStrategy::default(), ActiveSteps::all());
    // Backoff doubles on each call...
    assert_eq!(state.backoff_us, MIN_BACKOFF_US);
    state.increase_backoff();
    assert_eq!(state.backoff_us, MIN_BACKOFF_US * 2);
    state.increase_backoff();
    assert_eq!(state.backoff_us, MIN_BACKOFF_US * 4);
    // ...and saturates at MAX_BACKOFF_US rather than growing without bound.
    for _ in 0..20 {
        state.increase_backoff();
    }
    assert_eq!(state.backoff_us, MAX_BACKOFF_US);
}
// Minimal `Processed` payload for queue tests: reports a caller-chosen,
// fixed heap size.
struct TestProcessed {
size: usize,
}
impl MemoryEstimate for TestProcessed {
fn estimate_heap_size(&self) -> usize {
// The test controls the reported size directly.
self.size
}
}
#[test]
fn test_output_queues_new() {
let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
let queues: OutputPipelineQueues<(), TestProcessed> =
OutputPipelineQueues::new(16, output, None, "test", 0);
assert!(queues.groups.is_empty());
assert!(queues.processed.is_empty());
assert!(queues.serialized.is_empty());
assert!(queues.compressed.is_empty());
}
#[test]
fn test_output_queues_set_take_error() {
let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
let queues: OutputPipelineQueues<(), TestProcessed> =
OutputPipelineQueues::new(16, output, None, "test", 0);
assert!(!queues.has_error());
queues.set_error(io::Error::other("test error"));
assert!(queues.has_error());
let err = queues.take_error();
assert!(err.is_some());
assert_eq!(err.expect("error was set above").to_string(), "test error");
}
#[test]
fn test_output_queues_draining() {
let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
let queues: OutputPipelineQueues<(), TestProcessed> =
OutputPipelineQueues::new(16, output, None, "test", 0);
assert!(!queues.is_draining());
queues.set_draining(true);
assert!(queues.is_draining());
}
#[test]
fn test_output_queues_queue_depths_empty() {
let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
let queues: OutputPipelineQueues<(), TestProcessed> =
OutputPipelineQueues::new(16, output, None, "test", 0);
let depths = queues.queue_depths();
assert_eq!(depths.groups, 0);
assert_eq!(depths.processed, 0);
assert_eq!(depths.serialized, 0);
assert_eq!(depths.compressed, 0);
}
#[test]
fn test_output_queues_are_queues_empty() {
let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
let queues: OutputPipelineQueues<(), TestProcessed> =
OutputPipelineQueues::new(16, output, None, "test", 0);
assert!(queues.are_queues_empty());
}
#[test]
fn test_memory_estimate_unit() {
let unit = ();
assert_eq!(unit.estimate_heap_size(), 0);
}
#[test]
fn test_decoded_record_raw_accessor() {
    // from_raw_bytes stores the bytes; raw_bytes() exposes them non-empty.
    let rec = DecodedRecord::from_raw_bytes(vec![0u8; 32], GroupKey::default());
    assert!(!rec.raw_bytes().is_empty());
}
#[test]
fn test_memory_estimate_serialized_batch() {
    // The estimate must count reserved capacity, not just used length.
    let mut batch = SerializedBatch::new();
    batch.data.reserve(1024);
    assert!(batch.estimate_heap_size() >= 1024);
}
#[test]
fn test_memory_estimate_decompressed_batch() {
    // Same reserved-capacity rule for decompressed batches.
    let mut batch = DecompressedBatch::new();
    batch.data.reserve(2048);
    assert!(batch.estimate_heap_size() >= 2048);
}
#[test]
fn test_memory_estimate_vec_record_buf() {
    use fgumi_raw_bam::{SamBuilder as RawSamBuilder, raw_record_to_record_buf};
    use noodles::sam::Header;
    use noodles::sam::alignment::record_buf::RecordBuf;
    // Build one real record so the element payload is non-trivial.
    let mut builder = RawSamBuilder::new();
    builder.sequence(b"ACGT").qualities(&[30, 30, 30, 30]).flags(0);
    let record = raw_record_to_record_buf(&builder.build(), &Header::default())
        .expect("raw_record_to_record_buf failed in test");
    // Capacity 10 with one element: the Vec's own allocation must be counted.
    let mut records: Vec<RecordBuf> = Vec::with_capacity(10);
    records.push(record);
    let estimate = records.estimate_heap_size();
    let vec_overhead = 10 * std::mem::size_of::<RecordBuf>();
    assert!(
        estimate >= vec_overhead,
        "estimate {estimate} should include Vec<RecordBuf> overhead {vec_overhead}"
    );
}
#[test]
fn test_serialized_batch_memory_estimate_with_secondary() {
    // Primary buffer and secondary payload both count toward the estimate.
    let batch = SerializedBatch {
        data: vec![0u8; 100],
        record_count: 5,
        secondary_data: Some(vec![0u8; 50]),
    };
    let estimate = batch.estimate_heap_size();
    assert!(
        estimate >= 150,
        "Should include both primary ({}) and secondary data, got {estimate}",
        batch.data.capacity()
    );
}
#[test]
fn test_serialized_batch_memory_estimate_without_secondary() {
    // With no secondary payload, the estimate stays near the primary size.
    let batch = SerializedBatch { data: vec![0u8; 100], record_count: 5, secondary_data: None };
    let estimate = batch.estimate_heap_size();
    assert!(estimate >= 100);
    assert!(estimate < 150, "Should not include phantom secondary data, got {estimate}");
}
#[test]
fn test_compressed_block_batch_memory_with_secondary() {
    // Even with zero blocks, the estimate reflects the secondary payload.
    let batch = CompressedBlockBatch {
        blocks: vec![],
        record_count: 0,
        secondary_data: Some(vec![0u8; 200]),
    };
    assert!(
        batch.estimate_heap_size() >= 200,
        "Should include secondary data, got {}",
        batch.estimate_heap_size()
    );
}
// Test double for the compress step: Q5/Q6 are real bounded ArrayQueues and
// the write-side reorder state is real, but there is no writer attached.
struct MockCompressState {
q5: ArrayQueue<(u64, SerializedBatch)>,
q6: ArrayQueue<(u64, CompressedBlockBatch)>,
reorder: ReorderBufferState,
error: std::sync::Mutex<Option<io::Error>>,
}
impl MockCompressState {
// `capacity` bounds both queues; `memory_limit` seeds the reorder state.
fn new(capacity: usize, memory_limit: u64) -> Self {
Self {
q5: ArrayQueue::new(capacity),
q6: ArrayQueue::new(capacity),
reorder: ReorderBufferState::new(memory_limit),
error: std::sync::Mutex::new(None),
}
}
}
// Wires MockCompressState into the compress step under test. Queue hooks
// delegate to the real ArrayQueues; reorder hooks delegate to the real
// ReorderBufferState; the output/writer hooks are inert stubs.
impl OutputPipelineState for MockCompressState {
type Processed = Vec<u8>;
fn has_error(&self) -> bool {
self.error.lock().unwrap().is_some()
}
fn set_error(&self, error: io::Error) {
*self.error.lock().unwrap() = Some(error);
}
fn q5_pop(&self) -> Option<(u64, SerializedBatch)> {
self.q5.pop()
}
fn q5_push(&self, item: (u64, SerializedBatch)) -> Result<(), (u64, SerializedBatch)> {
self.q5.push(item)
}
fn q5_is_full(&self) -> bool {
self.q5.is_full()
}
fn q6_pop(&self) -> Option<(u64, CompressedBlockBatch)> {
self.q6.pop()
}
fn q6_push(
&self,
item: (u64, CompressedBlockBatch),
) -> Result<(), (u64, CompressedBlockBatch)> {
self.q6.push(item)
}
fn q6_is_full(&self) -> bool {
self.q6.is_full()
}
// This mock does not exercise the Q6 reorder-buffer path.
fn q6_reorder_insert(&self, _serial: u64, _batch: CompressedBlockBatch) {}
fn q6_reorder_try_pop_next(&self) -> Option<CompressedBlockBatch> {
None
}
// No writer attached: the output lock can never be taken in these tests.
fn output_try_lock(
&self,
) -> Option<parking_lot::MutexGuard<'_, Option<Box<dyn Write + Send>>>> {
None
}
fn increment_written(&self) -> u64 {
0
}
fn write_reorder_can_proceed(&self, serial: u64) -> bool {
self.reorder.can_proceed(serial)
}
fn write_reorder_is_memory_high(&self) -> bool {
self.reorder.is_memory_high()
}
}
// Worker-side test double: a real compressor, the held-batch slot used by the
// compress step, and a tiny buffer-recycling pool (capped at two buffers).
struct MockCompressWorker {
compressor: InlineBgzfCompressor,
held_compressed: Option<(u64, CompressedBlockBatch, usize)>,
recycled: Vec<Vec<u8>>,
}
impl MockCompressWorker {
fn new() -> Self {
Self {
// Level 1 keeps compression in tests cheap.
compressor: InlineBgzfCompressor::new(1),
held_compressed: None,
recycled: Vec::new(),
}
}
}
impl HasCompressor for MockCompressWorker {
fn compressor_mut(&mut self) -> &mut InlineBgzfCompressor {
&mut self.compressor
}
}
impl HasHeldCompressed for MockCompressWorker {
fn held_compressed_mut(&mut self) -> &mut Option<(u64, CompressedBlockBatch, usize)> {
&mut self.held_compressed
}
}
impl HasRecycledBuffers for MockCompressWorker {
fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8> {
// Reuse a pooled buffer when available; otherwise allocate fresh.
self.recycled.pop().unwrap_or_else(|| Vec::with_capacity(capacity))
}
fn recycle_buffer(&mut self, buf: Vec<u8>) {
// Cap the pool at two buffers so tests don't accumulate memory.
if self.recycled.len() < 2 {
self.recycled.push(buf);
}
}
}
#[test]
fn test_compress_held_pushes_unconditionally() {
// Reorder buffer at 600/1000: some memory pressure, but Q6 has room.
let state = MockCompressState::new(8, 1000);
state.reorder.add_heap_bytes(600);
let batch_0 =
SerializedBatch { data: vec![0u8; 10], record_count: 1, secondary_data: None };
assert!(state.q5.push((0, batch_0)).is_ok());
// Seed the worker with an out-of-order held batch (serial 5).
let mut worker = MockCompressWorker::new();
let held_batch = CompressedBlockBatch::new();
worker.held_compressed = Some((5, held_batch, 100));
let result = shared_try_step_compress(&state, &mut worker);
assert_eq!(result, StepResult::Success, "should succeed");
// The held batch reaches Q6 first, then the freshly compressed serial 0.
let first = state.q6.pop();
assert!(first.is_some(), "Q6 should have serial 5");
assert_eq!(first.unwrap().0, 5);
let second = state.q6.pop();
assert!(second.is_some(), "Q6 should have serial 0");
assert_eq!(second.unwrap().0, 0);
assert!(worker.held_compressed.is_none(), "held slot should be empty");
}
#[test]
fn test_compress_held_next_seq_advances_normally() {
let state = MockCompressState::new(8, 1000);
state.reorder.add_heap_bytes(600);
// Here the held batch is the in-order serial 0; Q5 carries the later serial 1.
let mut worker = MockCompressWorker::new();
let held_batch = CompressedBlockBatch::new();
worker.held_compressed = Some((0, held_batch, 100));
let batch_1 =
SerializedBatch { data: vec![0u8; 10], record_count: 1, secondary_data: None };
assert!(state.q5.push((1, batch_1)).is_ok());
let result = shared_try_step_compress(&state, &mut worker);
assert!(
result == StepResult::Success || result == StepResult::OutputFull,
"should have advanced held serial 0"
);
let popped = state.q6.pop();
assert!(popped.is_some(), "serial 0 should have been pushed to Q6");
assert_eq!(popped.unwrap().0, 0);
}
#[test]
fn test_compress_no_held_empty_q5_returns_input_empty() {
// Nothing held and nothing queued: the step reports it had no input.
let state = MockCompressState::new(8, 1000);
let mut worker = MockCompressWorker::new();
let result = shared_try_step_compress(&state, &mut worker);
assert_eq!(result, StepResult::InputEmpty);
}
#[test]
fn test_compress_held_blocked_by_full_q6() {
// Q6 has capacity 1 and is pre-filled, so the held batch cannot be pushed.
let state = MockCompressState::new(1, 1000);
let dummy = CompressedBlockBatch::new();
assert!(state.q6.push((99, dummy)).is_ok());
let batch_0 =
SerializedBatch { data: vec![0u8; 10], record_count: 1, secondary_data: None };
assert!(state.q5.push((0, batch_0)).is_ok());
let mut worker = MockCompressWorker::new();
let held_batch = CompressedBlockBatch::new();
worker.held_compressed = Some((5, held_batch, 100));
let result = shared_try_step_compress(&state, &mut worker);
// A full Q6 is back-pressure, not an error: nothing may be dropped.
assert!(!state.has_error(), "should NOT set error");
assert!(worker.held_compressed.is_some(), "held batch should be preserved");
assert_eq!(worker.held_compressed.as_ref().unwrap().0, 5, "serial 5 should still be held");
assert!(!state.q5.is_empty(), "Q5 should not have been popped");
assert_eq!(result, StepResult::OutputFull);
}
#[test]
fn test_compress_memory_high_does_not_block_new_work() {
// Push the reorder buffer past its limit (1200/1000) before compressing.
let state = MockCompressState::new(8, 1000);
state.reorder.add_heap_bytes(1200);
assert!(state.write_reorder_is_memory_high(), "precondition: reorder must be memory-high");
assert_eq!(state.reorder.next_seq.load(Ordering::SeqCst), 0);
let batch_0 =
SerializedBatch { data: vec![0u8; 32], record_count: 1, secondary_data: None };
assert!(state.q5.push((0, batch_0)).is_ok());
let mut worker = MockCompressWorker::new();
assert!(worker.held_compressed.is_none());
let result = shared_try_step_compress(&state, &mut worker);
assert!(!state.has_error(), "should NOT set error");
assert_eq!(
result,
StepResult::Success,
"Compress must proceed past memory-high; Q5 is not serial-ordered so \
blocking here deadlocks Write"
);
assert!(state.q5.is_empty(), "Q5 should have been drained");
let pushed = state.q6.pop();
assert!(pushed.is_some(), "serial 0 should have been compressed into Q6");
assert_eq!(pushed.unwrap().0, 0, "pushed serial should be 0 (the waited-for next_seq)");
}
// Test double for the Q6-push TOCTOU race: its q6_is_full() (see the trait
// impl below) always reports "not full", while q6_push only succeeds while a
// finite budget remains — so a push can fail after a check said it would fit.
struct MockToctouCompressState {
q5: ArrayQueue<(u64, SerializedBatch)>,
q6_items: std::sync::Mutex<Vec<(u64, CompressedBlockBatch)>>,
q6_push_budget: std::sync::atomic::AtomicUsize,
reorder: ReorderBufferState,
error: std::sync::Mutex<Option<io::Error>>,
}
impl MockToctouCompressState {
// `q6_push_budget` is the number of q6_push calls allowed to succeed.
fn new(q5_capacity: usize, q6_push_budget: usize) -> Self {
Self {
q5: ArrayQueue::new(q5_capacity),
q6_items: std::sync::Mutex::new(Vec::new()),
q6_push_budget: std::sync::atomic::AtomicUsize::new(q6_push_budget),
reorder: ReorderBufferState::new(1024),
error: std::sync::Mutex::new(None),
}
}
}
// Same wiring as MockCompressState, except Q6 is a budget-limited Vec whose
// is_full check deliberately disagrees with its push outcome.
impl OutputPipelineState for MockToctouCompressState {
type Processed = Vec<u8>;
fn has_error(&self) -> bool {
self.error.lock().unwrap().is_some()
}
fn set_error(&self, error: io::Error) {
*self.error.lock().unwrap() = Some(error);
}
fn q5_pop(&self) -> Option<(u64, SerializedBatch)> {
self.q5.pop()
}
fn q5_push(&self, item: (u64, SerializedBatch)) -> Result<(), (u64, SerializedBatch)> {
self.q5.push(item)
}
fn q5_is_full(&self) -> bool {
self.q5.is_full()
}
fn q6_pop(&self) -> Option<(u64, CompressedBlockBatch)> {
self.q6_items.lock().unwrap().pop()
}
fn q6_push(
&self,
item: (u64, CompressedBlockBatch),
) -> Result<(), (u64, CompressedBlockBatch)> {
// Atomically consume one unit of budget; once it hits zero, pushes fail
// even though q6_is_full() still claims there is room (the TOCTOU window).
let decremented =
self.q6_push_budget.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
if x > 0 { Some(x - 1) } else { None }
});
if decremented.is_ok() {
self.q6_items.lock().unwrap().push(item);
Ok(())
} else {
Err(item)
}
}
fn q6_is_full(&self) -> bool {
// Deliberately optimistic: always claims room so the step attempts the push.
false
}
fn q6_reorder_insert(&self, _serial: u64, _batch: CompressedBlockBatch) {}
fn q6_reorder_try_pop_next(&self) -> Option<CompressedBlockBatch> {
None
}
// No writer attached in this mock either.
fn output_try_lock(
&self,
) -> Option<parking_lot::MutexGuard<'_, Option<Box<dyn Write + Send>>>> {
None
}
fn increment_written(&self) -> u64 {
0
}
fn write_reorder_can_proceed(&self, serial: u64) -> bool {
self.reorder.can_proceed(serial)
}
fn write_reorder_is_memory_high(&self) -> bool {
self.reorder.is_memory_high()
}
}
#[test]
fn test_compress_advanced_held_survives_p5_toctou_failure() {
// Budget of 1: the held batch's Q6 push (P1) succeeds, but the later push
// of the freshly compressed batch (P5) fails even though q6_is_full()
// reported room — the TOCTOU window this test pins down.
let state = MockToctouCompressState::new(8, 1);
let batch_0 =
SerializedBatch { data: vec![0u8; 32], record_count: 1, secondary_data: None };
assert!(state.q5.push((0, batch_0)).is_ok());
let mut worker = MockCompressWorker::new();
let held_batch = CompressedBlockBatch::new();
worker.held_compressed = Some((5, held_batch, 100));
let result = shared_try_step_compress(&state, &mut worker);
assert!(!state.has_error(), "should NOT set error");
let pushes = state.q6_items.lock().unwrap();
assert_eq!(pushes.len(), 1, "P1 should have pushed exactly one item into Q6");
assert_eq!(pushes[0].0, 5, "P1 should have pushed the held serial 5");
// Release the Q6 lock before inspecting the rest of the state.
drop(pushes);
assert!(state.q5.is_empty(), "Q5 should have been drained at P3");
// The fresh batch whose push failed must be parked in the held slot, not lost.
let held = worker
.held_compressed
.as_ref()
.expect("fresh batch must be held after P5 TOCTOU failure");
assert_eq!(held.0, 0, "the held fresh batch should be serial 0");
assert_eq!(
result,
StepResult::Success,
"P1's successful held-batch push must survive a P5 TOCTOU failure"
);
}
}