1use crate::{ChainConfig, Error, Result, SubstrateAdapter};
10use parking_lot::RwLock;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::time::sleep;
14use tracing::{debug, info, warn};
15
/// Settings that control how a [`ConnectionPool`] is built and maintained.
///
/// Construct with [`PoolConfig::new`] and adjust individual fields with the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// RPC endpoints the pool connects to; pool construction rejects an empty list.
    pub endpoints: Vec<String>,
    /// Pause between two runs of the background health checker.
    pub health_check_interval: Duration,
    /// Time budget for establishing a connection to a single endpoint.
    pub connection_timeout: Duration,
    /// Failure threshold: an unhealthy endpoint is only re-dialled while its
    /// failure counter is at most this value.
    pub max_retries: u32,
    /// When `true`, constructing the pool also starts the background health checker.
    pub auto_health_check: bool,
}
30
31impl PoolConfig {
32 pub fn new(endpoints: Vec<String>) -> Self {
34 Self {
35 endpoints,
36 health_check_interval: Duration::from_secs(30),
37 connection_timeout: Duration::from_secs(10),
38 max_retries: 3,
39 auto_health_check: true,
40 }
41 }
42
43 pub fn with_health_check_interval(mut self, interval: Duration) -> Self {
45 self.health_check_interval = interval;
46 self
47 }
48
49 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
51 self.connection_timeout = timeout;
52 self
53 }
54
55 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
57 self.max_retries = max_retries;
58 self
59 }
60
61 pub fn with_auto_health_check(mut self, enabled: bool) -> Self {
63 self.auto_health_check = enabled;
64 self
65 }
66}
67
/// Health classification the pool keeps for each endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthStatus {
    /// The endpoint has an adapter and its last check reported it as connected.
    Healthy,
    /// The endpoint has no adapter, or its last check reported it as disconnected.
    Unhealthy,
    /// The endpoint's state has not been determined.
    Unknown,
}
78
79#[derive(Clone)]
81struct PooledConnection {
82 endpoint: String,
83 adapter: Option<Arc<SubstrateAdapter>>,
84 health_status: HealthStatus,
85 last_check: Instant,
86 failure_count: u32,
87}
88
89pub struct ConnectionPool {
91 config: PoolConfig,
92 connections: Arc<RwLock<Vec<PooledConnection>>>,
93 current_index: Arc<RwLock<usize>>,
94 chain_config: ChainConfig,
95}
96
97impl ConnectionPool {
98 pub async fn new(config: PoolConfig, chain_config: ChainConfig) -> Result<Self> {
100 if config.endpoints.is_empty() {
101 return Err(Error::Connection(
102 "At least one endpoint is required".to_string(),
103 ));
104 }
105
106 info!(
107 "Creating connection pool with {} endpoints",
108 config.endpoints.len()
109 );
110
111 let mut connections = Vec::new();
112
113 for endpoint in &config.endpoints {
115 debug!("Initializing connection to {}", endpoint);
116
117 let mut chain_cfg = chain_config.clone();
118 chain_cfg.endpoint = endpoint.clone();
119
120 let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
121 Ok(adapter) => {
122 info!("Successfully connected to {}", endpoint);
123 Some(Arc::new(adapter))
124 }
125 Err(e) => {
126 warn!("Failed to connect to {}: {}", endpoint, e);
127 None
128 }
129 };
130
131 let health_status = if adapter.is_some() {
132 HealthStatus::Healthy
133 } else {
134 HealthStatus::Unhealthy
135 };
136
137 connections.push(PooledConnection {
138 endpoint: endpoint.clone(),
139 adapter,
140 health_status,
141 last_check: Instant::now(),
142 failure_count: 0,
143 });
144 }
145
146 let pool = Self {
147 config,
148 connections: Arc::new(RwLock::new(connections)),
149 current_index: Arc::new(RwLock::new(0)),
150 chain_config,
151 };
152
153 if pool.config.auto_health_check {
155 pool.start_health_checker();
156 }
157
158 Ok(pool)
159 }
160
161 pub fn get_connection(&self) -> Result<Arc<SubstrateAdapter>> {
163 let connections = self.connections.read();
164 let healthy_count = connections
165 .iter()
166 .filter(|c| c.health_status == HealthStatus::Healthy && c.adapter.is_some())
167 .count();
168
169 if healthy_count == 0 {
170 return Err(Error::Connection(
171 "No healthy connections available".to_string(),
172 ));
173 }
174
175 let start_index = *self.current_index.read();
177 let total = connections.len();
178
179 for i in 0..total {
180 let index = (start_index + i) % total;
181 let conn = &connections[index];
182
183 if conn.health_status == HealthStatus::Healthy {
184 if let Some(adapter) = &conn.adapter {
185 *self.current_index.write() = (index + 1) % total;
187 return Ok(adapter.clone());
188 }
189 }
190 }
191
192 Err(Error::Connection(
193 "No healthy connections available".to_string(),
194 ))
195 }
196
197 pub fn get_all_connections(&self) -> Vec<Arc<SubstrateAdapter>> {
199 self.connections
200 .read()
201 .iter()
202 .filter_map(|c| c.adapter.clone())
203 .collect()
204 }
205
206 pub fn stats(&self) -> PoolStats {
208 let connections = self.connections.read();
209
210 let total = connections.len();
211 let healthy = connections
212 .iter()
213 .filter(|c| c.health_status == HealthStatus::Healthy)
214 .count();
215 let unhealthy = connections
216 .iter()
217 .filter(|c| c.health_status == HealthStatus::Unhealthy)
218 .count();
219 let unknown = connections
220 .iter()
221 .filter(|c| c.health_status == HealthStatus::Unknown)
222 .count();
223
224 PoolStats {
225 total_endpoints: total,
226 healthy_endpoints: healthy,
227 unhealthy_endpoints: unhealthy,
228 unknown_endpoints: unknown,
229 }
230 }
231
232 pub async fn health_check(&self) {
234 debug!("Running health check on all endpoints");
235
236 let reconnect_endpoints: Vec<String> = {
238 let mut connections = self.connections.write();
239
240 for conn in connections.iter_mut() {
241 let is_healthy = if let Some(adapter) = &conn.adapter {
242 adapter.is_connected()
244 } else {
245 false
246 };
247
248 conn.health_status = if is_healthy {
249 HealthStatus::Healthy
250 } else {
251 HealthStatus::Unhealthy
252 };
253 conn.last_check = Instant::now();
254
255 if !is_healthy {
256 conn.failure_count += 1;
257 } else {
258 conn.failure_count = 0;
260 }
261 }
262
263 connections
265 .iter()
266 .filter(|conn| {
267 conn.health_status == HealthStatus::Unhealthy
268 && conn.failure_count <= self.config.max_retries
269 })
270 .map(|conn| conn.endpoint.clone())
271 .collect()
272 }; for endpoint in reconnect_endpoints {
276 debug!("Attempting to reconnect to {}", endpoint);
277
278 let mut chain_cfg = self.chain_config.clone();
279 chain_cfg.endpoint = endpoint.clone();
280
281 match SubstrateAdapter::connect_with_config(chain_cfg).await {
282 Ok(adapter) => {
283 info!("Successfully reconnected to {}", endpoint);
284 let mut connections = self.connections.write();
285 if let Some(conn) = connections.iter_mut().find(|c| c.endpoint == endpoint) {
286 conn.adapter = Some(Arc::new(adapter));
287 conn.health_status = HealthStatus::Healthy;
288 conn.failure_count = 0;
289 }
290 }
291 Err(e) => {
292 warn!("Failed to reconnect to {}: {}", endpoint, e);
293 }
294 }
295 }
296 }
297
298 fn start_health_checker(&self) {
300 let connections = self.connections.clone();
301 let interval = self.config.health_check_interval;
302 let chain_config = self.chain_config.clone();
303 let max_retries = self.config.max_retries;
304
305 tokio::spawn(async move {
306 loop {
307 sleep(interval).await;
308
309 debug!("Background health check running");
310
311 let endpoints_to_reconnect: Vec<(String, bool)> = {
313 let mut conns = connections.write();
314 let mut to_reconnect = Vec::new();
315
316 for conn in conns.iter_mut() {
317 let is_healthy = if let Some(adapter) = &conn.adapter {
318 adapter.is_connected()
319 } else {
320 false
321 };
322
323 conn.health_status = if is_healthy {
324 HealthStatus::Healthy
325 } else {
326 HealthStatus::Unhealthy
327 };
328 conn.last_check = Instant::now();
329
330 if !is_healthy && conn.failure_count <= max_retries {
331 to_reconnect.push((conn.endpoint.clone(), true));
332 }
333 }
334
335 to_reconnect
336 };
337
338 for (endpoint, _) in endpoints_to_reconnect {
340 let mut chain_cfg = chain_config.clone();
341 chain_cfg.endpoint = endpoint.clone();
342
343 if let Ok(adapter) = SubstrateAdapter::connect_with_config(chain_cfg).await {
344 let mut conns = connections.write();
345 if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
346 conn.adapter = Some(Arc::new(adapter));
347 conn.health_status = HealthStatus::Healthy;
348 conn.failure_count = 0;
349 }
350 } else {
351 let mut conns = connections.write();
352 if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
353 conn.failure_count += 1;
354 }
355 }
356 }
357 }
358 });
359 }
360
361 pub fn endpoint_count(&self) -> usize {
363 self.connections.read().len()
364 }
365
366 pub async fn add_endpoint(&self, endpoint: String) -> Result<()> {
368 info!("Adding new endpoint to pool: {}", endpoint);
369
370 let mut chain_cfg = self.chain_config.clone();
371 chain_cfg.endpoint = endpoint.clone();
372
373 let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
374 Ok(adapter) => Some(Arc::new(adapter)),
375 Err(e) => {
376 warn!("Failed to connect to new endpoint {}: {}", endpoint, e);
377 None
378 }
379 };
380
381 let health_status = if adapter.is_some() {
382 HealthStatus::Healthy
383 } else {
384 HealthStatus::Unhealthy
385 };
386
387 let conn = PooledConnection {
388 endpoint,
389 adapter,
390 health_status,
391 last_check: Instant::now(),
392 failure_count: 0,
393 };
394
395 self.connections.write().push(conn);
396 Ok(())
397 }
398
399 pub fn remove_endpoint(&self, endpoint: &str) -> Result<()> {
401 info!("Removing endpoint from pool: {}", endpoint);
402
403 let mut connections = self.connections.write();
404 let initial_len = connections.len();
405
406 connections.retain(|c| c.endpoint != endpoint);
407
408 if connections.len() == initial_len {
409 return Err(Error::Connection(format!(
410 "Endpoint '{}' not found in pool",
411 endpoint
412 )));
413 }
414
415 if connections.is_empty() {
416 return Err(Error::Connection(
417 "Cannot remove last endpoint from pool".to_string(),
418 ));
419 }
420
421 Ok(())
422 }
423}
424
/// Point-in-time counts of pool endpoints grouped by health status.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of endpoints tracked by the pool.
    pub total_endpoints: usize,
    /// Endpoints whose status is `Healthy`.
    pub healthy_endpoints: usize,
    /// Endpoints whose status is `Unhealthy`.
    pub unhealthy_endpoints: usize,
    /// Endpoints whose status is `Unknown`.
    pub unknown_endpoints: usize,
}
437
438impl PoolStats {
439 pub fn health_percentage(&self) -> f64 {
441 if self.total_endpoints == 0 {
442 0.0
443 } else {
444 (self.healthy_endpoints as f64 / self.total_endpoints as f64) * 100.0
445 }
446 }
447}
448
449impl std::fmt::Display for PoolStats {
450 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451 write!(
452 f,
453 "Pool: {} total, {} healthy ({:.1}%), {} unhealthy, {} unknown",
454 self.total_endpoints,
455 self.healthy_endpoints,
456 self.health_percentage(),
457 self.unhealthy_endpoints,
458 self.unknown_endpoints
459 )
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder overrides replace the defaults and keep the endpoint list.
    #[test]
    fn test_pool_config() {
        let endpoints = vec!["wss://endpoint1".to_string()];
        let interval = Duration::from_secs(60);

        let config = PoolConfig::new(endpoints)
            .with_health_check_interval(interval)
            .with_max_retries(5);

        assert_eq!(config.endpoints.len(), 1);
        assert_eq!(config.health_check_interval, interval);
        assert_eq!(config.max_retries, 5);
    }

    /// Three healthy endpoints out of four is exactly 75 %.
    #[test]
    fn test_pool_stats() {
        let three_of_four = PoolStats {
            total_endpoints: 4,
            healthy_endpoints: 3,
            unhealthy_endpoints: 1,
            unknown_endpoints: 0,
        };

        assert_eq!(three_of_four.health_percentage(), 75.0);
    }

    /// Builds a pool against the public Westend RPC endpoint named below;
    /// ignored by default and run only on request (`cargo test -- --ignored`).
    #[tokio::test]
    #[ignore]
    async fn test_connection_pool() {
        let endpoints = vec!["wss://westend-rpc.polkadot.io".to_string()];
        let config = PoolConfig::new(endpoints);

        let result = ConnectionPool::new(config, ChainConfig::westend()).await;
        assert!(result.is_ok());

        let stats = result.unwrap().stats();
        assert!(stats.total_endpoints > 0);
    }
}