// lightstreamer_rs/connection/management.rs
1//! Connection management module for automatic reconnection functionality
2//!
3//! This module provides the core components for managing WebSocket connections
4//! with automatic reconnection, heartbeat monitoring, and subscription preservation.
5
6use std::collections::HashMap;
7use std::sync::{Arc, Weak};
8use std::time::{Duration, Instant};
9use tokio::sync::{Mutex, Notify, RwLock};
10use tokio::time::sleep;
11use tracing::{debug, error, info, warn};
12
13use crate::client::LightstreamerClient;
14
/// Main connection manager that orchestrates reconnection, heartbeat, and subscription management
#[derive(Debug)]
pub struct ConnectionManager {
    /// Weak reference to the owning client so the manager does not keep it alive
    client: Weak<Mutex<LightstreamerClient>>,
    /// Drives reconnection attempts with exponential backoff
    reconnection_handler: Arc<ReconnectionHandler>,
    /// Periodically checks connection health via heartbeats
    heartbeat_monitor: Arc<HeartbeatMonitor>,
    /// Preserves and restores subscriptions across reconnects
    subscription_manager: Arc<SubscriptionManager>,
    /// Shared, read-mostly view of the current connection state
    connection_state: Arc<RwLock<ConnectionState>>,
    /// Notified once to stop the monitoring tasks (see `shutdown`)
    shutdown_signal: Arc<Notify>,
    /// Counters and timestamps for monitoring/debugging
    metrics: Arc<Mutex<ConnectionMetrics>>,
}
26
/// Current state of the connection
///
/// Held behind the manager's `RwLock` and cloned out by
/// `ConnectionManager::get_connection_state`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionState {
    /// Connection is not established
    Disconnected,
    /// Connection attempt is in progress
    Connecting,
    /// Connection is established and active
    Connected,
    /// Reconnection is in progress
    Reconnecting {
        /// Current reconnection attempt number (1-based)
        attempt: u32,
        /// Timestamp for the next retry attempt
        next_retry: Instant,
    },
    /// Connection has failed permanently (e.g. max reconnection attempts reached)
    Failed {
        /// Reason for the connection failure
        reason: String,
    },
}
49
/// Reason for disconnection
///
/// Passed to `ConnectionManager::handle_disconnection`; `UserRequested`
/// suppresses the automatic reconnection that the other variants trigger.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DisconnectionReason {
    /// Network-related error occurred
    NetworkError(String),
    /// Server-side error occurred
    ServerError(String),
    /// Heartbeat timeout was reached (raised by the heartbeat monitor)
    HeartbeatTimeout,
    /// Disconnection was requested by the user
    UserRequested,
    /// Unknown or unspecified reason
    Unknown,
}
64
/// Configuration for reconnection behavior
///
/// Validated by [`ReconnectionConfig::validate`]; see `Default` for the
/// stock values.
#[derive(Debug, Clone)]
pub struct ReconnectionConfig {
    /// Whether automatic reconnection is enabled
    pub enabled: bool,
    /// Initial delay before the first reconnection attempt
    pub initial_delay: Duration,
    /// Maximum delay between reconnection attempts (backoff cap)
    pub max_delay: Duration,
    /// Maximum number of reconnection attempts (None for unlimited)
    pub max_attempts: Option<u32>,
    /// Multiplier for exponential backoff between attempts (must be > 1.0)
    pub backoff_multiplier: f64,
    /// Whether to add random jitter to delays (deprecated, use jitter_enabled)
    pub jitter: bool,
    /// Whether to add random jitter to delays to avoid thundering herd
    pub jitter_enabled: bool,
    /// Timeout for each connection attempt
    pub timeout: Duration,
}
85
/// Configuration for heartbeat monitoring
///
/// Validated by [`HeartbeatConfig::validate`]; `timeout` must be strictly
/// less than `interval`.
#[derive(Debug, Clone)]
pub struct HeartbeatConfig {
    /// Whether heartbeat monitoring is enabled
    pub enabled: bool,
    /// Interval between heartbeat messages
    pub interval: Duration,
    /// Timeout for heartbeat responses
    pub timeout: Duration,
    /// Maximum number of missed heartbeats before considering connection lost
    pub max_missed: u32,
}
98
/// Connection events that can be emitted
///
/// NOTE(review): no emitter for these events is visible in this module —
/// confirm where they are produced/consumed before relying on them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionEvent {
    /// Connection has been established successfully
    Connected,
    /// Connection has been lost
    Disconnected {
        /// Reason for the disconnection
        reason: DisconnectionReason,
    },
    /// Reconnection attempt is in progress
    Reconnecting {
        /// Current attempt number
        attempt: u32,
    },
    /// Reconnection has failed
    ReconnectionFailed {
        /// Reason for the failure
        reason: String,
    },
    /// A heartbeat message was missed
    HeartbeatMissed,
    /// Subscriptions have been preserved during disconnection
    SubscriptionPreserved {
        /// Number of subscriptions preserved
        count: usize,
    },
    /// Subscriptions have been restored after reconnection
    SubscriptionRestored {
        /// Number of subscriptions restored
        count: usize,
    },
}
132
133impl Default for ReconnectionConfig {
134    fn default() -> Self {
135        Self {
136            enabled: true,
137            max_attempts: Some(10),
138            initial_delay: Duration::from_secs(1),
139            max_delay: Duration::from_secs(60),
140            backoff_multiplier: 2.0,
141            jitter: true,
142            jitter_enabled: true,
143            timeout: Duration::from_secs(30),
144        }
145    }
146}
147
148impl ReconnectionConfig {
149    /// Creates a new reconnection config with default values
150    pub fn new() -> Self {
151        Self::default()
152    }
153
154    /// Creates a disabled reconnection config
155    pub fn disabled() -> Self {
156        Self {
157            enabled: false,
158            ..Self::default()
159        }
160    }
161
162    /// Creates a fast reconnection config for testing
163    pub fn fast() -> Self {
164        Self {
165            enabled: true,
166            max_attempts: Some(5),
167            initial_delay: Duration::from_millis(100),
168            max_delay: Duration::from_secs(5),
169            backoff_multiplier: 1.5,
170            jitter: true,
171            jitter_enabled: true,
172            timeout: Duration::from_secs(10),
173        }
174    }
175
176    /// Creates a conservative reconnection config
177    pub fn conservative() -> Self {
178        Self {
179            enabled: true,
180            max_attempts: Some(20),
181            initial_delay: Duration::from_secs(5),
182            max_delay: Duration::from_secs(300), // 5 minutes
183            backoff_multiplier: 2.5,
184            jitter: true,
185            jitter_enabled: true,
186            timeout: Duration::from_secs(60),
187        }
188    }
189
190    /// Sets whether reconnection is enabled
191    pub fn with_enabled(mut self, enabled: bool) -> Self {
192        self.enabled = enabled;
193        self
194    }
195
196    /// Sets the maximum number of reconnection attempts
197    pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
198        self.max_attempts = Some(max_attempts);
199        self
200    }
201
202    /// Sets the initial delay between reconnection attempts
203    pub fn with_initial_delay(mut self, initial_delay: Duration) -> Self {
204        self.initial_delay = initial_delay;
205        self
206    }
207
208    /// Sets the maximum delay between reconnection attempts
209    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
210        self.max_delay = max_delay;
211        self
212    }
213
214    /// Sets the backoff multiplier
215    pub fn with_backoff_multiplier(mut self, multiplier: f64) -> Self {
216        self.backoff_multiplier = multiplier;
217        self
218    }
219
220    /// Sets whether jitter is enabled
221    pub fn with_jitter_enabled(mut self, jitter_enabled: bool) -> Self {
222        self.jitter_enabled = jitter_enabled;
223        self
224    }
225
226    /// Sets the connection timeout
227    pub fn with_timeout(mut self, timeout: Duration) -> Self {
228        self.timeout = timeout;
229        self
230    }
231
232    /// Validates the configuration
233    pub fn validate(&self) -> Result<(), String> {
234        if let Some(max_attempts) = self.max_attempts
235            && max_attempts == 0
236        {
237            return Err("max_attempts must be greater than 0".to_string());
238        }
239
240        if self.initial_delay.is_zero() {
241            return Err("initial_delay must be greater than 0".to_string());
242        }
243
244        if self.max_delay < self.initial_delay {
245            return Err("max_delay must be greater than or equal to initial_delay".to_string());
246        }
247
248        if self.backoff_multiplier <= 1.0 {
249            return Err("backoff_multiplier must be greater than 1.0".to_string());
250        }
251
252        if self.timeout.is_zero() {
253            return Err("timeout must be greater than 0".to_string());
254        }
255
256        Ok(())
257    }
258}
259
260impl Default for HeartbeatConfig {
261    fn default() -> Self {
262        Self {
263            enabled: true,
264            interval: Duration::from_secs(30),
265            timeout: Duration::from_secs(10),
266            max_missed: 3,
267        }
268    }
269}
270
271impl HeartbeatConfig {
272    /// Creates a new heartbeat config with default values
273    pub fn new() -> Self {
274        Self::default()
275    }
276
277    /// Creates a disabled heartbeat config
278    pub fn disabled() -> Self {
279        Self {
280            enabled: false,
281            ..Self::default()
282        }
283    }
284
285    /// Creates a fast heartbeat config for testing
286    pub fn fast() -> Self {
287        Self {
288            enabled: true,
289            interval: Duration::from_secs(5),
290            timeout: Duration::from_secs(2),
291            max_missed: 2,
292        }
293    }
294
295    /// Creates a conservative heartbeat config
296    pub fn conservative() -> Self {
297        Self {
298            enabled: true,
299            interval: Duration::from_secs(60),
300            timeout: Duration::from_secs(20),
301            max_missed: 5,
302        }
303    }
304
305    /// Sets whether heartbeat is enabled
306    pub fn with_enabled(mut self, enabled: bool) -> Self {
307        self.enabled = enabled;
308        self
309    }
310
311    /// Sets the heartbeat interval
312    pub fn with_interval(mut self, interval: Duration) -> Self {
313        self.interval = interval;
314        self
315    }
316
317    /// Sets the heartbeat timeout
318    pub fn with_timeout(mut self, timeout: Duration) -> Self {
319        self.timeout = timeout;
320        self
321    }
322
323    /// Sets the maximum number of missed heartbeats
324    pub fn with_max_missed(mut self, max_missed: u32) -> Self {
325        self.max_missed = max_missed;
326        self
327    }
328
329    /// Validates the configuration
330    pub fn validate(&self) -> Result<(), String> {
331        if self.interval.is_zero() {
332            return Err("interval must be greater than 0".to_string());
333        }
334
335        if self.timeout.is_zero() {
336            return Err("timeout must be greater than 0".to_string());
337        }
338
339        if self.timeout >= self.interval {
340            return Err("timeout must be less than interval".to_string());
341        }
342
343        if self.max_missed == 0 {
344            return Err("max_missed must be greater than 0".to_string());
345        }
346
347        Ok(())
348    }
349}
350
/// Connection metrics for monitoring and debugging
///
/// All counters are cumulative since creation; a snapshot is returned by
/// `ConnectionManager::get_metrics`.
#[derive(Debug, Default, Clone)]
pub struct ConnectionMetrics {
    /// Total number of connection attempts made
    pub total_connections: u64,
    /// Number of successful reconnection attempts
    pub successful_reconnections: u64,
    /// Number of failed reconnection attempts
    pub failed_reconnections: u64,
    /// Average time taken for successful reconnections
    /// NOTE(review): no code in this module updates this field — confirm
    /// it is maintained elsewhere.
    pub average_reconnection_time: Duration,
    /// Number of heartbeat failures detected
    pub heartbeat_failures: u64,
    /// Number of subscription recovery operations performed
    pub subscription_recoveries: u64,
    /// Timestamp of when these metrics were last updated
    pub last_updated: Option<Instant>,
}
369
/// Handles reconnection logic with exponential backoff
#[derive(Debug)]
pub struct ReconnectionHandler {
    /// Backoff/attempt-limit configuration
    config: ReconnectionConfig,
    /// Attempt counter for the current disconnection episode (reset on trigger)
    current_attempt: Arc<Mutex<u32>>,
    /// Time of the most recent failed attempt, if any
    last_attempt: Arc<Mutex<Option<Instant>>>,
    /// Back-reference to the owning manager (must be supplied at construction)
    connection_manager: Weak<ConnectionManager>,
}
378
/// Monitors connection health through heartbeats
#[derive(Debug)]
pub struct HeartbeatMonitor {
    /// Interval/timeout/miss-limit configuration
    config: HeartbeatConfig,
    /// Time of the last successful heartbeat
    last_heartbeat: Arc<Mutex<Instant>>,
    /// Consecutive missed-heartbeat counter (reset on success)
    missed_count: Arc<Mutex<u32>>,
    /// Back-reference to the owning manager (must be supplied at construction)
    connection_manager: Weak<ConnectionManager>,
    /// Run flag; cleared by `stop` and checked by the monitoring loop
    is_running: Arc<Mutex<bool>>,
}
388
impl HeartbeatMonitor {
    /// Creates a new heartbeat monitor
    ///
    /// `last_heartbeat` starts at "now" and `missed_count` at zero; the
    /// monitor is created stopped and must be started via [`Self::start`].
    pub fn new(config: HeartbeatConfig, connection_manager: Weak<ConnectionManager>) -> Self {
        Self {
            config,
            last_heartbeat: Arc::new(Mutex::new(Instant::now())),
            missed_count: Arc::new(Mutex::new(0)),
            connection_manager,
            is_running: Arc::new(Mutex::new(false)),
        }
    }

    /// Sets the connection manager reference
    ///
    /// NOTE(review): currently a no-op — the field has no interior
    /// mutability, so the argument is discarded. The weak reference must be
    /// supplied through the constructor instead.
    pub fn set_connection_manager(&self, _connection_manager: Weak<ConnectionManager>) {
        // Note: This would require interior mutability in a real implementation
        // For now, we'll handle this in the constructor
    }

    /// Starts the heartbeat monitoring
    ///
    /// Runs one iteration per `config.interval` until [`Self::stop`] is
    /// called or `config.max_missed` consecutive heartbeats fail. A success
    /// resets the missed counter and refreshes `last_heartbeat`; exceeding
    /// the miss limit calls `ConnectionManager::handle_disconnection` with
    /// `HeartbeatTimeout`, bumps the failure metric, and exits the loop.
    /// A second concurrent call returns immediately.
    pub async fn start(&self) {
        {
            // Guard against two concurrent monitor loops.
            let mut running = self.is_running.lock().await;
            if *running {
                debug!("Heartbeat monitor already running");
                return;
            }
            *running = true;
        }

        info!(
            "Starting heartbeat monitor with interval: {:?}",
            self.config.interval
        );

        // Skip (rather than burst) any ticks missed while a heartbeat was in flight.
        let mut interval = tokio::time::interval(self.config.interval);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            // Check if we should stop
            // NOTE(review): the flag is only checked before awaiting the next
            // tick, so a stop() request can take up to one interval to be seen.
            {
                let running = self.is_running.lock().await;
                if !*running {
                    debug!("Heartbeat monitor stopping");
                    break;
                }
            }

            interval.tick().await;

            // Send heartbeat and check response
            match self.send_heartbeat().await {
                Ok(()) => {
                    // Reset missed count on successful heartbeat
                    {
                        let mut missed = self.missed_count.lock().await;
                        *missed = 0;
                    }

                    // Update last heartbeat time
                    {
                        let mut last = self.last_heartbeat.lock().await;
                        *last = Instant::now();
                    }

                    debug!("Heartbeat successful");
                }
                Err(e) => {
                    warn!("Heartbeat failed: {}", e);

                    // Increment inside a tight scope so the lock is not held
                    // across the disconnection handling below.
                    let missed_count = {
                        let mut missed = self.missed_count.lock().await;
                        *missed += 1;
                        *missed
                    };

                    // Check if we've exceeded the maximum missed heartbeats
                    if missed_count >= self.config.max_missed {
                        error!(
                            "Maximum missed heartbeats ({}) exceeded, triggering disconnection",
                            self.config.max_missed
                        );

                        // Trigger disconnection handling
                        if let Some(manager) = self.connection_manager.upgrade() {
                            manager
                                .handle_disconnection(DisconnectionReason::HeartbeatTimeout)
                                .await;

                            // Update metrics
                            {
                                let mut metrics = manager.metrics.lock().await;
                                metrics.heartbeat_failures += 1;
                                metrics.last_updated = Some(Instant::now());
                            }
                        }

                        break;
                    }
                }
            }
        }

        // Mark as not running
        {
            let mut running = self.is_running.lock().await;
            *running = false;
        }

        info!("Heartbeat monitor stopped");
    }

    /// Stops the heartbeat monitor
    ///
    /// Clears the run flag; the monitoring loop observes it before its next
    /// tick, so shutdown may lag by up to one heartbeat interval.
    pub async fn stop(&self) {
        info!("Stopping heartbeat monitor");
        let mut running = self.is_running.lock().await;
        *running = false;
    }

    /// Sends a heartbeat message to the server
    ///
    /// # Errors
    ///
    /// Returns [`ReconnectionError::ClientLost`] when the manager or client
    /// weak reference can no longer be upgraded, and
    /// [`ReconnectionError::ConnectionFailed`] when the state is not
    /// `Connected` or the (currently simulated) heartbeat fails.
    async fn send_heartbeat(&self) -> Result<(), ReconnectionError> {
        let manager = self
            .connection_manager
            .upgrade()
            .ok_or(ReconnectionError::ClientLost)?;

        let client = manager
            .client
            .upgrade()
            .ok_or(ReconnectionError::ClientLost)?;

        // Check current connection state
        let state = manager.get_connection_state().await;
        if !matches!(state, ConnectionState::Connected) {
            return Err(ReconnectionError::ConnectionFailed(
                "Not connected".to_string(),
            ));
        }

        // Held only to serialize access to the client while "sending".
        let _client_guard = client.lock().await;

        // Send a ping/heartbeat message
        // This would be implemented based on the Lightstreamer protocol
        // For now, we'll simulate the heartbeat
        match self.simulate_heartbeat().await {
            Ok(()) => {
                debug!("Heartbeat sent successfully");
                Ok(())
            }
            Err(e) => {
                error!("Failed to send heartbeat: {}", e);
                Err(ReconnectionError::ConnectionFailed(e))
            }
        }
    }

    /// Simulates sending a heartbeat (placeholder for actual implementation)
    ///
    /// Sleeps 10 ms and then fails with ~5% probability via `rand::random`,
    /// so callers exercise both the success and failure paths.
    async fn simulate_heartbeat(&self) -> Result<(), String> {
        // In a real implementation, this would:
        // 1. Send a ping message to the WebSocket
        // 2. Wait for a pong response within the timeout
        // 3. Return Ok(()) if successful, Err() if timeout or error

        // For simulation, we'll randomly succeed/fail
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Simulate 95% success rate
        if rand::random::<f64>() < 0.95 {
            Ok(())
        } else {
            Err("Simulated heartbeat failure".to_string())
        }
    }

    /// Gets the time of the last successful heartbeat
    pub async fn get_last_heartbeat(&self) -> Instant {
        *self.last_heartbeat.lock().await
    }

    /// Gets the current missed heartbeat count
    pub async fn get_missed_count(&self) -> u32 {
        *self.missed_count.lock().await
    }
}
572
/// Manages subscription state and recovery
#[derive(Debug)]
#[allow(dead_code)]
pub struct SubscriptionManager {
    /// Subscription states keyed by subscription id
    subscriptions: Arc<RwLock<HashMap<usize, SubscriptionState>>>,
    /// Back-reference to the owning manager (must be supplied at construction)
    connection_manager: Weak<ConnectionManager>,
}
580
/// State of a subscription
///
/// Snapshot used to preserve a subscription across a disconnection and
/// reestablish it after reconnect.
#[derive(Debug, Clone)]
pub struct SubscriptionState {
    /// Unique identifier for this subscription
    pub id: usize,
    /// Key used to identify the subscription in Lightstreamer
    pub subscription_key: String,
    /// Current status of the subscription
    pub status: SubscriptionStatus,
    /// Last received values for subscribed items (item/field name -> value)
    pub last_values: HashMap<String, String>,
    /// Timestamp when this subscription was created
    pub created_at: Instant,
    /// Timestamp of the last update received for this subscription
    pub last_update: Option<Instant>,
}
597
/// Status of a subscription
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubscriptionStatus {
    /// Subscription is active and receiving updates
    Active,
    /// Subscription is temporarily suspended (e.g. while disconnected)
    Suspended,
    /// Subscription is being reestablished after a disconnection
    Resubscribing,
    /// Subscription has failed with the given reason
    Failed {
        /// Reason for the subscription failure
        reason: String,
    },
}
613
/// General Lightstreamer error type
///
/// Display formatting is derived via `thiserror`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum LightstreamerError {
    /// Connection related errors
    #[error("connection error: {0}")]
    Connection(String),
    /// Subscription related errors
    #[error("subscription error: {0}")]
    Subscription(String),
    /// Authentication errors
    #[error("authentication error: {0}")]
    Authentication(String),
    /// Configuration errors
    #[error("configuration error: {0}")]
    Configuration(String),
    /// Network errors
    #[error("network error: {0}")]
    Network(String),
    /// General errors
    #[error("error: {0}")]
    General(String),
}
636
/// Errors that can occur during reconnection attempts
///
/// Returned by the reconnection handler and the heartbeat sender.
#[derive(Debug, thiserror::Error)]
pub enum ReconnectionError {
    /// Connection attempt failed with the given error message
    #[error("Connection failed: {0}")]
    ConnectionFailed(String),
    /// Maximum number of reconnection attempts has been reached
    #[error("Maximum reconnection attempts reached")]
    MaxAttemptsReached,
    /// Reconnection attempt timed out
    #[error("Reconnection timeout")]
    Timeout,
    /// The client reference was lost during reconnection
    #[error("Client reference lost")]
    ClientLost,
    /// A subscription-related error occurred during reconnection
    #[error("Subscription error: {0}")]
    SubscriptionError(String),
}
656
/// Errors related to subscription management
#[derive(Debug, thiserror::Error)]
pub enum SubscriptionError {
    /// The subscription with the given ID was not found
    #[error("Subscription not found: {0}")]
    NotFound(usize),
    /// Failed to preserve subscription state with the given error message
    #[error("Failed to preserve subscription: {0}")]
    PreservationFailed(String),
    /// Failed to reestablish subscription with the given error message
    #[error("Failed to reestablish subscription: {0}")]
    ReestablishmentFailed(String),
}
670
671impl ConnectionManager {
672    /// Creates a new connection manager
673    pub fn new(
674        client: Weak<Mutex<LightstreamerClient>>,
675        reconnection_config: ReconnectionConfig,
676        heartbeat_config: HeartbeatConfig,
677    ) -> Arc<Self> {
678        let manager = Arc::new(Self {
679            client: client.clone(),
680            reconnection_handler: Arc::new(ReconnectionHandler::new(
681                reconnection_config,
682                Weak::new(), // Will be set after creation
683            )),
684            heartbeat_monitor: Arc::new(HeartbeatMonitor::new(
685                heartbeat_config,
686                Weak::new(), // Will be set after creation
687            )),
688            subscription_manager: Arc::new(SubscriptionManager::new(
689                Weak::new(), // Will be set after creation
690            )),
691            connection_state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
692            shutdown_signal: Arc::new(Notify::new()),
693            metrics: Arc::new(Mutex::new(ConnectionMetrics::default())),
694        });
695
696        // Set weak references to self
697        let weak_manager = Arc::downgrade(&manager);
698        manager
699            .reconnection_handler
700            .set_connection_manager(weak_manager.clone());
701        manager
702            .heartbeat_monitor
703            .set_connection_manager(weak_manager.clone());
704        manager
705            .subscription_manager
706            .set_connection_manager(weak_manager);
707
708        manager
709    }
710
711    /// Starts the connection monitoring tasks
712    pub async fn start_monitoring(&self) {
713        info!("Starting connection monitoring");
714
715        let heartbeat_task = {
716            let monitor = self.heartbeat_monitor.clone();
717            tokio::spawn(async move {
718                monitor.start().await;
719            })
720        };
721
722        let reconnection_task = {
723            let handler = self.reconnection_handler.clone();
724            tokio::spawn(async move {
725                handler.start().await;
726            })
727        };
728
729        tokio::select! {
730            _ = heartbeat_task => {
731                debug!("Heartbeat monitoring task completed");
732            },
733            _ = reconnection_task => {
734                debug!("Reconnection handler task completed");
735            },
736            _ = self.shutdown_signal.notified() => {
737                info!("Shutdown signal received, stopping monitoring");
738            },
739        }
740    }
741
742    /// Handles disconnection and triggers reconnection if needed
743    pub async fn handle_disconnection(&self, reason: DisconnectionReason) {
744        warn!("Connection lost: {:?}", reason);
745
746        // Update connection state
747        {
748            let mut state = self.connection_state.write().await;
749            *state = ConnectionState::Disconnected;
750        }
751
752        // Preserve subscription state
753        if let Err(e) = self.subscription_manager.preserve_subscriptions().await {
754            error!("Failed to preserve subscriptions: {}", e);
755        }
756
757        // Update metrics
758        {
759            let mut metrics = self.metrics.lock().await;
760            metrics.last_updated = Some(Instant::now());
761        }
762
763        // Trigger reconnection unless it was user-requested
764        if !matches!(reason, DisconnectionReason::UserRequested) {
765            self.reconnection_handler.trigger_reconnection(reason).await;
766        }
767    }
768
769    /// Gets the current connection state
770    pub async fn get_connection_state(&self) -> ConnectionState {
771        self.connection_state.read().await.clone()
772    }
773
774    /// Gets connection metrics
775    pub async fn get_metrics(&self) -> ConnectionMetrics {
776        self.metrics.lock().await.clone()
777    }
778
779    /// Forces a reconnection attempt
780    pub async fn force_reconnect(&self) -> Result<(), ReconnectionError> {
781        info!("Forcing reconnection");
782
783        // Trigger reconnection through the handler
784        self.reconnection_handler
785            .trigger_reconnection(DisconnectionReason::UserRequested)
786            .await;
787
788        Ok(())
789    }
790
791    /// Shuts down the connection manager
792    pub async fn shutdown(&self) {
793        info!("Shutting down connection manager");
794        self.shutdown_signal.notify_waiters();
795
796        // Stop heartbeat monitoring
797        self.heartbeat_monitor.stop().await;
798
799        // Update connection state
800        {
801            let mut state = self.connection_state.write().await;
802            *state = ConnectionState::Disconnected;
803        }
804    }
805}
806
807impl ReconnectionHandler {
808    /// Creates a new reconnection handler
809    pub fn new(config: ReconnectionConfig, connection_manager: Weak<ConnectionManager>) -> Self {
810        Self {
811            config,
812            current_attempt: Arc::new(Mutex::new(0)),
813            last_attempt: Arc::new(Mutex::new(None)),
814            connection_manager,
815        }
816    }
817
    /// Sets the connection manager reference
    ///
    /// NOTE(review): currently a no-op — the field has no interior
    /// mutability, so the argument is discarded. The weak reference must be
    /// supplied through the constructor instead.
    pub fn set_connection_manager(&self, _connection_manager: Weak<ConnectionManager>) {
        // Note: This would require interior mutability in a real implementation
        // For now, we'll handle this in the constructor
    }
823
    /// Starts the reconnection handler
    ///
    /// Intentionally passive: actual work begins when
    /// `ConnectionManager::handle_disconnection` calls
    /// [`Self::trigger_reconnection`].
    pub async fn start(&self) {
        debug!("Reconnection handler started");
        // This will be triggered by handle_disconnection
    }
829
830    /// Triggers a reconnection attempt
831    pub async fn trigger_reconnection(&self, reason: DisconnectionReason) {
832        info!("Triggering reconnection due to: {:?}", reason);
833
834        // Reset attempt counter for new disconnection
835        {
836            let mut attempt = self.current_attempt.lock().await;
837            *attempt = 0;
838        }
839
840        // Start reconnection loop
841        self.reconnection_loop().await;
842    }
843
    /// Main reconnection loop with exponential backoff
    ///
    /// Each iteration: bump the attempt counter, bail out into the `Failed`
    /// state if `max_attempts` is exceeded, otherwise sleep for the
    /// backoff-derived delay and try to reconnect. On success the state,
    /// metrics, and subscriptions are restored and the counter is reset; on
    /// failure only the last-attempt timestamp is recorded and the loop
    /// retries.
    async fn reconnection_loop(&self) {
        loop {
            // Check if we've exceeded max attempts
            // (increment happens in a tight scope so the lock is released
            // before any awaits below)
            let current_attempt = {
                let mut attempt = self.current_attempt.lock().await;
                *attempt += 1;
                *attempt
            };

            if let Some(max_attempts) = self.config.max_attempts
                && current_attempt > max_attempts
            {
                error!("Maximum reconnection attempts ({}) reached", max_attempts);

                // Update connection manager state
                if let Some(manager) = self.connection_manager.upgrade() {
                    let mut state = manager.connection_state.write().await;
                    *state = ConnectionState::Failed {
                        reason: "Maximum reconnection attempts reached".to_string(),
                    };

                    let mut metrics = manager.metrics.lock().await;
                    metrics.failed_reconnections += 1;
                    metrics.last_updated = Some(Instant::now());
                }
                break;
            }

            // Calculate delay with exponential backoff
            let delay = self.calculate_next_delay(current_attempt).await;

            // Update connection state to show we're reconnecting
            if let Some(manager) = self.connection_manager.upgrade() {
                let mut state = manager.connection_state.write().await;
                *state = ConnectionState::Reconnecting {
                    attempt: current_attempt,
                    next_retry: Instant::now() + delay,
                };
            }

            info!("Reconnection attempt {} in {:?}", current_attempt, delay);
            sleep(delay).await;

            // Attempt reconnection
            match self.attempt_reconnection().await {
                Ok(()) => {
                    info!("Reconnection successful after {} attempts", current_attempt);

                    // Update metrics and state
                    if let Some(manager) = self.connection_manager.upgrade() {
                        let mut state = manager.connection_state.write().await;
                        *state = ConnectionState::Connected;

                        let mut metrics = manager.metrics.lock().await;
                        metrics.successful_reconnections += 1;
                        metrics.total_connections += 1;
                        metrics.last_updated = Some(Instant::now());

                        // Reestablish subscriptions
                        if let Some(client_ref) = manager.client.upgrade()
                            && let Err(e) = manager
                                .subscription_manager
                                .reestablish_subscriptions(&client_ref)
                                .await
                        {
                            error!("Failed to reestablish subscriptions: {}", e);
                        }
                    }

                    // Reset attempt counter
                    {
                        let mut attempt = self.current_attempt.lock().await;
                        *attempt = 0;
                    }

                    break;
                }
                Err(e) => {
                    warn!("Reconnection attempt {} failed: {}", current_attempt, e);

                    // Update last attempt time
                    {
                        let mut last_attempt = self.last_attempt.lock().await;
                        *last_attempt = Some(Instant::now());
                    }
                }
            }
        }
    }
934
935    /// Calculates the next delay using exponential backoff with optional jitter
936    async fn calculate_next_delay(&self, attempt: u32) -> Duration {
937        let base_delay = self.config.initial_delay.as_millis() as f64;
938        let multiplier = self.config.backoff_multiplier.powi((attempt - 1) as i32);
939        let calculated_delay = (base_delay * multiplier) as u64;
940
941        let delay =
942            Duration::from_millis(calculated_delay.min(self.config.max_delay.as_millis() as u64));
943
944        if self.config.jitter {
945            self.add_jitter(delay)
946        } else {
947            delay
948        }
949    }
950
951    /// Adds random jitter to prevent thundering herd
952    fn add_jitter(&self, delay: Duration) -> Duration {
953        let jitter_range = delay.as_millis() / 4; // 25% jitter
954        let jitter = rand::random::<u64>() % (jitter_range as u64 + 1);
955        delay + Duration::from_millis(jitter)
956    }
957
958    /// Attempts to reconnect to the server
959    async fn attempt_reconnection(&self) -> Result<(), ReconnectionError> {
960        let manager = self
961            .connection_manager
962            .upgrade()
963            .ok_or(ReconnectionError::ClientLost)?;
964
965        let client = manager
966            .client
967            .upgrade()
968            .ok_or(ReconnectionError::ClientLost)?;
969
970        // Attempt to connect
971        // This would call the actual connection logic
972        // For now, we'll simulate the connection attempt
973        let shutdown_signal = Arc::new(tokio::sync::Notify::new());
974        match crate::client::LightstreamerClient::connect(client, shutdown_signal).await {
975            Ok(()) => {
976                info!("Successfully reconnected to server");
977                Ok(())
978            }
979            Err(e) => {
980                error!("Failed to reconnect: {:?}", e);
981                Err(ReconnectionError::ConnectionFailed(format!("{:?}", e)))
982            }
983        }
984    }
985}
986
987impl SubscriptionManager {
988    /// Creates a new subscription manager
989    pub fn new(connection_manager: Weak<ConnectionManager>) -> Self {
990        Self {
991            subscriptions: Arc::new(RwLock::new(HashMap::new())),
992            connection_manager,
993        }
994    }
995
996    /// Sets the connection manager reference
997    pub fn set_connection_manager(&self, _connection_manager: Weak<ConnectionManager>) {
998        // Note: This would require interior mutability in a real implementation
999        // For now, we'll handle this in the constructor
1000    }
1001
1002    /// Adds a subscription to be managed
1003    pub async fn add_subscription(&self, subscription_id: usize, subscription: SubscriptionState) {
1004        let mut subs = self.subscriptions.write().await;
1005        subs.insert(subscription_id, subscription);
1006        info!("Added subscription to manager: {}", subscription_id);
1007    }
1008
1009    /// Removes a subscription from management
1010    pub async fn remove_subscription(&self, subscription_id: usize) -> Option<SubscriptionState> {
1011        let mut subs = self.subscriptions.write().await;
1012        let removed = subs.remove(&subscription_id);
1013        if removed.is_some() {
1014            info!("Removed subscription from manager: {}", subscription_id);
1015        }
1016        removed
1017    }
1018
1019    /// Gets a subscription by ID
1020    pub async fn get_subscription(&self, subscription_id: usize) -> Option<SubscriptionState> {
1021        let subs = self.subscriptions.read().await;
1022        subs.get(&subscription_id).cloned()
1023    }
1024
1025    /// Gets all managed subscriptions
1026    pub async fn get_all_subscriptions(&self) -> HashMap<usize, SubscriptionState> {
1027        let subs = self.subscriptions.read().await;
1028        subs.clone()
1029    }
1030
1031    /// Marks all subscriptions as disconnected
1032    pub async fn mark_all_disconnected(&self) {
1033        let mut subs = self.subscriptions.write().await;
1034        for (id, subscription) in subs.iter_mut() {
1035            subscription.status = SubscriptionStatus::Suspended;
1036            subscription.last_update = Some(Instant::now());
1037            debug!("Marked subscription {} as disconnected", id);
1038        }
1039        info!("Marked {} subscriptions as disconnected", subs.len());
1040    }
1041
1042    /// Preserves subscription state during disconnection
1043    pub async fn preserve_subscriptions(&self) -> Result<(), SubscriptionError> {
1044        let subs = self.subscriptions.read().await;
1045        info!(
1046            "Preserving {} subscriptions during disconnection",
1047            subs.len()
1048        );
1049        // In a real implementation, this would save subscription state
1050        // to persistent storage or memory for later restoration
1051        Ok(())
1052    }
1053
1054    /// Reestablishes all subscriptions after reconnection
1055    pub async fn reestablish_subscriptions(
1056        &self,
1057        client: &Arc<Mutex<crate::client::LightstreamerClient>>,
1058    ) -> Result<(), ReconnectionError> {
1059        let subscriptions = {
1060            let subs = self.subscriptions.read().await;
1061            subs.clone()
1062        };
1063
1064        let mut reestablished = 0;
1065        let mut failed = 0;
1066
1067        for (id, mut subscription) in subscriptions {
1068            match self
1069                .reestablish_single_subscription(id, &mut subscription, client)
1070                .await
1071            {
1072                Ok(()) => {
1073                    reestablished += 1;
1074                    // Update the subscription in the map
1075                    {
1076                        let mut subs = self.subscriptions.write().await;
1077                        if let Some(stored_sub) = subs.get_mut(&id) {
1078                            *stored_sub = subscription;
1079                        }
1080                    }
1081                }
1082                Err(e) => {
1083                    failed += 1;
1084                    error!("Failed to reestablish subscription {}: {}", id, e);
1085
1086                    // Mark as failed
1087                    subscription.status = SubscriptionStatus::Failed {
1088                        reason: e.to_string(),
1089                    };
1090                    subscription.last_update = Some(Instant::now());
1091
1092                    // Update the subscription in the map
1093                    {
1094                        let mut subs = self.subscriptions.write().await;
1095                        if let Some(stored_sub) = subs.get_mut(&id) {
1096                            *stored_sub = subscription;
1097                        }
1098                    }
1099                }
1100            }
1101        }
1102
1103        info!(
1104            "Subscription reestablishment complete: {} succeeded, {} failed",
1105            reestablished, failed
1106        );
1107
1108        if failed > 0 {
1109            Err(ReconnectionError::SubscriptionError(format!(
1110                "Failed to reestablish {} out of {} subscriptions",
1111                failed,
1112                reestablished + failed
1113            )))
1114        } else {
1115            Ok(())
1116        }
1117    }
1118
1119    /// Reestablishes a single subscription
1120    async fn reestablish_single_subscription(
1121        &self,
1122        subscription_id: usize,
1123        subscription: &mut SubscriptionState,
1124        _client: &Arc<Mutex<crate::client::LightstreamerClient>>,
1125    ) -> Result<(), ReconnectionError> {
1126        info!("Reestablishing subscription: {}", subscription_id);
1127
1128        // Update state to connecting
1129        subscription.status = SubscriptionStatus::Resubscribing;
1130        subscription.last_update = Some(Instant::now());
1131
1132        // Simulate subscription reestablishment
1133        // In a real implementation, this would:
1134        // 1. Create a new subscription request
1135        // 2. Send it to the server
1136        // 3. Wait for confirmation
1137        // 4. Update the subscription state
1138
1139        match self
1140            .simulate_subscription_reestablishment(subscription_id)
1141            .await
1142        {
1143            Ok(()) => {
1144                subscription.status = SubscriptionStatus::Active;
1145                subscription.last_update = Some(Instant::now());
1146                info!(
1147                    "Successfully reestablished subscription: {}",
1148                    subscription_id
1149                );
1150                Ok(())
1151            }
1152            Err(e) => {
1153                subscription.status = SubscriptionStatus::Failed { reason: e.clone() };
1154                subscription.last_update = Some(Instant::now());
1155                error!(
1156                    "Failed to reestablish subscription {}: {}",
1157                    subscription_id, e
1158                );
1159                Err(ReconnectionError::SubscriptionError(e))
1160            }
1161        }
1162    }
1163
1164    /// Simulates subscription reestablishment (placeholder for actual implementation)
1165    async fn simulate_subscription_reestablishment(
1166        &self,
1167        subscription_id: usize,
1168    ) -> Result<(), String> {
1169        // Simulate network delay
1170        tokio::time::sleep(Duration::from_millis(100)).await;
1171
1172        // Simulate 90% success rate
1173        if rand::random::<f64>() < 0.9 {
1174            debug!(
1175                "Simulated successful reestablishment for subscription: {}",
1176                subscription_id
1177            );
1178            Ok(())
1179        } else {
1180            Err(format!(
1181                "Simulated failure for subscription: {}",
1182                subscription_id
1183            ))
1184        }
1185    }
1186
1187    /// Gets subscription statistics
1188    pub async fn get_statistics(&self) -> SubscriptionStatistics {
1189        let subs = self.subscriptions.read().await;
1190
1191        let mut stats = SubscriptionStatistics {
1192            total_subscriptions: subs.len(),
1193            active_subscriptions: 0,
1194            failed_subscriptions: 0,
1195            disconnected_subscriptions: 0,
1196            connecting_subscriptions: 0,
1197        };
1198
1199        for subscription in subs.values() {
1200            match subscription.status {
1201                SubscriptionStatus::Active => stats.active_subscriptions += 1,
1202                SubscriptionStatus::Failed { .. } => stats.failed_subscriptions += 1,
1203                SubscriptionStatus::Suspended => stats.disconnected_subscriptions += 1,
1204                SubscriptionStatus::Resubscribing => stats.connecting_subscriptions += 1,
1205            }
1206        }
1207
1208        stats
1209    }
1210
1211    /// Clears all failed subscriptions
1212    pub async fn clear_failed_subscriptions(&self) -> usize {
1213        let mut subs = self.subscriptions.write().await;
1214        let initial_count = subs.len();
1215
1216        subs.retain(|_, subscription| {
1217            !matches!(subscription.status, SubscriptionStatus::Failed { .. })
1218        });
1219
1220        let removed_count = initial_count - subs.len();
1221        if removed_count > 0 {
1222            info!("Cleared {} failed subscriptions", removed_count);
1223        }
1224
1225        removed_count
1226    }
1227}
1228
/// Statistics about managed subscriptions
///
/// A plain snapshot of per-status counts; derives `Default` (all-zero
/// counters) and equality so callers can compare snapshots directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SubscriptionStatistics {
    /// Total number of subscriptions being managed
    pub total_subscriptions: usize,
    /// Number of subscriptions that are currently active and receiving data
    pub active_subscriptions: usize,
    /// Number of subscriptions that have failed and are not receiving data
    pub failed_subscriptions: usize,
    /// Number of subscriptions that are disconnected but preserved for reconnection
    pub disconnected_subscriptions: usize,
    /// Number of subscriptions that are currently in the process of reconnecting
    pub connecting_subscriptions: usize,
}