use std::io::{Read, Write};
use crate::base::McpError;
use crate::repl;
/// Version of the embedded library. `install` writes it (plus a trailing
/// newline) to `VERSION_PATH`, and `check_installation` compares it with the
/// value read back from the device.
pub const LIBRARY_VERSION: &str = "0.2.12+20260512";
/// Contents of `../python/font.py`, embedded at compile time.
pub const FONT_PY: &str = include_str!("../python/font.py");
/// Contents of `../python/effects.py`, embedded at compile time.
pub const EFFECTS_PY: &str = include_str!("../python/effects.py");
/// Contents of `../python/__init__.py`, embedded at compile time.
pub const INIT_PY: &str = include_str!("../python/__init__.py");
/// Device directory that holds the installed library files.
pub const LIBRARY_ROOT: &str = "/python_scripts/lib/jumperless_mcp/";
/// Parent directory of `LIBRARY_ROOT`; created by `install` if absent.
pub const PYTHON_SCRIPTS_LIB: &str = "/python_scripts/lib/";
/// Top-level scripts directory on the device; created by `install` if absent.
pub const PYTHON_SCRIPTS: &str = "/python_scripts/";
/// Device path of the file holding the installed version string.
pub const VERSION_PATH: &str = "/python_scripts/lib/jumperless_mcp/VERSION";
/// Device path that `FONT_PY` is written to.
pub const FONT_PATH: &str = "/python_scripts/lib/jumperless_mcp/font.py";
/// Device path that `EFFECTS_PY` is written to.
pub const EFFECTS_PATH: &str = "/python_scripts/lib/jumperless_mcp/effects.py";
/// Device path that `INIT_PY` is written to.
pub const INIT_PATH: &str = "/python_scripts/lib/jumperless_mcp/__init__.py";
/// Returns `true` when `src` contains three consecutive `'` bytes.
///
/// Written as a `const fn` with a manual index loop so it can be evaluated
/// inside the compile-time assertions that follow.
const fn contains_triple_single_quote(src: &str) -> bool {
    let raw = src.as_bytes();
    let mut pos = 0;
    while pos + 3 <= raw.len() {
        if raw[pos] == b'\'' && raw[pos + 1] == b'\'' && raw[pos + 2] == b'\'' {
            return true;
        }
        pos += 1;
    }
    false
}
// Compile-time guards: each embedded Python source is later wrapped in a
// `'''…'''` literal by `python_repr`, so the build fails if any of them
// contains a triple single quote.
const _: () = assert!(
    !contains_triple_single_quote(FONT_PY),
    "FONT_PY must not contain triple-single-quotes (would corrupt python_repr output)"
);
const _: () = assert!(
    !contains_triple_single_quote(EFFECTS_PY),
    "EFFECTS_PY must not contain triple-single-quotes (would corrupt python_repr output)"
);
const _: () = assert!(
    !contains_triple_single_quote(INIT_PY),
    "INIT_PY must not contain triple-single-quotes (would corrupt python_repr output)"
);
// Compile-time guard: the version string must be free of spaces, tabs and
// line terminators.
const _: () = {
    let raw = LIBRARY_VERSION.as_bytes();
    let mut idx = 0;
    while idx < raw.len() {
        assert!(
            !matches!(raw[idx], b' ' | b'\t' | b'\n' | b'\r'),
            "LIBRARY_VERSION must not contain whitespace (causes spurious version mismatch)"
        );
        idx += 1;
    }
};
// Compile-time guard: the version string must contain a `+` separator.
const _: () = {
    let raw = LIBRARY_VERSION.as_bytes();
    // Advance to the first `+`; reaching the end means there is none.
    let mut idx = 0;
    while idx < raw.len() && raw[idx] != b'+' {
        idx += 1;
    }
    assert!(
        idx < raw.len(),
        "LIBRARY_VERSION must include SemVer 2.0 +build metadata (e.g., 0.1.0+20260510)"
    );
};
/// Sum of the byte lengths of the three embedded Python sources and the
/// version string. Only referenced from the test module in the visible part
/// of this file, hence the `dead_code` allowance.
#[allow(dead_code)]
pub const LIBRARY_SIZE_BYTES_APPROX: usize =
FONT_PY.len() + EFFECTS_PY.len() + INIT_PY.len() + LIBRARY_VERSION.len();
/// Timeout, in milliseconds, for library file operations.
// NOTE(review): not referenced in the visible part of this file — presumably
// consumed by callers elsewhere; confirm.
pub const LIBRARY_FILE_OP_TIMEOUT_MS: u64 = 30_000;
/// Size argument passed to `jumperless.jfs.read` by the read-back, verify and
/// dump scripts.
pub const JFS_READ_MAX_BYTES: usize = 8192;
/// Target chunk size, in bytes of unescaped content, used by `fs_write_chunked`.
const CHUNK_SIZE: usize = 768;
// Compile-time guard: CHUNK_SIZE plus the longest statement wrapper plus an
// 8-byte margin must not exceed 1024 bytes.
// NOTE(review): the wrapper lengths below are those of `_ygg_assembly = …`
// statements, while `fs_write_chunked` in this file sends
// `_ygg_f.write('''…''')`. The current wrapper is shorter, so the bound is
// conservative, but the strings no longer match the wire format.
// NOTE(review): `python_repr` doubles every backslash, so an escaped chunk can
// be longer than CHUNK_SIZE; this check does not account for that expansion.
const _: () = {
let init_overhead = "_ygg_assembly = '''".len() + "'''".len(); let append_overhead = "_ygg_assembly = _ygg_assembly + '''".len() + "'''".len(); let max_overhead = if append_overhead > init_overhead {
append_overhead
} else {
init_overhead
};
assert!(
CHUNK_SIZE + max_overhead + 8 <= 1024,
"CHUNK_SIZE + max wrapper overhead + 8-byte safety margin must fit in 1024-byte REPL buffer"
);
};
/// Snapshot of the on-device library installation, built by `check_installation`.
#[derive(Debug, Clone)]
pub struct InstallationStatus {
/// True when no expected file is missing and a version string was read.
pub installed: bool,
/// True when some, but not all, of the expected files are present.
pub partial: bool,
/// Version string read from the device's VERSION file, if any.
pub installed_version: Option<String>,
/// `LIBRARY_VERSION` of this build.
pub current_version: &'static str,
/// True when `installed_version` equals `LIBRARY_VERSION`.
pub up_to_date: bool,
/// Paths for which the device's `fs_exists` printed `1`.
pub files_present: Vec<&'static str>,
/// Paths for which the device's `fs_exists` printed anything else.
pub files_missing: Vec<&'static str>,
}
/// Outcome of `uninstall`.
#[derive(Debug, Clone)]
pub struct UninstallResult {
/// Number of files `uninstall` set out to remove.
pub attempted: usize,
/// Number of files for which a remove command was actually issued.
pub attempted_actual: usize,
/// Number of files the device reported as removed or already absent.
pub removed: usize,
/// One message per file whose removal failed or produced unexpected output.
pub errors: Vec<String>,
}
/// Splits `s` into consecutive slices of at most `target_size` bytes, cutting
/// only on UTF-8 character boundaries.
///
/// A non-final slice is shortened by up to eight bytes so that it does not end
/// in `'` or `\`, unless shortening would leave it empty. If not even one
/// character fits in `target_size`, the slice holds exactly that one character
/// so progress is always made. The final slice is emitted as-is. Concatenating
/// the returned slices yields `s`; an empty `s` yields an empty vector.
fn chunk_at_char_boundaries(s: &str, target_size: usize) -> Vec<&str> {
    const MAX_BACKOFF: usize = 8;
    let raw = s.as_bytes();
    let mut pieces = Vec::new();
    let mut cursor = 0;
    while cursor < s.len() {
        let rest = &s[cursor..];
        if rest.len() <= target_size {
            pieces.push(rest);
            break;
        }
        // Byte length of the longest whole-character prefix that fits.
        let fitted = rest
            .char_indices()
            .map(|(offset, ch)| offset + ch.len_utf8())
            .take_while(|&char_end| char_end <= target_size)
            .last();
        let first_char_len = rest.chars().next().map_or(0, char::len_utf8);
        let mut cut = cursor + fitted.unwrap_or(first_char_len);
        // Step back over trailing quote/backslash bytes. Both are ASCII, so
        // `cut - 1` is always a character boundary.
        for _ in 0..MAX_BACKOFF {
            if !matches!(raw[cut - 1], b'\'' | b'\\') {
                break;
            }
            let shorter = cut - 1;
            if shorter == cursor {
                break;
            }
            cut = shorter;
        }
        pieces.push(&s[cursor..cut]);
        cursor = cut;
    }
    pieces
}
/// Runs `code` through `repl::exec_code` and, on any failure, sends a Ctrl-C
/// (0x03) to the port before returning the error.
///
/// A failure is either a transport error from `exec_code` or a response for
/// which `is_error()` is true. In both cases the error is returned as
/// `McpError::Protocol` with `op_name` as the message prefix. Errors from
/// writing or flushing the abort byte are ignored.
pub(crate) fn exec_with_cleanup<P: Read + Write + ?Sized>(
    port: &mut P,
    code: &str,
    op_name: &str,
) -> Result<repl::ReplResponse, McpError> {
    let failure = match repl::exec_code(port, code) {
        Ok(resp) if !resp.is_error() => return Ok(resp),
        Ok(resp) => {
            let stderr = resp.stderr.trim();
            tracing::warn!(
                op = op_name,
                stderr = %stderr,
                "library op returned device-side exception; sending Ctrl-C abort"
            );
            format!("{op_name}: device-side exception: {stderr}")
        }
        Err(e) => {
            tracing::warn!(op = op_name, err = %e, "library op failed; sending Ctrl-C abort");
            format!("{op_name}: {e}")
        }
    };
    // Best-effort abort, followed by a short pause before the caller continues.
    let _ = port.write_all(&[0x03]);
    let _ = port.flush();
    std::thread::sleep(std::time::Duration::from_millis(10));
    Err(McpError::Protocol(failure))
}
/// Writes `content` to `path` on the device, sending it over the REPL in
/// chunks of at most `CHUNK_SIZE` bytes.
///
/// Statements sent through `exec_with_cleanup`, in order:
/// `_ygg_f = open(path, 'w')`, one `_ygg_f.write(...)` per chunk (each chunk
/// wrapped by `python_repr`), then `_ygg_f.close()`. Empty content is written
/// with a single `fs_write(path, '')` instead. Afterwards the file size is
/// read back and logged, and `_ygg_f` is deleted; failures in those two steps
/// are logged and do not fail the call.
///
/// # Errors
/// Returns the error from `exec_with_cleanup` if the open, any chunk write, or
/// the close fails.
// NOTE(review): when a chunk write fails, `?` returns before `_ygg_f.close()`
// and `del _ygg_f` are sent, so the device-side handle stays open.
// NOTE(review): `path` is interpolated into single-quoted Python literals
// without escaping; every caller in this file passes a constant path.
fn fs_write_chunked<P: Read + Write + ?Sized>(
port: &mut P,
path: &str,
content: &str,
) -> Result<(), McpError> {
let chunks = chunk_at_char_boundaries(content, CHUNK_SIZE);
// Empty content: nothing to stream, write an empty file in one statement.
if chunks.is_empty() {
let code = format!("fs_write('{path}', '')");
exec_with_cleanup(
port,
&code,
&format!("fs_write_chunked({path}): empty write"),
)?;
return Ok(());
}
tracing::info!(
path = path,
chunk_count = chunks.len(),
total_bytes = content.len(),
"fs_write_chunked starting"
);
// Open the destination; the handle lives in the device-side global `_ygg_f`.
exec_with_cleanup(
port,
&format!("_ygg_f = open('{path}', 'w')"),
&format!("fs_write_chunked({path}): open"),
)?;
for (i, chunk) in chunks.iter().enumerate() {
tracing::info!(
path = path,
chunk_idx = i,
chunk_bytes = chunk.len(),
"fs_write_chunked writing chunk"
);
// `python_repr` turns the chunk into a triple-quoted Python literal.
let chunk_repr = python_repr(chunk);
exec_with_cleanup(
port,
&format!("_ygg_f.write({chunk_repr})"),
&format!("fs_write_chunked({path}): chunk {i}"),
)?;
}
exec_with_cleanup(
port,
"_ygg_f.close()",
&format!("fs_write_chunked({path}): close"),
)?;
// `python_repr` normalises CRLF and bare CR to LF before sending, so the
// expected size is computed on the normalised content.
let expected_lf_size = content.replace("\r\n", "\n").replace('\r', "\n").len();
// Read-back script: tries `jumperless.jfs` first (with and without a size
// argument), falls back to `fs_read`, and prints either the length read or
// `ERR:<message>`.
let read_back_code = format!(
"try:\n\
\x20\x20\x20\x20import jumperless\n\
\x20\x20\x20\x20_ygg_rf = jumperless.jfs.open('{path}', 'r')\n\
\x20\x20\x20\x20try:\n\
\x20\x20\x20\x20\x20\x20\x20\x20_ygg_rb = jumperless.jfs.read(_ygg_rf, {JFS_READ_MAX_BYTES})\n\
\x20\x20\x20\x20except TypeError:\n\
\x20\x20\x20\x20\x20\x20\x20\x20# Older firmware: jfs.read takes no size arg. Fall back; may truncate.\n\
\x20\x20\x20\x20\x20\x20\x20\x20_ygg_rb = jumperless.jfs.read(_ygg_rf)\n\
\x20\x20\x20\x20jumperless.jfs.close(_ygg_rf)\n\
\x20\x20\x20\x20del _ygg_rf\n\
\x20\x20\x20\x20print(len(_ygg_rb))\n\
\x20\x20\x20\x20del _ygg_rb\n\
except (NameError, AttributeError):\n\
\x20\x20\x20\x20print(len(fs_read('{path}')))\n\
except Exception as e:\n\
\x20\x20\x20\x20print('ERR:' + str(e))"
);
// The read-back is diagnostic only: expected and actual sizes are logged but
// not compared, and no outcome of this step fails the write.
// NOTE(review): the read is capped at JFS_READ_MAX_BYTES, so the reported
// size of a larger file would be the cap rather than the true size.
match repl::exec_code(port, &read_back_code) {
Ok(resp) if !resp.is_error() => {
let out = resp.stdout.trim();
if let Ok(actual_size) = out.parse::<usize>() {
tracing::info!(
path = path,
expected_bytes_after_normalization = expected_lf_size,
actual_bytes_on_device = actual_size,
"fs_write_chunked complete; post-write size verification"
);
} else {
tracing::warn!(
path = path,
device_output = out,
expected_bytes_after_normalization = expected_lf_size,
"fs_write_chunked post-write size read returned unexpected output"
);
}
}
Ok(resp) => {
tracing::warn!(
path = path,
stderr = %resp.stderr.trim(),
"fs_write_chunked post-write size read produced device-side exception"
);
}
Err(e) => {
tracing::warn!(
path = path,
error = %e,
"fs_write_chunked post-write size read failed at protocol level"
);
}
}
// Best-effort removal of the device-side handle variable; failures are
// logged at debug level only.
match repl::exec_code(port, "del _ygg_f") {
Ok(resp) if resp.is_error() => {
tracing::debug!(
stderr = %resp.stderr.trim(),
"del _ygg_f produced device exception (likely already deleted; safe)"
);
}
Ok(_) => {}
Err(e) => {
tracing::debug!(
error = %e,
"del _ygg_f failed at protocol level (best-effort cleanup; safe)"
);
}
}
Ok(())
}
pub fn read_installed_version<P: Read + Write + ?Sized>(
port: &mut P,
) -> Result<Option<String>, McpError> {
let code = r#"
try:
import jumperless
_vf = jumperless.jfs.open('/python_scripts/lib/jumperless_mcp/VERSION', 'r')
_v = jumperless.jfs.read(_vf, 64)
jumperless.jfs.close(_vf)
if isinstance(_v, str):
print(_v.strip())
else:
print(_v.decode('utf-8').strip())
del _vf
del _v
except OSError:
print('')
except Exception as e:
print('ERR:' + str(e))
"#;
let resp = repl::exec_code(port, code)
.map_err(|e| McpError::Protocol(format!("read_installed_version: {e}")))?;
if resp.is_error() {
return Err(McpError::Protocol(format!(
"read_installed_version: device-side exception: {}",
resp.stderr.trim()
)));
}
let v = resp.stdout.trim();
if v.is_empty() {
Ok(None)
} else if let Some(msg) = v.strip_prefix("ERR:") {
Err(McpError::Protocol(format!(
"read_installed_version: device error: {msg}"
)))
} else {
Ok(Some(v.to_string()))
}
}
/// Reports which library files exist on the device and whether the installed
/// version matches `LIBRARY_VERSION`.
///
/// Reads the VERSION file first, then asks the device's `fs_exists` about each
/// of the four library paths; a path counts as present only when the device
/// prints `1`.
///
/// # Errors
/// Propagates errors from `read_installed_version` and `exec_with_cleanup`.
pub fn check_installation<P: Read + Write + ?Sized>(
    port: &mut P,
) -> Result<InstallationStatus, McpError> {
    let installed_version = read_installed_version(port)?;
    let mut files_present: Vec<&'static str> = Vec::new();
    let mut files_missing: Vec<&'static str> = Vec::new();
    for path in [VERSION_PATH, FONT_PATH, EFFECTS_PATH, INIT_PATH] {
        let probe = format!("print(int(fs_exists('{path}')))");
        let label = format!("check_installation: fs_exists({path})");
        let resp = exec_with_cleanup(port, &probe, &label)?;
        let bucket = if resp.stdout.trim() == "1" {
            &mut files_present
        } else {
            &mut files_missing
        };
        bucket.push(path);
    }
    let nothing_missing = files_missing.is_empty();
    let something_present = !files_present.is_empty();
    Ok(InstallationStatus {
        installed: nothing_missing && installed_version.is_some(),
        partial: something_present && !nothing_missing,
        up_to_date: installed_version.as_deref() == Some(LIBRARY_VERSION),
        installed_version,
        current_version: LIBRARY_VERSION,
        files_present,
        files_missing,
    })
}
/// Installs the embedded library onto the device.
///
/// Creates the three nested directories (ignoring `OSError`, e.g. when they
/// already exist), writes `font.py`, `effects.py` and `__init__.py` with
/// `fs_write_chunked`, and writes the VERSION file last.
///
/// # Errors
/// Returns the first error from `exec_with_cleanup` or `fs_write_chunked`.
pub fn install<P: Read + Write + ?Sized>(port: &mut P) -> Result<(), McpError> {
    // Each `try`/`except` body must be indented or the device rejects the
    // script before creating any directory.
    let mkdir_code = format!(
        r#"try:
    import jumperless
    jumperless.jfs.mkdir('{PYTHON_SCRIPTS}')
except OSError:
    pass
try:
    import jumperless
    jumperless.jfs.mkdir('{PYTHON_SCRIPTS_LIB}')
except OSError:
    pass
try:
    import jumperless
    jumperless.jfs.mkdir('{LIBRARY_ROOT}')
except OSError:
    pass"#
    );
    exec_with_cleanup(
        port,
        &mkdir_code,
        "install: mkdir /python_scripts/lib/jumperless_mcp",
    )?;
    fs_write_chunked(port, FONT_PATH, FONT_PY)?;
    fs_write_chunked(port, EFFECTS_PATH, EFFECTS_PY)?;
    fs_write_chunked(port, INIT_PATH, INIT_PY)?;
    // VERSION goes last: an install interrupted earlier leaves no version
    // file claiming to be current.
    let code = format!("fs_write('{VERSION_PATH}', '{LIBRARY_VERSION}\\n')");
    exec_with_cleanup(port, &code, "install: write VERSION")?;
    Ok(())
}
/// Installs the library unless the device already holds a complete,
/// up-to-date copy.
///
/// Returns `Ok(true)` when an install was performed and `Ok(false)` when it
/// was skipped.
///
/// # Errors
/// Propagates errors from `check_installation` and `install`.
pub fn install_if_needed<P: Read + Write + ?Sized>(port: &mut P) -> Result<bool, McpError> {
    let status = check_installation(port)?;
    let needs_install = !(status.installed && status.up_to_date);
    if needs_install {
        install(port)?;
    }
    Ok(needs_install)
}
/// Removes any existing installation on a best-effort basis, then installs
/// the embedded library.
///
/// The uninstall outcome is returned in `pre_uninstall` and never fails the
/// call.
///
/// # Errors
/// Propagates errors from `install`.
pub fn reinstall<P: Read + Write + ?Sized>(port: &mut P) -> Result<ReinstallResult, McpError> {
    let pre_uninstall = uninstall_best_effort(port);
    install(port).map(|()| ReinstallResult {
        pre_uninstall,
        installed_version: LIBRARY_VERSION,
    })
}
/// Removes the four library files and then tries to remove the library
/// directory.
///
/// Each file is removed with its own REPL exchange. The device prints `OK`
/// when the file was removed, `ABSENT` when `jfs.remove` raised `OSError`, and
/// `ERR:<message>` for any other exception. `OK` and `ABSENT` both count
/// toward `removed`; everything else is recorded in `errors`. The final
/// `rmdir` is best-effort and never recorded as an error.
///
/// Per-file failures are reported through `UninstallResult::errors`; this
/// function itself always returns `Ok`.
pub fn uninstall<P: Read + Write + ?Sized>(port: &mut P) -> Result<UninstallResult, McpError> {
    let files = [FONT_PATH, EFFECTS_PATH, INIT_PATH, VERSION_PATH];
    let attempted = files.len();
    let mut attempted_actual = 0usize;
    let mut removed = 0usize;
    let mut errors: Vec<String> = Vec::new();
    for path in files {
        attempted_actual += 1;
        // The `try`/`except` bodies must be indented or the device rejects
        // the script and nothing is removed.
        let code = format!(
            r#"try:
    import jumperless
    jumperless.jfs.remove('{path}')
    print('OK')
except OSError:
    print('ABSENT')
except Exception as e:
    print('ERR:' + str(e))"#
        );
        let resp = match repl::exec_code(port, &code) {
            Ok(resp) => resp,
            Err(e) => {
                errors.push(format!("{path}: exec failed: {e}"));
                continue;
            }
        };
        if resp.is_error() {
            tracing::warn!(path = path, stderr = %resp.stderr.trim(), "uninstall device-side exception");
            errors.push(format!(
                "{path}: device-side exception: {}",
                resp.stderr.trim()
            ));
            continue;
        }
        let token = resp.stdout.trim();
        match token {
            // An already-absent file leaves the device in the desired state,
            // so it counts the same as a successful removal.
            "OK" | "ABSENT" => removed += 1,
            other if other.starts_with("ERR:") => {
                tracing::warn!(
                    path = path,
                    detail = other,
                    "unexpected uninstall exception"
                );
                errors.push(format!("{path}: {other}"));
            }
            _ => {
                tracing::warn!(
                    path = path,
                    token = token,
                    "unrecognised uninstall response token"
                );
                errors.push(format!("{path}: unrecognised token '{token}'"));
            }
        }
    }
    let dir_code = format!(
        r#"try:
    import jumperless
    jumperless.jfs.rmdir('{LIBRARY_ROOT}')
except Exception:
    pass"#
    );
    if let Err(e) = repl::exec_code(port, &dir_code) {
        tracing::debug!(error = %e, "fs_rmdir best-effort cleanup failed at protocol level (safe — files already handled)");
    }
    Ok(UninstallResult {
        attempted,
        attempted_actual,
        removed,
        errors,
    })
}
/// Runs `uninstall` and converts its outcome to an `Option`, logging instead
/// of failing.
///
/// Returns `Some(result)` when `uninstall` returned `Ok` (a warning is logged
/// if the result carries per-file errors), and `None` after logging when it
/// returned `Err`.
fn uninstall_best_effort<P: Read + Write + ?Sized>(port: &mut P) -> Option<UninstallResult> {
    let report = uninstall(port)
        .map_err(|e| {
            tracing::error!(error = %e, "uninstall returned Err (ignored for reinstall path)");
        })
        .ok()?;
    if !report.errors.is_empty() {
        tracing::warn!(
            attempted = report.attempted,
            removed = report.removed,
            errors = ?report.errors,
            "uninstall completed with issues"
        );
    }
    Some(report)
}
/// Outcome of `reinstall`.
///
/// Derives `Debug` and `Clone` like the other result types in this file, so
/// callers can log or copy it.
#[derive(Debug, Clone)]
pub struct ReinstallResult {
    /// Result of the best-effort uninstall that ran before the install, or
    /// `None` when that uninstall returned an error.
    pub pre_uninstall: Option<UninstallResult>,
    /// The `LIBRARY_VERSION` that was installed.
    pub installed_version: &'static str,
}
/// Per-file comparison between the embedded source and the copy on the
/// device, produced by `verify_installation`.
#[derive(Debug, Clone)]
pub struct FileVerification {
/// Device path of the file that was checked.
pub path: String,
/// Size reported by the device, or `None` when the file is missing or the
/// device output could not be parsed.
pub device_size: Option<usize>,
/// Byte length of the embedded source after CRLF / bare CR are normalised to LF.
pub source_size: usize,
/// SHA-256 hex digest reported by the device, when one was available.
pub device_sha256: Option<String>,
/// Lowercase hex SHA-256 of the normalised embedded source.
pub source_sha256: String,
/// Whether the device produced a usable digest.
pub hash_available_on_device: bool,
/// True when the sizes agree and, if a device digest is available, the
/// digests agree as well.
pub matched: bool,
}
/// Aggregate result of `verify_installation`.
#[derive(Debug)]
pub struct VerifyResult {
/// One entry per library file, in the order the files were checked.
pub files: Vec<FileVerification>,
/// `LIBRARY_VERSION` of this build.
pub library_version: &'static str,
/// True only when every entry in `files` has `matched == true`.
pub all_match: bool,
}
/// Builds the Python script that reports the size and SHA-256 of the device
/// file at `path`.
///
/// The script prints `size:<n>` and `sha256:<hex>` on success. The `sha256:`
/// value is `NO_HASHLIB` when the `hashlib` module is unavailable and
/// `NO_HEX_AVAILABLE` when no hex encoding of the digest could be produced.
/// It prints `size:MISSING` / `sha256:MISSING` on `OSError`, and
/// `error:<message>` for anything else. `parse_device_verify_output` consumes
/// this output.
fn device_verify_script(path: &str) -> String {
    // Block bodies must be indented or the device rejects the script.
    // `import hashlib` raises ImportError (not NameError) when the module is
    // absent, so the NO_HASHLIB branch has to catch ImportError as well. By
    // the time that import runs, `content` is already bound.
    format!(
        r#"try:
    try:
        import jumperless
        _ygg_rf = jumperless.jfs.open('{path}', 'r')
        try:
            content = jumperless.jfs.read(_ygg_rf, {JFS_READ_MAX_BYTES})
        except TypeError:
            # Older firmware: jfs.read takes no size arg. Fall back; may truncate.
            content = jumperless.jfs.read(_ygg_rf)
        jumperless.jfs.close(_ygg_rf)
        del _ygg_rf
    except (NameError, AttributeError):
        content = fs_read('{path}')
    if isinstance(content, str):
        content = content.encode('utf-8')
    import hashlib
    print('size:' + str(len(content)))
    try:
        h = hashlib.sha256(content).hexdigest()
    except AttributeError:
        try:
            import binascii
            h = binascii.hexlify(hashlib.sha256(content).digest()).decode()
        except Exception:
            h = 'NO_HEX_AVAILABLE'
    print('sha256:' + h)
except (NameError, ImportError) as e:
    if 'hashlib' in str(e):
        print('size:' + str(len(content)))
        print('sha256:NO_HASHLIB')
    else:
        print('error:' + str(e))
except OSError:
    print('size:MISSING')
    print('sha256:MISSING')
except Exception as e:
    print('error:' + str(e))"#
    )
}
/// Parses the `size:` / `sha256:` / `error:` lines printed by the device
/// verify script.
///
/// Returns `(device_size, device_sha256, hash_available)`:
/// - `(None, None, false)` when the size is `MISSING`;
/// - `(Some(n), Some(hex), true)` when a digest was reported and no `error:`
///   line was seen;
/// - `(Some(n), None, false)` when the digest is empty, `NO_HASHLIB`,
///   `NO_HEX_AVAILABLE`, or an `error:` line was seen.
///
/// When a prefix occurs on several lines, the last occurrence wins.
///
/// # Errors
/// Returns the device's error message (or a generic one) when no size was
/// reported, and a parse message when the size is not an unsigned integer.
pub(crate) fn parse_device_verify_output(
    stdout: &str,
) -> Result<(Option<usize>, Option<String>, bool), String> {
    let mut size_field = "";
    let mut hash_field = "";
    let mut device_error: Option<String> = None;
    for entry in stdout.lines().map(str::trim) {
        if let Some(rest) = entry.strip_prefix("size:") {
            size_field = rest;
        } else if let Some(rest) = entry.strip_prefix("sha256:") {
            hash_field = rest;
        } else if let Some(rest) = entry.strip_prefix("error:") {
            device_error = Some(rest.to_string());
        }
    }
    match size_field {
        "MISSING" => return Ok((None, None, false)),
        "" => {
            return Err(device_error
                .unwrap_or_else(|| "device returned no size or error".to_string()));
        }
        _ => {}
    }
    let device_size = size_field
        .parse::<usize>()
        .map_err(|_| format!("could not parse size value: '{size_field}'"))?;
    // A digest is trusted only if it is a real value and the script did not
    // also report an error.
    let hash_available = device_error.is_none()
        && !matches!(hash_field, "" | "NO_HASHLIB" | "NO_HEX_AVAILABLE");
    let device_sha256 = hash_available.then(|| hash_field.to_string());
    Ok((Some(device_size), device_sha256, hash_available))
}
/// Compares every installed library file on the device against the embedded
/// source.
///
/// For each file the embedded content is normalised (CRLF and bare CR become
/// LF), its size and SHA-256 are computed locally, and the device is asked for
/// the same two values. A file matches when the sizes agree and, if the device
/// could produce a digest, the digests agree too. A file whose device output
/// cannot be parsed is recorded as not matching.
///
/// # Errors
/// Propagates errors from `exec_with_cleanup`.
pub fn verify_installation<P: Read + Write + ?Sized>(
    port: &mut P,
) -> Result<VerifyResult, McpError> {
    use sha2::{Digest, Sha256};
    let version_content = format!("{LIBRARY_VERSION}\n");
    let targets = [
        (FONT_PATH, FONT_PY),
        (EFFECTS_PATH, EFFECTS_PY),
        (INIT_PATH, INIT_PY),
        (VERSION_PATH, version_content.as_str()),
    ];
    let mut files: Vec<FileVerification> = Vec::with_capacity(targets.len());
    for (path, source_content) in targets {
        let normalized = source_content.replace("\r\n", "\n").replace('\r', "\n");
        let source_size = normalized.len();
        let source_sha256: String = Sha256::digest(normalized.as_bytes())
            .iter()
            .map(|b| format!("{b:02x}"))
            .collect();
        let resp = exec_with_cleanup(
            port,
            &device_verify_script(path),
            &format!("verify({path})"),
        )?;
        let (device_size, device_sha256, hash_available_on_device) =
            match parse_device_verify_output(&resp.stdout) {
                Ok(parsed) => parsed,
                Err(e) => {
                    tracing::warn!(path = path, error = %e, "device verify script reported error");
                    files.push(FileVerification {
                        path: path.to_string(),
                        device_size: None,
                        source_size,
                        device_sha256: None,
                        source_sha256,
                        hash_available_on_device: false,
                        matched: false,
                    });
                    continue;
                }
            };
        let sizes_match = device_size == Some(source_size);
        // Without a device digest the comparison falls back to size only.
        let hashes_match = !hash_available_on_device
            || device_sha256.as_deref() == Some(source_sha256.as_str());
        files.push(FileVerification {
            path: path.to_string(),
            device_size,
            source_size,
            device_sha256,
            source_sha256,
            hash_available_on_device,
            matched: sizes_match && hashes_match,
        });
    }
    let all_match = files.iter().all(|file| file.matched);
    Ok(VerifyResult {
        files,
        library_version: LIBRARY_VERSION,
        all_match,
    })
}
/// Result of `dump_device_file`.
#[derive(Debug)]
pub struct DumpResult {
/// Device path that was dumped.
pub path: String,
/// Size reported by the device, or `None` when the file is missing.
pub size: Option<usize>,
/// File bytes decoded from the device's hex output, or `None` when the file
/// is missing.
pub content: Option<Vec<u8>>,
/// True when the device reported the file as missing.
pub missing: bool,
}
/// Builds the Python script that prints the size and hex-encoded content of
/// the device file at `path`.
///
/// The script prints `size:<n>` and `hex:<hexdigits>` on success,
/// `size:MISSING` / `hex:MISSING` on `OSError`, and `error:<message>` for any
/// other exception. `parse_dump_output` consumes this output.
fn device_dump_script(path: &str) -> String {
    // Block bodies must be indented or the device rejects the script.
    format!(
        r#"try:
    try:
        import jumperless
        _ygg_rf = jumperless.jfs.open('{path}', 'r')
        try:
            content = jumperless.jfs.read(_ygg_rf, {JFS_READ_MAX_BYTES})
        except TypeError:
            # Older firmware: jfs.read takes no size arg. Fall back; may truncate.
            content = jumperless.jfs.read(_ygg_rf)
        jumperless.jfs.close(_ygg_rf)
        del _ygg_rf
    except (NameError, AttributeError):
        content = fs_read('{path}')
    if isinstance(content, str):
        content = content.encode('utf-8')
    import binascii
    print('size:' + str(len(content)))
    print('hex:' + binascii.hexlify(content).decode())
except OSError:
    print('size:MISSING')
    print('hex:MISSING')
except Exception as e:
    print('error:' + str(e))"#
    )
}
/// Decodes a string of hexadecimal digit pairs into bytes.
///
/// Accepts upper- and lower-case digits. An empty string decodes to an empty
/// vector.
///
/// # Errors
/// Returns a message when the input has an odd number of bytes or when any
/// pair contains a character that is not a hex digit. The offset in the
/// message is the byte offset of the offending pair.
pub(crate) fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, String> {
    let raw = hex.as_bytes();
    if raw.len() % 2 != 0 {
        return Err(format!(
            "hex string has odd length {}; expected even number of nibbles",
            hex.len()
        ));
    }
    // Decode byte-wise rather than slicing the `&str`: slicing panics when a
    // multi-byte character straddles a pair, and `u8::from_str_radix` would
    // accept a leading `+` sign (e.g. "+f") as a valid pair.
    raw.chunks_exact(2)
        .enumerate()
        .map(|(index, pair)| {
            let high = char::from(pair[0]).to_digit(16);
            let low = char::from(pair[1]).to_digit(16);
            match (high, low) {
                // Both nibbles are below 16, so the combined value fits in a u8.
                (Some(high), Some(low)) => Ok((high << 4 | low) as u8),
                _ => Err(format!(
                    "invalid hex pair '{}' at offset {}",
                    String::from_utf8_lossy(pair),
                    index * 2
                )),
            }
        })
        .collect()
}
/// Parses the `size:` / `hex:` / `error:` lines printed by the device dump
/// script.
///
/// Returns `(None, None)` when the size is `MISSING`, otherwise
/// `(Some(size), Some(bytes))` with the decoded content. An empty file
/// (`size:0` with an empty `hex:` line) decodes to an empty byte vector. When
/// a prefix occurs on several lines, the last occurrence wins.
///
/// # Errors
/// Returns the device's message when an `error:` line is present, and a
/// descriptive message when the size is absent or unparseable, when the hex
/// content is absent (or empty for a non-zero size), or when the hex is
/// malformed.
pub(crate) fn parse_dump_output(stdout: &str) -> Result<(Option<usize>, Option<Vec<u8>>), String> {
    let mut size_val: Option<&str> = None;
    let mut hex_val: Option<&str> = None;
    let mut error_msg: Option<String> = None;
    for line in stdout.lines().map(str::trim) {
        if let Some(v) = line.strip_prefix("size:") {
            size_val = Some(v);
        } else if let Some(v) = line.strip_prefix("hex:") {
            hex_val = Some(v);
        } else if let Some(msg) = line.strip_prefix("error:") {
            error_msg = Some(msg.to_string());
        }
    }
    if let Some(msg) = error_msg {
        return Err(msg);
    }
    let size_str = size_val.unwrap_or("");
    if size_str == "MISSING" {
        return Ok((None, None));
    }
    if size_str.is_empty() {
        return Err("device returned no size or error".to_string());
    }
    let size = size_str
        .parse::<usize>()
        .map_err(|_| format!("could not parse size value: '{size_str}'"))?;
    // An empty `hex:` line is the correct encoding of a zero-byte file; it is
    // an error only when the line is absent or the reported size is non-zero.
    let hex_str = match hex_val {
        Some(hex) if !hex.is_empty() || size == 0 => hex,
        _ => return Err("device returned size but no hex content".to_string()),
    };
    let bytes = hex_to_bytes(hex_str)?;
    Ok((Some(size), Some(bytes)))
}
/// Reads the file at `path` from the device and returns its size and content.
///
/// When the device reports the file as missing, the result has
/// `missing == true` and no size or content.
///
/// # Errors
/// Propagates errors from `exec_with_cleanup`, and returns
/// `McpError::Protocol` prefixed with `dump(<path>)` when the device output
/// cannot be parsed.
pub fn dump_device_file<P: Read + Write + ?Sized>(
    port: &mut P,
    path: &str,
) -> Result<DumpResult, McpError> {
    let script = device_dump_script(path);
    let resp = exec_with_cleanup(port, &script, &format!("dump({path})"))?;
    let (size, content) = parse_dump_output(&resp.stdout)
        .map_err(|e| McpError::Protocol(format!("dump({path}): {e}")))?;
    // The parser signals "file missing" by returning neither size nor content.
    let missing = size.is_none() && content.is_none();
    Ok(DumpResult {
        path: path.to_string(),
        size,
        content,
        missing,
    })
}
/// Renders `s` as a Python triple-single-quoted string literal.
///
/// CRLF and bare CR are normalised to LF, and every backslash is doubled.
/// Single quotes pass through unchanged except where they would break the
/// literal: the third quote of any run of three is escaped (so the content
/// never contains `'''`), and a quote in the final position is escaped (so it
/// cannot merge with the closing delimiter).
fn python_repr(s: &str) -> String {
    let normalized = s.replace("\r\n", "\n").replace('\r', "\n");
    let mut literal = String::with_capacity(normalized.len() + 8);
    literal.push_str("'''");
    // Number of consecutive unescaped quotes emitted immediately before the
    // current character.
    let mut quote_run = 0usize;
    let mut chars = normalized.chars().peekable();
    while let Some(ch) = chars.next() {
        match ch {
            '\\' => {
                literal.push_str("\\\\");
                quote_run = 0;
            }
            '\'' => {
                let is_last = chars.peek().is_none();
                if quote_run == 2 || is_last {
                    literal.push_str("\\'");
                    quote_run = 0;
                } else {
                    literal.push('\'');
                    quote_run += 1;
                }
            }
            other => {
                literal.push(other);
                quote_run = 0;
            }
        }
    }
    literal.push_str("'''");
    literal
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::VecDeque;
use std::io::{self, Read, Write};
struct MockPort {
read_data: VecDeque<u8>,
pub write_data: Vec<u8>,
}
impl MockPort {
fn with_responses(responses: &[&[u8]]) -> Self {
let mut buf = Vec::new();
for r in responses {
buf.extend_from_slice(r);
}
MockPort {
read_data: VecDeque::from(buf),
write_data: Vec::new(),
}
}
fn ok_frame() -> Vec<u8> {
b"OK\x04\x04>".to_vec()
}
fn ok_with_stdout(line: &str) -> Vec<u8> {
let mut v = b"OK".to_vec();
v.extend_from_slice(line.as_bytes());
v.push(b'\n');
v.extend_from_slice(b"\x04\x04>");
v
}
}
impl Read for MockPort {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let n = buf.len().min(self.read_data.len());
if n == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"MockPort: no more scripted bytes",
));
}
for (dst, src) in buf[..n].iter_mut().zip(self.read_data.drain(..n)) {
*dst = src;
}
Ok(n)
}
}
impl Write for MockPort {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.write_data.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
#[test]
fn library_version_is_semver_2_with_build_metadata() {
assert!(LIBRARY_VERSION.contains('+'));
let (semver, _build) = LIBRARY_VERSION.split_once('+').unwrap();
assert_eq!(semver.split('.').count(), 3);
for part in semver.split('.') {
assert!(
part.parse::<u32>().is_ok(),
"semver part must be numeric: {part}"
);
}
}
#[test]
#[allow(clippy::assertions_on_constants)] fn library_size_is_under_limit() {
assert!(
LIBRARY_SIZE_BYTES_APPROX < 32_000,
"Library size {LIBRARY_SIZE_BYTES_APPROX} exceeds 32KB sanity budget"
);
}
#[test]
fn font_py_and_effects_py_are_non_empty() {
assert!(!FONT_PY.is_empty(), "FONT_PY must not be empty");
assert!(!EFFECTS_PY.is_empty(), "EFFECTS_PY must not be empty");
}
#[test]
fn library_version_matches_font_py_header() {
let (semver, _) = LIBRARY_VERSION.split_once('+').unwrap();
assert!(
FONT_PY.contains(semver),
"font.py header should contain the library version semver {semver}"
);
}
#[test]
fn library_version_matches_effects_py_header() {
let (semver, _) = LIBRARY_VERSION.split_once('+').unwrap();
assert!(
EFFECTS_PY.contains(semver),
"effects.py header should contain the library version semver {semver}"
);
}
#[test]
fn python_repr_roundtrip_simple() {
let s = "hello\nworld";
let r = python_repr(s);
assert!(r.starts_with("'''"), "repr must start with '''");
assert!(r.ends_with("'''"), "repr must end with '''");
assert!(r.contains("hello\nworld"));
}
#[test]
fn python_repr_does_not_escape_single_quotes() {
let s = "it's a test";
let r = python_repr(s);
assert!(
r.contains("it's"),
"single quotes must pass through unchanged inside triple-quoted string; got: {r}"
);
assert!(
!r.contains("\\'"),
"python_repr must not escape single quotes (triple-quote context is safe); got: {r}"
);
}
#[test]
fn python_repr_escapes_backslashes() {
let s = "path\\to\\file";
let r = python_repr(s);
assert!(r.contains("\\\\"), "backslashes must be doubled");
}
#[test]
fn python_repr_normalizes_crlf_to_lf() {
let s = "line1\r\nline2\r\nline3";
let r = python_repr(s);
assert!(!r.contains('\r'), "CR (0x0D) must be stripped; got: {r:?}");
assert!(
r.contains("line1\nline2\nline3"),
"lines must be joined with LF only; got: {r:?}"
);
}
#[test]
fn python_repr_normalizes_bare_cr_to_lf() {
let s = "a\rb\rc";
let r = python_repr(s);
assert!(!r.contains('\r'));
assert!(r.contains("a\nb\nc"));
}
#[test]
fn python_repr_of_embedded_files_is_lf_only() {
let font_repr = python_repr(FONT_PY);
assert!(
!font_repr.contains('\r'),
"FONT_PY repr must be LF-only after normalization"
);
let effects_repr = python_repr(EFFECTS_PY);
assert!(
!effects_repr.contains('\r'),
"EFFECTS_PY repr must be LF-only after normalization"
);
}
#[test]
fn python_repr_font_py_does_not_contain_unescaped_triple_quotes() {
let r = python_repr(FONT_PY);
let inner = &r[3..r.len() - 3];
assert!(
!inner.contains("'''"),
"font.py repr inner must not contain '''"
);
}
#[test]
fn python_repr_effects_py_does_not_contain_unescaped_triple_quotes() {
let r = python_repr(EFFECTS_PY);
let inner = &r[3..r.len() - 3];
assert!(
!inner.contains("'''"),
"effects.py repr inner must not contain '''"
);
}
#[test]
fn chunk_empty_string_returns_empty_vec() {
let chunks = chunk_at_char_boundaries("", 1024);
assert!(chunks.is_empty(), "empty string should produce no chunks");
}
#[test]
fn chunk_string_shorter_than_limit_returns_one_chunk() {
let s = "hello world";
let chunks = chunk_at_char_boundaries(s, 1024);
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0], s);
}
#[test]
fn chunk_string_exactly_limit_returns_one_chunk() {
let s = "a".repeat(1024);
let chunks = chunk_at_char_boundaries(&s, 1024);
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0], s.as_str());
}
#[test]
fn chunk_string_one_over_limit_returns_two_chunks() {
let s = "a".repeat(1025);
let chunks = chunk_at_char_boundaries(&s, 1024);
assert_eq!(chunks.len(), 2);
assert_eq!(chunks[0].len(), 1024);
assert_eq!(chunks[1].len(), 1);
}
#[test]
fn chunk_preserves_total_content() {
let s = "the quick brown fox jumps over the lazy dog. ".repeat(100);
let chunks = chunk_at_char_boundaries(&s, 1024);
let reassembled: String = chunks.concat();
assert_eq!(
reassembled, s,
"reassembled chunks must equal original string"
);
}
#[test]
fn chunk_all_chunks_within_size_limit() {
let s = "x".repeat(8192);
let chunks = chunk_at_char_boundaries(&s, 1024);
for (i, chunk) in chunks.iter().enumerate() {
assert!(
chunk.len() <= 1024,
"chunk {i} has length {} which exceeds limit 1024",
chunk.len()
);
}
}
#[test]
fn chunk_boundary_apostrophe_is_avoided() {
let s = "abcdefghi'extra";
let chunks = chunk_at_char_boundaries(s, 10);
assert!(chunks.len() >= 2, "should split into 2+ chunks");
for (i, chunk) in chunks.iter().enumerate() {
if i == chunks.len() - 1 {
continue;
}
let last = chunk.as_bytes().last().copied();
assert_ne!(
last,
Some(b'\''),
"chunk {i} ends with `'` which would collide with `'''<chunk>'''` wrapper: {chunk:?}"
);
}
let reassembled: String = chunks.concat();
assert_eq!(reassembled, s);
}
#[test]
fn chunk_boundary_backslash_is_avoided() {
let s = "abcdefghi\\extra";
let chunks = chunk_at_char_boundaries(s, 10);
assert!(chunks.len() >= 2);
for (i, chunk) in chunks.iter().enumerate() {
if i == chunks.len() - 1 {
continue;
}
let last = chunk.as_bytes().last().copied();
assert_ne!(
last,
Some(b'\\'),
"chunk {i} ends with `\\` which would escape the closing quote: {chunk:?}"
);
}
let reassembled: String = chunks.concat();
assert_eq!(reassembled, s);
}
#[test]
fn chunk_boundary_real_world_apostrophe_pattern() {
let s = "Used as ceremony's transition from idle to active state. \
direction 'L2R' or 'R2L' determines sweep direction.";
let target = 16; let chunks = chunk_at_char_boundaries(s, target);
for (i, chunk) in chunks.iter().enumerate() {
if i == chunks.len() - 1 {
continue;
}
let last = chunk.as_bytes().last().copied();
assert_ne!(
last,
Some(b'\''),
"chunk {i} would collide with wrapper: {chunk:?}"
);
}
let reassembled: String = chunks.concat();
assert_eq!(reassembled, s);
}
#[test]
fn chunk_effects_py_all_chunks_within_size_limit() {
let chunks = chunk_at_char_boundaries(EFFECTS_PY, CHUNK_SIZE);
assert!(
chunks.len() >= 2,
"effects.py ({} chars) should require multiple chunks at CHUNK_SIZE={CHUNK_SIZE}",
EFFECTS_PY.len()
);
for (i, chunk) in chunks.iter().enumerate() {
assert!(
chunk.len() <= CHUNK_SIZE,
"effects.py chunk {i} has {} bytes, exceeds CHUNK_SIZE={CHUNK_SIZE}",
chunk.len()
);
}
let reassembled: String = chunks.concat();
assert_eq!(
reassembled, EFFECTS_PY,
"chunked+reassembled effects.py must equal original"
);
}
#[test]
fn chunk_font_py_all_chunks_within_size_limit() {
let chunks = chunk_at_char_boundaries(FONT_PY, CHUNK_SIZE);
for (i, chunk) in chunks.iter().enumerate() {
assert!(
chunk.len() <= CHUNK_SIZE,
"font.py chunk {i} has {} bytes, exceeds CHUNK_SIZE={CHUNK_SIZE}",
chunk.len()
);
}
let reassembled: String = chunks.concat();
assert_eq!(
reassembled, FONT_PY,
"chunked+reassembled font.py must equal original"
);
}
#[test]
fn chunk_utf8_boundary_safety() {
let euro_seq: String = "€".repeat(341); let s = format!("{euro_seq}x"); let chunks = chunk_at_char_boundaries(&s, 1022);
for (i, chunk) in chunks.iter().enumerate() {
assert!(
chunk.len() <= 1022,
"UTF-8 boundary chunk {i} has {} bytes, exceeds limit 1022",
chunk.len()
);
}
let reassembled: String = chunks.concat();
assert_eq!(
reassembled, s,
"UTF-8 chunked reassembly must equal original"
);
}
#[test]
fn exec_with_cleanup_returns_ok_on_success() {
let mut port = MockPort::with_responses(&[&MockPort::ok_frame()]);
let resp = exec_with_cleanup(&mut port, "pass", "test_op").unwrap();
assert!(!resp.is_error());
}
#[test]
fn exec_with_cleanup_sends_ctrl_c_on_error() {
let mut port = MockPort::with_responses(&[]);
let err = exec_with_cleanup(&mut port, "fail_op", "test_cleanup_op").unwrap_err();
assert!(
matches!(err, McpError::Protocol(_)),
"expected McpError::Protocol, got: {err:?}"
);
assert!(
port.write_data.contains(&0x03),
"exec_with_cleanup must send Ctrl-C (0x03) on error; write_data={:?}",
port.write_data
);
}
#[test]
fn exec_with_cleanup_wraps_error_with_op_name() {
let mut port = MockPort::with_responses(&[]);
let err = exec_with_cleanup(&mut port, "bad_code", "my_op_label").unwrap_err();
match &err {
McpError::Protocol(msg) => {
assert!(
msg.starts_with("my_op_label:"),
"error message should be prefixed with op_name; got: {msg}"
);
}
other => panic!("expected McpError::Protocol, got: {other:?}"),
}
}
#[test]
fn fs_write_chunked_sends_assembly_sequence() {
let content: String = "x".repeat(2100);
let path = "/test/file.py";
let n_chunks = chunk_at_char_boundaries(&content, CHUNK_SIZE).len();
assert_eq!(
n_chunks, 3,
"sanity: 2100-char content should split into 3 chunks at CHUNK_SIZE={CHUNK_SIZE}"
);
let frames: Vec<Vec<u8>> = (0..(n_chunks + 3)).map(|_| MockPort::ok_frame()).collect();
let frame_refs: Vec<&[u8]> = frames.iter().map(|v| v.as_slice()).collect();
let mut port = MockPort::with_responses(&frame_refs);
fs_write_chunked(&mut port, path, &content).unwrap();
let written = String::from_utf8_lossy(&port.write_data);
assert!(
written.contains(&format!("_ygg_f = open('{path}', 'w')")),
"should contain open call; written (truncated)={:.200}",
&written
);
assert!(
written.contains("_ygg_f.write("),
"should contain _ygg_f.write() calls for multi-chunk content"
);
assert!(
written.contains("_ygg_f.close()"),
"should contain _ygg_f.close() call"
);
assert!(
written.contains("del _ygg_f"),
"should contain cleanup del _ygg_f"
);
assert!(
!written.contains("_ygg_f.flush()"),
"must NOT call _ygg_f.flush() — jfs.flush is not in JFS globals table \
(modjumperless.c:4941-4972). Phase C removes the silent no-op flush."
);
assert!(
!written.contains("_ygg_assembly"),
"must not use old _ygg_assembly pattern"
);
assert!(
!written.contains(&format!("fs_write('{path}',")),
"must not call fs_write() directly — replaced by open/write/close"
);
}
/// Single-chunk variant: content that yields exactly one chunk must produce
/// exactly one `_ygg_f.write(` call, together with open / close / del and
/// without `flush()` or the `_ygg_assembly` pattern.
#[test]
fn fs_write_chunked_single_chunk_no_appends() {
    let path = "/test/short.py";
    let content = "short content here";
    let n_chunks = chunk_at_char_boundaries(content, CHUNK_SIZE).len();
    assert_eq!(n_chunks, 1, "short content should produce exactly 1 chunk");

    // Four OK frames are queued for this write.
    let frames: Vec<Vec<u8>> = vec![
        MockPort::ok_frame(),
        MockPort::ok_frame(),
        MockPort::ok_frame(),
        MockPort::ok_frame(),
    ];
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);
    fs_write_chunked(&mut port, path, content).unwrap();

    let written = String::from_utf8_lossy(&port.write_data);
    let has = |needle: &str| written.contains(needle);

    assert!(
        has(&format!("_ygg_f = open('{path}', 'w')")),
        "single-chunk: should open with _ygg_f"
    );
    let write_count = written.matches("_ygg_f.write(").count();
    assert_eq!(
        write_count, 1,
        "single-chunk: must have exactly one _ygg_f.write() call, got {write_count}"
    );
    assert!(
        has("_ygg_f.close()"),
        "single-chunk: should close with _ygg_f.close()"
    );
    assert!(
        !has("_ygg_f.flush()"),
        "single-chunk: must NOT call _ygg_f.flush() — jfs.flush is not bound"
    );
    assert!(
        has("del _ygg_f"),
        "single-chunk: should cleanup del _ygg_f"
    );
    assert!(
        !has("_ygg_assembly"),
        "single-chunk: must not use old _ygg_assembly pattern"
    );
}
/// Writes a 5703-character payload and checks that the number of
/// `_ygg_f.write(` calls equals the chunk count, alongside the usual
/// open / close presence checks and flush / `_ygg_assembly` absence checks.
#[test]
fn install_chunked_succeeds_for_effects_py_sized_content() {
    let path = "/python_scripts/lib/jumperless_mcp/effects.py";
    let big_content = "x".repeat(5703);
    let n_chunks = chunk_at_char_boundaries(&big_content, CHUNK_SIZE).len();
    assert!(
        n_chunks >= 2,
        "5703-char string should require ≥2 chunks at CHUNK_SIZE={CHUNK_SIZE}"
    );

    // Queue n_chunks + 3 OK frames for the mock to replay.
    let mut frames: Vec<Vec<u8>> = Vec::with_capacity(n_chunks + 3);
    frames.resize_with(n_chunks + 3, || MockPort::ok_frame());
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);
    fs_write_chunked(&mut port, path, &big_content).unwrap();

    let written = String::from_utf8_lossy(&port.write_data);
    let has = |needle: &str| written.contains(needle);

    assert!(
        has(&format!("_ygg_f = open('{path}', 'w')")),
        "5KB+ write must open file with _ygg_f"
    );
    assert!(
        has("_ygg_f.write("),
        "5KB+ write must use _ygg_f.write() calls"
    );
    assert!(
        !has("_ygg_f.flush()"),
        "5KB+ write must NOT call _ygg_f.flush() — jfs.flush is not in JFS globals"
    );
    assert!(
        has("_ygg_f.close()"),
        "5KB+ write must close with _ygg_f.close()"
    );
    assert!(
        !has("_ygg_assembly"),
        "5KB+ write must not use old _ygg_assembly pattern"
    );

    // One write statement per chunk.
    let write_count = written.matches("_ygg_f.write(").count();
    assert_eq!(
        write_count, n_chunks,
        "write call count ({write_count}) must equal chunk count ({n_chunks})"
    );
}
/// `check_installation` with a matching version reply followed by four "1"
/// replies must report installed, not partial, up to date, and four files
/// present.
#[test]
fn check_installation_all_present_and_current() {
    // Scripted stdout, in order: the current version string, then four "1" replies.
    let frames: Vec<Vec<u8>> = [LIBRARY_VERSION, "1", "1", "1", "1"]
        .iter()
        .map(|&stdout| MockPort::ok_with_stdout(stdout))
        .collect();
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    let status = check_installation(&mut port).unwrap();

    assert!(status.installed);
    assert!(!status.partial, "all files present → partial=false");
    assert!(status.up_to_date);
    assert_eq!(status.installed_version.as_deref(), Some(LIBRARY_VERSION));
    assert_eq!(status.current_version, LIBRARY_VERSION);
    assert_eq!(status.files_missing.len(), 0);
    assert_eq!(status.files_present.len(), 4);
}
/// `check_installation` with an empty version reply followed by four "0"
/// replies must report not installed, not up to date, no installed version,
/// and four missing files.
#[test]
fn check_installation_not_installed_all_missing() {
    // Scripted stdout, in order: an empty reply, then four "0" replies.
    let frames: Vec<Vec<u8>> = ["", "0", "0", "0", "0"]
        .iter()
        .map(|&stdout| MockPort::ok_with_stdout(stdout))
        .collect();
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    let status = check_installation(&mut port).unwrap();

    assert!(!status.installed);
    assert!(!status.up_to_date);
    assert!(status.installed_version.is_none());
    assert_eq!(status.files_present.len(), 0);
    assert_eq!(status.files_missing.len(), 4);
}
/// `check_installation` with an older version reply and four "1" replies
/// must report installed and not partial, but not up to date, and must
/// surface the older version string.
#[test]
fn check_installation_stale_version() {
    let stale_version = "0.1.0+20260510";
    // Scripted stdout, in order: the stale version string, then four "1" replies.
    let frames: Vec<Vec<u8>> = [stale_version, "1", "1", "1", "1"]
        .iter()
        .map(|&stdout| MockPort::ok_with_stdout(stdout))
        .collect();
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    let status = check_installation(&mut port).unwrap();

    assert!(
        status.installed,
        "stale version with all files present → installed=true"
    );
    assert!(!status.partial, "all files present → partial=false");
    assert!(!status.up_to_date);
    assert_eq!(status.installed_version.as_deref(), Some(stale_version));
}
/// `check_installation` with a current version reply, one "1" reply and
/// three "0" replies must report `installed == false`, `partial == true`,
/// one file present and three missing.
#[test]
fn check_installation_partial_install_is_not_installed() {
    // Scripted stdout, in order: current version, one "1", then three "0" replies.
    let frames: Vec<Vec<u8>> = [LIBRARY_VERSION, "1", "0", "0", "0"]
        .iter()
        .map(|&stdout| MockPort::ok_with_stdout(stdout))
        .collect();
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    let status = check_installation(&mut port).unwrap();

    assert!(
        !status.installed,
        "partial install must NOT report installed=true"
    );
    assert!(status.partial, "partial install must set partial=true");
    assert_eq!(status.files_present.len(), 1);
    assert_eq!(status.files_missing.len(), 3);
}
/// `install_if_needed` must return `false` when the scripted replies are the
/// current version string followed by four "1" replies.
#[test]
fn install_if_needed_skips_when_up_to_date() {
    // Only five frames are queued: the version string and four "1" replies.
    let frames: Vec<Vec<u8>> = [LIBRARY_VERSION, "1", "1", "1", "1"]
        .iter()
        .map(|&stdout| MockPort::ok_with_stdout(stdout))
        .collect();
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    let installed = install_if_needed(&mut port).unwrap();
    assert!(!installed, "should return false when already up-to-date");
}
/// `install_if_needed` must return `true` when the scripted replies begin
/// with an empty version reply and four "0" replies, followed by enough OK
/// frames for the three embedded Python sources.
#[test]
fn install_if_needed_installs_when_missing() {
    let chunk_count = |source: &str| chunk_at_char_boundaries(source, CHUNK_SIZE).len();
    // (chunk count, stdout reply queued after that file's OK frames), in the
    // order font / effects / init.
    let per_file = [
        (chunk_count(FONT_PY), "1000"),
        (chunk_count(EFFECTS_PY), "5000"),
        (chunk_count(INIT_PY), "2000"),
    ];

    // Leading replies: empty stdout, then four "0" replies.
    let mut responses: Vec<Vec<u8>> = ["", "0", "0", "0", "0"]
        .iter()
        .map(|&stdout| MockPort::ok_with_stdout(stdout))
        .collect();
    // One bare OK frame precedes the per-file frames.
    responses.push(MockPort::ok_frame());
    for &(n_chunks, reply) in &per_file {
        responses.extend((0..n_chunks + 3).map(|_| MockPort::ok_frame()));
        responses.push(MockPort::ok_with_stdout(reply));
    }
    // One bare OK frame closes the sequence.
    responses.push(MockPort::ok_frame());

    let frame_refs: Vec<&[u8]> = responses.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);
    let installed = install_if_needed(&mut port).unwrap();
    assert!(installed, "should return true when install was performed");
}
/// `uninstall` with four "OK" replies and a final OK frame must report
/// 4 attempted / 4 actually attempted / 4 removed with no errors, and the
/// bytes sent must mention `jumperless.jfs.remove`.
#[test]
fn uninstall_returns_summary_all_ok() {
    // Four "OK" stdout replies followed by one bare OK frame.
    let mut frames: Vec<Vec<u8>> = ["OK", "OK", "OK", "OK"]
        .iter()
        .map(|&token| MockPort::ok_with_stdout(token))
        .collect();
    frames.push(MockPort::ok_frame());
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    let result = uninstall(&mut port).unwrap();
    assert_eq!(result.attempted, 4);
    assert_eq!(result.attempted_actual, 4);
    assert_eq!(result.removed, 4);
    assert!(
        result.errors.is_empty(),
        "no errors expected; got: {:?}",
        result.errors
    );

    let written = String::from_utf8_lossy(&port.write_data);
    assert!(
        written.contains("jumperless.jfs.remove"),
        "uninstall must call jumperless.jfs.remove; got: {written}"
    );
    // Fails only when BOTH "fs_write" and "''" occur in the sent bytes.
    let uses_version_clearing = written.contains("fs_write") && written.contains("''");
    assert!(
        !uses_version_clearing,
        "uninstall must NOT use VERSION-clearing fallback (Phase C removed it)"
    );
}
/// The bytes `uninstall` sends to the port must not contain the substring
/// "UNBOUND".
#[test]
fn uninstall_no_unbound_token_in_script() {
    // Four "OK" stdout replies followed by one bare OK frame.
    let mut frames: Vec<Vec<u8>> = ["OK", "OK", "OK", "OK"]
        .iter()
        .map(|&token| MockPort::ok_with_stdout(token))
        .collect();
    frames.push(MockPort::ok_frame());
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    uninstall(&mut port).unwrap();

    let written = String::from_utf8_lossy(&port.write_data);
    let mentions_unbound = written.contains("UNBOUND");
    assert!(
        !mentions_unbound,
        "uninstall must NOT classify UNBOUND (Phase C: jfs.remove is confirmed bound); got: {written}"
    );
}
/// "ABSENT" replies must count towards `removed` just like "OK" replies and
/// must not produce error entries.
#[test]
fn uninstall_counts_absent_file_as_removed() {
    // Replies in order: ABSENT, OK, OK, ABSENT, then one bare OK frame.
    let mut frames: Vec<Vec<u8>> = ["ABSENT", "OK", "OK", "ABSENT"]
        .iter()
        .map(|&token| MockPort::ok_with_stdout(token))
        .collect();
    frames.push(MockPort::ok_frame());
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    let result = uninstall(&mut port).unwrap();

    assert_eq!(result.removed, 4, "ABSENT + OK + OK + ABSENT = 4 removes");
    assert_eq!(result.attempted_actual, 4, "all 4 files attempted");
    assert!(result.errors.is_empty());
}
/// An "ERR:..." reply for the first file must not count as a removal, must
/// not stop the remaining three attempts, and must be recorded as exactly
/// one error entry containing "ERR:IOError".
#[test]
fn uninstall_captures_unexpected_errors() {
    // Replies in order: one ERR token, three OK tokens, then one bare OK frame.
    let mut frames: Vec<Vec<u8>> = ["ERR:IOError something weird", "OK", "OK", "OK"]
        .iter()
        .map(|&token| MockPort::ok_with_stdout(token))
        .collect();
    frames.push(MockPort::ok_frame());
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    let result = uninstall(&mut port).unwrap();

    assert_eq!(result.removed, 3, "OK+OK+OK = 3 removes; ERR doesn't count");
    assert_eq!(
        result.attempted_actual, 4,
        "ERR doesn't break the loop; all 4 attempted"
    );
    assert_eq!(
        result.errors.len(),
        1,
        "unexpected exception → 1 error entry"
    );
    assert!(result.errors[0].contains("ERR:IOError"));
}
/// Guards the path constants: each of the four on-device file paths must
/// begin with `LIBRARY_ROOT`.
#[test]
fn library_paths_start_with_library_root() {
// The asserts carry no custom message, so a failure reports the expression
// text of the first path that violates the prefix rule.
assert!(FONT_PATH.starts_with(LIBRARY_ROOT));
assert!(EFFECTS_PATH.starts_with(LIBRARY_ROOT));
assert!(INIT_PATH.starts_with(LIBRARY_ROOT));
assert!(VERSION_PATH.starts_with(LIBRARY_ROOT));
}
/// `LIBRARY_ROOT` and every on-device file path must begin with
/// `PYTHON_SCRIPTS_LIB`.
#[test]
fn install_paths_are_under_python_scripts_lib() {
    // The root has its own message (it mentions sys.path), so it is checked apart.
    assert!(
        LIBRARY_ROOT.starts_with(PYTHON_SCRIPTS_LIB),
        "LIBRARY_ROOT must be under /python_scripts/lib/ (sys.path); got: {LIBRARY_ROOT}"
    );

    // The four file paths share one message shape: constant name, then value.
    let file_paths = [
        ("FONT_PATH", FONT_PATH),
        ("EFFECTS_PATH", EFFECTS_PATH),
        ("INIT_PATH", INIT_PATH),
        ("VERSION_PATH", VERSION_PATH),
    ];
    for &(name, path) in &file_paths {
        assert!(
            path.starts_with(PYTHON_SCRIPTS_LIB),
            "{name} must be under /python_scripts/lib/; got: {path}"
        );
    }
}
/// The embedded `__init__.py` source must contain the `LIBRARY_VERSION`
/// string and the text `__version__`.
#[test]
fn init_py_contains_version_string() {
    let has_library_version = INIT_PY.contains(LIBRARY_VERSION);
    assert!(
        has_library_version,
        "INIT_PY must contain LIBRARY_VERSION ({LIBRARY_VERSION}); got INIT_PY start: {:.100}",
        INIT_PY
    );

    let has_dunder_version = INIT_PY.contains("__version__");
    assert!(has_dunder_version, "INIT_PY must define __version__");
}
/// The embedded `__init__.py` source must contain `def` lines for both
/// ceremony entry points.
#[test]
fn init_py_contains_ceremony_functions() {
    for &name in &["_ceremony_connect", "_ceremony_disconnect"] {
        // Prefix match only: the needle is "def <name>" without the parenthesis.
        let needle = format!("def {name}");
        assert!(INIT_PY.contains(&needle), "INIT_PY must define {name}()");
    }
}
/// The embedded `font.py` source must contain the text `FONT = {`.
#[test]
fn font_py_contains_font_dict() {
    let dict_opener = "FONT = {";
    let defines_font = FONT_PY.contains(dict_opener);
    assert!(defines_font, "font.py must define FONT dict");
}
/// The embedded `effects.py` source must contain a `def <name>(` line for
/// each of the four effect functions listed below.
#[test]
fn effects_py_contains_key_functions() {
    let expected_functions = ["marquee", "marquee_scroll", "corner_frame", "wipe_edges"];
    for &name in &expected_functions {
        // The trailing "(" keeps "marquee" from matching "marquee_scroll".
        let needle = format!("def {name}(");
        assert!(
            EFFECTS_PY.contains(&needle),
            "effects.py must define {name}"
        );
    }
}
/// The embedded `effects.py` source must contain the exact assignment
/// `NASA_ORANGE = 0xFC4C00`.
#[test]
fn effects_py_defines_nasa_orange() {
    let assignment = "NASA_ORANGE = 0xFC4C00";
    let found = EFFECTS_PY.find(assignment).is_some();
    assert!(found, "effects.py must define NASA_ORANGE constant");
}
/// The embedded `font.py` source must contain a double-quoted key for every
/// distinct character of "MCP CONNECTED" (including the space).
#[test]
fn font_py_covers_mcp_connected_chars() {
    // Distinct characters, checked in this order: M C P O N E T D and space.
    for glyph in "MCPONETD ".chars() {
        // Keys are searched in their quoted form, e.g. "M".
        let key = format!("\"{glyph}\"");
        assert!(
            FONT_PY.contains(&key),
            "font.py FONT dict must include key {} (needed for 'MCP CONNECTED')",
            key
        );
    }
}
/// The embedded `font.py` source must contain double-quoted keys for "I"
/// and "S", the two characters this test checks for "MCP DISCONNECTED".
#[test]
fn font_py_covers_mcp_disconnected_chars() {
    for glyph in "IS".chars() {
        // Keys are searched in their quoted form, e.g. "I".
        let key = format!("\"{glyph}\"");
        assert!(
            FONT_PY.contains(&key),
            "font.py FONT dict must include key {} (needed for 'MCP DISCONNECTED')",
            key
        );
    }
}
/// A reply frame that carries stderr text must make `exec_with_cleanup`
/// return `McpError::Protocol` mentioning both "device-side exception" and
/// the stderr content, and a 0x03 byte must have been written to the port.
#[test]
fn exec_with_cleanup_rejects_device_side_exception() {
    let stderr_msg = b"MemoryError: memory allocation failed";
    // Frame layout: "OK", 0x04, stderr bytes, 0x04, '>'.
    let frame: Vec<u8> = [&b"OK\x04"[..], &stderr_msg[..], &b"\x04>"[..]].concat();
    let mut port = MockPort::with_responses(&[frame.as_slice()]);

    let result = exec_with_cleanup(
        &mut port,
        "big_concat_op()",
        "fs_write_chunked(x): chunk 1 (append)",
    );
    assert!(
        result.is_err(),
        "exec_with_cleanup must return Err when resp.is_error() is true; \
         got Ok (device exception silently absorbed)"
    );

    let msg = match result.unwrap_err() {
        McpError::Protocol(msg) => msg,
        other => panic!("expected McpError::Protocol, got: {other:?}"),
    };
    assert!(
        msg.contains("device-side exception"),
        "error must mention device-side exception; got: {msg}"
    );
    assert!(
        msg.contains("MemoryError"),
        "error must include the device stderr content; got: {msg}"
    );

    let sent_ctrl_c = port.write_data.contains(&0x03);
    assert!(
        sent_ctrl_c,
        "exec_with_cleanup must send Ctrl-C after device-side exception; write_data={:?}",
        port.write_data
    );
}
/// When the fifth reply frame carries a `NameError` on stderr,
/// `fs_write_chunked` must still return `Ok`.
#[test]
fn fs_write_chunked_del_cleanup_device_exception_does_not_propagate() {
    let path = "/test/file.py";
    let content = "short content";

    // Frame layout: "OK", 0x04, stderr bytes, 0x04, '>'.
    let del_exception_frame: Vec<u8> = [
        &b"OK\x04"[..],
        &b"NameError: name '_ygg_f' is not defined"[..],
        &b"\x04>"[..],
    ]
    .concat();

    // Four clean OK frames, then the frame carrying the NameError.
    let mut frames: Vec<Vec<u8>> = (0..4).map(|_| MockPort::ok_frame()).collect();
    frames.push(del_exception_frame);
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    let result = fs_write_chunked(&mut port, path, content);
    assert!(
        result.is_ok(),
        "del _ygg_f device exception must not propagate from fs_write_chunked; got: {result:?}"
    );
}
/// A reply whose stdout is the version string must yield
/// `Ok(Some(version))`.
#[test]
fn read_installed_version_returns_some_when_present() {
    let frame = MockPort::ok_with_stdout(LIBRARY_VERSION);
    let mut port = MockPort::with_responses(&[frame.as_slice()]);

    let result = read_installed_version(&mut port);
    assert!(result.is_ok(), "should succeed; got: {result:?}");

    let expected = Some(String::from(LIBRARY_VERSION));
    assert_eq!(result.unwrap(), expected);
}
/// A reply with empty stdout must yield `Ok(None)`.
#[test]
fn read_installed_version_returns_none_when_missing() {
    let empty_reply = MockPort::ok_with_stdout("");
    let mut port = MockPort::with_responses(&[empty_reply.as_slice()]);

    let result = read_installed_version(&mut port);
    assert!(
        result.is_ok(),
        "empty stdout = missing file; should be Ok(None)"
    );

    let version = result.unwrap();
    assert_eq!(version, None);
}
/// With no scripted responses at all, `read_installed_version` must return
/// `Err` rather than `Ok(None)`.
#[test]
fn read_installed_version_returns_err_on_protocol_failure() {
    // Empty response script: the mock has nothing to replay.
    let no_responses: &[&[u8]] = &[];
    let mut port = MockPort::with_responses(no_responses);

    let result = read_installed_version(&mut port);
    let failed = result.is_err();
    assert!(
        failed,
        "protocol-level failure must be Err, not Ok(None); got: {result:?}"
    );
}
/// `reinstall` must report `LIBRARY_VERSION` as the installed version and
/// carry a `pre_uninstall` summary showing four removals and no errors.
#[test]
fn reinstall_result_carries_pre_uninstall_info() {
    let chunk_count = |source: &str| chunk_at_char_boundaries(source, CHUNK_SIZE).len();
    // (chunk count, stdout reply queued after that file's OK frames), in the
    // order font / effects / init.
    let per_file = [
        (chunk_count(FONT_PY), "1000"),
        (chunk_count(EFFECTS_PY), "5000"),
        (chunk_count(INIT_PY), "2000"),
    ];

    // First group: four "OK" replies and one bare OK frame.
    let mut responses: Vec<Vec<u8>> = ["OK", "OK", "OK", "OK"]
        .iter()
        .map(|&token| MockPort::ok_with_stdout(token))
        .collect();
    responses.push(MockPort::ok_frame());
    // Second group: an empty reply and four "0" replies.
    responses.extend(
        ["", "0", "0", "0", "0"]
            .iter()
            .map(|&stdout| MockPort::ok_with_stdout(stdout)),
    );
    // Third group: one bare OK frame, the per-file frames, one closing OK frame.
    responses.push(MockPort::ok_frame());
    for &(n_chunks, reply) in &per_file {
        responses.extend((0..n_chunks + 3).map(|_| MockPort::ok_frame()));
        responses.push(MockPort::ok_with_stdout(reply));
    }
    responses.push(MockPort::ok_frame());

    let frame_refs: Vec<&[u8]> = responses.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);
    let result = reinstall(&mut port).unwrap();

    assert_eq!(result.installed_version, LIBRARY_VERSION);
    let pre = result.pre_uninstall.expect("pre_uninstall should be Some");
    assert_eq!(
        pre.removed, 4,
        "all 4 files should be removed in pre-uninstall"
    );
    assert!(pre.errors.is_empty(), "clean uninstall has no errors");
}
/// With replies OK, OK, ABSENT, OK, `uninstall` must report `attempted`,
/// `attempted_actual` and `removed` all equal to 4.
#[test]
fn uninstall_attempted_actual_reflects_actual_iterations() {
    // Replies in order: OK, OK, ABSENT, OK, then one bare OK frame.
    let mut frames: Vec<Vec<u8>> = ["OK", "OK", "ABSENT", "OK"]
        .iter()
        .map(|&token| MockPort::ok_with_stdout(token))
        .collect();
    frames.push(MockPort::ok_frame());
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    let summary = uninstall(&mut port).unwrap();

    assert_eq!(summary.attempted, 4, "attempted = intent (4 files)");
    assert_eq!(summary.attempted_actual, 4, "all 4 files attempted");
    assert_eq!(summary.removed, 4, "OK+OK+ABSENT+OK = 4 removes");
}
/// A reply frame that carries stderr text must make
/// `read_installed_version` return `McpError::Protocol` mentioning both
/// "device-side exception" and the stderr content.
#[test]
fn read_installed_version_returns_err_on_device_side_exception() {
    let stderr_msg = b"MemoryError: memory allocation failed, allocating 16 bytes";
    // Frame layout: "OK", 0x04, stderr bytes, 0x04, '>'.
    let frame: Vec<u8> = [&b"OK\x04"[..], &stderr_msg[..], &b"\x04>"[..]].concat();
    let mut port = MockPort::with_responses(&[frame.as_slice()]);

    let result = read_installed_version(&mut port);
    assert!(
        result.is_err(),
        "read_installed_version must return Err when resp.is_error()==true; \
         got Ok (device exception silently treated as file-missing)"
    );

    let msg = match result.unwrap_err() {
        McpError::Protocol(msg) => msg,
        other => panic!("expected McpError::Protocol, got: {other:?}"),
    };
    assert!(
        msg.contains("device-side exception"),
        "error must contain 'device-side exception'; got: {msg}"
    );
    assert!(
        msg.contains("MemoryError"),
        "error must include device stderr content; got: {msg}"
    );
}
/// When the second per-file reply carries stderr text, `uninstall` must
/// still attempt all four files, count three removals, and record an error
/// entry containing "device-side exception" and the stderr content.
#[test]
fn uninstall_per_file_device_exception_surfaces_in_errors() {
    // Frame layout: "OK", 0x04, stderr bytes, 0x04, '>'.
    let exception_frame: Vec<u8> = [
        &b"OK\x04"[..],
        &b"MemoryError: out of memory printing token"[..],
        &b"\x04>"[..],
    ]
    .concat();

    // Replies in order: OK, exception, OK, OK, then one bare OK frame.
    let mut frames: Vec<Vec<u8>> = Vec::with_capacity(5);
    frames.push(MockPort::ok_with_stdout("OK"));
    frames.push(exception_frame);
    frames.push(MockPort::ok_with_stdout("OK"));
    frames.push(MockPort::ok_with_stdout("OK"));
    frames.push(MockPort::ok_frame());
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);

    let result = uninstall(&mut port).unwrap();

    assert_eq!(
        result.removed, 3,
        "font.py + __init__.py + VERSION should be removed; got {}",
        result.removed
    );
    assert!(
        !result.errors.is_empty(),
        "device exception must surface in errors; got empty errors"
    );
    let exception_reported = result
        .errors
        .iter()
        .any(|entry| entry.contains("device-side exception") && entry.contains("MemoryError"));
    assert!(
        exception_reported,
        "errors must contain 'device-side exception' + stderr content; got: {:?}",
        result.errors
    );
    assert_eq!(
        result.attempted_actual, 4,
        "exception doesn't break loop; all 4 attempted"
    );
}
/// `CHUNK_SIZE` plus the length of the append-statement wrapper plus an
/// 8-byte safety margin must not exceed 1024 bytes.
#[test]
fn chunk_size_payload_fits_in_repl_buffer() {
    // NOTE(review): this wrapper text uses the `_ygg_assembly` form, while
    // other tests in this module assert that `_ygg_assembly` is never sent.
    // Confirm whether the budget should be measured against the current
    // write statement instead.
    let wrapper_text = "_ygg_assembly = _ygg_assembly + ''''''";
    let append_wrapper = wrapper_text.len();
    let max_payload = CHUNK_SIZE + append_wrapper;
    let with_margin = max_payload + 8;
    assert!(
        with_margin <= 1024,
        "CHUNK_SIZE={CHUNK_SIZE} + append wrapper ({append_wrapper}) + 8 safety = {} \
         must fit in 1024-byte REPL buffer",
        with_margin
    );
}
/// `LIBRARY_VERSION` must contain a '+' character.
#[test]
fn library_version_contains_build_metadata_marker() {
    let has_plus = LIBRARY_VERSION.chars().any(|c| c == '+');
    assert!(
        has_plus,
        "LIBRARY_VERSION must contain '+' for SemVer 2.0 build metadata; got: {LIBRARY_VERSION}"
    );
}
/// A `size:` line plus a hex `sha256:` line must parse to the size, the
/// digest string, and `hash_available == true`.
#[test]
fn parse_verify_size_and_hash_present() {
    let digest_hex = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
    let stdout = format!("size:5372\nsha256:{digest_hex}\n");

    let (file_size, digest, has_hash) = parse_device_verify_output(&stdout).unwrap();

    assert_eq!(file_size, Some(5372));
    assert_eq!(digest.as_deref(), Some(digest_hex));
    assert!(
        has_hash,
        "hash_available should be true when a hex sha256 is returned"
    );
}
/// The `NO_HASHLIB` sentinel must parse to a present size, no digest, and
/// `hash_available == false`.
#[test]
fn parse_verify_no_hashlib_sentinel() {
    let device_output = "size:234\nsha256:NO_HASHLIB\n";

    let (file_size, digest, has_hash) = parse_device_verify_output(device_output).unwrap();

    assert_eq!(file_size, Some(234));
    assert!(
        digest.is_none(),
        "NO_HASHLIB sentinel must produce None device_sha256"
    );
    assert!(
        !has_hash,
        "hash_available must be false when NO_HASHLIB returned"
    );
}
/// `MISSING` on both lines must parse to no size, no digest, and
/// `hash_available == false`.
#[test]
fn parse_verify_missing_file() {
    let device_output = "size:MISSING\nsha256:MISSING\n";

    let (file_size, digest, has_hash) = parse_device_verify_output(device_output).unwrap();

    assert!(file_size.is_none(), "size must be None for MISSING file");
    assert!(digest.is_none(), "sha256 must be None for MISSING file");
    assert!(!has_hash, "hash_available must be false for MISSING file");
}
/// An `error:` line with no preceding `size:` line must produce `Err`, and
/// the error text must include the device's message.
#[test]
fn parse_verify_error_line_returns_err() {
    let device_output = "error:OSError: [Errno 2] ENOENT\n";

    match parse_device_verify_output(device_output) {
        Ok(_) => panic!("error: line with no size must produce Err"),
        Err(msg) => assert!(
            msg.contains("ENOENT"),
            "error message must include device error text; got: {msg}"
        ),
    }
}
/// A `size:` line followed by an `error:` line must still parse
/// successfully, keeping the size and reporting no digest.
#[test]
fn parse_verify_size_present_with_error_after() {
    let device_output = "size:5372\nerror:'sha256' object has no attribute 'hexdigest'\n";

    let (file_size, digest, has_hash) = parse_device_verify_output(device_output).unwrap();

    assert_eq!(
        file_size,
        Some(5372),
        "size must be preserved even when error: follows"
    );
    assert!(digest.is_none(), "sha256 must be None when hash errored");
    assert!(
        !has_hash,
        "hash_available must be false when hash errored"
    );
}
/// The `NO_HEX_AVAILABLE` sentinel must parse to a present size, no digest,
/// and `hash_available == false`.
#[test]
fn parse_verify_no_hex_available_sentinel() {
    let device_output = "size:234\nsha256:NO_HEX_AVAILABLE\n";

    let (file_size, digest, has_hash) = parse_device_verify_output(device_output).unwrap();

    assert_eq!(file_size, Some(234));
    assert!(
        digest.is_none(),
        "NO_HEX_AVAILABLE sentinel must produce None device_sha256"
    );
    assert!(
        !has_hash,
        "hash_available must be false for NO_HEX_AVAILABLE sentinel"
    );
}
/// A `size:` line plus a `hex:` line must parse to the size and the decoded
/// bytes.
#[test]
fn parse_dump_output_happy_path() {
    // "48656c6c6f" is the hex spelling of the ASCII bytes of "Hello".
    let device_output = "size:5\nhex:48656c6c6f\n";
    let expected_bytes: &[u8] = b"Hello";

    let (file_size, payload) = parse_dump_output(device_output).unwrap();

    assert_eq!(file_size, Some(5), "size must be 5");
    assert_eq!(
        payload.as_deref(),
        Some(expected_bytes),
        "decoded bytes must equal b\"Hello\""
    );
}
/// `MISSING` on both lines must parse to no size and no content.
#[test]
fn parse_dump_output_missing_file() {
    let device_output = "size:MISSING\nhex:MISSING\n";

    let (file_size, payload) = parse_dump_output(device_output).unwrap();

    assert!(file_size.is_none(), "size must be None for missing file");
    assert!(payload.is_none(), "content must be None for missing file");
}
/// Encodes all 256 byte values as hex, once lowercase and once uppercase,
/// and checks that `hex_to_bytes` recovers the original bytes both times.
#[test]
fn hex_to_bytes_round_trip() {
    let all_bytes: Vec<u8> = (0..=u8::MAX).collect();

    // Lowercase digits first.
    let mut hex_lower = String::with_capacity(all_bytes.len() * 2);
    for byte in &all_bytes {
        hex_lower.push_str(&format!("{byte:02x}"));
    }
    let decoded_lower = hex_to_bytes(&hex_lower).expect("lowercase hex must decode cleanly");
    assert_eq!(
        decoded_lower, all_bytes,
        "lowercase hex round-trip must recover all 256 byte values"
    );

    // Then uppercase digits.
    let mut hex_upper = String::with_capacity(all_bytes.len() * 2);
    for byte in &all_bytes {
        hex_upper.push_str(&format!("{byte:02X}"));
    }
    let decoded_upper = hex_to_bytes(&hex_upper).expect("uppercase hex must decode cleanly");
    assert_eq!(
        decoded_upper, all_bytes,
        "uppercase hex round-trip must recover all 256 byte values"
    );
}
/// The script generated by `device_verify_script` must contain the jfs
/// open call, a sized jfs read, the `(NameError, AttributeError)` handler,
/// and an `fs_read(` call.
#[test]
fn device_verify_script_uses_jfs_pattern() {
    let script = device_verify_script("/jumperless_mcp/effects.py");
    let has = |needle: &str| script.contains(needle);
    let sized_read = format!("jumperless.jfs.read(_ygg_rf, {JFS_READ_MAX_BYTES})");

    assert!(
        has("jumperless.jfs.open"),
        "device_verify_script must use jfs for read; got:\n{script}"
    );
    assert!(
        has(&sized_read),
        "device_verify_script must pass explicit size to jfs.read; got:\n{script}"
    );
    assert!(
        has("except (NameError, AttributeError)"),
        "device_verify_script must include jfs-fallback handler; got:\n{script}"
    );
    assert!(
        has("fs_read("),
        "device_verify_script must retain fs_read as fallback; got:\n{script}"
    );
}
/// The script generated by `device_dump_script` must contain the jfs open
/// call, a sized jfs read, the `(NameError, AttributeError)` handler, and
/// an `fs_read(` call.
#[test]
fn device_dump_script_uses_jfs_pattern() {
    let script = device_dump_script("/jumperless_mcp/effects.py");
    let has = |needle: &str| script.contains(needle);
    let sized_read = format!("jumperless.jfs.read(_ygg_rf, {JFS_READ_MAX_BYTES})");

    assert!(
        has("jumperless.jfs.open"),
        "device_dump_script must use jfs for read; got:\n{script}"
    );
    assert!(
        has(&sized_read),
        "device_dump_script must pass explicit size to jfs.read; got:\n{script}"
    );
    assert!(
        has("except (NameError, AttributeError)"),
        "device_dump_script must include jfs-fallback handler; got:\n{script}"
    );
    assert!(
        has("fs_read("),
        "device_dump_script must retain fs_read as fallback; got:\n{script}"
    );
}
/// Both generated scripts must contain an `except TypeError:` clause and a
/// no-argument `jumperless.jfs.read(_ygg_rf)` call, and neither ceremony
/// script may contain `jfs.read`.
#[test]
fn jfs_read_with_size_has_typeerror_fallback_in_generated_scripts() {
    use crate::ceremony::{CEREMONY_CONNECT_SCRIPT, CEREMONY_DISCONNECT_SCRIPT};

    let target = "/python_scripts/lib/jumperless_mcp/effects.py";
    let type_error_clause = "except TypeError:";
    let no_arg_read = "jumperless.jfs.read(_ygg_rf)";

    let verify_script = device_verify_script(target);
    assert!(
        verify_script.contains(type_error_clause),
        "device_verify_script must have except TypeError fallback for jfs.read size arg; got:\n{verify_script}"
    );
    assert!(
        verify_script.contains(no_arg_read),
        "device_verify_script TypeError fallback must call no-arg jfs.read; got:\n{verify_script}"
    );

    let dump_script = device_dump_script(target);
    assert!(
        dump_script.contains(type_error_clause),
        "device_dump_script must have except TypeError fallback for jfs.read size arg; got:\n{dump_script}"
    );
    assert!(
        dump_script.contains(no_arg_read),
        "device_dump_script TypeError fallback must call no-arg jfs.read; got:\n{dump_script}"
    );

    // The ceremony scripts are checked for the absence of any jfs.read usage.
    let connect_reads = CEREMONY_CONNECT_SCRIPT.contains("jfs.read");
    assert!(
        !connect_reads,
        "CEREMONY_CONNECT_SCRIPT must NOT use jfs.read (Phase B: import replaces it); got:\n{CEREMONY_CONNECT_SCRIPT}"
    );
    let disconnect_reads = CEREMONY_DISCONNECT_SCRIPT.contains("jfs.read");
    assert!(
        !disconnect_reads,
        "CEREMONY_DISCONNECT_SCRIPT must NOT use jfs.read (Phase B: import replaces it); got:\n{CEREMONY_DISCONNECT_SCRIPT}"
    );
}
/// Output in the `size:` / `sha256:` shape must be accepted by
/// `parse_device_verify_output`, yielding a size, a digest, and
/// `hash_available == true`.
#[test]
fn jfs_fallback_output_is_parseable_by_verify_parser() {
    let device_output = concat!(
        "size:234\n",
        "sha256:abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890\n",
    );

    let (file_size, digest, has_hash) = parse_device_verify_output(device_output).unwrap();

    assert_eq!(file_size, Some(234));
    assert!(digest.is_some(), "sha256 must be present in fallback output");
    assert!(has_hash, "hash_available must be true");
}
/// Output in the `size:` / `hex:` shape must be accepted by
/// `parse_dump_output`, yielding the size and the decoded bytes.
#[test]
fn jfs_fallback_output_is_parseable_by_dump_parser() {
    let device_output = "size:5\nhex:48656c6c6f\n";
    let expected_bytes: &[u8] = b"Hello";

    let (file_size, payload) = parse_dump_output(device_output).unwrap();

    assert_eq!(file_size, Some(5));
    assert_eq!(payload.as_deref(), Some(expected_bytes));
}
/// Applies the CRLF→LF replacement chain to a CRLF sample locally and checks
/// that the result equals the LF-only sample in content, length and
/// SHA-256, and differs in SHA-256 from the raw CRLF sample.
#[test]
fn verify_source_normalization_handles_crlf() {
    use sha2::{Digest, Sha256};

    // Lowercase hex SHA-256 of a string's bytes.
    let sha_hex = |text: &str| -> String {
        Sha256::digest(text.as_bytes())
            .iter()
            .map(|b| format!("{b:02x}"))
            .collect()
    };

    let lf_source = "line1\nline2\nline3\n";
    let crlf_source = "line1\r\nline2\r\nline3\r\n";
    // Replace CRLF pairs first, then any lone CR that remains.
    let normalized = crlf_source.replace("\r\n", "\n").replace('\r', "\n");
    assert_eq!(
        normalized, lf_source,
        "normalization must produce LF-only content"
    );

    let (normalized_size, crlf_size) = (normalized.len(), crlf_source.len());
    assert!(
        normalized_size < crlf_size,
        "LF-normalized size ({normalized_size}) must be less than CRLF size ({crlf_size})"
    );
    assert_eq!(
        normalized_size,
        lf_source.len(),
        "normalized size must match pure-LF variant"
    );

    let sha_of_normalized = sha_hex(&normalized);
    let sha_of_lf = sha_hex(lf_source);
    let sha_of_crlf = sha_hex(crlf_source);
    assert_eq!(
        sha_of_normalized, sha_of_lf,
        "SHA of normalized content must equal SHA of LF-only content"
    );
    assert_ne!(
        sha_of_normalized, sha_of_crlf,
        "SHA of normalized content must differ from SHA of raw CRLF content \
         (confirming the normalization actually changes the hash)"
    );
}
/// The script generated by `device_verify_script` must contain both the
/// `isinstance(content, str)` check and the UTF-8 encode statement.
#[test]
fn device_verify_script_contains_isinstance_encode_guard() {
    let script = device_verify_script("/jumperless_mcp/font.py");

    let has_guard = script.contains("if isinstance(content, str):");
    assert!(
        has_guard,
        "device_verify_script must contain isinstance(content, str) guard; script:\n{script}"
    );

    let has_encode = script.contains("content = content.encode('utf-8')");
    assert!(
        has_encode,
        "device_verify_script must contain content.encode('utf-8') call; script:\n{script}"
    );
}
/// The script generated by `device_dump_script` must contain both the
/// `isinstance(content, str)` check and the UTF-8 encode statement.
#[test]
fn device_dump_script_contains_isinstance_encode_guard() {
    let script = device_dump_script("/jumperless_mcp/effects.py");

    let has_guard = script.contains("if isinstance(content, str):");
    assert!(
        has_guard,
        "device_dump_script must contain isinstance(content, str) guard; script:\n{script}"
    );

    let has_encode = script.contains("content = content.encode('utf-8')");
    assert!(
        has_encode,
        "device_dump_script must contain content.encode('utf-8') call; script:\n{script}"
    );
}
/// Writes a 2000-character payload and checks that the bytes sent to the
/// port contain zero occurrences of `_ygg_f.flush()`.
#[test]
fn fs_write_chunked_no_flush_calls() {
    let path = "/python_scripts/lib/jumperless_mcp/test.py";
    let content = "y".repeat(2000);
    let n_chunks = chunk_at_char_boundaries(&content, CHUNK_SIZE).len();

    // Queue n_chunks + 3 OK frames for the mock to replay.
    let frames: Vec<Vec<u8>> = std::iter::repeat_with(|| MockPort::ok_frame())
        .take(n_chunks + 3)
        .collect();
    let frame_refs: Vec<&[u8]> = frames.iter().map(Vec::as_slice).collect();
    let mut port = MockPort::with_responses(&frame_refs);
    fs_write_chunked(&mut port, path, &content).unwrap();

    let written = String::from_utf8_lossy(&port.write_data);
    let flush_count = written.match_indices("_ygg_f.flush()").count();
    assert_eq!(
        flush_count, 0,
        "Phase C: fs_write_chunked must not emit any flush() calls \
         (jfs.flush is not in the JFS module globals table)"
    );
}
}