Skip to main content

car_server_core/
host.rs

//! Host-facing state for OS integrations.
//!
//! This module is intentionally UI-agnostic. Native shells such as a macOS menu
//! bar app, Windows tray app, or Linux status notifier can subscribe to these
//! events and render the same agent control surface.
6
7use crate::session::WsChannel;
8use car_proto::{
9    CreateHostApprovalRequest, HostAgent, HostAgentStatus, HostApprovalRequest, HostApprovalStatus,
10    HostEvent, RegisterHostAgentRequest, ResolveHostApprovalRequest, SetHostAgentStatusRequest,
11};
12use chrono::Utc;
13use futures::SinkExt;
14use serde_json::Value;
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{Mutex, Notify};
19use tokio_tungstenite::tungstenite::Message;
20
/// Maximum number of host events kept in the in-memory event log; once the
/// log grows past this size the oldest entries are discarded first.
const MAX_EVENTS: usize = 500;
22
23#[derive(Default)]
24pub struct HostState {
25    agents: Mutex<HashMap<String, HostAgent>>,
26    approvals: Mutex<HashMap<String, HostApprovalRequest>>,
27    events: Mutex<VecDeque<HostEvent>>,
28    subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
29    /// Per-approval `Notify` so [`HostState::wait_for_resolution`]
30    /// can park efficiently instead of polling the approvals map.
31    /// Inserted when the gate creates an approval, removed when the
32    /// resolution comes in (or when the wait drops it on timeout).
33    /// Kept off the public surface because it's an implementation
34    /// detail of the gate path — direct callers of `create_approval`
35    /// don't need it.
36    notifies: Mutex<HashMap<String, Arc<Notify>>>,
37}
38
/// Result of running a high-risk call through the approval gate.
///
/// Produced by [`HostState::request_and_wait_approval`]; the caller decides
/// which JSON-RPC success or error each variant becomes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApprovalOutcome {
    /// The recorded resolution string matched the approve label supplied by
    /// the caller (the explicit "approve" option).
    Approved,
    /// The approval was resolved with anything other than the approve
    /// label.
    Denied,
    /// The timeout elapsed without a resolution. Callers treat this as a
    /// denial; the approval row itself stays `Pending` so the UI can still
    /// show it for forensics.
    TimedOut,
}
56
57impl HostState {
58    pub fn new() -> Self {
59        Self::default()
60    }
61
62    pub async fn subscribe(&self, client_id: &str, channel: Arc<WsChannel>) {
63        self.subscribers
64            .lock()
65            .await
66            .insert(client_id.to_string(), channel);
67    }
68
69    pub async fn unsubscribe(&self, client_id: &str) {
70        self.subscribers.lock().await.remove(client_id);
71    }
72
73    pub async fn register_agent(
74        &self,
75        client_id: &str,
76        req: RegisterHostAgentRequest,
77    ) -> Result<HostAgent, String> {
78        let id = req.id.unwrap_or_else(|| format!("agent-{}", short_id()));
79        // ACL (audit 2026-05): if an agent with this id already
80        // exists and was registered by a *different* session, refuse
81        // — pre-flip a second client could overwrite the first
82        // client's agent record.
83        {
84            let agents = self.agents.lock().await;
85            if let Some(existing) = agents.get(&id) {
86                match existing.session_id.as_deref() {
87                    Some(owner) if owner != client_id => {
88                        return Err(format!(
89                            "agent '{id}' is owned by another session; \
90                             unregister it from that session first"
91                        ));
92                    }
93                    _ => {}
94                }
95            }
96        }
97
98        let agent = HostAgent {
99            id: id.clone(),
100            name: req.name,
101            kind: req.kind,
102            capabilities: req.capabilities,
103            project: req.project,
104            session_id: Some(client_id.to_string()),
105            status: HostAgentStatus::Idle,
106            current_task: None,
107            pid: req.pid,
108            display: req.display,
109            updated_at: Utc::now(),
110            metadata: req.metadata,
111        };
112
113        self.agents.lock().await.insert(id.clone(), agent.clone());
114        self.record_event(
115            "agent.registered",
116            Some(id),
117            format!("{} registered", agent.name),
118            serde_json::to_value(&agent).map_err(|e| e.to_string())?,
119        )
120        .await;
121
122        Ok(agent)
123    }
124
125    pub async fn unregister_agent(
126        &self,
127        caller_client_id: &str,
128        agent_id: &str,
129    ) -> Result<(), String> {
130        // ACL (audit 2026-05): only the registering session can
131        // unregister. Agents with no session_id (legacy / admin-
132        // installed records) accept any caller.
133        {
134            let agents = self.agents.lock().await;
135            if let Some(existing) = agents.get(agent_id) {
136                if let Some(owner) = existing.session_id.as_deref() {
137                    if owner != caller_client_id {
138                        return Err(format!("agent '{agent_id}' is owned by another session"));
139                    }
140                }
141            }
142        }
143
144        let removed = self.agents.lock().await.remove(agent_id);
145        if removed.is_none() {
146            return Err(format!("unknown agent '{}'", agent_id));
147        }
148        self.record_event(
149            "agent.unregistered",
150            Some(agent_id.to_string()),
151            format!("{} unregistered", agent_id),
152            Value::Null,
153        )
154        .await;
155        Ok(())
156    }
157
158    pub async fn set_status(
159        &self,
160        caller_client_id: &str,
161        req: SetHostAgentStatusRequest,
162    ) -> Result<HostAgent, String> {
163        let mut agents = self.agents.lock().await;
164        let agent = agents
165            .get_mut(&req.agent_id)
166            .ok_or_else(|| format!("unknown agent '{}'", req.agent_id))?;
167
168        // ACL (audit 2026-05): only the agent's owning session can
169        // change its status. Pre-flip a second client could mark
170        // someone else's agent as Errored / WaitingForApproval /
171        // anything else from outside the workflow.
172        if let Some(owner) = agent.session_id.as_deref() {
173            if owner != caller_client_id {
174                return Err(format!(
175                    "agent '{}' is owned by another session",
176                    req.agent_id
177                ));
178            }
179        }
180
181        agent.status = req.status.clone();
182        agent.current_task = req.current_task.clone();
183        agent.updated_at = Utc::now();
184        let updated = agent.clone();
185        drop(agents);
186
187        let message = req
188            .message
189            .unwrap_or_else(|| format!("{} is {:?}", updated.name, updated.status));
190        self.record_event(
191            "agent.status_changed",
192            Some(updated.id.clone()),
193            message,
194            if req.payload.is_null() {
195                serde_json::to_value(&updated).map_err(|e| e.to_string())?
196            } else {
197                req.payload
198            },
199        )
200        .await;
201
202        Ok(updated)
203    }
204
205    /// Create an approval owned by `caller_client_id`. Pass `None`
206    /// to mark the approval *system-level* — the high-risk-method
207    /// approval gate uses this so the local UI session (a different
208    /// session than the one whose dispatch is parking) can resolve
209    /// it. Audit 2026-05: prior to this caller arg, `create_approval`
210    /// had no notion of ownership and `resolve_approval` was open
211    /// to any caller, allowing cross-session approval squatting.
212    pub async fn create_approval(
213        &self,
214        caller_client_id: Option<&str>,
215        req: CreateHostApprovalRequest,
216    ) -> Result<HostApprovalRequest, String> {
217        let approval = HostApprovalRequest {
218            id: format!("approval-{}", short_id()),
219            agent_id: req.agent_id,
220            client_id: caller_client_id.map(|s| s.to_string()),
221            action: req.action,
222            details: req.details,
223            options: if req.options.is_empty() {
224                vec!["approve".to_string(), "deny".to_string()]
225            } else {
226                req.options
227            },
228            status: HostApprovalStatus::Pending,
229            created_at: Utc::now(),
230            resolved_at: None,
231            resolution: None,
232        };
233
234        self.approvals
235            .lock()
236            .await
237            .insert(approval.id.clone(), approval.clone());
238        self.record_event(
239            "approval.requested",
240            approval.agent_id.clone(),
241            format!("Approval requested: {}", approval.action),
242            serde_json::to_value(&approval).map_err(|e| e.to_string())?,
243        )
244        .await;
245        Ok(approval)
246    }
247
248    /// Resolve an approval. ACL rules (audit 2026-05, fan-out added 2026-05-15):
249    /// - Approval has `client_id: None` (system-level, e.g. raised
250    ///   by the high-risk-method gate) → any authed caller may
251    ///   resolve, since the gate's whole point is that *the user*
252    ///   acks it via whichever session their UI happens to use.
253    /// - Approval has `client_id: Some(x)` AND caller IS `x` →
254    ///   resolve directly, fire `approval.resolved`.
255    /// - Approval has `client_id: Some(x)` AND caller is a DIFFERENT
256    ///   session AND owner is still subscribed → fan-out: record an
257    ///   `approval.resolve_requested` event that the owning agent
258    ///   hooks to call `resolve_approval` on its own session. The
259    ///   approval row stays Pending until the owner completes it.
260    ///   Caller gets back the pending approval (status: Pending).
261    ///   This unblocks UIs like CarHost that surface every approval
262    ///   in `host.approvals` — including ones agents pushed via
263    ///   `host.request_approval` — without breaking the squat-
264    ///   prevention property: only the OWNER ever mutates the row,
265    ///   the non-owning caller just signals intent.
266    /// - Cross-session AND owner is NOT subscribed → return error
267    ///   identifying the disconnected owner. Caller's UI knows the
268    ///   resolution can't land right now and surfaces accordingly.
269    pub async fn resolve_approval(
270        &self,
271        caller_client_id: &str,
272        req: ResolveHostApprovalRequest,
273    ) -> Result<HostApprovalRequest, String> {
274        // First pass: snapshot the approval and check ownership.
275        // We don't hold the approvals lock across the subscribers
276        // lock — both are mutexes and arbitrary lock-order would
277        // invite a deadlock.
278        let (owner_opt, pending_snapshot) = {
279            let mut approvals = self.approvals.lock().await;
280            let approval = approvals
281                .get_mut(&req.approval_id)
282                .ok_or_else(|| format!("unknown approval '{}'", req.approval_id))?;
283            (approval.client_id.clone(), approval.clone())
284        };
285
286        if let Some(owner) = owner_opt {
287            if owner != caller_client_id {
288                // Cross-session resolve. If the owning session is
289                // currently subscribed, fan out and let them act on
290                // their own session. If not, fail explicitly so the
291                // caller's UI can tell the user.
292                let owner_subscribed =
293                    self.subscribers.lock().await.contains_key(&owner);
294                if !owner_subscribed {
295                    return Err(format!(
296                        "approval '{}' is owned by session '{}' which is not currently connected",
297                        req.approval_id, owner,
298                    ));
299                }
300                self.record_event(
301                    "approval.resolve_requested",
302                    pending_snapshot.agent_id.clone(),
303                    format!(
304                        "Resolution requested for {}: {}",
305                        pending_snapshot.action, req.resolution
306                    ),
307                    serde_json::json!({
308                        "approval_id": req.approval_id,
309                        "resolution": req.resolution,
310                        "requesting_client_id": caller_client_id,
311                        "owner_client_id": owner,
312                    }),
313                )
314                .await;
315                // Return the still-Pending row. The owner's resolve
316                // call will fire `approval.resolved`, which the
317                // caller's subscription picks up to update its UI.
318                return Ok(pending_snapshot);
319            }
320        }
321
322        // Same-session OR system-level. Resolve directly.
323        let resolved = {
324            let mut approvals = self.approvals.lock().await;
325            let approval = approvals
326                .get_mut(&req.approval_id)
327                .ok_or_else(|| format!("approval '{}' vanished mid-resolve", req.approval_id))?;
328            approval.status = HostApprovalStatus::Resolved;
329            approval.resolution = Some(req.resolution);
330            approval.resolved_at = Some(Utc::now());
331            approval.clone()
332        };
333
334        // Wake any gate task parked on this approval. Take the Notify
335        // out of the map (it's one-shot) before notifying so the wait
336        // task can drop its Arc cleanly. `notify_one` is safe even if
337        // no one is waiting yet — the Notify holds the permit.
338        if let Some(notify) = self.notifies.lock().await.remove(&resolved.id) {
339            notify.notify_one();
340        }
341
342        self.record_event(
343            "approval.resolved",
344            resolved.agent_id.clone(),
345            format!("Approval resolved: {}", resolved.action),
346            serde_json::to_value(&resolved).map_err(|e| e.to_string())?,
347        )
348        .await;
349        Ok(resolved)
350    }
351
352    /// Create an approval and block until the user resolves it (or
353    /// `timeout` elapses).
354    ///
355    /// Used by the high-risk-method gate in the WS dispatcher to make
356    /// the human a load-bearing participant in actions like
357    /// `automation.run_applescript`, `messages.send`, etc. The
358    /// outcome maps as follows:
359    ///
360    /// - resolution string equals `approve_label` → [`ApprovalOutcome::Approved`]
361    /// - any other resolution string → [`ApprovalOutcome::Denied`]
362    /// - timeout fires before resolution → [`ApprovalOutcome::TimedOut`]
363    ///
364    /// Subscribers receive the standard `approval.requested` event
365    /// the moment the approval is created; the local HTML UI and
366    /// any other host shell can render approve/deny buttons that
367    /// call `host.resolve_approval`.
368    ///
369    /// On timeout, the approval row is left in `Pending` on purpose
370    /// — the UI still shows it (with a "expired" hint the renderer
371    /// can derive from `created_at`) and the gate path returns
372    /// `TimedOut` so the caller surfaces a clear error.
373    pub async fn request_and_wait_approval(
374        &self,
375        req: CreateHostApprovalRequest,
376        approve_label: &str,
377        timeout: Duration,
378    ) -> Result<ApprovalOutcome, String> {
379        // Gate-raised approvals are system-level (caller_client_id =
380        // None) so the local UI session — which is a *different*
381        // session from the one whose dispatch is parking — is
382        // permitted to resolve them. Per-session ACL would deadlock
383        // the UX otherwise.
384        let approval = self.create_approval(None, req).await?;
385        let approval_id = approval.id.clone();
386
387        // Register the wakeup channel BEFORE we sleep. resolve_approval
388        // pulls this notify out of the map and signals it; if it's
389        // missing (because resolve raced ahead), we re-check the
390        // approvals map below before waiting.
391        let notify = Arc::new(Notify::new());
392        {
393            let mut map = self.notifies.lock().await;
394            map.insert(approval_id.clone(), notify.clone());
395        }
396
397        // Defensive re-check: if resolve_approval landed between
398        // create_approval and the notify insert, the wakeup is gone
399        // but the approvals map already has the resolution. Pull it
400        // out and short-circuit.
401        if let Some(resolved) = self.approvals.lock().await.get(&approval_id).cloned() {
402            if matches!(resolved.status, HostApprovalStatus::Resolved) {
403                self.notifies.lock().await.remove(&approval_id);
404                return Ok(classify_resolution(&resolved, approve_label));
405            }
406        }
407
408        // Park until either the notify fires or the timeout elapses.
409        // We hold the Arc<Notify>; drop ours on the way out so the
410        // map slot is free either way (resolve removes it; timeout
411        // also removes it below).
412        let woken = tokio::time::timeout(timeout, notify.notified()).await;
413        if woken.is_err() {
414            self.notifies.lock().await.remove(&approval_id);
415            return Ok(ApprovalOutcome::TimedOut);
416        }
417
418        let resolved = self
419            .approvals
420            .lock()
421            .await
422            .get(&approval_id)
423            .cloned()
424            .ok_or_else(|| format!("approval '{}' vanished after notify", approval_id))?;
425        Ok(classify_resolution(&resolved, approve_label))
426    }
427
428    pub async fn agents(&self) -> Vec<HostAgent> {
429        let mut agents: Vec<_> = self.agents.lock().await.values().cloned().collect();
430        agents.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
431        agents
432    }
433
434    pub async fn approvals(&self) -> Vec<HostApprovalRequest> {
435        let mut approvals: Vec<_> = self.approvals.lock().await.values().cloned().collect();
436        approvals.sort_by(|a, b| b.created_at.cmp(&a.created_at));
437        approvals
438    }
439
440    pub async fn events(&self, limit: usize) -> Vec<HostEvent> {
441        self.events
442            .lock()
443            .await
444            .iter()
445            .rev()
446            .take(limit)
447            .cloned()
448            .collect()
449    }
450
451    pub async fn record_event(
452        &self,
453        kind: impl Into<String>,
454        agent_id: Option<String>,
455        message: impl Into<String>,
456        payload: Value,
457    ) -> HostEvent {
458        let event = HostEvent {
459            id: format!("event-{}", short_id()),
460            timestamp: Utc::now(),
461            kind: kind.into(),
462            agent_id,
463            message: message.into(),
464            payload,
465        };
466
467        {
468            let mut events = self.events.lock().await;
469            events.push_back(event.clone());
470            while events.len() > MAX_EVENTS {
471                events.pop_front();
472            }
473        }
474
475        self.broadcast_event(&event).await;
476        event
477    }
478
479    async fn broadcast_event(&self, event: &HostEvent) {
480        let subscribers: Vec<Arc<WsChannel>> =
481            self.subscribers.lock().await.values().cloned().collect();
482
483        let Ok(json) = serde_json::to_string(&serde_json::json!({
484            "jsonrpc": "2.0",
485            "method": "host.event",
486            "params": event,
487        })) else {
488            return;
489        };
490
491        for channel in subscribers {
492            let _ = channel
493                .write
494                .lock()
495                .await
496                .send(Message::Text(json.clone().into()))
497                .await;
498        }
499    }
500}
501
502fn short_id() -> String {
503    uuid::Uuid::new_v4().simple().to_string()[..12].to_string()
504}
505
506fn classify_resolution(approval: &HostApprovalRequest, approve_label: &str) -> ApprovalOutcome {
507    match approval.resolution.as_deref() {
508        Some(r) if r == approve_label => ApprovalOutcome::Approved,
509        _ => ApprovalOutcome::Denied,
510    }
511}
512
513#[cfg(test)]
514mod tests {
515    use super::*;
516
517    #[tokio::test]
518    async fn host_tracks_agents_events_and_approvals() {
519        let host = HostState::new();
520
521        let agent = host
522            .register_agent(
523                "client-1",
524                RegisterHostAgentRequest {
525                    id: Some("agent-1".to_string()),
526                    name: "Researcher".to_string(),
527                    kind: "builtin".to_string(),
528                    capabilities: vec!["search".to_string()],
529                    project: Some("/tmp/project".to_string()),
530                    pid: None,
531                    display: car_proto::HostAgentDisplay {
532                        label: Some("Research Lead".to_string()),
533                        icon: Some("magnifying-glass".to_string()),
534                        accent: Some("#0a84ff".to_string()),
535                    },
536                    metadata: Value::Null,
537                },
538            )
539            .await
540            .expect("register agent");
541
542        assert_eq!(agent.status, HostAgentStatus::Idle);
543        assert_eq!(agent.display.label.as_deref(), Some("Research Lead"));
544        assert_eq!(agent.display.icon.as_deref(), Some("magnifying-glass"));
545        assert_eq!(agent.display.accent.as_deref(), Some("#0a84ff"));
546        assert_eq!(host.agents().await.len(), 1);
547
548        let updated = host
549            .set_status(
550                "client-1",
551                SetHostAgentStatusRequest {
552                    agent_id: "agent-1".to_string(),
553                    status: HostAgentStatus::Running,
554                    current_task: Some("Collect facts".to_string()),
555                    message: None,
556                    payload: Value::Null,
557                },
558            )
559            .await
560            .expect("set status");
561
562        assert_eq!(updated.status, HostAgentStatus::Running);
563        assert_eq!(updated.current_task.as_deref(), Some("Collect facts"));
564
565        let approval = host
566            .create_approval(
567                Some("client-1"),
568                CreateHostApprovalRequest {
569                    agent_id: Some("agent-1".to_string()),
570                    action: "Run tests".to_string(),
571                    details: serde_json::json!({ "command": "cargo test" }),
572                    options: vec![],
573                    system_level: false,
574                },
575            )
576            .await
577            .expect("create approval");
578
579        assert_eq!(approval.options, vec!["approve", "deny"]);
580        assert_eq!(approval.status, HostApprovalStatus::Pending);
581
582        let resolved = host
583            .resolve_approval(
584                "client-1",
585                ResolveHostApprovalRequest {
586                    approval_id: approval.id,
587                    resolution: "approve".to_string(),
588                },
589            )
590            .await
591            .expect("resolve approval");
592
593        assert_eq!(resolved.status, HostApprovalStatus::Resolved);
594        assert_eq!(resolved.resolution.as_deref(), Some("approve"));
595        assert!(host.events(10).await.len() >= 4);
596    }
597
598    #[tokio::test]
599    async fn request_and_wait_returns_approved_when_user_approves() {
600        let host = Arc::new(HostState::new());
601        let host2 = host.clone();
602
603        // Launch the gate in the background; resolve after a small
604        // delay to mirror the realistic UI round-trip.
605        let waiter = tokio::spawn(async move {
606            host2
607                .request_and_wait_approval(
608                    CreateHostApprovalRequest {
609                        agent_id: None,
610                        action: "automation.run_applescript".into(),
611                        details: serde_json::json!({}),
612                        options: vec![],
613                    system_level: false,
614                    },
615                    "approve",
616                    Duration::from_secs(2),
617                )
618                .await
619                .expect("gate ran")
620        });
621
622        // Find the pending approval and resolve it. The gate raises
623        // approvals as system-level (client_id None), so any caller
624        // can resolve — using "ui-session" here mirrors how the
625        // local UI session would.
626        tokio::time::sleep(Duration::from_millis(20)).await;
627        let pending = host.approvals().await;
628        assert_eq!(pending.len(), 1, "exactly one pending approval");
629        assert!(
630            pending[0].client_id.is_none(),
631            "gate-raised approvals must be system-level"
632        );
633        host.resolve_approval(
634            "ui-session",
635            ResolveHostApprovalRequest {
636                approval_id: pending[0].id.clone(),
637                resolution: "approve".into(),
638            },
639        )
640        .await
641        .unwrap();
642
643        let outcome = waiter.await.unwrap();
644        assert_eq!(outcome, ApprovalOutcome::Approved);
645    }
646
647    #[tokio::test]
648    async fn request_and_wait_returns_denied_on_other_resolution() {
649        let host = Arc::new(HostState::new());
650        let host2 = host.clone();
651        let waiter = tokio::spawn(async move {
652            host2
653                .request_and_wait_approval(
654                    CreateHostApprovalRequest {
655                        agent_id: None,
656                        action: "messages.send".into(),
657                        details: serde_json::json!({}),
658                        options: vec![],
659                    system_level: false,
660                    },
661                    "approve",
662                    Duration::from_secs(2),
663                )
664                .await
665                .unwrap()
666        });
667        tokio::time::sleep(Duration::from_millis(20)).await;
668        let pending = host.approvals().await;
669        host.resolve_approval(
670            "ui-session",
671            ResolveHostApprovalRequest {
672                approval_id: pending[0].id.clone(),
673                resolution: "deny".into(),
674            },
675        )
676        .await
677        .unwrap();
678        assert_eq!(waiter.await.unwrap(), ApprovalOutcome::Denied);
679    }
680
681    #[tokio::test]
682    async fn request_and_wait_times_out_when_no_resolution() {
683        let host = HostState::new();
684        let outcome = host
685            .request_and_wait_approval(
686                CreateHostApprovalRequest {
687                    agent_id: None,
688                    action: "vision.ocr".into(),
689                    details: serde_json::json!({}),
690                    options: vec![],
691                    system_level: false,
692                },
693                "approve",
694                Duration::from_millis(50),
695            )
696            .await
697            .unwrap();
698        assert_eq!(outcome, ApprovalOutcome::TimedOut);
699        // Approval row stays in Pending so the UI keeps a record.
700        let pending = host.approvals().await;
701        assert_eq!(pending.len(), 1);
702        assert_eq!(pending[0].status, HostApprovalStatus::Pending);
703    }
704
705    // -- ACL regression tests (audit 2026-05) --
706
707    fn make_register_request(name: &str) -> RegisterHostAgentRequest {
708        RegisterHostAgentRequest {
709            id: Some(name.into()),
710            name: name.into(),
711            kind: "test".into(),
712            capabilities: vec![],
713            project: None,
714            pid: None,
715            display: car_proto::HostAgentDisplay {
716                label: None,
717                icon: None,
718                accent: None,
719            },
720            metadata: Value::Null,
721        }
722    }
723
724    #[tokio::test]
725    async fn set_status_rejects_non_owning_session() {
726        let host = HostState::new();
727        host.register_agent("client-A", make_register_request("worker"))
728            .await
729            .unwrap();
730
731        // Client B tries to mutate Client A's agent.
732        let err = host
733            .set_status(
734                "client-B",
735                SetHostAgentStatusRequest {
736                    agent_id: "worker".into(),
737                    status: HostAgentStatus::Errored,
738                    current_task: None,
739                    message: None,
740                    payload: Value::Null,
741                },
742            )
743            .await
744            .unwrap_err();
745        assert!(
746            err.contains("owned by another session"),
747            "unexpected rejection message: {err}"
748        );
749
750        // Owner still works.
751        host.set_status(
752            "client-A",
753            SetHostAgentStatusRequest {
754                agent_id: "worker".into(),
755                status: HostAgentStatus::Running,
756                current_task: None,
757                message: None,
758                payload: Value::Null,
759            },
760        )
761        .await
762        .expect("owner can mutate");
763    }
764
765    #[tokio::test]
766    async fn unregister_rejects_non_owning_session() {
767        let host = HostState::new();
768        host.register_agent("client-A", make_register_request("worker"))
769            .await
770            .unwrap();
771
772        let err = host
773            .unregister_agent("client-B", "worker")
774            .await
775            .unwrap_err();
776        assert!(err.contains("owned by another session"));
777        assert_eq!(host.agents().await.len(), 1, "agent must survive");
778
779        host.unregister_agent("client-A", "worker")
780            .await
781            .expect("owner can unregister");
782        assert_eq!(host.agents().await.len(), 0);
783    }
784
785    #[tokio::test]
786    async fn register_refuses_to_overwrite_other_sessions_agent() {
787        let host = HostState::new();
788        host.register_agent("client-A", make_register_request("worker"))
789            .await
790            .unwrap();
791
792        let err = host
793            .register_agent("client-B", make_register_request("worker"))
794            .await
795            .unwrap_err();
796        assert!(
797            err.contains("owned by another session"),
798            "unexpected message: {err}"
799        );
800    }
801
802    #[tokio::test]
803    async fn manual_approval_cross_session_with_disconnected_owner_errors() {
804        // client-A creates an approval but is NOT subscribed. client-B
805        // tries to resolve it; the fan-out path needs the owner
806        // connected, so this returns a clear "not connected" error
807        // (rather than silently no-op'ing or pretending success).
808        let host = HostState::new();
809        let approval = host
810            .create_approval(
811                Some("client-A"),
812                CreateHostApprovalRequest {
813                    agent_id: None,
814                    action: "manual.action".into(),
815                    details: serde_json::json!({}),
816                    options: vec![],
817                    system_level: false,
818                },
819            )
820            .await
821            .unwrap();
822
823        let err = host
824            .resolve_approval(
825                "client-B",
826                ResolveHostApprovalRequest {
827                    approval_id: approval.id.clone(),
828                    resolution: "approve".into(),
829                },
830            )
831            .await
832            .unwrap_err();
833        assert!(
834            err.contains("not currently connected"),
835            "expected disconnected-owner message, got: {err}",
836        );
837        // Approval stays Pending — non-owner failed call must not
838        // mutate state.
839        let still = host
840            .approvals()
841            .await
842            .into_iter()
843            .find(|a| a.id == approval.id)
844            .expect("approval survives failed resolve");
845        assert_eq!(still.status, HostApprovalStatus::Pending);
846
847        // Owner can still resolve directly (same-session fast path).
848        host.resolve_approval(
849            "client-A",
850            ResolveHostApprovalRequest {
851                approval_id: approval.id,
852                resolution: "deny".into(),
853            },
854        )
855        .await
856        .expect("owner can resolve");
857    }
858
859    #[tokio::test]
860    async fn manual_approval_cross_session_with_subscribed_owner_fans_out() {
861        // client-A creates an approval AND subscribes. client-B tries
862        // to resolve. The fan-out path records an
863        // `approval.resolve_requested` event the owner is expected to
864        // hook to perform the resolve on its own session. The approval
865        // row itself stays Pending until the owner acts — non-owner
866        // never mutates state (audit-2026-05 squat-prevention holds).
867        let host = HostState::new();
868
869        // Subscribe client-A. We only exercise the subscribers-map
870        // membership check, so a WsChannel::test_stub() (drain sink)
871        // is enough — nothing is actually written to the channel in
872        // this test.
873        host.subscribe("client-A", Arc::new(WsChannel::test_stub()))
874            .await;
875
876        let approval = host
877            .create_approval(
878                Some("client-A"),
879                CreateHostApprovalRequest {
880                    agent_id: Some("worker".into()),
881                    action: "Send email to bob@example.com".into(),
882                    details: serde_json::json!({"kind": "email_reply"}),
883                    options: vec![],
884                    system_level: false,
885                },
886            )
887            .await
888            .unwrap();
889
890        // client-B asks to resolve — should fan-out, not mutate.
891        let returned = host
892            .resolve_approval(
893                "client-B",
894                ResolveHostApprovalRequest {
895                    approval_id: approval.id.clone(),
896                    resolution: "deny".into(),
897                },
898            )
899            .await
900            .expect("cross-session resolve fans out without error");
901        assert_eq!(
902            returned.status,
903            HostApprovalStatus::Pending,
904            "returned row must stay Pending — only the owner mutates",
905        );
906
907        // An `approval.resolve_requested` event landed in the log.
908        let events = host.events(10).await;
909        let resolve_req = events
910            .iter()
911            .find(|e| e.kind == "approval.resolve_requested")
912            .expect("resolve_requested event recorded");
913        assert_eq!(
914            resolve_req.payload.get("approval_id").and_then(|v| v.as_str()),
915            Some(approval.id.as_str()),
916        );
917        assert_eq!(
918            resolve_req.payload.get("resolution").and_then(|v| v.as_str()),
919            Some("deny"),
920        );
921        assert_eq!(
922            resolve_req.payload.get("owner_client_id").and_then(|v| v.as_str()),
923            Some("client-A"),
924        );
925        assert_eq!(
926            resolve_req
927                .payload
928                .get("requesting_client_id")
929                .and_then(|v| v.as_str()),
930            Some("client-B"),
931        );
932
933        // Approval row in the map still Pending — non-owner did NOT
934        // squat the state.
935        let still = host
936            .approvals()
937            .await
938            .into_iter()
939            .find(|a| a.id == approval.id)
940            .unwrap();
941        assert_eq!(still.status, HostApprovalStatus::Pending);
942
943        // Owner picks up the event and resolves on its own session —
944        // this is the real fan-out completion path. After this fires,
945        // the approval flips to Resolved and `approval.resolved`
946        // lands in the event log for the original caller's UI.
947        let resolved = host
948            .resolve_approval(
949                "client-A",
950                ResolveHostApprovalRequest {
951                    approval_id: approval.id.clone(),
952                    resolution: "deny".into(),
953                },
954            )
955            .await
956            .expect("owner resolves on own session");
957        assert_eq!(resolved.status, HostApprovalStatus::Resolved);
958        assert_eq!(resolved.resolution.as_deref(), Some("deny"));
959    }
960
961
962    #[tokio::test]
963    async fn system_approval_resolvable_by_any_session() {
964        // The high-risk-method gate raises approvals with caller =
965        // None so the local UI session — a different WS connection
966        // from the one whose dispatch is parking — can resolve them.
967        let host = HostState::new();
968        let approval = host
969            .create_approval(
970                None,
971                CreateHostApprovalRequest {
972                    agent_id: None,
973                    action: "ws.method:automation.run_applescript".into(),
974                    details: serde_json::json!({}),
975                    options: vec![],
976                    system_level: false,
977                },
978            )
979            .await
980            .unwrap();
981        assert!(
982            approval.client_id.is_none(),
983            "system approval must carry no owner"
984        );
985
986        host.resolve_approval(
987            "ui-session",
988            ResolveHostApprovalRequest {
989                approval_id: approval.id,
990                resolution: "approve".into(),
991            },
992        )
993        .await
994        .expect("any session can resolve a system approval");
995    }
996}