flowbuilder_core/
execution_plan.rs

1//! # FlowBuilder Core - 执行计划和节点定义
2//!
3//! 定义流程执行的核心数据结构和接口
4
5use anyhow::Result;
6use std::collections::HashMap;
7
8/// 执行计划 - 编排器生成的执行顺序
9#[derive(Debug, Clone)]
10pub struct ExecutionPlan {
11    /// 执行阶段列表(按顺序执行)
12    pub phases: Vec<ExecutionPhase>,
13    /// 全局环境变量
14    pub env_vars: HashMap<String, serde_yaml::Value>,
15    /// 全局流程变量
16    pub flow_vars: HashMap<String, serde_yaml::Value>,
17    /// 计划元数据
18    pub metadata: PlanMetadata,
19}
20
21/// 执行阶段 - 可以串行或并行执行的任务组
22#[derive(Debug, Clone)]
23pub struct ExecutionPhase {
24    /// 阶段ID
25    pub id: String,
26    /// 阶段名称
27    pub name: String,
28    /// 执行模式
29    pub execution_mode: PhaseExecutionMode,
30    /// 该阶段包含的执行节点
31    pub nodes: Vec<ExecutionNode>,
32    /// 阶段条件
33    pub condition: Option<String>,
34}
35
36/// 阶段执行模式
37#[derive(Debug, Clone)]
38pub enum PhaseExecutionMode {
39    /// 串行执行
40    Sequential,
41    /// 并行执行
42    Parallel,
43    /// 条件执行
44    Conditional { condition: String },
45}
46
47/// 执行节点 - 最小的执行单元
48#[derive(Debug, Clone)]
49pub struct ExecutionNode {
50    /// 节点ID
51    pub id: String,
52    /// 节点名称
53    pub name: String,
54    /// 节点类型
55    pub node_type: NodeType,
56    /// 关联的动作定义
57    pub action_spec: ActionSpec,
58    /// 依赖的节点ID列表
59    pub dependencies: Vec<String>,
60    /// 节点执行条件
61    pub condition: Option<String>,
62    /// 节点优先级
63    pub priority: u32,
64    /// 重试配置
65    pub retry_config: Option<RetryConfig>,
66    /// 超时配置
67    pub timeout_config: Option<TimeoutConfig>,
68}
69
70/// 节点类型
71#[derive(Debug, Clone)]
72pub enum NodeType {
73    /// 动作节点
74    Action,
75    /// 条件节点
76    Condition,
77    /// 分支节点
78    Branch,
79    /// 循环节点
80    Loop,
81    /// 子流程节点
82    Subprocess,
83}
84
85/// 动作规格
86#[derive(Debug, Clone)]
87pub struct ActionSpec {
88    /// 动作类型
89    pub action_type: String,
90    /// 动作参数
91    pub parameters: HashMap<String, serde_yaml::Value>,
92    /// 动作输出
93    pub outputs: HashMap<String, serde_yaml::Value>,
94}
95
96/// 重试配置
97#[derive(Debug, Clone)]
98pub struct RetryConfig {
99    /// 最大重试次数
100    pub max_retries: u32,
101    /// 重试延迟(毫秒)
102    pub delay: u64,
103    /// 重试策略
104    pub strategy: RetryStrategy,
105}
106
107/// 重试策略
108#[derive(Debug, Clone)]
109pub enum RetryStrategy {
110    /// 固定延迟
111    Fixed,
112    /// 指数退避
113    Exponential { multiplier: f64 },
114    /// 线性增长
115    Linear { increment: u64 },
116}
117
118/// 超时配置
119#[derive(Debug, Clone)]
120pub struct TimeoutConfig {
121    /// 超时时间(毫秒)
122    pub duration: u64,
123    /// 超时处理动作
124    pub on_timeout: Option<String>,
125}
126
127/// 计划元数据
128#[derive(Debug, Clone)]
129pub struct PlanMetadata {
130    /// 计划ID
131    pub plan_id: String,
132    /// 创建时间
133    pub created_at: std::time::SystemTime,
134    /// 工作流名称
135    pub workflow_name: String,
136    /// 工作流版本
137    pub workflow_version: String,
138    /// 总节点数
139    pub total_nodes: usize,
140    /// 总阶段数
141    pub total_phases: usize,
142}
143
144/// 执行器接口 - 所有执行器都必须实现这个接口
145pub trait Executor {
146    type Input;
147    type Output;
148    type Error;
149
150    /// 执行输入并返回结果
151    fn execute(
152        &mut self,
153        input: Self::Input,
154    ) -> impl std::future::Future<Output = Result<Self::Output, Self::Error>> + Send;
155
156    /// 获取执行器状态
157    fn status(&self) -> ExecutorStatus;
158
159    /// 停止执行器
160    fn stop(
161        &mut self,
162    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
163}
164
165/// 执行器状态
166#[derive(Debug, Clone, PartialEq)]
167pub enum ExecutorStatus {
168    /// 空闲状态
169    Idle,
170    /// 运行中
171    Running,
172    /// 已停止
173    Stopped,
174    /// 错误状态
175    Error(String),
176}
177
178/// 配置解析器接口
179pub trait ConfigParser<T> {
180    type Output;
181    type Error;
182
183    /// 解析配置
184    fn parse(&self, config: T) -> Result<Self::Output, Self::Error>;
185}
186
187/// 流程编排器接口
188pub trait FlowPlanner {
189    type Input;
190    type Output;
191    type Error;
192
193    /// 创建执行计划
194    fn create_execution_plan(
195        &self,
196        input: Self::Input,
197    ) -> Result<Self::Output, Self::Error>;
198
199    /// 优化执行计划
200    fn optimize_plan(
201        &self,
202        plan: Self::Output,
203    ) -> Result<Self::Output, Self::Error>;
204}
205
206/// 表达式评估器接口
207pub trait ExpressionEvaluator {
208    type Value;
209    type Error;
210
211    /// 评估表达式
212    fn evaluate(&self, expression: &str) -> Result<Self::Value, Self::Error>;
213
214    /// 评估条件
215    fn evaluate_condition(&self, condition: &str) -> Result<bool, Self::Error>;
216
217    /// 设置变量
218    fn set_variable(&mut self, name: String, value: Self::Value);
219
220    /// 获取变量
221    fn get_variable(&self, name: &str) -> Option<Self::Value>;
222}
223
224impl ExecutionPlan {
225    /// 创建新的执行计划
226    pub fn new(
227        workflow_name: String,
228        workflow_version: String,
229        env_vars: HashMap<String, serde_yaml::Value>,
230        flow_vars: HashMap<String, serde_yaml::Value>,
231    ) -> Self {
232        let metadata = PlanMetadata {
233            plan_id: uuid::Uuid::new_v4().to_string(),
234            created_at: std::time::SystemTime::now(),
235            workflow_name,
236            workflow_version,
237            total_nodes: 0,
238            total_phases: 0,
239        };
240
241        Self {
242            phases: Vec::new(),
243            env_vars,
244            flow_vars,
245            metadata,
246        }
247    }
248
249    /// 添加执行阶段
250    pub fn add_phase(&mut self, phase: ExecutionPhase) {
251        self.metadata.total_nodes += phase.nodes.len();
252        self.phases.push(phase);
253        self.metadata.total_phases = self.phases.len();
254    }
255
256    /// 获取总执行时间估计
257    pub fn estimated_duration(&self) -> std::time::Duration {
258        // 简化的估计逻辑
259        let total_nodes = self.metadata.total_nodes;
260        std::time::Duration::from_millis((total_nodes * 100) as u64)
261    }
262
263    /// 验证计划的有效性
264    pub fn validate(&self) -> Result<(), String> {
265        if self.phases.is_empty() {
266            return Err("执行计划不能为空".to_string());
267        }
268
269        // 验证每个阶段
270        for phase in &self.phases {
271            if phase.nodes.is_empty() {
272                return Err(format!("阶段 {} 不能为空", phase.name));
273            }
274        }
275
276        Ok(())
277    }
278}
279
280impl ExecutionNode {
281    /// 创建新的执行节点
282    pub fn new(id: String, name: String, action_spec: ActionSpec) -> Self {
283        Self {
284            id,
285            name,
286            node_type: NodeType::Action,
287            action_spec,
288            dependencies: Vec::new(),
289            condition: None,
290            priority: 100,
291            retry_config: None,
292            timeout_config: None,
293        }
294    }
295
296    /// 添加依赖
297    pub fn add_dependency(mut self, dependency: String) -> Self {
298        self.dependencies.push(dependency);
299        self
300    }
301
302    /// 设置条件
303    pub fn with_condition(mut self, condition: String) -> Self {
304        self.condition = Some(condition);
305        self
306    }
307
308    /// 设置优先级
309    pub fn with_priority(mut self, priority: u32) -> Self {
310        self.priority = priority;
311        self
312    }
313
314    /// 设置重试配置
315    pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
316        self.retry_config = Some(retry_config);
317        self
318    }
319
320    /// 设置超时配置
321    pub fn with_timeout(mut self, timeout_config: TimeoutConfig) -> Self {
322        self.timeout_config = Some(timeout_config);
323        self
324    }
325}