1use crate::client::{BitcoinClient, BitcoinNetwork, ReconnectConfig};
4use crate::error::{BitcoinError, Result};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tokio::sync::{RwLock, Semaphore};
8use tracing::{debug, warn};
9
10#[derive(Debug, Clone)]
12pub struct PoolConfig {
13 pub min_connections: usize,
15 pub max_connections: usize,
17 pub connection_timeout: Duration,
19 pub health_check_interval: Duration,
21 pub max_idle_time: Duration,
23}
24
25impl Default for PoolConfig {
26 fn default() -> Self {
27 Self {
28 min_connections: 2,
29 max_connections: 10,
30 connection_timeout: Duration::from_secs(30),
31 health_check_interval: Duration::from_secs(60),
32 max_idle_time: Duration::from_secs(300),
33 }
34 }
35}
36
37struct PooledConnection {
39 client: BitcoinClient,
40 last_used: Instant,
41 is_healthy: bool,
42}
43
44impl PooledConnection {
45 fn new(client: BitcoinClient) -> Self {
46 Self {
47 client,
48 last_used: Instant::now(),
49 is_healthy: true,
50 }
51 }
52
53 fn update_last_used(&mut self) {
54 self.last_used = Instant::now();
55 }
56
57 fn is_idle(&self, max_idle_time: Duration) -> bool {
58 self.last_used.elapsed() > max_idle_time
59 }
60
61 async fn health_check(&mut self) -> bool {
62 match self.client.health_check() {
63 Ok(healthy) => {
64 self.is_healthy = healthy;
65 healthy
66 }
67 Err(e) => {
68 warn!(error = %e, "Connection health check failed");
69 self.is_healthy = false;
70 false
71 }
72 }
73 }
74}
75
76pub struct ConnectionPool {
78 url: String,
79 user: String,
80 password: String,
81 network: BitcoinNetwork,
82 reconnect_config: ReconnectConfig,
83 config: PoolConfig,
84 connections: Arc<RwLock<Vec<PooledConnection>>>,
85 semaphore: Arc<Semaphore>,
86}
87
88impl ConnectionPool {
89 pub async fn new(
91 url: &str,
92 user: &str,
93 password: &str,
94 network: BitcoinNetwork,
95 ) -> Result<Self> {
96 Self::with_config(url, user, password, network, PoolConfig::default()).await
97 }
98
99 pub async fn with_config(
101 url: &str,
102 user: &str,
103 password: &str,
104 network: BitcoinNetwork,
105 config: PoolConfig,
106 ) -> Result<Self> {
107 let pool = Self {
108 url: url.to_string(),
109 user: user.to_string(),
110 password: password.to_string(),
111 network,
112 reconnect_config: ReconnectConfig::default(),
113 connections: Arc::new(RwLock::new(Vec::new())),
114 semaphore: Arc::new(Semaphore::new(config.max_connections)),
115 config,
116 };
117
118 pool.initialize_connections().await?;
120
121 pool.start_health_check_task();
123
124 Ok(pool)
125 }
126
127 async fn initialize_connections(&self) -> Result<()> {
129 let mut connections = self.connections.write().await;
130
131 for i in 0..self.config.min_connections {
132 match self.create_connection() {
133 Ok(client) => {
134 connections.push(PooledConnection::new(client));
135 debug!(connection = i + 1, "Initialized connection");
136 }
137 Err(e) => {
138 warn!(error = %e, connection = i + 1, "Failed to initialize connection");
139 if i == 0 {
140 return Err(e); }
142 }
143 }
144 }
145
146 Ok(())
147 }
148
149 fn create_connection(&self) -> Result<BitcoinClient> {
151 BitcoinClient::with_config(
152 &self.url,
153 &self.user,
154 &self.password,
155 self.network,
156 self.reconnect_config.clone(),
157 )
158 }
159
160 pub async fn get_connection(&self) -> Result<PooledConnectionGuard> {
162 let permit = tokio::time::timeout(
164 self.config.connection_timeout,
165 self.semaphore.clone().acquire_owned(),
166 )
167 .await
168 .map_err(|_| BitcoinError::ConnectionTimeout {
169 timeout_secs: self.config.connection_timeout.as_secs(),
170 })?
171 .map_err(|_| BitcoinError::ConnectionPoolExhausted)?;
172
173 let mut connections = self.connections.write().await;
174
175 if let Some(pos) = connections
177 .iter()
178 .position(|conn| conn.is_healthy && !conn.is_idle(self.config.max_idle_time))
179 {
180 let mut conn = connections.remove(pos);
181 conn.update_last_used();
182 debug!("Reusing existing connection");
183 return Ok(PooledConnectionGuard {
184 connection: Some(conn),
185 pool: self.connections.clone(),
186 _permit: permit,
187 });
188 }
189
190 if connections.len() < self.config.max_connections {
192 match self.create_connection() {
193 Ok(client) => {
194 debug!("Created new connection");
195 let conn = PooledConnection::new(client);
196 return Ok(PooledConnectionGuard {
197 connection: Some(conn),
198 pool: self.connections.clone(),
199 _permit: permit,
200 });
201 }
202 Err(e) => {
203 warn!(error = %e, "Failed to create new connection");
204 }
205 }
206 }
207
208 if let Some(mut conn) = connections.pop() {
210 conn.update_last_used();
211 debug!("Using idle connection");
212 return Ok(PooledConnectionGuard {
213 connection: Some(conn),
214 pool: self.connections.clone(),
215 _permit: permit,
216 });
217 }
218
219 Err(BitcoinError::ConnectionPoolExhausted)
220 }
221
222 fn start_health_check_task(&self) {
224 let connections = self.connections.clone();
225 let interval = self.config.health_check_interval;
226 let max_idle_time = self.config.max_idle_time;
227 let min_connections = self.config.min_connections;
228 let url = self.url.clone();
229 let user = self.user.clone();
230 let password = self.password.clone();
231 let network = self.network;
232 let reconnect_config = self.reconnect_config.clone();
233
234 tokio::spawn(async move {
235 loop {
236 tokio::time::sleep(interval).await;
237
238 let mut conns = connections.write().await;
239
240 let mut i = 0;
242 while i < conns.len() {
243 if conns.len() > min_connections && conns[i].is_idle(max_idle_time) {
244 conns.remove(i);
245 } else {
246 i += 1;
247 }
248 }
249
250 for conn in conns.iter_mut() {
252 conn.health_check().await;
253 }
254
255 conns.retain(|conn| conn.is_healthy);
257
258 while conns.len() < min_connections {
260 match BitcoinClient::with_config(
261 &url,
262 &user,
263 &password,
264 network,
265 reconnect_config.clone(),
266 ) {
267 Ok(client) => {
268 conns.push(PooledConnection::new(client));
269 debug!("Added connection to maintain minimum");
270 }
271 Err(e) => {
272 warn!(error = %e, "Failed to create connection during health check");
273 break;
274 }
275 }
276 }
277
278 debug!(
279 active_connections = conns.len(),
280 "Connection pool health check completed"
281 );
282 }
283 });
284 }
285
286 pub async fn stats(&self) -> PoolStats {
288 let connections = self.connections.read().await;
289 let healthy_count = connections.iter().filter(|c| c.is_healthy).count();
290
291 PoolStats {
292 total_connections: connections.len(),
293 healthy_connections: healthy_count,
294 max_connections: self.config.max_connections,
295 available_permits: self.semaphore.available_permits(),
296 }
297 }
298}
299
300pub struct PooledConnectionGuard {
302 connection: Option<PooledConnection>,
303 pool: Arc<RwLock<Vec<PooledConnection>>>,
304 _permit: tokio::sync::OwnedSemaphorePermit,
305}
306
307impl PooledConnectionGuard {
308 pub fn client(&self) -> &BitcoinClient {
310 &self.connection.as_ref().unwrap().client
311 }
312}
313
314impl Drop for PooledConnectionGuard {
315 fn drop(&mut self) {
316 if let Some(connection) = self.connection.take() {
317 let pool = self.pool.clone();
318 tokio::spawn(async move {
319 let mut conns = pool.write().await;
320 conns.push(connection);
321 });
322 }
323 }
324}
325
326#[derive(Debug, Clone)]
328pub struct PoolStats {
329 pub total_connections: usize,
330 pub healthy_connections: usize,
331 pub max_connections: usize,
332 pub available_permits: usize,
333}
334
335#[cfg(test)]
336mod tests {
337 use super::*;
338
339 #[test]
340 fn test_pool_config_defaults() {
341 let config = PoolConfig::default();
342 assert_eq!(config.min_connections, 2);
343 assert_eq!(config.max_connections, 10);
344 assert!(config.connection_timeout.as_secs() > 0);
345 }
346
347 #[test]
348 fn test_pooled_connection_idle_detection() {
349 let client = BitcoinClient::new(
350 "http://localhost:8332",
351 "user",
352 "pass",
353 BitcoinNetwork::Regtest,
354 )
355 .unwrap();
356
357 let conn = PooledConnection::new(client);
358 assert!(!conn.is_idle(Duration::from_secs(1)));
359
360 std::thread::sleep(Duration::from_millis(100));
361 assert!(conn.is_idle(Duration::from_millis(50)));
362 }
363
364 #[test]
365 fn test_pool_stats() {
366 let stats = PoolStats {
367 total_connections: 5,
368 healthy_connections: 5,
369 max_connections: 10,
370 available_permits: 5,
371 };
372
373 assert_eq!(stats.total_connections, 5);
374 assert_eq!(stats.healthy_connections, 5);
375 }
376}