actr_runtime/transport/
wire_pool.rs1use super::backoff::ExponentialBackoff;
7use super::error::NetworkResult;
8use super::wire_handle::{WireHandle, WireStatus};
9use std::collections::HashSet;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
12use std::time::Duration;
13use tokio::sync::{RwLock, watch};
14
/// Kind of transport a pooled connection uses.
///
/// The pool keeps one slot per kind; `as_index` maps a kind to its slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnType {
    /// WebSocket transport (slot 0).
    WebSocket,
    /// WebRTC transport (slot 1).
    WebRTC,
}

impl ConnType {
    /// Every connection kind, listed in slot order.
    const ALL: [ConnType; 2] = [Self::WebSocket, Self::WebRTC];

    /// Returns the slot index of this kind: `WebSocket` is 0, `WebRTC` is 1.
    const fn as_index(self) -> usize {
        // The enum is fieldless with implicit discriminants, so variants are
        // numbered from 0 in declaration order and the cast yields the slot.
        self as usize
    }
}
34
35impl From<&WireHandle> for ConnType {
36 fn from(conn: &WireHandle) -> Self {
37 match conn {
38 WireHandle::WebSocket(_) => ConnType::WebSocket,
39 WireHandle::WebRTC(_) => ConnType::WebRTC,
40 }
41 }
42}
43
44pub type ReadySet = HashSet<ConnType>;
46
/// Retry parameters used when establishing a pooled connection.
///
/// The values are handed to the backoff iterator built by `create_backoff`.
#[derive(Debug, Clone, Copy)]
pub struct RetryConfig {
    /// Attempt limit passed to the backoff; also shown as the total in
    /// connection log messages.
    pub max_attempts: u32,
    /// Initial backoff delay, in milliseconds.
    pub initial_delay_ms: u64,
    /// Upper bound for the backoff delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Multiplier passed to the backoff.
    pub multiplier: f64,
}

impl Default for RetryConfig {
    /// Three attempts, 1 s initial delay, 10 s delay cap, multiplier 2.0.
    fn default() -> Self {
        RetryConfig {
            multiplier: 2.0,
            max_delay_ms: 10_000,
            initial_delay_ms: 1_000,
            max_attempts: 3,
        }
    }
}
66
67impl RetryConfig {
68 pub fn create_backoff(&self) -> ExponentialBackoff {
70 ExponentialBackoff::with_multiplier(
71 Duration::from_millis(self.initial_delay_ms),
72 Duration::from_millis(self.max_delay_ms),
73 Some(self.max_attempts),
74 self.multiplier,
75 )
76 }
77}
78
79pub struct WirePool {
92 connections: Arc<RwLock<[Option<WireStatus>; 2]>>,
94
95 ready_tx: watch::Sender<ReadySet>,
97 ready_rx: watch::Receiver<ReadySet>,
98
99 pending: Arc<AtomicU8>,
101
102 retry_config: RetryConfig,
104
105 closed: Arc<AtomicBool>,
107}
108
109impl WirePool {
110 pub fn new(retry_config: RetryConfig) -> Self {
112 let (tx, rx) = watch::channel(HashSet::new());
113
114 Self {
115 connections: Arc::new(RwLock::new([None, None])),
116 ready_tx: tx,
117 ready_rx: rx,
118 pending: Arc::new(AtomicU8::new(0)),
119 retry_config,
120 closed: Arc::new(AtomicBool::new(false)),
121 }
122 }
123
124 pub fn add_connection(&self, connection: WireHandle) {
132 let connections = Arc::clone(&self.connections);
133 let ready_tx = self.ready_tx.clone();
134 let pending = Arc::clone(&self.pending);
135 let retry_config = self.retry_config;
136 let closed = Arc::clone(&self.closed);
137
138 let conn_type = ConnType::from(&connection);
139
140 tokio::spawn(async move {
141 {
143 let mut conns = connections.write().await;
144 conns[conn_type.as_index()] = Some(WireStatus::Connecting);
145 }
146
147 let backoff = retry_config.create_backoff();
149
150 for (attempt, delay) in backoff.enumerate() {
152 if closed.load(Ordering::Relaxed) {
154 tracing::debug!(
155 "🛑 [{:?}] Connection task terminated (pool closed)",
156 conn_type
157 );
158 return;
159 }
160
161 if attempt > 0 {
163 tracing::debug!(
164 "⏱️ [{:?}] Waiting {:?} before retry {}",
165 conn_type,
166 delay,
167 attempt + 1
168 );
169 tokio::time::sleep(delay).await;
170
171 if closed.load(Ordering::Relaxed) {
173 tracing::debug!(
174 "🛑 [{:?}] Connection task terminated (pool closed)",
175 conn_type
176 );
177 return;
178 }
179 }
180
181 pending.fetch_add(1, Ordering::Relaxed);
182
183 tracing::debug!(
184 "🔄 [{:?}] Connecting (attempt {}/{})",
185 conn_type,
186 attempt + 1,
187 retry_config.max_attempts
188 );
189
190 let result = connection.connect().await;
191 pending.fetch_sub(1, Ordering::Relaxed);
192
193 match result {
194 Ok(_) => {
195 tracing::info!(
196 "✅ [{:?}] Connection established on attempt {}",
197 conn_type,
198 attempt + 1
199 );
200
201 {
203 let mut conns = connections.write().await;
204 conns[conn_type.as_index()] =
205 Some(WireStatus::Ready(connection.clone()));
206 }
207
208 Self::broadcast_ready_connections(&connections, &ready_tx).await;
210
211 return; }
213 Err(e) => {
214 tracing::warn!(
215 "❌ [{:?}] Connection failed on attempt {}: {}",
216 conn_type,
217 attempt + 1,
218 e
219 );
220 }
221 }
222 }
223
224 tracing::error!(
226 "💀 [{:?}] All {} retries exhausted",
227 conn_type,
228 retry_config.max_attempts
229 );
230
231 let mut conns = connections.write().await;
232 conns[conn_type.as_index()] = Some(WireStatus::Failed);
233
234 let remaining = pending.load(Ordering::Relaxed);
236 if remaining == 0 {
237 let all_failed = conns
238 .iter()
239 .all(|s| matches!(s, Some(WireStatus::Failed) | None));
240
241 if all_failed {
242 tracing::error!("💀💀 All connections failed");
243 }
244 }
245 });
246 }
247
248 pub async fn add_connection_smart(&self, connection: WireHandle) {
259 let conn_type = ConnType::from(&connection);
260
261 let should_add = {
263 let conns = self.connections.read().await;
264 match &conns[conn_type.as_index()] {
265 Some(WireStatus::Ready(_)) => {
266 tracing::debug!("⏭️ [{:?}] Skipping - already Ready", conn_type);
267 false
268 }
269 Some(WireStatus::Connecting) => {
270 tracing::debug!("⏭️ [{:?}] Skipping - already Connecting", conn_type);
271 false
272 }
273 Some(WireStatus::Failed) | None => {
274 tracing::info!(
275 "🔄 [{:?}] Starting connection (was {:?})",
276 conn_type,
277 conns[conn_type.as_index()]
278 );
279 true
280 }
281 }
282 };
283
284 if should_add {
285 self.add_connection(connection);
286 }
287 }
288
289 async fn broadcast_ready_connections(
291 connections: &Arc<RwLock<[Option<WireStatus>; 2]>>,
292 ready_tx: &watch::Sender<ReadySet>,
293 ) {
294 let conns = connections.read().await;
295
296 let mut ready_set: ReadySet = HashSet::new();
298
299 for conn_type in ConnType::ALL {
300 if let Some(WireStatus::Ready(_)) = &conns[conn_type.as_index()] {
301 ready_set.insert(conn_type);
302 }
303 }
304
305 let _ = ready_tx.send(ready_set);
307 }
308
309 pub fn watch_ready(&self) -> watch::Receiver<ReadySet> {
311 self.ready_rx.clone()
312 }
313
314 pub fn get_ready(&self) -> ReadySet {
316 self.ready_rx.borrow().clone()
317 }
318
319 pub async fn get_connection(&self, conn_type: ConnType) -> Option<WireHandle> {
321 let conns = self.connections.read().await;
322
323 match &conns[conn_type.as_index()] {
324 Some(WireStatus::Ready(conn)) => Some(conn.clone()),
325 _ => None,
326 }
327 }
328
329 pub async fn wait_for_any(&self) -> NetworkResult<()> {
331 let mut rx = self.ready_rx.clone();
332
333 rx.wait_for(|ready_set| !ready_set.is_empty())
334 .await
335 .map_err(|_| {
336 super::error::NetworkError::ChannelClosed("watch channel closed".to_string())
337 })?;
338
339 Ok(())
340 }
341
342 pub async fn mark_connection_closed(&self, conn_type: ConnType) {
347 {
348 let mut conns = self.connections.write().await;
349 conns[conn_type.as_index()] = Some(WireStatus::Failed);
350 }
351
352 Self::broadcast_ready_connections(&self.connections, &self.ready_tx).await;
354
355 tracing::debug!("🔌 Marked {:?} connection as closed", conn_type);
356 }
357
358 pub async fn close_all(&self) {
363 self.closed.store(true, Ordering::Relaxed);
365
366 let mut conns = self.connections.write().await;
368 *conns = [None, None];
369
370 let _ = self.ready_tx.send(HashSet::new());
372
373 tracing::debug!("🔌 Closed all connections in pool (background tasks will terminate)");
374 }
375
376 pub fn is_closed(&self) -> bool {
378 self.closed.load(Ordering::Relaxed)
379 }
380}