1use crate::contig_compression::{compress_contig, CompressionContext};
5use crate::priority_queue::{BoundedPriorityQueue, PopResult};
6use crate::segment_buffer::BufferedSegments;
7use crate::segment_compression::compress_reference_segment;
8use crate::task::{ContigProcessingStage, Task};
9use crate::zstd_pool::compress_segment_pooled;
10use ragc_common::{Archive, Contig};
11use std::collections::{HashMap, HashSet};
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::{Arc, Barrier, Mutex};
14
/// Record of where one compressed segment landed in the archive.
///
/// Placements are buffered and later forwarded in batches to
/// `CollectionV3::add_segment_placed`, so the collection lock is taken
/// less often.
#[derive(Debug, Clone)]
struct SegmentPlacement {
    /// Sample the segment belongs to.
    sample_name: String,
    /// Contig the segment belongs to.
    contig_name: String,
    /// Index of the segment within its contig.
    place: usize,
    /// Segment group the data was appended to.
    group_id: u32,
    /// Ordinal of the segment inside its group.
    in_group_id: u32,
    /// Orientation flag forwarded to the collection.
    is_rev_comp: bool,
    /// Uncompressed segment length in bytes.
    raw_length: u32,
}
28
29struct SegmentGroup {
36 group_id: u32,
37 stream_id: usize, ref_stream_id: usize, reference: Option<Contig>, ref_written: bool, in_group_counter: u32, }
43
44impl SegmentGroup {
45 fn new(group_id: u32, stream_id: usize, ref_stream_id: usize) -> Self {
46 SegmentGroup {
47 group_id,
48 stream_id,
49 ref_stream_id,
50 reference: None,
51 ref_written: false,
52 in_group_counter: 0,
53 }
54 }
55
56 fn add_segment(&mut self, seg_data: &[u8], archive: &mut Archive) -> anyhow::Result<u32> {
61 if self.reference.is_none() {
62 let seg_vec = seg_data.to_vec();
64 self.reference = Some(seg_vec.clone());
65
66 let (compressed, marker) = compress_reference_segment(&seg_vec)?;
68
69 archive.add_part(self.ref_stream_id, &compressed, marker as u64)?;
71 self.ref_written = true;
72
73 let in_group_id = self.in_group_counter;
74 self.in_group_counter += 1;
75 Ok(in_group_id) } else {
77 let seg_vec = seg_data.to_vec();
80 let compressed = compress_segment_pooled(&seg_vec, 17)?;
81
82 archive.add_part(self.stream_id, &compressed, 0)?;
84
85 let in_group_id = self.in_group_counter;
86 self.in_group_counter += 1;
87 Ok(in_group_id)
88 }
89 }
90}
91
/// State shared by every compressor worker thread.
pub struct SharedCompressorState {
    /// Total number of bases successfully compressed (progress reporting).
    pub processed_bases: AtomicUsize,

    /// Number of samples fully processed.
    pub processed_samples: AtomicUsize,

    /// Contigs that failed compression, kept as (sample, contig, sequence)
    /// tuples for raw storage or adaptive reprocessing.
    pub raw_contigs: Mutex<Vec<(String, String, Vec<u8>)>>,

    /// Logging verbosity; 0 is quiet.
    pub verbosity: usize,

    /// Segments waiting to be registered and stored into the archive.
    pub buffered_segments: Arc<Mutex<BufferedSegments>>,

    /// Current splitter k-mer set.
    pub splitters: Arc<Mutex<HashSet<u64>>>,

    /// Bloom filter mirroring `splitters` for fast membership tests;
    /// rebuilt when it gets too full (see `handle_new_splitters_stage`).
    pub bloom_splitters: Arc<Mutex<crate::bloom_filter::BloomFilter>>,

    /// Per-worker lists of newly discovered splitters (adaptive mode),
    /// merged into `splitters` during the NewSplitters stage.
    pub vv_splitters: Mutex<Vec<Vec<u64>>>,

    /// Sorted candidate k-mers excluded when searching for new splitters.
    pub v_candidate_kmers: Vec<u64>,

    /// Sorted duplicated k-mers, likewise excluded.
    pub v_duplicated_kmers: Vec<u64>,

    /// k-mer length used throughout compression.
    pub kmer_length: usize,

    /// Whether adaptive splitter discovery is enabled.
    pub adaptive_mode: bool,

    /// Maps a (kmer1, kmer2) boundary pair to the smallest group id seen.
    pub map_segments: Arc<Mutex<HashMap<(u64, u64), u32>>>,

    /// For each terminator k-mer, the sorted list of k-mers it pairs with.
    pub map_segments_terminators: Arc<Mutex<HashMap<u64, Vec<u64>>>>,

    /// Treat all input as one concatenated, unnamed sample.
    pub concatenated_genomes: bool,

    /// Number of registered segment groups.
    pub no_segments: Arc<Mutex<u32>>,

    /// Number of raw (uncompressed) groups.
    pub no_raw_groups: u32,

    /// Output archive, when writing is enabled.
    pub archive: Option<Arc<Mutex<Archive>>>,

    /// Lazily created per-group state, indexed by group id.
    pub v_segments: Arc<Mutex<Vec<Option<SegmentGroup>>>>,

    /// Collection metadata sink, when attached.
    pub collection: Option<Arc<Mutex<ragc_common::CollectionV3>>>,

    /// Auxiliary queue used to reprocess hard contigs in adaptive mode.
    pub aux_queue: Arc<BoundedPriorityQueue<Task>>,

    /// Queue workers currently pull from; swapped to `aux_queue` when hard
    /// contigs are re-enqueued.
    pub working_queue: Mutex<Arc<BoundedPriorityQueue<Task>>>,
}
190
191impl SharedCompressorState {
192 pub fn new(
194 verbosity: usize,
195 kmer_length: usize,
196 adaptive_mode: bool,
197 concatenated_genomes: bool,
198 no_raw_groups: u32,
199 main_queue: Arc<BoundedPriorityQueue<Task>>,
200 ) -> Self {
201 let bloom_filter = crate::bloom_filter::BloomFilter::new(80 * 1024);
204
205 let aux_queue = Arc::new(BoundedPriorityQueue::new(1, usize::MAX));
207
208 SharedCompressorState {
209 processed_bases: AtomicUsize::new(0),
210 processed_samples: AtomicUsize::new(0),
211 raw_contigs: Mutex::new(Vec::new()),
212 verbosity,
213 buffered_segments: Arc::new(Mutex::new(BufferedSegments::new(no_raw_groups as usize))),
214 splitters: Arc::new(Mutex::new(HashSet::new())),
215 bloom_splitters: Arc::new(Mutex::new(bloom_filter)),
216 vv_splitters: Mutex::new(Vec::new()),
217 v_candidate_kmers: Vec::new(),
218 v_duplicated_kmers: Vec::new(),
219 kmer_length,
220 adaptive_mode,
221 map_segments: Arc::new(Mutex::new(HashMap::new())),
222 map_segments_terminators: Arc::new(Mutex::new(HashMap::new())),
223 concatenated_genomes,
224 no_segments: Arc::new(Mutex::new(0)),
225 no_raw_groups,
226 archive: None, v_segments: Arc::new(Mutex::new(Vec::new())),
228 collection: None, aux_queue,
230 working_queue: Mutex::new(main_queue), }
232 }
233}
234
/// Main loop executed by each compression worker thread.
///
/// Workers repeatedly pull tasks from the *current* working queue — which can
/// be swapped to the auxiliary queue during adaptive reprocessing — until the
/// queue reports `Completed`. Synchronization tasks (`Registration`,
/// `NewSplitters`) trigger barrier-coordinated stages; contig tasks are
/// compressed, and contigs that fail compression are saved for raw storage.
pub fn worker_thread(
    worker_id: usize,
    num_workers: usize,
    _queue: Arc<BoundedPriorityQueue<Task>>,
    barrier: Arc<Barrier>,
    shared: Arc<SharedCompressorState>,
) {
    loop {
        // Re-read the working queue every iteration so a swap to the
        // auxiliary queue (hard-contig reprocessing) takes effect here.
        let queue = {
            let working_queue_guard = shared.working_queue.lock().unwrap();
            Arc::clone(&*working_queue_guard)
        };
        let (result, task_opt) = queue.pop_large();

        match result {
            // Nothing available right now; poll again.
            PopResult::Empty => continue,

            // Queue drained and marked completed: worker exits.
            PopResult::Completed => break,

            PopResult::Normal => {
                let task = task_opt.expect("PopResult::Normal should have task");

                match task.stage {
                    // Barrier-synchronized segment registration; no contig data.
                    ContigProcessingStage::Registration => {
                        handle_registration_stage(worker_id, &barrier, &shared);
                        continue;
                    }

                    // Barrier-synchronized splitter merge (adaptive mode).
                    ContigProcessingStage::NewSplitters => {
                        handle_new_splitters_stage(worker_id, num_workers, &barrier, &shared);
                        continue;
                    }

                    // Contig-carrying stages fall through to compression below.
                    ContigProcessingStage::AllContigs => {
                    }

                    ContigProcessingStage::HardContigs => {
                    }
                }

                let ctg_size = task.sequence.len();

                if compress_contig_task(&task, worker_id, &barrier, &shared) {
                    // Progress accounting; report roughly every 10 Mb.
                    let old_pb = shared
                        .processed_bases
                        .fetch_add(ctg_size, Ordering::Relaxed);
                    let new_pb = old_pb + ctg_size;

                    if shared.verbosity > 0 && old_pb / 10_000_000 != new_pb / 10_000_000 {
                        eprintln!("Compressed: {} Mb\r", new_pb / 1_000_000);
                    }
                } else {
                    // Compression failed: keep the contig for raw storage.
                    let mut raw_contigs = shared.raw_contigs.lock().unwrap();
                    raw_contigs.push((
                        task.sample_name.clone(),
                        task.contig_name.clone(),
                        task.sequence.clone(),
                    ));
                }

            }
        }
    }

}
345
346fn handle_registration_stage(
351 worker_id: usize,
352 barrier: &Arc<Barrier>,
353 shared: &Arc<SharedCompressorState>,
354) {
355 let wait_result = barrier.wait();
357
358 if wait_result.is_leader() {
360 register_segments(shared);
361 }
363
364 barrier.wait();
366
367 store_segments(worker_id, shared);
369
370 barrier.wait();
372
373 if worker_id == 0 {
376 } else if worker_id == 1 {
382 }
386
387 barrier.wait();
389}
390
391fn find_new_splitters(
406 contig: &ragc_common::Contig,
407 thread_id: usize,
408 shared: &Arc<SharedCompressorState>,
409) {
410 use crate::kmer_extract::{enumerate_kmers, remove_non_singletons};
411
412 let mut v_contig_kmers = enumerate_kmers(contig, shared.kmer_length);
414 v_contig_kmers.sort_unstable();
415 remove_non_singletons(&mut v_contig_kmers, 0);
416
417 if shared.verbosity > 1 {
418 eprintln!(
419 "find_new_splitters: contig has {} singleton k-mers",
420 v_contig_kmers.len()
421 );
422 }
423
424 let mut v_tmp = Vec::with_capacity(v_contig_kmers.len());
428 set_difference(&v_contig_kmers, &shared.v_candidate_kmers, &mut v_tmp);
429
430 if shared.verbosity > 1 {
431 eprintln!(
432 "find_new_splitters: {} k-mers after excluding reference singletons",
433 v_tmp.len()
434 );
435 }
436
437 v_contig_kmers.clear();
439 set_difference(&v_tmp, &shared.v_duplicated_kmers, &mut v_contig_kmers);
440
441 if shared.verbosity > 1 {
442 eprintln!(
443 "find_new_splitters: {} NEW splitters found",
444 v_contig_kmers.len()
445 );
446 }
447
448 let mut vv_splitters = shared.vv_splitters.lock().unwrap();
450 vv_splitters[thread_id].extend(v_contig_kmers);
451}
452
/// Write into `result` every element of sorted slice `a` that does not occur
/// in sorted slice `b` (a merge-style set difference).
///
/// Matching consumes one element from each side, so a value repeated in `a`
/// but present once in `b` is removed only once — same behavior as the
/// classic two-index merge this replaces. `result` is cleared first.
fn set_difference(a: &[u64], b: &[u64], result: &mut Vec<u64>) {
    result.clear();
    let mut rest = b;

    for &x in a {
        // Skip past everything in `b` that is smaller than `x`.
        while let Some((&y, tail)) = rest.split_first() {
            if y < x {
                rest = tail;
            } else {
                break;
            }
        }

        // An exact match consumes the `b` element and drops `x`.
        if let Some((&y, tail)) = rest.split_first() {
            if y == x {
                rest = tail;
                continue;
            }
        }

        result.push(x);
    }
}
477
478fn handle_new_splitters_stage(
483 worker_id: usize,
484 num_workers: usize,
485 barrier: &Arc<Barrier>,
486 shared: &Arc<SharedCompressorState>,
487) {
488 barrier.wait();
490
491 if num_workers > 1 || worker_id == 0 {
497 let mut splitters = shared.splitters.lock().unwrap();
498 let mut bloom = shared.bloom_splitters.lock().unwrap();
499 let mut vv_splitters = shared.vv_splitters.lock().unwrap();
500
501 let mut total_new = 0;
502 for thread_splitters in vv_splitters.iter_mut() {
503 total_new += thread_splitters.len();
504 for &kmer in thread_splitters.iter() {
505 splitters.insert(kmer);
506 bloom.insert(kmer);
507 }
508 thread_splitters.clear();
509 }
510
511 if shared.verbosity > 0 && total_new > 0 {
512 eprintln!("Adaptive mode: Added {} new splitters", total_new);
513 eprintln!("Total splitters: {}", splitters.len());
514 }
515
516 let filling_factor = bloom.filling_factor();
518 if filling_factor > 0.3 {
519 if shared.verbosity > 1 {
520 eprintln!(
521 "Bloom filter filling factor {:.2}, resizing...",
522 filling_factor
523 );
524 }
525
526 let new_size_bits = (splitters.len() as f64 / 0.25) as usize * 8; bloom.resize(new_size_bits);
529
530 for &kmer in splitters.iter() {
532 bloom.insert(kmer);
533 }
534
535 if shared.verbosity > 1 {
536 eprintln!("Bloom filter resized to {} bits", new_size_bits);
537 }
538 }
539 }
540
541 if worker_id == 0 {
544 let mut raw_contigs = shared.raw_contigs.lock().unwrap();
545
546 if shared.verbosity > 0 && !raw_contigs.is_empty() {
547 eprintln!(
548 "Adaptive mode: Re-enqueueing {} hard contigs for reprocessing",
549 raw_contigs.len()
550 );
551 }
552
553 for (sample_name, contig_name, sequence) in raw_contigs.drain(..) {
556 let cost = sequence.len();
557 shared.aux_queue.emplace(
558 Task::new_contig(
559 sample_name,
560 contig_name,
561 sequence,
562 ContigProcessingStage::HardContigs,
563 ),
564 1, cost,
566 );
567 }
568
569 shared.aux_queue.emplace_many_no_cost(
572 Task::new_sync(ContigProcessingStage::Registration),
573 0, num_workers,
575 );
576
577 let mut working_queue = shared.working_queue.lock().unwrap();
580 *working_queue = Arc::clone(&shared.aux_queue);
581
582 if shared.verbosity > 1 {
583 eprintln!("Switched to auxiliary queue for hard contig reprocessing");
584 }
585 }
586
587 barrier.wait();
590}
591
592fn compress_contig_task(
600 task: &Task,
601 worker_id: usize,
602 _barrier: &Arc<Barrier>,
603 shared: &Arc<SharedCompressorState>,
604) -> bool {
605 let ctx = CompressionContext {
607 splitters: Arc::clone(&shared.splitters),
608 bloom_splitters: Arc::clone(&shared.bloom_splitters),
609 buffered_segments: Arc::clone(&shared.buffered_segments),
610 kmer_length: shared.kmer_length,
611 adaptive_mode: shared.adaptive_mode,
612 map_segments: Arc::clone(&shared.map_segments),
613 map_segments_terminators: Arc::clone(&shared.map_segments_terminators),
614 concatenated_genomes: shared.concatenated_genomes,
615 };
616
617 let success = compress_contig(&task.sample_name, &task.contig_name, &task.sequence, &ctx);
619
620 if shared.adaptive_mode
622 && !success
623 && task.stage == ContigProcessingStage::AllContigs
624 && task.sequence.len() >= 1000
625 {
626 if shared.verbosity > 1 {
628 eprintln!(
629 "Hard contig detected: {}/{} ({} bp)",
630 task.sample_name,
631 task.contig_name,
632 task.sequence.len()
633 );
634 }
635
636 find_new_splitters(&task.sequence, worker_id, shared);
638
639 let mut raw_contigs = shared.raw_contigs.lock().unwrap();
641 raw_contigs.push((
642 task.sample_name.clone(),
643 task.contig_name.clone(),
644 task.sequence.clone(),
645 ));
646
647 return false;
649 }
650
651 success
652}
653
654fn register_segments(shared: &Arc<SharedCompressorState>) {
667 let mut buffered = shared.buffered_segments.lock().unwrap();
668
669 buffered.sort_known(1); let no_new = buffered.process_new();
676
677 drop(buffered);
678
679 if no_new > 0 {
680 let mut no_segments = shared.no_segments.lock().unwrap();
682 let current_no_segments = *no_segments;
683
684 if let Some(archive_mutex) = &shared.archive {
685 let mut archive = archive_mutex.lock().unwrap();
686
687 for i in 0..no_new {
690 let seg_num = current_no_segments + i;
691 archive.register_stream(&ragc_common::stream_delta_name(3000, seg_num));
693 archive.register_stream(&ragc_common::stream_ref_name(3000, seg_num));
694 }
695 }
696
697 *no_segments += no_new;
698 drop(no_segments);
699
700 let no_segments_value = *shared.no_segments.lock().unwrap();
702 let mut v_segments = shared.v_segments.lock().unwrap();
703 v_segments.resize_with(no_segments_value as usize, || None);
704 }
705
706 if shared.no_raw_groups > 0 {
708 let buffered = shared.buffered_segments.lock().unwrap();
709 buffered.distribute_segments(0, 0, shared.no_raw_groups);
710 }
711
712 let buffered = shared.buffered_segments.lock().unwrap();
714 buffered.restart_read_vec();
715}
716
717fn store_segments(_worker_id: usize, shared: &Arc<SharedCompressorState>) {
732 const MAX_BUFF_SIZE: usize = 32;
733 let mut buffered_placements: Vec<SegmentPlacement> = Vec::with_capacity(MAX_BUFF_SIZE);
734
735 let buffered = shared.buffered_segments.lock().unwrap();
736
737 loop {
738 let block_group_id = buffered.get_vec_id();
740
741 if block_group_id < 0 {
742 break; }
744
745 for group_id in
747 (block_group_id - crate::segment_buffer::PART_ID_STEP + 1..=block_group_id).rev()
748 {
749 if buffered.is_empty_part(group_id) {
750 continue;
751 }
752
753 while let Some((
755 kmer1,
756 kmer2,
757 sample_name,
758 contig_name,
759 seg_data,
760 is_rev_comp,
761 seg_part_no,
762 )) = buffered.get_part(group_id)
763 {
764 let mut v_segments = shared.v_segments.lock().unwrap();
766 if group_id >= 0
767 && (group_id as usize) < v_segments.len()
768 && v_segments[group_id as usize].is_none()
769 {
770 if let Some(archive_mutex) = &shared.archive {
772 let archive = archive_mutex.lock().unwrap();
773 let stream_id = archive
774 .get_stream_id(&ragc_common::stream_delta_name(3000, group_id as u32))
775 .unwrap_or(0);
776 let ref_stream_id = archive
777 .get_stream_id(&ragc_common::stream_ref_name(3000, group_id as u32))
778 .unwrap_or(0);
779 drop(archive);
780
781 v_segments[group_id as usize] =
782 Some(SegmentGroup::new(group_id as u32, stream_id, ref_stream_id));
783 }
784 }
785
786 let mut in_group_id = 0;
788 if group_id >= 0 && (group_id as usize) < v_segments.len() {
789 if let Some(segment_group) = &mut v_segments[group_id as usize] {
790 if let Some(archive_mutex) = &shared.archive {
791 let mut archive = archive_mutex.lock().unwrap();
792 if let Ok(id) = segment_group.add_segment(&seg_data, &mut archive) {
793 in_group_id = id;
794 }
795 }
796 }
797 }
798 drop(v_segments);
799
800 buffered_placements.push(SegmentPlacement {
802 sample_name,
803 contig_name,
804 place: seg_part_no as usize,
805 group_id: group_id as u32,
806 in_group_id,
807 is_rev_comp,
808 raw_length: seg_data.len() as u32,
809 });
810
811 if buffered_placements.len() >= MAX_BUFF_SIZE {
813 if let Some(collection_mutex) = &shared.collection {
814 let mut collection = collection_mutex.lock().unwrap();
815 for placement in &buffered_placements {
816 let _ = collection.add_segment_placed(
817 &placement.sample_name,
818 &placement.contig_name,
819 placement.place,
820 placement.group_id,
821 placement.in_group_id,
822 placement.is_rev_comp,
823 placement.raw_length,
824 );
825 }
826 }
827 buffered_placements.clear();
828 }
829
830 let key = (kmer1, kmer2);
832 let mut map_segments = shared.map_segments.lock().unwrap();
833 map_segments
834 .entry(key)
835 .and_modify(|existing| {
836 if (group_id as u32) < *existing {
838 *existing = group_id as u32;
839 }
840 })
841 .or_insert(group_id as u32);
842 drop(map_segments);
843
844 if kmer1 != crate::contig_compression::MISSING_KMER
846 && kmer2 != crate::contig_compression::MISSING_KMER
847 {
848 let mut terminators = shared.map_segments_terminators.lock().unwrap();
849
850 let list1 = terminators.entry(kmer1).or_insert_with(Vec::new);
852 if !list1.contains(&kmer2) {
853 list1.push(kmer2);
854 list1.sort_unstable();
855 }
856
857 if kmer1 != kmer2 {
859 let list2 = terminators.entry(kmer2).or_insert_with(Vec::new);
860 if !list2.contains(&kmer1) {
861 list2.push(kmer1);
862 list2.sort_unstable();
863 }
864 }
865
866 drop(terminators);
867 }
868 }
869 }
870 }
871
872 if !buffered_placements.is_empty() {
874 if let Some(collection_mutex) = &shared.collection {
875 let mut collection = collection_mutex.lock().unwrap();
876 for placement in &buffered_placements {
877 let _ = collection.add_segment_placed(
878 &placement.sample_name,
879 &placement.contig_name,
880 placement.place,
881 placement.group_id,
882 placement.in_group_id,
883 placement.is_rev_comp,
884 placement.raw_length,
885 );
886 }
887 }
888 }
889}
890
891pub fn create_agc_archive(
913 output_path: &str,
914 sample_files: Vec<(String, String)>,
915 splitters: HashSet<u64>,
916 candidate_kmers: HashSet<u64>,
917 duplicated_kmers: HashSet<u64>,
918 kmer_length: usize,
919 segment_size: u32,
920 num_threads: usize,
921 adaptive_mode: bool,
922 concatenated_genomes: bool,
923 verbosity: usize,
924) -> anyhow::Result<()> {
925 use ragc_common::CollectionV3;
926
927 if sample_files.is_empty() {
928 return Ok(());
929 }
930
931 let num_samples = sample_files.len();
932
933 let mut archive = Archive::new_writer();
935 archive.open(output_path)?;
936
937 {
939 let mut data = Vec::new();
940 let append_str = |data: &mut Vec<u8>, s: &str| {
941 data.extend_from_slice(s.as_bytes());
942 data.push(0);
943 };
944
945 append_str(&mut data, "producer");
946 append_str(&mut data, "ragc");
947
948 append_str(&mut data, "producer_version_major");
949 append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());
950
951 append_str(&mut data, "producer_version_minor");
952 append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());
953
954 append_str(&mut data, "producer_version_build");
955 append_str(&mut data, "0");
956
957 append_str(&mut data, "file_version_major");
958 append_str(&mut data, &ragc_common::AGC_FILE_MAJOR.to_string());
959
960 append_str(&mut data, "file_version_minor");
961 append_str(&mut data, &ragc_common::AGC_FILE_MINOR.to_string());
962
963 append_str(&mut data, "comment");
964 append_str(
965 &mut data,
966 &format!(
967 "RAGC v.{}.{}",
968 ragc_common::AGC_FILE_MAJOR,
969 ragc_common::AGC_FILE_MINOR
970 ),
971 );
972
973 let stream_id = archive.register_stream("file_type_info");
974 archive.add_part(stream_id, &data, 7)?; if verbosity > 0 {
977 eprintln!(
978 "Wrote file_type_info: version {}.{}",
979 ragc_common::AGC_FILE_MAJOR,
980 ragc_common::AGC_FILE_MINOR
981 );
982 }
983 }
984
985 {
987 let params_stream_id = archive.register_stream("params");
988 let mut params_data = Vec::new();
989
990 params_data.extend_from_slice(&(kmer_length as u32).to_le_bytes());
992 params_data.extend_from_slice(&20u32.to_le_bytes()); params_data.extend_from_slice(&50u32.to_le_bytes()); params_data.extend_from_slice(&segment_size.to_le_bytes());
995
996 archive.add_part(params_stream_id, ¶ms_data, 0)?;
997
998 if verbosity > 0 {
999 eprintln!(
1000 "Wrote params: k={}, segment_size={}",
1001 kmer_length, segment_size
1002 );
1003 }
1004 }
1005
1006 let mut collection = CollectionV3::new();
1008 collection.set_config(segment_size, kmer_length as u32, None);
1009
1010 collection.prepare_for_compression(&mut archive)?;
1012
1013 let archive = Arc::new(Mutex::new(archive));
1015 let collection = Arc::new(Mutex::new(collection));
1016
1017 compress_samples_streaming_with_archive(
1019 sample_files,
1020 splitters,
1021 candidate_kmers,
1022 duplicated_kmers,
1023 kmer_length,
1024 num_threads,
1025 adaptive_mode,
1026 concatenated_genomes,
1027 verbosity,
1028 Some(archive.clone()),
1029 Some(collection.clone()),
1030 )
1031 .map_err(|e| anyhow::anyhow!(e))?;
1032
1033 {
1035 let mut archive_guard = archive.lock().unwrap();
1036 let mut collection_guard = collection.lock().unwrap();
1037
1038 if verbosity > 0 {
1039 eprintln!(
1040 "Serializing collection metadata for {} samples...",
1041 num_samples
1042 );
1043 }
1044
1045 collection_guard.store_batch_sample_names(&mut archive_guard)?;
1047
1048 collection_guard.store_contig_batch(&mut archive_guard, 0, num_samples)?;
1050
1051 if verbosity > 0 {
1052 eprintln!("Collection metadata serialized successfully");
1053 }
1054 }
1055
1056 let mut archive = archive.lock().unwrap();
1058 archive.close()?;
1059
1060 if verbosity > 0 {
1061 eprintln!("AGC archive created: {}", output_path);
1062 }
1063
1064 Ok(())
1065}
1066
1067fn compress_samples_streaming_with_archive(
1087 sample_files: Vec<(String, String)>,
1088 splitters: HashSet<u64>,
1089 candidate_kmers: HashSet<u64>,
1090 duplicated_kmers: HashSet<u64>,
1091 kmer_length: usize,
1092 num_threads: usize,
1093 adaptive_mode: bool,
1094 concatenated_genomes: bool,
1095 verbosity: usize,
1096 archive: Option<Arc<Mutex<Archive>>>,
1097 collection: Option<Arc<Mutex<ragc_common::CollectionV3>>>,
1098) -> Result<(), String> {
1099 use crate::genome_io::GenomeIO;
1100
1101 if sample_files.is_empty() {
1102 return Ok(());
1103 }
1104
1105 let queue_capacity = std::cmp::max(2_u64 << 30, num_threads as u64 * (192_u64 << 20));
1107 let queue = Arc::new(BoundedPriorityQueue::new(1, queue_capacity as usize));
1108
1109 let no_workers = if num_threads < 8 {
1111 num_threads
1112 } else {
1113 num_threads - 1
1114 };
1115
1116 let mut v_candidate_kmers: Vec<u64> = candidate_kmers.into_iter().collect();
1118 v_candidate_kmers.sort_unstable();
1119
1120 let mut v_duplicated_kmers: Vec<u64> = duplicated_kmers.into_iter().collect();
1121 v_duplicated_kmers.sort_unstable();
1122
1123 let mut bloom_filter = crate::bloom_filter::BloomFilter::new(
1126 splitters.len() * 8, );
1128 for &kmer in &splitters {
1129 bloom_filter.insert(kmer);
1130 }
1131
1132 let aux_queue = Arc::new(BoundedPriorityQueue::new(1, usize::MAX));
1134
1135 let shared = Arc::new(SharedCompressorState {
1136 processed_bases: AtomicUsize::new(0),
1137 processed_samples: AtomicUsize::new(0),
1138 raw_contigs: Mutex::new(Vec::new()),
1139 verbosity,
1140 buffered_segments: Arc::new(Mutex::new(BufferedSegments::new(0))),
1141 splitters: Arc::new(Mutex::new(splitters)),
1142 bloom_splitters: Arc::new(Mutex::new(bloom_filter)),
1143 vv_splitters: Mutex::new(vec![Vec::new(); no_workers]),
1144 v_candidate_kmers,
1145 v_duplicated_kmers,
1146 kmer_length,
1147 adaptive_mode,
1148 map_segments: Arc::new(Mutex::new(HashMap::new())),
1149 map_segments_terminators: Arc::new(Mutex::new(HashMap::new())),
1150 concatenated_genomes,
1151 no_segments: Arc::new(Mutex::new(0)),
1152 no_raw_groups: 0,
1153 archive,
1154 v_segments: Arc::new(Mutex::new(Vec::new())),
1155 collection,
1156 aux_queue,
1157 working_queue: Mutex::new(Arc::clone(&queue)), });
1159
1160 let barrier = Arc::new(Barrier::new(no_workers));
1162 let mut worker_handles = Vec::with_capacity(no_workers);
1163
1164 for worker_id in 0..no_workers {
1165 let q = queue.clone();
1166 let b = barrier.clone();
1167 let s = shared.clone();
1168
1169 let handle = std::thread::spawn(move || {
1170 worker_thread(worker_id, no_workers, q, b, s);
1171 });
1172
1173 worker_handles.push(handle);
1174 }
1175
1176 let mut sample_priority = usize::MAX;
1178 let mut _cnt_contigs_in_sample = 0;
1179 const PACK_CARDINALITY: usize = 50; for (sample_name, file_path) in sample_files {
1182 if verbosity > 0 {
1183 eprintln!("Processing sample: {} from {}", sample_name, file_path);
1184 }
1185
1186 let mut gio = match GenomeIO::open(&file_path) {
1188 Ok(g) => g,
1189 Err(e) => {
1190 eprintln!("Cannot open file {}: {}", file_path, e);
1191 continue;
1192 }
1193 };
1194
1195 let mut any_contigs_added = false;
1196
1197 while let Ok(Some((contig_name, sequence))) = gio.read_contig_raw() {
1199 if concatenated_genomes {
1200 let concat_sample_name = String::from("");
1203
1204 if let Some(collection_mutex) = &shared.collection {
1206 let mut collection = collection_mutex.lock().unwrap();
1207 let _ = collection.register_sample_contig(&concat_sample_name, &contig_name);
1208 }
1209
1210 let cost = sequence.len();
1211 queue.emplace(
1212 Task::new_contig(
1213 concat_sample_name,
1214 contig_name.clone(),
1215 sequence,
1216 ContigProcessingStage::AllContigs,
1217 ),
1218 sample_priority,
1219 cost,
1220 );
1221
1222 any_contigs_added = true;
1223 } else {
1224 if let Some(collection_mutex) = &shared.collection {
1227 let mut collection = collection_mutex.lock().unwrap();
1228 let _ = collection.register_sample_contig(&sample_name, &contig_name);
1229 }
1230
1231 let cost = sequence.len();
1232 queue.emplace(
1233 Task::new_contig(
1234 sample_name.clone(),
1235 contig_name.clone(),
1236 sequence,
1237 ContigProcessingStage::AllContigs,
1238 ),
1239 sample_priority,
1240 cost,
1241 );
1242
1243 any_contigs_added = true;
1244 }
1245 }
1246
1247 if !concatenated_genomes && any_contigs_added {
1249 let sync_stage = if adaptive_mode {
1250 ContigProcessingStage::NewSplitters
1251 } else {
1252 ContigProcessingStage::Registration
1253 };
1254
1255 queue.emplace_many_no_cost(Task::new_sync(sync_stage), sample_priority, no_workers);
1257
1258 sample_priority -= 1;
1259 }
1260 }
1261
1262 queue.mark_completed();
1264
1265 for handle in worker_handles {
1266 handle.join().map_err(|_| "Worker thread panicked")?;
1267 }
1268
1269 if verbosity > 0 {
1270 let total_bases = shared.processed_bases.load(Ordering::Relaxed);
1271 eprintln!("Compression complete: {} bases processed", total_bases);
1272 }
1273
1274 Ok(())
1275}
1276
1277#[cfg(test)]
1278mod tests {
1279 use super::*;
1280
1281 #[test]
1282 fn test_shared_state_creation() {
1283 let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
1284 let state = SharedCompressorState::new(1, 21, false, false, 0, queue);
1285 assert_eq!(state.verbosity, 1);
1286 assert_eq!(state.kmer_length, 21);
1287 assert_eq!(state.adaptive_mode, false);
1288 assert_eq!(state.concatenated_genomes, false);
1289 assert_eq!(state.no_raw_groups, 0);
1290 assert_eq!(state.processed_bases.load(Ordering::Relaxed), 0);
1291 assert_eq!(state.processed_samples.load(Ordering::Relaxed), 0);
1292 assert_eq!(state.raw_contigs.lock().unwrap().len(), 0);
1293 assert_eq!(*state.no_segments.lock().unwrap(), 0);
1294 }
1295
    #[test]
    fn test_worker_thread_completion() {
        use std::thread;

        // Two workers draining an already-completed queue must both exit
        // without processing anything.
        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let barrier = Arc::new(Barrier::new(2));
        let shared = Arc::new(SharedCompressorState::new(
            0,
            21,
            false,
            false,
            0,
            queue.clone(),
        ));

        // Mark the queue completed before the workers start: `pop_large`
        // should report `Completed` and each worker loop should break.
        queue.mark_completed();

        let handles: Vec<_> = (0..2)
            .map(|worker_id| {
                let q = queue.clone();
                let b = barrier.clone();
                let s = shared.clone();

                thread::spawn(move || {
                    worker_thread(worker_id, 2, q, b, s);
                })
            })
            .collect();

        // Successful joins prove both workers terminated (no deadlock).
        for handle in handles {
            handle.join().unwrap();
        }
    }
1332
    #[test]
    fn test_worker_thread_with_task() {
        use std::thread;

        // Single worker, single contig: the task should be compressed and
        // its length counted in `processed_bases`.
        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let barrier = Arc::new(Barrier::new(1));
        let shared = Arc::new(SharedCompressorState::new(
            0,
            21,
            false,
            false,
            0,
            queue.clone(),
        ));

        // 8-byte contig; values 0..=3 (presumably 2-bit base codes — the
        // exact encoding is defined by the compression layer).
        queue.emplace(
            Task::new_contig(
                "sample1".to_string(),
                "chr1".to_string(),
                vec![0, 1, 2, 3, 0, 1, 2, 3],
                ContigProcessingStage::AllContigs,
            ),
            100,
            8,
        );
        queue.mark_completed();

        let q = queue.clone();
        let b = barrier.clone();
        let s = shared.clone();

        let handle = thread::spawn(move || {
            worker_thread(0, 1, q, b, s);
        });

        handle.join().unwrap();

        // All 8 bases accounted for. NOTE(review): assumes compress_contig
        // succeeds on this tiny input — confirm if this test turns flaky.
        assert_eq!(shared.processed_bases.load(Ordering::Relaxed), 8);
    }
1375}