use anyhow::{Context, Result};
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use super::AgentOutput;
use super::step::Step;
use crate::runtime::graph::AgentGraph;
use crate::runtime::llm::LlmClient;
use crate::runtime::model_registry::ModelRegistry;
use crate::runtime::tracer::{BufferedTracer, TraceCtx, Tracer};
pub async fn run(
graph: &AgentGraph,
registry: &Arc<ModelRegistry>,
agent_dir: &Path,
body: &str,
worker: &str,
uses: &[String],
max_iter: u32,
client: &LlmClient,
input: &str,
tracer: &mut dyn Tracer,
ctx: &TraceCtx,
crumb: &str,
) -> Result<AgentOutput> {
eprintln!(" → scatter: map");
let map_system = build_map_system(body, uses);
let map_ctx = ctx.child();
let map_crumb = format!("{crumb}→map");
let t = Instant::now();
tracer.on_agent_start(
&map_ctx,
"map",
"react",
input,
Some(ctx.span_id.as_str()),
&crate::runtime::tracer::new_node_id(),
);
let map_result = super::react::react_loop(
&map_system,
uses,
max_iter,
client,
input,
tracer,
&map_ctx,
&[],
&map_crumb,
)
.await?;
tracer.on_agent_end(
&map_ctx,
&map_result.key,
&map_result.value,
t.elapsed().as_millis(),
);
if map_result.key != "parallel" {
return Ok(map_result);
}
let items: Vec<String> = serde_json::from_str(&map_result.value).with_context(|| {
format!(
"scatter map phase must finish(key='parallel', value='[\"item1\",...]') — got: {}",
map_result.value
)
})?;
eprintln!(" → scatter: parallel — {} items → '{worker}'", items.len());
let map_span = map_ctx.span_id.clone();
let futs: Vec<_> = items
.iter()
.map(|item| {
let prev = Some(map_span.clone());
async move {
let mut sub_tracer = BufferedTracer::new();
let result = super::run_node(
graph,
worker,
registry,
client,
item,
&mut sub_tracer,
ctx,
crumb,
prev,
)
.await;
(item.as_str(), result, sub_tracer)
}
})
.collect();
let results = futures::future::join_all(futs).await;
let mut combined = String::new();
let mut last_worker_span: Option<String> = None;
for (item, result, sub_tracer) in results {
sub_tracer.flush_into(tracer);
match result {
Ok(r) => {
last_worker_span = Some(r.span_id.clone());
combined.push_str(&format!("[Item: {item}]\n{}\n\n", r.value));
}
Err(e) => combined.push_str(&format!("[Item: {item}]\n[error: {e}]\n\n")),
}
}
eprintln!(" → scatter: reduce");
let reduce_step = Step::from_file(&agent_dir.join("reduce.md"))?;
let reduce_input = format!("Original task: {input}\n\nWorker results:\n{combined}");
let reduce_ctx = ctx.child();
let reduce_crumb = format!("{crumb}→reduce");
let t = Instant::now();
let reduce_prev = last_worker_span.or_else(|| Some(map_span.clone()));
tracer.on_agent_start(
&reduce_ctx,
"reduce",
reduce_step.pattern_name(),
&reduce_input,
reduce_prev.as_deref(),
&crate::runtime::tracer::new_node_id(),
);
let reduce_out = reduce_step.run(&reduce_input, "reduce", registry, client, tracer, &reduce_ctx, &reduce_crumb).await?;
let reduce_result = AgentOutput { key: reduce_out.key, value: reduce_out.value, span_id: String::new() };
tracer.on_agent_end(
&reduce_ctx,
&reduce_result.key,
&reduce_result.value,
t.elapsed().as_millis(),
);
Ok(reduce_result)
}
fn build_map_system(body: &str, _: &[String]) -> String {
let scatter_instruction =
"When you have determined the full list of items to process in parallel, \
call finish(key=\"parallel\", value='[\"item1\",\"item2\",...]') with a JSON array of strings.\n\
If no fan-out is needed, call finish(key=\"done\", value=\"...\") as usual.\n\n";
format!("{scatter_instruction}{body}")
}