1use crate::session::WsChannel;
8use car_proto::{
9 CreateHostApprovalRequest, HostAgent, HostAgentStatus, HostApprovalRequest, HostApprovalStatus,
10 HostEvent, RegisterHostAgentRequest, ResolveHostApprovalRequest, SetHostAgentStatusRequest,
11};
12use chrono::Utc;
13use futures::SinkExt;
14use serde_json::Value;
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{Mutex, Notify};
19use tokio_tungstenite::tungstenite::Message;
20
/// Upper bound on the number of entries kept in the in-memory event log;
/// once exceeded, the oldest entries are evicted first.
const MAX_EVENTS: usize = 500;
22
/// In-memory registry of host agents, approval requests, recent events and
/// event subscribers. Each collection sits behind its own async mutex.
#[derive(Default)]
pub struct HostState {
    /// Registered agents, keyed by agent id.
    agents: Mutex<HashMap<String, HostAgent>>,
    /// Approval requests, keyed by approval id.
    approvals: Mutex<HashMap<String, HostApprovalRequest>>,
    /// Event log, oldest entry first, capped at `MAX_EVENTS`.
    events: Mutex<VecDeque<HostEvent>>,
    /// Channels that receive `host.event` broadcasts, keyed by client id.
    subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
    /// Wake-up handles for tasks blocked in `request_and_wait_approval`,
    /// keyed by approval id.
    notifies: Mutex<HashMap<String, Arc<Notify>>>,
}
38
/// Outcome reported by `request_and_wait_approval`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApprovalOutcome {
    /// The recorded resolution equals the caller-supplied approve label.
    Approved,
    /// The approval was resolved with anything other than the approve label
    /// (including a missing resolution string).
    Denied,
    /// No resolution was observed before the timeout elapsed.
    TimedOut,
}
56
57impl HostState {
58 pub fn new() -> Self {
59 Self::default()
60 }
61
62 pub async fn subscribe(&self, client_id: &str, channel: Arc<WsChannel>) {
63 self.subscribers
64 .lock()
65 .await
66 .insert(client_id.to_string(), channel);
67 }
68
69 pub async fn unsubscribe(&self, client_id: &str) {
70 self.subscribers.lock().await.remove(client_id);
71 }
72
73 pub async fn register_agent(
74 &self,
75 client_id: &str,
76 req: RegisterHostAgentRequest,
77 ) -> Result<HostAgent, String> {
78 let id = req.id.unwrap_or_else(|| format!("agent-{}", short_id()));
79 {
84 let agents = self.agents.lock().await;
85 if let Some(existing) = agents.get(&id) {
86 match existing.session_id.as_deref() {
87 Some(owner) if owner != client_id => {
88 return Err(format!(
89 "agent '{id}' is owned by another session; \
90 unregister it from that session first"
91 ));
92 }
93 _ => {}
94 }
95 }
96 }
97
98 let agent = HostAgent {
99 id: id.clone(),
100 name: req.name,
101 kind: req.kind,
102 capabilities: req.capabilities,
103 project: req.project,
104 session_id: Some(client_id.to_string()),
105 status: HostAgentStatus::Idle,
106 current_task: None,
107 pid: req.pid,
108 display: req.display,
109 updated_at: Utc::now(),
110 metadata: req.metadata,
111 };
112
113 self.agents.lock().await.insert(id.clone(), agent.clone());
114 self.record_event(
115 "agent.registered",
116 Some(id),
117 format!("{} registered", agent.name),
118 serde_json::to_value(&agent).map_err(|e| e.to_string())?,
119 )
120 .await;
121
122 Ok(agent)
123 }
124
125 pub async fn unregister_agent(
126 &self,
127 caller_client_id: &str,
128 agent_id: &str,
129 ) -> Result<(), String> {
130 {
134 let agents = self.agents.lock().await;
135 if let Some(existing) = agents.get(agent_id) {
136 if let Some(owner) = existing.session_id.as_deref() {
137 if owner != caller_client_id {
138 return Err(format!("agent '{agent_id}' is owned by another session"));
139 }
140 }
141 }
142 }
143
144 let removed = self.agents.lock().await.remove(agent_id);
145 if removed.is_none() {
146 return Err(format!("unknown agent '{}'", agent_id));
147 }
148 self.record_event(
149 "agent.unregistered",
150 Some(agent_id.to_string()),
151 format!("{} unregistered", agent_id),
152 Value::Null,
153 )
154 .await;
155 self.reap_agent_approvals(caller_client_id, agent_id).await;
159 Ok(())
160 }
161
162 pub async fn set_status(
163 &self,
164 caller_client_id: &str,
165 req: SetHostAgentStatusRequest,
166 ) -> Result<HostAgent, String> {
167 let mut agents = self.agents.lock().await;
168 let agent = agents
169 .get_mut(&req.agent_id)
170 .ok_or_else(|| format!("unknown agent '{}'", req.agent_id))?;
171
172 if let Some(owner) = agent.session_id.as_deref() {
177 if owner != caller_client_id {
178 return Err(format!(
179 "agent '{}' is owned by another session",
180 req.agent_id
181 ));
182 }
183 }
184
185 agent.status = req.status.clone();
186 agent.current_task = req.current_task.clone();
187 agent.updated_at = Utc::now();
188 let updated = agent.clone();
189 drop(agents);
190
191 let message = req
192 .message
193 .unwrap_or_else(|| format!("{} is {:?}", updated.name, updated.status));
194 self.record_event(
195 "agent.status_changed",
196 Some(updated.id.clone()),
197 message,
198 if req.payload.is_null() {
199 serde_json::to_value(&updated).map_err(|e| e.to_string())?
200 } else {
201 req.payload
202 },
203 )
204 .await;
205
206 Ok(updated)
207 }
208
209 pub async fn create_approval(
217 &self,
218 caller_client_id: Option<&str>,
219 req: CreateHostApprovalRequest,
220 ) -> Result<HostApprovalRequest, String> {
221 let approval = HostApprovalRequest {
222 id: format!("approval-{}", short_id()),
223 agent_id: req.agent_id,
224 client_id: caller_client_id.map(|s| s.to_string()),
225 action: req.action,
226 details: req.details,
227 options: if req.options.is_empty() {
228 vec!["approve".to_string(), "deny".to_string()]
229 } else {
230 req.options
231 },
232 status: HostApprovalStatus::Pending,
233 created_at: Utc::now(),
234 resolved_at: None,
235 resolution: None,
236 };
237
238 self.approvals
239 .lock()
240 .await
241 .insert(approval.id.clone(), approval.clone());
242 self.record_event(
243 "approval.requested",
244 approval.agent_id.clone(),
245 format!("Approval requested: {}", approval.action),
246 serde_json::to_value(&approval).map_err(|e| e.to_string())?,
247 )
248 .await;
249 Ok(approval)
250 }
251
252 pub async fn resolve_approval(
274 &self,
275 caller_client_id: &str,
276 req: ResolveHostApprovalRequest,
277 ) -> Result<HostApprovalRequest, String> {
278 let (owner_opt, pending_snapshot) = {
283 let mut approvals = self.approvals.lock().await;
284 let approval = approvals
285 .get_mut(&req.approval_id)
286 .ok_or_else(|| format!("unknown approval '{}'", req.approval_id))?;
287 (approval.client_id.clone(), approval.clone())
288 };
289
290 if let Some(owner) = owner_opt {
291 if owner != caller_client_id {
292 let owner_subscribed =
297 self.subscribers.lock().await.contains_key(&owner);
298 if !owner_subscribed {
299 return Err(format!(
300 "approval '{}' is owned by session '{}' which is not currently connected",
301 req.approval_id, owner,
302 ));
303 }
304 self.record_event(
305 "approval.resolve_requested",
306 pending_snapshot.agent_id.clone(),
307 format!(
308 "Resolution requested for {}: {}",
309 pending_snapshot.action, req.resolution
310 ),
311 serde_json::json!({
312 "approval_id": req.approval_id,
313 "resolution": req.resolution,
314 "requesting_client_id": caller_client_id,
315 "owner_client_id": owner,
316 }),
317 )
318 .await;
319 return Ok(pending_snapshot);
323 }
324 }
325
326 let resolved = {
328 let mut approvals = self.approvals.lock().await;
329 let approval = approvals
330 .get_mut(&req.approval_id)
331 .ok_or_else(|| format!("approval '{}' vanished mid-resolve", req.approval_id))?;
332 approval.status = HostApprovalStatus::Resolved;
333 approval.resolution = Some(req.resolution);
334 approval.resolved_at = Some(Utc::now());
335 approval.clone()
336 };
337
338 if let Some(notify) = self.notifies.lock().await.remove(&resolved.id) {
343 notify.notify_one();
344 }
345
346 self.record_event(
347 "approval.resolved",
348 resolved.agent_id.clone(),
349 format!("Approval resolved: {}", resolved.action),
350 serde_json::to_value(&resolved).map_err(|e| e.to_string())?,
351 )
352 .await;
353 Ok(resolved)
354 }
355
356 async fn reap_approvals<F>(&self, should_reap: F) -> usize
366 where
367 F: Fn(&HostApprovalRequest) -> bool,
368 {
369 let reaped: Vec<HostApprovalRequest> = {
370 let mut approvals = self.approvals.lock().await;
371 let mut out = Vec::new();
372 for approval in approvals.values_mut() {
373 if approval.status == HostApprovalStatus::Pending && should_reap(approval) {
374 approval.status = HostApprovalStatus::Resolved;
375 approval.resolution = Some("agent_gone".to_string());
376 approval.resolved_at = Some(Utc::now());
377 out.push(approval.clone());
378 }
379 }
380 out
381 };
382
383 for approval in &reaped {
384 if let Some(notify) = self.notifies.lock().await.remove(&approval.id) {
389 notify.notify_one();
390 }
391 self.record_event(
392 "approval.resolved",
393 approval.agent_id.clone(),
394 format!("Approval auto-cancelled (agent gone): {}", approval.action),
395 serde_json::to_value(approval).unwrap_or(Value::Null),
396 )
397 .await;
398 }
399 reaped.len()
400 }
401
402 pub async fn reap_session_approvals(&self, client_id: &str) -> usize {
409 self.reap_approvals(|a| a.client_id.as_deref() == Some(client_id))
410 .await
411 }
412
413 pub async fn reap_agent_approvals(&self, caller_client_id: &str, agent_id: &str) -> usize {
418 self.reap_approvals(|a| {
419 a.agent_id.as_deref() == Some(agent_id)
420 && a.client_id.as_deref() == Some(caller_client_id)
421 })
422 .await
423 }
424
425 pub async fn request_and_wait_approval(
447 &self,
448 req: CreateHostApprovalRequest,
449 approve_label: &str,
450 timeout: Duration,
451 ) -> Result<ApprovalOutcome, String> {
452 let approval = self.create_approval(None, req).await?;
458 let approval_id = approval.id.clone();
459
460 let notify = Arc::new(Notify::new());
465 {
466 let mut map = self.notifies.lock().await;
467 map.insert(approval_id.clone(), notify.clone());
468 }
469
470 if let Some(resolved) = self.approvals.lock().await.get(&approval_id).cloned() {
475 if matches!(resolved.status, HostApprovalStatus::Resolved) {
476 self.notifies.lock().await.remove(&approval_id);
477 return Ok(classify_resolution(&resolved, approve_label));
478 }
479 }
480
481 let woken = tokio::time::timeout(timeout, notify.notified()).await;
486 if woken.is_err() {
487 self.notifies.lock().await.remove(&approval_id);
488 return Ok(ApprovalOutcome::TimedOut);
489 }
490
491 let resolved = self
492 .approvals
493 .lock()
494 .await
495 .get(&approval_id)
496 .cloned()
497 .ok_or_else(|| format!("approval '{}' vanished after notify", approval_id))?;
498 Ok(classify_resolution(&resolved, approve_label))
499 }
500
501 pub async fn agents(&self) -> Vec<HostAgent> {
502 let mut agents: Vec<_> = self.agents.lock().await.values().cloned().collect();
503 agents.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
504 agents
505 }
506
507 pub async fn approvals(&self) -> Vec<HostApprovalRequest> {
508 let mut approvals: Vec<_> = self.approvals.lock().await.values().cloned().collect();
509 approvals.sort_by(|a, b| b.created_at.cmp(&a.created_at));
510 approvals
511 }
512
513 pub async fn events(&self, limit: usize) -> Vec<HostEvent> {
514 self.events
515 .lock()
516 .await
517 .iter()
518 .rev()
519 .take(limit)
520 .cloned()
521 .collect()
522 }
523
524 pub async fn record_event(
525 &self,
526 kind: impl Into<String>,
527 agent_id: Option<String>,
528 message: impl Into<String>,
529 payload: Value,
530 ) -> HostEvent {
531 let event = HostEvent {
532 id: format!("event-{}", short_id()),
533 timestamp: Utc::now(),
534 kind: kind.into(),
535 agent_id,
536 message: message.into(),
537 payload,
538 };
539
540 {
541 let mut events = self.events.lock().await;
542 events.push_back(event.clone());
543 while events.len() > MAX_EVENTS {
544 events.pop_front();
545 }
546 }
547
548 self.broadcast_event(&event).await;
549 event
550 }
551
552 async fn broadcast_event(&self, event: &HostEvent) {
553 let subscribers: Vec<Arc<WsChannel>> =
554 self.subscribers.lock().await.values().cloned().collect();
555
556 let Ok(json) = serde_json::to_string(&serde_json::json!({
557 "jsonrpc": "2.0",
558 "method": "host.event",
559 "params": event,
560 })) else {
561 return;
562 };
563
564 for channel in subscribers {
565 let _ = channel
566 .write
567 .lock()
568 .await
569 .send(Message::Text(json.clone().into()))
570 .await;
571 }
572 }
573}
574
575fn short_id() -> String {
576 uuid::Uuid::new_v4().simple().to_string()[..12].to_string()
577}
578
579fn classify_resolution(approval: &HostApprovalRequest, approve_label: &str) -> ApprovalOutcome {
580 match approval.resolution.as_deref() {
581 Some(r) if r == approve_label => ApprovalOutcome::Approved,
582 _ => ApprovalOutcome::Denied,
583 }
584}
585
586#[cfg(test)]
587mod tests {
588 use super::*;
589
590 #[tokio::test]
591 async fn host_tracks_agents_events_and_approvals() {
592 let host = HostState::new();
593
594 let agent = host
595 .register_agent(
596 "client-1",
597 RegisterHostAgentRequest {
598 id: Some("agent-1".to_string()),
599 name: "Researcher".to_string(),
600 kind: "builtin".to_string(),
601 capabilities: vec!["search".to_string()],
602 project: Some("/tmp/project".to_string()),
603 pid: None,
604 display: car_proto::HostAgentDisplay {
605 label: Some("Research Lead".to_string()),
606 icon: Some("magnifying-glass".to_string()),
607 accent: Some("#0a84ff".to_string()),
608 },
609 metadata: Value::Null,
610 },
611 )
612 .await
613 .expect("register agent");
614
615 assert_eq!(agent.status, HostAgentStatus::Idle);
616 assert_eq!(agent.display.label.as_deref(), Some("Research Lead"));
617 assert_eq!(agent.display.icon.as_deref(), Some("magnifying-glass"));
618 assert_eq!(agent.display.accent.as_deref(), Some("#0a84ff"));
619 assert_eq!(host.agents().await.len(), 1);
620
621 let updated = host
622 .set_status(
623 "client-1",
624 SetHostAgentStatusRequest {
625 agent_id: "agent-1".to_string(),
626 status: HostAgentStatus::Running,
627 current_task: Some("Collect facts".to_string()),
628 message: None,
629 payload: Value::Null,
630 },
631 )
632 .await
633 .expect("set status");
634
635 assert_eq!(updated.status, HostAgentStatus::Running);
636 assert_eq!(updated.current_task.as_deref(), Some("Collect facts"));
637
638 let approval = host
639 .create_approval(
640 Some("client-1"),
641 CreateHostApprovalRequest {
642 agent_id: Some("agent-1".to_string()),
643 action: "Run tests".to_string(),
644 details: serde_json::json!({ "command": "cargo test" }),
645 options: vec![],
646 system_level: false,
647 },
648 )
649 .await
650 .expect("create approval");
651
652 assert_eq!(approval.options, vec!["approve", "deny"]);
653 assert_eq!(approval.status, HostApprovalStatus::Pending);
654
655 let resolved = host
656 .resolve_approval(
657 "client-1",
658 ResolveHostApprovalRequest {
659 approval_id: approval.id,
660 resolution: "approve".to_string(),
661 },
662 )
663 .await
664 .expect("resolve approval");
665
666 assert_eq!(resolved.status, HostApprovalStatus::Resolved);
667 assert_eq!(resolved.resolution.as_deref(), Some("approve"));
668 assert!(host.events(10).await.len() >= 4);
669 }
670
671 #[tokio::test]
672 async fn request_and_wait_returns_approved_when_user_approves() {
673 let host = Arc::new(HostState::new());
674 let host2 = host.clone();
675
676 let waiter = tokio::spawn(async move {
679 host2
680 .request_and_wait_approval(
681 CreateHostApprovalRequest {
682 agent_id: None,
683 action: "automation.run_applescript".into(),
684 details: serde_json::json!({}),
685 options: vec![],
686 system_level: false,
687 },
688 "approve",
689 Duration::from_secs(2),
690 )
691 .await
692 .expect("gate ran")
693 });
694
695 tokio::time::sleep(Duration::from_millis(20)).await;
700 let pending = host.approvals().await;
701 assert_eq!(pending.len(), 1, "exactly one pending approval");
702 assert!(
703 pending[0].client_id.is_none(),
704 "gate-raised approvals must be system-level"
705 );
706 host.resolve_approval(
707 "ui-session",
708 ResolveHostApprovalRequest {
709 approval_id: pending[0].id.clone(),
710 resolution: "approve".into(),
711 },
712 )
713 .await
714 .unwrap();
715
716 let outcome = waiter.await.unwrap();
717 assert_eq!(outcome, ApprovalOutcome::Approved);
718 }
719
720 #[tokio::test]
721 async fn request_and_wait_returns_denied_on_other_resolution() {
722 let host = Arc::new(HostState::new());
723 let host2 = host.clone();
724 let waiter = tokio::spawn(async move {
725 host2
726 .request_and_wait_approval(
727 CreateHostApprovalRequest {
728 agent_id: None,
729 action: "messages.send".into(),
730 details: serde_json::json!({}),
731 options: vec![],
732 system_level: false,
733 },
734 "approve",
735 Duration::from_secs(2),
736 )
737 .await
738 .unwrap()
739 });
740 tokio::time::sleep(Duration::from_millis(20)).await;
741 let pending = host.approvals().await;
742 host.resolve_approval(
743 "ui-session",
744 ResolveHostApprovalRequest {
745 approval_id: pending[0].id.clone(),
746 resolution: "deny".into(),
747 },
748 )
749 .await
750 .unwrap();
751 assert_eq!(waiter.await.unwrap(), ApprovalOutcome::Denied);
752 }
753
754 #[tokio::test]
755 async fn request_and_wait_times_out_when_no_resolution() {
756 let host = HostState::new();
757 let outcome = host
758 .request_and_wait_approval(
759 CreateHostApprovalRequest {
760 agent_id: None,
761 action: "vision.ocr".into(),
762 details: serde_json::json!({}),
763 options: vec![],
764 system_level: false,
765 },
766 "approve",
767 Duration::from_millis(50),
768 )
769 .await
770 .unwrap();
771 assert_eq!(outcome, ApprovalOutcome::TimedOut);
772 let pending = host.approvals().await;
774 assert_eq!(pending.len(), 1);
775 assert_eq!(pending[0].status, HostApprovalStatus::Pending);
776 }
777
778 fn make_register_request(name: &str) -> RegisterHostAgentRequest {
781 RegisterHostAgentRequest {
782 id: Some(name.into()),
783 name: name.into(),
784 kind: "test".into(),
785 capabilities: vec![],
786 project: None,
787 pid: None,
788 display: car_proto::HostAgentDisplay {
789 label: None,
790 icon: None,
791 accent: None,
792 },
793 metadata: Value::Null,
794 }
795 }
796
797 #[tokio::test]
798 async fn set_status_rejects_non_owning_session() {
799 let host = HostState::new();
800 host.register_agent("client-A", make_register_request("worker"))
801 .await
802 .unwrap();
803
804 let err = host
806 .set_status(
807 "client-B",
808 SetHostAgentStatusRequest {
809 agent_id: "worker".into(),
810 status: HostAgentStatus::Errored,
811 current_task: None,
812 message: None,
813 payload: Value::Null,
814 },
815 )
816 .await
817 .unwrap_err();
818 assert!(
819 err.contains("owned by another session"),
820 "unexpected rejection message: {err}"
821 );
822
823 host.set_status(
825 "client-A",
826 SetHostAgentStatusRequest {
827 agent_id: "worker".into(),
828 status: HostAgentStatus::Running,
829 current_task: None,
830 message: None,
831 payload: Value::Null,
832 },
833 )
834 .await
835 .expect("owner can mutate");
836 }
837
838 #[tokio::test]
839 async fn unregister_rejects_non_owning_session() {
840 let host = HostState::new();
841 host.register_agent("client-A", make_register_request("worker"))
842 .await
843 .unwrap();
844
845 let err = host
846 .unregister_agent("client-B", "worker")
847 .await
848 .unwrap_err();
849 assert!(err.contains("owned by another session"));
850 assert_eq!(host.agents().await.len(), 1, "agent must survive");
851
852 host.unregister_agent("client-A", "worker")
853 .await
854 .expect("owner can unregister");
855 assert_eq!(host.agents().await.len(), 0);
856 }
857
858 #[tokio::test]
859 async fn register_refuses_to_overwrite_other_sessions_agent() {
860 let host = HostState::new();
861 host.register_agent("client-A", make_register_request("worker"))
862 .await
863 .unwrap();
864
865 let err = host
866 .register_agent("client-B", make_register_request("worker"))
867 .await
868 .unwrap_err();
869 assert!(
870 err.contains("owned by another session"),
871 "unexpected message: {err}"
872 );
873 }
874
875 #[tokio::test]
876 async fn manual_approval_cross_session_with_disconnected_owner_errors() {
877 let host = HostState::new();
882 let approval = host
883 .create_approval(
884 Some("client-A"),
885 CreateHostApprovalRequest {
886 agent_id: None,
887 action: "manual.action".into(),
888 details: serde_json::json!({}),
889 options: vec![],
890 system_level: false,
891 },
892 )
893 .await
894 .unwrap();
895
896 let err = host
897 .resolve_approval(
898 "client-B",
899 ResolveHostApprovalRequest {
900 approval_id: approval.id.clone(),
901 resolution: "approve".into(),
902 },
903 )
904 .await
905 .unwrap_err();
906 assert!(
907 err.contains("not currently connected"),
908 "expected disconnected-owner message, got: {err}",
909 );
910 let still = host
913 .approvals()
914 .await
915 .into_iter()
916 .find(|a| a.id == approval.id)
917 .expect("approval survives failed resolve");
918 assert_eq!(still.status, HostApprovalStatus::Pending);
919
920 host.resolve_approval(
922 "client-A",
923 ResolveHostApprovalRequest {
924 approval_id: approval.id,
925 resolution: "deny".into(),
926 },
927 )
928 .await
929 .expect("owner can resolve");
930 }
931
932 #[tokio::test]
933 async fn manual_approval_cross_session_with_subscribed_owner_fans_out() {
934 let host = HostState::new();
941
942 host.subscribe("client-A", Arc::new(WsChannel::test_stub()))
947 .await;
948
949 let approval = host
950 .create_approval(
951 Some("client-A"),
952 CreateHostApprovalRequest {
953 agent_id: Some("worker".into()),
954 action: "Send email to bob@example.com".into(),
955 details: serde_json::json!({"kind": "email_reply"}),
956 options: vec![],
957 system_level: false,
958 },
959 )
960 .await
961 .unwrap();
962
963 let returned = host
965 .resolve_approval(
966 "client-B",
967 ResolveHostApprovalRequest {
968 approval_id: approval.id.clone(),
969 resolution: "deny".into(),
970 },
971 )
972 .await
973 .expect("cross-session resolve fans out without error");
974 assert_eq!(
975 returned.status,
976 HostApprovalStatus::Pending,
977 "returned row must stay Pending — only the owner mutates",
978 );
979
980 let events = host.events(10).await;
982 let resolve_req = events
983 .iter()
984 .find(|e| e.kind == "approval.resolve_requested")
985 .expect("resolve_requested event recorded");
986 assert_eq!(
987 resolve_req.payload.get("approval_id").and_then(|v| v.as_str()),
988 Some(approval.id.as_str()),
989 );
990 assert_eq!(
991 resolve_req.payload.get("resolution").and_then(|v| v.as_str()),
992 Some("deny"),
993 );
994 assert_eq!(
995 resolve_req.payload.get("owner_client_id").and_then(|v| v.as_str()),
996 Some("client-A"),
997 );
998 assert_eq!(
999 resolve_req
1000 .payload
1001 .get("requesting_client_id")
1002 .and_then(|v| v.as_str()),
1003 Some("client-B"),
1004 );
1005
1006 let still = host
1009 .approvals()
1010 .await
1011 .into_iter()
1012 .find(|a| a.id == approval.id)
1013 .unwrap();
1014 assert_eq!(still.status, HostApprovalStatus::Pending);
1015
1016 let resolved = host
1021 .resolve_approval(
1022 "client-A",
1023 ResolveHostApprovalRequest {
1024 approval_id: approval.id.clone(),
1025 resolution: "deny".into(),
1026 },
1027 )
1028 .await
1029 .expect("owner resolves on own session");
1030 assert_eq!(resolved.status, HostApprovalStatus::Resolved);
1031 assert_eq!(resolved.resolution.as_deref(), Some("deny"));
1032 }
1033
1034
1035 #[tokio::test]
1036 async fn system_approval_resolvable_by_any_session() {
1037 let host = HostState::new();
1041 let approval = host
1042 .create_approval(
1043 None,
1044 CreateHostApprovalRequest {
1045 agent_id: None,
1046 action: "ws.method:automation.run_applescript".into(),
1047 details: serde_json::json!({}),
1048 options: vec![],
1049 system_level: false,
1050 },
1051 )
1052 .await
1053 .unwrap();
1054 assert!(
1055 approval.client_id.is_none(),
1056 "system approval must carry no owner"
1057 );
1058
1059 host.resolve_approval(
1060 "ui-session",
1061 ResolveHostApprovalRequest {
1062 approval_id: approval.id,
1063 resolution: "approve".into(),
1064 },
1065 )
1066 .await
1067 .expect("any session can resolve a system approval");
1068 }
1069
1070 #[tokio::test]
1073 async fn unregister_agent_reaps_its_pending_approvals() {
1074 let host = HostState::new();
1075 host.register_agent("client-1", make_register_request("agent-1"))
1076 .await
1077 .unwrap();
1078 let approval = host
1079 .create_approval(
1080 Some("client-1"),
1081 CreateHostApprovalRequest {
1082 agent_id: Some("agent-1".to_string()),
1083 action: "Wire $1M".to_string(),
1084 details: Value::Null,
1085 options: vec![],
1086 system_level: false,
1087 },
1088 )
1089 .await
1090 .unwrap();
1091 assert_eq!(approval.status, HostApprovalStatus::Pending);
1092
1093 host.unregister_agent("client-1", "agent-1").await.unwrap();
1094
1095 let pending: Vec<_> = host
1096 .approvals()
1097 .await
1098 .into_iter()
1099 .filter(|a| a.status == HostApprovalStatus::Pending)
1100 .collect();
1101 assert!(pending.is_empty(), "agent's approval must be auto-cancelled");
1102 let all = host.approvals().await;
1103 let reaped = all.iter().find(|a| a.id == approval.id).unwrap();
1104 assert_eq!(reaped.status, HostApprovalStatus::Resolved);
1105 assert_eq!(reaped.resolution.as_deref(), Some("agent_gone"));
1106 }
1107
1108 #[tokio::test]
1109 async fn session_disconnect_reaps_owned_but_not_system_approvals() {
1110 let host = HostState::new();
1111 let owned = host
1113 .create_approval(
1114 Some("client-9"),
1115 CreateHostApprovalRequest {
1116 agent_id: Some("agent-x".to_string()),
1117 action: "owned".to_string(),
1118 details: Value::Null,
1119 options: vec![],
1120 system_level: false,
1121 },
1122 )
1123 .await
1124 .unwrap();
1125 let system = host
1127 .create_approval(
1128 None,
1129 CreateHostApprovalRequest {
1130 agent_id: None,
1131 action: "system gate".to_string(),
1132 details: Value::Null,
1133 options: vec![],
1134 system_level: true,
1135 },
1136 )
1137 .await
1138 .unwrap();
1139
1140 let n = host.reap_session_approvals("client-9").await;
1141 assert_eq!(n, 1, "only the session-owned approval is reaped");
1142
1143 let all = host.approvals().await;
1144 let owned_now = all.iter().find(|a| a.id == owned.id).unwrap();
1145 let system_now = all.iter().find(|a| a.id == system.id).unwrap();
1146 assert_eq!(owned_now.status, HostApprovalStatus::Resolved);
1147 assert_eq!(owned_now.resolution.as_deref(), Some("agent_gone"));
1148 assert_eq!(
1149 system_now.status,
1150 HostApprovalStatus::Pending,
1151 "system-level gate approvals must outlive a client disconnect"
1152 );
1153 }
1154}