1use std::collections::VecDeque;
25use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
26use std::time::{Duration, Instant};
27
28use parking_lot::Mutex;
29use sentinel_agent_protocol::AgentClient;
30use tracing::{debug, trace, warn};
31
/// Pool of reusable [`AgentClient`] connections.
///
/// Idle connections are kept in a mutex-protected queue, while usage counters
/// are plain atomics that can be read without taking the queue lock.
pub struct AgentConnectionPool {
    /// Upper bound on active plus idle connections, as checked by `can_create`.
    max_connections: usize,
    /// Configured minimum number of idle connections.
    // NOTE(review): nothing visible in this file reads this field — confirm
    // whether pre-warming is implemented elsewhere or still pending.
    min_idle: usize,
    /// Maximum number of idle connections retained; returns beyond this are dropped.
    max_idle: usize,
    /// How long a connection may sit idle before it counts as expired.
    idle_timeout: Duration,
    /// Idle connections; new returns are pushed to the back.
    connections: Mutex<VecDeque<PooledConnection>>,
    /// Number of connections currently handed out to callers.
    active_count: AtomicU32,
    /// Number of connections registered through `register_created`.
    total_created: AtomicU64,
    /// Number of connections accepted back into the idle queue.
    total_returned: AtomicU64,
    /// Number of successful acquisitions from the idle queue.
    total_acquired: AtomicU64,
    /// Number of idle connections evicted because they expired.
    total_timed_out: AtomicU64,
}
55
56pub struct PooledConnection {
58 pub client: AgentClient,
60 pub created_at: Instant,
62 pub last_returned: Instant,
64 pub use_count: u64,
66}
67
68impl PooledConnection {
69 pub fn new(client: AgentClient) -> Self {
71 let now = Instant::now();
72 Self {
73 client,
74 created_at: now,
75 last_returned: now,
76 use_count: 0,
77 }
78 }
79
80 #[inline]
82 pub fn is_expired(&self, idle_timeout: Duration) -> bool {
83 self.last_returned.elapsed() > idle_timeout
84 }
85}
86
87impl AgentConnectionPool {
88 pub fn new(
97 max_connections: usize,
98 min_idle: usize,
99 max_idle: usize,
100 idle_timeout: Duration,
101 ) -> Self {
102 trace!(
103 max_connections = max_connections,
104 min_idle = min_idle,
105 max_idle = max_idle,
106 idle_timeout_secs = idle_timeout.as_secs(),
107 "Creating agent connection pool"
108 );
109
110 debug!(
111 max_connections = max_connections,
112 "Agent connection pool initialized"
113 );
114
115 Self {
116 max_connections,
117 min_idle,
118 max_idle,
119 idle_timeout,
120 connections: Mutex::new(VecDeque::with_capacity(max_idle)),
121 active_count: AtomicU32::new(0),
122 total_created: AtomicU64::new(0),
123 total_returned: AtomicU64::new(0),
124 total_acquired: AtomicU64::new(0),
125 total_timed_out: AtomicU64::new(0),
126 }
127 }
128
129 #[inline]
138 pub fn try_get(&self) -> Option<PooledConnection> {
139 let mut pool = self.connections.lock();
140
141 while let Some(conn) = pool.front() {
143 if conn.is_expired(self.idle_timeout) {
144 pool.pop_front();
145 self.total_timed_out.fetch_add(1, Ordering::Relaxed);
146 trace!("Evicted expired connection from pool");
147 } else {
148 break;
149 }
150 }
151
152 if let Some(mut conn) = pool.pop_back() {
155 self.active_count.fetch_add(1, Ordering::Relaxed);
156 self.total_acquired.fetch_add(1, Ordering::Relaxed);
157 conn.use_count += 1;
158 trace!(
159 pool_size = pool.len(),
160 use_count = conn.use_count,
161 "Acquired connection from pool"
162 );
163 return Some(conn);
164 }
165
166 None
167 }
168
169 #[inline]
177 pub fn return_connection(&self, mut conn: PooledConnection) {
178 self.active_count.fetch_sub(1, Ordering::Relaxed);
179 conn.last_returned = Instant::now();
180
181 let mut pool = self.connections.lock();
182
183 if pool.len() >= self.max_idle {
185 trace!(
186 pool_size = pool.len(),
187 max_idle = self.max_idle,
188 "Pool at capacity, dropping connection"
189 );
190 return;
192 }
193
194 pool.push_back(conn);
196 self.total_returned.fetch_add(1, Ordering::Relaxed);
197
198 trace!(pool_size = pool.len(), "Returned connection to pool");
199 }
200
201 #[inline]
205 pub fn can_create(&self) -> bool {
206 let active = self.active_count.load(Ordering::Relaxed) as usize;
207 let idle = self.connections.lock().len();
208 active + idle < self.max_connections
209 }
210
211 #[inline]
215 pub fn register_created(&self) {
216 self.active_count.fetch_add(1, Ordering::Relaxed);
217 self.total_created.fetch_add(1, Ordering::Relaxed);
218 }
219
220 #[inline]
224 pub fn mark_failed(&self) {
225 self.active_count.fetch_sub(1, Ordering::Relaxed);
226 }
227
228 #[inline]
230 pub fn active_count(&self) -> u32 {
231 self.active_count.load(Ordering::Relaxed)
232 }
233
234 #[inline]
236 pub fn idle_count(&self) -> usize {
237 self.connections.lock().len()
238 }
239
240 #[inline]
242 pub fn total_created(&self) -> u64 {
243 self.total_created.load(Ordering::Relaxed)
244 }
245
246 #[inline]
248 pub fn total_acquired(&self) -> u64 {
249 self.total_acquired.load(Ordering::Relaxed)
250 }
251
252 #[inline]
254 pub fn total_returned(&self) -> u64 {
255 self.total_returned.load(Ordering::Relaxed)
256 }
257
258 pub fn stats(&self) -> PoolStats {
260 PoolStats {
261 active: self.active_count.load(Ordering::Relaxed),
262 idle: self.connections.lock().len() as u32,
263 total_created: self.total_created.load(Ordering::Relaxed),
264 total_acquired: self.total_acquired.load(Ordering::Relaxed),
265 total_returned: self.total_returned.load(Ordering::Relaxed),
266 total_timed_out: self.total_timed_out.load(Ordering::Relaxed),
267 max_connections: self.max_connections as u32,
268 max_idle: self.max_idle as u32,
269 }
270 }
271
272 pub fn clear(&self) {
276 let mut pool = self.connections.lock();
277 let count = pool.len();
278 pool.clear();
279 debug!(evicted = count, "Cleared all connections from pool");
280 }
281
282 pub fn evict_expired(&self) -> usize {
286 let mut pool = self.connections.lock();
287 let before = pool.len();
288
289 pool.retain(|conn| !conn.is_expired(self.idle_timeout));
290
291 let evicted = before - pool.len();
292 if evicted > 0 {
293 self.total_timed_out.fetch_add(evicted as u64, Ordering::Relaxed);
294 debug!(evicted = evicted, "Evicted expired connections");
295 }
296 evicted
297 }
298}
299
/// Snapshot of a connection pool's counters and configured limits.
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
    /// Connections handed out to callers when the snapshot was taken.
    pub active: u32,
    /// Connections waiting in the idle queue when the snapshot was taken.
    pub idle: u32,
    /// Lifetime count of newly created connections.
    pub total_created: u64,
    /// Lifetime count of acquisitions served from the idle queue.
    pub total_acquired: u64,
    /// Lifetime count of connections accepted back into the idle queue.
    pub total_returned: u64,
    /// Lifetime count of idle connections evicted after expiring.
    pub total_timed_out: u64,
    /// Configured limit on active plus idle connections.
    pub max_connections: u32,
    /// Configured limit on idle connections.
    pub max_idle: u32,
}

impl PoolStats {
    /// Share of connection demands served by reuse:
    /// `total_acquired / (total_acquired + total_created)`.
    ///
    /// Returns `0.0` when nothing has been acquired yet. The sum is computed
    /// in `f64` so very large counters cannot overflow `u64` (which would
    /// panic in debug builds and wrap in release builds).
    pub fn hit_rate(&self) -> f64 {
        if self.total_acquired == 0 {
            return 0.0;
        }
        let reused = self.total_acquired as f64;
        let created = self.total_created as f64;
        reused / (reused + created)
    }

    /// Ratio of active connections to `max_connections`.
    ///
    /// Returns `0.0` when `max_connections` is zero to avoid dividing by zero.
    pub fn utilization(&self) -> f64 {
        if self.max_connections == 0 {
            return 0.0;
        }
        f64::from(self.active) / f64::from(self.max_connections)
    }
}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder for constructing an `AgentClient` in tests.
    ///
    /// Building a real client needs integration setup, so the unit tests
    /// below only exercise paths that never touch a connection.
    // Not called by any unit test yet; the allow keeps test builds free of a
    // dead-code warning while leaving the hook in place.
    #[allow(dead_code)]
    fn mock_client() -> AgentClient {
        unimplemented!("Tests require integration setup")
    }

    #[test]
    fn test_pool_creation() {
        let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
        assert_eq!(pool.active_count(), 0);
        assert_eq!(pool.idle_count(), 0);
        assert_eq!(pool.total_created(), 0);
    }

    #[test]
    fn test_pool_can_create() {
        let pool = AgentConnectionPool::new(2, 0, 2, Duration::from_secs(60));
        assert!(pool.can_create());

        // One of two slots used: still room for another connection.
        pool.register_created();
        assert!(pool.can_create());

        // Both slots used: the pool is at its limit.
        pool.register_created();
        assert!(!pool.can_create());
    }

    #[test]
    fn test_register_and_mark_failed_balance() {
        let pool = AgentConnectionPool::new(2, 0, 2, Duration::from_secs(60));

        pool.register_created();
        assert_eq!(pool.active_count(), 1);
        assert_eq!(pool.total_created(), 1);

        // Failing the connection frees the slot but keeps the lifetime counter.
        pool.mark_failed();
        assert_eq!(pool.active_count(), 0);
        assert_eq!(pool.total_created(), 1);
        assert!(pool.can_create());
    }

    #[test]
    fn test_pool_stats() {
        let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
        let stats = pool.stats();
        assert_eq!(stats.active, 0);
        assert_eq!(stats.idle, 0);
        assert_eq!(stats.max_connections, 8);
        assert_eq!(stats.max_idle, 4);
    }

    #[test]
    fn test_try_get_empty_pool() {
        let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
        assert!(pool.try_get().is_none());
        // A miss must not disturb the counters.
        assert_eq!(pool.active_count(), 0);
        assert_eq!(pool.total_acquired(), 0);
    }

    #[test]
    fn test_evict_and_clear_on_empty_pool() {
        let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
        assert_eq!(pool.evict_expired(), 0);
        pool.clear();
        assert_eq!(pool.idle_count(), 0);
        assert_eq!(pool.stats().total_timed_out, 0);
    }

    #[test]
    fn test_hit_rate() {
        let stats = PoolStats {
            active: 2,
            idle: 3,
            total_created: 10,
            total_acquired: 90,
            total_returned: 88,
            total_timed_out: 2,
            max_connections: 20,
            max_idle: 10,
        };

        // 90 reuses out of 100 total demands.
        assert!((stats.hit_rate() - 0.9).abs() < 0.01);

        // 2 active out of 20 allowed.
        assert!((stats.utilization() - 0.1).abs() < 0.01);
    }

    #[test]
    fn test_hit_rate_without_acquisitions() {
        let stats = PoolStats {
            active: 0,
            idle: 0,
            total_created: 3,
            total_acquired: 0,
            total_returned: 0,
            total_timed_out: 0,
            max_connections: 4,
            max_idle: 2,
        };
        assert_eq!(stats.hit_rate(), 0.0);
    }

    #[test]
    fn test_utilization_with_zero_capacity() {
        let stats = PoolStats {
            active: 1,
            idle: 0,
            total_created: 1,
            total_acquired: 0,
            total_returned: 0,
            total_timed_out: 0,
            max_connections: 0,
            max_idle: 0,
        };
        assert_eq!(stats.utilization(), 0.0);
    }
}