pub mod adaptive;
pub mod caching_pool;
mod config;
pub mod connection_wrapper;
pub mod keepalive;
pub use adaptive::{
AdaptiveConnectionPool, AdaptivePoolConfig, AdaptivePoolMetrics, AdaptivePooledConnection,
ConnectionCleanupCallback, ConnectionId,
};
pub use caching_pool::{CachingPool, CachingPoolConfig, CachingPoolStats, ConnectionGuard};
pub use config::{PoolConfig, PoolStatistics, PooledConnection};
pub use connection_wrapper::PooledConnection as WrappedPooledConnection;
pub use keepalive::{KeepAliveConfig, KeepAliveConnection, KeepAlivePool, KeepAliveStatistics};
use do_memory_core::{Error, Result};
use libsql::{Connection, Database};
use parking_lot::RwLock;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
use tracing::{debug, info, warn};
pub struct ConnectionPool {
db: Arc<Database>,
config: PoolConfig,
semaphore: Arc<Semaphore>,
stats: Arc<RwLock<PoolStatistics>>,
}
impl ConnectionPool {
pub async fn new(db: Arc<Database>, config: PoolConfig) -> Result<Self> {
info!(
"Creating connection pool with max_connections={}",
config.max_connections
);
let semaphore = Arc::new(Semaphore::new(config.max_connections));
let stats = Arc::new(RwLock::new(PoolStatistics::default()));
let pool = Self {
db,
config,
semaphore,
stats,
};
pool.validate_database().await?;
info!("Connection pool created successfully");
Ok(pool)
}
async fn validate_database(&self) -> Result<()> {
let conn = self
.db
.connect()
.map_err(|e| Error::Storage(format!("Failed to connect to database: {}", e)))?;
conn.query("SELECT 1", ())
.await
.map_err(|e| Error::Storage(format!("Database validation failed: {}", e)))?;
Ok(())
}
async fn create_connection(&self) -> Result<Connection> {
let conn = self
.db
.connect()
.map_err(|e| Error::Storage(format!("Failed to create connection: {}", e)))?;
{
let mut stats = self.stats.write();
stats.total_created += 1;
}
Ok(conn)
}
pub async fn get(&self) -> Result<PooledConnection> {
let start = Instant::now();
let owned_permit_fut = Arc::clone(&self.semaphore).acquire_owned();
let permit = tokio::time::timeout(self.config.connection_timeout, owned_permit_fut)
.await
.map_err(|_| {
Error::Storage(format!(
"Connection pool timeout after {:?}: max {} connections in use",
self.config.connection_timeout, self.config.max_connections
))
})?
.map_err(|e| Error::Storage(format!("Failed to acquire connection permit: {}", e)))?;
let wait_time = start.elapsed();
let conn = self.create_connection().await?;
if self.config.enable_health_check {
if let Err(e) = self.validate_connection_health(&conn).await {
let mut stats = self.stats.write();
stats.total_health_checks_failed += 1;
return Err(e);
}
let mut stats = self.stats.write();
stats.total_health_checks_passed += 1;
}
{
let mut stats = self.stats.write();
stats.total_checkouts += 1;
stats.total_wait_time_ms += wait_time.as_millis() as u64;
stats.active_connections += 1;
stats.update_averages();
}
debug!(
"Connection acquired (wait: {:?}, active: {})",
wait_time,
self.stats.read().active_connections
);
Ok(PooledConnection {
connection: Some(conn),
_permit: permit,
stats: Arc::clone(&self.stats),
})
}
async fn validate_connection_health(&self, conn: &Connection) -> Result<()> {
tokio::time::timeout(self.config.health_check_timeout, conn.query("SELECT 1", ()))
.await
.map_err(|_| Error::Storage("Connection health check timeout".to_string()))?
.map_err(|e| Error::Storage(format!("Connection health check failed: {}", e)))?;
Ok(())
}
pub async fn statistics(&self) -> PoolStatistics {
self.stats.read().clone()
}
pub async fn utilization(&self) -> f32 {
let stats = self.stats.read();
if self.config.max_connections == 0 {
return 0.0;
}
stats.active_connections as f32 / self.config.max_connections as f32
}
pub async fn available_connections(&self) -> usize {
let stats = self.stats.read();
self.config
.max_connections
.saturating_sub(stats.active_connections)
}
pub async fn has_capacity(&self) -> bool {
self.available_connections().await > 0
}
pub async fn shutdown(&self) -> Result<()> {
info!("Shutting down connection pool");
let shutdown_timeout = Duration::from_secs(30);
let start = Instant::now();
while start.elapsed() < shutdown_timeout {
let active = self.stats.read().active_connections;
if active == 0 {
break;
}
debug!("Waiting for {} active connections to complete", active);
tokio::time::sleep(Duration::from_millis(100)).await;
}
let final_active = self.stats.read().active_connections;
if final_active > 0 {
warn!(
"Shutdown completed with {} active connections still in use",
final_active
);
} else {
info!("Connection pool shutdown complete");
}
Ok(())
}
}