scud/backend/cli.rs
//! CLI subprocess backend.
//!
//! Wraps the existing headless CLI spawning infrastructure (Claude Code,
//! OpenCode, Cursor) behind the [`AgentBackend`] trait.
5
6use anyhow::Result;
7use async_trait::async_trait;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::{
12 AgentBackend, AgentEvent, AgentHandle, AgentRequest, AgentResult, AgentStatus, ToolCallRecord,
13};
14use crate::commands::spawn::headless::events::StreamEventKind;
15use crate::commands::spawn::headless::runner::create_runner;
16use crate::commands::spawn::terminal::Harness;
17
18/// Backend that spawns a CLI subprocess (Claude Code, OpenCode, Cursor).
19pub struct CliBackend {
20 harness: Harness,
21}
22
23impl CliBackend {
24 /// Create a new CLI backend for the specified harness.
25 pub fn new(harness: Harness) -> Result<Self> {
26 // Validate that the binary exists early
27 let _ = create_runner(harness.clone())?;
28 Ok(Self { harness })
29 }
30}
31
32#[async_trait]
33impl AgentBackend for CliBackend {
34 async fn execute(&self, req: AgentRequest) -> Result<AgentHandle> {
35 let runner = create_runner(self.harness.clone())?;
36 let session = runner
37 .start(
38 "agent",
39 &req.prompt,
40 &req.working_dir,
41 req.model.as_deref(),
42 )
43 .await?;
44
45 // Destructure session to own both the events receiver and the process handle
46 let (mut stream_events, mut session_process) = session.into_parts();
47
48 let (tx, rx) = mpsc::channel(1000);
49 let cancel = CancellationToken::new();
50 let cancel_clone = cancel.clone();
51
52 // Bridge StreamEvent -> AgentEvent
53 tokio::spawn(async move {
54 let mut text_parts = Vec::new();
55 let mut tool_calls = Vec::new();
56
57 loop {
58 tokio::select! {
59 _ = cancel_clone.cancelled() => {
60 // Kill the subprocess on cancellation
61 let _ = session_process.kill();
62 let _ = tx.send(AgentEvent::Complete(AgentResult {
63 text: text_parts.join(""),
64 status: AgentStatus::Cancelled,
65 tool_calls,
66 usage: None,
67 })).await;
68 break;
69 }
70 event = stream_events.recv() => {
71 match event {
72 Some(stream_event) => {
73 let agent_event = match &stream_event.kind {
74 StreamEventKind::TextDelta { text } => {
75 text_parts.push(text.clone());
76 AgentEvent::TextDelta(text.clone())
77 }
78 StreamEventKind::ToolStart { tool_name, tool_id, .. } => {
79 tool_calls.push(ToolCallRecord {
80 id: tool_id.clone(),
81 name: tool_name.clone(),
82 output: String::new(),
83 });
84 AgentEvent::ToolCallStart {
85 id: tool_id.clone(),
86 name: tool_name.clone(),
87 }
88 }
89 StreamEventKind::ToolResult { tool_id, success, .. } => {
90 if let Some(record) = tool_calls.iter_mut().find(|r| r.id == *tool_id) {
91 record.output = if *success { "ok".into() } else { "error".into() };
92 }
93 AgentEvent::ToolCallEnd {
94 id: tool_id.clone(),
95 output: if *success { "ok".into() } else { "error".into() },
96 }
97 }
98 StreamEventKind::Complete { success } => {
99 let status = if *success {
100 AgentStatus::Completed
101 } else {
102 AgentStatus::Failed("Agent reported failure".into())
103 };
104 let _ = tx.send(AgentEvent::Complete(AgentResult {
105 text: text_parts.join(""),
106 status,
107 tool_calls: tool_calls.clone(),
108 usage: None,
109 })).await;
110 break;
111 }
112 StreamEventKind::Error { message } => {
113 AgentEvent::Error(message.clone())
114 }
115 StreamEventKind::SessionAssigned { .. } => continue,
116 };
117 if tx.send(agent_event).await.is_err() {
118 break;
119 }
120 }
121 None => {
122 // Stream ended without Complete event
123 let _ = tx.send(AgentEvent::Complete(AgentResult {
124 text: text_parts.join(""),
125 status: AgentStatus::Completed,
126 tool_calls,
127 usage: None,
128 })).await;
129 break;
130 }
131 }
132 }
133 }
134 }
135 // Keep session_process alive until bridge completes
136 drop(session_process);
137 });
138
139 Ok(AgentHandle { events: rx, cancel })
140 }
141}