use car_ffi_common::proxy::DaemonClient;
use car_memgine::MemgineEngine;
use car_server_core::{run_dispatch, ServerState, ServerStateConfig};
use futures::{SinkExt, StreamExt};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;
async fn spawn_one_shot_dispatcher(state: Arc<ServerState>) -> SocketAddr {
let listener = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
0,
)))
.await
.expect("bind loopback");
let addr = listener.local_addr().expect("local_addr");
tokio::spawn(async move {
let (stream, peer) = listener.accept().await.expect("accept");
let ws = accept_async(stream).await.expect("ws handshake");
let (write, read) = futures::StreamExt::split(ws);
let _ = run_dispatch(read, Box::pin(write), peer.to_string(), state).await;
});
addr
}
/// Builds the shared server state used by the loopback tests.
///
/// A fresh `MemgineEngine::new(None)` is wrapped in `Arc<Mutex<_>>` and
/// attached to a `ServerStateConfig` created from `journal_dir`; the resulting
/// `ServerState` is returned behind an `Arc`.
fn loopback_state(journal_dir: std::path::PathBuf) -> Arc<ServerState> {
    let shared_memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
    Arc::new(ServerState::with_config(
        ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine),
    ))
}
#[tokio::test]
async fn state_round_trips_on_single_client() {
let tmp = TempDir::new().expect("tempdir");
let state = loopback_state(tmp.path().to_path_buf());
let addr = spawn_one_shot_dispatcher(state).await;
let url = format!("ws://{}", addr);
let client = DaemonClient::with_url(url);
client
.call(
"state.set",
serde_json::json!({ "key": "k", "value": "v" }),
)
.await
.expect("state.set");
let got = client
.call("state.get", serde_json::json!({ "key": "k" }))
.await
.expect("state.get");
assert_eq!(
got,
serde_json::Value::String("v".into()),
"expected the write to be visible to the read on the same client/session"
);
}
#[tokio::test]
async fn a2ui_capabilities_and_round_trip() {
let tmp = TempDir::new().expect("tempdir");
let state = loopback_state(tmp.path().to_path_buf());
let addr = spawn_one_shot_dispatcher(state).await;
let url = format!("ws://{}", addr);
let client = DaemonClient::with_url(url);
let caps = client
.call("a2ui.capabilities", serde_json::Value::Null)
.await
.expect("a2ui.capabilities");
assert!(
caps.get("version").is_some(),
"capabilities should advertise version, got: {caps:#}"
);
assert!(
caps.get("catalogs").and_then(|v| v.as_array()).is_some(),
"capabilities should advertise catalogs array, got: {caps:#}"
);
let initial = client
.call("a2ui.surfaces", serde_json::Value::Null)
.await
.expect("a2ui.surfaces (initial)");
assert_eq!(
initial,
serde_json::Value::Array(vec![]),
"no surfaces yet"
);
}
#[tokio::test]
async fn state_extras_round_trip() {
let tmp = TempDir::new().expect("tempdir");
let state = loopback_state(tmp.path().to_path_buf());
let addr = spawn_one_shot_dispatcher(state).await;
let url = format!("ws://{}", addr);
let client = DaemonClient::with_url(url);
client
.call("state.set", serde_json::json!({ "key": "a", "value": 1 }))
.await
.expect("set a");
client
.call("state.set", serde_json::json!({ "key": "b", "value": "two" }))
.await
.expect("set b");
let exists_a = client
.call("state.exists", serde_json::json!({ "key": "a" }))
.await
.expect("exists a");
assert_eq!(exists_a, serde_json::Value::Bool(true));
let exists_z = client
.call("state.exists", serde_json::json!({ "key": "z" }))
.await
.expect("exists z");
assert_eq!(exists_z, serde_json::Value::Bool(false));
let keys = client
.call("state.keys", serde_json::Value::Null)
.await
.expect("keys");
let mut keys_vec: Vec<String> =
serde_json::from_value(keys).expect("keys is array of strings");
keys_vec.sort();
assert_eq!(keys_vec, vec!["a".to_string(), "b".to_string()]);
let snapshot = client
.call("state.snapshot", serde_json::Value::Null)
.await
.expect("snapshot");
assert_eq!(
snapshot,
serde_json::json!({ "a": 1, "b": "two" }),
"snapshot should return all keys with their values"
);
}
#[tokio::test]
async fn server_initiated_request_routes_to_handler() {
let listener = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
0,
)))
.await
.expect("bind loopback");
let addr = listener.local_addr().expect("local_addr");
tokio::spawn(async move {
let (stream, _peer) = listener.accept().await.expect("accept");
let ws = accept_async(stream).await.expect("ws handshake");
let (mut write, mut read) = ws.split();
let first = read
.next()
.await
.expect("first frame")
.expect("frame ok");
let text = first.to_text().expect("text").to_string();
let env: serde_json::Value = serde_json::from_str(&text).expect("parse client req");
let client_req_id = env.get("id").cloned().unwrap_or(serde_json::Value::Null);
let server_req = serde_json::json!({
"jsonrpc": "2.0",
"id": "cb-1",
"method": "tools.execute",
"params": {
"action_id": "cb-1",
"tool": "echo",
"parameters": { "value": "hello" },
"attempt": 1,
},
});
write
.send(Message::Text(server_req.to_string().into()))
.await
.expect("send server req");
let resp_frame = read
.next()
.await
.expect("response frame")
.expect("frame ok");
let resp_text = resp_frame.to_text().expect("text").to_string();
let resp: serde_json::Value = serde_json::from_str(&resp_text).expect("parse resp");
let captured = resp.get("result").cloned().unwrap_or(serde_json::Value::Null);
let final_resp = serde_json::json!({
"jsonrpc": "2.0",
"id": client_req_id,
"result": captured,
});
write
.send(Message::Text(final_resp.to_string().into()))
.await
.expect("send final resp");
});
let url = format!("ws://{}", addr);
let client = DaemonClient::with_url(url);
client.register_handler("tools.execute", |params| async move {
let echoed = params
.get("parameters")
.and_then(|p| p.get("value"))
.cloned()
.unwrap_or(serde_json::Value::Null);
Ok(serde_json::json!({ "echoed": echoed }))
});
let result = client
.call("noop", serde_json::Value::Null)
.await
.expect("noop call");
assert_eq!(
result,
serde_json::json!({ "echoed": "hello" }),
"tool handler should have echoed the parameter back through the server"
);
}
#[tokio::test]
async fn notification_handler_receives_voice_event_shape() {
use std::sync::Arc as StdArc;
use std::sync::Mutex as StdMutex;
let listener = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
0,
)))
.await
.expect("bind loopback");
let addr = listener.local_addr().expect("local_addr");
tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let ws = accept_async(stream).await.expect("ws handshake");
let (mut write, mut read) = futures::StreamExt::split(ws);
let _ = tokio::time::timeout(std::time::Duration::from_millis(50), read.next()).await;
let notification = serde_json::json!({
"jsonrpc": "2.0",
"method": "voice.event",
"params": {
"session_id": "sess-1",
"event": { "type": "delta", "text": "hello" }
}
});
let _ = write
.send(Message::Text(notification.to_string().into()))
.await;
loop {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
});
let url = format!("ws://{}", addr);
let client = DaemonClient::with_url(url);
let captured: StdArc<StdMutex<Option<serde_json::Value>>> = StdArc::new(StdMutex::new(None));
let captured_for_handler = captured.clone();
client.register_notification_handler("voice.event", move |params| {
if let Ok(mut g) = captured_for_handler.lock() {
*g = Some(params);
}
});
let _ = tokio::time::timeout(
std::time::Duration::from_millis(500),
client.call("__unknown__", serde_json::Value::Null),
)
.await;
for _ in 0..40 {
if captured.lock().unwrap().is_some() {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
let got = captured.lock().unwrap().clone();
let got = got.expect("notification handler should have fired");
assert_eq!(got["session_id"], "sess-1");
assert_eq!(got["event"]["type"], "delta");
assert_eq!(got["event"]["text"], "hello");
}
#[tokio::test]
async fn notification_handler_panic_does_not_kill_recv_loop() {
use std::sync::Arc as StdArc;
use std::sync::Mutex as StdMutex;
let listener = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
0,
)))
.await
.expect("bind loopback");
let addr = listener.local_addr().expect("local_addr");
tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let ws = accept_async(stream).await.expect("ws handshake");
let (mut write, mut read) = futures::StreamExt::split(ws);
let _ = tokio::time::timeout(std::time::Duration::from_millis(50), read.next()).await;
let mk_event = |which: &str| {
serde_json::json!({
"jsonrpc": "2.0",
"method": "voice.event",
"params": { "session_id": which, "event": {} }
})
.to_string()
};
let _ = write.send(Message::Text(mk_event("first").into())).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let _ = write.send(Message::Text(mk_event("second").into())).await;
loop {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
});
let url = format!("ws://{}", addr);
let client = DaemonClient::with_url(url);
let seen: StdArc<StdMutex<Vec<String>>> = StdArc::new(StdMutex::new(Vec::new()));
let seen_for_handler = seen.clone();
client.register_notification_handler("voice.event", move |params| {
let id = params
.get("session_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if id == "first" {
panic!("intentional panic from handler");
}
if let Ok(mut g) = seen_for_handler.lock() {
g.push(id);
}
});
let _ = tokio::time::timeout(
std::time::Duration::from_millis(500),
client.call("__unknown__", serde_json::Value::Null),
)
.await;
for _ in 0..40 {
if !seen.lock().unwrap().is_empty() {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
let seen_ids = seen.lock().unwrap().clone();
assert_eq!(
seen_ids,
vec!["second".to_string()],
"second notification must process — recv loop survived first handler's panic"
);
}