use std::io::{BufRead, BufReader, Write};
use std::process::{Command, Stdio};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc;
use std::time::Duration;
use axum::extract::State;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{Json, Router};
use serde_json::{Value, json};
/// Shared state of the mock MCP backend used by this test.
///
/// Every field is behind an `Arc`, so clones handed to the axum router and
/// the copy kept by the test body observe the same values.
#[derive(Clone)]
struct Mock {
/// Session id issued by the most recent `initialize` request; `None` until
/// the first `initialize` arrives. `tools/call` requests must present it.
valid_session: Arc<std::sync::Mutex<Option<String>>>,
/// While `true`, every `tools/call` is rejected regardless of the session
/// id it carries. Set by the test body; cleared by the next `initialize`.
restarted: Arc<AtomicBool>,
/// Number of `initialize` requests handled so far; also used to derive the
/// `sess-{n}` session id.
init_count: Arc<AtomicU64>,
/// Number of `tools/call` requests that were accepted and answered with a
/// result.
toolcall_ok: Arc<AtomicU64>,
}
async fn mcp(State(s): State<Mock>, headers: HeaderMap, body: String) -> Response {
let v: Value = serde_json::from_str(&body).unwrap_or_else(|_| json!({}));
let method = v["method"].as_str().unwrap_or("");
let id = v.get("id").cloned();
match method {
"initialize" => {
let n = s.init_count.fetch_add(1, Ordering::SeqCst) + 1;
let sid = format!("sess-{n}");
*s.valid_session.lock().unwrap() = Some(sid.clone());
s.restarted.store(false, Ordering::SeqCst);
let mut resp = Json(json!({
"jsonrpc": "2.0", "id": id,
"result": {"protocolVersion":"2025-03-26","capabilities":{},
"serverInfo":{"name":"mock","version":"0"}}
}))
.into_response();
resp.headers_mut()
.insert("mcp-session-id", sid.parse().unwrap());
resp
}
"notifications/initialized" => StatusCode::ACCEPTED.into_response(),
"tools/call" => {
let sid = headers
.get("mcp-session-id")
.and_then(|h| h.to_str().ok())
.map(String::from);
let valid = s.valid_session.lock().unwrap().clone();
let stale = s.restarted.load(Ordering::SeqCst) || sid.is_none() || sid != valid;
if stale {
return (StatusCode::UNPROCESSABLE_ENTITY, "expect initialize").into_response();
}
s.toolcall_ok.fetch_add(1, Ordering::SeqCst);
Json(json!({
"jsonrpc": "2.0", "id": id,
"result": {"content": [{"type": "text", "text": "{\"ok\":true}"}]}
}))
.into_response()
}
_ => Json(json!({"jsonrpc":"2.0","id":id,"result":{}})).into_response(),
}
}
/// End-to-end test of `victauri bridge` against the in-process mock backend.
///
/// Steps:
/// 1. Serve the mock (`/health`, `/mcp`) on an ephemeral localhost port.
/// 2. Write `port`, `token` and `metadata.json` under `<temp>/victauri/<pid>`
///    naming that port and the identifier passed to the bridge via `--app`
///    (presumably the files the bridge uses for discovery — confirm against
///    the bridge implementation).
/// 3. Spawn the bridge binary and exchange line-delimited JSON-RPC messages
///    with it over stdin/stdout: `initialize`, `notifications/initialized`,
///    then a `tools/call`.
/// 4. Flag the mock as restarted, issue a second `tools/call`, and assert it
///    still yields a result, that the backend saw exactly two `initialize`
///    requests, and that both tool calls were executed.
///
/// The child process and the temp directory are owned by a drop guard, so
/// they are cleaned up even when an assertion or `unwrap` panics.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn bridge_selects_by_identity_forwards_and_survives_restart() {
    /// Kills and reaps the bridge child (once spawned) and removes the temp
    /// directory when dropped, on both the success and the panic path.
    struct Cleanup {
        child: Option<std::process::Child>,
        dir: std::path::PathBuf,
    }

    impl Drop for Cleanup {
        fn drop(&mut self) {
            if let Some(mut child) = self.child.take() {
                // Best effort: the child may already have exited.
                let _ = child.kill();
                let _ = child.wait();
            }
            let _ = std::fs::remove_dir_all(&self.dir);
        }
    }

    let mock = Mock {
        valid_session: Arc::new(std::sync::Mutex::new(None)),
        restarted: Arc::new(AtomicBool::new(false)),
        init_count: Arc::new(AtomicU64::new(0)),
        toolcall_ok: Arc::new(AtomicU64::new(0)),
    };
    let app = Router::new()
        .route("/health", get(|| async { "ok" }))
        .route("/mcp", post(mcp))
        .with_state(mock.clone());

    // Port 0 lets the OS pick a free port; it is read back below.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = listener.local_addr().unwrap().port();
    // The blocking `recv_timeout` calls further down stall this task; the
    // multi-thread runtime keeps the spawned server running meanwhile.
    tokio::spawn(async move {
        axum::serve(listener, app).await.unwrap();
    });

    let ident = "com.test.e2e-bridge";
    let pid = std::process::id();
    let dir = std::env::temp_dir().join("victauri").join(pid.to_string());
    // Declared before anything that can panic, and before `stdin`, so that on
    // unwind `stdin` is dropped first and the guard then kills the child and
    // removes the directory.
    let mut cleanup = Cleanup {
        child: None,
        dir: dir.clone(),
    };
    std::fs::create_dir_all(&dir).unwrap();
    std::fs::write(dir.join("port"), port.to_string()).unwrap();
    std::fs::write(dir.join("token"), "e2e-token").unwrap();
    std::fs::write(
        dir.join("metadata.json"),
        json!({"pid": pid, "port": port, "identifier": ident, "product_name": "E2E"}).to_string(),
    )
    .unwrap();

    let spawned = Command::new(env!("CARGO_BIN_EXE_victauri"))
        .args(["bridge", "--wait", "--app", ident])
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .spawn()
        .expect("spawn victauri bridge");
    // Hand ownership to the guard immediately; `Child` is not killed on drop.
    let child = cleanup.child.insert(spawned);
    let mut stdin = child.stdin.take().unwrap();
    let stdout = child.stdout.take().unwrap();

    // Forward the bridge's stdout lines over a channel so the test can wait
    // for each response with a timeout instead of blocking forever.
    let (tx, rx) = mpsc::channel::<String>();
    std::thread::spawn(move || {
        for line in BufReader::new(stdout).lines() {
            let Ok(line) = line else { break };
            if tx.send(line).is_err() {
                break;
            }
        }
    });

    let recv = |rx: &mpsc::Receiver<String>| -> Value {
        let line = rx
            .recv_timeout(Duration::from_secs(15))
            .expect("bridge produced no response in time");
        serde_json::from_str(&line).expect("bridge stdout is JSON")
    };
    let send = |stdin: &mut std::process::ChildStdin, v: Value| {
        writeln!(stdin, "{v}").unwrap();
        stdin.flush().unwrap();
    };

    // Handshake.
    send(
        &mut stdin,
        json!({"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}),
    );
    let init = recv(&rx);
    assert_eq!(init["id"], 1, "init response: {init}");
    assert!(init.get("result").is_some(), "init had a result: {init}");
    // No response is read for the notification; the next `recv` expects the
    // reply to request id 2.
    send(
        &mut stdin,
        json!({"jsonrpc":"2.0","method":"notifications/initialized"}),
    );

    // First tool call on the fresh session.
    send(
        &mut stdin,
        json!({"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"eval_js","arguments":{"code":"1+1"}}}),
    );
    let r2 = recv(&rx);
    assert_eq!(r2["id"], 2);
    assert!(r2.get("result").is_some(), "tool call #1 succeeded: {r2}");

    // From here the mock rejects every `tools/call` until it sees a new
    // `initialize`.
    mock.restarted.store(true, Ordering::SeqCst);
    send(
        &mut stdin,
        json!({"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"eval_js","arguments":{"code":"2+2"}}}),
    );
    let r3 = recv(&rx);
    assert_eq!(r3["id"], 3, "post-restart response: {r3}");
    assert!(
        r3.get("result").is_some() && r3.get("error").is_none(),
        "tool call AFTER restart must transparently recover (not error): {r3}"
    );
    assert_eq!(
        mock.init_count.load(Ordering::SeqCst),
        2,
        "expected one re-initialization after the restart"
    );
    assert_eq!(
        mock.toolcall_ok.load(Ordering::SeqCst),
        2,
        "both tool calls should have executed on the backend"
    );

    // Close the bridge's stdin; `cleanup` kills/reaps the child and removes
    // the temp directory when it goes out of scope.
    drop(stdin);
}