1use crate::core::error::{Result, SdkError};
7use std::future::Future;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread::{self, JoinHandle};
11use std::time::Duration;
12use tokio::runtime::Handle;
13use tokio::runtime::Runtime;
14use tokio::task::block_in_place;
15use tokio::time::timeout;
16use tracing::{debug, warn};
17use zeromq::{DealerSocket, SocketRecv, SocketSend, ZmqMessage};
18
19fn block_on_with<T>(
20 handle: &Handle,
21 runtime: Option<&Runtime>,
22 future: impl Future<Output = T>,
23) -> T {
24 if Handle::try_current().is_ok() {
25 block_in_place(|| handle.block_on(future))
26 } else if let Some(runtime) = runtime {
27 runtime.block_on(future)
28 } else {
29 handle.block_on(future)
30 }
31}
32
/// Parameters used by the heartbeat service to re-register the agent
/// automatically when a heartbeat is rejected because the agent is not
/// registered.
#[derive(Debug, Clone)]
pub struct ReconnectSpec {
    /// Agent identifier sent in the registration request body.
    pub agent_id: String,
    /// Agent type sent in the registration request body.
    pub agent_type: String,
    /// Capabilities value sent as-is in the registration request body.
    pub capabilities: serde_json::Value,
    /// Number of retries after the first registration attempt
    /// (total attempts = `registration_retries + 1`).
    pub registration_retries: u32,
    /// Delay, in milliseconds, between consecutive registration attempts.
    pub retry_backoff_ms: u64,
}
45
/// Sends periodic heartbeats for a single agent from a dedicated background
/// thread, optionally re-registering the agent when the server rejects it.
pub struct HeartbeatService {
    /// Agent identifier included in every heartbeat.
    agent_id: String,

    /// DEALER socket used for heartbeat requests; locked for each exchange.
    socket: Arc<Mutex<DealerSocket>>,

    /// Tokio handle used to block on socket futures from the heartbeat thread.
    runtime_handle: Handle,

    /// Optional owned runtime, preferred over `runtime_handle` when blocking
    /// from outside a runtime context.
    runtime: Option<Arc<Runtime>>,

    /// Delay between consecutive heartbeats.
    interval: Duration,

    /// Set by `start`, cleared by `stop`; polled by the background thread.
    running: Arc<AtomicBool>,

    /// Join handle of the background thread, present while started.
    thread: Option<JoinHandle<()>>,

    /// Re-registration parameters used when a heartbeat is rejected as
    /// "agent not registered"; `None` disables auto re-registration.
    reconnect: Option<ReconnectSpec>,
}
72
73impl HeartbeatService {
74 pub fn new(
81 agent_id: String,
82 socket: Arc<Mutex<DealerSocket>>,
83 runtime_handle: Handle,
84 runtime: Option<Arc<Runtime>>,
85 interval_secs: f64,
86 ) -> Self {
87 Self {
88 agent_id,
89 socket,
90 runtime_handle,
91 runtime,
92 interval: Duration::from_secs_f64(interval_secs),
93 running: Arc::new(AtomicBool::new(false)),
94 thread: None,
95 reconnect: None,
96 }
97 }
98
99 pub fn with_reconnect_spec(mut self, spec: ReconnectSpec) -> Self {
104 self.reconnect = Some(spec);
105 self
106 }
107
108 pub fn start(&mut self) -> Result<()> {
110 if self.running.load(Ordering::Relaxed) {
111 return Err(SdkError::Other(
112 "Heartbeat service already running".to_string(),
113 ));
114 }
115
116 self.running.store(true, Ordering::Relaxed);
117
118 let agent_id = self.agent_id.clone();
119 let socket = Arc::clone(&self.socket);
120 let runtime_handle = self.runtime_handle.clone();
121 let runtime = self.runtime.clone();
122 let interval = self.interval;
123 let running = Arc::clone(&self.running);
124 let reconnect = self.reconnect.clone();
125
126 let thread = thread::spawn(move || {
127 debug!("[HEARTBEAT] Service started for agent: {}", agent_id);
128
129 while running.load(Ordering::Relaxed) {
130 thread::sleep(interval);
132
133 if !running.load(Ordering::Relaxed) {
134 break;
135 }
136
137 if let Err(e) = Self::send_heartbeat(
139 &agent_id,
140 &socket,
141 &runtime_handle,
142 runtime.as_deref(),
143 reconnect.as_ref(),
144 ) {
145 warn!(
146 "[HEARTBEAT] Failed to send heartbeat for {}: {}",
147 agent_id, e
148 );
149 }
151 }
152
153 debug!("[HEARTBEAT] Service stopped for agent: {}", agent_id);
154 });
155
156 self.thread = Some(thread);
157 Ok(())
158 }
159
160 pub fn stop(&mut self) {
167 if !self.running.load(Ordering::Relaxed) {
168 debug!(
169 "[HEARTBEAT] Service already stopped for agent: {}",
170 self.agent_id
171 );
172 return;
173 }
174
175 debug!("[HEARTBEAT] Stopping service for agent: {}", self.agent_id);
176
177 self.running.store(false, Ordering::Relaxed);
179
180 if let Some(thread) = self.thread.take() {
182 match thread.join() {
183 Ok(_) => {
184 debug!(
185 "[HEARTBEAT] Thread stopped cleanly for agent: {}",
186 self.agent_id
187 );
188 }
189 Err(e) => {
190 warn!(
191 "[HEARTBEAT] Thread join failed for agent {} (thread may have panicked): {:?}",
192 self.agent_id, e
193 );
194 }
195 }
196 }
197
198 debug!(
199 "[HEARTBEAT] Service fully stopped for agent: {}",
200 self.agent_id
201 );
202 }
203
204 fn send_heartbeat(
206 agent_id: &str,
207 socket: &Arc<Mutex<DealerSocket>>,
208 runtime_handle: &Handle,
209 runtime: Option<&Runtime>,
210 reconnect: Option<&ReconnectSpec>,
211 ) -> Result<()> {
212 let message = serde_json::json!({
213 "method": "POST",
214 "path": "/v1/agent/heartbeat",
215 "body": {
216 "agent_id": agent_id,
217 "timestamp": std::time::SystemTime::now()
218 .duration_since(std::time::UNIX_EPOCH)
219 .unwrap()
220 .as_millis() as u64,
221 }
222 });
223
224 let mut socket_guard = socket
225 .lock()
226 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
227
228 let message = ZmqMessage::from(message.to_string().into_bytes());
230 block_on_with(runtime_handle, runtime, socket_guard.send(message))
231 .map_err(SdkError::Zmq)?;
232
233 let response = block_on_with(runtime_handle, runtime, async {
235 timeout(Duration::from_millis(1000), socket_guard.recv()).await
236 })
237 .map_err(|_| SdkError::Timeout("Heartbeat response timeout".to_string()))?
238 .map_err(SdkError::Zmq)?;
239 let frames = response.into_vec();
240 let last = frames
241 .last()
242 .ok_or_else(|| SdkError::Other("Heartbeat reply was empty".to_string()))?;
243 let response: serde_json::Value = serde_json::from_slice(last)?;
244
245 let status_value = response.get("status");
251 let is_success = match status_value {
252 Some(serde_json::Value::String(s)) => s == "success" || s == "ok",
253 Some(serde_json::Value::Number(n)) => n.as_u64() == Some(200),
254 _ => false,
255 };
256
257 if is_success {
258 debug!("[HEARTBEAT] ✓ Heartbeat acknowledged for {}", agent_id);
259 Ok(())
260 } else {
261 warn!("[HEARTBEAT] ⚠ Heartbeat rejected: {:?}", response);
262 if Self::is_agent_not_registered(&response) {
263 if let Some(spec) = reconnect {
264 if Self::try_re_register(spec, socket, runtime_handle, runtime).is_ok() {
265 debug!(
266 "[HEARTBEAT] ✓ Auto re-registered agent after heartbeat rejection: {}",
267 agent_id
268 );
269 return Ok(());
270 }
271 }
272 }
273 Err(SdkError::HeartbeatFailed(format!("{:?}", response)))
274 }
275 }
276
277 fn is_agent_not_registered(response: &serde_json::Value) -> bool {
279 let status = response
280 .get("status")
281 .and_then(|v| v.as_u64())
282 .unwrap_or_default();
283 if status != 404 {
284 return false;
285 }
286 response
287 .get("body")
288 .and_then(|b| b.get("error"))
289 .and_then(|e| e.as_str())
290 .map(|s| s.contains("not found in registry") || s.contains("not found"))
291 .unwrap_or(false)
292 }
293
294 fn try_re_register(
296 spec: &ReconnectSpec,
297 socket: &Arc<Mutex<DealerSocket>>,
298 runtime_handle: &Handle,
299 runtime: Option<&Runtime>,
300 ) -> Result<()> {
301 let registration_msg = serde_json::json!({
302 "method": "POST",
303 "path": "/v1/agent/register",
304 "body": {
305 "agent_id": spec.agent_id,
306 "agent_type": spec.agent_type,
307 "capabilities": spec.capabilities,
308 }
309 });
310
311 let mut attempt = 0u32;
312 loop {
313 attempt += 1;
314 let mut socket = socket
315 .lock()
316 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
317
318 let message = ZmqMessage::from(registration_msg.to_string().into_bytes());
319 block_on_with(runtime_handle, runtime, socket.send(message)).map_err(SdkError::Zmq)?;
320 let response = block_on_with(runtime_handle, runtime, async {
321 timeout(Duration::from_millis(1000), socket.recv()).await
322 })
323 .map_err(|_| SdkError::Timeout("Registration response timeout".to_string()))?
324 .map_err(SdkError::Zmq)?;
325 let frames = response.into_vec();
326 let last = frames
327 .last()
328 .ok_or_else(|| SdkError::Other("Registration reply was empty".to_string()))?;
329 let response: serde_json::Value = serde_json::from_slice(last)?;
330 let status_code = response
331 .get("status")
332 .and_then(|s| s.as_u64())
333 .unwrap_or(500);
334 if status_code == 200 {
335 return Ok(());
336 }
337
338 if attempt > spec.registration_retries {
339 return Err(SdkError::RegistrationFailed(
340 "Auto re-register failed (exhausted retries)".to_string(),
341 ));
342 }
343
344 std::thread::sleep(Duration::from_millis(spec.retry_backoff_ms));
346 }
347 }
348
349 pub fn is_running(&self) -> bool {
351 self.running.load(Ordering::Relaxed)
352 }
353}
354
355impl Drop for HeartbeatService {
356 fn drop(&mut self) {
357 self.stop();
358 }
359}
360
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a server reply with the given `status` and `body.error` text.
    fn reply(status: serde_json::Value, error: &str) -> serde_json::Value {
        serde_json::json!({ "status": status, "body": { "error": error } })
    }

    #[test]
    fn test_heartbeat_service_lifecycle() {
        let runtime = Arc::new(Runtime::new().expect("failed to build tokio runtime"));
        let socket = Arc::new(Mutex::new(<DealerSocket as zeromq::Socket>::new()));
        let mut service = HeartbeatService::new(
            "test-agent".to_string(),
            socket,
            runtime.handle().clone(),
            Some(Arc::clone(&runtime)),
            30.0,
        );

        assert!(!service.is_running());
        // Stopping a service that was never started must be a harmless no-op.
        service.stop();
        assert!(!service.is_running());
    }

    #[test]
    fn detects_agent_not_registered() {
        assert!(HeartbeatService::is_agent_not_registered(&reply(
            serde_json::json!(404),
            "Agent test-agent not found in registry",
        )));
        assert!(HeartbeatService::is_agent_not_registered(&reply(
            serde_json::json!(404),
            "not found",
        )));
    }

    #[test]
    fn ignores_other_rejections() {
        // Wrong status code.
        assert!(!HeartbeatService::is_agent_not_registered(&reply(
            serde_json::json!(500),
            "not found",
        )));
        // Right status code, unrelated error.
        assert!(!HeartbeatService::is_agent_not_registered(&reply(
            serde_json::json!(404),
            "bad request",
        )));
        // A string status is not treated as the number 404.
        assert!(!HeartbeatService::is_agent_not_registered(&reply(
            serde_json::json!("404"),
            "not found",
        )));
        // No body at all.
        assert!(!HeartbeatService::is_agent_not_registered(
            &serde_json::json!({ "status": 404 })
        ));
    }
}