1use std::sync::{
70 Arc,
71 atomic::{AtomicU64, Ordering},
72};
73use std::time::{Duration, Instant};
74
75use crate::transport::PeerTransport;
76use crate::wire::webrtc::{CleanupGuard, SignalingClient, WebRtcCoordinator};
77use tokio::sync::{mpsc, oneshot};
78use tokio_util::sync::CancellationToken;
79
80use super::connection_supervisor::{ConnectionFact, ConnectionSupervisor};
81
/// How long the reconciler keeps collecting follow-up events after the first
/// event of a batch arrives, before the batch is processed.
const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
/// Default time a `NetworkEventHandle` waits for the reconciler to report a result.
const NETWORK_EVENT_RESULT_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout handed to `SignalingClient::probe_alive` when checking the signaling connection.
const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
/// Background duration (milliseconds) at or above which a return to the
/// foreground is treated as needing a lifecycle barrier.
pub(super) const LONG_BACKGROUND_RECONNECT_THRESHOLD_MS: u64 = 30_000;
/// Source of per-request ids that `NetworkEventHandle` attaches to its log lines.
static NEXT_NETWORK_EVENT_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
87
/// Token returned by `NetworkEventProcessor::begin_network_event_barrier`.
///
/// It only owns a `CleanupGuard` obtained from the WebRTC coordinator; the
/// reconciler keeps the barrier alive while a batch is processed and drops it
/// afterwards.
// NOTE(review): what holding the `CleanupGuard` blocks is defined by the
// coordinator, not in this file — confirm there.
pub struct NetworkEventBarrier {
    _cleanup_guard: CleanupGuard,
}
93
/// Description of the network path carried by `NetworkPathChanged` events.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NetworkSnapshot {
    // Presumably a monotonically increasing counter assigned by the reporter — TODO confirm.
    pub sequence: u64,
    /// Whether the network is usable, unusable, or not yet known.
    pub availability: NetworkAvailability,
    /// Which kinds of interface the path uses.
    pub transport: NetworkTransportFlags,
    pub is_expensive: bool,
    pub is_constrained: bool,
}

impl NetworkSnapshot {
    /// Returns `true` only when availability is `Unavailable`.
    pub fn is_offline(&self) -> bool {
        self.availability == NetworkAvailability::Unavailable
    }

    /// Returns `true` only when availability is `Available`.
    ///
    /// `Unknown` is neither offline nor restorable.
    pub fn should_restore(&self) -> bool {
        self.availability == NetworkAvailability::Available
    }
}

/// Reported reachability of the network.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkAvailability {
    Unknown,
    Available,
    Unavailable,
}

/// Set of interface kinds in use; every flag defaults to `false`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct NetworkTransportFlags {
    pub wifi: bool,
    pub cellular: bool,
    pub ethernet: bool,
    pub vpn: bool,
    pub other: bool,
}
131
/// Application lifecycle state carried by `NetworkEvent::AppLifecycleChanged`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AppLifecycleState {
    Background,
    /// `background_duration_ms` is compared against
    /// `LONG_BACKGROUND_RECONNECT_THRESHOLD_MS` to decide whether a lifecycle
    /// barrier is needed.
    Foreground { background_duration_ms: u64 },
}
138
/// Reason attached to a `NetworkEvent::CleanupConnections` request.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CleanupReason {
    AppTerminating,
    UserLogout,
    StaleConnectionSuspected,
    ManualReset,
}
147
/// Reason attached to a `NetworkEvent::ForceReconnect` request.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReconnectReason {
    NetworkPathChanged,
    LongBackground,
    ProbeFailed,
    ManualReconnect,
    StaleConnectionSuspected,
}
157
/// An input to the network event reconciler.
///
/// Events are submitted through `NetworkEventHandle`, batched by
/// `run_network_event_reconciler`, and reduced to a single
/// `NetworkRecoveryAction` per batch.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum NetworkEvent {
    /// The network path changed; `snapshot` describes the new path.
    NetworkPathChanged { snapshot: NetworkSnapshot },

    /// The application moved between background and foreground.
    AppLifecycleChanged { state: AppLifecycleState },

    /// A caller asked for connections to be cleaned up.
    CleanupConnections { reason: CleanupReason },

    /// A caller asked for a forced reconnect.
    ForceReconnect { reason: ReconnectReason },
}
178
/// The single action chosen for a batch of events.
///
/// `NetworkEventProcessor::process_network_recovery_action` maps each variant
/// to one processor operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkRecoveryAction {
    /// Do nothing.
    Noop,
    /// Handle loss of the network.
    Offline,
    /// Probe connectivity.
    Probe,
    /// Restore connections.
    Restore,
    /// Clean up connections without reconnecting.
    CleanupOnly,
    /// Clean up connections and then reconnect.
    ForceReconnect,
}
189
190fn network_event_needs_lifecycle_barrier(event: &NetworkEvent) -> bool {
191 match event {
192 NetworkEvent::NetworkPathChanged { snapshot } => {
193 snapshot.is_offline() || snapshot.should_restore()
194 }
195 NetworkEvent::AppLifecycleChanged { state } => match state {
196 AppLifecycleState::Background => false,
197 AppLifecycleState::Foreground {
198 background_duration_ms,
199 } => *background_duration_ms >= LONG_BACKGROUND_RECONNECT_THRESHOLD_MS,
200 },
201 NetworkEvent::CleanupConnections { .. } | NetworkEvent::ForceReconnect { .. } => true,
202 }
203}
204
205#[derive(Debug, Clone)]
207pub struct NetworkEventResult {
208 pub event: NetworkEvent,
210
211 pub success: bool,
213
214 pub error: Option<String>,
216
217 pub duration_ms: u64,
219}
220
221impl NetworkEventResult {
222 pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
223 Self {
224 event,
225 success: true,
226 error: None,
227 duration_ms,
228 }
229 }
230
231 pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
232 Self {
233 event,
234 success: false,
235 error: Some(error),
236 duration_ms,
237 }
238 }
239}
240
241#[async_trait::async_trait]
245pub trait NetworkEventProcessor: Send + Sync {
246 fn begin_network_event_barrier(&self, _event: &NetworkEvent) -> Option<NetworkEventBarrier> {
249 None
250 }
251
252 async fn process_network_available(&self) -> Result<(), String>;
258
259 async fn process_network_lost(&self) -> Result<(), String>;
265
266 async fn process_network_type_changed(
272 &self,
273 is_wifi: bool,
274 is_cellular: bool,
275 ) -> Result<(), String>;
276
277 async fn cleanup_connections(&self) -> Result<(), String>;
300
301 async fn probe_connectivity(&self) -> Result<(), String> {
303 Ok(())
304 }
305
306 async fn force_reconnect(&self) -> Result<(), String> {
308 self.cleanup_connections().await?;
309 self.process_network_available().await
310 }
311
312 async fn process_network_recovery_action(
317 &self,
318 action: NetworkRecoveryAction,
319 ) -> Result<(), String> {
320 match action {
321 NetworkRecoveryAction::Noop => Ok(()),
322 NetworkRecoveryAction::Offline => self.process_network_lost().await,
323 NetworkRecoveryAction::Probe => self.probe_connectivity().await,
324 NetworkRecoveryAction::Restore => self.process_network_available().await,
325 NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
326 NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
327 }
328 }
329}
330
/// Settings for debouncing repeated network callbacks.
#[derive(Debug, Clone)]
pub struct DebounceConfig {
    /// Minimum spacing between two processed events of the same kind.
    pub window: Duration,
}

impl Default for DebounceConfig {
    /// Uses a two-second debounce window.
    fn default() -> Self {
        const DEFAULT_WINDOW: Duration = Duration::from_secs(2);

        Self {
            window: DEFAULT_WINDOW,
        }
    }
}
346
347#[derive(Debug)]
349struct DebounceState {
350 last_available: tokio::sync::Mutex<Option<Instant>>,
351 last_lost: tokio::sync::Mutex<Option<Instant>>,
352 last_type_changed: tokio::sync::Mutex<Option<Instant>>,
353}
354
355impl DebounceState {
356 fn new() -> Self {
357 Self {
358 last_available: tokio::sync::Mutex::new(None),
359 last_lost: tokio::sync::Mutex::new(None),
360 last_type_changed: tokio::sync::Mutex::new(None),
361 }
362 }
363}
364
/// Selects which debounce timestamp `should_process_event` checks and updates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DebounceEvent {
    Available,
    Lost,
    TypeChanged,
}
371
372#[derive(Debug)]
373struct SignalingRecoveryState {
374 connect_lock: tokio::sync::Mutex<()>,
375 last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
376}
377
378impl SignalingRecoveryState {
379 fn new() -> Self {
380 Self {
381 connect_lock: tokio::sync::Mutex::new(()),
382 last_successful_connect: tokio::sync::Mutex::new(None),
383 }
384 }
385}
386
/// `NetworkEventProcessor` implementation backed by a signaling client and,
/// optionally, a WebRTC coordinator and a peer transport.
pub struct DefaultNetworkEventProcessor {
    /// Signaling connection that is probed, disconnected and reconnected.
    signaling_client: Arc<dyn SignalingClient>,
    /// When present, provides cleanup guards and drives network recovery / ICE restarts.
    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
    /// When present, is closed during `cleanup_connections`.
    peer_transport: Option<Arc<PeerTransport>>,
    /// Debounce window for the per-kind network callbacks.
    debounce_config: DebounceConfig,
    /// Last-processed timestamps used for debouncing.
    debounce_state: Arc<DebounceState>,
    /// Lock and bookkeeping for signaling recovery.
    recovery_state: Arc<SignalingRecoveryState>,
}
396
397impl DefaultNetworkEventProcessor {
    /// Creates a processor with the default debounce settings and no peer transport.
    pub fn new(
        signaling_client: Arc<dyn SignalingClient>,
        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
    ) -> Self {
        Self::new_with_debounce_and_peer_transport(
            signaling_client,
            webrtc_coordinator,
            DebounceConfig::default(),
            None,
        )
    }
409
    /// Creates a processor with a caller-supplied debounce configuration and no peer transport.
    pub fn new_with_debounce(
        signaling_client: Arc<dyn SignalingClient>,
        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
        debounce_config: DebounceConfig,
    ) -> Self {
        Self::new_with_debounce_and_peer_transport(
            signaling_client,
            webrtc_coordinator,
            debounce_config,
            None,
        )
    }
422
    /// Creates a processor with the default debounce settings and the given peer transport.
    pub(crate) fn new_with_peer_transport(
        signaling_client: Arc<dyn SignalingClient>,
        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
        peer_transport: Option<Arc<PeerTransport>>,
    ) -> Self {
        Self::new_with_debounce_and_peer_transport(
            signaling_client,
            webrtc_coordinator,
            DebounceConfig::default(),
            peer_transport,
        )
    }
435
    /// Fully-parameterized constructor that the other constructors delegate to.
    ///
    /// Debounce and recovery state always start fresh.
    pub(crate) fn new_with_debounce_and_peer_transport(
        signaling_client: Arc<dyn SignalingClient>,
        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
        debounce_config: DebounceConfig,
        peer_transport: Option<Arc<PeerTransport>>,
    ) -> Self {
        Self {
            signaling_client,
            webrtc_coordinator,
            peer_transport,
            debounce_config,
            debounce_state: Arc::new(DebounceState::new()),
            recovery_state: Arc::new(SignalingRecoveryState::new()),
        }
    }
451
452 fn lifecycle_barrier(&self) -> Option<NetworkEventBarrier> {
453 self.webrtc_coordinator
454 .as_ref()
455 .map(|coordinator| NetworkEventBarrier {
456 _cleanup_guard: coordinator.cleanup_guard(),
457 })
458 }
459
460 async fn should_process_event(&self, event: DebounceEvent) -> bool {
466 let now = Instant::now();
467
468 match event {
469 DebounceEvent::Available => {
470 let mut last = self.debounce_state.last_available.lock().await;
471 if let Some(last_time) = *last {
472 if now.duration_since(last_time) < self.debounce_config.window {
473 tracing::debug!(
474 "⏸️ Debouncing Network Available event (last event was {:?} ago)",
475 now.duration_since(last_time)
476 );
477 return false;
478 }
479 }
480 *last = Some(now);
481 true
482 }
483 DebounceEvent::Lost => {
484 let mut last = self.debounce_state.last_lost.lock().await;
485 if let Some(last_time) = *last {
486 if now.duration_since(last_time) < self.debounce_config.window {
487 tracing::debug!(
488 "⏸️ Debouncing Network Lost event (last event was {:?} ago)",
489 now.duration_since(last_time)
490 );
491 return false;
492 }
493 }
494 *last = Some(now);
495 true
496 }
497 DebounceEvent::TypeChanged => {
498 let mut last = self.debounce_state.last_type_changed.lock().await;
499 if let Some(last_time) = *last {
500 if now.duration_since(last_time) < self.debounce_config.window {
501 tracing::debug!(
502 "⏸️ Debouncing Network TypeChanged event (last event was {:?} ago)",
503 now.duration_since(last_time)
504 );
505 return false;
506 }
507 }
508 *last = Some(now);
509 true
510 }
511 }
512 }
513
514 async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
515 let _guard = self.recovery_state.connect_lock.lock().await;
516
517 if !self.signaling_client.is_connected() {
518 tracing::info!(
519 reason = reason,
520 "Network recovery event resetting signaling reconnect backoff before connect"
521 );
522 self.signaling_client
523 .schedule_auto_reconnect_reset_backoff();
524 tracing::info!(reason = reason, "🔄 Connecting signaling");
525 self.signaling_client.connect_once().await.map_err(|e| {
526 let err_msg = format!("WebSocket connect failed: {}", e);
527 tracing::error!("❌ {}", err_msg);
528 err_msg
529 })?;
530
531 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
532 tracing::info!(reason = reason, "✅ Signaling connected");
533 return Ok(());
534 }
535
536 tracing::debug!(
537 reason = reason,
538 timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
539 "🔎 Probing existing signaling WebSocket"
540 );
541
542 match self
543 .signaling_client
544 .probe_alive(SIGNALING_PROBE_TIMEOUT)
545 .await
546 {
547 Ok(()) => {
548 tracing::debug!(
549 reason = reason,
550 "✅ Signaling probe succeeded; keeping existing WebSocket"
551 );
552 Ok(())
553 }
554 Err(e) => {
555 tracing::warn!(
556 reason = reason,
557 "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
558 e
559 );
560
561 if let Err(disconnect_err) = self.signaling_client.disconnect().await {
562 tracing::warn!(
563 reason = reason,
564 "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
565 disconnect_err
566 );
567 }
568
569 tracing::info!(
570 reason = reason,
571 "Network recovery event resetting signaling reconnect backoff before rebuild"
572 );
573 self.signaling_client
574 .schedule_auto_reconnect_reset_backoff();
575 tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
576 self.signaling_client
577 .connect_once()
578 .await
579 .map_err(|connect_err| {
580 let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
581 tracing::error!("❌ {}", err_msg);
582 err_msg
583 })?;
584
585 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
586 tracing::info!(reason = reason, "✅ Signaling rebuilt");
587 Ok(())
588 }
589 }
590 }
591
592 async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
593 let _cleanup_guard = self.lifecycle_barrier();
594 let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
595 coordinator.begin_network_recovery(reason).await
596 } else {
597 Vec::new()
598 };
599
600 self.ensure_signaling_healthy_once(reason).await?;
601
602 let coordinator = self.webrtc_coordinator.clone();
603
604 if let Some(coordinator) = coordinator {
605 if recovery_targets.is_empty() {
606 tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
607 } else {
608 tracing::info!("♻️ Triggering ICE restart for recovering connections...");
609 }
610 coordinator.restart_network_recovery_connections().await;
611 }
612
613 Ok(())
614 }
615
    /// Logs a failed recovery and asks the signaling client to schedule its
    /// auto-reconnect, so reconnection continues after the one-shot attempt failed.
    fn schedule_auto_reconnect_after_recovery_failure(&self, reason: &str, err: &str) {
        tracing::warn!(
            reason = reason,
            error = %err,
            "Network recovery failed; ensuring signaling auto-reconnect remains scheduled"
        );
        self.signaling_client.schedule_auto_reconnect();
    }
624
625 async fn restore_signaling_and_webrtc_from_network_event(
626 &self,
627 reason: &str,
628 ) -> Result<(), String> {
629 let result = self.restore_signaling_and_webrtc(reason).await;
630 if let Err(err) = &result {
631 self.schedule_auto_reconnect_after_recovery_failure(reason, err);
632 }
633 result
634 }
635
636 async fn probe_or_restore(&self, reason: &str) -> Result<(), String> {
637 match self.probe_connectivity().await {
638 Ok(()) => Ok(()),
639 Err(e) => {
640 tracing::warn!(
641 reason = reason,
642 "Connectivity probe failed; restoring connections: {}",
643 e
644 );
645 if let Err(disconnect_err) = self.signaling_client.disconnect().await {
646 tracing::warn!(
647 reason = reason,
648 "Failed to disconnect unhealthy signaling before restore: {}",
649 disconnect_err
650 );
651 }
652 self.restore_signaling_and_webrtc_from_network_event(reason)
653 .await
654 }
655 }
656 }
657
658 async fn process_offline(&self) -> Result<(), String> {
659 let _cleanup_guard = self.lifecycle_barrier();
660 tracing::info!("📱 Processing: Network offline");
661
662 if let Some(ref coordinator) = self.webrtc_coordinator {
663 coordinator.begin_network_recovery("NetworkLost").await;
664 tracing::info!("🧹 Clearing pending ICE restart attempts...");
665 coordinator.clear_pending_restarts().await;
666 }
667
668 tracing::info!("🔌 Disconnecting WebSocket...");
669 let _ = self.signaling_client.disconnect().await;
670
671 Ok(())
672 }
673}
674
675#[async_trait::async_trait]
676impl NetworkEventProcessor for DefaultNetworkEventProcessor {
677 fn begin_network_event_barrier(&self, event: &NetworkEvent) -> Option<NetworkEventBarrier> {
678 if network_event_needs_lifecycle_barrier(event) {
679 self.lifecycle_barrier()
680 } else {
681 None
682 }
683 }
684
685 async fn process_network_available(&self) -> Result<(), String> {
687 let should_process = self.should_process_event(DebounceEvent::Available).await;
689 if !should_process && self.signaling_client.is_connected() {
690 return Ok(());
691 }
692
693 tracing::info!("📱 Processing: Network available");
694
695 self.restore_signaling_and_webrtc_from_network_event("NetworkAvailable")
696 .await
697 }
698
699 async fn process_network_lost(&self) -> Result<(), String> {
701 if !self.should_process_event(DebounceEvent::Lost).await {
703 return Ok(());
704 }
705
706 self.process_offline().await
707 }
708
709 async fn process_network_type_changed(
711 &self,
712 is_wifi: bool,
713 is_cellular: bool,
714 ) -> Result<(), String> {
715 let should_process = self.should_process_event(DebounceEvent::TypeChanged).await;
717 if !should_process && self.signaling_client.is_connected() {
718 return Ok(());
719 }
720
721 tracing::info!(
722 "📱 Processing: Network type changed (WiFi={}, Cellular={})",
723 is_wifi,
724 is_cellular
725 );
726
727 self.restore_signaling_and_webrtc_from_network_event("NetworkTypeChanged")
728 .await
729 }
730
731 async fn cleanup_connections(&self) -> Result<(), String> {
737 let _cleanup_guard = self.lifecycle_barrier();
738
739 tracing::info!("🧹 Manually cleaning up all connections...");
740
741 if let Some(ref coordinator) = self.webrtc_coordinator {
743 tracing::info!("♻️ Clearing pending ICE restart attempts...");
744 coordinator.clear_pending_restarts().await;
745 }
746
747 if let Some(ref peer_transport) = self.peer_transport {
749 tracing::info!("🔻 Closing all PeerTransport connections...");
750 if let Err(e) = peer_transport.close_all().await {
751 let err_msg = format!("Failed to close peer transports: {}", e);
752 tracing::warn!("⚠️ {}", err_msg);
753 } else {
755 tracing::info!("✅ All PeerTransport connections closed");
756 }
757 }
758
759 if let Some(ref coordinator) = self.webrtc_coordinator {
761 tracing::info!("🔻 Closing all WebRTC peer connections...");
762 if let Err(e) = coordinator.close_all_peers().await {
763 let err_msg = format!("Failed to close all peers: {}", e);
764 tracing::warn!("⚠️ {}", err_msg);
765 } else {
767 tracing::info!("✅ All WebRTC peer connections closed");
768 }
769 }
770
771 tracing::info!("🔌 Disconnecting WebSocket...");
773 match self.signaling_client.disconnect().await {
774 Ok(_) => {
775 tracing::info!("✅ WebSocket disconnected successfully");
776 }
777 Err(e) => {
778 let err_msg = format!("Failed to disconnect WebSocket: {}", e);
779 tracing::warn!("⚠️ {}", err_msg);
780 }
782 }
783
784 tracing::info!("✅ Connection cleanup completed");
785
786 Ok(())
787 }
788
    /// Probes the signaling connection with `SIGNALING_PROBE_TIMEOUT`.
    ///
    /// # Errors
    /// Returns the probe error prefixed with "Signaling probe failed: ".
    async fn probe_connectivity(&self) -> Result<(), String> {
        self.signaling_client
            .probe_alive(SIGNALING_PROBE_TIMEOUT)
            .await
            .map_err(|e| format!("Signaling probe failed: {}", e))
    }
795
    /// Cleans up all connections and then runs the restore path with the
    /// reason "ForceReconnect", bypassing the debounce used by
    /// `process_network_available`.
    async fn force_reconnect(&self) -> Result<(), String> {
        self.cleanup_connections().await?;
        self.restore_signaling_and_webrtc_from_network_event("ForceReconnect")
            .await
    }
801
802 async fn process_network_recovery_action(
803 &self,
804 action: NetworkRecoveryAction,
805 ) -> Result<(), String> {
806 match action {
807 NetworkRecoveryAction::Noop => Ok(()),
808 NetworkRecoveryAction::Offline => self.process_offline().await,
809 NetworkRecoveryAction::Probe => self.probe_or_restore("Probe").await,
810 NetworkRecoveryAction::Restore => {
811 self.restore_signaling_and_webrtc_from_network_event("NetworkEventBatch")
812 .await
813 }
814 NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
815 NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
816 }
817 }
818}
819
/// Chooses the single action for a batch of events by delegating to
/// `ConnectionSupervisor::select_action`.
pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
    ConnectionSupervisor::select_action(events)
}
823
824pub async fn process_network_event_batch(
825 events: Vec<NetworkEvent>,
826 processor: Arc<dyn NetworkEventProcessor>,
827) -> Vec<NetworkEventResult> {
828 if events.is_empty() {
829 return Vec::new();
830 }
831
832 let action = select_network_recovery_action(&events);
833 let start = Instant::now();
834
835 tracing::info!(
836 event_count = events.len(),
837 action = ?action,
838 "network_event.action.start"
839 );
840
841 let result = processor.process_network_recovery_action(action).await;
842
843 let duration_ms = start.elapsed().as_millis() as u64;
844 match &result {
845 Ok(()) => tracing::info!(
846 event_count = events.len(),
847 action = ?action,
848 duration_ms,
849 "network_event.action.completed"
850 ),
851 Err(e) => tracing::warn!(
852 event_count = events.len(),
853 action = ?action,
854 duration_ms,
855 error = %e,
856 "network_event.action.completed"
857 ),
858 }
859
860 events
861 .into_iter()
862 .map(|event| match &result {
863 Ok(()) => NetworkEventResult::success(event, duration_ms),
864 Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
865 })
866 .collect()
867}
868
/// An event queued for the reconciler together with the channel on which the
/// submitting caller waits for its result.
pub struct NetworkEventRequest {
    pub event: NetworkEvent,
    pub result_tx: oneshot::Sender<NetworkEventResult>,
}
873
/// Long-running loop that batches incoming requests and processes each batch once.
///
/// For every batch: the first request opens a settle window of
/// `NETWORK_EVENT_SETTLE_WINDOW`; requests arriving during the window, plus any
/// already queued when it ends, are coalesced into the same batch. The batch
/// is processed by `process_network_event_batch` and each caller receives its
/// own result.
///
/// A barrier is requested from the processor for the first event in the batch
/// that yields one and is held until processing finishes.
///
/// The loop ends when `shutdown_token` is cancelled. If that happens during a
/// settle window, the function returns immediately and the requests collected
/// so far are dropped without a result being sent.
// NOTE(review): once `event_rx` is closed the `Some(..)` pattern stops
// matching; presumably the outer loop then waits for `shutdown_token` rather
// than exiting on its own — confirm against tokio `select!` semantics.
pub async fn run_network_event_reconciler(
    mut event_rx: mpsc::Receiver<NetworkEventRequest>,
    processor: Arc<dyn NetworkEventProcessor>,
    shutdown_token: CancellationToken,
) {
    tracing::info!("🔄 Network event reconciler started");

    loop {
        tokio::select! {
            Some(first_request) = event_rx.recv() => {
                tracing::debug!(
                    event = ?first_request.event,
                    "network_event.reconciler.received"
                );
                // Take the barrier as early as possible, before waiting for more events.
                let mut event_barrier = processor.begin_network_event_barrier(&first_request.event);
                let mut requests = vec![first_request];
                // One fixed window per batch: it is not extended by later arrivals.
                let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
                tokio::pin!(settle);

                loop {
                    tokio::select! {
                        Some(next_request) = event_rx.recv() => {
                            tracing::debug!(
                                event = ?next_request.event,
                                "network_event.reconciler.coalesced"
                            );
                            // Only one barrier per batch; later events can still supply it
                            // if earlier ones did not need one.
                            if event_barrier.is_none() {
                                event_barrier = processor.begin_network_event_barrier(&next_request.event);
                            }
                            requests.push(next_request);
                        }
                        _ = &mut settle => {
                            break;
                        }
                        _ = shutdown_token.cancelled() => {
                            tracing::info!("🛑 Network event reconciler shutting down");
                            return;
                        }
                        else => {
                            break;
                        }
                    }
                }

                // Pick up anything that was queued but not yet received when the window closed.
                while let Ok(next_request) = event_rx.try_recv() {
                    tracing::debug!(
                        event = ?next_request.event,
                        "network_event.reconciler.coalesced"
                    );
                    if event_barrier.is_none() {
                        event_barrier = processor.begin_network_event_barrier(&next_request.event);
                    }
                    requests.push(next_request);
                }

                let events = requests
                    .iter()
                    .map(|request| request.event.clone())
                    .collect::<Vec<_>>();
                // Computed here for the log line only; the batch processor selects the action again.
                let action = select_network_recovery_action(&events);
                let facts = events
                    .iter()
                    .map(ConnectionFact::from_network_event)
                    .collect::<Vec<_>>();
                tracing::info!(
                    event_count = events.len(),
                    action = ?action,
                    events = ?events,
                    facts = ?facts,
                    settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
                    "network_event.reconciler.batch_reconciled"
                );

                let results = process_network_event_batch(events, processor.clone()).await;
                // Release the barrier before notifying callers.
                drop(event_barrier);

                // Results come back in event order, so they pair up with the requests.
                for (request, result) in requests.into_iter().zip(results) {
                    if request.result_tx.send(result).is_err() {
                        tracing::debug!("Network event caller dropped before receiving result");
                    }
                }
            }
            _ = shutdown_token.cancelled() => {
                tracing::info!("🛑 Network event reconciler shutting down");
                break;
            }
            else => break,
        }
    }
}
964
/// Client-side handle for submitting events to the reconciler and awaiting results.
pub struct NetworkEventHandle {
    /// Queue feeding the reconciler.
    event_tx: mpsc::Sender<NetworkEventRequest>,
    /// Maximum time to wait for a result after the event has been enqueued.
    result_timeout: Duration,
}
974
975impl NetworkEventHandle {
    /// Creates a handle that waits up to `NETWORK_EVENT_RESULT_TIMEOUT` for each result.
    pub fn new(event_tx: mpsc::Sender<NetworkEventRequest>) -> Self {
        Self::new_with_result_timeout(event_tx, NETWORK_EVENT_RESULT_TIMEOUT)
    }
980
    /// Creates a handle with a caller-chosen result timeout.
    pub fn new_with_result_timeout(
        event_tx: mpsc::Sender<NetworkEventRequest>,
        result_timeout: Duration,
    ) -> Self {
        Self {
            event_tx,
            result_timeout,
        }
    }
995
    /// Submits a `NetworkPathChanged` event and waits for its result.
    pub async fn handle_network_path_changed(
        &self,
        snapshot: NetworkSnapshot,
    ) -> Result<NetworkEventResult, String> {
        self.send_event_and_await_result(NetworkEvent::NetworkPathChanged { snapshot })
            .await
    }
1004
    /// Submits an `AppLifecycleChanged` event and waits for its result.
    pub async fn handle_app_lifecycle_changed(
        &self,
        state: AppLifecycleState,
    ) -> Result<NetworkEventResult, String> {
        self.send_event_and_await_result(NetworkEvent::AppLifecycleChanged { state })
            .await
    }
1013
    /// Submits a `CleanupConnections` event and waits for its result.
    pub async fn cleanup_connections(
        &self,
        reason: CleanupReason,
    ) -> Result<NetworkEventResult, String> {
        self.send_event_and_await_result(NetworkEvent::CleanupConnections { reason })
            .await
    }
1022
    /// Submits a `ForceReconnect` event and waits for its result.
    pub async fn force_reconnect(
        &self,
        reason: ReconnectReason,
    ) -> Result<NetworkEventResult, String> {
        self.send_event_and_await_result(NetworkEvent::ForceReconnect { reason })
            .await
    }
1031
1032 async fn send_event_and_await_result(
1034 &self,
1035 event: NetworkEvent,
1036 ) -> Result<NetworkEventResult, String> {
1037 let event_request_id = NEXT_NETWORK_EVENT_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
1038 let start = Instant::now();
1039 let (result_tx, result_rx) = oneshot::channel();
1040 let request = NetworkEventRequest {
1041 event: event.clone(),
1042 result_tx,
1043 };
1044
1045 tracing::info!(
1046 event_request_id,
1047 event = ?event,
1048 result_timeout_ms = self.result_timeout.as_millis() as u64,
1049 "network_event.handle.enqueue"
1050 );
1051
1052 if let Err(e) = self.event_tx.send(request).await {
1053 let err = format!("Failed to send network event: {}", e);
1054 tracing::warn!(
1055 event_request_id,
1056 event = ?event,
1057 error = %err,
1058 "network_event.handle.enqueue_failed"
1059 );
1060 return Err(err);
1061 }
1062
1063 let result = match tokio::time::timeout(self.result_timeout, result_rx).await {
1064 Ok(Ok(result)) => Ok(result),
1065 Ok(Err(_)) => Err("Failed to receive network event result".to_string()),
1066 Err(_) => Err(format!(
1067 "Timed out waiting for network event result after {}ms",
1068 self.result_timeout.as_millis()
1069 )),
1070 };
1071
1072 let wait_ms = start.elapsed().as_millis() as u64;
1073 match &result {
1074 Ok(result) if result.success => tracing::info!(
1075 event_request_id,
1076 event = ?event,
1077 result_event = ?result.event,
1078 duration_ms = result.duration_ms,
1079 wait_ms,
1080 "network_event.handle.result_received"
1081 ),
1082 Ok(result) => tracing::warn!(
1083 event_request_id,
1084 event = ?event,
1085 result_event = ?result.event,
1086 duration_ms = result.duration_ms,
1087 wait_ms,
1088 error = ?result.error,
1089 "network_event.handle.result_received"
1090 ),
1091 Err(e) => tracing::warn!(
1092 event_request_id,
1093 event = ?event,
1094 wait_ms,
1095 error = %e,
1096 "network_event.handle.result_failed"
1097 ),
1098 }
1099
1100 result
1101 }
1102}
1103
1104impl Clone for NetworkEventHandle {
1105 fn clone(&self) -> Self {
1106 Self {
1107 event_tx: self.event_tx.clone(),
1108 result_timeout: self.result_timeout,
1109 }
1110 }
1111}
1112
1113#[cfg(test)]
1114mod tests {
1115 use super::*;
1116 use crate::lifecycle::CredentialState;
1117 use crate::transport::{NetworkError, NetworkResult};
1118 use crate::wire::webrtc::{SignalingEvent, SignalingStats};
1119 use actr_protocol::{
1120 AIdCredential, ActrId, Pong, RegisterRequest, RegisterResponse, RouteCandidatesRequest,
1121 RouteCandidatesResponse, ServiceAvailabilityState, SignalingEnvelope, UnregisterResponse,
1122 };
1123 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
1124 use tokio::sync::broadcast;
1125
    /// Test double for `SignalingClient` that counts calls instead of doing I/O.
    struct ForceReconnectFakeSignalingClient {
        /// Connection flag returned by `is_connected`.
        connected: AtomicBool,
        /// When `true`, every `connect_once` call fails.
        connect_once_should_fail: bool,
        disconnect_calls: AtomicUsize,
        connect_once_calls: AtomicUsize,
        /// Also incremented by `schedule_auto_reconnect_reset_backoff`, which delegates to it.
        schedule_auto_reconnect_calls: AtomicUsize,
        schedule_auto_reconnect_reset_backoff_calls: AtomicUsize,
        /// Backs `subscribe_events`; nothing is ever sent on it in these tests.
        event_tx: broadcast::Sender<SignalingEvent>,
    }
1135
    impl ForceReconnectFakeSignalingClient {
        /// Creates a disconnected fake with all counters at zero.
        fn new(connect_once_should_fail: bool) -> Self {
            // The initial receiver is dropped; subscribers get their own via `subscribe`.
            let (event_tx, _rx) = broadcast::channel(8);
            Self {
                connected: AtomicBool::new(false),
                connect_once_should_fail,
                disconnect_calls: AtomicUsize::new(0),
                connect_once_calls: AtomicUsize::new(0),
                schedule_auto_reconnect_calls: AtomicUsize::new(0),
                schedule_auto_reconnect_reset_backoff_calls: AtomicUsize::new(0),
                event_tx,
            }
        }
    }
1150
    // Only the connection-management methods have behavior; the request/response
    // methods are stubs that return an "unused" error.
    #[async_trait::async_trait]
    impl SignalingClient for ForceReconnectFakeSignalingClient {
        async fn connect(&self) -> NetworkResult<()> {
            Ok(())
        }

        /// Counts the call, then fails or marks the fake connected depending on
        /// `connect_once_should_fail`.
        async fn connect_once(&self) -> NetworkResult<()> {
            self.connect_once_calls.fetch_add(1, AtomicOrdering::SeqCst);
            if self.connect_once_should_fail {
                return Err(NetworkError::ConnectionError(
                    "forced connect_once failure".to_string(),
                ));
            }

            self.connected.store(true, AtomicOrdering::SeqCst);
            Ok(())
        }

        fn schedule_auto_reconnect(&self) {
            self.schedule_auto_reconnect_calls
                .fetch_add(1, AtomicOrdering::SeqCst);
        }

        /// Counts the call and also counts as one `schedule_auto_reconnect` call.
        fn schedule_auto_reconnect_reset_backoff(&self) {
            self.schedule_auto_reconnect_reset_backoff_calls
                .fetch_add(1, AtomicOrdering::SeqCst);
            self.schedule_auto_reconnect();
        }

        /// Counts the call and marks the fake disconnected; never fails.
        async fn disconnect(&self) -> NetworkResult<()> {
            self.disconnect_calls.fetch_add(1, AtomicOrdering::SeqCst);
            self.connected.store(false, AtomicOrdering::SeqCst);
            Ok(())
        }

        async fn send_register_request(
            &self,
            _request: RegisterRequest,
        ) -> NetworkResult<RegisterResponse> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn send_unregister_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _reason: Option<String>,
        ) -> NetworkResult<UnregisterResponse> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn send_heartbeat(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _availability: ServiceAvailabilityState,
            _power_reserve: f32,
            _mailbox_backlog: f32,
        ) -> NetworkResult<Pong> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn send_route_candidates_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _request: RouteCandidatesRequest,
        ) -> NetworkResult<RouteCandidatesResponse> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn get_signing_key(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _key_id: u32,
        ) -> NetworkResult<(u32, Vec<u8>)> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn send_credential_update_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
        ) -> NetworkResult<RegisterResponse> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn send_envelope(&self, _envelope: SignalingEnvelope) -> NetworkResult<()> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        fn is_connected(&self) -> bool {
            self.connected.load(AtomicOrdering::SeqCst)
        }

        fn get_stats(&self) -> SignalingStats {
            SignalingStats::default()
        }

        fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
            self.event_tx.subscribe()
        }

        async fn set_actor_id(&self, _actor_id: ActrId) {}

        async fn set_credential_state(&self, _credential_state: CredentialState) {}

        async fn clear_identity(&self) {}
    }
1265
    /// Builds a snapshot with the given sequence and availability; all other
    /// fields are default / `false`.
    fn snapshot(sequence: u64, availability: NetworkAvailability) -> NetworkSnapshot {
        NetworkSnapshot {
            sequence,
            availability,
            transport: NetworkTransportFlags::default(),
            is_expensive: false,
            is_constrained: false,
        }
    }
1275
1276 #[test]
1277 fn lifecycle_barrier_is_scoped_to_events_that_change_connections() {
1278 let cases = [
1279 (
1280 NetworkEvent::NetworkPathChanged {
1281 snapshot: snapshot(1, NetworkAvailability::Unavailable),
1282 },
1283 true,
1284 ),
1285 (
1286 NetworkEvent::NetworkPathChanged {
1287 snapshot: snapshot(2, NetworkAvailability::Available),
1288 },
1289 true,
1290 ),
1291 (
1292 NetworkEvent::NetworkPathChanged {
1293 snapshot: snapshot(3, NetworkAvailability::Unknown),
1294 },
1295 false,
1296 ),
1297 (
1298 NetworkEvent::AppLifecycleChanged {
1299 state: AppLifecycleState::Background,
1300 },
1301 false,
1302 ),
1303 (
1304 NetworkEvent::AppLifecycleChanged {
1305 state: AppLifecycleState::Foreground {
1306 background_duration_ms: LONG_BACKGROUND_RECONNECT_THRESHOLD_MS - 1,
1307 },
1308 },
1309 false,
1310 ),
1311 (
1312 NetworkEvent::AppLifecycleChanged {
1313 state: AppLifecycleState::Foreground {
1314 background_duration_ms: LONG_BACKGROUND_RECONNECT_THRESHOLD_MS,
1315 },
1316 },
1317 true,
1318 ),
1319 (
1320 NetworkEvent::CleanupConnections {
1321 reason: CleanupReason::ManualReset,
1322 },
1323 true,
1324 ),
1325 (
1326 NetworkEvent::ForceReconnect {
1327 reason: ReconnectReason::ManualReconnect,
1328 },
1329 true,
1330 ),
1331 ];
1332
1333 for (event, expected) in cases {
1334 assert_eq!(
1335 network_event_needs_lifecycle_barrier(&event),
1336 expected,
1337 "{event:?}"
1338 );
1339 }
1340 }
1341
1342 #[tokio::test]
1343 async fn force_reconnect_failure_schedules_auto_reconnect() {
1344 let signaling = Arc::new(ForceReconnectFakeSignalingClient::new(true));
1345 let processor = DefaultNetworkEventProcessor::new(signaling.clone(), None);
1346
1347 let result = processor.force_reconnect().await;
1348
1349 assert!(result.is_err());
1350 assert_eq!(
1351 signaling.disconnect_calls.load(AtomicOrdering::SeqCst),
1352 1,
1353 "ForceReconnect cleanup should disconnect signaling once"
1354 );
1355 assert_eq!(
1356 signaling.connect_once_calls.load(AtomicOrdering::SeqCst),
1357 1,
1358 "ForceReconnect restore should make one quick connect attempt"
1359 );
1360 assert_eq!(
1361 signaling
1362 .schedule_auto_reconnect_calls
1363 .load(AtomicOrdering::SeqCst),
1364 2,
1365 "ForceReconnect should wake auto-reconnect before restore and keep it scheduled after failure"
1366 );
1367 assert_eq!(
1368 signaling
1369 .schedule_auto_reconnect_reset_backoff_calls
1370 .load(AtomicOrdering::SeqCst),
1371 1,
1372 "ForceReconnect should reset reconnect backoff before the quick restore attempt"
1373 );
1374 }
1375
1376 #[tokio::test]
1377 async fn restore_failure_schedules_auto_reconnect_reset_backoff() {
1378 let signaling = Arc::new(ForceReconnectFakeSignalingClient::new(true));
1379 let processor = DefaultNetworkEventProcessor::new(signaling.clone(), None);
1380
1381 let result = processor
1382 .process_network_recovery_action(NetworkRecoveryAction::Restore)
1383 .await;
1384
1385 assert!(result.is_err());
1386 assert_eq!(
1387 signaling.connect_once_calls.load(AtomicOrdering::SeqCst),
1388 1,
1389 "Restore should make one quick connect attempt"
1390 );
1391 assert_eq!(
1392 signaling
1393 .schedule_auto_reconnect_reset_backoff_calls
1394 .load(AtomicOrdering::SeqCst),
1395 1,
1396 "failed Restore should reset reconnect backoff"
1397 );
1398 }
1399
1400 #[tokio::test]
1401 async fn restore_schedules_reset_backoff_before_quick_connect() {
1402 let signaling = Arc::new(ForceReconnectFakeSignalingClient::new(false));
1403 let processor = DefaultNetworkEventProcessor::new(signaling.clone(), None);
1404
1405 processor
1406 .process_network_recovery_action(NetworkRecoveryAction::Restore)
1407 .await
1408 .expect("Restore should connect successfully");
1409
1410 assert_eq!(
1411 signaling.connect_once_calls.load(AtomicOrdering::SeqCst),
1412 1,
1413 "Restore should make one quick connect attempt"
1414 );
1415 assert_eq!(
1416 signaling
1417 .schedule_auto_reconnect_reset_backoff_calls
1418 .load(AtomicOrdering::SeqCst),
1419 1,
1420 "Restore should reset reconnect backoff before the quick connect attempt"
1421 );
1422 }
1423}