1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11
12use super::{
13 AIWorkloadType, AccessPattern, DataTemperature, RAGStage, RoutingDecision, WorkloadType,
14};
15
/// Collected metrics for schema-aware query routing.
///
/// Map-shaped and composite statistics are stored behind async `RwLock`s,
/// while the global routing counters live in the lock-free [`RoutingStats`].
pub struct SchemaRoutingMetrics {
    /// Per-table statistics, keyed by table name.
    table_stats: Arc<RwLock<HashMap<String, TableStats>>>,
    /// Per-workload statistics, keyed by workload type.
    workload_stats: Arc<RwLock<HashMap<WorkloadType, WorkloadStats>>>,
    /// Per-temperature statistics, keyed by data temperature.
    /// NOTE(review): no writer for this map is visible in this file — confirm
    /// whether it is populated elsewhere.
    temperature_stats: Arc<RwLock<HashMap<DataTemperature, TemperatureStats>>>,
    /// Aggregate statistics for AI workloads.
    ai_stats: Arc<RwLock<AIWorkloadStats>>,
    /// Aggregate statistics for RAG stages.
    rag_stats: Arc<RwLock<RAGStats>>,
    /// Global routing counters (atomics, no lock required).
    routing_stats: Arc<RoutingStats>,
    /// Per-node statistics, keyed by node id.
    node_stats: Arc<RwLock<HashMap<String, NodeStats>>>,
    /// Per-shard statistics, keyed by shard id.
    shard_stats: Arc<RwLock<HashMap<u32, ShardStats>>>,
    /// Instant at which this collector was created; basis for `uptime`.
    start_time: Instant,
}
37
38#[derive(Debug, Clone, Default)]
40pub struct TableStats {
41 pub table_name: String,
43 pub total_queries: u64,
45 pub by_access_pattern: HashMap<AccessPattern, u64>,
47 pub by_workload: HashMap<WorkloadType, u64>,
49 pub avg_latency_us: u64,
51 pub p99_latency_us: u64,
53 pub shard_hit_rate: f64,
55 pub cache_hit_rate: f64,
57 pub last_query_time: Option<Instant>,
59}
60
61impl TableStats {
62 pub fn new(table_name: &str) -> Self {
64 Self {
65 table_name: table_name.to_string(),
66 ..Default::default()
67 }
68 }
69
70 pub fn record_query(
72 &mut self,
73 pattern: AccessPattern,
74 workload: WorkloadType,
75 latency_us: u64,
76 ) {
77 self.total_queries += 1;
78 *self.by_access_pattern.entry(pattern).or_insert(0) += 1;
79 *self.by_workload.entry(workload).or_insert(0) += 1;
80
81 if self.avg_latency_us == 0 {
83 self.avg_latency_us = latency_us;
84 } else {
85 self.avg_latency_us = (self.avg_latency_us * 9 + latency_us) / 10;
86 }
87
88 self.last_query_time = Some(Instant::now());
89 }
90}
91
/// Routing statistics collected for one workload type.
#[derive(Debug, Clone, Default)]
pub struct WorkloadStats {
    /// Number of queries recorded for this workload.
    pub total_queries: u64,
    /// Queries routed to the primary.
    pub routed_to_primary: u64,
    /// Queries routed to a replica.
    pub routed_to_replica: u64,
    /// Queries executed as scatter-gather.
    pub scatter_gather: u64,
    /// Exponentially weighted average latency in microseconds.
    pub avg_latency_us: u64,
    /// Tables associated with this workload (not modified by `record`).
    pub tables: Vec<String>,
}

impl WorkloadStats {
    /// Records one query for this workload.
    ///
    /// Exactly one routing bucket is incremented: scatter-gather wins over
    /// the primary/replica distinction. `latency_us` is folded into the
    /// moving average.
    pub fn record(&mut self, to_primary: bool, is_scatter: bool, latency_us: u64) {
        self.total_queries += 1;

        let bucket = match (is_scatter, to_primary) {
            (true, _) => &mut self.scatter_gather,
            (false, true) => &mut self.routed_to_primary,
            (false, false) => &mut self.routed_to_replica,
        };
        *bucket += 1;

        // Zero is treated as "no sample yet"; otherwise blend 9:1.
        self.avg_latency_us = match self.avg_latency_us {
            0 => latency_us,
            previous => (previous * 9 + latency_us) / 10,
        };
    }
}
130
/// Statistics aggregated per data temperature.
#[derive(Debug, Clone, Default)]
pub struct TemperatureStats {
    /// Number of queries recorded for this temperature.
    pub total_queries: u64,
    /// Number of tables at this temperature.
    pub table_count: u64,
    /// Total size in bytes of the data at this temperature.
    pub total_size_bytes: u64,
    /// Cache hit rate.
    /// NOTE(review): scale (0..1 vs. percent) is not established in this file — confirm.
    pub cache_hit_rate: f64,
    /// Average latency in microseconds.
    pub avg_latency_us: u64,
}
145
146#[derive(Debug, Clone, Default)]
148pub struct AIWorkloadStats {
149 pub total_queries: u64,
151 pub by_type: HashMap<String, u64>,
153 pub embedding_retrieval: u64,
155 pub context_lookup: u64,
157 pub knowledge_base: u64,
159 pub tool_execution: u64,
161 pub avg_vector_dimensions: u64,
163 pub avg_top_k: u64,
165}
166
167impl AIWorkloadStats {
168 pub fn record(
170 &mut self,
171 workload_type: AIWorkloadType,
172 vector_dims: Option<u64>,
173 top_k: Option<u64>,
174 ) {
175 self.total_queries += 1;
176
177 match workload_type {
178 AIWorkloadType::EmbeddingRetrieval => {
179 self.embedding_retrieval += 1;
180 *self
181 .by_type
182 .entry("embedding_retrieval".to_string())
183 .or_insert(0) += 1;
184 }
185 AIWorkloadType::ContextLookup => {
186 self.context_lookup += 1;
187 *self
188 .by_type
189 .entry("context_lookup".to_string())
190 .or_insert(0) += 1;
191 }
192 AIWorkloadType::KnowledgeBase => {
193 self.knowledge_base += 1;
194 *self
195 .by_type
196 .entry("knowledge_base".to_string())
197 .or_insert(0) += 1;
198 }
199 AIWorkloadType::ToolExecution => {
200 self.tool_execution += 1;
201 *self
202 .by_type
203 .entry("tool_execution".to_string())
204 .or_insert(0) += 1;
205 }
206 }
207
208 if let Some(dims) = vector_dims {
209 self.avg_vector_dimensions = (self.avg_vector_dimensions * 9 + dims) / 10;
210 }
211
212 if let Some(k) = top_k {
213 self.avg_top_k = (self.avg_top_k * 9 + k) / 10;
214 }
215 }
216}
217
218#[derive(Debug, Clone, Default)]
220pub struct RAGStats {
221 pub total_queries: u64,
223 pub retrieval_count: u64,
225 pub fetch_count: u64,
227 pub rerank_count: u64,
229 pub generate_count: u64,
231 pub avg_retrieval_latency_us: u64,
233 pub avg_fetch_latency_us: u64,
235 pub avg_pipeline_latency_us: u64,
237}
238
239impl RAGStats {
240 pub fn record_stage(&mut self, stage: RAGStage, latency_us: u64) {
242 self.total_queries += 1;
243
244 match stage {
245 RAGStage::Retrieval => {
246 self.retrieval_count += 1;
247 if self.avg_retrieval_latency_us == 0 {
248 self.avg_retrieval_latency_us = latency_us;
249 } else {
250 self.avg_retrieval_latency_us =
251 (self.avg_retrieval_latency_us * 9 + latency_us) / 10;
252 }
253 }
254 RAGStage::Fetch => {
255 self.fetch_count += 1;
256 if self.avg_fetch_latency_us == 0 {
257 self.avg_fetch_latency_us = latency_us;
258 } else {
259 self.avg_fetch_latency_us = (self.avg_fetch_latency_us * 9 + latency_us) / 10;
260 }
261 }
262 RAGStage::Rerank => {
263 self.rerank_count += 1;
264 }
265 RAGStage::Generate => {
266 self.generate_count += 1;
267 }
268 }
269 }
270}
271
/// Lock-free global routing counters.
///
/// All counters start at zero (`Default`) and are read and written with
/// relaxed atomic ordering throughout this file.
#[derive(Debug, Default)]
pub struct RoutingStats {
    /// Total routing decisions recorded.
    pub total_queries: AtomicU64,
    /// Decisions made using schema information.
    pub schema_aware_routes: AtomicU64,
    /// Decisions that fell back to default routing.
    pub fallback_routes: AtomicU64,
    /// Decisions targeting a specific shard.
    pub shard_targeted: AtomicU64,
    /// Decisions executed as scatter-gather.
    pub scatter_gather: AtomicU64,
    /// Decisions routed to the primary.
    pub primary_routes: AtomicU64,
    /// Decisions counted as replica routes.
    pub replica_routes: AtomicU64,
    /// AI workload routes.
    pub ai_routes: AtomicU64,
    /// RAG stage routes.
    pub rag_routes: AtomicU64,
    /// Vector routes.
    pub vector_routes: AtomicU64,
    /// Classification lookups that hit.
    pub classification_hits: AtomicU64,
    /// Classification lookups that missed.
    pub classification_misses: AtomicU64,
    /// Routing errors.
    pub routing_errors: AtomicU64,
}

impl RoutingStats {
    /// Returns `part / whole` as a float, or `0.0` when `whole` is zero.
    fn fraction(part: u64, whole: u64) -> f64 {
        if whole == 0 {
            0.0
        } else {
            part as f64 / whole as f64
        }
    }

    /// Returns the total number of recorded queries.
    pub fn total_queries(&self) -> u64 {
        self.total_queries.load(Ordering::Relaxed)
    }

    /// Returns the percentage (0–100) of queries routed schema-aware,
    /// or `0.0` when nothing has been recorded.
    pub fn schema_aware_percentage(&self) -> f64 {
        let total = self.total_queries.load(Ordering::Relaxed);
        let schema_aware = self.schema_aware_routes.load(Ordering::Relaxed);
        Self::fraction(schema_aware, total) * 100.0
    }

    /// Returns the fraction (0–1) of classification lookups that hit,
    /// or `0.0` when there were no lookups.
    pub fn classification_hit_rate(&self) -> f64 {
        let hits = self.classification_hits.load(Ordering::Relaxed);
        let misses = self.classification_misses.load(Ordering::Relaxed);
        Self::fraction(hits, hits + misses)
    }

    /// Returns the primary share of primary-plus-replica routes (0–1),
    /// or `0.0` when neither kind has been recorded.
    pub fn primary_replica_ratio(&self) -> f64 {
        let primary = self.primary_routes.load(Ordering::Relaxed);
        let replica = self.replica_routes.load(Ordering::Relaxed);
        Self::fraction(primary, primary + replica)
    }
}
360
/// Statistics collected for a single node.
#[derive(Debug, Clone, Default)]
pub struct NodeStats {
    /// Identifier of the node.
    pub node_id: String,
    /// Number of queries routed to the node.
    pub total_queries: u64,
    /// Moving-average latency in microseconds.
    pub avg_latency_us: u64,
    /// Number of errors attributed to the node.
    pub error_count: u64,
    /// Load factor of the node.
    /// NOTE(review): scale and source are not established in this file — confirm.
    pub load_factor: f64,
    /// Time at which the most recent query was recorded, if any.
    pub last_query_time: Option<Instant>,
}
377
/// Statistics collected for a single shard.
#[derive(Debug, Clone, Default)]
pub struct ShardStats {
    /// Identifier of the shard.
    pub shard_id: u32,
    /// Number of queries that touched the shard.
    pub total_queries: u64,
    /// Tables located on the shard.
    pub tables: Vec<String>,
    /// Estimated number of rows on the shard.
    pub estimated_rows: u64,
    /// Size of the shard in bytes.
    pub size_bytes: u64,
}
392
393impl SchemaRoutingMetrics {
394 pub fn new() -> Self {
396 Self {
397 table_stats: Arc::new(RwLock::new(HashMap::new())),
398 workload_stats: Arc::new(RwLock::new(HashMap::new())),
399 temperature_stats: Arc::new(RwLock::new(HashMap::new())),
400 ai_stats: Arc::new(RwLock::new(AIWorkloadStats::default())),
401 rag_stats: Arc::new(RwLock::new(RAGStats::default())),
402 routing_stats: Arc::new(RoutingStats::default()),
403 node_stats: Arc::new(RwLock::new(HashMap::new())),
404 shard_stats: Arc::new(RwLock::new(HashMap::new())),
405 start_time: Instant::now(),
406 }
407 }
408
409 pub async fn record_routing(&self, decision: &RoutingDecision, latency_us: u64) {
411 self.routing_stats
412 .total_queries
413 .fetch_add(1, Ordering::Relaxed);
414
415 match &decision.target {
416 super::RouteTarget::Primary => {
417 self.routing_stats
418 .primary_routes
419 .fetch_add(1, Ordering::Relaxed);
420 }
421 super::RouteTarget::Node(_) => {
422 self.routing_stats
423 .replica_routes
424 .fetch_add(1, Ordering::Relaxed);
425 }
426 super::RouteTarget::Shard(_) => {
427 self.routing_stats
428 .shard_targeted
429 .fetch_add(1, Ordering::Relaxed);
430 }
431 super::RouteTarget::ScatterGather => {
432 self.routing_stats
433 .scatter_gather
434 .fetch_add(1, Ordering::Relaxed);
435 }
436 }
437
438 if let super::RouteTarget::Node(node_id) = &decision.target {
440 let mut node_stats = self.node_stats.write().await;
441 let stats = node_stats
442 .entry(node_id.clone())
443 .or_insert_with(|| NodeStats {
444 node_id: node_id.clone(),
445 ..Default::default()
446 });
447 stats.total_queries += 1;
448 stats.avg_latency_us = (stats.avg_latency_us * 9 + latency_us) / 10;
449 stats.last_query_time = Some(Instant::now());
450 }
451
452 if !decision.shards.is_empty() {
454 let mut shard_stats = self.shard_stats.write().await;
455 for shard_id in &decision.shards {
456 let stats = shard_stats.entry(*shard_id).or_default();
457 stats.shard_id = *shard_id;
458 stats.total_queries += 1;
459 }
460 }
461 }
462
463 pub async fn record_table_query(
465 &self,
466 table: &str,
467 pattern: AccessPattern,
468 workload: WorkloadType,
469 latency_us: u64,
470 ) {
471 let mut table_stats = self.table_stats.write().await;
472 let stats = table_stats
473 .entry(table.to_string())
474 .or_insert_with(|| TableStats::new(table));
475 stats.record_query(pattern, workload, latency_us);
476 }
477
478 pub async fn record_workload(
480 &self,
481 workload: WorkloadType,
482 to_primary: bool,
483 is_scatter: bool,
484 latency_us: u64,
485 ) {
486 let mut workload_stats = self.workload_stats.write().await;
487 let stats = workload_stats.entry(workload).or_default();
488 stats.record(to_primary, is_scatter, latency_us);
489 }
490
491 pub async fn record_ai_workload(
493 &self,
494 workload_type: AIWorkloadType,
495 vector_dims: Option<u64>,
496 top_k: Option<u64>,
497 ) {
498 self.routing_stats.ai_routes.fetch_add(1, Ordering::Relaxed);
499 let mut ai_stats = self.ai_stats.write().await;
500 ai_stats.record(workload_type, vector_dims, top_k);
501 }
502
503 pub async fn record_rag_stage(&self, stage: RAGStage, latency_us: u64) {
505 self.routing_stats
506 .rag_routes
507 .fetch_add(1, Ordering::Relaxed);
508 let mut rag_stats = self.rag_stats.write().await;
509 rag_stats.record_stage(stage, latency_us);
510 }
511
512 pub fn record_classification_lookup(&self, hit: bool) {
514 if hit {
515 self.routing_stats
516 .classification_hits
517 .fetch_add(1, Ordering::Relaxed);
518 } else {
519 self.routing_stats
520 .classification_misses
521 .fetch_add(1, Ordering::Relaxed);
522 }
523 }
524
525 pub fn record_error(&self) {
527 self.routing_stats
528 .routing_errors
529 .fetch_add(1, Ordering::Relaxed);
530 }
531
532 pub fn get_routing_stats(&self) -> &RoutingStats {
534 &self.routing_stats
535 }
536
537 pub async fn get_table_stats(&self, table: &str) -> Option<TableStats> {
539 let stats = self.table_stats.read().await;
540 stats.get(table).cloned()
541 }
542
543 pub async fn get_all_table_stats(&self) -> HashMap<String, TableStats> {
545 self.table_stats.read().await.clone()
546 }
547
548 pub async fn get_workload_stats(&self, workload: WorkloadType) -> Option<WorkloadStats> {
550 let stats = self.workload_stats.read().await;
551 stats.get(&workload).cloned()
552 }
553
554 pub async fn get_all_workload_stats(&self) -> HashMap<WorkloadType, WorkloadStats> {
556 self.workload_stats.read().await.clone()
557 }
558
559 pub async fn get_ai_stats(&self) -> AIWorkloadStats {
561 self.ai_stats.read().await.clone()
562 }
563
564 pub async fn get_rag_stats(&self) -> RAGStats {
566 self.rag_stats.read().await.clone()
567 }
568
569 pub async fn get_node_stats(&self, node_id: &str) -> Option<NodeStats> {
571 let stats = self.node_stats.read().await;
572 stats.get(node_id).cloned()
573 }
574
575 pub async fn get_all_node_stats(&self) -> HashMap<String, NodeStats> {
577 self.node_stats.read().await.clone()
578 }
579
580 pub async fn get_shard_stats(&self, shard_id: u32) -> Option<ShardStats> {
582 let stats = self.shard_stats.read().await;
583 stats.get(&shard_id).cloned()
584 }
585
586 pub fn uptime(&self) -> Duration {
588 self.start_time.elapsed()
589 }
590
591 pub async fn reset(&self) {
593 self.table_stats.write().await.clear();
594 self.workload_stats.write().await.clear();
595 self.temperature_stats.write().await.clear();
596 *self.ai_stats.write().await = AIWorkloadStats::default();
597 *self.rag_stats.write().await = RAGStats::default();
598 self.node_stats.write().await.clear();
599 self.shard_stats.write().await.clear();
600
601 self.routing_stats.total_queries.store(0, Ordering::Relaxed);
603 self.routing_stats
604 .schema_aware_routes
605 .store(0, Ordering::Relaxed);
606 self.routing_stats
607 .fallback_routes
608 .store(0, Ordering::Relaxed);
609 self.routing_stats
610 .shard_targeted
611 .store(0, Ordering::Relaxed);
612 self.routing_stats
613 .scatter_gather
614 .store(0, Ordering::Relaxed);
615 self.routing_stats
616 .primary_routes
617 .store(0, Ordering::Relaxed);
618 self.routing_stats
619 .replica_routes
620 .store(0, Ordering::Relaxed);
621 self.routing_stats.ai_routes.store(0, Ordering::Relaxed);
622 self.routing_stats.rag_routes.store(0, Ordering::Relaxed);
623 self.routing_stats.vector_routes.store(0, Ordering::Relaxed);
624 self.routing_stats
625 .classification_hits
626 .store(0, Ordering::Relaxed);
627 self.routing_stats
628 .classification_misses
629 .store(0, Ordering::Relaxed);
630 self.routing_stats
631 .routing_errors
632 .store(0, Ordering::Relaxed);
633 }
634
635 pub async fn generate_report(&self) -> MetricsReport {
637 let routing = self.get_routing_stats();
638 let tables = self.get_all_table_stats().await;
639 let _workloads = self.get_all_workload_stats().await;
640 let ai = self.get_ai_stats().await;
641 let rag = self.get_rag_stats().await;
642 let nodes = self.get_all_node_stats().await;
643
644 MetricsReport {
645 uptime: self.uptime(),
646 total_queries: routing.total_queries(),
647 schema_aware_percentage: routing.schema_aware_percentage(),
648 classification_hit_rate: routing.classification_hit_rate(),
649 primary_replica_ratio: routing.primary_replica_ratio(),
650 table_count: tables.len(),
651 active_nodes: nodes.len(),
652 ai_query_percentage: if routing.total_queries() == 0 {
653 0.0
654 } else {
655 (ai.total_queries as f64 / routing.total_queries() as f64) * 100.0
656 },
657 rag_query_percentage: if routing.total_queries() == 0 {
658 0.0
659 } else {
660 (rag.total_queries as f64 / routing.total_queries() as f64) * 100.0
661 },
662 error_rate: if routing.total_queries() == 0 {
663 0.0
664 } else {
665 (routing.routing_errors.load(Ordering::Relaxed) as f64
666 / routing.total_queries() as f64)
667 * 100.0
668 },
669 }
670 }
671}
672
/// `Default` produces the same empty collector as [`SchemaRoutingMetrics::new`].
impl Default for SchemaRoutingMetrics {
    fn default() -> Self {
        Self::new()
    }
}
678
679impl SchemaRoutingMetrics {
680 pub fn get_table_stats_for_admin(&self) -> Vec<(String, TableStatsForAdmin)> {
682 let table_stats = self.table_stats.clone();
684 let handle = tokio::runtime::Handle::try_current();
685 match handle {
686 Ok(h) => {
687 let stats = h.block_on(async {
688 let guard = table_stats.read().await;
689 guard
690 .iter()
691 .map(|(name, stats)| {
692 (
693 name.clone(),
694 TableStatsForAdmin {
695 query_count: stats.total_queries,
696 avg_latency_ms: stats.avg_latency_us as f64 / 1000.0,
697 cache_hit_rate: stats.cache_hit_rate,
698 temperature: infer_temperature_from_count(stats.total_queries),
699 workload: infer_workload_from_stats(stats),
700 },
701 )
702 })
703 .collect::<Vec<_>>()
704 });
705 stats
706 }
707 Err(_) => Vec::new(),
708 }
709 }
710
711 pub fn get_workload_stats_for_admin(&self) -> Vec<(WorkloadType, WorkloadStatsForAdmin)> {
713 let workload_stats = self.workload_stats.clone();
714 let handle = tokio::runtime::Handle::try_current();
715 match handle {
716 Ok(h) => h.block_on(async {
717 let guard = workload_stats.read().await;
718 guard
719 .iter()
720 .map(|(workload, stats)| {
721 (
722 *workload,
723 WorkloadStatsForAdmin {
724 query_count: stats.total_queries,
725 avg_latency_ms: stats.avg_latency_us as f64 / 1000.0,
726 queries_to_primary: stats.routed_to_primary,
727 queries_to_replica: stats.routed_to_replica,
728 },
729 )
730 })
731 .collect::<Vec<_>>()
732 }),
733 Err(_) => Vec::new(),
734 }
735 }
736
737 pub fn get_ai_workload_stats(&self) -> AIWorkloadStatsForAdmin {
739 let ai_stats = self.ai_stats.clone();
740 let handle = tokio::runtime::Handle::try_current();
741 match handle {
742 Ok(h) => h.block_on(async {
743 let guard = ai_stats.read().await;
744 AIWorkloadStatsForAdmin {
745 embedding_retrieval_count: guard.embedding_retrieval,
746 context_lookup_count: guard.context_lookup,
747 knowledge_base_count: guard.knowledge_base,
748 tool_execution_count: guard.tool_execution,
749 avg_vector_dimensions: guard.avg_vector_dimensions as f64,
750 }
751 }),
752 Err(_) => AIWorkloadStatsForAdmin::default(),
753 }
754 }
755
756 pub fn get_rag_stats_for_admin(&self) -> RAGStatsForAdmin {
758 let rag_stats = self.rag_stats.clone();
759 let handle = tokio::runtime::Handle::try_current();
760 match handle {
761 Ok(h) => h.block_on(async {
762 let guard = rag_stats.read().await;
763 RAGStatsForAdmin {
764 retrieval_count: guard.retrieval_count,
765 avg_retrieval_latency_ms: guard.avg_retrieval_latency_us as f64 / 1000.0,
766 fetch_count: guard.fetch_count,
767 avg_fetch_latency_ms: guard.avg_fetch_latency_us as f64 / 1000.0,
768 total_pipeline_executions: guard.total_queries,
769 avg_total_latency_ms: guard.avg_pipeline_latency_us as f64 / 1000.0,
770 }
771 }),
772 Err(_) => RAGStatsForAdmin::default(),
773 }
774 }
775}
776
777fn infer_temperature_from_count(query_count: u64) -> DataTemperature {
779 if query_count > 10000 {
780 DataTemperature::Hot
781 } else if query_count > 1000 {
782 DataTemperature::Warm
783 } else if query_count > 100 {
784 DataTemperature::Cold
785 } else {
786 DataTemperature::Frozen
787 }
788}
789
790fn infer_workload_from_stats(stats: &TableStats) -> WorkloadType {
792 let olap_count = stats
793 .by_access_pattern
794 .get(&AccessPattern::FullScan)
795 .copied()
796 .unwrap_or(0);
797 let vector_count = stats
798 .by_access_pattern
799 .get(&AccessPattern::VectorSearch)
800 .copied()
801 .unwrap_or(0);
802 let point_count = stats
803 .by_access_pattern
804 .get(&AccessPattern::PointLookup)
805 .copied()
806 .unwrap_or(0);
807
808 if vector_count > stats.total_queries / 3 {
809 WorkloadType::Vector
810 } else if olap_count > stats.total_queries / 2 {
811 WorkloadType::OLAP
812 } else if point_count > stats.total_queries / 2 {
813 WorkloadType::OLTP
814 } else {
815 WorkloadType::Mixed
816 }
817}
818
/// Per-table statistics in the shape exposed to the admin interface.
#[derive(Debug, Clone)]
pub struct TableStatsForAdmin {
    /// Number of queries recorded for the table.
    pub query_count: u64,
    /// Average latency in milliseconds.
    pub avg_latency_ms: f64,
    /// Cache hit rate as stored on the table statistics.
    pub cache_hit_rate: f64,
    /// Data temperature of the table.
    pub temperature: DataTemperature,
    /// Workload type of the table.
    pub workload: WorkloadType,
}
828
/// Per-workload statistics in the shape exposed to the admin interface.
#[derive(Debug, Clone)]
pub struct WorkloadStatsForAdmin {
    /// Number of queries recorded for the workload.
    pub query_count: u64,
    /// Average latency in milliseconds.
    pub avg_latency_ms: f64,
    /// Queries routed to the primary.
    pub queries_to_primary: u64,
    /// Queries routed to a replica.
    pub queries_to_replica: u64,
}
837
/// AI workload statistics in the shape exposed to the admin interface.
#[derive(Debug, Clone, Default)]
pub struct AIWorkloadStatsForAdmin {
    /// Number of embedding-retrieval queries.
    pub embedding_retrieval_count: u64,
    /// Number of context-lookup queries.
    pub context_lookup_count: u64,
    /// Number of knowledge-base queries.
    pub knowledge_base_count: u64,
    /// Number of tool-execution queries.
    pub tool_execution_count: u64,
    /// Average vector dimensionality.
    pub avg_vector_dimensions: f64,
}

impl AIWorkloadStatsForAdmin {
    /// Returns the sum of all four per-type query counters.
    pub fn total_ai_queries(&self) -> u64 {
        [
            self.embedding_retrieval_count,
            self.context_lookup_count,
            self.knowledge_base_count,
            self.tool_execution_count,
        ]
        .iter()
        .sum()
    }
}
857
/// RAG statistics in the shape exposed to the admin interface.
#[derive(Debug, Clone, Default)]
pub struct RAGStatsForAdmin {
    /// Number of retrieval stages recorded.
    pub retrieval_count: u64,
    /// Average retrieval latency in milliseconds.
    pub avg_retrieval_latency_ms: f64,
    /// Number of fetch stages recorded.
    pub fetch_count: u64,
    /// Average fetch latency in milliseconds.
    pub avg_fetch_latency_ms: f64,
    /// Total number of pipeline executions.
    pub total_pipeline_executions: u64,
    /// Average total pipeline latency in milliseconds.
    pub avg_total_latency_ms: f64,
}
868
/// Point-in-time summary of the routing metrics.
#[derive(Debug, Clone)]
pub struct MetricsReport {
    /// Time since the metrics collector was created.
    pub uptime: Duration,
    /// Total number of routed queries.
    pub total_queries: u64,
    /// Percentage of queries routed schema-aware.
    pub schema_aware_percentage: f64,
    /// Fraction of classification lookups that hit.
    pub classification_hit_rate: f64,
    /// Primary share of primary-plus-replica routes.
    pub primary_replica_ratio: f64,
    /// Number of tables with recorded statistics.
    pub table_count: usize,
    /// Number of nodes with recorded statistics.
    pub active_nodes: usize,
    /// AI queries as a percentage of total queries.
    pub ai_query_percentage: f64,
    /// RAG stage events as a percentage of total queries.
    pub rag_query_percentage: f64,
    /// Routing errors as a percentage of total queries.
    pub error_rate: f64,
}
893
/// Unit tests for the metrics collector and its standalone statistic types.
#[cfg(test)]
mod tests {
    use super::super::{RouteTarget, RoutingReason};
    use super::*;

    /// Builds a minimal decision that targets the primary and touches no shards.
    fn sample_decision() -> RoutingDecision {
        RoutingDecision {
            target: RouteTarget::Primary,
            reason: RoutingReason::WriteQuery,
            shards: vec![],
            branch: None,
            node_info: None,
        }
    }

    /// A fresh collector reports zero routed queries.
    #[tokio::test]
    async fn test_metrics_new() {
        let metrics = SchemaRoutingMetrics::new();
        assert_eq!(metrics.get_routing_stats().total_queries(), 0);
    }

    /// A primary-targeted decision bumps the total and the primary counter.
    #[tokio::test]
    async fn test_record_routing() {
        let metrics = SchemaRoutingMetrics::new();
        let decision = sample_decision();

        metrics.record_routing(&decision, 1000).await;

        assert_eq!(metrics.get_routing_stats().total_queries(), 1);
        assert_eq!(
            metrics
                .get_routing_stats()
                .primary_routes
                .load(Ordering::Relaxed),
            1
        );
    }

    /// Repeated queries against one table accumulate in a single entry.
    #[tokio::test]
    async fn test_record_table_query() {
        let metrics = SchemaRoutingMetrics::new();

        metrics
            .record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 500)
            .await;
        metrics
            .record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 600)
            .await;

        let stats = metrics.get_table_stats("users").await.unwrap();
        assert_eq!(stats.total_queries, 2);
        assert_eq!(
            *stats
                .by_access_pattern
                .get(&AccessPattern::PointLookup)
                .unwrap(),
            2
        );
    }

    /// Primary and replica routes are counted separately per workload.
    #[tokio::test]
    async fn test_record_workload() {
        let metrics = SchemaRoutingMetrics::new();

        metrics
            .record_workload(WorkloadType::OLTP, true, false, 100)
            .await;
        metrics
            .record_workload(WorkloadType::OLTP, false, false, 200)
            .await;

        let stats = metrics
            .get_workload_stats(WorkloadType::OLTP)
            .await
            .unwrap();
        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.routed_to_primary, 1);
        assert_eq!(stats.routed_to_replica, 1);
    }

    /// AI queries are counted per type, with or without vector details.
    #[tokio::test]
    async fn test_record_ai_workload() {
        let metrics = SchemaRoutingMetrics::new();

        metrics
            .record_ai_workload(AIWorkloadType::EmbeddingRetrieval, Some(1536), Some(10))
            .await;
        metrics
            .record_ai_workload(AIWorkloadType::ContextLookup, None, None)
            .await;

        let stats = metrics.get_ai_stats().await;
        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.embedding_retrieval, 1);
        assert_eq!(stats.context_lookup, 1);
    }

    /// Each recorded RAG stage bumps the total and its own counter.
    #[tokio::test]
    async fn test_record_rag_stage() {
        let metrics = SchemaRoutingMetrics::new();

        metrics.record_rag_stage(RAGStage::Retrieval, 5000).await;
        metrics.record_rag_stage(RAGStage::Fetch, 2000).await;

        let stats = metrics.get_rag_stats().await;
        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.retrieval_count, 1);
        assert_eq!(stats.fetch_count, 1);
    }

    /// Node-targeted decisions accumulate in that node's statistics.
    #[tokio::test]
    async fn test_record_node_stats() {
        let metrics = SchemaRoutingMetrics::new();

        let decision = RoutingDecision {
            target: RouteTarget::Node("node1".to_string()),
            reason: RoutingReason::LowLatency,
            shards: vec![],
            branch: None,
            node_info: None,
        };

        metrics.record_routing(&decision, 1000).await;
        metrics.record_routing(&decision, 2000).await;

        let stats = metrics.get_node_stats("node1").await.unwrap();
        assert_eq!(stats.total_queries, 2);
    }

    /// Shards listed on a decision get their query counter bumped.
    #[tokio::test]
    async fn test_record_shard_stats() {
        let metrics = SchemaRoutingMetrics::new();

        let decision = RoutingDecision {
            target: RouteTarget::Shard(5),
            reason: RoutingReason::ShardKey,
            shards: vec![5],
            branch: None,
            node_info: None,
        };

        metrics.record_routing(&decision, 1000).await;

        let stats = metrics.get_shard_stats(5).await.unwrap();
        assert_eq!(stats.total_queries, 1);
    }

    /// Two hits and one miss give a hit rate of two thirds.
    #[tokio::test]
    async fn test_classification_lookup() {
        let metrics = SchemaRoutingMetrics::new();

        metrics.record_classification_lookup(true);
        metrics.record_classification_lookup(true);
        metrics.record_classification_lookup(false);

        assert_eq!(
            metrics.get_routing_stats().classification_hit_rate(),
            2.0 / 3.0
        );
    }

    /// Each recorded error increments the error counter.
    #[tokio::test]
    async fn test_record_error() {
        let metrics = SchemaRoutingMetrics::new();

        metrics.record_error();
        metrics.record_error();

        assert_eq!(
            metrics
                .get_routing_stats()
                .routing_errors
                .load(Ordering::Relaxed),
            2
        );
    }

    /// `reset` clears counters, table entries and AI statistics.
    #[tokio::test]
    async fn test_reset_metrics() {
        let metrics = SchemaRoutingMetrics::new();

        // Populate several kinds of statistics first.
        metrics.record_routing(&sample_decision(), 1000).await;
        metrics
            .record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 500)
            .await;
        metrics
            .record_ai_workload(AIWorkloadType::EmbeddingRetrieval, None, None)
            .await;

        metrics.reset().await;

        // Everything recorded above must be gone.
        assert_eq!(metrics.get_routing_stats().total_queries(), 0);
        assert!(metrics.get_table_stats("users").await.is_none());
        assert_eq!(metrics.get_ai_stats().await.total_queries, 0);
    }

    /// The report reflects the routed-query total and the table count.
    #[tokio::test]
    async fn test_generate_report() {
        let metrics = SchemaRoutingMetrics::new();

        // Ten routed queries, plus one table, one AI and one RAG record.
        for _ in 0..10 {
            metrics.record_routing(&sample_decision(), 1000).await;
        }
        metrics
            .record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 500)
            .await;
        metrics
            .record_ai_workload(AIWorkloadType::EmbeddingRetrieval, None, None)
            .await;
        metrics.record_rag_stage(RAGStage::Retrieval, 5000).await;

        let report = metrics.generate_report().await;

        assert_eq!(report.total_queries, 10);
        assert_eq!(report.table_count, 1);
    }

    /// Derived ratios are zero on empty counters and exact for round inputs.
    #[test]
    fn test_routing_stats_percentages() {
        let stats = RoutingStats::default();

        // With nothing recorded every derived value is zero.
        assert_eq!(stats.schema_aware_percentage(), 0.0);
        assert_eq!(stats.classification_hit_rate(), 0.0);
        assert_eq!(stats.primary_replica_ratio(), 0.0);

        // Set the counters directly to known values.
        stats.total_queries.store(100, Ordering::Relaxed);
        stats.schema_aware_routes.store(80, Ordering::Relaxed);
        stats.classification_hits.store(90, Ordering::Relaxed);
        stats.classification_misses.store(10, Ordering::Relaxed);
        stats.primary_routes.store(30, Ordering::Relaxed);
        stats.replica_routes.store(70, Ordering::Relaxed);

        assert_eq!(stats.schema_aware_percentage(), 80.0);
        assert_eq!(stats.classification_hit_rate(), 0.9);
        assert_eq!(stats.primary_replica_ratio(), 0.3);
    }

    /// Different access patterns are counted in separate buckets.
    #[test]
    fn test_table_stats_record() {
        let mut stats = TableStats::new("orders");

        stats.record_query(AccessPattern::PointLookup, WorkloadType::OLTP, 100);
        stats.record_query(AccessPattern::RangeScan, WorkloadType::OLTP, 200);

        assert_eq!(stats.total_queries, 2);
        assert_eq!(
            *stats
                .by_access_pattern
                .get(&AccessPattern::PointLookup)
                .unwrap(),
            1
        );
        assert_eq!(
            *stats
                .by_access_pattern
                .get(&AccessPattern::RangeScan)
                .unwrap(),
            1
        );
    }

    /// Primary, replica and scatter-gather routes land in distinct counters.
    #[test]
    fn test_workload_stats_record() {
        let mut stats = WorkloadStats::default();

        stats.record(true, false, 100);
        stats.record(false, false, 200);
        stats.record(false, true, 300);

        assert_eq!(stats.total_queries, 3);
        assert_eq!(stats.routed_to_primary, 1);
        assert_eq!(stats.routed_to_replica, 1);
        assert_eq!(stats.scatter_gather, 1);
    }

    /// AI statistics count each workload type separately.
    #[test]
    fn test_ai_stats_record() {
        let mut stats = AIWorkloadStats::default();

        stats.record(AIWorkloadType::EmbeddingRetrieval, Some(1536), Some(10));
        stats.record(AIWorkloadType::KnowledgeBase, None, None);

        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.embedding_retrieval, 1);
        assert_eq!(stats.knowledge_base, 1);
    }

    /// RAG statistics count each stage separately.
    #[test]
    fn test_rag_stats_record() {
        let mut stats = RAGStats::default();

        stats.record_stage(RAGStage::Retrieval, 5000);
        stats.record_stage(RAGStage::Fetch, 2000);
        stats.record_stage(RAGStage::Rerank, 1000);

        assert_eq!(stats.total_queries, 3);
        assert_eq!(stats.retrieval_count, 1);
        assert_eq!(stats.fetch_count, 1);
        assert_eq!(stats.rerank_count, 1);
    }
}