jumperless-mcp 0.1.0

MCP server for the Jumperless V5 — persistent USB-serial bridge exposing the firmware API to LLMs
//! Connect / disconnect ceremony scripts for the Jumperless V5.
//!
//! Phase A+B+C: ceremony now uses `import jumperless_mcp` rather than
//! `exec(jfs.read(...))`. The library lives at
//! `/python_scripts/lib/jumperless_mcp/` which is on `sys.path` at firmware
//! init (confirmed Python_Proper.cpp:4706), so a bare `import jumperless_mcp`
//! resolves without any path manipulation.
//!
//! Both scripts are now 2-liners:
//! ```python
//! import jumperless_mcp
//! jumperless_mcp._ceremony_connect()   # or _ceremony_disconnect()
//! ```
//!
//! The ceremony logic lives in `__init__.py` on the device, which imports
//! `jumperless_mcp.effects` and `jumperless_mcp.font` via normal package
//! mechanics. This eliminates all `exec(jfs.read(...))` / `jfs.open(...)`
//! / `fs_read(...)` chains and the 4096-byte static-buffer truncation risk.
//!
//! # Import retry
//!
//! Scout flagged that `import jumperless_mcp` might fail immediately after
//! `jfs.mkdir` on a fresh device (VFS may need a moment to settle after
//! directory creation). Both scripts include a 100ms retry on ImportError.
//! If still failing after retry, the exception propagates — connect proceeds
//! with ceremony skipped (same best-effort semantics as before).
//!
//! # Rail color flip — firmware-locked (5.6.6.2 confirmed)
//!
//! `assignNetColors()` in JumperlOS/src/LEDs.cpp (lines 1985-1988) runs every
//! tick and overwrites nets 1-5 back to hardcoded `railNetColors[]`. Rail flips
//! still don't work. Re-add only when Architeuthis exposes an override API.
//!
//! # Ceremony phases (connect)
//!
//! Defined in `__init__.py` `_ceremony_connect()`:
//! 1. **Clear OLED** (~0.05s)
//! 2. **Announce** (~0.05s): print "MCP" on OLED.
//! 3. **Banner wipe** (~1.0s): `wipe_edges(NASA_ORANGE, "L2R")`.
//! 4. **Connected banner** (~0.8s): print "MCP Connected" + hold.
//! 5. **Text scroll** (~2.7s): `marquee_scroll("MCP CONNECTED", NASA_ORANGE, "L2R")`.
//! 6. **Settle** (~0.05s): `overlay_clear_all()` + `corner_frame(NASA_ORANGE)` + `oled_clear()`.
//!
//! # Ceremony phases (disconnect)
//!
//! Defined in `__init__.py` `_ceremony_disconnect()`:
//! 1. **Reverse wipe** (~1.0s): `wipe_edges(NASA_ORANGE, "R2L")`.
//! 2. **Announce** (~0.8s): print "MCP Disconnected" + hold.
//! 3. **Text scroll** (~2.7s): `marquee_scroll("MCP DISCONNECTED", NASA_ORANGE, "R2L")`.
//! 4. **Clear** (~0.05s): `overlay_clear_all()` + `oled_clear()`.
//!
//! # Opt-out
//!
//! Both scripts are skipped when `--no-ceremony` is passed on the CLI, or when
//! the library install failed (graceful degradation: connect proceeds, ceremony skipped).
//!
// (Internal design notes for this module are kept outside the public source.)

/// Python script executed after Raw REPL entry + library install on connect.
///
/// Imports `jumperless_mcp` and calls `_ceremony_connect()`. Total wall-clock
/// including import: ~5-6s on device.
///
/// Import retry: on a fresh install the VFS may need ~100ms to settle after
/// jfs.mkdir. One retry after sleep_ms(100) handles this edge case cleanly.
/// If both attempts fail, propagates the ImportError — caught by connect's
/// best-effort ceremony handler and logged as a warning.
pub(crate) const CEREMONY_CONNECT_SCRIPT: &str = r#"
import sys
# Cache invalidation: MicroPython's sys.modules persists across raw-REPL
# sessions on V5 (REPL state is sticky until device reboot). If an earlier
# import populated sys.modules with a partial/namespace-package version of
# jumperless_mcp (e.g. during install verification when only some files
# existed), subsequent imports would see the cached version missing
# _ceremony_connect. Pop the package + submodules to force a fresh load
# from disk after every install.
sys.modules.pop('jumperless_mcp', None)
sys.modules.pop('jumperless_mcp.effects', None)
sys.modules.pop('jumperless_mcp.font', None)
try:
    import jumperless_mcp
except ImportError:
    import time
    time.sleep_ms(100)
    import jumperless_mcp
jumperless_mcp._ceremony_connect()
"#;

/// Python script executed on disconnect, BEFORE `exit_raw_mode`.
///
/// Mirror symmetry with connect: imports `jumperless_mcp` and calls
/// `_ceremony_disconnect()`. Total wall-clock: ~5-6s on device.
///
/// Same import retry logic as connect: one retry after 100ms if ImportError.
pub(crate) const CEREMONY_DISCONNECT_SCRIPT: &str = r#"
import sys
# Cache invalidation: same rationale as CONNECT_SCRIPT — force fresh load
# rather than trust sys.modules.
sys.modules.pop('jumperless_mcp', None)
sys.modules.pop('jumperless_mcp.effects', None)
sys.modules.pop('jumperless_mcp.font', None)
try:
    import jumperless_mcp
except ImportError:
    import time
    time.sleep_ms(100)
    import jumperless_mcp
jumperless_mcp._ceremony_disconnect()
"#;

// ── Tests ──────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    // ── CEREMONY_CONNECT_SCRIPT content tests ─────────────────────────────────

    #[test]
    fn connect_script_is_non_empty() {
        assert!(!CEREMONY_CONNECT_SCRIPT.is_empty());
    }

    #[test]
    fn connect_script_uses_import_not_exec_jfs_read() {
        assert!(
            CEREMONY_CONNECT_SCRIPT.contains("import jumperless_mcp"),
            "CEREMONY_CONNECT_SCRIPT must use 'import jumperless_mcp' (Phase B); got:\n{}",
            CEREMONY_CONNECT_SCRIPT
        );
        assert!(
            !CEREMONY_CONNECT_SCRIPT.contains("exec("),
            "CEREMONY_CONNECT_SCRIPT must NOT use exec() — import replaces it; got:\n{}",
            CEREMONY_CONNECT_SCRIPT
        );
        assert!(
            !CEREMONY_CONNECT_SCRIPT.contains("jfs.open"),
            "CEREMONY_CONNECT_SCRIPT must NOT use jfs.open — import replaces it; got:\n{}",
            CEREMONY_CONNECT_SCRIPT
        );
        assert!(
            !CEREMONY_CONNECT_SCRIPT.contains("jfs.read"),
            "CEREMONY_CONNECT_SCRIPT must NOT use jfs.read — import replaces it; got:\n{}",
            CEREMONY_CONNECT_SCRIPT
        );
        assert!(
            !CEREMONY_CONNECT_SCRIPT.contains("fs_read("),
            "CEREMONY_CONNECT_SCRIPT must NOT use fs_read() fallback — import replaces it; got:\n{}",
            CEREMONY_CONNECT_SCRIPT
        );
    }

    #[test]
    fn connect_script_calls_ceremony_connect() {
        assert!(
            CEREMONY_CONNECT_SCRIPT.contains("jumperless_mcp._ceremony_connect()"),
            "CEREMONY_CONNECT_SCRIPT must call jumperless_mcp._ceremony_connect(); got:\n{}",
            CEREMONY_CONNECT_SCRIPT
        );
    }

    #[test]
    fn connect_script_has_import_retry_on_import_error() {
        assert!(
            CEREMONY_CONNECT_SCRIPT.contains("except ImportError:"),
            "CEREMONY_CONNECT_SCRIPT must have ImportError retry (VFS settle time); got:\n{}",
            CEREMONY_CONNECT_SCRIPT
        );
        assert!(
            CEREMONY_CONNECT_SCRIPT.contains("sleep_ms(100)"),
            "CEREMONY_CONNECT_SCRIPT retry must sleep 100ms before retry; got:\n{}",
            CEREMONY_CONNECT_SCRIPT
        );
    }

    /// Regression test (Phase 0b.4.3.5): rails are firmware-locked.
    #[test]
    fn connect_script_does_not_attempt_rail_color_flip() {
        assert!(
            !CEREMONY_CONNECT_SCRIPT.contains("set_net_color"),
            "CEREMONY_CONNECT_SCRIPT must NOT call set_net_color — rails are \
             firmware-locked (LEDs.cpp:1985-1988 assignNetColors overwrites every tick). Got:\n{}",
            CEREMONY_CONNECT_SCRIPT
        );
    }

    /// Phase B: ceremony must NOT inline FONT or marquee — those are in the package.
    #[test]
    fn connect_script_does_not_inline_font_dict() {
        assert!(
            !CEREMONY_CONNECT_SCRIPT.contains("FONT = {"),
            "CEREMONY_CONNECT_SCRIPT must NOT inline FONT dict — it's in the resident library; got:\n{}",
            CEREMONY_CONNECT_SCRIPT
        );
    }

    #[test]
    fn connect_script_does_not_define_marquee_function() {
        assert!(
            !CEREMONY_CONNECT_SCRIPT.contains("def marquee("),
            "CEREMONY_CONNECT_SCRIPT must NOT define marquee() inline — it's in the resident library; got:\n{}",
            CEREMONY_CONNECT_SCRIPT
        );
    }

    // ── CEREMONY_DISCONNECT_SCRIPT content tests ──────────────────────────────

    #[test]
    fn disconnect_script_is_non_empty() {
        assert!(!CEREMONY_DISCONNECT_SCRIPT.is_empty());
    }

    #[test]
    fn disconnect_script_uses_import_not_exec_jfs_read() {
        assert!(
            CEREMONY_DISCONNECT_SCRIPT.contains("import jumperless_mcp"),
            "CEREMONY_DISCONNECT_SCRIPT must use 'import jumperless_mcp' (Phase B); got:\n{}",
            CEREMONY_DISCONNECT_SCRIPT
        );
        assert!(
            !CEREMONY_DISCONNECT_SCRIPT.contains("exec("),
            "CEREMONY_DISCONNECT_SCRIPT must NOT use exec() — import replaces it; got:\n{}",
            CEREMONY_DISCONNECT_SCRIPT
        );
        assert!(
            !CEREMONY_DISCONNECT_SCRIPT.contains("jfs.open"),
            "CEREMONY_DISCONNECT_SCRIPT must NOT use jfs.open — import replaces it; got:\n{}",
            CEREMONY_DISCONNECT_SCRIPT
        );
        assert!(
            !CEREMONY_DISCONNECT_SCRIPT.contains("jfs.read"),
            "CEREMONY_DISCONNECT_SCRIPT must NOT use jfs.read — import replaces it; got:\n{}",
            CEREMONY_DISCONNECT_SCRIPT
        );
        assert!(
            !CEREMONY_DISCONNECT_SCRIPT.contains("fs_read("),
            "CEREMONY_DISCONNECT_SCRIPT must NOT use fs_read() fallback — import replaces it; got:\n{}",
            CEREMONY_DISCONNECT_SCRIPT
        );
    }

    #[test]
    fn disconnect_script_calls_ceremony_disconnect() {
        assert!(
            CEREMONY_DISCONNECT_SCRIPT.contains("jumperless_mcp._ceremony_disconnect()"),
            "CEREMONY_DISCONNECT_SCRIPT must call jumperless_mcp._ceremony_disconnect(); got:\n{}",
            CEREMONY_DISCONNECT_SCRIPT
        );
    }

    #[test]
    fn disconnect_script_has_import_retry_on_import_error() {
        assert!(
            CEREMONY_DISCONNECT_SCRIPT.contains("except ImportError:"),
            "CEREMONY_DISCONNECT_SCRIPT must have ImportError retry; got:\n{}",
            CEREMONY_DISCONNECT_SCRIPT
        );
        assert!(
            CEREMONY_DISCONNECT_SCRIPT.contains("sleep_ms(100)"),
            "CEREMONY_DISCONNECT_SCRIPT retry must sleep 100ms before retry; got:\n{}",
            CEREMONY_DISCONNECT_SCRIPT
        );
    }

    /// Regression: no bitmap file loader.
    #[test]
    fn disconnect_script_does_not_call_bitmap_file_loader() {
        assert!(
            !CEREMONY_DISCONNECT_SCRIPT.contains("oled_show_bitmap_file"),
            "CEREMONY_DISCONNECT_SCRIPT must NOT call oled_show_bitmap_file. Got:\n{}",
            CEREMONY_DISCONNECT_SCRIPT
        );
    }

    #[test]
    fn disconnect_script_does_not_attempt_rail_restore() {
        assert!(
            !CEREMONY_DISCONNECT_SCRIPT.contains("set_net_color"),
            "CEREMONY_DISCONNECT_SCRIPT must NOT call set_net_color — rails were \
             never flipped on connect (firmware-locked). Got:\n{}",
            CEREMONY_DISCONNECT_SCRIPT
        );
    }
}