Skip to main content

actr_hyper/lifecycle/
network_event.rs

1//! Network Event Handling Architecture
2//!
3//! This module defines the network event handling infrastructure.
4//!
5//! # Architecture Overview
6//!
7//! ```text
8//!        ┌─────────────────────────────────────────────┐
9//!        │ (FFI Path - Implemented)  (Actor Path - TODO)
10//!        ▼                                             ▼
11//! ┌──────────────────────────┐      ┌──────────────────────────┐
12//! │ NetworkEventHandle       │      │ Direct Proto Message     │
13//! │ • Platform FFI calls     │      │ • Actor call/tell        │
14//! │ • Send via channel       │      │ • Send to actor mailbox  │
15//! │ • Await result           │      │ • No handle needed       │
16//! └────────┬─────────────────┘      └──────┬───────────────────┘
17//!          │                               │
18//!          └───────────────┬───────────────┘
19//!                          │ Both trigger
20//!                          ▼
21//! ┌─────────────────────────────────────────────────────────┐
22//! │  ActrNode::network_event_loop()                         │
23//! │  • Receive event from channel (FFI path)                │
24//! │  • Or handle message directly (Actor path - TODO)       │
25//! │  • Delegate to NetworkEventProcessor                    │
26//! │  • Send result back via channel                         │
27//! └──────────────────────┬──────────────────────────────────┘
28//!                        │ Delegate
29//!                        ▼
30//! ┌─────────────────────────────────────────────────────────┐
31//! │  NetworkEventProcessor (Trait)                          │
32//! │                                                          │
33//! │  DefaultNetworkEventProcessor:                          │
34//! │  • reconcile settled network/app events                 │
35//! │  • execute one recovery action                          │
36//! │    └─► Offline / Probe / Restore / Cleanup / Reconnect  │
37//! └─────────────────────────────────────────────────────────┘
38//! ```
39//!
40//! # Key Components
41//!
42//! - **NetworkEvent**: Unified mobile network/app/command events
43//! - **NetworkEventResult**: Processing result with success/error/duration
44//! - **NetworkEventProcessor**: Trait for custom event handling logic
45//! - **DefaultNetworkEventProcessor**: Default implementation with signaling + WebRTC recovery
46//!
47//! # Usage Patterns
48//!
49//! ## 1. Platform FFI Call (Primary, Implemented)
50//! ```ignore
51//! // Platform layer calls NetworkEventHandle via FFI
52//! let network_handle = system.create_network_event_handle();
53//! let result = network_handle.handle_network_path_changed(snapshot).await?;
54//! if result.success {
55//!     println!("Processed in {}ms", result.duration_ms);
56//! }
57//! ```
58//!
59//! ## 2. Actor Proto Message (Optional, TODO)
60//! ```ignore
61//! // TODO: actors send proto message directly (not yet implemented)
62//! actor_ref.call(NetworkPathChangedMessage { snapshot }).await?;
63//! ```
64//!
65//! **Key Differences:**
66//! - FFI path: Uses NetworkEventHandle + channel (implemented)
67//! - Actor path: Direct proto message to mailbox (TODO, future enhancement)
68
69use std::sync::{
70    Arc,
71    atomic::{AtomicU64, Ordering},
72};
73use std::time::{Duration, Instant};
74
75use crate::transport::PeerTransport;
76use crate::wire::webrtc::{CleanupGuard, SignalingClient, WebRtcCoordinator};
77use tokio::sync::{mpsc, oneshot};
78use tokio_util::sync::CancellationToken;
79
80use super::connection_supervisor::{ConnectionFact, ConnectionSupervisor};
81
/// How long the reconciler keeps collecting follow-up events after the first
/// one arrives before it processes the batch.
const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
/// NOTE(review): not referenced in the visible part of this file — presumably
/// the upper bound a caller waits for a `NetworkEventResult`; confirm at the use site.
const NETWORK_EVENT_RESULT_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout passed to `SignalingClient::probe_alive` when checking an existing
/// signaling connection.
const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
/// Background duration (in milliseconds) at or above which a return to the
/// foreground takes the network-event lifecycle barrier.
pub(super) const LONG_BACKGROUND_RECONNECT_THRESHOLD_MS: u64 = 30_000;
/// Counter starting at 1.
/// NOTE(review): not referenced in the visible part of this file — presumably
/// the source of per-request ids; confirm at the use site.
static NEXT_NETWORK_EVENT_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
87
/// Keeps outbound sends behind a network-event lifecycle barrier while the
/// reconciler settles and processes a queued batch.
///
/// The barrier is nothing more than an owned guard: it has no methods and is
/// only ever held by value.
pub struct NetworkEventBarrier {
    /// Guard obtained from `WebRtcCoordinator::cleanup_guard()`. It is never
    /// read; it is stored only so that it lives as long as the barrier.
    /// NOTE(review): presumably the barrier is lifted when this guard is
    /// dropped — confirm against `CleanupGuard`.
    _cleanup_guard: CleanupGuard,
}
93
/// Mobile network path snapshot.
///
/// Carried by `NetworkEvent::NetworkPathChanged` to describe the network path
/// the platform currently reports.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NetworkSnapshot {
    /// Sequence number of this snapshot.
    /// NOTE(review): presumably increases with every platform report so stale
    /// snapshots can be ordered — confirm against the producer.
    pub sequence: u64,
    /// Whether a usable path exists; drives `is_offline` / `should_restore`.
    pub availability: NetworkAvailability,
    /// Which transports are active on the path (several may be set at once).
    pub transport: NetworkTransportFlags,
    /// NOTE(review): presumably mirrors the platform's "expensive path" flag
    /// (e.g. metered or cellular data) — confirm against the FFI layer.
    pub is_expensive: bool,
    /// NOTE(review): presumably mirrors the platform's "constrained path" flag
    /// (e.g. low-data mode) — confirm against the FFI layer.
    pub is_constrained: bool,
}
103
104impl NetworkSnapshot {
105    pub fn is_offline(&self) -> bool {
106        matches!(self.availability, NetworkAvailability::Unavailable)
107    }
108
109    pub fn should_restore(&self) -> bool {
110        matches!(self.availability, NetworkAvailability::Available)
111    }
112}
113
/// Whether the platform currently has a usable network path.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkAvailability {
    /// Availability is not known. A snapshot in this state is neither
    /// `is_offline()` nor `should_restore()`.
    Unknown,
    /// A usable path exists; `NetworkSnapshot::should_restore()` is `true`.
    Available,
    /// No usable path exists; `NetworkSnapshot::is_offline()` is `true`.
    Unavailable,
}
121
/// Active network transport flags. Multiple flags can be true at the same time.
///
/// The derived `Default` leaves every flag `false`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct NetworkTransportFlags {
    /// Path uses Wi-Fi.
    pub wifi: bool,
    /// Path uses a cellular link.
    pub cellular: bool,
    /// Path uses wired Ethernet.
    pub ethernet: bool,
    /// Path goes through a VPN.
    pub vpn: bool,
    /// Path uses a transport not covered by the other flags.
    pub other: bool,
}
131
/// App lifecycle state relevant to connection recovery.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AppLifecycleState {
    /// The app moved to the background. This state never requests the
    /// network-event lifecycle barrier.
    Background,
    /// The app returned to the foreground after spending
    /// `background_duration_ms` milliseconds in the background. The lifecycle
    /// barrier is requested once that duration reaches
    /// `LONG_BACKGROUND_RECONNECT_THRESHOLD_MS`.
    Foreground { background_duration_ms: u64 },
}
138
/// Reason for a cleanup-only operation.
///
/// Carried by `NetworkEvent::CleanupConnections`. Nothing in the visible part
/// of this file branches on the specific reason.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CleanupReason {
    /// The app is about to terminate.
    AppTerminating,
    /// The user logged out.
    UserLogout,
    /// Existing connections are suspected to be stale.
    StaleConnectionSuspected,
    /// A reset was requested explicitly.
    ManualReset,
}
147
/// Reason for a forced cleanup + restore operation.
///
/// Carried by `NetworkEvent::ForceReconnect`. Nothing in the visible part of
/// this file branches on the specific reason.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReconnectReason {
    /// The network path changed.
    NetworkPathChanged,
    /// The app spent a long time in the background.
    LongBackground,
    /// A connectivity probe failed.
    ProbeFailed,
    /// A reconnect was requested explicitly.
    ManualReconnect,
    /// Existing connections are suspected to be stale.
    StaleConnectionSuspected,
}
157
/// Network event type
///
/// One value is queued per platform notification or command; the reconciler
/// coalesces a batch of them into a single `NetworkRecoveryAction`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum NetworkEvent {
    /// Full mobile network path changed.
    ///
    /// `snapshot` is the path state reported by the platform.
    NetworkPathChanged { snapshot: NetworkSnapshot },

    /// App lifecycle changed.
    ///
    /// `state` says whether the app went to the background or came back to the
    /// foreground (and for how long it was away).
    AppLifecycleChanged { state: AppLifecycleState },

    /// Proactively clean up all connections
    ///
    /// Used for app lifecycle management scenarios:
    /// - App entering background
    /// - User actively logging out
    /// - App about to exit
    ///
    /// `reason` records why the cleanup was requested.
    CleanupConnections { reason: CleanupReason },

    /// Proactively clean up and restore connections.
    ///
    /// `reason` records why the reconnect was requested.
    ForceReconnect { reason: ReconnectReason },
}
178
/// Final action selected from a settled batch of network events.
///
/// The per-variant notes describe the default mapping in
/// `NetworkEventProcessor::process_network_recovery_action`; processors may
/// override that mapping.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkRecoveryAction {
    /// Nothing to do; processing succeeds immediately.
    Noop,
    /// Treat the network as lost (`process_network_lost`).
    Offline,
    /// Check existing connectivity without forcing cleanup (`probe_connectivity`).
    Probe,
    /// Restore connectivity (`process_network_available`).
    Restore,
    /// Tear connections down without restoring them (`cleanup_connections`).
    CleanupOnly,
    /// Tear connections down and then restore them (`force_reconnect`).
    ForceReconnect,
}
189
190fn network_event_needs_lifecycle_barrier(event: &NetworkEvent) -> bool {
191    match event {
192        NetworkEvent::NetworkPathChanged { snapshot } => {
193            snapshot.is_offline() || snapshot.should_restore()
194        }
195        NetworkEvent::AppLifecycleChanged { state } => match state {
196            AppLifecycleState::Background => false,
197            AppLifecycleState::Foreground {
198                background_duration_ms,
199            } => *background_duration_ms >= LONG_BACKGROUND_RECONNECT_THRESHOLD_MS,
200        },
201        NetworkEvent::CleanupConnections { .. } | NetworkEvent::ForceReconnect { .. } => true,
202    }
203}
204
/// Network event processing result
///
/// One result is produced per queued event. When events are processed as a
/// batch, every event in the batch receives the outcome and duration of the
/// single recovery action that was executed for the whole batch.
#[derive(Debug, Clone)]
pub struct NetworkEventResult {
    /// Event type
    ///
    /// The event this result answers.
    pub event: NetworkEvent,

    /// Whether processing succeeded
    pub success: bool,

    /// Error message (if failed)
    ///
    /// `None` for results built with `NetworkEventResult::success`.
    pub error: Option<String>,

    /// Processing duration (milliseconds)
    pub duration_ms: u64,
}
220
221impl NetworkEventResult {
222    pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
223        Self {
224            event,
225            success: true,
226            error: None,
227            duration_ms,
228        }
229    }
230
231    pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
232        Self {
233            event,
234            success: false,
235            error: Some(error),
236            duration_ms,
237        }
238    }
239}
240
241/// Network event processor trait
242///
243/// Defines the processing logic for network events; can be custom-implemented by users
244#[async_trait::async_trait]
245pub trait NetworkEventProcessor: Send + Sync {
246    /// Enter a lifecycle barrier as soon as a queued event is observed by the
247    /// reconciler. The default is no barrier for custom processors.
248    fn begin_network_event_barrier(&self, _event: &NetworkEvent) -> Option<NetworkEventBarrier> {
249        None
250    }
251
252    /// Process network available event
253    ///
254    /// # Returns
255    /// - `Ok(())`: processing succeeded
256    /// - `Err(String)`: processing failed, contains error message
257    async fn process_network_available(&self) -> Result<(), String>;
258
259    /// Process network lost event
260    ///
261    /// # Returns
262    /// - `Ok(())`: processing succeeded
263    /// - `Err(String)`: processing failed, contains error message
264    async fn process_network_lost(&self) -> Result<(), String>;
265
266    /// Process network type changed event
267    ///
268    /// # Returns
269    /// - `Ok(())`: processing succeeded
270    /// - `Err(String)`: processing failed, contains error message
271    async fn process_network_type_changed(
272        &self,
273        is_wifi: bool,
274        is_cellular: bool,
275    ) -> Result<(), String>;
276
277    /// Proactively clean up all connections
278    ///
279    /// This method proactively cleans up all network connections. Applicable scenarios:
280    /// - App entering background (iOS/Android)
281    /// - User actively logging out
282    /// - App about to exit
283    /// - Need to reset network state
284    ///
285    /// # FFI Binding Note
286    ///
287    /// This method is specifically designed for FFI bindings, allowing upper-layer
288    /// platform code (Swift/Kotlin) to proactively manage connection lifecycle
289    /// through the unified `NetworkEventProcessor` interface.
290    ///
291    /// # Difference from Event Response
292    ///
293    /// - `process_network_lost()`: passively responds to network disconnection events
294    /// - `cleanup_connections()`: proactively cleans up connections (independent of network events)
295    ///
296    /// # Returns
297    /// - `Ok(())`: cleanup succeeded
298    /// - `Err(String)`: cleanup failed, contains error message
299    async fn cleanup_connections(&self) -> Result<(), String>;
300
301    /// Probe existing connectivity without forcing cleanup.
302    async fn probe_connectivity(&self) -> Result<(), String> {
303        Ok(())
304    }
305
306    /// Proactively clean up and restore connections.
307    async fn force_reconnect(&self) -> Result<(), String> {
308        self.cleanup_connections().await?;
309        self.process_network_available().await
310    }
311
312    /// Process the final action selected from a settled event batch.
313    ///
314    /// Custom processors can rely on the default mapping. The default runtime
315    /// processor overrides this to bypass per-event debounce after reconciliation.
316    async fn process_network_recovery_action(
317        &self,
318        action: NetworkRecoveryAction,
319    ) -> Result<(), String> {
320        match action {
321            NetworkRecoveryAction::Noop => Ok(()),
322            NetworkRecoveryAction::Offline => self.process_network_lost().await,
323            NetworkRecoveryAction::Probe => self.probe_connectivity().await,
324            NetworkRecoveryAction::Restore => self.process_network_available().await,
325            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
326            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
327        }
328    }
329}
330
/// Settings for per-event debouncing.
#[derive(Debug, Clone)]
pub struct DebounceConfig {
    /// Length of the debounce window: an event of the same kind arriving
    /// within this span of the previous one is ignored.
    pub window: Duration,
}

impl Default for DebounceConfig {
    /// Uses a two-second debounce window.
    fn default() -> Self {
        const DEFAULT_WINDOW: Duration = Duration::from_secs(2);

        DebounceConfig {
            window: DEFAULT_WINDOW,
        }
    }
}
346
/// Debounce state tracking
///
/// Holds, per event kind, the time at which an event of that kind was last
/// accepted. Each slot starts as `None` (nothing accepted yet) and has its own
/// lock, so the three kinds are debounced independently.
#[derive(Debug)]
struct DebounceState {
    /// Last accepted "network available" event.
    last_available: tokio::sync::Mutex<Option<Instant>>,
    /// Last accepted "network lost" event.
    last_lost: tokio::sync::Mutex<Option<Instant>>,
    /// Last accepted "network type changed" event.
    last_type_changed: tokio::sync::Mutex<Option<Instant>>,
}
354
355impl DebounceState {
356    fn new() -> Self {
357        Self {
358            last_available: tokio::sync::Mutex::new(None),
359            last_lost: tokio::sync::Mutex::new(None),
360            last_type_changed: tokio::sync::Mutex::new(None),
361        }
362    }
363}
364
/// Kind of event being debounced; selects which `DebounceState` slot is
/// consulted and updated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DebounceEvent {
    /// "Network available" events (`DebounceState::last_available`).
    Available,
    /// "Network lost" events (`DebounceState::last_lost`).
    Lost,
    /// "Network type changed" events (`DebounceState::last_type_changed`).
    TypeChanged,
}
371
/// Shared bookkeeping for signaling recovery.
#[derive(Debug)]
struct SignalingRecoveryState {
    /// Held for the whole of a signaling health check / (re)connect so that
    /// only one such attempt runs at a time.
    connect_lock: tokio::sync::Mutex<()>,
    /// When signaling was last connected successfully by the recovery path;
    /// `None` until the first success.
    last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
}
377
378impl SignalingRecoveryState {
379    fn new() -> Self {
380        Self {
381            connect_lock: tokio::sync::Mutex::new(()),
382            last_successful_connect: tokio::sync::Mutex::new(None),
383        }
384    }
385}
386
/// Default network event processor implementation
///
/// Recovers the signaling connection and, when a coordinator is configured,
/// the WebRTC connections in response to network events.
pub struct DefaultNetworkEventProcessor {
    /// Signaling connection that is probed, connected and disconnected.
    signaling_client: Arc<dyn SignalingClient>,
    /// Optional WebRTC coordinator. When absent, no lifecycle barrier is taken
    /// and the ICE-restart / peer-close steps are skipped.
    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
    /// Optional peer transport whose connections are closed during cleanup.
    peer_transport: Option<Arc<PeerTransport>>,
    /// Debounce window applied by the per-event entry points.
    debounce_config: DebounceConfig,
    /// Per-event-kind timestamps backing the debounce check.
    debounce_state: Arc<DebounceState>,
    /// Lock and timestamp used while checking / rebuilding signaling.
    recovery_state: Arc<SignalingRecoveryState>,
}
396
397impl DefaultNetworkEventProcessor {
398    pub fn new(
399        signaling_client: Arc<dyn SignalingClient>,
400        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
401    ) -> Self {
402        Self::new_with_debounce_and_peer_transport(
403            signaling_client,
404            webrtc_coordinator,
405            DebounceConfig::default(),
406            None,
407        )
408    }
409
410    pub fn new_with_debounce(
411        signaling_client: Arc<dyn SignalingClient>,
412        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
413        debounce_config: DebounceConfig,
414    ) -> Self {
415        Self::new_with_debounce_and_peer_transport(
416            signaling_client,
417            webrtc_coordinator,
418            debounce_config,
419            None,
420        )
421    }
422
423    pub(crate) fn new_with_peer_transport(
424        signaling_client: Arc<dyn SignalingClient>,
425        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
426        peer_transport: Option<Arc<PeerTransport>>,
427    ) -> Self {
428        Self::new_with_debounce_and_peer_transport(
429            signaling_client,
430            webrtc_coordinator,
431            DebounceConfig::default(),
432            peer_transport,
433        )
434    }
435
436    pub(crate) fn new_with_debounce_and_peer_transport(
437        signaling_client: Arc<dyn SignalingClient>,
438        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
439        debounce_config: DebounceConfig,
440        peer_transport: Option<Arc<PeerTransport>>,
441    ) -> Self {
442        Self {
443            signaling_client,
444            webrtc_coordinator,
445            peer_transport,
446            debounce_config,
447            debounce_state: Arc::new(DebounceState::new()),
448            recovery_state: Arc::new(SignalingRecoveryState::new()),
449        }
450    }
451
452    fn lifecycle_barrier(&self) -> Option<NetworkEventBarrier> {
453        self.webrtc_coordinator
454            .as_ref()
455            .map(|coordinator| NetworkEventBarrier {
456                _cleanup_guard: coordinator.cleanup_guard(),
457            })
458    }
459
460    /// Check whether an event should be filtered by debounce
461    ///
462    /// # Returns
463    /// - `true`: the event should be processed
464    /// - `false`: the event is within the debounce window and should be ignored
465    async fn should_process_event(&self, event: DebounceEvent) -> bool {
466        let now = Instant::now();
467
468        match event {
469            DebounceEvent::Available => {
470                let mut last = self.debounce_state.last_available.lock().await;
471                if let Some(last_time) = *last {
472                    if now.duration_since(last_time) < self.debounce_config.window {
473                        tracing::debug!(
474                            "⏸️  Debouncing Network Available event (last event was {:?} ago)",
475                            now.duration_since(last_time)
476                        );
477                        return false;
478                    }
479                }
480                *last = Some(now);
481                true
482            }
483            DebounceEvent::Lost => {
484                let mut last = self.debounce_state.last_lost.lock().await;
485                if let Some(last_time) = *last {
486                    if now.duration_since(last_time) < self.debounce_config.window {
487                        tracing::debug!(
488                            "⏸️  Debouncing Network Lost event (last event was {:?} ago)",
489                            now.duration_since(last_time)
490                        );
491                        return false;
492                    }
493                }
494                *last = Some(now);
495                true
496            }
497            DebounceEvent::TypeChanged => {
498                let mut last = self.debounce_state.last_type_changed.lock().await;
499                if let Some(last_time) = *last {
500                    if now.duration_since(last_time) < self.debounce_config.window {
501                        tracing::debug!(
502                            "⏸️  Debouncing Network TypeChanged event (last event was {:?} ago)",
503                            now.duration_since(last_time)
504                        );
505                        return false;
506                    }
507                }
508                *last = Some(now);
509                true
510            }
511        }
512    }
513
514    async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
515        let _guard = self.recovery_state.connect_lock.lock().await;
516
517        if !self.signaling_client.is_connected() {
518            tracing::info!(reason = reason, "🔄 Connecting signaling");
519            self.signaling_client.connect_once().await.map_err(|e| {
520                let err_msg = format!("WebSocket connect failed: {}", e);
521                tracing::error!("❌ {}", err_msg);
522                err_msg
523            })?;
524
525            *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
526            tracing::info!(reason = reason, "✅ Signaling connected");
527            return Ok(());
528        }
529
530        tracing::debug!(
531            reason = reason,
532            timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
533            "🔎 Probing existing signaling WebSocket"
534        );
535
536        match self
537            .signaling_client
538            .probe_alive(SIGNALING_PROBE_TIMEOUT)
539            .await
540        {
541            Ok(()) => {
542                tracing::debug!(
543                    reason = reason,
544                    "✅ Signaling probe succeeded; keeping existing WebSocket"
545                );
546                Ok(())
547            }
548            Err(e) => {
549                tracing::warn!(
550                    reason = reason,
551                    "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
552                    e
553                );
554
555                if let Err(disconnect_err) = self.signaling_client.disconnect().await {
556                    tracing::warn!(
557                        reason = reason,
558                        "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
559                        disconnect_err
560                    );
561                }
562
563                tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
564                self.signaling_client
565                    .connect_once()
566                    .await
567                    .map_err(|connect_err| {
568                        let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
569                        tracing::error!("❌ {}", err_msg);
570                        err_msg
571                    })?;
572
573                *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
574                tracing::info!(reason = reason, "✅ Signaling rebuilt");
575                Ok(())
576            }
577        }
578    }
579
580    async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
581        let _cleanup_guard = self.lifecycle_barrier();
582        let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
583            coordinator.begin_network_recovery(reason).await
584        } else {
585            Vec::new()
586        };
587
588        self.ensure_signaling_healthy_once(reason).await?;
589
590        let coordinator = self.webrtc_coordinator.clone();
591
592        if let Some(coordinator) = coordinator {
593            if recovery_targets.is_empty() {
594                tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
595            } else {
596                tracing::info!("♻️ Triggering ICE restart for recovering connections...");
597            }
598            coordinator.restart_network_recovery_connections().await;
599        }
600
601        Ok(())
602    }
603
604    async fn probe_or_restore(&self, reason: &str) -> Result<(), String> {
605        match self.probe_connectivity().await {
606            Ok(()) => Ok(()),
607            Err(e) => {
608                tracing::warn!(
609                    reason = reason,
610                    "Connectivity probe failed; restoring connections: {}",
611                    e
612                );
613                if let Err(disconnect_err) = self.signaling_client.disconnect().await {
614                    tracing::warn!(
615                        reason = reason,
616                        "Failed to disconnect unhealthy signaling before restore: {}",
617                        disconnect_err
618                    );
619                }
620                self.restore_signaling_and_webrtc(reason).await
621            }
622        }
623    }
624
625    async fn process_offline(&self) -> Result<(), String> {
626        let _cleanup_guard = self.lifecycle_barrier();
627        tracing::info!("📱 Processing: Network offline");
628
629        if let Some(ref coordinator) = self.webrtc_coordinator {
630            coordinator.begin_network_recovery("NetworkLost").await;
631            tracing::info!("🧹 Clearing pending ICE restart attempts...");
632            coordinator.clear_pending_restarts().await;
633        }
634
635        tracing::info!("🔌 Disconnecting WebSocket...");
636        let _ = self.signaling_client.disconnect().await;
637
638        Ok(())
639    }
640}
641
642#[async_trait::async_trait]
643impl NetworkEventProcessor for DefaultNetworkEventProcessor {
644    fn begin_network_event_barrier(&self, event: &NetworkEvent) -> Option<NetworkEventBarrier> {
645        if network_event_needs_lifecycle_barrier(event) {
646            self.lifecycle_barrier()
647        } else {
648            None
649        }
650    }
651
652    /// Process network available event
653    async fn process_network_available(&self) -> Result<(), String> {
654        // Debounce check
655        let should_process = self.should_process_event(DebounceEvent::Available).await;
656        if !should_process && self.signaling_client.is_connected() {
657            return Ok(());
658        }
659
660        tracing::info!("📱 Processing: Network available");
661
662        self.restore_signaling_and_webrtc("NetworkAvailable").await
663    }
664
665    /// Process network lost event
666    async fn process_network_lost(&self) -> Result<(), String> {
667        // Debounce check
668        if !self.should_process_event(DebounceEvent::Lost).await {
669            return Ok(());
670        }
671
672        self.process_offline().await
673    }
674
675    /// Process network type changed event
676    async fn process_network_type_changed(
677        &self,
678        is_wifi: bool,
679        is_cellular: bool,
680    ) -> Result<(), String> {
681        // Debounce check
682        let should_process = self.should_process_event(DebounceEvent::TypeChanged).await;
683        if !should_process && self.signaling_client.is_connected() {
684            return Ok(());
685        }
686
687        tracing::info!(
688            "📱 Processing: Network type changed (WiFi={}, Cellular={})",
689            is_wifi,
690            is_cellular
691        );
692
693        self.restore_signaling_and_webrtc("NetworkTypeChanged")
694            .await
695    }
696
697    /// Proactively clean up all connections
698    ///
699    /// Differs from `process_network_lost()`:
700    /// - No debounce check (proactive calls always execute)
701    /// - Intended for app lifecycle management, not network event response
702    async fn cleanup_connections(&self) -> Result<(), String> {
703        let _cleanup_guard = self.lifecycle_barrier();
704
705        tracing::info!("🧹 Manually cleaning up all connections...");
706
707        // Step 1: Clear pending ICE restart attempts
708        if let Some(ref coordinator) = self.webrtc_coordinator {
709            tracing::info!("♻️  Clearing pending ICE restart attempts...");
710            coordinator.clear_pending_restarts().await;
711        }
712
713        // Step 2: Cancel PeerTransport singleflight and close established transports.
714        if let Some(ref peer_transport) = self.peer_transport {
715            tracing::info!("🔻 Closing all PeerTransport connections...");
716            if let Err(e) = peer_transport.close_all().await {
717                let err_msg = format!("Failed to close peer transports: {}", e);
718                tracing::warn!("⚠️  {}", err_msg);
719                // Do not fail the whole cleanup; continue releasing other resources.
720            } else {
721                tracing::info!("✅ All PeerTransport connections closed");
722            }
723        }
724
725        // Step 3: Close all WebRTC peer connections
726        if let Some(ref coordinator) = self.webrtc_coordinator {
727            tracing::info!("🔻 Closing all WebRTC peer connections...");
728            if let Err(e) = coordinator.close_all_peers().await {
729                let err_msg = format!("Failed to close all peers: {}", e);
730                tracing::warn!("⚠️  {}", err_msg);
731                // Do not fail the whole cleanup; continue releasing other resources.
732            } else {
733                tracing::info!("✅ All WebRTC peer connections closed");
734            }
735        }
736
737        // Step 4: Proactively disconnect the WebSocket.
738        tracing::info!("🔌 Disconnecting WebSocket...");
739        match self.signaling_client.disconnect().await {
740            Ok(_) => {
741                tracing::info!("✅ WebSocket disconnected successfully");
742            }
743            Err(e) => {
744                let err_msg = format!("Failed to disconnect WebSocket: {}", e);
745                tracing::warn!("⚠️  {}", err_msg);
746                // Do not fail the whole cleanup; continue releasing other resources.
747            }
748        }
749
750        tracing::info!("✅ Connection cleanup completed");
751
752        Ok(())
753    }
754
755    async fn probe_connectivity(&self) -> Result<(), String> {
756        self.signaling_client
757            .probe_alive(SIGNALING_PROBE_TIMEOUT)
758            .await
759            .map_err(|e| format!("Signaling probe failed: {}", e))
760    }
761
762    async fn force_reconnect(&self) -> Result<(), String> {
763        self.cleanup_connections().await?;
764        let result = self.restore_signaling_and_webrtc("ForceReconnect").await;
765        if let Err(err) = &result {
766            tracing::warn!(
767                error = %err,
768                "ForceReconnect restore failed; scheduling signaling auto-reconnect"
769            );
770            self.signaling_client.schedule_auto_reconnect();
771        }
772        result
773    }
774
775    async fn process_network_recovery_action(
776        &self,
777        action: NetworkRecoveryAction,
778    ) -> Result<(), String> {
779        match action {
780            NetworkRecoveryAction::Noop => Ok(()),
781            NetworkRecoveryAction::Offline => self.process_offline().await,
782            NetworkRecoveryAction::Probe => self.probe_or_restore("Probe").await,
783            NetworkRecoveryAction::Restore => {
784                self.restore_signaling_and_webrtc("NetworkEventBatch").await
785            }
786            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
787            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
788        }
789    }
790}
791
/// Picks the single recovery action for a batch of events.
///
/// Thin public wrapper: the decision itself is delegated entirely to
/// `ConnectionSupervisor::select_action`.
pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
    ConnectionSupervisor::select_action(events)
}
795
796pub async fn process_network_event_batch(
797    events: Vec<NetworkEvent>,
798    processor: Arc<dyn NetworkEventProcessor>,
799) -> Vec<NetworkEventResult> {
800    if events.is_empty() {
801        return Vec::new();
802    }
803
804    let action = select_network_recovery_action(&events);
805    let start = Instant::now();
806
807    tracing::info!(
808        event_count = events.len(),
809        action = ?action,
810        "network_event.action.start"
811    );
812
813    let result = processor.process_network_recovery_action(action).await;
814
815    let duration_ms = start.elapsed().as_millis() as u64;
816    match &result {
817        Ok(()) => tracing::info!(
818            event_count = events.len(),
819            action = ?action,
820            duration_ms,
821            "network_event.action.completed"
822        ),
823        Err(e) => tracing::warn!(
824            event_count = events.len(),
825            action = ?action,
826            duration_ms,
827            error = %e,
828            "network_event.action.completed"
829        ),
830    }
831
832    events
833        .into_iter()
834        .map(|event| match &result {
835            Ok(()) => NetworkEventResult::success(event, duration_ms),
836            Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
837        })
838        .collect()
839}
840
/// One queued network event together with the channel on which its result is
/// to be delivered. This is the item type received by
/// `run_network_event_reconciler`.
pub struct NetworkEventRequest {
    /// The event to reconcile.
    pub event: NetworkEvent,
    /// One-shot sender for this event's `NetworkEventResult`.
    pub result_tx: oneshot::Sender<NetworkEventResult>,
}
845
/// Runs the network event reconciliation loop until shutdown.
///
/// Each iteration waits for one request, then keeps collecting further
/// requests for `NETWORK_EVENT_SETTLE_WINDOW` so that a burst of events is
/// handled as one batch. The batch is handed to
/// [`process_network_event_batch`], and every request gets its own result back
/// through its `result_tx`.
///
/// # Parameters
/// - `event_rx`: receiving half of the request channel.
/// - `processor`: supplies the event barrier and executes the recovery action.
/// - `shutdown_token`: cancelling it stops the loop. If cancellation is
///   observed while a batch is still settling, the function returns right away
///   and the requests collected so far are dropped without a result.
pub async fn run_network_event_reconciler(
    mut event_rx: mpsc::Receiver<NetworkEventRequest>,
    processor: Arc<dyn NetworkEventProcessor>,
    shutdown_token: CancellationToken,
) {
    tracing::info!("🔄 Network event reconciler started");

    loop {
        tokio::select! {
            Some(first_request) = event_rx.recv() => {
                tracing::debug!(
                    event = ?first_request.event,
                    "network_event.reconciler.received"
                );
                // The barrier is requested for the first event; if the processor
                // returns `None`, later events in the batch get a chance to open it.
                // It is held until the batch has been processed (see `drop` below).
                let mut event_barrier = processor.begin_network_event_barrier(&first_request.event);
                let mut requests = vec![first_request];
                // One fixed settle window per batch: the timer is created once and
                // is not reset when additional events arrive.
                let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
                tokio::pin!(settle);

                // Coalesce everything that arrives before the settle window ends.
                loop {
                    tokio::select! {
                        Some(next_request) = event_rx.recv() => {
                            tracing::debug!(
                                event = ?next_request.event,
                                "network_event.reconciler.coalesced"
                            );
                            if event_barrier.is_none() {
                                event_barrier = processor.begin_network_event_barrier(&next_request.event);
                            }
                            requests.push(next_request);
                        }
                        _ = &mut settle => {
                            break;
                        }
                        _ = shutdown_token.cancelled() => {
                            // Early return: the collected `requests` (and their
                            // `result_tx` senders) are dropped unanswered.
                            tracing::info!("🛑 Network event reconciler shutting down");
                            return;
                        }
                        // NOTE(review): `else` only runs when every branch is disabled;
                        // the `_ =` branches above always match, so this arm looks
                        // unreachable — confirm against the tokio `select!` docs.
                        else => {
                            break;
                        }
                    }
                }

                // Non-blocking drain of requests that were already queued when the
                // settle window elapsed, so they join this batch.
                while let Ok(next_request) = event_rx.try_recv() {
                    tracing::debug!(
                        event = ?next_request.event,
                        "network_event.reconciler.coalesced"
                    );
                    if event_barrier.is_none() {
                        event_barrier = processor.begin_network_event_barrier(&next_request.event);
                    }
                    requests.push(next_request);
                }

                // Events are cloned out so `requests` keeps ownership of the
                // `result_tx` senders needed after processing.
                let events = requests
                    .iter()
                    .map(|request| request.event.clone())
                    .collect::<Vec<_>>();
                // `action` and `facts` are computed here only for the log line below;
                // `process_network_event_batch` selects the action again itself.
                let action = select_network_recovery_action(&events);
                let facts = events
                    .iter()
                    .map(ConnectionFact::from_network_event)
                    .collect::<Vec<_>>();
                tracing::info!(
                    event_count = events.len(),
                    action = ?action,
                    events = ?events,
                    facts = ?facts,
                    settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
                    "network_event.reconciler.batch_reconciled"
                );

                let results = process_network_event_batch(events, processor.clone()).await;
                // Release the barrier only after the recovery action has finished.
                drop(event_barrier);

                // Results come back one per event in request order, so zipping
                // pairs each request with its own result.
                for (request, result) in requests.into_iter().zip(results) {
                    if request.result_tx.send(result).is_err() {
                        tracing::debug!("Network event caller dropped before receiving result");
                    }
                }
            }
            _ = shutdown_token.cancelled() => {
                tracing::info!("🛑 Network event reconciler shutting down");
                break;
            }
            // NOTE(review): as in the inner loop, the shutdown branch can never be
            // disabled, so this `else` appears unreachable; once the channel is
            // closed the task seems to idle until shutdown — confirm this is intended.
            else => break,
        }
    }
}
936
/// Network Event Handle
///
/// Lightweight handle for sending network events and receiving processing results.
/// Created before `ActrNode::start()` to bridge platform network events.
pub struct NetworkEventHandle {
    /// Event sender (to ActrNode)
    event_tx: mpsc::Sender<NetworkEventRequest>,
    /// Maximum time to wait for a processing result once the event has been
    /// enqueued; exceeding it is reported to the caller as an error.
    result_timeout: Duration,
}
946
947impl NetworkEventHandle {
948    /// Create a new NetworkEventHandle
949    pub fn new(event_tx: mpsc::Sender<NetworkEventRequest>) -> Self {
950        Self::new_with_result_timeout(event_tx, NETWORK_EVENT_RESULT_TIMEOUT)
951    }
952
953    /// Create a new NetworkEventHandle with a custom result timeout.
954    ///
955    /// Production bindings use [`NetworkEventHandle::new`]. Tests can use this
956    /// constructor to verify bounded waiting without sleeping for the full
957    /// binding timeout.
958    pub fn new_with_result_timeout(
959        event_tx: mpsc::Sender<NetworkEventRequest>,
960        result_timeout: Duration,
961    ) -> Self {
962        Self {
963            event_tx,
964            result_timeout,
965        }
966    }
967
968    /// Handle full network path changes.
969    pub async fn handle_network_path_changed(
970        &self,
971        snapshot: NetworkSnapshot,
972    ) -> Result<NetworkEventResult, String> {
973        self.send_event_and_await_result(NetworkEvent::NetworkPathChanged { snapshot })
974            .await
975    }
976
977    /// Handle app lifecycle changes.
978    pub async fn handle_app_lifecycle_changed(
979        &self,
980        state: AppLifecycleState,
981    ) -> Result<NetworkEventResult, String> {
982        self.send_event_and_await_result(NetworkEvent::AppLifecycleChanged { state })
983            .await
984    }
985
986    /// Proactively clean up all connections with a reason. This never reconnects.
987    pub async fn cleanup_connections(
988        &self,
989        reason: CleanupReason,
990    ) -> Result<NetworkEventResult, String> {
991        self.send_event_and_await_result(NetworkEvent::CleanupConnections { reason })
992            .await
993    }
994
995    /// Force cleanup and reconnect.
996    pub async fn force_reconnect(
997        &self,
998        reason: ReconnectReason,
999    ) -> Result<NetworkEventResult, String> {
1000        self.send_event_and_await_result(NetworkEvent::ForceReconnect { reason })
1001            .await
1002    }
1003
1004    /// Send event and await result (internal helper)
1005    async fn send_event_and_await_result(
1006        &self,
1007        event: NetworkEvent,
1008    ) -> Result<NetworkEventResult, String> {
1009        let event_request_id = NEXT_NETWORK_EVENT_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
1010        let start = Instant::now();
1011        let (result_tx, result_rx) = oneshot::channel();
1012        let request = NetworkEventRequest {
1013            event: event.clone(),
1014            result_tx,
1015        };
1016
1017        tracing::info!(
1018            event_request_id,
1019            event = ?event,
1020            result_timeout_ms = self.result_timeout.as_millis() as u64,
1021            "network_event.handle.enqueue"
1022        );
1023
1024        if let Err(e) = self.event_tx.send(request).await {
1025            let err = format!("Failed to send network event: {}", e);
1026            tracing::warn!(
1027                event_request_id,
1028                event = ?event,
1029                error = %err,
1030                "network_event.handle.enqueue_failed"
1031            );
1032            return Err(err);
1033        }
1034
1035        let result = match tokio::time::timeout(self.result_timeout, result_rx).await {
1036            Ok(Ok(result)) => Ok(result),
1037            Ok(Err(_)) => Err("Failed to receive network event result".to_string()),
1038            Err(_) => Err(format!(
1039                "Timed out waiting for network event result after {}ms",
1040                self.result_timeout.as_millis()
1041            )),
1042        };
1043
1044        let wait_ms = start.elapsed().as_millis() as u64;
1045        match &result {
1046            Ok(result) if result.success => tracing::info!(
1047                event_request_id,
1048                event = ?event,
1049                result_event = ?result.event,
1050                duration_ms = result.duration_ms,
1051                wait_ms,
1052                "network_event.handle.result_received"
1053            ),
1054            Ok(result) => tracing::warn!(
1055                event_request_id,
1056                event = ?event,
1057                result_event = ?result.event,
1058                duration_ms = result.duration_ms,
1059                wait_ms,
1060                error = ?result.error,
1061                "network_event.handle.result_received"
1062            ),
1063            Err(e) => tracing::warn!(
1064                event_request_id,
1065                event = ?event,
1066                wait_ms,
1067                error = %e,
1068                "network_event.handle.result_failed"
1069            ),
1070        }
1071
1072        result
1073    }
1074}
1075
1076impl Clone for NetworkEventHandle {
1077    fn clone(&self) -> Self {
1078        Self {
1079            event_tx: self.event_tx.clone(),
1080            result_timeout: self.result_timeout,
1081        }
1082    }
1083}
1084
1085#[cfg(test)]
1086mod tests {
1087    use super::*;
1088    use crate::lifecycle::CredentialState;
1089    use crate::transport::{NetworkError, NetworkResult};
1090    use crate::wire::webrtc::{SignalingEvent, SignalingStats};
1091    use actr_protocol::{
1092        AIdCredential, ActrId, Pong, RegisterRequest, RegisterResponse, RouteCandidatesRequest,
1093        RouteCandidatesResponse, ServiceAvailabilityState, SignalingEnvelope, UnregisterResponse,
1094    };
1095    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
1096    use tokio::sync::broadcast;
1097
    /// Fake `SignalingClient` that counts the calls relevant to force-reconnect
    /// handling and can be configured to fail `connect_once`.
    struct ForceReconnectFakeSignalingClient {
        /// Reported by `is_connected`; set by a successful `connect_once`,
        /// cleared by `disconnect`.
        connected: AtomicBool,
        /// When true, `connect_once` returns an error instead of connecting.
        connect_once_should_fail: bool,
        /// Number of `disconnect` calls observed.
        disconnect_calls: AtomicUsize,
        /// Number of `connect_once` calls observed (successful or not).
        connect_once_calls: AtomicUsize,
        /// Number of `schedule_auto_reconnect` calls observed.
        schedule_auto_reconnect_calls: AtomicUsize,
        /// Broadcast sender backing `subscribe_events`.
        event_tx: broadcast::Sender<SignalingEvent>,
    }
1106
1107    impl ForceReconnectFakeSignalingClient {
1108        fn new(connect_once_should_fail: bool) -> Self {
1109            let (event_tx, _rx) = broadcast::channel(8);
1110            Self {
1111                connected: AtomicBool::new(false),
1112                connect_once_should_fail,
1113                disconnect_calls: AtomicUsize::new(0),
1114                connect_once_calls: AtomicUsize::new(0),
1115                schedule_auto_reconnect_calls: AtomicUsize::new(0),
1116                event_tx,
1117            }
1118        }
1119    }
1120
1121    #[async_trait::async_trait]
1122    impl SignalingClient for ForceReconnectFakeSignalingClient {
1123        async fn connect(&self) -> NetworkResult<()> {
1124            Ok(())
1125        }
1126
1127        async fn connect_once(&self) -> NetworkResult<()> {
1128            self.connect_once_calls.fetch_add(1, AtomicOrdering::SeqCst);
1129            if self.connect_once_should_fail {
1130                return Err(NetworkError::ConnectionError(
1131                    "forced connect_once failure".to_string(),
1132                ));
1133            }
1134
1135            self.connected.store(true, AtomicOrdering::SeqCst);
1136            Ok(())
1137        }
1138
1139        fn schedule_auto_reconnect(&self) {
1140            self.schedule_auto_reconnect_calls
1141                .fetch_add(1, AtomicOrdering::SeqCst);
1142        }
1143
1144        async fn disconnect(&self) -> NetworkResult<()> {
1145            self.disconnect_calls.fetch_add(1, AtomicOrdering::SeqCst);
1146            self.connected.store(false, AtomicOrdering::SeqCst);
1147            Ok(())
1148        }
1149
1150        async fn send_register_request(
1151            &self,
1152            _request: RegisterRequest,
1153        ) -> NetworkResult<RegisterResponse> {
1154            Err(NetworkError::ConnectionError("unused".to_string()))
1155        }
1156
1157        async fn send_unregister_request(
1158            &self,
1159            _actor_id: ActrId,
1160            _credential: AIdCredential,
1161            _reason: Option<String>,
1162        ) -> NetworkResult<UnregisterResponse> {
1163            Err(NetworkError::ConnectionError("unused".to_string()))
1164        }
1165
1166        async fn send_heartbeat(
1167            &self,
1168            _actor_id: ActrId,
1169            _credential: AIdCredential,
1170            _availability: ServiceAvailabilityState,
1171            _power_reserve: f32,
1172            _mailbox_backlog: f32,
1173        ) -> NetworkResult<Pong> {
1174            Err(NetworkError::ConnectionError("unused".to_string()))
1175        }
1176
1177        async fn send_route_candidates_request(
1178            &self,
1179            _actor_id: ActrId,
1180            _credential: AIdCredential,
1181            _request: RouteCandidatesRequest,
1182        ) -> NetworkResult<RouteCandidatesResponse> {
1183            Err(NetworkError::ConnectionError("unused".to_string()))
1184        }
1185
1186        async fn get_signing_key(
1187            &self,
1188            _actor_id: ActrId,
1189            _credential: AIdCredential,
1190            _key_id: u32,
1191        ) -> NetworkResult<(u32, Vec<u8>)> {
1192            Err(NetworkError::ConnectionError("unused".to_string()))
1193        }
1194
1195        async fn send_credential_update_request(
1196            &self,
1197            _actor_id: ActrId,
1198            _credential: AIdCredential,
1199        ) -> NetworkResult<RegisterResponse> {
1200            Err(NetworkError::ConnectionError("unused".to_string()))
1201        }
1202
1203        async fn send_envelope(&self, _envelope: SignalingEnvelope) -> NetworkResult<()> {
1204            Err(NetworkError::ConnectionError("unused".to_string()))
1205        }
1206
1207        async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
1208            Err(NetworkError::ConnectionError("unused".to_string()))
1209        }
1210
1211        fn is_connected(&self) -> bool {
1212            self.connected.load(AtomicOrdering::SeqCst)
1213        }
1214
1215        fn get_stats(&self) -> SignalingStats {
1216            SignalingStats::default()
1217        }
1218
1219        fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
1220            self.event_tx.subscribe()
1221        }
1222
1223        async fn set_actor_id(&self, _actor_id: ActrId) {}
1224
1225        async fn set_credential_state(&self, _credential_state: CredentialState) {}
1226
1227        async fn clear_identity(&self) {}
1228    }
1229
1230    fn snapshot(sequence: u64, availability: NetworkAvailability) -> NetworkSnapshot {
1231        NetworkSnapshot {
1232            sequence,
1233            availability,
1234            transport: NetworkTransportFlags::default(),
1235            is_expensive: false,
1236            is_constrained: false,
1237        }
1238    }
1239
1240    #[test]
1241    fn lifecycle_barrier_is_scoped_to_events_that_change_connections() {
1242        let cases = [
1243            (
1244                NetworkEvent::NetworkPathChanged {
1245                    snapshot: snapshot(1, NetworkAvailability::Unavailable),
1246                },
1247                true,
1248            ),
1249            (
1250                NetworkEvent::NetworkPathChanged {
1251                    snapshot: snapshot(2, NetworkAvailability::Available),
1252                },
1253                true,
1254            ),
1255            (
1256                NetworkEvent::NetworkPathChanged {
1257                    snapshot: snapshot(3, NetworkAvailability::Unknown),
1258                },
1259                false,
1260            ),
1261            (
1262                NetworkEvent::AppLifecycleChanged {
1263                    state: AppLifecycleState::Background,
1264                },
1265                false,
1266            ),
1267            (
1268                NetworkEvent::AppLifecycleChanged {
1269                    state: AppLifecycleState::Foreground {
1270                        background_duration_ms: LONG_BACKGROUND_RECONNECT_THRESHOLD_MS - 1,
1271                    },
1272                },
1273                false,
1274            ),
1275            (
1276                NetworkEvent::AppLifecycleChanged {
1277                    state: AppLifecycleState::Foreground {
1278                        background_duration_ms: LONG_BACKGROUND_RECONNECT_THRESHOLD_MS,
1279                    },
1280                },
1281                true,
1282            ),
1283            (
1284                NetworkEvent::CleanupConnections {
1285                    reason: CleanupReason::ManualReset,
1286                },
1287                true,
1288            ),
1289            (
1290                NetworkEvent::ForceReconnect {
1291                    reason: ReconnectReason::ManualReconnect,
1292                },
1293                true,
1294            ),
1295        ];
1296
1297        for (event, expected) in cases {
1298            assert_eq!(
1299                network_event_needs_lifecycle_barrier(&event),
1300                expected,
1301                "{event:?}"
1302            );
1303        }
1304    }
1305
1306    #[tokio::test]
1307    async fn force_reconnect_failure_schedules_auto_reconnect() {
1308        let signaling = Arc::new(ForceReconnectFakeSignalingClient::new(true));
1309        let processor = DefaultNetworkEventProcessor::new(signaling.clone(), None);
1310
1311        let result = processor.force_reconnect().await;
1312
1313        assert!(result.is_err());
1314        assert_eq!(
1315            signaling.disconnect_calls.load(AtomicOrdering::SeqCst),
1316            1,
1317            "ForceReconnect cleanup should disconnect signaling once"
1318        );
1319        assert_eq!(
1320            signaling.connect_once_calls.load(AtomicOrdering::SeqCst),
1321            1,
1322            "ForceReconnect restore should make one quick connect attempt"
1323        );
1324        assert_eq!(
1325            signaling
1326                .schedule_auto_reconnect_calls
1327                .load(AtomicOrdering::SeqCst),
1328            1,
1329            "failed ForceReconnect restore should wake the auto-reconnect manager"
1330        );
1331    }
1332}