use crate::errors::{KodeBridgeError, Result};
use interprocess::local_socket::tokio::prelude::LocalSocketStream;
use interprocess::local_socket::traits::tokio::Stream;
use interprocess::local_socket::Name;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
use tracing::{debug, trace, warn};
/// Tunable settings for a connection pool.
///
/// All durations are stored as plain milliseconds so the struct can be
/// (de)serialized directly; use the accessor methods to get `Duration`s.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PoolConfig {
/// Pool capacity: sizes the pool's semaphore and caps how many idle
/// connections are retained for reuse.
pub max_size: usize,
/// NOTE(review): presumably the number of idle connections to keep warm;
/// it is not read anywhere in this file — confirm against callers.
pub min_idle: usize,
/// How long, in milliseconds, an idle connection remains eligible for reuse.
pub max_idle_time_ms: u64,
/// Timeout, in milliseconds, used while waiting for pool capacity.
/// The pool additionally caps this wait at 500 ms.
pub connection_timeout_ms: u64,
/// Delay, in milliseconds, before the first retry when opening a connection;
/// the pool doubles it on later retries, up to 200 ms.
pub retry_delay_ms: u64,
/// Number of connection attempts made when opening a new pooled connection.
pub max_retries: usize,
/// NOTE(review): not read in this file — presumably a request-level
/// concurrency limit enforced elsewhere; confirm.
pub max_concurrent_requests: usize,
/// NOTE(review): not read in this file — presumably an optional rate limit
/// enforced elsewhere (`None` likely meaning unlimited); confirm.
pub max_requests_per_second: Option<f64>,
}
impl Default for PoolConfig {
    /// Returns the stock configuration: a pool of up to 64 connections with
    /// short timeouts and two connection attempts.
    fn default() -> Self {
        Self {
            // Capacity.
            max_size: 64,
            min_idle: 8,
            // Timing, all in milliseconds.
            max_idle_time_ms: 120_000,
            connection_timeout_ms: 3_000,
            retry_delay_ms: 10,
            max_retries: 2,
            // Request limits.
            max_concurrent_requests: 32,
            max_requests_per_second: Some(100.0),
        }
    }
}
impl PoolConfig {
    /// Converts a millisecond count stored in the config into a `Duration`.
    fn millis(value: u64) -> Duration {
        Duration::from_millis(value)
    }

    /// `max_idle_time_ms` as a `Duration`.
    pub fn max_idle_time(&self) -> Duration {
        Self::millis(self.max_idle_time_ms)
    }

    /// `connection_timeout_ms` as a `Duration`.
    pub fn connection_timeout(&self) -> Duration {
        Self::millis(self.connection_timeout_ms)
    }

    /// `retry_delay_ms` as a `Duration`.
    pub fn retry_delay(&self) -> Duration {
        Self::millis(self.retry_delay_ms)
    }
}
/// A connection handed out by the pool.
///
/// When the handle is dropped while it still owns its stream, the stream is
/// given back to the issuing pool.
pub struct PooledConnection {
/// The underlying stream; `None` once it has been moved out
/// (by `into_stream` or while dropping).
inner: Option<LocalSocketStream>,
/// When this handle was created; reported by `age`.
created_at: Instant,
/// Last time `stream()` was called (creation time if never); reported by
/// `idle_time` and used by `is_valid`.
last_used: Instant,
/// The pool that issued this connection and receives it back on drop.
pool: Arc<ConnectionPoolInner>,
}
impl PooledConnection {
    /// Wraps `stream` in a handle tied to `pool`; both timestamps start at
    /// the moment of creation.
    fn new(stream: LocalSocketStream, pool: Arc<ConnectionPoolInner>) -> Self {
        let created_at = Instant::now();
        Self {
            inner: Some(stream),
            created_at,
            last_used: created_at,
            pool,
        }
    }

    /// Borrows the underlying stream and marks the connection as just used.
    ///
    /// The last-used timestamp is refreshed even when no stream is present.
    pub fn stream(&mut self) -> Option<&mut LocalSocketStream> {
        let touched_at = Instant::now();
        self.last_used = touched_at;
        self.inner.as_mut()
    }

    /// Consumes the handle and yields the raw stream, if it is still present.
    ///
    /// Because the stream is moved out, dropping the handle afterwards has no
    /// stream left to give back, so the connection is not returned for reuse.
    pub fn into_stream(mut self) -> Option<LocalSocketStream> {
        std::mem::take(&mut self.inner)
    }

    /// Reports whether the handle still owns a stream that has been used
    /// more recently than the pool's configured maximum idle time.
    pub fn is_valid(&self) -> bool {
        if self.inner.is_none() {
            return false;
        }
        self.idle_time() < self.pool.config.max_idle_time()
    }

    /// Time elapsed since this handle was created.
    pub fn age(&self) -> Duration {
        Instant::now().duration_since(self.created_at)
    }

    /// Time elapsed since the stream was last borrowed through `stream()`.
    pub fn idle_time(&self) -> Duration {
        Instant::now().duration_since(self.last_used)
    }
}
impl Drop for PooledConnection {
    /// Releases the capacity slot held by this handle.
    ///
    /// A stream that is still owned goes back to the pool for reuse. When the
    /// stream was moved out through `into_stream` there is nothing to return,
    /// but the slot is still released so that detached connections do not
    /// shrink the pool permanently.
    fn drop(&mut self) {
        match self.inner.take() {
            Some(stream) => self.pool.return_connection(stream),
            None => self.pool.release_slot(),
        }
    }
}

/// Shared state behind every pool handle.
///
/// Capacity accounting: each connection handed to a caller consumes exactly
/// one semaphore permit and adds one to `active_connections`; both are undone
/// when the `PooledConnection` is dropped.
struct ConnectionPoolInner {
    /// Socket name every connection is opened against.
    name: Name<'static>,
    /// Settings the pool was built with.
    config: PoolConfig,
    /// Idle streams paired with the instant they were returned, oldest first.
    connections: Mutex<VecDeque<(LocalSocketStream, Instant)>>,
    /// One permit per connection that may be checked out at the same time.
    semaphore: Semaphore,
    /// Pre-opened streams served by `get_fresh_connection` before dialing.
    fresh_connections: Mutex<VecDeque<LocalSocketStream>>,
    /// Number of connections currently checked out by callers.
    active_connections: std::sync::atomic::AtomicUsize,
}

impl ConnectionPoolInner {
    /// Dial attempts made by `get_fresh_connection` before falling back to
    /// an idle pooled connection.
    const FRESH_CONNECT_ATTEMPTS: usize = 2;
    /// Pause between the dial attempts of `get_fresh_connection`.
    const FRESH_RETRY_DELAY: Duration = Duration::from_millis(10);
    /// How long `get_fresh_connection_with_timeout` waits for a permit.
    const FRESH_PERMIT_TIMEOUT_MS: u64 = 100;
    /// Hard cap on the permit wait of `get_connection_with_timeout`.
    const MAX_PERMIT_WAIT_MS: u64 = 500;
    /// Upper bound for the exponential back-off in `create_connection`.
    const MAX_RETRY_DELAY: Duration = Duration::from_millis(200);

    /// Builds an empty pool whose semaphore holds `config.max_size` permits.
    fn new(name: Name<'static>, config: PoolConfig) -> Self {
        Self {
            name,
            semaphore: Semaphore::new(config.max_size),
            connections: Mutex::new(VecDeque::new()),
            fresh_connections: Mutex::new(VecDeque::new()),
            active_connections: std::sync::atomic::AtomicUsize::new(0),
            config,
        }
    }

    /// Records that a connection is being handed to a caller.
    ///
    /// The permit is consumed for good here; `release_slot` puts it back when
    /// the connection is dropped. Doing this only at hand-off time means an
    /// error or a cancelled future releases the permit automatically.
    fn check_out(&self, permit: tokio::sync::SemaphorePermit<'_>) {
        permit.forget();
        self.active_connections
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    /// Undoes one `check_out`: decrements the active counter and restores a
    /// semaphore permit.
    ///
    /// The decrement saturates at zero and the permit is only restored when
    /// there really was a checked-out connection, so an unbalanced call can
    /// neither wrap the counter nor inflate the semaphore beyond `max_size`.
    fn release_slot(&self) {
        let had_checkout = self
            .active_connections
            .fetch_update(
                std::sync::atomic::Ordering::Relaxed,
                std::sync::atomic::Ordering::Relaxed,
                |count| count.checked_sub(1),
            )
            .is_ok();
        if had_checkout {
            self.semaphore.add_permits(1);
        }
    }

    /// Obtains a connection that has not been used before, if possible.
    ///
    /// Order of preference: a preheated stream, a newly dialed stream, and
    /// as a last resort an idle pooled connection.
    ///
    /// # Errors
    /// Returns a connection error when dialing fails and no idle pooled
    /// connection is available.
    async fn get_fresh_connection(&self) -> Result<LocalSocketStream> {
        // The lock guard is a temporary of this statement, so it is released
        // before any `.await` below.
        let preheated = self.fresh_connections.lock().pop_front();
        if let Some(stream) = preheated {
            return Ok(stream);
        }

        let mut last_error = None;
        for attempt in 0..Self::FRESH_CONNECT_ATTEMPTS {
            if attempt > 0 {
                tokio::time::sleep(Self::FRESH_RETRY_DELAY).await;
            }
            match LocalSocketStream::connect(self.name.clone()).await {
                Ok(stream) => {
                    debug!("Created fresh connection for PUT request");
                    return Ok(stream);
                }
                Err(e) => {
                    warn!("Fresh connection attempt {} failed: {}", attempt + 1, e);
                    last_error = Some(e);
                }
            }
        }

        if let Some(stream) = self.get_pooled_connection() {
            debug!("Falling back to pooled connection for PUT request");
            return Ok(stream);
        }

        let reason = last_error.map_or_else(|| "Unknown error".to_string(), |e| e.to_string());
        Err(KodeBridgeError::connection(format!(
            "Failed to get fresh connection and no pooled connections available: {}",
            reason
        )))
    }

    /// Opens up to `count` connections ahead of time and stores them for
    /// `get_fresh_connection`. Stops at the first failure.
    async fn preheat_fresh_connections(&self, count: usize) {
        let mut successful = 0usize;
        for _ in 0..count {
            match LocalSocketStream::connect(self.name.clone()).await {
                Ok(stream) => {
                    self.fresh_connections.lock().push_back(stream);
                    successful += 1;
                }
                Err(e) => {
                    // Preheating is best effort, but leave a trace of why it stopped.
                    debug!(
                        "Stopped preheating after {} of {} connections: {}",
                        successful, count, e
                    );
                    break;
                }
            }
        }
        if successful > 0 {
            debug!("Preheated {} fresh connections", successful);
        }
    }

    /// Dials a new connection, retrying with exponential back-off.
    ///
    /// Makes `max_retries` attempts, but always at least one, so a
    /// configuration with `max_retries == 0` can still connect.
    ///
    /// # Errors
    /// Returns a connection error carrying the last I/O failure when every
    /// attempt fails.
    async fn create_connection(&self) -> Result<LocalSocketStream> {
        let attempts = self.config.max_retries.max(1);
        let mut delay = self.config.retry_delay();
        let mut last_error = None;

        for attempt in 0..attempts {
            if attempt > 0 {
                tokio::time::sleep(delay).await;
                delay = std::cmp::min(delay * 2, Self::MAX_RETRY_DELAY);
            }
            match LocalSocketStream::connect(self.name.clone()).await {
                Ok(stream) => {
                    debug!("Created new connection on attempt {}", attempt + 1);
                    return Ok(stream);
                }
                Err(e) => {
                    warn!("Connection attempt {} failed: {}", attempt + 1, e);
                    last_error = Some(e);
                }
            }
        }

        let reason = last_error.map_or_else(|| "Unknown error".to_string(), |e| e.to_string());
        Err(KodeBridgeError::connection(format!(
            "Failed to create connection after {} attempts: {}",
            attempts, reason
        )))
    }

    /// Takes the oldest idle connection that has not exceeded the maximum
    /// idle time, discarding expired ones on the way.
    ///
    /// This does no capacity accounting; callers that hand the stream out
    /// must pair it with `check_out`.
    fn get_pooled_connection(&self) -> Option<LocalSocketStream> {
        let mut idle = self.connections.lock();
        let now = Instant::now();
        let max_idle = self.config.max_idle_time();

        // Entries are queued in return order, so expired ones sit at the front.
        while idle
            .front()
            .map_or(false, |(_, returned_at)| now.duration_since(*returned_at) > max_idle)
        {
            idle.pop_front();
        }

        let (stream, _) = idle.pop_front()?;
        trace!("Reusing pooled connection, {} remaining", idle.len());
        Some(stream)
    }

    /// Takes a checked-out stream back: queues it for reuse (or drops it when
    /// the idle queue is full) and releases its capacity slot.
    fn return_connection(&self, stream: LocalSocketStream) {
        {
            let mut idle = self.connections.lock();
            if idle.len() < self.config.max_size {
                idle.push_back((stream, Instant::now()));
                trace!("Returned connection to pool, {} total", idle.len());
            } else {
                trace!("Pool full, dropping connection");
            }
        }
        // Release the slot only after the stream is queued, so a waiter woken
        // by the new permit finds the idle connection.
        self.release_slot();
    }

    /// Checks out a connection, reusing an idle one when available and
    /// dialing a new one otherwise.
    ///
    /// # Errors
    /// * custom error "Connection pool exhausted" when `max_size` connections
    ///   are already checked out;
    /// * timeout error when no permit becomes available within
    ///   `min(connection_timeout_ms, 500)` milliseconds;
    /// * connection error when a new connection cannot be established.
    async fn get_connection_with_timeout(&self) -> Result<LocalSocketStream> {
        // Fail fast instead of queueing when every slot is handed out.
        let checked_out = self
            .active_connections
            .load(std::sync::atomic::Ordering::Relaxed);
        if checked_out >= self.config.max_size {
            return Err(KodeBridgeError::custom("Connection pool exhausted"));
        }

        let timeout_ms = self
            .config
            .connection_timeout_ms
            .min(Self::MAX_PERMIT_WAIT_MS);
        let permit = tokio::time::timeout(
            Duration::from_millis(timeout_ms),
            self.semaphore.acquire(),
        )
        .await
        .map_err(|_| KodeBridgeError::timeout(timeout_ms))?
        .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;

        // The permit stays an RAII guard across the await below: if dialing
        // fails or this future is cancelled, the permit is released.
        let stream = match self.get_pooled_connection() {
            Some(stream) => stream,
            None => self.create_connection().await?,
        };
        self.check_out(permit);
        Ok(stream)
    }

    /// Checks out a connection through the fresh-connection path
    /// (`get_fresh_connection`), waiting at most 100 ms for capacity.
    ///
    /// # Errors
    /// Returns a timeout error when no permit becomes available in time, or
    /// the error of `get_fresh_connection`.
    async fn get_fresh_connection_with_timeout(&self) -> Result<LocalSocketStream> {
        let permit = tokio::time::timeout(
            Duration::from_millis(Self::FRESH_PERMIT_TIMEOUT_MS),
            self.semaphore.acquire(),
        )
        .await
        .map_err(|_| KodeBridgeError::timeout(Self::FRESH_PERMIT_TIMEOUT_MS))?
        .map_err(|_| KodeBridgeError::custom("Semaphore closed"))?;

        let stream = self.get_fresh_connection().await?;
        self.check_out(permit);
        Ok(stream)
    }
}
/// Public handle to a connection pool.
///
/// Cloning copies only the `Arc`, so every clone refers to the same
/// underlying pool state.
#[derive(Clone)]
pub struct ConnectionPool {
/// State shared by all clones of this handle.
inner: Arc<ConnectionPoolInner>,
}
impl ConnectionPool {
    /// Creates a pool that connects to `name` using `config`.
    pub fn new(name: Name<'static>, config: PoolConfig) -> Self {
        Self {
            inner: Arc::new(ConnectionPoolInner::new(name, config)),
        }
    }

    /// Creates a pool that connects to `name` using `PoolConfig::default()`.
    pub fn with_default_config(name: Name<'static>) -> Self {
        Self::new(name, PoolConfig::default())
    }

    /// Checks out a connection, reusing an idle one when available.
    ///
    /// # Errors
    /// Fails when the pool reports exhaustion, when waiting for capacity
    /// times out, or when a new connection cannot be established.
    pub async fn get_connection(&self) -> Result<PooledConnection> {
        let stream = self.inner.get_connection_with_timeout().await?;
        Ok(PooledConnection::new(stream, self.inner.clone()))
    }

    /// Checks out a connection through the pool's fresh-connection path,
    /// which prefers preheated or newly dialed streams over idle ones.
    ///
    /// # Errors
    /// Fails when waiting for capacity times out or no connection can be
    /// obtained.
    pub async fn get_fresh_connection(&self) -> Result<PooledConnection> {
        let stream = self.inner.get_fresh_connection_with_timeout().await?;
        Ok(PooledConnection::new(stream, self.inner.clone()))
    }

    /// Opens up to `count` connections ahead of time for later use by
    /// `get_fresh_connection`.
    pub async fn preheat_for_puts(&self, count: usize) {
        self.inner.preheat_fresh_connections(count).await;
    }

    /// Checks out `count` connections concurrently, one spawned task each.
    ///
    /// # Errors
    /// Returns the first failure encountered, in spawn order. Connections
    /// already collected are dropped at that point, which hands them back to
    /// the pool.
    pub async fn get_connections(&self, count: usize) -> Result<Vec<PooledConnection>> {
        // Spawn everything first so the checkouts actually run in parallel.
        let tasks: Vec<_> = (0..count)
            .map(|_| {
                let pool = self.clone();
                tokio::spawn(async move { pool.get_connection().await })
            })
            .collect();

        let mut connections = Vec::with_capacity(count);
        for task in tasks {
            match task.await {
                Ok(Ok(conn)) => connections.push(conn),
                Ok(Err(e)) => return Err(e),
                Err(e) => return Err(KodeBridgeError::custom(format!("Task failed: {}", e))),
            }
        }
        Ok(connections)
    }

    /// Takes a snapshot of the pool's counters.
    ///
    /// `total_connections` is the number of idle connections currently held
    /// for reuse; the values are read one after another, so under concurrent
    /// use they may not describe a single instant.
    pub fn stats(&self) -> PoolStats {
        let connections = self.inner.connections.lock();
        let active_count = self
            .inner
            .active_connections
            .load(std::sync::atomic::Ordering::Relaxed);
        PoolStats {
            total_connections: connections.len(),
            available_permits: self.inner.semaphore.available_permits(),
            max_size: self.inner.config.max_size,
            active_connections: active_count,
        }
    }

    /// Drops every connection the pool is holding on to: both the idle
    /// connections kept for reuse and the preheated fresh connections.
    ///
    /// Connections currently checked out are unaffected.
    pub fn close(&self) {
        // Each guard is a temporary, so the two locks are never held together.
        self.inner.connections.lock().clear();
        self.inner.fresh_connections.lock().clear();
        debug!("Closed all pooled connections");
    }
}
/// Point-in-time counters describing a connection pool.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of idle connections currently held by the pool for reuse.
    pub total_connections: usize,
    /// Permits currently available on the pool's semaphore.
    pub available_permits: usize,
    /// Configured pool capacity.
    pub max_size: usize,
    /// Value of the pool's active-connection counter.
    pub active_connections: usize,
}

impl std::fmt::Display for PoolStats {
    /// Renders the counters as
    /// `Pool(connections: _, active: _, permits: _, max: _)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            total_connections,
            available_permits,
            max_size,
            active_connections,
        } = self;
        f.write_fmt(format_args!(
            "Pool(connections: {}, active: {}, permits: {}, max: {})",
            total_connections, active_connections, available_permits, max_size
        ))
    }
}