car-server-core 0.7.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
Documentation
//! End-to-end test for `car_ffi_common::proxy::DaemonClient` against
//! a real `car-server-core` dispatcher.
//!
//! The bug we exist to guard against: the prior proxy infrastructure
//! opened a fresh WebSocket per JSON-RPC call. The daemon scopes
//! sessions to WS connection lifetime. So `state.set` followed by
//! `state.get` would land on different sessions and `state.get`
//! would return null. The unit tests under `proxy::tests` cover
//! connection-error wording but cannot prove session continuity —
//! this test does, by running both calls on a single client and
//! asserting the read-back matches the write.
//!
//! Spins up a one-shot dispatcher on `127.0.0.1:0` (kernel-assigned
//! port), points `DaemonClient` at it via constructor URL, exercises
//! the round-trip. No external services touched, no global env
//! state mutated.

use car_ffi_common::proxy::DaemonClient;
use car_memgine::MemgineEngine;
use car_server_core::{run_dispatch, ServerState, ServerStateConfig};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_tungstenite::accept_async;

async fn spawn_one_shot_dispatcher(state: Arc<ServerState>) -> SocketAddr {
    let listener = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(
        Ipv4Addr::new(127, 0, 0, 1),
        0,
    )))
    .await
    .expect("bind loopback");
    let addr = listener.local_addr().expect("local_addr");

    tokio::spawn(async move {
        let (stream, peer) = listener.accept().await.expect("accept");
        let ws = accept_async(stream).await.expect("ws handshake");
        let (write, read) = futures::StreamExt::split(ws);
        let _ = run_dispatch(read, write, peer, state).await;
    });

    addr
}

fn loopback_state(journal_dir: std::path::PathBuf) -> Arc<ServerState> {
    let engine = Arc::new(Mutex::new(MemgineEngine::new(None)));
    let cfg = ServerStateConfig::new(journal_dir).with_shared_memgine(engine);
    Arc::new(ServerState::with_config(cfg))
}

/// `state.set` followed by `state.get` on a single `DaemonClient`
/// must round-trip — the write and the read share one daemon
/// session because the client holds the WS open across calls.
#[tokio::test]
async fn state_round_trips_on_single_client() {
    let tmp = TempDir::new().expect("tempdir");
    let state = loopback_state(tmp.path().to_path_buf());
    let addr = spawn_one_shot_dispatcher(state).await;
    let url = format!("ws://{}", addr);

    let client = DaemonClient::with_url(url);

    client
        .call(
            "state.set",
            serde_json::json!({ "key": "k", "value": "v" }),
        )
        .await
        .expect("state.set");

    let got = client
        .call("state.get", serde_json::json!({ "key": "k" }))
        .await
        .expect("state.get");

    assert_eq!(
        got,
        serde_json::Value::String("v".into()),
        "expected the write to be visible to the read on the same client/session"
    );
}