actr_runtime/transport/
wire_pool.rs

1//! WirePool - Wire connection pool manager
2//!
3//! Manages connection strategies: saturated concurrent connections, automatic retry, and fallback strategies.
4//! Uses watch channels to broadcast connection status, implementing zero-polling event-driven architecture.
5
6use super::backoff::ExponentialBackoff;
7use super::error::NetworkResult;
8use super::wire_handle::{WireHandle, WireStatus};
9use std::collections::HashSet;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
12use std::time::Duration;
13use tokio::sync::{RwLock, watch};
14
/// Identifies which kind of wire a connection uses.
///
/// The explicit discriminants double as the slot position of each type in
/// fixed-size, per-type arrays (see `ConnType::as_index`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnType {
    WebSocket = 0,
    WebRTC = 1,
}

impl ConnType {
    /// Every connection type, listed in slot order.
    const ALL: [ConnType; 2] = [ConnType::WebSocket, ConnType::WebRTC];

    /// Slot position of this connection type (`WebSocket` = 0, `WebRTC` = 1).
    const fn as_index(self) -> usize {
        // The discriminants above are the slot indices, so a plain cast suffices.
        self as usize
    }
}
34
35impl From<&WireHandle> for ConnType {
36    fn from(conn: &WireHandle) -> Self {
37        match conn {
38            WireHandle::WebSocket(_) => ConnType::WebSocket,
39            WireHandle::WebRTC(_) => ConnType::WebRTC,
40        }
41    }
42}
43
/// Set of connection types whose connection is currently ready.
///
/// This is the value type carried by the pool's watch channel.
pub type ReadySet = HashSet<ConnType>;
46
/// Retry policy used when establishing a connection.
#[derive(Debug, Clone, Copy)]
pub struct RetryConfig {
    /// Attempt limit handed to the backoff.
    pub max_attempts: u32,
    /// Initial backoff delay, in milliseconds.
    pub initial_delay_ms: u64,
    /// Maximum backoff delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Multiplier handed to the backoff.
    pub multiplier: f64,
}

impl Default for RetryConfig {
    /// Defaults: 3 attempts, 1000 ms initial delay, 10000 ms maximum delay,
    /// multiplier 2.0.
    fn default() -> Self {
        let (initial_delay_ms, max_delay_ms) = (1_000, 10_000);

        Self {
            multiplier: 2.0,
            max_delay_ms,
            initial_delay_ms,
            max_attempts: 3,
        }
    }
}
66
67impl RetryConfig {
68    /// Create ExponentialBackoff from this config
69    pub fn create_backoff(&self) -> ExponentialBackoff {
70        ExponentialBackoff::with_multiplier(
71            Duration::from_millis(self.initial_delay_ms),
72            Duration::from_millis(self.max_delay_ms),
73            Some(self.max_attempts),
74            self.multiplier,
75        )
76    }
77}
78
/// WirePool - Wire connection pool manager
///
/// # Responsibilities
/// - Saturated concurrent connections (simultaneously attempt WebRTC + WebSocket)
/// - Automatic retry with exponential backoff
/// - Broadcast connection status (via watch channel, zero-polling)
/// - Keep all successful connections (no priority-based replacement)
///
/// # Design Highlights
/// - **Event-driven**: Use watch channels to notify status changes
/// - **Zero-polling**: Callers use `ready_rx.changed().await` to wait for connection readiness
/// - **Array optimization**: Use fixed-size array instead of HashMap
pub struct WirePool {
    /// Connection status per connection type, indexed by `ConnType::as_index`
    /// (array optimization: WebSocket=0, WebRTC=1). `None` means no status
    /// has been recorded for that type (initial state, or after `close_all`).
    connections: Arc<RwLock<[Option<WireStatus>; 2]>>,

    /// Sending half of the ready-set watch channel (broadcast)
    ready_tx: watch::Sender<ReadySet>,
    /// Receiving half kept by the pool; cloned for callers and read for snapshots
    ready_rx: watch::Receiver<ReadySet>,

    /// Number of `connect()` calls currently in flight: incremented right
    /// before each attempt and decremented right after it returns
    pending: Arc<AtomicU8>,

    /// Retry configuration (copied into every background connection task)
    retry_config: RetryConfig,

    /// Closed flag (used to terminate background tasks); set by `close_all`
    closed: Arc<AtomicBool>,
}
108
109impl WirePool {
110    /// Create new wire connection pool
111    pub fn new(retry_config: RetryConfig) -> Self {
112        let (tx, rx) = watch::channel(HashSet::new());
113
114        Self {
115            connections: Arc::new(RwLock::new([None, None])),
116            ready_tx: tx,
117            ready_rx: rx,
118            pending: Arc::new(AtomicU8::new(0)),
119            retry_config,
120            closed: Arc::new(AtomicBool::new(false)),
121        }
122    }
123
124    /// Add connection and start connection task in background
125    ///
126    /// Non-blocking, returns immediately and attempts connection concurrently in background
127    ///
128    /// # Behavior
129    /// - **Unconditionally starts**: Always starts connection attempt, even if a connection already exists
130    /// - Use `add_connection_smart()` if you want to skip already-ready connections
131    pub fn add_connection(&self, connection: WireHandle) {
132        let connections = Arc::clone(&self.connections);
133        let ready_tx = self.ready_tx.clone();
134        let pending = Arc::clone(&self.pending);
135        let retry_config = self.retry_config;
136        let closed = Arc::clone(&self.closed);
137
138        let conn_type = ConnType::from(&connection);
139
140        tokio::spawn(async move {
141            // Initialize status
142            {
143                let mut conns = connections.write().await;
144                conns[conn_type.as_index()] = Some(WireStatus::Connecting);
145            }
146
147            // Create exponential backoff iterator
148            let backoff = retry_config.create_backoff();
149
150            // Retry loop using ExponentialBackoff iterator
151            for (attempt, delay) in backoff.enumerate() {
152                // Check if pool has been closed
153                if closed.load(Ordering::Relaxed) {
154                    tracing::debug!(
155                        "🛑 [{:?}] Connection task terminated (pool closed)",
156                        conn_type
157                    );
158                    return;
159                }
160
161                // Wait for delay (first attempt has 0 delay built into iterator)
162                if attempt > 0 {
163                    tracing::debug!(
164                        "⏱️ [{:?}] Waiting {:?} before retry {}",
165                        conn_type,
166                        delay,
167                        attempt + 1
168                    );
169                    tokio::time::sleep(delay).await;
170
171                    // Check again after sleep
172                    if closed.load(Ordering::Relaxed) {
173                        tracing::debug!(
174                            "🛑 [{:?}] Connection task terminated (pool closed)",
175                            conn_type
176                        );
177                        return;
178                    }
179                }
180
181                pending.fetch_add(1, Ordering::Relaxed);
182
183                tracing::debug!(
184                    "🔄 [{:?}] Connecting (attempt {}/{})",
185                    conn_type,
186                    attempt + 1,
187                    retry_config.max_attempts
188                );
189
190                let result = connection.connect().await;
191                pending.fetch_sub(1, Ordering::Relaxed);
192
193                match result {
194                    Ok(_) => {
195                        tracing::info!(
196                            "✅ [{:?}] Connection established on attempt {}",
197                            conn_type,
198                            attempt + 1
199                        );
200
201                        // Update status to Ready
202                        {
203                            let mut conns = connections.write().await;
204                            conns[conn_type.as_index()] =
205                                Some(WireStatus::Ready(connection.clone()));
206                        }
207
208                        // Broadcast new ready connection set (keep all connections, no replacement)
209                        Self::broadcast_ready_connections(&connections, &ready_tx).await;
210
211                        return; // Success, exit
212                    }
213                    Err(e) => {
214                        tracing::warn!(
215                            "❌ [{:?}] Connection failed on attempt {}: {}",
216                            conn_type,
217                            attempt + 1,
218                            e
219                        );
220                    }
221                }
222            }
223
224            // All retries failed
225            tracing::error!(
226                "💀 [{:?}] All {} retries exhausted",
227                conn_type,
228                retry_config.max_attempts
229            );
230
231            let mut conns = connections.write().await;
232            conns[conn_type.as_index()] = Some(WireStatus::Failed);
233
234            // Check if all connections failed
235            let remaining = pending.load(Ordering::Relaxed);
236            if remaining == 0 {
237                let all_failed = conns
238                    .iter()
239                    .all(|s| matches!(s, Some(WireStatus::Failed) | None));
240
241                if all_failed {
242                    tracing::error!("💀💀 All connections failed");
243                }
244            }
245        });
246    }
247
248    /// Add connection smartly - skip if already Ready or Connecting
249    ///
250    /// # Behavior
251    /// - **Ready**: Skip (reuse existing connection)
252    /// - **Connecting**: Skip (avoid duplicate retry)
253    /// - **None/Failed**: Start connection attempt
254    ///
255    /// # Use Case
256    /// Perfect for reconnection scenarios where you want to retry failed connections
257    /// without disrupting working ones.
258    pub async fn add_connection_smart(&self, connection: WireHandle) {
259        let conn_type = ConnType::from(&connection);
260
261        // Check current status
262        let should_add = {
263            let conns = self.connections.read().await;
264            match &conns[conn_type.as_index()] {
265                Some(WireStatus::Ready(_)) => {
266                    tracing::debug!("⏭️ [{:?}] Skipping - already Ready", conn_type);
267                    false
268                }
269                Some(WireStatus::Connecting) => {
270                    tracing::debug!("⏭️ [{:?}] Skipping - already Connecting", conn_type);
271                    false
272                }
273                Some(WireStatus::Failed) | None => {
274                    tracing::info!(
275                        "🔄 [{:?}] Starting connection (was {:?})",
276                        conn_type,
277                        conns[conn_type.as_index()]
278                    );
279                    true
280                }
281            }
282        };
283
284        if should_add {
285            self.add_connection(connection);
286        }
287    }
288
289    /// Broadcast current ready connections
290    async fn broadcast_ready_connections(
291        connections: &Arc<RwLock<[Option<WireStatus>; 2]>>,
292        ready_tx: &watch::Sender<ReadySet>,
293    ) {
294        let conns = connections.read().await;
295
296        // Collect all ready connections
297        let mut ready_set: ReadySet = HashSet::new();
298
299        for conn_type in ConnType::ALL {
300            if let Some(WireStatus::Ready(_)) = &conns[conn_type.as_index()] {
301                ready_set.insert(conn_type);
302            }
303        }
304
305        // Broadcast ready set
306        let _ = ready_tx.send(ready_set);
307    }
308
    /// Watch for connection status changes
    ///
    /// Returns a clone of the pool's own receiver for the ready-set watch channel.
    ///
    /// NOTE(review): a cloned watch receiver presumably inherits the "seen"
    /// version of the receiver it was cloned from, so the first
    /// `changed().await` on the returned receiver may resolve immediately if
    /// anything was broadcast earlier - confirm against the tokio docs.
    pub fn watch_ready(&self) -> watch::Receiver<ReadySet> {
        self.ready_rx.clone()
    }
313
314    /// Get current ready connection set
315    pub fn get_ready(&self) -> ReadySet {
316        self.ready_rx.borrow().clone()
317    }
318
319    /// Get connection of specified type
320    pub async fn get_connection(&self, conn_type: ConnType) -> Option<WireHandle> {
321        let conns = self.connections.read().await;
322
323        match &conns[conn_type.as_index()] {
324            Some(WireStatus::Ready(conn)) => Some(conn.clone()),
325            _ => None,
326        }
327    }
328
329    /// Wait for any connection to become ready
330    pub async fn wait_for_any(&self) -> NetworkResult<()> {
331        let mut rx = self.ready_rx.clone();
332
333        rx.wait_for(|ready_set| !ready_set.is_empty())
334            .await
335            .map_err(|_| {
336                super::error::NetworkError::ChannelClosed("watch channel closed".to_string())
337            })?;
338
339        Ok(())
340    }
341
342    /// Mark a connection as closed/failed
343    ///
344    /// Called by upper layers (DestTransport) when closing connections.
345    /// This replaces the per-connection event listener pattern.
346    pub async fn mark_connection_closed(&self, conn_type: ConnType) {
347        {
348            let mut conns = self.connections.write().await;
349            conns[conn_type.as_index()] = Some(WireStatus::Failed);
350        }
351
352        // Update ready set
353        Self::broadcast_ready_connections(&self.connections, &self.ready_tx).await;
354
355        tracing::debug!("🔌 Marked {:?} connection as closed", conn_type);
356    }
357
358    /// Close all connections in the pool
359    ///
360    /// Called by DestTransport.close() to clean up all connections.
361    /// This also terminates all background connection tasks.
362    pub async fn close_all(&self) {
363        // 1. Set closed flag to terminate background tasks
364        self.closed.store(true, Ordering::Relaxed);
365
366        // 2. Clear all connection status
367        let mut conns = self.connections.write().await;
368        *conns = [None, None];
369
370        // 3. Broadcast empty ready set
371        let _ = self.ready_tx.send(HashSet::new());
372
373        tracing::debug!("🔌 Closed all connections in pool (background tasks will terminate)");
374    }
375
    /// Check if pool is closed
    ///
    /// Returns `true` once `close_all()` has set the closed flag. Nothing in
    /// this file resets the flag afterwards.
    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Relaxed)
    }
380}