brainwires_network/remote/
heartbeat.rs1use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12
13use anyhow::Result;
14
15use super::protocol::{AgentEventType, RemoteAgentInfo};
16use crate::ipc::discovery::{cleanup_stale_sockets, list_agent_sessions_with_metadata};
17use crate::ipc::protocol::AgentMetadata;
18
19#[derive(Debug, Clone)]
21pub struct HeartbeatData {
22 pub agents: Vec<RemoteAgentInfo>,
24 pub system_load: f32,
26 pub hostname: String,
28 pub os: String,
30 pub version: String,
32}
33
34#[derive(Debug, Clone)]
36pub struct AgentEvent {
37 pub event_type: AgentEventType,
39 pub agent_id: String,
41 pub data: serde_json::Value,
43}
44
45pub struct HeartbeatCollector {
47 last_agents: HashMap<String, RemoteAgentInfo>,
49 sessions_dir: PathBuf,
51 version: String,
53}
54
55impl HeartbeatCollector {
56 pub fn new(sessions_dir: PathBuf, version: String) -> Self {
62 Self {
63 last_agents: HashMap::new(),
64 sessions_dir,
65 version,
66 }
67 }
68
69 pub async fn collect(&mut self) -> Result<HeartbeatData> {
74 if let Err(e) = cleanup_stale_sockets(&self.sessions_dir).await {
76 tracing::warn!("Failed to cleanup stale sockets: {}", e);
77 }
78
79 let metadata_list =
80 list_agent_sessions_with_metadata(&self.sessions_dir).unwrap_or_default();
81
82 let agents: Vec<RemoteAgentInfo> = metadata_list
83 .into_iter()
84 .map(RemoteAgentInfo::from)
85 .collect();
86
87 self.last_agents = agents
89 .iter()
90 .map(|a| (a.session_id.clone(), a.clone()))
91 .collect();
92
93 Ok(HeartbeatData {
94 agents,
95 system_load: get_system_load(),
96 hostname: gethostname::gethostname().to_string_lossy().to_string(),
97 os: std::env::consts::OS.to_string(),
98 version: self.version.clone(),
99 })
100 }
101
102 pub fn detect_changes(&mut self) -> Result<Vec<AgentEvent>> {
106 let current_metadata =
107 list_agent_sessions_with_metadata(&self.sessions_dir).unwrap_or_default();
108 let current_agents: HashMap<String, RemoteAgentInfo> = current_metadata
109 .into_iter()
110 .map(|m| {
111 let info = RemoteAgentInfo::from(m);
112 (info.session_id.clone(), info)
113 })
114 .collect();
115
116 let mut events = Vec::new();
117
118 for (session_id, agent) in ¤t_agents {
120 if !self.last_agents.contains_key(session_id) {
121 events.push(AgentEvent {
122 event_type: AgentEventType::Spawned,
123 agent_id: session_id.clone(),
124 data: serde_json::to_value(agent).unwrap_or_default(),
125 });
126 }
127 }
128
129 for session_id in self.last_agents.keys() {
131 if !current_agents.contains_key(session_id) {
132 events.push(AgentEvent {
133 event_type: AgentEventType::Exited,
134 agent_id: session_id.clone(),
135 data: serde_json::json!({}),
136 });
137 }
138 }
139
140 for (session_id, current) in ¤t_agents {
142 if let Some(previous) = self.last_agents.get(session_id) {
143 if current.is_busy != previous.is_busy {
145 events.push(AgentEvent {
146 event_type: if current.is_busy {
147 AgentEventType::Busy
148 } else {
149 AgentEventType::Idle
150 },
151 agent_id: session_id.clone(),
152 data: serde_json::json!({
153 "is_busy": current.is_busy,
154 "status": current.status,
155 }),
156 });
157 }
158
159 if current.message_count != previous.message_count
161 || current.status != previous.status
162 {
163 events.push(AgentEvent {
164 event_type: AgentEventType::StateChanged,
165 agent_id: session_id.clone(),
166 data: serde_json::json!({
167 "message_count": current.message_count,
168 "status": current.status,
169 "previous_message_count": previous.message_count,
170 "previous_status": previous.status,
171 }),
172 });
173 }
174 }
175 }
176
177 self.last_agents = current_agents;
179
180 Ok(events)
181 }
182
183 pub fn get_current_agents(&self) -> Vec<RemoteAgentInfo> {
185 self.last_agents.values().cloned().collect()
186 }
187
188 pub fn has_agents(&self) -> bool {
190 !self.last_agents.is_empty()
191 }
192
193 pub fn agent_count(&self) -> usize {
195 self.last_agents.len()
196 }
197
198 pub fn sessions_dir(&self) -> &Path {
200 &self.sessions_dir
201 }
202}
203
204impl From<AgentMetadata> for RemoteAgentInfo {
206 fn from(meta: AgentMetadata) -> Self {
207 Self {
208 session_id: meta.session_id,
209 model: meta.model,
210 is_busy: meta.is_busy,
211 parent_id: meta.parent_agent_id,
212 working_directory: meta.working_directory,
213 message_count: 0, last_activity: meta.last_activity,
215 status: if meta.is_busy {
216 "busy".to_string()
217 } else {
218 "idle".to_string()
219 },
220 name: meta.spawn_reason,
221 }
222 }
223}
224
/// Returns the system load as a fraction of total CPU capacity.
///
/// On Linux this reads the first field of `/proc/loadavg`, divides it by the
/// available parallelism (falling back to 1 when that cannot be determined),
/// and caps the result at `1.0`. On every other platform, or when the file
/// cannot be read or parsed, it returns `0.0`.
fn get_system_load() -> f32 {
    #[cfg(target_os = "linux")]
    {
        // First whitespace-separated token of /proc/loadavg, parsed as a float.
        let first_field = std::fs::read_to_string("/proc/loadavg")
            .ok()
            .and_then(|raw| {
                raw.split_whitespace()
                    .next()
                    .and_then(|token| token.parse::<f32>().ok())
            });

        if let Some(load) = first_field {
            let cores = match std::thread::available_parallelism() {
                Ok(count) => count.get() as f32,
                Err(_) => 1.0,
            };
            return (load / cores).min(1.0);
        }
    }

    #[cfg(target_os = "macos")]
    {
        // No load source is implemented for macOS; falls through to the default.
    }

    0.0
}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed collector knows about no agents.
    #[test]
    fn test_heartbeat_collector_new() {
        let sessions = tempfile::tempdir().unwrap();
        let sessions_path = sessions.path().to_path_buf();

        let fresh = HeartbeatCollector::new(sessions_path, "0.1.0-test".to_string());

        assert_eq!(fresh.agent_count(), 0);
        assert!(!fresh.has_agents());
    }

    /// Metadata fields are carried over and a new session reports as idle.
    #[test]
    fn test_remote_agent_info_from_metadata() {
        let source = AgentMetadata::new(
            "test-session".to_string(),
            "claude-3-5-sonnet".to_string(),
            "/home/user/project".to_string(),
        );

        let converted = RemoteAgentInfo::from(source);

        assert_eq!(converted.session_id, "test-session");
        assert_eq!(converted.model, "claude-3-5-sonnet");
        assert_eq!(converted.working_directory, "/home/user/project");
        assert_eq!(converted.status, "idle");
        assert!(!converted.is_busy);
    }

    /// A busy session is reported with the "busy" status label.
    #[test]
    fn test_remote_agent_info_busy_status() {
        let mut source = AgentMetadata::new(
            "busy-session".to_string(),
            "gpt-4".to_string(),
            "/tmp".to_string(),
        );
        source.is_busy = true;

        let converted = RemoteAgentInfo::from(source);

        assert_eq!(converted.status, "busy");
        assert!(converted.is_busy);
    }

    /// The reported load is always a fraction between 0 and 1 inclusive.
    #[test]
    fn test_system_load() {
        let reading = get_system_load();
        assert!((0.0..=1.0).contains(&reading));
    }
}