1use crate::session::WsChannel;
8use car_proto::{
9 CreateHostApprovalRequest, HostAgent, HostAgentStatus, HostApprovalRequest, HostApprovalStatus,
10 HostEvent, RegisterHostAgentRequest, ResolveHostApprovalRequest, SetHostAgentStatusRequest,
11};
12use chrono::Utc;
13use futures::SinkExt;
14use serde_json::Value;
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{Mutex, Notify};
19use tokio_tungstenite::tungstenite::Message;
20
21const MAX_EVENTS: usize = 500;
22
23#[derive(Default)]
24pub struct HostState {
25 agents: Mutex<HashMap<String, HostAgent>>,
26 approvals: Mutex<HashMap<String, HostApprovalRequest>>,
27 events: Mutex<VecDeque<HostEvent>>,
28 subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
29 notifies: Mutex<HashMap<String, Arc<Notify>>>,
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44pub enum ApprovalOutcome {
45 Approved,
48 Denied,
51 TimedOut,
55}
56
57impl HostState {
58 pub fn new() -> Self {
59 Self::default()
60 }
61
62 pub async fn subscribe(&self, client_id: &str, channel: Arc<WsChannel>) {
63 self.subscribers
64 .lock()
65 .await
66 .insert(client_id.to_string(), channel);
67 }
68
69 pub async fn unsubscribe(&self, client_id: &str) {
70 self.subscribers.lock().await.remove(client_id);
71 }
72
73 pub async fn register_agent(
74 &self,
75 client_id: &str,
76 req: RegisterHostAgentRequest,
77 ) -> Result<HostAgent, String> {
78 let id = req.id.unwrap_or_else(|| format!("agent-{}", short_id()));
79 {
84 let agents = self.agents.lock().await;
85 if let Some(existing) = agents.get(&id) {
86 match existing.session_id.as_deref() {
87 Some(owner) if owner != client_id => {
88 return Err(format!(
89 "agent '{id}' is owned by another session; \
90 unregister it from that session first"
91 ));
92 }
93 _ => {}
94 }
95 }
96 }
97
98 let agent = HostAgent {
99 id: id.clone(),
100 name: req.name,
101 kind: req.kind,
102 capabilities: req.capabilities,
103 project: req.project,
104 session_id: Some(client_id.to_string()),
105 status: HostAgentStatus::Idle,
106 current_task: None,
107 pid: req.pid,
108 display: req.display,
109 updated_at: Utc::now(),
110 metadata: req.metadata,
111 };
112
113 self.agents.lock().await.insert(id.clone(), agent.clone());
114 self.record_event(
115 "agent.registered",
116 Some(id),
117 format!("{} registered", agent.name),
118 serde_json::to_value(&agent).map_err(|e| e.to_string())?,
119 )
120 .await;
121
122 Ok(agent)
123 }
124
125 pub async fn unregister_agent(
126 &self,
127 caller_client_id: &str,
128 agent_id: &str,
129 ) -> Result<(), String> {
130 {
134 let agents = self.agents.lock().await;
135 if let Some(existing) = agents.get(agent_id) {
136 if let Some(owner) = existing.session_id.as_deref() {
137 if owner != caller_client_id {
138 return Err(format!("agent '{agent_id}' is owned by another session"));
139 }
140 }
141 }
142 }
143
144 let removed = self.agents.lock().await.remove(agent_id);
145 if removed.is_none() {
146 return Err(format!("unknown agent '{}'", agent_id));
147 }
148 self.record_event(
149 "agent.unregistered",
150 Some(agent_id.to_string()),
151 format!("{} unregistered", agent_id),
152 Value::Null,
153 )
154 .await;
155 Ok(())
156 }
157
158 pub async fn set_status(
159 &self,
160 caller_client_id: &str,
161 req: SetHostAgentStatusRequest,
162 ) -> Result<HostAgent, String> {
163 let mut agents = self.agents.lock().await;
164 let agent = agents
165 .get_mut(&req.agent_id)
166 .ok_or_else(|| format!("unknown agent '{}'", req.agent_id))?;
167
168 if let Some(owner) = agent.session_id.as_deref() {
173 if owner != caller_client_id {
174 return Err(format!(
175 "agent '{}' is owned by another session",
176 req.agent_id
177 ));
178 }
179 }
180
181 agent.status = req.status.clone();
182 agent.current_task = req.current_task.clone();
183 agent.updated_at = Utc::now();
184 let updated = agent.clone();
185 drop(agents);
186
187 let message = req
188 .message
189 .unwrap_or_else(|| format!("{} is {:?}", updated.name, updated.status));
190 self.record_event(
191 "agent.status_changed",
192 Some(updated.id.clone()),
193 message,
194 if req.payload.is_null() {
195 serde_json::to_value(&updated).map_err(|e| e.to_string())?
196 } else {
197 req.payload
198 },
199 )
200 .await;
201
202 Ok(updated)
203 }
204
205 pub async fn create_approval(
213 &self,
214 caller_client_id: Option<&str>,
215 req: CreateHostApprovalRequest,
216 ) -> Result<HostApprovalRequest, String> {
217 let approval = HostApprovalRequest {
218 id: format!("approval-{}", short_id()),
219 agent_id: req.agent_id,
220 client_id: caller_client_id.map(|s| s.to_string()),
221 action: req.action,
222 details: req.details,
223 options: if req.options.is_empty() {
224 vec!["approve".to_string(), "deny".to_string()]
225 } else {
226 req.options
227 },
228 status: HostApprovalStatus::Pending,
229 created_at: Utc::now(),
230 resolved_at: None,
231 resolution: None,
232 };
233
234 self.approvals
235 .lock()
236 .await
237 .insert(approval.id.clone(), approval.clone());
238 self.record_event(
239 "approval.requested",
240 approval.agent_id.clone(),
241 format!("Approval requested: {}", approval.action),
242 serde_json::to_value(&approval).map_err(|e| e.to_string())?,
243 )
244 .await;
245 Ok(approval)
246 }
247
248 pub async fn resolve_approval(
270 &self,
271 caller_client_id: &str,
272 req: ResolveHostApprovalRequest,
273 ) -> Result<HostApprovalRequest, String> {
274 let (owner_opt, pending_snapshot) = {
279 let mut approvals = self.approvals.lock().await;
280 let approval = approvals
281 .get_mut(&req.approval_id)
282 .ok_or_else(|| format!("unknown approval '{}'", req.approval_id))?;
283 (approval.client_id.clone(), approval.clone())
284 };
285
286 if let Some(owner) = owner_opt {
287 if owner != caller_client_id {
288 let owner_subscribed =
293 self.subscribers.lock().await.contains_key(&owner);
294 if !owner_subscribed {
295 return Err(format!(
296 "approval '{}' is owned by session '{}' which is not currently connected",
297 req.approval_id, owner,
298 ));
299 }
300 self.record_event(
301 "approval.resolve_requested",
302 pending_snapshot.agent_id.clone(),
303 format!(
304 "Resolution requested for {}: {}",
305 pending_snapshot.action, req.resolution
306 ),
307 serde_json::json!({
308 "approval_id": req.approval_id,
309 "resolution": req.resolution,
310 "requesting_client_id": caller_client_id,
311 "owner_client_id": owner,
312 }),
313 )
314 .await;
315 return Ok(pending_snapshot);
319 }
320 }
321
322 let resolved = {
324 let mut approvals = self.approvals.lock().await;
325 let approval = approvals
326 .get_mut(&req.approval_id)
327 .ok_or_else(|| format!("approval '{}' vanished mid-resolve", req.approval_id))?;
328 approval.status = HostApprovalStatus::Resolved;
329 approval.resolution = Some(req.resolution);
330 approval.resolved_at = Some(Utc::now());
331 approval.clone()
332 };
333
334 if let Some(notify) = self.notifies.lock().await.remove(&resolved.id) {
339 notify.notify_one();
340 }
341
342 self.record_event(
343 "approval.resolved",
344 resolved.agent_id.clone(),
345 format!("Approval resolved: {}", resolved.action),
346 serde_json::to_value(&resolved).map_err(|e| e.to_string())?,
347 )
348 .await;
349 Ok(resolved)
350 }
351
352 pub async fn request_and_wait_approval(
374 &self,
375 req: CreateHostApprovalRequest,
376 approve_label: &str,
377 timeout: Duration,
378 ) -> Result<ApprovalOutcome, String> {
379 let approval = self.create_approval(None, req).await?;
385 let approval_id = approval.id.clone();
386
387 let notify = Arc::new(Notify::new());
392 {
393 let mut map = self.notifies.lock().await;
394 map.insert(approval_id.clone(), notify.clone());
395 }
396
397 if let Some(resolved) = self.approvals.lock().await.get(&approval_id).cloned() {
402 if matches!(resolved.status, HostApprovalStatus::Resolved) {
403 self.notifies.lock().await.remove(&approval_id);
404 return Ok(classify_resolution(&resolved, approve_label));
405 }
406 }
407
408 let woken = tokio::time::timeout(timeout, notify.notified()).await;
413 if woken.is_err() {
414 self.notifies.lock().await.remove(&approval_id);
415 return Ok(ApprovalOutcome::TimedOut);
416 }
417
418 let resolved = self
419 .approvals
420 .lock()
421 .await
422 .get(&approval_id)
423 .cloned()
424 .ok_or_else(|| format!("approval '{}' vanished after notify", approval_id))?;
425 Ok(classify_resolution(&resolved, approve_label))
426 }
427
428 pub async fn agents(&self) -> Vec<HostAgent> {
429 let mut agents: Vec<_> = self.agents.lock().await.values().cloned().collect();
430 agents.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
431 agents
432 }
433
434 pub async fn approvals(&self) -> Vec<HostApprovalRequest> {
435 let mut approvals: Vec<_> = self.approvals.lock().await.values().cloned().collect();
436 approvals.sort_by(|a, b| b.created_at.cmp(&a.created_at));
437 approvals
438 }
439
440 pub async fn events(&self, limit: usize) -> Vec<HostEvent> {
441 self.events
442 .lock()
443 .await
444 .iter()
445 .rev()
446 .take(limit)
447 .cloned()
448 .collect()
449 }
450
451 pub async fn record_event(
452 &self,
453 kind: impl Into<String>,
454 agent_id: Option<String>,
455 message: impl Into<String>,
456 payload: Value,
457 ) -> HostEvent {
458 let event = HostEvent {
459 id: format!("event-{}", short_id()),
460 timestamp: Utc::now(),
461 kind: kind.into(),
462 agent_id,
463 message: message.into(),
464 payload,
465 };
466
467 {
468 let mut events = self.events.lock().await;
469 events.push_back(event.clone());
470 while events.len() > MAX_EVENTS {
471 events.pop_front();
472 }
473 }
474
475 self.broadcast_event(&event).await;
476 event
477 }
478
479 async fn broadcast_event(&self, event: &HostEvent) {
480 let subscribers: Vec<Arc<WsChannel>> =
481 self.subscribers.lock().await.values().cloned().collect();
482
483 let Ok(json) = serde_json::to_string(&serde_json::json!({
484 "jsonrpc": "2.0",
485 "method": "host.event",
486 "params": event,
487 })) else {
488 return;
489 };
490
491 for channel in subscribers {
492 let _ = channel
493 .write
494 .lock()
495 .await
496 .send(Message::Text(json.clone().into()))
497 .await;
498 }
499 }
500}
501
502fn short_id() -> String {
503 uuid::Uuid::new_v4().simple().to_string()[..12].to_string()
504}
505
506fn classify_resolution(approval: &HostApprovalRequest, approve_label: &str) -> ApprovalOutcome {
507 match approval.resolution.as_deref() {
508 Some(r) if r == approve_label => ApprovalOutcome::Approved,
509 _ => ApprovalOutcome::Denied,
510 }
511}
512
513#[cfg(test)]
514mod tests {
515 use super::*;
516
517 #[tokio::test]
518 async fn host_tracks_agents_events_and_approvals() {
519 let host = HostState::new();
520
521 let agent = host
522 .register_agent(
523 "client-1",
524 RegisterHostAgentRequest {
525 id: Some("agent-1".to_string()),
526 name: "Researcher".to_string(),
527 kind: "builtin".to_string(),
528 capabilities: vec!["search".to_string()],
529 project: Some("/tmp/project".to_string()),
530 pid: None,
531 display: car_proto::HostAgentDisplay {
532 label: Some("Research Lead".to_string()),
533 icon: Some("magnifying-glass".to_string()),
534 accent: Some("#0a84ff".to_string()),
535 },
536 metadata: Value::Null,
537 },
538 )
539 .await
540 .expect("register agent");
541
542 assert_eq!(agent.status, HostAgentStatus::Idle);
543 assert_eq!(agent.display.label.as_deref(), Some("Research Lead"));
544 assert_eq!(agent.display.icon.as_deref(), Some("magnifying-glass"));
545 assert_eq!(agent.display.accent.as_deref(), Some("#0a84ff"));
546 assert_eq!(host.agents().await.len(), 1);
547
548 let updated = host
549 .set_status(
550 "client-1",
551 SetHostAgentStatusRequest {
552 agent_id: "agent-1".to_string(),
553 status: HostAgentStatus::Running,
554 current_task: Some("Collect facts".to_string()),
555 message: None,
556 payload: Value::Null,
557 },
558 )
559 .await
560 .expect("set status");
561
562 assert_eq!(updated.status, HostAgentStatus::Running);
563 assert_eq!(updated.current_task.as_deref(), Some("Collect facts"));
564
565 let approval = host
566 .create_approval(
567 Some("client-1"),
568 CreateHostApprovalRequest {
569 agent_id: Some("agent-1".to_string()),
570 action: "Run tests".to_string(),
571 details: serde_json::json!({ "command": "cargo test" }),
572 options: vec![],
573 system_level: false,
574 },
575 )
576 .await
577 .expect("create approval");
578
579 assert_eq!(approval.options, vec!["approve", "deny"]);
580 assert_eq!(approval.status, HostApprovalStatus::Pending);
581
582 let resolved = host
583 .resolve_approval(
584 "client-1",
585 ResolveHostApprovalRequest {
586 approval_id: approval.id,
587 resolution: "approve".to_string(),
588 },
589 )
590 .await
591 .expect("resolve approval");
592
593 assert_eq!(resolved.status, HostApprovalStatus::Resolved);
594 assert_eq!(resolved.resolution.as_deref(), Some("approve"));
595 assert!(host.events(10).await.len() >= 4);
596 }
597
598 #[tokio::test]
599 async fn request_and_wait_returns_approved_when_user_approves() {
600 let host = Arc::new(HostState::new());
601 let host2 = host.clone();
602
603 let waiter = tokio::spawn(async move {
606 host2
607 .request_and_wait_approval(
608 CreateHostApprovalRequest {
609 agent_id: None,
610 action: "automation.run_applescript".into(),
611 details: serde_json::json!({}),
612 options: vec![],
613 system_level: false,
614 },
615 "approve",
616 Duration::from_secs(2),
617 )
618 .await
619 .expect("gate ran")
620 });
621
622 tokio::time::sleep(Duration::from_millis(20)).await;
627 let pending = host.approvals().await;
628 assert_eq!(pending.len(), 1, "exactly one pending approval");
629 assert!(
630 pending[0].client_id.is_none(),
631 "gate-raised approvals must be system-level"
632 );
633 host.resolve_approval(
634 "ui-session",
635 ResolveHostApprovalRequest {
636 approval_id: pending[0].id.clone(),
637 resolution: "approve".into(),
638 },
639 )
640 .await
641 .unwrap();
642
643 let outcome = waiter.await.unwrap();
644 assert_eq!(outcome, ApprovalOutcome::Approved);
645 }
646
647 #[tokio::test]
648 async fn request_and_wait_returns_denied_on_other_resolution() {
649 let host = Arc::new(HostState::new());
650 let host2 = host.clone();
651 let waiter = tokio::spawn(async move {
652 host2
653 .request_and_wait_approval(
654 CreateHostApprovalRequest {
655 agent_id: None,
656 action: "messages.send".into(),
657 details: serde_json::json!({}),
658 options: vec![],
659 system_level: false,
660 },
661 "approve",
662 Duration::from_secs(2),
663 )
664 .await
665 .unwrap()
666 });
667 tokio::time::sleep(Duration::from_millis(20)).await;
668 let pending = host.approvals().await;
669 host.resolve_approval(
670 "ui-session",
671 ResolveHostApprovalRequest {
672 approval_id: pending[0].id.clone(),
673 resolution: "deny".into(),
674 },
675 )
676 .await
677 .unwrap();
678 assert_eq!(waiter.await.unwrap(), ApprovalOutcome::Denied);
679 }
680
681 #[tokio::test]
682 async fn request_and_wait_times_out_when_no_resolution() {
683 let host = HostState::new();
684 let outcome = host
685 .request_and_wait_approval(
686 CreateHostApprovalRequest {
687 agent_id: None,
688 action: "vision.ocr".into(),
689 details: serde_json::json!({}),
690 options: vec![],
691 system_level: false,
692 },
693 "approve",
694 Duration::from_millis(50),
695 )
696 .await
697 .unwrap();
698 assert_eq!(outcome, ApprovalOutcome::TimedOut);
699 let pending = host.approvals().await;
701 assert_eq!(pending.len(), 1);
702 assert_eq!(pending[0].status, HostApprovalStatus::Pending);
703 }
704
705 fn make_register_request(name: &str) -> RegisterHostAgentRequest {
708 RegisterHostAgentRequest {
709 id: Some(name.into()),
710 name: name.into(),
711 kind: "test".into(),
712 capabilities: vec![],
713 project: None,
714 pid: None,
715 display: car_proto::HostAgentDisplay {
716 label: None,
717 icon: None,
718 accent: None,
719 },
720 metadata: Value::Null,
721 }
722 }
723
724 #[tokio::test]
725 async fn set_status_rejects_non_owning_session() {
726 let host = HostState::new();
727 host.register_agent("client-A", make_register_request("worker"))
728 .await
729 .unwrap();
730
731 let err = host
733 .set_status(
734 "client-B",
735 SetHostAgentStatusRequest {
736 agent_id: "worker".into(),
737 status: HostAgentStatus::Errored,
738 current_task: None,
739 message: None,
740 payload: Value::Null,
741 },
742 )
743 .await
744 .unwrap_err();
745 assert!(
746 err.contains("owned by another session"),
747 "unexpected rejection message: {err}"
748 );
749
750 host.set_status(
752 "client-A",
753 SetHostAgentStatusRequest {
754 agent_id: "worker".into(),
755 status: HostAgentStatus::Running,
756 current_task: None,
757 message: None,
758 payload: Value::Null,
759 },
760 )
761 .await
762 .expect("owner can mutate");
763 }
764
765 #[tokio::test]
766 async fn unregister_rejects_non_owning_session() {
767 let host = HostState::new();
768 host.register_agent("client-A", make_register_request("worker"))
769 .await
770 .unwrap();
771
772 let err = host
773 .unregister_agent("client-B", "worker")
774 .await
775 .unwrap_err();
776 assert!(err.contains("owned by another session"));
777 assert_eq!(host.agents().await.len(), 1, "agent must survive");
778
779 host.unregister_agent("client-A", "worker")
780 .await
781 .expect("owner can unregister");
782 assert_eq!(host.agents().await.len(), 0);
783 }
784
785 #[tokio::test]
786 async fn register_refuses_to_overwrite_other_sessions_agent() {
787 let host = HostState::new();
788 host.register_agent("client-A", make_register_request("worker"))
789 .await
790 .unwrap();
791
792 let err = host
793 .register_agent("client-B", make_register_request("worker"))
794 .await
795 .unwrap_err();
796 assert!(
797 err.contains("owned by another session"),
798 "unexpected message: {err}"
799 );
800 }
801
802 #[tokio::test]
803 async fn manual_approval_cross_session_with_disconnected_owner_errors() {
804 let host = HostState::new();
809 let approval = host
810 .create_approval(
811 Some("client-A"),
812 CreateHostApprovalRequest {
813 agent_id: None,
814 action: "manual.action".into(),
815 details: serde_json::json!({}),
816 options: vec![],
817 system_level: false,
818 },
819 )
820 .await
821 .unwrap();
822
823 let err = host
824 .resolve_approval(
825 "client-B",
826 ResolveHostApprovalRequest {
827 approval_id: approval.id.clone(),
828 resolution: "approve".into(),
829 },
830 )
831 .await
832 .unwrap_err();
833 assert!(
834 err.contains("not currently connected"),
835 "expected disconnected-owner message, got: {err}",
836 );
837 let still = host
840 .approvals()
841 .await
842 .into_iter()
843 .find(|a| a.id == approval.id)
844 .expect("approval survives failed resolve");
845 assert_eq!(still.status, HostApprovalStatus::Pending);
846
847 host.resolve_approval(
849 "client-A",
850 ResolveHostApprovalRequest {
851 approval_id: approval.id,
852 resolution: "deny".into(),
853 },
854 )
855 .await
856 .expect("owner can resolve");
857 }
858
859 #[tokio::test]
860 async fn manual_approval_cross_session_with_subscribed_owner_fans_out() {
861 let host = HostState::new();
868
869 host.subscribe("client-A", Arc::new(WsChannel::test_stub()))
874 .await;
875
876 let approval = host
877 .create_approval(
878 Some("client-A"),
879 CreateHostApprovalRequest {
880 agent_id: Some("worker".into()),
881 action: "Send email to bob@example.com".into(),
882 details: serde_json::json!({"kind": "email_reply"}),
883 options: vec![],
884 system_level: false,
885 },
886 )
887 .await
888 .unwrap();
889
890 let returned = host
892 .resolve_approval(
893 "client-B",
894 ResolveHostApprovalRequest {
895 approval_id: approval.id.clone(),
896 resolution: "deny".into(),
897 },
898 )
899 .await
900 .expect("cross-session resolve fans out without error");
901 assert_eq!(
902 returned.status,
903 HostApprovalStatus::Pending,
904 "returned row must stay Pending — only the owner mutates",
905 );
906
907 let events = host.events(10).await;
909 let resolve_req = events
910 .iter()
911 .find(|e| e.kind == "approval.resolve_requested")
912 .expect("resolve_requested event recorded");
913 assert_eq!(
914 resolve_req.payload.get("approval_id").and_then(|v| v.as_str()),
915 Some(approval.id.as_str()),
916 );
917 assert_eq!(
918 resolve_req.payload.get("resolution").and_then(|v| v.as_str()),
919 Some("deny"),
920 );
921 assert_eq!(
922 resolve_req.payload.get("owner_client_id").and_then(|v| v.as_str()),
923 Some("client-A"),
924 );
925 assert_eq!(
926 resolve_req
927 .payload
928 .get("requesting_client_id")
929 .and_then(|v| v.as_str()),
930 Some("client-B"),
931 );
932
933 let still = host
936 .approvals()
937 .await
938 .into_iter()
939 .find(|a| a.id == approval.id)
940 .unwrap();
941 assert_eq!(still.status, HostApprovalStatus::Pending);
942
943 let resolved = host
948 .resolve_approval(
949 "client-A",
950 ResolveHostApprovalRequest {
951 approval_id: approval.id.clone(),
952 resolution: "deny".into(),
953 },
954 )
955 .await
956 .expect("owner resolves on own session");
957 assert_eq!(resolved.status, HostApprovalStatus::Resolved);
958 assert_eq!(resolved.resolution.as_deref(), Some("deny"));
959 }
960
961
962 #[tokio::test]
963 async fn system_approval_resolvable_by_any_session() {
964 let host = HostState::new();
968 let approval = host
969 .create_approval(
970 None,
971 CreateHostApprovalRequest {
972 agent_id: None,
973 action: "ws.method:automation.run_applescript".into(),
974 details: serde_json::json!({}),
975 options: vec![],
976 system_level: false,
977 },
978 )
979 .await
980 .unwrap();
981 assert!(
982 approval.client_id.is_none(),
983 "system approval must carry no owner"
984 );
985
986 host.resolve_approval(
987 "ui-session",
988 ResolveHostApprovalRequest {
989 approval_id: approval.id,
990 resolution: "approve".into(),
991 },
992 )
993 .await
994 .expect("any session can resolve a system approval");
995 }
996}