enact_core/kernel/execution_strategy.rs
1//! Execution Strategy - Pure scheduling logic for parallel execution
2//!
3//! This module provides **pure scheduling logic** for coordinating parallel execution
4//! (fan-out, fan-in). It does NOT define execution semantics - those are in `flow/`.
5//!
6//! ## Key Distinctions
7//!
8//! | Module | Responsibility |
9//! |--------|----------------|
10//! | `flow/parallel.rs` | **Execution semantics** - defines HOW parallel execution works |
11//! | `kernel/execution_strategy.rs` | **Scheduling logic** - coordinates fan-out/fan-in |
12//! | `kernel/reducer.rs` | **State transitions** - what actually happened |
13//!
14//! ## What This Module Does
15//!
16//! - Coordinates parallel execution of callables
17//! - Manages fan-out (spawn parallel tasks)
18//! - Manages fan-in (collect results)
19//! - Emits events for parallel execution
20//!
21//! ## What This Module Does NOT Do
22//!
23//! - Define execution semantics (that's `flow/parallel.rs`)
24//! - Make policy decisions (that's `policy/`)
25//! - Mutate execution state (that's `reducer.rs`)
26//! - Decide when to parallelize (that's `flow/`)
27//!
28//! ## Architecture
29//!
30//! This is **pluggable scheduling logic**. The kernel uses this for coordination,
31//! but the semantics of parallel execution come from flow primitives.
32//!
33//! ```text
34//! Flow decides: "Execute these in parallel"
35//! ↓
36//! execution_strategy coordinates: "Spawn tasks, collect results"
37//! ↓
38//! reducer records: "What happened"
39//! ```
40
41use super::error::ExecutionError;
42use super::ids::{ExecutionId, StepId};
43use crate::callable::Callable;
44use crate::streaming::{EventEmitter, StreamEvent};
45use std::sync::Arc;
46
47/// Result of a parallel execution branch
48#[derive(Debug)]
49pub struct ParallelResult {
50 pub agent_name: String,
51 pub execution_id: ExecutionId,
52 pub output: Result<String, String>,
53}
54
55/// Run multiple callables in parallel
56pub async fn run_parallel<A: Callable + 'static>(
57 agents: Vec<Arc<A>>,
58 input: &str,
59 emitter: &EventEmitter,
60) -> Vec<ParallelResult> {
61 let input = input.to_string();
62
63 let handles: Vec<_> = agents
64 .into_iter()
65 .map(|agent| {
66 let input = input.clone();
67 let execution_id = ExecutionId::new();
68 let agent_name = agent.name().to_string();
69
70 tokio::spawn(async move {
71 let result = agent.run(&input).await;
72 ParallelResult {
73 agent_name,
74 execution_id,
75 output: result.map_err(|e| e.to_string()),
76 }
77 })
78 })
79 .collect();
80
81 let mut results = Vec::new();
82 for handle in handles {
83 match handle.await {
84 Ok(result) => {
85 let step_id = StepId::new();
86 let output_str = result
87 .output
88 .clone()
89 .unwrap_or_else(|e| format!("Error: {}", e));
90
91 emitter.emit(StreamEvent::step_end(
92 &result.execution_id,
93 &step_id,
94 Some(output_str),
95 0, // Duration not tracked in parallel executor
96 ));
97 results.push(result);
98 }
99 Err(e) => {
100 let execution_id = ExecutionId::new();
101 let error = ExecutionError::kernel_internal(e.to_string());
102 emitter.emit(StreamEvent::execution_failed(&execution_id, error.clone()));
103 results.push(ParallelResult {
104 agent_name: "unknown".to_string(),
105 execution_id,
106 output: Err(e.to_string()),
107 });
108 }
109 }
110 }
111
112 results
113}