matrixcode_core/workflow/
context.rs1use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "snake_case")]
12#[derive(Default)]
13pub enum WorkflowStatus {
14 #[default]
16 Pending,
17 Running,
19 Paused,
21 Completed,
23 Failed,
25 Cancelled,
27}
28
29#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "snake_case")]
32#[derive(Default)]
33pub enum NodeStatus {
34 #[default]
36 Pending,
37 Running,
39 Completed,
41 Failed,
43 Skipped,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct NodeExecution {
50 pub node_id: String,
52 pub status: NodeStatus,
54 pub started_at: Option<DateTime<Utc>>,
56 pub finished_at: Option<DateTime<Utc>>,
58 pub retry_count: u32,
60 pub error: Option<String>,
62 pub output: Option<serde_json::Value>,
64}
65
66impl NodeExecution {
67 pub fn new(node_id: String) -> Self {
68 Self {
69 node_id,
70 status: NodeStatus::Pending,
71 started_at: None,
72 finished_at: None,
73 retry_count: 0,
74 error: None,
75 output: None,
76 }
77 }
78
79 pub fn start(&mut self) {
80 self.status = NodeStatus::Running;
81 self.started_at = Some(Utc::now());
82 }
83
84 pub fn complete(&mut self, output: Option<serde_json::Value>) {
85 self.status = NodeStatus::Completed;
86 self.finished_at = Some(Utc::now());
87 self.output = output;
88 }
89
90 pub fn fail(&mut self, error: String) {
91 self.status = NodeStatus::Failed;
92 self.finished_at = Some(Utc::now());
93 self.error = Some(error);
94 }
95
96 pub fn skip(&mut self) {
97 self.status = NodeStatus::Skipped;
98 self.finished_at = Some(Utc::now());
99 }
100
101 pub fn increment_retry(&mut self) {
102 self.retry_count += 1;
103 }
104
105 pub fn duration_ms(&self) -> Option<i64> {
107 match (self.started_at, self.finished_at) {
108 (Some(start), Some(end)) => Some((end - start).num_milliseconds()),
109 _ => None,
110 }
111 }
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct WorkflowContext {
117 pub instance_id: String,
119 pub workflow_id: String,
121 pub status: WorkflowStatus,
123 pub current_node_id: Option<String>,
125 pub inputs: HashMap<String, serde_json::Value>,
127 pub variables: HashMap<String, serde_json::Value>,
129 pub node_executions: HashMap<String, NodeExecution>,
131 pub execution_path: Vec<String>,
133 pub created_at: DateTime<Utc>,
135 pub updated_at: DateTime<Utc>,
137 pub started_at: Option<DateTime<Utc>>,
139 pub finished_at: Option<DateTime<Utc>>,
141 pub error: Option<String>,
143}
144
145impl WorkflowContext {
146 pub fn new(workflow_id: String, inputs: HashMap<String, serde_json::Value>) -> Self {
148 let now = Utc::now();
149 Self {
150 instance_id: format!("inst_{}", uuid::Uuid::new_v4()),
151 workflow_id,
152 status: WorkflowStatus::Pending,
153 current_node_id: None,
154 inputs,
155 variables: HashMap::new(),
156 node_executions: HashMap::new(),
157 execution_path: Vec::new(),
158 created_at: now,
159 updated_at: now,
160 started_at: None,
161 finished_at: None,
162 error: None,
163 }
164 }
165
166 pub fn set_variable(&mut self, key: String, value: serde_json::Value) {
168 self.variables.insert(key, value);
169 self.updated_at = Utc::now();
170 }
171
172 pub fn get_variable(&self, key: &str) -> Option<&serde_json::Value> {
174 self.variables.get(key)
175 }
176
177 pub fn set_input(&mut self, key: String, value: serde_json::Value) {
179 self.inputs.insert(key, value);
180 self.updated_at = Utc::now();
181 }
182
183 pub fn get_input(&self, key: &str) -> Option<&serde_json::Value> {
185 self.inputs.get(key)
186 }
187
188 pub fn start(&mut self) {
190 self.status = WorkflowStatus::Running;
191 self.started_at = Some(Utc::now());
192 self.updated_at = Utc::now();
193 }
194
195 pub fn complete(&mut self) {
197 self.status = WorkflowStatus::Completed;
198 self.finished_at = Some(Utc::now());
199 self.updated_at = Utc::now();
200 }
201
202 pub fn fail(&mut self, error: String) {
204 self.status = WorkflowStatus::Failed;
205 self.error = Some(error);
206 self.finished_at = Some(Utc::now());
207 self.updated_at = Utc::now();
208 }
209
210 pub fn cancel(&mut self) {
212 self.status = WorkflowStatus::Cancelled;
213 self.finished_at = Some(Utc::now());
214 self.updated_at = Utc::now();
215 }
216
217 pub fn pause(&mut self) {
219 self.status = WorkflowStatus::Paused;
220 self.updated_at = Utc::now();
221 }
222
223 pub fn resume(&mut self) {
225 self.status = WorkflowStatus::Running;
226 self.updated_at = Utc::now();
227 }
228
229 pub fn set_current_node(&mut self, node_id: String) {
231 self.current_node_id = Some(node_id.clone());
232 self.execution_path.push(node_id);
233 self.updated_at = Utc::now();
234 }
235
236 pub fn get_or_create_node_execution(&mut self, node_id: &str) -> &mut NodeExecution {
238 self.node_executions
239 .entry(node_id.to_string())
240 .or_insert_with(|| NodeExecution::new(node_id.to_string()))
241 }
242
243 pub fn get_node_execution(&self, node_id: &str) -> Option<&NodeExecution> {
245 self.node_executions.get(node_id)
246 }
247
248 pub fn total_duration_ms(&self) -> Option<i64> {
250 match (self.started_at, self.finished_at) {
251 (Some(start), Some(end)) => Some((end - start).num_milliseconds()),
252 _ => None,
253 }
254 }
255
256 pub fn can_continue(&self) -> bool {
258 matches!(
259 self.status,
260 WorkflowStatus::Pending | WorkflowStatus::Running
261 )
262 }
263}
264
265#[cfg(test)]
266mod tests {
267 use super::*;
268
269 #[test]
270 fn test_workflow_context_new() {
271 let ctx = WorkflowContext::new("test-workflow".to_string(), HashMap::new());
272 assert_eq!(ctx.status, WorkflowStatus::Pending);
273 assert!(ctx.current_node_id.is_none());
274 }
275
276 #[test]
277 fn test_workflow_context_lifecycle() {
278 let mut ctx = WorkflowContext::new("test-workflow".to_string(), HashMap::new());
279
280 ctx.start();
281 assert_eq!(ctx.status, WorkflowStatus::Running);
282
283 ctx.set_current_node("node1".to_string());
284 assert_eq!(ctx.current_node_id, Some("node1".to_string()));
285
286 ctx.complete();
287 assert_eq!(ctx.status, WorkflowStatus::Completed);
288 assert!(ctx.finished_at.is_some());
289 }
290
291 #[test]
292 fn test_node_execution() {
293 let mut exec = NodeExecution::new("node1".to_string());
294 assert_eq!(exec.status, NodeStatus::Pending);
295
296 exec.start();
297 assert_eq!(exec.status, NodeStatus::Running);
298
299 exec.complete(Some(serde_json::json!({"result": "ok"})));
300 assert_eq!(exec.status, NodeStatus::Completed);
301 assert!(exec.output.is_some());
302 }
303
304 #[test]
305 fn test_pause_resume() {
306 let mut ctx = WorkflowContext::new("test-workflow".to_string(), HashMap::new());
307
308 ctx.start();
309 assert_eq!(ctx.status, WorkflowStatus::Running);
310 assert!(ctx.can_continue());
311
312 ctx.pause();
313 assert_eq!(ctx.status, WorkflowStatus::Paused);
314 assert!(!ctx.can_continue());
315
316 ctx.resume();
317 assert_eq!(ctx.status, WorkflowStatus::Running);
318 assert!(ctx.can_continue());
319 }
320}