1use std::collections::HashMap;
7use std::sync::{Arc, Weak};
8use std::time::{Duration, Instant};
9use tokio::sync::{Mutex, Notify, RwLock};
10use tokio::time::sleep;
11use tracing::{debug, error, info, warn};
12
13use crate::client::LightstreamerClient;
14
15#[derive(Debug)]
17pub struct ConnectionManager {
18 client: Weak<Mutex<LightstreamerClient>>,
19 reconnection_handler: Arc<ReconnectionHandler>,
20 heartbeat_monitor: Arc<HeartbeatMonitor>,
21 subscription_manager: Arc<SubscriptionManager>,
22 connection_state: Arc<RwLock<ConnectionState>>,
23 shutdown_signal: Arc<Notify>,
24 metrics: Arc<Mutex<ConnectionMetrics>>,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum ConnectionState {
30 Disconnected,
32 Connecting,
34 Connected,
36 Reconnecting {
38 attempt: u32,
40 next_retry: Instant,
42 },
43 Failed {
45 reason: String,
47 },
48}
49
50#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum DisconnectionReason {
53 NetworkError(String),
55 ServerError(String),
57 HeartbeatTimeout,
59 UserRequested,
61 Unknown,
63}
64
65#[derive(Debug, Clone)]
67pub struct ReconnectionConfig {
68 pub enabled: bool,
70 pub initial_delay: Duration,
72 pub max_delay: Duration,
74 pub max_attempts: Option<u32>,
76 pub backoff_multiplier: f64,
78 pub jitter: bool,
80 pub jitter_enabled: bool,
82 pub timeout: Duration,
84}
85
86#[derive(Debug, Clone)]
88pub struct HeartbeatConfig {
89 pub enabled: bool,
91 pub interval: Duration,
93 pub timeout: Duration,
95 pub max_missed: u32,
97}
98
99#[derive(Debug, Clone, PartialEq, Eq)]
101pub enum ConnectionEvent {
102 Connected,
104 Disconnected {
106 reason: DisconnectionReason,
108 },
109 Reconnecting {
111 attempt: u32,
113 },
114 ReconnectionFailed {
116 reason: String,
118 },
119 HeartbeatMissed,
121 SubscriptionPreserved {
123 count: usize,
125 },
126 SubscriptionRestored {
128 count: usize,
130 },
131}
132
133impl Default for ReconnectionConfig {
134 fn default() -> Self {
135 Self {
136 enabled: true,
137 max_attempts: Some(10),
138 initial_delay: Duration::from_secs(1),
139 max_delay: Duration::from_secs(60),
140 backoff_multiplier: 2.0,
141 jitter: true,
142 jitter_enabled: true,
143 timeout: Duration::from_secs(30),
144 }
145 }
146}
147
148impl ReconnectionConfig {
149 pub fn new() -> Self {
151 Self::default()
152 }
153
154 pub fn disabled() -> Self {
156 Self {
157 enabled: false,
158 ..Self::default()
159 }
160 }
161
162 pub fn fast() -> Self {
164 Self {
165 enabled: true,
166 max_attempts: Some(5),
167 initial_delay: Duration::from_millis(100),
168 max_delay: Duration::from_secs(5),
169 backoff_multiplier: 1.5,
170 jitter: true,
171 jitter_enabled: true,
172 timeout: Duration::from_secs(10),
173 }
174 }
175
176 pub fn conservative() -> Self {
178 Self {
179 enabled: true,
180 max_attempts: Some(20),
181 initial_delay: Duration::from_secs(5),
182 max_delay: Duration::from_secs(300), backoff_multiplier: 2.5,
184 jitter: true,
185 jitter_enabled: true,
186 timeout: Duration::from_secs(60),
187 }
188 }
189
190 pub fn with_enabled(mut self, enabled: bool) -> Self {
192 self.enabled = enabled;
193 self
194 }
195
196 pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
198 self.max_attempts = Some(max_attempts);
199 self
200 }
201
202 pub fn with_initial_delay(mut self, initial_delay: Duration) -> Self {
204 self.initial_delay = initial_delay;
205 self
206 }
207
208 pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
210 self.max_delay = max_delay;
211 self
212 }
213
214 pub fn with_backoff_multiplier(mut self, multiplier: f64) -> Self {
216 self.backoff_multiplier = multiplier;
217 self
218 }
219
220 pub fn with_jitter_enabled(mut self, jitter_enabled: bool) -> Self {
222 self.jitter_enabled = jitter_enabled;
223 self
224 }
225
226 pub fn with_timeout(mut self, timeout: Duration) -> Self {
228 self.timeout = timeout;
229 self
230 }
231
232 pub fn validate(&self) -> Result<(), String> {
234 if let Some(max_attempts) = self.max_attempts
235 && max_attempts == 0
236 {
237 return Err("max_attempts must be greater than 0".to_string());
238 }
239
240 if self.initial_delay.is_zero() {
241 return Err("initial_delay must be greater than 0".to_string());
242 }
243
244 if self.max_delay < self.initial_delay {
245 return Err("max_delay must be greater than or equal to initial_delay".to_string());
246 }
247
248 if self.backoff_multiplier <= 1.0 {
249 return Err("backoff_multiplier must be greater than 1.0".to_string());
250 }
251
252 if self.timeout.is_zero() {
253 return Err("timeout must be greater than 0".to_string());
254 }
255
256 Ok(())
257 }
258}
259
260impl Default for HeartbeatConfig {
261 fn default() -> Self {
262 Self {
263 enabled: true,
264 interval: Duration::from_secs(30),
265 timeout: Duration::from_secs(10),
266 max_missed: 3,
267 }
268 }
269}
270
271impl HeartbeatConfig {
272 pub fn new() -> Self {
274 Self::default()
275 }
276
277 pub fn disabled() -> Self {
279 Self {
280 enabled: false,
281 ..Self::default()
282 }
283 }
284
285 pub fn fast() -> Self {
287 Self {
288 enabled: true,
289 interval: Duration::from_secs(5),
290 timeout: Duration::from_secs(2),
291 max_missed: 2,
292 }
293 }
294
295 pub fn conservative() -> Self {
297 Self {
298 enabled: true,
299 interval: Duration::from_secs(60),
300 timeout: Duration::from_secs(20),
301 max_missed: 5,
302 }
303 }
304
305 pub fn with_enabled(mut self, enabled: bool) -> Self {
307 self.enabled = enabled;
308 self
309 }
310
311 pub fn with_interval(mut self, interval: Duration) -> Self {
313 self.interval = interval;
314 self
315 }
316
317 pub fn with_timeout(mut self, timeout: Duration) -> Self {
319 self.timeout = timeout;
320 self
321 }
322
323 pub fn with_max_missed(mut self, max_missed: u32) -> Self {
325 self.max_missed = max_missed;
326 self
327 }
328
329 pub fn validate(&self) -> Result<(), String> {
331 if self.interval.is_zero() {
332 return Err("interval must be greater than 0".to_string());
333 }
334
335 if self.timeout.is_zero() {
336 return Err("timeout must be greater than 0".to_string());
337 }
338
339 if self.timeout >= self.interval {
340 return Err("timeout must be less than interval".to_string());
341 }
342
343 if self.max_missed == 0 {
344 return Err("max_missed must be greater than 0".to_string());
345 }
346
347 Ok(())
348 }
349}
350
351#[derive(Debug, Default, Clone)]
353pub struct ConnectionMetrics {
354 pub total_connections: u64,
356 pub successful_reconnections: u64,
358 pub failed_reconnections: u64,
360 pub average_reconnection_time: Duration,
362 pub heartbeat_failures: u64,
364 pub subscription_recoveries: u64,
366 pub last_updated: Option<Instant>,
368}
369
370#[derive(Debug)]
372pub struct ReconnectionHandler {
373 config: ReconnectionConfig,
374 current_attempt: Arc<Mutex<u32>>,
375 last_attempt: Arc<Mutex<Option<Instant>>>,
376 connection_manager: Weak<ConnectionManager>,
377}
378
379#[derive(Debug)]
381pub struct HeartbeatMonitor {
382 config: HeartbeatConfig,
383 last_heartbeat: Arc<Mutex<Instant>>,
384 missed_count: Arc<Mutex<u32>>,
385 connection_manager: Weak<ConnectionManager>,
386 is_running: Arc<Mutex<bool>>,
387}
388
389impl HeartbeatMonitor {
390 pub fn new(config: HeartbeatConfig, connection_manager: Weak<ConnectionManager>) -> Self {
392 Self {
393 config,
394 last_heartbeat: Arc::new(Mutex::new(Instant::now())),
395 missed_count: Arc::new(Mutex::new(0)),
396 connection_manager,
397 is_running: Arc::new(Mutex::new(false)),
398 }
399 }
400
401 pub fn set_connection_manager(&self, _connection_manager: Weak<ConnectionManager>) {
403 }
406
407 pub async fn start(&self) {
409 {
410 let mut running = self.is_running.lock().await;
411 if *running {
412 debug!("Heartbeat monitor already running");
413 return;
414 }
415 *running = true;
416 }
417
418 info!(
419 "Starting heartbeat monitor with interval: {:?}",
420 self.config.interval
421 );
422
423 let mut interval = tokio::time::interval(self.config.interval);
424 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
425
426 loop {
427 {
429 let running = self.is_running.lock().await;
430 if !*running {
431 debug!("Heartbeat monitor stopping");
432 break;
433 }
434 }
435
436 interval.tick().await;
437
438 match self.send_heartbeat().await {
440 Ok(()) => {
441 {
443 let mut missed = self.missed_count.lock().await;
444 *missed = 0;
445 }
446
447 {
449 let mut last = self.last_heartbeat.lock().await;
450 *last = Instant::now();
451 }
452
453 debug!("Heartbeat successful");
454 }
455 Err(e) => {
456 warn!("Heartbeat failed: {}", e);
457
458 let missed_count = {
459 let mut missed = self.missed_count.lock().await;
460 *missed += 1;
461 *missed
462 };
463
464 if missed_count >= self.config.max_missed {
466 error!(
467 "Maximum missed heartbeats ({}) exceeded, triggering disconnection",
468 self.config.max_missed
469 );
470
471 if let Some(manager) = self.connection_manager.upgrade() {
473 manager
474 .handle_disconnection(DisconnectionReason::HeartbeatTimeout)
475 .await;
476
477 {
479 let mut metrics = manager.metrics.lock().await;
480 metrics.heartbeat_failures += 1;
481 metrics.last_updated = Some(Instant::now());
482 }
483 }
484
485 break;
486 }
487 }
488 }
489 }
490
491 {
493 let mut running = self.is_running.lock().await;
494 *running = false;
495 }
496
497 info!("Heartbeat monitor stopped");
498 }
499
500 pub async fn stop(&self) {
502 info!("Stopping heartbeat monitor");
503 let mut running = self.is_running.lock().await;
504 *running = false;
505 }
506
507 async fn send_heartbeat(&self) -> Result<(), ReconnectionError> {
509 let manager = self
510 .connection_manager
511 .upgrade()
512 .ok_or(ReconnectionError::ClientLost)?;
513
514 let client = manager
515 .client
516 .upgrade()
517 .ok_or(ReconnectionError::ClientLost)?;
518
519 let state = manager.get_connection_state().await;
521 if !matches!(state, ConnectionState::Connected) {
522 return Err(ReconnectionError::ConnectionFailed(
523 "Not connected".to_string(),
524 ));
525 }
526
527 let _client_guard = client.lock().await;
528
529 match self.simulate_heartbeat().await {
533 Ok(()) => {
534 debug!("Heartbeat sent successfully");
535 Ok(())
536 }
537 Err(e) => {
538 error!("Failed to send heartbeat: {}", e);
539 Err(ReconnectionError::ConnectionFailed(e))
540 }
541 }
542 }
543
544 async fn simulate_heartbeat(&self) -> Result<(), String> {
546 tokio::time::sleep(Duration::from_millis(10)).await;
553
554 if rand::random::<f64>() < 0.95 {
556 Ok(())
557 } else {
558 Err("Simulated heartbeat failure".to_string())
559 }
560 }
561
562 pub async fn get_last_heartbeat(&self) -> Instant {
564 *self.last_heartbeat.lock().await
565 }
566
567 pub async fn get_missed_count(&self) -> u32 {
569 *self.missed_count.lock().await
570 }
571}
572
573#[derive(Debug)]
575#[allow(dead_code)]
576pub struct SubscriptionManager {
577 subscriptions: Arc<RwLock<HashMap<usize, SubscriptionState>>>,
578 connection_manager: Weak<ConnectionManager>,
579}
580
581#[derive(Debug, Clone)]
583pub struct SubscriptionState {
584 pub id: usize,
586 pub subscription_key: String,
588 pub status: SubscriptionStatus,
590 pub last_values: HashMap<String, String>,
592 pub created_at: Instant,
594 pub last_update: Option<Instant>,
596}
597
598#[derive(Debug, Clone, PartialEq, Eq)]
600pub enum SubscriptionStatus {
601 Active,
603 Suspended,
605 Resubscribing,
607 Failed {
609 reason: String,
611 },
612}
613
614#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
616pub enum LightstreamerError {
617 #[error("connection error: {0}")]
619 Connection(String),
620 #[error("subscription error: {0}")]
622 Subscription(String),
623 #[error("authentication error: {0}")]
625 Authentication(String),
626 #[error("configuration error: {0}")]
628 Configuration(String),
629 #[error("network error: {0}")]
631 Network(String),
632 #[error("error: {0}")]
634 General(String),
635}
636
637#[derive(Debug, thiserror::Error)]
639pub enum ReconnectionError {
640 #[error("Connection failed: {0}")]
642 ConnectionFailed(String),
643 #[error("Maximum reconnection attempts reached")]
645 MaxAttemptsReached,
646 #[error("Reconnection timeout")]
648 Timeout,
649 #[error("Client reference lost")]
651 ClientLost,
652 #[error("Subscription error: {0}")]
654 SubscriptionError(String),
655}
656
657#[derive(Debug, thiserror::Error)]
659pub enum SubscriptionError {
660 #[error("Subscription not found: {0}")]
662 NotFound(usize),
663 #[error("Failed to preserve subscription: {0}")]
665 PreservationFailed(String),
666 #[error("Failed to reestablish subscription: {0}")]
668 ReestablishmentFailed(String),
669}
670
671impl ConnectionManager {
672 pub fn new(
674 client: Weak<Mutex<LightstreamerClient>>,
675 reconnection_config: ReconnectionConfig,
676 heartbeat_config: HeartbeatConfig,
677 ) -> Arc<Self> {
678 let manager = Arc::new(Self {
679 client: client.clone(),
680 reconnection_handler: Arc::new(ReconnectionHandler::new(
681 reconnection_config,
682 Weak::new(), )),
684 heartbeat_monitor: Arc::new(HeartbeatMonitor::new(
685 heartbeat_config,
686 Weak::new(), )),
688 subscription_manager: Arc::new(SubscriptionManager::new(
689 Weak::new(), )),
691 connection_state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
692 shutdown_signal: Arc::new(Notify::new()),
693 metrics: Arc::new(Mutex::new(ConnectionMetrics::default())),
694 });
695
696 let weak_manager = Arc::downgrade(&manager);
698 manager
699 .reconnection_handler
700 .set_connection_manager(weak_manager.clone());
701 manager
702 .heartbeat_monitor
703 .set_connection_manager(weak_manager.clone());
704 manager
705 .subscription_manager
706 .set_connection_manager(weak_manager);
707
708 manager
709 }
710
711 pub async fn start_monitoring(&self) {
713 info!("Starting connection monitoring");
714
715 let heartbeat_task = {
716 let monitor = self.heartbeat_monitor.clone();
717 tokio::spawn(async move {
718 monitor.start().await;
719 })
720 };
721
722 let reconnection_task = {
723 let handler = self.reconnection_handler.clone();
724 tokio::spawn(async move {
725 handler.start().await;
726 })
727 };
728
729 tokio::select! {
730 _ = heartbeat_task => {
731 debug!("Heartbeat monitoring task completed");
732 },
733 _ = reconnection_task => {
734 debug!("Reconnection handler task completed");
735 },
736 _ = self.shutdown_signal.notified() => {
737 info!("Shutdown signal received, stopping monitoring");
738 },
739 }
740 }
741
742 pub async fn handle_disconnection(&self, reason: DisconnectionReason) {
744 warn!("Connection lost: {:?}", reason);
745
746 {
748 let mut state = self.connection_state.write().await;
749 *state = ConnectionState::Disconnected;
750 }
751
752 if let Err(e) = self.subscription_manager.preserve_subscriptions().await {
754 error!("Failed to preserve subscriptions: {}", e);
755 }
756
757 {
759 let mut metrics = self.metrics.lock().await;
760 metrics.last_updated = Some(Instant::now());
761 }
762
763 if !matches!(reason, DisconnectionReason::UserRequested) {
765 self.reconnection_handler.trigger_reconnection(reason).await;
766 }
767 }
768
769 pub async fn get_connection_state(&self) -> ConnectionState {
771 self.connection_state.read().await.clone()
772 }
773
774 pub async fn get_metrics(&self) -> ConnectionMetrics {
776 self.metrics.lock().await.clone()
777 }
778
779 pub async fn force_reconnect(&self) -> Result<(), ReconnectionError> {
781 info!("Forcing reconnection");
782
783 self.reconnection_handler
785 .trigger_reconnection(DisconnectionReason::UserRequested)
786 .await;
787
788 Ok(())
789 }
790
791 pub async fn shutdown(&self) {
793 info!("Shutting down connection manager");
794 self.shutdown_signal.notify_waiters();
795
796 self.heartbeat_monitor.stop().await;
798
799 {
801 let mut state = self.connection_state.write().await;
802 *state = ConnectionState::Disconnected;
803 }
804 }
805}
806
807impl ReconnectionHandler {
808 pub fn new(config: ReconnectionConfig, connection_manager: Weak<ConnectionManager>) -> Self {
810 Self {
811 config,
812 current_attempt: Arc::new(Mutex::new(0)),
813 last_attempt: Arc::new(Mutex::new(None)),
814 connection_manager,
815 }
816 }
817
818 pub fn set_connection_manager(&self, _connection_manager: Weak<ConnectionManager>) {
820 }
823
824 pub async fn start(&self) {
826 debug!("Reconnection handler started");
827 }
829
830 pub async fn trigger_reconnection(&self, reason: DisconnectionReason) {
832 info!("Triggering reconnection due to: {:?}", reason);
833
834 {
836 let mut attempt = self.current_attempt.lock().await;
837 *attempt = 0;
838 }
839
840 self.reconnection_loop().await;
842 }
843
844 async fn reconnection_loop(&self) {
846 loop {
847 let current_attempt = {
849 let mut attempt = self.current_attempt.lock().await;
850 *attempt += 1;
851 *attempt
852 };
853
854 if let Some(max_attempts) = self.config.max_attempts
855 && current_attempt > max_attempts
856 {
857 error!("Maximum reconnection attempts ({}) reached", max_attempts);
858
859 if let Some(manager) = self.connection_manager.upgrade() {
861 let mut state = manager.connection_state.write().await;
862 *state = ConnectionState::Failed {
863 reason: "Maximum reconnection attempts reached".to_string(),
864 };
865
866 let mut metrics = manager.metrics.lock().await;
867 metrics.failed_reconnections += 1;
868 metrics.last_updated = Some(Instant::now());
869 }
870 break;
871 }
872
873 let delay = self.calculate_next_delay(current_attempt).await;
875
876 if let Some(manager) = self.connection_manager.upgrade() {
878 let mut state = manager.connection_state.write().await;
879 *state = ConnectionState::Reconnecting {
880 attempt: current_attempt,
881 next_retry: Instant::now() + delay,
882 };
883 }
884
885 info!("Reconnection attempt {} in {:?}", current_attempt, delay);
886 sleep(delay).await;
887
888 match self.attempt_reconnection().await {
890 Ok(()) => {
891 info!("Reconnection successful after {} attempts", current_attempt);
892
893 if let Some(manager) = self.connection_manager.upgrade() {
895 let mut state = manager.connection_state.write().await;
896 *state = ConnectionState::Connected;
897
898 let mut metrics = manager.metrics.lock().await;
899 metrics.successful_reconnections += 1;
900 metrics.total_connections += 1;
901 metrics.last_updated = Some(Instant::now());
902
903 if let Some(client_ref) = manager.client.upgrade()
905 && let Err(e) = manager
906 .subscription_manager
907 .reestablish_subscriptions(&client_ref)
908 .await
909 {
910 error!("Failed to reestablish subscriptions: {}", e);
911 }
912 }
913
914 {
916 let mut attempt = self.current_attempt.lock().await;
917 *attempt = 0;
918 }
919
920 break;
921 }
922 Err(e) => {
923 warn!("Reconnection attempt {} failed: {}", current_attempt, e);
924
925 {
927 let mut last_attempt = self.last_attempt.lock().await;
928 *last_attempt = Some(Instant::now());
929 }
930 }
931 }
932 }
933 }
934
935 async fn calculate_next_delay(&self, attempt: u32) -> Duration {
937 let base_delay = self.config.initial_delay.as_millis() as f64;
938 let multiplier = self.config.backoff_multiplier.powi((attempt - 1) as i32);
939 let calculated_delay = (base_delay * multiplier) as u64;
940
941 let delay =
942 Duration::from_millis(calculated_delay.min(self.config.max_delay.as_millis() as u64));
943
944 if self.config.jitter {
945 self.add_jitter(delay)
946 } else {
947 delay
948 }
949 }
950
951 fn add_jitter(&self, delay: Duration) -> Duration {
953 let jitter_range = delay.as_millis() / 4; let jitter = rand::random::<u64>() % (jitter_range as u64 + 1);
955 delay + Duration::from_millis(jitter)
956 }
957
958 async fn attempt_reconnection(&self) -> Result<(), ReconnectionError> {
960 let manager = self
961 .connection_manager
962 .upgrade()
963 .ok_or(ReconnectionError::ClientLost)?;
964
965 let client = manager
966 .client
967 .upgrade()
968 .ok_or(ReconnectionError::ClientLost)?;
969
970 let shutdown_signal = Arc::new(tokio::sync::Notify::new());
974 match crate::client::LightstreamerClient::connect(client, shutdown_signal).await {
975 Ok(()) => {
976 info!("Successfully reconnected to server");
977 Ok(())
978 }
979 Err(e) => {
980 error!("Failed to reconnect: {:?}", e);
981 Err(ReconnectionError::ConnectionFailed(format!("{:?}", e)))
982 }
983 }
984 }
985}
986
987impl SubscriptionManager {
988 pub fn new(connection_manager: Weak<ConnectionManager>) -> Self {
990 Self {
991 subscriptions: Arc::new(RwLock::new(HashMap::new())),
992 connection_manager,
993 }
994 }
995
996 pub fn set_connection_manager(&self, _connection_manager: Weak<ConnectionManager>) {
998 }
1001
1002 pub async fn add_subscription(&self, subscription_id: usize, subscription: SubscriptionState) {
1004 let mut subs = self.subscriptions.write().await;
1005 subs.insert(subscription_id, subscription);
1006 info!("Added subscription to manager: {}", subscription_id);
1007 }
1008
1009 pub async fn remove_subscription(&self, subscription_id: usize) -> Option<SubscriptionState> {
1011 let mut subs = self.subscriptions.write().await;
1012 let removed = subs.remove(&subscription_id);
1013 if removed.is_some() {
1014 info!("Removed subscription from manager: {}", subscription_id);
1015 }
1016 removed
1017 }
1018
1019 pub async fn get_subscription(&self, subscription_id: usize) -> Option<SubscriptionState> {
1021 let subs = self.subscriptions.read().await;
1022 subs.get(&subscription_id).cloned()
1023 }
1024
1025 pub async fn get_all_subscriptions(&self) -> HashMap<usize, SubscriptionState> {
1027 let subs = self.subscriptions.read().await;
1028 subs.clone()
1029 }
1030
1031 pub async fn mark_all_disconnected(&self) {
1033 let mut subs = self.subscriptions.write().await;
1034 for (id, subscription) in subs.iter_mut() {
1035 subscription.status = SubscriptionStatus::Suspended;
1036 subscription.last_update = Some(Instant::now());
1037 debug!("Marked subscription {} as disconnected", id);
1038 }
1039 info!("Marked {} subscriptions as disconnected", subs.len());
1040 }
1041
1042 pub async fn preserve_subscriptions(&self) -> Result<(), SubscriptionError> {
1044 let subs = self.subscriptions.read().await;
1045 info!(
1046 "Preserving {} subscriptions during disconnection",
1047 subs.len()
1048 );
1049 Ok(())
1052 }
1053
1054 pub async fn reestablish_subscriptions(
1056 &self,
1057 client: &Arc<Mutex<crate::client::LightstreamerClient>>,
1058 ) -> Result<(), ReconnectionError> {
1059 let subscriptions = {
1060 let subs = self.subscriptions.read().await;
1061 subs.clone()
1062 };
1063
1064 let mut reestablished = 0;
1065 let mut failed = 0;
1066
1067 for (id, mut subscription) in subscriptions {
1068 match self
1069 .reestablish_single_subscription(id, &mut subscription, client)
1070 .await
1071 {
1072 Ok(()) => {
1073 reestablished += 1;
1074 {
1076 let mut subs = self.subscriptions.write().await;
1077 if let Some(stored_sub) = subs.get_mut(&id) {
1078 *stored_sub = subscription;
1079 }
1080 }
1081 }
1082 Err(e) => {
1083 failed += 1;
1084 error!("Failed to reestablish subscription {}: {}", id, e);
1085
1086 subscription.status = SubscriptionStatus::Failed {
1088 reason: e.to_string(),
1089 };
1090 subscription.last_update = Some(Instant::now());
1091
1092 {
1094 let mut subs = self.subscriptions.write().await;
1095 if let Some(stored_sub) = subs.get_mut(&id) {
1096 *stored_sub = subscription;
1097 }
1098 }
1099 }
1100 }
1101 }
1102
1103 info!(
1104 "Subscription reestablishment complete: {} succeeded, {} failed",
1105 reestablished, failed
1106 );
1107
1108 if failed > 0 {
1109 Err(ReconnectionError::SubscriptionError(format!(
1110 "Failed to reestablish {} out of {} subscriptions",
1111 failed,
1112 reestablished + failed
1113 )))
1114 } else {
1115 Ok(())
1116 }
1117 }
1118
1119 async fn reestablish_single_subscription(
1121 &self,
1122 subscription_id: usize,
1123 subscription: &mut SubscriptionState,
1124 _client: &Arc<Mutex<crate::client::LightstreamerClient>>,
1125 ) -> Result<(), ReconnectionError> {
1126 info!("Reestablishing subscription: {}", subscription_id);
1127
1128 subscription.status = SubscriptionStatus::Resubscribing;
1130 subscription.last_update = Some(Instant::now());
1131
1132 match self
1140 .simulate_subscription_reestablishment(subscription_id)
1141 .await
1142 {
1143 Ok(()) => {
1144 subscription.status = SubscriptionStatus::Active;
1145 subscription.last_update = Some(Instant::now());
1146 info!(
1147 "Successfully reestablished subscription: {}",
1148 subscription_id
1149 );
1150 Ok(())
1151 }
1152 Err(e) => {
1153 subscription.status = SubscriptionStatus::Failed { reason: e.clone() };
1154 subscription.last_update = Some(Instant::now());
1155 error!(
1156 "Failed to reestablish subscription {}: {}",
1157 subscription_id, e
1158 );
1159 Err(ReconnectionError::SubscriptionError(e))
1160 }
1161 }
1162 }
1163
1164 async fn simulate_subscription_reestablishment(
1166 &self,
1167 subscription_id: usize,
1168 ) -> Result<(), String> {
1169 tokio::time::sleep(Duration::from_millis(100)).await;
1171
1172 if rand::random::<f64>() < 0.9 {
1174 debug!(
1175 "Simulated successful reestablishment for subscription: {}",
1176 subscription_id
1177 );
1178 Ok(())
1179 } else {
1180 Err(format!(
1181 "Simulated failure for subscription: {}",
1182 subscription_id
1183 ))
1184 }
1185 }
1186
1187 pub async fn get_statistics(&self) -> SubscriptionStatistics {
1189 let subs = self.subscriptions.read().await;
1190
1191 let mut stats = SubscriptionStatistics {
1192 total_subscriptions: subs.len(),
1193 active_subscriptions: 0,
1194 failed_subscriptions: 0,
1195 disconnected_subscriptions: 0,
1196 connecting_subscriptions: 0,
1197 };
1198
1199 for subscription in subs.values() {
1200 match subscription.status {
1201 SubscriptionStatus::Active => stats.active_subscriptions += 1,
1202 SubscriptionStatus::Failed { .. } => stats.failed_subscriptions += 1,
1203 SubscriptionStatus::Suspended => stats.disconnected_subscriptions += 1,
1204 SubscriptionStatus::Resubscribing => stats.connecting_subscriptions += 1,
1205 }
1206 }
1207
1208 stats
1209 }
1210
1211 pub async fn clear_failed_subscriptions(&self) -> usize {
1213 let mut subs = self.subscriptions.write().await;
1214 let initial_count = subs.len();
1215
1216 subs.retain(|_, subscription| {
1217 !matches!(subscription.status, SubscriptionStatus::Failed { .. })
1218 });
1219
1220 let removed_count = initial_count - subs.len();
1221 if removed_count > 0 {
1222 info!("Cleared {} failed subscriptions", removed_count);
1223 }
1224
1225 removed_count
1226 }
1227}
1228
1229#[derive(Debug, Clone)]
1231pub struct SubscriptionStatistics {
1232 pub total_subscriptions: usize,
1234 pub active_subscriptions: usize,
1236 pub failed_subscriptions: usize,
1238 pub disconnected_subscriptions: usize,
1240 pub connecting_subscriptions: usize,
1242}