Skip to main content

car_server_core/
session.rs

1//! Server-side session state — shared across all connections.
2
3use car_engine::{Runtime, ToolExecutor};
4use car_eventlog::EventLog;
5use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
6use serde_json::Value;
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use tokio::sync::{oneshot, Mutex};
12use tokio_tungstenite::tungstenite::Message;
13
/// Shared write half of the WebSocket, plus pending callback channels.
///
/// Handed around as `Arc<WsChannel>` so the tool executor, the voice
/// sinks and the client session can all reach the same connection.
pub struct WsChannel {
    /// Sink half of the split WebSocket stream. Guarded by an async
    /// mutex; every sender in this file locks it for the duration of a
    /// single `send`.
    pub write: Mutex<
        futures::stream::SplitSink<
            tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
            Message,
        >,
    >,
    /// Pending tool execution callbacks: request_id → oneshot sender.
    /// `WsToolExecutor::execute` registers an entry before it sends the
    /// request. NOTE(review): the code that resolves an entry when the
    /// client replies is not in this file — presumably the connection's
    /// read loop; confirm.
    pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
    /// Counter backing [`WsChannel::next_request_id`].
    pub next_id: AtomicU64,
}
26
27impl WsChannel {
28    pub fn next_request_id(&self) -> String {
29        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
30        format!("cb-{}", id)
31    }
32}
33
/// Tool executor that sends callbacks to the client over WebSocket.
///
/// `ServerState::create_session` installs one of these as the executor
/// of each per-client runtime.
pub struct WsToolExecutor {
    /// Connection the `tools.execute` requests are written to, and whose
    /// `pending` map holds the reply channel for each in-flight request.
    pub channel: Arc<WsChannel>,
}
38
39#[async_trait::async_trait]
40impl ToolExecutor for WsToolExecutor {
41    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
42        use futures::SinkExt;
43
44        let request_id = self.channel.next_request_id();
45
46        let callback = ToolExecuteRequest {
47            action_id: request_id.clone(),
48            tool: tool.to_string(),
49            parameters: params.clone(),
50            timeout_ms: None,
51            attempt: 1,
52        };
53
54        // Create a oneshot channel for the response
55        let (tx, rx) = oneshot::channel();
56        self.channel
57            .pending
58            .lock()
59            .await
60            .insert(request_id.clone(), tx);
61
62        // Send the callback to the client as a JSON-RPC request
63        let rpc_request = serde_json::json!({
64            "jsonrpc": "2.0",
65            "method": "tools.execute",
66            "params": callback,
67            "id": request_id,
68        });
69
70        let msg = Message::Text(
71            serde_json::to_string(&rpc_request)
72                .map_err(|e| e.to_string())?
73                .into(),
74        );
75        self.channel
76            .write
77            .lock()
78            .await
79            .send(msg)
80            .await
81            .map_err(|e| format!("failed to send tool callback: {}", e))?;
82
83        // Wait for the client to respond (with a timeout)
84        let response = tokio::time::timeout(std::time::Duration::from_secs(60), rx)
85            .await
86            .map_err(|_| format!("tool '{}' callback timed out (60s)", tool))?
87            .map_err(|_| format!("tool '{}' callback channel closed", tool))?;
88
89        if let Some(err) = response.error {
90            Err(err)
91        } else {
92            Ok(response.output.unwrap_or(Value::Null))
93        }
94    }
95}
96
/// Voice event sink that forwards events to a specific WebSocket client
/// as `voice.event` JSON-RPC notifications.
///
/// Each `voice.transcribe_stream.start` call constructs one of these
/// bound to the originating client's [`WsChannel`], so a client only
/// receives events for sessions it started.
pub struct WsVoiceEventSink {
    /// Connection of the client that the notifications are written to.
    pub channel: Arc<WsChannel>,
}
106
107impl car_voice::VoiceEventSink for WsVoiceEventSink {
108    fn send(&self, session_id: &str, event_json: String) {
109        use futures::SinkExt;
110        let channel = self.channel.clone();
111        let session_id = session_id.to_string();
112        tokio::spawn(async move {
113            let payload: Value = serde_json::from_str(&event_json)
114                .unwrap_or_else(|_| Value::String(event_json.clone()));
115            let notification = serde_json::json!({
116                "jsonrpc": "2.0",
117                "method": "voice.event",
118                "params": {
119                    "session_id": session_id,
120                    "event": payload,
121                },
122            });
123            let Ok(text) = serde_json::to_string(&notification) else {
124                return;
125            };
126            let _ = channel
127                .write
128                .lock()
129                .await
130                .send(Message::Text(text.into()))
131                .await;
132        });
133    }
134}
135
/// Per-meeting fanout sink that ingests transcript text into a
/// session-scoped memgine using the `Arc<tokio::sync::Mutex<...>>`
/// wrapper, then forwards every event upstream untouched.
///
/// Lives here (not in `car-ffi-common`) because the engine handle uses
/// `tokio::sync::Mutex` per the "one-wrapper rule" — the FFI-common
/// `MeetingMemgineFanout` still uses `std::sync::Mutex` for the NAPI/
/// PyO3 bindings, which keep their sync wrappers. Each binding owns the
/// fanout that matches its lock primitive; the parsing/formatting logic
/// itself is shared via [`car_meeting::extract_transcript_for_ingest`].
///
/// `send` is called from the voice drain task and must be non-blocking,
/// so the lock acquisition is shipped to a `tokio::spawn`. Transcript
/// events are independent so reordering across spawned tasks is fine.
pub struct WsMemgineIngestSink {
    /// Meeting id handed to `extract_transcript_for_ingest` together
    /// with each incoming event.
    pub meeting_id: String,
    /// Memgine that extracted `(speaker, text)` pairs are ingested into.
    pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Sink that receives every event, unchanged, after the ingest
    /// step has been scheduled.
    pub upstream: Arc<dyn car_voice::VoiceEventSink>,
}
155
156impl car_voice::VoiceEventSink for WsMemgineIngestSink {
157    fn send(&self, voice_session_id: &str, event_json: String) {
158        if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
159            if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
160                &value,
161                &self.meeting_id,
162                voice_session_id,
163            ) {
164                let engine = self.engine.clone();
165                tokio::spawn(async move {
166                    let mut guard = engine.lock().await;
167                    guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
168                });
169            }
170        }
171        self.upstream.send(voice_session_id, event_json);
172    }
173}
174
/// Per-client session.
///
/// Built by `ServerState::create_session` and stored in
/// `ServerState::sessions` under `client_id`.
pub struct ClientSession {
    /// Identifier of the client; also the key in the session registry
    /// and the stem of the session's `<client_id>.jsonl` journal file.
    pub client_id: String,
    /// Runtime wired with this client's event log and a
    /// [`WsToolExecutor`] bound to `channel`.
    pub runtime: Arc<Runtime>,
    /// The client's WebSocket write half and pending-callback table.
    pub channel: Arc<WsChannel>,
    /// Clone of the server-wide host state handle.
    pub host: Arc<crate::host::HostState>,
    /// Memgine handle. Wrapped in `tokio::sync::Mutex` so dispatcher
    /// handlers can hold the lock across `.await` points without
    /// risking poisoning. Migrated from `std::sync::Mutex` in the
    /// car-server-core extraction (U1) per the "one-wrapper rule".
    pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Lazy browser session — first `browser.run` call launches Chromium,
    /// subsequent calls reuse it so element IDs resolve across invocations
    /// within the same WebSocket connection.
    pub browser: car_ffi_common::browser::BrowserSessionSlot,
}
191
/// Builder for constructing a [`ServerState`] with embedder-supplied
/// dependencies. Embedders (e.g. `tokhn-daemon`) use this to inject
/// their own memgine handle and other shared infrastructure; the
/// standalone `car-server` binary uses [`ServerState::standalone`]
/// which calls `with_config` under the hood.
pub struct ServerStateConfig {
    /// Directory in which each session's `<client_id>.jsonl` journal is
    /// placed.
    pub journal_dir: PathBuf,
    /// Optional pre-constructed memgine engine. When `None`, each
    /// `create_session` call builds a fresh engine; embedders that want
    /// to share a single engine across sessions can supply a clone of
    /// their `Arc<Mutex<MemgineEngine>>` here.
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Optional pre-constructed inference engine. When `Some`,
    /// `ServerState::with_config` stores it in `ServerState::inference`
    /// up front; when `None`, that cell starts out empty.
    pub inference: Option<Arc<car_inference::InferenceEngine>>,
}
207
208impl ServerStateConfig {
209    /// Minimal config suitable for the standalone car-server binary:
210    /// only the journal dir is required; everything else is lazily
211    /// constructed at first use.
212    pub fn new(journal_dir: PathBuf) -> Self {
213        Self {
214            journal_dir,
215            shared_memgine: None,
216            inference: None,
217        }
218    }
219
220    pub fn with_shared_memgine(
221        mut self,
222        engine: Arc<Mutex<car_memgine::MemgineEngine>>,
223    ) -> Self {
224        self.shared_memgine = Some(engine);
225        self
226    }
227
228    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
229        self.inference = Some(engine);
230        self
231    }
232}
233
/// Global server state shared across all connections.
pub struct ServerState {
    /// Directory in which `create_session` places each client's
    /// `<client_id>.jsonl` journal.
    pub journal_dir: PathBuf,
    /// Live sessions keyed by client id. `create_session` inserts,
    /// `remove_session` removes.
    pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
    /// Inference engine cell. Pre-filled by `with_config` when the
    /// config carries an engine, otherwise left empty.
    /// NOTE(review): presumably filled lazily on first use elsewhere —
    /// that code is not in this file; confirm.
    pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
    /// Host state; a clone of this handle is given to every session.
    pub host: Arc<crate::host::HostState>,
    /// When `Some`, `create_session` clones this handle into every new
    /// `ClientSession.memgine` — embedders that want a single shared
    /// memgine across all WS sessions set this. Standalone car-server
    /// leaves it `None`, which gives each session its own engine
    /// (preserving today's behavior).
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Process-wide voice session registry. Each
    /// `voice.transcribe_stream.start` call registers its own per-client
    /// [`WsVoiceEventSink`] so events route back to the originating WS
    /// connection only.
    pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
    /// Process-wide meeting registry. Meeting ids are global; each
    /// meeting binds to the originating client's WS for upstream
    /// events but persists transcripts to the resolved
    /// `.car/meetings/<id>/` regardless of which client started it.
    pub meetings: Arc<car_meeting::MeetingRegistry>,
    /// Process-wide concurrency gate for inference RPC handlers. Sized
    /// from host RAM at startup, overridable via
    /// [`crate::admission::ENV_MAX_CONCURRENT`]. Without this, N
    /// concurrent users multiply KV-cache and activation memory and
    /// take the host out (#114-adjacent: filed alongside the daemon
    /// always-on rework). The semaphore lives on `ServerState` so it
    /// is shared across every WebSocket session in the same process.
    pub admission: Arc<crate::admission::InferenceAdmission>,
}
265
266impl ServerState {
267    /// Constructor for the standalone `car-server` binary. Each WS
268    /// connection gets its own per-session memgine — matches the
269    /// pre-extraction default and is correct for a single-process
270    /// daemon serving one user at a time.
271    ///
272    /// **Embedders must not call this.** It silently leaves
273    /// `shared_memgine = None`, which re-introduces the dual-memgine
274    /// bug U7 was created to prevent (one engine in the embedder, a
275    /// fresh one inside every WS session). Embedders use
276    /// [`ServerState::embedded`] instead, which makes the shared
277    /// engine handle a required argument so it cannot be forgotten.
278    pub fn standalone(journal_dir: PathBuf) -> Self {
279        Self::with_config(ServerStateConfig::new(journal_dir))
280    }
281
282    /// Constructor for embedders (e.g. `tokhn-daemon`). The shared
283    /// memgine handle is **required**: every WS session created by
284    /// this state will reuse the same engine, preventing the
285    /// dual-memgine bug.
286    ///
287    /// For embedders that also want to inject a pre-warmed inference
288    /// engine or other advanced wiring, build a [`ServerStateConfig`]
289    /// directly and call [`ServerState::with_config`].
290    pub fn embedded(
291        journal_dir: PathBuf,
292        shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
293    ) -> Self {
294        Self::with_config(
295            ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine),
296        )
297    }
298
299    /// Build a `ServerState` from a [`ServerStateConfig`] — the path
300    /// embedders use when they need to inject a shared memgine *and*
301    /// a pre-warmed inference engine, or any other advanced wiring
302    /// the convenience constructors don't cover.
303    pub fn with_config(cfg: ServerStateConfig) -> Self {
304        let inference = std::sync::OnceLock::new();
305        if let Some(eng) = cfg.inference {
306            // OnceLock::set returns Err if already set — fresh OnceLock
307            // means it's empty, so this is infallible here.
308            let _ = inference.set(eng);
309        }
310        Self {
311            journal_dir: cfg.journal_dir,
312            sessions: Mutex::new(HashMap::new()),
313            inference,
314            host: Arc::new(crate::host::HostState::new()),
315            shared_memgine: cfg.shared_memgine,
316            voice_sessions: Arc::new(car_voice::VoiceSessionRegistry::new()),
317            meetings: Arc::new(car_meeting::MeetingRegistry::new()),
318            admission: Arc::new(crate::admission::InferenceAdmission::new()),
319        }
320    }
321
322    pub async fn create_session(
323        &self,
324        client_id: &str,
325        channel: Arc<WsChannel>,
326    ) -> Arc<ClientSession> {
327        let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
328        let event_log = EventLog::with_journal(journal_path);
329
330        let executor = Arc::new(WsToolExecutor {
331            channel: channel.clone(),
332        });
333
334        let runtime = Runtime::new()
335            .with_event_log(event_log)
336            .with_executor(executor);
337
338        // If the embedder supplied a shared memgine, every session uses it.
339        // Otherwise each session gets its own — matches pre-extraction behavior.
340        let memgine = match &self.shared_memgine {
341            Some(eng) => eng.clone(),
342            None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
343        };
344
345        let session = Arc::new(ClientSession {
346            client_id: client_id.to_string(),
347            runtime: Arc::new(runtime),
348            channel,
349            host: self.host.clone(),
350            memgine,
351            browser: car_ffi_common::browser::BrowserSessionSlot::new(),
352        });
353
354        self.sessions
355            .lock()
356            .await
357            .insert(client_id.to_string(), session.clone());
358
359        session
360    }
361
362    /// Remove a per-client session from the registry on disconnect.
363    /// Returns the removed session if present so callers can drop any
364    /// remaining strong refs (e.g. drain pending tool callbacks). Fix
365    /// for MULTI-4 / WS-3 — without this, `state.sessions` retains
366    /// `Arc<ClientSession>` for every connection that ever existed.
367    pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
368        self.sessions.lock().await.remove(client_id)
369    }
370}