car_server_core/session.rs
1//! Server-side session state — shared across all connections.
2
3use car_engine::{Runtime, ToolExecutor};
4use car_eventlog::EventLog;
5use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
6use serde_json::Value;
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use tokio::sync::{oneshot, Mutex};
12use tokio_tungstenite::tungstenite::Message;
13
14/// Shared write half of the WebSocket, plus pending callback channels.
15pub struct WsChannel {
16 pub write: Mutex<
17 futures::stream::SplitSink<
18 tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
19 Message,
20 >,
21 >,
22 /// Pending tool execution callbacks: request_id → oneshot sender
23 pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
24 pub next_id: AtomicU64,
25}
26
27impl WsChannel {
28 pub fn next_request_id(&self) -> String {
29 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
30 format!("cb-{}", id)
31 }
32}
33
34/// Tool executor that sends callbacks to the client over WebSocket.
35pub struct WsToolExecutor {
36 pub channel: Arc<WsChannel>,
37}
38
39#[async_trait::async_trait]
40impl ToolExecutor for WsToolExecutor {
41 async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
42 use futures::SinkExt;
43
44 let request_id = self.channel.next_request_id();
45
46 let callback = ToolExecuteRequest {
47 action_id: request_id.clone(),
48 tool: tool.to_string(),
49 parameters: params.clone(),
50 timeout_ms: None,
51 attempt: 1,
52 };
53
54 // Create a oneshot channel for the response
55 let (tx, rx) = oneshot::channel();
56 self.channel
57 .pending
58 .lock()
59 .await
60 .insert(request_id.clone(), tx);
61
62 // Send the callback to the client as a JSON-RPC request
63 let rpc_request = serde_json::json!({
64 "jsonrpc": "2.0",
65 "method": "tools.execute",
66 "params": callback,
67 "id": request_id,
68 });
69
70 let msg = Message::Text(
71 serde_json::to_string(&rpc_request)
72 .map_err(|e| e.to_string())?
73 .into(),
74 );
75 self.channel
76 .write
77 .lock()
78 .await
79 .send(msg)
80 .await
81 .map_err(|e| format!("failed to send tool callback: {}", e))?;
82
83 // Wait for the client to respond (with a timeout)
84 let response = tokio::time::timeout(std::time::Duration::from_secs(60), rx)
85 .await
86 .map_err(|_| format!("tool '{}' callback timed out (60s)", tool))?
87 .map_err(|_| format!("tool '{}' callback channel closed", tool))?;
88
89 if let Some(err) = response.error {
90 Err(err)
91 } else {
92 Ok(response.output.unwrap_or(Value::Null))
93 }
94 }
95}
96
97/// Voice event sink that forwards events to a specific WebSocket client
98/// as `voice.event` JSON-RPC notifications.
99///
100/// Each `voice.transcribe_stream.start` call constructs one of these
101/// bound to the originating client's [`WsChannel`], so a client only
102/// receives events for sessions it started.
103pub struct WsVoiceEventSink {
104 pub channel: Arc<WsChannel>,
105}
106
107impl car_voice::VoiceEventSink for WsVoiceEventSink {
108 fn send(&self, session_id: &str, event_json: String) {
109 use futures::SinkExt;
110 let channel = self.channel.clone();
111 let session_id = session_id.to_string();
112 tokio::spawn(async move {
113 let payload: Value = serde_json::from_str(&event_json)
114 .unwrap_or_else(|_| Value::String(event_json.clone()));
115 let notification = serde_json::json!({
116 "jsonrpc": "2.0",
117 "method": "voice.event",
118 "params": {
119 "session_id": session_id,
120 "event": payload,
121 },
122 });
123 let Ok(text) = serde_json::to_string(¬ification) else {
124 return;
125 };
126 let _ = channel
127 .write
128 .lock()
129 .await
130 .send(Message::Text(text.into()))
131 .await;
132 });
133 }
134}
135
136/// Per-meeting fanout sink that ingests transcript text into a
137/// session-scoped memgine using the `Arc<tokio::sync::Mutex<...>>`
138/// wrapper, then forwards every event upstream untouched.
139///
140/// Lives here (not in `car-ffi-common`) because the engine handle uses
141/// `tokio::sync::Mutex` per the "one-wrapper rule" — the FFI-common
142/// `MeetingMemgineFanout` still uses `std::sync::Mutex` for the NAPI/
143/// PyO3 bindings, which keep their sync wrappers. Each binding owns the
144/// fanout that matches its lock primitive; the parsing/formatting logic
145/// itself is shared via [`car_meeting::extract_transcript_for_ingest`].
146///
147/// `send` is called from the voice drain task and must be non-blocking,
148/// so the lock acquisition is shipped to a `tokio::spawn`. Transcript
149/// events are independent so reordering across spawned tasks is fine.
150pub struct WsMemgineIngestSink {
151 pub meeting_id: String,
152 pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
153 pub upstream: Arc<dyn car_voice::VoiceEventSink>,
154}
155
156impl car_voice::VoiceEventSink for WsMemgineIngestSink {
157 fn send(&self, voice_session_id: &str, event_json: String) {
158 if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
159 if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
160 &value,
161 &self.meeting_id,
162 voice_session_id,
163 ) {
164 let engine = self.engine.clone();
165 tokio::spawn(async move {
166 let mut guard = engine.lock().await;
167 guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
168 });
169 }
170 }
171 self.upstream.send(voice_session_id, event_json);
172 }
173}
174
175/// Per-client session.
176pub struct ClientSession {
177 pub client_id: String,
178 pub runtime: Arc<Runtime>,
179 pub channel: Arc<WsChannel>,
180 pub host: Arc<crate::host::HostState>,
181 /// Memgine handle. Wrapped in `tokio::sync::Mutex` so dispatcher
182 /// handlers can hold the lock across `.await` points without
183 /// risking poisoning. Migrated from `std::sync::Mutex` in the
184 /// car-server-core extraction (U1) per the "one-wrapper rule".
185 pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
186 /// Lazy browser session — first `browser.run` call launches Chromium,
187 /// subsequent calls reuse it so element IDs resolve across invocations
188 /// within the same WebSocket connection.
189 pub browser: car_ffi_common::browser::BrowserSessionSlot,
190}
191
192/// Builder for constructing a [`ServerState`] with embedder-supplied
193/// dependencies. Embedders (e.g. `tokhn-daemon`) use this to inject
194/// their own memgine handle and other shared infrastructure; the
195/// standalone `car-server` binary uses [`ServerState::standalone`]
196/// which calls `with_config` under the hood.
197pub struct ServerStateConfig {
198 pub journal_dir: PathBuf,
199 /// Optional pre-constructed memgine engine. When `None`, each
200 /// `create_session` call builds a fresh engine; embedders that want
201 /// to share a single engine across sessions can supply a clone of
202 /// their `Arc<Mutex<MemgineEngine>>` here.
203 pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
204 /// Optional pre-constructed inference engine.
205 pub inference: Option<Arc<car_inference::InferenceEngine>>,
206}
207
208impl ServerStateConfig {
209 /// Minimal config suitable for the standalone car-server binary:
210 /// only the journal dir is required; everything else is lazily
211 /// constructed at first use.
212 pub fn new(journal_dir: PathBuf) -> Self {
213 Self {
214 journal_dir,
215 shared_memgine: None,
216 inference: None,
217 }
218 }
219
220 pub fn with_shared_memgine(
221 mut self,
222 engine: Arc<Mutex<car_memgine::MemgineEngine>>,
223 ) -> Self {
224 self.shared_memgine = Some(engine);
225 self
226 }
227
228 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
229 self.inference = Some(engine);
230 self
231 }
232}
233
234/// Global server state shared across all connections.
235pub struct ServerState {
236 pub journal_dir: PathBuf,
237 pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
238 pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
239 pub host: Arc<crate::host::HostState>,
240 /// When `Some`, `create_session` clones this handle into every new
241 /// `ClientSession.memgine` — embedders that want a single shared
242 /// memgine across all WS sessions set this. Standalone car-server
243 /// leaves it `None`, which gives each session its own engine
244 /// (preserving today's behavior).
245 pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
246 /// Process-wide voice session registry. Each
247 /// `voice.transcribe_stream.start` call registers its own per-client
248 /// [`WsVoiceEventSink`] so events route back to the originating WS
249 /// connection only.
250 pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
251 /// Process-wide meeting registry. Meeting ids are global; each
252 /// meeting binds to the originating client's WS for upstream
253 /// events but persists transcripts to the resolved
254 /// `.car/meetings/<id>/` regardless of which client started it.
255 pub meetings: Arc<car_meeting::MeetingRegistry>,
256 /// Process-wide concurrency gate for inference RPC handlers. Sized
257 /// from host RAM at startup, overridable via
258 /// [`crate::admission::ENV_MAX_CONCURRENT`]. Without this, N
259 /// concurrent users multiply KV-cache and activation memory and
260 /// take the host out (#114-adjacent: filed alongside the daemon
261 /// always-on rework). The semaphore lives on `ServerState` so it
262 /// is shared across every WebSocket session in the same process.
263 pub admission: Arc<crate::admission::InferenceAdmission>,
264}
265
266impl ServerState {
267 /// Constructor for the standalone `car-server` binary. Each WS
268 /// connection gets its own per-session memgine — matches the
269 /// pre-extraction default and is correct for a single-process
270 /// daemon serving one user at a time.
271 ///
272 /// **Embedders must not call this.** It silently leaves
273 /// `shared_memgine = None`, which re-introduces the dual-memgine
274 /// bug U7 was created to prevent (one engine in the embedder, a
275 /// fresh one inside every WS session). Embedders use
276 /// [`ServerState::embedded`] instead, which makes the shared
277 /// engine handle a required argument so it cannot be forgotten.
278 pub fn standalone(journal_dir: PathBuf) -> Self {
279 Self::with_config(ServerStateConfig::new(journal_dir))
280 }
281
282 /// Constructor for embedders (e.g. `tokhn-daemon`). The shared
283 /// memgine handle is **required**: every WS session created by
284 /// this state will reuse the same engine, preventing the
285 /// dual-memgine bug.
286 ///
287 /// For embedders that also want to inject a pre-warmed inference
288 /// engine or other advanced wiring, build a [`ServerStateConfig`]
289 /// directly and call [`ServerState::with_config`].
290 pub fn embedded(
291 journal_dir: PathBuf,
292 shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
293 ) -> Self {
294 Self::with_config(
295 ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine),
296 )
297 }
298
299 /// Build a `ServerState` from a [`ServerStateConfig`] — the path
300 /// embedders use when they need to inject a shared memgine *and*
301 /// a pre-warmed inference engine, or any other advanced wiring
302 /// the convenience constructors don't cover.
303 pub fn with_config(cfg: ServerStateConfig) -> Self {
304 let inference = std::sync::OnceLock::new();
305 if let Some(eng) = cfg.inference {
306 // OnceLock::set returns Err if already set — fresh OnceLock
307 // means it's empty, so this is infallible here.
308 let _ = inference.set(eng);
309 }
310 Self {
311 journal_dir: cfg.journal_dir,
312 sessions: Mutex::new(HashMap::new()),
313 inference,
314 host: Arc::new(crate::host::HostState::new()),
315 shared_memgine: cfg.shared_memgine,
316 voice_sessions: Arc::new(car_voice::VoiceSessionRegistry::new()),
317 meetings: Arc::new(car_meeting::MeetingRegistry::new()),
318 admission: Arc::new(crate::admission::InferenceAdmission::new()),
319 }
320 }
321
322 pub async fn create_session(
323 &self,
324 client_id: &str,
325 channel: Arc<WsChannel>,
326 ) -> Arc<ClientSession> {
327 let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
328 let event_log = EventLog::with_journal(journal_path);
329
330 let executor = Arc::new(WsToolExecutor {
331 channel: channel.clone(),
332 });
333
334 let runtime = Runtime::new()
335 .with_event_log(event_log)
336 .with_executor(executor);
337
338 // If the embedder supplied a shared memgine, every session uses it.
339 // Otherwise each session gets its own — matches pre-extraction behavior.
340 let memgine = match &self.shared_memgine {
341 Some(eng) => eng.clone(),
342 None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
343 };
344
345 let session = Arc::new(ClientSession {
346 client_id: client_id.to_string(),
347 runtime: Arc::new(runtime),
348 channel,
349 host: self.host.clone(),
350 memgine,
351 browser: car_ffi_common::browser::BrowserSessionSlot::new(),
352 });
353
354 self.sessions
355 .lock()
356 .await
357 .insert(client_id.to_string(), session.clone());
358
359 session
360 }
361
362 /// Remove a per-client session from the registry on disconnect.
363 /// Returns the removed session if present so callers can drop any
364 /// remaining strong refs (e.g. drain pending tool callbacks). Fix
365 /// for MULTI-4 / WS-3 — without this, `state.sessions` retains
366 /// `Arc<ClientSession>` for every connection that ever existed.
367 pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
368 self.sessions.lock().await.remove(client_id)
369 }
370}