use car_memgine::MemgineEngine;
use car_server_core::{run_dispatch, ServerState, ServerStateConfig};
use futures::{SinkExt, StreamExt};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message};
/// Builds the shared server state used by these tests.
///
/// `journal_dir` is handed to `ServerStateConfig::new`; the state is wired to a
/// freshly constructed `MemgineEngine::new(None)` guarded by a tokio `Mutex`.
fn loopback_state(journal_dir: std::path::PathBuf) -> Arc<ServerState> {
    let shared_memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
    let config = ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine);
    let server_state = ServerState::with_config(config);
    Arc::new(server_state)
}
/// Starts a single-connection WebSocket server backed by `run_dispatch`.
///
/// Binds a TCP listener on 127.0.0.1 with port 0 (so the OS picks the port),
/// spawns a task that accepts exactly one connection, performs the WebSocket
/// handshake and passes the split stream to `run_dispatch`, and returns the
/// address the listener is bound to.
///
/// # Panics
///
/// Panics if binding or reading the local address fails; the spawned task
/// panics if the accept or the handshake fails.
async fn spawn_one_shot_dispatcher(state: Arc<ServerState>) -> SocketAddr {
    let bind_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
    let listener = TcpListener::bind(bind_addr).await.expect("bind loopback");
    let bound_addr = listener.local_addr().expect("local_addr");

    tokio::spawn(async move {
        let (tcp_stream, peer_addr) = listener.accept().await.expect("accept");
        let websocket = accept_async(tcp_stream).await.expect("ws handshake");
        let (sink, stream) = websocket.split();
        // The dispatcher's return value is intentionally discarded.
        let _ = run_dispatch(stream, Box::pin(sink), peer_addr.to_string(), state).await;
    });

    bound_addr
}
/// Opens a WebSocket connection to `addr`, sends `request` as one text frame,
/// and returns the first frame received, parsed as JSON.
///
/// # Panics
///
/// Panics if the connection, the send, or the receive fails, if the first
/// frame is not a text frame, or if its payload is not valid JSON.
async fn rpc_one(addr: SocketAddr, request: serde_json::Value) -> serde_json::Value {
    let endpoint = format!("ws://{}", addr);
    let (mut socket, _handshake) = connect_async(&endpoint)
        .await
        .expect("ws client connect");

    let payload = serde_json::to_string(&request).expect("request to_string");
    socket
        .send(Message::Text(payload.into()))
        .await
        .expect("send request");

    let frame = socket
        .next()
        .await
        .expect("response frame")
        .expect("response frame ok");

    match frame {
        Message::Text(body) => {
            serde_json::from_str(&body.to_string()).expect("parse response JSON")
        }
        other => panic!("expected Text frame, got {:?}", other),
    }
}
/// Sends `session.init` with no tools and no policies and checks that the
/// response echoes the request id, carries a `session_id`, and reports zero
/// registered tools and policies.
#[tokio::test]
async fn session_init_round_trip_through_run_dispatch() {
    let tmp = TempDir::new().unwrap();
    let addr = spawn_one_shot_dispatcher(loopback_state(tmp.path().join("journals"))).await;

    let request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": "init-1",
        "method": "session.init",
        "params": { "client_id": "embed-smoke", "tools": [], "policies": [] }
    });
    let resp = rpc_one(addr, request).await;

    assert_eq!(
        resp.get("id").and_then(serde_json::Value::as_str),
        Some("init-1")
    );

    let result = match resp.get("result") {
        Some(result) => result,
        None => panic!("session.init should succeed; full response: {}", resp),
    };
    assert!(result.get("session_id").is_some());

    // Both counters must be zero because the request registered nothing.
    for counter in ["tools_registered", "policies_registered"] {
        assert_eq!(
            result.get(counter).and_then(serde_json::Value::as_u64),
            Some(0)
        );
    }
}
/// Exercises the `a2ui.*` methods over a single WebSocket connection served by
/// `run_dispatch`: capabilities, surface creation, component update, surface
/// listing, action submission and reaping.
#[tokio::test]
async fn a2ui_round_trips_through_run_dispatch() {
    type ClientWs = tokio_tungstenite::WebSocketStream<
        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
    >;

    /// Sends one JSON-RPC request and treats the next frame as its response.
    /// Panics if that response has an `error` member; otherwise returns a
    /// clone of its `result` member.
    async fn call(
        ws: &mut ClientWs,
        id: &str,
        method: &str,
        params: serde_json::Value,
    ) -> serde_json::Value {
        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params
        });
        ws.send(Message::Text(request.to_string().into()))
            .await
            .expect("send request");

        let frame = ws
            .next()
            .await
            .expect("response frame")
            .expect("response frame ok");
        let text = frame.into_text().expect("text response").to_string();
        let resp: serde_json::Value = serde_json::from_str(&text).expect("parse response");

        if resp.get("error").is_some() {
            panic!("{} should succeed; full response: {}", method, resp);
        }
        resp["result"].clone()
    }

    let tmp = TempDir::new().unwrap();
    let addr = spawn_one_shot_dispatcher(loopback_state(tmp.path().join("journals"))).await;
    let url = format!("ws://{}", addr);
    let (mut ws, _) = connect_async(&url).await.expect("ws client connect");

    // The advertised component list must include "Button".
    let capabilities = call(&mut ws, "cap", "a2ui.capabilities", serde_json::json!({})).await;
    let advertises_button = capabilities["components"]
        .as_array()
        .unwrap()
        .iter()
        .any(|component| component == "Button");
    assert!(advertises_button);

    let create_surface = serde_json::json!({
        "version": "v0.9",
        "createSurface": { "surfaceId": "approval" }
    });
    call(&mut ws, "create", "a2ui.apply", create_surface).await;

    let update_components = serde_json::json!({
        "version": "v0.9",
        "updateComponents": {
            "surfaceId": "approval",
            "components": [
                { "id": "root", "component": "Button", "text": "Approve", "action": { "name": "approve" } }
            ]
        }
    });
    call(&mut ws, "components", "a2ui.apply", update_components).await;

    let surfaces = call(&mut ws, "list", "a2ui.surfaces", serde_json::json!({})).await;
    let first_surface = &surfaces[0];
    assert_eq!(first_surface["surfaceId"], "approval");
    assert_eq!(first_surface["components"]["root"]["component"], "Button");

    let action_params = serde_json::json!({
        "name": "approve",
        "surfaceId": "approval",
        "sourceComponentId": "root",
        "timestamp": "2026-05-08T00:00:00Z",
        "context": {}
    });
    let action = call(&mut ws, "action", "a2ui.action", action_params).await;
    assert_eq!(action["route"]["delivered"], false);

    let reap = call(&mut ws, "reap", "a2ui.reap", serde_json::json!({})).await;
    assert!(reap["removed"].as_array().unwrap().is_empty());
}
/// Initializes a session, submits a one-action proposal, then checks that
/// `events.stats` reports events and spans, that `events.truncate` trims both
/// down to one, and that `events.clear` leaves both at zero.
#[tokio::test]
async fn event_log_can_be_truncated_and_cleared_through_run_dispatch() {
    type ClientWs = tokio_tungstenite::WebSocketStream<
        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
    >;

    /// Sends one JSON-RPC request and treats the next frame as its response.
    /// Panics if that response has an `error` member; otherwise returns a
    /// clone of its `result` member.
    async fn call(
        ws: &mut ClientWs,
        id: &str,
        method: &str,
        params: serde_json::Value,
    ) -> serde_json::Value {
        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params
        });
        ws.send(Message::Text(request.to_string().into()))
            .await
            .expect("send request");

        let frame = ws
            .next()
            .await
            .expect("response frame")
            .expect("response frame ok");
        let text = frame.into_text().expect("text response").to_string();
        let resp: serde_json::Value = serde_json::from_str(&text).expect("parse response");

        if resp.get("error").is_some() {
            panic!("{} should succeed; full response: {}", method, resp);
        }
        resp["result"].clone()
    }

    let tmp = TempDir::new().unwrap();
    let addr = spawn_one_shot_dispatcher(loopback_state(tmp.path().join("journals"))).await;
    let url = format!("ws://{}", addr);
    let (mut ws, _) = connect_async(&url).await.expect("ws client connect");

    let init_params =
        serde_json::json!({ "client_id": "event-log-smoke", "tools": [], "policies": [] });
    call(&mut ws, "init", "session.init", init_params).await;

    let proposal_params = serde_json::json!({
        "proposal": {
            "id": "event-log-proposal",
            "source": "test",
            "actions": [{
                "id": "write-x",
                "type": "state_write",
                "parameters": { "key": "x", "value": 1 }
            }]
        }
    });
    call(&mut ws, "proposal", "proposal.submit", proposal_params).await;

    // After the proposal there must be at least one event and one span.
    let stats = call(&mut ws, "stats", "events.stats", serde_json::json!({})).await;
    for counter in ["events", "spans"] {
        assert!(stats[counter].as_u64().unwrap() > 0);
    }

    let truncate_params = serde_json::json!({ "maxEvents": 1, "maxSpans": 1 });
    let truncated = call(&mut ws, "truncate", "events.truncate", truncate_params).await;
    assert!(truncated["removedEvents"].as_u64().unwrap() > 0);
    let stats_after_truncate = &truncated["stats"];
    assert_eq!(stats_after_truncate["events"], 1);
    assert_eq!(stats_after_truncate["spans"], 1);

    let cleared = call(&mut ws, "clear", "events.clear", serde_json::json!({})).await;
    let stats_after_clear = &cleared["stats"];
    assert_eq!(stats_after_clear["events"], 0);
    assert_eq!(stats_after_clear["spans"], 0);
}
/// Sends `a2a.send` targeting `http://198.51.100.7:8080` and expects the error
/// message in the response to mention `allowUntrustedEndpoint`.
#[tokio::test]
async fn a2a_send_rejects_non_loopback_endpoint_without_opt_in() {
    let tmp = TempDir::new().unwrap();
    let addr = spawn_one_shot_dispatcher(loopback_state(tmp.path().join("journals"))).await;

    let request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": "a2a-send",
        "method": "a2a.send",
        "params": {
            "endpoint": "http://198.51.100.7:8080",
            "message": {
                "messageId": "m1",
                "role": "user",
                "parts": [{ "kind": "text", "text": "hello" }]
            }
        }
    });
    let resp = rpc_one(addr, request).await;

    // Indexing a missing member yields JSON null, so an absent error or
    // message collapses to the empty string and fails the assertion below.
    let error_message = resp["error"]["message"].as_str().unwrap_or("");
    assert!(
        error_message.contains("allowUntrustedEndpoint"),
        "expected endpoint rejection, got {}",
        resp
    );
}
/// Ingests a surface together with `routeAuth`, confirms the state's
/// `a2ui_route_auth` map holds an entry for it, then applies an update that
/// carries no `routeAuth` and confirms the entry is gone.
#[tokio::test]
async fn a2ui_update_without_fresh_route_auth_clears_stale_auth() {
    type ClientWs = tokio_tungstenite::WebSocketStream<
        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
    >;

    /// Sends one JSON-RPC request and treats the next frame as its response.
    /// Panics if that response has an `error` member; otherwise returns a
    /// clone of its `result` member.
    async fn call(
        ws: &mut ClientWs,
        id: &str,
        method: &str,
        params: serde_json::Value,
    ) -> serde_json::Value {
        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params
        });
        ws.send(Message::Text(request.to_string().into()))
            .await
            .expect("send request");

        let frame = ws
            .next()
            .await
            .expect("response frame")
            .expect("response frame ok");
        let text = frame.into_text().expect("text response").to_string();
        let resp: serde_json::Value = serde_json::from_str(&text).expect("parse response");

        if resp.get("error").is_some() {
            panic!("{} should succeed; full response: {}", method, resp);
        }
        resp["result"].clone()
    }

    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    // The test keeps its own handle so it can inspect the route-auth map.
    let addr = spawn_one_shot_dispatcher(state.clone()).await;
    let url = format!("ws://{}", addr);
    let (mut ws, _) = connect_async(&url).await.expect("ws client connect");

    let ingest_params = serde_json::json!({
        "endpoint": "http://127.0.0.1:9109",
        "routeAuth": { "type": "bearer", "token": "secret-token" },
        "owner": { "kind": "a2a", "taskId": "task-1", "contextId": "ctx-1" },
        "payload": {
            "version": "v0.9",
            "createSurface": { "surfaceId": "auth-test" }
        }
    });
    call(&mut ws, "ingest", "a2ui.ingest", ingest_params).await;
    let auth_present_after_ingest = state.a2ui_route_auth.lock().await.contains_key("auth-test");
    assert!(auth_present_after_ingest);

    let update_params = serde_json::json!({
        "version": "v0.9",
        "updateComponents": {
            "surfaceId": "auth-test",
            "components": [
                { "id": "root", "component": "Text", "text": "updated" }
            ]
        }
    });
    call(&mut ws, "update", "a2ui.apply", update_params).await;
    let auth_present_after_update = state.a2ui_route_auth.lock().await.contains_key("auth-test");
    assert!(!auth_present_after_update);
}
#[tokio::test]
async fn memory_add_fact_then_query_through_tokio_lock_path() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_one_shot_dispatcher(state).await;
let url = format!("ws://{}", addr);
let (mut ws, _) = connect_async(&url).await.expect("ws client connect");
ws.send(Message::Text(
serde_json::to_string(&serde_json::json!({
"jsonrpc": "2.0",
"id": "init",
"method": "session.init",
"params": { "client_id": "embed-smoke", "tools": [], "policies": [] }
}))
.unwrap()
.into(),
))
.await
.unwrap();
let _init = ws.next().await.unwrap().unwrap();
ws.send(Message::Text(
serde_json::to_string(&serde_json::json!({
"jsonrpc": "2.0",
"id": "add",
"method": "memory.add_fact",
"params": {
"subject": "smoke-test",
"body": "embedders can write facts",
"kind": "pattern"
}
}))
.unwrap()
.into(),
))
.await
.unwrap();
let add_resp = ws.next().await.unwrap().unwrap();
let add_text = match add_resp {
Message::Text(t) => t.to_string(),
other => panic!("unexpected: {:?}", other),
};
let add_json: serde_json::Value = serde_json::from_str(&add_text).unwrap();
assert!(
add_json.get("result").is_some(),
"memory.add_fact should succeed; got {}",
add_text
);
ws.send(Message::Text(
serde_json::to_string(&serde_json::json!({
"jsonrpc": "2.0",
"id": "q",
"method": "memory.query",
"params": { "query": "smoke-test", "k": 5 }
}))
.unwrap()
.into(),
))
.await
.unwrap();
let q_resp = ws.next().await.unwrap().unwrap();
let q_text = match q_resp {
Message::Text(t) => t.to_string(),
other => panic!("unexpected: {:?}", other),
};
let q_json: serde_json::Value = serde_json::from_str(&q_text).unwrap();
let result_arr = q_json
.get("result")
.and_then(|v| v.as_array())
.expect("memory.query result array");
assert!(
!result_arr.is_empty(),
"memory.query should retrieve the just-added fact; got {}",
q_text
);
}
/// Builds two `ServerState`s, each with its own engine, writes one fact into
/// the first engine only, and checks that the second engine stays empty and
/// that each state holds exactly the engine handle it was configured with.
#[tokio::test]
async fn distinct_states_do_not_share_memgine() {
    let tmp1 = TempDir::new().unwrap();
    let tmp2 = TempDir::new().unwrap();

    let engine_a = Arc::new(Mutex::new(MemgineEngine::new(None)));
    let engine_b = Arc::new(Mutex::new(MemgineEngine::new(None)));

    let config_a =
        ServerStateConfig::new(tmp1.path().join("a")).with_shared_memgine(engine_a.clone());
    let state_a = Arc::new(ServerState::with_config(config_a));
    let config_b =
        ServerStateConfig::new(tmp2.path().join("b")).with_shared_memgine(engine_b.clone());
    let state_b = Arc::new(ServerState::with_config(config_b));

    // The lock guard is a temporary, so it is released at the end of this
    // statement, before the counts below are read.
    engine_a.lock().await.ingest_fact(
        "f-a",
        "marker",
        "in engine A only",
        "user",
        "peer",
        chrono::Utc::now(),
        "global",
        None,
        vec![],
        false,
    );

    let facts_in_b = engine_b.lock().await.valid_fact_count();
    assert_eq!(
        facts_in_b, 0,
        "two distinct ServerStates must not share memgine state"
    );
    let facts_in_a = engine_a.lock().await.valid_fact_count();
    assert_eq!(facts_in_a, 1, "engine_a should hold the fact we wrote");

    let held_by_a = state_a.shared_memgine.as_ref();
    let held_by_b = state_b.shared_memgine.as_ref();
    assert!(held_by_a.is_some());
    assert!(held_by_b.is_some());
    assert!(
        Arc::ptr_eq(held_by_a.unwrap(), &engine_a),
        "state_a should hold the engine_a handle"
    );
    assert!(
        Arc::ptr_eq(held_by_b.unwrap(), &engine_b),
        "state_b should hold the engine_b handle"
    );
}
#[tokio::test]
async fn library_does_not_spawn_a_dream_loop_at_boot() {
let tmp = TempDir::new().unwrap();
let engine = Arc::new(Mutex::new(MemgineEngine::new(None)));
let cfg =
ServerStateConfig::new(tmp.path().join("journals")).with_shared_memgine(engine.clone());
let state = Arc::new(ServerState::with_config(cfg));
let addr = spawn_one_shot_dispatcher(state.clone()).await;
let _ = rpc_one(
addr,
serde_json::json!({
"jsonrpc": "2.0",
"id": "init",
"method": "session.init",
"params": { "client_id": "no-dream-loop", "tools": [], "policies": [] }
}),
)
.await;
assert!(
!tmp.path().join("tasks").exists(),
"library spawned a dream/scheduler task store; it must not"
);
assert!(
!tmp.path().join("trajectories").exists(),
"library wrote dream-loop trajectories; it must not"
);
}
/// Verifies that the engine lock can still be acquired after a task panicked
/// while holding it.
///
/// The final acquisition is bounded by a timeout so that a regression in which
/// the lock is never released fails the test instead of hanging it.
#[tokio::test]
async fn handler_panic_does_not_poison_engine_for_other_connections() {
    let engine = Arc::new(Mutex::new(MemgineEngine::new(None)));
    let cloned = engine.clone();
    let join = tokio::spawn(async move {
        let _guard = cloned.lock().await;
        panic!("simulated handler panic while holding engine lock");
    });

    let result = join.await;
    assert!(
        result.is_err(),
        "panicking task should report JoinError, got {:?}",
        result
    );
    // A JoinError can also mean cancellation; make sure this one is the panic.
    assert!(
        matches!(&result, Err(err) if err.is_panic()),
        "task should have ended by panicking, got {:?}",
        result
    );

    let _guard = tokio::time::timeout(std::time::Duration::from_secs(5), engine.lock())
        .await
        .expect("engine lock should be acquirable after the panicking holder unwound");
}
/// Opens a session, confirms `state.sessions` holds exactly one entry, closes
/// the connection, and polls (50 tries, 10 ms apart) until `state.sessions`
/// is empty.
#[tokio::test]
async fn disconnect_removes_session_from_registry() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_one_shot_dispatcher(state.clone()).await;
    let url = format!("ws://{}", addr);

    // The client socket lives only inside this scope, so it is dropped before
    // the polling below starts.
    {
        let (mut ws, _) = connect_async(&url).await.expect("ws client connect");
        let init_request = serde_json::json!({
            "jsonrpc": "2.0",
            "id": "init",
            "method": "session.init",
            "params": { "client_id": "ws3-regression", "tools": [], "policies": [] }
        });
        let body = serde_json::to_string(&init_request).unwrap();
        ws.send(Message::Text(body.into())).await.unwrap();
        let _init = ws.next().await.unwrap().unwrap();

        let sessions_after_init = state.sessions.lock().await.len();
        assert_eq!(
            sessions_after_init, 1,
            "session.init should have inserted exactly one session"
        );

        // Best-effort close; a send failure here is ignored.
        let _ = ws.send(Message::Close(None)).await;
    }

    let poll_interval = std::time::Duration::from_millis(10);
    let mut attempts_left = 50;
    let mut removed = false;
    while attempts_left > 0 && !removed {
        tokio::time::sleep(poll_interval).await;
        removed = state.sessions.lock().await.is_empty();
        attempts_left -= 1;
    }

    assert!(
        removed,
        "session must be removed from state.sessions within 500ms of disconnect"
    );
}
/// Creates an `inbox` surface whose root `List` has no renderer domain, sends
/// a render report for it, and expects the listed surface's root component to
/// carry `props.x-renderer.domain == "small_list"` afterwards.
#[tokio::test]
async fn ui_agent_round_trips_through_run_dispatch() {
    type ClientWs = tokio_tungstenite::WebSocketStream<
        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
    >;

    /// Sends one JSON-RPC request, then reads frames until one whose `id`
    /// equals `id` arrives; frames with any other id are skipped. Panics if
    /// the matching response has an `error` member, otherwise returns a clone
    /// of its `result` member.
    async fn call(
        ws: &mut ClientWs,
        id: &str,
        method: &str,
        params: serde_json::Value,
    ) -> serde_json::Value {
        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params
        });
        ws.send(Message::Text(request.to_string().into()))
            .await
            .expect("send request");

        loop {
            let frame = ws
                .next()
                .await
                .expect("response frame")
                .expect("response frame ok");
            let text = frame.into_text().expect("text response").to_string();
            let resp: serde_json::Value = serde_json::from_str(&text).expect("parse response");

            if resp.get("id").and_then(serde_json::Value::as_str) != Some(id) {
                continue;
            }
            assert!(
                resp.get("error").is_none(),
                "{} should succeed; full response: {}",
                method,
                resp
            );
            break resp["result"].clone();
        }
    }

    let tmp = TempDir::new().unwrap();
    let addr = spawn_one_shot_dispatcher(loopback_state(tmp.path().join("journals"))).await;
    let url = format!("ws://{}", addr);
    let (mut ws, _) = connect_async(&url).await.expect("ws client connect");

    let create_surface = serde_json::json!({
        "version": "v0.9",
        "createSurface": { "surfaceId": "inbox" }
    });
    call(&mut ws, "create", "a2ui.apply", create_surface).await;

    let update_components = serde_json::json!({
        "version": "v0.9",
        "updateComponents": {
            "surfaceId": "inbox",
            "components": [
                {
                    "id": "root",
                    "component": "List",
                    "props": { "items": { "path": "/items" } }
                }
            ]
        }
    });
    call(&mut ws, "components", "a2ui.apply", update_components).await;

    let render_report = serde_json::json!({
        "surfaceId": "inbox",
        "signature": {
            "structure": "list",
            "cardinality_band": "2-5",
            "action_count_band": "none"
        },
        "density": "comfortable",
        "visibleFieldCount": 2,
        "actionCount": 0,
        "viewport": { "width": 400, "height": 600, "scale": 1.0 },
        "hiddenViaOverflow": [],
        "timestamp": "2026-05-11T12:00:00Z",
        "reportId": "rpt-1",
        "sequence": 1,
        "placeholderCount": 0,
        "signatureStableSince": 1
    });
    call(&mut ws, "report", "a2ui.render_report", render_report).await;

    let surfaces = call(&mut ws, "list", "a2ui.surfaces", serde_json::json!({})).await;
    let inbox = surfaces
        .as_array()
        .expect("surfaces is array")
        .iter()
        .find(|surface| surface["surfaceId"] == "inbox")
        .expect("inbox surface present");

    let root = &inbox["components"]["root"];
    let domain_hint = &root["props"]["x-renderer"]["domain"];
    assert_eq!(
        domain_hint, "small_list",
        "agent should have tagged root with x-renderer.domain=small_list; \
         got: {}",
        root
    );
}
/// Creates an `inbox` surface whose root `List` already carries
/// `x-renderer.domain = "small_list"`, sends a render report, and checks that
/// the root component listed afterwards equals the one listed before.
#[tokio::test]
async fn ui_agent_no_patch_when_domain_already_set() {
    type ClientWs = tokio_tungstenite::WebSocketStream<
        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
    >;

    /// Sends one JSON-RPC request, then reads frames until one whose `id`
    /// equals `id` arrives; frames with any other id are skipped. Panics if
    /// the matching response has an `error` member, otherwise returns a clone
    /// of its `result` member.
    async fn call(
        ws: &mut ClientWs,
        id: &str,
        method: &str,
        params: serde_json::Value,
    ) -> serde_json::Value {
        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params
        });
        ws.send(Message::Text(request.to_string().into()))
            .await
            .expect("send");

        loop {
            let frame = ws.next().await.expect("frame").expect("frame ok");
            let text = frame.into_text().expect("text").to_string();
            let resp: serde_json::Value = serde_json::from_str(&text).expect("json");

            if resp.get("id").and_then(serde_json::Value::as_str) != Some(id) {
                continue;
            }
            assert!(resp.get("error").is_none(), "{}: {}", method, resp);
            break resp["result"].clone();
        }
    }

    let tmp = TempDir::new().unwrap();
    let addr = spawn_one_shot_dispatcher(loopback_state(tmp.path().join("journals"))).await;
    let url = format!("ws://{}", addr);
    let (mut ws, _) = connect_async(&url).await.expect("ws client connect");

    let create_surface = serde_json::json!({
        "version": "v0.9",
        "createSurface": { "surfaceId": "inbox" }
    });
    call(&mut ws, "create", "a2ui.apply", create_surface).await;

    let update_components = serde_json::json!({
        "version": "v0.9",
        "updateComponents": {
            "surfaceId": "inbox",
            "components": [
                {
                    "id": "root",
                    "component": "List",
                    "props": {
                        "items": { "path": "/items" },
                        "x-renderer": { "domain": "small_list" }
                    }
                }
            ]
        }
    });
    call(&mut ws, "components", "a2ui.apply", update_components).await;

    // Snapshot the root component before the render report is sent.
    let before = call(&mut ws, "list1", "a2ui.surfaces", serde_json::json!({})).await;
    let before_root = before[0]["components"]["root"].clone();

    let render_report = serde_json::json!({
        "surfaceId": "inbox",
        "signature": {
            "structure": "list",
            "cardinality_band": "2-5",
            "action_count_band": "none",
            "domain": "small_list"
        },
        "density": "comfortable",
        "visibleFieldCount": 2,
        "actionCount": 0,
        "viewport": { "width": 400, "height": 600, "scale": 1.0 },
        "hiddenViaOverflow": [],
        "timestamp": "2026-05-11T12:00:00Z",
        "reportId": "rpt-1",
        "sequence": 1,
        "placeholderCount": 0,
        "signatureStableSince": 1
    });
    call(&mut ws, "report", "a2ui.render_report", render_report).await;

    let after = call(&mut ws, "list2", "a2ui.surfaces", serde_json::json!({})).await;
    let after_root = after[0]["components"]["root"].clone();

    assert_eq!(
        before_root, after_root,
        "agent should not have mutated a domain-tagged surface"
    );
}