use car_server_core::{run_dispatch, RunStatus, RunStore, ServerState, ServerStateConfig};
use car_memgine::MemgineEngine;
use futures::{SinkExt, StreamExt};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::Path;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message};
type Ws = tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>;
fn state_over(car_dir: &Path) -> Arc<ServerState> {
let engine = Arc::new(Mutex::new(MemgineEngine::new(None)));
let cfg = ServerStateConfig::new(car_dir.join("journals")).with_shared_memgine(engine);
Arc::new(ServerState::with_config(cfg))
}
fn store_over(car_dir: &Path) -> RunStore {
RunStore::from_journal_dir(&car_dir.join("journals"))
}
async fn spawn_dispatcher(state: Arc<ServerState>) -> SocketAddr {
let listener = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
0,
)))
.await
.expect("bind loopback");
let addr = listener.local_addr().expect("local_addr");
tokio::spawn(async move {
let (stream, peer) = listener.accept().await.expect("accept");
let ws = accept_async(stream).await.expect("ws handshake");
let (write, read) = ws.split();
let _ = run_dispatch(read, Box::pin(write), peer.to_string(), state).await;
});
addr
}
async fn send(ws: &mut Ws, id: &str, method: &str, params: serde_json::Value) {
ws.send(Message::Text(
serde_json::json!({ "jsonrpc": "2.0", "id": id, "method": method, "params": params })
.to_string()
.into(),
))
.await
.expect("send request");
}
async fn next_json(ws: &mut Ws) -> serde_json::Value {
let text = ws
.next()
.await
.expect("frame")
.expect("frame ok")
.into_text()
.expect("text")
.to_string();
serde_json::from_str(&text).expect("parse json")
}
async fn call_servicing_callbacks(
ws: &mut Ws,
id: &str,
method: &str,
params: serde_json::Value,
tool_output: &serde_json::Value,
) -> serde_json::Value {
send(ws, id, method, params).await;
loop {
let msg = next_json(ws).await;
if msg.get("method").and_then(|m| m.as_str()) == Some("tools.execute") {
let cb_id = msg["id"].as_str().expect("callback id").to_string();
ws.send(Message::Text(
serde_json::json!({ "jsonrpc": "2.0", "id": cb_id, "result": tool_output })
.to_string()
.into(),
))
.await
.expect("send callback reply");
continue;
}
if msg.get("id").and_then(|v| v.as_str()) == Some(id) {
return msg;
}
}
}
async fn call_ok(
ws: &mut Ws,
id: &str,
method: &str,
params: serde_json::Value,
) -> serde_json::Value {
send(ws, id, method, params).await;
let resp = next_json(ws).await;
assert!(resp.get("error").is_none(), "{method} failed: {resp}");
resp["result"].clone()
}
async fn register_tool(ws: &mut Ws, name: &str) {
let tools = serde_json::json!([{ "name": name, "description": "", "parameters": {} }]);
let resp = {
send(ws, "reg", "tools.register", tools).await;
next_json(ws).await
};
assert!(resp.get("error").is_none(), "tools.register failed: {resp}");
}
fn drive_proposal(action_id: &str, prompt: &str) -> serde_json::Value {
serde_json::json!({
"proposal": {
"id": "p-drive",
"source": "test",
"actions": [{
"id": action_id,
"type": "tool_call",
"tool": "drive_cli",
"parameters": { "cli": "claude", "prompt": prompt }
}]
}
})
}
fn drive_output() -> serde_json::Value {
serde_json::json!({
"cli": "claude", "exit_code": 0, "timed_out": false,
"output_tail": "1 passed, 0 failed"
})
}
fn success_outcome() -> serde_json::Value {
serde_json::json!({
"status": "success",
"summary": "did the thing",
"evidence": [],
"metrics": {
"turns": 1, "tool_calls": 1, "duration_ms": 12.0,
"retries": 0, "actions_succeeded": 1, "actions_failed": 0
},
"timestamp": chrono::Utc::now().to_rfc3339()
})
}
fn first_turn_prompt(trace: &[car_proto::RunRecord]) -> Option<String> {
trace.iter().find_map(|r| match r {
car_proto::RunRecord::Turn(t) => t.prompt.clone(),
_ => None,
})
}
#[tokio::test]
async fn completed_run_replays_from_disk_after_restart() {
let tmp = TempDir::new().unwrap();
let car_dir = tmp.path().to_path_buf();
let run_id = {
let state = state_over(&car_dir);
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
register_tool(&mut ws, "drive_cli").await;
let started = call_ok(
&mut ws,
"s1",
"runs.start",
serde_json::json!({ "agent_id": "agent-a", "intent": "fix the build" }),
)
.await;
let run_id = started["run_id"].as_str().unwrap().to_string();
let resp = call_servicing_callbacks(
&mut ws,
"p1",
"proposal.submit",
drive_proposal("a1", "make it green"),
&drive_output(),
)
.await;
assert!(resp.get("error").is_none(), "submit failed: {resp}");
call_ok(
&mut ws,
"c1",
"runs.complete",
serde_json::json!({ "run_id": run_id, "outcome": success_outcome() }),
)
.await;
ws.close(None).await.ok();
drop(ws);
tokio::time::sleep(std::time::Duration::from_millis(400)).await;
run_id
};
let store = store_over(&car_dir);
let summaries = store.list_runs("agent-a");
assert_eq!(summaries.len(), 1, "the completed run lists from disk");
let summary = &summaries[0];
assert_eq!(summary.run_id, run_id);
assert_eq!(summary.agent_id, "agent-a");
assert_eq!(summary.intent, "fix the build");
assert_eq!(
summary.status,
RunStatus::Completed,
"a completed run must replay as Completed, not InProgress/Incomplete"
);
assert_eq!(summary.turn_count, 1, "the one turn is persisted");
let trace = store
.get_run_trace(&run_id)
.expect("trace loads from disk after restart");
assert!(matches!(
trace.first(),
Some(car_proto::RunRecord::Started(_))
));
assert!(matches!(trace.last(), Some(car_proto::RunRecord::Ended(_))));
assert_eq!(
first_turn_prompt(&trace).as_deref(),
Some("make it green"),
"the persisted turn carries the exact prompt the harness sent"
);
assert_eq!(store.agent_for_run(&run_id).as_deref(), Some("agent-a"));
}
#[tokio::test]
async fn mid_run_drop_persists_incomplete_to_disk() {
let tmp = TempDir::new().unwrap();
let car_dir = tmp.path().to_path_buf();
let run_id = {
let state = state_over(&car_dir);
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let started = call_ok(
&mut ws,
"s1",
"runs.start",
serde_json::json!({ "agent_id": "agent-b", "intent": "long task" }),
)
.await;
let run_id = started["run_id"].as_str().unwrap().to_string();
ws.close(None).await.ok();
drop(ws);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
run_id
};
let store = store_over(&car_dir);
let summaries = store.list_runs("agent-b");
assert_eq!(summaries.len(), 1);
assert_eq!(summaries[0].run_id, run_id);
assert_eq!(
summaries[0].status,
RunStatus::Incomplete,
"a mid-run drop must persist Incomplete to disk"
);
}
#[tokio::test]
async fn distinct_runs_persist_to_separate_files_no_cross_contamination() {
let tmp = TempDir::new().unwrap();
let car_dir = tmp.path().to_path_buf();
{
let state = state_over(&car_dir);
let addr = spawn_dispatcher(state.clone()).await;
let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();
register_tool(&mut ws, "drive_cli").await;
let r1 = call_ok(
&mut ws,
"s1",
"runs.start",
serde_json::json!({ "agent_id": "agent-a", "intent": "first" }),
)
.await;
call_servicing_callbacks(
&mut ws,
"p1",
"proposal.submit",
drive_proposal("a1", "a-prompt"),
&drive_output(),
)
.await;
call_ok(
&mut ws,
"c1",
"runs.complete",
serde_json::json!({ "run_id": r1["run_id"], "outcome": success_outcome() }),
)
.await;
let r2 = call_ok(
&mut ws,
"s2",
"runs.start",
serde_json::json!({ "agent_id": "agent-c", "intent": "second" }),
)
.await;
call_servicing_callbacks(
&mut ws,
"p2",
"proposal.submit",
drive_proposal("b1", "c-prompt"),
&drive_output(),
)
.await;
call_ok(
&mut ws,
"c2",
"runs.complete",
serde_json::json!({ "run_id": r2["run_id"], "outcome": success_outcome() }),
)
.await;
ws.close(None).await.ok();
drop(ws);
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
}
let store = store_over(&car_dir);
let a = store.list_runs("agent-a");
let c = store.list_runs("agent-c");
assert_eq!(a.len(), 1, "agent-a has one run");
assert_eq!(c.len(), 1, "agent-c has one run");
let a_prompt = first_turn_prompt(&store.get_run_trace(&a[0].run_id).unwrap());
let c_prompt = first_turn_prompt(&store.get_run_trace(&c[0].run_id).unwrap());
assert_eq!(a_prompt.as_deref(), Some("a-prompt"));
assert_eq!(c_prompt.as_deref(), Some("c-prompt"));
}