1use crate::errors::{KodeBridgeError, Result};
2use interprocess::local_socket::tokio::prelude::LocalSocketStream;
3use interprocess::local_socket::traits::tokio::Stream as _;
4use interprocess::local_socket::Name;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::{OwnedSemaphorePermit, Semaphore};
10use tracing::{debug, trace, warn};
11
12#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
14pub struct PoolConfig {
15 pub max_size: usize,
17 pub min_idle: usize,
19 pub max_idle_time_ms: u64,
21 pub connection_timeout_ms: u64,
23 pub retry_delay_ms: u64,
25 pub max_retries: usize,
27 pub max_concurrent_requests: usize,
29 pub max_requests_per_second: Option<f64>,
31}
32
33impl Default for PoolConfig {
34 fn default() -> Self {
35 Self {
36 max_size: 64, min_idle: 8, max_idle_time_ms: 120_000, connection_timeout_ms: 3_000, retry_delay_ms: 10, max_retries: 2, max_concurrent_requests: 32,
43 max_requests_per_second: None,
44 }
45 }
46}
47
48impl PoolConfig {
49 pub const fn max_idle_time(&self) -> Duration {
51 Duration::from_millis(self.max_idle_time_ms)
52 }
53
54 pub const fn connection_timeout(&self) -> Duration {
56 Duration::from_millis(self.connection_timeout_ms)
57 }
58
59 pub const fn retry_delay(&self) -> Duration {
61 Duration::from_millis(self.retry_delay_ms)
62 }
63}
64
65pub struct PooledConnection {
67 inner: Option<LocalSocketStream>,
68 permit: Option<OwnedSemaphorePermit>,
69 created_at: Instant,
70 last_used: Instant,
71 reusable: bool,
72 pool: Arc<ConnectionPoolInner>,
73}
74
75impl PooledConnection {
76 fn new(stream: LocalSocketStream, permit: OwnedSemaphorePermit, pool: Arc<ConnectionPoolInner>) -> Self {
77 let now = Instant::now();
78 Self {
79 inner: Some(stream),
80 permit: Some(permit),
81 created_at: now,
82 last_used: now,
83 reusable: true,
84 pool,
85 }
86 }
87
88 pub fn stream(&mut self) -> Option<&mut LocalSocketStream> {
90 self.last_used = Instant::now();
91 self.inner.as_mut()
92 }
93
94 pub fn into_stream(mut self) -> Option<LocalSocketStream> {
96 self.reusable = false;
97 if let Some(permit) = self.permit.take() {
98 self.pool
99 .active_connections
100 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
101 drop(permit);
102 }
103 self.inner.take()
104 }
105
106 pub const fn invalidate(&mut self) {
108 self.reusable = false;
109 }
110
111 pub fn is_valid(&self) -> bool {
113 self.inner.is_some() && self.last_used.elapsed() < self.pool.config.max_idle_time()
114 }
115
116 pub fn age(&self) -> Duration {
118 self.created_at.elapsed()
119 }
120
121 pub fn idle_time(&self) -> Duration {
123 self.last_used.elapsed()
124 }
125}
126
127impl Drop for PooledConnection {
128 fn drop(&mut self) {
129 if let Some(stream) = self.inner.take() {
130 if let Some(permit) = self.permit.take() {
131 self.pool.return_connection(stream, permit, self.reusable);
132 }
133 }
134 }
135}
136
137struct IdleConnection {
138 stream: LocalSocketStream,
139 last_used: Instant,
140 permit: OwnedSemaphorePermit,
141}
142
143struct ConnectionPoolInner {
145 name: Name<'static>,
146 config: PoolConfig,
147 connections: Mutex<VecDeque<IdleConnection>>,
148 semaphore: Arc<Semaphore>,
149 active_connections: std::sync::atomic::AtomicUsize,
151}
152
153impl ConnectionPoolInner {
154 fn new(name: Name<'static>, config: PoolConfig) -> Self {
155 Self {
156 name,
157 semaphore: Arc::new(Semaphore::new(config.max_size)),
158 connections: Mutex::new(VecDeque::new()),
159 active_connections: std::sync::atomic::AtomicUsize::new(0),
160 config,
161 }
162 }
163
164 async fn get_fresh_connection(&self) -> Result<LocalSocketStream> {
166 let mut last_error = None;
167 for attempt in 0..2 {
168 if attempt > 0 {
169 tokio::time::sleep(Duration::from_millis(10)).await;
170 }
171
172 match LocalSocketStream::connect(self.name.clone()).await {
173 Ok(stream) => {
174 debug!("Created fresh connection for PUT request");
175 return Ok(stream);
176 }
177 Err(e) => {
178 warn!("Fresh connection attempt {} failed: {}", attempt + 1, e);
179 last_error = Some(e);
180 }
181 }
182 }
183
184 Err(KodeBridgeError::connection(format!(
185 "Failed to create fresh connection: {}",
186 last_error
187 .map(|e| e.to_string())
188 .unwrap_or_else(|| "Unknown error".to_string())
189 )))
190 }
191
192 async fn preheat_fresh_connections(&self, count: usize) {
194 let mut successful = 0;
195 for _ in 0..count {
196 let Ok(permit) = Arc::clone(&self.semaphore).try_acquire_owned() else {
197 break;
198 };
199
200 match LocalSocketStream::connect(self.name.clone()).await {
201 Ok(stream) => {
202 self.connections.lock().push_back(IdleConnection {
203 stream,
204 last_used: Instant::now(),
205 permit,
206 });
207 successful += 1;
208 }
209 Err(_) => {
210 drop(permit);
211 break;
212 }
213 }
214 }
215 if successful > 0 {
216 debug!("Preheated {} fresh connections", successful);
217 }
218 }
219
220 async fn create_connection(&self) -> Result<LocalSocketStream> {
221 let mut last_error = None;
222 let mut delay = self.config.retry_delay();
223 let max_delay = Duration::from_millis(200); for attempt in 0..self.config.max_retries {
226 if attempt > 0 {
227 tokio::time::sleep(delay).await;
229 delay = std::cmp::min(delay * 2, max_delay);
230 }
231
232 match LocalSocketStream::connect(self.name.clone()).await {
233 Ok(stream) => {
234 debug!("Created new connection on attempt {}", attempt + 1);
235 return Ok(stream);
236 }
237 Err(e) => {
238 warn!("Connection attempt {} failed: {}", attempt + 1, e);
239 last_error = Some(e);
240 }
241 }
242 }
243
244 Err(KodeBridgeError::connection(format!(
245 "Failed to get fresh connection and no pooled connections available: {}",
246 last_error
247 .map(|e| e.to_string())
248 .unwrap_or_else(|| "Unknown error".to_string())
249 )))
250 }
251
252 fn get_pooled_connection(&self) -> Option<(LocalSocketStream, OwnedSemaphorePermit)> {
253 let mut connections = self.connections.lock();
254
255 let now = Instant::now();
256 while let Some(idle) = connections.front() {
257 if now.duration_since(idle.last_used) > self.config.max_idle_time() {
258 connections.pop_front();
259 } else {
260 break;
261 }
262 }
263
264 connections.pop_front().map(|idle| {
265 trace!("Reusing pooled connection, {} remaining", connections.len());
266 (idle.stream, idle.permit)
267 })
268 }
269
270 fn return_connection(&self, stream: LocalSocketStream, permit: OwnedSemaphorePermit, reusable: bool) {
271 self.active_connections
272 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
273
274 if !reusable {
275 trace!("Dropping broken pooled connection");
276 return;
277 }
278
279 let (kept, pool_size) = {
280 let mut connections = self.connections.lock();
281
282 if connections.len() < self.config.max_size {
283 connections.push_back(IdleConnection {
284 stream,
285 last_used: Instant::now(),
286 permit,
287 });
288 (true, connections.len())
289 } else {
290 (false, connections.len())
291 }
292 };
293
294 if kept {
295 trace!("Returned connection to pool, {} total", pool_size);
296 } else {
297 trace!("Pool full, dropping connection");
298 }
299 }
300}
301
302#[derive(Clone)]
304pub struct ConnectionPool {
305 inner: Arc<ConnectionPoolInner>,
306}
307
308impl ConnectionPool {
309 pub fn new(name: Name<'static>, config: PoolConfig) -> Self {
311 Self {
312 inner: Arc::new(ConnectionPoolInner::new(name, config)),
313 }
314 }
315
316 pub fn with_default_config(name: Name<'static>) -> Self {
318 Self::new(name, PoolConfig::default())
319 }
320
321 pub async fn get_connection(&self) -> Result<PooledConnection> {
323 if let Some((stream, permit)) = self.inner.get_pooled_connection() {
324 self.inner
325 .active_connections
326 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
327 return Ok(PooledConnection::new(stream, permit, Arc::clone(&self.inner)));
328 }
329
330 let timeout = self.inner.config.connection_timeout();
331 let permit = tokio::time::timeout(timeout, Arc::clone(&self.inner.semaphore).acquire_owned())
332 .await
333 .map_err(|_| KodeBridgeError::timeout(timeout.as_millis() as u64))?
334 .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
335
336 if let Some((stream, pooled_permit)) = self.inner.get_pooled_connection() {
337 drop(permit);
338 self.inner
339 .active_connections
340 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
341 return Ok(PooledConnection::new(stream, pooled_permit, Arc::clone(&self.inner)));
342 }
343
344 match self.inner.create_connection().await {
345 Ok(stream) => {
346 self.inner
347 .active_connections
348 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
349 Ok(PooledConnection::new(stream, permit, Arc::clone(&self.inner)))
350 }
351 Err(e) => Err(e),
352 }
353 }
354
355 pub async fn get_fresh_connection(&self) -> Result<PooledConnection> {
357 let permit = tokio::time::timeout(
358 Duration::from_millis(100),
359 Arc::clone(&self.inner.semaphore).acquire_owned(),
360 )
361 .await
362 .map_err(|_| KodeBridgeError::timeout(100))?
363 .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
364
365 match self.inner.get_fresh_connection().await {
366 Ok(stream) => {
367 self.inner
368 .active_connections
369 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
370 Ok(PooledConnection::new(stream, permit, Arc::clone(&self.inner)))
371 }
372 Err(e) => Err(e),
373 }
374 }
375
376 pub async fn preheat_for_puts(&self, count: usize) {
378 self.inner.preheat_fresh_connections(count).await;
379 }
380
381 pub async fn get_connections(&self, count: usize) -> Result<Vec<PooledConnection>> {
383 let mut connections = Vec::with_capacity(count);
384
385 let mut tasks = Vec::new();
387 for _ in 0..count {
388 let pool = self.clone();
389 tasks.push(tokio::spawn(async move { pool.get_connection().await }));
390 }
391
392 for task in tasks {
394 match task.await {
395 Ok(Ok(conn)) => connections.push(conn),
396 Ok(Err(e)) => return Err(e),
397 Err(e) => return Err(KodeBridgeError::custom(format!("Task failed: {}", e))),
398 }
399 }
400
401 Ok(connections)
402 }
403
404 pub fn stats(&self) -> PoolStats {
406 let connections = self.inner.connections.lock();
407 let active_count = self
408 .inner
409 .active_connections
410 .load(std::sync::atomic::Ordering::Relaxed);
411 PoolStats {
412 total_connections: connections.len() + active_count,
413 available_permits: self.inner.semaphore.available_permits(),
414 max_size: self.inner.config.max_size,
415 active_connections: active_count,
416 }
417 }
418
419 pub fn close(&self) {
421 self.inner.connections.lock().clear();
422 debug!("Closed all pooled connections");
423 }
424}
425
/// Point-in-time snapshot of a pool's counters, as produced by
/// `ConnectionPool::stats`.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Idle connections plus connections currently checked out.
    pub total_connections: usize,
    /// Permits currently available on the pool's semaphore.
    pub available_permits: usize,
    /// The pool's configured `max_size`.
    pub max_size: usize,
    /// Connections currently checked out.
    pub active_connections: usize,
}
434
435impl std::fmt::Display for PoolStats {
436 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
437 write!(
438 f,
439 "Pool(connections: {}, active: {}, permits: {}, max: {})",
440 self.total_connections, self.active_connections, self.available_permits, self.max_size
441 )
442 }
443}