kode_bridge/
pool.rs

1use crate::errors::{KodeBridgeError, Result};
2use interprocess::local_socket::tokio::prelude::LocalSocketStream;
3use interprocess::local_socket::traits::tokio::Stream;
4use interprocess::local_socket::Name;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::Semaphore;
10use tracing::{debug, trace, warn};
11
12/// Configuration for connection pool
13#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
14pub struct PoolConfig {
15    /// Maximum number of connections in the pool
16    pub max_size: usize,
17    /// Minimum number of idle connections to maintain
18    pub min_idle: usize,
19    /// Maximum time a connection can be idle before being closed (in milliseconds)
20    pub max_idle_time_ms: u64,
21    /// Maximum time to wait for a connection from the pool (in milliseconds)
22    pub connection_timeout_ms: u64,
23    /// Time to wait between connection attempts (in milliseconds)
24    pub retry_delay_ms: u64,
25    /// Maximum number of retry attempts
26    pub max_retries: usize,
27    /// Concurrent request limit
28    pub max_concurrent_requests: usize,
29    /// Rate limiting: max requests per second
30    pub max_requests_per_second: Option<f64>,
31}
32
33impl Default for PoolConfig {
34    fn default() -> Self {
35        Self {
36            max_size: 20,              // Increase connection pool size to support more concurrency
37            min_idle: 5,               // Maintain more idle connections
38            max_idle_time_ms: 300_000, // 5 minutes
39            connection_timeout_ms: 10_000, // Reduce connection timeout
40            retry_delay_ms: 50,        // Reduce retry delay
41            max_retries: 5,            // Increase retry attempts
42            max_concurrent_requests: 8,
43            max_requests_per_second: Some(10.0),
44        }
45    }
46}
47
48impl PoolConfig {
49    /// Get max idle time as Duration
50    pub fn max_idle_time(&self) -> Duration {
51        Duration::from_millis(self.max_idle_time_ms)
52    }
53
54    /// Get connection timeout as Duration
55    pub fn connection_timeout(&self) -> Duration {
56        Duration::from_millis(self.connection_timeout_ms)
57    }
58
59    /// Get retry delay as Duration
60    pub fn retry_delay(&self) -> Duration {
61        Duration::from_millis(self.retry_delay_ms)
62    }
63}
64
65/// A pooled connection wrapper
66pub struct PooledConnection {
67    inner: Option<LocalSocketStream>,
68    created_at: Instant,
69    last_used: Instant,
70    pool: Arc<ConnectionPoolInner>,
71}
72
73impl PooledConnection {
74    fn new(stream: LocalSocketStream, pool: Arc<ConnectionPoolInner>) -> Self {
75        let now = Instant::now();
76        Self {
77            inner: Some(stream),
78            created_at: now,
79            last_used: now,
80            pool,
81        }
82    }
83
84    /// Get the underlying stream
85    pub fn stream(&mut self) -> Option<&mut LocalSocketStream> {
86        self.last_used = Instant::now();
87        self.inner.as_mut()
88    }
89
90    /// Take ownership of the underlying stream
91    pub fn into_stream(mut self) -> Option<LocalSocketStream> {
92        self.inner.take()
93    }
94
95    /// Check if connection is still valid
96    pub fn is_valid(&self) -> bool {
97        self.inner.is_some() && self.last_used.elapsed() < self.pool.config.max_idle_time()
98    }
99
100    /// Get connection age
101    pub fn age(&self) -> Duration {
102        self.created_at.elapsed()
103    }
104
105    /// Get idle time
106    pub fn idle_time(&self) -> Duration {
107        self.last_used.elapsed()
108    }
109}
110
111impl Drop for PooledConnection {
112    fn drop(&mut self) {
113        if let Some(stream) = self.inner.take() {
114            self.pool.return_connection(stream);
115        }
116    }
117}
118
119/// Internal pool state
120struct ConnectionPoolInner {
121    name: Name<'static>,
122    config: PoolConfig,
123    connections: Mutex<VecDeque<(LocalSocketStream, Instant)>>,
124    semaphore: Semaphore,
125}
126
127impl ConnectionPoolInner {
128    fn new(name: Name<'static>, config: PoolConfig) -> Self {
129        Self {
130            name,
131            semaphore: Semaphore::new(config.max_size),
132            connections: Mutex::new(VecDeque::new()),
133            config,
134        }
135    }
136
137    async fn create_connection(&self) -> Result<LocalSocketStream> {
138        let mut last_error = None;
139        let mut delay = self.config.retry_delay();
140
141        for attempt in 0..self.config.max_retries {
142            if attempt > 0 {
143                // Exponential backoff retry delay
144                tokio::time::sleep(delay).await;
145                delay = std::cmp::min(delay * 2, Duration::from_millis(1000));
146            }
147
148            match LocalSocketStream::connect(self.name.clone()).await {
149                Ok(stream) => {
150                    debug!("Created new connection on attempt {}", attempt + 1);
151                    return Ok(stream);
152                }
153                Err(e) => {
154                    warn!("Connection attempt {} failed: {}", attempt + 1, e);
155                    last_error = Some(e);
156                }
157            }
158        }
159
160        Err(KodeBridgeError::connection(format!(
161            "Failed to create connection after {} attempts: {}",
162            self.config.max_retries,
163            last_error
164                .map(|e| e.to_string())
165                .unwrap_or_else(|| "Unknown error".to_string())
166        )))
167    }
168
169    fn get_pooled_connection(&self) -> Option<LocalSocketStream> {
170        let mut connections = self.connections.lock();
171
172        // Remove expired connections
173        let now = Instant::now();
174        while let Some((_, created_at)) = connections.front() {
175            if now.duration_since(*created_at) > self.config.max_idle_time() {
176                connections.pop_front();
177            } else {
178                break;
179            }
180        }
181
182        // Get a connection if available
183        connections.pop_front().map(|(stream, _)| {
184            trace!("Reusing pooled connection, {} remaining", connections.len());
185            stream
186        })
187    }
188
189    fn return_connection(&self, stream: LocalSocketStream) {
190        let mut connections = self.connections.lock();
191
192        // Only keep the connection if we haven't exceeded max_size
193        if connections.len() < self.config.max_size {
194            connections.push_back((stream, Instant::now()));
195            trace!("Returned connection to pool, {} total", connections.len());
196        } else {
197            trace!("Pool full, dropping connection");
198        }
199    }
200
201    async fn get_connection_with_timeout(&self) -> Result<LocalSocketStream> {
202        // Try to get a permit within the timeout
203        let permit =
204            tokio::time::timeout(self.config.connection_timeout(), self.semaphore.acquire())
205                .await
206                .map_err(|_| {
207                    KodeBridgeError::timeout(self.config.connection_timeout().as_millis() as u64)
208                })?
209                .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
210
211        // Try to get an existing connection first
212        if let Some(stream) = self.get_pooled_connection() {
213            permit.forget(); // Release the permit since we're using a pooled connection
214            return Ok(stream);
215        }
216
217        // Create a new connection
218        let stream = self.create_connection().await?;
219        permit.forget(); // Release the permit
220        Ok(stream)
221    }
222}
223
224/// High-performance connection pool for IPC connections
225#[derive(Clone)]
226pub struct ConnectionPool {
227    inner: Arc<ConnectionPoolInner>,
228}
229
230impl ConnectionPool {
231    /// Create a new connection pool
232    pub fn new(name: Name<'static>, config: PoolConfig) -> Self {
233        Self {
234            inner: Arc::new(ConnectionPoolInner::new(name, config)),
235        }
236    }
237
238    /// Create a connection pool with default configuration
239    pub fn with_default_config(name: Name<'static>) -> Self {
240        Self::new(name, PoolConfig::default())
241    }
242
243    /// Get a connection from the pool
244    pub async fn get_connection(&self) -> Result<PooledConnection> {
245        let stream = self.inner.get_connection_with_timeout().await?;
246        Ok(PooledConnection::new(stream, self.inner.clone()))
247    }
248
249    /// Get multiple connections for concurrent operations
250    pub async fn get_connections(&self, count: usize) -> Result<Vec<PooledConnection>> {
251        let mut connections = Vec::with_capacity(count);
252
253        // Use semaphore to control concurrent acquisition
254        let mut tasks = Vec::new();
255        for _ in 0..count {
256            let pool = self.clone();
257            tasks.push(tokio::spawn(async move { pool.get_connection().await }));
258        }
259
260        // Wait for all connection acquisitions to complete
261        for task in tasks {
262            match task.await {
263                Ok(Ok(conn)) => connections.push(conn),
264                Ok(Err(e)) => return Err(e),
265                Err(e) => return Err(KodeBridgeError::custom(format!("Task failed: {}", e))),
266            }
267        }
268
269        Ok(connections)
270    }
271
272    /// Get pool statistics
273    pub fn stats(&self) -> PoolStats {
274        let connections = self.inner.connections.lock();
275        PoolStats {
276            total_connections: connections.len(),
277            available_permits: self.inner.semaphore.available_permits(),
278            max_size: self.inner.config.max_size,
279        }
280    }
281
282    /// Close all pooled connections
283    pub fn close(&self) {
284        let mut connections = self.inner.connections.lock();
285        connections.clear();
286        debug!("Closed all pooled connections");
287    }
288}
289
/// Point-in-time snapshot of a pool's state.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of idle connections queued in the pool when the snapshot was taken.
    pub total_connections: usize,
    /// Semaphore permits that were available when the snapshot was taken.
    pub available_permits: usize,
    /// The pool's configured maximum size.
    pub max_size: usize,
}
297
298impl std::fmt::Display for PoolStats {
299    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300        write!(
301            f,
302            "Pool(connections: {}, permits: {}, max: {})",
303            self.total_connections, self.available_permits, self.max_size
304        )
305    }
306}