use crate::base::{McpError, ToolDescriptor};
use serde_json::{json, Value};
use std::io::{Read, Write};
use crate::repl;
/// Builds the descriptor for the `dev_exec_python` tool.
///
/// The input schema accepts exactly one required string property, `code`, and
/// forbids any additional properties. The descriptor is created through
/// `ToolDescriptor::with_timeout` with a value of `10_000` (presumably
/// milliseconds, matching the `timeout_ms` field).
///
/// The description text doubles as caller-facing documentation of the input
/// constraints, so it mirrors what `handle_dev_exec_python` does: both
/// triple-quote forms are rejected, and CR / CRLF line endings are normalized
/// to LF before the code is sent.
pub fn dev_exec_python_descriptor() -> ToolDescriptor {
    ToolDescriptor::with_timeout(
        "dev_exec_python",
        "DEV/DEBUG ONLY: Execute arbitrary Python source code on the Jumperless V5 via \
         the MicroPython Raw REPL. Returns stdout, stderr, and is_error. \
         This is an unstructured escape hatch for unforeseen debugging needs — \
         not for production callers. Use structured tools (overlay.*, slot.*, etc.) \
         where available. \
         CONSTRAINTS: code must not contain triple quotes of either form (''' or \"\"\") — \
         both are rejected to keep REPL framing safe. CR and CRLF line endings are \
         normalized to LF before the code is sent. hashlib.sha256().hexdigest() \
         is unavailable on V5 — use binascii.hexlify(h.digest()).decode() instead.",
        json!({
            "type": "object",
            "properties": {
                "code": {
                    "type": "string",
                    "description": "Python source code to execute. Must be non-empty and must not contain triple quotes of either form (''' or \"\"\"). CR and CRLF line endings are normalized to LF."
                }
            },
            "required": ["code"],
            "additionalProperties": false
        }),
        10_000,
    )
}
/// Returns every tool descriptor exposed by this module.
///
/// Currently that is the single `dev_exec_python` descriptor.
pub fn descriptors() -> Vec<ToolDescriptor> {
    let exec_python = dev_exec_python_descriptor();
    Vec::from([exec_python])
}
pub fn handle_dev_exec_python<P: Read + Write + ?Sized>(
port: &mut P,
args: &Value,
) -> Result<Value, McpError> {
let code = args
.get("code")
.and_then(|v| v.as_str())
.ok_or_else(|| McpError::Protocol("dev_exec_python requires a 'code' argument".into()))?;
if code.is_empty() {
return Err(McpError::Protocol(
"dev_exec_python: 'code' must be non-empty".into(),
));
}
if code.contains("'''") {
return Err(McpError::Protocol(
"dev_exec_python: code must not contain triple-single-quotes (''') — \
triple-quote tokens of either form are rejected to keep wire-framing safe"
.into(),
));
}
if code.contains("\"\"\"") {
return Err(McpError::Protocol(
"dev_exec_python: code must not contain triple-double-quotes (\"\"\") — \
triple-quote tokens of either form are rejected to keep wire-framing safe"
.into(),
));
}
let normalized = code.replace("\r\n", "\n").replace('\r', "\n");
tracing::info!(
code_len = normalized.len(),
"dev_exec_python: executing caller-supplied Python"
);
match repl::exec_code(port, &normalized) {
Ok(resp) => {
if resp.is_error() {
tracing::warn!(
stderr = %resp.stderr.trim(),
"dev_exec_python: device-side exception (returning Ok with is_error:true)"
);
let _ = port.write_all(&[0x03]);
let _ = port.flush();
std::thread::sleep(std::time::Duration::from_millis(10));
}
Ok(json!({
"stdout": resp.stdout,
"stderr": resp.stderr,
"is_error": resp.is_error()
}))
}
Err(e) => {
let _ = port.write_all(&[0x03]);
let _ = port.flush();
Err(McpError::Protocol(format!("dev_exec_python: {e}")))
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::io::{self, Read, Write};

    /// In-memory stand-in for the device port.
    ///
    /// Reads are served from a pre-scripted byte queue; every write is
    /// appended to `write_data` so tests can inspect what was sent.
    struct MockPort {
        read_data: VecDeque<u8>,
        pub write_data: Vec<u8>,
    }

    impl MockPort {
        /// Creates a port whose read side yields the given byte slices,
        /// concatenated in order.
        fn with_responses(responses: &[&[u8]]) -> Self {
            let mut buf = Vec::new();
            for r in responses {
                buf.extend_from_slice(r);
            }
            MockPort {
                read_data: VecDeque::from(buf),
                write_data: Vec::new(),
            }
        }

        /// Frame with no output: `OK`, two 0x04 delimiters, then `>`.
        fn ok_frame() -> Vec<u8> {
            b"OK\x04\x04>".to_vec()
        }

        /// Frame carrying `line` plus a newline before the first 0x04
        /// delimiter (the stdout position, per the happy-path test).
        fn ok_with_stdout(line: &str) -> Vec<u8> {
            let mut v = b"OK".to_vec();
            v.extend_from_slice(line.as_bytes());
            v.push(b'\n');
            v.extend_from_slice(b"\x04\x04>");
            v
        }

        /// Frame carrying `msg` plus a newline between the two 0x04
        /// delimiters (the stderr position, per the exception test).
        fn error_frame(msg: &str) -> Vec<u8> {
            let mut v = b"OK\x04".to_vec();
            v.extend_from_slice(msg.as_bytes());
            v.push(b'\n');
            v.push(b'\x04');
            v.push(b'>');
            v
        }
    }

    impl Read for MockPort {
        /// Copies as many scripted bytes as fit into `buf`; fails with
        /// `UnexpectedEof` once the script is exhausted (or `buf` is empty).
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            let n = buf.len().min(self.read_data.len());
            if n == 0 {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "MockPort: no more scripted bytes",
                ));
            }
            for (dst, src) in buf[..n].iter_mut().zip(self.read_data.drain(..n)) {
                *dst = src;
            }
            Ok(n)
        }
    }

    impl Write for MockPort {
        /// Records the bytes and reports them all as written.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.write_data.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn descriptor_has_correct_name() {
        let d = dev_exec_python_descriptor();
        assert_eq!(d.name, "dev_exec_python");
    }

    #[test]
    fn descriptor_requires_code_field() {
        let d = dev_exec_python_descriptor();
        let required = d.input_schema.get("required").unwrap();
        let arr = required.as_array().unwrap();
        assert!(
            arr.iter().any(|v| v.as_str() == Some("code")),
            "dev_exec_python must require 'code'"
        );
    }

    #[test]
    fn descriptor_has_additional_properties_false() {
        let d = dev_exec_python_descriptor();
        assert_eq!(
            d.input_schema.get("additionalProperties"),
            Some(&Value::Bool(false))
        );
    }

    #[test]
    fn descriptor_has_generous_timeout() {
        let d = dev_exec_python_descriptor();
        assert!(
            d.timeout_ms >= 5_000,
            "dev_exec_python needs generous timeout"
        );
    }

    #[test]
    fn descriptors_returns_one_tool() {
        assert_eq!(descriptors().len(), 1);
    }

    #[test]
    fn exec_python_happy_path_returns_stdout() {
        let frame = MockPort::ok_with_stdout("42");
        let mut port = MockPort::with_responses(&[&frame]);
        let args = json!({"code": "print(6*7)"});
        let result = handle_dev_exec_python(&mut port, &args).unwrap();
        assert!(result["stdout"].as_str().unwrap().contains("42"));
        assert_eq!(result["is_error"], false);
    }

    #[test]
    fn exec_python_device_exception_returns_ok_with_is_error_true() {
        let err = MockPort::error_frame("ZeroDivisionError: division by zero");
        let mut port = MockPort::with_responses(&[&err]);
        let args = json!({"code": "print(1/0)"});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(
            result.is_ok(),
            "device exception must return Ok(…), not Err — got: {result:?}"
        );
        let val = result.unwrap();
        assert_eq!(
            val["is_error"], true,
            "is_error must be true on device exception"
        );
        assert!(
            val["stderr"]
                .as_str()
                .unwrap_or("")
                .contains("ZeroDivisionError"),
            "stderr must contain the traceback text"
        );
        assert!(
            port.write_data.contains(&0x03),
            "Ctrl-C must be sent after device exception"
        );
    }

    #[test]
    fn exec_python_missing_code_returns_error() {
        let mut port = MockPort::with_responses(&[]);
        let args = json!({});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(result.is_err());
        match result.unwrap_err() {
            McpError::Protocol(msg) => {
                assert!(msg.contains("code"), "error must mention 'code' arg");
            }
            other => panic!("expected McpError::Protocol, got: {other:?}"),
        }
    }

    #[test]
    fn exec_python_non_string_code_returns_error() {
        // A present-but-non-string `code` takes the same path as a missing one.
        let mut port = MockPort::with_responses(&[]);
        let args = json!({"code": 42});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(result.is_err(), "non-string 'code' must be rejected");
        assert!(
            port.write_data.is_empty(),
            "nothing may be written to the port when validation fails"
        );
    }

    #[test]
    fn exec_python_empty_code_returns_error() {
        let mut port = MockPort::with_responses(&[]);
        let args = json!({"code": ""});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(result.is_err());
    }

    #[test]
    fn exec_python_triple_single_quotes_returns_error() {
        let mut port = MockPort::with_responses(&[]);
        let args = json!({"code": "x = '''hello'''"});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(result.is_err());
        match result.unwrap_err() {
            McpError::Protocol(msg) => {
                assert!(
                    msg.contains("triple-single-quotes"),
                    "error must mention triple-single-quotes; got: {msg}"
                );
            }
            other => panic!("expected McpError::Protocol, got: {other:?}"),
        }
    }

    #[test]
    fn exec_python_triple_double_quotes_returns_error() {
        let mut port = MockPort::with_responses(&[]);
        let args = json!({"code": "x = \"\"\"hello\"\"\""});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(result.is_err());
        match result.unwrap_err() {
            McpError::Protocol(msg) => {
                assert!(
                    msg.contains("triple-double-quotes"),
                    "error must mention triple-double-quotes; got: {msg}"
                );
            }
            other => panic!("expected McpError::Protocol, got: {other:?}"),
        }
    }

    #[test]
    fn exec_python_transport_error_returns_err_not_ok() {
        // No scripted bytes: the first read fails, which must surface as Err.
        let mut port = MockPort::with_responses(&[]);
        let args = json!({"code": "print('hello')"});
        let result = handle_dev_exec_python(&mut port, &args);
        assert!(result.is_err(), "transport error must return Err");
    }

    #[test]
    fn exec_python_normalizes_crlf_before_sending() {
        let frame = MockPort::ok_frame();
        let mut port = MockPort::with_responses(&[&frame]);
        let args = json!({"code": "x = 1\r\ny = 2\r\nprint(x+y)"});
        // Only the bytes on the wire matter here, not the parsed result.
        let _ = handle_dev_exec_python(&mut port, &args);
        let written = String::from_utf8_lossy(&port.write_data);
        assert!(
            !written.contains("\r\n"),
            "CRLF must be normalized to LF before sending to device"
        );
    }

    #[test]
    fn exec_python_transport_error_sends_ctrl_c_and_prefixes_message() {
        let mut port = MockPort::with_responses(&[]);
        let args = json!({"code": "print('hello')"});
        let result = handle_dev_exec_python(&mut port, &args);
        match result {
            Err(McpError::Protocol(msg)) => {
                assert!(
                    msg.starts_with("dev_exec_python: "),
                    "transport error must carry the tool prefix; got: {msg}"
                );
            }
            other => panic!("expected Err(McpError::Protocol), got: {other:?}"),
        }
        assert!(
            port.write_data.contains(&0x03),
            "Ctrl-C must be sent after a transport error"
        );
    }
}