car-server-core 0.25.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
//! Agent run tracing — per-turn recorder, end-to-end (U2).
//!
//! Drives `runs.start` then `proposal.submit` over a real `run_dispatch`
//! WebSocket session (mirroring `run_lifecycle.rs`) and asserts the
//! recorder captured, on the session's current run, each turn's prompt,
//! CLI outcome, verifier verdict, and policy rejections (R2 / R11):
//!
//! 1. A `drive_cli` turn records the exact prompt + `output_tail` +
//!    `exit_code`.
//! 2. A `check_outcome` turn records `pass` when `passed=true` and `fail`
//!    when `false`.
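//! 3. A policy-rejected `drive_cli` records `policy_rejected` with no
//!    `cli_outcome` and no tool output — the tool body never ran.
//! 4. A timed-out `drive_cli` records `cli_outcome=timeout` +
//!    `verifier_verdict=not-run`.
//! 5. A non-Bulldozer tool still records a generic turn (tool/params/
//!    output).
//! 6. `proposal.submit` with no open run (no `runs.start`) records nothing.
//! 7. Concurrent `record_run_turns` batches on one run (driven directly on
//!    `ServerState`, no WS) get contiguous turn indices and a parseable
//!    on-disk trace.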
//!
//! The test client services the daemon's `tools.execute` callback inline
//! (the daemon calls back over the same WS during `proposal.submit`),
//! returning a fixed Bulldozer-shaped output, so prompt→output pairing is
//! exercised through the real execution path.

use car_proto::{CliOutcome, RunRecord, RunTurn, VerifierVerdict};
use car_memgine::MemgineEngine;
use car_server_core::{run_dispatch, ServerState, ServerStateConfig};
use futures::{SinkExt, StreamExt};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message};

/// Client-side WebSocket stream type produced by `connect_async`; the tests
/// always dial plain `ws://` on loopback, so the inner stream is bare TCP.
type Ws =
    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;

/// Build a shareable `ServerState` whose journals live under `journal_dir`
/// and whose memgine is a freshly constructed `MemgineEngine::new(None)`
/// wrapped for shared access.
fn loopback_state(journal_dir: std::path::PathBuf) -> Arc<ServerState> {
    let shared_memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
    Arc::new(ServerState::with_config(
        ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine),
    ))
}

/// Bind an ephemeral port on 127.0.0.1 and serve exactly one WebSocket
/// client on it through `run_dispatch`, returning the address to dial.
///
/// Accept, handshake and dispatch run in a detached task. Failures to
/// accept or handshake panic inside that task, and the dispatcher's own
/// result is discarded.
async fn spawn_dispatcher(state: Arc<ServerState>) -> SocketAddr {
    let bind_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
    let listener = TcpListener::bind(bind_addr).await.expect("bind loopback");
    let bound = listener.local_addr().expect("local_addr");
    tokio::spawn(async move {
        let (tcp, peer_addr) = listener.accept().await.expect("accept");
        let socket = accept_async(tcp).await.expect("ws handshake");
        let (sink, source) = socket.split();
        let _ = run_dispatch(source, Box::pin(sink), peer_addr.to_string(), state).await;
    });
    bound
}

/// Send a single JSON-RPC 2.0 request (`id`, `method`, `params`) as a text
/// frame, panicking if the write fails.
async fn send(ws: &mut Ws, id: &str, method: &str, params: serde_json::Value) {
    let request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": id,
        "method": method,
        "params": params
    });
    ws.send(Message::Text(request.to_string().into()))
        .await
        .expect("send request");
}

/// Read the next text frame as JSON.
async fn next_json(ws: &mut Ws) -> serde_json::Value {
    let text = ws
        .next()
        .await
        .expect("frame")
        .expect("frame ok")
        .into_text()
        .expect("text")
        .to_string();
    serde_json::from_str(&text).expect("parse json")
}

/// Send a request expected to return a `result`, servicing any inbound
/// `tools.execute` callback the daemon issues mid-flight by replying with
/// `tool_output`. Returns the eventual response to `id`.
///
/// Frames that are neither a `tools.execute` callback nor the response to
/// `id` (e.g. notifications) are skipped. There is no timeout. If the
/// connection ends before `id` is answered, `next_json` panics.
async fn call_servicing_callbacks(
    ws: &mut Ws,
    id: &str,
    method: &str,
    params: serde_json::Value,
    tool_output: &serde_json::Value,
) -> serde_json::Value {
    send(ws, id, method, params).await;
    loop {
        let msg = next_json(ws).await;
        // A server-initiated tools.execute callback: reply with our fixed
        // Bulldozer-shaped output keyed to the callback's routing id.
        if msg.get("method").and_then(|m| m.as_str()) == Some("tools.execute") {
            // JSON-RPC 2.0 requires the response `id` to be identical to the
            // request's, including its JSON type, so echo it verbatim rather
            // than assuming the daemon uses string ids.
            let cb_id = msg
                .get("id")
                .filter(|v| !v.is_null())
                .cloned()
                .expect("callback id");
            // The daemon takes the JSON-RPC `result` value verbatim as the
            // tool's output (handler.rs: `output: Some(result)`), so the
            // bare Bulldozer-shaped output goes directly in `result`.
            let reply = serde_json::json!({
                "jsonrpc": "2.0",
                "id": cb_id,
                "result": tool_output
            });
            ws.send(Message::Text(reply.to_string().into()))
                .await
                .expect("send callback reply");
            continue;
        }
        // The response to our request id.
        if msg.get("id").and_then(|v| v.as_str()) == Some(id) {
            return msg;
        }
    }
}

/// Register each of `names` as a schemaless tool (empty `{}` parameter
/// schema) via `tools.register`, asserting the call succeeds.
///
/// Registration matters because the validator rejects an unregistered tool
/// before execute. In the trace, that rejection would look the same as a
/// policy rejection. Once registered, the action body really dispatches
/// through the WS callback.
async fn register_tools(ws: &mut Ws, names: &[&str]) {
    let mut tools = Vec::new();
    for name in names {
        tools.push(serde_json::json!({ "name": name, "description": "", "parameters": {} }));
    }
    send(ws, "reg", "tools.register", serde_json::Value::Array(tools)).await;
    let resp = next_json(ws).await;
    assert!(resp.get("error").is_none(), "tools.register failed: {resp}");
}

/// Open a run for `agent_id` via `runs.start` and return the daemon-assigned
/// `run_id`.
///
/// # Panics
///
/// Panics if the daemon answers with an error, or if the result lacks a
/// string `run_id`. The panic message includes the full response.
async fn start_run(ws: &mut Ws, agent_id: &str) -> String {
    send(
        ws,
        "start",
        "runs.start",
        serde_json::json!({ "agent_id": agent_id, "intent": "do the thing" }),
    )
    .await;
    let resp = next_json(ws).await;
    assert!(resp.get("error").is_none(), "runs.start failed: {resp}");
    resp["result"]["run_id"]
        .as_str()
        .unwrap_or_else(|| panic!("runs.start returned no string run_id: {resp}"))
        .to_string()
}

/// Fetch the records stored for `run_id` and unwrap each one into its
/// `RunTurn`, panicking on any record that is not `RunRecord::Turn`.
async fn recorded_turns(state: &Arc<ServerState>, run_id: &str) -> Vec<RunTurn> {
    let mut turns = Vec::new();
    for record in state.run_turns(run_id).await {
        match record {
            RunRecord::Turn(turn) => turns.push(turn),
            other => panic!("expected only Turn records, got {other:?}"),
        }
    }
    turns
}

/// `proposal.submit` params for proposal `p-drive`. It holds one `drive_cli`
/// tool call (action id `action_id`) asking the `claude` CLI to run `prompt`.
fn drive_proposal(action_id: &str, prompt: &str) -> serde_json::Value {
    let action = serde_json::json!({
        "id": action_id,
        "type": "tool_call",
        "tool": "drive_cli",
        "parameters": { "cli": "claude", "prompt": prompt }
    });
    serde_json::json!({
        "proposal": { "id": "p-drive", "source": "test", "actions": [action] }
    })
}

/// `proposal.submit` params for proposal `p-check`. It holds one
/// `check_outcome` tool call (action id `action_id`) that runs `command`.
fn check_proposal(action_id: &str, command: &str) -> serde_json::Value {
    let action = serde_json::json!({
        "id": action_id,
        "type": "tool_call",
        "tool": "check_outcome",
        "parameters": { "command": command }
    });
    serde_json::json!({
        "proposal": { "id": "p-check", "source": "test", "actions": [action] }
    })
}

#[tokio::test]
async fn drive_cli_turn_records_prompt_output_and_exit_code() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_dispatcher(state.clone()).await;
    let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();

    register_tools(&mut ws, &["drive_cli"]).await;
    let run_id = start_run(&mut ws, "agent-a").await;

    let prompt = "make the failing test pass";
    let out = serde_json::json!({
        "cli": "claude", "exit_code": 0, "timed_out": false,
        "output_tail": "1 passed, 0 failed"
    });
    let resp = call_servicing_callbacks(
        &mut ws,
        "sub1",
        "proposal.submit",
        drive_proposal("a1", prompt),
        &out,
    )
    .await;
    assert!(resp.get("error").is_none(), "submit failed: {resp}");

    let turns = recorded_turns(&state, &run_id).await;
    assert_eq!(turns.len(), 1, "exactly one turn recorded");
    let t = &turns[0];
    assert_eq!(t.index, 0);
    assert_eq!(t.prompt.as_deref(), Some(prompt), "exact prompt recorded");
    assert_eq!(t.tool.as_deref(), Some("drive_cli"));
    assert_eq!(t.cli_outcome, Some(CliOutcome::Exited { code: 0 }));
    assert_eq!(t.verifier_verdict, VerifierVerdict::NotRun);
    // output_tail preserved verbatim from the tool result.
    assert_eq!(
        t.output.as_ref().unwrap().get("output_tail").unwrap(),
        &serde_json::json!("1 passed, 0 failed")
    );
}

/// A `check_outcome` turn maps the tool's `passed` flag onto the verifier
/// verdict (`true` → `Pass`, `false` → `Fail`). Turn indices keep counting
/// across proposals in the same run, and verifier turns carry no CLI
/// outcome.
#[tokio::test]
async fn check_outcome_records_pass_then_fail() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_dispatcher(state.clone()).await;
    let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();

    register_tools(&mut ws, &["check_outcome"]).await;
    let run_id = start_run(&mut ws, "agent-a").await;

    // passed = true → Pass
    let pass_out = serde_json::json!({ "passed": true, "output_tail": "ok" });
    let resp = call_servicing_callbacks(
        &mut ws,
        "v1",
        "proposal.submit",
        check_proposal("c1", "test -f built"),
        &pass_out,
    )
    .await;
    // Fail on the submit itself rather than as a confusing turn-count
    // mismatch further down.
    assert!(resp.get("error").is_none(), "submit v1 failed: {resp}");

    // passed = false → Fail (healthy re-prod, amber — not a run failure)
    let fail_out = serde_json::json!({ "passed": false, "output_tail": "still failing" });
    let resp = call_servicing_callbacks(
        &mut ws,
        "v2",
        "proposal.submit",
        check_proposal("c2", "test -f built"),
        &fail_out,
    )
    .await;
    assert!(resp.get("error").is_none(), "submit v2 failed: {resp}");

    let turns = recorded_turns(&state, &run_id).await;
    assert_eq!(turns.len(), 2);
    assert_eq!(turns[0].verifier_verdict, VerifierVerdict::Pass);
    assert_eq!(turns[0].index, 0);
    assert_eq!(turns[1].verifier_verdict, VerifierVerdict::Fail);
    // Monotonic index across proposals in the same run.
    assert_eq!(turns[1].index, 1);
    // A verifier turn has no CLI outcome, whichever way the check went.
    assert!(turns[0].cli_outcome.is_none());
    assert!(turns[1].cli_outcome.is_none());
}

/// A `deny_tool_param` policy matching the prompt rejects the `drive_cli`
/// action before its body runs. The turn records `policy_rejected` (policy
/// name and offending param), with no CLI outcome, no verifier verdict and
/// no tool output. The daemon must also never issue a `tools.execute`
/// callback for the action.
#[tokio::test]
async fn policy_rejected_drive_records_rejection_and_tool_never_runs() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_dispatcher(state.clone()).await;
    let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();

    register_tools(&mut ws, &["drive_cli"]).await;
    let run_id = start_run(&mut ws, "agent-a").await;

    // Register a deny_tool_param policy that blocks a destructive prompt.
    send(
        &mut ws,
        "pol",
        "policy.register",
        serde_json::json!({
            "name": "no-destructive",
            "rule": "deny_tool_param",
            "target": "drive_cli",
            "key": "prompt",
            "pattern": "rm -rf"
        }),
    )
    .await;
    let pol_resp = next_json(&mut ws).await;
    assert!(pol_resp.get("error").is_none(), "policy.register failed: {pol_resp}");

    // Submit, then drain frames until the response to "sub1" arrives. In this
    // harness the tool body runs only via a `tools.execute` callback, so any
    // such callback means the policy failed to stop execution. Fail at once
    // instead of servicing it.
    send(
        &mut ws,
        "sub1",
        "proposal.submit",
        drive_proposal("a1", "please run rm -rf / to clean up"),
    )
    .await;
    let resp = loop {
        let msg = next_json(&mut ws).await;
        assert_ne!(
            msg.get("method").and_then(|m| m.as_str()),
            Some("tools.execute"),
            "policy-rejected action must never reach its tool body, got callback: {msg}"
        );
        if msg.get("id").and_then(|v| v.as_str()) == Some("sub1") {
            break msg;
        }
    };
    assert!(resp.get("error").is_none(), "submit failed: {resp}");

    let turns = recorded_turns(&state, &run_id).await;
    assert_eq!(turns.len(), 1);
    let t = &turns[0];
    let pr = t.policy_rejected.as_ref().expect("policy_rejected recorded");
    assert!(pr.rule.contains("no-destructive"), "rule carries policy name: {}", pr.rule);
    assert_eq!(pr.param.as_deref(), Some("prompt"));
    // The tool body never ran. There is no CLI outcome (`None`), no verifier
    // verdict, and no tool output recorded.
    assert_eq!(t.cli_outcome, None);
    assert_eq!(t.verifier_verdict, VerifierVerdict::NotRun);
    assert!(
        t.output.is_none(),
        "rejected action must have no tool output (body never ran), got {:?}",
        t.output
    );
}

/// A `drive_cli` result with `timed_out: true` (and a null exit code) is
/// recorded as `CliOutcome::Timeout` with no verifier verdict.
#[tokio::test]
async fn timed_out_drive_records_timeout_and_not_run() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_dispatcher(state.clone()).await;
    let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();

    register_tools(&mut ws, &["drive_cli"]).await;
    let run_id = start_run(&mut ws, "agent-a").await;

    let out = serde_json::json!({
        "cli": "claude", "exit_code": null, "timed_out": true,
        "output_tail": "...(killed after 180s)"
    });
    let resp = call_servicing_callbacks(
        &mut ws,
        "sub1",
        "proposal.submit",
        drive_proposal("a1", "a very long task"),
        &out,
    )
    .await;
    // Fail on the submit itself rather than on a later turn-count mismatch.
    assert!(resp.get("error").is_none(), "submit failed: {resp}");

    let turns = recorded_turns(&state, &run_id).await;
    assert_eq!(turns.len(), 1);
    assert_eq!(turns[0].cli_outcome, Some(CliOutcome::Timeout));
    assert_eq!(turns[0].verifier_verdict, VerifierVerdict::NotRun);
}

/// A tool outside the Bulldozer set (`search`) still gets a generic turn.
/// Tool name, parameters and output are recorded verbatim, with no CLI
/// outcome, no verifier verdict and no policy rejection.
#[tokio::test]
async fn non_bulldozer_tool_records_generic_turn() {
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_dispatcher(state.clone()).await;
    let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();

    register_tools(&mut ws, &["search"]).await;
    let run_id = start_run(&mut ws, "agent-a").await;

    let proposal = serde_json::json!({
        "proposal": {
            "id": "p-search",
            "source": "test",
            "actions": [{
                "id": "s1",
                "type": "tool_call",
                "tool": "search",
                "parameters": { "query": "rust ownership" }
            }]
        }
    });
    let out = serde_json::json!({ "hits": ["a", "b"] });
    let resp = call_servicing_callbacks(&mut ws, "sub1", "proposal.submit", proposal, &out).await;
    // Fail on the submit itself rather than on a later turn-count mismatch.
    assert!(resp.get("error").is_none(), "submit failed: {resp}");

    let turns = recorded_turns(&state, &run_id).await;
    assert_eq!(turns.len(), 1);
    let t = &turns[0];
    assert_eq!(t.tool.as_deref(), Some("search"));
    assert_eq!(
        t.parameters.get("query").unwrap(),
        &serde_json::json!("rust ownership")
    );
    assert_eq!(t.output, Some(out));
    // No Bulldozer classification for an unknown tool.
    assert_eq!(t.cli_outcome, None);
    assert_eq!(t.verifier_verdict, VerifierVerdict::NotRun);
    assert!(t.policy_rejected.is_none());
}

#[tokio::test]
async fn no_run_means_no_turns_recorded() {
    // proposal.submit without an open run records nothing (no current_run_id).
    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));
    let addr = spawn_dispatcher(state.clone()).await;
    let (mut ws, _) = connect_async(format!("ws://{addr}")).await.unwrap();

    let out = serde_json::json!({ "exit_code": 0, "output_tail": "x" });
    call_servicing_callbacks(
        &mut ws,
        "sub1",
        "proposal.submit",
        drive_proposal("a1", "no bracket"),
        &out,
    )
    .await;

    // Nothing was bracketed, so the runs registry is empty.
    assert!(
        state.runs.lock().await.is_empty(),
        "no runs.start means no run, hence no recorded turns"
    );
}

/// FIX 6 (defense-in-depth): two concurrent `record_run_turns` batches on a
/// single run must satisfy two properties:
///
/// - Turn indices are contiguous and correctly ordered. The index is
///   re-stamped under the `runs` lock from the live append position, not
///   taken from a pre-read TOCTOU `start_index`.
/// - The on-disk file stays parseable. Per-run write serialization plus a
///   single `write_all` keep the two batches from interleaving mid-record.
///
/// Runs on a multi-threaded runtime with two workers so the batches can
/// truly execute in parallel. On the default current-thread `#[tokio::test]`
/// runtime they would only interleave at `.await` points, which weakens the
/// race this test guards against.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn concurrent_record_run_turns_yield_contiguous_indices_and_parseable_file() {
    use car_server_core::RunMeta;

    let tmp = TempDir::new().unwrap();
    let state = loopback_state(tmp.path().join("journals"));

    // Open a run directly on the registry (no WS needed for this path).
    state
        .start_run(RunMeta {
            run_id: "r-concurrent".to_string(),
            agent_id: "agent-a".to_string(),
            client_id: "c0".to_string(),
            intent: "stress".to_string(),
            outcome_description: None,
            started_at: chrono::Utc::now(),
            termination: None,
            ended_at: None,
            turns: Vec::new(),
        })
        .await;

    // Build a batch of `n` turns. The indices passed here are deliberately
    // WRONG/overlapping (both batches claim 0..n). The fix must re-stamp
    // them authoritatively under the lock, so the provisional value is moot.
    fn batch(n: usize, tag: &str) -> Vec<RunRecord> {
        (0..n)
            .map(|i| {
                RunRecord::Turn(RunTurn {
                    index: i, // intentionally collides across the two batches
                    prompt: Some(format!("{tag}-{i}")),
                    tool: Some("drive_cli".to_string()),
                    parameters: serde_json::json!({ "prompt": format!("{tag}-{i}") }),
                    output: Some(serde_json::json!({ "exit_code": 0 })),
                    cli_outcome: None,
                    verifier_verdict: VerifierVerdict::NotRun,
                    policy_rejected: None,
                })
            })
            .collect()
    }

    let per_batch = 25usize;
    let s1 = state.clone();
    let s2 = state.clone();
    let h1 = tokio::spawn(async move {
        s1.record_run_turns("r-concurrent", batch(per_batch, "A"))
            .await
    });
    let h2 = tokio::spawn(async move {
        s2.record_run_turns("r-concurrent", batch(per_batch, "B"))
            .await
    });
    h1.await.unwrap();
    h2.await.unwrap();

    // In-memory: indices are contiguous 0..2*per_batch with no gaps/dups.
    let turns = state.run_turns("r-concurrent").await;
    let mut indices: Vec<usize> = turns
        .iter()
        .filter_map(|r| match r {
            RunRecord::Turn(t) => Some(t.index),
            _ => None,
        })
        .collect();
    let total = per_batch * 2;
    assert_eq!(indices.len(), total, "all turns recorded, none dropped");
    // Already in append order; assert it's exactly 0..total with no
    // collisions (the TOCTOU would have produced two 0..per_batch runs).
    let observed = indices.clone();
    indices.sort_unstable();
    indices.dedup();
    assert_eq!(
        indices.len(),
        total,
        "indices must be unique (no TOCTOU collision), got {observed:?}"
    );
    assert_eq!(*indices.first().unwrap(), 0);
    assert_eq!(*indices.last().unwrap(), total - 1);
    // Buffer order matches index order (monotonic, contiguous).
    assert!(
        observed.windows(2).all(|w| w[1] == w[0] + 1),
        "in-memory turn indices must be contiguous & ordered, got {observed:?}"
    );

    // On disk: the file is fully parseable (no torn/interleaved line) and
    // carries exactly `total` Turn records.
    let trace = state
        .run_store
        .get_run_trace("r-concurrent")
        .expect("on-disk trace exists");
    let disk_turns = trace
        .iter()
        .filter(|r| matches!(r, RunRecord::Turn(_)))
        .count();
    assert_eq!(
        disk_turns, total,
        "disk file must hold every turn, fully parseable (no interleaved write)"
    );
}