1#![allow(clippy::missing_const_for_fn)]
7#![allow(clippy::cast_possible_truncation)]
8#![allow(clippy::cast_lossless)]
9#![allow(clippy::cast_precision_loss)]
10
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use tracing::{debug, instrument, warn};
18
19use crate::{PoolStatus, PostgresError};
20
/// Point-in-time snapshot of connection-pool statistics.
///
/// Instances are built by `PoolMonitor::get_metrics` from a `PoolStatus` plus the
/// monitor's own counters. The struct derives `Serialize`/`Deserialize`, so field
/// names and order are part of the serialized representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolMetrics {
    /// Total connections in the pool, copied from `PoolStatus::size`.
    pub current_connections: u32,
    /// Idle connections, copied from `PoolStatus::idle`.
    pub idle_connections: u32,
    /// `current_connections - idle_connections`, saturating at zero.
    pub active_connections: u32,
    /// Number of connection-created events recorded by the monitor.
    pub total_connections_created: u64,
    /// Number of connection-closed events recorded by the monitor.
    pub total_connections_closed: u64,
    /// Mean of the recorded acquisition durations (microsecond resolution);
    /// `Duration::ZERO` when no acquisition has been recorded.
    pub avg_acquisition_time: Duration,
    /// Largest connection count the monitor has been told about.
    pub peak_connections: u32,
    /// Number of connection-timeout events recorded by the monitor.
    pub connection_timeouts: u64,
    /// Number of connection-error events recorded by the monitor.
    pub connection_errors: u64,
    /// `current_connections` as a percentage of the monitor's configured maximum;
    /// `0.0` when that maximum is zero.
    pub utilization_percent: f64,
    /// Result of the monitor's health rules at the time of the snapshot.
    pub is_healthy: bool,
    /// UTC timestamp taken when the snapshot was built.
    pub last_updated: DateTime<Utc>,
}
49
50#[derive(Debug)]
52pub struct PoolMonitor {
53 connections_created: AtomicU64,
55 connections_closed: AtomicU64,
56 connection_timeouts: AtomicU64,
57 connection_errors: AtomicU64,
58 peak_connections: AtomicU64,
59
60 total_acquisition_time: AtomicU64,
62 acquisition_count: AtomicU64,
63
64 max_connections: u32,
66 warning_threshold: f64, }
68
69impl PoolMonitor {
70 pub fn new(max_connections: u32) -> Self {
72 Self {
73 connections_created: AtomicU64::new(0),
74 connections_closed: AtomicU64::new(0),
75 connection_timeouts: AtomicU64::new(0),
76 connection_errors: AtomicU64::new(0),
77 peak_connections: AtomicU64::new(0),
78 total_acquisition_time: AtomicU64::new(0),
79 acquisition_count: AtomicU64::new(0),
80 max_connections,
81 warning_threshold: 80.0, }
83 }
84
85 pub fn record_connection_created(&self) {
87 self.connections_created.fetch_add(1, Ordering::Relaxed);
88 }
89
90 pub fn record_connection_closed(&self) {
92 self.connections_closed.fetch_add(1, Ordering::Relaxed);
93 }
94
95 pub fn record_connection_timeout(&self) {
97 self.connection_timeouts.fetch_add(1, Ordering::Relaxed);
98 debug!("Connection timeout recorded");
99 }
100
101 pub fn record_connection_error(&self) {
103 self.connection_errors.fetch_add(1, Ordering::Relaxed);
104 debug!("Connection error recorded");
105 }
106
107 pub fn record_acquisition_time(&self, duration: Duration) {
109 self.total_acquisition_time
110 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
111 self.acquisition_count.fetch_add(1, Ordering::Relaxed);
112 }
113
114 pub fn update_peak_connections(&self, current: u32) {
116 let current_peak = self.peak_connections.load(Ordering::Relaxed) as u32;
117 if current > current_peak {
118 self.peak_connections
119 .store(current as u64, Ordering::Relaxed);
120 }
121 }
122
123 #[instrument(skip(self))]
125 pub fn get_metrics(&self, pool_status: &PoolStatus) -> PoolMetrics {
126 let current_connections = pool_status.size;
127 let idle_connections = pool_status.idle;
128 let active_connections = current_connections.saturating_sub(idle_connections);
129
130 self.update_peak_connections(current_connections);
132
133 let total_time = self.total_acquisition_time.load(Ordering::Relaxed);
135 let count = self.acquisition_count.load(Ordering::Relaxed);
136 let avg_acquisition_time = if count > 0 {
137 Duration::from_micros(total_time / count)
138 } else {
139 Duration::ZERO
140 };
141
142 let utilization_percent = if self.max_connections > 0 {
144 (current_connections as f64 / self.max_connections as f64) * 100.0
145 } else {
146 0.0
147 };
148
149 let is_healthy = self.is_pool_healthy(pool_status, utilization_percent);
151
152 if utilization_percent > self.warning_threshold {
154 warn!(
155 "Pool utilization high: {:.1}% ({}/{})",
156 utilization_percent, current_connections, self.max_connections
157 );
158 }
159
160 PoolMetrics {
161 current_connections,
162 idle_connections,
163 active_connections,
164 total_connections_created: self.connections_created.load(Ordering::Relaxed),
165 total_connections_closed: self.connections_closed.load(Ordering::Relaxed),
166 avg_acquisition_time,
167 peak_connections: self.peak_connections.load(Ordering::Relaxed) as u32,
168 connection_timeouts: self.connection_timeouts.load(Ordering::Relaxed),
169 connection_errors: self.connection_errors.load(Ordering::Relaxed),
170 utilization_percent,
171 is_healthy,
172 last_updated: Utc::now(),
173 }
174 }
175
176 fn is_pool_healthy(&self, pool_status: &PoolStatus, utilization_percent: f64) -> bool {
178 if pool_status.is_closed {
185 return false;
186 }
187
188 if utilization_percent > 95.0 {
189 return false;
190 }
191
192 if pool_status.idle == 0 && utilization_percent > 80.0 {
193 return false;
194 }
195
196 let recent_errors = self.connection_errors.load(Ordering::Relaxed);
198 let recent_acquisitions = self.acquisition_count.load(Ordering::Relaxed);
199
200 if recent_acquisitions > 0 {
201 let error_rate = recent_errors as f64 / recent_acquisitions as f64;
202 if error_rate > 0.1 {
203 return false;
205 }
206 }
207
208 true
209 }
210
211 pub fn reset_metrics(&self) {
213 self.connections_created.store(0, Ordering::Relaxed);
214 self.connections_closed.store(0, Ordering::Relaxed);
215 self.connection_timeouts.store(0, Ordering::Relaxed);
216 self.connection_errors.store(0, Ordering::Relaxed);
217 self.peak_connections.store(0, Ordering::Relaxed);
218 self.total_acquisition_time.store(0, Ordering::Relaxed);
219 self.acquisition_count.store(0, Ordering::Relaxed);
220 debug!("Pool metrics reset");
221 }
222
223 pub fn get_metrics_json(&self, pool_status: &PoolStatus) -> Result<String, PostgresError> {
225 let metrics = self.get_metrics(pool_status);
226 serde_json::to_string(&metrics).map_err(PostgresError::Serialization)
227 }
228}
229
230pub struct AcquisitionTimer {
232 start: Instant,
233 monitor: Arc<PoolMonitor>,
234}
235
236impl AcquisitionTimer {
237 pub fn new(monitor: Arc<PoolMonitor>) -> Self {
239 Self {
240 start: Instant::now(),
241 monitor,
242 }
243 }
244
245 pub fn complete(self) {
247 let duration = self.start.elapsed();
248 self.monitor.record_acquisition_time(duration);
249 }
250}
251
252pub struct PoolMonitoringTask {
254 monitor: Arc<PoolMonitor>,
255 interval: Duration,
256 stop_signal: tokio::sync::watch::Receiver<bool>,
257}
258
259impl PoolMonitoringTask {
260 pub fn new(
262 monitor: Arc<PoolMonitor>,
263 interval: Duration,
264 stop_signal: tokio::sync::watch::Receiver<bool>,
265 ) -> Self {
266 Self {
267 monitor,
268 interval,
269 stop_signal,
270 }
271 }
272
273 pub async fn run<F>(mut self, mut get_pool_status: F)
275 where
276 F: FnMut() -> PoolStatus + Send + 'static,
277 {
278 let mut interval_timer = tokio::time::interval(self.interval);
279
280 loop {
281 tokio::select! {
282 _ = interval_timer.tick() => {
283 let pool_status = get_pool_status();
284 let metrics = self.monitor.get_metrics(&pool_status);
285
286 debug!("Pool metrics: {:?}", metrics);
287
288 if !metrics.is_healthy {
290 warn!("Pool health check failed: {:?}", metrics);
291 }
292
293 if metrics.connection_errors > 0 {
294 warn!("Connection errors detected: {}", metrics.connection_errors);
295 }
296
297 if metrics.connection_timeouts > 0 {
298 warn!("Connection timeouts detected: {}", metrics.connection_timeouts);
299 }
300 }
301 _ = self.stop_signal.changed() => {
302 if *self.stop_signal.borrow() {
303 debug!("Pool monitoring task stopped");
304 break;
305 }
306 }
307 }
308 }
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads a counter with the same relaxed ordering the monitor writes it with.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Shorthand for building a `PoolStatus` fixture.
    fn status(size: u32, idle: u32, is_closed: bool) -> PoolStatus {
        PoolStatus {
            size,
            idle,
            is_closed,
        }
    }

    /// A fresh monitor starts with zeroed counters and keeps the configured capacity.
    #[test]
    fn test_pool_monitor_creation() {
        let fresh = PoolMonitor::new(10);

        assert_eq!(read(&fresh.connections_created), 0);
        assert_eq!(read(&fresh.connections_closed), 0);
        assert_eq!(fresh.max_connections, 10);
    }

    /// Each `record_*` call increments exactly its own counter.
    #[test]
    fn test_pool_monitor_record_operations() {
        let tracker = PoolMonitor::new(10);

        tracker.record_connection_created();
        tracker.record_connection_created();
        tracker.record_connection_closed();
        tracker.record_connection_timeout();
        tracker.record_connection_error();

        assert_eq!(read(&tracker.connections_created), 2);
        assert_eq!(read(&tracker.connections_closed), 1);
        assert_eq!(read(&tracker.connection_timeouts), 1);
        assert_eq!(read(&tracker.connection_errors), 1);
    }

    /// A snapshot combines the pool status with the accumulated counters.
    #[test]
    fn test_pool_monitor_metrics() {
        let tracker = PoolMonitor::new(10);

        tracker.record_connection_created();
        tracker.record_connection_created();
        tracker.record_acquisition_time(Duration::from_millis(50));
        tracker.record_acquisition_time(Duration::from_millis(100));

        let snapshot = tracker.get_metrics(&status(5, 2, false));

        assert_eq!(snapshot.current_connections, 5);
        assert_eq!(snapshot.idle_connections, 2);
        assert_eq!(snapshot.active_connections, 3);
        assert_eq!(snapshot.total_connections_created, 2);
        // Mean of 50 ms and 100 ms.
        assert_eq!(snapshot.avg_acquisition_time, Duration::from_millis(75));
        // 5 of 10 connections.
        assert!((snapshot.utilization_percent - 50.0).abs() < f64::EPSILON);
        assert!(snapshot.is_healthy);
    }

    /// Health rules: a normal pool passes; a closed or over-utilized pool fails.
    #[test]
    fn test_pool_health_assessment() {
        let tracker = PoolMonitor::new(10);

        let healthy = status(5, 2, false);
        assert!(tracker.is_pool_healthy(&healthy, 50.0));

        let closed = status(5, 2, true);
        assert!(!tracker.is_pool_healthy(&closed, 50.0));

        let saturated = status(10, 0, false);
        assert!(!tracker.is_pool_healthy(&saturated, 96.0));
    }

    /// Completing a timer records one acquisition with a non-zero duration.
    #[test]
    fn test_acquisition_timer() {
        let shared = Arc::new(PoolMonitor::new(10));
        let stopwatch = AcquisitionTimer::new(Arc::clone(&shared));

        // Ensure a measurable amount of time elapses.
        std::thread::sleep(std::time::Duration::from_millis(1));

        stopwatch.complete();

        assert_eq!(read(&shared.acquisition_count), 1);
        assert!(read(&shared.total_acquisition_time) > 0);
    }

    /// `reset_metrics` returns previously incremented counters to zero.
    #[test]
    fn test_metrics_reset() {
        let tracker = PoolMonitor::new(10);

        tracker.record_connection_created();
        tracker.record_connection_error();
        tracker.record_acquisition_time(Duration::from_millis(100));

        assert!(read(&tracker.connections_created) > 0);
        assert!(read(&tracker.connection_errors) > 0);
        assert!(read(&tracker.acquisition_count) > 0);

        tracker.reset_metrics();

        assert_eq!(read(&tracker.connections_created), 0);
        assert_eq!(read(&tracker.connection_errors), 0);
        assert_eq!(read(&tracker.acquisition_count), 0);
    }
}