#![allow(clippy::too_many_lines)]
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use serde_json::{json, Value};
use tempfile::TempDir;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, ChildStdout};
use tokio::time::timeout;
/// Upper bound on how long `StdioChild::recv` waits for one response line.
pub(crate) const RESPONSE_DEADLINE: Duration = Duration::from_secs(60);
/// Upper bound on how long `StdioChild::close` waits for the stdio child to exit.
pub(crate) const EXIT_DEADLINE: Duration = Duration::from_secs(15);
/// Pause between successive checks for the daemon's readiness file in `StdioChild::spawn`.
const POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Maximum time `StdioChild::spawn` waits for the daemon's readiness file to appear.
const DAEMON_BOOT_TIMEOUT: Duration = Duration::from_secs(30);
/// Returns the path of the `trusty-memory` executable under test.
///
/// The path is baked in at compile time from the `CARGO_BIN_EXE_trusty-memory`
/// environment variable.
pub(crate) fn binary() -> PathBuf {
    let exe_path: &'static str = env!("CARGO_BIN_EXE_trusty-memory");
    exe_path.into()
}
/// Kills and reaps the background daemon when dropped.
///
/// `std::process::Child` does not terminate its process on drop, so without
/// this guard any panic (readiness timeout, response timeout, failed test
/// assertion) would leave the daemon running after the test finished.
struct DaemonGuard(std::process::Child);

impl Drop for DaemonGuard {
    fn drop(&mut self) {
        // Best effort: the daemon may already have exited on its own.
        let _ = self.0.kill();
        let _ = self.0.wait();
    }
}

/// Test harness owning a `serve --foreground --http` daemon plus a
/// `serve --stdio` child whose stdin/stdout are piped to the test.
///
/// Both processes share one temporary data directory. Fields drop in
/// declaration order, so the processes are torn down before the directory.
pub(crate) struct StdioChild {
    /// The `serve --stdio` process (the "bridge" in this file's messages).
    pub(crate) child: Child,
    /// Write end of the bridge's stdin; dropping it signals EOF.
    pub(crate) stdin: ChildStdin,
    /// Buffered reader over the bridge's stdout.
    pub(crate) reader: BufReader<ChildStdout>,
    /// Background daemon; killed and reaped when this value is dropped.
    daemon: DaemonGuard,
    /// Held so the temporary data directory outlives both processes.
    _data_dir: TempDir,
}

impl StdioChild {
    /// Starts the daemon, waits for its readiness file, then starts the
    /// `serve --stdio` child against the same data directory.
    ///
    /// `palace`, when given, is forwarded to the stdio child as `--palace <name>`.
    ///
    /// # Panics
    ///
    /// Panics if either process cannot be spawned, if the daemon exits before
    /// writing `http_addr`, or if the file does not appear within
    /// `DAEMON_BOOT_TIMEOUT`. The daemon is killed in every one of these cases.
    pub(crate) async fn spawn(palace: Option<&str>) -> Self {
        let data_dir = tempfile::tempdir().expect("tempdir");
        // Wrap the daemon immediately so every later panic path cleans it up.
        let mut daemon = DaemonGuard(
            std::process::Command::new(binary())
                .arg("serve")
                .arg("--foreground")
                .arg("--http")
                .arg("127.0.0.1:0")
                .env("TRUSTY_DATA_DIR_OVERRIDE", data_dir.path())
                .env("TRUSTY_SKIP_PALACE_ENFORCEMENT", "1")
                .env("RUST_LOG", "warn")
                .stdin(Stdio::null())
                .stdout(Stdio::null())
                .stderr(Stdio::inherit())
                .spawn()
                .expect("spawn daemon"),
        );

        let readiness_file = data_dir.path().join("trusty-memory").join("http_addr");
        let deadline = std::time::Instant::now() + DAEMON_BOOT_TIMEOUT;
        while !readiness_file.exists() {
            // Fail fast when the daemon died during start-up instead of
            // waiting out the whole boot timeout.
            if let Some(status) = daemon.0.try_wait().expect("poll daemon exit status") {
                panic!(
                    "daemon exited before writing http_addr ({status}); expected at {}",
                    readiness_file.display()
                );
            }
            assert!(
                std::time::Instant::now() < deadline,
                "daemon did not write http_addr within {:?}; expected at {}",
                DAEMON_BOOT_TIMEOUT,
                readiness_file.display()
            );
            // Async sleep: a blocking sleep would stall the runtime thread.
            tokio::time::sleep(POLL_INTERVAL).await;
        }

        let mut cmd = tokio::process::Command::new(binary());
        cmd.arg("serve")
            .arg("--stdio")
            .env("TRUSTY_DATA_DIR_OVERRIDE", data_dir.path())
            .env("TRUSTY_SKIP_PALACE_ENFORCEMENT", "1")
            .env("RUST_LOG", "warn")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            // Do not orphan the bridge when a test panics before `close`.
            .kill_on_drop(true);
        if let Some(p) = palace {
            cmd.arg("--palace").arg(p);
        }
        let mut child = cmd.spawn().expect("spawn trusty-memory serve --stdio");
        let stdin = child.stdin.take().expect("stdin pipe");
        let stdout = child.stdout.take().expect("stdout pipe");
        Self {
            child,
            stdin,
            reader: BufReader::new(stdout),
            daemon,
            _data_dir: data_dir,
        }
    }

    /// Serialises `req` as a single newline-terminated JSON line, writes it to
    /// the child's stdin and flushes.
    ///
    /// # Panics
    ///
    /// Panics if serialisation, the write, or the flush fails.
    pub(crate) async fn send(&mut self, req: &Value) {
        let mut line = serde_json::to_string(req).expect("serialise request");
        // Append the delimiter so the whole frame goes out in one write.
        line.push('\n');
        self.stdin
            .write_all(line.as_bytes())
            .await
            .expect("write request");
        self.stdin.flush().await.expect("flush stdin");
    }

    /// Reads the next non-blank stdout line and parses it as JSON.
    ///
    /// # Panics
    ///
    /// Panics if no line arrives within `RESPONSE_DEADLINE`, if stdout reaches
    /// EOF first, if reading fails, or if the line is not valid JSON.
    pub(crate) async fn recv(&mut self) -> Value {
        let read_fut = async {
            // One buffer reused across blank lines.
            let mut line = String::new();
            loop {
                line.clear();
                let n = self
                    .reader
                    .read_line(&mut line)
                    .await
                    .expect("read response line");
                assert!(
                    n != 0,
                    "child exited without sending a response (EOF on stdout)"
                );
                let trimmed = line.trim();
                if !trimmed.is_empty() {
                    return trimmed.to_owned();
                }
            }
        };
        let raw = timeout(RESPONSE_DEADLINE, read_fut)
            .await
            .expect("response must arrive within deadline — server hung?");
        // Include the parse error and the offending line in the failure.
        serde_json::from_str(&raw)
            .unwrap_or_else(|err| panic!("response must be valid JSON ({err}); got: {raw}"))
    }

    /// Closes the child's stdin, waits up to `EXIT_DEADLINE` for it to exit,
    /// then kills and reaps the daemon.
    ///
    /// # Panics
    ///
    /// Panics if the child does not exit in time or waiting on it fails; the
    /// daemon is still killed during unwinding in that case.
    pub(crate) async fn close(mut self) {
        // Dropping stdin delivers EOF to the child.
        drop(self.stdin);
        timeout(EXIT_DEADLINE, self.child.wait())
            .await
            .expect("bridge child must exit after stdin EOF")
            .expect("bridge child wait");
        // Dropping the guard kills and reaps the daemon.
        drop(self.daemon);
    }
}
/// Runs the `initialize` handshake, sends the `notifications/initialized`
/// notification, then checks that `tools/list` answers with id 2 and a
/// non-empty tool array. Every read is bounded by `StdioChild::recv`.
#[tokio::test]
async fn stdio_serve_tools_list_bounded() {
    let mut bridge = StdioChild::spawn(None).await;

    // Step 1: initialize handshake.
    let initialize = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test", "version": "0.0.1"}
        }
    });
    bridge.send(&initialize).await;
    let init_resp = bridge.recv().await;
    assert!(
        init_resp["error"].is_null(),
        "initialize must succeed; got: {init_resp}"
    );
    assert_eq!(
        init_resp["result"]["protocolVersion"], "2024-11-05",
        "initialize must echo protocolVersion"
    );
    assert_eq!(init_resp["id"], 1, "response id must echo request id");

    // Step 2: the notification carries no id and must not yield a reply.
    let initialized = json!({
        "jsonrpc": "2.0",
        "method": "notifications/initialized"
    });
    bridge.send(&initialized).await;

    // Step 3: the next line on stdout must therefore answer tools/list.
    let list_tools = json!({
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/list"
    });
    bridge.send(&list_tools).await;
    let tools_resp = bridge.recv().await;
    assert!(
        tools_resp["error"].is_null(),
        "tools/list must succeed; got: {tools_resp}"
    );
    assert_eq!(
        tools_resp["id"], 2,
        "tools/list response id must be 2, not 1 (notification must not have produced a response)"
    );
    let advertised = tools_resp["result"]["tools"]
        .as_array()
        .expect("result.tools must be an array");
    assert!(
        !advertised.is_empty(),
        "tools/list must return at least one tool"
    );

    bridge.close().await;
}
#[tokio::test]
async fn stdio_serve_remember_and_recall_bounded() {
let mut child = StdioChild::spawn(Some("test-palace")).await;
child
.send(&json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name":"t","version":"0"}}
}))
.await;
let init_resp = child.recv().await;
assert!(
init_resp["error"].is_null(),
"initialize failed: {init_resp}"
);
child
.send(&json!({
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {
"name": "palace_create",
"arguments": {"name": "test-palace"}
}
}))
.await;
let create_resp = child.recv().await;
assert_eq!(create_resp["id"], 2, "palace_create response id must match");
child
.send(&json!({
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {
"name": "memory_remember",
"arguments": {
"palace": "test-palace",
"text": "The stdio MCP server never hangs"
}
}
}))
.await;
let remember_resp = child.recv().await;
assert_eq!(
remember_resp["id"], 3,
"memory_remember response id must match"
);
let is_ok_or_error = !remember_resp["result"].is_null() || !remember_resp["error"].is_null();
assert!(
is_ok_or_error,
"memory_remember must return a result or error; got: {remember_resp}"
);
child
.send(&json!({
"jsonrpc": "2.0",
"id": 4,
"method": "tools/call",
"params": {
"name": "memory_recall",
"arguments": {
"palace": "test-palace",
"query": "stdio server"
}
}
}))
.await;
let recall_resp = child.recv().await;
assert_eq!(recall_resp["id"], 4, "memory_recall response id must match");
let is_ok_or_error = !recall_resp["result"].is_null() || !recall_resp["error"].is_null();
assert!(
is_ok_or_error,
"memory_recall must return a result or error; got: {recall_resp}"
);
child.close().await;
}
/// Calls `memory_recall_all` with no palace configured and checks that a
/// response with the matching id and a result or an error comes back within
/// the deadline enforced by `StdioChild::recv`.
#[tokio::test]
async fn stdio_serve_recall_all_bounded() {
    let mut bridge = StdioChild::spawn(None).await;

    let initialize = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name":"t","version":"0"}}
    });
    bridge.send(&initialize).await;
    let init_resp = bridge.recv().await;
    assert!(
        init_resp["error"].is_null(),
        "initialize failed: {init_resp}"
    );

    let recall_all = json!({
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/call",
        "params": {
            "name": "memory_recall_all",
            "arguments": {"q": "never hang test"}
        }
    });
    bridge.send(&recall_all).await;
    let resp = bridge.recv().await;
    assert_eq!(resp["id"], 2, "memory_recall_all response id must match");
    // Either outcome is fine; the point is that the server answered at all.
    let answered = !(resp["result"].is_null() && resp["error"].is_null());
    assert!(
        answered,
        "memory_recall_all must return result or error within deadline; got: {resp}"
    );

    bridge.close().await;
}
/// Checks that the first non-blank line the stdio child prints after
/// `initialize` parses as JSON and carries `"jsonrpc": "2.0"`.
#[tokio::test]
async fn stdio_serve_stdout_is_only_json() {
    let mut bridge = StdioChild::spawn(None).await;

    let initialize = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name":"t","version":"0"}}
    });
    bridge.send(&initialize).await;

    // `recv` already panics on non-JSON output; here we check the envelope.
    let resp = bridge.recv().await;
    let is_jsonrpc_2 = matches!(resp["jsonrpc"].as_str(), Some("2.0"));
    assert!(
        is_jsonrpc_2,
        "first stdout line must be a JSON-RPC 2.0 response; got: {resp}"
    );

    bridge.close().await;
}