oxify_storage/
pool.rs

1//! Database connection pool management
2//!
3//! Provides connection pooling and transaction support for SQLite.
4
5use crate::{DatabaseConfig, Result, StorageError};
6use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
7use sqlx::{Sqlite, Transaction};
8use std::future::Future;
9use std::time::Duration;
10
11/// Database connection pool
12#[derive(Clone)]
13pub struct DatabasePool {
14    pool: SqlitePool,
15}
16
17impl DatabasePool {
18    /// Create a new database pool
19    pub async fn new(config: DatabaseConfig) -> Result<Self> {
20        let pool = SqlitePoolOptions::new()
21            .max_connections(config.max_connections)
22            .min_connections(config.min_connections)
23            .acquire_timeout(Duration::from_secs(30))
24            .connect(&config.database_url)
25            .await?;
26
27        Ok(Self { pool })
28    }
29
30    /// Create a DatabasePool from an existing SqlitePool
31    pub fn from_pool(pool: SqlitePool) -> Self {
32        Self { pool }
33    }
34
35    /// Warm up the connection pool
36    ///
37    /// Pre-populates the pool with connections to avoid cold start latency.
38    /// This is useful during application startup to ensure connections are
39    /// ready before handling requests.
40    ///
41    /// # Arguments
42    ///
43    /// * `target_connections` - Number of connections to pre-create (defaults to min_connections)
44    ///
45    /// # Example
46    ///
47    /// ```ignore
48    /// let pool = DatabasePool::new(config).await?;
49    /// pool.warmup(Some(5)).await?; // Pre-create 5 connections
50    /// ```
51    pub async fn warmup(&self, target_connections: Option<u32>) -> Result<u32> {
52        let target =
53            target_connections.unwrap_or_else(|| self.pool.options().get_min_connections());
54
55        let mut acquired = Vec::new();
56        let mut count = 0;
57
58        // Acquire connections up to target
59        for _ in 0..target {
60            match self.pool.acquire().await {
61                Ok(conn) => {
62                    acquired.push(conn);
63                    count += 1;
64                }
65                Err(e) => {
66                    tracing::warn!("Failed to acquire connection during warmup: {}", e);
67                    break;
68                }
69            }
70        }
71
72        // Release all acquired connections back to pool
73        drop(acquired);
74
75        tracing::info!("Warmed up connection pool with {} connections", count);
76        Ok(count)
77    }
78
79    /// Run database migrations
80    pub async fn migrate(&self) -> Result<()> {
81        sqlx::migrate!("./migrations")
82            .run(&self.pool)
83            .await
84            .map_err(|e| StorageError::Migration(e.to_string()))?;
85        Ok(())
86    }
87
88    /// Get a reference to the underlying pool
89    pub fn pool(&self) -> &SqlitePool {
90        &self.pool
91    }
92
93    /// Check if database is healthy
94    pub async fn health_check(&self) -> Result<()> {
95        sqlx::query("SELECT 1").execute(&self.pool).await?;
96        Ok(())
97    }
98
99    // ==================== Transaction Support ====================
100
101    /// Begin a new transaction
102    pub async fn begin(&self) -> Result<Transaction<'static, Sqlite>> {
103        let tx = self.pool.begin().await?;
104        Ok(tx)
105    }
106
107    /// Execute a closure within a transaction
108    /// The transaction is committed if the closure returns Ok, rolled back otherwise
109    pub async fn transaction<F, T, Fut>(&self, f: F) -> Result<T>
110    where
111        F: FnOnce(Transaction<'static, Sqlite>) -> Fut,
112        Fut: Future<Output = Result<(Transaction<'static, Sqlite>, T)>>,
113    {
114        let tx = self.begin().await?;
115        match f(tx).await {
116            Ok((tx, result)) => {
117                tx.commit().await?;
118                Ok(result)
119            }
120            Err(e) => Err(e),
121        }
122    }
123
124    /// Get pool statistics
125    pub fn stats(&self) -> PoolStats {
126        PoolStats {
127            size: self.pool.size(),
128            num_idle: self.pool.num_idle(),
129            max_connections: self.pool.options().get_max_connections(),
130            min_connections: self.pool.options().get_min_connections(),
131        }
132    }
133
134    /// Get comprehensive pool metrics including health status
135    pub fn metrics(&self) -> PoolMetrics {
136        let stats = self.stats();
137        let health = self.health_status();
138
139        PoolMetrics {
140            stats,
141            health,
142            acquire_timeout_ms: 30_000, // 30 seconds (configured in new())
143        }
144    }
145
146    /// Get pool health status based on utilization and state
147    pub fn health_status(&self) -> PoolHealth {
148        if self.is_closed() {
149            return PoolHealth::Critical;
150        }
151
152        let stats = self.stats();
153
154        if stats.is_at_capacity() || !stats.has_available() {
155            PoolHealth::Critical
156        } else if stats.is_overutilized() {
157            PoolHealth::Degraded
158        } else {
159            PoolHealth::Healthy
160        }
161    }
162
163    /// Close the pool gracefully
164    pub async fn close(&self) {
165        self.pool.close().await;
166    }
167
168    /// Check if the pool is closed
169    pub fn is_closed(&self) -> bool {
170        self.pool.is_closed()
171    }
172
173    /// Acquire a connection from the pool
174    pub async fn acquire(&self) -> Result<sqlx::pool::PoolConnection<Sqlite>> {
175        let conn = self.pool.acquire().await?;
176        Ok(conn)
177    }
178
179    /// Export metrics in a format suitable for monitoring systems
180    ///
181    /// Returns a map of metric names to values for integration with
182    /// monitoring systems like Prometheus, DataDog, etc.
183    pub fn export_metrics(&self) -> std::collections::HashMap<String, f64> {
184        let stats = self.stats();
185        let mut metrics = std::collections::HashMap::new();
186
187        metrics.insert("pool_size".to_string(), f64::from(stats.size));
188        metrics.insert("pool_idle_connections".to_string(), stats.num_idle as f64);
189        metrics.insert(
190            "pool_active_connections".to_string(),
191            stats.active_connections() as f64,
192        );
193        metrics.insert(
194            "pool_max_connections".to_string(),
195            f64::from(stats.max_connections),
196        );
197        metrics.insert(
198            "pool_min_connections".to_string(),
199            f64::from(stats.min_connections),
200        );
201        metrics.insert("pool_utilization".to_string(), stats.utilization());
202        metrics.insert(
203            "pool_at_capacity".to_string(),
204            if stats.is_at_capacity() { 1.0 } else { 0.0 },
205        );
206        metrics.insert(
207            "pool_has_available".to_string(),
208            if stats.has_available() { 1.0 } else { 0.0 },
209        );
210
211        let health = self.health_status();
212        metrics.insert(
213            "pool_is_closed".to_string(),
214            if self.is_closed() { 1.0 } else { 0.0 },
215        );
216        metrics.insert(
217            "pool_is_healthy".to_string(),
218            if health == PoolHealth::Healthy {
219                1.0
220            } else {
221                0.0
222            },
223        );
224        metrics.insert(
225            "pool_is_degraded".to_string(),
226            if health == PoolHealth::Degraded {
227                1.0
228            } else {
229                0.0
230            },
231        );
232        metrics.insert(
233            "pool_is_critical".to_string(),
234            if health == PoolHealth::Critical {
235                1.0
236            } else {
237                0.0
238            },
239        );
240
241        metrics
242    }
243}
244
/// Point-in-time pool statistics
///
/// `size` and `num_idle` are plain numbers captured separately, so the
/// derived values below are computed defensively and never assume that
/// `num_idle <= size`.
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
    /// Current number of connections in the pool
    pub size: u32,
    /// Number of idle connections
    pub num_idle: usize,
    /// Maximum number of connections
    pub max_connections: u32,
    /// Minimum number of connections
    pub min_connections: u32,
}

impl PoolStats {
    /// Number of active (non-idle) connections
    ///
    /// Saturates at zero: a snapshot whose idle count exceeds its size
    /// reports `0` active connections instead of underflowing.
    pub fn active_connections(&self) -> usize {
        (self.size as usize).saturating_sub(self.num_idle)
    }

    /// Check if the pool has available connections
    ///
    /// True when a connection is idle or the pool may still grow.
    pub fn has_available(&self) -> bool {
        self.num_idle > 0 || self.size < self.max_connections
    }

    /// Calculate pool utilization as the fraction `size / max_connections`
    ///
    /// `0.0` means empty and `1.0` means the pool has reached its maximum
    /// size. Returns `0.0` when `max_connections` is zero. The value is not
    /// clamped.
    pub fn utilization(&self) -> f64 {
        if self.max_connections == 0 {
            return 0.0;
        }
        f64::from(self.size) / f64::from(self.max_connections)
    }

    /// Check if pool is at capacity (size has reached `max_connections`)
    pub fn is_at_capacity(&self) -> bool {
        self.size >= self.max_connections
    }

    /// Check if pool is under-utilized (less than 50% of max)
    pub fn is_underutilized(&self) -> bool {
        self.utilization() < 0.5
    }

    /// Check if pool is over-utilized (more than 80% of max)
    pub fn is_overutilized(&self) -> bool {
        self.utilization() > 0.8
    }
}
292
/// Extended pool metrics for monitoring and observability
///
/// Bundles a [`PoolStats`] snapshot with the [`PoolHealth`] derived for the
/// pool and the acquire timeout, as assembled by `DatabasePool::metrics`.
#[derive(Debug, Clone)]
pub struct PoolMetrics {
    /// Basic pool statistics (size, idle count and configured limits)
    pub stats: PoolStats,
    /// Pool health status
    pub health: PoolHealth,
    /// Acquire timeout configuration (in milliseconds)
    pub acquire_timeout_ms: u64,
}
303
/// Coarse health classification of the connection pool
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolHealth {
    /// Operating normally
    Healthy,
    /// Still functional, but under high utilization
    Degraded,
    /// At capacity or closed
    Critical,
}

impl PoolHealth {
    /// Lowercase label for this status: `"healthy"`, `"degraded"` or `"critical"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Healthy => "healthy",
            Self::Degraded => "degraded",
            Self::Critical => "critical",
        }
    }
}

/// Displays the same label that [`PoolHealth::as_str`] returns.
impl std::fmt::Display for PoolHealth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Written verbatim; width/alignment flags on the outer formatter are
        // not applied to the label.
        f.write_str(self.as_str())
    }
}
330
331/// Transaction helper for manual transaction management
332#[allow(dead_code)]
333pub struct TransactionHelper {
334    tx: Option<Transaction<'static, Sqlite>>,
335    committed: bool,
336}
337
338#[allow(dead_code)]
339impl TransactionHelper {
340    /// Create a new transaction helper
341    pub async fn new(pool: &DatabasePool) -> Result<Self> {
342        let tx = pool.begin().await?;
343        Ok(Self {
344            tx: Some(tx),
345            committed: false,
346        })
347    }
348
349    /// Get a reference to the transaction
350    pub fn tx(&mut self) -> &mut Transaction<'static, Sqlite> {
351        self.tx.as_mut().expect("Transaction already consumed")
352    }
353
354    /// Commit the transaction
355    pub async fn commit(mut self) -> Result<()> {
356        if let Some(tx) = self.tx.take() {
357            tx.commit().await?;
358            self.committed = true;
359        }
360        Ok(())
361    }
362
363    /// Rollback the transaction
364    pub async fn rollback(mut self) -> Result<()> {
365        if let Some(tx) = self.tx.take() {
366            tx.rollback().await?;
367        }
368        Ok(())
369    }
370}
371
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot with the limits shared by these tests (max 20, min 2).
    fn snapshot(size: u32, num_idle: usize) -> PoolStats {
        PoolStats {
            size,
            num_idle,
            max_connections: 20,
            min_connections: 2,
        }
    }

    /// A half-full pool with idle connections reports its active count and
    /// still has connections available.
    #[test]
    fn test_pool_stats() {
        let half_full = snapshot(10, 3);

        assert_eq!(half_full.active_connections(), 7);
        assert!(half_full.has_available());
    }

    /// A pool at its maximum size with nothing idle has no connection available.
    #[test]
    fn test_pool_stats_at_capacity() {
        let exhausted = snapshot(20, 0);

        assert_eq!(exhausted.active_connections(), 20);
        assert!(!exhausted.has_available());
    }
}
401}