ragc_core/task.rs
1// Task types for streaming compression pipeline
2// Matches C++ AGC's contig_processing_stage_t and task_t (agc_compressor.h:654-656)
3
4use ragc_common::Contig;
5
/// Processing stage for a contig task.
///
/// This mirrors C++ AGC's `contig_processing_stage_t` enum (agc_compressor.h:654):
/// ```cpp
/// enum class contig_processing_stage_t {
///     unknown, all_contigs, new_splitters, hard_contigs, registration
/// };
/// ```
///
/// The C++ `unknown` value has no Rust counterpart; only the four stages
/// below are representable.
///
/// Stages:
/// - **AllContigs**: initial contig processing (normal compression flow).
/// - **Registration**: synchronization barrier for segment registration.
///   - Sent after each sample when adaptive_mode is OFF.
///   - Workers arrive at the barrier, register segments, then continue.
/// - **NewSplitters**: synchronization barrier for adaptive splitter finding.
///   - Sent after each sample when adaptive_mode is ON.
///   - Workers arrive at the barrier, find new splitters, then continue.
/// - **HardContigs**: re-process contigs that need new splitters.
///   - Contigs that didn't segment well are re-enqueued with this stage.
///   - Uses the expanded splitter set from adaptive mode.
///
/// `Hash` is derived alongside `Eq` so a stage can be used directly as a
/// `HashMap` / `HashSet` key (e.g. per-stage counters).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ContigProcessingStage {
    /// Initial contig processing (compression).
    AllContigs,

    /// Synchronization barrier: register segments and update terminators.
    Registration,

    /// Synchronization barrier: find adaptive splitters for hard contigs.
    NewSplitters,

    /// Re-process contigs with new splitters (adaptive mode).
    HardContigs,
}
40
41/// A task for the worker queue
42///
43/// This matches C++ AGC's task_t typedef (agc_compressor.h:656):
44/// ```cpp
45/// using task_t = tuple<contig_processing_stage_t, string, string, contig_t>;
46/// ```
47///
48/// Fields:
49/// - `stage`: What type of processing this task needs
50/// - `sample_name`: Which sample this contig belongs to
51/// - `contig_name`: Contig identifier (chromosome name, etc.)
52/// - `sequence`: The actual sequence data
53///
54/// For synchronization tasks (Registration, NewSplitters):
55/// - `sample_name`, `contig_name`, and `sequence` are empty
56/// - Workers detect sync tasks and enter barrier synchronization
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct Task {
59 pub stage: ContigProcessingStage,
60 pub sample_name: String,
61 pub contig_name: String,
62 pub sequence: Contig,
63}
64
65impl Task {
66 /// Create a new contig processing task
67 pub fn new_contig(
68 sample_name: String,
69 contig_name: String,
70 sequence: Contig,
71 stage: ContigProcessingStage,
72 ) -> Self {
73 Task {
74 stage,
75 sample_name,
76 contig_name,
77 sequence,
78 }
79 }
80
81 /// Create a synchronization barrier task
82 ///
83 /// This is sent N times (once per worker) to trigger barrier synchronization.
84 /// Matches C++ AGC's EmplaceManyNoCost() pattern (agc_compressor.cpp:2197).
85 pub fn new_sync(stage: ContigProcessingStage) -> Self {
86 assert!(
87 stage == ContigProcessingStage::Registration
88 || stage == ContigProcessingStage::NewSplitters,
89 "Sync tasks must be Registration or NewSplitters"
90 );
91
92 Task {
93 stage,
94 sample_name: String::new(),
95 contig_name: String::new(),
96 sequence: Vec::new(),
97 }
98 }
99
100 /// Check if this is a synchronization task
101 pub fn is_sync(&self) -> bool {
102 (self.stage == ContigProcessingStage::Registration
103 || self.stage == ContigProcessingStage::NewSplitters)
104 && self.sample_name.is_empty()
105 && self.contig_name.is_empty()
106 }
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `sample1`/`chr1` contig task (4-element sequence) used by
    /// several tests, with the given stage.
    fn chr1_task(stage: ContigProcessingStage) -> Task {
        Task::new_contig(
            "sample1".to_string(),
            "chr1".to_string(),
            vec![0, 1, 2, 3],
            stage,
        )
    }

    #[test]
    fn test_contig_task_creation() {
        let task = chr1_task(ContigProcessingStage::AllContigs);

        assert_eq!(task.stage, ContigProcessingStage::AllContigs);
        assert_eq!(task.sample_name, "sample1");
        assert_eq!(task.contig_name, "chr1");
        assert_eq!(task.sequence, vec![0, 1, 2, 3]);
        assert!(!task.is_sync());
    }

    #[test]
    fn test_registration_sync_task() {
        let task = Task::new_sync(ContigProcessingStage::Registration);

        assert_eq!(task.stage, ContigProcessingStage::Registration);
        assert!(task.sample_name.is_empty());
        assert!(task.contig_name.is_empty());
        assert!(task.sequence.is_empty());
        assert!(task.is_sync());
    }

    #[test]
    fn test_new_splitters_sync_task() {
        let task = Task::new_sync(ContigProcessingStage::NewSplitters);

        assert_eq!(task.stage, ContigProcessingStage::NewSplitters);
        // The payload must be empty for this barrier stage too, not just Registration.
        assert!(task.sample_name.is_empty());
        assert!(task.contig_name.is_empty());
        assert!(task.sequence.is_empty());
        assert!(task.is_sync());
    }

    #[test]
    #[should_panic(expected = "Sync tasks must be Registration or NewSplitters")]
    fn test_invalid_sync_task() {
        Task::new_sync(ContigProcessingStage::AllContigs);
    }

    #[test]
    #[should_panic(expected = "Sync tasks must be Registration or NewSplitters")]
    fn test_invalid_sync_task_hard_contigs() {
        Task::new_sync(ContigProcessingStage::HardContigs);
    }

    #[test]
    fn test_hard_contig_task() {
        let task = chr1_task(ContigProcessingStage::HardContigs);

        assert_eq!(task.stage, ContigProcessingStage::HardContigs);
        assert!(!task.is_sync());
    }

    #[test]
    fn test_non_barrier_stage_with_empty_payload_is_not_sync() {
        // is_sync must gate on the stage; blank names alone are not enough.
        let task = Task::new_contig(
            String::new(),
            String::new(),
            Vec::new(),
            ContigProcessingStage::AllContigs,
        );
        assert!(!task.is_sync());
    }
}
166}