1use anyhow::Result;
7use async_trait::async_trait;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::{
12 AgentBackend, AgentEvent, AgentHandle, AgentRequest, AgentResult, AgentStatus, ToolCallRecord,
13};
14use crate::commands::spawn::headless::events::StreamEventKind;
15use crate::commands::spawn::headless::runner::create_runner;
16use crate::commands::spawn::terminal::Harness;
17
18pub struct CliBackend {
20 harness: Harness,
21}
22
23fn reconcile_completion_status(stream_success: bool, process_ok: bool) -> AgentStatus {
24 if stream_success && process_ok {
25 AgentStatus::Completed
26 } else if process_ok {
27 AgentStatus::Failed("Agent reported failure".into())
28 } else {
29 AgentStatus::Failed("Agent process exited with non-zero status".into())
30 }
31}
32
33fn status_for_stream_end(process_ok: bool) -> AgentStatus {
34 if process_ok {
35 AgentStatus::Completed
36 } else {
37 AgentStatus::Failed("Agent process exited without completion event".into())
38 }
39}
40
41impl CliBackend {
42 pub fn new(harness: Harness) -> Result<Self> {
44 let _ = create_runner(harness.clone())?;
46 Ok(Self { harness })
47 }
48}
49
50#[async_trait]
51impl AgentBackend for CliBackend {
52 async fn execute(&self, req: AgentRequest) -> Result<AgentHandle> {
53 let runner = create_runner(self.harness.clone())?;
54 let session = runner
55 .start("agent", &req.prompt, &req.working_dir, req.model.as_deref())
56 .await?;
57
58 let (mut stream_events, mut session_process) = session.into_parts();
60
61 let (tx, rx) = mpsc::channel(1000);
62 let cancel = CancellationToken::new();
63 let cancel_clone = cancel.clone();
64
65 tokio::spawn(async move {
67 let mut text_parts = Vec::new();
68 let mut tool_calls = Vec::new();
69
70 loop {
71 tokio::select! {
72 _ = cancel_clone.cancelled() => {
73 let _ = session_process.kill();
75 let _ = tx.send(AgentEvent::Complete(AgentResult {
76 text: text_parts.join(""),
77 status: AgentStatus::Cancelled,
78 tool_calls,
79 usage: None,
80 })).await;
81 break;
82 }
83 event = stream_events.recv() => {
84 match event {
85 Some(stream_event) => {
86 let agent_event = match &stream_event.kind {
87 StreamEventKind::TextDelta { text } => {
88 text_parts.push(text.clone());
89 AgentEvent::TextDelta(text.clone())
90 }
91 StreamEventKind::ToolStart { tool_name, tool_id, .. } => {
92 tool_calls.push(ToolCallRecord {
93 id: tool_id.clone(),
94 name: tool_name.clone(),
95 output: String::new(),
96 });
97 AgentEvent::ToolCallStart {
98 id: tool_id.clone(),
99 name: tool_name.clone(),
100 }
101 }
102 StreamEventKind::ToolResult { tool_id, success, .. } => {
103 if let Some(record) = tool_calls.iter_mut().find(|r| r.id == *tool_id) {
104 record.output = if *success { "ok".into() } else { "error".into() };
105 }
106 AgentEvent::ToolCallEnd {
107 id: tool_id.clone(),
108 output: if *success { "ok".into() } else { "error".into() },
109 }
110 }
111 StreamEventKind::Complete { success } => {
112 let process_ok = session_process.wait().await.unwrap_or(false);
113 let status = reconcile_completion_status(*success, process_ok);
114 let _ = tx.send(AgentEvent::Complete(AgentResult {
115 text: text_parts.join(""),
116 status,
117 tool_calls: tool_calls.clone(),
118 usage: None,
119 })).await;
120 break;
121 }
122 StreamEventKind::Error { message } => {
123 AgentEvent::Error(message.clone())
124 }
125 StreamEventKind::SessionAssigned { .. } => continue,
126 };
127 if tx.send(agent_event).await.is_err() {
128 break;
129 }
130 }
131 None => {
132 let process_ok = session_process.wait().await.unwrap_or(false);
134 let _ = tx.send(AgentEvent::Complete(AgentResult {
135 text: text_parts.join(""),
136 status: status_for_stream_end(process_ok),
137 tool_calls,
138 usage: None,
139 })).await;
140 break;
141 }
142 }
143 }
144 }
145 }
146 drop(session_process);
148 });
149
150 Ok(AgentHandle { events: rx, cancel })
151 }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// Both signals positive gives `Completed`; any failed process exit wins over the
    /// stream's verdict.
    #[test]
    fn reconcile_completion_status_prefers_process_exit() {
        assert!(matches!(
            reconcile_completion_status(true, true),
            AgentStatus::Completed
        ));

        // (stream_success, process_ok, fragment expected in the failure message)
        let failing_cases = [
            (true, false, "non-zero"),
            (false, true, "reported failure"),
            // Both signals negative: the process exit message must win.
            (false, false, "non-zero"),
        ];
        for (stream_success, process_ok, fragment) in failing_cases {
            let status = reconcile_completion_status(stream_success, process_ok);
            assert!(
                matches!(&status, AgentStatus::Failed(msg) if msg.contains(fragment)),
                "unexpected status for stream_success={}, process_ok={}",
                stream_success,
                process_ok
            );
        }
    }

    /// Without a completion event, the process exit result alone decides the status.
    #[test]
    fn status_for_stream_end_uses_process_result() {
        let clean_exit = status_for_stream_end(true);
        assert!(matches!(clean_exit, AgentStatus::Completed));

        let failed_exit = status_for_stream_end(false);
        assert!(matches!(
            &failed_exit,
            AgentStatus::Failed(msg) if msg.contains("without completion event")
        ));
    }
}