asterisk_manager/
resilient.rs

//! Resilient AMI connection module
//!
//! This module provides resilient connection management for Asterisk AMI,
//! including automatic reconnection, heartbeat monitoring, and infinite event streams.
//!
//! # Example Usage
//!
//! ```rust,no_run
//! use asterisk_manager::resilient::{ResilientOptions, connect_resilient, infinite_events_stream};
//! use asterisk_manager::ManagerOptions;
//! use tokio_stream::StreamExt;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let resilient_options = ResilientOptions {
//!         manager_options: ManagerOptions {
//!             port: 5038,
//!             host: "127.0.0.1".to_string(),
//!             username: "admin".to_string(),
//!             password: "password".to_string(),
//!             events: true,
//!         },
//!         buffer_size: 2048,
//!         enable_heartbeat: true,
//!         enable_watchdog: true,
//!         heartbeat_interval: 30,
//!         watchdog_interval: 1,
//!         max_retries: 3,
//!         metrics: None,
//!         cumulative_attempts_counter: None,
//!     };
//!
//!     // Option 1: Connect with resilient features
//!     let manager = connect_resilient(resilient_options.clone()).await?;
//!
//!     // Option 2: Create an infinite event stream that handles reconnection
//!     let mut event_stream = infinite_events_stream(resilient_options).await?;
//!
//!     // Pin the stream for polling
//!     tokio::pin!(event_stream);
//!
//!     while let Some(event_result) = event_stream.next().await {
//!         match event_result {
//!             Ok(event) => println!("Received event: {:?}", event),
//!             Err(e) => println!("Error receiving event: {:?}", e),
//!         }
//!     }
//!
//!     Ok(())
//! }
//! ```

use crate::{AmiError, AmiEvent, Manager, ManagerOptions};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::{Stream, StreamExt};

/// Fixed delay in seconds between reconnection attempts after exceeding `max_retries`
const RECONNECTION_DELAY_SECONDS: u64 = 5;

/// Simple metrics for resilient connections (optional instrumentation)
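///
/// A minimal usage sketch (only items defined in this module; the tuple order
/// matches [`ResilientMetrics::snapshot`]):
///
/// ```rust,no_run
/// use asterisk_manager::resilient::{ResilientMetrics, ResilientOptions};
///
/// let metrics = ResilientMetrics::new();
/// let options = ResilientOptions {
///     metrics: Some(metrics.clone()), // clones share the same atomic counters
///     ..ResilientOptions::default()
/// };
/// assert!(options.metrics.is_some());
/// // After streams built from `options` have run for a while, read a snapshot:
/// let (attempts, ok, failed, lost, last_ms) = metrics.snapshot();
/// println!("attempts={attempts} ok={ok} failed={failed} lost={lost} last={last_ms}ms");
/// ```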
#[derive(Debug, Clone, Default)]
pub struct ResilientMetrics {
    pub reconnection_attempts: Arc<AtomicU64>,
    pub successful_reconnections: Arc<AtomicU64>,
    pub failed_reconnections: Arc<AtomicU64>,
    pub connection_lost_events: Arc<AtomicU64>,
    pub last_reconnection_duration_ms: Arc<AtomicU64>,
}

impl ResilientMetrics {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn record_reconnection_attempt(&self) {
        self.reconnection_attempts.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_successful_reconnection(&self, duration: Duration) {
        self.successful_reconnections
            .fetch_add(1, Ordering::Relaxed);
        self.last_reconnection_duration_ms
            .store(duration.as_millis() as u64, Ordering::Relaxed);
    }

    pub fn record_failed_reconnection(&self) {
        self.failed_reconnections.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_connection_lost(&self) {
        self.connection_lost_events.fetch_add(1, Ordering::Relaxed);
    }

    /// Get the current metrics as a snapshot: `(reconnection_attempts,
    /// successful_reconnections, failed_reconnections, connection_lost_events,
    /// last_reconnection_duration_ms)`
    pub fn snapshot(&self) -> (u64, u64, u64, u64, u64) {
        (
            self.reconnection_attempts.load(Ordering::Relaxed),
            self.successful_reconnections.load(Ordering::Relaxed),
            self.failed_reconnections.load(Ordering::Relaxed),
            self.connection_lost_events.load(Ordering::Relaxed),
            self.last_reconnection_duration_ms.load(Ordering::Relaxed),
        )
    }
}

/// Configuration options for resilient AMI connections
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResilientOptions {
    /// Basic connection options
    pub manager_options: ManagerOptions,
    /// Buffer size for event broadcaster (default: 2048)
    pub buffer_size: usize,
    /// Enable heartbeat monitoring (default: true)
    pub enable_heartbeat: bool,
    /// Enable automatic reconnection watchdog (default: true)
    pub enable_watchdog: bool,
    /// Heartbeat interval in seconds (default: 30)
    pub heartbeat_interval: u64,
    /// Watchdog check interval in seconds (default: 1)
    pub watchdog_interval: u64,
    /// Controls the reconnection retry cycle. Attempts run back-to-back while the
    /// per-cycle attempt counter is at or below `max_retries`; once an attempt beyond
    /// that fails, the fixed `RECONNECTION_DELAY_SECONDS` delay is applied, the counter
    /// resets, and the cycle repeats. Default: 3.
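    /// For example, with `max_retries = 3` a prolonged outage produces attempts
    /// #1-#4 back-to-back, then a 5-second pause, then attempts #5-#8, and so on
    /// until a connection succeeds (see `create_infinite_stream` below).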
    pub max_retries: u32,
    /// Optional metrics collection for monitoring reconnection behavior
    #[serde(skip)]
    pub metrics: Option<ResilientMetrics>,
    /// Global cumulative attempts counter shared across stream instances
    #[serde(skip)]
    pub cumulative_attempts_counter: Option<Arc<AtomicU64>>,
}

impl ResilientOptions {
    /// Attach a fresh cumulative attempts counter to these options. Because the counter
    /// is stored in an `Arc`, cloned options share it, so cumulative attempt counts
    /// persist across stream recreation.
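    ///
    /// A minimal sketch (only items defined in this module):
    ///
    /// ```rust,no_run
    /// use asterisk_manager::resilient::ResilientOptions;
    ///
    /// let options = ResilientOptions::default().with_global_counter();
    /// // Clones share the same `Arc`'d counter, so streams built from either copy
    /// // accumulate into a single attempt total.
    /// let options_for_another_stream = options.clone();
    /// assert!(options_for_another_stream.cumulative_attempts_counter.is_some());
    /// ```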
    pub fn with_global_counter(mut self) -> Self {
        self.cumulative_attempts_counter = Some(Arc::new(AtomicU64::new(0)));
        self
    }
}

impl Default for ResilientOptions {
    fn default() -> Self {
        Self {
            manager_options: ManagerOptions {
                port: 5038,
                host: "127.0.0.1".to_string(),
                username: "admin".to_string(),
                password: "admin".to_string(),
                events: true,
            },
            buffer_size: 2048,
            enable_heartbeat: true,
            enable_watchdog: true,
            heartbeat_interval: 30,
            watchdog_interval: 1,
            max_retries: 3,
            metrics: None,
            cumulative_attempts_counter: None,
        }
    }
}

/// Connect a manager with resilient features enabled
///
/// This function creates a Manager instance with the specified buffer size,
/// connects it, and optionally starts heartbeat and watchdog monitoring.
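///
/// A minimal sketch (host and credentials come from `ResilientOptions::default()`):
///
/// ```rust,no_run
/// use asterisk_manager::resilient::{connect_resilient, ResilientOptions};
///
/// # async fn run() -> Result<(), asterisk_manager::AmiError> {
/// let options = ResilientOptions {
///     enable_heartbeat: true,
///     enable_watchdog: false, // connect once, without the reconnection watchdog
///     ..ResilientOptions::default()
/// };
/// let _manager = connect_resilient(options).await?;
/// # Ok(())
/// # }
/// ```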
pub async fn connect_resilient(options: ResilientOptions) -> Result<Manager, AmiError> {
    let mut manager = Manager::new_with_buffer(options.buffer_size);

    // Connect and login
    manager
        .connect_and_login(options.manager_options.clone())
        .await?;

    // Start heartbeat if enabled
    if options.enable_heartbeat {
        manager
            .start_heartbeat_with_interval(options.heartbeat_interval)
            .await?;
    }

    // Start watchdog if enabled
    if options.enable_watchdog {
        manager
            .start_watchdog_with_interval(options.manager_options, options.watchdog_interval)
            .await?;
    }

    Ok(manager)
}

/// Create an infinite events stream that automatically handles reconnection and lag
///
/// Once successfully created, this stream never ends on its own: it resubscribes to the
/// broadcast channel on lag errors and reconnects (recreating the underlying manager) on
/// connection losses. If the initial connection fails, no stream is created and an error
/// is returned.
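///
/// A minimal consumption sketch (only items defined in this crate); note that the
/// internal connection-lost marker is yielded before each reconnection cycle:
///
/// ```rust,no_run
/// use asterisk_manager::resilient::{infinite_events_stream, ResilientOptions};
/// use asterisk_manager::AmiEvent;
/// use tokio_stream::StreamExt;
///
/// # async fn run() -> Result<(), asterisk_manager::AmiError> {
/// let stream = infinite_events_stream(ResilientOptions::default()).await?;
/// tokio::pin!(stream);
/// while let Some(item) = stream.next().await {
///     match item {
///         // Yielded by the stream just before it starts reconnecting.
///         Ok(AmiEvent::InternalConnectionLost { .. }) => eprintln!("connection lost, reconnecting"),
///         Ok(event) => println!("{:?}", event),
///         Err(e) => eprintln!("stream error: {:?}", e),
///     }
/// }
/// # Ok(())
/// # }
/// ```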
pub async fn infinite_events_stream(
    options: ResilientOptions,
) -> Result<impl Stream<Item = Result<AmiEvent, AmiError>>, AmiError> {
    let manager = connect_resilient(options.clone()).await?;
    Ok(create_infinite_stream(manager, options))
}

fn create_infinite_stream(
    manager: Manager,
    options: ResilientOptions,
) -> impl Stream<Item = Result<AmiEvent, AmiError>> {
    async_stream::stream! {
        // Generate a unique ID for this stream instance to track lifecycle
        let stream_id = uuid::Uuid::new_v4().to_string()[..8].to_string();
        log::debug!("Creating new resilient stream instance [{}]", stream_id);

        let mut current_manager = manager;
        let mut retry_count = 0;

        // Use global counter if provided, otherwise create a local one
        let cumulative_counter = options.cumulative_attempts_counter
            .clone()
            .unwrap_or_else(|| Arc::new(AtomicU64::new(0)));

        loop {
            let mut event_stream = current_manager.all_events_stream().await;

            loop {
                match event_stream.next().await {
                    Some(Ok(event)) => {
                        // Reset retry count on successful event
                        retry_count = 0;

                        // Check for internal connection lost events
                        if let AmiEvent::InternalConnectionLost { .. } = &event {
                            // Record connection lost event in metrics if available
                            if let Some(ref metrics) = options.metrics {
                                metrics.record_connection_lost();
                            }
                            // Connection lost, break to outer loop to reconnect
                            yield Ok(event);
                            break;
                        } else {
                            yield Ok(event);
                        }
                    }
                    Some(Err(BroadcastStreamRecvError::Lagged(count))) => {
                        log::debug!("Event stream lagged by {} events, resubscribing", count);
                        // Stream lagged, resubscribe to get a fresh receiver
                        event_stream = current_manager.all_events_stream().await;
                        continue;
                    }
                    None => {
                        // Stream ended, try to reconnect
                        log::debug!("Event stream ended, attempting reconnection");
                        break;
                    }
                }
            }

            // Reconnection attempts with max_retries cycles
            'reconnect_loop: loop {
                // Start timing reconnection attempts
                let reconnection_start = Instant::now();

                // Record attempt in metrics if available
                if let Some(ref metrics) = options.metrics {
                    metrics.record_reconnection_attempt();
                }

                // Attempt reconnection
                retry_count += 1;
                let cumulative_attempts = cumulative_counter.fetch_add(1, Ordering::SeqCst) + 1;

                // Determine delay based on retry_count and max_retries
                let retry_delay = if retry_count <= options.max_retries {
                    0 // Immediate retry for the first max_retries attempts
                } else {
                    RECONNECTION_DELAY_SECONDS // Fixed delay after exceeding max_retries
                };

                // Log attempt info - use DEBUG for library internal logging
                log::debug!(
                    "[{}] Reconnection attempt #{}, cumulative #{}, delay {}s (max_retries={})",
                    stream_id,
                    retry_count,
                    cumulative_attempts,
                    retry_delay,
                    options.max_retries
                );

                // Try to reconnect and log result with attempt number
                log::debug!("[{}] Attempting to reconnect now (attempt #{}, cumulative #{})", stream_id, retry_count, cumulative_attempts);
                match connect_resilient(options.clone()).await {
                    Ok(new_manager) => {
                        let reconnection_duration = reconnection_start.elapsed();
                        // Record successful reconnection in metrics
                        if let Some(ref metrics) = options.metrics {
                            metrics.record_successful_reconnection(reconnection_duration);
                        }
                        log::debug!(
                            "[{}] Successfully reconnected to AMI on attempt #{}, cumulative #{} (attempt took {:.1}s)",
                            stream_id,
                            retry_count,
                            cumulative_attempts,
                            reconnection_duration.as_secs_f64()
                        );
                        current_manager = new_manager;
                        retry_count = 0;
                        // Note: we don't reset cumulative_attempts here to track total attempts across all failures
                        break 'reconnect_loop; // Exit reconnection loop, go back to event streaming
                    }
                    Err(e) => {
                        // Record failed reconnection in metrics
                        if let Some(ref metrics) = options.metrics {
                            metrics.record_failed_reconnection();
                        }
                        log::debug!(
                            "[{}] Reconnection attempt #{} failed: {}, retrying in {} seconds",
                            stream_id,
                            retry_count,
                            e,
                            retry_delay
                        );

                        // Apply delay if needed
                        if retry_delay > 0 {
                            let sleep_start = Instant::now();
                            tokio::time::sleep(Duration::from_secs(retry_delay)).await;
                            let sleep_duration = sleep_start.elapsed();
                            log::debug!(
                                "[{}] Retry delay completed after {:.1}s, starting next attempt...",
                                stream_id,
                                sleep_duration.as_secs_f64()
                            );
                        }

                        // Reset retry_count after a full cycle (max_retries + 1 attempts followed by the fixed delay)
                        if retry_count > options.max_retries {
                            log::debug!(
                                "[{}] Completed retry cycle ({} + 1 attempts), resetting counter",
                                stream_id,
                                options.max_retries
                            );
                            retry_count = 0;
                        }

                        // Continue the reconnection loop (will loop back to retry)
                        log::trace!("[{}] Continuing reconnection loop for next attempt", stream_id);
                    }
                }
            } // End of reconnection loop
        } // End of main event loop
    } // End of async_stream::stream!
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_resilient_options_default() {
        let opts = ResilientOptions::default();
        assert_eq!(opts.buffer_size, 2048);
        assert!(opts.enable_heartbeat);
        assert!(opts.enable_watchdog);
        assert_eq!(opts.heartbeat_interval, 30);
        assert_eq!(opts.watchdog_interval, 1);
    }

    #[test]
    fn test_resilient_options_clone() {
        let opts = ResilientOptions {
            manager_options: ManagerOptions {
                port: 5038,
                host: "localhost".to_string(),
                username: "test".to_string(),
                password: "test".to_string(),
                events: true,
            },
            buffer_size: 1024,
            enable_heartbeat: false,
            enable_watchdog: false,
            heartbeat_interval: 60,
            watchdog_interval: 2,
            max_retries: 3,
            metrics: None,
            cumulative_attempts_counter: None,
        };
        let opts2 = opts.clone();
        assert_eq!(opts.buffer_size, opts2.buffer_size);
        assert_eq!(opts.enable_heartbeat, opts2.enable_heartbeat);
        assert_eq!(opts.enable_watchdog, opts2.enable_watchdog);
    }

    #[test]
    fn test_resilient_options_serialization() {
        let opts = ResilientOptions::default();

        // Test that it can be serialized and deserialized
        let json = serde_json::to_string(&opts).expect("Failed to serialize");
        let deserialized: ResilientOptions =
            serde_json::from_str(&json).expect("Failed to deserialize");

        assert_eq!(opts.buffer_size, deserialized.buffer_size);
        assert_eq!(opts.enable_heartbeat, deserialized.enable_heartbeat);
        assert_eq!(opts.enable_watchdog, deserialized.enable_watchdog);
        assert_eq!(opts.heartbeat_interval, deserialized.heartbeat_interval);
        assert_eq!(opts.watchdog_interval, deserialized.watchdog_interval);
    }

    #[test]
    fn test_max_retries_behavior() {
        // Test that max_retries is properly included in ResilientOptions
        let opts = ResilientOptions {
            manager_options: ManagerOptions {
                port: 5038,
                host: "test".to_string(),
                username: "test".to_string(),
                password: "test".to_string(),
                events: true,
            },
            buffer_size: 1024,
            enable_heartbeat: false,
            enable_watchdog: false,
            heartbeat_interval: 30,
            watchdog_interval: 1,
            max_retries: 5, // Test custom value
            metrics: None,
            cumulative_attempts_counter: None,
        };

        assert_eq!(opts.max_retries, 5);

        // Test that default is 3
        let default_opts = ResilientOptions::default();
        assert_eq!(default_opts.max_retries, 3);
    }

    #[tokio::test]
    async fn test_connect_resilient_invalid_connection() {
        // Test connect_resilient with unreachable connection parameters
        let opts = ResilientOptions {
            manager_options: ManagerOptions {
                port: 65534, // Arbitrary port; the host below does not resolve
                host: "nonexistent.invalid".to_string(),
                username: "test".to_string(),
                password: "test".to_string(),
                events: true,
            },
            buffer_size: 1024,
            enable_heartbeat: false, // Disable to avoid heartbeat issues
            enable_watchdog: false,  // Disable to avoid watchdog reconnection
            heartbeat_interval: 30,
            watchdog_interval: 1,
            max_retries: 3,
            metrics: None,
            cumulative_attempts_counter: None,
        };

        // Should fail to connect
        let result = connect_resilient(opts).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_infinite_events_stream_creation() {
        // Test that infinite_events_stream returns an error when the initial connection fails
        let opts = ResilientOptions {
            manager_options: ManagerOptions {
                port: 65535, // Arbitrary port; the host below does not resolve
                host: "nonexistent.invalid".to_string(),
                username: "test".to_string(),
                password: "test".to_string(),
                events: true,
            },
            buffer_size: 1024,
            enable_heartbeat: false,
            enable_watchdog: false,
            heartbeat_interval: 30,
            watchdog_interval: 1,
            max_retries: 3,
            metrics: None,
            cumulative_attempts_counter: None,
        };

        // Should fail to create due to connection failure
        let result = infinite_events_stream(opts).await;
        assert!(result.is_err());
    }
}