Skip to main content

brainwires_network/remote/
heartbeat.rs

//! Heartbeat collector for agent discovery and status monitoring
//!
//! Collects information about all running agents and detects changes
//! for broadcasting to the remote backend.
//!
//! All functions take `sessions_dir: &Path` instead of depending on
//! CLI-specific path resolution. The CLI passes `PlatformPaths::sessions_dir()`
//! at call sites.
9
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12
13use anyhow::Result;
14
15use super::protocol::{AgentEventType, RemoteAgentInfo};
16use crate::ipc::discovery::{cleanup_stale_sockets, list_agent_sessions_with_metadata};
17use crate::ipc::protocol::AgentMetadata;
18
19/// Data collected during a heartbeat
20#[derive(Debug, Clone)]
21pub struct HeartbeatData {
22    /// Current state of all agents
23    pub agents: Vec<RemoteAgentInfo>,
24    /// System CPU load (0.0 to 1.0)
25    pub system_load: f32,
26    /// Hostname of the machine
27    pub hostname: String,
28    /// Operating system
29    pub os: String,
30    /// CLI version
31    pub version: String,
32}
33
34/// Agent state change event
35#[derive(Debug, Clone)]
36pub struct AgentEvent {
37    /// Type of event
38    pub event_type: AgentEventType,
39    /// Agent session ID
40    pub agent_id: String,
41    /// Additional event data
42    pub data: serde_json::Value,
43}
44
45/// Collects heartbeat data and detects agent changes
46pub struct HeartbeatCollector {
47    /// Last known state of agents (session_id -> info)
48    last_agents: HashMap<String, RemoteAgentInfo>,
49    /// Sessions directory for agent discovery
50    sessions_dir: PathBuf,
51    /// Version string (injected, not read from env!)
52    version: String,
53}
54
55impl HeartbeatCollector {
56    /// Create a new heartbeat collector
57    ///
58    /// # Arguments
59    /// * `sessions_dir` - Directory containing agent session files
60    /// * `version` - CLI version string (injected to avoid env! dependency)
61    pub fn new(sessions_dir: PathBuf, version: String) -> Self {
62        Self {
63            last_agents: HashMap::new(),
64            sessions_dir,
65            version,
66        }
67    }
68
69    /// Collect current state of all agents
70    ///
71    /// This also cleans up stale socket files from dead sessions to ensure
72    /// only actually running agents are reported.
73    pub async fn collect(&mut self) -> Result<HeartbeatData> {
74        // Clean up stale sockets first to avoid reporting dead sessions
75        if let Err(e) = cleanup_stale_sockets(&self.sessions_dir).await {
76            tracing::warn!("Failed to cleanup stale sockets: {}", e);
77        }
78
79        let metadata_list =
80            list_agent_sessions_with_metadata(&self.sessions_dir).unwrap_or_default();
81
82        let agents: Vec<RemoteAgentInfo> = metadata_list
83            .into_iter()
84            .map(RemoteAgentInfo::from)
85            .collect();
86
87        // Update last known state
88        self.last_agents = agents
89            .iter()
90            .map(|a| (a.session_id.clone(), a.clone()))
91            .collect();
92
93        Ok(HeartbeatData {
94            agents,
95            system_load: get_system_load(),
96            hostname: gethostname::gethostname().to_string_lossy().to_string(),
97            os: std::env::consts::OS.to_string(),
98            version: self.version.clone(),
99        })
100    }
101
102    /// Detect changes since last collection
103    ///
104    /// Returns a list of agent events representing what changed.
105    pub fn detect_changes(&mut self) -> Result<Vec<AgentEvent>> {
106        let current_metadata =
107            list_agent_sessions_with_metadata(&self.sessions_dir).unwrap_or_default();
108        let current_agents: HashMap<String, RemoteAgentInfo> = current_metadata
109            .into_iter()
110            .map(|m| {
111                let info = RemoteAgentInfo::from(m);
112                (info.session_id.clone(), info)
113            })
114            .collect();
115
116        let mut events = Vec::new();
117
118        // Check for new agents (spawned)
119        for (session_id, agent) in &current_agents {
120            if !self.last_agents.contains_key(session_id) {
121                events.push(AgentEvent {
122                    event_type: AgentEventType::Spawned,
123                    agent_id: session_id.clone(),
124                    data: serde_json::to_value(agent).unwrap_or_default(),
125                });
126            }
127        }
128
129        // Check for removed agents (exited)
130        for session_id in self.last_agents.keys() {
131            if !current_agents.contains_key(session_id) {
132                events.push(AgentEvent {
133                    event_type: AgentEventType::Exited,
134                    agent_id: session_id.clone(),
135                    data: serde_json::json!({}),
136                });
137            }
138        }
139
140        // Check for state changes in existing agents
141        for (session_id, current) in &current_agents {
142            if let Some(previous) = self.last_agents.get(session_id) {
143                // Check busy state change
144                if current.is_busy != previous.is_busy {
145                    events.push(AgentEvent {
146                        event_type: if current.is_busy {
147                            AgentEventType::Busy
148                        } else {
149                            AgentEventType::Idle
150                        },
151                        agent_id: session_id.clone(),
152                        data: serde_json::json!({
153                            "is_busy": current.is_busy,
154                            "status": current.status,
155                        }),
156                    });
157                }
158
159                // Check for other state changes (message count, status)
160                if current.message_count != previous.message_count
161                    || current.status != previous.status
162                {
163                    events.push(AgentEvent {
164                        event_type: AgentEventType::StateChanged,
165                        agent_id: session_id.clone(),
166                        data: serde_json::json!({
167                            "message_count": current.message_count,
168                            "status": current.status,
169                            "previous_message_count": previous.message_count,
170                            "previous_status": previous.status,
171                        }),
172                    });
173                }
174            }
175        }
176
177        // Update last known state
178        self.last_agents = current_agents;
179
180        Ok(events)
181    }
182
183    /// Get the current list of agents without updating state
184    pub fn get_current_agents(&self) -> Vec<RemoteAgentInfo> {
185        self.last_agents.values().cloned().collect()
186    }
187
188    /// Check if any agents are currently tracked
189    pub fn has_agents(&self) -> bool {
190        !self.last_agents.is_empty()
191    }
192
193    /// Get agent count
194    pub fn agent_count(&self) -> usize {
195        self.last_agents.len()
196    }
197
198    /// Get the sessions directory
199    pub fn sessions_dir(&self) -> &Path {
200        &self.sessions_dir
201    }
202}
203
204/// Convert from IPC AgentMetadata to Remote AgentInfo
205impl From<AgentMetadata> for RemoteAgentInfo {
206    fn from(meta: AgentMetadata) -> Self {
207        Self {
208            session_id: meta.session_id,
209            model: meta.model,
210            is_busy: meta.is_busy,
211            parent_id: meta.parent_agent_id,
212            working_directory: meta.working_directory,
213            message_count: 0, // Not tracked in AgentMetadata
214            last_activity: meta.last_activity,
215            status: if meta.is_busy {
216                "busy".to_string()
217            } else {
218                "idle".to_string()
219            },
220            name: meta.spawn_reason,
221        }
222    }
223}
224
/// Report the current system CPU load as a fraction.
///
/// On Linux the first field of `/proc/loadavg` is divided by the number of
/// available CPUs and capped at 1.0. On every other platform, or when the
/// file cannot be read or parsed, the result is 0.0.
fn get_system_load() -> f32 {
    #[cfg(target_os = "linux")]
    {
        // First whitespace-separated token of /proc/loadavg, parsed as f32.
        let parsed_load = std::fs::read_to_string("/proc/loadavg")
            .ok()
            .and_then(|raw| {
                raw.split_whitespace()
                    .next()
                    .and_then(|token| token.parse::<f32>().ok())
            });

        if let Some(raw_load) = parsed_load {
            // Normalize by CPU count; assume a single CPU if it is unknown.
            let cpu_count = match std::thread::available_parallelism() {
                Ok(n) => n.get() as f32,
                Err(_) => 1.0,
            };
            return (raw_load / cpu_count).min(1.0);
        }
    }

    #[cfg(target_os = "macos")]
    {
        // macOS load average would require different syscalls.
        // Not implemented yet; falls through to the 0.0 placeholder below.
    }

    0.0
}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed collector tracks no agents.
    #[test]
    fn test_heartbeat_collector_new() {
        let sessions = tempfile::tempdir().unwrap();
        let version = String::from("0.1.0-test");
        let collector = HeartbeatCollector::new(sessions.path().to_path_buf(), version);

        assert_eq!(collector.agent_count(), 0);
        assert!(!collector.has_agents());
    }

    /// Metadata fields are carried over and a new session is reported idle.
    #[test]
    fn test_remote_agent_info_from_metadata() {
        let session = String::from("test-session");
        let model = String::from("claude-3-5-sonnet");
        let cwd = String::from("/home/user/project");

        let info: RemoteAgentInfo = AgentMetadata::new(session, model, cwd).into();

        assert_eq!(info.session_id, "test-session");
        assert_eq!(info.model, "claude-3-5-sonnet");
        assert_eq!(info.working_directory, "/home/user/project");
        assert_eq!(info.status, "idle");
        assert!(!info.is_busy);
    }

    /// A busy session is reported with the "busy" status string.
    #[test]
    fn test_remote_agent_info_busy_status() {
        let mut busy_meta = AgentMetadata::new(
            String::from("busy-session"),
            String::from("gpt-4"),
            String::from("/tmp"),
        );
        busy_meta.is_busy = true;

        let info: RemoteAgentInfo = busy_meta.into();

        assert_eq!(info.status, "busy");
        assert!(info.is_busy);
    }

    /// The reported load always lies in the closed interval [0.0, 1.0].
    #[test]
    fn test_system_load() {
        let load = get_system_load();
        assert!((0.0..=1.0).contains(&load));
    }
}