1use crate::session::WsChannel;
8use car_proto::{
9 CreateHostApprovalRequest, HostAgent, HostAgentStatus, HostApprovalRequest, HostApprovalStatus,
10 HostEvent, RegisterHostAgentRequest, ResolveHostApprovalRequest, SetHostAgentStatusRequest,
11};
12use chrono::Utc;
13use futures::SinkExt;
14use serde_json::Value;
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{Mutex, Notify};
19use tokio_tungstenite::tungstenite::Message;
20
21const MAX_EVENTS: usize = 500;
22
23#[derive(Default)]
24pub struct HostState {
25 agents: Mutex<HashMap<String, HostAgent>>,
26 approvals: Mutex<HashMap<String, HostApprovalRequest>>,
27 events: Mutex<VecDeque<HostEvent>>,
28 subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
29 notifies: Mutex<HashMap<String, Arc<Notify>>>,
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44pub enum ApprovalOutcome {
45 Approved,
48 Denied,
51 TimedOut,
55}
56
57impl HostState {
58 pub fn new() -> Self {
59 Self::default()
60 }
61
62 pub async fn subscribe(&self, client_id: &str, channel: Arc<WsChannel>) {
63 self.subscribers
64 .lock()
65 .await
66 .insert(client_id.to_string(), channel);
67 }
68
69 pub async fn unsubscribe(&self, client_id: &str) {
70 self.subscribers.lock().await.remove(client_id);
71 }
72
73 pub async fn register_agent(
74 &self,
75 client_id: &str,
76 req: RegisterHostAgentRequest,
77 ) -> Result<HostAgent, String> {
78 let id = req.id.unwrap_or_else(|| format!("agent-{}", short_id()));
79 {
84 let agents = self.agents.lock().await;
85 if let Some(existing) = agents.get(&id) {
86 match existing.session_id.as_deref() {
87 Some(owner) if owner != client_id => {
88 return Err(format!(
89 "agent '{id}' is owned by another session; \
90 unregister it from that session first"
91 ));
92 }
93 _ => {}
94 }
95 }
96 }
97
98 let agent = HostAgent {
99 id: id.clone(),
100 name: req.name,
101 kind: req.kind,
102 capabilities: req.capabilities,
103 project: req.project,
104 session_id: Some(client_id.to_string()),
105 status: HostAgentStatus::Idle,
106 current_task: None,
107 pid: req.pid,
108 display: req.display,
109 updated_at: Utc::now(),
110 metadata: req.metadata,
111 };
112
113 self.agents.lock().await.insert(id.clone(), agent.clone());
114 self.record_event(
115 "agent.registered",
116 Some(id),
117 format!("{} registered", agent.name),
118 serde_json::to_value(&agent).map_err(|e| e.to_string())?,
119 )
120 .await;
121
122 Ok(agent)
123 }
124
125 pub async fn unregister_agent(
126 &self,
127 caller_client_id: &str,
128 agent_id: &str,
129 ) -> Result<(), String> {
130 {
134 let agents = self.agents.lock().await;
135 if let Some(existing) = agents.get(agent_id) {
136 if let Some(owner) = existing.session_id.as_deref() {
137 if owner != caller_client_id {
138 return Err(format!(
139 "agent '{agent_id}' is owned by another session"
140 ));
141 }
142 }
143 }
144 }
145
146 let removed = self.agents.lock().await.remove(agent_id);
147 if removed.is_none() {
148 return Err(format!("unknown agent '{}'", agent_id));
149 }
150 self.record_event(
151 "agent.unregistered",
152 Some(agent_id.to_string()),
153 format!("{} unregistered", agent_id),
154 Value::Null,
155 )
156 .await;
157 Ok(())
158 }
159
160 pub async fn set_status(
161 &self,
162 caller_client_id: &str,
163 req: SetHostAgentStatusRequest,
164 ) -> Result<HostAgent, String> {
165 let mut agents = self.agents.lock().await;
166 let agent = agents
167 .get_mut(&req.agent_id)
168 .ok_or_else(|| format!("unknown agent '{}'", req.agent_id))?;
169
170 if let Some(owner) = agent.session_id.as_deref() {
175 if owner != caller_client_id {
176 return Err(format!(
177 "agent '{}' is owned by another session",
178 req.agent_id
179 ));
180 }
181 }
182
183 agent.status = req.status.clone();
184 agent.current_task = req.current_task.clone();
185 agent.updated_at = Utc::now();
186 let updated = agent.clone();
187 drop(agents);
188
189 let message = req
190 .message
191 .unwrap_or_else(|| format!("{} is {:?}", updated.name, updated.status));
192 self.record_event(
193 "agent.status_changed",
194 Some(updated.id.clone()),
195 message,
196 if req.payload.is_null() {
197 serde_json::to_value(&updated).map_err(|e| e.to_string())?
198 } else {
199 req.payload
200 },
201 )
202 .await;
203
204 Ok(updated)
205 }
206
207 pub async fn create_approval(
215 &self,
216 caller_client_id: Option<&str>,
217 req: CreateHostApprovalRequest,
218 ) -> Result<HostApprovalRequest, String> {
219 let approval = HostApprovalRequest {
220 id: format!("approval-{}", short_id()),
221 agent_id: req.agent_id,
222 client_id: caller_client_id.map(|s| s.to_string()),
223 action: req.action,
224 details: req.details,
225 options: if req.options.is_empty() {
226 vec!["approve".to_string(), "deny".to_string()]
227 } else {
228 req.options
229 },
230 status: HostApprovalStatus::Pending,
231 created_at: Utc::now(),
232 resolved_at: None,
233 resolution: None,
234 };
235
236 self.approvals
237 .lock()
238 .await
239 .insert(approval.id.clone(), approval.clone());
240 self.record_event(
241 "approval.requested",
242 approval.agent_id.clone(),
243 format!("Approval requested: {}", approval.action),
244 serde_json::to_value(&approval).map_err(|e| e.to_string())?,
245 )
246 .await;
247 Ok(approval)
248 }
249
250 pub async fn resolve_approval(
257 &self,
258 caller_client_id: &str,
259 req: ResolveHostApprovalRequest,
260 ) -> Result<HostApprovalRequest, String> {
261 let mut approvals = self.approvals.lock().await;
262 let approval = approvals
263 .get_mut(&req.approval_id)
264 .ok_or_else(|| format!("unknown approval '{}'", req.approval_id))?;
265 if let Some(owner) = approval.client_id.as_deref() {
266 if owner != caller_client_id {
267 return Err(format!(
268 "approval '{}' is owned by another session",
269 req.approval_id
270 ));
271 }
272 }
273 approval.status = HostApprovalStatus::Resolved;
274 approval.resolution = Some(req.resolution);
275 approval.resolved_at = Some(Utc::now());
276 let resolved = approval.clone();
277 drop(approvals);
278
279 if let Some(notify) = self.notifies.lock().await.remove(&resolved.id) {
284 notify.notify_one();
285 }
286
287 self.record_event(
288 "approval.resolved",
289 resolved.agent_id.clone(),
290 format!("Approval resolved: {}", resolved.action),
291 serde_json::to_value(&resolved).map_err(|e| e.to_string())?,
292 )
293 .await;
294 Ok(resolved)
295 }
296
297 pub async fn request_and_wait_approval(
319 &self,
320 req: CreateHostApprovalRequest,
321 approve_label: &str,
322 timeout: Duration,
323 ) -> Result<ApprovalOutcome, String> {
324 let approval = self.create_approval(None, req).await?;
330 let approval_id = approval.id.clone();
331
332 let notify = Arc::new(Notify::new());
337 {
338 let mut map = self.notifies.lock().await;
339 map.insert(approval_id.clone(), notify.clone());
340 }
341
342 if let Some(resolved) = self.approvals.lock().await.get(&approval_id).cloned() {
347 if matches!(resolved.status, HostApprovalStatus::Resolved) {
348 self.notifies.lock().await.remove(&approval_id);
349 return Ok(classify_resolution(&resolved, approve_label));
350 }
351 }
352
353 let woken = tokio::time::timeout(timeout, notify.notified()).await;
358 if woken.is_err() {
359 self.notifies.lock().await.remove(&approval_id);
360 return Ok(ApprovalOutcome::TimedOut);
361 }
362
363 let resolved = self
364 .approvals
365 .lock()
366 .await
367 .get(&approval_id)
368 .cloned()
369 .ok_or_else(|| format!("approval '{}' vanished after notify", approval_id))?;
370 Ok(classify_resolution(&resolved, approve_label))
371 }
372
373 pub async fn agents(&self) -> Vec<HostAgent> {
374 let mut agents: Vec<_> = self.agents.lock().await.values().cloned().collect();
375 agents.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
376 agents
377 }
378
379 pub async fn approvals(&self) -> Vec<HostApprovalRequest> {
380 let mut approvals: Vec<_> = self.approvals.lock().await.values().cloned().collect();
381 approvals.sort_by(|a, b| b.created_at.cmp(&a.created_at));
382 approvals
383 }
384
385 pub async fn events(&self, limit: usize) -> Vec<HostEvent> {
386 self.events
387 .lock()
388 .await
389 .iter()
390 .rev()
391 .take(limit)
392 .cloned()
393 .collect()
394 }
395
396 pub async fn record_event(
397 &self,
398 kind: impl Into<String>,
399 agent_id: Option<String>,
400 message: impl Into<String>,
401 payload: Value,
402 ) -> HostEvent {
403 let event = HostEvent {
404 id: format!("event-{}", short_id()),
405 timestamp: Utc::now(),
406 kind: kind.into(),
407 agent_id,
408 message: message.into(),
409 payload,
410 };
411
412 {
413 let mut events = self.events.lock().await;
414 events.push_back(event.clone());
415 while events.len() > MAX_EVENTS {
416 events.pop_front();
417 }
418 }
419
420 self.broadcast_event(&event).await;
421 event
422 }
423
424 async fn broadcast_event(&self, event: &HostEvent) {
425 let subscribers: Vec<Arc<WsChannel>> =
426 self.subscribers.lock().await.values().cloned().collect();
427
428 let Ok(json) = serde_json::to_string(&serde_json::json!({
429 "jsonrpc": "2.0",
430 "method": "host.event",
431 "params": event,
432 })) else {
433 return;
434 };
435
436 for channel in subscribers {
437 let _ = channel
438 .write
439 .lock()
440 .await
441 .send(Message::Text(json.clone().into()))
442 .await;
443 }
444 }
445}
446
447fn short_id() -> String {
448 uuid::Uuid::new_v4().simple().to_string()[..12].to_string()
449}
450
451fn classify_resolution(approval: &HostApprovalRequest, approve_label: &str) -> ApprovalOutcome {
452 match approval.resolution.as_deref() {
453 Some(r) if r == approve_label => ApprovalOutcome::Approved,
454 _ => ApprovalOutcome::Denied,
455 }
456}
457
458#[cfg(test)]
459mod tests {
460 use super::*;
461
462 #[tokio::test]
463 async fn host_tracks_agents_events_and_approvals() {
464 let host = HostState::new();
465
466 let agent = host
467 .register_agent(
468 "client-1",
469 RegisterHostAgentRequest {
470 id: Some("agent-1".to_string()),
471 name: "Researcher".to_string(),
472 kind: "builtin".to_string(),
473 capabilities: vec!["search".to_string()],
474 project: Some("/tmp/project".to_string()),
475 pid: None,
476 display: car_proto::HostAgentDisplay {
477 label: Some("Research Lead".to_string()),
478 icon: Some("magnifying-glass".to_string()),
479 accent: Some("#0a84ff".to_string()),
480 },
481 metadata: Value::Null,
482 },
483 )
484 .await
485 .expect("register agent");
486
487 assert_eq!(agent.status, HostAgentStatus::Idle);
488 assert_eq!(agent.display.label.as_deref(), Some("Research Lead"));
489 assert_eq!(agent.display.icon.as_deref(), Some("magnifying-glass"));
490 assert_eq!(agent.display.accent.as_deref(), Some("#0a84ff"));
491 assert_eq!(host.agents().await.len(), 1);
492
493 let updated = host
494 .set_status(
495 "client-1",
496 SetHostAgentStatusRequest {
497 agent_id: "agent-1".to_string(),
498 status: HostAgentStatus::Running,
499 current_task: Some("Collect facts".to_string()),
500 message: None,
501 payload: Value::Null,
502 },
503 )
504 .await
505 .expect("set status");
506
507 assert_eq!(updated.status, HostAgentStatus::Running);
508 assert_eq!(updated.current_task.as_deref(), Some("Collect facts"));
509
510 let approval = host
511 .create_approval(
512 Some("client-1"),
513 CreateHostApprovalRequest {
514 agent_id: Some("agent-1".to_string()),
515 action: "Run tests".to_string(),
516 details: serde_json::json!({ "command": "cargo test" }),
517 options: vec![],
518 },
519 )
520 .await
521 .expect("create approval");
522
523 assert_eq!(approval.options, vec!["approve", "deny"]);
524 assert_eq!(approval.status, HostApprovalStatus::Pending);
525
526 let resolved = host
527 .resolve_approval(
528 "client-1",
529 ResolveHostApprovalRequest {
530 approval_id: approval.id,
531 resolution: "approve".to_string(),
532 },
533 )
534 .await
535 .expect("resolve approval");
536
537 assert_eq!(resolved.status, HostApprovalStatus::Resolved);
538 assert_eq!(resolved.resolution.as_deref(), Some("approve"));
539 assert!(host.events(10).await.len() >= 4);
540 }
541
542 #[tokio::test]
543 async fn request_and_wait_returns_approved_when_user_approves() {
544 let host = Arc::new(HostState::new());
545 let host2 = host.clone();
546
547 let waiter = tokio::spawn(async move {
550 host2
551 .request_and_wait_approval(
552 CreateHostApprovalRequest {
553 agent_id: None,
554 action: "automation.run_applescript".into(),
555 details: serde_json::json!({}),
556 options: vec![],
557 },
558 "approve",
559 Duration::from_secs(2),
560 )
561 .await
562 .expect("gate ran")
563 });
564
565 tokio::time::sleep(Duration::from_millis(20)).await;
570 let pending = host.approvals().await;
571 assert_eq!(pending.len(), 1, "exactly one pending approval");
572 assert!(
573 pending[0].client_id.is_none(),
574 "gate-raised approvals must be system-level"
575 );
576 host.resolve_approval(
577 "ui-session",
578 ResolveHostApprovalRequest {
579 approval_id: pending[0].id.clone(),
580 resolution: "approve".into(),
581 },
582 )
583 .await
584 .unwrap();
585
586 let outcome = waiter.await.unwrap();
587 assert_eq!(outcome, ApprovalOutcome::Approved);
588 }
589
590 #[tokio::test]
591 async fn request_and_wait_returns_denied_on_other_resolution() {
592 let host = Arc::new(HostState::new());
593 let host2 = host.clone();
594 let waiter = tokio::spawn(async move {
595 host2
596 .request_and_wait_approval(
597 CreateHostApprovalRequest {
598 agent_id: None,
599 action: "messages.send".into(),
600 details: serde_json::json!({}),
601 options: vec![],
602 },
603 "approve",
604 Duration::from_secs(2),
605 )
606 .await
607 .unwrap()
608 });
609 tokio::time::sleep(Duration::from_millis(20)).await;
610 let pending = host.approvals().await;
611 host.resolve_approval(
612 "ui-session",
613 ResolveHostApprovalRequest {
614 approval_id: pending[0].id.clone(),
615 resolution: "deny".into(),
616 },
617 )
618 .await
619 .unwrap();
620 assert_eq!(waiter.await.unwrap(), ApprovalOutcome::Denied);
621 }
622
623 #[tokio::test]
624 async fn request_and_wait_times_out_when_no_resolution() {
625 let host = HostState::new();
626 let outcome = host
627 .request_and_wait_approval(
628 CreateHostApprovalRequest {
629 agent_id: None,
630 action: "vision.ocr".into(),
631 details: serde_json::json!({}),
632 options: vec![],
633 },
634 "approve",
635 Duration::from_millis(50),
636 )
637 .await
638 .unwrap();
639 assert_eq!(outcome, ApprovalOutcome::TimedOut);
640 let pending = host.approvals().await;
642 assert_eq!(pending.len(), 1);
643 assert_eq!(pending[0].status, HostApprovalStatus::Pending);
644 }
645
646 fn make_register_request(name: &str) -> RegisterHostAgentRequest {
649 RegisterHostAgentRequest {
650 id: Some(name.into()),
651 name: name.into(),
652 kind: "test".into(),
653 capabilities: vec![],
654 project: None,
655 pid: None,
656 display: car_proto::HostAgentDisplay {
657 label: None,
658 icon: None,
659 accent: None,
660 },
661 metadata: Value::Null,
662 }
663 }
664
665 #[tokio::test]
666 async fn set_status_rejects_non_owning_session() {
667 let host = HostState::new();
668 host.register_agent("client-A", make_register_request("worker"))
669 .await
670 .unwrap();
671
672 let err = host
674 .set_status(
675 "client-B",
676 SetHostAgentStatusRequest {
677 agent_id: "worker".into(),
678 status: HostAgentStatus::Errored,
679 current_task: None,
680 message: None,
681 payload: Value::Null,
682 },
683 )
684 .await
685 .unwrap_err();
686 assert!(
687 err.contains("owned by another session"),
688 "unexpected rejection message: {err}"
689 );
690
691 host.set_status(
693 "client-A",
694 SetHostAgentStatusRequest {
695 agent_id: "worker".into(),
696 status: HostAgentStatus::Running,
697 current_task: None,
698 message: None,
699 payload: Value::Null,
700 },
701 )
702 .await
703 .expect("owner can mutate");
704 }
705
706 #[tokio::test]
707 async fn unregister_rejects_non_owning_session() {
708 let host = HostState::new();
709 host.register_agent("client-A", make_register_request("worker"))
710 .await
711 .unwrap();
712
713 let err = host
714 .unregister_agent("client-B", "worker")
715 .await
716 .unwrap_err();
717 assert!(err.contains("owned by another session"));
718 assert_eq!(host.agents().await.len(), 1, "agent must survive");
719
720 host.unregister_agent("client-A", "worker")
721 .await
722 .expect("owner can unregister");
723 assert_eq!(host.agents().await.len(), 0);
724 }
725
726 #[tokio::test]
727 async fn register_refuses_to_overwrite_other_sessions_agent() {
728 let host = HostState::new();
729 host.register_agent("client-A", make_register_request("worker"))
730 .await
731 .unwrap();
732
733 let err = host
734 .register_agent("client-B", make_register_request("worker"))
735 .await
736 .unwrap_err();
737 assert!(
738 err.contains("owned by another session"),
739 "unexpected message: {err}"
740 );
741 }
742
743 #[tokio::test]
744 async fn manual_approval_only_resolvable_by_creator() {
745 let host = HostState::new();
746 let approval = host
747 .create_approval(
748 Some("client-A"),
749 CreateHostApprovalRequest {
750 agent_id: None,
751 action: "manual.action".into(),
752 details: serde_json::json!({}),
753 options: vec![],
754 },
755 )
756 .await
757 .unwrap();
758
759 let err = host
761 .resolve_approval(
762 "client-B",
763 ResolveHostApprovalRequest {
764 approval_id: approval.id.clone(),
765 resolution: "approve".into(),
766 },
767 )
768 .await
769 .unwrap_err();
770 assert!(err.contains("owned by another session"));
771
772 host.resolve_approval(
774 "client-A",
775 ResolveHostApprovalRequest {
776 approval_id: approval.id,
777 resolution: "deny".into(),
778 },
779 )
780 .await
781 .expect("owner can resolve");
782 }
783
784 #[tokio::test]
785 async fn system_approval_resolvable_by_any_session() {
786 let host = HostState::new();
790 let approval = host
791 .create_approval(
792 None,
793 CreateHostApprovalRequest {
794 agent_id: None,
795 action: "ws.method:automation.run_applescript".into(),
796 details: serde_json::json!({}),
797 options: vec![],
798 },
799 )
800 .await
801 .unwrap();
802 assert!(
803 approval.client_id.is_none(),
804 "system approval must carry no owner"
805 );
806
807 host.resolve_approval(
808 "ui-session",
809 ResolveHostApprovalRequest {
810 approval_id: approval.id,
811 resolution: "approve".into(),
812 },
813 )
814 .await
815 .expect("any session can resolve a system approval");
816 }
817}