car-server-core 0.7.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
Documentation
//! Host-facing state for OS integrations.
//!
//! This module is intentionally UI-agnostic. Native shells such as a macOS menu
//! bar app, Windows tray app, or Linux status notifier can subscribe to these
//! events and render the same agent control surface.

use crate::session::WsChannel;
use car_proto::{
    CreateHostApprovalRequest, HostAgent, HostAgentStatus, HostApprovalRequest, HostApprovalStatus,
    HostEvent, RegisterHostAgentRequest, ResolveHostApprovalRequest, SetHostAgentStatusRequest,
};
use chrono::Utc;
use futures::SinkExt;
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_tungstenite::tungstenite::Message;

const MAX_EVENTS: usize = 500;

#[derive(Default)]
pub struct HostState {
    agents: Mutex<HashMap<String, HostAgent>>,
    approvals: Mutex<HashMap<String, HostApprovalRequest>>,
    events: Mutex<VecDeque<HostEvent>>,
    subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
}

impl HostState {
    pub fn new() -> Self {
        Self::default()
    }

    pub async fn subscribe(&self, client_id: &str, channel: Arc<WsChannel>) {
        self.subscribers
            .lock()
            .await
            .insert(client_id.to_string(), channel);
    }

    pub async fn unsubscribe(&self, client_id: &str) {
        self.subscribers.lock().await.remove(client_id);
    }

    pub async fn register_agent(
        &self,
        client_id: &str,
        req: RegisterHostAgentRequest,
    ) -> Result<HostAgent, String> {
        let id = req.id.unwrap_or_else(|| format!("agent-{}", short_id()));
        let agent = HostAgent {
            id: id.clone(),
            name: req.name,
            kind: req.kind,
            capabilities: req.capabilities,
            project: req.project,
            session_id: Some(client_id.to_string()),
            status: HostAgentStatus::Idle,
            current_task: None,
            pid: req.pid,
            display: req.display,
            updated_at: Utc::now(),
            metadata: req.metadata,
        };

        self.agents.lock().await.insert(id.clone(), agent.clone());
        self.record_event(
            "agent.registered",
            Some(id),
            format!("{} registered", agent.name),
            serde_json::to_value(&agent).map_err(|e| e.to_string())?,
        )
        .await;

        Ok(agent)
    }

    pub async fn unregister_agent(&self, agent_id: &str) -> Result<(), String> {
        let removed = self.agents.lock().await.remove(agent_id);
        if removed.is_none() {
            return Err(format!("unknown agent '{}'", agent_id));
        }
        self.record_event(
            "agent.unregistered",
            Some(agent_id.to_string()),
            format!("{} unregistered", agent_id),
            Value::Null,
        )
        .await;
        Ok(())
    }

    pub async fn set_status(&self, req: SetHostAgentStatusRequest) -> Result<HostAgent, String> {
        let mut agents = self.agents.lock().await;
        let agent = agents
            .get_mut(&req.agent_id)
            .ok_or_else(|| format!("unknown agent '{}'", req.agent_id))?;

        agent.status = req.status.clone();
        agent.current_task = req.current_task.clone();
        agent.updated_at = Utc::now();
        let updated = agent.clone();
        drop(agents);

        let message = req
            .message
            .unwrap_or_else(|| format!("{} is {:?}", updated.name, updated.status));
        self.record_event(
            "agent.status_changed",
            Some(updated.id.clone()),
            message,
            if req.payload.is_null() {
                serde_json::to_value(&updated).map_err(|e| e.to_string())?
            } else {
                req.payload
            },
        )
        .await;

        Ok(updated)
    }

    pub async fn create_approval(
        &self,
        req: CreateHostApprovalRequest,
    ) -> Result<HostApprovalRequest, String> {
        let approval = HostApprovalRequest {
            id: format!("approval-{}", short_id()),
            agent_id: req.agent_id,
            action: req.action,
            details: req.details,
            options: if req.options.is_empty() {
                vec!["approve".to_string(), "deny".to_string()]
            } else {
                req.options
            },
            status: HostApprovalStatus::Pending,
            created_at: Utc::now(),
            resolved_at: None,
            resolution: None,
        };

        self.approvals
            .lock()
            .await
            .insert(approval.id.clone(), approval.clone());
        self.record_event(
            "approval.requested",
            approval.agent_id.clone(),
            format!("Approval requested: {}", approval.action),
            serde_json::to_value(&approval).map_err(|e| e.to_string())?,
        )
        .await;
        Ok(approval)
    }

    pub async fn resolve_approval(
        &self,
        req: ResolveHostApprovalRequest,
    ) -> Result<HostApprovalRequest, String> {
        let mut approvals = self.approvals.lock().await;
        let approval = approvals
            .get_mut(&req.approval_id)
            .ok_or_else(|| format!("unknown approval '{}'", req.approval_id))?;
        approval.status = HostApprovalStatus::Resolved;
        approval.resolution = Some(req.resolution);
        approval.resolved_at = Some(Utc::now());
        let resolved = approval.clone();
        drop(approvals);

        self.record_event(
            "approval.resolved",
            resolved.agent_id.clone(),
            format!("Approval resolved: {}", resolved.action),
            serde_json::to_value(&resolved).map_err(|e| e.to_string())?,
        )
        .await;
        Ok(resolved)
    }

    pub async fn agents(&self) -> Vec<HostAgent> {
        let mut agents: Vec<_> = self.agents.lock().await.values().cloned().collect();
        agents.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
        agents
    }

    pub async fn approvals(&self) -> Vec<HostApprovalRequest> {
        let mut approvals: Vec<_> = self.approvals.lock().await.values().cloned().collect();
        approvals.sort_by(|a, b| b.created_at.cmp(&a.created_at));
        approvals
    }

    pub async fn events(&self, limit: usize) -> Vec<HostEvent> {
        self.events
            .lock()
            .await
            .iter()
            .rev()
            .take(limit)
            .cloned()
            .collect()
    }

    pub async fn record_event(
        &self,
        kind: impl Into<String>,
        agent_id: Option<String>,
        message: impl Into<String>,
        payload: Value,
    ) -> HostEvent {
        let event = HostEvent {
            id: format!("event-{}", short_id()),
            timestamp: Utc::now(),
            kind: kind.into(),
            agent_id,
            message: message.into(),
            payload,
        };

        {
            let mut events = self.events.lock().await;
            events.push_back(event.clone());
            while events.len() > MAX_EVENTS {
                events.pop_front();
            }
        }

        self.broadcast_event(&event).await;
        event
    }

    async fn broadcast_event(&self, event: &HostEvent) {
        let subscribers: Vec<Arc<WsChannel>> =
            self.subscribers.lock().await.values().cloned().collect();

        let Ok(json) = serde_json::to_string(&serde_json::json!({
            "jsonrpc": "2.0",
            "method": "host.event",
            "params": event,
        })) else {
            return;
        };

        for channel in subscribers {
            let _ = channel
                .write
                .lock()
                .await
                .send(Message::Text(json.clone().into()))
                .await;
        }
    }
}

fn short_id() -> String {
    uuid::Uuid::new_v4().simple().to_string()[..12].to_string()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn host_tracks_agents_events_and_approvals() {
        let host = HostState::new();

        let agent = host
            .register_agent(
                "client-1",
                RegisterHostAgentRequest {
                    id: Some("agent-1".to_string()),
                    name: "Researcher".to_string(),
                    kind: "builtin".to_string(),
                    capabilities: vec!["search".to_string()],
                    project: Some("/tmp/project".to_string()),
                    pid: None,
                    display: car_proto::HostAgentDisplay {
                        label: Some("Research Lead".to_string()),
                        icon: Some("magnifying-glass".to_string()),
                        accent: Some("#0a84ff".to_string()),
                    },
                    metadata: Value::Null,
                },
            )
            .await
            .expect("register agent");

        assert_eq!(agent.status, HostAgentStatus::Idle);
        assert_eq!(agent.display.label.as_deref(), Some("Research Lead"));
        assert_eq!(agent.display.icon.as_deref(), Some("magnifying-glass"));
        assert_eq!(agent.display.accent.as_deref(), Some("#0a84ff"));
        assert_eq!(host.agents().await.len(), 1);

        let updated = host
            .set_status(SetHostAgentStatusRequest {
                agent_id: "agent-1".to_string(),
                status: HostAgentStatus::Running,
                current_task: Some("Collect facts".to_string()),
                message: None,
                payload: Value::Null,
            })
            .await
            .expect("set status");

        assert_eq!(updated.status, HostAgentStatus::Running);
        assert_eq!(updated.current_task.as_deref(), Some("Collect facts"));

        let approval = host
            .create_approval(CreateHostApprovalRequest {
                agent_id: Some("agent-1".to_string()),
                action: "Run tests".to_string(),
                details: serde_json::json!({ "command": "cargo test" }),
                options: vec![],
            })
            .await
            .expect("create approval");

        assert_eq!(approval.options, vec!["approve", "deny"]);
        assert_eq!(approval.status, HostApprovalStatus::Pending);

        let resolved = host
            .resolve_approval(ResolveHostApprovalRequest {
                approval_id: approval.id,
                resolution: "approve".to_string(),
            })
            .await
            .expect("resolve approval");

        assert_eq!(resolved.status, HostApprovalStatus::Resolved);
        assert_eq!(resolved.resolution.as_deref(), Some("approve"));
        assert!(host.events(10).await.len() >= 4);
    }
}