Skip to main content

matrixcode_core/workflow/
context.rs

//! Workflow Context and Runtime State
//!
//! Manages the runtime state and context data of a workflow.
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// 工作流状态
10#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "snake_case")]
12#[derive(Default)]
13pub enum WorkflowStatus {
14    /// 待执行
15    #[default]
16    Pending,
17    /// 运行中
18    Running,
19    /// 已暂停
20    Paused,
21    /// 已完成
22    Completed,
23    /// 已失败
24    Failed,
25    /// 已取消
26    Cancelled,
27}
28
29/// 节点状态
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "snake_case")]
32#[derive(Default)]
33pub enum NodeStatus {
34    /// 待执行
35    #[default]
36    Pending,
37    /// 运行中
38    Running,
39    /// 已完成
40    Completed,
41    /// 已失败
42    Failed,
43    /// 已跳过
44    Skipped,
45}
46
47/// 节点执行记录
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct NodeExecution {
50    /// 节点ID
51    pub node_id: String,
52    /// 执行状态
53    pub status: NodeStatus,
54    /// 开始时间
55    pub started_at: Option<DateTime<Utc>>,
56    /// 结束时间
57    pub finished_at: Option<DateTime<Utc>>,
58    /// 重试次数
59    pub retry_count: u32,
60    /// 错误信息
61    pub error: Option<String>,
62    /// 输出数据
63    pub output: Option<serde_json::Value>,
64}
65
66impl NodeExecution {
67    pub fn new(node_id: String) -> Self {
68        Self {
69            node_id,
70            status: NodeStatus::Pending,
71            started_at: None,
72            finished_at: None,
73            retry_count: 0,
74            error: None,
75            output: None,
76        }
77    }
78
79    pub fn start(&mut self) {
80        self.status = NodeStatus::Running;
81        self.started_at = Some(Utc::now());
82    }
83
84    pub fn complete(&mut self, output: Option<serde_json::Value>) {
85        self.status = NodeStatus::Completed;
86        self.finished_at = Some(Utc::now());
87        self.output = output;
88    }
89
90    pub fn fail(&mut self, error: String) {
91        self.status = NodeStatus::Failed;
92        self.finished_at = Some(Utc::now());
93        self.error = Some(error);
94    }
95
96    pub fn skip(&mut self) {
97        self.status = NodeStatus::Skipped;
98        self.finished_at = Some(Utc::now());
99    }
100
101    pub fn increment_retry(&mut self) {
102        self.retry_count += 1;
103    }
104
105    /// 获取执行时长(毫秒)
106    pub fn duration_ms(&self) -> Option<i64> {
107        match (self.started_at, self.finished_at) {
108            (Some(start), Some(end)) => Some((end - start).num_milliseconds()),
109            _ => None,
110        }
111    }
112}
113
114/// 工作流上下文
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct WorkflowContext {
117    /// 工作流实例ID
118    pub instance_id: String,
119    /// 工作流定义ID
120    pub workflow_id: String,
121    /// 当前状态
122    pub status: WorkflowStatus,
123    /// 当前节点ID
124    pub current_node_id: Option<String>,
125    /// 输入参数
126    pub inputs: HashMap<String, serde_json::Value>,
127    /// 变量存储
128    pub variables: HashMap<String, serde_json::Value>,
129    /// 节点执行记录
130    pub node_executions: HashMap<String, NodeExecution>,
131    /// 执行历史(节点ID顺序)
132    pub execution_path: Vec<String>,
133    /// 创建时间
134    pub created_at: DateTime<Utc>,
135    /// 更新时间
136    pub updated_at: DateTime<Utc>,
137    /// 开始时间
138    pub started_at: Option<DateTime<Utc>>,
139    /// 结束时间
140    pub finished_at: Option<DateTime<Utc>>,
141    /// 错误信息
142    pub error: Option<String>,
143}
144
145impl WorkflowContext {
146    /// 创建新的工作流上下文
147    pub fn new(workflow_id: String, inputs: HashMap<String, serde_json::Value>) -> Self {
148        let now = Utc::now();
149        Self {
150            instance_id: format!("inst_{}", uuid::Uuid::new_v4()),
151            workflow_id,
152            status: WorkflowStatus::Pending,
153            current_node_id: None,
154            inputs,
155            variables: HashMap::new(),
156            node_executions: HashMap::new(),
157            execution_path: Vec::new(),
158            created_at: now,
159            updated_at: now,
160            started_at: None,
161            finished_at: None,
162            error: None,
163        }
164    }
165
166    /// 设置变量
167    pub fn set_variable(&mut self, key: String, value: serde_json::Value) {
168        self.variables.insert(key, value);
169        self.updated_at = Utc::now();
170    }
171
172    /// 获取变量
173    pub fn get_variable(&self, key: &str) -> Option<&serde_json::Value> {
174        self.variables.get(key)
175    }
176
177    /// 设置输入参数
178    pub fn set_input(&mut self, key: String, value: serde_json::Value) {
179        self.inputs.insert(key, value);
180        self.updated_at = Utc::now();
181    }
182
183    /// 获取输入参数
184    pub fn get_input(&self, key: &str) -> Option<&serde_json::Value> {
185        self.inputs.get(key)
186    }
187
188    /// 开始工作流
189    pub fn start(&mut self) {
190        self.status = WorkflowStatus::Running;
191        self.started_at = Some(Utc::now());
192        self.updated_at = Utc::now();
193    }
194
195    /// 完成工作流
196    pub fn complete(&mut self) {
197        self.status = WorkflowStatus::Completed;
198        self.finished_at = Some(Utc::now());
199        self.updated_at = Utc::now();
200    }
201
202    /// 失败工作流
203    pub fn fail(&mut self, error: String) {
204        self.status = WorkflowStatus::Failed;
205        self.error = Some(error);
206        self.finished_at = Some(Utc::now());
207        self.updated_at = Utc::now();
208    }
209
210    /// 取消工作流
211    pub fn cancel(&mut self) {
212        self.status = WorkflowStatus::Cancelled;
213        self.finished_at = Some(Utc::now());
214        self.updated_at = Utc::now();
215    }
216
217    /// 暂停工作流
218    pub fn pause(&mut self) {
219        self.status = WorkflowStatus::Paused;
220        self.updated_at = Utc::now();
221    }
222
223    /// 恢复工作流
224    pub fn resume(&mut self) {
225        self.status = WorkflowStatus::Running;
226        self.updated_at = Utc::now();
227    }
228
229    /// 设置当前节点
230    pub fn set_current_node(&mut self, node_id: String) {
231        self.current_node_id = Some(node_id.clone());
232        self.execution_path.push(node_id);
233        self.updated_at = Utc::now();
234    }
235
236    /// 获取或创建节点执行记录
237    pub fn get_or_create_node_execution(&mut self, node_id: &str) -> &mut NodeExecution {
238        self.node_executions
239            .entry(node_id.to_string())
240            .or_insert_with(|| NodeExecution::new(node_id.to_string()))
241    }
242
243    /// 获取节点执行记录
244    pub fn get_node_execution(&self, node_id: &str) -> Option<&NodeExecution> {
245        self.node_executions.get(node_id)
246    }
247
248    /// 获取总执行时长(毫秒)
249    pub fn total_duration_ms(&self) -> Option<i64> {
250        match (self.started_at, self.finished_at) {
251            (Some(start), Some(end)) => Some((end - start).num_milliseconds()),
252            _ => None,
253        }
254    }
255
256    /// 检查是否可以继续执行
257    pub fn can_continue(&self) -> bool {
258        matches!(
259            self.status,
260            WorkflowStatus::Pending | WorkflowStatus::Running
261        )
262    }
263}
264
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a context for the workflow id shared by these tests, with no inputs.
    fn fresh_context() -> WorkflowContext {
        WorkflowContext::new(String::from("test-workflow"), HashMap::new())
    }

    /// A new context is pending and has no current node.
    #[test]
    fn test_workflow_context_new() {
        let context = fresh_context();

        assert_eq!(WorkflowStatus::Pending, context.status);
        assert_eq!(None, context.current_node_id);
    }

    /// start → set_current_node → complete updates status, node and end time.
    #[test]
    fn test_workflow_context_lifecycle() {
        let mut context = fresh_context();

        context.start();
        assert_eq!(WorkflowStatus::Running, context.status);

        context.set_current_node(String::from("node1"));
        assert_eq!(Some("node1"), context.current_node_id.as_deref());

        context.complete();
        assert_eq!(WorkflowStatus::Completed, context.status);
        assert_ne!(None, context.finished_at);
    }

    /// A node record moves pending → running → completed and keeps its output.
    #[test]
    fn test_node_execution() {
        let mut record = NodeExecution::new(String::from("node1"));
        assert_eq!(NodeStatus::Pending, record.status);

        record.start();
        assert_eq!(NodeStatus::Running, record.status);

        let payload = serde_json::json!({"result": "ok"});
        record.complete(Some(payload));
        assert_eq!(NodeStatus::Completed, record.status);
        assert_ne!(None, record.output);
    }

    /// Pausing blocks continuation; resuming allows it again.
    #[test]
    fn test_pause_resume() {
        let mut context = fresh_context();

        context.start();
        assert_eq!(WorkflowStatus::Running, context.status);
        assert_eq!(true, context.can_continue());

        context.pause();
        assert_eq!(WorkflowStatus::Paused, context.status);
        assert_eq!(false, context.can_continue());

        context.resume();
        assert_eq!(WorkflowStatus::Running, context.status);
        assert_eq!(true, context.can_continue());
    }
}