// enact-core 0.0.2
// Core agent runtime for Enact - Graph-Native AI agents
//! Execution Strategy - Pure scheduling logic for parallel execution
//!
//! This module provides **pure scheduling logic** for coordinating parallel execution
//! (fan-out, fan-in). It does NOT define execution semantics - those are in `flow/`.
//!
//! ## Key Distinctions
//!
//! | Module | Responsibility |
//! |--------|----------------|
//! | `flow/parallel.rs` | **Execution semantics** - defines HOW parallel execution works |
//! | `kernel/execution_strategy.rs` | **Scheduling logic** - coordinates fan-out/fan-in |
//! | `kernel/reducer.rs` | **State transitions** - what actually happened |
//!
//! ## What This Module Does
//!
//! - Coordinates parallel execution of callables
//! - Manages fan-out (spawn parallel tasks)
//! - Manages fan-in (collect results)
//! - Emits events for parallel execution
//!
//! ## What This Module Does NOT Do
//!
//! - Define execution semantics (that's `flow/parallel.rs`)
//! - Make policy decisions (that's `policy/`)
//! - Mutate execution state (that's `reducer.rs`)
//! - Decide when to parallelize (that's `flow/`)
//!
//! ## Architecture
//!
//! This is **pluggable scheduling logic**. The kernel uses this for coordination,
//! but the semantics of parallel execution come from flow primitives.
//!
//! ```text
//! Flow decides: "Execute these in parallel"
//! Flow decides: "Execute these in parallel"
//! execution_strategy coordinates: "Spawn tasks, collect results"
//! reducer records: "What happened"
//! ```

use super::error::ExecutionError;
use super::ids::{ExecutionId, StepId};
use crate::callable::Callable;
use crate::streaming::{EventEmitter, StreamEvent};
use std::sync::Arc;

/// Result of one parallel execution branch (a single agent's run).
///
/// Produced by [`run_parallel`]: one value per spawned agent, carrying either
/// the agent's successful output or a stringified error.
#[derive(Debug)]
pub struct ParallelResult {
    // Name of the agent that produced this result.
    pub agent_name: String,
    // Execution id assigned to this branch when the task was spawned.
    pub execution_id: ExecutionId,
    // Ok(agent output) on success; Err(message) when the agent's run
    // returned an error or its task failed to join.
    pub output: Result<String, String>,
}

/// Run multiple callables concurrently and collect one [`ParallelResult`] per agent.
///
/// Fan-out: each agent is spawned as its own Tokio task with a clone of `input`.
/// Fan-in: tasks are awaited in spawn order, a `step_end` event is emitted for
/// each task that completes, and an `execution_failed` event for each task that
/// fails to join (panic or cancellation). Results are returned in the same
/// order as `agents`.
pub async fn run_parallel<A: Callable + 'static>(
    agents: Vec<Arc<A>>,
    input: &str,
    emitter: &EventEmitter,
) -> Vec<ParallelResult> {
    let input = input.to_string();

    // Fan-out. Keep each agent's name outside the spawned task as well, so it
    // is still available if the task panics (JoinError) before reporting —
    // previously the error path lost it and reported "unknown".
    let tasks: Vec<_> = agents
        .into_iter()
        .map(|agent| {
            let input = input.clone();
            let execution_id = ExecutionId::new();
            let agent_name = agent.name().to_string();
            let join_name = agent_name.clone();

            let handle = tokio::spawn(async move {
                let result = agent.run(&input).await;
                ParallelResult {
                    agent_name,
                    execution_id,
                    output: result.map_err(|e| e.to_string()),
                }
            });
            (join_name, handle)
        })
        .collect();

    // Fan-in, in spawn order.
    let mut results = Vec::with_capacity(tasks.len());
    for (agent_name, handle) in tasks {
        match handle.await {
            Ok(result) => {
                let step_id = StepId::new();
                // Borrow the output to build the event payload instead of
                // cloning the whole Result just to unwrap it.
                let output_str = match &result.output {
                    Ok(s) => s.clone(),
                    Err(e) => format!("Error: {}", e),
                };

                emitter.emit(StreamEvent::step_end(
                    &result.execution_id,
                    &step_id,
                    Some(output_str),
                    0, // Duration not tracked in parallel executor
                ));
                results.push(result);
            }
            Err(e) => {
                // The task panicked or was cancelled, so its in-task
                // ParallelResult (and ExecutionId) is lost; synthesize a
                // result with a fresh id but the real agent name.
                let execution_id = ExecutionId::new();
                let error = ExecutionError::kernel_internal(e.to_string());
                emitter.emit(StreamEvent::execution_failed(&execution_id, error.clone()));
                results.push(ParallelResult {
                    agent_name,
                    execution_id,
                    output: Err(e.to_string()),
                });
            }
        }
    }

    results
}