1use std::sync::{
70 Arc,
71 atomic::{AtomicU64, Ordering},
72};
73use std::time::{Duration, Instant};
74
75use crate::transport::PeerTransport;
76use crate::wire::webrtc::{CleanupGuard, SignalingClient, WebRtcCoordinator};
77use tokio::sync::{mpsc, oneshot};
78use tokio_util::sync::CancellationToken;
79
80use super::connection_supervisor::{ConnectionFact, ConnectionSupervisor};
81
/// How long the reconciler keeps collecting additional requests after the first
/// one of a batch arrives, before the batch is processed.
const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
/// Default time a `NetworkEventHandle` waits for the reconciler to report a result.
const NETWORK_EVENT_RESULT_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout passed to `SignalingClient::probe_alive` when probing an existing connection.
const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
/// Threshold in milliseconds exposed to the parent module.
/// NOTE(review): not referenced in this part of the file; presumably compared against
/// `AppLifecycleState::Foreground::background_duration_ms` by the supervisor — confirm.
pub(super) const LONG_BACKGROUND_RECONNECT_THRESHOLD_MS: u64 = 30_000;
/// Process-wide counter (starting at 1) that gives each enqueued request an id
/// used in log fields.
static NEXT_NETWORK_EVENT_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
87
/// Scoped marker held by the reconciler while a batch of network events is handled.
///
/// It only owns a [`CleanupGuard`]; dropping the barrier drops the guard.
/// NOTE(review): what the guard actually blocks or defers is defined by
/// `CleanupGuard` in `crate::wire::webrtc`, which is not visible here — confirm there.
pub struct NetworkEventBarrier {
    /// Never read; kept only so the guard is released when the barrier is dropped.
    _cleanup_guard: CleanupGuard,
}
93
/// Description of the network path carried by `NetworkEvent::NetworkPathChanged`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NetworkSnapshot {
    /// Sequence number of the snapshot.
    /// NOTE(review): presumably increases with every platform report — confirm with the producer.
    pub sequence: u64,
    /// Whether the network is available, unavailable, or not yet known.
    pub availability: NetworkAvailability,
    /// Which kinds of transport are part of the current path.
    pub transport: NetworkTransportFlags,
    /// Flag reported alongside the path (presumably "metered/expensive" — confirm with the producer).
    pub is_expensive: bool,
    /// Flag reported alongside the path (presumably "low data mode/constrained" — confirm with the producer).
    pub is_constrained: bool,
}
103
104impl NetworkSnapshot {
105 pub fn is_offline(&self) -> bool {
106 matches!(self.availability, NetworkAvailability::Unavailable)
107 }
108
109 pub fn should_restore(&self) -> bool {
110 matches!(self.availability, NetworkAvailability::Available)
111 }
112}
113
/// Availability of the network as reported in a [`NetworkSnapshot`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkAvailability {
    /// Availability has not been determined; neither `is_offline` nor
    /// `should_restore` returns `true` for this value.
    Unknown,
    /// The network is usable; `NetworkSnapshot::should_restore` returns `true`.
    Available,
    /// The network is not usable; `NetworkSnapshot::is_offline` returns `true`.
    Unavailable,
}
121
/// Set of transport kinds present in a network path.
///
/// All flags default to `false` (via `Default`). Several flags may be set at once.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct NetworkTransportFlags {
    /// The path uses Wi-Fi.
    pub wifi: bool,
    /// The path uses a cellular connection.
    pub cellular: bool,
    /// The path uses wired Ethernet.
    pub ethernet: bool,
    /// The path goes through a VPN.
    pub vpn: bool,
    /// The path uses some transport not covered by the other flags.
    pub other: bool,
}
131
/// Application lifecycle state carried by `NetworkEvent::AppLifecycleChanged`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AppLifecycleState {
    /// The application moved to the background.
    Background,
    /// The application is in the foreground.
    ///
    /// NOTE(review): `background_duration_ms` is presumably how long the app was
    /// backgrounded before returning, in milliseconds — confirm with the caller.
    Foreground { background_duration_ms: u64 },
}
138
/// Why a `NetworkEvent::CleanupConnections` was requested.
///
/// The reason is carried with the event; this part of the file does not branch on it.
/// NOTE(review): how it affects action selection is decided by `ConnectionSupervisor`
/// (not visible here) — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CleanupReason {
    /// The application is terminating.
    AppTerminating,
    /// The user logged out.
    UserLogout,
    /// The existing connection is suspected to be stale.
    StaleConnectionSuspected,
    /// A reset was requested manually.
    ManualReset,
}
147
/// Why a `NetworkEvent::ForceReconnect` was requested.
///
/// The reason is carried with the event; this part of the file does not branch on it.
/// NOTE(review): how it affects action selection is decided by `ConnectionSupervisor`
/// (not visible here) — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReconnectReason {
    /// The network path changed.
    NetworkPathChanged,
    /// The application spent a long time in the background.
    LongBackground,
    /// A connectivity probe failed.
    ProbeFailed,
    /// A reconnect was requested manually.
    ManualReconnect,
    /// The existing connection is suspected to be stale.
    StaleConnectionSuspected,
}
157
/// An event submitted to the network event reconciler.
///
/// Each variant corresponds to one public method on `NetworkEventHandle`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum NetworkEvent {
    /// The network path changed; `snapshot` describes the new path.
    NetworkPathChanged { snapshot: NetworkSnapshot },

    /// The application moved between background and foreground.
    AppLifecycleChanged { state: AppLifecycleState },

    /// Existing connections should be cleaned up for the given reason.
    CleanupConnections { reason: CleanupReason },

    /// A reconnect should be forced for the given reason.
    ForceReconnect { reason: ReconnectReason },
}
178
/// The single action chosen for a batch of network events.
///
/// `NetworkEventProcessor::process_network_recovery_action` dispatches on this value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkRecoveryAction {
    /// Do nothing; processing succeeds immediately.
    Noop,
    /// Handle the network being lost.
    Offline,
    /// Probe connectivity.
    Probe,
    /// Restore connectivity.
    Restore,
    /// Clean up connections without reconnecting.
    CleanupOnly,
    /// Clean up connections and then reconnect.
    ForceReconnect,
}
189
/// Outcome reported back to the caller that submitted a [`NetworkEvent`].
#[derive(Debug, Clone)]
pub struct NetworkEventResult {
    /// The event this result belongs to.
    pub event: NetworkEvent,

    /// `true` when processing succeeded, `false` otherwise.
    pub success: bool,

    /// Error message when processing failed; `None` on success.
    pub error: Option<String>,

    /// Processing time in milliseconds. When produced by
    /// `process_network_event_batch`, this is the duration of the batch's single
    /// action and is the same for every event of the batch.
    pub duration_ms: u64,
}
205
206impl NetworkEventResult {
207 pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
208 Self {
209 event,
210 success: true,
211 error: None,
212 duration_ms,
213 }
214 }
215
216 pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
217 Self {
218 event,
219 success: false,
220 error: Some(error),
221 duration_ms,
222 }
223 }
224}
225
/// Side effects the network event reconciler can trigger.
///
/// Implementations must be `Send + Sync` because the reconciler shares the
/// processor behind an `Arc<dyn NetworkEventProcessor>`.
#[async_trait::async_trait]
pub trait NetworkEventProcessor: Send + Sync {
    /// Optionally creates a barrier that the reconciler keeps alive while the
    /// batch containing `_event` is processed.
    ///
    /// The default implementation creates no barrier.
    fn begin_network_event_barrier(&self, _event: &NetworkEvent) -> Option<NetworkEventBarrier> {
        None
    }

    /// Handles the network becoming available.
    ///
    /// Returns `Err` with a human-readable message on failure.
    async fn process_network_available(&self) -> Result<(), String>;

    /// Handles the network being lost.
    ///
    /// Returns `Err` with a human-readable message on failure.
    async fn process_network_lost(&self) -> Result<(), String>;

    /// Handles a change of network type.
    ///
    /// `is_wifi` / `is_cellular` describe the new network type as reported by the caller.
    /// Returns `Err` with a human-readable message on failure.
    async fn process_network_type_changed(
        &self,
        is_wifi: bool,
        is_cellular: bool,
    ) -> Result<(), String>;

    /// Tears down existing connections.
    ///
    /// Returns `Err` with a human-readable message on failure.
    async fn cleanup_connections(&self) -> Result<(), String>;

    /// Checks whether connectivity is healthy.
    ///
    /// The default implementation performs no check and reports success.
    async fn probe_connectivity(&self) -> Result<(), String> {
        Ok(())
    }

    /// Cleans up connections and then restores them.
    ///
    /// The default implementation calls `cleanup_connections` and, only if that
    /// succeeds, `process_network_available`.
    async fn force_reconnect(&self) -> Result<(), String> {
        self.cleanup_connections().await?;
        self.process_network_available().await
    }

    /// Executes the action selected for a batch of events.
    ///
    /// The default implementation maps each action onto the corresponding trait
    /// method; `Noop` succeeds without doing anything.
    async fn process_network_recovery_action(
        &self,
        action: NetworkRecoveryAction,
    ) -> Result<(), String> {
        match action {
            NetworkRecoveryAction::Noop => Ok(()),
            NetworkRecoveryAction::Offline => self.process_network_lost().await,
            NetworkRecoveryAction::Probe => self.probe_connectivity().await,
            NetworkRecoveryAction::Restore => self.process_network_available().await,
            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
        }
    }
}
315
/// Settings for debouncing repeated network notifications of the same kind.
#[derive(Debug, Clone)]
pub struct DebounceConfig {
    /// Minimum time that must pass after an accepted notification before another
    /// notification of the same kind is accepted.
    pub window: Duration,
}

impl Default for DebounceConfig {
    /// Uses a two-second debounce window.
    fn default() -> Self {
        let window = Duration::from_millis(2_000);
        DebounceConfig { window }
    }
}
331
332#[derive(Debug)]
334struct DebounceState {
335 last_available: tokio::sync::Mutex<Option<Instant>>,
336 last_lost: tokio::sync::Mutex<Option<Instant>>,
337 last_type_changed: tokio::sync::Mutex<Option<Instant>>,
338}
339
340impl DebounceState {
341 fn new() -> Self {
342 Self {
343 last_available: tokio::sync::Mutex::new(None),
344 last_lost: tokio::sync::Mutex::new(None),
345 last_type_changed: tokio::sync::Mutex::new(None),
346 }
347 }
348}
349
/// Kind of notification being debounced; selects which `DebounceState` slot is used.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DebounceEvent {
    /// "Network available" notification.
    Available,
    /// "Network lost" notification.
    Lost,
    /// "Network type changed" notification.
    TypeChanged,
}
356
357#[derive(Debug)]
358struct SignalingRecoveryState {
359 connect_lock: tokio::sync::Mutex<()>,
360 last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
361}
362
363impl SignalingRecoveryState {
364 fn new() -> Self {
365 Self {
366 connect_lock: tokio::sync::Mutex::new(()),
367 last_successful_connect: tokio::sync::Mutex::new(None),
368 }
369 }
370}
371
/// Default `NetworkEventProcessor` that drives the signaling client and, when
/// present, the WebRTC coordinator and peer transport.
pub struct DefaultNetworkEventProcessor {
    /// Signaling connection that is probed, disconnected and reconnected.
    signaling_client: Arc<dyn SignalingClient>,
    /// Optional WebRTC coordinator; when `None`, all WebRTC steps are skipped.
    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
    /// Optional peer transport; when present its connections are closed during cleanup.
    peer_transport: Option<Arc<PeerTransport>>,
    /// Debounce settings applied to the individual notification handlers.
    debounce_config: DebounceConfig,
    /// Per-kind time of the last accepted notification.
    debounce_state: Arc<DebounceState>,
    /// Lock and bookkeeping for signaling reconnects.
    recovery_state: Arc<SignalingRecoveryState>,
}
381
382impl DefaultNetworkEventProcessor {
    /// Creates a processor with the default debounce configuration and no peer transport.
    pub fn new(
        signaling_client: Arc<dyn SignalingClient>,
        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
    ) -> Self {
        Self::new_with_debounce_and_peer_transport(
            signaling_client,
            webrtc_coordinator,
            DebounceConfig::default(),
            None,
        )
    }
394
    /// Creates a processor with a caller-supplied debounce configuration and no
    /// peer transport.
    pub fn new_with_debounce(
        signaling_client: Arc<dyn SignalingClient>,
        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
        debounce_config: DebounceConfig,
    ) -> Self {
        Self::new_with_debounce_and_peer_transport(
            signaling_client,
            webrtc_coordinator,
            debounce_config,
            None,
        )
    }
407
    /// Creates a processor with the default debounce configuration and the given
    /// (optional) peer transport.
    pub(crate) fn new_with_peer_transport(
        signaling_client: Arc<dyn SignalingClient>,
        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
        peer_transport: Option<Arc<PeerTransport>>,
    ) -> Self {
        Self::new_with_debounce_and_peer_transport(
            signaling_client,
            webrtc_coordinator,
            DebounceConfig::default(),
            peer_transport,
        )
    }
420
421 pub(crate) fn new_with_debounce_and_peer_transport(
422 signaling_client: Arc<dyn SignalingClient>,
423 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
424 debounce_config: DebounceConfig,
425 peer_transport: Option<Arc<PeerTransport>>,
426 ) -> Self {
427 Self {
428 signaling_client,
429 webrtc_coordinator,
430 peer_transport,
431 debounce_config,
432 debounce_state: Arc::new(DebounceState::new()),
433 recovery_state: Arc::new(SignalingRecoveryState::new()),
434 }
435 }
436
437 async fn should_process_event(&self, event: DebounceEvent) -> bool {
443 let now = Instant::now();
444
445 match event {
446 DebounceEvent::Available => {
447 let mut last = self.debounce_state.last_available.lock().await;
448 if let Some(last_time) = *last {
449 if now.duration_since(last_time) < self.debounce_config.window {
450 tracing::debug!(
451 "⏸️ Debouncing Network Available event (last event was {:?} ago)",
452 now.duration_since(last_time)
453 );
454 return false;
455 }
456 }
457 *last = Some(now);
458 true
459 }
460 DebounceEvent::Lost => {
461 let mut last = self.debounce_state.last_lost.lock().await;
462 if let Some(last_time) = *last {
463 if now.duration_since(last_time) < self.debounce_config.window {
464 tracing::debug!(
465 "⏸️ Debouncing Network Lost event (last event was {:?} ago)",
466 now.duration_since(last_time)
467 );
468 return false;
469 }
470 }
471 *last = Some(now);
472 true
473 }
474 DebounceEvent::TypeChanged => {
475 let mut last = self.debounce_state.last_type_changed.lock().await;
476 if let Some(last_time) = *last {
477 if now.duration_since(last_time) < self.debounce_config.window {
478 tracing::debug!(
479 "⏸️ Debouncing Network TypeChanged event (last event was {:?} ago)",
480 now.duration_since(last_time)
481 );
482 return false;
483 }
484 }
485 *last = Some(now);
486 true
487 }
488 }
489 }
490
491 async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
492 let _guard = self.recovery_state.connect_lock.lock().await;
493
494 if !self.signaling_client.is_connected() {
495 tracing::info!(reason = reason, "🔄 Connecting signaling");
496 self.signaling_client.connect_once().await.map_err(|e| {
497 let err_msg = format!("WebSocket connect failed: {}", e);
498 tracing::error!("❌ {}", err_msg);
499 err_msg
500 })?;
501
502 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
503 tracing::info!(reason = reason, "✅ Signaling connected");
504 return Ok(());
505 }
506
507 tracing::debug!(
508 reason = reason,
509 timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
510 "🔎 Probing existing signaling WebSocket"
511 );
512
513 match self
514 .signaling_client
515 .probe_alive(SIGNALING_PROBE_TIMEOUT)
516 .await
517 {
518 Ok(()) => {
519 tracing::debug!(
520 reason = reason,
521 "✅ Signaling probe succeeded; keeping existing WebSocket"
522 );
523 Ok(())
524 }
525 Err(e) => {
526 tracing::warn!(
527 reason = reason,
528 "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
529 e
530 );
531
532 if let Err(disconnect_err) = self.signaling_client.disconnect().await {
533 tracing::warn!(
534 reason = reason,
535 "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
536 disconnect_err
537 );
538 }
539
540 tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
541 self.signaling_client
542 .connect_once()
543 .await
544 .map_err(|connect_err| {
545 let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
546 tracing::error!("❌ {}", err_msg);
547 err_msg
548 })?;
549
550 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
551 tracing::info!(reason = reason, "✅ Signaling rebuilt");
552 Ok(())
553 }
554 }
555 }
556
557 async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
558 let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
559 coordinator.begin_network_recovery(reason).await
560 } else {
561 Vec::new()
562 };
563
564 self.ensure_signaling_healthy_once(reason).await?;
565
566 let coordinator = self.webrtc_coordinator.clone();
567
568 if let Some(coordinator) = coordinator {
569 if recovery_targets.is_empty() {
570 tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
571 } else {
572 tracing::info!("♻️ Triggering ICE restart for recovering connections...");
573 }
574 coordinator.restart_network_recovery_connections().await;
575 }
576
577 Ok(())
578 }
579
580 async fn probe_or_restore(&self, reason: &str) -> Result<(), String> {
581 match self.probe_connectivity().await {
582 Ok(()) => Ok(()),
583 Err(e) => {
584 tracing::warn!(
585 reason = reason,
586 "Connectivity probe failed; restoring connections: {}",
587 e
588 );
589 self.restore_signaling_and_webrtc(reason).await
590 }
591 }
592 }
593
594 async fn process_offline(&self) -> Result<(), String> {
595 tracing::info!("📱 Processing: Network offline");
596
597 if let Some(ref coordinator) = self.webrtc_coordinator {
598 coordinator.begin_network_recovery("NetworkLost").await;
599 tracing::info!("🧹 Clearing pending ICE restart attempts...");
600 coordinator.clear_pending_restarts().await;
601 }
602
603 tracing::info!("🔌 Disconnecting WebSocket...");
604 let _ = self.signaling_client.disconnect().await;
605
606 Ok(())
607 }
608}
609
610#[async_trait::async_trait]
611impl NetworkEventProcessor for DefaultNetworkEventProcessor {
612 fn begin_network_event_barrier(&self, event: &NetworkEvent) -> Option<NetworkEventBarrier> {
613 match event {
614 NetworkEvent::NetworkPathChanged { .. }
615 | NetworkEvent::AppLifecycleChanged { .. }
616 | NetworkEvent::CleanupConnections { .. }
617 | NetworkEvent::ForceReconnect { .. } => {
618 self.webrtc_coordinator
619 .as_ref()
620 .map(|coordinator| NetworkEventBarrier {
621 _cleanup_guard: coordinator.cleanup_guard(),
622 })
623 }
624 }
625 }
626
627 async fn process_network_available(&self) -> Result<(), String> {
629 let should_process = self.should_process_event(DebounceEvent::Available).await;
631 if !should_process && self.signaling_client.is_connected() {
632 return Ok(());
633 }
634
635 tracing::info!("📱 Processing: Network available");
636
637 self.restore_signaling_and_webrtc("NetworkAvailable").await
638 }
639
640 async fn process_network_lost(&self) -> Result<(), String> {
642 if !self.should_process_event(DebounceEvent::Lost).await {
644 return Ok(());
645 }
646
647 self.process_offline().await
648 }
649
650 async fn process_network_type_changed(
652 &self,
653 is_wifi: bool,
654 is_cellular: bool,
655 ) -> Result<(), String> {
656 let should_process = self.should_process_event(DebounceEvent::TypeChanged).await;
658 if !should_process && self.signaling_client.is_connected() {
659 return Ok(());
660 }
661
662 tracing::info!(
663 "📱 Processing: Network type changed (WiFi={}, Cellular={})",
664 is_wifi,
665 is_cellular
666 );
667
668 self.restore_signaling_and_webrtc("NetworkTypeChanged")
669 .await
670 }
671
672 async fn cleanup_connections(&self) -> Result<(), String> {
678 let _cleanup_guard = self
679 .webrtc_coordinator
680 .as_ref()
681 .map(|coordinator| coordinator.cleanup_guard());
682
683 tracing::info!("🧹 Manually cleaning up all connections...");
684
685 if let Some(ref coordinator) = self.webrtc_coordinator {
687 tracing::info!("♻️ Clearing pending ICE restart attempts...");
688 coordinator.clear_pending_restarts().await;
689 }
690
691 if let Some(ref peer_transport) = self.peer_transport {
693 tracing::info!("🔻 Closing all PeerTransport connections...");
694 if let Err(e) = peer_transport.close_all().await {
695 let err_msg = format!("Failed to close peer transports: {}", e);
696 tracing::warn!("⚠️ {}", err_msg);
697 } else {
699 tracing::info!("✅ All PeerTransport connections closed");
700 }
701 }
702
703 if let Some(ref coordinator) = self.webrtc_coordinator {
705 tracing::info!("🔻 Closing all WebRTC peer connections...");
706 if let Err(e) = coordinator.close_all_peers().await {
707 let err_msg = format!("Failed to close all peers: {}", e);
708 tracing::warn!("⚠️ {}", err_msg);
709 } else {
711 tracing::info!("✅ All WebRTC peer connections closed");
712 }
713 }
714
715 tracing::info!("🔌 Disconnecting WebSocket...");
717 match self.signaling_client.disconnect().await {
718 Ok(_) => {
719 tracing::info!("✅ WebSocket disconnected successfully");
720 }
721 Err(e) => {
722 let err_msg = format!("Failed to disconnect WebSocket: {}", e);
723 tracing::warn!("⚠️ {}", err_msg);
724 }
726 }
727
728 tracing::info!("✅ Connection cleanup completed");
729
730 Ok(())
731 }
732
    /// Probes the signaling connection with `SIGNALING_PROBE_TIMEOUT`.
    ///
    /// # Errors
    /// Returns `"Signaling probe failed: …"` when the probe reports an error.
    async fn probe_connectivity(&self) -> Result<(), String> {
        self.signaling_client
            .probe_alive(SIGNALING_PROBE_TIMEOUT)
            .await
            .map_err(|e| format!("Signaling probe failed: {}", e))
    }
739
    /// Cleans up all connections and then restores signaling and WebRTC.
    ///
    /// Calls `restore_signaling_and_webrtc` directly rather than
    /// `process_network_available`, so the restore is not subject to debouncing.
    async fn force_reconnect(&self) -> Result<(), String> {
        self.cleanup_connections().await?;
        self.restore_signaling_and_webrtc("ForceReconnect").await
    }
744
    /// Executes the action selected for a batch of events.
    ///
    /// Unlike the trait's default dispatch, `Offline` and `Restore` call the
    /// internal helpers directly and are therefore not debounced, and `Probe`
    /// falls back to a restore when the probe fails.
    async fn process_network_recovery_action(
        &self,
        action: NetworkRecoveryAction,
    ) -> Result<(), String> {
        match action {
            NetworkRecoveryAction::Noop => Ok(()),
            NetworkRecoveryAction::Offline => self.process_offline().await,
            NetworkRecoveryAction::Probe => self.probe_or_restore("Probe").await,
            NetworkRecoveryAction::Restore => {
                self.restore_signaling_and_webrtc("NetworkEventBatch").await
            }
            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
        }
    }
760}
761
/// Chooses the single recovery action for a batch of events.
///
/// Thin wrapper around `ConnectionSupervisor::select_action`; the selection rules
/// live there.
pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
    ConnectionSupervisor::select_action(events)
}
765
766pub async fn process_network_event_batch(
767 events: Vec<NetworkEvent>,
768 processor: Arc<dyn NetworkEventProcessor>,
769) -> Vec<NetworkEventResult> {
770 if events.is_empty() {
771 return Vec::new();
772 }
773
774 let action = select_network_recovery_action(&events);
775 let start = Instant::now();
776
777 tracing::info!(
778 event_count = events.len(),
779 action = ?action,
780 "network_event.action.start"
781 );
782
783 let result = processor.process_network_recovery_action(action).await;
784
785 let duration_ms = start.elapsed().as_millis() as u64;
786 match &result {
787 Ok(()) => tracing::info!(
788 event_count = events.len(),
789 action = ?action,
790 duration_ms,
791 "network_event.action.completed"
792 ),
793 Err(e) => tracing::warn!(
794 event_count = events.len(),
795 action = ?action,
796 duration_ms,
797 error = %e,
798 "network_event.action.completed"
799 ),
800 }
801
802 events
803 .into_iter()
804 .map(|event| match &result {
805 Ok(()) => NetworkEventResult::success(event, duration_ms),
806 Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
807 })
808 .collect()
809}
810
/// A network event together with the channel on which its result is reported.
pub struct NetworkEventRequest {
    /// The event to reconcile.
    pub event: NetworkEvent,
    /// One-shot sender the reconciler uses to deliver the event's result.
    /// If it is dropped without sending, the waiting caller observes a closed channel.
    pub result_tx: oneshot::Sender<NetworkEventResult>,
}
815
816pub async fn run_network_event_reconciler(
817 mut event_rx: mpsc::Receiver<NetworkEventRequest>,
818 processor: Arc<dyn NetworkEventProcessor>,
819 shutdown_token: CancellationToken,
820) {
821 tracing::info!("🔄 Network event reconciler started");
822
823 loop {
824 tokio::select! {
825 Some(first_request) = event_rx.recv() => {
826 tracing::debug!(
827 event = ?first_request.event,
828 "network_event.reconciler.received"
829 );
830 let mut event_barrier = processor.begin_network_event_barrier(&first_request.event);
831 let mut requests = vec![first_request];
832 let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
833 tokio::pin!(settle);
834
835 loop {
836 tokio::select! {
837 Some(next_request) = event_rx.recv() => {
838 tracing::debug!(
839 event = ?next_request.event,
840 "network_event.reconciler.coalesced"
841 );
842 if event_barrier.is_none() {
843 event_barrier = processor.begin_network_event_barrier(&next_request.event);
844 }
845 requests.push(next_request);
846 }
847 _ = &mut settle => {
848 break;
849 }
850 _ = shutdown_token.cancelled() => {
851 tracing::info!("🛑 Network event reconciler shutting down");
852 return;
853 }
854 else => {
855 break;
856 }
857 }
858 }
859
860 while let Ok(next_request) = event_rx.try_recv() {
861 tracing::debug!(
862 event = ?next_request.event,
863 "network_event.reconciler.coalesced"
864 );
865 if event_barrier.is_none() {
866 event_barrier = processor.begin_network_event_barrier(&next_request.event);
867 }
868 requests.push(next_request);
869 }
870
871 let events = requests
872 .iter()
873 .map(|request| request.event.clone())
874 .collect::<Vec<_>>();
875 let action = select_network_recovery_action(&events);
876 let facts = events
877 .iter()
878 .map(ConnectionFact::from_network_event)
879 .collect::<Vec<_>>();
880 tracing::info!(
881 event_count = events.len(),
882 action = ?action,
883 events = ?events,
884 facts = ?facts,
885 settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
886 "network_event.reconciler.batch_reconciled"
887 );
888
889 let results = process_network_event_batch(events, processor.clone()).await;
890 drop(event_barrier);
891
892 for (request, result) in requests.into_iter().zip(results) {
893 if request.result_tx.send(result).is_err() {
894 tracing::debug!("Network event caller dropped before receiving result");
895 }
896 }
897 }
898 _ = shutdown_token.cancelled() => {
899 tracing::info!("🛑 Network event reconciler shutting down");
900 break;
901 }
902 else => break,
903 }
904 }
905}
906
/// Client-side handle for submitting network events to the reconciler and waiting
/// for their results.
pub struct NetworkEventHandle {
    /// Sender side of the reconciler's request channel.
    event_tx: mpsc::Sender<NetworkEventRequest>,
    /// Maximum time to wait for a result after a request has been enqueued.
    result_timeout: Duration,
}
916
917impl NetworkEventHandle {
    /// Creates a handle that waits up to `NETWORK_EVENT_RESULT_TIMEOUT` for each result.
    pub fn new(event_tx: mpsc::Sender<NetworkEventRequest>) -> Self {
        Self::new_with_result_timeout(event_tx, NETWORK_EVENT_RESULT_TIMEOUT)
    }
922
    /// Creates a handle with a caller-chosen result timeout.
    ///
    /// `result_timeout` bounds only the wait for the result; enqueuing the request
    /// itself is not covered by it.
    pub fn new_with_result_timeout(
        event_tx: mpsc::Sender<NetworkEventRequest>,
        result_timeout: Duration,
    ) -> Self {
        Self {
            event_tx,
            result_timeout,
        }
    }
937
    /// Submits a `NetworkEvent::NetworkPathChanged` with the given snapshot and
    /// waits for its result.
    ///
    /// # Errors
    /// Returns `Err` when the event cannot be enqueued, the result channel is
    /// closed, or the result does not arrive within the configured timeout.
    pub async fn handle_network_path_changed(
        &self,
        snapshot: NetworkSnapshot,
    ) -> Result<NetworkEventResult, String> {
        self.send_event_and_await_result(NetworkEvent::NetworkPathChanged { snapshot })
            .await
    }
946
    /// Submits a `NetworkEvent::AppLifecycleChanged` with the given state and waits
    /// for its result.
    ///
    /// # Errors
    /// Returns `Err` when the event cannot be enqueued, the result channel is
    /// closed, or the result does not arrive within the configured timeout.
    pub async fn handle_app_lifecycle_changed(
        &self,
        state: AppLifecycleState,
    ) -> Result<NetworkEventResult, String> {
        self.send_event_and_await_result(NetworkEvent::AppLifecycleChanged { state })
            .await
    }
955
    /// Submits a `NetworkEvent::CleanupConnections` with the given reason and waits
    /// for its result.
    ///
    /// # Errors
    /// Returns `Err` when the event cannot be enqueued, the result channel is
    /// closed, or the result does not arrive within the configured timeout.
    pub async fn cleanup_connections(
        &self,
        reason: CleanupReason,
    ) -> Result<NetworkEventResult, String> {
        self.send_event_and_await_result(NetworkEvent::CleanupConnections { reason })
            .await
    }
964
    /// Submits a `NetworkEvent::ForceReconnect` with the given reason and waits for
    /// its result.
    ///
    /// # Errors
    /// Returns `Err` when the event cannot be enqueued, the result channel is
    /// closed, or the result does not arrive within the configured timeout.
    pub async fn force_reconnect(
        &self,
        reason: ReconnectReason,
    ) -> Result<NetworkEventResult, String> {
        self.send_event_and_await_result(NetworkEvent::ForceReconnect { reason })
            .await
    }
973
974 async fn send_event_and_await_result(
976 &self,
977 event: NetworkEvent,
978 ) -> Result<NetworkEventResult, String> {
979 let event_request_id = NEXT_NETWORK_EVENT_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
980 let start = Instant::now();
981 let (result_tx, result_rx) = oneshot::channel();
982 let request = NetworkEventRequest {
983 event: event.clone(),
984 result_tx,
985 };
986
987 tracing::info!(
988 event_request_id,
989 event = ?event,
990 result_timeout_ms = self.result_timeout.as_millis() as u64,
991 "network_event.handle.enqueue"
992 );
993
994 if let Err(e) = self.event_tx.send(request).await {
995 let err = format!("Failed to send network event: {}", e);
996 tracing::warn!(
997 event_request_id,
998 event = ?event,
999 error = %err,
1000 "network_event.handle.enqueue_failed"
1001 );
1002 return Err(err);
1003 }
1004
1005 let result = match tokio::time::timeout(self.result_timeout, result_rx).await {
1006 Ok(Ok(result)) => Ok(result),
1007 Ok(Err(_)) => Err("Failed to receive network event result".to_string()),
1008 Err(_) => Err(format!(
1009 "Timed out waiting for network event result after {}ms",
1010 self.result_timeout.as_millis()
1011 )),
1012 };
1013
1014 let wait_ms = start.elapsed().as_millis() as u64;
1015 match &result {
1016 Ok(result) if result.success => tracing::info!(
1017 event_request_id,
1018 event = ?event,
1019 result_event = ?result.event,
1020 duration_ms = result.duration_ms,
1021 wait_ms,
1022 "network_event.handle.result_received"
1023 ),
1024 Ok(result) => tracing::warn!(
1025 event_request_id,
1026 event = ?event,
1027 result_event = ?result.event,
1028 duration_ms = result.duration_ms,
1029 wait_ms,
1030 error = ?result.error,
1031 "network_event.handle.result_received"
1032 ),
1033 Err(e) => tracing::warn!(
1034 event_request_id,
1035 event = ?event,
1036 wait_ms,
1037 error = %e,
1038 "network_event.handle.result_failed"
1039 ),
1040 }
1041
1042 result
1043 }
1044}
1045
1046impl Clone for NetworkEventHandle {
1047 fn clone(&self) -> Self {
1048 Self {
1049 event_tx: self.event_tx.clone(),
1050 result_timeout: self.result_timeout,
1051 }
1052 }
1053}