kode_bridge/
pool.rs

1use crate::errors::{KodeBridgeError, Result};
2use interprocess::local_socket::tokio::prelude::LocalSocketStream;
3use interprocess::local_socket::traits::tokio::Stream as _;
4use interprocess::local_socket::Name;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::Semaphore;
10use tracing::{debug, trace, warn};
11
12/// Configuration for connection pool
13#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
14pub struct PoolConfig {
15    /// Maximum number of connections in the pool
16    pub max_size: usize,
17    /// Minimum number of idle connections to maintain
18    pub min_idle: usize,
19    /// Maximum time a connection can be idle before being closed (in milliseconds)
20    pub max_idle_time_ms: u64,
21    /// Maximum time to wait for a connection from the pool (in milliseconds)
22    pub connection_timeout_ms: u64,
23    /// Time to wait between connection attempts (in milliseconds)
24    pub retry_delay_ms: u64,
25    /// Maximum number of retry attempts
26    pub max_retries: usize,
27    /// Concurrent request limit
28    pub max_concurrent_requests: usize,
29    /// Rate limiting: max requests per second
30    pub max_requests_per_second: Option<f64>,
31}
32
33impl Default for PoolConfig {
34    fn default() -> Self {
35        Self {
36            max_size: 64,                         // 增加到2的幂次,更好的内存对齐
37            min_idle: 8,                          // 减少最小空闲连接
38            max_idle_time_ms: 120_000,            // 2分钟 - 进一步减少空闲时间
39            connection_timeout_ms: 3_000,         // 减少连接超时到3秒
40            retry_delay_ms: 10,                   // 减少重试延迟到10ms
41            max_retries: 2,                       // 减少重试次数到2次
42            max_concurrent_requests: 32,          // 增加并发请求限制到2的幂次
43            max_requests_per_second: Some(100.0), // 增加速率限制
44        }
45    }
46}
47
48impl PoolConfig {
49    /// Get max idle time as Duration
50    pub const fn max_idle_time(&self) -> Duration {
51        Duration::from_millis(self.max_idle_time_ms)
52    }
53
54    /// Get connection timeout as Duration
55    pub const fn connection_timeout(&self) -> Duration {
56        Duration::from_millis(self.connection_timeout_ms)
57    }
58
59    /// Get retry delay as Duration
60    pub const fn retry_delay(&self) -> Duration {
61        Duration::from_millis(self.retry_delay_ms)
62    }
63}
64
65/// A pooled connection wrapper
66pub struct PooledConnection {
67    inner: Option<LocalSocketStream>,
68    created_at: Instant,
69    last_used: Instant,
70    pool: Arc<ConnectionPoolInner>,
71}
72
73impl PooledConnection {
74    fn new(stream: LocalSocketStream, pool: Arc<ConnectionPoolInner>) -> Self {
75        let now = Instant::now();
76        Self {
77            inner: Some(stream),
78            created_at: now,
79            last_used: now,
80            pool,
81        }
82    }
83
84    /// Get the underlying stream
85    pub fn stream(&mut self) -> Option<&mut LocalSocketStream> {
86        self.last_used = Instant::now();
87        self.inner.as_mut()
88    }
89
90    /// Take ownership of the underlying stream
91    pub fn into_stream(mut self) -> Option<LocalSocketStream> {
92        self.inner.take()
93    }
94
95    /// Check if connection is still valid
96    pub fn is_valid(&self) -> bool {
97        self.inner.is_some() && self.last_used.elapsed() < self.pool.config.max_idle_time()
98    }
99
100    /// Get connection age
101    pub fn age(&self) -> Duration {
102        self.created_at.elapsed()
103    }
104
105    /// Get idle time
106    pub fn idle_time(&self) -> Duration {
107        self.last_used.elapsed()
108    }
109}
110
111impl Drop for PooledConnection {
112    fn drop(&mut self) {
113        if let Some(stream) = self.inner.take() {
114            self.pool.return_connection(stream);
115        }
116    }
117}
118
119/// Internal pool state with PUT request optimization
120struct ConnectionPoolInner {
121    name: Name<'static>,
122    config: PoolConfig,
123    connections: Mutex<VecDeque<(LocalSocketStream, Instant)>>,
124    semaphore: Semaphore,
125    /// 专用于PUT请求的新连接缓存
126    fresh_connections: Mutex<VecDeque<LocalSocketStream>>,
127    /// 快速路径计数器,用于避免semaphore竞争
128    active_connections: std::sync::atomic::AtomicUsize,
129}
130
131impl ConnectionPoolInner {
132    fn new(name: Name<'static>, config: PoolConfig) -> Self {
133        Self {
134            name,
135            semaphore: Semaphore::new(config.max_size),
136            connections: Mutex::new(VecDeque::new()),
137            fresh_connections: Mutex::new(VecDeque::new()),
138            active_connections: std::sync::atomic::AtomicUsize::new(0),
139            config,
140        }
141    }
142
143    /// Get a fresh connection for PUT requests, bypassing normal pool
144    async fn get_fresh_connection(&self) -> Result<LocalSocketStream> {
145        // 首先检查是否有预备的新连接
146        {
147            let mut fresh = self.fresh_connections.lock();
148            if let Some(stream) = fresh.pop_front() {
149                return Ok(stream);
150            }
151        }
152
153        // 创建新连接,使用更短的超时和优化的参数
154        let mut last_error = None;
155        for attempt in 0..2 {
156            // 只重试1次,更快失败
157            if attempt > 0 {
158                tokio::time::sleep(Duration::from_millis(10)).await; // 很短的重试延迟
159            }
160
161            match LocalSocketStream::connect(self.name.clone()).await {
162                Ok(stream) => {
163                    debug!("Created fresh connection for PUT request");
164                    return Ok(stream);
165                }
166                Err(e) => {
167                    warn!("Fresh connection attempt {} failed: {}", attempt + 1, e);
168                    last_error = Some(e);
169                }
170            }
171        }
172
173        // 如果新连接失败,回退到池化连接
174        match self.get_pooled_connection() {
175            Some(stream) => {
176                debug!("Falling back to pooled connection for PUT request");
177                Ok(stream)
178            }
179            None => Err(KodeBridgeError::connection(format!(
180                "Failed to get fresh connection and no pooled connections available: {}",
181                last_error
182                    .map(|e| e.to_string())
183                    .unwrap_or_else(|| "Unknown error".to_string())
184            ))),
185        }
186    }
187
188    /// 预热新连接池,为PUT请求做准备
189    async fn preheat_fresh_connections(&self, count: usize) {
190        let mut successful = 0;
191        for _ in 0..count {
192            match LocalSocketStream::connect(self.name.clone()).await {
193                Ok(stream) => {
194                    self.fresh_connections.lock().push_back(stream);
195                    successful += 1;
196                }
197                Err(_) => break,
198            }
199        }
200        if successful > 0 {
201            debug!("Preheated {} fresh connections", successful);
202        }
203    }
204
205    async fn create_connection(&self) -> Result<LocalSocketStream> {
206        let mut last_error = None;
207        let mut delay = self.config.retry_delay();
208        let max_delay = Duration::from_millis(200); // 限制最大延迟为200ms
209
210        for attempt in 0..self.config.max_retries {
211            if attempt > 0 {
212                // 优化的指数退避,避免过长的延迟
213                tokio::time::sleep(delay).await;
214                delay = std::cmp::min(delay * 2, max_delay);
215            }
216
217            match LocalSocketStream::connect(self.name.clone()).await {
218                Ok(stream) => {
219                    debug!("Created new connection on attempt {}", attempt + 1);
220                    return Ok(stream);
221                }
222                Err(e) => {
223                    warn!("Connection attempt {} failed: {}", attempt + 1, e);
224                    last_error = Some(e);
225                }
226            }
227        }
228
229        Err(KodeBridgeError::connection(format!(
230            "Failed to get fresh connection and no pooled connections available: {}",
231            last_error
232                .map(|e| e.to_string())
233                .unwrap_or_else(|| "Unknown error".to_string())
234        )))
235    }
236
237    fn get_pooled_connection(&self) -> Option<LocalSocketStream> {
238        let mut connections = self.connections.lock();
239
240        // Remove expired connections
241        let now = Instant::now();
242        while let Some((_, created_at)) = connections.front() {
243            if now.duration_since(*created_at) > self.config.max_idle_time() {
244                connections.pop_front();
245            } else {
246                break;
247            }
248        }
249
250        // Get a connection if available
251        connections.pop_front().map(|(stream, _)| {
252            trace!("Reusing pooled connection, {} remaining", connections.len());
253            stream
254        })
255    }
256
257    fn return_connection(&self, stream: LocalSocketStream) {
258        // 减少活跃连接计数
259        self.active_connections
260            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
261
262        let (kept, pool_size) = {
263            let mut connections = self.connections.lock();
264
265            // Only keep the connection if we haven't exceeded max_size
266            if connections.len() < self.config.max_size {
267                connections.push_back((stream, Instant::now()));
268                (true, connections.len())
269            } else {
270                (false, connections.len())
271            }
272        };
273
274        if kept {
275            trace!("Returned connection to pool, {} total", pool_size);
276        } else {
277            trace!("Pool full, dropping connection");
278        }
279    }
280
281    async fn get_connection_with_timeout(&self) -> Result<LocalSocketStream> {
282        // 优化的获取连接逻辑,减少semaphore竞争
283
284        // 首先快速检查是否有可用的池化连接
285        if let Some(stream) = self.get_pooled_connection() {
286            return Ok(stream);
287        }
288
289        // 检查活跃连接数,避免不必要的semaphore等待
290        let active_count = self
291            .active_connections
292            .load(std::sync::atomic::Ordering::Relaxed);
293        if active_count >= self.config.max_size {
294            // 快速失败路径,避免长时间等待
295            return Err(KodeBridgeError::custom("Connection pool exhausted"));
296        }
297
298        // 使用更短的超时来获取许可
299        let timeout = std::cmp::min(self.config.connection_timeout(), Duration::from_millis(500));
300        let permit = tokio::time::timeout(timeout, self.semaphore.acquire())
301            .await
302            .map_err(|_| KodeBridgeError::timeout(timeout.as_millis() as u64))?
303            .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
304
305        // 再次检查池化连接(避免不必要的连接创建)
306        if let Some(stream) = self.get_pooled_connection() {
307            drop(permit);
308            return Ok(stream);
309        }
310
311        // 增加活跃连接计数
312        self.active_connections
313            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
314
315        // 创建新连接
316        match self.create_connection().await {
317            Ok(stream) => {
318                drop(permit);
319                Ok(stream)
320            }
321            Err(e) => {
322                // 出错时减少活跃连接计数
323                self.active_connections
324                    .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
325                drop(permit);
326                Err(e)
327            }
328        }
329    }
330
331    /// Get a fresh connection optimized for PUT requests
332    async fn get_fresh_connection_with_timeout(&self) -> Result<LocalSocketStream> {
333        // 对PUT请求使用专门优化的逻辑
334        let permit = tokio::time::timeout(Duration::from_millis(100), self.semaphore.acquire())
335            .await
336            .map_err(|_| KodeBridgeError::timeout(100))?
337            .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
338
339        // Get fresh connection directly with optimized parameters
340        let stream = self.get_fresh_connection().await?;
341        drop(permit);
342        Ok(stream)
343    }
344}
345
346/// High-performance connection pool for IPC connections
347#[derive(Clone)]
348pub struct ConnectionPool {
349    inner: Arc<ConnectionPoolInner>,
350}
351
352impl ConnectionPool {
353    /// Create a new connection pool
354    pub fn new(name: Name<'static>, config: PoolConfig) -> Self {
355        Self {
356            inner: Arc::new(ConnectionPoolInner::new(name, config)),
357        }
358    }
359
360    /// Create a connection pool with default configuration
361    pub fn with_default_config(name: Name<'static>) -> Self {
362        Self::new(name, PoolConfig::default())
363    }
364
365    /// Get a connection from the pool
366    pub async fn get_connection(&self) -> Result<PooledConnection> {
367        let stream = self.inner.get_connection_with_timeout().await?;
368        Ok(PooledConnection::new(stream, Arc::clone(&self.inner)))
369    }
370
371    /// Get a fresh connection optimized for PUT requests
372    pub async fn get_fresh_connection(&self) -> Result<PooledConnection> {
373        let stream = self.inner.get_fresh_connection_with_timeout().await?;
374        Ok(PooledConnection::new(stream, Arc::clone(&self.inner)))
375    }
376
377    /// Preheat fresh connections for better PUT performance
378    pub async fn preheat_for_puts(&self, count: usize) {
379        self.inner.preheat_fresh_connections(count).await;
380    }
381
382    /// Get multiple connections for concurrent operations
383    pub async fn get_connections(&self, count: usize) -> Result<Vec<PooledConnection>> {
384        let mut connections = Vec::with_capacity(count);
385
386        // Use semaphore to control concurrent acquisition
387        let mut tasks = Vec::new();
388        for _ in 0..count {
389            let pool = self.clone();
390            tasks.push(tokio::spawn(async move { pool.get_connection().await }));
391        }
392
393        // Wait for all connection acquisitions to complete
394        for task in tasks {
395            match task.await {
396                Ok(Ok(conn)) => connections.push(conn),
397                Ok(Err(e)) => return Err(e),
398                Err(e) => return Err(KodeBridgeError::custom(format!("Task failed: {}", e))),
399            }
400        }
401
402        Ok(connections)
403    }
404
405    /// Get pool statistics
406    pub fn stats(&self) -> PoolStats {
407        let connections = self.inner.connections.lock();
408        let active_count = self
409            .inner
410            .active_connections
411            .load(std::sync::atomic::Ordering::Relaxed);
412        PoolStats {
413            total_connections: connections.len(),
414            available_permits: self.inner.semaphore.available_permits(),
415            max_size: self.inner.config.max_size,
416            active_connections: active_count,
417        }
418    }
419
420    /// Close all pooled connections
421    pub fn close(&self) {
422        self.inner.connections.lock().clear();
423        debug!("Closed all pooled connections");
424    }
425}
426
/// Point-in-time counters describing a connection pool.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of connections reported under the `connections` label.
    pub total_connections: usize,
    /// Number of semaphore permits that were available.
    pub available_permits: usize,
    /// Configured maximum pool size.
    pub max_size: usize,
    /// Value of the active-connection counter.
    pub active_connections: usize,
}

impl std::fmt::Display for PoolStats {
    /// Renders the counters as a single line:
    /// `Pool(connections: _, active: _, permits: _, max: _)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            total_connections,
            available_permits,
            max_size,
            active_connections,
        } = self;
        write!(
            f,
            "Pool(connections: {}, active: {}, permits: {}, max: {})",
            total_connections, active_connections, available_permits, max_size
        )
    }
}