Skip to main content

car_server_core/
session.rs

1//! Server-side session state — shared across all connections.
2
3use car_engine::{Runtime, ToolExecutor};
4use car_eventlog::EventLog;
5use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
6use futures::Sink;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use tokio::sync::{oneshot, Mutex};
15use tokio_tungstenite::tungstenite::{Error as WsError, Message};
16
17/// Type-erased WebSocket sink. The dispatch loop accepts either a
18/// `WebSocketStream<TcpStream>` (the legacy car-server TCP listener)
19/// or a `WebSocketStream<UnixStream>` (the daemon-as-default UDS
20/// listener) — both implement `Sink<Message, Error = WsError>` after
21/// the tungstenite handshake. Erasing the type here avoids cascading
22/// a generic parameter through every WsChannel / Session / ServerState
23/// touchpoint in the dispatcher.
24pub type WsSink =
25    Pin<Box<dyn Sink<Message, Error = WsError> + Send + Unpin + 'static>>;
26
27/// Server-side credentials for continuing an A2A-owned A2UI surface.
28///
29/// This intentionally lives outside `car_a2ui::A2uiSurfaceOwner` so
30/// renderers can inspect surface ownership without receiving secrets.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32#[serde(rename_all = "camelCase", tag = "type")]
33pub enum A2aRouteAuth {
34    None,
35    Bearer { token: String },
36    Header { name: String, value: String },
37}
38
39/// Shared write half of the WebSocket, plus pending callback channels.
40/// `write` is type-erased via [`WsSink`] so the dispatcher can run
41/// against any transport-specific WebSocketStream (TCP or UDS today;
42/// axum-bridged in future) without templatizing every consumer.
43pub struct WsChannel {
44    pub write: Mutex<WsSink>,
45    /// Pending tool execution callbacks: request_id → oneshot sender
46    pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
47    pub next_id: AtomicU64,
48}
49
50impl WsChannel {
51    pub fn next_request_id(&self) -> String {
52        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
53        format!("cb-{}", id)
54    }
55}
56
57/// Tool executor that sends callbacks to the client over WebSocket.
58pub struct WsToolExecutor {
59    pub channel: Arc<WsChannel>,
60}
61
62#[async_trait::async_trait]
63impl ToolExecutor for WsToolExecutor {
64    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
65        use futures::SinkExt;
66
67        let request_id = self.channel.next_request_id();
68
69        let callback = ToolExecuteRequest {
70            action_id: request_id.clone(),
71            tool: tool.to_string(),
72            parameters: params.clone(),
73            timeout_ms: None,
74            attempt: 1,
75        };
76
77        // Create a oneshot channel for the response
78        let (tx, rx) = oneshot::channel();
79        self.channel
80            .pending
81            .lock()
82            .await
83            .insert(request_id.clone(), tx);
84
85        // Send the callback to the client as a JSON-RPC request
86        let rpc_request = serde_json::json!({
87            "jsonrpc": "2.0",
88            "method": "tools.execute",
89            "params": callback,
90            "id": request_id,
91        });
92
93        let msg = Message::Text(
94            serde_json::to_string(&rpc_request)
95                .map_err(|e| e.to_string())?
96                .into(),
97        );
98        self.channel
99            .write
100            .lock()
101            .await
102            .send(msg)
103            .await
104            .map_err(|e| format!("failed to send tool callback: {}", e))?;
105
106        // Wait for the client to respond (with a timeout)
107        let response = tokio::time::timeout(std::time::Duration::from_secs(60), rx)
108            .await
109            .map_err(|_| format!("tool '{}' callback timed out (60s)", tool))?
110            .map_err(|_| format!("tool '{}' callback channel closed", tool))?;
111
112        if let Some(err) = response.error {
113            Err(err)
114        } else {
115            Ok(response.output.unwrap_or(Value::Null))
116        }
117    }
118}
119
120/// Voice event sink that forwards events to a specific WebSocket client
121/// as `voice.event` JSON-RPC notifications.
122///
123/// Each `voice.transcribe_stream.start` call constructs one of these
124/// bound to the originating client's [`WsChannel`], so a client only
125/// receives events for sessions it started.
126pub struct WsVoiceEventSink {
127    pub channel: Arc<WsChannel>,
128}
129
130impl car_voice::VoiceEventSink for WsVoiceEventSink {
131    fn send(&self, session_id: &str, event_json: String) {
132        use futures::SinkExt;
133        let channel = self.channel.clone();
134        let session_id = session_id.to_string();
135        tokio::spawn(async move {
136            let payload: Value = serde_json::from_str(&event_json)
137                .unwrap_or_else(|_| Value::String(event_json.clone()));
138            let notification = serde_json::json!({
139                "jsonrpc": "2.0",
140                "method": "voice.event",
141                "params": {
142                    "session_id": session_id,
143                    "event": payload,
144                },
145            });
146            let Ok(text) = serde_json::to_string(&notification) else {
147                return;
148            };
149            let _ = channel
150                .write
151                .lock()
152                .await
153                .send(Message::Text(text.into()))
154                .await;
155        });
156    }
157}
158
159/// Per-meeting fanout sink that ingests transcript text into a
160/// session-scoped memgine using the `Arc<tokio::sync::Mutex<...>>`
161/// wrapper, then forwards every event upstream untouched.
162///
163/// Lives here (not in `car-ffi-common`) because the engine handle uses
164/// `tokio::sync::Mutex` per the "one-wrapper rule" — the FFI-common
165/// `MeetingMemgineFanout` still uses `std::sync::Mutex` for the NAPI/
166/// PyO3 bindings, which keep their sync wrappers. Each binding owns the
167/// fanout that matches its lock primitive; the parsing/formatting logic
168/// itself is shared via [`car_meeting::extract_transcript_for_ingest`].
169///
170/// `send` is called from the voice drain task and must be non-blocking,
171/// so the lock acquisition is shipped to a `tokio::spawn`. Transcript
172/// events are independent so reordering across spawned tasks is fine.
173pub struct WsMemgineIngestSink {
174    pub meeting_id: String,
175    pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
176    pub upstream: Arc<dyn car_voice::VoiceEventSink>,
177}
178
179impl car_voice::VoiceEventSink for WsMemgineIngestSink {
180    fn send(&self, voice_session_id: &str, event_json: String) {
181        if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
182            if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
183                &value,
184                &self.meeting_id,
185                voice_session_id,
186            ) {
187                let engine = self.engine.clone();
188                tokio::spawn(async move {
189                    let mut guard = engine.lock().await;
190                    guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
191                });
192            }
193        }
194        self.upstream.send(voice_session_id, event_json);
195    }
196}
197
198/// Per-client session.
199pub struct ClientSession {
200    pub client_id: String,
201    pub runtime: Arc<Runtime>,
202    pub channel: Arc<WsChannel>,
203    pub host: Arc<crate::host::HostState>,
204    /// Memgine handle. Wrapped in `tokio::sync::Mutex` so dispatcher
205    /// handlers can hold the lock across `.await` points without
206    /// risking poisoning. Migrated from `std::sync::Mutex` in the
207    /// car-server-core extraction (U1) per the "one-wrapper rule".
208    pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
209    /// Lazy browser session — first `browser.run` call launches Chromium,
210    /// subsequent calls reuse it so element IDs resolve across invocations
211    /// within the same WebSocket connection.
212    pub browser: car_ffi_common::browser::BrowserSessionSlot,
213    /// Per-connection auth state. Starts `false`; flips to `true`
214    /// after a successful `session.auth` handshake. Always considered
215    /// authenticated when `ServerState::auth_token` is unset (auth
216    /// disabled). Closes Parslee-ai/car-releases#32.
217    pub authenticated: std::sync::atomic::AtomicBool,
218    /// Bound agent identity (#169). `Some(id)` once a lifecycle-agent
219    /// child has called `session.auth { token, agent_id }` and the
220    /// supervisor confirmed `agent_id` is supervised + token matches.
221    /// Used by `agents.list` to surface which managed agents have
222    /// actually attached vs. just being marked `Running` at the
223    /// process level. Cleared at disconnect by `remove_session`.
224    pub agent_id: tokio::sync::Mutex<Option<String>>,
225    /// Bound persistent memgine (#170). `Some` after `session.auth`
226    /// successfully attaches the connection to a daemon-owned
227    /// per-agent memgine (paired with `agent_id`). Memory handlers
228    /// route through [`ClientSession::effective_memgine`] which
229    /// returns this when set, falling back to the ephemeral
230    /// `memgine` field for browser/host/CLI connections.
231    pub bound_memgine:
232        tokio::sync::Mutex<Option<Arc<Mutex<car_memgine::MemgineEngine>>>>,
233}
234
235impl ClientSession {
236    /// Returns the memgine handle the memory.* handlers should use:
237    /// the bound per-agent memgine when this session attached via
238    /// `session.auth { agent_id }` (#169 + #170), otherwise the
239    /// ephemeral per-WS memgine. Cheap (one async lock + Arc clone).
240    pub async fn effective_memgine(&self) -> Arc<Mutex<car_memgine::MemgineEngine>> {
241        if let Some(eng) = self.bound_memgine.lock().await.as_ref() {
242            return eng.clone();
243        }
244        self.memgine.clone()
245    }
246}
247
/// Approval-gate policy for high-risk WS methods.
///
/// Every method in `methods` must be acknowledged via
/// `host.resolve_approval` before the dispatcher will route the
/// request to its handler. The dispatcher waits up to `timeout` for
/// a resolution; on timeout (or any non-`approve` resolution) the
/// request fails with JSON-RPC error `-32003`.
///
/// Default: gate enabled, the macOS-automation surface
/// (`automation.run_applescript`, `automation.shortcuts.run`,
/// `messages.send`, `mail.send`, `vision.ocr`), 60-second timeout.
/// `car-server --no-approvals` (or embedders calling
/// [`ServerStateConfig::with_approval_gate`] with `enabled=false`)
/// turns it off — only appropriate when no untrusted caller can
/// reach the WS port.
#[derive(Debug, Clone)]
pub struct ApprovalGate {
    /// Master switch. When `false`, every method dispatches without
    /// raising an approval — the pre-2026-05 behaviour.
    pub enabled: bool,
    /// Methods that require approval. Match is by exact method-name
    /// string against the JSON-RPC `method` field.
    pub methods: std::collections::HashSet<String>,
    /// How long to wait for the user to resolve the approval before
    /// timing out and surfacing an error to the caller.
    pub timeout: std::time::Duration,
}

impl Default for ApprovalGate {
    /// Gate enabled, the macOS-automation surface gated, 60-second timeout.
    fn default() -> Self {
        let methods = Self::DEFAULT_METHODS
            .iter()
            .map(|name| (*name).to_owned())
            .collect();
        Self {
            enabled: true,
            methods,
            timeout: Self::DEFAULT_TIMEOUT,
        }
    }
}

impl ApprovalGate {
    /// Wait time shared by [`ApprovalGate::default`] and
    /// [`ApprovalGate::disabled`], kept in one place so the two
    /// constructors cannot drift apart.
    const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);

    /// Methods gated by default: the macOS-automation surface.
    const DEFAULT_METHODS: [&'static str; 5] = [
        "automation.run_applescript",
        "automation.shortcuts.run",
        "messages.send",
        "mail.send",
        "vision.ocr",
    ];

    /// Disable the gate entirely. Equivalent to passing
    /// `car-server --no-approvals`. Only appropriate when no
    /// untrusted caller can reach the WS port.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            methods: std::collections::HashSet::new(),
            timeout: Self::DEFAULT_TIMEOUT,
        }
    }

    /// `true` if this method must be acknowledged before dispatch.
    ///
    /// Always `false` while the gate is disabled, whatever `methods`
    /// contains.
    pub fn requires_approval(&self, method: &str) -> bool {
        self.enabled && self.methods.contains(method)
    }
}
316
317/// standalone `car-server` binary uses [`ServerState::standalone`]
318/// which calls `with_config` under the hood.
319pub struct ServerStateConfig {
320    pub journal_dir: PathBuf,
321    /// Optional pre-constructed memgine engine. When `None`, each
322    /// `create_session` call builds a fresh engine; embedders that want
323    /// to share a single engine across sessions can supply a clone of
324    /// their `Arc<Mutex<MemgineEngine>>` here.
325    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
326    /// Optional pre-constructed inference engine.
327    pub inference: Option<Arc<car_inference::InferenceEngine>>,
328    /// Optional embedder-supplied A2A runtime. Used by the in-core
329    /// `A2aDispatcher` to execute peer-driven proposals. When `None`,
330    /// the dispatcher uses a fresh `Runtime` with `register_agent_basics`
331    /// — peer agents see CAR's built-in tools and nothing else,
332    /// matching the behaviour of the standalone `start_a2a_listener`.
333    pub a2a_runtime: Option<Arc<car_engine::Runtime>>,
334    /// Optional embedder-supplied A2A task store. When `None`,
335    /// defaults to `InMemoryTaskStore`. tokhn-style embedders that
336    /// want a polling-friendly persistent store plug it in here.
337    pub a2a_store: Option<Arc<dyn car_a2a::TaskStore>>,
338    /// Optional embedder-supplied agent card factory. When `None`,
339    /// the dispatcher serves a card built from the A2A runtime's
340    /// tool schemas at construction time, advertising its public URL
341    /// as `ws://127.0.0.1:9100/` (the WS surface the dispatcher itself
342    /// is reachable on).
343    pub a2a_card_source: Option<Arc<car_a2a::AgentCardSource>>,
344    /// Approval-gate policy. When `None`, the dispatcher uses
345    /// [`ApprovalGate::default`] (gate ON, the macOS-automation
346    /// surface gated, 60s timeout). Pass
347    /// [`ApprovalGate::disabled`] to opt out — only appropriate
348    /// when no untrusted caller can reach the WS port.
349    pub approval_gate: Option<ApprovalGate>,
350}
351
352impl ServerStateConfig {
353    /// Minimal config suitable for the standalone car-server binary:
354    /// only the journal dir is required; everything else is lazily
355    /// constructed at first use.
356    pub fn new(journal_dir: PathBuf) -> Self {
357        Self {
358            journal_dir,
359            shared_memgine: None,
360            inference: None,
361            a2a_runtime: None,
362            a2a_store: None,
363            a2a_card_source: None,
364            approval_gate: None,
365        }
366    }
367
368    pub fn with_shared_memgine(mut self, engine: Arc<Mutex<car_memgine::MemgineEngine>>) -> Self {
369        self.shared_memgine = Some(engine);
370        self
371    }
372
373    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
374        self.inference = Some(engine);
375        self
376    }
377
378    /// Plug in an embedder-supplied runtime for the A2A dispatcher.
379    /// Use case: tokhn-daemon wants peers to see its OPA preflight
380    /// tooling, not just CAR's `register_agent_basics` defaults.
381    pub fn with_a2a_runtime(mut self, runtime: Arc<car_engine::Runtime>) -> Self {
382        self.a2a_runtime = Some(runtime);
383        self
384    }
385
386    /// Plug in an embedder-supplied task store for the A2A
387    /// dispatcher. Use case: tokhn's polling-friendly persistent
388    /// store keyed by their session id.
389    pub fn with_a2a_store(mut self, store: Arc<dyn car_a2a::TaskStore>) -> Self {
390        self.a2a_store = Some(store);
391        self
392    }
393
394    /// Plug in an embedder-supplied agent card factory. The factory
395    /// is invoked on every `agent/getAuthenticatedExtendedCard`
396    /// dispatch, so embedders can reflect runtime tool changes.
397    pub fn with_a2a_card_source(
398        mut self,
399        source: Arc<car_a2a::AgentCardSource>,
400    ) -> Self {
401        self.a2a_card_source = Some(source);
402        self
403    }
404
405    /// Override the approval-gate policy. Pass
406    /// [`ApprovalGate::disabled`] to skip the gate entirely (only
407    /// appropriate when no untrusted caller can reach the WS port);
408    /// pass a customised [`ApprovalGate`] to add or remove methods
409    /// or to change the timeout.
410    pub fn with_approval_gate(mut self, gate: ApprovalGate) -> Self {
411        self.approval_gate = Some(gate);
412        self
413    }
414}
415
416/// Global server state shared across all connections.
417pub struct ServerState {
418    pub journal_dir: PathBuf,
419    pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
420    pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
421    pub host: Arc<crate::host::HostState>,
422    /// When `Some`, `create_session` clones this handle into every new
423    /// `ClientSession.memgine` — embedders that want a single shared
424    /// memgine across all WS sessions set this. Standalone car-server
425    /// leaves it `None`, which gives each session its own engine
426    /// (preserving today's behavior).
427    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
428    /// Process-wide voice session registry. Each
429    /// `voice.transcribe_stream.start` call registers its own per-client
430    /// [`WsVoiceEventSink`] so events route back to the originating WS
431    /// connection only.
432    pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
433    /// Process-wide meeting registry. Meeting ids are global; each
434    /// meeting binds to the originating client's WS for upstream
435    /// events but persists transcripts to the resolved
436    /// `.car/meetings/<id>/` regardless of which client started it.
437    pub meetings: Arc<car_meeting::MeetingRegistry>,
438    /// Process-wide A2UI surface store. Agent-produced surfaces are
439    /// visible to every host UI subscriber, independent of the
440    /// WebSocket session that applied the update.
441    pub a2ui: car_a2ui::A2uiSurfaceStore,
442    /// Process-wide concurrency gate for inference RPC handlers. Sized
443    /// from host RAM at startup, overridable via
444    /// [`crate::admission::ENV_MAX_CONCURRENT`]. Without this, N
445    /// concurrent users multiply KV-cache and activation memory and
446    /// take the host out (#114-adjacent: filed alongside the daemon
447    /// always-on rework). The semaphore lives on `ServerState` so it
448    /// is shared across every WebSocket session in the same process.
449    pub admission: Arc<crate::admission::InferenceAdmission>,
450    /// Server-side A2A continuation auth keyed by A2UI surface id.
451    /// Kept out of `A2uiSurface.owner` so host renderers never see
452    /// bearer/API-key material.
453    pub a2ui_route_auth: Mutex<HashMap<String, A2aRouteAuth>>,
454    /// Lifecycle-managed agents — declarative manifest at
455    /// `~/.car/agents.json` driving spawn/restart/stop. Closes
456    /// Parslee-ai/car-releases#27. Lazy-initialized so embedders that
457    /// don't want process supervision don't pay the disk-touch cost
458    /// at server start.
459    pub supervisor: std::sync::OnceLock<Arc<car_registry::supervisor::Supervisor>>,
460    /// In-core A2A dispatcher — embedders that consume `car-server-core`
461    /// get A2A reachability "for free" without standing up a separate
462    /// HTTP listener. Closes Parslee-ai/car-releases#28. Lazy-init so
463    /// the embedder can override the runtime / task store / agent card
464    /// via [`ServerStateConfig::with_a2a_runtime`] etc. before the
465    /// first dispatch.
466    pub a2a_dispatcher: std::sync::OnceLock<Arc<car_a2a::A2aDispatcher>>,
467    /// WS clients subscribed to A2UI envelope events. After every
468    /// successful `a2ui.apply` / `a2ui.ingest`, the resulting
469    /// `A2uiApplyResult` is broadcast to every subscriber as an
470    /// `a2ui.event` JSON-RPC notification. Closes
471    /// Parslee-ai/car-releases#29. Subscribers register via the
472    /// `a2ui/subscribe` method and are auto-cleaned on WS disconnect.
473    pub a2ui_subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
474    /// Per-launch auth token. When `Some`, the WS dispatcher rejects
475    /// non-auth methods on unauthenticated sessions until the client
476    /// calls `session.auth` with the matching value. When `None`,
477    /// auth is disabled and every connection works as before. Set
478    /// at startup by `car-server` unless `--no-auth` is passed
479    /// (default flipped 2026-05); embedders that want to enable
480    /// auth call [`ServerState::install_auth_token`]. Closes
481    /// Parslee-ai/car-releases#32.
482    pub auth_token: std::sync::OnceLock<String>,
483    /// `agent_id -> client_id` map of currently-attached lifecycle
484    /// agents (#169). Populated by the `session.auth` handler when a
485    /// supervised child presents its `agent_id` + per-agent token;
486    /// drained on disconnect by `remove_session`. Single-claim:
487    /// a second connection presenting the same `agent_id` is
488    /// rejected so the daemon-side per-agent state stays unambiguous.
489    pub attached_agents: Mutex<HashMap<String, String>>,
490    /// `agent_id -> persistent memgine` map (#170). Lazy-loaded on
491    /// first connection per id from `~/.car/memory/agents/<id>.jsonl`,
492    /// retained across daemon restart, surviving any single
493    /// disconnect/reconnect of the supervised child. Connections
494    /// that auth without an `agent_id` (browser, host, ad-hoc CLI)
495    /// keep the per-WS ephemeral memgine on `ClientSession.memgine`
496    /// — no behaviour change.
497    pub agent_memgines:
498        Mutex<HashMap<String, Arc<Mutex<car_memgine::MemgineEngine>>>>,
499    /// Bound MCP HTTP-streamable URL (e.g.
500    /// `"http://127.0.0.1:9102/mcp"`) — `car-server` installs this
501    /// after binding the listener. Used by the
502    /// `agents.invoke_external` handler to default
503    /// `InvokeOptions.mcp_endpoint` so external agents
504    /// (Claude Code today) load the daemon's CAR namespace via
505    /// `--mcp-config` automatically. `None` when MCP isn't bound
506    /// (e.g. `--mcp-bind disabled`).
507    pub mcp_url: std::sync::OnceLock<String>,
508    /// Registry of connected MCP SSE sessions. Populated alongside
509    /// [`mcp_url`] when `car-server` boots the MCP listener. Public
510    /// so handlers can call `crate::mcp::push_to_session` to send
511    /// server-initiated requests to a specific MCP-connected
512    /// client (MCP-3 foundation; MCP-3b will wire host-owned tool
513    /// dispatch through this).
514    pub mcp_sessions: std::sync::OnceLock<Arc<crate::mcp::SessionMap>>,
515    /// Approval gate for high-risk WS methods (audit 2026-05). The
516    /// gate intercepts `automation.run_applescript`,
517    /// `automation.shortcuts.run`, `messages.send`, `mail.send`, and
518    /// `vision.ocr` before they dispatch, raises a
519    /// `host.create_approval` for the user to act on, and waits
520    /// (with a timeout) for `host.resolve_approval`. Approve →
521    /// dispatch continues; deny / timeout → JSON-RPC error code
522    /// `-32003`. The set of gated methods and the wait timeout are
523    /// embedder-overridable via
524    /// [`ServerStateConfig::with_approval_gate`].
525    pub approval_gate: ApprovalGate,
526    /// A2A-runtime / store / card factory carried over from the
527    /// embedder's [`ServerStateConfig`]. Consumed lazily on first
528    /// `a2a_dispatcher()` call so embedders can construct
529    /// `ServerState` without paying the runtime spin-up cost when
530    /// they don't actually use the A2A surface.
531    pub(crate) a2a_runtime: std::sync::Mutex<Option<Arc<car_engine::Runtime>>>,
532    pub(crate) a2a_store: std::sync::Mutex<Option<Arc<dyn car_a2a::TaskStore>>>,
533    pub(crate) a2a_card_source: std::sync::Mutex<Option<Arc<car_a2a::AgentCardSource>>>,
534}
535
536impl ServerState {
537    /// Constructor for the standalone `car-server` binary. Each WS
538    /// connection gets its own per-session memgine — matches the
539    /// pre-extraction default and is correct for a single-process
540    /// daemon serving one user at a time.
541    ///
542    /// **Embedders must not call this.** It silently leaves
543    /// `shared_memgine = None`, which re-introduces the dual-memgine
544    /// bug U7 was created to prevent (one engine in the embedder, a
545    /// fresh one inside every WS session). Embedders use
546    /// [`ServerState::embedded`] instead, which makes the shared
547    /// engine handle a required argument so it cannot be forgotten.
548    pub fn standalone(journal_dir: PathBuf) -> Self {
549        Self::with_config(ServerStateConfig::new(journal_dir))
550    }
551
552    /// Constructor for embedders (e.g. `tokhn-daemon`). The shared
553    /// memgine handle is **required**: every WS session created by
554    /// this state will reuse the same engine, preventing the
555    /// dual-memgine bug.
556    ///
557    /// For embedders that also want to inject a pre-warmed inference
558    /// engine or other advanced wiring, build a [`ServerStateConfig`]
559    /// directly and call [`ServerState::with_config`].
560    pub fn embedded(
561        journal_dir: PathBuf,
562        shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
563    ) -> Self {
564        Self::with_config(ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine))
565    }
566
567    /// Build a `ServerState` from a [`ServerStateConfig`] — the path
568    /// embedders use when they need to inject a shared memgine *and*
569    /// a pre-warmed inference engine, or any other advanced wiring
570    /// the convenience constructors don't cover.
571    pub fn with_config(cfg: ServerStateConfig) -> Self {
572        let inference = std::sync::OnceLock::new();
573        if let Some(eng) = cfg.inference {
574            // OnceLock::set returns Err if already set — fresh OnceLock
575            // means it's empty, so this is infallible here.
576            let _ = inference.set(eng);
577        }
578        let voice_sessions = Arc::new(car_voice::VoiceSessionRegistry::new());
579        // Reap sessions whose clients dropped without calling
580        // voice.transcribe_stream.stop (WS disconnect, process exit,
581        // etc.). Listener handles otherwise leak for the daemon's
582        // lifetime. `with_config` is sync but always called from the
583        // `#[tokio::main]` entry point, so `Handle::try_current()`
584        // inside `start_sweeper` finds the runtime.
585        voice_sessions.start_sweeper();
586        Self {
587            journal_dir: cfg.journal_dir,
588            sessions: Mutex::new(HashMap::new()),
589            inference,
590            host: Arc::new(crate::host::HostState::new()),
591            shared_memgine: cfg.shared_memgine,
592            voice_sessions,
593            meetings: Arc::new(car_meeting::MeetingRegistry::new()),
594            a2ui: car_a2ui::A2uiSurfaceStore::new(),
595            admission: Arc::new(crate::admission::InferenceAdmission::new()),
596            a2ui_route_auth: Mutex::new(HashMap::new()),
597            supervisor: std::sync::OnceLock::new(),
598            a2a_dispatcher: std::sync::OnceLock::new(),
599            a2a_runtime: std::sync::Mutex::new(cfg.a2a_runtime),
600            a2a_store: std::sync::Mutex::new(cfg.a2a_store),
601            a2a_card_source: std::sync::Mutex::new(cfg.a2a_card_source),
602            a2ui_subscribers: Mutex::new(HashMap::new()),
603            auth_token: std::sync::OnceLock::new(),
604            attached_agents: Mutex::new(HashMap::new()),
605            agent_memgines: Mutex::new(HashMap::new()),
606            mcp_url: std::sync::OnceLock::new(),
607            mcp_sessions: std::sync::OnceLock::new(),
608            approval_gate: cfg.approval_gate.unwrap_or_default(),
609        }
610    }
611
612    /// Enable the per-launch auth handshake. After this call, every
613    /// new WS connection must call `session.auth` with `token` as
614    /// the first frame; otherwise the connection is closed. Called
615    /// by `car-server` at startup unless `--no-auth` is set
616    /// (default flipped 2026-05); embedders supply their own token
617    /// if they want the same posture. Returns `Err(token)` when
618    /// auth was already installed.
619    pub fn install_auth_token(&self, token: String) -> Result<(), String> {
620        self.auth_token.set(token)
621    }
622
623    /// Install the bound MCP URL after car-server's listener is up.
624    /// Idempotent on the first call; subsequent calls are accepted
625    /// silently (matches the supervisor / a2a_dispatcher install
626    /// idiom). Returns `Err(())` when an MCP URL was already
627    /// installed — embedders should treat this as "another
628    /// component beat us to it" and use whichever value is now set.
629    pub fn install_mcp_url(&self, url: String) -> Result<(), String> {
630        self.mcp_url.set(url)
631    }
632
633    /// Install the MCP SSE session registry. Pairs with
634    /// [`install_mcp_url`] — both come from the same `start_mcp`
635    /// call and either both get installed or neither does (the
636    /// daemon binds them together).
637    pub fn install_mcp_sessions(
638        &self,
639        sessions: Arc<crate::mcp::SessionMap>,
640    ) -> Result<(), Arc<crate::mcp::SessionMap>> {
641        self.mcp_sessions.set(sessions)
642    }
643
644    /// Lazy-initialize and return the agent supervisor. The first
645    /// call constructs a [`car_registry::supervisor::Supervisor`] backed by
646    /// `~/.car/agents.json` + `~/.car/logs/`. Embedders that need a
647    /// non-default location should call
648    /// [`ServerState::install_supervisor`] before any handler runs.
649    pub fn supervisor(&self) -> Result<Arc<car_registry::supervisor::Supervisor>, String> {
650        if let Some(s) = self.supervisor.get() {
651            return Ok(s.clone());
652        }
653        let s = car_registry::supervisor::Supervisor::user_default()
654            .map(Arc::new)
655            .map_err(|e| e.to_string())?;
656        // OnceLock::set returns the original arg back on collision —
657        // a concurrent caller racing through user_default. Take
658        // whichever wins.
659        let _ = self.supervisor.set(s);
660        Ok(self.supervisor.get().expect("set or pre-existing").clone())
661    }
662
663    /// Replace the lazy default with a caller-supplied supervisor.
664    /// Returns `Err(())` when a supervisor was already installed.
665    /// Used by the standalone `car-server` binary to call
666    /// `start_all()` on a known-good handle without paying the
667    /// lazy-init lookup cost.
668    pub fn install_supervisor(
669        &self,
670        supervisor: Arc<car_registry::supervisor::Supervisor>,
671    ) -> Result<(), Arc<car_registry::supervisor::Supervisor>> {
672        self.supervisor.set(supervisor)
673    }
674
675    /// Lazy-initialize and return the in-core A2A dispatcher. The
676    /// first call constructs an [`car_a2a::A2aDispatcher`] from
677    /// either the embedder's overrides (set via
678    /// [`ServerStateConfig::with_a2a_runtime`] / `with_a2a_store` /
679    /// `with_a2a_card_source`) or sensible defaults: a fresh
680    /// `Runtime` with `register_agent_basics` registered, an
681    /// `InMemoryTaskStore`, and a card built from the runtime's
682    /// tool schemas advertising `ws://127.0.0.1:9100/` as the
683    /// public URL. Closes Parslee-ai/car-releases#28.
684    pub async fn a2a_dispatcher(&self) -> Arc<car_a2a::A2aDispatcher> {
685        if let Some(d) = self.a2a_dispatcher.get() {
686            return d.clone();
687        }
688
689        // Embedder overrides take precedence; fall back to defaults
690        // for each slot independently (so an embedder that only
691        // wants a custom card can leave the runtime + store at
692        // defaults). `Mutex::take()` consumes the slot so the
693        // defaults aren't reconstructed on a racing init that loses
694        // the OnceLock::set call below.
695        let runtime = self
696            .a2a_runtime
697            .lock()
698            .expect("a2a_runtime mutex poisoned")
699            .take();
700        let runtime = match runtime {
701            Some(r) => r,
702            None => {
703                let r = Arc::new(car_engine::Runtime::new());
704                r.register_agent_basics().await;
705                r
706            }
707        };
708
709        let store = self
710            .a2a_store
711            .lock()
712            .expect("a2a_store mutex poisoned")
713            .take()
714            .unwrap_or_else(|| Arc::new(car_a2a::InMemoryTaskStore::new()));
715
716        let card_source = self
717            .a2a_card_source
718            .lock()
719            .expect("a2a_card_source mutex poisoned")
720            .take();
721        let card_source = match card_source {
722            Some(c) => c,
723            None => {
724                let card = car_a2a::build_default_agent_card(
725                    &runtime,
726                    car_a2a::AgentCardConfig::minimal(
727                        "Common Agent Runtime",
728                        "Embedded CAR daemon — A2A v1.0 reachable over WebSocket JSON-RPC.",
729                        "ws://127.0.0.1:9100/",
730                        car_a2a::AgentProvider {
731                            organization: "Parslee".into(),
732                            url: Some("https://github.com/Parslee-ai/car".into()),
733                        },
734                    ),
735                )
736                .await;
737                Arc::new(move || card.clone()) as Arc<car_a2a::AgentCardSource>
738            }
739        };
740
741        let dispatcher = Arc::new(car_a2a::A2aDispatcher::new(runtime, store, card_source));
742        // OnceLock::set returns Err on race — accept whichever
743        // dispatcher won and clone-return that one.
744        let _ = self.a2a_dispatcher.set(dispatcher);
745        self.a2a_dispatcher
746            .get()
747            .expect("a2a_dispatcher set or pre-existing")
748            .clone()
749    }
750
751    pub async fn create_session(
752        &self,
753        client_id: &str,
754        channel: Arc<WsChannel>,
755    ) -> Arc<ClientSession> {
756        let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
757        let event_log = EventLog::with_journal(journal_path);
758
759        let executor = Arc::new(WsToolExecutor {
760            channel: channel.clone(),
761        });
762
763        let runtime = Runtime::new()
764            .with_event_log(event_log)
765            .with_executor(executor);
766
767        // If the embedder supplied a shared memgine, every session uses it.
768        // Otherwise each session gets its own — matches pre-extraction behavior.
769        let memgine = match &self.shared_memgine {
770            Some(eng) => eng.clone(),
771            None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
772        };
773
774        let session = Arc::new(ClientSession {
775            client_id: client_id.to_string(),
776            runtime: Arc::new(runtime),
777            channel,
778            host: self.host.clone(),
779            memgine,
780            browser: car_ffi_common::browser::BrowserSessionSlot::new(),
781            // When auth is disabled (no token installed), every
782            // session is "authenticated" by default — preserves the
783            // pre-#32 behaviour. When auth is enabled, the value is
784            // ignored on creation; the dispatcher's gate checks
785            // `ServerState::auth_token.is_some()` to decide whether
786            // to enforce.
787            authenticated: std::sync::atomic::AtomicBool::new(false),
788            agent_id: tokio::sync::Mutex::new(None),
789            bound_memgine: tokio::sync::Mutex::new(None),
790        });
791
792        self.sessions
793            .lock()
794            .await
795            .insert(client_id.to_string(), session.clone());
796
797        session
798    }
799
800    /// Remove a per-client session from the registry on disconnect.
801    /// Returns the removed session if present so callers can drop any
802    /// remaining strong refs (e.g. drain pending tool callbacks). Fix
803    /// for MULTI-4 / WS-3 — without this, `state.sessions` retains
804    /// `Arc<ClientSession>` for every connection that ever existed.
805    pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
806        let removed = self.sessions.lock().await.remove(client_id);
807        if let Some(session) = &removed {
808            // #169: drop the agent_id → client_id binding so a
809            // disconnected lifecycle agent can reconnect (or its
810            // supervisor-respawned replacement can take the slot)
811            // without colliding with the stale claim.
812            let bound = session.agent_id.lock().await.clone();
813            if let Some(id) = bound {
814                let mut attached = self.attached_agents.lock().await;
815                if attached.get(&id).map(String::as_str) == Some(client_id) {
816                    attached.remove(&id);
817                }
818            }
819        }
820        removed
821    }
822}