Skip to main content

heliosdb_proxy/routing/
metrics.rs

//! Routing Metrics
//!
//! Tracks routing decisions and performance metrics.
4
5use super::RouteTarget;
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9use tokio::sync::RwLock;
10
11/// Routing metrics tracker
12pub struct RoutingMetrics {
13    /// Total queries routed
14    total_routed: AtomicU64,
15    /// Queries routed with hints
16    with_hints: AtomicU64,
17    /// Queries routed without hints
18    without_hints: AtomicU64,
19    /// Invalid hints encountered
20    invalid_hints: AtomicU64,
21    /// Fallback routing used
22    fallback_count: AtomicU64,
23    /// No nodes available
24    no_nodes_count: AtomicU64,
25    /// Total routing time (microseconds)
26    total_routing_time_us: AtomicU64,
27    /// Per-target counts
28    target_counts: RwLock<HashMap<RouteTarget, u64>>,
29    /// Per-hint usage counts
30    hint_usage: RwLock<HashMap<String, u64>>,
31    /// Recent routing decisions (for debugging)
32    recent_decisions: RwLock<Vec<RoutingDecisionRecord>>,
33    /// Maximum recent decisions to keep
34    max_recent: usize,
35}
36
37impl RoutingMetrics {
38    /// Create new metrics tracker
39    pub fn new() -> Self {
40        Self {
41            total_routed: AtomicU64::new(0),
42            with_hints: AtomicU64::new(0),
43            without_hints: AtomicU64::new(0),
44            invalid_hints: AtomicU64::new(0),
45            fallback_count: AtomicU64::new(0),
46            no_nodes_count: AtomicU64::new(0),
47            total_routing_time_us: AtomicU64::new(0),
48            target_counts: RwLock::new(HashMap::new()),
49            hint_usage: RwLock::new(HashMap::new()),
50            recent_decisions: RwLock::new(Vec::new()),
51            max_recent: 100,
52        }
53    }
54
55    /// Record a routing decision
56    pub fn record_routing(&self, target: Option<RouteTarget>, had_hints: bool, elapsed: Duration) {
57        self.total_routed.fetch_add(1, Ordering::SeqCst);
58
59        if had_hints {
60            self.with_hints.fetch_add(1, Ordering::SeqCst);
61        } else {
62            self.without_hints.fetch_add(1, Ordering::SeqCst);
63        }
64
65        self.total_routing_time_us
66            .fetch_add(elapsed.as_micros() as u64, Ordering::SeqCst);
67
68        // Track target usage (async - won't block)
69        if let Some(t) = target {
70            let _target = t;
71            tokio::spawn(async move {
72                // In real implementation, would update the actual counter
73                // This is simplified for the skeleton
74            });
75        }
76    }
77
78    /// Record invalid hints
79    pub fn record_invalid_hints(&self) {
80        self.invalid_hints.fetch_add(1, Ordering::SeqCst);
81    }
82
83    /// Record fallback routing
84    pub fn record_fallback(&self) {
85        self.fallback_count.fetch_add(1, Ordering::SeqCst);
86    }
87
88    /// Record no nodes available
89    pub fn record_no_nodes(&self) {
90        self.no_nodes_count.fetch_add(1, Ordering::SeqCst);
91    }
92
93    /// Record hint usage
94    pub async fn record_hint(&self, hint_name: &str) {
95        let mut usage = self.hint_usage.write().await;
96        *usage.entry(hint_name.to_string()).or_insert(0) += 1;
97    }
98
99    /// Record a decision for debugging
100    pub async fn record_decision(&self, record: RoutingDecisionRecord) {
101        let mut recent = self.recent_decisions.write().await;
102        recent.push(record);
103
104        // Keep only recent decisions
105        if recent.len() > self.max_recent {
106            recent.remove(0);
107        }
108    }
109
110    /// Get a snapshot of current stats
111    pub fn snapshot(&self) -> RoutingStats {
112        let total = self.total_routed.load(Ordering::SeqCst);
113        let total_time_us = self.total_routing_time_us.load(Ordering::SeqCst);
114
115        RoutingStats {
116            total_routed: total,
117            with_hints: self.with_hints.load(Ordering::SeqCst),
118            without_hints: self.without_hints.load(Ordering::SeqCst),
119            invalid_hints: self.invalid_hints.load(Ordering::SeqCst),
120            fallback_count: self.fallback_count.load(Ordering::SeqCst),
121            no_nodes_count: self.no_nodes_count.load(Ordering::SeqCst),
122            avg_routing_time_us: total_time_us.checked_div(total).unwrap_or(0),
123        }
124    }
125
126    /// Get hint usage stats
127    pub async fn hint_usage(&self) -> HintUsageStats {
128        let usage = self.hint_usage.read().await;
129        HintUsageStats {
130            by_hint: usage.clone(),
131        }
132    }
133
134    /// Get recent decisions
135    pub async fn recent_decisions(&self, limit: usize) -> Vec<RoutingDecisionRecord> {
136        let recent = self.recent_decisions.read().await;
137        recent.iter().rev().take(limit).cloned().collect()
138    }
139
140    /// Get target distribution
141    pub async fn target_distribution(&self) -> HashMap<RouteTarget, u64> {
142        self.target_counts.read().await.clone()
143    }
144
145    /// Reset all metrics
146    pub async fn reset(&self) {
147        self.total_routed.store(0, Ordering::SeqCst);
148        self.with_hints.store(0, Ordering::SeqCst);
149        self.without_hints.store(0, Ordering::SeqCst);
150        self.invalid_hints.store(0, Ordering::SeqCst);
151        self.fallback_count.store(0, Ordering::SeqCst);
152        self.no_nodes_count.store(0, Ordering::SeqCst);
153        self.total_routing_time_us.store(0, Ordering::SeqCst);
154        self.target_counts.write().await.clear();
155        self.hint_usage.write().await.clear();
156        self.recent_decisions.write().await.clear();
157    }
158}
159
160impl Default for RoutingMetrics {
161    fn default() -> Self {
162        Self::new()
163    }
164}
165
/// Point-in-time copy of the routing counters.
///
/// All counts are plain numbers copied out of the live tracker, so the value
/// can be freely cloned and inspected without touching any locks.
#[derive(Clone, Debug)]
pub struct RoutingStats {
    /// Number of routing decisions recorded
    pub total_routed: u64,
    /// Decisions recorded with routing hints present
    pub with_hints: u64,
    /// Decisions recorded without routing hints
    pub without_hints: u64,
    /// Times an invalid hint combination was seen
    pub invalid_hints: u64,
    /// Times fallback routing was used
    pub fallback_count: u64,
    /// Times no node was available
    pub no_nodes_count: u64,
    /// Mean time per routing decision, in microseconds
    pub avg_routing_time_us: u64,
}

impl RoutingStats {
    /// `part` as a percentage of `total_routed`; `0.0` when nothing was routed.
    fn share_of_total(&self, part: u64) -> f64 {
        match self.total_routed {
            0 => 0.0,
            total => (part as f64 / total as f64) * 100.0,
        }
    }

    /// Percentage of routed queries that carried hints.
    pub fn hints_percentage(&self) -> f64 {
        self.share_of_total(self.with_hints)
    }

    /// Percentage of routed queries that needed fallback routing.
    pub fn fallback_percentage(&self) -> f64 {
        self.share_of_total(self.fallback_count)
    }
}
204
/// Hint usage statistics: a copy of the per-hint usage counters.
#[derive(Debug, Clone)]
pub struct HintUsageStats {
    /// Count by hint name
    pub by_hint: HashMap<String, u64>,
}

impl HintUsageStats {
    /// Return up to `n` hints ordered by descending usage count.
    ///
    /// Ties are broken by hint name (ascending) so the result is
    /// deterministic; `HashMap` iteration order is not.
    pub fn top_hints(&self, n: usize) -> Vec<(String, u64)> {
        let mut ranked: Vec<(&str, u64)> = self
            .by_hint
            .iter()
            .map(|(name, &count)| (name.as_str(), count))
            .collect();
        ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));
        // Allocate owned names only for the entries actually returned.
        ranked
            .into_iter()
            .take(n)
            .map(|(name, count)| (name.to_owned(), count))
            .collect()
    }
}
221
222/// Record of a routing decision (for debugging/auditing)
223#[derive(Debug, Clone)]
224pub struct RoutingDecisionRecord {
225    /// Query hash (for privacy)
226    pub query_hash: u64,
227    /// Target node
228    pub target_node: Option<String>,
229    /// Route target hint
230    pub route_target: Option<RouteTarget>,
231    /// Hints used
232    pub hints: Vec<String>,
233    /// Decision reason
234    pub reason: String,
235    /// Timestamp
236    pub timestamp: Instant,
237    /// Routing time
238    pub elapsed_us: u64,
239}
240
241impl RoutingDecisionRecord {
242    /// Create a new record
243    pub fn new(
244        query: &str,
245        target_node: Option<String>,
246        route_target: Option<RouteTarget>,
247        hints: Vec<String>,
248        reason: String,
249        elapsed: Duration,
250    ) -> Self {
251        use std::collections::hash_map::DefaultHasher;
252        use std::hash::{Hash, Hasher};
253
254        let mut hasher = DefaultHasher::new();
255        query.hash(&mut hasher);
256
257        Self {
258            query_hash: hasher.finish(),
259            target_node,
260            route_target,
261            hints,
262            reason,
263            timestamp: Instant::now(),
264            elapsed_us: elapsed.as_micros() as u64,
265        }
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a decision record for `query`; all other fields are fixed test values.
    fn decision(query: &str) -> RoutingDecisionRecord {
        RoutingDecisionRecord::new(
            query,
            Some("node".to_string()),
            Some(RouteTarget::Standby),
            vec!["route".to_string()],
            "test".to_string(),
            Duration::from_micros(100),
        )
    }

    #[tokio::test]
    async fn test_record_routing() {
        let metrics = RoutingMetrics::new();

        metrics.record_routing(Some(RouteTarget::Primary), true, Duration::from_micros(100));
        metrics.record_routing(Some(RouteTarget::Standby), false, Duration::from_micros(50));
        metrics.record_routing(Some(RouteTarget::Async), true, Duration::from_micros(75));

        let stats = metrics.snapshot();
        assert_eq!(stats.total_routed, 3);
        assert_eq!(stats.with_hints, 2);
        assert_eq!(stats.without_hints, 1);
        // (100 + 50 + 75) / 3
        assert_eq!(stats.avg_routing_time_us, 75);
    }

    #[test]
    fn test_snapshot_empty() {
        let stats = RoutingMetrics::new().snapshot();
        assert_eq!(stats.total_routed, 0);
        assert_eq!(stats.avg_routing_time_us, 0);
    }

    #[tokio::test]
    async fn test_record_errors() {
        let metrics = RoutingMetrics::new();

        metrics.record_invalid_hints();
        metrics.record_invalid_hints();
        metrics.record_fallback();
        metrics.record_no_nodes();

        let stats = metrics.snapshot();
        assert_eq!(stats.invalid_hints, 2);
        assert_eq!(stats.fallback_count, 1);
        assert_eq!(stats.no_nodes_count, 1);
    }

    #[tokio::test]
    async fn test_hint_usage() {
        let metrics = RoutingMetrics::new();

        metrics.record_hint("route").await;
        metrics.record_hint("route").await;
        metrics.record_hint("node").await;

        let usage = metrics.hint_usage().await;
        assert_eq!(usage.by_hint.get("route"), Some(&2));
        assert_eq!(usage.by_hint.get("node"), Some(&1));
    }

    #[tokio::test]
    async fn test_recent_decisions() {
        let metrics = RoutingMetrics::new();

        for i in 0..5 {
            metrics.record_decision(decision(&format!("SELECT {}", i))).await;
        }

        let recent = metrics.recent_decisions(3).await;
        assert_eq!(recent.len(), 3);
        // Newest first.
        assert_eq!(recent[0].query_hash, decision("SELECT 4").query_hash);
        assert_eq!(recent[2].query_hash, decision("SELECT 2").query_hash);
        assert!(metrics.recent_decisions(0).await.is_empty());
    }

    #[tokio::test]
    async fn test_recent_decisions_capped() {
        let metrics = RoutingMetrics::new();

        for i in 0..150 {
            metrics.record_decision(decision(&format!("SELECT {}", i))).await;
        }

        let all = metrics.recent_decisions(usize::MAX).await;
        assert_eq!(all.len(), 100);
        assert_eq!(all[0].query_hash, decision("SELECT 149").query_hash);
        assert_eq!(
            all.last().map(|r| r.query_hash),
            Some(decision("SELECT 50").query_hash)
        );
    }

    #[tokio::test]
    async fn test_reset() {
        let metrics = RoutingMetrics::new();

        metrics.record_routing(Some(RouteTarget::Primary), true, Duration::from_micros(100));
        metrics.record_hint("route").await;

        metrics.reset().await;

        let stats = metrics.snapshot();
        assert_eq!(stats.total_routed, 0);

        let usage = metrics.hint_usage().await;
        assert!(usage.by_hint.is_empty());
    }

    #[test]
    fn test_stats_percentages() {
        let stats = RoutingStats {
            total_routed: 100,
            with_hints: 30,
            without_hints: 70,
            invalid_hints: 2,
            fallback_count: 5,
            no_nodes_count: 1,
            avg_routing_time_us: 50,
        };

        assert!((stats.hints_percentage() - 30.0).abs() < f64::EPSILON);
        assert!((stats.fallback_percentage() - 5.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_top_hints() {
        let mut by_hint = HashMap::new();
        by_hint.insert("route".to_string(), 100);
        by_hint.insert("node".to_string(), 50);
        by_hint.insert("lag".to_string(), 25);

        let usage = HintUsageStats { by_hint };
        let top = usage.top_hints(2);

        assert_eq!(top.len(), 2);
        assert_eq!(top[0].0, "route");
        assert_eq!(top[1].0, "node");
    }
}