1use crate::errors::{KodeBridgeError, Result};
2use interprocess::local_socket::tokio::prelude::LocalSocketStream;
3use interprocess::local_socket::traits::tokio::Stream;
4use interprocess::local_socket::Name;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::Semaphore;
10use tracing::{debug, trace, warn};
11
12#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
14pub struct PoolConfig {
15 pub max_size: usize,
17 pub min_idle: usize,
19 pub max_idle_time_ms: u64,
21 pub connection_timeout_ms: u64,
23 pub retry_delay_ms: u64,
25 pub max_retries: usize,
27 pub max_concurrent_requests: usize,
29 pub max_requests_per_second: Option<f64>,
31}
32
33impl Default for PoolConfig {
34 fn default() -> Self {
35 Self {
36 max_size: 20, min_idle: 5, max_idle_time_ms: 300_000, connection_timeout_ms: 10_000, retry_delay_ms: 50, max_retries: 5, max_concurrent_requests: 8,
43 max_requests_per_second: Some(10.0),
44 }
45 }
46}
47
48impl PoolConfig {
49 pub fn max_idle_time(&self) -> Duration {
51 Duration::from_millis(self.max_idle_time_ms)
52 }
53
54 pub fn connection_timeout(&self) -> Duration {
56 Duration::from_millis(self.connection_timeout_ms)
57 }
58
59 pub fn retry_delay(&self) -> Duration {
61 Duration::from_millis(self.retry_delay_ms)
62 }
63}
64
65pub struct PooledConnection {
67 inner: Option<LocalSocketStream>,
68 created_at: Instant,
69 last_used: Instant,
70 pool: Arc<ConnectionPoolInner>,
71}
72
73impl PooledConnection {
74 fn new(stream: LocalSocketStream, pool: Arc<ConnectionPoolInner>) -> Self {
75 let now = Instant::now();
76 Self {
77 inner: Some(stream),
78 created_at: now,
79 last_used: now,
80 pool,
81 }
82 }
83
84 pub fn stream(&mut self) -> Option<&mut LocalSocketStream> {
86 self.last_used = Instant::now();
87 self.inner.as_mut()
88 }
89
90 pub fn into_stream(mut self) -> Option<LocalSocketStream> {
92 self.inner.take()
93 }
94
95 pub fn is_valid(&self) -> bool {
97 self.inner.is_some() && self.last_used.elapsed() < self.pool.config.max_idle_time()
98 }
99
100 pub fn age(&self) -> Duration {
102 self.created_at.elapsed()
103 }
104
105 pub fn idle_time(&self) -> Duration {
107 self.last_used.elapsed()
108 }
109}
110
111impl Drop for PooledConnection {
112 fn drop(&mut self) {
113 if let Some(stream) = self.inner.take() {
114 self.pool.return_connection(stream);
115 }
116 }
117}
118
119struct ConnectionPoolInner {
121 name: Name<'static>,
122 config: PoolConfig,
123 connections: Mutex<VecDeque<(LocalSocketStream, Instant)>>,
124 semaphore: Semaphore,
125}
126
127impl ConnectionPoolInner {
128 fn new(name: Name<'static>, config: PoolConfig) -> Self {
129 Self {
130 name,
131 semaphore: Semaphore::new(config.max_size),
132 connections: Mutex::new(VecDeque::new()),
133 config,
134 }
135 }
136
137 async fn create_connection(&self) -> Result<LocalSocketStream> {
138 let mut last_error = None;
139 let mut delay = self.config.retry_delay();
140
141 for attempt in 0..self.config.max_retries {
142 if attempt > 0 {
143 tokio::time::sleep(delay).await;
145 delay = std::cmp::min(delay * 2, Duration::from_millis(1000));
146 }
147
148 match LocalSocketStream::connect(self.name.clone()).await {
149 Ok(stream) => {
150 debug!("Created new connection on attempt {}", attempt + 1);
151 return Ok(stream);
152 }
153 Err(e) => {
154 warn!("Connection attempt {} failed: {}", attempt + 1, e);
155 last_error = Some(e);
156 }
157 }
158 }
159
160 Err(KodeBridgeError::connection(format!(
161 "Failed to create connection after {} attempts: {}",
162 self.config.max_retries,
163 last_error
164 .map(|e| e.to_string())
165 .unwrap_or_else(|| "Unknown error".to_string())
166 )))
167 }
168
169 fn get_pooled_connection(&self) -> Option<LocalSocketStream> {
170 let mut connections = self.connections.lock();
171
172 let now = Instant::now();
174 while let Some((_, created_at)) = connections.front() {
175 if now.duration_since(*created_at) > self.config.max_idle_time() {
176 connections.pop_front();
177 } else {
178 break;
179 }
180 }
181
182 connections.pop_front().map(|(stream, _)| {
184 trace!("Reusing pooled connection, {} remaining", connections.len());
185 stream
186 })
187 }
188
189 fn return_connection(&self, stream: LocalSocketStream) {
190 let mut connections = self.connections.lock();
191
192 if connections.len() < self.config.max_size {
194 connections.push_back((stream, Instant::now()));
195 trace!("Returned connection to pool, {} total", connections.len());
196 } else {
197 trace!("Pool full, dropping connection");
198 }
199 }
200
201 async fn get_connection_with_timeout(&self) -> Result<LocalSocketStream> {
202 let permit =
204 tokio::time::timeout(self.config.connection_timeout(), self.semaphore.acquire())
205 .await
206 .map_err(|_| {
207 KodeBridgeError::timeout(self.config.connection_timeout().as_millis() as u64)
208 })?
209 .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
210
211 if let Some(stream) = self.get_pooled_connection() {
213 permit.forget(); return Ok(stream);
215 }
216
217 let stream = self.create_connection().await?;
219 permit.forget(); Ok(stream)
221 }
222}
223
224#[derive(Clone)]
226pub struct ConnectionPool {
227 inner: Arc<ConnectionPoolInner>,
228}
229
230impl ConnectionPool {
231 pub fn new(name: Name<'static>, config: PoolConfig) -> Self {
233 Self {
234 inner: Arc::new(ConnectionPoolInner::new(name, config)),
235 }
236 }
237
238 pub fn with_default_config(name: Name<'static>) -> Self {
240 Self::new(name, PoolConfig::default())
241 }
242
243 pub async fn get_connection(&self) -> Result<PooledConnection> {
245 let stream = self.inner.get_connection_with_timeout().await?;
246 Ok(PooledConnection::new(stream, self.inner.clone()))
247 }
248
249 pub async fn get_connections(&self, count: usize) -> Result<Vec<PooledConnection>> {
251 let mut connections = Vec::with_capacity(count);
252
253 let mut tasks = Vec::new();
255 for _ in 0..count {
256 let pool = self.clone();
257 tasks.push(tokio::spawn(async move { pool.get_connection().await }));
258 }
259
260 for task in tasks {
262 match task.await {
263 Ok(Ok(conn)) => connections.push(conn),
264 Ok(Err(e)) => return Err(e),
265 Err(e) => return Err(KodeBridgeError::custom(format!("Task failed: {}", e))),
266 }
267 }
268
269 Ok(connections)
270 }
271
272 pub fn stats(&self) -> PoolStats {
274 let connections = self.inner.connections.lock();
275 PoolStats {
276 total_connections: connections.len(),
277 available_permits: self.inner.semaphore.available_permits(),
278 max_size: self.inner.config.max_size,
279 }
280 }
281
282 pub fn close(&self) {
284 let mut connections = self.inner.connections.lock();
285 connections.clear();
286 debug!("Closed all pooled connections");
287 }
288}
289
290#[derive(Debug, Clone)]
292pub struct PoolStats {
293 pub total_connections: usize,
294 pub available_permits: usize,
295 pub max_size: usize,
296}
297
298impl std::fmt::Display for PoolStats {
299 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300 write!(
301 f,
302 "Pool(connections: {}, permits: {}, max: {})",
303 self.total_connections, self.available_permits, self.max_size
304 )
305 }
306}