Skip to main content

car_server_core/
session.rs

1//! Server-side session state — shared across all connections.
2
3use car_engine::{Runtime, ToolExecutor};
4use car_eventlog::EventLog;
5use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12use tokio::sync::{oneshot, Mutex};
13use tokio_tungstenite::tungstenite::Message;
14
/// Server-side credentials for continuing an A2A-owned A2UI surface.
///
/// This intentionally lives outside `car_a2ui::A2uiSurfaceOwner` so
/// renderers can inspect surface ownership without receiving secrets.
///
/// Serialized with serde as an internally tagged enum: the variant name
/// is written in camelCase to a `type` field (`"none"`, `"bearer"`,
/// `"header"`), alongside the variant's own fields.
///
/// NOTE(review): the derived `Debug` prints `token` / `value` verbatim —
/// confirm no caller logs this type with `{:?}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum A2aRouteAuth {
    /// No credentials are attached.
    None,
    /// A bearer token (presumably sent as `Authorization: Bearer <token>`
    /// — confirm at the call site that consumes this).
    Bearer { token: String },
    /// An arbitrary credential carried as a named header and its value.
    Header { name: String, value: String },
}
26
/// Shared write half of the WebSocket, plus pending callback channels.
///
/// Held behind an `Arc` by the tool executor, voice sinks and the client
/// session so that all of them write to the same connection.
pub struct WsChannel {
    /// Sink half of the split WebSocket stream. Guarded by an async mutex
    /// so concurrent writers send whole messages one at a time.
    pub write: Mutex<
        futures::stream::SplitSink<
            tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
            Message,
        >,
    >,
    /// Pending tool execution callbacks: request_id → oneshot sender
    pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
    /// Monotonic counter backing [`WsChannel::next_request_id`].
    pub next_id: AtomicU64,
}
39
40impl WsChannel {
41    pub fn next_request_id(&self) -> String {
42        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
43        format!("cb-{}", id)
44    }
45}
46
/// Tool executor that sends callbacks to the client over WebSocket.
///
/// The tool itself runs on the client side: `execute` forwards the
/// request as a `tools.execute` JSON-RPC call and waits for the reply.
pub struct WsToolExecutor {
    /// Connection the callbacks are written to; also holds the table of
    /// in-flight requests awaiting a response.
    pub channel: Arc<WsChannel>,
}
51
52#[async_trait::async_trait]
53impl ToolExecutor for WsToolExecutor {
54    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
55        use futures::SinkExt;
56
57        let request_id = self.channel.next_request_id();
58
59        let callback = ToolExecuteRequest {
60            action_id: request_id.clone(),
61            tool: tool.to_string(),
62            parameters: params.clone(),
63            timeout_ms: None,
64            attempt: 1,
65        };
66
67        // Create a oneshot channel for the response
68        let (tx, rx) = oneshot::channel();
69        self.channel
70            .pending
71            .lock()
72            .await
73            .insert(request_id.clone(), tx);
74
75        // Send the callback to the client as a JSON-RPC request
76        let rpc_request = serde_json::json!({
77            "jsonrpc": "2.0",
78            "method": "tools.execute",
79            "params": callback,
80            "id": request_id,
81        });
82
83        let msg = Message::Text(
84            serde_json::to_string(&rpc_request)
85                .map_err(|e| e.to_string())?
86                .into(),
87        );
88        self.channel
89            .write
90            .lock()
91            .await
92            .send(msg)
93            .await
94            .map_err(|e| format!("failed to send tool callback: {}", e))?;
95
96        // Wait for the client to respond (with a timeout)
97        let response = tokio::time::timeout(std::time::Duration::from_secs(60), rx)
98            .await
99            .map_err(|_| format!("tool '{}' callback timed out (60s)", tool))?
100            .map_err(|_| format!("tool '{}' callback channel closed", tool))?;
101
102        if let Some(err) = response.error {
103            Err(err)
104        } else {
105            Ok(response.output.unwrap_or(Value::Null))
106        }
107    }
108}
109
/// Voice event sink that forwards events to a specific WebSocket client
/// as `voice.event` JSON-RPC notifications.
///
/// Each `voice.transcribe_stream.start` call constructs one of these
/// bound to the originating client's [`WsChannel`], so a client only
/// receives events for sessions it started.
pub struct WsVoiceEventSink {
    /// Connection of the client that started the voice session; every
    /// notification produced by this sink is written here.
    pub channel: Arc<WsChannel>,
}
119
120impl car_voice::VoiceEventSink for WsVoiceEventSink {
121    fn send(&self, session_id: &str, event_json: String) {
122        use futures::SinkExt;
123        let channel = self.channel.clone();
124        let session_id = session_id.to_string();
125        tokio::spawn(async move {
126            let payload: Value = serde_json::from_str(&event_json)
127                .unwrap_or_else(|_| Value::String(event_json.clone()));
128            let notification = serde_json::json!({
129                "jsonrpc": "2.0",
130                "method": "voice.event",
131                "params": {
132                    "session_id": session_id,
133                    "event": payload,
134                },
135            });
136            let Ok(text) = serde_json::to_string(&notification) else {
137                return;
138            };
139            let _ = channel
140                .write
141                .lock()
142                .await
143                .send(Message::Text(text.into()))
144                .await;
145        });
146    }
147}
148
/// Per-meeting fanout sink that ingests transcript text into a
/// session-scoped memgine using the `Arc<tokio::sync::Mutex<...>>`
/// wrapper, then forwards every event upstream untouched.
///
/// Lives here (not in `car-ffi-common`) because the engine handle uses
/// `tokio::sync::Mutex` per the "one-wrapper rule" — the FFI-common
/// `MeetingMemgineFanout` still uses `std::sync::Mutex` for the NAPI/
/// PyO3 bindings, which keep their sync wrappers. Each binding owns the
/// fanout that matches its lock primitive; the parsing/formatting logic
/// itself is shared via [`car_meeting::extract_transcript_for_ingest`].
///
/// `send` is called from the voice drain task and must be non-blocking,
/// so the lock acquisition is shipped to a `tokio::spawn`. Transcript
/// events are independent so reordering across spawned tasks is fine.
pub struct WsMemgineIngestSink {
    /// Meeting whose transcripts this sink ingests; passed to
    /// `extract_transcript_for_ingest` for every event.
    pub meeting_id: String,
    /// Memgine that receives the extracted `(speaker, text)` pairs.
    pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Next sink in the chain; receives every event, ingested or not.
    pub upstream: Arc<dyn car_voice::VoiceEventSink>,
}
168
169impl car_voice::VoiceEventSink for WsMemgineIngestSink {
170    fn send(&self, voice_session_id: &str, event_json: String) {
171        if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
172            if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
173                &value,
174                &self.meeting_id,
175                voice_session_id,
176            ) {
177                let engine = self.engine.clone();
178                tokio::spawn(async move {
179                    let mut guard = engine.lock().await;
180                    guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
181                });
182            }
183        }
184        self.upstream.send(voice_session_id, event_json);
185    }
186}
187
/// Per-client session.
///
/// Built by `ServerState::create_session` and stored in
/// `ServerState::sessions` under `client_id`.
pub struct ClientSession {
    /// Identifier this session is registered under; also names the
    /// session's journal file (`<client_id>.jsonl`).
    pub client_id: String,
    /// Engine runtime wired to this session's event log and to a
    /// [`WsToolExecutor`] on the same connection.
    pub runtime: Arc<Runtime>,
    /// Write half and pending-callback table of this client's WebSocket.
    pub channel: Arc<WsChannel>,
    /// Host state shared with `ServerState` (same `Arc`).
    pub host: Arc<crate::host::HostState>,
    /// Memgine handle. Wrapped in `tokio::sync::Mutex` so dispatcher
    /// handlers can hold the lock across `.await` points without
    /// risking poisoning. Migrated from `std::sync::Mutex` in the
    /// car-server-core extraction (U1) per the "one-wrapper rule".
    pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Lazy browser session — first `browser.run` call launches Chromium,
    /// subsequent calls reuse it so element IDs resolve across invocations
    /// within the same WebSocket connection.
    pub browser: car_ffi_common::browser::BrowserSessionSlot,
}
204
/// Builder for constructing a [`ServerState`] with embedder-supplied
/// dependencies. Embedders (e.g. `tokhn-daemon`) use this to inject
/// their own memgine handle and other shared infrastructure; the
/// standalone `car-server` binary uses [`ServerState::standalone`]
/// which calls `with_config` under the hood.
pub struct ServerStateConfig {
    /// Directory that receives one `<client_id>.jsonl` journal per session.
    pub journal_dir: PathBuf,
    /// Optional pre-constructed memgine engine. When `None`, each
    /// `create_session` call builds a fresh engine; embedders that want
    /// to share a single engine across sessions can supply a clone of
    /// their `Arc<Mutex<MemgineEngine>>` here.
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Optional pre-constructed inference engine. When `Some`, it is
    /// placed into `ServerState::inference` at construction time; when
    /// `None`, that slot starts empty.
    pub inference: Option<Arc<car_inference::InferenceEngine>>,
}
220
221impl ServerStateConfig {
222    /// Minimal config suitable for the standalone car-server binary:
223    /// only the journal dir is required; everything else is lazily
224    /// constructed at first use.
225    pub fn new(journal_dir: PathBuf) -> Self {
226        Self {
227            journal_dir,
228            shared_memgine: None,
229            inference: None,
230        }
231    }
232
233    pub fn with_shared_memgine(mut self, engine: Arc<Mutex<car_memgine::MemgineEngine>>) -> Self {
234        self.shared_memgine = Some(engine);
235        self
236    }
237
238    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
239        self.inference = Some(engine);
240        self
241    }
242}
243
/// Global server state shared across all connections.
pub struct ServerState {
    /// Directory that receives one `<client_id>.jsonl` journal per session.
    pub journal_dir: PathBuf,
    /// Live sessions keyed by client id. Entries are added by
    /// `create_session` and dropped by `remove_session`.
    pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
    /// Write-once slot for the inference engine. Pre-filled by
    /// `with_config` when the config carries an engine, otherwise empty
    /// at construction.
    pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
    /// Host state; the same `Arc` is cloned into every `ClientSession`.
    pub host: Arc<crate::host::HostState>,
    /// When `Some`, `create_session` clones this handle into every new
    /// `ClientSession.memgine` — embedders that want a single shared
    /// memgine across all WS sessions set this. Standalone car-server
    /// leaves it `None`, which gives each session its own engine
    /// (preserving today's behavior).
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Process-wide voice session registry. Each
    /// `voice.transcribe_stream.start` call registers its own per-client
    /// [`WsVoiceEventSink`] so events route back to the originating WS
    /// connection only.
    pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
    /// Process-wide meeting registry. Meeting ids are global; each
    /// meeting binds to the originating client's WS for upstream
    /// events but persists transcripts to the resolved
    /// `.car/meetings/<id>/` regardless of which client started it.
    pub meetings: Arc<car_meeting::MeetingRegistry>,
    /// Process-wide A2UI surface store. Agent-produced surfaces are
    /// visible to every host UI subscriber, independent of the
    /// WebSocket session that applied the update.
    pub a2ui: car_a2ui::A2uiSurfaceStore,
    /// Process-wide concurrency gate for inference RPC handlers. Sized
    /// from host RAM at startup, overridable via
    /// [`crate::admission::ENV_MAX_CONCURRENT`]. Without this, N
    /// concurrent users multiply KV-cache and activation memory and
    /// take the host out (#114-adjacent: filed alongside the daemon
    /// always-on rework). The semaphore lives on `ServerState` so it
    /// is shared across every WebSocket session in the same process.
    pub admission: Arc<crate::admission::InferenceAdmission>,
    /// Server-side A2A continuation auth keyed by A2UI surface id.
    /// Kept out of `A2uiSurface.owner` so host renderers never see
    /// bearer/API-key material.
    pub a2ui_route_auth: Mutex<HashMap<String, A2aRouteAuth>>,
}
283
284impl ServerState {
285    /// Constructor for the standalone `car-server` binary. Each WS
286    /// connection gets its own per-session memgine — matches the
287    /// pre-extraction default and is correct for a single-process
288    /// daemon serving one user at a time.
289    ///
290    /// **Embedders must not call this.** It silently leaves
291    /// `shared_memgine = None`, which re-introduces the dual-memgine
292    /// bug U7 was created to prevent (one engine in the embedder, a
293    /// fresh one inside every WS session). Embedders use
294    /// [`ServerState::embedded`] instead, which makes the shared
295    /// engine handle a required argument so it cannot be forgotten.
296    pub fn standalone(journal_dir: PathBuf) -> Self {
297        Self::with_config(ServerStateConfig::new(journal_dir))
298    }
299
300    /// Constructor for embedders (e.g. `tokhn-daemon`). The shared
301    /// memgine handle is **required**: every WS session created by
302    /// this state will reuse the same engine, preventing the
303    /// dual-memgine bug.
304    ///
305    /// For embedders that also want to inject a pre-warmed inference
306    /// engine or other advanced wiring, build a [`ServerStateConfig`]
307    /// directly and call [`ServerState::with_config`].
308    pub fn embedded(
309        journal_dir: PathBuf,
310        shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
311    ) -> Self {
312        Self::with_config(ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine))
313    }
314
315    /// Build a `ServerState` from a [`ServerStateConfig`] — the path
316    /// embedders use when they need to inject a shared memgine *and*
317    /// a pre-warmed inference engine, or any other advanced wiring
318    /// the convenience constructors don't cover.
319    pub fn with_config(cfg: ServerStateConfig) -> Self {
320        let inference = std::sync::OnceLock::new();
321        if let Some(eng) = cfg.inference {
322            // OnceLock::set returns Err if already set — fresh OnceLock
323            // means it's empty, so this is infallible here.
324            let _ = inference.set(eng);
325        }
326        let voice_sessions = Arc::new(car_voice::VoiceSessionRegistry::new());
327        // Reap sessions whose clients dropped without calling
328        // voice.transcribe_stream.stop (WS disconnect, process exit,
329        // etc.). Listener handles otherwise leak for the daemon's
330        // lifetime. `with_config` is sync but always called from the
331        // `#[tokio::main]` entry point, so `Handle::try_current()`
332        // inside `start_sweeper` finds the runtime.
333        voice_sessions.start_sweeper();
334        Self {
335            journal_dir: cfg.journal_dir,
336            sessions: Mutex::new(HashMap::new()),
337            inference,
338            host: Arc::new(crate::host::HostState::new()),
339            shared_memgine: cfg.shared_memgine,
340            voice_sessions,
341            meetings: Arc::new(car_meeting::MeetingRegistry::new()),
342            a2ui: car_a2ui::A2uiSurfaceStore::new(),
343            admission: Arc::new(crate::admission::InferenceAdmission::new()),
344            a2ui_route_auth: Mutex::new(HashMap::new()),
345        }
346    }
347
348    pub async fn create_session(
349        &self,
350        client_id: &str,
351        channel: Arc<WsChannel>,
352    ) -> Arc<ClientSession> {
353        let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
354        let event_log = EventLog::with_journal(journal_path);
355
356        let executor = Arc::new(WsToolExecutor {
357            channel: channel.clone(),
358        });
359
360        let runtime = Runtime::new()
361            .with_event_log(event_log)
362            .with_executor(executor);
363
364        // If the embedder supplied a shared memgine, every session uses it.
365        // Otherwise each session gets its own — matches pre-extraction behavior.
366        let memgine = match &self.shared_memgine {
367            Some(eng) => eng.clone(),
368            None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
369        };
370
371        let session = Arc::new(ClientSession {
372            client_id: client_id.to_string(),
373            runtime: Arc::new(runtime),
374            channel,
375            host: self.host.clone(),
376            memgine,
377            browser: car_ffi_common::browser::BrowserSessionSlot::new(),
378        });
379
380        self.sessions
381            .lock()
382            .await
383            .insert(client_id.to_string(), session.clone());
384
385        session
386    }
387
388    /// Remove a per-client session from the registry on disconnect.
389    /// Returns the removed session if present so callers can drop any
390    /// remaining strong refs (e.g. drain pending tool callbacks). Fix
391    /// for MULTI-4 / WS-3 — without this, `state.sessions` retains
392    /// `Arc<ClientSession>` for every connection that ever existed.
393    pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
394        self.sessions.lock().await.remove(client_id)
395    }
396}