sentinel_proxy/agents/
pool.rs

1//! Agent connection pooling.
2//!
3//! This module provides connection pooling for agent clients. Instead of serializing
4//! all requests through a single connection, the pool maintains multiple connections
5//! and distributes requests across them.
6//!
7//! # Performance
8//!
9//! The pool uses `parking_lot::Mutex` for fast, low-contention access. Pool operations
10//! (get/return) complete in ~100-500ns, much faster than tokio's async mutex.
11//!
12//! # Usage
13//!
14//! ```ignore
15//! let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
16//!
//! // Reuse an idle connection (`try_get` returns `None` when the pool is empty)
18//! if let Some(conn) = pool.try_get() {
19//!     // Use connection...
20//!     pool.return_connection(conn);
21//! }
22//! ```
23
24use std::collections::VecDeque;
25use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
26use std::time::{Duration, Instant};
27
28use parking_lot::Mutex;
29use sentinel_agent_protocol::AgentClient;
30use tracing::{debug, trace, warn};
31
32/// Agent connection pool for efficient connection reuse.
33///
34/// The pool maintains a set of idle connections that can be reused across requests,
35/// avoiding the overhead of creating a new connection for each request.
36pub struct AgentConnectionPool {
37    /// Pool configuration
38    max_connections: usize,
39    min_idle: usize,
40    max_idle: usize,
41    idle_timeout: Duration,
42    /// Available connections (fast sync mutex)
43    connections: Mutex<VecDeque<PooledConnection>>,
44    /// Active connections count (connections currently in use)
45    active_count: AtomicU32,
46    /// Total connections created over lifetime
47    total_created: AtomicU64,
48    /// Total connections returned to pool
49    total_returned: AtomicU64,
50    /// Total connections acquired from pool
51    total_acquired: AtomicU64,
52    /// Total connections that timed out (evicted due to idle timeout)
53    total_timed_out: AtomicU64,
54}
55
56/// A pooled agent connection with metadata.
57pub struct PooledConnection {
58    /// The actual client
59    pub client: AgentClient,
60    /// Creation time
61    pub created_at: Instant,
62    /// Last time this connection was returned to the pool
63    pub last_returned: Instant,
64    /// Number of times this connection has been used
65    pub use_count: u64,
66}
67
68impl PooledConnection {
69    /// Create a new pooled connection.
70    pub fn new(client: AgentClient) -> Self {
71        let now = Instant::now();
72        Self {
73            client,
74            created_at: now,
75            last_returned: now,
76            use_count: 0,
77        }
78    }
79
80    /// Check if this connection has exceeded the idle timeout.
81    #[inline]
82    pub fn is_expired(&self, idle_timeout: Duration) -> bool {
83        self.last_returned.elapsed() > idle_timeout
84    }
85}
86
87impl AgentConnectionPool {
88    /// Create a new connection pool.
89    ///
90    /// # Arguments
91    ///
92    /// * `max_connections` - Maximum total connections (active + idle)
93    /// * `min_idle` - Minimum idle connections to maintain
94    /// * `max_idle` - Maximum idle connections to keep in pool
95    /// * `idle_timeout` - How long an idle connection can stay in the pool
96    pub fn new(
97        max_connections: usize,
98        min_idle: usize,
99        max_idle: usize,
100        idle_timeout: Duration,
101    ) -> Self {
102        trace!(
103            max_connections = max_connections,
104            min_idle = min_idle,
105            max_idle = max_idle,
106            idle_timeout_secs = idle_timeout.as_secs(),
107            "Creating agent connection pool"
108        );
109
110        debug!(
111            max_connections = max_connections,
112            "Agent connection pool initialized"
113        );
114
115        Self {
116            max_connections,
117            min_idle,
118            max_idle,
119            idle_timeout,
120            connections: Mutex::new(VecDeque::with_capacity(max_idle)),
121            active_count: AtomicU32::new(0),
122            total_created: AtomicU64::new(0),
123            total_returned: AtomicU64::new(0),
124            total_acquired: AtomicU64::new(0),
125            total_timed_out: AtomicU64::new(0),
126        }
127    }
128
129    /// Try to get a connection from the pool.
130    ///
131    /// Returns `Some(connection)` if an idle connection is available,
132    /// `None` if the pool is empty (caller should create a new connection).
133    ///
134    /// # Performance
135    ///
136    /// This operation is O(1) and completes in ~100-500ns.
137    #[inline]
138    pub fn try_get(&self) -> Option<PooledConnection> {
139        let mut pool = self.connections.lock();
140
141        // Evict expired connections from the front (oldest first)
142        while let Some(conn) = pool.front() {
143            if conn.is_expired(self.idle_timeout) {
144                pool.pop_front();
145                self.total_timed_out.fetch_add(1, Ordering::Relaxed);
146                trace!("Evicted expired connection from pool");
147            } else {
148                break;
149            }
150        }
151
152        // Get the most recently returned connection (from the back)
153        // This provides better cache locality and keeps connections warm
154        if let Some(mut conn) = pool.pop_back() {
155            self.active_count.fetch_add(1, Ordering::Relaxed);
156            self.total_acquired.fetch_add(1, Ordering::Relaxed);
157            conn.use_count += 1;
158            trace!(
159                pool_size = pool.len(),
160                use_count = conn.use_count,
161                "Acquired connection from pool"
162            );
163            return Some(conn);
164        }
165
166        None
167    }
168
169    /// Return a connection to the pool.
170    ///
171    /// If the pool is at capacity, the connection is dropped.
172    ///
173    /// # Performance
174    ///
175    /// This operation is O(1) and completes in ~100-500ns.
176    #[inline]
177    pub fn return_connection(&self, mut conn: PooledConnection) {
178        self.active_count.fetch_sub(1, Ordering::Relaxed);
179        conn.last_returned = Instant::now();
180
181        let mut pool = self.connections.lock();
182
183        // Don't exceed max_idle
184        if pool.len() >= self.max_idle {
185            trace!(
186                pool_size = pool.len(),
187                max_idle = self.max_idle,
188                "Pool at capacity, dropping connection"
189            );
190            // Connection will be dropped here
191            return;
192        }
193
194        // Add to the back (most recently used)
195        pool.push_back(conn);
196        self.total_returned.fetch_add(1, Ordering::Relaxed);
197
198        trace!(pool_size = pool.len(), "Returned connection to pool");
199    }
200
201    /// Check if we can create a new connection without exceeding limits.
202    ///
203    /// Returns `true` if a new connection can be created.
204    #[inline]
205    pub fn can_create(&self) -> bool {
206        let active = self.active_count.load(Ordering::Relaxed) as usize;
207        let idle = self.connections.lock().len();
208        active + idle < self.max_connections
209    }
210
211    /// Register that a new connection was created.
212    ///
213    /// Call this after successfully creating a new connection.
214    #[inline]
215    pub fn register_created(&self) {
216        self.active_count.fetch_add(1, Ordering::Relaxed);
217        self.total_created.fetch_add(1, Ordering::Relaxed);
218    }
219
220    /// Mark a connection as failed (without returning it to pool).
221    ///
222    /// Call this when a connection fails and should not be reused.
223    #[inline]
224    pub fn mark_failed(&self) {
225        self.active_count.fetch_sub(1, Ordering::Relaxed);
226    }
227
228    /// Get active connection count (connections currently in use).
229    #[inline]
230    pub fn active_count(&self) -> u32 {
231        self.active_count.load(Ordering::Relaxed)
232    }
233
234    /// Get idle connection count (connections waiting in pool).
235    #[inline]
236    pub fn idle_count(&self) -> usize {
237        self.connections.lock().len()
238    }
239
240    /// Get total connections created over lifetime.
241    #[inline]
242    pub fn total_created(&self) -> u64 {
243        self.total_created.load(Ordering::Relaxed)
244    }
245
246    /// Get total connections acquired from pool.
247    #[inline]
248    pub fn total_acquired(&self) -> u64 {
249        self.total_acquired.load(Ordering::Relaxed)
250    }
251
252    /// Get total connections returned to pool.
253    #[inline]
254    pub fn total_returned(&self) -> u64 {
255        self.total_returned.load(Ordering::Relaxed)
256    }
257
258    /// Get pool statistics.
259    pub fn stats(&self) -> PoolStats {
260        PoolStats {
261            active: self.active_count.load(Ordering::Relaxed),
262            idle: self.connections.lock().len() as u32,
263            total_created: self.total_created.load(Ordering::Relaxed),
264            total_acquired: self.total_acquired.load(Ordering::Relaxed),
265            total_returned: self.total_returned.load(Ordering::Relaxed),
266            total_timed_out: self.total_timed_out.load(Ordering::Relaxed),
267            max_connections: self.max_connections as u32,
268            max_idle: self.max_idle as u32,
269        }
270    }
271
272    /// Evict all idle connections from the pool.
273    ///
274    /// Useful for graceful shutdown or when the agent configuration changes.
275    pub fn clear(&self) {
276        let mut pool = self.connections.lock();
277        let count = pool.len();
278        pool.clear();
279        debug!(evicted = count, "Cleared all connections from pool");
280    }
281
282    /// Evict expired connections from the pool.
283    ///
284    /// Call this periodically to clean up stale connections.
285    pub fn evict_expired(&self) -> usize {
286        let mut pool = self.connections.lock();
287        let before = pool.len();
288
289        pool.retain(|conn| !conn.is_expired(self.idle_timeout));
290
291        let evicted = before - pool.len();
292        if evicted > 0 {
293            self.total_timed_out.fetch_add(evicted as u64, Ordering::Relaxed);
294            debug!(evicted = evicted, "Evicted expired connections");
295        }
296        evicted
297    }
298}
299
/// Point-in-time snapshot of pool counters, intended for monitoring.
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
    /// Connections currently checked out (in use).
    pub active: u32,
    /// Connections currently parked in the pool.
    pub idle: u32,
    /// Lifetime number of connections created.
    pub total_created: u64,
    /// Lifetime number of connections acquired from the pool.
    pub total_acquired: u64,
    /// Lifetime number of connections returned to the pool.
    pub total_returned: u64,
    /// Lifetime number of connections evicted for exceeding the idle timeout.
    pub total_timed_out: u64,
    /// Configured maximum number of connections.
    pub max_connections: u32,
    /// Configured maximum number of idle connections.
    pub max_idle: u32,
}

impl PoolStats {
    /// Fraction of connection requests served by reusing a pooled connection.
    ///
    /// Computed as `total_acquired / (total_acquired + total_created)`.
    /// Returns `0.0` when nothing has been acquired from the pool yet.
    pub fn hit_rate(&self) -> f64 {
        match self.total_acquired {
            0 => 0.0,
            reused => {
                let requests = reused + self.total_created;
                reused as f64 / requests as f64
            }
        }
    }

    /// Fraction of the connection limit that is currently in use
    /// (`active / max_connections`).
    ///
    /// Returns `0.0` when `max_connections` is zero.
    pub fn utilization(&self) -> f64 {
        match self.max_connections {
            0 => 0.0,
            limit => f64::from(self.active) / f64::from(limit),
        }
    }
}
338
#[cfg(test)]
mod tests {
    use super::*;

    // These tests cover the counter and statistics logic only. Exercising the
    // paths that park real connections needs an `AgentClient`, which requires
    // integration setup and is therefore not done here.

    /// Snapshot shared by the ratio tests: 90 reuses, 10 creations, 2 of 20 in use.
    fn sample_stats() -> PoolStats {
        PoolStats {
            active: 2,
            idle: 3,
            total_created: 10,
            total_acquired: 90,
            total_returned: 88,
            total_timed_out: 2,
            max_connections: 20,
            max_idle: 10,
        }
    }

    #[test]
    fn test_pool_creation() {
        let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
        assert_eq!(pool.active_count(), 0);
        assert_eq!(pool.idle_count(), 0);
        assert_eq!(pool.total_created(), 0);
        assert_eq!(pool.total_acquired(), 0);
        assert_eq!(pool.total_returned(), 0);
    }

    #[test]
    fn test_pool_can_create() {
        let pool = AgentConnectionPool::new(2, 0, 2, Duration::from_secs(60));
        assert!(pool.can_create());

        // Simulate creating connections
        pool.register_created();
        assert!(pool.can_create());

        pool.register_created();
        assert!(!pool.can_create()); // At max
    }

    #[test]
    fn test_register_created_then_mark_failed() {
        let pool = AgentConnectionPool::new(2, 0, 2, Duration::from_secs(60));

        pool.register_created();
        assert_eq!(pool.active_count(), 1);
        assert_eq!(pool.total_created(), 1);

        // A failed connection frees its slot but stays in the lifetime total.
        pool.mark_failed();
        assert_eq!(pool.active_count(), 0);
        assert_eq!(pool.total_created(), 1);
        assert!(pool.can_create());
    }

    #[test]
    fn test_pool_stats() {
        let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
        let stats = pool.stats();
        assert_eq!(stats.active, 0);
        assert_eq!(stats.idle, 0);
        assert_eq!(stats.max_connections, 8);
        assert_eq!(stats.max_idle, 4);
    }

    #[test]
    fn test_try_get_empty_pool() {
        let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
        assert!(pool.try_get().is_none());
        // A miss must not count as an acquisition or an active connection.
        assert_eq!(pool.total_acquired(), 0);
        assert_eq!(pool.active_count(), 0);
    }

    #[test]
    fn test_evict_and_clear_on_empty_pool() {
        let pool = AgentConnectionPool::new(8, 2, 4, Duration::from_secs(60));
        assert_eq!(pool.evict_expired(), 0);
        pool.clear();
        assert_eq!(pool.idle_count(), 0);
        assert_eq!(pool.stats().total_timed_out, 0);
    }

    #[test]
    fn test_hit_rate() {
        // 90 acquired from pool, 10 created = 90% hit rate
        assert!((sample_stats().hit_rate() - 0.9).abs() < 0.01);
    }

    #[test]
    fn test_utilization() {
        // 2 active out of 20 max = 10% utilization
        assert!((sample_stats().utilization() - 0.1).abs() < 0.01);
    }

    #[test]
    fn test_hit_rate_without_acquisitions() {
        let stats = PoolStats {
            total_acquired: 0,
            ..sample_stats()
        };
        assert_eq!(stats.hit_rate(), 0.0);
    }

    #[test]
    fn test_utilization_with_zero_capacity() {
        let stats = PoolStats {
            max_connections: 0,
            ..sample_stats()
        };
        assert_eq!(stats.utilization(), 0.0);
    }
}