car_server_core/session.rs
1//! Server-side session state — shared across all connections.
2
3use car_engine::{Runtime, ToolExecutor};
4use car_eventlog::EventLog;
5use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
6use futures::Sink;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use tokio::sync::{oneshot, Mutex};
15use tokio_tungstenite::tungstenite::{Error as WsError, Message};
16
17/// Type-erased WebSocket sink. The dispatch loop accepts either a
18/// `WebSocketStream<TcpStream>` (the legacy car-server TCP listener)
19/// or a `WebSocketStream<UnixStream>` (the daemon-as-default UDS
20/// listener) — both implement `Sink<Message, Error = WsError>` after
21/// the tungstenite handshake. Erasing the type here avoids cascading
22/// a generic parameter through every WsChannel / Session / ServerState
23/// touchpoint in the dispatcher.
24pub type WsSink =
25 Pin<Box<dyn Sink<Message, Error = WsError> + Send + Unpin + 'static>>;
26
27/// Server-side credentials for continuing an A2A-owned A2UI surface.
28///
29/// This intentionally lives outside `car_a2ui::A2uiSurfaceOwner` so
30/// renderers can inspect surface ownership without receiving secrets.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32#[serde(rename_all = "camelCase", tag = "type")]
33pub enum A2aRouteAuth {
34 None,
35 Bearer { token: String },
36 Header { name: String, value: String },
37}
38
39/// Shared write half of the WebSocket, plus pending callback channels.
40/// `write` is type-erased via [`WsSink`] so the dispatcher can run
41/// against any transport-specific WebSocketStream (TCP or UDS today;
42/// axum-bridged in future) without templatizing every consumer.
43pub struct WsChannel {
44 pub write: Mutex<WsSink>,
45 /// Pending tool execution callbacks: request_id → oneshot sender
46 pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
47 pub next_id: AtomicU64,
48}
49
50impl WsChannel {
51 pub fn next_request_id(&self) -> String {
52 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
53 format!("cb-{}", id)
54 }
55}
56
57/// Tool executor that sends callbacks to the client over WebSocket.
58pub struct WsToolExecutor {
59 pub channel: Arc<WsChannel>,
60}
61
62#[async_trait::async_trait]
63impl ToolExecutor for WsToolExecutor {
64 async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
65 use futures::SinkExt;
66
67 let request_id = self.channel.next_request_id();
68
69 let callback = ToolExecuteRequest {
70 action_id: request_id.clone(),
71 tool: tool.to_string(),
72 parameters: params.clone(),
73 timeout_ms: None,
74 attempt: 1,
75 };
76
77 // Create a oneshot channel for the response
78 let (tx, rx) = oneshot::channel();
79 self.channel
80 .pending
81 .lock()
82 .await
83 .insert(request_id.clone(), tx);
84
85 // Send the callback to the client as a JSON-RPC request
86 let rpc_request = serde_json::json!({
87 "jsonrpc": "2.0",
88 "method": "tools.execute",
89 "params": callback,
90 "id": request_id,
91 });
92
93 let msg = Message::Text(
94 serde_json::to_string(&rpc_request)
95 .map_err(|e| e.to_string())?
96 .into(),
97 );
98 self.channel
99 .write
100 .lock()
101 .await
102 .send(msg)
103 .await
104 .map_err(|e| format!("failed to send tool callback: {}", e))?;
105
106 // Wait for the client to respond (with a timeout)
107 let response = tokio::time::timeout(std::time::Duration::from_secs(60), rx)
108 .await
109 .map_err(|_| format!("tool '{}' callback timed out (60s)", tool))?
110 .map_err(|_| format!("tool '{}' callback channel closed", tool))?;
111
112 if let Some(err) = response.error {
113 Err(err)
114 } else {
115 Ok(response.output.unwrap_or(Value::Null))
116 }
117 }
118}
119
120/// Voice event sink that forwards events to a specific WebSocket client
121/// as `voice.event` JSON-RPC notifications.
122///
123/// Each `voice.transcribe_stream.start` call constructs one of these
124/// bound to the originating client's [`WsChannel`], so a client only
125/// receives events for sessions it started.
126pub struct WsVoiceEventSink {
127 pub channel: Arc<WsChannel>,
128}
129
130impl car_voice::VoiceEventSink for WsVoiceEventSink {
131 fn send(&self, session_id: &str, event_json: String) {
132 use futures::SinkExt;
133 let channel = self.channel.clone();
134 let session_id = session_id.to_string();
135 tokio::spawn(async move {
136 let payload: Value = serde_json::from_str(&event_json)
137 .unwrap_or_else(|_| Value::String(event_json.clone()));
138 let notification = serde_json::json!({
139 "jsonrpc": "2.0",
140 "method": "voice.event",
141 "params": {
142 "session_id": session_id,
143 "event": payload,
144 },
145 });
146 let Ok(text) = serde_json::to_string(¬ification) else {
147 return;
148 };
149 let _ = channel
150 .write
151 .lock()
152 .await
153 .send(Message::Text(text.into()))
154 .await;
155 });
156 }
157}
158
159/// Per-meeting fanout sink that ingests transcript text into a
160/// session-scoped memgine using the `Arc<tokio::sync::Mutex<...>>`
161/// wrapper, then forwards every event upstream untouched.
162///
163/// Lives here (not in `car-ffi-common`) because the engine handle uses
164/// `tokio::sync::Mutex` per the "one-wrapper rule" — the FFI-common
165/// `MeetingMemgineFanout` still uses `std::sync::Mutex` for the NAPI/
166/// PyO3 bindings, which keep their sync wrappers. Each binding owns the
167/// fanout that matches its lock primitive; the parsing/formatting logic
168/// itself is shared via [`car_meeting::extract_transcript_for_ingest`].
169///
170/// `send` is called from the voice drain task and must be non-blocking,
171/// so the lock acquisition is shipped to a `tokio::spawn`. Transcript
172/// events are independent so reordering across spawned tasks is fine.
173pub struct WsMemgineIngestSink {
174 pub meeting_id: String,
175 pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
176 pub upstream: Arc<dyn car_voice::VoiceEventSink>,
177}
178
179impl car_voice::VoiceEventSink for WsMemgineIngestSink {
180 fn send(&self, voice_session_id: &str, event_json: String) {
181 if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
182 if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
183 &value,
184 &self.meeting_id,
185 voice_session_id,
186 ) {
187 let engine = self.engine.clone();
188 tokio::spawn(async move {
189 let mut guard = engine.lock().await;
190 guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
191 });
192 }
193 }
194 self.upstream.send(voice_session_id, event_json);
195 }
196}
197
198/// Per-client session.
199pub struct ClientSession {
200 pub client_id: String,
201 pub runtime: Arc<Runtime>,
202 pub channel: Arc<WsChannel>,
203 pub host: Arc<crate::host::HostState>,
204 /// Memgine handle. Wrapped in `tokio::sync::Mutex` so dispatcher
205 /// handlers can hold the lock across `.await` points without
206 /// risking poisoning. Migrated from `std::sync::Mutex` in the
207 /// car-server-core extraction (U1) per the "one-wrapper rule".
208 pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
209 /// Lazy browser session — first `browser.run` call launches Chromium,
210 /// subsequent calls reuse it so element IDs resolve across invocations
211 /// within the same WebSocket connection.
212 pub browser: car_ffi_common::browser::BrowserSessionSlot,
213 /// Per-connection auth state. Starts `false`; flips to `true`
214 /// after a successful `session.auth` handshake. Always considered
215 /// authenticated when `ServerState::auth_token` is unset (auth
216 /// disabled). Closes Parslee-ai/car-releases#32.
217 pub authenticated: std::sync::atomic::AtomicBool,
218 /// Bound agent identity (#169). `Some(id)` once a lifecycle-agent
219 /// child has called `session.auth { token, agent_id }` and the
220 /// supervisor confirmed `agent_id` is supervised + token matches.
221 /// Used by `agents.list` to surface which managed agents have
222 /// actually attached vs. just being marked `Running` at the
223 /// process level. Cleared at disconnect by `remove_session`.
224 pub agent_id: tokio::sync::Mutex<Option<String>>,
225 /// Bound persistent memgine (#170). `Some` after `session.auth`
226 /// successfully attaches the connection to a daemon-owned
227 /// per-agent memgine (paired with `agent_id`). Memory handlers
228 /// route through [`ClientSession::effective_memgine`] which
229 /// returns this when set, falling back to the ephemeral
230 /// `memgine` field for browser/host/CLI connections.
231 pub bound_memgine:
232 tokio::sync::Mutex<Option<Arc<Mutex<car_memgine::MemgineEngine>>>>,
233}
234
235impl ClientSession {
236 /// Returns the memgine handle the memory.* handlers should use:
237 /// the bound per-agent memgine when this session attached via
238 /// `session.auth { agent_id }` (#169 + #170), otherwise the
239 /// ephemeral per-WS memgine. Cheap (one async lock + Arc clone).
240 pub async fn effective_memgine(&self) -> Arc<Mutex<car_memgine::MemgineEngine>> {
241 if let Some(eng) = self.bound_memgine.lock().await.as_ref() {
242 return eng.clone();
243 }
244 self.memgine.clone()
245 }
246}
247
248/// Builder for constructing a [`ServerState`] with embedder-supplied
249/// dependencies. Embedders (e.g. `tokhn-daemon`) use this to inject
250/// their own memgine handle and other shared infrastructure; the
251/// Approval-gate policy for high-risk WS methods.
252///
253/// Every method in `methods` must be acknowledged via
254/// `host.resolve_approval` before the dispatcher will route the
255/// request to its handler. The dispatcher waits up to `timeout` for
256/// a resolution; on timeout (or any non-`approve` resolution) the
257/// request fails with JSON-RPC error `-32003`.
258///
259/// Default: gate enabled, the macOS-automation surface
260/// (`automation.run_applescript`, `automation.shortcuts.run`,
261/// `messages.send`, `mail.send`, `vision.ocr`), 60-second timeout.
262/// `car-server --no-approvals` (or embedders calling
263/// [`ServerStateConfig::with_approval_gate`] with `enabled=false`)
264/// turns it off — only appropriate when no untrusted caller can
265/// reach the WS port.
266#[derive(Debug, Clone)]
267pub struct ApprovalGate {
268 /// Master switch. When `false`, every method dispatches without
269 /// raising an approval — the pre-2026-05 behaviour.
270 pub enabled: bool,
271 /// Methods that require approval. Match is by exact method-name
272 /// string against the JSON-RPC `method` field.
273 pub methods: std::collections::HashSet<String>,
274 /// How long to wait for the user to resolve the approval before
275 /// timing out and surfacing an error to the caller.
276 pub timeout: std::time::Duration,
277}
278
279impl Default for ApprovalGate {
280 fn default() -> Self {
281 let methods = [
282 "automation.run_applescript",
283 "automation.shortcuts.run",
284 "messages.send",
285 "mail.send",
286 "vision.ocr",
287 ]
288 .iter()
289 .map(|s| s.to_string())
290 .collect();
291 Self {
292 enabled: true,
293 methods,
294 timeout: std::time::Duration::from_secs(60),
295 }
296 }
297}
298
299impl ApprovalGate {
300 /// Disable the gate entirely. Equivalent to passing
301 /// `car-server --no-approvals`. Only appropriate when no
302 /// untrusted caller can reach the WS port.
303 pub fn disabled() -> Self {
304 Self {
305 enabled: false,
306 methods: std::collections::HashSet::new(),
307 timeout: std::time::Duration::from_secs(60),
308 }
309 }
310
311 /// `true` if this method must be acknowledged before dispatch.
312 pub fn requires_approval(&self, method: &str) -> bool {
313 self.enabled && self.methods.contains(method)
314 }
315}
316
317/// standalone `car-server` binary uses [`ServerState::standalone`]
318/// which calls `with_config` under the hood.
319pub struct ServerStateConfig {
320 pub journal_dir: PathBuf,
321 /// Optional pre-constructed memgine engine. When `None`, each
322 /// `create_session` call builds a fresh engine; embedders that want
323 /// to share a single engine across sessions can supply a clone of
324 /// their `Arc<Mutex<MemgineEngine>>` here.
325 pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
326 /// Optional pre-constructed inference engine.
327 pub inference: Option<Arc<car_inference::InferenceEngine>>,
328 /// Optional embedder-supplied A2A runtime. Used by the in-core
329 /// `A2aDispatcher` to execute peer-driven proposals. When `None`,
330 /// the dispatcher uses a fresh `Runtime` with `register_agent_basics`
331 /// — peer agents see CAR's built-in tools and nothing else,
332 /// matching the behaviour of the standalone `start_a2a_listener`.
333 pub a2a_runtime: Option<Arc<car_engine::Runtime>>,
334 /// Optional embedder-supplied A2A task store. When `None`,
335 /// defaults to `InMemoryTaskStore`. tokhn-style embedders that
336 /// want a polling-friendly persistent store plug it in here.
337 pub a2a_store: Option<Arc<dyn car_a2a::TaskStore>>,
338 /// Optional embedder-supplied agent card factory. When `None`,
339 /// the dispatcher serves a card built from the A2A runtime's
340 /// tool schemas at construction time, advertising its public URL
341 /// as `ws://127.0.0.1:9100/` (the WS surface the dispatcher itself
342 /// is reachable on).
343 pub a2a_card_source: Option<Arc<car_a2a::AgentCardSource>>,
344 /// Approval-gate policy. When `None`, the dispatcher uses
345 /// [`ApprovalGate::default`] (gate ON, the macOS-automation
346 /// surface gated, 60s timeout). Pass
347 /// [`ApprovalGate::disabled`] to opt out — only appropriate
348 /// when no untrusted caller can reach the WS port.
349 pub approval_gate: Option<ApprovalGate>,
350}
351
352impl ServerStateConfig {
353 /// Minimal config suitable for the standalone car-server binary:
354 /// only the journal dir is required; everything else is lazily
355 /// constructed at first use.
356 pub fn new(journal_dir: PathBuf) -> Self {
357 Self {
358 journal_dir,
359 shared_memgine: None,
360 inference: None,
361 a2a_runtime: None,
362 a2a_store: None,
363 a2a_card_source: None,
364 approval_gate: None,
365 }
366 }
367
368 pub fn with_shared_memgine(mut self, engine: Arc<Mutex<car_memgine::MemgineEngine>>) -> Self {
369 self.shared_memgine = Some(engine);
370 self
371 }
372
373 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
374 self.inference = Some(engine);
375 self
376 }
377
378 /// Plug in an embedder-supplied runtime for the A2A dispatcher.
379 /// Use case: tokhn-daemon wants peers to see its OPA preflight
380 /// tooling, not just CAR's `register_agent_basics` defaults.
381 pub fn with_a2a_runtime(mut self, runtime: Arc<car_engine::Runtime>) -> Self {
382 self.a2a_runtime = Some(runtime);
383 self
384 }
385
386 /// Plug in an embedder-supplied task store for the A2A
387 /// dispatcher. Use case: tokhn's polling-friendly persistent
388 /// store keyed by their session id.
389 pub fn with_a2a_store(mut self, store: Arc<dyn car_a2a::TaskStore>) -> Self {
390 self.a2a_store = Some(store);
391 self
392 }
393
394 /// Plug in an embedder-supplied agent card factory. The factory
395 /// is invoked on every `agent/getAuthenticatedExtendedCard`
396 /// dispatch, so embedders can reflect runtime tool changes.
397 pub fn with_a2a_card_source(
398 mut self,
399 source: Arc<car_a2a::AgentCardSource>,
400 ) -> Self {
401 self.a2a_card_source = Some(source);
402 self
403 }
404
405 /// Override the approval-gate policy. Pass
406 /// [`ApprovalGate::disabled`] to skip the gate entirely (only
407 /// appropriate when no untrusted caller can reach the WS port);
408 /// pass a customised [`ApprovalGate`] to add or remove methods
409 /// or to change the timeout.
410 pub fn with_approval_gate(mut self, gate: ApprovalGate) -> Self {
411 self.approval_gate = Some(gate);
412 self
413 }
414}
415
416/// Global server state shared across all connections.
417pub struct ServerState {
418 pub journal_dir: PathBuf,
419 pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
420 pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
421 pub host: Arc<crate::host::HostState>,
422 /// When `Some`, `create_session` clones this handle into every new
423 /// `ClientSession.memgine` — embedders that want a single shared
424 /// memgine across all WS sessions set this. Standalone car-server
425 /// leaves it `None`, which gives each session its own engine
426 /// (preserving today's behavior).
427 pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
428 /// Process-wide voice session registry. Each
429 /// `voice.transcribe_stream.start` call registers its own per-client
430 /// [`WsVoiceEventSink`] so events route back to the originating WS
431 /// connection only.
432 pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
433 /// Process-wide meeting registry. Meeting ids are global; each
434 /// meeting binds to the originating client's WS for upstream
435 /// events but persists transcripts to the resolved
436 /// `.car/meetings/<id>/` regardless of which client started it.
437 pub meetings: Arc<car_meeting::MeetingRegistry>,
438 /// Process-wide A2UI surface store. Agent-produced surfaces are
439 /// visible to every host UI subscriber, independent of the
440 /// WebSocket session that applied the update.
441 pub a2ui: car_a2ui::A2uiSurfaceStore,
442 /// Process-wide concurrency gate for inference RPC handlers. Sized
443 /// from host RAM at startup, overridable via
444 /// [`crate::admission::ENV_MAX_CONCURRENT`]. Without this, N
445 /// concurrent users multiply KV-cache and activation memory and
446 /// take the host out (#114-adjacent: filed alongside the daemon
447 /// always-on rework). The semaphore lives on `ServerState` so it
448 /// is shared across every WebSocket session in the same process.
449 pub admission: Arc<crate::admission::InferenceAdmission>,
450 /// Server-side A2A continuation auth keyed by A2UI surface id.
451 /// Kept out of `A2uiSurface.owner` so host renderers never see
452 /// bearer/API-key material.
453 pub a2ui_route_auth: Mutex<HashMap<String, A2aRouteAuth>>,
454 /// Lifecycle-managed agents — declarative manifest at
455 /// `~/.car/agents.json` driving spawn/restart/stop. Closes
456 /// Parslee-ai/car-releases#27. Lazy-initialized so embedders that
457 /// don't want process supervision don't pay the disk-touch cost
458 /// at server start.
459 pub supervisor: std::sync::OnceLock<Arc<car_registry::supervisor::Supervisor>>,
460 /// In-core A2A dispatcher — embedders that consume `car-server-core`
461 /// get A2A reachability "for free" without standing up a separate
462 /// HTTP listener. Closes Parslee-ai/car-releases#28. Lazy-init so
463 /// the embedder can override the runtime / task store / agent card
464 /// via [`ServerStateConfig::with_a2a_runtime`] etc. before the
465 /// first dispatch.
466 pub a2a_dispatcher: std::sync::OnceLock<Arc<car_a2a::A2aDispatcher>>,
467 /// WS clients subscribed to A2UI envelope events. After every
468 /// successful `a2ui.apply` / `a2ui.ingest`, the resulting
469 /// `A2uiApplyResult` is broadcast to every subscriber as an
470 /// `a2ui.event` JSON-RPC notification. Closes
471 /// Parslee-ai/car-releases#29. Subscribers register via the
472 /// `a2ui/subscribe` method and are auto-cleaned on WS disconnect.
473 pub a2ui_subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
474 /// Per-launch auth token. When `Some`, the WS dispatcher rejects
475 /// non-auth methods on unauthenticated sessions until the client
476 /// calls `session.auth` with the matching value. When `None`,
477 /// auth is disabled and every connection works as before. Set
478 /// at startup by `car-server` unless `--no-auth` is passed
479 /// (default flipped 2026-05); embedders that want to enable
480 /// auth call [`ServerState::install_auth_token`]. Closes
481 /// Parslee-ai/car-releases#32.
482 pub auth_token: std::sync::OnceLock<String>,
483 /// `agent_id -> client_id` map of currently-attached lifecycle
484 /// agents (#169). Populated by the `session.auth` handler when a
485 /// supervised child presents its `agent_id` + per-agent token;
486 /// drained on disconnect by `remove_session`. Single-claim:
487 /// a second connection presenting the same `agent_id` is
488 /// rejected so the daemon-side per-agent state stays unambiguous.
489 pub attached_agents: Mutex<HashMap<String, String>>,
490 /// `agent_id -> persistent memgine` map (#170). Lazy-loaded on
491 /// first connection per id from `~/.car/memory/agents/<id>.jsonl`,
492 /// retained across daemon restart, surviving any single
493 /// disconnect/reconnect of the supervised child. Connections
494 /// that auth without an `agent_id` (browser, host, ad-hoc CLI)
495 /// keep the per-WS ephemeral memgine on `ClientSession.memgine`
496 /// — no behaviour change.
497 pub agent_memgines:
498 Mutex<HashMap<String, Arc<Mutex<car_memgine::MemgineEngine>>>>,
499 /// Bound MCP HTTP-streamable URL (e.g.
500 /// `"http://127.0.0.1:9102/mcp"`) — `car-server` installs this
501 /// after binding the listener. Used by the
502 /// `agents.invoke_external` handler to default
503 /// `InvokeOptions.mcp_endpoint` so external agents
504 /// (Claude Code today) load the daemon's CAR namespace via
505 /// `--mcp-config` automatically. `None` when MCP isn't bound
506 /// (e.g. `--mcp-bind disabled`).
507 pub mcp_url: std::sync::OnceLock<String>,
508 /// Registry of connected MCP SSE sessions. Populated alongside
509 /// [`mcp_url`] when `car-server` boots the MCP listener. Public
510 /// so handlers can call `crate::mcp::push_to_session` to send
511 /// server-initiated requests to a specific MCP-connected
512 /// client (MCP-3 foundation; MCP-3b will wire host-owned tool
513 /// dispatch through this).
514 pub mcp_sessions: std::sync::OnceLock<Arc<crate::mcp::SessionMap>>,
515 /// Approval gate for high-risk WS methods (audit 2026-05). The
516 /// gate intercepts `automation.run_applescript`,
517 /// `automation.shortcuts.run`, `messages.send`, `mail.send`, and
518 /// `vision.ocr` before they dispatch, raises a
519 /// `host.create_approval` for the user to act on, and waits
520 /// (with a timeout) for `host.resolve_approval`. Approve →
521 /// dispatch continues; deny / timeout → JSON-RPC error code
522 /// `-32003`. The set of gated methods and the wait timeout are
523 /// embedder-overridable via
524 /// [`ServerStateConfig::with_approval_gate`].
525 pub approval_gate: ApprovalGate,
526 /// A2A-runtime / store / card factory carried over from the
527 /// embedder's [`ServerStateConfig`]. Consumed lazily on first
528 /// `a2a_dispatcher()` call so embedders can construct
529 /// `ServerState` without paying the runtime spin-up cost when
530 /// they don't actually use the A2A surface.
531 pub(crate) a2a_runtime: std::sync::Mutex<Option<Arc<car_engine::Runtime>>>,
532 pub(crate) a2a_store: std::sync::Mutex<Option<Arc<dyn car_a2a::TaskStore>>>,
533 pub(crate) a2a_card_source: std::sync::Mutex<Option<Arc<car_a2a::AgentCardSource>>>,
534}
535
536impl ServerState {
537 /// Constructor for the standalone `car-server` binary. Each WS
538 /// connection gets its own per-session memgine — matches the
539 /// pre-extraction default and is correct for a single-process
540 /// daemon serving one user at a time.
541 ///
542 /// **Embedders must not call this.** It silently leaves
543 /// `shared_memgine = None`, which re-introduces the dual-memgine
544 /// bug U7 was created to prevent (one engine in the embedder, a
545 /// fresh one inside every WS session). Embedders use
546 /// [`ServerState::embedded`] instead, which makes the shared
547 /// engine handle a required argument so it cannot be forgotten.
548 pub fn standalone(journal_dir: PathBuf) -> Self {
549 Self::with_config(ServerStateConfig::new(journal_dir))
550 }
551
552 /// Constructor for embedders (e.g. `tokhn-daemon`). The shared
553 /// memgine handle is **required**: every WS session created by
554 /// this state will reuse the same engine, preventing the
555 /// dual-memgine bug.
556 ///
557 /// For embedders that also want to inject a pre-warmed inference
558 /// engine or other advanced wiring, build a [`ServerStateConfig`]
559 /// directly and call [`ServerState::with_config`].
560 pub fn embedded(
561 journal_dir: PathBuf,
562 shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
563 ) -> Self {
564 Self::with_config(ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine))
565 }
566
567 /// Build a `ServerState` from a [`ServerStateConfig`] — the path
568 /// embedders use when they need to inject a shared memgine *and*
569 /// a pre-warmed inference engine, or any other advanced wiring
570 /// the convenience constructors don't cover.
571 pub fn with_config(cfg: ServerStateConfig) -> Self {
572 let inference = std::sync::OnceLock::new();
573 if let Some(eng) = cfg.inference {
574 // OnceLock::set returns Err if already set — fresh OnceLock
575 // means it's empty, so this is infallible here.
576 let _ = inference.set(eng);
577 }
578 let voice_sessions = Arc::new(car_voice::VoiceSessionRegistry::new());
579 // Reap sessions whose clients dropped without calling
580 // voice.transcribe_stream.stop (WS disconnect, process exit,
581 // etc.). Listener handles otherwise leak for the daemon's
582 // lifetime. `with_config` is sync but always called from the
583 // `#[tokio::main]` entry point, so `Handle::try_current()`
584 // inside `start_sweeper` finds the runtime.
585 voice_sessions.start_sweeper();
586 Self {
587 journal_dir: cfg.journal_dir,
588 sessions: Mutex::new(HashMap::new()),
589 inference,
590 host: Arc::new(crate::host::HostState::new()),
591 shared_memgine: cfg.shared_memgine,
592 voice_sessions,
593 meetings: Arc::new(car_meeting::MeetingRegistry::new()),
594 a2ui: car_a2ui::A2uiSurfaceStore::new(),
595 admission: Arc::new(crate::admission::InferenceAdmission::new()),
596 a2ui_route_auth: Mutex::new(HashMap::new()),
597 supervisor: std::sync::OnceLock::new(),
598 a2a_dispatcher: std::sync::OnceLock::new(),
599 a2a_runtime: std::sync::Mutex::new(cfg.a2a_runtime),
600 a2a_store: std::sync::Mutex::new(cfg.a2a_store),
601 a2a_card_source: std::sync::Mutex::new(cfg.a2a_card_source),
602 a2ui_subscribers: Mutex::new(HashMap::new()),
603 auth_token: std::sync::OnceLock::new(),
604 attached_agents: Mutex::new(HashMap::new()),
605 agent_memgines: Mutex::new(HashMap::new()),
606 mcp_url: std::sync::OnceLock::new(),
607 mcp_sessions: std::sync::OnceLock::new(),
608 approval_gate: cfg.approval_gate.unwrap_or_default(),
609 }
610 }
611
612 /// Enable the per-launch auth handshake. After this call, every
613 /// new WS connection must call `session.auth` with `token` as
614 /// the first frame; otherwise the connection is closed. Called
615 /// by `car-server` at startup unless `--no-auth` is set
616 /// (default flipped 2026-05); embedders supply their own token
617 /// if they want the same posture. Returns `Err(token)` when
618 /// auth was already installed.
619 pub fn install_auth_token(&self, token: String) -> Result<(), String> {
620 self.auth_token.set(token)
621 }
622
623 /// Install the bound MCP URL after car-server's listener is up.
624 /// Idempotent on the first call; subsequent calls are accepted
625 /// silently (matches the supervisor / a2a_dispatcher install
626 /// idiom). Returns `Err(())` when an MCP URL was already
627 /// installed — embedders should treat this as "another
628 /// component beat us to it" and use whichever value is now set.
629 pub fn install_mcp_url(&self, url: String) -> Result<(), String> {
630 self.mcp_url.set(url)
631 }
632
633 /// Install the MCP SSE session registry. Pairs with
634 /// [`install_mcp_url`] — both come from the same `start_mcp`
635 /// call and either both get installed or neither does (the
636 /// daemon binds them together).
637 pub fn install_mcp_sessions(
638 &self,
639 sessions: Arc<crate::mcp::SessionMap>,
640 ) -> Result<(), Arc<crate::mcp::SessionMap>> {
641 self.mcp_sessions.set(sessions)
642 }
643
644 /// Lazy-initialize and return the agent supervisor. The first
645 /// call constructs a [`car_registry::supervisor::Supervisor`] backed by
646 /// `~/.car/agents.json` + `~/.car/logs/`. Embedders that need a
647 /// non-default location should call
648 /// [`ServerState::install_supervisor`] before any handler runs.
649 pub fn supervisor(&self) -> Result<Arc<car_registry::supervisor::Supervisor>, String> {
650 if let Some(s) = self.supervisor.get() {
651 return Ok(s.clone());
652 }
653 let s = car_registry::supervisor::Supervisor::user_default()
654 .map(Arc::new)
655 .map_err(|e| e.to_string())?;
656 // OnceLock::set returns the original arg back on collision —
657 // a concurrent caller racing through user_default. Take
658 // whichever wins.
659 let _ = self.supervisor.set(s);
660 Ok(self.supervisor.get().expect("set or pre-existing").clone())
661 }
662
663 /// Replace the lazy default with a caller-supplied supervisor.
664 /// Returns `Err(())` when a supervisor was already installed.
665 /// Used by the standalone `car-server` binary to call
666 /// `start_all()` on a known-good handle without paying the
667 /// lazy-init lookup cost.
668 pub fn install_supervisor(
669 &self,
670 supervisor: Arc<car_registry::supervisor::Supervisor>,
671 ) -> Result<(), Arc<car_registry::supervisor::Supervisor>> {
672 self.supervisor.set(supervisor)
673 }
674
675 /// Lazy-initialize and return the in-core A2A dispatcher. The
676 /// first call constructs an [`car_a2a::A2aDispatcher`] from
677 /// either the embedder's overrides (set via
678 /// [`ServerStateConfig::with_a2a_runtime`] / `with_a2a_store` /
679 /// `with_a2a_card_source`) or sensible defaults: a fresh
680 /// `Runtime` with `register_agent_basics` registered, an
681 /// `InMemoryTaskStore`, and a card built from the runtime's
682 /// tool schemas advertising `ws://127.0.0.1:9100/` as the
683 /// public URL. Closes Parslee-ai/car-releases#28.
684 pub async fn a2a_dispatcher(&self) -> Arc<car_a2a::A2aDispatcher> {
685 if let Some(d) = self.a2a_dispatcher.get() {
686 return d.clone();
687 }
688
689 // Embedder overrides take precedence; fall back to defaults
690 // for each slot independently (so an embedder that only
691 // wants a custom card can leave the runtime + store at
692 // defaults). `Mutex::take()` consumes the slot so the
693 // defaults aren't reconstructed on a racing init that loses
694 // the OnceLock::set call below.
695 let runtime = self
696 .a2a_runtime
697 .lock()
698 .expect("a2a_runtime mutex poisoned")
699 .take();
700 let runtime = match runtime {
701 Some(r) => r,
702 None => {
703 let r = Arc::new(car_engine::Runtime::new());
704 r.register_agent_basics().await;
705 r
706 }
707 };
708
709 let store = self
710 .a2a_store
711 .lock()
712 .expect("a2a_store mutex poisoned")
713 .take()
714 .unwrap_or_else(|| Arc::new(car_a2a::InMemoryTaskStore::new()));
715
716 let card_source = self
717 .a2a_card_source
718 .lock()
719 .expect("a2a_card_source mutex poisoned")
720 .take();
721 let card_source = match card_source {
722 Some(c) => c,
723 None => {
724 let card = car_a2a::build_default_agent_card(
725 &runtime,
726 car_a2a::AgentCardConfig::minimal(
727 "Common Agent Runtime",
728 "Embedded CAR daemon — A2A v1.0 reachable over WebSocket JSON-RPC.",
729 "ws://127.0.0.1:9100/",
730 car_a2a::AgentProvider {
731 organization: "Parslee".into(),
732 url: Some("https://github.com/Parslee-ai/car".into()),
733 },
734 ),
735 )
736 .await;
737 Arc::new(move || card.clone()) as Arc<car_a2a::AgentCardSource>
738 }
739 };
740
741 let dispatcher = Arc::new(car_a2a::A2aDispatcher::new(runtime, store, card_source));
742 // OnceLock::set returns Err on race — accept whichever
743 // dispatcher won and clone-return that one.
744 let _ = self.a2a_dispatcher.set(dispatcher);
745 self.a2a_dispatcher
746 .get()
747 .expect("a2a_dispatcher set or pre-existing")
748 .clone()
749 }
750
751 pub async fn create_session(
752 &self,
753 client_id: &str,
754 channel: Arc<WsChannel>,
755 ) -> Arc<ClientSession> {
756 let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
757 let event_log = EventLog::with_journal(journal_path);
758
759 let executor = Arc::new(WsToolExecutor {
760 channel: channel.clone(),
761 });
762
763 let runtime = Runtime::new()
764 .with_event_log(event_log)
765 .with_executor(executor);
766
767 // If the embedder supplied a shared memgine, every session uses it.
768 // Otherwise each session gets its own — matches pre-extraction behavior.
769 let memgine = match &self.shared_memgine {
770 Some(eng) => eng.clone(),
771 None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
772 };
773
774 let session = Arc::new(ClientSession {
775 client_id: client_id.to_string(),
776 runtime: Arc::new(runtime),
777 channel,
778 host: self.host.clone(),
779 memgine,
780 browser: car_ffi_common::browser::BrowserSessionSlot::new(),
781 // When auth is disabled (no token installed), every
782 // session is "authenticated" by default — preserves the
783 // pre-#32 behaviour. When auth is enabled, the value is
784 // ignored on creation; the dispatcher's gate checks
785 // `ServerState::auth_token.is_some()` to decide whether
786 // to enforce.
787 authenticated: std::sync::atomic::AtomicBool::new(false),
788 agent_id: tokio::sync::Mutex::new(None),
789 bound_memgine: tokio::sync::Mutex::new(None),
790 });
791
792 self.sessions
793 .lock()
794 .await
795 .insert(client_id.to_string(), session.clone());
796
797 session
798 }
799
800 /// Remove a per-client session from the registry on disconnect.
801 /// Returns the removed session if present so callers can drop any
802 /// remaining strong refs (e.g. drain pending tool callbacks). Fix
803 /// for MULTI-4 / WS-3 — without this, `state.sessions` retains
804 /// `Arc<ClientSession>` for every connection that ever existed.
805 pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
806 let removed = self.sessions.lock().await.remove(client_id);
807 if let Some(session) = &removed {
808 // #169: drop the agent_id → client_id binding so a
809 // disconnected lifecycle agent can reconnect (or its
810 // supervisor-respawned replacement can take the slot)
811 // without colliding with the stale claim.
812 let bound = session.agent_id.lock().await.clone();
813 if let Some(id) = bound {
814 let mut attached = self.attached_agents.lock().await;
815 if attached.get(&id).map(String::as_str) == Some(client_id) {
816 attached.remove(&id);
817 }
818 }
819 }
820 removed
821 }
822}