1use crate::errors::{KodeBridgeError, Result};
2use interprocess::local_socket::Name;
3use interprocess::local_socket::tokio::prelude::LocalSocketStream;
4use interprocess::local_socket::traits::tokio::Stream;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::Semaphore;
10use tracing::{debug, trace, warn};
11
12#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
14pub struct PoolConfig {
15 pub max_size: usize,
17 pub min_idle: usize,
19 pub max_idle_time_ms: u64,
21 pub connection_timeout_ms: u64,
23 pub retry_delay_ms: u64,
25 pub max_retries: usize,
27}
28
29impl Default for PoolConfig {
30 fn default() -> Self {
31 Self {
32 max_size: 10,
33 min_idle: 2,
34 max_idle_time_ms: 300_000, connection_timeout_ms: 30_000,
36 retry_delay_ms: 100,
37 max_retries: 3,
38 }
39 }
40}
41
42impl PoolConfig {
43 pub fn max_idle_time(&self) -> Duration {
45 Duration::from_millis(self.max_idle_time_ms)
46 }
47
48 pub fn connection_timeout(&self) -> Duration {
50 Duration::from_millis(self.connection_timeout_ms)
51 }
52
53 pub fn retry_delay(&self) -> Duration {
55 Duration::from_millis(self.retry_delay_ms)
56 }
57}
58
59pub struct PooledConnection {
61 inner: Option<LocalSocketStream>,
62 created_at: Instant,
63 last_used: Instant,
64 pool: Arc<ConnectionPoolInner>,
65}
66
67impl PooledConnection {
68 fn new(stream: LocalSocketStream, pool: Arc<ConnectionPoolInner>) -> Self {
69 let now = Instant::now();
70 Self {
71 inner: Some(stream),
72 created_at: now,
73 last_used: now,
74 pool,
75 }
76 }
77
78 pub fn stream(&mut self) -> Option<&mut LocalSocketStream> {
80 self.last_used = Instant::now();
81 self.inner.as_mut()
82 }
83
84 pub fn into_stream(mut self) -> Option<LocalSocketStream> {
86 self.inner.take()
87 }
88
89 pub fn is_valid(&self) -> bool {
91 self.inner.is_some() && self.last_used.elapsed() < self.pool.config.max_idle_time()
92 }
93
94 pub fn age(&self) -> Duration {
96 self.created_at.elapsed()
97 }
98
99 pub fn idle_time(&self) -> Duration {
101 self.last_used.elapsed()
102 }
103}
104
105impl Drop for PooledConnection {
106 fn drop(&mut self) {
107 if let Some(stream) = self.inner.take() {
108 self.pool.return_connection(stream);
109 }
110 }
111}
112
113struct ConnectionPoolInner {
115 name: Name<'static>,
116 config: PoolConfig,
117 connections: Mutex<VecDeque<(LocalSocketStream, Instant)>>,
118 semaphore: Semaphore,
119}
120
121impl ConnectionPoolInner {
122 fn new(name: Name<'static>, config: PoolConfig) -> Self {
123 Self {
124 name,
125 semaphore: Semaphore::new(config.max_size),
126 connections: Mutex::new(VecDeque::new()),
127 config,
128 }
129 }
130
131 async fn create_connection(&self) -> Result<LocalSocketStream> {
132 let mut last_error = None;
133
134 for attempt in 0..self.config.max_retries {
135 if attempt > 0 {
136 tokio::time::sleep(self.config.retry_delay()).await;
137 }
138
139 match LocalSocketStream::connect(self.name.clone()).await {
140 Ok(stream) => {
141 debug!("Created new connection on attempt {}", attempt + 1);
142 return Ok(stream);
143 }
144 Err(e) => {
145 warn!("Connection attempt {} failed: {}", attempt + 1, e);
146 last_error = Some(e);
147 }
148 }
149 }
150
151 Err(KodeBridgeError::connection(format!(
152 "Failed to create connection after {} attempts: {}",
153 self.config.max_retries,
154 last_error.unwrap()
155 )))
156 }
157
158 fn get_pooled_connection(&self) -> Option<LocalSocketStream> {
159 let mut connections = self.connections.lock();
160
161 let now = Instant::now();
163 while let Some((_, created_at)) = connections.front() {
164 if now.duration_since(*created_at) > self.config.max_idle_time() {
165 connections.pop_front();
166 } else {
167 break;
168 }
169 }
170
171 connections.pop_front().map(|(stream, _)| {
173 trace!("Reusing pooled connection, {} remaining", connections.len());
174 stream
175 })
176 }
177
178 fn return_connection(&self, stream: LocalSocketStream) {
179 let mut connections = self.connections.lock();
180
181 if connections.len() < self.config.max_size {
183 connections.push_back((stream, Instant::now()));
184 trace!("Returned connection to pool, {} total", connections.len());
185 } else {
186 trace!("Pool full, dropping connection");
187 }
188 }
189
190 async fn get_connection_with_timeout(&self) -> Result<LocalSocketStream> {
191 let permit =
193 tokio::time::timeout(self.config.connection_timeout(), self.semaphore.acquire())
194 .await
195 .map_err(|_| {
196 KodeBridgeError::timeout(self.config.connection_timeout().as_millis() as u64)
197 })?
198 .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;
199
200 if let Some(stream) = self.get_pooled_connection() {
202 permit.forget(); return Ok(stream);
204 }
205
206 let stream = self.create_connection().await?;
208 permit.forget(); Ok(stream)
210 }
211}
212
213#[derive(Clone)]
215pub struct ConnectionPool {
216 inner: Arc<ConnectionPoolInner>,
217}
218
219impl ConnectionPool {
220 pub fn new(name: Name<'static>, config: PoolConfig) -> Self {
222 Self {
223 inner: Arc::new(ConnectionPoolInner::new(name, config)),
224 }
225 }
226
227 pub fn with_default_config(name: Name<'static>) -> Self {
229 Self::new(name, PoolConfig::default())
230 }
231
232 pub async fn get_connection(&self) -> Result<PooledConnection> {
234 let stream = self.inner.get_connection_with_timeout().await?;
235 Ok(PooledConnection::new(stream, self.inner.clone()))
236 }
237
238 pub fn stats(&self) -> PoolStats {
240 let connections = self.inner.connections.lock();
241 PoolStats {
242 total_connections: connections.len(),
243 available_permits: self.inner.semaphore.available_permits(),
244 max_size: self.inner.config.max_size,
245 }
246 }
247
248 pub fn close(&self) {
250 let mut connections = self.inner.connections.lock();
251 connections.clear();
252 debug!("Closed all pooled connections");
253 }
254}
255
/// Point-in-time counters describing a connection pool.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of connections counted when the snapshot was taken.
    pub total_connections: usize,
    /// Semaphore permits that were free when the snapshot was taken.
    pub available_permits: usize,
    /// Configured maximum pool size.
    pub max_size: usize,
}
263
264impl std::fmt::Display for PoolStats {
265 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266 write!(
267 f,
268 "Pool(connections: {}, permits: {}, max: {})",
269 self.total_connections, self.available_permits, self.max_size
270 )
271 }
272}