1use anyhow::Result;
6use flowbuilder_context::SharedContext;
7use flowbuilder_core::{
8 ActionSpec, ExecutionNode, ExecutionPhase, ExecutionPlan, Executor,
9 ExecutorStatus, PhaseExecutionMode, RetryStrategy,
10};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::Semaphore;
14
15pub struct EnhancedTaskExecutor {
17 config: ExecutorConfig,
19 status: ExecutorStatus,
21 semaphore: Arc<Semaphore>,
23 stats: ExecutionStats,
25}
26
27#[derive(Debug, Clone)]
29pub struct ExecutorConfig {
30 pub max_concurrent_tasks: usize,
32 pub default_timeout: u64,
34 pub enable_performance_monitoring: bool,
36 pub enable_detailed_logging: bool,
38}
39
40impl Default for ExecutorConfig {
41 fn default() -> Self {
42 Self {
43 max_concurrent_tasks: 10,
44 default_timeout: 30000, enable_performance_monitoring: true,
46 enable_detailed_logging: false,
47 }
48 }
49}
50
51#[derive(Debug, Clone, Default)]
53pub struct ExecutionStats {
54 pub total_tasks: usize,
56 pub successful_tasks: usize,
58 pub failed_tasks: usize,
60 pub skipped_tasks: usize,
62 pub total_execution_time: Duration,
64 pub average_execution_time: Duration,
66}
67
68impl Default for EnhancedTaskExecutor {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl EnhancedTaskExecutor {
75 pub fn new() -> Self {
77 let config = ExecutorConfig::default();
78 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
79
80 Self {
81 config,
82 status: ExecutorStatus::Idle,
83 semaphore,
84 stats: ExecutionStats::default(),
85 }
86 }
87
88 pub fn with_config(config: ExecutorConfig) -> Self {
90 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
91
92 Self {
93 config,
94 status: ExecutorStatus::Idle,
95 semaphore,
96 stats: ExecutionStats::default(),
97 }
98 }
99
100 pub async fn execute_plan(
102 &mut self,
103 plan: ExecutionPlan,
104 context: SharedContext,
105 ) -> Result<ExecutionResult> {
106 self.status = ExecutorStatus::Running;
107 let start_time = Instant::now();
108
109 if self.config.enable_detailed_logging {
110 println!("开始执行计划: {}", plan.metadata.workflow_name);
111 println!("总阶段数: {}", plan.phases.len());
112 println!("总节点数: {}", plan.metadata.total_nodes);
113 }
114
115 let mut result = ExecutionResult {
116 plan_id: plan.metadata.plan_id.clone(),
117 start_time,
118 end_time: None,
119 phase_results: Vec::new(),
120 total_duration: Duration::default(),
121 success: true,
122 error_message: None,
123 };
124
125 self.setup_context(&plan, context.clone()).await?;
127
128 for (index, phase) in plan.phases.iter().enumerate() {
130 if self.config.enable_detailed_logging {
131 println!(
132 "执行阶段 {}: {} ({:?})",
133 index + 1,
134 phase.name,
135 phase.execution_mode
136 );
137 }
138
139 let phase_start = Instant::now();
140 let phase_result =
141 match self.execute_phase(phase, context.clone()).await {
142 Ok(r) => r,
143 Err(e) => {
144 result.success = false;
145 result.error_message = Some(e.to_string());
146 PhaseResult {
147 phase_id: phase.id.clone(),
148 phase_name: phase.name.clone(),
149 start_time: phase_start,
150 end_time: Some(Instant::now()),
151 duration: phase_start.elapsed(),
152 success: false,
153 error_message: Some(e.to_string()),
154 node_results: Vec::new(),
155 }
156 }
157 };
158
159 result.phase_results.push(phase_result);
160
161 if !result.success {
163 break;
164 }
165 }
166
167 result.end_time = Some(Instant::now());
168 result.total_duration = start_time.elapsed();
169
170 self.update_stats(&result);
172
173 self.status = ExecutorStatus::Idle;
174
175 if self.config.enable_detailed_logging {
176 println!("执行计划完成,总用时: {:?}", result.total_duration);
177 }
178
179 Ok(result)
180 }
181
182 async fn execute_phase(
184 &mut self,
185 phase: &ExecutionPhase,
186 context: SharedContext,
187 ) -> Result<PhaseResult> {
188 let start_time = Instant::now();
189 let mut phase_result = PhaseResult {
190 phase_id: phase.id.clone(),
191 phase_name: phase.name.clone(),
192 start_time,
193 end_time: None,
194 duration: Duration::default(),
195 success: true,
196 error_message: None,
197 node_results: Vec::new(),
198 };
199
200 if let Some(condition) = &phase.condition {
202 if self.config.enable_detailed_logging {
205 println!(" 检查阶段条件: {condition}");
206 }
207 }
208
209 match phase.execution_mode {
210 PhaseExecutionMode::Sequential => {
211 for node in &phase.nodes {
212 let node_result =
213 self.execute_node(node, context.clone()).await?;
214 phase_result.node_results.push(node_result);
215 }
216 }
217 PhaseExecutionMode::Parallel => {
218 let mut handles = Vec::new();
219
220 for node in &phase.nodes {
221 let node_clone = node.clone();
222 let context_clone = context.clone();
223 let semaphore = self.semaphore.clone();
224 let config = self.config.clone();
225
226 let handle = tokio::spawn(async move {
227 let _permit = semaphore.acquire().await.unwrap();
228 Self::execute_node_static(
229 &node_clone,
230 context_clone,
231 &config,
232 )
233 .await
234 });
235
236 handles.push(handle);
237 }
238
239 for handle in handles {
241 match handle.await {
242 Ok(node_result) => match node_result {
243 Ok(result) => {
244 phase_result.node_results.push(result)
245 }
246 Err(e) => {
247 phase_result.success = false;
248 phase_result.error_message =
249 Some(e.to_string());
250 return Err(e);
251 }
252 },
253 Err(e) => {
254 phase_result.success = false;
255 phase_result.error_message = Some(e.to_string());
256 return Err(anyhow::anyhow!("任务执行失败: {}", e));
257 }
258 }
259 }
260 }
261 PhaseExecutionMode::Conditional { ref condition } => {
262 if self.config.enable_detailed_logging {
264 println!(" 检查条件: {condition}");
265 }
266
267 let condition_met = true; if condition_met {
271 for node in &phase.nodes {
272 let node_result =
273 self.execute_node(node, context.clone()).await?;
274 phase_result.node_results.push(node_result);
275 }
276 } else if self.config.enable_detailed_logging {
277 println!(" 跳过阶段 {} (条件不满足)", phase.name);
278 }
279 }
280 }
281
282 phase_result.end_time = Some(Instant::now());
283 phase_result.duration = start_time.elapsed();
284
285 Ok(phase_result)
286 }
287
288 async fn execute_node(
290 &mut self,
291 node: &ExecutionNode,
292 context: SharedContext,
293 ) -> Result<NodeResult> {
294 Self::execute_node_static(node, context, &self.config).await
295 }
296
297 async fn execute_node_static(
299 node: &ExecutionNode,
300 context: SharedContext,
301 config: &ExecutorConfig,
302 ) -> Result<NodeResult> {
303 let start_time = Instant::now();
304 let mut result = NodeResult {
305 node_id: node.id.clone(),
306 node_name: node.name.clone(),
307 start_time,
308 end_time: None,
309 duration: Duration::default(),
310 success: true,
311 error_message: None,
312 retry_count: 0,
313 };
314
315 if config.enable_detailed_logging {
316 println!(" 执行节点: {} - {}", node.id, node.name);
317 }
318
319 if let Some(condition) = &node.condition {
321 if config.enable_detailed_logging {
322 println!(" 检查节点条件: {condition}");
323 }
324 let condition_met = true;
326 if !condition_met {
327 if config.enable_detailed_logging {
328 println!(" 跳过节点 {} (条件不满足)", node.name);
329 }
330 result.end_time = Some(Instant::now());
331 result.duration = start_time.elapsed();
332 return Ok(result);
333 }
334 }
335
336 let max_retries = node
338 .retry_config
339 .as_ref()
340 .map(|c| c.max_retries)
341 .unwrap_or(0);
342 let mut retries = 0;
343
344 loop {
345 let execute_result =
346 Self::execute_node_action(node, context.clone(), config).await;
347
348 match execute_result {
349 Ok(()) => {
350 result.success = true;
351 break;
352 }
353 Err(e) => {
354 if retries < max_retries {
355 retries += 1;
356 result.retry_count = retries;
357
358 if config.enable_detailed_logging {
359 println!(
360 " 重试节点 {} ({}/{})",
361 node.name, retries, max_retries
362 );
363 }
364
365 if let Some(retry_config) = &node.retry_config {
366 let delay = match retry_config.strategy {
367 RetryStrategy::Fixed => retry_config.delay,
368 RetryStrategy::Exponential { multiplier } => {
369 (retry_config.delay as f64
370 * multiplier.powi(retries as i32))
371 as u64
372 }
373 RetryStrategy::Linear { increment } => {
374 retry_config.delay
375 + (increment * retries as u64)
376 }
377 };
378
379 tokio::time::sleep(Duration::from_millis(delay))
380 .await;
381 }
382 continue;
383 } else {
384 result.success = false;
385 result.error_message = Some(e.to_string());
386 break;
387 }
388 }
389 }
390 }
391
392 result.end_time = Some(Instant::now());
393 result.duration = start_time.elapsed();
394
395 Ok(result)
396 }
397
398 async fn execute_node_action(
400 node: &ExecutionNode,
401 context: SharedContext,
402 config: &ExecutorConfig,
403 ) -> Result<()> {
404 let action_spec = &node.action_spec;
405
406 let timeout_duration = node
408 .timeout_config
409 .as_ref()
410 .map(|c| Duration::from_millis(c.duration))
411 .unwrap_or_else(|| Duration::from_millis(config.default_timeout));
412
413 let action_future = Self::execute_action_by_type(action_spec, context);
414
415 match tokio::time::timeout(timeout_duration, action_future).await {
416 Ok(result) => result,
417 Err(_) => {
418 if config.enable_detailed_logging {
419 println!(" 节点 {} 执行超时", node.name);
420 }
421 Err(anyhow::anyhow!("节点 {} 执行超时", node.name))
422 }
423 }
424 }
425
426 async fn execute_action_by_type(
428 action_spec: &ActionSpec,
429 context: SharedContext,
430 ) -> Result<()> {
431 match action_spec.action_type.as_str() {
432 "builtin" => {
433 tokio::time::sleep(Duration::from_millis(100)).await;
435 println!(" 执行内置动作");
436 }
437 "cmd" => {
438 tokio::time::sleep(Duration::from_millis(200)).await;
440 println!(" 执行命令动作");
441 }
442 "http" => {
443 tokio::time::sleep(Duration::from_millis(300)).await;
445 println!(" 执行HTTP动作");
446 }
447 "wasm" => {
448 tokio::time::sleep(Duration::from_millis(150)).await;
450 println!(" 执行WASM动作");
451 }
452 _ => {
453 return Err(anyhow::anyhow!(
454 "不支持的动作类型: {}",
455 action_spec.action_type
456 ));
457 }
458 }
459
460 for (key, value) in &action_spec.outputs {
462 let mut guard = context.lock().await;
463 guard.set_variable(key.clone(), format!("{value:?}"));
464 }
465
466 Ok(())
467 }
468
469 async fn setup_context(
471 &self,
472 plan: &ExecutionPlan,
473 context: SharedContext,
474 ) -> Result<()> {
475 let mut guard = context.lock().await;
476
477 for (key, value) in &plan.env_vars {
479 guard.set_variable(format!("env.{key}"), format!("{value:?}"));
480 }
481
482 for (key, value) in &plan.flow_vars {
484 guard.set_variable(format!("flow.{key}"), format!("{value:?}"));
485 }
486
487 Ok(())
488 }
489
490 fn update_stats(&mut self, result: &ExecutionResult) {
492 self.stats.total_execution_time = result.total_duration;
493
494 for phase_result in &result.phase_results {
495 for node_result in &phase_result.node_results {
496 self.stats.total_tasks += 1;
497 if node_result.success {
498 self.stats.successful_tasks += 1;
499 } else {
500 self.stats.failed_tasks += 1;
501 }
502 }
503 }
504
505 if self.stats.total_tasks > 0 {
506 self.stats.average_execution_time = Duration::from_nanos(
507 self.stats.total_execution_time.as_nanos() as u64
508 / self.stats.total_tasks as u64,
509 );
510 }
511 }
512
513 pub fn get_stats(&self) -> &ExecutionStats {
515 &self.stats
516 }
517}
518
519impl Executor for EnhancedTaskExecutor {
520 type Input = (ExecutionPlan, SharedContext);
521 type Output = ExecutionResult;
522 type Error = anyhow::Error;
523
524 async fn execute(
525 &mut self,
526 input: Self::Input,
527 ) -> Result<Self::Output, Self::Error> {
528 let (plan, context) = input;
529 self.execute_plan(plan, context).await
530 }
531
532 fn status(&self) -> ExecutorStatus {
533 self.status.clone()
534 }
535
536 async fn stop(&mut self) -> Result<(), Self::Error> {
537 self.status = ExecutorStatus::Stopped;
538 Ok(())
539 }
540}
541
542#[derive(Debug, Clone)]
544pub struct ExecutionResult {
545 pub plan_id: String,
547 pub start_time: Instant,
549 pub end_time: Option<Instant>,
551 pub phase_results: Vec<PhaseResult>,
553 pub total_duration: Duration,
555 pub success: bool,
557 pub error_message: Option<String>,
559}
560
561#[derive(Debug, Clone)]
563pub struct PhaseResult {
564 pub phase_id: String,
566 pub phase_name: String,
568 pub start_time: Instant,
570 pub end_time: Option<Instant>,
572 pub duration: Duration,
574 pub success: bool,
576 pub error_message: Option<String>,
578 pub node_results: Vec<NodeResult>,
580}
581
582#[derive(Debug, Clone)]
584pub struct NodeResult {
585 pub node_id: String,
587 pub node_name: String,
589 pub start_time: Instant,
591 pub end_time: Option<Instant>,
593 pub duration: Duration,
595 pub success: bool,
597 pub error_message: Option<String>,
599 pub retry_count: u32,
601}
602
603#[cfg(test)]
604mod tests {
605 use super::*;
606 use flowbuilder_core::{ActionSpec, ExecutionNode};
607 use std::collections::HashMap;
608
609 #[tokio::test]
610 async fn test_executor_creation() {
611 let executor = EnhancedTaskExecutor::new();
612 assert_eq!(executor.status(), ExecutorStatus::Idle);
613 }
614
615 #[tokio::test]
616 async fn test_node_execution() {
617 let config = ExecutorConfig::default();
618 let context = Arc::new(tokio::sync::Mutex::new(
619 flowbuilder_context::FlowContext::default(),
620 ));
621
622 let node = ExecutionNode::new(
623 "test_node".to_string(),
624 "Test Node".to_string(),
625 ActionSpec {
626 action_type: "builtin".to_string(),
627 parameters: HashMap::new(),
628 outputs: HashMap::new(),
629 },
630 );
631
632 let result =
633 EnhancedTaskExecutor::execute_node_static(&node, context, &config)
634 .await;
635 assert!(result.is_ok());
636
637 let node_result = result.unwrap();
638 assert!(node_result.success);
639 assert_eq!(node_result.node_id, "test_node");
640 }
641}