use super::RouteTarget;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, MutexGuard};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
10
11pub struct RoutingMetrics {
13 total_routed: AtomicU64,
15 with_hints: AtomicU64,
17 without_hints: AtomicU64,
19 invalid_hints: AtomicU64,
21 fallback_count: AtomicU64,
23 no_nodes_count: AtomicU64,
25 total_routing_time_us: AtomicU64,
27 target_counts: RwLock<HashMap<RouteTarget, u64>>,
29 hint_usage: RwLock<HashMap<String, u64>>,
31 recent_decisions: RwLock<Vec<RoutingDecisionRecord>>,
33 max_recent: usize,
35}
36
37impl RoutingMetrics {
38 pub fn new() -> Self {
40 Self {
41 total_routed: AtomicU64::new(0),
42 with_hints: AtomicU64::new(0),
43 without_hints: AtomicU64::new(0),
44 invalid_hints: AtomicU64::new(0),
45 fallback_count: AtomicU64::new(0),
46 no_nodes_count: AtomicU64::new(0),
47 total_routing_time_us: AtomicU64::new(0),
48 target_counts: RwLock::new(HashMap::new()),
49 hint_usage: RwLock::new(HashMap::new()),
50 recent_decisions: RwLock::new(Vec::new()),
51 max_recent: 100,
52 }
53 }
54
55 pub fn record_routing(&self, target: Option<RouteTarget>, had_hints: bool, elapsed: Duration) {
57 self.total_routed.fetch_add(1, Ordering::SeqCst);
58
59 if had_hints {
60 self.with_hints.fetch_add(1, Ordering::SeqCst);
61 } else {
62 self.without_hints.fetch_add(1, Ordering::SeqCst);
63 }
64
65 self.total_routing_time_us
66 .fetch_add(elapsed.as_micros() as u64, Ordering::SeqCst);
67
68 if let Some(t) = target {
70 let _target = t;
71 tokio::spawn(async move {
72 });
75 }
76 }
77
78 pub fn record_invalid_hints(&self) {
80 self.invalid_hints.fetch_add(1, Ordering::SeqCst);
81 }
82
83 pub fn record_fallback(&self) {
85 self.fallback_count.fetch_add(1, Ordering::SeqCst);
86 }
87
88 pub fn record_no_nodes(&self) {
90 self.no_nodes_count.fetch_add(1, Ordering::SeqCst);
91 }
92
93 pub async fn record_hint(&self, hint_name: &str) {
95 let mut usage = self.hint_usage.write().await;
96 *usage.entry(hint_name.to_string()).or_insert(0) += 1;
97 }
98
99 pub async fn record_decision(&self, record: RoutingDecisionRecord) {
101 let mut recent = self.recent_decisions.write().await;
102 recent.push(record);
103
104 if recent.len() > self.max_recent {
106 recent.remove(0);
107 }
108 }
109
110 pub fn snapshot(&self) -> RoutingStats {
112 let total = self.total_routed.load(Ordering::SeqCst);
113 let total_time_us = self.total_routing_time_us.load(Ordering::SeqCst);
114
115 RoutingStats {
116 total_routed: total,
117 with_hints: self.with_hints.load(Ordering::SeqCst),
118 without_hints: self.without_hints.load(Ordering::SeqCst),
119 invalid_hints: self.invalid_hints.load(Ordering::SeqCst),
120 fallback_count: self.fallback_count.load(Ordering::SeqCst),
121 no_nodes_count: self.no_nodes_count.load(Ordering::SeqCst),
122 avg_routing_time_us: total_time_us.checked_div(total).unwrap_or(0),
123 }
124 }
125
126 pub async fn hint_usage(&self) -> HintUsageStats {
128 let usage = self.hint_usage.read().await;
129 HintUsageStats {
130 by_hint: usage.clone(),
131 }
132 }
133
134 pub async fn recent_decisions(&self, limit: usize) -> Vec<RoutingDecisionRecord> {
136 let recent = self.recent_decisions.read().await;
137 recent.iter().rev().take(limit).cloned().collect()
138 }
139
140 pub async fn target_distribution(&self) -> HashMap<RouteTarget, u64> {
142 self.target_counts.read().await.clone()
143 }
144
145 pub async fn reset(&self) {
147 self.total_routed.store(0, Ordering::SeqCst);
148 self.with_hints.store(0, Ordering::SeqCst);
149 self.without_hints.store(0, Ordering::SeqCst);
150 self.invalid_hints.store(0, Ordering::SeqCst);
151 self.fallback_count.store(0, Ordering::SeqCst);
152 self.no_nodes_count.store(0, Ordering::SeqCst);
153 self.total_routing_time_us.store(0, Ordering::SeqCst);
154 self.target_counts.write().await.clear();
155 self.hint_usage.write().await.clear();
156 self.recent_decisions.write().await.clear();
157 }
158}
159
160impl Default for RoutingMetrics {
161 fn default() -> Self {
162 Self::new()
163 }
164}
165
/// Plain-value copy of the scalar routing counters.
#[derive(Debug, Clone)]
pub struct RoutingStats {
    /// Total number of routed queries.
    pub total_routed: u64,
    /// Routed queries that carried hints.
    pub with_hints: u64,
    /// Routed queries that carried no hints.
    pub without_hints: u64,
    /// Count of invalid-hint occurrences.
    pub invalid_hints: u64,
    /// Count of fallbacks.
    pub fallback_count: u64,
    /// Count of "no nodes" occurrences.
    pub no_nodes_count: u64,
    /// Average routing time in microseconds.
    pub avg_routing_time_us: u64,
}

impl RoutingStats {
    /// `with_hints` as a percentage of `total_routed`; `0.0` when
    /// `total_routed` is zero.
    pub fn hints_percentage(&self) -> f64 {
        self.percent_of_routed(self.with_hints)
    }

    /// `fallback_count` as a percentage of `total_routed`; `0.0` when
    /// `total_routed` is zero.
    pub fn fallback_percentage(&self) -> f64 {
        self.percent_of_routed(self.fallback_count)
    }

    /// Expresses `count` as a percentage of `total_routed`, guarding the
    /// division against a zero total.
    fn percent_of_routed(&self, count: u64) -> f64 {
        match self.total_routed {
            0 => 0.0,
            total => (count as f64 / total as f64) * 100.0,
        }
    }
}
204
/// Copy of the per-hint usage counters.
#[derive(Debug, Clone)]
pub struct HintUsageStats {
    /// Usage count keyed by hint name.
    pub by_hint: HashMap<String, u64>,
}

impl HintUsageStats {
    /// Returns up to `n` `(hint, count)` pairs, highest count first.
    ///
    /// Hints with equal counts are ordered by name, so both the order and the
    /// set of entries that survive truncation are deterministic instead of
    /// following `HashMap` iteration order.
    pub fn top_hints(&self, n: usize) -> Vec<(String, u64)> {
        let mut hints: Vec<(String, u64)> = self
            .by_hint
            .iter()
            .map(|(name, count)| (name.clone(), *count))
            .collect();
        // Keys are unique, so the comparator is a total order and an unstable
        // sort yields a single well-defined result.
        hints.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        hints.truncate(n);
        hints
    }
}
221
222#[derive(Debug, Clone)]
224pub struct RoutingDecisionRecord {
225 pub query_hash: u64,
227 pub target_node: Option<String>,
229 pub route_target: Option<RouteTarget>,
231 pub hints: Vec<String>,
233 pub reason: String,
235 pub timestamp: Instant,
237 pub elapsed_us: u64,
239}
240
241impl RoutingDecisionRecord {
242 pub fn new(
244 query: &str,
245 target_node: Option<String>,
246 route_target: Option<RouteTarget>,
247 hints: Vec<String>,
248 reason: String,
249 elapsed: Duration,
250 ) -> Self {
251 use std::collections::hash_map::DefaultHasher;
252 use std::hash::{Hash, Hasher};
253
254 let mut hasher = DefaultHasher::new();
255 query.hash(&mut hasher);
256
257 Self {
258 query_hash: hasher.finish(),
259 target_node,
260 route_target,
261 hints,
262 reason,
263 timestamp: Instant::now(),
264 elapsed_us: elapsed.as_micros() as u64,
265 }
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a decision record for `query` with fixed placeholder metadata.
    fn sample_decision(query: &str) -> RoutingDecisionRecord {
        RoutingDecisionRecord::new(
            query,
            Some("node".to_string()),
            Some(RouteTarget::Standby),
            vec!["route".to_string()],
            "test".to_string(),
            Duration::from_micros(100),
        )
    }

    #[tokio::test]
    async fn test_record_routing() {
        let metrics = RoutingMetrics::new();

        metrics.record_routing(Some(RouteTarget::Primary), true, Duration::from_micros(100));
        metrics.record_routing(Some(RouteTarget::Standby), false, Duration::from_micros(50));
        metrics.record_routing(Some(RouteTarget::Async), true, Duration::from_micros(75));

        let stats = metrics.snapshot();
        assert_eq!(stats.total_routed, 3);
        assert_eq!(stats.with_hints, 2);
        assert_eq!(stats.without_hints, 1);
        // (100 + 50 + 75) / 3
        assert_eq!(stats.avg_routing_time_us, 75);
    }

    #[test]
    fn test_snapshot_empty() {
        // With nothing routed, the average and both percentages must not divide by zero.
        let stats = RoutingMetrics::new().snapshot();

        assert_eq!(stats.total_routed, 0);
        assert_eq!(stats.avg_routing_time_us, 0);
        assert_eq!(stats.hints_percentage(), 0.0);
        assert_eq!(stats.fallback_percentage(), 0.0);
    }

    #[tokio::test]
    async fn test_record_errors() {
        let metrics = RoutingMetrics::new();

        metrics.record_invalid_hints();
        metrics.record_invalid_hints();
        metrics.record_fallback();
        metrics.record_no_nodes();

        let stats = metrics.snapshot();
        assert_eq!(stats.invalid_hints, 2);
        assert_eq!(stats.fallback_count, 1);
        assert_eq!(stats.no_nodes_count, 1);
    }

    #[tokio::test]
    async fn test_hint_usage() {
        let metrics = RoutingMetrics::new();

        metrics.record_hint("route").await;
        metrics.record_hint("route").await;
        metrics.record_hint("node").await;

        let usage = metrics.hint_usage().await;
        assert_eq!(usage.by_hint.get("route"), Some(&2));
        assert_eq!(usage.by_hint.get("node"), Some(&1));
    }

    #[tokio::test]
    async fn test_recent_decisions() {
        let metrics = RoutingMetrics::new();

        for i in 0..5 {
            metrics
                .record_decision(sample_decision(&format!("SELECT {}", i)))
                .await;
        }

        let recent = metrics.recent_decisions(3).await;
        assert_eq!(recent.len(), 3);

        // Newest first: the last three inserted, in reverse insertion order.
        assert_eq!(recent[0].query_hash, sample_decision("SELECT 4").query_hash);
        assert_eq!(recent[2].query_hash, sample_decision("SELECT 2").query_hash);
    }

    #[tokio::test]
    async fn test_recent_decisions_capped() {
        let metrics = RoutingMetrics::new();
        let cap = metrics.max_recent;

        for i in 0..cap + 5 {
            metrics
                .record_decision(sample_decision(&format!("SELECT {}", i)))
                .await;
        }

        let recent = metrics.recent_decisions(usize::MAX).await;
        assert_eq!(recent.len(), cap);

        // The five oldest entries were evicted; the newest is still first.
        let newest = sample_decision(&format!("SELECT {}", cap + 4));
        let oldest_kept = sample_decision("SELECT 5");
        assert_eq!(recent[0].query_hash, newest.query_hash);
        assert_eq!(recent[cap - 1].query_hash, oldest_kept.query_hash);
    }

    #[tokio::test]
    async fn test_reset() {
        let metrics = RoutingMetrics::new();

        metrics.record_routing(Some(RouteTarget::Primary), true, Duration::from_micros(100));
        metrics.record_hint("route").await;
        metrics.record_decision(sample_decision("SELECT 1")).await;

        metrics.reset().await;

        let stats = metrics.snapshot();
        assert_eq!(stats.total_routed, 0);
        assert_eq!(stats.with_hints, 0);
        assert_eq!(stats.avg_routing_time_us, 0);

        let usage = metrics.hint_usage().await;
        assert!(usage.by_hint.is_empty());

        assert!(metrics.recent_decisions(10).await.is_empty());
        assert!(metrics.target_distribution().await.is_empty());
    }

    #[test]
    fn test_stats_percentages() {
        let stats = RoutingStats {
            total_routed: 100,
            with_hints: 30,
            without_hints: 70,
            invalid_hints: 2,
            fallback_count: 5,
            no_nodes_count: 1,
            avg_routing_time_us: 50,
        };

        assert!((stats.hints_percentage() - 30.0).abs() < f64::EPSILON);
        assert!((stats.fallback_percentage() - 5.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_top_hints() {
        let mut by_hint = HashMap::new();
        by_hint.insert("route".to_string(), 100);
        by_hint.insert("node".to_string(), 50);
        by_hint.insert("lag".to_string(), 25);

        let usage = HintUsageStats { by_hint };
        let top = usage.top_hints(2);

        assert_eq!(top.len(), 2);
        assert_eq!(top[0].0, "route");
        assert_eq!(top[1].0, "node");
    }
}