actr_runtime/transport/dest_transport.rs
1//! DestTransport - Transport layer manager for a single destination
2//!
3//! Manages all connections and message routing to a specific Dest (Actor or Shell).
4//! Implements event-driven pattern with zero polling.
5
6use super::Dest; // Re-exported from actr-framework
7use super::error::{NetworkError, NetworkResult};
8use super::route_table::PayloadTypeExt;
9use super::wire_handle::WireHandle;
10use super::wire_pool::{ConnType, RetryConfig, WirePool};
11use actr_protocol::PayloadType;
12use std::sync::Arc;
13
14/// DestTransport - Transport layer manager for a single destination
15///
16/// Core responsibilities:
17/// - Manage all connections to a specific Dest (WebSocket + WebRTC)
18/// - Concurrently establish connections in background (saturated connection pattern)
19/// - Event-driven wait for connection status
20/// - Cache Lanes within WireHandle
21/// - WirePool handles priority selection
22pub struct DestTransport {
23 /// Connection manager
24 conn_mgr: Arc<WirePool>,
25}
26
27impl DestTransport {
28 /// Create new DestTransport
29 ///
30 /// # Arguments
31 /// - `dest`: destination
32 /// - `connections`: list of pre-built connections (WebSocket/WebRTC)
33 pub async fn new(dest: Dest, connections: Vec<WireHandle>) -> NetworkResult<Self> {
34 let conn_mgr = Arc::new(WirePool::new(RetryConfig::default()));
35
36 // Start connection tasks in background (concurrently)
37 tracing::info!("🚀 [{:?}] Starting connection tasks...", dest);
38 for conn in connections {
39 conn_mgr.add_connection(conn);
40 }
41
42 Ok(Self { conn_mgr })
43 }
44
45 /// Send message
46 ///
47 /// Core design: event-driven waiting
48 /// - If connection available, send immediately
49 /// - If not, wait for connection status change (via watch channel)
50 /// - WirePool already handles priority, only need to try DataLane Types in order
51 pub async fn send(&self, payload_type: PayloadType, data: &[u8]) -> NetworkResult<()> {
52 tracing::debug!(
53 "📤 Sending message: type={:?}, size={}",
54 payload_type,
55 data.len()
56 );
57
58 // 1. Get supported DataLane Types for this PayloadType (already prioritized)
59 let lane_types = payload_type.data_lane_types();
60
61 if lane_types.is_empty() {
62 return Err(NetworkError::NoRoute(format!(
63 "No route for: {payload_type:?}"
64 )));
65 }
66
67 // 2. Subscribe to connection status changes
68 let mut conn_watcher = self.conn_mgr.watch_ready();
69
70 loop {
71 // 3. Check currently available connections (clone to avoid borrowing across await)
72 let ready_connections = {
73 let ready = conn_watcher.borrow_and_update();
74 tracing::trace!("🔍 Available connections: {:?}", ready);
75 ready.clone()
76 };
77
78 // 4. Try each DataLane Type in priority order
79 for &lane_type in lane_types {
80 // Determine required connection type
81 let conn_type = if lane_type.needs_webrtc() {
82 ConnType::WebRTC
83 } else {
84 ConnType::WebSocket
85 };
86
87 // Check if this connection is ready
88 if !ready_connections.contains(&conn_type) {
89 tracing::trace!("🔍 {:?} not ready, trying next", conn_type);
90 continue;
91 }
92
93 // Get connection and create/get Lane
94 if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
95 // Use original payload_type to create DataLane
96 match conn.get_lane(payload_type).await {
97 Ok(lane) => {
98 tracing::debug!(
99 "✅ Using DataLane: {:?} (type={:?})",
100 lane_type,
101 payload_type
102 );
103 // Convert to Bytes (zero-copy)
104 return lane.send(bytes::Bytes::copy_from_slice(data)).await;
105 }
106 Err(e) => {
107 tracing::warn!("❌ Failed to get DataLane: {:?}: {}", lane_type, e);
108 continue;
109 }
110 }
111 }
112 }
113
114 // 5. All attempts failed, wait for connection status change
115 tracing::info!("⏳ Waiting for connection status...");
116
117 // Event-driven wait!
118 if conn_watcher.changed().await.is_err() {
119 return Err(NetworkError::ChannelClosed(
120 "connection manager closed".into(),
121 ));
122 }
123
124 tracing::debug!("🔔 Connection status updated, retrying...");
125 }
126 }
127
128 /// Retry failed connections (smart reconnect)
129 ///
130 /// # Behavior
131 /// - Calls WireBuilder to create new connections
132 /// - Uses `add_connection_smart()` to skip already-working connections
133 /// - Perfect for reconnection after detecting connection failures
134 ///
135 /// # Arguments
136 /// - `dest`: destination (used by WireBuilder)
137 /// - `wire_builder`: factory to create new WireHandles
138 pub async fn retry_failed_connections(
139 &self,
140 dest: &Dest,
141 wire_builder: &dyn super::WireBuilder,
142 ) -> NetworkResult<()> {
143 tracing::info!("🔄 Retrying failed connections for: {:?}", dest);
144
145 // Get fresh connections from builder
146 let connections = wire_builder.create_connections(dest).await?;
147
148 if connections.is_empty() {
149 return Err(NetworkError::ConfigurationError(
150 "WireBuilder returned no connections".to_string(),
151 ));
152 }
153
154 // Add each connection smartly (skip Ready/Connecting)
155 for conn in connections {
156 self.conn_mgr.add_connection_smart(conn).await;
157 }
158
159 Ok(())
160 }
161
162 /// Close DestTransport and release all connection resources
163 pub async fn close(&self) -> NetworkResult<()> {
164 tracing::info!("🔌 Closing DestTransport");
165
166 // Get all connections and close them one by one
167 for conn_type in [ConnType::WebSocket, ConnType::WebRTC] {
168 if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
169 if let Err(e) = conn.close().await {
170 tracing::warn!("❌ Failed to close {:?} connection: {}", conn_type, e);
171 } else {
172 tracing::debug!("✅ Closed {:?} connection", conn_type);
173 }
174 }
175 }
176
177 Ok(())
178 }
179
180 /// Check if any connection is still healthy
181 ///
182 /// Used by health checker to detect failed connections
183 ///
184 /// # Returns
185 /// - `true`: at least one connection is healthy (connected)
186 /// - `false`: all connections are unhealthy or no connections exist
187 pub async fn has_healthy_connection(&self) -> bool {
188 for conn_type in [ConnType::WebRTC, ConnType::WebSocket] {
189 if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
190 if conn.is_connected() {
191 return true;
192 }
193 }
194 }
195 false
196 }
197}