use crate::base::{McpError, ToolDescriptor};
use serde_json::{json, Value};
use std::io::{Read, Write};
use crate::library::exec_with_cleanup;
const PIN_STRING_IDENTIFIERS: &[&str] = &[
"GPIO_1", "GPIO_2", "GPIO_3", "GPIO_4", "GPIO_5", "GPIO_6", "GPIO_7", "GPIO_8", "UART_TX",
"UART_RX", "D0", "D1", "D2", "D3", "D4", "D5", "D6", "D7", "D8", "D9", "D10", "D11", "D12",
"D13", "A0", "A1", "A2", "A3", "A4", "A5", "A6", "A7",
];
fn encode_pin(pin: &Value) -> Result<String, McpError> {
match pin {
Value::Number(n) => {
let i = n.as_i64().ok_or_else(|| {
McpError::Protocol("pin must be an integer 1-8 or a string identifier".into())
})?;
if !(1..=8).contains(&i) {
return Err(McpError::Protocol(format!(
"pin integer must be 1-8; got {i}"
)));
}
Ok(i.to_string())
}
Value::String(s) => {
if s.is_empty() {
return Err(McpError::Protocol("pin string must not be empty".into()));
}
if s.len() > 16 {
return Err(McpError::Protocol(format!(
"pin identifier too long (max 16 chars): '{s}'"
)));
}
if !s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
return Err(McpError::Protocol(format!(
"pin identifier contains invalid characters (must be alphanumeric + '_'): '{s}'"
)));
}
if !PIN_STRING_IDENTIFIERS.contains(&s.as_str()) {
return Err(McpError::Protocol(format!(
"unknown pin identifier '{s}'; valid: GPIO_1-GPIO_8, UART_TX, UART_RX, D0-D13, A0-A7"
)));
}
Ok(s.clone())
}
_ => Err(McpError::Protocol(
"pin must be an integer 1-8 or a string identifier".into(),
)),
}
}
fn pin_schema() -> Value {
json!({
"oneOf": [
{
"type": "integer",
"minimum": 1,
"maximum": 8,
"description": "GPIO pin number 1-8"
},
{
"type": "string",
"description": "Named GPIO constant: GPIO_1-GPIO_8, UART_TX, UART_RX, D0-D13, A0-A7"
}
]
})
}
pub fn gpio_set_descriptor() -> ToolDescriptor {
ToolDescriptor::with_timeout(
"gpio_set",
"Set a Jumperless V5 digital GPIO pin HIGH (3.3 V) or LOW (0 V). \
Pin must first be configured as OUTPUT via gpio_set_dir. \
Pin can be an integer 1-8 or a named constant (GPIO_1-GPIO_8, UART_TX, UART_RX, D0-D13, A0-A7). \
Returns {\"set\": true, \"pin\": <as-passed>, \"value\": bool}.",
json!({
"type": "object",
"properties": {
"pin": pin_schema(),
"value": {
"type": "boolean",
"description": "true = 3.3 V (HIGH), false = 0 V (LOW)"
}
},
"required": ["pin", "value"],
"additionalProperties": false
}),
1_500,
)
}
pub fn gpio_get_descriptor() -> ToolDescriptor {
ToolDescriptor::with_timeout(
"gpio_get",
"Read the current level of a Jumperless V5 GPIO pin. \
Returns {\"value\": \"HIGH\" | \"LOW\" | \"FLOATING\", \"pin\": <as-passed>}. \
FLOATING is reported when the pin is INPUT with no driver. \
Pin can be an integer 1-8 or a named constant (GPIO_1-GPIO_8, UART_TX, UART_RX, D0-D13, A0-A7).",
json!({
"type": "object",
"properties": {
"pin": pin_schema()
},
"required": ["pin"],
"additionalProperties": false
}),
1_500,
)
}
pub fn gpio_set_dir_descriptor() -> ToolDescriptor {
ToolDescriptor::with_timeout(
"gpio_set_dir",
"Set the direction of a Jumperless V5 GPIO pin. \
\"OUTPUT\" drives the pin; \"INPUT\" puts it in high-impedance input mode. \
Pin can be an integer 1-8 or a named constant (GPIO_1-GPIO_8, UART_TX, UART_RX, D0-D13, A0-A7). \
Returns {\"set\": true, \"pin\": <as-passed>, \"direction\": <as-passed>}.",
json!({
"type": "object",
"properties": {
"pin": pin_schema(),
"direction": {
"type": "string",
"enum": ["OUTPUT", "INPUT"],
"description": "\"OUTPUT\" = drive pin; \"INPUT\" = high-impedance"
}
},
"required": ["pin", "direction"],
"additionalProperties": false
}),
1_500,
)
}
pub fn descriptors() -> Vec<ToolDescriptor> {
vec![
gpio_set_descriptor(),
gpio_get_descriptor(),
gpio_set_dir_descriptor(),
]
}
pub fn handle_gpio_set<P: Read + Write + ?Sized>(
port: &mut P,
args: &Value,
) -> Result<Value, McpError> {
let pin_raw = args
.get("pin")
.ok_or_else(|| McpError::Protocol("gpio_set requires 'pin'".into()))?;
let pin_expr = encode_pin(pin_raw)?;
let value = args
.get("value")
.and_then(|v| v.as_bool())
.ok_or_else(|| McpError::Protocol("gpio_set requires 'value' (bool)".into()))?;
let py_bool = if value { "True" } else { "False" };
let code = format!("gpio_set({pin_expr}, {py_bool})");
exec_with_cleanup(port, &code, "gpio_set")?;
Ok(json!({
"set": true,
"pin": pin_raw,
"value": value
}))
}
pub fn handle_gpio_get<P: Read + Write + ?Sized>(
port: &mut P,
args: &Value,
) -> Result<Value, McpError> {
let pin_raw = args
.get("pin")
.ok_or_else(|| McpError::Protocol("gpio_get requires 'pin'".into()))?;
let pin_expr = encode_pin(pin_raw)?;
let code = format!("print(gpio_get({pin_expr}))");
let resp = exec_with_cleanup(port, &code, "gpio_get")?;
let trimmed = resp.stdout.trim();
let level = match trimmed {
"HIGH" | "LOW" | "FLOATING" => trimmed,
other => {
return Err(McpError::Protocol(format!(
"gpio_get: unexpected device response: '{other}'"
)));
}
};
Ok(json!({
"value": level,
"pin": pin_raw
}))
}
pub fn handle_gpio_set_dir<P: Read + Write + ?Sized>(
port: &mut P,
args: &Value,
) -> Result<Value, McpError> {
let pin_raw = args
.get("pin")
.ok_or_else(|| McpError::Protocol("gpio_set_dir requires 'pin'".into()))?;
let pin_expr = encode_pin(pin_raw)?;
let direction = args
.get("direction")
.and_then(|v| v.as_str())
.ok_or_else(|| McpError::Protocol("gpio_set_dir requires 'direction' (string)".into()))?;
let py_bool = match direction {
"OUTPUT" => "True",
"INPUT" => "False",
other => {
return Err(McpError::Protocol(format!(
"gpio_set_dir: direction must be \"OUTPUT\" or \"INPUT\"; got \"{other}\""
)));
}
};
let code = format!("gpio_set_dir({pin_expr}, {py_bool})");
exec_with_cleanup(port, &code, "gpio_set_dir")?;
Ok(json!({
"set": true,
"pin": pin_raw,
"direction": direction
}))
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::VecDeque;
use std::io::{self, Read, Write};
struct MockPort {
read_data: VecDeque<u8>,
pub write_data: Vec<u8>,
}
impl MockPort {
fn with_responses(responses: &[&[u8]]) -> Self {
let mut buf = Vec::new();
for r in responses {
buf.extend_from_slice(r);
}
MockPort {
read_data: VecDeque::from(buf),
write_data: Vec::new(),
}
}
fn ok_frame() -> Vec<u8> {
b"OK\x04\x04>".to_vec()
}
fn ok_with_stdout(line: &str) -> Vec<u8> {
let mut v = b"OK".to_vec();
v.extend_from_slice(line.as_bytes());
v.push(b'\n');
v.extend_from_slice(b"\x04\x04>");
v
}
fn error_frame(msg: &str) -> Vec<u8> {
let mut v = b"OK\x04".to_vec();
v.extend_from_slice(msg.as_bytes());
v.push(b'\n');
v.push(b'\x04');
v.push(b'>');
v
}
}
impl Read for MockPort {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let n = buf.len().min(self.read_data.len());
if n == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"MockPort: no more scripted bytes",
));
}
for (dst, src) in buf[..n].iter_mut().zip(self.read_data.drain(..n)) {
*dst = src;
}
Ok(n)
}
}
impl Write for MockPort {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.write_data.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
#[test]
fn all_descriptors_have_correct_names() {
let descs = descriptors();
let names: Vec<&str> = descs.iter().map(|d| d.name.as_str()).collect();
assert!(names.contains(&"gpio_set"));
assert!(names.contains(&"gpio_get"));
assert!(names.contains(&"gpio_set_dir"));
assert_eq!(descs.len(), 3);
}
#[test]
fn all_descriptors_have_additional_properties_false() {
for d in descriptors() {
assert_eq!(
d.input_schema.get("additionalProperties"),
Some(&Value::Bool(false)),
"descriptor '{}' must have additionalProperties=false",
d.name
);
}
}
#[test]
fn encode_pin_int_roundtrip() {
assert_eq!(encode_pin(&json!(3)).unwrap(), "3");
assert_eq!(encode_pin(&json!(1)).unwrap(), "1");
assert_eq!(encode_pin(&json!(8)).unwrap(), "8");
}
#[test]
fn encode_pin_int_out_of_range_rejected() {
assert!(encode_pin(&json!(0)).is_err());
assert!(encode_pin(&json!(9)).is_err());
assert!(encode_pin(&json!(-1)).is_err());
}
#[test]
fn encode_pin_string_identifier_roundtrip() {
assert_eq!(encode_pin(&json!("GPIO_1")).unwrap(), "GPIO_1");
assert_eq!(encode_pin(&json!("UART_TX")).unwrap(), "UART_TX");
assert_eq!(encode_pin(&json!("D13")).unwrap(), "D13");
assert_eq!(encode_pin(&json!("A7")).unwrap(), "A7");
}
#[test]
fn encode_pin_string_too_long_rejected() {
let long = "A".repeat(17);
assert!(encode_pin(&Value::String(long)).is_err());
}
#[test]
fn encode_pin_string_injection_rejected() {
assert!(encode_pin(&json!("GPIO'1")).is_err());
assert!(encode_pin(&json!("GPIO;DROP")).is_err());
assert!(encode_pin(&json!("GPIO 1")).is_err());
}
#[test]
fn encode_pin_empty_string_rejected() {
assert!(encode_pin(&json!("")).is_err());
}
#[test]
fn encode_pin_non_whitelisted_identifier_rejected() {
for bad in &[
"os",
"exec",
"__import__",
"print",
"open",
"eval",
"GPIO_9",
"D14",
"A8",
] {
let result = encode_pin(&json!(bad));
assert!(
result.is_err(),
"non-whitelisted identifier '{bad}' must be rejected (shape-valid but not a real pin)"
);
match result.unwrap_err() {
McpError::Protocol(msg) => assert!(
msg.contains("unknown pin"),
"error must say 'unknown pin'; got: {msg}"
),
other => panic!("expected Protocol err, got: {other:?}"),
}
}
}
#[test]
fn encode_pin_whitelisted_identifiers_accepted() {
for ok in &[
"GPIO_1", "GPIO_8", "UART_TX", "UART_RX", "D0", "D13", "A0", "A7",
] {
assert!(
encode_pin(&json!(ok)).is_ok(),
"whitelisted identifier '{ok}' must be accepted"
);
}
}
#[test]
fn gpio_set_high_int_pin() {
let frame = MockPort::ok_frame();
let mut port = MockPort::with_responses(&[&frame]);
let args = json!({"pin": 3, "value": true});
let result = handle_gpio_set(&mut port, &args).unwrap();
assert_eq!(result["set"], true);
assert_eq!(result["value"], true);
let cmd = String::from_utf8_lossy(&port.write_data);
assert!(cmd.contains("gpio_set(3, True)"), "cmd was: {cmd}");
}
#[test]
fn gpio_set_low_string_pin() {
let frame = MockPort::ok_frame();
let mut port = MockPort::with_responses(&[&frame]);
let args = json!({"pin": "D5", "value": false});
let result = handle_gpio_set(&mut port, &args).unwrap();
assert_eq!(result["set"], true);
assert_eq!(result["value"], false);
let cmd = String::from_utf8_lossy(&port.write_data);
assert!(cmd.contains("gpio_set(D5, False)"), "cmd was: {cmd}");
}
#[test]
fn gpio_set_invalid_pin_int_rejected() {
let mut port = MockPort::with_responses(&[]);
let args = json!({"pin": 99, "value": true});
assert!(handle_gpio_set(&mut port, &args).is_err());
}
#[test]
fn gpio_set_invalid_pin_string_rejected() {
let mut port = MockPort::with_responses(&[]);
let args = json!({"pin": "bad'pin", "value": true});
assert!(handle_gpio_set(&mut port, &args).is_err());
}
#[test]
fn gpio_set_device_error_sends_ctrl_c() {
let err = MockPort::error_frame("NameError: gpio_set");
let mut port = MockPort::with_responses(&[&err]);
let args = json!({"pin": 1, "value": true});
let result = handle_gpio_set(&mut port, &args);
assert!(result.is_err());
assert!(port.write_data.contains(&0x03));
}
#[test]
fn gpio_get_high_response() {
let frame = MockPort::ok_with_stdout("HIGH");
let mut port = MockPort::with_responses(&[&frame]);
let args = json!({"pin": 2});
let result = handle_gpio_get(&mut port, &args).unwrap();
assert_eq!(result["value"], "HIGH");
assert_eq!(result["pin"], 2);
}
#[test]
fn gpio_get_low_response() {
let frame = MockPort::ok_with_stdout("LOW");
let mut port = MockPort::with_responses(&[&frame]);
let args = json!({"pin": "GPIO_3"});
let result = handle_gpio_get(&mut port, &args).unwrap();
assert_eq!(result["value"], "LOW");
assert_eq!(result["pin"], "GPIO_3");
}
#[test]
fn gpio_get_floating_response() {
let frame = MockPort::ok_with_stdout("FLOATING");
let mut port = MockPort::with_responses(&[&frame]);
let args = json!({"pin": 7});
let result = handle_gpio_get(&mut port, &args).unwrap();
assert_eq!(result["value"], "FLOATING");
}
#[test]
fn gpio_get_unexpected_response_returns_err() {
let frame = MockPort::ok_with_stdout("UNKNOWN");
let mut port = MockPort::with_responses(&[&frame]);
let args = json!({"pin": 1});
let result = handle_gpio_get(&mut port, &args);
assert!(result.is_err());
match result.unwrap_err() {
McpError::Protocol(msg) => {
assert!(msg.contains("unexpected"), "error must say 'unexpected'");
assert!(
msg.contains("UNKNOWN"),
"error must include the actual response"
);
}
other => panic!("expected McpError::Protocol, got: {other:?}"),
}
}
#[test]
fn gpio_get_empty_response_returns_err() {
let frame = MockPort::ok_with_stdout("");
let mut port = MockPort::with_responses(&[&frame]);
let args = json!({"pin": 1});
assert!(handle_gpio_get(&mut port, &args).is_err());
}
#[test]
fn gpio_get_device_error_sends_ctrl_c() {
let err = MockPort::error_frame("NameError: gpio_get");
let mut port = MockPort::with_responses(&[&err]);
let args = json!({"pin": 1});
let result = handle_gpio_get(&mut port, &args);
assert!(result.is_err());
assert!(port.write_data.contains(&0x03));
}
#[test]
fn gpio_set_dir_output() {
let frame = MockPort::ok_frame();
let mut port = MockPort::with_responses(&[&frame]);
let args = json!({"pin": 4, "direction": "OUTPUT"});
let result = handle_gpio_set_dir(&mut port, &args).unwrap();
assert_eq!(result["set"], true);
assert_eq!(result["direction"], "OUTPUT");
let cmd = String::from_utf8_lossy(&port.write_data);
assert!(cmd.contains("gpio_set_dir(4, True)"), "cmd was: {cmd}");
}
#[test]
fn gpio_set_dir_input() {
let frame = MockPort::ok_frame();
let mut port = MockPort::with_responses(&[&frame]);
let args = json!({"pin": "UART_RX", "direction": "INPUT"});
let result = handle_gpio_set_dir(&mut port, &args).unwrap();
assert_eq!(result["set"], true);
assert_eq!(result["direction"], "INPUT");
let cmd = String::from_utf8_lossy(&port.write_data);
assert!(
cmd.contains("gpio_set_dir(UART_RX, False)"),
"cmd was: {cmd}"
);
}
#[test]
fn gpio_set_dir_invalid_direction_rejected() {
let mut port = MockPort::with_responses(&[]);
let args = json!({"pin": 1, "direction": "TRISTATE"});
let result = handle_gpio_set_dir(&mut port, &args);
assert!(result.is_err());
match result.unwrap_err() {
McpError::Protocol(msg) => {
assert!(
msg.contains("TRISTATE"),
"error must include the bad value; got: {msg}"
);
}
other => panic!("expected McpError::Protocol, got: {other:?}"),
}
}
#[test]
fn gpio_set_dir_lowercase_direction_rejected() {
let mut port = MockPort::with_responses(&[]);
let args = json!({"pin": 1, "direction": "output"});
assert!(handle_gpio_set_dir(&mut port, &args).is_err());
}
#[test]
fn gpio_set_dir_device_error_sends_ctrl_c() {
let err = MockPort::error_frame("NameError: gpio_set_dir");
let mut port = MockPort::with_responses(&[&err]);
let args = json!({"pin": 1, "direction": "OUTPUT"});
let result = handle_gpio_set_dir(&mut port, &args);
assert!(result.is_err());
assert!(port.write_data.contains(&0x03));
}
}