asterisk_manager/
resilient.rs

1//! Resilient AMI connection module
2//!
3//! This module provides resilient connection management for Asterisk AMI,
4//! including automatic reconnection, heartbeat monitoring, and infinite event streams.
5//!
6//! # Example Usage
7//!
8//! ```rust,no_run
9//! use asterisk_manager::resilient::{ResilientOptions, connect_resilient, infinite_events_stream};
10//! use asterisk_manager::ManagerOptions;
11//! use tokio_stream::StreamExt;
12//!
13//! #[tokio::main]
14//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
15//!     let resilient_options = ResilientOptions {
16//!         manager_options: ManagerOptions {
17//!             port: 5038,
18//!             host: "127.0.0.1".to_string(),
19//!             username: "admin".to_string(),
20//!             password: "password".to_string(),
21//!             events: true,
22//!         },
23//!         buffer_size: 2048,
24//!         enable_heartbeat: true,
25//!         enable_watchdog: true,
26//!         heartbeat_interval: 30,
27//!         watchdog_interval: 1,
28//!         max_retries: 3,
29//!         metrics: None,
30//!         cumulative_attempts_counter: None,
31//!     };
32//!
33//!     // Option 1: Connect with resilient features
34//!     let manager = connect_resilient(resilient_options.clone()).await?;
35//!     
36//!     // Option 2: Create an infinite event stream that handles reconnection
37//!     let mut event_stream = infinite_events_stream(resilient_options).await?;
38//!     
39//!     // Pin the stream for polling
40//!     tokio::pin!(event_stream);
41//!     
42//!     while let Some(event_result) = event_stream.next().await {
43//!         match event_result {
44//!             Ok(event) => println!("Received event: {:?}", event),
45//!             Err(e) => println!("Error receiving event: {:?}", e),
46//!         }
47//!     }
48//!     
49//!     Ok(())
50//! }
51//! ```
52
53use crate::{AmiError, AmiEvent, Manager, ManagerOptions};
54use serde::{Deserialize, Serialize};
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::sync::Arc;
57use std::time::{Duration, Instant};
58use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
59use tokio_stream::{Stream, StreamExt};
60
/// Fixed delay, in seconds, applied to a reconnection attempt once the
/// `max_retries` immediate attempts of the current retry cycle are exhausted
/// (see the reconnection loop in `create_infinite_stream`).
const RECONNECTION_DELAY_SECONDS: u64 = 5;
63
/// Lightweight, lock-free counters for observing resilient-connection
/// behavior (optional instrumentation).
///
/// Every field is a shared `Arc<AtomicU64>`, so cloning this struct yields
/// handles onto the same underlying counters.
#[derive(Debug, Clone, Default)]
pub struct ResilientMetrics {
    pub reconnection_attempts: Arc<AtomicU64>,
    pub successful_reconnections: Arc<AtomicU64>,
    pub failed_reconnections: Arc<AtomicU64>,
    pub connection_lost_events: Arc<AtomicU64>,
    pub last_reconnection_duration_ms: Arc<AtomicU64>,
}

impl ResilientMetrics {
    /// Create a metrics instance with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Increment `counter` by one. Relaxed ordering suffices: these are
    /// statistics, not synchronization points.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one reconnection attempt (successful or not).
    pub fn record_reconnection_attempt(&self) {
        Self::bump(&self.reconnection_attempts);
    }

    /// Count one successful reconnection and remember how long it took.
    pub fn record_successful_reconnection(&self, duration: Duration) {
        Self::bump(&self.successful_reconnections);
        self.last_reconnection_duration_ms
            .store(duration.as_millis() as u64, Ordering::Relaxed);
    }

    /// Count one failed reconnection attempt.
    pub fn record_failed_reconnection(&self) {
        Self::bump(&self.failed_reconnections);
    }

    /// Count one observed connection-lost event.
    pub fn record_connection_lost(&self) {
        Self::bump(&self.connection_lost_events);
    }

    /// Read all counters at once, in declaration order:
    /// (attempts, successes, failures, losses, last_duration_ms).
    pub fn snapshot(&self) -> (u64, u64, u64, u64, u64) {
        (
            self.reconnection_attempts.load(Ordering::Relaxed),
            self.successful_reconnections.load(Ordering::Relaxed),
            self.failed_reconnections.load(Ordering::Relaxed),
            self.connection_lost_events.load(Ordering::Relaxed),
            self.last_reconnection_duration_ms.load(Ordering::Relaxed),
        )
    }
}
109
/// Configuration options for resilient AMI connections.
///
/// Serde-(de)serializable; `metrics` and `cumulative_attempts_counter` are
/// marked `#[serde(skip)]`, so they are omitted when serializing and come
/// back as `None` after deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResilientOptions {
    /// Basic connection options (host, port, credentials, events flag)
    pub manager_options: ManagerOptions,
    /// Buffer size for event broadcaster (default: 2048)
    pub buffer_size: usize,
    /// Enable heartbeat monitoring (default: true)
    pub enable_heartbeat: bool,
    /// Enable automatic reconnection watchdog (default: true)
    pub enable_watchdog: bool,
    /// Heartbeat interval in seconds (default: 30)
    pub heartbeat_interval: u64,
    /// Watchdog check interval in seconds (default: 1)
    pub watchdog_interval: u64,
    /// Maximum number of immediate reconnection attempts before adding delays.
    /// Retry behavior: first `max_retries` attempts are immediate (no delay),
    /// followed by one delayed attempt, then the counter resets and the cycle repeats.
    /// Default: 3.
    pub max_retries: u32,
    /// Optional metrics collection for monitoring reconnection behavior
    #[serde(skip)]
    pub metrics: Option<ResilientMetrics>,
    /// Global cumulative attempts counter shared across stream instances,
    /// so running totals survive stream recreation (see `with_global_counter`)
    #[serde(skip)]
    pub cumulative_attempts_counter: Option<Arc<AtomicU64>>,
}
137
138impl ResilientOptions {
139    /// Create a new ResilientOptions with a shared global cumulative attempts counter.
140    /// This ensures that cumulative attempt counts persist across stream recreation.
141    pub fn with_global_counter(mut self) -> Self {
142        self.cumulative_attempts_counter = Some(Arc::new(AtomicU64::new(0)));
143        self
144    }
145}
146
147impl Default for ResilientOptions {
148    fn default() -> Self {
149        Self {
150            manager_options: ManagerOptions {
151                port: 5038,
152                host: "127.0.0.1".to_string(),
153                username: "admin".to_string(),
154                password: "admin".to_string(),
155                events: true,
156            },
157            buffer_size: 2048,
158            enable_heartbeat: true,
159            enable_watchdog: true,
160            heartbeat_interval: 30,
161            watchdog_interval: 1,
162            max_retries: 3,
163            metrics: None,
164            cumulative_attempts_counter: None,
165        }
166    }
167}
168
169/// Connect a manager with resilient features enabled
170///
171/// This function creates a Manager instance with the specified buffer size,
172/// connects it, and optionally starts heartbeat and watchdog monitoring.
173pub async fn connect_resilient(options: ResilientOptions) -> Result<Manager, AmiError> {
174    let mut manager = Manager::new_with_buffer(options.buffer_size);
175    let instance_id = manager.inner.lock().await.instance_id.clone();
176
177    log::debug!(
178        "[{}] Connecting resilient AMI manager to '{}'@{}:{}",
179        instance_id,
180        options.manager_options.username,
181        options.manager_options.host,
182        options.manager_options.port
183    );
184
185    // Connect and login
186    manager
187        .connect_and_login(options.manager_options.clone())
188        .await?;
189
190    // Start heartbeat if enabled
191    if options.enable_heartbeat {
192        log::debug!("[{instance_id}] Starting heartbeat for resilient AMI manager");
193        manager
194            .start_heartbeat_with_interval(options.heartbeat_interval)
195            .await?;
196    }
197
198    // Start watchdog if enabled
199    if options.enable_watchdog {
200        log::debug!("[{instance_id}] Starting watchdog for resilient AMI manager");
201        manager
202            .start_watchdog_with_interval(options.manager_options, options.watchdog_interval)
203            .await?;
204    }
205
206    log::debug!("[{instance_id}] Resilient AMI manager connected successfully");
207    Ok(manager)
208}
209
210/// Create an infinite events stream that automatically handles reconnection and lag
211///
212/// Once successfully created, this stream never ends on its own and will attempt to
213/// resubscribe to the broadcast channel on lag errors and recreate the stream on connection losses.
214/// However, if the initial connection fails, the stream will not be created and an error will be returned.
215pub async fn infinite_events_stream(
216    options: ResilientOptions,
217) -> Result<impl Stream<Item = Result<AmiEvent, AmiError>>, AmiError> {
218    let manager = connect_resilient(options.clone()).await?;
219    Ok(create_infinite_stream(manager, options))
220}
221
222fn create_infinite_stream(
223    manager: Manager,
224    options: ResilientOptions,
225) -> impl Stream<Item = Result<AmiEvent, AmiError>> {
226    async_stream::stream! {
227        // Generate a unique ID for this stream instance to track lifecycle
228        let stream_id = uuid::Uuid::new_v4().to_string()[..8].to_string();
229        log::debug!("Creating new resilient stream instance [{}]", stream_id);
230
231        let mut current_manager = manager;
232        let mut retry_count = 0;
233
234        // Use global counter if provided, otherwise create a local one
235        let cumulative_counter = options.cumulative_attempts_counter
236            .clone()
237            .unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
238
239        loop {
240            let mut event_stream = current_manager.all_events_stream().await;
241
242            loop {
243                match event_stream.next().await {
244                    Some(Ok(event)) => {
245                        // Reset retry count on successful event
246                        retry_count = 0;
247
248                        // Check for internal connection lost events
249                        if let AmiEvent::InternalConnectionLost { .. } = &event {
250                            // Record connection lost event in metrics if available
251                            if let Some(ref metrics) = options.metrics {
252                                metrics.record_connection_lost();
253                            }
254                            // Connection lost, break to outer loop to reconnect
255                            yield Ok(event);
256                            break;
257                        } else {
258                            yield Ok(event);
259                        }
260                    }
261                    Some(Err(BroadcastStreamRecvError::Lagged(count))) => {
262                        log::debug!("Event stream lagged by {count} events, resubscribing");
263                        // Stream lagged, resubscribe to get a fresh receiver
264                        event_stream = current_manager.all_events_stream().await;
265                        continue;
266                    }
267                    None => {
268                        // Stream ended, try to reconnect
269                        log::debug!("Event stream ended, attempting reconnection");
270                        break;
271                    }
272                }
273            }
274
275            // Reconnection attempts with max_retries cycles
276            'reconnect_loop: loop {
277                // Start timing reconnection attempts
278                let reconnection_start = Instant::now();
279
280                // Record attempt in metrics if available
281                if let Some(ref metrics) = options.metrics {
282                    metrics.record_reconnection_attempt();
283                }
284
285                // Attempt reconnection
286                retry_count += 1;
287                let cumulative_attempts = cumulative_counter.fetch_add(1, Ordering::SeqCst) + 1;
288
289                // Determine delay based on retry_count and max_retries
290                let retry_delay = if retry_count <= options.max_retries {
291                    0 // Immediate retry for the first max_retries attempts
292                } else {
293                    RECONNECTION_DELAY_SECONDS // Fixed delay after exceeding max_retries
294                };
295
296                // Log attempt info - use DEBUG for library internal logging
297                log::debug!(
298                    "[{}] Reconnection attempt #{}, cumulative #{}, delay {}s (max_retries={})",
299                    stream_id,
300                    retry_count,
301                    cumulative_attempts,
302                    retry_delay,
303                    options.max_retries
304                );
305
306                // Try to reconnect and log result with attempt number
307                log::debug!("[{stream_id}] Attempting to reconnect now (attempt #{retry_count}, cumulative #{cumulative_attempts})");
308                match connect_resilient(options.clone()).await {
309                    Ok(new_manager) => {
310                        let reconnection_duration = reconnection_start.elapsed();
311                        // Record successful reconnection in metrics
312                        if let Some(ref metrics) = options.metrics {
313                            metrics.record_successful_reconnection(reconnection_duration);
314                        }
315                        log::debug!(
316                            "[{}] Successfully reconnected to AMI on attempt #{}, cumulative #{} (total time since first failure: {:.1}s)",
317                            stream_id,
318                            retry_count,
319                            cumulative_attempts,
320                            reconnection_duration.as_secs_f64()
321                        );
322                        current_manager = new_manager;
323                        retry_count = 0;
324                        // Note: we don't reset cumulative_attempts here to track total attempts across all failures
325                        break 'reconnect_loop; // Exit reconnection loop, go back to event streaming
326                    }
327                    Err(e) => {
328                        // Record failed reconnection in metrics
329                        if let Some(ref metrics) = options.metrics {
330                            metrics.record_failed_reconnection();
331                        }
332                        log::debug!(
333                            "[{stream_id}] Reconnection attempt #{retry_count} failed: {e}, retrying in {retry_delay} seconds"
334                        );
335
336                        // Apply delay if needed
337                        if retry_delay > 0 {
338                            let sleep_start = std::time::Instant::now();
339                            tokio::time::sleep(Duration::from_secs(retry_delay)).await;
340                            let sleep_duration = sleep_start.elapsed();
341                            log::debug!(
342                                "[{}] Retry delay completed after {:.1}s, starting next attempt...",
343                                stream_id,
344                                sleep_duration.as_secs_f64()
345                            );
346                        }
347
348                        // Reset retry_count after going through a full cycle (max_retries immediate + 1 delayed attempt)
349                        if retry_count > options.max_retries {
350                            log::debug!(
351                                "[{}] Completed retry cycle ({} immediate + 1 delayed), resetting counter",
352                                stream_id,
353                                options.max_retries
354                            );
355                            retry_count = 0;
356                        }
357
358                        // Continue the reconnection loop (will loop back to retry)
359                        log::trace!("[{stream_id}] Continuing reconnection loop for next attempt");
360                    }
361                }
362            } // End of reconnection loop
363        } // End of main event loop
364    } // End of async_stream::stream!
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370
371    #[test]
372    fn test_resilient_options_default() {
373        let opts = ResilientOptions::default();
374        assert_eq!(opts.buffer_size, 2048);
375        assert!(opts.enable_heartbeat);
376        assert!(opts.enable_watchdog);
377        assert_eq!(opts.heartbeat_interval, 30);
378        assert_eq!(opts.watchdog_interval, 1);
379    }
380
381    #[test]
382    fn test_resilient_options_clone() {
383        let opts = ResilientOptions {
384            manager_options: ManagerOptions {
385                port: 5038,
386                host: "localhost".to_string(),
387                username: "test".to_string(),
388                password: "test".to_string(),
389                events: true,
390            },
391            buffer_size: 1024,
392            enable_heartbeat: false,
393            enable_watchdog: false,
394            heartbeat_interval: 60,
395            watchdog_interval: 2,
396            max_retries: 3,
397            metrics: None,
398            cumulative_attempts_counter: None,
399        };
400        let opts2 = opts.clone();
401        assert_eq!(opts.buffer_size, opts2.buffer_size);
402        assert_eq!(opts.enable_heartbeat, opts2.enable_heartbeat);
403        assert_eq!(opts.enable_watchdog, opts2.enable_watchdog);
404    }
405
406    #[test]
407    fn test_resilient_options_serialization() {
408        let opts = ResilientOptions::default();
409
410        // Test that it can be serialized and deserialized
411        let json = serde_json::to_string(&opts).expect("Failed to serialize");
412        let deserialized: ResilientOptions =
413            serde_json::from_str(&json).expect("Failed to deserialize");
414
415        assert_eq!(opts.buffer_size, deserialized.buffer_size);
416        assert_eq!(opts.enable_heartbeat, deserialized.enable_heartbeat);
417        assert_eq!(opts.enable_watchdog, deserialized.enable_watchdog);
418        assert_eq!(opts.heartbeat_interval, deserialized.heartbeat_interval);
419        assert_eq!(opts.watchdog_interval, deserialized.watchdog_interval);
420    }
421
422    #[test]
423    fn test_max_retries_behavior() {
424        // Test that max_retries is properly included in ResilientOptions
425        let opts = ResilientOptions {
426            manager_options: ManagerOptions {
427                port: 5038,
428                host: "test".to_string(),
429                username: "test".to_string(),
430                password: "test".to_string(),
431                events: true,
432            },
433            buffer_size: 1024,
434            enable_heartbeat: false,
435            enable_watchdog: false,
436            heartbeat_interval: 30,
437            watchdog_interval: 1,
438            max_retries: 5, // Test custom value
439            metrics: None,
440            cumulative_attempts_counter: None,
441        };
442
443        assert_eq!(opts.max_retries, 5);
444
445        // Test that default is 3
446        let default_opts = ResilientOptions::default();
447        assert_eq!(default_opts.max_retries, 3);
448    }
449
    #[tokio::test]
    async fn test_connect_resilient_invalid_connection() {
        // connect_resilient must surface an error when the endpoint is
        // unreachable. The ".invalid" TLD is reserved (RFC 2606) and never
        // resolves, which is what guarantees the failure here.
        let opts = ResilientOptions {
            manager_options: ManagerOptions {
                port: 65534, // arbitrary high port; the unresolvable host is what makes this fail
                host: "nonexistent.invalid".to_string(),
                username: "test".to_string(),
                password: "test".to_string(),
                events: true,
            },
            buffer_size: 1024,
            enable_heartbeat: false, // Disable to avoid heartbeat issues
            enable_watchdog: false,  // Disable to avoid watchdog reconnection
            heartbeat_interval: 30,
            watchdog_interval: 1,
            max_retries: 3,
            metrics: None,
            cumulative_attempts_counter: None,
        };

        // Should fail to connect
        let result = connect_resilient(opts).await;
        assert!(result.is_err());
    }
475
476    #[tokio::test]
477    async fn test_infinite_events_stream_creation() {
478        // Test that infinite_events_stream can be created with valid options
479        let opts = ResilientOptions {
480            manager_options: ManagerOptions {
481                port: 65535, // Invalid port to test error handling
482                host: "nonexistent.invalid".to_string(),
483                username: "test".to_string(),
484                password: "test".to_string(),
485                events: true,
486            },
487            buffer_size: 1024,
488            enable_heartbeat: false,
489            enable_watchdog: false,
490            heartbeat_interval: 30,
491            watchdog_interval: 1,
492            max_retries: 3,
493            metrics: None,
494            cumulative_attempts_counter: None,
495        };
496
497        // Should fail to create due to connection failure
498        let result = infinite_events_stream(opts).await;
499        assert!(result.is_err());
500    }
501}