1use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::Duration;
15
16use parking_lot::Mutex;
17use tokio::sync::Semaphore;
18use tokio_rusqlite::Connection;
19use tracing::{debug, info, trace};
20
21use crate::config::SqliteConfig;
22use crate::connection::{PooledConnection, SqliteConnection};
23use crate::error::{SqliteError, SqliteResult};
24
/// Handle to a shared pool of SQLite connections.
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` yields another
/// handle to the same underlying state rather than an independent pool.
#[derive(Clone)]
pub struct SqlitePool {
    /// Database settings used each time a connection is opened.
    config: Arc<SqliteConfig>,
    /// Created with `max_connections` permits; one permit is taken per checkout.
    semaphore: Arc<Semaphore>,
    /// Queue of connections available for reuse; checkouts pop from the front.
    idle_connections: Arc<Mutex<VecDeque<PooledConnection>>>,
    /// Sizing and expiry settings for the pool.
    pool_config: Arc<PoolConfig>,
    /// Usage counters; a snapshot is exposed through `stats()`.
    stats: Arc<Mutex<PoolStats>>,
}
51
/// Usage counters reported by the pool.
#[derive(Debug, Default, Clone)]
pub struct PoolStats {
    /// Checkouts that were served by an idle connection.
    pub reuses: u64,
    /// Checkouts that had to open a new connection.
    pub opens: u64,
    /// Idle connections discarded because they exceeded `max_lifetime` or
    /// `idle_timeout`.
    pub expirations: u64,
    /// Number of connections the pool counts as checked out.
    pub in_use: usize,
}
64
65impl SqlitePool {
66 pub async fn new(config: SqliteConfig) -> SqliteResult<Self> {
68 Self::with_pool_config(config, PoolConfig::default()).await
69 }
70
71 pub async fn with_pool_config(
73 config: SqliteConfig,
74 pool_config: PoolConfig,
75 ) -> SqliteResult<Self> {
76 info!(
77 path = %config.path_str(),
78 max_connections = %pool_config.max_connections,
79 "SQLite connection pool created"
80 );
81
82 let test_conn = Self::open_connection(&config).await?;
84 drop(test_conn);
85
86 let pool = Self {
87 config: Arc::new(config),
88 semaphore: Arc::new(Semaphore::new(pool_config.max_connections)),
89 idle_connections: Arc::new(Mutex::new(VecDeque::with_capacity(
90 pool_config.max_connections,
91 ))),
92 pool_config: Arc::new(pool_config),
93 stats: Arc::new(Mutex::new(PoolStats::default())),
94 };
95
96 if !pool.config.path.is_memory() && pool.pool_config.min_connections > 0 {
98 debug!(
99 "Pre-warming pool with {} connections",
100 pool.pool_config.min_connections
101 );
102 for _ in 0..pool.pool_config.min_connections {
103 if let Ok(conn) = Self::open_connection(&pool.config).await {
104 let mut idle = pool.idle_connections.lock();
105 idle.push_back(PooledConnection::new(conn));
106 }
107 }
108 }
109
110 Ok(pool)
111 }
112
113 async fn open_connection(config: &SqliteConfig) -> SqliteResult<Connection> {
115 let path = config.path_str().to_string();
116 let init_sql = config.init_sql();
117
118 let conn = if config.path.is_memory() {
119 Connection::open_in_memory().await?
120 } else {
121 Connection::open(&path).await?
122 };
123
124 conn.call(move |conn| {
126 conn.execute_batch(&init_sql)?;
127 Ok(())
128 })
129 .await?;
130
131 Ok(conn)
132 }
133
134 pub async fn get(&self) -> SqliteResult<SqliteConnection> {
140 trace!("Acquiring connection from pool");
141
142 let permit = self
144 .semaphore
145 .clone()
146 .acquire_owned()
147 .await
148 .map_err(|e| SqliteError::pool(format!("failed to acquire permit: {}", e)))?;
149
150 {
152 let mut stats = self.stats.lock();
153 stats.in_use += 1;
154 }
155
156 if self.config.path.is_memory() {
159 let conn = Self::open_connection(&self.config).await?;
160 {
161 let mut stats = self.stats.lock();
162 stats.opens += 1;
163 }
164 return Ok(SqliteConnection::new_pooled(
165 conn, permit, None, ));
167 }
168
169 let conn: Option<Connection> = {
171 let mut idle = self.idle_connections.lock();
172
173 while let Some(pooled) = idle.pop_front() {
175 let is_expired = if let Some(lifetime) = self.pool_config.max_lifetime {
176 pooled.created_at.elapsed() > lifetime
177 } else {
178 false
179 };
180 let is_idle_expired = if let Some(timeout) = self.pool_config.idle_timeout {
181 pooled.last_used.elapsed() > timeout
182 } else {
183 false
184 };
185
186 if is_expired || is_idle_expired {
187 let mut stats = self.stats.lock();
188 stats.expirations += 1;
189 continue;
191 }
192 let mut stats = self.stats.lock();
194 stats.reuses += 1;
195 return Ok(SqliteConnection::new_pooled(
196 pooled.conn,
197 permit,
198 Some(self.idle_connections.clone()),
199 ));
200 }
201 None
202 };
203
204 if conn.is_none() {
206 debug!("No idle connections, opening new connection");
207 let new_conn = Self::open_connection(&self.config).await?;
208 {
209 let mut stats = self.stats.lock();
210 stats.opens += 1;
211 }
212 return Ok(SqliteConnection::new_pooled(
213 new_conn,
214 permit,
215 Some(self.idle_connections.clone()),
216 ));
217 }
218
219 unreachable!()
220 }
221
222 pub fn config(&self) -> &SqliteConfig {
224 &self.config
225 }
226
227 pub fn pool_config(&self) -> &PoolConfig {
229 &self.pool_config
230 }
231
232 pub fn stats(&self) -> PoolStats {
234 self.stats.lock().clone()
235 }
236
237 pub fn reset_stats(&self) {
239 let mut stats = self.stats.lock();
240 *stats = PoolStats::default();
241 }
242
243 pub async fn is_healthy(&self) -> bool {
245 match Self::open_connection(&self.config).await {
246 Ok(conn) => {
247 let result = conn
248 .call(|conn| {
249 conn.execute("SELECT 1", [])?;
250 Ok(())
251 })
252 .await;
253 result.is_ok()
254 }
255 Err(_) => false,
256 }
257 }
258
259 pub fn available_permits(&self) -> usize {
261 self.semaphore.available_permits()
262 }
263
264 pub fn idle_count(&self) -> usize {
266 self.idle_connections.lock().len()
267 }
268
269 pub fn builder() -> SqlitePoolBuilder {
271 SqlitePoolBuilder::new()
272 }
273}
274
/// Sizing and expiry settings for a connection pool.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Number of semaphore permits, i.e. the maximum number of connections
    /// that can be checked out at the same time.
    pub max_connections: usize,
    /// Number of connections opened eagerly when a file-backed pool is
    /// created.
    pub min_connections: usize,
    /// Intended upper bound on waiting for a connection; `None` means no
    /// limit. NOTE(review): verify the acquisition path enforces this.
    pub connection_timeout: Option<Duration>,
    /// Idle connections unused for longer than this are discarded instead of
    /// reused; `None` disables the check.
    pub idle_timeout: Option<Duration>,
    /// Idle connections older than this are discarded instead of reused;
    /// `None` disables the check.
    pub max_lifetime: Option<Duration>,
}
289
290impl Default for PoolConfig {
291 fn default() -> Self {
292 Self {
293 max_connections: 5, min_connections: 1,
295 connection_timeout: Some(Duration::from_secs(30)),
296 idle_timeout: Some(Duration::from_secs(300)), max_lifetime: Some(Duration::from_secs(1800)), }
299 }
300}
301
/// Builder that collects the settings needed to create a `SqlitePool`.
#[derive(Debug, Default)]
pub struct SqlitePoolBuilder {
    /// Explicit database configuration; takes precedence over `url` in `build`.
    config: Option<SqliteConfig>,
    /// Database URL, parsed in `build` when no explicit configuration is set.
    url: Option<String>,
    /// Pool settings handed to the pool on `build`.
    pool_config: PoolConfig,
}
309
310impl SqlitePoolBuilder {
311 pub fn new() -> Self {
313 Self {
314 config: None,
315 url: None,
316 pool_config: PoolConfig::default(),
317 }
318 }
319
320 pub fn url(mut self, url: impl Into<String>) -> Self {
322 self.url = Some(url.into());
323 self
324 }
325
326 pub fn config(mut self, config: SqliteConfig) -> Self {
328 self.config = Some(config);
329 self
330 }
331
332 pub fn max_connections(mut self, n: usize) -> Self {
334 self.pool_config.max_connections = n;
335 self
336 }
337
338 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
340 self.pool_config.connection_timeout = Some(timeout);
341 self
342 }
343
344 pub fn idle_timeout(mut self, timeout: Duration) -> Self {
346 self.pool_config.idle_timeout = Some(timeout);
347 self
348 }
349
350 pub async fn build(self) -> SqliteResult<SqlitePool> {
352 let config = if let Some(config) = self.config {
353 config
354 } else if let Some(url) = self.url {
355 SqliteConfig::from_url(url)?
356 } else {
357 return Err(SqliteError::config("no database URL or config provided"));
358 };
359
360 SqlitePool::with_pool_config(config, self.pool_config).await
361 }
362}
363
#[cfg(test)]
mod tests {
    use super::*;

    /// The default pool settings allow five concurrent connections.
    #[test]
    fn test_pool_config_default() {
        let defaults = PoolConfig::default();
        assert_eq!(defaults.max_connections, 5);
    }

    /// Builder setters record the URL and the connection limit.
    #[test]
    fn test_pool_builder() {
        let configured = SqlitePoolBuilder::new()
            .url("sqlite::memory:")
            .max_connections(10);

        assert!(configured.url.is_some());
        assert_eq!(configured.pool_config.max_connections, 10);
    }

    /// A freshly created in-memory pool has permits available.
    #[tokio::test]
    async fn test_pool_memory() {
        let pool = SqlitePool::new(SqliteConfig::memory()).await.unwrap();
        let free_permits = pool.available_permits();
        assert!(free_permits > 0);
    }

    /// A connection can be checked out of an in-memory pool.
    #[tokio::test]
    async fn test_pool_get_connection() {
        let pool = SqlitePool::new(SqliteConfig::memory()).await.unwrap();
        let checkout = pool.get().await;
        assert!(checkout.is_ok());
    }
}