1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use tokio::sync::{Mutex, Semaphore};
12
13use crate::client::ElectrumClient;
14use crate::error::{ElectrumError, Result};
15use crate::types::ClientConfig;
16
17struct PooledConnection {
19 client: ElectrumClient,
20 created_at: Instant,
21 last_used: Instant,
22 use_count: usize,
23}
24
25impl PooledConnection {
26 fn new(client: ElectrumClient) -> Self {
27 let now = Instant::now();
28 Self {
29 client,
30 created_at: now,
31 last_used: now,
32 use_count: 0,
33 }
34 }
35
36 fn touch(&mut self) {
37 self.last_used = Instant::now();
38 self.use_count += 1;
39 }
40
41 fn age(&self) -> Duration {
42 self.created_at.elapsed()
43 }
44
45 fn idle_time(&self) -> Duration {
46 self.last_used.elapsed()
47 }
48}
49
50#[derive(Debug, Clone)]
52pub struct PoolConfig {
53 pub min_connections: usize,
55 pub max_connections: usize,
57 pub idle_timeout: Duration,
59 pub max_age: Duration,
61 pub acquire_timeout: Duration,
63 pub validate_on_acquire: bool,
65}
66
67impl Default for PoolConfig {
68 fn default() -> Self {
69 Self {
70 min_connections: 1,
71 max_connections: 10,
72 idle_timeout: Duration::from_secs(300),
73 max_age: Duration::from_secs(3600),
74 acquire_timeout: Duration::from_secs(30),
75 validate_on_acquire: true,
76 }
77 }
78}
79
80impl PoolConfig {
81 pub fn new() -> Self {
83 Self::default()
84 }
85
86 pub fn min_connections(mut self, min: usize) -> Self {
88 self.min_connections = min;
89 self
90 }
91
92 pub fn max_connections(mut self, max: usize) -> Self {
94 self.max_connections = max;
95 self
96 }
97
98 pub fn idle_timeout(mut self, timeout: Duration) -> Self {
100 self.idle_timeout = timeout;
101 self
102 }
103
104 pub fn max_age(mut self, age: Duration) -> Self {
106 self.max_age = age;
107 self
108 }
109
110 pub fn acquire_timeout(mut self, timeout: Duration) -> Self {
112 self.acquire_timeout = timeout;
113 self
114 }
115
116 pub fn validate_on_acquire(mut self, validate: bool) -> Self {
118 self.validate_on_acquire = validate;
119 self
120 }
121}
122
123pub struct ConnectionPool {
125 config: PoolConfig,
126 client_config: ClientConfig,
127 connections: Mutex<VecDeque<PooledConnection>>,
128 semaphore: Arc<Semaphore>,
129 active_count: AtomicUsize,
130 total_created: AtomicUsize,
131}
132
133impl ConnectionPool {
134 pub fn new(client_config: ClientConfig, pool_config: PoolConfig) -> Self {
136 let semaphore = Arc::new(Semaphore::new(pool_config.max_connections));
137
138 Self {
139 config: pool_config,
140 client_config,
141 connections: Mutex::new(VecDeque::new()),
142 semaphore,
143 active_count: AtomicUsize::new(0),
144 total_created: AtomicUsize::new(0),
145 }
146 }
147
148 pub fn with_defaults(client_config: ClientConfig) -> Self {
150 Self::new(client_config, PoolConfig::default())
151 }
152
153 pub async fn initialize(&self) -> Result<()> {
155 let mut conns = self.connections.lock().await;
156
157 while conns.len() < self.config.min_connections {
158 let client = ElectrumClient::with_config(self.client_config.clone()).await?;
159 conns.push_back(PooledConnection::new(client));
160 self.total_created.fetch_add(1, Ordering::SeqCst);
161 }
162
163 Ok(())
164 }
165
166 pub async fn acquire(&self) -> Result<PooledClient<'_>> {
168 let permit = tokio::time::timeout(
170 self.config.acquire_timeout,
171 self.semaphore.clone().acquire_owned(),
172 )
173 .await
174 .map_err(|_| ElectrumError::Timeout)?
175 .map_err(|_| ElectrumError::ConnectionFailed("Pool closed".into()))?;
176
177 let mut conns = self.connections.lock().await;
179
180 while let Some(mut conn) = conns.pop_front() {
181 if conn.age() > self.config.max_age {
183 continue; }
185
186 if conn.idle_time() > self.config.idle_timeout {
187 continue; }
189
190 if self.config.validate_on_acquire {
192 drop(conns); if conn.client.ping().await.is_ok() {
195 conn.touch();
196 self.active_count.fetch_add(1, Ordering::SeqCst);
197 return Ok(PooledClient {
198 connection: Some(conn),
199 pool: self,
200 _permit: permit,
201 });
202 }
203
204 conns = self.connections.lock().await;
205 continue; }
207
208 conn.touch();
209 self.active_count.fetch_add(1, Ordering::SeqCst);
210 return Ok(PooledClient {
211 connection: Some(conn),
212 pool: self,
213 _permit: permit,
214 });
215 }
216
217 drop(conns);
218
219 let client = ElectrumClient::with_config(self.client_config.clone()).await?;
221 let mut conn = PooledConnection::new(client);
222 conn.touch();
223
224 self.total_created.fetch_add(1, Ordering::SeqCst);
225 self.active_count.fetch_add(1, Ordering::SeqCst);
226
227 Ok(PooledClient {
228 connection: Some(conn),
229 pool: self,
230 _permit: permit,
231 })
232 }
233
234 #[allow(dead_code)]
236 async fn release(&self, conn: PooledConnection) {
237 self.active_count.fetch_sub(1, Ordering::SeqCst);
238
239 if conn.age() > self.config.max_age {
241 return; }
243
244 let mut conns = self.connections.lock().await;
245
246 if conns.len() < self.config.max_connections {
248 conns.push_back(conn);
249 }
250 }
251
252 pub async fn stats(&self) -> PoolStats {
254 let conns = self.connections.lock().await;
255
256 PoolStats {
257 idle_connections: conns.len(),
258 active_connections: self.active_count.load(Ordering::SeqCst),
259 total_created: self.total_created.load(Ordering::SeqCst),
260 max_connections: self.config.max_connections,
261 }
262 }
263
264 pub async fn close(&self) {
266 let mut conns = self.connections.lock().await;
267 conns.clear();
268 }
269
270 pub async fn cleanup(&self) {
272 let mut conns = self.connections.lock().await;
273
274 conns.retain(|conn| {
275 conn.idle_time() <= self.config.idle_timeout &&
276 conn.age() <= self.config.max_age
277 });
278
279 }
282}
283
284#[derive(Debug, Clone)]
286pub struct PoolStats {
287 pub idle_connections: usize,
289 pub active_connections: usize,
291 pub total_created: usize,
293 pub max_connections: usize,
295}
296
297impl PoolStats {
298 pub fn total_connections(&self) -> usize {
300 self.idle_connections + self.active_connections
301 }
302
303 pub fn utilization(&self) -> f64 {
305 if self.max_connections == 0 {
306 0.0
307 } else {
308 (self.active_connections as f64 / self.max_connections as f64) * 100.0
309 }
310 }
311}
312
313pub struct PooledClient<'a> {
317 connection: Option<PooledConnection>,
318 pool: &'a ConnectionPool,
319 _permit: tokio::sync::OwnedSemaphorePermit,
320}
321
322impl<'a> PooledClient<'a> {
323 pub fn client(&self) -> &ElectrumClient {
325 &self.connection.as_ref().unwrap().client
326 }
327
328 pub fn use_count(&self) -> usize {
330 self.connection.as_ref().unwrap().use_count
331 }
332
333 pub fn age(&self) -> Duration {
335 self.connection.as_ref().unwrap().age()
336 }
337}
338
339impl<'a> std::ops::Deref for PooledClient<'a> {
340 type Target = ElectrumClient;
341
342 fn deref(&self) -> &Self::Target {
343 self.client()
344 }
345}
346
347impl<'a> Drop for PooledClient<'a> {
348 fn drop(&mut self) {
349 self.connection.take();
354 self.pool.active_count.fetch_sub(1, Ordering::SeqCst);
355 }
356}
357
358#[cfg(test)]
359mod tests {
360 use super::*;
361
362 #[test]
363 fn test_pool_config_default() {
364 let config = PoolConfig::default();
365 assert_eq!(config.min_connections, 1);
366 assert_eq!(config.max_connections, 10);
367 }
368
369 #[test]
370 fn test_pool_config_builder() {
371 let config = PoolConfig::new()
372 .min_connections(2)
373 .max_connections(20)
374 .idle_timeout(Duration::from_secs(60));
375
376 assert_eq!(config.min_connections, 2);
377 assert_eq!(config.max_connections, 20);
378 assert_eq!(config.idle_timeout, Duration::from_secs(60));
379 }
380
381 #[test]
382 fn test_pool_stats() {
383 let stats = PoolStats {
384 idle_connections: 5,
385 active_connections: 3,
386 total_created: 10,
387 max_connections: 10,
388 };
389
390 assert_eq!(stats.total_connections(), 8);
391 assert_eq!(stats.utilization(), 30.0);
392 }
393}