rag_plusplus_core/observability/
metrics.rs1use metrics::{counter, gauge, histogram};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
/// Settings that control how metrics are named and reported.
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Text placed in front of every metric name emitted by `Metrics`
    /// (joined to the metric's own name with `_`).
    pub prefix: String,
    /// Whether metrics should be broken down per index.
    // NOTE(review): confirm the recorder consults this flag before attaching
    // `index` labels.
    pub per_index_metrics: bool,
    /// Bucket boundaries for latency histograms.
    // NOTE(review): presumably expressed in seconds, matching the
    // `*_latency_seconds` histogram; nothing in this file reads the field, so
    // verify where the exporter is configured.
    pub latency_buckets: Vec<f64>,
}

impl MetricsConfig {
    /// Prefix used when the caller does not pick one.
    const DEFAULT_PREFIX: &'static str = "ragpp";

    /// Latency bucket boundaries used when the caller does not pick any.
    const DEFAULT_LATENCY_BUCKETS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];

    /// Creates a configuration holding the default values.
    ///
    /// Equivalent to [`MetricsConfig::default`].
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the configuration with `prefix` replaced, leaving every other
    /// field untouched.
    #[must_use]
    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
        Self {
            prefix: prefix.into(),
            ..self
        }
    }
}

impl Default for MetricsConfig {
    /// Builds the default configuration: prefix `"ragpp"`, per-index metrics
    /// enabled, and twelve latency buckets ranging from `0.001` to `10.0`.
    fn default() -> Self {
        Self {
            prefix: String::from(Self::DEFAULT_PREFIX),
            per_index_metrics: true,
            latency_buckets: Self::DEFAULT_LATENCY_BUCKETS.to_vec(),
        }
    }
}
46
47#[derive(Debug)]
61pub struct Metrics {
62 config: MetricsConfig,
63 queries_total: AtomicU64,
65 cache_hits: AtomicU64,
66 cache_misses: AtomicU64,
67 wal_writes: AtomicU64,
68 buffer_flushes: AtomicU64,
69 errors_total: AtomicU64,
70}
71
72impl Metrics {
73 #[must_use]
75 pub fn new(config: MetricsConfig) -> Self {
76 Self {
77 config,
78 queries_total: AtomicU64::new(0),
79 cache_hits: AtomicU64::new(0),
80 cache_misses: AtomicU64::new(0),
81 wal_writes: AtomicU64::new(0),
82 buffer_flushes: AtomicU64::new(0),
83 errors_total: AtomicU64::new(0),
84 }
85 }
86
87 #[must_use]
89 pub fn default_metrics() -> Self {
90 Self::new(MetricsConfig::default())
91 }
92
93 pub fn record_query(&self, latency: Duration, result_count: usize, index_name: Option<&str>) {
95 self.queries_total.fetch_add(1, Ordering::Relaxed);
96
97 let latency_secs = latency.as_secs_f64();
98 let prefix = &self.config.prefix;
99
100 if let Some(name) = index_name {
102 histogram!(format!("{prefix}_query_latency_seconds"), "index" => name.to_string())
103 .record(latency_secs);
104 counter!(format!("{prefix}_queries_total"), "index" => name.to_string())
105 .increment(1);
106 gauge!(format!("{prefix}_query_results"), "index" => name.to_string())
107 .set(result_count as f64);
108 } else {
109 histogram!(format!("{prefix}_query_latency_seconds"))
110 .record(latency_secs);
111 counter!(format!("{prefix}_queries_total"))
112 .increment(1);
113 gauge!(format!("{prefix}_query_results"))
114 .set(result_count as f64);
115 }
116 }
117
118 pub fn record_cache_hit(&self) {
120 self.cache_hits.fetch_add(1, Ordering::Relaxed);
121 counter!(format!("{}_cache_hits_total", self.config.prefix)).increment(1);
122 }
123
124 pub fn record_cache_miss(&self) {
126 self.cache_misses.fetch_add(1, Ordering::Relaxed);
127 counter!(format!("{}_cache_misses_total", self.config.prefix)).increment(1);
128 }
129
130 pub fn record_wal_write(&self) {
132 self.wal_writes.fetch_add(1, Ordering::Relaxed);
133 counter!(format!("{}_wal_writes_total", self.config.prefix)).increment(1);
134 }
135
136 pub fn record_buffer_flush(&self, records_flushed: usize) {
138 self.buffer_flushes.fetch_add(1, Ordering::Relaxed);
139 counter!(format!("{}_buffer_flushes_total", self.config.prefix)).increment(1);
140 counter!(format!("{}_records_flushed_total", self.config.prefix))
141 .increment(records_flushed as u64);
142 }
143
144 pub fn record_error(&self, error_type: &str) {
146 self.errors_total.fetch_add(1, Ordering::Relaxed);
147 counter!(
148 format!("{}_errors_total", self.config.prefix),
149 "type" => error_type.to_string()
150 )
151 .increment(1);
152 }
153
154 pub fn set_index_size(&self, index_name: &str, size: usize) {
156 gauge!(
157 format!("{}_index_size", self.config.prefix),
158 "index" => index_name.to_string()
159 )
160 .set(size as f64);
161 }
162
163 pub fn set_store_size(&self, size: usize) {
165 gauge!(format!("{}_store_size", self.config.prefix)).set(size as f64);
166 }
167
168 pub fn set_memory_bytes(&self, bytes: usize) {
170 gauge!(format!("{}_memory_bytes", self.config.prefix)).set(bytes as f64);
171 }
172
173 #[must_use]
175 pub fn snapshot(&self) -> MetricsSnapshot {
176 MetricsSnapshot {
177 queries_total: self.queries_total.load(Ordering::Relaxed),
178 cache_hits: self.cache_hits.load(Ordering::Relaxed),
179 cache_misses: self.cache_misses.load(Ordering::Relaxed),
180 wal_writes: self.wal_writes.load(Ordering::Relaxed),
181 buffer_flushes: self.buffer_flushes.load(Ordering::Relaxed),
182 errors_total: self.errors_total.load(Ordering::Relaxed),
183 }
184 }
185
186 #[must_use]
188 pub fn cache_hit_ratio(&self) -> f64 {
189 let hits = self.cache_hits.load(Ordering::Relaxed);
190 let misses = self.cache_misses.load(Ordering::Relaxed);
191 let total = hits + misses;
192
193 if total == 0 {
194 0.0
195 } else {
196 hits as f64 / total as f64
197 }
198 }
199}
200
201impl Default for Metrics {
202 fn default() -> Self {
203 Self::default_metrics()
204 }
205}
206
/// Point-in-time copy of the counters kept by `Metrics`.
#[derive(Debug, Clone, Default)]
pub struct MetricsSnapshot {
    /// Queries recorded so far.
    pub queries_total: u64,
    /// Cache hits recorded so far.
    pub cache_hits: u64,
    /// Cache misses recorded so far.
    pub cache_misses: u64,
    /// WAL writes recorded so far.
    pub wal_writes: u64,
    /// Buffer flushes recorded so far.
    pub buffer_flushes: u64,
    /// Errors recorded so far.
    pub errors_total: u64,
}

impl MetricsSnapshot {
    /// Returns `cache_hits / (cache_hits + cache_misses)`, or `0.0` when both
    /// counters are zero.
    ///
    /// The result is always within `0.0..=1.0`, whatever values the public
    /// fields hold.
    #[must_use]
    pub fn cache_hit_ratio(&self) -> f64 {
        // The fields are public, so their sum may not fit in a u64. Adding in
        // u128 avoids the debug-build panic and the release-build wrap-around.
        let lookups = u128::from(self.cache_hits) + u128::from(self.cache_misses);
        if lookups == 0 {
            return 0.0;
        }
        self.cache_hits as f64 / lookups as f64
    }
}
236
/// Stopwatch that reports how much time has passed since it was started,
/// measured with [`Instant`].
// NOTE(review): `dead_code` is silenced presumably because nothing inside the
// crate uses the timer yet; confirm, and drop the attribute once it has a
// caller.
#[allow(dead_code)]
#[derive(Debug)]
pub struct Timer {
    /// Moment at which the timer was started.
    start: Instant,
}

#[allow(dead_code)]
impl Timer {
    /// Starts a new timer at the current instant.
    #[must_use]
    pub fn start() -> Self {
        Self {
            start: Instant::now(),
        }
    }

    /// Returns the time elapsed since the timer was started, leaving the
    /// timer usable for further readings.
    #[must_use]
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }

    /// Consumes the timer and returns the time elapsed since it was started.
    #[must_use]
    pub fn stop(self) -> Duration {
        self.elapsed()
    }
}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh recorder starts with every local counter at zero.
    #[test]
    fn test_metrics_creation() {
        let snapshot = Metrics::default_metrics().snapshot();
        assert_eq!(snapshot.queries_total, 0);
        assert_eq!(snapshot.cache_hits, 0);
        assert_eq!(snapshot.cache_misses, 0);
        assert_eq!(snapshot.wal_writes, 0);
        assert_eq!(snapshot.buffer_flushes, 0);
        assert_eq!(snapshot.errors_total, 0);
    }

    /// Queries are counted whether or not an index name is supplied.
    #[test]
    fn test_record_query() {
        let metrics = Metrics::default_metrics();

        metrics.record_query(Duration::from_millis(50), 10, Some("test"));
        metrics.record_query(Duration::from_millis(100), 5, None);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.queries_total, 2);
    }

    /// Queries are still counted when per-index reporting is switched off.
    #[test]
    fn test_record_query_without_per_index_metrics() {
        let config = MetricsConfig {
            per_index_metrics: false,
            ..MetricsConfig::default()
        };
        let metrics = Metrics::new(config);

        metrics.record_query(Duration::from_millis(5), 1, Some("test"));

        assert_eq!(metrics.snapshot().queries_total, 1);
    }

    /// Hits and misses are tallied separately and feed the hit ratio.
    #[test]
    fn test_cache_metrics() {
        let metrics = Metrics::default_metrics();

        metrics.record_cache_hit();
        metrics.record_cache_hit();
        metrics.record_cache_miss();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.cache_hits, 2);
        assert_eq!(snapshot.cache_misses, 1);

        // Compare against the exact quotient rather than a truncated decimal.
        let expected = 2.0 / 3.0;
        assert!((metrics.cache_hit_ratio() - expected).abs() < 1e-12);
        assert!((snapshot.cache_hit_ratio() - expected).abs() < 1e-12);
    }

    /// With no lookups recorded the ratio is defined as zero, not NaN.
    #[test]
    fn test_cache_hit_ratio_without_lookups() {
        let metrics = Metrics::default_metrics();
        assert!(metrics.cache_hit_ratio().abs() < f64::EPSILON);
        assert!(metrics.snapshot().cache_hit_ratio().abs() < f64::EPSILON);
    }

    /// The snapshot computes its ratio from its own fields.
    #[test]
    fn test_snapshot_cache_hit_ratio() {
        let snapshot = MetricsSnapshot {
            cache_hits: 3,
            cache_misses: 1,
            ..MetricsSnapshot::default()
        };
        assert!((snapshot.cache_hit_ratio() - 0.75).abs() < 1e-12);
    }

    /// The remaining recorders each bump exactly their own counter.
    #[test]
    fn test_other_counters() {
        let metrics = Metrics::default_metrics();

        metrics.record_wal_write();
        metrics.record_buffer_flush(128);
        metrics.record_error("io");
        metrics.record_error("timeout");

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.wal_writes, 1);
        assert_eq!(snapshot.buffer_flushes, 1);
        assert_eq!(snapshot.errors_total, 2);
    }

    /// The timer reports at least the time that was slept.
    #[test]
    fn test_timer() {
        let timer = Timer::start();
        std::thread::sleep(Duration::from_millis(10));
        let elapsed = timer.stop();

        assert!(elapsed >= Duration::from_millis(10));
    }

    /// `with_prefix` replaces the prefix and nothing else.
    #[test]
    fn test_config_builder() {
        let config = MetricsConfig::new().with_prefix("myapp");
        assert_eq!(config.prefix, "myapp");
        assert!(config.per_index_metrics);
    }
}