// car_server_core/session.rs
//! Server-side session state — shared across all connections.

3use car_engine::{Runtime, ToolExecutor};
4use car_eventlog::EventLog;
5use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12use tokio::sync::{oneshot, Mutex};
13use tokio_tungstenite::tungstenite::Message;
14
/// Server-side credentials for continuing an A2A-owned A2UI surface.
///
/// This intentionally lives outside `car_a2ui::A2uiSurfaceOwner` so
/// renderers can inspect surface ownership without receiving secrets.
///
/// Serde uses an internal `"type"` tag with camelCase variant names, so
/// the wire forms are `{"type":"none"}`, `{"type":"bearer","token":…}`
/// and `{"type":"header","name":…,"value":…}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum A2aRouteAuth {
    /// No credentials attached to the route.
    None,
    /// Bearer-token credentials.
    Bearer { token: String },
    /// Arbitrary header name/value credentials (e.g. an API-key header).
    Header { name: String, value: String },
}
26
/// Shared write half of the WebSocket, plus pending callback channels.
pub struct WsChannel {
    /// Serialized access to the WS write half; every outbound frame for
    /// this connection goes through this lock.
    pub write: Mutex<
        futures::stream::SplitSink<
            tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
            Message,
        >,
    >,
    /// Pending tool execution callbacks: request_id → oneshot sender
    pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
    /// Monotonic counter backing [`WsChannel::next_request_id`].
    pub next_id: AtomicU64,
}
39
40impl WsChannel {
41 pub fn next_request_id(&self) -> String {
42 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
43 format!("cb-{}", id)
44 }
45}
46
/// Tool executor that sends callbacks to the client over WebSocket.
pub struct WsToolExecutor {
    /// Connection the `tools.execute` callbacks go out over; its
    /// `pending` map holds the matching response channels.
    pub channel: Arc<WsChannel>,
}
51
52#[async_trait::async_trait]
53impl ToolExecutor for WsToolExecutor {
54 async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
55 use futures::SinkExt;
56
57 let request_id = self.channel.next_request_id();
58
59 let callback = ToolExecuteRequest {
60 action_id: request_id.clone(),
61 tool: tool.to_string(),
62 parameters: params.clone(),
63 timeout_ms: None,
64 attempt: 1,
65 };
66
67 // Create a oneshot channel for the response
68 let (tx, rx) = oneshot::channel();
69 self.channel
70 .pending
71 .lock()
72 .await
73 .insert(request_id.clone(), tx);
74
75 // Send the callback to the client as a JSON-RPC request
76 let rpc_request = serde_json::json!({
77 "jsonrpc": "2.0",
78 "method": "tools.execute",
79 "params": callback,
80 "id": request_id,
81 });
82
83 let msg = Message::Text(
84 serde_json::to_string(&rpc_request)
85 .map_err(|e| e.to_string())?
86 .into(),
87 );
88 self.channel
89 .write
90 .lock()
91 .await
92 .send(msg)
93 .await
94 .map_err(|e| format!("failed to send tool callback: {}", e))?;
95
96 // Wait for the client to respond (with a timeout)
97 let response = tokio::time::timeout(std::time::Duration::from_secs(60), rx)
98 .await
99 .map_err(|_| format!("tool '{}' callback timed out (60s)", tool))?
100 .map_err(|_| format!("tool '{}' callback channel closed", tool))?;
101
102 if let Some(err) = response.error {
103 Err(err)
104 } else {
105 Ok(response.output.unwrap_or(Value::Null))
106 }
107 }
108}
109
/// Voice event sink that forwards events to a specific WebSocket client
/// as `voice.event` JSON-RPC notifications.
///
/// Each `voice.transcribe_stream.start` call constructs one of these
/// bound to the originating client's [`WsChannel`], so a client only
/// receives events for sessions it started.
pub struct WsVoiceEventSink {
    /// The originating client's connection; notifications are written
    /// through its `write` lock.
    pub channel: Arc<WsChannel>,
}
119
120impl car_voice::VoiceEventSink for WsVoiceEventSink {
121 fn send(&self, session_id: &str, event_json: String) {
122 use futures::SinkExt;
123 let channel = self.channel.clone();
124 let session_id = session_id.to_string();
125 tokio::spawn(async move {
126 let payload: Value = serde_json::from_str(&event_json)
127 .unwrap_or_else(|_| Value::String(event_json.clone()));
128 let notification = serde_json::json!({
129 "jsonrpc": "2.0",
130 "method": "voice.event",
131 "params": {
132 "session_id": session_id,
133 "event": payload,
134 },
135 });
136 let Ok(text) = serde_json::to_string(¬ification) else {
137 return;
138 };
139 let _ = channel
140 .write
141 .lock()
142 .await
143 .send(Message::Text(text.into()))
144 .await;
145 });
146 }
147}
148
/// Per-meeting fanout sink that ingests transcript text into a
/// session-scoped memgine using the `Arc<tokio::sync::Mutex<...>>`
/// wrapper, then forwards every event upstream untouched.
///
/// Lives here (not in `car-ffi-common`) because the engine handle uses
/// `tokio::sync::Mutex` per the "one-wrapper rule" — the FFI-common
/// `MeetingMemgineFanout` still uses `std::sync::Mutex` for the NAPI/
/// PyO3 bindings, which keep their sync wrappers. Each binding owns the
/// fanout that matches its lock primitive; the parsing/formatting logic
/// itself is shared via [`car_meeting::extract_transcript_for_ingest`].
///
/// `send` is called from the voice drain task and must be non-blocking,
/// so the lock acquisition is shipped to a `tokio::spawn`. Transcript
/// events are independent so reordering across spawned tasks is fine.
pub struct WsMemgineIngestSink {
    /// Meeting this sink is bound to; passed to the transcript extractor.
    pub meeting_id: String,
    /// Memgine handle that extracted transcript lines are ingested into.
    pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Downstream sink that receives every event unchanged.
    pub upstream: Arc<dyn car_voice::VoiceEventSink>,
}
168
169impl car_voice::VoiceEventSink for WsMemgineIngestSink {
170 fn send(&self, voice_session_id: &str, event_json: String) {
171 if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
172 if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
173 &value,
174 &self.meeting_id,
175 voice_session_id,
176 ) {
177 let engine = self.engine.clone();
178 tokio::spawn(async move {
179 let mut guard = engine.lock().await;
180 guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
181 });
182 }
183 }
184 self.upstream.send(voice_session_id, event_json);
185 }
186}
187
/// Per-client session.
pub struct ClientSession {
    /// Stable id for this WS connection; also names the per-client
    /// journal file and keys the `ServerState::sessions` registry.
    pub client_id: String,
    /// Engine runtime wired to this client's event log and
    /// [`WsToolExecutor`].
    pub runtime: Arc<Runtime>,
    /// Shared write half + pending callback channels for this connection.
    pub channel: Arc<WsChannel>,
    /// Process-wide host state, shared with [`ServerState`].
    pub host: Arc<crate::host::HostState>,
    /// Memgine handle. Wrapped in `tokio::sync::Mutex` so dispatcher
    /// handlers can hold the lock across `.await` points without
    /// risking poisoning. Migrated from `std::sync::Mutex` in the
    /// car-server-core extraction (U1) per the "one-wrapper rule".
    pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Lazy browser session — first `browser.run` call launches Chromium,
    /// subsequent calls reuse it so element IDs resolve across invocations
    /// within the same WebSocket connection.
    pub browser: car_ffi_common::browser::BrowserSessionSlot,
}
204
/// Builder for constructing a [`ServerState`] with embedder-supplied
/// dependencies. Embedders (e.g. `tokhn-daemon`) use this to inject
/// their own memgine handle and other shared infrastructure; the
/// standalone `car-server` binary uses [`ServerState::standalone`]
/// which calls `with_config` under the hood.
pub struct ServerStateConfig {
    /// Directory that per-client event-log journals
    /// (`<client_id>.jsonl`) are written to.
    pub journal_dir: PathBuf,
    /// Optional pre-constructed memgine engine. When `None`, each
    /// `create_session` call builds a fresh engine; embedders that want
    /// to share a single engine across sessions can supply a clone of
    /// their `Arc<Mutex<MemgineEngine>>` here.
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Optional pre-constructed inference engine.
    pub inference: Option<Arc<car_inference::InferenceEngine>>,
}
220
221impl ServerStateConfig {
222 /// Minimal config suitable for the standalone car-server binary:
223 /// only the journal dir is required; everything else is lazily
224 /// constructed at first use.
225 pub fn new(journal_dir: PathBuf) -> Self {
226 Self {
227 journal_dir,
228 shared_memgine: None,
229 inference: None,
230 }
231 }
232
233 pub fn with_shared_memgine(mut self, engine: Arc<Mutex<car_memgine::MemgineEngine>>) -> Self {
234 self.shared_memgine = Some(engine);
235 self
236 }
237
238 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
239 self.inference = Some(engine);
240 self
241 }
242}
243
/// Global server state shared across all connections.
pub struct ServerState {
    /// Directory that per-client event-log journals
    /// (`<client_id>.jsonl`) are written to.
    pub journal_dir: PathBuf,
    /// Live per-client sessions keyed by `client_id`; inserted by
    /// `create_session`, removed by `remove_session` on disconnect.
    pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
    /// Lazily-initialized inference engine; pre-seeded by `with_config`
    /// when the embedder supplies one.
    pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
    /// Process-wide host state, cloned into every [`ClientSession`].
    pub host: Arc<crate::host::HostState>,
    /// When `Some`, `create_session` clones this handle into every new
    /// `ClientSession.memgine` — embedders that want a single shared
    /// memgine across all WS sessions set this. Standalone car-server
    /// leaves it `None`, which gives each session its own engine
    /// (preserving today's behavior).
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Process-wide voice session registry. Each
    /// `voice.transcribe_stream.start` call registers its own per-client
    /// [`WsVoiceEventSink`] so events route back to the originating WS
    /// connection only.
    pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
    /// Process-wide meeting registry. Meeting ids are global; each
    /// meeting binds to the originating client's WS for upstream
    /// events but persists transcripts to the resolved
    /// `.car/meetings/<id>/` regardless of which client started it.
    pub meetings: Arc<car_meeting::MeetingRegistry>,
    /// Process-wide A2UI surface store. Agent-produced surfaces are
    /// visible to every host UI subscriber, independent of the
    /// WebSocket session that applied the update.
    pub a2ui: car_a2ui::A2uiSurfaceStore,
    /// Process-wide concurrency gate for inference RPC handlers. Sized
    /// from host RAM at startup, overridable via
    /// [`crate::admission::ENV_MAX_CONCURRENT`]. Without this, N
    /// concurrent users multiply KV-cache and activation memory and
    /// take the host out (#114-adjacent: filed alongside the daemon
    /// always-on rework). The semaphore lives on `ServerState` so it
    /// is shared across every WebSocket session in the same process.
    pub admission: Arc<crate::admission::InferenceAdmission>,
    /// Server-side A2A continuation auth keyed by A2UI surface id.
    /// Kept out of `A2uiSurface.owner` so host renderers never see
    /// bearer/API-key material.
    pub a2ui_route_auth: Mutex<HashMap<String, A2aRouteAuth>>,
}
283
impl ServerState {
    /// Constructor for the standalone `car-server` binary. Each WS
    /// connection gets its own per-session memgine — matches the
    /// pre-extraction default and is correct for a single-process
    /// daemon serving one user at a time.
    ///
    /// **Embedders must not call this.** It silently leaves
    /// `shared_memgine = None`, which re-introduces the dual-memgine
    /// bug U7 was created to prevent (one engine in the embedder, a
    /// fresh one inside every WS session). Embedders use
    /// [`ServerState::embedded`] instead, which makes the shared
    /// engine handle a required argument so it cannot be forgotten.
    pub fn standalone(journal_dir: PathBuf) -> Self {
        Self::with_config(ServerStateConfig::new(journal_dir))
    }

    /// Constructor for embedders (e.g. `tokhn-daemon`). The shared
    /// memgine handle is **required**: every WS session created by
    /// this state will reuse the same engine, preventing the
    /// dual-memgine bug.
    ///
    /// For embedders that also want to inject a pre-warmed inference
    /// engine or other advanced wiring, build a [`ServerStateConfig`]
    /// directly and call [`ServerState::with_config`].
    pub fn embedded(
        journal_dir: PathBuf,
        shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
    ) -> Self {
        Self::with_config(ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine))
    }

    /// Build a `ServerState` from a [`ServerStateConfig`] — the path
    /// embedders use when they need to inject a shared memgine *and*
    /// a pre-warmed inference engine, or any other advanced wiring
    /// the convenience constructors don't cover.
    pub fn with_config(cfg: ServerStateConfig) -> Self {
        let inference = std::sync::OnceLock::new();
        if let Some(eng) = cfg.inference {
            // OnceLock::set returns Err if already set — fresh OnceLock
            // means it's empty, so this is infallible here.
            let _ = inference.set(eng);
        }
        let voice_sessions = Arc::new(car_voice::VoiceSessionRegistry::new());
        // Reap sessions whose clients dropped without calling
        // voice.transcribe_stream.stop (WS disconnect, process exit,
        // etc.). Listener handles otherwise leak for the daemon's
        // lifetime. `with_config` is sync but always called from the
        // `#[tokio::main]` entry point, so `Handle::try_current()`
        // inside `start_sweeper` finds the runtime.
        voice_sessions.start_sweeper();
        Self {
            journal_dir: cfg.journal_dir,
            sessions: Mutex::new(HashMap::new()),
            inference,
            host: Arc::new(crate::host::HostState::new()),
            shared_memgine: cfg.shared_memgine,
            voice_sessions,
            meetings: Arc::new(car_meeting::MeetingRegistry::new()),
            a2ui: car_a2ui::A2uiSurfaceStore::new(),
            admission: Arc::new(crate::admission::InferenceAdmission::new()),
            a2ui_route_auth: Mutex::new(HashMap::new()),
        }
    }

    /// Create and register the per-client session for a new WS
    /// connection: a journal-backed event log named after `client_id`,
    /// a runtime whose tool executor round-trips callbacks over
    /// `channel`, and either the embedder-shared memgine or a fresh
    /// per-session one.
    ///
    /// NOTE(review): `HashMap::insert` replaces any existing entry, so
    /// a reconnect reusing the same `client_id` silently drops the old
    /// session's registry entry — confirm callers tear down the old
    /// connection first.
    pub async fn create_session(
        &self,
        client_id: &str,
        channel: Arc<WsChannel>,
    ) -> Arc<ClientSession> {
        // One journal file per client: <journal_dir>/<client_id>.jsonl.
        let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
        let event_log = EventLog::with_journal(journal_path);

        // Tool callbacks for this session go back out over its own WS.
        let executor = Arc::new(WsToolExecutor {
            channel: channel.clone(),
        });

        let runtime = Runtime::new()
            .with_event_log(event_log)
            .with_executor(executor);

        // If the embedder supplied a shared memgine, every session uses it.
        // Otherwise each session gets its own — matches pre-extraction behavior.
        let memgine = match &self.shared_memgine {
            Some(eng) => eng.clone(),
            None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
        };

        let session = Arc::new(ClientSession {
            client_id: client_id.to_string(),
            runtime: Arc::new(runtime),
            channel,
            host: self.host.clone(),
            memgine,
            browser: car_ffi_common::browser::BrowserSessionSlot::new(),
        });

        self.sessions
            .lock()
            .await
            .insert(client_id.to_string(), session.clone());

        session
    }

    /// Remove a per-client session from the registry on disconnect.
    /// Returns the removed session if present so callers can drop any
    /// remaining strong refs (e.g. drain pending tool callbacks). Fix
    /// for MULTI-4 / WS-3 — without this, `state.sessions` retains
    /// `Arc<ClientSession>` for every connection that ever existed.
    pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
        self.sessions.lock().await.remove(client_id)
    }
}
396}