// Source: dataflow_rs/engine/workflow.rs

use crate::engine::error::{DataflowError, Result};
use crate::engine::task::Task;
use chrono::{DateTime, Utc};
use datalogic_rs::Logic;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashSet;
use std::fs;
use std::path::Path;
use std::sync::Arc;

11/// Workflow lifecycle status
12#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "lowercase")]
14pub enum WorkflowStatus {
15    #[default]
16    Active,
17    Paused,
18    Archived,
19}
20
/// Workflow represents a collection of tasks that execute sequentially (also known as a Rule in rules-engine terminology).
///
/// Conditions are evaluated against the full message context, including `data`, `metadata`, and `temp_data` fields.
///
/// A workflow is built programmatically (`Workflow::new` / `Workflow::rule`) or
/// loaded through `Deserialize`. The `#[doc(hidden)]` fields are skipped by serde
/// and are filled in later by `LogicCompiler`.
#[derive(Clone, Debug, Deserialize)]
pub struct Workflow {
    /// Identifier of the workflow; `validate` rejects an empty id.
    pub id: String,
    /// Engine-internal: `Arc<str>` mirror of `id`, populated by
    /// `LogicCompiler::compile_workflows`. Cloning is a refcount bump; per-message
    /// `AuditTrail` entries reuse it instead of allocating from `&id` each time.
    /// Not part of the stable API.
    #[doc(hidden)]
    #[serde(skip)]
    pub id_arc: Arc<str>,
    /// Human-readable name; `validate` rejects an empty name.
    pub name: String,
    /// Priority value, `0` when omitted from JSON.
    /// NOTE(review): how the engine orders workflows by this value is not
    /// visible here — confirm before relying on a direction.
    #[serde(default)]
    pub priority: u32,
    /// Optional free-text description.
    pub description: Option<String>,
    /// JSONLogic condition gating this workflow; the literal `true` when omitted
    /// from JSON.
    #[serde(default = "default_condition")]
    pub condition: Value,
    /// Engine-internal: pre-compiled JSONLogic for `condition`, populated by
    /// `LogicCompiler`. `None` is treated as "no condition / always run" by
    /// the executor. Not part of the stable API.
    #[doc(hidden)]
    #[serde(skip)]
    pub compiled_condition: Option<Arc<Logic>>,
    /// Engine-internal: `true` when every task is a synchronous built-in
    /// (`is_sync_builtin`), so the whole workflow can run inside a shared
    /// `with_arena` scope with no `.await`. Populated by `LogicCompiler`; the
    /// `false` default means an uncompiled workflow conservatively takes the
    /// async path. Not part of the stable API.
    #[doc(hidden)]
    #[serde(skip, default)]
    pub fully_sync: bool,
    /// Tasks executed in order; `validate` requires at least one task and
    /// unique task ids.
    pub tasks: Vec<Task>,
    /// `false` when omitted from JSON.
    /// NOTE(review): presumably lets execution continue past a failing task —
    /// confirm in the executor.
    #[serde(default)]
    pub continue_on_error: bool,
    /// Channel for routing (default: "default")
    #[serde(default = "default_channel")]
    pub channel: String,
    /// Version number for rule versioning (default: 1)
    #[serde(default = "default_version")]
    pub version: u32,
    /// Workflow status — Active, Paused, or Archived (default: Active)
    #[serde(default)]
    pub status: WorkflowStatus,
    /// Tags for categorization and filtering
    #[serde(default)]
    pub tags: Vec<String>,
    /// Creation timestamp
    #[serde(default)]
    pub created_at: Option<DateTime<Utc>>,
    /// Last update timestamp
    #[serde(default)]
    pub updated_at: Option<DateTime<Utc>>,
}

77fn default_condition() -> Value {
78    Value::Bool(true)
79}
80
/// Serde default for `Workflow::channel`.
fn default_channel() -> String {
    String::from("default")
}

/// Serde default for `Workflow::version`: numbering starts at 1.
fn default_version() -> u32 {
    const INITIAL_VERSION: u32 = 1;
    INITIAL_VERSION
}

89impl Default for Workflow {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl Workflow {
96    pub fn new() -> Self {
97        Workflow {
98            id: String::new(),
99            id_arc: Arc::from(""),
100            name: String::new(),
101            priority: 0,
102            description: None,
103            condition: Value::Bool(true),
104            compiled_condition: None,
105            fully_sync: false,
106            tasks: Vec::new(),
107            continue_on_error: false,
108            channel: default_channel(),
109            version: 1,
110            status: WorkflowStatus::Active,
111            tags: Vec::new(),
112            created_at: None,
113            updated_at: None,
114        }
115    }
116
117    /// Create a workflow (rule) with a condition and tasks.
118    ///
119    /// This is a convenience constructor for the IFTTT-style rules engine pattern:
120    /// **IF** `condition` **THEN** execute `tasks`.
121    ///
122    /// # Arguments
123    /// * `id` - Unique identifier for the rule
124    /// * `name` - Human-readable name
125    /// * `condition` - JSONLogic condition evaluated against the full context (data, metadata, temp_data)
126    /// * `tasks` - Actions to execute when the condition is met
127    pub fn rule(id: &str, name: &str, condition: Value, tasks: Vec<Task>) -> Self {
128        Workflow {
129            id: id.to_string(),
130            id_arc: Arc::from(id),
131            name: name.to_string(),
132            priority: 0,
133            description: None,
134            condition,
135            compiled_condition: None,
136            fully_sync: false,
137            tasks,
138            continue_on_error: false,
139            channel: default_channel(),
140            version: 1,
141            status: WorkflowStatus::Active,
142            tags: Vec::new(),
143            created_at: None,
144            updated_at: None,
145        }
146    }
147
148    /// Load workflow from JSON string
149    pub fn from_json(json_str: &str) -> Result<Self> {
150        serde_json::from_str(json_str).map_err(DataflowError::from_serde)
151    }
152
153    /// Load workflow from JSON file
154    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
155        let json_str = fs::read_to_string(path).map_err(DataflowError::from_io)?;
156
157        Self::from_json(&json_str)
158    }
159
160    /// Validate the workflow structure
161    pub fn validate(&self) -> Result<()> {
162        // Check required fields
163        if self.id.is_empty() {
164            return Err(DataflowError::Workflow(
165                "Workflow id cannot be empty".to_string(),
166            ));
167        }
168
169        if self.name.is_empty() {
170            return Err(DataflowError::Workflow(
171                "Workflow name cannot be empty".to_string(),
172            ));
173        }
174
175        // Check tasks
176        if self.tasks.is_empty() {
177            return Err(DataflowError::Workflow(
178                "Workflow must have at least one task".to_string(),
179            ));
180        }
181
182        // Validate that task IDs are unique
183        let mut task_ids = std::collections::HashSet::new();
184        for task in &self.tasks {
185            if !task_ids.insert(&task.id) {
186                return Err(DataflowError::Workflow(format!(
187                    "Duplicate task ID '{}' in workflow",
188                    task.id
189                )));
190            }
191        }
192
193        Ok(())
194    }
195}