dataflow_rs/engine/
workflow.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::task::Task;
3use chrono::{DateTime, Utc};
4use datalogic_rs::Logic;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::fs;
8use std::path::Path;
9use std::sync::Arc;
10
11#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "lowercase")]
14pub enum WorkflowStatus {
15 #[default]
16 Active,
17 Paused,
18 Archived,
19}
20
21#[derive(Clone, Debug, Deserialize)]
25pub struct Workflow {
26 pub id: String,
27 #[doc(hidden)]
32 #[serde(skip)]
33 pub id_arc: Arc<str>,
34 pub name: String,
35 #[serde(default)]
36 pub priority: u32,
37 pub description: Option<String>,
38 #[serde(default = "default_condition")]
39 pub condition: Value,
40 #[doc(hidden)]
44 #[serde(skip)]
45 pub compiled_condition: Option<Arc<Logic>>,
46 pub tasks: Vec<Task>,
47 #[serde(default)]
48 pub continue_on_error: bool,
49 #[serde(default = "default_channel")]
51 pub channel: String,
52 #[serde(default = "default_version")]
54 pub version: u32,
55 #[serde(default)]
57 pub status: WorkflowStatus,
58 #[serde(default)]
60 pub tags: Vec<String>,
61 #[serde(default)]
63 pub created_at: Option<DateTime<Utc>>,
64 #[serde(default)]
66 pub updated_at: Option<DateTime<Utc>>,
67}
68
69fn default_condition() -> Value {
70 Value::Bool(true)
71}
72
/// Serde fallback for `Workflow::channel`: the channel named `"default"`.
fn default_channel() -> String {
    String::from("default")
}
76
/// Serde fallback for `Workflow::version`: version `1`.
fn default_version() -> u32 {
    const INITIAL_VERSION: u32 = 1;
    INITIAL_VERSION
}
80
81impl Default for Workflow {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl Workflow {
88 pub fn new() -> Self {
89 Workflow {
90 id: String::new(),
91 id_arc: Arc::from(""),
92 name: String::new(),
93 priority: 0,
94 description: None,
95 condition: Value::Bool(true),
96 compiled_condition: None,
97 tasks: Vec::new(),
98 continue_on_error: false,
99 channel: default_channel(),
100 version: 1,
101 status: WorkflowStatus::Active,
102 tags: Vec::new(),
103 created_at: None,
104 updated_at: None,
105 }
106 }
107
108 pub fn rule(id: &str, name: &str, condition: Value, tasks: Vec<Task>) -> Self {
119 Workflow {
120 id: id.to_string(),
121 id_arc: Arc::from(id),
122 name: name.to_string(),
123 priority: 0,
124 description: None,
125 condition,
126 compiled_condition: None,
127 tasks,
128 continue_on_error: false,
129 channel: default_channel(),
130 version: 1,
131 status: WorkflowStatus::Active,
132 tags: Vec::new(),
133 created_at: None,
134 updated_at: None,
135 }
136 }
137
138 pub fn from_json(json_str: &str) -> Result<Self> {
140 serde_json::from_str(json_str).map_err(DataflowError::from_serde)
141 }
142
143 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
145 let json_str = fs::read_to_string(path).map_err(DataflowError::from_io)?;
146
147 Self::from_json(&json_str)
148 }
149
150 pub fn validate(&self) -> Result<()> {
152 if self.id.is_empty() {
154 return Err(DataflowError::Workflow(
155 "Workflow id cannot be empty".to_string(),
156 ));
157 }
158
159 if self.name.is_empty() {
160 return Err(DataflowError::Workflow(
161 "Workflow name cannot be empty".to_string(),
162 ));
163 }
164
165 if self.tasks.is_empty() {
167 return Err(DataflowError::Workflow(
168 "Workflow must have at least one task".to_string(),
169 ));
170 }
171
172 let mut task_ids = std::collections::HashSet::new();
174 for task in &self.tasks {
175 if !task_ids.insert(&task.id) {
176 return Err(DataflowError::Workflow(format!(
177 "Duplicate task ID '{}' in workflow",
178 task.id
179 )));
180 }
181 }
182
183 Ok(())
184 }
185}