car-server-core 0.31.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
//! WS-coupled host pieces that stay in `car-server-core` after the #418 lift.
//!
//! `HostState`, the [`EventSubscriber`] trait, and the channel/approval types
//! moved to `car-server-types`; they are re-exported here so `crate::host::*`
//! keeps resolving across the dispatcher with no call-site churn.
//!
//! What stays: [`RunTraceSubscriber`] — it owns a `WsChannel` write sink and
//! needs a bounded write + failure signal (to exit its drain task on a wedged
//! socket) that the simple `EventSubscriber::send_text` contract doesn't model.

pub use car_server_types::host::*;

use crate::session::WsChannel;
use futures::SinkExt;
use std::sync::Arc;
use std::time::Duration;
use tokio_tungstenite::tungstenite::Message;

/// Bounded capacity of every run-trace subscriber's channel (agent run
/// tracing, U4).
///
/// Each slot holds one `runs.trace.event` that the producer (the recorder /
/// lifecycle path) has queued but the per-subscriber drain task has not yet
/// written out. A healthy CarHost drains far faster than turns are
/// produced, so the buffer only fills when a socket genuinely wedges; once
/// it is full, `try_send` discards the event instead of blocking the
/// producer (invariant #2). 256 is deliberately roomy: a burst of fast
/// turns is absorbed without drops under normal load.
pub const RUN_TRACE_CHANNEL_CAP: usize = 256;

/// One live `runs.trace.event` subscriber — the producer side of the
/// bounded channel whose drain task writes frames to the subscriber's
/// WebSocket (agent run tracing, U4).
///
/// Keyed in [`crate::session::ServerState::run_subscribers`] by
/// `(run_id, host_client_id)` so two CarHost windows on the same run are
/// independent streams (the explicit fanout the single-subscriber-
/// per-method notification registry can't provide), and so disconnect
/// cleanup can drop exactly this connection's subscriptions.
///
/// The producer holds ONLY the `tx` and calls [`RunTraceSubscriber::push`]
/// — a non-blocking `try_send`. It never touches the WS socket, so a slow
/// CarHost can never stall the recorder, the `runs` lock, or any other
/// in-flight RPC (invariant #2). The dedicated drain task owns the socket
/// write.
///
/// Lifetime: a value is created by [`RunTraceSubscriber::spawn`], which
/// also starts the drain task. Dropping the value drops `tx`; once the
/// channel has no sender left, the drain task's `recv()` yields `None` and
/// the task exits.
pub struct RunTraceSubscriber {
    /// The connection that subscribed — its WS `client_id`.
    pub host_client_id: String,
    /// Non-blocking producer handle into the drain task's channel. Private:
    /// within this module it is only used by `push` (and the test-only
    /// `is_closed`).
    tx: tokio::sync::mpsc::Sender<car_proto::RunTraceEvent>,
}

impl RunTraceSubscriber {
    /// Budget for delivering one frame: waiting for the shared write lock
    /// plus the socket write itself.
    const WRITE_TIMEOUT: Duration = Duration::from_secs(10);

    /// Spawn a drain task bound to `channel` and return the producer-side
    /// subscriber handle.
    ///
    /// The drain task turns each queued event into a `runs.trace.event`
    /// JSON-RPC notification frame and writes it to the subscriber's WS.
    /// It exits when either:
    ///
    /// - the returned subscriber (and with it `tx`) is dropped — on
    ///   unsubscribe / disconnect — at which point the channel closes and
    ///   `recv()` returns `None`; or
    /// - a frame cannot be delivered (write error, or the lock + write did
    ///   not finish within the write timeout). The task then drops its
    ///   receiver, so every later [`RunTraceSubscriber::push`] returns
    ///   `false`; the subscriber itself is reaped on disconnect.
    ///
    /// An event that cannot be serialized is skipped; the task keeps
    /// draining.
    pub fn spawn(host_client_id: String, channel: Arc<WsChannel>) -> Self {
        let (tx, mut rx) =
            tokio::sync::mpsc::channel::<car_proto::RunTraceEvent>(RUN_TRACE_CHANNEL_CAP);
        tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
                let Some(json) = Self::encode_frame(&event) else {
                    continue;
                };
                // Timed out or errored — the socket is unusable. Exit the
                // drain loop; the producer's `try_send` will start failing
                // and the subscriber is cleaned up on disconnect.
                if !Self::write_frame(&channel, json).await {
                    break;
                }
            }
        });
        Self { host_client_id, tx }
    }

    /// Serialize `event` as a `runs.trace.event` JSON-RPC notification.
    ///
    /// Returns `None` when the event cannot be serialized. The event is
    /// converted with a fallible `to_value` *before* it is handed to
    /// `json!`: interpolating it directly would unwrap inside the macro and
    /// panic the drain task instead of skipping the event.
    fn encode_frame(event: &car_proto::RunTraceEvent) -> Option<String> {
        let params = serde_json::to_value(event).ok()?;
        serde_json::to_string(&serde_json::json!({
            "jsonrpc": "2.0",
            "method": "runs.trace.event",
            "params": params,
        }))
        .ok()
    }

    /// Write one text frame to `channel`, returning `true` on success.
    ///
    /// A wedged socket should not hang the drain task forever, so the
    /// whole delivery is bounded — both acquiring the write lock (another
    /// writer may be parked on the same dead socket while holding it) and
    /// the write itself. The lock guard lives inside the timed future, so
    /// it is released on success, on error, and on timeout alike.
    async fn write_frame(channel: &WsChannel, json: String) -> bool {
        let attempt = tokio::time::timeout(Self::WRITE_TIMEOUT, async {
            let mut sink = channel.write.lock().await;
            sink.send(Message::Text(json.into())).await
        })
        .await;
        matches!(attempt, Ok(Ok(())))
    }

    /// Non-blocking push of one event onto the drain channel.
    ///
    /// Returns `true` when the event was queued. Returns `false` — and
    /// drops the event rather than blocking the producer (invariant #2) —
    /// when the channel is full (a wedged subscriber) or closed (the drain
    /// task has already exited after a failed write). The producer treats
    /// the drop as best-effort: the client detects the resulting cursor
    /// gap and re-subscribes to backfill (R8).
    pub fn push(&self, event: car_proto::RunTraceEvent) -> bool {
        self.tx.try_send(event).is_ok()
    }

    /// Test-only probe: `true` once the drain task has dropped its
    /// receiver, i.e. the task has exited.
    #[cfg(test)]
    pub fn is_closed(&self) -> bool {
        self.tx.is_closed()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use tokio::sync::Mutex;

    /// Build a `WsChannel` whose writes land in a `futures::mpsc`
    /// receiver the test can drain to observe the drain task's frames.
    /// `futures::channel::mpsc::UnboundedSender` already implements
    /// `Sink<Message>`; we map its `SendError` into the tungstenite error
    /// the `WsSink` alias expects. Dropping the returned receiver makes
    /// every subsequent write fail with `ConnectionClosed`.
    fn channel_with_capture() -> (
        Arc<WsChannel>,
        futures::channel::mpsc::UnboundedReceiver<Message>,
    ) {
        use futures::sink::SinkExt;
        let (tx, rx) = futures::channel::mpsc::unbounded::<Message>();
        let sink: crate::session::WsSink =
            Box::pin(tx.sink_map_err(|_| tokio_tungstenite::tungstenite::Error::ConnectionClosed));
        let channel = Arc::new(WsChannel {
            write: Mutex::new(sink),
            pending: Mutex::new(HashMap::new()),
            next_id: std::sync::atomic::AtomicU64::new(0),
        });
        (channel, rx)
    }

    /// A minimal in-progress `Started` event for run `run-1`.
    fn started_event() -> car_proto::RunTraceEvent {
        car_proto::RunTraceEvent {
            run_id: "run-1".to_string(),
            agent_id: "agent-a".to_string(),
            record: car_proto::RunRecord::Started(car_proto::RunStarted {
                run_id: "run-1".to_string(),
                agent_id: "agent-a".to_string(),
                intent: "go".to_string(),
                outcome_description: None,
                started_at: chrono::Utc::now(),
            }),
            cursor: 0,
            status: car_proto::RunLiveStatus::InProgress,
        }
    }

    /// Yield to the runtime until `done()` holds, so the detached drain
    /// task gets a chance to run; fail the test after two seconds.
    async fn wait_until(what: &str, mut done: impl FnMut() -> bool) {
        tokio::time::timeout(Duration::from_secs(2), async {
            while !done() {
                tokio::task::yield_now().await;
            }
        })
        .await
        .unwrap_or_else(|_| panic!("timed out waiting until {what}"));
    }

    #[tokio::test]
    async fn run_trace_subscriber_drains_event_to_socket() {
        let (channel, mut rx) = channel_with_capture();
        let sub = RunTraceSubscriber::spawn("host-1".to_string(), channel);
        assert!(
            sub.push(started_event()),
            "push onto a fresh channel succeeds"
        );

        // The drain task serializes the event to a runs.trace.event frame.
        use futures::StreamExt;
        let frame = tokio::time::timeout(Duration::from_secs(2), rx.next())
            .await
            .expect("drain task wrote within the deadline")
            .expect("a frame");
        let text = match frame {
            Message::Text(t) => t.to_string(),
            other => panic!("expected a text frame, got {other:?}"),
        };
        let json: serde_json::Value = serde_json::from_str(&text).unwrap();
        assert_eq!(json["method"], "runs.trace.event");
        assert_eq!(json["params"]["run_id"], "run-1");
        assert_eq!(json["params"]["status"], "in_progress");
    }

    #[tokio::test]
    async fn dropping_subscriber_ends_its_drain_task() {
        let (channel, _rx) = channel_with_capture();
        let sub = RunTraceSubscriber::spawn("host-1".to_string(), Arc::clone(&channel));
        assert!(!sub.is_closed());
        // Dropping the subscriber drops its sender; the drain task's
        // recv() returns None and the task exits. The task is detached, so
        // its exit is observed indirectly: it owns the only other
        // `Arc<WsChannel>` clone, which is released when the task ends.
        drop(sub);
        wait_until("the drain task released its channel handle", || {
            Arc::strong_count(&channel) == 1
        })
        .await;
    }

    #[tokio::test]
    async fn write_failure_ends_drain_task_and_closes_channel() {
        let (channel, rx) = channel_with_capture();
        // With the capture receiver gone, the sink rejects every write.
        drop(rx);
        let sub = RunTraceSubscriber::spawn("host-1".to_string(), channel);
        assert!(sub.push(started_event()), "the first event is queued");

        // The failed write makes the drain task exit and drop its
        // receiver, which the producer side sees as a closed channel.
        wait_until("the drain task closed the channel", || sub.is_closed()).await;
        assert!(
            !sub.push(started_event()),
            "push onto a closed channel reports the drop"
        );
    }
}