//! dataflow_rs/engine/workflow.rs
//!
//! Workflow (rule) definition: deserializable structure, convenience
//! constructors, and structural validation.
use crate::engine::error::{DataflowError, Result};
use crate::engine::task::Task;
use serde::Deserialize;
use serde_json::Value;
use std::fs;
use std::path::Path;
7
8/// Workflow represents a collection of tasks that execute sequentially (also known as a Rule in rules-engine terminology).
9///
10/// Conditions are evaluated against the full message context, including `data`, `metadata`, and `temp_data` fields.
11#[derive(Clone, Debug, Deserialize)]
12pub struct Workflow {
13    pub id: String,
14    pub name: String,
15    #[serde(default)]
16    pub priority: u32,
17    pub description: Option<String>,
18    #[serde(default = "default_condition")]
19    pub condition: Value,
20    #[serde(skip)]
21    pub condition_index: Option<usize>,
22    pub tasks: Vec<Task>,
23    #[serde(default)]
24    pub continue_on_error: bool,
25}
26
27fn default_condition() -> Value {
28    Value::Bool(true)
29}
30
31impl Default for Workflow {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl Workflow {
38    pub fn new() -> Self {
39        Workflow {
40            id: String::new(),
41            name: String::new(),
42            priority: 0,
43            description: None,
44            condition: Value::Bool(true),
45            condition_index: None,
46            tasks: Vec::new(),
47            continue_on_error: false,
48        }
49    }
50
51    /// Create a workflow (rule) with a condition and tasks.
52    ///
53    /// This is a convenience constructor for the IFTTT-style rules engine pattern:
54    /// **IF** `condition` **THEN** execute `tasks`.
55    ///
56    /// # Arguments
57    /// * `id` - Unique identifier for the rule
58    /// * `name` - Human-readable name
59    /// * `condition` - JSONLogic condition evaluated against the full context (data, metadata, temp_data)
60    /// * `tasks` - Actions to execute when the condition is met
61    pub fn rule(id: &str, name: &str, condition: Value, tasks: Vec<Task>) -> Self {
62        Workflow {
63            id: id.to_string(),
64            name: name.to_string(),
65            priority: 0,
66            description: None,
67            condition,
68            condition_index: None,
69            tasks,
70            continue_on_error: false,
71        }
72    }
73
74    /// Load workflow from JSON string
75    pub fn from_json(json_str: &str) -> Result<Self> {
76        serde_json::from_str(json_str).map_err(DataflowError::from_serde)
77    }
78
79    /// Load workflow from JSON file
80    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
81        let json_str = fs::read_to_string(path).map_err(DataflowError::from_io)?;
82
83        Self::from_json(&json_str)
84    }
85
86    /// Validate the workflow structure
87    pub fn validate(&self) -> Result<()> {
88        // Check required fields
89        if self.id.is_empty() {
90            return Err(DataflowError::Workflow(
91                "Workflow id cannot be empty".to_string(),
92            ));
93        }
94
95        if self.name.is_empty() {
96            return Err(DataflowError::Workflow(
97                "Workflow name cannot be empty".to_string(),
98            ));
99        }
100
101        // Check tasks
102        if self.tasks.is_empty() {
103            return Err(DataflowError::Workflow(
104                "Workflow must have at least one task".to_string(),
105            ));
106        }
107
108        // Validate that task IDs are unique
109        let mut task_ids = std::collections::HashSet::new();
110        for task in &self.tasks {
111            if !task_ids.insert(&task.id) {
112                return Err(DataflowError::Workflow(format!(
113                    "Duplicate task ID '{}' in workflow",
114                    task.id
115                )));
116            }
117        }
118
119        Ok(())
120    }
121}