// ragc_core/worker.rs

1// Worker thread implementation for streaming compression pipeline
2// Matches C++ AGC's start_compressing_threads (agc_compressor.cpp:1097-1270)
3
4use crate::contig_compression::{compress_contig, CompressionContext};
5use crate::priority_queue::{BoundedPriorityQueue, PopResult};
6use crate::segment_buffer::BufferedSegments;
7use crate::segment_compression::compress_reference_segment;
8use crate::task::{ContigProcessingStage, Task};
9use crate::zstd_pool::compress_segment_pooled;
10use ahash::{AHashMap, AHashSet};
11use ragc_common::{Archive, Contig};
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::{Arc, Barrier, Mutex};
14
/// Segment placement info for collection metadata
///
/// One record per stored segment; buffered in `store_segments` and flushed
/// to the collection descriptor in batches.
///
/// Matches C++ AGC's segment placement tracking (agc_compressor.cpp:974-1050)
#[derive(Debug, Clone)]
struct SegmentPlacement {
    sample_name: String, // Sample this segment belongs to
    contig_name: String, // Contig (within the sample) this segment came from
    place: usize, // Segment index within contig (seg_part_no)
    group_id: u32, // Segment group the data was written to
    in_group_id: u32, // Position within the group (0 = reference segment)
    is_rev_comp: bool, // Orientation flag as reported by the segment buffer
    raw_length: u32, // Uncompressed segment length
}
28
/// Segment group manager matching C++ AGC's CSegment
///
/// Manages a group of segments with the same (kmer1, kmer2) pair:
/// - Stores reference segment (first in group)
/// - Buffers subsequent delta segments
/// - Writes to archive streams
struct SegmentGroup {
    group_id: u32,             // Group identifier (index into v_segments)
    stream_id: usize,          // Delta stream for packed segments
    ref_stream_id: usize,      // Reference stream for first segment
    reference: Option<Contig>, // First segment (reference for LZ encoding)
    ref_written: bool,         // Whether reference has been written
    in_group_counter: u32,     // Counter for in_group_id assignment
}
43
44impl SegmentGroup {
45    fn new(group_id: u32, stream_id: usize, ref_stream_id: usize) -> Self {
46        SegmentGroup {
47            group_id,
48            stream_id,
49            ref_stream_id,
50            reference: None,
51            ref_written: false,
52            in_group_counter: 0,
53        }
54    }
55
56    /// Add a segment to this group
57    ///
58    /// The first segment becomes the reference, subsequent segments are delta-encoded.
59    /// Returns in_group_id (0 for reference, 1+ for delta segments)
60    fn add_segment(&mut self, seg_data: &[u8], archive: &mut Archive) -> anyhow::Result<u32> {
61        if self.reference.is_none() {
62            // First segment - store as reference with adaptive compression
63            let seg_vec = seg_data.to_vec();
64            self.reference = Some(seg_vec.clone());
65
66            // Compress reference segment (chooses between plain ZSTD or tuple packing)
67            let (compressed, marker) = compress_reference_segment(&seg_vec)?;
68
69            // Write compressed reference to archive with marker byte as metadata
70            archive.add_part(self.ref_stream_id, &compressed, marker as u64)?;
71            self.ref_written = true;
72
73            let in_group_id = self.in_group_counter;
74            self.in_group_counter += 1;
75            Ok(in_group_id) // in_group_id = 0 for reference
76        } else {
77            // Subsequent segment - delta encode against reference
78            // TODO: Implement LZ encoding - for now just compress with ZSTD
79            let seg_vec = seg_data.to_vec();
80            let compressed = compress_segment_pooled(&seg_vec, 17)?;
81
82            // Write compressed delta to archive (marker 0 = plain ZSTD)
83            archive.add_part(self.stream_id, &compressed, 0)?;
84
85            let in_group_id = self.in_group_counter;
86            self.in_group_counter += 1;
87            Ok(in_group_id)
88        }
89    }
90}
91
/// Shared state accessible to all worker threads
///
/// This matches C++ AGC's shared variables captured by lambda (agc_compressor.cpp:1099):
/// - Atomic counters (processed_bases, processed_samples)
/// - Mutexes for shared collections (v_raw_contigs, vv_fallback_minimizers, etc.)
/// - Archive writer, collection descriptor, etc.
pub struct SharedCompressorState {
    /// Total bases processed (for progress tracking)
    pub processed_bases: AtomicUsize,

    /// Number of samples processed (for progress tracking)
    pub processed_samples: AtomicUsize,

    /// Contigs that failed to compress well (need adaptive splitters)
    /// Tuples are (sample_name, contig_name, sequence).
    /// Matches C++ AGC's v_raw_contigs (protected by mtx_raw_contigs)
    pub raw_contigs: Mutex<Vec<(String, String, Vec<u8>)>>,

    /// Verbosity level (0 = quiet, 1 = normal, 2 = verbose)
    pub verbosity: usize,

    /// Buffered segments (KNOWN + NEW)
    /// Matches C++ AGC's CBufferedSegPart buffered_seg_part
    pub buffered_segments: Arc<Mutex<BufferedSegments>>,

    /// Splitter k-mers (exact set)
    /// Matches C++ AGC's hs_splitters
    pub splitters: Arc<Mutex<AHashSet<u64>>>,

    /// Bloom filter for splitters (fast probabilistic check)
    /// Matches C++ AGC's bloom_splitters
    pub bloom_splitters: Arc<Mutex<crate::bloom_filter::BloomFilter>>,

    /// Per-thread vectors for accumulating new splitters (adaptive mode)
    /// Matches C++ AGC's vv_splitters (agc_compressor.h:721)
    /// Each worker thread accumulates splitters, then merges at barrier
    /// NOTE(review): `new()` initializes this empty; it must hold one Vec per
    /// worker before threads index it by thread id — confirm where that
    /// sizing happens (find_new_splitters indexes by thread_id).
    pub vv_splitters: Mutex<Vec<Vec<u64>>>,

    /// Reference genome singleton k-mers (for adaptive mode exclusion)
    /// Matches C++ AGC's v_candidate_kmers (agc_compressor.h:708)
    /// Sorted for efficient set_difference operations
    pub v_candidate_kmers: Vec<u64>,

    /// Reference genome duplicated k-mers (for adaptive mode exclusion)
    /// Matches C++ AGC's v_duplicated_kmers (agc_compressor.h:710)
    /// Sorted for efficient set_difference operations
    pub v_duplicated_kmers: Vec<u64>,

    /// K-mer length for segmentation
    pub kmer_length: usize,

    /// Adaptive compression mode (find new splitters for hard contigs)
    pub adaptive_mode: bool,

    /// Map: (kmer1, kmer2) → group_id
    /// Matches C++ AGC's map_segments
    /// Lower group_id wins when same (k1, k2) pair (earlier samples are reference)
    pub map_segments: Arc<Mutex<AHashMap<(u64, u64), u32>>>,

    /// Map: kmer → Vec<kmer> (sorted terminators)
    /// Matches C++ AGC's map_segments_terminators
    /// Used for split detection: find shared terminators between k1 and k2
    pub map_segments_terminators: Arc<Mutex<AHashMap<u64, Vec<u64>>>>,

    /// Concatenated genomes mode (treat all contigs as one sample)
    pub concatenated_genomes: bool,

    /// Total number of segment groups (assigned group IDs)
    /// Matches C++ AGC's no_segments
    pub no_segments: Arc<Mutex<u32>>,

    /// Number of raw groups (groups without LZ encoding)
    /// Matches C++ AGC's no_raw_groups
    pub no_raw_groups: u32,

    /// Archive writer for segment output; None until the archive is created
    /// Matches C++ AGC's out_archive
    pub archive: Option<Arc<Mutex<Archive>>>,

    /// Segment groups indexed by group_id
    /// Matches C++ AGC's v_segments (vector<CSegment*>)
    /// Lazily created when first segment arrives for that group
    pub v_segments: Arc<Mutex<Vec<Option<SegmentGroup>>>>,

    /// Collection metadata for samples/contigs/segments; None until created
    /// Matches C++ AGC's collection_desc
    pub collection: Option<Arc<Mutex<ragc_common::CollectionV3>>>,

    /// Auxiliary queue for adaptive mode (re-enqueue hard contigs)
    /// Matches C++ AGC's pq_contigs_desc_aux
    pub aux_queue: Arc<BoundedPriorityQueue<Task>>,

    /// Working queue pointer (switches between main and aux)
    /// Matches C++ AGC's pq_contigs_desc_working
    pub working_queue: Mutex<Arc<BoundedPriorityQueue<Task>>>,
    // TODO: Add more shared state as needed:
    // - vv_fallback_minimizers: Mutex<Vec<Vec<Vec<u64>>>>
    // - vv_splitters: Mutex<Vec<Vec<u64>>>
}
190
191impl SharedCompressorState {
192    /// Create new shared state
193    pub fn new(
194        verbosity: usize,
195        kmer_length: usize,
196        adaptive_mode: bool,
197        concatenated_genomes: bool,
198        no_raw_groups: u32,
199        main_queue: Arc<BoundedPriorityQueue<Task>>,
200    ) -> Self {
201        // Create bloom filter sized for expected splitters
202        // C++ AGC uses ~10K splitters, allocate 8 bits/item = 80K bits = 10KB
203        let bloom_filter = crate::bloom_filter::BloomFilter::new(80 * 1024);
204
205        // Create auxiliary queue for adaptive mode (unlimited capacity)
206        let aux_queue = Arc::new(BoundedPriorityQueue::new(1, usize::MAX));
207
208        SharedCompressorState {
209            processed_bases: AtomicUsize::new(0),
210            processed_samples: AtomicUsize::new(0),
211            raw_contigs: Mutex::new(Vec::new()),
212            verbosity,
213            buffered_segments: Arc::new(Mutex::new(BufferedSegments::new(no_raw_groups as usize))),
214            splitters: Arc::new(Mutex::new(AHashSet::new())),
215            bloom_splitters: Arc::new(Mutex::new(bloom_filter)),
216            vv_splitters: Mutex::new(Vec::new()),
217            v_candidate_kmers: Vec::new(),
218            v_duplicated_kmers: Vec::new(),
219            kmer_length,
220            adaptive_mode,
221            map_segments: Arc::new(Mutex::new(AHashMap::new())),
222            map_segments_terminators: Arc::new(Mutex::new(AHashMap::new())),
223            concatenated_genomes,
224            no_segments: Arc::new(Mutex::new(0)),
225            no_raw_groups,
226            archive: None, // Set later when archive is created
227            v_segments: Arc::new(Mutex::new(Vec::new())),
228            collection: None, // Set later when collection is created
229            aux_queue,
230            working_queue: Mutex::new(main_queue), // Start with main queue
231        }
232    }
233}
234
235/// Worker thread main loop
236///
237/// This matches C++ AGC's worker lambda (agc_compressor.cpp:1099-1270):
238/// ```cpp
239/// v_threads.emplace_back([&, i, n_t]() {
240///     auto zstd_cctx = ZSTD_createCCtx();
241///     auto zstd_dctx = ZSTD_createDCtx();
242///     uint32_t thread_id = i;
243///
244///     while(true) {
245///         task_t task;
246///         auto q_res = pq_contigs_desc_working->PopLarge(task);
247///         // ... process task ...
248///     }
249///
250///     ZSTD_freeCCtx(zstd_cctx);
251///     ZSTD_freeDCtx(zstd_dctx);
252/// });
253/// ```
254///
255/// # Arguments
256/// * `worker_id` - Thread ID (0 to num_workers-1)
257/// * `queue` - Priority queue for pulling tasks
258/// * `barrier` - Synchronization barrier for registration/new_splitters stages
259/// * `shared` - Shared state accessible to all workers
260pub fn worker_thread(
261    worker_id: usize,
262    num_workers: usize,
263    _queue: Arc<BoundedPriorityQueue<Task>>, // Legacy parameter, use shared.working_queue instead
264    barrier: Arc<Barrier>,
265    shared: Arc<SharedCompressorState>,
266) {
267    // TODO: Create per-thread ZSTD contexts
268    // let mut zstd_encoder = ...;
269    // let mut zstd_decoder = ...;
270
271    loop {
272        // Pop task from working queue (can switch between main and aux)
273        // Matches C++ AGC's pq_contigs_desc_working (agc_compressor.cpp:1108)
274        let queue = {
275            let working_queue_guard = shared.working_queue.lock().unwrap();
276            Arc::clone(&*working_queue_guard)
277        };
278        let (result, task_opt) = queue.pop_large();
279
280        match result {
281            // Queue empty but producers still active - retry
282            PopResult::Empty => continue,
283
284            // Queue empty and no producers - exit worker loop
285            PopResult::Completed => break,
286
287            // Successfully got a task - process it
288            PopResult::Normal => {
289                let task = task_opt.expect("PopResult::Normal should have task");
290
291                match task.stage {
292                    ContigProcessingStage::Registration => {
293                        // Handle registration synchronization (agc_compressor.cpp:1114-1185)
294                        handle_registration_stage(worker_id, &barrier, &shared);
295                        continue; // Return to top of loop
296                    }
297
298                    ContigProcessingStage::NewSplitters => {
299                        // Handle adaptive splitter finding (agc_compressor.cpp:1187-1237)
300                        handle_new_splitters_stage(worker_id, num_workers, &barrier, &shared);
301                        continue; // Return to top of loop
302                    }
303
304                    ContigProcessingStage::AllContigs => {
305                        // Preprocess contig before compression (agc_compressor.cpp:1241)
306                        // TODO: preprocess_raw_contig(&task.sequence);
307                    }
308
309                    ContigProcessingStage::HardContigs => {
310                        // Hard contigs don't need preprocessing (already normalized)
311                    }
312                }
313
314                // Compress contig (AllContigs and HardContigs both use this path)
315                let ctg_size = task.sequence.len();
316
317                if compress_contig_task(&task, worker_id, &barrier, &shared) {
318                    // Success: update progress counter (agc_compressor.cpp:1248-1255)
319                    let old_pb = shared
320                        .processed_bases
321                        .fetch_add(ctg_size, Ordering::Relaxed);
322                    let new_pb = old_pb + ctg_size;
323
324                    // Print progress every 10 MB
325                    if shared.verbosity > 0 && old_pb / 10_000_000 != new_pb / 10_000_000 {
326                        eprintln!("Compressed: {} Mb\r", new_pb / 1_000_000);
327                    }
328                } else {
329                    // Failed to compress well: save for reprocessing (agc_compressor.cpp:1258-1261)
330                    let mut raw_contigs = shared.raw_contigs.lock().unwrap();
331                    raw_contigs.push((
332                        task.sample_name.clone(),
333                        task.contig_name.clone(),
334                        task.sequence.clone(),
335                    ));
336                }
337
338                // Task memory automatically freed (Rust Drop)
339            }
340        }
341    }
342
343    // ZSTD contexts automatically freed (Rust Drop)
344}
345
/// Handle registration stage synchronization
///
/// Matches C++ AGC's registration logic (agc_compressor.cpp:1114-1185)
/// See BARRIER_USAGE_PATTERN.md for detailed documentation.
///
/// Four-barrier protocol: (1) gather, (2) leader registers new groups,
/// (3) all threads store segments in parallel, (4) designated threads
/// clean up / flush, then everyone resumes pulling tasks.
fn handle_registration_stage(
    worker_id: usize,
    barrier: &Arc<Barrier>,
    shared: &Arc<SharedCompressorState>,
) {
    // Barrier 1: All workers arrive
    let wait_result = barrier.wait();

    // Leader thread: register segments and process fallbacks.
    // Note: std::sync::Barrier designates exactly one (arbitrary) thread
    // as leader — not necessarily worker 0.
    if wait_result.is_leader() {
        register_segments(shared);
        // TODO: process_fallback_minimizers(shared);
    }

    // Barrier 2: Wait for registration complete
    barrier.wait();

    // All threads: store their segments (parallel, block-claimed work)
    store_segments(worker_id, shared);

    // Barrier 3: Wait for storage complete
    barrier.wait();

    // Thread 0 or 1: cleanup and flush (C++ AGC pattern: BOTH threads do work!)
    // See agc_compressor.cpp:1136-1180
    if worker_id == 0 {
        // TODO: buffered_seg_part.clear(max(1, num_workers - 1));
        // TODO: update processed_samples
        // TODO: store_contig_batch if needed
        // TODO: flush archive buffers
        // TODO: if adaptive_compression: switch back to main queue
    } else if worker_id == 1 {
        // TODO: update processed_samples (same as thread 0!)
        // TODO: store_contig_batch if needed (same as thread 0!)
        // TODO: flush archive buffers (same as thread 0!)
    }

    // Barrier 4: All ready to continue
    barrier.wait();
}
390
391/// Find new splitters for a hard contig
392///
393/// Matches C++ AGC's find_new_splitters (agc_compressor.cpp:2046-2077)
394///
395/// Algorithm:
396/// 1. Extract all k-mers from contig and find singletons
397/// 2. Exclude k-mers present in reference genome (singletons)
398/// 3. Exclude k-mers present in reference genome (duplicates)
399/// 4. Add remaining k-mers to thread-local splitter vector
400///
401/// # Arguments
402/// * `contig` - The contig sequence that needs new splitters
403/// * `thread_id` - Worker thread ID for vv_splitters indexing
404/// * `shared` - Shared compressor state with reference k-mers
405fn find_new_splitters(
406    contig: &ragc_common::Contig,
407    thread_id: usize,
408    shared: &Arc<SharedCompressorState>,
409) {
410    use crate::kmer_extract::{enumerate_kmers, remove_non_singletons};
411
412    // Step 1: Extract k-mers from contig and find singletons
413    let mut v_contig_kmers = enumerate_kmers(contig, shared.kmer_length);
414    v_contig_kmers.sort_unstable();
415    remove_non_singletons(&mut v_contig_kmers, 0);
416
417    if shared.verbosity > 1 {
418        eprintln!(
419            "find_new_splitters: contig has {} singleton k-mers",
420            v_contig_kmers.len()
421        );
422    }
423
424    // Step 2: Exclude k-mers in reference genome (singletons)
425    // C++ AGC uses v_candidate_kmers_offset to skip some candidates
426    // For now, we use the full v_candidate_kmers (offset=0)
427    let mut v_tmp = Vec::with_capacity(v_contig_kmers.len());
428    set_difference(&v_contig_kmers, &shared.v_candidate_kmers, &mut v_tmp);
429
430    if shared.verbosity > 1 {
431        eprintln!(
432            "find_new_splitters: {} k-mers after excluding reference singletons",
433            v_tmp.len()
434        );
435    }
436
437    // Step 3: Exclude k-mers in reference genome (duplicates)
438    v_contig_kmers.clear();
439    set_difference(&v_tmp, &shared.v_duplicated_kmers, &mut v_contig_kmers);
440
441    if shared.verbosity > 1 {
442        eprintln!(
443            "find_new_splitters: {} NEW splitters found",
444            v_contig_kmers.len()
445        );
446    }
447
448    // Step 4: Add to thread-local splitter vector
449    let mut vv_splitters = shared.vv_splitters.lock().unwrap();
450    vv_splitters[thread_id].extend(v_contig_kmers);
451}
452
/// Compute the sorted set difference `a \ b` into `result`.
///
/// Both inputs must be sorted ascending; `result` is cleared first.
/// Mirrors C++ `std::set_difference` semantics (duplicates in `a` are
/// matched one-for-one against duplicates in `b`).
fn set_difference(a: &[u64], b: &[u64], result: &mut Vec<u64>) {
    use std::cmp::Ordering;

    result.clear();
    let (mut i, mut j) = (0usize, 0usize);

    // Classic two-pointer merge walk over the sorted inputs.
    while i < a.len() && j < b.len() {
        match a[i].cmp(&b[j]) {
            Ordering::Less => {
                // a[i] is not in b — keep it.
                result.push(a[i]);
                i += 1;
            }
            Ordering::Greater => j += 1,
            Ordering::Equal => {
                // Present in both — drop this occurrence.
                i += 1;
                j += 1;
            }
        }
    }

    // Everything left in a has no counterpart in b.
    result.extend_from_slice(&a[i..]);
}
477
/// Handle new splitters stage synchronization
///
/// Matches C++ AGC's new_splitters logic (agc_compressor.cpp:1187-1237)
/// See BARRIER_USAGE_PATTERN.md for detailed documentation.
///
/// Merges per-thread splitter candidates into the global set/bloom filter,
/// re-enqueues hard contigs on the aux queue, and switches the working
/// queue to that aux queue for the reprocessing round.
fn handle_new_splitters_stage(
    worker_id: usize,
    num_workers: usize,
    barrier: &Arc<Barrier>,
    shared: &Arc<SharedCompressorState>,
) {
    // Barrier 1: All workers arrive
    barrier.wait();

    // bloom_insert logic: Insert new splitters into bloom filter and hash set
    // In C++ AGC, this is a lambda defined inline (lines 1191-1209)
    //
    // Multi-threaded execution: All threads merge in parallel
    // Single-threaded: Only thread 0 does the merge
    //
    // NOTE(review): with num_workers > 1 every thread enters this branch, but
    // they serialize on the three locks; the first thread does the full merge
    // and clears the per-thread vectors, so later threads see empty input.
    // Inserting into the (set-based) splitters twice would be benign anyway.
    if num_workers > 1 || worker_id == 0 {
        let mut splitters = shared.splitters.lock().unwrap();
        let mut bloom = shared.bloom_splitters.lock().unwrap();
        let mut vv_splitters = shared.vv_splitters.lock().unwrap();

        let mut total_new = 0;
        for thread_splitters in vv_splitters.iter_mut() {
            total_new += thread_splitters.len();
            for &kmer in thread_splitters.iter() {
                splitters.insert(kmer);
                bloom.insert(kmer);
            }
            // Leave the per-thread vector empty for the next round.
            thread_splitters.clear();
        }

        if shared.verbosity > 0 && total_new > 0 {
            eprintln!("Adaptive mode: Added {} new splitters", total_new);
            eprintln!("Total splitters: {}", splitters.len());
        }

        // Check bloom filter filling factor and resize if > 0.3 (matches C++ AGC)
        let filling_factor = bloom.filling_factor();
        if filling_factor > 0.3 {
            if shared.verbosity > 1 {
                eprintln!(
                    "Bloom filter filling factor {:.2}, resizing...",
                    filling_factor
                );
            }

            // Resize to accommodate current + expected growth
            let new_size_bits = (splitters.len() as f64 / 0.25) as usize * 8; // 25% target fill
            bloom.resize(new_size_bits);

            // Re-insert all splitters (resize discards prior contents)
            for &kmer in splitters.iter() {
                bloom.insert(kmer);
            }

            if shared.verbosity > 1 {
                eprintln!("Bloom filter resized to {} bits", new_size_bits);
            }
        }
    }

    // Thread 0: Re-enqueue hard contigs and switch queues
    // Matches C++ AGC lines 1216-1236
    if worker_id == 0 {
        let mut raw_contigs = shared.raw_contigs.lock().unwrap();

        if shared.verbosity > 0 && !raw_contigs.is_empty() {
            eprintln!(
                "Adaptive mode: Re-enqueueing {} hard contigs for reprocessing",
                raw_contigs.len()
            );
        }

        // Re-enqueue hard contigs into aux_queue with HardContigs stage
        // Priority = 1 (higher than normal contigs), cost = sequence length
        for (sample_name, contig_name, sequence) in raw_contigs.drain(..) {
            let cost = sequence.len();
            shared.aux_queue.emplace(
                Task::new_contig(
                    sample_name,
                    contig_name,
                    sequence,
                    ContigProcessingStage::HardContigs,
                ),
                1, // priority
                cost,
            );
        }

        // Enqueue registration sync tokens for next round
        // Priority = 0 (lowest), cost = 0
        // One token per worker so every thread reaches the barrier.
        shared.aux_queue.emplace_many_no_cost(
            Task::new_sync(ContigProcessingStage::Registration),
            0, // priority
            num_workers,
        );

        // Switch working queue to aux queue
        // Matches C++ AGC: pq_contigs_desc_working = pq_contigs_desc_aux
        let mut working_queue = shared.working_queue.lock().unwrap();
        *working_queue = Arc::clone(&shared.aux_queue);

        if shared.verbosity > 1 {
            eprintln!("Switched to auxiliary queue for hard contig reprocessing");
        }
    }

    // Barrier 2: Ready to continue with new splitters
    // All workers synchronized after splitter merge
    barrier.wait();
}
591
592/// Compress a contig task
593///
594/// Matches C++ AGC's compress_contig (agc_compressor.cpp:2000-2054)
595///
596/// Returns:
597/// - true: Contig compressed successfully
598/// - false: Contig didn't compress well (needs adaptive splitters)
599fn compress_contig_task(
600    task: &Task,
601    worker_id: usize,
602    _barrier: &Arc<Barrier>,
603    shared: &Arc<SharedCompressorState>,
604) -> bool {
605    // Create compression context from shared state
606    let ctx = CompressionContext {
607        splitters: Arc::clone(&shared.splitters),
608        bloom_splitters: Arc::clone(&shared.bloom_splitters),
609        buffered_segments: Arc::clone(&shared.buffered_segments),
610        kmer_length: shared.kmer_length,
611        adaptive_mode: shared.adaptive_mode,
612        map_segments: Arc::clone(&shared.map_segments),
613        map_segments_terminators: Arc::clone(&shared.map_segments_terminators),
614        concatenated_genomes: shared.concatenated_genomes,
615    };
616
617    // Call the actual compression function
618    let success = compress_contig(&task.sample_name, &task.contig_name, &task.sequence, &ctx);
619
620    // Adaptive mode: Handle hard contigs (C++ AGC agc_compressor.cpp:2033-2039)
621    if shared.adaptive_mode
622        && !success
623        && task.stage == ContigProcessingStage::AllContigs
624        && task.sequence.len() >= 1000
625    {
626        // This is a hard contig - no splitters found, and it's large enough to matter
627        if shared.verbosity > 1 {
628            eprintln!(
629                "Hard contig detected: {}/{} ({} bp)",
630                task.sample_name,
631                task.contig_name,
632                task.sequence.len()
633            );
634        }
635
636        // Find new splitters from this contig
637        find_new_splitters(&task.sequence, worker_id, shared);
638
639        // Store contig for reprocessing after NewSplitters barrier
640        let mut raw_contigs = shared.raw_contigs.lock().unwrap();
641        raw_contigs.push((
642            task.sample_name.clone(),
643            task.contig_name.clone(),
644            task.sequence.clone(),
645        ));
646
647        // Return false to indicate contig wasn't processed yet
648        return false;
649    }
650
651    success
652}
653
654/// Register segments - assign group IDs to NEW segments
655///
656/// Matches C++ AGC's register_segments (agc_compressor.cpp:954-971)
657///
658/// Called by leader thread only (thread 0) during registration barrier.
659///
660/// # Steps
661/// 1. Sort KNOWN segments by (sample, contig, part_no)
662/// 2. Process NEW segments - assign group IDs
663/// 3. Register archive streams (placeholder for now)
664/// 4. Distribute raw group segments (if applicable)
665/// 5. Restart read pointer for parallel storage
666fn register_segments(shared: &Arc<SharedCompressorState>) {
667    let mut buffered = shared.buffered_segments.lock().unwrap();
668
669    // Step 1: Sort KNOWN segments in parallel
670    // TODO: Implement parallel sort with rayon
671    // For now: sequential sort is already done in BufferedSegments
672    buffered.sort_known(1); // Single-threaded for now
673
674    // Step 2: Process NEW segments - assign group IDs
675    // Pass global map_segments so new segments can find existing groups
676    let no_new = buffered.process_new(&shared.map_segments);
677
678    drop(buffered);
679
680    if no_new > 0 {
681        // Step 3: Register archive streams for new groups
682        let mut no_segments = shared.no_segments.lock().unwrap();
683        let current_no_segments = *no_segments;
684
685        if let Some(archive_mutex) = &shared.archive {
686            let mut archive = archive_mutex.lock().unwrap();
687
688            // Register streams for each new group
689            // Each group gets two streams: "seg-N" (delta) and "seg_dN" (reference)
690            for i in 0..no_new {
691                let seg_num = current_no_segments + i;
692                // Use AGC v3 format (version 3000)
693                archive.register_stream(&ragc_common::stream_delta_name(3000, seg_num));
694                archive.register_stream(&ragc_common::stream_ref_name(3000, seg_num));
695            }
696        }
697
698        *no_segments += no_new;
699        drop(no_segments);
700
701        // Step 4: Resize v_segments vector to accommodate new groups
702        let no_segments_value = *shared.no_segments.lock().unwrap();
703        let mut v_segments = shared.v_segments.lock().unwrap();
704        v_segments.resize_with(no_segments_value as usize, || None);
705    }
706
707    // Step 5: Distribute raw group segments (if applicable)
708    if shared.no_raw_groups > 0 {
709        let buffered = shared.buffered_segments.lock().unwrap();
710        buffered.distribute_segments(0, 0, shared.no_raw_groups);
711    }
712
713    // Step 6: Restart read pointer for parallel storage
714    let buffered = shared.buffered_segments.lock().unwrap();
715    buffered.restart_read_vec();
716}
717
718/// Store segments - write all buffered segments to archive
719///
720/// Matches C++ AGC's store_segments (agc_compressor.cpp:974-1050)
721///
722/// Called by ALL worker threads in parallel during registration barrier.
723///
724/// # Algorithm
725/// 1. Atomic get_vec_id() to claim a block of groups
726/// 2. For each group in block:
727///    - get_part() to pop segments
728///    - Create CSegment object on first use (lazy initialization)
729///    - Update map_segments and map_segments_terminators (CRITICAL!)
730///    - Write segment to archive
731/// 3. Batch collection metadata updates (every 32 segments)
732fn store_segments(_worker_id: usize, shared: &Arc<SharedCompressorState>) {
733    const MAX_BUFF_SIZE: usize = 32;
734    let mut buffered_placements: Vec<SegmentPlacement> = Vec::with_capacity(MAX_BUFF_SIZE);
735
736    let buffered = shared.buffered_segments.lock().unwrap();
737
738    loop {
739        // Step 1: Atomic block allocation
740        let block_group_id = buffered.get_vec_id();
741
742        if block_group_id < 0 {
743            break; // No more blocks
744        }
745
746        // Step 2: Process groups in block (backwards iteration)
747        for group_id in
748            (block_group_id - crate::segment_buffer::PART_ID_STEP + 1..=block_group_id).rev()
749        {
750            if buffered.is_empty_part(group_id) {
751                continue;
752            }
753
754            // Step 3: Process all segments in this group
755            while let Some((
756                kmer1,
757                kmer2,
758                sample_name,
759                contig_name,
760                seg_data,
761                is_rev_comp,
762                seg_part_no,
763            )) = buffered.get_part(group_id)
764            {
765                // Step 4: Create SegmentGroup on first use (lazy initialization)
766                let mut v_segments = shared.v_segments.lock().unwrap();
767                if group_id >= 0
768                    && (group_id as usize) < v_segments.len()
769                    && v_segments[group_id as usize].is_none()
770                {
771                    // Get stream IDs for this group
772                    if let Some(archive_mutex) = &shared.archive {
773                        let archive = archive_mutex.lock().unwrap();
774                        let stream_id = archive
775                            .get_stream_id(&ragc_common::stream_delta_name(3000, group_id as u32))
776                            .unwrap_or(0);
777                        let ref_stream_id = archive
778                            .get_stream_id(&ragc_common::stream_ref_name(3000, group_id as u32))
779                            .unwrap_or(0);
780                        drop(archive);
781
782                        v_segments[group_id as usize] =
783                            Some(SegmentGroup::new(group_id as u32, stream_id, ref_stream_id));
784                    }
785                }
786
787                // Step 5: Write segment to archive and get in_group_id
788                let mut in_group_id = 0;
789                if group_id >= 0 && (group_id as usize) < v_segments.len() {
790                    if let Some(segment_group) = &mut v_segments[group_id as usize] {
791                        if let Some(archive_mutex) = &shared.archive {
792                            let mut archive = archive_mutex.lock().unwrap();
793                            if let Ok(id) = segment_group.add_segment(&seg_data, &mut archive) {
794                                in_group_id = id;
795                            }
796                        }
797                    }
798                }
799                drop(v_segments);
800
801                // Step 6: Buffer collection metadata
802                buffered_placements.push(SegmentPlacement {
803                    sample_name,
804                    contig_name,
805                    place: seg_part_no as usize,
806                    group_id: group_id as u32,
807                    in_group_id,
808                    is_rev_comp,
809                    raw_length: seg_data.len() as u32,
810                });
811
812                // Flush batch if full
813                if buffered_placements.len() >= MAX_BUFF_SIZE {
814                    if let Some(collection_mutex) = &shared.collection {
815                        let mut collection = collection_mutex.lock().unwrap();
816                        for placement in &buffered_placements {
817                            let _ = collection.add_segment_placed(
818                                &placement.sample_name,
819                                &placement.contig_name,
820                                placement.place,
821                                placement.group_id,
822                                placement.in_group_id,
823                                placement.is_rev_comp,
824                                placement.raw_length,
825                            );
826                        }
827                    }
828                    buffered_placements.clear();
829                }
830
831                // Step 6: Update map_segments (CRITICAL REGISTRATION!)
832                let key = (kmer1, kmer2);
833                let mut map_segments = shared.map_segments.lock().unwrap();
834                map_segments
835                    .entry(key)
836                    .and_modify(|existing| {
837                        // Keep lower group_id (earlier samples are reference)
838                        if (group_id as u32) < *existing {
839                            *existing = group_id as u32;
840                        }
841                    })
842                    .or_insert(group_id as u32);
843                drop(map_segments);
844
845                // Step 7: Update map_segments_terminators (CRITICAL!)
846                if kmer1 != crate::contig_compression::MISSING_KMER
847                    && kmer2 != crate::contig_compression::MISSING_KMER
848                {
849                    let mut terminators = shared.map_segments_terminators.lock().unwrap();
850
851                    // Add k2 to k1's terminator list
852                    let list1 = terminators.entry(kmer1).or_insert_with(Vec::new);
853                    if !list1.contains(&kmer2) {
854                        list1.push(kmer2);
855                        list1.sort_unstable();
856                    }
857
858                    // Add k1 to k2's terminator list (if different)
859                    if kmer1 != kmer2 {
860                        let list2 = terminators.entry(kmer2).or_insert_with(Vec::new);
861                        if !list2.contains(&kmer1) {
862                            list2.push(kmer1);
863                            list2.sort_unstable();
864                        }
865                    }
866
867                    drop(terminators);
868                }
869            }
870        }
871    }
872
873    // Step 9: Final flush of remaining placements
874    if !buffered_placements.is_empty() {
875        if let Some(collection_mutex) = &shared.collection {
876            let mut collection = collection_mutex.lock().unwrap();
877            for placement in &buffered_placements {
878                let _ = collection.add_segment_placed(
879                    &placement.sample_name,
880                    &placement.contig_name,
881                    placement.place,
882                    placement.group_id,
883                    placement.in_group_id,
884                    placement.is_rev_comp,
885                    placement.raw_length,
886                );
887            }
888        }
889    }
890}
891
892// ============================================================================
893// Phase 4: Main Compression Flow
894// ============================================================================
895
/// Create an AGC archive from FASTA files - complete end-to-end pipeline
///
/// This is the top-level API for creating AGC archives using the streaming pipeline.
///
/// Write order matters for archive-format compatibility: the `file_type_info`
/// and `params` streams are written first, collection streams are registered
/// next, segment data is then compressed via the streaming pipeline, and the
/// collection metadata plus archive footer are written last.
///
/// # Arguments
/// * `output_path` - Path to write the AGC archive
/// * `sample_files` - Vector of (sample_name, file_path) pairs
/// * `splitters` - Set of splitter k-mers (from determine_splitters)
/// * `candidate_kmers` - Candidate splitter k-mers, forwarded to the pipeline
///   (sorted there for adaptive-mode set operations)
/// * `duplicated_kmers` - Duplicated k-mers, forwarded to the pipeline
///   alongside `candidate_kmers`
/// * `kmer_length` - K-mer length for segmentation
/// * `segment_size` - Segment size parameter for collection
/// * `num_threads` - Number of threads to use
/// * `adaptive_mode` - Enable adaptive splitter finding
/// * `concatenated_genomes` - Treat all contigs as one sample
/// * `verbosity` - Verbosity level (0=quiet, 1=normal, 2=verbose)
///
/// # Returns
/// Ok(()) on success, Err on failure
pub fn create_agc_archive(
    output_path: &str,
    sample_files: Vec<(String, String)>,
    splitters: AHashSet<u64>,
    candidate_kmers: AHashSet<u64>,
    duplicated_kmers: AHashSet<u64>,
    kmer_length: usize,
    segment_size: u32,
    num_threads: usize,
    adaptive_mode: bool,
    concatenated_genomes: bool,
    verbosity: usize,
) -> anyhow::Result<()> {
    use ragc_common::CollectionV3;

    // Nothing to do for an empty input set; succeed without creating a file.
    if sample_files.is_empty() {
        return Ok(());
    }

    let num_samples = sample_files.len();

    // Create archive for writing
    let mut archive = Archive::new_writer();
    archive.open(output_path)?;

    // Write file_type_info stream (C++ AGC compatibility)
    {
        let mut data = Vec::new();
        // Encoding: NUL-terminated key string followed by NUL-terminated value string.
        let append_str = |data: &mut Vec<u8>, s: &str| {
            data.extend_from_slice(s.as_bytes());
            data.push(0);
        };

        append_str(&mut data, "producer");
        append_str(&mut data, "ragc");

        append_str(&mut data, "producer_version_major");
        append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());

        append_str(&mut data, "producer_version_minor");
        append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());

        append_str(&mut data, "producer_version_build");
        append_str(&mut data, "0");

        append_str(&mut data, "file_version_major");
        append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());

        append_str(&mut data, "file_version_minor");
        append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());

        append_str(&mut data, "comment");
        append_str(
            &mut data,
            &format!(
                "RAGC v.{}.{}",
                ragc_common::AGC_FILE_MAJOR,
                ragc_common::AGC_FILE_MINOR
            ),
        );

        let stream_id = archive.register_stream("file_type_info");
        // The part metadata records the number of key-value pairs serialized above.
        archive.add_part(stream_id, &data, 7)?; // 7 key-value pairs

        if verbosity > 0 {
            eprintln!(
                "Wrote file_type_info: version {}.{}",
                ragc_common::AGC_FILE_MAJOR,
                ragc_common::AGC_FILE_MINOR
            );
        }
    }

    // Write params stream (kmer_length, min_match_len, pack_cardinality, segment_size)
    {
        let params_stream_id = archive.register_stream("params");
        let mut params_data = Vec::new();

        // Standard params format (16 bytes for C++ AGC compatibility):
        // four little-endian u32 values in this exact order.
        params_data.extend_from_slice(&(kmer_length as u32).to_le_bytes());
        params_data.extend_from_slice(&20u32.to_le_bytes()); // min_match_len (default)
        params_data.extend_from_slice(&50u32.to_le_bytes()); // pack_cardinality (default)
        params_data.extend_from_slice(&segment_size.to_le_bytes());

        archive.add_part(params_stream_id, &params_data, 0)?;

        if verbosity > 0 {
            eprintln!(
                "Wrote params: k={}, segment_size={}",
                kmer_length, segment_size
            );
        }
    }

    // Create collection for metadata
    let mut collection = CollectionV3::new();
    collection.set_config(segment_size, kmer_length as u32, None);

    // Register collection streams in archive
    collection.prepare_for_compression(&mut archive)?;

    // Wrap in Arc<Mutex<>> for shared access during compression
    let archive = Arc::new(Mutex::new(archive));
    let collection = Arc::new(Mutex::new(collection));

    // Run compression pipeline
    compress_samples_streaming_with_archive(
        sample_files,
        splitters,
        candidate_kmers,
        duplicated_kmers,
        kmer_length,
        num_threads,
        adaptive_mode,
        concatenated_genomes,
        verbosity,
        Some(archive.clone()),
        Some(collection.clone()),
    )
    .map_err(|e| anyhow::anyhow!(e))?;

    // Serialize collection metadata to archive
    // (scoped so both guards drop before the final close below)
    {
        let mut archive_guard = archive.lock().unwrap();
        let mut collection_guard = collection.lock().unwrap();

        if verbosity > 0 {
            eprintln!(
                "Serializing collection metadata for {} samples...",
                num_samples
            );
        }

        // Write sample names
        collection_guard.store_batch_sample_names(&mut archive_guard)?;

        // Write contig names and segment details
        collection_guard.store_contig_batch(&mut archive_guard, 0, num_samples)?;

        if verbosity > 0 {
            eprintln!("Collection metadata serialized successfully");
        }
    }

    // Close archive (writes footer)
    let mut archive = archive.lock().unwrap();
    archive.close()?;

    if verbosity > 0 {
        eprintln!("AGC archive created: {}", output_path);
    }

    Ok(())
}
1067
1068/// Compress sample files using streaming pipeline with priority queue and worker threads
1069///
1070/// Internal function - use create_agc_archive() for complete end-to-end pipeline
1071///
1072/// Matches C++ AGC's AddSampleFiles() (agc_compressor.cpp:2121-2270)
1073///
1074/// # Arguments
1075/// * `sample_files` - Vector of (sample_name, file_path) pairs
1076/// * `splitters` - Set of splitter k-mers
1077/// * `kmer_length` - K-mer length for segmentation
1078/// * `num_threads` - Number of threads to use
1079/// * `adaptive_mode` - Enable adaptive splitter finding
1080/// * `concatenated_genomes` - Treat all contigs as one sample
1081/// * `verbosity` - Verbosity level (0=quiet, 1=normal, 2=verbose)
1082/// * `archive` - Optional archive for writing compressed segments
1083/// * `collection` - Optional collection for metadata tracking
1084///
1085/// # Returns
1086/// Ok(()) on success, Err on failure
1087fn compress_samples_streaming_with_archive(
1088    sample_files: Vec<(String, String)>,
1089    splitters: AHashSet<u64>,
1090    candidate_kmers: AHashSet<u64>,
1091    duplicated_kmers: AHashSet<u64>,
1092    kmer_length: usize,
1093    num_threads: usize,
1094    adaptive_mode: bool,
1095    concatenated_genomes: bool,
1096    verbosity: usize,
1097    archive: Option<Arc<Mutex<Archive>>>,
1098    collection: Option<Arc<Mutex<ragc_common::CollectionV3>>>,
1099) -> Result<(), String> {
1100    use crate::genome_io::GenomeIO;
1101
1102    if sample_files.is_empty() {
1103        return Ok(());
1104    }
1105
1106    // Step 1: Queue initialization (agc_compressor.cpp:2128-2132)
1107    let queue_capacity = std::cmp::max(2_u64 << 30, num_threads as u64 * (192_u64 << 20));
1108    let queue = Arc::new(BoundedPriorityQueue::new(1, queue_capacity as usize));
1109
1110    // Step 2: Worker thread count (agc_compressor.cpp:2134)
1111    let no_workers = if num_threads < 8 {
1112        num_threads
1113    } else {
1114        num_threads - 1
1115    };
1116
1117    // Convert reference k-mers to sorted vectors (for set_difference in adaptive mode)
1118    let mut v_candidate_kmers: Vec<u64> = candidate_kmers.into_iter().collect();
1119    v_candidate_kmers.sort_unstable();
1120
1121    let mut v_duplicated_kmers: Vec<u64> = duplicated_kmers.into_iter().collect();
1122    v_duplicated_kmers.sort_unstable();
1123
1124    // Step 3: Shared state initialization
1125    // Initialize bloom filter and populate with base splitters
1126    let mut bloom_filter = crate::bloom_filter::BloomFilter::new(
1127        splitters.len() * 8, // 8 bits per splitter
1128    );
1129    for &kmer in &splitters {
1130        bloom_filter.insert(kmer);
1131    }
1132
1133    // Create auxiliary queue for adaptive mode
1134    let aux_queue = Arc::new(BoundedPriorityQueue::new(1, usize::MAX));
1135
1136    let shared = Arc::new(SharedCompressorState {
1137        processed_bases: AtomicUsize::new(0),
1138        processed_samples: AtomicUsize::new(0),
1139        raw_contigs: Mutex::new(Vec::new()),
1140        verbosity,
1141        buffered_segments: Arc::new(Mutex::new(BufferedSegments::new(0))),
1142        splitters: Arc::new(Mutex::new(splitters)),
1143        bloom_splitters: Arc::new(Mutex::new(bloom_filter)),
1144        vv_splitters: Mutex::new(vec![Vec::new(); no_workers]),
1145        v_candidate_kmers,
1146        v_duplicated_kmers,
1147        kmer_length,
1148        adaptive_mode,
1149        map_segments: Arc::new(Mutex::new(AHashMap::new())),
1150        map_segments_terminators: Arc::new(Mutex::new(AHashMap::new())),
1151        concatenated_genomes,
1152        no_segments: Arc::new(Mutex::new(0)),
1153        no_raw_groups: 0,
1154        archive,
1155        v_segments: Arc::new(Mutex::new(Vec::new())),
1156        collection,
1157        aux_queue,
1158        working_queue: Mutex::new(Arc::clone(&queue)), // Start with main queue
1159    });
1160
1161    // Step 4: Thread spawning (agc_compressor.cpp:2136-2141)
1162    let barrier = Arc::new(Barrier::new(no_workers));
1163    let mut worker_handles = Vec::with_capacity(no_workers);
1164
1165    for worker_id in 0..no_workers {
1166        let q = queue.clone();
1167        let b = barrier.clone();
1168        let s = shared.clone();
1169
1170        let handle = std::thread::spawn(move || {
1171            worker_thread(worker_id, no_workers, q, b, s);
1172        });
1173
1174        worker_handles.push(handle);
1175    }
1176
1177    // Step 5: Main processing loop (agc_compressor.cpp:2163-2242)
1178    let mut sample_priority = usize::MAX;
1179    let mut _cnt_contigs_in_sample = 0;
1180    const PACK_CARDINALITY: usize = 50; // TODO: Get from config
1181
1182    for (sample_name, file_path) in sample_files {
1183        if verbosity > 0 {
1184            eprintln!("Processing sample: {} from {}", sample_name, file_path);
1185        }
1186
1187        // Open FASTA file
1188        let mut gio = match GenomeIO::open(&file_path) {
1189            Ok(g) => g,
1190            Err(e) => {
1191                eprintln!("Cannot open file {}: {}", file_path, e);
1192                continue;
1193            }
1194        };
1195
1196        let mut any_contigs_added = false;
1197
1198        // Read contigs from file
1199        while let Ok(Some((contig_name, sequence))) = gio.read_contig_raw() {
1200            if concatenated_genomes {
1201                // Concatenated mode: treat all contigs as one sample (empty name)
1202                // Matches C++ AGC behavior: all genomes concatenated into single sample
1203                let concat_sample_name = String::from("");
1204
1205                // Register contig with empty sample name
1206                if let Some(collection_mutex) = &shared.collection {
1207                    let mut collection = collection_mutex.lock().unwrap();
1208                    let _ = collection.register_sample_contig(&concat_sample_name, &contig_name);
1209                }
1210
1211                let cost = sequence.len();
1212                queue.emplace(
1213                    Task::new_contig(
1214                        concat_sample_name,
1215                        contig_name.clone(),
1216                        sequence,
1217                        ContigProcessingStage::AllContigs,
1218                    ),
1219                    sample_priority,
1220                    cost,
1221                );
1222
1223                any_contigs_added = true;
1224            } else {
1225                // Normal mode: one sample per file
1226                // Register sample/contig with collection
1227                if let Some(collection_mutex) = &shared.collection {
1228                    let mut collection = collection_mutex.lock().unwrap();
1229                    let _ = collection.register_sample_contig(&sample_name, &contig_name);
1230                }
1231
1232                let cost = sequence.len();
1233                queue.emplace(
1234                    Task::new_contig(
1235                        sample_name.clone(),
1236                        contig_name.clone(),
1237                        sequence,
1238                        ContigProcessingStage::AllContigs,
1239                    ),
1240                    sample_priority,
1241                    cost,
1242                );
1243
1244                any_contigs_added = true;
1245            }
1246        }
1247
1248        // Step 6: Send synchronization tokens after each sample (agc_compressor.cpp:2148-2155)
1249        if !concatenated_genomes && any_contigs_added {
1250            let sync_stage = if adaptive_mode {
1251                ContigProcessingStage::NewSplitters
1252            } else {
1253                ContigProcessingStage::Registration
1254            };
1255
1256            // Send exactly no_workers sync tokens (0 cost)
1257            queue.emplace_many_no_cost(Task::new_sync(sync_stage), sample_priority, no_workers);
1258
1259            sample_priority -= 1;
1260        }
1261    }
1262
1263    // Step 7: Signal completion and wait for workers (agc_compressor.cpp:2254-2256)
1264    queue.mark_completed();
1265
1266    for handle in worker_handles {
1267        handle.join().map_err(|_| "Worker thread panicked")?;
1268    }
1269
1270    if verbosity > 0 {
1271        let total_bases = shared.processed_bases.load(Ordering::Relaxed);
1272        eprintln!("Compression complete: {} bases processed", total_bases);
1273    }
1274
1275    Ok(())
1276}
1277
#[cfg(test)]
mod tests {
    use super::*;

    /// Freshly constructed shared state should reflect the constructor
    /// arguments and start with all counters/collections empty.
    #[test]
    fn test_shared_state_creation() {
        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let state = SharedCompressorState::new(1, 21, false, false, 0, queue);
        assert_eq!(state.verbosity, 1);
        assert_eq!(state.kmer_length, 21);
        assert!(!state.adaptive_mode);
        assert!(!state.concatenated_genomes);
        assert_eq!(state.no_raw_groups, 0);
        assert_eq!(state.processed_bases.load(Ordering::Relaxed), 0);
        assert_eq!(state.processed_samples.load(Ordering::Relaxed), 0);
        assert_eq!(state.raw_contigs.lock().unwrap().len(), 0);
        assert_eq!(*state.no_segments.lock().unwrap(), 0);
    }

    /// Workers must terminate promptly when the queue is already completed
    /// and holds no tasks.
    #[test]
    fn test_worker_thread_completion() {
        use std::thread;

        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let barrier = Arc::new(Barrier::new(2));
        let shared = Arc::new(SharedCompressorState::new(
            0,
            21,
            false,
            false,
            0,
            queue.clone(),
        ));

        // Mark queue as completed (no tasks)
        queue.mark_completed();

        // Spawn two workers
        let handles: Vec<_> = (0..2)
            .map(|worker_id| {
                let q = queue.clone();
                let b = barrier.clone();
                let s = shared.clone();

                thread::spawn(move || {
                    worker_thread(worker_id, 2, q, b, s);
                })
            })
            .collect();

        // Workers should exit immediately
        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// A single worker should drain a one-task queue and account for the
    /// processed bases in the shared counter.
    #[test]
    fn test_worker_thread_with_task() {
        use std::thread;

        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let barrier = Arc::new(Barrier::new(1));
        let shared = Arc::new(SharedCompressorState::new(
            0,
            21,
            false,
            false,
            0,
            queue.clone(),
        ));

        // Enqueue a simple task
        queue.emplace(
            Task::new_contig(
                "sample1".to_string(),
                "chr1".to_string(),
                vec![0, 1, 2, 3, 0, 1, 2, 3], // 8 bases
                ContigProcessingStage::AllContigs,
            ),
            100,
            8,
        );
        queue.mark_completed();

        // Spawn single worker; move the Arcs we no longer need on this thread.
        let q = queue.clone();
        let s = shared.clone();

        let handle = thread::spawn(move || {
            worker_thread(0, 1, q, barrier, s);
        });

        handle.join().unwrap();

        // Verify task was processed (compress_contig_task returns true)
        assert_eq!(shared.processed_bases.load(Ordering::Relaxed), 8);
    }
}