use super::RouteTarget;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
pub struct RoutingMetrics {
total_routed: AtomicU64,
with_hints: AtomicU64,
without_hints: AtomicU64,
invalid_hints: AtomicU64,
fallback_count: AtomicU64,
no_nodes_count: AtomicU64,
total_routing_time_us: AtomicU64,
target_counts: RwLock<HashMap<RouteTarget, u64>>,
hint_usage: RwLock<HashMap<String, u64>>,
recent_decisions: RwLock<Vec<RoutingDecisionRecord>>,
max_recent: usize,
}
impl RoutingMetrics {
pub fn new() -> Self {
Self {
total_routed: AtomicU64::new(0),
with_hints: AtomicU64::new(0),
without_hints: AtomicU64::new(0),
invalid_hints: AtomicU64::new(0),
fallback_count: AtomicU64::new(0),
no_nodes_count: AtomicU64::new(0),
total_routing_time_us: AtomicU64::new(0),
target_counts: RwLock::new(HashMap::new()),
hint_usage: RwLock::new(HashMap::new()),
recent_decisions: RwLock::new(Vec::new()),
max_recent: 100,
}
}
pub fn record_routing(
&self,
target: Option<RouteTarget>,
had_hints: bool,
elapsed: Duration,
) {
self.total_routed.fetch_add(1, Ordering::SeqCst);
if had_hints {
self.with_hints.fetch_add(1, Ordering::SeqCst);
} else {
self.without_hints.fetch_add(1, Ordering::SeqCst);
}
self.total_routing_time_us
.fetch_add(elapsed.as_micros() as u64, Ordering::SeqCst);
if let Some(t) = target {
let _target = t;
tokio::spawn(async move {
});
}
}
pub fn record_invalid_hints(&self) {
self.invalid_hints.fetch_add(1, Ordering::SeqCst);
}
pub fn record_fallback(&self) {
self.fallback_count.fetch_add(1, Ordering::SeqCst);
}
pub fn record_no_nodes(&self) {
self.no_nodes_count.fetch_add(1, Ordering::SeqCst);
}
pub async fn record_hint(&self, hint_name: &str) {
let mut usage = self.hint_usage.write().await;
*usage.entry(hint_name.to_string()).or_insert(0) += 1;
}
pub async fn record_decision(&self, record: RoutingDecisionRecord) {
let mut recent = self.recent_decisions.write().await;
recent.push(record);
if recent.len() > self.max_recent {
recent.remove(0);
}
}
pub fn snapshot(&self) -> RoutingStats {
let total = self.total_routed.load(Ordering::SeqCst);
let total_time_us = self.total_routing_time_us.load(Ordering::SeqCst);
RoutingStats {
total_routed: total,
with_hints: self.with_hints.load(Ordering::SeqCst),
without_hints: self.without_hints.load(Ordering::SeqCst),
invalid_hints: self.invalid_hints.load(Ordering::SeqCst),
fallback_count: self.fallback_count.load(Ordering::SeqCst),
no_nodes_count: self.no_nodes_count.load(Ordering::SeqCst),
avg_routing_time_us: if total > 0 { total_time_us / total } else { 0 },
}
}
pub async fn hint_usage(&self) -> HintUsageStats {
let usage = self.hint_usage.read().await;
HintUsageStats {
by_hint: usage.clone(),
}
}
pub async fn recent_decisions(&self, limit: usize) -> Vec<RoutingDecisionRecord> {
let recent = self.recent_decisions.read().await;
recent.iter().rev().take(limit).cloned().collect()
}
pub async fn target_distribution(&self) -> HashMap<RouteTarget, u64> {
self.target_counts.read().await.clone()
}
pub async fn reset(&self) {
self.total_routed.store(0, Ordering::SeqCst);
self.with_hints.store(0, Ordering::SeqCst);
self.without_hints.store(0, Ordering::SeqCst);
self.invalid_hints.store(0, Ordering::SeqCst);
self.fallback_count.store(0, Ordering::SeqCst);
self.no_nodes_count.store(0, Ordering::SeqCst);
self.total_routing_time_us.store(0, Ordering::SeqCst);
self.target_counts.write().await.clear();
self.hint_usage.write().await.clear();
self.recent_decisions.write().await.clear();
}
}
impl Default for RoutingMetrics {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct RoutingStats {
pub total_routed: u64,
pub with_hints: u64,
pub without_hints: u64,
pub invalid_hints: u64,
pub fallback_count: u64,
pub no_nodes_count: u64,
pub avg_routing_time_us: u64,
}
impl RoutingStats {
pub fn hints_percentage(&self) -> f64 {
if self.total_routed == 0 {
0.0
} else {
(self.with_hints as f64 / self.total_routed as f64) * 100.0
}
}
pub fn fallback_percentage(&self) -> f64 {
if self.total_routed == 0 {
0.0
} else {
(self.fallback_count as f64 / self.total_routed as f64) * 100.0
}
}
}
#[derive(Debug, Clone)]
pub struct HintUsageStats {
pub by_hint: HashMap<String, u64>,
}
impl HintUsageStats {
pub fn top_hints(&self, n: usize) -> Vec<(String, u64)> {
let mut hints: Vec<_> = self.by_hint.iter()
.map(|(k, v)| (k.clone(), *v))
.collect();
hints.sort_by(|a, b| b.1.cmp(&a.1));
hints.truncate(n);
hints
}
}
#[derive(Debug, Clone)]
pub struct RoutingDecisionRecord {
pub query_hash: u64,
pub target_node: Option<String>,
pub route_target: Option<RouteTarget>,
pub hints: Vec<String>,
pub reason: String,
pub timestamp: Instant,
pub elapsed_us: u64,
}
impl RoutingDecisionRecord {
pub fn new(
query: &str,
target_node: Option<String>,
route_target: Option<RouteTarget>,
hints: Vec<String>,
reason: String,
elapsed: Duration,
) -> Self {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
query.hash(&mut hasher);
Self {
query_hash: hasher.finish(),
target_node,
route_target,
hints,
reason,
timestamp: Instant::now(),
elapsed_us: elapsed.as_micros() as u64,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_record_routing() {
let metrics = RoutingMetrics::new();
metrics.record_routing(Some(RouteTarget::Primary), true, Duration::from_micros(100));
metrics.record_routing(Some(RouteTarget::Standby), false, Duration::from_micros(50));
metrics.record_routing(Some(RouteTarget::Async), true, Duration::from_micros(75));
let stats = metrics.snapshot();
assert_eq!(stats.total_routed, 3);
assert_eq!(stats.with_hints, 2);
assert_eq!(stats.without_hints, 1);
}
#[tokio::test]
async fn test_record_errors() {
let metrics = RoutingMetrics::new();
metrics.record_invalid_hints();
metrics.record_invalid_hints();
metrics.record_fallback();
metrics.record_no_nodes();
let stats = metrics.snapshot();
assert_eq!(stats.invalid_hints, 2);
assert_eq!(stats.fallback_count, 1);
assert_eq!(stats.no_nodes_count, 1);
}
#[tokio::test]
async fn test_hint_usage() {
let metrics = RoutingMetrics::new();
metrics.record_hint("route").await;
metrics.record_hint("route").await;
metrics.record_hint("node").await;
let usage = metrics.hint_usage().await;
assert_eq!(usage.by_hint.get("route"), Some(&2));
assert_eq!(usage.by_hint.get("node"), Some(&1));
}
#[tokio::test]
async fn test_recent_decisions() {
let metrics = RoutingMetrics::new();
for i in 0..5 {
metrics.record_decision(RoutingDecisionRecord::new(
&format!("SELECT {}", i),
Some("node".to_string()),
Some(RouteTarget::Standby),
vec!["route".to_string()],
"test".to_string(),
Duration::from_micros(100),
)).await;
}
let recent = metrics.recent_decisions(3).await;
assert_eq!(recent.len(), 3);
}
#[tokio::test]
async fn test_reset() {
let metrics = RoutingMetrics::new();
metrics.record_routing(Some(RouteTarget::Primary), true, Duration::from_micros(100));
metrics.record_hint("route").await;
metrics.reset().await;
let stats = metrics.snapshot();
assert_eq!(stats.total_routed, 0);
let usage = metrics.hint_usage().await;
assert!(usage.by_hint.is_empty());
}
#[test]
fn test_stats_percentages() {
let stats = RoutingStats {
total_routed: 100,
with_hints: 30,
without_hints: 70,
invalid_hints: 2,
fallback_count: 5,
no_nodes_count: 1,
avg_routing_time_us: 50,
};
assert!((stats.hints_percentage() - 30.0).abs() < f64::EPSILON);
assert!((stats.fallback_percentage() - 5.0).abs() < f64::EPSILON);
}
#[test]
fn test_top_hints() {
let mut by_hint = HashMap::new();
by_hint.insert("route".to_string(), 100);
by_hint.insert("node".to_string(), 50);
by_hint.insert("lag".to_string(), 25);
let usage = HintUsageStats { by_hint };
let top = usage.top_hints(2);
assert_eq!(top.len(), 2);
assert_eq!(top[0].0, "route");
assert_eq!(top[1].0, "node");
}
}