aprender-mcp 0.40.0

Model Context Protocol (MCP) server for aprender — exposes apr CLI as MCP tools
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
//! Shared subprocess wrapper for M2/M3 tools.
//!
//! Every M2 tool spawns `apr <subcommand> [...args] --json` and passes stdout
//! through to the MCP client verbatim. Non-zero exit maps to `isError: true`
//! with stderr attached. This module centralizes that pattern so each tool is
//! a thin definition + a list of CLI args.
//!
//! M3 (FALSIFY-MCP-006) adds [`run_apr_cancellable`], which polls a
//! [`std::sync::mpsc::Receiver`] between `try_wait` checks and escalates to
//! SIGTERM → (grace window) → SIGKILL on the spawned subprocess when a
//! cancellation is signalled. The non-cancellable [`run_apr`] is kept as a
//! thin wrapper for tools that don't support cancellation yet.

use crate::types::ToolCallResult;
use std::io::{BufRead, BufReader, Read};
use std::process::{Command, Stdio};
use std::sync::mpsc::{Receiver, TryRecvError};
use std::time::{Duration, Instant};

/// Default grace window, in milliseconds, between SIGTERM and SIGKILL for
/// cancelled calls.
///
/// Per `docs/specifications/apr-mcp-server-spec.md`:
/// > `notifications/cancelled` from client → kill the spawned `apr`
/// > subprocess with SIGTERM (30s grace) → SIGKILL.
pub const CANCEL_GRACE_MS: u64 = 30_000;

/// Poll interval when waiting for subprocess exit / cancel signal.
///
/// The polling loops in this module sleep for this long between `try_wait`
/// checks, so it bounds how quickly an exit or a cancellation is noticed.
const POLL_INTERVAL: Duration = Duration::from_millis(10);

/// Spawn `apr <args...>` and wait synchronously. Shorthand for the
/// non-cancellable path used by every tool except `apr.run`.
///
/// - Successful exit with non-empty stdout → `success(stdout)`
/// - Successful exit with empty stdout → `error("apr ... produced no output")`
/// - Non-zero exit → `error("apr ... failed (exit N): <stderr-or-stdout>")`
/// - Spawn failure → `error("Failed to spawn apr ...: <io-err>")`
#[must_use]
pub fn run_apr(args: &[&str]) -> ToolCallResult {
    let output = match Command::new("apr").args(args).output() {
        Ok(o) => o,
        Err(e) => {
            let cmd = format!("apr {}", args.join(" "));
            return ToolCallResult::error(format!("Failed to spawn `{cmd}`: {e}"));
        }
    };

    let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
    let stderr = String::from_utf8_lossy(&output.stderr).into_owned();

    if output.status.success() {
        if stdout.trim().is_empty() {
            let cmd = format!("apr {}", args.join(" "));
            ToolCallResult::error(format!("`{cmd}` produced no output"))
        } else {
            ToolCallResult::success(stdout)
        }
    } else {
        let code = output.status.code().unwrap_or(-1);
        let detail = if stderr.trim().is_empty() {
            stdout
        } else {
            stderr
        };
        let cmd = format!("apr {}", args.join(" "));
        ToolCallResult::error(format!("`{cmd}` failed (exit {code}): {detail}"))
    }
}

/// Spawn `apr <args...>` cancellable via `cancel_rx`.
///
/// Thin wrapper that delegates to [`spawn_cancellable`] with the program
/// fixed to `"apr"`.
///
/// On receipt of any value on `cancel_rx`, the subprocess is sent SIGTERM.
/// If it hasn't exited within `grace_ms` milliseconds, SIGKILL is sent. The
/// returned `ToolCallResult` carries a preview of whatever stdout was
/// captured up to the point of cancellation and has `is_error: Some(true)`
/// with a message that starts with `"Cancelled:"`.
///
/// Non-Unix targets do NOT support signalling; this function falls back to
/// `child.kill()` (equivalent to SIGKILL on Windows).
///
/// # Parameters
///
/// - `args`: arguments passed to `apr` verbatim.
/// - `cancel_rx`: channel polled (non-blocking) for a cancellation request.
/// - `grace_ms`: milliseconds allowed between the termination request and
///   the forced kill.
#[must_use]
pub fn run_apr_cancellable(
    args: &[&str],
    cancel_rx: &Receiver<()>,
    grace_ms: u64,
) -> ToolCallResult {
    spawn_cancellable("apr", args, cancel_rx, grace_ms)
}

/// Test-visible generic over the binary name. `run_apr_cancellable` is the
/// `"apr"`-bound wrapper clients should use in production code.
///
/// Spawns `program args...` with stdout and stderr piped, then polls every
/// `POLL_INTERVAL` for either the child's exit or a value on `cancel_rx`.
///
/// Both pipes are drained on background threads for the whole lifetime of the
/// child. Without that, a child that writes more than one pipe buffer's worth
/// of output would block on `write` and never exit, and the poll loop would
/// spin forever.
///
/// # Outcomes
///
/// - Spawn failure → error `"Failed to spawn ..."`.
/// - Poll failure → error `"Failed to poll ..."`.
/// - Natural exit → same mapping as `run_apr`: success with stdout, or an
///   error for blank stdout / non-zero exit (stderr preferred as detail).
/// - Cancellation → SIGTERM, up to `grace_ms` milliseconds of waiting, then
///   `child.kill()`; the result is an error starting with `"Cancelled:"` that
///   embeds a preview of the stdout captured so far.
///
/// A disconnected `cancel_rx` (sender dropped) is treated as "no cancellation
/// will ever arrive" and the child simply runs to completion.
///
/// On non-Unix targets no termination signal can be sent, so there is nothing
/// for the child to react to during a grace window; the child is killed as
/// soon as the cancellation is observed.
#[must_use]
pub fn spawn_cancellable(
    program: &str,
    args: &[&str],
    cancel_rx: &Receiver<()>,
    grace_ms: u64,
) -> ToolCallResult {
    let cmd_display = format!("{program} {}", args.join(" "));

    let mut child = match Command::new(program)
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(c) => c,
        Err(e) => {
            return ToolCallResult::error(format!("Failed to spawn `{cmd_display}`: {e}"));
        }
    };

    let pid = child.id();

    // Start draining both pipes immediately so the child can never stall on a
    // full pipe buffer while we are only polling for its exit status. Each
    // thread finishes when its pipe reaches EOF (i.e. the child side closes).
    let mut stdout_pipe = child.stdout.take();
    let stdout_reader = std::thread::spawn(move || drain(&mut stdout_pipe));
    let mut stderr_pipe = child.stderr.take();
    let stderr_reader = std::thread::spawn(move || drain(&mut stderr_pipe));

    // Poll loop: check if the child exited, then check for a cancel signal.
    // Sleep `POLL_INTERVAL` between iterations. This keeps cancel latency
    // under ~10ms while the subprocess is alive.
    let wait_status = loop {
        match child.try_wait() {
            Ok(Some(status)) => break Ok(status),
            Ok(None) => {}
            Err(e) => {
                return ToolCallResult::error(format!("Failed to poll `{cmd_display}`: {e}"));
            }
        }

        match cancel_rx.try_recv() {
            Ok(()) => break Err(CancelReason::Signalled),
            // Nothing pending, or the sender was dropped without cancelling:
            // either way keep waiting for the natural exit.
            Err(TryRecvError::Empty | TryRecvError::Disconnected) => {}
        }

        std::thread::sleep(POLL_INTERVAL);
    };

    match wait_status {
        Ok(status) => {
            // Natural exit: collect what the reader threads captured and map
            // to success/error. A panicked reader degrades to empty output.
            let stdout = stdout_reader.join().unwrap_or_default();
            let stderr = stderr_reader.join().unwrap_or_default();
            if status.success() {
                if stdout.trim().is_empty() {
                    ToolCallResult::error(format!("`{cmd_display}` produced no output"))
                } else {
                    ToolCallResult::success(stdout)
                }
            } else {
                let code = status.code().unwrap_or(-1);
                let detail = if stderr.trim().is_empty() {
                    stdout
                } else {
                    stderr
                };
                ToolCallResult::error(format!("`{cmd_display}` failed (exit {code}): {detail}"))
            }
        }
        Err(CancelReason::Signalled) => {
            // SIGTERM, grace window, then SIGKILL.
            send_sigterm(pid);

            // Without a deliverable termination signal (non-Unix) a grace
            // window would only delay the inevitable kill, so skip it.
            let grace = if cfg!(unix) {
                Duration::from_millis(grace_ms)
            } else {
                Duration::ZERO
            };
            // Measuring elapsed time (instead of computing `now + grace`)
            // cannot overflow, whatever `grace_ms` the caller passes.
            let cancelled_at = Instant::now();
            loop {
                match child.try_wait() {
                    Ok(None) => {}
                    // Exited, or its status can no longer be queried.
                    Ok(Some(_)) | Err(_) => break,
                }
                if cancelled_at.elapsed() >= grace {
                    // Best-effort kill (SIGKILL on Unix, TerminateProcess
                    // on Windows). Ignore errors — the process may have
                    // exited between try_wait and here.
                    let _ = child.kill();
                    break;
                }
                std::thread::sleep(POLL_INTERVAL);
            }
            // Reap the child (keeps us from leaking a zombie).
            let _ = child.wait();

            let stdout = stdout_reader.join().unwrap_or_default();
            let preview = truncate_for_preview(&stdout);
            ToolCallResult::error(format!(
                "Cancelled: `{cmd_display}` terminated by notifications/cancelled; partial stdout: {preview}"
            ))
        }
    }
}

/// Why the poll loop in `spawn_cancellable` stopped before the child exited
/// on its own.
enum CancelReason {
    /// A value arrived on the caller's cancellation channel.
    Signalled,
}

/// Read everything remaining in `reader` and return it as text.
///
/// - `None` yields an empty string.
/// - Bytes are decoded lossily (invalid UTF-8 becomes U+FFFD), matching how
///   `run_apr` decodes captured output, so a single bad byte no longer
///   discards the whole stream.
/// - I/O errors are ignored on purpose: this is a best-effort capture and
///   whatever was read before the error is still returned.
fn drain<R: Read>(reader: &mut Option<R>) -> String {
    let mut bytes = Vec::new();
    if let Some(r) = reader.as_mut() {
        let _ = r.read_to_end(&mut bytes);
    }
    String::from_utf8_lossy(&bytes).into_owned()
}

/// Shorten `s` to at most 512 characters for embedding in an error message.
///
/// Strings of up to 512 characters are returned unchanged. Longer strings are
/// cut after the 512th character and suffixed with `"… (truncated)"`.
///
/// The limit is counted in `char`s for both the check and the cut, so
/// multi-byte text that is over 512 bytes but within 512 characters is not
/// mislabelled as truncated, and the cut always lands on a char boundary.
fn truncate_for_preview(s: &str) -> String {
    const MAX: usize = 512;
    // The byte offset of the char at index MAX (if any) is exactly where the
    // first MAX chars end.
    match s.char_indices().nth(MAX) {
        None => s.to_string(),
        Some((cut, _)) => format!("{}… (truncated)", &s[..cut]),
    }
}

#[cfg(unix)]
fn send_sigterm(pid: u32) {
    use nix::sys::signal::{kill, Signal};
    use nix::unistd::Pid;

    // Cast is safe: u32 → i32 for PIDs in the valid pid_t range. Values
    // above i32::MAX would be invalid PIDs on every Unix we support, so
    // saturating is fine — `kill` will just fail with EINVAL and we move
    // on to the SIGKILL branch.
    #[allow(clippy::cast_possible_wrap)]
    let raw = pid as i32;
    let _ = kill(Pid::from_raw(raw), Signal::SIGTERM);
}

/// No-op counterpart of the Unix `send_sigterm` for targets without signals.
#[cfg(not(unix))]
fn send_sigterm(_pid: u32) {
    // Windows has no SIGTERM; termination on these targets relies on the
    // SIGKILL equivalent (`child.kill()`) in the caller's escalation path.
}

/// Spawn `apr <args...>` and stream stdout line-by-line to `on_line`.
///
/// Thin wrapper that delegates to [`spawn_streaming`] with the program fixed
/// to `"apr"`.
///
/// FALSIFY-MCP-PROGRESS-001: this is the streaming variant used by tools that
/// emit `notifications/progress` (currently `apr.finetune`). Each line of
/// stdout (as written by `apr <cmd> --json`) is passed to `on_line`
/// synchronously before the next `read_line` — the caller is responsible for
/// emitting the notification.
///
/// Returns a `ToolCallResult` whose body is the concatenated stdout (the same
/// shape as [`run_apr`] would have produced), so callers can keep the
/// existing "final payload" semantics while layering progress on top.
///
/// # Parameters
///
/// - `args`: arguments passed to `apr` verbatim.
/// - `on_line`: callback invoked once per stdout line, without the trailing
///   line terminator.
#[must_use]
pub fn run_apr_streaming<F>(args: &[&str], on_line: F) -> ToolCallResult
where
    F: FnMut(&str),
{
    spawn_streaming("apr", args, on_line)
}

/// Generic-over-program variant of [`run_apr_streaming`] used by tests that
/// need to inject a mock subprocess.
#[must_use]
pub fn spawn_streaming<F>(program: &str, args: &[&str], mut on_line: F) -> ToolCallResult
where
    F: FnMut(&str),
{
    let cmd_display = format!("{program} {}", args.join(" "));

    let mut child = match Command::new(program)
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(c) => c,
        Err(e) => {
            return ToolCallResult::error(format!("Failed to spawn `{cmd_display}`: {e}"));
        }
    };

    // Take stdout so we can wrap it in a BufReader. Leaving stderr attached
    // to the child means we can drain it after wait() for the error path.
    let stdout_pipe = match child.stdout.take() {
        Some(p) => p,
        None => {
            let _ = child.wait();
            return ToolCallResult::error(format!("Failed to capture stdout of `{cmd_display}`"));
        }
    };

    let mut accumulated = String::new();
    let reader = BufReader::new(stdout_pipe);
    for line in reader.lines() {
        match line {
            Ok(text) => {
                on_line(&text);
                accumulated.push_str(&text);
                accumulated.push('\n');
            }
            Err(e) => {
                // Best-effort: surface the read error but still try to reap
                // the child so we don't leak a zombie.
                let _ = child.wait();
                return ToolCallResult::error(format!(
                    "Failed to read stdout of `{cmd_display}`: {e}"
                ));
            }
        }
    }

    // stdout closed → subprocess is either exited or about to. Wait for the
    // exit status so we can map success/failure correctly.
    let status = match child.wait() {
        Ok(s) => s,
        Err(e) => {
            return ToolCallResult::error(format!("Failed to reap `{cmd_display}`: {e}"));
        }
    };

    let stderr = drain(&mut child.stderr.take());

    if status.success() {
        if accumulated.trim().is_empty() {
            ToolCallResult::error(format!("`{cmd_display}` produced no output"))
        } else {
            ToolCallResult::success(accumulated)
        }
    } else {
        let code = status.code().unwrap_or(-1);
        let detail = if stderr.trim().is_empty() {
            accumulated
        } else {
            stderr
        };
        ToolCallResult::error(format!("`{cmd_display}` failed (exit {code}): {detail}"))
    }
}

#[cfg(test)]
#[allow(clippy::disallowed_methods)] // test helpers
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::thread;

    /// Running `apr` with an unrecognised subcommand yields a tool error and
    /// never panics — whether `apr` is installed (non-zero exit) or missing
    /// from `PATH` (spawn failure).
    #[test]
    fn spawn_failure_maps_to_tool_error() {
        let result = run_apr(&["this-subcommand-does-not-exist"]);
        assert_eq!(result.is_error, Some(true));
    }

    /// Cancellable path: a never-firing receiver lets the subprocess run to
    /// natural completion, producing identical behaviour to `run_apr`.
    ///
    /// Unix-only because it relies on an `echo` executable being on `PATH`.
    #[test]
    #[cfg(unix)]
    fn cancellable_natural_exit_matches_run_apr() {
        let (_tx, rx) = mpsc::channel::<()>();
        let result = spawn_cancellable("echo", &["hello"], &rx, CANCEL_GRACE_MS);
        assert!(result.is_error.is_none(), "echo should succeed");
        assert!(result.content[0].text.contains("hello"));
    }

    /// Cancellable path: a disconnected receiver (sender dropped) is
    /// equivalent to "no cancellation will arrive" — behaviour should not
    /// change vs the never-firing channel.
    ///
    /// Unix-only because it relies on an `echo` executable being on `PATH`.
    #[test]
    #[cfg(unix)]
    fn cancellable_disconnected_channel_is_noop() {
        let (tx, rx) = mpsc::channel::<()>();
        drop(tx);
        let result = spawn_cancellable("echo", &["world"], &rx, CANCEL_GRACE_MS);
        assert!(result.is_error.is_none());
        assert!(result.content[0].text.contains("world"));
    }

    /// Spawning a missing binary returns a spawn error without panic.
    #[test]
    fn cancellable_spawn_failure_maps_to_error() {
        let (_tx, rx) = mpsc::channel::<()>();
        let result = spawn_cancellable(
            "/this/binary/does/not/exist/apr-mcp-test",
            &[],
            &rx,
            CANCEL_GRACE_MS,
        );
        assert_eq!(result.is_error, Some(true));
        assert!(result.content[0].text.contains("Failed to spawn"));
    }

    /// FALSIFY-MCP-PROGRESS-001 (unit): `spawn_streaming` fires the callback
    /// once per stdout line before returning the aggregated payload.
    ///
    /// Unix-only because it relies on a `printf` executable being on `PATH`.
    #[test]
    #[cfg(unix)]
    fn streaming_invokes_callback_per_line() {
        let mut captured = Vec::<String>::new();
        let result = spawn_streaming("printf", &["line1\nline2\nline3\n"], |line| {
            captured.push(line.to_string());
        });
        assert!(result.is_error.is_none(), "printf should succeed");

        assert_eq!(captured, vec!["line1", "line2", "line3"]);
        assert!(result.content[0].text.contains("line1"));
        assert!(result.content[0].text.contains("line3"));
    }

    /// Spawn failure in the streaming path returns a tool error without
    /// invoking the callback.
    #[test]
    fn streaming_spawn_failure_does_not_call_callback() {
        let mut called = false;
        let result = spawn_streaming(
            "/this/binary/does/not/exist/apr-mcp-streaming-test",
            &[],
            |_| {
                called = true;
            },
        );
        assert_eq!(result.is_error, Some(true));
        assert!(!called);
        assert!(result.content[0].text.contains("Failed to spawn"));
    }

    /// Streaming path: non-zero exit surfaces as a tool error.
    #[test]
    #[cfg(unix)]
    fn streaming_nonzero_exit_is_error() {
        let result = spawn_streaming("sh", &["-c", "echo partial; exit 3"], |_| {});
        assert_eq!(result.is_error, Some(true));
        assert!(
            result.content[0].text.contains("exit 3"),
            "message should include exit code: {}",
            result.content[0].text
        );
    }

    /// FALSIFY-MCP-006 (unit-level): sending a cancel signal to a
    /// long-running `sleep 60` subprocess returns within the grace window
    /// (SIGTERM is immediate for `sleep`, so we see natural reap well
    /// before the SIGKILL escalation).
    #[test]
    #[cfg(unix)]
    fn cancellable_stops_long_running_subprocess_within_grace() {
        let (tx, rx) = mpsc::channel::<()>();

        // Fire the cancel shortly after spawn to give the subprocess time
        // to get into its sleep syscall.
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            let _ = tx.send(());
        });

        let t0 = Instant::now();
        // Grace of 2s: if SIGTERM fails for any reason we still fall back
        // to SIGKILL well before the test's own timeout.
        let result = spawn_cancellable("sleep", &["60"], &rx, 2_000);
        let elapsed = t0.elapsed();

        handle.join().expect("cancel-sender thread joins");

        assert_eq!(result.is_error, Some(true), "cancelled calls are errors");
        assert!(
            result.content[0].text.starts_with("Cancelled:"),
            "message should indicate cancellation, got: {}",
            result.content[0].text
        );
        // 100ms fire + ~immediate SIGTERM response + cleanup — well under
        // the 2s grace plus the 500ms of slack allowed here.
        assert!(
            elapsed < Duration::from_millis(2_500),
            "cancel should finish within grace + slack, took {elapsed:?}"
        );
        // And it must finish meaningfully faster than the underlying
        // `sleep 60` would have — this is the real falsification.
        assert!(
            elapsed < Duration::from_secs(5),
            "cancelled call must return far before sleep 60's natural exit"
        );
    }
}