use car_memgine::MemgineEngine;
use car_proto::{RunRecord, RunTurn, VerifierVerdict};
use car_server_core::{run_dispatch, ServerState, ServerStateConfig};
use futures::{SinkExt, StreamExt};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message};
/// Client-side WebSocket connection type used by every test connection in this file.
type Ws =
    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;

/// Host token installed on every test server by `loopback_state` and presented by `auth_host`.
const TEST_HOST_TOKEN: &str = "test-host-token-0123456789abcdef0123456789";
/// Builds a `ServerState` that journals into `journal_dir` and is backed by a
/// fresh `MemgineEngine` (created with `None`) shared behind a tokio `Mutex`.
/// It then installs [`TEST_HOST_TOKEN`] as the host token.
///
/// The result of `install_host_token` is intentionally discarded. If it fails,
/// the failure surfaces later as a failed `auth_host` assertion.
fn loopback_state(journal_dir: std::path::PathBuf) -> Arc<ServerState> {
    let shared_memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
    let server = Arc::new(ServerState::with_config(
        ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine),
    ));
    let _ = server.install_host_token(TEST_HOST_TOKEN.to_string());
    server
}
/// Authenticates `ws` with [`TEST_HOST_TOKEN`] via `session.auth` (request id `"ha"`).
/// It asserts that the call succeeds and grants the `host` role.
///
/// Any `runs.trace.event` notifications seen while waiting are appended to `events`.
async fn auth_host(ws: &mut Ws, events: &mut Vec<serde_json::Value>) {
    let params = serde_json::json!({ "host_token": TEST_HOST_TOKEN });
    let resp = call_collecting_events(ws, "ha", "session.auth", params, events).await;
    assert!(resp.get("error").is_none(), "host auth must succeed: {resp}");
    assert_eq!(resp["result"]["role"], "host", "host auth must grant host role: {resp}");
}
/// Binds an ephemeral port on 127.0.0.1 and returns the bound address.
///
/// A background task accepts connections. Each connection gets its own task,
/// which performs the WebSocket handshake and then runs `run_dispatch` against
/// a clone of the shared `state`.
///
/// The accept loop ends at the first `accept` error. A failed handshake drops
/// only that connection, and the dispatcher's result is ignored.
async fn spawn_multi_dispatcher(state: Arc<ServerState>) -> SocketAddr {
    let bind_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
    let listener = TcpListener::bind(bind_addr).await.expect("bind loopback");
    let addr = listener.local_addr().expect("local_addr");
    tokio::spawn(async move {
        while let Ok((stream, peer)) = listener.accept().await {
            let conn_state = Arc::clone(&state);
            tokio::spawn(async move {
                let Ok(ws) = accept_async(stream).await else {
                    return;
                };
                let (sink, source) = ws.split();
                let _ = run_dispatch(source, Box::pin(sink), peer.to_string(), conn_state).await;
            });
        }
    });
    addr
}
/// Opens a plain `ws://` client connection to `addr`, panicking if the handshake fails.
async fn connect(addr: SocketAddr) -> Ws {
    let url = format!("ws://{addr}");
    let (stream, _handshake_response) = connect_async(url).await.unwrap();
    stream
}
/// Sends a JSON-RPC 2.0 request (`id`, `method`, `params`) as one text frame.
/// It does not wait for a reply.
async fn send(ws: &mut Ws, id: &str, method: &str, params: serde_json::Value) {
    let envelope = serde_json::json!({
        "jsonrpc": "2.0",
        "id": id,
        "method": method,
        "params": params
    });
    let frame = Message::Text(envelope.to_string().into());
    ws.send(frame).await.expect("send request");
}
async fn next_json(ws: &mut Ws) -> serde_json::Value {
let text = ws
.next()
.await
.expect("frame")
.expect("frame ok")
.into_text()
.expect("text")
.to_string();
serde_json::from_str(&text).expect("parse json")
}
/// Sends a JSON-RPC request and waits for the response whose `id` equals `id`.
///
/// While waiting, every `runs.trace.event` notification's `params` is appended
/// to `events`. Any other frame (a different id, other notifications) is dropped.
async fn call_collecting_events(
    ws: &mut Ws,
    id: &str,
    method: &str,
    params: serde_json::Value,
    events: &mut Vec<serde_json::Value>,
) -> serde_json::Value {
    send(ws, id, method, params).await;
    loop {
        let frame = next_json(ws).await;
        let is_trace_event =
            frame.get("method").and_then(serde_json::Value::as_str) == Some("runs.trace.event");
        if is_trace_event {
            events.push(frame["params"].clone());
        } else if frame.get("id").and_then(serde_json::Value::as_str) == Some(id) {
            break frame;
        }
    }
}
/// Sends `proposal.submit` with `params` and waits for its response, acting as
/// the client-side tool host while it waits.
///
/// Every `tools.execute` callback from the server is answered with a JSON-RPC
/// result of `tool_output`, echoing the callback's string `id`. Other frames
/// are discarded until the response matching `id` arrives.
async fn submit_servicing_callbacks(
    ws: &mut Ws,
    id: &str,
    params: serde_json::Value,
    tool_output: &serde_json::Value,
) -> serde_json::Value {
    send(ws, id, "proposal.submit", params).await;
    loop {
        let frame = next_json(ws).await;
        let method = frame.get("method").and_then(|m| m.as_str());
        if method == Some("tools.execute") {
            let callback_id = frame["id"].as_str().expect("callback id").to_string();
            let reply = serde_json::json!({ "jsonrpc": "2.0", "id": callback_id, "result": tool_output });
            ws.send(Message::Text(reply.to_string().into()))
                .await
                .expect("send callback reply");
        } else if frame.get("id").and_then(|v| v.as_str()) == Some(id) {
            return frame;
        }
    }
}
/// Registers one tool per entry in `names` (empty description and parameters)
/// via `tools.register`, with request id `"reg"`.
///
/// It asserts that the next frame received carries no `error`.
async fn register_tools(ws: &mut Ws, names: &[&str]) {
    let mut tools = Vec::with_capacity(names.len());
    for name in names {
        tools.push(serde_json::json!({ "name": name, "description": "", "parameters": {} }));
    }
    send(ws, "reg", "tools.register", serde_json::Value::Array(tools)).await;
    let resp = next_json(ws).await;
    assert!(resp.get("error").is_none(), "tools.register failed: {resp}");
}
/// Starts a run for `agent_id` via `runs.start` (request id `"start"`) and returns its `run_id`.
///
/// # Panics
/// Panics if the next frame carries an `error`, or if its `result.run_id` is
/// not a string. Both panic messages include the full response.
async fn start_run(ws: &mut Ws, agent_id: &str) -> String {
    send(
        ws,
        "start",
        "runs.start",
        serde_json::json!({ "agent_id": agent_id, "intent": "do the thing" }),
    )
    .await;
    let resp = next_json(ws).await;
    assert!(resp.get("error").is_none(), "runs.start failed: {resp}");
    resp["result"]["run_id"]
        .as_str()
        .unwrap_or_else(|| panic!("runs.start result must carry a string run_id: {resp}"))
        .to_string()
}
/// Builds `proposal.submit` params holding a single non-idempotent `drive_cli`
/// tool call with id `action_id`.
///
/// The call asks the `claude` CLI to run `prompt`. The proposal id is
/// `p-{action_id}` and the timestamp is the current UTC time.
fn drive_proposal(action_id: &str, prompt: &str) -> serde_json::Value {
    let drive_action = serde_json::json!({
        "id": action_id,
        "type": "tool_call",
        "tool": "drive_cli",
        "parameters": { "cli": "claude", "prompt": prompt },
        "preconditions": [],
        "expected_effects": {},
        "state_dependencies": [],
        "idempotent": false,
        "max_retries": 0,
        "failure_behavior": "abort",
        "metadata": {}
    });
    let proposal_id = format!("p-{action_id}");
    serde_json::json!({
        "proposal": {
            "id": proposal_id,
            "source": "test",
            "actions": [drive_action],
            "timestamp": chrono::Utc::now().to_rfc3339(),
            "context": {}
        }
    })
}
/// Canned `drive_cli` tool result: the `claude` CLI exited 0 with output tail `"ok"`.
fn drive_output() -> serde_json::Value {
    serde_json::json!({
        "cli": "claude",
        "exit_code": 0,
        "output_tail": "ok"
    })
}
/// Canned `runs.complete` outcome describing a successful one-turn, one-tool-call
/// run, timestamped with the current UTC time.
fn success_outcome() -> serde_json::Value {
    let metrics = serde_json::json!({
        "turns": 1,
        "tool_calls": 1,
        "duration_ms": 1.0,
        "retries": 0,
        "actions_succeeded": 1,
        "actions_failed": 0
    });
    serde_json::json!({
        "status": "success",
        "summary": "did it",
        "evidence": [],
        "metrics": metrics,
        "timestamp": chrono::Utc::now().to_rfc3339()
    })
}
/// Extracts `(turn, cursor)` from a `runs.trace.event` params object.
///
/// Returns `None` in any of these cases:
/// - `cursor` is missing or is not a `u64`;
/// - `record` does not deserialize as a `RunRecord`;
/// - the record is not a `RunRecord::Turn`.
fn event_turn(ev: &serde_json::Value) -> Option<(RunTurn, usize)> {
    let cursor = ev["cursor"].as_u64()? as usize;
    let record = serde_json::from_value::<RunRecord>(ev["record"].clone()).ok()?;
    let RunRecord::Turn(turn) = record else {
        return None;
    };
    Some((turn, cursor))
}
/// Reads frames from `sub` until `events` holds at least `expected_total` turn events.
///
/// Each `runs.trace.event` notification's `params` is appended to `events`,
/// and other frames are ignored.
///
/// # Panics
/// Panics if no frame arrives within 2 seconds while more turns are still expected.
async fn read_turn_events_until(
    sub: &mut Ws,
    events: &mut Vec<serde_json::Value>,
    expected_total: usize,
) {
    /// Number of entries in `events` that decode as a run turn.
    fn turn_count(events: &[serde_json::Value]) -> usize {
        events.iter().filter(|e| event_turn(e).is_some()).count()
    }

    while turn_count(events) < expected_total {
        let Ok(msg) = tokio::time::timeout(Duration::from_secs(2), next_json(sub)).await else {
            panic!(
                "timed out waiting for {expected_total} turn events; have {}",
                turn_count(events)
            );
        };
        if msg.get("method").and_then(|m| m.as_str()) == Some("runs.trace.event") {
            events.push(msg["params"].clone());
        }
    }
}
/// Subscribing before any turn is recorded yields an empty snapshot (cursor 0,
/// status `in_progress`). After that, each of three turns streams exactly once,
/// in order, with cursors 1..=3, turn indices 0..=2 and the submitted prompts.
#[tokio::test]
async fn subscribe_at_turn_zero_streams_every_turn_in_order() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_multi_dispatcher(state.clone()).await;

    let mut harness = connect(addr).await;
    register_tools(&mut harness, &["drive_cli"]).await;
    let run_id = start_run(&mut harness, "agent-a").await;

    let mut sub = connect(addr).await;
    let mut sub_events = Vec::new();
    auth_host(&mut sub, &mut sub_events).await;
    let hs = call_collecting_events(
        &mut sub,
        "hs",
        "host.subscribe",
        serde_json::json!({}),
        &mut sub_events,
    )
    .await;
    assert!(hs.get("error").is_none(), "host.subscribe: {hs}");
    let snap_resp = call_collecting_events(
        &mut sub,
        "sub",
        "runs.subscribe",
        serde_json::json!({ "run_id": run_id }),
        &mut sub_events,
    )
    .await;
    assert!(snap_resp.get("error").is_none(), "runs.subscribe: {snap_resp}");

    let snap = &snap_resp["result"];
    assert_eq!(snap["cursor"].as_u64(), Some(0), "turn-0 snapshot is empty");
    assert!(
        snap["turns_so_far"].as_array().expect("turns_so_far array").is_empty(),
        "turn-0 snapshot has no turns: {snap}"
    );
    assert_eq!(snap["status"], "in_progress");

    let prompts = ["first", "second", "third"];
    for (i, prompt) in prompts.iter().enumerate() {
        let r = submit_servicing_callbacks(
            &mut harness,
            &format!("p{i}"),
            drive_proposal(&format!("a{i}"), prompt),
            &drive_output(),
        )
        .await;
        assert!(r.get("error").is_none(), "submit {i}: {r}");
    }

    read_turn_events_until(&mut sub, &mut sub_events, prompts.len()).await;
    let turns: Vec<(RunTurn, usize)> = sub_events.iter().filter_map(event_turn).collect();
    assert_eq!(turns.len(), 3, "exactly three streamed turns");
    // Cursors are 1-based (the snapshot sat at 0); turn indices are 0-based.
    for (i, ((turn, cursor), expected)) in turns.iter().zip(prompts).enumerate() {
        assert_eq!(*cursor, i + 1, "turn {i} cursor");
        assert_eq!(turn.index, i, "turn {i} index");
        assert_eq!(turn.prompt.as_deref(), Some(expected), "turn {i} prompt");
    }
}
/// Subscribing after seven turns returns a snapshot with cursor 7 and all
/// seven turns, and does not stream any of them. The next turn (index 7)
/// streams exactly once with cursor 8, so the snapshot/stream boundary has
/// neither gaps nor duplicates.
#[tokio::test]
async fn subscribe_mid_run_snapshot_then_no_dup_at_boundary() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_multi_dispatcher(state.clone()).await;

    let mut harness = connect(addr).await;
    register_tools(&mut harness, &["drive_cli"]).await;
    let run_id = start_run(&mut harness, "agent-a").await;
    for i in 0..7 {
        let r = submit_servicing_callbacks(
            &mut harness,
            &format!("p{i}"),
            drive_proposal(&format!("a{i}"), &format!("turn-{i}")),
            &drive_output(),
        )
        .await;
        assert!(r.get("error").is_none(), "submit {i}: {r}");
    }

    let mut sub = connect(addr).await;
    let mut sub_events = Vec::new();
    auth_host(&mut sub, &mut sub_events).await;
    let hs = call_collecting_events(
        &mut sub,
        "hs",
        "host.subscribe",
        serde_json::json!({}),
        &mut sub_events,
    )
    .await;
    assert!(hs.get("error").is_none(), "host.subscribe: {hs}");
    let snap_resp = call_collecting_events(
        &mut sub,
        "sub",
        "runs.subscribe",
        serde_json::json!({ "run_id": run_id }),
        &mut sub_events,
    )
    .await;
    assert!(snap_resp.get("error").is_none(), "runs.subscribe: {snap_resp}");

    let snap = &snap_resp["result"];
    assert_eq!(snap["cursor"].as_u64(), Some(7), "mid-run cursor is the turn count");
    let snap_turns = snap["turns_so_far"].as_array().expect("turns_so_far array");
    assert_eq!(snap_turns.len(), 7, "snapshot has turns ≤ 7");
    for (i, t) in snap_turns.iter().enumerate() {
        let turn: RunTurn = serde_json::from_value(t.clone()).unwrap();
        assert_eq!(turn.index, i);
        assert_eq!(turn.prompt.as_deref(), Some(format!("turn-{i}").as_str()));
    }
    assert_eq!(
        sub_events.iter().filter(|e| event_turn(e).is_some()).count(),
        0,
        "no streamed turn events before the boundary"
    );

    // The eighth turn (index 7) is the first one past the snapshot boundary.
    // Its ids continue the p{i}/a{i} sequence used above.
    let r = submit_servicing_callbacks(
        &mut harness,
        "p7",
        drive_proposal("a7", "turn-7"),
        &drive_output(),
    )
    .await;
    assert!(r.get("error").is_none(), "submit 7: {r}");

    read_turn_events_until(&mut sub, &mut sub_events, 1).await;
    let streamed: Vec<(RunTurn, usize)> = sub_events.iter().filter_map(event_turn).collect();
    assert_eq!(streamed.len(), 1, "only the post-boundary turn streams");
    assert_eq!(streamed[0].1, 8, "boundary turn cursor is snapshot cursor + 1");
    assert_eq!(streamed[0].0.index, 7);
    assert_eq!(streamed[0].0.prompt.as_deref(), Some("turn-7"));
}
/// Completing a run delivers a terminal `RunRecord::Ended` trace event to an
/// already-subscribed host. The event must carry the run's id and status
/// `completed`, and must arrive within a single 3-second deadline.
#[tokio::test]
async fn terminal_event_delivered_on_complete() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_multi_dispatcher(state.clone()).await;

    let mut harness = connect(addr).await;
    register_tools(&mut harness, &["drive_cli"]).await;
    let run_id = start_run(&mut harness, "agent-a").await;

    let mut sub = connect(addr).await;
    let mut sub_events = Vec::new();
    auth_host(&mut sub, &mut sub_events).await;
    let hs = call_collecting_events(
        &mut sub,
        "hs",
        "host.subscribe",
        serde_json::json!({}),
        &mut sub_events,
    )
    .await;
    assert!(hs.get("error").is_none(), "host.subscribe: {hs}");
    let snap = call_collecting_events(
        &mut sub,
        "sub",
        "runs.subscribe",
        serde_json::json!({ "run_id": run_id }),
        &mut sub_events,
    )
    .await;
    assert!(snap.get("error").is_none(), "runs.subscribe: {snap}");

    let r = submit_servicing_callbacks(
        &mut harness,
        "p0",
        drive_proposal("a0", "go"),
        &drive_output(),
    )
    .await;
    assert!(r.get("error").is_none(), "submit p0: {r}");

    send(
        &mut harness,
        "done",
        "runs.complete",
        serde_json::json!({ "run_id": run_id, "outcome": success_outcome() }),
    )
    .await;
    let ack = next_json(&mut harness).await;
    assert_eq!(ack["result"]["ok"], true, "runs.complete must be acknowledged: {ack}");

    // One absolute deadline bounds the whole wait, rather than a per-frame
    // timeout that could push it past 3s. Non-terminal records such as the
    // p0 turn are skipped.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    let mut saw_terminal = false;
    while let Ok(msg) = tokio::time::timeout_at(deadline, next_json(&mut sub)).await {
        if msg.get("method").and_then(|m| m.as_str()) != Some("runs.trace.event") {
            continue;
        }
        let params = &msg["params"];
        let rec: RunRecord = serde_json::from_value(params["record"].clone())
            .unwrap_or_else(|err| panic!("trace record must deserialize: {err}; {params}"));
        if let RunRecord::Ended(e) = rec {
            assert_eq!(e.run_id, run_id);
            assert_eq!(params["status"], "completed");
            saw_terminal = true;
            break;
        }
    }
    assert!(saw_terminal, "a terminal runs.trace.event must be delivered on complete");
}
#[tokio::test]
async fn two_subscribers_both_receive_all_events() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_multi_dispatcher(state.clone()).await;
let mut harness = connect(addr).await;
register_tools(&mut harness, &["drive_cli"]).await;
let run_id = start_run(&mut harness, "agent-a").await;
let mut sub1 = connect(addr).await;
let mut e1 = Vec::new();
auth_host(&mut sub1, &mut e1).await;
call_collecting_events(&mut sub1, "hs", "host.subscribe", serde_json::json!({}), &mut e1).await;
call_collecting_events(&mut sub1, "sub", "runs.subscribe", serde_json::json!({ "run_id": run_id }), &mut e1).await;
let mut sub2 = connect(addr).await;
let mut e2 = Vec::new();
auth_host(&mut sub2, &mut e2).await;
call_collecting_events(&mut sub2, "hs", "host.subscribe", serde_json::json!({}), &mut e2).await;
call_collecting_events(&mut sub2, "sub", "runs.subscribe", serde_json::json!({ "run_id": run_id }), &mut e2).await;
for i in 0..2 {
submit_servicing_callbacks(
&mut harness,
&format!("p{i}"),
drive_proposal(&format!("a{i}"), &format!("t{i}")),
&drive_output(),
)
.await;
}
read_turn_events_until(&mut sub1, &mut e1, 2).await;
read_turn_events_until(&mut sub2, &mut e2, 2).await;
let t1: Vec<_> = e1.iter().filter_map(event_turn).collect();
let t2: Vec<_> = e2.iter().filter_map(event_turn).collect();
assert_eq!(t1.len(), 2, "subscriber 1 got both turns");
assert_eq!(t2.len(), 2, "subscriber 2 got both turns (fanout)");
assert_eq!(t1[0].1, 1);
assert_eq!(t2[0].1, 1);
assert_eq!(t1[1].1, 2);
assert_eq!(t2[1].1, 2);
}
/// A subscriber that never reads its socket must not stall the rest of the server:
/// - 40 turns still drive through within 15 seconds;
/// - an unrelated `state.keys` RPC on a fresh connection answers within 3 seconds;
/// - the server records all 40 turns.
#[tokio::test]
async fn slow_subscriber_does_not_stall_runs_lock_or_other_rpcs() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_multi_dispatcher(state.clone()).await;

    let mut harness = connect(addr).await;
    register_tools(&mut harness, &["drive_cli"]).await;
    let run_id = start_run(&mut harness, "agent-a").await;

    let mut slow = connect(addr).await;
    let mut slow_events = Vec::new();
    auth_host(&mut slow, &mut slow_events).await;
    let hs = call_collecting_events(
        &mut slow,
        "hs",
        "host.subscribe",
        serde_json::json!({}),
        &mut slow_events,
    )
    .await;
    assert!(hs.get("error").is_none(), "host.subscribe: {hs}");
    let snap = call_collecting_events(
        &mut slow,
        "sub",
        "runs.subscribe",
        serde_json::json!({ "run_id": run_id }),
        &mut slow_events,
    )
    .await;
    assert!(snap.get("error").is_none(), "runs.subscribe: {snap}");
    // `slow` stays connected but is never read again below, so every trace
    // event for this run piles up unread on that connection.

    let drive_all = async {
        for i in 0..40 {
            let r = submit_servicing_callbacks(
                &mut harness,
                &format!("p{i}"),
                drive_proposal(&format!("a{i}"), &format!("t{i}")),
                &drive_output(),
            )
            .await;
            assert!(r.get("error").is_none(), "submit {i} stalled or errored: {r}");
        }
    };
    tokio::time::timeout(Duration::from_secs(15), drive_all)
        .await
        .expect("driving turns must not stall behind a wedged subscriber");

    let mut other = connect(addr).await;
    let probe = tokio::time::timeout(
        Duration::from_secs(3),
        call_collecting_events(
            &mut other,
            "k",
            "state.keys",
            serde_json::json!({}),
            &mut Vec::new(),
        ),
    )
    .await
    .expect("unrelated RPC must not be stalled by a wedged subscriber");
    assert!(probe.get("error").is_none(), "state.keys: {probe}");
    assert_eq!(state.run_turn_count(&run_id).await, 40);
}
#[tokio::test]
async fn unentitled_caller_is_rejected_on_subscribe() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_multi_dispatcher(state.clone()).await;
let mut harness = connect(addr).await;
register_tools(&mut harness, &["drive_cli"]).await;
let run_id = start_run(&mut harness, "agent-a").await;
let mut intruder = connect(addr).await;
let resp = call_collecting_events(
&mut intruder,
"sub",
"runs.subscribe",
serde_json::json!({ "run_id": run_id }),
&mut Vec::new(),
)
.await;
assert!(
resp.get("error").is_none(),
"unauthorized subscribe must NOT return a distinguishable error \
frame (existence/owner oracle), got {resp}"
);
assert_eq!(
resp["result"]["not_found"], true,
"unauthorized subscribe must return the uniform not-found marker, got {resp}"
);
assert!(
!resp.to_string().contains("agent-a"),
"response must not leak the owning agent_id, got {resp}"
);
let unknown = call_collecting_events(
&mut intruder,
"sub2",
"runs.subscribe",
serde_json::json!({ "run_id": "00000000-0000-0000-0000-000000000000" }),
&mut Vec::new(),
)
.await;
assert!(unknown.get("error").is_none(), "unknown run is not an error: {unknown}");
assert_eq!(
unknown["result"]["not_found"], true,
"unknown run returns the same uniform not-found marker, got {unknown}"
);
let mut faux_host = connect(addr).await;
let mut fe = Vec::new();
call_collecting_events(&mut faux_host, "hs", "host.subscribe", serde_json::json!({}), &mut fe).await;
let elevated = call_collecting_events(
&mut faux_host,
"sub",
"runs.subscribe",
serde_json::json!({ "run_id": run_id }),
&mut fe,
)
.await;
assert!(elevated.get("error").is_none(), "still no oracle on the faux-host path: {elevated}");
assert_eq!(
elevated["result"]["not_found"], true,
"host.subscribe WITHOUT the host token must NOT authorize run reads (Parslee-ai/car#254 \
self-elevation), got {elevated}"
);
let mut host = connect(addr).await;
let mut he = Vec::new();
auth_host(&mut host, &mut he).await;
let ok = call_collecting_events(
&mut host,
"sub",
"runs.subscribe",
serde_json::json!({ "run_id": run_id }),
&mut he,
)
.await;
assert!(ok.get("error").is_none(), "host-role client must be authorized: {ok}");
assert!(
ok["result"].get("not_found").is_none(),
"an entitled host-role client gets the snapshot, not the not-found marker: {ok}"
);
}
#[tokio::test]
async fn resubscribe_after_drop_covers_gap_without_dup() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_multi_dispatcher(state.clone()).await;
let mut harness = connect(addr).await;
register_tools(&mut harness, &["drive_cli"]).await;
let run_id = start_run(&mut harness, "agent-a").await;
let mut sub = connect(addr).await;
let mut events = Vec::new();
auth_host(&mut sub, &mut events).await;
call_collecting_events(&mut sub, "hs", "host.subscribe", serde_json::json!({}), &mut events).await;
call_collecting_events(&mut sub, "sub", "runs.subscribe", serde_json::json!({ "run_id": run_id }), &mut events).await;
for i in 0..2 {
submit_servicing_callbacks(&mut harness, &format!("p{i}"), drive_proposal(&format!("a{i}"), &format!("t{i}")), &drive_output()).await;
}
read_turn_events_until(&mut sub, &mut events, 2).await;
let seen_before: Vec<_> = events.iter().filter_map(event_turn).collect();
assert_eq!(seen_before.len(), 2);
sub.close(None).await.ok();
drop(sub);
tokio::time::sleep(Duration::from_millis(100)).await;
for i in 2..4 {
submit_servicing_callbacks(&mut harness, &format!("p{i}"), drive_proposal(&format!("a{i}"), &format!("t{i}")), &drive_output()).await;
}
let mut sub2 = connect(addr).await;
let mut events2 = Vec::new();
auth_host(&mut sub2, &mut events2).await;
call_collecting_events(&mut sub2, "hs", "host.subscribe", serde_json::json!({}), &mut events2).await;
let snap_resp = call_collecting_events(&mut sub2, "sub", "runs.subscribe", serde_json::json!({ "run_id": run_id }), &mut events2).await;
let snap = &snap_resp["result"];
assert_eq!(snap["cursor"].as_u64().unwrap(), 4, "fresh snapshot covers the gap");
let snap_turns = snap["turns_so_far"].as_array().unwrap();
assert_eq!(snap_turns.len(), 4, "snapshot includes turns emitted during the outage");
for (i, t) in snap_turns.iter().enumerate() {
let turn: RunTurn = serde_json::from_value(t.clone()).unwrap();
assert_eq!(turn.index, i, "snapshot turn {i} index");
assert_eq!(turn.prompt.as_deref(), Some(format!("t{i}").as_str()));
}
submit_servicing_callbacks(&mut harness, "p4", drive_proposal("a4", "t4"), &drive_output()).await;
read_turn_events_until(&mut sub2, &mut events2, 1).await;
let streamed: Vec<_> = events2.iter().filter_map(event_turn).collect();
assert_eq!(streamed.len(), 1, "only the post-resubscribe turn streams");
assert_eq!(streamed[0].1, 5);
assert_eq!(streamed[0].0.index, 4);
assert_eq!(streamed[0].0.prompt.as_deref(), Some("t4"));
}
/// After `runs.unsubscribe` reports `removed: true`, no further
/// `runs.trace.event` frames reach the connection. Only the single turn
/// recorded before unsubscribing is ever delivered.
#[tokio::test]
async fn unsubscribe_stops_stream() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_multi_dispatcher(state.clone()).await;

    let mut harness = connect(addr).await;
    register_tools(&mut harness, &["drive_cli"]).await;
    let run_id = start_run(&mut harness, "agent-a").await;

    let mut sub = connect(addr).await;
    let mut events = Vec::new();
    auth_host(&mut sub, &mut events).await;
    let hs = call_collecting_events(
        &mut sub,
        "hs",
        "host.subscribe",
        serde_json::json!({}),
        &mut events,
    )
    .await;
    assert!(hs.get("error").is_none(), "host.subscribe: {hs}");
    let snap = call_collecting_events(
        &mut sub,
        "sub",
        "runs.subscribe",
        serde_json::json!({ "run_id": run_id }),
        &mut events,
    )
    .await;
    assert!(snap.get("error").is_none(), "runs.subscribe: {snap}");

    let r0 = submit_servicing_callbacks(
        &mut harness,
        "p0",
        drive_proposal("a0", "t0"),
        &drive_output(),
    )
    .await;
    assert!(r0.get("error").is_none(), "submit p0: {r0}");
    read_turn_events_until(&mut sub, &mut events, 1).await;

    let un = call_collecting_events(
        &mut sub,
        "un",
        "runs.unsubscribe",
        serde_json::json!({ "run_id": run_id }),
        &mut events,
    )
    .await;
    assert_eq!(un["result"]["removed"], true, "unsubscribe removed the subscription: {un}");

    let r1 = submit_servicing_callbacks(
        &mut harness,
        "p1",
        drive_proposal("a1", "t1"),
        &drive_output(),
    )
    .await;
    assert!(r1.get("error").is_none(), "submit p1: {r1}");

    // Give a stray event a short window to show up. Silence, or a frame that
    // is not a trace event, both count as a pass.
    let extra = tokio::time::timeout(Duration::from_millis(500), next_json(&mut sub)).await;
    if let Ok(msg) = extra {
        assert_ne!(
            msg.get("method").and_then(|m| m.as_str()),
            Some("runs.trace.event"),
            "no events after unsubscribe; got {msg}"
        );
    }
    let seen: Vec<_> = events.iter().filter_map(event_turn).collect();
    assert_eq!(seen.len(), 1, "exactly the pre-unsubscribe turn was delivered");
}
// Appears to exist only so that the `VerifierVerdict` import stays referenced;
// nothing else in this file uses it. Nothing calls this function, hence the
// `dead_code` allowance.
#[allow(dead_code)]
fn _verdict_in_scope() -> VerifierVerdict {
    VerifierVerdict::NotRun
}