1use std::sync::Arc;
73use std::time::{Duration, Instant};
74
75use crate::wire::webrtc::{SignalingClient, WebRtcCoordinator};
76use tokio_util::sync::CancellationToken;
77
/// How long the reconciler keeps collecting follow-up events after the first
/// event of a batch arrives, before the batch is processed as one unit.
const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
/// Timeout passed to `SignalingClient::probe_alive` when checking whether an
/// already-connected signaling channel is still usable.
const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
80
/// A network lifecycle notification fed into the event processor.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum NetworkEvent {
    /// Connectivity is available; in a batch this selects the `Restore` action.
    Available,

    /// Connectivity was lost; in a batch this selects the `Offline` action.
    Lost,

    /// The active network type changed. Treated like [`NetworkEvent::Available`]
    /// when a batch is reduced to a single recovery action.
    TypeChanged { is_wifi: bool, is_cellular: bool },

    /// Explicit request to tear down and rebuild connections. It is never
    /// debounced and takes precedence over every other event in a batch.
    CleanupConnections,
}
101
/// The single action a batch of [`NetworkEvent`]s is reduced to by
/// `select_network_recovery_action`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkRecoveryAction {
    /// Nothing to do (the batch contained no events).
    Noop,
    /// The most recent state-changing event was `Lost`.
    Offline,
    /// The most recent state-changing event was `Available` or `TypeChanged`.
    Restore,
    /// The batch contained at least one `CleanupConnections` event.
    CleanupConnectionsCompat,
}
110
111#[derive(Debug, Clone)]
113pub struct NetworkEventResult {
114 pub event: NetworkEvent,
116
117 pub success: bool,
119
120 pub error: Option<String>,
122
123 pub duration_ms: u64,
125}
126
127impl NetworkEventResult {
128 pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
129 Self {
130 event,
131 success: true,
132 error: None,
133 duration_ms,
134 }
135 }
136
137 pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
138 Self {
139 event,
140 success: false,
141 error: Some(error),
142 duration_ms,
143 }
144 }
145}
146
147#[async_trait::async_trait]
151pub trait NetworkEventProcessor: Send + Sync {
152 async fn process_network_available(&self) -> Result<(), String>;
158
159 async fn process_network_lost(&self) -> Result<(), String>;
165
166 async fn process_network_type_changed(
172 &self,
173 is_wifi: bool,
174 is_cellular: bool,
175 ) -> Result<(), String>;
176
177 async fn cleanup_connections(&self) -> Result<(), String>;
200
201 async fn process_network_recovery_action(
206 &self,
207 action: NetworkRecoveryAction,
208 ) -> Result<(), String> {
209 match action {
210 NetworkRecoveryAction::Noop => Ok(()),
211 NetworkRecoveryAction::Offline => self.process_network_lost().await,
212 NetworkRecoveryAction::Restore => self.process_network_available().await,
213 NetworkRecoveryAction::CleanupConnectionsCompat => self.cleanup_connections().await,
214 }
215 }
216}
217
/// Tuning knobs for per-event-type debouncing.
#[derive(Debug, Clone)]
pub struct DebounceConfig {
    /// Minimum spacing between two accepted events of the same kind.
    pub window: Duration,
}

impl Default for DebounceConfig {
    /// Uses a two-second debounce window.
    fn default() -> Self {
        let window = Duration::from_secs(2);
        Self { window }
    }
}
233
234#[derive(Debug)]
236struct DebounceState {
237 last_available: tokio::sync::Mutex<Option<Instant>>,
238 last_lost: tokio::sync::Mutex<Option<Instant>>,
239 last_type_changed: tokio::sync::Mutex<Option<Instant>>,
240}
241
242impl DebounceState {
243 fn new() -> Self {
244 Self {
245 last_available: tokio::sync::Mutex::new(None),
246 last_lost: tokio::sync::Mutex::new(None),
247 last_type_changed: tokio::sync::Mutex::new(None),
248 }
249 }
250}
251
252#[derive(Debug)]
253struct SignalingRecoveryState {
254 connect_lock: tokio::sync::Mutex<()>,
255 last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
256}
257
258impl SignalingRecoveryState {
259 fn new() -> Self {
260 Self {
261 connect_lock: tokio::sync::Mutex::new(()),
262 last_successful_connect: tokio::sync::Mutex::new(None),
263 }
264 }
265}
266
/// Default [`NetworkEventProcessor`]: debounces incoming events, keeps the
/// signaling connection healthy and drives WebRTC network recovery.
pub struct DefaultNetworkEventProcessor {
    /// Signaling channel that is connected, probed and disconnected.
    signaling_client: Arc<dyn SignalingClient>,
    /// Optional WebRTC coordinator; when `None`, all WebRTC steps are skipped.
    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
    /// Debounce window configuration.
    debounce_config: DebounceConfig,
    /// Per-event-kind timestamps used for debouncing.
    debounce_state: Arc<DebounceState>,
    /// Serialization lock and bookkeeping for signaling connects.
    recovery_state: Arc<SignalingRecoveryState>,
}
275
276impl DefaultNetworkEventProcessor {
277 pub fn new(
278 signaling_client: Arc<dyn SignalingClient>,
279 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
280 ) -> Self {
281 Self::new_with_debounce(
282 signaling_client,
283 webrtc_coordinator,
284 DebounceConfig::default(),
285 )
286 }
287
288 pub fn new_with_debounce(
289 signaling_client: Arc<dyn SignalingClient>,
290 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
291 debounce_config: DebounceConfig,
292 ) -> Self {
293 Self {
294 signaling_client,
295 webrtc_coordinator,
296 debounce_config,
297 debounce_state: Arc::new(DebounceState::new()),
298 recovery_state: Arc::new(SignalingRecoveryState::new()),
299 }
300 }
301
302 async fn should_process_event(&self, event: &NetworkEvent) -> bool {
308 let now = Instant::now();
309
310 match event {
311 NetworkEvent::Available => {
312 let mut last = self.debounce_state.last_available.lock().await;
313 if let Some(last_time) = *last {
314 if now.duration_since(last_time) < self.debounce_config.window {
315 tracing::debug!(
316 "⏸️ Debouncing Network Available event (last event was {:?} ago)",
317 now.duration_since(last_time)
318 );
319 return false;
320 }
321 }
322 *last = Some(now);
323 true
324 }
325 NetworkEvent::Lost => {
326 let mut last = self.debounce_state.last_lost.lock().await;
327 if let Some(last_time) = *last {
328 if now.duration_since(last_time) < self.debounce_config.window {
329 tracing::debug!(
330 "⏸️ Debouncing Network Lost event (last event was {:?} ago)",
331 now.duration_since(last_time)
332 );
333 return false;
334 }
335 }
336 *last = Some(now);
337 true
338 }
339 NetworkEvent::TypeChanged { .. } => {
340 let mut last = self.debounce_state.last_type_changed.lock().await;
341 if let Some(last_time) = *last {
342 if now.duration_since(last_time) < self.debounce_config.window {
343 tracing::debug!(
344 "⏸️ Debouncing Network TypeChanged event (last event was {:?} ago)",
345 now.duration_since(last_time)
346 );
347 return false;
348 }
349 }
350 *last = Some(now);
351 true
352 }
353 NetworkEvent::CleanupConnections => {
355 tracing::debug!(
356 "🧹 CleanupConnections event - no debouncing (always execute immediately)"
357 );
358 true
359 }
360 }
361 }
362
363 async fn ensure_signaling_connected_once(&self, reason: &str) -> Result<(), String> {
364 let _guard = self.recovery_state.connect_lock.lock().await;
365
366 if self.signaling_client.is_connected() {
367 tracing::debug!(
368 reason = reason,
369 "Signaling already connected, skipping connect"
370 );
371 return Ok(());
372 }
373
374 let recently_connected = {
375 let last = self.recovery_state.last_successful_connect.lock().await;
376 last.map(|instant| instant.elapsed() < Duration::from_millis(1500))
377 .unwrap_or(false)
378 };
379 if recently_connected && self.signaling_client.is_connected() {
380 tracing::debug!(
381 reason = reason,
382 "Signaling recently connected, reusing connection"
383 );
384 return Ok(());
385 }
386
387 tracing::info!(reason = reason, "🔄 Connecting signaling");
388 self.signaling_client.connect_once().await.map_err(|e| {
389 let err_msg = format!("WebSocket connect failed: {}", e);
390 tracing::error!("❌ {}", err_msg);
391 err_msg
392 })?;
393
394 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
395 tracing::info!(reason = reason, "✅ Signaling connected");
396 Ok(())
397 }
398
399 async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
400 let _guard = self.recovery_state.connect_lock.lock().await;
401
402 if !self.signaling_client.is_connected() {
403 tracing::info!(reason = reason, "🔄 Connecting signaling");
404 self.signaling_client.connect_once().await.map_err(|e| {
405 let err_msg = format!("WebSocket connect failed: {}", e);
406 tracing::error!("❌ {}", err_msg);
407 err_msg
408 })?;
409
410 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
411 tracing::info!(reason = reason, "✅ Signaling connected");
412 return Ok(());
413 }
414
415 tracing::debug!(
416 reason = reason,
417 timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
418 "🔎 Probing existing signaling WebSocket"
419 );
420
421 match self
422 .signaling_client
423 .probe_alive(SIGNALING_PROBE_TIMEOUT)
424 .await
425 {
426 Ok(()) => {
427 tracing::debug!(
428 reason = reason,
429 "✅ Signaling probe succeeded; keeping existing WebSocket"
430 );
431 Ok(())
432 }
433 Err(e) => {
434 tracing::warn!(
435 reason = reason,
436 "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
437 e
438 );
439
440 if let Err(disconnect_err) = self.signaling_client.disconnect().await {
441 tracing::warn!(
442 reason = reason,
443 "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
444 disconnect_err
445 );
446 }
447
448 tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
449 self.signaling_client
450 .connect_once()
451 .await
452 .map_err(|connect_err| {
453 let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
454 tracing::error!("❌ {}", err_msg);
455 err_msg
456 })?;
457
458 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
459 tracing::info!(reason = reason, "✅ Signaling rebuilt");
460 Ok(())
461 }
462 }
463 }
464
465 async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
466 let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
467 coordinator.begin_network_recovery(reason).await
468 } else {
469 Vec::new()
470 };
471
472 self.ensure_signaling_healthy_once(reason).await?;
473
474 let coordinator = self.webrtc_coordinator.clone();
475
476 if let Some(coordinator) = coordinator {
477 if recovery_targets.is_empty() {
478 tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
479 } else {
480 tracing::info!("♻️ Triggering ICE restart for recovering connections...");
481 }
482 coordinator.restart_network_recovery_connections().await;
483 }
484
485 Ok(())
486 }
487
488 async fn process_offline(&self) -> Result<(), String> {
489 tracing::info!("📱 Processing: Network offline");
490
491 if let Some(ref coordinator) = self.webrtc_coordinator {
492 coordinator.begin_network_recovery("NetworkLost").await;
493 tracing::info!("🧹 Clearing pending ICE restart attempts...");
494 coordinator.clear_pending_restarts().await;
495 }
496
497 if self.signaling_client.is_connected() {
498 tracing::info!("🔌 Disconnecting WebSocket...");
499 let _ = self.signaling_client.disconnect().await;
500 }
501
502 Ok(())
503 }
504}
505
506#[async_trait::async_trait]
507impl NetworkEventProcessor for DefaultNetworkEventProcessor {
508 async fn process_network_available(&self) -> Result<(), String> {
510 let should_process = self.should_process_event(&NetworkEvent::Available).await;
512 if !should_process && self.signaling_client.is_connected() {
513 return Ok(());
514 }
515
516 tracing::info!("📱 Processing: Network available");
517
518 self.restore_signaling_and_webrtc("NetworkAvailable").await
519 }
520
521 async fn process_network_lost(&self) -> Result<(), String> {
523 if !self.should_process_event(&NetworkEvent::Lost).await {
525 return Ok(());
526 }
527
528 self.process_offline().await
529 }
530
531 async fn process_network_type_changed(
533 &self,
534 is_wifi: bool,
535 is_cellular: bool,
536 ) -> Result<(), String> {
537 let should_process = self
539 .should_process_event(&NetworkEvent::TypeChanged {
540 is_wifi,
541 is_cellular,
542 })
543 .await;
544 if !should_process && self.signaling_client.is_connected() {
545 return Ok(());
546 }
547
548 tracing::info!(
549 "📱 Processing: Network type changed (WiFi={}, Cellular={})",
550 is_wifi,
551 is_cellular
552 );
553
554 self.restore_signaling_and_webrtc("NetworkTypeChanged")
555 .await
556 }
557
558 async fn cleanup_connections(&self) -> Result<(), String> {
564 let _cleanup_guard = self
565 .webrtc_coordinator
566 .as_ref()
567 .map(|coordinator| coordinator.cleanup_guard());
568
569 tracing::info!("🧹 Manually cleaning up all connections...");
570
571 if let Some(ref coordinator) = self.webrtc_coordinator {
573 tracing::info!("♻️ Clearing pending ICE restart attempts...");
574 coordinator.clear_pending_restarts().await;
575
576 tracing::info!("🔻 Closing all WebRTC peer connections...");
578 if let Err(e) = coordinator.close_all_peers().await {
579 let err_msg = format!("Failed to close all peers: {}", e);
580 tracing::warn!("⚠️ {}", err_msg);
581 } else {
583 tracing::info!("✅ All WebRTC peer connections closed");
584 }
585 }
586
587 if self.signaling_client.is_connected() {
589 tracing::info!("🔌 Disconnecting WebSocket...");
590 match self.signaling_client.disconnect().await {
591 Ok(_) => {
592 tracing::info!("✅ WebSocket disconnected successfully");
593 }
594 Err(e) => {
595 let err_msg = format!("Failed to disconnect WebSocket: {}", e);
596 tracing::warn!("⚠️ {}", err_msg);
597 }
599 }
600 }
601
602 tracing::info!("✅ Connection cleanup completed");
603
604 tracing::info!("🔌 Re-establishing signaling connection...");
607 self.ensure_signaling_connected_once("CompatCleanupConnections")
608 .await?;
609
610 tracing::info!("✅ Connection cleanup and reconnect completed");
611 Ok(())
612 }
613
614 async fn process_network_recovery_action(
615 &self,
616 action: NetworkRecoveryAction,
617 ) -> Result<(), String> {
618 match action {
619 NetworkRecoveryAction::Noop => Ok(()),
620 NetworkRecoveryAction::Offline => self.process_offline().await,
621 NetworkRecoveryAction::Restore => {
622 self.restore_signaling_and_webrtc("NetworkEventBatch").await
623 }
624 NetworkRecoveryAction::CleanupConnectionsCompat => self.cleanup_connections().await,
625 }
626 }
627}
628
629pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
630 let mut saw_cleanup_connections = false;
631 let mut latest_state_action = NetworkRecoveryAction::Noop;
632
633 for event in events {
634 match event {
635 NetworkEvent::CleanupConnections => saw_cleanup_connections = true,
636 NetworkEvent::Available | NetworkEvent::TypeChanged { .. } => {
637 latest_state_action = NetworkRecoveryAction::Restore
638 }
639 NetworkEvent::Lost => latest_state_action = NetworkRecoveryAction::Offline,
640 }
641 }
642
643 if saw_cleanup_connections {
644 NetworkRecoveryAction::CleanupConnectionsCompat
645 } else {
646 latest_state_action
647 }
648}
649
650pub async fn process_network_event_batch(
651 events: Vec<NetworkEvent>,
652 processor: Arc<dyn NetworkEventProcessor>,
653) -> Vec<NetworkEventResult> {
654 if events.is_empty() {
655 return Vec::new();
656 }
657
658 let action = select_network_recovery_action(&events);
659 let start = Instant::now();
660
661 let result = processor.process_network_recovery_action(action).await;
662
663 let duration_ms = start.elapsed().as_millis() as u64;
664 events
665 .into_iter()
666 .map(|event| match &result {
667 Ok(()) => NetworkEventResult::success(event, duration_ms),
668 Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
669 })
670 .collect()
671}
672
/// Long-running loop that coalesces bursts of network events into batches.
///
/// After the first event of a batch arrives, further events are collected
/// until `NETWORK_EVENT_SETTLE_WINDOW` has elapsed (the timer is started once
/// and is not reset by later events). The batch is then handed to
/// `process_network_event_batch`, and one result per event is forwarded on
/// `result_tx`.
///
/// The loop ends when `shutdown_token` is cancelled. If cancellation happens
/// while a batch is still settling, the function returns immediately and the
/// events collected so far are not processed.
pub async fn run_network_event_reconciler(
    mut event_rx: tokio::sync::mpsc::Receiver<NetworkEvent>,
    result_tx: tokio::sync::mpsc::Sender<NetworkEventResult>,
    processor: Arc<dyn NetworkEventProcessor>,
    shutdown_token: CancellationToken,
) {
    tracing::info!("🔄 Network event reconciler started");

    loop {
        tokio::select! {
            Some(first_event) = event_rx.recv() => {
                let mut events = vec![first_event];
                // One fixed settle timer per batch; pinned so it can be polled
                // by reference across iterations of the inner loop.
                let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
                tokio::pin!(settle);

                loop {
                    tokio::select! {
                        Some(next_event) = event_rx.recv() => {
                            events.push(next_event);
                        }
                        _ = &mut settle => {
                            break;
                        }
                        _ = shutdown_token.cancelled() => {
                            // The partially collected batch is dropped here.
                            tracing::info!("🛑 Network event reconciler shutting down");
                            return;
                        }
                        else => {
                            break;
                        }
                    }
                }

                // Pick up anything already queued without waiting any longer.
                while let Ok(next_event) = event_rx.try_recv() {
                    events.push(next_event);
                }

                // Computed here for logging only; the batch function selects
                // the action again from the same events.
                let action = select_network_recovery_action(&events);
                tracing::info!(
                    event_count = events.len(),
                    action = ?action,
                    settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
                    "📱 Processing settled network event batch"
                );

                let results = process_network_event_batch(events, processor.clone()).await;

                for result in results {
                    // A failed send is logged; the remaining results are still attempted.
                    if let Err(e) = result_tx.send(result).await {
                        tracing::warn!("Failed to send event result: {}", e);
                    }
                }
            }
            _ = shutdown_token.cancelled() => {
                tracing::info!("🛑 Network event reconciler shutting down");
                break;
            }
            // NOTE(review): `else` only runs when every branch is disabled; once
            // `recv()` starts yielding `None` the loop presumably just waits for
            // cancellation — confirm this is the intended behaviour.
            else => break,
        }
    }
}
734
735pub struct NetworkEventHandle {
740 event_tx: tokio::sync::mpsc::Sender<NetworkEvent>,
742
743 result_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<NetworkEventResult>>>,
746}
747
748impl NetworkEventHandle {
749 pub fn new(
751 event_tx: tokio::sync::mpsc::Sender<NetworkEvent>,
752 result_rx: tokio::sync::mpsc::Receiver<NetworkEventResult>,
753 ) -> Self {
754 Self {
755 event_tx,
756 result_rx: Arc::new(tokio::sync::Mutex::new(result_rx)),
757 }
758 }
759
760 pub async fn handle_network_available(&self) -> Result<NetworkEventResult, String> {
766 self.send_event_and_await_result(NetworkEvent::Available)
767 .await
768 }
769
770 pub async fn handle_network_lost(&self) -> Result<NetworkEventResult, String> {
776 self.send_event_and_await_result(NetworkEvent::Lost).await
777 }
778
779 pub async fn handle_network_type_changed(
785 &self,
786 is_wifi: bool,
787 is_cellular: bool,
788 ) -> Result<NetworkEventResult, String> {
789 self.send_event_and_await_result(NetworkEvent::TypeChanged {
790 is_wifi,
791 is_cellular,
792 })
793 .await
794 }
795
796 pub async fn cleanup_connections(&self) -> Result<NetworkEventResult, String> {
808 self.send_event_and_await_result(NetworkEvent::CleanupConnections)
809 .await
810 }
811
812 async fn send_event_and_await_result(
814 &self,
815 event: NetworkEvent,
816 ) -> Result<NetworkEventResult, String> {
817 self.event_tx
819 .send(event.clone())
820 .await
821 .map_err(|e| format!("Failed to send network event: {}", e))?;
822
823 let mut rx = self.result_rx.lock().await;
825 rx.recv()
826 .await
827 .ok_or_else(|| "Failed to receive network event result".to_string())
828 }
829}
830
831impl Clone for NetworkEventHandle {
832 fn clone(&self) -> Self {
833 Self {
834 event_tx: self.event_tx.clone(),
835 result_rx: self.result_rx.clone(),
836 }
837 }
838}