actr_runtime/transport/
wire_pool.rs1use super::backoff::ExponentialBackoff;
7use super::error::NetworkResult;
8use super::wire_handle::{WireHandle, WireStatus};
9use std::collections::HashSet;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU8, Ordering};
12use std::time::Duration;
13use tokio::sync::{RwLock, watch};
14
/// Transport kind of a wire connection.
///
/// The discriminants are pinned to the slot numbers used by fixed-size,
/// per-transport arrays, so they must stay dense and start at zero.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnType {
    /// WebSocket transport (slot 0).
    WebSocket = 0,
    /// WebRTC transport (slot 1).
    WebRTC = 1,
}

impl ConnType {
    /// Every transport kind, listed in slot order.
    const ALL: [ConnType; 2] = [Self::WebSocket, Self::WebRTC];

    /// Returns the slot index of this transport kind.
    const fn as_index(self) -> usize {
        // The enum discriminants are declared to be exactly the slot indices.
        self as usize
    }
}
34
35impl From<&WireHandle> for ConnType {
36 fn from(conn: &WireHandle) -> Self {
37 match conn {
38 WireHandle::WebSocket(_) => ConnType::WebSocket,
39 WireHandle::WebRTC(_) => ConnType::WebRTC,
40 }
41 }
42}
43
/// Set of transport kinds whose connection is currently in the `Ready` state,
/// as published on the pool's watch channel.
pub type ReadySet = HashSet<ConnType>;
46
/// Retry policy used when establishing a connection.
///
/// The values are handed to the backoff schedule as-is; see
/// `RetryConfig::create_backoff`.
#[derive(Debug, Clone, Copy)]
pub struct RetryConfig {
    /// Attempt limit passed to the backoff schedule.
    pub max_attempts: u32,
    /// Initial backoff delay, in milliseconds.
    pub initial_delay_ms: u64,
    /// Maximum backoff delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Multiplier passed to the backoff schedule.
    pub multiplier: f64,
}

impl Default for RetryConfig {
    /// Defaults: 3 attempts, 1000 ms initial delay, 10000 ms maximum delay,
    /// multiplier 2.0.
    fn default() -> Self {
        const MAX_ATTEMPTS: u32 = 3;
        const INITIAL_DELAY_MS: u64 = 1000;
        const MAX_DELAY_MS: u64 = 10000;
        const MULTIPLIER: f64 = 2.0;

        RetryConfig {
            max_attempts: MAX_ATTEMPTS,
            initial_delay_ms: INITIAL_DELAY_MS,
            max_delay_ms: MAX_DELAY_MS,
            multiplier: MULTIPLIER,
        }
    }
}
66
67impl RetryConfig {
68 pub fn create_backoff(&self) -> ExponentialBackoff {
70 ExponentialBackoff::with_multiplier(
71 Duration::from_millis(self.initial_delay_ms),
72 Duration::from_millis(self.max_delay_ms),
73 Some(self.max_attempts),
74 self.multiplier,
75 )
76 }
77}
78
/// Pool that tracks at most one wire connection per [`ConnType`] and publishes
/// the set of ready transports on a watch channel.
pub struct WirePool {
    /// Per-transport connection status, indexed by `ConnType::as_index`.
    /// A slot is `None` until a connection of that type has been added.
    connections: Arc<RwLock<[Option<WireStatus>; 2]>>,

    /// Sender side of the watch channel carrying the current [`ReadySet`].
    ready_tx: watch::Sender<ReadySet>,
    /// Receiver kept by the pool; cloned for subscribers and read for snapshots.
    ready_rx: watch::Receiver<ReadySet>,

    /// Number of connect attempts currently in flight across all transports.
    pending: Arc<AtomicU8>,

    /// Retry policy applied to every connection added to the pool.
    retry_config: RetryConfig,
}
105
106impl WirePool {
107 pub fn new(retry_config: RetryConfig) -> Self {
109 let (tx, rx) = watch::channel(HashSet::new());
110
111 Self {
112 connections: Arc::new(RwLock::new([None, None])),
113 ready_tx: tx,
114 ready_rx: rx,
115 pending: Arc::new(AtomicU8::new(0)),
116 retry_config,
117 }
118 }
119
120 pub fn add_connection(&self, connection: WireHandle) {
128 let connections = Arc::clone(&self.connections);
129 let ready_tx = self.ready_tx.clone();
130 let pending = Arc::clone(&self.pending);
131 let retry_config = self.retry_config;
132
133 let conn_type = ConnType::from(&connection);
134
135 tokio::spawn(async move {
136 {
138 let mut conns = connections.write().await;
139 conns[conn_type.as_index()] = Some(WireStatus::Connecting);
140 }
141
142 let backoff = retry_config.create_backoff();
144
145 for (attempt, delay) in backoff.enumerate() {
147 if attempt > 0 {
149 tracing::debug!(
150 "⏱️ [{:?}] Waiting {:?} before retry {}",
151 conn_type,
152 delay,
153 attempt + 1
154 );
155 tokio::time::sleep(delay).await;
156 }
157
158 pending.fetch_add(1, Ordering::Relaxed);
159
160 tracing::debug!(
161 "🔄 [{:?}] Connecting (attempt {}/{})",
162 conn_type,
163 attempt + 1,
164 retry_config.max_attempts
165 );
166
167 let result = connection.connect().await;
168 pending.fetch_sub(1, Ordering::Relaxed);
169
170 match result {
171 Ok(_) => {
172 tracing::info!(
173 "✅ [{:?}] Connection established on attempt {}",
174 conn_type,
175 attempt + 1
176 );
177
178 {
180 let mut conns = connections.write().await;
181 conns[conn_type.as_index()] =
182 Some(WireStatus::Ready(connection.clone()));
183 }
184
185 Self::broadcast_ready_connections(&connections, &ready_tx).await;
187
188 return; }
190 Err(e) => {
191 tracing::warn!(
192 "❌ [{:?}] Connection failed on attempt {}: {}",
193 conn_type,
194 attempt + 1,
195 e
196 );
197 }
198 }
199 }
200
201 tracing::error!(
203 "💀 [{:?}] All {} retries exhausted",
204 conn_type,
205 retry_config.max_attempts
206 );
207
208 let mut conns = connections.write().await;
209 conns[conn_type.as_index()] = Some(WireStatus::Failed);
210
211 let remaining = pending.load(Ordering::Relaxed);
213 if remaining == 0 {
214 let all_failed = conns
215 .iter()
216 .all(|s| matches!(s, Some(WireStatus::Failed) | None));
217
218 if all_failed {
219 tracing::error!("💀💀 All connections failed");
220 }
221 }
222 });
223 }
224
225 pub async fn add_connection_smart(&self, connection: WireHandle) {
236 let conn_type = ConnType::from(&connection);
237
238 let should_add = {
240 let conns = self.connections.read().await;
241 match &conns[conn_type.as_index()] {
242 Some(WireStatus::Ready(_)) => {
243 tracing::debug!("⏭️ [{:?}] Skipping - already Ready", conn_type);
244 false
245 }
246 Some(WireStatus::Connecting) => {
247 tracing::debug!("⏭️ [{:?}] Skipping - already Connecting", conn_type);
248 false
249 }
250 Some(WireStatus::Failed) | None => {
251 tracing::info!(
252 "🔄 [{:?}] Starting connection (was {:?})",
253 conn_type,
254 conns[conn_type.as_index()]
255 );
256 true
257 }
258 }
259 };
260
261 if should_add {
262 self.add_connection(connection);
263 }
264 }
265
266 async fn broadcast_ready_connections(
268 connections: &Arc<RwLock<[Option<WireStatus>; 2]>>,
269 ready_tx: &watch::Sender<ReadySet>,
270 ) {
271 let conns = connections.read().await;
272
273 let mut ready_set: ReadySet = HashSet::new();
275
276 for conn_type in ConnType::ALL {
277 if let Some(WireStatus::Ready(_)) = &conns[conn_type.as_index()] {
278 ready_set.insert(conn_type);
279 }
280 }
281
282 let _ = ready_tx.send(ready_set);
284 }
285
286 pub fn watch_ready(&self) -> watch::Receiver<ReadySet> {
288 self.ready_rx.clone()
289 }
290
291 pub fn get_ready(&self) -> ReadySet {
293 self.ready_rx.borrow().clone()
294 }
295
296 pub async fn get_connection(&self, conn_type: ConnType) -> Option<WireHandle> {
298 let conns = self.connections.read().await;
299
300 match &conns[conn_type.as_index()] {
301 Some(WireStatus::Ready(conn)) => Some(conn.clone()),
302 _ => None,
303 }
304 }
305
306 pub async fn wait_for_any(&self) -> NetworkResult<()> {
308 let mut rx = self.ready_rx.clone();
309
310 rx.wait_for(|ready_set| !ready_set.is_empty())
311 .await
312 .map_err(|_| {
313 super::error::NetworkError::ChannelClosed("watch channel closed".to_string())
314 })?;
315
316 Ok(())
317 }
318}