1use crate::session::WsChannel;
8use car_proto::{
9 CreateHostApprovalRequest, HostAgent, HostAgentStatus, HostApprovalRequest, HostApprovalStatus,
10 HostEvent, RegisterHostAgentRequest, ResolveHostApprovalRequest, SetHostAgentStatusRequest,
11};
12use chrono::Utc;
13use futures::SinkExt;
14use serde_json::Value;
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{Mutex, Notify};
19use tokio_tungstenite::tungstenite::Message;
20
/// Maximum number of events kept in the in-memory event log; `record_event`
/// discards the oldest entries once the log grows past this size.
const MAX_EVENTS: usize = 500;
22
23#[derive(Default)]
24pub struct HostState {
25 agents: Mutex<HashMap<String, HostAgent>>,
26 approvals: Mutex<HashMap<String, HostApprovalRequest>>,
27 events: Mutex<VecDeque<HostEvent>>,
28 subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
29 notifies: Mutex<HashMap<String, Arc<Notify>>>,
37}
38
/// Result reported by `HostState::request_and_wait_approval`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ApprovalOutcome {
    /// The approval was resolved with the caller's approve label.
    Approved,
    /// The approval was resolved with anything other than the approve label.
    Denied,
    /// No resolution arrived before the timeout elapsed.
    TimedOut,
}
56
57impl HostState {
58 pub fn new() -> Self {
59 Self::default()
60 }
61
62 pub async fn subscribe(&self, client_id: &str, channel: Arc<WsChannel>) {
63 self.subscribers
64 .lock()
65 .await
66 .insert(client_id.to_string(), channel);
67 }
68
69 pub async fn unsubscribe(&self, client_id: &str) {
70 self.subscribers.lock().await.remove(client_id);
71 }
72
73 pub async fn register_agent(
74 &self,
75 client_id: &str,
76 req: RegisterHostAgentRequest,
77 ) -> Result<HostAgent, String> {
78 let id = req.id.unwrap_or_else(|| format!("agent-{}", short_id()));
79 {
84 let agents = self.agents.lock().await;
85 if let Some(existing) = agents.get(&id) {
86 match existing.session_id.as_deref() {
87 Some(owner) if owner != client_id => {
88 return Err(format!(
89 "agent '{id}' is owned by another session; \
90 unregister it from that session first"
91 ));
92 }
93 _ => {}
94 }
95 }
96 }
97
98 let agent = HostAgent {
99 id: id.clone(),
100 name: req.name,
101 kind: req.kind,
102 capabilities: req.capabilities,
103 project: req.project,
104 session_id: Some(client_id.to_string()),
105 status: HostAgentStatus::Idle,
106 current_task: None,
107 pid: req.pid,
108 display: req.display,
109 updated_at: Utc::now(),
110 metadata: req.metadata,
111 };
112
113 self.agents.lock().await.insert(id.clone(), agent.clone());
114 self.record_event(
115 "agent.registered",
116 Some(id),
117 format!("{} registered", agent.name),
118 serde_json::to_value(&agent).map_err(|e| e.to_string())?,
119 )
120 .await;
121
122 Ok(agent)
123 }
124
125 pub async fn unregister_agent(
126 &self,
127 caller_client_id: &str,
128 agent_id: &str,
129 ) -> Result<(), String> {
130 {
134 let agents = self.agents.lock().await;
135 if let Some(existing) = agents.get(agent_id) {
136 if let Some(owner) = existing.session_id.as_deref() {
137 if owner != caller_client_id {
138 return Err(format!("agent '{agent_id}' is owned by another session"));
139 }
140 }
141 }
142 }
143
144 let removed = self.agents.lock().await.remove(agent_id);
145 if removed.is_none() {
146 return Err(format!("unknown agent '{}'", agent_id));
147 }
148 self.record_event(
149 "agent.unregistered",
150 Some(agent_id.to_string()),
151 format!("{} unregistered", agent_id),
152 Value::Null,
153 )
154 .await;
155 Ok(())
156 }
157
158 pub async fn set_status(
159 &self,
160 caller_client_id: &str,
161 req: SetHostAgentStatusRequest,
162 ) -> Result<HostAgent, String> {
163 let mut agents = self.agents.lock().await;
164 let agent = agents
165 .get_mut(&req.agent_id)
166 .ok_or_else(|| format!("unknown agent '{}'", req.agent_id))?;
167
168 if let Some(owner) = agent.session_id.as_deref() {
173 if owner != caller_client_id {
174 return Err(format!(
175 "agent '{}' is owned by another session",
176 req.agent_id
177 ));
178 }
179 }
180
181 agent.status = req.status.clone();
182 agent.current_task = req.current_task.clone();
183 agent.updated_at = Utc::now();
184 let updated = agent.clone();
185 drop(agents);
186
187 let message = req
188 .message
189 .unwrap_or_else(|| format!("{} is {:?}", updated.name, updated.status));
190 self.record_event(
191 "agent.status_changed",
192 Some(updated.id.clone()),
193 message,
194 if req.payload.is_null() {
195 serde_json::to_value(&updated).map_err(|e| e.to_string())?
196 } else {
197 req.payload
198 },
199 )
200 .await;
201
202 Ok(updated)
203 }
204
205 pub async fn create_approval(
213 &self,
214 caller_client_id: Option<&str>,
215 req: CreateHostApprovalRequest,
216 ) -> Result<HostApprovalRequest, String> {
217 let approval = HostApprovalRequest {
218 id: format!("approval-{}", short_id()),
219 agent_id: req.agent_id,
220 client_id: caller_client_id.map(|s| s.to_string()),
221 action: req.action,
222 details: req.details,
223 options: if req.options.is_empty() {
224 vec!["approve".to_string(), "deny".to_string()]
225 } else {
226 req.options
227 },
228 status: HostApprovalStatus::Pending,
229 created_at: Utc::now(),
230 resolved_at: None,
231 resolution: None,
232 };
233
234 self.approvals
235 .lock()
236 .await
237 .insert(approval.id.clone(), approval.clone());
238 self.record_event(
239 "approval.requested",
240 approval.agent_id.clone(),
241 format!("Approval requested: {}", approval.action),
242 serde_json::to_value(&approval).map_err(|e| e.to_string())?,
243 )
244 .await;
245 Ok(approval)
246 }
247
248 pub async fn resolve_approval(
255 &self,
256 caller_client_id: &str,
257 req: ResolveHostApprovalRequest,
258 ) -> Result<HostApprovalRequest, String> {
259 let mut approvals = self.approvals.lock().await;
260 let approval = approvals
261 .get_mut(&req.approval_id)
262 .ok_or_else(|| format!("unknown approval '{}'", req.approval_id))?;
263 if let Some(owner) = approval.client_id.as_deref() {
264 if owner != caller_client_id {
265 return Err(format!(
266 "approval '{}' is owned by another session",
267 req.approval_id
268 ));
269 }
270 }
271 approval.status = HostApprovalStatus::Resolved;
272 approval.resolution = Some(req.resolution);
273 approval.resolved_at = Some(Utc::now());
274 let resolved = approval.clone();
275 drop(approvals);
276
277 if let Some(notify) = self.notifies.lock().await.remove(&resolved.id) {
282 notify.notify_one();
283 }
284
285 self.record_event(
286 "approval.resolved",
287 resolved.agent_id.clone(),
288 format!("Approval resolved: {}", resolved.action),
289 serde_json::to_value(&resolved).map_err(|e| e.to_string())?,
290 )
291 .await;
292 Ok(resolved)
293 }
294
295 pub async fn request_and_wait_approval(
317 &self,
318 req: CreateHostApprovalRequest,
319 approve_label: &str,
320 timeout: Duration,
321 ) -> Result<ApprovalOutcome, String> {
322 let approval = self.create_approval(None, req).await?;
328 let approval_id = approval.id.clone();
329
330 let notify = Arc::new(Notify::new());
335 {
336 let mut map = self.notifies.lock().await;
337 map.insert(approval_id.clone(), notify.clone());
338 }
339
340 if let Some(resolved) = self.approvals.lock().await.get(&approval_id).cloned() {
345 if matches!(resolved.status, HostApprovalStatus::Resolved) {
346 self.notifies.lock().await.remove(&approval_id);
347 return Ok(classify_resolution(&resolved, approve_label));
348 }
349 }
350
351 let woken = tokio::time::timeout(timeout, notify.notified()).await;
356 if woken.is_err() {
357 self.notifies.lock().await.remove(&approval_id);
358 return Ok(ApprovalOutcome::TimedOut);
359 }
360
361 let resolved = self
362 .approvals
363 .lock()
364 .await
365 .get(&approval_id)
366 .cloned()
367 .ok_or_else(|| format!("approval '{}' vanished after notify", approval_id))?;
368 Ok(classify_resolution(&resolved, approve_label))
369 }
370
371 pub async fn agents(&self) -> Vec<HostAgent> {
372 let mut agents: Vec<_> = self.agents.lock().await.values().cloned().collect();
373 agents.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
374 agents
375 }
376
377 pub async fn approvals(&self) -> Vec<HostApprovalRequest> {
378 let mut approvals: Vec<_> = self.approvals.lock().await.values().cloned().collect();
379 approvals.sort_by(|a, b| b.created_at.cmp(&a.created_at));
380 approvals
381 }
382
383 pub async fn events(&self, limit: usize) -> Vec<HostEvent> {
384 self.events
385 .lock()
386 .await
387 .iter()
388 .rev()
389 .take(limit)
390 .cloned()
391 .collect()
392 }
393
394 pub async fn record_event(
395 &self,
396 kind: impl Into<String>,
397 agent_id: Option<String>,
398 message: impl Into<String>,
399 payload: Value,
400 ) -> HostEvent {
401 let event = HostEvent {
402 id: format!("event-{}", short_id()),
403 timestamp: Utc::now(),
404 kind: kind.into(),
405 agent_id,
406 message: message.into(),
407 payload,
408 };
409
410 {
411 let mut events = self.events.lock().await;
412 events.push_back(event.clone());
413 while events.len() > MAX_EVENTS {
414 events.pop_front();
415 }
416 }
417
418 self.broadcast_event(&event).await;
419 event
420 }
421
422 async fn broadcast_event(&self, event: &HostEvent) {
423 let subscribers: Vec<Arc<WsChannel>> =
424 self.subscribers.lock().await.values().cloned().collect();
425
426 let Ok(json) = serde_json::to_string(&serde_json::json!({
427 "jsonrpc": "2.0",
428 "method": "host.event",
429 "params": event,
430 })) else {
431 return;
432 };
433
434 for channel in subscribers {
435 let _ = channel
436 .write
437 .lock()
438 .await
439 .send(Message::Text(json.clone().into()))
440 .await;
441 }
442 }
443}
444
445fn short_id() -> String {
446 uuid::Uuid::new_v4().simple().to_string()[..12].to_string()
447}
448
449fn classify_resolution(approval: &HostApprovalRequest, approve_label: &str) -> ApprovalOutcome {
450 match approval.resolution.as_deref() {
451 Some(r) if r == approve_label => ApprovalOutcome::Approved,
452 _ => ApprovalOutcome::Denied,
453 }
454}
455
456#[cfg(test)]
457mod tests {
458 use super::*;
459
460 #[tokio::test]
461 async fn host_tracks_agents_events_and_approvals() {
462 let host = HostState::new();
463
464 let agent = host
465 .register_agent(
466 "client-1",
467 RegisterHostAgentRequest {
468 id: Some("agent-1".to_string()),
469 name: "Researcher".to_string(),
470 kind: "builtin".to_string(),
471 capabilities: vec!["search".to_string()],
472 project: Some("/tmp/project".to_string()),
473 pid: None,
474 display: car_proto::HostAgentDisplay {
475 label: Some("Research Lead".to_string()),
476 icon: Some("magnifying-glass".to_string()),
477 accent: Some("#0a84ff".to_string()),
478 },
479 metadata: Value::Null,
480 },
481 )
482 .await
483 .expect("register agent");
484
485 assert_eq!(agent.status, HostAgentStatus::Idle);
486 assert_eq!(agent.display.label.as_deref(), Some("Research Lead"));
487 assert_eq!(agent.display.icon.as_deref(), Some("magnifying-glass"));
488 assert_eq!(agent.display.accent.as_deref(), Some("#0a84ff"));
489 assert_eq!(host.agents().await.len(), 1);
490
491 let updated = host
492 .set_status(
493 "client-1",
494 SetHostAgentStatusRequest {
495 agent_id: "agent-1".to_string(),
496 status: HostAgentStatus::Running,
497 current_task: Some("Collect facts".to_string()),
498 message: None,
499 payload: Value::Null,
500 },
501 )
502 .await
503 .expect("set status");
504
505 assert_eq!(updated.status, HostAgentStatus::Running);
506 assert_eq!(updated.current_task.as_deref(), Some("Collect facts"));
507
508 let approval = host
509 .create_approval(
510 Some("client-1"),
511 CreateHostApprovalRequest {
512 agent_id: Some("agent-1".to_string()),
513 action: "Run tests".to_string(),
514 details: serde_json::json!({ "command": "cargo test" }),
515 options: vec![],
516 },
517 )
518 .await
519 .expect("create approval");
520
521 assert_eq!(approval.options, vec!["approve", "deny"]);
522 assert_eq!(approval.status, HostApprovalStatus::Pending);
523
524 let resolved = host
525 .resolve_approval(
526 "client-1",
527 ResolveHostApprovalRequest {
528 approval_id: approval.id,
529 resolution: "approve".to_string(),
530 },
531 )
532 .await
533 .expect("resolve approval");
534
535 assert_eq!(resolved.status, HostApprovalStatus::Resolved);
536 assert_eq!(resolved.resolution.as_deref(), Some("approve"));
537 assert!(host.events(10).await.len() >= 4);
538 }
539
540 #[tokio::test]
541 async fn request_and_wait_returns_approved_when_user_approves() {
542 let host = Arc::new(HostState::new());
543 let host2 = host.clone();
544
545 let waiter = tokio::spawn(async move {
548 host2
549 .request_and_wait_approval(
550 CreateHostApprovalRequest {
551 agent_id: None,
552 action: "automation.run_applescript".into(),
553 details: serde_json::json!({}),
554 options: vec![],
555 },
556 "approve",
557 Duration::from_secs(2),
558 )
559 .await
560 .expect("gate ran")
561 });
562
563 tokio::time::sleep(Duration::from_millis(20)).await;
568 let pending = host.approvals().await;
569 assert_eq!(pending.len(), 1, "exactly one pending approval");
570 assert!(
571 pending[0].client_id.is_none(),
572 "gate-raised approvals must be system-level"
573 );
574 host.resolve_approval(
575 "ui-session",
576 ResolveHostApprovalRequest {
577 approval_id: pending[0].id.clone(),
578 resolution: "approve".into(),
579 },
580 )
581 .await
582 .unwrap();
583
584 let outcome = waiter.await.unwrap();
585 assert_eq!(outcome, ApprovalOutcome::Approved);
586 }
587
588 #[tokio::test]
589 async fn request_and_wait_returns_denied_on_other_resolution() {
590 let host = Arc::new(HostState::new());
591 let host2 = host.clone();
592 let waiter = tokio::spawn(async move {
593 host2
594 .request_and_wait_approval(
595 CreateHostApprovalRequest {
596 agent_id: None,
597 action: "messages.send".into(),
598 details: serde_json::json!({}),
599 options: vec![],
600 },
601 "approve",
602 Duration::from_secs(2),
603 )
604 .await
605 .unwrap()
606 });
607 tokio::time::sleep(Duration::from_millis(20)).await;
608 let pending = host.approvals().await;
609 host.resolve_approval(
610 "ui-session",
611 ResolveHostApprovalRequest {
612 approval_id: pending[0].id.clone(),
613 resolution: "deny".into(),
614 },
615 )
616 .await
617 .unwrap();
618 assert_eq!(waiter.await.unwrap(), ApprovalOutcome::Denied);
619 }
620
621 #[tokio::test]
622 async fn request_and_wait_times_out_when_no_resolution() {
623 let host = HostState::new();
624 let outcome = host
625 .request_and_wait_approval(
626 CreateHostApprovalRequest {
627 agent_id: None,
628 action: "vision.ocr".into(),
629 details: serde_json::json!({}),
630 options: vec![],
631 },
632 "approve",
633 Duration::from_millis(50),
634 )
635 .await
636 .unwrap();
637 assert_eq!(outcome, ApprovalOutcome::TimedOut);
638 let pending = host.approvals().await;
640 assert_eq!(pending.len(), 1);
641 assert_eq!(pending[0].status, HostApprovalStatus::Pending);
642 }
643
644 fn make_register_request(name: &str) -> RegisterHostAgentRequest {
647 RegisterHostAgentRequest {
648 id: Some(name.into()),
649 name: name.into(),
650 kind: "test".into(),
651 capabilities: vec![],
652 project: None,
653 pid: None,
654 display: car_proto::HostAgentDisplay {
655 label: None,
656 icon: None,
657 accent: None,
658 },
659 metadata: Value::Null,
660 }
661 }
662
663 #[tokio::test]
664 async fn set_status_rejects_non_owning_session() {
665 let host = HostState::new();
666 host.register_agent("client-A", make_register_request("worker"))
667 .await
668 .unwrap();
669
670 let err = host
672 .set_status(
673 "client-B",
674 SetHostAgentStatusRequest {
675 agent_id: "worker".into(),
676 status: HostAgentStatus::Errored,
677 current_task: None,
678 message: None,
679 payload: Value::Null,
680 },
681 )
682 .await
683 .unwrap_err();
684 assert!(
685 err.contains("owned by another session"),
686 "unexpected rejection message: {err}"
687 );
688
689 host.set_status(
691 "client-A",
692 SetHostAgentStatusRequest {
693 agent_id: "worker".into(),
694 status: HostAgentStatus::Running,
695 current_task: None,
696 message: None,
697 payload: Value::Null,
698 },
699 )
700 .await
701 .expect("owner can mutate");
702 }
703
704 #[tokio::test]
705 async fn unregister_rejects_non_owning_session() {
706 let host = HostState::new();
707 host.register_agent("client-A", make_register_request("worker"))
708 .await
709 .unwrap();
710
711 let err = host
712 .unregister_agent("client-B", "worker")
713 .await
714 .unwrap_err();
715 assert!(err.contains("owned by another session"));
716 assert_eq!(host.agents().await.len(), 1, "agent must survive");
717
718 host.unregister_agent("client-A", "worker")
719 .await
720 .expect("owner can unregister");
721 assert_eq!(host.agents().await.len(), 0);
722 }
723
724 #[tokio::test]
725 async fn register_refuses_to_overwrite_other_sessions_agent() {
726 let host = HostState::new();
727 host.register_agent("client-A", make_register_request("worker"))
728 .await
729 .unwrap();
730
731 let err = host
732 .register_agent("client-B", make_register_request("worker"))
733 .await
734 .unwrap_err();
735 assert!(
736 err.contains("owned by another session"),
737 "unexpected message: {err}"
738 );
739 }
740
741 #[tokio::test]
742 async fn manual_approval_only_resolvable_by_creator() {
743 let host = HostState::new();
744 let approval = host
745 .create_approval(
746 Some("client-A"),
747 CreateHostApprovalRequest {
748 agent_id: None,
749 action: "manual.action".into(),
750 details: serde_json::json!({}),
751 options: vec![],
752 },
753 )
754 .await
755 .unwrap();
756
757 let err = host
759 .resolve_approval(
760 "client-B",
761 ResolveHostApprovalRequest {
762 approval_id: approval.id.clone(),
763 resolution: "approve".into(),
764 },
765 )
766 .await
767 .unwrap_err();
768 assert!(err.contains("owned by another session"));
769
770 host.resolve_approval(
772 "client-A",
773 ResolveHostApprovalRequest {
774 approval_id: approval.id,
775 resolution: "deny".into(),
776 },
777 )
778 .await
779 .expect("owner can resolve");
780 }
781
782 #[tokio::test]
783 async fn system_approval_resolvable_by_any_session() {
784 let host = HostState::new();
788 let approval = host
789 .create_approval(
790 None,
791 CreateHostApprovalRequest {
792 agent_id: None,
793 action: "ws.method:automation.run_applescript".into(),
794 details: serde_json::json!({}),
795 options: vec![],
796 },
797 )
798 .await
799 .unwrap();
800 assert!(
801 approval.client_id.is_none(),
802 "system approval must carry no owner"
803 );
804
805 host.resolve_approval(
806 "ui-session",
807 ResolveHostApprovalRequest {
808 approval_id: approval.id,
809 resolution: "approve".into(),
810 },
811 )
812 .await
813 .expect("any session can resolve a system approval");
814 }
815}