flowbuilder_runtime/
enhanced_executor.rs

//! # FlowBuilder Runtime - Enhanced task executor
//!
//! An execution-plan-driven task executor responsible for running the individual tasks.
4
5use anyhow::Result;
6use flowbuilder_context::SharedContext;
7use flowbuilder_core::{
8    ActionSpec, ExecutionNode, ExecutionPhase, ExecutionPlan, Executor,
9    ExecutorStatus, PhaseExecutionMode, RetryStrategy,
10};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::Semaphore;
14
15/// 增强的任务执行器
16pub struct EnhancedTaskExecutor {
17    /// 执行器配置
18    config: ExecutorConfig,
19    /// 执行器状态
20    status: ExecutorStatus,
21    /// 并发控制信号量
22    semaphore: Arc<Semaphore>,
23    /// 执行统计
24    stats: ExecutionStats,
25}
26
/// Tunable settings for [`EnhancedTaskExecutor`].
#[derive(Clone, Debug)]
pub struct ExecutorConfig {
    /// Maximum number of tasks allowed to run concurrently.
    pub max_concurrent_tasks: usize,
    /// Default per-node timeout, in milliseconds, used when a node does not
    /// carry its own timeout configuration.
    pub default_timeout: u64,
    /// Whether performance monitoring is enabled.
    pub enable_performance_monitoring: bool,
    /// Whether verbose progress logging is enabled.
    pub enable_detailed_logging: bool,
}
39
40impl Default for ExecutorConfig {
41    fn default() -> Self {
42        Self {
43            max_concurrent_tasks: 10,
44            default_timeout: 30000, // 30秒
45            enable_performance_monitoring: true,
46            enable_detailed_logging: false,
47        }
48    }
49}
50
/// Counters and timings describing the work an executor has performed.
#[derive(Clone, Debug, Default)]
pub struct ExecutionStats {
    /// Total number of tasks seen.
    pub total_tasks: usize,
    /// Number of tasks that succeeded.
    pub successful_tasks: usize,
    /// Number of tasks that failed.
    pub failed_tasks: usize,
    /// Number of tasks that were skipped.
    pub skipped_tasks: usize,
    /// Total execution time.
    pub total_execution_time: Duration,
    /// Average execution time per task.
    pub average_execution_time: Duration,
}
67
68impl Default for EnhancedTaskExecutor {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl EnhancedTaskExecutor {
75    /// 创建新的任务执行器
76    pub fn new() -> Self {
77        let config = ExecutorConfig::default();
78        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
79
80        Self {
81            config,
82            status: ExecutorStatus::Idle,
83            semaphore,
84            stats: ExecutionStats::default(),
85        }
86    }
87
88    /// 使用配置创建任务执行器
89    pub fn with_config(config: ExecutorConfig) -> Self {
90        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
91
92        Self {
93            config,
94            status: ExecutorStatus::Idle,
95            semaphore,
96            stats: ExecutionStats::default(),
97        }
98    }
99
100    /// 执行执行计划
101    pub async fn execute_plan(
102        &mut self,
103        plan: ExecutionPlan,
104        context: SharedContext,
105    ) -> Result<ExecutionResult> {
106        self.status = ExecutorStatus::Running;
107        let start_time = Instant::now();
108
109        if self.config.enable_detailed_logging {
110            println!("开始执行计划: {}", plan.metadata.workflow_name);
111            println!("总阶段数: {}", plan.phases.len());
112            println!("总节点数: {}", plan.metadata.total_nodes);
113        }
114
115        let mut result = ExecutionResult {
116            plan_id: plan.metadata.plan_id.clone(),
117            start_time,
118            end_time: None,
119            phase_results: Vec::new(),
120            total_duration: Duration::default(),
121            success: true,
122            error_message: None,
123        };
124
125        // 设置环境变量和流程变量到上下文
126        self.setup_context(&plan, context.clone()).await?;
127
128        // 按阶段执行
129        for (index, phase) in plan.phases.iter().enumerate() {
130            if self.config.enable_detailed_logging {
131                println!(
132                    "执行阶段 {}: {} ({:?})",
133                    index + 1,
134                    phase.name,
135                    phase.execution_mode
136                );
137            }
138
139            let phase_start = Instant::now();
140            let phase_result =
141                match self.execute_phase(phase, context.clone()).await {
142                    Ok(r) => r,
143                    Err(e) => {
144                        result.success = false;
145                        result.error_message = Some(e.to_string());
146                        PhaseResult {
147                            phase_id: phase.id.clone(),
148                            phase_name: phase.name.clone(),
149                            start_time: phase_start,
150                            end_time: Some(Instant::now()),
151                            duration: phase_start.elapsed(),
152                            success: false,
153                            error_message: Some(e.to_string()),
154                            node_results: Vec::new(),
155                        }
156                    }
157                };
158
159            result.phase_results.push(phase_result);
160
161            // 如果阶段失败,根据配置决定是否继续
162            if !result.success {
163                break;
164            }
165        }
166
167        result.end_time = Some(Instant::now());
168        result.total_duration = start_time.elapsed();
169
170        // 更新统计信息
171        self.update_stats(&result);
172
173        self.status = ExecutorStatus::Idle;
174
175        if self.config.enable_detailed_logging {
176            println!("执行计划完成,总用时: {:?}", result.total_duration);
177        }
178
179        Ok(result)
180    }
181
182    /// 执行阶段
183    async fn execute_phase(
184        &mut self,
185        phase: &ExecutionPhase,
186        context: SharedContext,
187    ) -> Result<PhaseResult> {
188        let start_time = Instant::now();
189        let mut phase_result = PhaseResult {
190            phase_id: phase.id.clone(),
191            phase_name: phase.name.clone(),
192            start_time,
193            end_time: None,
194            duration: Duration::default(),
195            success: true,
196            error_message: None,
197            node_results: Vec::new(),
198        };
199
200        // 检查阶段条件
201        if let Some(condition) = &phase.condition {
202            // 这里应该使用表达式评估器检查条件
203            // 为了简化,这里假设条件总是满足
204            if self.config.enable_detailed_logging {
205                println!("  检查阶段条件: {condition}");
206            }
207        }
208
209        match phase.execution_mode {
210            PhaseExecutionMode::Sequential => {
211                for node in &phase.nodes {
212                    let node_result =
213                        self.execute_node(node, context.clone()).await?;
214                    phase_result.node_results.push(node_result);
215                }
216            }
217            PhaseExecutionMode::Parallel => {
218                let mut handles = Vec::new();
219
220                for node in &phase.nodes {
221                    let node_clone = node.clone();
222                    let context_clone = context.clone();
223                    let semaphore = self.semaphore.clone();
224                    let config = self.config.clone();
225
226                    let handle = tokio::spawn(async move {
227                        let _permit = semaphore.acquire().await.unwrap();
228                        Self::execute_node_static(
229                            &node_clone,
230                            context_clone,
231                            &config,
232                        )
233                        .await
234                    });
235
236                    handles.push(handle);
237                }
238
239                // 等待所有任务完成
240                for handle in handles {
241                    match handle.await {
242                        Ok(node_result) => match node_result {
243                            Ok(result) => {
244                                phase_result.node_results.push(result)
245                            }
246                            Err(e) => {
247                                phase_result.success = false;
248                                phase_result.error_message =
249                                    Some(e.to_string());
250                                return Err(e);
251                            }
252                        },
253                        Err(e) => {
254                            phase_result.success = false;
255                            phase_result.error_message = Some(e.to_string());
256                            return Err(anyhow::anyhow!("任务执行失败: {}", e));
257                        }
258                    }
259                }
260            }
261            PhaseExecutionMode::Conditional { ref condition } => {
262                // 检查条件
263                if self.config.enable_detailed_logging {
264                    println!("  检查条件: {condition}");
265                }
266
267                // 简化的条件检查,实际应该使用表达式评估器
268                let condition_met = true; // 假设条件满足
269
270                if condition_met {
271                    for node in &phase.nodes {
272                        let node_result =
273                            self.execute_node(node, context.clone()).await?;
274                        phase_result.node_results.push(node_result);
275                    }
276                } else if self.config.enable_detailed_logging {
277                    println!("  跳过阶段 {} (条件不满足)", phase.name);
278                }
279            }
280        }
281
282        phase_result.end_time = Some(Instant::now());
283        phase_result.duration = start_time.elapsed();
284
285        Ok(phase_result)
286    }
287
288    /// 执行节点
289    async fn execute_node(
290        &mut self,
291        node: &ExecutionNode,
292        context: SharedContext,
293    ) -> Result<NodeResult> {
294        Self::execute_node_static(node, context, &self.config).await
295    }
296
297    /// 静态执行节点(用于并发执行)
298    async fn execute_node_static(
299        node: &ExecutionNode,
300        context: SharedContext,
301        config: &ExecutorConfig,
302    ) -> Result<NodeResult> {
303        let start_time = Instant::now();
304        let mut result = NodeResult {
305            node_id: node.id.clone(),
306            node_name: node.name.clone(),
307            start_time,
308            end_time: None,
309            duration: Duration::default(),
310            success: true,
311            error_message: None,
312            retry_count: 0,
313        };
314
315        if config.enable_detailed_logging {
316            println!("    执行节点: {} - {}", node.id, node.name);
317        }
318
319        // 检查节点条件
320        if let Some(condition) = &node.condition {
321            if config.enable_detailed_logging {
322                println!("      检查节点条件: {condition}");
323            }
324            // 简化的条件检查
325            let condition_met = true;
326            if !condition_met {
327                if config.enable_detailed_logging {
328                    println!("      跳过节点 {} (条件不满足)", node.name);
329                }
330                result.end_time = Some(Instant::now());
331                result.duration = start_time.elapsed();
332                return Ok(result);
333            }
334        }
335
336        // 执行重试逻辑
337        let max_retries = node
338            .retry_config
339            .as_ref()
340            .map(|c| c.max_retries)
341            .unwrap_or(0);
342        let mut retries = 0;
343
344        loop {
345            let execute_result =
346                Self::execute_node_action(node, context.clone(), config).await;
347
348            match execute_result {
349                Ok(()) => {
350                    result.success = true;
351                    break;
352                }
353                Err(e) => {
354                    if retries < max_retries {
355                        retries += 1;
356                        result.retry_count = retries;
357
358                        if config.enable_detailed_logging {
359                            println!(
360                                "      重试节点 {} ({}/{})",
361                                node.name, retries, max_retries
362                            );
363                        }
364
365                        if let Some(retry_config) = &node.retry_config {
366                            let delay = match retry_config.strategy {
367                                RetryStrategy::Fixed => retry_config.delay,
368                                RetryStrategy::Exponential { multiplier } => {
369                                    (retry_config.delay as f64
370                                        * multiplier.powi(retries as i32))
371                                        as u64
372                                }
373                                RetryStrategy::Linear { increment } => {
374                                    retry_config.delay
375                                        + (increment * retries as u64)
376                                }
377                            };
378
379                            tokio::time::sleep(Duration::from_millis(delay))
380                                .await;
381                        }
382                        continue;
383                    } else {
384                        result.success = false;
385                        result.error_message = Some(e.to_string());
386                        break;
387                    }
388                }
389            }
390        }
391
392        result.end_time = Some(Instant::now());
393        result.duration = start_time.elapsed();
394
395        Ok(result)
396    }
397
398    /// 执行节点动作
399    async fn execute_node_action(
400        node: &ExecutionNode,
401        context: SharedContext,
402        config: &ExecutorConfig,
403    ) -> Result<()> {
404        let action_spec = &node.action_spec;
405
406        // 设置超时
407        let timeout_duration = node
408            .timeout_config
409            .as_ref()
410            .map(|c| Duration::from_millis(c.duration))
411            .unwrap_or_else(|| Duration::from_millis(config.default_timeout));
412
413        let action_future = Self::execute_action_by_type(action_spec, context);
414
415        match tokio::time::timeout(timeout_duration, action_future).await {
416            Ok(result) => result,
417            Err(_) => {
418                if config.enable_detailed_logging {
419                    println!("      节点 {} 执行超时", node.name);
420                }
421                Err(anyhow::anyhow!("节点 {} 执行超时", node.name))
422            }
423        }
424    }
425
426    /// 根据动作类型执行动作
427    async fn execute_action_by_type(
428        action_spec: &ActionSpec,
429        context: SharedContext,
430    ) -> Result<()> {
431        match action_spec.action_type.as_str() {
432            "builtin" => {
433                // 执行内置动作
434                tokio::time::sleep(Duration::from_millis(100)).await;
435                println!("        执行内置动作");
436            }
437            "cmd" => {
438                // 执行命令动作
439                tokio::time::sleep(Duration::from_millis(200)).await;
440                println!("        执行命令动作");
441            }
442            "http" => {
443                // 执行HTTP动作
444                tokio::time::sleep(Duration::from_millis(300)).await;
445                println!("        执行HTTP动作");
446            }
447            "wasm" => {
448                // 执行WASM动作
449                tokio::time::sleep(Duration::from_millis(150)).await;
450                println!("        执行WASM动作");
451            }
452            _ => {
453                return Err(anyhow::anyhow!(
454                    "不支持的动作类型: {}",
455                    action_spec.action_type
456                ));
457            }
458        }
459
460        // 存储输出到上下文
461        for (key, value) in &action_spec.outputs {
462            let mut guard = context.lock().await;
463            guard.set_variable(key.clone(), format!("{value:?}"));
464        }
465
466        Ok(())
467    }
468
469    /// 设置上下文
470    async fn setup_context(
471        &self,
472        plan: &ExecutionPlan,
473        context: SharedContext,
474    ) -> Result<()> {
475        let mut guard = context.lock().await;
476
477        // 设置环境变量
478        for (key, value) in &plan.env_vars {
479            guard.set_variable(format!("env.{key}"), format!("{value:?}"));
480        }
481
482        // 设置流程变量
483        for (key, value) in &plan.flow_vars {
484            guard.set_variable(format!("flow.{key}"), format!("{value:?}"));
485        }
486
487        Ok(())
488    }
489
490    /// 更新统计信息
491    fn update_stats(&mut self, result: &ExecutionResult) {
492        self.stats.total_execution_time = result.total_duration;
493
494        for phase_result in &result.phase_results {
495            for node_result in &phase_result.node_results {
496                self.stats.total_tasks += 1;
497                if node_result.success {
498                    self.stats.successful_tasks += 1;
499                } else {
500                    self.stats.failed_tasks += 1;
501                }
502            }
503        }
504
505        if self.stats.total_tasks > 0 {
506            self.stats.average_execution_time = Duration::from_nanos(
507                self.stats.total_execution_time.as_nanos() as u64
508                    / self.stats.total_tasks as u64,
509            );
510        }
511    }
512
513    /// 获取执行统计
514    pub fn get_stats(&self) -> &ExecutionStats {
515        &self.stats
516    }
517}
518
519impl Executor for EnhancedTaskExecutor {
520    type Input = (ExecutionPlan, SharedContext);
521    type Output = ExecutionResult;
522    type Error = anyhow::Error;
523
524    async fn execute(
525        &mut self,
526        input: Self::Input,
527    ) -> Result<Self::Output, Self::Error> {
528        let (plan, context) = input;
529        self.execute_plan(plan, context).await
530    }
531
532    fn status(&self) -> ExecutorStatus {
533        self.status.clone()
534    }
535
536    async fn stop(&mut self) -> Result<(), Self::Error> {
537        self.status = ExecutorStatus::Stopped;
538        Ok(())
539    }
540}
541
542/// 执行结果
543#[derive(Debug, Clone)]
544pub struct ExecutionResult {
545    /// 计划ID
546    pub plan_id: String,
547    /// 开始时间
548    pub start_time: Instant,
549    /// 结束时间
550    pub end_time: Option<Instant>,
551    /// 阶段结果
552    pub phase_results: Vec<PhaseResult>,
553    /// 总执行时间
554    pub total_duration: Duration,
555    /// 是否成功
556    pub success: bool,
557    /// 错误信息
558    pub error_message: Option<String>,
559}
560
561/// 阶段结果
562#[derive(Debug, Clone)]
563pub struct PhaseResult {
564    /// 阶段ID
565    pub phase_id: String,
566    /// 阶段名称
567    pub phase_name: String,
568    /// 开始时间
569    pub start_time: Instant,
570    /// 结束时间
571    pub end_time: Option<Instant>,
572    /// 执行时间
573    pub duration: Duration,
574    /// 是否成功
575    pub success: bool,
576    /// 错误信息
577    pub error_message: Option<String>,
578    /// 节点结果
579    pub node_results: Vec<NodeResult>,
580}
581
/// Outcome of executing a single node.
#[derive(Clone, Debug)]
pub struct NodeResult {
    /// Identifier of the node.
    pub node_id: String,
    /// Human-readable name of the node.
    pub node_name: String,
    /// Instant at which the node started.
    pub start_time: Instant,
    /// Instant at which the node ended, once it has.
    pub end_time: Option<Instant>,
    /// Time spent executing the node, including retries.
    pub duration: Duration,
    /// Whether the node succeeded.
    pub success: bool,
    /// Error description when the node did not succeed.
    pub error_message: Option<String>,
    /// Number of retries that were performed.
    pub retry_count: u32,
}
602
#[cfg(test)]
mod tests {
    use super::*;
    use flowbuilder_core::{ActionSpec, ExecutionNode};
    use std::collections::HashMap;

    /// A freshly created executor reports the idle status.
    #[tokio::test]
    async fn test_executor_creation() {
        let fresh = EnhancedTaskExecutor::new();
        assert_eq!(fresh.status(), ExecutorStatus::Idle);
    }

    /// A node with a "builtin" action and no outputs runs successfully.
    #[tokio::test]
    async fn test_node_execution() {
        let action_spec = ActionSpec {
            action_type: "builtin".to_string(),
            parameters: HashMap::new(),
            outputs: HashMap::new(),
        };
        let node = ExecutionNode::new(
            "test_node".to_string(),
            "Test Node".to_string(),
            action_spec,
        );

        let context = Arc::new(tokio::sync::Mutex::new(
            flowbuilder_context::FlowContext::default(),
        ));
        let config = ExecutorConfig::default();

        let outcome =
            EnhancedTaskExecutor::execute_node_static(&node, context, &config)
                .await;
        assert!(outcome.is_ok());

        let node_result = outcome.unwrap();
        assert!(node_result.success);
        assert_eq!(node_result.node_id, "test_node");
    }
}