Skip to main content

car_server_core/
session.rs

1//! Server-side session state — shared across all connections.
2
3use car_engine::{Runtime, ToolExecutor};
4use car_eventlog::EventLog;
5use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
6use futures::Sink;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use tokio::sync::{oneshot, Mutex};
15use tokio_tungstenite::tungstenite::{Error as WsError, Message};
16
17/// Type-erased WebSocket sink. The dispatch loop accepts either a
18/// `WebSocketStream<TcpStream>` (the legacy car-server TCP listener)
19/// or a `WebSocketStream<UnixStream>` (the daemon-as-default UDS
20/// listener) — both implement `Sink<Message, Error = WsError>` after
21/// the tungstenite handshake. Erasing the type here avoids cascading
22/// a generic parameter through every WsChannel / Session / ServerState
23/// touchpoint in the dispatcher.
24pub type WsSink = Pin<Box<dyn Sink<Message, Error = WsError> + Send + Unpin + 'static>>;
25
26/// Server-side credentials for continuing an A2A-owned A2UI surface.
27///
28/// This intentionally lives outside `car_a2ui::A2uiSurfaceOwner` so
29/// renderers can inspect surface ownership without receiving secrets.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31#[serde(rename_all = "camelCase", tag = "type")]
32pub enum A2aRouteAuth {
33    None,
34    Bearer { token: String },
35    Header { name: String, value: String },
36}
37
/// Shared write half of the WebSocket, plus pending callback channels.
/// `write` is type-erased via [`WsSink`] so the dispatcher can run
/// against any transport-specific WebSocketStream (TCP or UDS today;
/// axum-bridged in future) without templatizing every consumer.
pub struct WsChannel {
    /// Write half of the connection. Writers in this module lock it,
    /// send one complete message, then release, so concurrent senders
    /// never interleave frames.
    pub write: Mutex<WsSink>,
    /// Pending tool execution callbacks: request_id → oneshot sender.
    /// [`WsToolExecutor`] inserts the sender before writing the
    /// `tools.execute` request; whoever reads the client's response is
    /// expected to remove the entry and complete the sender (presumably
    /// the dispatch loop — not visible in this file).
    pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
    /// Counter backing [`WsChannel::next_request_id`]; bumped once per
    /// allocated callback id.
    pub next_id: AtomicU64,
}
48
49impl WsChannel {
50    pub fn next_request_id(&self) -> String {
51        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
52        format!("cb-{}", id)
53    }
54
55    /// Test-only stub that returns a WsChannel whose write sink drains
56    /// to nowhere. Used by `host.rs` tests that need a real
57    /// `Arc<WsChannel>` in the subscribers map (to exercise membership
58    /// checks like the cross-session resolve fan-out) without
59    /// constructing a tungstenite handshake. Never writes are
60    /// performed against this stub; if anything tries, the drain sink
61    /// quietly absorbs.
62    #[cfg(test)]
63    pub fn test_stub() -> Self {
64        use futures::sink::SinkExt;
65        let sink: WsSink = Box::pin(futures::sink::drain().sink_map_err(|_| {
66            tokio_tungstenite::tungstenite::Error::ConnectionClosed
67        }));
68        WsChannel {
69            write: Mutex::new(sink),
70            pending: Mutex::new(HashMap::new()),
71            next_id: AtomicU64::new(0),
72        }
73    }
74}
75
/// In-flight `agents.chat` session bookkeeping. Created when a host
/// client calls `agents.chat`, removed when the agent emits a terminal
/// `agent.chat.event` (`kind: "done"` or `"error"`), when either side
/// disconnects, or when the host cancels via `agents.chat.cancel`.
///
/// The session_id is host-supplied (or server-generated when omitted)
/// and threads through every `agent.chat.event` notification so the
/// server can route streamed deltas back to the originating host
/// without needing per-session subscriptions. The session_id is the
/// map key in `ServerState::chat_sessions`, not a field here. See
/// `docs/proposals/agent-chat-surface.md` for the wire contract.
#[derive(Debug, Clone)]
pub struct ChatSession {
    /// Agent that owns this chat — populated from
    /// `attached_agents` at `agents.chat` dispatch time.
    pub agent_id: String,
    /// Client id of the host that issued `agents.chat`. The server
    /// forwards `agent.chat.event` notifications back to *this* host
    /// only, so two CarHost windows chatting with the same agent are
    /// independent streams.
    pub host_client_id: String,
    /// Unix-seconds creation time — intended for a future stale-session
    /// sweeper that drops sessions whose agent died without emitting a
    /// terminal event.
    pub created_at: u64,
}
101
/// Tool executor that sends callbacks to the client over WebSocket.
///
/// Each call is written to `channel` as a `tools.execute` JSON-RPC
/// request and resolves when the matching response is delivered through
/// `channel.pending`, or fails after a 60-second timeout.
pub struct WsToolExecutor {
    /// Connection the callback requests are written to and whose
    /// `pending` map routes the responses back.
    pub channel: Arc<WsChannel>,
}
106
107#[async_trait::async_trait]
108impl ToolExecutor for WsToolExecutor {
109    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
110        // Legacy callers that don't have a proposal-level Action.id
111        // (e.g. internal `executor.execute` chains in tests) — emit an
112        // empty action_id so the client-side handler can still see the
113        // payload shape and decide whether to fail loudly.
114        self.execute_with_action(tool, params, "").await
115    }
116
117    async fn execute_with_action(
118        &self,
119        tool: &str,
120        params: &Value,
121        action_id: &str,
122    ) -> Result<Value, String> {
123        use futures::SinkExt;
124
125        // The JSON-RPC request id is the daemon's callback-routing key
126        // (used by the pending-response map below). The `action_id`
127        // FIELD on the payload is the originating proposal Action.id
128        // surfaced to the host so process-wide handlers can route
129        // concurrent callbacks back to per-call dispatchers
130        // (Parslee-ai/car-releases#43 follow-up). They serve different
131        // purposes and must stay distinct: routing id is daemon-side,
132        // action id is host-side.
133        let request_id = self.channel.next_request_id();
134
135        let callback = ToolExecuteRequest {
136            action_id: action_id.to_string(),
137            tool: tool.to_string(),
138            parameters: params.clone(),
139            timeout_ms: None,
140            attempt: 1,
141        };
142
143        // Create a oneshot channel for the response
144        let (tx, rx) = oneshot::channel();
145        self.channel
146            .pending
147            .lock()
148            .await
149            .insert(request_id.clone(), tx);
150
151        // Send the callback to the client as a JSON-RPC request
152        let rpc_request = serde_json::json!({
153            "jsonrpc": "2.0",
154            "method": "tools.execute",
155            "params": callback,
156            "id": request_id,
157        });
158
159        let msg = Message::Text(
160            serde_json::to_string(&rpc_request)
161                .map_err(|e| e.to_string())?
162                .into(),
163        );
164        self.channel
165            .write
166            .lock()
167            .await
168            .send(msg)
169            .await
170            .map_err(|e| format!("failed to send tool callback: {}", e))?;
171
172        // Wait for the client to respond (with a timeout)
173        let response = tokio::time::timeout(std::time::Duration::from_secs(60), rx)
174            .await
175            .map_err(|_| format!("tool '{}' callback timed out (60s)", tool))?
176            .map_err(|_| format!("tool '{}' callback channel closed", tool))?;
177
178        if let Some(err) = response.error {
179            Err(err)
180        } else {
181            Ok(response.output.unwrap_or(Value::Null))
182        }
183    }
184}
185
/// Voice event sink that forwards events to a specific WebSocket client
/// as `voice.event` JSON-RPC notifications.
///
/// Each `voice.transcribe_stream.start` call constructs one of these
/// bound to the originating client's [`WsChannel`], so a client only
/// receives events for sessions it started.
pub struct WsVoiceEventSink {
    /// Connection every text notification and binary frame from this
    /// sink is written to.
    pub channel: Arc<WsChannel>,
}
195
196impl car_voice::VoiceEventSink for WsVoiceEventSink {
197    fn send(&self, session_id: &str, event_json: String) {
198        use futures::SinkExt;
199        let channel = self.channel.clone();
200        let session_id = session_id.to_string();
201        tokio::spawn(async move {
202            let payload: Value = serde_json::from_str(&event_json)
203                .unwrap_or_else(|_| Value::String(event_json.clone()));
204            let notification = serde_json::json!({
205                "jsonrpc": "2.0",
206                "method": "voice.event",
207                "params": {
208                    "session_id": session_id,
209                    "event": payload,
210                },
211            });
212            let Ok(text) = serde_json::to_string(&notification) else {
213                return;
214            };
215            let _ = channel
216                .write
217                .lock()
218                .await
219                .send(Message::Text(text.into()))
220                .await;
221        });
222    }
223
224    fn send_binary(&self, frame: Vec<u8>) {
225        use futures::SinkExt;
226        let channel = self.channel.clone();
227        tokio::spawn(async move {
228            let _ = channel
229                .write
230                .lock()
231                .await
232                .send(Message::Binary(frame.into()))
233                .await;
234        });
235    }
236}
237
/// Per-meeting fanout sink that ingests transcript text into a
/// session-scoped memgine using the `Arc<tokio::sync::Mutex<...>>`
/// wrapper, then forwards every event upstream untouched.
///
/// Lives here (not in `car-ffi-common`) because the engine handle uses
/// `tokio::sync::Mutex` per the "one-wrapper rule" — the FFI-common
/// `MeetingMemgineFanout` still uses `std::sync::Mutex` for the NAPI/
/// PyO3 bindings, which keep their sync wrappers. Each binding owns the
/// fanout that matches its lock primitive; the parsing/formatting logic
/// itself is shared via [`car_meeting::extract_transcript_for_ingest`].
///
/// `send` is called from the voice drain task and must be non-blocking,
/// so the lock acquisition is shipped to a `tokio::spawn`. Transcript
/// events are independent so reordering across spawned tasks is fine.
pub struct WsMemgineIngestSink {
    /// Meeting id passed to `extract_transcript_for_ingest` for every
    /// event.
    pub meeting_id: String,
    /// Memgine that extracted transcripts are ingested into.
    pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Sink every event is forwarded to, whether or not it carried a
    /// transcript.
    pub upstream: Arc<dyn car_voice::VoiceEventSink>,
}
257
258impl car_voice::VoiceEventSink for WsMemgineIngestSink {
259    fn send(&self, voice_session_id: &str, event_json: String) {
260        if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
261            if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
262                &value,
263                &self.meeting_id,
264                voice_session_id,
265            ) {
266                let engine = self.engine.clone();
267                tokio::spawn(async move {
268                    let mut guard = engine.lock().await;
269                    guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
270                });
271            }
272        }
273        self.upstream.send(voice_session_id, event_json);
274    }
275}
276
/// Per-client session.
///
/// One instance per WebSocket connection (presumably built by
/// `ServerState::create_session`, which is not shown in this file).
pub struct ClientSession {
    /// Identifier of this client connection.
    pub client_id: String,
    /// Engine runtime held by this session.
    pub runtime: Arc<Runtime>,
    /// This connection's WebSocket channel: write half plus pending
    /// tool-callback senders.
    pub channel: Arc<WsChannel>,
    /// Shared host-state handle.
    pub host: Arc<crate::host::HostState>,
    /// Memgine handle. Wrapped in `tokio::sync::Mutex` so dispatcher
    /// handlers can hold the lock across `.await` points without
    /// risking poisoning. Migrated from `std::sync::Mutex` in the
    /// car-server-core extraction (U1) per the "one-wrapper rule".
    pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Lazy browser session — first `browser.run` call launches Chromium,
    /// subsequent calls reuse it so element IDs resolve across invocations
    /// within the same WebSocket connection.
    pub browser: car_ffi_common::browser::BrowserSessionSlot,
    /// Per-connection auth state. Starts `false`; flips to `true`
    /// after a successful `session.auth` handshake. Always considered
    /// authenticated when `ServerState::auth_token` is unset (auth
    /// disabled). Closes Parslee-ai/car-releases#32.
    pub authenticated: std::sync::atomic::AtomicBool,
    /// Bound agent identity (#169). `Some(id)` once a lifecycle-agent
    /// child has called `session.auth { token, agent_id }` and the
    /// supervisor confirmed `agent_id` is supervised + token matches.
    /// Used by `agents.list` to surface which managed agents have
    /// actually attached vs. just being marked `Running` at the
    /// process level. Cleared at disconnect by `remove_session`.
    pub agent_id: tokio::sync::Mutex<Option<String>>,
    /// Bound persistent memgine (#170). `Some` after `session.auth`
    /// successfully attaches the connection to a daemon-owned
    /// per-agent memgine (paired with `agent_id`). Memory handlers
    /// route through [`ClientSession::effective_memgine`] which
    /// returns this when set, falling back to the ephemeral
    /// `memgine` field for browser/host/CLI connections.
    pub bound_memgine: tokio::sync::Mutex<Option<Arc<Mutex<car_memgine::MemgineEngine>>>>,
}
312
313impl ClientSession {
314    /// Returns the memgine handle the memory.* handlers should use:
315    /// the bound per-agent memgine when this session attached via
316    /// `session.auth { agent_id }` (#169 + #170), otherwise the
317    /// ephemeral per-WS memgine. Cheap (one async lock + Arc clone).
318    pub async fn effective_memgine(&self) -> Arc<Mutex<car_memgine::MemgineEngine>> {
319        if let Some(eng) = self.bound_memgine.lock().await.as_ref() {
320            return eng.clone();
321        }
322        self.memgine.clone()
323    }
324}
325
326/// Builder for constructing a [`ServerState`] with embedder-supplied
327/// dependencies. Embedders (e.g. `tokhn-daemon`) use this to inject
328/// their own memgine handle and other shared infrastructure; the
329/// Approval-gate policy for high-risk WS methods.
330///
331/// Every method in `methods` must be acknowledged via
332/// `host.resolve_approval` before the dispatcher will route the
333/// request to its handler. The dispatcher waits up to `timeout` for
334/// a resolution; on timeout (or any non-`approve` resolution) the
335/// request fails with JSON-RPC error `-32003`.
336///
337/// Default: gate enabled, the macOS-automation surface
338/// (`automation.run_applescript`, `automation.shortcuts.run`,
339/// `messages.send`, `mail.send`, `vision.ocr`), 60-second timeout.
340/// `car-server --no-approvals` (or embedders calling
341/// [`ServerStateConfig::with_approval_gate`] with `enabled=false`)
342/// turns it off — only appropriate when no untrusted caller can
343/// reach the WS port.
344#[derive(Debug, Clone)]
345pub struct ApprovalGate {
346    /// Master switch. When `false`, every method dispatches without
347    /// raising an approval — the pre-2026-05 behaviour.
348    pub enabled: bool,
349    /// Methods that require approval. Match is by exact method-name
350    /// string against the JSON-RPC `method` field.
351    pub methods: std::collections::HashSet<String>,
352    /// How long to wait for the user to resolve the approval before
353    /// timing out and surfacing an error to the caller.
354    pub timeout: std::time::Duration,
355}
356
357impl Default for ApprovalGate {
358    fn default() -> Self {
359        let methods = [
360            "automation.run_applescript",
361            "automation.shortcuts.run",
362            "messages.send",
363            "mail.send",
364            "vision.ocr",
365        ]
366        .iter()
367        .map(|s| s.to_string())
368        .collect();
369        Self {
370            enabled: true,
371            methods,
372            timeout: std::time::Duration::from_secs(60),
373        }
374    }
375}
376
377impl ApprovalGate {
378    /// Disable the gate entirely. Equivalent to passing
379    /// `car-server --no-approvals`. Only appropriate when no
380    /// untrusted caller can reach the WS port.
381    pub fn disabled() -> Self {
382        Self {
383            enabled: false,
384            methods: std::collections::HashSet::new(),
385            timeout: std::time::Duration::from_secs(60),
386        }
387    }
388
389    /// `true` if this method must be acknowledged before dispatch.
390    pub fn requires_approval(&self, method: &str) -> bool {
391        self.enabled && self.methods.contains(method)
392    }
393}
394
395/// standalone `car-server` binary uses [`ServerState::standalone`]
396/// which calls `with_config` under the hood.
397pub struct ServerStateConfig {
398    pub journal_dir: PathBuf,
399    /// Optional pre-constructed memgine engine. When `None`, each
400    /// `create_session` call builds a fresh engine; embedders that want
401    /// to share a single engine across sessions can supply a clone of
402    /// their `Arc<Mutex<MemgineEngine>>` here.
403    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
404    /// Optional pre-constructed inference engine.
405    pub inference: Option<Arc<car_inference::InferenceEngine>>,
406    /// Optional embedder-supplied A2A runtime. Used by the in-core
407    /// `A2aDispatcher` to execute peer-driven proposals. When `None`,
408    /// the dispatcher uses a fresh `Runtime` with `register_agent_basics`
409    /// — peer agents see CAR's built-in tools and nothing else,
410    /// matching the behaviour of the standalone `start_a2a_listener`.
411    pub a2a_runtime: Option<Arc<car_engine::Runtime>>,
412    /// Optional embedder-supplied A2A task store. When `None`,
413    /// defaults to `InMemoryTaskStore`. tokhn-style embedders that
414    /// want a polling-friendly persistent store plug it in here.
415    pub a2a_store: Option<Arc<dyn car_a2a::TaskStore>>,
416    /// Optional embedder-supplied agent card factory. When `None`,
417    /// the dispatcher serves a card built from the A2A runtime's
418    /// tool schemas at construction time, advertising its public URL
419    /// as `ws://127.0.0.1:9100/` (the WS surface the dispatcher itself
420    /// is reachable on).
421    pub a2a_card_source: Option<Arc<car_a2a::AgentCardSource>>,
422    /// Approval-gate policy. When `None`, the dispatcher uses
423    /// [`ApprovalGate::default`] (gate ON, the macOS-automation
424    /// surface gated, 60s timeout). Pass
425    /// [`ApprovalGate::disabled`] to opt out — only appropriate
426    /// when no untrusted caller can reach the WS port.
427    pub approval_gate: Option<ApprovalGate>,
428}
429
430impl ServerStateConfig {
431    /// Minimal config suitable for the standalone car-server binary:
432    /// only the journal dir is required; everything else is lazily
433    /// constructed at first use.
434    pub fn new(journal_dir: PathBuf) -> Self {
435        Self {
436            journal_dir,
437            shared_memgine: None,
438            inference: None,
439            a2a_runtime: None,
440            a2a_store: None,
441            a2a_card_source: None,
442            approval_gate: None,
443        }
444    }
445
446    pub fn with_shared_memgine(mut self, engine: Arc<Mutex<car_memgine::MemgineEngine>>) -> Self {
447        self.shared_memgine = Some(engine);
448        self
449    }
450
451    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
452        self.inference = Some(engine);
453        self
454    }
455
456    /// Plug in an embedder-supplied runtime for the A2A dispatcher.
457    /// Use case: tokhn-daemon wants peers to see its OPA preflight
458    /// tooling, not just CAR's `register_agent_basics` defaults.
459    pub fn with_a2a_runtime(mut self, runtime: Arc<car_engine::Runtime>) -> Self {
460        self.a2a_runtime = Some(runtime);
461        self
462    }
463
464    /// Plug in an embedder-supplied task store for the A2A
465    /// dispatcher. Use case: tokhn's polling-friendly persistent
466    /// store keyed by their session id.
467    pub fn with_a2a_store(mut self, store: Arc<dyn car_a2a::TaskStore>) -> Self {
468        self.a2a_store = Some(store);
469        self
470    }
471
472    /// Plug in an embedder-supplied agent card factory. The factory
473    /// is invoked on every `agent/getAuthenticatedExtendedCard`
474    /// dispatch, so embedders can reflect runtime tool changes.
475    pub fn with_a2a_card_source(mut self, source: Arc<car_a2a::AgentCardSource>) -> Self {
476        self.a2a_card_source = Some(source);
477        self
478    }
479
480    /// Override the approval-gate policy. Pass
481    /// [`ApprovalGate::disabled`] to skip the gate entirely (only
482    /// appropriate when no untrusted caller can reach the WS port);
483    /// pass a customised [`ApprovalGate`] to add or remove methods
484    /// or to change the timeout.
485    pub fn with_approval_gate(mut self, gate: ApprovalGate) -> Self {
486        self.approval_gate = Some(gate);
487        self
488    }
489}
490
/// Global server state shared across all connections.
pub struct ServerState {
    /// Journal directory; see [`ServerStateConfig::journal_dir`].
    pub journal_dir: PathBuf,
    /// Live client sessions, presumably keyed by
    /// `ClientSession::client_id` — confirm in `create_session`.
    pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
    /// Inference engine. Pre-filled by `with_config` when the embedder
    /// supplies one via [`ServerStateConfig::with_inference`]; otherwise
    /// left empty (presumably initialized on first use).
    pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
    /// Shared host-state handle.
    pub host: Arc<crate::host::HostState>,
    /// When `Some`, `create_session` clones this handle into every new
    /// `ClientSession.memgine` — embedders that want a single shared
    /// memgine across all WS sessions set this. Standalone car-server
    /// leaves it `None`, which gives each session its own engine
    /// (preserving today's behavior).
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Process-wide voice session registry. Each
    /// `voice.transcribe_stream.start` call registers its own per-client
    /// [`WsVoiceEventSink`] so events route back to the originating WS
    /// connection only.
    pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
    /// Process-wide meeting registry. Meeting ids are global; each
    /// meeting binds to the originating client's WS for upstream
    /// events but persists transcripts to the resolved
    /// `.car/meetings/<id>/` regardless of which client started it.
    pub meetings: Arc<car_meeting::MeetingRegistry>,
    /// Process-wide A2UI surface store. Agent-produced surfaces are
    /// visible to every host UI subscriber, independent of the
    /// WebSocket session that applied the update.
    pub a2ui: car_a2ui::A2uiSurfaceStore,
    /// In-process UI-improvement agent. Invoked from
    /// `handle_a2ui_render_report` with each inbound report; returned
    /// `Decision::Patch` envelopes are applied via the standard
    /// `apply_a2ui_envelope` path so all subscribers see the patch.
    /// Held in an `Arc` so every handler shares one agent instance and
    /// its interior state survives across handler calls.
    pub ui_agent: Arc<car_ui_agent::UIImprovementAgent>,
    /// Per-surface oscillation detector for the UI-improvement
    /// loop. Sits between the agent's `Decision::Patch` and the
    /// apply path so A→B→A patch cycles get cooled down without
    /// the agent itself having to track history. neo's review:
    /// "controllers use workqueue backoff; reconcilers stay
    /// stateless."
    pub ui_agent_oscillation: Arc<crate::ui_agent_loop::OscillationDetector>,
    /// Per-surface iteration budget. Backstop against runaway
    /// loops the oscillation detector misses — caps total agent-
    /// driven patches per surface at `DEFAULT_MAX_ITERATIONS`.
    pub ui_agent_budget: Arc<crate::ui_agent_loop::IterationBudget>,
    /// Process-wide concurrency gate for inference RPC handlers. Sized
    /// from host RAM at startup, overridable via
    /// [`crate::admission::ENV_MAX_CONCURRENT`]. Without this, N
    /// concurrent users multiply KV-cache and activation memory and
    /// take the host out (#114-adjacent: filed alongside the daemon
    /// always-on rework). The semaphore lives on `ServerState` so it
    /// is shared across every WebSocket session in the same process.
    pub admission: Arc<crate::admission::InferenceAdmission>,
    /// Server-side A2A continuation auth keyed by A2UI surface id.
    /// Kept out of `A2uiSurface.owner` so host renderers never see
    /// bearer/API-key material.
    pub a2ui_route_auth: Mutex<HashMap<String, A2aRouteAuth>>,
    /// Lifecycle-managed agents — declarative manifest at
    /// `~/.car/agents.json` driving spawn/restart/stop. Closes
    /// Parslee-ai/car-releases#27. Lazy-initialized so embedders that
    /// don't want process supervision don't pay the disk-touch cost
    /// at server start.
    pub supervisor: std::sync::OnceLock<Arc<car_registry::supervisor::Supervisor>>,
    /// Manifest path this daemon is *observing* but does NOT own.
    /// Set by `car-server` when boot-time supervisor construction
    /// fails with [`car_registry::supervisor::SupervisorError::AlreadyRunning`]
    /// — another car-server process on the host holds the exclusive
    /// lock on this manifest. In that state, `supervisor()` returns a
    /// clear "observe-only" error so mutation handlers refuse
    /// (preventing the duplicate-spawn bug from
    /// Parslee-ai/car-releases#44), while read-only handlers
    /// (`agents.list`, `agents.health`) fall back to
    /// [`car_registry::supervisor::Supervisor::list_from_manifest`] /
    /// [`car_registry::supervisor::Supervisor::health_from_manifest`]
    /// so operators can still inspect what the primary daemon is
    /// supervising.
    pub observer_manifest_path: std::sync::OnceLock<PathBuf>,
    /// In-core A2A dispatcher — embedders that consume `car-server-core`
    /// get A2A reachability "for free" without standing up a separate
    /// HTTP listener. Closes Parslee-ai/car-releases#28. Lazy-init so
    /// the embedder can override the runtime / task store / agent card
    /// via [`ServerStateConfig::with_a2a_runtime`] etc. before the
    /// first dispatch.
    pub a2a_dispatcher: std::sync::OnceLock<Arc<car_a2a::A2aDispatcher>>,
    /// WS clients subscribed to A2UI envelope events. After every
    /// successful `a2ui.apply` / `a2ui.ingest`, the resulting
    /// `A2uiApplyResult` is broadcast to every subscriber as an
    /// `a2ui.event` JSON-RPC notification. Closes
    /// Parslee-ai/car-releases#29. Subscribers register via the
    /// `a2ui/subscribe` method and are auto-cleaned on WS disconnect.
    pub a2ui_subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
    /// Per-launch auth token. When `Some`, the WS dispatcher rejects
    /// non-auth methods on unauthenticated sessions until the client
    /// calls `session.auth` with the matching value. When `None`,
    /// auth is disabled and every connection works as before. Set
    /// at startup by `car-server` unless `--no-auth` is passed
    /// (default flipped 2026-05); embedders that want to enable
    /// auth call [`ServerState::install_auth_token`]. Closes
    /// Parslee-ai/car-releases#32.
    pub auth_token: std::sync::OnceLock<String>,
    /// Parslee cloud identity loaded from the user's OS keychain at
    /// daemon startup when `car auth login` has been completed.
    pub parslee_session: std::sync::OnceLock<crate::parslee_auth::ParsleeSession>,
    /// `agent_id -> client_id` map of currently-attached lifecycle
    /// agents (#169). Populated by the `session.auth` handler when a
    /// supervised child presents its `agent_id` + per-agent token;
    /// drained on disconnect by `remove_session`. Single-claim:
    /// a second connection presenting the same `agent_id` is
    /// rejected so the daemon-side per-agent state stays unambiguous.
    pub attached_agents: Mutex<HashMap<String, String>>,
    /// `agent_id -> persistent memgine` map (#170). Lazy-loaded on
    /// first connection per id from `~/.car/memory/agents/<id>.jsonl`,
    /// retained across daemon restart, surviving any single
    /// disconnect/reconnect of the supervised child. Connections
    /// that auth without an `agent_id` (browser, host, ad-hoc CLI)
    /// keep the per-WS ephemeral memgine on `ClientSession.memgine`
    /// — no behaviour change.
    pub agent_memgines: Mutex<HashMap<String, Arc<Mutex<car_memgine::MemgineEngine>>>>,
    /// In-flight `agents.chat` sessions keyed by `session_id`. See
    /// [`ChatSession`] for shape. Populated by `agents.chat`,
    /// cleared on terminal `agent.chat.event` or
    /// `agents.chat.cancel`. Disconnect cleanup happens in
    /// `remove_session` — any in-flight session bound to either the
    /// disconnecting host or agent client is dropped so subsequent
    /// stray notifications from a respawned agent fall on the floor
    /// rather than racing into a stale stream.
    pub chat_sessions: Mutex<HashMap<String, ChatSession>>,
    /// Bound MCP HTTP-streamable URL (e.g.
    /// `"http://127.0.0.1:9102/mcp"`) — `car-server` installs this
    /// after binding the listener. Used by the
    /// `agents.invoke_external` handler to default
    /// `InvokeOptions.mcp_endpoint` so external agents
    /// (Claude Code today) load the daemon's CAR namespace via
    /// `--mcp-config` automatically. `None` when MCP isn't bound
    /// (e.g. `--mcp-bind disabled`).
    pub mcp_url: std::sync::OnceLock<String>,
    /// Registry of connected MCP SSE sessions. Populated alongside
    /// [`Self::mcp_url`] when `car-server` boots the MCP listener. Public
    /// so handlers can call `crate::mcp::push_to_session` to send
    /// server-initiated requests to a specific MCP-connected
    /// client (MCP-3 foundation; MCP-3b will wire host-owned tool
    /// dispatch through this).
    pub mcp_sessions: std::sync::OnceLock<Arc<crate::mcp::SessionMap>>,
    /// Approval gate for high-risk WS methods (audit 2026-05). The
    /// gate intercepts `automation.run_applescript`,
    /// `automation.shortcuts.run`, `messages.send`, `mail.send`, and
    /// `vision.ocr` before they dispatch, raises a
    /// `host.create_approval` for the user to act on, and waits
    /// (with a timeout) for `host.resolve_approval`. Approve →
    /// dispatch continues; deny / timeout → JSON-RPC error code
    /// `-32003`. The set of gated methods and the wait timeout are
    /// embedder-overridable via
    /// [`ServerStateConfig::with_approval_gate`].
    pub approval_gate: ApprovalGate,
    /// A2A-runtime / store / card factory carried over from the
    /// embedder's [`ServerStateConfig`]. Consumed lazily on first
    /// `a2a_dispatcher()` call so embedders can construct
    /// `ServerState` without paying the runtime spin-up cost when
    /// they don't actually use the A2A surface.
    pub(crate) a2a_runtime: std::sync::Mutex<Option<Arc<car_engine::Runtime>>>,
    /// Embedder-supplied A2A task store; same lifecycle as `a2a_runtime`.
    pub(crate) a2a_store: std::sync::Mutex<Option<Arc<dyn car_a2a::TaskStore>>>,
    /// Embedder-supplied agent card factory; same lifecycle as
    /// `a2a_runtime`.
    pub(crate) a2a_card_source: std::sync::Mutex<Option<Arc<car_a2a::AgentCardSource>>>,
}
653
654impl ServerState {
655    /// Constructor for the standalone `car-server` binary. Each WS
656    /// connection gets its own per-session memgine — matches the
657    /// pre-extraction default and is correct for a single-process
658    /// daemon serving one user at a time.
659    ///
660    /// **Embedders must not call this.** It silently leaves
661    /// `shared_memgine = None`, which re-introduces the dual-memgine
662    /// bug U7 was created to prevent (one engine in the embedder, a
663    /// fresh one inside every WS session). Embedders use
664    /// [`ServerState::embedded`] instead, which makes the shared
665    /// engine handle a required argument so it cannot be forgotten.
666    pub fn standalone(journal_dir: PathBuf) -> Self {
667        Self::with_config(ServerStateConfig::new(journal_dir))
668    }
669
670    /// Constructor for embedders (e.g. `tokhn-daemon`). The shared
671    /// memgine handle is **required**: every WS session created by
672    /// this state will reuse the same engine, preventing the
673    /// dual-memgine bug.
674    ///
675    /// For embedders that also want to inject a pre-warmed inference
676    /// engine or other advanced wiring, build a [`ServerStateConfig`]
677    /// directly and call [`ServerState::with_config`].
678    pub fn embedded(
679        journal_dir: PathBuf,
680        shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
681    ) -> Self {
682        Self::with_config(ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine))
683    }
684
685    /// Build a `ServerState` from a [`ServerStateConfig`] — the path
686    /// embedders use when they need to inject a shared memgine *and*
687    /// a pre-warmed inference engine, or any other advanced wiring
688    /// the convenience constructors don't cover.
689    pub fn with_config(cfg: ServerStateConfig) -> Self {
690        let inference = std::sync::OnceLock::new();
691        if let Some(eng) = cfg.inference {
692            // OnceLock::set returns Err if already set — fresh OnceLock
693            // means it's empty, so this is infallible here.
694            let _ = inference.set(eng);
695        }
696        let voice_sessions = Arc::new(car_voice::VoiceSessionRegistry::new());
697        // Reap sessions whose clients dropped without calling
698        // voice.transcribe_stream.stop (WS disconnect, process exit,
699        // etc.). Listener handles otherwise leak for the daemon's
700        // lifetime. `with_config` is sync but always called from the
701        // `#[tokio::main]` entry point, so `Handle::try_current()`
702        // inside `start_sweeper` finds the runtime.
703        voice_sessions.start_sweeper();
704        // UI-improvement agent is pure decision logic — no I/O, no
705        // persistence handle. Memgine ingest of strategy outcomes is
706        // the caller's responsibility (handler.rs after a successful
707        // Decision::Patch). Keeps the agent crate Mutex-flavor
708        // agnostic so it can compose with std/tokio mutex callers.
709        let ui_agent = Arc::new(car_ui_agent::UIImprovementAgent::with_default_strategies());
710        let ui_agent_oscillation = Arc::new(crate::ui_agent_loop::OscillationDetector::new());
711        let ui_agent_budget = Arc::new(crate::ui_agent_loop::IterationBudget::new());
712        Self {
713            journal_dir: cfg.journal_dir,
714            sessions: Mutex::new(HashMap::new()),
715            inference,
716            host: Arc::new(crate::host::HostState::new()),
717            shared_memgine: cfg.shared_memgine,
718            voice_sessions,
719            meetings: Arc::new(car_meeting::MeetingRegistry::new()),
720            a2ui: car_a2ui::A2uiSurfaceStore::new(),
721            ui_agent,
722            ui_agent_oscillation,
723            ui_agent_budget,
724            admission: Arc::new(crate::admission::InferenceAdmission::new()),
725            a2ui_route_auth: Mutex::new(HashMap::new()),
726            supervisor: std::sync::OnceLock::new(),
727            observer_manifest_path: std::sync::OnceLock::new(),
728            a2a_dispatcher: std::sync::OnceLock::new(),
729            a2a_runtime: std::sync::Mutex::new(cfg.a2a_runtime),
730            a2a_store: std::sync::Mutex::new(cfg.a2a_store),
731            a2a_card_source: std::sync::Mutex::new(cfg.a2a_card_source),
732            a2ui_subscribers: Mutex::new(HashMap::new()),
733            auth_token: std::sync::OnceLock::new(),
734            parslee_session: std::sync::OnceLock::new(),
735            attached_agents: Mutex::new(HashMap::new()),
736            agent_memgines: Mutex::new(HashMap::new()),
737            chat_sessions: Mutex::new(HashMap::new()),
738            mcp_url: std::sync::OnceLock::new(),
739            mcp_sessions: std::sync::OnceLock::new(),
740            approval_gate: cfg.approval_gate.unwrap_or_default(),
741        }
742    }
743
744    /// Enable the per-launch auth handshake. After this call, every
745    /// new WS connection must call `session.auth` with `token` as
746    /// the first frame; otherwise the connection is closed. Called
747    /// by `car-server` at startup unless `--no-auth` is set
748    /// (default flipped 2026-05); embedders supply their own token
749    /// if they want the same posture. Returns `Err(token)` when
750    /// auth was already installed.
751    pub fn install_auth_token(&self, token: String) -> Result<(), String> {
752        self.auth_token.set(token)
753    }
754
755    pub fn install_parslee_session(
756        &self,
757        session: crate::parslee_auth::ParsleeSession,
758    ) -> Result<(), crate::parslee_auth::ParsleeSession> {
759        self.parslee_session.set(session)
760    }
761
762    /// Install the bound MCP URL after car-server's listener is up.
763    /// Idempotent on the first call; subsequent calls are accepted
764    /// silently (matches the supervisor / a2a_dispatcher install
765    /// idiom). Returns `Err(())` when an MCP URL was already
766    /// installed — embedders should treat this as "another
767    /// component beat us to it" and use whichever value is now set.
768    pub fn install_mcp_url(&self, url: String) -> Result<(), String> {
769        self.mcp_url.set(url)
770    }
771
772    /// Install the MCP SSE session registry. Pairs with
773    /// [`install_mcp_url`] — both come from the same `start_mcp`
774    /// call and either both get installed or neither does (the
775    /// daemon binds them together).
776    pub fn install_mcp_sessions(
777        &self,
778        sessions: Arc<crate::mcp::SessionMap>,
779    ) -> Result<(), Arc<crate::mcp::SessionMap>> {
780        self.mcp_sessions.set(sessions)
781    }
782
783    /// Lazy-initialize and return the agent supervisor. The first
784    /// call constructs a [`car_registry::supervisor::Supervisor`] backed by
785    /// `~/.car/agents.json` + `~/.car/logs/`. Embedders that need a
786    /// non-default location should call
787    /// [`ServerState::install_supervisor`] before any handler runs.
788    ///
789    /// In observer mode (set via [`install_observer_manifest`]),
790    /// returns a clear error mentioning the manifest path the
791    /// primary daemon owns. This prevents the second daemon from
792    /// re-attempting `user_default()` (which would also fail with
793    /// `AlreadyRunning`) on every WS call, and gives mutation
794    /// handlers a stable refusal path. Read-only handlers
795    /// (`agents.list`, `agents.health`) should call
796    /// [`Self::observer_manifest_path`] first and fall back to
797    /// [`car_registry::supervisor::Supervisor::list_from_manifest`] /
798    /// `health_from_manifest` when set. Closes
799    /// Parslee-ai/car-releases#44.
800    pub fn supervisor(&self) -> Result<Arc<car_registry::supervisor::Supervisor>, String> {
801        if let Some(s) = self.supervisor.get() {
802            return Ok(s.clone());
803        }
804        if let Some(p) = self.observer_manifest_path.get() {
805            return Err(format!(
806                "this car-server is observe-only — another car-server process \
807                 holds the supervisor lock for {}. Mutations refuse here; route \
808                 them to the primary daemon, or stop the other car-server first.",
809                p.display()
810            ));
811        }
812        let s = car_registry::supervisor::Supervisor::user_default()
813            .map(Arc::new)
814            .map_err(|e| e.to_string())?;
815        // OnceLock::set returns the original arg back on collision —
816        // a concurrent caller racing through user_default. Take
817        // whichever wins.
818        let _ = self.supervisor.set(s);
819        Ok(self.supervisor.get().expect("set or pre-existing").clone())
820    }
821
822    /// Replace the lazy default with a caller-supplied supervisor.
823    /// Returns `Err(())` when a supervisor was already installed.
824    /// Used by the standalone `car-server` binary to call
825    /// `start_all()` on a known-good handle without paying the
826    /// lazy-init lookup cost.
827    pub fn install_supervisor(
828        &self,
829        supervisor: Arc<car_registry::supervisor::Supervisor>,
830    ) -> Result<(), Arc<car_registry::supervisor::Supervisor>> {
831        self.supervisor.set(supervisor)
832    }
833
834    /// Non-acquiring read of the currently-installed supervisor.
835    /// Unlike [`supervisor`](Self::supervisor), this does NOT lazy-
836    /// init via `user_default()` — it returns `None` instead of
837    /// constructing a fresh `Supervisor` and acquiring the
838    /// `<manifest>.lock` as a side effect. Use this from read-only
839    /// metadata paths (`host.subscribe` identity, status surfaces)
840    /// where causing lock acquisition on observation would be a
841    /// Heisenberg subscribe — the act of asking "do you own the
842    /// lock?" must not be the act of taking it.
843    pub fn supervisor_if_installed(&self) -> Option<Arc<car_registry::supervisor::Supervisor>> {
844        self.supervisor.get().cloned()
845    }
846
847    /// Mark this daemon as *observing* a manifest owned by another
848    /// car-server process. After this call, `supervisor()` returns
849    /// an "observe-only" error and read-only handlers
850    /// (`agents.list`, `agents.health`) fall back to the static
851    /// `Supervisor::list_from_manifest` / `health_from_manifest`
852    /// paths. Idempotent — subsequent calls with the same path are
853    /// no-ops; a different path returns `Err(())`. Closes
854    /// Parslee-ai/car-releases#44.
855    pub fn install_observer_manifest(&self, path: PathBuf) -> Result<(), PathBuf> {
856        self.observer_manifest_path.set(path)
857    }
858
859    /// Path of the manifest this daemon is observing but not
860    /// supervising. `None` when this daemon owns the supervisor
861    /// (the normal case) or when no manifest is configured at all
862    /// (no `HOME`, embedder didn't install one).
863    pub fn observer_manifest_path(&self) -> Option<&PathBuf> {
864        self.observer_manifest_path.get()
865    }
866
867    /// Lazy-initialize and return the in-core A2A dispatcher. The
868    /// first call constructs an [`car_a2a::A2aDispatcher`] from
869    /// either the embedder's overrides (set via
870    /// [`ServerStateConfig::with_a2a_runtime`] / `with_a2a_store` /
871    /// `with_a2a_card_source`) or sensible defaults: a fresh
872    /// `Runtime` with `register_agent_basics` registered, an
873    /// `InMemoryTaskStore`, and a card built from the runtime's
874    /// tool schemas advertising `ws://127.0.0.1:9100/` as the
875    /// public URL. Closes Parslee-ai/car-releases#28.
876    pub async fn a2a_dispatcher(&self) -> Arc<car_a2a::A2aDispatcher> {
877        if let Some(d) = self.a2a_dispatcher.get() {
878            return d.clone();
879        }
880
881        // Embedder overrides take precedence; fall back to defaults
882        // for each slot independently (so an embedder that only
883        // wants a custom card can leave the runtime + store at
884        // defaults). `Mutex::take()` consumes the slot so the
885        // defaults aren't reconstructed on a racing init that loses
886        // the OnceLock::set call below.
887        let runtime = self
888            .a2a_runtime
889            .lock()
890            .expect("a2a_runtime mutex poisoned")
891            .take();
892        let runtime = match runtime {
893            Some(r) => r,
894            None => {
895                let r = Arc::new(car_engine::Runtime::new());
896                r.register_agent_basics().await;
897                r
898            }
899        };
900
901        let store = self
902            .a2a_store
903            .lock()
904            .expect("a2a_store mutex poisoned")
905            .take()
906            .unwrap_or_else(|| Arc::new(car_a2a::InMemoryTaskStore::new()));
907
908        let card_source = self
909            .a2a_card_source
910            .lock()
911            .expect("a2a_card_source mutex poisoned")
912            .take();
913        let card_source = match card_source {
914            Some(c) => c,
915            None => {
916                let card = car_a2a::build_default_agent_card(
917                    &runtime,
918                    car_a2a::AgentCardConfig::minimal(
919                        "Common Agent Runtime",
920                        "Embedded CAR daemon — A2A v1.0 reachable over WebSocket JSON-RPC.",
921                        "ws://127.0.0.1:9100/",
922                        car_a2a::AgentProvider {
923                            organization: "Parslee".into(),
924                            url: Some("https://github.com/Parslee-ai/car".into()),
925                        },
926                    ),
927                )
928                .await;
929                Arc::new(move || card.clone()) as Arc<car_a2a::AgentCardSource>
930            }
931        };
932
933        let dispatcher = Arc::new(car_a2a::A2aDispatcher::new(runtime, store, card_source));
934        // OnceLock::set returns Err on race — accept whichever
935        // dispatcher won and clone-return that one.
936        let _ = self.a2a_dispatcher.set(dispatcher);
937        self.a2a_dispatcher
938            .get()
939            .expect("a2a_dispatcher set or pre-existing")
940            .clone()
941    }
942
943    pub async fn create_session(
944        &self,
945        client_id: &str,
946        channel: Arc<WsChannel>,
947    ) -> Arc<ClientSession> {
948        let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
949        let event_log = EventLog::with_journal(journal_path);
950
951        let executor = Arc::new(WsToolExecutor {
952            channel: channel.clone(),
953        });
954
955        let runtime = Runtime::new()
956            .with_event_log(event_log)
957            .with_executor(executor);
958
959        // If the embedder supplied a shared memgine, every session uses it.
960        // Otherwise each session gets its own — matches pre-extraction behavior.
961        let memgine = match &self.shared_memgine {
962            Some(eng) => eng.clone(),
963            None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
964        };
965
966        let session = Arc::new(ClientSession {
967            client_id: client_id.to_string(),
968            runtime: Arc::new(runtime),
969            channel,
970            host: self.host.clone(),
971            memgine,
972            browser: car_ffi_common::browser::BrowserSessionSlot::new(),
973            // When auth is disabled (no token installed), every
974            // session is "authenticated" by default — preserves the
975            // pre-#32 behaviour. When auth is enabled, the value is
976            // ignored on creation; the dispatcher's gate checks
977            // `ServerState::auth_token.is_some()` to decide whether
978            // to enforce.
979            authenticated: std::sync::atomic::AtomicBool::new(false),
980            agent_id: tokio::sync::Mutex::new(None),
981            bound_memgine: tokio::sync::Mutex::new(None),
982        });
983
984        self.sessions
985            .lock()
986            .await
987            .insert(client_id.to_string(), session.clone());
988
989        session
990    }
991
992    /// Remove a per-client session from the registry on disconnect.
993    /// Returns the removed session if present so callers can drop any
994    /// remaining strong refs (e.g. drain pending tool callbacks). Fix
995    /// for MULTI-4 / WS-3 — without this, `state.sessions` retains
996    /// `Arc<ClientSession>` for every connection that ever existed.
997    pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
998        let removed = self.sessions.lock().await.remove(client_id);
999        if let Some(session) = &removed {
1000            // #169: drop the agent_id → client_id binding so a
1001            // disconnected lifecycle agent can reconnect (or its
1002            // supervisor-respawned replacement can take the slot)
1003            // without colliding with the stale claim.
1004            let bound = session.agent_id.lock().await.clone();
1005            if let Some(id) = bound {
1006                let mut attached = self.attached_agents.lock().await;
1007                if attached.get(&id).map(String::as_str) == Some(client_id) {
1008                    attached.remove(&id);
1009                }
1010            }
1011            // Drop any in-flight `agents.chat` sessions bound to this
1012            // client — either side disconnecting orphans the stream,
1013            // and a respawned agent's stray `agent.chat.event`
1014            // notifications must not race into a stale routing entry.
1015            // See `docs/proposals/agent-chat-surface.md`.
1016            let bound_agent = session.agent_id.lock().await.clone();
1017            let mut chats = self.chat_sessions.lock().await;
1018            chats.retain(|_, s| {
1019                if s.host_client_id == client_id {
1020                    return false;
1021                }
1022                if let Some(agent_id) = &bound_agent {
1023                    if &s.agent_id == agent_id {
1024                        return false;
1025                    }
1026                }
1027                true
1028            });
1029        }
1030        removed
1031    }
1032}
1033
#[cfg(test)]
mod observer_mode_tests {
    use super::*;

    /// Create a fresh, empty journal directory under the cargo target
    /// dir (`CARGO_TARGET_DIR` if set, else `<workspace>/target`).
    ///
    /// The returned guard deletes the directory when dropped, so each
    /// test binds it *before* the `ServerState` built from its path.
    /// Locals drop in reverse declaration order, which keeps the
    /// directory alive for the state's whole lifetime without leaking
    /// a directory into `target/` on every run.
    fn journal_dir() -> tempfile::TempDir {
        let target = std::env::var_os("CARGO_TARGET_DIR")
            .map(PathBuf::from)
            .unwrap_or_else(|| {
                PathBuf::from(env!("CARGO_MANIFEST_DIR"))
                    .join("..")
                    .join("..")
                    .join("target")
            });
        // Best effort: if creation or canonicalization fails, fall
        // through and let `TempDir::new_in` report the real problem.
        std::fs::create_dir_all(&target).ok();
        let target = std::fs::canonicalize(&target).unwrap_or(target);
        tempfile::TempDir::new_in(&target).unwrap()
    }

    #[test]
    fn supervisor_returns_observer_error_when_marker_set() {
        // Closes Parslee-ai/car-releases#44: the second car-server on
        // a host installs the observer marker after `with_paths`
        // returns AlreadyRunning. Subsequent `state.supervisor()`
        // calls must return a clear "observe-only" error mentioning
        // the manifest path — they must NOT retry user_default()
        // (which would re-acquire the lock and likely also fail).
        let journal = journal_dir();
        let state = ServerState::standalone(journal.path().to_path_buf());
        let fake_manifest = PathBuf::from("/tmp/fake-manifest-for-test.json");
        state
            .install_observer_manifest(fake_manifest.clone())
            .expect("install_observer_manifest succeeds on fresh state");
        assert_eq!(state.observer_manifest_path(), Some(&fake_manifest));

        let err = state.supervisor().map(|_| ()).unwrap_err();
        assert!(
            err.contains("observe-only"),
            "error must mention observe-only mode: {err}"
        );
        assert!(
            err.contains("fake-manifest-for-test.json"),
            "error must surface the manifest path so operators know which daemon owns it: {err}"
        );
    }

    #[test]
    fn install_observer_manifest_is_idempotent_per_path_collision() {
        let journal = journal_dir();
        let state = ServerState::standalone(journal.path().to_path_buf());
        let p = PathBuf::from("/tmp/manifest-a.json");
        let q = PathBuf::from("/tmp/manifest-b.json");
        state.install_observer_manifest(p.clone()).unwrap();
        // A different path is rejected and handed back; the original
        // marker stays installed.
        let err = state.install_observer_manifest(q.clone()).unwrap_err();
        assert_eq!(err, q);
        assert_eq!(state.observer_manifest_path(), Some(&p));
    }

    #[test]
    fn supervisor_if_installed_does_not_lazy_init() {
        // The Heisenberg-subscribe guard: `host.subscribe`'s
        // identity path must use the non-acquiring read so a
        // purely observational client can't cause the daemon to
        // claim `<manifest>.lock` as a side effect of asking
        // about it. Fresh state has no supervisor installed.
        let journal = journal_dir();
        let state = ServerState::standalone(journal.path().to_path_buf());
        assert!(state.supervisor_if_installed().is_none());
        // observer_manifest_path should remain unset too — no
        // implicit init.
        assert!(state.observer_manifest_path().is_none());
    }
}