car_server_core/session.rs
1//! Server-side session state — shared across all connections.
2
3use car_engine::{Runtime, ToolExecutor};
4use car_eventlog::EventLog;
5use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
6use futures::Sink;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use tokio::sync::{oneshot, Mutex};
15use tokio_tungstenite::tungstenite::{Error as WsError, Message};
16
17/// Type-erased WebSocket sink. The dispatch loop accepts either a
18/// `WebSocketStream<TcpStream>` (the legacy car-server TCP listener)
19/// or a `WebSocketStream<UnixStream>` (the daemon-as-default UDS
20/// listener) — both implement `Sink<Message, Error = WsError>` after
21/// the tungstenite handshake. Erasing the type here avoids cascading
22/// a generic parameter through every WsChannel / Session / ServerState
23/// touchpoint in the dispatcher.
24pub type WsSink = Pin<Box<dyn Sink<Message, Error = WsError> + Send + Unpin + 'static>>;
25
26/// Server-side credentials for continuing an A2A-owned A2UI surface.
27///
28/// This intentionally lives outside `car_a2ui::A2uiSurfaceOwner` so
29/// renderers can inspect surface ownership without receiving secrets.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31#[serde(rename_all = "camelCase", tag = "type")]
32pub enum A2aRouteAuth {
33 None,
34 Bearer { token: String },
35 Header { name: String, value: String },
36}
37
38/// Shared write half of the WebSocket, plus pending callback channels.
39/// `write` is type-erased via [`WsSink`] so the dispatcher can run
40/// against any transport-specific WebSocketStream (TCP or UDS today;
41/// axum-bridged in future) without templatizing every consumer.
42pub struct WsChannel {
43 pub write: Mutex<WsSink>,
44 /// Pending tool execution callbacks: request_id → oneshot sender
45 pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
46 pub next_id: AtomicU64,
47}
48
49impl WsChannel {
50 pub fn next_request_id(&self) -> String {
51 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
52 format!("cb-{}", id)
53 }
54}
55
56/// Tool executor that sends callbacks to the client over WebSocket.
57pub struct WsToolExecutor {
58 pub channel: Arc<WsChannel>,
59}
60
61#[async_trait::async_trait]
62impl ToolExecutor for WsToolExecutor {
63 async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
64 use futures::SinkExt;
65
66 let request_id = self.channel.next_request_id();
67
68 let callback = ToolExecuteRequest {
69 action_id: request_id.clone(),
70 tool: tool.to_string(),
71 parameters: params.clone(),
72 timeout_ms: None,
73 attempt: 1,
74 };
75
76 // Create a oneshot channel for the response
77 let (tx, rx) = oneshot::channel();
78 self.channel
79 .pending
80 .lock()
81 .await
82 .insert(request_id.clone(), tx);
83
84 // Send the callback to the client as a JSON-RPC request
85 let rpc_request = serde_json::json!({
86 "jsonrpc": "2.0",
87 "method": "tools.execute",
88 "params": callback,
89 "id": request_id,
90 });
91
92 let msg = Message::Text(
93 serde_json::to_string(&rpc_request)
94 .map_err(|e| e.to_string())?
95 .into(),
96 );
97 self.channel
98 .write
99 .lock()
100 .await
101 .send(msg)
102 .await
103 .map_err(|e| format!("failed to send tool callback: {}", e))?;
104
105 // Wait for the client to respond (with a timeout)
106 let response = tokio::time::timeout(std::time::Duration::from_secs(60), rx)
107 .await
108 .map_err(|_| format!("tool '{}' callback timed out (60s)", tool))?
109 .map_err(|_| format!("tool '{}' callback channel closed", tool))?;
110
111 if let Some(err) = response.error {
112 Err(err)
113 } else {
114 Ok(response.output.unwrap_or(Value::Null))
115 }
116 }
117}
118
119/// Voice event sink that forwards events to a specific WebSocket client
120/// as `voice.event` JSON-RPC notifications.
121///
122/// Each `voice.transcribe_stream.start` call constructs one of these
123/// bound to the originating client's [`WsChannel`], so a client only
124/// receives events for sessions it started.
125pub struct WsVoiceEventSink {
126 pub channel: Arc<WsChannel>,
127}
128
129impl car_voice::VoiceEventSink for WsVoiceEventSink {
130 fn send(&self, session_id: &str, event_json: String) {
131 use futures::SinkExt;
132 let channel = self.channel.clone();
133 let session_id = session_id.to_string();
134 tokio::spawn(async move {
135 let payload: Value = serde_json::from_str(&event_json)
136 .unwrap_or_else(|_| Value::String(event_json.clone()));
137 let notification = serde_json::json!({
138 "jsonrpc": "2.0",
139 "method": "voice.event",
140 "params": {
141 "session_id": session_id,
142 "event": payload,
143 },
144 });
145 let Ok(text) = serde_json::to_string(¬ification) else {
146 return;
147 };
148 let _ = channel
149 .write
150 .lock()
151 .await
152 .send(Message::Text(text.into()))
153 .await;
154 });
155 }
156}
157
158/// Per-meeting fanout sink that ingests transcript text into a
159/// session-scoped memgine using the `Arc<tokio::sync::Mutex<...>>`
160/// wrapper, then forwards every event upstream untouched.
161///
162/// Lives here (not in `car-ffi-common`) because the engine handle uses
163/// `tokio::sync::Mutex` per the "one-wrapper rule" — the FFI-common
164/// `MeetingMemgineFanout` still uses `std::sync::Mutex` for the NAPI/
165/// PyO3 bindings, which keep their sync wrappers. Each binding owns the
166/// fanout that matches its lock primitive; the parsing/formatting logic
167/// itself is shared via [`car_meeting::extract_transcript_for_ingest`].
168///
169/// `send` is called from the voice drain task and must be non-blocking,
170/// so the lock acquisition is shipped to a `tokio::spawn`. Transcript
171/// events are independent so reordering across spawned tasks is fine.
172pub struct WsMemgineIngestSink {
173 pub meeting_id: String,
174 pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
175 pub upstream: Arc<dyn car_voice::VoiceEventSink>,
176}
177
178impl car_voice::VoiceEventSink for WsMemgineIngestSink {
179 fn send(&self, voice_session_id: &str, event_json: String) {
180 if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
181 if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
182 &value,
183 &self.meeting_id,
184 voice_session_id,
185 ) {
186 let engine = self.engine.clone();
187 tokio::spawn(async move {
188 let mut guard = engine.lock().await;
189 guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
190 });
191 }
192 }
193 self.upstream.send(voice_session_id, event_json);
194 }
195}
196
197/// Per-client session.
198pub struct ClientSession {
199 pub client_id: String,
200 pub runtime: Arc<Runtime>,
201 pub channel: Arc<WsChannel>,
202 pub host: Arc<crate::host::HostState>,
203 /// Memgine handle. Wrapped in `tokio::sync::Mutex` so dispatcher
204 /// handlers can hold the lock across `.await` points without
205 /// risking poisoning. Migrated from `std::sync::Mutex` in the
206 /// car-server-core extraction (U1) per the "one-wrapper rule".
207 pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
208 /// Lazy browser session — first `browser.run` call launches Chromium,
209 /// subsequent calls reuse it so element IDs resolve across invocations
210 /// within the same WebSocket connection.
211 pub browser: car_ffi_common::browser::BrowserSessionSlot,
212 /// Per-connection auth state. Starts `false`; flips to `true`
213 /// after a successful `session.auth` handshake. Always considered
214 /// authenticated when `ServerState::auth_token` is unset (auth
215 /// disabled). Closes Parslee-ai/car-releases#32.
216 pub authenticated: std::sync::atomic::AtomicBool,
217 /// Bound agent identity (#169). `Some(id)` once a lifecycle-agent
218 /// child has called `session.auth { token, agent_id }` and the
219 /// supervisor confirmed `agent_id` is supervised + token matches.
220 /// Used by `agents.list` to surface which managed agents have
221 /// actually attached vs. just being marked `Running` at the
222 /// process level. Cleared at disconnect by `remove_session`.
223 pub agent_id: tokio::sync::Mutex<Option<String>>,
224 /// Bound persistent memgine (#170). `Some` after `session.auth`
225 /// successfully attaches the connection to a daemon-owned
226 /// per-agent memgine (paired with `agent_id`). Memory handlers
227 /// route through [`ClientSession::effective_memgine`] which
228 /// returns this when set, falling back to the ephemeral
229 /// `memgine` field for browser/host/CLI connections.
230 pub bound_memgine: tokio::sync::Mutex<Option<Arc<Mutex<car_memgine::MemgineEngine>>>>,
231}
232
233impl ClientSession {
234 /// Returns the memgine handle the memory.* handlers should use:
235 /// the bound per-agent memgine when this session attached via
236 /// `session.auth { agent_id }` (#169 + #170), otherwise the
237 /// ephemeral per-WS memgine. Cheap (one async lock + Arc clone).
238 pub async fn effective_memgine(&self) -> Arc<Mutex<car_memgine::MemgineEngine>> {
239 if let Some(eng) = self.bound_memgine.lock().await.as_ref() {
240 return eng.clone();
241 }
242 self.memgine.clone()
243 }
244}
245
/// Approval-gate policy for high-risk WS methods.
///
/// Every method in `methods` must be acknowledged via
/// `host.resolve_approval` before the dispatcher will route the
/// request to its handler. The dispatcher waits up to `timeout` for
/// a resolution; on timeout (or any non-`approve` resolution) the
/// request fails with JSON-RPC error `-32003`.
///
/// Default: gate enabled, the macOS-automation surface
/// (`automation.run_applescript`, `automation.shortcuts.run`,
/// `messages.send`, `mail.send`, `vision.ocr`), 60-second timeout.
/// `car-server --no-approvals` (or embedders calling
/// [`ServerStateConfig::with_approval_gate`] with `enabled=false`)
/// turns it off — only appropriate when no untrusted caller can
/// reach the WS port.
#[derive(Debug, Clone)]
pub struct ApprovalGate {
    /// Master switch. When `false`, every method dispatches without
    /// raising an approval — the pre-2026-05 behaviour.
    pub enabled: bool,
    /// Methods that require approval. Match is by exact method-name
    /// string against the JSON-RPC `method` field.
    pub methods: std::collections::HashSet<String>,
    /// How long to wait for the user to resolve the approval before
    /// timing out and surfacing an error to the caller.
    pub timeout: std::time::Duration,
}

impl Default for ApprovalGate {
    /// Gate enabled, the five macOS-automation methods gated, and a
    /// 60-second resolution timeout.
    fn default() -> Self {
        let methods = [
            "automation.run_applescript",
            "automation.shortcuts.run",
            "messages.send",
            "mail.send",
            "vision.ocr",
        ]
        .iter()
        .map(|s| s.to_string())
        .collect();
        Self {
            enabled: true,
            methods,
            timeout: std::time::Duration::from_secs(60),
        }
    }
}

impl ApprovalGate {
    /// Disable the gate entirely. Equivalent to passing
    /// `car-server --no-approvals`. Only appropriate when no
    /// untrusted caller can reach the WS port.
    ///
    /// The returned gate has an empty method set and keeps the
    /// 60-second timeout value.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            methods: std::collections::HashSet::new(),
            timeout: std::time::Duration::from_secs(60),
        }
    }

    /// `true` if this method must be acknowledged before dispatch,
    /// i.e. the gate is enabled *and* `method` is an exact member of
    /// `methods`.
    pub fn requires_approval(&self, method: &str) -> bool {
        self.enabled && self.methods.contains(method)
    }
}
314
315/// standalone `car-server` binary uses [`ServerState::standalone`]
316/// which calls `with_config` under the hood.
317pub struct ServerStateConfig {
318 pub journal_dir: PathBuf,
319 /// Optional pre-constructed memgine engine. When `None`, each
320 /// `create_session` call builds a fresh engine; embedders that want
321 /// to share a single engine across sessions can supply a clone of
322 /// their `Arc<Mutex<MemgineEngine>>` here.
323 pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
324 /// Optional pre-constructed inference engine.
325 pub inference: Option<Arc<car_inference::InferenceEngine>>,
326 /// Optional embedder-supplied A2A runtime. Used by the in-core
327 /// `A2aDispatcher` to execute peer-driven proposals. When `None`,
328 /// the dispatcher uses a fresh `Runtime` with `register_agent_basics`
329 /// — peer agents see CAR's built-in tools and nothing else,
330 /// matching the behaviour of the standalone `start_a2a_listener`.
331 pub a2a_runtime: Option<Arc<car_engine::Runtime>>,
332 /// Optional embedder-supplied A2A task store. When `None`,
333 /// defaults to `InMemoryTaskStore`. tokhn-style embedders that
334 /// want a polling-friendly persistent store plug it in here.
335 pub a2a_store: Option<Arc<dyn car_a2a::TaskStore>>,
336 /// Optional embedder-supplied agent card factory. When `None`,
337 /// the dispatcher serves a card built from the A2A runtime's
338 /// tool schemas at construction time, advertising its public URL
339 /// as `ws://127.0.0.1:9100/` (the WS surface the dispatcher itself
340 /// is reachable on).
341 pub a2a_card_source: Option<Arc<car_a2a::AgentCardSource>>,
342 /// Approval-gate policy. When `None`, the dispatcher uses
343 /// [`ApprovalGate::default`] (gate ON, the macOS-automation
344 /// surface gated, 60s timeout). Pass
345 /// [`ApprovalGate::disabled`] to opt out — only appropriate
346 /// when no untrusted caller can reach the WS port.
347 pub approval_gate: Option<ApprovalGate>,
348}
349
350impl ServerStateConfig {
351 /// Minimal config suitable for the standalone car-server binary:
352 /// only the journal dir is required; everything else is lazily
353 /// constructed at first use.
354 pub fn new(journal_dir: PathBuf) -> Self {
355 Self {
356 journal_dir,
357 shared_memgine: None,
358 inference: None,
359 a2a_runtime: None,
360 a2a_store: None,
361 a2a_card_source: None,
362 approval_gate: None,
363 }
364 }
365
366 pub fn with_shared_memgine(mut self, engine: Arc<Mutex<car_memgine::MemgineEngine>>) -> Self {
367 self.shared_memgine = Some(engine);
368 self
369 }
370
371 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
372 self.inference = Some(engine);
373 self
374 }
375
376 /// Plug in an embedder-supplied runtime for the A2A dispatcher.
377 /// Use case: tokhn-daemon wants peers to see its OPA preflight
378 /// tooling, not just CAR's `register_agent_basics` defaults.
379 pub fn with_a2a_runtime(mut self, runtime: Arc<car_engine::Runtime>) -> Self {
380 self.a2a_runtime = Some(runtime);
381 self
382 }
383
384 /// Plug in an embedder-supplied task store for the A2A
385 /// dispatcher. Use case: tokhn's polling-friendly persistent
386 /// store keyed by their session id.
387 pub fn with_a2a_store(mut self, store: Arc<dyn car_a2a::TaskStore>) -> Self {
388 self.a2a_store = Some(store);
389 self
390 }
391
392 /// Plug in an embedder-supplied agent card factory. The factory
393 /// is invoked on every `agent/getAuthenticatedExtendedCard`
394 /// dispatch, so embedders can reflect runtime tool changes.
395 pub fn with_a2a_card_source(mut self, source: Arc<car_a2a::AgentCardSource>) -> Self {
396 self.a2a_card_source = Some(source);
397 self
398 }
399
400 /// Override the approval-gate policy. Pass
401 /// [`ApprovalGate::disabled`] to skip the gate entirely (only
402 /// appropriate when no untrusted caller can reach the WS port);
403 /// pass a customised [`ApprovalGate`] to add or remove methods
404 /// or to change the timeout.
405 pub fn with_approval_gate(mut self, gate: ApprovalGate) -> Self {
406 self.approval_gate = Some(gate);
407 self
408 }
409}
410
411/// Global server state shared across all connections.
412pub struct ServerState {
413 pub journal_dir: PathBuf,
414 pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
415 pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
416 pub host: Arc<crate::host::HostState>,
417 /// When `Some`, `create_session` clones this handle into every new
418 /// `ClientSession.memgine` — embedders that want a single shared
419 /// memgine across all WS sessions set this. Standalone car-server
420 /// leaves it `None`, which gives each session its own engine
421 /// (preserving today's behavior).
422 pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
423 /// Process-wide voice session registry. Each
424 /// `voice.transcribe_stream.start` call registers its own per-client
425 /// [`WsVoiceEventSink`] so events route back to the originating WS
426 /// connection only.
427 pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
428 /// Process-wide meeting registry. Meeting ids are global; each
429 /// meeting binds to the originating client's WS for upstream
430 /// events but persists transcripts to the resolved
431 /// `.car/meetings/<id>/` regardless of which client started it.
432 pub meetings: Arc<car_meeting::MeetingRegistry>,
433 /// Process-wide A2UI surface store. Agent-produced surfaces are
434 /// visible to every host UI subscriber, independent of the
435 /// WebSocket session that applied the update.
436 pub a2ui: car_a2ui::A2uiSurfaceStore,
437 /// In-process UI-improvement agent. Invoked from
438 /// `handle_a2ui_render_report` with each inbound report; returned
439 /// `Decision::Patch` envelopes are applied via the standard
440 /// `apply_a2ui_envelope` path so all subscribers see the patch.
441 /// `Arc` so the agent's interior `DashMap` state survives across
442 /// handler calls even when `ServerState` is cheap-cloned.
443 pub ui_agent: Arc<car_ui_agent::UIImprovementAgent>,
444 /// Per-surface oscillation detector for the UI-improvement
445 /// loop. Sits between the agent's `Decision::Patch` and the
446 /// apply path so A→B→A patch cycles get cooled down without
447 /// the agent itself having to track history. neo's review:
448 /// "controllers use workqueue backoff; reconcilers stay
449 /// stateless."
450 pub ui_agent_oscillation: Arc<crate::ui_agent_loop::OscillationDetector>,
451 /// Per-surface iteration budget. Backstop against runaway
452 /// loops the oscillation detector misses — caps total agent-
453 /// driven patches per surface at `DEFAULT_MAX_ITERATIONS`.
454 pub ui_agent_budget: Arc<crate::ui_agent_loop::IterationBudget>,
455 /// Process-wide concurrency gate for inference RPC handlers. Sized
456 /// from host RAM at startup, overridable via
457 /// [`crate::admission::ENV_MAX_CONCURRENT`]. Without this, N
458 /// concurrent users multiply KV-cache and activation memory and
459 /// take the host out (#114-adjacent: filed alongside the daemon
460 /// always-on rework). The semaphore lives on `ServerState` so it
461 /// is shared across every WebSocket session in the same process.
462 pub admission: Arc<crate::admission::InferenceAdmission>,
463 /// Server-side A2A continuation auth keyed by A2UI surface id.
464 /// Kept out of `A2uiSurface.owner` so host renderers never see
465 /// bearer/API-key material.
466 pub a2ui_route_auth: Mutex<HashMap<String, A2aRouteAuth>>,
467 /// Lifecycle-managed agents — declarative manifest at
468 /// `~/.car/agents.json` driving spawn/restart/stop. Closes
469 /// Parslee-ai/car-releases#27. Lazy-initialized so embedders that
470 /// don't want process supervision don't pay the disk-touch cost
471 /// at server start.
472 pub supervisor: std::sync::OnceLock<Arc<car_registry::supervisor::Supervisor>>,
473 /// In-core A2A dispatcher — embedders that consume `car-server-core`
474 /// get A2A reachability "for free" without standing up a separate
475 /// HTTP listener. Closes Parslee-ai/car-releases#28. Lazy-init so
476 /// the embedder can override the runtime / task store / agent card
477 /// via [`ServerStateConfig::with_a2a_runtime`] etc. before the
478 /// first dispatch.
479 pub a2a_dispatcher: std::sync::OnceLock<Arc<car_a2a::A2aDispatcher>>,
480 /// WS clients subscribed to A2UI envelope events. After every
481 /// successful `a2ui.apply` / `a2ui.ingest`, the resulting
482 /// `A2uiApplyResult` is broadcast to every subscriber as an
483 /// `a2ui.event` JSON-RPC notification. Closes
484 /// Parslee-ai/car-releases#29. Subscribers register via the
485 /// `a2ui/subscribe` method and are auto-cleaned on WS disconnect.
486 pub a2ui_subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
487 /// Per-launch auth token. When `Some`, the WS dispatcher rejects
488 /// non-auth methods on unauthenticated sessions until the client
489 /// calls `session.auth` with the matching value. When `None`,
490 /// auth is disabled and every connection works as before. Set
491 /// at startup by `car-server` unless `--no-auth` is passed
492 /// (default flipped 2026-05); embedders that want to enable
493 /// auth call [`ServerState::install_auth_token`]. Closes
494 /// Parslee-ai/car-releases#32.
495 pub auth_token: std::sync::OnceLock<String>,
496 /// `agent_id -> client_id` map of currently-attached lifecycle
497 /// agents (#169). Populated by the `session.auth` handler when a
498 /// supervised child presents its `agent_id` + per-agent token;
499 /// drained on disconnect by `remove_session`. Single-claim:
500 /// a second connection presenting the same `agent_id` is
501 /// rejected so the daemon-side per-agent state stays unambiguous.
502 pub attached_agents: Mutex<HashMap<String, String>>,
503 /// `agent_id -> persistent memgine` map (#170). Lazy-loaded on
504 /// first connection per id from `~/.car/memory/agents/<id>.jsonl`,
505 /// retained across daemon restart, surviving any single
506 /// disconnect/reconnect of the supervised child. Connections
507 /// that auth without an `agent_id` (browser, host, ad-hoc CLI)
508 /// keep the per-WS ephemeral memgine on `ClientSession.memgine`
509 /// — no behaviour change.
510 pub agent_memgines: Mutex<HashMap<String, Arc<Mutex<car_memgine::MemgineEngine>>>>,
511 /// Bound MCP HTTP-streamable URL (e.g.
512 /// `"http://127.0.0.1:9102/mcp"`) — `car-server` installs this
513 /// after binding the listener. Used by the
514 /// `agents.invoke_external` handler to default
515 /// `InvokeOptions.mcp_endpoint` so external agents
516 /// (Claude Code today) load the daemon's CAR namespace via
517 /// `--mcp-config` automatically. `None` when MCP isn't bound
518 /// (e.g. `--mcp-bind disabled`).
519 pub mcp_url: std::sync::OnceLock<String>,
520 /// Registry of connected MCP SSE sessions. Populated alongside
521 /// [`mcp_url`] when `car-server` boots the MCP listener. Public
522 /// so handlers can call `crate::mcp::push_to_session` to send
523 /// server-initiated requests to a specific MCP-connected
524 /// client (MCP-3 foundation; MCP-3b will wire host-owned tool
525 /// dispatch through this).
526 pub mcp_sessions: std::sync::OnceLock<Arc<crate::mcp::SessionMap>>,
527 /// Approval gate for high-risk WS methods (audit 2026-05). The
528 /// gate intercepts `automation.run_applescript`,
529 /// `automation.shortcuts.run`, `messages.send`, `mail.send`, and
530 /// `vision.ocr` before they dispatch, raises a
531 /// `host.create_approval` for the user to act on, and waits
532 /// (with a timeout) for `host.resolve_approval`. Approve →
533 /// dispatch continues; deny / timeout → JSON-RPC error code
534 /// `-32003`. The set of gated methods and the wait timeout are
535 /// embedder-overridable via
536 /// [`ServerStateConfig::with_approval_gate`].
537 pub approval_gate: ApprovalGate,
538 /// A2A-runtime / store / card factory carried over from the
539 /// embedder's [`ServerStateConfig`]. Consumed lazily on first
540 /// `a2a_dispatcher()` call so embedders can construct
541 /// `ServerState` without paying the runtime spin-up cost when
542 /// they don't actually use the A2A surface.
543 pub(crate) a2a_runtime: std::sync::Mutex<Option<Arc<car_engine::Runtime>>>,
544 pub(crate) a2a_store: std::sync::Mutex<Option<Arc<dyn car_a2a::TaskStore>>>,
545 pub(crate) a2a_card_source: std::sync::Mutex<Option<Arc<car_a2a::AgentCardSource>>>,
546}
547
548impl ServerState {
549 /// Constructor for the standalone `car-server` binary. Each WS
550 /// connection gets its own per-session memgine — matches the
551 /// pre-extraction default and is correct for a single-process
552 /// daemon serving one user at a time.
553 ///
554 /// **Embedders must not call this.** It silently leaves
555 /// `shared_memgine = None`, which re-introduces the dual-memgine
556 /// bug U7 was created to prevent (one engine in the embedder, a
557 /// fresh one inside every WS session). Embedders use
558 /// [`ServerState::embedded`] instead, which makes the shared
559 /// engine handle a required argument so it cannot be forgotten.
560 pub fn standalone(journal_dir: PathBuf) -> Self {
561 Self::with_config(ServerStateConfig::new(journal_dir))
562 }
563
564 /// Constructor for embedders (e.g. `tokhn-daemon`). The shared
565 /// memgine handle is **required**: every WS session created by
566 /// this state will reuse the same engine, preventing the
567 /// dual-memgine bug.
568 ///
569 /// For embedders that also want to inject a pre-warmed inference
570 /// engine or other advanced wiring, build a [`ServerStateConfig`]
571 /// directly and call [`ServerState::with_config`].
572 pub fn embedded(
573 journal_dir: PathBuf,
574 shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
575 ) -> Self {
576 Self::with_config(ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine))
577 }
578
579 /// Build a `ServerState` from a [`ServerStateConfig`] — the path
580 /// embedders use when they need to inject a shared memgine *and*
581 /// a pre-warmed inference engine, or any other advanced wiring
582 /// the convenience constructors don't cover.
583 pub fn with_config(cfg: ServerStateConfig) -> Self {
584 let inference = std::sync::OnceLock::new();
585 if let Some(eng) = cfg.inference {
586 // OnceLock::set returns Err if already set — fresh OnceLock
587 // means it's empty, so this is infallible here.
588 let _ = inference.set(eng);
589 }
590 let voice_sessions = Arc::new(car_voice::VoiceSessionRegistry::new());
591 // Reap sessions whose clients dropped without calling
592 // voice.transcribe_stream.stop (WS disconnect, process exit,
593 // etc.). Listener handles otherwise leak for the daemon's
594 // lifetime. `with_config` is sync but always called from the
595 // `#[tokio::main]` entry point, so `Handle::try_current()`
596 // inside `start_sweeper` finds the runtime.
597 voice_sessions.start_sweeper();
598 // UI-improvement agent is pure decision logic — no I/O, no
599 // persistence handle. Memgine ingest of strategy outcomes is
600 // the caller's responsibility (handler.rs after a successful
601 // Decision::Patch). Keeps the agent crate Mutex-flavor
602 // agnostic so it can compose with std/tokio mutex callers.
603 let ui_agent = Arc::new(car_ui_agent::UIImprovementAgent::with_default_strategies());
604 let ui_agent_oscillation =
605 Arc::new(crate::ui_agent_loop::OscillationDetector::new());
606 let ui_agent_budget = Arc::new(crate::ui_agent_loop::IterationBudget::new());
607 Self {
608 journal_dir: cfg.journal_dir,
609 sessions: Mutex::new(HashMap::new()),
610 inference,
611 host: Arc::new(crate::host::HostState::new()),
612 shared_memgine: cfg.shared_memgine,
613 voice_sessions,
614 meetings: Arc::new(car_meeting::MeetingRegistry::new()),
615 a2ui: car_a2ui::A2uiSurfaceStore::new(),
616 ui_agent,
617 ui_agent_oscillation,
618 ui_agent_budget,
619 admission: Arc::new(crate::admission::InferenceAdmission::new()),
620 a2ui_route_auth: Mutex::new(HashMap::new()),
621 supervisor: std::sync::OnceLock::new(),
622 a2a_dispatcher: std::sync::OnceLock::new(),
623 a2a_runtime: std::sync::Mutex::new(cfg.a2a_runtime),
624 a2a_store: std::sync::Mutex::new(cfg.a2a_store),
625 a2a_card_source: std::sync::Mutex::new(cfg.a2a_card_source),
626 a2ui_subscribers: Mutex::new(HashMap::new()),
627 auth_token: std::sync::OnceLock::new(),
628 attached_agents: Mutex::new(HashMap::new()),
629 agent_memgines: Mutex::new(HashMap::new()),
630 mcp_url: std::sync::OnceLock::new(),
631 mcp_sessions: std::sync::OnceLock::new(),
632 approval_gate: cfg.approval_gate.unwrap_or_default(),
633 }
634 }
635
636 /// Enable the per-launch auth handshake. After this call, every
637 /// new WS connection must call `session.auth` with `token` as
638 /// the first frame; otherwise the connection is closed. Called
639 /// by `car-server` at startup unless `--no-auth` is set
640 /// (default flipped 2026-05); embedders supply their own token
641 /// if they want the same posture. Returns `Err(token)` when
642 /// auth was already installed.
643 pub fn install_auth_token(&self, token: String) -> Result<(), String> {
644 self.auth_token.set(token)
645 }
646
647 /// Install the bound MCP URL after car-server's listener is up.
648 /// Idempotent on the first call; subsequent calls are accepted
649 /// silently (matches the supervisor / a2a_dispatcher install
650 /// idiom). Returns `Err(())` when an MCP URL was already
651 /// installed — embedders should treat this as "another
652 /// component beat us to it" and use whichever value is now set.
653 pub fn install_mcp_url(&self, url: String) -> Result<(), String> {
654 self.mcp_url.set(url)
655 }
656
657 /// Install the MCP SSE session registry. Pairs with
658 /// [`install_mcp_url`] — both come from the same `start_mcp`
659 /// call and either both get installed or neither does (the
660 /// daemon binds them together).
661 pub fn install_mcp_sessions(
662 &self,
663 sessions: Arc<crate::mcp::SessionMap>,
664 ) -> Result<(), Arc<crate::mcp::SessionMap>> {
665 self.mcp_sessions.set(sessions)
666 }
667
668 /// Lazy-initialize and return the agent supervisor. The first
669 /// call constructs a [`car_registry::supervisor::Supervisor`] backed by
670 /// `~/.car/agents.json` + `~/.car/logs/`. Embedders that need a
671 /// non-default location should call
672 /// [`ServerState::install_supervisor`] before any handler runs.
673 pub fn supervisor(&self) -> Result<Arc<car_registry::supervisor::Supervisor>, String> {
674 if let Some(s) = self.supervisor.get() {
675 return Ok(s.clone());
676 }
677 let s = car_registry::supervisor::Supervisor::user_default()
678 .map(Arc::new)
679 .map_err(|e| e.to_string())?;
680 // OnceLock::set returns the original arg back on collision —
681 // a concurrent caller racing through user_default. Take
682 // whichever wins.
683 let _ = self.supervisor.set(s);
684 Ok(self.supervisor.get().expect("set or pre-existing").clone())
685 }
686
687 /// Replace the lazy default with a caller-supplied supervisor.
688 /// Returns `Err(())` when a supervisor was already installed.
689 /// Used by the standalone `car-server` binary to call
690 /// `start_all()` on a known-good handle without paying the
691 /// lazy-init lookup cost.
692 pub fn install_supervisor(
693 &self,
694 supervisor: Arc<car_registry::supervisor::Supervisor>,
695 ) -> Result<(), Arc<car_registry::supervisor::Supervisor>> {
696 self.supervisor.set(supervisor)
697 }
698
699 /// Lazy-initialize and return the in-core A2A dispatcher. The
700 /// first call constructs an [`car_a2a::A2aDispatcher`] from
701 /// either the embedder's overrides (set via
702 /// [`ServerStateConfig::with_a2a_runtime`] / `with_a2a_store` /
703 /// `with_a2a_card_source`) or sensible defaults: a fresh
704 /// `Runtime` with `register_agent_basics` registered, an
705 /// `InMemoryTaskStore`, and a card built from the runtime's
706 /// tool schemas advertising `ws://127.0.0.1:9100/` as the
707 /// public URL. Closes Parslee-ai/car-releases#28.
708 pub async fn a2a_dispatcher(&self) -> Arc<car_a2a::A2aDispatcher> {
709 if let Some(d) = self.a2a_dispatcher.get() {
710 return d.clone();
711 }
712
713 // Embedder overrides take precedence; fall back to defaults
714 // for each slot independently (so an embedder that only
715 // wants a custom card can leave the runtime + store at
716 // defaults). `Mutex::take()` consumes the slot so the
717 // defaults aren't reconstructed on a racing init that loses
718 // the OnceLock::set call below.
719 let runtime = self
720 .a2a_runtime
721 .lock()
722 .expect("a2a_runtime mutex poisoned")
723 .take();
724 let runtime = match runtime {
725 Some(r) => r,
726 None => {
727 let r = Arc::new(car_engine::Runtime::new());
728 r.register_agent_basics().await;
729 r
730 }
731 };
732
733 let store = self
734 .a2a_store
735 .lock()
736 .expect("a2a_store mutex poisoned")
737 .take()
738 .unwrap_or_else(|| Arc::new(car_a2a::InMemoryTaskStore::new()));
739
740 let card_source = self
741 .a2a_card_source
742 .lock()
743 .expect("a2a_card_source mutex poisoned")
744 .take();
745 let card_source = match card_source {
746 Some(c) => c,
747 None => {
748 let card = car_a2a::build_default_agent_card(
749 &runtime,
750 car_a2a::AgentCardConfig::minimal(
751 "Common Agent Runtime",
752 "Embedded CAR daemon — A2A v1.0 reachable over WebSocket JSON-RPC.",
753 "ws://127.0.0.1:9100/",
754 car_a2a::AgentProvider {
755 organization: "Parslee".into(),
756 url: Some("https://github.com/Parslee-ai/car".into()),
757 },
758 ),
759 )
760 .await;
761 Arc::new(move || card.clone()) as Arc<car_a2a::AgentCardSource>
762 }
763 };
764
765 let dispatcher = Arc::new(car_a2a::A2aDispatcher::new(runtime, store, card_source));
766 // OnceLock::set returns Err on race — accept whichever
767 // dispatcher won and clone-return that one.
768 let _ = self.a2a_dispatcher.set(dispatcher);
769 self.a2a_dispatcher
770 .get()
771 .expect("a2a_dispatcher set or pre-existing")
772 .clone()
773 }
774
775 pub async fn create_session(
776 &self,
777 client_id: &str,
778 channel: Arc<WsChannel>,
779 ) -> Arc<ClientSession> {
780 let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
781 let event_log = EventLog::with_journal(journal_path);
782
783 let executor = Arc::new(WsToolExecutor {
784 channel: channel.clone(),
785 });
786
787 let runtime = Runtime::new()
788 .with_event_log(event_log)
789 .with_executor(executor);
790
791 // If the embedder supplied a shared memgine, every session uses it.
792 // Otherwise each session gets its own — matches pre-extraction behavior.
793 let memgine = match &self.shared_memgine {
794 Some(eng) => eng.clone(),
795 None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
796 };
797
798 let session = Arc::new(ClientSession {
799 client_id: client_id.to_string(),
800 runtime: Arc::new(runtime),
801 channel,
802 host: self.host.clone(),
803 memgine,
804 browser: car_ffi_common::browser::BrowserSessionSlot::new(),
805 // When auth is disabled (no token installed), every
806 // session is "authenticated" by default — preserves the
807 // pre-#32 behaviour. When auth is enabled, the value is
808 // ignored on creation; the dispatcher's gate checks
809 // `ServerState::auth_token.is_some()` to decide whether
810 // to enforce.
811 authenticated: std::sync::atomic::AtomicBool::new(false),
812 agent_id: tokio::sync::Mutex::new(None),
813 bound_memgine: tokio::sync::Mutex::new(None),
814 });
815
816 self.sessions
817 .lock()
818 .await
819 .insert(client_id.to_string(), session.clone());
820
821 session
822 }
823
824 /// Remove a per-client session from the registry on disconnect.
825 /// Returns the removed session if present so callers can drop any
826 /// remaining strong refs (e.g. drain pending tool callbacks). Fix
827 /// for MULTI-4 / WS-3 — without this, `state.sessions` retains
828 /// `Arc<ClientSession>` for every connection that ever existed.
829 pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
830 let removed = self.sessions.lock().await.remove(client_id);
831 if let Some(session) = &removed {
832 // #169: drop the agent_id → client_id binding so a
833 // disconnected lifecycle agent can reconnect (or its
834 // supervisor-respawned replacement can take the slot)
835 // without colliding with the stale claim.
836 let bound = session.agent_id.lock().await.clone();
837 if let Some(id) = bound {
838 let mut attached = self.attached_agents.lock().await;
839 if attached.get(&id).map(String::as_str) == Some(client_id) {
840 attached.remove(&id);
841 }
842 }
843 }
844 removed
845 }
846}