1use crate::{AmiError, AmiEvent, Manager, ManagerOptions};
54use serde::{Deserialize, Serialize};
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::sync::Arc;
57use std::time::{Duration, Instant};
58use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
59use tokio_stream::{Stream, StreamExt};
60
/// Seconds slept after a failed reconnection attempt once the current retry cycle
/// has exceeded `ResilientOptions::max_retries` immediate attempts; after the sleep
/// the cycle counter resets and immediate retries start again.
const RECONNECTION_DELAY_SECONDS: u64 = 5;
63
/// Counters describing the reconnection behaviour of resilient event streams.
///
/// Every field is an `Arc<AtomicU64>`, so cloning a `ResilientMetrics` yields a
/// handle onto the *same* counters: updates made through any clone are visible
/// through all of them. All accesses use `Ordering::Relaxed`, so the counters are
/// independent of each other and a [`ResilientMetrics::snapshot`] is not an atomic
/// view across fields.
#[derive(Debug, Clone, Default)]
pub struct ResilientMetrics {
    /// Incremented by [`ResilientMetrics::record_reconnection_attempt`].
    pub reconnection_attempts: Arc<AtomicU64>,
    /// Incremented by [`ResilientMetrics::record_successful_reconnection`].
    pub successful_reconnections: Arc<AtomicU64>,
    /// Incremented by [`ResilientMetrics::record_failed_reconnection`].
    pub failed_reconnections: Arc<AtomicU64>,
    /// Incremented by [`ResilientMetrics::record_connection_lost`].
    pub connection_lost_events: Arc<AtomicU64>,
    /// Duration (whole milliseconds) passed to the most recent
    /// [`ResilientMetrics::record_successful_reconnection`] call.
    pub last_reconnection_duration_ms: Arc<AtomicU64>,
}

impl ResilientMetrics {
    /// Creates a fresh set of counters, all starting at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records that a reconnection attempt has started.
    pub fn record_reconnection_attempt(&self) {
        self.reconnection_attempts.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a successful reconnection that took `duration`.
    ///
    /// The duration is stored in whole milliseconds. Values that do not fit in a
    /// `u64` are clamped to `u64::MAX` instead of silently wrapping.
    pub fn record_successful_reconnection(&self, duration: Duration) {
        self.successful_reconnections.fetch_add(1, Ordering::Relaxed);
        // `as_millis` returns u128; clamp before narrowing so huge values saturate.
        let millis = duration.as_millis().min(u128::from(u64::MAX)) as u64;
        self.last_reconnection_duration_ms
            .store(millis, Ordering::Relaxed);
    }

    /// Records a failed reconnection attempt.
    pub fn record_failed_reconnection(&self) {
        self.failed_reconnections.fetch_add(1, Ordering::Relaxed);
    }

    /// Records that a connection-lost notification was observed.
    pub fn record_connection_lost(&self) {
        self.connection_lost_events.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the current counter values as
    /// `(reconnection_attempts, successful_reconnections, failed_reconnections,
    /// connection_lost_events, last_reconnection_duration_ms)`.
    ///
    /// Each value is loaded separately, so concurrent updates may be observed
    /// partially.
    #[must_use]
    pub fn snapshot(&self) -> (u64, u64, u64, u64, u64) {
        (
            self.reconnection_attempts.load(Ordering::Relaxed),
            self.successful_reconnections.load(Ordering::Relaxed),
            self.failed_reconnections.load(Ordering::Relaxed),
            self.connection_lost_events.load(Ordering::Relaxed),
            self.last_reconnection_duration_ms.load(Ordering::Relaxed),
        )
    }
}
109
/// Configuration for [`connect_resilient`] and [`infinite_events_stream`].
///
/// `metrics` and `cumulative_attempts_counter` are runtime handles marked
/// `#[serde(skip)]`: they are never serialized and always deserialize as `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResilientOptions {
    /// Connection and login parameters for the AMI manager.
    pub manager_options: ManagerOptions,
    /// Passed to `Manager::new_with_buffer` when a manager is created.
    pub buffer_size: usize,
    /// Whether `connect_resilient` starts the manager's heartbeat.
    pub enable_heartbeat: bool,
    /// Whether `connect_resilient` starts the manager's watchdog.
    pub enable_watchdog: bool,
    /// Passed to `start_heartbeat_with_interval`.
    /// NOTE(review): presumably seconds — confirm against `Manager`.
    pub heartbeat_interval: u64,
    /// Passed to `start_watchdog_with_interval`.
    /// NOTE(review): presumably seconds — confirm against `Manager`.
    pub watchdog_interval: u64,
    /// Immediate reconnection attempts per retry cycle in the infinite event
    /// stream. A failure of the attempt after these is followed by a fixed delay,
    /// and then the cycle restarts.
    pub max_retries: u32,
    /// Optional metrics updated by the infinite event stream. Not serialized.
    #[serde(skip)]
    pub metrics: Option<ResilientMetrics>,
    /// Optional reconnection-attempt counter. Because it is an `Arc`, streams built
    /// from clones of these options share it. When `None`, each stream keeps a
    /// private counter. Not serialized.
    #[serde(skip)]
    pub cumulative_attempts_counter: Option<Arc<AtomicU64>>,
}
137
138impl ResilientOptions {
139 pub fn with_global_counter(mut self) -> Self {
142 self.cumulative_attempts_counter = Some(Arc::new(AtomicU64::new(0)));
143 self
144 }
145}
146
147impl Default for ResilientOptions {
148 fn default() -> Self {
149 Self {
150 manager_options: ManagerOptions {
151 port: 5038,
152 host: "127.0.0.1".to_string(),
153 username: "admin".to_string(),
154 password: "admin".to_string(),
155 events: true,
156 },
157 buffer_size: 2048,
158 enable_heartbeat: true,
159 enable_watchdog: true,
160 heartbeat_interval: 30,
161 watchdog_interval: 1,
162 max_retries: 3,
163 metrics: None,
164 cumulative_attempts_counter: None,
165 }
166 }
167}
168
169pub async fn connect_resilient(options: ResilientOptions) -> Result<Manager, AmiError> {
174 let mut manager = Manager::new_with_buffer(options.buffer_size);
175 let instance_id = manager.inner.lock().await.instance_id.clone();
176
177 log::debug!(
178 "[{}] Connecting resilient AMI manager to '{}'@{}:{}",
179 instance_id,
180 options.manager_options.username,
181 options.manager_options.host,
182 options.manager_options.port
183 );
184
185 manager
187 .connect_and_login(options.manager_options.clone())
188 .await?;
189
190 if options.enable_heartbeat {
192 log::debug!("[{instance_id}] Starting heartbeat for resilient AMI manager");
193 manager
194 .start_heartbeat_with_interval(options.heartbeat_interval)
195 .await?;
196 }
197
198 if options.enable_watchdog {
200 log::debug!("[{instance_id}] Starting watchdog for resilient AMI manager");
201 manager
202 .start_watchdog_with_interval(options.manager_options, options.watchdog_interval)
203 .await?;
204 }
205
206 log::debug!("[{instance_id}] Resilient AMI manager connected successfully");
207 Ok(manager)
208}
209
210pub async fn infinite_events_stream(
216 options: ResilientOptions,
217) -> Result<impl Stream<Item = Result<AmiEvent, AmiError>>, AmiError> {
218 let manager = connect_resilient(options.clone()).await?;
219 Ok(create_infinite_stream(manager, options))
220}
221
222fn create_infinite_stream(
223 manager: Manager,
224 options: ResilientOptions,
225) -> impl Stream<Item = Result<AmiEvent, AmiError>> {
226 async_stream::stream! {
227 let stream_id = uuid::Uuid::new_v4().to_string()[..8].to_string();
229 log::debug!("Creating new resilient stream instance [{}]", stream_id);
230
231 let mut current_manager = manager;
232 let mut retry_count = 0;
233
234 let cumulative_counter = options.cumulative_attempts_counter
236 .clone()
237 .unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
238
239 loop {
240 let mut event_stream = current_manager.all_events_stream().await;
241
242 loop {
243 match event_stream.next().await {
244 Some(Ok(event)) => {
245 retry_count = 0;
247
248 if let AmiEvent::InternalConnectionLost { .. } = &event {
250 if let Some(ref metrics) = options.metrics {
252 metrics.record_connection_lost();
253 }
254 yield Ok(event);
256 break;
257 } else {
258 yield Ok(event);
259 }
260 }
261 Some(Err(BroadcastStreamRecvError::Lagged(count))) => {
262 log::debug!("Event stream lagged by {count} events, resubscribing");
263 event_stream = current_manager.all_events_stream().await;
265 continue;
266 }
267 None => {
268 log::debug!("Event stream ended, attempting reconnection");
270 break;
271 }
272 }
273 }
274
275 'reconnect_loop: loop {
277 let reconnection_start = Instant::now();
279
280 if let Some(ref metrics) = options.metrics {
282 metrics.record_reconnection_attempt();
283 }
284
285 retry_count += 1;
287 let cumulative_attempts = cumulative_counter.fetch_add(1, Ordering::SeqCst) + 1;
288
289 let retry_delay = if retry_count <= options.max_retries {
291 0 } else {
293 RECONNECTION_DELAY_SECONDS };
295
296 log::debug!(
298 "[{}] Reconnection attempt #{}, cumulative #{}, delay {}s (max_retries={})",
299 stream_id,
300 retry_count,
301 cumulative_attempts,
302 retry_delay,
303 options.max_retries
304 );
305
306 log::debug!("[{stream_id}] Attempting to reconnect now (attempt #{retry_count}, cumulative #{cumulative_attempts})");
308 match connect_resilient(options.clone()).await {
309 Ok(new_manager) => {
310 let reconnection_duration = reconnection_start.elapsed();
311 if let Some(ref metrics) = options.metrics {
313 metrics.record_successful_reconnection(reconnection_duration);
314 }
315 log::debug!(
316 "[{}] Successfully reconnected to AMI on attempt #{}, cumulative #{} (total time since first failure: {:.1}s)",
317 stream_id,
318 retry_count,
319 cumulative_attempts,
320 reconnection_duration.as_secs_f64()
321 );
322 current_manager = new_manager;
323 retry_count = 0;
324 break 'reconnect_loop; }
327 Err(e) => {
328 if let Some(ref metrics) = options.metrics {
330 metrics.record_failed_reconnection();
331 }
332 log::debug!(
333 "[{stream_id}] Reconnection attempt #{retry_count} failed: {e}, retrying in {retry_delay} seconds"
334 );
335
336 if retry_delay > 0 {
338 let sleep_start = std::time::Instant::now();
339 tokio::time::sleep(Duration::from_secs(retry_delay)).await;
340 let sleep_duration = sleep_start.elapsed();
341 log::debug!(
342 "[{}] Retry delay completed after {:.1}s, starting next attempt...",
343 stream_id,
344 sleep_duration.as_secs_f64()
345 );
346 }
347
348 if retry_count > options.max_retries {
350 log::debug!(
351 "[{}] Completed retry cycle ({} immediate + 1 delayed), resetting counter",
352 stream_id,
353 options.max_retries
354 );
355 retry_count = 0;
356 }
357
358 log::trace!("[{stream_id}] Continuing reconnection loop for next attempt");
360 }
361 }
362 } } } }
366
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_resilient_options_default() {
        let opts = ResilientOptions::default();
        assert_eq!(opts.manager_options.host, "127.0.0.1");
        assert_eq!(opts.manager_options.port, 5038);
        assert_eq!(opts.buffer_size, 2048);
        assert!(opts.enable_heartbeat);
        assert!(opts.enable_watchdog);
        assert_eq!(opts.heartbeat_interval, 30);
        assert_eq!(opts.watchdog_interval, 1);
        assert_eq!(opts.max_retries, 3);
        assert!(opts.metrics.is_none());
        assert!(opts.cumulative_attempts_counter.is_none());
    }

    #[test]
    fn test_resilient_options_clone() {
        let opts = ResilientOptions {
            manager_options: ManagerOptions {
                port: 5038,
                host: "localhost".to_string(),
                username: "test".to_string(),
                password: "test".to_string(),
                events: true,
            },
            buffer_size: 1024,
            enable_heartbeat: false,
            enable_watchdog: false,
            heartbeat_interval: 60,
            watchdog_interval: 2,
            max_retries: 3,
            metrics: None,
            cumulative_attempts_counter: None,
        };
        let opts2 = opts.clone();
        assert_eq!(opts.buffer_size, opts2.buffer_size);
        assert_eq!(opts.enable_heartbeat, opts2.enable_heartbeat);
        assert_eq!(opts.enable_watchdog, opts2.enable_watchdog);
    }

    #[test]
    fn test_cloned_options_share_metrics_and_counter() {
        let opts = ResilientOptions {
            metrics: Some(ResilientMetrics::new()),
            ..ResilientOptions::default()
        }
        .with_global_counter();
        let clone = opts.clone();

        // Metrics are Arc-backed, so updates through a clone are visible in the original.
        clone
            .metrics
            .as_ref()
            .expect("metrics set above")
            .record_connection_lost();
        assert_eq!(
            opts.metrics.as_ref().expect("metrics set above").snapshot().3,
            1
        );

        let original_counter = opts
            .cumulative_attempts_counter
            .as_ref()
            .expect("counter installed by with_global_counter");
        let cloned_counter = clone
            .cumulative_attempts_counter
            .as_ref()
            .expect("counter installed by with_global_counter");
        assert!(Arc::ptr_eq(original_counter, cloned_counter));
        assert_eq!(original_counter.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn test_metrics_record_and_snapshot() {
        let metrics = ResilientMetrics::new();
        assert_eq!(metrics.snapshot(), (0, 0, 0, 0, 0));

        metrics.record_reconnection_attempt();
        metrics.record_reconnection_attempt();
        metrics.record_failed_reconnection();
        metrics.record_successful_reconnection(Duration::from_millis(1500));
        metrics.record_connection_lost();

        // Order: attempts, successes, failures, connection-lost events, last duration (ms).
        assert_eq!(metrics.snapshot(), (2, 1, 1, 1, 1500));
    }

    #[test]
    fn test_resilient_options_serialization() {
        let opts = ResilientOptions::default();

        let json = serde_json::to_string(&opts).expect("Failed to serialize");
        let deserialized: ResilientOptions =
            serde_json::from_str(&json).expect("Failed to deserialize");

        assert_eq!(opts.buffer_size, deserialized.buffer_size);
        assert_eq!(opts.enable_heartbeat, deserialized.enable_heartbeat);
        assert_eq!(opts.enable_watchdog, deserialized.enable_watchdog);
        assert_eq!(opts.heartbeat_interval, deserialized.heartbeat_interval);
        assert_eq!(opts.watchdog_interval, deserialized.watchdog_interval);
        assert_eq!(opts.max_retries, deserialized.max_retries);
    }

    #[test]
    fn test_serialization_skips_runtime_handles() {
        let opts = ResilientOptions {
            metrics: Some(ResilientMetrics::new()),
            ..ResilientOptions::default()
        }
        .with_global_counter();

        let json = serde_json::to_string(&opts).expect("Failed to serialize");
        assert!(!json.contains("\"metrics\""));
        assert!(!json.contains("\"cumulative_attempts_counter\""));

        let deserialized: ResilientOptions =
            serde_json::from_str(&json).expect("Failed to deserialize");
        assert!(deserialized.metrics.is_none());
        assert!(deserialized.cumulative_attempts_counter.is_none());
    }

    #[test]
    fn test_max_retries_behavior() {
        let opts = ResilientOptions {
            manager_options: ManagerOptions {
                port: 5038,
                host: "test".to_string(),
                username: "test".to_string(),
                password: "test".to_string(),
                events: true,
            },
            buffer_size: 1024,
            enable_heartbeat: false,
            enable_watchdog: false,
            heartbeat_interval: 30,
            watchdog_interval: 1,
            max_retries: 5,
            metrics: None,
            cumulative_attempts_counter: None,
        };

        assert_eq!(opts.max_retries, 5);

        let default_opts = ResilientOptions::default();
        assert_eq!(default_opts.max_retries, 3);
    }

    #[tokio::test]
    async fn test_connect_resilient_invalid_connection() {
        // `.invalid` is a reserved TLD, so name resolution is expected to fail.
        let opts = ResilientOptions {
            manager_options: ManagerOptions {
                port: 65534,
                host: "nonexistent.invalid".to_string(),
                username: "test".to_string(),
                password: "test".to_string(),
                events: true,
            },
            buffer_size: 1024,
            enable_heartbeat: false,
            enable_watchdog: false,
            heartbeat_interval: 30,
            watchdog_interval: 1,
            max_retries: 3,
            metrics: None,
            cumulative_attempts_counter: None,
        };

        let result = connect_resilient(opts).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_infinite_events_stream_creation() {
        // The initial connection must succeed for a stream to be returned.
        let opts = ResilientOptions {
            manager_options: ManagerOptions {
                port: 65535,
                host: "nonexistent.invalid".to_string(),
                username: "test".to_string(),
                password: "test".to_string(),
                events: true,
            },
            buffer_size: 1024,
            enable_heartbeat: false,
            enable_watchdog: false,
            heartbeat_interval: 30,
            watchdog_interval: 1,
            max_retries: 3,
            metrics: None,
            cumulative_attempts_counter: None,
        };

        let result = infinite_events_stream(opts).await;
        assert!(result.is_err());
    }
}