rustywallet_electrum/
pool.rs

1//! Connection pooling for Electrum clients.
2//!
3//! This module provides a connection pool that manages multiple
4//! Electrum client connections for improved performance and reliability.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use tokio::sync::{Mutex, Semaphore};
12
13use crate::client::ElectrumClient;
14use crate::error::{ElectrumError, Result};
15use crate::types::ClientConfig;
16
17/// A pooled connection wrapper.
18struct PooledConnection {
19    client: ElectrumClient,
20    created_at: Instant,
21    last_used: Instant,
22    use_count: usize,
23}
24
25impl PooledConnection {
26    fn new(client: ElectrumClient) -> Self {
27        let now = Instant::now();
28        Self {
29            client,
30            created_at: now,
31            last_used: now,
32            use_count: 0,
33        }
34    }
35
36    fn touch(&mut self) {
37        self.last_used = Instant::now();
38        self.use_count += 1;
39    }
40
41    fn age(&self) -> Duration {
42        self.created_at.elapsed()
43    }
44
45    fn idle_time(&self) -> Duration {
46        self.last_used.elapsed()
47    }
48}
49
/// Tunable limits and timeouts for a connection pool.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Lower bound on the number of connections the pool tries to keep ready.
    pub min_connections: usize,
    /// Upper bound on the number of connections the pool may hand out.
    pub max_connections: usize,
    /// How long a connection may sit unused before it is considered stale.
    pub idle_timeout: Duration,
    /// How long a connection may live in total before it is recycled.
    pub max_age: Duration,
    /// How long a caller waits for a connection before giving up.
    pub acquire_timeout: Duration,
    /// When `true`, a pooled connection is checked before it is handed out.
    pub validate_on_acquire: bool,
}
66
67impl Default for PoolConfig {
68    fn default() -> Self {
69        Self {
70            min_connections: 1,
71            max_connections: 10,
72            idle_timeout: Duration::from_secs(300),
73            max_age: Duration::from_secs(3600),
74            acquire_timeout: Duration::from_secs(30),
75            validate_on_acquire: true,
76        }
77    }
78}
79
80impl PoolConfig {
81    /// Create a new pool configuration.
82    pub fn new() -> Self {
83        Self::default()
84    }
85
86    /// Set minimum connections.
87    pub fn min_connections(mut self, min: usize) -> Self {
88        self.min_connections = min;
89        self
90    }
91
92    /// Set maximum connections.
93    pub fn max_connections(mut self, max: usize) -> Self {
94        self.max_connections = max;
95        self
96    }
97
98    /// Set idle timeout.
99    pub fn idle_timeout(mut self, timeout: Duration) -> Self {
100        self.idle_timeout = timeout;
101        self
102    }
103
104    /// Set maximum connection age.
105    pub fn max_age(mut self, age: Duration) -> Self {
106        self.max_age = age;
107        self
108    }
109
110    /// Set acquire timeout.
111    pub fn acquire_timeout(mut self, timeout: Duration) -> Self {
112        self.acquire_timeout = timeout;
113        self
114    }
115
116    /// Set whether to validate on acquire.
117    pub fn validate_on_acquire(mut self, validate: bool) -> Self {
118        self.validate_on_acquire = validate;
119        self
120    }
121}
122
123/// Connection pool for Electrum clients.
124pub struct ConnectionPool {
125    config: PoolConfig,
126    client_config: ClientConfig,
127    connections: Mutex<VecDeque<PooledConnection>>,
128    semaphore: Arc<Semaphore>,
129    active_count: AtomicUsize,
130    total_created: AtomicUsize,
131}
132
133impl ConnectionPool {
134    /// Create a new connection pool.
135    pub fn new(client_config: ClientConfig, pool_config: PoolConfig) -> Self {
136        let semaphore = Arc::new(Semaphore::new(pool_config.max_connections));
137        
138        Self {
139            config: pool_config,
140            client_config,
141            connections: Mutex::new(VecDeque::new()),
142            semaphore,
143            active_count: AtomicUsize::new(0),
144            total_created: AtomicUsize::new(0),
145        }
146    }
147
148    /// Create a pool with default configuration.
149    pub fn with_defaults(client_config: ClientConfig) -> Self {
150        Self::new(client_config, PoolConfig::default())
151    }
152
153    /// Initialize the pool with minimum connections.
154    pub async fn initialize(&self) -> Result<()> {
155        let mut conns = self.connections.lock().await;
156        
157        while conns.len() < self.config.min_connections {
158            let client = ElectrumClient::with_config(self.client_config.clone()).await?;
159            conns.push_back(PooledConnection::new(client));
160            self.total_created.fetch_add(1, Ordering::SeqCst);
161        }
162        
163        Ok(())
164    }
165
166    /// Acquire a connection from the pool.
167    pub async fn acquire(&self) -> Result<PooledClient<'_>> {
168        // Try to acquire a permit
169        let permit = tokio::time::timeout(
170            self.config.acquire_timeout,
171            self.semaphore.clone().acquire_owned(),
172        )
173        .await
174        .map_err(|_| ElectrumError::Timeout)?
175        .map_err(|_| ElectrumError::ConnectionFailed("Pool closed".into()))?;
176
177        // Try to get an existing connection
178        let mut conns = self.connections.lock().await;
179        
180        while let Some(mut conn) = conns.pop_front() {
181            // Check if connection is still valid
182            if conn.age() > self.config.max_age {
183                continue; // Connection too old, discard
184            }
185            
186            if conn.idle_time() > self.config.idle_timeout {
187                continue; // Connection idle too long, discard
188            }
189
190            // Validate if configured
191            if self.config.validate_on_acquire {
192                drop(conns); // Release lock during validation
193                
194                if conn.client.ping().await.is_ok() {
195                    conn.touch();
196                    self.active_count.fetch_add(1, Ordering::SeqCst);
197                    return Ok(PooledClient {
198                        connection: Some(conn),
199                        pool: self,
200                        _permit: permit,
201                    });
202                }
203                
204                conns = self.connections.lock().await;
205                continue; // Validation failed, try next
206            }
207
208            conn.touch();
209            self.active_count.fetch_add(1, Ordering::SeqCst);
210            return Ok(PooledClient {
211                connection: Some(conn),
212                pool: self,
213                _permit: permit,
214            });
215        }
216        
217        drop(conns);
218
219        // No available connections, create a new one
220        let client = ElectrumClient::with_config(self.client_config.clone()).await?;
221        let mut conn = PooledConnection::new(client);
222        conn.touch();
223        
224        self.total_created.fetch_add(1, Ordering::SeqCst);
225        self.active_count.fetch_add(1, Ordering::SeqCst);
226        
227        Ok(PooledClient {
228            connection: Some(conn),
229            pool: self,
230            _permit: permit,
231        })
232    }
233
234    /// Return a connection to the pool.
235    #[allow(dead_code)]
236    async fn release(&self, conn: PooledConnection) {
237        self.active_count.fetch_sub(1, Ordering::SeqCst);
238        
239        // Check if connection should be kept
240        if conn.age() > self.config.max_age {
241            return; // Too old, discard
242        }
243
244        let mut conns = self.connections.lock().await;
245        
246        // Only keep if under max
247        if conns.len() < self.config.max_connections {
248            conns.push_back(conn);
249        }
250    }
251
252    /// Get pool statistics.
253    pub async fn stats(&self) -> PoolStats {
254        let conns = self.connections.lock().await;
255        
256        PoolStats {
257            idle_connections: conns.len(),
258            active_connections: self.active_count.load(Ordering::SeqCst),
259            total_created: self.total_created.load(Ordering::SeqCst),
260            max_connections: self.config.max_connections,
261        }
262    }
263
264    /// Close all connections and reset the pool.
265    pub async fn close(&self) {
266        let mut conns = self.connections.lock().await;
267        conns.clear();
268    }
269
270    /// Remove idle connections that exceed the timeout.
271    pub async fn cleanup(&self) {
272        let mut conns = self.connections.lock().await;
273        
274        conns.retain(|conn| {
275            conn.idle_time() <= self.config.idle_timeout && 
276            conn.age() <= self.config.max_age
277        });
278
279        // Ensure minimum connections
280        // Note: This doesn't create new connections, just retains existing ones
281    }
282}
283
/// A point-in-time snapshot of pool counters.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Connections sitting unused in the pool.
    pub idle_connections: usize,
    /// Connections currently checked out.
    pub active_connections: usize,
    /// Connections opened over the pool's lifetime.
    pub total_created: usize,
    /// Configured ceiling on connections.
    pub max_connections: usize,
}
296
297impl PoolStats {
298    /// Get total connections (idle + active).
299    pub fn total_connections(&self) -> usize {
300        self.idle_connections + self.active_connections
301    }
302
303    /// Get pool utilization as a percentage.
304    pub fn utilization(&self) -> f64 {
305        if self.max_connections == 0 {
306            0.0
307        } else {
308            (self.active_connections as f64 / self.max_connections as f64) * 100.0
309        }
310    }
311}
312
313/// A client borrowed from the connection pool.
314///
315/// The connection is automatically returned to the pool when dropped.
316pub struct PooledClient<'a> {
317    connection: Option<PooledConnection>,
318    pool: &'a ConnectionPool,
319    _permit: tokio::sync::OwnedSemaphorePermit,
320}
321
322impl<'a> PooledClient<'a> {
323    /// Get a reference to the underlying client.
324    pub fn client(&self) -> &ElectrumClient {
325        &self.connection.as_ref().unwrap().client
326    }
327
328    /// Get the connection's use count.
329    pub fn use_count(&self) -> usize {
330        self.connection.as_ref().unwrap().use_count
331    }
332
333    /// Get the connection's age.
334    pub fn age(&self) -> Duration {
335        self.connection.as_ref().unwrap().age()
336    }
337}
338
339impl<'a> std::ops::Deref for PooledClient<'a> {
340    type Target = ElectrumClient;
341
342    fn deref(&self) -> &Self::Target {
343        self.client()
344    }
345}
346
347impl<'a> Drop for PooledClient<'a> {
348    fn drop(&mut self) {
349        // Note: We can't spawn async task here due to lifetime constraints.
350        // The connection will be dropped. For proper pooling, use the pool's
351        // release method explicitly before dropping, or use a different pattern.
352        // This is a limitation of the current design.
353        self.connection.take();
354        self.pool.active_count.fetch_sub(1, Ordering::SeqCst);
355    }
356}
357
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration keeps between one and ten connections.
    #[test]
    fn test_pool_config_default() {
        let PoolConfig {
            min_connections,
            max_connections,
            ..
        } = PoolConfig::default();

        assert_eq!(min_connections, 1);
        assert_eq!(max_connections, 10);
    }

    /// Builder setters overwrite exactly the fields they are named after.
    #[test]
    fn test_pool_config_builder() {
        let one_minute = Duration::from_secs(60);

        let config = PoolConfig::new()
            .min_connections(2)
            .max_connections(20)
            .idle_timeout(one_minute);

        assert_eq!(config.min_connections, 2);
        assert_eq!(config.max_connections, 20);
        assert_eq!(config.idle_timeout, Duration::from_secs(60));
    }

    /// Derived statistics: total is idle + active, utilization is active / max.
    #[test]
    fn test_pool_stats() {
        let snapshot = PoolStats {
            idle_connections: 5,
            active_connections: 3,
            total_created: 10,
            max_connections: 10,
        };

        assert_eq!(snapshot.total_connections(), 8);
        assert_eq!(snapshot.utilization(), 30.0);
    }
}
393}