car_server_core/session.rs
1//! Server-side session state — shared across all connections.
2
3use car_engine::{Runtime, ToolExecutor};
4use car_eventlog::EventLog;
5use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
6use futures::Sink;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use tokio::sync::{oneshot, Mutex};
15use tokio_tungstenite::tungstenite::{Error as WsError, Message};
16
/// Type-erased WebSocket sink. The dispatch loop accepts either a
/// `WebSocketStream<TcpStream>` (the legacy car-server TCP listener)
/// or a `WebSocketStream<UnixStream>` (the daemon-as-default UDS
/// listener) — both implement `Sink<Message, Error = WsError>` after
/// the tungstenite handshake. Erasing the type here avoids cascading
/// a generic parameter through every WsChannel / Session / ServerState
/// touchpoint in the dispatcher.
///
/// Concretely this is a pinned, boxed trait object that is also
/// `Send + Unpin + 'static`; it is the type stored behind the mutex in
/// [`WsChannel::write`].
pub type WsSink = Pin<Box<dyn Sink<Message, Error = WsError> + Send + Unpin + 'static>>;
25
/// Server-side credentials for continuing an A2A-owned A2UI surface.
///
/// This intentionally lives outside `car_a2ui::A2uiSurfaceOwner` so
/// renderers can inspect surface ownership without receiving secrets.
///
/// Serialized as an internally tagged object: the serde attributes
/// put the camelCase variant name in a `type` field next to the
/// variant's own fields.
///
/// `Debug` is derived, so formatting a value prints the token / header
/// value verbatim — avoid logging instances of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum A2aRouteAuth {
    /// No credentials.
    None,
    /// A bearer token.
    /// NOTE(review): presumably sent as an `Authorization: Bearer …`
    /// header by the continuation path — confirm against the caller.
    Bearer { token: String },
    /// A single header, given as a name/value pair.
    Header { name: String, value: String },
}
37
/// Shared write half of the WebSocket, plus pending callback channels.
/// `write` is type-erased via [`WsSink`] so the dispatcher can run
/// against any transport-specific WebSocketStream (TCP or UDS today;
/// axum-bridged in future) without templatizing every consumer.
pub struct WsChannel {
    /// Outbound sink. Guarded by an async mutex; writers in this file
    /// lock it for the duration of a single `send`.
    pub write: Mutex<WsSink>,
    /// Pending tool execution callbacks: request_id → oneshot sender
    pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
    /// Counter behind [`WsChannel::next_request_id`]; each call takes
    /// the current value and increments it.
    pub next_id: AtomicU64,
}
48
49impl WsChannel {
50 pub fn next_request_id(&self) -> String {
51 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
52 format!("cb-{}", id)
53 }
54
55 /// Test-only stub that returns a WsChannel whose write sink drains
56 /// to nowhere. Used by `host.rs` tests that need a real
57 /// `Arc<WsChannel>` in the subscribers map (to exercise membership
58 /// checks like the cross-session resolve fan-out) without
59 /// constructing a tungstenite handshake. Never writes are
60 /// performed against this stub; if anything tries, the drain sink
61 /// quietly absorbs.
62 #[cfg(test)]
63 pub fn test_stub() -> Self {
64 use futures::sink::SinkExt;
65 let sink: WsSink = Box::pin(futures::sink::drain().sink_map_err(|_| {
66 tokio_tungstenite::tungstenite::Error::ConnectionClosed
67 }));
68 WsChannel {
69 write: Mutex::new(sink),
70 pending: Mutex::new(HashMap::new()),
71 next_id: AtomicU64::new(0),
72 }
73 }
74}
75
/// In-flight `agents.chat` session bookkeeping. Created when a host
/// client calls `agents.chat`, removed when the agent emits a terminal
/// `agent.chat.event` (`kind: "done"` or `"error"`), when either side
/// disconnects, or when the host cancels via `agents.chat.cancel`.
///
/// The session_id is host-supplied (or server-generated when omitted)
/// and threads through every `agent.chat.event` notification so the
/// server can route streamed deltas back to the originating host
/// without needing per-session subscriptions. See
/// `docs/proposals/agent-chat-surface.md` for the wire contract.
///
/// The session id itself is not stored here; it is the key of the
/// `ServerState::chat_sessions` map that holds these records.
#[derive(Debug, Clone)]
pub struct ChatSession {
    /// Agent that owns this chat — populated from
    /// `attached_agents` at `agents.chat` dispatch time.
    pub agent_id: String,
    /// Client id of the host that issued `agents.chat`. The server
    /// forwards `agent.chat.event` notifications back to *this* host
    /// only, so two CarHost windows chatting with the same agent are
    /// independent streams.
    pub host_client_id: String,
    /// Unix-seconds creation time — used by the future stale-session
    /// sweeper to drop sessions whose agent died without emitting a
    /// terminal event.
    pub created_at: u64,
}
101
/// Tool executor that sends callbacks to the client over WebSocket.
pub struct WsToolExecutor {
    /// Connection the `tools.execute` requests are written to; its
    /// `pending` map holds the response sender for each outstanding
    /// callback.
    pub channel: Arc<WsChannel>,
}
106
107#[async_trait::async_trait]
108impl ToolExecutor for WsToolExecutor {
109 async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
110 // Legacy callers that don't have a proposal-level Action.id
111 // (e.g. internal `executor.execute` chains in tests) — emit an
112 // empty action_id so the client-side handler can still see the
113 // payload shape and decide whether to fail loudly.
114 self.execute_with_action(tool, params, "").await
115 }
116
117 async fn execute_with_action(
118 &self,
119 tool: &str,
120 params: &Value,
121 action_id: &str,
122 ) -> Result<Value, String> {
123 use futures::SinkExt;
124
125 // The JSON-RPC request id is the daemon's callback-routing key
126 // (used by the pending-response map below). The `action_id`
127 // FIELD on the payload is the originating proposal Action.id
128 // surfaced to the host so process-wide handlers can route
129 // concurrent callbacks back to per-call dispatchers
130 // (Parslee-ai/car-releases#43 follow-up). They serve different
131 // purposes and must stay distinct: routing id is daemon-side,
132 // action id is host-side.
133 let request_id = self.channel.next_request_id();
134
135 let callback = ToolExecuteRequest {
136 action_id: action_id.to_string(),
137 tool: tool.to_string(),
138 parameters: params.clone(),
139 timeout_ms: None,
140 attempt: 1,
141 };
142
143 // Create a oneshot channel for the response
144 let (tx, rx) = oneshot::channel();
145 self.channel
146 .pending
147 .lock()
148 .await
149 .insert(request_id.clone(), tx);
150
151 // Send the callback to the client as a JSON-RPC request
152 let rpc_request = serde_json::json!({
153 "jsonrpc": "2.0",
154 "method": "tools.execute",
155 "params": callback,
156 "id": request_id,
157 });
158
159 let msg = Message::Text(
160 serde_json::to_string(&rpc_request)
161 .map_err(|e| e.to_string())?
162 .into(),
163 );
164 self.channel
165 .write
166 .lock()
167 .await
168 .send(msg)
169 .await
170 .map_err(|e| format!("failed to send tool callback: {}", e))?;
171
172 // Wait for the client to respond (with a timeout)
173 let response = tokio::time::timeout(std::time::Duration::from_secs(60), rx)
174 .await
175 .map_err(|_| format!("tool '{}' callback timed out (60s)", tool))?
176 .map_err(|_| format!("tool '{}' callback channel closed", tool))?;
177
178 if let Some(err) = response.error {
179 Err(err)
180 } else {
181 Ok(response.output.unwrap_or(Value::Null))
182 }
183 }
184}
185
/// Voice event sink that forwards events to a specific WebSocket client
/// as `voice.event` JSON-RPC notifications.
///
/// Each `voice.transcribe_stream.start` call constructs one of these
/// bound to the originating client's [`WsChannel`], so a client only
/// receives events for sessions it started.
pub struct WsVoiceEventSink {
    /// Connection whose write half receives the `voice.event`
    /// notifications.
    pub channel: Arc<WsChannel>,
}
195
196impl car_voice::VoiceEventSink for WsVoiceEventSink {
197 fn send(&self, session_id: &str, event_json: String) {
198 use futures::SinkExt;
199 let channel = self.channel.clone();
200 let session_id = session_id.to_string();
201 tokio::spawn(async move {
202 let payload: Value = serde_json::from_str(&event_json)
203 .unwrap_or_else(|_| Value::String(event_json.clone()));
204 let notification = serde_json::json!({
205 "jsonrpc": "2.0",
206 "method": "voice.event",
207 "params": {
208 "session_id": session_id,
209 "event": payload,
210 },
211 });
212 let Ok(text) = serde_json::to_string(¬ification) else {
213 return;
214 };
215 let _ = channel
216 .write
217 .lock()
218 .await
219 .send(Message::Text(text.into()))
220 .await;
221 });
222 }
223}
224
/// Per-meeting fanout sink that ingests transcript text into a
/// session-scoped memgine using the `Arc<tokio::sync::Mutex<...>>`
/// wrapper, then forwards every event upstream untouched.
///
/// Lives here (not in `car-ffi-common`) because the engine handle uses
/// `tokio::sync::Mutex` per the "one-wrapper rule" — the FFI-common
/// `MeetingMemgineFanout` still uses `std::sync::Mutex` for the NAPI/
/// PyO3 bindings, which keep their sync wrappers. Each binding owns the
/// fanout that matches its lock primitive; the parsing/formatting logic
/// itself is shared via [`car_meeting::extract_transcript_for_ingest`].
///
/// `send` is called from the voice drain task and must be non-blocking,
/// so the lock acquisition is shipped to a `tokio::spawn`. Transcript
/// events are independent so reordering across spawned tasks is fine.
pub struct WsMemgineIngestSink {
    /// Meeting id handed to `extract_transcript_for_ingest` for each
    /// event.
    pub meeting_id: String,
    /// Engine that receives the extracted transcript text via
    /// `ingest_conversation`.
    pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Sink every event is forwarded to, whether or not it was ingested.
    pub upstream: Arc<dyn car_voice::VoiceEventSink>,
}
244
245impl car_voice::VoiceEventSink for WsMemgineIngestSink {
246 fn send(&self, voice_session_id: &str, event_json: String) {
247 if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
248 if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
249 &value,
250 &self.meeting_id,
251 voice_session_id,
252 ) {
253 let engine = self.engine.clone();
254 tokio::spawn(async move {
255 let mut guard = engine.lock().await;
256 guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
257 });
258 }
259 }
260 self.upstream.send(voice_session_id, event_json);
261 }
262}
263
/// Per-client session.
pub struct ClientSession {
    /// Identifier of this client connection.
    /// NOTE(review): presumably the key under which the session is
    /// stored in `ServerState::sessions` — confirm against
    /// `create_session`.
    pub client_id: String,
    /// Engine runtime associated with this session.
    pub runtime: Arc<Runtime>,
    /// Write half and pending-callback table for this connection.
    pub channel: Arc<WsChannel>,
    /// Host state handle.
    /// NOTE(review): presumably the same `Arc` as `ServerState::host`
    /// — confirm against `create_session`.
    pub host: Arc<crate::host::HostState>,
    /// Memgine handle. Wrapped in `tokio::sync::Mutex` so dispatcher
    /// handlers can hold the lock across `.await` points without
    /// risking poisoning. Migrated from `std::sync::Mutex` in the
    /// car-server-core extraction (U1) per the "one-wrapper rule".
    pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Lazy browser session — first `browser.run` call launches Chromium,
    /// subsequent calls reuse it so element IDs resolve across invocations
    /// within the same WebSocket connection.
    pub browser: car_ffi_common::browser::BrowserSessionSlot,
    /// Per-connection auth state. Starts `false`; flips to `true`
    /// after a successful `session.auth` handshake. Always considered
    /// authenticated when `ServerState::auth_token` is unset (auth
    /// disabled). Closes Parslee-ai/car-releases#32.
    pub authenticated: std::sync::atomic::AtomicBool,
    /// Bound agent identity (#169). `Some(id)` once a lifecycle-agent
    /// child has called `session.auth { token, agent_id }` and the
    /// supervisor confirmed `agent_id` is supervised + token matches.
    /// Used by `agents.list` to surface which managed agents have
    /// actually attached vs. just being marked `Running` at the
    /// process level. Cleared at disconnect by `remove_session`.
    pub agent_id: tokio::sync::Mutex<Option<String>>,
    /// Bound persistent memgine (#170). `Some` after `session.auth`
    /// successfully attaches the connection to a daemon-owned
    /// per-agent memgine (paired with `agent_id`). Memory handlers
    /// route through [`ClientSession::effective_memgine`] which
    /// returns this when set, falling back to the ephemeral
    /// `memgine` field for browser/host/CLI connections.
    pub bound_memgine: tokio::sync::Mutex<Option<Arc<Mutex<car_memgine::MemgineEngine>>>>,
}
299
300impl ClientSession {
301 /// Returns the memgine handle the memory.* handlers should use:
302 /// the bound per-agent memgine when this session attached via
303 /// `session.auth { agent_id }` (#169 + #170), otherwise the
304 /// ephemeral per-WS memgine. Cheap (one async lock + Arc clone).
305 pub async fn effective_memgine(&self) -> Arc<Mutex<car_memgine::MemgineEngine>> {
306 if let Some(eng) = self.bound_memgine.lock().await.as_ref() {
307 return eng.clone();
308 }
309 self.memgine.clone()
310 }
311}
312
/// Approval-gate policy for high-risk WS methods.
///
/// Every method in `methods` must be acknowledged via
/// `host.resolve_approval` before the dispatcher will route the
/// request to its handler. The dispatcher waits up to `timeout` for
/// a resolution; on timeout (or any non-`approve` resolution) the
/// request fails with JSON-RPC error `-32003`.
///
/// Default: gate enabled, the macOS-automation surface
/// (`automation.run_applescript`, `automation.shortcuts.run`,
/// `messages.send`, `mail.send`, `vision.ocr`), 60-second timeout.
/// `car-server --no-approvals` (or embedders calling
/// [`ServerStateConfig::with_approval_gate`] with `enabled=false`)
/// turns it off — only appropriate when no untrusted caller can
/// reach the WS port.
#[derive(Debug, Clone)]
pub struct ApprovalGate {
    /// Master switch. When `false`, every method dispatches without
    /// raising an approval — the pre-2026-05 behaviour.
    pub enabled: bool,
    /// Methods that require approval. Match is by exact method-name
    /// string against the JSON-RPC `method` field.
    pub methods: std::collections::HashSet<String>,
    /// How long to wait for the user to resolve the approval before
    /// timing out and surfacing an error to the caller.
    pub timeout: std::time::Duration,
}

impl Default for ApprovalGate {
    /// Gate enabled, the five macOS-automation methods gated, 60-second
    /// timeout.
    fn default() -> Self {
        Self {
            enabled: true,
            methods: Self::DEFAULT_METHODS
                .iter()
                .map(|name| (*name).to_owned())
                .collect(),
            timeout: Self::DEFAULT_TIMEOUT,
        }
    }
}

impl ApprovalGate {
    /// Methods gated by [`ApprovalGate::default`].
    const DEFAULT_METHODS: [&'static str; 5] = [
        "automation.run_applescript",
        "automation.shortcuts.run",
        "messages.send",
        "mail.send",
        "vision.ocr",
    ];

    /// Wait timeout shared by [`ApprovalGate::default`] and
    /// [`ApprovalGate::disabled`].
    const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);

    /// Disable the gate entirely. Equivalent to passing
    /// `car-server --no-approvals`. Only appropriate when no
    /// untrusted caller can reach the WS port.
    ///
    /// The returned gate has an empty method set and keeps the
    /// 60-second timeout.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            methods: std::collections::HashSet::new(),
            timeout: Self::DEFAULT_TIMEOUT,
        }
    }

    /// `true` if this method must be acknowledged before dispatch.
    ///
    /// That is the case only when the gate is enabled *and* `method`
    /// is an exact member of `methods`.
    pub fn requires_approval(&self, method: &str) -> bool {
        self.enabled && self.methods.contains(method)
    }
}
381
/// Builder for constructing a [`ServerState`] with embedder-supplied
/// dependencies. Embedders (e.g. `tokhn-daemon`) use this to inject
/// their own memgine handle and other shared infrastructure; the
/// standalone `car-server` binary uses [`ServerState::standalone`]
/// which calls `with_config` under the hood.
pub struct ServerStateConfig {
    /// Journal directory; the only required setting. Copied verbatim
    /// into `ServerState::journal_dir` by `ServerState::with_config`.
    pub journal_dir: PathBuf,
    /// Optional pre-constructed memgine engine. When `None`, each
    /// `create_session` call builds a fresh engine; embedders that want
    /// to share a single engine across sessions can supply a clone of
    /// their `Arc<Mutex<MemgineEngine>>` here.
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Optional pre-constructed inference engine.
    pub inference: Option<Arc<car_inference::InferenceEngine>>,
    /// Optional embedder-supplied A2A runtime. Used by the in-core
    /// `A2aDispatcher` to execute peer-driven proposals. When `None`,
    /// the dispatcher uses a fresh `Runtime` with `register_agent_basics`
    /// — peer agents see CAR's built-in tools and nothing else,
    /// matching the behaviour of the standalone `start_a2a_listener`.
    pub a2a_runtime: Option<Arc<car_engine::Runtime>>,
    /// Optional embedder-supplied A2A task store. When `None`,
    /// defaults to `InMemoryTaskStore`. tokhn-style embedders that
    /// want a polling-friendly persistent store plug it in here.
    pub a2a_store: Option<Arc<dyn car_a2a::TaskStore>>,
    /// Optional embedder-supplied agent card factory. When `None`,
    /// the dispatcher serves a card built from the A2A runtime's
    /// tool schemas at construction time, advertising its public URL
    /// as `ws://127.0.0.1:9100/` (the WS surface the dispatcher itself
    /// is reachable on).
    pub a2a_card_source: Option<Arc<car_a2a::AgentCardSource>>,
    /// Approval-gate policy. When `None`, the dispatcher uses
    /// [`ApprovalGate::default`] (gate ON, the macOS-automation
    /// surface gated, 60s timeout). Pass
    /// [`ApprovalGate::disabled`] to opt out — only appropriate
    /// when no untrusted caller can reach the WS port.
    pub approval_gate: Option<ApprovalGate>,
}
416
417impl ServerStateConfig {
418 /// Minimal config suitable for the standalone car-server binary:
419 /// only the journal dir is required; everything else is lazily
420 /// constructed at first use.
421 pub fn new(journal_dir: PathBuf) -> Self {
422 Self {
423 journal_dir,
424 shared_memgine: None,
425 inference: None,
426 a2a_runtime: None,
427 a2a_store: None,
428 a2a_card_source: None,
429 approval_gate: None,
430 }
431 }
432
433 pub fn with_shared_memgine(mut self, engine: Arc<Mutex<car_memgine::MemgineEngine>>) -> Self {
434 self.shared_memgine = Some(engine);
435 self
436 }
437
438 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
439 self.inference = Some(engine);
440 self
441 }
442
443 /// Plug in an embedder-supplied runtime for the A2A dispatcher.
444 /// Use case: tokhn-daemon wants peers to see its OPA preflight
445 /// tooling, not just CAR's `register_agent_basics` defaults.
446 pub fn with_a2a_runtime(mut self, runtime: Arc<car_engine::Runtime>) -> Self {
447 self.a2a_runtime = Some(runtime);
448 self
449 }
450
451 /// Plug in an embedder-supplied task store for the A2A
452 /// dispatcher. Use case: tokhn's polling-friendly persistent
453 /// store keyed by their session id.
454 pub fn with_a2a_store(mut self, store: Arc<dyn car_a2a::TaskStore>) -> Self {
455 self.a2a_store = Some(store);
456 self
457 }
458
459 /// Plug in an embedder-supplied agent card factory. The factory
460 /// is invoked on every `agent/getAuthenticatedExtendedCard`
461 /// dispatch, so embedders can reflect runtime tool changes.
462 pub fn with_a2a_card_source(mut self, source: Arc<car_a2a::AgentCardSource>) -> Self {
463 self.a2a_card_source = Some(source);
464 self
465 }
466
467 /// Override the approval-gate policy. Pass
468 /// [`ApprovalGate::disabled`] to skip the gate entirely (only
469 /// appropriate when no untrusted caller can reach the WS port);
470 /// pass a customised [`ApprovalGate`] to add or remove methods
471 /// or to change the timeout.
472 pub fn with_approval_gate(mut self, gate: ApprovalGate) -> Self {
473 self.approval_gate = Some(gate);
474 self
475 }
476}
477
/// Global server state shared across all connections.
pub struct ServerState {
    /// Journal directory, copied from `ServerStateConfig::journal_dir`
    /// at construction.
    pub journal_dir: PathBuf,
    /// Live client sessions.
    /// NOTE(review): presumably keyed by `ClientSession::client_id` —
    /// confirm against `create_session`.
    pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
    /// Inference engine slot. Pre-populated by `with_config` when the
    /// config supplies an engine, otherwise left empty.
    /// NOTE(review): presumably filled lazily on first use when empty
    /// — confirm against the accessor.
    pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
    /// Host state; a fresh instance is created in `with_config`.
    pub host: Arc<crate::host::HostState>,
    /// When `Some`, `create_session` clones this handle into every new
    /// `ClientSession.memgine` — embedders that want a single shared
    /// memgine across all WS sessions set this. Standalone car-server
    /// leaves it `None`, which gives each session its own engine
    /// (preserving today's behavior).
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Process-wide voice session registry. Each
    /// `voice.transcribe_stream.start` call registers its own per-client
    /// [`WsVoiceEventSink`] so events route back to the originating WS
    /// connection only.
    pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
    /// Process-wide meeting registry. Meeting ids are global; each
    /// meeting binds to the originating client's WS for upstream
    /// events but persists transcripts to the resolved
    /// `.car/meetings/<id>/` regardless of which client started it.
    pub meetings: Arc<car_meeting::MeetingRegistry>,
    /// Process-wide A2UI surface store. Agent-produced surfaces are
    /// visible to every host UI subscriber, independent of the
    /// WebSocket session that applied the update.
    pub a2ui: car_a2ui::A2uiSurfaceStore,
    /// In-process UI-improvement agent. Invoked from
    /// `handle_a2ui_render_report` with each inbound report; returned
    /// `Decision::Patch` envelopes are applied via the standard
    /// `apply_a2ui_envelope` path so all subscribers see the patch.
    /// `Arc` so the agent's interior `DashMap` state survives across
    /// handler calls even when `ServerState` is cheap-cloned.
    pub ui_agent: Arc<car_ui_agent::UIImprovementAgent>,
    /// Per-surface oscillation detector for the UI-improvement
    /// loop. Sits between the agent's `Decision::Patch` and the
    /// apply path so A→B→A patch cycles get cooled down without
    /// the agent itself having to track history. neo's review:
    /// "controllers use workqueue backoff; reconcilers stay
    /// stateless."
    pub ui_agent_oscillation: Arc<crate::ui_agent_loop::OscillationDetector>,
    /// Per-surface iteration budget. Backstop against runaway
    /// loops the oscillation detector misses — caps total agent-
    /// driven patches per surface at `DEFAULT_MAX_ITERATIONS`.
    pub ui_agent_budget: Arc<crate::ui_agent_loop::IterationBudget>,
    /// Process-wide concurrency gate for inference RPC handlers. Sized
    /// from host RAM at startup, overridable via
    /// [`crate::admission::ENV_MAX_CONCURRENT`]. Without this, N
    /// concurrent users multiply KV-cache and activation memory and
    /// take the host out (#114-adjacent: filed alongside the daemon
    /// always-on rework). The semaphore lives on `ServerState` so it
    /// is shared across every WebSocket session in the same process.
    pub admission: Arc<crate::admission::InferenceAdmission>,
    /// Server-side A2A continuation auth keyed by A2UI surface id.
    /// Kept out of `A2uiSurface.owner` so host renderers never see
    /// bearer/API-key material.
    pub a2ui_route_auth: Mutex<HashMap<String, A2aRouteAuth>>,
    /// Lifecycle-managed agents — declarative manifest at
    /// `~/.car/agents.json` driving spawn/restart/stop. Closes
    /// Parslee-ai/car-releases#27. Lazy-initialized so embedders that
    /// don't want process supervision don't pay the disk-touch cost
    /// at server start.
    pub supervisor: std::sync::OnceLock<Arc<car_registry::supervisor::Supervisor>>,
    /// Manifest path this daemon is *observing* but does NOT own.
    /// Set by `car-server` when boot-time supervisor construction
    /// fails with [`car_registry::supervisor::SupervisorError::AlreadyRunning`]
    /// — another car-server process on the host holds the exclusive
    /// lock on this manifest. In that state, `supervisor()` returns a
    /// clear "observe-only" error so mutation handlers refuse
    /// (preventing the duplicate-spawn bug from
    /// Parslee-ai/car-releases#44), while read-only handlers
    /// (`agents.list`, `agents.health`) fall back to
    /// [`car_registry::supervisor::Supervisor::list_from_manifest`] /
    /// [`car_registry::supervisor::Supervisor::health_from_manifest`]
    /// so operators can still inspect what the primary daemon is
    /// supervising.
    pub observer_manifest_path: std::sync::OnceLock<PathBuf>,
    /// In-core A2A dispatcher — embedders that consume `car-server-core`
    /// get A2A reachability "for free" without standing up a separate
    /// HTTP listener. Closes Parslee-ai/car-releases#28. Lazy-init so
    /// the embedder can override the runtime / task store / agent card
    /// via [`ServerStateConfig::with_a2a_runtime`] etc. before the
    /// first dispatch.
    pub a2a_dispatcher: std::sync::OnceLock<Arc<car_a2a::A2aDispatcher>>,
    /// WS clients subscribed to A2UI envelope events. After every
    /// successful `a2ui.apply` / `a2ui.ingest`, the resulting
    /// `A2uiApplyResult` is broadcast to every subscriber as an
    /// `a2ui.event` JSON-RPC notification. Closes
    /// Parslee-ai/car-releases#29. Subscribers register via the
    /// `a2ui/subscribe` method and are auto-cleaned on WS disconnect.
    pub a2ui_subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
    /// Per-launch auth token. When `Some`, the WS dispatcher rejects
    /// non-auth methods on unauthenticated sessions until the client
    /// calls `session.auth` with the matching value. When `None`,
    /// auth is disabled and every connection works as before. Set
    /// at startup by `car-server` unless `--no-auth` is passed
    /// (default flipped 2026-05); embedders that want to enable
    /// auth call [`ServerState::install_auth_token`]. Closes
    /// Parslee-ai/car-releases#32.
    pub auth_token: std::sync::OnceLock<String>,
    /// Parslee cloud identity loaded from the user's OS keychain at
    /// daemon startup when `car auth login` has been completed.
    pub parslee_session: std::sync::OnceLock<crate::parslee_auth::ParsleeSession>,
    /// `agent_id -> client_id` map of currently-attached lifecycle
    /// agents (#169). Populated by the `session.auth` handler when a
    /// supervised child presents its `agent_id` + per-agent token;
    /// drained on disconnect by `remove_session`. Single-claim:
    /// a second connection presenting the same `agent_id` is
    /// rejected so the daemon-side per-agent state stays unambiguous.
    pub attached_agents: Mutex<HashMap<String, String>>,
    /// `agent_id -> persistent memgine` map (#170). Lazy-loaded on
    /// first connection per id from `~/.car/memory/agents/<id>.jsonl`,
    /// retained across daemon restart, surviving any single
    /// disconnect/reconnect of the supervised child. Connections
    /// that auth without an `agent_id` (browser, host, ad-hoc CLI)
    /// keep the per-WS ephemeral memgine on `ClientSession.memgine`
    /// — no behaviour change.
    pub agent_memgines: Mutex<HashMap<String, Arc<Mutex<car_memgine::MemgineEngine>>>>,
    /// In-flight `agents.chat` sessions keyed by `session_id`. See
    /// [`ChatSession`] for shape. Populated by `agents.chat`,
    /// cleared on terminal `agent.chat.event` or
    /// `agents.chat.cancel`. Disconnect cleanup happens in
    /// `remove_session` — any in-flight session bound to either the
    /// disconnecting host or agent client is dropped so subsequent
    /// stray notifications from a respawned agent fall on the floor
    /// rather than racing into a stale stream.
    pub chat_sessions: Mutex<HashMap<String, ChatSession>>,
    /// Bound MCP HTTP-streamable URL (e.g.
    /// `"http://127.0.0.1:9102/mcp"`) — `car-server` installs this
    /// after binding the listener. Used by the
    /// `agents.invoke_external` handler to default
    /// `InvokeOptions.mcp_endpoint` so external agents
    /// (Claude Code today) load the daemon's CAR namespace via
    /// `--mcp-config` automatically. `None` when MCP isn't bound
    /// (e.g. `--mcp-bind disabled`).
    pub mcp_url: std::sync::OnceLock<String>,
    /// Registry of connected MCP SSE sessions. Populated alongside
    /// [`ServerState::mcp_url`] when `car-server` boots the MCP
    /// listener. Public so handlers can call
    /// `crate::mcp::push_to_session` to send server-initiated
    /// requests to a specific MCP-connected client (MCP-3
    /// foundation; MCP-3b will wire host-owned tool dispatch
    /// through this).
    pub mcp_sessions: std::sync::OnceLock<Arc<crate::mcp::SessionMap>>,
    /// Approval gate for high-risk WS methods (audit 2026-05). The
    /// gate intercepts `automation.run_applescript`,
    /// `automation.shortcuts.run`, `messages.send`, `mail.send`, and
    /// `vision.ocr` before they dispatch, raises a
    /// `host.create_approval` for the user to act on, and waits
    /// (with a timeout) for `host.resolve_approval`. Approve →
    /// dispatch continues; deny / timeout → JSON-RPC error code
    /// `-32003`. The set of gated methods and the wait timeout are
    /// embedder-overridable via
    /// [`ServerStateConfig::with_approval_gate`].
    pub approval_gate: ApprovalGate,
    /// A2A-runtime / store / card factory carried over from the
    /// embedder's [`ServerStateConfig`]. Consumed lazily on first
    /// `a2a_dispatcher()` call so embedders can construct
    /// `ServerState` without paying the runtime spin-up cost when
    /// they don't actually use the A2A surface.
    ///
    /// This field holds the runtime; the next two hold the task store
    /// and the card factory. All three use `std::sync::Mutex` rather
    /// than the tokio mutex used by the other fields of this struct.
    pub(crate) a2a_runtime: std::sync::Mutex<Option<Arc<car_engine::Runtime>>>,
    /// Embedder-supplied A2A task store, taken from
    /// `ServerStateConfig::a2a_store`.
    pub(crate) a2a_store: std::sync::Mutex<Option<Arc<dyn car_a2a::TaskStore>>>,
    /// Embedder-supplied agent card factory, taken from
    /// `ServerStateConfig::a2a_card_source`.
    pub(crate) a2a_card_source: std::sync::Mutex<Option<Arc<car_a2a::AgentCardSource>>>,
}
640
641impl ServerState {
642 /// Constructor for the standalone `car-server` binary. Each WS
643 /// connection gets its own per-session memgine — matches the
644 /// pre-extraction default and is correct for a single-process
645 /// daemon serving one user at a time.
646 ///
647 /// **Embedders must not call this.** It silently leaves
648 /// `shared_memgine = None`, which re-introduces the dual-memgine
649 /// bug U7 was created to prevent (one engine in the embedder, a
650 /// fresh one inside every WS session). Embedders use
651 /// [`ServerState::embedded`] instead, which makes the shared
652 /// engine handle a required argument so it cannot be forgotten.
653 pub fn standalone(journal_dir: PathBuf) -> Self {
654 Self::with_config(ServerStateConfig::new(journal_dir))
655 }
656
657 /// Constructor for embedders (e.g. `tokhn-daemon`). The shared
658 /// memgine handle is **required**: every WS session created by
659 /// this state will reuse the same engine, preventing the
660 /// dual-memgine bug.
661 ///
662 /// For embedders that also want to inject a pre-warmed inference
663 /// engine or other advanced wiring, build a [`ServerStateConfig`]
664 /// directly and call [`ServerState::with_config`].
665 pub fn embedded(
666 journal_dir: PathBuf,
667 shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
668 ) -> Self {
669 Self::with_config(ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine))
670 }
671
672 /// Build a `ServerState` from a [`ServerStateConfig`] — the path
673 /// embedders use when they need to inject a shared memgine *and*
674 /// a pre-warmed inference engine, or any other advanced wiring
675 /// the convenience constructors don't cover.
676 pub fn with_config(cfg: ServerStateConfig) -> Self {
677 let inference = std::sync::OnceLock::new();
678 if let Some(eng) = cfg.inference {
679 // OnceLock::set returns Err if already set — fresh OnceLock
680 // means it's empty, so this is infallible here.
681 let _ = inference.set(eng);
682 }
683 let voice_sessions = Arc::new(car_voice::VoiceSessionRegistry::new());
684 // Reap sessions whose clients dropped without calling
685 // voice.transcribe_stream.stop (WS disconnect, process exit,
686 // etc.). Listener handles otherwise leak for the daemon's
687 // lifetime. `with_config` is sync but always called from the
688 // `#[tokio::main]` entry point, so `Handle::try_current()`
689 // inside `start_sweeper` finds the runtime.
690 voice_sessions.start_sweeper();
691 // UI-improvement agent is pure decision logic — no I/O, no
692 // persistence handle. Memgine ingest of strategy outcomes is
693 // the caller's responsibility (handler.rs after a successful
694 // Decision::Patch). Keeps the agent crate Mutex-flavor
695 // agnostic so it can compose with std/tokio mutex callers.
696 let ui_agent = Arc::new(car_ui_agent::UIImprovementAgent::with_default_strategies());
697 let ui_agent_oscillation = Arc::new(crate::ui_agent_loop::OscillationDetector::new());
698 let ui_agent_budget = Arc::new(crate::ui_agent_loop::IterationBudget::new());
699 Self {
700 journal_dir: cfg.journal_dir,
701 sessions: Mutex::new(HashMap::new()),
702 inference,
703 host: Arc::new(crate::host::HostState::new()),
704 shared_memgine: cfg.shared_memgine,
705 voice_sessions,
706 meetings: Arc::new(car_meeting::MeetingRegistry::new()),
707 a2ui: car_a2ui::A2uiSurfaceStore::new(),
708 ui_agent,
709 ui_agent_oscillation,
710 ui_agent_budget,
711 admission: Arc::new(crate::admission::InferenceAdmission::new()),
712 a2ui_route_auth: Mutex::new(HashMap::new()),
713 supervisor: std::sync::OnceLock::new(),
714 observer_manifest_path: std::sync::OnceLock::new(),
715 a2a_dispatcher: std::sync::OnceLock::new(),
716 a2a_runtime: std::sync::Mutex::new(cfg.a2a_runtime),
717 a2a_store: std::sync::Mutex::new(cfg.a2a_store),
718 a2a_card_source: std::sync::Mutex::new(cfg.a2a_card_source),
719 a2ui_subscribers: Mutex::new(HashMap::new()),
720 auth_token: std::sync::OnceLock::new(),
721 parslee_session: std::sync::OnceLock::new(),
722 attached_agents: Mutex::new(HashMap::new()),
723 agent_memgines: Mutex::new(HashMap::new()),
724 chat_sessions: Mutex::new(HashMap::new()),
725 mcp_url: std::sync::OnceLock::new(),
726 mcp_sessions: std::sync::OnceLock::new(),
727 approval_gate: cfg.approval_gate.unwrap_or_default(),
728 }
729 }
730
731 /// Enable the per-launch auth handshake. After this call, every
732 /// new WS connection must call `session.auth` with `token` as
733 /// the first frame; otherwise the connection is closed. Called
734 /// by `car-server` at startup unless `--no-auth` is set
735 /// (default flipped 2026-05); embedders supply their own token
736 /// if they want the same posture. Returns `Err(token)` when
737 /// auth was already installed.
738 pub fn install_auth_token(&self, token: String) -> Result<(), String> {
739 self.auth_token.set(token)
740 }
741
742 pub fn install_parslee_session(
743 &self,
744 session: crate::parslee_auth::ParsleeSession,
745 ) -> Result<(), crate::parslee_auth::ParsleeSession> {
746 self.parslee_session.set(session)
747 }
748
749 /// Install the bound MCP URL after car-server's listener is up.
750 /// Idempotent on the first call; subsequent calls are accepted
751 /// silently (matches the supervisor / a2a_dispatcher install
752 /// idiom). Returns `Err(())` when an MCP URL was already
753 /// installed — embedders should treat this as "another
754 /// component beat us to it" and use whichever value is now set.
755 pub fn install_mcp_url(&self, url: String) -> Result<(), String> {
756 self.mcp_url.set(url)
757 }
758
759 /// Install the MCP SSE session registry. Pairs with
760 /// [`install_mcp_url`] — both come from the same `start_mcp`
761 /// call and either both get installed or neither does (the
762 /// daemon binds them together).
763 pub fn install_mcp_sessions(
764 &self,
765 sessions: Arc<crate::mcp::SessionMap>,
766 ) -> Result<(), Arc<crate::mcp::SessionMap>> {
767 self.mcp_sessions.set(sessions)
768 }
769
770 /// Lazy-initialize and return the agent supervisor. The first
771 /// call constructs a [`car_registry::supervisor::Supervisor`] backed by
772 /// `~/.car/agents.json` + `~/.car/logs/`. Embedders that need a
773 /// non-default location should call
774 /// [`ServerState::install_supervisor`] before any handler runs.
775 ///
776 /// In observer mode (set via [`install_observer_manifest`]),
777 /// returns a clear error mentioning the manifest path the
778 /// primary daemon owns. This prevents the second daemon from
779 /// re-attempting `user_default()` (which would also fail with
780 /// `AlreadyRunning`) on every WS call, and gives mutation
781 /// handlers a stable refusal path. Read-only handlers
782 /// (`agents.list`, `agents.health`) should call
783 /// [`Self::observer_manifest_path`] first and fall back to
784 /// [`car_registry::supervisor::Supervisor::list_from_manifest`] /
785 /// `health_from_manifest` when set. Closes
786 /// Parslee-ai/car-releases#44.
787 pub fn supervisor(&self) -> Result<Arc<car_registry::supervisor::Supervisor>, String> {
788 if let Some(s) = self.supervisor.get() {
789 return Ok(s.clone());
790 }
791 if let Some(p) = self.observer_manifest_path.get() {
792 return Err(format!(
793 "this car-server is observe-only — another car-server process \
794 holds the supervisor lock for {}. Mutations refuse here; route \
795 them to the primary daemon, or stop the other car-server first.",
796 p.display()
797 ));
798 }
799 let s = car_registry::supervisor::Supervisor::user_default()
800 .map(Arc::new)
801 .map_err(|e| e.to_string())?;
802 // OnceLock::set returns the original arg back on collision —
803 // a concurrent caller racing through user_default. Take
804 // whichever wins.
805 let _ = self.supervisor.set(s);
806 Ok(self.supervisor.get().expect("set or pre-existing").clone())
807 }
808
809 /// Replace the lazy default with a caller-supplied supervisor.
810 /// Returns `Err(())` when a supervisor was already installed.
811 /// Used by the standalone `car-server` binary to call
812 /// `start_all()` on a known-good handle without paying the
813 /// lazy-init lookup cost.
814 pub fn install_supervisor(
815 &self,
816 supervisor: Arc<car_registry::supervisor::Supervisor>,
817 ) -> Result<(), Arc<car_registry::supervisor::Supervisor>> {
818 self.supervisor.set(supervisor)
819 }
820
821 /// Non-acquiring read of the currently-installed supervisor.
822 /// Unlike [`supervisor`](Self::supervisor), this does NOT lazy-
823 /// init via `user_default()` — it returns `None` instead of
824 /// constructing a fresh `Supervisor` and acquiring the
825 /// `<manifest>.lock` as a side effect. Use this from read-only
826 /// metadata paths (`host.subscribe` identity, status surfaces)
827 /// where causing lock acquisition on observation would be a
828 /// Heisenberg subscribe — the act of asking "do you own the
829 /// lock?" must not be the act of taking it.
830 pub fn supervisor_if_installed(&self) -> Option<Arc<car_registry::supervisor::Supervisor>> {
831 self.supervisor.get().cloned()
832 }
833
834 /// Mark this daemon as *observing* a manifest owned by another
835 /// car-server process. After this call, `supervisor()` returns
836 /// an "observe-only" error and read-only handlers
837 /// (`agents.list`, `agents.health`) fall back to the static
838 /// `Supervisor::list_from_manifest` / `health_from_manifest`
839 /// paths. Idempotent — subsequent calls with the same path are
840 /// no-ops; a different path returns `Err(())`. Closes
841 /// Parslee-ai/car-releases#44.
842 pub fn install_observer_manifest(&self, path: PathBuf) -> Result<(), PathBuf> {
843 self.observer_manifest_path.set(path)
844 }
845
846 /// Path of the manifest this daemon is observing but not
847 /// supervising. `None` when this daemon owns the supervisor
848 /// (the normal case) or when no manifest is configured at all
849 /// (no `HOME`, embedder didn't install one).
850 pub fn observer_manifest_path(&self) -> Option<&PathBuf> {
851 self.observer_manifest_path.get()
852 }
853
854 /// Lazy-initialize and return the in-core A2A dispatcher. The
855 /// first call constructs an [`car_a2a::A2aDispatcher`] from
856 /// either the embedder's overrides (set via
857 /// [`ServerStateConfig::with_a2a_runtime`] / `with_a2a_store` /
858 /// `with_a2a_card_source`) or sensible defaults: a fresh
859 /// `Runtime` with `register_agent_basics` registered, an
860 /// `InMemoryTaskStore`, and a card built from the runtime's
861 /// tool schemas advertising `ws://127.0.0.1:9100/` as the
862 /// public URL. Closes Parslee-ai/car-releases#28.
863 pub async fn a2a_dispatcher(&self) -> Arc<car_a2a::A2aDispatcher> {
864 if let Some(d) = self.a2a_dispatcher.get() {
865 return d.clone();
866 }
867
868 // Embedder overrides take precedence; fall back to defaults
869 // for each slot independently (so an embedder that only
870 // wants a custom card can leave the runtime + store at
871 // defaults). `Mutex::take()` consumes the slot so the
872 // defaults aren't reconstructed on a racing init that loses
873 // the OnceLock::set call below.
874 let runtime = self
875 .a2a_runtime
876 .lock()
877 .expect("a2a_runtime mutex poisoned")
878 .take();
879 let runtime = match runtime {
880 Some(r) => r,
881 None => {
882 let r = Arc::new(car_engine::Runtime::new());
883 r.register_agent_basics().await;
884 r
885 }
886 };
887
888 let store = self
889 .a2a_store
890 .lock()
891 .expect("a2a_store mutex poisoned")
892 .take()
893 .unwrap_or_else(|| Arc::new(car_a2a::InMemoryTaskStore::new()));
894
895 let card_source = self
896 .a2a_card_source
897 .lock()
898 .expect("a2a_card_source mutex poisoned")
899 .take();
900 let card_source = match card_source {
901 Some(c) => c,
902 None => {
903 let card = car_a2a::build_default_agent_card(
904 &runtime,
905 car_a2a::AgentCardConfig::minimal(
906 "Common Agent Runtime",
907 "Embedded CAR daemon — A2A v1.0 reachable over WebSocket JSON-RPC.",
908 "ws://127.0.0.1:9100/",
909 car_a2a::AgentProvider {
910 organization: "Parslee".into(),
911 url: Some("https://github.com/Parslee-ai/car".into()),
912 },
913 ),
914 )
915 .await;
916 Arc::new(move || card.clone()) as Arc<car_a2a::AgentCardSource>
917 }
918 };
919
920 let dispatcher = Arc::new(car_a2a::A2aDispatcher::new(runtime, store, card_source));
921 // OnceLock::set returns Err on race — accept whichever
922 // dispatcher won and clone-return that one.
923 let _ = self.a2a_dispatcher.set(dispatcher);
924 self.a2a_dispatcher
925 .get()
926 .expect("a2a_dispatcher set or pre-existing")
927 .clone()
928 }
929
930 pub async fn create_session(
931 &self,
932 client_id: &str,
933 channel: Arc<WsChannel>,
934 ) -> Arc<ClientSession> {
935 let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
936 let event_log = EventLog::with_journal(journal_path);
937
938 let executor = Arc::new(WsToolExecutor {
939 channel: channel.clone(),
940 });
941
942 let runtime = Runtime::new()
943 .with_event_log(event_log)
944 .with_executor(executor);
945
946 // If the embedder supplied a shared memgine, every session uses it.
947 // Otherwise each session gets its own — matches pre-extraction behavior.
948 let memgine = match &self.shared_memgine {
949 Some(eng) => eng.clone(),
950 None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
951 };
952
953 let session = Arc::new(ClientSession {
954 client_id: client_id.to_string(),
955 runtime: Arc::new(runtime),
956 channel,
957 host: self.host.clone(),
958 memgine,
959 browser: car_ffi_common::browser::BrowserSessionSlot::new(),
960 // When auth is disabled (no token installed), every
961 // session is "authenticated" by default — preserves the
962 // pre-#32 behaviour. When auth is enabled, the value is
963 // ignored on creation; the dispatcher's gate checks
964 // `ServerState::auth_token.is_some()` to decide whether
965 // to enforce.
966 authenticated: std::sync::atomic::AtomicBool::new(false),
967 agent_id: tokio::sync::Mutex::new(None),
968 bound_memgine: tokio::sync::Mutex::new(None),
969 });
970
971 self.sessions
972 .lock()
973 .await
974 .insert(client_id.to_string(), session.clone());
975
976 session
977 }
978
979 /// Remove a per-client session from the registry on disconnect.
980 /// Returns the removed session if present so callers can drop any
981 /// remaining strong refs (e.g. drain pending tool callbacks). Fix
982 /// for MULTI-4 / WS-3 — without this, `state.sessions` retains
983 /// `Arc<ClientSession>` for every connection that ever existed.
984 pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
985 let removed = self.sessions.lock().await.remove(client_id);
986 if let Some(session) = &removed {
987 // #169: drop the agent_id → client_id binding so a
988 // disconnected lifecycle agent can reconnect (or its
989 // supervisor-respawned replacement can take the slot)
990 // without colliding with the stale claim.
991 let bound = session.agent_id.lock().await.clone();
992 if let Some(id) = bound {
993 let mut attached = self.attached_agents.lock().await;
994 if attached.get(&id).map(String::as_str) == Some(client_id) {
995 attached.remove(&id);
996 }
997 }
998 // Drop any in-flight `agents.chat` sessions bound to this
999 // client — either side disconnecting orphans the stream,
1000 // and a respawned agent's stray `agent.chat.event`
1001 // notifications must not race into a stale routing entry.
1002 // See `docs/proposals/agent-chat-surface.md`.
1003 let bound_agent = session.agent_id.lock().await.clone();
1004 let mut chats = self.chat_sessions.lock().await;
1005 chats.retain(|_, s| {
1006 if s.host_client_id == client_id {
1007 return false;
1008 }
1009 if let Some(agent_id) = &bound_agent {
1010 if &s.agent_id == agent_id {
1011 return false;
1012 }
1013 }
1014 true
1015 });
1016 }
1017 removed
1018 }
1019}
1020
#[cfg(test)]
mod observer_mode_tests {
    use super::*;

    /// Create a fresh scratch directory under the cargo target dir
    /// (`CARGO_TARGET_DIR`, else `<manifest>/../../target`) and return
    /// its path. The directory is deliberately never cleaned up so it
    /// outlives the helper.
    fn journal_dir() -> PathBuf {
        let base = match std::env::var_os("CARGO_TARGET_DIR") {
            Some(dir) => std::path::PathBuf::from(dir),
            None => std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
                .join("..")
                .join("..")
                .join("target"),
        };
        std::fs::create_dir_all(&base).ok();
        let base = std::fs::canonicalize(&base).unwrap_or(base);
        let scratch = tempfile::TempDir::new_in(&base).unwrap();
        let dir = scratch.path().to_path_buf();
        // Leak the guard so its destructor doesn't delete the dir
        // while the test is still using it.
        std::mem::forget(scratch);
        dir
    }

    /// Closes Parslee-ai/car-releases#44: the second car-server on a
    /// host installs the observer marker after `with_paths` returns
    /// AlreadyRunning. From then on `state.supervisor()` must yield a
    /// clear "observe-only" error naming the manifest path, and must
    /// NOT retry user_default() (which would re-acquire the lock and
    /// likely also fail).
    #[test]
    fn supervisor_returns_observer_error_when_marker_set() {
        let state = ServerState::standalone(journal_dir());
        let manifest = PathBuf::from("/tmp/fake-manifest-for-test.json");
        state
            .install_observer_manifest(manifest.clone())
            .expect("install_observer_manifest succeeds on fresh state");
        assert_eq!(state.observer_manifest_path(), Some(&manifest));

        let err = state.supervisor().map(drop).unwrap_err();
        assert!(
            err.contains("observe-only"),
            "error must mention observe-only mode: {err}"
        );
        assert!(
            err.contains("fake-manifest-for-test.json"),
            "error must surface the manifest path so operators know which daemon owns it: {err}"
        );
    }

    /// Installing a second, different manifest path is rejected: the
    /// rejected path comes back in `Err` and the first path stays.
    #[test]
    fn install_observer_manifest_is_idempotent_per_path_collision() {
        let state = ServerState::standalone(journal_dir());
        let first = PathBuf::from("/tmp/manifest-a.json");
        let second = PathBuf::from("/tmp/manifest-b.json");
        state.install_observer_manifest(first.clone()).unwrap();
        // The conflicting value is handed back to the caller.
        let rejected = state.install_observer_manifest(second.clone()).unwrap_err();
        assert_eq!(rejected, second);
        assert_eq!(state.observer_manifest_path(), Some(&first));
    }

    /// The Heisenberg-subscribe guard: `host.subscribe`'s identity
    /// path must use the non-acquiring read so a purely observational
    /// client can't make the daemon claim `<manifest>.lock` as a side
    /// effect of asking about it.
    #[test]
    fn supervisor_if_installed_does_not_lazy_init() {
        // Fresh state: nothing installed, and peeking must not
        // install anything.
        let state = ServerState::standalone(journal_dir());
        assert!(state.supervisor_if_installed().is_none());
        // The observer marker stays unset as well — no implicit init.
        assert!(state.observer_manifest_path().is_none());
    }
}