Skip to main content

actr_hyper/lifecycle/
network_event.rs

1//! Network Event Handling Architecture
2//!
3//! This module defines the network event handling infrastructure.
4//!
5//! # Architecture Overview
6//!
7//! ```text
8//!        ┌─────────────────────────────────────────────┐
9//!        │ (FFI Path - Implemented)  (Actor Path - TODO)
10//!        ▼                                             ▼
11//! ┌──────────────────────────┐      ┌──────────────────────────┐
12//! │ NetworkEventHandle       │      │ Direct Proto Message     │
13//! │ • Platform FFI calls     │      │ • Actor call/tell        │
14//! │ • Send via channel       │      │ • Send to actor mailbox  │
15//! │ • Await result           │      │ • No handle needed       │
16//! └────────┬─────────────────┘      └──────┬───────────────────┘
17//!          │                               │
18//!          └───────────────┬───────────────┘
19//!                          │ Both trigger
20//!                          ▼
21//! ┌─────────────────────────────────────────────────────────┐
22//! │  ActrNode::network_event_loop()                         │
23//! │  • Receive event from channel (FFI path)                │
24//! │  • Or handle message directly (Actor path - TODO)       │
25//! │  • Delegate to NetworkEventProcessor                    │
26//! │  • Send result back via channel                         │
27//! └──────────────────────┬──────────────────────────────────┘
28//!                        │ Delegate
29//!                        ▼
30//! ┌─────────────────────────────────────────────────────────┐
31//! │  NetworkEventProcessor (Trait)                          │
32//! │                                                          │
33//! │  DefaultNetworkEventProcessor:                          │
34//! │  • reconcile settled network/app events                 │
35//! │  • execute one recovery action                          │
36//! │    └─► Offline / Probe / Restore / Cleanup / Reconnect  │
37//! └─────────────────────────────────────────────────────────┘
38//! ```
39//!
40//! # Key Components
41//!
42//! - **NetworkEvent**: Unified mobile network/app/command events
43//! - **NetworkEventResult**: Processing result with success/error/duration
44//! - **NetworkEventProcessor**: Trait for custom event handling logic
45//! - **DefaultNetworkEventProcessor**: Default implementation with signaling + WebRTC recovery
46//!
47//! # Usage Patterns
48//!
49//! ## 1. Platform FFI Call (Primary, Implemented)
50//! ```ignore
51//! // Platform layer calls NetworkEventHandle via FFI
52//! let network_handle = system.create_network_event_handle();
53//! let result = network_handle.handle_network_path_changed(snapshot).await?;
54//! if result.success {
55//!     println!("Processed in {}ms", result.duration_ms);
56//! }
57//! ```
58//!
59//! ## 2. Actor Proto Message (Optional, TODO)
60//! ```ignore
61//! // TODO: actors send proto message directly (not yet implemented)
62//! actor_ref.call(NetworkPathChangedMessage { snapshot }).await?;
63//! ```
64//!
65//! **Key Differences:**
66//! - FFI path: Uses NetworkEventHandle + channel (implemented)
67//! - Actor path: Direct proto message to mailbox (TODO, future enhancement)
68
69use std::sync::{
70    Arc,
71    atomic::{AtomicU64, Ordering},
72};
73use std::time::{Duration, Instant};
74
75use crate::transport::PeerTransport;
76use crate::wire::webrtc::{CleanupGuard, SignalingClient, WebRtcCoordinator};
77use tokio::sync::{mpsc, oneshot};
78use tokio_util::sync::CancellationToken;
79
80use super::connection_supervisor::{ConnectionFact, ConnectionSupervisor};
81
/// How long the reconciler keeps coalescing further queued events after the
/// first one arrives before it processes the batch.
const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
/// Result-wait timeout for a network event (not referenced in the visible part
/// of this file — presumably used by callers awaiting `result_tx`; confirm).
const NETWORK_EVENT_RESULT_TIMEOUT: Duration = Duration::from_millis(5_000);
/// Upper bound passed to `SignalingClient::probe_alive` when checking an
/// existing signaling WebSocket.
const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_millis(1_000);
85pub(super) const LONG_BACKGROUND_RECONNECT_THRESHOLD_MS: u64 = 30_000;
86static NEXT_NETWORK_EVENT_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
87
88/// Keeps outbound sends behind a network-event lifecycle barrier while the
89/// reconciler settles and processes a queued batch.
90pub struct NetworkEventBarrier {
91    _cleanup_guard: CleanupGuard,
92}
93
/// Point-in-time description of the mobile network path as reported by the
/// platform layer.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NetworkSnapshot {
    /// Sequence number attached by the reporter (presumably increasing per
    /// report — confirm with the platform layer).
    pub sequence: u64,
    /// Whether a usable path exists.
    pub availability: NetworkAvailability,
    /// Which transports are currently active.
    pub transport: NetworkTransportFlags,
    /// Platform "expensive" (e.g. metered) flag.
    pub is_expensive: bool,
    /// Platform "constrained" (e.g. low-data mode) flag.
    pub is_constrained: bool,
}

impl NetworkSnapshot {
    /// `true` only for `NetworkAvailability::Unavailable`; `Unknown` is not
    /// treated as offline.
    pub fn is_offline(&self) -> bool {
        self.availability == NetworkAvailability::Unavailable
    }

    /// `true` only for `NetworkAvailability::Available`; `Unknown` does not
    /// trigger a restore.
    pub fn should_restore(&self) -> bool {
        self.availability == NetworkAvailability::Available
    }
}

/// Whether the platform currently has a usable network path.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkAvailability {
    /// The platform has not reported a definite state.
    Unknown,
    /// A usable path exists.
    Available,
    /// No usable path exists.
    Unavailable,
}

/// Active network transport flags; several may be set simultaneously
/// (e.g. Wi-Fi plus VPN). `Default` yields all flags cleared.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct NetworkTransportFlags {
    pub wifi: bool,
    pub cellular: bool,
    pub ethernet: bool,
    pub vpn: bool,
    pub other: bool,
}
131
/// App lifecycle state relevant to connection recovery.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AppLifecycleState {
    /// The app moved to the background.
    Background,
    /// The app returned to the foreground after roughly
    /// `background_duration_ms` milliseconds in the background (per the
    /// field name).
    Foreground { background_duration_ms: u64 },
}

/// Why a cleanup-only operation was requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CleanupReason {
    /// The app is about to exit.
    AppTerminating,
    /// The user logged out.
    UserLogout,
    /// Existing connections are suspected to be stale.
    StaleConnectionSuspected,
    /// An explicit reset was requested.
    ManualReset,
}

/// Why a forced cleanup-plus-restore operation was requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReconnectReason {
    /// The platform network path changed.
    NetworkPathChanged,
    /// The app spent a long time in the background.
    LongBackground,
    /// A connectivity probe failed.
    ProbeFailed,
    /// An explicit reconnect was requested.
    ManualReconnect,
    /// Existing connections are suspected to be stale.
    StaleConnectionSuspected,
}
157
158/// Network event type
159#[derive(Debug, Clone, PartialEq, Eq, Hash)]
160pub enum NetworkEvent {
161    /// Full mobile network path changed.
162    NetworkPathChanged { snapshot: NetworkSnapshot },
163
164    /// App lifecycle changed.
165    AppLifecycleChanged { state: AppLifecycleState },
166
167    /// Proactively clean up all connections
168    ///
169    /// Used for app lifecycle management scenarios:
170    /// - App entering background
171    /// - User actively logging out
172    /// - App about to exit
173    CleanupConnections { reason: CleanupReason },
174
175    /// Proactively clean up and restore connections.
176    ForceReconnect { reason: ReconnectReason },
177}
178
/// The single action chosen after reconciling a settled batch of network
/// events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkRecoveryAction {
    /// Do nothing.
    Noop,
    /// Treat the network as lost.
    Offline,
    /// Check existing connectivity without forcing cleanup.
    Probe,
    /// Bring connections back up.
    Restore,
    /// Tear connections down without restoring them.
    CleanupOnly,
    /// Tear connections down, then restore them.
    ForceReconnect,
}
189
190/// Network event processing result
191#[derive(Debug, Clone)]
192pub struct NetworkEventResult {
193    /// Event type
194    pub event: NetworkEvent,
195
196    /// Whether processing succeeded
197    pub success: bool,
198
199    /// Error message (if failed)
200    pub error: Option<String>,
201
202    /// Processing duration (milliseconds)
203    pub duration_ms: u64,
204}
205
206impl NetworkEventResult {
207    pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
208        Self {
209            event,
210            success: true,
211            error: None,
212            duration_ms,
213        }
214    }
215
216    pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
217        Self {
218            event,
219            success: false,
220            error: Some(error),
221            duration_ms,
222        }
223    }
224}
225
226/// Network event processor trait
227///
228/// Defines the processing logic for network events; can be custom-implemented by users
229#[async_trait::async_trait]
230pub trait NetworkEventProcessor: Send + Sync {
231    /// Enter a lifecycle barrier as soon as a queued event is observed by the
232    /// reconciler. The default is no barrier for custom processors.
233    fn begin_network_event_barrier(&self, _event: &NetworkEvent) -> Option<NetworkEventBarrier> {
234        None
235    }
236
237    /// Process network available event
238    ///
239    /// # Returns
240    /// - `Ok(())`: processing succeeded
241    /// - `Err(String)`: processing failed, contains error message
242    async fn process_network_available(&self) -> Result<(), String>;
243
244    /// Process network lost event
245    ///
246    /// # Returns
247    /// - `Ok(())`: processing succeeded
248    /// - `Err(String)`: processing failed, contains error message
249    async fn process_network_lost(&self) -> Result<(), String>;
250
251    /// Process network type changed event
252    ///
253    /// # Returns
254    /// - `Ok(())`: processing succeeded
255    /// - `Err(String)`: processing failed, contains error message
256    async fn process_network_type_changed(
257        &self,
258        is_wifi: bool,
259        is_cellular: bool,
260    ) -> Result<(), String>;
261
262    /// Proactively clean up all connections
263    ///
264    /// This method proactively cleans up all network connections. Applicable scenarios:
265    /// - App entering background (iOS/Android)
266    /// - User actively logging out
267    /// - App about to exit
268    /// - Need to reset network state
269    ///
270    /// # FFI Binding Note
271    ///
272    /// This method is specifically designed for FFI bindings, allowing upper-layer
273    /// platform code (Swift/Kotlin) to proactively manage connection lifecycle
274    /// through the unified `NetworkEventProcessor` interface.
275    ///
276    /// # Difference from Event Response
277    ///
278    /// - `process_network_lost()`: passively responds to network disconnection events
279    /// - `cleanup_connections()`: proactively cleans up connections (independent of network events)
280    ///
281    /// # Returns
282    /// - `Ok(())`: cleanup succeeded
283    /// - `Err(String)`: cleanup failed, contains error message
284    async fn cleanup_connections(&self) -> Result<(), String>;
285
286    /// Probe existing connectivity without forcing cleanup.
287    async fn probe_connectivity(&self) -> Result<(), String> {
288        Ok(())
289    }
290
291    /// Proactively clean up and restore connections.
292    async fn force_reconnect(&self) -> Result<(), String> {
293        self.cleanup_connections().await?;
294        self.process_network_available().await
295    }
296
297    /// Process the final action selected from a settled event batch.
298    ///
299    /// Custom processors can rely on the default mapping. The default runtime
300    /// processor overrides this to bypass per-event debounce after reconciliation.
301    async fn process_network_recovery_action(
302        &self,
303        action: NetworkRecoveryAction,
304    ) -> Result<(), String> {
305        match action {
306            NetworkRecoveryAction::Noop => Ok(()),
307            NetworkRecoveryAction::Offline => self.process_network_lost().await,
308            NetworkRecoveryAction::Probe => self.probe_connectivity().await,
309            NetworkRecoveryAction::Restore => self.process_network_available().await,
310            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
311            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
312        }
313    }
314}
315
/// Debounce settings for the per-event `process_network_*` handlers.
#[derive(Debug, Clone)]
pub struct DebounceConfig {
    /// Repeats of the same event kind arriving within this window of the last
    /// accepted one are ignored.
    pub window: Duration,
}

impl Default for DebounceConfig {
    /// A two-second window.
    fn default() -> Self {
        let window = Duration::from_millis(2_000);
        Self { window }
    }
}
331
332/// Debounce state tracking
333#[derive(Debug)]
334struct DebounceState {
335    last_available: tokio::sync::Mutex<Option<Instant>>,
336    last_lost: tokio::sync::Mutex<Option<Instant>>,
337    last_type_changed: tokio::sync::Mutex<Option<Instant>>,
338}
339
340impl DebounceState {
341    fn new() -> Self {
342        Self {
343            last_available: tokio::sync::Mutex::new(None),
344            last_lost: tokio::sync::Mutex::new(None),
345            last_type_changed: tokio::sync::Mutex::new(None),
346        }
347    }
348}
349
350#[derive(Debug, Clone, Copy, PartialEq, Eq)]
351enum DebounceEvent {
352    Available,
353    Lost,
354    TypeChanged,
355}
356
357#[derive(Debug)]
358struct SignalingRecoveryState {
359    connect_lock: tokio::sync::Mutex<()>,
360    last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
361}
362
363impl SignalingRecoveryState {
364    fn new() -> Self {
365        Self {
366            connect_lock: tokio::sync::Mutex::new(()),
367            last_successful_connect: tokio::sync::Mutex::new(None),
368        }
369    }
370}
371
372/// Default network event processor implementation
373pub struct DefaultNetworkEventProcessor {
374    signaling_client: Arc<dyn SignalingClient>,
375    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
376    peer_transport: Option<Arc<PeerTransport>>,
377    debounce_config: DebounceConfig,
378    debounce_state: Arc<DebounceState>,
379    recovery_state: Arc<SignalingRecoveryState>,
380}
381
382impl DefaultNetworkEventProcessor {
383    pub fn new(
384        signaling_client: Arc<dyn SignalingClient>,
385        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
386    ) -> Self {
387        Self::new_with_debounce_and_peer_transport(
388            signaling_client,
389            webrtc_coordinator,
390            DebounceConfig::default(),
391            None,
392        )
393    }
394
395    pub fn new_with_debounce(
396        signaling_client: Arc<dyn SignalingClient>,
397        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
398        debounce_config: DebounceConfig,
399    ) -> Self {
400        Self::new_with_debounce_and_peer_transport(
401            signaling_client,
402            webrtc_coordinator,
403            debounce_config,
404            None,
405        )
406    }
407
408    pub(crate) fn new_with_peer_transport(
409        signaling_client: Arc<dyn SignalingClient>,
410        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
411        peer_transport: Option<Arc<PeerTransport>>,
412    ) -> Self {
413        Self::new_with_debounce_and_peer_transport(
414            signaling_client,
415            webrtc_coordinator,
416            DebounceConfig::default(),
417            peer_transport,
418        )
419    }
420
421    pub(crate) fn new_with_debounce_and_peer_transport(
422        signaling_client: Arc<dyn SignalingClient>,
423        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
424        debounce_config: DebounceConfig,
425        peer_transport: Option<Arc<PeerTransport>>,
426    ) -> Self {
427        Self {
428            signaling_client,
429            webrtc_coordinator,
430            peer_transport,
431            debounce_config,
432            debounce_state: Arc::new(DebounceState::new()),
433            recovery_state: Arc::new(SignalingRecoveryState::new()),
434        }
435    }
436
437    /// Check whether an event should be filtered by debounce
438    ///
439    /// # Returns
440    /// - `true`: the event should be processed
441    /// - `false`: the event is within the debounce window and should be ignored
442    async fn should_process_event(&self, event: DebounceEvent) -> bool {
443        let now = Instant::now();
444
445        match event {
446            DebounceEvent::Available => {
447                let mut last = self.debounce_state.last_available.lock().await;
448                if let Some(last_time) = *last {
449                    if now.duration_since(last_time) < self.debounce_config.window {
450                        tracing::debug!(
451                            "⏸️  Debouncing Network Available event (last event was {:?} ago)",
452                            now.duration_since(last_time)
453                        );
454                        return false;
455                    }
456                }
457                *last = Some(now);
458                true
459            }
460            DebounceEvent::Lost => {
461                let mut last = self.debounce_state.last_lost.lock().await;
462                if let Some(last_time) = *last {
463                    if now.duration_since(last_time) < self.debounce_config.window {
464                        tracing::debug!(
465                            "⏸️  Debouncing Network Lost event (last event was {:?} ago)",
466                            now.duration_since(last_time)
467                        );
468                        return false;
469                    }
470                }
471                *last = Some(now);
472                true
473            }
474            DebounceEvent::TypeChanged => {
475                let mut last = self.debounce_state.last_type_changed.lock().await;
476                if let Some(last_time) = *last {
477                    if now.duration_since(last_time) < self.debounce_config.window {
478                        tracing::debug!(
479                            "⏸️  Debouncing Network TypeChanged event (last event was {:?} ago)",
480                            now.duration_since(last_time)
481                        );
482                        return false;
483                    }
484                }
485                *last = Some(now);
486                true
487            }
488        }
489    }
490
491    async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
492        let _guard = self.recovery_state.connect_lock.lock().await;
493
494        if !self.signaling_client.is_connected() {
495            tracing::info!(reason = reason, "🔄 Connecting signaling");
496            self.signaling_client.connect_once().await.map_err(|e| {
497                let err_msg = format!("WebSocket connect failed: {}", e);
498                tracing::error!("❌ {}", err_msg);
499                err_msg
500            })?;
501
502            *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
503            tracing::info!(reason = reason, "✅ Signaling connected");
504            return Ok(());
505        }
506
507        tracing::debug!(
508            reason = reason,
509            timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
510            "🔎 Probing existing signaling WebSocket"
511        );
512
513        match self
514            .signaling_client
515            .probe_alive(SIGNALING_PROBE_TIMEOUT)
516            .await
517        {
518            Ok(()) => {
519                tracing::debug!(
520                    reason = reason,
521                    "✅ Signaling probe succeeded; keeping existing WebSocket"
522                );
523                Ok(())
524            }
525            Err(e) => {
526                tracing::warn!(
527                    reason = reason,
528                    "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
529                    e
530                );
531
532                if let Err(disconnect_err) = self.signaling_client.disconnect().await {
533                    tracing::warn!(
534                        reason = reason,
535                        "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
536                        disconnect_err
537                    );
538                }
539
540                tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
541                self.signaling_client
542                    .connect_once()
543                    .await
544                    .map_err(|connect_err| {
545                        let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
546                        tracing::error!("❌ {}", err_msg);
547                        err_msg
548                    })?;
549
550                *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
551                tracing::info!(reason = reason, "✅ Signaling rebuilt");
552                Ok(())
553            }
554        }
555    }
556
557    async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
558        let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
559            coordinator.begin_network_recovery(reason).await
560        } else {
561            Vec::new()
562        };
563
564        self.ensure_signaling_healthy_once(reason).await?;
565
566        let coordinator = self.webrtc_coordinator.clone();
567
568        if let Some(coordinator) = coordinator {
569            if recovery_targets.is_empty() {
570                tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
571            } else {
572                tracing::info!("♻️ Triggering ICE restart for recovering connections...");
573            }
574            coordinator.restart_network_recovery_connections().await;
575        }
576
577        Ok(())
578    }
579
580    async fn probe_or_restore(&self, reason: &str) -> Result<(), String> {
581        match self.probe_connectivity().await {
582            Ok(()) => Ok(()),
583            Err(e) => {
584                tracing::warn!(
585                    reason = reason,
586                    "Connectivity probe failed; restoring connections: {}",
587                    e
588                );
589                self.restore_signaling_and_webrtc(reason).await
590            }
591        }
592    }
593
594    async fn process_offline(&self) -> Result<(), String> {
595        tracing::info!("📱 Processing: Network offline");
596
597        if let Some(ref coordinator) = self.webrtc_coordinator {
598            coordinator.begin_network_recovery("NetworkLost").await;
599            tracing::info!("🧹 Clearing pending ICE restart attempts...");
600            coordinator.clear_pending_restarts().await;
601        }
602
603        tracing::info!("🔌 Disconnecting WebSocket...");
604        let _ = self.signaling_client.disconnect().await;
605
606        Ok(())
607    }
608}
609
610#[async_trait::async_trait]
611impl NetworkEventProcessor for DefaultNetworkEventProcessor {
612    fn begin_network_event_barrier(&self, event: &NetworkEvent) -> Option<NetworkEventBarrier> {
613        match event {
614            NetworkEvent::NetworkPathChanged { .. }
615            | NetworkEvent::AppLifecycleChanged { .. }
616            | NetworkEvent::CleanupConnections { .. }
617            | NetworkEvent::ForceReconnect { .. } => {
618                self.webrtc_coordinator
619                    .as_ref()
620                    .map(|coordinator| NetworkEventBarrier {
621                        _cleanup_guard: coordinator.cleanup_guard(),
622                    })
623            }
624        }
625    }
626
627    /// Process network available event
628    async fn process_network_available(&self) -> Result<(), String> {
629        // Debounce check
630        let should_process = self.should_process_event(DebounceEvent::Available).await;
631        if !should_process && self.signaling_client.is_connected() {
632            return Ok(());
633        }
634
635        tracing::info!("📱 Processing: Network available");
636
637        self.restore_signaling_and_webrtc("NetworkAvailable").await
638    }
639
640    /// Process network lost event
641    async fn process_network_lost(&self) -> Result<(), String> {
642        // Debounce check
643        if !self.should_process_event(DebounceEvent::Lost).await {
644            return Ok(());
645        }
646
647        self.process_offline().await
648    }
649
650    /// Process network type changed event
651    async fn process_network_type_changed(
652        &self,
653        is_wifi: bool,
654        is_cellular: bool,
655    ) -> Result<(), String> {
656        // Debounce check
657        let should_process = self.should_process_event(DebounceEvent::TypeChanged).await;
658        if !should_process && self.signaling_client.is_connected() {
659            return Ok(());
660        }
661
662        tracing::info!(
663            "📱 Processing: Network type changed (WiFi={}, Cellular={})",
664            is_wifi,
665            is_cellular
666        );
667
668        self.restore_signaling_and_webrtc("NetworkTypeChanged")
669            .await
670    }
671
672    /// Proactively clean up all connections
673    ///
674    /// Differs from `process_network_lost()`:
675    /// - No debounce check (proactive calls always execute)
676    /// - Intended for app lifecycle management, not network event response
677    async fn cleanup_connections(&self) -> Result<(), String> {
678        let _cleanup_guard = self
679            .webrtc_coordinator
680            .as_ref()
681            .map(|coordinator| coordinator.cleanup_guard());
682
683        tracing::info!("🧹 Manually cleaning up all connections...");
684
685        // Step 1: Clear pending ICE restart attempts
686        if let Some(ref coordinator) = self.webrtc_coordinator {
687            tracing::info!("♻️  Clearing pending ICE restart attempts...");
688            coordinator.clear_pending_restarts().await;
689        }
690
691        // Step 2: Cancel PeerTransport singleflight and close established transports.
692        if let Some(ref peer_transport) = self.peer_transport {
693            tracing::info!("🔻 Closing all PeerTransport connections...");
694            if let Err(e) = peer_transport.close_all().await {
695                let err_msg = format!("Failed to close peer transports: {}", e);
696                tracing::warn!("⚠️  {}", err_msg);
697                // Do not fail the whole cleanup; continue releasing other resources.
698            } else {
699                tracing::info!("✅ All PeerTransport connections closed");
700            }
701        }
702
703        // Step 3: Close all WebRTC peer connections
704        if let Some(ref coordinator) = self.webrtc_coordinator {
705            tracing::info!("🔻 Closing all WebRTC peer connections...");
706            if let Err(e) = coordinator.close_all_peers().await {
707                let err_msg = format!("Failed to close all peers: {}", e);
708                tracing::warn!("⚠️  {}", err_msg);
709                // Do not fail the whole cleanup; continue releasing other resources.
710            } else {
711                tracing::info!("✅ All WebRTC peer connections closed");
712            }
713        }
714
715        // Step 4: Proactively disconnect the WebSocket.
716        tracing::info!("🔌 Disconnecting WebSocket...");
717        match self.signaling_client.disconnect().await {
718            Ok(_) => {
719                tracing::info!("✅ WebSocket disconnected successfully");
720            }
721            Err(e) => {
722                let err_msg = format!("Failed to disconnect WebSocket: {}", e);
723                tracing::warn!("⚠️  {}", err_msg);
724                // Do not fail the whole cleanup; continue releasing other resources.
725            }
726        }
727
728        tracing::info!("✅ Connection cleanup completed");
729
730        Ok(())
731    }
732
733    async fn probe_connectivity(&self) -> Result<(), String> {
734        self.signaling_client
735            .probe_alive(SIGNALING_PROBE_TIMEOUT)
736            .await
737            .map_err(|e| format!("Signaling probe failed: {}", e))
738    }
739
740    async fn force_reconnect(&self) -> Result<(), String> {
741        self.cleanup_connections().await?;
742        self.restore_signaling_and_webrtc("ForceReconnect").await
743    }
744
745    async fn process_network_recovery_action(
746        &self,
747        action: NetworkRecoveryAction,
748    ) -> Result<(), String> {
749        match action {
750            NetworkRecoveryAction::Noop => Ok(()),
751            NetworkRecoveryAction::Offline => self.process_offline().await,
752            NetworkRecoveryAction::Probe => self.probe_or_restore("Probe").await,
753            NetworkRecoveryAction::Restore => {
754                self.restore_signaling_and_webrtc("NetworkEventBatch").await
755            }
756            NetworkRecoveryAction::CleanupOnly => self.cleanup_connections().await,
757            NetworkRecoveryAction::ForceReconnect => self.force_reconnect().await,
758        }
759    }
760}
761
762pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
763    ConnectionSupervisor::select_action(events)
764}
765
766pub async fn process_network_event_batch(
767    events: Vec<NetworkEvent>,
768    processor: Arc<dyn NetworkEventProcessor>,
769) -> Vec<NetworkEventResult> {
770    if events.is_empty() {
771        return Vec::new();
772    }
773
774    let action = select_network_recovery_action(&events);
775    let start = Instant::now();
776
777    tracing::info!(
778        event_count = events.len(),
779        action = ?action,
780        "network_event.action.start"
781    );
782
783    let result = processor.process_network_recovery_action(action).await;
784
785    let duration_ms = start.elapsed().as_millis() as u64;
786    match &result {
787        Ok(()) => tracing::info!(
788            event_count = events.len(),
789            action = ?action,
790            duration_ms,
791            "network_event.action.completed"
792        ),
793        Err(e) => tracing::warn!(
794            event_count = events.len(),
795            action = ?action,
796            duration_ms,
797            error = %e,
798            "network_event.action.completed"
799        ),
800    }
801
802    events
803        .into_iter()
804        .map(|event| match &result {
805            Ok(()) => NetworkEventResult::success(event, duration_ms),
806            Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
807        })
808        .collect()
809}
810
811pub struct NetworkEventRequest {
812    pub event: NetworkEvent,
813    pub result_tx: oneshot::Sender<NetworkEventResult>,
814}
815
816pub async fn run_network_event_reconciler(
817    mut event_rx: mpsc::Receiver<NetworkEventRequest>,
818    processor: Arc<dyn NetworkEventProcessor>,
819    shutdown_token: CancellationToken,
820) {
821    tracing::info!("🔄 Network event reconciler started");
822
823    loop {
824        tokio::select! {
825            Some(first_request) = event_rx.recv() => {
826                tracing::debug!(
827                    event = ?first_request.event,
828                    "network_event.reconciler.received"
829                );
830                let mut event_barrier = processor.begin_network_event_barrier(&first_request.event);
831                let mut requests = vec![first_request];
832                let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
833                tokio::pin!(settle);
834
835                loop {
836                    tokio::select! {
837                        Some(next_request) = event_rx.recv() => {
838                            tracing::debug!(
839                                event = ?next_request.event,
840                                "network_event.reconciler.coalesced"
841                            );
842                            if event_barrier.is_none() {
843                                event_barrier = processor.begin_network_event_barrier(&next_request.event);
844                            }
845                            requests.push(next_request);
846                        }
847                        _ = &mut settle => {
848                            break;
849                        }
850                        _ = shutdown_token.cancelled() => {
851                            tracing::info!("🛑 Network event reconciler shutting down");
852                            return;
853                        }
854                        else => {
855                            break;
856                        }
857                    }
858                }
859
860                while let Ok(next_request) = event_rx.try_recv() {
861                    tracing::debug!(
862                        event = ?next_request.event,
863                        "network_event.reconciler.coalesced"
864                    );
865                    if event_barrier.is_none() {
866                        event_barrier = processor.begin_network_event_barrier(&next_request.event);
867                    }
868                    requests.push(next_request);
869                }
870
871                let events = requests
872                    .iter()
873                    .map(|request| request.event.clone())
874                    .collect::<Vec<_>>();
875                let action = select_network_recovery_action(&events);
876                let facts = events
877                    .iter()
878                    .map(ConnectionFact::from_network_event)
879                    .collect::<Vec<_>>();
880                tracing::info!(
881                    event_count = events.len(),
882                    action = ?action,
883                    events = ?events,
884                    facts = ?facts,
885                    settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
886                    "network_event.reconciler.batch_reconciled"
887                );
888
889                let results = process_network_event_batch(events, processor.clone()).await;
890                drop(event_barrier);
891
892                for (request, result) in requests.into_iter().zip(results) {
893                    if request.result_tx.send(result).is_err() {
894                        tracing::debug!("Network event caller dropped before receiving result");
895                    }
896                }
897            }
898            _ = shutdown_token.cancelled() => {
899                tracing::info!("🛑 Network event reconciler shutting down");
900                break;
901            }
902            else => break,
903        }
904    }
905}
906
907/// Network Event Handle
908///
909/// Lightweight handle for sending network events and receiving processing results.
910/// Created before `ActrNode::start()` to bridge platform network events.
911pub struct NetworkEventHandle {
912    /// Event sender (to ActrNode)
913    event_tx: mpsc::Sender<NetworkEventRequest>,
914    result_timeout: Duration,
915}
916
917impl NetworkEventHandle {
918    /// Create a new NetworkEventHandle
919    pub fn new(event_tx: mpsc::Sender<NetworkEventRequest>) -> Self {
920        Self::new_with_result_timeout(event_tx, NETWORK_EVENT_RESULT_TIMEOUT)
921    }
922
923    /// Create a new NetworkEventHandle with a custom result timeout.
924    ///
925    /// Production bindings use [`NetworkEventHandle::new`]. Tests can use this
926    /// constructor to verify bounded waiting without sleeping for the full
927    /// binding timeout.
928    pub fn new_with_result_timeout(
929        event_tx: mpsc::Sender<NetworkEventRequest>,
930        result_timeout: Duration,
931    ) -> Self {
932        Self {
933            event_tx,
934            result_timeout,
935        }
936    }
937
938    /// Handle full network path changes.
939    pub async fn handle_network_path_changed(
940        &self,
941        snapshot: NetworkSnapshot,
942    ) -> Result<NetworkEventResult, String> {
943        self.send_event_and_await_result(NetworkEvent::NetworkPathChanged { snapshot })
944            .await
945    }
946
947    /// Handle app lifecycle changes.
948    pub async fn handle_app_lifecycle_changed(
949        &self,
950        state: AppLifecycleState,
951    ) -> Result<NetworkEventResult, String> {
952        self.send_event_and_await_result(NetworkEvent::AppLifecycleChanged { state })
953            .await
954    }
955
956    /// Proactively clean up all connections with a reason. This never reconnects.
957    pub async fn cleanup_connections(
958        &self,
959        reason: CleanupReason,
960    ) -> Result<NetworkEventResult, String> {
961        self.send_event_and_await_result(NetworkEvent::CleanupConnections { reason })
962            .await
963    }
964
965    /// Force cleanup and reconnect.
966    pub async fn force_reconnect(
967        &self,
968        reason: ReconnectReason,
969    ) -> Result<NetworkEventResult, String> {
970        self.send_event_and_await_result(NetworkEvent::ForceReconnect { reason })
971            .await
972    }
973
974    /// Send event and await result (internal helper)
975    async fn send_event_and_await_result(
976        &self,
977        event: NetworkEvent,
978    ) -> Result<NetworkEventResult, String> {
979        let event_request_id = NEXT_NETWORK_EVENT_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
980        let start = Instant::now();
981        let (result_tx, result_rx) = oneshot::channel();
982        let request = NetworkEventRequest {
983            event: event.clone(),
984            result_tx,
985        };
986
987        tracing::info!(
988            event_request_id,
989            event = ?event,
990            result_timeout_ms = self.result_timeout.as_millis() as u64,
991            "network_event.handle.enqueue"
992        );
993
994        if let Err(e) = self.event_tx.send(request).await {
995            let err = format!("Failed to send network event: {}", e);
996            tracing::warn!(
997                event_request_id,
998                event = ?event,
999                error = %err,
1000                "network_event.handle.enqueue_failed"
1001            );
1002            return Err(err);
1003        }
1004
1005        let result = match tokio::time::timeout(self.result_timeout, result_rx).await {
1006            Ok(Ok(result)) => Ok(result),
1007            Ok(Err(_)) => Err("Failed to receive network event result".to_string()),
1008            Err(_) => Err(format!(
1009                "Timed out waiting for network event result after {}ms",
1010                self.result_timeout.as_millis()
1011            )),
1012        };
1013
1014        let wait_ms = start.elapsed().as_millis() as u64;
1015        match &result {
1016            Ok(result) if result.success => tracing::info!(
1017                event_request_id,
1018                event = ?event,
1019                result_event = ?result.event,
1020                duration_ms = result.duration_ms,
1021                wait_ms,
1022                "network_event.handle.result_received"
1023            ),
1024            Ok(result) => tracing::warn!(
1025                event_request_id,
1026                event = ?event,
1027                result_event = ?result.event,
1028                duration_ms = result.duration_ms,
1029                wait_ms,
1030                error = ?result.error,
1031                "network_event.handle.result_received"
1032            ),
1033            Err(e) => tracing::warn!(
1034                event_request_id,
1035                event = ?event,
1036                wait_ms,
1037                error = %e,
1038                "network_event.handle.result_failed"
1039            ),
1040        }
1041
1042        result
1043    }
1044}
1045
1046impl Clone for NetworkEventHandle {
1047    fn clone(&self) -> Self {
1048        Self {
1049            event_tx: self.event_tx.clone(),
1050            result_timeout: self.result_timeout,
1051        }
1052    }
1053}