Skip to main content

dataflow_rs/engine/
workflow.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::task::Task;
3use chrono::{DateTime, Utc};
4use datalogic_rs::Logic;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::fs;
8use std::path::Path;
9use std::sync::Arc;
10
11/// Workflow lifecycle status
12#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "lowercase")]
14pub enum WorkflowStatus {
15    #[default]
16    Active,
17    Paused,
18    Archived,
19}
20
21/// Workflow represents a collection of tasks that execute sequentially (also known as a Rule in rules-engine terminology).
22///
23/// Conditions are evaluated against the full message context, including `data`, `metadata`, and `temp_data` fields.
24#[derive(Clone, Debug, Deserialize)]
25pub struct Workflow {
26    pub id: String,
27    /// Engine-internal: `Arc<str>` mirror of `id`, populated by
28    /// `LogicCompiler::compile_workflows`. Cloning is a refcount bump; per-message
29    /// `AuditTrail` entries reuse it instead of allocating from `&id` each time.
30    /// Not part of the stable API.
31    #[doc(hidden)]
32    #[serde(skip)]
33    pub id_arc: Arc<str>,
34    pub name: String,
35    #[serde(default)]
36    pub priority: u32,
37    pub description: Option<String>,
38    #[serde(default = "default_condition")]
39    pub condition: Value,
40    /// Engine-internal: pre-compiled JSONLogic for `condition`, populated by
41    /// `LogicCompiler`. `None` is treated as "no condition / always run" by
42    /// the executor. Not part of the stable API.
43    #[doc(hidden)]
44    #[serde(skip)]
45    pub compiled_condition: Option<Arc<Logic>>,
46    pub tasks: Vec<Task>,
47    #[serde(default)]
48    pub continue_on_error: bool,
49    /// Channel for routing (default: "default")
50    #[serde(default = "default_channel")]
51    pub channel: String,
52    /// Version number for rule versioning (default: 1)
53    #[serde(default = "default_version")]
54    pub version: u32,
55    /// Workflow status — Active, Paused, or Archived (default: Active)
56    #[serde(default)]
57    pub status: WorkflowStatus,
58    /// Tags for categorization and filtering
59    #[serde(default)]
60    pub tags: Vec<String>,
61    /// Creation timestamp
62    #[serde(default)]
63    pub created_at: Option<DateTime<Utc>>,
64    /// Last update timestamp
65    #[serde(default)]
66    pub updated_at: Option<DateTime<Utc>>,
67}
68
69fn default_condition() -> Value {
70    Value::Bool(true)
71}
72
73fn default_channel() -> String {
74    "default".to_string()
75}
76
77fn default_version() -> u32 {
78    1
79}
80
81impl Default for Workflow {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl Workflow {
88    pub fn new() -> Self {
89        Workflow {
90            id: String::new(),
91            id_arc: Arc::from(""),
92            name: String::new(),
93            priority: 0,
94            description: None,
95            condition: Value::Bool(true),
96            compiled_condition: None,
97            tasks: Vec::new(),
98            continue_on_error: false,
99            channel: default_channel(),
100            version: 1,
101            status: WorkflowStatus::Active,
102            tags: Vec::new(),
103            created_at: None,
104            updated_at: None,
105        }
106    }
107
108    /// Create a workflow (rule) with a condition and tasks.
109    ///
110    /// This is a convenience constructor for the IFTTT-style rules engine pattern:
111    /// **IF** `condition` **THEN** execute `tasks`.
112    ///
113    /// # Arguments
114    /// * `id` - Unique identifier for the rule
115    /// * `name` - Human-readable name
116    /// * `condition` - JSONLogic condition evaluated against the full context (data, metadata, temp_data)
117    /// * `tasks` - Actions to execute when the condition is met
118    pub fn rule(id: &str, name: &str, condition: Value, tasks: Vec<Task>) -> Self {
119        Workflow {
120            id: id.to_string(),
121            id_arc: Arc::from(id),
122            name: name.to_string(),
123            priority: 0,
124            description: None,
125            condition,
126            compiled_condition: None,
127            tasks,
128            continue_on_error: false,
129            channel: default_channel(),
130            version: 1,
131            status: WorkflowStatus::Active,
132            tags: Vec::new(),
133            created_at: None,
134            updated_at: None,
135        }
136    }
137
138    /// Load workflow from JSON string
139    pub fn from_json(json_str: &str) -> Result<Self> {
140        serde_json::from_str(json_str).map_err(DataflowError::from_serde)
141    }
142
143    /// Load workflow from JSON file
144    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
145        let json_str = fs::read_to_string(path).map_err(DataflowError::from_io)?;
146
147        Self::from_json(&json_str)
148    }
149
150    /// Validate the workflow structure
151    pub fn validate(&self) -> Result<()> {
152        // Check required fields
153        if self.id.is_empty() {
154            return Err(DataflowError::Workflow(
155                "Workflow id cannot be empty".to_string(),
156            ));
157        }
158
159        if self.name.is_empty() {
160            return Err(DataflowError::Workflow(
161                "Workflow name cannot be empty".to_string(),
162            ));
163        }
164
165        // Check tasks
166        if self.tasks.is_empty() {
167            return Err(DataflowError::Workflow(
168                "Workflow must have at least one task".to_string(),
169            ));
170        }
171
172        // Validate that task IDs are unique
173        let mut task_ids = std::collections::HashSet::new();
174        for task in &self.tasks {
175            if !task_ids.insert(&task.id) {
176                return Err(DataflowError::Workflow(format!(
177                    "Duplicate task ID '{}' in workflow",
178                    task.id
179                )));
180            }
181        }
182
183        Ok(())
184    }
185}