Skip to main content

heliosdb_proxy/routing/
metrics.rs

1//! Routing Metrics
2//!
3//! Tracks routing decisions and performance metrics.
4
5use super::RouteTarget;
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9use tokio::sync::RwLock;
10
11/// Routing metrics tracker
12pub struct RoutingMetrics {
13    /// Total queries routed
14    total_routed: AtomicU64,
15    /// Queries routed with hints
16    with_hints: AtomicU64,
17    /// Queries routed without hints
18    without_hints: AtomicU64,
19    /// Invalid hints encountered
20    invalid_hints: AtomicU64,
21    /// Fallback routing used
22    fallback_count: AtomicU64,
23    /// No nodes available
24    no_nodes_count: AtomicU64,
25    /// Total routing time (microseconds)
26    total_routing_time_us: AtomicU64,
27    /// Per-target counts
28    target_counts: RwLock<HashMap<RouteTarget, u64>>,
29    /// Per-hint usage counts
30    hint_usage: RwLock<HashMap<String, u64>>,
31    /// Recent routing decisions (for debugging)
32    recent_decisions: RwLock<Vec<RoutingDecisionRecord>>,
33    /// Maximum recent decisions to keep
34    max_recent: usize,
35}
36
37impl RoutingMetrics {
38    /// Create new metrics tracker
39    pub fn new() -> Self {
40        Self {
41            total_routed: AtomicU64::new(0),
42            with_hints: AtomicU64::new(0),
43            without_hints: AtomicU64::new(0),
44            invalid_hints: AtomicU64::new(0),
45            fallback_count: AtomicU64::new(0),
46            no_nodes_count: AtomicU64::new(0),
47            total_routing_time_us: AtomicU64::new(0),
48            target_counts: RwLock::new(HashMap::new()),
49            hint_usage: RwLock::new(HashMap::new()),
50            recent_decisions: RwLock::new(Vec::new()),
51            max_recent: 100,
52        }
53    }
54
55    /// Record a routing decision
56    pub fn record_routing(
57        &self,
58        target: Option<RouteTarget>,
59        had_hints: bool,
60        elapsed: Duration,
61    ) {
62        self.total_routed.fetch_add(1, Ordering::SeqCst);
63
64        if had_hints {
65            self.with_hints.fetch_add(1, Ordering::SeqCst);
66        } else {
67            self.without_hints.fetch_add(1, Ordering::SeqCst);
68        }
69
70        self.total_routing_time_us
71            .fetch_add(elapsed.as_micros() as u64, Ordering::SeqCst);
72
73        // Track target usage (async - won't block)
74        if let Some(t) = target {
75            let _target = t;
76            tokio::spawn(async move {
77                // In real implementation, would update the actual counter
78                // This is simplified for the skeleton
79            });
80        }
81    }
82
83    /// Record invalid hints
84    pub fn record_invalid_hints(&self) {
85        self.invalid_hints.fetch_add(1, Ordering::SeqCst);
86    }
87
88    /// Record fallback routing
89    pub fn record_fallback(&self) {
90        self.fallback_count.fetch_add(1, Ordering::SeqCst);
91    }
92
93    /// Record no nodes available
94    pub fn record_no_nodes(&self) {
95        self.no_nodes_count.fetch_add(1, Ordering::SeqCst);
96    }
97
98    /// Record hint usage
99    pub async fn record_hint(&self, hint_name: &str) {
100        let mut usage = self.hint_usage.write().await;
101        *usage.entry(hint_name.to_string()).or_insert(0) += 1;
102    }
103
104    /// Record a decision for debugging
105    pub async fn record_decision(&self, record: RoutingDecisionRecord) {
106        let mut recent = self.recent_decisions.write().await;
107        recent.push(record);
108
109        // Keep only recent decisions
110        if recent.len() > self.max_recent {
111            recent.remove(0);
112        }
113    }
114
115    /// Get a snapshot of current stats
116    pub fn snapshot(&self) -> RoutingStats {
117        let total = self.total_routed.load(Ordering::SeqCst);
118        let total_time_us = self.total_routing_time_us.load(Ordering::SeqCst);
119
120        RoutingStats {
121            total_routed: total,
122            with_hints: self.with_hints.load(Ordering::SeqCst),
123            without_hints: self.without_hints.load(Ordering::SeqCst),
124            invalid_hints: self.invalid_hints.load(Ordering::SeqCst),
125            fallback_count: self.fallback_count.load(Ordering::SeqCst),
126            no_nodes_count: self.no_nodes_count.load(Ordering::SeqCst),
127            avg_routing_time_us: if total > 0 { total_time_us / total } else { 0 },
128        }
129    }
130
131    /// Get hint usage stats
132    pub async fn hint_usage(&self) -> HintUsageStats {
133        let usage = self.hint_usage.read().await;
134        HintUsageStats {
135            by_hint: usage.clone(),
136        }
137    }
138
139    /// Get recent decisions
140    pub async fn recent_decisions(&self, limit: usize) -> Vec<RoutingDecisionRecord> {
141        let recent = self.recent_decisions.read().await;
142        recent.iter().rev().take(limit).cloned().collect()
143    }
144
145    /// Get target distribution
146    pub async fn target_distribution(&self) -> HashMap<RouteTarget, u64> {
147        self.target_counts.read().await.clone()
148    }
149
150    /// Reset all metrics
151    pub async fn reset(&self) {
152        self.total_routed.store(0, Ordering::SeqCst);
153        self.with_hints.store(0, Ordering::SeqCst);
154        self.without_hints.store(0, Ordering::SeqCst);
155        self.invalid_hints.store(0, Ordering::SeqCst);
156        self.fallback_count.store(0, Ordering::SeqCst);
157        self.no_nodes_count.store(0, Ordering::SeqCst);
158        self.total_routing_time_us.store(0, Ordering::SeqCst);
159        self.target_counts.write().await.clear();
160        self.hint_usage.write().await.clear();
161        self.recent_decisions.write().await.clear();
162    }
163}
164
165impl Default for RoutingMetrics {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
/// Point-in-time copy of the scalar routing counters.
#[derive(Debug, Clone)]
pub struct RoutingStats {
    /// Number of routing decisions recorded.
    pub total_routed: u64,
    /// Decisions for queries that carried routing hints.
    pub with_hints: u64,
    /// Decisions for queries that carried no routing hints.
    pub without_hints: u64,
    /// Number of invalid hint reports.
    pub invalid_hints: u64,
    /// Number of times fallback routing was used.
    pub fallback_count: u64,
    /// Number of times no node was available.
    pub no_nodes_count: u64,
    /// Mean routing decision time, in microseconds.
    pub avg_routing_time_us: u64,
}

impl RoutingStats {
    /// Percentage of routed queries that carried hints; `0.0` when nothing
    /// has been routed.
    pub fn hints_percentage(&self) -> f64 {
        self.share_of_total(self.with_hints)
    }

    /// Percentage of routed queries that used fallback routing; `0.0` when
    /// nothing has been routed.
    pub fn fallback_percentage(&self) -> f64 {
        self.share_of_total(self.fallback_count)
    }

    /// Expresses `part` as a percentage of `total_routed`, guarding the
    /// zero-total case so the division never produces NaN.
    fn share_of_total(&self, part: u64) -> f64 {
        match self.total_routed {
            0 => 0.0,
            total => (part as f64 / total as f64) * 100.0,
        }
    }
}
209
/// Copy of the per-hint usage tally.
#[derive(Debug, Clone)]
pub struct HintUsageStats {
    /// Usage count keyed by hint name.
    pub by_hint: HashMap<String, u64>,
}

impl HintUsageStats {
    /// Returns up to `n` hints ordered by descending usage count.
    ///
    /// Hints with equal counts are ordered by name (ascending) so the result
    /// is deterministic; `HashMap` iteration order alone is not.
    pub fn top_hints(&self, n: usize) -> Vec<(String, u64)> {
        // Rank borrowed names first so only the `n` survivors are cloned.
        let mut ranked: Vec<(&String, u64)> = self
            .by_hint
            .iter()
            .map(|(name, count)| (name, *count))
            .collect();

        // Names are unique map keys, so this is a total order and an
        // unstable sort cannot introduce nondeterminism.
        ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));

        ranked
            .into_iter()
            .take(n)
            .map(|(name, count)| (name.clone(), count))
            .collect()
    }
}
228
229/// Record of a routing decision (for debugging/auditing)
230#[derive(Debug, Clone)]
231pub struct RoutingDecisionRecord {
232    /// Query hash (for privacy)
233    pub query_hash: u64,
234    /// Target node
235    pub target_node: Option<String>,
236    /// Route target hint
237    pub route_target: Option<RouteTarget>,
238    /// Hints used
239    pub hints: Vec<String>,
240    /// Decision reason
241    pub reason: String,
242    /// Timestamp
243    pub timestamp: Instant,
244    /// Routing time
245    pub elapsed_us: u64,
246}
247
248impl RoutingDecisionRecord {
249    /// Create a new record
250    pub fn new(
251        query: &str,
252        target_node: Option<String>,
253        route_target: Option<RouteTarget>,
254        hints: Vec<String>,
255        reason: String,
256        elapsed: Duration,
257    ) -> Self {
258        use std::collections::hash_map::DefaultHasher;
259        use std::hash::{Hash, Hasher};
260
261        let mut hasher = DefaultHasher::new();
262        query.hash(&mut hasher);
263
264        Self {
265            query_hash: hasher.finish(),
266            target_node,
267            route_target,
268            hints,
269            reason,
270            timestamp: Instant::now(),
271            elapsed_us: elapsed.as_micros() as u64,
272        }
273    }
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    /// The total and the hint / no-hint counters advance per recorded decision.
    #[tokio::test]
    async fn test_record_routing() {
        let tracker = RoutingMetrics::new();

        tracker.record_routing(Some(RouteTarget::Primary), true, Duration::from_micros(100));
        tracker.record_routing(Some(RouteTarget::Standby), false, Duration::from_micros(50));
        tracker.record_routing(Some(RouteTarget::Async), true, Duration::from_micros(75));

        let snap = tracker.snapshot();
        assert_eq!(snap.total_routed, 3);
        assert_eq!(snap.with_hints, 2);
        assert_eq!(snap.without_hints, 1);
    }

    /// Each error-style recorder bumps only its own counter.
    #[tokio::test]
    async fn test_record_errors() {
        let tracker = RoutingMetrics::new();

        tracker.record_invalid_hints();
        tracker.record_invalid_hints();
        tracker.record_fallback();
        tracker.record_no_nodes();

        let snap = tracker.snapshot();
        assert_eq!(snap.invalid_hints, 2);
        assert_eq!(snap.fallback_count, 1);
        assert_eq!(snap.no_nodes_count, 1);
    }

    /// Hint usage is tallied per hint name.
    #[tokio::test]
    async fn test_hint_usage() {
        let tracker = RoutingMetrics::new();

        tracker.record_hint("route").await;
        tracker.record_hint("route").await;
        tracker.record_hint("node").await;

        let tally = tracker.hint_usage().await;
        assert_eq!(tally.by_hint.get("route"), Some(&2));
        assert_eq!(tally.by_hint.get("node"), Some(&1));
    }

    /// `recent_decisions` honours the requested limit.
    #[tokio::test]
    async fn test_recent_decisions() {
        let tracker = RoutingMetrics::new();

        for i in 0..5 {
            let query = format!("SELECT {}", i);
            let record = RoutingDecisionRecord::new(
                &query,
                Some("node".to_string()),
                Some(RouteTarget::Standby),
                vec!["route".to_string()],
                "test".to_string(),
                Duration::from_micros(100),
            );
            tracker.record_decision(record).await;
        }

        let latest = tracker.recent_decisions(3).await;
        assert_eq!(latest.len(), 3);
    }

    /// `reset` clears both the atomic counters and the hint tally.
    #[tokio::test]
    async fn test_reset() {
        let tracker = RoutingMetrics::new();

        tracker.record_routing(Some(RouteTarget::Primary), true, Duration::from_micros(100));
        tracker.record_hint("route").await;

        tracker.reset().await;

        let snap = tracker.snapshot();
        assert_eq!(snap.total_routed, 0);

        let tally = tracker.hint_usage().await;
        assert!(tally.by_hint.is_empty());
    }

    /// Percentage helpers divide by the total and scale to 0-100.
    #[test]
    fn test_stats_percentages() {
        let snap = RoutingStats {
            total_routed: 100,
            with_hints: 30,
            without_hints: 70,
            invalid_hints: 2,
            fallback_count: 5,
            no_nodes_count: 1,
            avg_routing_time_us: 50,
        };

        let hints_delta = (snap.hints_percentage() - 30.0).abs();
        let fallback_delta = (snap.fallback_percentage() - 5.0).abs();
        assert!(hints_delta < f64::EPSILON);
        assert!(fallback_delta < f64::EPSILON);
    }

    /// `top_hints` returns the highest counts first and truncates to `n`.
    #[test]
    fn test_top_hints() {
        let by_hint: HashMap<String, u64> = vec![("route", 100), ("node", 50), ("lag", 25)]
            .into_iter()
            .map(|(name, count)| (name.to_string(), count))
            .collect();

        let tally = HintUsageStats { by_hint };
        let leaders = tally.top_hints(2);

        assert_eq!(leaders.len(), 2);
        assert_eq!(leaders[0].0, "route");
        assert_eq!(leaders[1].0, "node");
    }
}