Skip to main content

heliosdb_proxy/rewriter/
metrics.rs

//! Rewrite Metrics
//!
//! Metrics collection for query rewriting: query counters, per-rule match
//! statistics, and a latency histogram.

5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::Duration;
9
10/// Rewrite metrics collector
11pub struct RewriteMetrics {
12    /// Total queries processed
13    total_queries: AtomicU64,
14
15    /// Queries that were rewritten
16    rewritten_queries: AtomicU64,
17
18    /// Queries with no matching rules
19    no_match_queries: AtomicU64,
20
21    /// Total rewrite time (nanoseconds)
22    total_rewrite_time_ns: AtomicU64,
23
24    /// Per-rule statistics
25    rule_stats: RwLock<HashMap<String, RuleStats>>,
26
27    /// Histogram buckets for latency
28    latency_buckets: RwLock<LatencyHistogram>,
29}
30
31impl RewriteMetrics {
32    /// Create new metrics
33    pub fn new() -> Self {
34        Self {
35            total_queries: AtomicU64::new(0),
36            rewritten_queries: AtomicU64::new(0),
37            no_match_queries: AtomicU64::new(0),
38            total_rewrite_time_ns: AtomicU64::new(0),
39            rule_stats: RwLock::new(HashMap::new()),
40            latency_buckets: RwLock::new(LatencyHistogram::new()),
41        }
42    }
43
44    /// Record a rewrite operation
45    pub fn record_rewrite(&self, duration: Duration, was_rewritten: bool) {
46        self.total_queries.fetch_add(1, Ordering::Relaxed);
47
48        if was_rewritten {
49            self.rewritten_queries.fetch_add(1, Ordering::Relaxed);
50        }
51
52        let nanos = duration.as_nanos() as u64;
53        self.total_rewrite_time_ns
54            .fetch_add(nanos, Ordering::Relaxed);
55
56        self.latency_buckets.write().record(duration);
57    }
58
59    /// Record a no-match query
60    pub fn record_no_match(&self, duration: Duration) {
61        self.total_queries.fetch_add(1, Ordering::Relaxed);
62        self.no_match_queries.fetch_add(1, Ordering::Relaxed);
63
64        let nanos = duration.as_nanos() as u64;
65        self.total_rewrite_time_ns
66            .fetch_add(nanos, Ordering::Relaxed);
67
68        self.latency_buckets.write().record(duration);
69    }
70
71    /// Record a rule match
72    pub fn record_rule_match(&self, rule_id: &str) {
73        let mut stats = self.rule_stats.write();
74        let entry = stats.entry(rule_id.to_string()).or_default();
75        entry.matches.fetch_add(1, Ordering::Relaxed);
76    }
77
78    /// Get statistics
79    pub fn stats(&self) -> RewriteStats {
80        let total = self.total_queries.load(Ordering::Relaxed);
81        let rewritten = self.rewritten_queries.load(Ordering::Relaxed);
82        let no_match = self.no_match_queries.load(Ordering::Relaxed);
83        let total_time_ns = self.total_rewrite_time_ns.load(Ordering::Relaxed);
84
85        let avg_time = Duration::from_nanos(total_time_ns.checked_div(total).unwrap_or(0));
86
87        let rewrite_ratio = if total > 0 {
88            rewritten as f64 / total as f64
89        } else {
90            0.0
91        };
92
93        let rule_stats: HashMap<String, RuleStatsSnapshot> = self
94            .rule_stats
95            .read()
96            .iter()
97            .map(|(k, v)| (k.clone(), v.snapshot()))
98            .collect();
99
100        let latency = self.latency_buckets.read().percentiles();
101
102        RewriteStats {
103            total_queries: total,
104            rewritten_queries: rewritten,
105            no_match_queries: no_match,
106            rewrite_ratio,
107            avg_rewrite_time: avg_time,
108            total_rewrite_time: Duration::from_nanos(total_time_ns),
109            rule_stats,
110            latency,
111        }
112    }
113
114    /// Reset all metrics
115    pub fn reset(&self) {
116        self.total_queries.store(0, Ordering::Relaxed);
117        self.rewritten_queries.store(0, Ordering::Relaxed);
118        self.no_match_queries.store(0, Ordering::Relaxed);
119        self.total_rewrite_time_ns.store(0, Ordering::Relaxed);
120        self.rule_stats.write().clear();
121        *self.latency_buckets.write() = LatencyHistogram::new();
122    }
123}
124
125impl Default for RewriteMetrics {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
/// Live, atomically updated counters for a single rewrite rule.
pub struct RuleStats {
    /// How many times the rule matched a query.
    pub matches: AtomicU64,

    /// How many times the rule was applied successfully.
    pub applied: AtomicU64,

    /// How many times applying the rule failed.
    pub failures: AtomicU64,

    /// Estimated cumulative time saved by the rule, in nanoseconds.
    pub time_saved_ns: AtomicU64,
}

146impl RuleStats {
147    /// Create new stats
148    pub fn new() -> Self {
149        Self {
150            matches: AtomicU64::new(0),
151            applied: AtomicU64::new(0),
152            failures: AtomicU64::new(0),
153            time_saved_ns: AtomicU64::new(0),
154        }
155    }
156
157    /// Get a snapshot
158    pub fn snapshot(&self) -> RuleStatsSnapshot {
159        RuleStatsSnapshot {
160            matches: self.matches.load(Ordering::Relaxed),
161            applied: self.applied.load(Ordering::Relaxed),
162            failures: self.failures.load(Ordering::Relaxed),
163            time_saved: Duration::from_nanos(self.time_saved_ns.load(Ordering::Relaxed)),
164        }
165    }
166}
167
168impl Default for RuleStats {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
/// Plain-value copy of one rule's counters, as produced by `RuleStats::snapshot`.
#[derive(Debug, Clone)]
pub struct RuleStatsSnapshot {
    /// How many times the rule matched a query.
    pub matches: u64,

    /// How many times the rule was applied successfully.
    pub applied: u64,

    /// How many times applying the rule failed.
    pub failures: u64,

    /// Estimated cumulative time saved by the rule.
    pub time_saved: Duration,
}

190/// Overall rewrite statistics
191#[derive(Debug, Clone)]
192pub struct RewriteStats {
193    /// Total queries processed
194    pub total_queries: u64,
195
196    /// Queries that were rewritten
197    pub rewritten_queries: u64,
198
199    /// Queries with no matching rules
200    pub no_match_queries: u64,
201
202    /// Ratio of rewritten queries
203    pub rewrite_ratio: f64,
204
205    /// Average rewrite time
206    pub avg_rewrite_time: Duration,
207
208    /// Total rewrite time
209    pub total_rewrite_time: Duration,
210
211    /// Per-rule statistics
212    pub rule_stats: HashMap<String, RuleStatsSnapshot>,
213
214    /// Latency percentiles
215    pub latency: LatencyPercentiles,
216}
217
/// Fixed-bucket latency histogram.
///
/// A sample is counted in the first bucket whose inclusive upper bound (whole
/// microseconds) is at least the sample; samples above the last bound go to a
/// trailing overflow bucket. Every counter is atomic, so recording needs only
/// a shared reference.
struct LatencyHistogram {
    /// Inclusive bucket upper bounds in microseconds, strictly ascending.
    boundaries: Vec<u64>,

    /// Per-bucket counts; one entry longer than `boundaries` (last = overflow).
    counts: Vec<AtomicU64>,

    /// Total number of recorded samples.
    total: AtomicU64,
}

impl LatencyHistogram {
    /// Creates an empty histogram with bucket bounds at
    /// 1μs, 5μs, 10μs, 25μs, 50μs, 100μs, 250μs, 500μs, 1ms, 5ms and 10ms.
    fn new() -> Self {
        let boundaries: Vec<u64> = vec![1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000];
        // One extra slot for samples above the last bound.
        let counts: Vec<AtomicU64> = (0..=boundaries.len()).map(|_| AtomicU64::new(0)).collect();

        Self {
            boundaries,
            counts,
            total: AtomicU64::new(0),
        }
    }

    /// Counts one sample.
    ///
    /// The duration is truncated to whole microseconds before bucketing, so
    /// sub-microsecond samples land in the first (≤1μs) bucket.
    fn record(&self, duration: Duration) {
        // Saturate the u128 -> u64 conversion instead of silently truncating,
        // so absurdly long durations still land in the overflow bucket.
        let micros = duration.as_micros().min(u128::from(u64::MAX)) as u64;
        // Index of the first bound >= micros, or `boundaries.len()` (the
        // overflow slot) when none; valid because `boundaries` is ascending.
        let bucket = self.boundaries.partition_point(|&bound| bound < micros);

        self.counts[bucket].fetch_add(1, Ordering::Relaxed);
        self.total.fetch_add(1, Ordering::Relaxed);
    }

    /// Estimates p50/p90/p95/p99 with the nearest-rank method.
    ///
    /// Each percentile is the upper bound of the bucket that holds the sample
    /// of rank `ceil(p * total)` (clamped to `1..=total`). Overflow-bucket
    /// samples are reported as twice the last bound. An empty histogram
    /// yields all-zero percentiles.
    fn percentiles(&self) -> LatencyPercentiles {
        let total = self.total.load(Ordering::Relaxed);
        if total == 0 {
            return LatencyPercentiles::default();
        }

        // cumulative[i] = number of samples in buckets 0..=i.
        let cumulative: Vec<u64> = self
            .counts
            .iter()
            .scan(0u64, |acc, c| {
                *acc += c.load(Ordering::Relaxed);
                Some(*acc)
            })
            .collect();

        // Value reported for samples above every bucket bound.
        let overflow = Duration::from_micros(
            self.boundaries
                .last()
                .copied()
                .unwrap_or(10_000)
                .saturating_mul(2),
        );

        let percentile = |p: f64| -> Duration {
            // Round up and never target rank 0: with a floored rank, small
            // sample counts would match the (possibly empty) first bucket.
            let rank = ((total as f64 * p).ceil() as u64).max(1).min(total);
            match cumulative.iter().position(|&seen| seen >= rank) {
                Some(i) if i < self.boundaries.len() => Duration::from_micros(self.boundaries[i]),
                // The overflow bucket, or (only under racing relaxed updates)
                // bucket counts that have not yet caught up with `total`.
                _ => overflow,
            }
        };

        LatencyPercentiles {
            p50: percentile(0.50),
            p90: percentile(0.90),
            p95: percentile(0.95),
            p99: percentile(0.99),
        }
    }
}

/// Latency percentiles, each reported as a histogram bucket upper bound.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LatencyPercentiles {
    /// 50th percentile (median).
    pub p50: Duration,

    /// 90th percentile.
    pub p90: Duration,

    /// 95th percentile.
    pub p95: Duration,

    /// 99th percentile.
    pub p99: Duration,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Shorthand for a zero-length duration.
    fn zero() -> Duration {
        Duration::from_secs(0)
    }

    #[test]
    fn test_metrics_basic() {
        let metrics = RewriteMetrics::new();

        metrics.record_rewrite(Duration::from_micros(100), true);
        metrics.record_rewrite(Duration::from_micros(50), false);
        metrics.record_no_match(Duration::from_micros(10));

        let stats = metrics.stats();
        assert_eq!(stats.total_queries, 3);
        assert_eq!(stats.rewritten_queries, 1);
        assert_eq!(stats.no_match_queries, 1);
    }

    #[test]
    fn test_empty_metrics() {
        let stats = RewriteMetrics::new().stats();
        assert_eq!(stats.total_queries, 0);
        assert!(stats.rewrite_ratio.abs() < 1e-12);
        assert_eq!(stats.avg_rewrite_time, zero());
        assert_eq!(stats.total_rewrite_time, zero());
        assert!(stats.rule_stats.is_empty());
        assert_eq!(stats.latency.p50, zero());
        assert_eq!(stats.latency.p99, zero());
    }

    #[test]
    fn test_timing_totals() {
        let metrics = RewriteMetrics::new();

        metrics.record_rewrite(Duration::from_micros(100), true);
        metrics.record_rewrite(Duration::from_micros(50), false);
        metrics.record_no_match(Duration::from_micros(10));

        let stats = metrics.stats();
        assert_eq!(stats.total_rewrite_time, Duration::from_micros(160));
        // 160_000ns over 3 queries, integer division.
        assert_eq!(stats.avg_rewrite_time, Duration::from_nanos(53_333));
        assert!(stats.latency.p99 > zero());
        assert!(stats.latency.p50 <= stats.latency.p99);
    }

    #[test]
    fn test_rule_stats() {
        let metrics = RewriteMetrics::new();

        metrics.record_rule_match("rule1");
        metrics.record_rule_match("rule1");
        metrics.record_rule_match("rule2");

        let stats = metrics.stats();
        assert_eq!(stats.rule_stats.get("rule1").unwrap().matches, 2);
        assert_eq!(stats.rule_stats.get("rule2").unwrap().matches, 1);
    }

    #[test]
    fn test_concurrent_rule_matches() {
        let metrics = Arc::new(RewriteMetrics::new());

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let metrics = Arc::clone(&metrics);
                thread::spawn(move || {
                    for _ in 0..250 {
                        metrics.record_rule_match("shared");
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().expect("recording thread panicked");
        }

        assert_eq!(metrics.stats().rule_stats["shared"].matches, 1000);
    }

    #[test]
    fn test_reset() {
        let metrics = RewriteMetrics::new();

        metrics.record_rewrite(Duration::from_micros(100), true);
        metrics.record_rule_match("rule1");

        metrics.reset();

        let stats = metrics.stats();
        assert_eq!(stats.total_queries, 0);
        assert_eq!(stats.rewritten_queries, 0);
        assert_eq!(stats.total_rewrite_time, zero());
        assert_eq!(stats.latency.p99, zero());
        assert!(stats.rule_stats.is_empty());

        // The collector stays usable after a reset.
        metrics.record_rule_match("rule1");
        assert_eq!(metrics.stats().rule_stats["rule1"].matches, 1);
    }

    #[test]
    fn test_rewrite_ratio() {
        let metrics = RewriteMetrics::new();

        // 3 rewritten, 7 not = 30% ratio
        for _ in 0..3 {
            metrics.record_rewrite(Duration::from_micros(10), true);
        }
        for _ in 0..7 {
            metrics.record_rewrite(Duration::from_micros(10), false);
        }

        let stats = metrics.stats();
        assert!((stats.rewrite_ratio - 0.3).abs() < 0.01);
    }
}
375}