use crate::base::{McpError, ToolDescriptor};
use serde_json::{json, Value};
use std::io::{Read, Write};
use crate::library::exec_with_cleanup;
/// Returns the JSON input schema shared by tools that accept no arguments:
/// an object type with an empty `properties` map and
/// `additionalProperties` set to `false`.
fn no_args() -> Value {
    let empty_properties = json!({});
    json!({
        "type": "object",
        "properties": empty_properties,
        "additionalProperties": false
    })
}
/// Builds the [`ToolDescriptor`] for the `context_get` tool.
///
/// The tool takes no arguments (see `no_args`) and is registered through
/// `ToolDescriptor::with_timeout` with a timeout value of `1_000`
/// (presumably milliseconds — confirm against `with_timeout`).
pub fn context_get_descriptor() -> ToolDescriptor {
    // Trailing backslashes join the lines; the following indentation is not
    // part of the resulting string.
    let description = "Get the current Python connection context on the Jumperless V5. \
        Returns {\"context\": \"global\"} or {\"context\": \"python\"}. \
        In 'global' mode, Python changes persist to the active slot. \
        In 'python' mode, Python changes are isolated to slot 99 and do NOT \
        survive Python exit. Context resets to 'global' on every power cycle.";

    ToolDescriptor::with_timeout("context_get", description, no_args(), 1_000)
}
/// Builds the [`ToolDescriptor`] for the `context_toggle` tool.
///
/// The tool takes no arguments (see `no_args`) and is registered through
/// `ToolDescriptor::with_timeout` with a timeout value of `1_000`
/// (presumably milliseconds — confirm against `with_timeout`). The
/// description deliberately carries the "NEXT Python session entry" warning,
/// which the unit tests assert on.
pub fn context_toggle_descriptor() -> ToolDescriptor {
    // Trailing backslashes join the lines; the following indentation is not
    // part of the resulting string.
    let description =
        "Toggle the Python connection context on the Jumperless V5 between 'global' and \
        'python' isolation modes. Returns the NEW context after the toggle. \
        WARNING: the change takes effect on the NEXT Python session entry only — \
        toggling mid-session has no retroactive effect on the current session. \
        Context is RAM-only and resets to 'global' on power cycle.";

    ToolDescriptor::with_timeout("context_toggle", description, no_args(), 1_000)
}
pub fn descriptors() -> Vec<ToolDescriptor> {
vec![context_get_descriptor(), context_toggle_descriptor()]
}
/// Queries the device for its current Python connection context.
///
/// Runs `print(context_get())` on `port` via `exec_with_cleanup`, trims the
/// captured stdout, and returns it as `{"context": <value>}`.
///
/// # Errors
///
/// * Propagates any error returned by `exec_with_cleanup`.
/// * Returns [`McpError::Protocol`] when the trimmed output is neither
///   `"global"` nor `"python"`.
pub fn handle_context_get<P: Read + Write + ?Sized>(port: &mut P) -> Result<Value, McpError> {
    let reply = exec_with_cleanup(port, "print(context_get())", "context_get")?;
    let ctx = reply.stdout.trim();

    // Only the two documented context names are accepted from the device.
    if matches!(ctx, "global" | "python") {
        Ok(json!({ "context": ctx }))
    } else {
        Err(McpError::Protocol(format!(
            "context_get: unexpected value from device: '{ctx}'"
        )))
    }
}
/// Toggles the device's Python connection context and reports the new value.
///
/// Two device round-trips are made through `exec_with_cleanup`: first
/// `context_toggle()`, then `print(context_get())` to read back the result.
/// On success the return value is
/// `{"new_context": <value>, "note": <next-entry reminder>}`.
///
/// # Errors
///
/// * Propagates any error returned by either `exec_with_cleanup` call; a
///   failure of the toggle call means the read-back is never attempted.
/// * Returns [`McpError::Protocol`] when the trimmed read-back output is
///   neither `"global"` nor `"python"`.
pub fn handle_context_toggle<P: Read + Write + ?Sized>(port: &mut P) -> Result<Value, McpError> {
    // The toggle call's own output is not used; only success matters here.
    exec_with_cleanup(port, "context_toggle()", "context_toggle")?;

    let readback = exec_with_cleanup(port, "print(context_get())", "context_toggle:get_new")?;
    let new_ctx = readback.stdout.trim();

    match new_ctx {
        "global" | "python" => Ok(json!({
            "new_context": new_ctx,
            "note": "change takes effect on NEXT Python session entry only"
        })),
        _ => Err(McpError::Protocol(format!(
            "context_toggle: unexpected new context from device: '{new_ctx}'"
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::io::{self, Read, Write};

    /// In-memory stand-in for the device port: reads are served from a
    /// pre-scripted byte queue and every write is recorded in `write_data`.
    struct MockPort {
        read_data: VecDeque<u8>,
        pub write_data: Vec<u8>,
    }

    impl MockPort {
        /// Creates a port whose read side yields `responses` back to back.
        fn with_responses(responses: &[&[u8]]) -> Self {
            MockPort {
                read_data: VecDeque::from(responses.concat()),
                write_data: Vec::new(),
            }
        }

        /// Success frame with no output: `OK`, two 0x04 bytes, then `>`.
        fn ok_frame() -> Vec<u8> {
            b"OK\x04\x04>"[..].to_owned()
        }

        /// Success frame carrying `line` plus a newline between `OK` and the
        /// two 0x04 bytes (presumably the stdout section of the reply —
        /// confirm against `exec_with_cleanup`).
        fn ok_with_stdout(line: &str) -> Vec<u8> {
            [&b"OK"[..], line.as_bytes(), &b"\n\x04\x04>"[..]].concat()
        }

        /// Frame carrying `msg` plus a newline between the two 0x04 bytes
        /// (presumably the error section of the reply — confirm against
        /// `exec_with_cleanup`).
        fn error_frame(msg: &str) -> Vec<u8> {
            [&b"OK\x04"[..], msg.as_bytes(), &b"\n\x04>"[..]].concat()
        }
    }

    impl Read for MockPort {
        /// Copies as many scripted bytes as fit into `buf`; reports
        /// `UnexpectedEof` when nothing can be copied.
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            let count = self.read_data.len().min(buf.len());
            if count == 0 {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "MockPort: no more scripted bytes",
                ));
            }
            // The drain yields exactly `count` bytes, which bounds the zip.
            buf.iter_mut()
                .zip(self.read_data.drain(..count))
                .for_each(|(slot, byte)| *slot = byte);
            Ok(count)
        }
    }

    impl Write for MockPort {
        /// Records everything written so tests can inspect it afterwards.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.write_data.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn all_descriptors_have_correct_names() {
        let all = descriptors();
        for expected in ["context_get", "context_toggle"] {
            assert!(all.iter().any(|d| d.name.as_str() == expected));
        }
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn all_descriptors_have_object_schema_with_additional_properties_false() {
        for d in descriptors() {
            assert!(
                d.input_schema.is_object(),
                "descriptor '{}' must have object input_schema",
                d.name
            );
            let additional = d.input_schema.get("additionalProperties");
            assert_eq!(
                additional,
                Some(&Value::Bool(false)),
                "descriptor '{}' must have additionalProperties=false",
                d.name
            );
        }
    }

    #[test]
    fn context_toggle_description_mentions_next_entry() {
        let description = context_toggle_descriptor().description;
        assert!(
            description.contains("NEXT Python session entry"),
            "context_toggle must warn about next-entry semantics"
        );
    }

    #[test]
    fn context_get_returns_global() {
        let mut port = MockPort::with_responses(&[&MockPort::ok_with_stdout("global")]);
        let value = handle_context_get(&mut port).unwrap();
        assert_eq!(value.get("context").and_then(Value::as_str), Some("global"));
    }

    #[test]
    fn context_get_returns_python() {
        let mut port = MockPort::with_responses(&[&MockPort::ok_with_stdout("python")]);
        let value = handle_context_get(&mut port).unwrap();
        assert_eq!(value.get("context").and_then(Value::as_str), Some("python"));
    }

    #[test]
    fn context_get_unexpected_value_returns_error() {
        let mut port = MockPort::with_responses(&[&MockPort::ok_with_stdout("unknown_mode")]);
        assert!(handle_context_get(&mut port).is_err());
    }

    #[test]
    fn context_get_device_error_sends_ctrl_c() {
        let mut port =
            MockPort::with_responses(&[&MockPort::error_frame("NameError: context_get")]);
        let outcome = handle_context_get(&mut port);
        assert!(outcome.is_err());
        // 0x03 is the Ctrl-C byte.
        assert!(port.write_data.iter().any(|&byte| byte == 0x03));
    }

    #[test]
    fn context_toggle_returns_new_context_python() {
        let mut port = MockPort::with_responses(&[
            &MockPort::ok_frame(),
            &MockPort::ok_with_stdout("python"),
        ]);
        let value = handle_context_toggle(&mut port).unwrap();
        assert_eq!(
            value.get("new_context").and_then(Value::as_str),
            Some("python")
        );
        assert!(value.get("note").is_some(), "must include next-entry note");
    }

    #[test]
    fn context_toggle_returns_new_context_global() {
        let mut port = MockPort::with_responses(&[
            &MockPort::ok_frame(),
            &MockPort::ok_with_stdout("global"),
        ]);
        let value = handle_context_toggle(&mut port).unwrap();
        assert_eq!(
            value.get("new_context").and_then(Value::as_str),
            Some("global")
        );
    }

    #[test]
    fn context_toggle_device_error_on_toggle_sends_ctrl_c() {
        let mut port =
            MockPort::with_responses(&[&MockPort::error_frame("NameError: context_toggle")]);
        let outcome = handle_context_toggle(&mut port);
        assert!(outcome.is_err());
        // 0x03 is the Ctrl-C byte.
        assert!(port.write_data.iter().any(|&byte| byte == 0x03));
    }
}