// oxigdal_cluster/workflow/mod.rs

//! Workflow orchestration engine for complex distributed tasks.
//!
//! This module provides workflow capabilities including:
//! - Workflow templates and definitions
//! - Conditional execution (if/else branching)
//! - Loops and iteration
//! - Workflow versioning
//! - Workflow resumption after failure
//! - Workflow monitoring and visualization
10
11use crate::error::{ClusterError, Result};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use serde::{Deserialize, Serialize};
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::{Duration, SystemTime};
18
19/// Workflow identifier.
20pub type WorkflowId = uuid::Uuid;
21
22/// Workflow definition.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct Workflow {
25    /// Workflow ID
26    pub id: WorkflowId,
27    /// Workflow name
28    pub name: String,
29    /// Workflow version
30    pub version: String,
31    /// Workflow description
32    pub description: Option<String>,
33    /// Workflow steps
34    pub steps: Vec<WorkflowStep>,
35    /// Workflow variables
36    pub variables: HashMap<String, serde_json::Value>,
37    /// Created at
38    pub created_at: SystemTime,
39    /// Updated at
40    pub updated_at: SystemTime,
41}
42
43/// Workflow step definition.
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct WorkflowStep {
46    /// Step ID
47    pub id: String,
48    /// Step name
49    pub name: String,
50    /// Step type
51    pub step_type: StepType,
52    /// Dependencies (step IDs that must complete before this step)
53    pub depends_on: Vec<String>,
54    /// Retry configuration
55    pub retry: Option<RetryConfig>,
56    /// Timeout
57    pub timeout: Option<Duration>,
58}
59
60/// Workflow step type.
61#[derive(Debug, Clone, Serialize, Deserialize)]
62#[serde(tag = "type")]
63pub enum StepType {
64    /// Execute a task
65    Task {
66        /// Task template name to execute
67        task_template: String,
68        /// Parameters for the task
69        parameters: HashMap<String, serde_json::Value>,
70    },
71    /// Conditional execution
72    Condition {
73        /// Condition expression to evaluate
74        condition: String,
75        /// Steps to execute if condition is true
76        then_steps: Vec<String>,
77        /// Steps to execute if condition is false
78        else_steps: Option<Vec<String>>,
79    },
80    /// Loop iteration
81    Loop {
82        /// Variable name for loop iterator
83        iterator: String,
84        /// Items to iterate over
85        items: Vec<serde_json::Value>,
86        /// Steps to execute in each iteration
87        body_steps: Vec<String>,
88    },
89    /// Parallel execution
90    Parallel {
91        /// Branches to execute in parallel
92        branches: Vec<Vec<String>>,
93    },
94    /// Wait for duration
95    Wait {
96        /// Duration to wait
97        duration: Duration,
98    },
99    /// Checkpoint for workflow state persistence
100    Checkpoint {
101        /// Checkpoint name
102        name: String,
103    },
104}
105
106/// Retry configuration.
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct RetryConfig {
109    /// Maximum retry attempts
110    pub max_attempts: u32,
111    /// Backoff strategy
112    pub backoff: BackoffStrategy,
113    /// Retry on specific errors
114    pub retry_on: Vec<String>,
115}
116
117/// Backoff strategy for retries.
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub enum BackoffStrategy {
120    /// Fixed delay between retries
121    Fixed {
122        /// Delay duration between retries
123        delay: Duration,
124    },
125    /// Exponential backoff with multiplier
126    Exponential {
127        /// Initial delay duration
128        initial: Duration,
129        /// Multiplier for each subsequent retry
130        multiplier: f64,
131        /// Maximum delay cap
132        max: Duration,
133    },
134    /// Linear backoff with constant increment
135    Linear {
136        /// Initial delay duration
137        initial: Duration,
138        /// Increment added for each retry
139        increment: Duration,
140        /// Maximum delay cap
141        max: Duration,
142    },
143}
144
145/// Workflow execution instance.
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct WorkflowExecution {
148    /// Execution ID
149    pub id: uuid::Uuid,
150    /// Workflow ID
151    pub workflow_id: WorkflowId,
152    /// Execution status
153    pub status: WorkflowStatus,
154    /// Current step
155    pub current_step: Option<String>,
156    /// Completed steps
157    pub completed_steps: Vec<String>,
158    /// Failed steps
159    pub failed_steps: Vec<String>,
160    /// Execution context (variables)
161    pub context: HashMap<String, serde_json::Value>,
162    /// Started at
163    pub started_at: SystemTime,
164    /// Completed at
165    pub completed_at: Option<SystemTime>,
166    /// Execution history
167    pub history: Vec<ExecutionEvent>,
168}
169
170/// Workflow execution status.
171#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
172pub enum WorkflowStatus {
173    /// Pending execution
174    Pending,
175    /// Currently running
176    Running,
177    /// Paused (can be resumed)
178    Paused,
179    /// Completed successfully
180    Completed,
181    /// Failed
182    Failed,
183    /// Cancelled
184    Cancelled,
185}
186
187/// Execution event for history tracking.
188#[derive(Debug, Clone, Serialize, Deserialize)]
189pub struct ExecutionEvent {
190    /// Event type
191    pub event_type: EventType,
192    /// Step ID
193    pub step_id: Option<String>,
194    /// Timestamp
195    pub timestamp: SystemTime,
196    /// Event message
197    pub message: String,
198}
199
200/// Event type.
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub enum EventType {
203    /// Workflow execution started
204    WorkflowStarted,
205    /// Step execution started
206    StepStarted,
207    /// Step completed successfully
208    StepCompleted,
209    /// Step failed
210    StepFailed,
211    /// Step is being retried
212    StepRetrying,
213    /// Workflow completed successfully
214    WorkflowCompleted,
215    /// Workflow failed
216    WorkflowFailed,
217    /// Workflow paused
218    WorkflowPaused,
219    /// Workflow resumed from pause
220    WorkflowResumed,
221    /// Workflow cancelled
222    WorkflowCancelled,
223}
224
225/// Workflow engine for orchestrating executions.
226pub struct WorkflowEngine {
227    /// Workflow definitions
228    workflows: Arc<DashMap<WorkflowId, Workflow>>,
229    /// Active executions
230    executions: Arc<DashMap<uuid::Uuid, RwLock<WorkflowExecution>>>,
231    /// Workflow templates
232    templates: Arc<DashMap<String, WorkflowTemplate>>,
233    /// Execution queue
234    queue: Arc<RwLock<VecDeque<uuid::Uuid>>>,
235    /// Statistics
236    stats: Arc<RwLock<WorkflowStats>>,
237}
238
239/// Workflow template.
240#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct WorkflowTemplate {
242    /// Template name
243    pub name: String,
244    /// Template version
245    pub version: String,
246    /// Parameters that can be customized
247    pub parameters: Vec<TemplateParameter>,
248    /// Steps in the workflow
249    pub steps: Vec<WorkflowStep>,
250}
251
252/// Template parameter definition.
253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct TemplateParameter {
255    /// Parameter name
256    pub name: String,
257    /// Parameter type (string, number, etc.)
258    pub param_type: String,
259    /// Whether the parameter is required
260    pub required: bool,
261    /// Default value if not provided
262    pub default: Option<serde_json::Value>,
263    /// Human-readable description
264    pub description: Option<String>,
265}
266
267/// Workflow statistics.
268#[derive(Debug, Clone, Default, Serialize, Deserialize)]
269pub struct WorkflowStats {
270    /// Total number of registered workflows
271    pub total_workflows: usize,
272    /// Total number of executions started
273    pub total_executions: u64,
274    /// Number of currently running executions
275    pub running_executions: usize,
276    /// Number of completed executions
277    pub completed_executions: u64,
278    /// Number of failed executions
279    pub failed_executions: u64,
280    /// Average time to complete an execution
281    pub average_execution_time: Duration,
282}
283
284impl WorkflowEngine {
285    /// Create a new workflow engine.
286    pub fn new() -> Self {
287        Self {
288            workflows: Arc::new(DashMap::new()),
289            executions: Arc::new(DashMap::new()),
290            templates: Arc::new(DashMap::new()),
291            queue: Arc::new(RwLock::new(VecDeque::new())),
292            stats: Arc::new(RwLock::new(WorkflowStats::default())),
293        }
294    }
295
296    /// Register a workflow definition.
297    pub fn register_workflow(&self, workflow: Workflow) -> Result<WorkflowId> {
298        let id = workflow.id;
299        self.workflows.insert(id, workflow);
300
301        let mut stats = self.stats.write();
302        stats.total_workflows = self.workflows.len();
303
304        Ok(id)
305    }
306
307    /// Register a workflow template.
308    pub fn register_template(&self, template: WorkflowTemplate) -> Result<()> {
309        self.templates.insert(template.name.clone(), template);
310        Ok(())
311    }
312
313    /// Create workflow from template.
314    pub fn create_from_template(
315        &self,
316        template_name: &str,
317        parameters: HashMap<String, serde_json::Value>,
318    ) -> Result<Workflow> {
319        let template = self
320            .templates
321            .get(template_name)
322            .ok_or_else(|| ClusterError::WorkflowNotFound(template_name.to_string()))?;
323
324        // Validate required parameters
325        for param in &template.parameters {
326            if param.required && !parameters.contains_key(&param.name) {
327                return Err(ClusterError::InvalidConfiguration(format!(
328                    "Missing required parameter: {}",
329                    param.name
330                )));
331            }
332        }
333
334        let workflow = Workflow {
335            id: uuid::Uuid::new_v4(),
336            name: template.name.clone(),
337            version: template.version.clone(),
338            description: None,
339            steps: template.steps.clone(),
340            variables: parameters,
341            created_at: SystemTime::now(),
342            updated_at: SystemTime::now(),
343        };
344
345        Ok(workflow)
346    }
347
348    /// Start a workflow execution.
349    pub fn start_execution(&self, workflow_id: WorkflowId) -> Result<uuid::Uuid> {
350        let workflow = self
351            .workflows
352            .get(&workflow_id)
353            .ok_or_else(|| ClusterError::WorkflowNotFound(workflow_id.to_string()))?;
354
355        let execution_id = uuid::Uuid::new_v4();
356        let execution = WorkflowExecution {
357            id: execution_id,
358            workflow_id,
359            status: WorkflowStatus::Running,
360            current_step: None,
361            completed_steps: Vec::new(),
362            failed_steps: Vec::new(),
363            context: workflow.variables.clone(),
364            started_at: SystemTime::now(),
365            completed_at: None,
366            history: vec![ExecutionEvent {
367                event_type: EventType::WorkflowStarted,
368                step_id: None,
369                timestamp: SystemTime::now(),
370                message: format!("Started workflow execution: {}", workflow.name),
371            }],
372        };
373
374        self.executions.insert(execution_id, RwLock::new(execution));
375        self.queue.write().push_back(execution_id);
376
377        let mut stats = self.stats.write();
378        stats.total_executions += 1;
379        stats.running_executions = self.count_running_executions();
380
381        Ok(execution_id)
382    }
383
384    /// Pause a workflow execution.
385    pub fn pause_execution(&self, execution_id: uuid::Uuid) -> Result<()> {
386        let execution = self
387            .executions
388            .get(&execution_id)
389            .ok_or_else(|| ClusterError::WorkflowNotFound(execution_id.to_string()))?;
390
391        let mut exec = execution.write();
392        if exec.status != WorkflowStatus::Running {
393            return Err(ClusterError::InvalidOperation(format!(
394                "Cannot pause workflow in status {:?}",
395                exec.status
396            )));
397        }
398
399        exec.status = WorkflowStatus::Paused;
400        let current_step = exec.current_step.clone();
401        exec.history.push(ExecutionEvent {
402            event_type: EventType::WorkflowPaused,
403            step_id: current_step,
404            timestamp: SystemTime::now(),
405            message: "Workflow paused".to_string(),
406        });
407
408        Ok(())
409    }
410
411    /// Resume a paused workflow execution.
412    pub fn resume_execution(&self, execution_id: uuid::Uuid) -> Result<()> {
413        let execution = self
414            .executions
415            .get(&execution_id)
416            .ok_or_else(|| ClusterError::WorkflowNotFound(execution_id.to_string()))?;
417
418        let mut exec = execution.write();
419        if exec.status != WorkflowStatus::Paused {
420            return Err(ClusterError::InvalidOperation(format!(
421                "Cannot resume workflow in status {:?}",
422                exec.status
423            )));
424        }
425
426        exec.status = WorkflowStatus::Running;
427        let current_step = exec.current_step.clone();
428        exec.history.push(ExecutionEvent {
429            event_type: EventType::WorkflowResumed,
430            step_id: current_step,
431            timestamp: SystemTime::now(),
432            message: "Workflow resumed".to_string(),
433        });
434
435        self.queue.write().push_back(execution_id);
436
437        Ok(())
438    }
439
440    /// Cancel a workflow execution.
441    pub fn cancel_execution(&self, execution_id: uuid::Uuid) -> Result<()> {
442        let execution = self
443            .executions
444            .get(&execution_id)
445            .ok_or_else(|| ClusterError::WorkflowNotFound(execution_id.to_string()))?;
446
447        let mut exec = execution.write();
448        exec.status = WorkflowStatus::Cancelled;
449        exec.completed_at = Some(SystemTime::now());
450        let current_step = exec.current_step.clone();
451        exec.history.push(ExecutionEvent {
452            event_type: EventType::WorkflowCancelled,
453            step_id: current_step,
454            timestamp: SystemTime::now(),
455            message: "Workflow cancelled".to_string(),
456        });
457
458        let mut stats = self.stats.write();
459        stats.running_executions = self.count_running_executions();
460
461        Ok(())
462    }
463
464    /// Complete a workflow step.
465    pub fn complete_step(&self, execution_id: uuid::Uuid, step_id: String) -> Result<()> {
466        let execution = self
467            .executions
468            .get(&execution_id)
469            .ok_or_else(|| ClusterError::WorkflowNotFound(execution_id.to_string()))?;
470
471        let mut exec = execution.write();
472        exec.completed_steps.push(step_id.clone());
473        exec.history.push(ExecutionEvent {
474            event_type: EventType::StepCompleted,
475            step_id: Some(step_id),
476            timestamp: SystemTime::now(),
477            message: "Step completed successfully".to_string(),
478        });
479
480        Ok(())
481    }
482
483    /// Fail a workflow step.
484    pub fn fail_step(
485        &self,
486        execution_id: uuid::Uuid,
487        step_id: String,
488        error: String,
489    ) -> Result<()> {
490        let execution = self
491            .executions
492            .get(&execution_id)
493            .ok_or_else(|| ClusterError::WorkflowNotFound(execution_id.to_string()))?;
494
495        let mut exec = execution.write();
496        exec.failed_steps.push(step_id.clone());
497        exec.history.push(ExecutionEvent {
498            event_type: EventType::StepFailed,
499            step_id: Some(step_id),
500            timestamp: SystemTime::now(),
501            message: error,
502        });
503
504        Ok(())
505    }
506
507    /// Get workflow execution status.
508    pub fn get_execution(&self, execution_id: uuid::Uuid) -> Option<WorkflowExecution> {
509        self.executions.get(&execution_id).map(|e| e.read().clone())
510    }
511
512    /// List all executions for a workflow.
513    pub fn list_executions(&self, workflow_id: WorkflowId) -> Vec<WorkflowExecution> {
514        self.executions
515            .iter()
516            .filter(|entry| entry.value().read().workflow_id == workflow_id)
517            .map(|entry| entry.value().read().clone())
518            .collect()
519    }
520
521    /// List running executions.
522    pub fn list_running_executions(&self) -> Vec<WorkflowExecution> {
523        self.executions
524            .iter()
525            .filter(|entry| entry.value().read().status == WorkflowStatus::Running)
526            .map(|entry| entry.value().read().clone())
527            .collect()
528    }
529
530    fn count_running_executions(&self) -> usize {
531        self.executions
532            .iter()
533            .filter(|entry| entry.value().read().status == WorkflowStatus::Running)
534            .count()
535    }
536
537    /// Get workflow statistics.
538    pub fn get_stats(&self) -> WorkflowStats {
539        self.stats.read().clone()
540    }
541}
542
543impl Default for WorkflowEngine {
544    fn default() -> Self {
545        Self::new()
546    }
547}
548
#[cfg(test)]
// Tests are allowed to panic on unexpected errors.
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Builds a step-less workflow definition with a fresh random ID.
    fn sample_workflow(description: Option<&str>) -> Workflow {
        let now = SystemTime::now();
        Workflow {
            id: uuid::Uuid::new_v4(),
            name: "test-workflow".to_string(),
            version: "1.0.0".to_string(),
            description: description.map(str::to_string),
            steps: Vec::new(),
            variables: HashMap::new(),
            created_at: now,
            updated_at: now,
        }
    }

    /// Builds a template with a single required `input` parameter.
    fn sample_template() -> WorkflowTemplate {
        WorkflowTemplate {
            name: "test-template".to_string(),
            version: "1.0.0".to_string(),
            parameters: vec![TemplateParameter {
                name: "input".to_string(),
                param_type: "string".to_string(),
                required: true,
                default: None,
                description: Some("Input parameter".to_string()),
            }],
            steps: Vec::new(),
        }
    }

    #[test]
    fn test_workflow_creation() {
        let workflow = sample_workflow(Some("Test workflow"));
        let expected_id = workflow.id;

        let engine = WorkflowEngine::new();
        let registered_id = engine
            .register_workflow(workflow)
            .expect("workflow registration should succeed");

        assert_eq!(registered_id, expected_id);
        assert_eq!(engine.get_stats().total_workflows, 1);
    }

    #[test]
    fn test_workflow_execution() {
        let engine = WorkflowEngine::new();
        let workflow_id = engine
            .register_workflow(sample_workflow(None))
            .expect("workflow registration should succeed");

        let execution_id = engine
            .start_execution(workflow_id)
            .expect("workflow execution should start");
        let execution = engine
            .get_execution(execution_id)
            .expect("execution should exist");

        assert_eq!(execution.workflow_id, workflow_id);
        assert_eq!(execution.status, WorkflowStatus::Running);
        assert_eq!(engine.get_stats().total_executions, 1);
    }

    #[test]
    fn test_start_execution_unknown_workflow() {
        let engine = WorkflowEngine::new();
        assert!(engine.start_execution(uuid::Uuid::new_v4()).is_err());
    }

    #[test]
    fn test_workflow_pause_resume() {
        let engine = WorkflowEngine::new();
        let workflow_id = engine
            .register_workflow(sample_workflow(None))
            .expect("workflow registration should succeed");
        let execution_id = engine
            .start_execution(workflow_id)
            .expect("workflow execution should start");

        // A running execution cannot be resumed.
        assert!(engine.resume_execution(execution_id).is_err());

        // Pause
        engine
            .pause_execution(execution_id)
            .expect("running execution should pause");
        let execution = engine
            .get_execution(execution_id)
            .expect("execution should exist after pause");
        assert_eq!(execution.status, WorkflowStatus::Paused);

        // A paused execution cannot be paused again.
        assert!(engine.pause_execution(execution_id).is_err());

        // Resume
        engine
            .resume_execution(execution_id)
            .expect("paused execution should resume");
        let execution = engine
            .get_execution(execution_id)
            .expect("execution should exist after resume");
        assert_eq!(execution.status, WorkflowStatus::Running);
    }

    #[test]
    fn test_template_creation() {
        let engine = WorkflowEngine::new();
        engine
            .register_template(sample_template())
            .expect("template registration should succeed");

        let mut params = HashMap::new();
        params.insert("input".to_string(), serde_json::json!("test-value"));

        let workflow = engine
            .create_from_template("test-template", params)
            .expect("workflow should be created from template");

        assert_eq!(workflow.name, "test-template");
        assert_eq!(
            workflow.variables.get("input"),
            Some(&serde_json::json!("test-value"))
        );
    }

    #[test]
    fn test_template_missing_required_parameter() {
        let engine = WorkflowEngine::new();
        engine
            .register_template(sample_template())
            .expect("template registration should succeed");

        assert!(engine
            .create_from_template("test-template", HashMap::new())
            .is_err());
        assert!(engine
            .create_from_template("no-such-template", HashMap::new())
            .is_err());
    }
}
665}