do_memory_storage_turso/pool/
mod.rs1pub mod adaptive;
29pub mod caching_pool;
30mod config;
31pub mod connection_wrapper;
32pub mod keepalive;
33
34pub use adaptive::{
35 AdaptiveConnectionPool, AdaptivePoolConfig, AdaptivePoolMetrics, AdaptivePooledConnection,
36 ConnectionCleanupCallback, ConnectionId,
37};
38pub use caching_pool::{CachingPool, CachingPoolConfig, CachingPoolStats, ConnectionGuard};
39pub use config::{PoolConfig, PoolStatistics, PooledConnection};
40pub use connection_wrapper::PooledConnection as WrappedPooledConnection;
41pub use keepalive::{KeepAliveConfig, KeepAliveConnection, KeepAlivePool, KeepAliveStatistics};
42
43use do_memory_core::{Error, Result};
44use libsql::{Connection, Database};
45use parking_lot::RwLock;
46use std::sync::Arc;
47use std::time::{Duration, Instant};
48use tokio::sync::Semaphore;
49use tracing::{debug, info, warn};
50
51pub struct ConnectionPool {
59 db: Arc<Database>,
60 config: PoolConfig,
61 semaphore: Arc<Semaphore>,
62 stats: Arc<RwLock<PoolStatistics>>,
63}
64
65impl ConnectionPool {
66 pub async fn new(db: Arc<Database>, config: PoolConfig) -> Result<Self> {
88 info!(
89 "Creating connection pool with max_connections={}",
90 config.max_connections
91 );
92
93 let semaphore = Arc::new(Semaphore::new(config.max_connections));
95 let stats = Arc::new(RwLock::new(PoolStatistics::default()));
96
97 let pool = Self {
98 db,
99 config,
100 semaphore,
101 stats,
102 };
103
104 pool.validate_database().await?;
106
107 info!("Connection pool created successfully");
108 Ok(pool)
109 }
110
111 async fn validate_database(&self) -> Result<()> {
113 let conn = self
114 .db
115 .connect()
116 .map_err(|e| Error::Storage(format!("Failed to connect to database: {}", e)))?;
117
118 conn.query("SELECT 1", ())
119 .await
120 .map_err(|e| Error::Storage(format!("Database validation failed: {}", e)))?;
121
122 Ok(())
123 }
124
125 async fn create_connection(&self) -> Result<Connection> {
127 let conn = self
128 .db
129 .connect()
130 .map_err(|e| Error::Storage(format!("Failed to create connection: {}", e)))?;
131
132 {
134 let mut stats = self.stats.write();
135 stats.total_created += 1;
136 }
137
138 Ok(conn)
139 }
140
141 pub async fn get(&self) -> Result<PooledConnection> {
156 let start = Instant::now();
157
158 let owned_permit_fut = Arc::clone(&self.semaphore).acquire_owned();
160 let permit = tokio::time::timeout(self.config.connection_timeout, owned_permit_fut)
161 .await
162 .map_err(|_| {
163 Error::Storage(format!(
164 "Connection pool timeout after {:?}: max {} connections in use",
165 self.config.connection_timeout, self.config.max_connections
166 ))
167 })?
168 .map_err(|e| Error::Storage(format!("Failed to acquire connection permit: {}", e)))?;
169
170 let wait_time = start.elapsed();
171
172 let conn = self.create_connection().await?;
174
175 if self.config.enable_health_check {
177 if let Err(e) = self.validate_connection_health(&conn).await {
178 let mut stats = self.stats.write();
179 stats.total_health_checks_failed += 1;
180 return Err(e);
181 }
182
183 let mut stats = self.stats.write();
184 stats.total_health_checks_passed += 1;
185 }
186
187 {
189 let mut stats = self.stats.write();
190 stats.total_checkouts += 1;
191 stats.total_wait_time_ms += wait_time.as_millis() as u64;
192 stats.active_connections += 1;
193 stats.update_averages();
194 }
195
196 debug!(
197 "Connection acquired (wait: {:?}, active: {})",
198 wait_time,
199 self.stats.read().active_connections
200 );
201
202 Ok(PooledConnection {
203 connection: Some(conn),
204 _permit: permit,
205 stats: Arc::clone(&self.stats),
206 })
207 }
208
209 async fn validate_connection_health(&self, conn: &Connection) -> Result<()> {
211 tokio::time::timeout(self.config.health_check_timeout, conn.query("SELECT 1", ()))
212 .await
213 .map_err(|_| Error::Storage("Connection health check timeout".to_string()))?
214 .map_err(|e| Error::Storage(format!("Connection health check failed: {}", e)))?;
215
216 Ok(())
217 }
218
219 pub async fn statistics(&self) -> PoolStatistics {
221 self.stats.read().clone()
222 }
223
224 pub async fn utilization(&self) -> f32 {
226 let stats = self.stats.read();
227 if self.config.max_connections == 0 {
228 return 0.0;
229 }
230 stats.active_connections as f32 / self.config.max_connections as f32
231 }
232
233 pub async fn available_connections(&self) -> usize {
235 let stats = self.stats.read();
236 self.config
237 .max_connections
238 .saturating_sub(stats.active_connections)
239 }
240
241 pub async fn has_capacity(&self) -> bool {
243 self.available_connections().await > 0
244 }
245
246 pub async fn shutdown(&self) -> Result<()> {
250 info!("Shutting down connection pool");
251
252 let shutdown_timeout = Duration::from_secs(30);
253 let start = Instant::now();
254
255 while start.elapsed() < shutdown_timeout {
256 let active = self.stats.read().active_connections;
257 if active == 0 {
258 break;
259 }
260
261 debug!("Waiting for {} active connections to complete", active);
262 tokio::time::sleep(Duration::from_millis(100)).await;
263 }
264
265 let final_active = self.stats.read().active_connections;
266 if final_active > 0 {
267 warn!(
268 "Shutdown completed with {} active connections still in use",
269 final_active
270 );
271 } else {
272 info!("Connection pool shutdown complete");
273 }
274
275 Ok(())
276 }
277}