Skip to main content

dataflow_rs/engine/
workflow.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::task::Task;
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::fs;
7use std::path::Path;
8
9/// Workflow lifecycle status
10#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "lowercase")]
12pub enum WorkflowStatus {
13    #[default]
14    Active,
15    Paused,
16    Archived,
17}
18
19/// Workflow represents a collection of tasks that execute sequentially (also known as a Rule in rules-engine terminology).
20///
21/// Conditions are evaluated against the full message context, including `data`, `metadata`, and `temp_data` fields.
22#[derive(Clone, Debug, Deserialize)]
23pub struct Workflow {
24    pub id: String,
25    pub name: String,
26    #[serde(default)]
27    pub priority: u32,
28    pub description: Option<String>,
29    #[serde(default = "default_condition")]
30    pub condition: Value,
31    #[serde(skip)]
32    pub condition_index: Option<usize>,
33    pub tasks: Vec<Task>,
34    #[serde(default)]
35    pub continue_on_error: bool,
36    /// Channel for routing (default: "default")
37    #[serde(default = "default_channel")]
38    pub channel: String,
39    /// Version number for rule versioning (default: 1)
40    #[serde(default = "default_version")]
41    pub version: u32,
42    /// Workflow status — Active, Paused, or Archived (default: Active)
43    #[serde(default)]
44    pub status: WorkflowStatus,
45    /// Tags for categorization and filtering
46    #[serde(default)]
47    pub tags: Vec<String>,
48    /// Creation timestamp
49    #[serde(default)]
50    pub created_at: Option<DateTime<Utc>>,
51    /// Last update timestamp
52    #[serde(default)]
53    pub updated_at: Option<DateTime<Utc>>,
54}
55
56fn default_condition() -> Value {
57    Value::Bool(true)
58}
59
/// Serde default for `Workflow::channel`: the channel named `"default"`.
fn default_channel() -> String {
    String::from("default")
}
63
/// Serde default for `Workflow::version`: the version number 1.
fn default_version() -> u32 {
    const INITIAL_VERSION: u32 = 1;
    INITIAL_VERSION
}
67
68impl Default for Workflow {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl Workflow {
75    pub fn new() -> Self {
76        Workflow {
77            id: String::new(),
78            name: String::new(),
79            priority: 0,
80            description: None,
81            condition: Value::Bool(true),
82            condition_index: None,
83            tasks: Vec::new(),
84            continue_on_error: false,
85            channel: default_channel(),
86            version: 1,
87            status: WorkflowStatus::Active,
88            tags: Vec::new(),
89            created_at: None,
90            updated_at: None,
91        }
92    }
93
94    /// Create a workflow (rule) with a condition and tasks.
95    ///
96    /// This is a convenience constructor for the IFTTT-style rules engine pattern:
97    /// **IF** `condition` **THEN** execute `tasks`.
98    ///
99    /// # Arguments
100    /// * `id` - Unique identifier for the rule
101    /// * `name` - Human-readable name
102    /// * `condition` - JSONLogic condition evaluated against the full context (data, metadata, temp_data)
103    /// * `tasks` - Actions to execute when the condition is met
104    pub fn rule(id: &str, name: &str, condition: Value, tasks: Vec<Task>) -> Self {
105        Workflow {
106            id: id.to_string(),
107            name: name.to_string(),
108            priority: 0,
109            description: None,
110            condition,
111            condition_index: None,
112            tasks,
113            continue_on_error: false,
114            channel: default_channel(),
115            version: 1,
116            status: WorkflowStatus::Active,
117            tags: Vec::new(),
118            created_at: None,
119            updated_at: None,
120        }
121    }
122
123    /// Load workflow from JSON string
124    pub fn from_json(json_str: &str) -> Result<Self> {
125        serde_json::from_str(json_str).map_err(DataflowError::from_serde)
126    }
127
128    /// Load workflow from JSON file
129    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
130        let json_str = fs::read_to_string(path).map_err(DataflowError::from_io)?;
131
132        Self::from_json(&json_str)
133    }
134
135    /// Validate the workflow structure
136    pub fn validate(&self) -> Result<()> {
137        // Check required fields
138        if self.id.is_empty() {
139            return Err(DataflowError::Workflow(
140                "Workflow id cannot be empty".to_string(),
141            ));
142        }
143
144        if self.name.is_empty() {
145            return Err(DataflowError::Workflow(
146                "Workflow name cannot be empty".to_string(),
147            ));
148        }
149
150        // Check tasks
151        if self.tasks.is_empty() {
152            return Err(DataflowError::Workflow(
153                "Workflow must have at least one task".to_string(),
154            ));
155        }
156
157        // Validate that task IDs are unique
158        let mut task_ids = std::collections::HashSet::new();
159        for task in &self.tasks {
160            if !task_ids.insert(&task.id) {
161                return Err(DataflowError::Workflow(format!(
162                    "Duplicate task ID '{}' in workflow",
163                    task.id
164                )));
165            }
166        }
167
168        Ok(())
169    }
170}