1use std::sync::{
70 Arc,
71 atomic::{AtomicU64, Ordering},
72};
73use std::time::{Duration, Instant};
74
75use crate::transport::PeerTransport;
76use crate::wire::webrtc::{CleanupGuard, SignalingClient, WebRtcCoordinator};
77use tokio::sync::{mpsc, oneshot};
78use tokio_util::sync::CancellationToken;
79
80use super::connection_supervisor::{ConnectionFact, ConnectionSupervisor};
81
82const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
83const NETWORK_EVENT_RESULT_TIMEOUT: Duration = Duration::from_secs(5);
84const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
85pub(super) const LONG_BACKGROUND_RECONNECT_THRESHOLD_MS: u64 = 30_000;
86static NEXT_NETWORK_EVENT_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
87
88pub struct NetworkEventBarrier {
91 _cleanup_guard: CleanupGuard,
92}
93
94#[derive(Debug, Clone, PartialEq, Eq, Hash)]
96pub struct NetworkSnapshot {
97 pub sequence: u64,
98 pub availability: NetworkAvailability,
99 pub transport: NetworkTransportFlags,
100 pub is_expensive: bool,
101 pub is_constrained: bool,
102}
103
104impl NetworkSnapshot {
105 pub fn is_offline(&self) -> bool {
106 matches!(self.availability, NetworkAvailability::Unavailable)
107 }
108
109 pub fn should_restore(&self) -> bool {
110 matches!(self.availability, NetworkAvailability::Available)
111 }
112}
113
/// Reachability of the network as reported in a [`NetworkSnapshot`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkAvailability {
    /// The reporter could not determine reachability.
    Unknown,
    /// The network is reachable.
    Available,
    /// The network is not reachable.
    Unavailable,
}
121
/// Transports used by the current network path. The flags are independent, so several may be
/// set at once; `Default` clears all of them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct NetworkTransportFlags {
    /// Path uses Wi-Fi.
    pub wifi: bool,
    /// Path uses cellular data.
    pub cellular: bool,
    /// Path uses wired Ethernet.
    pub ethernet: bool,
    /// Path goes through a VPN.
    pub vpn: bool,
    /// Path uses some other transport.
    pub other: bool,
}
131
/// Application lifecycle transition reported by the host app.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AppLifecycleState {
    /// The app moved to the background.
    Background,
    /// The app returned to the foreground after `background_duration_ms` in the background.
    Foreground { background_duration_ms: u64 },
}
138
/// Why a connection cleanup was requested; carried by `NetworkEvent::CleanupConnections`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CleanupReason {
    AppTerminating,
    UserLogout,
    StaleConnectionSuspected,
    ManualReset,
}
147
/// Why a forced reconnect was requested; carried by `NetworkEvent::ForceReconnect`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReconnectReason {
    NetworkPathChanged,
    LongBackground,
    ProbeFailed,
    ManualReconnect,
    StaleConnectionSuspected,
}
157
158#[derive(Debug, Clone, PartialEq, Eq, Hash)]
160pub enum NetworkEvent {
161 NetworkPathChanged { snapshot: NetworkSnapshot },
163
164 AppLifecycleChanged { state: AppLifecycleState },
166
167 CleanupConnections { reason: CleanupReason },
174
175 ForceReconnect { reason: ReconnectReason },
177}
178
/// The single action a batch of coalesced network events is reduced to.
///
/// `NetworkEventProcessor::process_network_recovery_action` maps each variant to the
/// processor operation of the same intent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkRecoveryAction {
    /// Nothing to do.
    Noop,
    /// Handle loss of connectivity.
    Offline,
    /// Check that existing connections are still alive.
    Probe,
    /// Restore connectivity after the network became available.
    Restore,
    /// Tear down connections without reconnecting.
    CleanupOnly,
    /// Tear down connections and reconnect.
    ForceReconnect,
}
189
190fn network_event_needs_lifecycle_barrier(event: &NetworkEvent) -> bool {
191 match event {
192 NetworkEvent::NetworkPathChanged { snapshot } => {
193 snapshot.is_offline() || snapshot.should_restore()
194 }
195 NetworkEvent::AppLifecycleChanged { state } => match state {
196 AppLifecycleState::Background => false,
197 AppLifecycleState::Foreground {
198 background_duration_ms,
199 } => *background_duration_ms >= LONG_BACKGROUND_RECONNECT_THRESHOLD_MS,
200 },
201 NetworkEvent::CleanupConnections { .. } | NetworkEvent::ForceReconnect { .. } => true,
202 }
203}
204
205#[derive(Debug, Clone)]
207pub struct NetworkEventResult {
208 pub event: NetworkEvent,
210
211 pub success: bool,
213
214 pub error: Option<String>,
216
217 pub duration_ms: u64,
219}
220
221impl NetworkEventResult {
222 pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
223 Self {
224 event,
225 success: true,
226 error: None,
227 duration_ms,
228 }
229 }
230
231 pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
232 Self {
233 event,
234 success: false,
235 error: Some(error),
236 duration_ms,
237 }
238 }
239}
240
241#[async_trait::async_trait]
245pub trait NetworkEventProcessor: Send + Sync {
246 fn begin_network_event_barrier(&self, _event: &NetworkEvent) -> Option<NetworkEventBarrier> {
249 None
250 }
251
252 async fn process_network_available(&self) -> Result<(), String>;
258
259 async fn process_network_lost(&self) -> Result<(), String>;
265
266 async fn process_network_type_changed(
272 &self,
273 is_wifi: bool,
274 is_cellular: bool,
275 ) -> Result<(), String>;
276
277 async fn cleanup_connections(&self) -> Result<(), String>;
300
301 async fn probe_connectivity(&self) -> Result<(), String> {
303 Ok(())
304 }
305
306 async fn force_reconnect(&self) -> Result<(), String> {
308 self.cleanup_connections().await?;
309 self.process_network_available().await
310 }
311
312 async fn process_network_recovery_action(
317 &self,
318 action: NetworkRecoveryAction,
319 ) -> Result<(), String> {
320 match action {
321 NetworkRecoveryAction::Noop => Ok(()),
322 NetworkRecoveryAction::Offline => self.process_network_lost().await,
323 NetworkRecoveryAction::Probe => self.probe_connectivity().await,
324 NetworkRecoveryAction::Restore => self.process_network_available().await,
325 NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
326 NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
327 }
328 }
329}
330
/// Tuning for per-kind debouncing of available / lost / type-changed events.
#[derive(Debug, Clone)]
pub struct DebounceConfig {
    /// Minimum spacing between two processed events of the same kind.
    pub window: Duration,
}
337
338impl Default for DebounceConfig {
339 fn default() -> Self {
340 Self {
341 window: Duration::from_secs(2),
343 }
344 }
345}
346
347#[derive(Debug)]
349struct DebounceState {
350 last_available: tokio::sync::Mutex<Option<Instant>>,
351 last_lost: tokio::sync::Mutex<Option<Instant>>,
352 last_type_changed: tokio::sync::Mutex<Option<Instant>>,
353}
354
355impl DebounceState {
356 fn new() -> Self {
357 Self {
358 last_available: tokio::sync::Mutex::new(None),
359 last_lost: tokio::sync::Mutex::new(None),
360 last_type_changed: tokio::sync::Mutex::new(None),
361 }
362 }
363}
364
/// Event kinds that are debounced independently of each other.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DebounceEvent {
    /// The network became available.
    Available,
    /// The network was lost.
    Lost,
    /// The transport type changed.
    TypeChanged,
}
371
372#[derive(Debug)]
373struct SignalingRecoveryState {
374 connect_lock: tokio::sync::Mutex<()>,
375 last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
376}
377
378impl SignalingRecoveryState {
379 fn new() -> Self {
380 Self {
381 connect_lock: tokio::sync::Mutex::new(()),
382 last_successful_connect: tokio::sync::Mutex::new(None),
383 }
384 }
385}
386
387pub struct DefaultNetworkEventProcessor {
389 signaling_client: Arc<dyn SignalingClient>,
390 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
391 peer_transport: Option<Arc<PeerTransport>>,
392 debounce_config: DebounceConfig,
393 debounce_state: Arc<DebounceState>,
394 recovery_state: Arc<SignalingRecoveryState>,
395}
396
397impl DefaultNetworkEventProcessor {
398 pub fn new(
399 signaling_client: Arc<dyn SignalingClient>,
400 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
401 ) -> Self {
402 Self::new_with_debounce_and_peer_transport(
403 signaling_client,
404 webrtc_coordinator,
405 DebounceConfig::default(),
406 None,
407 )
408 }
409
410 pub fn new_with_debounce(
411 signaling_client: Arc<dyn SignalingClient>,
412 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
413 debounce_config: DebounceConfig,
414 ) -> Self {
415 Self::new_with_debounce_and_peer_transport(
416 signaling_client,
417 webrtc_coordinator,
418 debounce_config,
419 None,
420 )
421 }
422
423 pub(crate) fn new_with_peer_transport(
424 signaling_client: Arc<dyn SignalingClient>,
425 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
426 peer_transport: Option<Arc<PeerTransport>>,
427 ) -> Self {
428 Self::new_with_debounce_and_peer_transport(
429 signaling_client,
430 webrtc_coordinator,
431 DebounceConfig::default(),
432 peer_transport,
433 )
434 }
435
436 pub(crate) fn new_with_debounce_and_peer_transport(
437 signaling_client: Arc<dyn SignalingClient>,
438 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
439 debounce_config: DebounceConfig,
440 peer_transport: Option<Arc<PeerTransport>>,
441 ) -> Self {
442 Self {
443 signaling_client,
444 webrtc_coordinator,
445 peer_transport,
446 debounce_config,
447 debounce_state: Arc::new(DebounceState::new()),
448 recovery_state: Arc::new(SignalingRecoveryState::new()),
449 }
450 }
451
452 fn lifecycle_barrier(&self) -> Option<NetworkEventBarrier> {
453 self.webrtc_coordinator
454 .as_ref()
455 .map(|coordinator| NetworkEventBarrier {
456 _cleanup_guard: coordinator.cleanup_guard(),
457 })
458 }
459
460 async fn should_process_event(&self, event: DebounceEvent) -> bool {
466 let now = Instant::now();
467
468 match event {
469 DebounceEvent::Available => {
470 let mut last = self.debounce_state.last_available.lock().await;
471 if let Some(last_time) = *last {
472 if now.duration_since(last_time) < self.debounce_config.window {
473 tracing::debug!(
474 "⏸️ Debouncing Network Available event (last event was {:?} ago)",
475 now.duration_since(last_time)
476 );
477 return false;
478 }
479 }
480 *last = Some(now);
481 true
482 }
483 DebounceEvent::Lost => {
484 let mut last = self.debounce_state.last_lost.lock().await;
485 if let Some(last_time) = *last {
486 if now.duration_since(last_time) < self.debounce_config.window {
487 tracing::debug!(
488 "⏸️ Debouncing Network Lost event (last event was {:?} ago)",
489 now.duration_since(last_time)
490 );
491 return false;
492 }
493 }
494 *last = Some(now);
495 true
496 }
497 DebounceEvent::TypeChanged => {
498 let mut last = self.debounce_state.last_type_changed.lock().await;
499 if let Some(last_time) = *last {
500 if now.duration_since(last_time) < self.debounce_config.window {
501 tracing::debug!(
502 "⏸️ Debouncing Network TypeChanged event (last event was {:?} ago)",
503 now.duration_since(last_time)
504 );
505 return false;
506 }
507 }
508 *last = Some(now);
509 true
510 }
511 }
512 }
513
514 async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
515 let _guard = self.recovery_state.connect_lock.lock().await;
516
517 if !self.signaling_client.is_connected() {
518 tracing::info!(reason = reason, "🔄 Connecting signaling");
519 self.signaling_client.connect_once().await.map_err(|e| {
520 let err_msg = format!("WebSocket connect failed: {}", e);
521 tracing::error!("❌ {}", err_msg);
522 err_msg
523 })?;
524
525 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
526 tracing::info!(reason = reason, "✅ Signaling connected");
527 return Ok(());
528 }
529
530 tracing::debug!(
531 reason = reason,
532 timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
533 "🔎 Probing existing signaling WebSocket"
534 );
535
536 match self
537 .signaling_client
538 .probe_alive(SIGNALING_PROBE_TIMEOUT)
539 .await
540 {
541 Ok(()) => {
542 tracing::debug!(
543 reason = reason,
544 "✅ Signaling probe succeeded; keeping existing WebSocket"
545 );
546 Ok(())
547 }
548 Err(e) => {
549 tracing::warn!(
550 reason = reason,
551 "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
552 e
553 );
554
555 if let Err(disconnect_err) = self.signaling_client.disconnect().await {
556 tracing::warn!(
557 reason = reason,
558 "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
559 disconnect_err
560 );
561 }
562
563 tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
564 self.signaling_client
565 .connect_once()
566 .await
567 .map_err(|connect_err| {
568 let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
569 tracing::error!("❌ {}", err_msg);
570 err_msg
571 })?;
572
573 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
574 tracing::info!(reason = reason, "✅ Signaling rebuilt");
575 Ok(())
576 }
577 }
578 }
579
580 async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
581 let _cleanup_guard = self.lifecycle_barrier();
582 let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
583 coordinator.begin_network_recovery(reason).await
584 } else {
585 Vec::new()
586 };
587
588 self.ensure_signaling_healthy_once(reason).await?;
589
590 let coordinator = self.webrtc_coordinator.clone();
591
592 if let Some(coordinator) = coordinator {
593 if recovery_targets.is_empty() {
594 tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
595 } else {
596 tracing::info!("♻️ Triggering ICE restart for recovering connections...");
597 }
598 coordinator.restart_network_recovery_connections().await;
599 }
600
601 Ok(())
602 }
603
604 async fn probe_or_restore(&self, reason: &str) -> Result<(), String> {
605 match self.probe_connectivity().await {
606 Ok(()) => Ok(()),
607 Err(e) => {
608 tracing::warn!(
609 reason = reason,
610 "Connectivity probe failed; restoring connections: {}",
611 e
612 );
613 if let Err(disconnect_err) = self.signaling_client.disconnect().await {
614 tracing::warn!(
615 reason = reason,
616 "Failed to disconnect unhealthy signaling before restore: {}",
617 disconnect_err
618 );
619 }
620 self.restore_signaling_and_webrtc(reason).await
621 }
622 }
623 }
624
625 async fn process_offline(&self) -> Result<(), String> {
626 let _cleanup_guard = self.lifecycle_barrier();
627 tracing::info!("📱 Processing: Network offline");
628
629 if let Some(ref coordinator) = self.webrtc_coordinator {
630 coordinator.begin_network_recovery("NetworkLost").await;
631 tracing::info!("🧹 Clearing pending ICE restart attempts...");
632 coordinator.clear_pending_restarts().await;
633 }
634
635 tracing::info!("🔌 Disconnecting WebSocket...");
636 let _ = self.signaling_client.disconnect().await;
637
638 Ok(())
639 }
640}
641
642#[async_trait::async_trait]
643impl NetworkEventProcessor for DefaultNetworkEventProcessor {
644 fn begin_network_event_barrier(&self, event: &NetworkEvent) -> Option<NetworkEventBarrier> {
645 if network_event_needs_lifecycle_barrier(event) {
646 self.lifecycle_barrier()
647 } else {
648 None
649 }
650 }
651
652 async fn process_network_available(&self) -> Result<(), String> {
654 let should_process = self.should_process_event(DebounceEvent::Available).await;
656 if !should_process && self.signaling_client.is_connected() {
657 return Ok(());
658 }
659
660 tracing::info!("📱 Processing: Network available");
661
662 self.restore_signaling_and_webrtc("NetworkAvailable").await
663 }
664
665 async fn process_network_lost(&self) -> Result<(), String> {
667 if !self.should_process_event(DebounceEvent::Lost).await {
669 return Ok(());
670 }
671
672 self.process_offline().await
673 }
674
675 async fn process_network_type_changed(
677 &self,
678 is_wifi: bool,
679 is_cellular: bool,
680 ) -> Result<(), String> {
681 let should_process = self.should_process_event(DebounceEvent::TypeChanged).await;
683 if !should_process && self.signaling_client.is_connected() {
684 return Ok(());
685 }
686
687 tracing::info!(
688 "📱 Processing: Network type changed (WiFi={}, Cellular={})",
689 is_wifi,
690 is_cellular
691 );
692
693 self.restore_signaling_and_webrtc("NetworkTypeChanged")
694 .await
695 }
696
697 async fn cleanup_connections(&self) -> Result<(), String> {
703 let _cleanup_guard = self.lifecycle_barrier();
704
705 tracing::info!("🧹 Manually cleaning up all connections...");
706
707 if let Some(ref coordinator) = self.webrtc_coordinator {
709 tracing::info!("♻️ Clearing pending ICE restart attempts...");
710 coordinator.clear_pending_restarts().await;
711 }
712
713 if let Some(ref peer_transport) = self.peer_transport {
715 tracing::info!("🔻 Closing all PeerTransport connections...");
716 if let Err(e) = peer_transport.close_all().await {
717 let err_msg = format!("Failed to close peer transports: {}", e);
718 tracing::warn!("⚠️ {}", err_msg);
719 } else {
721 tracing::info!("✅ All PeerTransport connections closed");
722 }
723 }
724
725 if let Some(ref coordinator) = self.webrtc_coordinator {
727 tracing::info!("🔻 Closing all WebRTC peer connections...");
728 if let Err(e) = coordinator.close_all_peers().await {
729 let err_msg = format!("Failed to close all peers: {}", e);
730 tracing::warn!("⚠️ {}", err_msg);
731 } else {
733 tracing::info!("✅ All WebRTC peer connections closed");
734 }
735 }
736
737 tracing::info!("🔌 Disconnecting WebSocket...");
739 match self.signaling_client.disconnect().await {
740 Ok(_) => {
741 tracing::info!("✅ WebSocket disconnected successfully");
742 }
743 Err(e) => {
744 let err_msg = format!("Failed to disconnect WebSocket: {}", e);
745 tracing::warn!("⚠️ {}", err_msg);
746 }
748 }
749
750 tracing::info!("✅ Connection cleanup completed");
751
752 Ok(())
753 }
754
755 async fn probe_connectivity(&self) -> Result<(), String> {
756 self.signaling_client
757 .probe_alive(SIGNALING_PROBE_TIMEOUT)
758 .await
759 .map_err(|e| format!("Signaling probe failed: {}", e))
760 }
761
762 async fn force_reconnect(&self) -> Result<(), String> {
763 self.cleanup_connections().await?;
764 let result = self.restore_signaling_and_webrtc("ForceReconnect").await;
765 if let Err(err) = &result {
766 tracing::warn!(
767 error = %err,
768 "ForceReconnect restore failed; scheduling signaling auto-reconnect"
769 );
770 self.signaling_client.schedule_auto_reconnect();
771 }
772 result
773 }
774
775 async fn process_network_recovery_action(
776 &self,
777 action: NetworkRecoveryAction,
778 ) -> Result<(), String> {
779 match action {
780 NetworkRecoveryAction::Noop => Ok(()),
781 NetworkRecoveryAction::Offline => self.process_offline().await,
782 NetworkRecoveryAction::Probe => self.probe_or_restore("Probe").await,
783 NetworkRecoveryAction::Restore => {
784 self.restore_signaling_and_webrtc("NetworkEventBatch").await
785 }
786 NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
787 NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
788 }
789 }
790}
791
792pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
793 ConnectionSupervisor::select_action(events)
794}
795
796pub async fn process_network_event_batch(
797 events: Vec<NetworkEvent>,
798 processor: Arc<dyn NetworkEventProcessor>,
799) -> Vec<NetworkEventResult> {
800 if events.is_empty() {
801 return Vec::new();
802 }
803
804 let action = select_network_recovery_action(&events);
805 let start = Instant::now();
806
807 tracing::info!(
808 event_count = events.len(),
809 action = ?action,
810 "network_event.action.start"
811 );
812
813 let result = processor.process_network_recovery_action(action).await;
814
815 let duration_ms = start.elapsed().as_millis() as u64;
816 match &result {
817 Ok(()) => tracing::info!(
818 event_count = events.len(),
819 action = ?action,
820 duration_ms,
821 "network_event.action.completed"
822 ),
823 Err(e) => tracing::warn!(
824 event_count = events.len(),
825 action = ?action,
826 duration_ms,
827 error = %e,
828 "network_event.action.completed"
829 ),
830 }
831
832 events
833 .into_iter()
834 .map(|event| match &result {
835 Ok(()) => NetworkEventResult::success(event, duration_ms),
836 Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
837 })
838 .collect()
839}
840
841pub struct NetworkEventRequest {
842 pub event: NetworkEvent,
843 pub result_tx: oneshot::Sender<NetworkEventResult>,
844}
845
846pub async fn run_network_event_reconciler(
847 mut event_rx: mpsc::Receiver<NetworkEventRequest>,
848 processor: Arc<dyn NetworkEventProcessor>,
849 shutdown_token: CancellationToken,
850) {
851 tracing::info!("🔄 Network event reconciler started");
852
853 loop {
854 tokio::select! {
855 Some(first_request) = event_rx.recv() => {
856 tracing::debug!(
857 event = ?first_request.event,
858 "network_event.reconciler.received"
859 );
860 let mut event_barrier = processor.begin_network_event_barrier(&first_request.event);
861 let mut requests = vec![first_request];
862 let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
863 tokio::pin!(settle);
864
865 loop {
866 tokio::select! {
867 Some(next_request) = event_rx.recv() => {
868 tracing::debug!(
869 event = ?next_request.event,
870 "network_event.reconciler.coalesced"
871 );
872 if event_barrier.is_none() {
873 event_barrier = processor.begin_network_event_barrier(&next_request.event);
874 }
875 requests.push(next_request);
876 }
877 _ = &mut settle => {
878 break;
879 }
880 _ = shutdown_token.cancelled() => {
881 tracing::info!("🛑 Network event reconciler shutting down");
882 return;
883 }
884 else => {
885 break;
886 }
887 }
888 }
889
890 while let Ok(next_request) = event_rx.try_recv() {
891 tracing::debug!(
892 event = ?next_request.event,
893 "network_event.reconciler.coalesced"
894 );
895 if event_barrier.is_none() {
896 event_barrier = processor.begin_network_event_barrier(&next_request.event);
897 }
898 requests.push(next_request);
899 }
900
901 let events = requests
902 .iter()
903 .map(|request| request.event.clone())
904 .collect::<Vec<_>>();
905 let action = select_network_recovery_action(&events);
906 let facts = events
907 .iter()
908 .map(ConnectionFact::from_network_event)
909 .collect::<Vec<_>>();
910 tracing::info!(
911 event_count = events.len(),
912 action = ?action,
913 events = ?events,
914 facts = ?facts,
915 settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
916 "network_event.reconciler.batch_reconciled"
917 );
918
919 let results = process_network_event_batch(events, processor.clone()).await;
920 drop(event_barrier);
921
922 for (request, result) in requests.into_iter().zip(results) {
923 if request.result_tx.send(result).is_err() {
924 tracing::debug!("Network event caller dropped before receiving result");
925 }
926 }
927 }
928 _ = shutdown_token.cancelled() => {
929 tracing::info!("🛑 Network event reconciler shutting down");
930 break;
931 }
932 else => break,
933 }
934 }
935}
936
937pub struct NetworkEventHandle {
942 event_tx: mpsc::Sender<NetworkEventRequest>,
944 result_timeout: Duration,
945}
946
947impl NetworkEventHandle {
948 pub fn new(event_tx: mpsc::Sender<NetworkEventRequest>) -> Self {
950 Self::new_with_result_timeout(event_tx, NETWORK_EVENT_RESULT_TIMEOUT)
951 }
952
953 pub fn new_with_result_timeout(
959 event_tx: mpsc::Sender<NetworkEventRequest>,
960 result_timeout: Duration,
961 ) -> Self {
962 Self {
963 event_tx,
964 result_timeout,
965 }
966 }
967
968 pub async fn handle_network_path_changed(
970 &self,
971 snapshot: NetworkSnapshot,
972 ) -> Result<NetworkEventResult, String> {
973 self.send_event_and_await_result(NetworkEvent::NetworkPathChanged { snapshot })
974 .await
975 }
976
977 pub async fn handle_app_lifecycle_changed(
979 &self,
980 state: AppLifecycleState,
981 ) -> Result<NetworkEventResult, String> {
982 self.send_event_and_await_result(NetworkEvent::AppLifecycleChanged { state })
983 .await
984 }
985
986 pub async fn cleanup_connections(
988 &self,
989 reason: CleanupReason,
990 ) -> Result<NetworkEventResult, String> {
991 self.send_event_and_await_result(NetworkEvent::CleanupConnections { reason })
992 .await
993 }
994
995 pub async fn force_reconnect(
997 &self,
998 reason: ReconnectReason,
999 ) -> Result<NetworkEventResult, String> {
1000 self.send_event_and_await_result(NetworkEvent::ForceReconnect { reason })
1001 .await
1002 }
1003
1004 async fn send_event_and_await_result(
1006 &self,
1007 event: NetworkEvent,
1008 ) -> Result<NetworkEventResult, String> {
1009 let event_request_id = NEXT_NETWORK_EVENT_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
1010 let start = Instant::now();
1011 let (result_tx, result_rx) = oneshot::channel();
1012 let request = NetworkEventRequest {
1013 event: event.clone(),
1014 result_tx,
1015 };
1016
1017 tracing::info!(
1018 event_request_id,
1019 event = ?event,
1020 result_timeout_ms = self.result_timeout.as_millis() as u64,
1021 "network_event.handle.enqueue"
1022 );
1023
1024 if let Err(e) = self.event_tx.send(request).await {
1025 let err = format!("Failed to send network event: {}", e);
1026 tracing::warn!(
1027 event_request_id,
1028 event = ?event,
1029 error = %err,
1030 "network_event.handle.enqueue_failed"
1031 );
1032 return Err(err);
1033 }
1034
1035 let result = match tokio::time::timeout(self.result_timeout, result_rx).await {
1036 Ok(Ok(result)) => Ok(result),
1037 Ok(Err(_)) => Err("Failed to receive network event result".to_string()),
1038 Err(_) => Err(format!(
1039 "Timed out waiting for network event result after {}ms",
1040 self.result_timeout.as_millis()
1041 )),
1042 };
1043
1044 let wait_ms = start.elapsed().as_millis() as u64;
1045 match &result {
1046 Ok(result) if result.success => tracing::info!(
1047 event_request_id,
1048 event = ?event,
1049 result_event = ?result.event,
1050 duration_ms = result.duration_ms,
1051 wait_ms,
1052 "network_event.handle.result_received"
1053 ),
1054 Ok(result) => tracing::warn!(
1055 event_request_id,
1056 event = ?event,
1057 result_event = ?result.event,
1058 duration_ms = result.duration_ms,
1059 wait_ms,
1060 error = ?result.error,
1061 "network_event.handle.result_received"
1062 ),
1063 Err(e) => tracing::warn!(
1064 event_request_id,
1065 event = ?event,
1066 wait_ms,
1067 error = %e,
1068 "network_event.handle.result_failed"
1069 ),
1070 }
1071
1072 result
1073 }
1074}
1075
1076impl Clone for NetworkEventHandle {
1077 fn clone(&self) -> Self {
1078 Self {
1079 event_tx: self.event_tx.clone(),
1080 result_timeout: self.result_timeout,
1081 }
1082 }
1083}
1084
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lifecycle::CredentialState;
    use crate::transport::{NetworkError, NetworkResult};
    use crate::wire::webrtc::{SignalingEvent, SignalingStats};
    use actr_protocol::{
        AIdCredential, ActrId, Pong, RegisterRequest, RegisterResponse, RouteCandidatesRequest,
        RouteCandidatesResponse, ServiceAvailabilityState, SignalingEnvelope, UnregisterResponse,
    };
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
    use tokio::sync::broadcast;

    /// The error returned by every request-style method the tests never exercise.
    fn unused<T>() -> NetworkResult<T> {
        Err(NetworkError::ConnectionError("unused".to_string()))
    }

    /// Signaling double that counts connect / disconnect / auto-reconnect calls. It can be
    /// configured to fail every `connect_once`.
    struct CountingSignalingClient {
        connected: AtomicBool,
        fail_connect_once: bool,
        disconnects: AtomicUsize,
        connect_once_attempts: AtomicUsize,
        auto_reconnect_requests: AtomicUsize,
        events: broadcast::Sender<SignalingEvent>,
    }

    impl CountingSignalingClient {
        fn new(fail_connect_once: bool) -> Self {
            let (events, _initial_rx) = broadcast::channel(8);
            Self {
                connected: AtomicBool::new(false),
                fail_connect_once,
                disconnects: AtomicUsize::new(0),
                connect_once_attempts: AtomicUsize::new(0),
                auto_reconnect_requests: AtomicUsize::new(0),
                events,
            }
        }
    }

    #[async_trait::async_trait]
    impl SignalingClient for CountingSignalingClient {
        async fn connect(&self) -> NetworkResult<()> {
            Ok(())
        }

        async fn connect_once(&self) -> NetworkResult<()> {
            self.connect_once_attempts
                .fetch_add(1, AtomicOrdering::SeqCst);
            if self.fail_connect_once {
                Err(NetworkError::ConnectionError(
                    "forced connect_once failure".to_string(),
                ))
            } else {
                self.connected.store(true, AtomicOrdering::SeqCst);
                Ok(())
            }
        }

        fn schedule_auto_reconnect(&self) {
            self.auto_reconnect_requests
                .fetch_add(1, AtomicOrdering::SeqCst);
        }

        async fn disconnect(&self) -> NetworkResult<()> {
            self.disconnects.fetch_add(1, AtomicOrdering::SeqCst);
            self.connected.store(false, AtomicOrdering::SeqCst);
            Ok(())
        }

        async fn send_register_request(
            &self,
            _request: RegisterRequest,
        ) -> NetworkResult<RegisterResponse> {
            unused()
        }

        async fn send_unregister_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _reason: Option<String>,
        ) -> NetworkResult<UnregisterResponse> {
            unused()
        }

        async fn send_heartbeat(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _availability: ServiceAvailabilityState,
            _power_reserve: f32,
            _mailbox_backlog: f32,
        ) -> NetworkResult<Pong> {
            unused()
        }

        async fn send_route_candidates_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _request: RouteCandidatesRequest,
        ) -> NetworkResult<RouteCandidatesResponse> {
            unused()
        }

        async fn get_signing_key(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _key_id: u32,
        ) -> NetworkResult<(u32, Vec<u8>)> {
            unused()
        }

        async fn send_credential_update_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
        ) -> NetworkResult<RegisterResponse> {
            unused()
        }

        async fn send_envelope(&self, _envelope: SignalingEnvelope) -> NetworkResult<()> {
            unused()
        }

        async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
            unused()
        }

        fn is_connected(&self) -> bool {
            self.connected.load(AtomicOrdering::SeqCst)
        }

        fn get_stats(&self) -> SignalingStats {
            SignalingStats::default()
        }

        fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
            self.events.subscribe()
        }

        async fn set_actor_id(&self, _actor_id: ActrId) {}

        async fn set_credential_state(&self, _credential_state: CredentialState) {}

        async fn clear_identity(&self) {}
    }

    /// Builds a snapshot with no transport flags and no cost/constraint markers.
    fn snapshot(sequence: u64, availability: NetworkAvailability) -> NetworkSnapshot {
        NetworkSnapshot {
            sequence,
            availability,
            transport: NetworkTransportFlags::default(),
            is_expensive: false,
            is_constrained: false,
        }
    }

    #[test]
    fn lifecycle_barrier_is_scoped_to_events_that_change_connections() {
        let path = |sequence: u64, availability: NetworkAvailability| {
            NetworkEvent::NetworkPathChanged {
                snapshot: snapshot(sequence, availability),
            }
        };
        let lifecycle = |state: AppLifecycleState| NetworkEvent::AppLifecycleChanged { state };
        let foreground = |background_duration_ms: u64| AppLifecycleState::Foreground {
            background_duration_ms,
        };

        let cases = [
            (path(1, NetworkAvailability::Unavailable), true),
            (path(2, NetworkAvailability::Available), true),
            (path(3, NetworkAvailability::Unknown), false),
            (lifecycle(AppLifecycleState::Background), false),
            (
                lifecycle(foreground(LONG_BACKGROUND_RECONNECT_THRESHOLD_MS - 1)),
                false,
            ),
            (
                lifecycle(foreground(LONG_BACKGROUND_RECONNECT_THRESHOLD_MS)),
                true,
            ),
            (
                NetworkEvent::CleanupConnections {
                    reason: CleanupReason::ManualReset,
                },
                true,
            ),
            (
                NetworkEvent::ForceReconnect {
                    reason: ReconnectReason::ManualReconnect,
                },
                true,
            ),
        ];

        for (event, expected) in cases {
            let actual = network_event_needs_lifecycle_barrier(&event);
            assert_eq!(actual, expected, "{event:?}");
        }
    }

    #[tokio::test]
    async fn force_reconnect_failure_schedules_auto_reconnect() {
        let signaling = Arc::new(CountingSignalingClient::new(true));
        let processor = DefaultNetworkEventProcessor::new(signaling.clone(), None);

        let outcome = processor.force_reconnect().await;
        assert!(outcome.is_err());

        let count = |counter: &AtomicUsize| counter.load(AtomicOrdering::SeqCst);
        assert_eq!(
            count(&signaling.disconnects),
            1,
            "ForceReconnect cleanup should disconnect signaling once"
        );
        assert_eq!(
            count(&signaling.connect_once_attempts),
            1,
            "ForceReconnect restore should make one quick connect attempt"
        );
        assert_eq!(
            count(&signaling.auto_reconnect_requests),
            1,
            "failed ForceReconnect restore should wake the auto-reconnect manager"
        );
    }
}