use crate::mcp::McpSession;
use serde_json::json;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Captured result of a command run through a [`Substrate`].
///
/// Both substrates in this file produce it from `run_command`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandOutput {
/// Text captured from the command's standard output.
pub stdout: String,
/// Text captured from the command's standard error.
pub stderr: String,
/// Exit status reported for the command. The local substrate stores `-1`
/// when the process status carries no exit code.
pub exit_code: i32,
}
/// An execution environment that can run shell commands and read or write files.
///
/// Every fallible operation reports failure as a human-readable `String`.
/// The PTY methods are optional: their default bodies all fail with the same
/// "not supported" error, so an implementation only overrides them when it
/// actually offers interactive terminals.
#[async_trait::async_trait]
pub trait Substrate: Send + Sync {
    /// Short identifier for this substrate.
    fn name(&self) -> &str;

    /// Runs `cmd` and returns its captured output.
    ///
    /// `timeout_s` is an optional limit in seconds; how it is enforced is left
    /// to the implementation.
    async fn run_command(
        &self,
        cmd: &str,
        timeout_s: Option<f64>,
    ) -> Result<CommandOutput, String>;

    /// Reads the file at `path` as text.
    async fn read_text(&self, path: &str) -> Result<String, String>;

    /// Writes `content` to the file at `path`.
    async fn write_text(&self, path: &str, content: &str) -> Result<(), String>;

    /// Reads raw bytes from `path`, optionally restricted to the window that
    /// starts at `offset` and spans at most `len` bytes.
    async fn read_bytes(
        &self,
        path: &str,
        offset: Option<u64>,
        len: Option<u64>,
    ) -> Result<Vec<u8>, String>;

    /// Writes raw `bytes` to the file at `path`.
    async fn write_bytes(&self, path: &str, bytes: &[u8]) -> Result<(), String>;

    /// Whether this substrate is the local one. Defaults to `false`.
    fn is_local(&self) -> bool {
        false
    }

    /// Form of `path` to show to a user. Defaults to `path` unchanged.
    fn display_path(&self, path: &str) -> String {
        String::from(path)
    }

    /// Starts an interactive terminal running `cmd` and returns its handle.
    async fn pty_start(&self, _cmd: &str) -> Result<String, String> {
        pty_unsupported()
    }

    /// Sends `data` to the terminal identified by `id`.
    async fn pty_input(&self, _id: &str, _data: &str) -> Result<(), String> {
        pty_unsupported()
    }

    /// Reads output from the terminal identified by `id`.
    async fn pty_read(&self, _id: &str) -> Result<String, String> {
        pty_unsupported()
    }

    /// Resizes the terminal identified by `id` to `rows` x `cols`.
    async fn pty_resize(&self, _id: &str, _rows: u16, _cols: u16) -> Result<(), String> {
        pty_unsupported()
    }

    /// Terminates the terminal identified by `id`.
    async fn pty_kill(&self, _id: &str) -> Result<(), String> {
        pty_unsupported()
    }
}

/// The single error returned by every default PTY method of [`Substrate`].
fn pty_unsupported<T>() -> Result<T, String> {
    Err(String::from("pty not supported by this substrate"))
}
/// Substrate that operates directly on the machine running this process.
///
/// The type is a stateless unit struct.
#[derive(Debug, Default, Clone)]
pub struct LocalSubstrate;

impl LocalSubstrate {
    /// Creates a local substrate.
    pub fn new() -> Self {
        LocalSubstrate
    }

    /// Turns `path` into an absolute path.
    ///
    /// An absolute `path` is returned as-is; a relative one is joined onto the
    /// process's current working directory. The result is neither
    /// canonicalised nor checked for existence.
    ///
    /// # Errors
    ///
    /// Fails only for a relative `path` when the current working directory
    /// cannot be determined.
    pub fn resolve_path(path: &str) -> Result<PathBuf, String> {
        let requested = PathBuf::from(path);
        if requested.is_absolute() {
            return Ok(requested);
        }
        match std::env::current_dir() {
            Ok(cwd) => Ok(cwd.join(requested)),
            Err(e) => Err(format!("failed to resolve working directory: {e}")),
        }
    }
}
/// [`Substrate`] backed by the host's shell and filesystem.
///
/// File operations call blocking `std::fs` functions from inside `async fn`s.
/// PTY methods are not overridden, so they report "not supported".
#[async_trait::async_trait]
impl Substrate for LocalSubstrate {
    fn name(&self) -> &str {
        "local"
    }

    fn is_local(&self) -> bool {
        true
    }

    /// Runs `cmd` through the platform shell (`cmd /C` on Windows, `sh -c`
    /// elsewhere) and captures its output.
    ///
    /// `timeout_s` is enforced only when it is a positive number of seconds
    /// that fits in a `Duration`. `None`, zero, negative, NaN, infinite or
    /// oversized values mean "no timeout". When the limit expires the child
    /// process is killed and an error is returned.
    ///
    /// Output is decoded lossily as UTF-8. `exit_code` is `-1` when the
    /// process status carries no exit code.
    ///
    /// # Errors
    ///
    /// Returns an error if the shell cannot be spawned or waited on, or if the
    /// timeout elapses.
    async fn run_command(
        &self,
        cmd: &str,
        timeout_s: Option<f64>,
    ) -> Result<CommandOutput, String> {
        use tokio::process::Command;

        let mut command = if cfg!(target_os = "windows") {
            let mut c = Command::new("cmd");
            c.arg("/C").arg(cmd);
            c
        } else {
            let mut c = Command::new("sh");
            c.arg("-c").arg(cmd);
            c
        };
        // Without this, dropping the `output()` future on timeout (or caller
        // cancellation) would leave the child running in the background.
        command.kill_on_drop(true);

        // `try_from_secs_f64` rejects values `from_secs_f64` would panic on
        // (infinity, anything beyond `Duration::MAX`); those fall back to
        // "no timeout", which is what such a limit means in practice.
        let limit = timeout_s.filter(|secs| *secs > 0.0).and_then(|secs| {
            std::time::Duration::try_from_secs_f64(secs)
                .ok()
                .map(|dur| (secs, dur))
        });

        let finished = match limit {
            Some((secs, dur)) => match tokio::time::timeout(dur, command.output()).await {
                Ok(result) => result,
                Err(_) => return Err(format!("command timed out after {secs}s")),
            },
            None => command.output().await,
        };
        let output = finished.map_err(|e| format!("failed to run command: {e}"))?;

        Ok(CommandOutput {
            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
            exit_code: output.status.code().unwrap_or(-1),
        })
    }

    /// Reads the whole file at `path` (resolved via `resolve_path`) as UTF-8.
    async fn read_text(&self, path: &str) -> Result<String, String> {
        let full = Self::resolve_path(path)?;
        std::fs::read_to_string(&full)
            .map_err(|e| format!("failed to read file '{}': {e}", full.display()))
    }

    /// Writes `content` to `path`, creating missing parent directories.
    async fn write_text(&self, path: &str, content: &str) -> Result<(), String> {
        // Text is written verbatim as its UTF-8 bytes, so the byte path
        // (same directory creation, same error messages) covers it.
        self.write_bytes(path, content.as_bytes()).await
    }

    /// Returns the bytes of `path` in the window `[offset, offset + len)`.
    ///
    /// The whole file is read into memory and then sliced. A missing `offset`
    /// means 0 and a missing `len` means "to end of file". The window is
    /// clamped to the file, so an offset at or past the end yields an empty
    /// vector and an oversized `len` simply reads to the end.
    async fn read_bytes(
        &self,
        path: &str,
        offset: Option<u64>,
        len: Option<u64>,
    ) -> Result<Vec<u8>, String> {
        let full = Self::resolve_path(path)?;
        let bytes = std::fs::read(&full)
            .map_err(|e| format!("failed to read file '{}': {e}", full.display()))?;

        // Values that do not fit in `usize` are necessarily beyond any
        // in-memory buffer, so saturating them is exact after clamping.
        let start = usize::try_from(offset.unwrap_or(0)).unwrap_or(usize::MAX);
        if start >= bytes.len() {
            return Ok(Vec::new());
        }
        let wanted = len.map_or(usize::MAX, |l| usize::try_from(l).unwrap_or(usize::MAX));
        // Saturating add: `start + len` may exceed `usize::MAX` (e.g. a
        // caller passing `u64::MAX` as "read everything").
        let end = start.saturating_add(wanted).min(bytes.len());
        Ok(bytes[start..end].to_vec())
    }

    /// Writes `bytes` to `path`, creating missing parent directories and
    /// replacing any existing file content.
    async fn write_bytes(&self, path: &str, bytes: &[u8]) -> Result<(), String> {
        let full = Self::resolve_path(path)?;
        if let Some(parent) = full.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|e| format!("failed to create parent dir '{}': {e}", parent.display()))?;
        }
        std::fs::write(&full, bytes)
            .map_err(|e| format!("failed to write file '{}': {e}", full.display()))
    }

    /// Shows the resolved absolute path, falling back to `path` unchanged when
    /// it cannot be resolved.
    fn display_path(&self, path: &str) -> String {
        match Self::resolve_path(path) {
            Ok(p) => p.display().to_string(),
            Err(_) => path.to_string(),
        }
    }
}
/// Substrate that forwards every operation to an MCP session as a tool call.
pub struct McpSubstrate {
    /// Session shared behind an async mutex; locked for each tool call.
    session: Arc<Mutex<dyn McpSession>>,
    /// Value returned by `Substrate::name` for this substrate.
    name: String,
}

impl McpSubstrate {
    /// Wraps `session` as a substrate identified by `name`.
    pub fn new(session: Arc<Mutex<dyn McpSession>>, name: impl Into<String>) -> Self {
        let name = name.into();
        Self { session, name }
    }

    /// Locks the session and invokes `tool` with `args`, returning the
    /// session's result unchanged. The lock is held until the call completes.
    async fn call(&self, tool: &str, args: serde_json::Value) -> Result<serde_json::Value, String> {
        self.session.lock().await.call_tool(tool, args).await
    }
}
#[async_trait::async_trait]
impl Substrate for McpSubstrate {
fn name(&self) -> &str {
&self.name
}
async fn run_command(
&self,
cmd: &str,
timeout_s: Option<f64>,
) -> Result<CommandOutput, String> {
let mut args = json!({ "command": cmd });
if let Some(secs) = timeout_s {
args["timeout"] = json!(secs);
}
let result = self.call("run_command", args).await?;
match &result {
serde_json::Value::String(s) => Ok(CommandOutput {
stdout: s.clone(),
stderr: String::new(),
exit_code: 0,
}),
serde_json::Value::Object(_) => Ok(CommandOutput {
stdout: result
.get("stdout")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
stderr: result
.get("stderr")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
exit_code: result
.get("exit_code")
.and_then(|v| v.as_i64())
.unwrap_or(0) as i32,
}),
other => Ok(CommandOutput {
stdout: other.to_string(),
stderr: String::new(),
exit_code: 0,
}),
}
}
async fn read_text(&self, path: &str) -> Result<String, String> {
let result = self.call("read_text", json!({ "path": path })).await?;
match result {
serde_json::Value::String(s) => Ok(s),
other => Ok(other
.get("content")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| other.to_string())),
}
}
async fn write_text(&self, path: &str, content: &str) -> Result<(), String> {
self.call("write_text", json!({ "path": path, "content": content }))
.await
.map(|_| ())
}
async fn read_bytes(
&self,
path: &str,
offset: Option<u64>,
len: Option<u64>,
) -> Result<Vec<u8>, String> {
let mut args = json!({ "path": path });
if let Some(o) = offset {
args["offset"] = json!(o);
}
if let Some(l) = len {
args["length"] = json!(l);
}
let result = self.call("read_bytes", args).await?;
let b64 = match &result {
serde_json::Value::String(s) => s.clone(),
other => other
.get("data")
.or_else(|| other.get("base64"))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.ok_or_else(|| "read_bytes: no base64 data in response".to_string())?,
};
base64_decode(b64.trim()).map_err(|e| format!("read_bytes: invalid base64: {e}"))
}
async fn write_bytes(&self, path: &str, bytes: &[u8]) -> Result<(), String> {
let b64 = base64_encode(bytes);
self.call("write_bytes", json!({ "path": path, "content_b64": b64 }))
.await
.map(|_| ())
}
async fn pty_start(&self, cmd: &str) -> Result<String, String> {
let result = self.call("pty_start", json!({ "command": cmd })).await?;
if let Some(pid) = result.get("pid").and_then(|v| v.as_i64()) {
return Ok(pid.to_string());
}
let text = match &result {
serde_json::Value::String(s) => s.clone(),
other => other.to_string(),
};
text.lines()
.find_map(|l| l.trim_start().strip_prefix("pid:").map(|r| r.trim().to_string()))
.filter(|p| !p.is_empty() && p.bytes().all(|b| b.is_ascii_digit()))
.ok_or_else(|| format!("pty_start: no pid in response: {text}"))
}
async fn pty_input(&self, id: &str, data: &str) -> Result<(), String> {
self.call("pty_input", json!({ "pid": pty_pid(id)?, "data": data }))
.await
.map(|_| ())
}
async fn pty_read(&self, id: &str) -> Result<String, String> {
let result = self.call("pty_read", json!({ "pid": pty_pid(id)? })).await?;
match result {
serde_json::Value::String(s) => Ok(s),
other => Ok(other
.get("data")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| other.to_string())),
}
}
async fn pty_resize(&self, id: &str, rows: u16, cols: u16) -> Result<(), String> {
self.call(
"pty_resize",
json!({ "pid": pty_pid(id)?, "rows": rows, "cols": cols }),
)
.await
.map(|_| ())
}
async fn pty_kill(&self, id: &str) -> Result<(), String> {
self.call("pty_kill", json!({ "pid": pty_pid(id)? }))
.await
.map(|_| ())
}
}
/// Parses a PTY handle string into the numeric pid sent over the wire.
///
/// Surrounding whitespace is ignored. Anything that does not parse as an
/// `i64` is rejected with an error that quotes the original `id`.
fn pty_pid(id: &str) -> Result<i64, String> {
    match id.trim().parse::<i64>() {
        Ok(pid) => Ok(pid),
        Err(_) => Err(format!("invalid pty id (expected numeric pid): {id:?}")),
    }
}
/// Standard base64 alphabet (`A-Z`, `a-z`, `0-9`, `+`, `/`), indexed by 6-bit value.
const B64_ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encodes `bytes` as base64 text using [`B64_ALPHABET`], padded with `=` so
/// the output length is always a multiple of four.
fn base64_encode(bytes: &[u8]) -> String {
    let mut encoded = String::with_capacity(bytes.len().div_ceil(3) * 4);
    for group in bytes.chunks(3) {
        // Pack the one to three input bytes big-endian, left-aligned in a
        // 24-bit word; absent bytes contribute zero bits.
        let packed = group
            .iter()
            .fold(0u32, |acc, &byte| (acc << 8) | u32::from(byte))
            << (8 * (3 - group.len()));

        // Output slot `k` carries real data only when at least `k` input
        // bytes exist (slots 0 and 1 always do); the rest is padding.
        for (slot, shift) in [18u32, 12, 6, 0].iter().enumerate() {
            if slot <= group.len() {
                let index = ((packed >> *shift) & 0x3f) as usize;
                encoded.push(char::from(B64_ALPHABET[index]));
            } else {
                encoded.push('=');
            }
        }
    }
    encoded
}
/// Decodes standard-alphabet base64 text into bytes.
///
/// The decoder is lenient about layout: ASCII whitespace anywhere is ignored,
/// and trailing `=` padding is optional (any number of trailing `=` is
/// stripped). Unused low bits of a final partial group are discarded.
///
/// # Errors
///
/// * A character outside the alphabet, including `=` anywhere but the end.
///   Padding in the middle means concatenated or corrupted input, which
///   cannot be decoded by simply dropping the `=`.
/// * A final group consisting of a single symbol. Six bits cannot encode a
///   byte, so the input must have been truncated.
fn base64_decode(s: &str) -> Result<Vec<u8>, String> {
    /// Maps one alphabet character to its 6-bit value.
    fn val(c: u8) -> Result<u32, String> {
        match c {
            b'A'..=b'Z' => Ok((c - b'A') as u32),
            b'a'..=b'z' => Ok((c - b'a' + 26) as u32),
            b'0'..=b'9' => Ok((c - b'0' + 52) as u32),
            b'+' => Ok(62),
            b'/' => Ok(63),
            _ => Err(format!("invalid base64 char {:?}", c as char)),
        }
    }

    let mut symbols: Vec<u8> = s.bytes().filter(|b| !b.is_ascii_whitespace()).collect();
    // Padding is only meaningful at the very end; any `=` left after this
    // is rejected by `val` as an invalid character.
    while symbols.last() == Some(&b'=') {
        symbols.pop();
    }

    let mut out = Vec::with_capacity(symbols.len() * 3 / 4);
    for quad in symbols.chunks(4) {
        let mut group = 0u32;
        for &c in quad {
            group = (group << 6) | val(c)?;
        }
        if quad.len() == 1 {
            return Err(format!(
                "truncated base64 input: {} symbols leave a dangling 6-bit group",
                symbols.len()
            ));
        }
        // Left-align a short final group inside the 24-bit word so the
        // decoded bytes always sit at the top.
        group <<= 6 * (4 - quad.len());
        // 4 symbols -> 3 bytes, 3 -> 2, 2 -> 1.
        let produced = quad.len() * 6 / 8;
        // `to_be_bytes` yields [0, b0, b1, b2] for a 24-bit value.
        out.extend_from_slice(&group.to_be_bytes()[1..1 + produced]);
    }
    Ok(out)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mcp::McpToolInfo;
    use serde_json::Value;
    use std::collections::HashMap;
    use std::sync::Mutex as StdMutex;

    /// Shared log of `(tool name, arguments)` pairs recorded by [`MockSession`].
    type CallLog = Arc<StdMutex<Vec<(String, Value)>>>;

    /// Encoding then decoding must return the input; also pins one known vector.
    #[test]
    fn base64_roundtrip() {
        let samples = [&b""[..], b"f", b"fo", b"foo", b"foob", b"fooba", b"foobar", &[0u8, 1, 2, 255]];
        for case in samples {
            let enc = base64_encode(case);
            let dec = base64_decode(&enc).unwrap();
            assert_eq!(dec, case, "roundtrip failed for {case:?} (enc={enc})");
        }
        assert_eq!(base64_encode(b"foobar"), "Zm9vYmFy");
        assert_eq!(base64_decode("Zm9vYmFy").unwrap(), b"foobar");
    }

    /// Text and byte reads/writes on the local substrate agree with each other.
    #[tokio::test]
    async fn local_read_write_roundtrip() {
        let scratch = std::env::temp_dir().join(format!("car-substrate-{}", uuid_like()));
        std::fs::create_dir_all(&scratch).unwrap();
        let file = scratch.join("hello.txt").to_string_lossy().to_string();

        let local = LocalSubstrate::new();
        assert_eq!(local.name(), "local");

        local.write_text(&file, "hello world").await.unwrap();
        assert_eq!(local.read_text(&file).await.unwrap(), "hello world");
        assert_eq!(local.read_bytes(&file, None, None).await.unwrap(), b"hello world");

        let window = local.read_bytes(&file, Some(6), Some(5)).await.unwrap();
        assert_eq!(window, b"world");

        local.write_bytes(&file, b"\x00\x01\x02").await.unwrap();
        let raw = local.read_bytes(&file, None, None).await.unwrap();
        assert_eq!(raw, vec![0u8, 1, 2]);

        // Best-effort cleanup; a failure here must not fail the test.
        std::fs::remove_dir_all(&scratch).ok();
    }

    #[tokio::test]
    async fn local_run_command_captures_output() {
        let result = LocalSubstrate::new()
            .run_command("echo hi", None)
            .await
            .unwrap();
        assert_eq!(result.stdout.trim(), "hi");
        assert_eq!(result.exit_code, 0);
    }

    /// The local substrate does not override the PTY defaults.
    #[tokio::test]
    async fn local_pty_defaults_to_unsupported() {
        let outcome = LocalSubstrate::new().pty_start("sh").await;
        assert!(outcome.is_err());
    }

    /// In-memory MCP session: records every call and answers with canned data.
    struct MockSession {
        name: String,
        calls: CallLog,
        files: Arc<StdMutex<HashMap<String, String>>>,
    }

    /// Canned acknowledgement used by tools whose reply carries no payload.
    fn ack() -> Result<Value, String> {
        Ok(Value::String("ok".into()))
    }

    #[async_trait::async_trait]
    impl McpSession for MockSession {
        async fn list_tools(&mut self) -> Result<Vec<McpToolInfo>, String> {
            Ok(Vec::new())
        }

        async fn call_tool(&mut self, name: &str, arguments: Value) -> Result<Value, String> {
            self.calls
                .lock()
                .unwrap()
                .push((name.to_string(), arguments.clone()));

            match name {
                "write_text" => {
                    let path = arguments["path"].as_str().unwrap().to_string();
                    let content = arguments["content"].as_str().unwrap().to_string();
                    self.files.lock().unwrap().insert(path, content);
                    ack()
                }
                "read_text" => {
                    let path = arguments["path"].as_str().unwrap();
                    let stored = self.files.lock().unwrap().get(path).cloned();
                    match stored {
                        Some(content) => Ok(Value::String(content)),
                        None => Err("not found".to_string()),
                    }
                }
                "run_command" => Ok(json!({
                    "stdout": "from-vm",
                    "stderr": "",
                    "exit_code": 0
                })),
                "read_bytes" => Ok(Value::String("aGk=".into())),
                "write_bytes" => ack(),
                "pty_start" => Ok(Value::String("pid: 4242\ncols: 80\nrows: 24".into())),
                "pty_read" => Ok(Value::String("pty-out".into())),
                "pty_input" | "pty_resize" | "pty_kill" => ack(),
                _ => Err(format!("unknown tool {name}")),
            }
        }

        fn name(&self) -> &str {
            &self.name
        }
    }

    /// Builds an [`McpSubstrate`] named "vm" over a fresh [`MockSession`] and
    /// returns it together with the session's call log.
    fn mock_substrate() -> (McpSubstrate, CallLog) {
        let calls: CallLog = Arc::new(StdMutex::new(Vec::new()));
        let session: Arc<Mutex<dyn McpSession>> = Arc::new(Mutex::new(MockSession {
            name: "vm".into(),
            calls: Arc::clone(&calls),
            files: Arc::new(StdMutex::new(HashMap::new())),
        }));
        (McpSubstrate::new(session, "vm"), calls)
    }

    /// Each substrate method reaches the session, in call order.
    #[tokio::test]
    async fn mcp_substrate_routes_to_session() {
        let (sub, calls) = mock_substrate();
        assert_eq!(sub.name(), "vm");

        sub.write_text("/tmp/a.txt", "vm-content").await.unwrap();
        let read = sub.read_text("/tmp/a.txt").await.unwrap();
        assert_eq!(read, "vm-content");

        let out = sub.run_command("ls", None).await.unwrap();
        assert_eq!(out.stdout, "from-vm");
        assert_eq!(out.exit_code, 0);

        let recorded = calls.lock().unwrap();
        let names: Vec<&str> = recorded.iter().map(|(tool, _)| tool.as_str()).collect();
        assert_eq!(names, vec!["write_text", "read_text", "run_command"]);
    }

    /// Pins the exact JSON argument shape sent for every tool.
    #[tokio::test]
    async fn mcp_substrate_wire_args_match_canonical_bridge() {
        let (sub, calls) = mock_substrate();

        sub.run_command("ls", Some(5.0)).await.unwrap();
        let bytes = sub.read_bytes("/f", Some(2), Some(4)).await.unwrap();
        assert_eq!(bytes, b"hi");
        sub.write_bytes("/f", b"hi").await.unwrap();

        let pid = sub.pty_start("bash").await.unwrap();
        assert_eq!(pid, "4242");
        sub.pty_input(&pid, "echo\n").await.unwrap();
        let out = sub.pty_read(&pid).await.unwrap();
        assert_eq!(out, "pty-out");
        sub.pty_resize(&pid, 40, 100).await.unwrap();
        sub.pty_kill(&pid).await.unwrap();

        let recorded = calls.lock().unwrap();
        // Arguments of the first recorded call to `tool`.
        let by_tool = |tool: &str| -> Value {
            recorded
                .iter()
                .find(|(name, _)| name.as_str() == tool)
                .unwrap()
                .1
                .clone()
        };

        assert_eq!(by_tool("run_command"), json!({"command": "ls", "timeout": 5.0}));
        assert_eq!(by_tool("read_bytes"), json!({"path": "/f", "offset": 2, "length": 4}));

        let wb = by_tool("write_bytes");
        assert!(wb.get("content_b64").is_some(), "write_bytes must use content_b64: {wb}");
        assert!(wb.get("data").is_none(), "write_bytes must not use legacy `data`: {wb}");
        assert_eq!(wb["path"], json!("/f"));

        assert_eq!(by_tool("pty_start"), json!({"command": "bash"}));
        assert_eq!(by_tool("pty_input"), json!({"pid": 4242, "data": "echo\n"}));
        assert_eq!(by_tool("pty_read"), json!({"pid": 4242}));
        assert_eq!(by_tool("pty_resize"), json!({"pid": 4242, "rows": 40, "cols": 100}));
        assert_eq!(by_tool("pty_kill"), json!({"pid": 4242}));
    }

    /// Directory-name suffix derived from the current time in nanoseconds
    /// since the Unix epoch.
    fn uuid_like() -> String {
        use std::time::{SystemTime, UNIX_EPOCH};
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos()
            .to_string()
    }
}