use crate::session::WsChannel;
use car_proto::{
CreateHostApprovalRequest, HostAgent, HostAgentStatus, HostApprovalRequest, HostApprovalStatus,
HostEvent, RegisterHostAgentRequest, ResolveHostApprovalRequest, SetHostAgentStatusRequest,
};
use chrono::Utc;
use futures::SinkExt;
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_tungstenite::tungstenite::Message;
/// Upper bound on the number of events kept in the in-memory history.
/// When recording an event pushes the history past this size, the oldest
/// entries are removed until it fits again.
const MAX_EVENTS: usize = 500;
/// In-memory state for the host: registered agents, approval requests, a
/// bounded event history, and the channels subscribed to event broadcasts.
/// Each collection is guarded by its own async mutex.
#[derive(Default)]
pub struct HostState {
/// Registered agents, keyed by agent id.
agents: Mutex<HashMap<String, HostAgent>>,
/// Approval requests (pending and resolved), keyed by approval id.
approvals: Mutex<HashMap<String, HostApprovalRequest>>,
/// Event history with the oldest entry at the front; capped at `MAX_EVENTS`.
events: Mutex<VecDeque<HostEvent>>,
/// Channels that receive every recorded event, keyed by client id.
subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
}
impl HostState {
    /// Creates an empty host state: no agents, approvals, events, or subscribers.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `channel` to receive every event recorded from now on.
    ///
    /// Subscribers are keyed by `client_id`; subscribing again with the same
    /// id replaces the previously stored channel.
    pub async fn subscribe(&self, client_id: &str, channel: Arc<WsChannel>) {
        self.subscribers
            .lock()
            .await
            .insert(client_id.to_string(), channel);
    }

    /// Removes the subscriber stored under `client_id`, if any.
    pub async fn unsubscribe(&self, client_id: &str) {
        self.subscribers.lock().await.remove(client_id);
    }

    /// Registers an agent on behalf of `client_id` and records an
    /// `agent.registered` event carrying the serialized agent.
    ///
    /// When the request has no id, one is generated as `agent-<short id>`.
    /// The agent starts out `Idle` with no current task and with `session_id`
    /// set to `client_id`. Registering an id that is already present replaces
    /// the stored agent.
    ///
    /// # Errors
    ///
    /// Returns the serializer's message if the agent cannot be converted into
    /// a JSON payload. The payload is built before the agent is stored, so an
    /// error leaves the registry unchanged.
    pub async fn register_agent(
        &self,
        client_id: &str,
        req: RegisterHostAgentRequest,
    ) -> Result<HostAgent, String> {
        let id = req.id.unwrap_or_else(|| format!("agent-{}", short_id()));
        let agent = HostAgent {
            id: id.clone(),
            name: req.name,
            kind: req.kind,
            capabilities: req.capabilities,
            project: req.project,
            session_id: Some(client_id.to_string()),
            status: HostAgentStatus::Idle,
            current_task: None,
            pid: req.pid,
            display: req.display,
            updated_at: Utc::now(),
            metadata: req.metadata,
        };

        // Do the only fallible step first so a failure cannot leave a stored
        // agent behind without a matching event.
        let payload = serde_json::to_value(&agent).map_err(|e| e.to_string())?;
        let message = format!("{} registered", agent.name);

        self.agents.lock().await.insert(id.clone(), agent.clone());
        self.record_event("agent.registered", Some(id), message, payload)
            .await;
        Ok(agent)
    }

    /// Removes the agent with id `agent_id` and records an
    /// `agent.unregistered` event with a null payload.
    ///
    /// # Errors
    ///
    /// Returns `unknown agent '<id>'` when no such agent is registered.
    pub async fn unregister_agent(&self, agent_id: &str) -> Result<(), String> {
        // The lock guard is a temporary of this statement, so it is released
        // before the event is recorded and broadcast.
        let removed = self.agents.lock().await.remove(agent_id);
        if removed.is_none() {
            return Err(format!("unknown agent '{}'", agent_id));
        }
        self.record_event(
            "agent.unregistered",
            Some(agent_id.to_string()),
            format!("{} unregistered", agent_id),
            Value::Null,
        )
        .await;
        Ok(())
    }

    /// Updates an agent's status and current task, refreshes `updated_at`,
    /// and records an `agent.status_changed` event.
    ///
    /// The event message is `req.message` when given, otherwise
    /// `"<name> is <status:?>"`. The event payload is `req.payload` unless it
    /// is JSON null, in which case the serialized updated agent is used.
    ///
    /// # Errors
    ///
    /// Returns `unknown agent '<id>'` when the agent is not registered, or the
    /// serializer's message if the fallback payload cannot be built. In both
    /// cases the stored agent is left unchanged.
    pub async fn set_status(&self, req: SetHostAgentStatusRequest) -> Result<HostAgent, String> {
        let (updated, payload) = {
            let mut agents = self.agents.lock().await;
            let agent = agents
                .get_mut(&req.agent_id)
                .ok_or_else(|| format!("unknown agent '{}'", req.agent_id))?;

            // Apply the change to a copy first; it is committed only after
            // the payload has been built successfully.
            let mut staged = agent.clone();
            staged.status = req.status;
            staged.current_task = req.current_task;
            staged.updated_at = Utc::now();

            let payload = if req.payload.is_null() {
                serde_json::to_value(&staged).map_err(|e| e.to_string())?
            } else {
                req.payload
            };

            *agent = staged.clone();
            (staged, payload)
            // `agents` guard is released here, before the event is broadcast.
        };

        let message = req
            .message
            .unwrap_or_else(|| format!("{} is {:?}", updated.name, updated.status));
        self.record_event(
            "agent.status_changed",
            Some(updated.id.clone()),
            message,
            payload,
        )
        .await;
        Ok(updated)
    }

    /// Stores a new pending approval request with a generated
    /// `approval-<short id>` id and records an `approval.requested` event
    /// carrying the serialized request.
    ///
    /// When `req.options` is empty the options default to `approve` / `deny`.
    ///
    /// NOTE(review): `req.agent_id` is not checked against the registered
    /// agents — confirm that approvals for unknown agents are intended.
    ///
    /// # Errors
    ///
    /// Returns the serializer's message if the request cannot be converted
    /// into a JSON payload. The payload is built before the request is
    /// stored, so an error leaves the approval table unchanged.
    pub async fn create_approval(
        &self,
        req: CreateHostApprovalRequest,
    ) -> Result<HostApprovalRequest, String> {
        let options = if req.options.is_empty() {
            vec!["approve".to_string(), "deny".to_string()]
        } else {
            req.options
        };
        let approval = HostApprovalRequest {
            id: format!("approval-{}", short_id()),
            agent_id: req.agent_id,
            action: req.action,
            details: req.details,
            options,
            status: HostApprovalStatus::Pending,
            created_at: Utc::now(),
            resolved_at: None,
            resolution: None,
        };

        // Fallible step first, so an error cannot leave a stored approval
        // that was never announced.
        let payload = serde_json::to_value(&approval).map_err(|e| e.to_string())?;
        let message = format!("Approval requested: {}", approval.action);

        self.approvals
            .lock()
            .await
            .insert(approval.id.clone(), approval.clone());
        self.record_event(
            "approval.requested",
            approval.agent_id.clone(),
            message,
            payload,
        )
        .await;
        Ok(approval)
    }

    /// Marks an approval as resolved with `req.resolution`, stamps
    /// `resolved_at`, and records an `approval.resolved` event carrying the
    /// serialized request.
    ///
    /// NOTE(review): there is no check on the current status or on
    /// `options`, so an already-resolved approval can be resolved again and
    /// any resolution string is accepted — confirm that this is intended.
    ///
    /// # Errors
    ///
    /// Returns `unknown approval '<id>'` when the approval does not exist, or
    /// the serializer's message if the payload cannot be built. In both cases
    /// the stored approval is left unchanged.
    pub async fn resolve_approval(
        &self,
        req: ResolveHostApprovalRequest,
    ) -> Result<HostApprovalRequest, String> {
        let (resolved, payload) = {
            let mut approvals = self.approvals.lock().await;
            let approval = approvals
                .get_mut(&req.approval_id)
                .ok_or_else(|| format!("unknown approval '{}'", req.approval_id))?;

            // Stage the change on a copy and commit only once the payload
            // has been produced.
            let mut staged = approval.clone();
            staged.status = HostApprovalStatus::Resolved;
            staged.resolution = Some(req.resolution);
            staged.resolved_at = Some(Utc::now());

            let payload = serde_json::to_value(&staged).map_err(|e| e.to_string())?;

            *approval = staged.clone();
            (staged, payload)
            // `approvals` guard is released here, before the event is broadcast.
        };

        let message = format!("Approval resolved: {}", resolved.action);
        self.record_event(
            "approval.resolved",
            resolved.agent_id.clone(),
            message,
            payload,
        )
        .await;
        Ok(resolved)
    }

    /// Returns a snapshot of all registered agents, sorted by name and then
    /// by id.
    pub async fn agents(&self) -> Vec<HostAgent> {
        let mut agents: Vec<_> = self.agents.lock().await.values().cloned().collect();
        // `then_with` only compares ids when the names are equal.
        agents.sort_by(|a, b| a.name.cmp(&b.name).then_with(|| a.id.cmp(&b.id)));
        agents
    }

    /// Returns a snapshot of all approval requests, most recently created
    /// first.
    pub async fn approvals(&self) -> Vec<HostApprovalRequest> {
        let mut approvals: Vec<_> = self.approvals.lock().await.values().cloned().collect();
        approvals.sort_by(|a, b| b.created_at.cmp(&a.created_at));
        approvals
    }

    /// Returns at most `limit` events from the history, newest first.
    pub async fn events(&self, limit: usize) -> Vec<HostEvent> {
        self.events
            .lock()
            .await
            .iter()
            .rev()
            .take(limit)
            .cloned()
            .collect()
    }

    /// Appends an event to the history, trims the history to `MAX_EVENTS`,
    /// and broadcasts the event to every subscriber.
    ///
    /// The event gets a generated `event-<short id>` id and the current UTC
    /// time as its timestamp. Returns the recorded event.
    pub async fn record_event(
        &self,
        kind: impl Into<String>,
        agent_id: Option<String>,
        message: impl Into<String>,
        payload: Value,
    ) -> HostEvent {
        let event = HostEvent {
            id: format!("event-{}", short_id()),
            timestamp: Utc::now(),
            kind: kind.into(),
            agent_id,
            message: message.into(),
            payload,
        };
        {
            // Scoped so the history lock is released before broadcasting.
            let mut events = self.events.lock().await;
            events.push_back(event.clone());
            while events.len() > MAX_EVENTS {
                events.pop_front();
            }
        }
        self.broadcast_event(&event).await;
        event
    }

    /// Sends `event` to every subscriber as a `host.event` JSON-RPC 2.0
    /// message with the event as `params`.
    ///
    /// Delivery is best-effort: subscribers are written to one after another
    /// and a failed send is ignored so the remaining subscribers still
    /// receive the event. Nothing is sent if the envelope cannot be
    /// serialized.
    async fn broadcast_event(&self, event: &HostEvent) {
        // Snapshot the channels so the subscriber map is not locked while
        // writing to sockets.
        let subscribers: Vec<Arc<WsChannel>> =
            self.subscribers.lock().await.values().cloned().collect();
        if subscribers.is_empty() {
            // Nobody is listening; skip building the JSON envelope.
            return;
        }
        let Ok(json) = serde_json::to_string(&serde_json::json!({
            "jsonrpc": "2.0",
            "method": "host.event",
            "params": event,
        })) else {
            return;
        };
        for channel in subscribers {
            let _ = channel
                .write
                .lock()
                .await
                .send(Message::Text(json.clone().into()))
                .await;
        }
    }
}
/// Returns the first 12 characters of a freshly generated v4 UUID rendered
/// in its "simple" (undashed) form.
fn short_id() -> String {
    let mut token = uuid::Uuid::new_v4().simple().to_string();
    // Keep only the leading 12 characters of the rendered UUID.
    token.truncate(12);
    token
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a single agent through registration, a status change, and an
    /// approval round trip, checking the returned records along the way and
    /// that events were recorded.
    #[tokio::test]
    async fn host_tracks_agents_events_and_approvals() {
        let state = HostState::new();

        // Register with an explicit id so the later requests can refer to it.
        let display = car_proto::HostAgentDisplay {
            label: Some("Research Lead".to_string()),
            icon: Some("magnifying-glass".to_string()),
            accent: Some("#0a84ff".to_string()),
        };
        let registration = RegisterHostAgentRequest {
            id: Some("agent-1".to_string()),
            name: "Researcher".to_string(),
            kind: "builtin".to_string(),
            capabilities: vec!["search".to_string()],
            project: Some("/tmp/project".to_string()),
            pid: None,
            display,
            metadata: Value::Null,
        };
        let registered = state
            .register_agent("client-1", registration)
            .await
            .expect("register agent");
        assert_eq!(registered.status, HostAgentStatus::Idle);
        assert_eq!(registered.display.label.as_deref(), Some("Research Lead"));
        assert_eq!(
            registered.display.icon.as_deref(),
            Some("magnifying-glass")
        );
        assert_eq!(registered.display.accent.as_deref(), Some("#0a84ff"));
        assert_eq!(state.agents().await.len(), 1);

        // Move the agent to Running with a task attached.
        let status_change = SetHostAgentStatusRequest {
            agent_id: "agent-1".to_string(),
            status: HostAgentStatus::Running,
            current_task: Some("Collect facts".to_string()),
            message: None,
            payload: Value::Null,
        };
        let running = state.set_status(status_change).await.expect("set status");
        assert_eq!(running.status, HostAgentStatus::Running);
        assert_eq!(running.current_task.as_deref(), Some("Collect facts"));

        // An approval created without options gets the default pair.
        let approval_request = CreateHostApprovalRequest {
            agent_id: Some("agent-1".to_string()),
            action: "Run tests".to_string(),
            details: serde_json::json!({ "command": "cargo test" }),
            options: vec![],
        };
        let pending = state
            .create_approval(approval_request)
            .await
            .expect("create approval");
        assert_eq!(pending.options, vec!["approve", "deny"]);
        assert_eq!(pending.status, HostApprovalStatus::Pending);

        // Resolve it and check the recorded decision.
        let decision = ResolveHostApprovalRequest {
            approval_id: pending.id,
            resolution: "approve".to_string(),
        };
        let settled = state
            .resolve_approval(decision)
            .await
            .expect("resolve approval");
        assert_eq!(settled.status, HostApprovalStatus::Resolved);
        assert_eq!(settled.resolution.as_deref(), Some("approve"));

        // Registration, status change, request, and resolution each emit an event.
        let recorded = state.events(10).await;
        assert!(recorded.len() >= 4);
    }
}