kaccy_db/
pool_hooks.rs

1//! Pool event hooks for monitoring connection lifecycle events.
2//!
3//! This module provides:
4//! - Connection acquire/release callbacks
5//! - Connection timeout notifications
6//! - Pool exhaustion alerts
7
8use async_trait::async_trait;
9use chrono::{DateTime, Utc};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tracing::{debug, error, info, warn};
16use uuid::Uuid;
17
18/// Event types for pool lifecycle
19#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub enum PoolEventType {
21    /// Connection acquired from pool
22    ConnectionAcquired,
23    /// Connection released back to pool
24    ConnectionReleased,
25    /// Connection acquisition timed out
26    AcquisitionTimeout,
27    /// Pool is exhausted (no available connections)
28    PoolExhausted,
29    /// Connection created
30    ConnectionCreated,
31    /// Connection closed
32    ConnectionClosed,
33    /// Connection validation failed
34    ValidationFailed,
35}
36
37/// Pool event data
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct PoolEvent {
40    /// Unique event ID
41    pub event_id: Uuid,
42    /// Event type
43    pub event_type: PoolEventType,
44    /// Timestamp when event occurred
45    pub timestamp: DateTime<Utc>,
46    /// Pool size at time of event
47    pub pool_size: u32,
48    /// Number of idle connections
49    pub idle_connections: u32,
50    /// Wait time in milliseconds (for acquire events)
51    pub wait_time_ms: Option<u64>,
52    /// Error message (if applicable)
53    pub error: Option<String>,
54    /// Additional metadata
55    pub metadata: HashMap<String, String>,
56}
57
58impl PoolEvent {
59    /// Create a new pool event
60    pub fn new(event_type: PoolEventType, pool_size: u32, idle_connections: u32) -> Self {
61        Self {
62            event_id: Uuid::new_v4(),
63            event_type,
64            timestamp: Utc::now(),
65            pool_size,
66            idle_connections,
67            wait_time_ms: None,
68            error: None,
69            metadata: HashMap::new(),
70        }
71    }
72
73    /// Add wait time
74    pub fn with_wait_time(mut self, wait_time: Duration) -> Self {
75        self.wait_time_ms = Some(wait_time.as_millis() as u64);
76        self
77    }
78
79    /// Add error message
80    pub fn with_error(mut self, error: String) -> Self {
81        self.error = Some(error);
82        self
83    }
84
85    /// Add metadata
86    pub fn add_metadata(mut self, key: String, value: String) -> Self {
87        self.metadata.insert(key, value);
88        self
89    }
90}
91
92/// Trait for handling pool events
93#[async_trait]
94pub trait PoolEventHandler: Send + Sync {
95    /// Handle a pool event
96    async fn handle_event(&self, event: PoolEvent);
97}
98
99/// Logging event handler that logs events using tracing
100#[derive(Debug, Clone)]
101pub struct LoggingEventHandler {
102    /// Log level for different event types
103    pub log_levels: HashMap<PoolEventType, LogLevel>,
104}
105
106#[derive(Debug, Clone, Copy, PartialEq, Eq)]
107pub enum LogLevel {
108    Debug,
109    Info,
110    Warn,
111    Error,
112}
113
114impl Default for LoggingEventHandler {
115    fn default() -> Self {
116        let mut log_levels = HashMap::new();
117        log_levels.insert(PoolEventType::ConnectionAcquired, LogLevel::Debug);
118        log_levels.insert(PoolEventType::ConnectionReleased, LogLevel::Debug);
119        log_levels.insert(PoolEventType::AcquisitionTimeout, LogLevel::Warn);
120        log_levels.insert(PoolEventType::PoolExhausted, LogLevel::Error);
121        log_levels.insert(PoolEventType::ConnectionCreated, LogLevel::Info);
122        log_levels.insert(PoolEventType::ConnectionClosed, LogLevel::Info);
123        log_levels.insert(PoolEventType::ValidationFailed, LogLevel::Warn);
124
125        Self { log_levels }
126    }
127}
128
129#[async_trait]
130impl PoolEventHandler for LoggingEventHandler {
131    async fn handle_event(&self, event: PoolEvent) {
132        let level = self
133            .log_levels
134            .get(&event.event_type)
135            .copied()
136            .unwrap_or(LogLevel::Info);
137
138        let message = format!(
139            "Pool event: {:?}, size: {}, idle: {}",
140            event.event_type, event.pool_size, event.idle_connections
141        );
142
143        match level {
144            LogLevel::Debug => debug!(
145                event_id = %event.event_id,
146                event_type = ?event.event_type,
147                pool_size = event.pool_size,
148                idle = event.idle_connections,
149                wait_time_ms = ?event.wait_time_ms,
150                "{}", message
151            ),
152            LogLevel::Info => info!(
153                event_id = %event.event_id,
154                event_type = ?event.event_type,
155                pool_size = event.pool_size,
156                idle = event.idle_connections,
157                wait_time_ms = ?event.wait_time_ms,
158                "{}", message
159            ),
160            LogLevel::Warn => warn!(
161                event_id = %event.event_id,
162                event_type = ?event.event_type,
163                pool_size = event.pool_size,
164                idle = event.idle_connections,
165                wait_time_ms = ?event.wait_time_ms,
166                error = ?event.error,
167                "{}", message
168            ),
169            LogLevel::Error => error!(
170                event_id = %event.event_id,
171                event_type = ?event.event_type,
172                pool_size = event.pool_size,
173                idle = event.idle_connections,
174                wait_time_ms = ?event.wait_time_ms,
175                error = ?event.error,
176                "{}", message
177            ),
178        }
179    }
180}
181
182/// Metrics event handler that tracks pool metrics
183#[derive(Debug, Clone)]
184pub struct MetricsEventHandler {
185    /// Event counts by type
186    counts: Arc<RwLock<HashMap<PoolEventType, u64>>>,
187    /// Total wait time in milliseconds
188    total_wait_time_ms: Arc<RwLock<u64>>,
189    /// Number of wait time samples
190    wait_time_samples: Arc<RwLock<u64>>,
191}
192
193impl Default for MetricsEventHandler {
194    fn default() -> Self {
195        Self {
196            counts: Arc::new(RwLock::new(HashMap::new())),
197            total_wait_time_ms: Arc::new(RwLock::new(0)),
198            wait_time_samples: Arc::new(RwLock::new(0)),
199        }
200    }
201}
202
203impl MetricsEventHandler {
204    /// Get event count for a specific type
205    pub fn get_count(&self, event_type: &PoolEventType) -> u64 {
206        self.counts.read().get(event_type).copied().unwrap_or(0)
207    }
208
209    /// Get average wait time in milliseconds
210    pub fn get_avg_wait_time_ms(&self) -> f64 {
211        let total = *self.total_wait_time_ms.read();
212        let samples = *self.wait_time_samples.read();
213
214        if samples == 0 {
215            0.0
216        } else {
217            total as f64 / samples as f64
218        }
219    }
220
221    /// Reset all metrics
222    pub fn reset(&self) {
223        self.counts.write().clear();
224        *self.total_wait_time_ms.write() = 0;
225        *self.wait_time_samples.write() = 0;
226    }
227}
228
229#[async_trait]
230impl PoolEventHandler for MetricsEventHandler {
231    async fn handle_event(&self, event: PoolEvent) {
232        // Increment count
233        let mut counts = self.counts.write();
234        *counts.entry(event.event_type.clone()).or_insert(0) += 1;
235        drop(counts);
236
237        // Track wait time
238        if let Some(wait_time_ms) = event.wait_time_ms {
239            *self.total_wait_time_ms.write() += wait_time_ms;
240            *self.wait_time_samples.write() += 1;
241        }
242    }
243}
244
245/// Alert event handler that triggers alerts for critical events
246#[derive(Clone)]
247pub struct AlertEventHandler {
248    /// Callback function for alerts
249    alert_callback: Arc<dyn Fn(PoolEvent) + Send + Sync>,
250}
251
252impl AlertEventHandler {
253    /// Create a new alert handler with a callback
254    pub fn new<F>(callback: F) -> Self
255    where
256        F: Fn(PoolEvent) + Send + Sync + 'static,
257    {
258        Self {
259            alert_callback: Arc::new(callback),
260        }
261    }
262}
263
264#[async_trait]
265impl PoolEventHandler for AlertEventHandler {
266    async fn handle_event(&self, event: PoolEvent) {
267        // Trigger alert for critical events
268        match event.event_type {
269            PoolEventType::AcquisitionTimeout
270            | PoolEventType::PoolExhausted
271            | PoolEventType::ValidationFailed => {
272                (self.alert_callback)(event);
273            }
274            _ => {}
275        }
276    }
277}
278
279/// Pool event manager that coordinates multiple event handlers
280#[derive(Clone)]
281pub struct PoolEventManager {
282    handlers: Arc<RwLock<Vec<Arc<dyn PoolEventHandler>>>>,
283}
284
285impl Default for PoolEventManager {
286    fn default() -> Self {
287        Self::new()
288    }
289}
290
291impl PoolEventManager {
292    /// Create a new event manager
293    pub fn new() -> Self {
294        Self {
295            handlers: Arc::new(RwLock::new(Vec::new())),
296        }
297    }
298
299    /// Add an event handler
300    pub fn add_handler(&self, handler: Arc<dyn PoolEventHandler>) {
301        self.handlers.write().push(handler);
302    }
303
304    /// Emit an event to all handlers
305    pub async fn emit(&self, event: PoolEvent) {
306        let handlers = self.handlers.read().clone();
307
308        for handler in handlers {
309            handler.handle_event(event.clone()).await;
310        }
311    }
312
313    /// Convenience method to emit connection acquired event
314    pub async fn emit_acquired(&self, pool_size: u32, idle: u32, wait_time: Duration) {
315        let event = PoolEvent::new(PoolEventType::ConnectionAcquired, pool_size, idle)
316            .with_wait_time(wait_time);
317        self.emit(event).await;
318    }
319
320    /// Convenience method to emit connection released event
321    pub async fn emit_released(&self, pool_size: u32, idle: u32) {
322        let event = PoolEvent::new(PoolEventType::ConnectionReleased, pool_size, idle);
323        self.emit(event).await;
324    }
325
326    /// Convenience method to emit timeout event
327    pub async fn emit_timeout(&self, pool_size: u32, idle: u32, timeout: Duration) {
328        let event = PoolEvent::new(PoolEventType::AcquisitionTimeout, pool_size, idle)
329            .with_wait_time(timeout)
330            .with_error(format!(
331                "Connection acquisition timed out after {}ms",
332                timeout.as_millis()
333            ));
334        self.emit(event).await;
335    }
336
337    /// Convenience method to emit pool exhausted event
338    pub async fn emit_exhausted(&self, pool_size: u32) {
339        let event = PoolEvent::new(PoolEventType::PoolExhausted, pool_size, 0)
340            .with_error("Pool has no available connections".to_string());
341        self.emit(event).await;
342    }
343}
344
345#[cfg(test)]
346mod tests {
347    use super::*;
348
349    #[test]
350    fn test_pool_event_creation() {
351        let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5);
352
353        assert_eq!(event.event_type, PoolEventType::ConnectionAcquired);
354        assert_eq!(event.pool_size, 10);
355        assert_eq!(event.idle_connections, 5);
356        assert!(event.wait_time_ms.is_none());
357        assert!(event.error.is_none());
358    }
359
360    #[test]
361    fn test_pool_event_with_wait_time() {
362        let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5)
363            .with_wait_time(Duration::from_millis(150));
364
365        assert_eq!(event.wait_time_ms, Some(150));
366    }
367
368    #[test]
369    fn test_pool_event_with_error() {
370        let event = PoolEvent::new(PoolEventType::AcquisitionTimeout, 10, 0)
371            .with_error("Timeout occurred".to_string());
372
373        assert_eq!(event.error, Some("Timeout occurred".to_string()));
374    }
375
376    #[tokio::test]
377    async fn test_logging_event_handler() {
378        let handler = LoggingEventHandler::default();
379        let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5);
380
381        // Should not panic
382        handler.handle_event(event).await;
383    }
384
385    #[tokio::test]
386    async fn test_metrics_event_handler() {
387        let handler = MetricsEventHandler::default();
388
389        // Emit several events
390        for _ in 0..5 {
391            let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5)
392                .with_wait_time(Duration::from_millis(100));
393            handler.handle_event(event).await;
394        }
395
396        assert_eq!(handler.get_count(&PoolEventType::ConnectionAcquired), 5);
397        assert!((handler.get_avg_wait_time_ms() - 100.0).abs() < 0.1);
398    }
399
400    #[tokio::test]
401    async fn test_metrics_event_handler_reset() {
402        let handler = MetricsEventHandler::default();
403
404        let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5)
405            .with_wait_time(Duration::from_millis(100));
406        handler.handle_event(event).await;
407
408        handler.reset();
409
410        assert_eq!(handler.get_count(&PoolEventType::ConnectionAcquired), 0);
411        assert_eq!(handler.get_avg_wait_time_ms(), 0.0);
412    }
413
414    #[tokio::test]
415    async fn test_alert_event_handler() {
416        let alert_triggered = Arc::new(RwLock::new(false));
417        let alert_triggered_clone = alert_triggered.clone();
418
419        let handler = AlertEventHandler::new(move |_event| {
420            *alert_triggered_clone.write() = true;
421        });
422
423        // Non-critical event should not trigger alert
424        let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5);
425        handler.handle_event(event).await;
426        assert!(!*alert_triggered.read());
427
428        // Critical event should trigger alert
429        let event = PoolEvent::new(PoolEventType::PoolExhausted, 10, 0);
430        handler.handle_event(event).await;
431        assert!(*alert_triggered.read());
432    }
433
434    #[tokio::test]
435    async fn test_pool_event_manager() {
436        let manager = PoolEventManager::new();
437        let metrics = Arc::new(MetricsEventHandler::default());
438
439        manager.add_handler(metrics.clone());
440
441        manager
442            .emit_acquired(10, 5, Duration::from_millis(100))
443            .await;
444        manager.emit_released(10, 6).await;
445
446        assert_eq!(metrics.get_count(&PoolEventType::ConnectionAcquired), 1);
447        assert_eq!(metrics.get_count(&PoolEventType::ConnectionReleased), 1);
448    }
449
450    #[tokio::test]
451    async fn test_pool_event_manager_multiple_handlers() {
452        let manager = PoolEventManager::new();
453        let metrics1 = Arc::new(MetricsEventHandler::default());
454        let metrics2 = Arc::new(MetricsEventHandler::default());
455
456        manager.add_handler(metrics1.clone());
457        manager.add_handler(metrics2.clone());
458
459        let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5);
460        manager.emit(event).await;
461
462        // Both handlers should receive the event
463        assert_eq!(metrics1.get_count(&PoolEventType::ConnectionAcquired), 1);
464        assert_eq!(metrics2.get_count(&PoolEventType::ConnectionAcquired), 1);
465    }
466
467    #[tokio::test]
468    async fn test_emit_timeout() {
469        let manager = PoolEventManager::new();
470        let metrics = Arc::new(MetricsEventHandler::default());
471
472        manager.add_handler(metrics.clone());
473        manager.emit_timeout(10, 0, Duration::from_secs(5)).await;
474
475        assert_eq!(metrics.get_count(&PoolEventType::AcquisitionTimeout), 1);
476    }
477
478    #[tokio::test]
479    async fn test_emit_exhausted() {
480        let manager = PoolEventManager::new();
481        let metrics = Arc::new(MetricsEventHandler::default());
482
483        manager.add_handler(metrics.clone());
484        manager.emit_exhausted(10).await;
485
486        assert_eq!(metrics.get_count(&PoolEventType::PoolExhausted), 1);
487    }
488}