1use std::sync::{
70 Arc,
71 atomic::{AtomicU64, Ordering},
72};
73use std::time::{Duration, Instant};
74
75use crate::transport::PeerTransport;
76use crate::wire::webrtc::{CleanupGuard, SignalingClient, WebRtcCoordinator};
77use tokio::sync::{mpsc, oneshot};
78use tokio_util::sync::CancellationToken;
79
80use super::connection_supervisor::{ConnectionFact, ConnectionSupervisor};
81
82const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
83const NETWORK_EVENT_RESULT_TIMEOUT: Duration = Duration::from_secs(5);
84const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
85pub(super) const LONG_BACKGROUND_RECONNECT_THRESHOLD_MS: u64 = 30_000;
86static NEXT_NETWORK_EVENT_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
87
88pub struct NetworkEventBarrier {
91 _cleanup_guard: CleanupGuard,
92}
93
94#[derive(Debug, Clone, PartialEq, Eq, Hash)]
96pub struct NetworkSnapshot {
97 pub sequence: u64,
98 pub availability: NetworkAvailability,
99 pub transport: NetworkTransportFlags,
100 pub is_expensive: bool,
101 pub is_constrained: bool,
102}
103
104impl NetworkSnapshot {
105 pub fn is_offline(&self) -> bool {
106 matches!(self.availability, NetworkAvailability::Unavailable)
107 }
108
109 pub fn should_restore(&self) -> bool {
110 matches!(self.availability, NetworkAvailability::Available)
111 }
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
116pub enum NetworkAvailability {
117 Unknown,
118 Available,
119 Unavailable,
120}
121
122#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
124pub struct NetworkTransportFlags {
125 pub wifi: bool,
126 pub cellular: bool,
127 pub ethernet: bool,
128 pub vpn: bool,
129 pub other: bool,
130}
131
132#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
134pub enum AppLifecycleState {
135 Background,
136 Foreground { background_duration_ms: u64 },
137}
138
139#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
141pub enum CleanupReason {
142 AppTerminating,
143 UserLogout,
144 StaleConnectionSuspected,
145 ManualReset,
146}
147
148#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
150pub enum ReconnectReason {
151 NetworkPathChanged,
152 LongBackground,
153 ProbeFailed,
154 ManualReconnect,
155 StaleConnectionSuspected,
156}
157
158#[derive(Debug, Clone, PartialEq, Eq, Hash)]
160pub enum NetworkEvent {
161 NetworkPathChanged { snapshot: NetworkSnapshot },
163
164 AppLifecycleChanged { state: AppLifecycleState },
166
167 CleanupConnections { reason: CleanupReason },
174
175 ForceReconnect { reason: ReconnectReason },
177}
178
179#[derive(Debug, Clone, Copy, PartialEq, Eq)]
181pub enum NetworkRecoveryAction {
182 Noop,
183 Offline,
184 Probe,
185 Restore,
186 CleanupOnly,
187 ForceReconnect,
188}
189
190fn network_event_needs_lifecycle_barrier(event: &NetworkEvent) -> bool {
191 match event {
192 NetworkEvent::NetworkPathChanged { snapshot } => {
193 snapshot.is_offline() || snapshot.should_restore()
194 }
195 NetworkEvent::AppLifecycleChanged { state } => match state {
196 AppLifecycleState::Background => false,
197 AppLifecycleState::Foreground {
198 background_duration_ms,
199 } => *background_duration_ms >= LONG_BACKGROUND_RECONNECT_THRESHOLD_MS,
200 },
201 NetworkEvent::CleanupConnections { .. } | NetworkEvent::ForceReconnect { .. } => true,
202 }
203}
204
205#[derive(Debug, Clone)]
207pub struct NetworkEventResult {
208 pub event: NetworkEvent,
210
211 pub success: bool,
213
214 pub error: Option<String>,
216
217 pub duration_ms: u64,
219}
220
221impl NetworkEventResult {
222 pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
223 Self {
224 event,
225 success: true,
226 error: None,
227 duration_ms,
228 }
229 }
230
231 pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
232 Self {
233 event,
234 success: false,
235 error: Some(error),
236 duration_ms,
237 }
238 }
239}
240
241#[async_trait::async_trait]
245pub trait NetworkEventProcessor: Send + Sync {
246 fn begin_network_event_barrier(&self, _event: &NetworkEvent) -> Option<NetworkEventBarrier> {
249 None
250 }
251
252 async fn process_network_available(&self) -> Result<(), String>;
258
259 async fn process_network_lost(&self) -> Result<(), String>;
265
266 async fn process_network_type_changed(
272 &self,
273 is_wifi: bool,
274 is_cellular: bool,
275 ) -> Result<(), String>;
276
277 async fn cleanup_connections(&self) -> Result<(), String>;
300
301 async fn probe_connectivity(&self) -> Result<(), String> {
303 Ok(())
304 }
305
306 async fn force_reconnect(&self) -> Result<(), String> {
308 self.cleanup_connections().await?;
309 self.process_network_available().await
310 }
311
312 async fn process_network_recovery_action(
317 &self,
318 action: NetworkRecoveryAction,
319 ) -> Result<(), String> {
320 match action {
321 NetworkRecoveryAction::Noop => Ok(()),
322 NetworkRecoveryAction::Offline => self.process_network_lost().await,
323 NetworkRecoveryAction::Probe => self.probe_connectivity().await,
324 NetworkRecoveryAction::Restore => self.process_network_available().await,
325 NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
326 NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
327 }
328 }
329}
330
331#[derive(Debug, Clone)]
333pub struct DebounceConfig {
334 pub window: Duration,
336}
337
338impl Default for DebounceConfig {
339 fn default() -> Self {
340 Self {
341 window: Duration::from_secs(2),
343 }
344 }
345}
346
347#[derive(Debug)]
349struct DebounceState {
350 last_available: tokio::sync::Mutex<Option<Instant>>,
351 last_lost: tokio::sync::Mutex<Option<Instant>>,
352 last_type_changed: tokio::sync::Mutex<Option<Instant>>,
353}
354
355impl DebounceState {
356 fn new() -> Self {
357 Self {
358 last_available: tokio::sync::Mutex::new(None),
359 last_lost: tokio::sync::Mutex::new(None),
360 last_type_changed: tokio::sync::Mutex::new(None),
361 }
362 }
363}
364
365#[derive(Debug, Clone, Copy, PartialEq, Eq)]
366enum DebounceEvent {
367 Available,
368 Lost,
369 TypeChanged,
370}
371
372#[derive(Debug)]
373struct SignalingRecoveryState {
374 connect_lock: tokio::sync::Mutex<()>,
375 last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
376}
377
378impl SignalingRecoveryState {
379 fn new() -> Self {
380 Self {
381 connect_lock: tokio::sync::Mutex::new(()),
382 last_successful_connect: tokio::sync::Mutex::new(None),
383 }
384 }
385}
386
387pub struct DefaultNetworkEventProcessor {
389 signaling_client: Arc<dyn SignalingClient>,
390 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
391 peer_transport: Option<Arc<PeerTransport>>,
392 debounce_config: DebounceConfig,
393 debounce_state: Arc<DebounceState>,
394 recovery_state: Arc<SignalingRecoveryState>,
395}
396
397impl DefaultNetworkEventProcessor {
398 pub fn new(
399 signaling_client: Arc<dyn SignalingClient>,
400 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
401 ) -> Self {
402 Self::new_with_debounce_and_peer_transport(
403 signaling_client,
404 webrtc_coordinator,
405 DebounceConfig::default(),
406 None,
407 )
408 }
409
410 pub fn new_with_debounce(
411 signaling_client: Arc<dyn SignalingClient>,
412 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
413 debounce_config: DebounceConfig,
414 ) -> Self {
415 Self::new_with_debounce_and_peer_transport(
416 signaling_client,
417 webrtc_coordinator,
418 debounce_config,
419 None,
420 )
421 }
422
423 pub(crate) fn new_with_peer_transport(
424 signaling_client: Arc<dyn SignalingClient>,
425 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
426 peer_transport: Option<Arc<PeerTransport>>,
427 ) -> Self {
428 Self::new_with_debounce_and_peer_transport(
429 signaling_client,
430 webrtc_coordinator,
431 DebounceConfig::default(),
432 peer_transport,
433 )
434 }
435
436 pub(crate) fn new_with_debounce_and_peer_transport(
437 signaling_client: Arc<dyn SignalingClient>,
438 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
439 debounce_config: DebounceConfig,
440 peer_transport: Option<Arc<PeerTransport>>,
441 ) -> Self {
442 Self {
443 signaling_client,
444 webrtc_coordinator,
445 peer_transport,
446 debounce_config,
447 debounce_state: Arc::new(DebounceState::new()),
448 recovery_state: Arc::new(SignalingRecoveryState::new()),
449 }
450 }
451
452 fn lifecycle_barrier(&self) -> Option<NetworkEventBarrier> {
453 self.webrtc_coordinator
454 .as_ref()
455 .map(|coordinator| NetworkEventBarrier {
456 _cleanup_guard: coordinator.cleanup_guard(),
457 })
458 }
459
460 async fn should_process_event(&self, event: DebounceEvent) -> bool {
466 let now = Instant::now();
467
468 match event {
469 DebounceEvent::Available => {
470 let mut last = self.debounce_state.last_available.lock().await;
471 if let Some(last_time) = *last {
472 if now.duration_since(last_time) < self.debounce_config.window {
473 tracing::debug!(
474 "⏸️ Debouncing Network Available event (last event was {:?} ago)",
475 now.duration_since(last_time)
476 );
477 return false;
478 }
479 }
480 *last = Some(now);
481 true
482 }
483 DebounceEvent::Lost => {
484 let mut last = self.debounce_state.last_lost.lock().await;
485 if let Some(last_time) = *last {
486 if now.duration_since(last_time) < self.debounce_config.window {
487 tracing::debug!(
488 "⏸️ Debouncing Network Lost event (last event was {:?} ago)",
489 now.duration_since(last_time)
490 );
491 return false;
492 }
493 }
494 *last = Some(now);
495 true
496 }
497 DebounceEvent::TypeChanged => {
498 let mut last = self.debounce_state.last_type_changed.lock().await;
499 if let Some(last_time) = *last {
500 if now.duration_since(last_time) < self.debounce_config.window {
501 tracing::debug!(
502 "⏸️ Debouncing Network TypeChanged event (last event was {:?} ago)",
503 now.duration_since(last_time)
504 );
505 return false;
506 }
507 }
508 *last = Some(now);
509 true
510 }
511 }
512 }
513
514 async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
515 let _guard = self.recovery_state.connect_lock.lock().await;
516
517 if !self.signaling_client.is_connected() {
518 tracing::info!(reason = reason, "🔄 Connecting signaling");
519 self.signaling_client.connect_once().await.map_err(|e| {
520 let err_msg = format!("WebSocket connect failed: {}", e);
521 tracing::error!("❌ {}", err_msg);
522 err_msg
523 })?;
524
525 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
526 tracing::info!(reason = reason, "✅ Signaling connected");
527 return Ok(());
528 }
529
530 tracing::debug!(
531 reason = reason,
532 timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
533 "🔎 Probing existing signaling WebSocket"
534 );
535
536 match self
537 .signaling_client
538 .probe_alive(SIGNALING_PROBE_TIMEOUT)
539 .await
540 {
541 Ok(()) => {
542 tracing::debug!(
543 reason = reason,
544 "✅ Signaling probe succeeded; keeping existing WebSocket"
545 );
546 Ok(())
547 }
548 Err(e) => {
549 tracing::warn!(
550 reason = reason,
551 "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
552 e
553 );
554
555 if let Err(disconnect_err) = self.signaling_client.disconnect().await {
556 tracing::warn!(
557 reason = reason,
558 "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
559 disconnect_err
560 );
561 }
562
563 tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
564 self.signaling_client
565 .connect_once()
566 .await
567 .map_err(|connect_err| {
568 let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
569 tracing::error!("❌ {}", err_msg);
570 err_msg
571 })?;
572
573 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
574 tracing::info!(reason = reason, "✅ Signaling rebuilt");
575 Ok(())
576 }
577 }
578 }
579
580 async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
581 let _cleanup_guard = self.lifecycle_barrier();
582 let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
583 coordinator.begin_network_recovery(reason).await
584 } else {
585 Vec::new()
586 };
587
588 self.ensure_signaling_healthy_once(reason).await?;
589
590 let coordinator = self.webrtc_coordinator.clone();
591
592 if let Some(coordinator) = coordinator {
593 if recovery_targets.is_empty() {
594 tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
595 } else {
596 tracing::info!("♻️ Triggering ICE restart for recovering connections...");
597 }
598 coordinator.restart_network_recovery_connections().await;
599 }
600
601 Ok(())
602 }
603
604 async fn probe_or_restore(&self, reason: &str) -> Result<(), String> {
605 match self.probe_connectivity().await {
606 Ok(()) => Ok(()),
607 Err(e) => {
608 tracing::warn!(
609 reason = reason,
610 "Connectivity probe failed; restoring connections: {}",
611 e
612 );
613 self.restore_signaling_and_webrtc(reason).await
614 }
615 }
616 }
617
618 async fn process_offline(&self) -> Result<(), String> {
619 let _cleanup_guard = self.lifecycle_barrier();
620 tracing::info!("📱 Processing: Network offline");
621
622 if let Some(ref coordinator) = self.webrtc_coordinator {
623 coordinator.begin_network_recovery("NetworkLost").await;
624 tracing::info!("🧹 Clearing pending ICE restart attempts...");
625 coordinator.clear_pending_restarts().await;
626 }
627
628 tracing::info!("🔌 Disconnecting WebSocket...");
629 let _ = self.signaling_client.disconnect().await;
630
631 Ok(())
632 }
633}
634
635#[async_trait::async_trait]
636impl NetworkEventProcessor for DefaultNetworkEventProcessor {
637 fn begin_network_event_barrier(&self, event: &NetworkEvent) -> Option<NetworkEventBarrier> {
638 if network_event_needs_lifecycle_barrier(event) {
639 self.lifecycle_barrier()
640 } else {
641 None
642 }
643 }
644
645 async fn process_network_available(&self) -> Result<(), String> {
647 let should_process = self.should_process_event(DebounceEvent::Available).await;
649 if !should_process && self.signaling_client.is_connected() {
650 return Ok(());
651 }
652
653 tracing::info!("📱 Processing: Network available");
654
655 self.restore_signaling_and_webrtc("NetworkAvailable").await
656 }
657
658 async fn process_network_lost(&self) -> Result<(), String> {
660 if !self.should_process_event(DebounceEvent::Lost).await {
662 return Ok(());
663 }
664
665 self.process_offline().await
666 }
667
668 async fn process_network_type_changed(
670 &self,
671 is_wifi: bool,
672 is_cellular: bool,
673 ) -> Result<(), String> {
674 let should_process = self.should_process_event(DebounceEvent::TypeChanged).await;
676 if !should_process && self.signaling_client.is_connected() {
677 return Ok(());
678 }
679
680 tracing::info!(
681 "📱 Processing: Network type changed (WiFi={}, Cellular={})",
682 is_wifi,
683 is_cellular
684 );
685
686 self.restore_signaling_and_webrtc("NetworkTypeChanged")
687 .await
688 }
689
690 async fn cleanup_connections(&self) -> Result<(), String> {
696 let _cleanup_guard = self.lifecycle_barrier();
697
698 tracing::info!("🧹 Manually cleaning up all connections...");
699
700 if let Some(ref coordinator) = self.webrtc_coordinator {
702 tracing::info!("♻️ Clearing pending ICE restart attempts...");
703 coordinator.clear_pending_restarts().await;
704 }
705
706 if let Some(ref peer_transport) = self.peer_transport {
708 tracing::info!("🔻 Closing all PeerTransport connections...");
709 if let Err(e) = peer_transport.close_all().await {
710 let err_msg = format!("Failed to close peer transports: {}", e);
711 tracing::warn!("⚠️ {}", err_msg);
712 } else {
714 tracing::info!("✅ All PeerTransport connections closed");
715 }
716 }
717
718 if let Some(ref coordinator) = self.webrtc_coordinator {
720 tracing::info!("🔻 Closing all WebRTC peer connections...");
721 if let Err(e) = coordinator.close_all_peers().await {
722 let err_msg = format!("Failed to close all peers: {}", e);
723 tracing::warn!("⚠️ {}", err_msg);
724 } else {
726 tracing::info!("✅ All WebRTC peer connections closed");
727 }
728 }
729
730 tracing::info!("🔌 Disconnecting WebSocket...");
732 match self.signaling_client.disconnect().await {
733 Ok(_) => {
734 tracing::info!("✅ WebSocket disconnected successfully");
735 }
736 Err(e) => {
737 let err_msg = format!("Failed to disconnect WebSocket: {}", e);
738 tracing::warn!("⚠️ {}", err_msg);
739 }
741 }
742
743 tracing::info!("✅ Connection cleanup completed");
744
745 Ok(())
746 }
747
748 async fn probe_connectivity(&self) -> Result<(), String> {
749 self.signaling_client
750 .probe_alive(SIGNALING_PROBE_TIMEOUT)
751 .await
752 .map_err(|e| format!("Signaling probe failed: {}", e))
753 }
754
755 async fn force_reconnect(&self) -> Result<(), String> {
756 self.cleanup_connections().await?;
757 self.restore_signaling_and_webrtc("ForceReconnect").await
758 }
759
760 async fn process_network_recovery_action(
761 &self,
762 action: NetworkRecoveryAction,
763 ) -> Result<(), String> {
764 match action {
765 NetworkRecoveryAction::Noop => Ok(()),
766 NetworkRecoveryAction::Offline => self.process_offline().await,
767 NetworkRecoveryAction::Probe => self.probe_or_restore("Probe").await,
768 NetworkRecoveryAction::Restore => {
769 self.restore_signaling_and_webrtc("NetworkEventBatch").await
770 }
771 NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
772 NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
773 }
774 }
775}
776
777pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
778 ConnectionSupervisor::select_action(events)
779}
780
781pub async fn process_network_event_batch(
782 events: Vec<NetworkEvent>,
783 processor: Arc<dyn NetworkEventProcessor>,
784) -> Vec<NetworkEventResult> {
785 if events.is_empty() {
786 return Vec::new();
787 }
788
789 let action = select_network_recovery_action(&events);
790 let start = Instant::now();
791
792 tracing::info!(
793 event_count = events.len(),
794 action = ?action,
795 "network_event.action.start"
796 );
797
798 let result = processor.process_network_recovery_action(action).await;
799
800 let duration_ms = start.elapsed().as_millis() as u64;
801 match &result {
802 Ok(()) => tracing::info!(
803 event_count = events.len(),
804 action = ?action,
805 duration_ms,
806 "network_event.action.completed"
807 ),
808 Err(e) => tracing::warn!(
809 event_count = events.len(),
810 action = ?action,
811 duration_ms,
812 error = %e,
813 "network_event.action.completed"
814 ),
815 }
816
817 events
818 .into_iter()
819 .map(|event| match &result {
820 Ok(()) => NetworkEventResult::success(event, duration_ms),
821 Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
822 })
823 .collect()
824}
825
826pub struct NetworkEventRequest {
827 pub event: NetworkEvent,
828 pub result_tx: oneshot::Sender<NetworkEventResult>,
829}
830
831pub async fn run_network_event_reconciler(
832 mut event_rx: mpsc::Receiver<NetworkEventRequest>,
833 processor: Arc<dyn NetworkEventProcessor>,
834 shutdown_token: CancellationToken,
835) {
836 tracing::info!("🔄 Network event reconciler started");
837
838 loop {
839 tokio::select! {
840 Some(first_request) = event_rx.recv() => {
841 tracing::debug!(
842 event = ?first_request.event,
843 "network_event.reconciler.received"
844 );
845 let mut event_barrier = processor.begin_network_event_barrier(&first_request.event);
846 let mut requests = vec![first_request];
847 let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
848 tokio::pin!(settle);
849
850 loop {
851 tokio::select! {
852 Some(next_request) = event_rx.recv() => {
853 tracing::debug!(
854 event = ?next_request.event,
855 "network_event.reconciler.coalesced"
856 );
857 if event_barrier.is_none() {
858 event_barrier = processor.begin_network_event_barrier(&next_request.event);
859 }
860 requests.push(next_request);
861 }
862 _ = &mut settle => {
863 break;
864 }
865 _ = shutdown_token.cancelled() => {
866 tracing::info!("🛑 Network event reconciler shutting down");
867 return;
868 }
869 else => {
870 break;
871 }
872 }
873 }
874
875 while let Ok(next_request) = event_rx.try_recv() {
876 tracing::debug!(
877 event = ?next_request.event,
878 "network_event.reconciler.coalesced"
879 );
880 if event_barrier.is_none() {
881 event_barrier = processor.begin_network_event_barrier(&next_request.event);
882 }
883 requests.push(next_request);
884 }
885
886 let events = requests
887 .iter()
888 .map(|request| request.event.clone())
889 .collect::<Vec<_>>();
890 let action = select_network_recovery_action(&events);
891 let facts = events
892 .iter()
893 .map(ConnectionFact::from_network_event)
894 .collect::<Vec<_>>();
895 tracing::info!(
896 event_count = events.len(),
897 action = ?action,
898 events = ?events,
899 facts = ?facts,
900 settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
901 "network_event.reconciler.batch_reconciled"
902 );
903
904 let results = process_network_event_batch(events, processor.clone()).await;
905 drop(event_barrier);
906
907 for (request, result) in requests.into_iter().zip(results) {
908 if request.result_tx.send(result).is_err() {
909 tracing::debug!("Network event caller dropped before receiving result");
910 }
911 }
912 }
913 _ = shutdown_token.cancelled() => {
914 tracing::info!("🛑 Network event reconciler shutting down");
915 break;
916 }
917 else => break,
918 }
919 }
920}
921
922pub struct NetworkEventHandle {
927 event_tx: mpsc::Sender<NetworkEventRequest>,
929 result_timeout: Duration,
930}
931
932impl NetworkEventHandle {
933 pub fn new(event_tx: mpsc::Sender<NetworkEventRequest>) -> Self {
935 Self::new_with_result_timeout(event_tx, NETWORK_EVENT_RESULT_TIMEOUT)
936 }
937
938 pub fn new_with_result_timeout(
944 event_tx: mpsc::Sender<NetworkEventRequest>,
945 result_timeout: Duration,
946 ) -> Self {
947 Self {
948 event_tx,
949 result_timeout,
950 }
951 }
952
953 pub async fn handle_network_path_changed(
955 &self,
956 snapshot: NetworkSnapshot,
957 ) -> Result<NetworkEventResult, String> {
958 self.send_event_and_await_result(NetworkEvent::NetworkPathChanged { snapshot })
959 .await
960 }
961
962 pub async fn handle_app_lifecycle_changed(
964 &self,
965 state: AppLifecycleState,
966 ) -> Result<NetworkEventResult, String> {
967 self.send_event_and_await_result(NetworkEvent::AppLifecycleChanged { state })
968 .await
969 }
970
971 pub async fn cleanup_connections(
973 &self,
974 reason: CleanupReason,
975 ) -> Result<NetworkEventResult, String> {
976 self.send_event_and_await_result(NetworkEvent::CleanupConnections { reason })
977 .await
978 }
979
980 pub async fn force_reconnect(
982 &self,
983 reason: ReconnectReason,
984 ) -> Result<NetworkEventResult, String> {
985 self.send_event_and_await_result(NetworkEvent::ForceReconnect { reason })
986 .await
987 }
988
989 async fn send_event_and_await_result(
991 &self,
992 event: NetworkEvent,
993 ) -> Result<NetworkEventResult, String> {
994 let event_request_id = NEXT_NETWORK_EVENT_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
995 let start = Instant::now();
996 let (result_tx, result_rx) = oneshot::channel();
997 let request = NetworkEventRequest {
998 event: event.clone(),
999 result_tx,
1000 };
1001
1002 tracing::info!(
1003 event_request_id,
1004 event = ?event,
1005 result_timeout_ms = self.result_timeout.as_millis() as u64,
1006 "network_event.handle.enqueue"
1007 );
1008
1009 if let Err(e) = self.event_tx.send(request).await {
1010 let err = format!("Failed to send network event: {}", e);
1011 tracing::warn!(
1012 event_request_id,
1013 event = ?event,
1014 error = %err,
1015 "network_event.handle.enqueue_failed"
1016 );
1017 return Err(err);
1018 }
1019
1020 let result = match tokio::time::timeout(self.result_timeout, result_rx).await {
1021 Ok(Ok(result)) => Ok(result),
1022 Ok(Err(_)) => Err("Failed to receive network event result".to_string()),
1023 Err(_) => Err(format!(
1024 "Timed out waiting for network event result after {}ms",
1025 self.result_timeout.as_millis()
1026 )),
1027 };
1028
1029 let wait_ms = start.elapsed().as_millis() as u64;
1030 match &result {
1031 Ok(result) if result.success => tracing::info!(
1032 event_request_id,
1033 event = ?event,
1034 result_event = ?result.event,
1035 duration_ms = result.duration_ms,
1036 wait_ms,
1037 "network_event.handle.result_received"
1038 ),
1039 Ok(result) => tracing::warn!(
1040 event_request_id,
1041 event = ?event,
1042 result_event = ?result.event,
1043 duration_ms = result.duration_ms,
1044 wait_ms,
1045 error = ?result.error,
1046 "network_event.handle.result_received"
1047 ),
1048 Err(e) => tracing::warn!(
1049 event_request_id,
1050 event = ?event,
1051 wait_ms,
1052 error = %e,
1053 "network_event.handle.result_failed"
1054 ),
1055 }
1056
1057 result
1058 }
1059}
1060
1061impl Clone for NetworkEventHandle {
1062 fn clone(&self) -> Self {
1063 Self {
1064 event_tx: self.event_tx.clone(),
1065 result_timeout: self.result_timeout,
1066 }
1067 }
1068}
1069
1070#[cfg(test)]
1071mod tests {
1072 use super::*;
1073
1074 fn snapshot(sequence: u64, availability: NetworkAvailability) -> NetworkSnapshot {
1075 NetworkSnapshot {
1076 sequence,
1077 availability,
1078 transport: NetworkTransportFlags::default(),
1079 is_expensive: false,
1080 is_constrained: false,
1081 }
1082 }
1083
1084 #[test]
1085 fn lifecycle_barrier_is_scoped_to_events_that_change_connections() {
1086 let cases = [
1087 (
1088 NetworkEvent::NetworkPathChanged {
1089 snapshot: snapshot(1, NetworkAvailability::Unavailable),
1090 },
1091 true,
1092 ),
1093 (
1094 NetworkEvent::NetworkPathChanged {
1095 snapshot: snapshot(2, NetworkAvailability::Available),
1096 },
1097 true,
1098 ),
1099 (
1100 NetworkEvent::NetworkPathChanged {
1101 snapshot: snapshot(3, NetworkAvailability::Unknown),
1102 },
1103 false,
1104 ),
1105 (
1106 NetworkEvent::AppLifecycleChanged {
1107 state: AppLifecycleState::Background,
1108 },
1109 false,
1110 ),
1111 (
1112 NetworkEvent::AppLifecycleChanged {
1113 state: AppLifecycleState::Foreground {
1114 background_duration_ms: LONG_BACKGROUND_RECONNECT_THRESHOLD_MS - 1,
1115 },
1116 },
1117 false,
1118 ),
1119 (
1120 NetworkEvent::AppLifecycleChanged {
1121 state: AppLifecycleState::Foreground {
1122 background_duration_ms: LONG_BACKGROUND_RECONNECT_THRESHOLD_MS,
1123 },
1124 },
1125 true,
1126 ),
1127 (
1128 NetworkEvent::CleanupConnections {
1129 reason: CleanupReason::ManualReset,
1130 },
1131 true,
1132 ),
1133 (
1134 NetworkEvent::ForceReconnect {
1135 reason: ReconnectReason::ManualReconnect,
1136 },
1137 true,
1138 ),
1139 ];
1140
1141 for (event, expected) in cases {
1142 assert_eq!(
1143 network_event_needs_lifecycle_barrier(&event),
1144 expected,
1145 "{event:?}"
1146 );
1147 }
1148 }
1149}