Skip to main content

enact_core/kernel/
execution_strategy.rs

//! Execution Strategy - Pure scheduling logic for parallel execution
//!
//! This module provides **pure scheduling logic** for coordinating parallel execution
//! (fan-out, fan-in). It does NOT define execution semantics - those are in `flow/`.
//!
//! ## Key Distinctions
//!
//! | Module | Responsibility |
//! |--------|----------------|
//! | `flow/parallel.rs` | **Execution semantics** - defines HOW parallel execution works |
//! | `kernel/execution_strategy.rs` | **Scheduling logic** - coordinates fan-out/fan-in |
//! | `kernel/reducer.rs` | **State transitions** - what actually happened |
//!
//! ## What This Module Does
//!
//! - Coordinates parallel execution of callables
//! - Manages fan-out (spawn parallel tasks)
//! - Manages fan-in (collect results)
//! - Emits events for parallel execution
//!
//! ## What This Module Does NOT Do
//!
//! - Define execution semantics (that's `flow/parallel.rs`)
//! - Make policy decisions (that's `policy/`)
//! - Mutate execution state (that's `reducer.rs`)
//! - Decide when to parallelize (that's `flow/`)
//!
//! ## Architecture
//!
//! This is **pluggable scheduling logic**. The kernel uses this for coordination,
//! but the semantics of parallel execution come from flow primitives.
//!
//! ```text
//! Flow decides: "Execute these in parallel"
//!     ↓
//! execution_strategy coordinates: "Spawn tasks, collect results"
//!     ↓
//! reducer records: "What happened"
//! ```
40
41use super::error::ExecutionError;
42use super::ids::{ExecutionId, StepId};
43use crate::callable::Callable;
44use crate::streaming::{EventEmitter, StreamEvent};
45use std::sync::Arc;
46
47/// Result of a parallel execution branch
48#[derive(Debug)]
49pub struct ParallelResult {
50    pub agent_name: String,
51    pub execution_id: ExecutionId,
52    pub output: Result<String, String>,
53}
54
55/// Run multiple callables in parallel
56pub async fn run_parallel<A: Callable + 'static>(
57    agents: Vec<Arc<A>>,
58    input: &str,
59    emitter: &EventEmitter,
60) -> Vec<ParallelResult> {
61    let input = input.to_string();
62
63    let handles: Vec<_> = agents
64        .into_iter()
65        .map(|agent| {
66            let input = input.clone();
67            let execution_id = ExecutionId::new();
68            let agent_name = agent.name().to_string();
69
70            tokio::spawn(async move {
71                let result = agent.run(&input).await;
72                ParallelResult {
73                    agent_name,
74                    execution_id,
75                    output: result.map_err(|e| e.to_string()),
76                }
77            })
78        })
79        .collect();
80
81    let mut results = Vec::new();
82    for handle in handles {
83        match handle.await {
84            Ok(result) => {
85                let step_id = StepId::new();
86                let output_str = result
87                    .output
88                    .clone()
89                    .unwrap_or_else(|e| format!("Error: {}", e));
90
91                emitter.emit(StreamEvent::step_end(
92                    &result.execution_id,
93                    &step_id,
94                    Some(output_str),
95                    0, // Duration not tracked in parallel executor
96                ));
97                results.push(result);
98            }
99            Err(e) => {
100                let execution_id = ExecutionId::new();
101                let error = ExecutionError::kernel_internal(e.to_string());
102                emitter.emit(StreamEvent::execution_failed(&execution_id, error.clone()));
103                results.push(ParallelResult {
104                    agent_name: "unknown".to_string(),
105                    execution_id,
106                    output: Err(e.to_string()),
107                });
108            }
109        }
110    }
111
112    results
113}