actr_runtime/transport/
wire_pool.rs

1//! WirePool - Wire connection pool manager
2//!
3//! Manages connection strategies: saturated concurrent connections, automatic retry, and fallback strategies.
4//! Uses watch channels to broadcast connection status, implementing zero-polling event-driven architecture.
5
6use super::backoff::ExponentialBackoff;
7use super::error::NetworkResult;
8use super::wire_handle::{WireHandle, WireStatus};
9use std::collections::HashSet;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU8, Ordering};
12use std::time::Duration;
13use tokio::sync::{RwLock, watch};
14
/// Identifies which transport a wire connection uses.
///
/// The discriminants are spelled out because they double as slot indices
/// (see `as_index`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ConnType {
    /// WebSocket transport (slot 0)
    WebSocket = 0,
    /// WebRTC transport (slot 1)
    WebRTC = 1,
}

impl ConnType {
    /// Every connection type, listed in slot-index order
    const ALL: [ConnType; 2] = [Self::WebSocket, Self::WebRTC];

    /// Slot index of this connection type (`WebSocket` = 0, `WebRTC` = 1)
    const fn as_index(self) -> usize {
        // The enum discriminant is the slot index.
        self as usize
    }
}
34
35impl From<&WireHandle> for ConnType {
36    fn from(conn: &WireHandle) -> Self {
37        match conn {
38            WireHandle::WebSocket(_) => ConnType::WebSocket,
39            WireHandle::WebRTC(_) => ConnType::WebRTC,
40        }
41    }
42}
43
44/// Set of ready connections
45pub type ReadySet = HashSet<ConnType>;
46
/// Retry settings used to build the exponential backoff for connection attempts.
#[derive(Clone, Copy, Debug)]
pub struct RetryConfig {
    /// Attempt limit handed to the backoff
    pub max_attempts: u32,
    /// Initial backoff delay, in milliseconds
    pub initial_delay_ms: u64,
    /// Maximum backoff delay, in milliseconds
    pub max_delay_ms: u64,
    /// Multiplier handed to the backoff
    pub multiplier: f64,
}

impl Default for RetryConfig {
    /// Defaults: 3 attempts, 1 s initial delay, 10 s maximum delay, multiplier 2.
    fn default() -> Self {
        RetryConfig {
            multiplier: 2.0,
            max_delay_ms: 10_000,
            initial_delay_ms: 1_000,
            max_attempts: 3,
        }
    }
}
66
67impl RetryConfig {
68    /// Create ExponentialBackoff from this config
69    pub fn create_backoff(&self) -> ExponentialBackoff {
70        ExponentialBackoff::with_multiplier(
71            Duration::from_millis(self.initial_delay_ms),
72            Duration::from_millis(self.max_delay_ms),
73            Some(self.max_attempts),
74            self.multiplier,
75        )
76    }
77}
78
79/// WirePool - Wire connection pool manager
80///
81/// # Responsibilities
82/// - Saturated concurrent connections (simultaneously attempt WebRTC + WebSocket)
83/// - Automatic retry with exponential backoff
84/// - Broadcast connection status (via watch channel, zero-polling)
85/// - Keep all successful connections (no priority-based replacement)
86///
87/// # Design Highlights
88/// - **Event-driven**: Use watch channels to notify status changes
89/// - **Zero-polling**: Callers use `await ready_rx.changed()` to wait for connection readiness
90/// - **Array optimization**: Use fixed-size array instead of HashMap
91pub struct WirePool {
92    /// Connection status (array optimization: WebSocket=0, WebRTC=1)
93    connections: Arc<RwLock<[Option<WireStatus>; 2]>>,
94
95    /// Ready connection set (broadcast)
96    ready_tx: watch::Sender<ReadySet>,
97    ready_rx: watch::Receiver<ReadySet>,
98
99    /// Pending connection count
100    pending: Arc<AtomicU8>,
101
102    /// Retry configuration
103    retry_config: RetryConfig,
104}
105
106impl WirePool {
107    /// Create new wire connection pool
108    pub fn new(retry_config: RetryConfig) -> Self {
109        let (tx, rx) = watch::channel(HashSet::new());
110
111        Self {
112            connections: Arc::new(RwLock::new([None, None])),
113            ready_tx: tx,
114            ready_rx: rx,
115            pending: Arc::new(AtomicU8::new(0)),
116            retry_config,
117        }
118    }
119
120    /// Add connection and start connection task in background
121    ///
122    /// Non-blocking, returns immediately and attempts connection concurrently in background
123    ///
124    /// # Behavior
125    /// - **Unconditionally starts**: Always starts connection attempt, even if a connection already exists
126    /// - Use `add_connection_smart()` if you want to skip already-ready connections
127    pub fn add_connection(&self, connection: WireHandle) {
128        let connections = Arc::clone(&self.connections);
129        let ready_tx = self.ready_tx.clone();
130        let pending = Arc::clone(&self.pending);
131        let retry_config = self.retry_config;
132
133        let conn_type = ConnType::from(&connection);
134
135        tokio::spawn(async move {
136            // Initialize status
137            {
138                let mut conns = connections.write().await;
139                conns[conn_type.as_index()] = Some(WireStatus::Connecting);
140            }
141
142            // Create exponential backoff iterator
143            let backoff = retry_config.create_backoff();
144
145            // Retry loop using ExponentialBackoff iterator
146            for (attempt, delay) in backoff.enumerate() {
147                // Wait for delay (first attempt has 0 delay built into iterator)
148                if attempt > 0 {
149                    tracing::debug!(
150                        "⏱️ [{:?}] Waiting {:?} before retry {}",
151                        conn_type,
152                        delay,
153                        attempt + 1
154                    );
155                    tokio::time::sleep(delay).await;
156                }
157
158                pending.fetch_add(1, Ordering::Relaxed);
159
160                tracing::debug!(
161                    "🔄 [{:?}] Connecting (attempt {}/{})",
162                    conn_type,
163                    attempt + 1,
164                    retry_config.max_attempts
165                );
166
167                let result = connection.connect().await;
168                pending.fetch_sub(1, Ordering::Relaxed);
169
170                match result {
171                    Ok(_) => {
172                        tracing::info!(
173                            "✅ [{:?}] Connection established on attempt {}",
174                            conn_type,
175                            attempt + 1
176                        );
177
178                        // Update status to Ready
179                        {
180                            let mut conns = connections.write().await;
181                            conns[conn_type.as_index()] =
182                                Some(WireStatus::Ready(connection.clone()));
183                        }
184
185                        // Broadcast new ready connection set (keep all connections, no replacement)
186                        Self::broadcast_ready_connections(&connections, &ready_tx).await;
187
188                        return; // Success, exit
189                    }
190                    Err(e) => {
191                        tracing::warn!(
192                            "❌ [{:?}] Connection failed on attempt {}: {}",
193                            conn_type,
194                            attempt + 1,
195                            e
196                        );
197                    }
198                }
199            }
200
201            // All retries failed
202            tracing::error!(
203                "💀 [{:?}] All {} retries exhausted",
204                conn_type,
205                retry_config.max_attempts
206            );
207
208            let mut conns = connections.write().await;
209            conns[conn_type.as_index()] = Some(WireStatus::Failed);
210
211            // Check if all connections failed
212            let remaining = pending.load(Ordering::Relaxed);
213            if remaining == 0 {
214                let all_failed = conns
215                    .iter()
216                    .all(|s| matches!(s, Some(WireStatus::Failed) | None));
217
218                if all_failed {
219                    tracing::error!("💀💀 All connections failed");
220                }
221            }
222        });
223    }
224
225    /// Add connection smartly - skip if already Ready or Connecting
226    ///
227    /// # Behavior
228    /// - **Ready**: Skip (reuse existing connection)
229    /// - **Connecting**: Skip (avoid duplicate retry)
230    /// - **None/Failed**: Start connection attempt
231    ///
232    /// # Use Case
233    /// Perfect for reconnection scenarios where you want to retry failed connections
234    /// without disrupting working ones.
235    pub async fn add_connection_smart(&self, connection: WireHandle) {
236        let conn_type = ConnType::from(&connection);
237
238        // Check current status
239        let should_add = {
240            let conns = self.connections.read().await;
241            match &conns[conn_type.as_index()] {
242                Some(WireStatus::Ready(_)) => {
243                    tracing::debug!("⏭️ [{:?}] Skipping - already Ready", conn_type);
244                    false
245                }
246                Some(WireStatus::Connecting) => {
247                    tracing::debug!("⏭️ [{:?}] Skipping - already Connecting", conn_type);
248                    false
249                }
250                Some(WireStatus::Failed) | None => {
251                    tracing::info!(
252                        "🔄 [{:?}] Starting connection (was {:?})",
253                        conn_type,
254                        conns[conn_type.as_index()]
255                    );
256                    true
257                }
258            }
259        };
260
261        if should_add {
262            self.add_connection(connection);
263        }
264    }
265
266    /// Broadcast current ready connections
267    async fn broadcast_ready_connections(
268        connections: &Arc<RwLock<[Option<WireStatus>; 2]>>,
269        ready_tx: &watch::Sender<ReadySet>,
270    ) {
271        let conns = connections.read().await;
272
273        // Collect all ready connections
274        let mut ready_set: ReadySet = HashSet::new();
275
276        for conn_type in ConnType::ALL {
277            if let Some(WireStatus::Ready(_)) = &conns[conn_type.as_index()] {
278                ready_set.insert(conn_type);
279            }
280        }
281
282        // Broadcast ready set
283        let _ = ready_tx.send(ready_set);
284    }
285
286    /// Watch for connection status changes
287    pub fn watch_ready(&self) -> watch::Receiver<ReadySet> {
288        self.ready_rx.clone()
289    }
290
291    /// Get current ready connection set
292    pub fn get_ready(&self) -> ReadySet {
293        self.ready_rx.borrow().clone()
294    }
295
296    /// Get connection of specified type
297    pub async fn get_connection(&self, conn_type: ConnType) -> Option<WireHandle> {
298        let conns = self.connections.read().await;
299
300        match &conns[conn_type.as_index()] {
301            Some(WireStatus::Ready(conn)) => Some(conn.clone()),
302            _ => None,
303        }
304    }
305
306    /// Wait for any connection to become ready
307    pub async fn wait_for_any(&self) -> NetworkResult<()> {
308        let mut rx = self.ready_rx.clone();
309
310        rx.wait_for(|ready_set| !ready_set.is_empty())
311            .await
312            .map_err(|_| {
313                super::error::NetworkError::ChannelClosed("watch channel closed".to_string())
314            })?;
315
316        Ok(())
317    }
318}