actr_runtime/transport/
dest_transport.rs

//! DestTransport - transport-layer manager for a single destination.
//!
//! Manages all connections and message routing to a specific Dest (Actor or Shell).
//! Implements an event-driven pattern with zero polling.
5
6use super::Dest; // Re-exported from actr-framework
7use super::error::{NetworkError, NetworkResult};
8use super::route_table::PayloadTypeExt;
9use super::wire_handle::WireHandle;
10use super::wire_pool::{ConnType, ReadySet, RetryConfig, WirePool};
11use actr_protocol::PayloadType;
12use std::sync::Arc;
13use tokio::sync::watch;
14
15/// DestTransport - Transport layer manager for a single destination
16///
17/// Core responsibilities:
18/// - Manage all connections to a specific Dest (WebSocket + WebRTC)
19/// - Concurrently establish connections in background (saturated connection pattern)
20/// - Event-driven wait for connection status
21/// - Cache Lanes within WireHandle
22/// - WirePool handles priority selection
23pub struct DestTransport {
24    /// Connection manager
25    conn_mgr: Arc<WirePool>,
26}
27
28impl DestTransport {
29    /// Create new DestTransport
30    ///
31    /// # Arguments
32    /// - `dest`: destination
33    /// - `connections`: list of pre-built connections (WebSocket/WebRTC)
34    pub async fn new(dest: Dest, connections: Vec<WireHandle>) -> NetworkResult<Self> {
35        let conn_mgr = Arc::new(WirePool::new(RetryConfig::default()));
36
37        // Start connection tasks in background (concurrently)
38        tracing::info!("🚀 [{:?}] Starting connection tasks...", dest);
39        for conn in connections {
40            conn_mgr.add_connection(conn);
41        }
42
43        Ok(Self { conn_mgr })
44    }
45
46    /// Send message
47    ///
48    /// Core design: event-driven waiting
49    /// - If connection available, send immediately
50    /// - If not, wait for connection status change (via watch channel)
51    /// - WirePool already handles priority, only need to try DataLane Types in order
52    #[cfg_attr(
53        feature = "opentelemetry",
54        tracing::instrument(skip_all, name = "DestTransport.send")
55    )]
56    pub async fn send(&self, payload_type: PayloadType, data: &[u8]) -> NetworkResult<()> {
57        tracing::debug!(
58            "📤 Sending message: type={:?}, size={}",
59            payload_type,
60            data.len()
61        );
62
63        // 1. Get supported DataLane Types for this PayloadType (already prioritized)
64        let lane_types = payload_type.data_lane_types();
65
66        if lane_types.is_empty() {
67            return Err(NetworkError::NoRoute(format!(
68                "No route for: {payload_type:?}"
69            )));
70        }
71
72        // 2. Subscribe to connection status changes
73        let mut conn_watcher = self.conn_mgr.watch_ready();
74
75        loop {
76            // 3. Check currently available connections (clone to avoid borrowing across await)
77            let ready_connections = {
78                let ready = conn_watcher.borrow_and_update();
79                tracing::trace!("🔍 Available connections: {:?}", ready);
80                ready.clone()
81            };
82
83            // 4. Try each DataLane Type in priority order
84            for &lane_type in lane_types {
85                // Determine required connection type
86                let conn_type = if lane_type.needs_webrtc() {
87                    ConnType::WebRTC
88                } else {
89                    ConnType::WebSocket
90                };
91
92                // Check if this connection is ready
93                if !ready_connections.contains(&conn_type) {
94                    tracing::trace!("🔍 {:?} not ready, trying next", conn_type);
95                    continue;
96                }
97
98                // Get connection and create/get Lane
99                if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
100                    // Use original payload_type to create DataLane
101                    match conn.get_lane(payload_type).await {
102                        Ok(lane) => {
103                            tracing::debug!(
104                                "✅ Using DataLane: {:?} (type={:?})",
105                                lane_type,
106                                payload_type
107                            );
108                            // Convert to Bytes (zero-copy)
109                            let payload = bytes::Bytes::copy_from_slice(data);
110                            let result = lane.send(payload.clone()).await;
111
112                            // If DataChannel is closed, drop cache and retry once.
113                            if let Err(NetworkError::DataChannelError(msg)) = &result {
114                                if msg.contains("closed") {
115                                    tracing::warn!(
116                                        "♻️ DataChannel closed for {:?}, invalidating lane and retrying once",
117                                        payload_type
118                                    );
119                                    conn.invalidate_lane(payload_type).await;
120                                    if let Ok(new_lane) = conn.get_lane(payload_type).await {
121                                        return new_lane.send(payload).await;
122                                    }
123                                }
124                            }
125
126                            return result;
127                        }
128                        Err(e) => {
129                            tracing::warn!("❌ Failed to get DataLane: {:?}: {}", lane_type, e);
130                            continue;
131                        }
132                    }
133                }
134            }
135
136            // 5. All attempts failed, wait for connection status change
137            tracing::info!("⏳ Waiting for connection status...");
138
139            if self.conn_mgr.is_closed() {
140                return Err(NetworkError::ChannelClosed(
141                    "connection manager closed".into(),
142                ));
143            }
144
145            // Event-driven wait!
146            if conn_watcher.changed().await.is_err() {
147                return Err(NetworkError::ChannelClosed(
148                    "connection manager closed".into(),
149                ));
150            }
151
152            tracing::debug!("🔔 Connection status updated, retrying...");
153        }
154    }
155
156    /// Retry failed connections (smart reconnect)
157    ///
158    /// # Behavior
159    /// - Calls WireBuilder to create new connections
160    /// - Uses `add_connection_smart()` to skip already-working connections
161    /// - Perfect for reconnection after detecting connection failures
162    ///
163    /// # Arguments
164    /// - `dest`: destination (used by WireBuilder)
165    /// - `wire_builder`: factory to create new WireHandles
166    pub async fn retry_failed_connections(
167        &self,
168        dest: &Dest,
169        wire_builder: &dyn super::WireBuilder,
170    ) -> NetworkResult<()> {
171        tracing::info!("🔄 Retrying failed connections for: {:?}", dest);
172
173        // Get fresh connections from builder (no cancel token for retry)
174        let connections = wire_builder
175            .create_connections_with_cancel(dest, None)
176            .await?;
177
178        if connections.is_empty() {
179            return Err(NetworkError::ConfigurationError(
180                "WireBuilder returned no connections".to_string(),
181            ));
182        }
183
184        // Add each connection smartly (skip Ready/Connecting)
185        for conn in connections {
186            self.conn_mgr.add_connection_smart(conn).await;
187        }
188
189        Ok(())
190    }
191
192    /// Close DestTransport and release all connection resources
193    pub async fn close(&self) -> NetworkResult<()> {
194        tracing::info!("🔌 Closing DestTransport");
195
196        // 1. Get all connections and close them one by one
197        for conn_type in [ConnType::WebSocket, ConnType::WebRTC] {
198            if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
199                if let Err(e) = conn.close().await {
200                    tracing::warn!("❌ Failed to close {:?} connection: {}", conn_type, e);
201                } else {
202                    tracing::debug!("✅ Closed {:?} connection", conn_type);
203                }
204
205                // 2. Mark connection as closed in the pool
206                // This updates the ready_tx and notifies waiters
207                self.conn_mgr.mark_connection_closed(conn_type).await;
208            }
209        }
210
211        // 3. Clean up the entire pool
212        self.conn_mgr.close_all().await;
213
214        Ok(())
215    }
216
217    /// Check if any connection is still healthy
218    ///
219    /// Used by health checker to detect failed connections
220    ///
221    /// # Returns
222    /// - `true`: at least one connection is healthy (connected)
223    /// - `false`: all connections are unhealthy or no connections exist
224    pub async fn has_healthy_connection(&self) -> bool {
225        for conn_type in [ConnType::WebRTC, ConnType::WebSocket] {
226            if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
227                if conn.is_connected() {
228                    return true;
229                }
230            }
231        }
232        false
233    }
234
235    /// Subscribe to ready-set changes (used for manager-side cleanup).
236    pub fn watch_ready(&self) -> watch::Receiver<ReadySet> {
237        self.conn_mgr.watch_ready()
238    }
239}