actr_runtime/lifecycle/
network_event.rs

1//! Network Event Handling Architecture
2//!
3//! This module defines the network event handling infrastructure.
4//!
5//! # Architecture Overview
6//!
7//! ```text
8//!        ┌─────────────────────────────────────────────┐
9//!        │ (FFI Path - Implemented)  (Actor Path - TODO)
10//!        ▼                                             ▼
11//! ┌──────────────────────────┐      ┌──────────────────────────┐
12//! │ NetworkEventHandle       │      │ Direct Proto Message     │
13//! │ • Platform FFI calls     │      │ • Actor call/tell        │
14//! │ • Send via channel       │      │ • Send to actor mailbox  │
15//! │ • Await result           │      │ • No handle needed       │
16//! └────────┬─────────────────┘      └──────┬───────────────────┘
17//!          │                               │
18//!          └───────────────┬───────────────┘
19//!                          │ Both trigger
20//!                          ▼
21//! ┌─────────────────────────────────────────────────────────┐
22//! │  ActrNode::network_event_loop()                         │
23//! │  • Receive event from channel (FFI path)                │
24//! │  • Or handle message directly (Actor path - TODO)       │
25//! │  • Delegate to NetworkEventProcessor                    │
26//! │  • Send result back via channel                         │
27//! └──────────────────────┬──────────────────────────────────┘
28//!                        │ Delegate
29//!                        ▼
30//! ┌─────────────────────────────────────────────────────────┐
31//! │  NetworkEventProcessor (Trait)                          │
32//! │                                                          │
33//! │  DefaultNetworkEventProcessor:                          │
34//! │  • process_network_available()                          │
35//! │    └─► Reconnect signaling + ICE restart                │
36//! │  • process_network_lost()                               │
37//! │    └─► Clear pending + disconnect                       │
38//! │  • process_network_type_changed()                       │
39//! │    └─► Disconnect + wait + reconnect                    │
40//! └─────────────────────────────────────────────────────────┘
41//! ```
42//!
43//! # Key Components
44//!
45//! - **NetworkEvent**: Event types (Available, Lost, TypeChanged)
46//! - **NetworkEventResult**: Processing result with success/error/duration
47//! - **NetworkEventProcessor**: Trait for custom event handling logic
48//! - **DefaultNetworkEventProcessor**: Default implementation with signaling + WebRTC recovery
49//!
50//! # Usage Patterns
51//!
52//! ## 1. Platform FFI Call (Primary, Implemented)
53//! ```ignore
54//! // Platform layer calls NetworkEventHandle via FFI
55//! let network_handle = system.create_network_event_handle();
56//! let result = network_handle.handle_network_available().await?;
57//! if result.success {
58//!     println!("✅ Processed in {}ms", result.duration_ms);
59//! }
60//! ```
61//!
62//! ## 2. Actor Proto Message (Optional, TODO)
63//! ```ignore
64//! // TODO: actors send proto message directly (not yet implemented)
65//! actor_ref.call(NetworkAvailableMessage).await?;
66//! ```
67//!
68//! **Key Differences:**
69//! - FFI path: Uses NetworkEventHandle + channel (implemented)
70//! - Actor path: Direct proto message to mailbox (TODO, future enhancement)
71
72use std::sync::Arc;
73use std::time::Duration;
74
75use crate::wire::webrtc::{SignalingClient, coordinator::WebRtcCoordinator};
76
/// Network event types.
///
/// `PartialEq`/`Eq` are derived so that events can be compared directly with
/// `==` (for example in assertions or when matching a result against the event
/// that was submitted).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkEvent {
    /// The network became available (recovered from an outage).
    Available,

    /// The network was lost (went offline).
    Lost,

    /// The network type changed (WiFi ↔ Cellular).
    TypeChanged { is_wifi: bool, is_cellular: bool },
}
89
/// Outcome of processing a single network event.
#[derive(Debug, Clone)]
pub struct NetworkEventResult {
    /// The event that was processed.
    pub event: NetworkEvent,

    /// Whether processing succeeded.
    pub success: bool,

    /// Error message (set when processing failed).
    pub error: Option<String>,

    /// Processing time in milliseconds.
    pub duration_ms: u64,
}
105
106impl NetworkEventResult {
107    pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
108        Self {
109            event,
110            success: true,
111            error: None,
112            duration_ms,
113        }
114    }
115
116    pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
117        Self {
118            event,
119            success: false,
120            error: Some(error),
121            duration_ms,
122        }
123    }
124}
125
/// Network event processor trait.
///
/// Defines the handling logic for network events; users may provide their own
/// implementation.
#[async_trait::async_trait]
pub trait NetworkEventProcessor: Send + Sync {
    /// Handles a network-available event.
    ///
    /// # Returns
    /// - `Ok(())`: processing succeeded
    /// - `Err(String)`: processing failed; contains the error message
    async fn process_network_available(&self) -> Result<(), String>;

    /// Handles a network-lost event.
    ///
    /// # Returns
    /// - `Ok(())`: processing succeeded
    /// - `Err(String)`: processing failed; contains the error message
    async fn process_network_lost(&self) -> Result<(), String>;

    /// Handles a network-type-changed event.
    ///
    /// # Returns
    /// - `Ok(())`: processing succeeded
    /// - `Err(String)`: processing failed; contains the error message
    async fn process_network_type_changed(
        &self,
        is_wifi: bool,
        is_cellular: bool,
    ) -> Result<(), String>;
}
156
/// Default network event processor implementation.
pub struct DefaultNetworkEventProcessor {
    /// Signaling client that is disconnected / reconnected in response to events.
    signaling_client: Arc<dyn SignalingClient>,
    /// Optional WebRTC coordinator; when `None`, the coordinator-related steps
    /// are skipped.
    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
}
162
163impl DefaultNetworkEventProcessor {
164    pub fn new(
165        signaling_client: Arc<dyn SignalingClient>,
166        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
167    ) -> Self {
168        Self {
169            signaling_client,
170            webrtc_coordinator,
171        }
172    }
173}
174
175#[async_trait::async_trait]
176impl NetworkEventProcessor for DefaultNetworkEventProcessor {
177    /// 处理网络可用事件
178    async fn process_network_available(&self) -> Result<(), String> {
179        tracing::info!("📱 Processing: Network available");
180
181        // Step 1: 强制断开现有连接(避免"僵尸连接")
182        if self.signaling_client.is_connected() {
183            tracing::info!("🔌 Disconnecting existing connection to ensure fresh state...");
184            let _ = self.signaling_client.disconnect().await;
185        }
186
187        // Step 2: 短暂延迟,让资源清理
188        tokio::time::sleep(Duration::from_millis(100)).await;
189
190        // Step 3: 建立新的 WebSocket 连接
191        tracing::info!("🔄 Reconnecting WebSocket...");
192        match self.signaling_client.connect().await {
193            Ok(_) => {
194                tracing::info!("✅ WebSocket reconnected successfully");
195            }
196            Err(e) => {
197                let err_msg = format!("WebSocket reconnect failed: {}", e);
198                tracing::error!("❌ {}", err_msg);
199                return Err(err_msg);
200            }
201        }
202
203        // Step 4: 触发 ICE 重启(如果 WebRTC 已初始化)
204        let coordinator = self.webrtc_coordinator.clone();
205
206        if let Some(coordinator) = coordinator {
207            tracing::info!("♻️ Triggering ICE restart for failed connections...");
208            coordinator.retry_failed_connections().await;
209        }
210
211        Ok(())
212    }
213
214    /// 处理网络丢失事件
215    async fn process_network_lost(&self) -> Result<(), String> {
216        tracing::info!("📱 Processing: Network lost");
217
218        // Step 1: 清理待处理的 ICE 重启尝试
219        if let Some(ref coordinator) = self.webrtc_coordinator {
220            tracing::info!("🧹 Clearing pending ICE restart attempts...");
221            coordinator.clear_pending_restarts().await;
222        }
223
224        // Step 2: 主动断开 WebSocket
225        if self.signaling_client.is_connected() {
226            tracing::info!("🔌 Disconnecting WebSocket...");
227            let _ = self.signaling_client.disconnect().await;
228        }
229
230        Ok(())
231    }
232
233    /// 处理网络类型变化事件
234    async fn process_network_type_changed(
235        &self,
236        is_wifi: bool,
237        is_cellular: bool,
238    ) -> Result<(), String> {
239        tracing::info!(
240            "📱 Processing: Network type changed (WiFi={}, Cellular={})",
241            is_wifi,
242            is_cellular
243        );
244
245        // 网络类型变化通常意味着 IP 地址变化
246        // 视为断网 + 恢复序列
247
248        // Step 1: 作为网络丢失处理
249        self.process_network_lost().await?;
250
251        // Step 2: 等待网络稳定
252        tokio::time::sleep(Duration::from_millis(500)).await;
253
254        // Step 3: 作为网络恢复处理
255        self.process_network_available().await?;
256
257        Ok(())
258    }
259}
260
/// Network Event Handle
///
/// Lightweight handle for sending network events and receiving processing results.
/// Created by `ActrSystem::create_network_event_handle()`.
pub struct NetworkEventHandle {
    /// Event sender (to ActrNode)
    event_tx: tokio::sync::mpsc::Sender<NetworkEvent>,

    /// Result receiver (from ActrNode)
    /// Wrapped in Arc<Mutex> to allow cloning; every clone of the handle shares
    /// this one receiver, and the async mutex gives a caller exclusive access
    /// while it waits for a result.
    result_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<NetworkEventResult>>>,
}
273
274impl NetworkEventHandle {
275    /// Create a new NetworkEventHandle
276    pub fn new(
277        event_tx: tokio::sync::mpsc::Sender<NetworkEvent>,
278        result_rx: tokio::sync::mpsc::Receiver<NetworkEventResult>,
279    ) -> Self {
280        Self {
281            event_tx,
282            result_rx: Arc::new(tokio::sync::Mutex::new(result_rx)),
283        }
284    }
285
286    /// Handle network available event
287    ///
288    /// # Returns
289    /// - `Ok(NetworkEventResult)`: Processing result
290    /// - `Err(String)`: Failed to send event or receive result
291    pub async fn handle_network_available(&self) -> Result<NetworkEventResult, String> {
292        self.send_event_and_await_result(NetworkEvent::Available)
293            .await
294    }
295
296    /// Handle network lost event
297    ///
298    /// # Returns
299    /// - `Ok(NetworkEventResult)`: Processing result
300    /// - `Err(String)`: Failed to send event or receive result
301    pub async fn handle_network_lost(&self) -> Result<NetworkEventResult, String> {
302        self.send_event_and_await_result(NetworkEvent::Lost).await
303    }
304
305    /// Handle network type changed event
306    ///
307    /// # Returns
308    /// - `Ok(NetworkEventResult)`: Processing result
309    /// - `Err(String)`: Failed to send event or receive result
310    pub async fn handle_network_type_changed(
311        &self,
312        is_wifi: bool,
313        is_cellular: bool,
314    ) -> Result<NetworkEventResult, String> {
315        self.send_event_and_await_result(NetworkEvent::TypeChanged {
316            is_wifi,
317            is_cellular,
318        })
319        .await
320    }
321
322    /// Send event and await result (internal helper)
323    async fn send_event_and_await_result(
324        &self,
325        event: NetworkEvent,
326    ) -> Result<NetworkEventResult, String> {
327        // Send event
328        self.event_tx
329            .send(event.clone())
330            .await
331            .map_err(|e| format!("Failed to send network event: {}", e))?;
332
333        // Await result
334        let mut rx = self.result_rx.lock().await;
335        rx.recv()
336            .await
337            .ok_or_else(|| "Failed to receive network event result".to_string())
338    }
339}
340
341impl Clone for NetworkEventHandle {
342    fn clone(&self) -> Self {
343        Self {
344            event_tx: self.event_tx.clone(),
345            result_rx: self.result_rx.clone(),
346        }
347    }
348}