jumperless-mcp 0.1.0

MCP server for the Jumperless V5 — persistent USB-serial bridge exposing the firmware API to LLMs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
//! Dev escape-hatch ToolDef: `jumperless.dev.*`
//!
//! One tool for running arbitrary Python code on the Jumperless V5 via the
//! Raw REPL. This is explicitly a dev/debug tool — not for production callers.
//!
//! ## Design notes:
//! - This is the unstructured backdoor for unforeseen debugging needs.
//! - Caller-supplied Python source is executed directly via repl::exec_code.
//! - exec_with_cleanup is deliberately NOT used: it converts device exceptions to
//!   `Err`. The handler calls repl::exec_code and sends Ctrl-C itself on error.
//! - Returns stdout, stderr, and is_error — same fields as ReplResponse.
//!
//! ## Landmines to avoid (from brief):
//! - Do NOT use `hashlib.sha256(...).hexdigest()` — unavailable on V5.
//!   Use `binascii.hexlify(...digest()).decode()` instead.
//! - CRLF in code will cause device-side SyntaxError. Normalize before send.
//! - Triple-single-quotes in code will corrupt the REPL framing — disallow.
//!   Triple-double-quotes are rejected as well, so callers face a single rule.

use crate::base::{McpError, ToolDescriptor};
use serde_json::{json, Value};
use std::io::{Read, Write};

use crate::repl;

// ── ToolDescriptors ───────────────────────────────────────────────────────────

/// Escape-hatch: execute arbitrary Python on the Jumperless V5.
///
/// Builds the `dev_exec_python` tool descriptor: name, human/LLM-facing
/// description, JSON input schema, and a 10 000 ms timeout.
///
/// The description text is the only contract an LLM caller sees, so it must
/// mirror what `handle_dev_exec_python` actually enforces:
/// - both `'''` and `"""` are rejected,
/// - CR / CRLF line endings are normalized to LF by the handler,
/// - device-side exceptions come back as a normal result with `is_error: true`.
pub fn dev_exec_python_descriptor() -> ToolDescriptor {
    ToolDescriptor::with_timeout(
        "dev_exec_python",
        "DEV/DEBUG ONLY: Execute arbitrary Python source code on the Jumperless V5 via \
         the MicroPython Raw REPL. Returns stdout, stderr, and is_error. \
         A device-side exception is returned as a normal result with is_error: true \
         and the traceback in stderr. \
         This is an unstructured escape hatch for unforeseen debugging needs — \
         not for production callers. Use structured tools (overlay.*, slot.*, etc.) \
         where available. \
         CONSTRAINTS: code must be non-empty and must not contain triple-quote tokens \
         of either form (''' or \"\"\") — both are rejected to keep REPL framing safe. \
         CR and CRLF line endings are normalized to LF before sending. \
         hashlib.sha256().hexdigest() \
         is unavailable on V5 — use binascii.hexlify(h.digest()).decode() instead.",
        json!({
            "type": "object",
            "properties": {
                "code": {
                    "type": "string",
                    "description": "Python source code to execute. Must be non-empty and must not contain triple-quote tokens (''' or \"\"\"). CR/CRLF line endings are normalized to LF."
                }
            },
            "required": ["code"],
            "additionalProperties": false
        }),
        // Arbitrary user code may run for a while; keep the timeout generous.
        10_000,
    )
}

/// List every tool descriptor exported by this module.
///
/// The dev module exposes exactly one tool, `dev_exec_python`, so the
/// returned vector always has a single element.
pub fn descriptors() -> Vec<ToolDescriptor> {
    let exec_python = dev_exec_python_descriptor();
    Vec::from([exec_python])
}

// ── Handlers ─────────────────────────────────────────────────────────────────

/// Execute caller-supplied Python and return `{"stdout", "stderr", "is_error"}`.
///
/// ## Exception handling contract
///
/// This tool deliberately differs from other tools in how device exceptions are handled:
/// - **Device-side exceptions** (MicroPython tracebacks): returned as `Ok` with
///   `is_error: true`. The caller gets the full stdout + stderr for inspection.
///   Ctrl-C is sent after a device exception to clean up the REPL state.
/// - **Transport-level errors** (I/O failure, protocol framing error): returned as `Err`.
///
/// This is the escape-hatch tool — operators reaching for it specifically want structured
/// access to tracebacks. Converting device exceptions to `Err` would destroy that value.
///
/// We bypass `exec_with_cleanup` (which converts `is_error()` to `Err`) and use
/// `repl::exec_code` directly, handling cleanup ourselves.
pub fn handle_dev_exec_python<P: Read + Write + ?Sized>(
    port: &mut P,
    args: &Value,
) -> Result<Value, McpError> {
    let code = args
        .get("code")
        .and_then(|v| v.as_str())
        .ok_or_else(|| McpError::Protocol("dev_exec_python requires a 'code' argument".into()))?;

    if code.is_empty() {
        return Err(McpError::Protocol(
            "dev_exec_python: 'code' must be non-empty".into(),
        ));
    }

    // Safety: reject triple-quote tokens of either form to keep wire-framing safe
    // for future composition (''' corrupts paste-mode framing; """ is disallowed
    // for consistency so callers don't have to track which form is rejected).
    if code.contains("'''") {
        return Err(McpError::Protocol(
            "dev_exec_python: code must not contain triple-single-quotes (''') — \
             triple-quote tokens of either form are rejected to keep wire-framing safe"
                .into(),
        ));
    }
    if code.contains("\"\"\"") {
        return Err(McpError::Protocol(
            "dev_exec_python: code must not contain triple-double-quotes (\"\"\") — \
             triple-quote tokens of either form are rejected to keep wire-framing safe"
                .into(),
        ));
    }

    // Normalize CRLF → LF to prevent device-side SyntaxError (landmine #3 from brief).
    let normalized = code.replace("\r\n", "\n").replace('\r', "\n");

    tracing::info!(
        code_len = normalized.len(),
        "dev_exec_python: executing caller-supplied Python"
    );

    // Use repl::exec_code directly (NOT exec_with_cleanup) so device-side exceptions
    // are returned as Ok with is_error:true instead of converted to Err.
    match repl::exec_code(port, &normalized) {
        Ok(resp) => {
            if resp.is_error() {
                // Device exception: send Ctrl-C to clean up REPL state, then return Ok
                // with the structured traceback. The caller needs the stderr content.
                tracing::warn!(
                    stderr = %resp.stderr.trim(),
                    "dev_exec_python: device-side exception (returning Ok with is_error:true)"
                );
                let _ = port.write_all(&[0x03]);
                let _ = port.flush();
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
            Ok(json!({
                "stdout": resp.stdout,
                "stderr": resp.stderr,
                "is_error": resp.is_error()
            }))
        }
        Err(e) => {
            // Transport-level error (I/O failure, framing error, port disconnect).
            // Send Ctrl-C best-effort to clean up, then propagate as Err.
            let _ = port.write_all(&[0x03]);
            let _ = port.flush();
            Err(McpError::Protocol(format!("dev_exec_python: {e}")))
        }
    }
}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    //! Unit tests for the dev escape-hatch tool.
    //!
    //! All handler tests run against `MockPort`, an in-memory transport that
    //! replays scripted device bytes and records everything written to it.

    use super::*;
    use std::collections::VecDeque;
    use std::io::{self, Read, Write};

    // ── MockPort ──────────────────────────────────────────────────────────────

    /// In-memory stand-in for the serial port.
    struct MockPort {
        /// Scripted bytes the "device" will return, consumed front to back.
        read_data: VecDeque<u8>,
        /// Every byte the code under test wrote, in order.
        write_data: Vec<u8>,
    }

    impl MockPort {
        /// Build a port whose read side yields `responses` concatenated in order.
        fn with_responses(responses: &[&[u8]]) -> Self {
            let mut buf = Vec::new();
            for r in responses {
                buf.extend_from_slice(r);
            }
            MockPort {
                read_data: VecDeque::from(buf),
                write_data: Vec::new(),
            }
        }

        /// Frame with nothing between the delimiters: `OK 0x04 0x04 >`.
        fn ok_frame() -> Vec<u8> {
            b"OK\x04\x04>".to_vec()
        }

        /// Frame carrying `line` plus a newline before the first 0x04:
        /// `OK <line>\n 0x04 0x04 >`.
        fn ok_with_stdout(line: &str) -> Vec<u8> {
            let mut v = b"OK".to_vec();
            v.extend_from_slice(line.as_bytes());
            v.push(b'\n');
            v.extend_from_slice(b"\x04\x04>");
            v
        }

        /// Frame carrying `msg` plus a newline between the two 0x04 bytes:
        /// `OK 0x04 <msg>\n 0x04 >`.
        fn error_frame(msg: &str) -> Vec<u8> {
            let mut v = b"OK\x04".to_vec();
            v.extend_from_slice(msg.as_bytes());
            v.push(b'\n');
            v.push(b'\x04');
            v.push(b'>');
            v
        }
    }

    impl Read for MockPort {
        /// Hand out scripted bytes; once exhausted, fail with `UnexpectedEof`
        /// so a missing script surfaces as a transport error instead of a hang.
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            let n = buf.len().min(self.read_data.len());
            if n == 0 {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "MockPort: no more scripted bytes",
                ));
            }
            for (dst, src) in buf[..n].iter_mut().zip(self.read_data.drain(..n)) {
                *dst = src;
            }
            Ok(n)
        }
    }

    impl Write for MockPort {
        /// Record the bytes; writes never fail.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.write_data.extend_from_slice(buf);
            Ok(buf.len())
        }
        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    // ── Descriptor tests ──────────────────────────────────────────────────────

    #[test]
    fn descriptor_has_correct_name() {
        let d = dev_exec_python_descriptor();
        assert_eq!(d.name, "dev_exec_python");
    }

    #[test]
    fn descriptor_requires_code_field() {
        let d = dev_exec_python_descriptor();
        let required = d.input_schema.get("required").unwrap();
        let arr = required.as_array().unwrap();
        assert!(
            arr.iter().any(|v| v.as_str() == Some("code")),
            "dev_exec_python must require 'code'"
        );
    }

    #[test]
    fn descriptor_has_additional_properties_false() {
        let d = dev_exec_python_descriptor();
        assert_eq!(
            d.input_schema.get("additionalProperties"),
            Some(&Value::Bool(false))
        );
    }

    #[test]
    fn descriptor_has_generous_timeout() {
        let d = dev_exec_python_descriptor();
        assert!(
            d.timeout_ms >= 5_000,
            "dev_exec_python needs generous timeout"
        );
    }

    #[test]
    fn descriptors_returns_one_tool() {
        assert_eq!(descriptors().len(), 1);
    }

    // ── Handler: dev_exec_python ──────────────────────────────────────────────

    #[test]
    fn exec_python_happy_path_returns_stdout() {
        let frame = MockPort::ok_with_stdout("42");
        let mut port = MockPort::with_responses(&[&frame]);
        let args = json!({"code": "print(6*7)"});
        let result = handle_dev_exec_python(&mut port, &args).unwrap();
        assert!(result["stdout"].as_str().unwrap().contains("42"));
        assert_eq!(result["is_error"], false);
    }

    #[test]
    fn exec_python_device_exception_returns_ok_with_is_error_true() {
        // R4 fix: device-side exceptions must return Ok with is_error:true, NOT Err.
        // The caller (escape-hatch tool) needs structured access to the traceback.
        // Ctrl-C must still be sent to clean up REPL state.
        let err = MockPort::error_frame("ZeroDivisionError: division by zero");
        let mut port = MockPort::with_responses(&[&err]);
        let args = json!({"code": "print(1/0)"});
        let result = handle_dev_exec_python(&mut port, &args);
        // Must be Ok, NOT Err.
        assert!(
            result.is_ok(),
            "device exception must return Ok(…), not Err — got: {result:?}"
        );
        let val = result.unwrap();
        assert_eq!(
            val["is_error"], true,
            "is_error must be true on device exception"
        );
        // stderr must contain the traceback.
        assert!(
            val["stderr"]
                .as_str()
                .unwrap_or("")
                .contains("ZeroDivisionError"),
            "stderr must contain the traceback text"
        );
        // Ctrl-C must still have been sent to clean up REPL state.
        assert!(
            port.write_data.contains(&0x03),
            "Ctrl-C must be sent after device exception"
        );
    }

    #[test]
    fn exec_python_missing_code_returns_error() {
        let mut port = MockPort::with_responses(&[]);
        let args = json!({});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(result.is_err());
        match result.unwrap_err() {
            McpError::Protocol(msg) => {
                assert!(msg.contains("code"), "error must mention 'code' arg");
            }
            other => panic!("expected McpError::Protocol, got: {other:?}"),
        }
        // Argument validation happens before any I/O.
        assert!(
            port.write_data.is_empty(),
            "nothing may be sent to the device when 'code' is missing"
        );
    }

    #[test]
    fn exec_python_empty_code_returns_error() {
        let mut port = MockPort::with_responses(&[]);
        let args = json!({"code": ""});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(result.is_err());
        assert!(
            port.write_data.is_empty(),
            "nothing may be sent to the device when 'code' is empty"
        );
    }

    #[test]
    fn exec_python_triple_single_quotes_returns_error() {
        let mut port = MockPort::with_responses(&[]);
        let args = json!({"code": "x = '''hello'''"});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(result.is_err());
        match result.unwrap_err() {
            McpError::Protocol(msg) => {
                assert!(
                    msg.contains("triple-single-quotes"),
                    "error must mention triple-single-quotes; got: {msg}"
                );
            }
            other => panic!("expected McpError::Protocol, got: {other:?}"),
        }
        assert!(
            port.write_data.is_empty(),
            "rejected code must never reach the device"
        );
    }

    #[test]
    fn exec_python_triple_double_quotes_returns_error() {
        // R9: triple-double-quotes are also rejected for consistency.
        // The framing risk from ''' doesn't apply to this tool's Ctrl-D framing,
        // but both forms are disallowed to keep the rule simple for callers.
        let mut port = MockPort::with_responses(&[]);
        let args = json!({"code": "x = \"\"\"hello\"\"\""});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(result.is_err());
        match result.unwrap_err() {
            McpError::Protocol(msg) => {
                assert!(
                    msg.contains("triple-double-quotes"),
                    "error must mention triple-double-quotes; got: {msg}"
                );
            }
            other => panic!("expected McpError::Protocol, got: {other:?}"),
        }
        assert!(
            port.write_data.is_empty(),
            "rejected code must never reach the device"
        );
    }

    #[test]
    fn exec_python_transport_error_returns_err_not_ok() {
        // Transport-level errors (I/O failure) must still propagate as Err.
        // Distinguishes "device raised exception" (Ok + is_error:true) from
        // "port disconnected" (Err).
        let mut port = MockPort::with_responses(&[]);
        let args = json!({"code": "print('hello')"});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(result.is_err(), "transport error must return Err");
    }

    #[test]
    fn exec_python_transport_error_sends_ctrl_c_and_prefixes_message() {
        // Empty MockPort → exec_code fails. The handler must still attempt a
        // Ctrl-C cleanup and tag the error with the tool name.
        let mut port = MockPort::with_responses(&[]);
        let args = json!({"code": "print('hello')"});
        let result = handle_dev_exec_python(&mut port, &args);
        match result {
            Err(McpError::Protocol(msg)) => {
                assert!(
                    msg.starts_with("dev_exec_python: "),
                    "transport error must be prefixed with the tool name; got: {msg}"
                );
            }
            other => panic!("expected Err(McpError::Protocol), got: {other:?}"),
        }
        assert!(
            port.write_data.contains(&0x03),
            "Ctrl-C must be sent best-effort after a transport error"
        );
    }

    #[test]
    fn exec_python_normalizes_crlf_before_sending() {
        // CRLF in code → should be normalized to LF before exec_code.
        // We can verify by checking what was written to the port.
        let frame = MockPort::ok_frame();
        let mut port = MockPort::with_responses(&[&frame]);
        let args = json!({"code": "x = 1\r\ny = 2\r\nprint(x+y)"});
        let _ = handle_dev_exec_python(&mut port, &args);
        let written = String::from_utf8_lossy(&port.write_data);
        assert!(
            !written.contains("\r\n"),
            "CRLF must be normalized to LF before sending to device"
        );
    }

    #[test]
    fn exec_python_normalizes_lone_cr_before_sending() {
        // A bare CR (old Mac line ending) is also rewritten to LF, so the
        // original "1\ry" byte sequence must not appear on the wire.
        let frame = MockPort::ok_frame();
        let mut port = MockPort::with_responses(&[&frame]);
        let args = json!({"code": "x = 1\ry = 2"});
        let _ = handle_dev_exec_python(&mut port, &args);
        let written = String::from_utf8_lossy(&port.write_data);
        assert!(
            !written.contains("1\ry"),
            "lone CR must be normalized to LF before sending to device"
        );
    }
}