use super::error::ExecutionError;
use super::ids::{ExecutionId, StepId};
use crate::callable::Callable;
use crate::streaming::{EventEmitter, StreamEvent};
use std::sync::Arc;
#[derive(Debug)]
pub struct ParallelResult {
pub agent_name: String,
pub execution_id: ExecutionId,
pub output: Result<String, String>,
}
pub async fn run_parallel<A: Callable + 'static>(
agents: Vec<Arc<A>>,
input: &str,
emitter: &EventEmitter,
) -> Vec<ParallelResult> {
let input = input.to_string();
let handles: Vec<_> = agents
.into_iter()
.map(|agent| {
let input = input.clone();
let execution_id = ExecutionId::new();
let agent_name = agent.name().to_string();
tokio::spawn(async move {
let result = agent.run(&input).await;
ParallelResult {
agent_name,
execution_id,
output: result.map_err(|e| e.to_string()),
}
})
})
.collect();
let mut results = Vec::new();
for handle in handles {
match handle.await {
Ok(result) => {
let step_id = StepId::new();
let output_str = result
.output
.clone()
.unwrap_or_else(|e| format!("Error: {}", e));
emitter.emit(StreamEvent::step_end(
&result.execution_id,
&step_id,
Some(output_str),
0, ));
results.push(result);
}
Err(e) => {
let execution_id = ExecutionId::new();
let error = ExecutionError::kernel_internal(e.to_string());
emitter.emit(StreamEvent::execution_failed(&execution_id, error.clone()));
results.push(ParallelResult {
agent_name: "unknown".to_string(),
execution_id,
output: Err(e.to_string()),
});
}
}
}
results
}