1use crate::{ChainConfig, Error, Result, SubstrateAdapter};
10use parking_lot::RwLock;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::time::sleep;
14use tracing::{debug, info, warn};
15
/// Settings that control how a [`ConnectionPool`] is built and monitored.
///
/// Start from [`PoolConfig::new`] and adjust individual values with the chainable `with_*`
/// methods.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Endpoint URLs; the pool keeps one pooled connection per entry.
    pub endpoints: Vec<String>,
    /// Pause between two passes of the background health checker.
    pub health_check_interval: Duration,
    /// Timeout intended for establishing a connection.
    // NOTE(review): nothing in this file reads this field — confirm where (or whether) it is
    // enforced.
    pub connection_timeout: Duration,
    /// Upper bound on an endpoint's failure count up to which reconnects are still attempted.
    pub max_retries: u32,
    /// Whether `ConnectionPool::new` starts the background health checker.
    pub auto_health_check: bool,
}

impl PoolConfig {
    /// Creates a configuration for `endpoints` using the defaults: a 30 s health-check interval,
    /// a 10 s connection timeout, 3 retries and automatic health checking enabled.
    pub fn new(endpoints: Vec<String>) -> Self {
        let health_check_interval = Duration::from_secs(30);
        let connection_timeout = Duration::from_secs(10);

        Self {
            endpoints,
            health_check_interval,
            connection_timeout,
            max_retries: 3,
            auto_health_check: true,
        }
    }

    /// Returns the configuration with `health_check_interval` replaced by `interval`.
    pub fn with_health_check_interval(self, interval: Duration) -> Self {
        Self {
            health_check_interval: interval,
            ..self
        }
    }

    /// Returns the configuration with `connection_timeout` replaced by `timeout`.
    pub fn with_connection_timeout(self, timeout: Duration) -> Self {
        Self {
            connection_timeout: timeout,
            ..self
        }
    }

    /// Returns the configuration with `max_retries` replaced.
    pub fn with_max_retries(self, max_retries: u32) -> Self {
        Self {
            max_retries,
            ..self
        }
    }

    /// Returns the configuration with automatic health checking switched on or off.
    pub fn with_auto_health_check(self, enabled: bool) -> Self {
        Self {
            auto_health_check: enabled,
            ..self
        }
    }
}
67
/// Health verdict recorded for one pooled endpoint.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthStatus {
    /// The endpoint has an adapter and was connected when last evaluated.
    Healthy,
    /// The endpoint has no adapter, or its adapter reported that it is not connected.
    Unhealthy,
    /// No verdict is available.
    // NOTE(review): no code in this file assigns this variant; it is only counted by `stats`.
    Unknown,
}
78
79#[derive(Clone)]
81struct PooledConnection {
82 endpoint: String,
83 adapter: Option<Arc<SubstrateAdapter>>,
84 health_status: HealthStatus,
85 last_check: Instant,
86 failure_count: u32,
87}
88
89pub struct ConnectionPool {
91 config: PoolConfig,
92 connections: Arc<RwLock<Vec<PooledConnection>>>,
93 current_index: Arc<RwLock<usize>>,
94 chain_config: ChainConfig,
95}
96
97impl ConnectionPool {
98 pub async fn new(config: PoolConfig, chain_config: ChainConfig) -> Result<Self> {
100 if config.endpoints.is_empty() {
101 return Err(Error::Connection(
102 "At least one endpoint is required".to_string(),
103 ));
104 }
105
106 info!(
107 "Creating connection pool with {} endpoints",
108 config.endpoints.len()
109 );
110
111 let mut connections = Vec::new();
112
113 for endpoint in &config.endpoints {
115 debug!("Initializing connection to {}", endpoint);
116
117 let mut chain_cfg = chain_config.clone();
118 chain_cfg.endpoint = endpoint.clone();
119
120 let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
121 Ok(adapter) => {
122 info!("Successfully connected to {}", endpoint);
123 Some(Arc::new(adapter))
124 }
125 Err(e) => {
126 warn!("Failed to connect to {}: {}", endpoint, e);
127 None
128 }
129 };
130
131 let health_status = if adapter.is_some() {
132 HealthStatus::Healthy
133 } else {
134 HealthStatus::Unhealthy
135 };
136
137 connections.push(PooledConnection {
138 endpoint: endpoint.clone(),
139 adapter,
140 health_status,
141 last_check: Instant::now(),
142 failure_count: 0,
143 });
144 }
145
146 let pool = Self {
147 config,
148 connections: Arc::new(RwLock::new(connections)),
149 current_index: Arc::new(RwLock::new(0)),
150 chain_config,
151 };
152
153 if pool.config.auto_health_check {
155 pool.start_health_checker();
156 }
157
158 Ok(pool)
159 }
160
161 #[allow(clippy::result_large_err)]
163 pub fn get_connection(&self) -> Result<Arc<SubstrateAdapter>> {
164 let connections = self.connections.read();
165 let healthy_count = connections
166 .iter()
167 .filter(|c| c.health_status == HealthStatus::Healthy && c.adapter.is_some())
168 .count();
169
170 if healthy_count == 0 {
171 return Err(Error::Connection(
172 "No healthy connections available".to_string(),
173 ));
174 }
175
176 let start_index = *self.current_index.read();
178 let total = connections.len();
179
180 for i in 0..total {
181 let index = (start_index + i) % total;
182 let conn = &connections[index];
183
184 if conn.health_status == HealthStatus::Healthy {
185 if let Some(adapter) = &conn.adapter {
186 *self.current_index.write() = (index + 1) % total;
188 return Ok(adapter.clone());
189 }
190 }
191 }
192
193 Err(Error::Connection(
194 "No healthy connections available".to_string(),
195 ))
196 }
197
198 pub fn get_all_connections(&self) -> Vec<Arc<SubstrateAdapter>> {
200 self.connections
201 .read()
202 .iter()
203 .filter_map(|c| c.adapter.clone())
204 .collect()
205 }
206
207 pub fn stats(&self) -> PoolStats {
209 let connections = self.connections.read();
210
211 let total = connections.len();
212 let healthy = connections
213 .iter()
214 .filter(|c| c.health_status == HealthStatus::Healthy)
215 .count();
216 let unhealthy = connections
217 .iter()
218 .filter(|c| c.health_status == HealthStatus::Unhealthy)
219 .count();
220 let unknown = connections
221 .iter()
222 .filter(|c| c.health_status == HealthStatus::Unknown)
223 .count();
224
225 PoolStats {
226 total_endpoints: total,
227 healthy_endpoints: healthy,
228 unhealthy_endpoints: unhealthy,
229 unknown_endpoints: unknown,
230 }
231 }
232
233 #[allow(clippy::await_holding_lock)]
235 pub async fn health_check(&self) {
236 debug!("Running health check on all endpoints");
237
238 let mut connections = self.connections.write();
239
240 for conn in connections.iter_mut() {
241 let is_healthy = if let Some(adapter) = &conn.adapter {
242 adapter.is_connected()
244 } else {
245 false
246 };
247
248 conn.health_status = if is_healthy {
249 HealthStatus::Healthy
250 } else {
251 HealthStatus::Unhealthy
252 };
253 conn.last_check = Instant::now();
254
255 if !is_healthy {
256 conn.failure_count += 1;
257
258 if conn.failure_count <= self.config.max_retries {
260 debug!("Attempting to reconnect to {}", conn.endpoint);
261
262 let mut chain_cfg = self.chain_config.clone();
263 chain_cfg.endpoint = conn.endpoint.clone();
264
265 match SubstrateAdapter::connect_with_config(chain_cfg).await {
266 Ok(adapter) => {
267 info!("Successfully reconnected to {}", conn.endpoint);
268 conn.adapter = Some(Arc::new(adapter));
269 conn.health_status = HealthStatus::Healthy;
270 conn.failure_count = 0;
271 }
272 Err(e) => {
273 warn!("Failed to reconnect to {}: {}", conn.endpoint, e);
274 }
275 }
276 }
277 } else {
278 conn.failure_count = 0;
280 }
281 }
282 }
283
284 fn start_health_checker(&self) {
286 let connections = self.connections.clone();
287 let interval = self.config.health_check_interval;
288 let chain_config = self.chain_config.clone();
289 let max_retries = self.config.max_retries;
290
291 tokio::spawn(async move {
292 loop {
293 sleep(interval).await;
294
295 debug!("Background health check running");
296
297 let endpoints_to_reconnect: Vec<(String, bool)> = {
299 let mut conns = connections.write();
300 let mut to_reconnect = Vec::new();
301
302 for conn in conns.iter_mut() {
303 let is_healthy = if let Some(adapter) = &conn.adapter {
304 adapter.is_connected()
305 } else {
306 false
307 };
308
309 conn.health_status = if is_healthy {
310 HealthStatus::Healthy
311 } else {
312 HealthStatus::Unhealthy
313 };
314 conn.last_check = Instant::now();
315
316 if !is_healthy && conn.failure_count <= max_retries {
317 to_reconnect.push((conn.endpoint.clone(), true));
318 }
319 }
320
321 to_reconnect
322 };
323
324 for (endpoint, _) in endpoints_to_reconnect {
326 let mut chain_cfg = chain_config.clone();
327 chain_cfg.endpoint = endpoint.clone();
328
329 if let Ok(adapter) = SubstrateAdapter::connect_with_config(chain_cfg).await {
330 let mut conns = connections.write();
331 if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
332 conn.adapter = Some(Arc::new(adapter));
333 conn.health_status = HealthStatus::Healthy;
334 conn.failure_count = 0;
335 }
336 } else {
337 let mut conns = connections.write();
338 if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
339 conn.failure_count += 1;
340 }
341 }
342 }
343 }
344 });
345 }
346
347 pub fn endpoint_count(&self) -> usize {
349 self.connections.read().len()
350 }
351
352 pub async fn add_endpoint(&self, endpoint: String) -> Result<()> {
354 info!("Adding new endpoint to pool: {}", endpoint);
355
356 let mut chain_cfg = self.chain_config.clone();
357 chain_cfg.endpoint = endpoint.clone();
358
359 let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
360 Ok(adapter) => Some(Arc::new(adapter)),
361 Err(e) => {
362 warn!("Failed to connect to new endpoint {}: {}", endpoint, e);
363 None
364 }
365 };
366
367 let health_status = if adapter.is_some() {
368 HealthStatus::Healthy
369 } else {
370 HealthStatus::Unhealthy
371 };
372
373 let conn = PooledConnection {
374 endpoint,
375 adapter,
376 health_status,
377 last_check: Instant::now(),
378 failure_count: 0,
379 };
380
381 self.connections.write().push(conn);
382 Ok(())
383 }
384
385 #[allow(clippy::result_large_err)]
387 pub fn remove_endpoint(&self, endpoint: &str) -> Result<()> {
388 info!("Removing endpoint from pool: {}", endpoint);
389
390 let mut connections = self.connections.write();
391 let initial_len = connections.len();
392
393 connections.retain(|c| c.endpoint != endpoint);
394
395 if connections.len() == initial_len {
396 return Err(Error::Connection(format!(
397 "Endpoint '{}' not found in pool",
398 endpoint
399 )));
400 }
401
402 if connections.is_empty() {
403 return Err(Error::Connection(
404 "Cannot remove last endpoint from pool".to_string(),
405 ));
406 }
407
408 Ok(())
409 }
410}
411
/// Point-in-time endpoint counts for a connection pool, broken down by health state.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of endpoints in the pool.
    pub total_endpoints: usize,
    /// Endpoints marked `Healthy`.
    pub healthy_endpoints: usize,
    /// Endpoints marked `Unhealthy`.
    pub unhealthy_endpoints: usize,
    /// Endpoints marked `Unknown`.
    pub unknown_endpoints: usize,
}

impl PoolStats {
    /// Returns the share of healthy endpoints as a percentage in `0.0..=100.0`.
    ///
    /// A pool without endpoints reports `0.0` rather than dividing by zero.
    pub fn health_percentage(&self) -> f64 {
        match self.total_endpoints {
            0 => 0.0,
            total => (self.healthy_endpoints as f64 / total as f64) * 100.0,
        }
    }
}

impl std::fmt::Display for PoolStats {
    /// Writes a one-line summary with the healthy share rounded to one decimal place.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            total_endpoints,
            healthy_endpoints,
            unhealthy_endpoints,
            unknown_endpoints,
        } = self;
        let percentage = self.health_percentage();

        write!(
            f,
            "Pool: {} total, {} healthy ({:.1}%), {} unhealthy, {} unknown",
            total_endpoints, healthy_endpoints, percentage, unhealthy_endpoints, unknown_endpoints
        )
    }
}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder overrides are applied and the endpoint list is kept as given.
    #[test]
    fn test_pool_config() {
        let endpoints = vec!["wss://endpoint1".to_string()];
        let interval = Duration::from_secs(60);

        let config = PoolConfig::new(endpoints)
            .with_health_check_interval(interval)
            .with_max_retries(5);

        assert_eq!(config.endpoints.len(), 1);
        assert_eq!(config.health_check_interval, Duration::from_secs(60));
        assert_eq!(config.max_retries, 5);
    }

    /// Three healthy endpoints out of four are reported as 75 %.
    #[test]
    fn test_pool_stats() {
        let stats = PoolStats {
            total_endpoints: 4,
            healthy_endpoints: 3,
            unhealthy_endpoints: 1,
            unknown_endpoints: 0,
        };
        let percentage = stats.health_percentage();

        assert_eq!(percentage, 75.0);
    }

    /// Builds a pool against the public Westend RPC endpoint.
    // Ignored by default because it dials a remote endpoint.
    #[tokio::test]
    #[ignore]
    async fn test_connection_pool() {
        let endpoints = vec!["wss://westend-rpc.polkadot.io".to_string()];
        let config = PoolConfig::new(endpoints);

        let pool = ConnectionPool::new(config, ChainConfig::westend()).await;
        assert!(pool.is_ok());

        let stats = pool.unwrap().stats();
        assert!(stats.total_endpoints > 0);
    }
}