dataflow_rs/engine/
workflow.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::task::Task;
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::fs;
7use std::path::Path;
8
9#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "lowercase")]
12pub enum WorkflowStatus {
13 #[default]
14 Active,
15 Paused,
16 Archived,
17}
18
19#[derive(Clone, Debug, Deserialize)]
23pub struct Workflow {
24 pub id: String,
25 pub name: String,
26 #[serde(default)]
27 pub priority: u32,
28 pub description: Option<String>,
29 #[serde(default = "default_condition")]
30 pub condition: Value,
31 #[serde(skip)]
32 pub condition_index: Option<usize>,
33 pub tasks: Vec<Task>,
34 #[serde(default)]
35 pub continue_on_error: bool,
36 #[serde(default = "default_channel")]
38 pub channel: String,
39 #[serde(default = "default_version")]
41 pub version: u32,
42 #[serde(default)]
44 pub status: WorkflowStatus,
45 #[serde(default)]
47 pub tags: Vec<String>,
48 #[serde(default)]
50 pub created_at: Option<DateTime<Utc>>,
51 #[serde(default)]
53 pub updated_at: Option<DateTime<Utc>>,
54}
55
56fn default_condition() -> Value {
57 Value::Bool(true)
58}
59
/// Channel name used when none is given: `"default"`.
///
/// Serves both as the serde fallback for `Workflow::channel` and as the value
/// the `Workflow` constructors assign.
fn default_channel() -> String {
    String::from("default")
}
63
/// Serde fallback for `Workflow::version`: `1`.
fn default_version() -> u32 {
    const INITIAL_VERSION: u32 = 1;
    INITIAL_VERSION
}
67
68impl Default for Workflow {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl Workflow {
75 pub fn new() -> Self {
76 Workflow {
77 id: String::new(),
78 name: String::new(),
79 priority: 0,
80 description: None,
81 condition: Value::Bool(true),
82 condition_index: None,
83 tasks: Vec::new(),
84 continue_on_error: false,
85 channel: default_channel(),
86 version: 1,
87 status: WorkflowStatus::Active,
88 tags: Vec::new(),
89 created_at: None,
90 updated_at: None,
91 }
92 }
93
94 pub fn rule(id: &str, name: &str, condition: Value, tasks: Vec<Task>) -> Self {
105 Workflow {
106 id: id.to_string(),
107 name: name.to_string(),
108 priority: 0,
109 description: None,
110 condition,
111 condition_index: None,
112 tasks,
113 continue_on_error: false,
114 channel: default_channel(),
115 version: 1,
116 status: WorkflowStatus::Active,
117 tags: Vec::new(),
118 created_at: None,
119 updated_at: None,
120 }
121 }
122
123 pub fn from_json(json_str: &str) -> Result<Self> {
125 serde_json::from_str(json_str).map_err(DataflowError::from_serde)
126 }
127
128 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
130 let json_str = fs::read_to_string(path).map_err(DataflowError::from_io)?;
131
132 Self::from_json(&json_str)
133 }
134
135 pub fn validate(&self) -> Result<()> {
137 if self.id.is_empty() {
139 return Err(DataflowError::Workflow(
140 "Workflow id cannot be empty".to_string(),
141 ));
142 }
143
144 if self.name.is_empty() {
145 return Err(DataflowError::Workflow(
146 "Workflow name cannot be empty".to_string(),
147 ));
148 }
149
150 if self.tasks.is_empty() {
152 return Err(DataflowError::Workflow(
153 "Workflow must have at least one task".to_string(),
154 ));
155 }
156
157 let mut task_ids = std::collections::HashSet::new();
159 for task in &self.tasks {
160 if !task_ids.insert(&task.id) {
161 return Err(DataflowError::Workflow(format!(
162 "Duplicate task ID '{}' in workflow",
163 task.id
164 )));
165 }
166 }
167
168 Ok(())
169 }
170}