1use std::sync::{
70 Arc,
71 atomic::{AtomicU64, Ordering},
72};
73use std::time::{Duration, Instant};
74
75use crate::transport::PeerTransport;
76use crate::wire::webrtc::{SignalingClient, WebRtcCoordinator};
77use tokio::sync::{mpsc, oneshot};
78use tokio_util::sync::CancellationToken;
79
80use super::connection_supervisor::{ConnectionFact, ConnectionSupervisor};
81
/// How long the reconciler keeps collecting further events after the first one
/// of a batch arrives, so that a burst of events is handled as a single batch.
const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
/// Default time a `NetworkEventHandle` waits for the reconciler to deliver a result.
const NETWORK_EVENT_RESULT_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout passed to `SignalingClient::probe_alive` when checking a signaling connection.
const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
/// Background-duration threshold in milliseconds (30 s).
///
/// NOTE(review): not referenced in this file; presumably compared against
/// `AppLifecycleState::Foreground::background_duration_ms` by the parent module — confirm.
pub(super) const LONG_BACKGROUND_RECONNECT_THRESHOLD_MS: u64 = 30_000;
/// Process-wide counter handing out the `event_request_id` values that tie together
/// the log lines of one `NetworkEventHandle` request. Starts at 1.
static NEXT_NETWORK_EVENT_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
87
/// A point-in-time description of the device's network path, carried by
/// `NetworkEvent::NetworkPathChanged`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NetworkSnapshot {
    /// Sequence number of this snapshot.
    /// NOTE(review): presumably increases with every report from the platform — confirm.
    pub sequence: u64,
    /// Whether a usable network is currently reported.
    pub availability: NetworkAvailability,
    /// Which transport kinds are flagged for the current path.
    pub transport: NetworkTransportFlags,
    /// NOTE(review): presumably mirrors the platform's "expensive" (metered) flag — confirm.
    pub is_expensive: bool,
    /// NOTE(review): presumably mirrors the platform's "constrained" (low-data) flag — confirm.
    pub is_constrained: bool,
}
97
98impl NetworkSnapshot {
99 pub fn is_offline(&self) -> bool {
100 matches!(self.availability, NetworkAvailability::Unavailable)
101 }
102
103 pub fn should_restore(&self) -> bool {
104 matches!(self.availability, NetworkAvailability::Available)
105 }
106}
107
/// Reported reachability of the network.
///
/// `NetworkSnapshot::is_offline` is true only for `Unavailable` and
/// `NetworkSnapshot::should_restore` only for `Available`; `Unknown` satisfies neither.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkAvailability {
    /// Reachability has not been determined.
    Unknown,
    /// A network is reported as usable.
    Available,
    /// No network is reported as usable.
    Unavailable,
}
115
/// Set of transport kinds flagged on a network path.
///
/// All flags are `false` in the `Default` value.
/// NOTE(review): presumably several flags can be set at once (e.g. VPN over Wi-Fi) — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct NetworkTransportFlags {
    /// The path is flagged as Wi-Fi.
    pub wifi: bool,
    /// The path is flagged as cellular.
    pub cellular: bool,
    /// The path is flagged as wired Ethernet.
    pub ethernet: bool,
    /// The path is flagged as VPN.
    pub vpn: bool,
    /// The path is flagged as some other transport kind.
    pub other: bool,
}
125
/// Application lifecycle state carried by `NetworkEvent::AppLifecycleChanged`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AppLifecycleState {
    /// The application is in the background.
    Background,
    /// The application is in the foreground.
    /// NOTE(review): `background_duration_ms` is presumably how long the app was
    /// backgrounded before returning, in milliseconds — confirm with the caller.
    Foreground { background_duration_ms: u64 },
}
132
/// Why a connection cleanup was requested; carried by `NetworkEvent::CleanupConnections`.
///
/// Nothing in this file branches on the individual variants; the reason travels with
/// the event into logs and into `ConnectionSupervisor`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CleanupReason {
    AppTerminating,
    UserLogout,
    StaleConnectionSuspected,
    ManualReset,
}
141
/// Why a forced reconnect was requested; carried by `NetworkEvent::ForceReconnect`.
///
/// Nothing in this file branches on the individual variants; the reason travels with
/// the event into logs and into `ConnectionSupervisor`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReconnectReason {
    NetworkPathChanged,
    LongBackground,
    ProbeFailed,
    ManualReconnect,
    StaleConnectionSuspected,
}
151
/// An input to the network event reconciler.
///
/// Events are submitted through `NetworkEventHandle`, coalesced into batches by
/// `run_network_event_reconciler`, and reduced to a single `NetworkRecoveryAction`
/// by `select_network_recovery_action`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum NetworkEvent {
    /// The network path changed; carries the snapshot describing the new path.
    NetworkPathChanged { snapshot: NetworkSnapshot },

    /// The application moved between background and foreground.
    AppLifecycleChanged { state: AppLifecycleState },

    /// A caller asked for connections to be cleaned up.
    CleanupConnections { reason: CleanupReason },

    /// A caller asked for connections to be torn down and re-established.
    ForceReconnect { reason: ReconnectReason },
}
172
/// The single action chosen for a batch of `NetworkEvent`s.
///
/// The per-variant notes describe the default dispatch in
/// `NetworkEventProcessor::process_network_recovery_action`; implementors may override it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkRecoveryAction {
    /// Do nothing and report success.
    Noop,
    /// Treat the network as lost (`process_network_lost`).
    Offline,
    /// Check connectivity (`probe_connectivity`).
    Probe,
    /// Treat the network as available (`process_network_available`).
    Restore,
    /// Tear connections down without reconnecting (`cleanup_connections`).
    CleanupOnly,
    /// Tear connections down and reconnect (`force_reconnect`).
    ForceReconnect,
}
183
/// Outcome reported back to the submitter of a `NetworkEvent`.
#[derive(Debug, Clone)]
pub struct NetworkEventResult {
    /// The event this result belongs to.
    pub event: NetworkEvent,

    /// `true` when the action chosen for the event's batch completed without error.
    pub success: bool,

    /// Error message of the failed action; `None` on success.
    pub error: Option<String>,

    /// How long the action took, in milliseconds. `process_network_event_batch`
    /// reports the same duration for every event of a batch.
    pub duration_ms: u64,
}
199
200impl NetworkEventResult {
201 pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
202 Self {
203 event,
204 success: true,
205 error: None,
206 duration_ms,
207 }
208 }
209
210 pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
211 Self {
212 event,
213 success: false,
214 error: Some(error),
215 duration_ms,
216 }
217 }
218}
219
220#[async_trait::async_trait]
224pub trait NetworkEventProcessor: Send + Sync {
225 async fn process_network_available(&self) -> Result<(), String>;
231
232 async fn process_network_lost(&self) -> Result<(), String>;
238
239 async fn process_network_type_changed(
245 &self,
246 is_wifi: bool,
247 is_cellular: bool,
248 ) -> Result<(), String>;
249
250 async fn cleanup_connections(&self) -> Result<(), String>;
273
274 async fn probe_connectivity(&self) -> Result<(), String> {
276 Ok(())
277 }
278
279 async fn force_reconnect(&self) -> Result<(), String> {
281 self.cleanup_connections().await?;
282 self.process_network_available().await
283 }
284
285 async fn process_network_recovery_action(
290 &self,
291 action: NetworkRecoveryAction,
292 ) -> Result<(), String> {
293 match action {
294 NetworkRecoveryAction::Noop => Ok(()),
295 NetworkRecoveryAction::Offline => self.process_network_lost().await,
296 NetworkRecoveryAction::Probe => self.probe_connectivity().await,
297 NetworkRecoveryAction::Restore => self.process_network_available().await,
298 NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
299 NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
300 }
301 }
302}
303
/// Debounce settings for `DefaultNetworkEventProcessor`.
#[derive(Debug, Clone)]
pub struct DebounceConfig {
    /// Minimum spacing between two accepted events of the same kind; an event that
    /// arrives sooner after the previously accepted one is debounced.
    pub window: Duration,
}
310
311impl Default for DebounceConfig {
312 fn default() -> Self {
313 Self {
314 window: Duration::from_secs(2),
316 }
317 }
318}
319
/// When the last event of each debounced kind was accepted.
///
/// Each slot is `None` until the first event of that kind is accepted.
#[derive(Debug)]
struct DebounceState {
    /// Last accepted `DebounceEvent::Available`.
    last_available: tokio::sync::Mutex<Option<Instant>>,
    /// Last accepted `DebounceEvent::Lost`.
    last_lost: tokio::sync::Mutex<Option<Instant>>,
    /// Last accepted `DebounceEvent::TypeChanged`.
    last_type_changed: tokio::sync::Mutex<Option<Instant>>,
}
327
328impl DebounceState {
329 fn new() -> Self {
330 Self {
331 last_available: tokio::sync::Mutex::new(None),
332 last_lost: tokio::sync::Mutex::new(None),
333 last_type_changed: tokio::sync::Mutex::new(None),
334 }
335 }
336}
337
/// Kind of event being debounced; selects which `DebounceState` slot is consulted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DebounceEvent {
    /// Network became available.
    Available,
    /// Network was lost.
    Lost,
    /// Network type changed.
    TypeChanged,
}
344
/// Shared bookkeeping for signaling recovery.
#[derive(Debug)]
struct SignalingRecoveryState {
    /// Held for the whole of `ensure_signaling_healthy_once`, so only one
    /// connect/probe/rebuild sequence runs at a time.
    connect_lock: tokio::sync::Mutex<()>,
    /// When `connect_once` last succeeded. Written by the recovery path;
    /// nothing in this file reads it.
    last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
}
350
351impl SignalingRecoveryState {
352 fn new() -> Self {
353 Self {
354 connect_lock: tokio::sync::Mutex::new(()),
355 last_successful_connect: tokio::sync::Mutex::new(None),
356 }
357 }
358}
359
/// `NetworkEventProcessor` implementation that drives a signaling client and,
/// when present, a WebRTC coordinator and a peer transport.
pub struct DefaultNetworkEventProcessor {
    /// Signaling connection that is probed, connected and disconnected.
    signaling_client: Arc<dyn SignalingClient>,
    /// Optional coordinator used for network recovery / ICE restarts and for closing peers.
    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
    /// Optional transport whose connections are closed during cleanup.
    peer_transport: Option<Arc<PeerTransport>>,
    /// Debounce window applied per event kind.
    debounce_config: DebounceConfig,
    /// Timestamps of the last accepted event of each kind.
    debounce_state: Arc<DebounceState>,
    /// Lock and bookkeeping used while (re)connecting signaling.
    recovery_state: Arc<SignalingRecoveryState>,
}
369
370impl DefaultNetworkEventProcessor {
371 pub fn new(
372 signaling_client: Arc<dyn SignalingClient>,
373 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
374 ) -> Self {
375 Self::new_with_debounce_and_peer_transport(
376 signaling_client,
377 webrtc_coordinator,
378 DebounceConfig::default(),
379 None,
380 )
381 }
382
383 pub fn new_with_debounce(
384 signaling_client: Arc<dyn SignalingClient>,
385 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
386 debounce_config: DebounceConfig,
387 ) -> Self {
388 Self::new_with_debounce_and_peer_transport(
389 signaling_client,
390 webrtc_coordinator,
391 debounce_config,
392 None,
393 )
394 }
395
396 pub(crate) fn new_with_peer_transport(
397 signaling_client: Arc<dyn SignalingClient>,
398 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
399 peer_transport: Option<Arc<PeerTransport>>,
400 ) -> Self {
401 Self::new_with_debounce_and_peer_transport(
402 signaling_client,
403 webrtc_coordinator,
404 DebounceConfig::default(),
405 peer_transport,
406 )
407 }
408
409 pub(crate) fn new_with_debounce_and_peer_transport(
410 signaling_client: Arc<dyn SignalingClient>,
411 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
412 debounce_config: DebounceConfig,
413 peer_transport: Option<Arc<PeerTransport>>,
414 ) -> Self {
415 Self {
416 signaling_client,
417 webrtc_coordinator,
418 peer_transport,
419 debounce_config,
420 debounce_state: Arc::new(DebounceState::new()),
421 recovery_state: Arc::new(SignalingRecoveryState::new()),
422 }
423 }
424
425 async fn should_process_event(&self, event: DebounceEvent) -> bool {
431 let now = Instant::now();
432
433 match event {
434 DebounceEvent::Available => {
435 let mut last = self.debounce_state.last_available.lock().await;
436 if let Some(last_time) = *last {
437 if now.duration_since(last_time) < self.debounce_config.window {
438 tracing::debug!(
439 "⏸️ Debouncing Network Available event (last event was {:?} ago)",
440 now.duration_since(last_time)
441 );
442 return false;
443 }
444 }
445 *last = Some(now);
446 true
447 }
448 DebounceEvent::Lost => {
449 let mut last = self.debounce_state.last_lost.lock().await;
450 if let Some(last_time) = *last {
451 if now.duration_since(last_time) < self.debounce_config.window {
452 tracing::debug!(
453 "⏸️ Debouncing Network Lost event (last event was {:?} ago)",
454 now.duration_since(last_time)
455 );
456 return false;
457 }
458 }
459 *last = Some(now);
460 true
461 }
462 DebounceEvent::TypeChanged => {
463 let mut last = self.debounce_state.last_type_changed.lock().await;
464 if let Some(last_time) = *last {
465 if now.duration_since(last_time) < self.debounce_config.window {
466 tracing::debug!(
467 "⏸️ Debouncing Network TypeChanged event (last event was {:?} ago)",
468 now.duration_since(last_time)
469 );
470 return false;
471 }
472 }
473 *last = Some(now);
474 true
475 }
476 }
477 }
478
479 async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
480 let _guard = self.recovery_state.connect_lock.lock().await;
481
482 if !self.signaling_client.is_connected() {
483 tracing::info!(reason = reason, "🔄 Connecting signaling");
484 self.signaling_client.connect_once().await.map_err(|e| {
485 let err_msg = format!("WebSocket connect failed: {}", e);
486 tracing::error!("❌ {}", err_msg);
487 err_msg
488 })?;
489
490 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
491 tracing::info!(reason = reason, "✅ Signaling connected");
492 return Ok(());
493 }
494
495 tracing::debug!(
496 reason = reason,
497 timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
498 "🔎 Probing existing signaling WebSocket"
499 );
500
501 match self
502 .signaling_client
503 .probe_alive(SIGNALING_PROBE_TIMEOUT)
504 .await
505 {
506 Ok(()) => {
507 tracing::debug!(
508 reason = reason,
509 "✅ Signaling probe succeeded; keeping existing WebSocket"
510 );
511 Ok(())
512 }
513 Err(e) => {
514 tracing::warn!(
515 reason = reason,
516 "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
517 e
518 );
519
520 if let Err(disconnect_err) = self.signaling_client.disconnect().await {
521 tracing::warn!(
522 reason = reason,
523 "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
524 disconnect_err
525 );
526 }
527
528 tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
529 self.signaling_client
530 .connect_once()
531 .await
532 .map_err(|connect_err| {
533 let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
534 tracing::error!("❌ {}", err_msg);
535 err_msg
536 })?;
537
538 *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
539 tracing::info!(reason = reason, "✅ Signaling rebuilt");
540 Ok(())
541 }
542 }
543 }
544
545 async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
546 let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
547 coordinator.begin_network_recovery(reason).await
548 } else {
549 Vec::new()
550 };
551
552 self.ensure_signaling_healthy_once(reason).await?;
553
554 let coordinator = self.webrtc_coordinator.clone();
555
556 if let Some(coordinator) = coordinator {
557 if recovery_targets.is_empty() {
558 tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
559 } else {
560 tracing::info!("♻️ Triggering ICE restart for recovering connections...");
561 }
562 coordinator.restart_network_recovery_connections().await;
563 }
564
565 Ok(())
566 }
567
568 async fn probe_or_restore(&self, reason: &str) -> Result<(), String> {
569 match self.probe_connectivity().await {
570 Ok(()) => Ok(()),
571 Err(e) => {
572 tracing::warn!(
573 reason = reason,
574 "Connectivity probe failed; restoring connections: {}",
575 e
576 );
577 self.restore_signaling_and_webrtc(reason).await
578 }
579 }
580 }
581
582 async fn process_offline(&self) -> Result<(), String> {
583 tracing::info!("📱 Processing: Network offline");
584
585 if let Some(ref coordinator) = self.webrtc_coordinator {
586 coordinator.begin_network_recovery("NetworkLost").await;
587 tracing::info!("🧹 Clearing pending ICE restart attempts...");
588 coordinator.clear_pending_restarts().await;
589 }
590
591 tracing::info!("🔌 Disconnecting WebSocket...");
592 let _ = self.signaling_client.disconnect().await;
593
594 Ok(())
595 }
596}
597
598#[async_trait::async_trait]
599impl NetworkEventProcessor for DefaultNetworkEventProcessor {
600 async fn process_network_available(&self) -> Result<(), String> {
602 let should_process = self.should_process_event(DebounceEvent::Available).await;
604 if !should_process && self.signaling_client.is_connected() {
605 return Ok(());
606 }
607
608 tracing::info!("📱 Processing: Network available");
609
610 self.restore_signaling_and_webrtc("NetworkAvailable").await
611 }
612
613 async fn process_network_lost(&self) -> Result<(), String> {
615 if !self.should_process_event(DebounceEvent::Lost).await {
617 return Ok(());
618 }
619
620 self.process_offline().await
621 }
622
623 async fn process_network_type_changed(
625 &self,
626 is_wifi: bool,
627 is_cellular: bool,
628 ) -> Result<(), String> {
629 let should_process = self.should_process_event(DebounceEvent::TypeChanged).await;
631 if !should_process && self.signaling_client.is_connected() {
632 return Ok(());
633 }
634
635 tracing::info!(
636 "📱 Processing: Network type changed (WiFi={}, Cellular={})",
637 is_wifi,
638 is_cellular
639 );
640
641 self.restore_signaling_and_webrtc("NetworkTypeChanged")
642 .await
643 }
644
645 async fn cleanup_connections(&self) -> Result<(), String> {
651 let _cleanup_guard = self
652 .webrtc_coordinator
653 .as_ref()
654 .map(|coordinator| coordinator.cleanup_guard());
655
656 tracing::info!("🧹 Manually cleaning up all connections...");
657
658 if let Some(ref coordinator) = self.webrtc_coordinator {
660 tracing::info!("♻️ Clearing pending ICE restart attempts...");
661 coordinator.clear_pending_restarts().await;
662 }
663
664 if let Some(ref peer_transport) = self.peer_transport {
666 tracing::info!("🔻 Closing all PeerTransport connections...");
667 if let Err(e) = peer_transport.close_all().await {
668 let err_msg = format!("Failed to close peer transports: {}", e);
669 tracing::warn!("⚠️ {}", err_msg);
670 } else {
672 tracing::info!("✅ All PeerTransport connections closed");
673 }
674 }
675
676 if let Some(ref coordinator) = self.webrtc_coordinator {
678 tracing::info!("🔻 Closing all WebRTC peer connections...");
679 if let Err(e) = coordinator.close_all_peers().await {
680 let err_msg = format!("Failed to close all peers: {}", e);
681 tracing::warn!("⚠️ {}", err_msg);
682 } else {
684 tracing::info!("✅ All WebRTC peer connections closed");
685 }
686 }
687
688 tracing::info!("🔌 Disconnecting WebSocket...");
690 match self.signaling_client.disconnect().await {
691 Ok(_) => {
692 tracing::info!("✅ WebSocket disconnected successfully");
693 }
694 Err(e) => {
695 let err_msg = format!("Failed to disconnect WebSocket: {}", e);
696 tracing::warn!("⚠️ {}", err_msg);
697 }
699 }
700
701 tracing::info!("✅ Connection cleanup completed");
702
703 Ok(())
704 }
705
706 async fn probe_connectivity(&self) -> Result<(), String> {
707 self.signaling_client
708 .probe_alive(SIGNALING_PROBE_TIMEOUT)
709 .await
710 .map_err(|e| format!("Signaling probe failed: {}", e))
711 }
712
713 async fn force_reconnect(&self) -> Result<(), String> {
714 self.cleanup_connections().await?;
715 self.restore_signaling_and_webrtc("ForceReconnect").await
716 }
717
718 async fn process_network_recovery_action(
719 &self,
720 action: NetworkRecoveryAction,
721 ) -> Result<(), String> {
722 match action {
723 NetworkRecoveryAction::Noop => Ok(()),
724 NetworkRecoveryAction::Offline => self.process_offline().await,
725 NetworkRecoveryAction::Probe => self.probe_or_restore("Probe").await,
726 NetworkRecoveryAction::Restore => {
727 self.restore_signaling_and_webrtc("NetworkEventBatch").await
728 }
729 NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
730 NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
731 }
732 }
733}
734
/// Reduces a batch of events to the single recovery action to perform.
///
/// Thin wrapper that delegates the decision to `ConnectionSupervisor::select_action`.
pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
    ConnectionSupervisor::select_action(events)
}
738
739pub async fn process_network_event_batch(
740 events: Vec<NetworkEvent>,
741 processor: Arc<dyn NetworkEventProcessor>,
742) -> Vec<NetworkEventResult> {
743 if events.is_empty() {
744 return Vec::new();
745 }
746
747 let action = select_network_recovery_action(&events);
748 let start = Instant::now();
749
750 tracing::info!(
751 event_count = events.len(),
752 action = ?action,
753 "network_event.action.start"
754 );
755
756 let result = processor.process_network_recovery_action(action).await;
757
758 let duration_ms = start.elapsed().as_millis() as u64;
759 match &result {
760 Ok(()) => tracing::info!(
761 event_count = events.len(),
762 action = ?action,
763 duration_ms,
764 "network_event.action.completed"
765 ),
766 Err(e) => tracing::warn!(
767 event_count = events.len(),
768 action = ?action,
769 duration_ms,
770 error = %e,
771 "network_event.action.completed"
772 ),
773 }
774
775 events
776 .into_iter()
777 .map(|event| match &result {
778 Ok(()) => NetworkEventResult::success(event, duration_ms),
779 Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
780 })
781 .collect()
782}
783
/// An event queued for the reconciler together with the channel on which the
/// reconciler sends back that event's result.
pub struct NetworkEventRequest {
    /// The event to reconcile.
    pub event: NetworkEvent,
    /// One-shot sender used by the reconciler to deliver the `NetworkEventResult`.
    pub result_tx: oneshot::Sender<NetworkEventResult>,
}
788
789pub async fn run_network_event_reconciler(
790 mut event_rx: mpsc::Receiver<NetworkEventRequest>,
791 processor: Arc<dyn NetworkEventProcessor>,
792 shutdown_token: CancellationToken,
793) {
794 tracing::info!("🔄 Network event reconciler started");
795
796 loop {
797 tokio::select! {
798 Some(first_request) = event_rx.recv() => {
799 tracing::debug!(
800 event = ?first_request.event,
801 "network_event.reconciler.received"
802 );
803 let mut requests = vec![first_request];
804 let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
805 tokio::pin!(settle);
806
807 loop {
808 tokio::select! {
809 Some(next_request) = event_rx.recv() => {
810 tracing::debug!(
811 event = ?next_request.event,
812 "network_event.reconciler.coalesced"
813 );
814 requests.push(next_request);
815 }
816 _ = &mut settle => {
817 break;
818 }
819 _ = shutdown_token.cancelled() => {
820 tracing::info!("🛑 Network event reconciler shutting down");
821 return;
822 }
823 else => {
824 break;
825 }
826 }
827 }
828
829 while let Ok(next_request) = event_rx.try_recv() {
830 tracing::debug!(
831 event = ?next_request.event,
832 "network_event.reconciler.coalesced"
833 );
834 requests.push(next_request);
835 }
836
837 let events = requests
838 .iter()
839 .map(|request| request.event.clone())
840 .collect::<Vec<_>>();
841 let action = select_network_recovery_action(&events);
842 let facts = events
843 .iter()
844 .map(ConnectionFact::from_network_event)
845 .collect::<Vec<_>>();
846 tracing::info!(
847 event_count = events.len(),
848 action = ?action,
849 events = ?events,
850 facts = ?facts,
851 settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
852 "network_event.reconciler.batch_reconciled"
853 );
854
855 let results = process_network_event_batch(events, processor.clone()).await;
856
857 for (request, result) in requests.into_iter().zip(results) {
858 if request.result_tx.send(result).is_err() {
859 tracing::debug!("Network event caller dropped before receiving result");
860 }
861 }
862 }
863 _ = shutdown_token.cancelled() => {
864 tracing::info!("🛑 Network event reconciler shutting down");
865 break;
866 }
867 else => break,
868 }
869 }
870}
871
/// Handle for submitting network events to the reconciler and awaiting their results.
pub struct NetworkEventHandle {
    /// Sending side of the reconciler's request channel.
    event_tx: mpsc::Sender<NetworkEventRequest>,
    /// Maximum time to wait for a result after the request has been enqueued.
    result_timeout: Duration,
}
881
882impl NetworkEventHandle {
883 pub fn new(event_tx: mpsc::Sender<NetworkEventRequest>) -> Self {
885 Self::new_with_result_timeout(event_tx, NETWORK_EVENT_RESULT_TIMEOUT)
886 }
887
888 pub fn new_with_result_timeout(
894 event_tx: mpsc::Sender<NetworkEventRequest>,
895 result_timeout: Duration,
896 ) -> Self {
897 Self {
898 event_tx,
899 result_timeout,
900 }
901 }
902
903 pub async fn handle_network_path_changed(
905 &self,
906 snapshot: NetworkSnapshot,
907 ) -> Result<NetworkEventResult, String> {
908 self.send_event_and_await_result(NetworkEvent::NetworkPathChanged { snapshot })
909 .await
910 }
911
912 pub async fn handle_app_lifecycle_changed(
914 &self,
915 state: AppLifecycleState,
916 ) -> Result<NetworkEventResult, String> {
917 self.send_event_and_await_result(NetworkEvent::AppLifecycleChanged { state })
918 .await
919 }
920
921 pub async fn cleanup_connections(
923 &self,
924 reason: CleanupReason,
925 ) -> Result<NetworkEventResult, String> {
926 self.send_event_and_await_result(NetworkEvent::CleanupConnections { reason })
927 .await
928 }
929
930 pub async fn force_reconnect(
932 &self,
933 reason: ReconnectReason,
934 ) -> Result<NetworkEventResult, String> {
935 self.send_event_and_await_result(NetworkEvent::ForceReconnect { reason })
936 .await
937 }
938
939 async fn send_event_and_await_result(
941 &self,
942 event: NetworkEvent,
943 ) -> Result<NetworkEventResult, String> {
944 let event_request_id = NEXT_NETWORK_EVENT_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
945 let start = Instant::now();
946 let (result_tx, result_rx) = oneshot::channel();
947 let request = NetworkEventRequest {
948 event: event.clone(),
949 result_tx,
950 };
951
952 tracing::info!(
953 event_request_id,
954 event = ?event,
955 result_timeout_ms = self.result_timeout.as_millis() as u64,
956 "network_event.handle.enqueue"
957 );
958
959 if let Err(e) = self.event_tx.send(request).await {
960 let err = format!("Failed to send network event: {}", e);
961 tracing::warn!(
962 event_request_id,
963 event = ?event,
964 error = %err,
965 "network_event.handle.enqueue_failed"
966 );
967 return Err(err);
968 }
969
970 let result = match tokio::time::timeout(self.result_timeout, result_rx).await {
971 Ok(Ok(result)) => Ok(result),
972 Ok(Err(_)) => Err("Failed to receive network event result".to_string()),
973 Err(_) => Err(format!(
974 "Timed out waiting for network event result after {}ms",
975 self.result_timeout.as_millis()
976 )),
977 };
978
979 let wait_ms = start.elapsed().as_millis() as u64;
980 match &result {
981 Ok(result) if result.success => tracing::info!(
982 event_request_id,
983 event = ?event,
984 result_event = ?result.event,
985 duration_ms = result.duration_ms,
986 wait_ms,
987 "network_event.handle.result_received"
988 ),
989 Ok(result) => tracing::warn!(
990 event_request_id,
991 event = ?event,
992 result_event = ?result.event,
993 duration_ms = result.duration_ms,
994 wait_ms,
995 error = ?result.error,
996 "network_event.handle.result_received"
997 ),
998 Err(e) => tracing::warn!(
999 event_request_id,
1000 event = ?event,
1001 wait_ms,
1002 error = %e,
1003 "network_event.handle.result_failed"
1004 ),
1005 }
1006
1007 result
1008 }
1009}
1010
1011impl Clone for NetworkEventHandle {
1012 fn clone(&self) -> Self {
1013 Self {
1014 event_tx: self.event_tx.clone(),
1015 result_timeout: self.result_timeout,
1016 }
1017 }
1018}