Skip to main content

aprender_mcp/tools/
subprocess.rs

1//! Shared subprocess wrapper for M2/M3 tools.
2//!
3//! Every M2 tool spawns `apr <subcommand> [...args] --json` and passes stdout
4//! through to the MCP client verbatim. Non-zero exit maps to `isError: true`
5//! with stderr attached. This module centralizes that pattern so each tool is
6//! a thin definition + a list of CLI args.
7//!
8//! M3 (FALSIFY-MCP-006) adds [`run_apr_cancellable`], which polls a
9//! [`std::sync::mpsc::Receiver`] between `try_wait` checks and escalates to
10//! SIGTERM → (grace window) → SIGKILL on the spawned subprocess when a
11//! cancellation is signalled. The non-cancellable [`run_apr`] is kept as a
12//! thin wrapper for tools that don't support cancellation yet.
13
14use crate::types::ToolCallResult;
15use std::io::{BufRead, BufReader, Read};
16use std::process::{Command, Stdio};
17use std::sync::mpsc::{Receiver, TryRecvError};
18use std::time::{Duration, Instant};
19
/// Grace window, in milliseconds, that a cancelled call waits between sending
/// SIGTERM and escalating to SIGKILL.
///
/// Mirrors `docs/specifications/apr-mcp-server-spec.md`:
/// > `notifications/cancelled` from client → kill the spawned `apr`
/// > subprocess with SIGTERM (30s grace) → SIGKILL.
pub const CANCEL_GRACE_MS: u64 = 30 * 1_000;
26
/// How long the wait loops sleep between successive checks for subprocess
/// exit or a pending cancel signal.
const POLL_INTERVAL: Duration = Duration::from_millis(10);
29
30/// Spawn `apr <args...>` and wait synchronously. Shorthand for the
31/// non-cancellable path used by every tool except `apr.run`.
32///
33/// - Successful exit with non-empty stdout → `success(stdout)`
34/// - Successful exit with empty stdout → `error("apr ... produced no output")`
35/// - Non-zero exit → `error("apr ... failed (exit N): <stderr-or-stdout>")`
36/// - Spawn failure → `error("Failed to spawn apr ...: <io-err>")`
37#[must_use]
38pub fn run_apr(args: &[&str]) -> ToolCallResult {
39    let output = match Command::new("apr").args(args).output() {
40        Ok(o) => o,
41        Err(e) => {
42            let cmd = format!("apr {}", args.join(" "));
43            return ToolCallResult::error(format!("Failed to spawn `{cmd}`: {e}"));
44        }
45    };
46
47    let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
48    let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
49
50    if output.status.success() {
51        if stdout.trim().is_empty() {
52            let cmd = format!("apr {}", args.join(" "));
53            ToolCallResult::error(format!("`{cmd}` produced no output"))
54        } else {
55            ToolCallResult::success(stdout)
56        }
57    } else {
58        let code = output.status.code().unwrap_or(-1);
59        let detail = if stderr.trim().is_empty() {
60            stdout
61        } else {
62            stderr
63        };
64        let cmd = format!("apr {}", args.join(" "));
65        ToolCallResult::error(format!("`{cmd}` failed (exit {code}): {detail}"))
66    }
67}
68
69/// Spawn `apr <args...>` cancellable via `cancel_rx`.
70///
71/// On receipt of any value on `cancel_rx`, the subprocess is sent SIGTERM.
72/// If it hasn't exited within `grace_ms` milliseconds, SIGKILL is sent. The
73/// returned `ToolCallResult` carries whatever stdout was captured up to the
74/// point of cancellation and has `is_error: Some(true)` with a message that
75/// starts with `"Cancelled:"`.
76///
77/// Non-Unix targets do NOT support signalling; this function falls back to
78/// `child.kill()` (equivalent to SIGKILL on Windows).
79#[must_use]
80pub fn run_apr_cancellable(
81    args: &[&str],
82    cancel_rx: &Receiver<()>,
83    grace_ms: u64,
84) -> ToolCallResult {
85    spawn_cancellable("apr", args, cancel_rx, grace_ms)
86}
87
88/// Test-visible generic over the binary name. `run_apr_cancellable` is the
89/// `"apr"`-bound wrapper clients should use in production code.
90#[must_use]
91pub fn spawn_cancellable(
92    program: &str,
93    args: &[&str],
94    cancel_rx: &Receiver<()>,
95    grace_ms: u64,
96) -> ToolCallResult {
97    let cmd_display = format!("{program} {}", args.join(" "));
98
99    let mut child = match Command::new(program)
100        .args(args)
101        .stdout(Stdio::piped())
102        .stderr(Stdio::piped())
103        .spawn()
104    {
105        Ok(c) => c,
106        Err(e) => {
107            return ToolCallResult::error(format!("Failed to spawn `{cmd_display}`: {e}"));
108        }
109    };
110
111    let pid = child.id();
112
113    // Poll loop: check if the child exited, then check for a cancel signal.
114    // Sleep `POLL_INTERVAL` between iterations. This keeps cancel latency
115    // under ~10ms while the subprocess is alive.
116    let wait_status = loop {
117        match child.try_wait() {
118            Ok(Some(status)) => break Ok(status),
119            Ok(None) => {}
120            Err(e) => {
121                return ToolCallResult::error(format!("Failed to poll `{cmd_display}`: {e}"));
122            }
123        }
124
125        match cancel_rx.try_recv() {
126            Ok(()) => break Err(CancelReason::Signalled),
127            Err(TryRecvError::Empty) => {}
128            Err(TryRecvError::Disconnected) => {
129                // Sender dropped without a cancel — treat as "no cancellation
130                // will ever come" and just wait for natural exit.
131            }
132        }
133
134        std::thread::sleep(POLL_INTERVAL);
135    };
136
137    match wait_status {
138        Ok(status) => {
139            // Natural exit: drain pipes and map to success/error.
140            let stdout = drain(&mut child.stdout.take());
141            let stderr = drain(&mut child.stderr.take());
142            if status.success() {
143                if stdout.trim().is_empty() {
144                    ToolCallResult::error(format!("`{cmd_display}` produced no output"))
145                } else {
146                    ToolCallResult::success(stdout)
147                }
148            } else {
149                let code = status.code().unwrap_or(-1);
150                let detail = if stderr.trim().is_empty() {
151                    stdout
152                } else {
153                    stderr
154                };
155                ToolCallResult::error(format!("`{cmd_display}` failed (exit {code}): {detail}"))
156            }
157        }
158        Err(CancelReason::Signalled) => {
159            // SIGTERM, grace window, then SIGKILL.
160            send_sigterm(pid);
161            let deadline = Instant::now() + Duration::from_millis(grace_ms);
162            let mut escalated = false;
163            loop {
164                match child.try_wait() {
165                    Ok(Some(_)) => break,
166                    Ok(None) => {}
167                    Err(_) => break,
168                }
169                if Instant::now() >= deadline {
170                    if !escalated {
171                        // Best-effort kill (SIGKILL on Unix, TerminateProcess
172                        // on Windows). Ignore errors — the process may have
173                        // exited between try_wait and here.
174                        let _ = child.kill();
175                        escalated = true;
176                    } else {
177                        // Even SIGKILL hasn't reaped it — give up after a
178                        // short extra window to avoid hanging the main
179                        // thread forever. In practice SIGKILL is immediate.
180                        break;
181                    }
182                }
183                std::thread::sleep(POLL_INTERVAL);
184            }
185            // Reap if still alive (keeps us from leaking a zombie).
186            let _ = child.wait();
187
188            let stdout = drain(&mut child.stdout.take());
189            let preview = truncate_for_preview(&stdout);
190            ToolCallResult::error(format!(
191                "Cancelled: `{cmd_display}` terminated by notifications/cancelled; partial stdout: {preview}"
192            ))
193        }
194    }
195}
196
/// Why the poll loop in `spawn_cancellable` stopped before the subprocess
/// exited on its own.
enum CancelReason {
    /// A value arrived on the caller-supplied cancel channel.
    Signalled,
}
200
/// Read everything remaining in `reader` and return it as text.
///
/// - `None` yields an empty string.
/// - Bytes are decoded lossily (invalid UTF-8 becomes U+FFFD), matching how
///   `run_apr` decodes subprocess output. Strict decoding would throw away
///   the entire output because of a single bad byte.
/// - A read error is deliberately ignored: whatever was read before the
///   error is still returned.
fn drain<R: Read>(reader: &mut Option<R>) -> String {
    let mut bytes = Vec::new();
    if let Some(r) = reader.as_mut() {
        // Best-effort: `read_to_end` keeps the bytes gathered before a failure.
        let _ = r.read_to_end(&mut bytes);
    }
    String::from_utf8_lossy(&bytes).into_owned()
}
208
/// Shorten `s` to at most 512 characters for inclusion in an error message.
///
/// Text within the limit is returned unchanged; longer text is cut after the
/// 512th character and suffixed with `"… (truncated)"`. The limit is measured
/// in characters throughout, so multi-byte text is never split mid-character
/// and never labelled as truncated when nothing was removed.
fn truncate_for_preview(s: &str) -> String {
    const MAX: usize = 512;
    // Fast path: a string of at most MAX bytes has at most MAX characters.
    if s.len() <= MAX {
        return s.to_string();
    }
    // Byte offset of the first character past the limit, if there is one.
    match s.char_indices().nth(MAX) {
        Some((cut, _)) => format!("{}… (truncated)", &s[..cut]),
        None => s.to_string(),
    }
}
218
219#[cfg(unix)]
220fn send_sigterm(pid: u32) {
221    use nix::sys::signal::{kill, Signal};
222    use nix::unistd::Pid;
223
224    // Cast is safe: u32 → i32 for PIDs in the valid pid_t range. Values
225    // above i32::MAX would be invalid PIDs on every Unix we support, so
226    // saturating is fine — `kill` will just fail with EINVAL and we move
227    // on to the SIGKILL branch.
228    #[allow(clippy::cast_possible_wrap)]
229    let raw = pid as i32;
230    let _ = kill(Pid::from_raw(raw), Signal::SIGTERM);
231}
232
/// No-op stand-in for targets without Unix signals.
#[cfg(not(unix))]
fn send_sigterm(_pid: u32) {
    // There is no SIGTERM to deliver here (e.g. on Windows). The caller's
    // escalation path stops the process with `child.kill()` instead.
}
238
239/// Spawn `apr <args...>` and stream stdout line-by-line to `on_line`.
240///
241/// FALSIFY-MCP-PROGRESS-001: this is the streaming variant used by tools that
242/// emit `notifications/progress` (currently `apr.finetune`). Each line of
243/// stdout (as written by `apr <cmd> --json`) is passed to `on_line`
244/// synchronously before the next `read_line` — the caller is responsible for
245/// emitting the notification.
246///
247/// Returns a `ToolCallResult` whose body is the concatenated stdout (the same
248/// shape as [`run_apr`] would have produced), so callers can keep the
249/// existing "final payload" semantics while layering progress on top.
250#[must_use]
251pub fn run_apr_streaming<F>(args: &[&str], on_line: F) -> ToolCallResult
252where
253    F: FnMut(&str),
254{
255    spawn_streaming("apr", args, on_line)
256}
257
258/// Generic-over-program variant of [`run_apr_streaming`] used by tests that
259/// need to inject a mock subprocess.
260#[must_use]
261pub fn spawn_streaming<F>(program: &str, args: &[&str], mut on_line: F) -> ToolCallResult
262where
263    F: FnMut(&str),
264{
265    let cmd_display = format!("{program} {}", args.join(" "));
266
267    let mut child = match Command::new(program)
268        .args(args)
269        .stdout(Stdio::piped())
270        .stderr(Stdio::piped())
271        .spawn()
272    {
273        Ok(c) => c,
274        Err(e) => {
275            return ToolCallResult::error(format!("Failed to spawn `{cmd_display}`: {e}"));
276        }
277    };
278
279    // Take stdout so we can wrap it in a BufReader. Leaving stderr attached
280    // to the child means we can drain it after wait() for the error path.
281    let stdout_pipe = match child.stdout.take() {
282        Some(p) => p,
283        None => {
284            let _ = child.wait();
285            return ToolCallResult::error(format!("Failed to capture stdout of `{cmd_display}`"));
286        }
287    };
288
289    let mut accumulated = String::new();
290    let reader = BufReader::new(stdout_pipe);
291    for line in reader.lines() {
292        match line {
293            Ok(text) => {
294                on_line(&text);
295                accumulated.push_str(&text);
296                accumulated.push('\n');
297            }
298            Err(e) => {
299                // Best-effort: surface the read error but still try to reap
300                // the child so we don't leak a zombie.
301                let _ = child.wait();
302                return ToolCallResult::error(format!(
303                    "Failed to read stdout of `{cmd_display}`: {e}"
304                ));
305            }
306        }
307    }
308
309    // stdout closed → subprocess is either exited or about to. Wait for the
310    // exit status so we can map success/failure correctly.
311    let status = match child.wait() {
312        Ok(s) => s,
313        Err(e) => {
314            return ToolCallResult::error(format!("Failed to reap `{cmd_display}`: {e}"));
315        }
316    };
317
318    let stderr = drain(&mut child.stderr.take());
319
320    if status.success() {
321        if accumulated.trim().is_empty() {
322            ToolCallResult::error(format!("`{cmd_display}` produced no output"))
323        } else {
324            ToolCallResult::success(accumulated)
325        }
326    } else {
327        let code = status.code().unwrap_or(-1);
328        let detail = if stderr.trim().is_empty() {
329            accumulated
330        } else {
331            stderr
332        };
333        ToolCallResult::error(format!("`{cmd_display}` failed (exit {code}): {detail}"))
334    }
335}
336
/// Unit tests for the subprocess wrappers.
///
/// Tests that spawn POSIX utilities (`echo`, `printf`, `sh`, `sleep`) are
/// gated on `cfg(unix)` because those programs are not available as
/// executables on other targets.
#[cfg(test)]
#[allow(clippy::disallowed_methods)] // test helpers
mod tests {
    use super::*;
    use std::sync::mpsc;

    /// Spawning `apr` with an unrecognised subcommand yields a tool error
    /// (non-zero exit, or a spawn failure when `apr` is not installed), not
    /// a panic.
    #[test]
    fn spawn_failure_maps_to_tool_error() {
        let result = run_apr(&["this-subcommand-does-not-exist"]);
        assert_eq!(result.is_error, Some(true));
    }

    /// Cancellable path: a never-firing receiver lets the subprocess run to
    /// natural completion, producing identical behaviour to `run_apr`.
    #[test]
    #[cfg(unix)]
    fn cancellable_natural_exit_matches_run_apr() {
        let (_tx, rx) = mpsc::channel::<()>();
        let result = spawn_cancellable("echo", &["hello"], &rx, CANCEL_GRACE_MS);
        assert!(result.is_error.is_none(), "echo should succeed");
        assert!(result.content[0].text.contains("hello"));
    }

    /// Cancellable path: a disconnected receiver (sender dropped) is
    /// equivalent to "no cancellation will arrive" — behaviour should not
    /// change vs the never-firing channel.
    #[test]
    #[cfg(unix)]
    fn cancellable_disconnected_channel_is_noop() {
        let (tx, rx) = mpsc::channel::<()>();
        drop(tx);
        let result = spawn_cancellable("echo", &["world"], &rx, CANCEL_GRACE_MS);
        assert!(result.is_error.is_none());
        assert!(result.content[0].text.contains("world"));
    }

    /// Spawning a missing binary returns a spawn error without panic.
    #[test]
    fn cancellable_spawn_failure_maps_to_error() {
        let (_tx, rx) = mpsc::channel::<()>();
        let result = spawn_cancellable(
            "/this/binary/does/not/exist/apr-mcp-test",
            &[],
            &rx,
            CANCEL_GRACE_MS,
        );
        assert_eq!(result.is_error, Some(true));
        assert!(result.content[0].text.contains("Failed to spawn"));
    }

    /// FALSIFY-MCP-PROGRESS-001 (unit): `spawn_streaming` fires the callback
    /// once per stdout line before returning the aggregated payload.
    #[test]
    #[cfg(unix)]
    fn streaming_invokes_callback_per_line() {
        // The callback is `FnMut`, so it can push into a local directly.
        let mut captured = Vec::<String>::new();
        let result = spawn_streaming("printf", &["line1\nline2\nline3\n"], |line| {
            captured.push(line.to_string());
        });
        assert!(result.is_error.is_none(), "printf should succeed");

        assert_eq!(captured, vec!["line1", "line2", "line3"]);
        assert!(result.content[0].text.contains("line1"));
        assert!(result.content[0].text.contains("line3"));
    }

    /// Spawn failure in the streaming path returns a tool error without
    /// invoking the callback.
    #[test]
    fn streaming_spawn_failure_does_not_call_callback() {
        let mut called = false;
        let result = spawn_streaming(
            "/this/binary/does/not/exist/apr-mcp-streaming-test",
            &[],
            |_| {
                called = true;
            },
        );
        assert_eq!(result.is_error, Some(true));
        assert!(!called);
        assert!(result.content[0].text.contains("Failed to spawn"));
    }

    /// Streaming path: non-zero exit surfaces as a tool error.
    #[test]
    #[cfg(unix)]
    fn streaming_nonzero_exit_is_error() {
        let result = spawn_streaming("sh", &["-c", "echo partial; exit 3"], |_| {});
        assert_eq!(result.is_error, Some(true));
        assert!(
            result.content[0].text.contains("exit 3"),
            "message should include exit code: {}",
            result.content[0].text
        );
    }

    /// FALSIFY-MCP-006 (unit-level): sending a cancel signal to a
    /// long-running `sleep 60` subprocess returns within the grace window
    /// (SIGTERM is immediate for `sleep`, so we see natural reap well
    /// before the SIGKILL escalation).
    #[test]
    #[cfg(unix)]
    fn cancellable_stops_long_running_subprocess_within_grace() {
        use std::thread;

        let (tx, rx) = mpsc::channel::<()>();

        // Fire the cancel shortly after spawn to give the subprocess time
        // to get into its sleep syscall.
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            let _ = tx.send(());
        });

        let t0 = Instant::now();
        // Grace of 2s: if SIGTERM fails for any reason we still fall back
        // to SIGKILL well before the test's own timeout.
        let result = spawn_cancellable("sleep", &["60"], &rx, 2_000);
        let elapsed = t0.elapsed();

        handle.join().expect("cancel-sender thread joins");

        assert_eq!(result.is_error, Some(true), "cancelled calls are errors");
        assert!(
            result.content[0].text.starts_with("Cancelled:"),
            "message should indicate cancellation, got: {}",
            result.content[0].text
        );
        // 100ms fire + ~immediate SIGTERM response + cleanup — well under
        // the 2s grace + 500ms slack asserted here.
        assert!(
            elapsed < Duration::from_millis(2_500),
            "cancel should finish within grace + slack, took {elapsed:?}"
        );
        // And it must finish meaningfully faster than the underlying
        // `sleep 60` would have — this is the real falsification.
        assert!(
            elapsed < Duration::from_secs(5),
            "cancelled call must return far before sleep 60's natural exit"
        );
    }
}