Skip to main content

feagi_agent/core/
heartbeat.rs

1// Copyright 2025 Neuraville Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Heartbeat service for maintaining agent liveness
5
6use crate::core::error::{Result, SdkError};
7use std::future::Future;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread::{self, JoinHandle};
11use std::time::Duration;
12use tokio::runtime::Handle;
13use tokio::runtime::Runtime;
14use tokio::task::block_in_place;
15use tokio::time::timeout;
16use tracing::{debug, warn};
17use zeromq::{DealerSocket, SocketRecv, SocketSend, ZmqMessage};
18
19fn block_on_with<T>(
20    handle: &Handle,
21    runtime: Option<&Runtime>,
22    future: impl Future<Output = T>,
23) -> T {
24    if Handle::try_current().is_ok() {
25        block_in_place(|| handle.block_on(future))
26    } else if let Some(runtime) = runtime {
27        runtime.block_on(future)
28    } else {
29        handle.block_on(future)
30    }
31}
32
33/// Registration payload used to re-register an agent after FEAGI restarts.
34///
35/// @cursor:critical-path
36/// This is used ONLY for automatic recovery when FEAGI loses in-memory registry state.
37#[derive(Debug, Clone)]
38pub struct ReconnectSpec {
39    pub agent_id: String,
40    pub agent_type: String,
41    pub capabilities: serde_json::Value,
42    pub registration_retries: u32,
43    pub retry_backoff_ms: u64,
44}
45
46/// Heartbeat service managing periodic keepalive messages
47pub struct HeartbeatService {
48    /// Agent ID
49    agent_id: String,
50
51    /// ZMQ registration socket (shared with main client)
52    socket: Arc<Mutex<DealerSocket>>,
53
54    /// Tokio runtime handle (ambient if available)
55    runtime_handle: Handle,
56
57    /// Owned runtime when created outside tokio
58    runtime: Option<Arc<Runtime>>,
59
60    /// Heartbeat interval
61    interval: Duration,
62
63    /// Running flag
64    running: Arc<AtomicBool>,
65
66    /// Thread handle
67    thread: Option<JoinHandle<()>>,
68
69    /// Optional auto-reconnect/re-register configuration
70    reconnect: Option<ReconnectSpec>,
71}
72
73impl HeartbeatService {
74    /// Create a new heartbeat service
75    ///
76    /// # Arguments
77    /// * `agent_id` - Agent identifier
78    /// * `socket` - Shared ZMQ socket for sending heartbeats
79    /// * `interval_secs` - Heartbeat interval in seconds
80    pub fn new(
81        agent_id: String,
82        socket: Arc<Mutex<DealerSocket>>,
83        runtime_handle: Handle,
84        runtime: Option<Arc<Runtime>>,
85        interval_secs: f64,
86    ) -> Self {
87        Self {
88            agent_id,
89            socket,
90            runtime_handle,
91            runtime,
92            interval: Duration::from_secs_f64(interval_secs),
93            running: Arc::new(AtomicBool::new(false)),
94            thread: None,
95            reconnect: None,
96        }
97    }
98
99    /// Enable automatic re-register attempts when heartbeats are rejected due to agent not found.
100    ///
101    /// This is intended for FEAGI restarts (registry reset). It does NOT attempt reconnect
102    /// after voluntary deregistration because `AgentClient` stops this service on disconnect.
103    pub fn with_reconnect_spec(mut self, spec: ReconnectSpec) -> Self {
104        self.reconnect = Some(spec);
105        self
106    }
107
108    /// Start the heartbeat service
109    pub fn start(&mut self) -> Result<()> {
110        if self.running.load(Ordering::Relaxed) {
111            return Err(SdkError::Other(
112                "Heartbeat service already running".to_string(),
113            ));
114        }
115
116        self.running.store(true, Ordering::Relaxed);
117
118        let agent_id = self.agent_id.clone();
119        let socket = Arc::clone(&self.socket);
120        let runtime_handle = self.runtime_handle.clone();
121        let runtime = self.runtime.clone();
122        let interval = self.interval;
123        let running = Arc::clone(&self.running);
124        let reconnect = self.reconnect.clone();
125
126        let thread = thread::spawn(move || {
127            debug!("[HEARTBEAT] Service started for agent: {}", agent_id);
128
129            while running.load(Ordering::Relaxed) {
130                // Sleep first to avoid immediate heartbeat after registration
131                thread::sleep(interval);
132
133                if !running.load(Ordering::Relaxed) {
134                    break;
135                }
136
137                // Send heartbeat
138                if let Err(e) = Self::send_heartbeat(
139                    &agent_id,
140                    &socket,
141                    &runtime_handle,
142                    runtime.as_deref(),
143                    reconnect.as_ref(),
144                ) {
145                    warn!(
146                        "[HEARTBEAT] Failed to send heartbeat for {}: {}",
147                        agent_id, e
148                    );
149                    // Don't stop on error - network might recover
150                }
151            }
152
153            debug!("[HEARTBEAT] Service stopped for agent: {}", agent_id);
154        });
155
156        self.thread = Some(thread);
157        Ok(())
158    }
159
160    /// Stop the heartbeat service
161    ///
162    /// This ensures proper thread cleanup:
163    /// 1. Signal thread to stop
164    /// 2. Wait for thread to finish (with timeout)
165    /// 3. Force terminate if stuck
166    pub fn stop(&mut self) {
167        if !self.running.load(Ordering::Relaxed) {
168            debug!(
169                "[HEARTBEAT] Service already stopped for agent: {}",
170                self.agent_id
171            );
172            return;
173        }
174
175        debug!("[HEARTBEAT] Stopping service for agent: {}", self.agent_id);
176
177        // Step 1: Signal thread to stop
178        self.running.store(false, Ordering::Relaxed);
179
180        // Step 2: Wait for thread to finish
181        if let Some(thread) = self.thread.take() {
182            match thread.join() {
183                Ok(_) => {
184                    debug!(
185                        "[HEARTBEAT] Thread stopped cleanly for agent: {}",
186                        self.agent_id
187                    );
188                }
189                Err(e) => {
190                    warn!(
191                        "[HEARTBEAT] Thread join failed for agent {} (thread may have panicked): {:?}",
192                        self.agent_id, e
193                    );
194                }
195            }
196        }
197
198        debug!(
199            "[HEARTBEAT] Service fully stopped for agent: {}",
200            self.agent_id
201        );
202    }
203
204    /// Send a single heartbeat message
205    fn send_heartbeat(
206        agent_id: &str,
207        socket: &Arc<Mutex<DealerSocket>>,
208        runtime_handle: &Handle,
209        runtime: Option<&Runtime>,
210        reconnect: Option<&ReconnectSpec>,
211    ) -> Result<()> {
212        let message = serde_json::json!({
213            "method": "POST",
214            "path": "/v1/agent/heartbeat",
215            "body": {
216                "agent_id": agent_id,
217                "timestamp": std::time::SystemTime::now()
218                    .duration_since(std::time::UNIX_EPOCH)
219                    .unwrap()
220                    .as_millis() as u64,
221            }
222        });
223
224        let mut socket_guard = socket
225            .lock()
226            .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
227
228        // Send heartbeat request
229        let message = ZmqMessage::from(message.to_string().into_bytes());
230        block_on_with(runtime_handle, runtime, socket_guard.send(message))
231            .map_err(SdkError::Zmq)?;
232
233        // Wait for response (ROUTER replies may include empty delimiter)
234        let response = block_on_with(runtime_handle, runtime, async {
235            timeout(Duration::from_millis(1000), socket_guard.recv()).await
236        })
237        .map_err(|_| SdkError::Timeout("Heartbeat response timeout".to_string()))?
238        .map_err(SdkError::Zmq)?;
239        let frames = response.into_vec();
240        let last = frames
241            .last()
242            .ok_or_else(|| SdkError::Other("Heartbeat reply was empty".to_string()))?;
243        let response: serde_json::Value = serde_json::from_slice(last)?;
244
245        // Heartbeat response schema varies by FEAGI version/transport:
246        // - Legacy: {"status":"success", ...}
247        // - HTTP-style: {"status":200,"body":{"message":"ok"}, ...}
248        //
249        // Treat both as success deterministically.
250        let status_value = response.get("status");
251        let is_success = match status_value {
252            Some(serde_json::Value::String(s)) => s == "success" || s == "ok",
253            Some(serde_json::Value::Number(n)) => n.as_u64() == Some(200),
254            _ => false,
255        };
256
257        if is_success {
258            debug!("[HEARTBEAT] ✓ Heartbeat acknowledged for {}", agent_id);
259            Ok(())
260        } else {
261            warn!("[HEARTBEAT] ⚠ Heartbeat rejected: {:?}", response);
262            if Self::is_agent_not_registered(&response) {
263                if let Some(spec) = reconnect {
264                    if Self::try_re_register(spec, socket, runtime_handle, runtime).is_ok() {
265                        debug!(
266                            "[HEARTBEAT] ✓ Auto re-registered agent after heartbeat rejection: {}",
267                            agent_id
268                        );
269                        return Ok(());
270                    }
271                }
272            }
273            Err(SdkError::HeartbeatFailed(format!("{:?}", response)))
274        }
275    }
276
277    /// Detect FEAGI responses indicating the agent is not currently registered.
278    fn is_agent_not_registered(response: &serde_json::Value) -> bool {
279        let status = response
280            .get("status")
281            .and_then(|v| v.as_u64())
282            .unwrap_or_default();
283        if status != 404 {
284            return false;
285        }
286        response
287            .get("body")
288            .and_then(|b| b.get("error"))
289            .and_then(|e| e.as_str())
290            .map(|s| s.contains("not found in registry") || s.contains("not found"))
291            .unwrap_or(false)
292    }
293
294    /// Attempt to re-register the agent (used after FEAGI restarts).
295    fn try_re_register(
296        spec: &ReconnectSpec,
297        socket: &Arc<Mutex<DealerSocket>>,
298        runtime_handle: &Handle,
299        runtime: Option<&Runtime>,
300    ) -> Result<()> {
301        let registration_msg = serde_json::json!({
302            "method": "POST",
303            "path": "/v1/agent/register",
304            "body": {
305                "agent_id": spec.agent_id,
306                "agent_type": spec.agent_type,
307                "capabilities": spec.capabilities,
308            }
309        });
310
311        let mut attempt = 0u32;
312        loop {
313            attempt += 1;
314            let mut socket = socket
315                .lock()
316                .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
317
318            let message = ZmqMessage::from(registration_msg.to_string().into_bytes());
319            block_on_with(runtime_handle, runtime, socket.send(message)).map_err(SdkError::Zmq)?;
320            let response = block_on_with(runtime_handle, runtime, async {
321                timeout(Duration::from_millis(1000), socket.recv()).await
322            })
323            .map_err(|_| SdkError::Timeout("Registration response timeout".to_string()))?
324            .map_err(SdkError::Zmq)?;
325            let frames = response.into_vec();
326            let last = frames
327                .last()
328                .ok_or_else(|| SdkError::Other("Registration reply was empty".to_string()))?;
329            let response: serde_json::Value = serde_json::from_slice(last)?;
330            let status_code = response
331                .get("status")
332                .and_then(|s| s.as_u64())
333                .unwrap_or(500);
334            if status_code == 200 {
335                return Ok(());
336            }
337
338            if attempt > spec.registration_retries {
339                return Err(SdkError::RegistrationFailed(
340                    "Auto re-register failed (exhausted retries)".to_string(),
341                ));
342            }
343
344            // Deterministic backoff controlled by AgentConfig (no hardcoded defaults here).
345            std::thread::sleep(Duration::from_millis(spec.retry_backoff_ms));
346        }
347    }
348
349    /// Check if heartbeat service is running
350    pub fn is_running(&self) -> bool {
351        self.running.load(Ordering::Relaxed)
352    }
353}
354
355impl Drop for HeartbeatService {
356    fn drop(&mut self) {
357        self.stop();
358    }
359}
360
#[cfg(test)]
mod tests {
    use super::HeartbeatService;
    use serde_json::json;

    // Full lifecycle tests (start/stop with heartbeats on the wire) require actual ZMQ
    // sockets and a running FEAGI instance; the pure reply classification is covered here.

    #[test]
    fn not_registered_detected_on_404_not_found() {
        let registry_miss = json!({
            "status": 404,
            "body": { "error": "Agent 'a1' not found in registry" }
        });
        assert!(HeartbeatService::is_agent_not_registered(&registry_miss));

        let plain_miss = json!({ "status": 404, "body": { "error": "agent not found" } });
        assert!(HeartbeatService::is_agent_not_registered(&plain_miss));
    }

    #[test]
    fn not_registered_requires_numeric_404_status() {
        let wrong_code = json!({ "status": 500, "body": { "error": "not found" } });
        assert!(!HeartbeatService::is_agent_not_registered(&wrong_code));

        let string_code = json!({ "status": "404", "body": { "error": "not found" } });
        assert!(!HeartbeatService::is_agent_not_registered(&string_code));

        let no_status = json!({ "body": { "error": "not found" } });
        assert!(!HeartbeatService::is_agent_not_registered(&no_status));
    }

    #[test]
    fn not_registered_requires_not_found_error_text() {
        let other_error = json!({ "status": 404, "body": { "error": "route does not exist" } });
        assert!(!HeartbeatService::is_agent_not_registered(&other_error));

        let no_body = json!({ "status": 404 });
        assert!(!HeartbeatService::is_agent_not_registered(&no_body));

        let non_string_error = json!({ "status": 404, "body": { "error": 404 } });
        assert!(!HeartbeatService::is_agent_not_registered(&non_string_error));
    }
}