apex_sdk_substrate/
pool.rs

1//! Connection pooling for Substrate endpoints
2//!
3//! This module provides connection pooling functionality including:
4//! - Multiple endpoint management
5//! - Round-robin load balancing
6//! - Health checking
7//! - Automatic failover
8
9use crate::{ChainConfig, Error, Result, SubstrateAdapter};
10use parking_lot::RwLock;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::time::sleep;
14use tracing::{debug, info, warn};
15
/// Settings controlling how a [`ConnectionPool`] connects to and monitors its endpoints.
///
/// Start from [`PoolConfig::new`] and refine it with the chainable `with_*` methods.
#[derive(Clone, Debug)]
pub struct PoolConfig {
    /// Endpoint URLs the pool should connect to.
    pub endpoints: Vec<String>,
    /// Delay between runs of the background health checker.
    pub health_check_interval: Duration,
    /// Per-endpoint connection timeout.
    pub connection_timeout: Duration,
    /// Upper bound on reconnect attempts for an endpoint that keeps failing.
    pub max_retries: u32,
    /// Whether a background health checker is started when the pool is created.
    pub auto_health_check: bool,
}

impl PoolConfig {
    /// Build a configuration for `endpoints` using the default settings:
    /// a 30 s health-check interval, a 10 s connection timeout, 3 retries,
    /// and automatic health checking switched on.
    pub fn new(endpoints: Vec<String>) -> Self {
        let health_check_interval = Duration::from_secs(30);
        let connection_timeout = Duration::from_secs(10);

        Self {
            endpoints,
            health_check_interval,
            connection_timeout,
            max_retries: 3,
            auto_health_check: true,
        }
    }

    /// Return this configuration with a different health-check interval.
    pub fn with_health_check_interval(self, interval: Duration) -> Self {
        Self {
            health_check_interval: interval,
            ..self
        }
    }

    /// Return this configuration with a different connection timeout.
    pub fn with_connection_timeout(self, timeout: Duration) -> Self {
        Self {
            connection_timeout: timeout,
            ..self
        }
    }

    /// Return this configuration with a different retry limit.
    pub fn with_max_retries(self, max_retries: u32) -> Self {
        Self { max_retries, ..self }
    }

    /// Return this configuration with automatic health checking turned on or off.
    pub fn with_auto_health_check(self, enabled: bool) -> Self {
        Self {
            auto_health_check: enabled,
            ..self
        }
    }
}
67
/// Health classification of a single pooled endpoint.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthStatus {
    /// The endpoint has a live adapter that passed its most recent check.
    Healthy,
    /// The endpoint could not be connected or failed its most recent check.
    Unhealthy,
    /// The endpoint's health has not been determined.
    Unknown,
}
78
79/// Information about a pooled connection
80#[derive(Clone)]
81struct PooledConnection {
82    endpoint: String,
83    adapter: Option<Arc<SubstrateAdapter>>,
84    health_status: HealthStatus,
85    last_check: Instant,
86    failure_count: u32,
87}
88
89/// Connection pool for managing multiple Substrate connections
90pub struct ConnectionPool {
91    config: PoolConfig,
92    connections: Arc<RwLock<Vec<PooledConnection>>>,
93    current_index: Arc<RwLock<usize>>,
94    chain_config: ChainConfig,
95}
96
97impl ConnectionPool {
98    /// Create a new connection pool
99    pub async fn new(config: PoolConfig, chain_config: ChainConfig) -> Result<Self> {
100        if config.endpoints.is_empty() {
101            return Err(Error::Connection(
102                "At least one endpoint is required".to_string(),
103            ));
104        }
105
106        info!(
107            "Creating connection pool with {} endpoints",
108            config.endpoints.len()
109        );
110
111        let mut connections = Vec::new();
112
113        // Initialize connections
114        for endpoint in &config.endpoints {
115            debug!("Initializing connection to {}", endpoint);
116
117            let mut chain_cfg = chain_config.clone();
118            chain_cfg.endpoint = endpoint.clone();
119
120            let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
121                Ok(adapter) => {
122                    info!("Successfully connected to {}", endpoint);
123                    Some(Arc::new(adapter))
124                }
125                Err(e) => {
126                    warn!("Failed to connect to {}: {}", endpoint, e);
127                    None
128                }
129            };
130
131            let health_status = if adapter.is_some() {
132                HealthStatus::Healthy
133            } else {
134                HealthStatus::Unhealthy
135            };
136
137            connections.push(PooledConnection {
138                endpoint: endpoint.clone(),
139                adapter,
140                health_status,
141                last_check: Instant::now(),
142                failure_count: 0,
143            });
144        }
145
146        let pool = Self {
147            config,
148            connections: Arc::new(RwLock::new(connections)),
149            current_index: Arc::new(RwLock::new(0)),
150            chain_config,
151        };
152
153        // Start health checker if enabled
154        if pool.config.auto_health_check {
155            pool.start_health_checker();
156        }
157
158        Ok(pool)
159    }
160
161    /// Get the next available healthy connection using round-robin
162    #[allow(clippy::result_large_err)]
163    pub fn get_connection(&self) -> Result<Arc<SubstrateAdapter>> {
164        let connections = self.connections.read();
165        let healthy_count = connections
166            .iter()
167            .filter(|c| c.health_status == HealthStatus::Healthy && c.adapter.is_some())
168            .count();
169
170        if healthy_count == 0 {
171            return Err(Error::Connection(
172                "No healthy connections available".to_string(),
173            ));
174        }
175
176        // Round-robin selection among healthy connections
177        let start_index = *self.current_index.read();
178        let total = connections.len();
179
180        for i in 0..total {
181            let index = (start_index + i) % total;
182            let conn = &connections[index];
183
184            if conn.health_status == HealthStatus::Healthy {
185                if let Some(adapter) = &conn.adapter {
186                    // Update index for next call
187                    *self.current_index.write() = (index + 1) % total;
188                    return Ok(adapter.clone());
189                }
190            }
191        }
192
193        Err(Error::Connection(
194            "No healthy connections available".to_string(),
195        ))
196    }
197
198    /// Get all available connections
199    pub fn get_all_connections(&self) -> Vec<Arc<SubstrateAdapter>> {
200        self.connections
201            .read()
202            .iter()
203            .filter_map(|c| c.adapter.clone())
204            .collect()
205    }
206
207    /// Get pool statistics
208    pub fn stats(&self) -> PoolStats {
209        let connections = self.connections.read();
210
211        let total = connections.len();
212        let healthy = connections
213            .iter()
214            .filter(|c| c.health_status == HealthStatus::Healthy)
215            .count();
216        let unhealthy = connections
217            .iter()
218            .filter(|c| c.health_status == HealthStatus::Unhealthy)
219            .count();
220        let unknown = connections
221            .iter()
222            .filter(|c| c.health_status == HealthStatus::Unknown)
223            .count();
224
225        PoolStats {
226            total_endpoints: total,
227            healthy_endpoints: healthy,
228            unhealthy_endpoints: unhealthy,
229            unknown_endpoints: unknown,
230        }
231    }
232
233    /// Manually trigger health check for all endpoints
234    #[allow(clippy::await_holding_lock)]
235    pub async fn health_check(&self) {
236        debug!("Running health check on all endpoints");
237
238        let mut connections = self.connections.write();
239
240        for conn in connections.iter_mut() {
241            let is_healthy = if let Some(adapter) = &conn.adapter {
242                // Simple health check: verify connection is still active
243                adapter.is_connected()
244            } else {
245                false
246            };
247
248            conn.health_status = if is_healthy {
249                HealthStatus::Healthy
250            } else {
251                HealthStatus::Unhealthy
252            };
253            conn.last_check = Instant::now();
254
255            if !is_healthy {
256                conn.failure_count += 1;
257
258                // Try to reconnect if failures are below threshold
259                if conn.failure_count <= self.config.max_retries {
260                    debug!("Attempting to reconnect to {}", conn.endpoint);
261
262                    let mut chain_cfg = self.chain_config.clone();
263                    chain_cfg.endpoint = conn.endpoint.clone();
264
265                    match SubstrateAdapter::connect_with_config(chain_cfg).await {
266                        Ok(adapter) => {
267                            info!("Successfully reconnected to {}", conn.endpoint);
268                            conn.adapter = Some(Arc::new(adapter));
269                            conn.health_status = HealthStatus::Healthy;
270                            conn.failure_count = 0;
271                        }
272                        Err(e) => {
273                            warn!("Failed to reconnect to {}: {}", conn.endpoint, e);
274                        }
275                    }
276                }
277            } else {
278                // Reset failure count on success
279                conn.failure_count = 0;
280            }
281        }
282    }
283
284    /// Start background health checker
285    fn start_health_checker(&self) {
286        let connections = self.connections.clone();
287        let interval = self.config.health_check_interval;
288        let chain_config = self.chain_config.clone();
289        let max_retries = self.config.max_retries;
290
291        tokio::spawn(async move {
292            loop {
293                sleep(interval).await;
294
295                debug!("Background health check running");
296
297                // Collect endpoints that need reconnection
298                let endpoints_to_reconnect: Vec<(String, bool)> = {
299                    let mut conns = connections.write();
300                    let mut to_reconnect = Vec::new();
301
302                    for conn in conns.iter_mut() {
303                        let is_healthy = if let Some(adapter) = &conn.adapter {
304                            adapter.is_connected()
305                        } else {
306                            false
307                        };
308
309                        conn.health_status = if is_healthy {
310                            HealthStatus::Healthy
311                        } else {
312                            HealthStatus::Unhealthy
313                        };
314                        conn.last_check = Instant::now();
315
316                        if !is_healthy && conn.failure_count <= max_retries {
317                            to_reconnect.push((conn.endpoint.clone(), true));
318                        }
319                    }
320
321                    to_reconnect
322                };
323
324                // Reconnect outside the lock
325                for (endpoint, _) in endpoints_to_reconnect {
326                    let mut chain_cfg = chain_config.clone();
327                    chain_cfg.endpoint = endpoint.clone();
328
329                    if let Ok(adapter) = SubstrateAdapter::connect_with_config(chain_cfg).await {
330                        let mut conns = connections.write();
331                        if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
332                            conn.adapter = Some(Arc::new(adapter));
333                            conn.health_status = HealthStatus::Healthy;
334                            conn.failure_count = 0;
335                        }
336                    } else {
337                        let mut conns = connections.write();
338                        if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
339                            conn.failure_count += 1;
340                        }
341                    }
342                }
343            }
344        });
345    }
346
347    /// Get the number of endpoints in the pool
348    pub fn endpoint_count(&self) -> usize {
349        self.connections.read().len()
350    }
351
352    /// Add a new endpoint to the pool
353    pub async fn add_endpoint(&self, endpoint: String) -> Result<()> {
354        info!("Adding new endpoint to pool: {}", endpoint);
355
356        let mut chain_cfg = self.chain_config.clone();
357        chain_cfg.endpoint = endpoint.clone();
358
359        let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
360            Ok(adapter) => Some(Arc::new(adapter)),
361            Err(e) => {
362                warn!("Failed to connect to new endpoint {}: {}", endpoint, e);
363                None
364            }
365        };
366
367        let health_status = if adapter.is_some() {
368            HealthStatus::Healthy
369        } else {
370            HealthStatus::Unhealthy
371        };
372
373        let conn = PooledConnection {
374            endpoint,
375            adapter,
376            health_status,
377            last_check: Instant::now(),
378            failure_count: 0,
379        };
380
381        self.connections.write().push(conn);
382        Ok(())
383    }
384
385    /// Remove an endpoint from the pool
386    #[allow(clippy::result_large_err)]
387    pub fn remove_endpoint(&self, endpoint: &str) -> Result<()> {
388        info!("Removing endpoint from pool: {}", endpoint);
389
390        let mut connections = self.connections.write();
391        let initial_len = connections.len();
392
393        connections.retain(|c| c.endpoint != endpoint);
394
395        if connections.len() == initial_len {
396            return Err(Error::Connection(format!(
397                "Endpoint '{}' not found in pool",
398                endpoint
399            )));
400        }
401
402        if connections.is_empty() {
403            return Err(Error::Connection(
404                "Cannot remove last endpoint from pool".to_string(),
405            ));
406        }
407
408        Ok(())
409    }
410}
411
/// Snapshot of how many pool endpoints are in each health state.
#[derive(Clone, Debug)]
pub struct PoolStats {
    /// Number of endpoints in the pool.
    pub total_endpoints: usize,
    /// Endpoints currently classified as healthy.
    pub healthy_endpoints: usize,
    /// Endpoints currently classified as unhealthy.
    pub unhealthy_endpoints: usize,
    /// Endpoints whose health is not known.
    pub unknown_endpoints: usize,
}

impl PoolStats {
    /// Healthy endpoints as a percentage of all endpoints; `0.0` when the pool is empty.
    pub fn health_percentage(&self) -> f64 {
        match self.total_endpoints {
            0 => 0.0,
            total => (self.healthy_endpoints as f64 / total as f64) * 100.0,
        }
    }
}

impl std::fmt::Display for PoolStats {
    /// Render a one-line summary such as
    /// `Pool: 4 total, 3 healthy (75.0%), 1 unhealthy, 0 unknown`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let percentage = self.health_percentage();
        write!(
            f,
            "Pool: {} total, {} healthy ({:.1}%), {} unhealthy, {} unknown",
            self.total_endpoints,
            self.healthy_endpoints,
            percentage,
            self.unhealthy_endpoints,
            self.unknown_endpoints
        )
    }
}
449
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_config() {
        let config = PoolConfig::new(vec!["wss://endpoint1".to_string()])
            .with_health_check_interval(Duration::from_secs(60))
            .with_max_retries(5);

        assert_eq!(config.endpoints.len(), 1);
        assert_eq!(config.health_check_interval, Duration::from_secs(60));
        assert_eq!(config.max_retries, 5);
    }

    #[test]
    fn test_pool_config_defaults_and_builders() {
        let config = PoolConfig::new(vec!["wss://endpoint1".to_string()]);
        assert_eq!(config.health_check_interval, Duration::from_secs(30));
        assert_eq!(config.connection_timeout, Duration::from_secs(10));
        assert_eq!(config.max_retries, 3);
        assert!(config.auto_health_check);

        let config = config
            .with_connection_timeout(Duration::from_secs(2))
            .with_auto_health_check(false);
        assert_eq!(config.connection_timeout, Duration::from_secs(2));
        assert!(!config.auto_health_check);
    }

    #[test]
    fn test_pool_stats() {
        let stats = PoolStats {
            total_endpoints: 4,
            healthy_endpoints: 3,
            unhealthy_endpoints: 1,
            unknown_endpoints: 0,
        };

        assert_eq!(stats.health_percentage(), 75.0);
    }

    #[test]
    fn test_pool_stats_empty_pool() {
        let stats = PoolStats {
            total_endpoints: 0,
            healthy_endpoints: 0,
            unhealthy_endpoints: 0,
            unknown_endpoints: 0,
        };

        // An empty pool must not divide by zero.
        assert_eq!(stats.health_percentage(), 0.0);
    }

    #[test]
    fn test_pool_stats_display() {
        let stats = PoolStats {
            total_endpoints: 4,
            healthy_endpoints: 3,
            unhealthy_endpoints: 1,
            unknown_endpoints: 0,
        };

        assert_eq!(
            stats.to_string(),
            "Pool: 4 total, 3 healthy (75.0%), 1 unhealthy, 0 unknown"
        );
    }

    #[tokio::test]
    async fn test_pool_requires_at_least_one_endpoint() {
        // Rejected before any connection attempt, so no network is needed.
        let result = ConnectionPool::new(PoolConfig::new(Vec::new()), ChainConfig::westend()).await;
        assert!(matches!(result, Err(Error::Connection(_))));
    }

    #[tokio::test]
    #[ignore] // Requires network
    async fn test_connection_pool() {
        let config = PoolConfig::new(vec!["wss://westend-rpc.polkadot.io".to_string()]);

        let pool = ConnectionPool::new(config, ChainConfig::westend()).await;
        assert!(pool.is_ok());

        let pool = pool.unwrap();
        let stats = pool.stats();
        assert!(stats.total_endpoints > 0);
    }
}