use super::Dest; use super::error::{NetworkError, NetworkResult};
use super::route_table::PayloadTypeExt;
use super::wire_handle::{WireHandle, WireIdentity};
use super::wire_pool::{ConnType, ReadySet, RetryConfig, WirePool};
use actr_protocol::PayloadType;
use std::sync::Arc;
use tokio::sync::watch;
pub(crate) struct DestTransport {
conn_mgr: Arc<WirePool>,
}
impl DestTransport {
pub(crate) async fn new(
dest: Dest,
connections: Vec<Arc<dyn WireHandle>>,
) -> NetworkResult<Self> {
let conn_mgr = Arc::new(WirePool::new(RetryConfig::default()));
tracing::info!("🚀 [{:?}] Starting connection tasks...", dest);
for conn in connections {
conn_mgr.add_connection(conn);
}
Ok(Self { conn_mgr })
}
#[cfg_attr(
feature = "opentelemetry",
tracing::instrument(skip_all, name = "DestTransport.send")
)]
pub(crate) async fn send(&self, payload_type: PayloadType, data: &[u8]) -> NetworkResult<()> {
tracing::debug!(
"📤 Sending message: type={:?}, size={}",
payload_type,
data.len()
);
let lane_types = payload_type.data_lane_types();
if lane_types.is_empty() {
return Err(NetworkError::NoRoute(format!(
"No route for: {payload_type:?}"
)));
}
let mut conn_watcher = self.conn_mgr.watch_ready();
'send: loop {
let ready_connections = {
let ready = conn_watcher.borrow_and_update();
tracing::trace!("🔍 Available connections: {:?}", ready);
ready.clone()
};
for &lane_type in lane_types {
let conn_type = if lane_type.needs_webrtc() {
ConnType::WebRTC
} else {
ConnType::WebSocket
};
if !ready_connections.contains(&conn_type) {
tracing::trace!("🔍 {:?} not ready, trying next", conn_type);
continue;
}
if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
match conn.get_lane(payload_type).await {
Ok(lane) => {
tracing::info!(
"📡 [channel={:?}] {:?} ({} bytes)",
conn_type,
payload_type,
data.len()
);
let payload = bytes::Bytes::copy_from_slice(data);
let result = lane.send(payload.clone()).await;
if let Err(NetworkError::DataChannelError(msg)) = &result {
if msg.contains("closed") {
tracing::warn!(
"♻️ DataChannel closed for {:?}, invalidating lane and retrying once",
payload_type
);
conn.invalidate_lane(payload_type).await;
if let Ok(new_lane) = conn.get_lane(payload_type).await {
return new_lane.send(payload).await;
}
}
}
return result;
}
Err(e) => {
let is_closed_like =
conn_type == ConnType::WebRTC && is_closed_like_error(&e);
if is_closed_like {
tracing::warn!(
"❌ get_lane returned closed-like error for {:?}, invalidating lane",
conn_type
);
conn.invalidate_lane(payload_type).await;
let wire_identity = conn.identity();
let changed = match wire_identity.as_ref() {
Some(identity) => {
self.conn_mgr
.mark_connection_closed_if_same(conn_type, identity)
.await
}
None => false,
};
tracing::warn!(
"♻️ DestTransport stale self-heal: payload_type={:?}, conn_type={:?}, expected_identity={:?}, changed={}",
payload_type,
conn_type,
wire_identity,
changed
);
continue 'send;
}
tracing::warn!("❌ Failed to get DataLane: {:?}: {}", lane_type, e);
continue;
}
}
}
}
tracing::info!("⏳ Waiting for connection status...");
if self.conn_mgr.is_closed() {
return Err(NetworkError::ChannelClosed(
"connection manager closed".into(),
));
}
if conn_watcher.changed().await.is_err() {
return Err(NetworkError::ChannelClosed(
"connection manager closed".into(),
));
}
tracing::debug!("🔔 Connection status updated, retrying...");
}
}
#[cfg(feature = "test-utils")]
pub(crate) async fn retry_failed_connections(
&self,
dest: &Dest,
wire_builder: &dyn super::WireBuilder,
) -> NetworkResult<()> {
tracing::info!("🔄 Retrying failed connections for: {:?}", dest);
let connections = wire_builder
.create_connections_with_cancel(dest, None)
.await?;
if connections.is_empty() {
return Err(NetworkError::ConfigurationError(
"WireBuilder returned no connections".to_string(),
));
}
for conn in connections {
self.conn_mgr.add_connection_smart(conn).await;
}
Ok(())
}
pub(crate) async fn close(&self) -> NetworkResult<()> {
tracing::info!("🔌 Closing DestTransport");
for conn_type in [ConnType::WebSocket, ConnType::WebRTC] {
if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
if let Err(e) = conn.close().await {
tracing::warn!("❌ Failed to close {:?} connection: {}", conn_type, e);
} else {
tracing::debug!("✅ Closed {:?} connection", conn_type);
}
self.conn_mgr.mark_connection_closed(conn_type).await;
}
}
self.conn_mgr.close_all().await;
Ok(())
}
#[cfg(feature = "test-utils")]
pub(crate) async fn has_healthy_connection(&self) -> bool {
for conn_type in [ConnType::WebRTC, ConnType::WebSocket] {
if let Some(conn) = self.conn_mgr.get_connection(conn_type).await {
if conn.is_connected() {
return true;
}
}
}
false
}
pub(crate) fn watch_ready(&self) -> watch::Receiver<ReadySet> {
self.conn_mgr.watch_ready()
}
pub(crate) async fn matches_webrtc_session(
&self,
peer_id: &actr_protocol::ActrId,
session_id: u64,
) -> bool {
let expected = WireIdentity::WebRtc {
peer_id: peer_id.clone(),
session_id,
};
self.conn_mgr
.connection_matches_identity(ConnType::WebRTC, &expected)
.await
}
}
fn is_closed_like_error(e: &NetworkError) -> bool {
fn contains_closed_like(msg: &str) -> bool {
let msg = msg.to_ascii_lowercase();
msg.contains("connection closed")
|| msg.contains("peer connection closed")
|| msg.contains("datachannel closed")
|| msg.contains("data channel closed")
|| msg.contains("websocket connection closed")
|| msg.contains("closed")
}
match e {
NetworkError::ConnectionClosed(_) => true,
NetworkError::ConnectionError(msg)
| NetworkError::WebRtcError(msg)
| NetworkError::DataChannelError(msg)
| NetworkError::WebSocketError(msg)
| NetworkError::SendError(msg) => contains_closed_like(msg),
_ => false,
}
}