use crate::types::ToolCallResult;
use std::io::{BufRead, BufReader, Read};
use std::process::{Command, Stdio};
use std::sync::mpsc::{Receiver, TryRecvError};
use std::time::{Duration, Instant};
/// Grace period in milliseconds (30 seconds) suitable for the `grace_ms`
/// argument of [`run_apr_cancellable`] / [`spawn_cancellable`]; the tests in
/// this file pass it as such.
pub const CANCEL_GRACE_MS: u64 = 30_000;
/// How long the polling loops in [`spawn_cancellable`] sleep between checks
/// of the child's exit status and the cancellation channel.
const POLL_INTERVAL: Duration = Duration::from_millis(10);
#[must_use]
pub fn run_apr(args: &[&str]) -> ToolCallResult {
let output = match Command::new("apr").args(args).output() {
Ok(o) => o,
Err(e) => {
let cmd = format!("apr {}", args.join(" "));
return ToolCallResult::error(format!("Failed to spawn `{cmd}`: {e}"));
}
};
let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
if output.status.success() {
if stdout.trim().is_empty() {
let cmd = format!("apr {}", args.join(" "));
ToolCallResult::error(format!("`{cmd}` produced no output"))
} else {
ToolCallResult::success(stdout)
}
} else {
let code = output.status.code().unwrap_or(-1);
let detail = if stderr.trim().is_empty() {
stdout
} else {
stderr
};
let cmd = format!("apr {}", args.join(" "));
ToolCallResult::error(format!("`{cmd}` failed (exit {code}): {detail}"))
}
}
/// Runs `apr` with `args`, allowing the call to be cancelled while it runs.
///
/// Thin wrapper that delegates to [`spawn_cancellable`] with the program
/// fixed to `"apr"`.
///
/// * `args` - arguments passed to `apr`.
/// * `cancel_rx` - channel polled while the child runs; receiving `()` on it
///   requests cancellation.
/// * `grace_ms` - milliseconds to wait after the termination signal before
///   the child is forcibly killed.
#[must_use]
pub fn run_apr_cancellable(
args: &[&str],
cancel_rx: &Receiver<()>,
grace_ms: u64,
) -> ToolCallResult {
spawn_cancellable("apr", args, cancel_rx, grace_ms)
}
/// Runs `program` with `args`, polling for process exit and for a
/// cancellation message on `cancel_rx`.
///
/// stdout and stderr are drained on background threads while the child runs,
/// so a child that produces more output than the OS pipe buffer holds cannot
/// block forever on a full pipe.
///
/// * `program` / `args` - command to execute.
/// * `cancel_rx` - polled every [`POLL_INTERVAL`]; receiving `()` cancels the
///   call. A disconnected channel is treated as "no cancellation will come"
///   and the child is simply run to completion.
/// * `grace_ms` - after cancellation the child is sent SIGTERM (on Unix) and
///   given this many milliseconds to exit before it is forcibly killed.
///
/// Returns:
/// * success with stdout when the child exits successfully with non-blank
///   stdout;
/// * an error when spawning or polling fails, when the child exits
///   successfully with blank stdout, or when it exits unsuccessfully (the
///   message carries the exit code and stderr, or stdout if stderr is blank);
/// * an error starting with `Cancelled:` plus a preview of the partial
///   stdout when the call was cancelled.
#[must_use]
pub fn spawn_cancellable(
    program: &str,
    args: &[&str],
    cancel_rx: &Receiver<()>,
    grace_ms: u64,
) -> ToolCallResult {
    let cmd_display = format!("{program} {}", args.join(" "));
    let mut child = match Command::new(program)
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(c) => c,
        Err(e) => {
            return ToolCallResult::error(format!("Failed to spawn `{cmd_display}`: {e}"));
        }
    };
    let pid = child.id();

    // Read both pipes concurrently with the polling loop. Reading only after
    // the child exits deadlocks once the child fills a pipe: it blocks in
    // write(), never exits, and the loop below would spin until cancelled.
    let mut stdout_pipe = child.stdout.take();
    let mut stderr_pipe = child.stderr.take();
    let stdout_reader = std::thread::spawn(move || drain(&mut stdout_pipe));
    let stderr_reader = std::thread::spawn(move || drain(&mut stderr_pipe));

    let wait_status = loop {
        match child.try_wait() {
            Ok(Some(status)) => break Ok(status),
            Ok(None) => {}
            Err(e) => {
                // The child can no longer be observed. Stop and reap it
                // (best effort) so the reader threads reach EOF instead of
                // lingering for as long as the child keeps running.
                let _ = child.kill();
                let _ = child.wait();
                return ToolCallResult::error(format!("Failed to poll `{cmd_display}`: {e}"));
            }
        }
        match cancel_rx.try_recv() {
            Ok(()) => break Err(CancelReason::Signalled),
            // A dropped sender only means nobody can cancel any more; keep
            // waiting for the child's natural exit.
            Err(TryRecvError::Empty | TryRecvError::Disconnected) => {}
        }
        std::thread::sleep(POLL_INTERVAL);
    };

    match wait_status {
        Ok(status) => {
            // A panicked reader thread degrades to empty output.
            let stdout = stdout_reader.join().unwrap_or_default();
            let stderr = stderr_reader.join().unwrap_or_default();
            if status.success() {
                if stdout.trim().is_empty() {
                    ToolCallResult::error(format!("`{cmd_display}` produced no output"))
                } else {
                    ToolCallResult::success(stdout)
                }
            } else {
                let code = status.code().unwrap_or(-1);
                // Prefer stderr for diagnostics; fall back to stdout.
                let detail = if stderr.trim().is_empty() {
                    stdout
                } else {
                    stderr
                };
                ToolCallResult::error(format!("`{cmd_display}` failed (exit {code}): {detail}"))
            }
        }
        Err(CancelReason::Signalled) => {
            send_sigterm(pid);
            // `checked_add` avoids a panic for absurdly large grace periods;
            // `None` means the deadline is unreachable, so never escalate.
            let deadline = Instant::now().checked_add(Duration::from_millis(grace_ms));
            let mut escalated = false;
            loop {
                match child.try_wait() {
                    Ok(None) => {}
                    Ok(Some(_)) | Err(_) => break,
                }
                let expired = deadline.map_or(false, |d| Instant::now() >= d);
                if expired {
                    if escalated {
                        // Already force-killed; stop polling and reap below.
                        break;
                    }
                    let _ = child.kill();
                    escalated = true;
                }
                std::thread::sleep(POLL_INTERVAL);
            }
            // Reap the child so it does not linger as a zombie.
            let _ = child.wait();
            let stdout = stdout_reader.join().unwrap_or_default();
            // stderr is not reported for cancelled calls; detach its reader
            // rather than risk blocking on it.
            drop(stderr_reader);
            let preview = truncate_for_preview(&stdout);
            ToolCallResult::error(format!(
                "Cancelled: `{cmd_display}` terminated by notifications/cancelled; partial stdout: {preview}"
            ))
        }
    }
}
/// Why the polling loop in [`spawn_cancellable`] stopped before the child
/// exited on its own.
enum CancelReason {
/// A `()` message was received on the cancellation channel.
Signalled,
}
/// Reads everything remaining from `reader` and returns it as text.
///
/// Returns an empty string when `reader` is `None`. Bytes are decoded as
/// UTF-8 lossily (invalid sequences become U+FFFD), matching how [`run_apr`]
/// decodes captured output; a single stray byte therefore no longer discards
/// the whole stream. Read errors are ignored on purpose: this is best-effort
/// capture, and bytes received before the error are kept.
fn drain<R: Read>(reader: &mut Option<R>) -> String {
    let mut bytes = Vec::new();
    if let Some(r) = reader.as_mut() {
        // Best effort: `read_to_end` leaves already-read bytes in `bytes`
        // even when it returns an error.
        let _ = r.read_to_end(&mut bytes);
    }
    String::from_utf8_lossy(&bytes).into_owned()
}
/// Returns `s` limited to at most 512 characters for use in a message.
///
/// Strings of 512 characters or fewer are returned unchanged. Longer strings
/// are cut after the 512th character and suffixed with `… (truncated)`.
/// The limit is measured in characters (not bytes), so multibyte text is
/// never split inside a code point and is only marked as truncated when
/// something was actually removed.
fn truncate_for_preview(s: &str) -> String {
    const MAX: usize = 512;
    // The byte offset of character number MAX (0-based) is exactly where the
    // first MAX characters end; `None` means the string is short enough.
    match s.char_indices().nth(MAX) {
        None => s.to_string(),
        Some((cut, _)) => format!("{}… (truncated)", &s[..cut]),
    }
}
/// Sends SIGTERM to the process with id `pid` (Unix only), ignoring failures.
///
/// The signal is best-effort: the process may already have exited, and the
/// caller escalates to a forced kill after its grace period anyway.
#[cfg(unix)]
fn send_sigterm(pid: u32) {
    use nix::sys::signal::{kill, Signal};
    use nix::unistd::Pid;
    use std::convert::TryFrom;
    // kill(2) gives pid <= 0 a special meaning (process group / broadcast).
    // A wrapping `as` cast could turn an out-of-range id into such a value,
    // so convert with a check and only ever signal a strictly positive pid.
    match i32::try_from(pid) {
        Ok(raw) if raw > 0 => {
            let _ = kill(Pid::from_raw(raw), Signal::SIGTERM);
        }
        _ => {}
    }
}
/// No-op on non-Unix targets: no termination signal is sent, so a cancelled
/// child is only stopped by the caller's forced kill once the grace period
/// has elapsed.
#[cfg(not(unix))]
fn send_sigterm(_pid: u32) {}
/// Runs `apr` with `args`, invoking `on_line` for each line of stdout as it
/// is read.
///
/// Thin wrapper that delegates to [`spawn_streaming`] with the program fixed
/// to `"apr"`.
#[must_use]
pub fn run_apr_streaming<F>(args: &[&str], on_line: F) -> ToolCallResult
where
F: FnMut(&str),
{
spawn_streaming("apr", args, on_line)
}
/// Runs `program` with `args`, calling `on_line` for every stdout line as it
/// arrives, and returns the overall outcome once the child has exited.
///
/// * `on_line` receives each line without its trailing `\n` / `\r\n`.
///   Lines are decoded as UTF-8 lossily (invalid sequences become U+FFFD),
///   so a stray byte neither aborts streaming nor loses output.
/// * stderr is drained on a background thread while stdout is streamed, so a
///   child that writes a lot to stderr cannot block on a full pipe.
///
/// Returns:
/// * success with all streamed lines (each terminated by `\n`) when the
///   child exits successfully and produced non-blank stdout;
/// * an error when spawning, capturing/reading stdout, or reaping fails,
///   when the child exits successfully with blank stdout, or when it exits
///   unsuccessfully (the message carries the exit code and stderr, or the
///   streamed stdout if stderr is blank).
#[must_use]
pub fn spawn_streaming<F>(program: &str, args: &[&str], mut on_line: F) -> ToolCallResult
where
    F: FnMut(&str),
{
    let cmd_display = format!("{program} {}", args.join(" "));
    let mut child = match Command::new(program)
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(c) => c,
        Err(e) => {
            return ToolCallResult::error(format!("Failed to spawn `{cmd_display}`: {e}"));
        }
    };

    // Consume stderr concurrently. Reading it only after stdout hits EOF
    // deadlocks as soon as the child fills the stderr pipe while we are
    // still blocked waiting for more stdout (or waiting for it to exit).
    let mut stderr_pipe = child.stderr.take();
    let stderr_reader = std::thread::spawn(move || drain(&mut stderr_pipe));

    let stdout_pipe = match child.stdout.take() {
        Some(p) => p,
        None => {
            let _ = child.wait();
            return ToolCallResult::error(format!("Failed to capture stdout of `{cmd_display}`"));
        }
    };

    let mut accumulated = String::new();
    let mut reader = BufReader::new(stdout_pipe);
    // One reusable byte buffer per line; decoding happens after the read so
    // non-UTF-8 output is tolerated instead of surfacing as a read error.
    let mut line = Vec::new();
    loop {
        line.clear();
        match reader.read_until(b'\n', &mut line) {
            Ok(0) => break, // EOF
            Ok(_) => {
                // Strip the terminator exactly like `BufRead::lines` does:
                // a trailing "\n", and a "\r" only when it preceded that "\n".
                if line.last() == Some(&b'\n') {
                    line.pop();
                    if line.last() == Some(&b'\r') {
                        line.pop();
                    }
                }
                let decoded = String::from_utf8_lossy(&line);
                let text: &str = &decoded;
                on_line(text);
                accumulated.push_str(text);
                accumulated.push('\n');
            }
            Err(e) => {
                // Close our end of the pipe before waiting so a child that
                // is still writing gets EPIPE/SIGPIPE instead of blocking
                // forever on a pipe nobody reads.
                drop(reader);
                let _ = child.wait();
                return ToolCallResult::error(format!(
                    "Failed to read stdout of `{cmd_display}`: {e}"
                ));
            }
        }
    }

    let status = match child.wait() {
        Ok(s) => s,
        Err(e) => {
            return ToolCallResult::error(format!("Failed to reap `{cmd_display}`: {e}"));
        }
    };
    // A panicked reader thread degrades to empty stderr.
    let stderr = stderr_reader.join().unwrap_or_default();

    if status.success() {
        if accumulated.trim().is_empty() {
            ToolCallResult::error(format!("`{cmd_display}` produced no output"))
        } else {
            ToolCallResult::success(accumulated)
        }
    } else {
        let code = status.code().unwrap_or(-1);
        // Prefer stderr for diagnostics; fall back to what was streamed.
        let detail = if stderr.trim().is_empty() {
            accumulated
        } else {
            stderr
        };
        ToolCallResult::error(format!("`{cmd_display}` failed (exit {code}): {detail}"))
    }
}
// Tests for the subprocess runners. They spawn real processes (`echo`,
// `printf`, `sh`, `sleep`), so they depend on those programs being on PATH.
// NOTE(review): the `disallowed_methods` allow presumably covers a method the
// project's clippy configuration bans outside tests — confirm and document.
#[cfg(test)]
#[allow(clippy::disallowed_methods)] mod tests {
use super::*;
use std::sync::mpsc;
use std::thread;
// Runs `apr` with a bogus subcommand. The result must be an error either
// way: `apr` is not installed (spawn failure), or it is installed and
// presumably rejects the unknown subcommand.
#[test]
fn spawn_failure_maps_to_tool_error() {
let result = run_apr(&["this-subcommand-does-not-exist"]);
assert_eq!(result.is_error, Some(true));
}
// With no cancellation sent, a short-lived child runs to completion and its
// stdout is returned as a success. `_tx` is kept alive so the channel stays
// connected for the duration of the call.
#[test]
fn cancellable_natural_exit_matches_run_apr() {
let (_tx, rx) = mpsc::channel::<()>();
let result = spawn_cancellable("echo", &["hello"], &rx, CANCEL_GRACE_MS);
assert!(result.is_error.is_none(), "echo should succeed");
assert!(result.content[0].text.contains("hello"));
}
// Dropping the sender before the call disconnects the channel; that must
// not be mistaken for a cancellation request.
#[test]
fn cancellable_disconnected_channel_is_noop() {
let (tx, rx) = mpsc::channel::<()>();
drop(tx);
let result = spawn_cancellable("echo", &["world"], &rx, CANCEL_GRACE_MS);
assert!(result.is_error.is_none());
assert!(result.content[0].text.contains("world"));
}
// A program path that does not exist must produce the "Failed to spawn"
// error rather than a panic.
#[test]
fn cancellable_spawn_failure_maps_to_error() {
let (_tx, rx) = mpsc::channel::<()>();
let result = spawn_cancellable(
"/this/binary/does/not/exist/apr-mcp-test",
&[],
&rx,
CANCEL_GRACE_MS,
);
assert_eq!(result.is_error, Some(true));
assert!(result.content[0].text.contains("Failed to spawn"));
}
// The callback is invoked once per stdout line, in order, without the
// trailing newline, and the final result still contains the full output.
#[test]
fn streaming_invokes_callback_per_line() {
let lines = std::sync::Mutex::new(Vec::<String>::new());
let result = spawn_streaming("printf", &["line1\nline2\nline3\n"], |line| {
lines
.lock()
.expect("test mutex not poisoned")
.push(line.to_string());
});
assert!(result.is_error.is_none(), "printf should succeed");
let captured = lines.lock().expect("mutex").clone();
assert_eq!(captured, vec!["line1", "line2", "line3"]);
assert!(result.content[0].text.contains("line1"));
assert!(result.content[0].text.contains("line3"));
}
// When the program cannot be spawned, the callback must never run.
#[test]
fn streaming_spawn_failure_does_not_call_callback() {
let called = std::sync::Mutex::new(false);
let result = spawn_streaming(
"/this/binary/does/not/exist/apr-mcp-streaming-test",
&[],
|_| {
*called.lock().expect("mutex") = true;
},
);
assert_eq!(result.is_error, Some(true));
assert!(!*called.lock().expect("mutex"));
assert!(result.content[0].text.contains("Failed to spawn"));
}
// A child that prints output but exits non-zero is reported as an error
// whose message includes the exit code.
#[test]
#[cfg(unix)]
fn streaming_nonzero_exit_is_error() {
let result = spawn_streaming("sh", &["-c", "echo partial; exit 3"], |_| {});
assert_eq!(result.is_error, Some(true));
assert!(
result.content[0].text.contains("exit 3"),
"message should include exit code: {}",
result.content[0].text
);
}
// Cancels `sleep 60` after ~100 ms with a 2 s grace period and checks that
// the call returns a "Cancelled:" error well before the child would have
// exited on its own.
#[test]
#[cfg(unix)]
fn cancellable_stops_long_running_subprocess_within_grace() {
let (tx, rx) = mpsc::channel::<()>();
// Send the cancellation from another thread while the call is polling.
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
let _ = tx.send(());
});
let t0 = Instant::now();
let result = spawn_cancellable("sleep", &["60"], &rx, 2_000);
let elapsed = t0.elapsed();
handle.join().expect("cancel-sender thread joins");
assert_eq!(result.is_error, Some(true), "cancelled calls are errors");
assert!(
result.content[0].text.starts_with("Cancelled:"),
"message should indicate cancellation, got: {}",
result.content[0].text
);
// Upper bound: the 2 s grace period plus 500 ms of slack.
assert!(
elapsed < Duration::from_millis(2_500),
"cancel should finish within grace + slack, took {elapsed:?}"
);
// Looser bound; already implied by the 2.5 s check above.
assert!(
elapsed < Duration::from_secs(5),
"cancelled call must return far before sleep 60's natural exit"
);
}
}