#![cfg(feature = "serve")]
use std::io::{BufRead, BufReader, Write};
use std::process::{Child, ChildStdout, Command, Stdio};
use std::sync::mpsc;
use std::time::{Duration, Instant};
/// Path to the `m1nd-mcp` binary that the tests in this file launch, read at
/// compile time from the `CARGO_BIN_EXE_m1nd-mcp` environment variable.
const BIN: &str = env!("CARGO_BIN_EXE_m1nd-mcp");
/// Asks the OS for a TCP port on the loopback interface that is free right now.
///
/// Binds `127.0.0.1:0`, reads back the port that was assigned, and releases the
/// listener when the function returns, so the port is not reserved for the
/// caller afterwards.
///
/// # Panics
/// Panics if the bind fails or the local address cannot be read.
fn free_port() -> u16 {
    let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
    let assigned = probe.local_addr().expect("read local addr");
    assigned.port()
}
/// Launches the binary under test as `--serve --port <port> --no-gui`.
///
/// Every `M1ND_*` path variable is pointed at an entry under `tmp`, and the
/// child's stdout and stderr are discarded.
///
/// # Panics
/// Panics if the process cannot be spawned.
fn spawn_owner(port: u16, tmp: &std::path::Path) -> Child {
    // Environment variable name -> entry name below `tmp`.
    let scoped_paths = [
        ("M1ND_RUNTIME_DIR", "runtime"),
        ("M1ND_REGISTRY_DIR", "registry"),
        ("M1ND_GRAPH_SOURCE", "graph.snapshot"),
        ("M1ND_PLASTICITY_STATE", "plasticity.json"),
    ];

    let mut owner = Command::new(BIN);
    owner
        .args(["--serve", "--port"])
        .arg(port.to_string())
        .arg("--no-gui");
    for (variable, leaf) in scoped_paths {
        owner.env(variable, tmp.join(leaf));
    }
    owner
        .env("M1ND_NO_GUI", "1")
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    owner.spawn().expect("spawn --serve owner")
}
/// Blocks until the server at `base_url` answers an `initialize` request.
///
/// Posts the `init_payload()` body to `{base_url}/mcp` roughly every 150 ms on
/// a private current-thread Tokio runtime, and returns as soon as a response
/// body contains `"result"`.
///
/// Each request is capped by the time left until the overall 30 s deadline, so
/// a connection that is accepted but never answered (or a response body that
/// never finishes) cannot stall the wait indefinitely.
///
/// # Panics
/// Panics if no such response arrives within 30 seconds, or if the runtime or
/// HTTP client cannot be built.
fn wait_until_serving(base_url: &str) {
    let endpoint = format!("{base_url}/mcp");
    let init = init_payload();
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("rt");
    rt.block_on(async {
        let client = reqwest::Client::builder().build().expect("client");
        let deadline = Instant::now() + Duration::from_secs(30);
        loop {
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                panic!("owner never answered initialize within 30s");
            }
            let attempt = client
                .post(&endpoint)
                .header("Accept", "application/json, text/event-stream")
                .header("Content-Type", "application/json")
                .body(init.clone())
                // Covers connect, send and reading the body, so the deadline
                // check above is always reached again.
                .timeout(remaining)
                .send()
                .await;
            if let Ok(resp) = attempt {
                if let Ok(body) = resp.text().await {
                    if body.contains("\"result\"") {
                        return;
                    }
                }
            }
            // Async sleep: yields to the runtime instead of blocking its only thread.
            tokio::time::sleep(Duration::from_millis(150)).await;
        }
    });
}
fn init_payload() -> String {
serde_json::json!({
"jsonrpc": "2.0", "id": 1, "method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": { "name": "l21-self-echo-probe", "version": "1" }
}
})
.to_string()
}
/// Starts a background thread that forwards the child's stdout, line by line,
/// into a channel and returns the receiving end.
///
/// Trailing whitespace is stripped from every line and lines that are empty
/// after stripping are skipped. The thread stops at end of stream, on a read
/// error, or once the receiver has been dropped.
fn spawn_line_reader(stdout: ChildStdout) -> mpsc::Receiver<String> {
    let (frames_tx, frames_rx) = mpsc::channel();
    std::thread::spawn(move || {
        for read in BufReader::new(stdout).lines() {
            let raw = match read {
                Ok(raw) => raw,
                Err(_) => break,
            };
            let frame = raw.trim_end();
            if frame.is_empty() {
                continue;
            }
            if frames_tx.send(frame.to_string()).is_err() {
                // Nobody is listening any more.
                break;
            }
        }
    });
    frames_rx
}
/// Waits up to `timeout` for the next frame on `rx`.
///
/// Returns `None` if nothing arrives in time or the sending side is gone.
fn next_frame(rx: &mpsc::Receiver<String>, timeout: Duration) -> Option<String> {
    match rx.recv_timeout(timeout) {
        Ok(frame) => Some(frame),
        Err(_) => None,
    }
}
/// Collects every frame that arrives on `rx` during the next `window`.
///
/// Returns the frames in arrival order. Stops early if the sending side
/// disconnects; otherwise it blocks for the whole window, even when frames
/// have already been received.
///
/// Each wait is limited to the time left in the window, so the call does not
/// run past `window` by a fixed polling interval.
fn drain_frames(rx: &mpsc::Receiver<String>, window: Duration) -> Vec<String> {
    let deadline = Instant::now() + window;
    let mut frames = Vec::new();
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match rx.recv_timeout(remaining) {
            Ok(frame) => frames.push(frame),
            // Re-check the clock rather than trusting the wake-up time.
            Err(mpsc::RecvTimeoutError::Timeout) => continue,
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }
    frames
}
/// Writes `value` to the child's stdin as one compact JSON line and flushes.
///
/// # Panics
/// Panics if serialization, the write, or the flush fails.
fn send(stdin: &mut std::process::ChildStdin, value: serde_json::Value) {
    let mut line = serde_json::to_string(&value).unwrap();
    line.push('\n');
    stdin.write_all(line.as_bytes()).expect("write stdin");
    stdin.flush().expect("flush stdin");
}
/// Reports whether `frame` is JSON whose `method` field is exactly
/// `notifications/m1nd/graph_changed`.
///
/// Frames that are not valid JSON, or whose `method` is missing or not a
/// string, yield `false`.
fn is_graph_changed(frame: &str) -> bool {
    const GRAPH_CHANGED: &str = "notifications/m1nd/graph_changed";
    match serde_json::from_str::<serde_json::Value>(frame) {
        Ok(message) => message.get("method").and_then(|m| m.as_str()) == Some(GRAPH_CHANGED),
        Err(_) => false,
    }
}
/// Extracts the JSON body of a tool result from the response frame with the
/// given `id`.
///
/// Reads `result.content[0].text` from the frame and parses that string as
/// JSON. Returns `None` if the frame is not JSON, its `id` differs, any step
/// of that path is missing or of the wrong type, or the text is not JSON.
fn tool_result_json(frame: &str, id: i64) -> Option<serde_json::Value> {
    let envelope: serde_json::Value = serde_json::from_str(frame).ok()?;

    let frame_id = envelope.get("id").and_then(|raw| raw.as_i64());
    if frame_id != Some(id) {
        return None;
    }

    let text = envelope
        .get("result")
        .and_then(|result| result.get("content"))
        .and_then(|content| content.get(0))
        .and_then(|first| first.get("text"))
        .and_then(|text| text.as_str())?;

    serde_json::from_str::<serde_json::Value>(text).ok()
}
/// Drives the `ingest` and `memorize` tools through a stdio bridge
/// (`--attach <base_url>`) connected to a `--serve` owner process.
///
/// Checks that:
/// * each call yields a response frame whose tool-result text parses as JSON
///   with the expected fields (`node_count` / `edges_created` for `ingest`,
///   `ok: true` / `claims_written` for `memorize`);
/// * no `notifications/m1nd/graph_changed` frame reaches the bridge's stdout
///   in the windows after the two calls.
///
/// Both child processes sit behind a kill-on-drop guard, so a panic anywhere
/// in the test (a failed `expect`, a failed assertion) does not leave them
/// running.
#[test]
fn write_tools_return_real_envelopes_through_the_bridge() {
    /// Kills and reaps the wrapped child when dropped. `std::process::Child`
    /// does neither by itself.
    struct KillOnDrop(Child);

    impl Drop for KillOnDrop {
        fn drop(&mut self) {
            // Best effort: the child may already have exited.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    let tmp = tempfile::tempdir().expect("tempdir");
    let port = free_port();
    let base_url = format!("http://127.0.0.1:{port}");

    // A tiny source tree for `ingest` to read.
    let repo = tmp.path().join("repo");
    std::fs::create_dir_all(&repo).expect("repo dir");
    std::fs::write(
        repo.join("lib.rs"),
        "pub fn add(a: i32, b: i32) -> i32 { a + b }\npub fn mul(a: i32, b: i32) -> i32 { a * b }\n",
    )
    .expect("write lib.rs");

    let owner = KillOnDrop(spawn_owner(port, tmp.path()));
    wait_until_serving(&base_url);

    let mut bridge = KillOnDrop(
        Command::new(BIN)
            .arg("--attach")
            .arg(&base_url)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::null())
            .spawn()
            .expect("spawn --attach bridge"),
    );
    let mut stdin = bridge.0.stdin.take().expect("bridge stdin");
    let rx = spawn_line_reader(bridge.0.stdout.take().expect("bridge stdout"));

    // Handshake: initialize, then the initialized notification.
    send(&mut stdin, serde_json::from_str(&init_payload()).unwrap());
    let init_resp = next_frame(&rx, Duration::from_secs(15)).expect("initialize response");
    assert!(
        init_resp.contains("\"result\""),
        "bridge initialize should return a result frame, got: {init_resp}"
    );
    send(
        &mut stdin,
        serde_json::json!({ "jsonrpc": "2.0", "method": "notifications/initialized" }),
    );
    // Short pause after the handshake before issuing tool calls.
    std::thread::sleep(Duration::from_millis(800));

    send(
        &mut stdin,
        serde_json::json!({
            "jsonrpc": "2.0", "id": 2, "method": "tools/call",
            "params": { "name": "ingest", "arguments": {
                "agent_id": "l21", "path": repo.to_string_lossy() } }
        }),
    );
    // Drain the whole window (not just up to the response) so stray
    // notification frames are captured too.
    let ingest_frames = drain_frames(&rx, Duration::from_secs(8));

    send(
        &mut stdin,
        serde_json::json!({
            "jsonrpc": "2.0", "id": 3, "method": "tools/call",
            "params": { "name": "memorize", "arguments": {
                "agent_id": "l21", "node_label": "l21-fact",
                "claims": [ { "label": "the bridge returns real envelopes", "confidence": 0.9 } ] } }
        }),
    );
    let memorize_frames = drain_frames(&rx, Duration::from_secs(8));

    // Shut both processes down before asserting on what was captured.
    drop(stdin);
    drop(bridge);
    drop(owner);

    let ingest_body = ingest_frames
        .iter()
        .find_map(|f| tool_result_json(f, 2))
        .unwrap_or_else(|| {
            panic!(
                "ingest must return a JSON tool-result body (not null); frames seen: {ingest_frames:?}"
            )
        });
    assert!(
        ingest_body
            .get("node_count")
            .and_then(|v| v.as_u64())
            .is_some(),
        "ingest body must carry a non-null node_count; got: {ingest_body}"
    );
    assert!(
        ingest_body
            .get("edges_created")
            .and_then(|v| v.as_u64())
            .is_some(),
        "ingest body must carry edges_created; got: {ingest_body}"
    );

    let memorize_body = memorize_frames
        .iter()
        .find_map(|f| tool_result_json(f, 3))
        .unwrap_or_else(|| {
            panic!(
                "memorize must return a JSON tool-result body (not null); frames seen: {memorize_frames:?}"
            )
        });
    assert_eq!(
        memorize_body.get("ok").and_then(|v| v.as_bool()),
        Some(true),
        "memorize body must carry ok:true; got: {memorize_body}"
    );
    assert!(
        memorize_body
            .get("claims_written")
            .and_then(|v| v.as_u64())
            .is_some(),
        "memorize body must carry claims_written; got: {memorize_body}"
    );

    let self_echoes: Vec<&String> = ingest_frames
        .iter()
        .chain(memorize_frames.iter())
        .filter(|f| is_graph_changed(f))
        .collect();
    assert!(
        self_echoes.is_empty(),
        "the bridge must NOT forward a graph_changed self-echo for the caller's own \
         mutation (this frame races the response and shows to the host as null); \
         saw {} such frame(s): {:?}",
        self_echoes.len(),
        self_echoes
    );
}