Skip to main content

actr_hyper/lifecycle/
network_event.rs

1//! Network Event Handling Architecture
2//!
3//! This module defines the network event handling infrastructure.
4//!
5//! # Architecture Overview
6//!
7//! ```text
8//!        ┌─────────────────────────────────────────────┐
9//!        │ (FFI Path - Implemented)  (Actor Path - TODO)
10//!        ▼                                             ▼
11//! ┌──────────────────────────┐      ┌──────────────────────────┐
12//! │ NetworkEventHandle       │      │ Direct Proto Message     │
13//! │ • Platform FFI calls     │      │ • Actor call/tell        │
14//! │ • Send via channel       │      │ • Send to actor mailbox  │
15//! │ • Await result           │      │ • No handle needed       │
16//! └────────┬─────────────────┘      └──────┬───────────────────┘
17//!          │                               │
18//!          └───────────────┬───────────────┘
19//!                          │ Both trigger
20//!                          ▼
21//! ┌─────────────────────────────────────────────────────────┐
22//! │  ActrNode::network_event_loop()                         │
23//! │  • Receive event from channel (FFI path)                │
24//! │  • Or handle message directly (Actor path - TODO)       │
25//! │  • Delegate to NetworkEventProcessor                    │
26//! │  • Send result back via channel                         │
27//! └──────────────────────┬──────────────────────────────────┘
28//!                        │ Delegate
29//!                        ▼
30//! ┌─────────────────────────────────────────────────────────┐
31//! │  NetworkEventProcessor (Trait)                          │
32//! │                                                          │
33//! │  DefaultNetworkEventProcessor:                          │
34//! │  • reconcile settled network/app events                 │
35//! │  • execute one recovery action                          │
36//! │    └─► Offline / Probe / Restore / Cleanup / Reconnect  │
37//! └─────────────────────────────────────────────────────────┘
38//! ```
39//!
40//! # Key Components
41//!
42//! - **NetworkEvent**: Unified mobile network/app/command events
43//! - **NetworkEventResult**: Processing result with success/error/duration
44//! - **NetworkEventProcessor**: Trait for custom event handling logic
45//! - **DefaultNetworkEventProcessor**: Default implementation with signaling + WebRTC recovery
46//!
47//! # Usage Patterns
48//!
49//! ## 1. Platform FFI Call (Primary, Implemented)
50//! ```ignore
51//! // Platform layer calls NetworkEventHandle via FFI
52//! let network_handle = system.create_network_event_handle();
53//! let result = network_handle.handle_network_path_changed(snapshot).await?;
54//! if result.success {
55//!     println!("Processed in {}ms", result.duration_ms);
56//! }
57//! ```
58//!
59//! ## 2. Actor Proto Message (Optional, TODO)
60//! ```ignore
61//! // TODO: actors send proto message directly (not yet implemented)
62//! actor_ref.call(NetworkPathChangedMessage { snapshot }).await?;
63//! ```
64//!
65//! **Key Differences:**
66//! - FFI path: Uses NetworkEventHandle + channel (implemented)
67//! - Actor path: Direct proto message to mailbox (TODO, future enhancement)
68
69use std::sync::{
70    Arc,
71    atomic::{AtomicU64, Ordering},
72};
73use std::time::{Duration, Instant};
74
75use crate::transport::PeerTransport;
76use crate::wire::webrtc::{SignalingClient, WebRtcCoordinator};
77use tokio::sync::{mpsc, oneshot};
78use tokio_util::sync::CancellationToken;
79
80use super::connection_supervisor::{ConnectionFact, ConnectionSupervisor};
81
/// How long `run_network_event_reconciler` keeps collecting further requests
/// after the first one of a batch arrives, before reconciling the batch.
const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
/// Result timeout installed by `NetworkEventHandle::new`.
const NETWORK_EVENT_RESULT_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout passed to `SignalingClient::probe_alive` when checking an existing
/// signaling connection.
const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
/// Background-duration threshold in milliseconds (30 s), visible to the parent module.
// NOTE(review): not referenced in this part of the file; presumably compared
// against `AppLifecycleState::Foreground::background_duration_ms` — confirm.
pub(super) const LONG_BACKGROUND_RECONNECT_THRESHOLD_MS: u64 = 30_000;
/// Process-wide atomic counter, initialised to 1.
// NOTE(review): presumably hands out request ids; its use is not visible here — confirm.
static NEXT_NETWORK_EVENT_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
87
/// Mobile network path snapshot.
///
/// Carried by `NetworkEvent::NetworkPathChanged`. The helper methods on this
/// type look only at `availability`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NetworkSnapshot {
    /// Sequence number of this snapshot.
    // NOTE(review): presumably increases with each platform report — confirm with the producer.
    pub sequence: u64,
    /// Whether a usable network path exists; drives `is_offline` / `should_restore`.
    pub availability: NetworkAvailability,
    /// Transports that are active in this snapshot.
    pub transport: NetworkTransportFlags,
    /// Platform-reported "expensive" flag.
    // NOTE(review): presumably metered/cellular-style paths — confirm.
    pub is_expensive: bool,
    /// Platform-reported "constrained" flag.
    // NOTE(review): presumably a low-data mode — confirm.
    pub is_constrained: bool,
}
97
98impl NetworkSnapshot {
99    pub fn is_offline(&self) -> bool {
100        matches!(self.availability, NetworkAvailability::Unavailable)
101    }
102
103    pub fn should_restore(&self) -> bool {
104        matches!(self.availability, NetworkAvailability::Available)
105    }
106}
107
/// Whether the platform currently has a usable network path.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkAvailability {
    /// Neither `NetworkSnapshot::is_offline` nor `NetworkSnapshot::should_restore`
    /// returns `true` for this value.
    Unknown,
    /// Makes `NetworkSnapshot::should_restore` return `true`.
    Available,
    /// Makes `NetworkSnapshot::is_offline` return `true`.
    Unavailable,
}
115
/// Active network transport flags. Multiple flags can be true at the same time.
///
/// The derived `Default` yields a value with every flag set to `false`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct NetworkTransportFlags {
    /// A Wi-Fi transport is active.
    pub wifi: bool,
    /// A cellular transport is active.
    pub cellular: bool,
    /// A wired Ethernet transport is active.
    pub ethernet: bool,
    /// A VPN transport is active.
    pub vpn: bool,
    /// Some other transport is active.
    // NOTE(review): presumably anything not covered by the flags above — confirm.
    pub other: bool,
}
125
/// App lifecycle state relevant to connection recovery.
///
/// Carried by `NetworkEvent::AppLifecycleChanged`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AppLifecycleState {
    /// The app is in the background.
    Background,
    /// The app is in the foreground.
    // NOTE(review): `background_duration_ms` is presumably the time spent in the
    // background before returning, in milliseconds — confirm with the platform layer.
    Foreground { background_duration_ms: u64 },
}
132
/// Reason for a cleanup-only operation.
///
/// Attached to `NetworkEvent::CleanupConnections`. The processor methods in
/// this file take no reason argument, so the value is only available to code
/// that inspects the event itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CleanupReason {
    /// The app is terminating.
    AppTerminating,
    /// The user logged out.
    UserLogout,
    /// Existing connections are suspected to be stale.
    StaleConnectionSuspected,
    /// A reset was requested manually.
    ManualReset,
}
141
/// Reason for a forced cleanup + restore operation.
///
/// Attached to `NetworkEvent::ForceReconnect`. The processor methods in this
/// file take no reason argument, so the value is only available to code that
/// inspects the event itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReconnectReason {
    /// The network path changed.
    NetworkPathChanged,
    /// The app spent a long time in the background.
    // NOTE(review): presumably tied to `LONG_BACKGROUND_RECONNECT_THRESHOLD_MS` — confirm.
    LongBackground,
    /// A connectivity probe failed.
    ProbeFailed,
    /// A reconnect was requested manually.
    ManualReconnect,
    /// Existing connections are suspected to be stale.
    StaleConnectionSuspected,
}
151
/// Network event type.
///
/// Events are submitted wrapped in a `NetworkEventRequest`, coalesced by
/// `run_network_event_reconciler`, and reduced to a single
/// `NetworkRecoveryAction` per batch.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum NetworkEvent {
    /// Full mobile network path changed; `snapshot` describes the new path.
    NetworkPathChanged { snapshot: NetworkSnapshot },

    /// App lifecycle changed; `state` is the new lifecycle state.
    AppLifecycleChanged { state: AppLifecycleState },

    /// Proactively clean up all connections.
    ///
    /// Used for app lifecycle management scenarios:
    /// - App entering background
    /// - User actively logging out
    /// - App about to exit
    CleanupConnections { reason: CleanupReason },

    /// Proactively clean up and restore connections.
    ForceReconnect { reason: ReconnectReason },
}
172
/// Final action selected from a settled batch of network events.
///
/// The per-variant notes describe the default mapping in
/// `NetworkEventProcessor::process_network_recovery_action`; implementors may
/// override that method.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkRecoveryAction {
    /// Nothing to do; the default mapping returns `Ok(())` immediately.
    Noop,
    /// Default mapping calls `process_network_lost`.
    Offline,
    /// Default mapping calls `probe_connectivity`.
    Probe,
    /// Default mapping calls `process_network_available`.
    Restore,
    /// Default mapping calls `cleanup_connections`.
    CleanupOnly,
    /// Default mapping calls `force_reconnect`.
    ForceReconnect,
}
183
/// Network event processing result.
///
/// `process_network_event_batch` produces one of these per submitted event;
/// every result of a batch shares the same outcome and duration.
#[derive(Debug, Clone)]
pub struct NetworkEventResult {
    /// The event this result answers.
    pub event: NetworkEvent,

    /// Whether processing succeeded.
    pub success: bool,

    /// Error message; `Some` when built via `failure`, `None` when built via `success`.
    pub error: Option<String>,

    /// Processing duration (milliseconds).
    pub duration_ms: u64,
}
199
200impl NetworkEventResult {
201    pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
202        Self {
203            event,
204            success: true,
205            error: None,
206            duration_ms,
207        }
208    }
209
210    pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
211        Self {
212            event,
213            success: false,
214            error: Some(error),
215            duration_ms,
216        }
217    }
218}
219
220/// Network event processor trait
221///
222/// Defines the processing logic for network events; can be custom-implemented by users
223#[async_trait::async_trait]
224pub trait NetworkEventProcessor: Send + Sync {
225    /// Process network available event
226    ///
227    /// # Returns
228    /// - `Ok(())`: processing succeeded
229    /// - `Err(String)`: processing failed, contains error message
230    async fn process_network_available(&self) -> Result<(), String>;
231
232    /// Process network lost event
233    ///
234    /// # Returns
235    /// - `Ok(())`: processing succeeded
236    /// - `Err(String)`: processing failed, contains error message
237    async fn process_network_lost(&self) -> Result<(), String>;
238
239    /// Process network type changed event
240    ///
241    /// # Returns
242    /// - `Ok(())`: processing succeeded
243    /// - `Err(String)`: processing failed, contains error message
244    async fn process_network_type_changed(
245        &self,
246        is_wifi: bool,
247        is_cellular: bool,
248    ) -> Result<(), String>;
249
250    /// Proactively clean up all connections
251    ///
252    /// This method proactively cleans up all network connections. Applicable scenarios:
253    /// - App entering background (iOS/Android)
254    /// - User actively logging out
255    /// - App about to exit
256    /// - Need to reset network state
257    ///
258    /// # FFI Binding Note
259    ///
260    /// This method is specifically designed for FFI bindings, allowing upper-layer
261    /// platform code (Swift/Kotlin) to proactively manage connection lifecycle
262    /// through the unified `NetworkEventProcessor` interface.
263    ///
264    /// # Difference from Event Response
265    ///
266    /// - `process_network_lost()`: passively responds to network disconnection events
267    /// - `cleanup_connections()`: proactively cleans up connections (independent of network events)
268    ///
269    /// # Returns
270    /// - `Ok(())`: cleanup succeeded
271    /// - `Err(String)`: cleanup failed, contains error message
272    async fn cleanup_connections(&self) -> Result<(), String>;
273
274    /// Probe existing connectivity without forcing cleanup.
275    async fn probe_connectivity(&self) -> Result<(), String> {
276        Ok(())
277    }
278
279    /// Proactively clean up and restore connections.
280    async fn force_reconnect(&self) -> Result<(), String> {
281        self.cleanup_connections().await?;
282        self.process_network_available().await
283    }
284
285    /// Process the final action selected from a settled event batch.
286    ///
287    /// Custom processors can rely on the default mapping. The default runtime
288    /// processor overrides this to bypass per-event debounce after reconciliation.
289    async fn process_network_recovery_action(
290        &self,
291        action: NetworkRecoveryAction,
292    ) -> Result<(), String> {
293        match action {
294            NetworkRecoveryAction::Noop => Ok(()),
295            NetworkRecoveryAction::Offline => self.process_network_lost().await,
296            NetworkRecoveryAction::Probe => self.probe_connectivity().await,
297            NetworkRecoveryAction::Restore => self.process_network_available().await,
298            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
299            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
300        }
301    }
302}
303
/// Debounce configuration for `DefaultNetworkEventProcessor`.
///
/// The `Default` implementation uses a two-second window.
#[derive(Debug, Clone)]
pub struct DebounceConfig {
    /// Debounce time window (duplicate events within this window are ignored).
    pub window: Duration,
}
310
311impl Default for DebounceConfig {
312    fn default() -> Self {
313        Self {
314            // Default debounce window
315            window: Duration::from_secs(2),
316        }
317    }
318}
319
/// Debounce state tracking.
///
/// One timestamp per debounced event kind. Each holds the time of the last
/// event of that kind that was accepted (not debounced), or `None` if there
/// has been none yet.
#[derive(Debug)]
struct DebounceState {
    /// Last accepted `DebounceEvent::Available`.
    last_available: tokio::sync::Mutex<Option<Instant>>,
    /// Last accepted `DebounceEvent::Lost`.
    last_lost: tokio::sync::Mutex<Option<Instant>>,
    /// Last accepted `DebounceEvent::TypeChanged`.
    last_type_changed: tokio::sync::Mutex<Option<Instant>>,
}
327
328impl DebounceState {
329    fn new() -> Self {
330        Self {
331            last_available: tokio::sync::Mutex::new(None),
332            last_lost: tokio::sync::Mutex::new(None),
333            last_type_changed: tokio::sync::Mutex::new(None),
334        }
335    }
336}
337
/// Event kinds that `DefaultNetworkEventProcessor` debounces independently;
/// each variant has its own timestamp slot in `DebounceState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DebounceEvent {
    /// Checked by `process_network_available`.
    Available,
    /// Checked by `process_network_lost`.
    Lost,
    /// Checked by `process_network_type_changed`.
    TypeChanged,
}
344
/// Shared state used while (re)establishing the signaling connection.
#[derive(Debug)]
struct SignalingRecoveryState {
    /// Held for the whole of `ensure_signaling_healthy_once`, so only one
    /// connect/probe/rebuild sequence runs at a time.
    connect_lock: tokio::sync::Mutex<()>,
    /// Time of the most recent successful connect or rebuild.
    // NOTE(review): written in this file but never read in the visible part — confirm it is consumed elsewhere.
    last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
}
350
351impl SignalingRecoveryState {
352    fn new() -> Self {
353        Self {
354            connect_lock: tokio::sync::Mutex::new(()),
355            last_successful_connect: tokio::sync::Mutex::new(None),
356        }
357    }
358}
359
/// Default network event processor implementation.
///
/// Recovers the signaling connection and, when a coordinator is present,
/// the WebRTC connections in response to network events.
pub struct DefaultNetworkEventProcessor {
    /// Signaling connection that is connected, probed and disconnected during recovery.
    signaling_client: Arc<dyn SignalingClient>,
    /// Optional WebRTC coordinator; when `None`, the WebRTC recovery and cleanup steps are skipped.
    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
    /// Optional peer transport; when present, it is closed during `cleanup_connections`.
    peer_transport: Option<Arc<PeerTransport>>,
    /// Debounce window applied by the per-event `process_network_*` entry points.
    debounce_config: DebounceConfig,
    /// Per-event-kind timestamps backing the debounce check.
    debounce_state: Arc<DebounceState>,
    /// Lock and bookkeeping for signaling (re)connects.
    recovery_state: Arc<SignalingRecoveryState>,
}
369
370impl DefaultNetworkEventProcessor {
371    pub fn new(
372        signaling_client: Arc<dyn SignalingClient>,
373        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
374    ) -> Self {
375        Self::new_with_debounce_and_peer_transport(
376            signaling_client,
377            webrtc_coordinator,
378            DebounceConfig::default(),
379            None,
380        )
381    }
382
383    pub fn new_with_debounce(
384        signaling_client: Arc<dyn SignalingClient>,
385        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
386        debounce_config: DebounceConfig,
387    ) -> Self {
388        Self::new_with_debounce_and_peer_transport(
389            signaling_client,
390            webrtc_coordinator,
391            debounce_config,
392            None,
393        )
394    }
395
396    pub(crate) fn new_with_peer_transport(
397        signaling_client: Arc<dyn SignalingClient>,
398        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
399        peer_transport: Option<Arc<PeerTransport>>,
400    ) -> Self {
401        Self::new_with_debounce_and_peer_transport(
402            signaling_client,
403            webrtc_coordinator,
404            DebounceConfig::default(),
405            peer_transport,
406        )
407    }
408
409    pub(crate) fn new_with_debounce_and_peer_transport(
410        signaling_client: Arc<dyn SignalingClient>,
411        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
412        debounce_config: DebounceConfig,
413        peer_transport: Option<Arc<PeerTransport>>,
414    ) -> Self {
415        Self {
416            signaling_client,
417            webrtc_coordinator,
418            peer_transport,
419            debounce_config,
420            debounce_state: Arc::new(DebounceState::new()),
421            recovery_state: Arc::new(SignalingRecoveryState::new()),
422        }
423    }
424
425    /// Check whether an event should be filtered by debounce
426    ///
427    /// # Returns
428    /// - `true`: the event should be processed
429    /// - `false`: the event is within the debounce window and should be ignored
430    async fn should_process_event(&self, event: DebounceEvent) -> bool {
431        let now = Instant::now();
432
433        match event {
434            DebounceEvent::Available => {
435                let mut last = self.debounce_state.last_available.lock().await;
436                if let Some(last_time) = *last {
437                    if now.duration_since(last_time) < self.debounce_config.window {
438                        tracing::debug!(
439                            "⏸️  Debouncing Network Available event (last event was {:?} ago)",
440                            now.duration_since(last_time)
441                        );
442                        return false;
443                    }
444                }
445                *last = Some(now);
446                true
447            }
448            DebounceEvent::Lost => {
449                let mut last = self.debounce_state.last_lost.lock().await;
450                if let Some(last_time) = *last {
451                    if now.duration_since(last_time) < self.debounce_config.window {
452                        tracing::debug!(
453                            "⏸️  Debouncing Network Lost event (last event was {:?} ago)",
454                            now.duration_since(last_time)
455                        );
456                        return false;
457                    }
458                }
459                *last = Some(now);
460                true
461            }
462            DebounceEvent::TypeChanged => {
463                let mut last = self.debounce_state.last_type_changed.lock().await;
464                if let Some(last_time) = *last {
465                    if now.duration_since(last_time) < self.debounce_config.window {
466                        tracing::debug!(
467                            "⏸️  Debouncing Network TypeChanged event (last event was {:?} ago)",
468                            now.duration_since(last_time)
469                        );
470                        return false;
471                    }
472                }
473                *last = Some(now);
474                true
475            }
476        }
477    }
478
479    async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
480        let _guard = self.recovery_state.connect_lock.lock().await;
481
482        if !self.signaling_client.is_connected() {
483            tracing::info!(reason = reason, "🔄 Connecting signaling");
484            self.signaling_client.connect_once().await.map_err(|e| {
485                let err_msg = format!("WebSocket connect failed: {}", e);
486                tracing::error!("❌ {}", err_msg);
487                err_msg
488            })?;
489
490            *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
491            tracing::info!(reason = reason, "✅ Signaling connected");
492            return Ok(());
493        }
494
495        tracing::debug!(
496            reason = reason,
497            timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
498            "🔎 Probing existing signaling WebSocket"
499        );
500
501        match self
502            .signaling_client
503            .probe_alive(SIGNALING_PROBE_TIMEOUT)
504            .await
505        {
506            Ok(()) => {
507                tracing::debug!(
508                    reason = reason,
509                    "✅ Signaling probe succeeded; keeping existing WebSocket"
510                );
511                Ok(())
512            }
513            Err(e) => {
514                tracing::warn!(
515                    reason = reason,
516                    "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
517                    e
518                );
519
520                if let Err(disconnect_err) = self.signaling_client.disconnect().await {
521                    tracing::warn!(
522                        reason = reason,
523                        "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
524                        disconnect_err
525                    );
526                }
527
528                tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
529                self.signaling_client
530                    .connect_once()
531                    .await
532                    .map_err(|connect_err| {
533                        let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
534                        tracing::error!("❌ {}", err_msg);
535                        err_msg
536                    })?;
537
538                *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
539                tracing::info!(reason = reason, "✅ Signaling rebuilt");
540                Ok(())
541            }
542        }
543    }
544
545    async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
546        let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
547            coordinator.begin_network_recovery(reason).await
548        } else {
549            Vec::new()
550        };
551
552        self.ensure_signaling_healthy_once(reason).await?;
553
554        let coordinator = self.webrtc_coordinator.clone();
555
556        if let Some(coordinator) = coordinator {
557            if recovery_targets.is_empty() {
558                tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
559            } else {
560                tracing::info!("♻️ Triggering ICE restart for recovering connections...");
561            }
562            coordinator.restart_network_recovery_connections().await;
563        }
564
565        Ok(())
566    }
567
568    async fn probe_or_restore(&self, reason: &str) -> Result<(), String> {
569        match self.probe_connectivity().await {
570            Ok(()) => Ok(()),
571            Err(e) => {
572                tracing::warn!(
573                    reason = reason,
574                    "Connectivity probe failed; restoring connections: {}",
575                    e
576                );
577                self.restore_signaling_and_webrtc(reason).await
578            }
579        }
580    }
581
582    async fn process_offline(&self) -> Result<(), String> {
583        tracing::info!("📱 Processing: Network offline");
584
585        if let Some(ref coordinator) = self.webrtc_coordinator {
586            coordinator.begin_network_recovery("NetworkLost").await;
587            tracing::info!("🧹 Clearing pending ICE restart attempts...");
588            coordinator.clear_pending_restarts().await;
589        }
590
591        tracing::info!("🔌 Disconnecting WebSocket...");
592        let _ = self.signaling_client.disconnect().await;
593
594        Ok(())
595    }
596}
597
598#[async_trait::async_trait]
599impl NetworkEventProcessor for DefaultNetworkEventProcessor {
600    /// Process network available event
601    async fn process_network_available(&self) -> Result<(), String> {
602        // Debounce check
603        let should_process = self.should_process_event(DebounceEvent::Available).await;
604        if !should_process && self.signaling_client.is_connected() {
605            return Ok(());
606        }
607
608        tracing::info!("📱 Processing: Network available");
609
610        self.restore_signaling_and_webrtc("NetworkAvailable").await
611    }
612
613    /// Process network lost event
614    async fn process_network_lost(&self) -> Result<(), String> {
615        // Debounce check
616        if !self.should_process_event(DebounceEvent::Lost).await {
617            return Ok(());
618        }
619
620        self.process_offline().await
621    }
622
623    /// Process network type changed event
624    async fn process_network_type_changed(
625        &self,
626        is_wifi: bool,
627        is_cellular: bool,
628    ) -> Result<(), String> {
629        // Debounce check
630        let should_process = self.should_process_event(DebounceEvent::TypeChanged).await;
631        if !should_process && self.signaling_client.is_connected() {
632            return Ok(());
633        }
634
635        tracing::info!(
636            "📱 Processing: Network type changed (WiFi={}, Cellular={})",
637            is_wifi,
638            is_cellular
639        );
640
641        self.restore_signaling_and_webrtc("NetworkTypeChanged")
642            .await
643    }
644
645    /// Proactively clean up all connections
646    ///
647    /// Differs from `process_network_lost()`:
648    /// - No debounce check (proactive calls always execute)
649    /// - Intended for app lifecycle management, not network event response
650    async fn cleanup_connections(&self) -> Result<(), String> {
651        let _cleanup_guard = self
652            .webrtc_coordinator
653            .as_ref()
654            .map(|coordinator| coordinator.cleanup_guard());
655
656        tracing::info!("🧹 Manually cleaning up all connections...");
657
658        // Step 1: Clear pending ICE restart attempts
659        if let Some(ref coordinator) = self.webrtc_coordinator {
660            tracing::info!("♻️  Clearing pending ICE restart attempts...");
661            coordinator.clear_pending_restarts().await;
662        }
663
664        // Step 2: Cancel PeerTransport singleflight and close established transports.
665        if let Some(ref peer_transport) = self.peer_transport {
666            tracing::info!("🔻 Closing all PeerTransport connections...");
667            if let Err(e) = peer_transport.close_all().await {
668                let err_msg = format!("Failed to close peer transports: {}", e);
669                tracing::warn!("⚠️  {}", err_msg);
670                // Do not fail the whole cleanup; continue releasing other resources.
671            } else {
672                tracing::info!("✅ All PeerTransport connections closed");
673            }
674        }
675
676        // Step 3: Close all WebRTC peer connections
677        if let Some(ref coordinator) = self.webrtc_coordinator {
678            tracing::info!("🔻 Closing all WebRTC peer connections...");
679            if let Err(e) = coordinator.close_all_peers().await {
680                let err_msg = format!("Failed to close all peers: {}", e);
681                tracing::warn!("⚠️  {}", err_msg);
682                // Do not fail the whole cleanup; continue releasing other resources.
683            } else {
684                tracing::info!("✅ All WebRTC peer connections closed");
685            }
686        }
687
688        // Step 4: Proactively disconnect the WebSocket.
689        tracing::info!("🔌 Disconnecting WebSocket...");
690        match self.signaling_client.disconnect().await {
691            Ok(_) => {
692                tracing::info!("✅ WebSocket disconnected successfully");
693            }
694            Err(e) => {
695                let err_msg = format!("Failed to disconnect WebSocket: {}", e);
696                tracing::warn!("⚠️  {}", err_msg);
697                // Do not fail the whole cleanup; continue releasing other resources.
698            }
699        }
700
701        tracing::info!("✅ Connection cleanup completed");
702
703        Ok(())
704    }
705
706    async fn probe_connectivity(&self) -> Result<(), String> {
707        self.signaling_client
708            .probe_alive(SIGNALING_PROBE_TIMEOUT)
709            .await
710            .map_err(|e| format!("Signaling probe failed: {}", e))
711    }
712
713    async fn force_reconnect(&self) -> Result<(), String> {
714        self.cleanup_connections().await?;
715        self.restore_signaling_and_webrtc("ForceReconnect").await
716    }
717
718    async fn process_network_recovery_action(
719        &self,
720        action: NetworkRecoveryAction,
721    ) -> Result<(), String> {
722        match action {
723            NetworkRecoveryAction::Noop => Ok(()),
724            NetworkRecoveryAction::Offline => self.process_offline().await,
725            NetworkRecoveryAction::Probe => self.probe_or_restore("Probe").await,
726            NetworkRecoveryAction::Restore => {
727                self.restore_signaling_and_webrtc("NetworkEventBatch").await
728            }
729            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
730            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
731        }
732    }
733}
734
/// Reduces a batch of network events to the single recovery action to run.
///
/// Thin public wrapper that delegates the decision to
/// `ConnectionSupervisor::select_action`.
pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
    ConnectionSupervisor::select_action(events)
}
738
739pub async fn process_network_event_batch(
740    events: Vec<NetworkEvent>,
741    processor: Arc<dyn NetworkEventProcessor>,
742) -> Vec<NetworkEventResult> {
743    if events.is_empty() {
744        return Vec::new();
745    }
746
747    let action = select_network_recovery_action(&events);
748    let start = Instant::now();
749
750    tracing::info!(
751        event_count = events.len(),
752        action = ?action,
753        "network_event.action.start"
754    );
755
756    let result = processor.process_network_recovery_action(action).await;
757
758    let duration_ms = start.elapsed().as_millis() as u64;
759    match &result {
760        Ok(()) => tracing::info!(
761            event_count = events.len(),
762            action = ?action,
763            duration_ms,
764            "network_event.action.completed"
765        ),
766        Err(e) => tracing::warn!(
767            event_count = events.len(),
768            action = ?action,
769            duration_ms,
770            error = %e,
771            "network_event.action.completed"
772        ),
773    }
774
775    events
776        .into_iter()
777        .map(|event| match &result {
778            Ok(()) => NetworkEventResult::success(event, duration_ms),
779            Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
780        })
781        .collect()
782}
783
/// A network event together with the channel on which its result is returned.
///
/// Consumed by `run_network_event_reconciler`.
pub struct NetworkEventRequest {
    /// The event to process.
    pub event: NetworkEvent,
    /// One-shot sender on which the reconciler delivers this event's result.
    pub result_tx: oneshot::Sender<NetworkEventResult>,
}
788
789pub async fn run_network_event_reconciler(
790    mut event_rx: mpsc::Receiver<NetworkEventRequest>,
791    processor: Arc<dyn NetworkEventProcessor>,
792    shutdown_token: CancellationToken,
793) {
794    tracing::info!("🔄 Network event reconciler started");
795
796    loop {
797        tokio::select! {
798            Some(first_request) = event_rx.recv() => {
799                tracing::debug!(
800                    event = ?first_request.event,
801                    "network_event.reconciler.received"
802                );
803                let mut requests = vec![first_request];
804                let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
805                tokio::pin!(settle);
806
807                loop {
808                    tokio::select! {
809                        Some(next_request) = event_rx.recv() => {
810                            tracing::debug!(
811                                event = ?next_request.event,
812                                "network_event.reconciler.coalesced"
813                            );
814                            requests.push(next_request);
815                        }
816                        _ = &mut settle => {
817                            break;
818                        }
819                        _ = shutdown_token.cancelled() => {
820                            tracing::info!("🛑 Network event reconciler shutting down");
821                            return;
822                        }
823                        else => {
824                            break;
825                        }
826                    }
827                }
828
829                while let Ok(next_request) = event_rx.try_recv() {
830                    tracing::debug!(
831                        event = ?next_request.event,
832                        "network_event.reconciler.coalesced"
833                    );
834                    requests.push(next_request);
835                }
836
837                let events = requests
838                    .iter()
839                    .map(|request| request.event.clone())
840                    .collect::<Vec<_>>();
841                let action = select_network_recovery_action(&events);
842                let facts = events
843                    .iter()
844                    .map(ConnectionFact::from_network_event)
845                    .collect::<Vec<_>>();
846                tracing::info!(
847                    event_count = events.len(),
848                    action = ?action,
849                    events = ?events,
850                    facts = ?facts,
851                    settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
852                    "network_event.reconciler.batch_reconciled"
853                );
854
855                let results = process_network_event_batch(events, processor.clone()).await;
856
857                for (request, result) in requests.into_iter().zip(results) {
858                    if request.result_tx.send(result).is_err() {
859                        tracing::debug!("Network event caller dropped before receiving result");
860                    }
861                }
862            }
863            _ = shutdown_token.cancelled() => {
864                tracing::info!("🛑 Network event reconciler shutting down");
865                break;
866            }
867            else => break,
868        }
869    }
870}
871
/// Network Event Handle
///
/// Lightweight handle for sending network events and receiving processing results.
/// Created before `ActrNode::start()` to bridge platform network events.
///
/// Each event is sent over `event_tx` together with a oneshot reply sender;
/// the handle then waits on the matching receiver for the processing result.
pub struct NetworkEventHandle {
    /// Event sender (to ActrNode)
    event_tx: mpsc::Sender<NetworkEventRequest>,
    /// Upper bound on how long a caller waits for the processing result once
    /// the event has been enqueued. The enqueue step itself is not covered by
    /// this timeout.
    result_timeout: Duration,
}
881
882impl NetworkEventHandle {
883    /// Create a new NetworkEventHandle
884    pub fn new(event_tx: mpsc::Sender<NetworkEventRequest>) -> Self {
885        Self::new_with_result_timeout(event_tx, NETWORK_EVENT_RESULT_TIMEOUT)
886    }
887
888    /// Create a new NetworkEventHandle with a custom result timeout.
889    ///
890    /// Production bindings use [`NetworkEventHandle::new`]. Tests can use this
891    /// constructor to verify bounded waiting without sleeping for the full
892    /// binding timeout.
893    pub fn new_with_result_timeout(
894        event_tx: mpsc::Sender<NetworkEventRequest>,
895        result_timeout: Duration,
896    ) -> Self {
897        Self {
898            event_tx,
899            result_timeout,
900        }
901    }
902
903    /// Handle full network path changes.
904    pub async fn handle_network_path_changed(
905        &self,
906        snapshot: NetworkSnapshot,
907    ) -> Result<NetworkEventResult, String> {
908        self.send_event_and_await_result(NetworkEvent::NetworkPathChanged { snapshot })
909            .await
910    }
911
912    /// Handle app lifecycle changes.
913    pub async fn handle_app_lifecycle_changed(
914        &self,
915        state: AppLifecycleState,
916    ) -> Result<NetworkEventResult, String> {
917        self.send_event_and_await_result(NetworkEvent::AppLifecycleChanged { state })
918            .await
919    }
920
921    /// Proactively clean up all connections with a reason. This never reconnects.
922    pub async fn cleanup_connections(
923        &self,
924        reason: CleanupReason,
925    ) -> Result<NetworkEventResult, String> {
926        self.send_event_and_await_result(NetworkEvent::CleanupConnections { reason })
927            .await
928    }
929
930    /// Force cleanup and reconnect.
931    pub async fn force_reconnect(
932        &self,
933        reason: ReconnectReason,
934    ) -> Result<NetworkEventResult, String> {
935        self.send_event_and_await_result(NetworkEvent::ForceReconnect { reason })
936            .await
937    }
938
939    /// Send event and await result (internal helper)
940    async fn send_event_and_await_result(
941        &self,
942        event: NetworkEvent,
943    ) -> Result<NetworkEventResult, String> {
944        let event_request_id = NEXT_NETWORK_EVENT_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
945        let start = Instant::now();
946        let (result_tx, result_rx) = oneshot::channel();
947        let request = NetworkEventRequest {
948            event: event.clone(),
949            result_tx,
950        };
951
952        tracing::info!(
953            event_request_id,
954            event = ?event,
955            result_timeout_ms = self.result_timeout.as_millis() as u64,
956            "network_event.handle.enqueue"
957        );
958
959        if let Err(e) = self.event_tx.send(request).await {
960            let err = format!("Failed to send network event: {}", e);
961            tracing::warn!(
962                event_request_id,
963                event = ?event,
964                error = %err,
965                "network_event.handle.enqueue_failed"
966            );
967            return Err(err);
968        }
969
970        let result = match tokio::time::timeout(self.result_timeout, result_rx).await {
971            Ok(Ok(result)) => Ok(result),
972            Ok(Err(_)) => Err("Failed to receive network event result".to_string()),
973            Err(_) => Err(format!(
974                "Timed out waiting for network event result after {}ms",
975                self.result_timeout.as_millis()
976            )),
977        };
978
979        let wait_ms = start.elapsed().as_millis() as u64;
980        match &result {
981            Ok(result) if result.success => tracing::info!(
982                event_request_id,
983                event = ?event,
984                result_event = ?result.event,
985                duration_ms = result.duration_ms,
986                wait_ms,
987                "network_event.handle.result_received"
988            ),
989            Ok(result) => tracing::warn!(
990                event_request_id,
991                event = ?event,
992                result_event = ?result.event,
993                duration_ms = result.duration_ms,
994                wait_ms,
995                error = ?result.error,
996                "network_event.handle.result_received"
997            ),
998            Err(e) => tracing::warn!(
999                event_request_id,
1000                event = ?event,
1001                wait_ms,
1002                error = %e,
1003                "network_event.handle.result_failed"
1004            ),
1005        }
1006
1007        result
1008    }
1009}
1010
1011impl Clone for NetworkEventHandle {
1012    fn clone(&self) -> Self {
1013        Self {
1014            event_tx: self.event_tx.clone(),
1015            result_timeout: self.result_timeout,
1016        }
1017    }
1018}