missiond_runner/
runner.rs1use crate::types::*;
6use std::process::Stdio;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use tokio::io::{AsyncBufReadExt, BufReader};
10use tokio::process::{Child, Command};
11use tokio::sync::Mutex;
12use tracing::{debug, warn};
13
14pub struct ClaudeRunner {
18 process: Arc<Mutex<Option<Child>>>,
20 cancelled: Arc<AtomicBool>,
22}
23
24impl Default for ClaudeRunner {
25 fn default() -> Self {
26 Self::new()
27 }
28}
29
30impl ClaudeRunner {
31 pub fn new() -> Self {
33 Self {
34 process: Arc::new(Mutex::new(None)),
35 cancelled: Arc::new(AtomicBool::new(false)),
36 }
37 }
38
39 pub async fn run(&self, options: RunOptions) -> Result<RunResult, RunnerError> {
41 let RunOptions {
42 prompt,
43 cwd,
44 session_id,
45 mcp_config,
46 timeout,
47 on_progress,
48 } = options;
49
50 self.cancelled.store(false, Ordering::SeqCst);
52
53 let mut args = vec![
55 "--print".to_string(),
56 prompt,
57 "--output-format".to_string(),
58 "stream-json".to_string(),
59 "--verbose".to_string(),
60 ];
61
62 if let Some(sid) = session_id {
63 args.push("--resume".to_string());
64 args.push(sid);
65 }
66
67 if let Some(mcp) = mcp_config {
68 args.push("--mcp-config".to_string());
69 args.push(mcp.to_string_lossy().to_string());
70 }
71
72 let mut cmd = Command::new("claude");
74 cmd.args(&args)
75 .stdin(Stdio::null())
76 .stdout(Stdio::piped())
77 .stderr(Stdio::piped())
78 .kill_on_drop(true);
79
80 if let Some(dir) = cwd {
81 cmd.current_dir(dir);
82 }
83
84 debug!(?args, "Starting Claude CLI");
85
86 let mut child = cmd.spawn()?;
88
89 let stdout = child
91 .stdout
92 .take()
93 .ok_or_else(|| RunnerError::CliFailed("Failed to capture stdout".into()))?;
94 let stderr = child
95 .stderr
96 .take()
97 .ok_or_else(|| RunnerError::CliFailed("Failed to capture stderr".into()))?;
98
99 *self.process.lock().await = Some(child);
101
102 let mut result_event: Option<ResultEvent> = None;
104 let mut errors: Vec<String> = Vec::new();
105
106 let mut stdout_reader = BufReader::new(stdout).lines();
108 let mut stderr_reader = BufReader::new(stderr);
109
110 let errors_handle = {
112 let errors = Arc::new(Mutex::new(errors.clone()));
113 let errors_ref = errors.clone();
114 tokio::spawn(async move {
115 let mut buf = String::new();
116 loop {
117 buf.clear();
118 match tokio::io::AsyncBufReadExt::read_line(&mut stderr_reader, &mut buf).await {
119 Ok(0) => break,
120 Ok(_) => {
121 let line = buf.trim().to_string();
122 if !line.is_empty() {
123 errors_ref.lock().await.push(line);
124 }
125 }
126 Err(_) => break,
127 }
128 }
129 errors_ref.lock().await.clone()
130 })
131 };
132
133 let cancelled = self.cancelled.clone();
135 let process_result = tokio::time::timeout(timeout, async {
136 while let Some(line) = stdout_reader.next_line().await? {
137 if cancelled.load(Ordering::SeqCst) {
139 return Err(RunnerError::Cancelled);
140 }
141
142 if line.trim().is_empty() {
143 continue;
144 }
145
146 match serde_json::from_str::<serde_json::Value>(&line) {
148 Ok(value) => {
149 if let Some(event_type) = value.get("type").and_then(|v| v.as_str()) {
151 match event_type {
152 "result" => {
153 if let Ok(evt) = serde_json::from_value::<ResultEvent>(value.clone()) {
154 result_event = Some(evt.clone());
155 if let Some(ref cb) = on_progress {
156 cb(StreamEvent::Result(evt));
157 }
158 }
159 }
160 "assistant" => {
161 if let Ok(evt) = serde_json::from_value::<AssistantEvent>(value.clone()) {
162 if let Some(ref cb) = on_progress {
163 cb(StreamEvent::Assistant(evt));
164 }
165 }
166 }
167 "user" => {
168 if let Ok(evt) = serde_json::from_value::<UserEvent>(value.clone()) {
169 if let Some(ref cb) = on_progress {
170 cb(StreamEvent::User(evt));
171 }
172 }
173 }
174 "system" => {
175 if let Ok(evt) = serde_json::from_value::<SystemEvent>(value.clone()) {
176 if let Some(ref cb) = on_progress {
177 cb(StreamEvent::System(evt));
178 }
179 }
180 }
181 _ => {
182 debug!(?event_type, "Unknown event type");
183 }
184 }
185 }
186 }
187 Err(e) => {
188 debug!(?line, ?e, "Non-JSON line from Claude CLI");
190 }
191 }
192 }
193 Ok::<_, RunnerError>(())
194 })
195 .await;
196
197 errors = errors_handle.await.unwrap_or_default();
199
200 let mut proc_guard = self.process.lock().await;
202 if let Some(mut child) = proc_guard.take() {
203 let _ = child.wait().await;
204 }
205
206 match process_result {
208 Err(_) => {
209 return Err(RunnerError::Timeout(timeout));
210 }
211 Ok(Err(e)) => {
212 return Err(e);
213 }
214 Ok(Ok(())) => {}
215 }
216
217 let result_event = result_event.ok_or_else(|| {
219 RunnerError::NoResult(errors.join("\n"))
220 })?;
221
222 Ok(RunResult {
224 session_id: result_event.session_id,
225 result: result_event.result,
226 is_error: result_event.is_error,
227 duration_ms: result_event.duration_ms,
228 duration_api_ms: result_event.duration_api_ms,
229 num_turns: result_event.num_turns,
230 total_cost_usd: result_event.total_cost_usd,
231 usage: result_event.usage.map(|u| Usage {
232 input_tokens: u.input_tokens,
233 output_tokens: u.output_tokens,
234 }),
235 })
236 }
237
238 pub async fn cancel(&self) {
240 self.cancelled.store(true, Ordering::SeqCst);
242
243 let mut proc = self.process.lock().await;
245 if let Some(ref mut child) = *proc {
246 if let Err(e) = child.kill().await {
248 warn!(?e, "Failed to kill Claude CLI process");
249 }
250 }
251 }
252
253 pub async fn is_running(&self) -> bool {
255 let proc = self.process.lock().await;
256 proc.is_some()
257 }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// A complete `result` payload deserializes into `ResultEvent` with the
    /// session id, result text, error flag and turn count intact.
    #[test]
    fn test_parse_result_event() {
        let payload = r#"{
            "type": "result",
            "subtype": "success",
            "session_id": "abc123",
            "result": "Hello, world!",
            "is_error": false,
            "duration_ms": 1000,
            "duration_api_ms": 800,
            "num_turns": 1,
            "total_cost_usd": 0.001,
            "usage": {
                "input_tokens": 100,
                "output_tokens": 50
            }
        }"#;

        let parsed = serde_json::from_str::<ResultEvent>(payload).unwrap();

        assert_eq!(parsed.session_id, "abc123");
        assert_eq!(parsed.result, "Hello, world!");
        assert!(!parsed.is_error);
        assert_eq!(parsed.num_turns, 1);
    }

    /// An `assistant` payload with a text block and a tool-use block yields
    /// two content entries when decoded from an intermediate JSON value.
    #[test]
    fn test_parse_assistant_event() {
        let payload = r#"{
            "type": "assistant",
            "message": {
                "content": [
                    {"type": "text", "text": "Hello!"},
                    {"type": "tool_use", "id": "tool1", "name": "bash", "input": {"command": "ls"}}
                ]
            }
        }"#;

        let raw = serde_json::from_str::<serde_json::Value>(payload).unwrap();
        let parsed = serde_json::from_value::<AssistantEvent>(raw).unwrap();

        assert_eq!(parsed.message.content.len(), 2);
    }
}