Skip to main content

missiond_runner/
runner.rs

1//! Claude CLI Runner implementation
2//!
3//! Wraps the `claude` CLI with stream-json output parsing.
4
5use crate::types::*;
6use std::process::Stdio;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use tokio::io::{AsyncBufReadExt, BufReader};
10use tokio::process::{Child, Command};
11use tokio::sync::Mutex;
12use tracing::{debug, warn};
13
14/// Claude Code CLI Runner
15///
16/// Wraps `claude --print --output-format stream-json` execution.
17pub struct ClaudeRunner {
18    /// Running process handle
19    process: Arc<Mutex<Option<Child>>>,
20    /// Cancellation flag
21    cancelled: Arc<AtomicBool>,
22}
23
24impl Default for ClaudeRunner {
25    fn default() -> Self {
26        Self::new()
27    }
28}
29
30impl ClaudeRunner {
31    /// Create a new runner
32    pub fn new() -> Self {
33        Self {
34            process: Arc::new(Mutex::new(None)),
35            cancelled: Arc::new(AtomicBool::new(false)),
36        }
37    }
38
39    /// Execute Claude CLI
40    pub async fn run(&self, options: RunOptions) -> Result<RunResult, RunnerError> {
41        let RunOptions {
42            prompt,
43            cwd,
44            session_id,
45            mcp_config,
46            timeout,
47            on_progress,
48        } = options;
49
50        // Reset cancellation flag
51        self.cancelled.store(false, Ordering::SeqCst);
52
53        // Build command arguments
54        let mut args = vec![
55            "--print".to_string(),
56            prompt,
57            "--output-format".to_string(),
58            "stream-json".to_string(),
59            "--verbose".to_string(),
60        ];
61
62        if let Some(sid) = session_id {
63            args.push("--resume".to_string());
64            args.push(sid);
65        }
66
67        if let Some(mcp) = mcp_config {
68            args.push("--mcp-config".to_string());
69            args.push(mcp.to_string_lossy().to_string());
70        }
71
72        // Build command
73        let mut cmd = Command::new("claude");
74        cmd.args(&args)
75            .stdin(Stdio::null())
76            .stdout(Stdio::piped())
77            .stderr(Stdio::piped())
78            .kill_on_drop(true);
79
80        if let Some(dir) = cwd {
81            cmd.current_dir(dir);
82        }
83
84        debug!(?args, "Starting Claude CLI");
85
86        // Spawn process
87        let mut child = cmd.spawn()?;
88
89        // Get stdout/stderr handles
90        let stdout = child
91            .stdout
92            .take()
93            .ok_or_else(|| RunnerError::CliFailed("Failed to capture stdout".into()))?;
94        let stderr = child
95            .stderr
96            .take()
97            .ok_or_else(|| RunnerError::CliFailed("Failed to capture stderr".into()))?;
98
99        // Store child for potential cancellation
100        *self.process.lock().await = Some(child);
101
102        // Parse stream-json output
103        let mut result_event: Option<ResultEvent> = None;
104        let mut errors: Vec<String> = Vec::new();
105
106        // Read stdout lines
107        let mut stdout_reader = BufReader::new(stdout).lines();
108        let mut stderr_reader = BufReader::new(stderr);
109
110        // Spawn stderr reader
111        let errors_handle = {
112            let errors = Arc::new(Mutex::new(errors.clone()));
113            let errors_ref = errors.clone();
114            tokio::spawn(async move {
115                let mut buf = String::new();
116                loop {
117                    buf.clear();
118                    match tokio::io::AsyncBufReadExt::read_line(&mut stderr_reader, &mut buf).await {
119                        Ok(0) => break,
120                        Ok(_) => {
121                            let line = buf.trim().to_string();
122                            if !line.is_empty() {
123                                errors_ref.lock().await.push(line);
124                            }
125                        }
126                        Err(_) => break,
127                    }
128                }
129                errors_ref.lock().await.clone()
130            })
131        };
132
133        // Process stdout with timeout
134        let cancelled = self.cancelled.clone();
135        let process_result = tokio::time::timeout(timeout, async {
136            while let Some(line) = stdout_reader.next_line().await? {
137                // Check cancellation
138                if cancelled.load(Ordering::SeqCst) {
139                    return Err(RunnerError::Cancelled);
140                }
141
142                if line.trim().is_empty() {
143                    continue;
144                }
145
146                // Parse JSON line
147                match serde_json::from_str::<serde_json::Value>(&line) {
148                    Ok(value) => {
149                        // Try to parse as StreamEvent
150                        if let Some(event_type) = value.get("type").and_then(|v| v.as_str()) {
151                            match event_type {
152                                "result" => {
153                                    if let Ok(evt) = serde_json::from_value::<ResultEvent>(value.clone()) {
154                                        result_event = Some(evt.clone());
155                                        if let Some(ref cb) = on_progress {
156                                            cb(StreamEvent::Result(evt));
157                                        }
158                                    }
159                                }
160                                "assistant" => {
161                                    if let Ok(evt) = serde_json::from_value::<AssistantEvent>(value.clone()) {
162                                        if let Some(ref cb) = on_progress {
163                                            cb(StreamEvent::Assistant(evt));
164                                        }
165                                    }
166                                }
167                                "user" => {
168                                    if let Ok(evt) = serde_json::from_value::<UserEvent>(value.clone()) {
169                                        if let Some(ref cb) = on_progress {
170                                            cb(StreamEvent::User(evt));
171                                        }
172                                    }
173                                }
174                                "system" => {
175                                    if let Ok(evt) = serde_json::from_value::<SystemEvent>(value.clone()) {
176                                        if let Some(ref cb) = on_progress {
177                                            cb(StreamEvent::System(evt));
178                                        }
179                                    }
180                                }
181                                _ => {
182                                    debug!(?event_type, "Unknown event type");
183                                }
184                            }
185                        }
186                    }
187                    Err(e) => {
188                        // Non-JSON line, might be debug output
189                        debug!(?line, ?e, "Non-JSON line from Claude CLI");
190                    }
191                }
192            }
193            Ok::<_, RunnerError>(())
194        })
195        .await;
196
197        // Wait for stderr reader
198        errors = errors_handle.await.unwrap_or_default();
199
200        // Wait for process to complete
201        let mut proc_guard = self.process.lock().await;
202        if let Some(mut child) = proc_guard.take() {
203            let _ = child.wait().await;
204        }
205
206        // Handle timeout
207        match process_result {
208            Err(_) => {
209                return Err(RunnerError::Timeout(timeout));
210            }
211            Ok(Err(e)) => {
212                return Err(e);
213            }
214            Ok(Ok(())) => {}
215        }
216
217        // Check for result
218        let result_event = result_event.ok_or_else(|| {
219            RunnerError::NoResult(errors.join("\n"))
220        })?;
221
222        // Convert to RunResult
223        Ok(RunResult {
224            session_id: result_event.session_id,
225            result: result_event.result,
226            is_error: result_event.is_error,
227            duration_ms: result_event.duration_ms,
228            duration_api_ms: result_event.duration_api_ms,
229            num_turns: result_event.num_turns,
230            total_cost_usd: result_event.total_cost_usd,
231            usage: result_event.usage.map(|u| Usage {
232                input_tokens: u.input_tokens,
233                output_tokens: u.output_tokens,
234            }),
235        })
236    }
237
238    /// Cancel the running process
239    pub async fn cancel(&self) {
240        // Set cancellation flag
241        self.cancelled.store(true, Ordering::SeqCst);
242
243        // Try to kill the process
244        let mut proc = self.process.lock().await;
245        if let Some(ref mut child) = *proc {
246            // Send SIGTERM first
247            if let Err(e) = child.kill().await {
248                warn!(?e, "Failed to kill Claude CLI process");
249            }
250        }
251    }
252
253    /// Check if a process is running
254    pub async fn is_running(&self) -> bool {
255        let proc = self.process.lock().await;
256        proc.is_some()
257    }
258}
259
260#[cfg(test)]
261mod tests {
262    use super::*;
263
264    #[test]
265    fn test_parse_result_event() {
266        let json = r#"{
267            "type": "result",
268            "subtype": "success",
269            "session_id": "abc123",
270            "result": "Hello, world!",
271            "is_error": false,
272            "duration_ms": 1000,
273            "duration_api_ms": 800,
274            "num_turns": 1,
275            "total_cost_usd": 0.001,
276            "usage": {
277                "input_tokens": 100,
278                "output_tokens": 50
279            }
280        }"#;
281
282        let event: ResultEvent = serde_json::from_str(json).unwrap();
283        assert_eq!(event.session_id, "abc123");
284        assert_eq!(event.result, "Hello, world!");
285        assert!(!event.is_error);
286        assert_eq!(event.num_turns, 1);
287    }
288
289    #[test]
290    fn test_parse_assistant_event() {
291        let json = r#"{
292            "type": "assistant",
293            "message": {
294                "content": [
295                    {"type": "text", "text": "Hello!"},
296                    {"type": "tool_use", "id": "tool1", "name": "bash", "input": {"command": "ls"}}
297                ]
298            }
299        }"#;
300
301        let value: serde_json::Value = serde_json::from_str(json).unwrap();
302        let event: AssistantEvent = serde_json::from_value(value).unwrap();
303        assert_eq!(event.message.content.len(), 2);
304    }
305}