ant_quic/workflow/
mod.rs

1//! Workflow Orchestration System for NAT Traversal
2//!
3//! This module provides a comprehensive workflow orchestration system that coordinates
4//! the complex multi-phase NAT traversal process across distributed components.
5
6use std::{
7    collections::HashMap,
8    fmt,
9    sync::Arc,
10    time::{Duration, Instant},
11};
12
13use serde::{Deserialize, Serialize};
14use tokio::sync::{mpsc, RwLock};
15
16pub mod definition;
17pub mod engine;
18pub mod state_store;
19pub mod coordinator;
20pub mod monitor;
21
22pub use definition::*;
23pub use engine::*;
24pub use state_store::*;
25pub use coordinator::*;
26pub use monitor::*;
27
28/// Unique identifier for a workflow
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
30pub struct WorkflowId(pub [u8; 16]);
31
32impl WorkflowId {
33    /// Generate a new random workflow ID
34    pub fn generate() -> Self {
35        let mut id = [0u8; 16];
36        use rand::Rng;
37        rand::thread_rng().fill(&mut id);
38        Self(id)
39    }
40}
41
42impl fmt::Display for WorkflowId {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        write!(f, "{}", hex::encode(&self.0[..8]))
45    }
46}
47
48/// Unique identifier for a workflow stage
49#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
50pub struct StageId(pub String);
51
52impl fmt::Display for StageId {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        write!(f, "{}", self.0)
55    }
56}
57
58/// Version identifier for workflow definitions
59#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
60pub struct Version {
61    pub major: u32,
62    pub minor: u32,
63    pub patch: u32,
64}
65
66impl fmt::Display for Version {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
69    }
70}
71
72/// Events that can trigger workflow state transitions
73#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
74pub enum WorkflowEvent {
75    /// Start the workflow
76    Start,
77    /// Stage completed successfully
78    StageCompleted { stage_id: StageId },
79    /// Stage failed with error
80    StageFailed { stage_id: StageId, error: String },
81    /// External event from another component
82    External { event_type: String, data: Vec<u8> },
83    /// Timeout occurred
84    Timeout { stage_id: StageId },
85    /// User-initiated cancellation
86    Cancel,
87    /// System error
88    SystemError { error: String },
89}
90
91/// Current status of a workflow instance
92#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
93pub enum WorkflowStatus {
94    /// Workflow is being initialized
95    Initializing,
96    /// Workflow is actively executing
97    Running { current_stage: StageId },
98    /// Workflow is waiting for an event
99    Waiting { stage: StageId, event: String },
100    /// Workflow is paused
101    Paused { stage: StageId },
102    /// Workflow completed successfully
103    Completed { result: WorkflowResult },
104    /// Workflow failed
105    Failed { error: WorkflowError },
106    /// Workflow was cancelled
107    Cancelled,
108}
109
110/// Result of a successful workflow completion
111#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
112pub struct WorkflowResult {
113    /// Final output data
114    pub output: HashMap<String, Vec<u8>>,
115    /// Execution duration
116    pub duration: Duration,
117    /// Metrics collected during execution
118    pub metrics: WorkflowMetrics,
119}
120
121/// Error information for failed workflows
122#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
123pub struct WorkflowError {
124    /// Error code
125    pub code: String,
126    /// Human-readable error message
127    pub message: String,
128    /// Stage where error occurred
129    pub stage: Option<StageId>,
130    /// Stack trace if available
131    pub trace: Option<String>,
132    /// Recovery suggestions
133    pub recovery_hints: Vec<String>,
134}
135
136impl fmt::Display for WorkflowError {
137    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
138        write!(f, "[{}] {}", self.code, self.message)?;
139        if let Some(stage) = &self.stage {
140            write!(f, " at stage {}", stage)?;
141        }
142        Ok(())
143    }
144}
145
146impl std::error::Error for WorkflowError {}
147
148/// Metrics collected during workflow execution
149#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
150pub struct WorkflowMetrics {
151    /// Total stages executed
152    pub stages_executed: u32,
153    /// Number of retries
154    pub retry_count: u32,
155    /// Number of errors encountered
156    pub error_count: u32,
157    /// Time spent in each stage
158    pub stage_durations: HashMap<StageId, Duration>,
159    /// Custom metrics
160    pub custom: HashMap<String, f64>,
161}
162
163/// Handle to interact with a running workflow
164#[derive(Debug, Clone)]
165pub struct WorkflowHandle {
166    /// Workflow ID
167    pub id: WorkflowId,
168    /// Channel to send events to the workflow
169    event_tx: mpsc::Sender<WorkflowEvent>,
170    /// Current status
171    status: Arc<RwLock<WorkflowStatus>>,
172}
173
174impl WorkflowHandle {
175    /// Create a new workflow handle
176    pub fn new(id: WorkflowId, event_tx: mpsc::Sender<WorkflowEvent>) -> Self {
177        Self {
178            id,
179            event_tx,
180            status: Arc::new(RwLock::new(WorkflowStatus::Initializing)),
181        }
182    }
183
184    /// Send an event to the workflow
185    pub async fn send_event(&self, event: WorkflowEvent) -> Result<(), WorkflowError> {
186        self.event_tx.send(event).await.map_err(|_| WorkflowError {
187            code: "SEND_FAILED".to_string(),
188            message: "Failed to send event to workflow".to_string(),
189            stage: None,
190            trace: None,
191            recovery_hints: vec!["Workflow may have terminated".to_string()],
192        })
193    }
194
195    /// Get the current status of the workflow
196    pub async fn status(&self) -> WorkflowStatus {
197        self.status.read().await.clone()
198    }
199
200    /// Cancel the workflow
201    pub async fn cancel(&self) -> Result<(), WorkflowError> {
202        self.send_event(WorkflowEvent::Cancel).await
203    }
204
205    /// Update the status (internal use)
206    pub(crate) async fn update_status(&self, status: WorkflowStatus) {
207        *self.status.write().await = status;
208    }
209}
210
211/// Context provided to workflow actions during execution
212#[derive(Debug)]
213pub struct WorkflowContext {
214    /// Workflow ID
215    pub workflow_id: WorkflowId,
216    /// Current stage
217    pub current_stage: StageId,
218    /// Shared state between stages
219    pub state: HashMap<String, Vec<u8>>,
220    /// Metrics collector
221    pub metrics: WorkflowMetrics,
222    /// Start time of current stage
223    pub stage_start: Instant,
224}
225
226impl WorkflowContext {
227    /// Store a value in the workflow state
228    pub fn set_state(&mut self, key: String, value: Vec<u8>) {
229        self.state.insert(key, value);
230    }
231
232    /// Retrieve a value from the workflow state
233    pub fn get_state(&self, key: &str) -> Option<&Vec<u8>> {
234        self.state.get(key)
235    }
236
237    /// Record a custom metric
238    pub fn record_metric(&mut self, name: String, value: f64) {
239        self.metrics.custom.insert(name, value);
240    }
241}
242
243/// Trait for implementing workflow actions
244#[async_trait::async_trait]
245pub trait WorkflowAction: Send + Sync {
246    /// Execute the action
247    async fn execute(&self, context: &mut WorkflowContext) -> Result<(), WorkflowError>;
248    
249    /// Get the action name for logging
250    fn name(&self) -> &str;
251}
252
253/// Condition that must be satisfied for stage execution
254#[async_trait::async_trait]
255pub trait Condition: Send + Sync {
256    /// Check if the condition is satisfied
257    async fn check(&self, context: &WorkflowContext) -> bool;
258    
259    /// Get the condition description
260    fn description(&self) -> &str;
261}
262
263/// Error handler for workflow stages
264#[derive(Debug, Clone, Serialize, Deserialize)]
265pub struct ErrorHandler {
266    /// Maximum retry attempts
267    pub max_retries: u32,
268    /// Backoff strategy
269    pub backoff: BackoffStrategy,
270    /// Fallback stage on failure
271    pub fallback_stage: Option<StageId>,
272    /// Whether to propagate the error
273    pub propagate: bool,
274}
275
276/// Backoff strategy for retries
277#[derive(Debug, Clone, Serialize, Deserialize)]
278pub enum BackoffStrategy {
279    /// Fixed delay between retries
280    Fixed { delay: Duration },
281    /// Exponential backoff
282    Exponential { initial: Duration, max: Duration, factor: f64 },
283    /// Linear increase
284    Linear { initial: Duration, increment: Duration },
285}
286
287impl BackoffStrategy {
288    /// Calculate the delay for a given retry attempt
289    pub fn calculate_delay(&self, attempt: u32) -> Duration {
290        match self {
291            BackoffStrategy::Fixed { delay } => *delay,
292            BackoffStrategy::Exponential { initial, max, factor } => {
293                let delay = initial.as_millis() as f64 * factor.powi(attempt as i32);
294                let delay_ms = delay.min(max.as_millis() as f64) as u64;
295                Duration::from_millis(delay_ms)
296            }
297            BackoffStrategy::Linear { initial, increment } => {
298                *initial + increment.saturating_mul(attempt)
299            }
300        }
301    }
302}
303
304/// Rollback strategy for failed stages
305#[derive(Debug, Clone, Serialize, Deserialize)]
306pub enum RollbackStrategy {
307    /// No rollback
308    None,
309    /// Execute compensating actions
310    Compensate { actions: Vec<String> },
311    /// Restore from checkpoint
312    RestoreCheckpoint { checkpoint_id: String },
313    /// Jump to a specific stage
314    JumpToStage { stage_id: StageId },
315}
316
#[cfg(test)]
mod tests {
    use super::*;

    /// Two independently generated identifiers are expected to differ.
    #[test]
    fn test_workflow_id_generation() {
        let first = WorkflowId::generate();
        let second = WorkflowId::generate();
        assert_ne!(first, second);
    }

    /// Fixed backoff ignores the attempt number; exponential backoff doubles
    /// from the initial delay with `factor: 2.0`.
    #[test]
    fn test_backoff_strategy() {
        let one_second = Duration::from_secs(1);
        let fixed = BackoffStrategy::Fixed { delay: one_second };
        for &attempt in [0u32, 5].iter() {
            assert_eq!(fixed.calculate_delay(attempt), one_second);
        }

        let exponential = BackoffStrategy::Exponential {
            initial: Duration::from_millis(100),
            max: Duration::from_secs(10),
            factor: 2.0,
        };
        // Expected delay in milliseconds for attempts 0, 1 and 2.
        let expected_ms = [100u64, 200, 400];
        for (attempt, &millis) in (0u32..).zip(expected_ms.iter()) {
            assert_eq!(
                exponential.calculate_delay(attempt),
                Duration::from_millis(millis)
            );
        }
    }

    /// A version renders as `major.minor.patch`.
    #[test]
    fn test_version_display() {
        let rendered = Version {
            major: 1,
            minor: 2,
            patch: 3,
        }
        .to_string();
        assert_eq!(rendered, "1.2.3");
    }
}