kode_bridge/
pool.rs

1use crate::errors::{KodeBridgeError, Result};
2use interprocess::local_socket::tokio::prelude::LocalSocketStream;
3use interprocess::local_socket::traits::tokio::Stream;
4use interprocess::local_socket::Name;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::Semaphore;
10use tracing::{debug, trace, warn};
11
12/// Configuration for connection pool
13#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
14pub struct PoolConfig {
15    /// Maximum number of connections in the pool
16    pub max_size: usize,
17    /// Minimum number of idle connections to maintain
18    pub min_idle: usize,
19    /// Maximum time a connection can be idle before being closed (in milliseconds)
20    pub max_idle_time_ms: u64,
21    /// Maximum time to wait for a connection from the pool (in milliseconds)
22    pub connection_timeout_ms: u64,
23    /// Time to wait between connection attempts (in milliseconds)
24    pub retry_delay_ms: u64,
25    /// Maximum number of retry attempts
26    pub max_retries: usize,
27    /// Concurrent request limit
28    pub max_concurrent_requests: usize,
29    /// Rate limiting: max requests per second
30    pub max_requests_per_second: Option<f64>,
31}
32
33impl Default for PoolConfig {
34    fn default() -> Self {
35        Self {
36            max_size: 64,                         // 增加到2的幂次,更好的内存对齐
37            min_idle: 8,                          // 减少最小空闲连接
38            max_idle_time_ms: 120_000,            // 2分钟 - 进一步减少空闲时间
39            connection_timeout_ms: 3_000,         // 减少连接超时到3秒
40            retry_delay_ms: 10,                   // 减少重试延迟到10ms
41            max_retries: 2,                       // 减少重试次数到2次
42            max_concurrent_requests: 32,          // 增加并发请求限制到2的幂次
43            max_requests_per_second: Some(100.0), // 增加速率限制
44        }
45    }
46}
47
48impl PoolConfig {
49    /// Get max idle time as Duration
50    pub fn max_idle_time(&self) -> Duration {
51        Duration::from_millis(self.max_idle_time_ms)
52    }
53
54    /// Get connection timeout as Duration
55    pub fn connection_timeout(&self) -> Duration {
56        Duration::from_millis(self.connection_timeout_ms)
57    }
58
59    /// Get retry delay as Duration
60    pub fn retry_delay(&self) -> Duration {
61        Duration::from_millis(self.retry_delay_ms)
62    }
63}
64
65/// A pooled connection wrapper
66pub struct PooledConnection {
67    inner: Option<LocalSocketStream>,
68    created_at: Instant,
69    last_used: Instant,
70    pool: Arc<ConnectionPoolInner>,
71}
72
73impl PooledConnection {
74    fn new(stream: LocalSocketStream, pool: Arc<ConnectionPoolInner>) -> Self {
75        let now = Instant::now();
76        Self {
77            inner: Some(stream),
78            created_at: now,
79            last_used: now,
80            pool,
81        }
82    }
83
84    /// Get the underlying stream
85    pub fn stream(&mut self) -> Option<&mut LocalSocketStream> {
86        self.last_used = Instant::now();
87        self.inner.as_mut()
88    }
89
90    /// Take ownership of the underlying stream
91    pub fn into_stream(mut self) -> Option<LocalSocketStream> {
92        self.inner.take()
93    }
94
95    /// Check if connection is still valid
96    pub fn is_valid(&self) -> bool {
97        self.inner.is_some() && self.last_used.elapsed() < self.pool.config.max_idle_time()
98    }
99
100    /// Get connection age
101    pub fn age(&self) -> Duration {
102        self.created_at.elapsed()
103    }
104
105    /// Get idle time
106    pub fn idle_time(&self) -> Duration {
107        self.last_used.elapsed()
108    }
109}
110
111impl Drop for PooledConnection {
112    fn drop(&mut self) {
113        if let Some(stream) = self.inner.take() {
114            self.pool.return_connection(stream);
115        }
116    }
117}
118
119/// Internal pool state with PUT request optimization
120struct ConnectionPoolInner {
121    name: Name<'static>,
122    config: PoolConfig,
123    connections: Mutex<VecDeque<(LocalSocketStream, Instant)>>,
124    semaphore: Semaphore,
125    /// 专用于PUT请求的新连接缓存
126    fresh_connections: Mutex<VecDeque<LocalSocketStream>>,
127    /// 快速路径计数器,用于避免semaphore竞争
128    active_connections: std::sync::atomic::AtomicUsize,
129}
130
131impl ConnectionPoolInner {
132    fn new(name: Name<'static>, config: PoolConfig) -> Self {
133        Self {
134            name,
135            semaphore: Semaphore::new(config.max_size),
136            connections: Mutex::new(VecDeque::new()),
137            fresh_connections: Mutex::new(VecDeque::new()),
138            active_connections: std::sync::atomic::AtomicUsize::new(0),
139            config,
140        }
141    }
142
143    /// Get a fresh connection for PUT requests, bypassing normal pool
144    async fn get_fresh_connection(&self) -> Result<LocalSocketStream> {
145        // 首先检查是否有预备的新连接
146        {
147            let mut fresh = self.fresh_connections.lock();
148            if let Some(stream) = fresh.pop_front() {
149                return Ok(stream);
150            }
151        }
152
153        // 创建新连接,使用更短的超时和优化的参数
154        let mut last_error = None;
155        for attempt in 0..2 {
156            // 只重试1次,更快失败
157            if attempt > 0 {
158                tokio::time::sleep(Duration::from_millis(10)).await; // 很短的重试延迟
159            }
160
161            match LocalSocketStream::connect(self.name.clone()).await {
162                Ok(stream) => {
163                    debug!("Created fresh connection for PUT request");
164                    return Ok(stream);
165                }
166                Err(e) => {
167                    warn!("Fresh connection attempt {} failed: {}", attempt + 1, e);
168                    last_error = Some(e);
169                }
170            }
171        }
172
173        // 如果新连接失败,回退到池化连接
174        match self.get_pooled_connection() {
175            Some(stream) => {
176                debug!("Falling back to pooled connection for PUT request");
177                Ok(stream)
178            }
179            None => Err(KodeBridgeError::connection(format!(
180                "Failed to get fresh connection and no pooled connections available: {}",
181                last_error
182                    .map(|e| e.to_string())
183                    .unwrap_or_else(|| "Unknown error".to_string())
184            ))),
185        }
186    }
187
188    /// 预热新连接池,为PUT请求做准备
189    async fn preheat_fresh_connections(&self, count: usize) {
190        let mut successful = 0;
191        for _ in 0..count {
192            match LocalSocketStream::connect(self.name.clone()).await {
193                Ok(stream) => {
194                    let mut fresh = self.fresh_connections.lock();
195                    fresh.push_back(stream);
196                    successful += 1;
197                }
198                Err(_) => break,
199            }
200        }
201        if successful > 0 {
202            debug!("Preheated {} fresh connections", successful);
203        }
204    }
205
206    async fn create_connection(&self) -> Result<LocalSocketStream> {
207        let mut last_error = None;
208        let mut delay = self.config.retry_delay();
209        let max_delay = Duration::from_millis(200); // 限制最大延迟为200ms
210
211        for attempt in 0..self.config.max_retries {
212            if attempt > 0 {
213                // 优化的指数退避,避免过长的延迟
214                tokio::time::sleep(delay).await;
215                delay = std::cmp::min(delay * 2, max_delay);
216            }
217
218            match LocalSocketStream::connect(self.name.clone()).await {
219                Ok(stream) => {
220                    debug!("Created new connection on attempt {}", attempt + 1);
221                    return Ok(stream);
222                }
223                Err(e) => {
224                    warn!("Connection attempt {} failed: {}", attempt + 1, e);
225                    last_error = Some(e);
226                }
227            }
228        }
229
230        Err(KodeBridgeError::connection(format!(
231            "Failed to get fresh connection and no pooled connections available: {}",
232            last_error
233                .map(|e| e.to_string())
234                .unwrap_or_else(|| "Unknown error".to_string())
235        )))
236    }
237
238    fn get_pooled_connection(&self) -> Option<LocalSocketStream> {
239        let mut connections = self.connections.lock();
240
241        // Remove expired connections
242        let now = Instant::now();
243        while let Some((_, created_at)) = connections.front() {
244            if now.duration_since(*created_at) > self.config.max_idle_time() {
245                connections.pop_front();
246            } else {
247                break;
248            }
249        }
250
251        // Get a connection if available
252        connections.pop_front().map(|(stream, _)| {
253            trace!("Reusing pooled connection, {} remaining", connections.len());
254            stream
255        })
256    }
257
258    fn return_connection(&self, stream: LocalSocketStream) {
259        let mut connections = self.connections.lock();
260
261        // 减少活跃连接计数
262        self.active_connections
263            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
264
265        // Only keep the connection if we haven't exceeded max_size
266        if connections.len() < self.config.max_size {
267            connections.push_back((stream, Instant::now()));
268            trace!("Returned connection to pool, {} total", connections.len());
269        } else {
270            trace!("Pool full, dropping connection");
271        }
272    }
273
274    async fn get_connection_with_timeout(&self) -> Result<LocalSocketStream> {
275        // 优化的获取连接逻辑,减少semaphore竞争
276
277        // 首先快速检查是否有可用的池化连接
278        if let Some(stream) = self.get_pooled_connection() {
279            return Ok(stream);
280        }
281
282        // 检查活跃连接数,避免不必要的semaphore等待
283        let active_count = self
284            .active_connections
285            .load(std::sync::atomic::Ordering::Relaxed);
286        if active_count >= self.config.max_size {
287            // 快速失败路径,避免长时间等待
288            return Err(KodeBridgeError::custom("Connection pool exhausted"));
289        }
290
291        // 使用更短的超时来获取许可
292        let timeout = std::cmp::min(self.config.connection_timeout(), Duration::from_millis(500));
293        let permit = tokio::time::timeout(timeout, self.semaphore.acquire())
294            .await
295            .map_err(|_| KodeBridgeError::timeout(timeout.as_millis() as u64))?
296            .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
297
298        // 再次检查池化连接(避免不必要的连接创建)
299        if let Some(stream) = self.get_pooled_connection() {
300            permit.forget(); // Release the permit since we're using a pooled connection
301            return Ok(stream);
302        }
303
304        // 增加活跃连接计数
305        self.active_connections
306            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
307
308        // 创建新连接
309        match self.create_connection().await {
310            Ok(stream) => {
311                permit.forget(); // Release the permit
312                Ok(stream)
313            }
314            Err(e) => {
315                // 出错时减少活跃连接计数
316                self.active_connections
317                    .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
318                Err(e)
319            }
320        }
321    }
322
323    /// Get a fresh connection optimized for PUT requests
324    async fn get_fresh_connection_with_timeout(&self) -> Result<LocalSocketStream> {
325        // 对PUT请求使用专门优化的逻辑
326        let permit = tokio::time::timeout(Duration::from_millis(100), self.semaphore.acquire())
327            .await
328            .map_err(|_| KodeBridgeError::timeout(100))?
329            .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
330
331        // Get fresh connection directly with optimized parameters
332        let stream = self.get_fresh_connection().await?;
333        permit.forget(); // Release the permit
334        Ok(stream)
335    }
336}
337
338/// High-performance connection pool for IPC connections
339#[derive(Clone)]
340pub struct ConnectionPool {
341    inner: Arc<ConnectionPoolInner>,
342}
343
344impl ConnectionPool {
345    /// Create a new connection pool
346    pub fn new(name: Name<'static>, config: PoolConfig) -> Self {
347        Self {
348            inner: Arc::new(ConnectionPoolInner::new(name, config)),
349        }
350    }
351
352    /// Create a connection pool with default configuration
353    pub fn with_default_config(name: Name<'static>) -> Self {
354        Self::new(name, PoolConfig::default())
355    }
356
357    /// Get a connection from the pool
358    pub async fn get_connection(&self) -> Result<PooledConnection> {
359        let stream = self.inner.get_connection_with_timeout().await?;
360        Ok(PooledConnection::new(stream, self.inner.clone()))
361    }
362
363    /// Get a fresh connection optimized for PUT requests
364    pub async fn get_fresh_connection(&self) -> Result<PooledConnection> {
365        let stream = self.inner.get_fresh_connection_with_timeout().await?;
366        Ok(PooledConnection::new(stream, self.inner.clone()))
367    }
368
369    /// Preheat fresh connections for better PUT performance
370    pub async fn preheat_for_puts(&self, count: usize) {
371        self.inner.preheat_fresh_connections(count).await;
372    }
373
374    /// Get multiple connections for concurrent operations
375    pub async fn get_connections(&self, count: usize) -> Result<Vec<PooledConnection>> {
376        let mut connections = Vec::with_capacity(count);
377
378        // Use semaphore to control concurrent acquisition
379        let mut tasks = Vec::new();
380        for _ in 0..count {
381            let pool = self.clone();
382            tasks.push(tokio::spawn(async move { pool.get_connection().await }));
383        }
384
385        // Wait for all connection acquisitions to complete
386        for task in tasks {
387            match task.await {
388                Ok(Ok(conn)) => connections.push(conn),
389                Ok(Err(e)) => return Err(e),
390                Err(e) => return Err(KodeBridgeError::custom(format!("Task failed: {}", e))),
391            }
392        }
393
394        Ok(connections)
395    }
396
397    /// Get pool statistics
398    pub fn stats(&self) -> PoolStats {
399        let connections = self.inner.connections.lock();
400        let active_count = self
401            .inner
402            .active_connections
403            .load(std::sync::atomic::Ordering::Relaxed);
404        PoolStats {
405            total_connections: connections.len(),
406            available_permits: self.inner.semaphore.available_permits(),
407            max_size: self.inner.config.max_size,
408            active_connections: active_count,
409        }
410    }
411
412    /// Close all pooled connections
413    pub fn close(&self) {
414        let mut connections = self.inner.connections.lock();
415        connections.clear();
416        debug!("Closed all pooled connections");
417    }
418}
419
420/// Pool statistics
421#[derive(Debug, Clone)]
422pub struct PoolStats {
423    pub total_connections: usize,
424    pub available_permits: usize,
425    pub max_size: usize,
426    pub active_connections: usize,
427}
428
429impl std::fmt::Display for PoolStats {
430    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431        write!(
432            f,
433            "Pool(connections: {}, active: {}, permits: {}, max: {})",
434            self.total_connections, self.active_connections, self.available_permits, self.max_size
435        )
436    }
437}