actr_runtime/transport/
dest_transport.rs

1//! DestTransport - Transport layer manager for a single destination
2//!
3//! Manages all connections and message routing to a specific Dest (Actor or Shell).
4//! Implements event-driven pattern with zero polling.
5
6use super::Dest; // Re-exported from actr-framework
7use super::error::{NetworkError, NetworkResult};
8use super::route_table::PayloadTypeExt;
9use super::wire_handle::WireHandle;
10use super::wire_pool::{ConnType, RetryConfig, WirePool};
11use actr_protocol::PayloadType;
12use std::sync::Arc;
13
14/// DestTransport - Transport layer manager for a single destination
15///
16/// Core responsibilities:
17/// - Manage all connections to a specific Dest (WebSocket + WebRTC)
18/// - Concurrently establish connections in background (saturated connection pattern)
19/// - Event-driven wait for connection status
20/// - Cache Lanes within WireHandle
21/// - WirePool handles priority selection
22pub struct DestTransport {
23    /// Connection manager
24    conn_mgr: Arc<WirePool>,
25}
26
27impl DestTransport {
28    /// Create new DestTransport
29    ///
30    /// # Arguments
31    /// - `dest`: destination
32    /// - `connections`: list of pre-built connections (WebSocket/WebRTC)
33    pub async fn new(dest: Dest, connections: Vec<WireHandle>) -> NetworkResult<Self> {
34        let conn_mgr = Arc::new(WirePool::new(RetryConfig::default()));
35
36        // Start connection tasks in background (concurrently)
37        tracing::info!("🚀 [{:?}] Starting connection tasks...", dest);
38        for conn in connections {
39            conn_mgr.add_connection(conn);
40        }
41
42        Ok(Self { conn_mgr })
43    }
44
45    /// Send message
46    ///
47    /// Core design: event-driven waiting
48    /// - If connection available, send immediately
49    /// - If not, wait for connection status change (via watch channel)
50    /// - WirePool already handles priority, only need to try DataLane Types in order
51    pub async fn send(&self, payload_type: PayloadType, data: &[u8]) -> NetworkResult<()> {
52        tracing::debug!(
53            "📤 Sending message: type={:?}, size={}",
54            payload_type,
55            data.len()
56        );
57
58        // 1. Get supported DataLane Types for this PayloadType (already prioritized)
59        let lane_types = payload_type.data_lane_types();
60
61        if lane_types.is_empty() {
62            return Err(NetworkError::NoRoute(format!(
63                "No route for: {payload_type:?}"
64            )));
65        }
66
67        // 2. Subscribe to connection status changes
68        let mut conn_watcher = self.conn_mgr.watch_ready();
69
70        loop {
71            // 3. Check currently available connections (clone to avoid borrowing across await)
72            let ready_connections = {
73                let ready = conn_watcher.borrow_and_update();
74                tracing::trace!("🔍 Available connections: {:?}", ready);
75                ready.clone()
76            };
77
78            // 4. Try each DataLane Type in priority order
79            for &lane_type in lane_types {
80                // Determine required connection type
81                let conn_type = if lane_type.needs_webrtc() {
82                    ConnType::WebRTC
83                } else {
84                    ConnType::WebSocket
85                };
86
87                // Check if this connection is ready
88                if !ready_connections.contains(&conn_type) {
89                    tracing::trace!("🔍 {:?} not ready, trying next", conn_type);
90                    continue;
91                }
92
93                // Get connection and create/get Lane
94                if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
95                    // Use original payload_type to create DataLane
96                    match conn.get_lane(payload_type).await {
97                        Ok(lane) => {
98                            tracing::debug!(
99                                "✅ Using DataLane: {:?} (type={:?})",
100                                lane_type,
101                                payload_type
102                            );
103                            // Convert to Bytes (zero-copy)
104                            return lane.send(bytes::Bytes::copy_from_slice(data)).await;
105                        }
106                        Err(e) => {
107                            tracing::warn!("❌ Failed to get DataLane: {:?}: {}", lane_type, e);
108                            continue;
109                        }
110                    }
111                }
112            }
113
114            // 5. All attempts failed, wait for connection status change
115            tracing::info!("⏳ Waiting for connection status...");
116
117            // Event-driven wait!
118            if conn_watcher.changed().await.is_err() {
119                return Err(NetworkError::ChannelClosed(
120                    "connection manager closed".into(),
121                ));
122            }
123
124            tracing::debug!("🔔 Connection status updated, retrying...");
125        }
126    }
127
128    /// Retry failed connections (smart reconnect)
129    ///
130    /// # Behavior
131    /// - Calls WireBuilder to create new connections
132    /// - Uses `add_connection_smart()` to skip already-working connections
133    /// - Perfect for reconnection after detecting connection failures
134    ///
135    /// # Arguments
136    /// - `dest`: destination (used by WireBuilder)
137    /// - `wire_builder`: factory to create new WireHandles
138    pub async fn retry_failed_connections(
139        &self,
140        dest: &Dest,
141        wire_builder: &dyn super::WireBuilder,
142    ) -> NetworkResult<()> {
143        tracing::info!("🔄 Retrying failed connections for: {:?}", dest);
144
145        // Get fresh connections from builder
146        let connections = wire_builder.create_connections(dest).await?;
147
148        if connections.is_empty() {
149            return Err(NetworkError::ConfigurationError(
150                "WireBuilder returned no connections".to_string(),
151            ));
152        }
153
154        // Add each connection smartly (skip Ready/Connecting)
155        for conn in connections {
156            self.conn_mgr.add_connection_smart(conn).await;
157        }
158
159        Ok(())
160    }
161
162    /// Close DestTransport and release all connection resources
163    pub async fn close(&self) -> NetworkResult<()> {
164        tracing::info!("🔌 Closing DestTransport");
165
166        // Get all connections and close them one by one
167        for conn_type in [ConnType::WebSocket, ConnType::WebRTC] {
168            if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
169                if let Err(e) = conn.close().await {
170                    tracing::warn!("❌ Failed to close {:?} connection: {}", conn_type, e);
171                } else {
172                    tracing::debug!("✅ Closed {:?} connection", conn_type);
173                }
174            }
175        }
176
177        Ok(())
178    }
179
180    /// Check if any connection is still healthy
181    ///
182    /// Used by health checker to detect failed connections
183    ///
184    /// # Returns
185    /// - `true`: at least one connection is healthy (connected)
186    /// - `false`: all connections are unhealthy or no connections exist
187    pub async fn has_healthy_connection(&self) -> bool {
188        for conn_type in [ConnType::WebRTC, ConnType::WebSocket] {
189            if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
190                if conn.is_connected() {
191                    return true;
192                }
193            }
194        }
195        false
196    }
197}