Skip to main content

actr_hyper/lifecycle/
network_event.rs

1//! Network Event Handling Architecture
2//!
3//! This module defines the network event handling infrastructure.
4//!
5//! # Architecture Overview
6//!
7//! ```text
8//!        ┌─────────────────────────────────────────────┐
9//!        │ (FFI Path - Implemented)  (Actor Path - TODO)
10//!        ▼                                             ▼
11//! ┌──────────────────────────┐      ┌──────────────────────────┐
12//! │ NetworkEventHandle       │      │ Direct Proto Message     │
13//! │ • Platform FFI calls     │      │ • Actor call/tell        │
14//! │ • Send via channel       │      │ • Send to actor mailbox  │
15//! │ • Await result           │      │ • No handle needed       │
16//! └────────┬─────────────────┘      └──────┬───────────────────┘
17//!          │                               │
18//!          └───────────────┬───────────────┘
19//!                          │ Both trigger
20//!                          ▼
21//! ┌─────────────────────────────────────────────────────────┐
22//! │  ActrNode::network_event_loop()                         │
23//! │  • Receive event from channel (FFI path)                │
24//! │  • Or handle message directly (Actor path - TODO)       │
25//! │  • Delegate to NetworkEventProcessor                    │
26//! │  • Send result back via channel                         │
27//! └──────────────────────┬──────────────────────────────────┘
28//!                        │ Delegate
29//!                        ▼
30//! ┌─────────────────────────────────────────────────────────┐
31//! │  NetworkEventProcessor (Trait)                          │
32//! │                                                          │
33//! │  DefaultNetworkEventProcessor:                          │
34//! │  • reconcile settled network/app events                 │
35//! │  • execute one recovery action                          │
36//! │    └─► Offline / Probe / Restore / Cleanup / Reconnect  │
37//! └─────────────────────────────────────────────────────────┘
38//! ```
39//!
40//! # Key Components
41//!
42//! - **NetworkEvent**: Unified mobile network/app/command events
43//! - **NetworkEventResult**: Processing result with success/error/duration
44//! - **NetworkEventProcessor**: Trait for custom event handling logic
45//! - **DefaultNetworkEventProcessor**: Default implementation with signaling + WebRTC recovery
46//!
47//! # Usage Patterns
48//!
49//! ## 1. Platform FFI Call (Primary, Implemented)
50//! ```ignore
51//! // Platform layer calls NetworkEventHandle via FFI
52//! let network_handle = system.create_network_event_handle();
53//! let result = network_handle.handle_network_path_changed(snapshot).await?;
54//! if result.success {
55//!     println!("Processed in {}ms", result.duration_ms);
56//! }
57//! ```
58//!
59//! ## 2. Actor Proto Message (Optional, TODO)
60//! ```ignore
61//! // TODO: actors send proto message directly (not yet implemented)
62//! actor_ref.call(NetworkPathChangedMessage { snapshot }).await?;
63//! ```
64//!
65//! **Key Differences:**
66//! - FFI path: Uses NetworkEventHandle + channel (implemented)
67//! - Actor path: Direct proto message to mailbox (TODO, future enhancement)
68
69use std::sync::{
70    Arc,
71    atomic::{AtomicU64, Ordering},
72};
73use std::time::{Duration, Instant};
74
75use crate::transport::PeerTransport;
76use crate::wire::webrtc::{CleanupGuard, SignalingClient, WebRtcCoordinator};
77use tokio::sync::{mpsc, oneshot};
78use tokio_util::sync::CancellationToken;
79
80use super::connection_supervisor::{ConnectionFact, ConnectionSupervisor};
81
/// Time the reconciler keeps collecting follow-up events after the first one
/// of a batch before it settles the batch and selects one recovery action.
82const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the longest a caller waits for a `NetworkEventResult` — confirm at the use site.
83const NETWORK_EVENT_RESULT_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout handed to `SignalingClient::probe_alive` when checking whether an
/// existing signaling connection still answers.
84const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
/// Background duration (milliseconds) at or above which a return to the
/// foreground takes a lifecycle barrier.
85pub(super) const LONG_BACKGROUND_RECONNECT_THRESHOLD_MS: u64 = 30_000;
/// Counter that starts at 1.
/// NOTE(review): presumably hands out request ids; it is not used in the
/// visible part of this file — confirm at the use site.
86static NEXT_NETWORK_EVENT_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
87
88/// Keeps outbound sends behind a network-event lifecycle barrier while the
89/// reconciler settles and processes a queued batch.
///
/// The type has no behavior of its own: it only owns a `CleanupGuard`, so the
/// barrier lasts exactly as long as this value is kept alive.
90pub struct NetworkEventBarrier {
    /// Guard obtained from `WebRtcCoordinator::cleanup_guard()`; never read,
    /// held only for its lifetime.
91    _cleanup_guard: CleanupGuard,
92}
93
94/// Mobile network path snapshot.
95#[derive(Debug, Clone, PartialEq, Eq, Hash)]
96pub struct NetworkSnapshot {
    /// Sequence number supplied with the snapshot.
    /// NOTE(review): presumably increases with every platform update so
    /// snapshots can be ordered — confirm against the FFI caller.
97    pub sequence: u64,
    /// Whether a usable path exists; drives `is_offline()` and `should_restore()`.
98    pub availability: NetworkAvailability,
    /// Transports active on the path (several may be set at once).
99    pub transport: NetworkTransportFlags,
    /// NOTE(review): presumably the platform's "expensive path" hint; it is not
    /// read in the visible part of this file.
100    pub is_expensive: bool,
    /// NOTE(review): presumably the platform's "constrained path" hint; it is
    /// not read in the visible part of this file.
101    pub is_constrained: bool,
102}
103
104impl NetworkSnapshot {
105    pub fn is_offline(&self) -> bool {
106        matches!(self.availability, NetworkAvailability::Unavailable)
107    }
108
109    pub fn should_restore(&self) -> bool {
110        matches!(self.availability, NetworkAvailability::Available)
111    }
112}
113
114/// Whether the platform currently has a usable network path.
115#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
116pub enum NetworkAvailability {
    /// Availability was not determined; `NetworkSnapshot` treats this as
    /// neither offline nor restorable.
117    Unknown,
    /// A usable path exists; `NetworkSnapshot::should_restore()` returns `true`.
118    Available,
    /// No usable path; `NetworkSnapshot::is_offline()` returns `true`.
119    Unavailable,
120}
121
122/// Active network transport flags. Multiple flags can be true at the same time.
///
/// The derived `Default` leaves every flag `false`.
123#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
124pub struct NetworkTransportFlags {
    /// The path uses Wi-Fi.
125    pub wifi: bool,
    /// The path uses a cellular link.
126    pub cellular: bool,
    /// The path uses wired Ethernet.
127    pub ethernet: bool,
    /// The path runs through a VPN.
128    pub vpn: bool,
    /// The path uses a transport not covered by the other flags.
129    pub other: bool,
130}
131
132/// App lifecycle state relevant to connection recovery.
133#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
134pub enum AppLifecycleState {
    /// The app moved to the background; this never requests a lifecycle barrier.
135    Background,
    /// The app returned to the foreground. `background_duration_ms` is compared
    /// against `LONG_BACKGROUND_RECONNECT_THRESHOLD_MS` to decide whether a
    /// lifecycle barrier is needed.
136    Foreground { background_duration_ms: u64 },
137}
138
139/// Reason for a cleanup-only operation.
///
/// Carried inside `NetworkEvent::CleanupConnections`.
/// NOTE(review): nothing in the visible part of this file branches on the
/// individual reasons; how they affect action selection is decided elsewhere.
140#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
141pub enum CleanupReason {
142    AppTerminating,
143    UserLogout,
144    StaleConnectionSuspected,
145    ManualReset,
146}
147
148/// Reason for a forced cleanup + restore operation.
///
/// Carried inside `NetworkEvent::ForceReconnect`.
/// NOTE(review): nothing in the visible part of this file branches on the
/// individual reasons; how they affect action selection is decided elsewhere.
149#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
150pub enum ReconnectReason {
151    NetworkPathChanged,
152    LongBackground,
153    ProbeFailed,
154    ManualReconnect,
155    StaleConnectionSuspected,
156}
157
158/// Network event type
///
/// Events arrive wrapped in a `NetworkEventRequest`; a settled batch of them
/// is reduced to one `NetworkRecoveryAction`.
159#[derive(Debug, Clone, PartialEq, Eq, Hash)]
160pub enum NetworkEvent {
161    /// Full mobile network path changed.
162    NetworkPathChanged { snapshot: NetworkSnapshot },
163
164    /// App lifecycle changed.
165    AppLifecycleChanged { state: AppLifecycleState },
166
167    /// Proactively clean up all connections
168    ///
169    /// Used for app lifecycle management scenarios:
170    /// - App entering background
171    /// - User actively logging out
172    /// - App about to exit
173    CleanupConnections { reason: CleanupReason },
174
175    /// Proactively clean up and restore connections.
176    ForceReconnect { reason: ReconnectReason },
177}
178
179/// Final action selected from a settled batch of network events.
///
/// The per-variant notes describe the default mapping in
/// `NetworkEventProcessor::process_network_recovery_action`; a processor may
/// override that mapping.
180#[derive(Debug, Clone, Copy, PartialEq, Eq)]
181pub enum NetworkRecoveryAction {
    /// Nothing to do; reported as success.
182    Noop,
    /// Handled by `process_network_lost()`.
183    Offline,
    /// Handled by `probe_connectivity()`.
184    Probe,
    /// Handled by `process_network_available()`.
185    Restore,
    /// Handled by `cleanup_connections()`.
186    CleanupOnly,
    /// Handled by `force_reconnect()`.
187    ForceReconnect,
188}
189
190fn network_event_needs_lifecycle_barrier(event: &NetworkEvent) -> bool {
191    match event {
192        NetworkEvent::NetworkPathChanged { snapshot } => {
193            snapshot.is_offline() || snapshot.should_restore()
194        }
195        NetworkEvent::AppLifecycleChanged { state } => match state {
196            AppLifecycleState::Background => false,
197            AppLifecycleState::Foreground {
198                background_duration_ms,
199            } => *background_duration_ms >= LONG_BACKGROUND_RECONNECT_THRESHOLD_MS,
200        },
201        NetworkEvent::CleanupConnections { .. } | NetworkEvent::ForceReconnect { .. } => true,
202    }
203}
204
205/// Network event processing result
///
/// When events are processed as a batch, every event of the batch receives
/// the same outcome and the same duration.
206#[derive(Debug, Clone)]
207pub struct NetworkEventResult {
208    /// The event this result belongs to
209    pub event: NetworkEvent,
210
211    /// Whether processing succeeded
212    pub success: bool,
213
214    /// Error message (if failed)
215    pub error: Option<String>,
216
217    /// Processing duration (milliseconds)
218    pub duration_ms: u64,
219}
220
221impl NetworkEventResult {
222    pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
223        Self {
224            event,
225            success: true,
226            error: None,
227            duration_ms,
228        }
229    }
230
231    pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
232        Self {
233            event,
234            success: false,
235            error: Some(error),
236            duration_ms,
237        }
238    }
239}
240
241/// Network event processor trait
242///
243/// Defines the processing logic for network events; can be custom-implemented by users
///
/// Implementors must provide `process_network_available`,
/// `process_network_lost`, `process_network_type_changed` and
/// `cleanup_connections`; every other method has a default body.
244#[async_trait::async_trait]
245pub trait NetworkEventProcessor: Send + Sync {
246    /// Enter a lifecycle barrier as soon as a queued event is observed by the
247    /// reconciler. The default is no barrier for custom processors.
248    fn begin_network_event_barrier(&self, _event: &NetworkEvent) -> Option<NetworkEventBarrier> {
249        None
250    }
251
252    /// Process network available event
253    ///
254    /// # Returns
255    /// - `Ok(())`: processing succeeded
256    /// - `Err(String)`: processing failed, contains error message
257    async fn process_network_available(&self) -> Result<(), String>;
258
259    /// Process network lost event
260    ///
261    /// # Returns
262    /// - `Ok(())`: processing succeeded
263    /// - `Err(String)`: processing failed, contains error message
264    async fn process_network_lost(&self) -> Result<(), String>;
265
266    /// Process network type changed event
267    ///
    /// # Parameters
    /// - `is_wifi`: whether the new network type is Wi-Fi
    /// - `is_cellular`: whether the new network type is cellular
    ///
268    /// # Returns
269    /// - `Ok(())`: processing succeeded
270    /// - `Err(String)`: processing failed, contains error message
271    async fn process_network_type_changed(
272        &self,
273        is_wifi: bool,
274        is_cellular: bool,
275    ) -> Result<(), String>;
276
277    /// Proactively clean up all connections
278    ///
279    /// This method proactively cleans up all network connections. Applicable scenarios:
280    /// - App entering background (iOS/Android)
281    /// - User actively logging out
282    /// - App about to exit
283    /// - Need to reset network state
284    ///
285    /// # FFI Binding Note
286    ///
287    /// This method is specifically designed for FFI bindings, allowing upper-layer
288    /// platform code (Swift/Kotlin) to proactively manage connection lifecycle
289    /// through the unified `NetworkEventProcessor` interface.
290    ///
291    /// # Difference from Event Response
292    ///
293    /// - `process_network_lost()`: passively responds to network disconnection events
294    /// - `cleanup_connections()`: proactively cleans up connections (independent of network events)
295    ///
296    /// # Returns
297    /// - `Ok(())`: cleanup succeeded
298    /// - `Err(String)`: cleanup failed, contains error message
299    async fn cleanup_connections(&self) -> Result<(), String>;
300
301    /// Probe existing connectivity without forcing cleanup.
    ///
    /// The default implementation performs no probe and always returns `Ok(())`.
302    async fn probe_connectivity(&self) -> Result<(), String> {
303        Ok(())
304    }
305
306    /// Proactively clean up and restore connections.
    ///
    /// The default implementation runs `cleanup_connections()` and, only if
    /// that succeeds, `process_network_available()`.
307    async fn force_reconnect(&self) -> Result<(), String> {
308        self.cleanup_connections().await?;
309        self.process_network_available().await
310    }
311
312    /// Process the final action selected from a settled event batch.
313    ///
314    /// Custom processors can rely on the default mapping. The default runtime
315    /// processor overrides this to bypass per-event debounce after reconciliation.
    ///
    /// Default mapping: `Noop` succeeds without doing anything; every other
    /// action is forwarded to the trait method of the matching name.
316    async fn process_network_recovery_action(
317        &self,
318        action: NetworkRecoveryAction,
319    ) -> Result<(), String> {
320        match action {
321            NetworkRecoveryAction::Noop => Ok(()),
322            NetworkRecoveryAction::Offline => self.process_network_lost().await,
323            NetworkRecoveryAction::Probe => self.probe_connectivity().await,
324            NetworkRecoveryAction::Restore => self.process_network_available().await,
325            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
326            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
327        }
328    }
329}
330
331/// Debounce configuration
///
/// `DebounceConfig::default()` uses a two-second window.
332#[derive(Debug, Clone)]
333pub struct DebounceConfig {
334    /// Debounce time window (duplicate events within this window are ignored)
335    pub window: Duration,
336}
337
338impl Default for DebounceConfig {
339    fn default() -> Self {
340        Self {
341            // Default debounce window
342            window: Duration::from_secs(2),
343        }
344    }
345}
346
347/// Debounce state tracking
///
/// One timestamp per debounced event kind. A timestamp is only updated when
/// an event of that kind is accepted, not when it is debounced.
348#[derive(Debug)]
349struct DebounceState {
    /// When the last accepted "available" event was seen (`None` until the first).
350    last_available: tokio::sync::Mutex<Option<Instant>>,
    /// When the last accepted "lost" event was seen (`None` until the first).
351    last_lost: tokio::sync::Mutex<Option<Instant>>,
    /// When the last accepted "type changed" event was seen (`None` until the first).
352    last_type_changed: tokio::sync::Mutex<Option<Instant>>,
353}
354
355impl DebounceState {
356    fn new() -> Self {
357        Self {
358            last_available: tokio::sync::Mutex::new(None),
359            last_lost: tokio::sync::Mutex::new(None),
360            last_type_changed: tokio::sync::Mutex::new(None),
361        }
362    }
363}
364
/// Selects which per-kind debounce timestamp a debounce check consults.
365#[derive(Debug, Clone, Copy, PartialEq, Eq)]
366enum DebounceEvent {
    /// Network became available.
367    Available,
    /// Network was lost.
368    Lost,
    /// Network type changed.
369    TypeChanged,
370}
371
/// Shared bookkeeping for signaling connect/probe/rebuild attempts.
372#[derive(Debug)]
373struct SignalingRecoveryState {
    /// Held for the whole connect/probe/rebuild sequence so that only one such
    /// sequence runs at a time.
374    connect_lock: tokio::sync::Mutex<()>,
    /// Set to the current time after each successful `connect_once()`.
    /// NOTE(review): only written, never read, in the visible part of this file.
375    last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
376}
377
378impl SignalingRecoveryState {
379    fn new() -> Self {
380        Self {
381            connect_lock: tokio::sync::Mutex::new(()),
382            last_successful_connect: tokio::sync::Mutex::new(None),
383        }
384    }
385}
386
387/// Default network event processor implementation
388pub struct DefaultNetworkEventProcessor {
    /// Signaling connection that is probed, disconnected and reconnected.
389    signaling_client: Arc<dyn SignalingClient>,
    /// Optional WebRTC coordinator; without it no lifecycle barrier is taken
    /// and no peer-connection recovery is performed.
390    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
    /// Optional peer transport whose connections are closed during cleanup.
391    peer_transport: Option<Arc<PeerTransport>>,
    /// Window used by the per-event debounce checks.
392    debounce_config: DebounceConfig,
    /// Timestamps of the last accepted event per debounced kind.
393    debounce_state: Arc<DebounceState>,
    /// Lock and bookkeeping for signaling connect/probe/rebuild attempts.
394    recovery_state: Arc<SignalingRecoveryState>,
395}
396
397impl DefaultNetworkEventProcessor {
398    pub fn new(
399        signaling_client: Arc<dyn SignalingClient>,
400        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
401    ) -> Self {
402        Self::new_with_debounce_and_peer_transport(
403            signaling_client,
404            webrtc_coordinator,
405            DebounceConfig::default(),
406            None,
407        )
408    }
409
410    pub fn new_with_debounce(
411        signaling_client: Arc<dyn SignalingClient>,
412        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
413        debounce_config: DebounceConfig,
414    ) -> Self {
415        Self::new_with_debounce_and_peer_transport(
416            signaling_client,
417            webrtc_coordinator,
418            debounce_config,
419            None,
420        )
421    }
422
423    pub(crate) fn new_with_peer_transport(
424        signaling_client: Arc<dyn SignalingClient>,
425        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
426        peer_transport: Option<Arc<PeerTransport>>,
427    ) -> Self {
428        Self::new_with_debounce_and_peer_transport(
429            signaling_client,
430            webrtc_coordinator,
431            DebounceConfig::default(),
432            peer_transport,
433        )
434    }
435
436    pub(crate) fn new_with_debounce_and_peer_transport(
437        signaling_client: Arc<dyn SignalingClient>,
438        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
439        debounce_config: DebounceConfig,
440        peer_transport: Option<Arc<PeerTransport>>,
441    ) -> Self {
442        Self {
443            signaling_client,
444            webrtc_coordinator,
445            peer_transport,
446            debounce_config,
447            debounce_state: Arc::new(DebounceState::new()),
448            recovery_state: Arc::new(SignalingRecoveryState::new()),
449        }
450    }
451
452    fn lifecycle_barrier(&self) -> Option<NetworkEventBarrier> {
453        self.webrtc_coordinator
454            .as_ref()
455            .map(|coordinator| NetworkEventBarrier {
456                _cleanup_guard: coordinator.cleanup_guard(),
457            })
458    }
459
460    /// Check whether an event should be filtered by debounce
461    ///
462    /// # Returns
463    /// - `true`: the event should be processed
464    /// - `false`: the event is within the debounce window and should be ignored
465    async fn should_process_event(&self, event: DebounceEvent) -> bool {
466        let now = Instant::now();
467
468        match event {
469            DebounceEvent::Available => {
470                let mut last = self.debounce_state.last_available.lock().await;
471                if let Some(last_time) = *last {
472                    if now.duration_since(last_time) < self.debounce_config.window {
473                        tracing::debug!(
474                            "⏸️  Debouncing Network Available event (last event was {:?} ago)",
475                            now.duration_since(last_time)
476                        );
477                        return false;
478                    }
479                }
480                *last = Some(now);
481                true
482            }
483            DebounceEvent::Lost => {
484                let mut last = self.debounce_state.last_lost.lock().await;
485                if let Some(last_time) = *last {
486                    if now.duration_since(last_time) < self.debounce_config.window {
487                        tracing::debug!(
488                            "⏸️  Debouncing Network Lost event (last event was {:?} ago)",
489                            now.duration_since(last_time)
490                        );
491                        return false;
492                    }
493                }
494                *last = Some(now);
495                true
496            }
497            DebounceEvent::TypeChanged => {
498                let mut last = self.debounce_state.last_type_changed.lock().await;
499                if let Some(last_time) = *last {
500                    if now.duration_since(last_time) < self.debounce_config.window {
501                        tracing::debug!(
502                            "⏸️  Debouncing Network TypeChanged event (last event was {:?} ago)",
503                            now.duration_since(last_time)
504                        );
505                        return false;
506                    }
507                }
508                *last = Some(now);
509                true
510            }
511        }
512    }
513
514    async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
515        let _guard = self.recovery_state.connect_lock.lock().await;
516
517        if !self.signaling_client.is_connected() {
518            tracing::info!(reason = reason, "🔄 Connecting signaling");
519            self.signaling_client.connect_once().await.map_err(|e| {
520                let err_msg = format!("WebSocket connect failed: {}", e);
521                tracing::error!("❌ {}", err_msg);
522                err_msg
523            })?;
524
525            *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
526            tracing::info!(reason = reason, "✅ Signaling connected");
527            return Ok(());
528        }
529
530        tracing::debug!(
531            reason = reason,
532            timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
533            "🔎 Probing existing signaling WebSocket"
534        );
535
536        match self
537            .signaling_client
538            .probe_alive(SIGNALING_PROBE_TIMEOUT)
539            .await
540        {
541            Ok(()) => {
542                tracing::debug!(
543                    reason = reason,
544                    "✅ Signaling probe succeeded; keeping existing WebSocket"
545                );
546                Ok(())
547            }
548            Err(e) => {
549                tracing::warn!(
550                    reason = reason,
551                    "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
552                    e
553                );
554
555                if let Err(disconnect_err) = self.signaling_client.disconnect().await {
556                    tracing::warn!(
557                        reason = reason,
558                        "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
559                        disconnect_err
560                    );
561                }
562
563                tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
564                self.signaling_client
565                    .connect_once()
566                    .await
567                    .map_err(|connect_err| {
568                        let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
569                        tracing::error!("❌ {}", err_msg);
570                        err_msg
571                    })?;
572
573                *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
574                tracing::info!(reason = reason, "✅ Signaling rebuilt");
575                Ok(())
576            }
577        }
578    }
579
580    async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
581        let _cleanup_guard = self.lifecycle_barrier();
582        let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
583            coordinator.begin_network_recovery(reason).await
584        } else {
585            Vec::new()
586        };
587
588        self.ensure_signaling_healthy_once(reason).await?;
589
590        let coordinator = self.webrtc_coordinator.clone();
591
592        if let Some(coordinator) = coordinator {
593            if recovery_targets.is_empty() {
594                tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
595            } else {
596                tracing::info!("♻️ Triggering ICE restart for recovering connections...");
597            }
598            coordinator.restart_network_recovery_connections().await;
599        }
600
601        Ok(())
602    }
603
604    async fn probe_or_restore(&self, reason: &str) -> Result<(), String> {
605        match self.probe_connectivity().await {
606            Ok(()) => Ok(()),
607            Err(e) => {
608                tracing::warn!(
609                    reason = reason,
610                    "Connectivity probe failed; restoring connections: {}",
611                    e
612                );
613                self.restore_signaling_and_webrtc(reason).await
614            }
615        }
616    }
617
618    async fn process_offline(&self) -> Result<(), String> {
619        let _cleanup_guard = self.lifecycle_barrier();
620        tracing::info!("📱 Processing: Network offline");
621
622        if let Some(ref coordinator) = self.webrtc_coordinator {
623            coordinator.begin_network_recovery("NetworkLost").await;
624            tracing::info!("🧹 Clearing pending ICE restart attempts...");
625            coordinator.clear_pending_restarts().await;
626        }
627
628        tracing::info!("🔌 Disconnecting WebSocket...");
629        let _ = self.signaling_client.disconnect().await;
630
631        Ok(())
632    }
633}
634
635#[async_trait::async_trait]
636impl NetworkEventProcessor for DefaultNetworkEventProcessor {
637    fn begin_network_event_barrier(&self, event: &NetworkEvent) -> Option<NetworkEventBarrier> {
638        if network_event_needs_lifecycle_barrier(event) {
639            self.lifecycle_barrier()
640        } else {
641            None
642        }
643    }
644
645    /// Process network available event
646    async fn process_network_available(&self) -> Result<(), String> {
647        // Debounce check
648        let should_process = self.should_process_event(DebounceEvent::Available).await;
649        if !should_process && self.signaling_client.is_connected() {
650            return Ok(());
651        }
652
653        tracing::info!("📱 Processing: Network available");
654
655        self.restore_signaling_and_webrtc("NetworkAvailable").await
656    }
657
658    /// Process network lost event
659    async fn process_network_lost(&self) -> Result<(), String> {
660        // Debounce check
661        if !self.should_process_event(DebounceEvent::Lost).await {
662            return Ok(());
663        }
664
665        self.process_offline().await
666    }
667
668    /// Process network type changed event
669    async fn process_network_type_changed(
670        &self,
671        is_wifi: bool,
672        is_cellular: bool,
673    ) -> Result<(), String> {
674        // Debounce check
675        let should_process = self.should_process_event(DebounceEvent::TypeChanged).await;
676        if !should_process && self.signaling_client.is_connected() {
677            return Ok(());
678        }
679
680        tracing::info!(
681            "📱 Processing: Network type changed (WiFi={}, Cellular={})",
682            is_wifi,
683            is_cellular
684        );
685
686        self.restore_signaling_and_webrtc("NetworkTypeChanged")
687            .await
688    }
689
690    /// Proactively clean up all connections
691    ///
692    /// Differs from `process_network_lost()`:
693    /// - No debounce check (proactive calls always execute)
694    /// - Intended for app lifecycle management, not network event response
695    async fn cleanup_connections(&self) -> Result<(), String> {
696        let _cleanup_guard = self.lifecycle_barrier();
697
698        tracing::info!("🧹 Manually cleaning up all connections...");
699
700        // Step 1: Clear pending ICE restart attempts
701        if let Some(ref coordinator) = self.webrtc_coordinator {
702            tracing::info!("♻️  Clearing pending ICE restart attempts...");
703            coordinator.clear_pending_restarts().await;
704        }
705
706        // Step 2: Cancel PeerTransport singleflight and close established transports.
707        if let Some(ref peer_transport) = self.peer_transport {
708            tracing::info!("🔻 Closing all PeerTransport connections...");
709            if let Err(e) = peer_transport.close_all().await {
710                let err_msg = format!("Failed to close peer transports: {}", e);
711                tracing::warn!("⚠️  {}", err_msg);
712                // Do not fail the whole cleanup; continue releasing other resources.
713            } else {
714                tracing::info!("✅ All PeerTransport connections closed");
715            }
716        }
717
718        // Step 3: Close all WebRTC peer connections
719        if let Some(ref coordinator) = self.webrtc_coordinator {
720            tracing::info!("🔻 Closing all WebRTC peer connections...");
721            if let Err(e) = coordinator.close_all_peers().await {
722                let err_msg = format!("Failed to close all peers: {}", e);
723                tracing::warn!("⚠️  {}", err_msg);
724                // Do not fail the whole cleanup; continue releasing other resources.
725            } else {
726                tracing::info!("✅ All WebRTC peer connections closed");
727            }
728        }
729
730        // Step 4: Proactively disconnect the WebSocket.
731        tracing::info!("🔌 Disconnecting WebSocket...");
732        match self.signaling_client.disconnect().await {
733            Ok(_) => {
734                tracing::info!("✅ WebSocket disconnected successfully");
735            }
736            Err(e) => {
737                let err_msg = format!("Failed to disconnect WebSocket: {}", e);
738                tracing::warn!("⚠️  {}", err_msg);
739                // Do not fail the whole cleanup; continue releasing other resources.
740            }
741        }
742
743        tracing::info!("✅ Connection cleanup completed");
744
745        Ok(())
746    }
747
748    async fn probe_connectivity(&self) -> Result<(), String> {
749        self.signaling_client
750            .probe_alive(SIGNALING_PROBE_TIMEOUT)
751            .await
752            .map_err(|e| format!("Signaling probe failed: {}", e))
753    }
754
755    async fn force_reconnect(&self) -> Result<(), String> {
756        self.cleanup_connections().await?;
757        self.restore_signaling_and_webrtc("ForceReconnect").await
758    }
759
760    async fn process_network_recovery_action(
761        &self,
762        action: NetworkRecoveryAction,
763    ) -> Result<(), String> {
764        match action {
765            NetworkRecoveryAction::Noop => Ok(()),
766            NetworkRecoveryAction::Offline => self.process_offline().await,
767            NetworkRecoveryAction::Probe => self.probe_or_restore("Probe").await,
768            NetworkRecoveryAction::Restore => {
769                self.restore_signaling_and_webrtc("NetworkEventBatch").await
770            }
771            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
772            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
773        }
774    }
775}
776
/// Reduces a batch of events to the single recovery action to execute.
///
/// Thin wrapper: the decision itself is made by
/// `ConnectionSupervisor::select_action`.
777pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
778    ConnectionSupervisor::select_action(events)
779}
780
781pub async fn process_network_event_batch(
782    events: Vec<NetworkEvent>,
783    processor: Arc<dyn NetworkEventProcessor>,
784) -> Vec<NetworkEventResult> {
785    if events.is_empty() {
786        return Vec::new();
787    }
788
789    let action = select_network_recovery_action(&events);
790    let start = Instant::now();
791
792    tracing::info!(
793        event_count = events.len(),
794        action = ?action,
795        "network_event.action.start"
796    );
797
798    let result = processor.process_network_recovery_action(action).await;
799
800    let duration_ms = start.elapsed().as_millis() as u64;
801    match &result {
802        Ok(()) => tracing::info!(
803            event_count = events.len(),
804            action = ?action,
805            duration_ms,
806            "network_event.action.completed"
807        ),
808        Err(e) => tracing::warn!(
809            event_count = events.len(),
810            action = ?action,
811            duration_ms,
812            error = %e,
813            "network_event.action.completed"
814        ),
815    }
816
817    events
818        .into_iter()
819        .map(|event| match &result {
820            Ok(()) => NetworkEventResult::success(event, duration_ms),
821            Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
822        })
823        .collect()
824}
825
/// A network event queued for the reconciler, paired with the channel on
/// which the result for that event is reported.
826pub struct NetworkEventRequest {
    /// The event to process.
827    pub event: NetworkEvent,
    /// One-shot sender for this event's `NetworkEventResult`.
828    pub result_tx: oneshot::Sender<NetworkEventResult>,
829}
830
831pub async fn run_network_event_reconciler(
832    mut event_rx: mpsc::Receiver<NetworkEventRequest>,
833    processor: Arc<dyn NetworkEventProcessor>,
834    shutdown_token: CancellationToken,
835) {
836    tracing::info!("🔄 Network event reconciler started");
837
838    loop {
839        tokio::select! {
840            Some(first_request) = event_rx.recv() => {
841                tracing::debug!(
842                    event = ?first_request.event,
843                    "network_event.reconciler.received"
844                );
845                let mut event_barrier = processor.begin_network_event_barrier(&first_request.event);
846                let mut requests = vec![first_request];
847                let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
848                tokio::pin!(settle);
849
850                loop {
851                    tokio::select! {
852                        Some(next_request) = event_rx.recv() => {
853                            tracing::debug!(
854                                event = ?next_request.event,
855                                "network_event.reconciler.coalesced"
856                            );
857                            if event_barrier.is_none() {
858                                event_barrier = processor.begin_network_event_barrier(&next_request.event);
859                            }
860                            requests.push(next_request);
861                        }
862                        _ = &mut settle => {
863                            break;
864                        }
865                        _ = shutdown_token.cancelled() => {
866                            tracing::info!("🛑 Network event reconciler shutting down");
867                            return;
868                        }
869                        else => {
870                            break;
871                        }
872                    }
873                }
874
875                while let Ok(next_request) = event_rx.try_recv() {
876                    tracing::debug!(
877                        event = ?next_request.event,
878                        "network_event.reconciler.coalesced"
879                    );
880                    if event_barrier.is_none() {
881                        event_barrier = processor.begin_network_event_barrier(&next_request.event);
882                    }
883                    requests.push(next_request);
884                }
885
886                let events = requests
887                    .iter()
888                    .map(|request| request.event.clone())
889                    .collect::<Vec<_>>();
890                let action = select_network_recovery_action(&events);
891                let facts = events
892                    .iter()
893                    .map(ConnectionFact::from_network_event)
894                    .collect::<Vec<_>>();
895                tracing::info!(
896                    event_count = events.len(),
897                    action = ?action,
898                    events = ?events,
899                    facts = ?facts,
900                    settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
901                    "network_event.reconciler.batch_reconciled"
902                );
903
904                let results = process_network_event_batch(events, processor.clone()).await;
905                drop(event_barrier);
906
907                for (request, result) in requests.into_iter().zip(results) {
908                    if request.result_tx.send(result).is_err() {
909                        tracing::debug!("Network event caller dropped before receiving result");
910                    }
911                }
912            }
913            _ = shutdown_token.cancelled() => {
914                tracing::info!("🛑 Network event reconciler shutting down");
915                break;
916            }
917            else => break,
918        }
919    }
920}
921
922/// Network Event Handle
923///
924/// Lightweight handle for sending network events and receiving processing results.
925/// Created before `ActrNode::start()` to bridge platform network events.
926pub struct NetworkEventHandle {
927    /// Event sender (to ActrNode)
928    event_tx: mpsc::Sender<NetworkEventRequest>,
929    result_timeout: Duration,
930}
931
932impl NetworkEventHandle {
933    /// Create a new NetworkEventHandle
934    pub fn new(event_tx: mpsc::Sender<NetworkEventRequest>) -> Self {
935        Self::new_with_result_timeout(event_tx, NETWORK_EVENT_RESULT_TIMEOUT)
936    }
937
938    /// Create a new NetworkEventHandle with a custom result timeout.
939    ///
940    /// Production bindings use [`NetworkEventHandle::new`]. Tests can use this
941    /// constructor to verify bounded waiting without sleeping for the full
942    /// binding timeout.
943    pub fn new_with_result_timeout(
944        event_tx: mpsc::Sender<NetworkEventRequest>,
945        result_timeout: Duration,
946    ) -> Self {
947        Self {
948            event_tx,
949            result_timeout,
950        }
951    }
952
953    /// Handle full network path changes.
954    pub async fn handle_network_path_changed(
955        &self,
956        snapshot: NetworkSnapshot,
957    ) -> Result<NetworkEventResult, String> {
958        self.send_event_and_await_result(NetworkEvent::NetworkPathChanged { snapshot })
959            .await
960    }
961
962    /// Handle app lifecycle changes.
963    pub async fn handle_app_lifecycle_changed(
964        &self,
965        state: AppLifecycleState,
966    ) -> Result<NetworkEventResult, String> {
967        self.send_event_and_await_result(NetworkEvent::AppLifecycleChanged { state })
968            .await
969    }
970
971    /// Proactively clean up all connections with a reason. This never reconnects.
972    pub async fn cleanup_connections(
973        &self,
974        reason: CleanupReason,
975    ) -> Result<NetworkEventResult, String> {
976        self.send_event_and_await_result(NetworkEvent::CleanupConnections { reason })
977            .await
978    }
979
980    /// Force cleanup and reconnect.
981    pub async fn force_reconnect(
982        &self,
983        reason: ReconnectReason,
984    ) -> Result<NetworkEventResult, String> {
985        self.send_event_and_await_result(NetworkEvent::ForceReconnect { reason })
986            .await
987    }
988
989    /// Send event and await result (internal helper)
990    async fn send_event_and_await_result(
991        &self,
992        event: NetworkEvent,
993    ) -> Result<NetworkEventResult, String> {
994        let event_request_id = NEXT_NETWORK_EVENT_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
995        let start = Instant::now();
996        let (result_tx, result_rx) = oneshot::channel();
997        let request = NetworkEventRequest {
998            event: event.clone(),
999            result_tx,
1000        };
1001
1002        tracing::info!(
1003            event_request_id,
1004            event = ?event,
1005            result_timeout_ms = self.result_timeout.as_millis() as u64,
1006            "network_event.handle.enqueue"
1007        );
1008
1009        if let Err(e) = self.event_tx.send(request).await {
1010            let err = format!("Failed to send network event: {}", e);
1011            tracing::warn!(
1012                event_request_id,
1013                event = ?event,
1014                error = %err,
1015                "network_event.handle.enqueue_failed"
1016            );
1017            return Err(err);
1018        }
1019
1020        let result = match tokio::time::timeout(self.result_timeout, result_rx).await {
1021            Ok(Ok(result)) => Ok(result),
1022            Ok(Err(_)) => Err("Failed to receive network event result".to_string()),
1023            Err(_) => Err(format!(
1024                "Timed out waiting for network event result after {}ms",
1025                self.result_timeout.as_millis()
1026            )),
1027        };
1028
1029        let wait_ms = start.elapsed().as_millis() as u64;
1030        match &result {
1031            Ok(result) if result.success => tracing::info!(
1032                event_request_id,
1033                event = ?event,
1034                result_event = ?result.event,
1035                duration_ms = result.duration_ms,
1036                wait_ms,
1037                "network_event.handle.result_received"
1038            ),
1039            Ok(result) => tracing::warn!(
1040                event_request_id,
1041                event = ?event,
1042                result_event = ?result.event,
1043                duration_ms = result.duration_ms,
1044                wait_ms,
1045                error = ?result.error,
1046                "network_event.handle.result_received"
1047            ),
1048            Err(e) => tracing::warn!(
1049                event_request_id,
1050                event = ?event,
1051                wait_ms,
1052                error = %e,
1053                "network_event.handle.result_failed"
1054            ),
1055        }
1056
1057        result
1058    }
1059}
1060
1061impl Clone for NetworkEventHandle {
1062    fn clone(&self) -> Self {
1063        Self {
1064            event_tx: self.event_tx.clone(),
1065            result_timeout: self.result_timeout,
1066        }
1067    }
1068}
1069
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot with default transport flags that is neither
    /// expensive nor constrained.
    fn snapshot(sequence: u64, availability: NetworkAvailability) -> NetworkSnapshot {
        NetworkSnapshot {
            sequence,
            availability,
            transport: NetworkTransportFlags::default(),
            is_expensive: false,
            is_constrained: false,
        }
    }

    /// Wraps a snapshot built by [`snapshot`] in a `NetworkPathChanged` event.
    fn path_changed(sequence: u64, availability: NetworkAvailability) -> NetworkEvent {
        NetworkEvent::NetworkPathChanged {
            snapshot: snapshot(sequence, availability),
        }
    }

    /// Checks `network_event_needs_lifecycle_barrier` against the expected
    /// answer for each kind of event.
    #[test]
    fn lifecycle_barrier_is_scoped_to_events_that_change_connections() {
        let foreground_after = |background_duration_ms| NetworkEvent::AppLifecycleChanged {
            state: AppLifecycleState::Foreground {
                background_duration_ms,
            },
        };

        // (event, whether a lifecycle barrier is expected)
        let cases = vec![
            (path_changed(1, NetworkAvailability::Unavailable), true),
            (path_changed(2, NetworkAvailability::Available), true),
            (path_changed(3, NetworkAvailability::Unknown), false),
            (
                NetworkEvent::AppLifecycleChanged {
                    state: AppLifecycleState::Background,
                },
                false,
            ),
            // One below the threshold: no barrier.
            (
                foreground_after(LONG_BACKGROUND_RECONNECT_THRESHOLD_MS - 1),
                false,
            ),
            // Exactly at the threshold: barrier.
            (
                foreground_after(LONG_BACKGROUND_RECONNECT_THRESHOLD_MS),
                true,
            ),
            (
                NetworkEvent::CleanupConnections {
                    reason: CleanupReason::ManualReset,
                },
                true,
            ),
            (
                NetworkEvent::ForceReconnect {
                    reason: ReconnectReason::ManualReconnect,
                },
                true,
            ),
        ];

        cases.iter().for_each(|(event, expected)| {
            assert_eq!(
                network_event_needs_lifecycle_barrier(event),
                *expected,
                "{event:?}"
            );
        });
    }
}