Skip to main content

car_server_core/
host.rs

1//! Host-facing state for OS integrations.
2//!
3//! This module is intentionally UI-agnostic. Native shells such as a macOS menu
4//! bar app, Windows tray app, or Linux status notifier can subscribe to these
5//! events and render the same agent control surface.
6
7use crate::session::WsChannel;
8use car_proto::{
9    CreateHostApprovalRequest, HostAgent, HostAgentStatus, HostApprovalRequest, HostApprovalStatus,
10    HostEvent, RegisterHostAgentRequest, ResolveHostApprovalRequest, SetHostAgentStatusRequest,
11};
12use chrono::Utc;
13use futures::SinkExt;
14use serde_json::Value;
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use tokio::sync::Mutex;
18use tokio_tungstenite::tungstenite::Message;
19
/// Maximum number of events kept in the in-memory log; `record_event` evicts
/// the oldest entries once the log grows past this size.
const MAX_EVENTS: usize = 500;
21
22#[derive(Default)]
23pub struct HostState {
24    agents: Mutex<HashMap<String, HostAgent>>,
25    approvals: Mutex<HashMap<String, HostApprovalRequest>>,
26    events: Mutex<VecDeque<HostEvent>>,
27    subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
28}
29
30impl HostState {
31    pub fn new() -> Self {
32        Self::default()
33    }
34
35    pub async fn subscribe(&self, client_id: &str, channel: Arc<WsChannel>) {
36        self.subscribers
37            .lock()
38            .await
39            .insert(client_id.to_string(), channel);
40    }
41
42    pub async fn unsubscribe(&self, client_id: &str) {
43        self.subscribers.lock().await.remove(client_id);
44    }
45
46    pub async fn register_agent(
47        &self,
48        client_id: &str,
49        req: RegisterHostAgentRequest,
50    ) -> Result<HostAgent, String> {
51        let id = req.id.unwrap_or_else(|| format!("agent-{}", short_id()));
52        let agent = HostAgent {
53            id: id.clone(),
54            name: req.name,
55            kind: req.kind,
56            capabilities: req.capabilities,
57            project: req.project,
58            session_id: Some(client_id.to_string()),
59            status: HostAgentStatus::Idle,
60            current_task: None,
61            pid: req.pid,
62            display: req.display,
63            updated_at: Utc::now(),
64            metadata: req.metadata,
65        };
66
67        self.agents.lock().await.insert(id.clone(), agent.clone());
68        self.record_event(
69            "agent.registered",
70            Some(id),
71            format!("{} registered", agent.name),
72            serde_json::to_value(&agent).map_err(|e| e.to_string())?,
73        )
74        .await;
75
76        Ok(agent)
77    }
78
79    pub async fn unregister_agent(&self, agent_id: &str) -> Result<(), String> {
80        let removed = self.agents.lock().await.remove(agent_id);
81        if removed.is_none() {
82            return Err(format!("unknown agent '{}'", agent_id));
83        }
84        self.record_event(
85            "agent.unregistered",
86            Some(agent_id.to_string()),
87            format!("{} unregistered", agent_id),
88            Value::Null,
89        )
90        .await;
91        Ok(())
92    }
93
94    pub async fn set_status(&self, req: SetHostAgentStatusRequest) -> Result<HostAgent, String> {
95        let mut agents = self.agents.lock().await;
96        let agent = agents
97            .get_mut(&req.agent_id)
98            .ok_or_else(|| format!("unknown agent '{}'", req.agent_id))?;
99
100        agent.status = req.status.clone();
101        agent.current_task = req.current_task.clone();
102        agent.updated_at = Utc::now();
103        let updated = agent.clone();
104        drop(agents);
105
106        let message = req
107            .message
108            .unwrap_or_else(|| format!("{} is {:?}", updated.name, updated.status));
109        self.record_event(
110            "agent.status_changed",
111            Some(updated.id.clone()),
112            message,
113            if req.payload.is_null() {
114                serde_json::to_value(&updated).map_err(|e| e.to_string())?
115            } else {
116                req.payload
117            },
118        )
119        .await;
120
121        Ok(updated)
122    }
123
124    pub async fn create_approval(
125        &self,
126        req: CreateHostApprovalRequest,
127    ) -> Result<HostApprovalRequest, String> {
128        let approval = HostApprovalRequest {
129            id: format!("approval-{}", short_id()),
130            agent_id: req.agent_id,
131            action: req.action,
132            details: req.details,
133            options: if req.options.is_empty() {
134                vec!["approve".to_string(), "deny".to_string()]
135            } else {
136                req.options
137            },
138            status: HostApprovalStatus::Pending,
139            created_at: Utc::now(),
140            resolved_at: None,
141            resolution: None,
142        };
143
144        self.approvals
145            .lock()
146            .await
147            .insert(approval.id.clone(), approval.clone());
148        self.record_event(
149            "approval.requested",
150            approval.agent_id.clone(),
151            format!("Approval requested: {}", approval.action),
152            serde_json::to_value(&approval).map_err(|e| e.to_string())?,
153        )
154        .await;
155        Ok(approval)
156    }
157
158    pub async fn resolve_approval(
159        &self,
160        req: ResolveHostApprovalRequest,
161    ) -> Result<HostApprovalRequest, String> {
162        let mut approvals = self.approvals.lock().await;
163        let approval = approvals
164            .get_mut(&req.approval_id)
165            .ok_or_else(|| format!("unknown approval '{}'", req.approval_id))?;
166        approval.status = HostApprovalStatus::Resolved;
167        approval.resolution = Some(req.resolution);
168        approval.resolved_at = Some(Utc::now());
169        let resolved = approval.clone();
170        drop(approvals);
171
172        self.record_event(
173            "approval.resolved",
174            resolved.agent_id.clone(),
175            format!("Approval resolved: {}", resolved.action),
176            serde_json::to_value(&resolved).map_err(|e| e.to_string())?,
177        )
178        .await;
179        Ok(resolved)
180    }
181
182    pub async fn agents(&self) -> Vec<HostAgent> {
183        let mut agents: Vec<_> = self.agents.lock().await.values().cloned().collect();
184        agents.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
185        agents
186    }
187
188    pub async fn approvals(&self) -> Vec<HostApprovalRequest> {
189        let mut approvals: Vec<_> = self.approvals.lock().await.values().cloned().collect();
190        approvals.sort_by(|a, b| b.created_at.cmp(&a.created_at));
191        approvals
192    }
193
194    pub async fn events(&self, limit: usize) -> Vec<HostEvent> {
195        self.events
196            .lock()
197            .await
198            .iter()
199            .rev()
200            .take(limit)
201            .cloned()
202            .collect()
203    }
204
205    pub async fn record_event(
206        &self,
207        kind: impl Into<String>,
208        agent_id: Option<String>,
209        message: impl Into<String>,
210        payload: Value,
211    ) -> HostEvent {
212        let event = HostEvent {
213            id: format!("event-{}", short_id()),
214            timestamp: Utc::now(),
215            kind: kind.into(),
216            agent_id,
217            message: message.into(),
218            payload,
219        };
220
221        {
222            let mut events = self.events.lock().await;
223            events.push_back(event.clone());
224            while events.len() > MAX_EVENTS {
225                events.pop_front();
226            }
227        }
228
229        self.broadcast_event(&event).await;
230        event
231    }
232
233    async fn broadcast_event(&self, event: &HostEvent) {
234        let subscribers: Vec<Arc<WsChannel>> =
235            self.subscribers.lock().await.values().cloned().collect();
236
237        let Ok(json) = serde_json::to_string(&serde_json::json!({
238            "jsonrpc": "2.0",
239            "method": "host.event",
240            "params": event,
241        })) else {
242            return;
243        };
244
245        for channel in subscribers {
246            let _ = channel
247                .write
248                .lock()
249                .await
250                .send(Message::Text(json.clone().into()))
251                .await;
252        }
253    }
254}
255
256fn short_id() -> String {
257    uuid::Uuid::new_v4().simple().to_string()[..12].to_string()
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the registration request shared by the tests in this module.
    fn researcher_request(id: &str) -> RegisterHostAgentRequest {
        RegisterHostAgentRequest {
            id: Some(id.to_string()),
            name: "Researcher".to_string(),
            kind: "builtin".to_string(),
            capabilities: vec!["search".to_string()],
            project: Some("/tmp/project".to_string()),
            pid: None,
            display: car_proto::HostAgentDisplay {
                label: Some("Research Lead".to_string()),
                icon: Some("magnifying-glass".to_string()),
                accent: Some("#0a84ff".to_string()),
            },
            metadata: Value::Null,
        }
    }

    /// Walks one agent through register -> status change -> approval ->
    /// resolution and checks the resulting state and event log.
    #[tokio::test]
    async fn host_tracks_agents_events_and_approvals() {
        let host = HostState::new();

        let agent = host
            .register_agent("client-1", researcher_request("agent-1"))
            .await
            .expect("register agent");

        assert_eq!(agent.status, HostAgentStatus::Idle);
        assert_eq!(agent.display.label.as_deref(), Some("Research Lead"));
        assert_eq!(agent.display.icon.as_deref(), Some("magnifying-glass"));
        assert_eq!(agent.display.accent.as_deref(), Some("#0a84ff"));
        assert_eq!(host.agents().await.len(), 1);

        let updated = host
            .set_status(SetHostAgentStatusRequest {
                agent_id: "agent-1".to_string(),
                status: HostAgentStatus::Running,
                current_task: Some("Collect facts".to_string()),
                message: None,
                payload: Value::Null,
            })
            .await
            .expect("set status");

        assert_eq!(updated.status, HostAgentStatus::Running);
        assert_eq!(updated.current_task.as_deref(), Some("Collect facts"));

        let approval = host
            .create_approval(CreateHostApprovalRequest {
                agent_id: Some("agent-1".to_string()),
                action: "Run tests".to_string(),
                details: serde_json::json!({ "command": "cargo test" }),
                options: vec![],
            })
            .await
            .expect("create approval");

        assert_eq!(approval.options, vec!["approve", "deny"]);
        assert_eq!(approval.status, HostApprovalStatus::Pending);

        let resolved = host
            .resolve_approval(ResolveHostApprovalRequest {
                approval_id: approval.id,
                resolution: "approve".to_string(),
            })
            .await
            .expect("resolve approval");

        assert_eq!(resolved.status, HostApprovalStatus::Resolved);
        assert_eq!(resolved.resolution.as_deref(), Some("approve"));
        assert_eq!(host.approvals().await.len(), 1);

        // Exactly one event per operation, reported newest first.
        let events = host.events(10).await;
        let kinds: Vec<&str> = events.iter().map(|e| e.kind.as_str()).collect();
        assert_eq!(
            kinds,
            [
                "approval.resolved",
                "approval.requested",
                "agent.status_changed",
                "agent.registered",
            ]
        );

        // `limit` caps the result at the most recent entries.
        let latest = host.events(1).await;
        assert_eq!(latest.len(), 1);
        assert_eq!(latest[0].kind, "approval.resolved");
    }

    /// Unregistering removes the agent and leaves an audit event behind.
    #[tokio::test]
    async fn host_unregisters_agents() {
        let host = HostState::new();
        host.register_agent("client-1", researcher_request("agent-1"))
            .await
            .expect("register agent");

        host.unregister_agent("agent-1")
            .await
            .expect("unregister agent");

        assert!(host.agents().await.is_empty());
        let latest = host.events(1).await;
        assert_eq!(latest.len(), 1);
        assert_eq!(latest[0].kind, "agent.unregistered");
        assert_eq!(latest[0].agent_id.as_deref(), Some("agent-1"));

        // A second attempt targets an id that no longer exists.
        assert!(host.unregister_agent("agent-1").await.is_err());
    }

    /// Operations on unknown ids fail without recording any event.
    #[tokio::test]
    async fn host_rejects_unknown_ids() {
        let host = HostState::new();

        assert!(host.unregister_agent("missing").await.is_err());
        assert!(host
            .set_status(SetHostAgentStatusRequest {
                agent_id: "missing".to_string(),
                status: HostAgentStatus::Running,
                current_task: None,
                message: None,
                payload: Value::Null,
            })
            .await
            .is_err());
        assert!(host
            .resolve_approval(ResolveHostApprovalRequest {
                approval_id: "missing".to_string(),
                resolution: "approve".to_string(),
            })
            .await
            .is_err());

        assert!(host.events(10).await.is_empty());
    }
}