use crate::agent::plan::workflow::{
PhaseOutput, REVIEWER_TOOLS, ReviewStep, next_review_step, reviewer_prompt,
};
use crate::agent::runner::{AbortRunnerOnDrop, AgentRunner};
use crate::event::AgentEvent;
use crate::provider::AnyAgent;
/// A plan paired with a count of cycles still available to it.
pub(crate) struct ActivePlan {
/// The plan text.
pub plan: String,
/// Remaining cycle count. NOTE(review): presumably the review-cycle budget,
/// since `review_once` takes an argument of the same name — confirm against callers.
pub cycles_left: usize,
}
/// Payload carried by `PlanPhaseEvent::Ready`: a prompt string plus the
/// `ActivePlan` it belongs to.
pub(crate) struct PlanKickoff {
/// NOTE(review): presumably the prompt that starts the implementation phase
/// (inferred from the name only) — confirm against the producer.
pub impl_prompt: String,
/// The plan and its remaining cycle count.
pub active: ActivePlan,
}
/// Messages delivered through `PlanPhaseHandle::rx`.
pub(crate) enum PlanPhaseEvent {
/// A textual update; `error` flags the text as an error message.
/// NOTE(review): whether an error-flagged update also ends the phase is not
/// visible here — check the producer.
Progress { text: String, error: bool },
/// Carries a boxed `PlanKickoff`. NOTE(review): presumably signals that the
/// planning phase produced a plan to act on — confirm.
Ready(Box<PlanKickoff>),
/// NOTE(review): presumably signals that the phase stopped without producing
/// a kickoff — confirm.
Aborted,
}
/// Pairs a receiver of `PlanPhaseEvent`s with the join handle of a tokio task.
/// NOTE(review): presumably `task` is the task that feeds `rx` — confirm at the
/// construction site.
pub(crate) struct PlanPhaseHandle {
/// Receiving end of the `PlanPhaseEvent` channel.
pub rx: tokio::sync::mpsc::Receiver<PlanPhaseEvent>,
/// Join handle of the associated task; the task yields `()`.
pub task: tokio::task::JoinHandle<()>,
}
pub(crate) async fn collect_runner_text(runner: AgentRunner) -> PhaseOutput {
let AgentRunner {
event_rx,
task,
cancel_tx,
..
} = runner;
let _guard = AbortRunnerOnDrop { task, cancel_tx };
let mut rx = event_rx;
let mut text = String::new();
while let Some(event) = rx.recv().await {
match event {
AgentEvent::Token(t) => text.push_str(&t),
AgentEvent::Done { response, .. } => {
if !response.is_empty() {
text = response.to_string();
}
break;
}
AgentEvent::Error(msg) => return Err(msg.to_string()),
_ => {}
}
}
Ok(text)
}
/// Runs one reviewer pass and turns its output into the next `ReviewStep`.
///
/// Spawns a phase runner on `agent` with the reviewer prompt built from
/// `plan`, the given `transcript`, and `REVIEWER_TOOLS`; collects the runner's
/// text via `collect_runner_text`; then passes that text and `cycles_left` to
/// `next_review_step`.
///
/// # Errors
///
/// Returns the error string from `collect_runner_text` unchanged when the
/// runner reports an error.
pub(crate) async fn review_once(
    agent: &AnyAgent,
    plan: &str,
    transcript: String,
    cycles_left: usize,
) -> Result<ReviewStep, String> {
    let prompt = reviewer_prompt(plan);
    let reviewer = agent.spawn_phase_runner(prompt, transcript, REVIEWER_TOOLS);
    collect_runner_text(reviewer)
        .await
        .map(|review_text| next_review_step(&review_text, cycles_left))
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;

    /// Builds an `AgentRunner` whose event channel is pre-loaded with `events`
    /// and already closed, so the receiver yields them in order and then
    /// reports end-of-stream.
    ///
    /// The interject and cancel receivers are dropped right away, and the task
    /// is an empty future. Must be called from inside a tokio runtime because
    /// it spawns a task.
    fn runner_replaying(events: Vec<AgentEvent>) -> AgentRunner {
        // A tokio mpsc channel needs a capacity of at least one, even for an
        // empty event list.
        let capacity = events.len().max(1);
        let (event_tx, event_rx) = mpsc::channel(capacity);
        for event in events {
            event_tx
                .try_send(event)
                .expect("test channel sized to fit events");
        }
        // Dropping the only sender closes the stream after the queued events.
        drop(event_tx);

        let task = tokio::spawn(async {});
        let (interject_tx, _) = mpsc::channel(1);
        let (cancel_tx, _) = mpsc::channel(1);

        AgentRunner {
            event_rx,
            task,
            interject_tx,
            cancel_tx,
        }
    }

    /// An empty `Done` response leaves the concatenated tokens as the result.
    #[tokio::test]
    async fn accumulates_streamed_tokens_until_done() {
        let events = vec![
            AgentEvent::Token("hello ".into()),
            AgentEvent::Token("world".into()),
            AgentEvent::Done {
                response: "".into(),
                tokens: 0,
                cost: 0.0,
            },
        ];
        let collected = collect_runner_text(runner_replaying(events)).await;
        assert_eq!(collected.unwrap(), "hello world");
    }

    /// A non-empty `Done` response replaces whatever was streamed before it.
    #[tokio::test]
    async fn prefers_authoritative_done_response() {
        let events = vec![
            AgentEvent::Token("partial".into()),
            AgentEvent::Done {
                response: "the full final report".into(),
                tokens: 10,
                cost: 0.01,
            },
        ];
        let collected = collect_runner_text(runner_replaying(events)).await;
        assert_eq!(collected.unwrap(), "the full final report");
    }

    /// An `Error` event becomes `Err`, discarding any text streamed so far.
    #[tokio::test]
    async fn error_event_surfaces_as_err() {
        let events = vec![
            AgentEvent::Token("some work".into()),
            AgentEvent::Error("model exploded".into()),
        ];
        let outcome = collect_runner_text(runner_replaying(events)).await;
        assert_eq!(outcome, Err("model exploded".to_string()));
    }

    /// A stream that closes without `Done` still yields the streamed text.
    #[tokio::test]
    async fn stream_closed_without_done_returns_what_streamed() {
        let events = vec![AgentEvent::Token("orphaned".into())];
        let collected = collect_runner_text(runner_replaying(events)).await;
        assert_eq!(collected.unwrap(), "orphaned");
    }
}