1use crate::{AmiError, AmiEvent, Manager, ManagerOptions};
54use serde::{Deserialize, Serialize};
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::sync::Arc;
57use std::time::{Duration, Instant};
58use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
59use tokio_stream::{Stream, StreamExt};
60
/// Pause, in seconds, applied after a failed reconnection attempt once the
/// immediate (zero-delay) retries of the current cycle have been used up.
/// The value is fed to `Duration::from_secs` by the reconnection loop.
const RECONNECTION_DELAY_SECONDS: u64 = 5;
63
/// Shared counters describing the reconnection activity of a resilient stream.
///
/// Every field is an `Arc<AtomicU64>`, so cloning a `ResilientMetrics` produces
/// another handle onto the *same* counters rather than an independent copy.
/// All updates and reads use `Ordering::Relaxed`; the counters are meant for
/// statistics only and do not synchronise any other memory.
#[derive(Debug, Clone, Default)]
pub struct ResilientMetrics {
    /// Number of reconnection attempts that were started.
    pub reconnection_attempts: Arc<AtomicU64>,
    /// Number of reconnection attempts that succeeded.
    pub successful_reconnections: Arc<AtomicU64>,
    /// Number of reconnection attempts that failed.
    pub failed_reconnections: Arc<AtomicU64>,
    /// Number of connection-lost notifications that were recorded.
    pub connection_lost_events: Arc<AtomicU64>,
    /// Duration, in milliseconds, passed to the most recent
    /// [`record_successful_reconnection`](Self::record_successful_reconnection) call.
    pub last_reconnection_duration_ms: Arc<AtomicU64>,
}

impl ResilientMetrics {
    /// Creates a metrics handle with every counter set to zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Counts one started reconnection attempt.
    pub fn record_reconnection_attempt(&self) {
        self.reconnection_attempts.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one successful reconnection and remembers how long it took.
    ///
    /// `duration` is stored in whole milliseconds. Values that do not fit in a
    /// `u64` are clamped to `u64::MAX` instead of being silently wrapped.
    pub fn record_successful_reconnection(&self, duration: Duration) {
        self.successful_reconnections
            .fetch_add(1, Ordering::Relaxed);

        // `as_millis` returns a u128; after clamping, the narrowing cast is lossless.
        let clamped_millis = duration.as_millis().min(u128::from(u64::MAX)) as u64;
        self.last_reconnection_duration_ms
            .store(clamped_millis, Ordering::Relaxed);
    }

    /// Counts one failed reconnection attempt.
    pub fn record_failed_reconnection(&self) {
        self.failed_reconnections.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one connection-lost notification.
    pub fn record_connection_lost(&self) {
        self.connection_lost_events.fetch_add(1, Ordering::Relaxed);
    }

    /// Reads all counters.
    ///
    /// The tuple is ordered as `(reconnection_attempts, successful_reconnections,
    /// failed_reconnections, connection_lost_events, last_reconnection_duration_ms)`.
    /// Each counter is loaded separately, so the tuple is not an atomic snapshot
    /// of all five values taken together.
    pub fn snapshot(&self) -> (u64, u64, u64, u64, u64) {
        (
            self.reconnection_attempts.load(Ordering::Relaxed),
            self.successful_reconnections.load(Ordering::Relaxed),
            self.failed_reconnections.load(Ordering::Relaxed),
            self.connection_lost_events.load(Ordering::Relaxed),
            self.last_reconnection_duration_ms.load(Ordering::Relaxed),
        )
    }
}
109
/// Configuration for establishing a resilient AMI connection and for the
/// self-reconnecting event stream built on top of it.
///
/// The two `#[serde(skip)]` fields are runtime-only handles: they are left out
/// when serialising and come back as `None` when deserialising.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResilientOptions {
    /// Connection and login settings handed to the `Manager`.
    pub manager_options: ManagerOptions,
    /// Buffer size passed to `Manager::new_with_buffer`.
    pub buffer_size: usize,
    /// When `true`, `connect_resilient` starts the manager's heartbeat.
    pub enable_heartbeat: bool,
    /// When `true`, `connect_resilient` starts the manager's watchdog.
    pub enable_watchdog: bool,
    /// Interval passed to `start_heartbeat_with_interval`.
    /// NOTE(review): unit is not visible here — presumably seconds; confirm against `Manager`.
    pub heartbeat_interval: u64,
    /// Interval passed to `start_watchdog_with_interval`.
    /// NOTE(review): unit is not visible here — presumably seconds; confirm against `Manager`.
    pub watchdog_interval: u64,
    /// Number of reconnection attempts per cycle that the event stream makes
    /// without any delay; the attempt after those is followed by a fixed pause
    /// if it fails.
    pub max_retries: u32,
    /// Optional metrics handle updated by the event stream while reconnecting.
    #[serde(skip)]
    pub metrics: Option<ResilientMetrics>,
    /// Optional counter incremented once per reconnection attempt. When `None`,
    /// the event stream creates a private counter of its own.
    #[serde(skip)]
    pub cumulative_attempts_counter: Option<Arc<AtomicU64>>,
}
137
138impl ResilientOptions {
139 pub fn with_global_counter(mut self) -> Self {
142 self.cumulative_attempts_counter = Some(Arc::new(AtomicU64::new(0)));
143 self
144 }
145}
146
147impl Default for ResilientOptions {
148 fn default() -> Self {
149 Self {
150 manager_options: ManagerOptions {
151 port: 5038,
152 host: "127.0.0.1".to_string(),
153 username: "admin".to_string(),
154 password: "admin".to_string(),
155 events: true,
156 },
157 buffer_size: 2048,
158 enable_heartbeat: true,
159 enable_watchdog: true,
160 heartbeat_interval: 30,
161 watchdog_interval: 1,
162 max_retries: 3,
163 metrics: None,
164 cumulative_attempts_counter: None,
165 }
166 }
167}
168
169pub async fn connect_resilient(options: ResilientOptions) -> Result<Manager, AmiError> {
174 let mut manager = Manager::new_with_buffer(options.buffer_size);
175
176 manager
178 .connect_and_login(options.manager_options.clone())
179 .await?;
180
181 if options.enable_heartbeat {
183 manager
184 .start_heartbeat_with_interval(options.heartbeat_interval)
185 .await?;
186 }
187
188 if options.enable_watchdog {
190 manager
191 .start_watchdog_with_interval(options.manager_options, options.watchdog_interval)
192 .await?;
193 }
194
195 Ok(manager)
196}
197
198pub async fn infinite_events_stream(
204 options: ResilientOptions,
205) -> Result<impl Stream<Item = Result<AmiEvent, AmiError>>, AmiError> {
206 let manager = connect_resilient(options.clone()).await?;
207 Ok(create_infinite_stream(manager, options))
208}
209
210fn create_infinite_stream(
211 manager: Manager,
212 options: ResilientOptions,
213) -> impl Stream<Item = Result<AmiEvent, AmiError>> {
214 async_stream::stream! {
215 let stream_id = uuid::Uuid::new_v4().to_string()[..8].to_string();
217 log::debug!("Creating new resilient stream instance [{}]", stream_id);
218
219 let mut current_manager = manager;
220 let mut retry_count = 0;
221
222 let cumulative_counter = options.cumulative_attempts_counter
224 .clone()
225 .unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
226
227 loop {
228 let mut event_stream = current_manager.all_events_stream().await;
229
230 loop {
231 match event_stream.next().await {
232 Some(Ok(event)) => {
233 retry_count = 0;
235
236 if let AmiEvent::InternalConnectionLost { .. } = &event {
238 if let Some(ref metrics) = options.metrics {
240 metrics.record_connection_lost();
241 }
242 yield Ok(event);
244 break;
245 } else {
246 yield Ok(event);
247 }
248 }
249 Some(Err(BroadcastStreamRecvError::Lagged(count))) => {
250 log::debug!("Event stream lagged by {} events, resubscribing", count);
251 event_stream = current_manager.all_events_stream().await;
253 continue;
254 }
255 None => {
256 log::debug!("Event stream ended, attempting reconnection");
258 break;
259 }
260 }
261 }
262
263 'reconnect_loop: loop {
265 let reconnection_start = Instant::now();
267
268 if let Some(ref metrics) = options.metrics {
270 metrics.record_reconnection_attempt();
271 }
272
273 retry_count += 1;
275 let cumulative_attempts = cumulative_counter.fetch_add(1, Ordering::SeqCst) + 1;
276
277 let retry_delay = if retry_count <= options.max_retries {
279 0 } else {
281 RECONNECTION_DELAY_SECONDS };
283
284 log::debug!(
286 "[{}] Reconnection attempt #{}, cumulative #{}, delay {}s (max_retries={})",
287 stream_id,
288 retry_count,
289 cumulative_attempts,
290 retry_delay,
291 options.max_retries
292 );
293
294 log::debug!("[{}] Attempting to reconnect now (attempt #{}, cumulative #{})", stream_id, retry_count, cumulative_attempts);
296 match connect_resilient(options.clone()).await {
297 Ok(new_manager) => {
298 let reconnection_duration = reconnection_start.elapsed();
299 if let Some(ref metrics) = options.metrics {
301 metrics.record_successful_reconnection(reconnection_duration);
302 }
303 log::debug!(
304 "[{}] Successfully reconnected to AMI on attempt #{}, cumulative #{} (total time since first failure: {:.1}s)",
305 stream_id,
306 retry_count,
307 cumulative_attempts,
308 reconnection_duration.as_secs_f64()
309 );
310 current_manager = new_manager;
311 retry_count = 0;
312 break 'reconnect_loop; }
315 Err(e) => {
316 if let Some(ref metrics) = options.metrics {
318 metrics.record_failed_reconnection();
319 }
320 log::debug!(
321 "[{}] Reconnection attempt #{} failed: {}, retrying in {} seconds",
322 stream_id,
323 retry_count,
324 e,
325 retry_delay
326 );
327
328 if retry_delay > 0 {
330 let sleep_start = std::time::Instant::now();
331 tokio::time::sleep(Duration::from_secs(retry_delay)).await;
332 let sleep_duration = sleep_start.elapsed();
333 log::debug!(
334 "[{}] Retry delay completed after {:.1}s, starting next attempt...",
335 stream_id,
336 sleep_duration.as_secs_f64()
337 );
338 }
339
340 if retry_count > options.max_retries {
342 log::debug!(
343 "[{}] Completed retry cycle ({} immediate + 1 delayed), resetting counter",
344 stream_id,
345 options.max_retries
346 );
347 retry_count = 0;
348 }
349
350 log::trace!("[{}] Continuing reconnection loop for next attempt", stream_id);
352 }
353 }
354 } } } }
358
#[cfg(test)]
mod tests {
    use super::*;

    /// Manager settings shared by the fixtures: port 5038, `test`/`test`
    /// credentials, events enabled, and the given host.
    fn manager_options_for(host: &str) -> ManagerOptions {
        ManagerOptions {
            port: 5038,
            host: host.to_string(),
            username: "test".to_string(),
            password: "test".to_string(),
            events: true,
        }
    }

    /// Options with heartbeat and watchdog disabled, a 1024-entry buffer and
    /// three immediate retries; individual tests override what they need.
    fn quiet_options(manager_options: ManagerOptions) -> ResilientOptions {
        ResilientOptions {
            manager_options,
            buffer_size: 1024,
            enable_heartbeat: false,
            enable_watchdog: false,
            heartbeat_interval: 30,
            watchdog_interval: 1,
            max_retries: 3,
            metrics: None,
            cumulative_attempts_counter: None,
        }
    }

    /// The documented defaults are what `Default` actually produces.
    #[test]
    fn test_resilient_options_default() {
        let defaults = ResilientOptions::default();
        assert_eq!(defaults.buffer_size, 2048);
        assert!(defaults.enable_heartbeat);
        assert!(defaults.enable_watchdog);
        assert_eq!(defaults.heartbeat_interval, 30);
        assert_eq!(defaults.watchdog_interval, 1);
    }

    /// Cloning keeps the plain configuration values intact.
    #[test]
    fn test_resilient_options_clone() {
        let original = ResilientOptions {
            heartbeat_interval: 60,
            watchdog_interval: 2,
            ..quiet_options(manager_options_for("localhost"))
        };
        let copy = original.clone();
        assert_eq!(original.buffer_size, copy.buffer_size);
        assert_eq!(original.enable_heartbeat, copy.enable_heartbeat);
        assert_eq!(original.enable_watchdog, copy.enable_watchdog);
    }

    /// A JSON round trip preserves the serialisable settings.
    #[test]
    fn test_resilient_options_serialization() {
        let original = ResilientOptions::default();

        let json = serde_json::to_string(&original).expect("Failed to serialize");
        let restored: ResilientOptions =
            serde_json::from_str(&json).expect("Failed to deserialize");

        assert_eq!(original.buffer_size, restored.buffer_size);
        assert_eq!(original.enable_heartbeat, restored.enable_heartbeat);
        assert_eq!(original.enable_watchdog, restored.enable_watchdog);
        assert_eq!(original.heartbeat_interval, restored.heartbeat_interval);
        assert_eq!(original.watchdog_interval, restored.watchdog_interval);
    }

    /// `max_retries` holds an explicit value and defaults to three.
    #[test]
    fn test_max_retries_behavior() {
        let custom = ResilientOptions {
            max_retries: 5,
            ..quiet_options(manager_options_for("test"))
        };

        assert_eq!(custom.max_retries, 5);

        let defaults = ResilientOptions::default();
        assert_eq!(defaults.max_retries, 3);
    }

    /// Connecting to an unresolvable host reports an error.
    #[tokio::test]
    async fn test_connect_resilient_invalid_connection() {
        let unreachable = quiet_options(ManagerOptions {
            port: 65534,
            ..manager_options_for("nonexistent.invalid")
        });

        let outcome = connect_resilient(unreachable).await;
        assert!(outcome.is_err());
    }

    /// Stream creation fails up front when the initial connection fails.
    #[tokio::test]
    async fn test_infinite_events_stream_creation() {
        let unreachable = quiet_options(ManagerOptions {
            port: 65535,
            ..manager_options_for("nonexistent.invalid")
        });

        let outcome = infinite_events_stream(unreachable).await;
        assert!(outcome.is_err());
    }
}