Skip to main content

oxigdal_websocket/server/
pool.rs

1//! Connection pooling for WebSocket server
2
3use crate::error::Result;
4use crate::server::connection::{Connection, ConnectionId};
5use parking_lot::RwLock;
6use std::collections::{HashMap, VecDeque};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::time::{Duration, Instant};
10
/// Tunable settings for a [`ConnectionPool`].
///
/// Use [`PoolConfig::default`] for the stock values and override individual
/// fields with struct-update syntax.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Upper bound on pooled connections. `ConnectionPool::release` compares
    /// the idle queue length against this value before keeping a connection.
    pub max_size: usize,
    /// Minimum number of idle connections.
    ///
    /// NOTE(review): not read by the pool code visible in this file.
    pub min_idle: usize,
    /// How long, in seconds, a connection may sit idle before it is treated
    /// as stale and becomes eligible for eviction.
    pub max_idle_time_secs: u64,
    /// Interval, in seconds, between pool checks.
    ///
    /// NOTE(review): not read by the pool code visible in this file;
    /// presumably consumed by whatever schedules maintenance - confirm.
    pub check_interval_secs: u64,
    /// Master switch. When `false`, the pool operations that check it return
    /// early without touching any connection.
    pub enabled: bool,
}

impl Default for PoolConfig {
    /// Stock configuration: pooling enabled, up to 1000 connections, ten-minute
    /// idle limit, one-minute check interval, `min_idle` of 10.
    fn default() -> Self {
        PoolConfig {
            enabled: true,
            max_size: 1_000,
            min_idle: 10,
            max_idle_time_secs: 10 * 60,
            check_interval_secs: 60,
        }
    }
}
37
38/// Connection pool entry
39struct PoolEntry {
40    connection: Arc<Connection>,
41    last_used: Instant,
42}
43
44/// Connection pool
45pub struct ConnectionPool {
46    config: PoolConfig,
47    idle_connections: Arc<RwLock<VecDeque<PoolEntry>>>,
48    active_connections: Arc<RwLock<HashMap<ConnectionId, Arc<Connection>>>>,
49    stats: Arc<PoolStatistics>,
50}
51
/// Internal, lock-free counters backing the public statistics snapshot.
struct PoolStatistics {
    /// Number of acquire calls counted by the pool.
    acquisitions: AtomicU64,
    /// Number of release calls counted by the pool.
    releases: AtomicU64,
    /// Number of connections evicted from the pool.
    evictions: AtomicU64,
    /// Number of connection creation failures.
    ///
    /// NOTE(review): nothing in the pool code visible in this file increments
    /// this counter - confirm whether it is updated elsewhere.
    creation_failures: AtomicU64,
}
59
60impl ConnectionPool {
61    /// Create a new connection pool
62    pub fn new(config: PoolConfig) -> Self {
63        Self {
64            config,
65            idle_connections: Arc::new(RwLock::new(VecDeque::new())),
66            active_connections: Arc::new(RwLock::new(HashMap::new())),
67            stats: Arc::new(PoolStatistics {
68                acquisitions: AtomicU64::new(0),
69                releases: AtomicU64::new(0),
70                evictions: AtomicU64::new(0),
71                creation_failures: AtomicU64::new(0),
72            }),
73        }
74    }
75
76    /// Acquire a connection from the pool
77    pub async fn acquire(&self) -> Option<Arc<Connection>> {
78        if !self.config.enabled {
79            return None;
80        }
81
82        self.stats.acquisitions.fetch_add(1, Ordering::Relaxed);
83
84        // Try to get from idle pool
85        loop {
86            let entry = {
87                let mut idle = self.idle_connections.write();
88                idle.pop_front()
89            };
90
91            if let Some(entry) = entry {
92                // Check if connection is still valid
93                if self.is_connection_valid(&entry).await {
94                    let conn = entry.connection.clone();
95
96                    // Move to active
97                    let mut active = self.active_connections.write();
98                    active.insert(conn.id(), conn.clone());
99
100                    return Some(conn);
101                }
102            } else {
103                break;
104            }
105        }
106
107        None
108    }
109
110    /// Release a connection back to the pool
111    pub async fn release(&self, connection: Arc<Connection>) -> Result<()> {
112        if !self.config.enabled {
113            return Ok(());
114        }
115
116        self.stats.releases.fetch_add(1, Ordering::Relaxed);
117
118        let id = connection.id();
119
120        // Remove from active
121        {
122            let mut active = self.active_connections.write();
123            active.remove(&id);
124        }
125
126        // Add to idle if pool is not full
127        let should_close = {
128            let mut idle = self.idle_connections.write();
129            if idle.len() < self.config.max_size {
130                idle.push_back(PoolEntry {
131                    connection: connection.clone(),
132                    last_used: Instant::now(),
133                });
134                false
135            } else {
136                true
137            }
138        };
139
140        if should_close {
141            // Pool is full, close the connection
142            connection.close().await?;
143        }
144
145        Ok(())
146    }
147
148    /// Evict idle connections
149    pub async fn evict_idle(&self) -> Result<usize> {
150        if !self.config.enabled {
151            return Ok(0);
152        }
153
154        let max_idle = Duration::from_secs(self.config.max_idle_time_secs);
155
156        // First, collect entries to evict while holding the lock
157        let entries_to_evict: Vec<PoolEntry> = {
158            let mut idle = self.idle_connections.write();
159            let mut retained = VecDeque::new();
160            let mut to_evict = Vec::new();
161
162            while let Some(entry) = idle.pop_front() {
163                if entry.last_used.elapsed() > max_idle {
164                    to_evict.push(entry);
165                } else {
166                    retained.push_back(entry);
167                }
168            }
169
170            *idle = retained;
171            to_evict
172        };
173
174        // Now close the connections without holding the lock
175        let mut evicted = 0;
176        for entry in entries_to_evict {
177            if let Err(e) = entry.connection.close().await {
178                tracing::error!("Failed to close evicted connection: {}", e);
179            }
180            evicted += 1;
181            self.stats.evictions.fetch_add(1, Ordering::Relaxed);
182        }
183
184        Ok(evicted)
185    }
186
187    /// Maintain pool size
188    pub async fn maintain(&self) -> Result<()> {
189        if !self.config.enabled {
190            return Ok(());
191        }
192
193        // Evict idle connections
194        self.evict_idle().await?;
195
196        Ok(())
197    }
198
199    /// Check if a connection is still valid
200    async fn is_connection_valid(&self, entry: &PoolEntry) -> bool {
201        // Check if too old
202        let max_idle = Duration::from_secs(self.config.max_idle_time_secs);
203        if entry.last_used.elapsed() > max_idle {
204            return false;
205        }
206
207        // Check connection state
208        matches!(
209            entry.connection.state().await,
210            crate::server::connection::ConnectionState::Connected
211        )
212    }
213
214    /// Get pool statistics
215    pub fn stats(&self) -> PoolStats {
216        let idle = self.idle_connections.read();
217        let active = self.active_connections.read();
218
219        PoolStats {
220            idle_connections: idle.len(),
221            active_connections: active.len(),
222            total_acquisitions: self.stats.acquisitions.load(Ordering::Relaxed),
223            total_releases: self.stats.releases.load(Ordering::Relaxed),
224            total_evictions: self.stats.evictions.load(Ordering::Relaxed),
225            total_creation_failures: self.stats.creation_failures.load(Ordering::Relaxed),
226        }
227    }
228
229    /// Get idle connection count
230    pub fn idle_count(&self) -> usize {
231        self.idle_connections.read().len()
232    }
233
234    /// Get active connection count
235    pub fn active_count(&self) -> usize {
236        self.active_connections.read().len()
237    }
238
239    /// Clear all idle connections
240    pub async fn clear_idle(&self) -> Result<usize> {
241        let entries: Vec<PoolEntry> = {
242            let mut idle = self.idle_connections.write();
243            idle.drain(..).collect()
244        };
245
246        let count = entries.len();
247
248        for entry in entries {
249            if let Err(e) = entry.connection.close().await {
250                tracing::error!("Failed to close connection during clear: {}", e);
251            }
252        }
253
254        Ok(count)
255    }
256
257    /// Shutdown the pool
258    pub async fn shutdown(&self) -> Result<()> {
259        // Close all idle connections
260        self.clear_idle().await?;
261
262        // Close all active connections
263        let connections: Vec<_> = {
264            let active = self.active_connections.write();
265            active.values().cloned().collect()
266        };
267
268        for conn in connections {
269            if let Err(e) = conn.close().await {
270                tracing::error!("Failed to close connection during shutdown: {}", e);
271            }
272        }
273
274        self.active_connections.write().clear();
275
276        Ok(())
277    }
278}
279
/// Point-in-time copy of the pool's sizes and cumulative counters, as returned
/// by `ConnectionPool::stats`.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Length of the idle queue when the snapshot was taken.
    pub idle_connections: usize,
    /// Size of the active map when the snapshot was taken.
    pub active_connections: usize,
    /// Cumulative value of the acquisition counter.
    pub total_acquisitions: u64,
    /// Cumulative value of the release counter.
    pub total_releases: u64,
    /// Cumulative value of the eviction counter.
    pub total_evictions: u64,
    /// Cumulative value of the creation-failure counter.
    pub total_creation_failures: u64,
}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// The stock configuration is enabled and carries the expected limits.
    #[test]
    fn test_pool_config_default() {
        let PoolConfig {
            enabled,
            max_size,
            min_idle,
            ..
        } = PoolConfig::default();

        assert!(enabled);
        assert_eq!(max_size, 1000);
        assert_eq!(min_idle, 10);
    }

    /// A freshly built pool holds no idle and no active connections.
    #[test]
    fn test_pool_creation() {
        let pool = ConnectionPool::new(PoolConfig::default());

        assert_eq!(pool.idle_count(), 0);
        assert_eq!(pool.active_count(), 0);
    }

    /// The statistics snapshot of a fresh pool is all zeros.
    #[test]
    fn test_pool_stats() {
        let snapshot = ConnectionPool::new(PoolConfig::default()).stats();

        assert_eq!(snapshot.idle_connections, 0);
        assert_eq!(snapshot.active_connections, 0);
        assert_eq!(snapshot.total_acquisitions, 0);
    }

    /// A disabled pool never hands out a connection.
    #[tokio::test]
    async fn test_pool_disabled() {
        let pool = ConnectionPool::new(PoolConfig {
            enabled: false,
            ..Default::default()
        });

        let conn = pool.acquire().await;
        assert!(conn.is_none());
    }
}
340}