matrixcode_core/workflow/
context.rs1use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "snake_case")]
12#[derive(Default)]
13pub enum WorkflowStatus {
14 #[default]
16 Pending,
17 Running,
19 Paused,
21 Completed,
23 Failed,
25 Cancelled,
27}
28
29
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
32#[serde(rename_all = "snake_case")]
33#[derive(Default)]
34pub enum NodeStatus {
35 #[default]
37 Pending,
38 Running,
40 Completed,
42 Failed,
44 Skipped,
46}
47
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct NodeExecution {
52 pub node_id: String,
54 pub status: NodeStatus,
56 pub started_at: Option<DateTime<Utc>>,
58 pub finished_at: Option<DateTime<Utc>>,
60 pub retry_count: u32,
62 pub error: Option<String>,
64 pub output: Option<serde_json::Value>,
66}
67
68impl NodeExecution {
69 pub fn new(node_id: String) -> Self {
70 Self {
71 node_id,
72 status: NodeStatus::Pending,
73 started_at: None,
74 finished_at: None,
75 retry_count: 0,
76 error: None,
77 output: None,
78 }
79 }
80
81 pub fn start(&mut self) {
82 self.status = NodeStatus::Running;
83 self.started_at = Some(Utc::now());
84 }
85
86 pub fn complete(&mut self, output: Option<serde_json::Value>) {
87 self.status = NodeStatus::Completed;
88 self.finished_at = Some(Utc::now());
89 self.output = output;
90 }
91
92 pub fn fail(&mut self, error: String) {
93 self.status = NodeStatus::Failed;
94 self.finished_at = Some(Utc::now());
95 self.error = Some(error);
96 }
97
98 pub fn skip(&mut self) {
99 self.status = NodeStatus::Skipped;
100 self.finished_at = Some(Utc::now());
101 }
102
103 pub fn increment_retry(&mut self) {
104 self.retry_count += 1;
105 }
106
107 pub fn duration_ms(&self) -> Option<i64> {
109 match (self.started_at, self.finished_at) {
110 (Some(start), Some(end)) => {
111 Some((end - start).num_milliseconds())
112 }
113 _ => None,
114 }
115 }
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct WorkflowContext {
121 pub instance_id: String,
123 pub workflow_id: String,
125 pub status: WorkflowStatus,
127 pub current_node_id: Option<String>,
129 pub inputs: HashMap<String, serde_json::Value>,
131 pub variables: HashMap<String, serde_json::Value>,
133 pub node_executions: HashMap<String, NodeExecution>,
135 pub execution_path: Vec<String>,
137 pub created_at: DateTime<Utc>,
139 pub updated_at: DateTime<Utc>,
141 pub started_at: Option<DateTime<Utc>>,
143 pub finished_at: Option<DateTime<Utc>>,
145 pub error: Option<String>,
147}
148
149impl WorkflowContext {
150 pub fn new(workflow_id: String, inputs: HashMap<String, serde_json::Value>) -> Self {
152 let now = Utc::now();
153 Self {
154 instance_id: format!("inst_{}", uuid::Uuid::new_v4()),
155 workflow_id,
156 status: WorkflowStatus::Pending,
157 current_node_id: None,
158 inputs,
159 variables: HashMap::new(),
160 node_executions: HashMap::new(),
161 execution_path: Vec::new(),
162 created_at: now,
163 updated_at: now,
164 started_at: None,
165 finished_at: None,
166 error: None,
167 }
168 }
169
170 pub fn set_variable(&mut self, key: String, value: serde_json::Value) {
172 self.variables.insert(key, value);
173 self.updated_at = Utc::now();
174 }
175
176 pub fn get_variable(&self, key: &str) -> Option<&serde_json::Value> {
178 self.variables.get(key)
179 }
180
181 pub fn set_input(&mut self, key: String, value: serde_json::Value) {
183 self.inputs.insert(key, value);
184 self.updated_at = Utc::now();
185 }
186
187 pub fn get_input(&self, key: &str) -> Option<&serde_json::Value> {
189 self.inputs.get(key)
190 }
191
192 pub fn start(&mut self) {
194 self.status = WorkflowStatus::Running;
195 self.started_at = Some(Utc::now());
196 self.updated_at = Utc::now();
197 }
198
199 pub fn complete(&mut self) {
201 self.status = WorkflowStatus::Completed;
202 self.finished_at = Some(Utc::now());
203 self.updated_at = Utc::now();
204 }
205
206 pub fn fail(&mut self, error: String) {
208 self.status = WorkflowStatus::Failed;
209 self.error = Some(error);
210 self.finished_at = Some(Utc::now());
211 self.updated_at = Utc::now();
212 }
213
214 pub fn cancel(&mut self) {
216 self.status = WorkflowStatus::Cancelled;
217 self.finished_at = Some(Utc::now());
218 self.updated_at = Utc::now();
219 }
220
221 pub fn pause(&mut self) {
223 self.status = WorkflowStatus::Paused;
224 self.updated_at = Utc::now();
225 }
226
227 pub fn resume(&mut self) {
229 self.status = WorkflowStatus::Running;
230 self.updated_at = Utc::now();
231 }
232
233 pub fn set_current_node(&mut self, node_id: String) {
235 self.current_node_id = Some(node_id.clone());
236 self.execution_path.push(node_id);
237 self.updated_at = Utc::now();
238 }
239
240 pub fn get_or_create_node_execution(&mut self, node_id: &str) -> &mut NodeExecution {
242 self.node_executions
243 .entry(node_id.to_string())
244 .or_insert_with(|| NodeExecution::new(node_id.to_string()))
245 }
246
247 pub fn get_node_execution(&self, node_id: &str) -> Option<&NodeExecution> {
249 self.node_executions.get(node_id)
250 }
251
252 pub fn total_duration_ms(&self) -> Option<i64> {
254 match (self.started_at, self.finished_at) {
255 (Some(start), Some(end)) => Some((end - start).num_milliseconds()),
256 _ => None,
257 }
258 }
259
260 pub fn can_continue(&self) -> bool {
262 matches!(self.status, WorkflowStatus::Pending | WorkflowStatus::Running)
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the input-less context every workflow test starts from.
    fn fresh_context() -> WorkflowContext {
        WorkflowContext::new("test-workflow".to_string(), HashMap::new())
    }

    /// A newly built context is pending and has no current node.
    #[test]
    fn test_workflow_context_new() {
        let context = fresh_context();

        assert_eq!(context.status, WorkflowStatus::Pending);
        assert!(context.current_node_id.is_none());
    }

    /// start -> set_current_node -> complete walks through the expected states.
    #[test]
    fn test_workflow_context_lifecycle() {
        let mut context = fresh_context();

        context.start();
        assert_eq!(context.status, WorkflowStatus::Running);

        context.set_current_node("node1".to_string());
        assert_eq!(context.current_node_id, Some("node1".to_string()));

        context.complete();
        assert_eq!(context.status, WorkflowStatus::Completed);
        assert!(context.finished_at.is_some());
    }

    /// A node record moves from pending to running to completed and keeps its output.
    #[test]
    fn test_node_execution() {
        let mut record = NodeExecution::new("node1".to_string());
        assert_eq!(record.status, NodeStatus::Pending);

        record.start();
        assert_eq!(record.status, NodeStatus::Running);

        let payload = serde_json::json!({"result": "ok"});
        record.complete(Some(payload));
        assert_eq!(record.status, NodeStatus::Completed);
        assert!(record.output.is_some());
    }

    /// Pausing blocks continuation; resuming restores it.
    #[test]
    fn test_pause_resume() {
        let mut context = fresh_context();

        context.start();
        assert_eq!(context.status, WorkflowStatus::Running);
        assert!(context.can_continue());

        context.pause();
        assert_eq!(context.status, WorkflowStatus::Paused);
        assert!(!context.can_continue());

        context.resume();
        assert_eq!(context.status, WorkflowStatus::Running);
        assert!(context.can_continue());
    }
}