1use crate::monitoring::histogram::Histogram;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9#[derive(Debug, Clone, Default)]
11pub struct MetricsSnapshot {
12 pub inserts_total: u64,
14 pub gets_total: u64,
15 pub deletes_total: u64,
16 pub sql_queries_total: u64,
17 pub flush_total: u64,
18
19 pub delta_hits: u64,
21 pub delta_misses: u64,
22 pub cache_hits: u64,
23 pub cache_misses: u64,
24 pub wos_hits: u64,
25 pub wos_misses: u64,
26
27 pub scatter_writes_total: u64,
29 pub scatter_reads_total: u64,
30
31 pub partition_prune_hits: u64,
33
34 pub wal_appends_total: u64,
36 pub wal_compactions_total: u64,
37
38 pub avg_query_latency_us: u64,
40 pub avg_insert_latency_us: u64,
41
42 pub delta_hit_rate: f64,
44 pub cache_hit_rate: f64,
45 pub wos_hit_rate: f64,
46}
47
48pub struct DbxMetrics {
52 pub inserts_total: AtomicU64,
54 pub gets_total: AtomicU64,
55 pub deletes_total: AtomicU64,
56 pub sql_queries_total: AtomicU64,
57 pub flush_total: AtomicU64,
58
59 pub delta_hits: AtomicU64,
61 pub delta_misses: AtomicU64,
62 pub cache_hits: AtomicU64,
63 pub cache_misses: AtomicU64,
64 pub wos_hits: AtomicU64,
65 pub wos_misses: AtomicU64,
66
67 pub scatter_writes_total: AtomicU64,
69 pub scatter_reads_total: AtomicU64,
70
71 pub partition_prune_hits: AtomicU64,
73
74 pub wal_appends_total: AtomicU64,
76 pub wal_compactions_total: AtomicU64,
77
78 pub query_latency_us: Histogram,
80 pub insert_latency_us: Histogram,
81}
82
83impl DbxMetrics {
84 pub fn new() -> Self {
85 Self {
86 inserts_total: AtomicU64::new(0),
87 gets_total: AtomicU64::new(0),
88 deletes_total: AtomicU64::new(0),
89 sql_queries_total: AtomicU64::new(0),
90 flush_total: AtomicU64::new(0),
91
92 delta_hits: AtomicU64::new(0),
93 delta_misses: AtomicU64::new(0),
94 cache_hits: AtomicU64::new(0),
95 cache_misses: AtomicU64::new(0),
96 wos_hits: AtomicU64::new(0),
97 wos_misses: AtomicU64::new(0),
98
99 scatter_writes_total: AtomicU64::new(0),
100 scatter_reads_total: AtomicU64::new(0),
101
102 partition_prune_hits: AtomicU64::new(0),
103
104 wal_appends_total: AtomicU64::new(0),
105 wal_compactions_total: AtomicU64::new(0),
106
107 query_latency_us: Histogram::new(
108 "dbx_query_latency_us",
109 "SQL query execution latency in microseconds",
110 ),
111 insert_latency_us: Histogram::new(
112 "dbx_insert_latency_us",
113 "INSERT operation latency in microseconds",
114 ),
115 }
116 }
117
118 #[inline]
120 pub fn inc_inserts(&self) {
121 self.inserts_total.fetch_add(1, Ordering::Relaxed);
122 }
123
124 #[inline]
125 pub fn inc_gets(&self) {
126 self.gets_total.fetch_add(1, Ordering::Relaxed);
127 }
128
129 #[inline]
130 pub fn inc_deletes(&self) {
131 self.deletes_total.fetch_add(1, Ordering::Relaxed);
132 }
133
134 #[inline]
135 pub fn inc_sql_queries(&self) {
136 self.sql_queries_total.fetch_add(1, Ordering::Relaxed);
137 }
138
139 #[inline]
140 pub fn inc_flush(&self) {
141 self.flush_total.fetch_add(1, Ordering::Relaxed);
142 }
143
144 #[inline]
145 pub fn inc_delta_hit(&self) {
146 self.delta_hits.fetch_add(1, Ordering::Relaxed);
147 }
148
149 #[inline]
150 pub fn inc_delta_miss(&self) {
151 self.delta_misses.fetch_add(1, Ordering::Relaxed);
152 }
153
154 #[inline]
155 pub fn inc_cache_hit(&self) {
156 self.cache_hits.fetch_add(1, Ordering::Relaxed);
157 }
158
159 #[inline]
160 pub fn inc_cache_miss(&self) {
161 self.cache_misses.fetch_add(1, Ordering::Relaxed);
162 }
163
164 #[inline]
165 pub fn inc_wos_hit(&self) {
166 self.wos_hits.fetch_add(1, Ordering::Relaxed);
167 }
168
169 #[inline]
170 pub fn inc_wos_miss(&self) {
171 self.wos_misses.fetch_add(1, Ordering::Relaxed);
172 }
173
174 #[inline]
175 pub fn inc_scatter_write(&self) {
176 self.scatter_writes_total.fetch_add(1, Ordering::Relaxed);
177 }
178
179 #[inline]
180 pub fn inc_scatter_read(&self) {
181 self.scatter_reads_total.fetch_add(1, Ordering::Relaxed);
182 }
183
184 #[inline]
185 pub fn inc_partition_prune_hit(&self) {
186 self.partition_prune_hits.fetch_add(1, Ordering::Relaxed);
187 }
188
189 #[inline]
190 pub fn inc_wal_append(&self) {
191 self.wal_appends_total.fetch_add(1, Ordering::Relaxed);
192 }
193
194 #[inline]
195 pub fn inc_wal_compaction(&self) {
196 self.wal_compactions_total.fetch_add(1, Ordering::Relaxed);
197 }
198
199 pub fn snapshot(&self) -> MetricsSnapshot {
201 let delta_hits = self.delta_hits.load(Ordering::Relaxed);
202 let delta_misses = self.delta_misses.load(Ordering::Relaxed);
203 let delta_total = delta_hits + delta_misses;
204
205 let cache_hits = self.cache_hits.load(Ordering::Relaxed);
206 let cache_misses = self.cache_misses.load(Ordering::Relaxed);
207 let cache_total = cache_hits + cache_misses;
208
209 let wos_hits = self.wos_hits.load(Ordering::Relaxed);
210 let wos_misses = self.wos_misses.load(Ordering::Relaxed);
211 let wos_total = wos_hits + wos_misses;
212
213 let (q_sum, q_count) = self.query_latency_us.snapshot();
214 let (i_sum, i_count) = self.insert_latency_us.snapshot();
215
216 MetricsSnapshot {
217 inserts_total: self.inserts_total.load(Ordering::Relaxed),
218 gets_total: self.gets_total.load(Ordering::Relaxed),
219 deletes_total: self.deletes_total.load(Ordering::Relaxed),
220 sql_queries_total: self.sql_queries_total.load(Ordering::Relaxed),
221 flush_total: self.flush_total.load(Ordering::Relaxed),
222
223 delta_hits,
224 delta_misses,
225 cache_hits,
226 cache_misses,
227 wos_hits,
228 wos_misses,
229
230 scatter_writes_total: self.scatter_writes_total.load(Ordering::Relaxed),
231 scatter_reads_total: self.scatter_reads_total.load(Ordering::Relaxed),
232
233 partition_prune_hits: self.partition_prune_hits.load(Ordering::Relaxed),
234
235 wal_appends_total: self.wal_appends_total.load(Ordering::Relaxed),
236 wal_compactions_total: self.wal_compactions_total.load(Ordering::Relaxed),
237
238 avg_query_latency_us: if q_count > 0 { q_sum / q_count } else { 0 },
239 avg_insert_latency_us: if i_count > 0 { i_sum / i_count } else { 0 },
240
241 delta_hit_rate: if delta_total > 0 {
242 delta_hits as f64 / delta_total as f64
243 } else {
244 0.0
245 },
246 cache_hit_rate: if cache_total > 0 {
247 cache_hits as f64 / cache_total as f64
248 } else {
249 0.0
250 },
251 wos_hit_rate: if wos_total > 0 {
252 wos_hits as f64 / wos_total as f64
253 } else {
254 0.0
255 },
256 }
257 }
258
259 pub fn reset(&self) {
261 self.inserts_total.store(0, Ordering::Relaxed);
262 self.gets_total.store(0, Ordering::Relaxed);
263 self.deletes_total.store(0, Ordering::Relaxed);
264 self.sql_queries_total.store(0, Ordering::Relaxed);
265 self.flush_total.store(0, Ordering::Relaxed);
266 self.delta_hits.store(0, Ordering::Relaxed);
267 self.delta_misses.store(0, Ordering::Relaxed);
268 self.cache_hits.store(0, Ordering::Relaxed);
269 self.cache_misses.store(0, Ordering::Relaxed);
270 self.wos_hits.store(0, Ordering::Relaxed);
271 self.wos_misses.store(0, Ordering::Relaxed);
272 self.scatter_writes_total.store(0, Ordering::Relaxed);
273 self.scatter_reads_total.store(0, Ordering::Relaxed);
274 self.partition_prune_hits.store(0, Ordering::Relaxed);
275 self.wal_appends_total.store(0, Ordering::Relaxed);
276 self.wal_compactions_total.store(0, Ordering::Relaxed);
277 self.query_latency_us.reset();
278 self.insert_latency_us.reset();
279 }
280}
281
282impl Default for DbxMetrics {
283 fn default() -> Self {
284 Self::new()
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 use super::*;
291
292 #[test]
293 fn test_inc_counters() {
294 let m = DbxMetrics::new();
295 m.inc_inserts();
296 m.inc_inserts();
297 m.inc_gets();
298 let snap = m.snapshot();
299 assert_eq!(snap.inserts_total, 2);
300 assert_eq!(snap.gets_total, 1);
301 }
302
303 #[test]
304 fn test_hit_rate_calculation() {
305 let m = DbxMetrics::new();
306 m.inc_delta_hit();
307 m.inc_delta_hit();
308 m.inc_delta_miss();
309 let snap = m.snapshot();
310 assert!((snap.delta_hit_rate - 2.0 / 3.0).abs() < 0.001);
311 }
312
313 #[test]
314 fn test_reset() {
315 let m = DbxMetrics::new();
316 m.inc_inserts();
317 m.inc_inserts();
318 m.reset();
319 let snap = m.snapshot();
320 assert_eq!(snap.inserts_total, 0);
321 }
322
323 #[test]
324 fn test_latency_histogram() {
325 let m = DbxMetrics::new();
326 m.query_latency_us.observe(500);
327 m.query_latency_us.observe(1200);
328 let snap = m.snapshot();
329 assert_eq!(snap.avg_query_latency_us, 850); }
331}