use std::collections::BTreeMap;
use crossbeam_queue::ArrayQueue;
use noodles::sam::Header;
use parking_lot::Mutex;
use std::fs::File;
use std::io::{self, BufRead, BufReader, Write};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread;
use crate::bgzf_reader::{
BGZF_EOF, BGZF_FOOTER_SIZE, BGZF_HEADER_SIZE, decompress_block_slice_into, read_raw_blocks,
};
use crate::bgzf_writer::InlineBgzfCompressor;
use crate::fastq_parse::FastqRecord;
use crate::grouper::FastqTemplate;
use crate::progress::ProgressTracker;
use crate::reorder_buffer::ReorderBuffer;
use libdeflater::Decompressor;
use super::base::{
ActiveSteps, CompressedBlockBatch, HasCompressor, HasHeldBoundaries, HasHeldCompressed,
HasHeldProcessed, HasHeldSerialized, HasRecycledBuffers, HasWorkerCore, MemoryEstimate,
MonitorableState, OutputPipelineQueues, OutputPipelineState, PipelineLifecycle, PipelineStats,
PipelineStep, PipelineValidationError, ProcessPipelineState, SerializePipelineState,
SerializedBatch, StepContext, WorkerCoreState, WorkerStateCommon, WritePipelineState,
finalize_pipeline, generic_worker_loop, handle_worker_panic, join_monitor_thread,
join_worker_threads, run_monitor_loop, shared_try_step_compress,
};
use super::deadlock::{DeadlockConfig, DeadlockState, QueueSnapshot};
use super::scheduler::{BackpressureState, SchedulerStrategy};
/// Input source for one FASTQ stream.
///
/// `Bgzf` carries a raw byte source whose BGZF blocks the pipeline
/// decompresses itself; `Decompressed` is a reader that already yields plain
/// FASTQ text. NOTE(review): the generic `R` is only used by the
/// `Decompressed` variant; the `Bgzf` variant is type-erased.
pub enum StreamReader<R: BufRead + Send> {
    Bgzf(Box<dyn BufRead + Send>),
    Decompressed(R),
}
/// One chunk of bytes read from a single input stream.
#[derive(Debug, Clone)]
pub struct PerStreamChunk {
    /// Index of the originating input stream (e.g. R1 = 0, R2 = 1).
    pub stream_idx: usize,
    /// Monotonic per-stream batch number, used to pair chunks in order.
    pub batch_num: u64,
    /// The chunk's bytes.
    pub data: Vec<u8>,
    /// Record-boundary offsets into `data`, when already computed.
    pub offsets: Option<Vec<usize>>,
}
impl PerStreamChunk {
    /// Approximate heap bytes held by this chunk: the data buffer's capacity
    /// plus the offset table's backing allocation, if present.
    #[must_use]
    pub fn estimate_heap_size(&self) -> usize {
        let offsets_bytes = match &self.offsets {
            Some(offsets) => offsets.capacity() * std::mem::size_of::<usize>(),
            None => 0,
        };
        self.data.capacity() + offsets_bytes
    }
}
/// Reorders per-stream chunks so batches are emitted strictly in batch order
/// with every stream's chunk grouped together.
pub(crate) struct PairState {
    // batch_num -> one slot per stream, filled as chunks arrive out of order.
    pending: BTreeMap<u64, Vec<Option<PerStreamChunk>>>,
    // The next batch number eligible for emission.
    next_emit: u64,
    // Number of input streams (slots allocated per batch).
    num_streams: usize,
}
impl PairState {
    /// Creates an empty pairing buffer expecting `num_streams` chunks per batch.
    fn new(num_streams: usize) -> Self {
        Self { pending: BTreeMap::new(), next_emit: 0, num_streams }
    }

    /// Files a chunk under its batch number, allocating the per-stream slot
    /// vector on first arrival for that batch.
    fn insert(&mut self, chunk: PerStreamChunk) {
        let key = chunk.batch_num;
        let slot = chunk.stream_idx;
        let slots = self.pending.entry(key).or_insert_with(|| vec![None; self.num_streams]);
        slots[slot] = Some(chunk);
    }

    /// Pops the next in-order batch if it is ready. When `all_arrived` is set
    /// (input exhausted) a batch is emitted as soon as any stream is present;
    /// otherwise every stream's chunk must have arrived.
    fn try_pop_complete(&mut self, all_arrived: bool) -> Option<Vec<PerStreamChunk>> {
        let ready = {
            let slots = self.pending.get(&self.next_emit)?;
            if all_arrived {
                slots.iter().any(Option::is_some)
            } else {
                slots.iter().all(Option::is_some)
            }
        };
        if ready {
            let slots = self
                .pending
                .remove(&self.next_emit)
                .expect("next_emit key must exist in pending map after get() succeeded");
            self.next_emit += 1;
            Some(slots.into_iter().flatten().collect())
        } else {
            None
        }
    }

    /// True when no batches are buffered.
    fn is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
/// A batch of decompressed per-stream chunks, tagged with an emission serial.
#[derive(Debug, Clone)]
pub struct FastqDecompressedBatch {
    /// One decompressed chunk per contributing stream.
    pub chunks: Vec<FastqDecompressedChunk>,
    /// In-order serial number for downstream reordering.
    pub serial: u64,
}
/// Decompressed bytes belonging to one input stream.
#[derive(Debug, Clone)]
pub struct FastqDecompressedChunk {
    /// Index of the originating input stream.
    pub stream_idx: usize,
    /// Plain (decompressed) FASTQ bytes.
    pub data: Vec<u8>,
}
impl MemoryEstimate for FastqDecompressedBatch {
    /// Approximate heap footprint: chunk payload buffers plus the chunk
    /// vector's own backing allocation.
    fn estimate_heap_size(&self) -> usize {
        let payload: usize = self.chunks.iter().map(|chunk| chunk.data.capacity()).sum();
        let vec_backing = self.chunks.capacity() * std::mem::size_of::<FastqDecompressedChunk>();
        payload + vec_backing
    }
}
/// A batch of per-stream byte regions with record boundaries located.
#[derive(Debug, Clone)]
pub struct FastqBoundaryBatch {
    /// One boundary-annotated byte region per stream.
    pub streams: Vec<FastqStreamBoundaries>,
    /// In-order serial number for downstream reordering.
    pub serial: u64,
}
/// Complete-record bytes of one stream plus the offsets delimiting records.
#[derive(Debug, Clone)]
pub struct FastqStreamBoundaries {
    /// Index of the originating input stream.
    pub stream_idx: usize,
    /// Bytes containing only whole records (no trailing partial record).
    pub data: Vec<u8>,
    /// Record start offsets into `data`; record i spans offsets[i]..offsets[i+1].
    pub offsets: Vec<usize>,
}
impl MemoryEstimate for FastqBoundaryBatch {
    /// Approximate heap footprint: each stream's data buffer and offset table,
    /// plus the stream vector's own backing allocation.
    fn estimate_heap_size(&self) -> usize {
        let mut total = self.streams.capacity() * std::mem::size_of::<FastqStreamBoundaries>();
        for stream in &self.streams {
            total += stream.data.capacity();
            total += stream.offsets.capacity() * std::mem::size_of::<usize>();
        }
        total
    }
}
/// Parsed records belonging to one input stream.
#[derive(Debug, Clone)]
pub struct FastqParsedStream {
    /// Index of the originating input stream.
    pub stream_idx: usize,
    /// Fully parsed FASTQ records.
    pub records: Vec<FastqRecord>,
}
/// A batch of parsed per-stream records, tagged with an emission serial.
#[derive(Debug, Clone)]
pub struct FastqParsedBatch {
    /// One parsed-record set per stream.
    pub streams: Vec<FastqParsedStream>,
    /// In-order serial number for downstream reordering.
    pub serial: u64,
}
impl MemoryEstimate for FastqParsedBatch {
    /// Approximate heap footprint: every record's own heap usage, each record
    /// vector's backing allocation, and the stream vector itself.
    fn estimate_heap_size(&self) -> usize {
        let mut total = self.streams.capacity() * std::mem::size_of::<FastqParsedStream>();
        for stream in &self.streams {
            let record_payload: usize =
                stream.records.iter().map(MemoryEstimate::estimate_heap_size).sum();
            total += record_payload;
            total += stream.records.capacity() * std::mem::size_of::<FastqRecord>();
        }
        total
    }
}
/// Per-stream scratch state carried between boundary-finding batches.
#[derive(Debug, Clone, Default)]
pub struct StreamBoundaryState {
    /// Bytes of the trailing incomplete record from the previous batch.
    pub leftover: Vec<u8>,
    /// Reusable concatenation buffer (leftover + newly arrived chunk).
    pub work_buffer: Vec<u8>,
}
/// Boundary-finding state for all input streams. Each stream's state sits
/// behind its own mutex so different streams can be processed concurrently.
#[derive(Debug, Default)]
pub struct FastqBoundaryState {
    pub stream_states: Vec<parking_lot::Mutex<StreamBoundaryState>>,
}
impl FastqBoundaryState {
    /// Allocates one independent, lock-protected boundary state per stream.
    #[must_use]
    pub fn new(num_streams: usize) -> Self {
        let mut stream_states = Vec::with_capacity(num_streams);
        for _ in 0..num_streams {
            stream_states.push(parking_lot::Mutex::new(StreamBoundaryState::default()));
        }
        Self { stream_states }
    }
}
/// Namespace for FASTQ-format pipeline steps (boundary finding and parsing).
pub struct FastqFormat;
impl FastqFormat {
    /// Splits each stream's decompressed bytes into whole-record regions.
    ///
    /// Per stream: prepends leftover bytes from the previous batch, locates
    /// record boundaries, and stashes the trailing partial record back into
    /// the stream's leftover. For multi-stream (paired) input, streams that
    /// produced no chunk this batch still get their leftover flushed, and all
    /// streams are trimmed to the same record count with the trimmed excess
    /// pushed back onto the front of the leftover.
    ///
    /// # Panics
    /// If `state` was not initialized for the highest stream index present.
    pub fn find_boundaries(
        state: &FastqBoundaryState,
        batch: FastqDecompressedBatch,
    ) -> io::Result<FastqBoundaryBatch> {
        let max_stream = batch.chunks.iter().map(|c| c.stream_idx).max().unwrap_or(0);
        assert!(
            state.stream_states.len() > max_stream,
            "FastqBoundaryState not initialized for stream {max_stream}"
        );
        let mut streams_with_chunks = vec![false; state.stream_states.len()];
        let mut streams = Vec::with_capacity(state.stream_states.len());
        for chunk in batch.chunks {
            let stream_idx = chunk.stream_idx;
            streams_with_chunks[stream_idx] = true;
            let mut stream_state = state.stream_states[stream_idx].lock();
            stream_state.work_buffer.clear();
            // Re-join the previous batch's partial record with the new bytes.
            let leftover = std::mem::take(&mut stream_state.leftover);
            if !leftover.is_empty() {
                stream_state.work_buffer.extend_from_slice(&leftover);
            }
            stream_state.work_buffer.extend_from_slice(&chunk.data);
            let (data, offsets, leftover_start) =
                find_fastq_boundaries_inplace(&stream_state.work_buffer);
            // Whatever follows the last complete record waits for more input.
            stream_state.leftover = stream_state.work_buffer[leftover_start..].to_vec();
            streams.push(FastqStreamBoundaries { stream_idx, data, offsets });
        }
        // Paired input: also flush leftovers of streams that had no chunk this
        // batch so the alignment step below sees every stream.
        if state.stream_states.len() > 1 {
            for (stream_idx, &had_chunk) in streams_with_chunks.iter().enumerate() {
                if had_chunk {
                    continue;
                }
                let mut stream_state = state.stream_states[stream_idx].lock();
                if stream_state.leftover.is_empty() {
                    continue;
                }
                stream_state.work_buffer.clear();
                let leftover = std::mem::take(&mut stream_state.leftover);
                stream_state.work_buffer.extend_from_slice(&leftover);
                let (data, offsets, leftover_start) =
                    find_fastq_boundaries_inplace(&stream_state.work_buffer);
                stream_state.leftover = stream_state.work_buffer[leftover_start..].to_vec();
                streams.push(FastqStreamBoundaries { stream_idx, data, offsets });
            }
        }
        // Keep paired streams in lockstep: truncate every stream to the
        // minimum record count. Excess record bytes go back in front of any
        // incomplete leftover bytes (excess precedes leftover in stream order).
        if streams.len() > 1 {
            let min_records =
                streams.iter().map(|s| s.offsets.len().saturating_sub(1)).min().unwrap_or(0);
            for stream in &mut streams {
                let record_count = stream.offsets.len().saturating_sub(1);
                if record_count > min_records {
                    let excess_start = stream.offsets[min_records];
                    let mut stream_state = state.stream_states[stream.stream_idx].lock();
                    let excess_bytes = stream.data[excess_start..].to_vec();
                    let incomplete_leftover = std::mem::take(&mut stream_state.leftover);
                    stream_state.leftover = excess_bytes;
                    stream_state.leftover.extend(incomplete_leftover);
                    stream.data.truncate(excess_start);
                    stream.offsets.truncate(min_records + 1);
                }
            }
        }
        Ok(FastqBoundaryBatch { streams, serial: batch.serial })
    }
    /// Parses each stream's delimited byte regions into `FastqRecord`s.
    ///
    /// # Errors
    /// Propagates any record-level parse failure.
    pub fn parse_records(batch: FastqBoundaryBatch) -> io::Result<FastqParsedBatch> {
        let streams = batch
            .streams
            .into_iter()
            .map(|stream| {
                let records = parse_fastq_records_from_boundaries(&stream.data, &stream.offsets)?;
                Ok(FastqParsedStream { stream_idx: stream.stream_idx, records })
            })
            .collect::<io::Result<Vec<_>>>()?;
        Ok(FastqParsedBatch { streams, serial: batch.serial })
    }
}
/// Splits `data` into (complete-record bytes, record offsets, leftover start).
/// The returned index marks where the trailing partial record begins; callers
/// keep `data[leftover_start..]` for the next batch.
fn find_fastq_boundaries_inplace(data: &[u8]) -> (Vec<u8>, Vec<usize>, usize) {
    if data.is_empty() {
        return (Vec::new(), vec![0], 0);
    }
    let offsets = fgumi_simd_fastq::find_record_offsets(data);
    // The last offset is the end of the final complete record (0 if none).
    let complete_end = match offsets.last() {
        Some(&end) => end,
        None => 0,
    };
    (data[..complete_end].to_vec(), offsets, complete_end)
}
/// Parses records delimited by consecutive `offsets` pairs within `data`.
/// Degenerate or out-of-range spans are skipped rather than treated as errors.
///
/// # Errors
/// Propagates record-level parse failures.
fn parse_fastq_records_from_boundaries(
    data: &[u8],
    offsets: &[usize],
) -> io::Result<Vec<FastqRecord>> {
    if offsets.len() <= 1 {
        return Ok(Vec::new());
    }
    let mut records = Vec::with_capacity(offsets.len() - 1);
    for pair in offsets.windows(2) {
        let (start, end) = (pair[0], pair[1]);
        if start >= end || start >= data.len() {
            continue;
        }
        let span = &data[start..end.min(data.len())];
        records.push(parse_single_fastq_record(span)?);
    }
    Ok(records)
}
/// Parses one complete FASTQ record from its exact byte span.
///
/// # Errors
/// Propagates `FastqRecord::from_slice` parse failures.
fn parse_single_fastq_record(data: &[u8]) -> io::Result<FastqRecord> {
    FastqRecord::from_slice(data)
}
const MAX_BATCHES_PER_LOCK: usize = 8;
/// Records parsed from a single BGZF block, plus the partial-record byte
/// fragments at the block's edges (stitched with neighboring blocks later).
#[derive(Debug)]
pub struct BlockParsed {
    /// Sequential index of the source block within its stream.
    pub block_idx: u64,
    /// Index of the originating input stream.
    pub stream_idx: usize,
    /// Records fully contained in this block.
    pub records: Vec<FastqRecord>,
    /// Bytes before the first complete record (tail of the previous block).
    pub prefix_bytes: Vec<u8>,
    /// Bytes after the last complete record (head of the next block).
    pub suffix_bytes: Vec<u8>,
}
impl MemoryEstimate for BlockParsed {
    /// Heap bytes: record payloads, the record vector's backing allocation,
    /// and both cross-block byte fragments.
    fn estimate_heap_size(&self) -> usize {
        let record_payload: usize =
            self.records.iter().map(MemoryEstimate::estimate_heap_size).sum();
        let record_backing = self.records.capacity() * std::mem::size_of::<FastqRecord>();
        record_payload + record_backing + self.prefix_bytes.capacity() + self.suffix_bytes.capacity()
    }
}
/// Shared state for merging independently parsed BGZF blocks from the R1 and
/// R2 streams back into ordered record batches.
/// NOTE(review): the merge loop consuming this state is outside this chunk;
/// field roles are inferred from names where marked.
pub(crate) struct BlockMergeState {
    // Out-of-order parsed blocks awaiting their turn, keyed by block index.
    r1_pending: BTreeMap<u64, BlockParsed>,
    r2_pending: BTreeMap<u64, BlockParsed>,
    // Next block index expected from each stream.
    r1_next: u64,
    r2_next: u64,
    // Trailing partial-record bytes awaiting the next block's prefix.
    r1_suffix_bytes: Vec<u8>,
    r2_suffix_bytes: Vec<u8>,
    // Presumably records held back until the other stream catches up — confirm.
    r1_surplus: Vec<FastqRecord>,
    r2_surplus: Vec<FastqRecord>,
    // Serial number for the next emitted merged batch.
    serial_out: u64,
    // Estimated heap bytes pending in this state (backpressure accounting).
    pending_heap_bytes: u64,
}
impl BlockMergeState {
    /// Fresh merge state with nothing pending for either stream.
    fn new() -> Self {
        Self {
            r1_pending: BTreeMap::new(),
            r2_pending: BTreeMap::new(),
            r1_next: 0,
            r2_next: 0,
            r1_suffix_bytes: Vec::new(),
            r2_suffix_bytes: Vec::new(),
            r1_surplus: Vec::new(),
            r2_surplus: Vec::new(),
            serial_out: 0,
            pending_heap_bytes: 0,
        }
    }

    /// True when nothing is buffered in any pending structure. In debug
    /// builds, verifies heap accounting has returned to zero when empty.
    fn is_empty(&self) -> bool {
        let no_pending = self.r1_pending.is_empty() && self.r2_pending.is_empty();
        let no_surplus = self.r1_surplus.is_empty() && self.r2_surplus.is_empty();
        let no_suffix = self.r1_suffix_bytes.is_empty() && self.r2_suffix_bytes.is_empty();
        let empty = no_pending && no_surplus && no_suffix;
        debug_assert!(
            !empty || self.pending_heap_bytes == 0,
            "pending_heap_bytes={} but maps are empty",
            self.pending_heap_bytes
        );
        empty
    }
}
/// Returns the offset of the first complete 4-line FASTQ record in `data`.
/// Bytes before that offset belong to a record begun in the previous block.
/// Returns `data.len()` when no record start is found within the first 8
/// newlines (i.e. the whole slice is prefix).
fn detect_prefix_end(data: &[u8]) -> usize {
    if data.is_empty() {
        return 0;
    }
    // Gather the first up-to-8 newline positions.
    let mut newlines = [0usize; 8];
    let mut count = 0;
    for (pos, &byte) in data.iter().enumerate() {
        if byte != b'\n' {
            continue;
        }
        newlines[count] = pos;
        count += 1;
        if count == newlines.len() {
            break;
        }
    }
    // A complete record needs at least 4 terminated lines.
    if count < 4 {
        return data.len();
    }
    for start in 0..count.saturating_sub(3) {
        let line0_start = match start {
            0 => 0,
            _ => newlines[start - 1] + 1,
        };
        let [line0_end, line1_end, line2_end, line3_end] =
            [newlines[start], newlines[start + 1], newlines[start + 2], newlines[start + 3]];
        let line2_start = line1_end + 1;
        // Candidate window must shape like header / sequence / '+' / quality,
        // with matching sequence and quality lengths.
        let looks_like_record = data[line0_start] == b'@'
            && line2_start < data.len()
            && data[line2_start] == b'+'
            && line1_end - (line0_end + 1) == line3_end - (line2_end + 1);
        if looks_like_record {
            return line0_start;
        }
    }
    data.len()
}
/// Returns the byte offset just past the last complete 4-line FASTQ record in
/// `data`, or 0 if no complete record can be identified near the end. Bytes
/// after the returned offset belong to an incomplete trailing record.
///
/// Scans backwards collecting up to the last 8 newline positions, then looks
/// for a 4-line window shaped like a record: first line starts with '@',
/// third line starts with '+', and sequence/quality line lengths match.
fn detect_suffix_start(data: &[u8]) -> usize {
    if data.is_empty() {
        return 0;
    }
    // Collect the last up-to-8 newline positions (found in reverse order).
    let mut newlines = [0usize; 8];
    let mut count = 0;
    let mut i = data.len();
    while i > 0 {
        i -= 1;
        if data[i] == b'\n' {
            newlines[count] = i;
            count += 1;
            if count == 8 {
                break;
            }
        }
    }
    // Fewer than 4 line terminators cannot contain a complete record.
    if count < 4 {
        return 0;
    }
    // Restore ascending order for windowed inspection.
    newlines[..count].reverse();
    // Try the latest candidate window first. Since start <= count - 4,
    // start + 3 <= count - 1 always holds, so every window index is valid
    // (the original `start + 3 >= count` guard was unreachable and has been
    // removed).
    let window_start = count.saturating_sub(4);
    for start in (0..=window_start).rev() {
        let line0_start = if start == 0 { 0 } else { newlines[start - 1] + 1 };
        let line0_end = newlines[start];
        let line1_end = newlines[start + 1];
        let line2_end = newlines[start + 2];
        let line3_end = newlines[start + 3];
        // Header line must begin with '@'.
        if data[line0_start] != b'@' {
            continue;
        }
        // Separator line must begin with '+'.
        let line2_start = line1_end + 1;
        if line2_start >= data.len() || data[line2_start] != b'+' {
            continue;
        }
        // Sequence and quality lines must have equal length.
        let seq_len = line1_end - (line0_end + 1);
        let qual_len = line3_end - (line2_end + 1);
        if seq_len != qual_len {
            continue;
        }
        return line3_end + 1;
    }
    0
}
/// Joins a block's trailing partial-record bytes with the next block's
/// leading bytes and parses the reconstructed record. Returns `None` when
/// both fragments are empty (no record spans the boundary).
///
/// # Errors
/// Propagates parse failures on the stitched bytes.
fn stitch_cross_block_record(
    suffix_bytes: &[u8],
    prefix_bytes: &[u8],
) -> io::Result<Option<FastqRecord>> {
    if suffix_bytes.is_empty() && prefix_bytes.is_empty() {
        return Ok(None);
    }
    let combined: Vec<u8> = suffix_bytes.iter().chain(prefix_bytes.iter()).copied().collect();
    FastqRecord::from_slice(&combined).map(Some)
}
/// Tuning knobs for the FASTQ processing pipeline.
#[derive(Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct FastqPipelineConfig {
    /// Worker thread count (at least 1).
    pub num_threads: usize,
    /// Capacity of each inter-stage queue.
    pub queue_capacity: usize,
    /// Output compression level (the builder caps this at 12).
    pub compression_level: u32,
    /// Whether to collect per-step pipeline statistics.
    pub collect_stats: bool,
    /// True for BGZF inputs (block-parallel path) vs. plain gzip/text.
    pub inputs_are_bgzf: bool,
    /// Batch size for downstream processing (default 400).
    pub batch_size: usize,
    /// Work-selection strategy for worker threads.
    pub scheduler_strategy: SchedulerStrategy,
    /// Soft cap (bytes) on queued heap memory before backpressure.
    pub queue_memory_limit: u64,
    /// Seconds without progress before deadlock detection fires.
    pub deadlock_timeout_secs: u64,
    /// Whether the monitor may attempt automatic deadlock recovery.
    pub deadlock_recover_enabled: bool,
    /// Optional externally shared stats sink (used when `collect_stats`).
    pub shared_stats: Option<Arc<PipelineStats>>,
    /// Records read per input batch (non-BGZF path).
    pub records_per_batch: usize,
    /// Enable the asynchronous reader path.
    pub async_reader: bool,
}
impl FastqPipelineConfig {
    /// Creates a configuration with defaults derived from the thread count:
    /// queue capacity scales with threads (capped at 1024) and the per-batch
    /// record count scales up to 4 threads (capped at 800).
    #[must_use]
    pub fn new(num_threads: usize, inputs_are_bgzf: bool, compression_level: u32) -> Self {
        let threads = num_threads.max(1);
        Self {
            num_threads: threads,
            queue_capacity: (threads * 128).min(1024),
            compression_level,
            collect_stats: false,
            inputs_are_bgzf,
            batch_size: 400,
            scheduler_strategy: SchedulerStrategy::default(),
            queue_memory_limit: 4 * 1024 * 1024 * 1024,
            deadlock_timeout_secs: 10,
            deadlock_recover_enabled: false,
            shared_stats: None,
            records_per_batch: (200 * threads.min(4)).min(800),
            async_reader: false,
        }
    }
    /// Enables or disables statistics collection.
    #[must_use]
    pub fn with_stats(self, enabled: bool) -> Self {
        Self { collect_stats: enabled, ..self }
    }
    /// Sets the compression level, clamped to the maximum of 12.
    #[must_use]
    pub fn with_compression_level(self, level: u32) -> Self {
        Self { compression_level: level.min(12), ..self }
    }
    /// Sets the inter-stage queue capacity (minimum 8).
    #[must_use]
    pub fn with_queue_capacity(self, capacity: usize) -> Self {
        Self { queue_capacity: capacity.max(8), ..self }
    }
    /// Sets the processing batch size (minimum 1).
    #[must_use]
    pub fn with_batch_size(self, size: usize) -> Self {
        Self { batch_size: size.max(1), ..self }
    }
    /// Sets the worker scheduling strategy.
    #[must_use]
    pub fn with_scheduler_strategy(self, strategy: SchedulerStrategy) -> Self {
        Self { scheduler_strategy: strategy, ..self }
    }
    /// Sets the soft memory limit for queued data.
    #[must_use]
    pub fn with_queue_memory_limit(self, limit_bytes: u64) -> Self {
        Self { queue_memory_limit: limit_bytes, ..self }
    }
    /// Sets the no-progress timeout used by deadlock detection.
    #[must_use]
    pub fn with_deadlock_timeout(self, timeout_secs: u64) -> Self {
        Self { deadlock_timeout_secs: timeout_secs, ..self }
    }
    /// Enables or disables automatic deadlock recovery.
    #[must_use]
    pub fn with_deadlock_recovery(self, enabled: bool) -> Self {
        Self { deadlock_recover_enabled: enabled, ..self }
    }
    /// Enables or disables the asynchronous reader path.
    #[must_use]
    pub fn with_async_reader(self, enabled: bool) -> Self {
        Self { async_reader: enabled, ..self }
    }
    /// The full chain of pipeline steps this configuration activates.
    #[must_use]
    pub fn active_steps(&self) -> ActiveSteps {
        let steps = [
            PipelineStep::Read,
            PipelineStep::Decompress,
            PipelineStep::FindBoundaries,
            PipelineStep::Decode,
            PipelineStep::Process,
            PipelineStep::Serialize,
            PipelineStep::Compress,
            PipelineStep::Write,
        ];
        ActiveSteps::new(&steps)
    }
}
/// Default number of BGZF blocks grouped into one read batch.
const DEFAULT_BLOCKS_PER_BATCH: usize = 4;
/// Heap-byte threshold at which the block-parsed queue applies backpressure.
const Q2_BLOCK_PARSED_BACKPRESSURE_BYTES: u64 = 128 * 1024 * 1024;
/// Heap-byte threshold for pending merge state backpressure. NOTE(review):
/// not referenced within this chunk — confirm usage later in the file.
const PENDING_BACKPRESSURE_BYTES: u64 = 256 * 1024 * 1024;
/// Reads up to `n` complete 4-line FASTQ records from `reader`.
///
/// Returns the raw bytes, record-end offsets (starting with 0), and whether
/// EOF was reached. A final line missing its newline gets one appended; a
/// record cut short by EOF is dropped entirely.
///
/// # Errors
/// I/O errors from the reader, or `InvalidData` when a record's first line
/// does not begin with '@'.
fn read_n_fastq_records<R: BufRead>(
    reader: &mut R,
    n: usize,
) -> io::Result<(Vec<u8>, Vec<usize>, bool)> {
    // Rough sizing guess: ~300 bytes per record.
    let mut buf = Vec::with_capacity(n * 300);
    let mut record_ends = Vec::with_capacity(n + 1);
    record_ends.push(0);
    let mut hit_eof = false;
    'records: for _ in 0..n {
        let record_start = buf.len();
        let mut complete_lines = 0;
        while complete_lines < 4 {
            let line_start = buf.len();
            if reader.read_until(b'\n', &mut buf)? == 0 {
                // EOF mid-record: discard the partial record.
                buf.truncate(record_start);
                hit_eof = true;
                break;
            }
            complete_lines += 1;
            // Normalize a final unterminated line; this also signals EOF.
            if buf.last().copied() != Some(b'\n') {
                buf.push(b'\n');
                hit_eof = true;
            }
            if complete_lines == 1 && buf[line_start] != b'@' {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!(
                        "Expected FASTQ record to start with '@', got '{}'",
                        buf[line_start] as char
                    ),
                ));
            }
        }
        if complete_lines < 4 {
            break 'records;
        }
        record_ends.push(buf.len());
        if hit_eof {
            break 'records;
        }
    }
    Ok((buf, record_ends, hit_eof))
}
/// Sums the ISIZE (uncompressed length) fields of every whole BGZF block in
/// `raw_data`, giving a capacity estimate for the decompressed output.
/// Trailing partial blocks are ignored.
fn estimate_uncompressed_size(raw_data: &[u8]) -> usize {
    let mut total = 0usize;
    let mut offset = 0usize;
    while offset + BGZF_HEADER_SIZE + BGZF_FOOTER_SIZE <= raw_data.len() {
        // BSIZE (total block size minus one) lives at header bytes 16..18.
        let bsize = u16::from_le_bytes([raw_data[offset + 16], raw_data[offset + 17]]) as usize;
        let block_size = bsize + 1;
        if offset + block_size > raw_data.len() {
            break;
        }
        // ISIZE is the little-endian u32 in the block's last 4 bytes.
        let isize_offset = offset + block_size - 4;
        if isize_offset + 4 <= raw_data.len() {
            let mut isize_bytes = [0u8; 4];
            isize_bytes.copy_from_slice(&raw_data[isize_offset..isize_offset + 4]);
            total += u32::from_le_bytes(isize_bytes) as usize;
        }
        offset += block_size;
    }
    total
}
/// Decompresses every BGZF block contained in `raw_data` into one buffer,
/// preallocated from the blocks' ISIZE fields. A trailing fragment smaller
/// than a block header is silently ignored.
///
/// # Errors
/// `InvalidData` when a block's declared size extends past the chunk, plus
/// any decompression error.
fn decompress_bgzf_chunk(raw_data: &[u8], decompressor: &mut Decompressor) -> io::Result<Vec<u8>> {
    let mut out = Vec::with_capacity(estimate_uncompressed_size(raw_data));
    let mut offset = 0;
    while offset < raw_data.len() {
        // Too few bytes left to hold a header: cannot be another block.
        if raw_data.len() - offset < BGZF_HEADER_SIZE {
            break;
        }
        // BSIZE (total block size minus one) lives at header bytes 16..18.
        let bsize = u16::from_le_bytes([raw_data[offset + 16], raw_data[offset + 17]]) as usize;
        let block_size = bsize + 1;
        let end = offset + block_size;
        if end > raw_data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!(
                    "BGZF block extends past chunk: offset={}, block_size={}, chunk_len={}",
                    offset,
                    block_size,
                    raw_data.len()
                ),
            ));
        }
        decompress_block_slice_into(&raw_data[offset..end], decompressor, &mut out)?;
        offset = end;
    }
    Ok(out)
}
/// Trims every stream's boundary batch down to the smallest per-stream record
/// count so paired streams stay in lockstep.
///
/// NOTE(review): unlike `FastqFormat::find_boundaries`, trimmed excess bytes
/// are dropped here rather than pushed back into per-stream leftover state,
/// and no trimming happens when `min_records == 0` — presumably the caller
/// retains or re-reads the surplus elsewhere; confirm before reusing this in
/// a new path.
fn align_stream_records(
    mut streams: Vec<FastqStreamBoundaries>,
    serial: u64,
) -> FastqBoundaryBatch {
    if streams.len() > 1 {
        let min_records =
            streams.iter().map(|s| s.offsets.len().saturating_sub(1)).min().unwrap_or(0);
        for stream in &mut streams {
            let record_count = stream.offsets.len().saturating_sub(1);
            if record_count > min_records && min_records > 0 {
                let excess_start = stream.offsets[min_records];
                stream.data.truncate(excess_start);
                stream.offsets.truncate(min_records + 1);
            }
        }
    }
    FastqBoundaryBatch { streams, serial }
}
const SERIALIZATION_BUFFER_CAPACITY: usize = 256 * 1024;
/// Per-worker-thread mutable state: the shared core plus one "held" slot per
/// pipeline stage for items produced but not yet accepted by the next queue.
pub struct FastqWorkerState<P: Send> {
    /// State common to all pipeline kinds (compressor, buffers, scheduler).
    pub core: WorkerCoreState,
    /// This thread's private decompressor instance.
    pub decompressor: Decompressor,
    /// Next input stream index this worker will service.
    pub next_stream: usize,
    // One parked item per pipeline stage:
    pub held_chunk: Option<(u64, PerStreamChunk)>,
    pub held_decompressed_chunk: Option<(u64, PerStreamChunk)>,
    pub held_boundaries: Option<(u64, FastqBoundaryBatch)>,
    pub held_block_parsed: Option<BlockParsed>,
    pub held_parsed: Option<(u64, Vec<FastqTemplate>, usize)>,
    pub held_processed: Option<(u64, Vec<P>, usize)>,
    pub held_serialized: Option<(u64, SerializedBatch, usize)>,
    pub held_compressed: Option<(u64, CompressedBlockBatch, usize)>,
}
impl<P: Send> FastqWorkerState<P> {
    /// Builds per-thread worker state. `next_stream` starts at `thread_id`
    /// so threads initially fan out across the input streams.
    #[must_use]
    pub fn new(
        compression_level: u32,
        thread_id: usize,
        num_threads: usize,
        scheduler_strategy: SchedulerStrategy,
        active_steps: ActiveSteps,
    ) -> Self {
        let core = WorkerCoreState::new(
            compression_level,
            thread_id,
            num_threads,
            scheduler_strategy,
            active_steps,
        );
        Self {
            core,
            decompressor: Decompressor::new(),
            next_stream: thread_id,
            held_chunk: None,
            held_decompressed_chunk: None,
            held_boundaries: None,
            held_block_parsed: None,
            held_parsed: None,
            held_processed: None,
            held_serialized: None,
            held_compressed: None,
        }
    }

    /// True when any pipeline stage has an item parked on this worker.
    #[must_use]
    pub fn has_any_held_items(&self) -> bool {
        let held = [
            self.held_chunk.is_some(),
            self.held_decompressed_chunk.is_some(),
            self.held_boundaries.is_some(),
            self.held_block_parsed.is_some(),
            self.held_parsed.is_some(),
            self.held_processed.is_some(),
            self.held_serialized.is_some(),
            self.held_compressed.is_some(),
        ];
        held.contains(&true)
    }

    /// Drops every parked item (used on error/teardown paths).
    pub fn clear_held_items(&mut self) {
        self.held_chunk = None;
        self.held_decompressed_chunk = None;
        self.held_boundaries = None;
        self.held_block_parsed = None;
        self.held_parsed = None;
        self.held_processed = None;
        self.held_serialized = None;
        self.held_compressed = None;
    }
}
// --- Thin delegation impls wiring FastqWorkerState into the generic worker
// machinery in `super::base`; each exposes a field or forwards to the core.

impl<P: Send> HasCompressor for FastqWorkerState<P> {
    fn compressor_mut(&mut self) -> &mut InlineBgzfCompressor {
        &mut self.core.compressor
    }
}
impl<P: Send> HasRecycledBuffers for FastqWorkerState<P> {
    fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8> {
        self.core.take_or_alloc_buffer(capacity)
    }
    fn recycle_buffer(&mut self, buf: Vec<u8>) {
        self.core.recycle_buffer(buf);
    }
}
impl<P: Send> HasHeldCompressed for FastqWorkerState<P> {
    fn held_compressed_mut(&mut self) -> &mut Option<(u64, CompressedBlockBatch, usize)> {
        &mut self.held_compressed
    }
}
impl<P: Send> HasHeldBoundaries<FastqBoundaryBatch> for FastqWorkerState<P> {
    fn held_boundaries_mut(&mut self) -> &mut Option<(u64, FastqBoundaryBatch)> {
        &mut self.held_boundaries
    }
}
impl<P: Send> HasHeldProcessed<P> for FastqWorkerState<P> {
    fn held_processed_mut(&mut self) -> &mut Option<(u64, Vec<P>, usize)> {
        &mut self.held_processed
    }
}
impl<P: Send> HasHeldSerialized for FastqWorkerState<P> {
    fn held_serialized_mut(&mut self) -> &mut Option<(u64, SerializedBatch, usize)> {
        &mut self.held_serialized
    }
    fn serialization_buffer_mut(&mut self) -> &mut Vec<u8> {
        &mut self.core.serialization_buffer
    }
    // Fixed capacity shared by all FASTQ workers.
    fn serialization_buffer_capacity(&self) -> usize {
        SERIALIZATION_BUFFER_CAPACITY
    }
}
impl<P: Send> WorkerStateCommon for FastqWorkerState<P> {
    fn has_any_held_items(&self) -> bool {
        FastqWorkerState::has_any_held_items(self)
    }
    fn clear_held_items(&mut self) {
        FastqWorkerState::clear_held_items(self);
    }
}
impl<P: Send> HasWorkerCore for FastqWorkerState<P> {
    fn core(&self) -> &WorkerCoreState {
        &self.core
    }
    fn core_mut(&mut self) -> &mut WorkerCoreState {
        &mut self.core
    }
}
/// All shared state for one FASTQ pipeline run: input readers, inter-stage
/// queues with completion flags and counters, and the output-side queues.
pub struct FastqPipelineState<R: BufRead + Send, P: Send + MemoryEstimate> {
    pub config: FastqPipelineConfig,
    /// One lockable reader per input stream; `None` once the reader is taken.
    pub readers: Vec<Mutex<Option<StreamReader<R>>>>,
    /// Per-stream batch sequence counters.
    pub batch_counters: Vec<AtomicU64>,
    /// Per-stream EOF flags.
    pub stream_eof: Vec<AtomicBool>,
    pub num_streams: usize,
    /// BGZF blocks grouped per read batch.
    pub blocks_per_batch: usize,
    /// Records read per batch (non-BGZF path).
    pub records_per_batch: usize,
    // Read stage:
    pub read_done: AtomicBool,
    pub batches_read: AtomicU64,
    /// Raw chunks awaiting decompression.
    pub q0_chunks: ArrayQueue<(u64, PerStreamChunk)>,
    /// Decompressed chunks awaiting the next stage.
    pub q1_decompressed: ArrayQueue<(u64, PerStreamChunk)>,
    // BGZF block-parallel path:
    pub chunks_block_parsed: AtomicU64,
    pub q2_block_parsed: ArrayQueue<BlockParsed>,
    /// Heap bytes currently in `q2_block_parsed` (for backpressure).
    pub q2_block_parsed_heap_bytes: AtomicU64,
    pub(crate) block_merge_state: Mutex<BlockMergeState>,
    pub block_merge_done: AtomicBool,
    pub blocks_merged: AtomicU64,
    // Non-BGZF path:
    pub chunks_paired: AtomicU64,
    pub(crate) pair_state: Mutex<PairState>,
    pub boundary_state: FastqBoundaryState,
    pub boundaries_done: AtomicBool,
    pub batches_boundaries_found: AtomicU64,
    pub q2_5_boundaries: ArrayQueue<(u64, FastqBoundaryBatch)>,
    pub q2_5_boundaries_heap_bytes: AtomicU64,
    // Parse/group stages:
    pub parse_done: AtomicBool,
    pub batches_parsed: AtomicU64,
    pub batches_grouped: AtomicU64,
    pub group_done: AtomicBool,
    pub total_templates_pushed: AtomicU64,
    pub total_records_serialized: AtomicU64,
    /// Output side: process/serialize/compress/write queues and error slot.
    pub output: OutputPipelineQueues<FastqTemplate, P>,
    /// Deadlock detection state for the monitor thread.
    pub deadlock_state: DeadlockState,
}
impl<R: BufRead + Send, P: Send + MemoryEstimate> FastqPipelineState<R, P> {
    /// Builds the complete shared pipeline state: per-stream readers, all
    /// inter-stage queues sized to `config.queue_capacity`, merge/pair state,
    /// deadlock monitoring, and the output side of the pipeline.
    #[must_use]
    pub fn new(
        config: FastqPipelineConfig,
        readers: Vec<StreamReader<R>>,
        output: Box<dyn Write + Send>,
    ) -> Self {
        let cap = config.queue_capacity;
        let num_streams = readers.len();
        // Prefer an externally shared stats sink; otherwise allocate one only
        // when stats collection is enabled.
        let stats = if config.collect_stats {
            config.shared_stats.clone().or_else(|| Some(Arc::new(PipelineStats::new())))
        } else {
            None
        };
        let deadlock_config =
            DeadlockConfig::new(config.deadlock_timeout_secs, config.deadlock_recover_enabled);
        let memory_limit = config.queue_memory_limit;
        let deadlock_state = DeadlockState::new(&deadlock_config, memory_limit);
        // Each reader goes behind its own mutex; the Option allows taking the
        // reader out (e.g. at EOF).
        let per_stream_readers: Vec<Mutex<Option<StreamReader<R>>>> =
            readers.into_iter().map(|r| Mutex::new(Some(r))).collect();
        let batch_counters: Vec<AtomicU64> = (0..num_streams).map(|_| AtomicU64::new(0)).collect();
        let stream_eof: Vec<AtomicBool> =
            (0..num_streams).map(|_| AtomicBool::new(false)).collect();
        Self {
            readers: per_stream_readers,
            batch_counters,
            stream_eof,
            num_streams,
            blocks_per_batch: DEFAULT_BLOCKS_PER_BATCH,
            records_per_batch: config.records_per_batch,
            read_done: AtomicBool::new(false),
            batches_read: AtomicU64::new(0),
            q0_chunks: ArrayQueue::new(cap),
            q1_decompressed: ArrayQueue::new(cap),
            chunks_block_parsed: AtomicU64::new(0),
            q2_block_parsed: ArrayQueue::new(cap),
            q2_block_parsed_heap_bytes: AtomicU64::new(0),
            block_merge_state: Mutex::new(BlockMergeState::new()),
            block_merge_done: AtomicBool::new(false),
            blocks_merged: AtomicU64::new(0),
            chunks_paired: AtomicU64::new(0),
            pair_state: Mutex::new(PairState::new(num_streams)),
            boundary_state: FastqBoundaryState::new(num_streams),
            boundaries_done: AtomicBool::new(false),
            batches_boundaries_found: AtomicU64::new(0),
            q2_5_boundaries: ArrayQueue::new(cap),
            q2_5_boundaries_heap_bytes: AtomicU64::new(0),
            parse_done: AtomicBool::new(false),
            batches_parsed: AtomicU64::new(0),
            batches_grouped: AtomicU64::new(0),
            group_done: AtomicBool::new(false),
            total_templates_pushed: AtomicU64::new(0),
            total_records_serialized: AtomicU64::new(0),
            output: OutputPipelineQueues::new(
                cap,
                output,
                stats,
                "Processed records",
                memory_limit,
            ),
            deadlock_state,
            // `config` is moved in last because fields above read from it.
            config,
        }
    }
    /// Records a fatal pipeline error on the output side.
    pub fn set_error(&self, error: io::Error) {
        self.output.set_error(error);
    }
    /// True if a fatal error has been recorded.
    pub fn has_error(&self) -> bool {
        self.output.has_error()
    }
    /// True when the block-parsed queue exceeds its heap-byte backpressure
    /// threshold.
    #[must_use]
    pub fn is_q2_block_parsed_memory_high(&self) -> bool {
        self.q2_block_parsed_heap_bytes.load(Ordering::Acquire)
            >= Q2_BLOCK_PARSED_BACKPRESSURE_BYTES
    }
    /// True when the processed-items queue is over its memory threshold.
    #[must_use]
    pub fn is_q4_memory_high(&self) -> bool {
        self.output.is_processed_memory_high()
    }
    /// True while the pipeline is draining.
    #[must_use]
    pub fn is_draining(&self) -> bool {
        self.output.is_draining()
    }
    /// Takes the recorded error, if any, leaving the slot empty.
    pub fn take_error(&self) -> Option<io::Error> {
        self.output.take_error()
    }
    /// True when every stage's done flag is set and every relevant queue is
    /// empty. Flags are checked before queue emptiness; the BGZF and gzip
    /// paths each check their own stage flags and queues.
    pub fn is_complete(&self) -> bool {
        let read_done = self.read_done.load(Ordering::Acquire);
        let group_done = self.group_done.load(Ordering::Acquire);
        if !read_done || !group_done {
            log::trace!("is_complete: read_done={read_done}, group_done={group_done}");
            return false;
        }
        if self.config.inputs_are_bgzf {
            // Block-parallel path: block parse + merge must also be finished.
            let block_merge_done = self.block_merge_done.load(Ordering::Acquire);
            let parse_done = self.parse_done.load(Ordering::Acquire);
            if !block_merge_done || !parse_done {
                log::debug!(
                    "is_complete: BGZF flags not done: block_merge_done={block_merge_done}, parse_done={parse_done}"
                );
                return false;
            }
            if !self.q0_chunks.is_empty()
                || !self.q1_decompressed.is_empty()
                || !self.q2_block_parsed.is_empty()
            {
                return false;
            }
        } else {
            // Plain-gzip path: boundary finding + parsing must be finished.
            let boundaries_done = self.boundaries_done.load(Ordering::Acquire);
            let parse_done = self.parse_done.load(Ordering::Acquire);
            if !boundaries_done || !parse_done {
                log::debug!(
                    "is_complete: gzip flags not done: boundaries_done={boundaries_done}, parse_done={parse_done}"
                );
                return false;
            }
            if !self.q0_chunks.is_empty() || !self.q1_decompressed.is_empty() {
                return false;
            }
            if !self.q2_5_boundaries.is_empty() {
                log::trace!(
                    "is_complete: q2_5_boundaries not empty: {}",
                    self.q2_5_boundaries.len()
                );
                return false;
            }
        }
        self.output.are_queues_empty()
    }
    /// Pipeline statistics, when collection is enabled.
    #[must_use]
    pub fn stats(&self) -> Option<&PipelineStats> {
        self.output.stats.as_deref()
    }
    /// Progress tracker for the output side.
    #[must_use]
    pub fn progress(&self) -> &ProgressTracker {
        &self.output.progress
    }
    /// Number of items written to the output so far.
    #[must_use]
    pub fn items_written(&self) -> u64 {
        self.output.items_written.load(Ordering::Relaxed)
    }
    /// Sets or clears draining mode on the output side.
    pub fn set_draining(&self, value: bool) {
        self.output.set_draining(value);
    }
    /// Finalizes the output: appends the BGZF EOF marker and flushes. Taking
    /// the writer out of its Option makes repeated calls no-ops.
    pub fn flush_output(&self) -> io::Result<()> {
        if let Some(mut writer) = self.output.output.lock().take() {
            writer.write_all(&BGZF_EOF)?;
            writer.flush()?;
        }
        Ok(())
    }
    /// Post-run sanity check: verifies every queue, stash, and leftover is
    /// empty and (non-BGZF path) that stage counters agree, returning details
    /// of anything left behind.
    #[allow(clippy::too_many_lines)]
    pub fn validate_completion(&self) -> Result<(), PipelineValidationError> {
        let mut non_empty_queues = Vec::new();
        let mut counter_mismatches = Vec::new();
        if !self.q0_chunks.is_empty() {
            non_empty_queues.push(format!("q0_chunks ({})", self.q0_chunks.len()));
        }
        if !self.q1_decompressed.is_empty() {
            non_empty_queues.push(format!("q1_decompressed ({})", self.q1_decompressed.len()));
        }
        if self.config.inputs_are_bgzf {
            if !self.q2_block_parsed.is_empty() {
                non_empty_queues.push(format!("q2_block_parsed ({})", self.q2_block_parsed.len()));
            }
            {
                let merge = self.block_merge_state.lock();
                if !merge.is_empty() {
                    non_empty_queues.push("block_merge_state (non-empty)".to_string());
                }
            }
        } else {
            if !self.q2_5_boundaries.is_empty() {
                non_empty_queues.push(format!("q2_5_boundaries ({})", self.q2_5_boundaries.len()));
            }
            {
                let pair = self.pair_state.lock();
                if !pair.is_empty() {
                    non_empty_queues.push("pair_state (non-empty)".to_string());
                }
            }
            // Any surviving leftover bytes mean a record was never completed.
            for (idx, stream_state) in self.boundary_state.stream_states.iter().enumerate() {
                let leftover_len = stream_state.lock().leftover.len();
                if leftover_len > 0 {
                    non_empty_queues.push(format!("boundary_leftover[{idx}] ({leftover_len})"));
                }
            }
        }
        if !self.output.groups.is_empty() {
            non_empty_queues.push(format!("q3_templates ({})", self.output.groups.len()));
        }
        if !self.output.processed.is_empty() {
            non_empty_queues.push(format!("q4_processed ({})", self.output.processed.len()));
        }
        if !self.output.serialized.is_empty() {
            non_empty_queues.push(format!("q5_serialized ({})", self.output.serialized.len()));
        }
        if !self.output.compressed.is_empty() {
            non_empty_queues.push(format!("q6_compressed ({})", self.output.compressed.len()));
        }
        {
            let write_reorder = self.output.write_reorder.lock();
            if !write_reorder.is_empty() {
                non_empty_queues.push(format!("write_reorder ({})", write_reorder.len()));
            }
        }
        // Counter agreement is only checked on the non-BGZF path.
        if !self.config.inputs_are_bgzf {
            let batches_grouped = self.batches_grouped.load(Ordering::Acquire);
            let batches_boundaries_found = self.batches_boundaries_found.load(Ordering::Acquire);
            let batches_parsed = self.batches_parsed.load(Ordering::Acquire);
            if batches_parsed != batches_boundaries_found {
                counter_mismatches.push(format!(
                    "batches_parsed ({batches_parsed}) != batches_boundaries_found ({batches_boundaries_found})"
                ));
            }
            if batches_grouped != batches_parsed {
                counter_mismatches.push(format!(
                    "batches_grouped ({batches_grouped}) != batches_parsed ({batches_parsed})"
                ));
            }
        }
        // Heap-leak accounting is not implemented here; always reported as 0.
        let leaked_heap_bytes = 0u64;
        if !non_empty_queues.is_empty() || !counter_mismatches.is_empty() {
            return Err(PipelineValidationError {
                non_empty_queues,
                counter_mismatches,
                leaked_heap_bytes,
            });
        }
        Ok(())
    }
}
/// Lifecycle hooks for the generic pipeline driver; every method delegates to
/// the inherent methods or output-side state above.
impl<R: BufRead + Send + 'static, P: Send + MemoryEstimate + 'static> PipelineLifecycle
    for FastqPipelineState<R, P>
{
    fn is_complete(&self) -> bool {
        FastqPipelineState::is_complete(self)
    }
    fn has_error(&self) -> bool {
        FastqPipelineState::has_error(self)
    }
    fn take_error(&self) -> Option<io::Error> {
        FastqPipelineState::take_error(self)
    }
    fn set_error(&self, error: io::Error) {
        self.output.set_error(error);
    }
    fn is_draining(&self) -> bool {
        self.output.is_draining()
    }
    fn set_draining(&self, value: bool) {
        FastqPipelineState::set_draining(self, value);
    }
    fn stats(&self) -> Option<&PipelineStats> {
        FastqPipelineState::stats(self)
    }
    fn progress(&self) -> &ProgressTracker {
        FastqPipelineState::progress(self)
    }
    fn items_written(&self) -> u64 {
        FastqPipelineState::items_written(self)
    }
    fn flush_output(&self) -> io::Result<()> {
        FastqPipelineState::flush_output(self)
    }
    fn validate_completion(&self) -> Result<(), PipelineValidationError> {
        FastqPipelineState::validate_completion(self)
    }
}
/// Exposes queue depths and stage flags to the deadlock-monitor thread.
impl<R: BufRead + Send + 'static, P: Send + MemoryEstimate + 'static> MonitorableState
    for FastqPipelineState<R, P>
{
    fn deadlock_state(&self) -> &DeadlockState {
        &self.deadlock_state
    }
    /// Builds a point-in-time snapshot of queue lengths and progress
    /// counters. Relaxed loads are fine: this is diagnostic only.
    fn build_queue_snapshot(&self) -> QueueSnapshot {
        let parse_done = self.parse_done.load(Ordering::Relaxed);
        let batches_read = self.batches_read.load(Ordering::Relaxed);
        // The mid-pipeline queue and summary text differ between the BGZF
        // (block-parallel) and plain-gzip paths.
        let (q2b_len, extra_state) = if self.config.inputs_are_bgzf {
            let block_merge_done = self.block_merge_done.load(Ordering::Relaxed);
            let chunks_bp = self.chunks_block_parsed.load(Ordering::Relaxed);
            let blocks_merged = self.blocks_merged.load(Ordering::Relaxed);
            let q2_heap_mb =
                self.q2_block_parsed_heap_bytes.load(Ordering::Relaxed) / (1024 * 1024);
            (
                self.q2_block_parsed.len(),
                Some(format!(
                    "block_merge_done={block_merge_done}, parse_done={parse_done}, batches: read={batches_read} block_parsed={chunks_bp} merged={blocks_merged}, q2_heap={q2_heap_mb}MB"
                )),
            )
        } else {
            let boundaries_done = self.boundaries_done.load(Ordering::Relaxed);
            let chunks_paired = self.chunks_paired.load(Ordering::Relaxed);
            let batches_found = self.batches_boundaries_found.load(Ordering::Relaxed);
            let batches_parsed = self.batches_parsed.load(Ordering::Relaxed);
            (
                self.q2_5_boundaries.len(),
                Some(format!(
                    "boundaries_done={boundaries_done}, parse_done={parse_done}, batches: read={batches_read} paired={chunks_paired} found={batches_found} parsed={batches_parsed}"
                )),
            )
        };
        QueueSnapshot {
            q1_len: self.q0_chunks.len(),
            q2_len: self.q1_decompressed.len(),
            q2b_len,
            q3_len: self.output.groups.len(),
            q4_len: self.output.processed.len(),
            q5_len: self.output.serialized.len(),
            q6_len: self.output.compressed.len(),
            // Fields unused by the FASTQ pipeline are reported as zero.
            q7_len: 0,
            q2_reorder_mem: 0,
            q3_reorder_mem: 0,
            memory_limit: self.deadlock_state.get_memory_limit(),
            read_done: self.read_done.load(Ordering::Relaxed),
            group_done: self.group_done.load(Ordering::Relaxed),
            draining: self.output.draining.load(Ordering::Relaxed),
            extra_state,
        }
    }
}
/// Adapts the FASTQ pipeline state to the shared output-pipeline machinery
/// (serialize queue q5, compress queue q6, write reordering, output sink).
impl<R: BufRead + Send + 'static, P: Send + MemoryEstimate + 'static> OutputPipelineState
    for FastqPipelineState<R, P>
{
    type Processed = P;
    fn has_error(&self) -> bool {
        self.output.has_error()
    }
    fn set_error(&self, error: io::Error) {
        self.output.set_error(error);
    }
    fn q5_pop(&self) -> Option<(u64, SerializedBatch)> {
        self.output.serialized.pop()
    }
    // NOTE(review): q5_push does not update serialized_heap_bytes; the serialize
    // step that pushes to this queue does its own fetch_add on success, so
    // adding it here would double-count. Only pops are tracked via q5_track_pop.
    fn q5_push(&self, item: (u64, SerializedBatch)) -> Result<(), (u64, SerializedBatch)> {
        self.output.serialized.push(item)
    }
    fn q5_is_full(&self) -> bool {
        self.output.serialized.is_full()
    }
    /// Subtracts a popped batch's heap size from the q5 accounting counter.
    fn q5_track_pop(&self, heap_size: u64) {
        self.output.serialized_heap_bytes.fetch_sub(heap_size, Ordering::AcqRel);
    }
    fn q6_pop(&self) -> Option<(u64, CompressedBlockBatch)> {
        self.output.compressed.pop()
    }
    /// Pushes onto q6 and, only on success, credits the batch's heap size to
    /// the q6 accounting counter (so a failed push leaves accounting unchanged).
    fn q6_push(
        &self,
        item: (u64, CompressedBlockBatch),
    ) -> Result<(), (u64, CompressedBlockBatch)> {
        let heap_size = item.1.estimate_heap_size();
        let result = self.output.compressed.push(item);
        if result.is_ok() {
            self.output.compressed_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
        }
        result
    }
    fn q6_is_full(&self) -> bool {
        self.output.compressed.is_full()
    }
    fn q6_track_pop(&self, heap_size: u64) {
        self.output.compressed_heap_bytes.fetch_sub(heap_size, Ordering::AcqRel);
    }
    /// Inserts an out-of-order compressed batch into the write reorder buffer.
    fn q6_reorder_insert(&self, serial: u64, batch: CompressedBlockBatch) {
        self.output.write_reorder.lock().insert(serial, batch);
    }
    fn q6_reorder_try_pop_next(&self) -> Option<CompressedBlockBatch> {
        self.output.write_reorder.lock().try_pop_next()
    }
    /// Non-blocking access to the output writer; `None` when another worker
    /// currently holds the lock.
    fn output_try_lock(
        &self,
    ) -> Option<parking_lot::MutexGuard<'_, Option<Box<dyn Write + Send>>>> {
        self.output.output.try_lock()
    }
    /// Increments the written-items counter, returning the previous value
    /// (fetch_add semantics).
    fn increment_written(&self) -> u64 {
        self.output.items_written.fetch_add(1, Ordering::Release)
    }
    fn record_compressed_bytes_out(&self, bytes: u64) {
        if let Some(ref stats) = self.output.stats {
            stats.compressed_bytes_out.fetch_add(bytes, Ordering::Relaxed);
        }
    }
    fn record_q6_pop_progress(&self) {
        self.deadlock_state.record_q6_pop();
    }
    fn record_q7_push_progress(&self) {
        self.deadlock_state.record_q7_push();
    }
    /// Whether `serial` is the next batch the write reorderer will accept.
    fn write_reorder_can_proceed(&self, serial: u64) -> bool {
        self.output.write_reorder_state.can_proceed(serial)
    }
    fn write_reorder_is_memory_high(&self) -> bool {
        self.output.write_reorder_state.is_memory_high()
    }
    fn stats(&self) -> Option<&PipelineStats> {
        self.output.stats.as_deref()
    }
}
/// Adapts the FASTQ pipeline state to the shared process step: templates are
/// popped from the groups queue (q4) and processed results pushed to q5-input.
impl<R: BufRead + Send + 'static, P: Send + MemoryEstimate + 'static>
    ProcessPipelineState<FastqTemplate, P> for FastqPipelineState<R, P>
{
    fn process_input_pop(&self) -> Option<(u64, Vec<FastqTemplate>)> {
        self.output.groups.pop()
    }
    fn process_output_is_full(&self) -> bool {
        self.output.processed.is_full()
    }
    fn process_output_push(&self, item: (u64, Vec<P>)) -> Result<(), (u64, Vec<P>)> {
        self.output.processed.push(item)
    }
    fn has_error(&self) -> bool {
        self.output.has_error()
    }
    fn set_error(&self, error: io::Error) {
        self.output.set_error(error);
    }
    /// Backpressure signal: downstream queues are too full to keep processing.
    fn should_apply_process_backpressure(&self) -> bool {
        self.output.should_apply_process_backpressure()
    }
    fn is_draining(&self) -> bool {
        self.output.is_draining()
    }
}
/// Adapts the FASTQ pipeline state to the shared serialize step: processed
/// batches are popped from q4-output and serialized batches pushed to q5.
impl<R: BufRead + Send + 'static, P: Send + MemoryEstimate + 'static> SerializePipelineState<P>
    for FastqPipelineState<R, P>
{
    fn serialize_input_pop(&self) -> Option<(u64, Vec<P>)> {
        self.output.processed.pop()
    }
    fn serialize_output_is_full(&self) -> bool {
        self.output.serialized.is_full()
    }
    fn serialize_output_push(
        &self,
        item: (u64, SerializedBatch),
    ) -> Result<(), (u64, SerializedBatch)> {
        self.output.serialized.push(item)
    }
    fn has_error(&self) -> bool {
        self.output.has_error()
    }
    fn set_error(&self, error: io::Error) {
        self.output.set_error(error);
    }
    /// Accumulates serialized byte counts into stats, when stats are enabled.
    fn record_serialized_bytes(&self, bytes: u64) {
        if let Some(ref stats) = self.output.stats {
            stats.serialized_bytes.fetch_add(bytes, Ordering::Relaxed);
        }
    }
}
/// Adapts the FASTQ pipeline state to the shared write step: compressed
/// batches (q6) are reordered by serial and written to the output sink.
impl<R: BufRead + Send + 'static, P: Send + MemoryEstimate + 'static> WritePipelineState
    for FastqPipelineState<R, P>
{
    fn write_input_queue(&self) -> &ArrayQueue<(u64, CompressedBlockBatch)> {
        &self.output.compressed
    }
    /// Buffer that restores serial order before bytes hit the writer.
    fn write_reorder_buffer(&self) -> &Mutex<ReorderBuffer<CompressedBlockBatch>> {
        &self.output.write_reorder
    }
    fn write_reorder_state(&self) -> &super::base::ReorderBufferState {
        &self.output.write_reorder_state
    }
    /// The output sink; `None` inside the mutex once the writer is taken/closed.
    fn write_output(&self) -> &Mutex<Option<Box<dyn Write + Send>>> {
        &self.output.output
    }
    fn has_error(&self) -> bool {
        self.output.has_error()
    }
    fn set_error(&self, error: io::Error) {
        self.output.set_error(error);
    }
    fn record_written(&self, count: u64) {
        self.output.items_written.fetch_add(count, Ordering::Release);
    }
    fn stats(&self) -> Option<&PipelineStats> {
        self.output.stats.as_deref()
    }
}
/// Read step: pulls the next raw batch from one of the input streams and
/// enqueues it on `q0_chunks`.
///
/// Returns `true` if a batch was produced (or held for retry after a failed
/// push), `false` if there was nothing to do: held chunk could not be
/// re-pushed, reading finished, an error is set, the queue is full, or every
/// non-EOF reader is currently locked by another worker.
#[allow(clippy::too_many_lines)]
fn fastq_try_step_read<R: BufRead + Send, P: Send + MemoryEstimate>(
    state: &FastqPipelineState<R, P>,
    worker: &mut FastqWorkerState<P>,
) -> bool {
    // A chunk whose push failed last time already owns a serial number, so it
    // must be flushed before any new chunk is read.
    if let Some((serial, held)) = worker.held_chunk.take() {
        match state.q0_chunks.push((serial, held)) {
            Ok(()) => {
                state.deadlock_state.record_q1_push();
            }
            Err((serial, held)) => {
                worker.held_chunk = Some((serial, held));
                return false;
            }
        }
    }
    if state.read_done.load(Ordering::Relaxed) || state.has_error() {
        return false;
    }
    if state.q0_chunks.is_full() {
        return false;
    }
    // Round-robin over streams, starting after the stream read last time so
    // one worker alternates between streams instead of draining one.
    let start = worker.next_stream % state.num_streams;
    for i in 0..state.num_streams {
        let stream_idx = (start + i) % state.num_streams;
        if state.stream_eof[stream_idx].load(Ordering::Relaxed) {
            continue;
        }
        // Skip a stream another worker is currently reading.
        let Some(mut guard) = state.readers[stream_idx].try_lock() else {
            continue;
        };
        let Some(ref mut reader) = *guard else {
            continue;
        };
        worker.next_stream = stream_idx + 1;
        // Claim the per-stream batch number while holding the reader lock, so
        // batch numbers match read order within the stream.
        let batch_num = state.batch_counters[stream_idx].fetch_add(1, Ordering::Relaxed);
        match reader {
            StreamReader::Bgzf(r) => {
                match read_raw_blocks(r, state.blocks_per_batch) {
                    Ok(blocks) if blocks.is_empty() => {
                        // EOF: return the unused batch number, mark this stream
                        // done, and set read_done once all streams are done.
                        state.batch_counters[stream_idx].fetch_sub(1, Ordering::Relaxed);
                        state.stream_eof[stream_idx].store(true, Ordering::Release);
                        if state.stream_eof.iter().all(|f| f.load(Ordering::Acquire)) {
                            state.read_done.store(true, Ordering::Release);
                        }
                    }
                    Ok(blocks) => {
                        // Concatenate the still-compressed BGZF block bytes;
                        // the decompress step inflates them later.
                        let total_size: usize = blocks.iter().map(|b| b.data.len()).sum();
                        let mut raw_data = Vec::with_capacity(total_size);
                        for block in blocks {
                            raw_data.extend_from_slice(&block.data);
                        }
                        let serial = state.batches_read.fetch_add(1, Ordering::Release);
                        let chunk =
                            PerStreamChunk { stream_idx, batch_num, data: raw_data, offsets: None };
                        match state.q0_chunks.push((serial, chunk)) {
                            Ok(()) => {
                                state.deadlock_state.record_q1_push();
                                return true;
                            }
                            Err((serial, chunk)) => {
                                // Queue filled concurrently: hold the chunk (its
                                // serial is already claimed) and retry next call.
                                worker.held_chunk = Some((serial, chunk));
                                return true;
                            }
                        }
                    }
                    Err(e) => {
                        state.set_error(e);
                        return false;
                    }
                }
            }
            StreamReader::Decompressed(r) => {
                match read_n_fastq_records(r, state.records_per_batch) {
                    Ok((data, offsets, at_eof)) => {
                        // <= 1 offsets means zero complete records were read.
                        if offsets.len() <= 1 {
                            state.batch_counters[stream_idx].fetch_sub(1, Ordering::Relaxed);
                            if at_eof {
                                state.stream_eof[stream_idx].store(true, Ordering::Release);
                                if state.stream_eof.iter().all(|f| f.load(Ordering::Acquire)) {
                                    state.read_done.store(true, Ordering::Release);
                                }
                            }
                            continue;
                        }
                        if at_eof {
                            state.stream_eof[stream_idx].store(true, Ordering::Release);
                            if state.stream_eof.iter().all(|f| f.load(Ordering::Acquire)) {
                                state.read_done.store(true, Ordering::Release);
                            }
                        }
                        let serial = state.batches_read.fetch_add(1, Ordering::Release);
                        // Offsets are pre-computed here; downstream uses their
                        // presence to distinguish plain text from raw BGZF.
                        let chunk =
                            PerStreamChunk { stream_idx, batch_num, data, offsets: Some(offsets) };
                        match state.q0_chunks.push((serial, chunk)) {
                            Ok(()) => {
                                state.deadlock_state.record_q1_push();
                                return true;
                            }
                            Err((serial, chunk)) => {
                                worker.held_chunk = Some((serial, chunk));
                                return true;
                            }
                        }
                    }
                    Err(e) => {
                        state.set_error(e);
                        return false;
                    }
                }
            }
        }
    }
    false
}
/// Decompress step: pops a raw chunk from `q0_chunks` and pushes it (inflated
/// if it is raw BGZF data) onto `q1_decompressed`.
///
/// Chunks that already carry `offsets` were read as plain/gzip text and pass
/// through unchanged. Returns `true` when a chunk moved forward or was held
/// for retry.
fn fastq_try_step_decompress<R: BufRead + Send, P: Send + MemoryEstimate>(
    state: &FastqPipelineState<R, P>,
    worker: &mut FastqWorkerState<P>,
) -> bool {
    // Flush a chunk held over from a previously failed push first.
    if let Some((serial, held)) = worker.held_decompressed_chunk.take() {
        match state.q1_decompressed.push((serial, held)) {
            Ok(()) => {
                state.deadlock_state.record_q2_push();
            }
            Err((serial, held)) => {
                worker.held_decompressed_chunk = Some((serial, held));
                return false;
            }
        }
    }
    if state.has_error() {
        return false;
    }
    if state.q1_decompressed.is_full() {
        return false;
    }
    let Some((serial, chunk)) = state.q0_chunks.pop() else {
        if let Some(stats) = state.stats() {
            stats.record_queue_empty(1);
        }
        return false;
    };
    state.deadlock_state.record_q1_pop();
    let decompressed = if chunk.offsets.is_some() {
        // Already-plain text: pass through untouched.
        chunk
    } else {
        match decompress_bgzf_chunk(&chunk.data, &mut worker.decompressor) {
            Ok(decompressed_data) => PerStreamChunk {
                stream_idx: chunk.stream_idx,
                batch_num: chunk.batch_num,
                data: decompressed_data,
                // Offsets stay None: boundaries are found downstream.
                offsets: None,
            },
            Err(e) => {
                state.set_error(e);
                return false;
            }
        }
    };
    match state.q1_decompressed.push((serial, decompressed)) {
        Ok(()) => {
            state.deadlock_state.record_q2_push();
            true
        }
        Err((serial, chunk)) => {
            // Queue filled concurrently; work was still done, so return true.
            worker.held_decompressed_chunk = Some((serial, chunk));
            true
        }
    }
}
/// Block-parse step (BGZF path): pops a decompressed chunk, splits it into a
/// partial-record prefix, fully parsed middle records, and a partial-record
/// suffix, and pushes the result onto `q2_block_parsed`.
///
/// Prefix/suffix bytes are stitched across block boundaries later by the
/// merge step. Returns `true` when a block moved forward or was held.
fn fastq_try_step_block_parse<R: BufRead + Send, P: Send + MemoryEstimate>(
    state: &FastqPipelineState<R, P>,
    worker: &mut FastqWorkerState<P>,
) -> bool {
    if state.has_error() {
        return false;
    }
    // Flush a block held over from a previously failed push first. Its heap
    // bytes were already accounted when it was first produced.
    if let Some(held) = worker.held_block_parsed.take() {
        match state.q2_block_parsed.push(held) {
            Ok(()) => {
                state.chunks_block_parsed.fetch_add(1, Ordering::Release);
                state.deadlock_state.record_q2_5_push();
            }
            Err(held) => {
                worker.held_block_parsed = Some(held);
                return false;
            }
        }
    }
    if state.block_merge_done.load(Ordering::Relaxed) {
        return false;
    }
    // Respect both the queue's slot limit and its heap-memory ceiling.
    if state.q2_block_parsed.is_full() || state.is_q2_block_parsed_memory_high() {
        return false;
    }
    let Some((serial, chunk)) = state.q1_decompressed.pop() else {
        return false;
    };
    state.deadlock_state.record_q2_pop();
    // Ordering downstream is by (stream_idx, batch_num), not by this serial.
    let _ = serial;
    let stream_idx = chunk.stream_idx;
    let block_idx = chunk.batch_num;
    if chunk.offsets.is_some() {
        state.set_error(io::Error::new(
            io::ErrorKind::InvalidData,
            "BlockParseFast received a gzip/plain chunk in BGZF mode",
        ));
        return false;
    }
    let data = &chunk.data;
    // Carve off the leading partial record; if it consumes the whole chunk
    // there is no suffix to detect.
    let prefix_end = detect_prefix_end(data);
    let suffix_start = if prefix_end >= data.len() {
        data.len()
    } else {
        detect_suffix_start(&data[prefix_end..]) + prefix_end
    };
    let prefix_bytes = data[..prefix_end].to_vec();
    let suffix_bytes = data[suffix_start..].to_vec();
    let middle = &data[prefix_end..suffix_start];
    // SIMD scan for record start offsets within the complete-record region.
    let offsets = fgumi_simd_fastq::find_record_offsets(middle);
    let num_records = offsets.len().saturating_sub(1);
    let mut records = Vec::with_capacity(num_records);
    for i in 0..num_records {
        let start = offsets[i];
        let end = offsets[i + 1];
        // Skip degenerate/out-of-range offset pairs rather than slicing past
        // the buffer.
        if start >= end || start >= middle.len() {
            continue;
        }
        match FastqRecord::from_slice(&middle[start..end.min(middle.len())]) {
            Ok(rec) => records.push(rec),
            Err(e) => {
                state.set_error(e);
                return false;
            }
        }
    }
    let block_parsed = BlockParsed { block_idx, stream_idx, records, prefix_bytes, suffix_bytes };
    // Account heap usage before pushing; a failed push keeps the block held
    // (still accounted), so the counter stays consistent with queue contents.
    let heap_bytes = block_parsed.estimate_heap_size() as u64;
    state.q2_block_parsed_heap_bytes.fetch_add(heap_bytes, Ordering::Release);
    match state.q2_block_parsed.push(block_parsed) {
        Ok(()) => {
            state.chunks_block_parsed.fetch_add(1, Ordering::Release);
            state.deadlock_state.record_q2_5_push();
            true
        }
        Err(held) => {
            worker.held_block_parsed = Some(held);
            true
        }
    }
}
/// Outcome of `drain_exhausted_stream`.
enum DrainResult {
    /// Drain ran to completion (or stopped at the batch budget); reports
    /// whether any templates were pushed and how many batches were emitted.
    Ok { did_work: bool, batches_this_call: usize },
    /// The groups queue was full: (serial, templates, count) must be retried
    /// by the caller via `worker.held_parsed`.
    HeldParsed(u64, Vec<FastqTemplate>, usize),
    /// A stitch error occurred; the caller should record it on the pipeline.
    Error(io::Error),
}
/// Drains pending blocks of the still-active stream after the other stream
/// hit EOF, pairing its records against any surplus left from the exhausted
/// stream and emitting templates to the groups queue.
///
/// `drain_r1` selects which stream's pending map is drained; the tuple below
/// borrows the matching set of merge-state fields mutably in one go so the
/// loop body is stream-agnostic. Bounded by `MAX_BATCHES_PER_LOCK` since the
/// caller holds the merge-state lock.
fn drain_exhausted_stream<R: BufRead + Send, P: Send + MemoryEstimate>(
    state: &FastqPipelineState<R, P>,
    merge: &mut BlockMergeState,
    drain_r1: bool,
    mut did_work: bool,
    mut batches_this_call: usize,
) -> DrainResult {
    // (pending blocks, carried suffix, this stream's surplus records,
    //  other stream's surplus records, next expected block index)
    let (pending, suffix, surplus, other_surplus, next) = if drain_r1 {
        (
            &mut merge.r1_pending,
            &mut merge.r1_suffix_bytes,
            &mut merge.r1_surplus,
            &mut merge.r2_surplus,
            &mut merge.r1_next,
        )
    } else {
        (
            &mut merge.r2_pending,
            &mut merge.r2_suffix_bytes,
            &mut merge.r2_surplus,
            &mut merge.r1_surplus,
            &mut merge.r2_next,
        )
    };
    while batches_this_call < MAX_BATCHES_PER_LOCK {
        // Blocks must be consumed strictly in index order to stitch the
        // cross-block record correctly.
        let block_next = *next;
        let Some(block) = pending.remove(&block_next) else {
            break;
        };
        merge.pending_heap_bytes =
            merge.pending_heap_bytes.saturating_sub(block.estimate_heap_size() as u64);
        *next += 1;
        // Join the previous block's trailing partial record with this
        // block's leading bytes.
        let cross = match stitch_cross_block_record(suffix, &block.prefix_bytes) {
            Ok(rec) => rec,
            Err(e) => return DrainResult::Error(e),
        };
        let mut all_this: Vec<FastqRecord> = std::mem::take(surplus);
        if let Some(rec) = cross {
            all_this.push(rec);
        }
        all_this.extend(block.records);
        *suffix = block.suffix_bytes;
        let all_other: Vec<FastqRecord> = std::mem::take(other_surplus);
        // Pair as many records as both sides have; leftovers go back to the
        // surplus fields for the next iteration/call.
        let pair_count = all_this.len().min(all_other.len());
        if pair_count > 0 {
            let (mut r1_vec, mut r2_vec) =
                if drain_r1 { (all_this, all_other) } else { (all_other, all_this) };
            let templates: Vec<FastqTemplate> = r1_vec
                .drain(..pair_count)
                .zip(r2_vec.drain(..pair_count))
                .map(|(r1, r2)| {
                    let name = r1.name().to_vec();
                    FastqTemplate { name, records: vec![r1, r2] }
                })
                .collect();
            if drain_r1 {
                *surplus = r1_vec;
                *other_surplus = r2_vec;
            } else {
                *surplus = r2_vec;
                *other_surplus = r1_vec;
            }
            let serial = merge.serial_out;
            merge.serial_out += 1;
            let count = templates.len();
            match state.output.groups.push((serial, templates)) {
                Ok(()) => {
                    state.total_templates_pushed.fetch_add(count as u64, Ordering::Release);
                    if let Some(stats) = state.stats() {
                        stats.groups_produced.fetch_add(count as u64, Ordering::Relaxed);
                    }
                    state.deadlock_state.record_q4_push();
                    did_work = true;
                }
                Err((_serial, returned)) => {
                    // Queue full: hand the batch back to the caller for retry.
                    return DrainResult::HeldParsed(serial, returned, count);
                }
            }
        } else {
            // Nothing to pair yet; stash both sides back as surplus.
            *surplus = all_this;
            *other_surplus = all_other;
        }
        batches_this_call += 1;
    }
    DrainResult::Ok { did_work, batches_this_call }
}
/// Block-merge step (BGZF path): collects parsed blocks from
/// `q2_block_parsed` into per-stream pending maps, stitches records across
/// block boundaries, pairs R1/R2 records (or emits single-end templates),
/// and pushes template batches onto the groups queue.
///
/// Only one worker makes progress per call (merge state is behind a
/// `try_lock`); emission is bounded by `MAX_BATCHES_PER_LOCK`.
/// Returns `(did_work, contended)` where `contended` indicates the merge
/// lock was busy.
#[allow(clippy::too_many_lines)]
fn fastq_try_step_block_merge<R: BufRead + Send, P: Send + MemoryEstimate>(
    state: &FastqPipelineState<R, P>,
    worker: &mut FastqWorkerState<P>,
) -> (bool, bool) {
    if state.has_error() {
        return (false, false);
    }
    let mut did_work = false;
    // Flush a template batch held over from a previously failed push first;
    // its serial is already allocated so it must go out before new batches.
    if let Some((serial, held_templates, count)) = worker.held_parsed.take() {
        match state.output.groups.push((serial, held_templates)) {
            Ok(()) => {
                state.total_templates_pushed.fetch_add(count as u64, Ordering::Release);
                if let Some(stats) = state.stats() {
                    stats.groups_produced.fetch_add(count as u64, Ordering::Relaxed);
                }
                state.deadlock_state.record_q4_push();
                did_work = true;
            }
            Err(returned) => {
                worker.held_parsed = Some((serial, returned.1, count));
                return (false, false);
            }
        }
    }
    if state.block_merge_done.load(Ordering::Relaxed) {
        return (did_work, false);
    }
    if state.output.groups.is_full() {
        return (did_work, false);
    }
    // Single owner of the merge state; contended => let another worker run it.
    let Some(mut merge) = state.block_merge_state.try_lock() else {
        return (did_work, true);
    };
    let num_streams = state.num_streams;
    let can_process = if num_streams == 1 {
        merge.r1_pending.contains_key(&merge.r1_next)
    } else {
        merge.r1_pending.contains_key(&merge.r1_next)
            && merge.r2_pending.contains_key(&merge.r2_next)
    };
    // Stop buffering new blocks once pending memory is high, unless we are
    // stuck (cannot process) and must keep draining to find the next block.
    let within_limit = merge.pending_heap_bytes < PENDING_BACKPRESSURE_BYTES;
    let mut drained = 0;
    if within_limit || !can_process {
        while let Some(block) = state.q2_block_parsed.pop() {
            // Move heap accounting from the queue counter to the merge state.
            let heap_bytes = block.estimate_heap_size() as u64;
            state.q2_block_parsed_heap_bytes.fetch_sub(heap_bytes, Ordering::Release);
            merge.pending_heap_bytes += heap_bytes;
            state.deadlock_state.record_q2_5_pop();
            state.blocks_merged.fetch_add(1, Ordering::Release);
            if block.stream_idx == 0 {
                merge.r1_pending.insert(block.block_idx, block);
            } else {
                merge.r2_pending.insert(block.block_idx, block);
            }
            drained += 1;
        }
    }
    // Nothing buffered and nothing arrived: check whether the whole BGZF
    // path is finished and flip the done flags if so.
    if drained == 0 && merge.r1_pending.is_empty() && merge.r2_pending.is_empty() {
        let all_chunks_block_parsed = state.read_done.load(Ordering::Acquire)
            && state.chunks_block_parsed.load(Ordering::Acquire)
                == state.batches_read.load(Ordering::Acquire);
        if all_chunks_block_parsed && merge.is_empty() {
            state.block_merge_done.store(true, Ordering::Release);
            state.parse_done.store(true, Ordering::Release);
            state.group_done.store(true, Ordering::Release);
        }
        return (did_work, false);
    }
    let mut batches_this_call = 0;
    if num_streams == 1 {
        // Single-end: consume R1 blocks in index order, one template per record.
        while batches_this_call < MAX_BATCHES_PER_LOCK {
            let r1_next = merge.r1_next;
            let Some(r1_block) = merge.r1_pending.remove(&r1_next) else {
                break;
            };
            merge.pending_heap_bytes =
                merge.pending_heap_bytes.saturating_sub(r1_block.estimate_heap_size() as u64);
            merge.r1_next += 1;
            // Stitch the previous block's trailing partial record to this
            // block's leading bytes.
            let cross =
                match stitch_cross_block_record(&merge.r1_suffix_bytes, &r1_block.prefix_bytes) {
                    Ok(rec) => rec,
                    Err(e) => {
                        state.set_error(e);
                        return (true, false);
                    }
                };
            let mut all_records: Vec<FastqRecord> = std::mem::take(&mut merge.r1_surplus);
            if let Some(rec) = cross {
                all_records.push(rec);
            }
            all_records.extend(r1_block.records);
            merge.r1_suffix_bytes = r1_block.suffix_bytes;
            let templates: Vec<FastqTemplate> = all_records
                .into_iter()
                .map(|r| {
                    let name = r.name().to_vec();
                    FastqTemplate { name, records: vec![r] }
                })
                .collect();
            let serial = merge.serial_out;
            merge.serial_out += 1;
            let count = templates.len();
            match state.output.groups.push((serial, templates)) {
                Ok(()) => {
                    state.total_templates_pushed.fetch_add(count as u64, Ordering::Release);
                    if let Some(stats) = state.stats() {
                        stats.groups_produced.fetch_add(count as u64, Ordering::Relaxed);
                    }
                    state.deadlock_state.record_q4_push();
                    did_work = true;
                }
                Err((serial, returned)) => {
                    // Queue full: hold the batch for the next call.
                    worker.held_parsed = Some((serial, returned, count));
                    did_work = true;
                    break;
                }
            }
            batches_this_call += 1;
        }
    } else {
        // Paired-end: advance only when BOTH streams have their next block.
        while batches_this_call < MAX_BATCHES_PER_LOCK {
            let r1_next = merge.r1_next;
            let r2_next = merge.r2_next;
            if !merge.r1_pending.contains_key(&r1_next) || !merge.r2_pending.contains_key(&r2_next)
            {
                break;
            }
            let r1_block = merge.r1_pending.remove(&r1_next).expect("just checked");
            let r2_block = merge.r2_pending.remove(&r2_next).expect("just checked");
            merge.pending_heap_bytes = merge.pending_heap_bytes.saturating_sub(
                (r1_block.estimate_heap_size() + r2_block.estimate_heap_size()) as u64,
            );
            merge.r1_next += 1;
            merge.r2_next += 1;
            let r1_cross =
                match stitch_cross_block_record(&merge.r1_suffix_bytes, &r1_block.prefix_bytes) {
                    Ok(rec) => rec,
                    Err(e) => {
                        state.set_error(e);
                        return (true, false);
                    }
                };
            let r2_cross =
                match stitch_cross_block_record(&merge.r2_suffix_bytes, &r2_block.prefix_bytes) {
                    Ok(rec) => rec,
                    Err(e) => {
                        state.set_error(e);
                        return (true, false);
                    }
                };
            let mut all_r1: Vec<FastqRecord> = std::mem::take(&mut merge.r1_surplus);
            if let Some(rec) = r1_cross {
                all_r1.push(rec);
            }
            all_r1.extend(r1_block.records);
            let mut all_r2: Vec<FastqRecord> = std::mem::take(&mut merge.r2_surplus);
            if let Some(rec) = r2_cross {
                all_r2.push(rec);
            }
            all_r2.extend(r2_block.records);
            merge.r1_suffix_bytes = r1_block.suffix_bytes;
            merge.r2_suffix_bytes = r2_block.suffix_bytes;
            // Pair what both sides have; leftovers carry over as surplus.
            let pair_count = all_r1.len().min(all_r2.len());
            let templates: Vec<FastqTemplate> = all_r1
                .drain(..pair_count)
                .zip(all_r2.drain(..pair_count))
                .map(|(r1, r2)| {
                    let name = r1.name().to_vec();
                    FastqTemplate { name, records: vec![r1, r2] }
                })
                .collect();
            merge.r1_surplus = all_r1;
            merge.r2_surplus = all_r2;
            let serial = merge.serial_out;
            merge.serial_out += 1;
            let count = templates.len();
            match state.output.groups.push((serial, templates)) {
                Ok(()) => {
                    state.total_templates_pushed.fetch_add(count as u64, Ordering::Release);
                    if let Some(stats) = state.stats() {
                        stats.groups_produced.fetch_add(count as u64, Ordering::Relaxed);
                    }
                    state.deadlock_state.record_q4_push();
                    did_work = true;
                }
                Err((serial, returned)) => {
                    worker.held_parsed = Some((serial, returned, count));
                    did_work = true;
                    break;
                }
            }
            batches_this_call += 1;
        }
        // If one stream hit EOF and was fully consumed while the other still
        // has blocks (streams of unequal block counts), drain the remainder.
        if worker.held_parsed.is_none() && batches_this_call < MAX_BATCHES_PER_LOCK {
            let r1_total = state.batch_counters[0].load(Ordering::Acquire);
            let r2_total = state.batch_counters[1].load(Ordering::Acquire);
            let r1_exhausted = state.stream_eof[0].load(Ordering::Acquire)
                && merge.r1_next == r1_total
                && !merge.r1_pending.contains_key(&merge.r1_next);
            let r2_exhausted = state.stream_eof[1].load(Ordering::Acquire)
                && merge.r2_next == r2_total
                && !merge.r2_pending.contains_key(&merge.r2_next);
            if r1_exhausted && !merge.r2_pending.is_empty() {
                match drain_exhausted_stream(state, &mut merge, false, did_work, batches_this_call)
                {
                    DrainResult::Ok { did_work: dw, batches_this_call: bc } => {
                        did_work = dw;
                        batches_this_call = bc;
                    }
                    DrainResult::HeldParsed(serial, templates, count) => {
                        worker.held_parsed = Some((serial, templates, count));
                        did_work = true;
                    }
                    DrainResult::Error(e) => {
                        state.set_error(e);
                        return (true, false);
                    }
                }
            }
            if worker.held_parsed.is_none() && r2_exhausted && !merge.r1_pending.is_empty() {
                match drain_exhausted_stream(state, &mut merge, true, did_work, batches_this_call) {
                    DrainResult::Ok { did_work: dw, batches_this_call: _bc } => {
                        did_work = dw;
                    }
                    DrainResult::HeldParsed(serial, templates, count) => {
                        worker.held_parsed = Some((serial, templates, count));
                        did_work = true;
                    }
                    DrainResult::Error(e) => {
                        state.set_error(e);
                        return (true, false);
                    }
                }
            }
        }
    }
    // Final completion check: all inputs parsed, all buffers empty, nothing
    // held — then the merge (and with it parsing/grouping) is done.
    let all_chunks_block_parsed = state.read_done.load(Ordering::Acquire)
        && state.chunks_block_parsed.load(Ordering::Acquire)
            == state.batches_read.load(Ordering::Acquire);
    if all_chunks_block_parsed
        && state.q2_block_parsed.is_empty()
        && merge.r1_pending.is_empty()
        && merge.r2_pending.is_empty()
        && merge.r1_surplus.is_empty()
        && merge.r2_surplus.is_empty()
        && merge.r1_suffix_bytes.is_empty()
        && merge.r2_suffix_bytes.is_empty()
        && worker.held_parsed.is_none()
    {
        state.block_merge_done.store(true, Ordering::Release);
        state.parse_done.store(true, Ordering::Release);
        state.group_done.store(true, Ordering::Release);
    }
    (did_work, false)
}
/// Boundary-finding step (gzip/plain path): pairs decompressed chunks across
/// streams via the shared `PairState`, computes record boundaries for each
/// complete chunk set, and pushes boundary batches onto `q2_5_boundaries`.
///
/// Returns `(did_work, contended)`; `contended` means the pair-state lock was
/// held by another worker.
fn fastq_try_step_find_boundaries<R: BufRead + Send, P: Send + MemoryEstimate>(
    state: &FastqPipelineState<R, P>,
    worker: &mut FastqWorkerState<P>,
) -> (bool, bool) {
    if state.has_error() {
        return (false, false);
    }
    let mut did_work = false;
    // Flush a boundary batch held over from a previously failed push first.
    if let Some((serial, held)) = worker.held_boundaries.take() {
        let boundary_heap_size = held.estimate_heap_size();
        match state.q2_5_boundaries.push((serial, held)) {
            Ok(()) => {
                state
                    .q2_5_boundaries_heap_bytes
                    .fetch_add(boundary_heap_size as u64, Ordering::Relaxed);
                state.deadlock_state.record_q2_5_push();
                did_work = true;
            }
            Err((serial, held)) => {
                worker.held_boundaries = Some((serial, held));
                return (false, false);
            }
        }
    }
    if state.boundaries_done.load(Ordering::Relaxed) {
        return (did_work, false);
    }
    if state.q2_5_boundaries.is_full() {
        return (did_work, false);
    }
    // Single owner of the pairing state; contended => defer to the holder.
    let Some(mut pair) = state.pair_state.try_lock() else {
        return (did_work, true);
    };
    // Absorb everything currently queued into the pairing buffer.
    while let Some((_, chunk)) = state.q1_decompressed.pop() {
        state.deadlock_state.record_q2_pop();
        state.chunks_paired.fetch_add(1, Ordering::Release);
        pair.insert(chunk);
    }
    // When every read batch has been paired, incomplete trailing sets may be
    // emitted too (all_arrived relaxes try_pop_complete).
    let all_arrived = state.read_done.load(Ordering::Acquire)
        && state.chunks_paired.load(Ordering::Acquire)
            == state.batches_read.load(Ordering::Acquire);
    let mut batches_this_call = 0;
    while batches_this_call < MAX_BATCHES_PER_LOCK {
        let Some(chunks) = pair.try_pop_complete(all_arrived) else { break };
        let serial = state.batches_boundaries_found.fetch_add(1, Ordering::Release);
        // Fast path: all chunks already carry offsets computed at read time.
        let boundary_batch = if chunks.iter().all(|c| c.offsets.is_some()) {
            let streams: Vec<FastqStreamBoundaries> = chunks
                .into_iter()
                .map(|c| FastqStreamBoundaries {
                    stream_idx: c.stream_idx,
                    data: c.data,
                    offsets: c.offsets.expect("gzip chunks must have pre-computed offsets"),
                })
                .collect();
            align_stream_records(streams, serial)
        } else {
            // Slow path: scan the raw bytes for record boundaries.
            let decompressed = FastqDecompressedBatch {
                chunks: chunks
                    .into_iter()
                    .map(|c| FastqDecompressedChunk { stream_idx: c.stream_idx, data: c.data })
                    .collect(),
                serial,
            };
            match FastqFormat::find_boundaries(&state.boundary_state, decompressed) {
                Ok(batch) => batch,
                Err(e) => {
                    state.set_error(e);
                    return (true, false);
                }
            }
        };
        let boundary_heap_size = boundary_batch.estimate_heap_size();
        match state.q2_5_boundaries.push((serial, boundary_batch)) {
            Ok(()) => {
                state
                    .q2_5_boundaries_heap_bytes
                    .fetch_add(boundary_heap_size as u64, Ordering::Relaxed);
                state.deadlock_state.record_q2_5_push();
                did_work = true;
            }
            Err((serial, batch)) => {
                // Queue full: hold for retry on the next call.
                worker.held_boundaries = Some((serial, batch));
                did_work = true;
                break;
            }
        }
        batches_this_call += 1;
    }
    if all_arrived && pair.is_empty() {
        state.boundaries_done.store(true, Ordering::Release);
    }
    (did_work, false)
}
/// Parse step (gzip/plain path): pops a boundary batch from
/// `q2_5_boundaries`, parses it into records, pairs the streams into
/// `FastqTemplate`s, and pushes the batch onto the groups queue.
///
/// Also flips `parse_done`/`group_done` once all boundary batches are parsed.
fn fastq_try_step_parse<R: BufRead + Send, P: Send + MemoryEstimate>(
    state: &FastqPipelineState<R, P>,
    worker: &mut FastqWorkerState<P>,
) -> bool {
    if state.parse_done.load(Ordering::Relaxed) || state.has_error() {
        return false;
    }
    // Flush a template batch held over from a previously failed push; only on
    // success do the parsed/grouped counters advance.
    if let Some((serial, held_templates, count)) = worker.held_parsed.take() {
        match state.output.groups.push((serial, held_templates)) {
            Ok(()) => {
                #[cfg(feature = "memory-debug")]
                {
                    // Placeholder: template heap size is not estimated here.
                    let q4_heap: u64 = 0;
                    state.output.groups_heap_bytes.fetch_add(q4_heap, Ordering::AcqRel);
                }
                state.total_templates_pushed.fetch_add(count as u64, Ordering::Release);
                if let Some(stats) = state.stats() {
                    stats.groups_produced.fetch_add(count as u64, Ordering::Relaxed);
                }
                state.deadlock_state.record_q4_push();
                state.batches_parsed.fetch_add(1, Ordering::Release);
                state.batches_grouped.fetch_add(1, Ordering::Release);
            }
            Err(returned) => {
                worker.held_parsed = Some((serial, returned.1, count));
                return false;
            }
        }
    }
    if state.output.groups.is_full() {
        return false;
    }
    let Some((serial, boundary_batch)) = state.q2_5_boundaries.pop() else {
        if let Some(stats) = state.stats() {
            stats.record_queue_empty(25);
        }
        // Input queue empty: check whether parsing as a whole has finished.
        let boundaries_done = state.boundaries_done.load(Ordering::Acquire);
        let all_parsed = state.batches_boundaries_found.load(Ordering::Acquire)
            == state.batches_parsed.load(Ordering::Acquire);
        if boundaries_done && all_parsed && state.q2_5_boundaries.is_empty() {
            state.parse_done.store(true, Ordering::Release);
            state.group_done.store(true, Ordering::Release);
            log::trace!("PARSE: set parse_done=true, group_done=true");
        } else if let Some(stats) = state.stats() {
            stats.record_queue_empty(2);
        }
        return false;
    };
    state.deadlock_state.record_q2_5_pop();
    let input_heap_size = boundary_batch.estimate_heap_size();
    match FastqFormat::parse_records(boundary_batch) {
        Ok(parsed_batch) => {
            // The boundary batch has been consumed; release its accounting.
            state.q2_5_boundaries_heap_bytes.fetch_sub(input_heap_size as u64, Ordering::Relaxed);
            let templates = match create_templates_from_streams(parsed_batch.streams) {
                Ok(t) => t,
                Err(e) => {
                    state.set_error(e);
                    return false;
                }
            };
            let count = templates.len();
            match state.output.groups.push((serial, templates)) {
                Ok(()) => {
                    #[cfg(feature = "memory-debug")]
                    {
                        // Placeholder: template heap size is not estimated here.
                        let q4_heap: u64 = 0;
                        state.output.groups_heap_bytes.fetch_add(q4_heap, Ordering::AcqRel);
                    }
                    state.total_templates_pushed.fetch_add(count as u64, Ordering::Release);
                    if let Some(stats) = state.stats() {
                        stats.groups_produced.fetch_add(count as u64, Ordering::Relaxed);
                    }
                    state.deadlock_state.record_q4_push();
                    state.batches_parsed.fetch_add(1, Ordering::Release);
                    state.batches_grouped.fetch_add(1, Ordering::Release);
                    true
                }
                Err(returned) => {
                    // Queue full: hold the templates; counters advance when the
                    // retry push succeeds at the top of a later call.
                    worker.held_parsed = Some((serial, returned.1, count));
                    true
                }
            }
        }
        Err(e) => {
            state.q2_5_boundaries_heap_bytes.fetch_sub(input_heap_size as u64, Ordering::Relaxed);
            state.set_error(e);
            false
        }
    }
}
/// Converts per-stream parsed records into `FastqTemplate`s.
///
/// Single-end input (one stream) yields one single-record template per read.
/// Paired-end input (two streams) is ordered by `stream_idx` and zipped
/// R1/R2; a length mismatch between the streams is an `InvalidData` error.
/// Zero streams yields an empty vector; more than two is `Unsupported`.
fn create_templates_from_streams(
    mut streams: Vec<FastqParsedStream>,
) -> io::Result<Vec<FastqTemplate>> {
    let n = streams.len();
    if n == 0 {
        return Ok(Vec::new());
    }
    if n > 2 {
        return Err(io::Error::new(
            io::ErrorKind::Unsupported,
            format!("Synchronized mode not supported for {n} streams (max 2)"),
        ));
    }
    if n == 1 {
        // Single-end: every record becomes its own template.
        let only = streams.pop().expect("streams is non-empty in single-end branch");
        return Ok(only
            .records
            .into_iter()
            .map(|rec| FastqTemplate { name: rec.name().to_vec(), records: vec![rec] })
            .collect());
    }
    // Paired-end: put R1 (stream_idx 0) first, then pair positionally.
    streams.sort_by_key(|s| s.stream_idx);
    let mut ordered = streams.into_iter();
    let r1_records = ordered.next().expect("sorted streams must have R1 at index 0").records;
    let r2_records = ordered.next().expect("sorted streams must have R2 at index 1").records;
    if r1_records.len() != r2_records.len() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "FASTQ batch size mismatch: R1 has {} records, R2 has {} records",
                r1_records.len(),
                r2_records.len()
            ),
        ));
    }
    Ok(r1_records
        .into_iter()
        .zip(r2_records)
        .map(|(r1, r2)| FastqTemplate { name: r1.name().to_vec(), records: vec![r1, r2] })
        .collect())
}
/// Process step: pops template batches from the groups queue, applies
/// `process_fn` to each template, and pushes results onto the processed
/// queue. Handles up to `max_batches` (8) batches per call.
///
/// Returns `true` when at least one full batch was pushed successfully.
fn fastq_try_step_process<R: BufRead + Send, P: Send + MemoryEstimate, PF>(
    state: &FastqPipelineState<R, P>,
    process_fn: &PF,
    worker: &mut FastqWorkerState<P>,
) -> bool
where
    PF: Fn(FastqTemplate) -> io::Result<P>,
{
    // Flush results held over from a previously failed push first; their heap
    // size is only credited once the push succeeds.
    if let Some((serial, held, heap_size)) = worker.held_processed.take() {
        match state.output.processed.push((serial, held)) {
            Ok(()) => {
                state.output.processed_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
                state.deadlock_state.record_q5_push();
            }
            Err((serial, held)) => {
                worker.held_processed = Some((serial, held, heap_size));
                return false;
            }
        }
    }
    if state.has_error() {
        return false;
    }
    // Respect both the slot limit and the q4 heap-memory ceiling.
    if state.output.processed.is_full() || state.is_q4_memory_high() {
        return false;
    }
    let max_batches = 8;
    let mut did_work = false;
    for _ in 0..max_batches {
        if state.output.processed.is_full() || state.is_q4_memory_high() {
            break;
        }
        let Some((serial, batch)) = state.output.groups.pop() else {
            if let Some(stats) = state.stats() {
                stats.record_queue_empty(4);
            }
            break;
        };
        state.deadlock_state.record_q4_pop();
        #[cfg(feature = "memory-debug")]
        {
            let q4_heap: u64 = batch.iter().map(|t| t.estimate_heap_size() as u64).sum();
            state.output.groups_heap_bytes.fetch_sub(q4_heap, Ordering::AcqRel);
        }
        log::trace!(
            "fastq_try_step_process: processing batch of {} templates, serial={}",
            batch.len(),
            serial
        );
        let mut results: Vec<P> = Vec::with_capacity(batch.len());
        for template in batch {
            match process_fn(template) {
                Ok(processed) => results.push(processed),
                Err(e) => {
                    log::error!("fastq_try_step_process: error: {e:?}");
                    state.set_error(e);
                    return false;
                }
            }
        }
        log::trace!("fastq_try_step_process: processed {} items successfully", results.len());
        let heap_size: usize = results.iter().map(MemoryEstimate::estimate_heap_size).sum();
        match state.output.processed.push((serial, results)) {
            Ok(()) => {
                state.output.processed_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
                state.deadlock_state.record_q5_push();
                did_work = true;
            }
            Err((serial, results)) => {
                // Queue filled concurrently: hold the results for retry.
                worker.held_processed = Some((serial, results, heap_size));
                break;
            }
        }
    }
    did_work
}
/// Serialize step: pops a processed batch, serializes every item into the
/// worker's reusable buffer via `serialize_fn`, and pushes the combined
/// `SerializedBatch` onto the serialized queue.
///
/// `serialize_fn` appends one item to the buffer and returns the number of
/// records it wrote.
fn fastq_try_step_serialize<R: BufRead + Send, P: Send + MemoryEstimate, SF>(
    state: &FastqPipelineState<R, P>,
    serialize_fn: &SF,
    header: &Header,
    worker: &mut FastqWorkerState<P>,
) -> bool
where
    SF: Fn(P, &Header, &mut Vec<u8>) -> io::Result<u64>,
{
    // Flush a batch held over from a previously failed push; its heap size is
    // only credited once the push succeeds.
    if let Some((serial, held, heap_size)) = worker.held_serialized.take() {
        match state.output.serialized.push((serial, held)) {
            Ok(()) => {
                state.output.serialized_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
                state.deadlock_state.record_q6_push();
            }
            Err((serial, held)) => {
                worker.held_serialized = Some((serial, held, heap_size));
                return false;
            }
        }
    }
    if state.has_error() {
        return false;
    }
    if state.output.serialized.is_full() {
        return false;
    }
    let Some((serial, batch)) = state.output.processed.pop() else {
        if let Some(stats) = state.stats() {
            stats.record_queue_empty(5);
        }
        return false;
    };
    state.deadlock_state.record_q5_pop();
    // Release the q4 heap accounting now that the batch has left the queue.
    let q4_heap_size: usize = batch.iter().map(MemoryEstimate::estimate_heap_size).sum();
    state.output.processed_heap_bytes.fetch_sub(q4_heap_size as u64, Ordering::AcqRel);
    worker.core.serialization_buffer.clear();
    log::trace!(
        "fastq_try_step_serialize: serializing batch of {} items, serial={}",
        batch.len(),
        serial
    );
    let mut total_record_count: u64 = 0;
    for item in batch {
        match serialize_fn(item, header, &mut worker.core.serialization_buffer) {
            Ok(record_count) => {
                total_record_count += record_count;
            }
            Err(e) => {
                log::error!("fastq_try_step_serialize: error: {e:?}");
                state.set_error(e);
                return false;
            }
        }
    }
    // Hand the filled buffer downstream and give the worker a fresh one of
    // standard capacity for the next batch.
    let combined_data = std::mem::replace(
        &mut worker.core.serialization_buffer,
        Vec::with_capacity(SERIALIZATION_BUFFER_CAPACITY),
    );
    log::trace!(
        "fastq_try_step_serialize: serialized successfully, total_data_len={}, record_count={}",
        combined_data.len(),
        total_record_count
    );
    state.total_records_serialized.fetch_add(total_record_count, Ordering::Release);
    if let Some(stats) = state.stats() {
        stats.serialized_bytes.fetch_add(combined_data.len() as u64, Ordering::Relaxed);
    }
    let batch = SerializedBatch {
        data: combined_data,
        record_count: total_record_count,
        secondary_data: None,
    };
    let heap_size = batch.estimate_heap_size();
    match state.output.serialized.push((serial, batch)) {
        Ok(()) => {
            state.output.serialized_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
            state.deadlock_state.record_q6_push();
            true
        }
        Err((serial, batch)) => {
            worker.held_serialized = Some((serial, batch, heap_size));
            false
        }
    }
}
/// Compress step: delegates to the shared BGZF compression logic and reduces
/// its outcome to a plain did-work flag for the scheduler.
fn fastq_try_step_compress<R: BufRead + Send + 'static, P: Send + MemoryEstimate + 'static>(
    state: &FastqPipelineState<R, P>,
    worker: &mut FastqWorkerState<P>,
) -> bool {
    let outcome = shared_try_step_compress(state, worker);
    outcome.is_success()
}
/// Write step: drains compressed batches from q7 into the write reorder
/// buffer, then writes every batch that is next in serial order to the
/// output stream.
///
/// Only one thread writes at a time: the output is guarded by `try_lock`,
/// and a losing thread records contention and backs off.
/// Returns `true` when at least one batch was written.
fn fastq_try_step_write<R: BufRead + Send + 'static, P: Send + MemoryEstimate + 'static>(
    state: &FastqPipelineState<R, P>,
) -> bool {
    if state.has_error() {
        return false;
    }
    // Non-blocking: if another thread currently owns the writer, back off.
    let Some(mut output_guard) = state.output.output.try_lock() else {
        if let Some(stats) = state.stats() {
            stats.record_contention(PipelineStep::Write);
        }
        return false;
    };
    // `None` means the output handle has been taken — presumably during
    // finalization; nothing left for this step to do. TODO confirm.
    let Some(ref mut output) = *output_guard else {
        return false;
    };
    let mut wrote_any = false;
    let q7_truly_empty;
    {
        let mut reorder = state.output.write_reorder.lock();
        // Move everything currently queued on q7 into the reorder buffer,
        // shifting heap-byte accounting from the queue to the buffer.
        while let Some((serial, batch)) = state.output.compressed.pop() {
            let heap_size = batch.estimate_heap_size();
            let q7_heap = heap_size as u64;
            state.q6_track_pop(q7_heap);
            reorder.insert_with_size(serial, batch, heap_size);
            state.output.write_reorder_state.add_heap_bytes(q7_heap);
            state.deadlock_state.record_q7_pop();
        }
        // Emit batches strictly in serial order; stops at the first gap.
        while let Some((batch, heap_size)) = reorder.try_pop_next_with_size() {
            let mut batch_bytes: u64 = 0;
            for block in &batch.blocks {
                batch_bytes += block.data.len() as u64;
                if let Err(e) = output.write_all(&block.data) {
                    state.set_error(e);
                    return false;
                }
            }
            state.output.write_reorder_state.sub_heap_bytes(heap_size as u64);
            state.output.write_reorder_state.update_next_seq(reorder.next_seq());
            if let Some(stats) = state.stats() {
                stats.bytes_written.fetch_add(batch_bytes, Ordering::Relaxed);
            }
            let records_in_batch = batch.record_count;
            state.output.items_written.fetch_add(records_in_batch, Ordering::Relaxed);
            state.output.progress.log_if_needed(records_in_batch);
            wrote_any = true;
        }
        q7_truly_empty = reorder.is_empty();
    }
    // Only record "queue empty" when the reorder buffer has nothing pending
    // either: a gap in serials is not the same as being drained.
    if !wrote_any && q7_truly_empty {
        if let Some(stats) = state.stats() {
            stats.record_queue_empty(7);
        }
    }
    wrote_any
}
/// Dispatch a single scheduler-chosen step to its FASTQ-specific handler.
///
/// Returns `(did_work, writer_blocked)`. The second flag is only ever raised
/// by the Write step: it is set when the write made no progress while
/// compressed batches are still queued.
fn fastq_execute_step<R: BufRead + Send + 'static, P: Send + MemoryEstimate + 'static, PF, SF>(
    state: &FastqPipelineState<R, P>,
    header: &Header,
    process_fn: &PF,
    serialize_fn: &SF,
    worker: &mut FastqWorkerState<P>,
    step: PipelineStep,
) -> (bool, bool)
where
    PF: Fn(FastqTemplate) -> io::Result<P>,
    SF: Fn(P, &Header, &mut Vec<u8>) -> io::Result<u64>,
{
    let bgzf_inputs = state.config.inputs_are_bgzf;
    match step {
        PipelineStep::Read => (fastq_try_step_read(state, worker), false),
        PipelineStep::Decompress => (fastq_try_step_decompress(state, worker), false),
        // BGZF inputs merge raw blocks instead of scanning for record
        // boundaries in decompressed text.
        PipelineStep::FindBoundaries if bgzf_inputs => fastq_try_step_block_merge(state, worker),
        PipelineStep::FindBoundaries => fastq_try_step_find_boundaries(state, worker),
        PipelineStep::Decode => {
            let did_work = if bgzf_inputs {
                fastq_try_step_block_parse(state, worker)
            } else {
                fastq_try_step_parse(state, worker)
            };
            (did_work, false)
        }
        // No standalone grouping step in the FASTQ pipeline: always reports
        // "no work done".
        PipelineStep::Group => (false, false),
        PipelineStep::Process => (fastq_try_step_process(state, process_fn, worker), false),
        PipelineStep::Serialize => {
            (fastq_try_step_serialize(state, serialize_fn, header, worker), false)
        }
        PipelineStep::Compress => (fastq_try_step_compress(state, worker), false),
        PipelineStep::Write => {
            let wrote = fastq_try_step_write(state);
            (wrote, !wrote && !state.output.compressed.is_empty())
        }
    }
}
/// Borrowed per-thread context handed to the generic worker loop; bundles
/// the shared pipeline state with the user-supplied callbacks.
pub struct FastqStepContext<'a, R: BufRead + Send, P: Send + MemoryEstimate, PF, SF> {
    /// Shared pipeline state (queues, counters, error flag).
    pub state: &'a FastqPipelineState<R, P>,
    /// BAM header passed through to the serialize callback.
    pub header: &'a Header,
    /// Converts a grouped FASTQ template into the processed form `P`.
    pub process_fn: &'a PF,
    /// Serializes one `P` into a byte buffer; returns the record count.
    pub serialize_fn: &'a SF,
    /// Whether this thread is a designated reader thread.
    pub is_reader: bool,
}
impl<R, P, PF, SF> StepContext for FastqStepContext<'_, R, P, PF, SF>
where
    R: BufRead + Send + 'static,
    P: Send + MemoryEstimate + 'static,
    PF: Fn(FastqTemplate) -> io::Result<P>,
    SF: Fn(P, &Header, &mut Vec<u8>) -> io::Result<u64>,
{
    type Worker = FastqWorkerState<P>;

    /// Forward a single step to the FASTQ-specific dispatcher.
    fn execute_step(&self, worker: &mut Self::Worker, step: PipelineStep) -> (bool, bool) {
        fastq_execute_step(self.state, self.header, self.process_fn, self.serialize_fn, worker, step)
    }

    /// Snapshot queue fullness and memory pressure for the scheduler.
    fn get_backpressure(&self, _worker: &Self::Worker) -> BackpressureState {
        let cap = self.state.config.queue_capacity;
        let read_done = self.state.read_done.load(Ordering::Relaxed);
        let reorder = &self.state.output.write_reorder_state;
        // Compressed output above 3/4 capacity counts as high pressure;
        // the input chunk queue below 1/4 capacity counts as running low.
        let output_high = self.state.output.compressed.len() > cap * 3 / 4;
        let input_low = self.state.q0_chunks.len() < cap / 4;
        BackpressureState {
            output_high,
            input_low,
            read_done,
            // Memory pressure is ignored once the pipeline is draining.
            memory_high: !self.state.is_draining() && reorder.is_memory_high(),
            memory_drained: reorder.is_memory_drained(),
        }
    }

    /// Enter drain mode once reading finished and the input queue is empty.
    fn check_drain_mode(&self) {
        let input_exhausted =
            self.state.read_done.load(Ordering::Relaxed) && self.state.q0_chunks.is_empty();
        if input_exhausted {
            self.state.output.draining.store(true, Ordering::Relaxed);
        }
    }

    fn has_error(&self) -> bool {
        self.state.has_error()
    }

    fn is_complete(&self) -> bool {
        self.state.is_complete()
    }

    fn stats(&self) -> Option<&PipelineStats> {
        self.state.stats()
    }

    /// The scheduled Read step is always skipped here; reading happens via
    /// the sticky-read path below.
    fn skip_read(&self) -> bool {
        true
    }

    /// Only designated reader threads drive the read step directly.
    fn should_attempt_sticky_read(&self) -> bool {
        self.is_reader
    }

    fn sticky_read_should_continue(&self) -> bool {
        true
    }

    fn execute_read_step(&self, worker: &mut Self::Worker) -> bool {
        fastq_try_step_read(self.state, worker)
    }

    /// Reader threads never claim an exclusive step; other threads defer to
    /// their scheduler.
    fn exclusive_step_owned(&self, worker: &Self::Worker) -> Option<PipelineStep> {
        if self.is_reader { None } else { worker.core.scheduler.exclusive_step_owned() }
    }
}
/// Run the full FASTQ-to-BAM pipeline: write the BAM header, spawn
/// `config.num_threads` worker threads (plus an optional monitor thread),
/// wait for completion, and finalize.
///
/// Inputs are either the caller-provided `decompressed_readers` or BGZF
/// files opened from `fastq_paths`. `process_fn` converts each grouped
/// template into `P`; `serialize_fn` turns each `P` into BAM bytes.
///
/// # Errors
/// Returns the first I/O error from header writing, file opening, a worker
/// thread, or pipeline finalization.
#[allow(clippy::too_many_lines, clippy::needless_pass_by_value)]
pub fn run_fastq_pipeline<P, PF, SF>(
    config: FastqPipelineConfig,
    fastq_paths: &[PathBuf],
    decompressed_readers: Option<Vec<Box<dyn BufRead + Send>>>,
    header: &Header,
    mut output: Box<dyn Write + Send>,
    process_fn: PF,
    serialize_fn: SF,
) -> io::Result<u64>
where
    P: Send + MemoryEstimate + 'static,
    PF: Fn(FastqTemplate) -> io::Result<P> + Send + Sync + 'static,
    SF: Fn(P, &Header, &mut Vec<u8>) -> io::Result<u64> + Send + Sync + 'static,
{
    log::debug!("run_fastq_pipeline: starting, num_threads={}", config.num_threads);
    log::debug!("run_fastq_pipeline: writing BAM header");
    // The header is written up-front, before any worker output.
    write_bam_header(&mut output, header)?;
    log::debug!("run_fastq_pipeline: BAM header written successfully");
    log::debug!(
        "run_fastq_pipeline: creating readers, decompressed_readers.is_some()={}, inputs_are_bgzf={}",
        decompressed_readers.is_some(),
        config.inputs_are_bgzf,
    );
    // Prefer caller-supplied decompressed readers; otherwise open the FASTQ
    // paths as (optionally prefetched) BGZF byte streams.
    let stream_readers: Vec<StreamReader<Box<dyn BufRead + Send>>> =
        if let Some(readers) = decompressed_readers {
            let num_readers = readers.len();
            log::debug!("run_fastq_pipeline: using {num_readers} Decompressed readers");
            readers.into_iter().map(StreamReader::Decompressed).collect()
        } else {
            log::debug!(
                "run_fastq_pipeline: using {} BGZF readers (async_reader={})",
                fastq_paths.len(),
                config.async_reader,
            );
            fastq_paths
                .iter()
                .map(|p| {
                    let file = File::open(p)?;
                    crate::os_hints::advise_sequential(&file);
                    let inner: Box<dyn BufRead + Send> = if config.async_reader {
                        let prefetch = crate::prefetch_reader::PrefetchReader::from_file(file);
                        Box::new(BufReader::with_capacity(256 * 1024, prefetch))
                    } else {
                        Box::new(BufReader::with_capacity(256 * 1024, file))
                    };
                    Ok(StreamReader::Bgzf(inner))
                })
                .collect::<io::Result<Vec<_>>>()?
        };
    log::debug!("run_fastq_pipeline: creating pipeline state");
    let state = Arc::new(FastqPipelineState::<Box<dyn BufRead + Send>, P>::new(
        config.clone(),
        stream_readers,
        output,
    ));
    log::debug!("run_fastq_pipeline: state created, spawning {} workers", config.num_threads);
    // Wrap the callbacks and header in Arcs so each worker thread can own a
    // cheap handle.
    let process_fn = Arc::new(process_fn);
    let serialize_fn = Arc::new(serialize_fn);
    let header = Arc::new(header.clone());
    let num_threads = config.num_threads;
    let compression_level = config.compression_level;
    let scheduler_strategy = config.scheduler_strategy;
    let active_steps = config.active_steps();
    log::debug!("run_fastq_pipeline: spawning {num_threads} worker threads");
    let handles: Vec<_> = (0..num_threads)
        .map(|thread_id| {
            let state = Arc::clone(&state);
            let process_fn = Arc::clone(&process_fn);
            let serialize_fn = Arc::clone(&serialize_fn);
            let header = Arc::clone(&header);
            let active_steps = active_steps.clone();
            thread::spawn(move || {
                // Catch panics so one crashed worker poisons the pipeline's
                // error state instead of hanging the others.
                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    log::debug!("Worker thread {thread_id} starting");
                    let mut worker = FastqWorkerState::new(
                        compression_level,
                        thread_id,
                        num_threads,
                        scheduler_strategy,
                        active_steps,
                    );
                    log::debug!("Worker thread {thread_id} created worker state");
                    let ctx = FastqStepContext {
                        state: &state,
                        header: &header,
                        process_fn: &*process_fn,
                        serialize_fn: &*serialize_fn,
                        // The first `num_streams` threads are dedicated readers.
                        is_reader: thread_id < state.num_streams,
                    };
                    generic_worker_loop(&ctx, &mut worker);
                    log::debug!("Worker thread {thread_id} finished");
                }));
                if let Err(panic_info) = result {
                    handle_worker_panic(&*state, thread_id, panic_info);
                }
            })
        })
        .collect();
    log::debug!("run_fastq_pipeline: all workers spawned");
    // The monitor thread is only spawned when stats or deadlock detection
    // are enabled.
    let monitor_handle = if state.stats().is_some() || state.deadlock_state.is_enabled() {
        let state_clone = Arc::clone(&state);
        Some(thread::spawn(move || {
            run_monitor_loop(&state_clone, 100, 10, |s| {
                if s.deadlock_state.is_enabled() {
                    let bd = s.boundaries_done.load(Ordering::Relaxed);
                    let pd = s.parse_done.load(Ordering::Relaxed);
                    let br = s.batches_read.load(Ordering::Relaxed);
                    let bf = s.batches_boundaries_found.load(Ordering::Relaxed);
                    let bp = s.batches_parsed.load(Ordering::Relaxed);
                    let bg = s.batches_grouped.load(Ordering::Relaxed);
                    log::trace!(
                        "Parallel parse state: boundaries_done={bd}, parse_done={pd}, batches: read={br}, boundaries={bf}, parsed={bp}, grouped={bg}"
                    );
                }
            });
        }))
    } else {
        None
    };
    log::debug!("run_fastq_pipeline: waiting for workers to complete");
    join_worker_threads(handles)?;
    log::debug!("run_fastq_pipeline: all workers joined");
    join_monitor_thread(monitor_handle);
    log::info!(
        "Pipeline counters: batches_read={}, batches_grouped={}, total_templates_pushed={}, total_records_serialized={}, templates_written={}",
        state.batches_read.load(Ordering::Relaxed),
        state.batches_grouped.load(Ordering::Relaxed),
        state.total_templates_pushed.load(Ordering::Relaxed),
        state.total_records_serialized.load(Ordering::Relaxed),
        state.output.items_written.load(Ordering::Relaxed)
    );
    finalize_pipeline(&*state)
}
/// Encode the SAM header in BAM form (magic, header text, reference list)
/// into `writer` via the noodles BAM writer.
fn write_bam_header(writer: &mut dyn Write, header: &Header) -> io::Result<()> {
    use noodles::bam;
    bam::io::Writer::new(writer).write_header(header)
}
#[cfg(test)]
mod tests {
use super::super::bam::*;
use super::super::base::*;
use super::*;
use crate::bgzf_reader::RawBgzfBlock;
use crate::bgzf_writer::CompressedBlock;
use PipelineStep::*;
use rstest::rstest;
use std::io::Cursor;
// RawBlockBatch starts empty and tracks len/clear correctly.
#[test]
fn test_raw_block_batch() {
    let mut batch = RawBlockBatch::new();
    assert!(batch.is_empty());
    assert_eq!(batch.len(), 0);
    batch.blocks.push(RawBgzfBlock { data: vec![0u8; 100] });
    assert!(!batch.is_empty());
    assert_eq!(batch.len(), 1);
    batch.clear();
    assert!(batch.is_empty());
}
// CompressedBlockBatch tracks len, total byte size, and clear.
#[test]
fn test_compressed_block_batch() {
    let mut batch = CompressedBlockBatch::new();
    assert!(batch.is_empty());
    assert_eq!(batch.len(), 0);
    batch.blocks.push(CompressedBlock { serial: 0, data: vec![1, 2, 3, 4, 5] });
    assert!(!batch.is_empty());
    assert_eq!(batch.len(), 1);
    assert_eq!(batch.total_size(), 5);
    batch.clear();
    assert!(batch.is_empty());
}
// BgzfBatchConfig: defaults plus builder-style overrides.
#[test]
fn test_bgzf_batch_config() {
    let config = BgzfBatchConfig::default();
    assert_eq!(config.blocks_per_batch, 16);
    assert_eq!(config.compression_level, 6);
    let config = BgzfBatchConfig::new(32).with_compression_level(9);
    assert_eq!(config.blocks_per_batch, 32);
    assert_eq!(config.compression_level, 9);
}
// PipelineConfig: constructor values and compression-level override.
#[test]
fn test_pipeline_config() {
    let config = PipelineConfig::new(4, 6);
    assert_eq!(config.num_threads, 4);
    assert_eq!(config.compression_level, 6);
    let config = PipelineConfig::new(8, 6).with_compression_level(9);
    assert_eq!(config.num_threads, 8);
    assert_eq!(config.compression_level, 9);
}
// Pins which pipeline steps are exclusive (single-owner) vs shared.
#[test]
fn test_pipeline_step_is_exclusive() {
    assert!(PipelineStep::Read.is_exclusive());
    assert!(!PipelineStep::Decompress.is_exclusive());
    assert!(PipelineStep::FindBoundaries.is_exclusive());
    assert!(!PipelineStep::Decode.is_exclusive());
    assert!(PipelineStep::Group.is_exclusive());
    assert!(!PipelineStep::Process.is_exclusive());
    assert!(!PipelineStep::Serialize.is_exclusive());
    assert!(!PipelineStep::Compress.is_exclusive());
    assert!(PipelineStep::Write.is_exclusive());
}
// PipelineStep::all() enumerates the 9 steps in Read..Write order.
#[test]
fn test_pipeline_step_all() {
    let all = PipelineStep::all();
    assert_eq!(all.len(), 9);
    assert_eq!(all[0], PipelineStep::Read);
    assert_eq!(all[8], PipelineStep::Write);
}
// DecompressedBatch basic empty/clear behavior.
#[test]
fn test_decompressed_batch() {
    let mut batch = DecompressedBatch::new();
    assert!(batch.is_empty());
    batch.data.extend_from_slice(b"hello");
    assert!(!batch.is_empty());
    batch.clear();
    assert!(batch.is_empty());
}
// SerializedBatch basic empty/clear behavior.
#[test]
fn test_serialized_batch() {
    let mut batch = SerializedBatch::new();
    assert!(batch.is_empty());
    batch.data.extend_from_slice(b"data");
    assert!(!batch.is_empty());
    batch.clear();
    assert!(batch.is_empty());
}
// BamPipelineConfig keeps its own and the nested pipeline level in sync.
#[test]
fn test_bam_pipeline_config() {
    let config = BamPipelineConfig::new(4, 6);
    assert_eq!(config.pipeline.num_threads, 4);
    assert_eq!(config.compression_level, 6);
    let config = BamPipelineConfig::new(8, 6).with_compression_level(9);
    assert_eq!(config.compression_level, 9);
    assert_eq!(config.pipeline.compression_level, 9);
}
// read_raw_blocks skips the BGZF EOF marker block (yields zero blocks).
#[test]
fn test_read_raw_blocks_from_memory() {
    // These 28 bytes are the standard BGZF end-of-file marker block.
    let bgzf_empty_block: Vec<u8> = vec![
        0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
        0x02, 0x00, 0x1b, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    ];
    let mut reader = Cursor::new(bgzf_empty_block);
    let blocks = read_raw_blocks(&mut reader, 10).expect("failed to read raw blocks");
    assert_eq!(blocks.len(), 0);
}
// Demonstrates that ignoring ArrayQueue::push's Err return silently drops
// items: the producer "pushes" 100 into a capacity-2 queue while the
// consumer is asleep, so only 2 survive.
// NOTE(review): relies on the 50 ms consumer delay outlasting the producer
// loop; could flake on a heavily loaded machine.
#[test]
fn test_discarded_push_causes_data_loss() {
    use std::sync::Arc;
    use std::thread;
    let queue: Arc<ArrayQueue<u64>> = Arc::new(ArrayQueue::new(2));
    let items_pushed = Arc::new(AtomicU64::new(0));
    let items_received = Arc::new(AtomicU64::new(0));
    let queue_producer = Arc::clone(&queue);
    let pushed = Arc::clone(&items_pushed);
    let producer = thread::spawn(move || {
        for i in 0..100u64 {
            // Deliberately discards the Err((item)) from a full queue.
            let _ = queue_producer.push(i);
            pushed.fetch_add(1, Ordering::Relaxed);
        }
    });
    let queue_consumer = Arc::clone(&queue);
    let received = Arc::clone(&items_received);
    let consumer = thread::spawn(move || {
        thread::sleep(std::time::Duration::from_millis(50));
        while queue_consumer.pop().is_some() {
            received.fetch_add(1, Ordering::Relaxed);
        }
    });
    producer.join().expect("thread should not panic");
    consumer.join().expect("thread should not panic");
    let pushed_count = items_pushed.load(Ordering::Relaxed);
    let received_count = items_received.load(Ordering::Relaxed);
    assert_eq!(pushed_count, 100, "Producer thought it pushed 100 items");
    assert!(
        received_count < pushed_count,
        "Data was lost! Pushed {pushed_count} but only received {received_count}",
    );
    assert_eq!(received_count, 2, "Only queue capacity items should be received");
}
// FastqPipelineConfig constructor stores the basic fields.
#[test]
fn test_config_defaults() {
    let config = FastqPipelineConfig::new(4, false, 6);
    assert!(!config.inputs_are_bgzf);
    assert_eq!(config.num_threads, 4);
}
// End-to-end boundary finding + parsing for two paired streams with two
// complete records each.
#[test]
fn test_parallel_parse_boundary_finding_integration() {
    let boundary_state = FastqBoundaryState::new(2);
    let batch = FastqDecompressedBatch {
        chunks: vec![
            FastqDecompressedChunk {
                stream_idx: 0,
                data: b"@read1\nACGT\n+\nIIII\n@read2\nGGGG\n+\nJJJJ\n".to_vec(),
            },
            FastqDecompressedChunk {
                stream_idx: 1,
                data: b"@read1\nTTTT\n+\nKKKK\n@read2\nCCCC\n+\nLLLL\n".to_vec(),
            },
        ],
        serial: 0,
    };
    let boundary_batch =
        FastqFormat::find_boundaries(&boundary_state, batch).expect("find_boundaries failed");
    assert_eq!(boundary_batch.streams.len(), 2);
    // N records produce N+1 offsets (fence-post encoding).
    assert_eq!(boundary_batch.streams[0].offsets.len(), 3);
    assert_eq!(boundary_batch.streams[1].offsets.len(), 3);
    let parsed_batch =
        FastqFormat::parse_records(boundary_batch).expect("parse_records failed");
    assert_eq!(parsed_batch.streams.len(), 2);
    assert_eq!(parsed_batch.streams[0].stream_idx, 0);
    assert_eq!(parsed_batch.streams[1].stream_idx, 1);
    assert_eq!(parsed_batch.streams[0].records.len(), 2);
    assert_eq!(parsed_batch.streams[1].records.len(), 2);
    assert_eq!(parsed_batch.streams[0].records[0].name(), b"read1");
    assert_eq!(parsed_batch.streams[0].records[0].sequence(), b"ACGT");
    assert_eq!(parsed_batch.streams[0].records[1].name(), b"read2");
    assert_eq!(parsed_batch.streams[1].records[0].name(), b"read1");
    assert_eq!(parsed_batch.streams[1].records[0].sequence(), b"TTTT");
}
// A record split across two chunks is carried in `leftover` and completed
// by the following batch.
#[test]
fn test_parallel_parse_records_spanning_chunks() {
    let boundary_state = FastqBoundaryState::new(1);
    let batch1 = FastqDecompressedBatch {
        chunks: vec![FastqDecompressedChunk {
            stream_idx: 0,
            // Second record is truncated mid-sequence.
            data: b"@read1\nACGT\n+\nIIII\n@read2\nGG".to_vec(),
        }],
        serial: 0,
    };
    let boundary_batch1 =
        FastqFormat::find_boundaries(&boundary_state, batch1).expect("find_boundaries failed");
    // Only the complete first record is emitted; the partial one is held.
    assert_eq!(boundary_batch1.streams[0].offsets.len(), 2);
    assert!(!boundary_state.stream_states[0].lock().leftover.is_empty());
    let batch2 = FastqDecompressedBatch {
        chunks: vec![FastqDecompressedChunk { stream_idx: 0, data: b"GG\n+\nJJJJ\n".to_vec() }],
        serial: 1,
    };
    let boundary_batch2 =
        FastqFormat::find_boundaries(&boundary_state, batch2).expect("find_boundaries failed");
    assert!(boundary_batch2.streams[0].offsets.len() >= 2);
    assert!(boundary_state.stream_states[0].lock().leftover.is_empty());
    let parsed1 = FastqFormat::parse_records(boundary_batch1).expect("parse_records failed");
    let parsed2 = FastqFormat::parse_records(boundary_batch2).expect("parse_records failed");
    assert_eq!(parsed1.streams[0].records.len(), 1);
    assert_eq!(parsed1.streams[0].records[0].name(), b"read1");
    assert_eq!(parsed2.streams[0].records.len(), 1);
    assert_eq!(parsed2.streams[0].records[0].name(), b"read2");
}
// parse_records is a pure function safe to call from multiple threads.
#[test]
fn test_parallel_parse_thread_safety() {
    use std::thread;
    let num_threads = 4;
    let batches_per_thread = 10;
    let results: Vec<_> = (0..num_threads)
        .map(|thread_id| {
            thread::spawn(move || {
                let mut records_parsed = 0;
                for batch_id in 0..batches_per_thread {
                    let name = format!("read_t{thread_id}_b{batch_id}");
                    let data = format!("@{name}\nACGT\n+\nIIII\n");
                    let boundary_batch = FastqBoundaryBatch {
                        streams: vec![FastqStreamBoundaries {
                            stream_idx: 0,
                            data: data.as_bytes().to_vec(),
                            offsets: vec![0, data.len()],
                        }],
                        serial: (thread_id * batches_per_thread + batch_id) as u64,
                    };
                    let parsed = FastqFormat::parse_records(boundary_batch)
                        .expect("parse_records failed");
                    assert_eq!(parsed.streams[0].stream_idx, 0);
                    assert_eq!(parsed.streams[0].records.len(), 1);
                    assert_eq!(
                        String::from_utf8_lossy(parsed.streams[0].records[0].name()),
                        name
                    );
                    records_parsed += 1;
                }
                records_parsed
            })
        })
        .collect();
    let total_parsed: usize =
        results.into_iter().map(|h| h.join().expect("Thread panicked")).sum();
    assert_eq!(total_parsed, num_threads * batches_per_thread, "All records should be parsed");
}
// When paired streams have unequal record counts, the surplus record is
// trimmed to the minimum and stashed in that stream's leftover.
#[test]
fn test_find_boundaries_aligns_unequal_record_counts() {
    let boundary_state = FastqBoundaryState::new(2);
    let batch = FastqDecompressedBatch {
        chunks: vec![
            FastqDecompressedChunk {
                stream_idx: 0,
                data: b"@r1\nACGT\n+\nIIII\n@r2\nACGT\n+\nIIII\n@r3\nACGT\n+\nIIII\n".to_vec(),
            },
            FastqDecompressedChunk {
                stream_idx: 1,
                data: b"@r1\nTTTT\n+\nJJJJ\n@r2\nTTTT\n+\nJJJJ\n".to_vec(),
            },
        ],
        serial: 0,
    };
    let boundary_batch =
        FastqFormat::find_boundaries(&boundary_state, batch).expect("find_boundaries failed");
    assert_eq!(boundary_batch.streams.len(), 2);
    assert_eq!(
        boundary_batch.streams[0].offsets.len(),
        3,
        "Stream 0 should have 2 records (3 offsets)"
    );
    assert_eq!(
        boundary_batch.streams[1].offsets.len(),
        3,
        "Stream 1 should have 2 records (3 offsets)"
    );
    let leftover = &boundary_state.stream_states[0].lock().leftover;
    assert!(!leftover.is_empty(), "Stream 0 should have leftover containing the excess record");
    assert!(leftover.starts_with(b"@r3\n"), "Leftover should contain the third record");
    let leftover1 = &boundary_state.stream_states[1].lock().leftover;
    assert!(leftover1.is_empty(), "Stream 1 should have no leftover");
}
// Leftover from an alignment trim is prepended to the next batch's data.
#[test]
fn test_find_boundaries_leftover_persists_to_next_batch() {
    let boundary_state = FastqBoundaryState::new(2);
    let batch1 = FastqDecompressedBatch {
        chunks: vec![
            FastqDecompressedChunk {
                stream_idx: 0,
                data: b"@r1\nAAAA\n+\nIIII\n@r2\nAAAA\n+\nIIII\n@r3\nAAAA\n+\nIIII\n".to_vec(),
            },
            FastqDecompressedChunk {
                stream_idx: 1,
                data: b"@r1\nTTTT\n+\nJJJJ\n@r2\nTTTT\n+\nJJJJ\n".to_vec(),
            },
        ],
        serial: 0,
    };
    let boundary_batch1 =
        FastqFormat::find_boundaries(&boundary_state, batch1).expect("find_boundaries failed");
    // offsets.len() - 1 == record count.
    assert_eq!(boundary_batch1.streams[0].offsets.len() - 1, 2);
    assert_eq!(boundary_batch1.streams[1].offsets.len() - 1, 2);
    let batch2 = FastqDecompressedBatch {
        chunks: vec![
            FastqDecompressedChunk { stream_idx: 0, data: b"@r4\nAAAA\n+\nIIII\n".to_vec() },
            FastqDecompressedChunk {
                stream_idx: 1,
                data: b"@r3\nTTTT\n+\nJJJJ\n@r4\nTTTT\n+\nJJJJ\n".to_vec(),
            },
        ],
        serial: 1,
    };
    let boundary_batch2 =
        FastqFormat::find_boundaries(&boundary_state, batch2).expect("find_boundaries failed");
    assert_eq!(
        boundary_batch2.streams[0].offsets.len() - 1,
        2,
        "Stream 0 should have 2 records (leftover + new)"
    );
    assert_eq!(
        boundary_batch2.streams[1].offsets.len() - 1,
        2,
        "Stream 1 should have 2 records"
    );
    let parsed = FastqFormat::parse_records(boundary_batch2).expect("parse_records failed");
    assert_eq!(
        parsed.streams[0].records[0].name(),
        b"r3",
        "First record should be r3 from leftover"
    );
    assert_eq!(parsed.streams[0].records[1].name(), b"r4", "Second record should be r4");
}
// A stream with only leftover (no chunk in the new batch) still appears in
// the output and its leftover is consumed.
#[test]
fn test_find_boundaries_processes_leftover_without_new_chunk() {
    let boundary_state = FastqBoundaryState::new(2);
    let batch1 = FastqDecompressedBatch {
        chunks: vec![
            FastqDecompressedChunk {
                stream_idx: 0,
                data: b"@r1\nAAAA\n+\nIIII\n@r2\nAAAA\n+\nIIII\n".to_vec(),
            },
            FastqDecompressedChunk { stream_idx: 1, data: b"@r1\nTTTT\n+\nJJJJ\n".to_vec() },
        ],
        serial: 0,
    };
    let boundary_batch1 =
        FastqFormat::find_boundaries(&boundary_state, batch1).expect("find_boundaries failed");
    assert_eq!(boundary_batch1.streams[0].offsets.len() - 1, 1);
    assert_eq!(boundary_batch1.streams[1].offsets.len() - 1, 1);
    assert!(!boundary_state.stream_states[0].lock().leftover.is_empty());
    // Batch 2 only carries stream 1; stream 0 must be served from leftover.
    let batch2 = FastqDecompressedBatch {
        chunks: vec![FastqDecompressedChunk {
            stream_idx: 1,
            data: b"@r2\nTTTT\n+\nJJJJ\n".to_vec(),
        }],
        serial: 1,
    };
    let boundary_batch2 =
        FastqFormat::find_boundaries(&boundary_state, batch2).expect("find_boundaries failed");
    assert_eq!(boundary_batch2.streams.len(), 2, "Both streams should be present");
    let stream0 =
        boundary_batch2.streams.iter().find(|s| s.stream_idx == 0).expect("stream not found");
    let stream1 =
        boundary_batch2.streams.iter().find(|s| s.stream_idx == 1).expect("stream not found");
    assert_eq!(stream0.offsets.len() - 1, 1, "Stream 0 should have 1 record from leftover");
    assert_eq!(stream1.offsets.len() - 1, 1, "Stream 1 should have 1 record");
    assert!(
        boundary_state.stream_states[0].lock().leftover.is_empty(),
        "Stream 0 leftover should be consumed"
    );
}
// Equal record counts across both streams: nothing is trimmed or held over.
#[test]
fn test_find_boundaries_equal_counts_no_alignment_needed() {
    let boundary_state = FastqBoundaryState::new(2);
    let batch = FastqDecompressedBatch {
        chunks: vec![
            FastqDecompressedChunk {
                stream_idx: 0,
                data: b"@r1\nACGT\n+\nIIII\n@r2\nACGT\n+\nIIII\n".to_vec(),
            },
            FastqDecompressedChunk {
                stream_idx: 1,
                data: b"@r1\nTTTT\n+\nJJJJ\n@r2\nTTTT\n+\nJJJJ\n".to_vec(),
            },
        ],
        serial: 0,
    };
    let boundary_batch =
        FastqFormat::find_boundaries(&boundary_state, batch).expect("find_boundaries failed");
    assert_eq!(boundary_batch.streams[0].offsets.len() - 1, 2);
    assert_eq!(boundary_batch.streams[1].offsets.len() - 1, 2);
    assert!(boundary_state.stream_states[0].lock().leftover.is_empty());
    assert!(boundary_state.stream_states[1].lock().leftover.is_empty());
}
// With a single stream there is no pairing, so no alignment trim happens.
#[test]
fn test_find_boundaries_single_stream_no_alignment() {
    let boundary_state = FastqBoundaryState::new(1);
    let batch = FastqDecompressedBatch {
        chunks: vec![FastqDecompressedChunk {
            stream_idx: 0,
            data: b"@r1\nACGT\n+\nIIII\n@r2\nACGT\n+\nIIII\n@r3\nACGT\n+\nIIII\n".to_vec(),
        }],
        serial: 0,
    };
    let boundary_batch =
        FastqFormat::find_boundaries(&boundary_state, batch).expect("find_boundaries failed");
    assert_eq!(boundary_batch.streams[0].offsets.len() - 1, 3);
    assert!(boundary_state.stream_states[0].lock().leftover.is_empty());
}
// A batch missing one stream's chunk still flushes that stream's leftover.
#[test]
fn test_find_boundaries_empty_batch_with_leftover() {
    let boundary_state = FastqBoundaryState::new(2);
    let batch1 = FastqDecompressedBatch {
        chunks: vec![
            FastqDecompressedChunk {
                stream_idx: 0,
                data: b"@r1\nAAAA\n+\nIIII\n@r2\nAAAA\n+\nIIII\n".to_vec(),
            },
            FastqDecompressedChunk { stream_idx: 1, data: b"@r1\nTTTT\n+\nJJJJ\n".to_vec() },
        ],
        serial: 0,
    };
    let _ =
        FastqFormat::find_boundaries(&boundary_state, batch1).expect("find_boundaries failed");
    assert!(!boundary_state.stream_states[0].lock().leftover.is_empty());
    let batch2 = FastqDecompressedBatch {
        chunks: vec![FastqDecompressedChunk {
            stream_idx: 1,
            data: b"@r2\nTTTT\n+\nJJJJ\n".to_vec(),
        }],
        serial: 1,
    };
    let boundary_batch2 =
        FastqFormat::find_boundaries(&boundary_state, batch2).expect("find_boundaries failed");
    assert_eq!(boundary_batch2.streams.len(), 2);
    assert!(boundary_state.stream_states[0].lock().leftover.is_empty());
}
// parse_records must preserve each stream's stream_idx even when streams
// arrive out of order (R2 before R1).
#[test]
fn test_parse_records_preserves_stream_idx() {
    let boundary_batch = FastqBoundaryBatch {
        streams: vec![
            FastqStreamBoundaries {
                stream_idx: 1,
                data: b"@read1\nTTTT\n+\nJJJJ\n".to_vec(),
                offsets: vec![0, 20],
            },
            FastqStreamBoundaries {
                stream_idx: 0,
                data: b"@read1\nACGT\n+\nIIII\n".to_vec(),
                offsets: vec![0, 20],
            },
        ],
        serial: 42,
    };
    let parsed = FastqFormat::parse_records(boundary_batch).expect("parse_records failed");
    assert_eq!(parsed.serial, 42);
    assert_eq!(parsed.streams.len(), 2);
    assert_eq!(
        parsed.streams[0].stream_idx, 1,
        "First parsed stream should be R2 (stream_idx=1)"
    );
    assert_eq!(
        parsed.streams[1].stream_idx, 0,
        "Second parsed stream should be R1 (stream_idx=0)"
    );
    assert_eq!(parsed.streams[0].records[0].sequence(), b"TTTT");
    assert_eq!(parsed.streams[1].records[0].sequence(), b"ACGT");
}
// Template assembly must order records by stream_idx, not arrival order.
#[test]
fn test_create_templates_from_reversed_streams() {
    let streams = vec![
        FastqParsedStream {
            stream_idx: 1,
            records: vec![
                FastqRecord::from_slice(b"@read1\nTTTT\n+\nJJJJ\n").unwrap(),
            ],
        },
        FastqParsedStream {
            stream_idx: 0,
            records: vec![
                FastqRecord::from_slice(b"@read1\nACGT\n+\nIIII\n").unwrap(),
            ],
        },
    ];
    let templates =
        create_templates_from_streams(streams).expect("create templates from streams");
    assert_eq!(templates.len(), 1);
    assert_eq!(templates[0].records.len(), 2);
    assert_eq!(
        templates[0].records[0].sequence(),
        b"ACGT",
        "First record in template must be R1 (ACGT), not R2"
    );
    assert_eq!(
        templates[0].records[1].sequence(),
        b"TTTT",
        "Second record in template must be R2 (TTTT), not R1"
    );
}
// Sanity check: already-ordered streams stay ordered.
#[test]
fn test_create_templates_from_correctly_ordered_streams() {
    let streams = vec![
        FastqParsedStream {
            stream_idx: 0,
            records: vec![FastqRecord::from_slice(b"@read1\nACGT\n+\nIIII\n").unwrap()],
        },
        FastqParsedStream {
            stream_idx: 1,
            records: vec![FastqRecord::from_slice(b"@read1\nTTTT\n+\nJJJJ\n").unwrap()],
        },
    ];
    let templates =
        create_templates_from_streams(streams).expect("create templates from streams");
    assert_eq!(templates.len(), 1);
    assert_eq!(templates[0].records[0].sequence(), b"ACGT", "R1 should be first");
    assert_eq!(templates[0].records[1].sequence(), b"TTTT", "R2 should be second");
}
// End-to-end: even when the final batch carries the streams in reversed
// order (only R2 arrives new, R1 from leftover), template record order must
// still be R1 then R2.
#[test]
fn test_end_to_end_reversed_stream_order_at_eof() {
    let boundary_state = FastqBoundaryState::new(2);
    let batch1 = FastqDecompressedBatch {
        chunks: vec![
            FastqDecompressedChunk {
                stream_idx: 0,
                data: b"@read1\nACGT\n+\nIIII\n@read2\nGGGG\n+\nJJJJ\n".to_vec(),
            },
            FastqDecompressedChunk { stream_idx: 1, data: b"@read1\nTTTT\n+\nKKKK\n".to_vec() },
        ],
        serial: 0,
    };
    let boundary_batch1 =
        FastqFormat::find_boundaries(&boundary_state, batch1).expect("find_boundaries failed");
    assert_eq!(boundary_batch1.streams[0].offsets.len() - 1, 1);
    assert_eq!(boundary_batch1.streams[1].offsets.len() - 1, 1);
    let batch2 = FastqDecompressedBatch {
        chunks: vec![FastqDecompressedChunk {
            stream_idx: 1,
            data: b"@read2\nCCCC\n+\nLLLL\n".to_vec(),
        }],
        serial: 1,
    };
    let boundary_batch2 =
        FastqFormat::find_boundaries(&boundary_state, batch2).expect("find_boundaries failed");
    assert_eq!(boundary_batch2.streams.len(), 2);
    let first_stream_idx = boundary_batch2.streams[0].stream_idx;
    let second_stream_idx = boundary_batch2.streams[1].stream_idx;
    let parsed = FastqFormat::parse_records(boundary_batch2).expect("parse_records failed");
    assert_eq!(parsed.streams[0].stream_idx, first_stream_idx);
    assert_eq!(parsed.streams[1].stream_idx, second_stream_idx);
    let templates =
        create_templates_from_streams(parsed.streams).expect("create templates from streams");
    assert_eq!(templates.len(), 1);
    assert_eq!(templates[0].name, b"read2");
    assert_eq!(templates[0].records.len(), 2);
    assert_eq!(
        templates[0].records[0].sequence(),
        b"GGGG",
        "records[0] must be R1 (stream 0) data, not R2"
    );
    assert_eq!(
        templates[0].records[1].sequence(),
        b"CCCC",
        "records[1] must be R2 (stream 1) data, not R1"
    );
}
/// Build raw FASTQ bytes for the given `(name, sequence)` pairs, using a
/// constant quality character of 'I' for every base.
fn make_fastq_records(records: &[(&str, &str)]) -> Vec<u8> {
    let mut text = String::new();
    for &(name, seq) in records {
        text.push_str(&format!("@{name}\n{seq}\n+\n"));
        text.push_str(&"I".repeat(seq.len()));
        text.push('\n');
    }
    text.into_bytes()
}
// read_n_fastq_records returns (buffer, N+1 offsets, at_eof) and stops
// after the requested record count when more input remains.
#[test]
fn test_read_n_fastq_records_basic() {
    let data = make_fastq_records(&[("r1", "ACGT"), ("r2", "TGCA"), ("r3", "AAAA")]);
    let mut cursor = Cursor::new(data);
    let (buf, offsets, at_eof) =
        read_n_fastq_records(&mut cursor, 2).expect("read N FASTQ records");
    // 2 records => 3 offsets; input still has a third record, so not EOF.
    assert_eq!(offsets.len(), 3);
    assert!(!at_eof);
    assert_eq!(offsets[0], 0);
    assert!(offsets[1] > 0);
    assert!(offsets[2] > offsets[1]);
    assert_eq!(buf[0], b'@');
}
// Asking for more records than exist returns what's available and at_eof.
#[test]
fn test_read_n_fastq_records_at_eof() {
    let data = make_fastq_records(&[("r1", "ACGT")]);
    let mut cursor = Cursor::new(data);
    let (_, offsets, at_eof) =
        read_n_fastq_records(&mut cursor, 5).expect("read N FASTQ records");
    assert_eq!(offsets.len(), 2);
    assert!(at_eof);
}
// Empty input yields the single sentinel offset 0 and at_eof.
#[test]
fn test_read_n_fastq_records_empty_input() {
    let mut cursor = Cursor::new(Vec::<u8>::new());
    let (_, offsets, at_eof) =
        read_n_fastq_records(&mut cursor, 5).expect("read N FASTQ records");
    assert_eq!(offsets.len(), 1);
    assert!(at_eof);
}
#[test]
fn test_pair_state_basic() {
let pair = PairState::new(2);
assert!(pair.is_empty());
}
#[test]
fn test_pair_state_insert_and_pop() {
let mut pair = PairState::new(2);
pair.insert(PerStreamChunk {
stream_idx: 0,
batch_num: 0,
data: b"data0".to_vec(),
offsets: Some(vec![0, 5]),
});
assert!(pair.try_pop_complete(false).is_none());
pair.insert(PerStreamChunk {
stream_idx: 1,
batch_num: 0,
data: b"data1".to_vec(),
offsets: Some(vec![0, 5]),
});
let chunks = pair.try_pop_complete(false).expect("try_pop_complete should succeed");
assert_eq!(chunks.len(), 2);
assert!(pair.is_empty());
}
#[test]
fn test_pair_state_uneven_streams() {
    let chunk = |stream_idx: usize, batch_num: u64, data: &[u8]| PerStreamChunk {
        stream_idx,
        batch_num,
        data: data.to_vec(),
        offsets: Some(vec![0, data.len()]),
    };
    let mut pair = PairState::new(2);
    pair.insert(chunk(0, 0, b"d00"));
    pair.insert(chunk(1, 0, b"d10"));
    pair.insert(chunk(0, 1, b"d01"));
    // Batch 0 is complete across both streams.
    let chunks = pair.try_pop_complete(false).expect("try_pop_complete should succeed");
    assert_eq!(chunks.len(), 2);
    // Batch 1 is missing stream 1; it is held while more input may arrive...
    assert!(pair.try_pop_complete(false).is_none());
    // ...but is released as a partial batch once all input has arrived.
    let chunks = pair.try_pop_complete(true).expect("try_pop_complete should succeed");
    assert_eq!(chunks.len(), 1);
    assert_eq!(chunks[0].stream_idx, 0);
    assert!(pair.is_empty());
}
#[test]
fn test_pair_state_count_based_completion() {
    let chunk = |stream_idx: usize, batch_num: u64, data: &[u8]| PerStreamChunk {
        stream_idx,
        batch_num,
        data: data.to_vec(),
        offsets: Some(vec![0, data.len()]),
    };
    let mut pair = PairState::new(2);
    pair.insert(chunk(0, 0, b"s0b0"));
    pair.insert(chunk(1, 0, b"s1b0"));
    // Both streams delivered batch 0: complete even before all input has arrived.
    let chunks = pair.try_pop_complete(false).expect("try_pop_complete should succeed");
    assert_eq!(chunks.len(), 2);
    pair.insert(chunk(0, 1, b"s0b1"));
    // Batch 1 lacks stream 1, so it stays pending while input may still arrive...
    assert!(pair.try_pop_complete(false).is_none());
    // ...and is released as a one-chunk batch once all input has arrived.
    let chunks = pair.try_pop_complete(true).expect("try_pop_complete should succeed");
    assert_eq!(chunks.len(), 1);
    assert_eq!(chunks[0].stream_idx, 0);
    assert!(pair.is_empty());
}
#[test]
fn test_align_stream_records_equal() {
    // Two streams with matching record counts (two records each, 18 bytes per record).
    let boundaries = |stream_idx: usize, data: &[u8]| FastqStreamBoundaries {
        stream_idx,
        data: data.to_vec(),
        offsets: vec![0, 18, 36],
    };
    let streams = vec![
        boundaries(0, b"@r1\nACGT\n+\nIIII\n@r2\nACGT\n+\nIIII\n"),
        boundaries(1, b"@r1\nTTTT\n+\nJJJJ\n@r2\nTTTT\n+\nJJJJ\n"),
    ];
    let aligned = align_stream_records(streams, 0);
    // Counts already match, so both streams keep their two records.
    assert_eq!(aligned.streams[0].offsets.len() - 1, 2);
    assert_eq!(aligned.streams[1].offsets.len() - 1, 2);
}
#[test]
fn test_align_stream_records_unequal() {
    let streams = vec![
        // Stream 0 carries three complete records.
        FastqStreamBoundaries {
            stream_idx: 0,
            data: b"@r1\nACGT\n+\nIIII\n@r2\nACGT\n+\nIIII\n@r3\nACGT\n+\nIIII\n".to_vec(),
            offsets: vec![0, 18, 36, 54],
        },
        // Stream 1 carries only two.
        FastqStreamBoundaries {
            stream_idx: 1,
            data: b"@r1\nTTTT\n+\nJJJJ\n@r2\nTTTT\n+\nJJJJ\n".to_vec(),
            offsets: vec![0, 18, 36],
        },
    ];
    let aligned = align_stream_records(streams, 0);
    // Alignment trims to the shorter stream: two records in each.
    assert_eq!(aligned.streams[0].offsets.len() - 1, 2);
    assert_eq!(aligned.streams[1].offsets.len() - 1, 2);
}
#[rstest]
#[case::gzip(false)]
#[case::bgzf(true)]
fn test_active_steps(#[case] inputs_are_bgzf: bool) {
    let cfg = FastqPipelineConfig::new(4, inputs_are_bgzf, 1);
    let active = cfg.active_steps();
    // The FASTQ pipeline runs the same step set for gzip and BGZF inputs...
    let expected =
        [Read, Decompress, FindBoundaries, Decode, Process, Serialize, Compress, Write];
    assert_eq!(active.steps(), &expected);
    // ...and never includes the grouping step.
    assert!(!active.is_active(Group));
}
#[test]
fn test_fastq_decompressed_batch_memory_estimate() {
    // Capacity (2048), not length (100), must drive the heap estimate.
    let mut payload = Vec::with_capacity(2048);
    payload.extend_from_slice(&[0u8; 100]);
    let mut chunks = Vec::with_capacity(4);
    chunks.push(FastqDecompressedChunk { stream_idx: 0, data: payload });
    let batch = FastqDecompressedBatch { chunks, serial: 0 };
    let estimate = batch.estimate_heap_size();
    assert!(estimate >= 2048, "estimate {estimate} should be >= 2048 (data capacity)");
    // The spare capacity of the outer chunk Vec must be counted too.
    let vec_overhead = 4 * std::mem::size_of::<FastqDecompressedChunk>();
    assert!(
        estimate >= 2048 + vec_overhead,
        "estimate {estimate} should include chunk Vec overhead"
    );
}
#[test]
fn test_fastq_boundary_batch_memory_estimate() {
    // All three capacities (data, offsets, streams) must feed the estimate.
    let mut bytes = Vec::with_capacity(1024);
    bytes.extend_from_slice(&[0u8; 100]);
    let mut offsets = Vec::with_capacity(16);
    offsets.push(0usize);
    let mut streams = Vec::with_capacity(4);
    streams.push(FastqStreamBoundaries { stream_idx: 0, data: bytes, offsets });
    let batch = FastqBoundaryBatch { streams, serial: 0 };
    let estimate = batch.estimate_heap_size();
    let expected_min = 1024
        + 16 * std::mem::size_of::<usize>()
        + 4 * std::mem::size_of::<FastqStreamBoundaries>();
    assert!(
        estimate >= expected_min,
        "estimate {estimate} should be >= {expected_min} (capacities + overhead)"
    );
}
#[test]
fn test_fastq_parsed_batch_memory_estimate() {
    use crate::fastq_parse::FastqRecord;
    let raw = b"@read1\nACGT\n+\nIIII\n";
    let record = FastqRecord::from_slice(raw).expect("valid FASTQ record");
    let mut records = Vec::with_capacity(8);
    records.push(record);
    let mut streams = Vec::with_capacity(4);
    streams.push(FastqParsedStream { stream_idx: 0, records });
    let batch = FastqParsedBatch { streams, serial: 0 };
    let estimate = batch.estimate_heap_size();
    // Record bytes plus the spare capacity of both Vecs must be accounted for.
    let records_overhead = 8 * std::mem::size_of::<FastqRecord>();
    let streams_overhead = 4 * std::mem::size_of::<FastqParsedStream>();
    let expected_min = raw.len() + records_overhead + streams_overhead;
    assert!(
        estimate >= expected_min,
        "estimate {estimate} should be >= {expected_min} (data capacity + overhead)"
    );
}
#[test]
fn test_fastq_record_from_slice_normal() {
    use crate::fastq_parse::FastqRecord;
    // Well-formed four-line record parses into its three components.
    let rec = FastqRecord::from_slice(b"@read1\nACGT\n+\nIIII\n").expect("valid FASTQ record");
    assert_eq!(rec.name(), b"read1");
    assert_eq!(rec.sequence(), b"ACGT");
    assert_eq!(rec.quality(), b"IIII");
}
#[test]
fn test_fastq_record_from_slice_at_in_quality() {
    use crate::fastq_parse::FastqRecord;
    // '@' is a legal quality character and must not be mistaken for a header.
    let rec = FastqRecord::from_slice(b"@read1\nACGT\n+\n@+!I\n").expect("record with @ in quality");
    assert_eq!(rec.name(), b"read1");
    assert_eq!(rec.sequence(), b"ACGT");
    assert_eq!(rec.quality(), b"@+!I");
}
#[test]
fn test_fastq_record_from_slice_mismatched_lengths() {
    use crate::fastq_parse::FastqRecord;
    // Quality "III" is one byte shorter than sequence "ACGT".
    assert!(
        FastqRecord::from_slice(b"@read1\nACGT\n+\nIII\n").is_err(),
        "mismatched seq/qual lengths must return an error"
    );
}
#[test]
fn test_fastq_record_from_slice_empty_data() {
    use crate::fastq_parse::FastqRecord;
    // An empty slice cannot contain a record.
    assert!(FastqRecord::from_slice(b"").is_err(), "empty data must return an error");
}
#[test]
fn test_fastq_record_from_slice_no_leading_at() {
    use crate::fastq_parse::FastqRecord;
    // A record must start with the '@' header marker.
    assert!(
        FastqRecord::from_slice(b"read1\nACGT\n+\nIIII\n").is_err(),
        "missing @ prefix must return an error"
    );
}
#[test]
fn test_fastq_record_from_slice_no_trailing_newline() {
    use crate::fastq_parse::FastqRecord;
    // A missing final newline is tolerated; the quality line still parses fully.
    let rec = FastqRecord::from_slice(b"@read1\nACGT\n+\nIIII")
        .expect("record without trailing newline");
    assert_eq!(rec.quality(), b"IIII");
}
#[test]
fn test_detect_prefix_end_empty() {
    // An empty buffer has no partial-record prefix.
    assert_eq!(detect_prefix_end(b""), 0);
}
#[test]
fn test_detect_prefix_end_starts_on_boundary() {
    // Data beginning exactly on a record header has a zero-length prefix.
    assert_eq!(detect_prefix_end(b"@read1\nACGT\n+\nIIII\n"), 0);
}
#[test]
fn test_detect_prefix_end_mid_record() {
    // Buffer starts inside a record's tail, then a complete record follows.
    let mut data = b"CGT\n+\nIIII\n".to_vec();
    data.extend_from_slice(b"@read2\nGGGG\n+\nJJJJ\n");
    let end = detect_prefix_end(&data);
    // The detected boundary must land on the '@' of the complete record.
    assert_eq!(&data[end..=end], b"@");
    assert!(end > 0, "prefix_end must be > 0 when data starts mid-record");
}
#[test]
fn test_detect_prefix_end_single_record() {
    // A single complete record starting at offset 0 has no prefix.
    assert_eq!(detect_prefix_end(b"@r\nA\n+\nI\n"), 0);
}
#[test]
fn test_detect_prefix_end_insufficient_data() {
    // With no newlines at all, the entire buffer is treated as prefix.
    let data = b"partial_no_newlines";
    assert_eq!(detect_prefix_end(data), data.len());
}
#[test]
fn test_detect_prefix_end_at_in_quality_is_not_boundary() {
    // The '@' inside the leading quality fragment must not count as a record start.
    let data = b"!@\n@r2\nAA\n+\nII\n";
    let end = detect_prefix_end(data);
    assert_eq!(&data[end..=end], b"@");
}
#[test]
fn test_detect_suffix_start_empty() {
    // An empty buffer has no partial-record suffix.
    assert_eq!(detect_suffix_start(b""), 0);
}
#[test]
fn test_detect_suffix_start_ends_on_boundary() {
    // Data ending exactly on a record boundary has an empty suffix.
    let data = b"@read1\nACGT\n+\nIIII\n";
    assert_eq!(detect_suffix_start(data), data.len());
}
#[test]
fn test_detect_suffix_start_ends_mid_record() {
    // A trailing partial record becomes the suffix, starting right after the
    // last complete record.
    let complete = b"@r1\nACGT\n+\nIIII\n";
    let mut data = complete.to_vec();
    data.extend_from_slice(b"@r2\nGG");
    assert_eq!(detect_suffix_start(&data), complete.len());
}
#[test]
fn test_detect_suffix_start_single_record_whole_buffer() {
    // One complete record: the suffix begins at the end of the buffer.
    let data = b"@r\nA\n+\nI\n";
    assert_eq!(detect_suffix_start(data), data.len());
}
#[test]
fn test_detect_suffix_start_insufficient_data() {
    // A lone partial record means the whole buffer is suffix (start = 0).
    assert_eq!(detect_suffix_start(b"@r1\nAC"), 0);
}
#[test]
fn test_detect_suffix_start_multiple_records() {
    // Two complete records, no trailing fragment: suffix starts at buffer end.
    let data = b"@r1\nACGT\n+\nIIII\n@r2\nGGGG\n+\nJJJJ\n";
    assert_eq!(detect_suffix_start(data), data.len());
}
#[test]
fn test_prefix_suffix_round_trip() {
    let prefix_bytes = b"GT\n+\nIIII\n";
    let middle_bytes = b"@r2\nACGT\n+\nJJJJ\n";
    let suffix_bytes = b"@r3\nAA";
    let data: Vec<u8> =
        [prefix_bytes.as_slice(), middle_bytes.as_slice(), suffix_bytes.as_slice()].concat();
    let prefix_end = detect_prefix_end(&data);
    let suffix_start = detect_suffix_start(&data[prefix_end..]) + prefix_end;
    // The detected boundaries must recover exactly the original three segments.
    assert_eq!(prefix_end, prefix_bytes.len());
    assert_eq!(suffix_start, prefix_bytes.len() + middle_bytes.len());
    assert_eq!(&data[prefix_end..suffix_start], middle_bytes);
}
#[test]
fn test_stitch_cross_block_record_both_empty() {
    // No carried suffix and no prefix fragment: nothing to stitch, no error.
    let stitched = stitch_cross_block_record(b"", b"").expect("no error for empty slices");
    assert!(stitched.is_none(), "both empty → None");
}
#[test]
fn test_stitch_cross_block_record_valid() {
    // A record split across blocks: header/seq/plus in the previous block's
    // suffix, quality in the next block's prefix.
    let carried_suffix = b"@r1\nACGT\n+\n";
    let next_prefix = b"IIII\n";
    let rec = stitch_cross_block_record(carried_suffix, next_prefix)
        .expect("valid cross-block record")
        .expect("record should be Some");
    assert_eq!(rec.name(), b"r1");
    assert_eq!(rec.sequence(), b"ACGT");
    assert_eq!(rec.quality(), b"IIII");
}
#[test]
fn test_block_parse_split_prefix_middle_suffix() {
    // Buffer = tail of a previous record + one complete record + head of the next.
    let prefix_frag = b"\nIIII\n";
    let complete_r2 = b"@r2\nGGGG\n+\nJJJJ\n";
    let suffix_frag = b"@r3\nTT";
    let mut data = prefix_frag.to_vec();
    data.extend_from_slice(complete_r2);
    data.extend_from_slice(suffix_frag);
    let prefix_end = detect_prefix_end(&data);
    let suffix_start = if prefix_end >= data.len() {
        data.len()
    } else {
        detect_suffix_start(&data[prefix_end..]) + prefix_end
    };
    // The three segments must be recovered exactly.
    assert_eq!(&data[..prefix_end], prefix_frag);
    assert_eq!(&data[suffix_start..], suffix_frag);
    let middle = &data[prefix_end..suffix_start];
    assert_eq!(middle, complete_r2);
    // The middle region parses as exactly one record.
    let offsets = fgumi_simd_fastq::find_record_offsets(middle);
    assert_eq!(offsets.len(), 2, "one complete record → two offsets (start + end)");
    let rec =
        FastqRecord::from_slice(&middle[offsets[0]..offsets[1]]).expect("valid r2 record");
    assert_eq!(rec.name(), b"r2");
    assert_eq!(rec.sequence(), b"GGGG");
    assert_eq!(rec.quality(), b"JJJJ");
}
#[test]
fn test_block_parse_starts_on_boundary() {
    // Two complete records, aligned at both ends: no prefix, no suffix.
    let data = b"@r1\nACGT\n+\nIIII\n@r2\nGGGG\n+\nJJJJ\n";
    let prefix_end = detect_prefix_end(data);
    assert_eq!(prefix_end, 0, "no prefix when data starts on @");
    let suffix_start = detect_suffix_start(&data[prefix_end..]) + prefix_end;
    assert_eq!(suffix_start, data.len(), "no suffix when data ends on record boundary");
    let offsets = fgumi_simd_fastq::find_record_offsets(data);
    assert_eq!(offsets.len(), 3, "two complete records → three offsets");
}
#[test]
fn test_block_parse_entire_buffer_is_prefix() {
    // Without any newline, the whole buffer must be classified as prefix.
    let data = b"only_partial_no_newlines";
    assert_eq!(detect_prefix_end(data), data.len());
}
/// Convenience constructor for a `BlockParsed` test fixture.
fn make_block_parsed(
    block_idx: u64,
    stream_idx: usize,
    records: Vec<FastqRecord>,
    prefix_bytes: Vec<u8>,
    suffix_bytes: Vec<u8>,
) -> BlockParsed {
    BlockParsed { stream_idx, block_idx, prefix_bytes, suffix_bytes, records }
}
/// Parses a single FASTQ record from its four-line textual form.
fn make_record(name: &str, seq: &str, qual: &str) -> FastqRecord {
    let text = format!("@{name}\n{seq}\n+\n{qual}\n");
    FastqRecord::from_slice(text.as_bytes()).unwrap()
}
#[test]
fn test_block_merge_state_single_stream_basic() {
    // Drives BlockMergeState's R1 path by hand: insert two parsed blocks,
    // replay the merge steps for block 0, and check the records it emits.
    let mut state = BlockMergeState::new();
    assert!(state.is_empty());
    let r1 = make_record("r1", "ACGT", "IIII");
    let r2 = make_record("r2", "GGGG", "JJJJ");
    let block0 = make_block_parsed(0, 0, vec![r1.clone()], vec![], vec![]);
    let block1 = make_block_parsed(1, 0, vec![r2.clone()], vec![], vec![]);
    state.r1_pending.insert(0, block0);
    state.r1_pending.insert(1, block1);
    // Pop the next in-order block (index 0) and advance the cursor.
    let r1_next = state.r1_next;
    let r1_block = state.r1_pending.remove(&r1_next).unwrap();
    state.r1_next += 1;
    // No carried suffix and no prefix fragment, so nothing to stitch.
    let cross0 =
        stitch_cross_block_record(&state.r1_suffix_bytes, &r1_block.prefix_bytes).unwrap();
    assert!(cross0.is_none(), "no cross-block record in block 0");
    // Emitted records = carried surplus + stitched record (if any) + block records.
    let mut all_r1: Vec<FastqRecord> = std::mem::take(&mut state.r1_surplus);
    if let Some(rec) = cross0 {
        all_r1.push(rec);
    }
    all_r1.extend(r1_block.records);
    // The block's trailing fragment becomes the suffix carried into the next block.
    state.r1_suffix_bytes = r1_block.suffix_bytes;
    assert_eq!(all_r1.len(), 1);
    assert_eq!(all_r1[0].name(), b"r1");
    assert_eq!(all_r1[0].sequence(), b"ACGT");
}
#[test]
fn test_block_merge_cross_block_record() {
    // Header/seq/plus carried from the previous block, quality from the next.
    let carried = b"@r1\nACGT\n+\n".to_vec();
    let incoming = b"IIII\n".to_vec();
    let cross = stitch_cross_block_record(&carried, &incoming)
        .expect("valid cross-block record")
        .expect("should be Some");
    assert_eq!(cross.name(), b"r1");
    assert_eq!(cross.sequence(), b"ACGT");
    assert_eq!(cross.quality(), b"IIII");
}
#[test]
fn test_block_merge_r1_surplus_carries() {
    let r1_records = [
        make_record("r1", "AAAA", "IIII"),
        make_record("r2", "CCCC", "IIII"),
        make_record("r3", "GGGG", "IIII"),
    ];
    let r2_records = [make_record("r1", "TTTT", "JJJJ"), make_record("r2", "CCCC", "JJJJ")];
    // Only as many pairs as the shorter side; extra R1 records become surplus.
    let pair_count = r1_records.len().min(r2_records.len());
    let surplus: Vec<FastqRecord> = r1_records[pair_count..].to_vec();
    assert_eq!(pair_count, 2);
    assert_eq!(surplus.len(), 1);
    assert_eq!(surplus[0].name(), b"r3");
}
#[test]
fn test_block_merge_r2_surplus_carries() {
    let r1_records = [make_record("r1", "AAAA", "IIII")];
    let r2_records = [make_record("r1", "TTTT", "JJJJ"), make_record("r2", "CCCC", "JJJJ")];
    // Mirror case: the extra R2 record must be carried as surplus.
    let pair_count = r1_records.len().min(r2_records.len());
    let r2_surplus: Vec<FastqRecord> = r2_records[pair_count..].to_vec();
    assert_eq!(pair_count, 1);
    assert_eq!(r2_surplus.len(), 1);
    assert_eq!(r2_surplus[0].name(), b"r2");
}
#[test]
fn test_block_merge_state_out_of_order_insertion() {
    let mut state = BlockMergeState::new();
    // Insert block 1 before block 0; the BTreeMap must still iterate in key order.
    let later = make_block_parsed(1, 0, vec![make_record("r2", "GGGG", "JJJJ")], vec![], vec![]);
    let earlier = make_block_parsed(0, 0, vec![make_record("r1", "ACGT", "IIII")], vec![], vec![]);
    state.r1_pending.insert(1, later);
    state.r1_pending.insert(0, earlier);
    let first_key = *state.r1_pending.keys().next().unwrap();
    assert_eq!(first_key, 0, "BTreeMap must yield block 0 first");
}
#[test]
fn test_block_merge_state_empty() {
    // A new merge state carries nothing: no pending blocks, no surplus records,
    // no carried byte fragments, and an output serial of zero.
    let state = BlockMergeState::new();
    assert!(state.is_empty());
    assert!(state.r1_pending.is_empty());
    assert!(state.r2_pending.is_empty());
    assert!(state.r1_surplus.is_empty());
    assert!(state.r2_surplus.is_empty());
    assert!(state.r1_suffix_bytes.is_empty());
    assert!(state.r2_suffix_bytes.is_empty());
    assert_eq!(state.serial_out, 0);
}
/// Builds a minimal in-memory `FastqPipelineState` (one empty decompressed
/// input stream, Vec-backed output) for backpressure tests.
fn make_fastq_state() -> FastqPipelineState<Cursor<Vec<u8>>, Vec<u8>> {
    let sink: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
    FastqPipelineState::new(
        FastqPipelineConfig::new(2, false, 6),
        vec![StreamReader::Decompressed(Cursor::new(Vec::new()))],
        sink,
    )
}
#[test]
fn test_fastq_backpressure_reports_write_reorder_memory() {
    // Verifies that backpressure flags track the write reorder buffer's
    // accounted heap bytes and the drain flag, through a full cycle:
    // empty -> over threshold -> draining -> released.
    let state = make_fastq_state();
    let header = Header::default();
    // No-op process/serialize closures: only memory accounting is under test.
    let process_fn = |_t: FastqTemplate| -> io::Result<Vec<u8>> { Ok(Vec::new()) };
    let serialize_fn =
        |_p: Vec<u8>, _h: &Header, _buf: &mut Vec<u8>| -> io::Result<u64> { Ok(0) };
    let ctx = FastqStepContext {
        state: &state,
        header: &header,
        process_fn: &process_fn,
        serialize_fn: &serialize_fn,
        is_reader: false,
    };
    let active = ActiveSteps::all();
    let worker = FastqWorkerState::new(6, 0, 2, SchedulerStrategy::default(), active);
    // Phase 1: empty buffer -> no pressure, fully drained.
    let bp = ctx.get_backpressure(&worker);
    assert!(!bp.memory_high, "memory_high should be false when write reorder buffer is empty");
    assert!(
        bp.memory_drained,
        "memory_drained should be true when write reorder buffer is empty"
    );
    // Phase 2: push accounted bytes just past the threshold -> pressure on.
    let threshold = BACKPRESSURE_THRESHOLD_BYTES;
    state.output.write_reorder_state.add_heap_bytes(threshold + 1);
    let bp = ctx.get_backpressure(&worker);
    assert!(
        bp.memory_high,
        "memory_high should be true when write reorder buffer exceeds limit"
    );
    assert!(
        !bp.memory_drained,
        "memory_drained should be false when write reorder buffer exceeds limit"
    );
    // Phase 3: while draining, memory_high is suppressed despite high usage.
    state.output.set_draining(true);
    let bp = ctx.get_backpressure(&worker);
    assert!(!bp.memory_high, "memory_high should be false during drain even with high memory");
    // Phase 4: release the bytes and stop draining -> relaxed state restored.
    state.output.write_reorder_state.sub_heap_bytes(threshold + 1);
    state.output.set_draining(false);
    let bp = ctx.get_backpressure(&worker);
    assert!(!bp.memory_high, "memory_high should be false after draining memory");
    assert!(
        bp.memory_drained,
        "memory_drained should be true after draining below low-water mark"
    );
}
}