1use crate::{ChainConfig, Error, Result, SubstrateAdapter};
10use parking_lot::RwLock;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::time::sleep;
14use tracing::{debug, info, warn};
15
/// Settings that control how a [`ConnectionPool`] manages its endpoints.
///
/// Create one with [`PoolConfig::new`] and adjust individual values with the
/// chainable `with_*` methods.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Endpoint URLs the pool connects to, one pooled connection per entry.
    pub endpoints: Vec<String>,
    /// Pause between two runs of the background health check.
    pub health_check_interval: Duration,
    /// Time limit intended for a single connection attempt.
    pub connection_timeout: Duration,
    /// Limit compared against an endpoint's failure count when deciding
    /// whether another reconnect is attempted.
    pub max_retries: u32,
    /// When `true`, `ConnectionPool::new` starts the background health checker.
    pub auto_health_check: bool,
}

impl PoolConfig {
    /// Creates a configuration for `endpoints` using the default settings:
    /// a 30 second health-check interval, a 10 second connection timeout,
    /// 3 retries, and the automatic health check enabled.
    pub fn new(endpoints: Vec<String>) -> Self {
        let health_check_interval = Duration::from_secs(30);
        let connection_timeout = Duration::from_secs(10);

        Self {
            endpoints,
            health_check_interval,
            connection_timeout,
            max_retries: 3,
            auto_health_check: true,
        }
    }

    /// Returns the configuration with `health_check_interval` replaced by `interval`.
    pub fn with_health_check_interval(self, interval: Duration) -> Self {
        Self {
            health_check_interval: interval,
            ..self
        }
    }

    /// Returns the configuration with `connection_timeout` replaced by `timeout`.
    pub fn with_connection_timeout(self, timeout: Duration) -> Self {
        Self {
            connection_timeout: timeout,
            ..self
        }
    }

    /// Returns the configuration with `max_retries` replaced by the given value.
    pub fn with_max_retries(self, max_retries: u32) -> Self {
        Self {
            max_retries,
            ..self
        }
    }

    /// Returns the configuration with the automatic health check switched
    /// on (`true`) or off (`false`).
    pub fn with_auto_health_check(self, enabled: bool) -> Self {
        Self {
            auto_health_check: enabled,
            ..self
        }
    }
}
67
/// Health classification the pool records for each endpoint.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthStatus {
    /// The endpoint connected successfully, or its adapter reported being
    /// connected at the most recent check.
    Healthy,
    /// The endpoint has no adapter, or its adapter reported being
    /// disconnected at the most recent check.
    Unhealthy,
    /// The endpoint's health has not been determined.
    /// NOTE(review): not assigned anywhere in the code visible here; it is
    /// only counted when statistics are gathered.
    Unknown,
}
78
79#[derive(Clone)]
81struct PooledConnection {
82 endpoint: String,
83 adapter: Option<Arc<SubstrateAdapter>>,
84 health_status: HealthStatus,
85 last_check: Instant,
86 failure_count: u32,
87}
88
89pub struct ConnectionPool {
91 config: PoolConfig,
92 connections: Arc<RwLock<Vec<PooledConnection>>>,
93 current_index: Arc<RwLock<usize>>,
94 chain_config: ChainConfig,
95}
96
97impl ConnectionPool {
98 pub async fn new(config: PoolConfig, chain_config: ChainConfig) -> Result<Self> {
100 if config.endpoints.is_empty() {
101 return Err(Error::Connection(
102 "At least one endpoint is required".to_string(),
103 ));
104 }
105
106 info!(
107 "Creating connection pool with {} endpoints",
108 config.endpoints.len()
109 );
110
111 let mut connections = Vec::new();
112
113 for endpoint in &config.endpoints {
115 debug!("Initializing connection to {}", endpoint);
116
117 let mut chain_cfg = chain_config.clone();
118 chain_cfg.endpoint = endpoint.clone();
119
120 let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
121 Ok(adapter) => {
122 info!("Successfully connected to {}", endpoint);
123 Some(Arc::new(adapter))
124 }
125 Err(e) => {
126 warn!("Failed to connect to {}: {}", endpoint, e);
127 None
128 }
129 };
130
131 let health_status = if adapter.is_some() {
132 HealthStatus::Healthy
133 } else {
134 HealthStatus::Unhealthy
135 };
136
137 connections.push(PooledConnection {
138 endpoint: endpoint.clone(),
139 adapter,
140 health_status,
141 last_check: Instant::now(),
142 failure_count: 0,
143 });
144 }
145
146 let pool = Self {
147 config,
148 connections: Arc::new(RwLock::new(connections)),
149 current_index: Arc::new(RwLock::new(0)),
150 chain_config,
151 };
152
153 if pool.config.auto_health_check {
155 pool.start_health_checker();
156 }
157
158 Ok(pool)
159 }
160
161 #[allow(clippy::result_large_err)]
163 pub fn get_connection(&self) -> Result<Arc<SubstrateAdapter>> {
164 let connections = self.connections.read();
165 let healthy_count = connections
166 .iter()
167 .filter(|c| c.health_status == HealthStatus::Healthy && c.adapter.is_some())
168 .count();
169
170 if healthy_count == 0 {
171 return Err(Error::Connection(
172 "No healthy connections available".to_string(),
173 ));
174 }
175
176 let start_index = *self.current_index.read();
178 let total = connections.len();
179
180 for i in 0..total {
181 let index = (start_index + i) % total;
182 let conn = &connections[index];
183
184 if conn.health_status == HealthStatus::Healthy {
185 if let Some(adapter) = &conn.adapter {
186 *self.current_index.write() = (index + 1) % total;
188 return Ok(adapter.clone());
189 }
190 }
191 }
192
193 Err(Error::Connection(
194 "No healthy connections available".to_string(),
195 ))
196 }
197
198 pub fn get_all_connections(&self) -> Vec<Arc<SubstrateAdapter>> {
200 self.connections
201 .read()
202 .iter()
203 .filter_map(|c| c.adapter.clone())
204 .collect()
205 }
206
207 pub fn stats(&self) -> PoolStats {
209 let connections = self.connections.read();
210
211 let total = connections.len();
212 let healthy = connections
213 .iter()
214 .filter(|c| c.health_status == HealthStatus::Healthy)
215 .count();
216 let unhealthy = connections
217 .iter()
218 .filter(|c| c.health_status == HealthStatus::Unhealthy)
219 .count();
220 let unknown = connections
221 .iter()
222 .filter(|c| c.health_status == HealthStatus::Unknown)
223 .count();
224
225 PoolStats {
226 total_endpoints: total,
227 healthy_endpoints: healthy,
228 unhealthy_endpoints: unhealthy,
229 unknown_endpoints: unknown,
230 }
231 }
232
233 pub async fn health_check(&self) {
235 debug!("Running health check on all endpoints");
236
237 let endpoints_to_reconnect: Vec<String> = {
239 let mut connections = self.connections.write();
240
241 let mut to_reconnect = Vec::new();
242
243 for conn in connections.iter_mut() {
244 let is_healthy = if let Some(adapter) = &conn.adapter {
245 adapter.is_connected()
247 } else {
248 false
249 };
250
251 conn.health_status = if is_healthy {
252 HealthStatus::Healthy
253 } else {
254 HealthStatus::Unhealthy
255 };
256 conn.last_check = Instant::now();
257
258 if !is_healthy {
259 conn.failure_count += 1;
260
261 if conn.failure_count <= self.config.max_retries {
263 to_reconnect.push(conn.endpoint.clone());
264 }
265 } else {
266 conn.failure_count = 0;
268 }
269 }
270
271 to_reconnect
272 };
273
274 for endpoint in endpoints_to_reconnect {
276 debug!("Attempting to reconnect to {}", endpoint);
277
278 let mut chain_cfg = self.chain_config.clone();
279 chain_cfg.endpoint = endpoint.clone();
280
281 match SubstrateAdapter::connect_with_config(chain_cfg).await {
282 Ok(adapter) => {
283 info!("Successfully reconnected to {}", endpoint);
284 let mut connections = self.connections.write();
285 if let Some(conn) = connections.iter_mut().find(|c| c.endpoint == endpoint) {
286 conn.adapter = Some(Arc::new(adapter));
287 conn.health_status = HealthStatus::Healthy;
288 conn.failure_count = 0;
289 }
290 }
291 Err(e) => {
292 warn!("Failed to reconnect to {}: {}", endpoint, e);
293 }
294 }
295 }
296 }
297
298 fn start_health_checker(&self) {
300 let connections = self.connections.clone();
301 let interval = self.config.health_check_interval;
302 let chain_config = self.chain_config.clone();
303 let max_retries = self.config.max_retries;
304
305 tokio::spawn(async move {
306 loop {
307 sleep(interval).await;
308
309 debug!("Background health check running");
310
311 let endpoints_to_reconnect: Vec<(String, bool)> = {
313 let mut conns = connections.write();
314 let mut to_reconnect = Vec::new();
315
316 for conn in conns.iter_mut() {
317 let is_healthy = if let Some(adapter) = &conn.adapter {
318 adapter.is_connected()
319 } else {
320 false
321 };
322
323 conn.health_status = if is_healthy {
324 HealthStatus::Healthy
325 } else {
326 HealthStatus::Unhealthy
327 };
328 conn.last_check = Instant::now();
329
330 if !is_healthy && conn.failure_count <= max_retries {
331 to_reconnect.push((conn.endpoint.clone(), true));
332 }
333 }
334
335 to_reconnect
336 };
337
338 for (endpoint, _) in endpoints_to_reconnect {
340 let mut chain_cfg = chain_config.clone();
341 chain_cfg.endpoint = endpoint.clone();
342
343 if let Ok(adapter) = SubstrateAdapter::connect_with_config(chain_cfg).await {
344 let mut conns = connections.write();
345 if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
346 conn.adapter = Some(Arc::new(adapter));
347 conn.health_status = HealthStatus::Healthy;
348 conn.failure_count = 0;
349 }
350 } else {
351 let mut conns = connections.write();
352 if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
353 conn.failure_count += 1;
354 }
355 }
356 }
357 }
358 });
359 }
360
361 pub fn endpoint_count(&self) -> usize {
363 self.connections.read().len()
364 }
365
366 pub async fn add_endpoint(&self, endpoint: String) -> Result<()> {
368 info!("Adding new endpoint to pool: {}", endpoint);
369
370 let mut chain_cfg = self.chain_config.clone();
371 chain_cfg.endpoint = endpoint.clone();
372
373 let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
374 Ok(adapter) => Some(Arc::new(adapter)),
375 Err(e) => {
376 warn!("Failed to connect to new endpoint {}: {}", endpoint, e);
377 None
378 }
379 };
380
381 let health_status = if adapter.is_some() {
382 HealthStatus::Healthy
383 } else {
384 HealthStatus::Unhealthy
385 };
386
387 let conn = PooledConnection {
388 endpoint,
389 adapter,
390 health_status,
391 last_check: Instant::now(),
392 failure_count: 0,
393 };
394
395 self.connections.write().push(conn);
396 Ok(())
397 }
398
399 #[allow(clippy::result_large_err)]
401 pub fn remove_endpoint(&self, endpoint: &str) -> Result<()> {
402 info!("Removing endpoint from pool: {}", endpoint);
403
404 let mut connections = self.connections.write();
405 let initial_len = connections.len();
406
407 connections.retain(|c| c.endpoint != endpoint);
408
409 if connections.len() == initial_len {
410 return Err(Error::Connection(format!(
411 "Endpoint '{}' not found in pool",
412 endpoint
413 )));
414 }
415
416 if connections.is_empty() {
417 return Err(Error::Connection(
418 "Cannot remove last endpoint from pool".to_string(),
419 ));
420 }
421
422 Ok(())
423 }
424}
425
/// Snapshot of how many pool endpoints are in each health state.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of endpoints in the pool.
    pub total_endpoints: usize,
    /// Number of endpoints marked healthy.
    pub healthy_endpoints: usize,
    /// Number of endpoints marked unhealthy.
    pub unhealthy_endpoints: usize,
    /// Number of endpoints whose health is unknown.
    pub unknown_endpoints: usize,
}

impl PoolStats {
    /// Returns the share of healthy endpoints as a percentage (0.0 to 100.0
    /// for consistent counts), or `0.0` when the pool has no endpoints.
    pub fn health_percentage(&self) -> f64 {
        match self.total_endpoints {
            // Avoid dividing by zero for an empty pool.
            0 => 0.0,
            total => (self.healthy_endpoints as f64 / total as f64) * 100.0,
        }
    }
}

impl std::fmt::Display for PoolStats {
    /// Writes a one-line summary of all counters, with the healthy share
    /// shown to one decimal place.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            total_endpoints,
            healthy_endpoints,
            unhealthy_endpoints,
            unknown_endpoints,
        } = self;
        let percentage = self.health_percentage();

        write!(
            f,
            "Pool: {} total, {} healthy ({:.1}%), {} unhealthy, {} unknown",
            total_endpoints, healthy_endpoints, percentage, unhealthy_endpoints, unknown_endpoints
        )
    }
}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder methods override only the values they are called for.
    #[test]
    fn test_pool_config() {
        let config = PoolConfig::new(vec!["wss://endpoint1".to_string()])
            .with_health_check_interval(Duration::from_secs(60))
            .with_max_retries(5);

        assert_eq!(config.endpoints.len(), 1);
        assert_eq!(config.health_check_interval, Duration::from_secs(60));
        assert_eq!(config.max_retries, 5);
        // Untouched settings keep their defaults.
        assert_eq!(config.connection_timeout, Duration::from_secs(10));
        assert!(config.auto_health_check);
    }

    /// `PoolConfig::new` applies the documented defaults, and the remaining
    /// builders change their own field.
    #[test]
    fn test_pool_config_defaults_and_remaining_builders() {
        let config = PoolConfig::new(vec!["wss://endpoint1".to_string()]);

        assert_eq!(config.health_check_interval, Duration::from_secs(30));
        assert_eq!(config.connection_timeout, Duration::from_secs(10));
        assert_eq!(config.max_retries, 3);
        assert!(config.auto_health_check);

        let config = config
            .with_connection_timeout(Duration::from_secs(2))
            .with_auto_health_check(false);

        assert_eq!(config.connection_timeout, Duration::from_secs(2));
        assert!(!config.auto_health_check);
    }

    #[test]
    fn test_pool_stats() {
        let stats = PoolStats {
            total_endpoints: 4,
            healthy_endpoints: 3,
            unhealthy_endpoints: 1,
            unknown_endpoints: 0,
        };

        assert_eq!(stats.health_percentage(), 75.0);
    }

    /// An empty pool reports 0% instead of dividing by zero.
    #[test]
    fn test_pool_stats_empty_pool() {
        let stats = PoolStats {
            total_endpoints: 0,
            healthy_endpoints: 0,
            unhealthy_endpoints: 0,
            unknown_endpoints: 0,
        };

        assert_eq!(stats.health_percentage(), 0.0);
    }

    #[test]
    fn test_pool_stats_display() {
        let stats = PoolStats {
            total_endpoints: 4,
            healthy_endpoints: 3,
            unhealthy_endpoints: 1,
            unknown_endpoints: 0,
        };

        assert_eq!(
            stats.to_string(),
            "Pool: 4 total, 3 healthy (75.0%), 1 unhealthy, 0 unknown"
        );
    }

    /// Construction is rejected before any connection attempt when no
    /// endpoint is configured, so this test needs no network.
    #[tokio::test]
    async fn test_empty_endpoints_rejected() {
        let result = ConnectionPool::new(PoolConfig::new(Vec::new()), ChainConfig::westend()).await;

        assert!(matches!(result, Err(Error::Connection(_))));
    }

    #[tokio::test]
    #[ignore = "connects to the public Westend RPC endpoint"]
    async fn test_connection_pool() {
        let config = PoolConfig::new(vec!["wss://westend-rpc.polkadot.io".to_string()]);

        let pool = ConnectionPool::new(config, ChainConfig::westend()).await;
        assert!(pool.is_ok());

        let pool = pool.unwrap();
        let stats = pool.stats();
        assert!(stats.total_endpoints > 0);
    }
}