kode_bridge/
pool.rs

1use crate::errors::{KodeBridgeError, Result};
2use interprocess::local_socket::Name;
3use interprocess::local_socket::tokio::prelude::LocalSocketStream;
4use interprocess::local_socket::traits::tokio::Stream;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::Semaphore;
10use tracing::{debug, trace, warn};
11
/// Configuration for connection pool
///
/// All time values are stored as integer milliseconds so the struct can be
/// serialized and deserialized directly; the accessor methods on this type
/// convert them to [`Duration`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PoolConfig {
    /// Maximum number of connections in the pool
    ///
    /// Used both as the initial permit count of the pool's semaphore and as
    /// the cap on how many idle connections are kept when they are returned.
    pub max_size: usize,
    /// Minimum number of idle connections to maintain
    ///
    /// NOTE(review): no code visible in this file reads this field — confirm
    /// whether it is consumed elsewhere.
    pub min_idle: usize,
    /// Maximum time a connection can be idle before being closed (in milliseconds)
    ///
    /// Idle connections older than this are discarded the next time a
    /// connection is requested from the pool.
    pub max_idle_time_ms: u64,
    /// Maximum time to wait for a connection from the pool (in milliseconds)
    ///
    /// This bounds the wait for a semaphore permit; the time spent opening a
    /// new connection afterwards is not covered by this timeout.
    pub connection_timeout_ms: u64,
    /// Time to wait between connection attempts (in milliseconds)
    pub retry_delay_ms: u64,
    /// Maximum number of retry attempts
    ///
    /// The connection loop counts the first attempt as well, so this is the
    /// total number of attempts made before giving up.
    pub max_retries: usize,
}
28
29impl Default for PoolConfig {
30    fn default() -> Self {
31        Self {
32            max_size: 10,
33            min_idle: 2,
34            max_idle_time_ms: 300_000, // 5 minutes
35            connection_timeout_ms: 30_000,
36            retry_delay_ms: 100,
37            max_retries: 3,
38        }
39    }
40}
41
42impl PoolConfig {
43    /// Get max idle time as Duration
44    pub fn max_idle_time(&self) -> Duration {
45        Duration::from_millis(self.max_idle_time_ms)
46    }
47
48    /// Get connection timeout as Duration
49    pub fn connection_timeout(&self) -> Duration {
50        Duration::from_millis(self.connection_timeout_ms)
51    }
52
53    /// Get retry delay as Duration
54    pub fn retry_delay(&self) -> Duration {
55        Duration::from_millis(self.retry_delay_ms)
56    }
57}
58
/// A pooled connection wrapper
///
/// Wraps a stream checked out of a pool. When the wrapper is dropped while it
/// still holds the stream, the stream is handed back to the owning pool.
pub struct PooledConnection {
    /// The checked-out stream; `None` once it has been taken out of the wrapper.
    inner: Option<LocalSocketStream>,
    /// When this wrapper was constructed. A reused stream gets a fresh wrapper,
    /// so this is not necessarily when the underlying socket was opened.
    created_at: Instant,
    /// Last time the stream was borrowed through `stream()`; starts equal to
    /// `created_at`.
    last_used: Instant,
    /// Shared pool state the stream is returned to.
    pool: Arc<ConnectionPoolInner>,
}
66
67impl PooledConnection {
68    fn new(stream: LocalSocketStream, pool: Arc<ConnectionPoolInner>) -> Self {
69        let now = Instant::now();
70        Self {
71            inner: Some(stream),
72            created_at: now,
73            last_used: now,
74            pool,
75        }
76    }
77
78    /// Get the underlying stream
79    pub fn stream(&mut self) -> Option<&mut LocalSocketStream> {
80        self.last_used = Instant::now();
81        self.inner.as_mut()
82    }
83
84    /// Take ownership of the underlying stream
85    pub fn into_stream(mut self) -> Option<LocalSocketStream> {
86        self.inner.take()
87    }
88
89    /// Check if connection is still valid
90    pub fn is_valid(&self) -> bool {
91        self.inner.is_some() && self.last_used.elapsed() < self.pool.config.max_idle_time()
92    }
93
94    /// Get connection age
95    pub fn age(&self) -> Duration {
96        self.created_at.elapsed()
97    }
98
99    /// Get idle time
100    pub fn idle_time(&self) -> Duration {
101        self.last_used.elapsed()
102    }
103}
104
105impl Drop for PooledConnection {
106    fn drop(&mut self) {
107        if let Some(stream) = self.inner.take() {
108            self.pool.return_connection(stream);
109        }
110    }
111}
112
/// Internal pool state
///
/// Shared behind an `Arc` by the public pool handle and by every checked-out
/// connection wrapper.
struct ConnectionPoolInner {
    /// Socket name every new connection is opened against.
    name: Name<'static>,
    /// Settings supplied when the pool was built.
    config: PoolConfig,
    /// Idle connections, each paired with the instant it was returned to the
    /// pool. New entries are pushed at the back, so the front is the oldest.
    connections: Mutex<VecDeque<(LocalSocketStream, Instant)>>,
    /// Created with `config.max_size` permits; a permit is acquired before a
    /// connection is handed out.
    semaphore: Semaphore,
}
120
121impl ConnectionPoolInner {
122    fn new(name: Name<'static>, config: PoolConfig) -> Self {
123        Self {
124            name,
125            semaphore: Semaphore::new(config.max_size),
126            connections: Mutex::new(VecDeque::new()),
127            config,
128        }
129    }
130
131    async fn create_connection(&self) -> Result<LocalSocketStream> {
132        let mut last_error = None;
133
134        for attempt in 0..self.config.max_retries {
135            if attempt > 0 {
136                tokio::time::sleep(self.config.retry_delay()).await;
137            }
138
139            match LocalSocketStream::connect(self.name.clone()).await {
140                Ok(stream) => {
141                    debug!("Created new connection on attempt {}", attempt + 1);
142                    return Ok(stream);
143                }
144                Err(e) => {
145                    warn!("Connection attempt {} failed: {}", attempt + 1, e);
146                    last_error = Some(e);
147                }
148            }
149        }
150
151        Err(KodeBridgeError::connection(format!(
152            "Failed to create connection after {} attempts: {}",
153            self.config.max_retries,
154            last_error.unwrap()
155        )))
156    }
157
158    fn get_pooled_connection(&self) -> Option<LocalSocketStream> {
159        let mut connections = self.connections.lock();
160
161        // Remove expired connections
162        let now = Instant::now();
163        while let Some((_, created_at)) = connections.front() {
164            if now.duration_since(*created_at) > self.config.max_idle_time() {
165                connections.pop_front();
166            } else {
167                break;
168            }
169        }
170
171        // Get a connection if available
172        connections.pop_front().map(|(stream, _)| {
173            trace!("Reusing pooled connection, {} remaining", connections.len());
174            stream
175        })
176    }
177
178    fn return_connection(&self, stream: LocalSocketStream) {
179        let mut connections = self.connections.lock();
180
181        // Only keep the connection if we haven't exceeded max_size
182        if connections.len() < self.config.max_size {
183            connections.push_back((stream, Instant::now()));
184            trace!("Returned connection to pool, {} total", connections.len());
185        } else {
186            trace!("Pool full, dropping connection");
187        }
188    }
189
190    async fn get_connection_with_timeout(&self) -> Result<LocalSocketStream> {
191        // Try to get a permit within the timeout
192        let permit =
193            tokio::time::timeout(self.config.connection_timeout(), self.semaphore.acquire())
194                .await
195                .map_err(|_| {
196                    KodeBridgeError::timeout(self.config.connection_timeout().as_millis() as u64)
197                })?
198                .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
199
200        // Try to get an existing connection first
201        if let Some(stream) = self.get_pooled_connection() {
202            permit.forget(); // Release the permit since we're using a pooled connection
203            return Ok(stream);
204        }
205
206        // Create a new connection
207        let stream = self.create_connection().await?;
208        permit.forget(); // Release the permit
209        Ok(stream)
210    }
211}
212
/// High-performance connection pool for IPC connections
///
/// Cloning the handle is cheap: every clone refers to the same shared pool
/// state through an `Arc`.
#[derive(Clone)]
pub struct ConnectionPool {
    /// Shared state (configuration, idle queue, semaphore).
    inner: Arc<ConnectionPoolInner>,
}
218
219impl ConnectionPool {
220    /// Create a new connection pool
221    pub fn new(name: Name<'static>, config: PoolConfig) -> Self {
222        Self {
223            inner: Arc::new(ConnectionPoolInner::new(name, config)),
224        }
225    }
226
227    /// Create a connection pool with default configuration
228    pub fn with_default_config(name: Name<'static>) -> Self {
229        Self::new(name, PoolConfig::default())
230    }
231
232    /// Get a connection from the pool
233    pub async fn get_connection(&self) -> Result<PooledConnection> {
234        let stream = self.inner.get_connection_with_timeout().await?;
235        Ok(PooledConnection::new(stream, self.inner.clone()))
236    }
237
238    /// Get pool statistics
239    pub fn stats(&self) -> PoolStats {
240        let connections = self.inner.connections.lock();
241        PoolStats {
242            total_connections: connections.len(),
243            available_permits: self.inner.semaphore.available_permits(),
244            max_size: self.inner.config.max_size,
245        }
246    }
247
248    /// Close all pooled connections
249    pub fn close(&self) {
250        let mut connections = self.inner.connections.lock();
251        connections.clear();
252        debug!("Closed all pooled connections");
253    }
254}
255
/// Point-in-time snapshot of a pool's counters.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Connection count reported by the pool.
    pub total_connections: usize,
    /// Semaphore permits that were free when the snapshot was taken.
    pub available_permits: usize,
    /// Configured maximum pool size.
    pub max_size: usize,
}

impl std::fmt::Display for PoolStats {
    /// Renders the snapshot as `Pool(connections: N, permits: N, max: N)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            total_connections,
            available_permits,
            max_size,
        } = self;
        f.write_fmt(format_args!(
            "Pool(connections: {}, permits: {}, max: {})",
            total_connections, available_permits, max_size
        ))
    }
}