actr_runtime/lifecycle/network_event.rs
1//! Network Event Handling Architecture
2//!
3//! This module defines the network event handling infrastructure.
4//!
5//! # Architecture Overview
6//!
7//! ```text
8//! ┌─────────────────────────────────────────────┐
9//! │ (FFI Path - Implemented) (Actor Path - TODO)
10//! ▼ ▼
11//! ┌──────────────────────────┐ ┌──────────────────────────┐
12//! │ NetworkEventHandle │ │ Direct Proto Message │
13//! │ • Platform FFI calls │ │ • Actor call/tell │
14//! │ • Send via channel │ │ • Send to actor mailbox │
15//! │ • Await result │ │ • No handle needed │
16//! └────────┬─────────────────┘ └──────┬───────────────────┘
17//! │ │
18//! └───────────────┬───────────────┘
19//! │ Both trigger
20//! ▼
21//! ┌─────────────────────────────────────────────────────────┐
22//! │ ActrNode::network_event_loop() │
23//! │ • Receive event from channel (FFI path) │
24//! │ • Or handle message directly (Actor path - TODO) │
25//! │ • Delegate to NetworkEventProcessor │
26//! │ • Send result back via channel │
27//! └──────────────────────┬──────────────────────────────────┘
28//! │ Delegate
29//! ▼
30//! ┌─────────────────────────────────────────────────────────┐
31//! │ NetworkEventProcessor (Trait) │
32//! │ │
33//! │ DefaultNetworkEventProcessor: │
34//! │ • process_network_available() │
35//! │ └─► Reconnect signaling + ICE restart │
36//! │ • process_network_lost() │
37//! │ └─► Clear pending + disconnect │
38//! │ • process_network_type_changed() │
39//! │ └─► Disconnect + wait + reconnect │
40//! └─────────────────────────────────────────────────────────┘
41//! ```
42//!
43//! # Key Components
44//!
45//! - **NetworkEvent**: Event types (Available, Lost, TypeChanged)
46//! - **NetworkEventResult**: Processing result with success/error/duration
47//! - **NetworkEventProcessor**: Trait for custom event handling logic
48//! - **DefaultNetworkEventProcessor**: Default implementation with signaling + WebRTC recovery
49//!
50//! # Usage Patterns
51//!
52//! ## 1. Platform FFI Call (Primary, Implemented)
53//! ```ignore
54//! // Platform layer calls NetworkEventHandle via FFI
55//! let network_handle = system.create_network_event_handle();
56//! let result = network_handle.handle_network_available().await?;
57//! if result.success {
58//! println!("✅ Processed in {}ms", result.duration_ms);
59//! }
60//! ```
61//!
62//! ## 2. Actor Proto Message (Optional, TODO)
63//! ```ignore
64//! // TODO: actors send proto message directly (not yet implemented)
65//! actor_ref.call(NetworkAvailableMessage).await?;
66//! ```
67//!
68//! **Key Differences:**
69//! - FFI path: Uses NetworkEventHandle + channel (implemented)
70//! - Actor path: Direct proto message to mailbox (TODO, future enhancement)
71
72use std::sync::Arc;
73use std::time::Duration;
74
75use crate::wire::webrtc::{SignalingClient, coordinator::WebRtcCoordinator};
76
/// Network connectivity event delivered to the runtime.
///
/// `PartialEq`/`Eq` are derived so that events can be compared directly with
/// `==` / `assert_eq!` (for example when checking which event a result refers to).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkEvent {
    /// The network became available (recovered from an outage).
    Available,

    /// The network was lost (went offline).
    Lost,

    /// The network type changed (WiFi ↔ Cellular).
    ///
    /// NOTE(review): the flags presumably describe the network type *after*
    /// the change — confirm with the platform layer that reports them.
    TypeChanged { is_wifi: bool, is_cellular: bool },
}
89
90/// 网络事件处理结果
91#[derive(Debug, Clone)]
92pub struct NetworkEventResult {
93 /// 事件类型
94 pub event: NetworkEvent,
95
96 /// 处理是否成功
97 pub success: bool,
98
99 /// 错误信息(如果失败)
100 pub error: Option<String>,
101
102 /// 处理耗时(毫秒)
103 pub duration_ms: u64,
104}
105
106impl NetworkEventResult {
107 pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
108 Self {
109 event,
110 success: true,
111 error: None,
112 duration_ms,
113 }
114 }
115
116 pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
117 Self {
118 event,
119 success: false,
120 error: Some(error),
121 duration_ms,
122 }
123 }
124}
125
126/// 网络事件处理器 Trait
127///
128/// 定义网络事件的处理逻辑,可由用户自定义实现
129#[async_trait::async_trait]
130pub trait NetworkEventProcessor: Send + Sync {
131 /// 处理网络可用事件
132 ///
133 /// # Returns
134 /// - `Ok(())`: 处理成功
135 /// - `Err(String)`: 处理失败,包含错误信息
136 async fn process_network_available(&self) -> Result<(), String>;
137
138 /// 处理网络丢失事件
139 ///
140 /// # Returns
141 /// - `Ok(())`: 处理成功
142 /// - `Err(String)`: 处理失败,包含错误信息
143 async fn process_network_lost(&self) -> Result<(), String>;
144
145 /// 处理网络类型变化事件
146 ///
147 /// # Returns
148 /// - `Ok(())`: 处理成功
149 /// - `Err(String)`: 处理失败,包含错误信息
150 async fn process_network_type_changed(
151 &self,
152 is_wifi: bool,
153 is_cellular: bool,
154 ) -> Result<(), String>;
155}
156
157/// 默认网络事件处理器实现
158pub struct DefaultNetworkEventProcessor {
159 signaling_client: Arc<dyn SignalingClient>,
160 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
161}
162
163impl DefaultNetworkEventProcessor {
164 pub fn new(
165 signaling_client: Arc<dyn SignalingClient>,
166 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
167 ) -> Self {
168 Self {
169 signaling_client,
170 webrtc_coordinator,
171 }
172 }
173}
174
175#[async_trait::async_trait]
176impl NetworkEventProcessor for DefaultNetworkEventProcessor {
177 /// 处理网络可用事件
178 async fn process_network_available(&self) -> Result<(), String> {
179 tracing::info!("📱 Processing: Network available");
180
181 // Step 1: 强制断开现有连接(避免"僵尸连接")
182 if self.signaling_client.is_connected() {
183 tracing::info!("🔌 Disconnecting existing connection to ensure fresh state...");
184 let _ = self.signaling_client.disconnect().await;
185 }
186
187 // Step 2: 短暂延迟,让资源清理
188 tokio::time::sleep(Duration::from_millis(100)).await;
189
190 // Step 3: 建立新的 WebSocket 连接
191 tracing::info!("🔄 Reconnecting WebSocket...");
192 match self.signaling_client.connect().await {
193 Ok(_) => {
194 tracing::info!("✅ WebSocket reconnected successfully");
195 }
196 Err(e) => {
197 let err_msg = format!("WebSocket reconnect failed: {}", e);
198 tracing::error!("❌ {}", err_msg);
199 return Err(err_msg);
200 }
201 }
202
203 // Step 4: 触发 ICE 重启(如果 WebRTC 已初始化)
204 let coordinator = self.webrtc_coordinator.clone();
205
206 if let Some(coordinator) = coordinator {
207 tracing::info!("♻️ Triggering ICE restart for failed connections...");
208 coordinator.retry_failed_connections().await;
209 }
210
211 Ok(())
212 }
213
214 /// 处理网络丢失事件
215 async fn process_network_lost(&self) -> Result<(), String> {
216 tracing::info!("📱 Processing: Network lost");
217
218 // Step 1: 清理待处理的 ICE 重启尝试
219 if let Some(ref coordinator) = self.webrtc_coordinator {
220 tracing::info!("🧹 Clearing pending ICE restart attempts...");
221 coordinator.clear_pending_restarts().await;
222 }
223
224 // Step 2: 主动断开 WebSocket
225 if self.signaling_client.is_connected() {
226 tracing::info!("🔌 Disconnecting WebSocket...");
227 let _ = self.signaling_client.disconnect().await;
228 }
229
230 Ok(())
231 }
232
233 /// 处理网络类型变化事件
234 async fn process_network_type_changed(
235 &self,
236 is_wifi: bool,
237 is_cellular: bool,
238 ) -> Result<(), String> {
239 tracing::info!(
240 "📱 Processing: Network type changed (WiFi={}, Cellular={})",
241 is_wifi,
242 is_cellular
243 );
244
245 // 网络类型变化通常意味着 IP 地址变化
246 // 视为断网 + 恢复序列
247
248 // Step 1: 作为网络丢失处理
249 self.process_network_lost().await?;
250
251 // Step 2: 等待网络稳定
252 tokio::time::sleep(Duration::from_millis(500)).await;
253
254 // Step 3: 作为网络恢复处理
255 self.process_network_available().await?;
256
257 Ok(())
258 }
259}
260
261/// Network Event Handle
262///
263/// Lightweight handle for sending network events and receiving processing results.
264/// Created by `ActrSystem::create_network_event_handle()`.
265pub struct NetworkEventHandle {
266 /// Event sender (to ActrNode)
267 event_tx: tokio::sync::mpsc::Sender<NetworkEvent>,
268
269 /// Result receiver (from ActrNode)
270 /// Wrapped in Arc<Mutex> to allow cloning
271 result_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<NetworkEventResult>>>,
272}
273
274impl NetworkEventHandle {
275 /// Create a new NetworkEventHandle
276 pub fn new(
277 event_tx: tokio::sync::mpsc::Sender<NetworkEvent>,
278 result_rx: tokio::sync::mpsc::Receiver<NetworkEventResult>,
279 ) -> Self {
280 Self {
281 event_tx,
282 result_rx: Arc::new(tokio::sync::Mutex::new(result_rx)),
283 }
284 }
285
286 /// Handle network available event
287 ///
288 /// # Returns
289 /// - `Ok(NetworkEventResult)`: Processing result
290 /// - `Err(String)`: Failed to send event or receive result
291 pub async fn handle_network_available(&self) -> Result<NetworkEventResult, String> {
292 self.send_event_and_await_result(NetworkEvent::Available)
293 .await
294 }
295
296 /// Handle network lost event
297 ///
298 /// # Returns
299 /// - `Ok(NetworkEventResult)`: Processing result
300 /// - `Err(String)`: Failed to send event or receive result
301 pub async fn handle_network_lost(&self) -> Result<NetworkEventResult, String> {
302 self.send_event_and_await_result(NetworkEvent::Lost).await
303 }
304
305 /// Handle network type changed event
306 ///
307 /// # Returns
308 /// - `Ok(NetworkEventResult)`: Processing result
309 /// - `Err(String)`: Failed to send event or receive result
310 pub async fn handle_network_type_changed(
311 &self,
312 is_wifi: bool,
313 is_cellular: bool,
314 ) -> Result<NetworkEventResult, String> {
315 self.send_event_and_await_result(NetworkEvent::TypeChanged {
316 is_wifi,
317 is_cellular,
318 })
319 .await
320 }
321
322 /// Send event and await result (internal helper)
323 async fn send_event_and_await_result(
324 &self,
325 event: NetworkEvent,
326 ) -> Result<NetworkEventResult, String> {
327 // Send event
328 self.event_tx
329 .send(event.clone())
330 .await
331 .map_err(|e| format!("Failed to send network event: {}", e))?;
332
333 // Await result
334 let mut rx = self.result_rx.lock().await;
335 rx.recv()
336 .await
337 .ok_or_else(|| "Failed to receive network event result".to_string())
338 }
339}
340
341impl Clone for NetworkEventHandle {
342 fn clone(&self) -> Self {
343 Self {
344 event_tx: self.event_tx.clone(),
345 result_rx: self.result_rx.clone(),
346 }
347 }
348}