actr_runtime/transport/dest_transport.rs
1//! DestTransport - Transport layer manager for a single destination
2//!
3//! Manages all connections and message routing to a specific Dest (Actor or Shell).
4//! Implements event-driven pattern with zero polling.
5
6use super::Dest; // Re-exported from actr-framework
7use super::error::{NetworkError, NetworkResult};
8use super::route_table::PayloadTypeExt;
9use super::wire_handle::WireHandle;
10use super::wire_pool::{ConnType, ReadySet, RetryConfig, WirePool};
11use actr_protocol::PayloadType;
12use std::sync::Arc;
13use tokio::sync::watch;
14
15/// DestTransport - Transport layer manager for a single destination
16///
17/// Core responsibilities:
18/// - Manage all connections to a specific Dest (WebSocket + WebRTC)
19/// - Concurrently establish connections in background (saturated connection pattern)
20/// - Event-driven wait for connection status
21/// - Cache Lanes within WireHandle
22/// - WirePool handles priority selection
23pub struct DestTransport {
24 /// Connection manager
25 conn_mgr: Arc<WirePool>,
26}
27
28impl DestTransport {
29 /// Create new DestTransport
30 ///
31 /// # Arguments
32 /// - `dest`: destination
33 /// - `connections`: list of pre-built connections (WebSocket/WebRTC)
34 pub async fn new(dest: Dest, connections: Vec<WireHandle>) -> NetworkResult<Self> {
35 let conn_mgr = Arc::new(WirePool::new(RetryConfig::default()));
36
37 // Start connection tasks in background (concurrently)
38 tracing::info!("🚀 [{:?}] Starting connection tasks...", dest);
39 for conn in connections {
40 conn_mgr.add_connection(conn);
41 }
42
43 Ok(Self { conn_mgr })
44 }
45
46 /// Send message
47 ///
48 /// Core design: event-driven waiting
49 /// - If connection available, send immediately
50 /// - If not, wait for connection status change (via watch channel)
51 /// - WirePool already handles priority, only need to try DataLane Types in order
52 #[cfg_attr(
53 feature = "opentelemetry",
54 tracing::instrument(skip_all, name = "DestTransport.send")
55 )]
56 pub async fn send(&self, payload_type: PayloadType, data: &[u8]) -> NetworkResult<()> {
57 tracing::debug!(
58 "📤 Sending message: type={:?}, size={}",
59 payload_type,
60 data.len()
61 );
62
63 // 1. Get supported DataLane Types for this PayloadType (already prioritized)
64 let lane_types = payload_type.data_lane_types();
65
66 if lane_types.is_empty() {
67 return Err(NetworkError::NoRoute(format!(
68 "No route for: {payload_type:?}"
69 )));
70 }
71
72 // 2. Subscribe to connection status changes
73 let mut conn_watcher = self.conn_mgr.watch_ready();
74
75 loop {
76 // 3. Check currently available connections (clone to avoid borrowing across await)
77 let ready_connections = {
78 let ready = conn_watcher.borrow_and_update();
79 tracing::trace!("🔍 Available connections: {:?}", ready);
80 ready.clone()
81 };
82
83 // 4. Try each DataLane Type in priority order
84 for &lane_type in lane_types {
85 // Determine required connection type
86 let conn_type = if lane_type.needs_webrtc() {
87 ConnType::WebRTC
88 } else {
89 ConnType::WebSocket
90 };
91
92 // Check if this connection is ready
93 if !ready_connections.contains(&conn_type) {
94 tracing::trace!("🔍 {:?} not ready, trying next", conn_type);
95 continue;
96 }
97
98 // Get connection and create/get Lane
99 if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
100 // Use original payload_type to create DataLane
101 match conn.get_lane(payload_type).await {
102 Ok(lane) => {
103 tracing::debug!(
104 "✅ Using DataLane: {:?} (type={:?})",
105 lane_type,
106 payload_type
107 );
108 // Convert to Bytes (zero-copy)
109 let payload = bytes::Bytes::copy_from_slice(data);
110 let result = lane.send(payload.clone()).await;
111
112 // If DataChannel is closed, drop cache and retry once.
113 if let Err(NetworkError::DataChannelError(msg)) = &result {
114 if msg.contains("closed") {
115 tracing::warn!(
116 "♻️ DataChannel closed for {:?}, invalidating lane and retrying once",
117 payload_type
118 );
119 conn.invalidate_lane(payload_type).await;
120 if let Ok(new_lane) = conn.get_lane(payload_type).await {
121 return new_lane.send(payload).await;
122 }
123 }
124 }
125
126 return result;
127 }
128 Err(e) => {
129 tracing::warn!("❌ Failed to get DataLane: {:?}: {}", lane_type, e);
130 continue;
131 }
132 }
133 }
134 }
135
136 // 5. All attempts failed, wait for connection status change
137 tracing::info!("⏳ Waiting for connection status...");
138
139 if self.conn_mgr.is_closed() {
140 return Err(NetworkError::ChannelClosed(
141 "connection manager closed".into(),
142 ));
143 }
144
145 // Event-driven wait!
146 if conn_watcher.changed().await.is_err() {
147 return Err(NetworkError::ChannelClosed(
148 "connection manager closed".into(),
149 ));
150 }
151
152 tracing::debug!("🔔 Connection status updated, retrying...");
153 }
154 }
155
156 /// Retry failed connections (smart reconnect)
157 ///
158 /// # Behavior
159 /// - Calls WireBuilder to create new connections
160 /// - Uses `add_connection_smart()` to skip already-working connections
161 /// - Perfect for reconnection after detecting connection failures
162 ///
163 /// # Arguments
164 /// - `dest`: destination (used by WireBuilder)
165 /// - `wire_builder`: factory to create new WireHandles
166 pub async fn retry_failed_connections(
167 &self,
168 dest: &Dest,
169 wire_builder: &dyn super::WireBuilder,
170 ) -> NetworkResult<()> {
171 tracing::info!("🔄 Retrying failed connections for: {:?}", dest);
172
173 // Get fresh connections from builder (no cancel token for retry)
174 let connections = wire_builder
175 .create_connections_with_cancel(dest, None)
176 .await?;
177
178 if connections.is_empty() {
179 return Err(NetworkError::ConfigurationError(
180 "WireBuilder returned no connections".to_string(),
181 ));
182 }
183
184 // Add each connection smartly (skip Ready/Connecting)
185 for conn in connections {
186 self.conn_mgr.add_connection_smart(conn).await;
187 }
188
189 Ok(())
190 }
191
192 /// Close DestTransport and release all connection resources
193 pub async fn close(&self) -> NetworkResult<()> {
194 tracing::info!("🔌 Closing DestTransport");
195
196 // 1. Get all connections and close them one by one
197 for conn_type in [ConnType::WebSocket, ConnType::WebRTC] {
198 if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
199 if let Err(e) = conn.close().await {
200 tracing::warn!("❌ Failed to close {:?} connection: {}", conn_type, e);
201 } else {
202 tracing::debug!("✅ Closed {:?} connection", conn_type);
203 }
204
205 // 2. Mark connection as closed in the pool
206 // This updates the ready_tx and notifies waiters
207 self.conn_mgr.mark_connection_closed(conn_type).await;
208 }
209 }
210
211 // 3. Clean up the entire pool
212 self.conn_mgr.close_all().await;
213
214 Ok(())
215 }
216
217 /// Check if any connection is still healthy
218 ///
219 /// Used by health checker to detect failed connections
220 ///
221 /// # Returns
222 /// - `true`: at least one connection is healthy (connected)
223 /// - `false`: all connections are unhealthy or no connections exist
224 pub async fn has_healthy_connection(&self) -> bool {
225 for conn_type in [ConnType::WebRTC, ConnType::WebSocket] {
226 if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
227 if conn.is_connected() {
228 return true;
229 }
230 }
231 }
232 false
233 }
234
235 /// Subscribe to ready-set changes (used for manager-side cleanup).
236 pub fn watch_ready(&self) -> watch::Receiver<ReadySet> {
237 self.conn_mgr.watch_ready()
238 }
239}