1use async_trait::async_trait;
9use chrono::{DateTime, Utc};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tracing::{debug, error, info, warn};
16use uuid::Uuid;
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub enum PoolEventType {
21 ConnectionAcquired,
23 ConnectionReleased,
25 AcquisitionTimeout,
27 PoolExhausted,
29 ConnectionCreated,
31 ConnectionClosed,
33 ValidationFailed,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct PoolEvent {
40 pub event_id: Uuid,
42 pub event_type: PoolEventType,
44 pub timestamp: DateTime<Utc>,
46 pub pool_size: u32,
48 pub idle_connections: u32,
50 pub wait_time_ms: Option<u64>,
52 pub error: Option<String>,
54 pub metadata: HashMap<String, String>,
56}
57
58impl PoolEvent {
59 pub fn new(event_type: PoolEventType, pool_size: u32, idle_connections: u32) -> Self {
61 Self {
62 event_id: Uuid::new_v4(),
63 event_type,
64 timestamp: Utc::now(),
65 pool_size,
66 idle_connections,
67 wait_time_ms: None,
68 error: None,
69 metadata: HashMap::new(),
70 }
71 }
72
73 pub fn with_wait_time(mut self, wait_time: Duration) -> Self {
75 self.wait_time_ms = Some(wait_time.as_millis() as u64);
76 self
77 }
78
79 pub fn with_error(mut self, error: String) -> Self {
81 self.error = Some(error);
82 self
83 }
84
85 pub fn add_metadata(mut self, key: String, value: String) -> Self {
87 self.metadata.insert(key, value);
88 self
89 }
90}
91
92#[async_trait]
94pub trait PoolEventHandler: Send + Sync {
95 async fn handle_event(&self, event: PoolEvent);
97}
98
99#[derive(Debug, Clone)]
101pub struct LoggingEventHandler {
102 pub log_levels: HashMap<PoolEventType, LogLevel>,
104}
105
106#[derive(Debug, Clone, Copy, PartialEq, Eq)]
107pub enum LogLevel {
108 Debug,
109 Info,
110 Warn,
111 Error,
112}
113
114impl Default for LoggingEventHandler {
115 fn default() -> Self {
116 let mut log_levels = HashMap::new();
117 log_levels.insert(PoolEventType::ConnectionAcquired, LogLevel::Debug);
118 log_levels.insert(PoolEventType::ConnectionReleased, LogLevel::Debug);
119 log_levels.insert(PoolEventType::AcquisitionTimeout, LogLevel::Warn);
120 log_levels.insert(PoolEventType::PoolExhausted, LogLevel::Error);
121 log_levels.insert(PoolEventType::ConnectionCreated, LogLevel::Info);
122 log_levels.insert(PoolEventType::ConnectionClosed, LogLevel::Info);
123 log_levels.insert(PoolEventType::ValidationFailed, LogLevel::Warn);
124
125 Self { log_levels }
126 }
127}
128
129#[async_trait]
130impl PoolEventHandler for LoggingEventHandler {
131 async fn handle_event(&self, event: PoolEvent) {
132 let level = self
133 .log_levels
134 .get(&event.event_type)
135 .copied()
136 .unwrap_or(LogLevel::Info);
137
138 let message = format!(
139 "Pool event: {:?}, size: {}, idle: {}",
140 event.event_type, event.pool_size, event.idle_connections
141 );
142
143 match level {
144 LogLevel::Debug => debug!(
145 event_id = %event.event_id,
146 event_type = ?event.event_type,
147 pool_size = event.pool_size,
148 idle = event.idle_connections,
149 wait_time_ms = ?event.wait_time_ms,
150 "{}", message
151 ),
152 LogLevel::Info => info!(
153 event_id = %event.event_id,
154 event_type = ?event.event_type,
155 pool_size = event.pool_size,
156 idle = event.idle_connections,
157 wait_time_ms = ?event.wait_time_ms,
158 "{}", message
159 ),
160 LogLevel::Warn => warn!(
161 event_id = %event.event_id,
162 event_type = ?event.event_type,
163 pool_size = event.pool_size,
164 idle = event.idle_connections,
165 wait_time_ms = ?event.wait_time_ms,
166 error = ?event.error,
167 "{}", message
168 ),
169 LogLevel::Error => error!(
170 event_id = %event.event_id,
171 event_type = ?event.event_type,
172 pool_size = event.pool_size,
173 idle = event.idle_connections,
174 wait_time_ms = ?event.wait_time_ms,
175 error = ?event.error,
176 "{}", message
177 ),
178 }
179 }
180}
181
182#[derive(Debug, Clone)]
184pub struct MetricsEventHandler {
185 counts: Arc<RwLock<HashMap<PoolEventType, u64>>>,
187 total_wait_time_ms: Arc<RwLock<u64>>,
189 wait_time_samples: Arc<RwLock<u64>>,
191}
192
193impl Default for MetricsEventHandler {
194 fn default() -> Self {
195 Self {
196 counts: Arc::new(RwLock::new(HashMap::new())),
197 total_wait_time_ms: Arc::new(RwLock::new(0)),
198 wait_time_samples: Arc::new(RwLock::new(0)),
199 }
200 }
201}
202
203impl MetricsEventHandler {
204 pub fn get_count(&self, event_type: &PoolEventType) -> u64 {
206 self.counts.read().get(event_type).copied().unwrap_or(0)
207 }
208
209 pub fn get_avg_wait_time_ms(&self) -> f64 {
211 let total = *self.total_wait_time_ms.read();
212 let samples = *self.wait_time_samples.read();
213
214 if samples == 0 {
215 0.0
216 } else {
217 total as f64 / samples as f64
218 }
219 }
220
221 pub fn reset(&self) {
223 self.counts.write().clear();
224 *self.total_wait_time_ms.write() = 0;
225 *self.wait_time_samples.write() = 0;
226 }
227}
228
229#[async_trait]
230impl PoolEventHandler for MetricsEventHandler {
231 async fn handle_event(&self, event: PoolEvent) {
232 let mut counts = self.counts.write();
234 *counts.entry(event.event_type.clone()).or_insert(0) += 1;
235 drop(counts);
236
237 if let Some(wait_time_ms) = event.wait_time_ms {
239 *self.total_wait_time_ms.write() += wait_time_ms;
240 *self.wait_time_samples.write() += 1;
241 }
242 }
243}
244
245#[derive(Clone)]
247pub struct AlertEventHandler {
248 alert_callback: Arc<dyn Fn(PoolEvent) + Send + Sync>,
250}
251
252impl AlertEventHandler {
253 pub fn new<F>(callback: F) -> Self
255 where
256 F: Fn(PoolEvent) + Send + Sync + 'static,
257 {
258 Self {
259 alert_callback: Arc::new(callback),
260 }
261 }
262}
263
264#[async_trait]
265impl PoolEventHandler for AlertEventHandler {
266 async fn handle_event(&self, event: PoolEvent) {
267 match event.event_type {
269 PoolEventType::AcquisitionTimeout
270 | PoolEventType::PoolExhausted
271 | PoolEventType::ValidationFailed => {
272 (self.alert_callback)(event);
273 }
274 _ => {}
275 }
276 }
277}
278
279#[derive(Clone)]
281pub struct PoolEventManager {
282 handlers: Arc<RwLock<Vec<Arc<dyn PoolEventHandler>>>>,
283}
284
285impl Default for PoolEventManager {
286 fn default() -> Self {
287 Self::new()
288 }
289}
290
291impl PoolEventManager {
292 pub fn new() -> Self {
294 Self {
295 handlers: Arc::new(RwLock::new(Vec::new())),
296 }
297 }
298
299 pub fn add_handler(&self, handler: Arc<dyn PoolEventHandler>) {
301 self.handlers.write().push(handler);
302 }
303
304 pub async fn emit(&self, event: PoolEvent) {
306 let handlers = self.handlers.read().clone();
307
308 for handler in handlers {
309 handler.handle_event(event.clone()).await;
310 }
311 }
312
313 pub async fn emit_acquired(&self, pool_size: u32, idle: u32, wait_time: Duration) {
315 let event = PoolEvent::new(PoolEventType::ConnectionAcquired, pool_size, idle)
316 .with_wait_time(wait_time);
317 self.emit(event).await;
318 }
319
320 pub async fn emit_released(&self, pool_size: u32, idle: u32) {
322 let event = PoolEvent::new(PoolEventType::ConnectionReleased, pool_size, idle);
323 self.emit(event).await;
324 }
325
326 pub async fn emit_timeout(&self, pool_size: u32, idle: u32, timeout: Duration) {
328 let event = PoolEvent::new(PoolEventType::AcquisitionTimeout, pool_size, idle)
329 .with_wait_time(timeout)
330 .with_error(format!(
331 "Connection acquisition timed out after {}ms",
332 timeout.as_millis()
333 ));
334 self.emit(event).await;
335 }
336
337 pub async fn emit_exhausted(&self, pool_size: u32) {
339 let event = PoolEvent::new(PoolEventType::PoolExhausted, pool_size, 0)
340 .with_error("Pool has no available connections".to_string());
341 self.emit(event).await;
342 }
343}
344
345#[cfg(test)]
346mod tests {
347 use super::*;
348
349 #[test]
350 fn test_pool_event_creation() {
351 let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5);
352
353 assert_eq!(event.event_type, PoolEventType::ConnectionAcquired);
354 assert_eq!(event.pool_size, 10);
355 assert_eq!(event.idle_connections, 5);
356 assert!(event.wait_time_ms.is_none());
357 assert!(event.error.is_none());
358 }
359
360 #[test]
361 fn test_pool_event_with_wait_time() {
362 let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5)
363 .with_wait_time(Duration::from_millis(150));
364
365 assert_eq!(event.wait_time_ms, Some(150));
366 }
367
368 #[test]
369 fn test_pool_event_with_error() {
370 let event = PoolEvent::new(PoolEventType::AcquisitionTimeout, 10, 0)
371 .with_error("Timeout occurred".to_string());
372
373 assert_eq!(event.error, Some("Timeout occurred".to_string()));
374 }
375
376 #[tokio::test]
377 async fn test_logging_event_handler() {
378 let handler = LoggingEventHandler::default();
379 let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5);
380
381 handler.handle_event(event).await;
383 }
384
385 #[tokio::test]
386 async fn test_metrics_event_handler() {
387 let handler = MetricsEventHandler::default();
388
389 for _ in 0..5 {
391 let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5)
392 .with_wait_time(Duration::from_millis(100));
393 handler.handle_event(event).await;
394 }
395
396 assert_eq!(handler.get_count(&PoolEventType::ConnectionAcquired), 5);
397 assert!((handler.get_avg_wait_time_ms() - 100.0).abs() < 0.1);
398 }
399
400 #[tokio::test]
401 async fn test_metrics_event_handler_reset() {
402 let handler = MetricsEventHandler::default();
403
404 let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5)
405 .with_wait_time(Duration::from_millis(100));
406 handler.handle_event(event).await;
407
408 handler.reset();
409
410 assert_eq!(handler.get_count(&PoolEventType::ConnectionAcquired), 0);
411 assert_eq!(handler.get_avg_wait_time_ms(), 0.0);
412 }
413
414 #[tokio::test]
415 async fn test_alert_event_handler() {
416 let alert_triggered = Arc::new(RwLock::new(false));
417 let alert_triggered_clone = alert_triggered.clone();
418
419 let handler = AlertEventHandler::new(move |_event| {
420 *alert_triggered_clone.write() = true;
421 });
422
423 let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5);
425 handler.handle_event(event).await;
426 assert!(!*alert_triggered.read());
427
428 let event = PoolEvent::new(PoolEventType::PoolExhausted, 10, 0);
430 handler.handle_event(event).await;
431 assert!(*alert_triggered.read());
432 }
433
434 #[tokio::test]
435 async fn test_pool_event_manager() {
436 let manager = PoolEventManager::new();
437 let metrics = Arc::new(MetricsEventHandler::default());
438
439 manager.add_handler(metrics.clone());
440
441 manager
442 .emit_acquired(10, 5, Duration::from_millis(100))
443 .await;
444 manager.emit_released(10, 6).await;
445
446 assert_eq!(metrics.get_count(&PoolEventType::ConnectionAcquired), 1);
447 assert_eq!(metrics.get_count(&PoolEventType::ConnectionReleased), 1);
448 }
449
450 #[tokio::test]
451 async fn test_pool_event_manager_multiple_handlers() {
452 let manager = PoolEventManager::new();
453 let metrics1 = Arc::new(MetricsEventHandler::default());
454 let metrics2 = Arc::new(MetricsEventHandler::default());
455
456 manager.add_handler(metrics1.clone());
457 manager.add_handler(metrics2.clone());
458
459 let event = PoolEvent::new(PoolEventType::ConnectionAcquired, 10, 5);
460 manager.emit(event).await;
461
462 assert_eq!(metrics1.get_count(&PoolEventType::ConnectionAcquired), 1);
464 assert_eq!(metrics2.get_count(&PoolEventType::ConnectionAcquired), 1);
465 }
466
467 #[tokio::test]
468 async fn test_emit_timeout() {
469 let manager = PoolEventManager::new();
470 let metrics = Arc::new(MetricsEventHandler::default());
471
472 manager.add_handler(metrics.clone());
473 manager.emit_timeout(10, 0, Duration::from_secs(5)).await;
474
475 assert_eq!(metrics.get_count(&PoolEventType::AcquisitionTimeout), 1);
476 }
477
478 #[tokio::test]
479 async fn test_emit_exhausted() {
480 let manager = PoolEventManager::new();
481 let metrics = Arc::new(MetricsEventHandler::default());
482
483 manager.add_handler(metrics.clone());
484 manager.emit_exhausted(10).await;
485
486 assert_eq!(metrics.get_count(&PoolEventType::PoolExhausted), 1);
487 }
488}