1use std::collections::VecDeque;
25use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
26use std::time::{Duration, Instant};
27
28use parking_lot::Mutex;
29use sentinel_agent_protocol::AgentClient;
30use tracing::{debug, trace, warn};
31
32pub struct AgentConnectionPool {
37 max_connections: usize,
39 min_idle: usize,
40 max_idle: usize,
41 idle_timeout: Duration,
42 connections: Mutex<VecDeque<PooledConnection>>,
44 active_count: AtomicU32,
46 total_created: AtomicU64,
48 total_returned: AtomicU64,
50 total_acquired: AtomicU64,
52 total_timed_out: AtomicU64,
54}
55
56pub struct PooledConnection {
58 pub client: AgentClient,
60 pub created_at: Instant,
62 pub last_returned: Instant,
64 pub use_count: u64,
66}
67
68impl PooledConnection {
69 pub fn new(client: AgentClient) -> Self {
71 let now = Instant::now();
72 Self {
73 client,
74 created_at: now,
75 last_returned: now,
76 use_count: 0,
77 }
78 }
79
80 #[inline]
82 pub fn is_expired(&self, idle_timeout: Duration) -> bool {
83 self.last_returned.elapsed() > idle_timeout
84 }
85}
86
87impl AgentConnectionPool {
88 pub fn new(
97 max_connections: usize,
98 min_idle: usize,
99 max_idle: usize,
100 idle_timeout: Duration,
101 ) -> Self {
102 trace!(
103 max_connections = max_connections,
104 min_idle = min_idle,
105 max_idle = max_idle,
106 idle_timeout_secs = idle_timeout.as_secs(),
107 "Creating agent connection pool"
108 );
109
110 debug!(
111 max_connections = max_connections,
112 "Agent connection pool initialized"
113 );
114
115 Self {
116 max_connections,
117 min_idle,
118 max_idle,
119 idle_timeout,
120 connections: Mutex::new(VecDeque::with_capacity(max_idle)),
121 active_count: AtomicU32::new(0),
122 total_created: AtomicU64::new(0),
123 total_returned: AtomicU64::new(0),
124 total_acquired: AtomicU64::new(0),
125 total_timed_out: AtomicU64::new(0),
126 }
127 }
128
129 #[inline]
138 pub fn try_get(&self) -> Option<PooledConnection> {
139 let mut pool = self.connections.lock();
140
141 while let Some(conn) = pool.front() {
143 if conn.is_expired(self.idle_timeout) {
144 pool.pop_front();
145 self.total_timed_out.fetch_add(1, Ordering::Relaxed);
146 trace!("Evicted expired connection from pool");
147 } else {
148 break;
149 }
150 }
151
152 if let Some(mut conn) = pool.pop_back() {
155 self.active_count.fetch_add(1, Ordering::Relaxed);
156 self.total_acquired.fetch_add(1, Ordering::Relaxed);
157 conn.use_count += 1;
158 trace!(
159 pool_size = pool.len(),
160 use_count = conn.use_count,
161 "Acquired connection from pool"
162 );
163 return Some(conn);
164 }
165
166 None
167 }
168
169 #[inline]
177 pub fn return_connection(&self, mut conn: PooledConnection) {
178 self.active_count.fetch_sub(1, Ordering::Relaxed);
179 conn.last_returned = Instant::now();
180
181 let mut pool = self.connections.lock();
182
183 if pool.len() >= self.max_idle {
185 trace!(
186 pool_size = pool.len(),
187 max_idle = self.max_idle,
188 "Pool at capacity, dropping connection"
189 );
190 return;
192 }
193
194 pool.push_back(conn);
196 self.total_returned.fetch_add(1, Ordering::Relaxed);
197
198 trace!(pool_size = pool.len(), "Returned connection to pool");
199 }
200
201 #[inline]
205 pub fn can_create(&self) -> bool {
206 let active = self.active_count.load(Ordering::Relaxed) as usize;
207 let idle = self.connections.lock().len();
208 active + idle < self.max_connections
209 }
210
211 #[inline]
215 pub fn register_created(&self) {
216 self.active_count.fetch_add(1, Ordering::Relaxed);
217 self.total_created.fetch_add(1, Ordering::Relaxed);
218 }
219
220 #[inline]
224 pub fn mark_failed(&self) {
225 self.active_count.fetch_sub(1, Ordering::Relaxed);
226 }
227
228 #[inline]
230 pub fn active_count(&self) -> u32 {
231 self.active_count.load(Ordering::Relaxed)
232 }
233
234 #[inline]
236 pub fn idle_count(&self) -> usize {
237 self.connections.lock().len()
238 }
239
240 #[inline]
242 pub fn total_created(&self) -> u64 {
243 self.total_created.load(Ordering::Relaxed)
244 }
245
246 #[inline]
248 pub fn total_acquired(&self) -> u64 {
249 self.total_acquired.load(Ordering::Relaxed)
250 }
251
252 #[inline]
254 pub fn total_returned(&self) -> u64 {
255 self.total_returned.load(Ordering::Relaxed)
256 }
257
258 pub fn stats(&self) -> PoolStats {
260 PoolStats {
261 active: self.active_count.load(Ordering::Relaxed),
262 idle: self.connections.lock().len() as u32,
263 total_created: self.total_created.load(Ordering::Relaxed),
264 total_acquired: self.total_acquired.load(Ordering::Relaxed),
265 total_returned: self.total_returned.load(Ordering::Relaxed),
266 total_timed_out: self.total_timed_out.load(Ordering::Relaxed),
267 max_connections: self.max_connections as u32,
268 max_idle: self.max_idle as u32,
269 }
270 }
271
272 pub fn clear(&self) {
276 let mut pool = self.connections.lock();
277 let count = pool.len();
278 pool.clear();
279 debug!(evicted = count, "Cleared all connections from pool");
280 }
281
282 pub fn evict_expired(&self) -> usize {
286 let mut pool = self.connections.lock();
287 let before = pool.len();
288
289 pool.retain(|conn| !conn.is_expired(self.idle_timeout));
290
291 let evicted = before - pool.len();
292 if evicted > 0 {
293 self.total_timed_out
294 .fetch_add(evicted as u64, Ordering::Relaxed);
295 debug!(evicted = evicted, "Evicted expired connections");
296 }
297 evicted
298 }
299}
300
301#[derive(Debug, Clone, Copy)]
303pub struct PoolStats {
304 pub active: u32,
306 pub idle: u32,
308 pub total_created: u64,
310 pub total_acquired: u64,
312 pub total_returned: u64,
314 pub total_timed_out: u64,
316 pub max_connections: u32,
318 pub max_idle: u32,
320}
321
322impl PoolStats {
323 pub fn hit_rate(&self) -> f64 {
325 if self.total_acquired == 0 {
326 return 0.0;
327 }
328 self.total_acquired as f64 / (self.total_acquired + self.total_created) as f64
329 }
330
331 pub fn utilization(&self) -> f64 {
333 if self.max_connections == 0 {
334 return 0.0;
335 }
336 self.active as f64 / self.max_connections as f64
337 }
338}
339
340#[cfg(test)]
341mod tests {
342 use super::*;
343
344 fn mock_client() -> AgentClient {
346 unimplemented!("Tests require integration setup")
349 }
350
351 #[test]
352 fn test_pool_creation() {
353 let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
354 assert_eq!(pool.active_count(), 0);
355 assert_eq!(pool.idle_count(), 0);
356 assert_eq!(pool.total_created(), 0);
357 }
358
359 #[test]
360 fn test_pool_can_create() {
361 let pool = AgentConnectionPool::new(2, 0, 2, Duration::from_secs(60));
362 assert!(pool.can_create());
363
364 pool.register_created();
366 assert!(pool.can_create());
367
368 pool.register_created();
369 assert!(!pool.can_create()); }
371
372 #[test]
373 fn test_pool_stats() {
374 let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
375 let stats = pool.stats();
376 assert_eq!(stats.active, 0);
377 assert_eq!(stats.idle, 0);
378 assert_eq!(stats.max_connections, 8);
379 assert_eq!(stats.max_idle, 4);
380 }
381
382 #[test]
383 fn test_try_get_empty_pool() {
384 let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
385 assert!(pool.try_get().is_none());
386 }
387
388 #[test]
389 fn test_hit_rate() {
390 let stats = PoolStats {
391 active: 2,
392 idle: 3,
393 total_created: 10,
394 total_acquired: 90,
395 total_returned: 88,
396 total_timed_out: 2,
397 max_connections: 20,
398 max_idle: 10,
399 };
400
401 assert!((stats.hit_rate() - 0.9).abs() < 0.01);
403
404 assert!((stats.utilization() - 0.1).abs() < 0.01);
406 }
407}