// ragc_core/worker.rs
// Worker thread implementation for streaming compression pipeline
// Matches C++ AGC's start_compressing_threads (agc_compressor.cpp:1097-1270)

use crate::contig_compression::{compress_contig, CompressionContext};
use crate::priority_queue::{BoundedPriorityQueue, PopResult};
use crate::segment_buffer::BufferedSegments;
use crate::segment_compression::compress_reference_segment;
use crate::task::{ContigProcessingStage, Task};
use crate::zstd_pool::compress_segment_pooled;
use ragc_common::{Archive, Contig};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier, Mutex};

/// Segment placement info for collection metadata
///
/// Matches C++ AGC's segment placement tracking (agc_compressor.cpp:974-1050)
#[derive(Debug, Clone)]
struct SegmentPlacement {
    sample_name: String,
    contig_name: String,
    place: usize, // Segment index within contig (seg_part_no)
    group_id: u32,
    in_group_id: u32,
    is_rev_comp: bool,
    raw_length: u32, // Uncompressed segment length
}

/// Segment group manager matching C++ AGC's CSegment
///
/// Manages a group of segments with the same (kmer1, kmer2) pair:
/// - Stores reference segment (first in group)
/// - Buffers subsequent delta segments
/// - Writes to archive streams
struct SegmentGroup {
    group_id: u32,
    stream_id: usize,          // Delta stream for packed segments
    ref_stream_id: usize,      // Reference stream for first segment
    reference: Option<Contig>, // First segment (reference for LZ encoding)
    ref_written: bool,         // Whether reference has been written
    in_group_counter: u32,     // Counter for in_group_id assignment
}

impl SegmentGroup {
    fn new(group_id: u32, stream_id: usize, ref_stream_id: usize) -> Self {
        SegmentGroup {
            group_id,
            stream_id,
            ref_stream_id,
            reference: None,
            ref_written: false,
            in_group_counter: 0,
        }
    }

    /// Add a segment to this group
    ///
    /// The first segment becomes the reference; subsequent segments are delta-encoded.
    /// Returns in_group_id (0 for the reference, 1+ for delta segments).
    fn add_segment(&mut self, seg_data: &[u8], archive: &mut Archive) -> anyhow::Result<u32> {
        if self.reference.is_none() {
            // First segment - store as reference with adaptive compression
            let seg_vec = seg_data.to_vec();
            self.reference = Some(seg_vec.clone());

            // Compress reference segment (chooses between plain ZSTD or tuple packing)
            let (compressed, marker) = compress_reference_segment(&seg_vec)?;

            // Write compressed reference to archive with marker byte as metadata
            archive.add_part(self.ref_stream_id, &compressed, marker as u64)?;
            self.ref_written = true;

            let in_group_id = self.in_group_counter;
            self.in_group_counter += 1;
            Ok(in_group_id) // in_group_id = 0 for reference
        } else {
            // Subsequent segment - delta encode against reference
            // TODO: Implement LZ encoding - for now just compress with ZSTD
            let seg_vec = seg_data.to_vec();
            let compressed = compress_segment_pooled(&seg_vec, 17)?;

            // Write compressed delta to archive (marker 0 = plain ZSTD)
            archive.add_part(self.stream_id, &compressed, 0)?;

            let in_group_id = self.in_group_counter;
            self.in_group_counter += 1;
            Ok(in_group_id)
        }
    }
}

/// Shared state accessible to all worker threads
///
/// This matches C++ AGC's shared variables captured by lambda (agc_compressor.cpp:1099):
/// - Atomic counters (processed_bases, processed_samples)
/// - Mutexes for shared collections (v_raw_contigs, vv_fallback_minimizers, etc.)
/// - Archive writer, collection descriptor, etc.
pub struct SharedCompressorState {
    /// Total bases processed (for progress tracking)
    pub processed_bases: AtomicUsize,

    /// Number of samples processed (for progress tracking)
    pub processed_samples: AtomicUsize,

    /// Contigs that failed to compress well (need adaptive splitters)
    /// Matches C++ AGC's v_raw_contigs (protected by mtx_raw_contigs)
    pub raw_contigs: Mutex<Vec<(String, String, Vec<u8>)>>,

    /// Verbosity level (0 = quiet, 1 = normal, 2 = verbose)
    pub verbosity: usize,

    /// Buffered segments (KNOWN + NEW)
    /// Matches C++ AGC's CBufferedSegPart buffered_seg_part
    pub buffered_segments: Arc<Mutex<BufferedSegments>>,

    /// Splitter k-mers (exact set)
    /// Matches C++ AGC's hs_splitters
    pub splitters: Arc<Mutex<HashSet<u64>>>,

    /// Bloom filter for splitters (fast probabilistic check)
    /// Matches C++ AGC's bloom_splitters
    pub bloom_splitters: Arc<Mutex<crate::bloom_filter::BloomFilter>>,

    /// Per-thread vectors for accumulating new splitters (adaptive mode)
    /// Matches C++ AGC's vv_splitters (agc_compressor.h:721)
    /// Each worker thread accumulates splitters, then merges at barrier
    pub vv_splitters: Mutex<Vec<Vec<u64>>>,

    /// Reference genome singleton k-mers (for adaptive mode exclusion)
    /// Matches C++ AGC's v_candidate_kmers (agc_compressor.h:708)
    /// Sorted for efficient set_difference operations
    pub v_candidate_kmers: Vec<u64>,

    /// Reference genome duplicated k-mers (for adaptive mode exclusion)
    /// Matches C++ AGC's v_duplicated_kmers (agc_compressor.h:710)
    /// Sorted for efficient set_difference operations
    pub v_duplicated_kmers: Vec<u64>,

    /// K-mer length for segmentation
    pub kmer_length: usize,

    /// Adaptive compression mode (find new splitters for hard contigs)
    pub adaptive_mode: bool,

    /// Map: (kmer1, kmer2) → group_id
    /// Matches C++ AGC's map_segments
    /// Lower group_id wins when same (k1, k2) pair (earlier samples are reference)
    pub map_segments: Arc<Mutex<HashMap<(u64, u64), u32>>>,

    /// Map: kmer → Vec<kmer> (sorted terminators)
    /// Matches C++ AGC's map_segments_terminators
    /// Used for split detection: find shared terminators between k1 and k2
    pub map_segments_terminators: Arc<Mutex<HashMap<u64, Vec<u64>>>>,

    /// Concatenated genomes mode (treat all contigs as one sample)
    pub concatenated_genomes: bool,

    /// Total number of segment groups (assigned group IDs)
    /// Matches C++ AGC's no_segments
    pub no_segments: Arc<Mutex<u32>>,

    /// Number of raw groups (groups without LZ encoding)
    /// Matches C++ AGC's no_raw_groups
    pub no_raw_groups: u32,

    /// Archive writer for segment output
    /// Matches C++ AGC's out_archive
    pub archive: Option<Arc<Mutex<Archive>>>,

    /// Segment groups indexed by group_id
    /// Matches C++ AGC's v_segments (vector<CSegment*>)
    /// Lazily created when first segment arrives for that group
    pub v_segments: Arc<Mutex<Vec<Option<SegmentGroup>>>>,

    /// Collection metadata for samples/contigs/segments
    /// Matches C++ AGC's collection_desc
    pub collection: Option<Arc<Mutex<ragc_common::CollectionV3>>>,

    /// Auxiliary queue for adaptive mode (re-enqueue hard contigs)
    /// Matches C++ AGC's pq_contigs_desc_aux
    pub aux_queue: Arc<BoundedPriorityQueue<Task>>,

    /// Working queue pointer (switches between main and aux)
    /// Matches C++ AGC's pq_contigs_desc_working
    pub working_queue: Mutex<Arc<BoundedPriorityQueue<Task>>>,
    // TODO: Add more shared state as needed:
    // - vv_fallback_minimizers: Mutex<Vec<Vec<Vec<u64>>>>
}

impl SharedCompressorState {
    /// Create new shared state
    pub fn new(
        verbosity: usize,
        kmer_length: usize,
        adaptive_mode: bool,
        concatenated_genomes: bool,
        no_raw_groups: u32,
        main_queue: Arc<BoundedPriorityQueue<Task>>,
    ) -> Self {
        // Create bloom filter sized for expected splitters
        // C++ AGC uses ~10K splitters, allocate 8 bits/item = 80K bits = 10KB
        let bloom_filter = crate::bloom_filter::BloomFilter::new(80 * 1024);

        // Create auxiliary queue for adaptive mode (unlimited capacity)
        let aux_queue = Arc::new(BoundedPriorityQueue::new(1, usize::MAX));

        SharedCompressorState {
            processed_bases: AtomicUsize::new(0),
            processed_samples: AtomicUsize::new(0),
            raw_contigs: Mutex::new(Vec::new()),
            verbosity,
            buffered_segments: Arc::new(Mutex::new(BufferedSegments::new(no_raw_groups as usize))),
            splitters: Arc::new(Mutex::new(HashSet::new())),
            bloom_splitters: Arc::new(Mutex::new(bloom_filter)),
            vv_splitters: Mutex::new(Vec::new()),
            v_candidate_kmers: Vec::new(),
            v_duplicated_kmers: Vec::new(),
            kmer_length,
            adaptive_mode,
            map_segments: Arc::new(Mutex::new(HashMap::new())),
            map_segments_terminators: Arc::new(Mutex::new(HashMap::new())),
            concatenated_genomes,
            no_segments: Arc::new(Mutex::new(0)),
            no_raw_groups,
            archive: None, // Set later when archive is created
            v_segments: Arc::new(Mutex::new(Vec::new())),
            collection: None, // Set later when collection is created
            aux_queue,
            working_queue: Mutex::new(main_queue), // Start with main queue
        }
    }
}

/// Worker thread main loop
///
/// This matches C++ AGC's worker lambda (agc_compressor.cpp:1099-1270):
/// ```cpp
/// v_threads.emplace_back([&, i, n_t]() {
///     auto zstd_cctx = ZSTD_createCCtx();
///     auto zstd_dctx = ZSTD_createDCtx();
///     uint32_t thread_id = i;
///
///     while(true) {
///         task_t task;
///         auto q_res = pq_contigs_desc_working->PopLarge(task);
///         // ... process task ...
///     }
///
///     ZSTD_freeCCtx(zstd_cctx);
///     ZSTD_freeDCtx(zstd_dctx);
/// });
/// ```
///
/// # Arguments
/// * `worker_id` - Thread ID (0 to num_workers-1)
/// * `num_workers` - Total number of worker threads
/// * `_queue` - Legacy queue handle; workers actually pull from `shared.working_queue`
/// * `barrier` - Synchronization barrier for registration/new_splitters stages
/// * `shared` - Shared state accessible to all workers
pub fn worker_thread(
    worker_id: usize,
    num_workers: usize,
    _queue: Arc<BoundedPriorityQueue<Task>>, // Legacy parameter, use shared.working_queue instead
    barrier: Arc<Barrier>,
    shared: Arc<SharedCompressorState>,
) {
    // TODO: Create per-thread ZSTD contexts
    // let mut zstd_encoder = ...;
    // let mut zstd_decoder = ...;

    loop {
        // Pop task from working queue (can switch between main and aux)
        // Matches C++ AGC's pq_contigs_desc_working (agc_compressor.cpp:1108)
        let queue = {
            let working_queue_guard = shared.working_queue.lock().unwrap();
            Arc::clone(&*working_queue_guard)
        };
        let (result, task_opt) = queue.pop_large();

        match result {
            // Queue empty but producers still active - retry
            PopResult::Empty => continue,

            // Queue empty and no producers - exit worker loop
            PopResult::Completed => break,

            // Successfully got a task - process it
            PopResult::Normal => {
                let task = task_opt.expect("PopResult::Normal should have task");

                match task.stage {
                    ContigProcessingStage::Registration => {
                        // Handle registration synchronization (agc_compressor.cpp:1114-1185)
                        handle_registration_stage(worker_id, &barrier, &shared);
                        continue; // Return to top of loop
                    }

                    ContigProcessingStage::NewSplitters => {
                        // Handle adaptive splitter finding (agc_compressor.cpp:1187-1237)
                        handle_new_splitters_stage(worker_id, num_workers, &barrier, &shared);
                        continue; // Return to top of loop
                    }

                    ContigProcessingStage::AllContigs => {
                        // Preprocess contig before compression (agc_compressor.cpp:1241)
                        // TODO: preprocess_raw_contig(&task.sequence);
                    }

                    ContigProcessingStage::HardContigs => {
                        // Hard contigs don't need preprocessing (already normalized)
                    }
                }

                // Compress contig (AllContigs and HardContigs both use this path)
                let ctg_size = task.sequence.len();

                if compress_contig_task(&task, worker_id, &barrier, &shared) {
                    // Success: update progress counter (agc_compressor.cpp:1248-1255)
                    let old_pb = shared
                        .processed_bases
                        .fetch_add(ctg_size, Ordering::Relaxed);
                    let new_pb = old_pb + ctg_size;

                    // Print progress every 10 Mb of sequence
                    if shared.verbosity > 0 && old_pb / 10_000_000 != new_pb / 10_000_000 {
                        eprintln!("Compressed: {} Mb\r", new_pb / 1_000_000);
                    }
                } else {
                    // Failed to compress well: save for reprocessing (agc_compressor.cpp:1258-1261)
                    let mut raw_contigs = shared.raw_contigs.lock().unwrap();
                    raw_contigs.push((
                        task.sample_name.clone(),
                        task.contig_name.clone(),
                        task.sequence.clone(),
                    ));
                }

                // Task memory automatically freed (Rust Drop)
            }
        }
    }

    // ZSTD contexts automatically freed (Rust Drop)
}
/// Handle registration stage synchronization
///
/// Matches C++ AGC's registration logic (agc_compressor.cpp:1114-1185)
/// See BARRIER_USAGE_PATTERN.md for detailed documentation.
fn handle_registration_stage(
    worker_id: usize,
    barrier: &Arc<Barrier>,
    shared: &Arc<SharedCompressorState>,
) {
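    // Barrier choreography (every worker enters this function once per sync round):
    //   barrier 1 -> leader alone registers segments
    //   barrier 2 -> all threads store segments in parallel
    //   barrier 3 -> threads 0/1 perform cleanup/flush duties
    //   barrier 4 -> everyone resumes the task loop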
    // Barrier 1: All workers arrive
    let wait_result = barrier.wait();

    // Leader thread: register segments and process fallbacks
    if wait_result.is_leader() {
        register_segments(shared);
        // TODO: process_fallback_minimizers(shared);
    }

    // Barrier 2: Wait for registration complete
    barrier.wait();

    // All threads: store their segments
    store_segments(worker_id, shared);

    // Barrier 3: Wait for storage complete
    barrier.wait();

    // Thread 0 or 1: cleanup and flush (C++ AGC pattern: BOTH threads do work!)
    // See agc_compressor.cpp:1136-1180
    if worker_id == 0 {
        // TODO: buffered_seg_part.clear(max(1, num_workers - 1));
        // TODO: update processed_samples
        // TODO: store_contig_batch if needed
        // TODO: flush archive buffers
        // TODO: if adaptive_compression: switch back to main queue
    } else if worker_id == 1 {
        // TODO: update processed_samples (same as thread 0!)
        // TODO: store_contig_batch if needed (same as thread 0!)
        // TODO: flush archive buffers (same as thread 0!)
    }

    // Barrier 4: All ready to continue
    barrier.wait();
}

/// Find new splitters for a hard contig
///
/// Matches C++ AGC's find_new_splitters (agc_compressor.cpp:2046-2077)
///
/// Algorithm:
/// 1. Extract all k-mers from contig and find singletons
/// 2. Exclude k-mers present in reference genome (singletons)
/// 3. Exclude k-mers present in reference genome (duplicates)
/// 4. Add remaining k-mers to thread-local splitter vector
///
/// # Arguments
/// * `contig` - The contig sequence that needs new splitters
/// * `thread_id` - Worker thread ID for vv_splitters indexing
/// * `shared` - Shared compressor state with reference k-mers
fn find_new_splitters(
    contig: &ragc_common::Contig,
    thread_id: usize,
    shared: &Arc<SharedCompressorState>,
) {
    use crate::kmer_extract::{enumerate_kmers, remove_non_singletons};

    // Step 1: Extract k-mers from contig and find singletons
    let mut v_contig_kmers = enumerate_kmers(contig, shared.kmer_length);
    v_contig_kmers.sort_unstable();
    remove_non_singletons(&mut v_contig_kmers, 0);

    if shared.verbosity > 1 {
        eprintln!(
            "find_new_splitters: contig has {} singleton k-mers",
            v_contig_kmers.len()
        );
    }

    // Step 2: Exclude k-mers in reference genome (singletons)
    // C++ AGC uses v_candidate_kmers_offset to skip some candidates
    // For now, we use the full v_candidate_kmers (offset=0)
    let mut v_tmp = Vec::with_capacity(v_contig_kmers.len());
    set_difference(&v_contig_kmers, &shared.v_candidate_kmers, &mut v_tmp);

    if shared.verbosity > 1 {
        eprintln!(
            "find_new_splitters: {} k-mers after excluding reference singletons",
            v_tmp.len()
        );
    }

    // Step 3: Exclude k-mers in reference genome (duplicates)
    v_contig_kmers.clear();
    set_difference(&v_tmp, &shared.v_duplicated_kmers, &mut v_contig_kmers);

    if shared.verbosity > 1 {
        eprintln!(
            "find_new_splitters: {} NEW splitters found",
            v_contig_kmers.len()
        );
    }

    // Step 4: Add to thread-local splitter vector
    let mut vv_splitters = shared.vv_splitters.lock().unwrap();
    vv_splitters[thread_id].extend(v_contig_kmers);
}
/// Compute set difference: result = a \ b (elements in a but not in b)
///
/// Both input vectors must be sorted. Matches std::set_difference behavior.
fn set_difference(a: &[u64], b: &[u64], result: &mut Vec<u64>) {
    result.clear();
    let mut i = 0;
    let mut j = 0;

    while i < a.len() && j < b.len() {
        if a[i] < b[j] {
            result.push(a[i]);
            i += 1;
        } else if a[i] > b[j] {
            j += 1;
        } else {
            // a[i] == b[j], skip
            i += 1;
            j += 1;
        }
    }

    // Add remaining elements from a
    result.extend_from_slice(&a[i..]);
}

/// Handle new splitters stage synchronization
///
/// Matches C++ AGC's new_splitters logic (agc_compressor.cpp:1187-1237)
/// See BARRIER_USAGE_PATTERN.md for detailed documentation.
fn handle_new_splitters_stage(
    worker_id: usize,
    num_workers: usize,
    barrier: &Arc<Barrier>,
    shared: &Arc<SharedCompressorState>,
) {
    // Barrier 1: All workers arrive
    barrier.wait();

    // bloom_insert logic: Insert new splitters into bloom filter and hash set
    // In C++ AGC, this is a lambda defined inline (lines 1191-1209)
    //
    // With multiple workers, every thread enters this block but serializes on
    // the mutexes: whichever thread arrives first drains the per-thread
    // vectors, and later arrivals find them already empty. Single-threaded,
    // only thread 0 does the merge.
    if num_workers > 1 || worker_id == 0 {
        let mut splitters = shared.splitters.lock().unwrap();
        let mut bloom = shared.bloom_splitters.lock().unwrap();
        let mut vv_splitters = shared.vv_splitters.lock().unwrap();

        let mut total_new = 0;
        for thread_splitters in vv_splitters.iter_mut() {
            total_new += thread_splitters.len();
            for &kmer in thread_splitters.iter() {
                splitters.insert(kmer);
                bloom.insert(kmer);
            }
            thread_splitters.clear();
        }

        if shared.verbosity > 0 && total_new > 0 {
            eprintln!("Adaptive mode: Added {} new splitters", total_new);
            eprintln!("Total splitters: {}", splitters.len());
        }

        // Check bloom filter filling factor and resize if > 0.3 (matches C++ AGC)
        let filling_factor = bloom.filling_factor();
        if filling_factor > 0.3 {
            if shared.verbosity > 1 {
                eprintln!(
                    "Bloom filter filling factor {:.2}, resizing...",
                    filling_factor
                );
            }

            // Resize to accommodate current + expected growth
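            // Worked example (sketch): 100_000 splitters at a 25% target fill
            // -> 100_000 / 0.25 = 400_000 slots * 8 = 3_200_000 bits (~400 KB).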
            let new_size_bits = (splitters.len() as f64 / 0.25) as usize * 8; // 25% target fill
            bloom.resize(new_size_bits);

            // Re-insert all splitters
            for &kmer in splitters.iter() {
                bloom.insert(kmer);
            }

            if shared.verbosity > 1 {
                eprintln!("Bloom filter resized to {} bits", new_size_bits);
            }
        }
    }

    // Thread 0: Re-enqueue hard contigs and switch queues
    // Matches C++ AGC lines 1216-1236
    if worker_id == 0 {
        let mut raw_contigs = shared.raw_contigs.lock().unwrap();

        if shared.verbosity > 0 && !raw_contigs.is_empty() {
            eprintln!(
                "Adaptive mode: Re-enqueueing {} hard contigs for reprocessing",
                raw_contigs.len()
            );
        }

        // Re-enqueue hard contigs into aux_queue with HardContigs stage
        // Priority = 1 (higher than normal contigs), cost = sequence length
        for (sample_name, contig_name, sequence) in raw_contigs.drain(..) {
            let cost = sequence.len();
            shared.aux_queue.emplace(
                Task::new_contig(
                    sample_name,
                    contig_name,
                    sequence,
                    ContigProcessingStage::HardContigs,
                ),
                1, // priority
                cost,
            );
        }

        // Enqueue registration sync tokens for next round
        // Priority = 0 (lowest), cost = 0
        shared.aux_queue.emplace_many_no_cost(
            Task::new_sync(ContigProcessingStage::Registration),
            0, // priority
            num_workers,
        );

        // Switch working queue to aux queue
        // Matches C++ AGC: pq_contigs_desc_working = pq_contigs_desc_aux
        let mut working_queue = shared.working_queue.lock().unwrap();
        *working_queue = Arc::clone(&shared.aux_queue);

        if shared.verbosity > 1 {
            eprintln!("Switched to auxiliary queue for hard contig reprocessing");
        }
    }

    // Barrier 2: Ready to continue with new splitters
    // All workers synchronized after splitter merge
    barrier.wait();
}

/// Compress a contig task
///
/// Matches C++ AGC's compress_contig (agc_compressor.cpp:2000-2054)
///
/// Returns:
/// - true: Contig compressed successfully
/// - false: Contig didn't compress well (needs adaptive splitters)
fn compress_contig_task(
    task: &Task,
    worker_id: usize,
    _barrier: &Arc<Barrier>,
    shared: &Arc<SharedCompressorState>,
) -> bool {
    // Create compression context from shared state
    let ctx = CompressionContext {
        splitters: Arc::clone(&shared.splitters),
        bloom_splitters: Arc::clone(&shared.bloom_splitters),
        buffered_segments: Arc::clone(&shared.buffered_segments),
        kmer_length: shared.kmer_length,
        adaptive_mode: shared.adaptive_mode,
        map_segments: Arc::clone(&shared.map_segments),
        map_segments_terminators: Arc::clone(&shared.map_segments_terminators),
        concatenated_genomes: shared.concatenated_genomes,
    };

    // Call the actual compression function
    let success = compress_contig(&task.sample_name, &task.contig_name, &task.sequence, &ctx);

    // Adaptive mode: Handle hard contigs (C++ AGC agc_compressor.cpp:2033-2039)
    if shared.adaptive_mode
        && !success
        && task.stage == ContigProcessingStage::AllContigs
        && task.sequence.len() >= 1000
    {
        // This is a hard contig - no splitters found, and it's large enough to matter
        if shared.verbosity > 1 {
            eprintln!(
                "Hard contig detected: {}/{} ({} bp)",
                task.sample_name,
                task.contig_name,
                task.sequence.len()
            );
        }

        // Find new splitters from this contig
        find_new_splitters(&task.sequence, worker_id, shared);

        // Store contig for reprocessing after NewSplitters barrier
        let mut raw_contigs = shared.raw_contigs.lock().unwrap();
        raw_contigs.push((
            task.sample_name.clone(),
            task.contig_name.clone(),
            task.sequence.clone(),
        ));

        // Return false to indicate contig wasn't processed yet
        return false;
    }

    success
}

/// Register segments - assign group IDs to NEW segments
///
/// Matches C++ AGC's register_segments (agc_compressor.cpp:954-971)
///
/// Called by the barrier leader thread only during the registration barrier.
///
/// # Steps
/// 1. Sort KNOWN segments by (sample, contig, part_no)
/// 2. Process NEW segments - assign group IDs
/// 3. Register archive streams for new groups
/// 4. Resize v_segments to hold the new groups
/// 5. Distribute raw group segments (if applicable)
/// 6. Restart read pointer for parallel storage
fn register_segments(shared: &Arc<SharedCompressorState>) {
    let mut buffered = shared.buffered_segments.lock().unwrap();

    // Step 1: Sort KNOWN segments in parallel
    // TODO: Implement parallel sort with rayon
    // For now: sequential sort is already done in BufferedSegments
    buffered.sort_known(1); // Single-threaded for now

    // Step 2: Process NEW segments - assign group IDs
    let no_new = buffered.process_new();

    drop(buffered);

    if no_new > 0 {
        // Step 3: Register archive streams for new groups
        let mut no_segments = shared.no_segments.lock().unwrap();
        let current_no_segments = *no_segments;

        if let Some(archive_mutex) = &shared.archive {
            let mut archive = archive_mutex.lock().unwrap();

            // Register streams for each new group
            // Each group gets two streams: "seg-N" (delta) and "seg_dN" (reference)
            for i in 0..no_new {
                let seg_num = current_no_segments + i;
                // Use AGC v3 format (version 3000)
                archive.register_stream(&ragc_common::stream_delta_name(3000, seg_num));
                archive.register_stream(&ragc_common::stream_ref_name(3000, seg_num));
            }
        }

        *no_segments += no_new;
        drop(no_segments);

        // Step 4: Resize v_segments vector to accommodate new groups
        let no_segments_value = *shared.no_segments.lock().unwrap();
        let mut v_segments = shared.v_segments.lock().unwrap();
        v_segments.resize_with(no_segments_value as usize, || None);
    }

    // Step 5: Distribute raw group segments (if applicable)
    if shared.no_raw_groups > 0 {
        let buffered = shared.buffered_segments.lock().unwrap();
        buffered.distribute_segments(0, 0, shared.no_raw_groups);
    }

    // Step 6: Restart read pointer for parallel storage
    let buffered = shared.buffered_segments.lock().unwrap();
    buffered.restart_read_vec();
}

/// Store segments - write all buffered segments to archive
///
/// Matches C++ AGC's store_segments (agc_compressor.cpp:974-1050)
///
/// Called by ALL worker threads in parallel during the registration barrier.
///
/// # Algorithm
/// 1. Atomic get_vec_id() to claim a block of groups
/// 2. For each group in block:
///    - get_part() to pop segments
///    - Create SegmentGroup (C++ CSegment equivalent) on first use (lazy initialization)
///    - Update map_segments and map_segments_terminators (CRITICAL!)
///    - Write segment to archive
/// 3. Batch collection metadata updates (every 32 segments)
fn store_segments(_worker_id: usize, shared: &Arc<SharedCompressorState>) {
    const MAX_BUFF_SIZE: usize = 32;
    let mut buffered_placements: Vec<SegmentPlacement> = Vec::with_capacity(MAX_BUFF_SIZE);

    let buffered = shared.buffered_segments.lock().unwrap();

    loop {
        // Step 1: Atomic block allocation
        let block_group_id = buffered.get_vec_id();

        if block_group_id < 0 {
            break; // No more blocks
        }

        // Step 2: Process groups in block (backwards iteration)
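        // Illustration (assuming PART_ID_STEP is the block width defined in
        // segment_buffer): with PART_ID_STEP = 16, claiming block_group_id = 31
        // walks groups 31, 30, ..., 16 in reverse.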
        for group_id in
            (block_group_id - crate::segment_buffer::PART_ID_STEP + 1..=block_group_id).rev()
        {
            if buffered.is_empty_part(group_id) {
                continue;
            }

            // Step 3: Process all segments in this group
            while let Some((
                kmer1,
                kmer2,
                sample_name,
                contig_name,
                seg_data,
                is_rev_comp,
                seg_part_no,
            )) = buffered.get_part(group_id)
            {
                // Step 4: Create SegmentGroup on first use (lazy initialization)
                let mut v_segments = shared.v_segments.lock().unwrap();
                if group_id >= 0
                    && (group_id as usize) < v_segments.len()
                    && v_segments[group_id as usize].is_none()
                {
                    // Get stream IDs for this group
                    if let Some(archive_mutex) = &shared.archive {
                        let archive = archive_mutex.lock().unwrap();
                        let stream_id = archive
                            .get_stream_id(&ragc_common::stream_delta_name(3000, group_id as u32))
                            .unwrap_or(0);
                        let ref_stream_id = archive
                            .get_stream_id(&ragc_common::stream_ref_name(3000, group_id as u32))
                            .unwrap_or(0);
                        drop(archive);

                        v_segments[group_id as usize] =
                            Some(SegmentGroup::new(group_id as u32, stream_id, ref_stream_id));
                    }
                }

                // Step 5: Write segment to archive and get in_group_id
                let mut in_group_id = 0;
                if group_id >= 0 && (group_id as usize) < v_segments.len() {
                    if let Some(segment_group) = &mut v_segments[group_id as usize] {
                        if let Some(archive_mutex) = &shared.archive {
                            let mut archive = archive_mutex.lock().unwrap();
                            if let Ok(id) = segment_group.add_segment(&seg_data, &mut archive) {
                                in_group_id = id;
                            }
                        }
                    }
                }
                drop(v_segments);

                // Step 6: Buffer collection metadata
                buffered_placements.push(SegmentPlacement {
                    sample_name,
                    contig_name,
                    place: seg_part_no as usize,
                    group_id: group_id as u32,
                    in_group_id,
                    is_rev_comp,
                    raw_length: seg_data.len() as u32,
                });

                // Flush batch if full
                if buffered_placements.len() >= MAX_BUFF_SIZE {
                    if let Some(collection_mutex) = &shared.collection {
                        let mut collection = collection_mutex.lock().unwrap();
                        for placement in &buffered_placements {
                            let _ = collection.add_segment_placed(
                                &placement.sample_name,
                                &placement.contig_name,
                                placement.place,
                                placement.group_id,
                                placement.in_group_id,
                                placement.is_rev_comp,
                                placement.raw_length,
                            );
                        }
                    }
                    buffered_placements.clear();
                }

                // Step 7: Update map_segments (CRITICAL REGISTRATION!)
                let key = (kmer1, kmer2);
                let mut map_segments = shared.map_segments.lock().unwrap();
                map_segments
                    .entry(key)
                    .and_modify(|existing| {
                        // Keep lower group_id (earlier samples are reference)
                        if (group_id as u32) < *existing {
                            *existing = group_id as u32;
                        }
                    })
                    .or_insert(group_id as u32);
                drop(map_segments);

                // Step 8: Update map_segments_terminators (CRITICAL!)
                if kmer1 != crate::contig_compression::MISSING_KMER
                    && kmer2 != crate::contig_compression::MISSING_KMER
                {
                    let mut terminators = shared.map_segments_terminators.lock().unwrap();

                    // Add k2 to k1's terminator list
                    let list1 = terminators.entry(kmer1).or_insert_with(Vec::new);
                    if !list1.contains(&kmer2) {
                        list1.push(kmer2);
                        list1.sort_unstable();
                    }

                    // Add k1 to k2's terminator list (if different)
                    if kmer1 != kmer2 {
                        let list2 = terminators.entry(kmer2).or_insert_with(Vec::new);
                        if !list2.contains(&kmer1) {
                            list2.push(kmer1);
                            list2.sort_unstable();
                        }
                    }

                    drop(terminators);
                }
            }
        }
    }

    // Step 9: Final flush of remaining placements
    if !buffered_placements.is_empty() {
        if let Some(collection_mutex) = &shared.collection {
            let mut collection = collection_mutex.lock().unwrap();
            for placement in &buffered_placements {
                let _ = collection.add_segment_placed(
                    &placement.sample_name,
                    &placement.contig_name,
                    placement.place,
                    placement.group_id,
                    placement.in_group_id,
                    placement.is_rev_comp,
                    placement.raw_length,
                );
            }
        }
    }
}

// ============================================================================
// Phase 4: Main Compression Flow
// ============================================================================

/// Create an AGC archive from FASTA files - complete end-to-end pipeline
///
/// This is the top-level API for creating AGC archives using the streaming pipeline.
///
/// # Arguments
/// * `output_path` - Path to write the AGC archive
/// * `sample_files` - Vector of (sample_name, file_path) pairs
/// * `splitters` - Set of splitter k-mers (from determine_splitters)
/// * `candidate_kmers` - Reference-genome singleton k-mers (adaptive-mode exclusion)
/// * `duplicated_kmers` - Reference-genome duplicated k-mers (adaptive-mode exclusion)
/// * `kmer_length` - K-mer length for segmentation
/// * `segment_size` - Segment size parameter for collection
/// * `num_threads` - Number of threads to use
/// * `adaptive_mode` - Enable adaptive splitter finding
/// * `concatenated_genomes` - Treat all contigs as one sample
/// * `verbosity` - Verbosity level (0=quiet, 1=normal, 2=verbose)
///
/// # Returns
/// Ok(()) on success, Err on failure
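///
/// # Example
///
/// A minimal invocation sketch; the file names and parameter values below are
/// placeholders, not defaults:
///
/// ```ignore
/// use std::collections::HashSet;
///
/// let splitters: HashSet<u64> = HashSet::new(); // normally from determine_splitters
/// create_agc_archive(
///     "out.agc",
///     vec![("sampleA".to_string(), "a.fa".to_string())],
///     splitters,
///     HashSet::new(), // candidate k-mers (adaptive mode only)
///     HashSet::new(), // duplicated k-mers (adaptive mode only)
///     21,     // kmer_length
///     60_000, // segment_size
///     8,      // num_threads
///     false,  // adaptive_mode
///     false,  // concatenated_genomes
///     1,      // verbosity
/// )?;
/// ```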
pub fn create_agc_archive(
    output_path: &str,
    sample_files: Vec<(String, String)>,
    splitters: HashSet<u64>,
    candidate_kmers: HashSet<u64>,
    duplicated_kmers: HashSet<u64>,
    kmer_length: usize,
    segment_size: u32,
    num_threads: usize,
    adaptive_mode: bool,
    concatenated_genomes: bool,
    verbosity: usize,
) -> anyhow::Result<()> {
    use ragc_common::CollectionV3;

    if sample_files.is_empty() {
        return Ok(());
    }

    let num_samples = sample_files.len();

    // Create archive for writing
    let mut archive = Archive::new_writer();
    archive.open(output_path)?;

    // Write file_type_info stream (C++ AGC compatibility)
    {
        let mut data = Vec::new();
        let append_str = |data: &mut Vec<u8>, s: &str| {
            data.extend_from_slice(s.as_bytes());
            data.push(0);
        };
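        // Layout note: the stream body is a flat run of NUL-terminated strings,
        // alternating key and value; the part metadata below records the number
        // of key/value pairs, so a reader can split on b'\0' and chunk by two.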

        append_str(&mut data, "producer");
        append_str(&mut data, "ragc");

        append_str(&mut data, "producer_version_major");
        append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());

        append_str(&mut data, "producer_version_minor");
        append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());

        append_str(&mut data, "producer_version_build");
        append_str(&mut data, "0");

        append_str(&mut data, "file_version_major");
        append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());

        append_str(&mut data, "file_version_minor");
        append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());

        append_str(&mut data, "comment");
        append_str(
            &mut data,
            &format!(
                "RAGC v.{}.{}",
                ragc_common::AGC_FILE_MAJOR,
                ragc_common::AGC_FILE_MINOR
            ),
        );

        let stream_id = archive.register_stream("file_type_info");
        archive.add_part(stream_id, &data, 7)?; // 7 key-value pairs

        if verbosity > 0 {
            eprintln!(
                "Wrote file_type_info: version {}.{}",
                ragc_common::AGC_FILE_MAJOR,
                ragc_common::AGC_FILE_MINOR
            );
        }
    }

    // Write params stream (kmer_length, min_match_len, pack_cardinality, segment_size)
    {
        let params_stream_id = archive.register_stream("params");
        let mut params_data = Vec::new();

        // Standard params format (16 bytes for C++ AGC compatibility)
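        // Byte layout (four u32 values, little-endian):
        //   [0..4)  kmer_length      [4..8)   min_match_len
        //   [8..12) pack_cardinality [12..16) segment_size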
        params_data.extend_from_slice(&(kmer_length as u32).to_le_bytes());
        params_data.extend_from_slice(&20u32.to_le_bytes()); // min_match_len (default)
        params_data.extend_from_slice(&50u32.to_le_bytes()); // pack_cardinality (default)
        params_data.extend_from_slice(&segment_size.to_le_bytes());

        archive.add_part(params_stream_id, &params_data, 0)?;

        if verbosity > 0 {
            eprintln!(
                "Wrote params: k={}, segment_size={}",
                kmer_length, segment_size
            );
        }
    }

    // Create collection for metadata
    let mut collection = CollectionV3::new();
    collection.set_config(segment_size, kmer_length as u32, None);

    // Register collection streams in archive
    collection.prepare_for_compression(&mut archive)?;

    // Wrap in Arc<Mutex<>> for shared access during compression
    let archive = Arc::new(Mutex::new(archive));
    let collection = Arc::new(Mutex::new(collection));

    // Run compression pipeline
    compress_samples_streaming_with_archive(
        sample_files,
        splitters,
        candidate_kmers,
        duplicated_kmers,
        kmer_length,
        num_threads,
        adaptive_mode,
        concatenated_genomes,
        verbosity,
        Some(archive.clone()),
        Some(collection.clone()),
    )
    .map_err(|e| anyhow::anyhow!(e))?;

    // Serialize collection metadata to archive
    {
        let mut archive_guard = archive.lock().unwrap();
        let mut collection_guard = collection.lock().unwrap();

        if verbosity > 0 {
            eprintln!(
                "Serializing collection metadata for {} samples...",
                num_samples
            );
        }

        // Write sample names
        collection_guard.store_batch_sample_names(&mut archive_guard)?;

        // Write contig names and segment details
        collection_guard.store_contig_batch(&mut archive_guard, 0, num_samples)?;

        if verbosity > 0 {
            eprintln!("Collection metadata serialized successfully");
        }
    }

    // Close archive (writes footer)
    let mut archive = archive.lock().unwrap();
    archive.close()?;

    if verbosity > 0 {
        eprintln!("AGC archive created: {}", output_path);
    }

    Ok(())
}

/// Compress sample files using streaming pipeline with priority queue and worker threads
///
/// Internal function - use create_agc_archive() for the complete end-to-end pipeline
///
/// Matches C++ AGC's AddSampleFiles() (agc_compressor.cpp:2121-2270)
///
/// # Arguments
/// * `sample_files` - Vector of (sample_name, file_path) pairs
/// * `splitters` - Set of splitter k-mers
/// * `candidate_kmers` - Reference-genome singleton k-mers (adaptive-mode exclusion)
/// * `duplicated_kmers` - Reference-genome duplicated k-mers (adaptive-mode exclusion)
/// * `kmer_length` - K-mer length for segmentation
/// * `num_threads` - Number of threads to use
/// * `adaptive_mode` - Enable adaptive splitter finding
/// * `concatenated_genomes` - Treat all contigs as one sample
/// * `verbosity` - Verbosity level (0=quiet, 1=normal, 2=verbose)
/// * `archive` - Optional archive for writing compressed segments
/// * `collection` - Optional collection for metadata tracking
///
/// # Returns
/// Ok(()) on success, Err on failure
fn compress_samples_streaming_with_archive(
    sample_files: Vec<(String, String)>,
    splitters: HashSet<u64>,
    candidate_kmers: HashSet<u64>,
    duplicated_kmers: HashSet<u64>,
    kmer_length: usize,
    num_threads: usize,
    adaptive_mode: bool,
    concatenated_genomes: bool,
    verbosity: usize,
    archive: Option<Arc<Mutex<Archive>>>,
    collection: Option<Arc<Mutex<ragc_common::CollectionV3>>>,
) -> Result<(), String> {
    use crate::genome_io::GenomeIO;

    if sample_files.is_empty() {
        return Ok(());
    }

    // Step 1: Queue initialization (agc_compressor.cpp:2128-2132)
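    // Capacity = max(2 GiB, 192 MiB per thread) of queued sequence data;
    // e.g. 16 threads -> max(2 GiB, 3 GiB) = 3 GiB.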
    let queue_capacity = std::cmp::max(2_u64 << 30, num_threads as u64 * (192_u64 << 20));
    let queue = Arc::new(BoundedPriorityQueue::new(1, queue_capacity as usize));

    // Step 2: Worker thread count (agc_compressor.cpp:2134)
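    // Below 8 threads every thread compresses; at 8+ one thread is left free,
    // presumably as headroom for the FASTA reader loop running on this thread.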
    let no_workers = if num_threads < 8 {
        num_threads
    } else {
        num_threads - 1
    };

    // Convert reference k-mers to sorted vectors (for set_difference in adaptive mode)
    let mut v_candidate_kmers: Vec<u64> = candidate_kmers.into_iter().collect();
    v_candidate_kmers.sort_unstable();

    let mut v_duplicated_kmers: Vec<u64> = duplicated_kmers.into_iter().collect();
    v_duplicated_kmers.sort_unstable();

    // Step 3: Shared state initialization
    // Initialize bloom filter and populate with base splitters
    let mut bloom_filter = crate::bloom_filter::BloomFilter::new(
        splitters.len() * 8, // 8 bits per splitter
    );
    for &kmer in &splitters {
        bloom_filter.insert(kmer);
    }

    // Create auxiliary queue for adaptive mode
    let aux_queue = Arc::new(BoundedPriorityQueue::new(1, usize::MAX));

    let shared = Arc::new(SharedCompressorState {
        processed_bases: AtomicUsize::new(0),
        processed_samples: AtomicUsize::new(0),
        raw_contigs: Mutex::new(Vec::new()),
        verbosity,
        buffered_segments: Arc::new(Mutex::new(BufferedSegments::new(0))),
        splitters: Arc::new(Mutex::new(splitters)),
        bloom_splitters: Arc::new(Mutex::new(bloom_filter)),
        vv_splitters: Mutex::new(vec![Vec::new(); no_workers]),
        v_candidate_kmers,
        v_duplicated_kmers,
        kmer_length,
        adaptive_mode,
        map_segments: Arc::new(Mutex::new(HashMap::new())),
        map_segments_terminators: Arc::new(Mutex::new(HashMap::new())),
        concatenated_genomes,
        no_segments: Arc::new(Mutex::new(0)),
        no_raw_groups: 0,
        archive,
        v_segments: Arc::new(Mutex::new(Vec::new())),
        collection,
        aux_queue,
        working_queue: Mutex::new(Arc::clone(&queue)), // Start with main queue
    });

    // Step 4: Thread spawning (agc_compressor.cpp:2136-2141)
    let barrier = Arc::new(Barrier::new(no_workers));
    let mut worker_handles = Vec::with_capacity(no_workers);

    for worker_id in 0..no_workers {
        let q = queue.clone();
        let b = barrier.clone();
        let s = shared.clone();

        let handle = std::thread::spawn(move || {
            worker_thread(worker_id, no_workers, q, b, s);
        });

        worker_handles.push(handle);
    }

    // Step 5: Main processing loop (agc_compressor.cpp:2163-2242)
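    // Priority starts at usize::MAX and is decremented once per completed
    // sample, so pop_large (which favors the highest priority) drains samples
    // in input order.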
    let mut sample_priority = usize::MAX;
    let mut _cnt_contigs_in_sample = 0;
    const PACK_CARDINALITY: usize = 50; // TODO: Get from config

    for (sample_name, file_path) in sample_files {
        if verbosity > 0 {
            eprintln!("Processing sample: {} from {}", sample_name, file_path);
        }

        // Open FASTA file
        let mut gio = match GenomeIO::open(&file_path) {
            Ok(g) => g,
            Err(e) => {
                eprintln!("Cannot open file {}: {}", file_path, e);
                continue;
            }
        };

        let mut any_contigs_added = false;

        // Read contigs from file
        while let Ok(Some((contig_name, sequence))) = gio.read_contig_raw() {
            if concatenated_genomes {
                // Concatenated mode: treat all contigs as one sample (empty name)
                // Matches C++ AGC behavior: all genomes concatenated into single sample
                let concat_sample_name = String::from("");

                // Register contig with empty sample name
                if let Some(collection_mutex) = &shared.collection {
                    let mut collection = collection_mutex.lock().unwrap();
                    let _ = collection.register_sample_contig(&concat_sample_name, &contig_name);
                }

                let cost = sequence.len();
                queue.emplace(
                    Task::new_contig(
                        concat_sample_name,
                        contig_name.clone(),
                        sequence,
                        ContigProcessingStage::AllContigs,
                    ),
                    sample_priority,
                    cost,
                );

                any_contigs_added = true;
            } else {
                // Normal mode: one sample per file
                // Register sample/contig with collection
                if let Some(collection_mutex) = &shared.collection {
                    let mut collection = collection_mutex.lock().unwrap();
                    let _ = collection.register_sample_contig(&sample_name, &contig_name);
                }

                let cost = sequence.len();
                queue.emplace(
                    Task::new_contig(
                        sample_name.clone(),
                        contig_name.clone(),
                        sequence,
                        ContigProcessingStage::AllContigs,
                    ),
                    sample_priority,
                    cost,
                );

                any_contigs_added = true;
            }
        }

        // Step 6: Send synchronization tokens after each sample (agc_compressor.cpp:2148-2155)
        if !concatenated_genomes && any_contigs_added {
            let sync_stage = if adaptive_mode {
                ContigProcessingStage::NewSplitters
            } else {
                ContigProcessingStage::Registration
            };

            // Send exactly no_workers sync tokens (0 cost)
            queue.emplace_many_no_cost(Task::new_sync(sync_stage), sample_priority, no_workers);

            sample_priority -= 1;
        }
    }

    // Step 7: Signal completion and wait for workers (agc_compressor.cpp:2254-2256)
    queue.mark_completed();

    for handle in worker_handles {
        handle.join().map_err(|_| "Worker thread panicked")?;
    }

    if verbosity > 0 {
        let total_bases = shared.processed_bases.load(Ordering::Relaxed);
        eprintln!("Compression complete: {} bases processed", total_bases);
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_shared_state_creation() {
        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let state = SharedCompressorState::new(1, 21, false, false, 0, queue);
        assert_eq!(state.verbosity, 1);
        assert_eq!(state.kmer_length, 21);
        assert_eq!(state.adaptive_mode, false);
        assert_eq!(state.concatenated_genomes, false);
        assert_eq!(state.no_raw_groups, 0);
        assert_eq!(state.processed_bases.load(Ordering::Relaxed), 0);
        assert_eq!(state.processed_samples.load(Ordering::Relaxed), 0);
        assert_eq!(state.raw_contigs.lock().unwrap().len(), 0);
        assert_eq!(*state.no_segments.lock().unwrap(), 0);
    }
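
    // Minimal check of set_difference's semantics: result = a \ b on sorted
    // inputs, mirroring std::set_difference.
    #[test]
    fn test_set_difference_sorted() {
        let a = [1u64, 2, 4, 6, 8];
        let b = [2u64, 3, 6, 7];
        let mut out = Vec::new();
        set_difference(&a, &b, &mut out);
        assert_eq!(out, vec![1, 4, 8]);
    }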

    #[test]
    fn test_worker_thread_completion() {
        use std::thread;

        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let barrier = Arc::new(Barrier::new(2));
        let shared = Arc::new(SharedCompressorState::new(
            0,
            21,
            false,
            false,
            0,
            queue.clone(),
        ));

        // Mark queue as completed (no tasks)
        queue.mark_completed();

        // Spawn two workers
        let handles: Vec<_> = (0..2)
            .map(|worker_id| {
                let q = queue.clone();
                let b = barrier.clone();
                let s = shared.clone();

                thread::spawn(move || {
                    worker_thread(worker_id, 2, q, b, s);
                })
            })
            .collect();

        // Workers should exit immediately
        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[test]
    fn test_worker_thread_with_task() {
        use std::thread;

        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let barrier = Arc::new(Barrier::new(1));
        let shared = Arc::new(SharedCompressorState::new(
            0,
            21,
            false,
            false,
            0,
            queue.clone(),
        ));

        // Enqueue a simple task
        queue.emplace(
            Task::new_contig(
                "sample1".to_string(),
                "chr1".to_string(),
                vec![0, 1, 2, 3, 0, 1, 2, 3], // 8 bases
                ContigProcessingStage::AllContigs,
            ),
            100,
            8,
        );
        queue.mark_completed();

        // Spawn single worker
        let q = queue.clone();
        let b = barrier.clone();
        let s = shared.clone();

        let handle = thread::spawn(move || {
            worker_thread(0, 1, q, b, s);
        });

        handle.join().unwrap();

        // Verify task was processed (compress_contig_task returns true)
        assert_eq!(shared.processed_bases.load(Ordering::Relaxed), 8);
    }
}