use car_proto::RunTermination;
use car_server_core::{run_dispatch, ServerState, ServerStateConfig};
use car_memgine::MemgineEngine;
use futures::{SinkExt, StreamExt};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message};
type Ws = tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>;
fn loopback_state(journal_dir: std::path::PathBuf) -> Arc<ServerState> {
let engine = Arc::new(Mutex::new(MemgineEngine::new(None)));
let cfg = ServerStateConfig::new(journal_dir).with_shared_memgine(engine);
Arc::new(ServerState::with_config(cfg))
}
async fn spawn_dispatcher(state: Arc<ServerState>) -> SocketAddr {
let listener = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
0,
)))
.await
.expect("bind loopback");
let addr = listener.local_addr().expect("local_addr");
tokio::spawn(async move {
let (stream, peer) = listener.accept().await.expect("accept");
let ws = accept_async(stream).await.expect("ws handshake");
let (write, read) = ws.split();
let _ = run_dispatch(read, Box::pin(write), peer.to_string(), state).await;
});
addr
}
async fn call_raw(ws: &mut Ws, id: &str, method: &str, params: serde_json::Value) -> serde_json::Value {
ws.send(Message::Text(
serde_json::json!({ "jsonrpc": "2.0", "id": id, "method": method, "params": params })
.to_string()
.into(),
))
.await
.expect("send request");
let text = ws
.next()
.await
.expect("response frame")
.expect("response frame ok")
.into_text()
.expect("text response")
.to_string();
serde_json::from_str(&text).expect("parse response")
}
async fn call_ok(ws: &mut Ws, id: &str, method: &str, params: serde_json::Value) -> serde_json::Value {
let resp = call_raw(ws, id, method, params).await;
assert!(
resp.get("error").is_none(),
"{method} should succeed; full response: {resp}"
);
resp["result"].clone()
}
async fn only_session_current_run(state: &Arc<ServerState>) -> Option<String> {
let session = {
let sessions = state.sessions.lock().await;
sessions
.values()
.next()
.expect("one connected session")
.clone()
};
let current = session.current_run_id.lock().await.clone();
current
}
fn success_outcome() -> serde_json::Value {
serde_json::json!({
"status": "success",
"summary": "did the thing",
"evidence": [],
"metrics": {
"turns": 1, "tool_calls": 1, "duration_ms": 12.0,
"retries": 0, "actions_succeeded": 1, "actions_failed": 0
},
"timestamp": chrono::Utc::now().to_rfc3339()
})
}
#[tokio::test]
async fn runs_start_returns_unique_ids_for_sequential_starts() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let r1 = call_ok(
&mut ws,
"s1",
"runs.start",
serde_json::json!({ "agent_id": "agent-a", "intent": "first goal" }),
)
.await;
let r2 = call_ok(
&mut ws,
"s2",
"runs.start",
serde_json::json!({ "agent_id": "agent-a", "intent": "second goal" }),
)
.await;
let id1 = r1["run_id"].as_str().unwrap();
let id2 = r2["run_id"].as_str().unwrap();
assert!(!id1.is_empty() && !id2.is_empty());
assert_ne!(id1, id2, "two sequential starts must mint distinct run_ids");
assert_eq!(r1["agent_id"], "agent-a");
}
#[tokio::test]
async fn idempotency_key_dedups_runs_start() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let r1 = call_ok(
&mut ws,
"k1",
"runs.start",
serde_json::json!({ "agent_id": "agent-a", "intent": "occurrence", "idempotency_key": "occ-42" }),
)
.await;
assert_eq!(r1["run_id"], "occ-42", "the key becomes the run_id");
assert_eq!(r1["agent_id"], "agent-a");
let r2 = call_ok(
&mut ws,
"k2",
"runs.start",
serde_json::json!({ "agent_id": "agent-a", "intent": "occurrence again", "idempotency_key": "occ-42" }),
)
.await;
assert_eq!(r2["run_id"], "occ-42", "same key returns the same run");
assert_eq!(r2["agent_id"], "agent-a");
assert_eq!(
state.run_store.list_runs("agent-a").len(),
1,
"an idempotent re-start must not open a duplicate run"
);
let r3 = call_ok(
&mut ws,
"k3",
"runs.start",
serde_json::json!({ "agent_id": "agent-a", "intent": "different", "idempotency_key": "occ-99" }),
)
.await;
assert_eq!(r3["run_id"], "occ-99");
assert_eq!(
state.run_store.list_runs("agent-a").len(),
2,
"a fresh key is a fresh run"
);
}
#[tokio::test]
async fn current_run_id_is_set_before_runs_start_responds() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let r = call_ok(
&mut ws,
"s1",
"runs.start",
serde_json::json!({ "agent_id": "agent-a", "intent": "do it" }),
)
.await;
let run_id = r["run_id"].as_str().unwrap().to_string();
let current = only_session_current_run(&state).await;
assert_eq!(
current.as_deref(),
Some(run_id.as_str()),
"current_run_id must be set before runs.start responds"
);
}
#[tokio::test]
async fn runs_complete_records_terminal_outcome_status() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let started = call_ok(
&mut ws,
"s1",
"runs.start",
serde_json::json!({ "agent_id": "agent-a", "intent": "do it" }),
)
.await;
let run_id = started["run_id"].as_str().unwrap().to_string();
let ack = call_ok(
&mut ws,
"c1",
"runs.complete",
serde_json::json!({ "run_id": run_id, "outcome": success_outcome() }),
)
.await;
assert_eq!(ack["ok"], true);
assert_eq!(ack["run_id"], run_id);
let meta = state.run_meta(&run_id).await.expect("run recorded");
match meta.termination {
Some(RunTermination::Outcome { status, .. }) => {
assert_eq!(status, car_ir::OutcomeStatus::Success);
}
other => panic!("expected Outcome termination, got {other:?}"),
}
assert_eq!(only_session_current_run(&state).await, None);
}
#[tokio::test]
async fn completed_then_closed_run_is_not_incomplete() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let started = call_ok(
&mut ws,
"s1",
"runs.start",
serde_json::json!({ "agent_id": "agent-a", "intent": "do it" }),
)
.await;
let run_id = started["run_id"].as_str().unwrap().to_string();
call_ok(
&mut ws,
"c1",
"runs.complete",
serde_json::json!({ "run_id": run_id, "outcome": success_outcome() }),
)
.await;
ws.close(None).await.ok();
drop(ws);
tokio::time::sleep(std::time::Duration::from_millis(600)).await;
let meta = state.run_meta(&run_id).await.expect("run still recorded");
match meta.termination {
Some(RunTermination::Outcome { status, .. }) => {
assert_eq!(
status,
car_ir::OutcomeStatus::Success,
"a completed run must stay Success after close, not be raced to Incomplete"
);
}
other => panic!("expected the run to remain terminal Outcome, got {other:?}"),
}
}
#[tokio::test]
async fn mid_run_drop_yields_incomplete() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let started = call_ok(
&mut ws,
"s1",
"runs.start",
serde_json::json!({ "agent_id": "agent-a", "intent": "do it" }),
)
.await;
let run_id = started["run_id"].as_str().unwrap().to_string();
ws.close(None).await.ok();
drop(ws);
tokio::time::sleep(std::time::Duration::from_millis(600)).await;
let meta = state.run_meta(&run_id).await.expect("run still recorded");
assert!(
matches!(meta.termination, Some(RunTermination::Incomplete)),
"a mid-run drop must record Incomplete, got {:?}",
meta.termination
);
}
#[tokio::test]
async fn runs_start_rejects_unresolvable_agent_but_name_fallback_records() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let restore = std::env::var("CAR_AGENT_ID").ok();
std::env::remove_var("CAR_AGENT_ID");
let rejected = call_raw(
&mut ws,
"s1",
"runs.start",
serde_json::json!({ "intent": "orphan run" }),
)
.await;
assert!(
rejected.get("error").is_some(),
"runs.start with no resolvable agent_id must be rejected; got {rejected}"
);
let ok = call_ok(
&mut ws,
"s2",
"runs.start",
serde_json::json!({ "agent_name": "Bulldozer Agent", "intent": "one-shot run" }),
)
.await;
let agent_id = ok["agent_id"].as_str().unwrap();
assert_eq!(
agent_id, "name:bulldozer-agent",
"agent_name must synthesize a deterministic name-derived id"
);
assert!(!ok["run_id"].as_str().unwrap().is_empty());
if let Some(v) = restore {
std::env::set_var("CAR_AGENT_ID", v);
}
}
async fn bind_only_session(state: &Arc<ServerState>, agent_id: &str) {
let session = {
let sessions = state.sessions.lock().await;
sessions
.values()
.next()
.expect("one connected session")
.clone()
};
*session.agent_id.lock().await = Some(agent_id.to_string());
}
#[tokio::test]
async fn bound_session_cannot_forge_run_under_other_agent() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let started_a = call_ok(
&mut ws,
"s0",
"runs.start",
serde_json::json!({ "agent_id": "agent-A", "intent": "warm up" }),
)
.await;
assert_eq!(started_a["agent_id"], "agent-A");
bind_only_session(&state, "agent-A").await;
let forged = call_raw(
&mut ws,
"s1",
"runs.start",
serde_json::json!({ "agent_id": "agent-B", "intent": "forge under B" }),
)
.await;
assert!(
forged.get("error").is_some(),
"a session bound to A must NOT be able to start a run as B; got {forged}"
);
assert!(
state.run_store.list_runs("agent-B").is_empty(),
"no run may be attributed to agent-B by the A-bound session"
);
let matching = call_ok(
&mut ws,
"s2",
"runs.start",
serde_json::json!({ "agent_id": "agent-A", "intent": "legit, explicit" }),
)
.await;
assert_eq!(matching["agent_id"], "agent-A");
let derived = call_ok(
&mut ws,
"s3",
"runs.start",
serde_json::json!({ "intent": "legit, derived" }),
)
.await;
assert_eq!(
derived["agent_id"], "agent-A",
"an unspecified agent_id on a bound session derives the bound agent"
);
}