1use crate::lz_diff::LZDiff;
5use crate::memory_bounded_queue::MemoryBoundedQueue;
6use crate::segment::split_at_splitters_with_size;
7use crate::splitters::determine_splitters;
8use anyhow::{Context, Result};
9use ragc_common::{Archive, CollectionV3, Contig, CONTIG_SEPARATOR};
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use std::sync::atomic::{AtomicU32, Ordering};
13use std::sync::{Arc, Mutex};
14use std::thread::{self, JoinHandle};
15
/// Tuning knobs for the streaming queue compressor.
#[derive(Debug, Clone)]
pub struct StreamingQueueConfig {
    /// K-mer length used for splitter detection and segment boundaries.
    pub k: usize,

    /// Target segment size (passed to the splitter/segmenter; presumably in bases).
    pub segment_size: usize,

    /// Minimum match length handed to the LZ differ (`LZDiff::new`).
    pub min_match_len: usize,

    /// Compression level passed to `compress_segment_configured`.
    pub compression_level: i32,

    /// Number of worker threads to spawn.
    pub num_threads: usize,

    /// Upper bound, in bytes, on contig data buffered in the queue.
    pub queue_capacity: usize,

    /// 0 = silent; higher values print progressively more diagnostics.
    pub verbosity: usize,

    /// Adaptive mode flag; not referenced anywhere in this file —
    /// semantics presumably live elsewhere (TODO confirm).
    pub adaptive_mode: bool,
}
44
45impl Default for StreamingQueueConfig {
46 fn default() -> Self {
47 Self {
48 k: 31,
49 segment_size: 60_000,
50 min_match_len: 20,
51 compression_level: 17,
52 num_threads: rayon::current_num_threads().max(4),
53 queue_capacity: 2 * 1024 * 1024 * 1024, verbosity: 1,
55 adaptive_mode: false, }
57 }
58}
59
/// One unit of work for the worker pool: a single contig plus the
/// names needed to register its segments in the collection.
struct ContigTask {
    sample_name: String,
    contig_name: String,
    data: Contig,
}
67
/// Identity of a segment group: the two k-mers flanking a segment.
/// Segments that share both boundary k-mers land in the same group and
/// are compressed against that group's reference (see `worker_thread`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SegmentGroupKey {
    /// K-mer at the segment's front boundary.
    kmer_front: u64,
    /// K-mer at the segment's back boundary.
    kmer_back: u64,
}
74
/// A segment buffered for pack compression, carrying enough metadata to
/// register it in the collection when its group is flushed.
#[derive(Debug, Clone)]
struct BufferedSegment {
    sample_name: String,
    contig_name: String,
    /// 0-based index of this segment within its contig's split.
    seg_part_no: usize,
    data: Contig,
    /// Always set to `false` by `worker_thread`; reverse-complement
    /// handling is apparently not implemented in this path — TODO confirm.
    is_rev_comp: bool,
}
84
/// Per-group buffering state: the group's reference segment plus the
/// delta segments accumulated since the last pack flush.
struct SegmentGroupBuffer {
    /// Unique id assigned from the shared atomic counter.
    group_id: u32,
    /// Archive stream receiving packed delta segments.
    stream_id: usize,
    /// Archive stream receiving the reference segment.
    ref_stream_id: usize,
    /// First segment seen for this group; later segments may be
    /// LZ-encoded against it (see `flush_pack`).
    reference_segment: Option<BufferedSegment>,
    /// Segments awaiting the next pack flush.
    segments: Vec<BufferedSegment>,
    /// Set once the reference has been compressed and written out.
    ref_written: bool,
    /// Delta segments already flushed; used to continue in-group ids
    /// across flushes.
    segments_written: u32,
}
95
96impl SegmentGroupBuffer {
97 fn new(group_id: u32, stream_id: usize, ref_stream_id: usize) -> Self {
98 Self {
99 group_id,
100 stream_id,
101 ref_stream_id,
102 reference_segment: None,
103 segments: Vec::new(),
104 ref_written: false,
105 segments_written: 0,
106 }
107 }
108}
109
/// Number of delta segments buffered per group before a pack is flushed
/// (see the threshold check in `worker_thread`).
const PACK_CARDINALITY: usize = 50;
/// Group ids below this value are reserved; the group counter starts
/// here, so every group allocated in this file uses LZ encoding
/// (see `use_lz_encoding` in `flush_pack`).
const NO_RAW_GROUPS: u32 = 16;
114
/// Streaming compressor: contigs are pushed into a memory-bounded queue
/// and consumed by a pool of worker threads that split, group, and
/// compress them into the archive.
pub struct StreamingQueueCompressor {
    /// Bounded queue feeding contig tasks to the workers.
    queue: Arc<MemoryBoundedQueue<ContigTask>>,
    /// Handles of the spawned worker threads; joined in `finalize`.
    workers: Vec<JoinHandle<Result<()>>>,
    /// Sample/contig/segment metadata, written out at finalization.
    collection: Arc<Mutex<CollectionV3>>,
    /// Splitter k-mers used to cut contigs into segments.
    splitters: Arc<HashSet<u64>>,
    config: StreamingQueueConfig,
    /// Output archive, shared with workers for stream registration and writes.
    archive: Arc<Mutex<Archive>>,
    /// Segment buffers keyed by flanking k-mer pair.
    segment_groups: Arc<Mutex<HashMap<SegmentGroupKey, SegmentGroupBuffer>>>,
    /// Source of fresh group ids; starts at `NO_RAW_GROUPS`.
    group_counter: Arc<AtomicU32>,
    /// Name of the first sample pushed (recorded as the reference sample).
    reference_sample_name: Arc<Mutex<Option<String>>>,
}
153
154impl StreamingQueueCompressor {
155 pub fn with_splitters(
164 output_path: impl AsRef<Path>,
165 config: StreamingQueueConfig,
166 splitters: HashSet<u64>,
167 ) -> Result<Self> {
168 let output_path = output_path.as_ref();
169 let archive_path = output_path.to_string_lossy().to_string();
170
171 if config.verbosity > 0 {
172 eprintln!("Initializing streaming compressor...");
173 eprintln!(
174 " Queue capacity: {} GB",
175 config.queue_capacity / (1024 * 1024 * 1024)
176 );
177 eprintln!(" Worker threads: {}", config.num_threads);
178 eprintln!(" Splitters: {}", splitters.len());
179 }
180
181 let mut archive = Archive::new_writer();
183 archive.open(output_path)?;
184
185 let mut collection = CollectionV3::new();
187 collection.set_config(config.segment_size as u32, config.k as u32, None);
188
189 collection.prepare_for_compression(&mut archive)?;
192
193 {
195 let mut data = Vec::new();
196 let append_str = |data: &mut Vec<u8>, s: &str| {
197 data.extend_from_slice(s.as_bytes());
198 data.push(0);
199 };
200
201 append_str(&mut data, "producer");
202 append_str(&mut data, "ragc");
203 append_str(&mut data, "producer_version_major");
204 append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());
205 append_str(&mut data, "producer_version_minor");
206 append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());
207 append_str(&mut data, "producer_version_build");
208 append_str(&mut data, "0");
209 append_str(&mut data, "file_version_major");
210 append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());
211 append_str(&mut data, "file_version_minor");
212 append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());
213 append_str(&mut data, "comment");
214 append_str(
215 &mut data,
216 &format!(
217 "RAGC v.{}.{}",
218 ragc_common::AGC_FILE_MAJOR,
219 ragc_common::AGC_FILE_MINOR
220 ),
221 );
222
223 let stream_id = archive.register_stream("file_type_info");
224 archive.add_part(stream_id, &data, 7)?; }
226
227 {
229 let params_stream_id = archive.register_stream("params");
230 let mut params_data = Vec::new();
231 params_data.extend_from_slice(&(config.k as u32).to_le_bytes());
232 params_data.extend_from_slice(&(config.min_match_len as u32).to_le_bytes());
233 params_data.extend_from_slice(&50u32.to_le_bytes()); params_data.extend_from_slice(&(config.segment_size as u32).to_le_bytes());
235 archive.add_part(params_stream_id, ¶ms_data, 0)?;
236 }
237
238 {
240 let splitters_data = Vec::new();
241 let stream_id = archive.register_stream("splitters");
242 archive.add_part(stream_id, &splitters_data, 0)?;
243 }
244
245 {
247 let seg_splitters_data = Vec::new();
248 let stream_id = archive.register_stream("segment-splitters");
249 archive.add_part(stream_id, &seg_splitters_data, 0)?;
250 }
251
252 let collection = Arc::new(Mutex::new(collection));
253 let archive = Arc::new(Mutex::new(archive));
254
255 let queue = Arc::new(MemoryBoundedQueue::new(config.queue_capacity));
257
258 let splitters = Arc::new(splitters);
259
260 let segment_groups = Arc::new(Mutex::new(HashMap::new()));
262 let group_counter = Arc::new(AtomicU32::new(NO_RAW_GROUPS)); let reference_sample_name = Arc::new(Mutex::new(None)); let mut workers = Vec::new();
267 for worker_id in 0..config.num_threads {
268 let queue = Arc::clone(&queue);
269 let collection = Arc::clone(&collection);
270 let splitters = Arc::clone(&splitters);
271 let archive = Arc::clone(&archive);
272 let segment_groups = Arc::clone(&segment_groups);
273 let group_counter = Arc::clone(&group_counter);
274 let reference_sample_name = Arc::clone(&reference_sample_name);
275 let config = config.clone();
276
277 let handle = thread::spawn(move || {
278 worker_thread(
279 worker_id,
280 queue,
281 collection,
282 splitters,
283 archive,
284 segment_groups,
285 group_counter,
286 reference_sample_name,
287 config,
288 )
289 });
290
291 workers.push(handle);
292 }
293
294 if config.verbosity > 0 {
295 eprintln!("Ready to receive sequences!");
296 }
297
298 Ok(Self {
299 queue,
300 workers,
301 collection,
302 splitters,
303 config,
304 archive,
305 segment_groups,
306 group_counter,
307 reference_sample_name,
308 })
309 }
310
311 pub fn new(output_path: impl AsRef<Path>, config: StreamingQueueConfig) -> Result<Self> {
316 Self::with_splitters(output_path, config, HashSet::new())
318 }
319
320 pub fn push(&mut self, sample_name: String, contig_name: String, data: Contig) -> Result<()> {
338 if self.splitters.is_empty() && self.workers.is_empty() {
340 if self.config.verbosity > 0 {
341 eprintln!("Determining splitters from first contig...");
342 }
343
344 let (splitters, _, _) =
345 determine_splitters(&[data.clone()], self.config.k, self.config.segment_size);
346
347 if self.config.verbosity > 0 {
348 eprintln!("Found {} splitters", splitters.len());
349 }
350
351 self.splitters = Arc::new(splitters);
353
354 for worker_id in 0..self.config.num_threads {
356 let queue = Arc::clone(&self.queue);
357 let collection = Arc::clone(&self.collection);
358 let splitters = Arc::clone(&self.splitters);
359 let archive = Arc::clone(&self.archive);
360 let segment_groups = Arc::clone(&self.segment_groups);
361 let group_counter = Arc::clone(&self.group_counter);
362 let reference_sample_name = Arc::clone(&self.reference_sample_name);
363 let config = self.config.clone();
364
365 let handle = thread::spawn(move || {
366 worker_thread(
367 worker_id,
368 queue,
369 collection,
370 splitters,
371 archive,
372 segment_groups,
373 group_counter,
374 reference_sample_name,
375 config,
376 )
377 });
378
379 self.workers.push(handle);
380 }
381
382 if self.config.verbosity > 0 {
383 eprintln!("Workers spawned and ready!");
384 }
385 }
386
387 {
389 let mut collection = self.collection.lock().unwrap();
390 collection
391 .register_sample_contig(&sample_name, &contig_name)
392 .context("Failed to register contig")?;
393 }
394
395 {
397 let mut ref_sample = self.reference_sample_name.lock().unwrap();
398 if ref_sample.is_none() {
399 if self.config.verbosity > 0 {
400 eprintln!("Using first sample ({}) as reference", sample_name);
401 }
402 *ref_sample = Some(sample_name.clone());
403 }
404 }
405
406 let task_size = data.len();
408
409 let task = ContigTask {
411 sample_name,
412 contig_name,
413 data,
414 };
415
416 self.queue
418 .push(task, task_size)
419 .context("Failed to push to queue")?;
420
421 Ok(())
422 }
423
424 pub fn finalize(self) -> Result<()> {
442 if self.config.verbosity > 0 {
443 eprintln!("Finalizing compression...");
444 eprintln!(" Closing queue...");
445 }
446
447 self.queue.close();
449
450 if self.config.verbosity > 0 {
451 eprintln!(" Waiting for {} workers to finish...", self.workers.len());
452 }
453
454 for (i, handle) in self.workers.into_iter().enumerate() {
456 handle
457 .join()
458 .expect("Worker thread panicked")
459 .with_context(|| format!("Worker {} failed", i))?;
460 }
461
462 if self.config.verbosity > 0 {
463 eprintln!("All workers finished!");
464 eprintln!("Flushing remaining segment packs...");
465 }
466
467 {
469 let mut groups = self.segment_groups.lock().unwrap();
470 let num_groups = groups.len();
471
472 for (key, buffer) in groups.iter_mut() {
473 if !buffer.segments.is_empty() || !buffer.ref_written {
475 if self.config.verbosity > 1 {
476 eprintln!(
477 "Flushing group {} with {} segments (k-mers: {:#x}, {:#x})",
478 buffer.group_id,
479 buffer.segments.len(),
480 key.kmer_front,
481 key.kmer_back
482 );
483 }
484 flush_pack(buffer, &self.collection, &self.archive, &self.config)
485 .context("Failed to flush remaining pack")?;
486 }
487 }
488
489 if self.config.verbosity > 0 {
490 eprintln!("Flushed {} segment groups", num_groups);
491 }
492 }
493
494 if self.config.verbosity > 0 {
495 eprintln!("Writing metadata...");
496 }
497
498 let num_samples = {
500 let coll = self.collection.lock().unwrap();
501 coll.get_no_samples()
502 };
503
504 {
506 let mut archive = self.archive.lock().unwrap();
507 let mut collection = self.collection.lock().unwrap();
508
509 collection
511 .store_batch_sample_names(&mut archive)
512 .context("Failed to write sample names")?;
513
514 collection
516 .store_contig_batch(&mut archive, 0, num_samples)
517 .context("Failed to write contig batch")?;
518
519 if self.config.verbosity > 0 {
520 eprintln!("Collection metadata written successfully");
521 }
522
523 archive.close().context("Failed to close archive")?;
525 }
526
527 if self.config.verbosity > 0 {
528 eprintln!("Compression complete!");
529 }
530
531 Ok(())
532 }
533
534 pub fn queue_stats(&self) -> QueueStats {
536 QueueStats {
537 current_size_bytes: self.queue.current_size(),
538 current_items: self.queue.len(),
539 capacity_bytes: self.queue.capacity(),
540 is_closed: self.queue.is_closed(),
541 }
542 }
543}
544
/// Point-in-time snapshot of queue occupancy, as returned by
/// `StreamingQueueCompressor::queue_stats`.
///
/// `PartialEq`/`Eq` are derived (all fields are `usize`/`bool`) so
/// snapshots can be compared directly in tests and assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueStats {
    /// Bytes currently buffered in the queue.
    pub current_size_bytes: usize,
    /// Number of queued tasks.
    pub current_items: usize,
    /// Configured queue capacity in bytes.
    pub capacity_bytes: usize,
    /// Whether the queue has been closed to further pushes.
    pub is_closed: bool,
}
553
/// Writes out everything buffered for one segment group: the reference
/// segment (once, to the group's ref stream) and the accumulated delta
/// segments (concatenated into a single compressed "pack" on the delta
/// stream). Every written segment is registered in the collection.
///
/// Safe to call repeatedly; it is a no-op when there is nothing new to
/// write. Called from `worker_thread` (when a group reaches
/// `PACK_CARDINALITY`) and from `finalize` (to drain remainders).
fn flush_pack(
    buffer: &mut SegmentGroupBuffer,
    collection: &Arc<Mutex<CollectionV3>>,
    archive: &Arc<Mutex<Archive>>,
    config: &StreamingQueueConfig,
) -> Result<()> {
    use crate::segment_compression::{compress_reference_segment, compress_segment_configured};

    // Nothing buffered and the reference is already on disk: no work.
    if buffer.segments.is_empty() && buffer.ref_written {
        return Ok(());
    }

    // Ids below NO_RAW_GROUPS would be stored raw, but the group counter
    // starts at NO_RAW_GROUPS, so this is true for every group created here.
    let use_lz_encoding = buffer.group_id >= NO_RAW_GROUPS;

    // Write the group's reference segment exactly once.
    if !buffer.ref_written {
        if let Some(ref_seg) = &buffer.reference_segment {
            if config.verbosity > 1 {
                eprintln!(
                    " Flushing group {}: reference from {}",
                    buffer.group_id, ref_seg.sample_name
                );
            }
            // The codec's marker byte is appended to the compressed payload.
            let (mut compressed, marker) = compress_reference_segment(&ref_seg.data)
                .context("Failed to compress reference")?;
            compressed.push(marker);

            // Uncompressed size is recorded alongside the part.
            let ref_size = ref_seg.data.len() as u64;

            {
                let mut arch = archive.lock().unwrap();
                arch.add_part(buffer.ref_stream_id, &compressed, ref_size)
                    .context("Failed to write reference")?;
            }

            {
                let mut coll = collection.lock().unwrap();
                coll.add_segment_placed(
                    &ref_seg.sample_name,
                    &ref_seg.contig_name,
                    ref_seg.seg_part_no,
                    buffer.group_id,
                    0, // in-group id 0 is the reference
                    ref_seg.is_rev_comp,
                    ref_seg.data.len() as u32,
                )
                .context("Failed to register reference")?;
            }

            buffer.ref_written = true;
        } else if config.verbosity > 1 {
            eprintln!(
                " Group {}: flushing without reference (will use raw encoding)",
                buffer.group_id
            );
        }
    }

    // Concatenate buffered segments — LZ-encoded against the reference when
    // available — separated by CONTIG_SEPARATOR, into one pack.
    let mut packed_data = Vec::new();
    for (idx_in_pack, seg) in buffer.segments.iter().enumerate() {
        // In-group ids continue across flushes; 0 is reserved for the reference.
        let in_group_id = buffer.segments_written + idx_in_pack as u32 + 1;

        let contig_data = if !use_lz_encoding || buffer.reference_segment.is_none() {
            // Raw encoding: store the segment bytes as-is.
            seg.data.clone()
        } else {
            // NOTE(review): a fresh LZDiff is built (and the reference
            // re-indexed via `prepare`) for every segment; hoisting this out
            // of the loop looks possible if prepare/encode carry no state
            // across calls — confirm before changing.
            let reference = &buffer.reference_segment.as_ref().unwrap().data;
            let mut lz_diff = LZDiff::new(config.min_match_len as u32);
            lz_diff.prepare(reference);
            lz_diff.encode(&seg.data)
        };

        packed_data.extend_from_slice(&contig_data);
        packed_data.push(CONTIG_SEPARATOR);

        {
            let mut coll = collection.lock().unwrap();
            coll.add_segment_placed(
                &seg.sample_name,
                &seg.contig_name,
                seg.seg_part_no,
                buffer.group_id,
                in_group_id,
                seg.is_rev_comp,
                seg.data.len() as u32,
            )
            .context("Failed to register segment")?;
        }
    }

    // Account for what this flush emitted so the next flush's in-group ids
    // continue where these left off.
    buffer.segments_written += buffer.segments.len() as u32;

    if !packed_data.is_empty() {
        // Uncompressed pack size is recorded alongside the part.
        let total_raw_size = packed_data.len() as u64;

        let mut compressed = compress_segment_configured(&packed_data, config.compression_level)
            .context("Failed to compress pack")?;
        compressed.push(0); // trailing marker byte (0 here — TODO confirm meaning)

        {
            let mut arch = archive.lock().unwrap();
            arch.add_part(buffer.stream_id, &compressed, total_raw_size)
                .context("Failed to write pack")?;
        }
    }

    buffer.segments.clear();

    Ok(())
}
680
681fn worker_thread(
683 worker_id: usize,
684 queue: Arc<MemoryBoundedQueue<ContigTask>>,
685 collection: Arc<Mutex<CollectionV3>>,
686 splitters: Arc<HashSet<u64>>,
687 archive: Arc<Mutex<Archive>>,
688 segment_groups: Arc<Mutex<HashMap<SegmentGroupKey, SegmentGroupBuffer>>>,
689 group_counter: Arc<AtomicU32>,
690 reference_sample_name: Arc<Mutex<Option<String>>>,
691 config: StreamingQueueConfig,
692) -> Result<()> {
693 let mut processed_count = 0;
694
695 loop {
696 let Some(task) = queue.pull() else {
698 if config.verbosity > 1 {
700 eprintln!(
701 "Worker {} finished ({} contigs processed)",
702 worker_id, processed_count
703 );
704 }
705 break;
706 };
707
708 let segments =
710 split_at_splitters_with_size(&task.data, &splitters, config.k, config.segment_size);
711
712 if config.verbosity > 2 {
713 eprintln!(
714 "Worker {} processing {} (split into {} segments)",
715 worker_id,
716 task.contig_name,
717 segments.len()
718 );
719 }
720
721 for (place, segment) in segments.iter().enumerate() {
723 let key = SegmentGroupKey {
725 kmer_front: segment.front_kmer,
726 kmer_back: segment.back_kmer,
727 };
728
729 let buffered = BufferedSegment {
731 sample_name: task.sample_name.clone(),
732 contig_name: task.contig_name.clone(),
733 seg_part_no: place,
734 data: segment.data.clone(),
735 is_rev_comp: false,
736 };
737
738 {
740 let mut groups = segment_groups.lock().unwrap();
741
742 let buffer = groups.entry(key).or_insert_with(|| {
744 let group_id = group_counter.fetch_add(1, Ordering::SeqCst);
746
747 let archive_version =
749 ragc_common::AGC_FILE_MAJOR * 1000 + ragc_common::AGC_FILE_MINOR;
750 let delta_stream_name =
751 ragc_common::stream_delta_name(archive_version, group_id);
752 let ref_stream_name = ragc_common::stream_ref_name(archive_version, group_id);
753
754 let mut arch = archive.lock().unwrap();
755 let stream_id = arch.register_stream(&delta_stream_name);
756 let ref_stream_id = arch.register_stream(&ref_stream_name);
757 drop(arch);
758
759 SegmentGroupBuffer::new(group_id, stream_id, ref_stream_id)
760 });
761
762 if buffer.reference_segment.is_none() {
764 if config.verbosity > 1 {
766 eprintln!(
767 " Group {}: Setting reference from {}",
768 buffer.group_id, task.sample_name
769 );
770 }
771 buffer.reference_segment = Some(buffered.clone());
772 } else {
774 buffer.segments.push(buffered);
776 }
777
778 if buffer.segments.len() >= PACK_CARDINALITY {
780 flush_pack(buffer, &collection, &archive, &config)
781 .context("Failed to flush pack")?;
782 }
783 }
784 }
785
786 processed_count += 1;
787 }
788
789 Ok(())
790}
791
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a path in the OS temp directory — portable, unlike the
    /// previously hardcoded "/tmp/..." paths (which break on Windows).
    fn tmp_path(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(name)
    }

    #[test]
    fn test_create_compressor() {
        let config = StreamingQueueConfig::default();
        let splitters = HashSet::new();
        let path = tmp_path("test_stream.agc");
        let compressor = StreamingQueueCompressor::with_splitters(&path, config, splitters);
        assert!(compressor.is_ok());
    }

    #[test]
    fn test_queue_stats() {
        let config = StreamingQueueConfig::default();
        let splitters = HashSet::new();
        let path = tmp_path("test_stats.agc");
        let compressor =
            StreamingQueueCompressor::with_splitters(&path, config, splitters).unwrap();

        // A fresh compressor has an empty, open queue at default capacity.
        let stats = compressor.queue_stats();
        assert_eq!(stats.current_size_bytes, 0);
        assert_eq!(stats.current_items, 0);
        assert_eq!(stats.capacity_bytes, 2 * 1024 * 1024 * 1024);
        assert!(!stats.is_closed);
    }

    #[test]
    fn test_push_and_finalize() {
        let config = StreamingQueueConfig {
            verbosity: 0,
            ..Default::default()
        };
        let splitters = HashSet::new();
        let path = tmp_path("test_push.agc");
        let mut compressor =
            StreamingQueueCompressor::with_splitters(&path, config, splitters).unwrap();

        // A 1 kb poly-A contig is enough to exercise push + finalize.
        let data = vec![b'A'; 1000];
        compressor
            .push("sample1".to_string(), "chr1".to_string(), data)
            .unwrap();

        compressor.finalize().unwrap();

        // Best-effort cleanup of the temp archive.
        let _ = std::fs::remove_file(&path);
    }
}
840}