jumperless-mcp 0.1.0

MCP server for the Jumperless V5 — persistent USB-serial bridge exposing the firmware API to LLMs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
//! Firmware identification probe for the Jumperless V5.
//!
//! Execs a diagnostic Python script over the MicroPython Raw REPL and returns
//! the verbatim stdout. The probe is purely read-only — it does not modify
//! device state (no `jfs.write`, no `nodes_save`, no overlay paints).
//!
//! # Design
//!
//! `FIRMWARE_PROBE_SCRIPT` is embedded at compile time via `include_str!` so
//! the binary is fully self-contained. The script is exec'd transiently each
//! invocation and is NOT installed to the device filesystem.
//!
//! The exec path mirrors `library::device_verify_script` / `device_dump_script`:
//! - bump read timeout to `LIBRARY_FILE_OP_TIMEOUT_MS` (30s)
//! - call `repl::exec_code`
//! - restore short timeout
//!
//! # Output parsing
//!
//! `parse_probe_output` does a minimal line-scan to extract four verdict fields
//! for the JSON output mode:
//! - `probe_complete` — from `"Probe complete"` sentinel line (script end marker)
//! - `force_service_bound` — from `jumperless.force_service present: True`
//! - `jos_present` — from `jumperless.jOS present: True`
//! - `library_version` — from `VERSION file: '<value>'`
//!
//! Full probe output is always returned verbatim regardless of parsing.
//!
//! # Error handling
//!
//! On `exec_code` failure the probe sends Ctrl-C (0x03) to abort any partially
//! running MicroPython code before returning the error.  This mirrors the
//! canonical pattern in `library::exec_with_cleanup`.  The 10ms sleep that
//! follows accommodates the device's interrupt-poll cadence; stale bytes that
//! don't drain in that window will be consumed (or cause an early EOF) on the
//! next exec_code attempt — acceptable per the library.rs comment.

use std::io::ErrorKind;

use crate::base::McpError;
use crate::repl;
use crate::repl::ReplError;

/// Embedded firmware probe script.
///
/// Exec'd transiently on each `jumperless-mcp firmware-probe` invocation.
/// NOT installed to the device filesystem.
pub const FIRMWARE_PROBE_SCRIPT: &str = include_str!("../python/firmware_probe.py");

/// Timeout for the probe exec. The probe is read-only and small; 30s matches
/// the library-op convention and provides headroom for slow MicroPython paths.
pub const PROBE_TIMEOUT_MS: u64 = crate::library::LIBRARY_FILE_OP_TIMEOUT_MS;

/// Result of a successful firmware probe exec.
#[derive(Debug)]
pub struct ProbeResult {
    /// Verbatim stdout from the probe script.
    pub raw_output: String,
    /// True if the probe script reached its final `"Probe complete"` sentinel.
    ///
    /// When false, the output is partial (probe crashed mid-script, stale REPL
    /// bytes, etc.).  Callers SHOULD discount verdict flags when this is false.
    pub probe_complete: bool,
    /// True if `jumperless.force_service` is bound (JumperlOS marker).
    pub force_service_bound: bool,
    /// True if `jumperless.jOS` is present (JumperlOS marker).
    pub jos_present: bool,
    /// Installed resident-library version string, or `None` if not found.
    pub library_version: Option<String>,
}

/// Run the firmware probe on an already-raw-mode serial port.
///
/// Bumps the read timeout to [`PROBE_TIMEOUT_MS`] before the exec,
/// restores [`crate::PORT_OPEN_TIMEOUT_MS`] after (best-effort on restore).
///
/// Returns `Err` if `exec_code` fails or the device returns a Python-level
/// exception (non-empty stderr). The caller is responsible for running this
/// inside `spawn_blocking`.
///
/// On `exec_code` failure a Ctrl-C byte is sent before returning the error so
/// that the device exits any partially-running MicroPython context.  This
/// mirrors `library::exec_with_cleanup`.
pub fn run_probe(port: &mut dyn serialport::SerialPort) -> Result<ProbeResult, McpError> {
    // BEFORE-bump: required or the read loop will time out before the device
    // finishes running the probe (~1-3s on real hardware).
    port.set_timeout(std::time::Duration::from_millis(PROBE_TIMEOUT_MS))
        .map_err(|e| {
            McpError::Connection(format!(
                "failed to set probe read timeout to {PROBE_TIMEOUT_MS}ms: {e}; \
                 device may be left in Raw REPL — power-cycle or run --smoke-test"
            ))
        })?;

    // Finding 5: normalize CRLF line endings before exec.  On Windows
    // `include_str!` may embed \r\n if the file was saved with CRLF; MicroPython
    // raises SyntaxError on \r\n-terminated source.
    let normalized_script = FIRMWARE_PROBE_SCRIPT
        .replace("\r\n", "\n")
        .replace('\r', "\n");

    let exec_result = repl::exec_code(port, &normalized_script);

    // Finding 1: on exec_code failure, send Ctrl-C to abort any partially-
    // running MicroPython code.  Mirror of library::exec_with_cleanup.
    let resp = match exec_result {
        Ok(r) => r,
        Err(e) => {
            tracing::warn!(err = %e, "firmware probe exec failed; sending Ctrl-C abort");
            let _ = port.write_all(&[0x03]);
            let _ = port.flush();
            std::thread::sleep(std::time::Duration::from_millis(10));

            // Finding 2: emit an actionable message when the error is a timeout.
            let is_timeout = matches!(
                &e,
                ReplError::Io(io_err)
                    if io_err.kind() == ErrorKind::TimedOut
                        || io_err.kind() == ErrorKind::WouldBlock
            );
            if is_timeout {
                return Err(McpError::Connection(
                    "firmware probe timed out after 30s — device may be unresponsive; \
                     power-cycle or try `jumperless-mcp --smoke-test`"
                        .into(),
                ));
            }
            return Err(McpError::Connection(format!(
                "firmware probe exec failed: {e}"
            )));
        }
    };

    // AFTER-restore: best-effort. A failed restore only means subsequent fast
    // ops incur one longer wait until the next set_timeout call.
    let _ = port.set_timeout(std::time::Duration::from_millis(
        crate::PORT_OPEN_TIMEOUT_MS,
    ));

    if resp.is_error() {
        return Err(McpError::Protocol(format!(
            "firmware probe raised a Python exception on device:\n{}\n{}",
            resp.stderr.trim(),
            resp.stdout.trim(),
        )));
    }

    let raw_output = resp.stdout.clone();
    let parsed = parse_probe_output(&raw_output);

    Ok(ProbeResult {
        raw_output,
        probe_complete: parsed.probe_complete,
        force_service_bound: parsed.force_service_bound,
        jos_present: parsed.jos_present,
        library_version: parsed.library_version,
    })
}

/// Parsed verdict fields extracted from probe stdout.
#[derive(Debug, Default)]
pub struct ParsedProbeOutput {
    /// True if the output contains the `"Probe complete"` sentinel line.
    ///
    /// When false the output is partial — either the probe script crashed
    /// mid-execution or stale REPL bytes were parsed instead of real output.
    /// Callers should discount the verdict flags when this is false.
    pub probe_complete: bool,
    pub force_service_bound: bool,
    pub jos_present: bool,
    pub library_version: Option<String>,
}

/// Minimal line-scan parser for probe stdout.
///
/// Looks for specific marker lines emitted by the probe script:
/// - `"Probe complete"` — completion sentinel (last line of the script)
/// - `  jumperless.force_service present: True`
/// - `  jumperless.jOS present: True`
/// - `  VERSION file: '<version>'`
///
/// Does not attempt full structured parsing; probe output is human-readable
/// by design and may vary across firmware versions. Unknown lines are ignored.
pub fn parse_probe_output(output: &str) -> ParsedProbeOutput {
    let mut result = ParsedProbeOutput::default();

    for line in output.lines() {
        let trimmed = line.trim();

        // Completion sentinel: the probe script's final `print("Probe complete")`.
        // Must be present for the output to be considered trustworthy.
        if trimmed == "Probe complete" {
            result.probe_complete = true;
        }

        // Section 2: `jumperless.force_service present: True`
        if trimmed == "jumperless.force_service present: True" {
            result.force_service_bound = true;
        }

        // Section 5: `jumperless.jOS present: True`
        if trimmed == "jumperless.jOS present: True" {
            result.jos_present = true;
        }

        // Section 8: `VERSION file: '0.2.0+20260510'`
        // The probe emits: `  VERSION file: '<value>'`
        // We strip leading `VERSION file: ` then unquote.
        if let Some(rest) = trimmed.strip_prefix("VERSION file: ") {
            // rest is e.g. `'0.2.0+20260510'` or `<not installed>`
            // Only extract if it looks like a quoted string.
            if rest.starts_with('\'') && rest.ends_with('\'') && rest.len() >= 2 {
                let inner = &rest[1..rest.len() - 1];
                if !inner.is_empty() {
                    result.library_version = Some(inner.to_string());
                }
            }
        }
    }

    result
}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    // Sample probe output for the "JumperlOS firmware, library installed" case.
    const SAMPLE_JOS_WITH_LIB: &str = "\
============================================================\n\
Jumperless V5 firmware probe\n\
============================================================\n\
\n\
[1] System identity\n\
  sys.platform: 'rp2'\n\
  sys.implementation: namespace(name='micropython', version=(1, 24, 0), _machine='RP2350', _mpy=132)\n\
  sys.version: '3.4.0; MicroPython v1.24.0 on 2024-10-25'\n\
  os.uname: (sysname='rp2', nodename='rp2', release='1.24.0', version='v1.24.0 on 2024-10-25', machine='RP2350 with RP2350')\n\
\n\
[2] Gating check: cooperative-yield primitive\n\
  jumperless.force_service present: True\n\
  -> Verdict: likely JumperlOS firmware\n\
\n\
[3] API surface fingerprint\n\
  jumperless.force_service: True\n\
  jumperless.jOS: True\n\
  jumperless.connect: True\n\
\n\
[5] jOS namespace (JumperlOS marker)\n\
  jumperless.jOS present: True\n\
  jOS attr count: 3\n\
    jOS.task_count\n\
    jOS.yield_now\n\
    jOS.schedule\n\
\n\
[8] Resident jumperless_mcp library\n\
  VERSION file: '0.2.0+20260510'\n\
\n\
============================================================\n\
Probe complete\n\
============================================================\n\
";

    // Sample probe output for "RP23V50firmware, library not installed" case.
    const SAMPLE_STABLE_NO_LIB: &str = "\
============================================================\n\
Jumperless V5 firmware probe\n\
============================================================\n\
\n\
[2] Gating check: cooperative-yield primitive\n\
  jumperless.force_service present: False\n\
  -> Verdict: likely RP23V50firmware (stable)\n\
\n\
[5] jOS namespace (JumperlOS marker)\n\
  jumperless.jOS present: False\n\
\n\
[8] Resident jumperless_mcp library\n\
  VERSION file: <not installed>\n\
\n\
Probe complete\n\
";

    #[test]
    fn parse_jos_firmware_with_library() {
        let parsed = parse_probe_output(SAMPLE_JOS_WITH_LIB);
        assert!(
            parsed.probe_complete,
            "probe_complete should be true when sentinel is present"
        );
        assert!(
            parsed.force_service_bound,
            "force_service_bound should be true for JumperlOS output"
        );
        assert!(
            parsed.jos_present,
            "jos_present should be true for JumperlOS output"
        );
        assert_eq!(
            parsed.library_version,
            Some("0.2.0+20260510".to_string()),
            "library_version should parse the quoted VERSION value"
        );
    }

    #[test]
    fn parse_stable_firmware_no_library() {
        let parsed = parse_probe_output(SAMPLE_STABLE_NO_LIB);
        assert!(
            parsed.probe_complete,
            "probe_complete should be true when sentinel is present"
        );
        assert!(
            !parsed.force_service_bound,
            "force_service_bound should be false for stable firmware"
        );
        assert!(
            !parsed.jos_present,
            "jos_present should be false for stable firmware"
        );
        assert_eq!(
            parsed.library_version, None,
            "library_version should be None when VERSION file is <not installed>"
        );
    }

    #[test]
    fn parse_empty_output_is_all_false() {
        let parsed = parse_probe_output("");
        assert!(!parsed.probe_complete);
        assert!(!parsed.force_service_bound);
        assert!(!parsed.jos_present);
        assert_eq!(parsed.library_version, None);
    }

    /// Partial output (probe crashed before sentinel) → probe_complete is false.
    /// Verdict flags may still be partially parsed but the caller knows to
    /// discount them.
    #[test]
    fn parse_truncated_output_probe_complete_false() {
        // Cut SAMPLE_JOS_WITH_LIB off just before "Probe complete"
        let truncated = "\
============================================================\n\
Jumperless V5 firmware probe\n\
============================================================\n\
\n\
[2] Gating check: cooperative-yield primitive\n\
  jumperless.force_service present: True\n\
  -> Verdict: likely JumperlOS firmware\n\
\n\
[5] jOS namespace (JumperlOS marker)\n\
  jumperless.jOS present: True\n\
\n\
[8] Resident jumperless_mcp library\n\
  VERSION file: '0.2.0+20260510'\n\
";
        let parsed = parse_probe_output(truncated);
        assert!(
            !parsed.probe_complete,
            "probe_complete must be false when sentinel is absent"
        );
        // Verdict flags can still be parsed honestly from partial output — the
        // caller discounts them via probe_complete, not by zeroing them here.
        assert!(
            parsed.force_service_bound,
            "force_service_bound still parsed from partial output"
        );
        assert!(
            parsed.jos_present,
            "jos_present still parsed from partial output"
        );
        assert_eq!(
            parsed.library_version,
            Some("0.2.0+20260510".to_string()),
            "library_version still parsed from partial output"
        );
    }

    #[test]
    fn parse_force_service_false_line_does_not_set_flag() {
        // The line `jumperless.force_service present: False` must NOT set the flag.
        let output = "  jumperless.force_service present: False\n";
        let parsed = parse_probe_output(output);
        assert!(!parsed.force_service_bound);
    }

    #[test]
    fn parse_jos_false_line_does_not_set_flag() {
        let output = "  jumperless.jOS present: False\n";
        let parsed = parse_probe_output(output);
        assert!(!parsed.jos_present);
    }

    #[test]
    fn parse_version_unquoted_not_extracted() {
        // VERSION line that isn't quoted (e.g. `<not installed>`) yields None.
        let output = "  VERSION file: <not installed>\n";
        let parsed = parse_probe_output(output);
        assert_eq!(parsed.library_version, None);
    }

    #[test]
    fn parse_version_various_semver_strings() {
        let cases = [
            ("  VERSION file: '0.1.0'\n", "0.1.0"),
            ("  VERSION file: '0.2.0+20260510'\n", "0.2.0+20260510"),
            ("  VERSION file: '1.0.0-beta.1'\n", "1.0.0-beta.1"),
        ];
        for (output, expected) in cases {
            let parsed = parse_probe_output(output);
            assert_eq!(
                parsed.library_version.as_deref(),
                Some(expected),
                "failed to parse version from: {output:?}"
            );
        }
    }

    /// `include_str!` compile-time check — if the embedded path is wrong the
    /// build fails. This test just asserts the embedded content is non-empty.
    #[test]
    fn probe_script_is_non_empty() {
        assert!(
            !FIRMWARE_PROBE_SCRIPT.is_empty(),
            "FIRMWARE_PROBE_SCRIPT must not be empty"
        );
        // Must contain the probe's output sentinel so we know the right file loaded.
        assert!(
            FIRMWARE_PROBE_SCRIPT.contains("Jumperless V5 firmware probe"),
            "FIRMWARE_PROBE_SCRIPT must contain probe header"
        );
    }
}