1use crate::{Error, EvmAdapter};
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::RwLock;
14
15#[derive(Debug, Clone)]
17pub struct EndpointHealth {
18 pub is_healthy: bool,
20 pub last_success: Option<Instant>,
22 pub last_failure: Option<Instant>,
24 pub failure_count: u32,
26 pub avg_response_time_ms: u64,
28}
29
30impl Default for EndpointHealth {
31 fn default() -> Self {
32 Self {
33 is_healthy: true,
34 last_success: None,
35 last_failure: None,
36 failure_count: 0,
37 avg_response_time_ms: 0,
38 }
39 }
40}
41
42pub struct PooledConnection {
44 adapter: Arc<EvmAdapter>,
45 endpoint: String,
46 health: Arc<RwLock<EndpointHealth>>,
47}
48
49impl PooledConnection {
50 pub fn adapter(&self) -> &EvmAdapter {
52 &self.adapter
53 }
54
55 pub fn endpoint(&self) -> &str {
57 &self.endpoint
58 }
59
60 pub async fn health(&self) -> EndpointHealth {
62 self.health.read().await.clone()
63 }
64
65 pub async fn mark_healthy(&self, response_time_ms: u64) {
67 let mut health = self.health.write().await;
68 health.is_healthy = true;
69 health.last_success = Some(Instant::now());
70 health.failure_count = 0;
71
72 if health.avg_response_time_ms == 0 {
74 health.avg_response_time_ms = response_time_ms;
75 } else {
76 health.avg_response_time_ms = (health.avg_response_time_ms * 9 + response_time_ms) / 10;
77 }
78 }
79
80 pub async fn mark_unhealthy(&self) {
82 let mut health = self.health.write().await;
83 health.last_failure = Some(Instant::now());
84 health.failure_count += 1;
85
86 if health.failure_count >= 3 {
88 health.is_healthy = false;
89 tracing::warn!("Endpoint {} marked as unhealthy", self.endpoint);
90 }
91 }
92}
93
94#[derive(Debug, Clone)]
96pub struct PoolConfig {
97 pub max_connections_per_endpoint: usize,
99 pub health_check_interval_secs: u64,
101 pub health_check_timeout_secs: u64,
103 pub max_failures: u32,
105 pub unhealthy_retry_delay_secs: u64,
107}
108
109impl Default for PoolConfig {
110 fn default() -> Self {
111 Self {
112 max_connections_per_endpoint: 10,
113 health_check_interval_secs: 30,
114 health_check_timeout_secs: 5,
115 max_failures: 3,
116 unhealthy_retry_delay_secs: 60,
117 }
118 }
119}
120
121pub struct ConnectionPool {
123 endpoints: Vec<String>,
124 connections: Arc<RwLock<Vec<PooledConnection>>>,
125 next_index: AtomicUsize,
126 config: PoolConfig,
127}
128
129impl ConnectionPool {
130 pub async fn new(endpoints: Vec<String>) -> Result<Self, Error> {
132 Self::with_config(endpoints, PoolConfig::default()).await
133 }
134
135 pub async fn with_config(endpoints: Vec<String>, config: PoolConfig) -> Result<Self, Error> {
137 if endpoints.is_empty() {
138 return Err(Error::Connection("No endpoints provided".to_string()));
139 }
140
141 tracing::info!(
142 "Creating connection pool with {} endpoints",
143 endpoints.len()
144 );
145
146 let mut connections = Vec::new();
147
148 for endpoint in &endpoints {
150 match EvmAdapter::connect(endpoint).await {
151 Ok(adapter) => {
152 let conn = PooledConnection {
153 adapter: Arc::new(adapter),
154 endpoint: endpoint.clone(),
155 health: Arc::new(RwLock::new(EndpointHealth::default())),
156 };
157 connections.push(conn);
158 tracing::info!("Successfully connected to endpoint: {}", endpoint);
159 }
160 Err(e) => {
161 tracing::warn!("Failed to connect to endpoint {}: {}", endpoint, e);
162 let adapter = EvmAdapter::connect(endpoint).await?;
164 let health = EndpointHealth {
165 is_healthy: false,
166 failure_count: 1,
167 ..Default::default()
168 };
169
170 let conn = PooledConnection {
171 adapter: Arc::new(adapter),
172 endpoint: endpoint.clone(),
173 health: Arc::new(RwLock::new(health)),
174 };
175 connections.push(conn);
176 }
177 }
178 }
179
180 Ok(Self {
181 endpoints,
182 connections: Arc::new(RwLock::new(connections)),
183 next_index: AtomicUsize::new(0),
184 config,
185 })
186 }
187
188 pub async fn get_connection(&self) -> Result<Arc<PooledConnection>, Error> {
192 let connections = self.connections.read().await;
193
194 if connections.is_empty() {
195 return Err(Error::Connection("No connections available".to_string()));
196 }
197
198 let total = connections.len();
199 let mut attempts = 0;
200
201 while attempts < total {
203 let index = self.next_index.fetch_add(1, Ordering::Relaxed) % total;
204 let conn = &connections[index];
205
206 let health = conn.health.read().await;
207 if health.is_healthy {
208 drop(health);
209 return Ok(Arc::new(PooledConnection {
210 adapter: conn.adapter.clone(),
211 endpoint: conn.endpoint.clone(),
212 health: conn.health.clone(),
213 }));
214 }
215
216 if let Some(last_failure) = health.last_failure {
218 if last_failure.elapsed().as_secs() > self.config.unhealthy_retry_delay_secs {
219 drop(health);
220 tracing::info!("Retrying previously unhealthy endpoint: {}", conn.endpoint);
221 return Ok(Arc::new(PooledConnection {
222 adapter: conn.adapter.clone(),
223 endpoint: conn.endpoint.clone(),
224 health: conn.health.clone(),
225 }));
226 }
227 }
228
229 attempts += 1;
230 }
231
232 let conn = &connections[0];
234 tracing::warn!("All endpoints unhealthy, returning first endpoint");
235 Ok(Arc::new(PooledConnection {
236 adapter: conn.adapter.clone(),
237 endpoint: conn.endpoint.clone(),
238 health: conn.health.clone(),
239 }))
240 }
241
242 pub async fn health_status(&self) -> Vec<(String, EndpointHealth)> {
244 let connections = self.connections.read().await;
245 let mut status = Vec::new();
246
247 for conn in connections.iter() {
248 let health = conn.health.read().await.clone();
249 status.push((conn.endpoint.clone(), health));
250 }
251
252 status
253 }
254
255 pub async fn run_health_checks(&self) -> Result<(), Error> {
257 tracing::debug!("Running health checks on all endpoints");
258
259 let connections = self.connections.read().await;
260
261 for conn in connections.iter() {
262 let start = Instant::now();
263
264 match conn.adapter.provider().get_block_number().await {
266 Ok(_) => {
267 let elapsed = start.elapsed().as_millis() as u64;
268 conn.mark_healthy(elapsed).await;
269 tracing::debug!("Health check passed for {}: {}ms", conn.endpoint, elapsed);
270 }
271 Err(e) => {
272 conn.mark_unhealthy().await;
273 tracing::warn!("Health check failed for {}: {}", conn.endpoint, e);
274 }
275 }
276 }
277
278 Ok(())
279 }
280
281 pub fn start_health_checker(self: Arc<Self>) {
283 let pool = self.clone();
284 let interval = Duration::from_secs(self.config.health_check_interval_secs);
285 let interval_secs = self.config.health_check_interval_secs;
286
287 tokio::spawn(async move {
288 loop {
289 tokio::time::sleep(interval).await;
290
291 if let Err(e) = pool.run_health_checks().await {
292 tracing::error!("Health check error: {}", e);
293 }
294 }
295 });
296
297 tracing::info!("Started health checker with interval: {}s", interval_secs);
298 }
299
300 pub fn endpoint_count(&self) -> usize {
302 self.endpoints.len()
303 }
304
305 pub fn endpoints(&self) -> &[String] {
307 &self.endpoints
308 }
309}
310
311#[cfg(test)]
312mod tests {
313 use super::*;
314
315 #[test]
316 fn test_pool_config_default() {
317 let config = PoolConfig::default();
318 assert_eq!(config.max_connections_per_endpoint, 10);
319 assert_eq!(config.health_check_interval_secs, 30);
320 assert_eq!(config.max_failures, 3);
321 }
322
323 #[test]
324 fn test_endpoint_health_default() {
325 let health = EndpointHealth::default();
326 assert!(health.is_healthy);
327 assert_eq!(health.failure_count, 0);
328 }
329
330 #[tokio::test]
331 #[ignore] async fn test_connection_pool() {
333 let endpoints = vec![
334 "https://eth.llamarpc.com".to_string(),
335 "https://ethereum.publicnode.com".to_string(),
336 ];
337
338 let pool = ConnectionPool::new(endpoints).await.unwrap();
339 assert_eq!(pool.endpoint_count(), 2);
340
341 let conn = pool.get_connection().await.unwrap();
343 assert!(!conn.endpoint().is_empty());
344 }
345}