use crate::agent::plan::workflow::{PhaseOutput, ReviewStep, next_review_step};
use crate::agent::runner::{AbortRunnerOnDrop, AgentRunner};
use crate::event::AgentEvent;
pub(crate) struct ActivePlan {
pub plan: String,
pub cycles_left: usize,
}
pub(crate) struct PlanKickoff {
pub impl_prompt: String,
pub active: ActivePlan,
}
pub(crate) enum PlanPhaseEvent {
Progress { text: String, error: bool },
Ready(Box<PlanKickoff>),
Aborted,
}
pub(crate) struct PlanPhaseHandle {
pub rx: tokio::sync::mpsc::Receiver<PlanPhaseEvent>,
pub task: tokio::task::JoinHandle<()>,
}
pub(crate) async fn collect_runner_text(runner: AgentRunner) -> PhaseOutput {
let AgentRunner {
event_rx,
task,
cancel_tx,
..
} = runner;
let _guard = AbortRunnerOnDrop { task, cancel_tx };
let mut rx = event_rx;
let mut text = String::new();
while let Some(event) = rx.recv().await {
match event {
AgentEvent::Token(t) => text.push_str(&t),
AgentEvent::Done { response, .. } => {
if !response.is_empty() {
text = response.to_string();
}
break;
}
AgentEvent::Error(msg) => return Err(msg.to_string()),
_ => {}
}
}
Ok(text)
}
pub(crate) enum ReviewPhaseEvent {
Done { result: Result<ReviewStep, String> },
}
pub(crate) struct ReviewPhaseHandle {
pub rx: tokio::sync::mpsc::Receiver<ReviewPhaseEvent>,
pub task: tokio::task::JoinHandle<()>,
pub plan: String,
pub cycles_left: usize,
pub response: String,
pub tool_calls: Vec<crate::session::ToolCallEntry>,
}
pub(crate) fn spawn_review(
runner: AgentRunner,
plan: String,
cycles_left: usize,
response: String,
tool_calls: Vec<crate::session::ToolCallEntry>,
) -> ReviewPhaseHandle {
let (tx, rx) = tokio::sync::mpsc::channel::<ReviewPhaseEvent>(1);
let task = tokio::spawn(async move {
let result = collect_runner_text(runner)
.await
.map(|review| next_review_step(&review, cycles_left));
let _ = tx.send(ReviewPhaseEvent::Done { result }).await;
});
ReviewPhaseHandle {
rx,
task,
plan,
cycles_left,
response,
tool_calls,
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::sync::mpsc;
fn runner_replaying(events: Vec<AgentEvent>) -> AgentRunner {
let (tx, event_rx) = mpsc::channel(events.len().max(1));
for e in events {
tx.try_send(e).expect("test channel sized to fit events");
}
drop(tx); let (interject_tx, _) = mpsc::channel(1);
let (cancel_tx, _) = mpsc::channel(1);
let task = tokio::spawn(async {});
AgentRunner {
event_rx,
task,
interject_tx,
cancel_tx,
}
}
#[tokio::test]
async fn accumulates_streamed_tokens_until_done() {
let runner = runner_replaying(vec![
AgentEvent::Token("hello ".into()),
AgentEvent::Token("world".into()),
AgentEvent::Done {
response: "".into(),
tokens: 0,
cost: 0.0,
},
]);
assert_eq!(collect_runner_text(runner).await.unwrap(), "hello world");
}
#[tokio::test]
async fn prefers_authoritative_done_response() {
let runner = runner_replaying(vec![
AgentEvent::Token("partial".into()),
AgentEvent::Done {
response: "the full final report".into(),
tokens: 10,
cost: 0.01,
},
]);
assert_eq!(
collect_runner_text(runner).await.unwrap(),
"the full final report"
);
}
#[tokio::test]
async fn error_event_surfaces_as_err() {
let runner = runner_replaying(vec![
AgentEvent::Token("some work".into()),
AgentEvent::Error("model exploded".into()),
]);
assert_eq!(
collect_runner_text(runner).await,
Err("model exploded".to_string())
);
}
#[tokio::test]
async fn stream_closed_without_done_returns_what_streamed() {
let runner = runner_replaying(vec![AgentEvent::Token("orphaned".into())]);
assert_eq!(collect_runner_text(runner).await.unwrap(), "orphaned");
}
#[tokio::test]
async fn spawn_review_emits_approved_step_off_thread() {
let runner = runner_replaying(vec![AgentEvent::Done {
response: "review done\n```json\n{\"verdict\":\"DONE\",\"missing\":\"\"}\n```".into(),
tokens: 0,
cost: 0.0,
}]);
let mut handle = spawn_review(runner, "the plan".to_string(), 3, String::new(), Vec::new());
assert_eq!(handle.plan, "the plan");
assert_eq!(handle.cycles_left, 3);
match handle.rx.recv().await.expect("a terminal event") {
ReviewPhaseEvent::Done { result } => {
assert!(matches!(result, Ok(ReviewStep::Approved)));
}
}
}
#[tokio::test]
async fn spawn_review_surfaces_reviewer_error() {
let runner = runner_replaying(vec![AgentEvent::Error("reviewer exploded".into())]);
let mut handle = spawn_review(runner, "p".to_string(), 1, String::new(), Vec::new());
match handle.rx.recv().await.expect("a terminal event") {
ReviewPhaseEvent::Done { result } => {
assert_eq!(result, Err("reviewer exploded".to_string()));
}
}
}
}