Skip to main content

matrixcode_core/workflow/
context.rs

1//! Workflow Context and Runtime State
2//!
3//! 管理工作流的运行时状态和上下文数据。
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// 工作流状态
10#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "snake_case")]
12#[derive(Default)]
13pub enum WorkflowStatus {
14    /// 待执行
15    #[default]
16    Pending,
17    /// 运行中
18    Running,
19    /// 已暂停
20    Paused,
21    /// 已完成
22    Completed,
23    /// 已失败
24    Failed,
25    /// 已取消
26    Cancelled,
27}
28
29
30/// 节点状态
31#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
32#[serde(rename_all = "snake_case")]
33#[derive(Default)]
34pub enum NodeStatus {
35    /// 待执行
36    #[default]
37    Pending,
38    /// 运行中
39    Running,
40    /// 已完成
41    Completed,
42    /// 已失败
43    Failed,
44    /// 已跳过
45    Skipped,
46}
47
48
49/// 节点执行记录
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct NodeExecution {
52    /// 节点ID
53    pub node_id: String,
54    /// 执行状态
55    pub status: NodeStatus,
56    /// 开始时间
57    pub started_at: Option<DateTime<Utc>>,
58    /// 结束时间
59    pub finished_at: Option<DateTime<Utc>>,
60    /// 重试次数
61    pub retry_count: u32,
62    /// 错误信息
63    pub error: Option<String>,
64    /// 输出数据
65    pub output: Option<serde_json::Value>,
66}
67
68impl NodeExecution {
69    pub fn new(node_id: String) -> Self {
70        Self {
71            node_id,
72            status: NodeStatus::Pending,
73            started_at: None,
74            finished_at: None,
75            retry_count: 0,
76            error: None,
77            output: None,
78        }
79    }
80
81    pub fn start(&mut self) {
82        self.status = NodeStatus::Running;
83        self.started_at = Some(Utc::now());
84    }
85
86    pub fn complete(&mut self, output: Option<serde_json::Value>) {
87        self.status = NodeStatus::Completed;
88        self.finished_at = Some(Utc::now());
89        self.output = output;
90    }
91
92    pub fn fail(&mut self, error: String) {
93        self.status = NodeStatus::Failed;
94        self.finished_at = Some(Utc::now());
95        self.error = Some(error);
96    }
97
98    pub fn skip(&mut self) {
99        self.status = NodeStatus::Skipped;
100        self.finished_at = Some(Utc::now());
101    }
102
103    pub fn increment_retry(&mut self) {
104        self.retry_count += 1;
105    }
106
107    /// 获取执行时长(毫秒)
108    pub fn duration_ms(&self) -> Option<i64> {
109        match (self.started_at, self.finished_at) {
110            (Some(start), Some(end)) => {
111                Some((end - start).num_milliseconds())
112            }
113            _ => None,
114        }
115    }
116}
117
118/// 工作流上下文
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct WorkflowContext {
121    /// 工作流实例ID
122    pub instance_id: String,
123    /// 工作流定义ID
124    pub workflow_id: String,
125    /// 当前状态
126    pub status: WorkflowStatus,
127    /// 当前节点ID
128    pub current_node_id: Option<String>,
129    /// 输入参数
130    pub inputs: HashMap<String, serde_json::Value>,
131    /// 变量存储
132    pub variables: HashMap<String, serde_json::Value>,
133    /// 节点执行记录
134    pub node_executions: HashMap<String, NodeExecution>,
135    /// 执行历史(节点ID顺序)
136    pub execution_path: Vec<String>,
137    /// 创建时间
138    pub created_at: DateTime<Utc>,
139    /// 更新时间
140    pub updated_at: DateTime<Utc>,
141    /// 开始时间
142    pub started_at: Option<DateTime<Utc>>,
143    /// 结束时间
144    pub finished_at: Option<DateTime<Utc>>,
145    /// 错误信息
146    pub error: Option<String>,
147}
148
149impl WorkflowContext {
150    /// 创建新的工作流上下文
151    pub fn new(workflow_id: String, inputs: HashMap<String, serde_json::Value>) -> Self {
152        let now = Utc::now();
153        Self {
154            instance_id: format!("inst_{}", uuid::Uuid::new_v4()),
155            workflow_id,
156            status: WorkflowStatus::Pending,
157            current_node_id: None,
158            inputs,
159            variables: HashMap::new(),
160            node_executions: HashMap::new(),
161            execution_path: Vec::new(),
162            created_at: now,
163            updated_at: now,
164            started_at: None,
165            finished_at: None,
166            error: None,
167        }
168    }
169
170    /// 设置变量
171    pub fn set_variable(&mut self, key: String, value: serde_json::Value) {
172        self.variables.insert(key, value);
173        self.updated_at = Utc::now();
174    }
175
176    /// 获取变量
177    pub fn get_variable(&self, key: &str) -> Option<&serde_json::Value> {
178        self.variables.get(key)
179    }
180
181    /// 设置输入参数
182    pub fn set_input(&mut self, key: String, value: serde_json::Value) {
183        self.inputs.insert(key, value);
184        self.updated_at = Utc::now();
185    }
186
187    /// 获取输入参数
188    pub fn get_input(&self, key: &str) -> Option<&serde_json::Value> {
189        self.inputs.get(key)
190    }
191
192    /// 开始工作流
193    pub fn start(&mut self) {
194        self.status = WorkflowStatus::Running;
195        self.started_at = Some(Utc::now());
196        self.updated_at = Utc::now();
197    }
198
199    /// 完成工作流
200    pub fn complete(&mut self) {
201        self.status = WorkflowStatus::Completed;
202        self.finished_at = Some(Utc::now());
203        self.updated_at = Utc::now();
204    }
205
206    /// 失败工作流
207    pub fn fail(&mut self, error: String) {
208        self.status = WorkflowStatus::Failed;
209        self.error = Some(error);
210        self.finished_at = Some(Utc::now());
211        self.updated_at = Utc::now();
212    }
213
214    /// 取消工作流
215    pub fn cancel(&mut self) {
216        self.status = WorkflowStatus::Cancelled;
217        self.finished_at = Some(Utc::now());
218        self.updated_at = Utc::now();
219    }
220
221    /// 暂停工作流
222    pub fn pause(&mut self) {
223        self.status = WorkflowStatus::Paused;
224        self.updated_at = Utc::now();
225    }
226
227    /// 恢复工作流
228    pub fn resume(&mut self) {
229        self.status = WorkflowStatus::Running;
230        self.updated_at = Utc::now();
231    }
232
233    /// 设置当前节点
234    pub fn set_current_node(&mut self, node_id: String) {
235        self.current_node_id = Some(node_id.clone());
236        self.execution_path.push(node_id);
237        self.updated_at = Utc::now();
238    }
239
240    /// 获取或创建节点执行记录
241    pub fn get_or_create_node_execution(&mut self, node_id: &str) -> &mut NodeExecution {
242        self.node_executions
243            .entry(node_id.to_string())
244            .or_insert_with(|| NodeExecution::new(node_id.to_string()))
245    }
246
247    /// 获取节点执行记录
248    pub fn get_node_execution(&self, node_id: &str) -> Option<&NodeExecution> {
249        self.node_executions.get(node_id)
250    }
251
252    /// 获取总执行时长(毫秒)
253    pub fn total_duration_ms(&self) -> Option<i64> {
254        match (self.started_at, self.finished_at) {
255            (Some(start), Some(end)) => Some((end - start).num_milliseconds()),
256            _ => None,
257        }
258    }
259
260    /// 检查是否可以继续执行
261    pub fn can_continue(&self) -> bool {
262        matches!(self.status, WorkflowStatus::Pending | WorkflowStatus::Running)
263    }
264}
265
266#[cfg(test)]
267mod tests {
268    use super::*;
269
270    #[test]
271    fn test_workflow_context_new() {
272        let ctx = WorkflowContext::new(
273            "test-workflow".to_string(),
274            HashMap::new(),
275        );
276        assert_eq!(ctx.status, WorkflowStatus::Pending);
277        assert!(ctx.current_node_id.is_none());
278    }
279
280    #[test]
281    fn test_workflow_context_lifecycle() {
282        let mut ctx = WorkflowContext::new(
283            "test-workflow".to_string(),
284            HashMap::new(),
285        );
286
287        ctx.start();
288        assert_eq!(ctx.status, WorkflowStatus::Running);
289
290        ctx.set_current_node("node1".to_string());
291        assert_eq!(ctx.current_node_id, Some("node1".to_string()));
292
293        ctx.complete();
294        assert_eq!(ctx.status, WorkflowStatus::Completed);
295        assert!(ctx.finished_at.is_some());
296    }
297
298    #[test]
299    fn test_node_execution() {
300        let mut exec = NodeExecution::new("node1".to_string());
301        assert_eq!(exec.status, NodeStatus::Pending);
302
303        exec.start();
304        assert_eq!(exec.status, NodeStatus::Running);
305
306        exec.complete(Some(serde_json::json!({"result": "ok"})));
307        assert_eq!(exec.status, NodeStatus::Completed);
308        assert!(exec.output.is_some());
309    }
310
311    #[test]
312    fn test_pause_resume() {
313        let mut ctx = WorkflowContext::new(
314            "test-workflow".to_string(),
315            HashMap::new(),
316        );
317
318        ctx.start();
319        assert_eq!(ctx.status, WorkflowStatus::Running);
320        assert!(ctx.can_continue());
321
322        ctx.pause();
323        assert_eq!(ctx.status, WorkflowStatus::Paused);
324        assert!(!ctx.can_continue());
325
326        ctx.resume();
327        assert_eq!(ctx.status, WorkflowStatus::Running);
328        assert!(ctx.can_continue());
329    }
330}