kaccy_bitcoin/
connection_pool.rs

1//! Bitcoin RPC connection pooling for high concurrency
2
3use crate::client::{BitcoinClient, BitcoinNetwork, ReconnectConfig};
4use crate::error::{BitcoinError, Result};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tokio::sync::{RwLock, Semaphore};
8use tracing::{debug, warn};
9
10/// Configuration for the connection pool
11#[derive(Debug, Clone)]
12pub struct PoolConfig {
13    /// Minimum number of connections to maintain
14    pub min_connections: usize,
15    /// Maximum number of connections allowed
16    pub max_connections: usize,
17    /// Maximum time to wait for an available connection
18    pub connection_timeout: Duration,
19    /// Health check interval for idle connections
20    pub health_check_interval: Duration,
21    /// Maximum idle time before closing a connection
22    pub max_idle_time: Duration,
23}
24
25impl Default for PoolConfig {
26    fn default() -> Self {
27        Self {
28            min_connections: 2,
29            max_connections: 10,
30            connection_timeout: Duration::from_secs(30),
31            health_check_interval: Duration::from_secs(60),
32            max_idle_time: Duration::from_secs(300),
33        }
34    }
35}
36
37/// A pooled Bitcoin RPC connection
38struct PooledConnection {
39    client: BitcoinClient,
40    last_used: Instant,
41    is_healthy: bool,
42}
43
44impl PooledConnection {
45    fn new(client: BitcoinClient) -> Self {
46        Self {
47            client,
48            last_used: Instant::now(),
49            is_healthy: true,
50        }
51    }
52
53    fn update_last_used(&mut self) {
54        self.last_used = Instant::now();
55    }
56
57    fn is_idle(&self, max_idle_time: Duration) -> bool {
58        self.last_used.elapsed() > max_idle_time
59    }
60
61    async fn health_check(&mut self) -> bool {
62        match self.client.health_check() {
63            Ok(healthy) => {
64                self.is_healthy = healthy;
65                healthy
66            }
67            Err(e) => {
68                warn!(error = %e, "Connection health check failed");
69                self.is_healthy = false;
70                false
71            }
72        }
73    }
74}
75
76/// Bitcoin RPC connection pool
77pub struct ConnectionPool {
78    url: String,
79    user: String,
80    password: String,
81    network: BitcoinNetwork,
82    reconnect_config: ReconnectConfig,
83    config: PoolConfig,
84    connections: Arc<RwLock<Vec<PooledConnection>>>,
85    semaphore: Arc<Semaphore>,
86}
87
88impl ConnectionPool {
89    /// Create a new connection pool
90    pub async fn new(
91        url: &str,
92        user: &str,
93        password: &str,
94        network: BitcoinNetwork,
95    ) -> Result<Self> {
96        Self::with_config(url, user, password, network, PoolConfig::default()).await
97    }
98
99    /// Create a new connection pool with custom configuration
100    pub async fn with_config(
101        url: &str,
102        user: &str,
103        password: &str,
104        network: BitcoinNetwork,
105        config: PoolConfig,
106    ) -> Result<Self> {
107        let pool = Self {
108            url: url.to_string(),
109            user: user.to_string(),
110            password: password.to_string(),
111            network,
112            reconnect_config: ReconnectConfig::default(),
113            connections: Arc::new(RwLock::new(Vec::new())),
114            semaphore: Arc::new(Semaphore::new(config.max_connections)),
115            config,
116        };
117
118        // Initialize minimum connections
119        pool.initialize_connections().await?;
120
121        // Start background health check task
122        pool.start_health_check_task();
123
124        Ok(pool)
125    }
126
127    /// Initialize minimum number of connections
128    async fn initialize_connections(&self) -> Result<()> {
129        let mut connections = self.connections.write().await;
130
131        for i in 0..self.config.min_connections {
132            match self.create_connection() {
133                Ok(client) => {
134                    connections.push(PooledConnection::new(client));
135                    debug!(connection = i + 1, "Initialized connection");
136                }
137                Err(e) => {
138                    warn!(error = %e, connection = i + 1, "Failed to initialize connection");
139                    if i == 0 {
140                        return Err(e); // Fail if we can't create even one connection
141                    }
142                }
143            }
144        }
145
146        Ok(())
147    }
148
149    /// Create a new connection
150    fn create_connection(&self) -> Result<BitcoinClient> {
151        BitcoinClient::with_config(
152            &self.url,
153            &self.user,
154            &self.password,
155            self.network,
156            self.reconnect_config.clone(),
157        )
158    }
159
160    /// Get a connection from the pool
161    pub async fn get_connection(&self) -> Result<PooledConnectionGuard> {
162        // Acquire semaphore permit
163        let permit = tokio::time::timeout(
164            self.config.connection_timeout,
165            self.semaphore.clone().acquire_owned(),
166        )
167        .await
168        .map_err(|_| BitcoinError::ConnectionTimeout {
169            timeout_secs: self.config.connection_timeout.as_secs(),
170        })?
171        .map_err(|_| BitcoinError::ConnectionPoolExhausted)?;
172
173        let mut connections = self.connections.write().await;
174
175        // Try to find a healthy, idle connection
176        if let Some(pos) = connections
177            .iter()
178            .position(|conn| conn.is_healthy && !conn.is_idle(self.config.max_idle_time))
179        {
180            let mut conn = connections.remove(pos);
181            conn.update_last_used();
182            debug!("Reusing existing connection");
183            return Ok(PooledConnectionGuard {
184                connection: Some(conn),
185                pool: self.connections.clone(),
186                _permit: permit,
187            });
188        }
189
190        // Try to create a new connection if under max
191        if connections.len() < self.config.max_connections {
192            match self.create_connection() {
193                Ok(client) => {
194                    debug!("Created new connection");
195                    let conn = PooledConnection::new(client);
196                    return Ok(PooledConnectionGuard {
197                        connection: Some(conn),
198                        pool: self.connections.clone(),
199                        _permit: permit,
200                    });
201                }
202                Err(e) => {
203                    warn!(error = %e, "Failed to create new connection");
204                }
205            }
206        }
207
208        // If all else fails, try to use any available connection (even if idle)
209        if let Some(mut conn) = connections.pop() {
210            conn.update_last_used();
211            debug!("Using idle connection");
212            return Ok(PooledConnectionGuard {
213                connection: Some(conn),
214                pool: self.connections.clone(),
215                _permit: permit,
216            });
217        }
218
219        Err(BitcoinError::ConnectionPoolExhausted)
220    }
221
222    /// Start background task for health checks
223    fn start_health_check_task(&self) {
224        let connections = self.connections.clone();
225        let interval = self.config.health_check_interval;
226        let max_idle_time = self.config.max_idle_time;
227        let min_connections = self.config.min_connections;
228        let url = self.url.clone();
229        let user = self.user.clone();
230        let password = self.password.clone();
231        let network = self.network;
232        let reconnect_config = self.reconnect_config.clone();
233
234        tokio::spawn(async move {
235            loop {
236                tokio::time::sleep(interval).await;
237
238                let mut conns = connections.write().await;
239
240                // Remove idle connections (but keep minimum)
241                let mut i = 0;
242                while i < conns.len() {
243                    if conns.len() > min_connections && conns[i].is_idle(max_idle_time) {
244                        conns.remove(i);
245                    } else {
246                        i += 1;
247                    }
248                }
249
250                // Health check remaining connections
251                for conn in conns.iter_mut() {
252                    conn.health_check().await;
253                }
254
255                // Remove unhealthy connections
256                conns.retain(|conn| conn.is_healthy);
257
258                // Ensure minimum connections
259                while conns.len() < min_connections {
260                    match BitcoinClient::with_config(
261                        &url,
262                        &user,
263                        &password,
264                        network,
265                        reconnect_config.clone(),
266                    ) {
267                        Ok(client) => {
268                            conns.push(PooledConnection::new(client));
269                            debug!("Added connection to maintain minimum");
270                        }
271                        Err(e) => {
272                            warn!(error = %e, "Failed to create connection during health check");
273                            break;
274                        }
275                    }
276                }
277
278                debug!(
279                    active_connections = conns.len(),
280                    "Connection pool health check completed"
281                );
282            }
283        });
284    }
285
286    /// Get pool statistics
287    pub async fn stats(&self) -> PoolStats {
288        let connections = self.connections.read().await;
289        let healthy_count = connections.iter().filter(|c| c.is_healthy).count();
290
291        PoolStats {
292            total_connections: connections.len(),
293            healthy_connections: healthy_count,
294            max_connections: self.config.max_connections,
295            available_permits: self.semaphore.available_permits(),
296        }
297    }
298}
299
300/// RAII guard for pooled connections
301pub struct PooledConnectionGuard {
302    connection: Option<PooledConnection>,
303    pool: Arc<RwLock<Vec<PooledConnection>>>,
304    _permit: tokio::sync::OwnedSemaphorePermit,
305}
306
307impl PooledConnectionGuard {
308    /// Get a reference to the Bitcoin client
309    pub fn client(&self) -> &BitcoinClient {
310        &self.connection.as_ref().unwrap().client
311    }
312}
313
314impl Drop for PooledConnectionGuard {
315    fn drop(&mut self) {
316        if let Some(connection) = self.connection.take() {
317            let pool = self.pool.clone();
318            tokio::spawn(async move {
319                let mut conns = pool.write().await;
320                conns.push(connection);
321            });
322        }
323    }
324}
325
326/// Connection pool statistics
327#[derive(Debug, Clone)]
328pub struct PoolStats {
329    pub total_connections: usize,
330    pub healthy_connections: usize,
331    pub max_connections: usize,
332    pub available_permits: usize,
333}
334
335#[cfg(test)]
336mod tests {
337    use super::*;
338
339    #[test]
340    fn test_pool_config_defaults() {
341        let config = PoolConfig::default();
342        assert_eq!(config.min_connections, 2);
343        assert_eq!(config.max_connections, 10);
344        assert!(config.connection_timeout.as_secs() > 0);
345    }
346
347    #[test]
348    fn test_pooled_connection_idle_detection() {
349        let client = BitcoinClient::new(
350            "http://localhost:8332",
351            "user",
352            "pass",
353            BitcoinNetwork::Regtest,
354        )
355        .unwrap();
356
357        let conn = PooledConnection::new(client);
358        assert!(!conn.is_idle(Duration::from_secs(1)));
359
360        std::thread::sleep(Duration::from_millis(100));
361        assert!(conn.is_idle(Duration::from_millis(50)));
362    }
363
364    #[test]
365    fn test_pool_stats() {
366        let stats = PoolStats {
367            total_connections: 5,
368            healthy_connections: 5,
369            max_connections: 10,
370            available_permits: 5,
371        };
372
373        assert_eq!(stats.total_connections, 5);
374        assert_eq!(stats.healthy_connections, 5);
375    }
376}