aprender_mcp/tools/
subprocess.rs1use crate::types::ToolCallResult;
15use std::io::{BufRead, BufReader, Read};
16use std::process::{Command, Stdio};
17use std::sync::mpsc::{Receiver, TryRecvError};
18use std::time::{Duration, Instant};
19
20pub const CANCEL_GRACE_MS: u64 = 30_000;
26
27const POLL_INTERVAL: Duration = Duration::from_millis(10);
29
30#[must_use]
38pub fn run_apr(args: &[&str]) -> ToolCallResult {
39 let output = match Command::new("apr").args(args).output() {
40 Ok(o) => o,
41 Err(e) => {
42 let cmd = format!("apr {}", args.join(" "));
43 return ToolCallResult::error(format!("Failed to spawn `{cmd}`: {e}"));
44 }
45 };
46
47 let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
48 let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
49
50 if output.status.success() {
51 if stdout.trim().is_empty() {
52 let cmd = format!("apr {}", args.join(" "));
53 ToolCallResult::error(format!("`{cmd}` produced no output"))
54 } else {
55 ToolCallResult::success(stdout)
56 }
57 } else {
58 let code = output.status.code().unwrap_or(-1);
59 let detail = if stderr.trim().is_empty() {
60 stdout
61 } else {
62 stderr
63 };
64 let cmd = format!("apr {}", args.join(" "));
65 ToolCallResult::error(format!("`{cmd}` failed (exit {code}): {detail}"))
66 }
67}
68
69#[must_use]
80pub fn run_apr_cancellable(
81 args: &[&str],
82 cancel_rx: &Receiver<()>,
83 grace_ms: u64,
84) -> ToolCallResult {
85 spawn_cancellable("apr", args, cancel_rx, grace_ms)
86}
87
88#[must_use]
91pub fn spawn_cancellable(
92 program: &str,
93 args: &[&str],
94 cancel_rx: &Receiver<()>,
95 grace_ms: u64,
96) -> ToolCallResult {
97 let cmd_display = format!("{program} {}", args.join(" "));
98
99 let mut child = match Command::new(program)
100 .args(args)
101 .stdout(Stdio::piped())
102 .stderr(Stdio::piped())
103 .spawn()
104 {
105 Ok(c) => c,
106 Err(e) => {
107 return ToolCallResult::error(format!("Failed to spawn `{cmd_display}`: {e}"));
108 }
109 };
110
111 let pid = child.id();
112
113 let wait_status = loop {
117 match child.try_wait() {
118 Ok(Some(status)) => break Ok(status),
119 Ok(None) => {}
120 Err(e) => {
121 return ToolCallResult::error(format!("Failed to poll `{cmd_display}`: {e}"));
122 }
123 }
124
125 match cancel_rx.try_recv() {
126 Ok(()) => break Err(CancelReason::Signalled),
127 Err(TryRecvError::Empty) => {}
128 Err(TryRecvError::Disconnected) => {
129 }
132 }
133
134 std::thread::sleep(POLL_INTERVAL);
135 };
136
137 match wait_status {
138 Ok(status) => {
139 let stdout = drain(&mut child.stdout.take());
141 let stderr = drain(&mut child.stderr.take());
142 if status.success() {
143 if stdout.trim().is_empty() {
144 ToolCallResult::error(format!("`{cmd_display}` produced no output"))
145 } else {
146 ToolCallResult::success(stdout)
147 }
148 } else {
149 let code = status.code().unwrap_or(-1);
150 let detail = if stderr.trim().is_empty() {
151 stdout
152 } else {
153 stderr
154 };
155 ToolCallResult::error(format!("`{cmd_display}` failed (exit {code}): {detail}"))
156 }
157 }
158 Err(CancelReason::Signalled) => {
159 send_sigterm(pid);
161 let deadline = Instant::now() + Duration::from_millis(grace_ms);
162 let mut escalated = false;
163 loop {
164 match child.try_wait() {
165 Ok(Some(_)) => break,
166 Ok(None) => {}
167 Err(_) => break,
168 }
169 if Instant::now() >= deadline {
170 if !escalated {
171 let _ = child.kill();
175 escalated = true;
176 } else {
177 break;
181 }
182 }
183 std::thread::sleep(POLL_INTERVAL);
184 }
185 let _ = child.wait();
187
188 let stdout = drain(&mut child.stdout.take());
189 let preview = truncate_for_preview(&stdout);
190 ToolCallResult::error(format!(
191 "Cancelled: `{cmd_display}` terminated by notifications/cancelled; partial stdout: {preview}"
192 ))
193 }
194 }
195}
196
197enum CancelReason {
198 Signalled,
199}
200
201fn drain<R: Read>(reader: &mut Option<R>) -> String {
202 let mut buf = String::new();
203 if let Some(r) = reader.as_mut() {
204 let _ = r.read_to_string(&mut buf);
205 }
206 buf
207}
208
209fn truncate_for_preview(s: &str) -> String {
210 const MAX: usize = 512;
211 if s.len() <= MAX {
212 s.to_string()
213 } else {
214 let truncated: String = s.chars().take(MAX).collect();
215 format!("{truncated}… (truncated)")
216 }
217}
218
219#[cfg(unix)]
220fn send_sigterm(pid: u32) {
221 use nix::sys::signal::{kill, Signal};
222 use nix::unistd::Pid;
223
224 #[allow(clippy::cast_possible_wrap)]
229 let raw = pid as i32;
230 let _ = kill(Pid::from_raw(raw), Signal::SIGTERM);
231}
232
233#[cfg(not(unix))]
234fn send_sigterm(_pid: u32) {
235 }
238
239#[must_use]
251pub fn run_apr_streaming<F>(args: &[&str], on_line: F) -> ToolCallResult
252where
253 F: FnMut(&str),
254{
255 spawn_streaming("apr", args, on_line)
256}
257
258#[must_use]
261pub fn spawn_streaming<F>(program: &str, args: &[&str], mut on_line: F) -> ToolCallResult
262where
263 F: FnMut(&str),
264{
265 let cmd_display = format!("{program} {}", args.join(" "));
266
267 let mut child = match Command::new(program)
268 .args(args)
269 .stdout(Stdio::piped())
270 .stderr(Stdio::piped())
271 .spawn()
272 {
273 Ok(c) => c,
274 Err(e) => {
275 return ToolCallResult::error(format!("Failed to spawn `{cmd_display}`: {e}"));
276 }
277 };
278
279 let stdout_pipe = match child.stdout.take() {
282 Some(p) => p,
283 None => {
284 let _ = child.wait();
285 return ToolCallResult::error(format!("Failed to capture stdout of `{cmd_display}`"));
286 }
287 };
288
289 let mut accumulated = String::new();
290 let reader = BufReader::new(stdout_pipe);
291 for line in reader.lines() {
292 match line {
293 Ok(text) => {
294 on_line(&text);
295 accumulated.push_str(&text);
296 accumulated.push('\n');
297 }
298 Err(e) => {
299 let _ = child.wait();
302 return ToolCallResult::error(format!(
303 "Failed to read stdout of `{cmd_display}`: {e}"
304 ));
305 }
306 }
307 }
308
309 let status = match child.wait() {
312 Ok(s) => s,
313 Err(e) => {
314 return ToolCallResult::error(format!("Failed to reap `{cmd_display}`: {e}"));
315 }
316 };
317
318 let stderr = drain(&mut child.stderr.take());
319
320 if status.success() {
321 if accumulated.trim().is_empty() {
322 ToolCallResult::error(format!("`{cmd_display}` produced no output"))
323 } else {
324 ToolCallResult::success(accumulated)
325 }
326 } else {
327 let code = status.code().unwrap_or(-1);
328 let detail = if stderr.trim().is_empty() {
329 accumulated
330 } else {
331 stderr
332 };
333 ToolCallResult::error(format!("`{cmd_display}` failed (exit {code}): {detail}"))
334 }
335}
336
337#[cfg(test)]
338#[allow(clippy::disallowed_methods)] mod tests {
340 use super::*;
341 use std::sync::mpsc;
342 use std::thread;
343
344 #[test]
347 fn spawn_failure_maps_to_tool_error() {
348 let result = run_apr(&["this-subcommand-does-not-exist"]);
349 assert_eq!(result.is_error, Some(true));
350 }
351
352 #[test]
355 fn cancellable_natural_exit_matches_run_apr() {
356 let (_tx, rx) = mpsc::channel::<()>();
357 let result = spawn_cancellable("echo", &["hello"], &rx, CANCEL_GRACE_MS);
358 assert!(result.is_error.is_none(), "echo should succeed");
359 assert!(result.content[0].text.contains("hello"));
360 }
361
362 #[test]
366 fn cancellable_disconnected_channel_is_noop() {
367 let (tx, rx) = mpsc::channel::<()>();
368 drop(tx);
369 let result = spawn_cancellable("echo", &["world"], &rx, CANCEL_GRACE_MS);
370 assert!(result.is_error.is_none());
371 assert!(result.content[0].text.contains("world"));
372 }
373
374 #[test]
376 fn cancellable_spawn_failure_maps_to_error() {
377 let (_tx, rx) = mpsc::channel::<()>();
378 let result = spawn_cancellable(
379 "/this/binary/does/not/exist/apr-mcp-test",
380 &[],
381 &rx,
382 CANCEL_GRACE_MS,
383 );
384 assert_eq!(result.is_error, Some(true));
385 assert!(result.content[0].text.contains("Failed to spawn"));
386 }
387
388 #[test]
391 fn streaming_invokes_callback_per_line() {
392 let lines = std::sync::Mutex::new(Vec::<String>::new());
393 let result = spawn_streaming("printf", &["line1\nline2\nline3\n"], |line| {
394 lines
395 .lock()
396 .expect("test mutex not poisoned")
397 .push(line.to_string());
398 });
399 assert!(result.is_error.is_none(), "printf should succeed");
400
401 let captured = lines.lock().expect("mutex").clone();
402 assert_eq!(captured, vec!["line1", "line2", "line3"]);
403 assert!(result.content[0].text.contains("line1"));
404 assert!(result.content[0].text.contains("line3"));
405 }
406
407 #[test]
410 fn streaming_spawn_failure_does_not_call_callback() {
411 let called = std::sync::Mutex::new(false);
412 let result = spawn_streaming(
413 "/this/binary/does/not/exist/apr-mcp-streaming-test",
414 &[],
415 |_| {
416 *called.lock().expect("mutex") = true;
417 },
418 );
419 assert_eq!(result.is_error, Some(true));
420 assert!(!*called.lock().expect("mutex"));
421 assert!(result.content[0].text.contains("Failed to spawn"));
422 }
423
424 #[test]
426 #[cfg(unix)]
427 fn streaming_nonzero_exit_is_error() {
428 let result = spawn_streaming("sh", &["-c", "echo partial; exit 3"], |_| {});
429 assert_eq!(result.is_error, Some(true));
430 assert!(
431 result.content[0].text.contains("exit 3"),
432 "message should include exit code: {}",
433 result.content[0].text
434 );
435 }
436
437 #[test]
442 #[cfg(unix)]
443 fn cancellable_stops_long_running_subprocess_within_grace() {
444 let (tx, rx) = mpsc::channel::<()>();
445
446 let handle = thread::spawn(move || {
449 thread::sleep(Duration::from_millis(100));
450 let _ = tx.send(());
451 });
452
453 let t0 = Instant::now();
454 let result = spawn_cancellable("sleep", &["60"], &rx, 2_000);
457 let elapsed = t0.elapsed();
458
459 handle.join().expect("cancel-sender thread joins");
460
461 assert_eq!(result.is_error, Some(true), "cancelled calls are errors");
462 assert!(
463 result.content[0].text.starts_with("Cancelled:"),
464 "message should indicate cancellation, got: {}",
465 result.content[0].text
466 );
467 assert!(
470 elapsed < Duration::from_millis(2_500),
471 "cancel should finish within grace + slack, took {elapsed:?}"
472 );
473 assert!(
476 elapsed < Duration::from_secs(5),
477 "cancelled call must return far before sleep 60's natural exit"
478 );
479 }
480}