Skip to main content

actr_hyper/lifecycle/
network_event.rs

1//! Network Event Handling Architecture
2//!
3//! This module defines the network event handling infrastructure.
4//!
5//! # Architecture Overview
6//!
7//! ```text
8//!        ┌─────────────────────────────────────────────┐
9//!        │ (FFI Path - Implemented)  (Actor Path - TODO)
10//!        ▼                                             ▼
11//! ┌──────────────────────────┐      ┌──────────────────────────┐
12//! │ NetworkEventHandle       │      │ Direct Proto Message     │
13//! │ • Platform FFI calls     │      │ • Actor call/tell        │
14//! │ • Send via channel       │      │ • Send to actor mailbox  │
15//! │ • Await result           │      │ • No handle needed       │
16//! └────────┬─────────────────┘      └──────┬───────────────────┘
17//!          │                               │
18//!          └───────────────┬───────────────┘
19//!                          │ Both trigger
20//!                          ▼
21//! ┌─────────────────────────────────────────────────────────┐
22//! │  ActrNode::network_event_loop()                         │
23//! │  • Receive event from channel (FFI path)                │
24//! │  • Or handle message directly (Actor path - TODO)       │
25//! │  • Delegate to NetworkEventProcessor                    │
26//! │  • Send result back via channel                         │
27//! └──────────────────────┬──────────────────────────────────┘
28//!                        │ Delegate
29//!                        ▼
30//! ┌─────────────────────────────────────────────────────────┐
31//! │  NetworkEventProcessor (Trait)                          │
32//! │                                                          │
33//! │  DefaultNetworkEventProcessor:                          │
34//! │  • reconcile settled network/app events                 │
35//! │  • execute one recovery action                          │
36//! │    └─► Offline / Probe / Restore / Cleanup / Reconnect  │
37//! └─────────────────────────────────────────────────────────┘
38//! ```
39//!
40//! # Key Components
41//!
42//! - **NetworkEvent**: Unified mobile network/app/command events
43//! - **NetworkEventResult**: Processing result with success/error/duration
44//! - **NetworkEventProcessor**: Trait for custom event handling logic
45//! - **DefaultNetworkEventProcessor**: Default implementation with signaling + WebRTC recovery
46//!
47//! # Usage Patterns
48//!
49//! ## 1. Platform FFI Call (Primary, Implemented)
50//! ```ignore
51//! // Platform layer calls NetworkEventHandle via FFI
52//! let network_handle = system.create_network_event_handle();
53//! let result = network_handle.handle_network_path_changed(snapshot).await?;
54//! if result.success {
55//!     println!("Processed in {}ms", result.duration_ms);
56//! }
57//! ```
58//!
59//! ## 2. Actor Proto Message (Optional, TODO)
60//! ```ignore
61//! // TODO: actors send proto message directly (not yet implemented)
62//! actor_ref.call(NetworkPathChangedMessage { snapshot }).await?;
63//! ```
64//!
65//! **Key Differences:**
66//! - FFI path: Uses NetworkEventHandle + channel (implemented)
67//! - Actor path: Direct proto message to mailbox (TODO, future enhancement)
68
69use std::sync::{
70    Arc,
71    atomic::{AtomicU64, Ordering},
72};
73use std::time::{Duration, Instant};
74
75use crate::transport::PeerTransport;
76use crate::wire::webrtc::{CleanupGuard, SignalingClient, WebRtcCoordinator};
77use tokio::sync::{mpsc, oneshot};
78use tokio_util::sync::CancellationToken;
79
80use super::connection_supervisor::{ConnectionFact, ConnectionSupervisor};
81
82const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
83const NETWORK_EVENT_RESULT_TIMEOUT: Duration = Duration::from_secs(5);
84const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
85pub(super) const LONG_BACKGROUND_RECONNECT_THRESHOLD_MS: u64 = 30_000;
86static NEXT_NETWORK_EVENT_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
87
88/// Keeps outbound sends behind a network-event lifecycle barrier while the
89/// reconciler settles and processes a queued batch.
90pub struct NetworkEventBarrier {
91    _cleanup_guard: CleanupGuard,
92}
93
94/// Mobile network path snapshot.
95#[derive(Debug, Clone, PartialEq, Eq, Hash)]
96pub struct NetworkSnapshot {
97    pub sequence: u64,
98    pub availability: NetworkAvailability,
99    pub transport: NetworkTransportFlags,
100    pub is_expensive: bool,
101    pub is_constrained: bool,
102}
103
104impl NetworkSnapshot {
105    pub fn is_offline(&self) -> bool {
106        matches!(self.availability, NetworkAvailability::Unavailable)
107    }
108
109    pub fn should_restore(&self) -> bool {
110        matches!(self.availability, NetworkAvailability::Available)
111    }
112}
113
/// Whether the platform currently has a usable network path.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkAvailability {
    /// Availability has not been determined; treated as neither offline nor
    /// restorable by [`NetworkSnapshot`].
    Unknown,
    /// A usable network path exists.
    Available,
    /// No usable network path exists.
    Unavailable,
}
121
/// Active network transport flags. Multiple flags can be true at the same time.
///
/// The `Default` value has every flag cleared.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct NetworkTransportFlags {
    /// Wi-Fi transport is active.
    pub wifi: bool,
    /// Cellular transport is active.
    pub cellular: bool,
    /// Wired Ethernet transport is active.
    pub ethernet: bool,
    /// A VPN transport is active.
    pub vpn: bool,
    /// Some transport not covered by the other flags is active.
    pub other: bool,
}
131
/// App lifecycle state relevant to connection recovery.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AppLifecycleState {
    /// The app moved to the background.
    Background,
    /// The app returned to the foreground after spending
    /// `background_duration_ms` milliseconds in the background.
    Foreground { background_duration_ms: u64 },
}
138
/// Reason for a cleanup-only operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CleanupReason {
    /// The app is about to terminate.
    AppTerminating,
    /// The user logged out.
    UserLogout,
    /// Existing connections are suspected to be stale.
    StaleConnectionSuspected,
    /// A reset was requested explicitly.
    ManualReset,
}
147
/// Reason for a forced cleanup + restore operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReconnectReason {
    /// The network path changed.
    NetworkPathChanged,
    /// The app spent a long time in the background.
    LongBackground,
    /// A connectivity probe failed.
    ProbeFailed,
    /// A reconnect was requested explicitly.
    ManualReconnect,
    /// Existing connections are suspected to be stale.
    StaleConnectionSuspected,
}
157
158/// Network event type
159#[derive(Debug, Clone, PartialEq, Eq, Hash)]
160pub enum NetworkEvent {
161    /// Full mobile network path changed.
162    NetworkPathChanged { snapshot: NetworkSnapshot },
163
164    /// App lifecycle changed.
165    AppLifecycleChanged { state: AppLifecycleState },
166
167    /// Proactively clean up all connections
168    ///
169    /// Used for app lifecycle management scenarios:
170    /// - App entering background
171    /// - User actively logging out
172    /// - App about to exit
173    CleanupConnections { reason: CleanupReason },
174
175    /// Proactively clean up and restore connections.
176    ForceReconnect { reason: ReconnectReason },
177}
178
/// Final action selected from a settled batch of network events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkRecoveryAction {
    /// Nothing to do.
    Noop,
    /// Handle the network as lost.
    Offline,
    /// Probe existing connectivity.
    Probe,
    /// Restore connections.
    Restore,
    /// Clean up connections without restoring them.
    CleanupOnly,
    /// Clean up connections and then restore them.
    ForceReconnect,
}
189
190fn network_event_needs_lifecycle_barrier(event: &NetworkEvent) -> bool {
191    match event {
192        NetworkEvent::NetworkPathChanged { snapshot } => {
193            snapshot.is_offline() || snapshot.should_restore()
194        }
195        NetworkEvent::AppLifecycleChanged { state } => match state {
196            AppLifecycleState::Background => false,
197            AppLifecycleState::Foreground {
198                background_duration_ms,
199            } => *background_duration_ms >= LONG_BACKGROUND_RECONNECT_THRESHOLD_MS,
200        },
201        NetworkEvent::CleanupConnections { .. } | NetworkEvent::ForceReconnect { .. } => true,
202    }
203}
204
205/// Network event processing result
206#[derive(Debug, Clone)]
207pub struct NetworkEventResult {
208    /// Event type
209    pub event: NetworkEvent,
210
211    /// Whether processing succeeded
212    pub success: bool,
213
214    /// Error message (if failed)
215    pub error: Option<String>,
216
217    /// Processing duration (milliseconds)
218    pub duration_ms: u64,
219}
220
221impl NetworkEventResult {
222    pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
223        Self {
224            event,
225            success: true,
226            error: None,
227            duration_ms,
228        }
229    }
230
231    pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
232        Self {
233            event,
234            success: false,
235            error: Some(error),
236            duration_ms,
237        }
238    }
239}
240
241/// Network event processor trait
242///
243/// Defines the processing logic for network events; can be custom-implemented by users
244#[async_trait::async_trait]
245pub trait NetworkEventProcessor: Send + Sync {
246    /// Enter a lifecycle barrier as soon as a queued event is observed by the
247    /// reconciler. The default is no barrier for custom processors.
248    fn begin_network_event_barrier(&self, _event: &NetworkEvent) -> Option<NetworkEventBarrier> {
249        None
250    }
251
252    /// Process network available event
253    ///
254    /// # Returns
255    /// - `Ok(())`: processing succeeded
256    /// - `Err(String)`: processing failed, contains error message
257    async fn process_network_available(&self) -> Result<(), String>;
258
259    /// Process network lost event
260    ///
261    /// # Returns
262    /// - `Ok(())`: processing succeeded
263    /// - `Err(String)`: processing failed, contains error message
264    async fn process_network_lost(&self) -> Result<(), String>;
265
266    /// Process network type changed event
267    ///
268    /// # Returns
269    /// - `Ok(())`: processing succeeded
270    /// - `Err(String)`: processing failed, contains error message
271    async fn process_network_type_changed(
272        &self,
273        is_wifi: bool,
274        is_cellular: bool,
275    ) -> Result<(), String>;
276
277    /// Proactively clean up all connections
278    ///
279    /// This method proactively cleans up all network connections. Applicable scenarios:
280    /// - App entering background (iOS/Android)
281    /// - User actively logging out
282    /// - App about to exit
283    /// - Need to reset network state
284    ///
285    /// # FFI Binding Note
286    ///
287    /// This method is specifically designed for FFI bindings, allowing upper-layer
288    /// platform code (Swift/Kotlin) to proactively manage connection lifecycle
289    /// through the unified `NetworkEventProcessor` interface.
290    ///
291    /// # Difference from Event Response
292    ///
293    /// - `process_network_lost()`: passively responds to network disconnection events
294    /// - `cleanup_connections()`: proactively cleans up connections (independent of network events)
295    ///
296    /// # Returns
297    /// - `Ok(())`: cleanup succeeded
298    /// - `Err(String)`: cleanup failed, contains error message
299    async fn cleanup_connections(&self) -> Result<(), String>;
300
301    /// Probe existing connectivity without forcing cleanup.
302    async fn probe_connectivity(&self) -> Result<(), String> {
303        Ok(())
304    }
305
306    /// Proactively clean up and restore connections.
307    async fn force_reconnect(&self) -> Result<(), String> {
308        self.cleanup_connections().await?;
309        self.process_network_available().await
310    }
311
312    /// Process the final action selected from a settled event batch.
313    ///
314    /// Custom processors can rely on the default mapping. The default runtime
315    /// processor overrides this to bypass per-event debounce after reconciliation.
316    async fn process_network_recovery_action(
317        &self,
318        action: NetworkRecoveryAction,
319    ) -> Result<(), String> {
320        match action {
321            NetworkRecoveryAction::Noop => Ok(()),
322            NetworkRecoveryAction::Offline => self.process_network_lost().await,
323            NetworkRecoveryAction::Probe => self.probe_connectivity().await,
324            NetworkRecoveryAction::Restore => self.process_network_available().await,
325            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
326            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
327        }
328    }
329}
330
/// Debounce configuration
#[derive(Debug, Clone)]
pub struct DebounceConfig {
    /// Debounce time window (duplicate events within this window are ignored)
    pub window: Duration,
}

impl Default for DebounceConfig {
    /// Two-second debounce window.
    fn default() -> Self {
        let window = Duration::from_secs(2);
        Self { window }
    }
}
346
347/// Debounce state tracking
348#[derive(Debug)]
349struct DebounceState {
350    last_available: tokio::sync::Mutex<Option<Instant>>,
351    last_lost: tokio::sync::Mutex<Option<Instant>>,
352    last_type_changed: tokio::sync::Mutex<Option<Instant>>,
353}
354
355impl DebounceState {
356    fn new() -> Self {
357        Self {
358            last_available: tokio::sync::Mutex::new(None),
359            last_lost: tokio::sync::Mutex::new(None),
360            last_type_changed: tokio::sync::Mutex::new(None),
361        }
362    }
363}
364
/// Event kinds that are debounced independently of each other.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DebounceEvent {
    /// "Network available" events.
    Available,
    /// "Network lost" events.
    Lost,
    /// "Network type changed" events.
    TypeChanged,
}
371
372#[derive(Debug)]
373struct SignalingRecoveryState {
374    connect_lock: tokio::sync::Mutex<()>,
375    last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
376}
377
378impl SignalingRecoveryState {
379    fn new() -> Self {
380        Self {
381            connect_lock: tokio::sync::Mutex::new(()),
382            last_successful_connect: tokio::sync::Mutex::new(None),
383        }
384    }
385}
386
387/// Default network event processor implementation
388pub struct DefaultNetworkEventProcessor {
389    signaling_client: Arc<dyn SignalingClient>,
390    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
391    peer_transport: Option<Arc<PeerTransport>>,
392    debounce_config: DebounceConfig,
393    debounce_state: Arc<DebounceState>,
394    recovery_state: Arc<SignalingRecoveryState>,
395}
396
397impl DefaultNetworkEventProcessor {
398    pub fn new(
399        signaling_client: Arc<dyn SignalingClient>,
400        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
401    ) -> Self {
402        Self::new_with_debounce_and_peer_transport(
403            signaling_client,
404            webrtc_coordinator,
405            DebounceConfig::default(),
406            None,
407        )
408    }
409
410    pub fn new_with_debounce(
411        signaling_client: Arc<dyn SignalingClient>,
412        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
413        debounce_config: DebounceConfig,
414    ) -> Self {
415        Self::new_with_debounce_and_peer_transport(
416            signaling_client,
417            webrtc_coordinator,
418            debounce_config,
419            None,
420        )
421    }
422
423    pub(crate) fn new_with_peer_transport(
424        signaling_client: Arc<dyn SignalingClient>,
425        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
426        peer_transport: Option<Arc<PeerTransport>>,
427    ) -> Self {
428        Self::new_with_debounce_and_peer_transport(
429            signaling_client,
430            webrtc_coordinator,
431            DebounceConfig::default(),
432            peer_transport,
433        )
434    }
435
436    pub(crate) fn new_with_debounce_and_peer_transport(
437        signaling_client: Arc<dyn SignalingClient>,
438        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
439        debounce_config: DebounceConfig,
440        peer_transport: Option<Arc<PeerTransport>>,
441    ) -> Self {
442        Self {
443            signaling_client,
444            webrtc_coordinator,
445            peer_transport,
446            debounce_config,
447            debounce_state: Arc::new(DebounceState::new()),
448            recovery_state: Arc::new(SignalingRecoveryState::new()),
449        }
450    }
451
452    fn lifecycle_barrier(&self) -> Option<NetworkEventBarrier> {
453        self.webrtc_coordinator
454            .as_ref()
455            .map(|coordinator| NetworkEventBarrier {
456                _cleanup_guard: coordinator.cleanup_guard(),
457            })
458    }
459
460    /// Check whether an event should be filtered by debounce
461    ///
462    /// # Returns
463    /// - `true`: the event should be processed
464    /// - `false`: the event is within the debounce window and should be ignored
465    async fn should_process_event(&self, event: DebounceEvent) -> bool {
466        let now = Instant::now();
467
468        match event {
469            DebounceEvent::Available => {
470                let mut last = self.debounce_state.last_available.lock().await;
471                if let Some(last_time) = *last {
472                    if now.duration_since(last_time) < self.debounce_config.window {
473                        tracing::debug!(
474                            "⏸️  Debouncing Network Available event (last event was {:?} ago)",
475                            now.duration_since(last_time)
476                        );
477                        return false;
478                    }
479                }
480                *last = Some(now);
481                true
482            }
483            DebounceEvent::Lost => {
484                let mut last = self.debounce_state.last_lost.lock().await;
485                if let Some(last_time) = *last {
486                    if now.duration_since(last_time) < self.debounce_config.window {
487                        tracing::debug!(
488                            "⏸️  Debouncing Network Lost event (last event was {:?} ago)",
489                            now.duration_since(last_time)
490                        );
491                        return false;
492                    }
493                }
494                *last = Some(now);
495                true
496            }
497            DebounceEvent::TypeChanged => {
498                let mut last = self.debounce_state.last_type_changed.lock().await;
499                if let Some(last_time) = *last {
500                    if now.duration_since(last_time) < self.debounce_config.window {
501                        tracing::debug!(
502                            "⏸️  Debouncing Network TypeChanged event (last event was {:?} ago)",
503                            now.duration_since(last_time)
504                        );
505                        return false;
506                    }
507                }
508                *last = Some(now);
509                true
510            }
511        }
512    }
513
514    async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
515        let _guard = self.recovery_state.connect_lock.lock().await;
516
517        if !self.signaling_client.is_connected() {
518            tracing::info!(
519                reason = reason,
520                "Network recovery event resetting signaling reconnect backoff before connect"
521            );
522            self.signaling_client
523                .schedule_auto_reconnect_reset_backoff();
524            tracing::info!(reason = reason, "🔄 Connecting signaling");
525            self.signaling_client.connect_once().await.map_err(|e| {
526                let err_msg = format!("WebSocket connect failed: {}", e);
527                tracing::error!("❌ {}", err_msg);
528                err_msg
529            })?;
530
531            *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
532            tracing::info!(reason = reason, "✅ Signaling connected");
533            return Ok(());
534        }
535
536        tracing::debug!(
537            reason = reason,
538            timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
539            "🔎 Probing existing signaling WebSocket"
540        );
541
542        match self
543            .signaling_client
544            .probe_alive(SIGNALING_PROBE_TIMEOUT)
545            .await
546        {
547            Ok(()) => {
548                tracing::debug!(
549                    reason = reason,
550                    "✅ Signaling probe succeeded; keeping existing WebSocket"
551                );
552                Ok(())
553            }
554            Err(e) => {
555                tracing::warn!(
556                    reason = reason,
557                    "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
558                    e
559                );
560
561                if let Err(disconnect_err) = self.signaling_client.disconnect().await {
562                    tracing::warn!(
563                        reason = reason,
564                        "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
565                        disconnect_err
566                    );
567                }
568
569                tracing::info!(
570                    reason = reason,
571                    "Network recovery event resetting signaling reconnect backoff before rebuild"
572                );
573                self.signaling_client
574                    .schedule_auto_reconnect_reset_backoff();
575                tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
576                self.signaling_client
577                    .connect_once()
578                    .await
579                    .map_err(|connect_err| {
580                        let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
581                        tracing::error!("❌ {}", err_msg);
582                        err_msg
583                    })?;
584
585                *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
586                tracing::info!(reason = reason, "✅ Signaling rebuilt");
587                Ok(())
588            }
589        }
590    }
591
592    async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
593        let _cleanup_guard = self.lifecycle_barrier();
594        let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
595            coordinator.begin_network_recovery(reason).await
596        } else {
597            Vec::new()
598        };
599
600        self.ensure_signaling_healthy_once(reason).await?;
601
602        let coordinator = self.webrtc_coordinator.clone();
603
604        if let Some(coordinator) = coordinator {
605            if recovery_targets.is_empty() {
606                tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
607            } else {
608                tracing::info!("♻️ Triggering ICE restart for recovering connections...");
609            }
610            coordinator.restart_network_recovery_connections().await;
611        }
612
613        Ok(())
614    }
615
616    fn schedule_auto_reconnect_after_recovery_failure(&self, reason: &str, err: &str) {
617        tracing::warn!(
618            reason = reason,
619            error = %err,
620            "Network recovery failed; ensuring signaling auto-reconnect remains scheduled"
621        );
622        self.signaling_client.schedule_auto_reconnect();
623    }
624
625    async fn restore_signaling_and_webrtc_from_network_event(
626        &self,
627        reason: &str,
628    ) -> Result<(), String> {
629        let result = self.restore_signaling_and_webrtc(reason).await;
630        if let Err(err) = &result {
631            self.schedule_auto_reconnect_after_recovery_failure(reason, err);
632        }
633        result
634    }
635
636    async fn probe_or_restore(&self, reason: &str) -> Result<(), String> {
637        match self.probe_connectivity().await {
638            Ok(()) => Ok(()),
639            Err(e) => {
640                tracing::warn!(
641                    reason = reason,
642                    "Connectivity probe failed; restoring connections: {}",
643                    e
644                );
645                if let Err(disconnect_err) = self.signaling_client.disconnect().await {
646                    tracing::warn!(
647                        reason = reason,
648                        "Failed to disconnect unhealthy signaling before restore: {}",
649                        disconnect_err
650                    );
651                }
652                self.restore_signaling_and_webrtc_from_network_event(reason)
653                    .await
654            }
655        }
656    }
657
658    async fn process_offline(&self) -> Result<(), String> {
659        let _cleanup_guard = self.lifecycle_barrier();
660        tracing::info!("📱 Processing: Network offline");
661
662        if let Some(ref coordinator) = self.webrtc_coordinator {
663            coordinator.begin_network_recovery("NetworkLost").await;
664            tracing::info!("🧹 Clearing pending ICE restart attempts...");
665            coordinator.clear_pending_restarts().await;
666        }
667
668        tracing::info!("🔌 Disconnecting WebSocket...");
669        let _ = self.signaling_client.disconnect().await;
670
671        Ok(())
672    }
673}
674
675#[async_trait::async_trait]
676impl NetworkEventProcessor for DefaultNetworkEventProcessor {
677    fn begin_network_event_barrier(&self, event: &NetworkEvent) -> Option<NetworkEventBarrier> {
678        if network_event_needs_lifecycle_barrier(event) {
679            self.lifecycle_barrier()
680        } else {
681            None
682        }
683    }
684
685    /// Process network available event
686    async fn process_network_available(&self) -> Result<(), String> {
687        // Debounce check
688        let should_process = self.should_process_event(DebounceEvent::Available).await;
689        if !should_process && self.signaling_client.is_connected() {
690            return Ok(());
691        }
692
693        tracing::info!("📱 Processing: Network available");
694
695        self.restore_signaling_and_webrtc_from_network_event("NetworkAvailable")
696            .await
697    }
698
699    /// Process network lost event
700    async fn process_network_lost(&self) -> Result<(), String> {
701        // Debounce check
702        if !self.should_process_event(DebounceEvent::Lost).await {
703            return Ok(());
704        }
705
706        self.process_offline().await
707    }
708
709    /// Process network type changed event
710    async fn process_network_type_changed(
711        &self,
712        is_wifi: bool,
713        is_cellular: bool,
714    ) -> Result<(), String> {
715        // Debounce check
716        let should_process = self.should_process_event(DebounceEvent::TypeChanged).await;
717        if !should_process && self.signaling_client.is_connected() {
718            return Ok(());
719        }
720
721        tracing::info!(
722            "📱 Processing: Network type changed (WiFi={}, Cellular={})",
723            is_wifi,
724            is_cellular
725        );
726
727        self.restore_signaling_and_webrtc_from_network_event("NetworkTypeChanged")
728            .await
729    }
730
731    /// Proactively clean up all connections
732    ///
733    /// Differs from `process_network_lost()`:
734    /// - No debounce check (proactive calls always execute)
735    /// - Intended for app lifecycle management, not network event response
736    async fn cleanup_connections(&self) -> Result<(), String> {
737        let _cleanup_guard = self.lifecycle_barrier();
738
739        tracing::info!("🧹 Manually cleaning up all connections...");
740
741        // Step 1: Clear pending ICE restart attempts
742        if let Some(ref coordinator) = self.webrtc_coordinator {
743            tracing::info!("♻️  Clearing pending ICE restart attempts...");
744            coordinator.clear_pending_restarts().await;
745        }
746
747        // Step 2: Cancel PeerTransport singleflight and close established transports.
748        if let Some(ref peer_transport) = self.peer_transport {
749            tracing::info!("🔻 Closing all PeerTransport connections...");
750            if let Err(e) = peer_transport.close_all().await {
751                let err_msg = format!("Failed to close peer transports: {}", e);
752                tracing::warn!("⚠️  {}", err_msg);
753                // Do not fail the whole cleanup; continue releasing other resources.
754            } else {
755                tracing::info!("✅ All PeerTransport connections closed");
756            }
757        }
758
759        // Step 3: Close all WebRTC peer connections
760        if let Some(ref coordinator) = self.webrtc_coordinator {
761            tracing::info!("🔻 Closing all WebRTC peer connections...");
762            if let Err(e) = coordinator.close_all_peers().await {
763                let err_msg = format!("Failed to close all peers: {}", e);
764                tracing::warn!("⚠️  {}", err_msg);
765                // Do not fail the whole cleanup; continue releasing other resources.
766            } else {
767                tracing::info!("✅ All WebRTC peer connections closed");
768            }
769        }
770
771        // Step 4: Proactively disconnect the WebSocket.
772        tracing::info!("🔌 Disconnecting WebSocket...");
773        match self.signaling_client.disconnect().await {
774            Ok(_) => {
775                tracing::info!("✅ WebSocket disconnected successfully");
776            }
777            Err(e) => {
778                let err_msg = format!("Failed to disconnect WebSocket: {}", e);
779                tracing::warn!("⚠️  {}", err_msg);
780                // Do not fail the whole cleanup; continue releasing other resources.
781            }
782        }
783
784        tracing::info!("✅ Connection cleanup completed");
785
786        Ok(())
787    }
788
789    async fn probe_connectivity(&self) -> Result<(), String> {
790        self.signaling_client
791            .probe_alive(SIGNALING_PROBE_TIMEOUT)
792            .await
793            .map_err(|e| format!("Signaling probe failed: {}", e))
794    }
795
796    async fn force_reconnect(&self) -> Result<(), String> {
797        self.cleanup_connections().await?;
798        self.restore_signaling_and_webrtc_from_network_event("ForceReconnect")
799            .await
800    }
801
802    async fn process_network_recovery_action(
803        &self,
804        action: NetworkRecoveryAction,
805    ) -> Result<(), String> {
806        match action {
807            NetworkRecoveryAction::Noop => Ok(()),
808            NetworkRecoveryAction::Offline => self.process_offline().await,
809            NetworkRecoveryAction::Probe => self.probe_or_restore("Probe").await,
810            NetworkRecoveryAction::Restore => {
811                self.restore_signaling_and_webrtc_from_network_event("NetworkEventBatch")
812                    .await
813            }
814            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
815            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
816        }
817    }
818}
819
820pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
821    ConnectionSupervisor::select_action(events)
822}
823
824pub async fn process_network_event_batch(
825    events: Vec<NetworkEvent>,
826    processor: Arc<dyn NetworkEventProcessor>,
827) -> Vec<NetworkEventResult> {
828    if events.is_empty() {
829        return Vec::new();
830    }
831
832    let action = select_network_recovery_action(&events);
833    let start = Instant::now();
834
835    tracing::info!(
836        event_count = events.len(),
837        action = ?action,
838        "network_event.action.start"
839    );
840
841    let result = processor.process_network_recovery_action(action).await;
842
843    let duration_ms = start.elapsed().as_millis() as u64;
844    match &result {
845        Ok(()) => tracing::info!(
846            event_count = events.len(),
847            action = ?action,
848            duration_ms,
849            "network_event.action.completed"
850        ),
851        Err(e) => tracing::warn!(
852            event_count = events.len(),
853            action = ?action,
854            duration_ms,
855            error = %e,
856            "network_event.action.completed"
857        ),
858    }
859
860    events
861        .into_iter()
862        .map(|event| match &result {
863            Ok(()) => NetworkEventResult::success(event, duration_ms),
864            Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
865        })
866        .collect()
867}
868
/// A network event queued for the reconciler, paired with the one-shot channel
/// on which the processing result is sent back to the caller.
pub struct NetworkEventRequest {
    /// The event to reconcile.
    pub event: NetworkEvent,
    /// One-shot sender used to deliver this event's result.
    pub result_tx: oneshot::Sender<NetworkEventResult>,
}
873
874pub async fn run_network_event_reconciler(
875    mut event_rx: mpsc::Receiver<NetworkEventRequest>,
876    processor: Arc<dyn NetworkEventProcessor>,
877    shutdown_token: CancellationToken,
878) {
879    tracing::info!("🔄 Network event reconciler started");
880
881    loop {
882        tokio::select! {
883            Some(first_request) = event_rx.recv() => {
884                tracing::debug!(
885                    event = ?first_request.event,
886                    "network_event.reconciler.received"
887                );
888                let mut event_barrier = processor.begin_network_event_barrier(&first_request.event);
889                let mut requests = vec![first_request];
890                let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
891                tokio::pin!(settle);
892
893                loop {
894                    tokio::select! {
895                        Some(next_request) = event_rx.recv() => {
896                            tracing::debug!(
897                                event = ?next_request.event,
898                                "network_event.reconciler.coalesced"
899                            );
900                            if event_barrier.is_none() {
901                                event_barrier = processor.begin_network_event_barrier(&next_request.event);
902                            }
903                            requests.push(next_request);
904                        }
905                        _ = &mut settle => {
906                            break;
907                        }
908                        _ = shutdown_token.cancelled() => {
909                            tracing::info!("🛑 Network event reconciler shutting down");
910                            return;
911                        }
912                        else => {
913                            break;
914                        }
915                    }
916                }
917
918                while let Ok(next_request) = event_rx.try_recv() {
919                    tracing::debug!(
920                        event = ?next_request.event,
921                        "network_event.reconciler.coalesced"
922                    );
923                    if event_barrier.is_none() {
924                        event_barrier = processor.begin_network_event_barrier(&next_request.event);
925                    }
926                    requests.push(next_request);
927                }
928
929                let events = requests
930                    .iter()
931                    .map(|request| request.event.clone())
932                    .collect::<Vec<_>>();
933                let action = select_network_recovery_action(&events);
934                let facts = events
935                    .iter()
936                    .map(ConnectionFact::from_network_event)
937                    .collect::<Vec<_>>();
938                tracing::info!(
939                    event_count = events.len(),
940                    action = ?action,
941                    events = ?events,
942                    facts = ?facts,
943                    settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
944                    "network_event.reconciler.batch_reconciled"
945                );
946
947                let results = process_network_event_batch(events, processor.clone()).await;
948                drop(event_barrier);
949
950                for (request, result) in requests.into_iter().zip(results) {
951                    if request.result_tx.send(result).is_err() {
952                        tracing::debug!("Network event caller dropped before receiving result");
953                    }
954                }
955            }
956            _ = shutdown_token.cancelled() => {
957                tracing::info!("🛑 Network event reconciler shutting down");
958                break;
959            }
960            else => break,
961        }
962    }
963}
964
/// Network Event Handle
///
/// Lightweight handle for sending network events and receiving processing results.
/// Created before `ActrNode::start()` to bridge platform network events.
pub struct NetworkEventHandle {
    /// Event sender (to ActrNode)
    event_tx: mpsc::Sender<NetworkEventRequest>,
    /// Maximum time to wait for a result once the event has been enqueued.
    result_timeout: Duration,
}
974
975impl NetworkEventHandle {
976    /// Create a new NetworkEventHandle
977    pub fn new(event_tx: mpsc::Sender<NetworkEventRequest>) -> Self {
978        Self::new_with_result_timeout(event_tx, NETWORK_EVENT_RESULT_TIMEOUT)
979    }
980
981    /// Create a new NetworkEventHandle with a custom result timeout.
982    ///
983    /// Production bindings use [`NetworkEventHandle::new`]. Tests can use this
984    /// constructor to verify bounded waiting without sleeping for the full
985    /// binding timeout.
986    pub fn new_with_result_timeout(
987        event_tx: mpsc::Sender<NetworkEventRequest>,
988        result_timeout: Duration,
989    ) -> Self {
990        Self {
991            event_tx,
992            result_timeout,
993        }
994    }
995
996    /// Handle full network path changes.
997    pub async fn handle_network_path_changed(
998        &self,
999        snapshot: NetworkSnapshot,
1000    ) -> Result<NetworkEventResult, String> {
1001        self.send_event_and_await_result(NetworkEvent::NetworkPathChanged { snapshot })
1002            .await
1003    }
1004
1005    /// Handle app lifecycle changes.
1006    pub async fn handle_app_lifecycle_changed(
1007        &self,
1008        state: AppLifecycleState,
1009    ) -> Result<NetworkEventResult, String> {
1010        self.send_event_and_await_result(NetworkEvent::AppLifecycleChanged { state })
1011            .await
1012    }
1013
1014    /// Proactively clean up all connections with a reason. This never reconnects.
1015    pub async fn cleanup_connections(
1016        &self,
1017        reason: CleanupReason,
1018    ) -> Result<NetworkEventResult, String> {
1019        self.send_event_and_await_result(NetworkEvent::CleanupConnections { reason })
1020            .await
1021    }
1022
1023    /// Force cleanup and reconnect.
1024    pub async fn force_reconnect(
1025        &self,
1026        reason: ReconnectReason,
1027    ) -> Result<NetworkEventResult, String> {
1028        self.send_event_and_await_result(NetworkEvent::ForceReconnect { reason })
1029            .await
1030    }
1031
1032    /// Send event and await result (internal helper)
1033    async fn send_event_and_await_result(
1034        &self,
1035        event: NetworkEvent,
1036    ) -> Result<NetworkEventResult, String> {
1037        let event_request_id = NEXT_NETWORK_EVENT_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
1038        let start = Instant::now();
1039        let (result_tx, result_rx) = oneshot::channel();
1040        let request = NetworkEventRequest {
1041            event: event.clone(),
1042            result_tx,
1043        };
1044
1045        tracing::info!(
1046            event_request_id,
1047            event = ?event,
1048            result_timeout_ms = self.result_timeout.as_millis() as u64,
1049            "network_event.handle.enqueue"
1050        );
1051
1052        if let Err(e) = self.event_tx.send(request).await {
1053            let err = format!("Failed to send network event: {}", e);
1054            tracing::warn!(
1055                event_request_id,
1056                event = ?event,
1057                error = %err,
1058                "network_event.handle.enqueue_failed"
1059            );
1060            return Err(err);
1061        }
1062
1063        let result = match tokio::time::timeout(self.result_timeout, result_rx).await {
1064            Ok(Ok(result)) => Ok(result),
1065            Ok(Err(_)) => Err("Failed to receive network event result".to_string()),
1066            Err(_) => Err(format!(
1067                "Timed out waiting for network event result after {}ms",
1068                self.result_timeout.as_millis()
1069            )),
1070        };
1071
1072        let wait_ms = start.elapsed().as_millis() as u64;
1073        match &result {
1074            Ok(result) if result.success => tracing::info!(
1075                event_request_id,
1076                event = ?event,
1077                result_event = ?result.event,
1078                duration_ms = result.duration_ms,
1079                wait_ms,
1080                "network_event.handle.result_received"
1081            ),
1082            Ok(result) => tracing::warn!(
1083                event_request_id,
1084                event = ?event,
1085                result_event = ?result.event,
1086                duration_ms = result.duration_ms,
1087                wait_ms,
1088                error = ?result.error,
1089                "network_event.handle.result_received"
1090            ),
1091            Err(e) => tracing::warn!(
1092                event_request_id,
1093                event = ?event,
1094                wait_ms,
1095                error = %e,
1096                "network_event.handle.result_failed"
1097            ),
1098        }
1099
1100        result
1101    }
1102}
1103
1104impl Clone for NetworkEventHandle {
1105    fn clone(&self) -> Self {
1106        Self {
1107            event_tx: self.event_tx.clone(),
1108            result_timeout: self.result_timeout,
1109        }
1110    }
1111}
1112
1113#[cfg(test)]
1114mod tests {
1115    use super::*;
1116    use crate::lifecycle::CredentialState;
1117    use crate::transport::{NetworkError, NetworkResult};
1118    use crate::wire::webrtc::{SignalingEvent, SignalingStats};
1119    use actr_protocol::{
1120        AIdCredential, ActrId, Pong, RegisterRequest, RegisterResponse, RouteCandidatesRequest,
1121        RouteCandidatesResponse, ServiceAvailabilityState, SignalingEnvelope, UnregisterResponse,
1122    };
1123    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
1124    use tokio::sync::broadcast;
1125
    /// In-memory `SignalingClient` fake that counts the calls these tests assert on.
    struct ForceReconnectFakeSignalingClient {
        // Set by a successful `connect_once`, cleared by `disconnect`.
        connected: AtomicBool,
        // When true, every `connect_once` call returns a connection error.
        connect_once_should_fail: bool,
        // Number of `disconnect` calls.
        disconnect_calls: AtomicUsize,
        // Number of `connect_once` calls, including failed ones.
        connect_once_calls: AtomicUsize,
        // Number of `schedule_auto_reconnect` calls, including those made via
        // `schedule_auto_reconnect_reset_backoff`.
        schedule_auto_reconnect_calls: AtomicUsize,
        // Number of `schedule_auto_reconnect_reset_backoff` calls.
        schedule_auto_reconnect_reset_backoff_calls: AtomicUsize,
        // Backs `subscribe_events`; nothing in this fake sends on it.
        event_tx: broadcast::Sender<SignalingEvent>,
    }
1135
1136    impl ForceReconnectFakeSignalingClient {
1137        fn new(connect_once_should_fail: bool) -> Self {
1138            let (event_tx, _rx) = broadcast::channel(8);
1139            Self {
1140                connected: AtomicBool::new(false),
1141                connect_once_should_fail,
1142                disconnect_calls: AtomicUsize::new(0),
1143                connect_once_calls: AtomicUsize::new(0),
1144                schedule_auto_reconnect_calls: AtomicUsize::new(0),
1145                schedule_auto_reconnect_reset_backoff_calls: AtomicUsize::new(0),
1146                event_tx,
1147            }
1148        }
1149    }
1150
    // Fake `SignalingClient`: connection-management methods update counters and
    // the `connected` flag; request/response methods return an "unused" error.
    #[async_trait::async_trait]
    impl SignalingClient for ForceReconnectFakeSignalingClient {
        // Always succeeds and records nothing.
        async fn connect(&self) -> NetworkResult<()> {
            Ok(())
        }

        // Counts the attempt, then fails or marks the fake connected.
        async fn connect_once(&self) -> NetworkResult<()> {
            self.connect_once_calls.fetch_add(1, AtomicOrdering::SeqCst);
            if self.connect_once_should_fail {
                return Err(NetworkError::ConnectionError(
                    "forced connect_once failure".to_string(),
                ));
            }

            self.connected.store(true, AtomicOrdering::SeqCst);
            Ok(())
        }

        fn schedule_auto_reconnect(&self) {
            self.schedule_auto_reconnect_calls
                .fetch_add(1, AtomicOrdering::SeqCst);
        }

        // Counts the reset-backoff call and also bumps the plain
        // `schedule_auto_reconnect` counter by delegating to it.
        fn schedule_auto_reconnect_reset_backoff(&self) {
            self.schedule_auto_reconnect_reset_backoff_calls
                .fetch_add(1, AtomicOrdering::SeqCst);
            self.schedule_auto_reconnect();
        }

        // Counts the call and clears the connected flag.
        async fn disconnect(&self) -> NetworkResult<()> {
            self.disconnect_calls.fetch_add(1, AtomicOrdering::SeqCst);
            self.connected.store(false, AtomicOrdering::SeqCst);
            Ok(())
        }

        // The request/response methods below always return an "unused"
        // connection error.
        async fn send_register_request(
            &self,
            _request: RegisterRequest,
        ) -> NetworkResult<RegisterResponse> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn send_unregister_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _reason: Option<String>,
        ) -> NetworkResult<UnregisterResponse> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn send_heartbeat(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _availability: ServiceAvailabilityState,
            _power_reserve: f32,
            _mailbox_backlog: f32,
        ) -> NetworkResult<Pong> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn send_route_candidates_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _request: RouteCandidatesRequest,
        ) -> NetworkResult<RouteCandidatesResponse> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn get_signing_key(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _key_id: u32,
        ) -> NetworkResult<(u32, Vec<u8>)> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn send_credential_update_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
        ) -> NetworkResult<RegisterResponse> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn send_envelope(&self, _envelope: SignalingEnvelope) -> NetworkResult<()> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
            Err(NetworkError::ConnectionError("unused".to_string()))
        }

        // Reflects the flag maintained by `connect_once` and `disconnect`.
        fn is_connected(&self) -> bool {
            self.connected.load(AtomicOrdering::SeqCst)
        }

        fn get_stats(&self) -> SignalingStats {
            SignalingStats::default()
        }

        fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
            self.event_tx.subscribe()
        }

        // Identity and credential setters are no-ops in this fake.
        async fn set_actor_id(&self, _actor_id: ActrId) {}

        async fn set_credential_state(&self, _credential_state: CredentialState) {}

        async fn clear_identity(&self) {}
    }
1265
    /// Builds a `NetworkSnapshot` with the given sequence and availability;
    /// transport flags are defaulted and both cost flags are `false`.
    fn snapshot(sequence: u64, availability: NetworkAvailability) -> NetworkSnapshot {
        NetworkSnapshot {
            sequence,
            availability,
            transport: NetworkTransportFlags::default(),
            is_expensive: false,
            is_constrained: false,
        }
    }
1275
1276    #[test]
1277    fn lifecycle_barrier_is_scoped_to_events_that_change_connections() {
1278        let cases = [
1279            (
1280                NetworkEvent::NetworkPathChanged {
1281                    snapshot: snapshot(1, NetworkAvailability::Unavailable),
1282                },
1283                true,
1284            ),
1285            (
1286                NetworkEvent::NetworkPathChanged {
1287                    snapshot: snapshot(2, NetworkAvailability::Available),
1288                },
1289                true,
1290            ),
1291            (
1292                NetworkEvent::NetworkPathChanged {
1293                    snapshot: snapshot(3, NetworkAvailability::Unknown),
1294                },
1295                false,
1296            ),
1297            (
1298                NetworkEvent::AppLifecycleChanged {
1299                    state: AppLifecycleState::Background,
1300                },
1301                false,
1302            ),
1303            (
1304                NetworkEvent::AppLifecycleChanged {
1305                    state: AppLifecycleState::Foreground {
1306                        background_duration_ms: LONG_BACKGROUND_RECONNECT_THRESHOLD_MS - 1,
1307                    },
1308                },
1309                false,
1310            ),
1311            (
1312                NetworkEvent::AppLifecycleChanged {
1313                    state: AppLifecycleState::Foreground {
1314                        background_duration_ms: LONG_BACKGROUND_RECONNECT_THRESHOLD_MS,
1315                    },
1316                },
1317                true,
1318            ),
1319            (
1320                NetworkEvent::CleanupConnections {
1321                    reason: CleanupReason::ManualReset,
1322                },
1323                true,
1324            ),
1325            (
1326                NetworkEvent::ForceReconnect {
1327                    reason: ReconnectReason::ManualReconnect,
1328                },
1329                true,
1330            ),
1331        ];
1332
1333        for (event, expected) in cases {
1334            assert_eq!(
1335                network_event_needs_lifecycle_barrier(&event),
1336                expected,
1337                "{event:?}"
1338            );
1339        }
1340    }
1341
1342    #[tokio::test]
1343    async fn force_reconnect_failure_schedules_auto_reconnect() {
1344        let signaling = Arc::new(ForceReconnectFakeSignalingClient::new(true));
1345        let processor = DefaultNetworkEventProcessor::new(signaling.clone(), None);
1346
1347        let result = processor.force_reconnect().await;
1348
1349        assert!(result.is_err());
1350        assert_eq!(
1351            signaling.disconnect_calls.load(AtomicOrdering::SeqCst),
1352            1,
1353            "ForceReconnect cleanup should disconnect signaling once"
1354        );
1355        assert_eq!(
1356            signaling.connect_once_calls.load(AtomicOrdering::SeqCst),
1357            1,
1358            "ForceReconnect restore should make one quick connect attempt"
1359        );
1360        assert_eq!(
1361            signaling
1362                .schedule_auto_reconnect_calls
1363                .load(AtomicOrdering::SeqCst),
1364            2,
1365            "ForceReconnect should wake auto-reconnect before restore and keep it scheduled after failure"
1366        );
1367        assert_eq!(
1368            signaling
1369                .schedule_auto_reconnect_reset_backoff_calls
1370                .load(AtomicOrdering::SeqCst),
1371            1,
1372            "ForceReconnect should reset reconnect backoff before the quick restore attempt"
1373        );
1374    }
1375
1376    #[tokio::test]
1377    async fn restore_failure_schedules_auto_reconnect_reset_backoff() {
1378        let signaling = Arc::new(ForceReconnectFakeSignalingClient::new(true));
1379        let processor = DefaultNetworkEventProcessor::new(signaling.clone(), None);
1380
1381        let result = processor
1382            .process_network_recovery_action(NetworkRecoveryAction::Restore)
1383            .await;
1384
1385        assert!(result.is_err());
1386        assert_eq!(
1387            signaling.connect_once_calls.load(AtomicOrdering::SeqCst),
1388            1,
1389            "Restore should make one quick connect attempt"
1390        );
1391        assert_eq!(
1392            signaling
1393                .schedule_auto_reconnect_reset_backoff_calls
1394                .load(AtomicOrdering::SeqCst),
1395            1,
1396            "failed Restore should reset reconnect backoff"
1397        );
1398    }
1399
1400    #[tokio::test]
1401    async fn restore_schedules_reset_backoff_before_quick_connect() {
1402        let signaling = Arc::new(ForceReconnectFakeSignalingClient::new(false));
1403        let processor = DefaultNetworkEventProcessor::new(signaling.clone(), None);
1404
1405        processor
1406            .process_network_recovery_action(NetworkRecoveryAction::Restore)
1407            .await
1408            .expect("Restore should connect successfully");
1409
1410        assert_eq!(
1411            signaling.connect_once_calls.load(AtomicOrdering::SeqCst),
1412            1,
1413            "Restore should make one quick connect attempt"
1414        );
1415        assert_eq!(
1416            signaling
1417                .schedule_auto_reconnect_reset_backoff_calls
1418                .load(AtomicOrdering::SeqCst),
1419            1,
1420            "Restore should reset reconnect backoff before the quick connect attempt"
1421        );
1422    }
1423}