oxigdal_websocket/server/
pool.rs1use crate::error::Result;
4use crate::server::connection::{Connection, ConnectionId};
5use parking_lot::RwLock;
6use std::collections::{HashMap, VecDeque};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::time::{Duration, Instant};
10
/// Tunable settings for a [`ConnectionPool`].
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Upper bound on the number of idle connections the pool keeps;
    /// a connection released while the idle queue is at this size is closed.
    pub max_size: usize,
    /// Desired minimum number of idle connections.
    // NOTE(review): not read by any pool method in this file — confirm usage.
    pub min_idle: usize,
    /// Seconds an idle connection may sit unused before it is treated as
    /// expired (checked on acquire and on eviction).
    pub max_idle_time_secs: u64,
    /// Seconds between maintenance passes.
    // NOTE(review): not read in this file; presumably consumed by whoever
    // schedules `ConnectionPool::maintain` — confirm against the caller.
    pub check_interval_secs: u64,
    /// Master switch: when `false`, acquire/release/evict/maintain are no-ops.
    pub enabled: bool,
}
25
26impl Default for PoolConfig {
27 fn default() -> Self {
28 Self {
29 max_size: 1000,
30 min_idle: 10,
31 max_idle_time_secs: 600,
32 check_interval_secs: 60,
33 enabled: true,
34 }
35 }
36}
37
38struct PoolEntry {
40 connection: Arc<Connection>,
41 last_used: Instant,
42}
43
44pub struct ConnectionPool {
46 config: PoolConfig,
47 idle_connections: Arc<RwLock<VecDeque<PoolEntry>>>,
48 active_connections: Arc<RwLock<HashMap<ConnectionId, Arc<Connection>>>>,
49 stats: Arc<PoolStatistics>,
50}
51
/// Internal lock-free counters backing the public `PoolStats` snapshot.
struct PoolStatistics {
    /// Number of `acquire` calls made while pooling was enabled
    /// (counted whether or not a connection was actually returned).
    acquisitions: AtomicU64,
    /// Number of `release` calls made while pooling was enabled.
    releases: AtomicU64,
    /// Number of idle entries the pool has evicted.
    evictions: AtomicU64,
    /// Number of connection-creation failures.
    // NOTE(review): only read in this file, never incremented — confirm
    // where (or whether) it is updated.
    creation_failures: AtomicU64,
}
59
60impl ConnectionPool {
61 pub fn new(config: PoolConfig) -> Self {
63 Self {
64 config,
65 idle_connections: Arc::new(RwLock::new(VecDeque::new())),
66 active_connections: Arc::new(RwLock::new(HashMap::new())),
67 stats: Arc::new(PoolStatistics {
68 acquisitions: AtomicU64::new(0),
69 releases: AtomicU64::new(0),
70 evictions: AtomicU64::new(0),
71 creation_failures: AtomicU64::new(0),
72 }),
73 }
74 }
75
76 pub async fn acquire(&self) -> Option<Arc<Connection>> {
78 if !self.config.enabled {
79 return None;
80 }
81
82 self.stats.acquisitions.fetch_add(1, Ordering::Relaxed);
83
84 loop {
86 let entry = {
87 let mut idle = self.idle_connections.write();
88 idle.pop_front()
89 };
90
91 if let Some(entry) = entry {
92 if self.is_connection_valid(&entry).await {
94 let conn = entry.connection.clone();
95
96 let mut active = self.active_connections.write();
98 active.insert(conn.id(), conn.clone());
99
100 return Some(conn);
101 }
102 } else {
103 break;
104 }
105 }
106
107 None
108 }
109
110 pub async fn release(&self, connection: Arc<Connection>) -> Result<()> {
112 if !self.config.enabled {
113 return Ok(());
114 }
115
116 self.stats.releases.fetch_add(1, Ordering::Relaxed);
117
118 let id = connection.id();
119
120 {
122 let mut active = self.active_connections.write();
123 active.remove(&id);
124 }
125
126 let should_close = {
128 let mut idle = self.idle_connections.write();
129 if idle.len() < self.config.max_size {
130 idle.push_back(PoolEntry {
131 connection: connection.clone(),
132 last_used: Instant::now(),
133 });
134 false
135 } else {
136 true
137 }
138 };
139
140 if should_close {
141 connection.close().await?;
143 }
144
145 Ok(())
146 }
147
148 pub async fn evict_idle(&self) -> Result<usize> {
150 if !self.config.enabled {
151 return Ok(0);
152 }
153
154 let max_idle = Duration::from_secs(self.config.max_idle_time_secs);
155
156 let entries_to_evict: Vec<PoolEntry> = {
158 let mut idle = self.idle_connections.write();
159 let mut retained = VecDeque::new();
160 let mut to_evict = Vec::new();
161
162 while let Some(entry) = idle.pop_front() {
163 if entry.last_used.elapsed() > max_idle {
164 to_evict.push(entry);
165 } else {
166 retained.push_back(entry);
167 }
168 }
169
170 *idle = retained;
171 to_evict
172 };
173
174 let mut evicted = 0;
176 for entry in entries_to_evict {
177 if let Err(e) = entry.connection.close().await {
178 tracing::error!("Failed to close evicted connection: {}", e);
179 }
180 evicted += 1;
181 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
182 }
183
184 Ok(evicted)
185 }
186
187 pub async fn maintain(&self) -> Result<()> {
189 if !self.config.enabled {
190 return Ok(());
191 }
192
193 self.evict_idle().await?;
195
196 Ok(())
197 }
198
199 async fn is_connection_valid(&self, entry: &PoolEntry) -> bool {
201 let max_idle = Duration::from_secs(self.config.max_idle_time_secs);
203 if entry.last_used.elapsed() > max_idle {
204 return false;
205 }
206
207 matches!(
209 entry.connection.state().await,
210 crate::server::connection::ConnectionState::Connected
211 )
212 }
213
214 pub fn stats(&self) -> PoolStats {
216 let idle = self.idle_connections.read();
217 let active = self.active_connections.read();
218
219 PoolStats {
220 idle_connections: idle.len(),
221 active_connections: active.len(),
222 total_acquisitions: self.stats.acquisitions.load(Ordering::Relaxed),
223 total_releases: self.stats.releases.load(Ordering::Relaxed),
224 total_evictions: self.stats.evictions.load(Ordering::Relaxed),
225 total_creation_failures: self.stats.creation_failures.load(Ordering::Relaxed),
226 }
227 }
228
229 pub fn idle_count(&self) -> usize {
231 self.idle_connections.read().len()
232 }
233
234 pub fn active_count(&self) -> usize {
236 self.active_connections.read().len()
237 }
238
239 pub async fn clear_idle(&self) -> Result<usize> {
241 let entries: Vec<PoolEntry> = {
242 let mut idle = self.idle_connections.write();
243 idle.drain(..).collect()
244 };
245
246 let count = entries.len();
247
248 for entry in entries {
249 if let Err(e) = entry.connection.close().await {
250 tracing::error!("Failed to close connection during clear: {}", e);
251 }
252 }
253
254 Ok(count)
255 }
256
257 pub async fn shutdown(&self) -> Result<()> {
259 self.clear_idle().await?;
261
262 let connections: Vec<_> = {
264 let active = self.active_connections.write();
265 active.values().cloned().collect()
266 };
267
268 for conn in connections {
269 if let Err(e) = conn.close().await {
270 tracing::error!("Failed to close connection during shutdown: {}", e);
271 }
272 }
273
274 self.active_connections.write().clear();
275
276 Ok(())
277 }
278}
279
/// Point-in-time snapshot of pool sizes and cumulative counters, as returned
/// by `ConnectionPool::stats`.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Connections in the idle queue when the snapshot was taken.
    pub idle_connections: usize,
    /// Connections checked out when the snapshot was taken.
    pub active_connections: usize,
    /// Cumulative count of acquisitions.
    pub total_acquisitions: u64,
    /// Cumulative count of releases.
    pub total_releases: u64,
    /// Cumulative count of evicted idle connections.
    pub total_evictions: u64,
    /// Cumulative count of connection-creation failures.
    pub total_creation_failures: u64,
}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration enables pooling with the documented limits.
    #[test]
    fn test_pool_config_default() {
        let defaults = PoolConfig::default();

        assert!(defaults.enabled);
        assert_eq!(defaults.max_size, 1000);
        assert_eq!(defaults.min_idle, 10);
    }

    /// A freshly built pool holds no connections at all.
    #[test]
    fn test_pool_creation() {
        let pool = ConnectionPool::new(PoolConfig::default());

        assert_eq!(pool.idle_count(), 0);
        assert_eq!(pool.active_count(), 0);
    }

    /// The statistics snapshot of a new pool is all zeros.
    #[test]
    fn test_pool_stats() {
        let pool = ConnectionPool::new(PoolConfig::default());
        let snapshot = pool.stats();

        assert_eq!(snapshot.idle_connections, 0);
        assert_eq!(snapshot.active_connections, 0);
        assert_eq!(snapshot.total_acquisitions, 0);
    }

    /// With pooling switched off, `acquire` never yields a connection.
    #[tokio::test]
    async fn test_pool_disabled() {
        let disabled = PoolConfig {
            enabled: false,
            ..Default::default()
        };
        let pool = ConnectionPool::new(disabled);

        let acquired = pool.acquire().await;
        assert!(acquired.is_none());
    }
}