1use entdb::storage::buffer_pool::BufferPoolStats;
18use parking_lot::Mutex;
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22#[derive(Debug, Clone, Default)]
23pub struct ServerMetricsSnapshot {
24 pub connections_accepted: u64,
25 pub connections_refused: u64,
26 pub active_connections: u64,
27 pub peak_connections: u64,
28 pub queries_total: u64,
29 pub queries_succeeded: u64,
30 pub queries_failed: u64,
31 pub query_timeouts: u64,
32 pub query_latency_total_ns: u64,
33 pub query_latency_max_ns: u64,
34 pub sqlstate_errors: HashMap<String, u64>,
35 pub shutdown_flush_total_ns: u64,
36 pub shutdown_persist_total_ns: u64,
37 pub wal_flushes: u64,
38 pub buffer_pool_pressure: Option<BufferPoolStats>,
39}
40
41#[derive(Debug, Default)]
42pub struct ServerMetrics {
43 connections_accepted: AtomicU64,
44 connections_refused: AtomicU64,
45 active_connections: AtomicU64,
46 peak_connections: AtomicU64,
47 queries_total: AtomicU64,
48 queries_succeeded: AtomicU64,
49 queries_failed: AtomicU64,
50 query_timeouts: AtomicU64,
51 query_latency_total_ns: AtomicU64,
52 query_latency_max_ns: AtomicU64,
53 shutdown_flush_total_ns: AtomicU64,
54 shutdown_persist_total_ns: AtomicU64,
55 wal_flushes: AtomicU64,
56 sqlstate_errors: Mutex<HashMap<String, u64>>,
57 buffer_pool_pressure: Mutex<Option<BufferPoolStats>>,
58}
59
60impl ServerMetrics {
61 pub fn on_connection_accepted(&self) {
62 self.connections_accepted.fetch_add(1, Ordering::Relaxed);
63 let active = self.active_connections.fetch_add(1, Ordering::Relaxed) + 1;
64 let mut peak = self.peak_connections.load(Ordering::Relaxed);
65 while active > peak {
66 match self.peak_connections.compare_exchange(
67 peak,
68 active,
69 Ordering::Relaxed,
70 Ordering::Relaxed,
71 ) {
72 Ok(_) => break,
73 Err(actual) => peak = actual,
74 }
75 }
76 }
77
78 pub fn on_connection_refused(&self) {
79 self.connections_refused.fetch_add(1, Ordering::Relaxed);
80 }
81
82 pub fn on_connection_closed(&self) {
83 self.active_connections.fetch_sub(1, Ordering::Relaxed);
84 }
85
86 pub fn on_query_finished(&self, elapsed_ns: u64, sqlstate: Option<&str>) {
87 self.queries_total.fetch_add(1, Ordering::Relaxed);
88 self.query_latency_total_ns
89 .fetch_add(elapsed_ns, Ordering::Relaxed);
90 self.update_latency_max(elapsed_ns);
91
92 match sqlstate {
93 None => {
94 self.queries_succeeded.fetch_add(1, Ordering::Relaxed);
95 }
96 Some(code) => {
97 self.queries_failed.fetch_add(1, Ordering::Relaxed);
98 if code == "57014" {
99 self.query_timeouts.fetch_add(1, Ordering::Relaxed);
100 }
101 let mut map = self.sqlstate_errors.lock();
102 *map.entry(code.to_string()).or_insert(0) += 1;
103 }
104 }
105 }
106
107 pub fn on_shutdown_flush(&self, elapsed_ns: u64) {
108 self.shutdown_flush_total_ns
109 .fetch_add(elapsed_ns, Ordering::Relaxed);
110 self.wal_flushes.fetch_add(1, Ordering::Relaxed);
111 }
112
113 pub fn on_shutdown_persist(&self, elapsed_ns: u64) {
114 self.shutdown_persist_total_ns
115 .fetch_add(elapsed_ns, Ordering::Relaxed);
116 }
117
118 pub fn set_buffer_pool_pressure(&self, stats: BufferPoolStats) {
119 *self.buffer_pool_pressure.lock() = Some(stats);
120 }
121
122 pub fn snapshot(&self) -> ServerMetricsSnapshot {
123 ServerMetricsSnapshot {
124 connections_accepted: self.connections_accepted.load(Ordering::Relaxed),
125 connections_refused: self.connections_refused.load(Ordering::Relaxed),
126 active_connections: self.active_connections.load(Ordering::Relaxed),
127 peak_connections: self.peak_connections.load(Ordering::Relaxed),
128 queries_total: self.queries_total.load(Ordering::Relaxed),
129 queries_succeeded: self.queries_succeeded.load(Ordering::Relaxed),
130 queries_failed: self.queries_failed.load(Ordering::Relaxed),
131 query_timeouts: self.query_timeouts.load(Ordering::Relaxed),
132 query_latency_total_ns: self.query_latency_total_ns.load(Ordering::Relaxed),
133 query_latency_max_ns: self.query_latency_max_ns.load(Ordering::Relaxed),
134 sqlstate_errors: self.sqlstate_errors.lock().clone(),
135 shutdown_flush_total_ns: self.shutdown_flush_total_ns.load(Ordering::Relaxed),
136 shutdown_persist_total_ns: self.shutdown_persist_total_ns.load(Ordering::Relaxed),
137 wal_flushes: self.wal_flushes.load(Ordering::Relaxed),
138 buffer_pool_pressure: *self.buffer_pool_pressure.lock(),
139 }
140 }
141
142 fn update_latency_max(&self, elapsed_ns: u64) {
143 let mut current = self.query_latency_max_ns.load(Ordering::Relaxed);
144 while elapsed_ns > current {
145 match self.query_latency_max_ns.compare_exchange(
146 current,
147 elapsed_ns,
148 Ordering::Relaxed,
149 Ordering::Relaxed,
150 ) {
151 Ok(_) => break,
152 Err(actual) => current = actual,
153 }
154 }
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use super::ServerMetrics;
161
162 #[test]
163 fn metrics_track_connections_queries_and_errors() {
164 let m = ServerMetrics::default();
165
166 m.on_connection_accepted();
167 m.on_connection_accepted();
168 m.on_connection_refused();
169 m.on_connection_closed();
170
171 m.on_query_finished(10, None);
172 m.on_query_finished(20, Some("22000"));
173 m.on_query_finished(30, Some("57014"));
174
175 let s = m.snapshot();
176 assert_eq!(s.connections_accepted, 2);
177 assert_eq!(s.connections_refused, 1);
178 assert_eq!(s.active_connections, 1);
179 assert_eq!(s.peak_connections, 2);
180 assert_eq!(s.queries_total, 3);
181 assert_eq!(s.queries_succeeded, 1);
182 assert_eq!(s.queries_failed, 2);
183 assert_eq!(s.query_timeouts, 1);
184 assert_eq!(s.query_latency_total_ns, 60);
185 assert_eq!(s.query_latency_max_ns, 30);
186 assert_eq!(s.sqlstate_errors.get("22000"), Some(&1));
187 assert_eq!(s.sqlstate_errors.get("57014"), Some(&1));
188 }
189}