use std::io::ErrorKind;
use crate::base::McpError;
use crate::repl;
use crate::repl::ReplError;
/// Source text of the firmware probe script, embedded at compile time from
/// `../python/firmware_probe.py` (resolved relative to this source file).
pub const FIRMWARE_PROBE_SCRIPT: &str = include_str!("../python/firmware_probe.py");
/// Serial read timeout, in milliseconds, applied while the probe runs.
/// Defined as an alias of the library file-operation timeout so the two stay in step.
pub const PROBE_TIMEOUT_MS: u64 = crate::library::LIBRARY_FILE_OP_TIMEOUT_MS;
/// Outcome of a successful [`run_probe`] call: the probe script's stdout plus
/// the values [`parse_probe_output`] extracted from it.
#[derive(Debug)]
pub struct ProbeResult {
/// Stdout of the probe script exactly as returned by the device.
pub raw_output: String,
/// `true` when the output contained a `Probe complete` line, i.e. the
/// script ran through to its final marker.
pub probe_complete: bool,
/// `true` when the output reported `jumperless.force_service present: True`.
pub force_service_bound: bool,
/// `true` when the output reported `jumperless.jOS present: True`.
pub jos_present: bool,
/// Contents of the single-quoted `VERSION file:` value, or `None` when no
/// non-empty quoted value was reported.
pub library_version: Option<String>,
}
pub fn run_probe(port: &mut dyn serialport::SerialPort) -> Result<ProbeResult, McpError> {
port.set_timeout(std::time::Duration::from_millis(PROBE_TIMEOUT_MS))
.map_err(|e| {
McpError::Connection(format!(
"failed to set probe read timeout to {PROBE_TIMEOUT_MS}ms: {e}; \
device may be left in Raw REPL — power-cycle or run --smoke-test"
))
})?;
let normalized_script = FIRMWARE_PROBE_SCRIPT
.replace("\r\n", "\n")
.replace('\r', "\n");
let exec_result = repl::exec_code(port, &normalized_script);
let resp = match exec_result {
Ok(r) => r,
Err(e) => {
tracing::warn!(err = %e, "firmware probe exec failed; sending Ctrl-C abort");
let _ = port.write_all(&[0x03]);
let _ = port.flush();
std::thread::sleep(std::time::Duration::from_millis(10));
let is_timeout = matches!(
&e,
ReplError::Io(io_err)
if io_err.kind() == ErrorKind::TimedOut
|| io_err.kind() == ErrorKind::WouldBlock
);
if is_timeout {
return Err(McpError::Connection(
"firmware probe timed out after 30s — device may be unresponsive; \
power-cycle or try `jumperless-mcp --smoke-test`"
.into(),
));
}
return Err(McpError::Connection(format!(
"firmware probe exec failed: {e}"
)));
}
};
let _ = port.set_timeout(std::time::Duration::from_millis(
crate::PORT_OPEN_TIMEOUT_MS,
));
if resp.is_error() {
return Err(McpError::Protocol(format!(
"firmware probe raised a Python exception on device:\n{}\n{}",
resp.stderr.trim(),
resp.stdout.trim(),
)));
}
let raw_output = resp.stdout.clone();
let parsed = parse_probe_output(&raw_output);
Ok(ProbeResult {
raw_output,
probe_complete: parsed.probe_complete,
force_service_bound: parsed.force_service_bound,
jos_present: parsed.jos_present,
library_version: parsed.library_version,
})
}
/// Values recognised in the probe script's stdout by [`parse_probe_output`].
///
/// Every field starts at its `Default` (`false` / `None`) and is only ever
/// switched on by a matching line.
#[derive(Debug, Default)]
pub struct ParsedProbeOutput {
    /// A `Probe complete` line was seen.
    pub probe_complete: bool,
    /// A `jumperless.force_service present: True` line was seen.
    pub force_service_bound: bool,
    /// A `jumperless.jOS present: True` line was seen.
    pub jos_present: bool,
    /// Text between the single quotes of a `VERSION file: '...'` line.
    pub library_version: Option<String>,
}

/// Scans probe stdout line by line and collects the markers it recognises.
///
/// Each line is trimmed before comparison, so indentation in the report does
/// not matter. Marker lines must match exactly. A `VERSION file: ` line only
/// counts when its value is wrapped in single quotes and non-empty; when
/// several such lines occur, the last one wins. Unrecognised lines are ignored.
pub fn parse_probe_output(output: &str) -> ParsedProbeOutput {
    let mut parsed = ParsedProbeOutput::default();

    for line in output.lines().map(str::trim) {
        match line {
            "Probe complete" => parsed.probe_complete = true,
            "jumperless.force_service present: True" => parsed.force_service_bound = true,
            "jumperless.jOS present: True" => parsed.jos_present = true,
            other => {
                // Peel off the label and both quotes; any step failing means
                // this is not a quoted version line.
                let quoted_version = other
                    .strip_prefix("VERSION file: ")
                    .and_then(|value| value.strip_prefix('\''))
                    .and_then(|value| value.strip_suffix('\''))
                    .filter(|inner| !inner.is_empty());
                if let Some(inner) = quoted_version {
                    parsed.library_version = Some(inner.to_string());
                }
            }
        }
    }

    parsed
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a complete report in which `force_service`, `jOS` and a quoted
    /// library VERSION are all present.
    const SAMPLE_JOS_WITH_LIB: &str = "\
        ============================================================\n\
        Jumperless V5 firmware probe\n\
        ============================================================\n\
        \n\
        [1] System identity\n\
        sys.platform: 'rp2'\n\
        sys.implementation: namespace(name='micropython', version=(1, 24, 0), _machine='RP2350', _mpy=132)\n\
        sys.version: '3.4.0; MicroPython v1.24.0 on 2024-10-25'\n\
        os.uname: (sysname='rp2', nodename='rp2', release='1.24.0', version='v1.24.0 on 2024-10-25', machine='RP2350 with RP2350')\n\
        \n\
        [2] Gating check: cooperative-yield primitive\n\
        jumperless.force_service present: True\n\
        -> Verdict: likely JumperlOS firmware\n\
        \n\
        [3] API surface fingerprint\n\
        jumperless.force_service: True\n\
        jumperless.jOS: True\n\
        jumperless.connect: True\n\
        \n\
        [5] jOS namespace (JumperlOS marker)\n\
        jumperless.jOS present: True\n\
        jOS attr count: 3\n\
        jOS.task_count\n\
        jOS.yield_now\n\
        jOS.schedule\n\
        \n\
        [8] Resident jumperless_mcp library\n\
        VERSION file: '0.2.0+20260510'\n\
        \n\
        ============================================================\n\
        Probe complete\n\
        ============================================================\n\
        ";

    /// Fixture: a complete report in which both markers read `False` and the
    /// library VERSION is reported as not installed.
    const SAMPLE_STABLE_NO_LIB: &str = "\
        ============================================================\n\
        Jumperless V5 firmware probe\n\
        ============================================================\n\
        \n\
        [2] Gating check: cooperative-yield primitive\n\
        jumperless.force_service present: False\n\
        -> Verdict: likely RP23V50firmware (stable)\n\
        \n\
        [5] jOS namespace (JumperlOS marker)\n\
        jumperless.jOS present: False\n\
        \n\
        [8] Resident jumperless_mcp library\n\
        VERSION file: <not installed>\n\
        \n\
        Probe complete\n\
        ";

    /// A full JumperlOS report populates every field.
    #[test]
    fn parse_jos_firmware_with_library() {
        let ParsedProbeOutput {
            probe_complete,
            force_service_bound,
            jos_present,
            library_version,
        } = parse_probe_output(SAMPLE_JOS_WITH_LIB);

        assert!(
            probe_complete,
            "probe_complete should be true when sentinel is present"
        );
        assert!(
            force_service_bound,
            "force_service_bound should be true for JumperlOS output"
        );
        assert!(
            jos_present,
            "jos_present should be true for JumperlOS output"
        );
        assert_eq!(
            library_version.as_deref(),
            Some("0.2.0+20260510"),
            "library_version should parse the quoted VERSION value"
        );
    }

    /// A full stable-firmware report sets only the completion flag.
    #[test]
    fn parse_stable_firmware_no_library() {
        let ParsedProbeOutput {
            probe_complete,
            force_service_bound,
            jos_present,
            library_version,
        } = parse_probe_output(SAMPLE_STABLE_NO_LIB);

        assert!(
            probe_complete,
            "probe_complete should be true when sentinel is present"
        );
        assert!(
            !force_service_bound,
            "force_service_bound should be false for stable firmware"
        );
        assert!(
            !jos_present,
            "jos_present should be false for stable firmware"
        );
        assert_eq!(
            library_version, None,
            "library_version should be None when VERSION file is <not installed>"
        );
    }

    /// Empty input yields the all-default result.
    #[test]
    fn parse_empty_output_is_all_false() {
        let parsed = parse_probe_output("");

        assert!(!parsed.probe_complete);
        assert!(!parsed.force_service_bound);
        assert!(!parsed.jos_present);
        assert_eq!(parsed.library_version, None);
    }

    /// Output cut off before the final marker still yields the fields seen so
    /// far, but must not be reported as complete.
    #[test]
    fn parse_truncated_output_probe_complete_false() {
        let ParsedProbeOutput {
            probe_complete,
            force_service_bound,
            jos_present,
            library_version,
        } = parse_probe_output(
            "\
            ============================================================\n\
            Jumperless V5 firmware probe\n\
            ============================================================\n\
            \n\
            [2] Gating check: cooperative-yield primitive\n\
            jumperless.force_service present: True\n\
            -> Verdict: likely JumperlOS firmware\n\
            \n\
            [5] jOS namespace (JumperlOS marker)\n\
            jumperless.jOS present: True\n\
            \n\
            [8] Resident jumperless_mcp library\n\
            VERSION file: '0.2.0+20260510'\n\
            ",
        );

        assert!(
            !probe_complete,
            "probe_complete must be false when sentinel is absent"
        );
        assert!(
            force_service_bound,
            "force_service_bound still parsed from partial output"
        );
        assert!(
            jos_present,
            "jos_present still parsed from partial output"
        );
        assert_eq!(
            library_version.as_deref(),
            Some("0.2.0+20260510"),
            "library_version still parsed from partial output"
        );
    }

    /// Only the literal `True` form of the force_service line counts.
    #[test]
    fn parse_force_service_false_line_does_not_set_flag() {
        let parsed = parse_probe_output(" jumperless.force_service present: False\n");
        assert!(!parsed.force_service_bound);
    }

    /// Only the literal `True` form of the jOS line counts.
    #[test]
    fn parse_jos_false_line_does_not_set_flag() {
        let parsed = parse_probe_output(" jumperless.jOS present: False\n");
        assert!(!parsed.jos_present);
    }

    /// A VERSION value that is not single-quoted is ignored.
    #[test]
    fn parse_version_unquoted_not_extracted() {
        let version = parse_probe_output(" VERSION file: <not installed>\n").library_version;
        assert_eq!(version, None);
    }

    /// Plain, build-metadata and pre-release version strings all round-trip.
    #[test]
    fn parse_version_various_semver_strings() {
        for (output, expected) in [
            (" VERSION file: '0.1.0'\n", "0.1.0"),
            (" VERSION file: '0.2.0+20260510'\n", "0.2.0+20260510"),
            (" VERSION file: '1.0.0-beta.1'\n", "1.0.0-beta.1"),
        ] {
            let version = parse_probe_output(output).library_version;
            assert_eq!(
                version.as_deref(),
                Some(expected),
                "failed to parse version from: {output:?}"
            );
        }
    }

    /// Guards against the embedded script being empty or the wrong file.
    #[test]
    fn probe_script_is_non_empty() {
        assert!(
            !FIRMWARE_PROBE_SCRIPT.is_empty(),
            "FIRMWARE_PROBE_SCRIPT must not be empty"
        );
        assert!(
            FIRMWARE_PROBE_SCRIPT.contains("Jumperless V5 firmware probe"),
            "FIRMWARE_PROBE_SCRIPT must contain probe header"
        );
    }
}