Skip to main content

car_server_core/
host.rs

1//! Host-facing state for OS integrations.
2//!
3//! This module is intentionally UI-agnostic. Native shells such as a macOS menu
4//! bar app, Windows tray app, or Linux status notifier can subscribe to these
5//! events and render the same agent control surface.
6
7use crate::session::WsChannel;
8use car_proto::{
9    CreateHostApprovalRequest, HostAgent, HostAgentStatus, HostApprovalRequest, HostApprovalStatus,
10    HostEvent, RegisterHostAgentRequest, ResolveHostApprovalRequest, SetHostAgentStatusRequest,
11};
12use chrono::Utc;
13use futures::SinkExt;
14use serde_json::Value;
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{Mutex, Notify};
19use tokio_tungstenite::tungstenite::Message;
20
/// Upper bound on the number of host events kept in memory. Once the log
/// grows past this, the oldest entries are dropped first.
const MAX_EVENTS: usize = 500;
22
23#[derive(Default)]
24pub struct HostState {
25    agents: Mutex<HashMap<String, HostAgent>>,
26    approvals: Mutex<HashMap<String, HostApprovalRequest>>,
27    events: Mutex<VecDeque<HostEvent>>,
28    subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
29    /// Per-approval `Notify` so [`HostState::wait_for_resolution`]
30    /// can park efficiently instead of polling the approvals map.
31    /// Inserted when the gate creates an approval, removed when the
32    /// resolution comes in (or when the wait drops it on timeout).
33    /// Kept off the public surface because it's an implementation
34    /// detail of the gate path — direct callers of `create_approval`
35    /// don't need it.
36    notifies: Mutex<HashMap<String, Arc<Notify>>>,
37}
38
/// Result of a gated high-risk call, as produced by
/// [`HostState::request_and_wait_approval`].
///
/// Callers translate each variant into whatever JSON-RPC success or error
/// they want to surface.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApprovalOutcome {
    /// The resolution string matched the caller-supplied approve label.
    Approved,
    /// The approval was resolved with anything other than the approve
    /// label (including a resolution that is missing altogether).
    Denied,
    /// The timeout elapsed before any resolution arrived. Callers treat
    /// this as a denial; the approval row itself stays `Pending` so the UI
    /// can still show it for forensics.
    TimedOut,
}
56
57impl HostState {
58    pub fn new() -> Self {
59        Self::default()
60    }
61
62    pub async fn subscribe(&self, client_id: &str, channel: Arc<WsChannel>) {
63        self.subscribers
64            .lock()
65            .await
66            .insert(client_id.to_string(), channel);
67    }
68
69    pub async fn unsubscribe(&self, client_id: &str) {
70        self.subscribers.lock().await.remove(client_id);
71    }
72
73    pub async fn register_agent(
74        &self,
75        client_id: &str,
76        req: RegisterHostAgentRequest,
77    ) -> Result<HostAgent, String> {
78        let id = req.id.unwrap_or_else(|| format!("agent-{}", short_id()));
79        // ACL (audit 2026-05): if an agent with this id already
80        // exists and was registered by a *different* session, refuse
81        // — pre-flip a second client could overwrite the first
82        // client's agent record.
83        {
84            let agents = self.agents.lock().await;
85            if let Some(existing) = agents.get(&id) {
86                match existing.session_id.as_deref() {
87                    Some(owner) if owner != client_id => {
88                        return Err(format!(
89                            "agent '{id}' is owned by another session; \
90                             unregister it from that session first"
91                        ));
92                    }
93                    _ => {}
94                }
95            }
96        }
97
98        let agent = HostAgent {
99            id: id.clone(),
100            name: req.name,
101            kind: req.kind,
102            capabilities: req.capabilities,
103            project: req.project,
104            session_id: Some(client_id.to_string()),
105            status: HostAgentStatus::Idle,
106            current_task: None,
107            pid: req.pid,
108            display: req.display,
109            updated_at: Utc::now(),
110            metadata: req.metadata,
111        };
112
113        self.agents.lock().await.insert(id.clone(), agent.clone());
114        self.record_event(
115            "agent.registered",
116            Some(id),
117            format!("{} registered", agent.name),
118            serde_json::to_value(&agent).map_err(|e| e.to_string())?,
119        )
120        .await;
121
122        Ok(agent)
123    }
124
125    pub async fn unregister_agent(
126        &self,
127        caller_client_id: &str,
128        agent_id: &str,
129    ) -> Result<(), String> {
130        // ACL (audit 2026-05): only the registering session can
131        // unregister. Agents with no session_id (legacy / admin-
132        // installed records) accept any caller.
133        {
134            let agents = self.agents.lock().await;
135            if let Some(existing) = agents.get(agent_id) {
136                if let Some(owner) = existing.session_id.as_deref() {
137                    if owner != caller_client_id {
138                        return Err(format!("agent '{agent_id}' is owned by another session"));
139                    }
140                }
141            }
142        }
143
144        let removed = self.agents.lock().await.remove(agent_id);
145        if removed.is_none() {
146            return Err(format!("unknown agent '{}'", agent_id));
147        }
148        self.record_event(
149            "agent.unregistered",
150            Some(agent_id.to_string()),
151            format!("{} unregistered", agent_id),
152            Value::Null,
153        )
154        .await;
155        // The agent that owned these is gone — its pending approvals
156        // are no longer decidable (no one will read the resolution).
157        // Auto-cancel so the queue stays in sync. car-releases#48.
158        self.reap_agent_approvals(caller_client_id, agent_id).await;
159        Ok(())
160    }
161
162    pub async fn set_status(
163        &self,
164        caller_client_id: &str,
165        req: SetHostAgentStatusRequest,
166    ) -> Result<HostAgent, String> {
167        let mut agents = self.agents.lock().await;
168        let agent = agents
169            .get_mut(&req.agent_id)
170            .ok_or_else(|| format!("unknown agent '{}'", req.agent_id))?;
171
172        // ACL (audit 2026-05): only the agent's owning session can
173        // change its status. Pre-flip a second client could mark
174        // someone else's agent as Errored / WaitingForApproval /
175        // anything else from outside the workflow.
176        if let Some(owner) = agent.session_id.as_deref() {
177            if owner != caller_client_id {
178                return Err(format!(
179                    "agent '{}' is owned by another session",
180                    req.agent_id
181                ));
182            }
183        }
184
185        agent.status = req.status.clone();
186        agent.current_task = req.current_task.clone();
187        agent.updated_at = Utc::now();
188        let updated = agent.clone();
189        drop(agents);
190
191        let message = req
192            .message
193            .unwrap_or_else(|| format!("{} is {:?}", updated.name, updated.status));
194        self.record_event(
195            "agent.status_changed",
196            Some(updated.id.clone()),
197            message,
198            if req.payload.is_null() {
199                serde_json::to_value(&updated).map_err(|e| e.to_string())?
200            } else {
201                req.payload
202            },
203        )
204        .await;
205
206        Ok(updated)
207    }
208
209    /// Create an approval owned by `caller_client_id`. Pass `None`
210    /// to mark the approval *system-level* — the high-risk-method
211    /// approval gate uses this so the local UI session (a different
212    /// session than the one whose dispatch is parking) can resolve
213    /// it. Audit 2026-05: prior to this caller arg, `create_approval`
214    /// had no notion of ownership and `resolve_approval` was open
215    /// to any caller, allowing cross-session approval squatting.
216    pub async fn create_approval(
217        &self,
218        caller_client_id: Option<&str>,
219        req: CreateHostApprovalRequest,
220    ) -> Result<HostApprovalRequest, String> {
221        let approval = HostApprovalRequest {
222            id: format!("approval-{}", short_id()),
223            agent_id: req.agent_id,
224            client_id: caller_client_id.map(|s| s.to_string()),
225            action: req.action,
226            details: req.details,
227            options: if req.options.is_empty() {
228                vec!["approve".to_string(), "deny".to_string()]
229            } else {
230                req.options
231            },
232            status: HostApprovalStatus::Pending,
233            created_at: Utc::now(),
234            resolved_at: None,
235            resolution: None,
236        };
237
238        self.approvals
239            .lock()
240            .await
241            .insert(approval.id.clone(), approval.clone());
242        self.record_event(
243            "approval.requested",
244            approval.agent_id.clone(),
245            format!("Approval requested: {}", approval.action),
246            serde_json::to_value(&approval).map_err(|e| e.to_string())?,
247        )
248        .await;
249        Ok(approval)
250    }
251
252    /// Resolve an approval. ACL rules (audit 2026-05, fan-out added 2026-05-15):
253    /// - Approval has `client_id: None` (system-level, e.g. raised
254    ///   by the high-risk-method gate) → any authed caller may
255    ///   resolve, since the gate's whole point is that *the user*
256    ///   acks it via whichever session their UI happens to use.
257    /// - Approval has `client_id: Some(x)` AND caller IS `x` →
258    ///   resolve directly, fire `approval.resolved`.
259    /// - Approval has `client_id: Some(x)` AND caller is a DIFFERENT
260    ///   session AND owner is still subscribed → fan-out: record an
261    ///   `approval.resolve_requested` event that the owning agent
262    ///   hooks to call `resolve_approval` on its own session. The
263    ///   approval row stays Pending until the owner completes it.
264    ///   Caller gets back the pending approval (status: Pending).
265    ///   This unblocks UIs like CarHost that surface every approval
266    ///   in `host.approvals` — including ones agents pushed via
267    ///   `host.request_approval` — without breaking the squat-
268    ///   prevention property: only the OWNER ever mutates the row,
269    ///   the non-owning caller just signals intent.
270    /// - Cross-session AND owner is NOT subscribed → return error
271    ///   identifying the disconnected owner. Caller's UI knows the
272    ///   resolution can't land right now and surfaces accordingly.
273    pub async fn resolve_approval(
274        &self,
275        caller_client_id: &str,
276        req: ResolveHostApprovalRequest,
277    ) -> Result<HostApprovalRequest, String> {
278        // First pass: snapshot the approval and check ownership.
279        // We don't hold the approvals lock across the subscribers
280        // lock — both are mutexes and arbitrary lock-order would
281        // invite a deadlock.
282        let (owner_opt, pending_snapshot) = {
283            let mut approvals = self.approvals.lock().await;
284            let approval = approvals
285                .get_mut(&req.approval_id)
286                .ok_or_else(|| format!("unknown approval '{}'", req.approval_id))?;
287            (approval.client_id.clone(), approval.clone())
288        };
289
290        if let Some(owner) = owner_opt {
291            if owner != caller_client_id {
292                // Cross-session resolve. If the owning session is
293                // currently subscribed, fan out and let them act on
294                // their own session. If not, fail explicitly so the
295                // caller's UI can tell the user.
296                let owner_subscribed =
297                    self.subscribers.lock().await.contains_key(&owner);
298                if !owner_subscribed {
299                    return Err(format!(
300                        "approval '{}' is owned by session '{}' which is not currently connected",
301                        req.approval_id, owner,
302                    ));
303                }
304                self.record_event(
305                    "approval.resolve_requested",
306                    pending_snapshot.agent_id.clone(),
307                    format!(
308                        "Resolution requested for {}: {}",
309                        pending_snapshot.action, req.resolution
310                    ),
311                    serde_json::json!({
312                        "approval_id": req.approval_id,
313                        "resolution": req.resolution,
314                        "requesting_client_id": caller_client_id,
315                        "owner_client_id": owner,
316                    }),
317                )
318                .await;
319                // Return the still-Pending row. The owner's resolve
320                // call will fire `approval.resolved`, which the
321                // caller's subscription picks up to update its UI.
322                return Ok(pending_snapshot);
323            }
324        }
325
326        // Same-session OR system-level. Resolve directly.
327        let resolved = {
328            let mut approvals = self.approvals.lock().await;
329            let approval = approvals
330                .get_mut(&req.approval_id)
331                .ok_or_else(|| format!("approval '{}' vanished mid-resolve", req.approval_id))?;
332            approval.status = HostApprovalStatus::Resolved;
333            approval.resolution = Some(req.resolution);
334            approval.resolved_at = Some(Utc::now());
335            approval.clone()
336        };
337
338        // Wake any gate task parked on this approval. Take the Notify
339        // out of the map (it's one-shot) before notifying so the wait
340        // task can drop its Arc cleanly. `notify_one` is safe even if
341        // no one is waiting yet — the Notify holds the permit.
342        if let Some(notify) = self.notifies.lock().await.remove(&resolved.id) {
343            notify.notify_one();
344        }
345
346        self.record_event(
347            "approval.resolved",
348            resolved.agent_id.clone(),
349            format!("Approval resolved: {}", resolved.action),
350            serde_json::to_value(&resolved).map_err(|e| e.to_string())?,
351        )
352        .await;
353        Ok(resolved)
354    }
355
356    /// Auto-resolve every still-`Pending` approval matched by
357    /// `should_reap`, with `resolution = "agent_gone"`, and fan out
358    /// `approval.resolved` so subscribed shells (CarHost) drop them
359    /// from their queue. Returns how many were reaped.
360    ///
361    /// Lock order matches [`resolve_approval`]: mutate under the
362    /// `approvals` lock, collect snapshots, release it, *then* wake
363    /// gate waiters and fire events (`record_event` takes the events
364    /// + subscribers locks).
365    async fn reap_approvals<F>(&self, should_reap: F) -> usize
366    where
367        F: Fn(&HostApprovalRequest) -> bool,
368    {
369        let reaped: Vec<HostApprovalRequest> = {
370            let mut approvals = self.approvals.lock().await;
371            let mut out = Vec::new();
372            for approval in approvals.values_mut() {
373                if approval.status == HostApprovalStatus::Pending && should_reap(approval) {
374                    approval.status = HostApprovalStatus::Resolved;
375                    approval.resolution = Some("agent_gone".to_string());
376                    approval.resolved_at = Some(Utc::now());
377                    out.push(approval.clone());
378                }
379            }
380            out
381        };
382
383        for approval in &reaped {
384            // Defensive: gate-raised approvals are system-level
385            // (client_id None) so they won't match the session/agent
386            // predicates, but if one ever did, don't leave a parked
387            // gate task hung.
388            if let Some(notify) = self.notifies.lock().await.remove(&approval.id) {
389                notify.notify_one();
390            }
391            self.record_event(
392                "approval.resolved",
393                approval.agent_id.clone(),
394                format!("Approval auto-cancelled (agent gone): {}", approval.action),
395                serde_json::to_value(approval).unwrap_or(Value::Null),
396            )
397            .await;
398        }
399        reaped.len()
400    }
401
402    /// Reap a disconnected session's pending approvals. Called on WS
403    /// close — covers graceful unregister+close, hard crash (TCP
404    /// reset), and ping timeout in one place. Only approvals *owned*
405    /// by this session (`client_id == Some(client_id)`) are touched;
406    /// system-level gate approvals (`client_id: None`) are left for
407    /// the user to act on. car-releases#48.
408    pub async fn reap_session_approvals(&self, client_id: &str) -> usize {
409        self.reap_approvals(|a| a.client_id.as_deref() == Some(client_id))
410            .await
411    }
412
413    /// Reap a specific agent's pending approvals when it unregisters
414    /// while its session stays open (agent restarts under a new id on
415    /// the same WS). Scoped to approvals this caller's session owns so
416    /// it can't cancel another session's — or a system gate's — work.
417    pub async fn reap_agent_approvals(&self, caller_client_id: &str, agent_id: &str) -> usize {
418        self.reap_approvals(|a| {
419            a.agent_id.as_deref() == Some(agent_id)
420                && a.client_id.as_deref() == Some(caller_client_id)
421        })
422        .await
423    }
424
425    /// Create an approval and block until the user resolves it (or
426    /// `timeout` elapses).
427    ///
428    /// Used by the high-risk-method gate in the WS dispatcher to make
429    /// the human a load-bearing participant in actions like
430    /// `automation.run_applescript`, `messages.send`, etc. The
431    /// outcome maps as follows:
432    ///
433    /// - resolution string equals `approve_label` → [`ApprovalOutcome::Approved`]
434    /// - any other resolution string → [`ApprovalOutcome::Denied`]
435    /// - timeout fires before resolution → [`ApprovalOutcome::TimedOut`]
436    ///
437    /// Subscribers receive the standard `approval.requested` event
438    /// the moment the approval is created; the local HTML UI and
439    /// any other host shell can render approve/deny buttons that
440    /// call `host.resolve_approval`.
441    ///
442    /// On timeout, the approval row is left in `Pending` on purpose
443    /// — the UI still shows it (with a "expired" hint the renderer
444    /// can derive from `created_at`) and the gate path returns
445    /// `TimedOut` so the caller surfaces a clear error.
446    pub async fn request_and_wait_approval(
447        &self,
448        req: CreateHostApprovalRequest,
449        approve_label: &str,
450        timeout: Duration,
451    ) -> Result<ApprovalOutcome, String> {
452        // Gate-raised approvals are system-level (caller_client_id =
453        // None) so the local UI session — which is a *different*
454        // session from the one whose dispatch is parking — is
455        // permitted to resolve them. Per-session ACL would deadlock
456        // the UX otherwise.
457        let approval = self.create_approval(None, req).await?;
458        let approval_id = approval.id.clone();
459
460        // Register the wakeup channel BEFORE we sleep. resolve_approval
461        // pulls this notify out of the map and signals it; if it's
462        // missing (because resolve raced ahead), we re-check the
463        // approvals map below before waiting.
464        let notify = Arc::new(Notify::new());
465        {
466            let mut map = self.notifies.lock().await;
467            map.insert(approval_id.clone(), notify.clone());
468        }
469
470        // Defensive re-check: if resolve_approval landed between
471        // create_approval and the notify insert, the wakeup is gone
472        // but the approvals map already has the resolution. Pull it
473        // out and short-circuit.
474        if let Some(resolved) = self.approvals.lock().await.get(&approval_id).cloned() {
475            if matches!(resolved.status, HostApprovalStatus::Resolved) {
476                self.notifies.lock().await.remove(&approval_id);
477                return Ok(classify_resolution(&resolved, approve_label));
478            }
479        }
480
481        // Park until either the notify fires or the timeout elapses.
482        // We hold the Arc<Notify>; drop ours on the way out so the
483        // map slot is free either way (resolve removes it; timeout
484        // also removes it below).
485        let woken = tokio::time::timeout(timeout, notify.notified()).await;
486        if woken.is_err() {
487            self.notifies.lock().await.remove(&approval_id);
488            return Ok(ApprovalOutcome::TimedOut);
489        }
490
491        let resolved = self
492            .approvals
493            .lock()
494            .await
495            .get(&approval_id)
496            .cloned()
497            .ok_or_else(|| format!("approval '{}' vanished after notify", approval_id))?;
498        Ok(classify_resolution(&resolved, approve_label))
499    }
500
501    pub async fn agents(&self) -> Vec<HostAgent> {
502        let mut agents: Vec<_> = self.agents.lock().await.values().cloned().collect();
503        agents.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
504        agents
505    }
506
507    pub async fn approvals(&self) -> Vec<HostApprovalRequest> {
508        let mut approvals: Vec<_> = self.approvals.lock().await.values().cloned().collect();
509        approvals.sort_by(|a, b| b.created_at.cmp(&a.created_at));
510        approvals
511    }
512
513    pub async fn events(&self, limit: usize) -> Vec<HostEvent> {
514        self.events
515            .lock()
516            .await
517            .iter()
518            .rev()
519            .take(limit)
520            .cloned()
521            .collect()
522    }
523
524    pub async fn record_event(
525        &self,
526        kind: impl Into<String>,
527        agent_id: Option<String>,
528        message: impl Into<String>,
529        payload: Value,
530    ) -> HostEvent {
531        let event = HostEvent {
532            id: format!("event-{}", short_id()),
533            timestamp: Utc::now(),
534            kind: kind.into(),
535            agent_id,
536            message: message.into(),
537            payload,
538        };
539
540        {
541            let mut events = self.events.lock().await;
542            events.push_back(event.clone());
543            while events.len() > MAX_EVENTS {
544                events.pop_front();
545            }
546        }
547
548        self.broadcast_event(&event).await;
549        event
550    }
551
552    async fn broadcast_event(&self, event: &HostEvent) {
553        let subscribers: Vec<Arc<WsChannel>> =
554            self.subscribers.lock().await.values().cloned().collect();
555
556        let Ok(json) = serde_json::to_string(&serde_json::json!({
557            "jsonrpc": "2.0",
558            "method": "host.event",
559            "params": event,
560        })) else {
561            return;
562        };
563
564        for channel in subscribers {
565            let _ = channel
566                .write
567                .lock()
568                .await
569                .send(Message::Text(json.clone().into()))
570                .await;
571        }
572    }
573}
574
575fn short_id() -> String {
576    uuid::Uuid::new_v4().simple().to_string()[..12].to_string()
577}
578
579fn classify_resolution(approval: &HostApprovalRequest, approve_label: &str) -> ApprovalOutcome {
580    match approval.resolution.as_deref() {
581        Some(r) if r == approve_label => ApprovalOutcome::Approved,
582        _ => ApprovalOutcome::Denied,
583    }
584}
585
586#[cfg(test)]
587mod tests {
588    use super::*;
589
590    #[tokio::test]
591    async fn host_tracks_agents_events_and_approvals() {
592        let host = HostState::new();
593
594        let agent = host
595            .register_agent(
596                "client-1",
597                RegisterHostAgentRequest {
598                    id: Some("agent-1".to_string()),
599                    name: "Researcher".to_string(),
600                    kind: "builtin".to_string(),
601                    capabilities: vec!["search".to_string()],
602                    project: Some("/tmp/project".to_string()),
603                    pid: None,
604                    display: car_proto::HostAgentDisplay {
605                        label: Some("Research Lead".to_string()),
606                        icon: Some("magnifying-glass".to_string()),
607                        accent: Some("#0a84ff".to_string()),
608                    },
609                    metadata: Value::Null,
610                },
611            )
612            .await
613            .expect("register agent");
614
615        assert_eq!(agent.status, HostAgentStatus::Idle);
616        assert_eq!(agent.display.label.as_deref(), Some("Research Lead"));
617        assert_eq!(agent.display.icon.as_deref(), Some("magnifying-glass"));
618        assert_eq!(agent.display.accent.as_deref(), Some("#0a84ff"));
619        assert_eq!(host.agents().await.len(), 1);
620
621        let updated = host
622            .set_status(
623                "client-1",
624                SetHostAgentStatusRequest {
625                    agent_id: "agent-1".to_string(),
626                    status: HostAgentStatus::Running,
627                    current_task: Some("Collect facts".to_string()),
628                    message: None,
629                    payload: Value::Null,
630                },
631            )
632            .await
633            .expect("set status");
634
635        assert_eq!(updated.status, HostAgentStatus::Running);
636        assert_eq!(updated.current_task.as_deref(), Some("Collect facts"));
637
638        let approval = host
639            .create_approval(
640                Some("client-1"),
641                CreateHostApprovalRequest {
642                    agent_id: Some("agent-1".to_string()),
643                    action: "Run tests".to_string(),
644                    details: serde_json::json!({ "command": "cargo test" }),
645                    options: vec![],
646                    system_level: false,
647                },
648            )
649            .await
650            .expect("create approval");
651
652        assert_eq!(approval.options, vec!["approve", "deny"]);
653        assert_eq!(approval.status, HostApprovalStatus::Pending);
654
655        let resolved = host
656            .resolve_approval(
657                "client-1",
658                ResolveHostApprovalRequest {
659                    approval_id: approval.id,
660                    resolution: "approve".to_string(),
661                },
662            )
663            .await
664            .expect("resolve approval");
665
666        assert_eq!(resolved.status, HostApprovalStatus::Resolved);
667        assert_eq!(resolved.resolution.as_deref(), Some("approve"));
668        assert!(host.events(10).await.len() >= 4);
669    }
670
671    #[tokio::test]
672    async fn request_and_wait_returns_approved_when_user_approves() {
673        let host = Arc::new(HostState::new());
674        let host2 = host.clone();
675
676        // Launch the gate in the background; resolve after a small
677        // delay to mirror the realistic UI round-trip.
678        let waiter = tokio::spawn(async move {
679            host2
680                .request_and_wait_approval(
681                    CreateHostApprovalRequest {
682                        agent_id: None,
683                        action: "automation.run_applescript".into(),
684                        details: serde_json::json!({}),
685                        options: vec![],
686                    system_level: false,
687                    },
688                    "approve",
689                    Duration::from_secs(2),
690                )
691                .await
692                .expect("gate ran")
693        });
694
695        // Find the pending approval and resolve it. The gate raises
696        // approvals as system-level (client_id None), so any caller
697        // can resolve — using "ui-session" here mirrors how the
698        // local UI session would.
699        tokio::time::sleep(Duration::from_millis(20)).await;
700        let pending = host.approvals().await;
701        assert_eq!(pending.len(), 1, "exactly one pending approval");
702        assert!(
703            pending[0].client_id.is_none(),
704            "gate-raised approvals must be system-level"
705        );
706        host.resolve_approval(
707            "ui-session",
708            ResolveHostApprovalRequest {
709                approval_id: pending[0].id.clone(),
710                resolution: "approve".into(),
711            },
712        )
713        .await
714        .unwrap();
715
716        let outcome = waiter.await.unwrap();
717        assert_eq!(outcome, ApprovalOutcome::Approved);
718    }
719
720    #[tokio::test]
721    async fn request_and_wait_returns_denied_on_other_resolution() {
722        let host = Arc::new(HostState::new());
723        let host2 = host.clone();
724        let waiter = tokio::spawn(async move {
725            host2
726                .request_and_wait_approval(
727                    CreateHostApprovalRequest {
728                        agent_id: None,
729                        action: "messages.send".into(),
730                        details: serde_json::json!({}),
731                        options: vec![],
732                    system_level: false,
733                    },
734                    "approve",
735                    Duration::from_secs(2),
736                )
737                .await
738                .unwrap()
739        });
740        tokio::time::sleep(Duration::from_millis(20)).await;
741        let pending = host.approvals().await;
742        host.resolve_approval(
743            "ui-session",
744            ResolveHostApprovalRequest {
745                approval_id: pending[0].id.clone(),
746                resolution: "deny".into(),
747            },
748        )
749        .await
750        .unwrap();
751        assert_eq!(waiter.await.unwrap(), ApprovalOutcome::Denied);
752    }
753
754    #[tokio::test]
755    async fn request_and_wait_times_out_when_no_resolution() {
756        let host = HostState::new();
757        let outcome = host
758            .request_and_wait_approval(
759                CreateHostApprovalRequest {
760                    agent_id: None,
761                    action: "vision.ocr".into(),
762                    details: serde_json::json!({}),
763                    options: vec![],
764                    system_level: false,
765                },
766                "approve",
767                Duration::from_millis(50),
768            )
769            .await
770            .unwrap();
771        assert_eq!(outcome, ApprovalOutcome::TimedOut);
772        // Approval row stays in Pending so the UI keeps a record.
773        let pending = host.approvals().await;
774        assert_eq!(pending.len(), 1);
775        assert_eq!(pending[0].status, HostApprovalStatus::Pending);
776    }
777
778    // -- ACL regression tests (audit 2026-05) --
779
780    fn make_register_request(name: &str) -> RegisterHostAgentRequest {
781        RegisterHostAgentRequest {
782            id: Some(name.into()),
783            name: name.into(),
784            kind: "test".into(),
785            capabilities: vec![],
786            project: None,
787            pid: None,
788            display: car_proto::HostAgentDisplay {
789                label: None,
790                icon: None,
791                accent: None,
792            },
793            metadata: Value::Null,
794        }
795    }
796
797    #[tokio::test]
798    async fn set_status_rejects_non_owning_session() {
799        let host = HostState::new();
800        host.register_agent("client-A", make_register_request("worker"))
801            .await
802            .unwrap();
803
804        // Client B tries to mutate Client A's agent.
805        let err = host
806            .set_status(
807                "client-B",
808                SetHostAgentStatusRequest {
809                    agent_id: "worker".into(),
810                    status: HostAgentStatus::Errored,
811                    current_task: None,
812                    message: None,
813                    payload: Value::Null,
814                },
815            )
816            .await
817            .unwrap_err();
818        assert!(
819            err.contains("owned by another session"),
820            "unexpected rejection message: {err}"
821        );
822
823        // Owner still works.
824        host.set_status(
825            "client-A",
826            SetHostAgentStatusRequest {
827                agent_id: "worker".into(),
828                status: HostAgentStatus::Running,
829                current_task: None,
830                message: None,
831                payload: Value::Null,
832            },
833        )
834        .await
835        .expect("owner can mutate");
836    }
837
838    #[tokio::test]
839    async fn unregister_rejects_non_owning_session() {
840        let host = HostState::new();
841        host.register_agent("client-A", make_register_request("worker"))
842            .await
843            .unwrap();
844
845        let err = host
846            .unregister_agent("client-B", "worker")
847            .await
848            .unwrap_err();
849        assert!(err.contains("owned by another session"));
850        assert_eq!(host.agents().await.len(), 1, "agent must survive");
851
852        host.unregister_agent("client-A", "worker")
853            .await
854            .expect("owner can unregister");
855        assert_eq!(host.agents().await.len(), 0);
856    }
857
858    #[tokio::test]
859    async fn register_refuses_to_overwrite_other_sessions_agent() {
860        let host = HostState::new();
861        host.register_agent("client-A", make_register_request("worker"))
862            .await
863            .unwrap();
864
865        let err = host
866            .register_agent("client-B", make_register_request("worker"))
867            .await
868            .unwrap_err();
869        assert!(
870            err.contains("owned by another session"),
871            "unexpected message: {err}"
872        );
873    }
874
875    #[tokio::test]
876    async fn manual_approval_cross_session_with_disconnected_owner_errors() {
877        // client-A creates an approval but is NOT subscribed. client-B
878        // tries to resolve it; the fan-out path needs the owner
879        // connected, so this returns a clear "not connected" error
880        // (rather than silently no-op'ing or pretending success).
881        let host = HostState::new();
882        let approval = host
883            .create_approval(
884                Some("client-A"),
885                CreateHostApprovalRequest {
886                    agent_id: None,
887                    action: "manual.action".into(),
888                    details: serde_json::json!({}),
889                    options: vec![],
890                    system_level: false,
891                },
892            )
893            .await
894            .unwrap();
895
896        let err = host
897            .resolve_approval(
898                "client-B",
899                ResolveHostApprovalRequest {
900                    approval_id: approval.id.clone(),
901                    resolution: "approve".into(),
902                },
903            )
904            .await
905            .unwrap_err();
906        assert!(
907            err.contains("not currently connected"),
908            "expected disconnected-owner message, got: {err}",
909        );
910        // Approval stays Pending — non-owner failed call must not
911        // mutate state.
912        let still = host
913            .approvals()
914            .await
915            .into_iter()
916            .find(|a| a.id == approval.id)
917            .expect("approval survives failed resolve");
918        assert_eq!(still.status, HostApprovalStatus::Pending);
919
920        // Owner can still resolve directly (same-session fast path).
921        host.resolve_approval(
922            "client-A",
923            ResolveHostApprovalRequest {
924                approval_id: approval.id,
925                resolution: "deny".into(),
926            },
927        )
928        .await
929        .expect("owner can resolve");
930    }
931
932    #[tokio::test]
933    async fn manual_approval_cross_session_with_subscribed_owner_fans_out() {
934        // client-A creates an approval AND subscribes. client-B tries
935        // to resolve. The fan-out path records an
936        // `approval.resolve_requested` event the owner is expected to
937        // hook to perform the resolve on its own session. The approval
938        // row itself stays Pending until the owner acts — non-owner
939        // never mutates state (audit-2026-05 squat-prevention holds).
940        let host = HostState::new();
941
942        // Subscribe client-A. We only exercise the subscribers-map
943        // membership check, so a WsChannel::test_stub() (drain sink)
944        // is enough — nothing is actually written to the channel in
945        // this test.
946        host.subscribe("client-A", Arc::new(WsChannel::test_stub()))
947            .await;
948
949        let approval = host
950            .create_approval(
951                Some("client-A"),
952                CreateHostApprovalRequest {
953                    agent_id: Some("worker".into()),
954                    action: "Send email to bob@example.com".into(),
955                    details: serde_json::json!({"kind": "email_reply"}),
956                    options: vec![],
957                    system_level: false,
958                },
959            )
960            .await
961            .unwrap();
962
963        // client-B asks to resolve — should fan-out, not mutate.
964        let returned = host
965            .resolve_approval(
966                "client-B",
967                ResolveHostApprovalRequest {
968                    approval_id: approval.id.clone(),
969                    resolution: "deny".into(),
970                },
971            )
972            .await
973            .expect("cross-session resolve fans out without error");
974        assert_eq!(
975            returned.status,
976            HostApprovalStatus::Pending,
977            "returned row must stay Pending — only the owner mutates",
978        );
979
980        // An `approval.resolve_requested` event landed in the log.
981        let events = host.events(10).await;
982        let resolve_req = events
983            .iter()
984            .find(|e| e.kind == "approval.resolve_requested")
985            .expect("resolve_requested event recorded");
986        assert_eq!(
987            resolve_req.payload.get("approval_id").and_then(|v| v.as_str()),
988            Some(approval.id.as_str()),
989        );
990        assert_eq!(
991            resolve_req.payload.get("resolution").and_then(|v| v.as_str()),
992            Some("deny"),
993        );
994        assert_eq!(
995            resolve_req.payload.get("owner_client_id").and_then(|v| v.as_str()),
996            Some("client-A"),
997        );
998        assert_eq!(
999            resolve_req
1000                .payload
1001                .get("requesting_client_id")
1002                .and_then(|v| v.as_str()),
1003            Some("client-B"),
1004        );
1005
1006        // Approval row in the map still Pending — non-owner did NOT
1007        // squat the state.
1008        let still = host
1009            .approvals()
1010            .await
1011            .into_iter()
1012            .find(|a| a.id == approval.id)
1013            .unwrap();
1014        assert_eq!(still.status, HostApprovalStatus::Pending);
1015
1016        // Owner picks up the event and resolves on its own session —
1017        // this is the real fan-out completion path. After this fires,
1018        // the approval flips to Resolved and `approval.resolved`
1019        // lands in the event log for the original caller's UI.
1020        let resolved = host
1021            .resolve_approval(
1022                "client-A",
1023                ResolveHostApprovalRequest {
1024                    approval_id: approval.id.clone(),
1025                    resolution: "deny".into(),
1026                },
1027            )
1028            .await
1029            .expect("owner resolves on own session");
1030        assert_eq!(resolved.status, HostApprovalStatus::Resolved);
1031        assert_eq!(resolved.resolution.as_deref(), Some("deny"));
1032    }
1033
1034
1035    #[tokio::test]
1036    async fn system_approval_resolvable_by_any_session() {
1037        // The high-risk-method gate raises approvals with caller =
1038        // None so the local UI session — a different WS connection
1039        // from the one whose dispatch is parking — can resolve them.
1040        let host = HostState::new();
1041        let approval = host
1042            .create_approval(
1043                None,
1044                CreateHostApprovalRequest {
1045                    agent_id: None,
1046                    action: "ws.method:automation.run_applescript".into(),
1047                    details: serde_json::json!({}),
1048                    options: vec![],
1049                    system_level: false,
1050                },
1051            )
1052            .await
1053            .unwrap();
1054        assert!(
1055            approval.client_id.is_none(),
1056            "system approval must carry no owner"
1057        );
1058
1059        host.resolve_approval(
1060            "ui-session",
1061            ResolveHostApprovalRequest {
1062                approval_id: approval.id,
1063                resolution: "approve".into(),
1064            },
1065        )
1066        .await
1067        .expect("any session can resolve a system approval");
1068    }
1069
1070    // -- car-releases#48: stale-approval reaping --
1071
1072    #[tokio::test]
1073    async fn unregister_agent_reaps_its_pending_approvals() {
1074        let host = HostState::new();
1075        host.register_agent("client-1", make_register_request("agent-1"))
1076            .await
1077            .unwrap();
1078        let approval = host
1079            .create_approval(
1080                Some("client-1"),
1081                CreateHostApprovalRequest {
1082                    agent_id: Some("agent-1".to_string()),
1083                    action: "Wire $1M".to_string(),
1084                    details: Value::Null,
1085                    options: vec![],
1086                    system_level: false,
1087                },
1088            )
1089            .await
1090            .unwrap();
1091        assert_eq!(approval.status, HostApprovalStatus::Pending);
1092
1093        host.unregister_agent("client-1", "agent-1").await.unwrap();
1094
1095        let pending: Vec<_> = host
1096            .approvals()
1097            .await
1098            .into_iter()
1099            .filter(|a| a.status == HostApprovalStatus::Pending)
1100            .collect();
1101        assert!(pending.is_empty(), "agent's approval must be auto-cancelled");
1102        let all = host.approvals().await;
1103        let reaped = all.iter().find(|a| a.id == approval.id).unwrap();
1104        assert_eq!(reaped.status, HostApprovalStatus::Resolved);
1105        assert_eq!(reaped.resolution.as_deref(), Some("agent_gone"));
1106    }
1107
1108    #[tokio::test]
1109    async fn session_disconnect_reaps_owned_but_not_system_approvals() {
1110        let host = HostState::new();
1111        // Owned by the disconnecting session.
1112        let owned = host
1113            .create_approval(
1114                Some("client-9"),
1115                CreateHostApprovalRequest {
1116                    agent_id: Some("agent-x".to_string()),
1117                    action: "owned".to_string(),
1118                    details: Value::Null,
1119                    options: vec![],
1120                    system_level: false,
1121                },
1122            )
1123            .await
1124            .unwrap();
1125        // System-level gate approval (client_id None) — must survive.
1126        let system = host
1127            .create_approval(
1128                None,
1129                CreateHostApprovalRequest {
1130                    agent_id: None,
1131                    action: "system gate".to_string(),
1132                    details: Value::Null,
1133                    options: vec![],
1134                    system_level: true,
1135                },
1136            )
1137            .await
1138            .unwrap();
1139
1140        let n = host.reap_session_approvals("client-9").await;
1141        assert_eq!(n, 1, "only the session-owned approval is reaped");
1142
1143        let all = host.approvals().await;
1144        let owned_now = all.iter().find(|a| a.id == owned.id).unwrap();
1145        let system_now = all.iter().find(|a| a.id == system.id).unwrap();
1146        assert_eq!(owned_now.status, HostApprovalStatus::Resolved);
1147        assert_eq!(owned_now.resolution.as_deref(), Some("agent_gone"));
1148        assert_eq!(
1149            system_now.status,
1150            HostApprovalStatus::Pending,
1151            "system-level gate approvals must outlive a client disconnect"
1152        );
1153    }
1154}