car-server-core 0.25.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
//! Agent run tracing — disk-backed run store, restart survival (U3).
//!
//! Drives `runs.start` / `proposal.submit` / `runs.complete` over a real
//! `run_dispatch` WebSocket session against one temporary stand-in for the
//! `~/.car` base dir. It then constructs a fresh `RunStore` over the same
//! base dir. The fresh store starts with empty memory, which simulates a
//! daemon restart the same way U5's read RPCs will load on a restarted
//! daemon. The tests assert that each run's trace replays from disk:
//!
//! 1. A completed run's full trace (RunStarted + turns + RunEnded) is
//!    readable from disk after a simulated restart (R4).
//! 2. The per-turn prompt captured by the U2 recorder is persisted and
//!    survives the restart (R2 + R4 end-to-end).
//! 3. `run_id -> agent_id` resolves from disk so a restart can authorize
//!    replay without the in-memory registry (U5 contract).
//! 4. A mid-run drop persists the `Incomplete` terminal marker, so the
//!    restarted store reports `Incomplete` (R5).
//! 5. Distinct runs persist to separate files — no cross-contamination
//!    of prompts between agents (R1).
//!
//! The test client services the daemon's `tools.execute` callback inline
//! (the daemon calls back over the same WS during `proposal.submit`),
//! mirroring `run_trace.rs`, so the turn is captured through the real
//! execution path and persisted by the U3 wiring.

use car_server_core::{run_dispatch, RunStatus, RunStore, ServerState, ServerStateConfig};
use car_memgine::MemgineEngine;
use futures::{SinkExt, StreamExt};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::Path;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message};

/// Byte transport under the test client: a TCP stream that `connect_async`
/// may wrap in TLS (the `ws://` URLs used in this file stay plain).
type ClientTransport = tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>;

/// Client end of a test WebSocket connection, as returned by `connect_async`.
type Ws = tokio_tungstenite::WebSocketStream<ClientTransport>;

/// Build a `ServerState` rooted at `<car_dir>/journals`.
///
/// The run store is derived from the journal dir as the sibling
/// `<car_dir>/runs`, which is the same resolution the daemon performs. A
/// `RunStore` opened later over the same `car_dir` therefore sees
/// everything this state persisted, and that is how the restart is
/// simulated.
fn state_over(car_dir: &Path) -> Arc<ServerState> {
    let journal_dir = car_dir.join("journals");
    let memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
    let config = ServerStateConfig::new(journal_dir).with_shared_memgine(memgine);
    let state = ServerState::with_config(config);
    Arc::new(state)
}

/// Open a brand-new `RunStore` for `car_dir` with empty memory.
///
/// Construction goes through `<car_dir>/journals`, mirroring `state_over`,
/// so the store resolves to the same `<car_dir>/runs` directory. This is
/// how U5's read RPCs will load from disk on a restarted daemon.
fn store_over(car_dir: &Path) -> RunStore {
    let journal_dir = car_dir.join("journals");
    RunStore::from_journal_dir(&journal_dir)
}

/// Bind an ephemeral loopback port and serve exactly one WebSocket
/// connection on it with `run_dispatch` over `state`.
///
/// Returns the bound address right away. The accept, handshake, and
/// dispatch loop run on a detached task. The dispatcher's own result is
/// discarded, because the tests observe its effects through the socket and
/// the disk.
async fn spawn_dispatcher(state: Arc<ServerState>) -> SocketAddr {
    let loopback = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
    let listener = TcpListener::bind(loopback).await.expect("bind loopback");
    let addr = listener.local_addr().expect("local_addr");

    let serve_one = async move {
        let (tcp, peer) = listener.accept().await.expect("accept");
        let (sink, stream) = accept_async(tcp).await.expect("ws handshake").split();
        let _ = run_dispatch(stream, Box::pin(sink), peer.to_string(), state).await;
    };
    tokio::spawn(serve_one);
    addr
}

/// Send one JSON-RPC 2.0 request (`id`, `method`, `params`) as a text frame.
async fn send(ws: &mut Ws, id: &str, method: &str, params: serde_json::Value) {
    let request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": id,
        "method": method,
        "params": params
    });
    let frame = Message::Text(request.to_string().into());
    ws.send(frame).await.expect("send request");
}

/// Await the next frame from the server and parse its text payload as JSON.
///
/// Panics if the stream has ended, the frame is an error, the payload is
/// not text, or the text is not valid JSON.
async fn next_json(ws: &mut Ws) -> serde_json::Value {
    let frame = ws.next().await.expect("frame").expect("frame ok");
    let payload = frame.into_text().expect("text").to_string();
    serde_json::from_str(&payload).expect("parse json")
}

/// Send `method` under `id` and wait for its response. Any server-initiated
/// `tools.execute` callbacks that arrive in the meantime are answered with
/// `tool_output`.
///
/// Callbacks take precedence over id matching. Frames that are neither a
/// callback nor the response to `id` are dropped. Mirrors `run_trace.rs`.
async fn call_servicing_callbacks(
    ws: &mut Ws,
    id: &str,
    method: &str,
    params: serde_json::Value,
    tool_output: &serde_json::Value,
) -> serde_json::Value {
    send(ws, id, method, params).await;
    loop {
        let frame = next_json(ws).await;
        let is_tool_callback =
            frame.get("method").and_then(serde_json::Value::as_str) == Some("tools.execute");

        if !is_tool_callback {
            if frame.get("id").and_then(serde_json::Value::as_str) == Some(id) {
                return frame;
            }
            continue;
        }

        // The daemon is blocked on this callback; reply under its own id.
        let callback_id = frame["id"].as_str().expect("callback id");
        let reply = serde_json::json!({
            "jsonrpc": "2.0",
            "id": callback_id,
            "result": tool_output
        });
        ws.send(Message::Text(reply.to_string().into()))
            .await
            .expect("send callback reply");
    }
}

/// Plain request expecting a `result`, no callbacks.
///
/// Sends `method` with `params` under `id` and reads the very next frame.
/// It asserts that the frame is an error-free response to *this* request,
/// then returns a copy of its `result` (`Value::Null` if absent).
///
/// # Panics
///
/// Panics if the next frame carries an `error`, or if it does not echo `id`
/// back, for example when a server-initiated `tools.execute` callback or
/// another unrelated frame arrives first. Without the `id` check, such a
/// frame would be mistaken for this call's success and surface later as an
/// opaque `Null` result.
async fn call_ok(
    ws: &mut Ws,
    id: &str,
    method: &str,
    params: serde_json::Value,
) -> serde_json::Value {
    send(ws, id, method, params).await;
    let resp = next_json(ws).await;
    assert!(resp.get("error").is_none(), "{method} failed: {resp}");
    // Responses echo the request id as a string (same rule
    // `call_servicing_callbacks` relies on).
    assert_eq!(
        resp.get("id").and_then(serde_json::Value::as_str),
        Some(id),
        "{method}: next frame is not the response to request {id:?}: {resp}"
    );
    resp["result"].clone()
}

/// Register `name` as a tool with an empty parameter schema.
///
/// This lets a `tool_call` action that names the tool pass validation and
/// be dispatched through the WS `tools.execute` callback, instead of being
/// rejected before execution. Panics if `tools.register` answers with an
/// `error`.
async fn register_tool(ws: &mut Ws, name: &str) {
    let spec = serde_json::json!({ "name": name, "description": "", "parameters": {} });
    let tools = serde_json::Value::Array(vec![spec]);
    send(ws, "reg", "tools.register", tools).await;
    let resp = next_json(ws).await;
    assert!(resp.get("error").is_none(), "tools.register failed: {resp}");
}

/// Build `proposal.submit` params for a one-action proposal.
///
/// The proposal has id `p-drive` and source `test`. Its single action,
/// `action_id`, is a `tool_call` to `drive_cli` whose parameters are
/// `cli: "claude"` and the given `prompt`.
fn drive_proposal(action_id: &str, prompt: &str) -> serde_json::Value {
    let parameters = serde_json::json!({ "cli": "claude", "prompt": prompt });
    let action = serde_json::json!({
        "id": action_id,
        "type": "tool_call",
        "tool": "drive_cli",
        "parameters": parameters
    });
    let proposal = serde_json::json!({
        "id": "p-drive",
        "source": "test",
        "actions": [action]
    });
    serde_json::json!({ "proposal": proposal })
}

/// Canned `tools.execute` result for `drive_cli`.
///
/// Describes a `claude` run with exit code 0 that did not time out, and
/// whose output tail reads "1 passed, 0 failed".
fn drive_output() -> serde_json::Value {
    let output_tail = "1 passed, 0 failed";
    serde_json::json!({
        "cli": "claude",
        "exit_code": 0,
        "timed_out": false,
        "output_tail": output_tail
    })
}

fn success_outcome() -> serde_json::Value {
    serde_json::json!({
        "status": "success",
        "summary": "did the thing",
        "evidence": [],
        "metrics": {
            "turns": 1, "tool_calls": 1, "duration_ms": 12.0,
            "retries": 0, "actions_succeeded": 1, "actions_failed": 0
        },
        "timestamp": chrono::Utc::now().to_rfc3339()
    })
}

/// Prompt recorded on the first `Turn` record in `trace`, if any.
///
/// Returns `None` when the trace has no turns, or when its first turn
/// carries no prompt. Later turns are deliberately not consulted, so a
/// missing prompt on the first turn cannot be masked by a prompt from a
/// subsequent turn.
fn first_turn_prompt(trace: &[car_proto::RunRecord]) -> Option<String> {
    trace
        .iter()
        .find_map(|record| match record {
            // Stop at the first turn, whether or not it has a prompt.
            car_proto::RunRecord::Turn(turn) => Some(turn.prompt.clone()),
            _ => None,
        })
        .flatten()
}

/// A run that is started, given one `drive_cli` turn, and completed must
/// replay from disk after a simulated restart. That means a fresh
/// `RunStore` with empty memory over the same base dir must report:
/// status `Completed`, one turn, a trace running from `Started` to `Ended`
/// with the exact prompt on the turn, and the run's `agent_id` (R2, R4, U5).
#[tokio::test]
async fn completed_run_replays_from_disk_after_restart() {
    // Keep `tmp` alive for the whole test: `TempDir` removes its directory
    // when dropped, and the restarted store reads from it below.
    let tmp = TempDir::new().unwrap();
    let car_dir = tmp.path().to_path_buf();

    // --- First daemon: start, submit a turn, complete. ---
    let run_id = {
        let state = state_over(&car_dir);
        let addr = spawn_dispatcher(state.clone()).await;
        let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();

        // Without a registered tool the drive_cli action would be rejected
        // before reaching the tools.execute callback.
        register_tool(&mut ws, "drive_cli").await;
        let started = call_ok(
            &mut ws,
            "s1",
            "runs.start",
            serde_json::json!({ "agent_id": "agent-a", "intent": "fix the build" }),
        )
        .await;
        let run_id = started["run_id"].as_str().unwrap().to_string();

        // One turn — the recorder persists its prompt to disk.
        let resp = call_servicing_callbacks(
            &mut ws,
            "p1",
            "proposal.submit",
            drive_proposal("a1", "make it green"),
            &drive_output(),
        )
        .await;
        assert!(resp.get("error").is_none(), "submit failed: {resp}");

        call_ok(
            &mut ws,
            "c1",
            "runs.complete",
            serde_json::json!({ "run_id": run_id, "outcome": success_outcome() }),
        )
        .await;

        ws.close(None).await.ok();
        drop(ws);
        // Let the disconnect sweep settle — it must NOT overwrite the
        // terminal Success record.
        // NOTE(review): fixed-duration sleep; assumes 400 ms is enough for
        // the sweep on a loaded CI host — confirm against the grace window.
        tokio::time::sleep(std::time::Duration::from_millis(400)).await;
        run_id
    };

    // --- Simulated restart: fresh store, empty memory, same disk. ---
    let store = store_over(&car_dir);
    let summaries = store.list_runs("agent-a");
    assert_eq!(summaries.len(), 1, "the completed run lists from disk");
    let summary = &summaries[0];
    assert_eq!(summary.run_id, run_id);
    assert_eq!(summary.agent_id, "agent-a");
    assert_eq!(summary.intent, "fix the build");
    assert_eq!(
        summary.status,
        RunStatus::Completed,
        "a completed run must replay as Completed, not InProgress/Incomplete"
    );
    assert_eq!(summary.turn_count, 1, "the one turn is persisted");

    // get_run_trace by run_id alone (the U5 read path) loads the full
    // ordered trail from disk after the restart.
    let trace = store
        .get_run_trace(&run_id)
        .expect("trace loads from disk after restart");
    // The trail is bracketed: RunStarted first, RunEnded last.
    assert!(matches!(
        trace.first(),
        Some(car_proto::RunRecord::Started(_))
    ));
    assert!(matches!(trace.last(), Some(car_proto::RunRecord::Ended(_))));
    assert_eq!(
        first_turn_prompt(&trace).as_deref(),
        Some("make it green"),
        "the persisted turn carries the exact prompt the harness sent"
    );

    // run_id -> agent_id resolves from disk (U5 authorization input).
    assert_eq!(store.agent_for_run(&run_id).as_deref(), Some("agent-a"));
}

/// A connection that drops after `runs.start` but before `runs.complete`
/// must leave an `Incomplete` terminal marker on disk. A restarted store
/// then reports the run as `Incomplete` rather than `InProgress` (R5).
#[tokio::test]
async fn mid_run_drop_persists_incomplete_to_disk() {
    // `tmp` must outlive the restarted store below; dropping it deletes
    // the directory.
    let tmp = TempDir::new().unwrap();
    let car_dir = tmp.path().to_path_buf();

    let run_id = {
        let state = state_over(&car_dir);
        let addr = spawn_dispatcher(state.clone()).await;
        let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();

        let started = call_ok(
            &mut ws,
            "s1",
            "runs.start",
            serde_json::json!({ "agent_id": "agent-b", "intent": "long task" }),
        )
        .await;
        let run_id = started["run_id"].as_str().unwrap().to_string();

        // Drop mid-run WITHOUT runs.complete. The disconnect sweep writes
        // the Incomplete terminal to disk past the grace window.
        // NOTE(review): assumes 500 ms exceeds the daemon's disconnect
        // grace window — confirm, or this test races the sweep.
        ws.close(None).await.ok();
        drop(ws);
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        run_id
    };

    // Restart: the orphaned run reports Incomplete from disk (R5), not
    // InProgress — the marker was persisted on disconnect.
    let store = store_over(&car_dir);
    let summaries = store.list_runs("agent-b");
    assert_eq!(summaries.len(), 1);
    assert_eq!(summaries[0].run_id, run_id);
    assert_eq!(
        summaries[0].status,
        RunStatus::Incomplete,
        "a mid-run drop must persist Incomplete to disk"
    );
}

/// Two runs for two different agents, driven one after the other over a
/// single connection, must persist independently (R1). After a simulated
/// restart, each agent lists exactly the run it started, and each trace
/// carries only that run's prompt.
#[tokio::test]
async fn distinct_runs_persist_to_separate_files_no_cross_contamination() {
    // `tmp` must outlive the restarted store below; dropping it deletes
    // the directory.
    let tmp = TempDir::new().unwrap();
    let car_dir = tmp.path().to_path_buf();

    let (a_run_id, c_run_id) = {
        let state = state_over(&car_dir);
        let addr = spawn_dispatcher(state).await;
        let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();

        register_tool(&mut ws, "drive_cli").await;

        // Two sequential runs on the same connection for two agents.
        let r1 = call_ok(
            &mut ws,
            "s1",
            "runs.start",
            serde_json::json!({ "agent_id": "agent-a", "intent": "first" }),
        )
        .await;
        let a_run_id = r1["run_id"]
            .as_str()
            .expect("runs.start returns a run_id")
            .to_string();
        let resp = call_servicing_callbacks(
            &mut ws,
            "p1",
            "proposal.submit",
            drive_proposal("a1", "a-prompt"),
            &drive_output(),
        )
        .await;
        // Fail here on a rejected submit rather than later as a confusing
        // missing-prompt mismatch.
        assert!(resp.get("error").is_none(), "submit p1 failed: {resp}");
        call_ok(
            &mut ws,
            "c1",
            "runs.complete",
            serde_json::json!({ "run_id": a_run_id, "outcome": success_outcome() }),
        )
        .await;

        let r2 = call_ok(
            &mut ws,
            "s2",
            "runs.start",
            serde_json::json!({ "agent_id": "agent-c", "intent": "second" }),
        )
        .await;
        let c_run_id = r2["run_id"]
            .as_str()
            .expect("runs.start returns a run_id")
            .to_string();
        let resp = call_servicing_callbacks(
            &mut ws,
            "p2",
            "proposal.submit",
            drive_proposal("b1", "c-prompt"),
            &drive_output(),
        )
        .await;
        assert!(resp.get("error").is_none(), "submit p2 failed: {resp}");
        call_ok(
            &mut ws,
            "c2",
            "runs.complete",
            serde_json::json!({ "run_id": c_run_id, "outcome": success_outcome() }),
        )
        .await;

        ws.close(None).await.ok();
        drop(ws);
        // Let the disconnect sweep settle before the restarted store reads.
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
        (a_run_id, c_run_id)
    };
    assert_ne!(a_run_id, c_run_id, "each runs.start mints a distinct run_id");

    let store = store_over(&car_dir);
    // Each agent has exactly one run, and it is the one that agent started.
    let a = store.list_runs("agent-a");
    let c = store.list_runs("agent-c");
    assert_eq!(a.len(), 1, "agent-a has one run");
    assert_eq!(c.len(), 1, "agent-c has one run");
    assert_eq!(a[0].run_id, a_run_id, "agent-a lists the run it started");
    assert_eq!(c[0].run_id, c_run_id, "agent-c lists the run it started");

    // The prompts didn't cross between the two persisted traces.
    let a_prompt = first_turn_prompt(&store.get_run_trace(&a_run_id).unwrap());
    let c_prompt = first_turn_prompt(&store.get_run_trace(&c_run_id).unwrap());
    assert_eq!(a_prompt.as_deref(), Some("a-prompt"));
    assert_eq!(c_prompt.as_deref(), Some("c-prompt"));
}