// ragc_core/task.rs

// Task types for the streaming compression pipeline.
// Matches C++ AGC's contig_processing_stage_t and task_t (agc_compressor.h:654-656).
3
4use ragc_common::Contig;
5
/// Stage tag attached to every task placed on the worker queue.
///
/// Mirrors the `contig_processing_stage_t` enum of C++ AGC (agc_compressor.h:654):
/// ```cpp
/// enum class contig_processing_stage_t {
///     unknown, all_contigs, new_splitters, hard_contigs, registration
/// };
/// ```
/// The C++ `unknown` value has no counterpart in this enum.
///
/// How the stages are used:
/// - **AllContigs**: first pass over a contig (the normal compression flow).
/// - **Registration**: barrier marker for segment registration.
///   - Sent after each sample when adaptive_mode is OFF.
///   - Workers arrive at the barrier, register segments, then continue.
/// - **NewSplitters**: barrier marker for adaptive splitter finding.
///   - Sent after each sample when adaptive_mode is ON.
///   - Workers arrive at the barrier, find new splitters, then continue.
/// - **HardContigs**: second pass over contigs that need new splitters.
///   - Contigs that did not segment well are re-enqueued with this stage.
///   - Uses the expanded splitter set produced in adaptive mode.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ContigProcessingStage {
    /// First-pass contig processing (compression).
    AllContigs,

    /// Barrier marker: register segments and update terminators.
    Registration,

    /// Barrier marker: find adaptive splitters for hard contigs.
    NewSplitters,

    /// Re-processing of a contig with new splitters (adaptive mode).
    HardContigs,
}
40
41/// A task for the worker queue
42///
43/// This matches C++ AGC's task_t typedef (agc_compressor.h:656):
44/// ```cpp
45/// using task_t = tuple<contig_processing_stage_t, string, string, contig_t>;
46/// ```
47///
48/// Fields:
49/// - `stage`: What type of processing this task needs
50/// - `sample_name`: Which sample this contig belongs to
51/// - `contig_name`: Contig identifier (chromosome name, etc.)
52/// - `sequence`: The actual sequence data
53///
54/// For synchronization tasks (Registration, NewSplitters):
55/// - `sample_name`, `contig_name`, and `sequence` are empty
56/// - Workers detect sync tasks and enter barrier synchronization
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct Task {
59    pub stage: ContigProcessingStage,
60    pub sample_name: String,
61    pub contig_name: String,
62    pub sequence: Contig,
63}
64
65impl Task {
66    /// Create a new contig processing task
67    pub fn new_contig(
68        sample_name: String,
69        contig_name: String,
70        sequence: Contig,
71        stage: ContigProcessingStage,
72    ) -> Self {
73        Task {
74            stage,
75            sample_name,
76            contig_name,
77            sequence,
78        }
79    }
80
81    /// Create a synchronization barrier task
82    ///
83    /// This is sent N times (once per worker) to trigger barrier synchronization.
84    /// Matches C++ AGC's EmplaceManyNoCost() pattern (agc_compressor.cpp:2197).
85    pub fn new_sync(stage: ContigProcessingStage) -> Self {
86        assert!(
87            stage == ContigProcessingStage::Registration
88                || stage == ContigProcessingStage::NewSplitters,
89            "Sync tasks must be Registration or NewSplitters"
90        );
91
92        Task {
93            stage,
94            sample_name: String::new(),
95            contig_name: String::new(),
96            sequence: Vec::new(),
97        }
98    }
99
100    /// Check if this is a synchronization task
101    pub fn is_sync(&self) -> bool {
102        (self.stage == ContigProcessingStage::Registration
103            || self.stage == ContigProcessingStage::NewSplitters)
104            && self.sample_name.is_empty()
105            && self.contig_name.is_empty()
106    }
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `sample1`/`chr1` contig task shared by the contig tests.
    fn sample_contig_task(stage: ContigProcessingStage) -> Task {
        Task::new_contig(
            String::from("sample1"),
            String::from("chr1"),
            vec![0, 1, 2, 3],
            stage,
        )
    }

    /// A contig task stores every argument as given and is not a sync task.
    #[test]
    fn test_contig_task_creation() {
        let created = sample_contig_task(ContigProcessingStage::AllContigs);

        assert_eq!(created.stage, ContigProcessingStage::AllContigs);
        assert_eq!(created.sample_name, "sample1");
        assert_eq!(created.contig_name, "chr1");
        assert_eq!(created.sequence, vec![0, 1, 2, 3]);
        assert!(!created.is_sync());
    }

    /// A `Registration` sync task has empty fields and reports itself as sync.
    #[test]
    fn test_registration_sync_task() {
        let barrier = Task::new_sync(ContigProcessingStage::Registration);

        assert_eq!(barrier.stage, ContigProcessingStage::Registration);
        assert!(barrier.sample_name.is_empty());
        assert!(barrier.contig_name.is_empty());
        assert!(barrier.sequence.is_empty());
        assert!(barrier.is_sync());
    }

    /// A `NewSplitters` sync task keeps its stage and reports itself as sync.
    #[test]
    fn test_new_splitters_sync_task() {
        let barrier = Task::new_sync(ContigProcessingStage::NewSplitters);

        assert_eq!(barrier.stage, ContigProcessingStage::NewSplitters);
        assert!(barrier.is_sync());
    }

    /// `new_sync` must panic for a stage that is not a barrier stage.
    #[test]
    #[should_panic(expected = "Sync tasks must be Registration or NewSplitters")]
    fn test_invalid_sync_task() {
        let _ = Task::new_sync(ContigProcessingStage::AllContigs);
    }

    /// A `HardContigs` contig task keeps its stage and is not a sync task.
    #[test]
    fn test_hard_contig_task() {
        let reprocessed = sample_contig_task(ContigProcessingStage::HardContigs);

        assert_eq!(reprocessed.stage, ContigProcessingStage::HardContigs);
        assert!(!reprocessed.is_sync());
    }
}