Skip to main content

car_server_core/
session.rs

1//! Server-side session state — shared across all connections.
2
3use car_engine::{Runtime, ToolExecutor};
4use car_eventlog::EventLog;
5use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
6use futures::Sink;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use tokio::sync::{oneshot, Mutex};
15use tokio_tungstenite::tungstenite::{Error as WsError, Message};
16
17/// Type-erased WebSocket sink. The dispatch loop accepts either a
18/// `WebSocketStream<TcpStream>` (the legacy car-server TCP listener)
19/// or a `WebSocketStream<UnixStream>` (the daemon-as-default UDS
20/// listener) — both implement `Sink<Message, Error = WsError>` after
21/// the tungstenite handshake. Erasing the type here avoids cascading
22/// a generic parameter through every WsChannel / Session / ServerState
23/// touchpoint in the dispatcher.
24pub type WsSink = Pin<Box<dyn Sink<Message, Error = WsError> + Send + Unpin + 'static>>;
25
/// How long a disconnect handler waits before marking a still-open run
/// `Incomplete` (agent run tracing, U1 — R5).
///
/// The window is deliberately short. It only has to bridge the gap between
/// a healthy `runs.complete` being dispatched on a spawned task and that
/// call's terminal record landing — it is not meant to cover real work. It
/// is long enough to absorb that scheduling jitter, and short enough that a
/// genuinely abandoned run is reported `Incomplete` promptly.
pub const RUN_COMPLETE_GRACE: std::time::Duration = std::time::Duration::from_millis(250);
33
34/// Server-side credentials for continuing an A2A-owned A2UI surface.
35///
36/// This intentionally lives outside `car_a2ui::A2uiSurfaceOwner` so
37/// renderers can inspect surface ownership without receiving secrets.
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(rename_all = "camelCase", tag = "type")]
40pub enum A2aRouteAuth {
41    None,
42    Bearer { token: String },
43    Header { name: String, value: String },
44}
45
46/// Shared write half of the WebSocket, plus pending callback channels.
47/// `write` is type-erased via [`WsSink`] so the dispatcher can run
48/// against any transport-specific WebSocketStream (TCP or UDS today;
49/// axum-bridged in future) without templatizing every consumer.
50pub struct WsChannel {
51    pub write: Mutex<WsSink>,
52    /// Pending tool execution callbacks: request_id → oneshot sender
53    pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
54    pub next_id: AtomicU64,
55}
56
57impl WsChannel {
58    pub fn next_request_id(&self) -> String {
59        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
60        format!("cb-{}", id)
61    }
62
63    /// Test-only stub that returns a WsChannel whose write sink drains
64    /// to nowhere. Used by `host.rs` tests that need a real
65    /// `Arc<WsChannel>` in the subscribers map (to exercise membership
66    /// checks like the cross-session resolve fan-out) without
67    /// constructing a tungstenite handshake. Never writes are
68    /// performed against this stub; if anything tries, the drain sink
69    /// quietly absorbs.
70    #[cfg(test)]
71    pub fn test_stub() -> Self {
72        use futures::sink::SinkExt;
73        let sink: WsSink = Box::pin(futures::sink::drain().sink_map_err(|_| {
74            tokio_tungstenite::tungstenite::Error::ConnectionClosed
75        }));
76        WsChannel {
77            write: Mutex::new(sink),
78            pending: Mutex::new(HashMap::new()),
79            next_id: AtomicU64::new(0),
80        }
81    }
82}
83
/// Bookkeeping for one in-flight `agents.chat` session.
///
/// An entry is created when a host client calls `agents.chat`. It is
/// removed when the agent emits a terminal `agent.chat.event`
/// (`kind: "done"` or `"error"`), when either side disconnects, or when
/// the host cancels via `agents.chat.cancel`.
///
/// The session id is host-supplied (or server-generated when omitted) and
/// is carried on every `agent.chat.event` notification, which lets the
/// server route streamed deltas back to the originating host without
/// per-session subscriptions. The wire contract is described in
/// `docs/proposals/agent-chat-surface.md`.
#[derive(Debug, Clone)]
pub struct ChatSession {
    /// The agent that owns this chat, populated from `attached_agents`
    /// at `agents.chat` dispatch time.
    pub agent_id: String,
    /// Client id of the host that issued `agents.chat`. Notifications for
    /// this chat are forwarded to *this* host only, so two CarHost windows
    /// chatting with the same agent remain independent streams.
    pub host_client_id: String,
    /// Creation time in Unix seconds. Intended for the future
    /// stale-session sweeper, which drops sessions whose agent died
    /// without emitting a terminal event.
    pub created_at: u64,
}
109
110/// Daemon-side record of a single agent run (agent run tracing, U1).
111///
112/// Keyed by `run_id` in the process-wide [`ServerState::runs`] registry
113/// so the record survives the WS connection that produced it — needed
114/// both for the disconnect grace window (R5) and so the disk store (U3)
115/// can flush a run even after its session is gone. U1 keeps this purely
116/// in memory; persistence is U3.
117#[derive(Debug, Clone)]
118pub struct RunMeta {
119    pub run_id: String,
120    /// Owning agent. Set at `runs.start` from the resolved id; used by
121    /// the read RPCs' ownership check (U5/KTD10) and the disk key (U3).
122    pub agent_id: String,
123    /// WS `client_id` that called `runs.start`. Lets disconnect cleanup
124    /// find this run's owning connection.
125    pub client_id: String,
126    pub intent: String,
127    pub outcome_description: Option<String>,
128    pub started_at: chrono::DateTime<chrono::Utc>,
129    /// `None` while the run is in progress; `Some` once a terminal
130    /// record (a reported outcome or an `Incomplete` marker) is
131    /// written. The presence of this field is the "is this run still
132    /// open?" signal the grace window checks.
133    pub termination: Option<car_proto::RunTermination>,
134    /// When the terminal record was written, if any.
135    pub ended_at: Option<chrono::DateTime<chrono::Utc>>,
136    /// The ordered per-turn trace recorded for this run (agent run
137    /// tracing, U2). Each entry is a [`car_proto::RunRecord::Turn`]
138    /// produced by the recorder from a submitted proposal + its
139    /// `ActionResult`s. U1 leaves this empty; the U2 recorder appends to
140    /// it via [`ServerState::record_run_turns`]. U3 flushes this buffer
141    /// to disk and U4 broadcasts it — both read it through
142    /// [`ServerState::run_turns`]. The `turn.index` field is monotonic
143    /// across the run's proposals, so this Vec is the clean ordered
144    /// stream those units consume.
145    pub turns: Vec<car_proto::RunRecord>,
146}
147
/// Maximum number of turns a single run may record.
///
/// This is a runaway-loop backstop, sized well above any healthy
/// main-agent-only cycle (tens of turns) — it is never meant to trim a
/// normal run. It is a TRUE hard cap, enforced inside
/// [`ServerState::record_run_turns`] under the `runs` lock, not merely in
/// the WS handler's pre-check.
///
/// Why both checks exist: the handler keeps a fast-path pre-check against
/// a lock-free snapshot, but the dispatcher spawns a task per frame.
/// Pipelined `runs.record_turns` batches can therefore all observe a
/// sub-ceiling snapshot and clear the pre-check before any of them
/// appends. Only the under-lock check is atomic with the append, so it is
/// the one that actually bounds the run (ADV-1). A batch that would take
/// the run PAST this many recorded turns is refused WHOLE.
pub const RECORD_TURNS_RUN_CEILING: usize = 2_000;
160
/// Result of [`ServerState::record_run_turns`].
///
/// The variants separate the reasons a batch can fail to land, so the
/// caller can translate each into the correct `runs.record_turns` drop
/// reason (ADV-1). The function used to return a bare `usize` — the new
/// total, or `0` for "nothing appended" — which folded an under-lock
/// CEILING refusal and an unknown/terminal run into the same `0`; the
/// handler then mislabeled a ceiling refusal as `run_terminal`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordRunTurnsOutcome {
    /// The batch landed; `new_total` is the run's turn count afterwards.
    Appended { new_total: usize },
    /// The batch was refused WHOLE because accepting it would push the
    /// run past [`RECORD_TURNS_RUN_CEILING`] (a runaway backstop). Maps to
    /// `dropped: "run_turn_limit"`.
    RefusedCeiling,
    /// The run id is unknown to this process OR the run is already
    /// terminal, so nothing was appended. Maps to
    /// `dropped: "run_terminal"` — the benign TOCTOU where the run went
    /// terminal between the handler's pre-check and the append — matching
    /// the prior silent-zero semantics.
    UnknownOrTerminal,
}
182
183impl RunMeta {
184    /// True once a terminal record (outcome or `Incomplete`) is set.
185    pub fn is_terminal(&self) -> bool {
186        self.termination.is_some()
187    }
188
189    /// The coarse live status of this run for the U4 subscribe snapshot /
190    /// `runs.trace.event` (agent run tracing). Mirrors `RunStore`'s
191    /// `RunStatus` but reads off the in-memory `RunMeta` so the live path
192    /// never touches disk.
193    pub fn live_status(&self) -> car_proto::RunLiveStatus {
194        match &self.termination {
195            None => car_proto::RunLiveStatus::InProgress,
196            Some(car_proto::RunTermination::Outcome { .. }) => {
197                car_proto::RunLiveStatus::Completed
198            }
199            Some(car_proto::RunTermination::Incomplete) => car_proto::RunLiveStatus::Incomplete,
200        }
201    }
202
203    /// Count of `RunRecord::Turn` entries in this run's buffer — the live
204    /// turn cursor. Equals `turns.len()` because the buffer holds only
205    /// `Turn` records (Started/Ended are not buffered here).
206    pub fn turn_cursor(&self) -> usize {
207        self.turns.len()
208    }
209}
210
/// Fallback ceiling, in milliseconds, for the daemon→host `tools.execute`
/// callback wait when an action carries no explicit `timeout_ms`.
///
/// Can be overridden with the `CAR_TOOL_TIMEOUT` environment variable
/// (expressed in seconds). The value was raised from the old hardcoded 60s:
/// real tools (build steps, CLI drivers, slow APIs) routinely run longer,
/// and a 60s ceiling reaped them regardless of the agent's budget
/// (Parslee-ai/car#259).
pub const DEFAULT_TOOL_TIMEOUT_MS: u64 = 300_000;

/// Slack, in milliseconds, added on top of an action's own `timeout_ms`
/// for the callback wait.
///
/// The executor (`car-engine`) already wraps each attempt in
/// `timeout(action.timeout_ms, dispatch)`. Waiting for exactly the *same*
/// duration here would race that outer deadline with zero slack. Waiting
/// slightly longer lets the executor's deadline fire first and reap+roll
/// back cleanly with its structured "action timed out" error, leaving this
/// wait as a transport backstop for the case the executor deadline doesn't
/// apply.
const TOOL_TIMEOUT_GRACE_MS: u64 = 5_000;

/// Computes how long to wait for the host's reply to one tool call.
///
/// - **`Some(X)`**: the executor is the authority (it bounds the attempt
///   at `X`), so the wait is `X + grace` (saturating) — a backstop that
///   lets the executor reap first. This is how the agent's per-action
///   budget takes effect; the bug was that this wait used a hardcoded 60s
///   and ignored `X` entirely (car#259).
/// - **`None`**: the executor applies **no** deadline, so this wait is the
///   *sole* bound on the call: `CAR_TOOL_TIMEOUT` seconds when that
///   variable is set and parses as a `u64` (after trimming), otherwise
///   [`DEFAULT_TOOL_TIMEOUT_MS`].
fn tool_callback_timeout(action_timeout_ms: Option<u64>) -> std::time::Duration {
    let millis = if let Some(budget) = action_timeout_ms {
        budget.saturating_add(TOOL_TIMEOUT_GRACE_MS)
    } else {
        // An unset, non-unicode, or unparsable override falls back to the
        // compiled-in default.
        match std::env::var("CAR_TOOL_TIMEOUT") {
            Ok(raw) => match raw.trim().parse::<u64>() {
                Ok(secs) => secs.saturating_mul(1000),
                Err(_) => DEFAULT_TOOL_TIMEOUT_MS,
            },
            Err(_) => DEFAULT_TOOL_TIMEOUT_MS,
        }
    };
    std::time::Duration::from_millis(millis)
}
247
248/// Tool executor that sends callbacks to the client over WebSocket.
249pub struct WsToolExecutor {
250    pub channel: Arc<WsChannel>,
251}
252
253#[async_trait::async_trait]
254impl ToolExecutor for WsToolExecutor {
255    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
256        // Legacy callers that don't have a proposal-level Action.id
257        // (e.g. internal `executor.execute` chains in tests) — emit an
258        // empty action_id so the client-side handler can still see the
259        // payload shape and decide whether to fail loudly.
260        self.execute_with_action(tool, params, "", None).await
261    }
262
263    async fn execute_with_action(
264        &self,
265        tool: &str,
266        params: &Value,
267        action_id: &str,
268        timeout_ms: Option<u64>,
269    ) -> Result<Value, String> {
270        use futures::SinkExt;
271
272        // The JSON-RPC request id is the daemon's callback-routing key
273        // (used by the pending-response map below). The `action_id`
274        // FIELD on the payload is the originating proposal Action.id
275        // surfaced to the host so process-wide handlers can route
276        // concurrent callbacks back to per-call dispatchers
277        // (Parslee-ai/car-releases#43 follow-up). They serve different
278        // purposes and must stay distinct: routing id is daemon-side,
279        // action id is host-side.
280        let request_id = self.channel.next_request_id();
281
282        let callback = ToolExecuteRequest {
283            action_id: action_id.to_string(),
284            tool: tool.to_string(),
285            parameters: params.clone(),
286            // Surface the action's budget to the host so its own tool runner
287            // can bound the work too (Parslee-ai/car#259) — was always None.
288            timeout_ms,
289            attempt: 1,
290        };
291
292        // Create a oneshot channel for the response
293        let (tx, rx) = oneshot::channel();
294        self.channel
295            .pending
296            .lock()
297            .await
298            .insert(request_id.clone(), tx);
299
300        // Send the callback to the client as a JSON-RPC request
301        let rpc_request = serde_json::json!({
302            "jsonrpc": "2.0",
303            "method": "tools.execute",
304            "params": callback,
305            "id": request_id,
306        });
307
308        let msg = Message::Text(
309            serde_json::to_string(&rpc_request)
310                .map_err(|e| e.to_string())?
311                .into(),
312        );
313        self.channel
314            .write
315            .lock()
316            .await
317            .send(msg)
318            .await
319            .map_err(|e| format!("failed to send tool callback: {}", e))?;
320
321        // Wait for the client to respond, bounded by the action's budget
322        // (or the configurable default) — NOT a hardcoded 60s. This is the
323        // ceiling that previously ignored `Action.timeout_ms` and reaped
324        // legitimately-long tool calls (Parslee-ai/car#259).
325        let wait = tool_callback_timeout(timeout_ms);
326        let response = tokio::time::timeout(wait, rx)
327            .await
328            .map_err(|_| {
329                format!(
330                    "tool '{}' callback timed out ({}s)",
331                    tool,
332                    wait.as_secs()
333                )
334            })?
335            .map_err(|_| format!("tool '{}' callback channel closed", tool))?;
336
337        if let Some(err) = response.error {
338            Err(err)
339        } else {
340            Ok(response.output.unwrap_or(Value::Null))
341        }
342    }
343}
344
345/// Voice event sink that forwards events to a specific WebSocket client
346/// as `voice.event` JSON-RPC notifications.
347///
348/// Each `voice.transcribe_stream.start` call constructs one of these
349/// bound to the originating client's [`WsChannel`], so a client only
350/// receives events for sessions it started.
351pub struct WsVoiceEventSink {
352    pub channel: Arc<WsChannel>,
353}
354
355impl car_voice::VoiceEventSink for WsVoiceEventSink {
356    fn send(&self, session_id: &str, event_json: String) {
357        use futures::SinkExt;
358        let channel = self.channel.clone();
359        let session_id = session_id.to_string();
360        tokio::spawn(async move {
361            let payload: Value = serde_json::from_str(&event_json)
362                .unwrap_or_else(|_| Value::String(event_json.clone()));
363            let notification = serde_json::json!({
364                "jsonrpc": "2.0",
365                "method": "voice.event",
366                "params": {
367                    "session_id": session_id,
368                    "event": payload,
369                },
370            });
371            let Ok(text) = serde_json::to_string(&notification) else {
372                return;
373            };
374            let _ = channel
375                .write
376                .lock()
377                .await
378                .send(Message::Text(text.into()))
379                .await;
380        });
381    }
382
383    fn send_binary(&self, frame: Vec<u8>) {
384        use futures::SinkExt;
385        let channel = self.channel.clone();
386        tokio::spawn(async move {
387            let _ = channel
388                .write
389                .lock()
390                .await
391                .send(Message::Binary(frame.into()))
392                .await;
393        });
394    }
395}
396
397/// Per-meeting fanout sink that ingests transcript text into a
398/// session-scoped memgine using the `Arc<tokio::sync::Mutex<...>>`
399/// wrapper, then forwards every event upstream untouched.
400///
401/// Lives here (not in `car-ffi-common`) because the engine handle uses
402/// `tokio::sync::Mutex` per the "one-wrapper rule" — the FFI-common
403/// `MeetingMemgineFanout` still uses `std::sync::Mutex` for the NAPI/
404/// PyO3 bindings, which keep their sync wrappers. Each binding owns the
405/// fanout that matches its lock primitive; the parsing/formatting logic
406/// itself is shared via [`car_meeting::extract_transcript_for_ingest`].
407///
408/// `send` is called from the voice drain task and must be non-blocking,
409/// so the lock acquisition is shipped to a `tokio::spawn`. Transcript
410/// events are independent so reordering across spawned tasks is fine.
411pub struct WsMemgineIngestSink {
412    pub meeting_id: String,
413    pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
414    pub upstream: Arc<dyn car_voice::VoiceEventSink>,
415}
416
417impl car_voice::VoiceEventSink for WsMemgineIngestSink {
418    fn send(&self, voice_session_id: &str, event_json: String) {
419        if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
420            if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
421                &value,
422                &self.meeting_id,
423                voice_session_id,
424            ) {
425                let engine = self.engine.clone();
426                tokio::spawn(async move {
427                    let mut guard = engine.lock().await;
428                    guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
429                });
430            }
431        }
432        self.upstream.send(voice_session_id, event_json);
433    }
434}
435
436/// Per-client session.
437pub struct ClientSession {
438    pub client_id: String,
439    pub runtime: Arc<Runtime>,
440    pub channel: Arc<WsChannel>,
441    pub host: Arc<crate::host::HostState>,
442    /// Memgine handle. Wrapped in `tokio::sync::Mutex` so dispatcher
443    /// handlers can hold the lock across `.await` points without
444    /// risking poisoning. Migrated from `std::sync::Mutex` in the
445    /// car-server-core extraction (U1) per the "one-wrapper rule".
446    pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
447    /// Lazy browser session — first `browser.run` call launches Chromium,
448    /// subsequent calls reuse it so element IDs resolve across invocations
449    /// within the same WebSocket connection.
450    pub browser: car_ffi_common::browser::BrowserSessionSlot,
451    /// Per-connection auth state. Starts `false`; flips to `true`
452    /// after a successful `session.auth` handshake. Always considered
453    /// authenticated when `ServerState::auth_token` is unset (auth
454    /// disabled). Closes Parslee-ai/car-releases#32.
455    pub authenticated: std::sync::atomic::AtomicBool,
456    /// Host-management role (Parslee-ai/car#254). Starts `false`; flips
457    /// to `true` only when the connection presents the per-launch host
458    /// token via `session.auth { host_token }` (validated against
459    /// `ServerState::host_token`). `authorize_run_access` requires this
460    /// for cross-agent run-trace reads — being merely `host.subscribe`d
461    /// is no longer sufficient, which is what closes the self-elevation
462    /// hole. Cleared implicitly when the connection drops.
463    pub is_host: std::sync::atomic::AtomicBool,
464    /// Bound agent identity (#169). `Some(id)` once a lifecycle-agent
465    /// child has called `session.auth { token, agent_id }` and the
466    /// supervisor confirmed `agent_id` is supervised + token matches.
467    /// Used by `agents.list` to surface which managed agents have
468    /// actually attached vs. just being marked `Running` at the
469    /// process level. Cleared at disconnect by `remove_session`.
470    pub agent_id: tokio::sync::Mutex<Option<String>>,
471    /// Bound persistent memgine (#170). `Some` after `session.auth`
472    /// successfully attaches the connection to a daemon-owned
473    /// per-agent memgine (paired with `agent_id`). Memory handlers
474    /// route through [`ClientSession::effective_memgine`] which
475    /// returns this when set, falling back to the ephemeral
476    /// `memgine` field for browser/host/CLI connections.
477    pub bound_memgine: tokio::sync::Mutex<Option<Arc<Mutex<car_memgine::MemgineEngine>>>>,
478    /// The run currently bracketed on this connection (agent run
479    /// tracing, U1). Set by `runs.start` **before** that handler
480    /// responds, so the per-turn recorder (U2) always reads the
481    /// `run_id` the bracket established — no race across the
482    /// concurrently-spawned dispatch tasks (KTD3). Cleared by
483    /// `runs.complete`. On disconnect with a still-set current run and
484    /// no recorded terminal, the daemon marks it `Incomplete` (R5).
485    pub current_run_id: tokio::sync::Mutex<Option<String>>,
486}
487
488impl ClientSession {
489    /// Returns the memgine handle the memory.* handlers should use:
490    /// the bound per-agent memgine when this session attached via
491    /// `session.auth { agent_id }` (#169 + #170), otherwise the
492    /// ephemeral per-WS memgine. Cheap (one async lock + Arc clone).
493    pub async fn effective_memgine(&self) -> Arc<Mutex<car_memgine::MemgineEngine>> {
494        if let Some(eng) = self.bound_memgine.lock().await.as_ref() {
495            return eng.clone();
496        }
497        self.memgine.clone()
498    }
499}
500
// NOTE: the rustdoc describing `ServerStateConfig` — the builder embedders
// (e.g. `tokhn-daemon`) use to construct a `ServerState` with their own
// memgine handle and other shared infrastructure — belongs on that struct,
// which is declared after the approval-gate types. It is deliberately a
// plain comment here so it does not render as part of `ApprovalGate`'s docs.

/// Approval-gate policy for high-risk WS methods.
///
/// Every method in `methods` must be acknowledged via
/// `host.resolve_approval` before the dispatcher will route the
/// request to its handler. The dispatcher waits up to `timeout` for
/// a resolution; on timeout (or any non-`approve` resolution) the
/// request fails with JSON-RPC error `-32003`.
///
/// Default: gate enabled, the macOS-automation surface
/// (`automation.run_applescript`, `automation.shortcuts.run`,
/// `messages.send`, `mail.send`, `vision.ocr`), 60-second timeout.
/// `car-server --no-approvals` (or embedders calling
/// [`ServerStateConfig::with_approval_gate`] with `enabled=false`)
/// turns it off — only appropriate when no untrusted caller can
/// reach the WS port.
#[derive(Debug, Clone)]
pub struct ApprovalGate {
    /// Master switch. When `false`, every method dispatches without
    /// raising an approval — the pre-2026-05 behaviour.
    pub enabled: bool,
    /// Methods that require approval. Match is by exact method-name
    /// string against the JSON-RPC `method` field.
    pub methods: std::collections::HashSet<String>,
    /// How long to wait for the user to resolve the approval before
    /// timing out and surfacing an error to the caller.
    pub timeout: std::time::Duration,
}

impl Default for ApprovalGate {
    /// Gate enabled, the five macOS-automation methods gated, 60-second
    /// timeout.
    fn default() -> Self {
        let gated = [
            "automation.run_applescript",
            "automation.shortcuts.run",
            "messages.send",
            "mail.send",
            "vision.ocr",
        ];
        Self {
            enabled: true,
            methods: gated.iter().map(|name| (*name).to_owned()).collect(),
            timeout: std::time::Duration::from_secs(60),
        }
    }
}

impl ApprovalGate {
    /// Disable the gate entirely. Equivalent to passing
    /// `car-server --no-approvals`. Only appropriate when no
    /// untrusted caller can reach the WS port.
    ///
    /// The returned gate has an empty method set and keeps the 60-second
    /// timeout value.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            methods: std::collections::HashSet::new(),
            timeout: std::time::Duration::from_secs(60),
        }
    }

    /// `true` if this method must be acknowledged before dispatch: the
    /// gate is enabled AND `method` is an exact member of `methods`.
    pub fn requires_approval(&self, method: &str) -> bool {
        self.enabled && self.methods.contains(method)
    }
}
569
570/// standalone `car-server` binary uses [`ServerState::standalone`]
571/// which calls `with_config` under the hood.
572pub struct ServerStateConfig {
573    pub journal_dir: PathBuf,
574    /// Optional pre-constructed memgine engine. When `None`, each
575    /// `create_session` call builds a fresh engine; embedders that want
576    /// to share a single engine across sessions can supply a clone of
577    /// their `Arc<Mutex<MemgineEngine>>` here.
578    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
579    /// Optional pre-constructed inference engine.
580    pub inference: Option<Arc<car_inference::InferenceEngine>>,
581    /// Optional embedder-supplied A2A runtime. Used by the in-core
582    /// `A2aDispatcher` to execute peer-driven proposals. When `None`,
583    /// the dispatcher uses a fresh `Runtime` with `register_agent_basics`
584    /// — peer agents see CAR's built-in tools and nothing else,
585    /// matching the behaviour of the standalone `start_a2a_listener`.
586    pub a2a_runtime: Option<Arc<car_engine::Runtime>>,
587    /// Optional embedder-supplied A2A task store. When `None`,
588    /// defaults to `InMemoryTaskStore`. tokhn-style embedders that
589    /// want a polling-friendly persistent store plug it in here.
590    pub a2a_store: Option<Arc<dyn car_a2a::TaskStore>>,
591    /// Optional embedder-supplied agent card factory. When `None`,
592    /// the dispatcher serves a card built from the A2A runtime's
593    /// tool schemas at construction time, advertising its public URL
594    /// as `ws://127.0.0.1:9100/` (the WS surface the dispatcher itself
595    /// is reachable on).
596    pub a2a_card_source: Option<Arc<car_a2a::AgentCardSource>>,
597    /// Approval-gate policy. When `None`, the dispatcher uses
598    /// [`ApprovalGate::default`] (gate ON, the macOS-automation
599    /// surface gated, 60s timeout). Pass
600    /// [`ApprovalGate::disabled`] to opt out — only appropriate
601    /// when no untrusted caller can reach the WS port.
602    pub approval_gate: Option<ApprovalGate>,
603}
604
605impl ServerStateConfig {
606    /// Minimal config suitable for the standalone car-server binary:
607    /// only the journal dir is required; everything else is lazily
608    /// constructed at first use.
609    pub fn new(journal_dir: PathBuf) -> Self {
610        Self {
611            journal_dir,
612            shared_memgine: None,
613            inference: None,
614            a2a_runtime: None,
615            a2a_store: None,
616            a2a_card_source: None,
617            approval_gate: None,
618        }
619    }
620
621    pub fn with_shared_memgine(mut self, engine: Arc<Mutex<car_memgine::MemgineEngine>>) -> Self {
622        self.shared_memgine = Some(engine);
623        self
624    }
625
626    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
627        self.inference = Some(engine);
628        self
629    }
630
631    /// Plug in an embedder-supplied runtime for the A2A dispatcher.
632    /// Use case: tokhn-daemon wants peers to see its OPA preflight
633    /// tooling, not just CAR's `register_agent_basics` defaults.
634    pub fn with_a2a_runtime(mut self, runtime: Arc<car_engine::Runtime>) -> Self {
635        self.a2a_runtime = Some(runtime);
636        self
637    }
638
639    /// Plug in an embedder-supplied task store for the A2A
640    /// dispatcher. Use case: tokhn's polling-friendly persistent
641    /// store keyed by their session id.
642    pub fn with_a2a_store(mut self, store: Arc<dyn car_a2a::TaskStore>) -> Self {
643        self.a2a_store = Some(store);
644        self
645    }
646
647    /// Plug in an embedder-supplied agent card factory. The factory
648    /// is invoked on every `agent/getAuthenticatedExtendedCard`
649    /// dispatch, so embedders can reflect runtime tool changes.
650    pub fn with_a2a_card_source(mut self, source: Arc<car_a2a::AgentCardSource>) -> Self {
651        self.a2a_card_source = Some(source);
652        self
653    }
654
655    /// Override the approval-gate policy. Pass
656    /// [`ApprovalGate::disabled`] to skip the gate entirely (only
657    /// appropriate when no untrusted caller can reach the WS port);
658    /// pass a customised [`ApprovalGate`] to add or remove methods
659    /// or to change the timeout.
660    pub fn with_approval_gate(mut self, gate: ApprovalGate) -> Self {
661        self.approval_gate = Some(gate);
662        self
663    }
664}
665
/// Global server state shared across all connections.
///
/// Built through [`ServerState::with_config`] (or the `standalone` /
/// `embedded` convenience constructors). Several slots are write-once
/// `OnceLock`s that are filled after construction by the matching
/// `install_*` method or by a lazy accessor.
pub struct ServerState {
    /// Journal directory supplied through [`ServerStateConfig`].
    /// `with_config` also derives the run-trace store location from it
    /// (see [`run_store`](Self::run_store)).
    pub journal_dir: PathBuf,
    /// Client sessions keyed by a string id (presumably the client id —
    /// confirm against `create_session` / `remove_session`). Starts
    /// empty.
    pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
    /// Inference engine slot. `with_config` pre-fills it when the
    /// config carries an engine; otherwise it starts empty.
    pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
    /// Host-side state, created fresh in `with_config`.
    pub host: Arc<crate::host::HostState>,
    /// When `Some`, `create_session` clones this handle into every new
    /// `ClientSession.memgine` — embedders that want a single shared
    /// memgine across all WS sessions set this. Standalone car-server
    /// leaves it `None`, which gives each session its own engine
    /// (preserving today's behavior).
    pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    /// Process-wide voice session registry. Each
    /// `voice.transcribe_stream.start` call registers its own per-client
    /// [`WsVoiceEventSink`] so events route back to the originating WS
    /// connection only.
    pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
    /// Process-wide meeting registry. Meeting ids are global; each
    /// meeting binds to the originating client's WS for upstream
    /// events but persists transcripts to the resolved
    /// `.car/meetings/<id>/` regardless of which client started it.
    pub meetings: Arc<car_meeting::MeetingRegistry>,
    /// Process-wide A2UI surface store. Agent-produced surfaces are
    /// visible to every host UI subscriber, independent of the
    /// WebSocket session that applied the update.
    pub a2ui: car_a2ui::A2uiSurfaceStore,
    /// In-process UI-improvement agent. Invoked from
    /// `handle_a2ui_render_report` with each inbound report; returned
    /// `Decision::Patch` envelopes are applied via the standard
    /// `apply_a2ui_envelope` path so all subscribers see the patch.
    /// `Arc` so the agent's interior `DashMap` state survives across
    /// handler calls even when `ServerState` is cheap-cloned.
    pub ui_agent: Arc<car_ui_agent::UIImprovementAgent>,
    /// Per-surface oscillation detector for the UI-improvement
    /// loop. Sits between the agent's `Decision::Patch` and the
    /// apply path so A→B→A patch cycles get cooled down without
    /// the agent itself having to track history. neo's review:
    /// "controllers use workqueue backoff; reconcilers stay
    /// stateless."
    pub ui_agent_oscillation: Arc<crate::ui_agent_loop::OscillationDetector>,
    /// Per-surface iteration budget. Backstop against runaway
    /// loops the oscillation detector misses — caps total agent-
    /// driven patches per surface at `DEFAULT_MAX_ITERATIONS`.
    pub ui_agent_budget: Arc<crate::ui_agent_loop::IterationBudget>,
    /// Process-wide concurrency gate for inference RPC handlers. Sized
    /// from host RAM at startup, overridable via
    /// [`crate::admission::ENV_MAX_CONCURRENT`]. Without this, N
    /// concurrent users multiply KV-cache and activation memory and
    /// take the host out (#114-adjacent: filed alongside the daemon
    /// always-on rework). The semaphore lives on `ServerState` so it
    /// is shared across every WebSocket session in the same process.
    pub admission: Arc<crate::admission::InferenceAdmission>,
    /// Server-side A2A continuation auth keyed by A2UI surface id.
    /// Kept out of `A2uiSurface.owner` so host renderers never see
    /// bearer/API-key material.
    pub a2ui_route_auth: Mutex<HashMap<String, A2aRouteAuth>>,
    /// Lifecycle-managed agents — declarative manifest at
    /// `~/.car/agents.json` driving spawn/restart/stop. Closes
    /// Parslee-ai/car-releases#27. Lazy-initialized so embedders that
    /// don't want process supervision don't pay the disk-touch cost
    /// at server start.
    pub supervisor: std::sync::OnceLock<Arc<car_registry::supervisor::Supervisor>>,
    /// Manifest path this daemon is *observing* but does NOT own.
    /// Set by `car-server` when boot-time supervisor construction
    /// fails with [`car_registry::supervisor::SupervisorError::AlreadyRunning`]
    /// — another car-server process on the host holds the exclusive
    /// lock on this manifest. In that state, `supervisor()` returns a
    /// clear "observe-only" error so mutation handlers refuse
    /// (preventing the duplicate-spawn bug from
    /// Parslee-ai/car-releases#44), while read-only handlers
    /// (`agents.list`, `agents.health`) fall back to
    /// [`car_registry::supervisor::Supervisor::list_from_manifest`] /
    /// [`car_registry::supervisor::Supervisor::health_from_manifest`]
    /// so operators can still inspect what the primary daemon is
    /// supervising.
    pub observer_manifest_path: std::sync::OnceLock<PathBuf>,
    /// In-core A2A dispatcher — embedders that consume `car-server-core`
    /// get A2A reachability "for free" without standing up a separate
    /// HTTP listener. Closes Parslee-ai/car-releases#28. Lazy-init so
    /// the embedder can override the runtime / task store / agent card
    /// via [`ServerStateConfig::with_a2a_runtime`] etc. before the
    /// first dispatch.
    pub a2a_dispatcher: std::sync::OnceLock<Arc<car_a2a::A2aDispatcher>>,
    /// WS clients subscribed to A2UI envelope events. After every
    /// successful `a2ui.apply` / `a2ui.ingest`, the resulting
    /// `A2uiApplyResult` is broadcast to every subscriber as an
    /// `a2ui.event` JSON-RPC notification. Closes
    /// Parslee-ai/car-releases#29. Subscribers register via the
    /// `a2ui/subscribe` method and are auto-cleaned on WS disconnect.
    pub a2ui_subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
    /// Per-launch auth token. When set, the WS dispatcher rejects
    /// non-auth methods on unauthenticated sessions until the client
    /// calls `session.auth` with the matching value. When unset,
    /// auth is disabled and every connection works as before. Set
    /// at startup by `car-server` unless `--no-auth` is passed
    /// (default flipped 2026-05); embedders that want to enable
    /// auth call [`ServerState::install_auth_token`]. Closes
    /// Parslee-ai/car-releases#32.
    pub auth_token: std::sync::OnceLock<String>,
    /// Per-launch **host** token — a credential distinct from
    /// `auth_token`, granting the host-management role (host-class
    /// reads like cross-agent run traces). Critically it is **never**
    /// served over `GET /auth-token`: a session becomes host-role only
    /// by presenting this via `session.auth { host_token }`, and the
    /// only way to obtain it is reading the `0600` `host-token` file,
    /// which a different local user cannot. This is what stops any
    /// authenticated local client from self-elevating to host and
    /// reading every agent's run traces (Parslee-ai/car#254). When
    /// unset, host-role can't be granted (no host reads).
    pub host_token: std::sync::OnceLock<String>,
    /// Parslee cloud identity loaded from the user's OS keychain at
    /// daemon startup when `car auth login` has been completed.
    pub parslee_session: std::sync::OnceLock<crate::parslee_auth::ParsleeSession>,
    /// `agent_id -> client_id` map of currently-attached lifecycle
    /// agents (#169). Populated by the `session.auth` handler when a
    /// supervised child presents its `agent_id` + per-agent token;
    /// drained on disconnect by `remove_session`. Single-claim:
    /// a second connection presenting the same `agent_id` is
    /// rejected so the daemon-side per-agent state stays unambiguous.
    pub attached_agents: Mutex<HashMap<String, String>>,
    /// `agent_id -> persistent memgine` map (#170). Lazy-loaded on
    /// first connection per id from `~/.car/memory/agents/<id>.jsonl`,
    /// retained across daemon restart, surviving any single
    /// disconnect/reconnect of the supervised child. Connections
    /// that auth without an `agent_id` (browser, host, ad-hoc CLI)
    /// keep the per-WS ephemeral memgine on `ClientSession.memgine`
    /// — no behaviour change.
    pub agent_memgines: Mutex<HashMap<String, Arc<Mutex<car_memgine::MemgineEngine>>>>,
    /// In-flight `agents.chat` sessions keyed by `session_id`. See
    /// [`ChatSession`] for shape. Populated by `agents.chat`,
    /// cleared on terminal `agent.chat.event` or
    /// `agents.chat.cancel`. Disconnect cleanup happens in
    /// `remove_session` — any in-flight session bound to either the
    /// disconnecting host or agent client is dropped so subsequent
    /// stray notifications from a respawned agent fall on the floor
    /// rather than racing into a stale stream.
    pub chat_sessions: Mutex<HashMap<String, ChatSession>>,
    /// Agent runs keyed by `run_id` (agent run tracing, U1). Process-
    /// wide (not per-session) so a run's record outlives the WS
    /// connection that produced it — the durable, connection-
    /// independent boundary `client_id` cannot be (R1). Populated by
    /// `runs.start`, made terminal by `runs.complete`, and swept to
    /// `Incomplete` on a mid-run disconnect past the grace window
    /// (R5). U2/U3 build the per-turn recorder and disk store on top
    /// of this registry.
    pub runs: Mutex<HashMap<String, RunMeta>>,
    /// Live `runs.trace.event` subscribers keyed by `(run_id,
    /// host_client_id)` (agent run tracing, U4). Each value is a
    /// [`crate::host::RunTraceSubscriber`] — the producer side of a
    /// bounded channel whose dedicated drain task writes frames to that
    /// connection's WS. Two CarHost windows on one run register two
    /// distinct entries (explicit fanout — the built-in notification
    /// registry is single-subscriber-per-method).
    ///
    /// **Lock contract (invariant #1):** `runs.subscribe` snapshots the
    /// run's turns AND inserts the subscriber while holding the
    /// [`runs`](Self::runs) lock; the recorder (`record_run_turns`),
    /// `start_run`, `complete_run`, and `mark_run_incomplete` append to
    /// `runs` and push to `run_subscribers` while holding the SAME
    /// `runs` lock — so snapshot/register and append/notify are
    /// serialized. No turn appended in the snapshot/register window is
    /// dropped (gap) or double-delivered (dup). The lock order is always
    /// `runs` → `run_subscribers`; never the reverse.
    pub run_subscribers: Mutex<HashMap<(String, String), crate::host::RunTraceSubscriber>>,
    /// Per-run disk-write serialization (FIX 6). The `runs` lock is
    /// released before [`record_run_turns`](Self::record_run_turns) does
    /// its blocking disk append (so I/O never stalls the global registry),
    /// so without this two concurrent batches for the same `run_id` could
    /// interleave their appended records on disk. Each run gets its own
    /// `Mutex<()>` held only across the append; entries are reclaimed when
    /// the run leaves the registry. Defense-in-depth: the canonical harness
    /// is sequential, but a misbehaving caller must not corrupt the trace.
    run_write_locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
    /// Disk-backed run-trace store (agent run tracing, U3). Source of
    /// truth for REPLAY (U5) — persists each run's `RunStarted`, turns,
    /// and terminal record as JSONL under `~/.car/runs/{agent_id}/` so a
    /// run survives daemon restarts (R4), bounded by retention/GC (R6)
    /// and protected at rest with `0600`/`0700` perms + backup exclusion
    /// (R14). The in-memory [`runs`](Self::runs) buffer stays the source
    /// for the LIVE stream (U4); this store mirrors what was recorded.
    /// Derived from `journal_dir` at construction (sibling `runs/`).
    pub run_store: crate::run_store::RunStore,
    /// Bound MCP HTTP-streamable URL (e.g.
    /// `"http://127.0.0.1:9102/mcp"`) — `car-server` installs this
    /// after binding the listener. Used by the
    /// `agents.invoke_external` handler to default
    /// `InvokeOptions.mcp_endpoint` so external agents
    /// (Claude Code today) load the daemon's CAR namespace via
    /// `--mcp-config` automatically. Unset when MCP isn't bound
    /// (e.g. `--mcp-bind disabled`).
    pub mcp_url: std::sync::OnceLock<String>,
    /// Registry of connected MCP SSE sessions. Populated alongside
    /// [`mcp_url`](Self::mcp_url) when `car-server` boots the MCP
    /// listener. Public so handlers can call
    /// `crate::mcp::push_to_session` to send server-initiated
    /// requests to a specific MCP-connected client (MCP-3
    /// foundation; MCP-3b will wire host-owned tool dispatch
    /// through this).
    pub mcp_sessions: std::sync::OnceLock<Arc<crate::mcp::SessionMap>>,
    /// Approval gate for high-risk WS methods (audit 2026-05). The
    /// gate intercepts `automation.run_applescript`,
    /// `automation.shortcuts.run`, `messages.send`, `mail.send`, and
    /// `vision.ocr` before they dispatch, raises a
    /// `host.create_approval` for the user to act on, and waits
    /// (with a timeout) for `host.resolve_approval`. Approve →
    /// dispatch continues; deny / timeout → JSON-RPC error code
    /// `-32003`. The set of gated methods and the wait timeout are
    /// embedder-overridable via
    /// [`ServerStateConfig::with_approval_gate`].
    pub approval_gate: ApprovalGate,
    /// A2A-runtime / store / card factory carried over from the
    /// embedder's [`ServerStateConfig`]. Consumed lazily on first
    /// `a2a_dispatcher()` call so embedders can construct
    /// `ServerState` without paying the runtime spin-up cost when
    /// they don't actually use the A2A surface.
    ///
    /// These three slots use `std::sync::Mutex` (not the tokio
    /// `Mutex` used by the registries above).
    pub(crate) a2a_runtime: std::sync::Mutex<Option<Arc<car_engine::Runtime>>>,
    /// Embedder-supplied A2A task-store override. `a2a_dispatcher()`
    /// falls back to an `InMemoryTaskStore` when this is `None`.
    pub(crate) a2a_store: std::sync::Mutex<Option<Arc<dyn car_a2a::TaskStore>>>,
    /// Embedder-supplied agent-card factory override.
    /// `a2a_dispatcher()` builds a default card from the runtime when
    /// this is `None`.
    pub(crate) a2a_card_source: std::sync::Mutex<Option<Arc<car_a2a::AgentCardSource>>>,
}
884
885impl ServerState {
886    /// Constructor for the standalone `car-server` binary. Each WS
887    /// connection gets its own per-session memgine — matches the
888    /// pre-extraction default and is correct for a single-process
889    /// daemon serving one user at a time.
890    ///
891    /// **Embedders must not call this.** It silently leaves
892    /// `shared_memgine = None`, which re-introduces the dual-memgine
893    /// bug U7 was created to prevent (one engine in the embedder, a
894    /// fresh one inside every WS session). Embedders use
895    /// [`ServerState::embedded`] instead, which makes the shared
896    /// engine handle a required argument so it cannot be forgotten.
897    pub fn standalone(journal_dir: PathBuf) -> Self {
898        Self::with_config(ServerStateConfig::new(journal_dir))
899    }
900
901    /// Constructor for embedders (e.g. `tokhn-daemon`). The shared
902    /// memgine handle is **required**: every WS session created by
903    /// this state will reuse the same engine, preventing the
904    /// dual-memgine bug.
905    ///
906    /// For embedders that also want to inject a pre-warmed inference
907    /// engine or other advanced wiring, build a [`ServerStateConfig`]
908    /// directly and call [`ServerState::with_config`].
909    pub fn embedded(
910        journal_dir: PathBuf,
911        shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
912    ) -> Self {
913        Self::with_config(ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine))
914    }
915
916    /// Build a `ServerState` from a [`ServerStateConfig`] — the path
917    /// embedders use when they need to inject a shared memgine *and*
918    /// a pre-warmed inference engine, or any other advanced wiring
919    /// the convenience constructors don't cover.
920    pub fn with_config(cfg: ServerStateConfig) -> Self {
921        let inference = std::sync::OnceLock::new();
922        if let Some(eng) = cfg.inference {
923            // OnceLock::set returns Err if already set — fresh OnceLock
924            // means it's empty, so this is infallible here.
925            let _ = inference.set(eng);
926        }
927        let voice_sessions = Arc::new(car_voice::VoiceSessionRegistry::new());
928        // Reap sessions whose clients dropped without calling
929        // voice.transcribe_stream.stop (WS disconnect, process exit,
930        // etc.). Listener handles otherwise leak for the daemon's
931        // lifetime. `with_config` is sync but always called from the
932        // `#[tokio::main]` entry point, so `Handle::try_current()`
933        // inside `start_sweeper` finds the runtime.
934        voice_sessions.start_sweeper();
935        // UI-improvement agent is pure decision logic — no I/O, no
936        // persistence handle. Memgine ingest of strategy outcomes is
937        // the caller's responsibility (handler.rs after a successful
938        // Decision::Patch). Keeps the agent crate Mutex-flavor
939        // agnostic so it can compose with std/tokio mutex callers.
940        let ui_agent = Arc::new(car_ui_agent::UIImprovementAgent::with_default_strategies());
941        let ui_agent_oscillation = Arc::new(crate::ui_agent_loop::OscillationDetector::new());
942        let ui_agent_budget = Arc::new(crate::ui_agent_loop::IterationBudget::new());
943        // Disk-backed run-trace store (U3). Rooted at the `runs/` sibling
944        // of the journal dir (`~/.car/runs`), with retention read from
945        // `~/.car/config.toml`. Boot-time GC enforces the retention cap
946        // (R6) — best-effort; a failed GC must never block startup, so the
947        // count is dropped. Never evicts an in-progress run.
948        let run_store = crate::run_store::RunStore::from_journal_dir(&cfg.journal_dir);
949        // Adopt crash-orphaned runs BEFORE gc() (FIX 4). The in-memory map is
950        // empty at boot, so any on-disk InProgress run is a crashed prior
951        // process; mark it Incomplete so it's terminal and age-GC-eligible —
952        // otherwise it leaks forever (GC never evicts in-progress runs).
953        let _adopted = run_store.adopt_orphans();
954        let _evicted = run_store.gc();
955        Self {
956            journal_dir: cfg.journal_dir,
957            sessions: Mutex::new(HashMap::new()),
958            inference,
959            host: Arc::new(crate::host::HostState::new()),
960            shared_memgine: cfg.shared_memgine,
961            voice_sessions,
962            meetings: Arc::new(car_meeting::MeetingRegistry::new()),
963            a2ui: car_a2ui::A2uiSurfaceStore::new(),
964            ui_agent,
965            ui_agent_oscillation,
966            ui_agent_budget,
967            admission: Arc::new(crate::admission::InferenceAdmission::new()),
968            a2ui_route_auth: Mutex::new(HashMap::new()),
969            supervisor: std::sync::OnceLock::new(),
970            observer_manifest_path: std::sync::OnceLock::new(),
971            a2a_dispatcher: std::sync::OnceLock::new(),
972            a2a_runtime: std::sync::Mutex::new(cfg.a2a_runtime),
973            a2a_store: std::sync::Mutex::new(cfg.a2a_store),
974            a2a_card_source: std::sync::Mutex::new(cfg.a2a_card_source),
975            a2ui_subscribers: Mutex::new(HashMap::new()),
976            auth_token: std::sync::OnceLock::new(),
977            host_token: std::sync::OnceLock::new(),
978            parslee_session: std::sync::OnceLock::new(),
979            attached_agents: Mutex::new(HashMap::new()),
980            agent_memgines: Mutex::new(HashMap::new()),
981            chat_sessions: Mutex::new(HashMap::new()),
982            runs: Mutex::new(HashMap::new()),
983            run_subscribers: Mutex::new(HashMap::new()),
984            run_write_locks: Mutex::new(HashMap::new()),
985            run_store,
986            mcp_url: std::sync::OnceLock::new(),
987            mcp_sessions: std::sync::OnceLock::new(),
988            approval_gate: cfg.approval_gate.unwrap_or_default(),
989        }
990    }
991
/// Enable the per-launch auth handshake. After this call, every
/// new WS connection must call `session.auth` with `token` as
/// the first frame; otherwise the connection is closed. Called
/// by `car-server` at startup unless `--no-auth` is set
/// (default flipped 2026-05); embedders supply their own token
/// if they want the same posture.
///
/// # Errors
///
/// Returns `Err(token)`, handing the rejected value back, when
/// auth was already installed. The slot is write-once, so the
/// previously installed token stays in effect.
pub fn install_auth_token(&self, token: String) -> Result<(), String> {
    self.auth_token.set(token)
}
1002
/// Install the per-launch host token (Parslee-ai/car#254). A
/// session that later presents this via `session.auth { host_token }`
/// is granted the host-management role (`ClientSession::is_host`),
/// which `authorize_run_access` requires for cross-agent run-trace
/// reads. Set by `car-server` at startup (mints + writes the `0600`
/// `host-token` file) unless `--no-auth` is set.
///
/// # Errors
///
/// Returns `Err(token)`, handing the rejected value back, when a
/// host token was already installed. The slot is write-once, so the
/// previously installed token stays in effect.
pub fn install_host_token(&self, token: String) -> Result<(), String> {
    self.host_token.set(token)
}
1013
/// Install the Parslee session into the write-once
/// `parslee_session` slot.
///
/// # Errors
///
/// Returns `Err(session)`, handing the rejected value back, when a
/// session was already installed; the earlier one is kept.
pub fn install_parslee_session(
    &self,
    session: crate::parslee_auth::ParsleeSession,
) -> Result<(), crate::parslee_auth::ParsleeSession> {
    self.parslee_session.set(session)
}
1020
/// Install the bound MCP URL after car-server's listener is up.
/// The slot is write-once: the first call stores `url`, and later
/// calls do NOT overwrite it.
///
/// # Errors
///
/// Returns `Err(url)`, handing the rejected value back, when an MCP
/// URL was already installed — embedders should treat this as
/// "another component beat us to it" and use whichever value is now
/// set.
pub fn install_mcp_url(&self, url: String) -> Result<(), String> {
    self.mcp_url.set(url)
}
1030
/// Install the MCP SSE session registry. Pairs with
/// [`install_mcp_url`](Self::install_mcp_url) — both come from the
/// same `start_mcp` call and either both get installed or neither
/// does (the daemon binds them together).
///
/// # Errors
///
/// Returns `Err(sessions)`, handing the rejected handle back, when a
/// registry was already installed; the earlier one is kept.
pub fn install_mcp_sessions(
    &self,
    sessions: Arc<crate::mcp::SessionMap>,
) -> Result<(), Arc<crate::mcp::SessionMap>> {
    self.mcp_sessions.set(sessions)
}
1041
1042    /// Lazy-initialize and return the agent supervisor. The first
1043    /// call constructs a [`car_registry::supervisor::Supervisor`] backed by
1044    /// `~/.car/agents.json` + `~/.car/logs/`. Embedders that need a
1045    /// non-default location should call
1046    /// [`ServerState::install_supervisor`] before any handler runs.
1047    ///
1048    /// In observer mode (set via [`install_observer_manifest`]),
1049    /// returns a clear error mentioning the manifest path the
1050    /// primary daemon owns. This prevents the second daemon from
1051    /// re-attempting `user_default()` (which would also fail with
1052    /// `AlreadyRunning`) on every WS call, and gives mutation
1053    /// handlers a stable refusal path. Read-only handlers
1054    /// (`agents.list`, `agents.health`) should call
1055    /// [`Self::observer_manifest_path`] first and fall back to
1056    /// [`car_registry::supervisor::Supervisor::list_from_manifest`] /
1057    /// `health_from_manifest` when set. Closes
1058    /// Parslee-ai/car-releases#44.
1059    pub fn supervisor(&self) -> Result<Arc<car_registry::supervisor::Supervisor>, String> {
1060        if let Some(s) = self.supervisor.get() {
1061            return Ok(s.clone());
1062        }
1063        if let Some(p) = self.observer_manifest_path.get() {
1064            return Err(format!(
1065                "this car-server is observe-only — another car-server process \
1066                 holds the supervisor lock for {}. Mutations refuse here; route \
1067                 them to the primary daemon, or stop the other car-server first.",
1068                p.display()
1069            ));
1070        }
1071        let s = car_registry::supervisor::Supervisor::user_default()
1072            .map(Arc::new)
1073            .map_err(|e| e.to_string())?;
1074        // OnceLock::set returns the original arg back on collision —
1075        // a concurrent caller racing through user_default. Take
1076        // whichever wins.
1077        let _ = self.supervisor.set(s);
1078        Ok(self.supervisor.get().expect("set or pre-existing").clone())
1079    }
1080
/// Replace the lazy default with a caller-supplied supervisor.
/// Used by the standalone `car-server` binary to call
/// `start_all()` on a known-good handle without paying the
/// lazy-init lookup cost.
///
/// # Errors
///
/// Returns `Err(supervisor)`, handing the rejected handle back, when
/// a supervisor was already installed (eagerly or by an earlier lazy
/// init); the existing one is kept.
pub fn install_supervisor(
    &self,
    supervisor: Arc<car_registry::supervisor::Supervisor>,
) -> Result<(), Arc<car_registry::supervisor::Supervisor>> {
    self.supervisor.set(supervisor)
}
1092
1093    /// Non-acquiring read of the currently-installed supervisor.
1094    /// Unlike [`supervisor`](Self::supervisor), this does NOT lazy-
1095    /// init via `user_default()` — it returns `None` instead of
1096    /// constructing a fresh `Supervisor` and acquiring the
1097    /// `<manifest>.lock` as a side effect. Use this from read-only
1098    /// metadata paths (`host.subscribe` identity, status surfaces)
1099    /// where causing lock acquisition on observation would be a
1100    /// Heisenberg subscribe — the act of asking "do you own the
1101    /// lock?" must not be the act of taking it.
1102    pub fn supervisor_if_installed(&self) -> Option<Arc<car_registry::supervisor::Supervisor>> {
1103        self.supervisor.get().cloned()
1104    }
1105
1106    /// Mark this daemon as *observing* a manifest owned by another
1107    /// car-server process. After this call, `supervisor()` returns
1108    /// an "observe-only" error and read-only handlers
1109    /// (`agents.list`, `agents.health`) fall back to the static
1110    /// `Supervisor::list_from_manifest` / `health_from_manifest`
1111    /// paths. Idempotent — subsequent calls with the same path are
1112    /// no-ops; a different path returns `Err(())`. Closes
1113    /// Parslee-ai/car-releases#44.
1114    pub fn install_observer_manifest(&self, path: PathBuf) -> Result<(), PathBuf> {
1115        self.observer_manifest_path.set(path)
1116    }
1117
/// Path of the manifest this daemon is observing but not
/// supervising. `None` when this daemon owns the supervisor
/// (the normal case) or when no manifest is configured at all
/// (no `HOME`, embedder didn't install one).
///
/// Pure read of the write-once slot filled by
/// [`install_observer_manifest`](Self::install_observer_manifest);
/// it never initializes anything.
pub fn observer_manifest_path(&self) -> Option<&PathBuf> {
    self.observer_manifest_path.get()
}
1125
1126    /// Lazy-initialize and return the in-core A2A dispatcher. The
1127    /// first call constructs an [`car_a2a::A2aDispatcher`] from
1128    /// either the embedder's overrides (set via
1129    /// [`ServerStateConfig::with_a2a_runtime`] / `with_a2a_store` /
1130    /// `with_a2a_card_source`) or sensible defaults: a fresh
1131    /// `Runtime` with `register_agent_basics` registered, an
1132    /// `InMemoryTaskStore`, and a card built from the runtime's
1133    /// tool schemas advertising `ws://127.0.0.1:9100/` as the
1134    /// public URL. Closes Parslee-ai/car-releases#28.
1135    pub async fn a2a_dispatcher(&self) -> Arc<car_a2a::A2aDispatcher> {
1136        if let Some(d) = self.a2a_dispatcher.get() {
1137            return d.clone();
1138        }
1139
1140        // Embedder overrides take precedence; fall back to defaults
1141        // for each slot independently (so an embedder that only
1142        // wants a custom card can leave the runtime + store at
1143        // defaults). `Mutex::take()` consumes the slot so the
1144        // defaults aren't reconstructed on a racing init that loses
1145        // the OnceLock::set call below.
1146        let runtime = self
1147            .a2a_runtime
1148            .lock()
1149            .expect("a2a_runtime mutex poisoned")
1150            .take();
1151        let runtime = match runtime {
1152            Some(r) => r,
1153            None => {
1154                let r = Arc::new(car_engine::Runtime::new());
1155                r.register_agent_basics().await;
1156                r
1157            }
1158        };
1159
1160        let store = self
1161            .a2a_store
1162            .lock()
1163            .expect("a2a_store mutex poisoned")
1164            .take()
1165            .unwrap_or_else(|| Arc::new(car_a2a::InMemoryTaskStore::new()));
1166
1167        let card_source = self
1168            .a2a_card_source
1169            .lock()
1170            .expect("a2a_card_source mutex poisoned")
1171            .take();
1172        let card_source = match card_source {
1173            Some(c) => c,
1174            None => {
1175                let card = car_a2a::build_default_agent_card(
1176                    &runtime,
1177                    car_a2a::AgentCardConfig::minimal(
1178                        "Common Agent Runtime",
1179                        "Embedded CAR daemon — A2A v1.0 reachable over WebSocket JSON-RPC.",
1180                        "ws://127.0.0.1:9100/",
1181                        car_a2a::AgentProvider {
1182                            organization: "Parslee".into(),
1183                            url: Some("https://github.com/Parslee-ai/car".into()),
1184                        },
1185                    ),
1186                )
1187                .await;
1188                Arc::new(move || card.clone()) as Arc<car_a2a::AgentCardSource>
1189            }
1190        };
1191
1192        let dispatcher = Arc::new(car_a2a::A2aDispatcher::new(runtime, store, card_source));
1193        // OnceLock::set returns Err on race — accept whichever
1194        // dispatcher won and clone-return that one.
1195        let _ = self.a2a_dispatcher.set(dispatcher);
1196        self.a2a_dispatcher
1197            .get()
1198            .expect("a2a_dispatcher set or pre-existing")
1199            .clone()
1200    }
1201
1202    /// Record the start of a run and return its [`RunMeta`]. The
1203    /// `run_id` is minted by the caller (the `runs.start` handler)
1204    /// so it can set `session.current_run_id` to the same value
1205    /// **before** responding (KTD3). Idempotent collision on an
1206    /// already-present `run_id` is treated as a fresh insert (uuids
1207    /// don't collide in practice; if one did, the latest start wins).
1208    ///
1209    /// U3: also writes the `RunStarted` line to the disk store + creates
1210    /// the run file, so the run is on disk from the first record (REPLAY
1211    /// survives a restart even before any turn lands). Disk failures are
1212    /// logged, never fatal — the in-memory registry is the live path.
1213    /// Fan one `runs.trace.event` out to every live subscriber of
1214    /// `run_id` (agent run tracing, U4). Called by the lifecycle methods
1215    /// **while they hold the `runs` lock** so append-and-notify is
1216    /// serialized with subscribe's snapshot-and-register (invariant #1).
1217    ///
1218    /// Each push is a non-blocking `try_send` onto the subscriber's
1219    /// bounded channel — the WS socket is written only by that
1220    /// subscriber's dedicated drain task, never here (invariant #2). A
1221    /// full channel drops the event (the slow-subscriber case); the
1222    /// client detects the cursor gap and re-subscribes (R8). Takes the
1223    /// already-acquired subscribers guard so the caller controls the lock
1224    /// scope and the `runs` → `run_subscribers` order.
1225    fn fanout_locked(
1226        subscribers: &HashMap<(String, String), crate::host::RunTraceSubscriber>,
1227        run_id: &str,
1228        agent_id: &str,
1229        record: car_proto::RunRecord,
1230        cursor: usize,
1231        status: car_proto::RunLiveStatus,
1232    ) {
1233        for ((sub_run, _client), sub) in subscribers.iter() {
1234            if sub_run != run_id {
1235                continue;
1236            }
1237            let event = car_proto::RunTraceEvent {
1238                run_id: run_id.to_string(),
1239                agent_id: agent_id.to_string(),
1240                record: record.clone(),
1241                cursor,
1242                status,
1243            };
1244            // Best-effort: a wedged subscriber's full channel drops the
1245            // event rather than stalling the producer. Logged at debug —
1246            // the client backfills via re-subscribe.
1247            if !sub.push(event) {
1248                tracing::debug!(
1249                    run_id,
1250                    "run-trace: dropped event for slow subscriber (channel full)"
1251                );
1252            }
1253        }
1254    }
1255
1256    pub async fn start_run(&self, meta: RunMeta) {
1257        let started = car_proto::RunStarted {
1258            run_id: meta.run_id.clone(),
1259            agent_id: meta.agent_id.clone(),
1260            intent: meta.intent.clone(),
1261            outcome_description: meta.outcome_description.clone(),
1262            started_at: meta.started_at,
1263        };
1264        // Insert under the runs lock; while still holding it, fan the
1265        // `Started` lifecycle event out to any subscribers. In practice
1266        // there are none yet (a client can only subscribe by `run_id`
1267        // after the run exists), but emitting keeps the broadcast path
1268        // uniform and correct if a subscribe ever races the insert.
1269        {
1270            let mut runs = self.runs.lock().await;
1271            runs.insert(meta.run_id.clone(), meta);
1272            let subs = self.run_subscribers.lock().await;
1273            Self::fanout_locked(
1274                &subs,
1275                &started.run_id,
1276                &started.agent_id,
1277                car_proto::RunRecord::Started(started.clone()),
1278                0,
1279                car_proto::RunLiveStatus::InProgress,
1280            );
1281        }
1282        if let Err(e) = self.run_store.write_started(&started) {
1283            tracing::warn!(run_id = %started.run_id, error = %e, "run-store: failed to persist RunStarted");
1284        }
1285    }
1286
1287    /// Make a run terminal with a harness-reported outcome
1288    /// (`runs.complete`). Returns `Err` if the `run_id` is unknown or
1289    /// already terminal — the handler maps that to a JSON-RPC error so
1290    /// a double-complete or stale id is visible, not silently swallowed.
1291    ///
1292    /// U3: also appends the terminal `RunEnded` line to the run's JSONL
1293    /// file so REPLAY (U5) reports the right status (Completed) after a
1294    /// restart. The disk write happens after the lock is released; disk
1295    /// failures are logged, never fatal.
1296    pub async fn complete_run(
1297        &self,
1298        run_id: &str,
1299        termination: car_proto::RunTermination,
1300    ) -> Result<(), String> {
1301        let ended = {
1302            let mut runs = self.runs.lock().await;
1303            let meta = runs
1304                .get_mut(run_id)
1305                .ok_or_else(|| format!("unknown run_id `{run_id}`"))?;
1306            if meta.is_terminal() {
1307                return Err(format!(
1308                    "run `{run_id}` is already terminal — cannot record another outcome"
1309                ));
1310            }
1311            let ended_at = chrono::Utc::now();
1312            meta.termination = Some(termination.clone());
1313            meta.ended_at = Some(ended_at);
1314            let ended = car_proto::RunEnded {
1315                run_id: run_id.to_string(),
1316                agent_id: meta.agent_id.clone(),
1317                termination,
1318                ended_at,
1319            };
1320            // Fan the terminal event out under the runs lock so a
1321            // subscriber that registered just before this transition sees
1322            // the terminal record exactly once, after its last turn.
1323            let cursor = meta.turn_cursor();
1324            let status = meta.live_status();
1325            let agent_id = meta.agent_id.clone();
1326            let subs = self.run_subscribers.lock().await;
1327            Self::fanout_locked(
1328                &subs,
1329                run_id,
1330                &agent_id,
1331                car_proto::RunRecord::Ended(ended.clone()),
1332                cursor,
1333                status,
1334            );
1335            ended
1336        };
1337        if let Err(e) = self.run_store.write_ended(&ended) {
1338            tracing::warn!(run_id, error = %e, "run-store: failed to persist RunEnded");
1339        }
1340        // Heap hygiene: the run is now terminal and fully flushed to disk,
1341        // and the terminal `runs.trace.event` has already fanned out to any
1342        // live subscriber. Drop the in-memory per-turn buffer (each entry
1343        // holds a full prompt + CLI output, potentially MBs) so completed
1344        // runs don't pin the heaviest payloads for the daemon's lifetime.
1345        // The lightweight `RunMeta` header (agent_id, status, started_at,
1346        // termination) stays resident so `run_meta`/status lookups still
1347        // work. A late `runs.subscribe` to this terminal run re-sources its
1348        // snapshot from disk (see `subscribe_run`). Only terminal runs are
1349        // cleared; an in-progress run keeps its turns (the live snapshot
1350        // source). `Vec::new()` frees the buffer's capacity, not just its
1351        // length.
1352        self.clear_terminal_run_turns(run_id).await;
1353        Ok(())
1354    }
1355
1356    /// Mark a run `Incomplete` (R5) — used by disconnect cleanup when a
1357    /// harness drops without `runs.complete`. No-op if the run is
1358    /// already terminal (the common healthy-close case where
1359    /// `runs.complete` won the race). Returns `true` if it actually
1360    /// wrote the `Incomplete` marker.
1361    ///
1362    /// U3: on the transition to `Incomplete`, appends the terminal
1363    /// `RunEnded { Incomplete }` line to disk so an orphaned run reports
1364    /// `Incomplete` (not `InProgress`) on REPLAY after a restart. Disk
1365    /// failures are logged, never fatal.
1366    pub async fn mark_run_incomplete(&self, run_id: &str) -> bool {
1367        let ended = {
1368            let mut runs = self.runs.lock().await;
1369            match runs.get_mut(run_id) {
1370                Some(meta) if !meta.is_terminal() => {
1371                    let ended_at = chrono::Utc::now();
1372                    meta.termination = Some(car_proto::RunTermination::Incomplete);
1373                    meta.ended_at = Some(ended_at);
1374                    let ended = car_proto::RunEnded {
1375                        run_id: run_id.to_string(),
1376                        agent_id: meta.agent_id.clone(),
1377                        termination: car_proto::RunTermination::Incomplete,
1378                        ended_at,
1379                    };
1380                    // Notify subscribers of the terminal Incomplete under
1381                    // the runs lock (same contract as complete_run).
1382                    let cursor = meta.turn_cursor();
1383                    let agent_id = meta.agent_id.clone();
1384                    let subs = self.run_subscribers.lock().await;
1385                    Self::fanout_locked(
1386                        &subs,
1387                        run_id,
1388                        &agent_id,
1389                        car_proto::RunRecord::Ended(ended.clone()),
1390                        cursor,
1391                        car_proto::RunLiveStatus::Incomplete,
1392                    );
1393                    Some(ended)
1394                }
1395                _ => None,
1396            }
1397        };
1398        match ended {
1399            Some(ended) => {
1400                if let Err(e) = self.run_store.write_ended(&ended) {
1401                    tracing::warn!(run_id, error = %e, "run-store: failed to persist Incomplete");
1402                }
1403                // Heap hygiene (same rationale as `complete_run`): this run
1404                // just became terminal (`Incomplete`) and is flushed to disk
1405                // with its terminal event fanned out. Drop the in-memory turn
1406                // buffer; a late subscriber re-sources from disk. Only runs
1407                // that actually transitioned reach here (`Some(ended)`); a
1408                // no-op (already-terminal or unknown) leaves nothing to clear.
1409                self.clear_terminal_run_turns(run_id).await;
1410                true
1411            }
1412            None => false,
1413        }
1414    }
1415
1416    /// Free the in-memory per-turn buffer of a TERMINAL run, keeping the
1417    /// lightweight `RunMeta` header resident (agent run tracing, heap
1418    /// hygiene follow-up). Called at the end of `complete_run` /
1419    /// `mark_run_incomplete`, AFTER the disk flush and terminal fanout, so
1420    /// the disk store remains the durable source of truth and any late
1421    /// `runs.subscribe` re-sources the snapshot from disk
1422    /// ([`Self::subscribe_run`]).
1423    ///
1424    /// Guards on `is_terminal()` so a same-id reuse that somehow re-opened
1425    /// the run (it shouldn't — ids are uuids) never has live turns dropped.
1426    /// `Vec::new()` releases the buffer's capacity, not just its length —
1427    /// the per-turn payloads (full prompt + CLI output) are the heavy part.
1428    async fn clear_terminal_run_turns(&self, run_id: &str) {
1429        let mut runs = self.runs.lock().await;
1430        if let Some(meta) = runs.get_mut(run_id) {
1431            if meta.is_terminal() {
1432                meta.turns = Vec::new();
1433            }
1434        }
1435        drop(runs);
1436        // FIX 6 hygiene: a terminal run accepts no more turns, so its
1437        // per-run disk-write lock is dead weight. Drop the map entry to keep
1438        // `run_write_locks` from growing unbounded. Any in-flight append
1439        // already holds its own `Arc<Mutex<()>>` clone, so removing the map
1440        // entry can't pull the rug out from under it.
1441        self.run_write_locks.lock().await.remove(run_id);
1442    }
1443
1444    /// Non-acquiring read of a run's current metadata (clone). Used by
1445    /// tests and by the U2 recorder to learn a run's owning `agent_id`.
1446    ///
1447    /// NOTE: this clones the ENTIRE `RunMeta`, including its `turns` buffer
1448    /// (full prompts + CLI output — up to the per-run caps). Hot callers
1449    /// that need only the run's header facts must use [`Self::run_header`]
1450    /// instead; cloning the whole buffer per batch RPC is a ~512 MB
1451    /// worst-case copy at this PR's caps (ADV-2).
1452    pub async fn run_meta(&self, run_id: &str) -> Option<RunMeta> {
1453        self.runs.lock().await.get(run_id).cloned()
1454    }
1455
1456    /// Lightweight, non-cloning read of a run's header facts under the
1457    /// `runs` lock: `(agent_id, terminal, turn_count)`. The owning agent id
1458    /// is the only heap allocation; the turn BUFFER is never cloned (ADV-2).
1459    ///
1460    /// This is what `handle_runs_record_turns` needs — owner for the write
1461    /// authz check, terminality for the distinguishable `run_terminal` drop,
1462    /// and the current turn count for the (fast-path) ceiling pre-check —
1463    /// without the `run_meta` deep clone of every recorded turn's payload.
1464    pub async fn run_header(&self, run_id: &str) -> Option<(String, bool, usize)> {
1465        self.runs
1466            .lock()
1467            .await
1468            .get(run_id)
1469            .map(|m| (m.agent_id.clone(), m.is_terminal(), m.turns.len()))
1470    }
1471
1472    /// Append per-turn trace records to a run's in-memory buffer (agent
1473    /// run tracing, U2). The recorder calls this after every
1474    /// `proposal.submit` on a session with a current run, passing the
1475    /// `RunRecord::Turn`s the recorder produced for that proposal.
1476    ///
1477    /// Returns a [`RecordRunTurnsOutcome`]:
1478    /// - `Appended { new_total }` — the batch landed; `new_total` is the
1479    ///   run's new total turn count (the caller computes its next
1480    ///   `start_index` from it).
1481    /// - `RefusedCeiling` — the batch would take the run past
1482    ///   [`RECORD_TURNS_RUN_CEILING`] and was refused WHOLE (the under-lock
1483    ///   runaway backstop, ADV-1). Nothing was appended.
1484    /// - `UnknownOrTerminal` — the `run_id` is unknown (the bracket was
1485    ///   never opened) or already terminal (a turn arriving after
1486    ///   `runs.complete` is dropped — the run is closed). Nothing was
1487    ///   appended.
1488    ///
1489    /// U3 flushes the in-memory buffer to disk; U4 broadcasts it. Both read
1490    /// via [`Self::run_turns`].
1491    ///
1492    /// U3: the same turn records are appended to the run's JSONL file so
1493    /// REPLAY (U5) sees them after a restart. The disk append happens for
1494    /// exactly the records that were accepted into the in-memory buffer
1495    /// (so disk and memory stay in lock-step), keyed by the run's owning
1496    /// `agent_id`. Disk failures are logged, never fatal.
1497    pub async fn record_run_turns(
1498        &self,
1499        run_id: &str,
1500        mut records: Vec<car_proto::RunRecord>,
1501    ) -> RecordRunTurnsOutcome {
1502        // Mutate the in-memory buffer under the lock, capturing the owning
1503        // agent_id and the accepted records so the disk append (which does
1504        // its own blocking I/O) happens after the lock is released.
1505        //
1506        // Invariant #1: the append AND the per-turn fanout both happen
1507        // while holding the `runs` lock, so they are serialized with
1508        // `runs.subscribe`'s snapshot+register (which holds the same
1509        // lock). A subscriber registered at cursor C therefore receives
1510        // exactly the turns appended after C — the snapshot covered ≤ C —
1511        // with no gap and no dup at the boundary.
1512        //
1513        // ADV-1: the runaway ceiling is enforced HERE, under the same lock,
1514        // BEFORE the append. The dispatcher runs a task per frame, so
1515        // pipelined batches each pass the handler's lock-free pre-check
1516        // against a stale snapshot; only this under-lock check is atomic
1517        // with the append, so it is the one that actually caps the run. A
1518        // batch that would take the run PAST the ceiling is refused WHOLE
1519        // (no partial admit) and reported back distinguishably so the
1520        // handler emits `run_turn_limit`, not `run_terminal`.
1521        enum Locked {
1522            Persist(String, Vec<car_proto::RunRecord>, usize),
1523            RefusedCeiling,
1524            UnknownOrTerminal,
1525        }
1526        let locked = {
1527            let mut runs = self.runs.lock().await;
1528            match runs.get_mut(run_id) {
1529                Some(meta) if !meta.is_terminal() => {
1530                    // ADV-1: hard ceiling, under the lock. Count only the
1531                    // `Turn` records — Started/Ended are not buffered here,
1532                    // and the ceiling is a turn-count cap. Refuse the whole
1533                    // batch if appending it would cross the ceiling.
1534                    let incoming_turns = records
1535                        .iter()
1536                        .filter(|r| matches!(r, car_proto::RunRecord::Turn(_)))
1537                        .count();
1538                    if meta.turns.len() + incoming_turns > RECORD_TURNS_RUN_CEILING {
1539                        Locked::RefusedCeiling
1540                    } else {
1541                        // Position of the first record about to be appended —
1542                        // its post-append cursor is `base + 1`.
1543                        let base = meta.turns.len();
1544                        // FIX 6: re-stamp each turn's `index` from the LIVE
1545                        // append position under the `runs` lock. The caller
1546                        // computes a provisional `start_index` from a pre-read
1547                        // `run_turn_count` OUTSIDE this lock — a TOCTOU: two
1548                        // concurrent proposals on one connection can read the
1549                        // same `start_index` and bake overlapping indices.
1550                        // Authoritatively assigning the index here, from
1551                        // `base + offset`, makes it contiguous and correct
1552                        // regardless of the racing reads upstream.
1553                        for (offset, record) in records.iter_mut().enumerate() {
1554                            if let car_proto::RunRecord::Turn(turn) = record {
1555                                turn.index = base + offset;
1556                            }
1557                        }
1558                        meta.turns.extend(records.iter().cloned());
1559                        let len = meta.turns.len();
1560                        let agent_id = meta.agent_id.clone();
1561                        let status = meta.live_status();
1562                        // Fan each newly-appended record out AFTER it is in the
1563                        // buffer, in order, carrying its 1-based post-append
1564                        // cursor (`base + offset + 1`).
1565                        let subs = self.run_subscribers.lock().await;
1566                        for (offset, record) in records.iter().enumerate() {
1567                            Self::fanout_locked(
1568                                &subs,
1569                                run_id,
1570                                &agent_id,
1571                                record.clone(),
1572                                base + offset + 1,
1573                                status,
1574                            );
1575                        }
1576                        drop(subs);
1577                        Locked::Persist(agent_id, records, len)
1578                    }
1579                }
1580                _ => Locked::UnknownOrTerminal,
1581            }
1582        };
1583        match locked {
1584            Locked::Persist(agent_id, records, len) => {
1585                // FIX 6: serialize the disk append per run. The `runs` lock
1586                // is released above (so blocking I/O doesn't stall the
1587                // global registry), so two concurrent batches for the same
1588                // run could otherwise interleave their writes on disk. The
1589                // per-run write lock makes each batch's append atomic w.r.t.
1590                // other batches; `append_turns` itself writes the whole
1591                // batch in a single `write_all` so a batch is never torn
1592                // mid-record by a concurrent appender.
1593                let write_guard = self.run_write_lock(run_id).await;
1594                let _held = write_guard.lock().await;
1595                if let Err(e) = self.run_store.append_turns(&agent_id, run_id, &records) {
1596                    tracing::warn!(run_id, error = %e, "run-store: failed to persist turns");
1597                }
1598                RecordRunTurnsOutcome::Appended { new_total: len }
1599            }
1600            Locked::RefusedCeiling => RecordRunTurnsOutcome::RefusedCeiling,
1601            Locked::UnknownOrTerminal => RecordRunTurnsOutcome::UnknownOrTerminal,
1602        }
1603    }
1604
1605    /// Per-run disk-write mutex (FIX 6). Returns (creating on first use) the
1606    /// `Mutex` that serializes appends for `run_id`, so concurrent
1607    /// `record_run_turns` batches for the same run can't interleave on disk.
1608    /// Keyed by `run_id`; the map entry is reclaimed when the run goes
1609    /// terminal (see [`Self::clear_terminal_run_turns`]).
1610    async fn run_write_lock(&self, run_id: &str) -> Arc<Mutex<()>> {
1611        let mut locks = self.run_write_locks.lock().await;
1612        locks
1613            .entry(run_id.to_string())
1614            .or_insert_with(|| Arc::new(Mutex::new(())))
1615            .clone()
1616    }
1617
1618    /// Number of turns already recorded for a run — the `start_index` the
1619    /// recorder passes so per-turn `index` stays monotonic across the
1620    /// run's proposals. `0` for an unknown run.
1621    pub async fn run_turn_count(&self, run_id: &str) -> usize {
1622        self.runs
1623            .lock()
1624            .await
1625            .get(run_id)
1626            .map(|m| m.turns.len())
1627            .unwrap_or(0)
1628    }
1629
1630    /// Clone of a run's ordered per-turn trace (agent run tracing, U2).
1631    /// This is the accessor U3 reads to flush turns to disk and U4 reads
1632    /// to broadcast them. Empty Vec for an unknown run or a run with no
1633    /// turns yet.
1634    pub async fn run_turns(&self, run_id: &str) -> Vec<car_proto::RunRecord> {
1635        self.runs
1636            .lock()
1637            .await
1638            .get(run_id)
1639            .map(|m| m.turns.clone())
1640            .unwrap_or_default()
1641    }
1642
1643    /// Atomically snapshot a run's turns AND register a live
1644    /// `runs.trace.event` subscriber for `(run_id, host_client_id)` —
1645    /// the load-bearing invariant #1 (agent run tracing, U4).
1646    ///
1647    /// Holds the `runs` lock across BOTH (a) reading the run's current
1648    /// turn buffer as the snapshot at cursor `C = turns.len()` and (b)
1649    /// inserting the subscriber into `run_subscribers`. Because the
1650    /// lifecycle path (`record_run_turns` / `complete_run` /
1651    /// `mark_run_incomplete`) appends-and-notifies under the SAME lock,
1652    /// the snapshot contains exactly the turns ≤ C and every turn appended
1653    /// after registration is delivered — no turn in the snapshot/register
1654    /// window is dropped (gap) or double-delivered (dup).
1655    ///
1656    /// Spawns the subscriber's drain task (bounded channel → WS) so the
1657    /// producer never writes the socket directly (invariant #2). Returns
1658    /// the snapshot response shape. `None` if the `run_id` is unknown (the
1659    /// handler maps that to a not-found error). A re-subscribe by the same
1660    /// `(run_id, host_client_id)` replaces the prior subscriber (its drain
1661    /// task ends when the old sender drops) and re-snapshots — the R8
1662    /// reconnect path: the fresh snapshot covers any turns emitted during
1663    /// the gap with no dup.
1664    ///
1665    /// Terminal-run disk fallback: a completed/incomplete run has its
1666    /// in-memory `turns` evicted for heap hygiene (see
1667    /// [`Self::clear_terminal_run_turns`]). When the run is terminal AND its
1668    /// in-memory buffer is empty, the snapshot is re-sourced from the disk
1669    /// store ([`crate::run_store::RunStore::get_run_trace`], the same source
1670    /// `runs.get_trace` reads), filtered to `RunRecord::Turn` so the shape
1671    /// matches the live in-memory snapshot. A terminal run takes no further
1672    /// appends, so the disk trail is final and there is no gap/dup concern —
1673    /// the snapshot/register atomicity the `runs` lock provides is only
1674    /// load-bearing for the in-progress path, which is unchanged.
1675    pub async fn subscribe_run(
1676        &self,
1677        run_id: &str,
1678        host_client_id: &str,
1679        channel: Arc<WsChannel>,
1680    ) -> Option<car_proto::RunSubscribeResponse> {
1681        // Build the subscriber (and its drain task) OUTSIDE the runs lock
1682        // so spawning never happens under the hot lock; registration
1683        // itself is the only thing serialized.
1684        let subscriber = crate::host::RunTraceSubscriber::spawn(
1685            host_client_id.to_string(),
1686            channel,
1687        );
1688
1689        let runs = self.runs.lock().await;
1690        let meta = runs.get(run_id)?;
1691        let mut snapshot = meta.turns.clone();
1692        let agent_id = meta.agent_id.clone();
1693        let status = meta.live_status();
1694        // A TERMINAL run whose in-memory turns were evicted (heap hygiene at
1695        // `complete_run`/`mark_run_incomplete`) must re-source its snapshot
1696        // from disk, or a late subscriber to a just-completed run would get
1697        // an empty snapshot. This is sound WITHOUT widening the lock: a
1698        // terminal run takes no further appends, so its disk trail is final
1699        // and immutable — there is no snapshot/register race to guard, which
1700        // is the only thing the `runs` lock protects here. We therefore keep
1701        // the no-blocking-I/O-under-the-async-mutex discipline (matching the
1702        // write paths) and defer the disk read until after the lock drops.
1703        let needs_disk_snapshot = meta.is_terminal() && snapshot.is_empty();
1704        // Register under the same lock — snapshot+register stays atomic for
1705        // the live (in-progress) path; only the terminal-and-evicted SOURCE
1706        // moves to disk, and that case has no concurrent appends.
1707        {
1708            let mut subs = self.run_subscribers.lock().await;
1709            // A re-subscribe replaces the prior entry; dropping the old
1710            // RunTraceSubscriber drops its sender, ending its drain task.
1711            subs.insert((run_id.to_string(), host_client_id.to_string()), subscriber);
1712        }
1713        drop(runs);
1714
1715        if needs_disk_snapshot {
1716            // Load the durable trail (the same source `runs.get_trace` uses)
1717            // and keep only `RunRecord::Turn`s — `turns_so_far` is turns-only
1718            // by contract (`Started`/`Ended` are conveyed by `agent_id` /
1719            // `status`), so this matches the live in-memory snapshot exactly.
1720            if let Some(records) = self.run_store.get_run_trace(run_id) {
1721                snapshot = records
1722                    .into_iter()
1723                    .filter(|r| matches!(r, car_proto::RunRecord::Turn(_)))
1724                    .collect();
1725            }
1726        }
1727        let cursor = snapshot.len();
1728
1729        Some(car_proto::RunSubscribeResponse {
1730            run_id: run_id.to_string(),
1731            agent_id,
1732            turns_so_far: snapshot,
1733            cursor,
1734            status,
1735        })
1736    }
1737
1738    /// Remove a live run-trace subscriber for `(run_id, host_client_id)`
1739    /// (agent run tracing, U4). Returns `true` if a subscription existed.
1740    /// Dropping the [`crate::host::RunTraceSubscriber`] drops its channel
1741    /// sender, which ends the drain task.
1742    pub async fn unsubscribe_run(&self, run_id: &str, host_client_id: &str) -> bool {
1743        self.run_subscribers
1744            .lock()
1745            .await
1746            .remove(&(run_id.to_string(), host_client_id.to_string()))
1747            .is_some()
1748    }
1749
1750    /// Drop every live run-trace subscription owned by `host_client_id`
1751    /// (agent run tracing, U4 — R8 cleanup). Called from
1752    /// [`remove_session`] on disconnect so a CarHost that drops doesn't
1753    /// leave dangling drain tasks. Reconnect-durability is client-side:
1754    /// the run stays subscribable while it lives, and the CarHost re-issues
1755    /// `runs.subscribe {run_id}` on its new connection — the server never
1756    /// synthesizes a failure on subscriber drop.
1757    pub async fn drop_run_subscribers_for_client(&self, host_client_id: &str) {
1758        self.run_subscribers
1759            .lock()
1760            .await
1761            .retain(|(_run, client), _| client != host_client_id);
1762    }
1763
1764    /// Disconnect cleanup for agent runs (R5). Called from
1765    /// [`remove_session`] with the `client_id` of the dropping
1766    /// connection. For every run this connection owns that is **not**
1767    /// yet terminal, wait a short grace window then re-check: if a
1768    /// concurrently-dispatched `runs.complete` made it terminal in the
1769    /// meantime, leave it; otherwise mark it `Incomplete`. The grace
1770    /// window is the fix for the serve-mode false-positive where a
1771    /// healthy `runs.complete` is still in flight (in a spawned
1772    /// dispatch task) when the close frame arrives (Risk: run identity
1773    /// races).
1774    async fn sweep_runs_for_disconnect(&self, client_id: &str) {
1775        // Snapshot the still-open runs owned by this connection.
1776        let pending: Vec<String> = {
1777            let runs = self.runs.lock().await;
1778            runs.values()
1779                .filter(|m| m.client_id == client_id && !m.is_terminal())
1780                .map(|m| m.run_id.clone())
1781                .collect()
1782        };
1783        if pending.is_empty() {
1784            return;
1785        }
1786        // Short grace window so an in-flight `runs.complete` (dispatched
1787        // on a spawned task that may outrace the close frame) can land
1788        // its terminal record before we conclude the run was abandoned.
1789        tokio::time::sleep(RUN_COMPLETE_GRACE).await;
1790        for run_id in pending {
1791            self.mark_run_incomplete(&run_id).await;
1792        }
1793    }
1794
1795    pub async fn create_session(
1796        &self,
1797        client_id: &str,
1798        channel: Arc<WsChannel>,
1799    ) -> Arc<ClientSession> {
1800        let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
1801        let event_log = EventLog::with_journal(journal_path);
1802
1803        let executor = Arc::new(WsToolExecutor {
1804            channel: channel.clone(),
1805        });
1806
1807        let runtime = Runtime::new()
1808            .with_event_log(event_log)
1809            .with_executor(executor);
1810
1811        // If the embedder supplied a shared memgine, every session uses it.
1812        // Otherwise each session gets its own — matches pre-extraction behavior.
1813        let memgine = match &self.shared_memgine {
1814            Some(eng) => eng.clone(),
1815            None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
1816        };
1817
1818        let session = Arc::new(ClientSession {
1819            client_id: client_id.to_string(),
1820            runtime: Arc::new(runtime),
1821            channel,
1822            host: self.host.clone(),
1823            memgine,
1824            browser: car_ffi_common::browser::BrowserSessionSlot::new(),
1825            // When auth is disabled (no token installed), every
1826            // session is "authenticated" by default — preserves the
1827            // pre-#32 behaviour. When auth is enabled, the value is
1828            // ignored on creation; the dispatcher's gate checks
1829            // `ServerState::auth_token.is_some()` to decide whether
1830            // to enforce.
1831            authenticated: std::sync::atomic::AtomicBool::new(false),
1832            is_host: std::sync::atomic::AtomicBool::new(false),
1833            agent_id: tokio::sync::Mutex::new(None),
1834            bound_memgine: tokio::sync::Mutex::new(None),
1835            current_run_id: tokio::sync::Mutex::new(None),
1836        });
1837
1838        self.sessions
1839            .lock()
1840            .await
1841            .insert(client_id.to_string(), session.clone());
1842
1843        session
1844    }
1845
1846    /// Remove a per-client session from the registry on disconnect.
1847    /// Returns the removed session if present so callers can drop any
1848    /// remaining strong refs (e.g. drain pending tool callbacks). Fix
1849    /// for MULTI-4 / WS-3 — without this, `state.sessions` retains
1850    /// `Arc<ClientSession>` for every connection that ever existed.
1851    pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
1852        let removed = self.sessions.lock().await.remove(client_id);
1853        if let Some(session) = &removed {
1854            // #169: drop the agent_id → client_id binding so a
1855            // disconnected lifecycle agent can reconnect (or its
1856            // supervisor-respawned replacement can take the slot)
1857            // without colliding with the stale claim.
1858            let bound = session.agent_id.lock().await.clone();
1859            if let Some(id) = bound {
1860                let mut attached = self.attached_agents.lock().await;
1861                if attached.get(&id).map(String::as_str) == Some(client_id) {
1862                    attached.remove(&id);
1863                }
1864            }
1865            // Drop any in-flight `agents.chat` sessions bound to this
1866            // client — either side disconnecting orphans the stream,
1867            // and a respawned agent's stray `agent.chat.event`
1868            // notifications must not race into a stale routing entry.
1869            // See `docs/proposals/agent-chat-surface.md`.
1870            let bound_agent = session.agent_id.lock().await.clone();
1871            let mut chats = self.chat_sessions.lock().await;
1872            chats.retain(|_, s| {
1873                if s.host_client_id == client_id {
1874                    return false;
1875                }
1876                if let Some(agent_id) = &bound_agent {
1877                    if &s.agent_id == agent_id {
1878                        return false;
1879                    }
1880                }
1881                true
1882            });
1883            // Drop the chat lock before the run sweep, which awaits the
1884            // grace window and re-locks `runs` — keeping locks disjoint
1885            // avoids holding `chat_sessions` across the sleep.
1886            drop(chats);
1887            // Agent run tracing (U4): drop this connection's live
1888            // run-trace subscriptions so its drain tasks end and the
1889            // registry doesn't accumulate stale `(run_id, client)` entries
1890            // across reconnects. Reconnect-durability is client-side: the
1891            // run stays subscribable; CarHost re-subscribes by `run_id` on
1892            // its new connection (R8). This is exempt from the chat
1893            // drain-and-synthesize-error path — a dropped trace subscriber
1894            // never marks the underlying run failed.
1895            self.drop_run_subscribers_for_client(client_id).await;
1896            // Agent run tracing (R5): any run this connection owns that
1897            // has no terminal record yet is swept to `Incomplete` after
1898            // a short grace window — long enough for an in-flight
1899            // `runs.complete` to land first so a healthy close is never
1900            // mislabeled.
1901            self.sweep_runs_for_disconnect(client_id).await;
1902        }
1903        removed
1904    }
1905}
1906
#[cfg(test)]
mod tool_timeout_tests {
    use super::*;
    use std::time::Duration;

    /// An explicit per-action budget determines the callback wait.
    #[test]
    fn honors_action_timeout_over_default() {
        // The wait is the action's budget plus the grace margin (so the
        // executor's own deadline reaps first) — no 60s ceiling, no env read.
        let three_minute_wait = tool_callback_timeout(Some(180_000));
        assert_eq!(
            three_minute_wait,
            Duration::from_millis(180_000 + TOOL_TIMEOUT_GRACE_MS)
        );

        // The #259 bug: a budget far above the old 60s ceiling must be
        // honored, with the wait strictly longer than the budget itself.
        let ten_minute_wait = tool_callback_timeout(Some(600_000));
        assert!(ten_minute_wait > Duration::from_secs(600));

        // The wait is never shorter than the budget, so the executor wins.
        assert!(three_minute_wait >= Duration::from_millis(180_000));
    }

    /// Without an action budget the fallback is the raised default.
    #[test]
    fn default_is_not_the_old_60s() {
        // An env override would change the fallback, so the checks below
        // only apply when CAR_TOOL_TIMEOUT is unset (which it is in CI).
        if std::env::var_os("CAR_TOOL_TIMEOUT").is_some() {
            return;
        }

        // No budget and no override: the raised default applies — not the
        // hardcoded 60s that reaped real tools.
        let fallback = tool_callback_timeout(None);
        assert_eq!(fallback, Duration::from_millis(DEFAULT_TOOL_TIMEOUT_MS));
        assert!(DEFAULT_TOOL_TIMEOUT_MS > 60_000, "default must exceed the old 60s");
    }
}
1939
#[cfg(test)]
mod observer_mode_tests {
    use super::*;

    /// Create a fresh journal directory under the cargo target directory
    /// and return its path.
    ///
    /// `CARGO_TARGET_DIR` is used when set; otherwise the location falls
    /// back to `<manifest dir>/../../target`.
    fn journal_dir() -> PathBuf {
        let target_root = match std::env::var_os("CARGO_TARGET_DIR") {
            Some(dir) => PathBuf::from(dir),
            None => PathBuf::from(env!("CARGO_MANIFEST_DIR"))
                .join("..")
                .join("..")
                .join("target"),
        };
        // Best effort: a failure here surfaces when the temp dir is created.
        let _ = std::fs::create_dir_all(&target_root);
        let target_root = std::fs::canonicalize(&target_root).unwrap_or(target_root);

        let scratch = tempfile::TempDir::new_in(&target_root).unwrap();
        let journal_path = scratch.path().to_path_buf();
        // Leak the guard so its destructor never runs and the directory
        // stays alive for the test.
        std::mem::forget(scratch);
        journal_path
    }

    #[test]
    fn supervisor_returns_observer_error_when_marker_set() {
        // Closes Parslee-ai/car-releases#44: the second car-server on
        // a host installs the observer marker after `with_paths`
        // returns AlreadyRunning. From then on `state.supervisor()`
        // has to fail with a clear "observe-only" error that names
        // the manifest path — it must NOT retry user_default()
        // (which would re-acquire the lock and likely also fail).
        let state = ServerState::standalone(journal_dir());
        let manifest = PathBuf::from("/tmp/fake-manifest-for-test.json");

        state
            .install_observer_manifest(manifest.clone())
            .expect("install_observer_manifest succeeds on fresh state");
        assert_eq!(state.observer_manifest_path(), Some(&manifest));

        // The success value is mapped to `()` before `unwrap_err`, so only
        // the error side is inspected.
        let message = state.supervisor().map(|_| ()).unwrap_err();
        assert!(
            message.contains("observe-only"),
            "error must mention observe-only mode: {message}"
        );
        assert!(
            message.contains("fake-manifest-for-test.json"),
            "error must surface the manifest path so operators know which daemon owns it: {message}"
        );
    }

    #[test]
    fn install_observer_manifest_is_idempotent_per_path_collision() {
        let state = ServerState::standalone(journal_dir());
        let first = PathBuf::from("/tmp/manifest-a.json");
        let second = PathBuf::from("/tmp/manifest-b.json");

        state.install_observer_manifest(first.clone()).unwrap();

        // OnceLock::set returns the value back on collision, so the
        // rejected path comes back as the error…
        let rejected = state.install_observer_manifest(second.clone()).unwrap_err();
        assert_eq!(rejected, second);
        // …and the first installed path stays in place.
        assert_eq!(state.observer_manifest_path(), Some(&first));
    }

    #[test]
    fn supervisor_if_installed_does_not_lazy_init() {
        // The Heisenberg-subscribe guard: `host.subscribe`'s
        // identity path must use the non-acquiring read so a
        // purely observational client can't cause the daemon to
        // claim `<manifest>.lock` as a side effect of asking
        // about it.
        let state = ServerState::standalone(journal_dir());

        // Fresh state has no supervisor installed.
        assert!(state.supervisor_if_installed().is_none());
        // The observer marker should remain unset too — no implicit init.
        assert!(state.observer_manifest_path().is_none());
    }
}