#![deny(unsafe_code)]
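//! Phase-aware worker pool for the external BAM sort pipeline.
//!
//! Workers cooperatively self-schedule across pipeline steps ([`SortStep`])
//! using lock-free queues, backpressure-driven priorities, and jittered
//! exponential backoff when idle. Phase 1 reads and decompresses input BGZF
//! blocks and compresses spill chunks; phase 2 streams spilled chunk files
//! back in for merging and compresses the final output.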
use crossbeam_channel::{Receiver, Sender, bounded};
use crossbeam_queue::ArrayQueue;
use fgumi_bgzf::reader::read_raw_blocks;
use fgumi_bgzf::writer::InlineBgzfCompressor;
use fgumi_bgzf::{RawBgzfBlock, decompress_block};
use log::info;
use std::collections::VecDeque;
use std::fmt::Write as FmtWrite;
use std::io::{BufReader, Read, Write as IoWrite}; // `Write` assumed in scope for `InlineBgzfCompressor::write_all`/`flush`
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Instant;
use crate::reorder_buffer::ReorderBuffer;
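/// A unit of data to BGZF-compress, tagged with a serial number for
/// in-order reassembly downstream. The result is sent back on `result_tx`.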
pub struct CompressJob {
pub data: Vec<u8>,
pub serial: u64,
pub result_tx: Sender<CompressResult>,
}
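/// Output of a [`CompressJob`]: the concatenated compressed BGZF blocks plus
/// the cleared input buffer, returned so the caller can recycle it.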
pub struct CompressResult {
pub serial: u64,
pub compressed: Vec<u8>,
pub recycled_buf: Vec<u8>,
}
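/// Counters for work submitted to the pool, logged at shutdown.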
#[derive(Debug, Default)]
pub(crate) struct PoolStats {
pub(crate) compress_jobs_submitted: AtomicU64,
}
impl PoolStats {
pub fn log_summary(&self) {
let compress = self.compress_jobs_submitted.load(Ordering::Relaxed);
info!(" Pool stats: {compress} compress jobs");
}
}
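/// The distinct kinds of work a sort worker can perform. Discriminants index
/// into the fixed-size stats arrays in [`SortPipelineStats`].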
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SortStep {
ReadInputBlocks = 0,
DecompressInput = 1,
CompressSpill = 2,
Phase2FileWork = 3,
CompressOutput = 4,
}
impl SortStep {
pub const COUNT: usize = 5;
#[must_use]
pub fn label(self) -> &'static str {
match self {
Self::ReadInputBlocks => "RdInp",
Self::DecompressInput => "DecInp",
Self::CompressSpill => "CmpSpl",
Self::Phase2FileWork => "P2File",
Self::CompressOutput => "CmpOut",
}
}
}
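/// Upper bound on per-thread stats slots; threads beyond this are not tracked.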
const SORT_MAX_THREADS: usize = 32;
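/// Lock-free per-step and per-thread timing/count statistics for the sort
/// pipeline, recorded with relaxed atomics and summarized at shutdown.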
pub(crate) struct SortPipelineStats {
pub step_ns: [AtomicU64; SortStep::COUNT],
pub step_count: [AtomicU64; SortStep::COUNT],
pub per_thread_step_counts: Box<[[AtomicU64; SortStep::COUNT]; SORT_MAX_THREADS]>,
pub per_thread_idle_ns: Box<[AtomicU64; SORT_MAX_THREADS]>,
pub num_threads: usize,
}
impl SortPipelineStats {
#[must_use]
pub fn new(num_threads: usize) -> Self {
Self {
step_ns: std::array::from_fn(|_| AtomicU64::new(0)),
step_count: std::array::from_fn(|_| AtomicU64::new(0)),
per_thread_step_counts: new_sort_2d_array(),
per_thread_idle_ns: new_sort_1d_array(),
num_threads,
}
}
pub fn record_step(&self, thread_id: usize, step: SortStep, elapsed_ns: u64) {
let step_idx = step as usize;
self.step_ns[step_idx].fetch_add(elapsed_ns, Ordering::Relaxed);
self.step_count[step_idx].fetch_add(1, Ordering::Relaxed);
if thread_id < SORT_MAX_THREADS {
self.per_thread_step_counts[thread_id][step_idx].fetch_add(1, Ordering::Relaxed);
}
}
pub fn record_idle(&self, thread_id: usize, idle_ns: u64) {
if thread_id < SORT_MAX_THREADS {
self.per_thread_idle_ns[thread_id].fetch_add(idle_ns, Ordering::Relaxed);
}
}
#[allow(clippy::cast_precision_loss)]
pub fn log_summary(&self) {
let mut s = String::with_capacity(1024);
writeln!(s, "=== Sort Pipeline Stats ===").expect("write to String");
let all_steps = [
SortStep::ReadInputBlocks,
SortStep::DecompressInput,
SortStep::CompressSpill,
SortStep::Phase2FileWork,
SortStep::CompressOutput,
];
for &step in &all_steps {
let idx = step as usize;
let count = self.step_count[idx].load(Ordering::Relaxed);
if count > 0 {
let ns = self.step_ns[idx].load(Ordering::Relaxed);
let secs = ns as f64 / 1_000_000_000.0;
writeln!(s, " {:<22} {count:>8} jobs, {secs:>6.1}s total", format!("{step:?}"))
.expect("write");
}
}
let nt = self.num_threads.min(SORT_MAX_THREADS);
if nt > 0 {
writeln!(s).expect("write");
writeln!(s, " Per-Thread Work Distribution:").expect("write");
write!(s, " Thread").expect("write");
for &step in &all_steps {
write!(s, " {:>8}", step.label()).expect("write");
}
writeln!(s, " Idle ms").expect("write");
for tid in 0..nt {
write!(s, " T{tid:<5}").expect("write");
for step_idx in 0..SortStep::COUNT {
let count = self.per_thread_step_counts[tid][step_idx].load(Ordering::Relaxed);
write!(s, " {count:>8}").expect("write");
}
let idle_ns = self.per_thread_idle_ns[tid].load(Ordering::Relaxed);
writeln!(s, " {:>10.1}", idle_ns as f64 / 1_000_000.0).expect("write");
}
write!(s, " Total ").expect("write");
for step_idx in 0..SortStep::COUNT {
let mut total = 0u64;
for tid in 0..nt {
total += self.per_thread_step_counts[tid][step_idx].load(Ordering::Relaxed);
}
write!(s, " {total:>8}").expect("write");
}
let total_idle: u64 =
(0..nt).map(|tid| self.per_thread_idle_ns[tid].load(Ordering::Relaxed)).sum();
writeln!(s, " {:>10.1}", total_idle as f64 / 1_000_000.0).expect("write");
let total_work_ns: u64 =
(0..SortStep::COUNT).map(|i| self.step_ns[i].load(Ordering::Relaxed)).sum();
let work_ms = total_work_ns as f64 / 1_000_000.0;
let idle_ms = total_idle as f64 / 1_000_000.0;
let total_ms = work_ms + idle_ms;
if total_ms > 0.0 {
let utilization = (work_ms / total_ms) * 100.0;
writeln!(s).expect("write");
writeln!(s, " Thread Utilization: {utilization:.1}% (work={work_ms:.1}ms idle={idle_ms:.1}ms)")
.expect("write");
}
}
for line in s.trim_end().lines() {
info!("{line}");
}
}
}
#[allow(clippy::unnecessary_box_returns)]
fn new_sort_2d_array() -> Box<[[AtomicU64; SortStep::COUNT]; SORT_MAX_THREADS]> {
let v: Vec<[AtomicU64; SortStep::COUNT]> =
(0..SORT_MAX_THREADS).map(|_| std::array::from_fn(|_| AtomicU64::new(0))).collect();
v.into_boxed_slice().try_into().expect("Vec length matches SORT_MAX_THREADS")
}
#[allow(clippy::unnecessary_box_returns)]
fn new_sort_1d_array() -> Box<[AtomicU64; SORT_MAX_THREADS]> {
let v: Vec<AtomicU64> = (0..SORT_MAX_THREADS).map(|_| AtomicU64::new(0)).collect();
v.into_boxed_slice().try_into().expect("Vec length matches SORT_MAX_THREADS")
}
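/// A bounded recycling pool of byte buffers backed by a crossbeam channel.
///
/// `checkout` returns a pooled buffer if one is available, otherwise a fresh
/// empty `Vec`. `checkin` clears the buffer and returns it to the pool,
/// silently dropping it if the pool is full. Illustrative use:
///
/// ```ignore
/// let pool = BufferPool::new(8);
/// let mut buf = pool.checkout(); // empty, possibly with retained capacity
/// buf.extend_from_slice(b"payload");
/// pool.checkin(buf); // cleared and recycled, or dropped if the pool is full
/// ```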
pub struct BufferPool {
tx: Sender<Vec<u8>>,
rx: Receiver<Vec<u8>>,
}
impl BufferPool {
#[must_use]
pub fn new(capacity: usize) -> Self {
let (tx, rx) = bounded(capacity);
Self { tx, rx }
}
#[must_use]
pub fn checkout(&self) -> Vec<u8> {
self.rx.try_recv().unwrap_or_default()
}
#[must_use]
pub fn len(&self) -> usize {
self.rx.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.rx.is_empty()
}
pub fn checkin(&self, mut buf: Vec<u8>) {
buf.clear();
let _ = self.tx.try_send(buf);
}
}
impl Clone for BufferPool {
fn clone(&self) -> Self {
Self { tx: self.tx.clone(), rx: self.rx.clone() }
}
}
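/// A counting-semaphore-like permit pool built on a bounded channel.
///
/// `new` pre-fills the channel with `capacity` permits. `acquire` blocks
/// until a permit is available and errors once the pool is closed; `release`
/// returns a permit (ignored if the pool is closed or already full).
/// `close` drops the sender so blocked `acquire` calls wake with an error.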
pub(crate) struct PermitPool {
tx: std::sync::Mutex<Option<Sender<()>>>,
rx: Receiver<()>,
}
impl PermitPool {
pub(crate) fn new(capacity: usize) -> Self {
let (tx, rx) = bounded(capacity);
for _ in 0..capacity {
tx.try_send(()).expect("fresh channel has capacity for initial permits");
}
Self { tx: std::sync::Mutex::new(Some(tx)), rx }
}
pub(crate) fn acquire(&self) -> anyhow::Result<()> {
self.rx.recv().map_err(|_| anyhow::anyhow!("permit pool closed: I/O writer thread exited"))
}
pub(crate) fn release(&self) {
if let Ok(guard) = self.tx.lock() {
if let Some(tx) = guard.as_ref() {
let _ = tx.try_send(());
}
}
}
pub(crate) fn close(&self) {
if let Ok(mut guard) = self.tx.lock() {
guard.take();
}
}
}
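// Tuning constants: how many raw BGZF blocks to read per I/O call, and the
// per-file caps on queued raw and decompressed blocks during phase 2.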
const INPUT_READ_BATCH_SIZE: usize = 16;
pub(crate) const PHASE2_RAW_CAP: usize = 8;
pub(crate) const PHASE2_DECOMP_CAP: usize = 8;
pub(crate) const PHASE2_READ_BATCH: usize = 4;
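/// Mutex-guarded reader state for one phase-2 chunk file: the buffered file
/// handle, the next block serial to assign, and an EOF flag.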
pub(crate) struct Phase2Reader {
pub(crate) inner: BufReader<std::fs::File>,
pub(crate) next_serial: u64,
pub(crate) eof: bool,
}
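/// Per-chunk-file pipeline state for phase 2: a serialized reader, a queue of
/// raw BGZF blocks awaiting decompression, a reorder buffer of decompressed
/// blocks, and an in-flight decompression counter consulted by `is_drained`.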
pub(crate) struct Phase2FileState {
pub(crate) reader: Mutex<Phase2Reader>,
pub(crate) reader_eof: AtomicBool,
pub(crate) raw_blocks: Mutex<VecDeque<(u64, RawBgzfBlock)>>,
pub(crate) decompressed: Mutex<ReorderBuffer<Vec<u8>>>,
pub(crate) decomp_in_flight: AtomicUsize,
}
impl Phase2FileState {
pub(crate) fn new(reader: BufReader<std::fs::File>) -> Self {
Self {
reader: Mutex::new(Phase2Reader { inner: reader, next_serial: 0, eof: false }),
reader_eof: AtomicBool::new(false),
raw_blocks: Mutex::new(VecDeque::with_capacity(PHASE2_RAW_CAP)),
decompressed: Mutex::new(ReorderBuffer::new()),
decomp_in_flight: AtomicUsize::new(0),
}
}
pub(crate) fn mark_reader_eof(&self, reader_guard: &mut Phase2Reader) {
reader_guard.eof = true;
self.reader_eof.store(true, Ordering::Release);
}
pub(crate) fn probe_stats(&self) -> (u64, u64, bool) {
let raw_len =
self.raw_blocks.lock().expect("phase2 raw_blocks mutex poisoned").len() as u64;
let decomp_guard = self.decompressed.lock().expect("phase2 decompressed mutex poisoned");
let decomp_len = decomp_guard.len() as u64;
let decomp_bytes: u64 = decomp_guard.iter().map(|buf| buf.len() as u64).sum();
drop(decomp_guard);
let active = !self.reader_eof.load(Ordering::Relaxed);
(raw_len + decomp_len, decomp_bytes, active)
}
pub(crate) fn is_drained(&self) -> bool {
if !self.reader_eof.load(Ordering::Acquire) {
return false;
}
let raw_empty =
self.raw_blocks.lock().expect("phase2 raw_blocks mutex poisoned").is_empty();
if !raw_empty {
return false;
}
if self.decomp_in_flight.load(Ordering::Acquire) > 0 {
return false;
}
self.decompressed.lock().expect("phase2 decompressed mutex poisoned").is_empty()
}
}
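/// Pipeline phases, stored as a `u8` in the shared state: `SHUTDOWN` stops
/// workers, `PHASE1` ingests and spills input, `PHASE2` merges spilled chunk
/// files, and `LEGACY` (the initial value) runs compression jobs only.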
pub mod phase {
pub const SHUTDOWN: u8 = 0;
pub const PHASE1: u8 = 1;
pub const PHASE2: u8 = 2;
pub const LEGACY: u8 = 255;
}
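/// Thread pool driving the sort pipeline across both phases.
///
/// A minimal sketch of the compress-job path, mirroring the round-trip test
/// below (the full driver also calls `set_input_file`/`set_phase2_files` and
/// advances phases with `set_phase`):
///
/// ```ignore
/// let pool = SortWorkerPool::new(4, 1, 6);
/// let (tx, rx) = pool.compress_result_channel();
/// pool.submit_compress(CompressJob { data: vec![b'A'; 1000], serial: 0, result_tx: tx });
/// let result = rx.recv().expect("compressed result");
/// pool.shutdown();
/// ```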
pub struct SortWorkerPool {
shared: Arc<SharedPipelineState>,
workers: Option<Vec<JoinHandle<()>>>,
pub(crate) stats: PoolStats,
pub(crate) pipeline_stats: Arc<SortPipelineStats>,
pub buffer_pool: BufferPool,
num_workers: usize,
}
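/// State shared between all workers and the main thread: the current phase,
/// error flags, input and intermediate queues, phase-2 file states, and the
/// compress-job queue. The main thread is unparked whenever progress it may
/// be parked on becomes possible.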
pub(crate) struct SharedPipelineState {
pub(crate) phase: AtomicU8,
pub(crate) input_file: std::sync::Mutex<Option<Box<dyn Read + Send>>>,
pub(crate) input_eof: AtomicBool,
pub(crate) input_read_error: Arc<AtomicBool>,
pub(crate) decompression_error: Arc<AtomicBool>,
pub(crate) chunk_read_error: Arc<AtomicBool>,
pub(crate) worker_panicked: Arc<AtomicBool>,
input_read_serial: AtomicU64,
pub(crate) raw_input_blocks: Arc<ArrayQueue<(u64, RawBgzfBlock)>>,
pub(crate) decompressed_input: Arc<ArrayQueue<(u64, Vec<u8>)>>,
pub(crate) decompressed_input_done: Arc<AtomicBool>,
input_blocks_queued: AtomicU64,
pub(crate) phase2_files: std::sync::RwLock<Arc<Vec<Phase2FileState>>>,
pub(crate) all_chunks_eof: Arc<AtomicBool>,
sources_at_eof: AtomicU64,
pub(crate) total_sources: AtomicU64,
pub(crate) compress_queue: Arc<ArrayQueue<CompressJob>>,
num_workers: usize,
main_thread_handle: std::thread::Thread,
}
impl SharedPipelineState {
fn new(num_workers: usize, main_thread_handle: std::thread::Thread) -> Self {
let data_queue_cap = num_workers * 8;
let compress_queue_cap = num_workers * 4;
Self {
phase: AtomicU8::new(phase::LEGACY),
input_file: std::sync::Mutex::new(None),
input_eof: AtomicBool::new(false),
input_read_error: Arc::new(AtomicBool::new(false)),
decompression_error: Arc::new(AtomicBool::new(false)),
chunk_read_error: Arc::new(AtomicBool::new(false)),
worker_panicked: Arc::new(AtomicBool::new(false)),
input_read_serial: AtomicU64::new(0),
raw_input_blocks: Arc::new(ArrayQueue::new(data_queue_cap)),
decompressed_input: Arc::new(ArrayQueue::new(data_queue_cap)),
decompressed_input_done: Arc::new(AtomicBool::new(false)),
input_blocks_queued: AtomicU64::new(0),
phase2_files: std::sync::RwLock::new(Arc::new(Vec::new())),
all_chunks_eof: Arc::new(AtomicBool::new(false)),
sources_at_eof: AtomicU64::new(0),
total_sources: AtomicU64::new(0),
compress_queue: Arc::new(ArrayQueue::new(compress_queue_cap)),
num_workers,
main_thread_handle,
}
}
pub(crate) fn phase2_files_snapshot(&self) -> Arc<Vec<Phase2FileState>> {
Arc::clone(&self.phase2_files.read().expect("phase2_files rwlock poisoned"))
}
fn get_backpressure(&self) -> SortBackpressureState {
let current_phase = self.phase.load(Ordering::Acquire);
let low_water = self.num_workers;
SortBackpressureState {
decompressed_input_low: self.decompressed_input.len() < low_water,
input_eof: self.input_eof.load(Ordering::Acquire),
decompressed_input_done: self.decompressed_input_done.load(Ordering::Acquire),
compress_has_items: !self.compress_queue.is_empty(),
phase: current_phase,
}
}
}
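/// Per-worker scratch state: compressors for spill and final output, a
/// decompressor, a round-robin cursor over phase-2 files, items held back
/// when a shared queue was full, and idle-backoff bookkeeping.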
struct SortWorkerState {
worker_id: usize,
compressor: InlineBgzfCompressor,
output_compressor: InlineBgzfCompressor,
decompressor: libdeflater::Decompressor,
phase2_file_cursor: usize,
held_raw_input_blocks: Vec<(u64, RawBgzfBlock)>,
held_decompressed_input: Option<(u64, Vec<u8>)>,
backoff_us: u64,
idle_iter: u64,
}
impl SortWorkerState {
fn has_any_held_items(&self) -> bool {
!self.held_raw_input_blocks.is_empty() || self.held_decompressed_input.is_some()
}
}
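/// Snapshot of queue and EOF signals used to pick step priorities.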
#[allow(clippy::struct_excessive_bools)]
struct SortBackpressureState {
decompressed_input_low: bool,
input_eof: bool,
decompressed_input_done: bool,
compress_has_items: bool,
phase: u8,
}
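/// Returns the step-priority order for the current phase and backpressure
/// snapshot. Phase 1 favors feeding the main thread with decompressed input
/// unless compression is backed up; phase 2 favors draining the compress
/// queue when it has items. An empty slice means phase 1 is fully done.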
fn get_sort_priorities(bp: &SortBackpressureState) -> &'static [SortStep] {
match bp.phase {
phase::PHASE1 => {
if bp.input_eof && !bp.compress_has_items && bp.decompressed_input_done {
&[]
} else if bp.compress_has_items && !bp.decompressed_input_low {
&[SortStep::CompressSpill, SortStep::DecompressInput, SortStep::ReadInputBlocks]
} else {
&[SortStep::DecompressInput, SortStep::ReadInputBlocks, SortStep::CompressSpill]
}
}
phase::PHASE2 => {
if bp.compress_has_items {
&[SortStep::CompressOutput, SortStep::Phase2FileWork]
} else {
&[SortStep::Phase2FileWork, SortStep::CompressOutput]
}
}
_ => &[SortStep::CompressSpill],
}
}
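// Idle backoff bounds: start by yielding, then sleep with exponential
// backoff capped at 1ms.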
const MIN_BACKOFF_US: u64 = 10;
const MAX_BACKOFF_US: u64 = 1000;
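/// Sleeps for roughly `backoff_us` microseconds plus a deterministic,
/// non-negative per-worker jitter (up to a quarter of the backoff, derived
/// from `worker_id` and the idle iteration) to avoid lock-step wakeups;
/// yields instead of sleeping at the minimum backoff.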
fn sleep_with_jitter(backoff_us: u64, worker_id: usize, iter: u64) {
if backoff_us <= MIN_BACKOFF_US {
std::thread::yield_now();
} else {
let jitter_range = backoff_us / 4;
let jitter_seed = (worker_id as u64).wrapping_mul(0x9e37_79b9_7f4a_7c15).wrapping_add(iter);
let jitter_offset = (jitter_seed % (jitter_range * 2)).saturating_sub(jitter_range);
let actual_us = backoff_us.saturating_add(jitter_offset).max(MIN_BACKOFF_US);
std::thread::sleep(std::time::Duration::from_micros(actual_us));
}
}
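/// Attempts to push a held item back onto `queue`. Returns `true` when
/// nothing remains held (either the push succeeded or there was no item).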
fn try_advance_held<T>(queue: &ArrayQueue<T>, held: &mut Option<T>) -> bool {
if let Some(item) = held.take() {
match queue.push(item) {
Ok(()) => true,
Err(item) => {
*held = Some(item);
false
}
}
} else {
true
}
}
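/// Attempts to push held items onto `queue` in order, stopping at the first
/// full push and retaining the rest. Returns `true` when nothing remains.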
fn try_advance_held_batch<T>(queue: &ArrayQueue<T>, held: &mut Vec<T>) -> bool {
if held.is_empty() {
return true;
}
let batch = std::mem::take(held);
let mut iter = batch.into_iter();
for item in iter.by_ref() {
match queue.push(item) {
Ok(()) => {}
Err(item) => {
held.push(item);
break;
}
}
}
held.extend(iter);
held.is_empty()
}
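/// Outcome of attempting a pipeline step: useful work was done, the
/// downstream queue was full, or there was no input to process.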
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StepResult {
Success,
OutputFull,
InputEmpty,
}
impl SortWorkerPool {
#[must_use]
pub fn new(num_workers: usize, temp_compression: u32, output_compression: u32) -> Self {
let buffer_pool = BufferPool::new(num_workers * 4);
let stats = PoolStats::default();
let pipeline_stats = Arc::new(SortPipelineStats::new(num_workers));
let main_thread_handle = std::thread::current();
let shared = Arc::new(SharedPipelineState::new(num_workers, main_thread_handle));
let workers: Vec<JoinHandle<()>> = (0..num_workers)
.map(|worker_id| {
let pstats = Arc::clone(&pipeline_stats);
let shared = Arc::clone(&shared);
thread::spawn(move || {
let mut worker = SortWorkerState {
worker_id,
compressor: InlineBgzfCompressor::new(temp_compression),
output_compressor: InlineBgzfCompressor::new(output_compression),
decompressor: libdeflater::Decompressor::new(),
phase2_file_cursor: worker_id,
held_raw_input_blocks: Vec::new(),
held_decompressed_input: None,
backoff_us: MIN_BACKOFF_US,
idle_iter: 0,
};
Self::worker_loop(&shared, &mut worker, &pstats);
})
})
.collect();
Self { shared, workers: Some(workers), stats, pipeline_stats, buffer_pool, num_workers }
}
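/// Main loop for one worker: first flush any held items, then run this
/// worker's exclusive step (if any), then walk the phase-appropriate
/// priority list until a step succeeds. Success resets the backoff; an idle
/// pass sleeps with jittered exponential backoff and records the idle time.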
fn worker_loop(
shared: &SharedPipelineState,
worker: &mut SortWorkerState,
pstats: &SortPipelineStats,
) {
loop {
let current_phase = shared.phase.load(Ordering::Acquire);
if current_phase == phase::SHUTDOWN {
break;
}
if Self::is_phase_complete(shared, current_phase) && !worker.has_any_held_items() {
sleep_with_jitter(worker.backoff_us, worker.worker_id, worker.idle_iter);
worker.idle_iter = worker.idle_iter.wrapping_add(1);
worker.backoff_us = (worker.backoff_us * 2).min(MAX_BACKOFF_US);
continue;
}
let mut did_work = false;
did_work |= Self::try_advance_all_held(shared, worker);
let owned_step = Self::exclusive_step_for(worker.worker_id, shared, current_phase);
if !did_work {
if let Some(step) = owned_step {
if Self::is_step_eligible(step, shared, worker, current_phase) {
let t0 = Instant::now();
let result = Self::execute_step(shared, worker, step);
if result == StepResult::Success {
pstats.record_step(
worker.worker_id,
step,
Self::nanos_u64(t0.elapsed()),
);
did_work = true;
}
}
}
}
if !did_work {
let bp = shared.get_backpressure();
let priorities = get_sort_priorities(&bp);
for &step in priorities {
if !Self::is_step_eligible(step, shared, worker, current_phase) {
continue;
}
if Self::is_exclusive_step(step)
&& !Self::can_attempt_exclusive(owned_step, step, shared)
{
continue;
}
if owned_step == Some(step) {
continue;
}
let t0 = Instant::now();
match Self::execute_step(shared, worker, step) {
StepResult::Success => {
pstats.record_step(
worker.worker_id,
step,
Self::nanos_u64(t0.elapsed()),
);
did_work = true;
break;
}
StepResult::OutputFull => break,
StepResult::InputEmpty => {}
}
}
}
if did_work {
worker.backoff_us = MIN_BACKOFF_US;
} else {
let idle_start = Instant::now();
sleep_with_jitter(worker.backoff_us, worker.worker_id, worker.idle_iter);
worker.idle_iter = worker.idle_iter.wrapping_add(1);
worker.backoff_us = (worker.backoff_us * 2).min(MAX_BACKOFF_US);
pstats.record_idle(worker.worker_id, Self::nanos_u64(idle_start.elapsed()));
}
}
}
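/// Whether the current phase has no more work: phase 1 requires all input
/// decompressed and delivered plus an empty compress queue; phase 2
/// additionally requires every chunk file fully drained.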
fn is_phase_complete(shared: &SharedPipelineState, current_phase: u8) -> bool {
match current_phase {
phase::PHASE1 => {
shared.decompressed_input_done.load(Ordering::Acquire)
&& shared.compress_queue.is_empty()
}
phase::PHASE2 => {
if !shared.all_chunks_eof.load(Ordering::Acquire) {
return false;
}
if !shared.compress_queue.is_empty() {
return false;
}
let files = shared.phase2_files_snapshot();
files.iter().all(Phase2FileState::is_drained)
}
_ => false,
}
}
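/// With two or more workers, worker 0 exclusively owns input-block reading
/// in phase 1 so file I/O stays sequential; otherwise no step is exclusive.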
fn exclusive_step_for(
worker_id: usize,
shared: &SharedPipelineState,
current_phase: u8,
) -> Option<SortStep> {
if shared.num_workers < 2 {
return None;
}
if worker_id != 0 {
return None;
}
match current_phase {
phase::PHASE1 => Some(SortStep::ReadInputBlocks),
_ => None,
}
}
fn is_exclusive_step(step: SortStep) -> bool {
matches!(step, SortStep::ReadInputBlocks)
}
fn can_attempt_exclusive(
owned_step: Option<SortStep>,
step: SortStep,
shared: &SharedPipelineState,
) -> bool {
if owned_step == Some(step) {
return true;
}
shared.num_workers < 2
}
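/// Cheap pre-checks for whether a step could make progress right now, based
/// on the phase, the worker's held items, and shared queue/EOF state.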
fn is_step_eligible(
step: SortStep,
shared: &SharedPipelineState,
worker: &SortWorkerState,
current_phase: u8,
) -> bool {
match step {
SortStep::ReadInputBlocks => {
current_phase == phase::PHASE1
&& !shared.input_eof.load(Ordering::Acquire)
&& worker.held_raw_input_blocks.is_empty()
}
SortStep::DecompressInput => {
current_phase == phase::PHASE1
&& worker.held_decompressed_input.is_none()
&& (!shared.raw_input_blocks.is_empty()
|| (shared.input_eof.load(Ordering::Acquire)
&& !shared.decompressed_input_done.load(Ordering::Acquire)))
}
SortStep::CompressSpill | SortStep::CompressOutput => !shared.compress_queue.is_empty(),
SortStep::Phase2FileWork => current_phase == phase::PHASE2,
}
}
fn execute_step(
shared: &SharedPipelineState,
worker: &mut SortWorkerState,
step: SortStep,
) -> StepResult {
match step {
SortStep::ReadInputBlocks => Self::try_read_input_blocks(shared, worker),
SortStep::DecompressInput => Self::try_decompress_input(shared, worker),
SortStep::CompressSpill => Self::try_compress(shared, &mut worker.compressor),
SortStep::CompressOutput => Self::try_compress(shared, &mut worker.output_compressor),
SortStep::Phase2FileWork => Self::try_phase2_file_work(shared, worker),
}
}
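/// Flushes the worker's held raw and decompressed blocks back onto the
/// shared queues, updating the queued-block count and possibly marking the
/// decompressed-input stream done; unparks the main thread on progress.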
fn try_advance_all_held(shared: &SharedPipelineState, worker: &mut SortWorkerState) -> bool {
let mut advanced = false;
if !worker.held_raw_input_blocks.is_empty() {
let before = worker.held_raw_input_blocks.len();
try_advance_held_batch(&shared.raw_input_blocks, &mut worker.held_raw_input_blocks);
if worker.held_raw_input_blocks.len() < before {
advanced = true;
}
}
if worker.held_decompressed_input.is_some() {
let pushed =
try_advance_held(&shared.decompressed_input, &mut worker.held_decompressed_input);
if pushed {
shared.main_thread_handle.unpark();
advanced = true;
let queued = shared.input_blocks_queued.fetch_add(1, Ordering::AcqRel) + 1;
let total = shared.input_read_serial.load(Ordering::Acquire);
if shared.input_eof.load(Ordering::Acquire)
&& shared.raw_input_blocks.is_empty()
&& queued >= total
&& !shared.decompressed_input_done.load(Ordering::Acquire)
{
shared.decompressed_input_done.store(true, Ordering::Release);
shared.main_thread_handle.unpark();
}
}
}
advanced
}
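/// Reads a batch of raw BGZF blocks from the input (if the file lock is
/// free), assigns serials, and pushes them onto the shared queue, holding
/// any overflow locally. Sets the EOF or error flags as appropriate.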
fn try_read_input_blocks(
shared: &SharedPipelineState,
worker: &mut SortWorkerState,
) -> StepResult {
if shared.input_eof.load(Ordering::Acquire) {
return StepResult::InputEmpty;
}
let Ok(mut guard) = shared.input_file.try_lock() else {
return StepResult::InputEmpty;
};
let Some(reader) = guard.as_mut() else {
return StepResult::InputEmpty;
};
let blocks = match read_raw_blocks(reader.as_mut(), INPUT_READ_BATCH_SIZE) {
Ok(b) => b,
Err(e) => {
log::error!("I/O error reading input BAM: {e}");
shared.input_read_error.store(true, Ordering::Release);
shared.input_eof.store(true, Ordering::Release);
shared.main_thread_handle.unpark();
return StepResult::InputEmpty;
}
};
if blocks.is_empty() {
shared.input_eof.store(true, Ordering::Release);
return StepResult::InputEmpty;
}
drop(guard);
let mut blocks_iter = blocks.into_iter();
for block in blocks_iter.by_ref() {
let serial = shared.input_read_serial.fetch_add(1, Ordering::Relaxed);
match shared.raw_input_blocks.push((serial, block)) {
Ok(()) => {}
Err((serial, block)) => {
worker.held_raw_input_blocks.push((serial, block));
break;
}
}
}
for block in blocks_iter {
let serial = shared.input_read_serial.fetch_add(1, Ordering::Relaxed);
worker.held_raw_input_blocks.push((serial, block));
}
StepResult::Success
}
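/// Pops one raw input block, decompresses it, and pushes the result onto
/// the shared queue (or holds it if the queue is full). Also detects the
/// all-blocks-queued condition after input EOF and sets
/// `decompressed_input_done`.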
fn try_decompress_input(
shared: &SharedPipelineState,
worker: &mut SortWorkerState,
) -> StepResult {
if worker.held_decompressed_input.is_some() {
return StepResult::OutputFull;
}
let Some((serial, block)) = shared.raw_input_blocks.pop() else {
if shared.input_eof.load(Ordering::Acquire)
&& !shared.decompressed_input_done.load(Ordering::Acquire)
{
let queued = shared.input_blocks_queued.load(Ordering::Acquire);
let total = shared.input_read_serial.load(Ordering::Acquire);
if queued >= total {
shared.decompressed_input_done.store(true, Ordering::Release);
shared.main_thread_handle.unpark();
}
}
return StepResult::InputEmpty;
};
let data = match decompress_block(&block, &mut worker.decompressor) {
Ok(d) => d,
Err(e) => {
log::error!("BGZF decompression error (input block serial {serial}): {e}");
shared.decompression_error.store(true, Ordering::Release);
shared.main_thread_handle.unpark();
return StepResult::InputEmpty;
}
};
let input_eof = shared.input_eof.load(Ordering::Acquire);
let raw_empty = shared.raw_input_blocks.is_empty();
let pushed = match shared.decompressed_input.push((serial, data)) {
Ok(()) => {
shared.main_thread_handle.unpark();
true
}
Err(item) => {
worker.held_decompressed_input = Some(item);
shared.main_thread_handle.unpark();
false
}
};
if pushed {
let queued = shared.input_blocks_queued.fetch_add(1, Ordering::AcqRel) + 1;
let total = shared.input_read_serial.load(Ordering::Acquire);
if input_eof && raw_empty && queued >= total {
shared.decompressed_input_done.store(true, Ordering::Release);
shared.main_thread_handle.unpark();
}
}
StepResult::Success
}
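/// Advances one phase-2 chunk file, scanning round-robin from this worker's
/// cursor: prefer decompressing an admitted raw block into that file's
/// reorder buffer; otherwise refill the file's raw-block queue from disk,
/// marking EOF (and possibly all-sources EOF) when the file is exhausted.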
fn try_phase2_file_work(
shared: &SharedPipelineState,
worker: &mut SortWorkerState,
) -> StepResult {
let files = shared.phase2_files_snapshot();
let n = files.len();
if n == 0 {
return StepResult::InputEmpty;
}
for offset in 0..n {
let i = (worker.phase2_file_cursor + offset) % n;
let file = &files[i];
let popped = Self::try_pop_raw_for_decompress(file);
if let Some((serial, raw_block)) = popped {
let data = match decompress_block(&raw_block, &mut worker.decompressor) {
Ok(d) => d,
Err(e) => {
log::error!(
"BGZF decompression error (chunk source {i} serial {serial}): {e}"
);
shared.decompression_error.store(true, Ordering::Release);
file.decomp_in_flight.fetch_sub(1, Ordering::AcqRel);
shared.main_thread_handle.unpark();
worker.phase2_file_cursor = (i + 1) % n;
return StepResult::Success;
}
};
let now_poppable = {
let mut dec_guard =
file.decompressed.lock().expect("phase2 decompressed mutex poisoned");
dec_guard.insert(serial, data);
dec_guard.can_pop()
};
file.decomp_in_flight.fetch_sub(1, Ordering::AcqRel);
if now_poppable || file.is_drained() {
shared.main_thread_handle.unpark();
}
worker.phase2_file_cursor = (i + 1) % n;
return StepResult::Success;
}
let Ok(mut reader_guard) = file.reader.try_lock() else {
continue;
};
if reader_guard.eof {
continue;
}
let raw_full = match file.raw_blocks.try_lock() {
Ok(g) => g.len() >= PHASE2_RAW_CAP,
Err(_) => true,
};
if raw_full {
continue;
}
let blocks = match read_raw_blocks(&mut reader_guard.inner, PHASE2_READ_BATCH) {
Ok(b) => b,
Err(e) => {
log::error!("I/O error reading chunk file (source {i}): {e}");
shared.chunk_read_error.store(true, Ordering::Release);
file.mark_reader_eof(&mut reader_guard);
drop(reader_guard);
shared.main_thread_handle.unpark();
Self::maybe_mark_all_eof(shared);
worker.phase2_file_cursor = (i + 1) % n;
return StepResult::Success;
}
};
if blocks.is_empty() {
file.mark_reader_eof(&mut reader_guard);
drop(reader_guard);
shared.main_thread_handle.unpark();
Self::maybe_mark_all_eof(shared);
worker.phase2_file_cursor = (i + 1) % n;
return StepResult::Success;
}
let mut raw_guard = file.raw_blocks.lock().expect("phase2 raw_blocks mutex poisoned");
let start_serial = reader_guard.next_serial;
reader_guard.next_serial += blocks.len() as u64;
for (idx, b) in blocks.into_iter().enumerate() {
raw_guard.push_back((start_serial + idx as u64, b));
}
drop(raw_guard);
drop(reader_guard);
worker.phase2_file_cursor = (i + 1) % n;
return StepResult::Success;
}
StepResult::InputEmpty
}
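/// Admission control for phase-2 decompression: pops the head raw block if
/// the reorder buffer is under its cap, or, when the buffer is full but
/// stuck, admits only the gap-filling block at `next_seq`. Increments the
/// in-flight counter on success.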
fn try_pop_raw_for_decompress(file: &Phase2FileState) -> Option<(u64, RawBgzfBlock)> {
let mut raw_guard = file.raw_blocks.try_lock().ok()?;
let head_serial = raw_guard.front().map(|(s, _)| *s)?;
let admit = {
let dec_guard = file.decompressed.try_lock().ok()?;
dec_guard.len() < PHASE2_DECOMP_CAP
|| (!dec_guard.can_pop() && head_serial == dec_guard.next_seq())
};
if !admit {
return None;
}
let popped = raw_guard.pop_front();
if popped.is_some() {
file.decomp_in_flight.fetch_add(1, Ordering::AcqRel);
}
popped
}
fn try_compress(
shared: &SharedPipelineState,
compressor: &mut InlineBgzfCompressor,
) -> StepResult {
let Some(job) = shared.compress_queue.pop() else {
return StepResult::InputEmpty;
};
Self::handle_compress_job(shared, job, compressor);
StepResult::Success
}
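/// Records one more source at EOF and, once all sources are done, sets
/// `all_chunks_eof` and unparks the main thread.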
fn maybe_mark_all_eof(shared: &SharedPipelineState) {
let eof_count = shared.sources_at_eof.fetch_add(1, Ordering::AcqRel) + 1;
let total = shared.total_sources.load(Ordering::Acquire);
if total > 0 && eof_count >= total {
shared.all_chunks_eof.store(true, Ordering::Release);
shared.main_thread_handle.unpark();
}
}
pub fn num_workers(&self) -> usize {
self.num_workers
}
pub(crate) fn phase1_queue_depths(&self) -> (usize, usize, usize) {
(
self.shared.raw_input_blocks.len(),
self.shared.decompressed_input.len(),
self.buffer_pool.len(),
)
}
pub(crate) fn decompressed_input_queue(
&self,
) -> Arc<crossbeam_queue::ArrayQueue<(u64, Vec<u8>)>> {
Arc::clone(&self.shared.decompressed_input)
}
pub(crate) fn decompressed_input_done_flag(&self) -> Arc<AtomicBool> {
Arc::clone(&self.shared.decompressed_input_done)
}
pub(crate) fn input_read_error_flag(&self) -> Arc<AtomicBool> {
Arc::clone(&self.shared.input_read_error)
}
pub(crate) fn chunk_read_error_flag(&self) -> Arc<AtomicBool> {
Arc::clone(&self.shared.chunk_read_error)
}
pub(crate) fn worker_panicked_flag(&self) -> Arc<AtomicBool> {
Arc::clone(&self.shared.worker_panicked)
}
pub(crate) fn decompress_error_flag(&self) -> Arc<AtomicBool> {
Arc::clone(&self.shared.decompression_error)
}
pub(crate) fn phase2_files(&self) -> Arc<Vec<Phase2FileState>> {
self.shared.phase2_files_snapshot()
}
pub fn set_phase(&self, new_phase: u8) {
self.shared.phase.store(new_phase, Ordering::Release);
}
pub fn set_input_file(&self, reader: Box<dyn Read + Send>) {
*self.shared.input_file.lock().expect("input_file mutex should not be poisoned") =
Some(reader);
}
pub fn set_phase2_files(&self, files: &[std::path::PathBuf]) -> anyhow::Result<()> {
let total_sources = files.len();
self.shared.total_sources.store(total_sources as u64, Ordering::Release);
self.shared.all_chunks_eof.store(false, Ordering::Release);
self.shared.sources_at_eof.store(0, Ordering::Release);
let mut states: Vec<Phase2FileState> = Vec::with_capacity(total_sources);
for path in files {
let file = std::fs::File::open(path).map_err(|e| {
anyhow::anyhow!("Failed to open chunk file {}: {e}", path.display())
})?;
let reader = BufReader::with_capacity(2 * 1024 * 1024, file);
states.push(Phase2FileState::new(reader));
}
let mut guard = self.shared.phase2_files.write().expect("phase2_files rwlock poisoned");
*guard = Arc::new(states);
Ok(())
}
pub fn clear_phase2_files(&self) {
let mut guard = self.shared.phase2_files.write().expect("phase2_files rwlock poisoned");
*guard = Arc::new(Vec::new());
}
pub fn submit_compress(&self, job: CompressJob) {
self.stats.compress_jobs_submitted.fetch_add(1, Ordering::Relaxed);
let mut job = job;
loop {
if self.shared.phase.load(Ordering::Acquire) == phase::SHUTDOWN {
return;
}
match self.shared.compress_queue.push(job) {
Ok(()) => return,
Err(returned) => {
job = returned;
std::thread::yield_now();
}
}
}
}
pub fn compress_result_channel(&self) -> (Sender<CompressResult>, Receiver<CompressResult>) {
bounded(self.num_workers * 2)
}
pub fn shutdown(mut self) {
if log::log_enabled!(log::Level::Debug) {
self.stats.log_summary();
self.pipeline_stats.log_summary();
}
self.do_shutdown();
}
fn do_shutdown(&mut self) {
self.shared.phase.store(phase::SHUTDOWN, Ordering::Release);
if let Some(workers) = self.workers.take() {
for w in workers {
if w.join().is_err() {
self.shared.worker_panicked.store(true, Ordering::Release);
self.shared.main_thread_handle.unpark();
}
}
}
}
#[allow(clippy::cast_possible_truncation)]
fn nanos_u64(d: std::time::Duration) -> u64 {
d.as_nanos() as u64
}
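/// Runs one compress job: writes the payload through the BGZF compressor,
/// concatenates the produced blocks, and sends the result back along with
/// the recycled input buffer. Spins (yielding) while the result channel is
/// full, bailing out on shutdown or disconnect.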
fn handle_compress_job(
shared: &SharedPipelineState,
job: CompressJob,
compressor: &mut InlineBgzfCompressor,
) {
compressor
.write_all(&job.data)
.expect("BGZF compression write should not fail for valid data");
compressor.flush().expect("BGZF compression flush should not fail");
let blocks = compressor.take_blocks();
let mut compressed = Vec::new();
for block in &blocks {
compressed.extend_from_slice(&block.data);
}
let mut recycled = job.data;
recycled.clear();
let serial = job.serial;
let mut result = CompressResult { serial, compressed, recycled_buf: recycled };
loop {
match job.result_tx.try_send(result) {
Ok(()) => return,
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
log::warn!(
"compress result discarded (serial {serial}): I/O writer thread disconnected"
);
return;
}
Err(crossbeam_channel::TrySendError::Full(r)) => {
if shared.phase.load(Ordering::Acquire) == phase::SHUTDOWN {
return;
}
result = r;
std::thread::yield_now();
}
}
}
}
}
impl Drop for SortWorkerPool {
fn drop(&mut self) {
self.do_shutdown();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_buffer_pool_checkout_empty() {
let pool = BufferPool::new(4);
let buf = pool.checkout();
assert!(buf.is_empty());
}
#[test]
fn test_buffer_pool_recycle() {
let pool = BufferPool::new(4);
let mut buf = Vec::with_capacity(1024);
buf.extend_from_slice(b"hello");
pool.checkin(buf);
let recycled = pool.checkout();
assert!(recycled.is_empty());
assert!(recycled.capacity() >= 1024);
}
#[test]
fn test_pool_stats_log_summary() {
let stats = PoolStats::default();
stats.compress_jobs_submitted.fetch_add(42, Ordering::Relaxed);
stats.log_summary();
assert_eq!(stats.compress_jobs_submitted.load(Ordering::Relaxed), 42);
}
#[test]
fn test_pool_compress_roundtrip() {
let pool = SortWorkerPool::new(2, 1, 6);
let (result_tx, result_rx) = pool.compress_result_channel();
let data = vec![b'A'; 1000];
pool.submit_compress(CompressJob { data, serial: 0, result_tx });
let result = result_rx.recv().expect("should receive compress result");
assert_eq!(result.serial, 0);
assert!(!result.compressed.is_empty());
assert_eq!(&result.compressed[0..2], &[0x1f, 0x8b]);
assert!(result.recycled_buf.is_empty());
pool.shutdown();
}
#[test]
fn test_pool_many_jobs() {
let pool = SortWorkerPool::new(4, 1, 6);
let (result_tx, result_rx) = pool.compress_result_channel();
let num_jobs = 100usize;
let submit_tx = result_tx.clone();
let submit_handle = std::thread::spawn(move || {
for i in 0..num_jobs {
let data = vec![b'X'; 500 + i];
pool.submit_compress(CompressJob {
data,
serial: i as u64,
result_tx: submit_tx.clone(),
});
}
drop(submit_tx);
pool
});
drop(result_tx);
let mut received = 0;
while let Ok(_result) = result_rx.recv() {
received += 1;
}
assert_eq!(received, num_jobs);
let pool = submit_handle.join().expect("submit thread should not panic");
pool.shutdown();
}
#[test]
fn test_pool_stats() {
let pool = SortWorkerPool::new(2, 1, 6);
let (c_tx, c_rx) = pool.compress_result_channel();
pool.submit_compress(CompressJob { data: vec![b'A'; 100], serial: 0, result_tx: c_tx });
let _ = c_rx.recv();
assert_eq!(pool.stats.compress_jobs_submitted.load(Ordering::Relaxed), 1);
pool.shutdown();
}
#[test]
fn test_pipeline_stats_record_step_and_idle() {
let stats = SortPipelineStats::new(2);
stats.record_step(0, SortStep::ReadInputBlocks, 1_000_000);
stats.record_step(0, SortStep::ReadInputBlocks, 500_000);
stats.record_step(1, SortStep::DecompressInput, 2_000_000);
stats.record_step(0, SortStep::CompressSpill, 300_000);
stats.record_idle(0, 100_000);
stats.record_idle(1, 200_000);
let read_idx = SortStep::ReadInputBlocks as usize;
let decomp_idx = SortStep::DecompressInput as usize;
let compress_idx = SortStep::CompressSpill as usize;
assert_eq!(stats.step_count[read_idx].load(Ordering::Relaxed), 2);
assert_eq!(stats.step_ns[read_idx].load(Ordering::Relaxed), 1_500_000);
assert_eq!(stats.step_count[decomp_idx].load(Ordering::Relaxed), 1);
assert_eq!(stats.step_count[compress_idx].load(Ordering::Relaxed), 1);
assert_eq!(stats.per_thread_step_counts[0][read_idx].load(Ordering::Relaxed), 2);
assert_eq!(stats.per_thread_step_counts[1][decomp_idx].load(Ordering::Relaxed), 1);
assert_eq!(stats.per_thread_idle_ns[0].load(Ordering::Relaxed), 100_000);
assert_eq!(stats.per_thread_idle_ns[1].load(Ordering::Relaxed), 200_000);
}
#[test]
fn test_pipeline_stats_log_summary_does_not_panic() {
let stats = SortPipelineStats::new(4);
stats.record_step(0, SortStep::ReadInputBlocks, 1_000_000_000);
stats.record_step(1, SortStep::CompressSpill, 500_000_000);
stats.record_idle(0, 10_000_000);
stats.log_summary();
}
#[test]
fn test_buffer_pool_full_drops_excess() {
let pool = BufferPool::new(2);
pool.checkin(Vec::with_capacity(256));
pool.checkin(Vec::with_capacity(512));
pool.checkin(Vec::with_capacity(1024));
let a = pool.checkout();
let b = pool.checkout();
assert!(
a.capacity() > 0 || b.capacity() > 0,
"at least one pooled buffer should retain allocated capacity"
);
let fresh = pool.checkout();
assert_eq!(fresh.len(), 0);
assert_eq!(fresh.capacity(), 0, "fresh allocation has no pre-allocated capacity");
}
#[test]
fn test_sort_priorities_phase1_default_feeds_main_thread() {
let bp = SortBackpressureState {
decompressed_input_low: true,
input_eof: false,
decompressed_input_done: false,
compress_has_items: false,
phase: phase::PHASE1,
};
let priorities = get_sort_priorities(&bp);
assert_eq!(priorities[0], SortStep::DecompressInput);
}
#[test]
fn test_sort_priorities_phase1_compress_backpressure() {
let bp = SortBackpressureState {
decompressed_input_low: false,
input_eof: false,
decompressed_input_done: false,
compress_has_items: true,
phase: phase::PHASE1,
};
let priorities = get_sort_priorities(&bp);
assert_eq!(priorities[0], SortStep::CompressSpill);
}
#[test]
fn test_sort_priorities_phase1_all_done_returns_empty() {
let bp = SortBackpressureState {
decompressed_input_low: false,
input_eof: true,
decompressed_input_done: true,
compress_has_items: false,
phase: phase::PHASE1,
};
assert!(get_sort_priorities(&bp).is_empty());
}
#[test]
fn test_sort_priorities_phase2_default_feeds_merge_loop() {
let bp = SortBackpressureState {
decompressed_input_low: false,
input_eof: false,
decompressed_input_done: false,
compress_has_items: false,
phase: phase::PHASE2,
};
let priorities = get_sort_priorities(&bp);
assert_eq!(priorities[0], SortStep::Phase2FileWork);
}
#[test]
fn test_sort_priorities_phase2_compress_backpressure() {
let bp = SortBackpressureState {
decompressed_input_low: false,
input_eof: false,
decompressed_input_done: false,
compress_has_items: true,
phase: phase::PHASE2,
};
let priorities = get_sort_priorities(&bp);
assert_eq!(priorities[0], SortStep::CompressOutput);
}
#[test]
fn test_sort_priorities_phase2_after_eof_still_drains_files() {
let bp = SortBackpressureState {
decompressed_input_low: false,
input_eof: false,
decompressed_input_done: false,
compress_has_items: false,
phase: phase::PHASE2,
};
let priorities = get_sort_priorities(&bp);
assert_eq!(priorities[0], SortStep::Phase2FileWork);
}
#[test]
fn test_sort_priorities_legacy_returns_compress_only() {
let bp = SortBackpressureState {
decompressed_input_low: false,
input_eof: false,
decompressed_input_done: false,
compress_has_items: true,
phase: phase::LEGACY,
};
let priorities = get_sort_priorities(&bp);
assert_eq!(priorities.len(), 1);
assert_eq!(priorities[0], SortStep::CompressSpill);
}
#[test]
fn test_worker_pool_num_workers() {
let pool = SortWorkerPool::new(3, 1, 6);
assert_eq!(pool.num_workers(), 3);
pool.shutdown();
}
fn empty_phase2_file() -> Phase2FileState {
let tmp = tempfile::tempfile().expect("failed to create tempfile");
let reader = BufReader::with_capacity(1024, tmp);
Phase2FileState::new(reader)
}
fn dummy_raw_block(byte: u8) -> RawBgzfBlock {
RawBgzfBlock { data: vec![byte; 8] }
}
#[test]
fn test_admission_under_cap_admits() {
let file = empty_phase2_file();
file.raw_blocks.lock().expect("raw lock").push_back((0, dummy_raw_block(0)));
let popped = SortWorkerPool::try_pop_raw_for_decompress(&file);
assert!(popped.is_some(), "under cap with empty reorder buffer should admit");
assert_eq!(file.decomp_in_flight.load(Ordering::Acquire), 1);
assert!(file.raw_blocks.lock().expect("raw lock").is_empty());
}
#[test]
fn test_admission_at_cap_poppable_rejects() {
let file = empty_phase2_file();
{
let mut dec = file.decompressed.lock().expect("dec lock");
for s in 0..PHASE2_DECOMP_CAP as u64 {
dec.insert(s, vec![0u8; 4]);
}
assert_eq!(dec.len(), PHASE2_DECOMP_CAP);
assert!(dec.can_pop());
}
file.raw_blocks
.lock()
.expect("raw lock")
.push_back((PHASE2_DECOMP_CAP as u64, dummy_raw_block(1)));
let popped = SortWorkerPool::try_pop_raw_for_decompress(&file);
assert!(popped.is_none(), "at cap and poppable should reject (apply backpressure)");
assert_eq!(file.decomp_in_flight.load(Ordering::Acquire), 0);
assert_eq!(file.raw_blocks.lock().expect("raw lock").len(), 1);
}
#[test]
fn test_admission_at_cap_stuck_admits_gap_filler() {
let file = empty_phase2_file();
{
let mut dec = file.decompressed.lock().expect("dec lock");
for s in 1..=PHASE2_DECOMP_CAP as u64 {
dec.insert(s, vec![0u8; 4]);
}
assert_eq!(dec.len(), PHASE2_DECOMP_CAP);
assert!(!dec.can_pop(), "buffer should be stuck waiting for serial 0");
}
file.raw_blocks.lock().expect("raw lock").push_back((0, dummy_raw_block(0)));
let popped = SortWorkerPool::try_pop_raw_for_decompress(&file);
assert!(popped.is_some(), "at cap and stuck should admit gap-filler at next_seq");
assert_eq!(file.decomp_in_flight.load(Ordering::Acquire), 1);
}
#[test]
fn test_admission_at_cap_stuck_wrong_head_rejects() {
let file = empty_phase2_file();
{
let mut dec = file.decompressed.lock().expect("dec lock");
for s in 1..=PHASE2_DECOMP_CAP as u64 {
dec.insert(s, vec![0u8; 4]);
}
}
file.raw_blocks
.lock()
.expect("raw lock")
.push_back((PHASE2_DECOMP_CAP as u64 + 1, dummy_raw_block(2)));
let popped = SortWorkerPool::try_pop_raw_for_decompress(&file);
assert!(popped.is_none(), "at cap, stuck, but head != next_seq should reject");
assert_eq!(file.decomp_in_flight.load(Ordering::Acquire), 0);
}
#[test]
fn test_admission_empty_raw_returns_none() {
let file = empty_phase2_file();
let popped = SortWorkerPool::try_pop_raw_for_decompress(&file);
assert!(popped.is_none());
assert_eq!(file.decomp_in_flight.load(Ordering::Acquire), 0);
}
#[test]
fn test_is_drained_respects_in_flight_counter() {
let file = empty_phase2_file();
file.mark_reader_eof(&mut file.reader.lock().expect("reader lock"));
assert!(file.is_drained(), "reader_eof + empty queues + no in-flight should be drained");
file.decomp_in_flight.fetch_add(1, Ordering::AcqRel);
assert!(!file.is_drained(), "in-flight decompression must keep is_drained=false");
file.decomp_in_flight.fetch_sub(1, Ordering::AcqRel);
assert!(file.is_drained());
}
#[test]
fn test_is_drained_blocks_on_pending_raw() {
let file = empty_phase2_file();
file.mark_reader_eof(&mut file.reader.lock().expect("reader lock"));
file.raw_blocks.lock().expect("raw lock").push_back((0, dummy_raw_block(0)));
assert!(!file.is_drained(), "raw blocks pending must keep is_drained=false");
}
#[test]
fn test_is_drained_blocks_on_pending_decompressed() {
let file = empty_phase2_file();
file.mark_reader_eof(&mut file.reader.lock().expect("reader lock"));
file.decompressed.lock().expect("dec lock").insert(0, vec![1, 2, 3]);
assert!(!file.is_drained(), "decompressed blocks pending must keep is_drained=false");
}
}