use super::backoff::ExponentialBackoff;
use super::wire_handle::{WireHandle, WireIdentity, WireStatus};
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::time::Duration;
use tokio::sync::{RwLock, watch};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnType {
WebSocket,
WebRTC,
}
impl ConnType {
const fn as_index(self) -> usize {
match self {
ConnType::WebSocket => 0,
ConnType::WebRTC => 1,
}
}
const ALL: [ConnType; 2] = [ConnType::WebSocket, ConnType::WebRTC];
}
pub(crate) type ReadySet = HashSet<ConnType>;
#[derive(Debug, Clone, Copy)]
pub(crate) struct RetryConfig {
pub(crate) max_attempts: u32,
pub(crate) initial_delay_ms: u64,
pub(crate) max_delay_ms: u64,
pub(crate) multiplier: f64,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_attempts: 3,
initial_delay_ms: 1000,
max_delay_ms: 10000,
multiplier: 2.0,
}
}
}
impl RetryConfig {
pub(crate) fn create_backoff(&self) -> ExponentialBackoff {
ExponentialBackoff::with_multiplier(
Duration::from_millis(self.initial_delay_ms),
Duration::from_millis(self.max_delay_ms),
Some(self.max_attempts),
self.multiplier,
)
}
}
pub(crate) struct WirePool {
connections: Arc<RwLock<[Option<WireStatus>; 2]>>,
ready_tx: watch::Sender<ReadySet>,
ready_rx: watch::Receiver<ReadySet>,
pending: Arc<AtomicU8>,
retry_config: RetryConfig,
closed: Arc<AtomicBool>,
}
impl WirePool {
pub(crate) fn new(retry_config: RetryConfig) -> Self {
let (tx, rx) = watch::channel(HashSet::new());
Self {
connections: Arc::new(RwLock::new([None, None])),
ready_tx: tx,
ready_rx: rx,
pending: Arc::new(AtomicU8::new(0)),
retry_config,
closed: Arc::new(AtomicBool::new(false)),
}
}
pub(crate) fn add_connection(&self, connection: Arc<dyn WireHandle>) {
let connections = Arc::clone(&self.connections);
let ready_tx = self.ready_tx.clone();
let pending = Arc::clone(&self.pending);
let retry_config = self.retry_config;
let closed = Arc::clone(&self.closed);
let conn_type = connection.connection_type();
tokio::spawn(async move {
{
let mut conns = connections.write().await;
conns[conn_type.as_index()] = Some(WireStatus::Connecting);
}
let backoff = retry_config.create_backoff();
for (attempt, delay) in backoff.enumerate() {
if closed.load(Ordering::Relaxed) {
tracing::debug!(
"🛑 [{:?}] Connection task terminated (pool closed)",
conn_type
);
return;
}
if attempt > 0 {
tracing::debug!(
"⏱️ [{:?}] Waiting {:?} before retry {}",
conn_type,
delay,
attempt + 1
);
tokio::time::sleep(delay).await;
if closed.load(Ordering::Relaxed) {
tracing::debug!(
"🛑 [{:?}] Connection task terminated (pool closed)",
conn_type
);
return;
}
}
pending.fetch_add(1, Ordering::Relaxed);
tracing::debug!(
"🔄 [{:?}] Connecting (attempt {}/{})",
conn_type,
attempt + 1,
retry_config.max_attempts
);
let result = connection.connect().await;
pending.fetch_sub(1, Ordering::Relaxed);
match result {
Ok(_) => {
tracing::info!(
"✅ [{:?}] Connection established on attempt {}",
conn_type,
attempt + 1
);
{
let mut conns = connections.write().await;
conns[conn_type.as_index()] =
Some(WireStatus::Ready(Arc::clone(&connection)));
}
Self::broadcast_ready_connections(&connections, &ready_tx).await;
return; }
Err(e) => {
tracing::warn!(
"❌ [{:?}] Connection failed on attempt {}: {}",
conn_type,
attempt + 1,
e
);
}
}
}
tracing::error!(
"💀 [{:?}] All {} retries exhausted",
conn_type,
retry_config.max_attempts
);
let mut conns = connections.write().await;
conns[conn_type.as_index()] = Some(WireStatus::Failed);
let remaining = pending.load(Ordering::Relaxed);
if remaining == 0 {
let all_failed = conns
.iter()
.all(|s| matches!(s, Some(WireStatus::Failed) | None));
if all_failed {
tracing::error!("💀💀 All connections failed");
}
}
});
}
#[cfg(feature = "test-utils")]
pub(crate) async fn add_connection_smart(&self, connection: Arc<dyn WireHandle>) {
let conn_type = connection.connection_type();
let should_add = {
let conns = self.connections.read().await;
match &conns[conn_type.as_index()] {
Some(WireStatus::Ready(_)) => {
tracing::debug!("⏭️ [{:?}] Skipping - already Ready", conn_type);
false
}
Some(WireStatus::Connecting) => {
tracing::debug!("⏭️ [{:?}] Skipping - already Connecting", conn_type);
false
}
Some(WireStatus::Failed) | None => {
tracing::info!(
"🔄 [{:?}] Starting connection (was {:?})",
conn_type,
conns[conn_type.as_index()]
);
true
}
}
};
if should_add {
self.add_connection(connection);
}
}
async fn broadcast_ready_connections(
connections: &Arc<RwLock<[Option<WireStatus>; 2]>>,
ready_tx: &watch::Sender<ReadySet>,
) {
let conns = connections.read().await;
let mut ready_set: ReadySet = HashSet::new();
for conn_type in ConnType::ALL {
if let Some(WireStatus::Ready(_)) = &conns[conn_type.as_index()] {
ready_set.insert(conn_type);
}
}
let _ = ready_tx.send(ready_set);
}
pub(crate) fn watch_ready(&self) -> watch::Receiver<ReadySet> {
self.ready_rx.clone()
}
pub(crate) async fn get_connection(&self, conn_type: ConnType) -> Option<Arc<dyn WireHandle>> {
let conns = self.connections.read().await;
match &conns[conn_type.as_index()] {
Some(WireStatus::Ready(conn)) => Some(Arc::clone(conn)),
_ => None,
}
}
pub(crate) async fn mark_connection_closed(&self, conn_type: ConnType) {
{
let mut conns = self.connections.write().await;
conns[conn_type.as_index()] = Some(WireStatus::Failed);
}
Self::broadcast_ready_connections(&self.connections, &self.ready_tx).await;
tracing::debug!("🔌 Marked {:?} connection as closed", conn_type);
}
pub(crate) async fn connection_matches_identity(
&self,
conn_type: ConnType,
expected_identity: &WireIdentity,
) -> bool {
let conns = self.connections.read().await;
match &conns[conn_type.as_index()] {
Some(WireStatus::Ready(handle)) => {
handle.identity().as_ref() == Some(expected_identity)
}
_ => false,
}
}
pub(crate) async fn mark_connection_closed_if_same(
&self,
conn_type: ConnType,
expected_identity: &WireIdentity,
) -> bool {
{
let mut conns = self.connections.write().await;
match &conns[conn_type.as_index()] {
Some(WireStatus::Ready(handle)) => {
if handle.identity().as_ref() != Some(expected_identity) {
tracing::debug!(
"🔌 {:?} identity mismatch — not marking closed (wire already replaced)",
conn_type
);
return false;
}
}
_ => {
return false;
}
}
conns[conn_type.as_index()] = Some(WireStatus::Failed);
}
Self::broadcast_ready_connections(&self.connections, &self.ready_tx).await;
tracing::debug!(
"🔌 Marked {:?} closed (identity matched {:?})",
conn_type,
expected_identity
);
true
}
pub(crate) async fn close_all(&self) {
self.closed.store(true, Ordering::Relaxed);
let mut conns = self.connections.write().await;
*conns = [None, None];
let _ = self.ready_tx.send(HashSet::new());
tracing::debug!("🔌 Closed all connections in pool (background tasks will terminate)");
}
pub(crate) fn is_closed(&self) -> bool {
self.closed.load(Ordering::Relaxed)
}
}