apex_sdk_substrate/
pool.rs

1//! Connection pooling for Substrate endpoints
2//!
3//! This module provides connection pooling functionality including:
4//! - Multiple endpoint management
5//! - Round-robin load balancing
6//! - Health checking
7//! - Automatic failover
8
9use crate::{ChainConfig, Error, Result, SubstrateAdapter};
10use parking_lot::RwLock;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::time::sleep;
14use tracing::{debug, info, warn};
15
/// Configuration for a [`ConnectionPool`].
///
/// Create one with [`PoolConfig::new`] and adjust it with the `with_*` builder
/// methods, each of which consumes and returns the configuration.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Endpoint URLs to connect to; [`ConnectionPool::new`] rejects an empty list.
    pub endpoints: Vec<String>,
    /// Time between background health checks (default: 30 seconds).
    pub health_check_interval: Duration,
    /// Connection timeout (default: 10 seconds).
    ///
    /// NOTE(review): the pool code in this module never reads this field —
    /// confirm whether it is honoured anywhere else.
    pub connection_timeout: Duration,
    /// Upper bound on the failure count at which a failing endpoint is still
    /// reconnected by the health checker (default: 3).
    pub max_retries: u32,
    /// Whether [`ConnectionPool::new`] spawns a background health-check task
    /// (default: `true`).
    pub auto_health_check: bool,
}

impl PoolConfig {
    /// Create a configuration for `endpoints` with default settings:
    /// a 30 s health-check interval, a 10 s connection timeout, 3 retries and
    /// automatic health checking enabled.
    #[must_use]
    pub fn new(endpoints: Vec<String>) -> Self {
        Self {
            endpoints,
            health_check_interval: Duration::from_secs(30),
            connection_timeout: Duration::from_secs(10),
            max_retries: 3,
            auto_health_check: true,
        }
    }

    /// Set the interval between background health checks.
    // `#[must_use]` on the builders: they take `self` by value, so a call whose
    // result is discarded is a silent no-op.
    #[must_use]
    pub fn with_health_check_interval(mut self, interval: Duration) -> Self {
        self.health_check_interval = interval;
        self
    }

    /// Set the connection timeout.
    #[must_use]
    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
        self.connection_timeout = timeout;
        self
    }

    /// Set the maximum number of reconnection retries for a failing endpoint.
    #[must_use]
    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
        self.max_retries = max_retries;
        self
    }

    /// Enable or disable the background health-check task.
    #[must_use]
    pub fn with_auto_health_check(mut self, enabled: bool) -> Self {
        self.auto_health_check = enabled;
        self
    }
}
67
/// Outcome of the most recent health assessment of a pool endpoint.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthStatus {
    /// The endpoint passed its latest check and is considered usable.
    Healthy,
    /// The endpoint failed its latest check (or never connected).
    Unhealthy,
    /// The endpoint's health has not been determined.
    Unknown,
}
78
79/// Information about a pooled connection
80#[derive(Clone)]
81struct PooledConnection {
82    endpoint: String,
83    adapter: Option<Arc<SubstrateAdapter>>,
84    health_status: HealthStatus,
85    last_check: Instant,
86    failure_count: u32,
87}
88
89/// Connection pool for managing multiple Substrate connections
90pub struct ConnectionPool {
91    config: PoolConfig,
92    connections: Arc<RwLock<Vec<PooledConnection>>>,
93    current_index: Arc<RwLock<usize>>,
94    chain_config: ChainConfig,
95}
96
97impl ConnectionPool {
98    /// Create a new connection pool
99    pub async fn new(config: PoolConfig, chain_config: ChainConfig) -> Result<Self> {
100        if config.endpoints.is_empty() {
101            return Err(Error::Connection(
102                "At least one endpoint is required".to_string(),
103            ));
104        }
105
106        info!(
107            "Creating connection pool with {} endpoints",
108            config.endpoints.len()
109        );
110
111        let mut connections = Vec::new();
112
113        // Initialize connections
114        for endpoint in &config.endpoints {
115            debug!("Initializing connection to {}", endpoint);
116
117            let mut chain_cfg = chain_config.clone();
118            chain_cfg.endpoint = endpoint.clone();
119
120            let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
121                Ok(adapter) => {
122                    info!("Successfully connected to {}", endpoint);
123                    Some(Arc::new(adapter))
124                }
125                Err(e) => {
126                    warn!("Failed to connect to {}: {}", endpoint, e);
127                    None
128                }
129            };
130
131            let health_status = if adapter.is_some() {
132                HealthStatus::Healthy
133            } else {
134                HealthStatus::Unhealthy
135            };
136
137            connections.push(PooledConnection {
138                endpoint: endpoint.clone(),
139                adapter,
140                health_status,
141                last_check: Instant::now(),
142                failure_count: 0,
143            });
144        }
145
146        let pool = Self {
147            config,
148            connections: Arc::new(RwLock::new(connections)),
149            current_index: Arc::new(RwLock::new(0)),
150            chain_config,
151        };
152
153        // Start health checker if enabled
154        if pool.config.auto_health_check {
155            pool.start_health_checker();
156        }
157
158        Ok(pool)
159    }
160
161    /// Get the next available healthy connection using round-robin
162    #[allow(clippy::result_large_err)]
163    pub fn get_connection(&self) -> Result<Arc<SubstrateAdapter>> {
164        let connections = self.connections.read();
165        let healthy_count = connections
166            .iter()
167            .filter(|c| c.health_status == HealthStatus::Healthy && c.adapter.is_some())
168            .count();
169
170        if healthy_count == 0 {
171            return Err(Error::Connection(
172                "No healthy connections available".to_string(),
173            ));
174        }
175
176        // Round-robin selection among healthy connections
177        let start_index = *self.current_index.read();
178        let total = connections.len();
179
180        for i in 0..total {
181            let index = (start_index + i) % total;
182            let conn = &connections[index];
183
184            if conn.health_status == HealthStatus::Healthy {
185                if let Some(adapter) = &conn.adapter {
186                    // Update index for next call
187                    *self.current_index.write() = (index + 1) % total;
188                    return Ok(adapter.clone());
189                }
190            }
191        }
192
193        Err(Error::Connection(
194            "No healthy connections available".to_string(),
195        ))
196    }
197
198    /// Get all available connections
199    pub fn get_all_connections(&self) -> Vec<Arc<SubstrateAdapter>> {
200        self.connections
201            .read()
202            .iter()
203            .filter_map(|c| c.adapter.clone())
204            .collect()
205    }
206
207    /// Get pool statistics
208    pub fn stats(&self) -> PoolStats {
209        let connections = self.connections.read();
210
211        let total = connections.len();
212        let healthy = connections
213            .iter()
214            .filter(|c| c.health_status == HealthStatus::Healthy)
215            .count();
216        let unhealthy = connections
217            .iter()
218            .filter(|c| c.health_status == HealthStatus::Unhealthy)
219            .count();
220        let unknown = connections
221            .iter()
222            .filter(|c| c.health_status == HealthStatus::Unknown)
223            .count();
224
225        PoolStats {
226            total_endpoints: total,
227            healthy_endpoints: healthy,
228            unhealthy_endpoints: unhealthy,
229            unknown_endpoints: unknown,
230        }
231    }
232
233    /// Manually trigger health check for all endpoints
234    pub async fn health_check(&self) {
235        debug!("Running health check on all endpoints");
236
237        // Collect endpoints that need reconnection while holding the lock
238        let endpoints_to_reconnect: Vec<String> = {
239            let mut connections = self.connections.write();
240
241            let mut to_reconnect = Vec::new();
242
243            for conn in connections.iter_mut() {
244                let is_healthy = if let Some(adapter) = &conn.adapter {
245                    // Simple health check: verify connection is still active
246                    adapter.is_connected()
247                } else {
248                    false
249                };
250
251                conn.health_status = if is_healthy {
252                    HealthStatus::Healthy
253                } else {
254                    HealthStatus::Unhealthy
255                };
256                conn.last_check = Instant::now();
257
258                if !is_healthy {
259                    conn.failure_count += 1;
260
261                    // Mark for reconnection if failures are below threshold
262                    if conn.failure_count <= self.config.max_retries {
263                        to_reconnect.push(conn.endpoint.clone());
264                    }
265                } else {
266                    // Reset failure count on success
267                    conn.failure_count = 0;
268                }
269            }
270
271            to_reconnect
272        };
273
274        // Reconnect outside the lock to avoid holding it across await points
275        for endpoint in endpoints_to_reconnect {
276            debug!("Attempting to reconnect to {}", endpoint);
277
278            let mut chain_cfg = self.chain_config.clone();
279            chain_cfg.endpoint = endpoint.clone();
280
281            match SubstrateAdapter::connect_with_config(chain_cfg).await {
282                Ok(adapter) => {
283                    info!("Successfully reconnected to {}", endpoint);
284                    let mut connections = self.connections.write();
285                    if let Some(conn) = connections.iter_mut().find(|c| c.endpoint == endpoint) {
286                        conn.adapter = Some(Arc::new(adapter));
287                        conn.health_status = HealthStatus::Healthy;
288                        conn.failure_count = 0;
289                    }
290                }
291                Err(e) => {
292                    warn!("Failed to reconnect to {}: {}", endpoint, e);
293                }
294            }
295        }
296    }
297
298    /// Start background health checker
299    fn start_health_checker(&self) {
300        let connections = self.connections.clone();
301        let interval = self.config.health_check_interval;
302        let chain_config = self.chain_config.clone();
303        let max_retries = self.config.max_retries;
304
305        tokio::spawn(async move {
306            loop {
307                sleep(interval).await;
308
309                debug!("Background health check running");
310
311                // Collect endpoints that need reconnection
312                let endpoints_to_reconnect: Vec<(String, bool)> = {
313                    let mut conns = connections.write();
314                    let mut to_reconnect = Vec::new();
315
316                    for conn in conns.iter_mut() {
317                        let is_healthy = if let Some(adapter) = &conn.adapter {
318                            adapter.is_connected()
319                        } else {
320                            false
321                        };
322
323                        conn.health_status = if is_healthy {
324                            HealthStatus::Healthy
325                        } else {
326                            HealthStatus::Unhealthy
327                        };
328                        conn.last_check = Instant::now();
329
330                        if !is_healthy && conn.failure_count <= max_retries {
331                            to_reconnect.push((conn.endpoint.clone(), true));
332                        }
333                    }
334
335                    to_reconnect
336                };
337
338                // Reconnect outside the lock
339                for (endpoint, _) in endpoints_to_reconnect {
340                    let mut chain_cfg = chain_config.clone();
341                    chain_cfg.endpoint = endpoint.clone();
342
343                    if let Ok(adapter) = SubstrateAdapter::connect_with_config(chain_cfg).await {
344                        let mut conns = connections.write();
345                        if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
346                            conn.adapter = Some(Arc::new(adapter));
347                            conn.health_status = HealthStatus::Healthy;
348                            conn.failure_count = 0;
349                        }
350                    } else {
351                        let mut conns = connections.write();
352                        if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
353                            conn.failure_count += 1;
354                        }
355                    }
356                }
357            }
358        });
359    }
360
361    /// Get the number of endpoints in the pool
362    pub fn endpoint_count(&self) -> usize {
363        self.connections.read().len()
364    }
365
366    /// Add a new endpoint to the pool
367    pub async fn add_endpoint(&self, endpoint: String) -> Result<()> {
368        info!("Adding new endpoint to pool: {}", endpoint);
369
370        let mut chain_cfg = self.chain_config.clone();
371        chain_cfg.endpoint = endpoint.clone();
372
373        let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
374            Ok(adapter) => Some(Arc::new(adapter)),
375            Err(e) => {
376                warn!("Failed to connect to new endpoint {}: {}", endpoint, e);
377                None
378            }
379        };
380
381        let health_status = if adapter.is_some() {
382            HealthStatus::Healthy
383        } else {
384            HealthStatus::Unhealthy
385        };
386
387        let conn = PooledConnection {
388            endpoint,
389            adapter,
390            health_status,
391            last_check: Instant::now(),
392            failure_count: 0,
393        };
394
395        self.connections.write().push(conn);
396        Ok(())
397    }
398
399    /// Remove an endpoint from the pool
400    #[allow(clippy::result_large_err)]
401    pub fn remove_endpoint(&self, endpoint: &str) -> Result<()> {
402        info!("Removing endpoint from pool: {}", endpoint);
403
404        let mut connections = self.connections.write();
405        let initial_len = connections.len();
406
407        connections.retain(|c| c.endpoint != endpoint);
408
409        if connections.len() == initial_len {
410            return Err(Error::Connection(format!(
411                "Endpoint '{}' not found in pool",
412                endpoint
413            )));
414        }
415
416        if connections.is_empty() {
417            return Err(Error::Connection(
418                "Cannot remove last endpoint from pool".to_string(),
419            ));
420        }
421
422        Ok(())
423    }
424}
425
/// Snapshot of how many pool endpoints are in each health state.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of endpoints tracked by the pool.
    pub total_endpoints: usize,
    /// Endpoints currently marked healthy.
    pub healthy_endpoints: usize,
    /// Endpoints currently marked unhealthy.
    pub unhealthy_endpoints: usize,
    /// Endpoints whose health has not been determined.
    pub unknown_endpoints: usize,
}

impl PoolStats {
    /// Percentage of `total_endpoints` that are healthy; `0.0` when there are no endpoints.
    pub fn health_percentage(&self) -> f64 {
        match self.total_endpoints {
            0 => 0.0,
            total => (self.healthy_endpoints as f64 / total as f64) * 100.0,
        }
    }
}

impl std::fmt::Display for PoolStats {
    /// Renders a one-line summary with the healthy share rounded to one decimal place.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let percentage = self.health_percentage();
        write!(
            f,
            "Pool: {} total, {} healthy ({:.1}%), {} unhealthy, {} unknown",
            self.total_endpoints,
            self.healthy_endpoints,
            percentage,
            self.unhealthy_endpoints,
            self.unknown_endpoints
        )
    }
}
463
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_config() {
        let config = PoolConfig::new(vec!["wss://endpoint1".to_string()])
            .with_health_check_interval(Duration::from_secs(60))
            .with_max_retries(5);

        assert_eq!(config.endpoints.len(), 1);
        assert_eq!(config.health_check_interval, Duration::from_secs(60));
        assert_eq!(config.max_retries, 5);
    }

    #[test]
    fn test_pool_config_defaults() {
        let config = PoolConfig::new(vec!["wss://endpoint1".to_string()]);

        assert_eq!(config.health_check_interval, Duration::from_secs(30));
        assert_eq!(config.connection_timeout, Duration::from_secs(10));
        assert_eq!(config.max_retries, 3);
        assert!(config.auto_health_check);
    }

    #[test]
    fn test_pool_config_remaining_builders() {
        let config = PoolConfig::new(Vec::new())
            .with_connection_timeout(Duration::from_millis(500))
            .with_auto_health_check(false);

        assert!(config.endpoints.is_empty());
        assert_eq!(config.connection_timeout, Duration::from_millis(500));
        assert!(!config.auto_health_check);
    }

    #[test]
    fn test_pool_stats() {
        let stats = PoolStats {
            total_endpoints: 4,
            healthy_endpoints: 3,
            unhealthy_endpoints: 1,
            unknown_endpoints: 0,
        };

        assert_eq!(stats.health_percentage(), 75.0);
    }

    #[test]
    fn test_pool_stats_empty_pool() {
        // The zero-endpoint branch must not divide by zero.
        let stats = PoolStats {
            total_endpoints: 0,
            healthy_endpoints: 0,
            unhealthy_endpoints: 0,
            unknown_endpoints: 0,
        };

        assert_eq!(stats.health_percentage(), 0.0);
    }

    #[test]
    fn test_pool_stats_display() {
        let stats = PoolStats {
            total_endpoints: 4,
            healthy_endpoints: 3,
            unhealthy_endpoints: 1,
            unknown_endpoints: 0,
        };

        assert_eq!(
            stats.to_string(),
            "Pool: 4 total, 3 healthy (75.0%), 1 unhealthy, 0 unknown"
        );
    }

    #[tokio::test]
    #[ignore] // Requires network
    async fn test_connection_pool() {
        let config = PoolConfig::new(vec!["wss://westend-rpc.polkadot.io".to_string()]);

        let pool = ConnectionPool::new(config, ChainConfig::westend()).await;
        assert!(pool.is_ok());

        let pool = pool.unwrap();
        let stats = pool.stats();
        assert!(stats.total_endpoints > 0);
    }
}