1use crate::session::WsChannel;
8use car_proto::{
9 CreateHostApprovalRequest, HostAgent, HostAgentStatus, HostApprovalRequest, HostApprovalStatus,
10 HostEvent, RegisterHostAgentRequest, ResolveHostApprovalRequest, SetHostAgentStatusRequest,
11};
12use chrono::Utc;
13use futures::SinkExt;
14use serde_json::Value;
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{Mutex, Notify};
19use tokio_tungstenite::tungstenite::Message;
20
21const MAX_EVENTS: usize = 500;
22
23#[derive(Default)]
24pub struct HostState {
25 agents: Mutex<HashMap<String, HostAgent>>,
26 approvals: Mutex<HashMap<String, HostApprovalRequest>>,
27 events: Mutex<VecDeque<HostEvent>>,
28 subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
29 notifies: Mutex<HashMap<String, Arc<Notify>>>,
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44pub enum ApprovalOutcome {
45 Approved,
48 Denied,
51 TimedOut,
55}
56
57impl HostState {
58 pub fn new() -> Self {
59 Self::default()
60 }
61
62 pub async fn subscribe(&self, client_id: &str, channel: Arc<WsChannel>) {
63 self.subscribers
64 .lock()
65 .await
66 .insert(client_id.to_string(), channel);
67 }
68
69 pub async fn unsubscribe(&self, client_id: &str) {
70 self.subscribers.lock().await.remove(client_id);
71 }
72
73 pub async fn register_agent(
74 &self,
75 client_id: &str,
76 req: RegisterHostAgentRequest,
77 ) -> Result<HostAgent, String> {
78 let id = req.id.unwrap_or_else(|| format!("agent-{}", short_id()));
79 {
84 let agents = self.agents.lock().await;
85 if let Some(existing) = agents.get(&id) {
86 match existing.session_id.as_deref() {
87 Some(owner) if owner != client_id => {
88 return Err(format!(
89 "agent '{id}' is owned by another session; \
90 unregister it from that session first"
91 ));
92 }
93 _ => {}
94 }
95 }
96 }
97
98 let agent = HostAgent {
99 id: id.clone(),
100 name: req.name,
101 kind: req.kind,
102 capabilities: req.capabilities,
103 project: req.project,
104 session_id: Some(client_id.to_string()),
105 status: HostAgentStatus::Idle,
106 current_task: None,
107 pid: req.pid,
108 display: req.display,
109 updated_at: Utc::now(),
110 metadata: req.metadata,
111 };
112
113 self.agents.lock().await.insert(id.clone(), agent.clone());
114 self.record_event(
115 "agent.registered",
116 Some(id),
117 format!("{} registered", agent.name),
118 serde_json::to_value(&agent).map_err(|e| e.to_string())?,
119 )
120 .await;
121
122 Ok(agent)
123 }
124
125 pub async fn unregister_agent(
126 &self,
127 caller_client_id: &str,
128 agent_id: &str,
129 ) -> Result<(), String> {
130 {
134 let agents = self.agents.lock().await;
135 if let Some(existing) = agents.get(agent_id) {
136 if let Some(owner) = existing.session_id.as_deref() {
137 if owner != caller_client_id {
138 return Err(format!("agent '{agent_id}' is owned by another session"));
139 }
140 }
141 }
142 }
143
144 let removed = self.agents.lock().await.remove(agent_id);
145 if removed.is_none() {
146 return Err(format!("unknown agent '{}'", agent_id));
147 }
148 self.record_event(
149 "agent.unregistered",
150 Some(agent_id.to_string()),
151 format!("{} unregistered", agent_id),
152 Value::Null,
153 )
154 .await;
155 Ok(())
156 }
157
158 pub async fn set_status(
159 &self,
160 caller_client_id: &str,
161 req: SetHostAgentStatusRequest,
162 ) -> Result<HostAgent, String> {
163 let mut agents = self.agents.lock().await;
164 let agent = agents
165 .get_mut(&req.agent_id)
166 .ok_or_else(|| format!("unknown agent '{}'", req.agent_id))?;
167
168 if let Some(owner) = agent.session_id.as_deref() {
173 if owner != caller_client_id {
174 return Err(format!(
175 "agent '{}' is owned by another session",
176 req.agent_id
177 ));
178 }
179 }
180
181 agent.status = req.status.clone();
182 agent.current_task = req.current_task.clone();
183 agent.updated_at = Utc::now();
184 let updated = agent.clone();
185 drop(agents);
186
187 let message = req
188 .message
189 .unwrap_or_else(|| format!("{} is {:?}", updated.name, updated.status));
190 self.record_event(
191 "agent.status_changed",
192 Some(updated.id.clone()),
193 message,
194 if req.payload.is_null() {
195 serde_json::to_value(&updated).map_err(|e| e.to_string())?
196 } else {
197 req.payload
198 },
199 )
200 .await;
201
202 Ok(updated)
203 }
204
205 pub async fn create_approval(
213 &self,
214 caller_client_id: Option<&str>,
215 req: CreateHostApprovalRequest,
216 ) -> Result<HostApprovalRequest, String> {
217 let approval = HostApprovalRequest {
218 id: format!("approval-{}", short_id()),
219 agent_id: req.agent_id,
220 client_id: caller_client_id.map(|s| s.to_string()),
221 action: req.action,
222 details: req.details,
223 options: if req.options.is_empty() {
224 vec!["approve".to_string(), "deny".to_string()]
225 } else {
226 req.options
227 },
228 status: HostApprovalStatus::Pending,
229 created_at: Utc::now(),
230 resolved_at: None,
231 resolution: None,
232 };
233
234 self.approvals
235 .lock()
236 .await
237 .insert(approval.id.clone(), approval.clone());
238 self.record_event(
239 "approval.requested",
240 approval.agent_id.clone(),
241 format!("Approval requested: {}", approval.action),
242 serde_json::to_value(&approval).map_err(|e| e.to_string())?,
243 )
244 .await;
245 Ok(approval)
246 }
247
248 pub async fn resolve_approval(
255 &self,
256 caller_client_id: &str,
257 req: ResolveHostApprovalRequest,
258 ) -> Result<HostApprovalRequest, String> {
259 let mut approvals = self.approvals.lock().await;
260 let approval = approvals
261 .get_mut(&req.approval_id)
262 .ok_or_else(|| format!("unknown approval '{}'", req.approval_id))?;
263 if let Some(owner) = approval.client_id.as_deref() {
264 if owner != caller_client_id {
265 return Err(format!(
266 "approval '{}' is owned by another session",
267 req.approval_id
268 ));
269 }
270 }
271 approval.status = HostApprovalStatus::Resolved;
272 approval.resolution = Some(req.resolution);
273 approval.resolved_at = Some(Utc::now());
274 let resolved = approval.clone();
275 drop(approvals);
276
277 if let Some(notify) = self.notifies.lock().await.remove(&resolved.id) {
282 notify.notify_one();
283 }
284
285 self.record_event(
286 "approval.resolved",
287 resolved.agent_id.clone(),
288 format!("Approval resolved: {}", resolved.action),
289 serde_json::to_value(&resolved).map_err(|e| e.to_string())?,
290 )
291 .await;
292 Ok(resolved)
293 }
294
295 pub async fn request_and_wait_approval(
317 &self,
318 req: CreateHostApprovalRequest,
319 approve_label: &str,
320 timeout: Duration,
321 ) -> Result<ApprovalOutcome, String> {
322 let approval = self.create_approval(None, req).await?;
328 let approval_id = approval.id.clone();
329
330 let notify = Arc::new(Notify::new());
335 {
336 let mut map = self.notifies.lock().await;
337 map.insert(approval_id.clone(), notify.clone());
338 }
339
340 if let Some(resolved) = self.approvals.lock().await.get(&approval_id).cloned() {
345 if matches!(resolved.status, HostApprovalStatus::Resolved) {
346 self.notifies.lock().await.remove(&approval_id);
347 return Ok(classify_resolution(&resolved, approve_label));
348 }
349 }
350
351 let woken = tokio::time::timeout(timeout, notify.notified()).await;
356 if woken.is_err() {
357 self.notifies.lock().await.remove(&approval_id);
358 return Ok(ApprovalOutcome::TimedOut);
359 }
360
361 let resolved = self
362 .approvals
363 .lock()
364 .await
365 .get(&approval_id)
366 .cloned()
367 .ok_or_else(|| format!("approval '{}' vanished after notify", approval_id))?;
368 Ok(classify_resolution(&resolved, approve_label))
369 }
370
371 pub async fn agents(&self) -> Vec<HostAgent> {
372 let mut agents: Vec<_> = self.agents.lock().await.values().cloned().collect();
373 agents.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
374 agents
375 }
376
377 pub async fn approvals(&self) -> Vec<HostApprovalRequest> {
378 let mut approvals: Vec<_> = self.approvals.lock().await.values().cloned().collect();
379 approvals.sort_by(|a, b| b.created_at.cmp(&a.created_at));
380 approvals
381 }
382
383 pub async fn events(&self, limit: usize) -> Vec<HostEvent> {
384 self.events
385 .lock()
386 .await
387 .iter()
388 .rev()
389 .take(limit)
390 .cloned()
391 .collect()
392 }
393
394 pub async fn record_event(
395 &self,
396 kind: impl Into<String>,
397 agent_id: Option<String>,
398 message: impl Into<String>,
399 payload: Value,
400 ) -> HostEvent {
401 let event = HostEvent {
402 id: format!("event-{}", short_id()),
403 timestamp: Utc::now(),
404 kind: kind.into(),
405 agent_id,
406 message: message.into(),
407 payload,
408 };
409
410 {
411 let mut events = self.events.lock().await;
412 events.push_back(event.clone());
413 while events.len() > MAX_EVENTS {
414 events.pop_front();
415 }
416 }
417
418 self.broadcast_event(&event).await;
419 event
420 }
421
422 async fn broadcast_event(&self, event: &HostEvent) {
423 let subscribers: Vec<Arc<WsChannel>> =
424 self.subscribers.lock().await.values().cloned().collect();
425
426 let Ok(json) = serde_json::to_string(&serde_json::json!({
427 "jsonrpc": "2.0",
428 "method": "host.event",
429 "params": event,
430 })) else {
431 return;
432 };
433
434 for channel in subscribers {
435 let _ = channel
436 .write
437 .lock()
438 .await
439 .send(Message::Text(json.clone().into()))
440 .await;
441 }
442 }
443}
444
445fn short_id() -> String {
446 uuid::Uuid::new_v4().simple().to_string()[..12].to_string()
447}
448
449fn classify_resolution(approval: &HostApprovalRequest, approve_label: &str) -> ApprovalOutcome {
450 match approval.resolution.as_deref() {
451 Some(r) if r == approve_label => ApprovalOutcome::Approved,
452 _ => ApprovalOutcome::Denied,
453 }
454}
455
456#[cfg(test)]
457mod tests {
458 use super::*;
459
460 #[tokio::test]
461 async fn host_tracks_agents_events_and_approvals() {
462 let host = HostState::new();
463
464 let agent = host
465 .register_agent(
466 "client-1",
467 RegisterHostAgentRequest {
468 id: Some("agent-1".to_string()),
469 name: "Researcher".to_string(),
470 kind: "builtin".to_string(),
471 capabilities: vec!["search".to_string()],
472 project: Some("/tmp/project".to_string()),
473 pid: None,
474 display: car_proto::HostAgentDisplay {
475 label: Some("Research Lead".to_string()),
476 icon: Some("magnifying-glass".to_string()),
477 accent: Some("#0a84ff".to_string()),
478 },
479 metadata: Value::Null,
480 },
481 )
482 .await
483 .expect("register agent");
484
485 assert_eq!(agent.status, HostAgentStatus::Idle);
486 assert_eq!(agent.display.label.as_deref(), Some("Research Lead"));
487 assert_eq!(agent.display.icon.as_deref(), Some("magnifying-glass"));
488 assert_eq!(agent.display.accent.as_deref(), Some("#0a84ff"));
489 assert_eq!(host.agents().await.len(), 1);
490
491 let updated = host
492 .set_status(
493 "client-1",
494 SetHostAgentStatusRequest {
495 agent_id: "agent-1".to_string(),
496 status: HostAgentStatus::Running,
497 current_task: Some("Collect facts".to_string()),
498 message: None,
499 payload: Value::Null,
500 },
501 )
502 .await
503 .expect("set status");
504
505 assert_eq!(updated.status, HostAgentStatus::Running);
506 assert_eq!(updated.current_task.as_deref(), Some("Collect facts"));
507
508 let approval = host
509 .create_approval(
510 Some("client-1"),
511 CreateHostApprovalRequest {
512 agent_id: Some("agent-1".to_string()),
513 action: "Run tests".to_string(),
514 details: serde_json::json!({ "command": "cargo test" }),
515 options: vec![],
516 system_level: false,
517 },
518 )
519 .await
520 .expect("create approval");
521
522 assert_eq!(approval.options, vec!["approve", "deny"]);
523 assert_eq!(approval.status, HostApprovalStatus::Pending);
524
525 let resolved = host
526 .resolve_approval(
527 "client-1",
528 ResolveHostApprovalRequest {
529 approval_id: approval.id,
530 resolution: "approve".to_string(),
531 },
532 )
533 .await
534 .expect("resolve approval");
535
536 assert_eq!(resolved.status, HostApprovalStatus::Resolved);
537 assert_eq!(resolved.resolution.as_deref(), Some("approve"));
538 assert!(host.events(10).await.len() >= 4);
539 }
540
541 #[tokio::test]
542 async fn request_and_wait_returns_approved_when_user_approves() {
543 let host = Arc::new(HostState::new());
544 let host2 = host.clone();
545
546 let waiter = tokio::spawn(async move {
549 host2
550 .request_and_wait_approval(
551 CreateHostApprovalRequest {
552 agent_id: None,
553 action: "automation.run_applescript".into(),
554 details: serde_json::json!({}),
555 options: vec![],
556 system_level: false,
557 },
558 "approve",
559 Duration::from_secs(2),
560 )
561 .await
562 .expect("gate ran")
563 });
564
565 tokio::time::sleep(Duration::from_millis(20)).await;
570 let pending = host.approvals().await;
571 assert_eq!(pending.len(), 1, "exactly one pending approval");
572 assert!(
573 pending[0].client_id.is_none(),
574 "gate-raised approvals must be system-level"
575 );
576 host.resolve_approval(
577 "ui-session",
578 ResolveHostApprovalRequest {
579 approval_id: pending[0].id.clone(),
580 resolution: "approve".into(),
581 },
582 )
583 .await
584 .unwrap();
585
586 let outcome = waiter.await.unwrap();
587 assert_eq!(outcome, ApprovalOutcome::Approved);
588 }
589
590 #[tokio::test]
591 async fn request_and_wait_returns_denied_on_other_resolution() {
592 let host = Arc::new(HostState::new());
593 let host2 = host.clone();
594 let waiter = tokio::spawn(async move {
595 host2
596 .request_and_wait_approval(
597 CreateHostApprovalRequest {
598 agent_id: None,
599 action: "messages.send".into(),
600 details: serde_json::json!({}),
601 options: vec![],
602 system_level: false,
603 },
604 "approve",
605 Duration::from_secs(2),
606 )
607 .await
608 .unwrap()
609 });
610 tokio::time::sleep(Duration::from_millis(20)).await;
611 let pending = host.approvals().await;
612 host.resolve_approval(
613 "ui-session",
614 ResolveHostApprovalRequest {
615 approval_id: pending[0].id.clone(),
616 resolution: "deny".into(),
617 },
618 )
619 .await
620 .unwrap();
621 assert_eq!(waiter.await.unwrap(), ApprovalOutcome::Denied);
622 }
623
624 #[tokio::test]
625 async fn request_and_wait_times_out_when_no_resolution() {
626 let host = HostState::new();
627 let outcome = host
628 .request_and_wait_approval(
629 CreateHostApprovalRequest {
630 agent_id: None,
631 action: "vision.ocr".into(),
632 details: serde_json::json!({}),
633 options: vec![],
634 system_level: false,
635 },
636 "approve",
637 Duration::from_millis(50),
638 )
639 .await
640 .unwrap();
641 assert_eq!(outcome, ApprovalOutcome::TimedOut);
642 let pending = host.approvals().await;
644 assert_eq!(pending.len(), 1);
645 assert_eq!(pending[0].status, HostApprovalStatus::Pending);
646 }
647
648 fn make_register_request(name: &str) -> RegisterHostAgentRequest {
651 RegisterHostAgentRequest {
652 id: Some(name.into()),
653 name: name.into(),
654 kind: "test".into(),
655 capabilities: vec![],
656 project: None,
657 pid: None,
658 display: car_proto::HostAgentDisplay {
659 label: None,
660 icon: None,
661 accent: None,
662 },
663 metadata: Value::Null,
664 }
665 }
666
667 #[tokio::test]
668 async fn set_status_rejects_non_owning_session() {
669 let host = HostState::new();
670 host.register_agent("client-A", make_register_request("worker"))
671 .await
672 .unwrap();
673
674 let err = host
676 .set_status(
677 "client-B",
678 SetHostAgentStatusRequest {
679 agent_id: "worker".into(),
680 status: HostAgentStatus::Errored,
681 current_task: None,
682 message: None,
683 payload: Value::Null,
684 },
685 )
686 .await
687 .unwrap_err();
688 assert!(
689 err.contains("owned by another session"),
690 "unexpected rejection message: {err}"
691 );
692
693 host.set_status(
695 "client-A",
696 SetHostAgentStatusRequest {
697 agent_id: "worker".into(),
698 status: HostAgentStatus::Running,
699 current_task: None,
700 message: None,
701 payload: Value::Null,
702 },
703 )
704 .await
705 .expect("owner can mutate");
706 }
707
708 #[tokio::test]
709 async fn unregister_rejects_non_owning_session() {
710 let host = HostState::new();
711 host.register_agent("client-A", make_register_request("worker"))
712 .await
713 .unwrap();
714
715 let err = host
716 .unregister_agent("client-B", "worker")
717 .await
718 .unwrap_err();
719 assert!(err.contains("owned by another session"));
720 assert_eq!(host.agents().await.len(), 1, "agent must survive");
721
722 host.unregister_agent("client-A", "worker")
723 .await
724 .expect("owner can unregister");
725 assert_eq!(host.agents().await.len(), 0);
726 }
727
728 #[tokio::test]
729 async fn register_refuses_to_overwrite_other_sessions_agent() {
730 let host = HostState::new();
731 host.register_agent("client-A", make_register_request("worker"))
732 .await
733 .unwrap();
734
735 let err = host
736 .register_agent("client-B", make_register_request("worker"))
737 .await
738 .unwrap_err();
739 assert!(
740 err.contains("owned by another session"),
741 "unexpected message: {err}"
742 );
743 }
744
745 #[tokio::test]
746 async fn manual_approval_only_resolvable_by_creator() {
747 let host = HostState::new();
748 let approval = host
749 .create_approval(
750 Some("client-A"),
751 CreateHostApprovalRequest {
752 agent_id: None,
753 action: "manual.action".into(),
754 details: serde_json::json!({}),
755 options: vec![],
756 system_level: false,
757 },
758 )
759 .await
760 .unwrap();
761
762 let err = host
764 .resolve_approval(
765 "client-B",
766 ResolveHostApprovalRequest {
767 approval_id: approval.id.clone(),
768 resolution: "approve".into(),
769 },
770 )
771 .await
772 .unwrap_err();
773 assert!(err.contains("owned by another session"));
774
775 host.resolve_approval(
777 "client-A",
778 ResolveHostApprovalRequest {
779 approval_id: approval.id,
780 resolution: "deny".into(),
781 },
782 )
783 .await
784 .expect("owner can resolve");
785 }
786
787 #[tokio::test]
788 async fn system_approval_resolvable_by_any_session() {
789 let host = HostState::new();
793 let approval = host
794 .create_approval(
795 None,
796 CreateHostApprovalRequest {
797 agent_id: None,
798 action: "ws.method:automation.run_applescript".into(),
799 details: serde_json::json!({}),
800 options: vec![],
801 system_level: false,
802 },
803 )
804 .await
805 .unwrap();
806 assert!(
807 approval.client_id.is_none(),
808 "system approval must carry no owner"
809 );
810
811 host.resolve_approval(
812 "ui-session",
813 ResolveHostApprovalRequest {
814 approval_id: approval.id,
815 resolution: "approve".into(),
816 },
817 )
818 .await
819 .expect("any session can resolve a system approval");
820 }
821}