// ragc_core/streaming_compressor_queue.rs

1// Queue-based streaming compressor API
2// Provides simple push() interface with automatic backpressure and constant memory usage
3
4use crate::lz_diff::LZDiff;
5use crate::memory_bounded_queue::MemoryBoundedQueue;
6use crate::segment::split_at_splitters_with_size;
7use crate::splitters::determine_splitters;
8use anyhow::{Context, Result};
9use ragc_common::{Archive, CollectionV3, Contig, CONTIG_SEPARATOR};
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use std::sync::atomic::{AtomicU32, Ordering};
13use std::sync::{Arc, Mutex};
14use std::thread::{self, JoinHandle};
15
/// Tuning knobs for the streaming queue-based compressor.
///
/// The defaults (see the `Default` impl) mirror the C++ AGC implementation.
#[derive(Debug, Clone)]
pub struct StreamingQueueConfig {
    /// K-mer length used for splitter detection.
    pub k: usize,

    /// Target segment size when splitting contigs.
    pub segment_size: usize,

    /// Minimum match length accepted by the LZ encoder.
    pub min_match_len: usize,

    /// ZSTD compression level (1-22).
    pub compression_level: i32,

    /// Number of compression worker threads.
    pub num_threads: usize,

    /// Queue capacity in bytes; `push` blocks once this much data is queued
    /// (default: 2 GB, like C++ AGC).
    pub queue_capacity: usize,

    /// Verbosity level (0 = silent).
    pub verbosity: usize,

    /// Adaptive mode: find new splitters for samples that can't be segmented
    /// well (matches the C++ AGC `-a` flag).
    pub adaptive_mode: bool,
}
44
45impl Default for StreamingQueueConfig {
46    fn default() -> Self {
47        Self {
48            k: 31,
49            segment_size: 60_000,
50            min_match_len: 20,
51            compression_level: 17,
52            num_threads: rayon::current_num_threads().max(4),
53            queue_capacity: 2 * 1024 * 1024 * 1024, // 2 GB like C++ AGC
54            verbosity: 1,
55            adaptive_mode: false, // Default matches C++ AGC (adaptive mode off)
56        }
57    }
58}
59
/// Unit of work pushed onto the memory-bounded queue and pulled by workers.
///
/// Note: `Contig` is a type alias for `Vec<u8>` (raw sequence bytes), so the
/// sample and contig names must be carried alongside the data.
struct ContigTask {
    // Sample this contig belongs to.
    sample_name: String,
    // Name of the contig within the sample.
    contig_name: String,
    data: Contig, // Vec<u8>; its length is used for queue size accounting in `push`
}
67
/// Segment group identified by flanking k-mers (matching batch mode).
///
/// Segments whose front/back k-mers match land in the same group, so later
/// segments can be LZ-encoded against the group's reference segment.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SegmentGroupKey {
    // K-mer at the front boundary of the segment.
    kmer_front: u64,
    // K-mer at the back boundary of the segment.
    kmer_back: u64,
}
74
/// Buffered segment waiting to be packed into a group's compressed part.
#[derive(Debug, Clone)]
struct BufferedSegment {
    // Sample and contig the segment came from (needed when registering the
    // segment's placement in the collection metadata).
    sample_name: String,
    contig_name: String,
    // Ordinal of this segment within its contig.
    seg_part_no: usize,
    // Raw segment bytes (Contig = Vec<u8>).
    data: Contig,
    // Whether the segment was reverse-complemented before storage.
    // NOTE(review): worker_thread currently always sets this to false.
    is_rev_comp: bool,
}
84
/// Buffer for a segment group (packs up to PACK_CARDINALITY segments together).
///
/// The first segment seen for a group becomes its reference and is stored in
/// a dedicated reference stream; subsequent segments are buffered as deltas
/// and flushed in packs to the delta stream.
struct SegmentGroupBuffer {
    group_id: u32,
    stream_id: usize,                           // Delta stream for packed segments
    ref_stream_id: usize,                       // Reference stream for first segment
    reference_segment: Option<BufferedSegment>, // First segment (reference for LZ encoding)
    segments: Vec<BufferedSegment>, // Up to PACK_CARDINALITY segments (EXCLUDING reference)
    ref_written: bool,              // Whether reference has been written
    segments_written: u32,          // Counter for delta segments written (NOT including reference)
}
95
96impl SegmentGroupBuffer {
97    fn new(group_id: u32, stream_id: usize, ref_stream_id: usize) -> Self {
98        Self {
99            group_id,
100            stream_id,
101            ref_stream_id,
102            reference_segment: None,
103            segments: Vec::new(),
104            ref_written: false,
105            segments_written: 0,
106        }
107    }
108}
109
/// Number of delta segments packed into one compressed archive part
/// (C++ AGC default; also written to the `params` stream).
const PACK_CARDINALITY: usize = 50;
/// Group IDs below this value are stored raw (no LZ encoding); LZ-encoded
/// group IDs start here. Matches C++ AGC's raw-group convention.
const NO_RAW_GROUPS: u32 = 16;
114
/// Streaming compressor with queue-based API
///
/// Contigs pushed via `push` are queued (with backpressure) and compressed by
/// background worker threads; `finalize` drains the queue, flushes partial
/// packs, and writes the archive footer.
///
/// # Example
/// ```no_run
/// use ragc_core::{StreamingQueueCompressor, StreamingQueueConfig};
/// use std::collections::HashSet;
///
/// # fn main() -> anyhow::Result<()> {
/// let config = StreamingQueueConfig::default();
/// let splitters = HashSet::new(); // Normally from reference
/// let mut compressor = StreamingQueueCompressor::with_splitters(
///     "output.agc",
///     config,
///     splitters
/// )?;
///
/// // Push sequences (blocks when queue is full - automatic backpressure!)
/// # let sequences = vec![("sample1".to_string(), "chr1".to_string(), vec![0u8; 1000])];
/// for (sample, contig_name, data) in sequences {
///     compressor.push(sample, contig_name, data)?;
/// }
///
/// // Finalize - waits for all compression to complete
/// compressor.finalize()?;
/// # Ok(())
/// # }
/// ```
pub struct StreamingQueueCompressor {
    // Memory-bounded task queue shared with worker threads (backpressure).
    queue: Arc<MemoryBoundedQueue<ContigTask>>,
    // Join handles for the spawned compression workers.
    workers: Vec<JoinHandle<Result<()>>>,
    // Sample/contig/segment metadata; written to the archive during finalize.
    collection: Arc<Mutex<CollectionV3>>,
    // Splitter k-mers used to cut contigs into segments.
    splitters: Arc<HashSet<u64>>,
    config: StreamingQueueConfig,
    // Output archive; workers append compressed parts under this lock.
    archive: Arc<Mutex<Archive>>,
    // Per-group buffers keyed by flanking k-mers; flushed in packs.
    segment_groups: Arc<Mutex<HashMap<SegmentGroupKey, SegmentGroupBuffer>>>,
    group_counter: Arc<AtomicU32>, // Starts at 16 (NO_RAW_GROUPS) for LZ groups
    reference_sample_name: Arc<Mutex<Option<String>>>, // First sample becomes reference
}
153
154impl StreamingQueueCompressor {
155    /// Create a new streaming compressor with pre-computed splitters
156    ///
157    /// Use this when you already have splitters (e.g., from a reference genome)
158    ///
159    /// # Arguments
160    /// * `output_path` - Path to output AGC archive
161    /// * `config` - Compression configuration
162    /// * `splitters` - Pre-computed splitter k-mers
163    pub fn with_splitters(
164        output_path: impl AsRef<Path>,
165        config: StreamingQueueConfig,
166        splitters: HashSet<u64>,
167    ) -> Result<Self> {
168        let output_path = output_path.as_ref();
169        let archive_path = output_path.to_string_lossy().to_string();
170
171        if config.verbosity > 0 {
172            eprintln!("Initializing streaming compressor...");
173            eprintln!(
174                "  Queue capacity: {} GB",
175                config.queue_capacity / (1024 * 1024 * 1024)
176            );
177            eprintln!("  Worker threads: {}", config.num_threads);
178            eprintln!("  Splitters: {}", splitters.len());
179        }
180
181        // Create archive
182        let mut archive = Archive::new_writer();
183        archive.open(output_path)?;
184
185        // Create collection
186        let mut collection = CollectionV3::new();
187        collection.set_config(config.segment_size as u32, config.k as u32, None);
188
189        // CRITICAL: Register collection streams FIRST (C++ AGC compatibility)
190        // C++ AGC expects collection-samples at stream 0, collection-contigs at 1, collection-details at 2
191        collection.prepare_for_compression(&mut archive)?;
192
193        // Write file_type_info stream (after collection streams for C++ AGC compatibility)
194        {
195            let mut data = Vec::new();
196            let append_str = |data: &mut Vec<u8>, s: &str| {
197                data.extend_from_slice(s.as_bytes());
198                data.push(0);
199            };
200
201            append_str(&mut data, "producer");
202            append_str(&mut data, "ragc");
203            append_str(&mut data, "producer_version_major");
204            append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());
205            append_str(&mut data, "producer_version_minor");
206            append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());
207            append_str(&mut data, "producer_version_build");
208            append_str(&mut data, "0");
209            append_str(&mut data, "file_version_major");
210            append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());
211            append_str(&mut data, "file_version_minor");
212            append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());
213            append_str(&mut data, "comment");
214            append_str(
215                &mut data,
216                &format!(
217                    "RAGC v.{}.{}",
218                    ragc_common::AGC_FILE_MAJOR,
219                    ragc_common::AGC_FILE_MINOR
220                ),
221            );
222
223            let stream_id = archive.register_stream("file_type_info");
224            archive.add_part(stream_id, &data, 7)?; // 7 key-value pairs
225        }
226
227        // Write params stream
228        {
229            let params_stream_id = archive.register_stream("params");
230            let mut params_data = Vec::new();
231            params_data.extend_from_slice(&(config.k as u32).to_le_bytes());
232            params_data.extend_from_slice(&(config.min_match_len as u32).to_le_bytes());
233            params_data.extend_from_slice(&50u32.to_le_bytes()); // pack_cardinality (default)
234            params_data.extend_from_slice(&(config.segment_size as u32).to_le_bytes());
235            archive.add_part(params_stream_id, &params_data, 0)?;
236        }
237
238        // Write empty splitters stream (C++ AGC compatibility)
239        {
240            let splitters_data = Vec::new();
241            let stream_id = archive.register_stream("splitters");
242            archive.add_part(stream_id, &splitters_data, 0)?;
243        }
244
245        // Write empty segment-splitters stream (C++ AGC compatibility)
246        {
247            let seg_splitters_data = Vec::new();
248            let stream_id = archive.register_stream("segment-splitters");
249            archive.add_part(stream_id, &seg_splitters_data, 0)?;
250        }
251
252        let collection = Arc::new(Mutex::new(collection));
253        let archive = Arc::new(Mutex::new(archive));
254
255        // Create memory-bounded queue
256        let queue = Arc::new(MemoryBoundedQueue::new(config.queue_capacity));
257
258        let splitters = Arc::new(splitters);
259
260        // Segment grouping for LZ packing
261        let segment_groups = Arc::new(Mutex::new(HashMap::new()));
262        let group_counter = Arc::new(AtomicU32::new(NO_RAW_GROUPS)); // Start at 16 for LZ groups
263        let reference_sample_name = Arc::new(Mutex::new(None)); // Shared across all workers
264
265        // Spawn worker threads
266        let mut workers = Vec::new();
267        for worker_id in 0..config.num_threads {
268            let queue = Arc::clone(&queue);
269            let collection = Arc::clone(&collection);
270            let splitters = Arc::clone(&splitters);
271            let archive = Arc::clone(&archive);
272            let segment_groups = Arc::clone(&segment_groups);
273            let group_counter = Arc::clone(&group_counter);
274            let reference_sample_name = Arc::clone(&reference_sample_name);
275            let config = config.clone();
276
277            let handle = thread::spawn(move || {
278                worker_thread(
279                    worker_id,
280                    queue,
281                    collection,
282                    splitters,
283                    archive,
284                    segment_groups,
285                    group_counter,
286                    reference_sample_name,
287                    config,
288                )
289            });
290
291            workers.push(handle);
292        }
293
294        if config.verbosity > 0 {
295            eprintln!("Ready to receive sequences!");
296        }
297
298        Ok(Self {
299            queue,
300            workers,
301            collection,
302            splitters,
303            config,
304            archive,
305            segment_groups,
306            group_counter,
307            reference_sample_name,
308        })
309    }
310
311    /// Create compressor and determine splitters from first contig
312    ///
313    /// **Note**: This requires at least one contig to be pushed before workers start.
314    /// Consider using `with_splitters()` instead if you have a reference genome.
315    pub fn new(output_path: impl AsRef<Path>, config: StreamingQueueConfig) -> Result<Self> {
316        // Start with empty splitters - will be determined from first push
317        Self::with_splitters(output_path, config, HashSet::new())
318    }
319
    /// Push a contig to the compression queue
    ///
    /// **BLOCKS** if the queue is full (automatic backpressure!)
    ///
    /// # Arguments
    /// * `sample_name` - Name of the sample
    /// * `contig_name` - Name of the contig
    /// * `data` - Contig sequence data (Vec<u8>)
    ///
    /// # Errors
    /// Fails if the contig cannot be registered in the collection or the
    /// queue rejects the task (e.g. it was already closed).
    ///
    /// # Example
    /// ```no_run
    /// # use ragc_core::{StreamingQueueCompressor, StreamingQueueConfig};
    /// # use std::collections::HashSet;
    /// # let mut compressor = StreamingQueueCompressor::with_splitters("out.agc", StreamingQueueConfig::default(), HashSet::new())?;
    /// compressor.push("sample1".to_string(), "chr1".to_string(), vec![b'A', b'T', b'G', b'C'])?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn push(&mut self, sample_name: String, contig_name: String, data: Contig) -> Result<()> {
        // If no splitters yet, determine from this contig.
        // NOTE(review): `with_splitters` always spawns workers up front, so
        // `self.workers.is_empty()` appears to never be true here and this
        // lazy splitter-detection branch looks unreachable — confirm before
        // relying on `new()`'s "determine splitters from first push" promise.
        if self.splitters.is_empty() && self.workers.is_empty() {
            if self.config.verbosity > 0 {
                eprintln!("Determining splitters from first contig...");
            }

            let (splitters, _, _) =
                determine_splitters(&[data.clone()], self.config.k, self.config.segment_size);

            if self.config.verbosity > 0 {
                eprintln!("Found {} splitters", splitters.len());
            }

            // Update splitters and spawn workers
            self.splitters = Arc::new(splitters);

            // Spawn workers now that we have splitters (mirrors the spawn
            // loop in `with_splitters`).
            for worker_id in 0..self.config.num_threads {
                let queue = Arc::clone(&self.queue);
                let collection = Arc::clone(&self.collection);
                let splitters = Arc::clone(&self.splitters);
                let archive = Arc::clone(&self.archive);
                let segment_groups = Arc::clone(&self.segment_groups);
                let group_counter = Arc::clone(&self.group_counter);
                let reference_sample_name = Arc::clone(&self.reference_sample_name);
                let config = self.config.clone();

                let handle = thread::spawn(move || {
                    worker_thread(
                        worker_id,
                        queue,
                        collection,
                        splitters,
                        archive,
                        segment_groups,
                        group_counter,
                        reference_sample_name,
                        config,
                    )
                });

                self.workers.push(handle);
            }

            if self.config.verbosity > 0 {
                eprintln!("Workers spawned and ready!");
            }
        }

        // Register contig in collection (must happen before workers can
        // record segment placements for it).
        {
            let mut collection = self.collection.lock().unwrap();
            collection
                .register_sample_contig(&sample_name, &contig_name)
                .context("Failed to register contig")?;
        }

        // Set first sample as reference (multi-file mode)
        {
            let mut ref_sample = self.reference_sample_name.lock().unwrap();
            if ref_sample.is_none() {
                if self.config.verbosity > 0 {
                    eprintln!("Using first sample ({}) as reference", sample_name);
                }
                *ref_sample = Some(sample_name.clone());
            }
        }

        // Task size for the queue's byte-based backpressure accounting.
        let task_size = data.len();

        // Create task
        let task = ContigTask {
            sample_name,
            contig_name,
            data,
        };

        // Push to queue (BLOCKS if queue is full!)
        self.queue
            .push(task, task_size)
            .context("Failed to push to queue")?;

        Ok(())
    }
423
    /// Finalize compression
    ///
    /// This will:
    /// 1. Close the queue (no more pushes allowed)
    /// 2. Wait for all worker threads to finish processing
    /// 3. Flush remaining partial segment packs
    /// 4. Write metadata to the archive and close the archive file
    ///
    /// # Errors
    /// Fails if any worker returned an error, a remaining pack cannot be
    /// flushed, or metadata/footer writing fails. Panics if a worker thread
    /// itself panicked.
    ///
    /// # Example
    /// ```no_run
    /// # use ragc_core::{StreamingQueueCompressor, StreamingQueueConfig};
    /// # use std::collections::HashSet;
    /// # let mut compressor = StreamingQueueCompressor::with_splitters("out.agc", StreamingQueueConfig::default(), HashSet::new())?;
    /// // ... push sequences ...
    /// compressor.finalize()?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn finalize(self) -> Result<()> {
        if self.config.verbosity > 0 {
            eprintln!("Finalizing compression...");
            eprintln!("  Closing queue...");
        }

        // Close queue - no more pushes allowed; workers exit once drained.
        self.queue.close();

        if self.config.verbosity > 0 {
            eprintln!("  Waiting for {} workers to finish...", self.workers.len());
        }

        // Wait for all workers to finish
        for (i, handle) in self.workers.into_iter().enumerate() {
            handle
                .join()
                .expect("Worker thread panicked")
                .with_context(|| format!("Worker {} failed", i))?;
        }

        if self.config.verbosity > 0 {
            eprintln!("All workers finished!");
            eprintln!("Flushing remaining segment packs...");
        }

        // Flush all remaining partial packs.
        // NOTE(review): flush order follows HashMap iteration order and is
        // nondeterministic; streams are pre-registered per group, so parts
        // land in the right stream regardless — presumably readers don't
        // depend on cross-group part order, verify.
        {
            let mut groups = self.segment_groups.lock().unwrap();
            let num_groups = groups.len();

            for (key, buffer) in groups.iter_mut() {
                // Flush if there are delta segments OR if reference hasn't been written
                if !buffer.segments.is_empty() || !buffer.ref_written {
                    if self.config.verbosity > 1 {
                        eprintln!(
                            "Flushing group {} with {} segments (k-mers: {:#x}, {:#x})",
                            buffer.group_id,
                            buffer.segments.len(),
                            key.kmer_front,
                            key.kmer_back
                        );
                    }
                    flush_pack(buffer, &self.collection, &self.archive, &self.config)
                        .context("Failed to flush remaining pack")?;
                }
            }

            if self.config.verbosity > 0 {
                eprintln!("Flushed {} segment groups", num_groups);
            }
        }

        if self.config.verbosity > 0 {
            eprintln!("Writing metadata...");
        }

        // Get total sample count for metadata writing (separate scope so the
        // collection lock is released before the combined lock below).
        let num_samples = {
            let coll = self.collection.lock().unwrap();
            coll.get_no_samples()
        };

        // Write collection metadata to archive
        {
            let mut archive = self.archive.lock().unwrap();
            let mut collection = self.collection.lock().unwrap();

            // Write sample names
            collection
                .store_batch_sample_names(&mut archive)
                .context("Failed to write sample names")?;

            // Write contig names and segment details
            collection
                .store_contig_batch(&mut archive, 0, num_samples)
                .context("Failed to write contig batch")?;

            if self.config.verbosity > 0 {
                eprintln!("Collection metadata written successfully");
            }

            // Close archive (writes footer)
            archive.close().context("Failed to close archive")?;
        }

        if self.config.verbosity > 0 {
            eprintln!("Compression complete!");
        }

        Ok(())
    }
533
534    /// Get current queue statistics
535    pub fn queue_stats(&self) -> QueueStats {
536        QueueStats {
537            current_size_bytes: self.queue.current_size(),
538            current_items: self.queue.len(),
539            capacity_bytes: self.queue.capacity(),
540            is_closed: self.queue.is_closed(),
541        }
542    }
543}
544
/// Point-in-time snapshot of queue occupancy, as returned by
/// `StreamingQueueCompressor::queue_stats`.
#[derive(Debug, Clone)]
pub struct QueueStats {
    /// Total bytes currently buffered in the queue.
    pub current_size_bytes: usize,
    /// Number of queued contig tasks.
    pub current_items: usize,
    /// Maximum queue size in bytes before `push` blocks.
    pub capacity_bytes: usize,
    /// Whether the queue has been closed (no further pushes accepted).
    pub is_closed: bool,
}
553
/// Flush a complete pack of segments (compress, LZ encode, write to archive)
///
/// Writes the group's reference segment (once, to the reference stream), then
/// packs the buffered delta segments — LZ-encoded against the reference for
/// groups >= NO_RAW_GROUPS, raw otherwise — separated by CONTIG_SEPARATOR,
/// ZSTD-compresses the pack, and appends it to the group's delta stream.
/// Each written segment is registered in the collection with its in-group id
/// (reference = 0, deltas start at 1). Clears `buffer.segments` on success.
///
/// # Errors
/// Fails if compression, archive writes, or collection registration fail.
fn flush_pack(
    buffer: &mut SegmentGroupBuffer,
    collection: &Arc<Mutex<CollectionV3>>,
    archive: &Arc<Mutex<Archive>>,
    config: &StreamingQueueConfig,
) -> Result<()> {
    use crate::segment_compression::{compress_reference_segment, compress_segment_configured};

    // Skip if no segments to write (but still write reference if present)
    if buffer.segments.is_empty() && buffer.ref_written {
        return Ok(());
    }

    // Groups below NO_RAW_GROUPS are stored raw (no LZ encoding).
    let use_lz_encoding = buffer.group_id >= NO_RAW_GROUPS;

    // Write reference segment if not already written (first pack for this group)
    if !buffer.ref_written {
        if let Some(ref_seg) = &buffer.reference_segment {
            if config.verbosity > 1 {
                eprintln!(
                    "  Flushing group {}: reference from {}",
                    buffer.group_id, ref_seg.sample_name
                );
            }
            // Compress reference using adaptive compression; the returned
            // marker byte is appended so readers know the codec used.
            let (mut compressed, marker) = compress_reference_segment(&ref_seg.data)
                .context("Failed to compress reference")?;
            compressed.push(marker);

            // Metadata stores the uncompressed size
            let ref_size = ref_seg.data.len() as u64;

            {
                let mut arch = archive.lock().unwrap();
                arch.add_part(buffer.ref_stream_id, &compressed, ref_size)
                    .context("Failed to write reference")?;
            }

            // Register reference in collection with in_group_id = 0
            {
                let mut coll = collection.lock().unwrap();
                coll.add_segment_placed(
                    &ref_seg.sample_name,
                    &ref_seg.contig_name,
                    ref_seg.seg_part_no,
                    buffer.group_id,
                    0, // Reference is always at position 0
                    ref_seg.is_rev_comp,
                    ref_seg.data.len() as u32,
                )
                .context("Failed to register reference")?;
            }

            buffer.ref_written = true;
        } else if config.verbosity > 1 {
            eprintln!(
                "  Group {}: flushing without reference (will use raw encoding)",
                buffer.group_id
            );
        }
    }

    // Pack segments together
    // Note: segments do NOT include the reference - it's stored separately
    let mut packed_data = Vec::new();
    for (idx_in_pack, seg) in buffer.segments.iter().enumerate() {
        // Delta segments start at in_group_id = 1 (reference is 0);
        // segments_written carries the offset across earlier packs.
        let in_group_id = buffer.segments_written + idx_in_pack as u32 + 1;

        let contig_data = if !use_lz_encoding || buffer.reference_segment.is_none() {
            // Raw segment: groups 0-15 OR groups without reference
            seg.data.clone()
        } else {
            // LZ-encoded segment (groups >= 16 with reference).
            // NOTE(review): a fresh LZDiff is built and `prepare`d per delta
            // segment even though the reference is fixed for the pack; if
            // `encode` leaves the encoder reusable this could be hoisted out
            // of the loop — confirm LZDiff semantics before changing.
            let reference = &buffer.reference_segment.as_ref().unwrap().data;
            let mut lz_diff = LZDiff::new(config.min_match_len as u32);
            lz_diff.prepare(reference);
            lz_diff.encode(&seg.data)
        };

        // Add contig data
        packed_data.extend_from_slice(&contig_data);
        // Add separator (0xFF)
        packed_data.push(CONTIG_SEPARATOR);

        // Register in collection
        {
            let mut coll = collection.lock().unwrap();
            coll.add_segment_placed(
                &seg.sample_name,
                &seg.contig_name,
                seg.seg_part_no,
                buffer.group_id,
                in_group_id, // Start from 1 (reference is 0)
                seg.is_rev_comp,
                seg.data.len() as u32,
            )
            .context("Failed to register segment")?;
        }
    }

    // Update counter (counts delta segments only, not reference)
    buffer.segments_written += buffer.segments.len() as u32;

    // Compress and write the packed data (if we have any delta segments)
    if !packed_data.is_empty() {
        // The metadata field stores the uncompressed size (including separators)
        let total_raw_size = packed_data.len() as u64;

        let mut compressed = compress_segment_configured(&packed_data, config.compression_level)
            .context("Failed to compress pack")?;
        compressed.push(0); // Marker 0 = plain ZSTD

        {
            let mut arch = archive.lock().unwrap();
            arch.add_part(buffer.stream_id, &compressed, total_raw_size)
                .context("Failed to write pack")?;
        }
    }

    // Clear segments for next pack
    buffer.segments.clear();

    Ok(())
}
680
681/// Worker thread that pulls from queue and compresses
682fn worker_thread(
683    worker_id: usize,
684    queue: Arc<MemoryBoundedQueue<ContigTask>>,
685    collection: Arc<Mutex<CollectionV3>>,
686    splitters: Arc<HashSet<u64>>,
687    archive: Arc<Mutex<Archive>>,
688    segment_groups: Arc<Mutex<HashMap<SegmentGroupKey, SegmentGroupBuffer>>>,
689    group_counter: Arc<AtomicU32>,
690    reference_sample_name: Arc<Mutex<Option<String>>>,
691    config: StreamingQueueConfig,
692) -> Result<()> {
693    let mut processed_count = 0;
694
695    loop {
696        // Pull from queue (blocks if empty, returns None when closed)
697        let Some(task) = queue.pull() else {
698            // Queue is closed and empty - we're done!
699            if config.verbosity > 1 {
700                eprintln!(
701                    "Worker {} finished ({} contigs processed)",
702                    worker_id, processed_count
703                );
704            }
705            break;
706        };
707
708        // Split into segments
709        let segments =
710            split_at_splitters_with_size(&task.data, &splitters, config.k, config.segment_size);
711
712        if config.verbosity > 2 {
713            eprintln!(
714                "Worker {} processing {} (split into {} segments)",
715                worker_id,
716                task.contig_name,
717                segments.len()
718            );
719        }
720
721        // Buffer segments for packing (matching batch mode)
722        for (place, segment) in segments.iter().enumerate() {
723            // Create grouping key from terminal k-mers
724            let key = SegmentGroupKey {
725                kmer_front: segment.front_kmer,
726                kmer_back: segment.back_kmer,
727            };
728
729            // Create buffered segment
730            let buffered = BufferedSegment {
731                sample_name: task.sample_name.clone(),
732                contig_name: task.contig_name.clone(),
733                seg_part_no: place,
734                data: segment.data.clone(),
735                is_rev_comp: false,
736            };
737
738            // Lock segment groups and add segment to buffer
739            {
740                let mut groups = segment_groups.lock().unwrap();
741
742                // Get or create buffer for this group
743                let buffer = groups.entry(key).or_insert_with(|| {
744                    // Allocate new group ID
745                    let group_id = group_counter.fetch_add(1, Ordering::SeqCst);
746
747                    // Register streams for this group
748                    let archive_version =
749                        ragc_common::AGC_FILE_MAJOR * 1000 + ragc_common::AGC_FILE_MINOR;
750                    let delta_stream_name =
751                        ragc_common::stream_delta_name(archive_version, group_id);
752                    let ref_stream_name = ragc_common::stream_ref_name(archive_version, group_id);
753
754                    let mut arch = archive.lock().unwrap();
755                    let stream_id = arch.register_stream(&delta_stream_name);
756                    let ref_stream_id = arch.register_stream(&ref_stream_name);
757                    drop(arch);
758
759                    SegmentGroupBuffer::new(group_id, stream_id, ref_stream_id)
760                });
761
762                // First segment in group becomes reference (regardless of sample)
763                if buffer.reference_segment.is_none() {
764                    // This is the first segment for this group - becomes reference
765                    if config.verbosity > 1 {
766                        eprintln!(
767                            "  Group {}: Setting reference from {}",
768                            buffer.group_id, task.sample_name
769                        );
770                    }
771                    buffer.reference_segment = Some(buffered.clone());
772                    // Don't add reference to segments - it's stored separately
773                } else {
774                    // Subsequent segments are deltas (LZ-encoded against reference)
775                    buffer.segments.push(buffered);
776                }
777
778                // Flush pack if buffer is full
779                if buffer.segments.len() >= PACK_CARDINALITY {
780                    flush_pack(buffer, &collection, &archive, &config)
781                        .context("Failed to flush pack")?;
782                }
783            }
784        }
785
786        processed_count += 1;
787    }
788
789    Ok(())
790}
791
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): these tests write to fixed /tmp paths; concurrent runs
    // of the same test binary could collide on those files — confirm
    // acceptable, or switch to unique temp paths.

    /// Smoke test: constructing a compressor writes the archive headers
    /// without error.
    ///
    /// NOTE(review): the compressor is dropped without `finalize()`, so its
    /// worker threads remain blocked on the still-open queue until process
    /// exit — harmless here, but intentional leak.
    #[test]
    fn test_create_compressor() {
        let config = StreamingQueueConfig::default();
        let splitters = HashSet::new();
        let compressor =
            StreamingQueueCompressor::with_splitters("/tmp/test_stream.agc", config, splitters);
        assert!(compressor.is_ok());
    }

    /// A freshly created compressor reports an empty, open queue with the
    /// default 2 GB capacity.
    #[test]
    fn test_queue_stats() {
        let config = StreamingQueueConfig::default();
        let splitters = HashSet::new();
        let compressor =
            StreamingQueueCompressor::with_splitters("/tmp/test_stats.agc", config, splitters)
                .unwrap();

        let stats = compressor.queue_stats();
        assert_eq!(stats.current_size_bytes, 0);
        assert_eq!(stats.current_items, 0);
        assert_eq!(stats.capacity_bytes, 2 * 1024 * 1024 * 1024);
        assert!(!stats.is_closed);
    }

    /// End-to-end: push one small contig and finalize the archive cleanly.
    #[test]
    fn test_push_and_finalize() {
        let config = StreamingQueueConfig {
            verbosity: 0, // Quiet for tests
            ..Default::default()
        };
        let splitters = HashSet::new();
        let mut compressor =
            StreamingQueueCompressor::with_splitters("/tmp/test_push.agc", config, splitters)
                .unwrap();

        // Push a small contig
        let data = vec![b'A'; 1000];
        compressor
            .push("sample1".to_string(), "chr1".to_string(), data)
            .unwrap();

        // Finalize
        compressor.finalize().unwrap();
    }
}