use std::collections::HashMap;
use std::path::{Path, PathBuf};
use anyhow::Result;
use super::protocol::{AgentEventType, RemoteAgentInfo};
use crate::ipc::discovery::{cleanup_stale_sockets, list_agent_sessions_with_metadata};
use crate::ipc::protocol::AgentMetadata;
#[derive(Debug, Clone)]
pub struct HeartbeatData {
pub agents: Vec<RemoteAgentInfo>,
pub system_load: f32,
pub hostname: String,
pub os: String,
pub version: String,
}
#[derive(Debug, Clone)]
pub struct AgentEvent {
pub event_type: AgentEventType,
pub agent_id: String,
pub data: serde_json::Value,
}
pub struct HeartbeatCollector {
last_agents: HashMap<String, RemoteAgentInfo>,
sessions_dir: PathBuf,
version: String,
}
impl HeartbeatCollector {
pub fn new(sessions_dir: PathBuf, version: String) -> Self {
Self {
last_agents: HashMap::new(),
sessions_dir,
version,
}
}
pub async fn collect(&mut self) -> Result<HeartbeatData> {
if let Err(e) = cleanup_stale_sockets(&self.sessions_dir).await {
tracing::warn!("Failed to cleanup stale sockets: {}", e);
}
let metadata_list = list_agent_sessions_with_metadata(&self.sessions_dir).unwrap_or_default();
let agents: Vec<RemoteAgentInfo> = metadata_list
.into_iter()
.map(RemoteAgentInfo::from)
.collect();
self.last_agents = agents
.iter()
.map(|a| (a.session_id.clone(), a.clone()))
.collect();
Ok(HeartbeatData {
agents,
system_load: get_system_load(),
hostname: gethostname::gethostname().to_string_lossy().to_string(),
os: std::env::consts::OS.to_string(),
version: self.version.clone(),
})
}
pub fn detect_changes(&mut self) -> Result<Vec<AgentEvent>> {
let current_metadata = list_agent_sessions_with_metadata(&self.sessions_dir).unwrap_or_default();
let current_agents: HashMap<String, RemoteAgentInfo> = current_metadata
.into_iter()
.map(|m| {
let info = RemoteAgentInfo::from(m);
(info.session_id.clone(), info)
})
.collect();
let mut events = Vec::new();
for (session_id, agent) in ¤t_agents {
if !self.last_agents.contains_key(session_id) {
events.push(AgentEvent {
event_type: AgentEventType::Spawned,
agent_id: session_id.clone(),
data: serde_json::to_value(agent).unwrap_or_default(),
});
}
}
for session_id in self.last_agents.keys() {
if !current_agents.contains_key(session_id) {
events.push(AgentEvent {
event_type: AgentEventType::Exited,
agent_id: session_id.clone(),
data: serde_json::json!({}),
});
}
}
for (session_id, current) in ¤t_agents {
if let Some(previous) = self.last_agents.get(session_id) {
if current.is_busy != previous.is_busy {
events.push(AgentEvent {
event_type: if current.is_busy {
AgentEventType::Busy
} else {
AgentEventType::Idle
},
agent_id: session_id.clone(),
data: serde_json::json!({
"is_busy": current.is_busy,
"status": current.status,
}),
});
}
if current.message_count != previous.message_count
|| current.status != previous.status
{
events.push(AgentEvent {
event_type: AgentEventType::StateChanged,
agent_id: session_id.clone(),
data: serde_json::json!({
"message_count": current.message_count,
"status": current.status,
"previous_message_count": previous.message_count,
"previous_status": previous.status,
}),
});
}
}
}
self.last_agents = current_agents;
Ok(events)
}
pub fn get_current_agents(&self) -> Vec<RemoteAgentInfo> {
self.last_agents.values().cloned().collect()
}
pub fn has_agents(&self) -> bool {
!self.last_agents.is_empty()
}
pub fn agent_count(&self) -> usize {
self.last_agents.len()
}
pub fn sessions_dir(&self) -> &Path {
&self.sessions_dir
}
}
impl From<AgentMetadata> for RemoteAgentInfo {
fn from(meta: AgentMetadata) -> Self {
Self {
session_id: meta.session_id,
model: meta.model,
is_busy: meta.is_busy,
parent_id: meta.parent_agent_id,
working_directory: meta.working_directory,
message_count: 0, last_activity: meta.last_activity,
status: if meta.is_busy {
"busy".to_string()
} else {
"idle".to_string()
},
name: meta.spawn_reason,
}
}
}
fn get_system_load() -> f32 {
#[cfg(target_os = "linux")]
{
if let Ok(contents) = std::fs::read_to_string("/proc/loadavg")
&& let Some(first) = contents.split_whitespace().next()
&& let Ok(load) = first.parse::<f32>() {
let num_cpus = std::thread::available_parallelism()
.map(|p| p.get() as f32)
.unwrap_or(1.0);
return (load / num_cpus).min(1.0);
}
}
#[cfg(target_os = "macos")]
{
}
0.0
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_heartbeat_collector_new() {
let temp_dir = tempfile::tempdir().unwrap();
let collector = HeartbeatCollector::new(
temp_dir.path().to_path_buf(),
"0.1.0-test".to_string(),
);
assert!(!collector.has_agents());
assert_eq!(collector.agent_count(), 0);
}
#[test]
fn test_remote_agent_info_from_metadata() {
let metadata = AgentMetadata::new(
"test-session".to_string(),
"claude-3-5-sonnet".to_string(),
"/home/user/project".to_string(),
);
let info = RemoteAgentInfo::from(metadata);
assert_eq!(info.session_id, "test-session");
assert_eq!(info.model, "claude-3-5-sonnet");
assert_eq!(info.working_directory, "/home/user/project");
assert!(!info.is_busy);
assert_eq!(info.status, "idle");
}
#[test]
fn test_remote_agent_info_busy_status() {
let mut metadata = AgentMetadata::new(
"busy-session".to_string(),
"gpt-4".to_string(),
"/tmp".to_string(),
);
metadata.is_busy = true;
let info = RemoteAgentInfo::from(metadata);
assert!(info.is_busy);
assert_eq!(info.status, "busy");
}
#[test]
fn test_system_load() {
let load = get_system_load();
assert!(load >= 0.0);
assert!(load <= 1.0);
}
}