1use crate::errors::{KodeBridgeError, Result};
2use interprocess::local_socket::tokio::prelude::LocalSocketStream;
3use interprocess::local_socket::traits::tokio::Stream as _;
4use interprocess::local_socket::Name;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::Semaphore;
10use tracing::{debug, trace, warn};
11
/// Tuning parameters for the local-socket connection pool.
///
/// Time values are stored as integer milliseconds (which keeps the struct
/// trivially serializable); the accessor methods on this type convert them
/// to `Duration`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PoolConfig {
    /// Capacity limit of the pool: it sizes the semaphore, caps the number of
    /// idle connections that are kept, and bounds the active-connection count
    /// checked before a new connection is created.
    pub max_size: usize,
    /// Desired number of idle connections.
    /// NOTE(review): not read anywhere in this part of the file — confirm usage.
    pub min_idle: usize,
    /// How long, in milliseconds, a connection may sit unused before it is
    /// treated as stale and discarded.
    pub max_idle_time_ms: u64,
    /// Upper bound, in milliseconds, on waiting for a pool permit. The pool
    /// additionally caps this wait at 500 ms.
    pub connection_timeout_ms: u64,
    /// Delay, in milliseconds, before the first connect retry; it doubles on
    /// each further retry up to a 200 ms ceiling.
    pub retry_delay_ms: u64,
    /// Number of connect attempts made when creating a new pooled connection.
    pub max_retries: usize,
    /// NOTE(review): not read anywhere in this part of the file — presumably a
    /// request-level concurrency limit enforced by a caller; confirm.
    pub max_concurrent_requests: usize,
    /// NOTE(review): not read anywhere in this part of the file — presumably an
    /// optional rate limit (`None` = unlimited) enforced by a caller; confirm.
    pub max_requests_per_second: Option<f64>,
}
32
33impl Default for PoolConfig {
34 fn default() -> Self {
35 Self {
36 max_size: 64, min_idle: 8, max_idle_time_ms: 120_000, connection_timeout_ms: 3_000, retry_delay_ms: 10, max_retries: 2, max_concurrent_requests: 32, max_requests_per_second: Some(100.0), }
45 }
46}
47
48impl PoolConfig {
49 pub const fn max_idle_time(&self) -> Duration {
51 Duration::from_millis(self.max_idle_time_ms)
52 }
53
54 pub const fn connection_timeout(&self) -> Duration {
56 Duration::from_millis(self.connection_timeout_ms)
57 }
58
59 pub const fn retry_delay(&self) -> Duration {
61 Duration::from_millis(self.retry_delay_ms)
62 }
63}
64
/// A local-socket stream checked out of a connection pool.
///
/// When the value is dropped while it still owns its stream, the stream is
/// handed back to the pool it came from.
pub struct PooledConnection {
    /// The underlying stream; `None` once it has been taken out
    /// (by `into_stream` or by `Drop`).
    inner: Option<LocalSocketStream>,
    /// When this wrapper was created.
    created_at: Instant,
    /// Last time `stream()` was called (initially the creation time).
    last_used: Instant,
    /// Shared pool state this connection is returned to on drop.
    pool: Arc<ConnectionPoolInner>,
}
72
73impl PooledConnection {
74 fn new(stream: LocalSocketStream, pool: Arc<ConnectionPoolInner>) -> Self {
75 let now = Instant::now();
76 Self {
77 inner: Some(stream),
78 created_at: now,
79 last_used: now,
80 pool,
81 }
82 }
83
84 pub fn stream(&mut self) -> Option<&mut LocalSocketStream> {
86 self.last_used = Instant::now();
87 self.inner.as_mut()
88 }
89
90 pub fn into_stream(mut self) -> Option<LocalSocketStream> {
92 self.inner.take()
93 }
94
95 pub fn is_valid(&self) -> bool {
97 self.inner.is_some() && self.last_used.elapsed() < self.pool.config.max_idle_time()
98 }
99
100 pub fn age(&self) -> Duration {
102 self.created_at.elapsed()
103 }
104
105 pub fn idle_time(&self) -> Duration {
107 self.last_used.elapsed()
108 }
109}
110
111impl Drop for PooledConnection {
112 fn drop(&mut self) {
113 if let Some(stream) = self.inner.take() {
114 self.pool.return_connection(stream);
115 }
116 }
117}
118
119struct ConnectionPoolInner {
121 name: Name<'static>,
122 config: PoolConfig,
123 connections: Mutex<VecDeque<(LocalSocketStream, Instant)>>,
124 semaphore: Semaphore,
125 fresh_connections: Mutex<VecDeque<LocalSocketStream>>,
127 active_connections: std::sync::atomic::AtomicUsize,
129}
130
131impl ConnectionPoolInner {
132 fn new(name: Name<'static>, config: PoolConfig) -> Self {
133 Self {
134 name,
135 semaphore: Semaphore::new(config.max_size),
136 connections: Mutex::new(VecDeque::new()),
137 fresh_connections: Mutex::new(VecDeque::new()),
138 active_connections: std::sync::atomic::AtomicUsize::new(0),
139 config,
140 }
141 }
142
143 async fn get_fresh_connection(&self) -> Result<LocalSocketStream> {
145 {
147 let mut fresh = self.fresh_connections.lock();
148 if let Some(stream) = fresh.pop_front() {
149 return Ok(stream);
150 }
151 }
152
153 let mut last_error = None;
155 for attempt in 0..2 {
156 if attempt > 0 {
158 tokio::time::sleep(Duration::from_millis(10)).await; }
160
161 match LocalSocketStream::connect(self.name.clone()).await {
162 Ok(stream) => {
163 debug!("Created fresh connection for PUT request");
164 return Ok(stream);
165 }
166 Err(e) => {
167 warn!("Fresh connection attempt {} failed: {}", attempt + 1, e);
168 last_error = Some(e);
169 }
170 }
171 }
172
173 match self.get_pooled_connection() {
175 Some(stream) => {
176 debug!("Falling back to pooled connection for PUT request");
177 Ok(stream)
178 }
179 None => Err(KodeBridgeError::connection(format!(
180 "Failed to get fresh connection and no pooled connections available: {}",
181 last_error
182 .map(|e| e.to_string())
183 .unwrap_or_else(|| "Unknown error".to_string())
184 ))),
185 }
186 }
187
188 async fn preheat_fresh_connections(&self, count: usize) {
190 let mut successful = 0;
191 for _ in 0..count {
192 match LocalSocketStream::connect(self.name.clone()).await {
193 Ok(stream) => {
194 self.fresh_connections.lock().push_back(stream);
195 successful += 1;
196 }
197 Err(_) => break,
198 }
199 }
200 if successful > 0 {
201 debug!("Preheated {} fresh connections", successful);
202 }
203 }
204
205 async fn create_connection(&self) -> Result<LocalSocketStream> {
206 let mut last_error = None;
207 let mut delay = self.config.retry_delay();
208 let max_delay = Duration::from_millis(200); for attempt in 0..self.config.max_retries {
211 if attempt > 0 {
212 tokio::time::sleep(delay).await;
214 delay = std::cmp::min(delay * 2, max_delay);
215 }
216
217 match LocalSocketStream::connect(self.name.clone()).await {
218 Ok(stream) => {
219 debug!("Created new connection on attempt {}", attempt + 1);
220 return Ok(stream);
221 }
222 Err(e) => {
223 warn!("Connection attempt {} failed: {}", attempt + 1, e);
224 last_error = Some(e);
225 }
226 }
227 }
228
229 Err(KodeBridgeError::connection(format!(
230 "Failed to get fresh connection and no pooled connections available: {}",
231 last_error
232 .map(|e| e.to_string())
233 .unwrap_or_else(|| "Unknown error".to_string())
234 )))
235 }
236
237 fn get_pooled_connection(&self) -> Option<LocalSocketStream> {
238 let mut connections = self.connections.lock();
239
240 let now = Instant::now();
242 while let Some((_, created_at)) = connections.front() {
243 if now.duration_since(*created_at) > self.config.max_idle_time() {
244 connections.pop_front();
245 } else {
246 break;
247 }
248 }
249
250 connections.pop_front().map(|(stream, _)| {
252 trace!("Reusing pooled connection, {} remaining", connections.len());
253 stream
254 })
255 }
256
257 fn return_connection(&self, stream: LocalSocketStream) {
258 self.active_connections
260 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
261
262 let (kept, pool_size) = {
263 let mut connections = self.connections.lock();
264
265 if connections.len() < self.config.max_size {
267 connections.push_back((stream, Instant::now()));
268 (true, connections.len())
269 } else {
270 (false, connections.len())
271 }
272 };
273
274 if kept {
275 trace!("Returned connection to pool, {} total", pool_size);
276 } else {
277 trace!("Pool full, dropping connection");
278 }
279 }
280
281 async fn get_connection_with_timeout(&self) -> Result<LocalSocketStream> {
282 if let Some(stream) = self.get_pooled_connection() {
286 return Ok(stream);
287 }
288
289 let active_count = self
291 .active_connections
292 .load(std::sync::atomic::Ordering::Relaxed);
293 if active_count >= self.config.max_size {
294 return Err(KodeBridgeError::custom("Connection pool exhausted"));
296 }
297
298 let timeout = std::cmp::min(self.config.connection_timeout(), Duration::from_millis(500));
300 let permit = tokio::time::timeout(timeout, self.semaphore.acquire())
301 .await
302 .map_err(|_| KodeBridgeError::timeout(timeout.as_millis() as u64))?
303 .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
304
305 if let Some(stream) = self.get_pooled_connection() {
307 drop(permit);
308 return Ok(stream);
309 }
310
311 self.active_connections
313 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
314
315 match self.create_connection().await {
317 Ok(stream) => {
318 drop(permit);
319 Ok(stream)
320 }
321 Err(e) => {
322 self.active_connections
324 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
325 drop(permit);
326 Err(e)
327 }
328 }
329 }
330
331 async fn get_fresh_connection_with_timeout(&self) -> Result<LocalSocketStream> {
333 let permit = tokio::time::timeout(Duration::from_millis(100), self.semaphore.acquire())
335 .await
336 .map_err(|_| KodeBridgeError::timeout(100))?
337 .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
338
339 let stream = self.get_fresh_connection().await?;
341 drop(permit);
342 Ok(stream)
343 }
344}
345
/// Handle to a pool of local-socket connections.
///
/// Cloning the handle is cheap: all clones share the same pool state through
/// an `Arc`.
#[derive(Clone)]
pub struct ConnectionPool {
    /// Shared pool state.
    inner: Arc<ConnectionPoolInner>,
}
351
352impl ConnectionPool {
353 pub fn new(name: Name<'static>, config: PoolConfig) -> Self {
355 Self {
356 inner: Arc::new(ConnectionPoolInner::new(name, config)),
357 }
358 }
359
360 pub fn with_default_config(name: Name<'static>) -> Self {
362 Self::new(name, PoolConfig::default())
363 }
364
365 pub async fn get_connection(&self) -> Result<PooledConnection> {
367 let stream = self.inner.get_connection_with_timeout().await?;
368 Ok(PooledConnection::new(stream, Arc::clone(&self.inner)))
369 }
370
371 pub async fn get_fresh_connection(&self) -> Result<PooledConnection> {
373 let stream = self.inner.get_fresh_connection_with_timeout().await?;
374 Ok(PooledConnection::new(stream, Arc::clone(&self.inner)))
375 }
376
377 pub async fn preheat_for_puts(&self, count: usize) {
379 self.inner.preheat_fresh_connections(count).await;
380 }
381
382 pub async fn get_connections(&self, count: usize) -> Result<Vec<PooledConnection>> {
384 let mut connections = Vec::with_capacity(count);
385
386 let mut tasks = Vec::new();
388 for _ in 0..count {
389 let pool = self.clone();
390 tasks.push(tokio::spawn(async move { pool.get_connection().await }));
391 }
392
393 for task in tasks {
395 match task.await {
396 Ok(Ok(conn)) => connections.push(conn),
397 Ok(Err(e)) => return Err(e),
398 Err(e) => return Err(KodeBridgeError::custom(format!("Task failed: {}", e))),
399 }
400 }
401
402 Ok(connections)
403 }
404
405 pub fn stats(&self) -> PoolStats {
407 let connections = self.inner.connections.lock();
408 let active_count = self
409 .inner
410 .active_connections
411 .load(std::sync::atomic::Ordering::Relaxed);
412 PoolStats {
413 total_connections: connections.len(),
414 available_permits: self.inner.semaphore.available_permits(),
415 max_size: self.inner.config.max_size,
416 active_connections: active_count,
417 }
418 }
419
420 pub fn close(&self) {
422 self.inner.connections.lock().clear();
423 debug!("Closed all pooled connections");
424 }
425}
426
/// Point-in-time counters describing a connection pool.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of connections held in the pool's idle queue.
    pub total_connections: usize,
    /// Permits currently available on the pool's semaphore.
    pub available_permits: usize,
    /// Configured maximum pool size.
    pub max_size: usize,
    /// Value of the pool's active-connection counter.
    pub active_connections: usize,
}

impl std::fmt::Display for PoolStats {
    /// Renders the stats as
    /// `Pool(connections: N, active: N, permits: N, max: N)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            total_connections: idle,
            available_permits: permits,
            max_size: limit,
            active_connections: active,
        } = self;
        write!(
            f,
            "Pool(connections: {}, active: {}, permits: {}, max: {})",
            idle, active, permits, limit
        )
    }
}