1use crate::errors::{KodeBridgeError, Result};
2use interprocess::local_socket::tokio::prelude::LocalSocketStream;
3use interprocess::local_socket::traits::tokio::Stream;
4use interprocess::local_socket::Name;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::Semaphore;
10use tracing::{debug, trace, warn};
11
12#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
14pub struct PoolConfig {
15 pub max_size: usize,
17 pub min_idle: usize,
19 pub max_idle_time_ms: u64,
21 pub connection_timeout_ms: u64,
23 pub retry_delay_ms: u64,
25 pub max_retries: usize,
27 pub max_concurrent_requests: usize,
29 pub max_requests_per_second: Option<f64>,
31}
32
33impl Default for PoolConfig {
34 fn default() -> Self {
35 Self {
36 max_size: 64, min_idle: 8, max_idle_time_ms: 120_000, connection_timeout_ms: 3_000, retry_delay_ms: 10, max_retries: 2, max_concurrent_requests: 32, max_requests_per_second: Some(100.0), }
45 }
46}
47
48impl PoolConfig {
49 pub fn max_idle_time(&self) -> Duration {
51 Duration::from_millis(self.max_idle_time_ms)
52 }
53
54 pub fn connection_timeout(&self) -> Duration {
56 Duration::from_millis(self.connection_timeout_ms)
57 }
58
59 pub fn retry_delay(&self) -> Duration {
61 Duration::from_millis(self.retry_delay_ms)
62 }
63}
64
65pub struct PooledConnection {
67 inner: Option<LocalSocketStream>,
68 created_at: Instant,
69 last_used: Instant,
70 pool: Arc<ConnectionPoolInner>,
71}
72
73impl PooledConnection {
74 fn new(stream: LocalSocketStream, pool: Arc<ConnectionPoolInner>) -> Self {
75 let now = Instant::now();
76 Self {
77 inner: Some(stream),
78 created_at: now,
79 last_used: now,
80 pool,
81 }
82 }
83
84 pub fn stream(&mut self) -> Option<&mut LocalSocketStream> {
86 self.last_used = Instant::now();
87 self.inner.as_mut()
88 }
89
90 pub fn into_stream(mut self) -> Option<LocalSocketStream> {
92 self.inner.take()
93 }
94
95 pub fn is_valid(&self) -> bool {
97 self.inner.is_some() && self.last_used.elapsed() < self.pool.config.max_idle_time()
98 }
99
100 pub fn age(&self) -> Duration {
102 self.created_at.elapsed()
103 }
104
105 pub fn idle_time(&self) -> Duration {
107 self.last_used.elapsed()
108 }
109}
110
111impl Drop for PooledConnection {
112 fn drop(&mut self) {
113 if let Some(stream) = self.inner.take() {
114 self.pool.return_connection(stream);
115 }
116 }
117}
118
119struct ConnectionPoolInner {
121 name: Name<'static>,
122 config: PoolConfig,
123 connections: Mutex<VecDeque<(LocalSocketStream, Instant)>>,
124 semaphore: Semaphore,
125 fresh_connections: Mutex<VecDeque<LocalSocketStream>>,
127 active_connections: std::sync::atomic::AtomicUsize,
129}
130
131impl ConnectionPoolInner {
132 fn new(name: Name<'static>, config: PoolConfig) -> Self {
133 Self {
134 name,
135 semaphore: Semaphore::new(config.max_size),
136 connections: Mutex::new(VecDeque::new()),
137 fresh_connections: Mutex::new(VecDeque::new()),
138 active_connections: std::sync::atomic::AtomicUsize::new(0),
139 config,
140 }
141 }
142
143 async fn get_fresh_connection(&self) -> Result<LocalSocketStream> {
145 {
147 let mut fresh = self.fresh_connections.lock();
148 if let Some(stream) = fresh.pop_front() {
149 return Ok(stream);
150 }
151 }
152
153 let mut last_error = None;
155 for attempt in 0..2 {
156 if attempt > 0 {
158 tokio::time::sleep(Duration::from_millis(10)).await; }
160
161 match LocalSocketStream::connect(self.name.clone()).await {
162 Ok(stream) => {
163 debug!("Created fresh connection for PUT request");
164 return Ok(stream);
165 }
166 Err(e) => {
167 warn!("Fresh connection attempt {} failed: {}", attempt + 1, e);
168 last_error = Some(e);
169 }
170 }
171 }
172
173 match self.get_pooled_connection() {
175 Some(stream) => {
176 debug!("Falling back to pooled connection for PUT request");
177 Ok(stream)
178 }
179 None => Err(KodeBridgeError::connection(format!(
180 "Failed to get fresh connection and no pooled connections available: {}",
181 last_error
182 .map(|e| e.to_string())
183 .unwrap_or_else(|| "Unknown error".to_string())
184 ))),
185 }
186 }
187
188 async fn preheat_fresh_connections(&self, count: usize) {
190 let mut successful = 0;
191 for _ in 0..count {
192 match LocalSocketStream::connect(self.name.clone()).await {
193 Ok(stream) => {
194 let mut fresh = self.fresh_connections.lock();
195 fresh.push_back(stream);
196 successful += 1;
197 }
198 Err(_) => break,
199 }
200 }
201 if successful > 0 {
202 debug!("Preheated {} fresh connections", successful);
203 }
204 }
205
206 async fn create_connection(&self) -> Result<LocalSocketStream> {
207 let mut last_error = None;
208 let mut delay = self.config.retry_delay();
209 let max_delay = Duration::from_millis(200); for attempt in 0..self.config.max_retries {
212 if attempt > 0 {
213 tokio::time::sleep(delay).await;
215 delay = std::cmp::min(delay * 2, max_delay);
216 }
217
218 match LocalSocketStream::connect(self.name.clone()).await {
219 Ok(stream) => {
220 debug!("Created new connection on attempt {}", attempt + 1);
221 return Ok(stream);
222 }
223 Err(e) => {
224 warn!("Connection attempt {} failed: {}", attempt + 1, e);
225 last_error = Some(e);
226 }
227 }
228 }
229
230 Err(KodeBridgeError::connection(format!(
231 "Failed to get fresh connection and no pooled connections available: {}",
232 last_error
233 .map(|e| e.to_string())
234 .unwrap_or_else(|| "Unknown error".to_string())
235 )))
236 }
237
238 fn get_pooled_connection(&self) -> Option<LocalSocketStream> {
239 let mut connections = self.connections.lock();
240
241 let now = Instant::now();
243 while let Some((_, created_at)) = connections.front() {
244 if now.duration_since(*created_at) > self.config.max_idle_time() {
245 connections.pop_front();
246 } else {
247 break;
248 }
249 }
250
251 connections.pop_front().map(|(stream, _)| {
253 trace!("Reusing pooled connection, {} remaining", connections.len());
254 stream
255 })
256 }
257
258 fn return_connection(&self, stream: LocalSocketStream) {
259 let mut connections = self.connections.lock();
260
261 self.active_connections
263 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
264
265 if connections.len() < self.config.max_size {
267 connections.push_back((stream, Instant::now()));
268 trace!("Returned connection to pool, {} total", connections.len());
269 } else {
270 trace!("Pool full, dropping connection");
271 }
272 }
273
274 async fn get_connection_with_timeout(&self) -> Result<LocalSocketStream> {
275 if let Some(stream) = self.get_pooled_connection() {
279 return Ok(stream);
280 }
281
282 let active_count = self
284 .active_connections
285 .load(std::sync::atomic::Ordering::Relaxed);
286 if active_count >= self.config.max_size {
287 return Err(KodeBridgeError::custom("Connection pool exhausted"));
289 }
290
291 let timeout = std::cmp::min(self.config.connection_timeout(), Duration::from_millis(500));
293 let permit = tokio::time::timeout(timeout, self.semaphore.acquire())
294 .await
295 .map_err(|_| KodeBridgeError::timeout(timeout.as_millis() as u64))?
296 .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
297
298 if let Some(stream) = self.get_pooled_connection() {
300 permit.forget(); return Ok(stream);
302 }
303
304 self.active_connections
306 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
307
308 match self.create_connection().await {
310 Ok(stream) => {
311 permit.forget(); Ok(stream)
313 }
314 Err(e) => {
315 self.active_connections
317 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
318 Err(e)
319 }
320 }
321 }
322
323 async fn get_fresh_connection_with_timeout(&self) -> Result<LocalSocketStream> {
325 let permit = tokio::time::timeout(Duration::from_millis(100), self.semaphore.acquire())
327 .await
328 .map_err(|_| KodeBridgeError::timeout(100))?
329 .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
330
331 let stream = self.get_fresh_connection().await?;
333 permit.forget(); Ok(stream)
335 }
336}
337
338#[derive(Clone)]
340pub struct ConnectionPool {
341 inner: Arc<ConnectionPoolInner>,
342}
343
344impl ConnectionPool {
345 pub fn new(name: Name<'static>, config: PoolConfig) -> Self {
347 Self {
348 inner: Arc::new(ConnectionPoolInner::new(name, config)),
349 }
350 }
351
352 pub fn with_default_config(name: Name<'static>) -> Self {
354 Self::new(name, PoolConfig::default())
355 }
356
357 pub async fn get_connection(&self) -> Result<PooledConnection> {
359 let stream = self.inner.get_connection_with_timeout().await?;
360 Ok(PooledConnection::new(stream, self.inner.clone()))
361 }
362
363 pub async fn get_fresh_connection(&self) -> Result<PooledConnection> {
365 let stream = self.inner.get_fresh_connection_with_timeout().await?;
366 Ok(PooledConnection::new(stream, self.inner.clone()))
367 }
368
369 pub async fn preheat_for_puts(&self, count: usize) {
371 self.inner.preheat_fresh_connections(count).await;
372 }
373
374 pub async fn get_connections(&self, count: usize) -> Result<Vec<PooledConnection>> {
376 let mut connections = Vec::with_capacity(count);
377
378 let mut tasks = Vec::new();
380 for _ in 0..count {
381 let pool = self.clone();
382 tasks.push(tokio::spawn(async move { pool.get_connection().await }));
383 }
384
385 for task in tasks {
387 match task.await {
388 Ok(Ok(conn)) => connections.push(conn),
389 Ok(Err(e)) => return Err(e),
390 Err(e) => return Err(KodeBridgeError::custom(format!("Task failed: {}", e))),
391 }
392 }
393
394 Ok(connections)
395 }
396
397 pub fn stats(&self) -> PoolStats {
399 let connections = self.inner.connections.lock();
400 let active_count = self
401 .inner
402 .active_connections
403 .load(std::sync::atomic::Ordering::Relaxed);
404 PoolStats {
405 total_connections: connections.len(),
406 available_permits: self.inner.semaphore.available_permits(),
407 max_size: self.inner.config.max_size,
408 active_connections: active_count,
409 }
410 }
411
412 pub fn close(&self) {
414 let mut connections = self.inner.connections.lock();
415 connections.clear();
416 debug!("Closed all pooled connections");
417 }
418}
419
420#[derive(Debug, Clone)]
422pub struct PoolStats {
423 pub total_connections: usize,
424 pub available_permits: usize,
425 pub max_size: usize,
426 pub active_connections: usize,
427}
428
429impl std::fmt::Display for PoolStats {
430 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431 write!(
432 f,
433 "Pool(connections: {}, active: {}, permits: {}, max: {})",
434 self.total_connections, self.active_connections, self.available_permits, self.max_size
435 )
436 }
437}