1use crossbeam_queue::ArrayQueue;
13use log::info;
14use parking_lot::Mutex;
15use std::io::{self, Read, Write};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
18use std::thread;
19use std::time::{Duration, Instant};
20
21use crate::progress::ProgressTracker;
22
23use super::deadlock::{DeadlockState, QueueSnapshot, check_deadlock_and_restore};
24
25use crate::bgzf_reader::{RawBgzfBlock, decompress_block_into, read_raw_blocks};
26use crate::read_info::LibraryIndex;
27use crate::reorder_buffer::ReorderBuffer;
28use noodles::sam::alignment::RecordBuf;
29use noodles::sam::alignment::record::data::field::Tag;
30
31use super::scheduler::{BackpressureState, Scheduler, SchedulerStrategy, create_scheduler};
32
/// Point-in-time memory usage report, broken down by pipeline component.
///
/// All values are pre-scaled for display; the unit is encoded in the field
/// name suffix (`_gb` / `_mb`).
#[cfg(feature = "memory-debug")]
#[derive(Debug, Clone)]
pub struct MemoryBreakdown {
    /// Process resident-set size as reported by the OS (0.0 if unavailable).
    pub system_rss_gb: f64,
    /// Sum of all explicitly tracked allocations below.
    pub tracked_total_gb: f64,
    /// `system_rss_gb - tracked_total_gb`: memory we cannot attribute.
    pub untracked_gb: f64,

    // Estimated payload held in each inter-stage queue (Q1..Q7).
    pub q1_mb: f64,
    pub q2_mb: f64,
    pub q3_mb: f64,
    pub q4_gb: f64,
    pub q5_gb: f64,
    pub q6_mb: f64,
    pub q7_mb: f64,

    // In-flight processing state.
    pub position_groups_gb: f64,
    pub templates_gb: f64,
    pub reorder_buffers_mb: f64,
    pub grouper_mb: f64,
    pub worker_local_mb: f64,

    // Long-lived infrastructure (codecs, buffers, stacks, queue capacity).
    pub decompressors_mb: f64,
    pub compressors_mb: f64,
    pub worker_buffers_mb: f64,
    pub io_buffers_mb: f64,
    pub thread_stacks_mb: f64,
    pub queue_capacity_mb: f64,
    /// Total of the infrastructure categories above.
    pub infrastructure_gb: f64,
}
74
/// Atomic byte counters backing [`MemoryBreakdown`].
///
/// Updated concurrently by pipeline workers; all counters hold bytes.
#[cfg(feature = "memory-debug")]
#[derive(Debug)]
pub struct MemoryDebugStats {
    // Estimated bytes currently resident in each inter-stage queue.
    pub q1_memory_bytes: AtomicU64,
    pub q2_memory_bytes: AtomicU64,
    pub q3_memory_bytes: AtomicU64,
    pub q4_memory_bytes: AtomicU64,
    pub q5_memory_bytes: AtomicU64,
    pub q6_memory_bytes: AtomicU64,
    pub q7_memory_bytes: AtomicU64,

    // Bytes held by in-flight processing state.
    pub position_group_processing_bytes: AtomicU64,
    pub template_processing_bytes: AtomicU64,
    pub reorder_buffer_bytes: AtomicU64,
    pub grouper_memory_bytes: AtomicU64,
    pub worker_local_memory_bytes: AtomicU64,

    // Bytes held by long-lived infrastructure.
    pub decompressor_memory_bytes: AtomicU64,
    pub compressor_memory_bytes: AtomicU64,
    pub worker_buffer_memory_bytes: AtomicU64,
    pub io_buffer_memory_bytes: AtomicU64,
    pub thread_stack_memory_bytes: AtomicU64,
    pub queue_capacity_memory_bytes: AtomicU64,

    /// Last sampled process RSS (see `get_process_rss_bytes`).
    pub system_rss_bytes: AtomicU64,
}
126
#[cfg(feature = "memory-debug")]
impl MemoryDebugStats {
    /// Creates a stats block with every counter zeroed.
    #[must_use]
    pub fn new() -> Self {
        Self {
            q1_memory_bytes: AtomicU64::new(0),
            q2_memory_bytes: AtomicU64::new(0),
            q3_memory_bytes: AtomicU64::new(0),
            q4_memory_bytes: AtomicU64::new(0),
            q5_memory_bytes: AtomicU64::new(0),
            q6_memory_bytes: AtomicU64::new(0),
            q7_memory_bytes: AtomicU64::new(0),
            position_group_processing_bytes: AtomicU64::new(0),
            template_processing_bytes: AtomicU64::new(0),
            reorder_buffer_bytes: AtomicU64::new(0),
            grouper_memory_bytes: AtomicU64::new(0),
            worker_local_memory_bytes: AtomicU64::new(0),
            decompressor_memory_bytes: AtomicU64::new(0),
            compressor_memory_bytes: AtomicU64::new(0),
            worker_buffer_memory_bytes: AtomicU64::new(0),
            io_buffer_memory_bytes: AtomicU64::new(0),
            thread_stack_memory_bytes: AtomicU64::new(0),
            queue_capacity_memory_bytes: AtomicU64::new(0),
            system_rss_bytes: AtomicU64::new(0),
        }
    }
}

#[cfg(feature = "memory-debug")]
impl Default for MemoryDebugStats {
    /// Same as [`MemoryDebugStats::new`]; provided so the type follows the
    /// `Default` convention used by the other state types in this module
    /// (and satisfies `clippy::new_without_default`).
    fn default() -> Self {
        Self::new()
    }
}
155
/// Sentinel meaning "this thread has not been assigned a debug id yet".
#[cfg(feature = "memory-debug")]
const THREAD_ID_UNSET: usize = usize::MAX;

#[cfg(feature = "memory-debug")]
thread_local! {
    // Per-thread slot holding the id handed out by `get_or_assign_thread_id`.
    static THREAD_ID: std::cell::Cell<usize> = std::cell::Cell::new(THREAD_ID_UNSET);
}
168
/// Returns this thread's small debug id, assigning one on first call.
///
/// Ids come from a process-wide counter taken modulo `MAX_THREADS` (defined
/// elsewhere in this module), so if more than `MAX_THREADS` threads are ever
/// created, ids wrap and two threads can share an id — presumably acceptable
/// for debug accounting only; verify before using ids for anything else.
#[cfg(feature = "memory-debug")]
pub fn get_or_assign_thread_id() -> usize {
    THREAD_ID.with(|id| {
        let current = id.get();
        if current == THREAD_ID_UNSET {
            use std::sync::atomic::{AtomicUsize, Ordering};
            // Process-wide monotonically increasing id source.
            static THREAD_COUNTER: AtomicUsize = AtomicUsize::new(0);

            let new_id = THREAD_COUNTER.fetch_add(1, Ordering::Relaxed) % MAX_THREADS;
            id.set(new_id);
            new_id
        } else {
            current
        }
    })
}
188
/// Best-effort resident-set size of this process, in bytes.
///
/// Fast path: parse the `VmRSS` line from `/proc/self/status` (Linux).
/// Note that the `?` operators inside the `return` expression propagate out
/// of the *function*, so when `/proc/self/status` is readable but cannot be
/// parsed this returns `None` without trying the sysinfo fallback below.
///
/// Fallback (non-Linux): query `sysinfo`, reusing one lazily-initialized
/// `System` so each sample only refreshes process memory info.
#[cfg(feature = "memory-debug")]
pub fn get_process_rss_bytes() -> Option<u64> {
    if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
        return status
            .lines()
            .find(|line| line.starts_with("VmRSS:"))?
            .split_whitespace()
            .nth(1)?
            .parse::<u64>()
            .ok()
            .map(|kb| kb * 1024); // VmRSS is reported in kB
    }

    use std::sync::Mutex;
    use sysinfo::{ProcessRefreshKind, RefreshKind, System};

    // Cached sysinfo handle; constructed to refresh nothing but process memory.
    static RSS_SYSTEM: std::sync::OnceLock<Mutex<System>> = std::sync::OnceLock::new();

    let sys = RSS_SYSTEM.get_or_init(|| {
        Mutex::new(System::new_with_specifics(
            RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing().with_memory()),
        ))
    });

    let mut sys_guard = sys.lock().ok()?;
    sys_guard.refresh_processes_specifics(
        sysinfo::ProcessesToUpdate::All,
        false,
        ProcessRefreshKind::nothing().with_memory(),
    );

    let pid = sysinfo::get_current_pid().ok()?;
    let process = sys_guard.process(pid)?;
    Some(process.memory()) // NOTE(review): assumes sysinfo reports bytes — confirm crate version
}
232
/// Logs a one-line memory summary, plus an "untracked" follow-up line when
/// the gap between RSS and tracked allocations exceeds 1 GB.
///
/// Refreshes the RSS sample first so the percentages are current.
#[cfg(feature = "memory-debug")]
pub fn log_comprehensive_memory_stats(stats: &PipelineStats) {
    // Best effort: RSS may be unavailable on some platforms.
    if let Some(rss) = get_process_rss_bytes() {
        stats.update_system_rss(rss);
    }

    let breakdown = stats.get_memory_breakdown();

    if breakdown.system_rss_gb > 0.0 {
        // RSS known: include the tracked-vs-RSS percentage.
        let pct = (breakdown.tracked_total_gb / breakdown.system_rss_gb * 100.0) as u32;
        log::info!(
            "MEMORY: RSS={:.1}GB Tracked={:.1}GB ({}%) | Queue: Q1:{:.0}MB Q2:{:.0}MB Q3:{:.0}MB Q4:{:.1}GB Q5:{:.1}GB Q6:{:.0}MB Q7:{:.0}MB | Proc: Pos={:.1}GB Tmpl={:.1}GB | Infra={:.0}MB",
            breakdown.system_rss_gb,
            breakdown.tracked_total_gb,
            pct,
            breakdown.q1_mb,
            breakdown.q2_mb,
            breakdown.q3_mb,
            breakdown.q4_gb,
            breakdown.q5_gb,
            breakdown.q6_mb,
            breakdown.q7_mb,
            breakdown.position_groups_gb,
            breakdown.templates_gb,
            breakdown.infrastructure_gb * 1e3, // GB -> MB for display
        );
    } else {
        log::info!(
            "MEMORY: Tracked={:.1}GB | Queue: Q1:{:.0}MB Q2:{:.0}MB Q3:{:.0}MB Q4:{:.1}GB Q5:{:.1}GB Q6:{:.0}MB Q7:{:.0}MB | Proc: Pos={:.1}GB Tmpl={:.1}GB | Infra={:.0}MB",
            breakdown.tracked_total_gb,
            breakdown.q1_mb,
            breakdown.q2_mb,
            breakdown.q3_mb,
            breakdown.q4_gb,
            breakdown.q5_gb,
            breakdown.q6_mb,
            breakdown.q7_mb,
            breakdown.position_groups_gb,
            breakdown.templates_gb,
            breakdown.infrastructure_gb * 1e3, // GB -> MB for display
        );
    }

    if breakdown.system_rss_gb > 0.0 {
        // Call out memory we know exists (RSS) but cannot attribute.
        let untracked_pct = ((breakdown.untracked_gb / breakdown.system_rss_gb) * 100.0) as u32;
        if breakdown.untracked_gb > 1.0 {
            log::info!(
                " Untracked: {:.1}GB ({}%) = allocator fragmentation + noodles internals",
                breakdown.untracked_gb,
                untracked_pct,
            );
        }
    }
}
294
/// Spawns a background thread that logs memory stats every
/// `report_interval_secs` until `shutdown_signal` becomes true.
///
/// The thread also tracks the RSS high-water mark and, once RSS first drops
/// after exceeding ~4 GB, dumps mimalloc's internal statistics one time
/// (intended to capture allocator state near peak usage).
///
/// Returns the monitor's `JoinHandle`; the caller should set the signal and
/// join it on shutdown.
#[cfg(feature = "memory-debug")]
pub fn start_memory_monitor(
    stats: Arc<PipelineStats>,
    shutdown_signal: Arc<AtomicBool>,
    report_interval_secs: u64,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut last_report = Instant::now();
        let report_interval = Duration::from_secs(report_interval_secs);

        let mut last_rss: u64 = 0;
        let mut peak_rss: u64 = 0;
        let mut stats_printed = false; // one-shot guard for the mimalloc dump
        while !shutdown_signal.load(Ordering::Relaxed) {
            if last_report.elapsed() >= report_interval {
                log_comprehensive_memory_stats(&stats);
                let current_rss = stats.memory.system_rss_bytes.load(Ordering::Relaxed);
                // Fire once, on the first RSS decrease after a >4 GB peak.
                if !stats_printed
                    && current_rss > 0
                    && last_rss > 0
                    && current_rss < last_rss
                    && peak_rss > 4_000_000_000
                {
                    log::info!("=== MIMALLOC STATS AT PEAK (no mi_collect) ===");
                    // SAFETY: FFI call into mimalloc; null output routes to
                    // mimalloc's default stderr printer.
                    unsafe {
                        libmimalloc_sys::mi_stats_print_out(None, std::ptr::null_mut());
                    }
                    stats_printed = true;
                }
                if current_rss > peak_rss {
                    peak_rss = current_rss;
                }
                last_rss = current_rss;
                last_report = Instant::now();
            }
            // Poll at 100ms so shutdown is observed promptly between reports.
            thread::sleep(Duration::from_millis(100));
        }
    })
}
340
/// Scheduling weight of a batch (relative cost; unit defined by implementor).
pub trait BatchWeight {
    /// Returns this batch's weight for scheduling decisions.
    fn batch_weight(&self) -> usize;
}

/// Approximation of how many heap bytes a value owns (excluding its own
/// inline `size_of` footprint). Used for queue memory accounting.
pub trait MemoryEstimate {
    /// Returns the estimated number of heap bytes owned by `self`.
    fn estimate_heap_size(&self) -> usize;
}
389
/// Lock-free byte-count tracker with an optional soft limit.
///
/// A limit of 0 means "unlimited"; backpressure checks then fall back to the
/// global `BACKPRESSURE_THRESHOLD_BYTES`.
#[derive(Debug)]
#[allow(clippy::struct_field_names)]
pub struct MemoryTracker {
    /// Bytes currently accounted for.
    current_bytes: AtomicU64,
    /// High-water mark of `current_bytes`.
    peak_bytes: AtomicU64,
    /// Configured cap in bytes; 0 disables the limit.
    limit_bytes: u64,
}
428
429impl MemoryTracker {
430 #[must_use]
435 pub fn new(limit_bytes: u64) -> Self {
436 Self { current_bytes: AtomicU64::new(0), peak_bytes: AtomicU64::new(0), limit_bytes }
437 }
438
439 #[must_use]
441 pub fn unlimited() -> Self {
442 Self::new(0)
443 }
444
445 pub fn try_add(&self, bytes: usize) -> bool {
454 if self.limit_bytes == 0 {
455 let new_total =
457 self.current_bytes.fetch_add(bytes as u64, Ordering::Relaxed) + bytes as u64;
458 self.update_peak(new_total);
459 return true;
460 }
461
462 loop {
464 let current = self.current_bytes.load(Ordering::Relaxed);
465
466 if current >= self.limit_bytes {
468 return false;
469 }
470
471 let new_total = current + bytes as u64;
473
474 if self
476 .current_bytes
477 .compare_exchange_weak(current, new_total, Ordering::Relaxed, Ordering::Relaxed)
478 .is_ok()
479 {
480 self.update_peak(new_total);
481 return true;
482 }
483 }
485 }
486
487 #[inline]
489 fn update_peak(&self, new_total: u64) {
490 let mut current_peak = self.peak_bytes.load(Ordering::Relaxed);
491 while new_total > current_peak {
492 match self.peak_bytes.compare_exchange_weak(
493 current_peak,
494 new_total,
495 Ordering::Relaxed,
496 Ordering::Relaxed,
497 ) {
498 Ok(_) => break,
499 Err(actual) => current_peak = actual,
500 }
501 }
502 }
503
504 pub fn remove(&self, bytes: usize) {
507 let bytes = bytes as u64;
508 let mut current = self.current_bytes.load(Ordering::Relaxed);
509 loop {
510 let new_val = current.saturating_sub(bytes);
511 match self.current_bytes.compare_exchange_weak(
512 current,
513 new_val,
514 Ordering::Relaxed,
515 Ordering::Relaxed,
516 ) {
517 Ok(_) => break,
518 Err(actual) => current = actual,
519 }
520 }
521 }
522
523 pub fn add(&self, bytes: usize) {
526 let new_total =
527 self.current_bytes.fetch_add(bytes as u64, Ordering::Relaxed) + bytes as u64;
528 self.update_peak(new_total);
529 }
530
531 #[must_use]
533 pub fn current(&self) -> u64 {
534 self.current_bytes.load(Ordering::Relaxed)
535 }
536
537 #[must_use]
539 pub fn limit(&self) -> u64 {
540 self.limit_bytes
541 }
542
543 #[must_use]
545 pub fn peak(&self) -> u64 {
546 self.peak_bytes.load(Ordering::Relaxed)
547 }
548
549 #[must_use]
559 pub fn is_at_limit(&self) -> bool {
560 let backpressure_threshold = if self.limit_bytes == 0 {
563 BACKPRESSURE_THRESHOLD_BYTES
564 } else {
565 self.limit_bytes.min(BACKPRESSURE_THRESHOLD_BYTES)
566 };
567 self.current() >= backpressure_threshold
568 }
569
570 #[must_use]
579 pub fn is_below_drain_threshold(&self) -> bool {
580 let backpressure_threshold = if self.limit_bytes == 0 {
582 BACKPRESSURE_THRESHOLD_BYTES
583 } else {
584 self.limit_bytes.min(BACKPRESSURE_THRESHOLD_BYTES)
585 };
586 self.current() < backpressure_threshold / 2
587 }
588}
589
impl Default for MemoryTracker {
    /// Defaults to an unlimited tracker (limit of 0).
    fn default() -> Self {
        Self::unlimited()
    }
}
595
/// Outcome of attempting one pipeline step.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepResult {
    /// The step made progress.
    Success,
    /// The downstream queue had no room for the result.
    OutputFull,
    /// The upstream queue had nothing to consume.
    InputEmpty,
}

impl StepResult {
    /// Returns `true` only for [`StepResult::Success`].
    #[inline]
    #[must_use]
    pub fn is_success(self) -> bool {
        self == Self::Success
    }
}
629
/// Emit a progress log line every this many items.
pub const PROGRESS_LOG_INTERVAL: u64 = 1_000_000;

/// Global soft cap (512 MiB) used by memory backpressure checks.
pub const BACKPRESSURE_THRESHOLD_BYTES: u64 = 512 * 1024 * 1024;
/// Tighter cap (256 MiB) applied to the processed-items (Q5) queue; see
/// `OutputPipelineQueues::is_processed_memory_high`.
pub const Q5_BACKPRESSURE_THRESHOLD_BYTES: u64 = 256 * 1024 * 1024;

/// Shared state coordinating a sequence-ordered reorder stage.
#[derive(Debug)]
pub struct ReorderBufferState {
    /// Next sequence number expected to be released in order.
    pub next_seq: AtomicU64,
    /// Estimated heap bytes currently buffered awaiting reordering.
    pub heap_bytes: AtomicU64,
    /// Configured memory limit in bytes (0 = fall back to the global cap).
    memory_limit: u64,
}
711
impl ReorderBufferState {
    /// Creates state with `memory_limit` bytes (0 = use the global cap).
    #[must_use]
    pub fn new(memory_limit: u64) -> Self {
        Self { next_seq: AtomicU64::new(0), heap_bytes: AtomicU64::new(0), memory_limit }
    }

    /// Whether the producer of batch `serial` may proceed.
    ///
    /// The in-order batch (`serial == next_seq`) is always admitted since it
    /// can drain immediately; out-of-order batches are admitted only while
    /// buffered bytes stay below half the effective limit, bounding how far
    /// ahead producers can run.
    #[must_use]
    pub fn can_proceed(&self, serial: u64) -> bool {
        let limit = self.effective_limit();
        let next_seq = self.next_seq.load(Ordering::Acquire);
        let heap_bytes = self.heap_bytes.load(Ordering::Acquire);

        if serial == next_seq {
            return true;
        }

        // Out-of-order producers only get half the budget.
        let effective_limit = limit / 2;
        heap_bytes < effective_limit
    }

    /// `true` once buffered bytes reach the effective limit (apply backpressure).
    #[must_use]
    pub fn is_memory_high(&self) -> bool {
        let threshold = self.effective_limit();
        self.heap_bytes.load(Ordering::Acquire) >= threshold
    }

    /// `true` once buffered bytes drop below half the limit (release
    /// backpressure; the half-threshold gap provides hysteresis).
    #[must_use]
    pub fn is_memory_drained(&self) -> bool {
        let threshold = self.effective_limit();
        self.heap_bytes.load(Ordering::Acquire) < threshold / 2
    }

    /// Configured limit clamped to the global cap; 0 means "unlimited",
    /// which falls back to the global cap alone.
    #[inline]
    #[must_use]
    fn effective_limit(&self) -> u64 {
        if self.memory_limit == 0 {
            BACKPRESSURE_THRESHOLD_BYTES
        } else {
            self.memory_limit.min(BACKPRESSURE_THRESHOLD_BYTES)
        }
    }

    /// Accounts `bytes` of newly buffered data.
    #[inline]
    pub fn add_heap_bytes(&self, bytes: u64) {
        self.heap_bytes.fetch_add(bytes, Ordering::AcqRel);
    }

    /// Releases `bytes` of drained data.
    #[inline]
    pub fn sub_heap_bytes(&self, bytes: u64) {
        self.heap_bytes.fetch_sub(bytes, Ordering::AcqRel);
    }

    /// Publishes a new "next expected" sequence number.
    #[inline]
    pub fn update_next_seq(&self, new_seq: u64) {
        self.next_seq.store(new_seq, Ordering::Release);
    }

    /// Currently expected sequence number.
    #[inline]
    #[must_use]
    pub fn get_next_seq(&self) -> u64 {
        self.next_seq.load(Ordering::Acquire)
    }

    /// Currently buffered heap bytes.
    #[inline]
    #[must_use]
    pub fn get_heap_bytes(&self) -> u64 {
        self.heap_bytes.load(Ordering::Acquire)
    }

    /// Raw configured limit (0 = unlimited/global cap).
    #[inline]
    #[must_use]
    pub fn get_memory_limit(&self) -> u64 {
        self.memory_limit
    }
}
814
815use crate::bgzf_writer::{CompressedBlock, InlineBgzfCompressor};
820
821#[derive(Default)]
827pub struct RawBlockBatch {
828 pub blocks: Vec<RawBgzfBlock>,
830}
831
832impl RawBlockBatch {
833 #[must_use]
835 pub fn new() -> Self {
836 Self { blocks: Vec::new() }
837 }
838
839 #[must_use]
841 pub fn with_capacity(capacity: usize) -> Self {
842 Self { blocks: Vec::with_capacity(capacity) }
843 }
844
845 #[must_use]
847 pub fn len(&self) -> usize {
848 self.blocks.len()
849 }
850
851 #[must_use]
853 pub fn is_empty(&self) -> bool {
854 self.blocks.is_empty()
855 }
856
857 #[must_use]
859 pub fn total_uncompressed_size(&self) -> usize {
860 self.blocks.iter().map(RawBgzfBlock::uncompressed_size).sum()
861 }
862
863 #[must_use]
865 pub fn total_compressed_size(&self) -> usize {
866 self.blocks.iter().map(RawBgzfBlock::len).sum()
867 }
868
869 pub fn clear(&mut self) {
871 self.blocks.clear();
872 }
873}
874
875impl MemoryEstimate for RawBlockBatch {
876 fn estimate_heap_size(&self) -> usize {
877 self.blocks.iter().map(|b| b.data.capacity()).sum::<usize>()
879 + self.blocks.capacity() * std::mem::size_of::<RawBgzfBlock>()
880 }
881}
882
/// A batch of BGZF-compressed output blocks ready to be written.
#[derive(Default)]
pub struct CompressedBlockBatch {
    /// Compressed blocks in output order.
    pub blocks: Vec<CompressedBlock>,
    /// Number of records encoded into these blocks.
    pub record_count: u64,
    /// Optional payload for the secondary output stream — see
    /// `OutputPipelineQueues::secondary_output`.
    pub secondary_data: Option<Vec<u8>>,
}

impl CompressedBlockBatch {
    /// Creates an empty batch.
    #[must_use]
    pub fn new() -> Self {
        Self { blocks: Vec::new(), record_count: 0, secondary_data: None }
    }

    /// Creates an empty batch with room reserved for `capacity` blocks.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self { blocks: Vec::with_capacity(capacity), record_count: 0, secondary_data: None }
    }

    /// Number of compressed blocks held.
    #[must_use]
    pub fn len(&self) -> usize {
        self.blocks.len()
    }

    /// `true` when no blocks are held.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.blocks.is_empty()
    }

    /// Total compressed bytes across all blocks.
    #[must_use]
    pub fn total_size(&self) -> usize {
        self.blocks.iter().map(|b| b.data.len()).sum()
    }

    /// Resets the batch for reuse, keeping the blocks vector's allocation.
    pub fn clear(&mut self) {
        self.blocks.clear();
        self.record_count = 0;
        self.secondary_data = None;
    }
}

impl MemoryEstimate for CompressedBlockBatch {
    fn estimate_heap_size(&self) -> usize {
        // Block payload capacity + Vec backing store + optional sidecar buffer.
        self.blocks.iter().map(|b| b.data.capacity()).sum::<usize>()
            + self.blocks.capacity() * std::mem::size_of::<CompressedBlock>()
            + self.secondary_data.as_ref().map_or(0, Vec::capacity)
    }
}
945
/// Tuning knobs for batched BGZF processing.
#[derive(Debug, Clone)]
pub struct BgzfBatchConfig {
    /// Number of BGZF blocks handled per batch.
    pub blocks_per_batch: usize,
    /// Deflate compression level used when re-compressing.
    pub compression_level: u32,
}

impl Default for BgzfBatchConfig {
    /// 16 blocks per batch at compression level 6.
    fn default() -> Self {
        Self { blocks_per_batch: 16, compression_level: 6 }
    }
}

impl BgzfBatchConfig {
    /// Builds a config with `blocks_per_batch`, keeping the default level.
    #[must_use]
    pub fn new(blocks_per_batch: usize) -> Self {
        let mut config = Self::default();
        config.blocks_per_batch = blocks_per_batch;
        config
    }

    /// Builder-style override of the compression level.
    #[must_use]
    pub fn with_compression_level(mut self, level: u32) -> Self {
        self.compression_level = level;
        self
    }
}
975
976pub fn read_raw_block_batch(
985 reader: &mut dyn Read,
986 buffer: &mut RawBlockBatch,
987 blocks_per_batch: usize,
988) -> io::Result<bool> {
989 buffer.clear();
990 let blocks = read_raw_blocks(reader, blocks_per_batch)?;
991 if blocks.is_empty() {
992 return Ok(false);
993 }
994 buffer.blocks = blocks;
995 Ok(true)
996}
997
998pub fn write_compressed_batch(
1006 writer: &mut dyn Write,
1007 batch: &CompressedBlockBatch,
1008) -> io::Result<()> {
1009 for block in &batch.blocks {
1010 writer.write_all(&block.data)?;
1011 }
1012 Ok(())
1013}
1014
/// Per-worker (de)compression scratch state, created once per thread so the
/// codec contexts are reused across batches.
pub struct BgzfWorkerState {
    /// Reusable raw-deflate decompressor.
    pub decompressor: libdeflater::Decompressor,
    /// Reusable BGZF block compressor.
    pub compressor: InlineBgzfCompressor,
}

impl BgzfWorkerState {
    /// Creates worker state that compresses at `compression_level`.
    #[must_use]
    pub fn new(compression_level: u32) -> Self {
        Self {
            decompressor: libdeflater::Decompressor::new(),
            compressor: InlineBgzfCompressor::new(compression_level),
        }
    }

    /// Decompresses every block of `batch` into one contiguous buffer.
    ///
    /// The output is pre-sized from the batch's declared uncompressed size
    /// so the loop does not reallocate.
    ///
    /// # Errors
    /// Propagates any failure from `decompress_block_into` (e.g. a corrupt
    /// block).
    pub fn decompress_batch(&mut self, batch: &RawBlockBatch) -> io::Result<Vec<u8>> {
        let total_size = batch.total_uncompressed_size();
        let mut result = Vec::with_capacity(total_size);

        for block in &batch.blocks {
            decompress_block_into(block, &mut self.decompressor, &mut result)?;
        }

        Ok(result)
    }
}
1054
/// The nine stages of the pipeline, in execution order.
///
/// Declaration order is significant: the derived discriminants (0..=8) are
/// the step indices used by `index`/`from_index` and by step bitmaps.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineStep {
    Read,
    Decompress,
    FindBoundaries,
    Decode,
    Group,
    Process,
    Serialize,
    Compress,
    Write,
}

impl PipelineStep {
    /// Whether only one worker at a time may run this step (stages that must
    /// observe or produce a strict ordering).
    #[must_use]
    pub const fn is_exclusive(&self) -> bool {
        match self {
            Self::Read | Self::FindBoundaries | Self::Group | Self::Write => true,
            _ => false,
        }
    }

    /// Every step, in pipeline order.
    #[must_use]
    pub const fn all() -> [PipelineStep; 9] {
        [
            Self::Read,
            Self::Decompress,
            Self::FindBoundaries,
            Self::Decode,
            Self::Group,
            Self::Process,
            Self::Serialize,
            Self::Compress,
            Self::Write,
        ]
    }

    /// Inverse of [`Self::index`].
    ///
    /// # Panics
    /// Panics when `idx` is outside `0..=8`.
    #[must_use]
    pub const fn from_index(idx: usize) -> PipelineStep {
        match idx {
            0 => Self::Read,
            1 => Self::Decompress,
            2 => Self::FindBoundaries,
            3 => Self::Decode,
            4 => Self::Group,
            5 => Self::Process,
            6 => Self::Serialize,
            7 => Self::Compress,
            8 => Self::Write,
            _ => panic!("PipelineStep::from_index: invalid index (must be 0..=8)"),
        }
    }

    /// Zero-based position of this step in the pipeline (0..=8).
    #[must_use]
    pub const fn index(&self) -> usize {
        // The fieldless enum's discriminants follow declaration order, which
        // is exactly the pipeline order.
        *self as usize
    }

    /// Two-letter abbreviation used in compact log lines.
    #[must_use]
    pub const fn short_name(&self) -> &'static str {
        match self {
            Self::Read => "Rd",
            Self::Decompress => "Dc",
            Self::FindBoundaries => "Fb",
            Self::Decode => "De",
            Self::Group => "Gr",
            Self::Process => "Pr",
            Self::Serialize => "Se",
            Self::Compress => "Co",
            Self::Write => "Wr",
        }
    }
}
1174
/// Immutable set of pipeline steps enabled for this run.
///
/// Keeps both the ordered list (for iteration) and a 9-slot membership array
/// indexed by `PipelineStep::index` (for O(1) lookups).
#[derive(Debug, Clone)]
pub struct ActiveSteps {
    /// Enabled steps, in pipeline order.
    steps: Vec<PipelineStep>,
    /// `active[step.index()]` is true iff the step is enabled.
    active: [bool; 9],
}

impl ActiveSteps {
    /// Builds the set from `steps`.
    ///
    /// # Panics
    /// Panics unless `steps` is strictly increasing by index (i.e. unique
    /// and in pipeline order).
    #[must_use]
    pub fn new(steps: &[PipelineStep]) -> Self {
        assert!(
            steps.windows(2).all(|w| w[0].index() < w[1].index()),
            "ActiveSteps must be unique and in pipeline order"
        );
        let mut active = [false; 9];
        for &step in steps {
            active[step.index()] = true;
        }
        Self { steps: steps.to_vec(), active }
    }

    /// Set containing every pipeline step.
    #[must_use]
    pub fn all() -> Self {
        Self::new(&PipelineStep::all())
    }

    /// Whether `step` is enabled.
    #[must_use]
    pub fn is_active(&self, step: PipelineStep) -> bool {
        self.active[step.index()]
    }

    /// Enabled steps in pipeline order.
    #[must_use]
    pub fn steps(&self) -> &[PipelineStep] {
        &self.steps
    }

    /// Enabled steps that require exclusive (single-worker) execution.
    #[must_use]
    pub fn exclusive_steps(&self) -> Vec<PipelineStep> {
        self.steps.iter().copied().filter(PipelineStep::is_exclusive).collect()
    }

    /// Number of enabled steps.
    #[must_use]
    pub fn len(&self) -> usize {
        self.steps.len()
    }

    /// `true` when no steps are enabled.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.steps.is_empty()
    }

    /// Compacts `buffer` in place, keeping only active steps, and returns
    /// how many were kept. Reads all 9 slots, so `buffer` must contain 9
    /// valid steps (it is a full step array, just possibly reordered).
    pub fn filter_in_place(&self, buffer: &mut [PipelineStep; 9]) -> usize {
        let mut write = 0;
        for read in 0..9 {
            if self.active[buffer[read].index()] {
                buffer[write] = buffer[read];
                write += 1;
            }
        }
        write
    }
}
1261
/// Canonical grouping key for records.
///
/// For paired reads the two (ref, pos, strand) endpoints are stored in
/// sorted order, so a record and its mate produce the identical key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GroupKey {
    /// Reference id of the lexicographically smaller endpoint.
    pub ref_id1: i32,
    /// Position of the smaller endpoint.
    pub pos1: i32,
    /// Strand of the smaller endpoint.
    pub strand1: u8,
    /// Reference id of the larger endpoint (sentinel for single-ended keys).
    pub ref_id2: i32,
    /// Position of the larger endpoint (sentinel for single-ended keys).
    pub pos2: i32,
    /// Strand of the larger endpoint (sentinel for single-ended keys).
    pub strand2: u8,

    /// Index into the library table.
    pub library_idx: u16,
    /// Hash of the cell barcode.
    pub cell_hash: u64,

    /// Hash of the read name; tie-breaker within a position group.
    pub name_hash: u64,
}

impl GroupKey {
    /// Sentinel reference id for an absent second endpoint.
    pub const UNKNOWN_REF: i32 = i32::MAX;
    /// Sentinel position for an absent second endpoint.
    pub const UNKNOWN_POS: i32 = i32::MAX;
    /// Sentinel strand for an absent second endpoint.
    pub const UNKNOWN_STRAND: u8 = u8::MAX;

    /// Builds a key for a paired read, normalizing the two endpoints so that
    /// either mate of the pair yields the same key.
    #[must_use]
    #[allow(clippy::too_many_arguments)]
    pub fn paired(
        ref_id: i32,
        pos: i32,
        strand: u8,
        mate_ref_id: i32,
        mate_pos: i32,
        mate_strand: u8,
        library_idx: u16,
        cell_hash: u64,
        name_hash: u64,
    ) -> Self {
        // Order the endpoints lexicographically by (ref, pos, strand).
        let this_end = (ref_id, pos, strand);
        let mate_end = (mate_ref_id, mate_pos, mate_strand);
        let (lo, hi) =
            if this_end <= mate_end { (this_end, mate_end) } else { (mate_end, this_end) };

        Self {
            ref_id1: lo.0,
            pos1: lo.1,
            strand1: lo.2,
            ref_id2: hi.0,
            pos2: hi.1,
            strand2: hi.2,
            library_idx,
            cell_hash,
            name_hash,
        }
    }

    /// Builds a key for a single-ended read; the second endpoint holds the
    /// `UNKNOWN_*` sentinels.
    #[must_use]
    pub fn single(
        ref_id: i32,
        pos: i32,
        strand: u8,
        library_idx: u16,
        cell_hash: u64,
        name_hash: u64,
    ) -> Self {
        Self {
            ref_id1: ref_id,
            pos1: pos,
            strand1: strand,
            ref_id2: Self::UNKNOWN_REF,
            pos2: Self::UNKNOWN_POS,
            strand2: Self::UNKNOWN_STRAND,
            library_idx,
            cell_hash,
            name_hash,
        }
    }

    /// Every field except `name_hash`; keys at the same position (and
    /// library/cell) share this tuple.
    #[must_use]
    pub fn position_key(&self) -> (i32, i32, u8, i32, i32, u8, u16, u64) {
        (
            self.ref_id1,
            self.pos1,
            self.strand1,
            self.ref_id2,
            self.pos2,
            self.strand2,
            self.library_idx,
            self.cell_hash,
        )
    }
}

impl PartialOrd for GroupKey {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for GroupKey {
    /// Orders by position first, then by name hash.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let by_position = self.position_key().cmp(&other.position_key());
        by_position.then(self.name_hash.cmp(&other.name_hash))
    }
}

impl Default for GroupKey {
    /// All-sentinel key: both endpoints unknown, all hashes/indices zero.
    fn default() -> Self {
        Self::single(Self::UNKNOWN_REF, Self::UNKNOWN_POS, Self::UNKNOWN_STRAND, 0, 0, 0)
    }
}
1406
/// A decoded record paired with its precomputed [`GroupKey`].
#[derive(Debug)]
pub struct DecodedRecord {
    /// Grouping key derived at decode time.
    pub key: GroupKey,
    /// Either a fully parsed record or its raw BAM bytes.
    pub(crate) data: DecodedRecordData,
}

/// Storage form of a decoded record: parsed (`RecordBuf`) or raw BAM bytes.
#[derive(Debug)]
pub enum DecodedRecordData {
    Parsed(RecordBuf),
    Raw(Vec<u8>),
}
1434
1435impl DecodedRecord {
1436 #[must_use]
1438 pub fn new(record: RecordBuf, key: GroupKey) -> Self {
1439 Self { key, data: DecodedRecordData::Parsed(record) }
1440 }
1441
1442 #[must_use]
1444 pub fn from_raw_bytes(raw: Vec<u8>, key: GroupKey) -> Self {
1445 Self { key, data: DecodedRecordData::Raw(raw) }
1446 }
1447
1448 #[must_use]
1450 pub fn raw_bytes(&self) -> Option<&[u8]> {
1451 match &self.data {
1452 DecodedRecordData::Raw(v) => Some(v),
1453 DecodedRecordData::Parsed(_) => None,
1454 }
1455 }
1456
1457 #[must_use]
1459 pub fn into_raw_bytes(self) -> Option<Vec<u8>> {
1460 match self.data {
1461 DecodedRecordData::Raw(v) => Some(v),
1462 DecodedRecordData::Parsed(_) => None,
1463 }
1464 }
1465
1466 #[must_use]
1468 pub fn record(&self) -> Option<&RecordBuf> {
1469 match &self.data {
1470 DecodedRecordData::Parsed(r) => Some(r),
1471 DecodedRecordData::Raw(_) => None,
1472 }
1473 }
1474
1475 #[must_use]
1477 pub fn into_record(self) -> Option<RecordBuf> {
1478 match self.data {
1479 DecodedRecordData::Parsed(r) => Some(r),
1480 DecodedRecordData::Raw(_) => None,
1481 }
1482 }
1483}
1484
impl MemoryEstimate for DecodedRecord {
    fn estimate_heap_size(&self) -> usize {
        // Parsed records delegate to the template-module estimator; raw
        // records own exactly their byte buffer's capacity.
        match &self.data {
            DecodedRecordData::Parsed(record) => {
                crate::template::estimate_record_buf_heap_size(record)
            }
            DecodedRecordData::Raw(raw) => raw.capacity(),
        }
    }
}

impl MemoryEstimate for Vec<DecodedRecord> {
    fn estimate_heap_size(&self) -> usize {
        // Per-element heap usage plus the Vec's own backing allocation.
        self.iter().map(MemoryEstimate::estimate_heap_size).sum::<usize>()
            + self.capacity() * std::mem::size_of::<DecodedRecord>()
    }
}

impl MemoryEstimate for RecordBuf {
    fn estimate_heap_size(&self) -> usize {
        crate::template::estimate_record_buf_heap_size(self)
    }
}

impl MemoryEstimate for Vec<RecordBuf> {
    fn estimate_heap_size(&self) -> usize {
        // Per-element heap usage plus the Vec's own backing allocation.
        self.iter().map(MemoryEstimate::estimate_heap_size).sum::<usize>()
            + self.capacity() * std::mem::size_of::<RecordBuf>()
    }
}

impl MemoryEstimate for Vec<u8> {
    fn estimate_heap_size(&self) -> usize {
        // A byte vector's heap footprint is exactly its capacity.
        self.capacity()
    }
}
1522
/// Configuration controlling how grouping keys are derived from records.
#[derive(Debug, Clone)]
pub struct GroupKeyConfig {
    /// Shared mapping from libraries to `GroupKey::library_idx` values.
    pub library_index: Arc<LibraryIndex>,
    /// BAM tag holding the cell barcode; `None` disables per-cell grouping.
    pub cell_tag: Option<Tag>,
    /// When true, records are carried as raw BAM bytes instead of parsed.
    pub raw_byte_mode: bool,
}

impl GroupKeyConfig {
    /// Parsed-record mode with a cell tag.
    #[must_use]
    pub fn new(library_index: LibraryIndex, cell_tag: Tag) -> Self {
        Self {
            library_index: Arc::new(library_index),
            cell_tag: Some(cell_tag),
            raw_byte_mode: false,
        }
    }

    /// Raw-byte mode with a cell tag.
    #[must_use]
    pub fn new_raw(library_index: LibraryIndex, cell_tag: Tag) -> Self {
        Self {
            library_index: Arc::new(library_index),
            cell_tag: Some(cell_tag),
            raw_byte_mode: true,
        }
    }

    /// Raw-byte mode without cell-barcode grouping.
    #[must_use]
    pub fn new_raw_no_cell(library_index: LibraryIndex) -> Self {
        Self { library_index: Arc::new(library_index), cell_tag: None, raw_byte_mode: true }
    }
}

impl Default for GroupKeyConfig {
    fn default() -> Self {
        Self {
            library_index: Arc::new(LibraryIndex::default()),
            // "CB" is the conventional SAM/BAM cell-barcode tag.
            cell_tag: Some(Tag::from([b'C', b'B'])),
            raw_byte_mode: false,
        }
    }
}
1580
/// Decompressed byte stream produced from a batch of BGZF blocks.
#[derive(Default)]
pub struct DecompressedBatch {
    /// Concatenated decompressed bytes.
    pub data: Vec<u8>,
}

impl DecompressedBatch {
    /// Creates an empty batch.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty batch with `capacity` bytes preallocated.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self { data: Vec::with_capacity(capacity) }
    }

    /// `true` when the batch holds no bytes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Empties the batch while keeping its allocation for reuse.
    pub fn clear(&mut self) {
        self.data.clear();
    }
}
1612
impl MemoryEstimate for DecompressedBatch {
    fn estimate_heap_size(&self) -> usize {
        // Heap footprint is exactly the data buffer's capacity.
        self.data.capacity()
    }
}
1618
/// Serialized (uncompressed BAM) bytes awaiting compression.
#[derive(Default)]
pub struct SerializedBatch {
    /// Serialized record bytes.
    pub data: Vec<u8>,
    /// Number of records serialized into `data`.
    pub record_count: u64,
    /// Optional payload for the secondary output stream.
    pub secondary_data: Option<Vec<u8>>,
}

impl SerializedBatch {
    /// Creates an empty batch.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// `true` when no serialized bytes are held.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Resets the batch for reuse; `data`'s allocation is retained.
    pub fn clear(&mut self) {
        self.data.clear();
        self.record_count = 0;
        self.secondary_data.take();
    }
}
1651
impl MemoryEstimate for SerializedBatch {
    fn estimate_heap_size(&self) -> usize {
        // Primary buffer capacity plus the optional sidecar buffer.
        self.data.capacity() + self.secondary_data.as_ref().map_or(0, Vec::capacity)
    }
}
1657
/// The output half of the pipeline: bounded inter-stage queues, per-queue
/// byte accounting, the ordered writer, and shared error/progress state.
///
/// Each queue carries `(sequence, payload)` pairs; the sequence number lets
/// the write stage restore input order via `write_reorder`.
pub struct OutputPipelineQueues<G, P: MemoryEstimate> {
    /// Grouped items awaiting processing.
    pub groups: ArrayQueue<(u64, Vec<G>)>,
    /// Estimated heap bytes currently in `groups`.
    pub groups_heap_bytes: AtomicU64,

    /// Processed items awaiting serialization.
    pub processed: ArrayQueue<(u64, Vec<P>)>,
    /// Estimated heap bytes currently in `processed` (Q5 backpressure).
    pub processed_heap_bytes: AtomicU64,

    /// Serialized batches awaiting compression.
    pub serialized: ArrayQueue<(u64, SerializedBatch)>,
    /// Estimated heap bytes currently in `serialized`.
    pub serialized_heap_bytes: AtomicU64,

    /// Compressed batches awaiting the write stage.
    pub compressed: ArrayQueue<(u64, CompressedBlockBatch)>,
    /// Estimated heap bytes currently in `compressed`.
    pub compressed_heap_bytes: AtomicU64,
    /// Reorders compressed batches back into sequence order before writing.
    pub write_reorder: Mutex<ReorderBuffer<CompressedBlockBatch>>,

    /// Primary destination; `None` after it has been taken/closed.
    pub output: Mutex<Option<Box<dyn Write + Send>>>,
    /// Optional secondary raw-BAM destination (see `set_secondary_output`).
    pub secondary_output: Option<Mutex<Option<crate::bam_io::RawBamWriter>>>,

    /// Fast-path flag set as soon as any error is recorded.
    pub error_flag: AtomicBool,
    /// First error recorded; later errors are dropped (see `set_error`).
    pub error: Mutex<Option<io::Error>>,
    /// Count of items successfully written.
    pub items_written: AtomicU64,
    /// When true, backpressure is relaxed so queues can empty out.
    pub draining: AtomicBool,
    /// Progress reporter for the write stage.
    pub progress: ProgressTracker,

    /// Optional shared pipeline statistics sink.
    pub stats: Option<Arc<PipelineStats>>,
}
1720
impl<G: Send, P: Send + MemoryEstimate> OutputPipelineQueues<G, P> {
    /// Builds the queue set with `queue_capacity` slots per stage.
    ///
    /// `output` is the primary destination; a secondary output may be
    /// attached afterwards via `set_secondary_output`. `progress_name`
    /// labels the progress tracker's log lines.
    #[must_use]
    pub fn new(
        queue_capacity: usize,
        output: Box<dyn Write + Send>,
        stats: Option<Arc<PipelineStats>>,
        progress_name: &str,
    ) -> Self {
        Self {
            groups: ArrayQueue::new(queue_capacity),
            groups_heap_bytes: AtomicU64::new(0),
            processed: ArrayQueue::new(queue_capacity),
            processed_heap_bytes: AtomicU64::new(0),
            serialized: ArrayQueue::new(queue_capacity),
            serialized_heap_bytes: AtomicU64::new(0),
            compressed: ArrayQueue::new(queue_capacity),
            compressed_heap_bytes: AtomicU64::new(0),
            write_reorder: Mutex::new(ReorderBuffer::new()),
            output: Mutex::new(Some(output)),
            secondary_output: None,
            error_flag: AtomicBool::new(false),
            error: Mutex::new(None),
            items_written: AtomicU64::new(0),
            draining: AtomicBool::new(false),
            progress: ProgressTracker::new(progress_name).with_interval(PROGRESS_LOG_INTERVAL),
            stats,
        }
    }

    /// Attaches an optional secondary raw-BAM destination.
    pub fn set_secondary_output(&mut self, writer: crate::bam_io::RawBamWriter) {
        self.secondary_output = Some(Mutex::new(Some(writer)));
    }

    /// Records an error; the flag is raised first (SeqCst) so workers notice
    /// quickly, and only the FIRST error is kept to preserve the root cause.
    pub fn set_error(&self, error: io::Error) {
        self.error_flag.store(true, Ordering::SeqCst);
        let mut guard = self.error.lock();
        if guard.is_none() {
            *guard = Some(error);
        }
    }

    /// Cheap lock-free check for "has any error been recorded?".
    #[must_use]
    pub fn has_error(&self) -> bool {
        self.error_flag.load(Ordering::Relaxed)
    }

    /// Takes the stored error, leaving `None` (the flag stays set).
    pub fn take_error(&self) -> Option<io::Error> {
        self.error.lock().take()
    }

    /// Whether the processed (Q5) queue's estimated heap usage has reached
    /// its dedicated backpressure threshold.
    #[must_use]
    pub fn is_processed_memory_high(&self) -> bool {
        self.processed_heap_bytes.load(Ordering::Acquire) >= Q5_BACKPRESSURE_THRESHOLD_BYTES
    }

    /// Whether the pipeline is in drain mode (emptying queues at shutdown).
    #[must_use]
    pub fn is_draining(&self) -> bool {
        self.draining.load(Ordering::Relaxed)
    }

    /// Switches drain mode on or off.
    pub fn set_draining(&self, value: bool) {
        self.draining.store(value, Ordering::Relaxed);
    }

    /// Whether the Process stage should stall: its output queue is full, or
    /// (outside drain mode) its memory estimate is above the Q5 threshold.
    #[must_use]
    pub fn should_apply_process_backpressure(&self) -> bool {
        self.processed.is_full() || (!self.is_draining() && self.is_processed_memory_high())
    }

    /// Snapshot of the four queue depths (approximate under concurrency).
    #[must_use]
    pub fn queue_depths(&self) -> OutputQueueDepths {
        OutputQueueDepths {
            groups: self.groups.len(),
            processed: self.processed.len(),
            serialized: self.serialized.len(),
            compressed: self.compressed.len(),
        }
    }

    /// `true` when every queue AND the write reorder buffer are empty.
    #[must_use]
    pub fn are_queues_empty(&self) -> bool {
        self.groups.is_empty()
            && self.processed.is_empty()
            && self.serialized.is_empty()
            && self.compressed.is_empty()
            && self.write_reorder.lock().is_empty()
    }
}
1834
/// Point-in-time depths of the output pipeline's four queues
/// (as returned by `queue_depths`).
#[derive(Debug, Clone, Copy)]
pub struct OutputQueueDepths {
    // Depth of the groups queue.
    pub groups: usize,
    // Depth of the processed queue.
    pub processed: usize,
    // Depth of the serialized queue.
    pub serialized: usize,
    // Depth of the compressed queue.
    pub compressed: usize,
}
1843
/// Smallest (and initial) worker idle backoff, in microseconds.
pub const MIN_BACKOFF_US: u64 = 10;
/// Cap applied as the backoff doubles, in microseconds.
pub const MAX_BACKOFF_US: u64 = 1000;
/// Initial capacity of the per-worker serialization scratch buffer (256 KiB).
pub const SERIALIZATION_BUFFER_CAPACITY: usize = 256 * 1024;
1854
/// Per-worker scratch state: compressor, scheduler, and reusable buffers.
pub struct WorkerCoreState {
    // BGZF compressor owned by this worker.
    pub compressor: InlineBgzfCompressor,
    // Step scheduler for this worker (built by `create_scheduler`).
    pub scheduler: Box<dyn Scheduler>,
    // Scratch buffer reused across serialize calls (starts at 256 KiB).
    pub serialization_buffer: Vec<u8>,
    // Scratch buffer for the secondary output; grows on demand.
    pub secondary_serialization_buffer: Vec<u8>,
    // Current idle backoff in microseconds (MIN_BACKOFF_US..=MAX_BACKOFF_US).
    pub backoff_us: u64,
    // Small pool (at most 2) of byte buffers recycled between steps.
    pub recycled_buffers: Vec<Vec<u8>>,
}
1890
1891impl WorkerCoreState {
1892 #[must_use]
1901 pub fn new(
1902 compression_level: u32,
1903 thread_id: usize,
1904 num_threads: usize,
1905 scheduler_strategy: SchedulerStrategy,
1906 active_steps: ActiveSteps,
1907 ) -> Self {
1908 Self {
1909 compressor: InlineBgzfCompressor::new(compression_level),
1910 scheduler: create_scheduler(scheduler_strategy, thread_id, num_threads, active_steps),
1911 serialization_buffer: Vec::with_capacity(SERIALIZATION_BUFFER_CAPACITY),
1912 secondary_serialization_buffer: Vec::new(),
1913 backoff_us: MIN_BACKOFF_US,
1914 recycled_buffers: Vec::with_capacity(2),
1915 }
1916 }
1917
1918 #[inline]
1920 pub fn reset_backoff(&mut self) {
1921 self.backoff_us = MIN_BACKOFF_US;
1922 }
1923
1924 #[inline]
1926 pub fn increase_backoff(&mut self) {
1927 self.backoff_us = (self.backoff_us * 2).min(MAX_BACKOFF_US);
1928 }
1929
1930 #[inline]
1935 pub fn sleep_backoff(&self) {
1936 if self.backoff_us <= MIN_BACKOFF_US {
1937 std::thread::yield_now();
1939 } else {
1940 let jitter_range = self.backoff_us / 4;
1943 let jitter_seed = u64::from(
1944 std::time::SystemTime::now()
1945 .duration_since(std::time::UNIX_EPOCH)
1946 .map(|d| d.subsec_nanos())
1947 .unwrap_or(0),
1948 );
1949 let jitter_offset = (jitter_seed % (jitter_range * 2)).saturating_sub(jitter_range);
1951 let actual_us = self.backoff_us.saturating_add(jitter_offset).max(MIN_BACKOFF_US);
1952 std::thread::sleep(std::time::Duration::from_micros(actual_us));
1953 }
1954 }
1955
1956 #[inline]
1958 pub fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8> {
1959 if let Some(mut buf) = self.recycled_buffers.pop() {
1960 buf.clear();
1961 buf.reserve(capacity);
1964 buf
1965 } else {
1966 Vec::with_capacity(capacity)
1967 }
1968 }
1969
1970 #[inline]
1972 pub fn recycle_buffer(&mut self, buf: Vec<u8>) {
1973 if self.recycled_buffers.len() < 2 {
1974 self.recycled_buffers.push(buf);
1975 }
1976 }
1977}
1978
impl HasCompressor for WorkerCoreState {
    /// Exposes the worker's inline BGZF compressor for trait-generic code.
    fn compressor_mut(&mut self) -> &mut InlineBgzfCompressor {
        &mut self.compressor
    }
}
1984
1985impl HasRecycledBuffers for WorkerCoreState {
1986 fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8> {
1987 self.take_or_alloc_buffer(capacity)
1988 }
1989
1990 fn recycle_buffer(&mut self, buf: Vec<u8>) {
1991 self.recycle_buffer(buf);
1992 }
1993}
1994
/// Details from a failed end-of-run validation (see
/// `PipelineLifecycle::validate_completion`).
#[derive(Debug, Clone)]
pub struct PipelineValidationError {
    // Names of queues that still held items at shutdown.
    pub non_empty_queues: Vec<String>,
    // Human-readable descriptions of counters that did not balance.
    pub counter_mismatches: Vec<String>,
    // Heap bytes still accounted to queues at shutdown.
    pub leaked_heap_bytes: u64,
}
2013
2014impl std::fmt::Display for PipelineValidationError {
2015 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2016 writeln!(f, "Pipeline validation failed - potential data loss detected:")?;
2017 if !self.non_empty_queues.is_empty() {
2018 writeln!(f, " Non-empty queues: {}", self.non_empty_queues.join(", "))?;
2019 }
2020 if !self.counter_mismatches.is_empty() {
2021 writeln!(f, " Counter mismatches:")?;
2022 for mismatch in &self.counter_mismatches {
2023 writeln!(f, " - {mismatch}")?;
2024 }
2025 }
2026 if self.leaked_heap_bytes > 0 {
2027 writeln!(f, " Leaked heap bytes: {}", self.leaked_heap_bytes)?;
2028 }
2029 Ok(())
2030 }
2031}
2032
// Marker impl so the validation error can be wrapped via `io::Error::other`
// (see `finalize_pipeline`).
impl std::error::Error for PipelineValidationError {}
2034
/// Lifecycle operations shared by concrete pipeline states; used by the
/// generic monitor/finalize helpers in this module.
pub trait PipelineLifecycle {
    /// True once all work has been fully written out.
    fn is_complete(&self) -> bool;

    /// True if any worker reported a fatal error.
    fn has_error(&self) -> bool;

    /// Takes the stored error, leaving `None` behind.
    fn take_error(&self) -> Option<io::Error>;

    /// Records a fatal error.
    fn set_error(&self, error: io::Error);

    /// True while drain mode is active.
    fn is_draining(&self) -> bool;

    /// Turns drain mode on or off.
    fn set_draining(&self, value: bool);

    /// Stats block, if stats collection is enabled.
    fn stats(&self) -> Option<&PipelineStats>;

    /// Progress tracker for periodic logging.
    fn progress(&self) -> &ProgressTracker;

    /// Number of items written so far.
    fn items_written(&self) -> u64;

    /// Flushes the output sink; propagates I/O errors from the writer.
    fn flush_output(&self) -> io::Result<()>;

    /// Verifies queues are empty and counters balance after shutdown;
    /// on failure returns the details of what was left behind.
    fn validate_completion(&self) -> Result<(), PipelineValidationError>;
}
2111
2112#[inline]
2117pub fn should_monitor_exit<P: PipelineLifecycle>(pipeline: &P) -> bool {
2118 pipeline.is_complete() || pipeline.has_error()
2119}
2120
/// State the monitor thread can sample for deadlock detection, on top of
/// the basic lifecycle operations.
pub trait MonitorableState: PipelineLifecycle {
    /// Deadlock-detection bookkeeping.
    fn deadlock_state(&self) -> &DeadlockState;

    /// Snapshot of current queue depths for the deadlock check.
    fn build_queue_snapshot(&self) -> QueueSnapshot;
}
2136
2137pub fn run_monitor_loop<S, F>(
2160 state: &Arc<S>,
2161 sample_interval_ms: u64,
2162 deadlock_check_samples: u32,
2163 on_sample: F,
2164) where
2165 S: MonitorableState,
2166 F: Fn(&S),
2167{
2168 let mut deadlock_counter = 0u32;
2169 loop {
2170 thread::sleep(Duration::from_millis(sample_interval_ms));
2171
2172 if should_monitor_exit(state.as_ref()) {
2173 break;
2174 }
2175
2176 on_sample(state.as_ref());
2178
2179 if state.deadlock_state().is_enabled() {
2181 deadlock_counter += 1;
2182 if deadlock_counter >= deadlock_check_samples {
2183 deadlock_counter = 0;
2184 let snapshot = state.build_queue_snapshot();
2185 check_deadlock_and_restore(state.deadlock_state(), &snapshot);
2186 }
2187 }
2188 }
2189}
2190
/// Renders a panic payload as human-readable text.
///
/// Panic payloads are typically `&str` (literal messages) or `String`
/// (formatted messages); any other payload type yields "Unknown panic".
#[must_use]
#[allow(clippy::needless_pass_by_value)]
pub fn extract_panic_message(panic_info: Box<dyn std::any::Any + Send>) -> String {
    panic_info
        .downcast_ref::<&str>()
        .map(|s| (*s).to_string())
        .or_else(|| panic_info.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "Unknown panic".to_string())
}
2211
2212pub fn handle_worker_panic<S: PipelineLifecycle>(
2217 state: &S,
2218 thread_id: usize,
2219 panic_info: Box<dyn std::any::Any + Send>,
2220) {
2221 let msg = extract_panic_message(panic_info);
2222 state.set_error(io::Error::other(format!("Worker thread {thread_id} panicked: {msg}")));
2223}
2224
/// Joins every worker thread, then reports failure if any panicked.
///
/// Bug fix: the previous version returned via `?` on the FIRST panicked
/// handle, dropping (and thereby detaching) the remaining `JoinHandle`s —
/// those threads kept running unobserved during shutdown. All handles are
/// now joined before the error is surfaced.
///
/// # Errors
/// Returns an error if at least one worker thread panicked.
pub fn join_worker_threads(handles: Vec<thread::JoinHandle<()>>) -> io::Result<()> {
    let mut any_panicked = false;
    for handle in handles {
        // join() returns Err only when the thread panicked.
        any_panicked |= handle.join().is_err();
    }
    if any_panicked {
        Err(io::Error::other("Worker thread panicked"))
    } else {
        Ok(())
    }
}
2242
/// Joins the monitor thread if one was started. A panicked monitor is
/// deliberately ignored: shutdown must not fail because of it.
pub fn join_monitor_thread(handle: Option<thread::JoinHandle<()>>) {
    if let Some(monitor) = handle {
        drop(monitor.join());
    }
}
2249
2250pub fn finalize_pipeline<S: PipelineLifecycle>(state: &S) -> io::Result<u64> {
2269 if let Some(error) = state.take_error() {
2271 return Err(error);
2272 }
2273
2274 state.validate_completion().map_err(io::Error::other)?;
2276
2277 state.flush_output()?;
2279
2280 if let Some(stats) = state.stats() {
2282 stats.log_summary();
2283 }
2284
2285 Ok(state.items_written())
2286}
2287
2288impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> ProcessPipelineState<G, P>
2293 for OutputPipelineQueues<G, P>
2294{
2295 fn process_input_pop(&self) -> Option<(u64, Vec<G>)> {
2296 self.groups.pop()
2297 }
2298
2299 fn process_output_is_full(&self) -> bool {
2300 self.processed.is_full()
2301 }
2302
2303 fn process_output_push(&self, item: (u64, Vec<P>)) -> Result<(), (u64, Vec<P>)> {
2304 let heap_size: usize = item.1.iter().map(MemoryEstimate::estimate_heap_size).sum();
2305 let result = self.processed.push(item);
2306 if result.is_ok() {
2307 self.processed_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
2308 }
2309 result
2310 }
2311
2312 fn has_error(&self) -> bool {
2313 self.error_flag.load(Ordering::Acquire)
2314 }
2315
2316 fn set_error(&self, error: io::Error) {
2317 OutputPipelineQueues::set_error(self, error);
2318 }
2319
2320 fn should_apply_process_backpressure(&self) -> bool {
2321 OutputPipelineQueues::should_apply_process_backpressure(self)
2322 }
2323
2324 fn is_draining(&self) -> bool {
2325 OutputPipelineQueues::is_draining(self)
2326 }
2327}
2328
2329impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> SerializePipelineState<P>
2330 for OutputPipelineQueues<G, P>
2331{
2332 fn serialize_input_pop(&self) -> Option<(u64, Vec<P>)> {
2333 let result = self.processed.pop();
2334 if let Some((_, ref batch)) = result {
2335 let heap_size: usize = batch.iter().map(MemoryEstimate::estimate_heap_size).sum();
2336 self.processed_heap_bytes.fetch_sub(heap_size as u64, Ordering::AcqRel);
2337 }
2338 result
2339 }
2340
2341 fn serialize_output_is_full(&self) -> bool {
2342 self.serialized.is_full()
2343 }
2344
2345 fn serialize_output_push(
2346 &self,
2347 item: (u64, SerializedBatch),
2348 ) -> Result<(), (u64, SerializedBatch)> {
2349 let heap_size = item.1.estimate_heap_size();
2350 let result = self.serialized.push(item);
2351 if result.is_ok() {
2352 self.serialized_heap_bytes.fetch_add(heap_size as u64, Ordering::AcqRel);
2353 }
2354 result
2355 }
2356
2357 fn has_error(&self) -> bool {
2358 self.error_flag.load(Ordering::Acquire)
2359 }
2360
2361 fn set_error(&self, error: io::Error) {
2362 OutputPipelineQueues::set_error(self, error);
2363 }
2364
2365 fn record_serialized_bytes(&self, bytes: u64) {
2366 if let Some(ref stats) = self.stats {
2367 stats.serialized_bytes.fetch_add(bytes, Ordering::Relaxed);
2368 }
2369 }
2370}
2371
2372impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> WritePipelineState
2373 for OutputPipelineQueues<G, P>
2374{
2375 fn write_input_queue(&self) -> &ArrayQueue<(u64, CompressedBlockBatch)> {
2376 &self.compressed
2377 }
2378
2379 fn write_reorder_buffer(&self) -> &Mutex<ReorderBuffer<CompressedBlockBatch>> {
2380 &self.write_reorder
2381 }
2382
2383 fn write_output(&self) -> &Mutex<Option<Box<dyn Write + Send>>> {
2384 &self.output
2385 }
2386
2387 fn has_error(&self) -> bool {
2388 self.error_flag.load(Ordering::Acquire)
2389 }
2390
2391 fn set_error(&self, error: io::Error) {
2392 OutputPipelineQueues::set_error(self, error);
2393 }
2394
2395 fn record_written(&self, count: u64) {
2396 self.items_written.fetch_add(count, Ordering::Release);
2397 }
2398
2399 fn stats(&self) -> Option<&PipelineStats> {
2400 self.stats.as_deref()
2401 }
2402}
2403
2404impl<G: Send + 'static, P: Send + MemoryEstimate + 'static> OutputPipelineState
2405 for OutputPipelineQueues<G, P>
2406{
2407 type Processed = P;
2408
2409 fn has_error(&self) -> bool {
2410 self.error_flag.load(Ordering::Acquire)
2411 }
2412
2413 fn set_error(&self, error: io::Error) {
2414 OutputPipelineQueues::set_error(self, error);
2415 }
2416
2417 fn q5_pop(&self) -> Option<(u64, SerializedBatch)> {
2418 self.serialized.pop()
2419 }
2420
2421 fn q5_push(&self, item: (u64, SerializedBatch)) -> Result<(), (u64, SerializedBatch)> {
2422 self.serialized.push(item)
2423 }
2424
2425 fn q5_is_full(&self) -> bool {
2426 self.serialized.is_full()
2427 }
2428
2429 fn q5_track_pop(&self, heap_size: u64) {
2430 self.serialized_heap_bytes.fetch_sub(heap_size, Ordering::AcqRel);
2431 }
2432
2433 fn q6_pop(&self) -> Option<(u64, CompressedBlockBatch)> {
2434 self.compressed.pop()
2435 }
2436
2437 fn q6_push(
2438 &self,
2439 item: (u64, CompressedBlockBatch),
2440 ) -> Result<(), (u64, CompressedBlockBatch)> {
2441 self.compressed.push(item)
2442 }
2443
2444 fn q6_is_full(&self) -> bool {
2445 self.compressed.is_full()
2446 }
2447
2448 fn q6_reorder_insert(&self, serial: u64, batch: CompressedBlockBatch) {
2449 self.write_reorder.lock().insert(serial, batch);
2450 }
2451
2452 fn q6_reorder_try_pop_next(&self) -> Option<CompressedBlockBatch> {
2453 self.write_reorder.lock().try_pop_next()
2454 }
2455
2456 fn output_try_lock(
2457 &self,
2458 ) -> Option<parking_lot::MutexGuard<'_, Option<Box<dyn Write + Send>>>> {
2459 self.output.try_lock()
2460 }
2461
2462 fn increment_written(&self) -> u64 {
2463 self.items_written.fetch_add(1, Ordering::Release)
2464 }
2465
2466 fn record_compressed_bytes_out(&self, bytes: u64) {
2467 if let Some(ref stats) = self.stats {
2468 stats.compressed_bytes_out.fetch_add(bytes, Ordering::Relaxed);
2469 }
2470 }
2471
2472 fn record_q6_pop_progress(&self) {
2473 }
2475
2476 fn record_q7_push_progress(&self) {
2477 }
2479
2480 fn stats(&self) -> Option<&PipelineStats> {
2481 self.stats.as_deref()
2482 }
2483}
2484
/// The unit type owns no heap memory, so its estimate is always zero.
impl MemoryEstimate for () {
    fn estimate_heap_size(&self) -> usize {
        0
    }
}
2491
/// Tunable knobs for building a pipeline; construct with `new` or
/// `auto_tuned` and refine with the `with_*` builders.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    // Worker thread count (clamped to at least 1 by the constructors).
    pub num_threads: usize,
    // Capacity of the inter-step queues.
    pub queue_capacity: usize,
    // Low-water mark for the input side.
    pub input_low_water: usize,
    // High-water mark for the output side.
    pub output_high_water: usize,
    // BGZF compression level.
    pub compression_level: u32,
    // Raw BGZF blocks fetched per read call.
    pub blocks_per_read_batch: usize,
    // Whether to collect `PipelineStats`.
    pub collect_stats: bool,
    // Batch size (clamped to at least 1 by `with_batch_size`).
    pub batch_size: usize,
    // Target templates per batch; defaults to 0 (`auto_tuned` uses 500).
    pub target_templates_per_batch: usize,
    // True when the caller already consumed the input header.
    pub header_already_read: bool,
    // Work-scheduling strategy for the workers.
    pub scheduler_strategy: SchedulerStrategy,
    // Queue memory limit in bytes; defaults to 0 — presumably "no limit",
    // TODO confirm against the enforcement site.
    pub queue_memory_limit: u64,
    // Seconds of inactivity before the deadlock check triggers.
    pub deadlock_timeout_secs: u64,
    // Whether deadlock recovery is attempted.
    pub deadlock_recover_enabled: bool,
    // Externally owned stats block (implies `collect_stats`).
    pub shared_stats: Option<Arc<PipelineStats>>,
}
2570
2571impl PipelineConfig {
2572 #[must_use]
2574 pub fn new(num_threads: usize, compression_level: u32) -> Self {
2575 Self {
2576 num_threads: num_threads.max(1),
2577 queue_capacity: 64,
2578 input_low_water: 8,
2579 output_high_water: 32,
2580 compression_level,
2581 blocks_per_read_batch: 16,
2582 collect_stats: false,
2583 batch_size: 1,
2584 target_templates_per_batch: 0, header_already_read: false,
2586 scheduler_strategy: SchedulerStrategy::default(),
2587 queue_memory_limit: 0, deadlock_timeout_secs: 10, deadlock_recover_enabled: false, shared_stats: None, }
2592 }
2593
2594 #[must_use]
2596 pub fn with_compression_level(mut self, level: u32) -> Self {
2597 self.compression_level = level;
2598 self
2599 }
2600
2601 #[must_use]
2603 pub fn with_blocks_per_batch(mut self, blocks: usize) -> Self {
2604 self.blocks_per_read_batch = blocks;
2605 self
2606 }
2607
2608 #[must_use]
2610 pub fn with_stats(mut self, collect: bool) -> Self {
2611 self.collect_stats = collect;
2612 self
2613 }
2614
2615 #[must_use]
2618 pub fn with_shared_stats(mut self, stats: Arc<PipelineStats>) -> Self {
2619 self.collect_stats = true; self.shared_stats = Some(stats);
2621 self
2622 }
2623
2624 #[must_use]
2626 pub fn with_batch_size(mut self, size: usize) -> Self {
2627 self.batch_size = size.max(1);
2628 self
2629 }
2630
2631 #[must_use]
2636 pub fn with_target_templates_per_batch(mut self, count: usize) -> Self {
2637 self.target_templates_per_batch = count;
2638 self
2639 }
2640
2641 #[must_use]
2646 pub fn auto_tuned(num_threads: usize, compression_level: u32) -> Self {
2647 let num_threads = num_threads.max(1);
2648
2649 let queue_capacity = (num_threads * 16).clamp(64, 256);
2651
2652 let input_low_water = num_threads.max(4);
2654
2655 let output_high_water = (num_threads * 4).max(32);
2657
2658 let blocks_per_read_batch = match num_threads {
2660 1..=3 => 16,
2661 4..=7 => 32,
2662 8..=15 => 48,
2663 _ => 64,
2664 };
2665
2666 Self {
2667 num_threads,
2668 queue_capacity,
2669 input_low_water,
2670 output_high_water,
2671 compression_level,
2672 blocks_per_read_batch,
2673 collect_stats: false,
2674 batch_size: 1,
2675 target_templates_per_batch: 500,
2678 header_already_read: false,
2679 scheduler_strategy: SchedulerStrategy::default(),
2680 queue_memory_limit: 0, deadlock_timeout_secs: 10, deadlock_recover_enabled: false, shared_stats: None, }
2685 }
2686
2687 #[must_use]
2689 pub fn with_scheduler_strategy(mut self, strategy: SchedulerStrategy) -> Self {
2690 self.scheduler_strategy = strategy;
2691 self
2692 }
2693
2694 #[must_use]
2702 pub fn with_queue_memory_limit(mut self, limit_bytes: u64) -> Self {
2703 self.queue_memory_limit = limit_bytes;
2704 self
2705 }
2706
2707 #[must_use]
2712 pub fn with_deadlock_timeout(mut self, timeout_secs: u64) -> Self {
2713 self.deadlock_timeout_secs = timeout_secs;
2714 self
2715 }
2716
2717 #[must_use]
2722 pub fn with_deadlock_recovery(mut self, enabled: bool) -> Self {
2723 self.deadlock_recover_enabled = enabled;
2724 self
2725 }
2726}
2727
/// Upper bound on worker threads tracked by the per-thread stats arrays.
pub const MAX_THREADS: usize = 32;

/// Number of pipeline steps (Read through Write) tracked per thread.
const NUM_STEPS: usize = 9;
2737
/// One periodic snapshot of queue depths, memory, and thread activity.
#[derive(Debug, Clone)]
pub struct QueueSample {
    // Sample timestamp in milliseconds — NOTE(review): presumably relative
    // to pipeline start; confirm against the sampler.
    pub time_ms: u64,
    // Depths of the eight inter-step queues.
    pub queue_sizes: [usize; 8],
    // Depths of the three reorder buffers.
    pub reorder_sizes: [usize; 3],
    // Estimated heap bytes held by each queue.
    pub queue_memory_bytes: [u64; 8],
    // Estimated heap bytes held by each reorder buffer.
    pub reorder_memory_bytes: [u64; 3],
    // Current step index per thread (255 = idle; see `per_thread_current_step`).
    pub thread_steps: Vec<u8>,
}
2755
/// Lock-free performance counters shared by all pipeline threads.
///
/// Everything except `queue_samples` is an atomic, so workers record events
/// without locks; the summary formatters read them after shutdown.
#[derive(Debug)]
pub struct PipelineStats {
    // --- Total nanoseconds spent in each pipeline step (all threads). ---
    pub step_read_ns: AtomicU64,
    pub step_decompress_ns: AtomicU64,
    pub step_find_boundaries_ns: AtomicU64,
    pub step_decode_ns: AtomicU64,
    pub step_group_ns: AtomicU64,
    pub step_process_ns: AtomicU64,
    pub step_serialize_ns: AtomicU64,
    pub step_compress_ns: AtomicU64,
    pub step_write_ns: AtomicU64,

    // --- Operation counts per step (pairs with the `_ns` totals above). ---
    pub step_read_count: AtomicU64,
    pub step_decompress_count: AtomicU64,
    pub step_find_boundaries_count: AtomicU64,
    pub step_decode_count: AtomicU64,
    pub step_group_count: AtomicU64,
    pub step_process_count: AtomicU64,
    pub step_serialize_count: AtomicU64,
    pub step_compress_count: AtomicU64,
    pub step_write_count: AtomicU64,

    // --- Failed try-lock attempts per contended resource. ---
    pub read_contention: AtomicU64,
    pub boundary_contention: AtomicU64,
    pub group_contention: AtomicU64,
    pub write_contention: AtomicU64,

    // --- Empty-poll counts per inter-step queue (Q2b keyed as 25). ---
    pub q1_empty: AtomicU64,
    pub q2_empty: AtomicU64,
    pub q2b_empty: AtomicU64,
    pub q3_empty: AtomicU64,
    pub q4_empty: AtomicU64,
    pub q5_empty: AtomicU64,
    pub q6_empty: AtomicU64,
    pub q7_empty: AtomicU64,

    // Total idle yields across all threads.
    pub idle_yields: AtomicU64,

    // Completed steps per thread, indexed [thread][step].
    pub per_thread_step_counts: Box<[[AtomicU64; NUM_STEPS]; MAX_THREADS]>,

    // Attempted steps per thread, indexed [thread][step].
    pub per_thread_step_attempts: Box<[[AtomicU64; NUM_STEPS]; MAX_THREADS]>,

    // Idle nanoseconds accumulated per thread.
    pub per_thread_idle_ns: Box<[AtomicU64; MAX_THREADS]>,

    // Current step index per thread; 255 is the idle sentinel.
    pub per_thread_current_step: Box<[AtomicU8; MAX_THREADS]>,

    // --- Wait-time totals for the lock-guarded steps. ---
    pub boundary_wait_ns: AtomicU64,
    pub group_wait_ns: AtomicU64,
    pub write_wait_ns: AtomicU64,

    // --- Batch-size distribution (sum / count / min / max). ---
    pub batch_size_sum: AtomicU64,
    pub batch_count: AtomicU64,
    // Initialized to u64::MAX so the first sample becomes the minimum.
    pub batch_size_min: AtomicU64,
    pub batch_size_max: AtomicU64,

    // Worker thread count, recorded via `set_num_threads`.
    pub num_threads: AtomicU64,

    // --- Byte and record throughput totals. ---
    pub bytes_read: AtomicU64,
    pub bytes_written: AtomicU64,
    pub compressed_bytes_in: AtomicU64,
    pub decompressed_bytes: AtomicU64,
    pub serialized_bytes: AtomicU64,
    pub compressed_bytes_out: AtomicU64,
    pub records_decoded: AtomicU64,
    pub groups_produced: AtomicU64,

    // Periodic queue-depth/memory trace (the only mutex-protected member).
    pub queue_samples: Mutex<Vec<QueueSample>>,

    // --- Memory-pressure events. ---
    pub memory_drain_activations: AtomicU64,
    pub group_memory_rejects: AtomicU64,
    pub peak_memory_bytes: AtomicU64,

    // Fine-grained memory accounting, only with the `memory-debug` feature.
    #[cfg(feature = "memory-debug")]
    pub memory: MemoryDebugStats,
}
2918
/// Heap-allocates an array of `N` zeroed `AtomicU64`s without ever placing
/// the array on the stack (built through a `Vec`).
#[allow(clippy::unnecessary_box_returns)]
fn new_atomic_array<const N: usize>() -> Box<[AtomicU64; N]> {
    let zeros: Vec<AtomicU64> = std::iter::repeat_with(AtomicU64::default).take(N).collect();
    zeros.into_boxed_slice().try_into().unwrap()
}
2927
/// Heap-allocates an `R x C` grid of zeroed `AtomicU64`s without placing
/// the full grid on the stack (rows built through a `Vec`).
#[allow(clippy::unnecessary_box_returns)]
fn new_atomic_2d_array<const R: usize, const C: usize>() -> Box<[[AtomicU64; C]; R]> {
    let rows: Vec<[AtomicU64; C]> =
        std::iter::repeat_with(|| std::array::from_fn(|_| AtomicU64::new(0))).take(R).collect();
    rows.into_boxed_slice().try_into().unwrap()
}
2936
/// Heap-allocates an array of `N` `AtomicU8`s, each starting at `init`
/// (built through a `Vec` to avoid a stack copy).
#[allow(clippy::unnecessary_box_returns)]
fn new_atomic_u8_array<const N: usize>(init: u8) -> Box<[AtomicU8; N]> {
    let cells: Vec<AtomicU8> = std::iter::repeat_with(|| AtomicU8::new(init)).take(N).collect();
    cells.into_boxed_slice().try_into().unwrap()
}
2943
2944impl Default for PipelineStats {
2945 fn default() -> Self {
2946 Self::new()
2947 }
2948}
2949
2950impl PipelineStats {
    /// Creates a fully zeroed stats block.
    ///
    /// Exceptions to zero: `batch_size_min` starts at `u64::MAX` so the first
    /// recorded batch becomes the minimum, and `per_thread_current_step`
    /// starts at 255, the idle sentinel.
    #[must_use]
    pub fn new() -> Self {
        Self {
            step_read_ns: AtomicU64::new(0),
            step_decompress_ns: AtomicU64::new(0),
            step_find_boundaries_ns: AtomicU64::new(0),
            step_decode_ns: AtomicU64::new(0),
            step_group_ns: AtomicU64::new(0),
            step_process_ns: AtomicU64::new(0),
            step_serialize_ns: AtomicU64::new(0),
            step_compress_ns: AtomicU64::new(0),
            step_write_ns: AtomicU64::new(0),
            step_read_count: AtomicU64::new(0),
            step_decompress_count: AtomicU64::new(0),
            step_find_boundaries_count: AtomicU64::new(0),
            step_decode_count: AtomicU64::new(0),
            step_group_count: AtomicU64::new(0),
            step_process_count: AtomicU64::new(0),
            step_serialize_count: AtomicU64::new(0),
            step_compress_count: AtomicU64::new(0),
            step_write_count: AtomicU64::new(0),
            read_contention: AtomicU64::new(0),
            boundary_contention: AtomicU64::new(0),
            group_contention: AtomicU64::new(0),
            write_contention: AtomicU64::new(0),
            q1_empty: AtomicU64::new(0),
            q2_empty: AtomicU64::new(0),
            q2b_empty: AtomicU64::new(0),
            q3_empty: AtomicU64::new(0),
            q4_empty: AtomicU64::new(0),
            q5_empty: AtomicU64::new(0),
            q6_empty: AtomicU64::new(0),
            q7_empty: AtomicU64::new(0),
            idle_yields: AtomicU64::new(0),
            per_thread_step_counts: new_atomic_2d_array::<MAX_THREADS, NUM_STEPS>(),
            per_thread_step_attempts: new_atomic_2d_array::<MAX_THREADS, NUM_STEPS>(),
            per_thread_idle_ns: new_atomic_array::<MAX_THREADS>(),
            // 255 = "no step running" sentinel.
            per_thread_current_step: new_atomic_u8_array::<MAX_THREADS>(255),
            boundary_wait_ns: AtomicU64::new(0),
            group_wait_ns: AtomicU64::new(0),
            write_wait_ns: AtomicU64::new(0),
            batch_size_sum: AtomicU64::new(0),
            batch_count: AtomicU64::new(0),
            // u64::MAX so the first recorded batch always becomes the minimum.
            batch_size_min: AtomicU64::new(u64::MAX),
            batch_size_max: AtomicU64::new(0),
            num_threads: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
            compressed_bytes_in: AtomicU64::new(0),
            decompressed_bytes: AtomicU64::new(0),
            serialized_bytes: AtomicU64::new(0),
            compressed_bytes_out: AtomicU64::new(0),
            records_decoded: AtomicU64::new(0),
            groups_produced: AtomicU64::new(0),
            queue_samples: Mutex::new(Vec::new()),
            memory_drain_activations: AtomicU64::new(0),
            group_memory_rejects: AtomicU64::new(0),
            peak_memory_bytes: AtomicU64::new(0),

            #[cfg(feature = "memory-debug")]
            memory: MemoryDebugStats::new(),
        }
    }
3019
3020 pub fn set_num_threads(&self, n: usize) {
3022 self.num_threads.store(n as u64, Ordering::Relaxed);
3023 }
3024
3025 #[inline]
3027 pub fn record_step(&self, step: PipelineStep, elapsed_ns: u64) {
3028 self.record_step_for_thread(step, elapsed_ns, None);
3029 }
3030
3031 #[inline]
3033 pub fn record_step_for_thread(
3034 &self,
3035 step: PipelineStep,
3036 elapsed_ns: u64,
3037 thread_id: Option<usize>,
3038 ) {
3039 match step {
3041 PipelineStep::Read => {
3042 self.step_read_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3043 self.step_read_count.fetch_add(1, Ordering::Relaxed);
3044 }
3045 PipelineStep::Decompress => {
3046 self.step_decompress_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3047 self.step_decompress_count.fetch_add(1, Ordering::Relaxed);
3048 }
3049 PipelineStep::FindBoundaries => {
3050 self.step_find_boundaries_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3051 self.step_find_boundaries_count.fetch_add(1, Ordering::Relaxed);
3052 }
3053 PipelineStep::Decode => {
3054 self.step_decode_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3055 self.step_decode_count.fetch_add(1, Ordering::Relaxed);
3056 }
3057 PipelineStep::Group => {
3058 self.step_group_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3059 self.step_group_count.fetch_add(1, Ordering::Relaxed);
3060 }
3061 PipelineStep::Process => {
3062 self.step_process_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3063 self.step_process_count.fetch_add(1, Ordering::Relaxed);
3064 }
3065 PipelineStep::Serialize => {
3066 self.step_serialize_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3067 self.step_serialize_count.fetch_add(1, Ordering::Relaxed);
3068 }
3069 PipelineStep::Compress => {
3070 self.step_compress_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3071 self.step_compress_count.fetch_add(1, Ordering::Relaxed);
3072 }
3073 PipelineStep::Write => {
3074 self.step_write_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
3075 self.step_write_count.fetch_add(1, Ordering::Relaxed);
3076 }
3077 }
3078
3079 if let Some(tid) = thread_id {
3081 if tid < MAX_THREADS {
3082 let step_idx = step as usize;
3083 self.per_thread_step_counts[tid][step_idx].fetch_add(1, Ordering::Relaxed);
3084 }
3085 }
3086 }
3087
3088 #[inline]
3090 pub fn record_contention(&self, step: PipelineStep) {
3091 match step {
3092 PipelineStep::Read => {
3093 self.read_contention.fetch_add(1, Ordering::Relaxed);
3094 }
3095 PipelineStep::FindBoundaries => {
3096 self.boundary_contention.fetch_add(1, Ordering::Relaxed);
3097 }
3098 PipelineStep::Group => {
3099 self.group_contention.fetch_add(1, Ordering::Relaxed);
3100 }
3101 PipelineStep::Write => {
3102 self.write_contention.fetch_add(1, Ordering::Relaxed);
3103 }
3104 _ => {}
3105 }
3106 }
3107
3108 #[inline]
3110 pub fn record_wait_time(&self, step: PipelineStep, wait_ns: u64) {
3111 match step {
3112 PipelineStep::FindBoundaries => {
3113 self.boundary_wait_ns.fetch_add(wait_ns, Ordering::Relaxed);
3114 }
3115 PipelineStep::Group => {
3116 self.group_wait_ns.fetch_add(wait_ns, Ordering::Relaxed);
3117 }
3118 PipelineStep::Write => {
3119 self.write_wait_ns.fetch_add(wait_ns, Ordering::Relaxed);
3120 }
3121 _ => {}
3122 }
3123 }
3124
3125 #[inline]
3130 pub fn record_queue_empty(&self, queue_num: usize) {
3131 match queue_num {
3132 1 => self.q1_empty.fetch_add(1, Ordering::Relaxed),
3133 2 => self.q2_empty.fetch_add(1, Ordering::Relaxed),
3134 25 => self.q2b_empty.fetch_add(1, Ordering::Relaxed), 3 => self.q3_empty.fetch_add(1, Ordering::Relaxed),
3136 4 => self.q4_empty.fetch_add(1, Ordering::Relaxed),
3137 5 => self.q5_empty.fetch_add(1, Ordering::Relaxed),
3138 6 => self.q6_empty.fetch_add(1, Ordering::Relaxed),
3139 7 => self.q7_empty.fetch_add(1, Ordering::Relaxed),
3140 _ => 0,
3141 };
3142 }
3143
3144 #[inline]
3146 pub fn record_idle(&self) {
3147 self.idle_yields.fetch_add(1, Ordering::Relaxed);
3148 }
3149
3150 #[inline]
3152 pub fn record_idle_for_thread(&self, thread_id: usize, idle_ns: u64) {
3153 self.idle_yields.fetch_add(1, Ordering::Relaxed);
3154 if thread_id < MAX_THREADS {
3155 self.per_thread_idle_ns[thread_id].fetch_add(idle_ns, Ordering::Relaxed);
3156 }
3157 }
3158
3159 #[inline]
3162 #[allow(clippy::cast_possible_truncation)]
3163 pub fn record_step_attempt(&self, thread_id: usize, step: PipelineStep) {
3164 if thread_id < MAX_THREADS {
3165 let step_idx = step as usize;
3166 self.per_thread_step_attempts[thread_id][step_idx].fetch_add(1, Ordering::Relaxed);
3167 self.per_thread_current_step[thread_id].store(step_idx as u8, Ordering::Relaxed);
3169 }
3170 }
3171
3172 #[inline]
3174 pub fn set_current_step(&self, thread_id: usize, step: PipelineStep) {
3175 if thread_id < MAX_THREADS {
3176 self.per_thread_current_step[thread_id].store(step as u8, Ordering::Relaxed);
3177 }
3178 }
3179
3180 #[inline]
3182 pub fn clear_current_step(&self, thread_id: usize) {
3183 if thread_id < MAX_THREADS {
3184 self.per_thread_current_step[thread_id].store(255, Ordering::Relaxed);
3185 }
3186 }
3187
3188 #[allow(clippy::cast_possible_truncation)]
3190 pub fn get_thread_activity(&self, num_threads: usize) -> Vec<Option<PipelineStep>> {
3191 (0..num_threads.min(MAX_THREADS))
3192 .map(|tid| {
3193 let step_idx = self.per_thread_current_step[tid].load(Ordering::Relaxed);
3194 if step_idx < NUM_STEPS as u8 {
3195 Some(PipelineStep::from_index(step_idx as usize))
3196 } else {
3197 None }
3199 })
3200 .collect()
3201 }
3202
3203 #[inline]
3205 pub fn record_batch_size(&self, size: usize) {
3206 let size = size as u64;
3207 self.batch_size_sum.fetch_add(size, Ordering::Relaxed);
3208 self.batch_count.fetch_add(1, Ordering::Relaxed);
3209
3210 let mut current_min = self.batch_size_min.load(Ordering::Relaxed);
3212 while size < current_min {
3213 match self.batch_size_min.compare_exchange_weak(
3214 current_min,
3215 size,
3216 Ordering::Relaxed,
3217 Ordering::Relaxed,
3218 ) {
3219 Ok(_) => break,
3220 Err(actual) => current_min = actual,
3221 }
3222 }
3223
3224 let mut current_max = self.batch_size_max.load(Ordering::Relaxed);
3226 while size > current_max {
3227 match self.batch_size_max.compare_exchange_weak(
3228 current_max,
3229 size,
3230 Ordering::Relaxed,
3231 Ordering::Relaxed,
3232 ) {
3233 Ok(_) => break,
3234 Err(actual) => current_max = actual,
3235 }
3236 }
3237 }
3238
3239 pub fn add_queue_sample(&self, sample: QueueSample) {
3242 self.queue_samples.lock().push(sample);
3243 }
3244
3245 pub fn get_queue_samples(&self) -> Vec<QueueSample> {
3247 self.queue_samples.lock().clone()
3248 }
3249
3250 #[inline]
3252 pub fn record_memory_drain_activation(&self) {
3253 self.memory_drain_activations.fetch_add(1, Ordering::Relaxed);
3254 }
3255
3256 #[inline]
3258 pub fn record_group_memory_reject(&self) {
3259 self.group_memory_rejects.fetch_add(1, Ordering::Relaxed);
3260 }
3261
3262 #[inline]
3264 pub fn record_memory_usage(&self, bytes: u64) {
3265 let mut current_peak = self.peak_memory_bytes.load(Ordering::Relaxed);
3266 while bytes > current_peak {
3267 match self.peak_memory_bytes.compare_exchange_weak(
3268 current_peak,
3269 bytes,
3270 Ordering::Relaxed,
3271 Ordering::Relaxed,
3272 ) {
3273 Ok(_) => break,
3274 Err(actual) => current_peak = actual,
3275 }
3276 }
3277 }
3278
3279 #[allow(clippy::similar_names)] #[allow(
3282 clippy::too_many_lines,
3283 clippy::cast_precision_loss,
3284 clippy::cast_possible_truncation,
3285 clippy::cast_sign_loss
3286 )]
    /// Builds a multi-section, human-readable report of all collected
    /// pipeline statistics: per-step timings, lock contention, queue-empty
    /// polls, per-thread work distribution, throughput, the sampled
    /// queue-size timeline, and memory-limiting events.
    ///
    /// All counters are read with `Ordering::Relaxed`, so the report is a
    /// best-effort snapshot, not a consistent cut across threads.
    pub fn format_summary(&self) -> String {
        use std::fmt::Write;

        let mut s = String::new();
        writeln!(s, "Pipeline Statistics:").unwrap();
        writeln!(s).unwrap();

        // Renders one step line: op count plus total/average time,
        // or a dash when the step never ran (avoids 0/0).
        #[allow(clippy::uninlined_format_args)]
        let format_step = |name: &str, ns: u64, count: u64| -> String {
            if count == 0 {
                format!(" {:<20} {:>10} ops, {:>12}", name, 0, "-")
            } else {
                let total_ms = ns as f64 / 1_000_000.0;
                let avg_us = (ns as f64 / count as f64) / 1_000.0;
                format!(
                    " {:<20} {:>10} ops, {:>10.1}ms total, {:>8.1}µs avg",
                    name, count, total_ms, avg_us
                )
            }
        };

        // Section: cumulative time and op count for each of the nine steps.
        writeln!(s, "Step Timing:").unwrap();
        writeln!(
            s,
            "{}",
            format_step(
                "Read",
                self.step_read_ns.load(Ordering::Relaxed),
                self.step_read_count.load(Ordering::Relaxed)
            )
        )
        .unwrap();
        writeln!(
            s,
            "{}",
            format_step(
                "Decompress",
                self.step_decompress_ns.load(Ordering::Relaxed),
                self.step_decompress_count.load(Ordering::Relaxed)
            )
        )
        .unwrap();
        writeln!(
            s,
            "{}",
            format_step(
                "FindBoundaries",
                self.step_find_boundaries_ns.load(Ordering::Relaxed),
                self.step_find_boundaries_count.load(Ordering::Relaxed)
            )
        )
        .unwrap();
        writeln!(
            s,
            "{}",
            format_step(
                "Decode",
                self.step_decode_ns.load(Ordering::Relaxed),
                self.step_decode_count.load(Ordering::Relaxed)
            )
        )
        .unwrap();
        writeln!(
            s,
            "{}",
            format_step(
                "Group",
                self.step_group_ns.load(Ordering::Relaxed),
                self.step_group_count.load(Ordering::Relaxed)
            )
        )
        .unwrap();
        writeln!(
            s,
            "{}",
            format_step(
                "Process",
                self.step_process_ns.load(Ordering::Relaxed),
                self.step_process_count.load(Ordering::Relaxed)
            )
        )
        .unwrap();
        writeln!(
            s,
            "{}",
            format_step(
                "Serialize",
                self.step_serialize_ns.load(Ordering::Relaxed),
                self.step_serialize_count.load(Ordering::Relaxed)
            )
        )
        .unwrap();
        writeln!(
            s,
            "{}",
            format_step(
                "Compress",
                self.step_compress_ns.load(Ordering::Relaxed),
                self.step_compress_count.load(Ordering::Relaxed)
            )
        )
        .unwrap();
        writeln!(
            s,
            "{}",
            format_step(
                "Write",
                self.step_write_ns.load(Ordering::Relaxed),
                self.step_write_count.load(Ordering::Relaxed)
            )
        )
        .unwrap();

        // Section: failed try-lock attempts on each shared lock.
        writeln!(s).unwrap();
        writeln!(s, "Contention:").unwrap();
        writeln!(
            s,
            " Read lock: {:>10} failed attempts",
            self.read_contention.load(Ordering::Relaxed)
        )
        .unwrap();
        writeln!(
            s,
            " Boundary lock: {:>10} failed attempts",
            self.boundary_contention.load(Ordering::Relaxed)
        )
        .unwrap();
        writeln!(
            s,
            " Group lock: {:>10} failed attempts",
            self.group_contention.load(Ordering::Relaxed)
        )
        .unwrap();
        writeln!(
            s,
            " Write lock: {:>10} failed attempts",
            self.write_contention.load(Ordering::Relaxed)
        )
        .unwrap();

        // Section: how often each inter-stage queue was polled while empty.
        writeln!(s).unwrap();
        writeln!(s, "Queue Empty Polls:").unwrap();
        writeln!(s, " Q1 (raw): {:>10}", self.q1_empty.load(Ordering::Relaxed)).unwrap();
        writeln!(s, " Q2 (decomp): {:>10}", self.q2_empty.load(Ordering::Relaxed)).unwrap();
        writeln!(s, " Q2b (boundary): {:>10}", self.q2b_empty.load(Ordering::Relaxed)).unwrap();
        writeln!(s, " Q3 (decoded): {:>10}", self.q3_empty.load(Ordering::Relaxed)).unwrap();
        writeln!(s, " Q4 (groups): {:>10}", self.q4_empty.load(Ordering::Relaxed)).unwrap();
        writeln!(s, " Q5 (processed): {:>10}", self.q5_empty.load(Ordering::Relaxed)).unwrap();
        writeln!(s, " Q6 (serialized): {:>10}", self.q6_empty.load(Ordering::Relaxed)).unwrap();
        writeln!(s, " Q7 (compressed): {:>10}", self.q7_empty.load(Ordering::Relaxed)).unwrap();

        writeln!(s).unwrap();
        writeln!(s, "Idle Yields: {:>10}", self.idle_yields.load(Ordering::Relaxed)).unwrap();

        // Section (only if any lock was ever waited on): blocking wait time.
        let boundary_wait = self.boundary_wait_ns.load(Ordering::Relaxed);
        let group_wait = self.group_wait_ns.load(Ordering::Relaxed);
        let write_wait = self.write_wait_ns.load(Ordering::Relaxed);
        if boundary_wait > 0 || group_wait > 0 || write_wait > 0 {
            writeln!(s).unwrap();
            writeln!(s, "Lock Wait Time:").unwrap();
            writeln!(s, " Boundary lock: {:>10.1}ms", boundary_wait as f64 / 1_000_000.0).unwrap();
            writeln!(s, " Group lock: {:>10.1}ms", group_wait as f64 / 1_000_000.0).unwrap();
            writeln!(s, " Write lock: {:>10.1}ms", write_wait as f64 / 1_000_000.0).unwrap();
        }

        // Section (only if batches were recorded): batch-size distribution.
        let batch_count = self.batch_count.load(Ordering::Relaxed);
        if batch_count > 0 {
            let batch_sum = self.batch_size_sum.load(Ordering::Relaxed);
            let batch_min = self.batch_size_min.load(Ordering::Relaxed);
            let batch_max = self.batch_size_max.load(Ordering::Relaxed);
            let batch_avg = batch_sum as f64 / batch_count as f64;

            writeln!(s).unwrap();
            writeln!(s, "Batch Size (records per batch):").unwrap();
            writeln!(s, " Count: {batch_count:>10}").unwrap();
            writeln!(s, " Min: {batch_min:>10}").unwrap();
            writeln!(s, " Max: {batch_max:>10}").unwrap();
            writeln!(s, " Average: {batch_avg:>10.1}").unwrap();
        }

        // Section: per-thread step counts, idle time, and attempt success
        // rates. All per-thread arrays are capped at MAX_THREADS slots.
        let num_threads = self.num_threads.load(Ordering::Relaxed) as usize;
        if num_threads > 0 {
            writeln!(s).unwrap();
            writeln!(s, "Per-Thread Work Distribution:").unwrap();

            // Two-letter abbreviations in pipeline order:
            // Read, Decompress, FindBoundaries, Decode, Group, Process,
            // Serialize, Compress, Write.
            let step_names = ["Rd", "Dc", "Fb", "De", "Gr", "Pr", "Se", "Co", "Wr"];

            write!(s, " Thread ").unwrap();
            for name in &step_names {
                write!(s, " {name:>6}").unwrap();
            }
            writeln!(s, " Idle ms").unwrap();

            for tid in 0..num_threads.min(MAX_THREADS) {
                write!(s, " T{tid:<5} ").unwrap();
                for step_idx in 0..NUM_STEPS {
                    let count = self.per_thread_step_counts[tid][step_idx].load(Ordering::Relaxed);
                    write!(s, " {count:>6}").unwrap();
                }
                let idle_ns = self.per_thread_idle_ns[tid].load(Ordering::Relaxed);
                writeln!(s, " {:>10.1}", idle_ns as f64 / 1_000_000.0).unwrap();
            }

            // Totals row: sum each step column across all threads.
            write!(s, " Total ").unwrap();
            for step_idx in 0..NUM_STEPS {
                let mut total = 0u64;
                for tid in 0..num_threads.min(MAX_THREADS) {
                    total += self.per_thread_step_counts[tid][step_idx].load(Ordering::Relaxed);
                }
                write!(s, " {total:>6}").unwrap();
            }
            let total_idle: u64 = (0..num_threads.min(MAX_THREADS))
                .map(|tid| self.per_thread_idle_ns[tid].load(Ordering::Relaxed))
                .sum();
            writeln!(s, " {:>10.1}", total_idle as f64 / 1_000_000.0).unwrap();

            // Success rate = successful executions / attempts, per step
            // and per thread; dash where a step was never attempted.
            writeln!(s).unwrap();
            writeln!(s, "Per-Thread Attempt Success Rate:").unwrap();

            write!(s, " Thread ").unwrap();
            for name in &step_names {
                write!(s, " {name:>6}").unwrap();
            }
            writeln!(s, " Total%").unwrap();

            for tid in 0..num_threads.min(MAX_THREADS) {
                write!(s, " T{tid:<5} ").unwrap();
                let mut thread_attempts = 0u64;
                let mut thread_successes = 0u64;
                for step_idx in 0..NUM_STEPS {
                    let attempts =
                        self.per_thread_step_attempts[tid][step_idx].load(Ordering::Relaxed);
                    let successes =
                        self.per_thread_step_counts[tid][step_idx].load(Ordering::Relaxed);
                    thread_attempts += attempts;
                    thread_successes += successes;
                    if attempts == 0 {
                        write!(s, " - ").unwrap();
                    } else {
                        let rate = (successes as f64 / attempts as f64) * 100.0;
                        write!(s, " {rate:>5.0}%").unwrap();
                    }
                }
                if thread_attempts == 0 {
                    writeln!(s, " -").unwrap();
                } else {
                    let total_rate = (thread_successes as f64 / thread_attempts as f64) * 100.0;
                    writeln!(s, " {total_rate:>5.1}%").unwrap();
                }
            }
        }

        // Aggregate work time across all steps for the utilization section.
        let total_work_ns = self.step_read_ns.load(Ordering::Relaxed)
            + self.step_decompress_ns.load(Ordering::Relaxed)
            + self.step_find_boundaries_ns.load(Ordering::Relaxed)
            + self.step_decode_ns.load(Ordering::Relaxed)
            + self.step_group_ns.load(Ordering::Relaxed)
            + self.step_process_ns.load(Ordering::Relaxed)
            + self.step_serialize_ns.load(Ordering::Relaxed)
            + self.step_compress_ns.load(Ordering::Relaxed)
            + self.step_write_ns.load(Ordering::Relaxed);

        let total_idle_ns: u64 = (0..num_threads.min(MAX_THREADS))
            .map(|tid| self.per_thread_idle_ns[tid].load(Ordering::Relaxed))
            .sum();

        let total_contention = self.read_contention.load(Ordering::Relaxed)
            + self.boundary_contention.load(Ordering::Relaxed)
            + self.group_contention.load(Ordering::Relaxed)
            + self.write_contention.load(Ordering::Relaxed);

        // Section: utilization = work / (work + idle) across all threads.
        if total_work_ns > 0 {
            writeln!(s).unwrap();
            writeln!(s, "Thread Utilization:").unwrap();

            let work_ms = total_work_ns as f64 / 1_000_000.0;
            let idle_ms = total_idle_ns as f64 / 1_000_000.0;
            let total_thread_ms = work_ms + idle_ms;

            if total_thread_ms > 0.0 {
                let utilization = (work_ms / total_thread_ms) * 100.0;
                writeln!(s, " Work time: {work_ms:>10.1}ms").unwrap();
                writeln!(s, " Idle time: {idle_ms:>10.1}ms").unwrap();
                writeln!(s, " Utilization: {utilization:>10.1}%").unwrap();
                writeln!(s, " Contention attempts: {total_contention:>7}").unwrap();
            }
        }

        // Section: per-step throughput derived from byte/record counters
        // divided by that step's cumulative time.
        let bytes_read = self.bytes_read.load(Ordering::Relaxed);
        let bytes_written = self.bytes_written.load(Ordering::Relaxed);
        let compressed_bytes_in = self.compressed_bytes_in.load(Ordering::Relaxed);
        let decompressed_bytes = self.decompressed_bytes.load(Ordering::Relaxed);
        let serialized_bytes = self.serialized_bytes.load(Ordering::Relaxed);
        let compressed_bytes_out = self.compressed_bytes_out.load(Ordering::Relaxed);
        let records_decoded = self.records_decoded.load(Ordering::Relaxed);
        let groups_produced = self.groups_produced.load(Ordering::Relaxed);

        if bytes_read > 0 || bytes_written > 0 {
            writeln!(s).unwrap();
            writeln!(s, "Throughput:").unwrap();

            // Human-readable byte count (decimal units: GB/MB/KB/B).
            let format_bytes = |bytes: u64| -> String {
                if bytes >= 1_000_000_000 {
                    format!("{:.2} GB", bytes as f64 / 1_000_000_000.0)
                } else if bytes >= 1_000_000 {
                    format!("{:.1} MB", bytes as f64 / 1_000_000.0)
                } else if bytes >= 1_000 {
                    format!("{:.1} KB", bytes as f64 / 1_000.0)
                } else {
                    format!("{bytes} B")
                }
            };

            // Human-readable count (M/K suffixes).
            let format_count = |count: u64| -> String {
                if count >= 1_000_000 {
                    format!("{:.2}M", count as f64 / 1_000_000.0)
                } else if count >= 1_000 {
                    format!("{:.1}K", count as f64 / 1_000.0)
                } else {
                    format!("{count}")
                }
            };

            let read_ns = self.step_read_ns.load(Ordering::Relaxed);
            if bytes_read > 0 && read_ns > 0 {
                let read_ms = read_ns as f64 / 1_000_000.0;
                let read_mb_s = (bytes_read as f64 / 1_000_000.0) / (read_ms / 1000.0);
                writeln!(
                    s,
                    " Read: {:>10} in {:>8.1}ms = {:>8.1} MB/s",
                    format_bytes(bytes_read),
                    read_ms,
                    read_mb_s
                )
                .unwrap();
            }

            let decompress_ns = self.step_decompress_ns.load(Ordering::Relaxed);
            if compressed_bytes_in > 0 && decompressed_bytes > 0 && decompress_ns > 0 {
                let decompress_ms = decompress_ns as f64 / 1_000_000.0;
                let in_mb_s = (compressed_bytes_in as f64 / 1_000_000.0) / (decompress_ms / 1000.0);
                let out_mb_s = (decompressed_bytes as f64 / 1_000_000.0) / (decompress_ms / 1000.0);
                let expansion = decompressed_bytes as f64 / compressed_bytes_in as f64;
                writeln!(
                    s,
                    " Decompress: {:>10} → {:>10} = {:>6.1} → {:>6.1} MB/s ({:.2}x expansion)",
                    format_bytes(compressed_bytes_in),
                    format_bytes(decompressed_bytes),
                    in_mb_s,
                    out_mb_s,
                    expansion
                )
                .unwrap();
            }

            let decode_ns = self.step_decode_ns.load(Ordering::Relaxed);
            if records_decoded > 0 && decode_ns > 0 {
                let decode_ms = decode_ns as f64 / 1_000_000.0;
                let records_per_s = records_decoded as f64 / (decode_ms / 1000.0);
                writeln!(
                    s,
                    " Decode: {:>10} records = {:>8} records/s",
                    format_count(records_decoded),
                    format_count(records_per_s as u64)
                )
                .unwrap();
            }

            let group_ns = self.step_group_ns.load(Ordering::Relaxed);
            if records_decoded > 0 && groups_produced > 0 && group_ns > 0 {
                let group_ms = group_ns as f64 / 1_000_000.0;
                let records_in_per_s = records_decoded as f64 / (group_ms / 1000.0);
                let groups_out_per_s = groups_produced as f64 / (group_ms / 1000.0);
                writeln!(
                    s,
                    " Group: {:>10} → {:>10} = {:>6} records/s in, {:>6} groups/s out",
                    format_count(records_decoded),
                    format_count(groups_produced),
                    format_count(records_in_per_s as u64),
                    format_count(groups_out_per_s as u64)
                )
                .unwrap();
            }

            let process_ns = self.step_process_ns.load(Ordering::Relaxed);
            if groups_produced > 0 && process_ns > 0 {
                let process_ms = process_ns as f64 / 1_000_000.0;
                let groups_per_s = groups_produced as f64 / (process_ms / 1000.0);
                writeln!(
                    s,
                    " Process: {:>10} groups = {:>8} groups/s",
                    format_count(groups_produced),
                    format_count(groups_per_s as u64)
                )
                .unwrap();
            }

            let serialize_ns = self.step_serialize_ns.load(Ordering::Relaxed);
            if serialized_bytes > 0 && serialize_ns > 0 {
                let serialize_ms = serialize_ns as f64 / 1_000_000.0;
                let mb_per_s = (serialized_bytes as f64 / 1_000_000.0) / (serialize_ms / 1000.0);
                writeln!(
                    s,
                    " Serialize: {:>10} = {:>8.1} MB/s",
                    format_bytes(serialized_bytes),
                    mb_per_s
                )
                .unwrap();
            }

            let compress_ns = self.step_compress_ns.load(Ordering::Relaxed);
            if serialized_bytes > 0 && compressed_bytes_out > 0 && compress_ns > 0 {
                let compress_ms = compress_ns as f64 / 1_000_000.0;
                let in_mb_s = (serialized_bytes as f64 / 1_000_000.0) / (compress_ms / 1000.0);
                let out_mb_s = (compressed_bytes_out as f64 / 1_000_000.0) / (compress_ms / 1000.0);
                let compression = serialized_bytes as f64 / compressed_bytes_out as f64;
                writeln!(
                    s,
                    " Compress: {:>10} → {:>10} = {:>6.1} → {:>6.1} MB/s ({:.2}x compression)",
                    format_bytes(serialized_bytes),
                    format_bytes(compressed_bytes_out),
                    in_mb_s,
                    out_mb_s,
                    compression
                )
                .unwrap();
            }

            let write_ns = self.step_write_ns.load(Ordering::Relaxed);
            if bytes_written > 0 && write_ns > 0 {
                let write_ms = write_ns as f64 / 1_000_000.0;
                let write_mb_s = (bytes_written as f64 / 1_000_000.0) / (write_ms / 1000.0);
                writeln!(
                    s,
                    " Write: {:>10} in {:>8.1}ms = {:>8.1} MB/s",
                    format_bytes(bytes_written),
                    write_ms,
                    write_mb_s
                )
                .unwrap();
            }
        }

        // Section: periodically-sampled queue depths over the run, one row
        // per sample, plus a per-thread "current step" letter trail.
        let samples = self.queue_samples.lock();
        if !samples.is_empty() {
            writeln!(s).unwrap();
            writeln!(s, "Queue Size Timeline ({} samples at ~100ms intervals):", samples.len())
                .unwrap();
            writeln!(
                s,
                " Time Q1 Q2 Q2b Q3 Q4 Q5 Q6 Q7 | R2 R3 R7 | R3_MB Threads"
            )
            .unwrap();

            for sample in samples.iter() {
                // Index 1 = the R3 (decoded-record) reorder buffer's bytes.
                let r3_mb = sample.reorder_memory_bytes[1] as f64 / 1_048_576.0;
                write!(
                    s,
                    " {:>4} {:>3} {:>3} {:>3} {:>3} {:>3} {:>3} {:>3} {:>3} | {:>3} {:>3} {:>3} | {:>6.1} ",
                    sample.time_ms,
                    sample.queue_sizes[0],
                    sample.queue_sizes[1],
                    sample.queue_sizes[2],
                    sample.queue_sizes[3],
                    sample.queue_sizes[4],
                    sample.queue_sizes[5],
                    sample.queue_sizes[6],
                    sample.queue_sizes[7],
                    sample.reorder_sizes[0],
                    sample.reorder_sizes[1],
                    sample.reorder_sizes[2],
                    r3_mb,
                )
                .unwrap();
                // One letter per thread showing which step it was running
                // at sample time; '.' means idle/out of range.
                for &step_idx in &sample.thread_steps {
                    if step_idx < NUM_STEPS as u8 {
                        let short = match step_idx {
                            0 => "R",
                            1 => "D",
                            2 => "F",
                            3 => "d",
                            4 => "G",
                            5 => "P",
                            6 => "S",
                            7 => "C",
                            8 => "W",
                            _ => "?",
                        };
                        write!(s, "{short}").unwrap();
                    } else {
                        write!(s, ".").unwrap();
                    }
                }
                writeln!(s).unwrap();
            }

            // Peak of the R3 reorder buffer over all samples.
            let peak_r3_items = samples.iter().map(|s| s.reorder_sizes[1]).max().unwrap_or(0);
            let peak_r3_bytes =
                samples.iter().map(|s| s.reorder_memory_bytes[1]).max().unwrap_or(0);
            let peak_r3_mb = peak_r3_bytes as f64 / 1_048_576.0;
            writeln!(s).unwrap();
            writeln!(s, "Peak Q3 Reorder Buffer: {peak_r3_items} items, {peak_r3_mb:.1} MB")
                .unwrap();
        }

        // Section: memory-limiter activity (only when it ever triggered).
        let group_rejects = self.group_memory_rejects.load(Ordering::Relaxed);
        let peak_memory = self.peak_memory_bytes.load(Ordering::Relaxed);

        if group_rejects > 0 || peak_memory > 0 {
            writeln!(s).unwrap();
            writeln!(s, "Memory Limiting:").unwrap();
            if group_rejects > 0 {
                writeln!(s, " Group rejects (memory): {group_rejects:>10}").unwrap();
            }
            if peak_memory > 0 {
                let peak_mb = peak_memory as f64 / 1_048_576.0;
                writeln!(s, " Peak memory usage: {peak_mb:>10.1} MB").unwrap();
            }
        }

        s
    }
3838
3839 pub fn log_summary(&self) {
3841 for line in self.format_summary().lines() {
3842 info!("{line}");
3843 }
3844 }
3845}
3846
#[cfg(feature = "memory-debug")]
impl PipelineStats {
    /// Maps a queue name ("q1".."q7") to its byte counter, or `None` for
    /// unknown names. Centralizes the name→counter lookup that was
    /// previously duplicated across add/remove/store paths.
    fn queue_memory_counter(&self, queue_name: &str) -> Option<&AtomicU64> {
        let m = &self.memory;
        match queue_name {
            "q1" => Some(&m.q1_memory_bytes),
            "q2" => Some(&m.q2_memory_bytes),
            "q3" => Some(&m.q3_memory_bytes),
            "q4" => Some(&m.q4_memory_bytes),
            "q5" => Some(&m.q5_memory_bytes),
            "q6" => Some(&m.q6_memory_bytes),
            "q7" => Some(&m.q7_memory_bytes),
            _ => None,
        }
    }

    /// Atomically decrements `counter` by `size`, clamping at zero instead
    /// of wrapping (a plain `fetch_sub` could underflow if adds and removes
    /// race). `fetch_update` runs the CAS retry loop internally.
    fn saturating_sub_atomic(counter: &AtomicU64, size: u64) {
        // The closure always returns Some, so this cannot fail.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            Some(current.saturating_sub(size))
        });
    }

    /// Adds `size` bytes to the named queue's memory counter.
    /// Unknown queue names are ignored.
    pub fn track_queue_memory_add(&self, queue_name: &str, size: usize) {
        if let Some(counter) = self.queue_memory_counter(queue_name) {
            counter.fetch_add(size as u64, Ordering::Relaxed);
        }
    }

    /// Subtracts `size` bytes from the named queue's memory counter,
    /// saturating at zero. Unknown queue names are ignored.
    pub fn track_queue_memory_remove(&self, queue_name: &str, size: usize) {
        if let Some(counter) = self.queue_memory_counter(queue_name) {
            Self::saturating_sub_atomic(counter, size as u64);
        }
    }

    /// Tracks bytes used by in-flight position-group processing:
    /// adds on allocation, saturating-subtracts on release.
    pub fn track_position_group_memory(&self, size: usize, is_allocation: bool) {
        let counter = &self.memory.position_group_processing_bytes;
        if is_allocation {
            counter.fetch_add(size as u64, Ordering::Relaxed);
        } else {
            Self::saturating_sub_atomic(counter, size as u64);
        }
    }

    /// Tracks bytes used by in-flight template processing:
    /// adds on allocation, saturating-subtracts on release.
    pub fn track_template_memory(&self, size: usize, is_allocation: bool) {
        let counter = &self.memory.template_processing_bytes;
        if is_allocation {
            counter.fetch_add(size as u64, Ordering::Relaxed);
        } else {
            Self::saturating_sub_atomic(counter, size as u64);
        }
    }

    /// Records the latest system RSS reading (bytes).
    pub fn update_system_rss(&self, rss_bytes: u64) {
        self.memory.system_rss_bytes.store(rss_bytes, Ordering::Relaxed);
    }

    /// Stores fixed per-run estimates for infrastructure memory
    /// (decompressors, compressors, worker buffers, I/O buffers, thread
    /// stacks, and queue backing storage). The constants are estimates,
    /// not measurements.
    pub fn set_infrastructure_memory(&self, num_threads: usize, queue_capacity: usize) {
        let m = &self.memory;
        m.decompressor_memory_bytes.store(num_threads as u64 * 32 * 1024, Ordering::Relaxed);
        m.compressor_memory_bytes.store(num_threads as u64 * 280 * 1024, Ordering::Relaxed);
        m.worker_buffer_memory_bytes.store(num_threads as u64 * 512 * 1024, Ordering::Relaxed);
        m.io_buffer_memory_bytes.store(16u64 * 1024 * 1024, Ordering::Relaxed);
        // +1 accounts for the main/monitor thread's stack.
        m.thread_stack_memory_bytes
            .store((num_threads as u64 + 1) * 2 * 1024 * 1024, Ordering::Relaxed);
        // 7 queues, ~128 bytes estimated per slot.
        m.queue_capacity_memory_bytes.store(7u64 * queue_capacity as u64 * 128, Ordering::Relaxed);
    }

    /// Overwrites the per-queue byte counters from externally computed
    /// `(name, bytes)` pairs. Unknown queue names are ignored.
    pub fn update_queue_memory_from_external(&self, queue_stats: &[(&str, u64)]) {
        for (queue_name, current_bytes) in queue_stats {
            if let Some(counter) = self.queue_memory_counter(queue_name) {
                counter.store(*current_bytes, Ordering::Relaxed);
            }
        }
    }

    /// Snapshots all tracked counters into a `MemoryBreakdown`, converting
    /// to decimal MB/GB. `untracked` is RSS minus everything accounted for
    /// (saturating, since counters lag the OS reading).
    pub fn get_memory_breakdown(&self) -> MemoryBreakdown {
        let m = &self.memory;

        // Per-queue payload bytes.
        let q1 = m.q1_memory_bytes.load(Ordering::Relaxed);
        let q2 = m.q2_memory_bytes.load(Ordering::Relaxed);
        let q3 = m.q3_memory_bytes.load(Ordering::Relaxed);
        let q4 = m.q4_memory_bytes.load(Ordering::Relaxed);
        let q5 = m.q5_memory_bytes.load(Ordering::Relaxed);
        let q6 = m.q6_memory_bytes.load(Ordering::Relaxed);
        let q7 = m.q7_memory_bytes.load(Ordering::Relaxed);
        let queue_total = q1 + q2 + q3 + q4 + q5 + q6 + q7;

        // In-flight processing memory.
        let pos_groups = m.position_group_processing_bytes.load(Ordering::Relaxed);
        let templates = m.template_processing_bytes.load(Ordering::Relaxed);
        let reorder = m.reorder_buffer_bytes.load(Ordering::Relaxed);
        let grouper = m.grouper_memory_bytes.load(Ordering::Relaxed);
        let worker_local = m.worker_local_memory_bytes.load(Ordering::Relaxed);
        let processing_total = pos_groups + templates + reorder + grouper + worker_local;

        // Fixed infrastructure estimates (see set_infrastructure_memory).
        let infra_decompressors = m.decompressor_memory_bytes.load(Ordering::Relaxed);
        let infra_compressors = m.compressor_memory_bytes.load(Ordering::Relaxed);
        let infra_buffers = m.worker_buffer_memory_bytes.load(Ordering::Relaxed);
        let infra_io = m.io_buffer_memory_bytes.load(Ordering::Relaxed);
        let infra_stacks = m.thread_stack_memory_bytes.load(Ordering::Relaxed);
        let infra_queues = m.queue_capacity_memory_bytes.load(Ordering::Relaxed);
        let infra_total = infra_decompressors
            + infra_compressors
            + infra_buffers
            + infra_io
            + infra_stacks
            + infra_queues;

        let tracked_total = queue_total + processing_total + infra_total;
        let system_rss = m.system_rss_bytes.load(Ordering::Relaxed);
        let untracked = system_rss.saturating_sub(tracked_total);

        MemoryBreakdown {
            system_rss_gb: system_rss as f64 / 1e9,
            tracked_total_gb: tracked_total as f64 / 1e9,
            untracked_gb: untracked as f64 / 1e9,

            q1_mb: q1 as f64 / 1e6,
            q2_mb: q2 as f64 / 1e6,
            q3_mb: q3 as f64 / 1e6,
            q4_gb: q4 as f64 / 1e9,
            q5_gb: q5 as f64 / 1e9,
            q6_mb: q6 as f64 / 1e6,
            q7_mb: q7 as f64 / 1e6,

            position_groups_gb: pos_groups as f64 / 1e9,
            templates_gb: templates as f64 / 1e9,
            reorder_buffers_mb: reorder as f64 / 1e6,
            grouper_mb: grouper as f64 / 1e6,
            worker_local_mb: worker_local as f64 / 1e6,

            decompressors_mb: infra_decompressors as f64 / 1e6,
            compressors_mb: infra_compressors as f64 / 1e6,
            worker_buffers_mb: infra_buffers as f64 / 1e6,
            io_buffers_mb: infra_io as f64 / 1e6,
            thread_stacks_mb: infra_stacks as f64 / 1e6,
            queue_capacity_mb: infra_queues as f64 / 1e6,
            infrastructure_gb: infra_total as f64 / 1e9,
        }
    }
}
4048
/// Shared-state contract for the output tail of the pipeline: popping
/// serialized batches (q5), pushing compressed batches (q6), and writing
/// them out in serial order through a reorder buffer.
pub trait OutputPipelineState: Send + Sync {
    /// Item type produced by the processing stage of this pipeline.
    type Processed: Send;

    /// True once a fatal error has been recorded; workers should stop.
    fn has_error(&self) -> bool;
    /// Records a fatal I/O error for the whole pipeline.
    fn set_error(&self, error: io::Error);

    /// Pops the next serialized batch (with its ordering serial) from q5.
    fn q5_pop(&self) -> Option<(u64, SerializedBatch)>;
    /// Pushes onto q5; on failure the item is returned to the caller.
    fn q5_push(&self, item: (u64, SerializedBatch)) -> Result<(), (u64, SerializedBatch)>;
    fn q5_is_full(&self) -> bool;
    /// Accounting hook called when a q5 item is popped (no-op by default).
    fn q5_track_pop(&self, _heap_size: u64) {}

    /// Pops the next compressed batch (with its ordering serial) from q6.
    fn q6_pop(&self) -> Option<(u64, CompressedBlockBatch)>;
    /// Pushes onto q6; on failure the item is returned to the caller.
    fn q6_push(&self, item: (u64, CompressedBlockBatch))
    -> Result<(), (u64, CompressedBlockBatch)>;
    fn q6_is_full(&self) -> bool;
    /// Accounting hook called when a q6 item is popped (no-op by default).
    fn q6_track_pop(&self, _heap_size: u64) {}

    /// Inserts a batch into the write-side reorder buffer keyed by serial.
    fn q6_reorder_insert(&self, serial: u64, batch: CompressedBlockBatch);
    /// Pops a batch only if it is the next expected serial, else `None`.
    fn q6_reorder_try_pop_next(&self) -> Option<CompressedBlockBatch>;

    /// Non-blocking attempt to take the output writer lock; `None` if the
    /// lock is currently held by another worker.
    fn output_try_lock(&self)
    -> Option<parking_lot::MutexGuard<'_, Option<Box<dyn Write + Send>>>>;

    /// Increments the written-batch counter, returning the new value.
    fn increment_written(&self) -> u64;

    /// Accounting hook for compressed output bytes (no-op by default).
    fn record_compressed_bytes_out(&self, _bytes: u64) {}

    /// Progress hooks fired when items move through the q6/q7 stages
    /// (no-op by default).
    fn record_q6_pop_progress(&self) {}
    fn record_q7_push_progress(&self) {}

    /// Optional stats collector; `None` disables stats recording.
    fn stats(&self) -> Option<&PipelineStats> {
        None
    }
}
4113
/// Worker state that owns an inline BGZF compressor.
pub trait HasCompressor {
    /// Mutable access to the worker's compressor.
    fn compressor_mut(&mut self) -> &mut InlineBgzfCompressor;
}
4118
/// Worker state that maintains a pool of reusable byte buffers so hot
/// paths can avoid fresh heap allocations per batch.
pub trait HasRecycledBuffers {
    /// Returns a pooled buffer, or allocates one with at least `capacity`.
    fn take_or_alloc_buffer(&mut self, capacity: usize) -> Vec<u8>;
    /// Returns a buffer to the pool for reuse.
    fn recycle_buffer(&mut self, buf: Vec<u8>);
}
4131
/// Common queries over worker-local state, used by the generic worker
/// loop to decide when a worker may exit (it must not exit while it still
/// holds items that were popped but not yet pushed downstream).
pub trait WorkerStateCommon {
    /// True if this worker still holds any in-flight item.
    fn has_any_held_items(&self) -> bool;

    /// Drops all held items (used when tearing down after an error).
    fn clear_held_items(&mut self);
}
4164
/// Worker state that embeds the shared `WorkerCoreState`
/// (scheduler, backoff, etc.).
pub trait HasWorkerCore {
    /// Shared core state, read-only.
    fn core(&self) -> &WorkerCoreState;
    /// Shared core state, mutable.
    fn core_mut(&mut self) -> &mut WorkerCoreState;
}
4172
4173#[inline]
4179#[allow(clippy::cast_possible_truncation)]
4180pub fn handle_worker_backoff<W: HasWorkerCore>(
4181 worker: &mut W,
4182 stats: Option<&PipelineStats>,
4183 did_work: bool,
4184) {
4185 if did_work {
4186 worker.core_mut().reset_backoff();
4187 } else {
4188 if let Some(stats) = stats {
4189 let tid = worker.core().scheduler.thread_id();
4190 stats.clear_current_step(tid);
4192 let idle_start = Instant::now();
4193 worker.core_mut().sleep_backoff();
4194 stats.record_idle_for_thread(tid, idle_start.elapsed().as_nanos() as u64);
4195 } else {
4196 worker.core_mut().sleep_backoff();
4197 }
4198 worker.core_mut().increase_backoff();
4199 }
4200}
4201
/// Pipeline-specific behavior plugged into `generic_worker_loop`. The
/// required methods define how steps execute; the provided defaults are
/// opt-in hooks for sticky reads, drain mode, and step ownership.
pub trait StepContext {
    /// Worker-local state threaded through every step execution.
    type Worker: WorkerStateCommon + HasWorkerCore;

    /// Runs one pipeline step. Returns `(did_work, was_contention)`.
    fn execute_step(&self, worker: &mut Self::Worker, step: PipelineStep) -> (bool, bool);

    /// Current backpressure snapshot used to prioritize steps.
    fn get_backpressure(&self, worker: &Self::Worker) -> BackpressureState;

    /// Re-evaluates whether the pipeline should switch into drain mode.
    fn check_drain_mode(&self);

    /// True once a fatal error has been recorded.
    fn has_error(&self) -> bool;

    /// True once the pipeline has no more work to produce.
    fn is_complete(&self) -> bool;

    /// Optional stats collector; `None` disables stats recording.
    fn stats(&self) -> Option<&PipelineStats>;

    /// True if this worker must never attempt the Read step.
    fn skip_read(&self) -> bool;

    /// When true, the loop checks completion *after* attempting work in
    /// each iteration instead of before (default: check before).
    fn check_completion_at_end(&self) -> bool {
        false
    }

    /// Opt-in to "sticky read": repeatedly run the read step at the top
    /// of each iteration (default: disabled).
    fn should_attempt_sticky_read(&self) -> bool {
        false
    }

    /// While true, the sticky-read inner loop keeps reading
    /// (default: never).
    fn sticky_read_should_continue(&self) -> bool {
        false
    }

    /// Executes one sticky read; returns whether it did work
    /// (default: no-op).
    fn execute_read_step(&self, _worker: &mut Self::Worker) -> bool {
        false
    }

    /// True while the pipeline is draining (default: never).
    fn is_drain_mode(&self) -> bool {
        false
    }

    /// Filter applied before each priority-list step attempt
    /// (default: attempt everything).
    fn should_attempt_step(
        &self,
        _worker: &Self::Worker,
        _step: PipelineStep,
        _drain_mode: bool,
    ) -> bool {
        true
    }

    /// A step this worker exclusively owns, tried first each iteration
    /// and skipped in the shared priority list (default: none).
    fn exclusive_step_owned(&self, _worker: &Self::Worker) -> Option<PipelineStep> {
        None
    }
}
4291
4292#[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
4297pub fn generic_worker_loop<C: StepContext>(ctx: &C, worker: &mut C::Worker) {
4298 let collect_stats = ctx.stats().is_some();
4299 let check_completion_at_end = ctx.check_completion_at_end();
4300
4301 loop {
4302 if ctx.has_error() {
4304 break;
4305 }
4306
4307 if !check_completion_at_end && ctx.is_complete() && !worker.has_any_held_items() {
4310 break;
4311 }
4312
4313 let mut did_work = false;
4314
4315 if ctx.should_attempt_sticky_read() {
4318 while ctx.sticky_read_should_continue() {
4319 if let Some(stats) = ctx.stats() {
4321 stats.record_step_attempt(
4322 worker.core().scheduler.thread_id(),
4323 PipelineStep::Read,
4324 );
4325 }
4326
4327 let success = if collect_stats {
4328 let start = Instant::now();
4329 let success = ctx.execute_read_step(worker);
4330 if success {
4331 if let Some(stats) = ctx.stats() {
4332 stats.record_step_for_thread(
4333 PipelineStep::Read,
4334 start.elapsed().as_nanos() as u64,
4335 Some(worker.core().scheduler.thread_id()),
4336 );
4337 }
4338 }
4339 success
4340 } else {
4341 ctx.execute_read_step(worker)
4342 };
4343
4344 if success {
4345 did_work = true;
4346 } else {
4347 break;
4348 }
4349 }
4350 }
4351
4352 ctx.check_drain_mode();
4354
4355 let backpressure = ctx.get_backpressure(worker);
4357 let priorities_slice = worker.core_mut().scheduler.get_priorities(backpressure);
4358 let priority_count = priorities_slice.len().min(9);
4359 let mut priorities = [PipelineStep::Read; 9];
4360 priorities[..priority_count].copy_from_slice(&priorities_slice[..priority_count]);
4361
4362 let drain_mode = ctx.is_drain_mode();
4363
4364 let owned_step = ctx.exclusive_step_owned(worker);
4367 if let Some(step) = owned_step {
4368 if step != PipelineStep::Read && !ctx.has_error() {
4369 if let Some(stats) = ctx.stats() {
4371 stats.record_step_attempt(worker.core().scheduler.thread_id(), step);
4372 }
4373
4374 let (success, elapsed_ns, was_contention) = if collect_stats {
4375 let start = Instant::now();
4376 let (success, was_contention) = ctx.execute_step(worker, step);
4377 (success, start.elapsed().as_nanos() as u64, was_contention)
4378 } else {
4379 let (success, was_contention) = ctx.execute_step(worker, step);
4380 (success, 0, was_contention)
4381 };
4382
4383 worker.core_mut().scheduler.record_outcome(step, success, was_contention);
4384
4385 if success {
4386 if let Some(stats) = ctx.stats() {
4387 stats.record_step_for_thread(
4388 step,
4389 elapsed_ns,
4390 Some(worker.core().scheduler.thread_id()),
4391 );
4392 }
4393 did_work = true;
4394 }
4395 }
4396 }
4397
4398 if !did_work {
4400 for &step in &priorities[..priority_count] {
4401 if ctx.has_error() {
4402 break;
4403 }
4404
4405 if ctx.skip_read() && step == PipelineStep::Read {
4407 continue;
4408 }
4409
4410 if Some(step) == owned_step {
4412 continue;
4413 }
4414
4415 if !ctx.should_attempt_step(worker, step, drain_mode) {
4417 continue;
4418 }
4419
4420 if let Some(stats) = ctx.stats() {
4422 stats.record_step_attempt(worker.core().scheduler.thread_id(), step);
4423 }
4424
4425 let (success, elapsed_ns, was_contention) = if collect_stats {
4427 let start = Instant::now();
4428 let (success, was_contention) = ctx.execute_step(worker, step);
4429 (success, start.elapsed().as_nanos() as u64, was_contention)
4430 } else {
4431 let (success, was_contention) = ctx.execute_step(worker, step);
4432 (success, 0, was_contention)
4433 };
4434
4435 worker.core_mut().scheduler.record_outcome(step, success, was_contention);
4437
4438 if success {
4439 if let Some(stats) = ctx.stats() {
4440 stats.record_step_for_thread(
4441 step,
4442 elapsed_ns,
4443 Some(worker.core().scheduler.thread_id()),
4444 );
4445 }
4446 did_work = true;
4447 break; }
4449 }
4450 }
4451
4452 if check_completion_at_end && ctx.is_complete() && !worker.has_any_held_items() {
4455 break;
4456 }
4457
4458 handle_worker_backoff(worker, ctx.stats(), did_work);
4460 }
4461}
4462
/// Worker state that can hold one compressed batch which failed to push
/// onto q6, as `(serial, batch, estimated_heap_size)`, so it can be
/// retried on the next compress attempt instead of being dropped.
pub trait HasHeldCompressed {
    /// Mutable slot for the held compressed batch.
    fn held_compressed_mut(&mut self) -> &mut Option<(u64, CompressedBlockBatch, usize)>;
}
4474
/// Worker state that can hold one boundary-stage item `(serial, B)` that
/// failed to push downstream, for retry on a later attempt.
pub trait HasHeldBoundaries<B> {
    /// Mutable slot for the held boundary item.
    fn held_boundaries_mut(&mut self) -> &mut Option<(u64, B)>;
}
4493
4494pub fn shared_try_step_compress<S, W>(state: &S, worker: &mut W) -> StepResult
4515where
4516 S: OutputPipelineState,
4517 W: HasCompressor + HasHeldCompressed + HasRecycledBuffers,
4518{
4519 if let Some((serial, held, _heap_size)) = worker.held_compressed_mut().take() {
4523 match state.q6_push((serial, held)) {
4524 Ok(()) => {
4525 state.record_q7_push_progress();
4527 }
4528 Err((serial, held)) => {
4529 let heap_size = held.estimate_heap_size();
4531 *worker.held_compressed_mut() = Some((serial, held, heap_size));
4532 return StepResult::OutputFull;
4533 }
4534 }
4535 }
4536
4537 if state.q6_is_full() {
4541 return StepResult::OutputFull;
4542 }
4543
4544 let Some((serial, serialized)) = state.q5_pop() else {
4548 if let Some(stats) = state.stats() {
4549 stats.record_queue_empty(6);
4550 }
4551 return StepResult::InputEmpty;
4552 };
4553 state.record_q6_pop_progress();
4554
4555 let q5_heap_size = serialized.estimate_heap_size() as u64;
4557 state.q5_track_pop(q5_heap_size);
4558
4559 let SerializedBatch { data, record_count, secondary_data } = serialized;
4563
4564 let blocks = {
4566 let compressor = worker.compressor_mut();
4567
4568 if let Err(e) = compressor.write_all(&data) {
4569 state.set_error(e);
4570 return StepResult::InputEmpty;
4571 }
4572 if let Err(e) = compressor.flush() {
4573 state.set_error(e);
4574 return StepResult::InputEmpty;
4575 }
4576
4577 compressor.take_blocks()
4579 };
4580
4581 worker.recycle_buffer(data);
4583
4584 let compressed_bytes: u64 = blocks.iter().map(|b| b.data.len() as u64).sum();
4586 state.record_compressed_bytes_out(compressed_bytes);
4587
4588 let batch = CompressedBlockBatch { blocks, record_count, secondary_data };
4589
4590 match state.q6_push((serial, batch)) {
4594 Ok(()) => {
4595 state.record_q7_push_progress();
4596 StepResult::Success
4597 }
4598 Err((serial, batch)) => {
4599 let heap_size = batch.estimate_heap_size();
4601 *worker.held_compressed_mut() = Some((serial, batch, heap_size));
4602 StepResult::OutputFull
4603 }
4604 }
4605}
4606
#[allow(dead_code)]
/// Legacy write step: drains Q6 into the reorder buffer, then writes every
/// batch that is next in serial order to the output stream.
///
/// Returns `true` if at least one batch was written; `false` when the
/// output lock is contended, the writer has already been taken (`None`),
/// or a write error occurred (stored via `set_error`).
fn shared_try_step_write<S: OutputPipelineState>(state: &S) -> bool {
    // Only one thread may write at a time; skip if another holds the lock.
    let Some(mut guard) = state.output_try_lock() else {
        return false;
    };

    // The writer is `None` once the output has been shut down / taken.
    let Some(ref mut writer) = *guard else {
        return false;
    };

    // Move everything currently in Q6 into the reorder buffer so batches
    // can be emitted in serial order regardless of completion order.
    // NOTE(review): the local is named `q7_heap` but tracks a Q6 pop —
    // presumably the queue naming convention shifted; verify against the
    // tracker implementation.
    while let Some((serial, batch)) = state.q6_pop() {
        let q7_heap = batch.estimate_heap_size() as u64;
        state.q6_track_pop(q7_heap);
        state.q6_reorder_insert(serial, batch);
    }

    let mut wrote_any = false;
    // Emit all consecutively-ordered batches that are now available.
    while let Some(batch) = state.q6_reorder_try_pop_next() {
        for block in &batch.blocks {
            if let Err(e) = writer.write_all(&block.data) {
                state.set_error(e);
                return false;
            }
        }
        state.increment_written();
        wrote_any = true;
    }

    wrote_any
}
4649
4650#[inline]
4661pub fn try_advance_held<T>(queue: &ArrayQueue<(u64, T)>, held: &mut Option<(u64, T)>) -> bool {
4662 if let Some((serial, item)) = held.take() {
4663 match queue.push((serial, item)) {
4664 Ok(()) => true,
4665 Err(returned) => {
4666 *held = Some(returned);
4667 false
4668 }
4669 }
4670 } else {
4671 true }
4673}
4674
4675#[inline]
4679pub fn try_push_or_hold<T>(
4680 queue: &ArrayQueue<(u64, T)>,
4681 serial: u64,
4682 item: T,
4683 held: &mut Option<(u64, T)>,
4684) -> StepResult {
4685 match queue.push((serial, item)) {
4686 Ok(()) => StepResult::Success,
4687 Err(returned) => {
4688 *held = Some(returned);
4689 StepResult::OutputFull
4690 }
4691 }
4692}
4693
4694pub trait ProcessPipelineState<G, P>: Send + Sync {
4703 fn process_input_pop(&self) -> Option<(u64, Vec<G>)>;
4705
4706 fn process_output_is_full(&self) -> bool;
4708
4709 fn process_output_push(&self, item: (u64, Vec<P>)) -> Result<(), (u64, Vec<P>)>;
4715
4716 fn has_error(&self) -> bool;
4718
4719 fn set_error(&self, e: io::Error);
4721
4722 fn should_apply_process_backpressure(&self) -> bool {
4726 self.process_output_is_full()
4727 }
4728
4729 fn is_draining(&self) -> bool {
4733 false
4734 }
4735}
4736
/// Worker-local slot for a processed batch that failed to push downstream,
/// stored as `(serial, items, estimated heap bytes)`.
pub trait HasHeldProcessed<P> {
    /// Mutable access to the held batch, if any.
    fn held_processed_mut(&mut self) -> &mut Option<(u64, Vec<P>, usize)>;
}
4746
4747#[inline]
4756pub fn shared_try_step_process<S, W, G, P, F>(
4757 state: &S,
4758 worker: &mut W,
4759 process_fn: F,
4760) -> StepResult
4761where
4762 S: ProcessPipelineState<G, P>,
4763 W: HasHeldProcessed<P>,
4764 P: MemoryEstimate,
4765 F: Fn(G) -> io::Result<P>,
4766{
4767 let held = worker.held_processed_mut();
4769 if let Some((serial, items, _heap_size)) = held.take() {
4770 match state.process_output_push((serial, items)) {
4771 Ok(()) => {
4772 }
4774 Err((serial, items)) => {
4775 let heap_size: usize = items.iter().map(MemoryEstimate::estimate_heap_size).sum();
4777 *held = Some((serial, items, heap_size));
4778 return StepResult::OutputFull;
4779 }
4780 }
4781 }
4782
4783 if state.has_error() {
4785 return StepResult::InputEmpty;
4786 }
4787
4788 if state.should_apply_process_backpressure() {
4790 return StepResult::OutputFull;
4791 }
4792
4793 let Some((serial, batch)) = state.process_input_pop() else {
4795 return StepResult::InputEmpty;
4796 };
4797
4798 let mut results = Vec::with_capacity(batch.len());
4800 for item in batch {
4801 match process_fn(item) {
4802 Ok(processed) => results.push(processed),
4803 Err(e) => {
4804 state.set_error(e);
4805 return StepResult::InputEmpty;
4806 }
4807 }
4808 }
4809
4810 match state.process_output_push((serial, results)) {
4812 Ok(()) => StepResult::Success,
4813 Err((serial, results)) => {
4814 let heap_size: usize = results.iter().map(MemoryEstimate::estimate_heap_size).sum();
4816 *worker.held_processed_mut() = Some((serial, results, heap_size));
4817 StepResult::OutputFull
4818 }
4819 }
4820}
4821
/// Queue/state access needed by the generic serialize step, parameterized
/// over the processed item type `P`.
pub trait SerializePipelineState<P>: Send + Sync {
    /// Pop one `(serial, items)` batch from the serialize input queue.
    fn serialize_input_pop(&self) -> Option<(u64, Vec<P>)>;

    /// Whether the serialize output queue cannot accept another batch.
    fn serialize_output_is_full(&self) -> bool;

    /// Push a serialized batch downstream; on failure the item is handed
    /// back so the caller can hold it and retry later.
    fn serialize_output_push(
        &self,
        item: (u64, SerializedBatch),
    ) -> Result<(), (u64, SerializedBatch)>;

    /// Whether a pipeline error has been recorded.
    fn has_error(&self) -> bool;

    /// Record a pipeline error.
    fn set_error(&self, e: io::Error);

    /// Stats hook: bytes produced by serialization (no-op by default).
    fn record_serialized_bytes(&self, _bytes: u64) {}

    /// Stats hook: records produced by serialization (no-op by default).
    fn record_serialized_records(&self, _count: u64) {}
}
4859
/// Worker-local state for the serialize step: the held output slot plus a
/// reusable serialization scratch buffer.
pub trait HasHeldSerialized {
    /// Mutable access to a `(serial, batch, estimated heap bytes)` entry
    /// that failed to push downstream, if any.
    fn held_serialized_mut(&mut self) -> &mut Option<(u64, SerializedBatch, usize)>;

    /// The scratch buffer records are serialized into.
    fn serialization_buffer_mut(&mut self) -> &mut Vec<u8>;

    /// Capacity to use when allocating a replacement scratch buffer.
    fn serialization_buffer_capacity(&self) -> usize;
}
4873
4874#[inline]
4883pub fn shared_try_step_serialize<S, W, P, F>(
4884 state: &S,
4885 worker: &mut W,
4886 mut serialize_fn: F,
4887) -> StepResult
4888where
4889 S: SerializePipelineState<P>,
4890 W: HasHeldSerialized + HasRecycledBuffers,
4891 F: FnMut(P, &mut Vec<u8>) -> io::Result<u64>,
4892{
4893 if let Some((serial, held_batch, _heap_size)) = worker.held_serialized_mut().take() {
4895 match state.serialize_output_push((serial, held_batch)) {
4896 Ok(()) => {
4897 }
4899 Err((serial, held_batch)) => {
4900 let heap_size = held_batch.estimate_heap_size();
4901 *worker.held_serialized_mut() = Some((serial, held_batch, heap_size));
4902 return StepResult::OutputFull;
4903 }
4904 }
4905 }
4906
4907 if state.has_error() {
4909 return StepResult::InputEmpty;
4910 }
4911
4912 if state.serialize_output_is_full() {
4914 return StepResult::OutputFull;
4915 }
4916
4917 let Some((serial, batch)) = state.serialize_input_pop() else {
4919 return StepResult::InputEmpty;
4920 };
4921
4922 let capacity = worker.serialization_buffer_capacity();
4924
4925 let total_records = {
4927 let buffer = worker.serialization_buffer_mut();
4928 buffer.clear();
4929 let mut total_records: u64 = 0;
4930
4931 for item in batch {
4932 match serialize_fn(item, buffer) {
4933 Ok(record_count) => {
4934 total_records += record_count;
4935 }
4936 Err(e) => {
4937 state.set_error(e);
4938 return StepResult::InputEmpty;
4939 }
4940 }
4941 }
4942 total_records
4943 };
4944
4945 let replacement = worker.take_or_alloc_buffer(capacity);
4947 let buffer = worker.serialization_buffer_mut();
4948 let data = std::mem::replace(buffer, replacement);
4949 state.record_serialized_bytes(data.len() as u64);
4950 state.record_serialized_records(total_records);
4951
4952 let result_batch = SerializedBatch { data, record_count: total_records, secondary_data: None };
4953
4954 match state.serialize_output_push((serial, result_batch)) {
4956 Ok(()) => StepResult::Success,
4957 Err((serial, result_batch)) => {
4958 let heap_size = result_batch.estimate_heap_size();
4959 *worker.held_serialized_mut() = Some((serial, result_batch, heap_size));
4960 StepResult::OutputFull
4961 }
4962 }
4963}
4964
/// Queue/state access needed by the write step (compressed batches -> output).
pub trait WritePipelineState: Send + Sync {
    /// Queue of compressed batches awaiting write, keyed by serial number.
    fn write_input_queue(&self) -> &ArrayQueue<(u64, CompressedBlockBatch)>;

    /// Reorder buffer that restores serial order before writing.
    fn write_reorder_buffer(&self) -> &Mutex<ReorderBuffer<CompressedBlockBatch>>;

    /// The output stream; `None` once the output has been taken/closed.
    fn write_output(&self) -> &Mutex<Option<Box<dyn Write + Send>>>;

    /// Whether a pipeline error has been recorded.
    fn has_error(&self) -> bool;

    /// Record a pipeline error.
    fn set_error(&self, e: io::Error);

    /// Record that `count` records were written to the output.
    fn record_written(&self, count: u64);

    /// Optional stats collector (`None` when stats are disabled).
    fn stats(&self) -> Option<&PipelineStats>;
}
4995
/// Write step: drains the compressed-batch queue into the reorder buffer
/// and writes all consecutively-ordered batches to the output stream.
///
/// Returns `Success` if at least one batch was written; `InputEmpty` on
/// error, output-lock contention, a taken writer, or no ready batches.
pub fn shared_try_step_write_new<S: WritePipelineState>(state: &S) -> StepResult {
    if state.has_error() {
        return StepResult::InputEmpty;
    }

    // First drain: move queued batches into the reorder buffer even if we
    // fail to take the output lock below, so the input queue does not back
    // up while another thread owns the writer.
    {
        let mut reorder = state.write_reorder_buffer().lock();
        let queue = state.write_input_queue();
        while let Some((serial, batch)) = queue.pop() {
            reorder.insert(serial, batch);
        }
    }
    // Only one thread writes at a time; record contention and move on.
    let Some(mut output_guard) = state.write_output().try_lock() else {
        if let Some(stats) = state.stats() {
            stats.record_contention(PipelineStep::Write);
        }
        return StepResult::InputEmpty;
    };

    // The writer is `None` once the output has been taken/finished.
    let Some(ref mut output) = *output_guard else {
        return StepResult::InputEmpty;
    };

    let mut wrote_any = false;
    {
        let mut reorder = state.write_reorder_buffer().lock();
        let queue = state.write_input_queue();

        // Second drain: pick up batches that arrived while we were
        // acquiring the output lock.
        while let Some((serial, batch)) = queue.pop() {
            reorder.insert(serial, batch);
        }

        // Emit batches strictly in serial order; a write error is recorded
        // and aborts the step.
        while let Some(batch) = reorder.try_pop_next() {
            for block in &batch.blocks {
                if let Err(e) = output.write_all(&block.data) {
                    state.set_error(e);
                    return StepResult::InputEmpty;
                }
            }
            state.record_written(batch.record_count);
            wrote_any = true;
        }
    }

    if wrote_any { StepResult::Success } else { StepResult::InputEmpty }
}
5059
#[cfg(test)]
mod tests {
    //! Unit tests for pipeline stats, memory tracking, reorder-buffer
    //! state, group keys, pipeline-step enums, batch types, configs,
    //! validation errors, panic extraction, worker backoff, and the
    //! shared output-queue structures.
    use super::*;

    // ---- PipelineStats ----

    #[test]
    fn test_stats_record_step_timing() {
        let stats = PipelineStats::new();

        // Samples accumulate into a total-ns counter and a sample count.
        stats.record_step(PipelineStep::Decompress, 1_000_000);
        stats.record_step(PipelineStep::Decompress, 2_000_000);
        assert_eq!(stats.step_decompress_ns.load(Ordering::Relaxed), 3_000_000);
        assert_eq!(stats.step_decompress_count.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_stats_record_step_for_thread() {
        let stats = PipelineStats::new();

        stats.record_step_for_thread(PipelineStep::Read, 500_000, Some(0));
        stats.record_step_for_thread(PipelineStep::Read, 500_000, Some(1));

        // Global totals plus per-thread step counts are both updated.
        assert_eq!(stats.step_read_ns.load(Ordering::Relaxed), 1_000_000);
        assert_eq!(stats.step_read_count.load(Ordering::Relaxed), 2);
        assert_eq!(stats.per_thread_step_counts[0][0].load(Ordering::Relaxed), 1);
        assert_eq!(stats.per_thread_step_counts[1][0].load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_stats_record_queue_empty() {
        let stats = PipelineStats::new();

        stats.record_queue_empty(1);
        stats.record_queue_empty(1);
        stats.record_queue_empty(2);
        // Per the assertions below, index 25 feeds the Q2b counter and
        // index 7 feeds Q7.
        stats.record_queue_empty(25);
        stats.record_queue_empty(7);

        assert_eq!(stats.q1_empty.load(Ordering::Relaxed), 2);
        assert_eq!(stats.q2_empty.load(Ordering::Relaxed), 1);
        assert_eq!(stats.q2b_empty.load(Ordering::Relaxed), 1);
        assert_eq!(stats.q7_empty.load(Ordering::Relaxed), 1);
        assert_eq!(stats.q3_empty.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_stats_record_idle_for_thread() {
        let stats = PipelineStats::new();

        stats.record_idle_for_thread(0, 100_000);
        stats.record_idle_for_thread(0, 200_000);
        stats.record_idle_for_thread(1, 50_000);

        // Each call counts as one yield; ns accumulate per thread.
        assert_eq!(stats.idle_yields.load(Ordering::Relaxed), 3);
        assert_eq!(stats.per_thread_idle_ns[0].load(Ordering::Relaxed), 300_000);
        assert_eq!(stats.per_thread_idle_ns[1].load(Ordering::Relaxed), 50_000);
    }

    // ---- MemoryTracker ----

    #[test]
    fn test_memory_tracker_new() {
        let tracker = MemoryTracker::new(1000);
        assert_eq!(tracker.current(), 0);
        assert_eq!(tracker.peak(), 0);
        assert_eq!(tracker.limit(), 1000);
    }

    #[test]
    fn test_memory_tracker_unlimited() {
        // A limit of 0 means unlimited: every add succeeds.
        let tracker = MemoryTracker::unlimited();
        assert_eq!(tracker.limit(), 0);
        assert!(tracker.try_add(1_000_000));
        assert!(tracker.try_add(1_000_000_000));
        assert_eq!(tracker.current(), 1_001_000_000);
    }

    #[test]
    fn test_memory_tracker_try_add_under_limit() {
        let tracker = MemoryTracker::new(1000);
        assert!(tracker.try_add(500));
        assert_eq!(tracker.current(), 500);
    }

    #[test]
    fn test_memory_tracker_try_add_at_limit() {
        let tracker = MemoryTracker::new(1000);
        assert!(tracker.try_add(1000));
        assert!(!tracker.try_add(1));
    }

    #[test]
    fn test_memory_tracker_try_add_single_exceeds() {
        // try_add checks the limit before adding, so a single large add
        // can overshoot — but further adds are then rejected.
        let tracker = MemoryTracker::new(1000);
        assert!(tracker.try_add(500));
        assert!(tracker.try_add(600));
        assert_eq!(tracker.current(), 1100);
        assert!(!tracker.try_add(1));
    }

    #[test]
    fn test_memory_tracker_remove_saturating() {
        // Removing more than is tracked saturates at zero, not underflow.
        let tracker = MemoryTracker::new(1000);
        tracker.try_add(100);
        tracker.remove(200);
        assert_eq!(tracker.current(), 0);
    }

    #[test]
    fn test_memory_tracker_peak_tracking() {
        // Peak is a high-water mark: it does not drop on remove.
        let tracker = MemoryTracker::new(0);
        tracker.try_add(100);
        tracker.try_add(200);
        assert_eq!(tracker.peak(), 300);
        tracker.remove(250);
        assert_eq!(tracker.current(), 50);
        assert_eq!(tracker.peak(), 300);
    }

    #[test]
    fn test_memory_tracker_is_at_limit() {
        let tracker = MemoryTracker::new(1000);
        assert!(!tracker.is_at_limit());
        tracker.try_add(999);
        assert!(!tracker.is_at_limit());
        tracker.try_add(1);
        assert!(tracker.is_at_limit());
    }

    #[test]
    fn test_memory_tracker_drain_threshold() {
        // Drains once usage drops below half the limit (499 < 500 passes).
        let tracker = MemoryTracker::new(1000);
        tracker.try_add(1000);
        assert!(!tracker.is_below_drain_threshold());
        tracker.remove(501);
        assert!(tracker.is_below_drain_threshold());
    }

    #[test]
    fn test_memory_tracker_default_is_unlimited() {
        let tracker = MemoryTracker::default();
        assert_eq!(tracker.limit(), 0);
        assert!(tracker.try_add(1_000_000));
    }

    // ---- ReorderBufferState ----

    #[test]
    fn test_reorder_buffer_state_new() {
        let state = ReorderBufferState::new(1000);
        assert_eq!(state.get_next_seq(), 0);
        assert_eq!(state.get_heap_bytes(), 0);
        assert_eq!(state.get_memory_limit(), 1000);
    }

    #[test]
    fn test_reorder_buffer_state_can_proceed_next_seq() {
        // The next expected sequence may always proceed, even when the
        // buffer is far over its memory limit.
        let state = ReorderBufferState::new(100);
        state.add_heap_bytes(10_000);
        assert!(state.can_proceed(0));
    }

    #[test]
    fn test_reorder_buffer_state_can_proceed_over_limit() {
        let state = ReorderBufferState::new(1000);
        state.add_heap_bytes(500);
        // Out-of-order sequences are blocked; the next one is not.
        assert!(!state.can_proceed(1));
        assert!(state.can_proceed(0));
    }

    #[test]
    fn test_reorder_buffer_state_is_memory_high() {
        let state = ReorderBufferState::new(1000);
        assert!(!state.is_memory_high());
        state.add_heap_bytes(1000);
        assert!(state.is_memory_high());
    }

    #[test]
    fn test_reorder_buffer_state_is_memory_drained() {
        // Drained once usage drops below half the limit (499 < 500 passes).
        let state = ReorderBufferState::new(1000);
        state.add_heap_bytes(1000);
        assert!(!state.is_memory_drained());
        state.sub_heap_bytes(501);
        assert!(state.is_memory_drained());
    }

    #[test]
    fn test_reorder_buffer_state_effective_limit() {
        // A zero limit means unlimited; a small nonzero limit is enforced.
        let state_zero = ReorderBufferState::new(0);
        assert!(!state_zero.is_memory_high());
        let state_small = ReorderBufferState::new(100);
        state_small.add_heap_bytes(100);
        assert!(state_small.is_memory_high());
    }

    #[test]
    fn test_reorder_buffer_state_add_sub_heap_bytes() {
        let state = ReorderBufferState::new(0);
        state.add_heap_bytes(100);
        assert_eq!(state.get_heap_bytes(), 100);
        state.add_heap_bytes(50);
        assert_eq!(state.get_heap_bytes(), 150);
        state.sub_heap_bytes(30);
        assert_eq!(state.get_heap_bytes(), 120);
    }

    // ---- GroupKey ----

    #[test]
    fn test_group_key_single() {
        // Single-end keys fill the mate fields with sentinel values.
        let key = GroupKey::single(1, 100, 0, 5, 0, 42);
        assert_eq!(key.ref_id1, 1);
        assert_eq!(key.pos1, 100);
        assert_eq!(key.strand1, 0);
        assert_eq!(key.ref_id2, GroupKey::UNKNOWN_REF);
        assert_eq!(key.pos2, GroupKey::UNKNOWN_POS);
        assert_eq!(key.strand2, GroupKey::UNKNOWN_STRAND);
        assert_eq!(key.library_idx, 5);
        assert_eq!(key.cell_hash, 0);
        assert_eq!(key.name_hash, 42);
    }

    #[test]
    fn test_group_key_paired() {
        let key = GroupKey::paired(1, 100, 0, 2, 200, 1, 3, 0, 99);
        assert_eq!(key.ref_id1, 1);
        assert_eq!(key.pos1, 100);
        assert_eq!(key.strand1, 0);
        assert_eq!(key.ref_id2, 2);
        assert_eq!(key.pos2, 200);
        assert_eq!(key.strand2, 1);
    }

    #[test]
    fn test_group_key_paired_swap() {
        // Mates are canonicalized: the smaller (ref, pos) ends up first.
        let key = GroupKey::paired(5, 500, 1, 1, 100, 0, 3, 0, 99);
        assert_eq!(key.ref_id1, 1);
        assert_eq!(key.pos1, 100);
        assert_eq!(key.strand1, 0);
        assert_eq!(key.ref_id2, 5);
        assert_eq!(key.pos2, 500);
        assert_eq!(key.strand2, 1);
    }

    #[test]
    fn test_group_key_position_key() {
        // position_key drops name_hash but keeps cell_hash.
        let key = GroupKey::single(1, 100, 0, 5, 7, 42);
        let pk = key.position_key();
        assert_eq!(
            pk,
            (
                1,
                100,
                0,
                GroupKey::UNKNOWN_REF,
                GroupKey::UNKNOWN_POS,
                GroupKey::UNKNOWN_STRAND,
                5,
                7
            )
        );
    }

    #[test]
    fn test_group_key_ord_by_position() {
        // Ordering is by reference id first, then position.
        let key_a = GroupKey::single(1, 100, 0, 0, 0, 0);
        let key_b = GroupKey::single(2, 50, 0, 0, 0, 0);
        assert!(key_a < key_b);
    }

    #[test]
    fn test_group_key_ord_tiebreak_name_hash() {
        let key_a = GroupKey::single(1, 100, 0, 0, 0, 10);
        let key_b = GroupKey::single(1, 100, 0, 0, 0, 20);
        assert!(key_a < key_b);
    }

    #[test]
    fn test_group_key_default() {
        let key = GroupKey::default();
        assert_eq!(key.ref_id1, GroupKey::UNKNOWN_REF);
        assert_eq!(key.pos1, GroupKey::UNKNOWN_POS);
        assert_eq!(key.strand1, GroupKey::UNKNOWN_STRAND);
        assert_eq!(key.ref_id2, GroupKey::UNKNOWN_REF);
        assert_eq!(key.pos2, GroupKey::UNKNOWN_POS);
        assert_eq!(key.strand2, GroupKey::UNKNOWN_STRAND);
        assert_eq!(key.library_idx, 0);
        assert_eq!(key.cell_hash, 0);
        assert_eq!(key.name_hash, 0);
    }

    #[test]
    fn test_group_key_eq() {
        let key_a = GroupKey::single(1, 100, 0, 5, 0, 42);
        let key_b = GroupKey::single(1, 100, 0, 5, 0, 42);
        assert_eq!(key_a, key_b);
    }

    #[test]
    fn test_group_key_paired_same_position() {
        // Same (ref, pos) for both mates: strand breaks the tie, so the
        // 0-strand mate ends up first.
        let key = GroupKey::paired(1, 100, 1, 1, 100, 0, 0, 0, 0);
        assert_eq!(key.ref_id1, 1);
        assert_eq!(key.pos1, 100);
        assert_eq!(key.strand1, 0);
        assert_eq!(key.strand2, 1);
    }

    #[test]
    fn test_group_key_hash() {
        use std::collections::HashSet;
        let key_a = GroupKey::single(1, 100, 0, 0, 0, 42);
        let key_b = GroupKey::single(1, 100, 0, 0, 0, 42);
        let key_c = GroupKey::single(2, 200, 1, 0, 0, 99);
        let mut set = HashSet::new();
        set.insert(key_a);
        assert!(set.contains(&key_b));
        set.insert(key_c);
        assert_eq!(set.len(), 2);
    }

    // ---- PipelineStep ----

    #[test]
    fn test_pipeline_step_is_exclusive() {
        // Read/FindBoundaries/Group/Write are single-threaded (exclusive);
        // the others may run on any worker concurrently.
        assert!(PipelineStep::Read.is_exclusive());
        assert!(!PipelineStep::Decompress.is_exclusive());
        assert!(PipelineStep::FindBoundaries.is_exclusive());
        assert!(!PipelineStep::Decode.is_exclusive());
        assert!(PipelineStep::Group.is_exclusive());
        assert!(!PipelineStep::Process.is_exclusive());
        assert!(!PipelineStep::Serialize.is_exclusive());
        assert!(!PipelineStep::Compress.is_exclusive());
        assert!(PipelineStep::Write.is_exclusive());
    }

    #[test]
    fn test_pipeline_step_all() {
        // `all()` lists the nine steps in pipeline order.
        let all = PipelineStep::all();
        assert_eq!(all.len(), 9);
        assert_eq!(all[0], PipelineStep::Read);
        assert_eq!(all[1], PipelineStep::Decompress);
        assert_eq!(all[2], PipelineStep::FindBoundaries);
        assert_eq!(all[3], PipelineStep::Decode);
        assert_eq!(all[4], PipelineStep::Group);
        assert_eq!(all[5], PipelineStep::Process);
        assert_eq!(all[6], PipelineStep::Serialize);
        assert_eq!(all[7], PipelineStep::Compress);
        assert_eq!(all[8], PipelineStep::Write);
    }

    #[test]
    fn test_pipeline_step_from_index() {
        assert_eq!(PipelineStep::from_index(0), PipelineStep::Read);
        assert_eq!(PipelineStep::from_index(1), PipelineStep::Decompress);
        assert_eq!(PipelineStep::from_index(2), PipelineStep::FindBoundaries);
        assert_eq!(PipelineStep::from_index(3), PipelineStep::Decode);
        assert_eq!(PipelineStep::from_index(4), PipelineStep::Group);
        assert_eq!(PipelineStep::from_index(5), PipelineStep::Process);
        assert_eq!(PipelineStep::from_index(6), PipelineStep::Serialize);
        assert_eq!(PipelineStep::from_index(7), PipelineStep::Compress);
        assert_eq!(PipelineStep::from_index(8), PipelineStep::Write);
    }

    #[test]
    fn test_pipeline_step_short_name() {
        assert_eq!(PipelineStep::Read.short_name(), "Rd");
        assert_eq!(PipelineStep::Decompress.short_name(), "Dc");
        assert_eq!(PipelineStep::FindBoundaries.short_name(), "Fb");
        assert_eq!(PipelineStep::Decode.short_name(), "De");
        assert_eq!(PipelineStep::Group.short_name(), "Gr");
        assert_eq!(PipelineStep::Process.short_name(), "Pr");
        assert_eq!(PipelineStep::Serialize.short_name(), "Se");
        assert_eq!(PipelineStep::Compress.short_name(), "Co");
        assert_eq!(PipelineStep::Write.short_name(), "Wr");
        // All short names are exactly two characters (for aligned output).
        for step in PipelineStep::all() {
            assert_eq!(step.short_name().len(), 2);
        }
    }

    // ---- StepResult ----

    #[test]
    fn test_step_result_is_success() {
        assert!(StepResult::Success.is_success());
        assert!(!StepResult::OutputFull.is_success());
        assert!(!StepResult::InputEmpty.is_success());
    }

    #[test]
    fn test_step_result_variants() {
        let s = StepResult::Success;
        let o = StepResult::OutputFull;
        let i = StepResult::InputEmpty;
        assert_ne!(s, o);
        assert_ne!(s, i);
        assert_ne!(o, i);
    }

    // ---- Batch types ----

    #[test]
    fn test_raw_block_batch_new_empty() {
        let batch = RawBlockBatch::new();
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
    }

    #[test]
    fn test_raw_block_batch_with_capacity() {
        let batch = RawBlockBatch::with_capacity(32);
        assert!(batch.is_empty());
        assert!(batch.blocks.capacity() >= 32);
    }

    #[test]
    fn test_compressed_block_batch_new() {
        let batch = CompressedBlockBatch::new();
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
        assert_eq!(batch.record_count, 0);
    }

    #[test]
    fn test_compressed_block_batch_clear() {
        // clear() resets the count and drops the secondary payload.
        let mut batch = CompressedBlockBatch::new();
        batch.record_count = 42;
        batch.secondary_data = Some(vec![1, 2, 3]);
        batch.clear();
        assert!(batch.is_empty());
        assert_eq!(batch.record_count, 0);
        assert!(batch.secondary_data.is_none());
    }

    #[test]
    fn test_bgzf_batch_config_default() {
        let config = BgzfBatchConfig::default();
        assert_eq!(config.blocks_per_batch, 16);
        assert_eq!(config.compression_level, 6);
    }

    #[test]
    fn test_bgzf_batch_config_new() {
        // `new` overrides the batch size but keeps the default level.
        let config = BgzfBatchConfig::new(64);
        assert_eq!(config.blocks_per_batch, 64);
        assert_eq!(config.compression_level, 6);
    }

    #[test]
    fn test_decompressed_batch_new_empty() {
        let batch = DecompressedBatch::new();
        assert!(batch.is_empty());
        assert!(batch.data.is_empty());
    }

    #[test]
    fn test_serialized_batch_clear() {
        let mut batch = SerializedBatch::new();
        batch.data.extend_from_slice(&[1, 2, 3]);
        batch.record_count = 10;
        batch.secondary_data = Some(vec![4, 5, 6]);
        batch.clear();
        assert!(batch.is_empty());
        assert_eq!(batch.record_count, 0);
        assert!(batch.secondary_data.is_none());
    }

    // ---- PipelineConfig ----

    #[test]
    fn test_pipeline_config_new_defaults() {
        let config = PipelineConfig::new(4, 6);
        assert_eq!(config.num_threads, 4);
        assert_eq!(config.compression_level, 6);
        assert_eq!(config.queue_capacity, 64);
        assert_eq!(config.batch_size, 1);
        assert_eq!(config.queue_memory_limit, 0);
        assert!(!config.collect_stats);
    }

    #[test]
    fn test_pipeline_config_builder_chain() {
        let config = PipelineConfig::new(4, 6)
            .with_compression_level(9)
            .with_batch_size(100)
            .with_stats(true)
            .with_queue_memory_limit(1_000_000);
        assert_eq!(config.compression_level, 9);
        assert_eq!(config.batch_size, 100);
        assert!(config.collect_stats);
        assert_eq!(config.queue_memory_limit, 1_000_000);
    }

    #[test]
    fn test_pipeline_config_auto_tuned_1_thread() {
        let config = PipelineConfig::auto_tuned(1, 6);
        assert_eq!(config.num_threads, 1);
        assert_eq!(config.queue_capacity, 64);
    }

    #[test]
    fn test_pipeline_config_auto_tuned_8_threads() {
        // Auto-tuning scales queue capacity and read batch with threads.
        let config = PipelineConfig::auto_tuned(8, 6);
        assert_eq!(config.num_threads, 8);
        assert_eq!(config.queue_capacity, 128);
        assert_eq!(config.blocks_per_read_batch, 48);
    }

    #[test]
    fn test_pipeline_config_auto_tuned_32_threads() {
        let config = PipelineConfig::auto_tuned(32, 6);
        assert_eq!(config.num_threads, 32);
        assert_eq!(config.queue_capacity, 256);
    }

    #[test]
    fn test_pipeline_config_with_compression_level() {
        let config = PipelineConfig::new(4, 6).with_compression_level(12);
        assert_eq!(config.compression_level, 12);
    }

    #[test]
    fn test_pipeline_config_with_batch_size_min_1() {
        // A zero batch size is clamped to 1.
        let config = PipelineConfig::new(4, 6).with_batch_size(0);
        assert_eq!(config.batch_size, 1);
    }

    #[test]
    fn test_pipeline_config_with_queue_memory_limit() {
        let config = PipelineConfig::new(4, 6).with_queue_memory_limit(500_000_000);
        assert_eq!(config.queue_memory_limit, 500_000_000);
    }

    // ---- PipelineValidationError ----

    #[test]
    fn test_pipeline_validation_error_display_empty() {
        let err = PipelineValidationError {
            non_empty_queues: vec![],
            counter_mismatches: vec![],
            leaked_heap_bytes: 0,
        };
        let display = format!("{err}");
        assert!(display.contains("Pipeline validation failed"));
    }

    #[test]
    fn test_pipeline_validation_error_display_full() {
        // Display should surface every queue name, mismatch, and byte count.
        let err = PipelineValidationError {
            non_empty_queues: vec!["Q1".to_string(), "Q2".to_string()],
            counter_mismatches: vec!["read_count != write_count".to_string()],
            leaked_heap_bytes: 1024,
        };
        let display = format!("{err}");
        assert!(display.contains("Pipeline validation failed"));
        assert!(display.contains("Q1"));
        assert!(display.contains("Q2"));
        assert!(display.contains("read_count != write_count"));
        assert!(display.contains("1024"));
    }

    // ---- extract_panic_message ----

    #[test]
    fn test_extract_panic_message_str() {
        let payload: Box<dyn std::any::Any + Send> = Box::new("something went wrong");
        let msg = extract_panic_message(payload);
        assert_eq!(msg, "something went wrong");
    }

    #[test]
    fn test_extract_panic_message_string() {
        let payload: Box<dyn std::any::Any + Send> = Box::new(String::from("an error occurred"));
        let msg = extract_panic_message(payload);
        assert_eq!(msg, "an error occurred");
    }

    #[test]
    fn test_extract_panic_message_other() {
        // Non-string payloads fall back to a generic message.
        let payload: Box<dyn std::any::Any + Send> = Box::new(42_i32);
        let msg = extract_panic_message(payload);
        assert_eq!(msg, "Unknown panic");
    }

    // ---- WorkerCoreState backoff ----

    #[test]
    fn test_worker_core_state_initial_values() {
        use super::super::scheduler::SchedulerStrategy;
        let state = WorkerCoreState::new(6, 0, 4, SchedulerStrategy::default(), ActiveSteps::all());
        assert_eq!(state.backoff_us, MIN_BACKOFF_US);
    }

    #[test]
    fn test_worker_core_state_reset_backoff() {
        use super::super::scheduler::SchedulerStrategy;
        let mut state =
            WorkerCoreState::new(6, 0, 4, SchedulerStrategy::default(), ActiveSteps::all());
        state.increase_backoff();
        assert!(state.backoff_us > MIN_BACKOFF_US);
        state.reset_backoff();
        assert_eq!(state.backoff_us, MIN_BACKOFF_US);
    }

    #[test]
    fn test_worker_core_state_increase_backoff() {
        use super::super::scheduler::SchedulerStrategy;
        let mut state =
            WorkerCoreState::new(6, 0, 4, SchedulerStrategy::default(), ActiveSteps::all());
        // Exponential backoff: doubles per call, capped at MAX_BACKOFF_US.
        assert_eq!(state.backoff_us, MIN_BACKOFF_US);
        state.increase_backoff();
        assert_eq!(state.backoff_us, MIN_BACKOFF_US * 2);
        state.increase_backoff();
        assert_eq!(state.backoff_us, MIN_BACKOFF_US * 4);
        for _ in 0..20 {
            state.increase_backoff();
        }
        assert_eq!(state.backoff_us, MAX_BACKOFF_US);
    }

    // ---- OutputPipelineQueues ----

    // Minimal MemoryEstimate impl used as the processed type in the
    // OutputPipelineQueues tests below.
    struct TestProcessed {
        size: usize,
    }

    impl MemoryEstimate for TestProcessed {
        fn estimate_heap_size(&self) -> usize {
            self.size
        }
    }

    #[test]
    fn test_output_queues_new() {
        let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
        let queues: OutputPipelineQueues<(), TestProcessed> =
            OutputPipelineQueues::new(16, output, None, "test");
        assert!(queues.groups.is_empty());
        assert!(queues.processed.is_empty());
        assert!(queues.serialized.is_empty());
        assert!(queues.compressed.is_empty());
    }

    #[test]
    fn test_output_queues_set_take_error() {
        let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
        let queues: OutputPipelineQueues<(), TestProcessed> =
            OutputPipelineQueues::new(16, output, None, "test");
        assert!(!queues.has_error());
        queues.set_error(io::Error::other("test error"));
        assert!(queues.has_error());
        let err = queues.take_error();
        assert!(err.is_some());
        assert_eq!(err.unwrap().to_string(), "test error");
    }

    #[test]
    fn test_output_queues_draining() {
        let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
        let queues: OutputPipelineQueues<(), TestProcessed> =
            OutputPipelineQueues::new(16, output, None, "test");
        assert!(!queues.is_draining());
        queues.set_draining(true);
        assert!(queues.is_draining());
    }

    #[test]
    fn test_output_queues_queue_depths_empty() {
        let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
        let queues: OutputPipelineQueues<(), TestProcessed> =
            OutputPipelineQueues::new(16, output, None, "test");
        let depths = queues.queue_depths();
        assert_eq!(depths.groups, 0);
        assert_eq!(depths.processed, 0);
        assert_eq!(depths.serialized, 0);
        assert_eq!(depths.compressed, 0);
    }

    #[test]
    fn test_output_queues_are_queues_empty() {
        let output: Box<dyn std::io::Write + Send> = Box::new(Vec::<u8>::new());
        let queues: OutputPipelineQueues<(), TestProcessed> =
            OutputPipelineQueues::new(16, output, None, "test");
        assert!(queues.are_queues_empty());
    }

    // ---- MemoryEstimate implementations ----

    #[test]
    fn test_memory_estimate_unit() {
        let unit = ();
        assert_eq!(unit.estimate_heap_size(), 0);
    }

    #[test]
    fn test_decoded_record_record_accessor() {
        // A decoded record holds either a parsed RecordBuf or raw bytes —
        // never both.
        let rec = RecordBuf::default();
        let parsed = DecodedRecord::new(rec, GroupKey::default());
        assert!(parsed.record().is_some());
        assert!(parsed.raw_bytes().is_none());

        let raw = DecodedRecord::from_raw_bytes(vec![0u8; 32], GroupKey::default());
        assert!(raw.record().is_none());
        assert!(raw.raw_bytes().is_some());
    }

    #[test]
    fn test_memory_estimate_serialized_batch() {
        // Estimates are capacity-based, so a reserve shows up even with
        // zero length.
        let mut batch = SerializedBatch::new();
        batch.data.reserve(1024);
        assert!(batch.estimate_heap_size() >= 1024);
    }

    #[test]
    fn test_memory_estimate_decompressed_batch() {
        let mut batch = DecompressedBatch::new();
        batch.data.reserve(2048);
        assert!(batch.estimate_heap_size() >= 2048);
    }

    #[test]
    fn test_memory_estimate_vec_record_buf() {
        use crate::sam::builder::RecordBuilder;
        use noodles::sam::alignment::record_buf::RecordBuf;

        let record = RecordBuilder::new().sequence("ACGT").qualities(&[30, 30, 30, 30]).build();
        let mut records: Vec<RecordBuf> = Vec::with_capacity(10);
        records.push(record);

        // The estimate must cover the Vec's own capacity, not just the
        // one pushed element.
        let estimate = records.estimate_heap_size();
        let vec_overhead = 10 * std::mem::size_of::<RecordBuf>();
        assert!(
            estimate >= vec_overhead,
            "estimate {estimate} should include Vec<RecordBuf> overhead {vec_overhead}"
        );
    }

    #[test]
    fn test_serialized_batch_memory_estimate_with_secondary() {
        let batch = SerializedBatch {
            data: vec![0u8; 100],
            record_count: 5,
            secondary_data: Some(vec![0u8; 50]),
        };
        let estimate = batch.estimate_heap_size();
        assert!(
            estimate >= 150,
            "Should include both primary ({}) and secondary data, got {estimate}",
            batch.data.capacity()
        );
    }

    #[test]
    fn test_serialized_batch_memory_estimate_without_secondary() {
        let batch = SerializedBatch { data: vec![0u8; 100], record_count: 5, secondary_data: None };
        let estimate = batch.estimate_heap_size();
        assert!(estimate >= 100);
        assert!(estimate < 150, "Should not include phantom secondary data, got {estimate}");
    }

    #[test]
    fn test_compressed_block_batch_memory_with_secondary() {
        let batch = CompressedBlockBatch {
            blocks: vec![],
            record_count: 0,
            secondary_data: Some(vec![0u8; 200]),
        };
        assert!(
            batch.estimate_heap_size() >= 200,
            "Should include secondary data, got {}",
            batch.estimate_heap_size()
        );
    }
}