1use crate::contig_compression::{compress_contig, CompressionContext};
5use crate::priority_queue::{BoundedPriorityQueue, PopResult};
6use crate::segment_buffer::BufferedSegments;
7use crate::segment_compression::compress_reference_segment;
8use crate::task::{ContigProcessingStage, Task};
9use crate::zstd_pool::compress_segment_pooled;
10use ahash::{AHashMap, AHashSet};
11use ragc_common::{Archive, Contig};
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::{Arc, Barrier, Mutex};
14
#[derive(Debug, Clone)]
/// Where one compressed segment ended up: identifies the owning contig and
/// the (group, in-group) coordinates recorded in the collection metadata.
struct SegmentPlacement {
    // Sample this segment belongs to.
    sample_name: String,
    // Contig this segment was cut from.
    contig_name: String,
    // Sequential part number of the segment within its contig.
    place: usize,
    // Segment group the data was written into.
    group_id: u32,
    // Index of the segment inside its group.
    in_group_id: u32,
    // True if the segment was stored reverse-complemented.
    is_rev_comp: bool,
    // Uncompressed length of the segment in bytes.
    raw_length: u32,
}
28
/// A group of similar segments that share one reference segment, together
/// with the archive streams the group writes to.
struct SegmentGroup {
    // Numeric id of this group (used in the group's stream names).
    group_id: u32,
    // Archive stream receiving delta-compressed member segments.
    stream_id: usize,
    // Archive stream receiving the group's compressed reference segment.
    ref_stream_id: usize,
    // Uncompressed reference segment; None until the first segment arrives.
    reference: Option<Contig>,
    // True once the reference has been written to `ref_stream_id`.
    ref_written: bool,
    // Next in-group index to hand out.
    in_group_counter: u32,
}
43
44impl SegmentGroup {
45 fn new(group_id: u32, stream_id: usize, ref_stream_id: usize) -> Self {
46 SegmentGroup {
47 group_id,
48 stream_id,
49 ref_stream_id,
50 reference: None,
51 ref_written: false,
52 in_group_counter: 0,
53 }
54 }
55
56 fn add_segment(&mut self, seg_data: &[u8], archive: &mut Archive) -> anyhow::Result<u32> {
61 if self.reference.is_none() {
62 let seg_vec = seg_data.to_vec();
64 self.reference = Some(seg_vec.clone());
65
66 let (compressed, marker) = compress_reference_segment(&seg_vec)?;
68
69 archive.add_part(self.ref_stream_id, &compressed, marker as u64)?;
71 self.ref_written = true;
72
73 let in_group_id = self.in_group_counter;
74 self.in_group_counter += 1;
75 Ok(in_group_id) } else {
77 let seg_vec = seg_data.to_vec();
80 let compressed = compress_segment_pooled(&seg_vec, 17)?;
81
82 archive.add_part(self.stream_id, &compressed, 0)?;
84
85 let in_group_id = self.in_group_counter;
86 self.in_group_counter += 1;
87 Ok(in_group_id)
88 }
89 }
90}
91
/// State shared by all compressor worker threads.
///
/// Mutable pieces are wrapped in atomics/`Mutex`es so workers can update them
/// concurrently; plain fields are set once at construction and only read.
pub struct SharedCompressorState {
    /// Total bases successfully compressed so far (progress reporting).
    pub processed_bases: AtomicUsize,

    /// Number of samples fully processed so far.
    pub processed_samples: AtomicUsize,

    /// Contigs that failed compression as (sample_name, contig_name, sequence);
    /// in adaptive mode these are re-enqueued as "hard" contigs.
    pub raw_contigs: Mutex<Vec<(String, String, Vec<u8>)>>,

    /// Logging verbosity; 0 is quiet.
    pub verbosity: usize,

    /// Segments buffered between compression and archive registration.
    pub buffered_segments: Arc<Mutex<BufferedSegments>>,

    /// Exact set of splitter k-mers.
    pub splitters: Arc<Mutex<AHashSet<u64>>>,

    /// Bloom filter mirroring `splitters` for fast membership pre-checks.
    pub bloom_splitters: Arc<Mutex<crate::bloom_filter::BloomFilter>>,

    /// Per-worker lists of newly discovered splitter k-mers (adaptive mode),
    /// indexed by worker id.
    pub vv_splitters: Mutex<Vec<Vec<u64>>>,

    /// Sorted candidate k-mers (reference singletons); excluded when
    /// searching for new splitters.
    pub v_candidate_kmers: Vec<u64>,

    /// Sorted k-mers known to be duplicated; also excluded from new splitters.
    pub v_duplicated_kmers: Vec<u64>,

    /// K-mer length used throughout compression.
    pub kmer_length: usize,

    /// Whether adaptive splitter discovery is enabled.
    pub adaptive_mode: bool,

    /// Maps a boundary k-mer pair to the id of the segment group holding it.
    pub map_segments: Arc<Mutex<AHashMap<(u64, u64), u32>>>,

    /// For each terminator k-mer, the sorted list of partner k-mers it has
    /// been seen paired with.
    pub map_segments_terminators: Arc<Mutex<AHashMap<u64, Vec<u64>>>>,

    /// Treat all inputs as one concatenated genome (empty sample name).
    pub concatenated_genomes: bool,

    /// Number of segment groups registered so far.
    pub no_segments: Arc<Mutex<u32>>,

    /// Number of raw (pre-assigned) groups — used when distributing segments.
    pub no_raw_groups: u32,

    /// Output archive, when one is attached.
    pub archive: Option<Arc<Mutex<Archive>>>,

    /// Lazily created per-group writers, indexed by group id.
    pub v_segments: Arc<Mutex<Vec<Option<SegmentGroup>>>>,

    /// Collection metadata sink, when one is attached.
    pub collection: Option<Arc<Mutex<ragc_common::CollectionV3>>>,

    /// Secondary queue used to reprocess hard contigs in adaptive mode.
    pub aux_queue: Arc<BoundedPriorityQueue<Task>>,

    /// Queue workers currently pull from; swapped to `aux_queue` when hard
    /// contigs are re-enqueued (see `handle_new_splitters_stage`).
    pub working_queue: Mutex<Arc<BoundedPriorityQueue<Task>>>,
}
190
191impl SharedCompressorState {
192 pub fn new(
194 verbosity: usize,
195 kmer_length: usize,
196 adaptive_mode: bool,
197 concatenated_genomes: bool,
198 no_raw_groups: u32,
199 main_queue: Arc<BoundedPriorityQueue<Task>>,
200 ) -> Self {
201 let bloom_filter = crate::bloom_filter::BloomFilter::new(80 * 1024);
204
205 let aux_queue = Arc::new(BoundedPriorityQueue::new(1, usize::MAX));
207
208 SharedCompressorState {
209 processed_bases: AtomicUsize::new(0),
210 processed_samples: AtomicUsize::new(0),
211 raw_contigs: Mutex::new(Vec::new()),
212 verbosity,
213 buffered_segments: Arc::new(Mutex::new(BufferedSegments::new(no_raw_groups as usize))),
214 splitters: Arc::new(Mutex::new(AHashSet::new())),
215 bloom_splitters: Arc::new(Mutex::new(bloom_filter)),
216 vv_splitters: Mutex::new(Vec::new()),
217 v_candidate_kmers: Vec::new(),
218 v_duplicated_kmers: Vec::new(),
219 kmer_length,
220 adaptive_mode,
221 map_segments: Arc::new(Mutex::new(AHashMap::new())),
222 map_segments_terminators: Arc::new(Mutex::new(AHashMap::new())),
223 concatenated_genomes,
224 no_segments: Arc::new(Mutex::new(0)),
225 no_raw_groups,
226 archive: None, v_segments: Arc::new(Mutex::new(Vec::new())),
228 collection: None, aux_queue,
230 working_queue: Mutex::new(main_queue), }
232 }
233}
234
235pub fn worker_thread(
261 worker_id: usize,
262 num_workers: usize,
263 _queue: Arc<BoundedPriorityQueue<Task>>, barrier: Arc<Barrier>,
265 shared: Arc<SharedCompressorState>,
266) {
267 loop {
272 let queue = {
275 let working_queue_guard = shared.working_queue.lock().unwrap();
276 Arc::clone(&*working_queue_guard)
277 };
278 let (result, task_opt) = queue.pop_large();
279
280 match result {
281 PopResult::Empty => continue,
283
284 PopResult::Completed => break,
286
287 PopResult::Normal => {
289 let task = task_opt.expect("PopResult::Normal should have task");
290
291 match task.stage {
292 ContigProcessingStage::Registration => {
293 handle_registration_stage(worker_id, &barrier, &shared);
295 continue; }
297
298 ContigProcessingStage::NewSplitters => {
299 handle_new_splitters_stage(worker_id, num_workers, &barrier, &shared);
301 continue; }
303
304 ContigProcessingStage::AllContigs => {
305 }
308
309 ContigProcessingStage::HardContigs => {
310 }
312 }
313
314 let ctg_size = task.sequence.len();
316
317 if compress_contig_task(&task, worker_id, &barrier, &shared) {
318 let old_pb = shared
320 .processed_bases
321 .fetch_add(ctg_size, Ordering::Relaxed);
322 let new_pb = old_pb + ctg_size;
323
324 if shared.verbosity > 0 && old_pb / 10_000_000 != new_pb / 10_000_000 {
326 eprintln!("Compressed: {} Mb\r", new_pb / 1_000_000);
327 }
328 } else {
329 let mut raw_contigs = shared.raw_contigs.lock().unwrap();
331 raw_contigs.push((
332 task.sample_name.clone(),
333 task.contig_name.clone(),
334 task.sequence.clone(),
335 ));
336 }
337
338 }
340 }
341 }
342
343 }
345
346fn handle_registration_stage(
351 worker_id: usize,
352 barrier: &Arc<Barrier>,
353 shared: &Arc<SharedCompressorState>,
354) {
355 let wait_result = barrier.wait();
357
358 if wait_result.is_leader() {
360 register_segments(shared);
361 }
363
364 barrier.wait();
366
367 store_segments(worker_id, shared);
369
370 barrier.wait();
372
373 if worker_id == 0 {
376 } else if worker_id == 1 {
382 }
386
387 barrier.wait();
389}
390
391fn find_new_splitters(
406 contig: &ragc_common::Contig,
407 thread_id: usize,
408 shared: &Arc<SharedCompressorState>,
409) {
410 use crate::kmer_extract::{enumerate_kmers, remove_non_singletons};
411
412 let mut v_contig_kmers = enumerate_kmers(contig, shared.kmer_length);
414 v_contig_kmers.sort_unstable();
415 remove_non_singletons(&mut v_contig_kmers, 0);
416
417 if shared.verbosity > 1 {
418 eprintln!(
419 "find_new_splitters: contig has {} singleton k-mers",
420 v_contig_kmers.len()
421 );
422 }
423
424 let mut v_tmp = Vec::with_capacity(v_contig_kmers.len());
428 set_difference(&v_contig_kmers, &shared.v_candidate_kmers, &mut v_tmp);
429
430 if shared.verbosity > 1 {
431 eprintln!(
432 "find_new_splitters: {} k-mers after excluding reference singletons",
433 v_tmp.len()
434 );
435 }
436
437 v_contig_kmers.clear();
439 set_difference(&v_tmp, &shared.v_duplicated_kmers, &mut v_contig_kmers);
440
441 if shared.verbosity > 1 {
442 eprintln!(
443 "find_new_splitters: {} NEW splitters found",
444 v_contig_kmers.len()
445 );
446 }
447
448 let mut vv_splitters = shared.vv_splitters.lock().unwrap();
450 vv_splitters[thread_id].extend(v_contig_kmers);
451}
452
/// Writes into `result` the elements of sorted slice `a` that do not match
/// elements of sorted slice `b` (merge-style difference; each element of `b`
/// cancels at most one element of `a`). `result` is cleared first.
fn set_difference(a: &[u64], b: &[u64], result: &mut Vec<u64>) {
    result.clear();
    let mut excluded = b.iter().peekable();

    for &value in a {
        // Skip exclusions smaller than the current value.
        while excluded.next_if(|&&e| e < value).is_some() {}
        // A matching exclusion cancels exactly one occurrence of `value`.
        if excluded.next_if(|&&e| e == value).is_some() {
            continue;
        }
        result.push(value);
    }
}
477
/// Barrier-coordinated adaptive-mode phase: merges the splitters each worker
/// discovered into the global set, resizes the bloom filter if it is filling
/// up, and (worker 0 only) re-enqueues failed "hard" contigs on the auxiliary
/// queue, swapping it in as the active working queue.
fn handle_new_splitters_stage(
    worker_id: usize,
    num_workers: usize,
    barrier: &Arc<Barrier>,
    shared: &Arc<SharedCompressorState>,
) {
    // Wait until every worker has finished discovering splitters.
    barrier.wait();

    // NOTE(review): this condition is true for every worker in every
    // configuration (num_workers > 1 covers all workers; otherwise
    // worker_id == 0 is the only worker). All workers therefore run the
    // merge, serialized by the locks below; each pass clears the per-thread
    // lists so the work is not repeated. Confirm whether a single-worker
    // merge was intended.
    if num_workers > 1 || worker_id == 0 {
        // Lock order: splitters -> bloom -> vv_splitters. Keep this order
        // consistent with other call sites to avoid deadlock.
        let mut splitters = shared.splitters.lock().unwrap();
        let mut bloom = shared.bloom_splitters.lock().unwrap();
        let mut vv_splitters = shared.vv_splitters.lock().unwrap();

        // Merge every worker's discoveries into the global set and filter.
        let mut total_new = 0;
        for thread_splitters in vv_splitters.iter_mut() {
            total_new += thread_splitters.len();
            for &kmer in thread_splitters.iter() {
                splitters.insert(kmer);
                bloom.insert(kmer);
            }
            thread_splitters.clear();
        }

        if shared.verbosity > 0 && total_new > 0 {
            eprintln!("Adaptive mode: Added {} new splitters", total_new);
            eprintln!("Total splitters: {}", splitters.len());
        }

        // Rebuild the bloom filter once it is more than ~30% full, sizing it
        // so the new filling factor is about 25% (8 bits per element / 0.25).
        let filling_factor = bloom.filling_factor();
        if filling_factor > 0.3 {
            if shared.verbosity > 1 {
                eprintln!(
                    "Bloom filter filling factor {:.2}, resizing...",
                    filling_factor
                );
            }

            let new_size_bits = (splitters.len() as f64 / 0.25) as usize * 8;
            bloom.resize(new_size_bits);

            // Resizing discards contents; re-insert every known splitter.
            for &kmer in splitters.iter() {
                bloom.insert(kmer);
            }

            if shared.verbosity > 1 {
                eprintln!("Bloom filter resized to {} bits", new_size_bits);
            }
        }
    }

    // Worker 0 alone re-enqueues the failed contigs and swaps queues.
    if worker_id == 0 {
        let mut raw_contigs = shared.raw_contigs.lock().unwrap();

        if shared.verbosity > 0 && !raw_contigs.is_empty() {
            eprintln!(
                "Adaptive mode: Re-enqueueing {} hard contigs for reprocessing",
                raw_contigs.len()
            );
        }

        // Hard contigs get priority 1; cost is the sequence length so the
        // bounded queue accounts for their memory.
        for (sample_name, contig_name, sequence) in raw_contigs.drain(..) {
            let cost = sequence.len();
            shared.aux_queue.emplace(
                Task::new_contig(
                    sample_name,
                    contig_name,
                    sequence,
                    ContigProcessingStage::HardContigs,
                ),
                1, cost,
            );
        }

        // One Registration sync task per worker, at priority 0, so the
        // workers rendezvous again after the hard contigs are done.
        shared.aux_queue.emplace_many_no_cost(
            Task::new_sync(ContigProcessingStage::Registration),
            0, num_workers,
        );

        // Swap the active queue; workers re-read it every loop iteration.
        let mut working_queue = shared.working_queue.lock().unwrap();
        *working_queue = Arc::clone(&shared.aux_queue);

        if shared.verbosity > 1 {
            eprintln!("Switched to auxiliary queue for hard contig reprocessing");
        }
    }

    // Release all workers onto the (possibly swapped) queue together.
    barrier.wait();
}
591
592fn compress_contig_task(
600 task: &Task,
601 worker_id: usize,
602 _barrier: &Arc<Barrier>,
603 shared: &Arc<SharedCompressorState>,
604) -> bool {
605 let ctx = CompressionContext {
607 splitters: Arc::clone(&shared.splitters),
608 bloom_splitters: Arc::clone(&shared.bloom_splitters),
609 buffered_segments: Arc::clone(&shared.buffered_segments),
610 kmer_length: shared.kmer_length,
611 adaptive_mode: shared.adaptive_mode,
612 map_segments: Arc::clone(&shared.map_segments),
613 map_segments_terminators: Arc::clone(&shared.map_segments_terminators),
614 concatenated_genomes: shared.concatenated_genomes,
615 };
616
617 let success = compress_contig(&task.sample_name, &task.contig_name, &task.sequence, &ctx);
619
620 if shared.adaptive_mode
622 && !success
623 && task.stage == ContigProcessingStage::AllContigs
624 && task.sequence.len() >= 1000
625 {
626 if shared.verbosity > 1 {
628 eprintln!(
629 "Hard contig detected: {}/{} ({} bp)",
630 task.sample_name,
631 task.contig_name,
632 task.sequence.len()
633 );
634 }
635
636 find_new_splitters(&task.sequence, worker_id, shared);
638
639 let mut raw_contigs = shared.raw_contigs.lock().unwrap();
641 raw_contigs.push((
642 task.sample_name.clone(),
643 task.contig_name.clone(),
644 task.sequence.clone(),
645 ));
646
647 return false;
649 }
650
651 success
652}
653
654fn register_segments(shared: &Arc<SharedCompressorState>) {
667 let mut buffered = shared.buffered_segments.lock().unwrap();
668
669 buffered.sort_known(1); let no_new = buffered.process_new(&shared.map_segments);
677
678 drop(buffered);
679
680 if no_new > 0 {
681 let mut no_segments = shared.no_segments.lock().unwrap();
683 let current_no_segments = *no_segments;
684
685 if let Some(archive_mutex) = &shared.archive {
686 let mut archive = archive_mutex.lock().unwrap();
687
688 for i in 0..no_new {
691 let seg_num = current_no_segments + i;
692 archive.register_stream(&ragc_common::stream_delta_name(3000, seg_num));
694 archive.register_stream(&ragc_common::stream_ref_name(3000, seg_num));
695 }
696 }
697
698 *no_segments += no_new;
699 drop(no_segments);
700
701 let no_segments_value = *shared.no_segments.lock().unwrap();
703 let mut v_segments = shared.v_segments.lock().unwrap();
704 v_segments.resize_with(no_segments_value as usize, || None);
705 }
706
707 if shared.no_raw_groups > 0 {
709 let buffered = shared.buffered_segments.lock().unwrap();
710 buffered.distribute_segments(0, 0, shared.no_raw_groups);
711 }
712
713 let buffered = shared.buffered_segments.lock().unwrap();
715 buffered.restart_read_vec();
716}
717
718fn store_segments(_worker_id: usize, shared: &Arc<SharedCompressorState>) {
733 const MAX_BUFF_SIZE: usize = 32;
734 let mut buffered_placements: Vec<SegmentPlacement> = Vec::with_capacity(MAX_BUFF_SIZE);
735
736 let buffered = shared.buffered_segments.lock().unwrap();
737
738 loop {
739 let block_group_id = buffered.get_vec_id();
741
742 if block_group_id < 0 {
743 break; }
745
746 for group_id in
748 (block_group_id - crate::segment_buffer::PART_ID_STEP + 1..=block_group_id).rev()
749 {
750 if buffered.is_empty_part(group_id) {
751 continue;
752 }
753
754 while let Some((
756 kmer1,
757 kmer2,
758 sample_name,
759 contig_name,
760 seg_data,
761 is_rev_comp,
762 seg_part_no,
763 )) = buffered.get_part(group_id)
764 {
765 let mut v_segments = shared.v_segments.lock().unwrap();
767 if group_id >= 0
768 && (group_id as usize) < v_segments.len()
769 && v_segments[group_id as usize].is_none()
770 {
771 if let Some(archive_mutex) = &shared.archive {
773 let archive = archive_mutex.lock().unwrap();
774 let stream_id = archive
775 .get_stream_id(&ragc_common::stream_delta_name(3000, group_id as u32))
776 .unwrap_or(0);
777 let ref_stream_id = archive
778 .get_stream_id(&ragc_common::stream_ref_name(3000, group_id as u32))
779 .unwrap_or(0);
780 drop(archive);
781
782 v_segments[group_id as usize] =
783 Some(SegmentGroup::new(group_id as u32, stream_id, ref_stream_id));
784 }
785 }
786
787 let mut in_group_id = 0;
789 if group_id >= 0 && (group_id as usize) < v_segments.len() {
790 if let Some(segment_group) = &mut v_segments[group_id as usize] {
791 if let Some(archive_mutex) = &shared.archive {
792 let mut archive = archive_mutex.lock().unwrap();
793 if let Ok(id) = segment_group.add_segment(&seg_data, &mut archive) {
794 in_group_id = id;
795 }
796 }
797 }
798 }
799 drop(v_segments);
800
801 buffered_placements.push(SegmentPlacement {
803 sample_name,
804 contig_name,
805 place: seg_part_no as usize,
806 group_id: group_id as u32,
807 in_group_id,
808 is_rev_comp,
809 raw_length: seg_data.len() as u32,
810 });
811
812 if buffered_placements.len() >= MAX_BUFF_SIZE {
814 if let Some(collection_mutex) = &shared.collection {
815 let mut collection = collection_mutex.lock().unwrap();
816 for placement in &buffered_placements {
817 let _ = collection.add_segment_placed(
818 &placement.sample_name,
819 &placement.contig_name,
820 placement.place,
821 placement.group_id,
822 placement.in_group_id,
823 placement.is_rev_comp,
824 placement.raw_length,
825 );
826 }
827 }
828 buffered_placements.clear();
829 }
830
831 let key = (kmer1, kmer2);
833 let mut map_segments = shared.map_segments.lock().unwrap();
834 map_segments
835 .entry(key)
836 .and_modify(|existing| {
837 if (group_id as u32) < *existing {
839 *existing = group_id as u32;
840 }
841 })
842 .or_insert(group_id as u32);
843 drop(map_segments);
844
845 if kmer1 != crate::contig_compression::MISSING_KMER
847 && kmer2 != crate::contig_compression::MISSING_KMER
848 {
849 let mut terminators = shared.map_segments_terminators.lock().unwrap();
850
851 let list1 = terminators.entry(kmer1).or_insert_with(Vec::new);
853 if !list1.contains(&kmer2) {
854 list1.push(kmer2);
855 list1.sort_unstable();
856 }
857
858 if kmer1 != kmer2 {
860 let list2 = terminators.entry(kmer2).or_insert_with(Vec::new);
861 if !list2.contains(&kmer1) {
862 list2.push(kmer1);
863 list2.sort_unstable();
864 }
865 }
866
867 drop(terminators);
868 }
869 }
870 }
871 }
872
873 if !buffered_placements.is_empty() {
875 if let Some(collection_mutex) = &shared.collection {
876 let mut collection = collection_mutex.lock().unwrap();
877 for placement in &buffered_placements {
878 let _ = collection.add_segment_placed(
879 &placement.sample_name,
880 &placement.contig_name,
881 placement.place,
882 placement.group_id,
883 placement.in_group_id,
884 placement.is_rev_comp,
885 placement.raw_length,
886 );
887 }
888 }
889 }
890}
891
/// Creates a complete AGC archive at `output_path` from the given sample
/// files: writes the file-type and params headers, runs the streaming
/// compressor, serializes the collection metadata, and closes the archive.
///
/// Returns Ok(()) without creating anything if `sample_files` is empty.
///
/// # Errors
/// Propagates archive I/O, compression, and metadata serialization failures.
pub fn create_agc_archive(
    output_path: &str,
    sample_files: Vec<(String, String)>,
    splitters: AHashSet<u64>,
    candidate_kmers: AHashSet<u64>,
    duplicated_kmers: AHashSet<u64>,
    kmer_length: usize,
    segment_size: u32,
    num_threads: usize,
    adaptive_mode: bool,
    concatenated_genomes: bool,
    verbosity: usize,
) -> anyhow::Result<()> {
    use ragc_common::CollectionV3;

    if sample_files.is_empty() {
        return Ok(());
    }

    let num_samples = sample_files.len();

    let mut archive = Archive::new_writer();
    archive.open(output_path)?;

    // "file_type_info" stream: NUL-terminated key/value pairs describing the
    // producer and file version.
    {
        let mut data = Vec::new();
        let append_str = |data: &mut Vec<u8>, s: &str| {
            data.extend_from_slice(s.as_bytes());
            data.push(0);
        };

        append_str(&mut data, "producer");
        append_str(&mut data, "ragc");

        append_str(&mut data, "producer_version_major");
        append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());

        append_str(&mut data, "producer_version_minor");
        append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());

        append_str(&mut data, "producer_version_build");
        append_str(&mut data, "0");

        append_str(&mut data, "file_version_major");
        append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());

        append_str(&mut data, "file_version_minor");
        append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());

        append_str(&mut data, "comment");
        append_str(
            &mut data,
            &format!(
                "RAGC v.{}.{}",
                ragc_common::AGC_FILE_MAJOR,
                ragc_common::AGC_FILE_MINOR
            ),
        );

        let stream_id = archive.register_stream("file_type_info");
        // NOTE(review): part marker 7 — presumably the number of key/value
        // pairs above; confirm against the AGC format before changing.
        archive.add_part(stream_id, &data, 7)?;
        if verbosity > 0 {
            eprintln!(
                "Wrote file_type_info: version {}.{}",
                ragc_common::AGC_FILE_MAJOR,
                ragc_common::AGC_FILE_MINOR
            );
        }
    }

    // "params" stream: four little-endian u32 values.
    {
        let params_stream_id = archive.register_stream("params");
        let mut params_data = Vec::new();

        params_data.extend_from_slice(&(kmer_length as u32).to_le_bytes());
        // NOTE(review): 20 and 50 are hard-coded parameter values whose
        // meaning is not visible here — confirm against the AGC params layout.
        params_data.extend_from_slice(&20u32.to_le_bytes());
        params_data.extend_from_slice(&50u32.to_le_bytes());
        params_data.extend_from_slice(&segment_size.to_le_bytes());

        archive.add_part(params_stream_id, &params_data, 0)?;

        if verbosity > 0 {
            eprintln!(
                "Wrote params: k={}, segment_size={}",
                kmer_length, segment_size
            );
        }
    }

    let mut collection = CollectionV3::new();
    collection.set_config(segment_size, kmer_length as u32, None);

    collection.prepare_for_compression(&mut archive)?;

    // From here on the archive and collection are shared with worker threads.
    let archive = Arc::new(Mutex::new(archive));
    let collection = Arc::new(Mutex::new(collection));

    compress_samples_streaming_with_archive(
        sample_files,
        splitters,
        candidate_kmers,
        duplicated_kmers,
        kmer_length,
        num_threads,
        adaptive_mode,
        concatenated_genomes,
        verbosity,
        Some(archive.clone()),
        Some(collection.clone()),
    )
    .map_err(|e| anyhow::anyhow!(e))?;

    // All workers have joined; serialize the collection metadata.
    {
        let mut archive_guard = archive.lock().unwrap();
        let mut collection_guard = collection.lock().unwrap();

        if verbosity > 0 {
            eprintln!(
                "Serializing collection metadata for {} samples...",
                num_samples
            );
        }

        collection_guard.store_batch_sample_names(&mut archive_guard)?;

        collection_guard.store_contig_batch(&mut archive_guard, 0, num_samples)?;

        if verbosity > 0 {
            eprintln!("Collection metadata serialized successfully");
        }
    }

    let mut archive = archive.lock().unwrap();
    archive.close()?;

    if verbosity > 0 {
        eprintln!("AGC archive created: {}", output_path);
    }

    Ok(())
}
1067
/// Runs the streaming compression pipeline: spawns worker threads, reads each
/// sample's contigs and enqueues them as tasks, inserts per-sample sync
/// barriers, and joins the workers when the queue is exhausted.
///
/// `archive`/`collection` are optional so the pipeline can run without
/// producing output (e.g. for testing).
fn compress_samples_streaming_with_archive(
    sample_files: Vec<(String, String)>,
    splitters: AHashSet<u64>,
    candidate_kmers: AHashSet<u64>,
    duplicated_kmers: AHashSet<u64>,
    kmer_length: usize,
    num_threads: usize,
    adaptive_mode: bool,
    concatenated_genomes: bool,
    verbosity: usize,
    archive: Option<Arc<Mutex<Archive>>>,
    collection: Option<Arc<Mutex<ragc_common::CollectionV3>>>,
) -> Result<(), String> {
    use crate::genome_io::GenomeIO;

    if sample_files.is_empty() {
        return Ok(());
    }

    // Queue capacity in bytes: at least 2 GiB, or 192 MiB per thread.
    let queue_capacity = std::cmp::max(2_u64 << 30, num_threads as u64 * (192_u64 << 20));
    let queue = Arc::new(BoundedPriorityQueue::new(1, queue_capacity as usize));

    // With 8+ threads, reserve one thread for this reader loop.
    let no_workers = if num_threads < 8 {
        num_threads
    } else {
        num_threads - 1
    };

    // set_difference in find_new_splitters requires these sorted.
    let mut v_candidate_kmers: Vec<u64> = candidate_kmers.into_iter().collect();
    v_candidate_kmers.sort_unstable();

    let mut v_duplicated_kmers: Vec<u64> = duplicated_kmers.into_iter().collect();
    v_duplicated_kmers.sort_unstable();

    // Seed the bloom filter with all known splitters (8 bits per element).
    let mut bloom_filter = crate::bloom_filter::BloomFilter::new(
        splitters.len() * 8,
    );
    for &kmer in &splitters {
        bloom_filter.insert(kmer);
    }

    // Auxiliary queue for hard-contig reprocessing; effectively unbounded.
    let aux_queue = Arc::new(BoundedPriorityQueue::new(1, usize::MAX));

    let shared = Arc::new(SharedCompressorState {
        processed_bases: AtomicUsize::new(0),
        processed_samples: AtomicUsize::new(0),
        raw_contigs: Mutex::new(Vec::new()),
        verbosity,
        buffered_segments: Arc::new(Mutex::new(BufferedSegments::new(0))),
        splitters: Arc::new(Mutex::new(splitters)),
        bloom_splitters: Arc::new(Mutex::new(bloom_filter)),
        vv_splitters: Mutex::new(vec![Vec::new(); no_workers]),
        v_candidate_kmers,
        v_duplicated_kmers,
        kmer_length,
        adaptive_mode,
        map_segments: Arc::new(Mutex::new(AHashMap::new())),
        map_segments_terminators: Arc::new(Mutex::new(AHashMap::new())),
        concatenated_genomes,
        no_segments: Arc::new(Mutex::new(0)),
        no_raw_groups: 0,
        archive,
        v_segments: Arc::new(Mutex::new(Vec::new())),
        collection,
        aux_queue,
        working_queue: Mutex::new(Arc::clone(&queue)),
    });

    // One barrier slot per worker; sync stages rendezvous on it.
    let barrier = Arc::new(Barrier::new(no_workers));
    let mut worker_handles = Vec::with_capacity(no_workers);

    for worker_id in 0..no_workers {
        let q = queue.clone();
        let b = barrier.clone();
        let s = shared.clone();

        let handle = std::thread::spawn(move || {
            worker_thread(worker_id, no_workers, q, b, s);
        });

        worker_handles.push(handle);
    }

    // Priorities count down from usize::MAX so earlier samples outrank later
    // ones in the priority queue.
    let mut sample_priority = usize::MAX;
    let mut _cnt_contigs_in_sample = 0;
    // NOTE(review): PACK_CARDINALITY and _cnt_contigs_in_sample are currently
    // unused — presumably leftovers of contig batching; confirm before removal.
    const PACK_CARDINALITY: usize = 50;
    for (sample_name, file_path) in sample_files {
        if verbosity > 0 {
            eprintln!("Processing sample: {} from {}", sample_name, file_path);
        }

        let mut gio = match GenomeIO::open(&file_path) {
            Ok(g) => g,
            Err(e) => {
                // Unreadable input files are skipped, not fatal.
                eprintln!("Cannot open file {}: {}", file_path, e);
                continue;
            }
        };

        let mut any_contigs_added = false;

        while let Ok(Some((contig_name, sequence))) = gio.read_contig_raw() {
            if concatenated_genomes {
                // Concatenated mode: everything is registered under one
                // empty sample name.
                let concat_sample_name = String::from("");

                if let Some(collection_mutex) = &shared.collection {
                    let mut collection = collection_mutex.lock().unwrap();
                    let _ = collection.register_sample_contig(&concat_sample_name, &contig_name);
                }

                // Cost = sequence length, charged against queue capacity.
                let cost = sequence.len();
                queue.emplace(
                    Task::new_contig(
                        concat_sample_name,
                        contig_name.clone(),
                        sequence,
                        ContigProcessingStage::AllContigs,
                    ),
                    sample_priority,
                    cost,
                );

                any_contigs_added = true;
            } else {
                if let Some(collection_mutex) = &shared.collection {
                    let mut collection = collection_mutex.lock().unwrap();
                    let _ = collection.register_sample_contig(&sample_name, &contig_name);
                }

                let cost = sequence.len();
                queue.emplace(
                    Task::new_contig(
                        sample_name.clone(),
                        contig_name.clone(),
                        sequence,
                        ContigProcessingStage::AllContigs,
                    ),
                    sample_priority,
                    cost,
                );

                any_contigs_added = true;
            }
        }

        // After each sample, enqueue one sync task per worker so they
        // rendezvous: NewSplitters in adaptive mode, plain Registration
        // otherwise. Concatenated mode defers sync to the end.
        if !concatenated_genomes && any_contigs_added {
            let sync_stage = if adaptive_mode {
                ContigProcessingStage::NewSplitters
            } else {
                ContigProcessingStage::Registration
            };

            queue.emplace_many_no_cost(Task::new_sync(sync_stage), sample_priority, no_workers);

            sample_priority -= 1;
        }
    }

    // No more tasks; let workers drain and exit.
    queue.mark_completed();

    for handle in worker_handles {
        handle.join().map_err(|_| "Worker thread panicked")?;
    }

    if verbosity > 0 {
        let total_bases = shared.processed_bases.load(Ordering::Relaxed);
        eprintln!("Compression complete: {} bases processed", total_bases);
    }

    Ok(())
}
1277
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies that a freshly constructed state carries the constructor
    /// arguments and zeroed counters.
    #[test]
    fn test_shared_state_creation() {
        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let state = SharedCompressorState::new(1, 21, false, false, 0, queue);
        assert_eq!(state.verbosity, 1);
        assert_eq!(state.kmer_length, 21);
        assert_eq!(state.adaptive_mode, false);
        assert_eq!(state.concatenated_genomes, false);
        assert_eq!(state.no_raw_groups, 0);
        assert_eq!(state.processed_bases.load(Ordering::Relaxed), 0);
        assert_eq!(state.processed_samples.load(Ordering::Relaxed), 0);
        assert_eq!(state.raw_contigs.lock().unwrap().len(), 0);
        assert_eq!(*state.no_segments.lock().unwrap(), 0);
    }

    /// Workers on an already-completed queue must exit promptly.
    #[test]
    fn test_worker_thread_completion() {
        use std::thread;

        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let barrier = Arc::new(Barrier::new(2));
        let shared = Arc::new(SharedCompressorState::new(
            0,
            21,
            false,
            false,
            0,
            queue.clone(),
        ));

        // Mark completed before spawning: both workers should see
        // PopResult::Completed and return.
        queue.mark_completed();

        let handles: Vec<_> = (0..2)
            .map(|worker_id| {
                let q = queue.clone();
                let b = barrier.clone();
                let s = shared.clone();

                thread::spawn(move || {
                    worker_thread(worker_id, 2, q, b, s);
                })
            })
            .collect();

        // Joining without hanging is the assertion here.
        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// A single worker compresses one 8-base contig and the processed-bases
    /// counter reflects it.
    #[test]
    fn test_worker_thread_with_task() {
        use std::thread;

        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let barrier = Arc::new(Barrier::new(1));
        let shared = Arc::new(SharedCompressorState::new(
            0,
            21,
            false,
            false,
            0,
            queue.clone(),
        ));

        queue.emplace(
            Task::new_contig(
                "sample1".to_string(),
                "chr1".to_string(),
                // 8 bases encoded as 2-bit symbols.
                vec![0, 1, 2, 3, 0, 1, 2, 3],
                ContigProcessingStage::AllContigs,
            ),
            100,
            8,
        );
        queue.mark_completed();

        let q = queue.clone();
        let b = barrier.clone();
        let s = shared.clone();

        let handle = thread::spawn(move || {
            worker_thread(0, 1, q, b, s);
        });

        handle.join().unwrap();

        assert_eq!(shared.processed_bases.load(Ordering::Relaxed), 8);
    }
}