car-server-core 0.8.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
//! End-to-end test for `car_ffi_common::proxy::DaemonClient` against
//! a real `car-server-core` dispatcher.
//!
//! The bug we exist to guard against: the prior proxy infrastructure
//! opened a fresh WebSocket per JSON-RPC call. The daemon scopes
//! sessions to WS connection lifetime. So `state.set` followed by
//! `state.get` would land on different sessions and `state.get`
//! would return null. The unit tests under `proxy::tests` cover
//! connection-error wording but cannot prove session continuity —
//! this test does, by running both calls on a single client and
//! asserting the read-back matches the write.
//!
//! Spins up a one-shot dispatcher on `127.0.0.1:0` (kernel-assigned
//! port), points `DaemonClient` at it via constructor URL, exercises
//! the round-trip. No external services touched, no global env
//! state mutated.

use car_ffi_common::proxy::DaemonClient;
use car_memgine::MemgineEngine;
use car_server_core::{run_dispatch, ServerState, ServerStateConfig};
use futures::{SinkExt, StreamExt};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;

/// Binds a loopback listener on a kernel-assigned port and serves
/// exactly one WebSocket connection through the real dispatcher in a
/// background task. Returns the address a client should dial.
async fn spawn_one_shot_dispatcher(state: Arc<ServerState>) -> SocketAddr {
    let bind_to = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0));
    let listener = TcpListener::bind(bind_to).await.expect("bind loopback");
    let addr = listener.local_addr().expect("local_addr");

    tokio::spawn(async move {
        let (stream, peer) = listener.accept().await.expect("accept");
        let ws = accept_async(stream).await.expect("ws handshake");
        // Split into sink/stream halves; the dispatcher consumes the
        // read half and a pinned write half.
        let (sink, source) = ws.split();
        let _ = run_dispatch(source, Box::pin(sink), peer.to_string(), state).await;
    });

    addr
}

/// Builds the minimal `ServerState` a dispatcher needs: a fresh
/// shared Memgine engine plus the given journal directory.
fn loopback_state(journal_dir: std::path::PathBuf) -> Arc<ServerState> {
    let shared_engine = Arc::new(Mutex::new(MemgineEngine::new(None)));
    Arc::new(ServerState::with_config(
        ServerStateConfig::new(journal_dir).with_shared_memgine(shared_engine),
    ))
}

/// A write (`state.set`) then a read (`state.get`) on one
/// `DaemonClient` must observe the same daemon session — the client
/// keeps a single WS connection open across calls, so the read sees
/// the write instead of landing on a fresh (empty) session.
#[tokio::test]
async fn state_round_trips_on_single_client() {
    let tmp = TempDir::new().expect("tempdir");
    let addr = spawn_one_shot_dispatcher(loopback_state(tmp.path().to_path_buf())).await;
    let client = DaemonClient::with_url(format!("ws://{}", addr));

    client
        .call(
            "state.set",
            serde_json::json!({ "key": "k", "value": "v" }),
        )
        .await
        .expect("state.set");

    let read_back = client
        .call("state.get", serde_json::json!({ "key": "k" }))
        .await
        .expect("state.get");

    assert_eq!(
        read_back,
        serde_json::Value::String("v".into()),
        "expected the write to be visible to the read on the same client/session"
    );
}

/// Drives the daemon's `a2ui.*` JSON-RPC namespace through the real
/// dispatcher: capabilities must always carry the catalog descriptor
/// (version + catalogs list) and the surface list starts empty.
/// Guards the proxy_a2ui_* helpers added in v0.8 phase 7 (binding
/// parity for the A2UI surface).
#[tokio::test]
async fn a2ui_capabilities_and_round_trip() {
    let tmp = TempDir::new().expect("tempdir");
    let addr = spawn_one_shot_dispatcher(loopback_state(tmp.path().to_path_buf())).await;
    let client = DaemonClient::with_url(format!("ws://{}", addr));

    // The catalog descriptor is unconditional — present even with
    // zero surfaces. Exact field set comes from
    // `car_a2ui::capabilities`.
    let caps = client
        .call("a2ui.capabilities", serde_json::Value::Null)
        .await
        .expect("a2ui.capabilities");
    assert!(
        caps.get("version").is_some(),
        "capabilities should advertise version, got: {caps:#}"
    );
    assert!(
        caps.get("catalogs").and_then(|v| v.as_array()).is_some(),
        "capabilities should advertise catalogs array, got: {caps:#}"
    );

    // On a fresh daemon the surface list is an empty array.
    let initial = client
        .call("a2ui.surfaces", serde_json::Value::Null)
        .await
        .expect("a2ui.surfaces (initial)");
    assert_eq!(
        initial,
        serde_json::Value::Array(vec![]),
        "no surfaces yet"
    );
}

/// state.exists / state.keys / state.snapshot against a real
/// dispatcher — the v0.8 phase 7.1 daemon-side handlers. The FFI
/// bindings depend on each envelope's exact shape: exists is a bare
/// bool (its own helper for that reason), keys an array of strings,
/// snapshot a full key→value object.
#[tokio::test]
async fn state_extras_round_trip() {
    let tmp = TempDir::new().expect("tempdir");
    let addr = spawn_one_shot_dispatcher(loopback_state(tmp.path().to_path_buf())).await;
    let client = DaemonClient::with_url(format!("ws://{}", addr));

    client
        .call("state.set", serde_json::json!({ "key": "a", "value": 1 }))
        .await
        .expect("set a");
    client
        .call("state.set", serde_json::json!({ "key": "b", "value": "two" }))
        .await
        .expect("set b");

    // exists: true for a written key, false for an absent one.
    assert_eq!(
        client
            .call("state.exists", serde_json::json!({ "key": "a" }))
            .await
            .expect("exists a"),
        serde_json::Value::Bool(true)
    );
    assert_eq!(
        client
            .call("state.exists", serde_json::json!({ "key": "z" }))
            .await
            .expect("exists z"),
        serde_json::Value::Bool(false)
    );

    // keys: both keys present; sort before comparing so the
    // assertion is order-insensitive.
    let keys_value = client
        .call("state.keys", serde_json::Value::Null)
        .await
        .expect("keys");
    let mut names: Vec<String> =
        serde_json::from_value(keys_value).expect("keys is array of strings");
    names.sort();
    assert_eq!(names, vec!["a".to_string(), "b".to_string()]);

    // snapshot: every key with its stored value.
    assert_eq!(
        client
            .call("state.snapshot", serde_json::Value::Null)
            .await
            .expect("snapshot"),
        serde_json::json!({ "a": 1, "b": "two" }),
        "snapshot should return all keys with their values"
    );
}

/// A server-initiated `tools.execute` request must flow through
/// `DaemonClient::register_handler` to the registered closure, and
/// the closure's return value must land back on the server as a
/// JSON-RPC `result`. This is the path behind executeProposal /
/// registerAgentRunner in daemon-only mode — without it the daemon's
/// `WsToolExecutor` callback would just time out.
///
/// The fixture is a hand-rolled WS server (not the full dispatcher)
/// so the frame order is fully controlled: on the client's `noop`
/// call the server emits `tools.execute` with id="cb-1", awaits the
/// client's response, then answers the original `noop` with the
/// captured tool output.
#[tokio::test]
async fn server_initiated_request_routes_to_handler() {
    let listener = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
        Ipv4Addr::new(127, 0, 0, 1),
        0,
    )))
    .await
    .expect("bind loopback");
    let addr = listener.local_addr().expect("local_addr");

    tokio::spawn(async move {
        let (stream, _peer) = listener.accept().await.expect("accept");
        let ws = accept_async(stream).await.expect("ws handshake");
        let (mut tx, mut rx) = ws.split();

        // Block until the client issues its call (the `noop` sent
        // below); remember the request id so the eventual reply can
        // be correlated with it.
        let first = rx.next().await.expect("first frame").expect("frame ok");
        let text = first.to_text().expect("text").to_string();
        let env: serde_json::Value = serde_json::from_str(&text).expect("parse client req");
        let client_req_id = env.get("id").cloned().unwrap_or(serde_json::Value::Null);

        // Fire the server-initiated tools.execute at the client.
        let server_req = serde_json::json!({
            "jsonrpc": "2.0",
            "id": "cb-1",
            "method": "tools.execute",
            "params": {
                "action_id": "cb-1",
                "tool": "echo",
                "parameters": { "value": "hello" },
                "attempt": 1,
            },
        });
        tx.send(Message::Text(server_req.to_string().into()))
            .await
            .expect("send server req");

        // Next frame is the client's tools.execute response; capture
        // its `result` payload.
        let resp_frame = rx.next().await.expect("response frame").expect("frame ok");
        let resp_text = resp_frame.to_text().expect("text").to_string();
        let resp: serde_json::Value = serde_json::from_str(&resp_text).expect("parse resp");
        let captured = resp.get("result").cloned().unwrap_or(serde_json::Value::Null);

        // Resolve the original `noop` with the captured value so the
        // test's call() future completes and can be asserted on.
        let final_resp = serde_json::json!({
            "jsonrpc": "2.0",
            "id": client_req_id,
            "result": captured,
        });
        tx.send(Message::Text(final_resp.to_string().into()))
            .await
            .expect("send final resp");
    });

    let client = DaemonClient::with_url(format!("ws://{}", addr));

    // Handler echoes `parameters.value` back so the assertion below
    // can compare against it.
    client.register_handler("tools.execute", |params| async move {
        let echoed = params
            .get("parameters")
            .and_then(|p| p.get("value"))
            .cloned()
            .unwrap_or(serde_json::Value::Null);
        Ok(serde_json::json!({ "echoed": echoed }))
    });

    let result = client
        .call("noop", serde_json::Value::Null)
        .await
        .expect("noop call");

    assert_eq!(
        result,
        serde_json::json!({ "echoed": "hello" }),
        "tool handler should have echoed the parameter back through the server"
    );
}

/// Server-initiated **notifications** (no `id`, no response
/// expected) must reach handlers registered via
/// `register_notification_handler`. A raw notification frame is
/// written straight onto the wire — no dispatcher involved — to
/// isolate the recv-loop's notification branch without needing a
/// daemon-side notification source.
#[tokio::test]
async fn notification_handler_receives_voice_event_shape() {
    use std::sync::Arc as StdArc;
    use std::sync::Mutex as StdMutex;

    let listener = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
        Ipv4Addr::new(127, 0, 0, 1),
        0,
    )))
    .await
    .expect("bind loopback");
    let addr = listener.local_addr().expect("local_addr");

    // Minimal WS peer: accept once, emit a single notification, then
    // idle so the client's recv loop stays up while the handler runs.
    tokio::spawn(async move {
        let (stream, _) = listener.accept().await.expect("accept");
        let ws = accept_async(stream).await.expect("ws handshake");
        let (mut tx, mut rx) = ws.split();
        // Defensively absorb a possible auth frame from the client
        // (the test env shouldn't send one).
        let _ = tokio::time::timeout(std::time::Duration::from_millis(50), rx.next()).await;
        let notification = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "voice.event",
            "params": {
                "session_id": "sess-1",
                "event": { "type": "delta", "text": "hello" }
            }
        });
        let _ = tx
            .send(Message::Text(notification.to_string().into()))
            .await;
        // Park forever; the socket drops when the test ends.
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        }
    });

    let client = DaemonClient::with_url(format!("ws://{}", addr));

    let captured: StdArc<StdMutex<Option<serde_json::Value>>> = StdArc::new(StdMutex::new(None));
    let captured_for_handler = captured.clone();
    client.register_notification_handler("voice.event", move |params| {
        if let Ok(mut g) = captured_for_handler.lock() {
            *g = Some(params);
        }
    });

    // The client connects lazily: issue one throwaway call purely to
    // start the recv loop. An unknown method keeps the server from
    // doing real work; the (failed / timed-out) result is ignored.
    let _ = tokio::time::timeout(
        std::time::Duration::from_millis(500),
        client.call("__unknown__", serde_json::Value::Null),
    )
    .await;

    // Give the handler up to 2s (40 × 50ms) to fire.
    let mut attempts = 0;
    while attempts < 40 && captured.lock().unwrap().is_none() {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        attempts += 1;
    }

    let got = captured.lock().unwrap().clone();
    let got = got.expect("notification handler should have fired");
    assert_eq!(got["session_id"], "sess-1");
    assert_eq!(got["event"]["type"], "delta");
    assert_eq!(got["event"]["text"], "hello");
}

/// A panicking notification handler must not tear down the recv
/// loop. The server sends two `voice.event` frames: the handler
/// panics on the first ("first") and records the second ("second").
/// Observing "second" proves the loop outlived the panic.
#[tokio::test]
async fn notification_handler_panic_does_not_kill_recv_loop() {
    use std::sync::Arc as StdArc;
    use std::sync::Mutex as StdMutex;

    let listener = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
        Ipv4Addr::new(127, 0, 0, 1),
        0,
    )))
    .await
    .expect("bind loopback");
    let addr = listener.local_addr().expect("local_addr");

    tokio::spawn(async move {
        let (stream, _) = listener.accept().await.expect("accept");
        let ws = accept_async(stream).await.expect("ws handshake");
        let (mut tx, mut rx) = ws.split();
        // Defensively absorb any initial frame from the client.
        let _ = tokio::time::timeout(std::time::Duration::from_millis(50), rx.next()).await;
        let mk_event = |which: &str| {
            serde_json::json!({
                "jsonrpc": "2.0",
                "method": "voice.event",
                "params": { "session_id": which, "event": {} }
            })
            .to_string()
        };
        let _ = tx.send(Message::Text(mk_event("first").into())).await;
        // Small gap so the first frame reaches the handler (and
        // panics) before the second frame arrives.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        let _ = tx.send(Message::Text(mk_event("second").into())).await;
        // Keep the socket alive until the test drops the client.
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        }
    });

    let client = DaemonClient::with_url(format!("ws://{}", addr));

    let seen: StdArc<StdMutex<Vec<String>>> = StdArc::new(StdMutex::new(Vec::new()));
    let seen_for_handler = seen.clone();
    client.register_notification_handler("voice.event", move |params| {
        let id = params
            .get("session_id")
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_string();
        if id == "first" {
            panic!("intentional panic from handler");
        }
        if let Ok(mut g) = seen_for_handler.lock() {
            g.push(id);
        }
    });

    // Throwaway call to wake the lazy client and start its recv loop.
    let _ = tokio::time::timeout(
        std::time::Duration::from_millis(500),
        client.call("__unknown__", serde_json::Value::Null),
    )
    .await;

    // Poll up to 2s (40 × 50ms) for the surviving handler invocation.
    let mut attempts = 0;
    while attempts < 40 && seen.lock().unwrap().is_empty() {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        attempts += 1;
    }

    let seen_ids = seen.lock().unwrap().clone();
    assert_eq!(
        seen_ids,
        vec!["second".to_string()],
        "second notification must process — recv loop survived first handler's panic"
    );
}