1use crate::session::WsChannel;
8use car_proto::{
9 CreateHostApprovalRequest, HostAgent, HostAgentStatus, HostApprovalRequest, HostApprovalStatus,
10 HostEvent, RegisterHostAgentRequest, ResolveHostApprovalRequest, SetHostAgentStatusRequest,
11};
12use chrono::Utc;
13use futures::SinkExt;
14use serde_json::Value;
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use tokio::sync::Mutex;
18use tokio_tungstenite::tungstenite::Message;
19
/// Upper bound on the number of events kept in the in-memory event log; once
/// the log grows past this, the oldest entries are dropped first.
const MAX_EVENTS: usize = 500;
21
/// In-memory registry of host agents, approval requests, recent events, and
/// event subscribers.
///
/// Each collection sits behind its own async mutex, so every method takes
/// `&self`.
#[derive(Default)]
pub struct HostState {
    /// Registered agents, keyed by agent id.
    agents: Mutex<HashMap<String, HostAgent>>,
    /// Approval requests, keyed by approval id.
    approvals: Mutex<HashMap<String, HostApprovalRequest>>,
    /// Recorded events, oldest at the front; capped at `MAX_EVENTS` entries.
    events: Mutex<VecDeque<HostEvent>>,
    /// Channels that receive every recorded event, keyed by client id.
    subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
}
29
30impl HostState {
31 pub fn new() -> Self {
32 Self::default()
33 }
34
35 pub async fn subscribe(&self, client_id: &str, channel: Arc<WsChannel>) {
36 self.subscribers
37 .lock()
38 .await
39 .insert(client_id.to_string(), channel);
40 }
41
42 pub async fn unsubscribe(&self, client_id: &str) {
43 self.subscribers.lock().await.remove(client_id);
44 }
45
46 pub async fn register_agent(
47 &self,
48 client_id: &str,
49 req: RegisterHostAgentRequest,
50 ) -> Result<HostAgent, String> {
51 let id = req.id.unwrap_or_else(|| format!("agent-{}", short_id()));
52 let agent = HostAgent {
53 id: id.clone(),
54 name: req.name,
55 kind: req.kind,
56 capabilities: req.capabilities,
57 project: req.project,
58 session_id: Some(client_id.to_string()),
59 status: HostAgentStatus::Idle,
60 current_task: None,
61 pid: req.pid,
62 display: req.display,
63 updated_at: Utc::now(),
64 metadata: req.metadata,
65 };
66
67 self.agents.lock().await.insert(id.clone(), agent.clone());
68 self.record_event(
69 "agent.registered",
70 Some(id),
71 format!("{} registered", agent.name),
72 serde_json::to_value(&agent).map_err(|e| e.to_string())?,
73 )
74 .await;
75
76 Ok(agent)
77 }
78
79 pub async fn unregister_agent(&self, agent_id: &str) -> Result<(), String> {
80 let removed = self.agents.lock().await.remove(agent_id);
81 if removed.is_none() {
82 return Err(format!("unknown agent '{}'", agent_id));
83 }
84 self.record_event(
85 "agent.unregistered",
86 Some(agent_id.to_string()),
87 format!("{} unregistered", agent_id),
88 Value::Null,
89 )
90 .await;
91 Ok(())
92 }
93
94 pub async fn set_status(&self, req: SetHostAgentStatusRequest) -> Result<HostAgent, String> {
95 let mut agents = self.agents.lock().await;
96 let agent = agents
97 .get_mut(&req.agent_id)
98 .ok_or_else(|| format!("unknown agent '{}'", req.agent_id))?;
99
100 agent.status = req.status.clone();
101 agent.current_task = req.current_task.clone();
102 agent.updated_at = Utc::now();
103 let updated = agent.clone();
104 drop(agents);
105
106 let message = req
107 .message
108 .unwrap_or_else(|| format!("{} is {:?}", updated.name, updated.status));
109 self.record_event(
110 "agent.status_changed",
111 Some(updated.id.clone()),
112 message,
113 if req.payload.is_null() {
114 serde_json::to_value(&updated).map_err(|e| e.to_string())?
115 } else {
116 req.payload
117 },
118 )
119 .await;
120
121 Ok(updated)
122 }
123
124 pub async fn create_approval(
125 &self,
126 req: CreateHostApprovalRequest,
127 ) -> Result<HostApprovalRequest, String> {
128 let approval = HostApprovalRequest {
129 id: format!("approval-{}", short_id()),
130 agent_id: req.agent_id,
131 action: req.action,
132 details: req.details,
133 options: if req.options.is_empty() {
134 vec!["approve".to_string(), "deny".to_string()]
135 } else {
136 req.options
137 },
138 status: HostApprovalStatus::Pending,
139 created_at: Utc::now(),
140 resolved_at: None,
141 resolution: None,
142 };
143
144 self.approvals
145 .lock()
146 .await
147 .insert(approval.id.clone(), approval.clone());
148 self.record_event(
149 "approval.requested",
150 approval.agent_id.clone(),
151 format!("Approval requested: {}", approval.action),
152 serde_json::to_value(&approval).map_err(|e| e.to_string())?,
153 )
154 .await;
155 Ok(approval)
156 }
157
158 pub async fn resolve_approval(
159 &self,
160 req: ResolveHostApprovalRequest,
161 ) -> Result<HostApprovalRequest, String> {
162 let mut approvals = self.approvals.lock().await;
163 let approval = approvals
164 .get_mut(&req.approval_id)
165 .ok_or_else(|| format!("unknown approval '{}'", req.approval_id))?;
166 approval.status = HostApprovalStatus::Resolved;
167 approval.resolution = Some(req.resolution);
168 approval.resolved_at = Some(Utc::now());
169 let resolved = approval.clone();
170 drop(approvals);
171
172 self.record_event(
173 "approval.resolved",
174 resolved.agent_id.clone(),
175 format!("Approval resolved: {}", resolved.action),
176 serde_json::to_value(&resolved).map_err(|e| e.to_string())?,
177 )
178 .await;
179 Ok(resolved)
180 }
181
182 pub async fn agents(&self) -> Vec<HostAgent> {
183 let mut agents: Vec<_> = self.agents.lock().await.values().cloned().collect();
184 agents.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
185 agents
186 }
187
188 pub async fn approvals(&self) -> Vec<HostApprovalRequest> {
189 let mut approvals: Vec<_> = self.approvals.lock().await.values().cloned().collect();
190 approvals.sort_by(|a, b| b.created_at.cmp(&a.created_at));
191 approvals
192 }
193
194 pub async fn events(&self, limit: usize) -> Vec<HostEvent> {
195 self.events
196 .lock()
197 .await
198 .iter()
199 .rev()
200 .take(limit)
201 .cloned()
202 .collect()
203 }
204
205 pub async fn record_event(
206 &self,
207 kind: impl Into<String>,
208 agent_id: Option<String>,
209 message: impl Into<String>,
210 payload: Value,
211 ) -> HostEvent {
212 let event = HostEvent {
213 id: format!("event-{}", short_id()),
214 timestamp: Utc::now(),
215 kind: kind.into(),
216 agent_id,
217 message: message.into(),
218 payload,
219 };
220
221 {
222 let mut events = self.events.lock().await;
223 events.push_back(event.clone());
224 while events.len() > MAX_EVENTS {
225 events.pop_front();
226 }
227 }
228
229 self.broadcast_event(&event).await;
230 event
231 }
232
233 async fn broadcast_event(&self, event: &HostEvent) {
234 let subscribers: Vec<Arc<WsChannel>> =
235 self.subscribers.lock().await.values().cloned().collect();
236
237 let Ok(json) = serde_json::to_string(&serde_json::json!({
238 "jsonrpc": "2.0",
239 "method": "host.event",
240 "params": event,
241 })) else {
242 return;
243 };
244
245 for channel in subscribers {
246 let _ = channel
247 .write
248 .lock()
249 .await
250 .send(Message::Text(json.clone().into()))
251 .await;
252 }
253 }
254}
255
256fn short_id() -> String {
257 uuid::Uuid::new_v4().simple().to_string()[..12].to_string()
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives one agent through registration, a status change, and an approval
    /// round-trip, checking each returned record and the size of the event log.
    #[tokio::test]
    async fn host_tracks_agents_events_and_approvals() {
        let state = HostState::new();

        // --- registration -------------------------------------------------
        let display = car_proto::HostAgentDisplay {
            label: Some("Research Lead".to_string()),
            icon: Some("magnifying-glass".to_string()),
            accent: Some("#0a84ff".to_string()),
        };
        let registration = RegisterHostAgentRequest {
            id: Some("agent-1".to_string()),
            name: "Researcher".to_string(),
            kind: "builtin".to_string(),
            capabilities: vec!["search".to_string()],
            project: Some("/tmp/project".to_string()),
            pid: None,
            display,
            metadata: Value::Null,
        };
        let registered = state
            .register_agent("client-1", registration)
            .await
            .expect("register agent");

        assert_eq!(registered.status, HostAgentStatus::Idle);
        assert_eq!(registered.display.label.as_deref(), Some("Research Lead"));
        assert_eq!(
            registered.display.icon.as_deref(),
            Some("magnifying-glass")
        );
        assert_eq!(registered.display.accent.as_deref(), Some("#0a84ff"));
        assert_eq!(state.agents().await.len(), 1);

        // --- status change ------------------------------------------------
        let status_change = SetHostAgentStatusRequest {
            agent_id: "agent-1".to_string(),
            status: HostAgentStatus::Running,
            current_task: Some("Collect facts".to_string()),
            message: None,
            payload: Value::Null,
        };
        let running = state.set_status(status_change).await.expect("set status");

        assert_eq!(running.status, HostAgentStatus::Running);
        assert_eq!(running.current_task.as_deref(), Some("Collect facts"));

        // --- approval round-trip -------------------------------------------
        let approval_request = CreateHostApprovalRequest {
            agent_id: Some("agent-1".to_string()),
            action: "Run tests".to_string(),
            details: serde_json::json!({ "command": "cargo test" }),
            options: vec![],
        };
        let pending = state
            .create_approval(approval_request)
            .await
            .expect("create approval");

        // Empty options fall back to the default pair.
        assert_eq!(pending.options, vec!["approve", "deny"]);
        assert_eq!(pending.status, HostApprovalStatus::Pending);

        let decision = ResolveHostApprovalRequest {
            approval_id: pending.id,
            resolution: "approve".to_string(),
        };
        let done = state
            .resolve_approval(decision)
            .await
            .expect("resolve approval");

        assert_eq!(done.status, HostApprovalStatus::Resolved);
        assert_eq!(done.resolution.as_deref(), Some("approve"));

        // One event per operation above.
        let logged = state.events(10).await;
        assert!(logged.len() >= 4);
    }
}