use crossbeam_queue::ArrayQueue;
use noodles::bam;
use noodles::sam::{Header, alignment::RecordBuf};
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant};
use crate::bam_io::is_stdout_path;
use crate::bgzf_reader::{BGZF_EOF, decompress_block_into, read_raw_blocks};
use crate::bgzf_writer::InlineBgzfCompressor;
use crate::progress::ProgressTracker;
use crate::reorder_buffer::ReorderBuffer;
use crate::sam::SamTag;
use noodles::sam::alignment::record::data::field::Tag;
use super::base::{
ActiveSteps, BatchWeight, CompressedBlockBatch, DecodedRecord, DecompressedBatch,
GroupKeyConfig, HasCompressor, HasHeldBoundaries, HasHeldCompressed, HasHeldProcessed,
HasHeldSerialized, HasRecycledBuffers, HasWorkerCore, MemoryEstimate, MonitorableState,
OutputPipelineQueues, OutputPipelineState, PROGRESS_LOG_INTERVAL, PipelineConfig,
PipelineLifecycle, PipelineStats, PipelineStep, PipelineValidationError, ProcessPipelineState,
QueueSample, RawBlockBatch, ReorderBufferState, SerializePipelineState, SerializedBatch,
StepContext, WorkerCoreState, WorkerStateCommon, WritePipelineState, finalize_pipeline,
generic_worker_loop, handle_worker_panic, join_monitor_thread, join_worker_threads,
shared_try_step_compress,
};
use super::deadlock::{
DeadlockAction, DeadlockConfig, DeadlockState, QueueSnapshot, check_deadlock_and_restore,
};
use super::scheduler::{BackpressureState, SchedulerStrategy};
use crate::read_info::{LibraryIndex, compute_group_key};
use crate::sort::bam_fields;
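// Stage topology (queue names match the fields and counters below):
//   q1  raw BGZF blocks        -> Decompress
//   q2  decompressed bytes     -> FindBoundaries (ordered via q2_reorder)
//   q2b record-aligned batches -> Decode
//   q3  decoded records        -> Group (ordered via q3_reorder)
//   q4  groups                 -> Process
//   q5  processed items        -> Serialize
//   q6  serialized batches     -> Compress
//   q7  compressed blocks      -> Write (ordered via write_reorder)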
const IO_BUFFER_SIZE: usize = 8 * 1024 * 1024;
pub const DEFAULT_TARGET_TEMPLATES_PER_BATCH: usize = 500;
#[derive(Debug, Clone)]
pub struct BoundaryBatch {
pub buffer: Vec<u8>,
pub offsets: Vec<usize>,
}
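/// Incremental scanner that splits decompressed BAM bytes into record-aligned
/// batches. Bytes that arrive mid-record are kept in `leftover` and prepended
/// on the next call, so records may span BGZF block (and batch) boundaries.
/// A minimal sketch of the contract (illustrative values, not real BAM
/// records):
///
/// ```ignore
/// let mut state = BoundaryState::new_no_header();
/// // A 6-byte record (4-byte block_size of 2, then 2 payload bytes),
/// // fed in two pieces:
/// let batch = state.find_boundaries(&[2, 0, 0, 0, 0xAA])?;
/// assert_eq!(batch.offsets, vec![0]); // incomplete: held as leftover
/// let batch = state.find_boundaries(&[0xBB])?;
/// assert_eq!(batch.offsets, vec![0, 6]); // one complete record
/// ```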
pub struct BoundaryState {
leftover: Vec<u8>,
work_buffer: Vec<u8>,
header_skipped: bool,
}
impl BoundaryState {
#[must_use]
pub fn new() -> Self {
Self { leftover: Vec::new(), work_buffer: Vec::new(), header_skipped: false }
}
#[must_use]
pub fn new_no_header() -> Self {
Self { leftover: Vec::new(), work_buffer: Vec::new(), header_skipped: true }
}
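    /// Returns the total byte length of the BAM header (magic, `l_text` and
    /// header text, `n_ref` and the reference dictionary), `Some(0)` if the
    /// data does not start with the BAM magic, or `None` if more bytes are
    /// needed to determine the size.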
fn parse_header_size(data: &[u8]) -> Option<usize> {
if data.len() < 8 {
return None;
}
if &data[0..4] != bam_fields::BAM_MAGIC {
return Some(0);
}
let l_text = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
let mut offset = 8 + l_text;
if data.len() < offset + 4 {
return None;
}
let n_ref = u32::from_le_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]) as usize;
offset += 4;
for _ in 0..n_ref {
if data.len() < offset + 4 {
return None;
}
let l_name = u32::from_le_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]) as usize;
offset += 4 + l_name + 4;
if data.len() < offset {
return None;
}
}
Some(offset)
}
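    /// Appends `decompressed` to any leftover bytes from the previous call
    /// and returns every complete record found. `offsets` holds one entry per
    /// record start plus a trailing end offset, so `offsets.len() - 1` is the
    /// record count; record `i` occupies `buffer[offsets[i]..offsets[i + 1]]`,
    /// beginning with its 4-byte little-endian `block_size`.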
pub fn find_boundaries(&mut self, decompressed: &[u8]) -> io::Result<BoundaryBatch> {
self.work_buffer.clear();
if !self.leftover.is_empty() {
self.work_buffer.append(&mut self.leftover);
}
self.work_buffer.extend_from_slice(decompressed);
let mut cursor = 0usize;
if !self.header_skipped {
if let Some(header_size) = Self::parse_header_size(&self.work_buffer) {
cursor = header_size;
self.header_skipped = true;
} else {
std::mem::swap(&mut self.leftover, &mut self.work_buffer);
return Ok(BoundaryBatch { buffer: Vec::new(), offsets: vec![0] });
}
}
let start_cursor = cursor;
let mut offsets = vec![0usize];
while cursor + 4 <= self.work_buffer.len() {
let block_size = u32::from_le_bytes([
self.work_buffer[cursor],
self.work_buffer[cursor + 1],
self.work_buffer[cursor + 2],
self.work_buffer[cursor + 3],
]) as usize;
let record_end = cursor + 4 + block_size;
            if record_end > self.work_buffer.len() {
                break;
            }
cursor = record_end;
offsets.push(cursor - start_cursor);
}
self.leftover.clear();
self.leftover.extend_from_slice(&self.work_buffer[cursor..]);
let buffer = self.work_buffer[start_cursor..cursor].to_vec();
#[cfg(debug_assertions)]
for i in 0..offsets.len().saturating_sub(1) {
let start = offsets[i];
let end = offsets[i + 1];
if end > start + 4 {
let stored = u32::from_le_bytes([
buffer[start],
buffer[start + 1],
buffer[start + 2],
buffer[start + 3],
]) as usize;
let expected = end - start - 4;
debug_assert_eq!(
stored, expected,
"find_boundaries: block_size mismatch at record {i}: stored={stored}, expected={expected}"
);
}
}
Ok(BoundaryBatch { buffer, offsets })
}
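    /// Flushes the remaining buffered bytes at end of input. Returns
    /// `Ok(None)` when nothing is buffered, an `UnexpectedEof` error if the
    /// stream ends inside a record, and otherwise a final batch covering the
    /// buffered records.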
pub fn finish(&mut self) -> io::Result<Option<BoundaryBatch>> {
if self.leftover.is_empty() {
return Ok(None);
}
let mut offsets = vec![0usize];
let mut cursor = 0usize;
while cursor + 4 <= self.leftover.len() {
let block_size = u32::from_le_bytes([
self.leftover[cursor],
self.leftover[cursor + 1],
self.leftover[cursor + 2],
self.leftover[cursor + 3],
]) as usize;
let record_end = cursor + 4 + block_size;
if record_end > self.leftover.len() {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"Incomplete BAM record at EOF: need {} bytes, have {}",
record_end - cursor,
self.leftover.len() - cursor
),
));
}
cursor = record_end;
offsets.push(cursor);
}
if cursor == 0 {
return Ok(None);
}
Ok(Some(BoundaryBatch { buffer: std::mem::take(&mut self.leftover), offsets }))
}
}
impl Default for BoundaryState {
fn default() -> Self {
Self::new()
}
}
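/// Decodes every record in a boundary-aligned batch and attaches its group
/// key. In `raw_byte_mode` the key is derived directly from the raw BAM bytes
/// without building a `RecordBuf`; otherwise each record is parsed with
/// noodles and keyed via `compute_group_key`.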
pub fn decode_records(
batch: &BoundaryBatch,
group_key_config: &GroupKeyConfig,
) -> io::Result<Vec<DecodedRecord>> {
let num_records = batch.offsets.len().saturating_sub(1);
let mut records = Vec::with_capacity(num_records);
    let header = Header::default();
for i in 0..num_records {
let start = batch.offsets[i];
let end = batch.offsets[i + 1];
if end <= start + 4 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Invalid record bounds: start={start}, end={end}, record_index={i}, \
num_records={num_records}, buffer_len={}",
batch.buffer.len()
),
));
}
let stored_block_size = u32::from_le_bytes([
batch.buffer[start],
batch.buffer[start + 1],
batch.buffer[start + 2],
batch.buffer[start + 3],
]) as usize;
let expected_block_size = end - start - 4;
if stored_block_size != expected_block_size {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Block size mismatch: stored={stored_block_size}, expected={expected_block_size}, \
record_index={i}, start={start}, end={end}, buffer_len={}",
batch.buffer.len()
),
));
}
let record_data = &batch.buffer[start + 4..end];
if group_key_config.raw_byte_mode {
let raw = record_data.to_vec();
if raw.len() < 32 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("BAM record too short: len={}", raw.len()),
));
}
let l_rn = raw[8] as usize;
if raw.len() < 32 + l_rn {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"BAM record truncated: len={}, l_read_name={l_rn} (need >= {})",
raw.len(),
32 + l_rn
),
));
}
let key = compute_group_key_from_raw(
&raw,
&group_key_config.library_index,
group_key_config.cell_tag,
);
records.push(DecodedRecord::from_raw_bytes(raw, key));
} else {
            let mut reader = bam::io::Reader::from(&batch.buffer[start..end]);
let mut record = RecordBuf::default();
reader.read_record_buf(&header, &mut record)?;
let key = compute_group_key(
&record,
&group_key_config.library_index,
group_key_config.cell_tag,
);
records.push(DecodedRecord::new(record, key));
}
}
Ok(records)
}
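/// Raw-byte counterpart of `compute_group_key`: derives the grouping key
/// (unclipped positions, strands, library, cell barcode, name hash) straight
/// from a record's bytes. Secondary and supplementary records get a
/// name-hash-only key; paired records fall back to a single-end key when the
/// mate's unclipped position cannot be derived (mate unmapped or no `MC` tag).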
fn compute_group_key_from_raw(
raw: &[u8],
library_index: &LibraryIndex,
    cell_tag: Option<Tag>,
) -> super::base::GroupKey {
use super::base::GroupKey;
let name = bam_fields::read_name(raw);
let name_hash = if name.is_empty() {
LibraryIndex::hash_name(None)
} else {
LibraryIndex::hash_name(Some(name))
};
let flg = bam_fields::flags(raw);
let is_secondary = (flg & bam_fields::flags::SECONDARY) != 0;
let is_supplementary = (flg & bam_fields::flags::SUPPLEMENTARY) != 0;
if is_secondary || is_supplementary {
return GroupKey { name_hash, ..GroupKey::default() };
}
let reverse = (flg & bam_fields::flags::REVERSE) != 0;
let own_pos = bam_fields::unclipped_5prime_from_raw_bam(raw);
let own_ref_id = bam_fields::ref_id(raw);
let strand = u8::from(reverse);
let aux_data = bam_fields::aux_data_slice(raw);
let cell_tag_bytes = cell_tag.map_or([0u8; 2], |t| [t.as_ref()[0], t.as_ref()[1]]);
let aux_tags = bam_fields::extract_aux_string_tags(aux_data, &cell_tag_bytes);
let library_idx = if let Some(rg) = aux_tags.rg {
let rg_hash = LibraryIndex::hash_rg(rg);
library_index.get(rg_hash)
} else {
0
};
let cell_hash =
if let Some(cb) = aux_tags.cell { LibraryIndex::hash_cell_barcode(Some(cb)) } else { 0 };
let is_paired = (flg & bam_fields::flags::PAIRED) != 0;
if !is_paired {
return GroupKey::single(own_ref_id, own_pos, strand, library_idx, cell_hash, name_hash);
}
let mate_unmapped = (flg & bam_fields::flags::MATE_UNMAPPED) != 0;
let mate_reverse = (flg & bam_fields::flags::MATE_REVERSE) != 0;
let mate_strand = u8::from(mate_reverse);
let raw_mate_ref_id = bam_fields::mate_ref_id(raw);
let raw_mate_pos = bam_fields::mate_pos(raw);
let mate_pos_result = if mate_unmapped {
None
} else {
aux_tags
.mc
.map(|mc| bam_fields::mate_unclipped_5prime_1based(raw_mate_pos, mate_reverse, mc))
};
match mate_pos_result {
Some(mp) => GroupKey::paired(
own_ref_id,
own_pos,
strand,
raw_mate_ref_id,
mp,
mate_strand,
library_idx,
cell_hash,
name_hash,
),
None => {
GroupKey::single(own_ref_id, own_pos, strand, library_idx, cell_hash, name_hash)
}
}
}
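/// Shared state for the multi-threaded BAM pipeline. Stages communicate
/// through bounded `ArrayQueue`s; stages that must consume their input in
/// read order (FindBoundaries, Group, Write) pull through a serial-numbered
/// `ReorderBuffer` first. `G` is the grouper's group type and `P` the
/// processed output type.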
pub struct BamPipelineState<G, P: MemoryEstimate> {
pub config: PipelineConfig,
pub input_file: Mutex<Option<Box<dyn Read + Send>>>,
pub read_done: AtomicBool,
pub next_read_serial: AtomicU64,
pub q1_raw_blocks: ArrayQueue<(u64, RawBlockBatch)>,
#[cfg(feature = "memory-debug")]
pub q1_heap_bytes: AtomicU64,
pub decompress_done: AtomicBool,
pub batches_decompressed: AtomicU64,
pub q2_decompressed: ArrayQueue<(u64, DecompressedBatch)>,
pub q2_reorder: Mutex<ReorderBuffer<DecompressedBatch>>,
pub q2_reorder_state: ReorderBufferState,
pub boundary_state: Mutex<BoundaryState>,
pub boundary_done: AtomicBool,
pub next_boundary_serial: AtomicU64,
pub batches_boundary_found: AtomicU64,
pub batches_boundary_processed: AtomicU64,
pub q2b_boundaries: ArrayQueue<(u64, BoundaryBatch)>,
pub decode_done: AtomicBool,
pub batches_decoded: AtomicU64,
pub group_key_config: GroupKeyConfig,
pub q3_decoded: ArrayQueue<(u64, Vec<DecodedRecord>)>,
pub q3_reorder: Mutex<ReorderBuffer<Vec<DecodedRecord>>>,
pub q3_reorder_state: ReorderBufferState,
pub q3_reorder_can_pop: AtomicBool,
pub group_done: AtomicBool,
pub next_group_serial: AtomicU64,
pub batches_grouped: AtomicU64,
pub output: OutputPipelineQueues<G, P>,
pub deadlock_state: DeadlockState,
}
impl<G: Send, P: Send + MemoryEstimate> BamPipelineState<G, P> {
#[must_use]
pub fn new(
config: PipelineConfig,
input: Box<dyn Read + Send>,
output: Box<dyn Write + Send>,
group_key_config: GroupKeyConfig,
) -> Self {
let cap = config.queue_capacity;
let memory_limit = config.queue_memory_limit;
let stats = if config.collect_stats {
config.shared_stats.clone().or_else(|| Some(Arc::new(PipelineStats::new())))
} else {
None
};
let boundary_state = if config.header_already_read {
BoundaryState::new_no_header()
} else {
BoundaryState::new()
};
let deadlock_config =
DeadlockConfig::new(config.deadlock_timeout_secs, config.deadlock_recover_enabled);
let deadlock_state = DeadlockState::new(&deadlock_config, memory_limit);
Self {
config,
input_file: Mutex::new(Some(input)),
read_done: AtomicBool::new(false),
next_read_serial: AtomicU64::new(0),
q1_raw_blocks: ArrayQueue::new(cap),
#[cfg(feature = "memory-debug")]
q1_heap_bytes: AtomicU64::new(0),
decompress_done: AtomicBool::new(false),
batches_decompressed: AtomicU64::new(0),
q2_decompressed: ArrayQueue::new(cap),
q2_reorder: Mutex::new(ReorderBuffer::new()),
q2_reorder_state: ReorderBufferState::new(memory_limit),
boundary_state: Mutex::new(boundary_state),
boundary_done: AtomicBool::new(false),
next_boundary_serial: AtomicU64::new(0),
batches_boundary_found: AtomicU64::new(0),
batches_boundary_processed: AtomicU64::new(0),
q2b_boundaries: ArrayQueue::new(cap),
decode_done: AtomicBool::new(false),
batches_decoded: AtomicU64::new(0),
group_key_config,
q3_decoded: ArrayQueue::new(cap),
q3_reorder: Mutex::new(ReorderBuffer::new()),
q3_reorder_state: ReorderBufferState::new(memory_limit),
q3_reorder_can_pop: AtomicBool::new(false),
group_done: AtomicBool::new(false),
next_group_serial: AtomicU64::new(0),
batches_grouped: AtomicU64::new(0),
output: OutputPipelineQueues::new(
cap,
output,
stats,
"Processed records",
memory_limit,
),
deadlock_state,
}
}
pub fn set_error(&self, error: io::Error) {
self.output.set_error(error);
}
#[must_use]
pub fn has_error(&self) -> bool {
self.output.has_error()
}
pub fn take_error(&self) -> Option<io::Error> {
self.output.take_error()
}
#[must_use]
pub fn is_complete(&self) -> bool {
if !self.read_done.load(Ordering::Acquire) || !self.group_done.load(Ordering::Acquire) {
return false;
}
if !self.q1_raw_blocks.is_empty()
|| !self.q2_decompressed.is_empty()
|| !self.q2b_boundaries.is_empty()
|| !self.q3_decoded.is_empty()
{
return false;
}
let q2_empty = self.q2_reorder.lock().is_empty();
let q3_empty = self.q3_reorder.lock().is_empty();
if !q2_empty || !q3_empty {
return false;
}
self.output.are_queues_empty()
}
#[must_use]
pub fn queue_depths(&self) -> QueueDepths {
let output_depths = self.output.queue_depths();
QueueDepths {
q1: self.q1_raw_blocks.len(),
q2: self.q2_decompressed.len(),
q2b: self.q2b_boundaries.len(),
q3: self.q3_decoded.len(),
q4: output_depths.groups,
q5: output_depths.processed,
q6: output_depths.serialized,
q7: output_depths.compressed,
}
}
#[must_use]
pub fn can_decompress_proceed(&self, serial: u64) -> bool {
self.q2_reorder_state.can_proceed(serial)
}
#[must_use]
pub fn can_decode_proceed(&self, serial: u64) -> bool {
self.q3_reorder_state.can_proceed(serial)
}
#[must_use]
pub fn is_memory_high(&self) -> bool {
self.q3_reorder_state.is_memory_high()
}
#[must_use]
pub fn is_memory_drained(&self) -> bool {
self.q3_reorder_state.is_memory_drained()
}
#[must_use]
pub fn is_q5_memory_high(&self) -> bool {
self.output.is_processed_memory_high()
}
#[must_use]
pub fn is_draining(&self) -> bool {
self.output.is_draining()
}
#[must_use]
pub fn stats(&self) -> Option<&PipelineStats> {
self.output.stats.as_deref()
}
#[must_use]
pub fn progress(&self) -> &ProgressTracker {
&self.output.progress
}
#[must_use]
pub fn items_written(&self) -> u64 {
self.output.items_written.load(Ordering::Relaxed)
}
pub fn set_draining(&self, value: bool) {
self.output.set_draining(value);
}
pub fn flush_output(&self) -> io::Result<()> {
if let Some(mut writer) = self.output.output.lock().take() {
writer.flush()?;
writer.write_all(&BGZF_EOF)?;
writer.flush()?;
}
Ok(())
}
pub fn validate_completion(&self) -> Result<(), PipelineValidationError> {
let mut non_empty_queues = Vec::new();
let mut counter_mismatches = Vec::new();
if !self.q1_raw_blocks.is_empty() {
non_empty_queues.push(format!("q1_raw_blocks ({})", self.q1_raw_blocks.len()));
}
if !self.q2_decompressed.is_empty() {
non_empty_queues.push(format!("q2_decompressed ({})", self.q2_decompressed.len()));
}
if !self.q2b_boundaries.is_empty() {
non_empty_queues.push(format!("q2b_boundaries ({})", self.q2b_boundaries.len()));
}
if !self.q3_decoded.is_empty() {
non_empty_queues.push(format!("q3_decoded ({})", self.q3_decoded.len()));
}
if !self.output.groups.is_empty() {
non_empty_queues.push(format!("q4_groups ({})", self.output.groups.len()));
}
if !self.output.processed.is_empty() {
non_empty_queues.push(format!("q5_processed ({})", self.output.processed.len()));
}
if !self.output.serialized.is_empty() {
non_empty_queues.push(format!("q6_serialized ({})", self.output.serialized.len()));
}
if !self.output.compressed.is_empty() {
non_empty_queues.push(format!("q7_compressed ({})", self.output.compressed.len()));
}
{
let q2_reorder = self.q2_reorder.lock();
if !q2_reorder.is_empty() {
non_empty_queues.push(format!("q2_reorder ({})", q2_reorder.len()));
}
}
{
let q3_reorder = self.q3_reorder.lock();
if !q3_reorder.is_empty() {
non_empty_queues.push(format!("q3_reorder ({})", q3_reorder.len()));
}
}
{
let write_reorder = self.output.write_reorder.lock();
if !write_reorder.is_empty() {
non_empty_queues.push(format!("write_reorder ({})", write_reorder.len()));
}
}
let total_read = self.next_read_serial.load(Ordering::Acquire);
let batches_decompressed = self.batches_decompressed.load(Ordering::Acquire);
let batches_boundary_processed = self.batches_boundary_processed.load(Ordering::Acquire);
let batches_boundary_found = self.batches_boundary_found.load(Ordering::Acquire);
let batches_decoded = self.batches_decoded.load(Ordering::Acquire);
let batches_grouped = self.batches_grouped.load(Ordering::Acquire);
if batches_decompressed != total_read {
counter_mismatches.push(format!(
"batches_decompressed ({batches_decompressed}) != total_read ({total_read})"
));
}
if batches_boundary_processed != total_read {
counter_mismatches.push(format!(
"batches_boundary_processed ({batches_boundary_processed}) != total_read ({total_read})"
));
}
if batches_decoded != batches_boundary_found {
counter_mismatches.push(format!(
"batches_decoded ({batches_decoded}) != batches_boundary_found ({batches_boundary_found})"
));
}
if batches_grouped != batches_boundary_found {
counter_mismatches.push(format!(
"batches_grouped ({batches_grouped}) != batches_boundary_found ({batches_boundary_found})"
));
}
let leaked_heap_bytes = 0u64;
if !non_empty_queues.is_empty() || !counter_mismatches.is_empty() {
return Err(PipelineValidationError {
non_empty_queues,
counter_mismatches,
leaked_heap_bytes,
});
}
Ok(())
}
}
impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> PipelineLifecycle
for BamPipelineState<G, P>
{
fn is_complete(&self) -> bool {
BamPipelineState::is_complete(self)
}
fn has_error(&self) -> bool {
BamPipelineState::has_error(self)
}
fn take_error(&self) -> Option<io::Error> {
BamPipelineState::take_error(self)
}
fn set_error(&self, error: io::Error) {
BamPipelineState::set_error(self, error);
}
fn is_draining(&self) -> bool {
BamPipelineState::is_draining(self)
}
fn set_draining(&self, value: bool) {
BamPipelineState::set_draining(self, value);
}
fn stats(&self) -> Option<&PipelineStats> {
BamPipelineState::stats(self)
}
fn progress(&self) -> &ProgressTracker {
BamPipelineState::progress(self)
}
fn items_written(&self) -> u64 {
BamPipelineState::items_written(self)
}
fn flush_output(&self) -> io::Result<()> {
BamPipelineState::flush_output(self)
}
fn validate_completion(&self) -> Result<(), PipelineValidationError> {
BamPipelineState::validate_completion(self)
}
}
impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> MonitorableState
for BamPipelineState<G, P>
{
fn deadlock_state(&self) -> &DeadlockState {
&self.deadlock_state
}
fn build_queue_snapshot(&self) -> QueueSnapshot {
let q2_reorder_mem = {
let reorder = self.q2_reorder.lock();
reorder.total_heap_size() as u64
};
let q3_reorder_mem = {
let reorder = self.q3_reorder.lock();
reorder.total_heap_size() as u64
};
QueueSnapshot {
q1_len: self.q1_raw_blocks.len(),
q2_len: self.q2_decompressed.len(),
q2b_len: self.q2b_boundaries.len(),
q3_len: self.q3_decoded.len(),
q4_len: self.output.groups.len(),
q5_len: self.output.processed.len(),
q6_len: self.output.serialized.len(),
q7_len: self.output.compressed.len(),
q2_reorder_mem,
q3_reorder_mem,
memory_limit: self.deadlock_state.get_memory_limit(),
read_done: self.read_done.load(Ordering::Relaxed),
group_done: self.group_done.load(Ordering::Relaxed),
draining: self.output.draining.load(Ordering::Relaxed),
extra_state: None,
}
}
}
impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> OutputPipelineState
for BamPipelineState<G, P>
{
type Processed = P;
fn has_error(&self) -> bool {
self.output.has_error()
}
fn set_error(&self, error: io::Error) {
self.output.set_error(error);
}
fn q5_pop(&self) -> Option<(u64, SerializedBatch)> {
self.output.serialized.pop()
}
fn q5_push(&self, item: (u64, SerializedBatch)) -> Result<(), (u64, SerializedBatch)> {
self.output.serialized.push(item)
}
fn q5_is_full(&self) -> bool {
self.output.serialized.is_full()
}
fn q5_track_pop(&self, heap_size: u64) {
self.output.serialized_heap_bytes.fetch_sub(heap_size, Ordering::AcqRel);
}
fn q6_pop(&self) -> Option<(u64, CompressedBlockBatch)> {
self.output.compressed.pop()
}
fn q6_push(
&self,
item: (u64, CompressedBlockBatch),
) -> Result<(), (u64, CompressedBlockBatch)> {
let heap_size = item.1.estimate_heap_size();
let result = self.output.compressed.push(item);
if result.is_ok() {
self.output.compressed_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
}
result
}
fn q6_is_full(&self) -> bool {
self.output.compressed.is_full()
}
fn q6_track_pop(&self, heap_size: u64) {
self.output.compressed_heap_bytes.fetch_sub(heap_size, Ordering::AcqRel);
}
fn q6_reorder_insert(&self, serial: u64, batch: CompressedBlockBatch) {
self.output.write_reorder.lock().insert(serial, batch);
}
fn q6_reorder_try_pop_next(&self) -> Option<CompressedBlockBatch> {
self.output.write_reorder.lock().try_pop_next()
}
fn output_try_lock(
&self,
) -> Option<parking_lot::MutexGuard<'_, Option<Box<dyn Write + Send>>>> {
self.output.output.try_lock()
}
fn increment_written(&self) -> u64 {
self.output.items_written.fetch_add(1, Ordering::Release)
}
fn record_compressed_bytes_out(&self, bytes: u64) {
if let Some(ref stats) = self.output.stats {
stats.compressed_bytes_out.fetch_add(bytes, Ordering::Relaxed);
}
}
fn record_q6_pop_progress(&self) {
self.deadlock_state.record_q6_pop();
}
fn record_q7_push_progress(&self) {
self.deadlock_state.record_q7_push();
}
fn write_reorder_can_proceed(&self, serial: u64) -> bool {
self.output.write_reorder_state.can_proceed(serial)
}
fn write_reorder_is_memory_high(&self) -> bool {
self.output.write_reorder_state.is_memory_high()
}
fn stats(&self) -> Option<&PipelineStats> {
self.output.stats.as_deref()
}
}
impl<G: Send + MemoryEstimate + 'static, P: Send + MemoryEstimate + 'static>
ProcessPipelineState<G, P> for BamPipelineState<G, P>
{
fn process_input_pop(&self) -> Option<(u64, Vec<G>)> {
let result = self.output.groups.pop();
if result.is_some() {
self.deadlock_state.record_q4_pop();
}
result
}
fn process_output_is_full(&self) -> bool {
self.output.processed.is_full()
}
fn process_output_push(&self, item: (u64, Vec<P>)) -> Result<(), (u64, Vec<P>)> {
let heap_size: usize = item.1.iter().map(MemoryEstimate::estimate_heap_size).sum();
let result = self.output.processed.push(item);
if result.is_ok() {
self.output.processed_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
self.deadlock_state.record_q5_push();
}
result
}
fn has_error(&self) -> bool {
self.output.has_error()
}
fn set_error(&self, error: io::Error) {
self.output.set_error(error);
}
fn should_apply_process_backpressure(&self) -> bool {
self.output.should_apply_process_backpressure()
}
fn is_draining(&self) -> bool {
self.output.is_draining()
}
}
impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> SerializePipelineState<P>
for BamPipelineState<G, P>
{
fn serialize_input_pop(&self) -> Option<(u64, Vec<P>)> {
let result = self.output.processed.pop();
if let Some((_, ref batch)) = result {
let heap_size: usize = batch.iter().map(MemoryEstimate::estimate_heap_size).sum();
self.output.processed_heap_bytes.fetch_sub(heap_size as u64, Ordering::AcqRel);
self.deadlock_state.record_q5_pop();
}
result
}
fn serialize_output_is_full(&self) -> bool {
self.output.serialized.is_full()
}
fn serialize_output_push(
&self,
item: (u64, SerializedBatch),
) -> Result<(), (u64, SerializedBatch)> {
let heap_size = item.1.estimate_heap_size();
let result = self.output.serialized.push(item);
if result.is_ok() {
self.output.serialized_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
self.deadlock_state.record_q6_push();
}
result
}
fn has_error(&self) -> bool {
self.output.has_error()
}
fn set_error(&self, error: io::Error) {
self.output.set_error(error);
}
fn record_serialized_bytes(&self, bytes: u64) {
if let Some(ref stats) = self.output.stats {
stats.serialized_bytes.fetch_add(bytes, Ordering::Relaxed);
}
}
}
impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> WritePipelineState
for BamPipelineState<G, P>
{
fn write_input_queue(&self) -> &ArrayQueue<(u64, CompressedBlockBatch)> {
&self.output.compressed
}
fn write_reorder_buffer(&self) -> &Mutex<ReorderBuffer<CompressedBlockBatch>> {
&self.output.write_reorder
}
fn write_reorder_state(&self) -> &super::base::ReorderBufferState {
&self.output.write_reorder_state
}
fn write_output(&self) -> &Mutex<Option<Box<dyn Write + Send>>> {
&self.output.output
}
fn has_error(&self) -> bool {
self.output.has_error()
}
fn set_error(&self, error: io::Error) {
self.output.set_error(error);
}
fn record_written(&self, count: u64) {
self.output.items_written.fetch_add(count, Ordering::Release);
}
fn stats(&self) -> Option<&PipelineStats> {
self.output.stats.as_deref()
}
}
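/// Point-in-time snapshot of every inter-stage queue length, used to decide
/// which steps currently have input available.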
#[derive(Debug, Clone, Copy)]
pub struct QueueDepths {
pub q1: usize,
pub q2: usize,
pub q2b: usize,
pub q3: usize,
pub q4: usize,
pub q5: usize,
pub q6: usize,
pub q7: usize,
}
impl QueueDepths {
#[inline]
#[must_use]
pub fn has_input_for_step(&self, step: PipelineStep) -> bool {
match step {
            PipelineStep::Read => true,
            PipelineStep::Decompress => self.q1 > 0,
PipelineStep::FindBoundaries => self.q2 > 0,
PipelineStep::Decode => self.q2b > 0,
PipelineStep::Group => self.q3 > 0,
PipelineStep::Process => self.q4 > 0,
PipelineStep::Serialize => self.q5 > 0,
PipelineStep::Compress => self.q6 > 0,
PipelineStep::Write => self.q7 > 0,
}
}
}
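/// Strategy object for the Group stage: accumulates decoded records and emits
/// completed groups. `add_records` may emit zero or more groups per call;
/// `finish` flushes whatever is still buffered once all input has been seen.
///
/// A minimal pass-through grouper that emits each incoming batch as one group
/// (hypothetical, for illustration only):
///
/// ```ignore
/// struct PassThroughGrouper;
///
/// impl Grouper for PassThroughGrouper {
///     type Group = Vec<DecodedRecord>;
///
///     fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
///         Ok(vec![records]) // one group per batch, nothing buffered
///     }
///
///     fn finish(&mut self) -> io::Result<Option<Self::Group>> {
///         Ok(None) // never buffers, so never has anything to flush
///     }
///
///     fn has_pending(&self) -> bool {
///         false
///     }
/// }
/// ```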
pub trait Grouper: Send {
type Group: Send;
fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>>;
fn finish(&mut self) -> io::Result<Option<Self::Group>>;
fn has_pending(&self) -> bool;
}
pub struct GroupState<G: Send> {
pub grouper: Box<dyn Grouper<Group = G> + Send>,
finished: bool,
pending_groups: VecDeque<G>,
pending_weight: usize,
}
impl<G: Send> GroupState<G> {
#[must_use]
pub fn new(grouper: Box<dyn Grouper<Group = G> + Send>) -> Self {
Self { grouper, finished: false, pending_groups: VecDeque::new(), pending_weight: 0 }
}
#[must_use]
pub fn has_pending_output(&self) -> bool {
!self.pending_groups.is_empty()
}
pub fn process(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<G>> {
self.grouper.add_records(records)
}
pub fn finish(&mut self) -> io::Result<Option<G>> {
if self.finished {
return Ok(None);
}
self.finished = true;
self.grouper.finish()
}
#[must_use]
pub fn is_finished(&self) -> bool {
self.finished
}
#[must_use]
pub fn has_pending(&self) -> bool {
self.grouper.has_pending()
}
}
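/// The user-supplied closures that define the Process and Serialize stages.
/// `process_fn` turns one group into a processed item; `serialize_fn` appends
/// the item's output bytes to the provided buffer and returns the number of
/// records written; `secondary_serialize_fn`, when present, is called before
/// the primary serializer and its bytes are routed to the secondary output.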
#[allow(clippy::type_complexity)]
pub struct PipelineFunctions<G: Send, P: Send> {
pub process_fn: Box<dyn Fn(G) -> io::Result<P> + Send + Sync>,
pub serialize_fn: Box<dyn Fn(P, &mut Vec<u8>) -> io::Result<u64> + Send + Sync>,
pub secondary_serialize_fn:
Option<Box<dyn Fn(&P, &mut Vec<u8>) -> io::Result<u64> + Send + Sync>>,
}
impl<G: Send, P: Send> PipelineFunctions<G, P> {
pub fn new<ProcessFn, SerializeFn>(process_fn: ProcessFn, serialize_fn: SerializeFn) -> Self
where
ProcessFn: Fn(G) -> io::Result<P> + Send + Sync + 'static,
SerializeFn: Fn(P, &mut Vec<u8>) -> io::Result<u64> + Send + Sync + 'static,
{
Self {
process_fn: Box::new(process_fn),
serialize_fn: Box::new(serialize_fn),
secondary_serialize_fn: None,
}
}
#[must_use]
pub fn with_secondary_serialize<F>(mut self, f: F) -> Self
where
F: Fn(&P, &mut Vec<u8>) -> io::Result<u64> + Send + Sync + 'static,
{
self.secondary_serialize_fn = Some(Box::new(f));
self
}
}
const DECOMPRESSION_BUFFER_CAPACITY: usize = 256 * 1024;
const SERIALIZATION_BUFFER_CAPACITY: usize = 64 * 1024;
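/// Per-thread worker state. Each `held_*` slot parks an item that was
/// produced but could not be pushed because its destination queue was full;
/// the corresponding step retries the held item first on its next attempt so
/// nothing is dropped or reordered.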
pub struct WorkerState<P: Send> {
pub core: WorkerCoreState,
pub decompressor: libdeflater::Decompressor,
pub decompression_buffer: Vec<u8>,
pub held_raw: Option<(u64, RawBlockBatch)>,
pub held_decompressed: Option<(u64, DecompressedBatch, usize)>,
pub held_boundaries: Option<(u64, BoundaryBatch)>,
pub held_decoded: Option<(u64, Vec<DecodedRecord>, usize)>,
pub held_processed: Option<(u64, Vec<P>, usize)>,
pub held_serialized: Option<(u64, SerializedBatch, usize)>,
pub held_compressed: Option<(u64, CompressedBlockBatch, usize)>,
}
impl<P: Send> WorkerState<P> {
#[must_use]
pub fn new(
compression_level: u32,
thread_id: usize,
num_threads: usize,
scheduler_strategy: SchedulerStrategy,
) -> Self {
Self {
core: WorkerCoreState::new(
compression_level,
thread_id,
num_threads,
scheduler_strategy,
ActiveSteps::all(),
),
decompressor: libdeflater::Decompressor::new(),
decompression_buffer: Vec::with_capacity(DECOMPRESSION_BUFFER_CAPACITY),
held_raw: None,
held_decompressed: None,
held_boundaries: None,
held_decoded: None,
held_processed: None,
held_serialized: None,
held_compressed: None,
}
}
#[inline]
#[must_use]
pub fn has_any_held_items(&self) -> bool {
self.held_raw.is_some()
|| self.held_decompressed.is_some()
|| self.held_boundaries.is_some()
|| self.held_decoded.is_some()
|| self.held_processed.is_some()
|| self.held_serialized.is_some()
|| self.held_compressed.is_some()
}
pub fn clear_held_items(&mut self) {
self.held_raw = None;
self.held_decompressed = None;
self.held_boundaries = None;
self.held_decoded = None;
self.held_processed = None;
self.held_serialized = None;
self.held_compressed = None;
}
}
impl<P: Send> HasCompressor for WorkerState<P> {
fn compressor_mut(&mut self) -> &mut InlineBgzfCompressor {
&mut self.core.compressor
}
}
impl<P: Send> HasRecycledBuffers for WorkerState<P> {
fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8> {
self.core.take_or_alloc_buffer(capacity)
}
fn recycle_buffer(&mut self, buf: Vec<u8>) {
self.core.recycle_buffer(buf);
}
}
impl<P: Send> HasHeldCompressed for WorkerState<P> {
fn held_compressed_mut(&mut self) -> &mut Option<(u64, CompressedBlockBatch, usize)> {
&mut self.held_compressed
}
}
impl<P: Send> HasHeldBoundaries<BoundaryBatch> for WorkerState<P> {
fn held_boundaries_mut(&mut self) -> &mut Option<(u64, BoundaryBatch)> {
&mut self.held_boundaries
}
}
impl<P: Send> HasHeldProcessed<P> for WorkerState<P> {
fn held_processed_mut(&mut self) -> &mut Option<(u64, Vec<P>, usize)> {
&mut self.held_processed
}
}
impl<P: Send> HasHeldSerialized for WorkerState<P> {
fn held_serialized_mut(&mut self) -> &mut Option<(u64, SerializedBatch, usize)> {
&mut self.held_serialized
}
fn serialization_buffer_mut(&mut self) -> &mut Vec<u8> {
&mut self.core.serialization_buffer
}
fn serialization_buffer_capacity(&self) -> usize {
        SERIALIZATION_BUFFER_CAPACITY
    }
}
impl<P: Send> WorkerStateCommon for WorkerState<P> {
fn has_any_held_items(&self) -> bool {
WorkerState::has_any_held_items(self)
}
fn clear_held_items(&mut self) {
WorkerState::clear_held_items(self);
}
}
impl<P: Send> HasWorkerCore for WorkerState<P> {
fn core(&self) -> &WorkerCoreState {
&self.core
}
fn core_mut(&mut self) -> &mut WorkerCoreState {
&mut self.core
}
}
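// Each `try_step_*` function below follows the same shape: first retry any
// held item from a previously failed push, then bail out under backpressure,
// then pop one unit of input, do the work, and push the result downstream,
// parking it in the worker's `held_*` slot if the output queue is full. The
// returned bool tells the scheduler whether progress was made; the
// `(bool, bool)` variants also report whether they hit lock contention.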
fn try_step_read<G: Send, P: Send + MemoryEstimate>(
state: &BamPipelineState<G, P>,
worker: &mut WorkerState<P>,
) -> bool {
if let Some((serial, held)) = worker.held_raw.take() {
#[cfg(feature = "memory-debug")]
let q1_bytes = held.total_compressed_size() as u64;
match state.q1_raw_blocks.push((serial, held)) {
Ok(()) => {
#[cfg(feature = "memory-debug")]
state.q1_heap_bytes.fetch_add(q1_bytes, Ordering::Relaxed);
state.deadlock_state.record_q1_push();
}
Err((serial, held)) => {
worker.held_raw = Some((serial, held));
return false;
}
}
}
if state.read_done.load(Ordering::Relaxed) {
return false;
}
if state.q1_raw_blocks.len() >= state.config.queue_capacity {
return false;
}
let Some(mut guard) = state.input_file.try_lock() else {
if let Some(stats) = state.stats() {
stats.record_contention(PipelineStep::Read);
}
        return false;
    };
let Some(ref mut reader) = *guard else {
        return false;
    };
match read_raw_blocks(reader.as_mut(), state.config.blocks_per_read_batch) {
Ok(blocks) if blocks.is_empty() => {
state.read_done.store(true, Ordering::SeqCst);
false
}
Ok(blocks) => {
let serial = state.next_read_serial.fetch_add(1, Ordering::SeqCst);
let batch = RawBlockBatch { blocks };
if let Some(stats) = state.stats() {
stats.bytes_read.fetch_add(batch.total_compressed_size() as u64, Ordering::Relaxed);
}
#[cfg(feature = "memory-debug")]
let q1_bytes = batch.total_compressed_size() as u64;
match state.q1_raw_blocks.push((serial, batch)) {
Ok(()) => {
#[cfg(feature = "memory-debug")]
state.q1_heap_bytes.fetch_add(q1_bytes, Ordering::Relaxed);
state.deadlock_state.record_q1_push();
true
}
Err((serial, batch)) => {
worker.held_raw = Some((serial, batch));
false
}
}
}
Err(e) => {
state.set_error(e);
false
}
}
}
fn try_step_decompress<G: Send, P: Send + MemoryEstimate>(
state: &BamPipelineState<G, P>,
worker: &mut WorkerState<P>,
) -> bool {
let mut advanced_held = false;
if let Some((serial, held, heap_size)) = worker.held_decompressed.take() {
state.q2_reorder_state.add_heap_bytes(heap_size as u64);
match state.q2_decompressed.push((serial, held)) {
Ok(()) => {
state.batches_decompressed.fetch_add(1, Ordering::Release);
state.deadlock_state.record_q2_push();
advanced_held = true;
}
Err((serial, held)) => {
state.q2_reorder_state.sub_heap_bytes(heap_size as u64);
worker.held_decompressed = Some((serial, held, heap_size));
return false;
}
}
}
if state.q2_decompressed.is_full() || state.q2_reorder_state.is_memory_high() {
return advanced_held;
}
let Some((serial, raw_batch)) = state.q1_raw_blocks.pop() else {
if let Some(stats) = state.stats() {
stats.record_queue_empty(1);
}
return advanced_held;
};
#[cfg(feature = "memory-debug")]
{
let q1_pop_bytes = raw_batch.total_compressed_size() as u64;
state.q1_heap_bytes.fetch_sub(q1_pop_bytes, Ordering::Relaxed);
}
state.deadlock_state.record_q1_pop();
worker.decompression_buffer.clear();
let expected_size = raw_batch.total_uncompressed_size();
worker.decompression_buffer.reserve(expected_size);
for block in &raw_batch.blocks {
if let Err(e) =
decompress_block_into(block, &mut worker.decompressor, &mut worker.decompression_buffer)
{
state.set_error(e);
return false;
}
}
let decompressed = std::mem::replace(
&mut worker.decompression_buffer,
Vec::with_capacity(DECOMPRESSION_BUFFER_CAPACITY),
);
if let Some(stats) = state.stats() {
stats
.compressed_bytes_in
.fetch_add(raw_batch.total_compressed_size() as u64, Ordering::Relaxed);
stats.decompressed_bytes.fetch_add(decompressed.len() as u64, Ordering::Relaxed);
}
let batch = DecompressedBatch { data: decompressed };
let heap_size = batch.estimate_heap_size();
state.q2_reorder_state.add_heap_bytes(heap_size as u64);
match state.q2_decompressed.push((serial, batch)) {
Ok(()) => {
state.batches_decompressed.fetch_add(1, Ordering::Release);
state.deadlock_state.record_q2_push();
true
}
Err((serial, batch)) => {
state.q2_reorder_state.sub_heap_bytes(heap_size as u64);
worker.held_decompressed = Some((serial, batch, heap_size));
false
}
}
}
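// FindBoundaries must see the decompressed stream in read order: q2 batches
// are re-sequenced through `q2_reorder`, then scanned under the shared
// `boundary_state` lock, up to `MAX_BATCHES_PER_LOCK` batches per acquisition.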
#[allow(clippy::too_many_lines)]
fn try_step_find_boundaries<G: Send, P: Send + MemoryEstimate>(
state: &BamPipelineState<G, P>,
worker: &mut WorkerState<P>,
) -> (bool, bool) {
const MAX_BATCHES_PER_LOCK: usize = 8;
let mut did_work = false;
if let Some((serial, held)) = worker.held_boundaries.take() {
match state.q2b_boundaries.push((serial, held)) {
Ok(()) => {
state.batches_boundary_found.fetch_add(1, Ordering::Release);
state.deadlock_state.record_q2b_push();
did_work = true;
}
Err((serial, held)) => {
worker.held_boundaries = Some((serial, held));
                return (false, false);
            }
}
}
if state.q2b_boundaries.is_full() {
        return (false, false);
    }
let Some(mut boundary_guard) = state.boundary_state.try_lock() else {
if let Some(stats) = state.stats() {
stats.record_contention(PipelineStep::FindBoundaries);
}
        return (did_work, true);
    };
for _ in 0..MAX_BATCHES_PER_LOCK {
if state.q2b_boundaries.is_full() {
break;
}
let batch_with_size = {
let mut reorder = state.q2_reorder.lock();
while let Some((serial, batch)) = state.q2_decompressed.pop() {
state.deadlock_state.record_q2_pop();
let heap_size = batch.estimate_heap_size();
reorder.insert_with_size(serial, batch, heap_size);
}
let result = reorder.try_pop_next_with_size();
state.q2_reorder_state.update_next_seq(reorder.next_seq());
result
};
let Some((batch, heap_size)) = batch_with_size else {
if !did_work {
if let Some(stats) = state.stats() {
stats.record_queue_empty(2);
}
}
            break;
        };
state.q2_reorder_state.sub_heap_bytes(heap_size as u64);
state.batches_boundary_processed.fetch_add(1, Ordering::Release);
match boundary_guard.find_boundaries(&batch.data) {
Ok(boundary_batch) => {
if boundary_batch.offsets.len() > 1 {
let num_records = boundary_batch.offsets.len() - 1;
if let Some(stats) = state.stats() {
stats.record_batch_size(num_records);
}
let serial = state.next_boundary_serial.fetch_add(1, Ordering::SeqCst);
match state.q2b_boundaries.push((serial, boundary_batch)) {
Ok(()) => {
state.batches_boundary_found.fetch_add(1, Ordering::Release);
state.deadlock_state.record_q2b_push();
}
Err((serial, boundary_batch)) => {
worker.held_boundaries = Some((serial, boundary_batch));
                            return (true, false);
                        }
}
}
did_work = true;
}
Err(e) => {
state.set_error(e);
return (false, false);
}
}
}
if did_work {
        return (true, false);
    }
let read_done = state.read_done.load(Ordering::Acquire);
let total_read = state.next_read_serial.load(Ordering::Acquire);
let batches_boundary_processed = state.batches_boundary_processed.load(Ordering::Acquire);
if read_done
&& batches_boundary_processed == total_read
&& !state.boundary_done.load(Ordering::Acquire)
{
match boundary_guard.finish() {
Ok(Some(final_batch)) => {
if final_batch.offsets.len() > 1 {
let serial = state.next_boundary_serial.fetch_add(1, Ordering::SeqCst);
match state.q2b_boundaries.push((serial, final_batch)) {
Ok(()) => {
state.batches_boundary_found.fetch_add(1, Ordering::Release);
state.deadlock_state.record_q2b_push();
}
Err((serial, final_batch)) => {
worker.held_boundaries = Some((serial, final_batch));
return (true, false);
}
}
}
state.boundary_done.store(true, Ordering::SeqCst);
(true, false)
}
Ok(None) => {
state.boundary_done.store(true, Ordering::SeqCst);
(false, false)
}
Err(e) => {
state.set_error(e);
(false, false)
}
}
} else {
        (false, false)
    }
}
fn try_step_decode<G: Send, P: Send + MemoryEstimate>(
state: &BamPipelineState<G, P>,
worker: &mut WorkerState<P>,
) -> bool {
let mut advanced_held = false;
if let Some((serial, held, heap_size)) = worker.held_decoded.take() {
state.q3_reorder_state.add_heap_bytes(heap_size as u64);
match state.q3_decoded.push((serial, held)) {
Ok(()) => {
state.batches_decoded.fetch_add(1, Ordering::Release);
state.deadlock_state.record_q3_push();
advanced_held = true;
}
Err((serial, held)) => {
state.q3_reorder_state.sub_heap_bytes(heap_size as u64);
worker.held_decoded = Some((serial, held, heap_size));
return false;
}
}
}
if state.q3_decoded.is_full() || state.q3_reorder_state.is_memory_high() {
return advanced_held;
}
let Some((serial, boundary_batch)) = state.q2b_boundaries.pop() else {
if let Some(stats) = state.stats() {
            stats.record_queue_empty(25);
        }
if state.boundary_done.load(Ordering::SeqCst) && state.q2b_boundaries.is_empty() {
state.decode_done.store(true, Ordering::SeqCst);
} else if let Some(stats) = state.stats() {
stats.record_queue_empty(2);
}
return advanced_held;
};
state.deadlock_state.record_q2b_pop();
match decode_records(&boundary_batch, &state.group_key_config) {
Ok(records) => {
if let Some(stats) = state.stats() {
stats.records_decoded.fetch_add(records.len() as u64, Ordering::Relaxed);
}
let heap_size = records.estimate_heap_size();
state.q3_reorder_state.add_heap_bytes(heap_size as u64);
match state.q3_decoded.push((serial, records)) {
Ok(()) => {
state.batches_decoded.fetch_add(1, Ordering::Release);
state.deadlock_state.record_q3_push();
true
}
Err((serial, records)) => {
state.q3_reorder_state.sub_heap_bytes(heap_size as u64);
worker.held_decoded = Some((serial, records, heap_size));
false
}
}
}
Err(e) => {
state.set_error(e);
false
}
}
}
#[allow(clippy::too_many_lines)]
fn try_step_group<G: Send + BatchWeight + MemoryEstimate + 'static, P: Send + MemoryEstimate>(
state: &BamPipelineState<G, P>,
group_state: &Mutex<GroupState<G>>,
) -> (bool, bool) {
const MAX_BATCHES_PER_LOCK: usize = 8;
const MAX_PENDING_DRAIN: usize = 16;
let Some(mut guard) = group_state.try_lock() else {
if let Some(stats) = state.stats() {
stats.record_contention(PipelineStep::Group);
}
        return (false, true);
    };
let batch_size = state.config.batch_size;
let target_weight = state.config.target_templates_per_batch;
let use_weight_batching = target_weight > 0;
let should_flush = |pending_len: usize, pending_weight: usize| -> bool {
if use_weight_batching {
pending_weight >= target_weight
} else {
pending_len >= batch_size
}
};
let push_batch = |groups: Vec<G>, state: &BamPipelineState<G, P>| -> Result<(), Vec<G>> {
if state.output.groups.is_full() {
return Err(groups);
}
#[cfg(feature = "memory-debug")]
{
let heap_size: u64 = groups.iter().map(|g| g.estimate_heap_size() as u64).sum();
state.output.groups_heap_bytes.fetch_add(heap_size, Ordering::AcqRel);
}
let serial = state.next_group_serial.fetch_add(1, Ordering::SeqCst);
state
.output
.groups
.push((serial, groups))
.unwrap_or_else(|_| panic!("groups push failed after is_full check"));
state.deadlock_state.record_q4_push();
Ok(())
};
let flush_all = |guard: &mut GroupState<G>, state: &BamPipelineState<G, P>| -> Option<bool> {
if guard.pending_groups.is_empty() {
return Some(true);
}
if state.output.groups.is_full() {
            return None;
        }
let batch: Vec<G> = guard.pending_groups.drain(..).collect();
guard.pending_weight = 0;
match push_batch(batch, state) {
Ok(()) => Some(true),
Err(batch) => {
for group in batch.into_iter().rev() {
guard.pending_weight += group.batch_weight();
guard.pending_groups.push_front(group);
}
None
}
}
};
    // Drain `pending_groups` into q4 in batch-sized chunks. Returns `false`
    // if q4 filled up (or a push failed) before everything eligible was
    // flushed; un-pushed groups are restored to the front of the deque.
    let flush_pending = |guard: &mut GroupState<G>| -> bool {
        while should_flush(guard.pending_groups.len(), guard.pending_weight) {
            if state.output.groups.is_full() {
                return false;
            }
            let batch: Vec<G> = if use_weight_batching {
                guard.pending_weight = 0;
                guard.pending_groups.drain(..).collect()
            } else {
                guard.pending_groups.drain(..batch_size).collect()
            };
            if let Err(batch) = push_batch(batch, state) {
                for group in batch.into_iter().rev() {
                    guard.pending_weight += group.batch_weight();
                    guard.pending_groups.push_front(group);
                }
                return false;
            }
            if use_weight_batching {
                break;
            }
        }
        true
    };
    if !flush_pending(&mut guard) {
        return (false, false);
    }
if guard.is_finished() && !state.group_done.load(Ordering::SeqCst) {
if flush_all(&mut guard, state).is_some() {
state.group_done.store(true, Ordering::SeqCst);
return (true, false);
}
        return (false, false);
    }
let mut did_work = false;
let mut pending: Vec<(u64, Vec<DecodedRecord>, usize)> = Vec::with_capacity(MAX_PENDING_DRAIN);
for _ in 0..MAX_BATCHES_PER_LOCK {
pending.clear();
while pending.len() < MAX_PENDING_DRAIN {
if let Some((serial, batch)) = state.q3_decoded.pop() {
state.deadlock_state.record_q3_pop();
let heap_size = batch.estimate_heap_size();
pending.push((serial, batch, heap_size));
} else {
break;
}
}
let records = {
let mut reorder = state.q3_reorder.lock();
for (serial, batch, heap_size) in pending.drain(..) {
reorder.insert_with_size(serial, batch, heap_size);
}
let result = reorder.try_pop_next_with_size();
state.q3_reorder_state.update_next_seq(reorder.next_seq());
state.q3_reorder_can_pop.store(reorder.can_pop(), Ordering::Release);
result
};
let Some((records, heap_size)) = records else {
if !did_work {
if let Some(stats) = state.stats() {
stats.record_queue_empty(3);
}
}
            break;
        };
state.q3_reorder_state.sub_heap_bytes(heap_size as u64);
state.batches_grouped.fetch_add(1, Ordering::Release);
match guard.process(records) {
Ok(groups) => {
if let Some(stats) = state.stats() {
stats.groups_produced.fetch_add(groups.len() as u64, Ordering::Relaxed);
}
for group in groups {
guard.pending_weight += group.batch_weight();
guard.pending_groups.push_back(group);
}
                if !flush_pending(&mut guard) {
                    return (true, false);
                }
did_work = true;
}
Err(e) => {
state.set_error(e);
return (false, false);
}
}
}
if did_work {
        return (true, false);
    }
if guard.is_finished() {
        return (false, false);
    }
let boundary_done = state.boundary_done.load(Ordering::Acquire);
let total_boundary_batches = state.batches_boundary_found.load(Ordering::Acquire);
let batches_grouped = state.batches_grouped.load(Ordering::Acquire);
if boundary_done && batches_grouped == total_boundary_batches {
match guard.finish() {
Ok(Some(group)) => {
guard.pending_weight += group.batch_weight();
guard.pending_groups.push_back(group);
                if !flush_pending(&mut guard) {
                    return (true, false);
                }
if flush_all(&mut guard, state).is_some() {
state.group_done.store(true, Ordering::SeqCst);
}
                (true, false)
            }
Ok(None) => {
if flush_all(&mut guard, state).is_some() {
state.group_done.store(true, Ordering::SeqCst);
}
                (false, false)
            }
Err(e) => {
state.set_error(e);
(false, false)
}
}
} else {
        (false, false)
    }
}
fn try_step_process<G: Send + MemoryEstimate + 'static, P: Send + MemoryEstimate + 'static>(
state: &BamPipelineState<G, P>,
fns: &PipelineFunctions<G, P>,
worker: &mut WorkerState<P>,
) -> bool {
const MAX_BATCHES: usize = 8;
if let Some((serial, held, heap_size)) = worker.held_processed.take() {
match state.output.processed.push((serial, held)) {
Ok(()) => {
state.output.processed_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
state.deadlock_state.record_q5_push();
}
Err((serial, held)) => {
worker.held_processed = Some((serial, held, heap_size));
return false;
}
}
}
if state.output.processed.is_full() || state.is_q5_memory_high() {
return false;
}
let mut did_work = false;
for _ in 0..MAX_BATCHES {
if state.output.processed.is_full() || state.is_q5_memory_high() {
break;
}
let Some((serial, batch)) = state.output.groups.pop() else {
if let Some(stats) = state.stats() {
stats.record_queue_empty(4);
}
break;
};
state.deadlock_state.record_q4_pop();
#[cfg(feature = "memory-debug")]
{
let q4_heap: u64 = batch.iter().map(|g| g.estimate_heap_size() as u64).sum();
state.output.groups_heap_bytes.fetch_sub(q4_heap, Ordering::AcqRel);
}
let mut results: Vec<P> = Vec::with_capacity(batch.len());
for group in batch {
match (fns.process_fn)(group) {
Ok(processed) => results.push(processed),
Err(e) => {
state.set_error(e);
return false;
}
}
}
let heap_size: usize = results.iter().map(MemoryEstimate::estimate_heap_size).sum();
match state.output.processed.push((serial, results)) {
Ok(()) => {
state.output.processed_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
state.deadlock_state.record_q5_push();
did_work = true;
}
Err((serial, results)) => {
worker.held_processed = Some((serial, results, heap_size));
break;
}
}
}
did_work
}
fn try_step_serialize<G: Send + 'static, P: Send + MemoryEstimate + 'static>(
state: &BamPipelineState<G, P>,
fns: &PipelineFunctions<G, P>,
worker: &mut WorkerState<P>,
) -> bool {
if let Some((serial, held, heap_size)) = worker.held_serialized.take() {
match state.output.serialized.push((serial, held)) {
Ok(()) => {
state.output.serialized_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
state.deadlock_state.record_q6_push();
}
Err((serial, held)) => {
worker.held_serialized = Some((serial, held, heap_size));
return false;
}
}
}
if state.output.serialized.is_full() {
return false;
}
let Some((serial, batch)) = state.output.processed.pop() else {
if let Some(stats) = state.stats() {
stats.record_queue_empty(5);
}
return false;
};
state.deadlock_state.record_q5_pop();
let q5_heap_size: usize = batch.iter().map(MemoryEstimate::estimate_heap_size).sum();
state.output.processed_heap_bytes.fetch_sub(q5_heap_size as u64, Ordering::AcqRel);
worker.core.serialization_buffer.clear();
worker.core.secondary_serialization_buffer.clear();
let mut total_record_count: u64 = 0;
for item in batch {
if let Some(ref secondary_fn) = fns.secondary_serialize_fn {
if let Err(e) = (secondary_fn)(&item, &mut worker.core.secondary_serialization_buffer) {
state.set_error(e);
return false;
}
}
match (fns.serialize_fn)(item, &mut worker.core.serialization_buffer) {
Ok(record_count) => {
total_record_count += record_count;
}
Err(e) => {
state.set_error(e);
return false;
}
}
}
let combined_data = std::mem::replace(
&mut worker.core.serialization_buffer,
Vec::with_capacity(SERIALIZATION_BUFFER_CAPACITY),
);
let secondary_data = if worker.core.secondary_serialization_buffer.is_empty() {
None
} else {
Some(std::mem::replace(
&mut worker.core.secondary_serialization_buffer,
Vec::with_capacity(SERIALIZATION_BUFFER_CAPACITY),
))
};
if let Some(stats) = state.stats() {
stats.serialized_bytes.fetch_add(combined_data.len() as u64, Ordering::Relaxed);
}
let batch =
SerializedBatch { data: combined_data, record_count: total_record_count, secondary_data };
let heap_size = batch.estimate_heap_size();
match state.output.serialized.push((serial, batch)) {
Ok(()) => {
state.output.serialized_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
state.deadlock_state.record_q6_push();
true
}
Err((serial, batch)) => {
worker.held_serialized = Some((serial, batch, heap_size));
false
}
}
}
fn try_step_compress<G: Send + 'static, P: Send + MemoryEstimate + 'static>(
state: &BamPipelineState<G, P>,
worker: &mut WorkerState<P>,
) -> bool {
shared_try_step_compress(state, worker).is_success()
}
fn try_step_write<G: Send + 'static, P: Send + MemoryEstimate + 'static>(
state: &BamPipelineState<G, P>,
) -> (bool, bool) {
let Some(mut guard) = state.output.output.try_lock() else {
if let Some(stats) = state.stats() {
stats.record_contention(PipelineStep::Write);
}
        return (false, true);
    };
let Some(ref mut writer) = *guard else {
        return (false, false);
    };
let mut wrote_any = false;
let q7_truly_empty;
{
let mut reorder = state.output.write_reorder.lock();
while let Some((serial, batch)) = state.output.compressed.pop() {
            let heap_size = batch.estimate_heap_size();
            state.q6_track_pop(heap_size as u64);
            state.deadlock_state.record_q7_pop();
            reorder.insert_with_size(serial, batch, heap_size);
state.output.write_reorder_state.add_heap_bytes(heap_size as u64);
}
while let Some((batch, heap_size)) = reorder.try_pop_next_with_size() {
let mut batch_bytes: u64 = 0;
for block in &batch.blocks {
if let Err(e) = writer.write_all(&block.data) {
state.set_error(e);
                        return (false, false);
                    }
batch_bytes += block.data.len() as u64;
}
if let Some(ref secondary_data) = batch.secondary_data {
if !secondary_data.is_empty() {
if let Some(ref secondary_mutex) = state.output.secondary_output {
let mut sw_guard = secondary_mutex.lock();
if let Some(ref mut sw) = *sw_guard {
if let Err(e) = sw.write_raw_bytes(secondary_data) {
state.set_error(e);
return (false, false);
}
}
}
}
}
state.output.write_reorder_state.sub_heap_bytes(heap_size as u64);
state.output.write_reorder_state.update_next_seq(reorder.next_seq());
if let Some(stats) = state.stats() {
stats.bytes_written.fetch_add(batch_bytes, Ordering::Relaxed);
}
let records_in_batch = batch.record_count;
state.output.items_written.fetch_add(records_in_batch, Ordering::Relaxed);
state.output.progress.log_if_needed(records_in_batch);
wrote_any = true;
}
q7_truly_empty = reorder.is_empty();
}
if !wrote_any && q7_truly_empty {
if let Some(stats) = state.stats() {
stats.record_queue_empty(7);
}
}
    (wrote_any, false)
}
pub struct BamStepContext<'a, G: Send, P: Send + MemoryEstimate> {
pub state: &'a BamPipelineState<G, P>,
pub group_state: &'a Mutex<GroupState<G>>,
pub fns: &'a PipelineFunctions<G, P>,
pub is_reader: bool,
}
impl<G, P> StepContext for BamStepContext<'_, G, P>
where
G: Send + BatchWeight + MemoryEstimate + 'static,
P: Send + MemoryEstimate + 'static,
{
type Worker = WorkerState<P>;
fn execute_step(&self, worker: &mut Self::Worker, step: PipelineStep) -> (bool, bool) {
execute_step(self.state, self.group_state, self.fns, worker, step)
}
fn get_backpressure(&self, _worker: &Self::Worker) -> BackpressureState {
let depths = self.state.queue_depths();
let read_done = self.state.read_done.load(Ordering::Relaxed);
BackpressureState {
output_high: depths.q7 > self.state.config.output_high_water,
input_low: depths.q1 < self.state.config.input_low_water,
read_done,
memory_high: !self.state.is_draining() && self.state.is_memory_high(),
memory_drained: self.state.is_memory_drained(),
}
}
fn check_drain_mode(&self) {
let read_done = self.state.read_done.load(Ordering::Relaxed);
if read_done && self.state.q1_raw_blocks.is_empty() {
self.state.output.draining.store(true, Ordering::Relaxed);
}
}
fn has_error(&self) -> bool {
self.state.has_error()
}
fn is_complete(&self) -> bool {
self.state.is_complete()
}
fn stats(&self) -> Option<&PipelineStats> {
self.state.stats()
}
fn skip_read(&self) -> bool {
true
}
fn check_completion_at_end(&self) -> bool {
        true
    }
fn should_attempt_sticky_read(&self) -> bool {
self.is_reader && !self.state.read_done.load(Ordering::Relaxed)
}
fn sticky_read_should_continue(&self) -> bool {
!self.state.has_error()
&& !self.state.read_done.load(Ordering::Relaxed)
&& self.state.q1_raw_blocks.len() < self.state.config.queue_capacity
}
fn execute_read_step(&self, worker: &mut Self::Worker) -> bool {
try_step_read(self.state, worker)
}
fn is_drain_mode(&self) -> bool {
let read_done = self.state.read_done.load(Ordering::Relaxed);
let group_done = self.state.group_done.load(Ordering::Relaxed);
read_done && group_done
}
fn should_attempt_step(
&self,
worker: &Self::Worker,
step: PipelineStep,
drain_mode: bool,
) -> bool {
worker.core.scheduler.should_attempt_step_with_drain(step, drain_mode)
}
fn exclusive_step_owned(&self, worker: &Self::Worker) -> Option<PipelineStep> {
if self.is_reader {
None
} else {
worker.core.scheduler.exclusive_step_owned()
}
}
}
fn execute_step<
G: Send + BatchWeight + MemoryEstimate + 'static,
P: Send + MemoryEstimate + 'static,
>(
state: &BamPipelineState<G, P>,
group_state: &Mutex<GroupState<G>>,
fns: &PipelineFunctions<G, P>,
worker: &mut WorkerState<P>,
step: PipelineStep,
) -> (bool, bool) {
match step {
        PipelineStep::Read => (false, false),
        PipelineStep::Decompress => (try_step_decompress(state, worker), false),
PipelineStep::FindBoundaries => try_step_find_boundaries(state, worker),
PipelineStep::Decode => (try_step_decode(state, worker), false),
PipelineStep::Group => try_step_group(state, group_state),
PipelineStep::Process => (try_step_process(state, fns, worker), false),
PipelineStep::Serialize => (try_step_serialize(state, fns, worker), false),
PipelineStep::Compress => (try_step_compress(state, worker), false),
PipelineStep::Write => try_step_write(state),
}
}
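/// Scratch buffers reused across iterations of the single-threaded fallback
/// below, standing in for the inter-stage queues of the threaded pipeline.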
struct SingleThreadedBuffers {
decompressed: Vec<u8>,
serialized: Vec<u8>,
secondary: Vec<u8>,
}
impl SingleThreadedBuffers {
fn new() -> Self {
Self {
decompressed: Vec::with_capacity(256 * 1024),
serialized: Vec::with_capacity(64 * 1024),
secondary: Vec::new(),
}
}
}
#[allow(clippy::needless_pass_by_value)]
fn run_bam_pipeline_single_threaded<G, P>(
config: &PipelineConfig,
mut input: Box<dyn Read + Send>,
mut output: Box<dyn Write + Send>,
mut grouper: Box<dyn Grouper<Group = G> + Send>,
fns: PipelineFunctions<G, P>,
group_key_config: GroupKeyConfig,
mut secondary_writer: Option<crate::bam_io::RawBamWriter>,
) -> io::Result<u64>
where
G: Send + 'static,
P: Send + MemoryEstimate + 'static,
{
let mut decompressor = libdeflater::Decompressor::new();
let mut boundary_state = if config.header_already_read {
BoundaryState::new_no_header()
} else {
BoundaryState::new()
};
let mut compressor = InlineBgzfCompressor::new(config.compression_level);
let mut buffers = SingleThreadedBuffers::new();
let progress = ProgressTracker::new("Processed records").with_interval(PROGRESS_LOG_INTERVAL);
loop {
let blocks = read_raw_blocks(input.as_mut(), 4)?;
if blocks.is_empty() {
break;
}
buffers.decompressed.clear();
let expected_size: usize =
blocks.iter().map(super::super::bgzf_reader::RawBgzfBlock::uncompressed_size).sum();
buffers.decompressed.reserve(expected_size);
for block in &blocks {
decompress_block_into(block, &mut decompressor, &mut buffers.decompressed)?;
}
let boundary_batch = boundary_state.find_boundaries(&buffers.decompressed)?;
if boundary_batch.offsets.len() > 1 {
let decoded = decode_records(&boundary_batch, &group_key_config)?;
let groups = grouper.add_records(decoded)?;
for group in groups {
let processed = (fns.process_fn)(group)?;
buffers.secondary.clear();
if let Some(ref secondary_fn) = fns.secondary_serialize_fn {
(secondary_fn)(&processed, &mut buffers.secondary)?;
}
buffers.serialized.clear();
let record_count = (fns.serialize_fn)(processed, &mut buffers.serialized)?;
compressor.write_all(&buffers.serialized)?;
compressor.maybe_compress()?;
compressor.write_blocks_to(output.as_mut())?;
if !buffers.secondary.is_empty() {
if let Some(ref mut sw) = secondary_writer {
sw.write_raw_bytes(&buffers.secondary)?;
}
}
progress.log_if_needed(record_count);
}
}
}
if let Some(final_batch) = boundary_state.finish()? {
if final_batch.offsets.len() > 1 {
let decoded = decode_records(&final_batch, &group_key_config)?;
let groups = grouper.add_records(decoded)?;
for group in groups {
let processed = (fns.process_fn)(group)?;
buffers.secondary.clear();
if let Some(ref secondary_fn) = fns.secondary_serialize_fn {
(secondary_fn)(&processed, &mut buffers.secondary)?;
}
buffers.serialized.clear();
let record_count = (fns.serialize_fn)(processed, &mut buffers.serialized)?;
compressor.write_all(&buffers.serialized)?;
compressor.maybe_compress()?;
compressor.write_blocks_to(output.as_mut())?;
if !buffers.secondary.is_empty() {
if let Some(ref mut sw) = secondary_writer {
sw.write_raw_bytes(&buffers.secondary)?;
}
}
progress.log_if_needed(record_count);
}
}
}
if let Some(final_group) = grouper.finish()? {
let processed = (fns.process_fn)(final_group)?;
buffers.secondary.clear();
if let Some(ref secondary_fn) = fns.secondary_serialize_fn {
(secondary_fn)(&processed, &mut buffers.secondary)?;
}
buffers.serialized.clear();
let record_count = (fns.serialize_fn)(processed, &mut buffers.serialized)?;
compressor.write_all(&buffers.serialized)?;
compressor.maybe_compress()?;
compressor.write_blocks_to(output.as_mut())?;
if !buffers.secondary.is_empty() {
if let Some(ref mut sw) = secondary_writer {
sw.write_raw_bytes(&buffers.secondary)?;
}
}
progress.log_if_needed(record_count);
}
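// Flush any remaining buffered output, then append the standard 28-byte
// BGZF EOF marker so downstream readers recognize a complete file.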
compressor.flush()?;
compressor.write_blocks_to(output.as_mut())?;
output.flush()?;
output.write_all(&BGZF_EOF)?;
output.flush()?;
if let Some(writer) = secondary_writer {
writer.finish().map_err(|e| {
io::Error::new(e.kind(), format!("Failed to finalize secondary output: {e}"))
})?;
}
Ok(progress.count())
}
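/// Entry point for the multi-stage BAM pipeline.
///
/// Falls back to the single-threaded path for one thread; otherwise spawns
/// `num_threads` workers (thread 0 acts as the dedicated reader), plus an
/// optional monitor thread for stats sampling and deadlock detection.
/// Returns the total record count on success.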
#[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
pub fn run_bam_pipeline<G, P>(
config: PipelineConfig,
input: Box<dyn Read + Send>,
output: Box<dyn Write + Send>,
grouper: Box<dyn Grouper<Group = G> + Send>,
fns: PipelineFunctions<G, P>,
group_key_config: GroupKeyConfig,
secondary_writer: Option<crate::bam_io::RawBamWriter>,
) -> io::Result<u64>
where
G: Send + BatchWeight + MemoryEstimate + 'static,
P: Send + MemoryEstimate + 'static,
{
let num_threads = config.num_threads;
let compression_level = config.compression_level;
let scheduler_strategy = config.scheduler_strategy;
if num_threads == 1 {
return run_bam_pipeline_single_threaded(
&config,
input,
output,
grouper,
fns,
group_key_config,
secondary_writer,
);
}
let mut state = BamPipelineState::<G, P>::new(config, input, output, group_key_config);
if let Some(sw) = secondary_writer {
state.output.set_secondary_output(sw);
}
let state = Arc::new(state);
if let Some(stats) = state.stats() {
stats.set_num_threads(num_threads);
#[cfg(feature = "memory-debug")]
stats.set_infrastructure_memory(num_threads, state.config.queue_capacity);
}
let group_state = Arc::new(Mutex::new(GroupState::new(grouper)));
let fns = Arc::new(fns);
let handles: Vec<_> = (0..num_threads)
.map(|thread_id| {
let state = Arc::clone(&state);
let group_state = Arc::clone(&group_state);
let fns = Arc::clone(&fns);
thread::spawn(move || {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut worker = WorkerState::new(
compression_level,
thread_id,
num_threads,
scheduler_strategy,
);
let ctx = BamStepContext {
state: &state,
group_state: &group_state,
fns: &fns,
is_reader: thread_id == 0,
};
generic_worker_loop(&ctx, &mut worker);
}));
if let Err(panic_info) = result {
handle_worker_panic(&*state, thread_id, panic_info);
}
})
})
.collect();
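// Monitor thread: every 100 ms, sample queue depths, reorder-buffer sizes,
// and per-queue memory for stats; roughly once per second, run the
// deadlock check when deadlock handling is enabled.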
let monitor_handle = if state.stats().is_some() || state.deadlock_state.is_enabled() {
let state_clone = Arc::clone(&state);
Some(thread::spawn(move || {
let start_time = Instant::now();
let mut deadlock_check_counter = 0u32;
loop {
thread::sleep(Duration::from_millis(100));
if state_clone.is_complete() || state_clone.has_error() {
break;
}
let queue_sizes = [
state_clone.q1_raw_blocks.len(),
state_clone.q2_decompressed.len(),
state_clone.q2b_boundaries.len(),
state_clone.q3_decoded.len(),
state_clone.output.groups.len(),
state_clone.output.processed.len(),
state_clone.output.serialized.len(),
state_clone.output.compressed.len(),
];
let (q2_reorder_len, q2_reorder_mem) = {
let reorder = state_clone.q2_reorder.lock();
(reorder.len(), reorder.total_heap_size() as u64)
};
let (q3_reorder_len, q3_reorder_mem) = {
let reorder = state_clone.q3_reorder.lock();
(reorder.len(), reorder.total_heap_size() as u64)
};
let (q7_reorder_len, q7_reorder_mem) = {
let reorder = state_clone.output.write_reorder.lock();
(reorder.len(), reorder.total_heap_size() as u64)
};
let reorder_sizes = [q2_reorder_len, q3_reorder_len, q7_reorder_len];
let reorder_memory_bytes = [q2_reorder_mem, q3_reorder_mem, q7_reorder_mem];
#[cfg(feature = "memory-debug")]
let q1_mem = state_clone.q1_heap_bytes.load(Ordering::Relaxed);
#[cfg(not(feature = "memory-debug"))]
let q1_mem: u64 = 0;
let q2_mem = state_clone.q2_reorder_state.heap_bytes.load(Ordering::Relaxed);
let q3_mem = state_clone.q3_reorder_state.heap_bytes.load(Ordering::Relaxed);
let q4_mem = state_clone.output.groups_heap_bytes.load(Ordering::Relaxed);
let q5_mem = state_clone.output.processed_heap_bytes.load(Ordering::Relaxed);
let q6_mem = state_clone.output.serialized_heap_bytes.load(Ordering::Relaxed);
let q7_mem = state_clone.output.compressed_heap_bytes.load(Ordering::Relaxed);
let queue_memory_bytes =
[q1_mem, q2_mem, 0, q3_mem, q4_mem, q5_mem, q6_mem, q7_mem];
let thread_steps: Vec<u8> = if let Some(stats) = state_clone.stats() {
let num_threads = stats.num_threads.load(Ordering::Relaxed) as usize;
(0..num_threads)
.map(|tid| stats.per_thread_current_step[tid].load(Ordering::Relaxed))
.collect()
} else {
Vec::new()
};
if let Some(stats) = state_clone.stats() {
let total_mem = q1_mem
+ q2_mem
+ q3_mem
+ q7_reorder_mem
+ q4_mem
+ q5_mem
+ q6_mem
+ q7_mem;
stats.record_memory_usage(total_mem);
#[cfg(feature = "memory-debug")]
stats.update_queue_memory_from_external(&[
("q1", q1_mem),
("q2", q2_mem),
("q3", q3_mem),
("q4", q4_mem),
("q5", q5_mem),
("q6", q6_mem),
("q7", q7_mem),
]);
stats.add_queue_sample(QueueSample {
time_ms: start_time.elapsed().as_millis() as u64,
queue_sizes,
reorder_sizes,
queue_memory_bytes,
reorder_memory_bytes,
thread_steps,
});
}
if state_clone.deadlock_state.is_enabled() {
deadlock_check_counter += 1;
if deadlock_check_counter >= 10 {
deadlock_check_counter = 0;
let snapshot = state_clone.build_queue_snapshot();
if let DeadlockAction::Detected =
check_deadlock_and_restore(&state_clone.deadlock_state, &snapshot)
{
state_clone.set_error(io::Error::new(
io::ErrorKind::TimedOut,
"pipeline deadlock detected with recovery disabled; \
use --deadlock-recover to enable automatic recovery",
));
break;
}
}
}
}
}))
} else {
None
};
join_worker_threads(handles)?;
join_monitor_thread(monitor_handle);
let result = finalize_pipeline(&*state);
if let Some(ref secondary_mutex) = state.output.secondary_output {
let mut guard = secondary_mutex.lock();
if let Some(writer) = guard.take() {
if let Err(e) = writer.finish().map_err(|e| {
io::Error::new(e.kind(), format!("Failed to finalize secondary output: {e}"))
}) {
if result.is_err() {
log::error!("Secondary output finalization also failed: {e}");
} else {
return Err(e);
}
}
}
}
result
}
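// Per-thread scratch buffer for encoding a single BAM record before it is
// length-prefixed into the output buffer.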
thread_local! {
static SERIALIZE_RECORD_BUFFER: std::cell::RefCell<Vec<u8>> =
std::cell::RefCell::new(Vec::with_capacity(512));
}
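/// Encodes `records` as length-prefixed BAM records (a little-endian
/// `block_size` followed by the encoded record) and appends them to
/// `output`, returning the number of records written.
///
/// The 400-byte-per-record estimate is only a capacity-reservation
/// heuristic, not a hard limit.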
#[allow(clippy::cast_possible_truncation)]
pub fn serialize_bam_records_into(
records: &[RecordBuf],
header: &Header,
output: &mut Vec<u8>,
) -> io::Result<u64> {
use crate::vendored::bam_codec::encode_record_buf;
log::trace!("serialize_bam_records_into: {} records", records.len());
let estimated_batch_size = records.len() * 400;
output.reserve(estimated_batch_size);
SERIALIZE_RECORD_BUFFER.with(|buf| {
let mut record_data = buf.borrow_mut();
for (i, record) in records.iter().enumerate() {
record_data.clear();
if let Err(e) = encode_record_buf(&mut record_data, header, record) {
log::error!(
"serialize_bam_records_into: failed to encode record {}: {:?}, name={:?}, seq_len={}, qual_len={}",
i,
e,
record.name(),
record.sequence().len(),
record.quality_scores().len(),
);
return Err(e);
}
let block_size = record_data.len() as u32;
output.extend_from_slice(&block_size.to_le_bytes());
output.extend_from_slice(&record_data);
}
Ok(records.len() as u64)
})
}
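/// Convenience wrapper around [`serialize_bam_records_into`] that allocates
/// a fresh buffer and wraps it in a [`SerializedBatch`].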
pub fn serialize_bam_records(
records: &[RecordBuf],
header: &Header,
) -> io::Result<SerializedBatch> {
let mut data = Vec::with_capacity(records.len() * 256);
let record_count = serialize_bam_records_into(records, header, &mut data)?;
Ok(SerializedBatch { data, record_count, secondary_data: None })
}
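/// Single-record counterpart of [`serialize_bam_records`].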
pub fn serialize_bam_record(record: &RecordBuf, header: &Header) -> io::Result<SerializedBatch> {
let mut data = Vec::with_capacity(256);
let record_count = serialize_bam_record_into(record, header, &mut data)?;
Ok(SerializedBatch { data, record_count, secondary_data: None })
}
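/// Encodes one record with its `block_size` prefix into `output`; returns 1
/// on success.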
#[allow(clippy::cast_possible_truncation)]
pub fn serialize_bam_record_into(
record: &RecordBuf,
header: &Header,
output: &mut Vec<u8>,
) -> io::Result<u64> {
use crate::vendored::bam_codec::encode_record_buf;
SERIALIZE_RECORD_BUFFER.with(|buf| {
let mut record_data = buf.borrow_mut();
record_data.clear();
encode_record_buf(&mut record_data, header, record)?;
let block_size = record_data.len() as u32;
output.extend_from_slice(&block_size.to_le_bytes());
output.extend_from_slice(&record_data);
Ok(1)
})
}
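/// Like [`serialize_bam_records_into`], but writes straight into the BGZF
/// compressor's internal buffer and lets it emit blocks as they fill,
/// avoiding an intermediate copy of the serialized batch.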
#[allow(clippy::cast_possible_truncation)]
pub fn serialize_bam_records_to_compressor(
records: &[RecordBuf],
header: &Header,
compressor: &mut crate::bgzf_writer::InlineBgzfCompressor,
) -> io::Result<u64> {
use crate::vendored::bam_codec::encode_record_buf;
log::trace!("serialize_bam_records_to_compressor: {} records", records.len());
SERIALIZE_RECORD_BUFFER.with(|buf| {
let mut record_data = buf.borrow_mut();
for (i, record) in records.iter().enumerate() {
record_data.clear();
if let Err(e) = encode_record_buf(&mut record_data, header, record) {
log::error!(
"serialize_bam_records_to_compressor: failed to encode record {}: {:?}, name={:?}, seq_len={}, qual_len={}",
i,
e,
record.name(),
record.sequence().len(),
record.quality_scores().len(),
);
return Err(e);
}
let block_size = record_data.len() as u32;
let buffer = compressor.buffer_mut();
buffer.extend_from_slice(&block_size.to_le_bytes());
buffer.extend_from_slice(&record_data);
compressor.maybe_compress()?;
}
Ok(records.len() as u64)
})
}
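/// BAM-specific pipeline configuration: the generic [`PipelineConfig`] plus
/// the BGZF compression level and an optional group-key configuration.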
#[derive(Debug, Clone)]
pub struct BamPipelineConfig {
pub pipeline: PipelineConfig,
pub compression_level: u32,
pub group_key_config: Option<GroupKeyConfig>,
}
impl BamPipelineConfig {
#[must_use]
pub fn new(num_threads: usize, compression_level: u32) -> Self {
Self {
pipeline: PipelineConfig::new(num_threads, compression_level),
compression_level,
group_key_config: None,
}
}
#[must_use]
pub fn auto_tuned(num_threads: usize, compression_level: u32) -> Self {
Self {
pipeline: PipelineConfig::auto_tuned(num_threads, compression_level),
compression_level,
group_key_config: None,
}
}
#[must_use]
pub fn with_compression_level(mut self, level: u32) -> Self {
self.compression_level = level;
self.pipeline.compression_level = level;
self
}
#[must_use]
pub fn with_group_key_config(mut self, config: GroupKeyConfig) -> Self {
self.group_key_config = Some(config);
self
}
}
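/// Opens the pipeline output, writing to standard output when the path is
/// the stdout sentinel.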
fn open_pipeline_output(output_path: &Path) -> io::Result<Box<dyn Write + Send>> {
if is_stdout_path(output_path) {
Ok(Box::new(std::io::stdout()))
} else {
let file = File::create(output_path)
.map_err(|e| io::Error::new(e.kind(), format!("Failed to create output: {e}")))?;
Ok(Box::new(file))
}
}
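/// High-level front end: reads the header from `input_path`, echoes it to
/// `output_path`, and runs the pipeline with that same header for both
/// grouping and serialization.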
pub fn run_bam_pipeline_with_grouper<G, P, GrouperFn, ProcessFn, SerializeFn>(
config: BamPipelineConfig,
input_path: &Path,
output_path: &Path,
grouper_fn: GrouperFn,
process_fn: ProcessFn,
serialize_fn: SerializeFn,
) -> io::Result<u64>
where
G: Send + BatchWeight + MemoryEstimate + 'static,
P: Send + MemoryEstimate + 'static,
GrouperFn: FnOnce(&Header) -> Box<dyn Grouper<Group = G> + Send>,
ProcessFn: Fn(G) -> io::Result<P> + Send + Sync + 'static,
SerializeFn: Fn(P, &Header, &mut Vec<u8>) -> io::Result<u64> + Send + Sync + 'static,
{
let header = {
let input_file = File::open(input_path)
.map_err(|e| io::Error::new(e.kind(), format!("Failed to open input: {e}")))?;
let mut bam_reader = bam::io::Reader::new(input_file);
bam_reader.read_header().map_err(|e| {
io::Error::new(io::ErrorKind::InvalidData, format!("Failed to read BAM header: {e}"))
})?
};
let output_writer = open_pipeline_output(output_path)?;
let mut header_writer = bam::io::Writer::new(output_writer);
header_writer
.write_header(&header)
.map_err(|e| io::Error::other(format!("Failed to write BAM header: {e}")))?;
let mut bgzf_writer = header_writer.into_inner();
bgzf_writer
.try_finish()
.map_err(|e| io::Error::other(format!("Failed to finish BGZF header: {e}")))?;
let output = bgzf_writer.into_inner();
let input = File::open(input_path)
.map_err(|e| io::Error::new(e.kind(), format!("Failed to re-open input: {e}")))?;
let input = BufReader::with_capacity(IO_BUFFER_SIZE, input);
let output = BufWriter::with_capacity(IO_BUFFER_SIZE, output);
let group_key_config = config.group_key_config.unwrap_or_else(|| {
let library_index = LibraryIndex::from_header(&header);
let cell_tag = Tag::from(SamTag::CB);
GroupKeyConfig::new(library_index, cell_tag)
});
let grouper = grouper_fn(&header);
let header_clone = header.clone();
let fns = PipelineFunctions::new(process_fn, move |p: P, buf: &mut Vec<u8>| {
serialize_fn(p, &header_clone, buf)
});
run_bam_pipeline(
config.pipeline,
Box::new(input),
Box::new(output),
grouper,
fns,
group_key_config,
None,
)
}
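/// Variant of [`run_bam_pipeline_with_grouper`] that writes a caller-supplied
/// `output_header` while still deriving the grouper and group-key
/// configuration from the input header.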
pub fn run_bam_pipeline_with_header<G, P, GrouperFn, ProcessFn, SerializeFn>(
config: BamPipelineConfig,
input_path: &Path,
output_path: &Path,
output_header: Header,
grouper_fn: GrouperFn,
process_fn: ProcessFn,
serialize_fn: SerializeFn,
) -> io::Result<u64>
where
G: Send + BatchWeight + MemoryEstimate + 'static,
P: Send + MemoryEstimate + 'static,
GrouperFn: FnOnce(&Header) -> Box<dyn Grouper<Group = G> + Send>,
ProcessFn: Fn(G) -> io::Result<P> + Send + Sync + 'static,
SerializeFn: Fn(P, &Header, &mut Vec<u8>) -> io::Result<u64> + Send + Sync + 'static,
{
let input_header = {
let input_file = File::open(input_path)
.map_err(|e| io::Error::new(e.kind(), format!("Failed to open input: {e}")))?;
let mut bam_reader = bam::io::Reader::new(input_file);
bam_reader.read_header().map_err(|e| {
io::Error::new(io::ErrorKind::InvalidData, format!("Failed to read BAM header: {e}"))
})?
};
let output_writer = open_pipeline_output(output_path)?;
let mut header_writer = bam::io::Writer::new(output_writer);
header_writer
.write_header(&output_header)
.map_err(|e| io::Error::other(format!("Failed to write BAM header: {e}")))?;
let mut bgzf_writer = header_writer.into_inner();
bgzf_writer
.try_finish()
.map_err(|e| io::Error::other(format!("Failed to finish BGZF header: {e}")))?;
let output = bgzf_writer.into_inner();
let input = File::open(input_path)
.map_err(|e| io::Error::new(e.kind(), format!("Failed to re-open input: {e}")))?;
let input = BufReader::with_capacity(IO_BUFFER_SIZE, input);
let output = BufWriter::with_capacity(IO_BUFFER_SIZE, output);
let group_key_config = config.group_key_config.unwrap_or_else(|| {
let library_index = LibraryIndex::from_header(&input_header);
let cell_tag = Tag::from(SamTag::CB);
GroupKeyConfig::new(library_index, cell_tag)
});
let grouper = grouper_fn(&input_header);
let fns = PipelineFunctions::new(process_fn, move |p: P, buf: &mut Vec<u8>| {
serialize_fn(p, &output_header, buf)
});
run_bam_pipeline(
config.pipeline,
Box::new(input),
Box::new(output),
grouper,
fns,
group_key_config,
None,
)
}
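/// Variant that takes an already-opened reader together with its previously
/// read header; the output header defaults to a clone of the input header
/// when not supplied.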
#[allow(clippy::too_many_arguments, clippy::needless_pass_by_value)]
pub fn run_bam_pipeline_from_reader<G, P, R, GrouperFn, ProcessFn, SerializeFn>(
config: BamPipelineConfig,
input: R,
input_header: Header,
output_path: &Path,
output_header: Option<Header>,
grouper_fn: GrouperFn,
process_fn: ProcessFn,
serialize_fn: SerializeFn,
) -> io::Result<u64>
where
G: Send + BatchWeight + MemoryEstimate + 'static,
P: Send + MemoryEstimate + 'static,
R: Read + Send + 'static,
GrouperFn: FnOnce(&Header) -> Box<dyn Grouper<Group = G> + Send>,
ProcessFn: Fn(G) -> io::Result<P> + Send + Sync + 'static,
SerializeFn: Fn(P, &Header, &mut Vec<u8>) -> io::Result<u64> + Send + Sync + 'static,
{
let output_header = output_header.unwrap_or_else(|| input_header.clone());
let output_writer = open_pipeline_output(output_path)?;
let mut header_writer = bam::io::Writer::new(output_writer);
header_writer
.write_header(&output_header)
.map_err(|e| io::Error::other(format!("Failed to write BAM header: {e}")))?;
let mut bgzf_writer = header_writer.into_inner();
bgzf_writer
.try_finish()
.map_err(|e| io::Error::other(format!("Failed to finish BGZF header: {e}")))?;
let output = bgzf_writer.into_inner();
let output = BufWriter::with_capacity(IO_BUFFER_SIZE, output);
let group_key_config = config.group_key_config.unwrap_or_else(|| {
let library_index = LibraryIndex::from_header(&input_header);
let cell_tag = Tag::from(SamTag::CB);
GroupKeyConfig::new(library_index, cell_tag)
});
let grouper = grouper_fn(&input_header);
let fns = PipelineFunctions::new(process_fn, move |p: P, buf: &mut Vec<u8>| {
serialize_fn(p, &output_header, buf)
});
run_bam_pipeline(
config.pipeline,
Box::new(input),
Box::new(output),
grouper,
fns,
group_key_config,
None,
)
}
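/// Like [`run_bam_pipeline_from_reader`], but additionally routes per-group
/// secondary output (produced by `secondary_serialize_fn`) to a raw BAM
/// writer at `secondary_output_path`.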
#[allow(clippy::too_many_arguments, clippy::needless_pass_by_value)]
pub fn run_bam_pipeline_from_reader_with_secondary<
G,
P,
R,
GrouperFn,
ProcessFn,
SerializeFn,
SecondaryFn,
>(
config: BamPipelineConfig,
input: R,
input_header: Header,
output_path: &Path,
output_header: Option<Header>,
secondary_output_path: &Path,
grouper_fn: GrouperFn,
process_fn: ProcessFn,
serialize_fn: SerializeFn,
secondary_serialize_fn: SecondaryFn,
) -> io::Result<u64>
where
G: Send + BatchWeight + MemoryEstimate + 'static,
P: Send + MemoryEstimate + 'static,
R: Read + Send + 'static,
GrouperFn: FnOnce(&Header) -> Box<dyn Grouper<Group = G> + Send>,
ProcessFn: Fn(G) -> io::Result<P> + Send + Sync + 'static,
SerializeFn: Fn(P, &Header, &mut Vec<u8>) -> io::Result<u64> + Send + Sync + 'static,
SecondaryFn: Fn(&P, &mut Vec<u8>) -> io::Result<u64> + Send + Sync + 'static,
{
let output_header = output_header.unwrap_or_else(|| input_header.clone());
let output_writer = open_pipeline_output(output_path)?;
let mut header_writer = bam::io::Writer::new(output_writer);
header_writer
.write_header(&output_header)
.map_err(|e| io::Error::other(format!("Failed to write BAM header: {e}")))?;
let mut bgzf_writer = header_writer.into_inner();
bgzf_writer
.try_finish()
.map_err(|e| io::Error::other(format!("Failed to finish BGZF header: {e}")))?;
let output = bgzf_writer.into_inner();
let output = BufWriter::with_capacity(IO_BUFFER_SIZE, output);
let secondary_writer = crate::bam_io::create_raw_bam_writer(
secondary_output_path,
&output_header,
1,
config.compression_level,
)
.map_err(|e| io::Error::other(format!("Failed to create secondary output: {e}")))?;
let group_key_config = config.group_key_config.unwrap_or_else(|| {
let library_index = LibraryIndex::from_header(&input_header);
let cell_tag = Tag::from(SamTag::CB);
GroupKeyConfig::new(library_index, cell_tag)
});
let grouper = grouper_fn(&input_header);
let fns = PipelineFunctions::new(process_fn, move |p: P, buf: &mut Vec<u8>| {
serialize_fn(p, &output_header, buf)
})
.with_secondary_serialize(secondary_serialize_fn);
let pipeline_config = config.pipeline;
run_bam_pipeline(
pipeline_config,
Box::new(input),
Box::new(output),
grouper,
fns,
group_key_config,
Some(secondary_writer),
)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::read_info::LibraryIndex;
fn create_test_state(memory_limit: u64) -> BamPipelineState<(), ()> {
let config = PipelineConfig::new(2, 6).with_queue_memory_limit(memory_limit);
let input: Box<dyn Read + Send> = Box::new(std::io::empty());
let output: Box<dyn Write + Send> = Box::new(std::io::sink());
let header = Header::default();
let library_index = LibraryIndex::from_header(&header);
let group_key_config = GroupKeyConfig::new(library_index, SamTag::CB.into());
BamPipelineState::new(config, input, output, group_key_config)
}
#[test]
fn test_can_decompress_proceed_no_limit() {
let state = create_test_state(0);
assert!(state.can_decompress_proceed(0));
assert!(state.can_decompress_proceed(100));
}
#[test]
fn test_can_decompress_proceed_under_limit() {
let state = create_test_state(1024 * 1024);
state.q2_reorder_state.heap_bytes.store(100_000, Ordering::SeqCst);
assert!(state.can_decompress_proceed(5));
}
#[test]
fn test_can_decompress_proceed_over_limit_but_needed_serial() {
let state = create_test_state(1024 * 1024);
state.q2_reorder_state.heap_bytes.store(600_000, Ordering::SeqCst);
state.q2_reorder_state.next_seq.store(5, Ordering::SeqCst);
assert!(state.can_decompress_proceed(5));
assert!(!state.can_decompress_proceed(6));
assert!(!state.can_decompress_proceed(10));
}
#[test]
fn test_can_decompress_proceed_over_limit() {
let state = create_test_state(1024 * 1024);
state.q2_reorder_state.heap_bytes.store(600_000, Ordering::SeqCst);
state.q2_reorder_state.next_seq.store(0, Ordering::SeqCst);
assert!(!state.can_decompress_proceed(5));
}
#[test]
fn test_can_decode_proceed_no_limit() {
let state = create_test_state(0);
assert!(state.can_decode_proceed(0));
assert!(state.can_decode_proceed(100));
}
#[test]
fn test_can_decode_proceed_under_limit() {
let state = create_test_state(1024 * 1024);
state.q3_reorder_state.heap_bytes.store(100_000, Ordering::SeqCst);
assert!(state.can_decode_proceed(5));
}
#[test]
fn test_can_decode_proceed_over_limit_but_needed_serial() {
let state = create_test_state(1024 * 1024);
state.q3_reorder_state.heap_bytes.store(600_000, Ordering::SeqCst);
state.q3_reorder_state.next_seq.store(5, Ordering::SeqCst);
assert!(state.can_decode_proceed(5));
assert!(!state.can_decode_proceed(6));
}
#[test]
fn test_is_memory_high_threshold() {
let state = create_test_state(1024 * 1024 * 1024);
state.q3_reorder_state.heap_bytes.store(500 * 1024 * 1024, Ordering::SeqCst);
assert!(!state.is_memory_high());
state.q3_reorder_state.heap_bytes.store(512 * 1024 * 1024, Ordering::SeqCst);
assert!(state.is_memory_high());
}
#[test]
fn test_is_memory_drained_threshold() {
let state = create_test_state(1024 * 1024 * 1024);
state.q3_reorder_state.heap_bytes.store(200 * 1024 * 1024, Ordering::SeqCst);
assert!(state.is_memory_drained());
state.q3_reorder_state.heap_bytes.store(256 * 1024 * 1024, Ordering::SeqCst);
assert!(!state.is_memory_drained());
}
#[test]
fn test_validation_passes_when_complete() {
let state = create_test_state(0);
state.read_done.store(true, Ordering::SeqCst);
state.group_done.store(true, Ordering::SeqCst);
let result = state.validate_completion();
assert!(result.is_ok(), "Validation should pass: {result:?}");
}
#[test]
fn test_validation_detects_non_empty_q1() {
let state = create_test_state(0);
state.read_done.store(true, Ordering::SeqCst);
state.group_done.store(true, Ordering::SeqCst);
let batch = RawBlockBatch { blocks: vec![] };
assert!(state.q1_raw_blocks.push((0, batch)).is_ok());
let result = state.validate_completion();
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.non_empty_queues.iter().any(|s| s.contains("q1_raw_blocks")));
}
#[test]
fn test_validation_detects_non_empty_q2() {
let state = create_test_state(0);
state.read_done.store(true, Ordering::SeqCst);
state.group_done.store(true, Ordering::SeqCst);
let batch = DecompressedBatch { data: vec![] };
assert!(state.q2_decompressed.push((0, batch)).is_ok());
let result = state.validate_completion();
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.non_empty_queues.iter().any(|s| s.contains("q2_decompressed")));
}
#[test]
fn test_validation_detects_counter_mismatch_decompressed() {
let state = create_test_state(0);
state.read_done.store(true, Ordering::SeqCst);
state.group_done.store(true, Ordering::SeqCst);
state.next_read_serial.store(5, Ordering::SeqCst);
state.batches_decompressed.store(3, Ordering::SeqCst);
let result = state.validate_completion();
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.counter_mismatches.iter().any(|s| s.contains("batches_decompressed")));
}
#[test]
fn test_validation_detects_counter_mismatch_boundary_processed() {
let state = create_test_state(0);
state.read_done.store(true, Ordering::SeqCst);
state.group_done.store(true, Ordering::SeqCst);
state.next_read_serial.store(5, Ordering::SeqCst);
state.batches_decompressed.store(5, Ordering::SeqCst);
state.batches_boundary_processed.store(3, Ordering::SeqCst);
let result = state.validate_completion();
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.counter_mismatches.iter().any(|s| s.contains("batches_boundary_processed")));
}
#[test]
fn test_validation_detects_counter_mismatch_grouped() {
let state = create_test_state(0);
state.read_done.store(true, Ordering::SeqCst);
state.group_done.store(true, Ordering::SeqCst);
state.next_read_serial.store(5, Ordering::SeqCst);
state.batches_decompressed.store(5, Ordering::SeqCst);
state.batches_boundary_processed.store(5, Ordering::SeqCst);
state.batches_boundary_found.store(5, Ordering::SeqCst);
state.batches_decoded.store(5, Ordering::SeqCst);
state.batches_grouped.store(3, Ordering::SeqCst);
let result = state.validate_completion();
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.counter_mismatches.iter().any(|s| s.contains("batches_grouped")));
}
#[test]
fn test_validation_error_display() {
let err = PipelineValidationError {
non_empty_queues: vec!["q1 (5)".to_string(), "q2 (3)".to_string()],
counter_mismatches: vec!["batches_x (5) != batches_y (3)".to_string()],
leaked_heap_bytes: 0,
};
let display = err.to_string();
assert!(display.contains("q1"));
assert!(display.contains("q2"));
assert!(display.contains("batches_x"));
}
#[test]
fn test_validation_detects_non_empty_reorder_buffer() {
let state = create_test_state(0);
state.read_done.store(true, Ordering::SeqCst);
state.group_done.store(true, Ordering::SeqCst);
{
let mut q2_reorder = state.q2_reorder.lock();
let batch = DecompressedBatch { data: vec![] };
q2_reorder.insert(0, batch);
}
let result = state.validate_completion();
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.non_empty_queues.iter().any(|s| s.contains("q2_reorder")));
}
#[test]
fn test_validation_detects_non_empty_q3_reorder() {
let state = create_test_state(0);
state.read_done.store(true, Ordering::SeqCst);
state.group_done.store(true, Ordering::SeqCst);
{
let mut q3_reorder = state.q3_reorder.lock();
let batch: Vec<DecodedRecord> = vec![];
q3_reorder.insert(0, batch);
}
let result = state.validate_completion();
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.non_empty_queues.iter().any(|s| s.contains("q3_reorder")));
}
#[test]
fn test_validation_detects_non_empty_output_queue() {
let state = create_test_state(0);
state.read_done.store(true, Ordering::SeqCst);
state.group_done.store(true, Ordering::SeqCst);
let batch: Vec<()> = vec![()];
assert!(state.output.groups.push((0, batch)).is_ok());
let result = state.validate_completion();
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.non_empty_queues.iter().any(|s| s.contains("q4_groups")));
}
#[test]
fn test_validation_detects_non_empty_write_reorder() {
let state = create_test_state(0);
state.read_done.store(true, Ordering::SeqCst);
state.group_done.store(true, Ordering::SeqCst);
{
let mut write_reorder = state.output.write_reorder.lock();
let batch =
CompressedBlockBatch { blocks: vec![], record_count: 0, secondary_data: None };
write_reorder.insert(0, batch);
}
let result = state.validate_completion();
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.non_empty_queues.iter().any(|s| s.contains("write_reorder")));
}
fn create_test_worker() -> WorkerState<()> {
WorkerState::new(6, 0, 2, SchedulerStrategy::default())
}
fn setup_memory_backpressure(reorder_state: &ReorderBufferState) {
reorder_state.heap_bytes.store(800, Ordering::SeqCst);
reorder_state.next_seq.store(0, Ordering::SeqCst);
}
#[test]
fn test_decompress_held_pushes_unconditionally_when_q2_has_room() {
let state = create_test_state(1024);
setup_memory_backpressure(&state.q2_reorder_state);
let raw = RawBlockBatch::new();
assert!(state.q1_raw_blocks.push((0, raw)).is_ok());
let mut worker = create_test_worker();
worker.held_decompressed = Some((50, DecompressedBatch { data: vec![0u8; 16] }, 16));
let result = try_step_decompress(&state, &mut worker);
assert!(result, "should succeed — held batch pushed, then new batch processed");
assert!(!state.has_error(), "should not error");
assert!(state.q1_raw_blocks.is_empty(), "Q1 should have been popped");
assert_eq!(state.q2_decompressed.len(), 2, "Q2 should have both batches");
assert!(worker.held_decompressed.is_none(), "held slot should be empty");
}
#[test]
fn test_decompress_held_blocked_by_full_q2() {
let state = create_test_state(1024);
let cap = state.q2_decompressed.capacity();
for i in 0..cap {
assert!(
state
.q2_decompressed
.push((i as u64, DecompressedBatch { data: vec![0u8; 8] }))
.is_ok(),
"failed to fill q2 at serial {i}"
);
}
assert!(state.q2_decompressed.is_full());
assert!(state.q1_raw_blocks.push((100, RawBlockBatch::new())).is_ok());
let mut worker = create_test_worker();
worker.held_decompressed = Some((50, DecompressedBatch { data: vec![0u8; 16] }, 16));
let result = try_step_decompress(&state, &mut worker);
assert!(!result, "should return false when Q2 is full");
assert!(!state.has_error(), "should NOT set error; got: {:?}", state.take_error());
assert!(worker.held_decompressed.is_some(), "held batch should be preserved");
assert!(!state.q1_raw_blocks.is_empty(), "Q1 should not have been popped");
}
#[test]
fn test_decode_held_pushes_unconditionally_when_q3_has_room() {
let state = create_test_state(1024);
setup_memory_backpressure(&state.q3_reorder_state);
let boundary = BoundaryBatch { buffer: Vec::new(), offsets: vec![0] };
assert!(state.q2b_boundaries.push((0, boundary)).is_ok());
let mut worker = create_test_worker();
worker.held_decoded = Some((50, vec![], 16));
let result = try_step_decode(&state, &mut worker);
assert!(result, "should succeed — held batch pushed, then new batch processed");
assert!(!state.has_error(), "should not error");
assert!(state.q2b_boundaries.is_empty(), "Q2b should have been popped");
assert_eq!(state.q3_decoded.len(), 2, "Q3 should have both batches");
assert!(worker.held_decoded.is_none(), "held slot should be empty");
}
#[test]
fn test_decode_held_blocked_by_full_q3() {
let state = create_test_state(1024);
let cap = state.q3_decoded.capacity();
for i in 0..cap {
assert!(
state.q3_decoded.push((i as u64, vec![])).is_ok(),
"failed to fill q3 at serial {i}"
);
}
assert!(state.q3_decoded.is_full());
let boundary = BoundaryBatch { buffer: Vec::new(), offsets: vec![0] };
assert!(state.q2b_boundaries.push((100, boundary)).is_ok());
let mut worker = create_test_worker();
worker.held_decoded = Some((50, vec![], 16));
let result = try_step_decode(&state, &mut worker);
assert!(!result, "should return false when Q3 is full");
assert!(!state.has_error(), "should NOT set error; got: {:?}", state.take_error());
assert!(worker.held_decoded.is_some(), "held batch should be preserved");
assert!(!state.q2b_boundaries.is_empty(), "Q2b should not have been popped");
}
#[test]
fn test_pipeline_functions_secondary_serialize() {
let fns = PipelineFunctions::<Vec<u8>, Vec<u8>>::new(Ok, |data, buf| {
buf.extend_from_slice(&data);
Ok(1)
});
assert!(fns.secondary_serialize_fn.is_none());
let fns = fns.with_secondary_serialize(|data: &Vec<u8>, buf: &mut Vec<u8>| {
buf.extend_from_slice(data);
Ok(1)
});
assert!(fns.secondary_serialize_fn.is_some());
let test_data = vec![1u8, 2, 3, 4];
let mut buf = Vec::new();
let count =
(fns.secondary_serialize_fn.as_ref().expect("secondary_serialize_fn should be set"))(
&test_data, &mut buf,
)
.expect("serialize should succeed");
assert_eq!(count, 1);
assert_eq!(buf, vec![1, 2, 3, 4]);
}
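// Minimal sanity checks for the serialization helpers and config builder
// defined above; both rely only on behavior visible in this module.
#[test]
fn test_serialize_bam_records_empty_input() {
let header = Header::default();
let batch = serialize_bam_records(&[], &header).expect("empty input should serialize");
assert_eq!(batch.record_count, 0);
assert!(batch.data.is_empty());
assert!(batch.secondary_data.is_none());
}
#[test]
fn test_bam_pipeline_config_builder_keeps_levels_in_sync() {
let config = BamPipelineConfig::new(2, 6).with_compression_level(3);
assert_eq!(config.compression_level, 3);
assert_eq!(config.pipeline.compression_level, 3);
assert!(config.group_key_config.is_none());
}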
}