actr_runtime/transport/
manager.rs

//! OutprocTransportManager - cross-process transport manager
//!
//! Manages the transport layer for multiple Dests and exposes a unified
//! interface for sending to any of them.
//!
//! # Naming Convention
//! - **OutprocTransportManager**: manages cross-process communication (WebRTC, WebSocket)
//! - **InprocTransportManager**: manages intra-process communication (mpsc channels)
//!
//! The two managers form a symmetric design, each handling a different transport scenario.
10
11use super::Dest; // Re-exported from actr-framework
12use super::dest_transport::DestTransport;
13use super::error::{NetworkError, NetworkResult};
14use super::wire_handle::WireHandle;
15use actr_protocol::{ActrId, PayloadType};
16use async_trait::async_trait;
17use either::Either;
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::Duration;
21use tokio::sync::{Notify, RwLock};
22
23/// Wire builder trait: asynchronously creates Wire components based on Dest
24///
25/// Implement this trait to customize Wire layer component creation logic (e.g., WebRTC, WebSocket)
26#[async_trait]
27pub trait WireBuilder: Send + Sync {
28    /// Create Wire handle list to specified Dest
29    ///
30    /// # Arguments
31    /// - `dest`: Target destination
32    ///
33    /// # Returns
34    /// - Wire handle list (may contain multiple types: WebSocket, WebRTC, etc.)
35    async fn create_connections(&self, dest: &Dest) -> NetworkResult<Vec<WireHandle>>;
36}
37
38/// Destination transport state
39///
40/// Uses Either to manage connection lifecycle:
41/// - Left: Connecting state with shared Notify (multiple waiters)
42/// - Right: Connected state with DestTransport
43type DestState = Either<Arc<Notify>, Arc<DestTransport>>;
44
45/// OutprocTransportManager - Cross-process transport manager
46///
47/// Responsibilities:
48/// - Manage transport layer for multiple Dests (each Dest maps to one DestTransport)
49/// - Create DestTransport on-demand (lazy initialization)
50/// - Provide unified send/recv interface
51/// - Support custom connection factories
52/// - Prevent duplicate connection creation using Either state machine
53///
54/// # Comparison with InprocTransportManager
55/// - **OutprocTransportManager**: Cross-process, uses WebRTC/WebSocket
56/// - **InprocTransportManager**: Intra-process, uses mpsc channels, zero serialization
57///
58/// # State Machine
59/// ```text
60/// None → Connecting(Notify) → Connected(Transport)
61///         ↓                      ↓
62///      (multiple waiters)     (ready)
63/// ```
64pub struct OutprocTransportManager {
65    /// Local Actor ID
66    local_id: ActrId,
67
68    /// Dest → DestState mapping (Either state machine)
69    transports: Arc<RwLock<HashMap<Dest, DestState>>>,
70
71    /// Wire builder (used to create Wire handles for new DestTransport)
72    conn_factory: Arc<dyn WireBuilder>,
73}
74
75impl OutprocTransportManager {
76    /// Create new OutprocTransportManager
77    ///
78    /// # Arguments
79    /// - `local_id`: Local Actor ID
80    /// - `conn_factory`: Wire builder, asynchronously creates Wire handle list based on Dest
81    pub fn new(local_id: ActrId, conn_factory: Arc<dyn WireBuilder>) -> Self {
82        Self {
83            local_id,
84            transports: Arc::new(RwLock::new(HashMap::new())),
85            conn_factory,
86        }
87    }
88
89    /// Get or create DestTransport for specified Dest
90    ///
91    /// # Arguments
92    /// - `dest`: Target destination
93    ///
94    /// # Returns
95    /// - DestTransport for this Dest (Arc-shared)
96    ///
97    /// # State Machine
98    /// Uses Either to prevent duplicate connections:
99    /// 1. If Connected → return transport
100    /// 2. If Connecting → wait for notify, then retry
101    /// 3. If None → insert Connecting(notify), create connection outside lock
102    pub async fn get_or_create_transport(&self, dest: &Dest) -> NetworkResult<Arc<DestTransport>> {
103        loop {
104            // 1. Fast path: check current state
105            let state_opt = {
106                let transports = self.transports.read().await;
107                transports.get(dest).cloned()
108            };
109
110            match state_opt {
111                // Already connected - fast path
112                Some(Either::Right(transport)) => {
113                    tracing::debug!("📦 Reusing existing DestTransport: {:?}", dest);
114                    return Ok(transport);
115                }
116                // Currently connecting - wait for completion
117                Some(Either::Left(notify)) => {
118                    tracing::debug!("⏳ Waiting for ongoing connection: {:?}", dest);
119                    notify.notified().await;
120                    // Retry after notification
121                    continue;
122                }
123                // Not exists - need to create
124                None => {
125                    // Enter slow path
126                }
127            }
128
129            // 2. Slow path: try to become the creator
130            let notify = {
131                let mut transports = self.transports.write().await;
132
133                // Double-check: may have been created while waiting for write lock
134                match transports.get(dest) {
135                    Some(Either::Right(transport)) => {
136                        return Ok(Arc::clone(transport));
137                    }
138                    Some(Either::Left(notify)) => {
139                        // Another thread is creating, wait for it
140                        Arc::clone(notify)
141                    }
142                    None => {
143                        // We are the creator, insert Connecting state
144                        let notify = Arc::new(Notify::new());
145                        transports.insert(dest.clone(), Either::Left(Arc::clone(&notify)));
146                        tracing::debug!("🔄 Inserted Connecting state for: {:?}", dest);
147                        Arc::clone(&notify)
148                    }
149                }
150            };
151
152            // Check if we are the creator (notify was just created)
153            let is_creator = {
154                let transports = self.transports.read().await;
155                matches!(transports.get(dest), Some(Either::Left(n)) if Arc::ptr_eq(n, &notify))
156            };
157
158            if !is_creator {
159                // Wait for the actual creator
160                tracing::debug!("⏳ Another thread is creating connection: {:?}", dest);
161                notify.notified().await;
162                continue;
163            }
164
165            // 3. We are the creator - create connections OUTSIDE lock
166            tracing::info!("🚀 Creating new connection for: {:?}", dest);
167
168            let result = async {
169                let connections = self.conn_factory.create_connections(dest).await?;
170
171                if connections.is_empty() {
172                    return Err(NetworkError::ConfigurationError(format!(
173                        "Connection factory returned no connections: {dest:?}"
174                    )));
175                }
176
177                tracing::info!(
178                    "✨ Creating DestTransport: {:?} ({} connections)",
179                    dest,
180                    connections.len()
181                );
182                let transport = DestTransport::new(dest.clone(), connections).await?;
183                Ok(Arc::new(transport))
184            }
185            .await;
186
187            // 4. Update state and notify waiters
188            let mut transports = self.transports.write().await;
189
190            match result {
191                Ok(transport) => {
192                    tracing::info!("✅ Connection established: {:?}", dest);
193                    transports.insert(dest.clone(), Either::Right(Arc::clone(&transport)));
194                    drop(transports);
195                    notify.notify_waiters();
196                    return Ok(transport);
197                }
198                Err(e) => {
199                    tracing::error!("❌ Connection failed: {:?}: {}", dest, e);
200                    transports.remove(dest);
201                    drop(transports);
202                    notify.notify_waiters();
203                    return Err(e);
204                }
205            }
206        }
207    }
208
209    /// Send message to specified Dest
210    ///
211    /// # Arguments
212    /// - `dest`: Target destination
213    /// - `payload_type`: Message type
214    /// - `data`: Message data
215    ///
216    /// # Example
217    ///
218    /// ```rust,ignore
219    /// mgr.send(&dest, PayloadType::RpcSignal, b"hello").await?;
220    /// ```
221    pub async fn send(
222        &self,
223        dest: &Dest,
224        payload_type: PayloadType,
225        data: &[u8],
226    ) -> NetworkResult<()> {
227        tracing::debug!(
228            "📤 [OutprocTransportManager] Sending to {:?}: type={:?}, size={}",
229            dest,
230            payload_type,
231            data.len()
232        );
233
234        // Get or create DestTransport for this Dest
235        let transport = self.get_or_create_transport(dest).await?;
236
237        // Send through DestTransport
238        transport.send(payload_type, data).await
239    }
240
241    /// Close DestTransport for specified Dest
242    ///
243    /// # Arguments
244    /// - `dest`: Target destination
245    pub async fn close_transport(&self, dest: &Dest) -> NetworkResult<()> {
246        let mut transports = self.transports.write().await;
247
248        if let Some(state) = transports.remove(dest) {
249            match state {
250                Either::Right(transport) => {
251                    tracing::info!("🔌 Closing DestTransport: {:?}", dest);
252                    transport.close().await?;
253                }
254                Either::Left(_notify) => {
255                    tracing::debug!("⏸️ Removed Connecting state for: {:?}", dest);
256                }
257            }
258        }
259
260        Ok(())
261    }
262
263    /// Close all DestTransports
264    pub async fn close_all(&self) -> NetworkResult<()> {
265        let mut transports = self.transports.write().await;
266
267        tracing::info!(
268            "🔌 Closing all DestTransports (count: {})",
269            transports.len()
270        );
271
272        for (dest, state) in transports.drain() {
273            match state {
274                Either::Right(transport) => {
275                    if let Err(e) = transport.close().await {
276                        tracing::warn!("❌ Failed to close DestTransport {:?}: {}", dest, e);
277                    }
278                }
279                Either::Left(_notify) => {
280                    tracing::debug!("⏸️ Skipped Connecting state for: {:?}", dest);
281                }
282            }
283        }
284
285        Ok(())
286    }
287
288    /// Get count of currently managed Dests
289    pub async fn dest_count(&self) -> usize {
290        self.transports.read().await.len()
291    }
292
293    /// Get local Actor ID
294    #[inline]
295    pub fn local_id(&self) -> &ActrId {
296        &self.local_id
297    }
298
299    /// List all connected Dests
300    pub async fn list_dests(&self) -> Vec<Dest> {
301        self.transports.read().await.keys().cloned().collect()
302    }
303
304    /// Check if connection to specified Dest exists
305    pub async fn has_dest(&self, dest: &Dest) -> bool {
306        self.transports.read().await.contains_key(dest)
307    }
308
309    /// Spawn health checker background task with smart reconnect
310    ///
311    /// Periodically checks all DestTransport health status:
312    /// - If some connections failed → trigger smart reconnect (reuse working connections)
313    /// - If all connections failed → remove entire DestTransport
314    ///
315    /// # Arguments
316    /// - `interval`: Health check interval (recommended: 10-30 seconds)
317    ///
318    /// # Returns
319    /// - JoinHandle for the background task (can be used to cancel)
320    ///
321    /// # Example
322    /// ```rust,ignore
323    /// let mgr = Arc::new(OutprocTransportManager::new(local_id, factory));
324    /// let health_check_handle = mgr.spawn_health_checker(Duration::from_secs(10));
325    /// ```
326    pub fn spawn_health_checker(&self, interval: Duration) -> tokio::task::JoinHandle<()> {
327        let transports = Arc::clone(&self.transports);
328        let conn_factory = Arc::clone(&self.conn_factory);
329
330        tokio::spawn(async move {
331            let mut interval_timer = tokio::time::interval(interval);
332            interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
333
334            loop {
335                interval_timer.tick().await;
336
337                // Collect snapshot of connected Dests first (no async under lock)
338                let snapshot: Vec<(Dest, Arc<DestTransport>)> = {
339                    let transports_read = transports.read().await;
340
341                    transports_read
342                        .iter()
343                        .filter_map(|(dest, state)| {
344                            // Only check Connected transports, skip Connecting
345                            if let Either::Right(transport) = state {
346                                Some((dest.clone(), Arc::clone(transport)))
347                            } else {
348                                None
349                            }
350                        })
351                        .collect()
352                };
353
354                // Process each Dest outside of the lock
355                for (dest_clone, transport) in snapshot {
356                    let healthy = transport.has_healthy_connection().await;
357
358                    if !healthy {
359                        // All connections failed - schedule for removal
360                        tracing::warn!(
361                            "💀 All connections failed for {:?}, will remove",
362                            dest_clone
363                        );
364
365                        // Remove entire DestTransport
366                        let mut transports_write = transports.write().await;
367                        if let Some(Either::Right(transport)) = transports_write.remove(&dest_clone)
368                        {
369                            tracing::info!(
370                                "🗑️  Removing completely failed DestTransport: {:?}",
371                                dest_clone
372                            );
373                            // Drop lock before awaiting close
374                            drop(transports_write);
375
376                            if let Err(e) = transport.close().await {
377                                tracing::warn!(
378                                    "❌ Failed to close DestTransport {:?}: {}",
379                                    dest_clone,
380                                    e
381                                );
382                            }
383                        } else {
384                            // State changed between snapshot and removal; skip safely
385                            drop(transports_write);
386                        }
387                    } else {
388                        // At least one connection is working
389                        // Try to reconnect failed ones (smart reconnect)
390                        tracing::debug!("🔄 Triggering smart reconnect for: {:?}", dest_clone);
391                        if let Err(e) = transport
392                            .retry_failed_connections(&dest_clone, conn_factory.as_ref())
393                            .await
394                        {
395                            tracing::warn!("❌ Smart reconnect failed for {:?}: {}", dest_clone, e);
396                        }
397                    }
398                }
399            }
400        })
401    }
402}
403
404impl Drop for OutprocTransportManager {
405    fn drop(&mut self) {
406        tracing::debug!("🗑️  OutprocTransportManager dropped");
407        // Note: async cleanup requires external call to close_all()
408    }
409}
410
#[cfg(test)]
mod tests {
    use super::*;

    /// Factory stub that never opens a wire.
    ///
    /// Real usage requires actual connections. Returning an empty list lets
    /// the tests drive the manager without any network access.
    struct TestFactory;

    #[async_trait]
    impl WireBuilder for TestFactory {
        async fn create_connections(&self, _dest: &Dest) -> NetworkResult<Vec<WireHandle>> {
            Ok(vec![])
        }
    }

    fn create_test_factory() -> Arc<dyn WireBuilder> {
        Arc::new(TestFactory)
    }

    #[tokio::test]
    async fn test_transport_manager_creation() {
        let local_id = ActrId::default();
        let factory = create_test_factory();
        let mgr = OutprocTransportManager::new(local_id.clone(), factory);

        assert_eq!(mgr.dest_count().await, 0);
        assert_eq!(mgr.local_id(), &local_id);
    }

    #[tokio::test]
    async fn test_list_dests() {
        let local_id = ActrId::default();
        let factory = create_test_factory();
        let mgr = OutprocTransportManager::new(local_id, factory);

        let dests = mgr.list_dests().await;
        assert_eq!(dests.len(), 0);
    }

    #[tokio::test]
    async fn test_has_dest() {
        let local_id = ActrId::default();
        let factory = create_test_factory();
        let mgr = OutprocTransportManager::new(local_id, factory);

        let dest = Dest::shell();
        assert!(!mgr.has_dest(&dest).await);
    }

    /// An empty wire list must surface as a configuration error and must not
    /// leave a stale Connecting entry behind (later callers would block on it).
    #[tokio::test]
    async fn test_empty_factory_is_rejected() {
        let mgr = OutprocTransportManager::new(ActrId::default(), create_test_factory());
        let dest = Dest::shell();

        let result = mgr.get_or_create_transport(&dest).await;
        assert!(matches!(result, Err(NetworkError::ConfigurationError(_))));
        assert!(!mgr.has_dest(&dest).await);
        assert_eq!(mgr.dest_count().await, 0);

        // A retry takes the same path again instead of hanging on a leftover marker.
        let retry = mgr.get_or_create_transport(&dest).await;
        assert!(matches!(retry, Err(NetworkError::ConfigurationError(_))));
    }

    /// Closing on a manager that tracks nothing is a successful no-op.
    #[tokio::test]
    async fn test_close_on_empty_manager() {
        let mgr = OutprocTransportManager::new(ActrId::default(), create_test_factory());

        assert!(mgr.close_transport(&Dest::shell()).await.is_ok());
        assert!(mgr.close_all().await.is_ok());
        assert_eq!(mgr.dest_count().await, 0);
    }
}