use anyhow::{anyhow, Context, Result};
use std::time::Duration;
use trusty_common::mcp::{self, DaemonBridgeConfig};
use crate::commands::daemon_guard::daemon_base_url;
/// Timeout used by [`build_rpc_client`] for both the overall request and the
/// connect phase of every HTTP call made to the daemon.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
/// Creates the `reqwest` client used to talk to the daemon.
///
/// Both the connect timeout and the total request timeout are set to
/// [`REQUEST_TIMEOUT`].
///
/// # Errors
///
/// Returns an error (with added context) if the underlying client cannot be
/// constructed.
pub(crate) fn build_rpc_client() -> Result<reqwest::Client> {
    let builder = reqwest::Client::builder()
        .connect_timeout(REQUEST_TIMEOUT)
        .timeout(REQUEST_TIMEOUT);

    builder
        .build()
        .context("build reqwest client for daemon-bridge")
}
/// Sends one JSON-RPC request to the daemon and returns its decoded reply.
///
/// The request is POSTed as JSON to `{base_url}/rpc`.
///
/// # Errors
///
/// * the HTTP request could not be sent,
/// * the daemon answered with a non-success HTTP status (the response body is
///   included in the error message on a best-effort basis),
/// * the response body could not be parsed as JSON.
pub(crate) async fn forward_rpc(
    client: &reqwest::Client,
    base_url: &str,
    req: serde_json::Value,
) -> Result<serde_json::Value> {
    let url = format!("{base_url}/rpc");
    let response = client
        .post(&url)
        .json(&req)
        .send()
        .await
        .with_context(|| format!("POST {url}: connection to daemon failed"))?;

    let status = response.status();
    if status.is_success() {
        return response
            .json::<serde_json::Value>()
            .await
            .context("deserialise JSON-RPC response from daemon");
    }

    // Failing to read the body must not hide the HTTP status, so fall back to
    // an empty string.
    let body = response.text().await.unwrap_or_default();
    Err(anyhow!(
        "daemon returned HTTP {status} for POST /rpc: {body}"
    ))
}
/// Assembles the [`DaemonBridgeConfig`] describing the `trusty-memory` daemon.
///
/// The configuration probes `/health`, resolves the base URL through
/// `daemon_base_url`, passes no spawn arguments and leaves both timing knobs
/// unset.
// NOTE(review): `no_spawn: true` presumably stops the bridge from launching the
// daemon itself — confirm against `DaemonBridgeConfig`.
fn build_bridge_config() -> DaemonBridgeConfig {
    DaemonBridgeConfig {
        service_name: String::from("trusty-memory"),
        health_path: String::from("/health"),
        base_url_fn: Box::new(daemon_base_url),
        spawn_args: Vec::new(),
        no_spawn: true,
        startup_timeout: None,
        poll_interval: None,
    }
}
pub(crate) async fn ensure_daemon_up_for_stdio() -> Result<String> {
let config = build_bridge_config();
trusty_common::mcp::ensure_daemon_up(&config).await
}
/// Reports whether `req` should be treated as a notification.
///
/// A request counts as a notification when it carries no `id`, or when its
/// method name starts with `notifications/` (even if an `id` is present).
fn is_notification(req: &mcp::Request) -> bool {
    if req.id.is_none() {
        return true;
    }
    req.method.starts_with("notifications/")
}
/// Runs the stdio bridge that forwards requests to the daemon.
///
/// The daemon is checked first via [`ensure_daemon_up_for_stdio`], then every
/// request handed over by `mcp::run_stdio_loop` is handled as follows:
///
/// * notifications (see [`is_notification`]) produce a suppressed response and
///   are not forwarded;
/// * all other requests are serialised, given the default `palace` when the
///   caller supplied none, and POSTed to the daemon via [`forward_rpc`];
/// * a transport failure is logged and turned into an `INTERNAL_ERROR`
///   response that echoes the id of the originating request.
///
/// # Errors
///
/// Fails if the daemon check fails, if the HTTP client cannot be built, or if
/// `mcp::run_stdio_loop` itself returns an error.
pub async fn run_stdio_bridge(palace: Option<String>) -> Result<()> {
    let base_url = ensure_daemon_up_for_stdio().await?;
    let default_palace = palace;
    let client = build_rpc_client()?;

    mcp::run_stdio_loop(move |req| {
        // Each invocation gets its own handles so the returned future owns
        // everything it needs.
        let client = client.clone();
        let base_url = base_url.clone();
        let default_palace = default_palace.clone();
        async move {
            if is_notification(&req) {
                return mcp::Response::suppressed();
            }
            let req_value = inject_default_palace(req_to_value(&req), default_palace.as_deref());
            match forward_rpc(&client, &base_url, req_value).await {
                Ok(resp_value) => value_to_mcp_response(resp_value),
                Err(e) => {
                    tracing::warn!("daemon bridge: transport error: {e:#}");
                    // JSON-RPC responses must carry the id of the request they
                    // answer; without it the client cannot correlate the error.
                    mcp::Response::err(
                        req.id.clone(),
                        mcp::error_codes::INTERNAL_ERROR,
                        format!("trusty-memory daemon unreachable: {e:#}"),
                    )
                }
            }
        }
    })
    .await
}
/// Serialises an MCP request into a JSON value.
///
/// If serialisation fails, an empty JSON object is returned instead of an
/// error.
fn req_to_value(req: &mcp::Request) -> serde_json::Value {
    match serde_json::to_value(req) {
        Ok(value) => value,
        Err(_) => serde_json::json!({}),
    }
}
/// Adds `params.palace` to a JSON-RPC request when the caller did not set it.
///
/// * With no default palace the request is returned untouched.
/// * A missing or `null` `params` member is replaced by an empty object before
///   the palace is inserted.
/// * If `params` is neither an object nor `null`, the request is returned
///   without a palace.
/// * An existing `palace` key (whatever its value) is never overwritten.
fn inject_default_palace(
    mut req: serde_json::Value,
    default_palace: Option<&str>,
) -> serde_json::Value {
    let Some(palace) = default_palace else {
        return req;
    };

    // Normalise absent / null params into an empty object first.
    let params_unset = req
        .get("params")
        .map_or(true, serde_json::Value::is_null);
    if params_unset {
        req["params"] = serde_json::json!({});
    }

    // Only object-shaped params can carry a palace; anything else is left alone.
    if let Some(fields) = req
        .get_mut("params")
        .and_then(serde_json::Value::as_object_mut)
    {
        fields
            .entry("palace")
            .or_insert_with(|| serde_json::Value::String(palace.to_string()));
    }
    req
}
/// Converts the daemon's raw JSON-RPC reply into an [`mcp::Response`].
///
/// * `id` is copied over; a missing or `null` id becomes `None`.
/// * If a `result` member is present, a success response is returned (this is
///   checked before `error`).
/// * Otherwise, if an `error` member is present, its `code` and `message` are
///   used. A missing, non-integer, or out-of-`i32`-range code falls back to
///   `INTERNAL_ERROR`; a missing or non-string message falls back to
///   `"unknown daemon error"`.
/// * A reply with neither member yields an `INTERNAL_ERROR` response.
pub(crate) fn value_to_mcp_response(mut v: serde_json::Value) -> mcp::Response {
    // `v` is owned, so move the members out rather than deep-cloning them.
    let id = v
        .get_mut("id")
        .map(serde_json::Value::take)
        .filter(|id| !id.is_null());

    if let Some(result) = v.get_mut("result").map(serde_json::Value::take) {
        return mcp::Response::ok(id, result);
    }

    if let Some(err) = v.get("error") {
        let code = err
            .get("code")
            .and_then(|c| c.as_i64())
            // A plain `as i32` would silently wrap out-of-range codes into an
            // unrelated value; treat those as an internal error instead.
            .and_then(|c| i32::try_from(c).ok())
            .unwrap_or(mcp::error_codes::INTERNAL_ERROR);
        let message = err
            .get("message")
            .and_then(|m| m.as_str())
            .unwrap_or("unknown daemon error")
            .to_string();
        return mcp::Response::err(id, code, &message);
    }

    mcp::Response::err(
        id,
        mcp::error_codes::INTERNAL_ERROR,
        "daemon returned a response with neither result nor error",
    )
}
/// Unit tests for the bridge's pure helpers plus a connection-failure smoke test.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The default palace is added when `params` has no `palace` key.
    #[test]
    fn inject_default_palace_adds_when_absent() {
        let req = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "memory_remember",
            "params": {"content": "hello"}
        });
        let out = inject_default_palace(req, Some("my-palace"));
        assert_eq!(out["params"]["palace"], "my-palace");
        assert_eq!(out["params"]["content"], "hello");
    }

    /// A caller-supplied palace is never overwritten by the default.
    #[test]
    fn inject_default_palace_preserves_existing() {
        let req = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "memory_remember",
            "params": {"content": "hi", "palace": "caller-palace"}
        });
        let out = inject_default_palace(req, Some("default-palace"));
        assert_eq!(out["params"]["palace"], "caller-palace");
    }

    /// Without a default palace the request passes through unchanged.
    #[test]
    fn inject_default_palace_noop_when_none() {
        let req = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "memory_remember",
            "params": {"content": "hi"}
        });
        let out = inject_default_palace(req.clone(), None);
        assert_eq!(out, req);
    }

    /// `params: null` is replaced by an object holding the palace.
    #[test]
    fn inject_default_palace_null_params_becomes_object() {
        let req = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "palace_list",
            "params": null
        });
        let out = inject_default_palace(req, Some("my-palace"));
        assert_eq!(out["params"]["palace"], "my-palace");
    }

    /// A request without any `params` member gets one containing the palace.
    #[test]
    fn inject_default_palace_missing_params_creates_object() {
        let req = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "palace_list"
        });
        let out = inject_default_palace(req, Some("my-palace"));
        assert_eq!(out["params"], json!({"palace": "my-palace"}));
    }

    /// Positional (array) params cannot carry a palace and are left untouched.
    #[test]
    fn inject_default_palace_leaves_non_object_params_untouched() {
        let req = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "palace_list",
            "params": [1, 2, 3]
        });
        let out = inject_default_palace(req.clone(), Some("my-palace"));
        assert_eq!(out, req);
    }

    /// Covers the success, error, malformed and null-id conversions.
    #[test]
    fn value_to_mcp_response_variants() {
        let ok = value_to_mcp_response(json!({"jsonrpc":"2.0","id":42,"result":{"tools":[]}}));
        assert!(!ok.suppress);
        assert_eq!(ok.id, Some(json!(42)));
        assert!(ok.error.is_none());

        let err = value_to_mcp_response(
            json!({"jsonrpc":"2.0","id":7,"error":{"code":-32601,"message":"Not found"}}),
        );
        assert_eq!(err.error.unwrap().code, -32601);

        let bad = value_to_mcp_response(json!({"jsonrpc":"2.0","id":1}));
        assert_eq!(bad.error.unwrap().code, mcp::error_codes::INTERNAL_ERROR);

        let null_id = value_to_mcp_response(json!({"jsonrpc":"2.0","id":null,"result":{}}));
        assert_eq!(null_id.id, None);
    }

    /// Requests without an id, or under `notifications/`, are notifications.
    #[test]
    fn notification_requests_are_suppressed() {
        let normal = mcp::Request {
            jsonrpc: Some("2.0".to_string()),
            id: Some(json!(1)),
            method: "tools/list".to_string(),
            params: None,
        };
        assert!(!is_notification(&normal));

        let notif = mcp::Request {
            jsonrpc: Some("2.0".to_string()),
            id: None,
            method: "notifications/initialized".to_string(),
            params: None,
        };
        assert!(is_notification(&notif));

        // A `notifications/` method counts even when an id is present.
        let notif_with_id = mcp::Request {
            jsonrpc: Some("2.0".to_string()),
            id: Some(json!(99)),
            method: "notifications/cancelled".to_string(),
            params: None,
        };
        assert!(is_notification(&notif_with_id));
    }

    /// Forwarding to an address with no listener must surface an error.
    #[tokio::test]
    async fn forward_rpc_returns_error_on_connection_refused() {
        let client = build_rpc_client().expect("build client");
        let result =
            forward_rpc(&client, "http://127.0.0.1:65534", json!({"method": "ping"})).await;
        assert!(result.is_err(), "should fail when no server is listening");
    }

    /// The client builder succeeds with the configured timeouts.
    #[test]
    fn build_rpc_client_succeeds() {
        assert!(build_rpc_client().is_ok());
    }
}