apex_sdk_substrate/
pool.rs

1//! Connection pooling for Substrate endpoints
2//!
3//! This module provides connection pooling functionality including:
4//! - Multiple endpoint management
5//! - Round-robin load balancing
6//! - Health checking
7//! - Automatic failover
8
9use crate::{ChainConfig, Error, Result, SubstrateAdapter};
10use parking_lot::RwLock;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::time::sleep;
14use tracing::{debug, info, warn};
15
/// Settings used to build and maintain a connection pool.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Endpoint addresses the pool connects to
    pub endpoints: Vec<String>,
    /// Time between two consecutive health checks
    pub health_check_interval: Duration,
    /// Time limit for a connection attempt
    pub connection_timeout: Duration,
    /// Upper bound on retries for failed connections
    pub max_retries: u32,
    /// Whether health checking runs automatically
    pub auto_health_check: bool,
}

impl PoolConfig {
    /// Build a configuration for `endpoints` using the default settings:
    /// a 30 second health check interval, a 10 second connection timeout,
    /// 3 retries, and automatic health checking switched on.
    pub fn new(endpoints: Vec<String>) -> Self {
        let health_check_interval = Duration::from_secs(30);
        let connection_timeout = Duration::from_secs(10);

        Self {
            endpoints,
            health_check_interval,
            connection_timeout,
            max_retries: 3,
            auto_health_check: true,
        }
    }

    /// Return the configuration with the health check interval replaced
    pub fn with_health_check_interval(self, interval: Duration) -> Self {
        Self {
            health_check_interval: interval,
            ..self
        }
    }

    /// Return the configuration with the connection timeout replaced
    pub fn with_connection_timeout(self, timeout: Duration) -> Self {
        Self {
            connection_timeout: timeout,
            ..self
        }
    }

    /// Return the configuration with the retry limit replaced
    pub fn with_max_retries(self, max_retries: u32) -> Self {
        Self {
            max_retries,
            ..self
        }
    }

    /// Return the configuration with automatic health checking turned on or off
    pub fn with_auto_health_check(self, enabled: bool) -> Self {
        Self {
            auto_health_check: enabled,
            ..self
        }
    }
}
67
/// Health classification the pool assigns to an endpoint.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HealthStatus {
    /// The endpoint is considered usable
    Healthy,
    /// The endpoint is considered unusable
    Unhealthy,
    /// The endpoint's health has not been determined
    Unknown,
}
78
79/// Information about a pooled connection
80#[derive(Clone)]
81struct PooledConnection {
82    endpoint: String,
83    adapter: Option<Arc<SubstrateAdapter>>,
84    health_status: HealthStatus,
85    last_check: Instant,
86    failure_count: u32,
87}
88
89/// Connection pool for managing multiple Substrate connections
90pub struct ConnectionPool {
91    config: PoolConfig,
92    connections: Arc<RwLock<Vec<PooledConnection>>>,
93    current_index: Arc<RwLock<usize>>,
94    chain_config: ChainConfig,
95}
96
97impl ConnectionPool {
98    /// Create a new connection pool
99    pub async fn new(config: PoolConfig, chain_config: ChainConfig) -> Result<Self> {
100        if config.endpoints.is_empty() {
101            return Err(Error::Connection(
102                "At least one endpoint is required".to_string(),
103            ));
104        }
105
106        info!(
107            "Creating connection pool with {} endpoints",
108            config.endpoints.len()
109        );
110
111        let mut connections = Vec::new();
112
113        // Initialize connections
114        for endpoint in &config.endpoints {
115            debug!("Initializing connection to {}", endpoint);
116
117            let mut chain_cfg = chain_config.clone();
118            chain_cfg.endpoint = endpoint.clone();
119
120            let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
121                Ok(adapter) => {
122                    info!("Successfully connected to {}", endpoint);
123                    Some(Arc::new(adapter))
124                }
125                Err(e) => {
126                    warn!("Failed to connect to {}: {}", endpoint, e);
127                    None
128                }
129            };
130
131            let health_status = if adapter.is_some() {
132                HealthStatus::Healthy
133            } else {
134                HealthStatus::Unhealthy
135            };
136
137            connections.push(PooledConnection {
138                endpoint: endpoint.clone(),
139                adapter,
140                health_status,
141                last_check: Instant::now(),
142                failure_count: 0,
143            });
144        }
145
146        let pool = Self {
147            config,
148            connections: Arc::new(RwLock::new(connections)),
149            current_index: Arc::new(RwLock::new(0)),
150            chain_config,
151        };
152
153        // Start health checker if enabled
154        if pool.config.auto_health_check {
155            pool.start_health_checker();
156        }
157
158        Ok(pool)
159    }
160
161    /// Get the next available healthy connection using round-robin
162    pub fn get_connection(&self) -> Result<Arc<SubstrateAdapter>> {
163        let connections = self.connections.read();
164        let healthy_count = connections
165            .iter()
166            .filter(|c| c.health_status == HealthStatus::Healthy && c.adapter.is_some())
167            .count();
168
169        if healthy_count == 0 {
170            return Err(Error::Connection(
171                "No healthy connections available".to_string(),
172            ));
173        }
174
175        // Round-robin selection among healthy connections
176        let start_index = *self.current_index.read();
177        let total = connections.len();
178
179        for i in 0..total {
180            let index = (start_index + i) % total;
181            let conn = &connections[index];
182
183            if conn.health_status == HealthStatus::Healthy {
184                if let Some(adapter) = &conn.adapter {
185                    // Update index for next call
186                    *self.current_index.write() = (index + 1) % total;
187                    return Ok(adapter.clone());
188                }
189            }
190        }
191
192        Err(Error::Connection(
193            "No healthy connections available".to_string(),
194        ))
195    }
196
197    /// Get all available connections
198    pub fn get_all_connections(&self) -> Vec<Arc<SubstrateAdapter>> {
199        self.connections
200            .read()
201            .iter()
202            .filter_map(|c| c.adapter.clone())
203            .collect()
204    }
205
206    /// Get pool statistics
207    pub fn stats(&self) -> PoolStats {
208        let connections = self.connections.read();
209
210        let total = connections.len();
211        let healthy = connections
212            .iter()
213            .filter(|c| c.health_status == HealthStatus::Healthy)
214            .count();
215        let unhealthy = connections
216            .iter()
217            .filter(|c| c.health_status == HealthStatus::Unhealthy)
218            .count();
219        let unknown = connections
220            .iter()
221            .filter(|c| c.health_status == HealthStatus::Unknown)
222            .count();
223
224        PoolStats {
225            total_endpoints: total,
226            healthy_endpoints: healthy,
227            unhealthy_endpoints: unhealthy,
228            unknown_endpoints: unknown,
229        }
230    }
231
232    /// Manually trigger health check for all endpoints
233    pub async fn health_check(&self) {
234        debug!("Running health check on all endpoints");
235
236        // Collect endpoints that need reconnection
237        let reconnect_endpoints: Vec<String> = {
238            let mut connections = self.connections.write();
239
240            for conn in connections.iter_mut() {
241                let is_healthy = if let Some(adapter) = &conn.adapter {
242                    // Simple health check: verify connection is still active
243                    adapter.is_connected()
244                } else {
245                    false
246                };
247
248                conn.health_status = if is_healthy {
249                    HealthStatus::Healthy
250                } else {
251                    HealthStatus::Unhealthy
252                };
253                conn.last_check = Instant::now();
254
255                if !is_healthy {
256                    conn.failure_count += 1;
257                } else {
258                    // Reset failure count on success
259                    conn.failure_count = 0;
260                }
261            }
262
263            // Collect endpoints that need reconnection
264            connections
265                .iter()
266                .filter(|conn| {
267                    conn.health_status == HealthStatus::Unhealthy
268                        && conn.failure_count <= self.config.max_retries
269                })
270                .map(|conn| conn.endpoint.clone())
271                .collect()
272        }; // Lock is dropped here
273
274        // Reconnect to unhealthy endpoints without holding the lock
275        for endpoint in reconnect_endpoints {
276            debug!("Attempting to reconnect to {}", endpoint);
277
278            let mut chain_cfg = self.chain_config.clone();
279            chain_cfg.endpoint = endpoint.clone();
280
281            match SubstrateAdapter::connect_with_config(chain_cfg).await {
282                Ok(adapter) => {
283                    info!("Successfully reconnected to {}", endpoint);
284                    let mut connections = self.connections.write();
285                    if let Some(conn) = connections.iter_mut().find(|c| c.endpoint == endpoint) {
286                        conn.adapter = Some(Arc::new(adapter));
287                        conn.health_status = HealthStatus::Healthy;
288                        conn.failure_count = 0;
289                    }
290                }
291                Err(e) => {
292                    warn!("Failed to reconnect to {}: {}", endpoint, e);
293                }
294            }
295        }
296    }
297
298    /// Start background health checker
299    fn start_health_checker(&self) {
300        let connections = self.connections.clone();
301        let interval = self.config.health_check_interval;
302        let chain_config = self.chain_config.clone();
303        let max_retries = self.config.max_retries;
304
305        tokio::spawn(async move {
306            loop {
307                sleep(interval).await;
308
309                debug!("Background health check running");
310
311                // Collect endpoints that need reconnection
312                let endpoints_to_reconnect: Vec<(String, bool)> = {
313                    let mut conns = connections.write();
314                    let mut to_reconnect = Vec::new();
315
316                    for conn in conns.iter_mut() {
317                        let is_healthy = if let Some(adapter) = &conn.adapter {
318                            adapter.is_connected()
319                        } else {
320                            false
321                        };
322
323                        conn.health_status = if is_healthy {
324                            HealthStatus::Healthy
325                        } else {
326                            HealthStatus::Unhealthy
327                        };
328                        conn.last_check = Instant::now();
329
330                        if !is_healthy && conn.failure_count <= max_retries {
331                            to_reconnect.push((conn.endpoint.clone(), true));
332                        }
333                    }
334
335                    to_reconnect
336                };
337
338                // Reconnect outside the lock
339                for (endpoint, _) in endpoints_to_reconnect {
340                    let mut chain_cfg = chain_config.clone();
341                    chain_cfg.endpoint = endpoint.clone();
342
343                    if let Ok(adapter) = SubstrateAdapter::connect_with_config(chain_cfg).await {
344                        let mut conns = connections.write();
345                        if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
346                            conn.adapter = Some(Arc::new(adapter));
347                            conn.health_status = HealthStatus::Healthy;
348                            conn.failure_count = 0;
349                        }
350                    } else {
351                        let mut conns = connections.write();
352                        if let Some(conn) = conns.iter_mut().find(|c| c.endpoint == endpoint) {
353                            conn.failure_count += 1;
354                        }
355                    }
356                }
357            }
358        });
359    }
360
361    /// Get the number of endpoints in the pool
362    pub fn endpoint_count(&self) -> usize {
363        self.connections.read().len()
364    }
365
366    /// Add a new endpoint to the pool
367    pub async fn add_endpoint(&self, endpoint: String) -> Result<()> {
368        info!("Adding new endpoint to pool: {}", endpoint);
369
370        let mut chain_cfg = self.chain_config.clone();
371        chain_cfg.endpoint = endpoint.clone();
372
373        let adapter = match SubstrateAdapter::connect_with_config(chain_cfg).await {
374            Ok(adapter) => Some(Arc::new(adapter)),
375            Err(e) => {
376                warn!("Failed to connect to new endpoint {}: {}", endpoint, e);
377                None
378            }
379        };
380
381        let health_status = if adapter.is_some() {
382            HealthStatus::Healthy
383        } else {
384            HealthStatus::Unhealthy
385        };
386
387        let conn = PooledConnection {
388            endpoint,
389            adapter,
390            health_status,
391            last_check: Instant::now(),
392            failure_count: 0,
393        };
394
395        self.connections.write().push(conn);
396        Ok(())
397    }
398
399    /// Remove an endpoint from the pool
400    pub fn remove_endpoint(&self, endpoint: &str) -> Result<()> {
401        info!("Removing endpoint from pool: {}", endpoint);
402
403        let mut connections = self.connections.write();
404        let initial_len = connections.len();
405
406        connections.retain(|c| c.endpoint != endpoint);
407
408        if connections.len() == initial_len {
409            return Err(Error::Connection(format!(
410                "Endpoint '{}' not found in pool",
411                endpoint
412            )));
413        }
414
415        if connections.is_empty() {
416            return Err(Error::Connection(
417                "Cannot remove last endpoint from pool".to_string(),
418            ));
419        }
420
421        Ok(())
422    }
423}
424
/// Snapshot of endpoint counts per health state.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// How many endpoints the pool tracks in total
    pub total_endpoints: usize,
    /// How many of them are healthy
    pub healthy_endpoints: usize,
    /// How many of them are unhealthy
    pub unhealthy_endpoints: usize,
    /// How many of them have an unknown status
    pub unknown_endpoints: usize,
}

impl PoolStats {
    /// Share of healthy endpoints as a percentage in `0.0..=100.0`.
    ///
    /// Yields `0.0` when there are no endpoints at all, which avoids a
    /// division by zero.
    pub fn health_percentage(&self) -> f64 {
        match self.total_endpoints {
            0 => 0.0,
            total => self.healthy_endpoints as f64 / total as f64 * 100.0,
        }
    }
}

impl std::fmt::Display for PoolStats {
    /// Write a one-line summary with the health percentage to one decimal place.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            total_endpoints,
            healthy_endpoints,
            unhealthy_endpoints,
            unknown_endpoints,
        } = self;
        let percentage = self.health_percentage();

        write!(
            f,
            "Pool: {} total, {} healthy ({:.1}%), {} unhealthy, {} unknown",
            total_endpoints, healthy_endpoints, percentage, unhealthy_endpoints, unknown_endpoints
        )
    }
}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder overrides end up in the matching configuration fields.
    #[test]
    fn test_pool_config() {
        let one_minute = Duration::from_secs(60);
        let endpoints = vec!["wss://endpoint1".to_string()];

        let config = PoolConfig::new(endpoints)
            .with_health_check_interval(one_minute)
            .with_max_retries(5);

        assert_eq!(config.endpoints.len(), 1);
        assert_eq!(config.health_check_interval, one_minute);
        assert_eq!(config.max_retries, 5);
    }

    /// Three healthy endpoints out of four are 75 percent.
    #[test]
    fn test_pool_stats() {
        let three_of_four = PoolStats {
            total_endpoints: 4,
            healthy_endpoints: 3,
            unhealthy_endpoints: 1,
            unknown_endpoints: 0,
        };

        assert_eq!(three_of_four.health_percentage(), 75.0);
    }

    /// A pool built against a public Westend endpoint reports its endpoint.
    #[tokio::test]
    #[ignore] // Requires network
    async fn test_connection_pool() {
        let endpoints = vec!["wss://westend-rpc.polkadot.io".to_string()];

        let pool = ConnectionPool::new(PoolConfig::new(endpoints), ChainConfig::westend()).await;
        assert!(pool.is_ok());

        let stats = pool.unwrap().stats();
        assert!(stats.total_endpoints > 0);
    }
}