1use crate::{DatabaseConfig, Result, StorageError};
6use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
7use sqlx::{Sqlite, Transaction};
8use std::future::Future;
9use std::time::Duration;
10
11#[derive(Clone)]
13pub struct DatabasePool {
14 pool: SqlitePool,
15}
16
17impl DatabasePool {
18 pub async fn new(config: DatabaseConfig) -> Result<Self> {
20 let pool = SqlitePoolOptions::new()
21 .max_connections(config.max_connections)
22 .min_connections(config.min_connections)
23 .acquire_timeout(Duration::from_secs(30))
24 .connect(&config.database_url)
25 .await?;
26
27 Ok(Self { pool })
28 }
29
30 pub fn from_pool(pool: SqlitePool) -> Self {
32 Self { pool }
33 }
34
35 pub async fn warmup(&self, target_connections: Option<u32>) -> Result<u32> {
52 let target =
53 target_connections.unwrap_or_else(|| self.pool.options().get_min_connections());
54
55 let mut acquired = Vec::new();
56 let mut count = 0;
57
58 for _ in 0..target {
60 match self.pool.acquire().await {
61 Ok(conn) => {
62 acquired.push(conn);
63 count += 1;
64 }
65 Err(e) => {
66 tracing::warn!("Failed to acquire connection during warmup: {}", e);
67 break;
68 }
69 }
70 }
71
72 drop(acquired);
74
75 tracing::info!("Warmed up connection pool with {} connections", count);
76 Ok(count)
77 }
78
79 pub async fn migrate(&self) -> Result<()> {
81 sqlx::migrate!("./migrations")
82 .run(&self.pool)
83 .await
84 .map_err(|e| StorageError::Migration(e.to_string()))?;
85 Ok(())
86 }
87
88 pub fn pool(&self) -> &SqlitePool {
90 &self.pool
91 }
92
93 pub async fn health_check(&self) -> Result<()> {
95 sqlx::query("SELECT 1").execute(&self.pool).await?;
96 Ok(())
97 }
98
99 pub async fn begin(&self) -> Result<Transaction<'static, Sqlite>> {
103 let tx = self.pool.begin().await?;
104 Ok(tx)
105 }
106
107 pub async fn transaction<F, T, Fut>(&self, f: F) -> Result<T>
110 where
111 F: FnOnce(Transaction<'static, Sqlite>) -> Fut,
112 Fut: Future<Output = Result<(Transaction<'static, Sqlite>, T)>>,
113 {
114 let tx = self.begin().await?;
115 match f(tx).await {
116 Ok((tx, result)) => {
117 tx.commit().await?;
118 Ok(result)
119 }
120 Err(e) => Err(e),
121 }
122 }
123
124 pub fn stats(&self) -> PoolStats {
126 PoolStats {
127 size: self.pool.size(),
128 num_idle: self.pool.num_idle(),
129 max_connections: self.pool.options().get_max_connections(),
130 min_connections: self.pool.options().get_min_connections(),
131 }
132 }
133
134 pub fn metrics(&self) -> PoolMetrics {
136 let stats = self.stats();
137 let health = self.health_status();
138
139 PoolMetrics {
140 stats,
141 health,
142 acquire_timeout_ms: 30_000, }
144 }
145
146 pub fn health_status(&self) -> PoolHealth {
148 if self.is_closed() {
149 return PoolHealth::Critical;
150 }
151
152 let stats = self.stats();
153
154 if stats.is_at_capacity() || !stats.has_available() {
155 PoolHealth::Critical
156 } else if stats.is_overutilized() {
157 PoolHealth::Degraded
158 } else {
159 PoolHealth::Healthy
160 }
161 }
162
163 pub async fn close(&self) {
165 self.pool.close().await;
166 }
167
168 pub fn is_closed(&self) -> bool {
170 self.pool.is_closed()
171 }
172
173 pub async fn acquire(&self) -> Result<sqlx::pool::PoolConnection<Sqlite>> {
175 let conn = self.pool.acquire().await?;
176 Ok(conn)
177 }
178
179 pub fn export_metrics(&self) -> std::collections::HashMap<String, f64> {
184 let stats = self.stats();
185 let mut metrics = std::collections::HashMap::new();
186
187 metrics.insert("pool_size".to_string(), f64::from(stats.size));
188 metrics.insert("pool_idle_connections".to_string(), stats.num_idle as f64);
189 metrics.insert(
190 "pool_active_connections".to_string(),
191 stats.active_connections() as f64,
192 );
193 metrics.insert(
194 "pool_max_connections".to_string(),
195 f64::from(stats.max_connections),
196 );
197 metrics.insert(
198 "pool_min_connections".to_string(),
199 f64::from(stats.min_connections),
200 );
201 metrics.insert("pool_utilization".to_string(), stats.utilization());
202 metrics.insert(
203 "pool_at_capacity".to_string(),
204 if stats.is_at_capacity() { 1.0 } else { 0.0 },
205 );
206 metrics.insert(
207 "pool_has_available".to_string(),
208 if stats.has_available() { 1.0 } else { 0.0 },
209 );
210
211 let health = self.health_status();
212 metrics.insert(
213 "pool_is_closed".to_string(),
214 if self.is_closed() { 1.0 } else { 0.0 },
215 );
216 metrics.insert(
217 "pool_is_healthy".to_string(),
218 if health == PoolHealth::Healthy {
219 1.0
220 } else {
221 0.0
222 },
223 );
224 metrics.insert(
225 "pool_is_degraded".to_string(),
226 if health == PoolHealth::Degraded {
227 1.0
228 } else {
229 0.0
230 },
231 );
232 metrics.insert(
233 "pool_is_critical".to_string(),
234 if health == PoolHealth::Critical {
235 1.0
236 } else {
237 0.0
238 },
239 );
240
241 metrics
242 }
243}
244
/// Point-in-time snapshot of a pool's size counters and configured bounds.
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
    /// Connections currently held by the pool, idle and in use together.
    pub size: u32,
    /// Held connections that are currently idle.
    pub num_idle: usize,
    /// Configured maximum number of connections.
    pub max_connections: u32,
    /// Configured minimum number of connections.
    pub min_connections: u32,
}

impl PoolStats {
    /// Number of connections that are in use (`size` minus `num_idle`).
    ///
    /// Saturates at zero: `size` and `num_idle` may be sampled at slightly
    /// different moments, so `num_idle` can momentarily exceed `size`.
    pub fn active_connections(&self) -> usize {
        (self.size as usize).saturating_sub(self.num_idle)
    }

    /// Whether a request could be served right now: either a connection is
    /// idle or the pool is still below its maximum size.
    pub fn has_available(&self) -> bool {
        self.num_idle > 0 || self.size < self.max_connections
    }

    /// Fraction of the maximum size that is currently open (`size / max`).
    ///
    /// Returns `0.0` when `max_connections` is zero to avoid dividing by zero.
    pub fn utilization(&self) -> f64 {
        if self.max_connections == 0 {
            return 0.0;
        }
        f64::from(self.size) / f64::from(self.max_connections)
    }

    /// Whether the pool has grown to (or beyond) its maximum size.
    pub fn is_at_capacity(&self) -> bool {
        self.size >= self.max_connections
    }

    /// Whether utilization is below 50%.
    pub fn is_underutilized(&self) -> bool {
        self.utilization() < 0.5
    }

    /// Whether utilization is above 80%.
    pub fn is_overutilized(&self) -> bool {
        self.utilization() > 0.8
    }
}
292
293#[derive(Debug, Clone)]
295pub struct PoolMetrics {
296 pub stats: PoolStats,
298 pub health: PoolHealth,
300 pub acquire_timeout_ms: u64,
302}
303
/// Coarse health classification of a connection pool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolHealth {
    /// The pool is operating normally.
    Healthy,
    /// The pool is under pressure but still serving.
    Degraded,
    /// The pool is closed or has reached its limits.
    Critical,
}

impl PoolHealth {
    /// Lowercase label for this state: `"healthy"`, `"degraded"` or
    /// `"critical"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Healthy => "healthy",
            Self::Degraded => "degraded",
            Self::Critical => "critical",
        }
    }
}

impl std::fmt::Display for PoolHealth {
    /// Writes the same label that [`PoolHealth::as_str`] returns.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
330
331#[allow(dead_code)]
333pub struct TransactionHelper {
334 tx: Option<Transaction<'static, Sqlite>>,
335 committed: bool,
336}
337
338#[allow(dead_code)]
339impl TransactionHelper {
340 pub async fn new(pool: &DatabasePool) -> Result<Self> {
342 let tx = pool.begin().await?;
343 Ok(Self {
344 tx: Some(tx),
345 committed: false,
346 })
347 }
348
349 pub fn tx(&mut self) -> &mut Transaction<'static, Sqlite> {
351 self.tx.as_mut().expect("Transaction already consumed")
352 }
353
354 pub async fn commit(mut self) -> Result<()> {
356 if let Some(tx) = self.tx.take() {
357 tx.commit().await?;
358 self.committed = true;
359 }
360 Ok(())
361 }
362
363 pub async fn rollback(mut self) -> Result<()> {
365 if let Some(tx) = self.tx.take() {
366 tx.rollback().await?;
367 }
368 Ok(())
369 }
370}
371
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot for a pool bounded to 2..=20 connections.
    fn snapshot(size: u32, num_idle: usize) -> PoolStats {
        PoolStats {
            size,
            num_idle,
            max_connections: 20,
            min_connections: 2,
        }
    }

    /// A half-grown pool with idle connections reports 7 active and has room.
    #[test]
    fn test_pool_stats() {
        let partially_used = snapshot(10, 3);

        assert_eq!(partially_used.active_connections(), 7);
        assert!(partially_used.has_available());
    }

    /// A fully grown pool with nothing idle has no availability left.
    #[test]
    fn test_pool_stats_at_capacity() {
        let exhausted = snapshot(20, 0);

        assert_eq!(exhausted.active_connections(), 20);
        assert!(!exhausted.has_available());
    }
}