1use super::RouteTarget;
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9use tokio::sync::RwLock;
10
11pub struct RoutingMetrics {
13 total_routed: AtomicU64,
15 with_hints: AtomicU64,
17 without_hints: AtomicU64,
19 invalid_hints: AtomicU64,
21 fallback_count: AtomicU64,
23 no_nodes_count: AtomicU64,
25 total_routing_time_us: AtomicU64,
27 target_counts: RwLock<HashMap<RouteTarget, u64>>,
29 hint_usage: RwLock<HashMap<String, u64>>,
31 recent_decisions: RwLock<Vec<RoutingDecisionRecord>>,
33 max_recent: usize,
35}
36
37impl RoutingMetrics {
38 pub fn new() -> Self {
40 Self {
41 total_routed: AtomicU64::new(0),
42 with_hints: AtomicU64::new(0),
43 without_hints: AtomicU64::new(0),
44 invalid_hints: AtomicU64::new(0),
45 fallback_count: AtomicU64::new(0),
46 no_nodes_count: AtomicU64::new(0),
47 total_routing_time_us: AtomicU64::new(0),
48 target_counts: RwLock::new(HashMap::new()),
49 hint_usage: RwLock::new(HashMap::new()),
50 recent_decisions: RwLock::new(Vec::new()),
51 max_recent: 100,
52 }
53 }
54
55 pub fn record_routing(
57 &self,
58 target: Option<RouteTarget>,
59 had_hints: bool,
60 elapsed: Duration,
61 ) {
62 self.total_routed.fetch_add(1, Ordering::SeqCst);
63
64 if had_hints {
65 self.with_hints.fetch_add(1, Ordering::SeqCst);
66 } else {
67 self.without_hints.fetch_add(1, Ordering::SeqCst);
68 }
69
70 self.total_routing_time_us
71 .fetch_add(elapsed.as_micros() as u64, Ordering::SeqCst);
72
73 if let Some(t) = target {
75 let _target = t;
76 tokio::spawn(async move {
77 });
80 }
81 }
82
83 pub fn record_invalid_hints(&self) {
85 self.invalid_hints.fetch_add(1, Ordering::SeqCst);
86 }
87
88 pub fn record_fallback(&self) {
90 self.fallback_count.fetch_add(1, Ordering::SeqCst);
91 }
92
93 pub fn record_no_nodes(&self) {
95 self.no_nodes_count.fetch_add(1, Ordering::SeqCst);
96 }
97
98 pub async fn record_hint(&self, hint_name: &str) {
100 let mut usage = self.hint_usage.write().await;
101 *usage.entry(hint_name.to_string()).or_insert(0) += 1;
102 }
103
104 pub async fn record_decision(&self, record: RoutingDecisionRecord) {
106 let mut recent = self.recent_decisions.write().await;
107 recent.push(record);
108
109 if recent.len() > self.max_recent {
111 recent.remove(0);
112 }
113 }
114
115 pub fn snapshot(&self) -> RoutingStats {
117 let total = self.total_routed.load(Ordering::SeqCst);
118 let total_time_us = self.total_routing_time_us.load(Ordering::SeqCst);
119
120 RoutingStats {
121 total_routed: total,
122 with_hints: self.with_hints.load(Ordering::SeqCst),
123 without_hints: self.without_hints.load(Ordering::SeqCst),
124 invalid_hints: self.invalid_hints.load(Ordering::SeqCst),
125 fallback_count: self.fallback_count.load(Ordering::SeqCst),
126 no_nodes_count: self.no_nodes_count.load(Ordering::SeqCst),
127 avg_routing_time_us: if total > 0 { total_time_us / total } else { 0 },
128 }
129 }
130
131 pub async fn hint_usage(&self) -> HintUsageStats {
133 let usage = self.hint_usage.read().await;
134 HintUsageStats {
135 by_hint: usage.clone(),
136 }
137 }
138
139 pub async fn recent_decisions(&self, limit: usize) -> Vec<RoutingDecisionRecord> {
141 let recent = self.recent_decisions.read().await;
142 recent.iter().rev().take(limit).cloned().collect()
143 }
144
145 pub async fn target_distribution(&self) -> HashMap<RouteTarget, u64> {
147 self.target_counts.read().await.clone()
148 }
149
150 pub async fn reset(&self) {
152 self.total_routed.store(0, Ordering::SeqCst);
153 self.with_hints.store(0, Ordering::SeqCst);
154 self.without_hints.store(0, Ordering::SeqCst);
155 self.invalid_hints.store(0, Ordering::SeqCst);
156 self.fallback_count.store(0, Ordering::SeqCst);
157 self.no_nodes_count.store(0, Ordering::SeqCst);
158 self.total_routing_time_us.store(0, Ordering::SeqCst);
159 self.target_counts.write().await.clear();
160 self.hint_usage.write().await.clear();
161 self.recent_decisions.write().await.clear();
162 }
163}
164
165impl Default for RoutingMetrics {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171#[derive(Debug, Clone)]
173pub struct RoutingStats {
174 pub total_routed: u64,
176 pub with_hints: u64,
178 pub without_hints: u64,
180 pub invalid_hints: u64,
182 pub fallback_count: u64,
184 pub no_nodes_count: u64,
186 pub avg_routing_time_us: u64,
188}
189
190impl RoutingStats {
191 pub fn hints_percentage(&self) -> f64 {
193 if self.total_routed == 0 {
194 0.0
195 } else {
196 (self.with_hints as f64 / self.total_routed as f64) * 100.0
197 }
198 }
199
200 pub fn fallback_percentage(&self) -> f64 {
202 if self.total_routed == 0 {
203 0.0
204 } else {
205 (self.fallback_count as f64 / self.total_routed as f64) * 100.0
206 }
207 }
208}
209
210#[derive(Debug, Clone)]
212pub struct HintUsageStats {
213 pub by_hint: HashMap<String, u64>,
215}
216
217impl HintUsageStats {
218 pub fn top_hints(&self, n: usize) -> Vec<(String, u64)> {
220 let mut hints: Vec<_> = self.by_hint.iter()
221 .map(|(k, v)| (k.clone(), *v))
222 .collect();
223 hints.sort_by(|a, b| b.1.cmp(&a.1));
224 hints.truncate(n);
225 hints
226 }
227}
228
229#[derive(Debug, Clone)]
231pub struct RoutingDecisionRecord {
232 pub query_hash: u64,
234 pub target_node: Option<String>,
236 pub route_target: Option<RouteTarget>,
238 pub hints: Vec<String>,
240 pub reason: String,
242 pub timestamp: Instant,
244 pub elapsed_us: u64,
246}
247
248impl RoutingDecisionRecord {
249 pub fn new(
251 query: &str,
252 target_node: Option<String>,
253 route_target: Option<RouteTarget>,
254 hints: Vec<String>,
255 reason: String,
256 elapsed: Duration,
257 ) -> Self {
258 use std::collections::hash_map::DefaultHasher;
259 use std::hash::{Hash, Hasher};
260
261 let mut hasher = DefaultHasher::new();
262 query.hash(&mut hasher);
263
264 Self {
265 query_hash: hasher.finish(),
266 target_node,
267 route_target,
268 hints,
269 reason,
270 timestamp: Instant::now(),
271 elapsed_us: elapsed.as_micros() as u64,
272 }
273 }
274}
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279
280 #[tokio::test]
281 async fn test_record_routing() {
282 let metrics = RoutingMetrics::new();
283
284 metrics.record_routing(Some(RouteTarget::Primary), true, Duration::from_micros(100));
285 metrics.record_routing(Some(RouteTarget::Standby), false, Duration::from_micros(50));
286 metrics.record_routing(Some(RouteTarget::Async), true, Duration::from_micros(75));
287
288 let stats = metrics.snapshot();
289 assert_eq!(stats.total_routed, 3);
290 assert_eq!(stats.with_hints, 2);
291 assert_eq!(stats.without_hints, 1);
292 }
293
294 #[tokio::test]
295 async fn test_record_errors() {
296 let metrics = RoutingMetrics::new();
297
298 metrics.record_invalid_hints();
299 metrics.record_invalid_hints();
300 metrics.record_fallback();
301 metrics.record_no_nodes();
302
303 let stats = metrics.snapshot();
304 assert_eq!(stats.invalid_hints, 2);
305 assert_eq!(stats.fallback_count, 1);
306 assert_eq!(stats.no_nodes_count, 1);
307 }
308
309 #[tokio::test]
310 async fn test_hint_usage() {
311 let metrics = RoutingMetrics::new();
312
313 metrics.record_hint("route").await;
314 metrics.record_hint("route").await;
315 metrics.record_hint("node").await;
316
317 let usage = metrics.hint_usage().await;
318 assert_eq!(usage.by_hint.get("route"), Some(&2));
319 assert_eq!(usage.by_hint.get("node"), Some(&1));
320 }
321
322 #[tokio::test]
323 async fn test_recent_decisions() {
324 let metrics = RoutingMetrics::new();
325
326 for i in 0..5 {
327 metrics.record_decision(RoutingDecisionRecord::new(
328 &format!("SELECT {}", i),
329 Some("node".to_string()),
330 Some(RouteTarget::Standby),
331 vec!["route".to_string()],
332 "test".to_string(),
333 Duration::from_micros(100),
334 )).await;
335 }
336
337 let recent = metrics.recent_decisions(3).await;
338 assert_eq!(recent.len(), 3);
339 }
340
341 #[tokio::test]
342 async fn test_reset() {
343 let metrics = RoutingMetrics::new();
344
345 metrics.record_routing(Some(RouteTarget::Primary), true, Duration::from_micros(100));
346 metrics.record_hint("route").await;
347
348 metrics.reset().await;
349
350 let stats = metrics.snapshot();
351 assert_eq!(stats.total_routed, 0);
352
353 let usage = metrics.hint_usage().await;
354 assert!(usage.by_hint.is_empty());
355 }
356
357 #[test]
358 fn test_stats_percentages() {
359 let stats = RoutingStats {
360 total_routed: 100,
361 with_hints: 30,
362 without_hints: 70,
363 invalid_hints: 2,
364 fallback_count: 5,
365 no_nodes_count: 1,
366 avg_routing_time_us: 50,
367 };
368
369 assert!((stats.hints_percentage() - 30.0).abs() < f64::EPSILON);
370 assert!((stats.fallback_percentage() - 5.0).abs() < f64::EPSILON);
371 }
372
373 #[test]
374 fn test_top_hints() {
375 let mut by_hint = HashMap::new();
376 by_hint.insert("route".to_string(), 100);
377 by_hint.insert("node".to_string(), 50);
378 by_hint.insert("lag".to_string(), 25);
379
380 let usage = HintUsageStats { by_hint };
381 let top = usage.top_hints(2);
382
383 assert_eq!(top.len(), 2);
384 assert_eq!(top[0].0, "route");
385 assert_eq!(top[1].0, "node");
386 }
387}