Skip to main content

actr_hyper/lifecycle/
network_event.rs

1//! Network Event Handling Architecture
2//!
3//! This module defines the network event handling infrastructure.
4//!
5//! # Architecture Overview
6//!
7//! ```text
8//!        ┌─────────────────────────────────────────────┐
9//!        │ (FFI Path - Implemented)  (Actor Path - TODO)
10//!        ▼                                             ▼
11//! ┌──────────────────────────┐      ┌──────────────────────────┐
12//! │ NetworkEventHandle       │      │ Direct Proto Message     │
13//! │ • Platform FFI calls     │      │ • Actor call/tell        │
14//! │ • Send via channel       │      │ • Send to actor mailbox  │
15//! │ • Await result           │      │ • No handle needed       │
16//! └────────┬─────────────────┘      └──────┬───────────────────┘
17//!          │                               │
18//!          └───────────────┬───────────────┘
19//!                          │ Both trigger
20//!                          ▼
21//! ┌─────────────────────────────────────────────────────────┐
22//! │  ActrNode::network_event_loop()                         │
23//! │  • Receive event from channel (FFI path)                │
24//! │  • Or handle message directly (Actor path - TODO)       │
25//! │  • Delegate to NetworkEventProcessor                    │
26//! │  • Send result back via channel                         │
27//! └──────────────────────┬──────────────────────────────────┘
28//!                        │ Delegate
29//!                        ▼
30//! ┌─────────────────────────────────────────────────────────┐
31//! │  NetworkEventProcessor (Trait)                          │
32//! │                                                          │
33//! │  DefaultNetworkEventProcessor:                          │
34//! │  • process_network_available()                          │
35//! │    └─► Reconnect signaling + ICE restart                │
36//! │  • process_network_lost()                               │
37//! │    └─► Clear pending + disconnect                       │
38//! │  • process_network_type_changed()                       │
//! │    └─► Reconnect signaling + ICE restart                │
40//! └─────────────────────────────────────────────────────────┘
41//! ```
42//!
43//! # Key Components
44//!
//! - **NetworkEvent**: Event types (Available, Lost, TypeChanged, CleanupConnections)
46//! - **NetworkEventResult**: Processing result with success/error/duration
47//! - **NetworkEventProcessor**: Trait for custom event handling logic
48//! - **DefaultNetworkEventProcessor**: Default implementation with signaling + WebRTC recovery
49//!
50//! # Usage Patterns
51//!
52//! ## 1. Platform FFI Call (Primary, Implemented)
53//! ```ignore
54//! // Platform layer calls NetworkEventHandle via FFI
55//! let network_handle = system.create_network_event_handle();
56//! let result = network_handle.handle_network_available().await?;
57//! if result.success {
58//!     println!("✅ Processed in {}ms", result.duration_ms);
59//! }
60//! ```
61//!
62//! ## 2. Actor Proto Message (Optional, TODO)
63//! ```ignore
64//! // TODO: actors send proto message directly (not yet implemented)
65//! actor_ref.call(NetworkAvailableMessage).await?;
66//! ```
67//!
68//! **Key Differences:**
69//! - FFI path: Uses NetworkEventHandle + channel (implemented)
70//! - Actor path: Direct proto message to mailbox (TODO, future enhancement)
71
72use std::sync::Arc;
73use std::time::{Duration, Instant};
74
75use crate::wire::webrtc::{SignalingClient, WebRtcCoordinator};
76use tokio_util::sync::CancellationToken;
77
78const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
79const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
80
/// Network event type
///
/// Describes a connectivity change reported to the runtime, or an explicit
/// request to clean up connections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum NetworkEvent {
    /// Network available (recovered from disconnection)
    Available,

    /// Network lost (disconnected)
    Lost,

    /// Network type changed (WiFi <-> Cellular)
    ///
    /// NOTE(review): `is_wifi` / `is_cellular` presumably describe the network
    /// that is active after the change — confirm against the platform caller.
    TypeChanged { is_wifi: bool, is_cellular: bool },

    /// Proactively clean up all connections
    ///
    /// Used for app lifecycle management scenarios:
    /// - App entering background
    /// - User actively logging out
    /// - App about to exit
    CleanupConnections,
}
101
/// Final action selected from a settled batch of network events.
///
/// Produced by `select_network_recovery_action` and consumed by
/// `NetworkEventProcessor::process_network_recovery_action`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkRecoveryAction {
    /// Nothing to do (selected only when the batch contains no events).
    Noop,
    /// The last state event in the batch was `NetworkEvent::Lost`.
    Offline,
    /// The last state event in the batch was `NetworkEvent::Available` or
    /// `NetworkEvent::TypeChanged`.
    Restore,
    /// The batch contained at least one `NetworkEvent::CleanupConnections`;
    /// this takes precedence over any state event in the same batch.
    CleanupConnectionsCompat,
}
110
/// Network event processing result
///
/// One result is produced per submitted event. When events are processed as
/// a batch (see `process_network_event_batch`), every event in the batch
/// shares the outcome and duration of the single action that was executed.
#[derive(Debug, Clone)]
pub struct NetworkEventResult {
    /// Event type
    pub event: NetworkEvent,

    /// Whether processing succeeded
    pub success: bool,

    /// Error message (if failed)
    pub error: Option<String>,

    /// Processing duration (milliseconds)
    pub duration_ms: u64,
}
126
127impl NetworkEventResult {
128    pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
129        Self {
130            event,
131            success: true,
132            error: None,
133            duration_ms,
134        }
135    }
136
137    pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
138        Self {
139            event,
140            success: false,
141            error: Some(error),
142            duration_ms,
143        }
144    }
145}
146
/// Network event processor trait
///
/// Defines the processing logic for network events; can be custom-implemented by users.
///
/// Implementors must provide the four per-event methods. The provided
/// `process_network_recovery_action` maps a reconciled batch action onto those
/// methods and may be overridden.
#[async_trait::async_trait]
pub trait NetworkEventProcessor: Send + Sync {
    /// Process network available event
    ///
    /// # Returns
    /// - `Ok(())`: processing succeeded
    /// - `Err(String)`: processing failed, contains error message
    async fn process_network_available(&self) -> Result<(), String>;

    /// Process network lost event
    ///
    /// # Returns
    /// - `Ok(())`: processing succeeded
    /// - `Err(String)`: processing failed, contains error message
    async fn process_network_lost(&self) -> Result<(), String>;

    /// Process network type changed event
    ///
    /// # Returns
    /// - `Ok(())`: processing succeeded
    /// - `Err(String)`: processing failed, contains error message
    async fn process_network_type_changed(
        &self,
        is_wifi: bool,
        is_cellular: bool,
    ) -> Result<(), String>;

    /// Proactively clean up all connections
    ///
    /// This method proactively cleans up all network connections. Applicable scenarios:
    /// - App entering background (iOS/Android)
    /// - User actively logging out
    /// - App about to exit
    /// - Need to reset network state
    ///
    /// # FFI Binding Note
    ///
    /// This method is specifically designed for FFI bindings, allowing upper-layer
    /// platform code (Swift/Kotlin) to proactively manage connection lifecycle
    /// through the unified `NetworkEventProcessor` interface.
    ///
    /// # Difference from Event Response
    ///
    /// - `process_network_lost()`: passively responds to network disconnection events
    /// - `cleanup_connections()`: proactively cleans up connections (independent of network events)
    ///
    /// # Returns
    /// - `Ok(())`: cleanup succeeded
    /// - `Err(String)`: cleanup failed, contains error message
    async fn cleanup_connections(&self) -> Result<(), String>;

    /// Process the final action selected from a settled event batch.
    ///
    /// Custom processors can rely on the default mapping. The default runtime
    /// processor overrides this to bypass per-event debounce after reconciliation.
    ///
    /// Default mapping:
    /// - `Noop`: returns `Ok(())` without doing anything
    /// - `Offline`: `process_network_lost()`
    /// - `Restore`: `process_network_available()`
    /// - `CleanupConnectionsCompat`: `cleanup_connections()`
    async fn process_network_recovery_action(
        &self,
        action: NetworkRecoveryAction,
    ) -> Result<(), String> {
        match action {
            NetworkRecoveryAction::Noop => Ok(()),
            NetworkRecoveryAction::Offline => self.process_network_lost().await,
            NetworkRecoveryAction::Restore => self.process_network_available().await,
            NetworkRecoveryAction::CleanupConnectionsCompat => self.cleanup_connections().await,
        }
    }
}
217
/// Debounce configuration
#[derive(Debug, Clone)]
pub struct DebounceConfig {
    /// Debounce time window (duplicate events within this window are ignored)
    pub window: Duration,
}

impl Default for DebounceConfig {
    /// Returns a configuration with a two-second debounce window.
    fn default() -> Self {
        let window = Duration::from_secs(2);
        Self { window }
    }
}
233
234/// Debounce state tracking
235#[derive(Debug)]
236struct DebounceState {
237    last_available: tokio::sync::Mutex<Option<Instant>>,
238    last_lost: tokio::sync::Mutex<Option<Instant>>,
239    last_type_changed: tokio::sync::Mutex<Option<Instant>>,
240}
241
242impl DebounceState {
243    fn new() -> Self {
244        Self {
245            last_available: tokio::sync::Mutex::new(None),
246            last_lost: tokio::sync::Mutex::new(None),
247            last_type_changed: tokio::sync::Mutex::new(None),
248        }
249    }
250}
251
252#[derive(Debug)]
253struct SignalingRecoveryState {
254    connect_lock: tokio::sync::Mutex<()>,
255    last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
256}
257
258impl SignalingRecoveryState {
259    fn new() -> Self {
260        Self {
261            connect_lock: tokio::sync::Mutex::new(()),
262            last_successful_connect: tokio::sync::Mutex::new(None),
263        }
264    }
265}
266
/// Default network event processor implementation
///
/// Reacts to network events by reconnecting or disconnecting the signaling
/// client and, when a WebRTC coordinator is configured, by driving its
/// network-recovery calls.
pub struct DefaultNetworkEventProcessor {
    // Signaling connection that is checked, probed, connected and disconnected.
    signaling_client: Arc<dyn SignalingClient>,
    // Optional: when `None`, all WebRTC recovery steps are skipped.
    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
    // Window within which repeated events of the same kind are debounced.
    debounce_config: DebounceConfig,
    // Last-accepted timestamps per event kind.
    debounce_state: Arc<DebounceState>,
    // Lock and timestamp used to serialize signaling connect attempts.
    recovery_state: Arc<SignalingRecoveryState>,
}
275
276impl DefaultNetworkEventProcessor {
277    pub fn new(
278        signaling_client: Arc<dyn SignalingClient>,
279        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
280    ) -> Self {
281        Self::new_with_debounce(
282            signaling_client,
283            webrtc_coordinator,
284            DebounceConfig::default(),
285        )
286    }
287
288    pub fn new_with_debounce(
289        signaling_client: Arc<dyn SignalingClient>,
290        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
291        debounce_config: DebounceConfig,
292    ) -> Self {
293        Self {
294            signaling_client,
295            webrtc_coordinator,
296            debounce_config,
297            debounce_state: Arc::new(DebounceState::new()),
298            recovery_state: Arc::new(SignalingRecoveryState::new()),
299        }
300    }
301
302    /// Check whether an event should be filtered by debounce
303    ///
304    /// # Returns
305    /// - `true`: the event should be processed
306    /// - `false`: the event is within the debounce window and should be ignored
307    async fn should_process_event(&self, event: &NetworkEvent) -> bool {
308        let now = Instant::now();
309
310        match event {
311            NetworkEvent::Available => {
312                let mut last = self.debounce_state.last_available.lock().await;
313                if let Some(last_time) = *last {
314                    if now.duration_since(last_time) < self.debounce_config.window {
315                        tracing::debug!(
316                            "⏸️  Debouncing Network Available event (last event was {:?} ago)",
317                            now.duration_since(last_time)
318                        );
319                        return false;
320                    }
321                }
322                *last = Some(now);
323                true
324            }
325            NetworkEvent::Lost => {
326                let mut last = self.debounce_state.last_lost.lock().await;
327                if let Some(last_time) = *last {
328                    if now.duration_since(last_time) < self.debounce_config.window {
329                        tracing::debug!(
330                            "⏸️  Debouncing Network Lost event (last event was {:?} ago)",
331                            now.duration_since(last_time)
332                        );
333                        return false;
334                    }
335                }
336                *last = Some(now);
337                true
338            }
339            NetworkEvent::TypeChanged { .. } => {
340                let mut last = self.debounce_state.last_type_changed.lock().await;
341                if let Some(last_time) = *last {
342                    if now.duration_since(last_time) < self.debounce_config.window {
343                        tracing::debug!(
344                            "⏸️  Debouncing Network TypeChanged event (last event was {:?} ago)",
345                            now.duration_since(last_time)
346                        );
347                        return false;
348                    }
349                }
350                *last = Some(now);
351                true
352            }
353            // CleanupConnections skips debounce check; proactive cleanup always executes immediately
354            NetworkEvent::CleanupConnections => {
355                tracing::debug!(
356                    "🧹 CleanupConnections event - no debouncing (always execute immediately)"
357                );
358                true
359            }
360        }
361    }
362
363    async fn ensure_signaling_connected_once(&self, reason: &str) -> Result<(), String> {
364        let _guard = self.recovery_state.connect_lock.lock().await;
365
366        if self.signaling_client.is_connected() {
367            tracing::debug!(
368                reason = reason,
369                "Signaling already connected, skipping connect"
370            );
371            return Ok(());
372        }
373
374        let recently_connected = {
375            let last = self.recovery_state.last_successful_connect.lock().await;
376            last.map(|instant| instant.elapsed() < Duration::from_millis(1500))
377                .unwrap_or(false)
378        };
379        if recently_connected && self.signaling_client.is_connected() {
380            tracing::debug!(
381                reason = reason,
382                "Signaling recently connected, reusing connection"
383            );
384            return Ok(());
385        }
386
387        tracing::info!(reason = reason, "🔄 Connecting signaling");
388        self.signaling_client.connect_once().await.map_err(|e| {
389            let err_msg = format!("WebSocket connect failed: {}", e);
390            tracing::error!("❌ {}", err_msg);
391            err_msg
392        })?;
393
394        *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
395        tracing::info!(reason = reason, "✅ Signaling connected");
396        Ok(())
397    }
398
399    async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
400        let _guard = self.recovery_state.connect_lock.lock().await;
401
402        if !self.signaling_client.is_connected() {
403            tracing::info!(reason = reason, "🔄 Connecting signaling");
404            self.signaling_client.connect_once().await.map_err(|e| {
405                let err_msg = format!("WebSocket connect failed: {}", e);
406                tracing::error!("❌ {}", err_msg);
407                err_msg
408            })?;
409
410            *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
411            tracing::info!(reason = reason, "✅ Signaling connected");
412            return Ok(());
413        }
414
415        tracing::debug!(
416            reason = reason,
417            timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
418            "🔎 Probing existing signaling WebSocket"
419        );
420
421        match self
422            .signaling_client
423            .probe_alive(SIGNALING_PROBE_TIMEOUT)
424            .await
425        {
426            Ok(()) => {
427                tracing::debug!(
428                    reason = reason,
429                    "✅ Signaling probe succeeded; keeping existing WebSocket"
430                );
431                Ok(())
432            }
433            Err(e) => {
434                tracing::warn!(
435                    reason = reason,
436                    "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
437                    e
438                );
439
440                if let Err(disconnect_err) = self.signaling_client.disconnect().await {
441                    tracing::warn!(
442                        reason = reason,
443                        "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
444                        disconnect_err
445                    );
446                }
447
448                tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
449                self.signaling_client
450                    .connect_once()
451                    .await
452                    .map_err(|connect_err| {
453                        let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
454                        tracing::error!("❌ {}", err_msg);
455                        err_msg
456                    })?;
457
458                *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
459                tracing::info!(reason = reason, "✅ Signaling rebuilt");
460                Ok(())
461            }
462        }
463    }
464
465    async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
466        let recovery_targets = if let Some(coordinator) = self.webrtc_coordinator.clone() {
467            coordinator.begin_network_recovery(reason).await
468        } else {
469            Vec::new()
470        };
471
472        self.ensure_signaling_healthy_once(reason).await?;
473
474        let coordinator = self.webrtc_coordinator.clone();
475
476        if let Some(coordinator) = coordinator {
477            if recovery_targets.is_empty() {
478                tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
479            } else {
480                tracing::info!("♻️ Triggering ICE restart for recovering connections...");
481            }
482            coordinator.restart_network_recovery_connections().await;
483        }
484
485        Ok(())
486    }
487
488    async fn process_offline(&self) -> Result<(), String> {
489        tracing::info!("📱 Processing: Network offline");
490
491        if let Some(ref coordinator) = self.webrtc_coordinator {
492            coordinator.begin_network_recovery("NetworkLost").await;
493            tracing::info!("🧹 Clearing pending ICE restart attempts...");
494            coordinator.clear_pending_restarts().await;
495        }
496
497        if self.signaling_client.is_connected() {
498            tracing::info!("🔌 Disconnecting WebSocket...");
499            let _ = self.signaling_client.disconnect().await;
500        }
501
502        Ok(())
503    }
504}
505
506#[async_trait::async_trait]
507impl NetworkEventProcessor for DefaultNetworkEventProcessor {
508    /// Process network available event
509    async fn process_network_available(&self) -> Result<(), String> {
510        // Debounce check
511        let should_process = self.should_process_event(&NetworkEvent::Available).await;
512        if !should_process && self.signaling_client.is_connected() {
513            return Ok(());
514        }
515
516        tracing::info!("📱 Processing: Network available");
517
518        self.restore_signaling_and_webrtc("NetworkAvailable").await
519    }
520
521    /// Process network lost event
522    async fn process_network_lost(&self) -> Result<(), String> {
523        // Debounce check
524        if !self.should_process_event(&NetworkEvent::Lost).await {
525            return Ok(());
526        }
527
528        self.process_offline().await
529    }
530
531    /// Process network type changed event
532    async fn process_network_type_changed(
533        &self,
534        is_wifi: bool,
535        is_cellular: bool,
536    ) -> Result<(), String> {
537        // Debounce check
538        let should_process = self
539            .should_process_event(&NetworkEvent::TypeChanged {
540                is_wifi,
541                is_cellular,
542            })
543            .await;
544        if !should_process && self.signaling_client.is_connected() {
545            return Ok(());
546        }
547
548        tracing::info!(
549            "📱 Processing: Network type changed (WiFi={}, Cellular={})",
550            is_wifi,
551            is_cellular
552        );
553
554        self.restore_signaling_and_webrtc("NetworkTypeChanged")
555            .await
556    }
557
558    /// Proactively clean up all connections
559    ///
560    /// Differs from `process_network_lost()`:
561    /// - No debounce check (proactive calls always execute)
562    /// - Intended for app lifecycle management, not network event response
563    async fn cleanup_connections(&self) -> Result<(), String> {
564        let _cleanup_guard = self
565            .webrtc_coordinator
566            .as_ref()
567            .map(|coordinator| coordinator.cleanup_guard());
568
569        tracing::info!("🧹 Manually cleaning up all connections...");
570
571        // Step 1: Clear pending ICE restart attempts
572        if let Some(ref coordinator) = self.webrtc_coordinator {
573            tracing::info!("♻️  Clearing pending ICE restart attempts...");
574            coordinator.clear_pending_restarts().await;
575
576            // Step 2: Close all WebRTC peer connections
577            tracing::info!("🔻 Closing all WebRTC peer connections...");
578            if let Err(e) = coordinator.close_all_peers().await {
579                let err_msg = format!("Failed to close all peers: {}", e);
580                tracing::warn!("⚠️  {}", err_msg);
581                // Do not fail the whole cleanup; continue releasing other resources.
582            } else {
583                tracing::info!("✅ All WebRTC peer connections closed");
584            }
585        }
586
587        // Step 3: Proactively disconnect the WebSocket.
588        if self.signaling_client.is_connected() {
589            tracing::info!("🔌 Disconnecting WebSocket...");
590            match self.signaling_client.disconnect().await {
591                Ok(_) => {
592                    tracing::info!("✅ WebSocket disconnected successfully");
593                }
594                Err(e) => {
595                    let err_msg = format!("Failed to disconnect WebSocket: {}", e);
596                    tracing::warn!("⚠️  {}", err_msg);
597                    // Do not fail the whole cleanup; continue releasing other resources.
598                }
599            }
600        }
601
602        tracing::info!("✅ Connection cleanup completed");
603
604        // Step 4: Re-establish signaling immediately.
605        // This keeps the app usable as soon as it returns to the foreground.
606        tracing::info!("🔌 Re-establishing signaling connection...");
607        self.ensure_signaling_connected_once("CompatCleanupConnections")
608            .await?;
609
610        tracing::info!("✅ Connection cleanup and reconnect completed");
611        Ok(())
612    }
613
614    async fn process_network_recovery_action(
615        &self,
616        action: NetworkRecoveryAction,
617    ) -> Result<(), String> {
618        match action {
619            NetworkRecoveryAction::Noop => Ok(()),
620            NetworkRecoveryAction::Offline => self.process_offline().await,
621            NetworkRecoveryAction::Restore => {
622                self.restore_signaling_and_webrtc("NetworkEventBatch").await
623            }
624            NetworkRecoveryAction::CleanupConnectionsCompat => self.cleanup_connections().await,
625        }
626    }
627}
628
629pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
630    let mut saw_cleanup_connections = false;
631    let mut latest_state_action = NetworkRecoveryAction::Noop;
632
633    for event in events {
634        match event {
635            NetworkEvent::CleanupConnections => saw_cleanup_connections = true,
636            NetworkEvent::Available | NetworkEvent::TypeChanged { .. } => {
637                latest_state_action = NetworkRecoveryAction::Restore
638            }
639            NetworkEvent::Lost => latest_state_action = NetworkRecoveryAction::Offline,
640        }
641    }
642
643    if saw_cleanup_connections {
644        NetworkRecoveryAction::CleanupConnectionsCompat
645    } else {
646        latest_state_action
647    }
648}
649
650pub async fn process_network_event_batch(
651    events: Vec<NetworkEvent>,
652    processor: Arc<dyn NetworkEventProcessor>,
653) -> Vec<NetworkEventResult> {
654    if events.is_empty() {
655        return Vec::new();
656    }
657
658    let action = select_network_recovery_action(&events);
659    let start = Instant::now();
660
661    let result = processor.process_network_recovery_action(action).await;
662
663    let duration_ms = start.elapsed().as_millis() as u64;
664    events
665        .into_iter()
666        .map(|event| match &result {
667            Ok(()) => NetworkEventResult::success(event, duration_ms),
668            Err(e) => NetworkEventResult::failure(event, e.clone(), duration_ms),
669        })
670        .collect()
671}
672
673pub async fn run_network_event_reconciler(
674    mut event_rx: tokio::sync::mpsc::Receiver<NetworkEvent>,
675    result_tx: tokio::sync::mpsc::Sender<NetworkEventResult>,
676    processor: Arc<dyn NetworkEventProcessor>,
677    shutdown_token: CancellationToken,
678) {
679    tracing::info!("🔄 Network event reconciler started");
680
681    loop {
682        tokio::select! {
683            Some(first_event) = event_rx.recv() => {
684                let mut events = vec![first_event];
685                let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
686                tokio::pin!(settle);
687
688                loop {
689                    tokio::select! {
690                        Some(next_event) = event_rx.recv() => {
691                            events.push(next_event);
692                        }
693                        _ = &mut settle => {
694                            break;
695                        }
696                        _ = shutdown_token.cancelled() => {
697                            tracing::info!("🛑 Network event reconciler shutting down");
698                            return;
699                        }
700                        else => {
701                            break;
702                        }
703                    }
704                }
705
706                while let Ok(next_event) = event_rx.try_recv() {
707                    events.push(next_event);
708                }
709
710                let action = select_network_recovery_action(&events);
711                tracing::info!(
712                    event_count = events.len(),
713                    action = ?action,
714                    settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
715                    "📱 Processing settled network event batch"
716                );
717
718                let results = process_network_event_batch(events, processor.clone()).await;
719
720                for result in results {
721                    if let Err(e) = result_tx.send(result).await {
722                        tracing::warn!("Failed to send event result: {}", e);
723                    }
724                }
725            }
726            _ = shutdown_token.cancelled() => {
727                tracing::info!("🛑 Network event reconciler shutting down");
728                break;
729            }
730            else => break,
731        }
732    }
733}
734
/// Network Event Handle
///
/// Lightweight handle for sending network events and receiving processing results.
/// Created before `ActrNode::start()` to bridge platform network events.
///
/// Cloning the handle shares both channel ends: every clone sends into the
/// same event channel and reads from the same result receiver.
pub struct NetworkEventHandle {
    /// Event sender (to ActrNode)
    event_tx: tokio::sync::mpsc::Sender<NetworkEvent>,

    /// Result receiver (from ActrNode)
    /// Wrapped in Arc<Mutex> to allow cloning
    result_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<NetworkEventResult>>>,
}
747
748impl NetworkEventHandle {
749    /// Create a new NetworkEventHandle
750    pub fn new(
751        event_tx: tokio::sync::mpsc::Sender<NetworkEvent>,
752        result_rx: tokio::sync::mpsc::Receiver<NetworkEventResult>,
753    ) -> Self {
754        Self {
755            event_tx,
756            result_rx: Arc::new(tokio::sync::Mutex::new(result_rx)),
757        }
758    }
759
760    /// Handle network available event
761    ///
762    /// # Returns
763    /// - `Ok(NetworkEventResult)`: Processing result
764    /// - `Err(String)`: Failed to send event or receive result
765    pub async fn handle_network_available(&self) -> Result<NetworkEventResult, String> {
766        self.send_event_and_await_result(NetworkEvent::Available)
767            .await
768    }
769
770    /// Handle network lost event
771    ///
772    /// # Returns
773    /// - `Ok(NetworkEventResult)`: Processing result
774    /// - `Err(String)`: Failed to send event or receive result
775    pub async fn handle_network_lost(&self) -> Result<NetworkEventResult, String> {
776        self.send_event_and_await_result(NetworkEvent::Lost).await
777    }
778
779    /// Handle network type changed event
780    ///
781    /// # Returns
782    /// - `Ok(NetworkEventResult)`: Processing result
783    /// - `Err(String)`: Failed to send event or receive result
784    pub async fn handle_network_type_changed(
785        &self,
786        is_wifi: bool,
787        is_cellular: bool,
788    ) -> Result<NetworkEventResult, String> {
789        self.send_event_and_await_result(NetworkEvent::TypeChanged {
790            is_wifi,
791            is_cellular,
792        })
793        .await
794    }
795
796    /// Proactively clean up all connections.
797    ///
798    /// Use this to proactively clean up all network connections in cases such as:
799    /// - App entering the background (iOS/Android)
800    /// - User logging out
801    /// - App preparing to exit
802    /// - Network state reset
803    ///
804    /// # Returns
805    /// - `Ok(NetworkEventResult)`: Processing result
806    /// - `Err(String)`: Failed to send event or receive result
807    pub async fn cleanup_connections(&self) -> Result<NetworkEventResult, String> {
808        self.send_event_and_await_result(NetworkEvent::CleanupConnections)
809            .await
810    }
811
812    /// Send event and await result (internal helper)
813    async fn send_event_and_await_result(
814        &self,
815        event: NetworkEvent,
816    ) -> Result<NetworkEventResult, String> {
817        // Send event
818        self.event_tx
819            .send(event.clone())
820            .await
821            .map_err(|e| format!("Failed to send network event: {}", e))?;
822
823        // Await result
824        let mut rx = self.result_rx.lock().await;
825        rx.recv()
826            .await
827            .ok_or_else(|| "Failed to receive network event result".to_string())
828    }
829}
830
831impl Clone for NetworkEventHandle {
832    fn clone(&self) -> Self {
833        Self {
834            event_tx: self.event_tx.clone(),
835            result_rx: self.result_rx.clone(),
836        }
837    }
838}