Skip to main content

kode_bridge/
pool.rs

1use crate::errors::{KodeBridgeError, Result};
2use interprocess::local_socket::tokio::prelude::LocalSocketStream;
3use interprocess::local_socket::traits::tokio::Stream as _;
4use interprocess::local_socket::Name;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::{OwnedSemaphorePermit, Semaphore};
10use tracing::{debug, trace, warn};
11
12/// Configuration for connection pool
13#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
14pub struct PoolConfig {
15    /// Maximum number of connections in the pool
16    pub max_size: usize,
17    /// Minimum number of idle connections to maintain
18    pub min_idle: usize,
19    /// Maximum time a connection can be idle before being closed (in milliseconds)
20    pub max_idle_time_ms: u64,
21    /// Maximum time to wait for a connection from the pool (in milliseconds)
22    pub connection_timeout_ms: u64,
23    /// Time to wait between connection attempts (in milliseconds)
24    pub retry_delay_ms: u64,
25    /// Maximum number of retry attempts
26    pub max_retries: usize,
27    /// Concurrent request limit
28    pub max_concurrent_requests: usize,
29    /// Rate limiting: max requests per second
30    pub max_requests_per_second: Option<f64>,
31}
32
33impl Default for PoolConfig {
34    fn default() -> Self {
35        Self {
36            max_size: 64,                 // 增加到2的幂次,更好的内存对齐
37            min_idle: 8,                  // 减少最小空闲连接
38            max_idle_time_ms: 120_000,    // 2分钟 - 进一步减少空闲时间
39            connection_timeout_ms: 3_000, // 减少连接超时到3秒
40            retry_delay_ms: 10,           // 减少重试延迟到10ms
41            max_retries: 2,               // 减少重试次数到2次
42            max_concurrent_requests: 32,
43            max_requests_per_second: None,
44        }
45    }
46}
47
48impl PoolConfig {
49    /// Get max idle time as Duration
50    pub const fn max_idle_time(&self) -> Duration {
51        Duration::from_millis(self.max_idle_time_ms)
52    }
53
54    /// Get connection timeout as Duration
55    pub const fn connection_timeout(&self) -> Duration {
56        Duration::from_millis(self.connection_timeout_ms)
57    }
58
59    /// Get retry delay as Duration
60    pub const fn retry_delay(&self) -> Duration {
61        Duration::from_millis(self.retry_delay_ms)
62    }
63}
64
65/// A pooled connection wrapper
66pub struct PooledConnection {
67    inner: Option<LocalSocketStream>,
68    permit: Option<OwnedSemaphorePermit>,
69    created_at: Instant,
70    last_used: Instant,
71    reusable: bool,
72    pool: Arc<ConnectionPoolInner>,
73}
74
75impl PooledConnection {
76    fn new(stream: LocalSocketStream, permit: OwnedSemaphorePermit, pool: Arc<ConnectionPoolInner>) -> Self {
77        let now = Instant::now();
78        Self {
79            inner: Some(stream),
80            permit: Some(permit),
81            created_at: now,
82            last_used: now,
83            reusable: true,
84            pool,
85        }
86    }
87
88    /// Get the underlying stream
89    pub fn stream(&mut self) -> Option<&mut LocalSocketStream> {
90        self.last_used = Instant::now();
91        self.inner.as_mut()
92    }
93
94    /// Take ownership of the underlying stream
95    pub fn into_stream(mut self) -> Option<LocalSocketStream> {
96        self.reusable = false;
97        if let Some(permit) = self.permit.take() {
98            self.pool
99                .active_connections
100                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
101            drop(permit);
102        }
103        self.inner.take()
104    }
105
106    /// Mark the connection as broken so it is not returned to the pool.
107    pub const fn invalidate(&mut self) {
108        self.reusable = false;
109    }
110
111    /// Check if connection is still valid
112    pub fn is_valid(&self) -> bool {
113        self.inner.is_some() && self.last_used.elapsed() < self.pool.config.max_idle_time()
114    }
115
116    /// Get connection age
117    pub fn age(&self) -> Duration {
118        self.created_at.elapsed()
119    }
120
121    /// Get idle time
122    pub fn idle_time(&self) -> Duration {
123        self.last_used.elapsed()
124    }
125}
126
127impl Drop for PooledConnection {
128    fn drop(&mut self) {
129        if let Some(stream) = self.inner.take() {
130            if let Some(permit) = self.permit.take() {
131                self.pool.return_connection(stream, permit, self.reusable);
132            }
133        }
134    }
135}
136
137struct IdleConnection {
138    stream: LocalSocketStream,
139    last_used: Instant,
140    permit: OwnedSemaphorePermit,
141}
142
143/// Internal pool state with PUT request optimization
144struct ConnectionPoolInner {
145    name: Name<'static>,
146    config: PoolConfig,
147    connections: Mutex<VecDeque<IdleConnection>>,
148    semaphore: Arc<Semaphore>,
149    /// Number of checked-out connections currently in use.
150    active_connections: std::sync::atomic::AtomicUsize,
151}
152
153impl ConnectionPoolInner {
154    fn new(name: Name<'static>, config: PoolConfig) -> Self {
155        Self {
156            name,
157            semaphore: Arc::new(Semaphore::new(config.max_size)),
158            connections: Mutex::new(VecDeque::new()),
159            active_connections: std::sync::atomic::AtomicUsize::new(0),
160            config,
161        }
162    }
163
164    /// Get a fresh connection for PUT requests, bypassing normal pool
165    async fn get_fresh_connection(&self) -> Result<LocalSocketStream> {
166        let mut last_error = None;
167        for attempt in 0..2 {
168            if attempt > 0 {
169                tokio::time::sleep(Duration::from_millis(10)).await;
170            }
171
172            match LocalSocketStream::connect(self.name.clone()).await {
173                Ok(stream) => {
174                    debug!("Created fresh connection for PUT request");
175                    return Ok(stream);
176                }
177                Err(e) => {
178                    warn!("Fresh connection attempt {} failed: {}", attempt + 1, e);
179                    last_error = Some(e);
180                }
181            }
182        }
183
184        Err(KodeBridgeError::connection(format!(
185            "Failed to create fresh connection: {}",
186            last_error
187                .map(|e| e.to_string())
188                .unwrap_or_else(|| "Unknown error".to_string())
189        )))
190    }
191
192    /// 预热新连接池,为PUT请求做准备
193    async fn preheat_fresh_connections(&self, count: usize) {
194        let mut successful = 0;
195        for _ in 0..count {
196            let Ok(permit) = Arc::clone(&self.semaphore).try_acquire_owned() else {
197                break;
198            };
199
200            match LocalSocketStream::connect(self.name.clone()).await {
201                Ok(stream) => {
202                    self.connections.lock().push_back(IdleConnection {
203                        stream,
204                        last_used: Instant::now(),
205                        permit,
206                    });
207                    successful += 1;
208                }
209                Err(_) => {
210                    drop(permit);
211                    break;
212                }
213            }
214        }
215        if successful > 0 {
216            debug!("Preheated {} fresh connections", successful);
217        }
218    }
219
220    async fn create_connection(&self) -> Result<LocalSocketStream> {
221        let mut last_error = None;
222        let mut delay = self.config.retry_delay();
223        let max_delay = Duration::from_millis(200); // 限制最大延迟为200ms
224
225        for attempt in 0..self.config.max_retries {
226            if attempt > 0 {
227                // 优化的指数退避,避免过长的延迟
228                tokio::time::sleep(delay).await;
229                delay = std::cmp::min(delay * 2, max_delay);
230            }
231
232            match LocalSocketStream::connect(self.name.clone()).await {
233                Ok(stream) => {
234                    debug!("Created new connection on attempt {}", attempt + 1);
235                    return Ok(stream);
236                }
237                Err(e) => {
238                    warn!("Connection attempt {} failed: {}", attempt + 1, e);
239                    last_error = Some(e);
240                }
241            }
242        }
243
244        Err(KodeBridgeError::connection(format!(
245            "Failed to get fresh connection and no pooled connections available: {}",
246            last_error
247                .map(|e| e.to_string())
248                .unwrap_or_else(|| "Unknown error".to_string())
249        )))
250    }
251
252    fn get_pooled_connection(&self) -> Option<(LocalSocketStream, OwnedSemaphorePermit)> {
253        let mut connections = self.connections.lock();
254
255        let now = Instant::now();
256        while let Some(idle) = connections.front() {
257            if now.duration_since(idle.last_used) > self.config.max_idle_time() {
258                connections.pop_front();
259            } else {
260                break;
261            }
262        }
263
264        connections.pop_front().map(|idle| {
265            trace!("Reusing pooled connection, {} remaining", connections.len());
266            (idle.stream, idle.permit)
267        })
268    }
269
270    fn return_connection(&self, stream: LocalSocketStream, permit: OwnedSemaphorePermit, reusable: bool) {
271        self.active_connections
272            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
273
274        if !reusable {
275            trace!("Dropping broken pooled connection");
276            return;
277        }
278
279        let (kept, pool_size) = {
280            let mut connections = self.connections.lock();
281
282            if connections.len() < self.config.max_size {
283                connections.push_back(IdleConnection {
284                    stream,
285                    last_used: Instant::now(),
286                    permit,
287                });
288                (true, connections.len())
289            } else {
290                (false, connections.len())
291            }
292        };
293
294        if kept {
295            trace!("Returned connection to pool, {} total", pool_size);
296        } else {
297            trace!("Pool full, dropping connection");
298        }
299    }
300}
301
302/// High-performance connection pool for IPC connections
303#[derive(Clone)]
304pub struct ConnectionPool {
305    inner: Arc<ConnectionPoolInner>,
306}
307
308impl ConnectionPool {
309    /// Create a new connection pool
310    pub fn new(name: Name<'static>, config: PoolConfig) -> Self {
311        Self {
312            inner: Arc::new(ConnectionPoolInner::new(name, config)),
313        }
314    }
315
316    /// Create a connection pool with default configuration
317    pub fn with_default_config(name: Name<'static>) -> Self {
318        Self::new(name, PoolConfig::default())
319    }
320
321    /// Get a connection from the pool
322    pub async fn get_connection(&self) -> Result<PooledConnection> {
323        if let Some((stream, permit)) = self.inner.get_pooled_connection() {
324            self.inner
325                .active_connections
326                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
327            return Ok(PooledConnection::new(stream, permit, Arc::clone(&self.inner)));
328        }
329
330        let timeout = self.inner.config.connection_timeout();
331        let permit = tokio::time::timeout(timeout, Arc::clone(&self.inner.semaphore).acquire_owned())
332            .await
333            .map_err(|_| KodeBridgeError::timeout(timeout.as_millis() as u64))?
334            .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
335
336        if let Some((stream, pooled_permit)) = self.inner.get_pooled_connection() {
337            drop(permit);
338            self.inner
339                .active_connections
340                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
341            return Ok(PooledConnection::new(stream, pooled_permit, Arc::clone(&self.inner)));
342        }
343
344        match self.inner.create_connection().await {
345            Ok(stream) => {
346                self.inner
347                    .active_connections
348                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
349                Ok(PooledConnection::new(stream, permit, Arc::clone(&self.inner)))
350            }
351            Err(e) => Err(e),
352        }
353    }
354
355    /// Get a fresh connection optimized for PUT requests
356    pub async fn get_fresh_connection(&self) -> Result<PooledConnection> {
357        let permit = tokio::time::timeout(
358            Duration::from_millis(100),
359            Arc::clone(&self.inner.semaphore).acquire_owned(),
360        )
361        .await
362        .map_err(|_| KodeBridgeError::timeout(100))?
363        .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
364
365        match self.inner.get_fresh_connection().await {
366            Ok(stream) => {
367                self.inner
368                    .active_connections
369                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
370                Ok(PooledConnection::new(stream, permit, Arc::clone(&self.inner)))
371            }
372            Err(e) => Err(e),
373        }
374    }
375
376    /// Preheat fresh connections for better PUT performance
377    pub async fn preheat_for_puts(&self, count: usize) {
378        self.inner.preheat_fresh_connections(count).await;
379    }
380
381    /// Get multiple connections for concurrent operations
382    pub async fn get_connections(&self, count: usize) -> Result<Vec<PooledConnection>> {
383        let mut connections = Vec::with_capacity(count);
384
385        // Use semaphore to control concurrent acquisition
386        let mut tasks = Vec::new();
387        for _ in 0..count {
388            let pool = self.clone();
389            tasks.push(tokio::spawn(async move { pool.get_connection().await }));
390        }
391
392        // Wait for all connection acquisitions to complete
393        for task in tasks {
394            match task.await {
395                Ok(Ok(conn)) => connections.push(conn),
396                Ok(Err(e)) => return Err(e),
397                Err(e) => return Err(KodeBridgeError::custom(format!("Task failed: {}", e))),
398            }
399        }
400
401        Ok(connections)
402    }
403
404    /// Get pool statistics
405    pub fn stats(&self) -> PoolStats {
406        let connections = self.inner.connections.lock();
407        let active_count = self
408            .inner
409            .active_connections
410            .load(std::sync::atomic::Ordering::Relaxed);
411        PoolStats {
412            total_connections: connections.len() + active_count,
413            available_permits: self.inner.semaphore.available_permits(),
414            max_size: self.inner.config.max_size,
415            active_connections: active_count,
416        }
417    }
418
419    /// Close all pooled connections
420    pub fn close(&self) {
421        self.inner.connections.lock().clear();
422        debug!("Closed all pooled connections");
423    }
424}
425
/// Point-in-time snapshot of a pool's counters.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Idle connections plus connections currently checked out.
    pub total_connections: usize,
    /// Semaphore permits that are free right now.
    pub available_permits: usize,
    /// Configured upper bound on pooled connections.
    pub max_size: usize,
    /// Connections currently checked out by callers.
    pub active_connections: usize,
}
434
435impl std::fmt::Display for PoolStats {
436    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
437        write!(
438            f,
439            "Pool(connections: {}, active: {}, permits: {}, max: {})",
440            self.total_connections, self.active_connections, self.available_permits, self.max_size
441        )
442    }
443}