Skip to main content

scud/backend/
cli.rs

1//! CLI subprocess backend.
2//!
3//! Wraps the existing headless CLI spawning infrastructure (Claude Code,
4//! OpenCode, Cursor) behind the [`AgentBackend`] trait.
5
6use anyhow::Result;
7use async_trait::async_trait;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::{
12    AgentBackend, AgentEvent, AgentHandle, AgentRequest, AgentResult, AgentStatus, ToolCallRecord,
13};
14use crate::commands::spawn::headless::events::StreamEventKind;
15use crate::commands::spawn::headless::runner::create_runner;
16use crate::commands::spawn::terminal::Harness;
17
/// Backend that spawns a CLI subprocess (Claude Code, OpenCode, Cursor).
///
/// The backend itself only remembers which harness to use; a runner for that
/// harness is created through `create_runner` whenever one is needed.
pub struct CliBackend {
    /// Harness passed to `create_runner` in `new` and again on every `execute` call.
    harness: Harness,
}
22
23fn reconcile_completion_status(stream_success: bool, process_ok: bool) -> AgentStatus {
24    if stream_success && process_ok {
25        AgentStatus::Completed
26    } else if process_ok {
27        AgentStatus::Failed("Agent reported failure".into())
28    } else {
29        AgentStatus::Failed("Agent process exited with non-zero status".into())
30    }
31}
32
33fn status_for_stream_end(process_ok: bool) -> AgentStatus {
34    if process_ok {
35        AgentStatus::Completed
36    } else {
37        AgentStatus::Failed("Agent process exited without completion event".into())
38    }
39}
40
41impl CliBackend {
42    /// Create a new CLI backend for the specified harness.
43    pub fn new(harness: Harness) -> Result<Self> {
44        // Validate that the binary exists early
45        let _ = create_runner(harness.clone())?;
46        Ok(Self { harness })
47    }
48}
49
50#[async_trait]
51impl AgentBackend for CliBackend {
52    async fn execute(&self, req: AgentRequest) -> Result<AgentHandle> {
53        let runner = create_runner(self.harness.clone())?;
54        let session = runner
55            .start("agent", &req.prompt, &req.working_dir, req.model.as_deref())
56            .await?;
57
58        // Destructure session to own both the events receiver and the process handle
59        let (mut stream_events, mut session_process) = session.into_parts();
60
61        let (tx, rx) = mpsc::channel(1000);
62        let cancel = CancellationToken::new();
63        let cancel_clone = cancel.clone();
64
65        // Bridge StreamEvent -> AgentEvent
66        tokio::spawn(async move {
67            let mut text_parts = Vec::new();
68            let mut tool_calls = Vec::new();
69
70            loop {
71                tokio::select! {
72                    _ = cancel_clone.cancelled() => {
73                        // Kill the subprocess on cancellation
74                        let _ = session_process.kill();
75                        let _ = tx.send(AgentEvent::Complete(AgentResult {
76                            text: text_parts.join(""),
77                            status: AgentStatus::Cancelled,
78                            tool_calls,
79                            usage: None,
80                        })).await;
81                        break;
82                    }
83                    event = stream_events.recv() => {
84                        match event {
85                            Some(stream_event) => {
86                                let agent_event = match &stream_event.kind {
87                                    StreamEventKind::TextDelta { text } => {
88                                        text_parts.push(text.clone());
89                                        AgentEvent::TextDelta(text.clone())
90                                    }
91                                    StreamEventKind::ToolStart { tool_name, tool_id, .. } => {
92                                        tool_calls.push(ToolCallRecord {
93                                            id: tool_id.clone(),
94                                            name: tool_name.clone(),
95                                            output: String::new(),
96                                        });
97                                        AgentEvent::ToolCallStart {
98                                            id: tool_id.clone(),
99                                            name: tool_name.clone(),
100                                        }
101                                    }
102                                    StreamEventKind::ToolResult { tool_id, success, .. } => {
103                                        if let Some(record) = tool_calls.iter_mut().find(|r| r.id == *tool_id) {
104                                            record.output = if *success { "ok".into() } else { "error".into() };
105                                        }
106                                        AgentEvent::ToolCallEnd {
107                                            id: tool_id.clone(),
108                                            output: if *success { "ok".into() } else { "error".into() },
109                                        }
110                                    }
111                                    StreamEventKind::Complete { success } => {
112                                        let process_ok = session_process.wait().await.unwrap_or(false);
113                                        let status = reconcile_completion_status(*success, process_ok);
114                                        let _ = tx.send(AgentEvent::Complete(AgentResult {
115                                            text: text_parts.join(""),
116                                            status,
117                                            tool_calls: tool_calls.clone(),
118                                            usage: None,
119                                        })).await;
120                                        break;
121                                    }
122                                    StreamEventKind::Error { message } => {
123                                        AgentEvent::Error(message.clone())
124                                    }
125                                    StreamEventKind::SessionAssigned { .. } => continue,
126                                };
127                                if tx.send(agent_event).await.is_err() {
128                                    break;
129                                }
130                            }
131                            None => {
132                                // Stream ended without explicit completion: use process exit status.
133                                let process_ok = session_process.wait().await.unwrap_or(false);
134                                let _ = tx.send(AgentEvent::Complete(AgentResult {
135                                    text: text_parts.join(""),
136                                    status: status_for_stream_end(process_ok),
137                                    tool_calls,
138                                    usage: None,
139                                })).await;
140                                break;
141                            }
142                        }
143                    }
144                }
145            }
146            // Keep session_process alive until bridge completes
147            drop(session_process);
148        });
149
150        Ok(AgentHandle { events: rx, cancel })
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// The process exit result takes precedence over the stream's own verdict,
    /// for every combination of the two flags.
    #[test]
    fn reconcile_completion_status_prefers_process_exit() {
        assert!(matches!(
            reconcile_completion_status(true, true),
            AgentStatus::Completed
        ));
        assert!(matches!(
            reconcile_completion_status(true, false),
            AgentStatus::Failed(msg) if msg.contains("non-zero")
        ));
        assert!(matches!(
            reconcile_completion_status(false, true),
            AgentStatus::Failed(msg) if msg.contains("reported failure")
        ));
        // Both signals negative: the process failure is still the one reported.
        assert!(matches!(
            reconcile_completion_status(false, false),
            AgentStatus::Failed(msg) if msg.contains("non-zero")
        ));
    }

    /// Without a completion event the process exit result alone decides the status.
    #[test]
    fn status_for_stream_end_uses_process_result() {
        assert!(matches!(
            status_for_stream_end(true),
            AgentStatus::Completed
        ));
        assert!(matches!(
            status_for_stream_end(false),
            AgentStatus::Failed(msg) if msg.contains("without completion event")
        ));
    }
}