actr_runtime/transport/manager.rs
1//! OutprocTransportManager - Cross-process transport manager
2//!
3//! Manages transport layer for multiple Dests, providing unified send/recv interface
4//!
5//! # Naming Convention
6//! - **OutprocTransportManager**: Manages cross-process communication (WebRTC, WebSocket)
7//! - **InprocTransportManager**: Manages intra-process communication (mpsc channels)
8//!
9//! These two form a symmetric design, handling different transport scenarios
10
11use super::Dest; // Re-exported from actr-framework
12use super::dest_transport::DestTransport;
13use super::error::{NetworkError, NetworkResult};
14use super::wire_handle::WireHandle;
15use actr_protocol::{ActrId, PayloadType};
16use async_trait::async_trait;
17use either::Either;
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::Duration;
21use tokio::sync::{Notify, RwLock};
22
23/// Wire builder trait: asynchronously creates Wire components based on Dest
24///
25/// Implement this trait to customize Wire layer component creation logic (e.g., WebRTC, WebSocket)
26#[async_trait]
27pub trait WireBuilder: Send + Sync {
28 /// Create Wire handle list to specified Dest
29 ///
30 /// # Arguments
31 /// - `dest`: Target destination
32 ///
33 /// # Returns
34 /// - Wire handle list (may contain multiple types: WebSocket, WebRTC, etc.)
35 async fn create_connections(&self, dest: &Dest) -> NetworkResult<Vec<WireHandle>>;
36}
37
38/// Destination transport state
39///
40/// Uses Either to manage connection lifecycle:
41/// - Left: Connecting state with shared Notify (multiple waiters)
42/// - Right: Connected state with DestTransport
43type DestState = Either<Arc<Notify>, Arc<DestTransport>>;
44
45/// OutprocTransportManager - Cross-process transport manager
46///
47/// Responsibilities:
48/// - Manage transport layer for multiple Dests (each Dest maps to one DestTransport)
49/// - Create DestTransport on-demand (lazy initialization)
50/// - Provide unified send/recv interface
51/// - Support custom connection factories
52/// - Prevent duplicate connection creation using Either state machine
53///
54/// # Comparison with InprocTransportManager
55/// - **OutprocTransportManager**: Cross-process, uses WebRTC/WebSocket
56/// - **InprocTransportManager**: Intra-process, uses mpsc channels, zero serialization
57///
58/// # State Machine
59/// ```text
60/// None → Connecting(Notify) → Connected(Transport)
61/// ↓ ↓
62/// (multiple waiters) (ready)
63/// ```
64pub struct OutprocTransportManager {
65 /// Local Actor ID
66 local_id: ActrId,
67
68 /// Dest → DestState mapping (Either state machine)
69 transports: Arc<RwLock<HashMap<Dest, DestState>>>,
70
71 /// Wire builder (used to create Wire handles for new DestTransport)
72 conn_factory: Arc<dyn WireBuilder>,
73}
74
75impl OutprocTransportManager {
76 /// Create new OutprocTransportManager
77 ///
78 /// # Arguments
79 /// - `local_id`: Local Actor ID
80 /// - `conn_factory`: Wire builder, asynchronously creates Wire handle list based on Dest
81 pub fn new(local_id: ActrId, conn_factory: Arc<dyn WireBuilder>) -> Self {
82 Self {
83 local_id,
84 transports: Arc::new(RwLock::new(HashMap::new())),
85 conn_factory,
86 }
87 }
88
89 /// Get or create DestTransport for specified Dest
90 ///
91 /// # Arguments
92 /// - `dest`: Target destination
93 ///
94 /// # Returns
95 /// - DestTransport for this Dest (Arc-shared)
96 ///
97 /// # State Machine
98 /// Uses Either to prevent duplicate connections:
99 /// 1. If Connected → return transport
100 /// 2. If Connecting → wait for notify, then retry
101 /// 3. If None → insert Connecting(notify), create connection outside lock
102 pub async fn get_or_create_transport(&self, dest: &Dest) -> NetworkResult<Arc<DestTransport>> {
103 loop {
104 // 1. Fast path: check current state
105 let state_opt = {
106 let transports = self.transports.read().await;
107 transports.get(dest).cloned()
108 };
109
110 match state_opt {
111 // Already connected - fast path
112 Some(Either::Right(transport)) => {
113 tracing::debug!("📦 Reusing existing DestTransport: {:?}", dest);
114 return Ok(transport);
115 }
116 // Currently connecting - wait for completion
117 Some(Either::Left(notify)) => {
118 tracing::debug!("⏳ Waiting for ongoing connection: {:?}", dest);
119 notify.notified().await;
120 // Retry after notification
121 continue;
122 }
123 // Not exists - need to create
124 None => {
125 // Enter slow path
126 }
127 }
128
129 // 2. Slow path: try to become the creator
130 let notify = {
131 let mut transports = self.transports.write().await;
132
133 // Double-check: may have been created while waiting for write lock
134 match transports.get(dest) {
135 Some(Either::Right(transport)) => {
136 return Ok(Arc::clone(transport));
137 }
138 Some(Either::Left(notify)) => {
139 // Another thread is creating, wait for it
140 Arc::clone(notify)
141 }
142 None => {
143 // We are the creator, insert Connecting state
144 let notify = Arc::new(Notify::new());
145 transports.insert(dest.clone(), Either::Left(Arc::clone(¬ify)));
146 tracing::debug!("🔄 Inserted Connecting state for: {:?}", dest);
147 Arc::clone(¬ify)
148 }
149 }
150 };
151
152 // Check if we are the creator (notify was just created)
153 let is_creator = {
154 let transports = self.transports.read().await;
155 matches!(transports.get(dest), Some(Either::Left(n)) if Arc::ptr_eq(n, ¬ify))
156 };
157
158 if !is_creator {
159 // Wait for the actual creator
160 tracing::debug!("⏳ Another thread is creating connection: {:?}", dest);
161 notify.notified().await;
162 continue;
163 }
164
165 // 3. We are the creator - create connections OUTSIDE lock
166 tracing::info!("🚀 Creating new connection for: {:?}", dest);
167
168 let result = async {
169 let connections = self.conn_factory.create_connections(dest).await?;
170
171 if connections.is_empty() {
172 return Err(NetworkError::ConfigurationError(format!(
173 "Connection factory returned no connections: {dest:?}"
174 )));
175 }
176
177 tracing::info!(
178 "✨ Creating DestTransport: {:?} ({} connections)",
179 dest,
180 connections.len()
181 );
182 let transport = DestTransport::new(dest.clone(), connections).await?;
183 Ok(Arc::new(transport))
184 }
185 .await;
186
187 // 4. Update state and notify waiters
188 let mut transports = self.transports.write().await;
189
190 match result {
191 Ok(transport) => {
192 tracing::info!("✅ Connection established: {:?}", dest);
193 transports.insert(dest.clone(), Either::Right(Arc::clone(&transport)));
194 drop(transports);
195 notify.notify_waiters();
196 return Ok(transport);
197 }
198 Err(e) => {
199 tracing::error!("❌ Connection failed: {:?}: {}", dest, e);
200 transports.remove(dest);
201 drop(transports);
202 notify.notify_waiters();
203 return Err(e);
204 }
205 }
206 }
207 }
208
209 /// Send message to specified Dest
210 ///
211 /// # Arguments
212 /// - `dest`: Target destination
213 /// - `payload_type`: Message type
214 /// - `data`: Message data
215 ///
216 /// # Example
217 ///
218 /// ```rust,ignore
219 /// mgr.send(&dest, PayloadType::RpcSignal, b"hello").await?;
220 /// ```
221 pub async fn send(
222 &self,
223 dest: &Dest,
224 payload_type: PayloadType,
225 data: &[u8],
226 ) -> NetworkResult<()> {
227 tracing::debug!(
228 "📤 [OutprocTransportManager] Sending to {:?}: type={:?}, size={}",
229 dest,
230 payload_type,
231 data.len()
232 );
233
234 // Get or create DestTransport for this Dest
235 let transport = self.get_or_create_transport(dest).await?;
236
237 // Send through DestTransport
238 transport.send(payload_type, data).await
239 }
240
241 /// Close DestTransport for specified Dest
242 ///
243 /// # Arguments
244 /// - `dest`: Target destination
245 pub async fn close_transport(&self, dest: &Dest) -> NetworkResult<()> {
246 let mut transports = self.transports.write().await;
247
248 if let Some(state) = transports.remove(dest) {
249 match state {
250 Either::Right(transport) => {
251 tracing::info!("🔌 Closing DestTransport: {:?}", dest);
252 transport.close().await?;
253 }
254 Either::Left(_notify) => {
255 tracing::debug!("⏸️ Removed Connecting state for: {:?}", dest);
256 }
257 }
258 }
259
260 Ok(())
261 }
262
263 /// Close all DestTransports
264 pub async fn close_all(&self) -> NetworkResult<()> {
265 let mut transports = self.transports.write().await;
266
267 tracing::info!(
268 "🔌 Closing all DestTransports (count: {})",
269 transports.len()
270 );
271
272 for (dest, state) in transports.drain() {
273 match state {
274 Either::Right(transport) => {
275 if let Err(e) = transport.close().await {
276 tracing::warn!("❌ Failed to close DestTransport {:?}: {}", dest, e);
277 }
278 }
279 Either::Left(_notify) => {
280 tracing::debug!("⏸️ Skipped Connecting state for: {:?}", dest);
281 }
282 }
283 }
284
285 Ok(())
286 }
287
288 /// Get count of currently managed Dests
289 pub async fn dest_count(&self) -> usize {
290 self.transports.read().await.len()
291 }
292
293 /// Get local Actor ID
294 #[inline]
295 pub fn local_id(&self) -> &ActrId {
296 &self.local_id
297 }
298
299 /// List all connected Dests
300 pub async fn list_dests(&self) -> Vec<Dest> {
301 self.transports.read().await.keys().cloned().collect()
302 }
303
304 /// Check if connection to specified Dest exists
305 pub async fn has_dest(&self, dest: &Dest) -> bool {
306 self.transports.read().await.contains_key(dest)
307 }
308
309 /// Spawn health checker background task with smart reconnect
310 ///
311 /// Periodically checks all DestTransport health status:
312 /// - If some connections failed → trigger smart reconnect (reuse working connections)
313 /// - If all connections failed → remove entire DestTransport
314 ///
315 /// # Arguments
316 /// - `interval`: Health check interval (recommended: 10-30 seconds)
317 ///
318 /// # Returns
319 /// - JoinHandle for the background task (can be used to cancel)
320 ///
321 /// # Example
322 /// ```rust,ignore
323 /// let mgr = Arc::new(OutprocTransportManager::new(local_id, factory));
324 /// let health_check_handle = mgr.spawn_health_checker(Duration::from_secs(10));
325 /// ```
326 pub fn spawn_health_checker(&self, interval: Duration) -> tokio::task::JoinHandle<()> {
327 let transports = Arc::clone(&self.transports);
328 let conn_factory = Arc::clone(&self.conn_factory);
329
330 tokio::spawn(async move {
331 let mut interval_timer = tokio::time::interval(interval);
332 interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
333
334 loop {
335 interval_timer.tick().await;
336
337 // Collect snapshot of connected Dests first (no async under lock)
338 let snapshot: Vec<(Dest, Arc<DestTransport>)> = {
339 let transports_read = transports.read().await;
340
341 transports_read
342 .iter()
343 .filter_map(|(dest, state)| {
344 // Only check Connected transports, skip Connecting
345 if let Either::Right(transport) = state {
346 Some((dest.clone(), Arc::clone(transport)))
347 } else {
348 None
349 }
350 })
351 .collect()
352 };
353
354 // Process each Dest outside of the lock
355 for (dest_clone, transport) in snapshot {
356 let healthy = transport.has_healthy_connection().await;
357
358 if !healthy {
359 // All connections failed - schedule for removal
360 tracing::warn!(
361 "💀 All connections failed for {:?}, will remove",
362 dest_clone
363 );
364
365 // Remove entire DestTransport
366 let mut transports_write = transports.write().await;
367 if let Some(Either::Right(transport)) = transports_write.remove(&dest_clone)
368 {
369 tracing::info!(
370 "🗑️ Removing completely failed DestTransport: {:?}",
371 dest_clone
372 );
373 // Drop lock before awaiting close
374 drop(transports_write);
375
376 if let Err(e) = transport.close().await {
377 tracing::warn!(
378 "❌ Failed to close DestTransport {:?}: {}",
379 dest_clone,
380 e
381 );
382 }
383 } else {
384 // State changed between snapshot and removal; skip safely
385 drop(transports_write);
386 }
387 } else {
388 // At least one connection is working
389 // Try to reconnect failed ones (smart reconnect)
390 tracing::debug!("🔄 Triggering smart reconnect for: {:?}", dest_clone);
391 if let Err(e) = transport
392 .retry_failed_connections(&dest_clone, conn_factory.as_ref())
393 .await
394 {
395 tracing::warn!("❌ Smart reconnect failed for {:?}: {}", dest_clone, e);
396 }
397 }
398 }
399 }
400 })
401 }
402}
403
404impl Drop for OutprocTransportManager {
405 fn drop(&mut self) {
406 tracing::debug!("🗑️ OutprocTransportManager dropped");
407 // Note: async cleanup requires external call to close_all()
408 }
409}
410
#[cfg(test)]
mod tests {
    use super::*;

    /// Wire builder stub that never yields a connection.
    struct TestFactory;

    #[async_trait]
    impl WireBuilder for TestFactory {
        async fn create_connections(&self, _dest: &Dest) -> NetworkResult<Vec<WireHandle>> {
            // Real usage needs actual connections; these tests never connect
            Ok(Vec::new())
        }
    }

    /// Wrap the stub in the trait-object form the manager expects.
    fn create_test_factory() -> Arc<dyn WireBuilder> {
        Arc::new(TestFactory)
    }

    /// A fresh manager tracks no Dest and reports the ID it was given.
    #[tokio::test]
    async fn test_transport_manager_creation() {
        let id = ActrId::default();
        let manager = OutprocTransportManager::new(id.clone(), create_test_factory());

        assert_eq!(manager.dest_count().await, 0);
        assert_eq!(manager.local_id(), &id);
    }

    /// A fresh manager lists no Dest.
    #[tokio::test]
    async fn test_list_dests() {
        let manager = OutprocTransportManager::new(ActrId::default(), create_test_factory());

        let listed = manager.list_dests().await;
        assert!(listed.is_empty());
    }

    /// A Dest that was never used is not reported as present.
    #[tokio::test]
    async fn test_has_dest() {
        let manager = OutprocTransportManager::new(ActrId::default(), create_test_factory());

        let shell = Dest::shell();
        assert!(!manager.has_dest(&shell).await);
    }
}
458}