#![allow(dead_code, unused_imports)]
mod base;
mod ceremony;
mod library;
mod probe;
mod repl;
mod tools;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use crate::base::{
discover_composite_port_by_index, CommonCli, ConnectArgs, HealthLevel, HealthStatus, McpError,
RingId, Subsystem, ToolDescriptor, VidPid,
};
use async_trait::async_trait;
use clap::{Parser, Subcommand};
use serde_json::Value;
/// USB vendor ID used together with `JL_PID` to auto-discover the device.
const JL_VID: u16 = 0x1d50;
/// USB product ID used together with `JL_VID` to auto-discover the device.
const JL_PID: u16 = 0xacab;
/// Index handed to `discover_composite_port_by_index` to pick the Raw REPL port
/// among the serial ports of the composite USB device.
const JL_RAW_REPL_PORT_INDEX: usize = 2;
/// Baud rate passed to `serialport::new` when opening the port.
const USB_CDC_BAUD: u32 = 115_200;
/// Short serial timeout (ms): set when the port is opened and restored after
/// every operation that temporarily raises the timeout.
const PORT_OPEN_TIMEOUT_MS: u64 = 500;
/// Longer serial timeout (ms) applied while the connect/disconnect ceremony scripts run.
const CEREMONY_READ_TIMEOUT_MS: u64 = 10_000;
/// Delay (ms) awaited after asserting DTR/RTS before any further I/O on the port.
const DTR_SETTLE_MS: u64 = 100;
/// Subsystem state for one Jumperless device reached over a serial port.
struct Jumperless {
/// Open serial handle, or `None` while disconnected. Operations `take()` the
/// handle, use it on a blocking thread, and store it back when they finish.
port: Mutex<Option<Box<dyn serialport::SerialPort + Send>>>,
/// Name of the connected port (used in log output); cleared by `disconnect`.
port_name: Mutex<Option<String>>,
/// When true, `connect` opens the port but never enters raw REPL mode, and
/// `health_check` reports `Degraded`.
skip_handshake: bool,
/// When true, the connect/disconnect ceremony scripts are not executed.
no_ceremony: bool,
/// Written by `connect` after the library install step and cleared by
/// `disconnect`; `invoke` rejects every tool call while this is false.
library_installed: AtomicBool,
/// When true, `connect` skips the `library::install_if_needed` call.
skip_auto_install: bool,
}
impl Jumperless {
    /// Creates a disconnected subsystem for regular use.
    ///
    /// The library auto-install step stays enabled (`skip_auto_install == false`)
    /// and the library is considered not installed until `connect` says otherwise.
    fn new(skip_handshake: bool, no_ceremony: bool) -> Self {
        let port = Mutex::new(None);
        let port_name = Mutex::new(None);
        let library_installed = AtomicBool::new(false);
        Self {
            port,
            port_name,
            skip_handshake,
            no_ceremony,
            library_installed,
            skip_auto_install: false,
        }
    }

    /// Creates a disconnected subsystem for the one-shot CLI subcommands.
    ///
    /// Identical to [`Jumperless::new`] with the ceremony disabled, except that
    /// the library auto-install during `connect` is skipped as well.
    fn new_for_library_cli(skip_handshake: bool) -> Self {
        let mut subsystem = Self::new(skip_handshake, true);
        subsystem.skip_auto_install = true;
        subsystem
    }
}
#[async_trait]
impl Subsystem for Jumperless {
/// Returns the fixed subsystem identifier, `"jumperless"`.
fn name(&self) -> &str {
"jumperless"
}
/// Collects the tool descriptors of every tool module into one list.
///
/// The order is fixed: overlay, slot, context, core, dev, state, connections,
/// analog, digital, oled, power, probe, wavegen.
fn tools(&self) -> Vec<ToolDescriptor> {
    tools::overlay::descriptors()
        .into_iter()
        .chain(tools::slot::descriptors())
        .chain(tools::context::descriptors())
        .chain(tools::core::descriptors())
        .chain(tools::dev::descriptors())
        .chain(tools::state::descriptors())
        .chain(tools::connections::descriptors())
        .chain(tools::analog::descriptors())
        .chain(tools::digital::descriptors())
        .chain(tools::oled::descriptors())
        .chain(tools::power::descriptors())
        .chain(tools::probe::descriptors())
        .chain(tools::wavegen::descriptors())
        .collect()
}
async fn connect(&mut self, args: &ConnectArgs) -> Result<(), McpError> {
if self.port.lock().unwrap().is_some() {
tracing::debug!("connect() called on already-connected Jumperless (no-op)");
return Ok(());
}
let port_name = if let Some(explicit) = &args.port_override {
tracing::info!(port = %explicit, "using explicit --port override");
explicit.clone()
} else {
let info = tokio::task::spawn_blocking(|| {
discover_composite_port_by_index(VidPid(JL_VID, JL_PID), JL_RAW_REPL_PORT_INDEX)
})
.await
.map_err(|e| {
McpError::Connection(format!("spawn_blocking panicked during discovery: {e}"))
})??;
tracing::info!(
port = %info.port_name,
vid = format!("{:#06x}", JL_VID),
pid = format!("{:#06x}", JL_PID),
port_index = JL_RAW_REPL_PORT_INDEX,
"auto-discovered Jumperless Raw REPL (MI_04 / JLV5port5)"
);
info.port_name
};
let port_name_clone = port_name.clone();
let handle = tokio::task::spawn_blocking(move || {
serialport::new(&port_name_clone, USB_CDC_BAUD)
.timeout(Duration::from_millis(PORT_OPEN_TIMEOUT_MS))
.open()
})
.await
.map_err(|e| {
McpError::Connection(format!("spawn_blocking panicked during port open: {e}"))
})?
.map_err(|e| {
McpError::Connection(format!("failed to open serial port '{}': {}", port_name, e))
})?;
tracing::info!(port = %port_name, "serial port opened successfully");
let port_name_for_dtr = port_name.clone();
let handle = tokio::task::spawn_blocking(move || {
let mut h = handle;
h.write_data_terminal_ready(true)?;
h.write_request_to_send(true)?;
Ok::<_, serialport::Error>(h)
})
.await
.map_err(|join_err| {
tracing::error!(
port = %port_name_for_dtr,
err = %join_err,
"spawn_blocking panicked during DTR/RTS assert; port handle dropped, reconnect required"
);
McpError::Connection(format!("spawn_blocking panicked during DTR/RTS assert: {join_err}"))
})?
.map_err(|e| {
McpError::Connection(format!("failed to assert DTR/RTS on '{}': {}", port_name, e))
})?;
tokio::time::sleep(std::time::Duration::from_millis(DTR_SETTLE_MS)).await;
tracing::info!(port = %port_name, "DTR/RTS asserted; line-state settled");
if self.skip_handshake {
tracing::warn!(
port = %port_name,
"--skip-handshake set: raw mode NOT entered; health_check will return Degraded"
);
*self.port.lock().unwrap() = Some(handle);
} else {
let port_name_for_err = port_name.clone();
let blocking_result = tokio::task::spawn_blocking(move || {
let mut h = handle;
let result = repl::enter_raw_mode(&mut *h);
(h, result)
})
.await
.map_err(|join_err| {
tracing::error!(
port = %port_name_for_err,
err = %join_err,
"spawn_blocking panicked during enter_raw_mode; port handle dropped, reconnect required"
);
McpError::Connection(format!(
"spawn_blocking panicked during enter_raw_mode: {join_err}"
))
})?;
let (handle, enter_result) = blocking_result;
enter_result.map_err(|e| {
McpError::Connection(format!(
"Raw REPL handshake failed on '{}': {}",
port_name, e
))
})?;
tracing::info!(port = %port_name, "Raw REPL raw mode entered");
let port_name_for_lib = port_name.clone();
let port_name_for_lib_post = port_name_for_lib.clone();
let skip_auto = self.skip_auto_install;
let lib_spawn = tokio::task::spawn_blocking(move || {
let mut h = handle;
if skip_auto {
return (h, Ok(false));
}
if let Err(e) =
h.set_timeout(Duration::from_millis(library::LIBRARY_FILE_OP_TIMEOUT_MS))
{
tracing::warn!(
port = %port_name_for_lib,
error = %e,
"failed to set timeout before library install; install may fail"
);
}
let result = library::install_if_needed(&mut *h);
if let Err(e) = h.set_timeout(Duration::from_millis(PORT_OPEN_TIMEOUT_MS)) {
tracing::warn!(
port = %port_name_for_lib,
error = %e,
"failed to restore timeout after library install"
);
}
(h, result)
})
.await;
let (handle, lib_ok) = match lib_spawn {
Ok(pair) => pair,
Err(join_err) => {
tracing::error!(
port = %port_name_for_lib_post,
err = %join_err,
"library install spawn_blocking panicked (handle lost, reconnect required)"
);
return Err(McpError::Connection(format!(
"library install spawn_blocking panicked: {join_err}"
)));
}
};
let ceremony_enabled = match lib_ok {
Ok(true) => {
tracing::info!(
port = %port_name_for_lib_post,
version = library::LIBRARY_VERSION,
"installed jumperless_mcp library"
);
self.library_installed.store(true, Ordering::Relaxed);
!self.no_ceremony
}
Ok(false) => {
tracing::debug!(
port = %port_name_for_lib_post,
version = library::LIBRARY_VERSION,
"jumperless_mcp library already up-to-date"
);
self.library_installed.store(true, Ordering::Relaxed);
!self.no_ceremony
}
Err(e) => {
tracing::warn!(
port = %port_name_for_lib_post,
error = %e,
"library install failed; ceremony skipped (connection proceeds). \
Tool calls that depend on the library will fail until \
library_install is run successfully."
);
self.library_installed.store(false, Ordering::Relaxed);
false }
};
if ceremony_enabled {
let port_name_for_ceremony = port_name.clone();
let ceremony_result = tokio::task::spawn_blocking(move || {
let mut h = handle;
h.set_timeout(Duration::from_millis(CEREMONY_READ_TIMEOUT_MS))
.map_err(|e| {
McpError::Connection(format!(
"failed to set ceremony read timeout to {}ms: {}",
CEREMONY_READ_TIMEOUT_MS, e
))
})?;
let result = repl::exec_code(&mut *h, ceremony::CEREMONY_CONNECT_SCRIPT);
if let Err(e) = h.set_timeout(Duration::from_millis(PORT_OPEN_TIMEOUT_MS)) {
tracing::error!(
error = %e,
"failed to restore short read timeout after connect ceremony; \
subsequent fast operations may incur longer waits"
);
}
Ok::<_, McpError>((h, result))
})
.await;
match ceremony_result {
Ok(Ok((h, Ok(resp)))) => {
if resp.is_error() {
tracing::warn!(
port = %port_name_for_ceremony,
stderr = %resp.stderr,
"connect ceremony script raised a Python exception (device-side); connection proceeds"
);
} else {
if !resp.stdout.trim().is_empty() {
tracing::info!(
port = %port_name_for_ceremony,
stdout = %resp.stdout.trim(),
"connect ceremony completed (with stdout)"
);
} else {
tracing::info!(port = %port_name_for_ceremony, "connect ceremony completed");
}
}
*self.port.lock().unwrap() = Some(h);
}
Ok(Ok((mut h, Err(e)))) => {
tracing::warn!(
port = %port_name_for_ceremony,
err = %e,
"connect ceremony failed (best-effort — connection proceeds; aborting device-side script + draining buffer)"
);
if let Err(drain_err) = repl::send_ctrl_c(&mut *h) {
tracing::warn!(error = %drain_err, "failed to send Ctrl-C abort after connect ceremony timeout");
}
let drained1 = repl::drain_read_buffer(&mut *h).unwrap_or(0);
std::thread::sleep(Duration::from_millis(10));
let drained2 = repl::drain_read_buffer(&mut *h).unwrap_or(0);
if drained1 + drained2 > 0 {
tracing::debug!(
drained_bytes = drained1 + drained2,
"drained leftover bytes after connect ceremony abort"
);
}
*self.port.lock().unwrap() = Some(h);
}
Ok(Err(e)) => {
tracing::error!(
port = %port_name_for_ceremony,
err = %e,
"connect ceremony set_timeout failed (hard failure)"
);
return Err(e);
}
Err(join_err) => {
tracing::error!(
port = %port_name_for_ceremony,
err = %join_err,
"connect ceremony spawn_blocking panicked (handle lost, reconnect required)"
);
return Err(McpError::Connection(format!(
"connect ceremony spawn_blocking panicked (handle lost): {join_err}"
)));
}
}
} else {
*self.port.lock().unwrap() = Some(handle);
}
}
*self.port_name.lock().unwrap() = Some(port_name);
Ok(())
}
async fn disconnect(&mut self) -> Result<(), McpError> {
let was_lib_installed = self.library_installed.load(Ordering::Relaxed);
let name = self.port_name.lock().unwrap().take();
match name.as_deref() {
Some(n) => tracing::info!(port = %n, "disconnecting Jumperless"),
None => tracing::debug!("disconnect called on already-disconnected Jumperless (no-op)"),
}
if !self.skip_handshake {
let maybe_handle = { self.port.lock().unwrap().take() };
if let Some(handle) = maybe_handle {
let port_name_snapshot = name.as_deref().unwrap_or("<unknown>").to_owned();
let no_ceremony = self.no_ceremony;
let lib_installed = was_lib_installed;
let port_name_for_dc = port_name_snapshot.clone();
let lib_root = library::LIBRARY_ROOT;
let dc_result = tokio::task::spawn_blocking(move || {
let mut h = handle;
if !no_ceremony && lib_installed {
if let Err(e) = h.set_timeout(Duration::from_millis(CEREMONY_READ_TIMEOUT_MS)) {
tracing::error!(
port = %port_name_for_dc,
error = %e,
"failed to set ceremony read timeout before disconnect ceremony; \
ceremony may time out"
);
}
match repl::exec_code(&mut *h, ceremony::CEREMONY_DISCONNECT_SCRIPT) {
Ok(resp) => {
if resp.is_error() {
if resp.stderr.contains(lib_root) {
tracing::error!(
port = %port_name_for_dc,
stderr = %resp.stderr,
"disconnect ceremony failed because library files are \
missing or corrupt — run `jumperless-mcp library install \
--force` to reinstall"
);
} else {
tracing::warn!(
port = %port_name_for_dc,
stderr = %resp.stderr,
"disconnect ceremony script raised a Python exception (device-side)"
);
}
} else {
tracing::info!(
port = %port_name_for_dc,
"disconnect ceremony completed"
);
}
}
Err(e) => {
tracing::warn!(
port = %port_name_for_dc,
err = %e,
"disconnect ceremony failed (best-effort — disconnect proceeds; aborting device-side script + draining buffer)"
);
if let Err(drain_err) = repl::send_ctrl_c(&mut *h) {
tracing::warn!(error = %drain_err, "failed to send Ctrl-C abort after disconnect ceremony timeout");
}
let drained1 = repl::drain_read_buffer(&mut *h).unwrap_or(0);
std::thread::sleep(Duration::from_millis(10));
let drained2 = repl::drain_read_buffer(&mut *h).unwrap_or(0);
if drained1 + drained2 > 0 {
tracing::debug!(
drained_bytes = drained1 + drained2,
"drained leftover bytes after disconnect ceremony abort"
);
}
}
}
if let Err(e) = h.set_timeout(Duration::from_millis(PORT_OPEN_TIMEOUT_MS)) {
tracing::error!(
error = %e,
"failed to restore short read timeout after disconnect ceremony; \
subsequent fast operations may incur longer waits"
);
}
} else if !no_ceremony && !lib_installed {
tracing::debug!(
port = %port_name_for_dc,
"skipping disconnect ceremony — library not installed"
);
}
match repl::exit_raw_mode(&mut *h) {
Ok(_) => tracing::debug!(port = %port_name_for_dc, "exit_raw_mode completed"),
Err(e) => tracing::warn!(
port = %port_name_for_dc,
err = %e,
"exit_raw_mode failed (best-effort — disconnect proceeds)"
),
}
})
.await;
if let Err(join_err) = dc_result {
tracing::error!(
port = %port_name_snapshot,
err = %join_err,
"disconnect spawn_blocking panicked (handle lost; port dropped)"
);
return Ok(());
}
}
} else {
self.port.lock().unwrap().take();
}
self.library_installed.store(false, Ordering::Relaxed);
Ok(())
}
/// Reports the subsystem health by pinging the Raw REPL.
///
/// * `Degraded` when `skip_handshake` is set (no ping is attempted).
/// * `Unhealthy` when no port is stored or the ping fails.
/// * `Healthy` when the ping succeeds; only then is `last_seen_unix_ms` non-zero.
///
/// The port handle is taken out of `self.port` for the duration of the ping
/// and stored back afterwards; `latency_ms` is the wall-clock time of the ping.
///
/// # Errors
/// Returns `McpError::Connection` if the blocking ping task panics; the port
/// handle is lost in that case.
async fn health_check(&self) -> Result<HealthStatus, McpError> {
    let (level, last_seen_unix_ms, latency_ms) = if self.skip_handshake {
        (
            HealthLevel::Degraded {
                reason: "raw mode not entered (--skip-handshake set)".into(),
            },
            0,
            None,
        )
    } else {
        // Bind the taken handle first so the mutex guard is released before any await.
        let taken = self.port.lock().unwrap().take();
        match taken {
            None => (
                HealthLevel::Unhealthy {
                    reason: "subsystem not connected".into(),
                },
                0,
                None,
            ),
            Some(port) => {
                let joined = tokio::task::spawn_blocking(
                    move || -> (Box<dyn serialport::SerialPort + Send>, Result<(), repl::ReplError>, u64) {
                        let mut port = port;
                        let started = std::time::Instant::now();
                        let outcome = repl::ping(&mut *port);
                        let elapsed_ms = started.elapsed().as_millis() as u64;
                        (port, outcome, elapsed_ms)
                    },
                )
                .await;
                let (port, outcome, elapsed_ms) = joined.map_err(|join_err| {
                    McpError::Connection(format!(
                        "health_check spawn_blocking panicked (handle lost; reconnect required): {join_err}"
                    ))
                })?;
                *self.port.lock().unwrap() = Some(port);
                match outcome {
                    Ok(()) => (
                        HealthLevel::Healthy,
                        HealthStatus::now_unix_ms(),
                        Some(elapsed_ms),
                    ),
                    Err(e) => (
                        HealthLevel::Unhealthy {
                            reason: format!("Raw REPL ping failed: {e}"),
                        },
                        0,
                        Some(elapsed_ms),
                    ),
                }
            }
        }
    };
    Ok(HealthStatus {
        level,
        last_seen_unix_ms,
        latency_ms,
        version: self.version().to_string(),
        ring: self.carabiner_ring(),
        subsystem_specific: serde_json::Value::Null,
    })
}
/// Shuts the subsystem down by delegating to `disconnect`.
async fn shutdown(&mut self) -> Result<(), McpError> {
self.disconnect().await
}
/// Dispatches one tool call to its handler on a blocking thread.
///
/// Checks are made in this order: the library must be marked installed, then
/// a port must be connected. The port handle is taken out of `self.port`
/// while the handler runs and stored back afterwards.
///
/// # Errors
/// * `McpError::Protocol` when the library is not installed or `tool_name`
///   has no handler. For `library_*` names the message points at the matching
///   `jumperless-mcp library <subcommand>` CLI form.
/// * `McpError::Connection` when no port is connected or the blocking task
///   panicked (the handle is lost in that case).
/// * Whatever error the selected handler returns.
async fn invoke(&self, tool_name: &str, args: Value) -> Result<Value, McpError> {
    if !self.library_installed.load(Ordering::Relaxed) {
        return Err(McpError::Protocol(
            "library not installed; run library_install (or check device hardware connection)"
                .into(),
        ));
    }
    let port_handle = { self.port.lock().unwrap().take() };
    let Some(handle) = port_handle else {
        return Err(McpError::Connection("device not connected".into()));
    };
    let tool_name_owned = tool_name.to_string();
    let blocking_result = tokio::task::spawn_blocking(move || {
        let mut handle = handle;
        let port = &mut *handle;
        let result: Result<Value, McpError> = match tool_name_owned.as_str() {
            "overlay_serialize" => tools::overlay::handle_overlay_serialize(port),
            "overlay_list" => tools::overlay::handle_overlay_list(port),
            "overlay_clear" => tools::overlay::handle_overlay_clear(port, &args),
            "overlay_clear_all" => tools::overlay::handle_overlay_clear_all(port),
            "slot_has_changes" => tools::slot::handle_slot_has_changes(port),
            "slot_save" => tools::slot::handle_slot_save(port, &args),
            "slot_discard" => tools::slot::handle_slot_discard(port),
            "slot_load" => tools::slot::handle_slot_load(port, &args),
            "slot_get_current" => tools::slot::handle_slot_get_current(port),
            "context_get" => tools::context::handle_context_get(port),
            "context_toggle" => tools::context::handle_context_toggle(port),
            "core_pause" => tools::core::handle_core_pause(port),
            "core_resume" => tools::core::handle_core_resume(port),
            "dev_exec_python" => tools::dev::handle_dev_exec_python(port, &args),
            "get_state" => tools::state::handle_state_get(port, &args),
            "set_state" => tools::state::handle_state_set(port, &args),
            "connect" => tools::connections::handle_connect(port, &args),
            "disconnect" => tools::connections::handle_disconnect(port, &args),
            "nodes_clear" => tools::connections::handle_nodes_clear(port),
            "is_connected" => tools::connections::handle_is_connected(port, &args),
            "dac_set" => tools::analog::handle_dac_set(port, &args),
            "dac_get" => tools::analog::handle_dac_get(port, &args),
            "adc_get" => tools::analog::handle_adc_get(port, &args),
            "adc_get_stats" => tools::analog::handle_adc_get_stats(port, &args),
            "gpio_set" => tools::digital::handle_gpio_set(port, &args),
            "gpio_get" => tools::digital::handle_gpio_get(port, &args),
            "gpio_set_dir" => tools::digital::handle_gpio_set_dir(port, &args),
            "oled_print" => tools::oled::handle_oled_print(port, &args),
            "oled_clear" => tools::oled::handle_oled_clear(port),
            "ina_get_current" => tools::power::handle_ina_get_current(port, &args),
            "ina_get_voltage" => tools::power::handle_ina_get_voltage(port, &args),
            "ina_get_power" => tools::power::handle_ina_get_power(port, &args),
            "probe_read_blocking" => tools::probe::handle_probe_read_blocking(port),
            "probe_read_nonblocking" => tools::probe::handle_probe_read_nonblocking(port),
            "probe_button" => tools::probe::handle_probe_button(port),
            "wavegen_set_output" => tools::wavegen::handle_wavegen_set_output(port, &args),
            "wavegen_set_wave" => tools::wavegen::handle_wavegen_set_wave(port, &args),
            "wavegen_set_freq" => tools::wavegen::handle_wavegen_set_freq(port, &args),
            "wavegen_set_amplitude" => {
                tools::wavegen::handle_wavegen_set_amplitude(port, &args)
            }
            "wavegen_set_offset" => tools::wavegen::handle_wavegen_set_offset(port, &args),
            "wavegen_start" => tools::wavegen::handle_wavegen_start(port, &args),
            "wavegen_stop" => tools::wavegen::handle_wavegen_stop(port),
            unknown => {
                // Only `library_*` names have a CLI counterpart, and it lives under
                // the `library` subcommand with kebab-case names
                // (e.g. `jumperless-mcp library check-installation`).
                let cli_hint = match unknown.strip_prefix("library_") {
                    Some(sub) => format!(
                        "; invoke via CLI subcommand instead: `jumperless-mcp library {}`",
                        sub.replace('_', "-")
                    ),
                    None => String::new(),
                };
                Err(McpError::Protocol(format!(
                    "tool not yet implemented: {unknown}{cli_hint}"
                )))
            }
        };
        // Hand the port back together with the result so it can be re-stored.
        (handle, result)
    })
    .await;
    let (handle, tool_result) = match blocking_result {
        Ok(pair) => pair,
        Err(join_err) => {
            return Err(McpError::Connection(format!(
                "invoke spawn_blocking panicked (handle lost; reconnect required): {join_err}"
            )));
        }
    };
    *self.port.lock().unwrap() = Some(handle);
    tool_result
}
/// Returns the crate version baked in at compile time (`CARGO_PKG_VERSION`).
fn version(&self) -> &str {
env!("CARGO_PKG_VERSION")
}
/// This subsystem reports no ring: always returns `None`.
fn carabiner_ring(&self) -> Option<RingId> {
None
}
}
// Top-level one-shot subcommands handled directly in `main`.
// (Plain `//` comments on purpose: clap derive turns `///` docs into help text.)
#[derive(Subcommand, Debug)]
enum Command {
// Connects, runs `probe::run_probe`, prints the result, then disconnects.
FirmwareProbe,
// Device-library management; see `LibraryCommand` for the operations.
Library(LibraryArgs),
}
// Argument wrapper for the `library` subcommand; only carries the nested operation.
#[derive(clap::Args, Debug)]
struct LibraryArgs {
#[command(subcommand)]
command: LibraryCommand,
}
// Operations available under the `library` subcommand, as dispatched in `main`.
// (Plain `//` comments on purpose: clap derive turns `///` docs into help text.)
#[derive(Subcommand, Debug)]
enum LibraryCommand {
// Runs `library::install_if_needed`, or `library::reinstall` when `--force` is given.
Install {
#[arg(long)]
force: bool,
},
// Runs `library::uninstall`.
Uninstall,
// Runs `library::check_installation` and prints the reported status fields.
CheckInstallation,
// Runs `library::verify_installation` and prints a per-file match report.
Verify,
// Runs `library::dump_device_file` for `path`; the content is written to
// `--out` when given, otherwise it is printed.
Dump {
path: String,
#[arg(long)]
out: Option<std::path::PathBuf>,
},
}
// Command-line interface of the `jumperless-mcp` binary.
// (Plain `//` comments on purpose: clap derive turns `///` docs into help text.)
#[derive(Parser, Debug)]
#[command(name = "jumperless-mcp", version, about)]
struct JumperlessCli {
// Shared options; `main` reads `log_level` from it and builds `ConnectArgs` from it.
#[command(flatten)]
common: CommonCli,
// Forwarded to the `Jumperless` constructors: open the port without entering raw mode.
#[arg(long)]
skip_handshake: bool,
// Forwarded to `Jumperless::new`: do not run the connect/disconnect ceremony scripts.
#[arg(long)]
no_ceremony: bool,
// Run connect -> hold -> disconnect once and exit instead of starting the server.
#[arg(long)]
smoke_test: bool,
// How long (ms) the smoke test stays connected before disconnecting.
#[arg(long, default_value_t = 2000)]
smoke_hold_ms: u64,
// Print JSON instead of human-readable text for `firmware-probe` and `library` results.
#[arg(long)]
json: bool,
// Optional one-shot subcommand; without one (and without --smoke-test) `base::run` is started.
#[command(subcommand)]
command: Option<Command>,
}
#[tokio::main]
async fn main() -> Result<(), McpError> {
let cli = JumperlessCli::parse();
if let Some(Command::FirmwareProbe) = &cli.command {
crate::base::init_tracing(cli.common.log_level);
let connect_args = ConnectArgs::from_cli(&cli.common)?;
let mut subsystem = Jumperless::new_for_library_cli(cli.skip_handshake);
subsystem.connect(&connect_args).await?;
let port_name_snap = subsystem
.port_name
.lock()
.unwrap()
.clone()
.unwrap_or_else(|| "<unknown>".to_string());
let port_opt = { subsystem.port.lock().unwrap().take() };
let Some(handle) = port_opt else {
return Err(McpError::Connection("device not connected".into()));
};
let json_mode = cli.json;
let port_name_for_spawn = port_name_snap.clone();
let result = tokio::task::spawn_blocking(move || {
let mut h = handle;
let probe_result = probe::run_probe(&mut *h);
(h, probe_result)
})
.await
.map_err(|e| {
McpError::Connection(format!("firmware-probe spawn_blocking panicked: {e}"))
})?;
let (handle, probe_outcome) = result;
*subsystem.port.lock().unwrap() = Some(handle);
let op_result: Result<(), McpError> = match probe_outcome {
Ok(probe) => {
if json_mode {
let json = serde_json::json!({
"port": port_name_for_spawn,
"probe_complete": probe.probe_complete,
"raw_output": probe.raw_output,
"force_service_bound": probe.force_service_bound,
"jOS_present": probe.jos_present,
"library_version": probe.library_version,
});
println!("{json}");
} else {
println!("[firmware-probe] connected to {port_name_for_spawn}");
print!("{}", probe.raw_output);
if !probe.probe_complete {
eprintln!(
"[firmware-probe] WARNING: probe did not reach completion sentinel \
— verdict flags may be unreliable"
);
}
println!("[firmware-probe] done");
}
tracing::info!(
port = %port_name_for_spawn,
probe_complete = probe.probe_complete,
force_service = probe.force_service_bound,
jos = probe.jos_present,
library_version = ?probe.library_version,
"firmware-probe complete"
);
Ok(())
}
Err(e) => {
tracing::error!(port = %port_name_for_spawn, error = %e, "firmware-probe failed");
Err(e)
}
};
if let Err(disc_err) = subsystem.disconnect().await {
tracing::warn!(error = %disc_err, "firmware-probe disconnect failed (best-effort)");
}
return op_result;
}
if let Some(Command::Library(LibraryArgs { command: lib_cmd })) = &cli.command {
crate::base::init_tracing(cli.common.log_level);
let connect_args = ConnectArgs::from_cli(&cli.common)?;
let mut subsystem = Jumperless::new_for_library_cli(cli.skip_handshake);
subsystem.connect(&connect_args).await?;
let op_result: Result<(), McpError> = {
let port_opt = subsystem.port.lock().unwrap().take();
let Some(handle) = port_opt else {
return Err(McpError::Connection("device not connected".into()));
};
let cmd = match lib_cmd {
LibraryCommand::Install { force } => {
if *force {
"reinstall"
} else {
"install"
}
}
LibraryCommand::Uninstall => "uninstall",
LibraryCommand::CheckInstallation => "check",
LibraryCommand::Verify => "verify",
LibraryCommand::Dump { .. } => "dump",
};
let port_name_snap = subsystem
.port_name
.lock()
.unwrap()
.clone()
.unwrap_or_else(|| "<unknown>".to_string());
let cmd_owned = cmd.to_string();
let dump_path: Option<String> = if let LibraryCommand::Dump { path, .. } = lib_cmd {
Some(path.clone())
} else {
None
};
let dump_out: Option<std::path::PathBuf> =
if let LibraryCommand::Dump { out, .. } = lib_cmd {
out.clone()
} else {
None
};
let result = tokio::task::spawn_blocking(move || {
let mut h = handle;
if let Err(e) = h.set_timeout(Duration::from_millis(library::LIBRARY_FILE_OP_TIMEOUT_MS)) {
return Err(McpError::Connection(format!(
"failed to set library op read timeout: {e}; \
device may be left in Raw REPL — power-cycle the device \
or run `--smoke-test` to verify state"
)));
}
let result: Result<(String, String), McpError> = match cmd_owned.as_str() {
"install" => {
library::install_if_needed(&mut *h)
.map(|did_install| {
let human = if did_install {
format!("installed v{}", library::LIBRARY_VERSION)
} else {
format!("already up-to-date v{}", library::LIBRARY_VERSION)
};
let json = serde_json::json!({
"status": if did_install { "installed" } else { "up_to_date" },
"version": library::LIBRARY_VERSION,
})
.to_string();
(human, json)
})
}
"reinstall" => {
library::reinstall(&mut *h)
.map(|r| {
let human = match &r.pre_uninstall {
None => {
format!(
"reinstall: uninstall step returned Err (see logs); \
proceeded with install. Installed v{}.",
r.installed_version
)
}
Some(pre) if pre.errors.is_empty() => {
format!(
"reinstall: uninstalled {} of {} files; \
installed v{}.",
pre.removed, pre.attempted, r.installed_version
)
}
Some(pre) => {
format!(
"reinstall: uninstalled {} of {} files \
({} error(s): {}); installed v{}.",
pre.removed,
pre.attempted,
pre.errors.len(),
pre.errors.join("; "),
r.installed_version
)
}
};
let pre_json = match &r.pre_uninstall {
None => serde_json::json!(null),
Some(pre) => serde_json::json!({
"removed": pre.removed,
"attempted": pre.attempted,
"attempted_actual": pre.attempted_actual,
"errors": pre.errors,
}),
};
let json = serde_json::json!({
"status": "reinstalled",
"installed_version": r.installed_version,
"pre_uninstall": pre_json,
})
.to_string();
(human, json)
})
}
"uninstall" => {
library::uninstall(&mut *h)
.map(|r| {
let human = if !r.errors.is_empty() {
format!(
"removed {} of {} files (errors: {})",
r.removed, r.attempted,
r.errors.join("; ")
)
} else {
format!("removed {} of {} files", r.removed, r.attempted)
};
let status = if !r.errors.is_empty() {
"partial"
} else {
"removed"
};
let json = serde_json::json!({
"status": status,
"removed": r.removed,
"attempted": r.attempted,
"attempted_actual": r.attempted_actual,
"errors": r.errors,
})
.to_string();
(human, json)
})
}
"check" => {
library::check_installation(&mut *h).map(|s| {
let human = format!(
"installed={} partial={} up_to_date={} installed_version={} \
current_version={} files_present={:?} files_missing={:?}",
s.installed,
s.partial,
s.up_to_date,
s.installed_version.as_deref().unwrap_or("none"),
s.current_version,
s.files_present,
s.files_missing,
);
let json = serde_json::json!({
"installed": s.installed,
"partial": s.partial,
"up_to_date": s.up_to_date,
"installed_version": s.installed_version,
"current_version": s.current_version,
"files_present": s.files_present,
"files_missing": s.files_missing,
})
.to_string();
(human, json)
})
}
"verify" => {
library::verify_installation(&mut *h).map(|r| {
let mut lines: Vec<String> = Vec::new();
for fv in &r.files {
let file_name = fv.path.split('/').next_back().unwrap_or(&fv.path);
if fv.device_size.is_none() {
lines.push(format!("{file_name}: MISSING on device ✗ MISMATCH"));
} else if !fv.matched {
let size_note = match fv.device_size {
Some(ds) if ds != fv.source_size => {
format!("{ds} bytes ≠ {} bytes", fv.source_size)
}
_ => format!("{} bytes (size OK)", fv.source_size),
};
lines.push(format!("{file_name}: {size_note} ✗ MISMATCH"));
if let Some(ref dsha) = fv.device_sha256 {
lines.push(format!(" source SHA: {}", fv.source_sha256));
lines.push(format!(" device SHA: {dsha}"));
}
} else if fv.hash_available_on_device {
lines.push(format!(
"{file_name}: {} bytes ✓ match",
fv.source_size
));
} else {
lines.push(format!(
"{file_name}: {} bytes ✓ match (no hashlib on device — size-only verification)",
fv.source_size
));
}
}
if r.all_match {
lines.push(String::from("\nResult: all files match source."));
} else {
let mismatched: Vec<&str> = r
.files
.iter()
.filter(|fv| !fv.matched)
.map(|fv| fv.path.as_str())
.collect();
lines.push(format!(
"\nResult: MISMATCH on {} — chunked install produced different content than source.",
mismatched.join(", ")
));
}
let human = lines.join("\n");
let files_json: Vec<serde_json::Value> = r
.files
.iter()
.map(|fv| {
serde_json::json!({
"path": fv.path,
"device_size": fv.device_size,
"source_size": fv.source_size,
"device_sha256": fv.device_sha256,
"source_sha256": fv.source_sha256,
"hash_available_on_device": fv.hash_available_on_device,
"match": fv.matched,
})
})
.collect();
let json = serde_json::json!({
"files": files_json,
"all_match": r.all_match,
"library_version": r.library_version,
})
.to_string();
(human, json)
})
}
"dump" => {
let path = dump_path.as_deref().unwrap_or("");
library::dump_device_file(&mut *h, path).map(|r| {
if r.missing {
let human = format!("{path}: MISSING on device");
let json = serde_json::json!({
"path": r.path,
"missing": true,
"size": serde_json::Value::Null,
})
.to_string();
(human, json)
} else {
let bytes = r.content.unwrap_or_default();
let size = r.size.unwrap_or(bytes.len());
let human = if let Some(ref out_path) = dump_out {
match std::fs::write(out_path, &bytes) {
Ok(()) => format!(
"{path}: {size} bytes → written to {}",
out_path.display()
),
Err(e) => format!(
"{path}: {size} bytes — WRITE FAILED: {e}"
),
}
} else {
let content_str = String::from_utf8_lossy(&bytes);
format!(
"--- BEGIN {path} ({size} bytes) ---\n{content_str}\n--- END ---"
)
};
let json = serde_json::json!({
"path": r.path,
"missing": false,
"size": size,
"out": dump_out.as_ref().map(|p| p.display().to_string()),
})
.to_string();
(human, json)
}
})
}
_ => unreachable!(),
};
if let Err(e) = h.set_timeout(Duration::from_millis(PORT_OPEN_TIMEOUT_MS)) {
tracing::error!(
error = %e,
"failed to restore short read timeout after library op; \
subsequent fast operations may incur longer waits"
);
}
Ok((h, result))
})
.await
.map_err(|e| McpError::Connection(format!("spawn_blocking panicked: {e}")))?;
match result {
Err(e) => {
tracing::error!(
port = %port_name_snap,
error = %e,
"library op aborted before running (set_timeout failed); \
handle lost — device may be left in Raw REPL"
);
Err(e)
}
Ok((handle, outcome)) => {
*subsystem.port.lock().unwrap() = Some(handle);
match outcome {
Ok((human_msg, json_msg)) => {
match lib_cmd {
LibraryCommand::Install { .. } => {
subsystem.library_installed.store(true, Ordering::Relaxed);
}
LibraryCommand::Uninstall => {
subsystem.library_installed.store(false, Ordering::Relaxed);
}
_ => {
}
}
if cli.json {
println!("{json_msg}");
} else {
println!("{human_msg}");
}
tracing::info!(port = %port_name_snap, msg = %human_msg, "library op complete");
Ok(())
}
Err(e) => {
tracing::error!(port = %port_name_snap, error = %e, "library op failed");
Err(e)
}
}
}
}
};
subsystem.disconnect().await?;
return op_result;
}
if cli.smoke_test {
crate::base::init_tracing(cli.common.log_level);
tracing::info!(
hold_ms = cli.smoke_hold_ms,
"smoke-test: starting lifecycle dogfood"
);
let connect_args = ConnectArgs::from_cli(&cli.common)?;
let mut subsystem = Jumperless::new(cli.skip_handshake, cli.no_ceremony);
subsystem.connect(&connect_args).await?;
tracing::info!(
hold_ms = cli.smoke_hold_ms,
"smoke-test: connected; holding before disconnect"
);
tokio::time::sleep(Duration::from_millis(cli.smoke_hold_ms)).await;
subsystem.disconnect().await?;
tracing::info!("smoke-test: complete — clean exit");
return Ok(());
}
crate::base::run(
Jumperless::new(cli.skip_handshake, cli.no_ceremony),
cli.common,
)
.await
}
// Unit tests that run without hardware: CLI parsing, constructor flags, and the
// not-installed / not-connected guard paths of the subsystem.
#[cfg(test)]
mod tests {
use crate::base::{McpError, Subsystem};
use clap::Parser;
use super::{library, Jumperless, JumperlessCli};
// With no arguments, --smoke-test is off and the hold time is 2000 ms.
#[test]
fn smoke_test_defaults() {
let cli = JumperlessCli::parse_from(["jumperless-mcp"]);
assert!(!cli.smoke_test, "smoke_test should default to false");
assert_eq!(
cli.smoke_hold_ms, 2000,
"smoke_hold_ms should default to 2000"
);
}
// --smoke-test and --smoke-hold-ms are parsed into their fields.
#[test]
fn smoke_test_flag_parses() {
let cli =
JumperlessCli::parse_from(["jumperless-mcp", "--smoke-test", "--smoke-hold-ms", "500"]);
assert!(cli.smoke_test, "--smoke-test should set smoke_test to true");
assert_eq!(
cli.smoke_hold_ms, 500,
"--smoke-hold-ms 500 should set hold to 500"
);
}
// NOTE(review): despite its name this test never calls `connect()`; it only
// checks that a fresh instance has no port, i.e. the already-connected guard
// would not fire. Both assertions check the same condition.
#[test]
fn double_connect_is_idempotent() {
let subsystem = Jumperless::new(false, true);
assert!(
subsystem.port.lock().unwrap().is_none(),
"freshly-constructed Jumperless should have port=None"
);
let is_connected_none: bool = subsystem.port.lock().unwrap().is_some();
assert!(
!is_connected_none,
"port=None should NOT trigger the double-connect guard"
);
}
// A fresh subsystem has library_installed=false, so `invoke` must fail with
// the "library not installed" Protocol error before touching the port.
#[tokio::test]
async fn invoke_returns_error_when_library_not_installed() {
use serde_json::json;
let subsystem = Jumperless::new(false, true);
let result = subsystem.invoke("library_check", json!({})).await;
assert!(
result.is_err(),
"invoke must return Err when library not installed"
);
match result.unwrap_err() {
McpError::Protocol(msg) => {
assert!(
msg.contains("library not installed"),
"error message must mention library not installed; got: {msg}"
);
}
other => panic!("expected McpError::Protocol, got: {other:?}"),
}
}
// NOTE(review): no port is connected here, so `invoke` returns the
// "device not connected" Connection error before tool dispatch; the
// Protocol arm only guards against one specific internal phrase.
#[tokio::test]
async fn invoke_returns_not_implemented_error_for_unknown_tool() {
use serde_json::json;
use std::sync::atomic::Ordering;
let subsystem = Jumperless::new(false, true);
subsystem.library_installed.store(true, Ordering::Relaxed);
let result = subsystem.invoke("some_future_tool", json!({})).await;
assert!(result.is_err(), "invoke must return Err when not connected");
match result.unwrap_err() {
McpError::Connection(msg) => {
assert!(
msg.contains("not connected"),
"error must mention 'not connected'; got: {msg}"
);
}
McpError::Protocol(msg) => {
assert!(
!msg.contains("Wave 2 dispatch pending"),
"internal implementation notes must not leak; got: {msg}"
);
}
other => panic!("expected McpError::Connection or McpError::Protocol, got: {other:?}"),
}
}
// `disconnect` on a never-connected subsystem succeeds and clears the flag.
#[tokio::test]
async fn library_installed_is_reset_on_disconnect() {
use std::sync::atomic::Ordering;
let mut subsystem = Jumperless::new(false, true);
subsystem.library_installed.store(true, Ordering::Relaxed);
assert!(
subsystem.library_installed.load(Ordering::Relaxed),
"pre-condition: flag must be true before disconnect"
);
subsystem
.disconnect()
.await
.expect("disconnect must not fail on unconnected subsystem");
assert!(
!subsystem.library_installed.load(Ordering::Relaxed),
"library_installed must be false after disconnect"
);
}
// --json is off unless passed.
#[test]
fn json_flag_defaults_to_false() {
let cli = JumperlessCli::parse_from(["jumperless-mcp"]);
assert!(!cli.json, "--json should default to false");
}
// --json given before the `library check-installation` subcommand is accepted.
#[test]
fn json_flag_sets_true_when_passed() {
let cli = JumperlessCli::parse_from([
"jumperless-mcp",
"--json",
"library",
"check-installation",
]);
assert!(cli.json, "--json flag should set json=true");
}
// NOTE(review): the name mentions a CLI suggestion, but with no port connected
// only the "not connected" Connection error is reachable and asserted here.
#[tokio::test]
async fn invoke_error_message_includes_cli_suggestion() {
use serde_json::json;
use std::sync::atomic::Ordering;
let subsystem = Jumperless::new(false, true);
subsystem.library_installed.store(true, Ordering::Relaxed);
let result = subsystem.invoke("library_install", json!({})).await;
assert!(result.is_err(), "invoke must return Err when not connected");
match result.unwrap_err() {
McpError::Connection(msg) => {
assert!(
msg.contains("not connected"),
"error must mention 'not connected'; got: {msg}"
);
}
other => panic!("expected McpError::Connection, got: {other:?}"),
}
}
// The CLI-mode constructor disables both auto-install and the ceremony.
#[test]
fn new_for_library_cli_sets_skip_auto_install_and_no_ceremony() {
let subsystem = Jumperless::new_for_library_cli(false);
assert!(
subsystem.skip_auto_install,
"library CLI mode must skip auto-install"
);
assert!(
subsystem.no_ceremony,
"library CLI mode must disable ceremony"
);
}
// Test-local copy of the status rule used for the `uninstall` JSON output in
// `main`: any error means "partial", otherwise "removed".
fn uninstall_status_string(r: &library::UninstallResult) -> &'static str {
if !r.errors.is_empty() {
"partial"
} else {
"removed"
}
}
// No errors -> "removed".
#[test]
fn uninstall_json_status_removed_on_clean_uninstall() {
let r = library::UninstallResult {
attempted: 4,
attempted_actual: 4,
removed: 4,
errors: vec![],
};
assert_eq!(uninstall_status_string(&r), "removed");
}
// At least one error -> "partial".
#[test]
fn uninstall_json_status_partial_on_errors() {
let r = library::UninstallResult {
attempted: 4,
attempted_actual: 4,
removed: 3,
errors: vec!["VERSION: device-side exception: OSError".to_string()],
};
assert_eq!(uninstall_status_string(&r), "partial");
}
// Only the two statuses above are produced.
// NOTE(review): the final assertion compares two literals and cannot fail.
#[test]
fn uninstall_json_status_no_unbound_state() {
let r_clean = library::UninstallResult {
attempted: 4,
attempted_actual: 4,
removed: 4,
errors: vec![],
};
let r_partial = library::UninstallResult {
attempted: 4,
attempted_actual: 4,
removed: 2,
errors: vec!["font.py: ERR:IOError".to_string()],
};
assert_eq!(uninstall_status_string(&r_clean), "removed");
assert_eq!(uninstall_status_string(&r_partial), "partial");
assert!(
!["removed", "partial"].contains(&"unbound_fallback"),
"Phase C: unbound_fallback state has been removed"
);
}
// The flag starts false and can be flipped through a shared reference.
#[test]
fn library_installed_flag_starts_false_and_is_settable() {
use std::sync::atomic::Ordering;
let subsystem = Jumperless::new(false, true);
assert!(
!subsystem.library_installed.load(Ordering::Relaxed),
"library_installed must default to false"
);
subsystem.library_installed.store(true, Ordering::Relaxed);
assert!(
subsystem.library_installed.load(Ordering::Relaxed),
"library_installed must be settable to true"
);
}
}