// apex_sdk_evm/pool.rs

//! Connection pooling for EVM providers.
//!
//! This module provides:
//! - Connection pooling with round-robin load balancing across endpoints
//! - On-demand and periodic background health checks for each endpoint
//! - Automatic failover from unhealthy endpoints to healthy ones
//! - Connection reuse: each endpoint keeps one shared adapter

9use crate::{Error, EvmAdapter};
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::RwLock;
14
/// Health status of an endpoint.
///
/// Values of this type are snapshots: comparing two of them with `==` tells
/// whether anything about the endpoint's health changed in between.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EndpointHealth {
    /// Whether the endpoint is currently considered healthy.
    pub is_healthy: bool,
    /// When the last success was recorded, if ever.
    pub last_success: Option<Instant>,
    /// When the last failure was recorded, if ever.
    pub last_failure: Option<Instant>,
    /// Consecutive failure count (reset to `0` on success).
    pub failure_count: u32,
    /// Exponential moving average of response times, in milliseconds.
    ///
    /// `0` doubles as "no sample recorded yet": the next sample replaces it outright.
    pub avg_response_time_ms: u64,
}
29
30impl Default for EndpointHealth {
31    fn default() -> Self {
32        Self {
33            is_healthy: true,
34            last_success: None,
35            last_failure: None,
36            failure_count: 0,
37            avg_response_time_ms: 0,
38        }
39    }
40}
41
42/// Pooled connection to an EVM endpoint
43pub struct PooledConnection {
44    adapter: Arc<EvmAdapter>,
45    endpoint: String,
46    health: Arc<RwLock<EndpointHealth>>,
47}
48
49impl PooledConnection {
50    /// Get the underlying adapter
51    pub fn adapter(&self) -> &EvmAdapter {
52        &self.adapter
53    }
54
55    /// Get the endpoint URL
56    pub fn endpoint(&self) -> &str {
57        &self.endpoint
58    }
59
60    /// Get current health status
61    pub async fn health(&self) -> EndpointHealth {
62        self.health.read().await.clone()
63    }
64
65    /// Mark connection as healthy after successful operation
66    pub async fn mark_healthy(&self, response_time_ms: u64) {
67        let mut health = self.health.write().await;
68        health.is_healthy = true;
69        health.last_success = Some(Instant::now());
70        health.failure_count = 0;
71
72        // Update average response time (exponential moving average)
73        if health.avg_response_time_ms == 0 {
74            health.avg_response_time_ms = response_time_ms;
75        } else {
76            health.avg_response_time_ms = (health.avg_response_time_ms * 9 + response_time_ms) / 10;
77        }
78    }
79
80    /// Mark connection as unhealthy after failure
81    pub async fn mark_unhealthy(&self) {
82        let mut health = self.health.write().await;
83        health.last_failure = Some(Instant::now());
84        health.failure_count += 1;
85
86        // Mark as unhealthy after 3 consecutive failures
87        if health.failure_count >= 3 {
88            health.is_healthy = false;
89            tracing::warn!("Endpoint {} marked as unhealthy", self.endpoint);
90        }
91    }
92}
93
/// Configuration for a connection pool.
///
/// All durations are expressed in whole seconds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PoolConfig {
    /// Maximum number of connections per endpoint.
    ///
    /// NOTE(review): nothing in this module reads this value; the pool
    /// currently keeps a single shared adapter per endpoint.
    pub max_connections_per_endpoint: usize,
    /// Interval between background health-check rounds, in seconds.
    pub health_check_interval_secs: u64,
    /// Timeout for health checks, in seconds.
    pub health_check_timeout_secs: u64,
    /// Maximum consecutive failures before marking an endpoint unhealthy.
    pub max_failures: u32,
    /// Time to wait before retrying an unhealthy endpoint, in seconds.
    pub unhealthy_retry_delay_secs: u64,
}
108
109impl Default for PoolConfig {
110    fn default() -> Self {
111        Self {
112            max_connections_per_endpoint: 10,
113            health_check_interval_secs: 30,
114            health_check_timeout_secs: 5,
115            max_failures: 3,
116            unhealthy_retry_delay_secs: 60,
117        }
118    }
119}
120
/// Connection pool for EVM providers
///
/// Holds one [`PooledConnection`] per connected endpoint and hands out
/// handles to them round-robin.
pub struct ConnectionPool {
    /// Endpoint URLs exactly as passed to the constructor.
    endpoints: Vec<String>,
    /// Pooled connections, behind an async read/write lock.
    connections: Arc<RwLock<Vec<PooledConnection>>>,
    /// Round-robin cursor; `fetch_add` wraps around on overflow.
    next_index: AtomicUsize,
    /// Tuning parameters supplied at construction time.
    config: PoolConfig,
}
128
129impl ConnectionPool {
130    /// Create a new connection pool
131    pub async fn new(endpoints: Vec<String>) -> Result<Self, Error> {
132        Self::with_config(endpoints, PoolConfig::default()).await
133    }
134
135    /// Create a new connection pool with custom configuration
136    pub async fn with_config(endpoints: Vec<String>, config: PoolConfig) -> Result<Self, Error> {
137        if endpoints.is_empty() {
138            return Err(Error::Connection("No endpoints provided".to_string()));
139        }
140
141        tracing::info!(
142            "Creating connection pool with {} endpoints",
143            endpoints.len()
144        );
145
146        let mut connections = Vec::new();
147
148        // Create initial connections
149        for endpoint in &endpoints {
150            match EvmAdapter::connect(endpoint).await {
151                Ok(adapter) => {
152                    let conn = PooledConnection {
153                        adapter: Arc::new(adapter),
154                        endpoint: endpoint.clone(),
155                        health: Arc::new(RwLock::new(EndpointHealth::default())),
156                    };
157                    connections.push(conn);
158                    tracing::info!("Successfully connected to endpoint: {}", endpoint);
159                }
160                Err(e) => {
161                    tracing::warn!("Failed to connect to endpoint {}: {}", endpoint, e);
162                    // Create unhealthy connection
163                    let adapter = EvmAdapter::connect(endpoint).await?;
164                    let health = EndpointHealth {
165                        is_healthy: false,
166                        failure_count: 1,
167                        ..Default::default()
168                    };
169
170                    let conn = PooledConnection {
171                        adapter: Arc::new(adapter),
172                        endpoint: endpoint.clone(),
173                        health: Arc::new(RwLock::new(health)),
174                    };
175                    connections.push(conn);
176                }
177            }
178        }
179
180        Ok(Self {
181            endpoints,
182            connections: Arc::new(RwLock::new(connections)),
183            next_index: AtomicUsize::new(0),
184            config,
185        })
186    }
187
188    /// Get a connection using round-robin load balancing
189    ///
190    /// This will skip unhealthy endpoints and try the next one
191    pub async fn get_connection(&self) -> Result<Arc<PooledConnection>, Error> {
192        let connections = self.connections.read().await;
193
194        if connections.is_empty() {
195            return Err(Error::Connection("No connections available".to_string()));
196        }
197
198        let total = connections.len();
199        let mut attempts = 0;
200
201        // Try to find a healthy connection
202        while attempts < total {
203            let index = self.next_index.fetch_add(1, Ordering::Relaxed) % total;
204            let conn = &connections[index];
205
206            let health = conn.health.read().await;
207            if health.is_healthy {
208                drop(health);
209                return Ok(Arc::new(PooledConnection {
210                    adapter: conn.adapter.clone(),
211                    endpoint: conn.endpoint.clone(),
212                    health: conn.health.clone(),
213                }));
214            }
215
216            // Check if enough time has passed to retry unhealthy endpoint
217            if let Some(last_failure) = health.last_failure {
218                if last_failure.elapsed().as_secs() > self.config.unhealthy_retry_delay_secs {
219                    drop(health);
220                    tracing::info!("Retrying previously unhealthy endpoint: {}", conn.endpoint);
221                    return Ok(Arc::new(PooledConnection {
222                        adapter: conn.adapter.clone(),
223                        endpoint: conn.endpoint.clone(),
224                        health: conn.health.clone(),
225                    }));
226                }
227            }
228
229            attempts += 1;
230        }
231
232        // All endpoints unhealthy, return the first one and let caller handle retry
233        let conn = &connections[0];
234        tracing::warn!("All endpoints unhealthy, returning first endpoint");
235        Ok(Arc::new(PooledConnection {
236            adapter: conn.adapter.clone(),
237            endpoint: conn.endpoint.clone(),
238            health: conn.health.clone(),
239        }))
240    }
241
242    /// Get health status of all endpoints
243    pub async fn health_status(&self) -> Vec<(String, EndpointHealth)> {
244        let connections = self.connections.read().await;
245        let mut status = Vec::new();
246
247        for conn in connections.iter() {
248            let health = conn.health.read().await.clone();
249            status.push((conn.endpoint.clone(), health));
250        }
251
252        status
253    }
254
255    /// Run health checks on all endpoints
256    pub async fn run_health_checks(&self) -> Result<(), Error> {
257        tracing::debug!("Running health checks on all endpoints");
258
259        let connections = self.connections.read().await;
260
261        for conn in connections.iter() {
262            let start = Instant::now();
263
264            // Try to get block number as health check
265            match conn.adapter.provider().get_block_number().await {
266                Ok(_) => {
267                    let elapsed = start.elapsed().as_millis() as u64;
268                    conn.mark_healthy(elapsed).await;
269                    tracing::debug!("Health check passed for {}: {}ms", conn.endpoint, elapsed);
270                }
271                Err(e) => {
272                    conn.mark_unhealthy().await;
273                    tracing::warn!("Health check failed for {}: {}", conn.endpoint, e);
274                }
275            }
276        }
277
278        Ok(())
279    }
280
281    /// Start automatic health checking in the background
282    pub fn start_health_checker(self: Arc<Self>) {
283        let pool = self.clone();
284        let interval = Duration::from_secs(self.config.health_check_interval_secs);
285        let interval_secs = self.config.health_check_interval_secs;
286
287        tokio::spawn(async move {
288            loop {
289                tokio::time::sleep(interval).await;
290
291                if let Err(e) = pool.run_health_checks().await {
292                    tracing::error!("Health check error: {}", e);
293                }
294            }
295        });
296
297        tracing::info!("Started health checker with interval: {}s", interval_secs);
298    }
299
300    /// Get the number of endpoints
301    pub fn endpoint_count(&self) -> usize {
302        self.endpoints.len()
303    }
304
305    /// Get list of all endpoints
306    pub fn endpoints(&self) -> &[String] {
307        &self.endpoints
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_config_default() {
        let config = PoolConfig::default();
        assert_eq!(config.max_connections_per_endpoint, 10);
        assert_eq!(config.health_check_interval_secs, 30);
        assert_eq!(config.health_check_timeout_secs, 5);
        assert_eq!(config.max_failures, 3);
        assert_eq!(config.unhealthy_retry_delay_secs, 60);
    }

    #[test]
    fn test_endpoint_health_default() {
        let health = EndpointHealth::default();
        assert!(health.is_healthy);
        assert!(health.last_success.is_none());
        assert!(health.last_failure.is_none());
        assert_eq!(health.failure_count, 0);
        assert_eq!(health.avg_response_time_ms, 0);
    }

    /// An empty endpoint list is rejected before any network access happens.
    #[tokio::test]
    async fn test_empty_endpoints_rejected() {
        let result = ConnectionPool::new(Vec::new()).await;
        assert!(matches!(result, Err(Error::Connection(_))));
    }

    #[tokio::test]
    #[ignore] // Requires network
    async fn test_connection_pool() {
        let endpoints = vec![
            "https://eth.llamarpc.com".to_string(),
            "https://ethereum.publicnode.com".to_string(),
        ];

        let pool = ConnectionPool::new(endpoints).await.unwrap();
        assert_eq!(pool.endpoint_count(), 2);

        // Get a connection; it must point at one of the configured endpoints.
        let conn = pool.get_connection().await.unwrap();
        assert!(!conn.endpoint().is_empty());
        assert!(pool.endpoints().iter().any(|e| e == conn.endpoint()));

        // Every pooled endpoint reports a health snapshot.
        let status = pool.health_status().await;
        assert!(!status.is_empty());
    }
}