1use super::mode::PoolingMode;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11#[derive(Debug)]
13pub struct PoolModeMetrics {
14 pub acquires: AtomicU64,
16 pub releases: AtomicU64,
18 pub acquire_failures: AtomicU64,
20 pub acquire_timeouts: AtomicU64,
22 pub connections_created: AtomicU64,
24 pub connections_closed: AtomicU64,
26 pub connection_resets: AtomicU64,
28 pub reset_failures: AtomicU64,
30 pub transactions_completed: AtomicU64,
32 pub statements_executed: AtomicU64,
34 pub active_leases: AtomicU64,
36 pub peak_active_leases: AtomicU64,
38 pub queue_waits: AtomicU64,
40 mode_stats: Arc<parking_lot::RwLock<HashMap<PoolingMode, ModeStats>>>,
42}
43
44#[derive(Debug, Clone, Default, Serialize, Deserialize)]
46pub struct ModeStats {
47 pub active_connections: u64,
49 pub total_acquires: u64,
51 pub total_releases: u64,
53 pub avg_lease_duration_ms: f64,
55 pub avg_statements_per_lease: f64,
57}
58
59impl Default for PoolModeMetrics {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65impl PoolModeMetrics {
66 pub fn new() -> Self {
68 let mut mode_stats = HashMap::new();
69 mode_stats.insert(PoolingMode::Session, ModeStats::default());
70 mode_stats.insert(PoolingMode::Transaction, ModeStats::default());
71 mode_stats.insert(PoolingMode::Statement, ModeStats::default());
72
73 Self {
74 acquires: AtomicU64::new(0),
75 releases: AtomicU64::new(0),
76 acquire_failures: AtomicU64::new(0),
77 acquire_timeouts: AtomicU64::new(0),
78 connections_created: AtomicU64::new(0),
79 connections_closed: AtomicU64::new(0),
80 connection_resets: AtomicU64::new(0),
81 reset_failures: AtomicU64::new(0),
82 transactions_completed: AtomicU64::new(0),
83 statements_executed: AtomicU64::new(0),
84 active_leases: AtomicU64::new(0),
85 peak_active_leases: AtomicU64::new(0),
86 queue_waits: AtomicU64::new(0),
87 mode_stats: Arc::new(parking_lot::RwLock::new(mode_stats)),
88 }
89 }
90
91 pub fn record_acquire(&self, mode: PoolingMode) {
93 self.acquires.fetch_add(1, Ordering::Relaxed);
94 let active = self.active_leases.fetch_add(1, Ordering::Relaxed) + 1;
95
96 loop {
98 let peak = self.peak_active_leases.load(Ordering::Relaxed);
99 if active <= peak {
100 break;
101 }
102 if self
103 .peak_active_leases
104 .compare_exchange(peak, active, Ordering::Relaxed, Ordering::Relaxed)
105 .is_ok()
106 {
107 break;
108 }
109 }
110
111 let mut stats = self.mode_stats.write();
113 if let Some(mode_stat) = stats.get_mut(&mode) {
114 mode_stat.active_connections += 1;
115 mode_stat.total_acquires += 1;
116 }
117 }
118
119 pub fn record_release(&self, mode: PoolingMode, lease_duration_ms: u64, statements: u64) {
121 self.releases.fetch_add(1, Ordering::Relaxed);
122 self.active_leases.fetch_sub(1, Ordering::Relaxed);
123 self.statements_executed
124 .fetch_add(statements, Ordering::Relaxed);
125
126 let mut stats = self.mode_stats.write();
128 if let Some(mode_stat) = stats.get_mut(&mode) {
129 mode_stat.active_connections = mode_stat.active_connections.saturating_sub(1);
130 mode_stat.total_releases += 1;
131
132 let n = mode_stat.total_releases as f64;
134 mode_stat.avg_lease_duration_ms =
135 mode_stat.avg_lease_duration_ms * ((n - 1.0) / n) + (lease_duration_ms as f64 / n);
136
137 mode_stat.avg_statements_per_lease =
139 mode_stat.avg_statements_per_lease * ((n - 1.0) / n) + (statements as f64 / n);
140 }
141 }
142
143 pub fn record_acquire_failure(&self) {
145 self.acquire_failures.fetch_add(1, Ordering::Relaxed);
146 }
147
148 pub fn record_acquire_timeout(&self) {
150 self.acquire_timeouts.fetch_add(1, Ordering::Relaxed);
151 }
152
153 pub fn record_connection_created(&self) {
155 self.connections_created.fetch_add(1, Ordering::Relaxed);
156 }
157
158 pub fn record_connection_closed(&self) {
160 self.connections_closed.fetch_add(1, Ordering::Relaxed);
161 }
162
163 pub fn record_reset(&self, success: bool) {
165 self.connection_resets.fetch_add(1, Ordering::Relaxed);
166 if !success {
167 self.reset_failures.fetch_add(1, Ordering::Relaxed);
168 }
169 }
170
171 pub fn record_transaction_complete(&self) {
173 self.transactions_completed.fetch_add(1, Ordering::Relaxed);
174 }
175
176 pub fn record_queue_wait(&self) {
178 self.queue_waits.fetch_add(1, Ordering::Relaxed);
179 }
180
181 pub fn snapshot(&self) -> MetricsSnapshot {
183 MetricsSnapshot {
184 acquires: self.acquires.load(Ordering::Relaxed),
185 releases: self.releases.load(Ordering::Relaxed),
186 acquire_failures: self.acquire_failures.load(Ordering::Relaxed),
187 acquire_timeouts: self.acquire_timeouts.load(Ordering::Relaxed),
188 connections_created: self.connections_created.load(Ordering::Relaxed),
189 connections_closed: self.connections_closed.load(Ordering::Relaxed),
190 connection_resets: self.connection_resets.load(Ordering::Relaxed),
191 reset_failures: self.reset_failures.load(Ordering::Relaxed),
192 transactions_completed: self.transactions_completed.load(Ordering::Relaxed),
193 statements_executed: self.statements_executed.load(Ordering::Relaxed),
194 active_leases: self.active_leases.load(Ordering::Relaxed),
195 peak_active_leases: self.peak_active_leases.load(Ordering::Relaxed),
196 queue_waits: self.queue_waits.load(Ordering::Relaxed),
197 mode_stats: self.mode_stats.read().clone(),
198 }
199 }
200
201 pub fn reset(&self) {
203 self.acquires.store(0, Ordering::Relaxed);
204 self.releases.store(0, Ordering::Relaxed);
205 self.acquire_failures.store(0, Ordering::Relaxed);
206 self.acquire_timeouts.store(0, Ordering::Relaxed);
207 self.connections_created.store(0, Ordering::Relaxed);
208 self.connections_closed.store(0, Ordering::Relaxed);
209 self.connection_resets.store(0, Ordering::Relaxed);
210 self.reset_failures.store(0, Ordering::Relaxed);
211 self.transactions_completed.store(0, Ordering::Relaxed);
212 self.statements_executed.store(0, Ordering::Relaxed);
213 self.queue_waits.store(0, Ordering::Relaxed);
215
216 let mut stats = self.mode_stats.write();
217 for stat in stats.values_mut() {
218 *stat = ModeStats::default();
219 }
220 }
221}
222
223#[derive(Debug, Clone, Serialize, Deserialize)]
225pub struct MetricsSnapshot {
226 pub acquires: u64,
227 pub releases: u64,
228 pub acquire_failures: u64,
229 pub acquire_timeouts: u64,
230 pub connections_created: u64,
231 pub connections_closed: u64,
232 pub connection_resets: u64,
233 pub reset_failures: u64,
234 pub transactions_completed: u64,
235 pub statements_executed: u64,
236 pub active_leases: u64,
237 pub peak_active_leases: u64,
238 pub queue_waits: u64,
239 pub mode_stats: HashMap<PoolingMode, ModeStats>,
240}
241
242impl MetricsSnapshot {
243 pub fn connection_efficiency(&self) -> f64 {
245 if self.acquires == 0 {
246 1.0
247 } else {
248 self.releases as f64 / self.acquires as f64
249 }
250 }
251
252 pub fn reset_success_rate(&self) -> f64 {
254 if self.connection_resets == 0 {
255 1.0
256 } else {
257 1.0 - (self.reset_failures as f64 / self.connection_resets as f64)
258 }
259 }
260
261 pub fn acquire_success_rate(&self) -> f64 {
263 let total_attempts = self.acquires + self.acquire_failures + self.acquire_timeouts;
264 if total_attempts == 0 {
265 1.0
266 } else {
267 self.acquires as f64 / total_attempts as f64
268 }
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads a counter with relaxed ordering.
    fn value(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Records `count` acquires for `mode`.
    fn acquire_n(metrics: &PoolModeMetrics, mode: PoolingMode, count: usize) {
        for _ in 0..count {
            metrics.record_acquire(mode);
        }
    }

    /// A new collector starts with zeroed counters.
    #[test]
    fn test_metrics_new() {
        let snap = PoolModeMetrics::new().snapshot();
        assert_eq!(snap.acquires, 0);
        assert_eq!(snap.releases, 0);
        assert_eq!(snap.active_leases, 0);
    }

    /// One acquire followed by one release updates the global counters.
    #[test]
    fn test_record_acquire_release() {
        let m = PoolModeMetrics::new();

        m.record_acquire(PoolingMode::Transaction);
        assert_eq!(value(&m.active_leases), 1);
        assert_eq!(value(&m.acquires), 1);

        m.record_release(PoolingMode::Transaction, 100, 5);
        assert_eq!(value(&m.active_leases), 0);
        assert_eq!(value(&m.releases), 1);
        assert_eq!(value(&m.statements_executed), 5);
    }

    /// The peak tracks the highest active count and never decreases.
    #[test]
    fn test_peak_active_leases() {
        let m = PoolModeMetrics::new();

        acquire_n(&m, PoolingMode::Session, 3);
        assert_eq!(value(&m.peak_active_leases), 3);

        for _ in 0..2 {
            m.record_release(PoolingMode::Session, 100, 1);
        }
        assert_eq!(value(&m.peak_active_leases), 3);

        // Back up to two active leases: still below the previous peak.
        acquire_n(&m, PoolingMode::Session, 1);
        assert_eq!(value(&m.peak_active_leases), 3);

        // Two more bring the active count to four, a new peak.
        acquire_n(&m, PoolingMode::Session, 2);
        assert_eq!(value(&m.peak_active_leases), 4);
    }

    /// Per-mode totals and the statements-per-lease average are tracked.
    #[test]
    fn test_mode_stats() {
        let m = PoolModeMetrics::new();

        for (duration_ms, statements) in [(200_u64, 10_u64), (100, 5)].iter().copied() {
            m.record_acquire(PoolingMode::Transaction);
            m.record_release(PoolingMode::Transaction, duration_ms, statements);
        }

        let snap = m.snapshot();
        let txn = snap.mode_stats.get(&PoolingMode::Transaction).unwrap();

        assert_eq!(txn.total_acquires, 2);
        assert_eq!(txn.total_releases, 2);
        assert_eq!(txn.avg_statements_per_lease, 7.5);
    }

    /// `reset` clears cumulative counters but keeps the active-lease gauge.
    #[test]
    fn test_reset() {
        let m = PoolModeMetrics::new();

        m.record_acquire(PoolingMode::Session);
        m.record_connection_created();
        m.record_transaction_complete();

        m.reset();

        let snap = m.snapshot();
        assert_eq!(snap.acquires, 0);
        assert_eq!(snap.connections_created, 0);
        assert_eq!(snap.transactions_completed, 0);
        // The lease acquired above is still outstanding.
        assert_eq!(snap.active_leases, 1);
    }

    /// Derived ratios are 1.0 when every acquire succeeded and was released.
    #[test]
    fn test_snapshot_calculations() {
        let m = PoolModeMetrics::new();

        (0..10).for_each(|_| {
            m.record_acquire(PoolingMode::Transaction);
            m.record_release(PoolingMode::Transaction, 100, 1);
        });

        let snap = m.snapshot();
        assert_eq!(snap.connection_efficiency(), 1.0);
        assert_eq!(snap.acquire_success_rate(), 1.0);
    }
}