car-server-core 0.6.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
Documentation
//! Server-side session state — shared across all connections.

use car_engine::{Runtime, ToolExecutor};
use car_eventlog::EventLog;
use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
use tokio_tungstenite::tungstenite::Message;

/// Shared write half of the WebSocket, plus pending callback channels.
///
/// Holders in this file ([`WsToolExecutor`], [`WsVoiceEventSink`],
/// [`ClientSession`]) keep it behind an `Arc`, so several writers can
/// target the same connection.
pub struct WsChannel {
    /// Write half of the split WebSocket stream. Writers in this file
    /// lock it for the duration of a single `send`.
    pub write: Mutex<
        futures::stream::SplitSink<
            tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
            Message,
        >,
    >,
    /// Pending tool execution callbacks: request_id → oneshot sender
    ///
    /// [`WsToolExecutor`] inserts an entry before sending a request; the
    /// code that resolves entries when a reply arrives is not in this file.
    pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
    /// Counter consumed by [`WsChannel::next_request_id`]; each call takes
    /// the current value and increments it.
    pub next_id: AtomicU64,
}

impl WsChannel {
    /// Allocate the next callback request id for this connection.
    ///
    /// Atomically increments `next_id` and renders the value it held
    /// *before* the increment as `cb-<n>`, so concurrent callers on the
    /// same channel never receive the same id.
    pub fn next_request_id(&self) -> String {
        let sequence = self.next_id.fetch_add(1, Ordering::SeqCst);
        let mut request_id = String::from("cb-");
        request_id.push_str(&sequence.to_string());
        request_id
    }
}

/// Tool executor that sends callbacks to the client over WebSocket.
pub struct WsToolExecutor {
    /// Connection the `tools.execute` requests are written to; its
    /// `pending` map holds the sender for each outstanding reply.
    pub channel: Arc<WsChannel>,
}

#[async_trait::async_trait]
impl ToolExecutor for WsToolExecutor {
    /// Run `tool` on the connected client and wait for its answer.
    ///
    /// Sends a JSON-RPC `tools.execute` request over the WebSocket, parks a
    /// oneshot sender in `channel.pending` under the request id, and waits
    /// up to 60 seconds for a `ToolExecuteResponse` to be delivered through
    /// it.
    ///
    /// Returns the response's `output` (or `Value::Null` when absent).
    ///
    /// # Errors
    ///
    /// Returns `Err` with a human-readable message when the request cannot
    /// be serialized, the WebSocket write fails, no response arrives within
    /// the timeout, the reply sender is dropped without sending, or the
    /// client's response carries an `error`.
    ///
    /// On every failure after registration the `pending` entry is removed
    /// again, so requests that never complete do not accumulate in the map
    /// for the lifetime of the connection.
    ///
    /// NOTE(review): if this future is dropped while awaiting the reply the
    /// entry is still left behind; cleaning that up would need a drop guard.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
        use futures::SinkExt;

        // Upper bound on how long the client may take to answer.
        const CALLBACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);

        let request_id = self.channel.next_request_id();

        let callback = ToolExecuteRequest {
            action_id: request_id.clone(),
            tool: tool.to_string(),
            parameters: params.clone(),
            timeout_ms: None,
            attempt: 1,
        };

        // The callback travels to the client as a JSON-RPC request.
        let rpc_request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "tools.execute",
            "params": callback,
            "id": request_id,
        });

        // Serialize *before* registering the reply channel so a
        // serialization failure cannot leave an orphaned `pending` entry.
        let text = serde_json::to_string(&rpc_request).map_err(|e| e.to_string())?;

        // Register the reply channel before sending so a fast response
        // cannot arrive ahead of its registration.
        let (tx, rx) = oneshot::channel();
        self.channel
            .pending
            .lock()
            .await
            .insert(request_id.clone(), tx);

        let sent = self
            .channel
            .write
            .lock()
            .await
            .send(Message::Text(text.into()))
            .await;
        if let Err(e) = sent {
            // The client never saw the request, so no reply will come.
            self.channel.pending.lock().await.remove(&request_id);
            return Err(format!("failed to send tool callback: {}", e));
        }

        // Wait for the client to respond (with a timeout).
        let response = match tokio::time::timeout(CALLBACK_TIMEOUT, rx).await {
            Ok(Ok(response)) => response,
            Ok(Err(_)) => {
                // Sender dropped without a reply; the entry is normally
                // gone already, so this removal is a harmless no-op.
                self.channel.pending.lock().await.remove(&request_id);
                return Err(format!("tool '{}' callback channel closed", tool));
            }
            Err(_) => {
                // Timed out: drop the stale sender so it does not leak.
                self.channel.pending.lock().await.remove(&request_id);
                return Err(format!("tool '{}' callback timed out (60s)", tool));
            }
        };

        if let Some(err) = response.error {
            Err(err)
        } else {
            Ok(response.output.unwrap_or(Value::Null))
        }
    }
}

/// Voice event sink that forwards events to a specific WebSocket client
/// as `voice.event` JSON-RPC notifications.
///
/// Each `voice.transcribe_stream.start` call constructs one of these
/// bound to the originating client's [`WsChannel`], so a client only
/// receives events for sessions it started.
pub struct WsVoiceEventSink {
    /// Connection of the client that receives the notifications.
    pub channel: Arc<WsChannel>,
}

impl car_voice::VoiceEventSink for WsVoiceEventSink {
    /// Forward one voice event to the client as a `voice.event`
    /// JSON-RPC notification.
    ///
    /// `event_json` is parsed so it is embedded as structured JSON; if it
    /// is not valid JSON it is embedded verbatim as a JSON string instead.
    ///
    /// The write happens on a spawned task, so this call returns without
    /// waiting for the socket. Delivery is best-effort: serialization or
    /// write failures are dropped silently. Each call spawns its own task,
    /// so nothing here orders the writes of consecutive calls.
    fn send(&self, session_id: &str, event_json: String) {
        use futures::SinkExt;
        let channel = self.channel.clone();
        let session_id = session_id.to_string();
        tokio::spawn(async move {
            // The task owns `event_json`, so the fallback moves it rather
            // than cloning it.
            let payload = match serde_json::from_str::<Value>(&event_json) {
                Ok(parsed) => parsed,
                Err(_) => Value::String(event_json),
            };
            let notification = serde_json::json!({
                "jsonrpc": "2.0",
                "method": "voice.event",
                "params": {
                    "session_id": session_id,
                    "event": payload,
                },
            });
            let Ok(text) = serde_json::to_string(&notification) else {
                return;
            };
            // Best-effort: a failed write (e.g. closed socket) is ignored.
            let _ = channel
                .write
                .lock()
                .await
                .send(Message::Text(text.into()))
                .await;
        });
    }
}

/// Per-meeting fanout sink that ingests transcript text into a
/// session-scoped memgine using the `Arc<tokio::sync::Mutex<...>>`
/// wrapper, then forwards every event upstream untouched.
///
/// Lives here (not in `car-ffi-common`) because the engine handle uses
/// `tokio::sync::Mutex` per the "one-wrapper rule" — the FFI-common
/// `MeetingMemgineFanout` still uses `std::sync::Mutex` for the NAPI/
/// PyO3 bindings, which keep their sync wrappers. Each binding owns the
/// fanout that matches its lock primitive; the parsing/formatting logic
/// itself is shared via [`car_meeting::extract_transcript_for_ingest`].
///
/// `send` is called from the voice drain task and must be non-blocking,
/// so the lock acquisition is shipped to a `tokio::spawn`. Transcript
/// events are independent so reordering across spawned tasks is fine.
pub struct WsMemgineIngestSink {
    /// Meeting id handed to `extract_transcript_for_ingest` for each event.
    pub meeting_id: String,
    /// Engine that receives extracted `(speaker, text)` pairs via
    /// `ingest_conversation`.
    pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Next sink in the chain; receives every event, whether or not a
    /// transcript was extracted from it.
    pub upstream: Arc<dyn car_voice::VoiceEventSink>,
}

impl car_voice::VoiceEventSink for WsMemgineIngestSink {
    fn send(&self, voice_session_id: &str, event_json: String) {
        if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
            if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
                &value,
                &self.meeting_id,
                voice_session_id,
            ) {
                let engine = self.engine.clone();
                tokio::spawn(async move {
                    let mut guard = engine.lock().await;
                    guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
                });
            }
        }
        self.upstream.send(voice_session_id, event_json);
    }
}

/// Per-client session.
///
/// Built by `ServerState::create_session` and stored in
/// `ServerState::sessions` under `client_id`.
pub struct ClientSession {
    /// Identifier of the client; also the key in the session registry and
    /// the stem of the session's journal file name.
    pub client_id: String,
    /// Engine runtime for this session, configured with the session's
    /// event log and a [`WsToolExecutor`] bound to `channel`.
    pub runtime: Arc<Runtime>,
    /// The WebSocket channel passed to `create_session` for this client.
    pub channel: Arc<WsChannel>,
    /// Host state; a clone of the `ServerState::host` handle, so it is
    /// shared by every session of the same server state.
    pub host: Arc<crate::host::HostState>,
    /// Memgine handle. Wrapped in `tokio::sync::Mutex` so dispatcher
    /// handlers can hold the lock across `.await` points without
    /// risking poisoning. Migrated from `std::sync::Mutex` in the
    /// car-server-core extraction (U1) per the "one-wrapper rule".
    pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Lazy browser session — first `browser.run` call launches Chromium,
    /// subsequent calls reuse it so element IDs resolve across invocations
    /// within the same WebSocket connection.
    pub browser: car_ffi_common::browser::BrowserSessionSlot,
}

/// Builder for constructing a [`ServerState`] with embedder-supplied
/// dependencies. Embedders (e.g. `tokhn-daemon`) use this to inject
/// their own memgine handle and other shared infrastructure; the
/// standalone `car-server` binary uses [`ServerState::standalone`]
/// which calls `with_config` under the hood.
pub struct ServerStateConfig {
    /// Directory in which `create_session` places each session's
    /// `<client_id>.jsonl` journal.
    pub journal_dir: PathBuf,
    /// Optional pre-constructed memgine engine. When `None`, each
    /// `create_session` call builds a fresh engine; embedders that want
    /// to share a single engine across sessions can supply a clone of
    /// their `Arc<Mutex<MemgineEngine>>` here.
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Optional pre-constructed inference engine. When `None`,
    /// `ServerState::inference` starts out empty.
    pub inference: Option<Arc<car_inference::InferenceEngine>>,
}

impl ServerStateConfig {
    /// Start a config that carries only the journal directory.
    ///
    /// Both optional handles (`shared_memgine`, `inference`) are left
    /// unset; this is the shape the standalone car-server binary uses.
    pub fn new(journal_dir: PathBuf) -> Self {
        let (shared_memgine, inference) = (None, None);
        Self {
            journal_dir,
            shared_memgine,
            inference,
        }
    }

    /// Return the config with `engine` installed as the memgine shared
    /// by every session, replacing any handle set earlier.
    pub fn with_shared_memgine(
        self,
        engine: Arc<Mutex<car_memgine::MemgineEngine>>,
    ) -> Self {
        Self {
            shared_memgine: Some(engine),
            ..self
        }
    }

    /// Return the config with `engine` installed as the pre-constructed
    /// inference engine, replacing any handle set earlier.
    pub fn with_inference(self, engine: Arc<car_inference::InferenceEngine>) -> Self {
        Self {
            inference: Some(engine),
            ..self
        }
    }
}

/// Global server state shared across all connections.
pub struct ServerState {
    /// Directory in which `create_session` places each session's
    /// `<client_id>.jsonl` journal.
    pub journal_dir: PathBuf,
    /// Registry of sessions keyed by client id. `create_session` inserts
    /// into it and `remove_session` removes from it.
    pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
    /// Inference engine slot. Pre-filled by `with_config` when the config
    /// supplies an engine, otherwise left empty; code that fills it later
    /// is not in this file.
    pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
    /// Host state; `create_session` clones this handle into every session.
    pub host: Arc<crate::host::HostState>,
    /// When `Some`, `create_session` clones this handle into every new
    /// `ClientSession.memgine` — embedders that want a single shared
    /// memgine across all WS sessions set this. Standalone car-server
    /// leaves it `None`, which gives each session its own engine
    /// (preserving today's behavior).
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Process-wide voice session registry. Each
    /// `voice.transcribe_stream.start` call registers its own per-client
    /// [`WsVoiceEventSink`] so events route back to the originating WS
    /// connection only.
    pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
    /// Process-wide meeting registry. Meeting ids are global; each
    /// meeting binds to the originating client's WS for upstream
    /// events but persists transcripts to the resolved
    /// `.car/meetings/<id>/` regardless of which client started it.
    pub meetings: Arc<car_meeting::MeetingRegistry>,
    /// Process-wide concurrency gate for inference RPC handlers. Sized
    /// from host RAM at startup, overridable via
    /// [`crate::admission::ENV_MAX_CONCURRENT`]. Without this, N
    /// concurrent users multiply KV-cache and activation memory and
    /// take the host out (#114-adjacent: filed alongside the daemon
    /// always-on rework). The semaphore lives on `ServerState` so it
    /// is shared across every WebSocket session in the same process.
    pub admission: Arc<crate::admission::InferenceAdmission>,
}

impl ServerState {
    /// Constructor for the standalone `car-server` binary. Each WS
    /// connection gets its own per-session memgine — matches the
    /// pre-extraction default and is correct for a single-process
    /// daemon serving one user at a time.
    ///
    /// **Embedders must not call this.** It silently leaves
    /// `shared_memgine = None`, which re-introduces the dual-memgine
    /// bug U7 was created to prevent (one engine in the embedder, a
    /// fresh one inside every WS session). Embedders use
    /// [`ServerState::embedded`] instead, which makes the shared
    /// engine handle a required argument so it cannot be forgotten.
    pub fn standalone(journal_dir: PathBuf) -> Self {
        Self::with_config(ServerStateConfig::new(journal_dir))
    }

    /// Constructor for embedders (e.g. `tokhn-daemon`). The shared
    /// memgine handle is **required**: every WS session created by
    /// this state will reuse the same engine, preventing the
    /// dual-memgine bug.
    ///
    /// For embedders that also want to inject a pre-warmed inference
    /// engine or other advanced wiring, build a [`ServerStateConfig`]
    /// directly and call [`ServerState::with_config`].
    pub fn embedded(
        journal_dir: PathBuf,
        shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
    ) -> Self {
        Self::with_config(
            ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine),
        )
    }

    /// Build a `ServerState` from a [`ServerStateConfig`] — the path
    /// embedders use when they need to inject a shared memgine *and*
    /// a pre-warmed inference engine, or any other advanced wiring
    /// the convenience constructors don't cover.
    pub fn with_config(cfg: ServerStateConfig) -> Self {
        let inference = std::sync::OnceLock::new();
        if let Some(eng) = cfg.inference {
            // OnceLock::set returns Err if already set — fresh OnceLock
            // means it's empty, so this is infallible here.
            let _ = inference.set(eng);
        }
        Self {
            journal_dir: cfg.journal_dir,
            sessions: Mutex::new(HashMap::new()),
            inference,
            host: Arc::new(crate::host::HostState::new()),
            shared_memgine: cfg.shared_memgine,
            voice_sessions: Arc::new(car_voice::VoiceSessionRegistry::new()),
            meetings: Arc::new(car_meeting::MeetingRegistry::new()),
            admission: Arc::new(crate::admission::InferenceAdmission::new()),
        }
    }

    pub async fn create_session(
        &self,
        client_id: &str,
        channel: Arc<WsChannel>,
    ) -> Arc<ClientSession> {
        let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
        let event_log = EventLog::with_journal(journal_path);

        let executor = Arc::new(WsToolExecutor {
            channel: channel.clone(),
        });

        let runtime = Runtime::new()
            .with_event_log(event_log)
            .with_executor(executor);

        // If the embedder supplied a shared memgine, every session uses it.
        // Otherwise each session gets its own — matches pre-extraction behavior.
        let memgine = match &self.shared_memgine {
            Some(eng) => eng.clone(),
            None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
        };

        let session = Arc::new(ClientSession {
            client_id: client_id.to_string(),
            runtime: Arc::new(runtime),
            channel,
            host: self.host.clone(),
            memgine,
            browser: car_ffi_common::browser::BrowserSessionSlot::new(),
        });

        self.sessions
            .lock()
            .await
            .insert(client_id.to_string(), session.clone());

        session
    }

    /// Remove a per-client session from the registry on disconnect.
    /// Returns the removed session if present so callers can drop any
    /// remaining strong refs (e.g. drain pending tool callbacks). Fix
    /// for MULTI-4 / WS-3 — without this, `state.sessions` retains
    /// `Arc<ClientSession>` for every connection that ever existed.
    pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
        self.sessions.lock().await.remove(client_id)
    }
}