use car_proto::{CliOutcome, RunRecord, RunTurn, VerifierVerdict};
use car_memgine::MemgineEngine;
use car_server_core::{run_dispatch, ServerState, ServerStateConfig};
use futures::{SinkExt, StreamExt};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message};
/// Client-side WebSocket connection returned by `connect_async` to the loopback dispatcher.
type Ws = tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
/// Builds a `ServerState` whose journals are written under `journal_dir` and
/// which is wired to a freshly constructed `MemgineEngine` shared via `Arc<Mutex<_>>`.
fn loopback_state(journal_dir: std::path::PathBuf) -> Arc<ServerState> {
    let shared_memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
    Arc::new(ServerState::with_config(
        ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine),
    ))
}
/// Binds an OS-assigned port on 127.0.0.1 and returns the bound address.
///
/// A background task accepts exactly one TCP connection, upgrades it to a
/// WebSocket and hands both halves to `run_dispatch` with the given `state`.
/// Accept or handshake failures panic inside that task.
async fn spawn_dispatcher(state: Arc<ServerState>) -> SocketAddr {
    let bind_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
    let listener = TcpListener::bind(bind_addr).await.expect("bind loopback");
    let addr = listener.local_addr().expect("local_addr");
    tokio::spawn(async move {
        let (tcp, peer_addr) = listener.accept().await.expect("accept");
        let socket = accept_async(tcp).await.expect("ws handshake");
        let (sink, stream) = socket.split();
        // The dispatcher's return value is deliberately ignored; the tests
        // assert on server state and replies, not on how the loop ended.
        let _ = run_dispatch(stream, Box::pin(sink), peer_addr.to_string(), state).await;
    });
    addr
}
/// Writes one JSON-RPC 2.0 request (`id`, `method`, `params`) as a text frame.
/// Panics if the socket write fails.
async fn send(ws: &mut Ws, id: &str, method: &str, params: serde_json::Value) {
    let request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": id,
        "method": method,
        "params": params
    });
    let frame = Message::Text(request.to_string().into());
    ws.send(frame).await.expect("send request");
}
/// Reads the next WebSocket frame, takes its text payload and parses it as JSON.
/// Panics if the stream has ended, the frame is an error, the payload cannot be
/// converted to text, or the text is not valid JSON.
async fn next_json(ws: &mut Ws) -> serde_json::Value {
    let frame = ws.next().await.expect("frame").expect("frame ok");
    let body = frame.into_text().expect("text").to_string();
    serde_json::from_str(&body).expect("parse json")
}
/// Sends a request and returns the reply whose string `id` equals `id`.
///
/// While waiting, every incoming `tools.execute` request is answered with
/// `tool_output` as its result, echoing the callback's string id. This
/// simulates the client-side tool. Any other frame whose id does not match is
/// skipped.
async fn call_servicing_callbacks(
    ws: &mut Ws,
    id: &str,
    method: &str,
    params: serde_json::Value,
    tool_output: &serde_json::Value,
) -> serde_json::Value {
    send(ws, id, method, params).await;
    loop {
        let incoming = next_json(ws).await;
        let is_tool_callback =
            incoming.get("method").and_then(|m| m.as_str()) == Some("tools.execute");
        if !is_tool_callback {
            if incoming.get("id").and_then(|v| v.as_str()) == Some(id) {
                return incoming;
            }
            continue;
        }
        let callback_id = incoming["id"].as_str().expect("callback id").to_string();
        let reply = serde_json::json!({
            "jsonrpc": "2.0",
            "id": callback_id,
            "result": tool_output
        });
        ws.send(Message::Text(reply.to_string().into()))
            .await
            .expect("send callback reply");
    }
}
/// Registers each name in `names` as a client-side tool with an empty
/// description and empty parameter schema. Panics if the server returns an error.
async fn register_tools(ws: &mut Ws, names: &[&str]) {
    let mut tools = Vec::with_capacity(names.len());
    for name in names {
        tools.push(serde_json::json!({ "name": name, "description": "", "parameters": {} }));
    }
    send(ws, "reg", "tools.register", serde_json::Value::Array(tools)).await;
    let resp = next_json(ws).await;
    assert!(resp.get("error").is_none(), "tools.register failed: {resp}");
}
/// Opens a run for `agent_id` via `runs.start` and returns the server-assigned run id.
///
/// # Panics
/// Panics if the reply carries a JSON-RPC `error`, or if `result.run_id` is
/// missing or not a string. Both messages include the full reply so the failure
/// is diagnosable from test output.
async fn start_run(ws: &mut Ws, agent_id: &str) -> String {
    send(
        ws,
        "start",
        "runs.start",
        serde_json::json!({ "agent_id": agent_id, "intent": "do the thing" }),
    )
    .await;
    let resp = next_json(ws).await;
    assert!(resp.get("error").is_none(), "runs.start failed: {resp}");
    resp["result"]["run_id"]
        .as_str()
        .unwrap_or_else(|| panic!("runs.start result has no string run_id: {resp}"))
        .to_string()
}
/// Fetches the in-memory records for `run_id` and unwraps each into a `RunTurn`.
/// Panics on the first record that is not `RunRecord::Turn`.
async fn recorded_turns(state: &Arc<ServerState>, run_id: &str) -> Vec<RunTurn> {
    let mut turns = Vec::new();
    for record in state.run_turns(run_id).await {
        match record {
            RunRecord::Turn(turn) => turns.push(turn),
            other => panic!("expected only Turn records, got {other:?}"),
        }
    }
    turns
}
/// Builds `proposal.submit` params holding one `drive_cli` tool call
/// (`cli: "claude"`) with the given action id and prompt.
fn drive_proposal(action_id: &str, prompt: &str) -> serde_json::Value {
    let action = serde_json::json!({
        "id": action_id,
        "type": "tool_call",
        "tool": "drive_cli",
        "parameters": { "cli": "claude", "prompt": prompt }
    });
    serde_json::json!({
        "proposal": { "id": "p-drive", "source": "test", "actions": [action] }
    })
}
/// Builds `proposal.submit` params holding one `check_outcome` tool call
/// that runs `command`, tagged with the given action id.
fn check_proposal(action_id: &str, command: &str) -> serde_json::Value {
    let action = serde_json::json!({
        "id": action_id,
        "type": "tool_call",
        "tool": "check_outcome",
        "parameters": { "command": command }
    });
    serde_json::json!({
        "proposal": { "id": "p-check", "source": "test", "actions": [action] }
    })
}
#[tokio::test]
async fn drive_cli_turn_records_prompt_output_and_exit_code() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
register_tools(&mut ws, &["drive_cli"]).await;
let run_id = start_run(&mut ws, "agent-a").await;
let prompt = "make the failing test pass";
let out = serde_json::json!({
"cli": "claude", "exit_code": 0, "timed_out": false,
"output_tail": "1 passed, 0 failed"
});
let resp = call_servicing_callbacks(
&mut ws,
"sub1",
"proposal.submit",
drive_proposal("a1", prompt),
&out,
)
.await;
assert!(resp.get("error").is_none(), "submit failed: {resp}");
let turns = recorded_turns(&state, &run_id).await;
assert_eq!(turns.len(), 1, "exactly one turn recorded");
let t = &turns[0];
assert_eq!(t.index, 0);
assert_eq!(t.prompt.as_deref(), Some(prompt), "exact prompt recorded");
assert_eq!(t.tool.as_deref(), Some("drive_cli"));
assert_eq!(t.cli_outcome, Some(CliOutcome::Exited { code: 0 }));
assert_eq!(t.verifier_verdict, VerifierVerdict::NotRun);
assert_eq!(
t.output.as_ref().unwrap().get("output_tail").unwrap(),
&serde_json::json!("1 passed, 0 failed")
);
}
/// Two consecutive `check_outcome` calls in one run record two turns, in
/// order. The first has verdict `Pass` (index 0) and the second `Fail`
/// (index 1). Neither is a CLI drive, so neither has a `cli_outcome`.
#[tokio::test]
async fn check_outcome_records_pass_then_fail() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_dispatcher(state.clone()).await;
    let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
    register_tools(&mut ws, &["check_outcome"]).await;
    let run_id = start_run(&mut ws, "agent-a").await;

    let pass_out = serde_json::json!({ "passed": true, "output_tail": "ok" });
    let resp = call_servicing_callbacks(
        &mut ws,
        "v1",
        "proposal.submit",
        check_proposal("c1", "test -f built"),
        &pass_out,
    )
    .await;
    // Fail fast on a rejected submit, so the cause is not masked as a turn-count mismatch.
    assert!(resp.get("error").is_none(), "submit v1 failed: {resp}");

    let fail_out = serde_json::json!({ "passed": false, "output_tail": "still failing" });
    let resp = call_servicing_callbacks(
        &mut ws,
        "v2",
        "proposal.submit",
        check_proposal("c2", "test -f built"),
        &fail_out,
    )
    .await;
    assert!(resp.get("error").is_none(), "submit v2 failed: {resp}");

    let turns = recorded_turns(&state, &run_id).await;
    assert_eq!(turns.len(), 2);
    assert_eq!(turns[0].verifier_verdict, VerifierVerdict::Pass);
    assert_eq!(turns[0].index, 0);
    assert!(turns[0].cli_outcome.is_none());
    assert_eq!(turns[1].verifier_verdict, VerifierVerdict::Fail);
    assert_eq!(turns[1].index, 1);
    assert!(turns[1].cli_outcome.is_none());
}
/// A `deny_tool_param` policy matching `rm -rf` in `drive_cli`'s `prompt` still
/// lets the submit succeed. It records one turn naming the rejecting policy and
/// parameter, with no CLI outcome, `NotRun` verification and no tool output.
#[tokio::test]
async fn policy_rejected_drive_records_rejection_and_tool_never_runs() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_dispatcher(state.clone()).await;
    let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
    register_tools(&mut ws, &["drive_cli"]).await;
    let run_id = start_run(&mut ws, "agent-a").await;

    let policy = serde_json::json!({
        "name": "no-destructive",
        "rule": "deny_tool_param",
        "target": "drive_cli",
        "key": "prompt",
        "pattern": "rm -rf"
    });
    send(&mut ws, "pol", "policy.register", policy).await;
    let pol_resp = next_json(&mut ws).await;
    assert!(pol_resp.get("error").is_none(), "policy.register failed: {pol_resp}");

    // Sentinel reply: if the server ever issued a tools.execute callback, this
    // payload would be sent back and presumably surface as the turn's output.
    let sentinel = serde_json::json!({ "exit_code": 0, "output_tail": "SHOULD NOT APPEAR" });
    let destructive = drive_proposal("a1", "please run rm -rf / to clean up");
    let resp =
        call_servicing_callbacks(&mut ws, "sub1", "proposal.submit", destructive, &sentinel)
            .await;
    assert!(resp.get("error").is_none(), "submit failed: {resp}");

    let turns = recorded_turns(&state, &run_id).await;
    assert_eq!(turns.len(), 1);
    let turn = &turns[0];
    let rejection = turn
        .policy_rejected
        .as_ref()
        .expect("policy_rejected recorded");
    assert!(
        rejection.rule.contains("no-destructive"),
        "rule carries policy name: {}",
        rejection.rule
    );
    assert_eq!(rejection.param.as_deref(), Some("prompt"));
    assert_eq!(turn.cli_outcome, None);
    assert_eq!(turn.verifier_verdict, VerifierVerdict::NotRun);
    assert!(
        turn.output.is_none(),
        "rejected action must have no tool output (body never ran), got {:?}",
        turn.output
    );
}
/// A `drive_cli` result reporting `timed_out: true` and a null exit code is
/// recorded as `CliOutcome::Timeout` with verdict `NotRun`.
#[tokio::test]
async fn timed_out_drive_records_timeout_and_not_run() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_dispatcher(state.clone()).await;
    let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
    register_tools(&mut ws, &["drive_cli"]).await;
    let run_id = start_run(&mut ws, "agent-a").await;

    let out = serde_json::json!({
        "cli": "claude", "exit_code": null, "timed_out": true,
        "output_tail": "...(killed after 180s)"
    });
    let resp = call_servicing_callbacks(
        &mut ws,
        "sub1",
        "proposal.submit",
        drive_proposal("a1", "a very long task"),
        &out,
    )
    .await;
    // Fail fast on a rejected submit, so the cause is not masked as a missing turn.
    assert!(resp.get("error").is_none(), "submit failed: {resp}");

    let turns = recorded_turns(&state, &run_id).await;
    assert_eq!(turns.len(), 1);
    assert_eq!(turns[0].cli_outcome, Some(CliOutcome::Timeout));
    assert_eq!(turns[0].verifier_verdict, VerifierVerdict::NotRun);
}
/// A tool that is neither `drive_cli` nor `check_outcome` (`search`) still
/// records a turn carrying its parameters and raw output. It has no CLI
/// outcome, `NotRun` verification and no policy rejection.
#[tokio::test]
async fn non_bulldozer_tool_records_generic_turn() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_dispatcher(state.clone()).await;
    let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
    register_tools(&mut ws, &["search"]).await;
    let run_id = start_run(&mut ws, "agent-a").await;

    let proposal = serde_json::json!({
        "proposal": {
            "id": "p-search",
            "source": "test",
            "actions": [{
                "id": "s1",
                "type": "tool_call",
                "tool": "search",
                "parameters": { "query": "rust ownership" }
            }]
        }
    });
    let out = serde_json::json!({ "hits": ["a", "b"] });
    let resp = call_servicing_callbacks(&mut ws, "sub1", "proposal.submit", proposal, &out).await;
    // Fail fast on a rejected submit, so the cause is not masked as a missing turn.
    assert!(resp.get("error").is_none(), "submit failed: {resp}");

    let turns = recorded_turns(&state, &run_id).await;
    assert_eq!(turns.len(), 1);
    let t = &turns[0];
    assert_eq!(t.tool.as_deref(), Some("search"));
    assert_eq!(
        t.parameters.get("query").unwrap(),
        &serde_json::json!("rust ownership")
    );
    assert_eq!(t.output, Some(out));
    assert_eq!(t.cli_outcome, None);
    assert_eq!(t.verifier_verdict, VerifierVerdict::NotRun);
    assert!(t.policy_rejected.is_none());
}
#[tokio::test]
async fn no_run_means_no_turns_recorded() {
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let out = serde_json::json!({ "exit_code": 0, "output_tail": "x" });
call_servicing_callbacks(
&mut ws,
"sub1",
"proposal.submit",
drive_proposal("a1", "no bracket"),
&out,
)
.await;
assert!(
state.runs.lock().await.is_empty(),
"no runs.start means no run, hence no recorded turns"
);
}
#[tokio::test]
async fn concurrent_record_run_turns_yield_contiguous_indices_and_parseable_file() {
use car_server_core::RunMeta;
let tmp = TempDir::new().unwrap();
let state = loopback_state(tmp.path().join("journals"));
state
.start_run(RunMeta {
run_id: "r-concurrent".to_string(),
agent_id: "agent-a".to_string(),
client_id: "c0".to_string(),
intent: "stress".to_string(),
outcome_description: None,
started_at: chrono::Utc::now(),
termination: None,
ended_at: None,
turns: Vec::new(),
})
.await;
fn batch(n: usize, tag: &str) -> Vec<RunRecord> {
(0..n)
.map(|i| {
RunRecord::Turn(RunTurn {
index: i, prompt: Some(format!("{tag}-{i}")),
tool: Some("drive_cli".to_string()),
parameters: serde_json::json!({ "prompt": format!("{tag}-{i}") }),
output: Some(serde_json::json!({ "exit_code": 0 })),
cli_outcome: None,
verifier_verdict: VerifierVerdict::NotRun,
policy_rejected: None,
})
})
.collect()
}
let per_batch = 25usize;
let s1 = state.clone();
let s2 = state.clone();
let h1 = tokio::spawn(async move {
s1.record_run_turns("r-concurrent", batch(per_batch, "A"))
.await
});
let h2 = tokio::spawn(async move {
s2.record_run_turns("r-concurrent", batch(per_batch, "B"))
.await
});
h1.await.unwrap();
h2.await.unwrap();
let turns = state.run_turns("r-concurrent").await;
let mut indices: Vec<usize> = turns
.iter()
.filter_map(|r| match r {
RunRecord::Turn(t) => Some(t.index),
_ => None,
})
.collect();
let total = per_batch * 2;
assert_eq!(indices.len(), total, "all turns recorded, none dropped");
let observed = indices.clone();
indices.sort_unstable();
indices.dedup();
assert_eq!(
indices.len(),
total,
"indices must be unique (no TOCTOU collision), got {observed:?}"
);
assert_eq!(*indices.first().unwrap(), 0);
assert_eq!(*indices.last().unwrap(), total - 1);
assert!(
observed.windows(2).all(|w| w[1] == w[0] + 1),
"in-memory turn indices must be contiguous & ordered, got {observed:?}"
);
let trace = state
.run_store
.get_run_trace("r-concurrent")
.expect("on-disk trace exists");
let disk_turns = trace
.iter()
.filter(|r| matches!(r, RunRecord::Turn(_)))
.count();
assert_eq!(
disk_turns, total,
"disk file must hold every turn, fully parseable (no interleaved write)"
);
}