use std::path::PathBuf;
use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::warn;
use crate::error::{Result, ZeptoError};
use crate::plugins::types::PluginToolDef;
use crate::tools::{Tool, ToolCategory, ToolContext, ToolOutput};
#[derive(Serialize)]
struct PluginJsonRpcRequest {
jsonrpc: String,
id: u64,
method: String,
params: PluginExecuteParams,
}
#[derive(Serialize)]
struct PluginExecuteParams {
tool: String,
args: Value,
}
#[derive(Deserialize)]
struct PluginJsonRpcResponse {
#[allow(dead_code)]
jsonrpc: String,
#[allow(dead_code)]
id: Option<u64>,
result: Option<PluginJsonRpcResult>,
error: Option<PluginJsonRpcError>,
}
#[derive(Deserialize)]
struct PluginJsonRpcResult {
output: String,
}
#[derive(Deserialize)]
struct PluginJsonRpcError {
code: i64,
message: String,
#[allow(dead_code)]
data: Option<Value>,
}
pub struct BinaryPluginTool {
def: PluginToolDef,
plugin_name: String,
binary_path: PathBuf,
timeout: Duration,
}
impl BinaryPluginTool {
pub fn new(
def: PluginToolDef,
plugin_name: impl Into<String>,
binary_path: PathBuf,
timeout_secs: u64,
) -> Self {
Self {
def,
plugin_name: plugin_name.into(),
binary_path,
timeout: Duration::from_secs(timeout_secs),
}
}
}
impl std::fmt::Debug for BinaryPluginTool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BinaryPluginTool")
.field("name", &self.def.name)
.field("plugin", &self.plugin_name)
.field("binary", &self.binary_path)
.finish()
}
}
#[async_trait]
impl Tool for BinaryPluginTool {
fn name(&self) -> &str {
&self.def.name
}
fn description(&self) -> &str {
&self.def.description
}
fn compact_description(&self) -> &str {
self.description()
}
fn category(&self) -> ToolCategory {
ToolCategory::Shell
}
fn parameters(&self) -> Value {
self.def.parameters.clone()
}
async fn execute(&self, args: Value, ctx: &ToolContext) -> Result<ToolOutput> {
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
let request = PluginJsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: 1,
method: "execute".to_string(),
params: PluginExecuteParams {
tool: self.def.name.clone(),
args,
},
};
let request_json = serde_json::to_string(&request).map_err(|e| {
ZeptoError::Tool(format!("Failed to serialize JSON-RPC request: {}", e))
})?;
let mut retries: u8 = 0;
let mut child = loop {
let mut cmd = Command::new(&self.binary_path);
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
if let Some(workspace) = &ctx.workspace {
cmd.current_dir(workspace);
}
if let Some(env_vars) = &self.def.env {
for (key, value) in env_vars {
cmd.env(key, value);
}
}
match cmd.spawn() {
Ok(child) => break child,
Err(e) if e.raw_os_error() == Some(26) && retries == 0 => {
retries += 1;
tokio::time::sleep(Duration::from_millis(25)).await;
}
Err(e) if e.raw_os_error() == Some(26) => {
tokio::time::sleep(Duration::from_millis(50)).await;
break cmd.spawn().map_err(|final_err| {
ZeptoError::Tool(format!(
"Failed to spawn binary plugin '{}' ({}) after ETXTBSY retries: {}",
self.plugin_name,
self.binary_path.display(),
final_err
))
})?;
}
Err(e) => {
return Err(ZeptoError::Tool(format!(
"Failed to spawn binary plugin '{}' ({}): {}",
self.plugin_name,
self.binary_path.display(),
e
)));
}
}
};
if let Some(mut stdin) = child.stdin.take() {
stdin
.write_all(request_json.as_bytes())
.await
.map_err(|e| {
ZeptoError::Tool(format!(
"Failed to write to binary plugin '{}' stdin: {}",
self.plugin_name, e
))
})?;
stdin.write_all(b"\n").await.ok();
}
let output = match tokio::time::timeout(self.timeout, child.wait_with_output()).await {
Ok(Ok(output)) => output,
Ok(Err(e)) => {
return Err(ZeptoError::Tool(format!(
"Binary plugin '{}' failed: {}",
self.plugin_name, e
)));
}
Err(_) => {
return Err(ZeptoError::Tool(format!(
"Binary plugin '{}' timed out after {}s",
self.plugin_name,
self.timeout.as_secs()
)));
}
};
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
if !output.status.success() {
let code = output.status.code().unwrap_or(-1);
let err_detail = if stderr.is_empty() {
stdout.to_string()
} else {
stderr.to_string()
};
return Err(ZeptoError::Tool(format!(
"Binary plugin '{}' exited with code {}: {}",
self.plugin_name,
code,
err_detail.trim()
)));
}
let response_line = stdout
.lines()
.rev()
.find(|line| !line.trim().is_empty())
.unwrap_or("");
if response_line.is_empty() {
return Err(ZeptoError::Tool(format!(
"Binary plugin '{}' produced no output",
self.plugin_name
)));
}
let response: PluginJsonRpcResponse = serde_json::from_str(response_line).map_err(|e| {
ZeptoError::Tool(format!(
"Binary plugin '{}' returned invalid JSON-RPC: {} (raw: {})",
self.plugin_name,
e,
&crate::utils::string::preview(response_line, 200)
))
})?;
if let Some(err) = response.error {
warn!(
plugin = %self.plugin_name,
code = err.code,
"Binary plugin returned error"
);
return Err(ZeptoError::Tool(format!(
"Binary plugin '{}' error (code {}): {}",
self.plugin_name, err.code, err.message
)));
}
match response.result {
Some(result) => Ok(ToolOutput::llm_only(result.output)),
None => Err(ZeptoError::Tool(format!(
"Binary plugin '{}' returned neither result nor error",
self.plugin_name
))),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[cfg(unix)]
fn is_root() -> bool {
std::process::Command::new("id")
.arg("-u")
.output()
.ok()
.and_then(|o| String::from_utf8(o.stdout).ok())
.is_some_and(|s| s.trim() == "0")
}
#[test]
fn test_jsonrpc_request_serialization() {
let req = PluginJsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: 1,
method: "execute".to_string(),
params: PluginExecuteParams {
tool: "my_tool".to_string(),
args: json!({"limit": 10}),
},
};
let json = serde_json::to_value(&req).unwrap();
assert_eq!(json["jsonrpc"], "2.0");
assert_eq!(json["id"], 1);
assert_eq!(json["method"], "execute");
assert_eq!(json["params"]["tool"], "my_tool");
assert_eq!(json["params"]["args"]["limit"], 10);
}
#[test]
fn test_jsonrpc_response_success_deser() {
let json_str = r#"{"jsonrpc":"2.0","result":{"output":"payment list..."},"id":1}"#;
let resp: PluginJsonRpcResponse = serde_json::from_str(json_str).unwrap();
assert!(resp.result.is_some());
assert!(resp.error.is_none());
assert_eq!(resp.result.unwrap().output, "payment list...");
}
#[test]
fn test_jsonrpc_response_error_deser() {
let json_str =
r#"{"jsonrpc":"2.0","error":{"code":-1,"message":"API key not configured"},"id":1}"#;
let resp: PluginJsonRpcResponse = serde_json::from_str(json_str).unwrap();
assert!(resp.result.is_none());
assert!(resp.error.is_some());
let err = resp.error.unwrap();
assert_eq!(err.code, -1);
assert_eq!(err.message, "API key not configured");
assert!(err.data.is_none());
}
#[test]
fn test_jsonrpc_response_missing_result() {
let json_str = r#"{"jsonrpc":"2.0","id":1}"#;
let resp: PluginJsonRpcResponse = serde_json::from_str(json_str).unwrap();
assert!(resp.result.is_none());
assert!(resp.error.is_none());
}
#[test]
fn test_jsonrpc_response_with_error_data() {
let json_str = r#"{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid Request","data":{"details":"missing field"}},"id":1}"#;
let resp: PluginJsonRpcResponse = serde_json::from_str(json_str).unwrap();
let err = resp.error.unwrap();
assert_eq!(err.code, -32600);
assert!(err.data.is_some());
}
fn test_tool_def() -> PluginToolDef {
PluginToolDef {
name: "my_tool".to_string(),
description: "My test tool".to_string(),
parameters: json!({"type": "object", "properties": {"x": {"type": "string"}}}),
command: String::new(),
working_dir: None,
timeout_secs: None,
env: None,
}
}
#[test]
fn test_tool_name() {
let tool = BinaryPluginTool::new(
test_tool_def(),
"test-plugin",
PathBuf::from("/bin/echo"),
30,
);
assert_eq!(tool.name(), "my_tool");
}
#[test]
fn test_tool_description() {
let tool = BinaryPluginTool::new(
test_tool_def(),
"test-plugin",
PathBuf::from("/bin/echo"),
30,
);
assert_eq!(tool.description(), "My test tool");
}
#[test]
fn test_tool_parameters() {
let tool = BinaryPluginTool::new(
test_tool_def(),
"test-plugin",
PathBuf::from("/bin/echo"),
30,
);
let params = tool.parameters();
assert_eq!(params["type"], "object");
assert!(params["properties"]["x"].is_object());
}
#[cfg(unix)]
fn create_test_script(content: &str) -> (tempfile::TempDir, PathBuf) {
use std::io::Write;
use std::os::unix::fs::PermissionsExt;
let dir = tempfile::TempDir::new().unwrap();
let script_path = dir.path().join("plugin.sh");
{
let mut f = std::fs::File::create(&script_path).unwrap();
f.write_all(format!("#!/bin/sh\n{}", content).as_bytes())
.unwrap();
f.sync_all().unwrap();
}
std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)).unwrap();
(dir, script_path)
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_success() {
let (_dir, script_path) = create_test_script(
r#"read input
echo '{"jsonrpc":"2.0","result":{"output":"hello world"},"id":1}'"#,
);
let tool = BinaryPluginTool::new(test_tool_def(), "test-plugin", script_path, 30);
let ctx = ToolContext::new();
let result = tool.execute(json!({"x": "test"}), &ctx).await;
assert!(result.is_ok(), "Expected Ok, got: {:?}", result);
assert_eq!(result.unwrap().for_llm, "hello world");
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_error_response() {
let (_dir, script_path) = create_test_script(
r#"read input
echo '{"jsonrpc":"2.0","error":{"code":-1,"message":"something broke"},"id":1}'"#,
);
let tool = BinaryPluginTool::new(test_tool_def(), "test-plugin", script_path, 30);
let ctx = ToolContext::new();
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("something broke"), "err was: {}", err);
assert!(err.contains("code -1"), "err was: {}", err);
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_non_zero_exit() {
let (_dir, script_path) = create_test_script("cat > /dev/null\nexit 1");
let tool = BinaryPluginTool::new(test_tool_def(), "test-plugin", script_path, 30);
let ctx = ToolContext::new();
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("exited with code 1"), "err was: {}", err);
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_timeout() {
let (_dir, script_path) = create_test_script("sleep 10");
let tool = BinaryPluginTool::new(
test_tool_def(),
"test-plugin",
script_path,
1, );
let ctx = ToolContext::new();
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("timed out"), "err was: {}", err);
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_malformed_json() {
let (_dir, script_path) = create_test_script("cat > /dev/null\necho 'not json at all'");
let tool = BinaryPluginTool::new(test_tool_def(), "test-plugin", script_path, 30);
let ctx = ToolContext::new();
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("invalid JSON-RPC"), "err was: {}", err);
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_empty_stdout() {
let (_dir, script_path) = create_test_script("cat > /dev/null\n# produces nothing");
let tool = BinaryPluginTool::new(test_tool_def(), "test-plugin", script_path, 30);
let ctx = ToolContext::new();
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("no output"), "err was: {}", err);
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_spawn_failure() {
let tool = BinaryPluginTool::new(
test_tool_def(),
"test-plugin",
PathBuf::from("/nonexistent/binary"),
30,
);
let ctx = ToolContext::new();
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("Failed to spawn") || err.contains("No such file"),
"err was: {}",
err
);
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_with_args() {
let (_dir, script_path) = create_test_script(
r#"read input
if echo "$input" | grep -q '"x"'; then
echo '{"jsonrpc":"2.0","result":{"output":"args received"},"id":1}'
else
echo '{"jsonrpc":"2.0","error":{"code":-1,"message":"no args"},"id":1}'
fi"#,
);
let tool = BinaryPluginTool::new(test_tool_def(), "test-plugin", script_path, 30);
let ctx = ToolContext::new();
let result = tool.execute(json!({"x": "hello"}), &ctx).await;
assert!(result.is_ok(), "Expected Ok, got: {:?}", result);
assert_eq!(result.unwrap().for_llm, "args received");
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_with_workspace() {
let workspace = tempfile::TempDir::new().unwrap();
let (_dir, script_path) = create_test_script(
r#"read input
cwd=$(pwd)
echo "{\"jsonrpc\":\"2.0\",\"result\":{\"output\":\"cwd: $cwd\"},\"id\":1}""#,
);
let tool = BinaryPluginTool::new(test_tool_def(), "test-plugin", script_path, 30);
let ctx = ToolContext::new().with_workspace(workspace.path().to_str().unwrap());
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_ok(), "Expected Ok, got: {:?}", result);
let output = result.unwrap().for_llm;
assert!(output.contains("cwd:"), "output was: {}", output);
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_with_env() {
let (_dir, script_path) = create_test_script(
r#"read input
echo "{\"jsonrpc\":\"2.0\",\"result\":{\"output\":\"FOO=$MY_TEST_VAR\"},\"id\":1}""#,
);
let mut env = std::collections::HashMap::new();
env.insert("MY_TEST_VAR".to_string(), "bar_value".to_string());
let mut def = test_tool_def();
def.env = Some(env);
let tool = BinaryPluginTool::new(def, "test-plugin", script_path, 30);
let ctx = ToolContext::new();
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_ok(), "Expected Ok, got: {:?}", result);
assert_eq!(result.unwrap().for_llm, "FOO=bar_value");
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_stderr_on_failure() {
let (_dir, script_path) = create_test_script(
r#"cat > /dev/null
echo "error details" >&2
exit 1"#,
);
let tool = BinaryPluginTool::new(test_tool_def(), "test-plugin", script_path, 30);
let ctx = ToolContext::new();
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("error details"), "err was: {}", err);
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_binary_not_executable() {
if is_root() {
eprintln!("skipping test_execute_binary_not_executable: running as root");
return;
}
let dir = tempfile::TempDir::new().unwrap();
let script_path = dir.path().join("plugin.sh");
std::fs::write(&script_path, "#!/bin/sh\necho ok").unwrap();
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o644)).unwrap();
let tool = BinaryPluginTool::new(test_tool_def(), "test-plugin", script_path, 30);
let ctx = ToolContext::new();
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("Failed to spawn"), "err was: {}", err);
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_large_output() {
let (_dir, script_path) = create_test_script(
r#"read input
large=$(python3 -c "print('x' * 10000)" 2>/dev/null || printf 'x%.0s' $(seq 1 10000))
echo "{\"jsonrpc\":\"2.0\",\"result\":{\"output\":\"$large\"},\"id\":1}""#,
);
let tool = BinaryPluginTool::new(test_tool_def(), "test-plugin", script_path, 30);
let ctx = ToolContext::new();
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_ok(), "Expected Ok, got: {:?}", result);
assert!(result.unwrap().for_llm.len() >= 10000);
}
#[cfg(unix)]
#[tokio::test]
async fn test_execute_extra_stdout_ignored() {
let (_dir, script_path) = create_test_script(
r#"read input
echo "debug: starting up"
echo "debug: processing"
echo '{"jsonrpc":"2.0","result":{"output":"final answer"},"id":1}'"#,
);
let tool = BinaryPluginTool::new(test_tool_def(), "test-plugin", script_path, 30);
let ctx = ToolContext::new();
let result = tool.execute(json!({}), &ctx).await;
assert!(result.is_ok(), "Expected Ok, got: {:?}", result);
assert_eq!(result.unwrap().for_llm, "final answer");
}
}