Skip to main content

entdb_server/server/
metrics.rs

/*
 * Copyright 2026 EntDB Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17use entdb::storage::buffer_pool::BufferPoolStats;
18use parking_lot::Mutex;
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22#[derive(Debug, Clone, Default)]
23pub struct ServerMetricsSnapshot {
24    pub connections_accepted: u64,
25    pub connections_refused: u64,
26    pub active_connections: u64,
27    pub peak_connections: u64,
28    pub queries_total: u64,
29    pub queries_succeeded: u64,
30    pub queries_failed: u64,
31    pub query_timeouts: u64,
32    pub query_latency_total_ns: u64,
33    pub query_latency_max_ns: u64,
34    pub sqlstate_errors: HashMap<String, u64>,
35    pub shutdown_flush_total_ns: u64,
36    pub shutdown_persist_total_ns: u64,
37    pub wal_flushes: u64,
38    pub buffer_pool_pressure: Option<BufferPoolStats>,
39}
40
41#[derive(Debug, Default)]
42pub struct ServerMetrics {
43    connections_accepted: AtomicU64,
44    connections_refused: AtomicU64,
45    active_connections: AtomicU64,
46    peak_connections: AtomicU64,
47    queries_total: AtomicU64,
48    queries_succeeded: AtomicU64,
49    queries_failed: AtomicU64,
50    query_timeouts: AtomicU64,
51    query_latency_total_ns: AtomicU64,
52    query_latency_max_ns: AtomicU64,
53    shutdown_flush_total_ns: AtomicU64,
54    shutdown_persist_total_ns: AtomicU64,
55    wal_flushes: AtomicU64,
56    sqlstate_errors: Mutex<HashMap<String, u64>>,
57    buffer_pool_pressure: Mutex<Option<BufferPoolStats>>,
58}
59
60impl ServerMetrics {
61    pub fn on_connection_accepted(&self) {
62        self.connections_accepted.fetch_add(1, Ordering::Relaxed);
63        let active = self.active_connections.fetch_add(1, Ordering::Relaxed) + 1;
64        let mut peak = self.peak_connections.load(Ordering::Relaxed);
65        while active > peak {
66            match self.peak_connections.compare_exchange(
67                peak,
68                active,
69                Ordering::Relaxed,
70                Ordering::Relaxed,
71            ) {
72                Ok(_) => break,
73                Err(actual) => peak = actual,
74            }
75        }
76    }
77
78    pub fn on_connection_refused(&self) {
79        self.connections_refused.fetch_add(1, Ordering::Relaxed);
80    }
81
82    pub fn on_connection_closed(&self) {
83        self.active_connections.fetch_sub(1, Ordering::Relaxed);
84    }
85
86    pub fn on_query_finished(&self, elapsed_ns: u64, sqlstate: Option<&str>) {
87        self.queries_total.fetch_add(1, Ordering::Relaxed);
88        self.query_latency_total_ns
89            .fetch_add(elapsed_ns, Ordering::Relaxed);
90        self.update_latency_max(elapsed_ns);
91
92        match sqlstate {
93            None => {
94                self.queries_succeeded.fetch_add(1, Ordering::Relaxed);
95            }
96            Some(code) => {
97                self.queries_failed.fetch_add(1, Ordering::Relaxed);
98                if code == "57014" {
99                    self.query_timeouts.fetch_add(1, Ordering::Relaxed);
100                }
101                let mut map = self.sqlstate_errors.lock();
102                *map.entry(code.to_string()).or_insert(0) += 1;
103            }
104        }
105    }
106
107    pub fn on_shutdown_flush(&self, elapsed_ns: u64) {
108        self.shutdown_flush_total_ns
109            .fetch_add(elapsed_ns, Ordering::Relaxed);
110        self.wal_flushes.fetch_add(1, Ordering::Relaxed);
111    }
112
113    pub fn on_shutdown_persist(&self, elapsed_ns: u64) {
114        self.shutdown_persist_total_ns
115            .fetch_add(elapsed_ns, Ordering::Relaxed);
116    }
117
118    pub fn set_buffer_pool_pressure(&self, stats: BufferPoolStats) {
119        *self.buffer_pool_pressure.lock() = Some(stats);
120    }
121
122    pub fn snapshot(&self) -> ServerMetricsSnapshot {
123        ServerMetricsSnapshot {
124            connections_accepted: self.connections_accepted.load(Ordering::Relaxed),
125            connections_refused: self.connections_refused.load(Ordering::Relaxed),
126            active_connections: self.active_connections.load(Ordering::Relaxed),
127            peak_connections: self.peak_connections.load(Ordering::Relaxed),
128            queries_total: self.queries_total.load(Ordering::Relaxed),
129            queries_succeeded: self.queries_succeeded.load(Ordering::Relaxed),
130            queries_failed: self.queries_failed.load(Ordering::Relaxed),
131            query_timeouts: self.query_timeouts.load(Ordering::Relaxed),
132            query_latency_total_ns: self.query_latency_total_ns.load(Ordering::Relaxed),
133            query_latency_max_ns: self.query_latency_max_ns.load(Ordering::Relaxed),
134            sqlstate_errors: self.sqlstate_errors.lock().clone(),
135            shutdown_flush_total_ns: self.shutdown_flush_total_ns.load(Ordering::Relaxed),
136            shutdown_persist_total_ns: self.shutdown_persist_total_ns.load(Ordering::Relaxed),
137            wal_flushes: self.wal_flushes.load(Ordering::Relaxed),
138            buffer_pool_pressure: *self.buffer_pool_pressure.lock(),
139        }
140    }
141
142    fn update_latency_max(&self, elapsed_ns: u64) {
143        let mut current = self.query_latency_max_ns.load(Ordering::Relaxed);
144        while elapsed_ns > current {
145            match self.query_latency_max_ns.compare_exchange(
146                current,
147                elapsed_ns,
148                Ordering::Relaxed,
149                Ordering::Relaxed,
150            ) {
151                Ok(_) => break,
152                Err(actual) => current = actual,
153            }
154        }
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::ServerMetrics;

    #[test]
    fn metrics_track_connections_queries_and_errors() {
        let metrics = ServerMetrics::default();

        // Two accepts, one refusal, one close: one connection stays open and the
        // high-water mark is two.
        metrics.on_connection_accepted();
        metrics.on_connection_accepted();
        metrics.on_connection_refused();
        metrics.on_connection_closed();

        // One success, one generic error, and one cancellation counted as a timeout.
        for (elapsed_ns, sqlstate) in [(10, None), (20, Some("22000")), (30, Some("57014"))] {
            metrics.on_query_finished(elapsed_ns, sqlstate);
        }

        let snap = metrics.snapshot();

        assert_eq!(snap.connections_accepted, 2);
        assert_eq!(snap.connections_refused, 1);
        assert_eq!((snap.active_connections, snap.peak_connections), (1, 2));
        assert_eq!(
            (snap.queries_total, snap.queries_succeeded, snap.queries_failed),
            (3, 1, 2)
        );
        assert_eq!(snap.query_timeouts, 1);
        assert_eq!(snap.query_latency_total_ns, 10 + 20 + 30);
        assert_eq!(snap.query_latency_max_ns, 30);
        for code in ["22000", "57014"] {
            assert_eq!(snap.sqlstate_errors.get(code), Some(&1));
        }
    }
}