Skip to main content

heliosdb_proxy/rewriter/
metrics.rs

//! Rewrite Metrics
//!
//! Metrics collection for query rewriting.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8use parking_lot::RwLock;
9
10/// Rewrite metrics collector
11pub struct RewriteMetrics {
12    /// Total queries processed
13    total_queries: AtomicU64,
14
15    /// Queries that were rewritten
16    rewritten_queries: AtomicU64,
17
18    /// Queries with no matching rules
19    no_match_queries: AtomicU64,
20
21    /// Total rewrite time (nanoseconds)
22    total_rewrite_time_ns: AtomicU64,
23
24    /// Per-rule statistics
25    rule_stats: RwLock<HashMap<String, RuleStats>>,
26
27    /// Histogram buckets for latency
28    latency_buckets: RwLock<LatencyHistogram>,
29}
30
31impl RewriteMetrics {
32    /// Create new metrics
33    pub fn new() -> Self {
34        Self {
35            total_queries: AtomicU64::new(0),
36            rewritten_queries: AtomicU64::new(0),
37            no_match_queries: AtomicU64::new(0),
38            total_rewrite_time_ns: AtomicU64::new(0),
39            rule_stats: RwLock::new(HashMap::new()),
40            latency_buckets: RwLock::new(LatencyHistogram::new()),
41        }
42    }
43
44    /// Record a rewrite operation
45    pub fn record_rewrite(&self, duration: Duration, was_rewritten: bool) {
46        self.total_queries.fetch_add(1, Ordering::Relaxed);
47
48        if was_rewritten {
49            self.rewritten_queries.fetch_add(1, Ordering::Relaxed);
50        }
51
52        let nanos = duration.as_nanos() as u64;
53        self.total_rewrite_time_ns.fetch_add(nanos, Ordering::Relaxed);
54
55        self.latency_buckets.write().record(duration);
56    }
57
58    /// Record a no-match query
59    pub fn record_no_match(&self, duration: Duration) {
60        self.total_queries.fetch_add(1, Ordering::Relaxed);
61        self.no_match_queries.fetch_add(1, Ordering::Relaxed);
62
63        let nanos = duration.as_nanos() as u64;
64        self.total_rewrite_time_ns.fetch_add(nanos, Ordering::Relaxed);
65
66        self.latency_buckets.write().record(duration);
67    }
68
69    /// Record a rule match
70    pub fn record_rule_match(&self, rule_id: &str) {
71        let mut stats = self.rule_stats.write();
72        let entry = stats.entry(rule_id.to_string()).or_insert_with(RuleStats::new);
73        entry.matches.fetch_add(1, Ordering::Relaxed);
74    }
75
76    /// Get statistics
77    pub fn stats(&self) -> RewriteStats {
78        let total = self.total_queries.load(Ordering::Relaxed);
79        let rewritten = self.rewritten_queries.load(Ordering::Relaxed);
80        let no_match = self.no_match_queries.load(Ordering::Relaxed);
81        let total_time_ns = self.total_rewrite_time_ns.load(Ordering::Relaxed);
82
83        let avg_time = if total > 0 {
84            Duration::from_nanos(total_time_ns / total)
85        } else {
86            Duration::ZERO
87        };
88
89        let rewrite_ratio = if total > 0 {
90            rewritten as f64 / total as f64
91        } else {
92            0.0
93        };
94
95        let rule_stats: HashMap<String, RuleStatsSnapshot> = self.rule_stats.read()
96            .iter()
97            .map(|(k, v)| (k.clone(), v.snapshot()))
98            .collect();
99
100        let latency = self.latency_buckets.read().percentiles();
101
102        RewriteStats {
103            total_queries: total,
104            rewritten_queries: rewritten,
105            no_match_queries: no_match,
106            rewrite_ratio,
107            avg_rewrite_time: avg_time,
108            total_rewrite_time: Duration::from_nanos(total_time_ns),
109            rule_stats,
110            latency,
111        }
112    }
113
114    /// Reset all metrics
115    pub fn reset(&self) {
116        self.total_queries.store(0, Ordering::Relaxed);
117        self.rewritten_queries.store(0, Ordering::Relaxed);
118        self.no_match_queries.store(0, Ordering::Relaxed);
119        self.total_rewrite_time_ns.store(0, Ordering::Relaxed);
120        self.rule_stats.write().clear();
121        *self.latency_buckets.write() = LatencyHistogram::new();
122    }
123}
124
125impl Default for RewriteMetrics {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
/// Live, atomically updated counters for a single rewrite rule.
#[derive(Debug)]
pub struct RuleStats {
    /// Number of matches
    pub matches: AtomicU64,

    /// Number of successful applications
    pub applied: AtomicU64,

    /// Number of failures
    pub failures: AtomicU64,

    /// Total time saved (estimated, nanoseconds)
    pub time_saved_ns: AtomicU64,
}
145
146impl RuleStats {
147    /// Create new stats
148    pub fn new() -> Self {
149        Self {
150            matches: AtomicU64::new(0),
151            applied: AtomicU64::new(0),
152            failures: AtomicU64::new(0),
153            time_saved_ns: AtomicU64::new(0),
154        }
155    }
156
157    /// Get a snapshot
158    pub fn snapshot(&self) -> RuleStatsSnapshot {
159        RuleStatsSnapshot {
160            matches: self.matches.load(Ordering::Relaxed),
161            applied: self.applied.load(Ordering::Relaxed),
162            failures: self.failures.load(Ordering::Relaxed),
163            time_saved: Duration::from_nanos(self.time_saved_ns.load(Ordering::Relaxed)),
164        }
165    }
166}
167
168impl Default for RuleStats {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
/// Plain-value copy of a rule's counters, as produced by `RuleStats::snapshot`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RuleStatsSnapshot {
    /// Number of matches
    pub matches: u64,

    /// Number of successful applications
    pub applied: u64,

    /// Number of failures
    pub failures: u64,

    /// Total time saved
    pub time_saved: Duration,
}
189
190/// Overall rewrite statistics
191#[derive(Debug, Clone)]
192pub struct RewriteStats {
193    /// Total queries processed
194    pub total_queries: u64,
195
196    /// Queries that were rewritten
197    pub rewritten_queries: u64,
198
199    /// Queries with no matching rules
200    pub no_match_queries: u64,
201
202    /// Ratio of rewritten queries
203    pub rewrite_ratio: f64,
204
205    /// Average rewrite time
206    pub avg_rewrite_time: Duration,
207
208    /// Total rewrite time
209    pub total_rewrite_time: Duration,
210
211    /// Per-rule statistics
212    pub rule_stats: HashMap<String, RuleStatsSnapshot>,
213
214    /// Latency percentiles
215    pub latency: LatencyPercentiles,
216}
217
/// Fixed-bucket latency histogram.
struct LatencyHistogram {
    /// Inclusive upper bound of each bucket, in microseconds.
    boundaries: Vec<u64>,

    /// Sample count per bucket. `LatencyHistogram::new` allocates one slot
    /// more than `boundaries`; the extra last slot collects samples above the
    /// largest boundary.
    counts: Vec<AtomicU64>,

    /// Number of samples recorded across all buckets.
    total: AtomicU64,
}
229
230impl LatencyHistogram {
231    fn new() -> Self {
232        // Buckets: 1μs, 5μs, 10μs, 25μs, 50μs, 100μs, 250μs, 500μs, 1ms, 5ms, 10ms
233        let boundaries = vec![1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000];
234        let counts: Vec<AtomicU64> = (0..=boundaries.len()).map(|_| AtomicU64::new(0)).collect();
235
236        Self {
237            boundaries,
238            counts,
239            total: AtomicU64::new(0),
240        }
241    }
242
243    fn record(&mut self, duration: Duration) {
244        let micros = duration.as_micros() as u64;
245        let mut bucket = self.boundaries.len();
246
247        for (i, &boundary) in self.boundaries.iter().enumerate() {
248            if micros <= boundary {
249                bucket = i;
250                break;
251            }
252        }
253
254        self.counts[bucket].fetch_add(1, Ordering::Relaxed);
255        self.total.fetch_add(1, Ordering::Relaxed);
256    }
257
258    fn percentiles(&self) -> LatencyPercentiles {
259        let total = self.total.load(Ordering::Relaxed) as f64;
260
261        if total == 0.0 {
262            return LatencyPercentiles::default();
263        }
264
265        let cumulative: Vec<u64> = self.counts.iter()
266            .scan(0u64, |acc, c| {
267                *acc += c.load(Ordering::Relaxed);
268                Some(*acc)
269            })
270            .collect();
271
272        let get_percentile = |p: f64| -> Duration {
273            let target = (total * p) as u64;
274            for (i, &count) in cumulative.iter().enumerate() {
275                if count >= target {
276                    if i < self.boundaries.len() {
277                        return Duration::from_micros(self.boundaries[i]);
278                    } else {
279                        return Duration::from_micros(self.boundaries.last().copied().unwrap_or(10000) * 2);
280                    }
281                }
282            }
283            Duration::from_micros(10000)
284        };
285
286        LatencyPercentiles {
287            p50: get_percentile(0.50),
288            p90: get_percentile(0.90),
289            p95: get_percentile(0.95),
290            p99: get_percentile(0.99),
291        }
292    }
293}
294
/// Latency percentiles estimated from the histogram buckets.
///
/// The derived `Default` is all-zero durations.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LatencyPercentiles {
    /// 50th percentile
    pub p50: Duration,

    /// 90th percentile
    pub p90: Duration,

    /// 95th percentile
    pub p95: Duration,

    /// 99th percentile
    pub p99: Duration,
}
310
#[cfg(test)]
mod tests {
    use super::*;

    /// Counters and time totals reflect every recorded query.
    #[test]
    fn test_metrics_basic() {
        let metrics = RewriteMetrics::new();

        metrics.record_rewrite(Duration::from_micros(100), true);
        metrics.record_rewrite(Duration::from_micros(50), false);
        metrics.record_no_match(Duration::from_micros(10));

        let stats = metrics.stats();
        assert_eq!(stats.total_queries, 3);
        assert_eq!(stats.rewritten_queries, 1);
        assert_eq!(stats.no_match_queries, 1);
        assert_eq!(stats.total_rewrite_time, Duration::from_micros(160));
        // Integer division of the nanosecond total by the query count.
        assert_eq!(stats.avg_rewrite_time, Duration::from_nanos(160_000 / 3));
    }

    /// Match counts are tracked independently per rule id.
    #[test]
    fn test_rule_stats() {
        let metrics = RewriteMetrics::new();

        metrics.record_rule_match("rule1");
        metrics.record_rule_match("rule1");
        metrics.record_rule_match("rule2");

        let stats = metrics.stats();
        assert_eq!(stats.rule_stats.len(), 2);
        assert_eq!(stats.rule_stats.get("rule1").unwrap().matches, 2);
        assert_eq!(stats.rule_stats.get("rule2").unwrap().matches, 1);
    }

    /// `reset` returns the collector to its freshly constructed state.
    #[test]
    fn test_reset() {
        let metrics = RewriteMetrics::new();

        metrics.record_rewrite(Duration::from_micros(100), true);
        metrics.record_rule_match("rule1");

        metrics.reset();

        let stats = metrics.stats();
        assert_eq!(stats.total_queries, 0);
        assert_eq!(stats.rewritten_queries, 0);
        assert_eq!(stats.total_rewrite_time, Duration::ZERO);
        assert!(stats.rule_stats.is_empty());
    }

    /// The ratio is rewritten queries over all recorded queries.
    #[test]
    fn test_rewrite_ratio() {
        let metrics = RewriteMetrics::new();

        // 3 rewritten, 7 not = 30% ratio
        for _ in 0..3 {
            metrics.record_rewrite(Duration::from_micros(10), true);
        }
        for _ in 0..7 {
            metrics.record_rewrite(Duration::from_micros(10), false);
        }

        let stats = metrics.stats();
        assert!((stats.rewrite_ratio - 0.3).abs() < 0.01);
    }

    /// With nothing recorded, derived values fall back to zero.
    #[test]
    fn test_empty_stats() {
        let stats = RewriteMetrics::default().stats();
        assert_eq!(stats.total_queries, 0);
        assert_eq!(stats.rewrite_ratio, 0.0);
        assert_eq!(stats.avg_rewrite_time, Duration::ZERO);
    }
}