dataflow_rs/engine/
workflow.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::task::Task;
3use chrono::{DateTime, Utc};
4use datalogic_rs::Logic;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::fs;
8use std::path::Path;
9use std::sync::Arc;
10
11#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "lowercase")]
14pub enum WorkflowStatus {
15 #[default]
16 Active,
17 Paused,
18 Archived,
19}
20
21#[derive(Clone, Debug, Deserialize)]
25pub struct Workflow {
26 pub id: String,
27 #[doc(hidden)]
32 #[serde(skip)]
33 pub id_arc: Arc<str>,
34 pub name: String,
35 #[serde(default)]
36 pub priority: u32,
37 pub description: Option<String>,
38 #[serde(default = "default_condition")]
39 pub condition: Value,
40 #[doc(hidden)]
44 #[serde(skip)]
45 pub compiled_condition: Option<Arc<Logic>>,
46 #[doc(hidden)]
52 #[serde(skip, default)]
53 pub fully_sync: bool,
54 pub tasks: Vec<Task>,
55 #[serde(default)]
56 pub continue_on_error: bool,
57 #[serde(default = "default_channel")]
59 pub channel: String,
60 #[serde(default = "default_version")]
62 pub version: u32,
63 #[serde(default)]
65 pub status: WorkflowStatus,
66 #[serde(default)]
68 pub tags: Vec<String>,
69 #[serde(default)]
71 pub created_at: Option<DateTime<Utc>>,
72 #[serde(default)]
74 pub updated_at: Option<DateTime<Utc>>,
75}
76
77fn default_condition() -> Value {
78 Value::Bool(true)
79}
80
/// Serde default for `Workflow::channel`: the channel named `"default"`.
fn default_channel() -> String {
    String::from("default")
}
84
/// Serde default for `Workflow::version`: workflows start at version 1.
fn default_version() -> u32 {
    const INITIAL_VERSION: u32 = 1;
    INITIAL_VERSION
}
88
89impl Default for Workflow {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl Workflow {
96 pub fn new() -> Self {
97 Workflow {
98 id: String::new(),
99 id_arc: Arc::from(""),
100 name: String::new(),
101 priority: 0,
102 description: None,
103 condition: Value::Bool(true),
104 compiled_condition: None,
105 fully_sync: false,
106 tasks: Vec::new(),
107 continue_on_error: false,
108 channel: default_channel(),
109 version: 1,
110 status: WorkflowStatus::Active,
111 tags: Vec::new(),
112 created_at: None,
113 updated_at: None,
114 }
115 }
116
117 pub fn rule(id: &str, name: &str, condition: Value, tasks: Vec<Task>) -> Self {
128 Workflow {
129 id: id.to_string(),
130 id_arc: Arc::from(id),
131 name: name.to_string(),
132 priority: 0,
133 description: None,
134 condition,
135 compiled_condition: None,
136 fully_sync: false,
137 tasks,
138 continue_on_error: false,
139 channel: default_channel(),
140 version: 1,
141 status: WorkflowStatus::Active,
142 tags: Vec::new(),
143 created_at: None,
144 updated_at: None,
145 }
146 }
147
148 pub fn from_json(json_str: &str) -> Result<Self> {
150 serde_json::from_str(json_str).map_err(DataflowError::from_serde)
151 }
152
153 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
155 let json_str = fs::read_to_string(path).map_err(DataflowError::from_io)?;
156
157 Self::from_json(&json_str)
158 }
159
160 pub fn validate(&self) -> Result<()> {
162 if self.id.is_empty() {
164 return Err(DataflowError::Workflow(
165 "Workflow id cannot be empty".to_string(),
166 ));
167 }
168
169 if self.name.is_empty() {
170 return Err(DataflowError::Workflow(
171 "Workflow name cannot be empty".to_string(),
172 ));
173 }
174
175 if self.tasks.is_empty() {
177 return Err(DataflowError::Workflow(
178 "Workflow must have at least one task".to_string(),
179 ));
180 }
181
182 let mut task_ids = std::collections::HashSet::new();
184 for task in &self.tasks {
185 if !task_ids.insert(&task.id) {
186 return Err(DataflowError::Workflow(format!(
187 "Duplicate task ID '{}' in workflow",
188 task.id
189 )));
190 }
191 }
192
193 Ok(())
194 }
195}