1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11
12use super::{
13 DataTemperature, WorkloadType, AccessPattern,
14 RoutingDecision, AIWorkloadType, RAGStage,
15};
16
/// Collector for schema-aware routing metrics.
///
/// Map-shaped and composite statistics live behind async `RwLock`s, while the
/// global counters in `routing_stats` are plain atomics that can be updated
/// without taking a lock.
pub struct SchemaRoutingMetrics {
    /// Per-table statistics, keyed by table name.
    table_stats: Arc<RwLock<HashMap<String, TableStats>>>,
    /// Per-workload statistics, keyed by workload type.
    workload_stats: Arc<RwLock<HashMap<WorkloadType, WorkloadStats>>>,
    /// Per-temperature statistics, keyed by data temperature.
    /// NOTE(review): nothing in this file inserts entries here; `reset` only clears it.
    temperature_stats: Arc<RwLock<HashMap<DataTemperature, TemperatureStats>>>,
    /// Aggregate statistics for AI workloads.
    ai_stats: Arc<RwLock<AIWorkloadStats>>,
    /// Aggregate statistics for RAG pipeline stages.
    rag_stats: Arc<RwLock<RAGStats>>,
    /// Global atomic routing counters.
    routing_stats: Arc<RoutingStats>,
    /// Per-node statistics, keyed by node id.
    node_stats: Arc<RwLock<HashMap<String, NodeStats>>>,
    /// Per-shard statistics, keyed by shard id.
    shard_stats: Arc<RwLock<HashMap<u32, ShardStats>>>,
    /// Instant at which this collector was created; the basis for `uptime`.
    start_time: Instant,
}
38
39#[derive(Debug, Clone, Default)]
41pub struct TableStats {
42 pub table_name: String,
44 pub total_queries: u64,
46 pub by_access_pattern: HashMap<AccessPattern, u64>,
48 pub by_workload: HashMap<WorkloadType, u64>,
50 pub avg_latency_us: u64,
52 pub p99_latency_us: u64,
54 pub shard_hit_rate: f64,
56 pub cache_hit_rate: f64,
58 pub last_query_time: Option<Instant>,
60}
61
62impl TableStats {
63 pub fn new(table_name: &str) -> Self {
65 Self {
66 table_name: table_name.to_string(),
67 ..Default::default()
68 }
69 }
70
71 pub fn record_query(&mut self, pattern: AccessPattern, workload: WorkloadType, latency_us: u64) {
73 self.total_queries += 1;
74 *self.by_access_pattern.entry(pattern).or_insert(0) += 1;
75 *self.by_workload.entry(workload).or_insert(0) += 1;
76
77 if self.avg_latency_us == 0 {
79 self.avg_latency_us = latency_us;
80 } else {
81 self.avg_latency_us = (self.avg_latency_us * 9 + latency_us) / 10;
82 }
83
84 self.last_query_time = Some(Instant::now());
85 }
86}
87
/// Routing statistics collected for one workload type.
#[derive(Debug, Clone, Default)]
pub struct WorkloadStats {
    /// Number of queries recorded through `record`.
    pub total_queries: u64,
    /// Non-scatter queries that were sent to the primary.
    pub routed_to_primary: u64,
    /// Non-scatter queries that were not sent to the primary.
    pub routed_to_replica: u64,
    /// Queries executed as scatter-gather.
    pub scatter_gather: u64,
    /// Smoothed average latency in microseconds (see `record`).
    pub avg_latency_us: u64,
    /// Tables associated with this workload; not updated by `record`.
    pub tables: Vec<String>,
}

impl WorkloadStats {
    /// Records one routed query.
    ///
    /// `is_scatter` takes precedence over `to_primary`: a scatter-gather query
    /// is counted only in `scatter_gather`, whatever `to_primary` says.
    pub fn record(&mut self, to_primary: bool, is_scatter: bool, latency_us: u64) {
        self.total_queries += 1;

        let bucket = match (is_scatter, to_primary) {
            (true, _) => &mut self.scatter_gather,
            (false, true) => &mut self.routed_to_primary,
            (false, false) => &mut self.routed_to_replica,
        };
        *bucket += 1;

        // A stored average of zero is treated as "no sample yet" and is seeded
        // directly; otherwise the new sample contributes with weight 1/10.
        self.avg_latency_us = match self.avg_latency_us {
            0 => latency_us,
            previous => (previous * 9 + latency_us) / 10,
        };
    }
}
126
/// Statistics bucket for one data temperature.
///
/// NOTE(review): this file declares the type and stores it in a map, but no
/// code here populates the fields; confirm the writers elsewhere.
#[derive(Debug, Clone, Default)]
pub struct TemperatureStats {
    /// Number of queries attributed to this temperature.
    pub total_queries: u64,
    /// Number of tables at this temperature.
    pub table_count: u64,
    /// Combined size in bytes.
    pub total_size_bytes: u64,
    /// Cache hit rate (scale not established in this file — TODO confirm 0..1 vs percent).
    pub cache_hit_rate: f64,
    /// Average latency in microseconds.
    pub avg_latency_us: u64,
}
141
142#[derive(Debug, Clone, Default)]
144pub struct AIWorkloadStats {
145 pub total_queries: u64,
147 pub by_type: HashMap<String, u64>,
149 pub embedding_retrieval: u64,
151 pub context_lookup: u64,
153 pub knowledge_base: u64,
155 pub tool_execution: u64,
157 pub avg_vector_dimensions: u64,
159 pub avg_top_k: u64,
161}
162
163impl AIWorkloadStats {
164 pub fn record(&mut self, workload_type: AIWorkloadType, vector_dims: Option<u64>, top_k: Option<u64>) {
166 self.total_queries += 1;
167
168 match workload_type {
169 AIWorkloadType::EmbeddingRetrieval => {
170 self.embedding_retrieval += 1;
171 *self.by_type.entry("embedding_retrieval".to_string()).or_insert(0) += 1;
172 }
173 AIWorkloadType::ContextLookup => {
174 self.context_lookup += 1;
175 *self.by_type.entry("context_lookup".to_string()).or_insert(0) += 1;
176 }
177 AIWorkloadType::KnowledgeBase => {
178 self.knowledge_base += 1;
179 *self.by_type.entry("knowledge_base".to_string()).or_insert(0) += 1;
180 }
181 AIWorkloadType::ToolExecution => {
182 self.tool_execution += 1;
183 *self.by_type.entry("tool_execution".to_string()).or_insert(0) += 1;
184 }
185 }
186
187 if let Some(dims) = vector_dims {
188 self.avg_vector_dimensions = (self.avg_vector_dimensions * 9 + dims) / 10;
189 }
190
191 if let Some(k) = top_k {
192 self.avg_top_k = (self.avg_top_k * 9 + k) / 10;
193 }
194 }
195}
196
197#[derive(Debug, Clone, Default)]
199pub struct RAGStats {
200 pub total_queries: u64,
202 pub retrieval_count: u64,
204 pub fetch_count: u64,
206 pub rerank_count: u64,
208 pub generate_count: u64,
210 pub avg_retrieval_latency_us: u64,
212 pub avg_fetch_latency_us: u64,
214 pub avg_pipeline_latency_us: u64,
216}
217
218impl RAGStats {
219 pub fn record_stage(&mut self, stage: RAGStage, latency_us: u64) {
221 self.total_queries += 1;
222
223 match stage {
224 RAGStage::Retrieval => {
225 self.retrieval_count += 1;
226 if self.avg_retrieval_latency_us == 0 {
227 self.avg_retrieval_latency_us = latency_us;
228 } else {
229 self.avg_retrieval_latency_us = (self.avg_retrieval_latency_us * 9 + latency_us) / 10;
230 }
231 }
232 RAGStage::Fetch => {
233 self.fetch_count += 1;
234 if self.avg_fetch_latency_us == 0 {
235 self.avg_fetch_latency_us = latency_us;
236 } else {
237 self.avg_fetch_latency_us = (self.avg_fetch_latency_us * 9 + latency_us) / 10;
238 }
239 }
240 RAGStage::Rerank => {
241 self.rerank_count += 1;
242 }
243 RAGStage::Generate => {
244 self.generate_count += 1;
245 }
246 }
247 }
248}
249
/// Lock-free global routing counters.
///
/// Every field starts at zero. All accessors below read with
/// `Ordering::Relaxed`, so values read together are not a consistent snapshot
/// while writers are active.
#[derive(Default)]
pub struct RoutingStats {
    /// Total number of routing decisions recorded.
    pub total_queries: AtomicU64,
    /// Decisions made using schema information.
    pub schema_aware_routes: AtomicU64,
    /// Decisions that fell back to a default route.
    pub fallback_routes: AtomicU64,
    /// Decisions that targeted a single shard.
    pub shard_targeted: AtomicU64,
    /// Decisions executed as scatter-gather.
    pub scatter_gather: AtomicU64,
    /// Decisions routed to the primary.
    pub primary_routes: AtomicU64,
    /// Decisions counted as replica routes.
    pub replica_routes: AtomicU64,
    /// AI workload recordings.
    pub ai_routes: AtomicU64,
    /// RAG stage recordings.
    pub rag_routes: AtomicU64,
    /// Vector-search routes.
    pub vector_routes: AtomicU64,
    /// Classification lookups that hit.
    pub classification_hits: AtomicU64,
    /// Classification lookups that missed.
    pub classification_misses: AtomicU64,
    /// Routing errors.
    pub routing_errors: AtomicU64,
}

impl RoutingStats {
    /// Returns `part / whole` as a fraction, or `0.0` when `whole` is zero.
    fn fraction(part: u64, whole: u64) -> f64 {
        if whole == 0 {
            0.0
        } else {
            part as f64 / whole as f64
        }
    }

    /// Current value of the total-queries counter.
    pub fn total_queries(&self) -> u64 {
        self.total_queries.load(Ordering::Relaxed)
    }

    /// Share of schema-aware routes among all queries, as a percentage (0–100).
    ///
    /// Returns `0.0` when no query has been recorded.
    pub fn schema_aware_percentage(&self) -> f64 {
        let total = self.total_queries.load(Ordering::Relaxed);
        let schema_aware = self.schema_aware_routes.load(Ordering::Relaxed);
        Self::fraction(schema_aware, total) * 100.0
    }

    /// Fraction (0–1) of classification lookups that were hits.
    ///
    /// Returns `0.0` when no lookup has been recorded.
    pub fn classification_hit_rate(&self) -> f64 {
        let hits = self.classification_hits.load(Ordering::Relaxed);
        let misses = self.classification_misses.load(Ordering::Relaxed);
        Self::fraction(hits, hits + misses)
    }

    /// Fraction (0–1) of primary routes among primary plus replica routes.
    ///
    /// Despite the name this is primary / (primary + replica), not
    /// primary / replica. Returns `0.0` when neither counter has been bumped.
    pub fn primary_replica_ratio(&self) -> f64 {
        let primary = self.primary_routes.load(Ordering::Relaxed);
        let replica = self.replica_routes.load(Ordering::Relaxed);
        Self::fraction(primary, primary + replica)
    }
}
338
/// Statistics collected for a single node.
#[derive(Debug, Clone, Default)]
pub struct NodeStats {
    /// Identifier of the node.
    pub node_id: String,
    /// Number of routing decisions that targeted this node.
    pub total_queries: u64,
    /// Smoothed average latency in microseconds.
    pub avg_latency_us: u64,
    /// Number of errors attributed to this node; not written anywhere in this file.
    pub error_count: u64,
    /// Load factor of the node; not written anywhere in this file.
    pub load_factor: f64,
    /// Moment the most recent routing decision for this node was recorded, if any.
    pub last_query_time: Option<Instant>,
}
355
/// Statistics collected for a single shard.
#[derive(Debug, Clone, Default)]
pub struct ShardStats {
    /// Identifier of the shard.
    pub shard_id: u32,
    /// Number of routing decisions that listed this shard.
    pub total_queries: u64,
    /// Tables stored on the shard; not written anywhere in this file.
    pub tables: Vec<String>,
    /// Estimated row count; not written anywhere in this file.
    pub estimated_rows: u64,
    /// Size in bytes; not written anywhere in this file.
    pub size_bytes: u64,
}
370
371impl SchemaRoutingMetrics {
372 pub fn new() -> Self {
374 Self {
375 table_stats: Arc::new(RwLock::new(HashMap::new())),
376 workload_stats: Arc::new(RwLock::new(HashMap::new())),
377 temperature_stats: Arc::new(RwLock::new(HashMap::new())),
378 ai_stats: Arc::new(RwLock::new(AIWorkloadStats::default())),
379 rag_stats: Arc::new(RwLock::new(RAGStats::default())),
380 routing_stats: Arc::new(RoutingStats::default()),
381 node_stats: Arc::new(RwLock::new(HashMap::new())),
382 shard_stats: Arc::new(RwLock::new(HashMap::new())),
383 start_time: Instant::now(),
384 }
385 }
386
387 pub async fn record_routing(&self, decision: &RoutingDecision, latency_us: u64) {
389 self.routing_stats.total_queries.fetch_add(1, Ordering::Relaxed);
390
391 match &decision.target {
392 super::RouteTarget::Primary => {
393 self.routing_stats.primary_routes.fetch_add(1, Ordering::Relaxed);
394 }
395 super::RouteTarget::Node(_) => {
396 self.routing_stats.replica_routes.fetch_add(1, Ordering::Relaxed);
397 }
398 super::RouteTarget::Shard(_) => {
399 self.routing_stats.shard_targeted.fetch_add(1, Ordering::Relaxed);
400 }
401 super::RouteTarget::ScatterGather => {
402 self.routing_stats.scatter_gather.fetch_add(1, Ordering::Relaxed);
403 }
404 }
405
406 if let super::RouteTarget::Node(node_id) = &decision.target {
408 let mut node_stats = self.node_stats.write().await;
409 let stats = node_stats.entry(node_id.clone())
410 .or_insert_with(|| NodeStats {
411 node_id: node_id.clone(),
412 ..Default::default()
413 });
414 stats.total_queries += 1;
415 stats.avg_latency_us = (stats.avg_latency_us * 9 + latency_us) / 10;
416 stats.last_query_time = Some(Instant::now());
417 }
418
419 if !decision.shards.is_empty() {
421 let mut shard_stats = self.shard_stats.write().await;
422 for shard_id in &decision.shards {
423 let stats = shard_stats.entry(*shard_id).or_default();
424 stats.shard_id = *shard_id;
425 stats.total_queries += 1;
426 }
427 }
428 }
429
430 pub async fn record_table_query(
432 &self,
433 table: &str,
434 pattern: AccessPattern,
435 workload: WorkloadType,
436 latency_us: u64,
437 ) {
438 let mut table_stats = self.table_stats.write().await;
439 let stats = table_stats.entry(table.to_string())
440 .or_insert_with(|| TableStats::new(table));
441 stats.record_query(pattern, workload, latency_us);
442 }
443
444 pub async fn record_workload(&self, workload: WorkloadType, to_primary: bool, is_scatter: bool, latency_us: u64) {
446 let mut workload_stats = self.workload_stats.write().await;
447 let stats = workload_stats.entry(workload).or_default();
448 stats.record(to_primary, is_scatter, latency_us);
449 }
450
451 pub async fn record_ai_workload(&self, workload_type: AIWorkloadType, vector_dims: Option<u64>, top_k: Option<u64>) {
453 self.routing_stats.ai_routes.fetch_add(1, Ordering::Relaxed);
454 let mut ai_stats = self.ai_stats.write().await;
455 ai_stats.record(workload_type, vector_dims, top_k);
456 }
457
458 pub async fn record_rag_stage(&self, stage: RAGStage, latency_us: u64) {
460 self.routing_stats.rag_routes.fetch_add(1, Ordering::Relaxed);
461 let mut rag_stats = self.rag_stats.write().await;
462 rag_stats.record_stage(stage, latency_us);
463 }
464
465 pub fn record_classification_lookup(&self, hit: bool) {
467 if hit {
468 self.routing_stats.classification_hits.fetch_add(1, Ordering::Relaxed);
469 } else {
470 self.routing_stats.classification_misses.fetch_add(1, Ordering::Relaxed);
471 }
472 }
473
474 pub fn record_error(&self) {
476 self.routing_stats.routing_errors.fetch_add(1, Ordering::Relaxed);
477 }
478
479 pub fn get_routing_stats(&self) -> &RoutingStats {
481 &self.routing_stats
482 }
483
484 pub async fn get_table_stats(&self, table: &str) -> Option<TableStats> {
486 let stats = self.table_stats.read().await;
487 stats.get(table).cloned()
488 }
489
490 pub async fn get_all_table_stats(&self) -> HashMap<String, TableStats> {
492 self.table_stats.read().await.clone()
493 }
494
495 pub async fn get_workload_stats(&self, workload: WorkloadType) -> Option<WorkloadStats> {
497 let stats = self.workload_stats.read().await;
498 stats.get(&workload).cloned()
499 }
500
501 pub async fn get_all_workload_stats(&self) -> HashMap<WorkloadType, WorkloadStats> {
503 self.workload_stats.read().await.clone()
504 }
505
506 pub async fn get_ai_stats(&self) -> AIWorkloadStats {
508 self.ai_stats.read().await.clone()
509 }
510
511 pub async fn get_rag_stats(&self) -> RAGStats {
513 self.rag_stats.read().await.clone()
514 }
515
516 pub async fn get_node_stats(&self, node_id: &str) -> Option<NodeStats> {
518 let stats = self.node_stats.read().await;
519 stats.get(node_id).cloned()
520 }
521
522 pub async fn get_all_node_stats(&self) -> HashMap<String, NodeStats> {
524 self.node_stats.read().await.clone()
525 }
526
527 pub async fn get_shard_stats(&self, shard_id: u32) -> Option<ShardStats> {
529 let stats = self.shard_stats.read().await;
530 stats.get(&shard_id).cloned()
531 }
532
533 pub fn uptime(&self) -> Duration {
535 self.start_time.elapsed()
536 }
537
538 pub async fn reset(&self) {
540 self.table_stats.write().await.clear();
541 self.workload_stats.write().await.clear();
542 self.temperature_stats.write().await.clear();
543 *self.ai_stats.write().await = AIWorkloadStats::default();
544 *self.rag_stats.write().await = RAGStats::default();
545 self.node_stats.write().await.clear();
546 self.shard_stats.write().await.clear();
547
548 self.routing_stats.total_queries.store(0, Ordering::Relaxed);
550 self.routing_stats.schema_aware_routes.store(0, Ordering::Relaxed);
551 self.routing_stats.fallback_routes.store(0, Ordering::Relaxed);
552 self.routing_stats.shard_targeted.store(0, Ordering::Relaxed);
553 self.routing_stats.scatter_gather.store(0, Ordering::Relaxed);
554 self.routing_stats.primary_routes.store(0, Ordering::Relaxed);
555 self.routing_stats.replica_routes.store(0, Ordering::Relaxed);
556 self.routing_stats.ai_routes.store(0, Ordering::Relaxed);
557 self.routing_stats.rag_routes.store(0, Ordering::Relaxed);
558 self.routing_stats.vector_routes.store(0, Ordering::Relaxed);
559 self.routing_stats.classification_hits.store(0, Ordering::Relaxed);
560 self.routing_stats.classification_misses.store(0, Ordering::Relaxed);
561 self.routing_stats.routing_errors.store(0, Ordering::Relaxed);
562 }
563
564 pub async fn generate_report(&self) -> MetricsReport {
566 let routing = self.get_routing_stats();
567 let tables = self.get_all_table_stats().await;
568 let _workloads = self.get_all_workload_stats().await;
569 let ai = self.get_ai_stats().await;
570 let rag = self.get_rag_stats().await;
571 let nodes = self.get_all_node_stats().await;
572
573 MetricsReport {
574 uptime: self.uptime(),
575 total_queries: routing.total_queries(),
576 schema_aware_percentage: routing.schema_aware_percentage(),
577 classification_hit_rate: routing.classification_hit_rate(),
578 primary_replica_ratio: routing.primary_replica_ratio(),
579 table_count: tables.len(),
580 active_nodes: nodes.len(),
581 ai_query_percentage: if routing.total_queries() == 0 {
582 0.0
583 } else {
584 (ai.total_queries as f64 / routing.total_queries() as f64) * 100.0
585 },
586 rag_query_percentage: if routing.total_queries() == 0 {
587 0.0
588 } else {
589 (rag.total_queries as f64 / routing.total_queries() as f64) * 100.0
590 },
591 error_rate: if routing.total_queries() == 0 {
592 0.0
593 } else {
594 (routing.routing_errors.load(Ordering::Relaxed) as f64 / routing.total_queries() as f64) * 100.0
595 },
596 }
597 }
598}
599
/// `Default` is equivalent to [`SchemaRoutingMetrics::new`]: an empty collector
/// whose uptime starts at the moment of the call.
impl Default for SchemaRoutingMetrics {
    fn default() -> Self {
        Self::new()
    }
}
605
606impl SchemaRoutingMetrics {
607 pub fn get_table_stats_for_admin(&self) -> Vec<(String, TableStatsForAdmin)> {
609 let table_stats = self.table_stats.clone();
611 let handle = tokio::runtime::Handle::try_current();
612 match handle {
613 Ok(h) => {
614 let stats = h.block_on(async {
615 let guard = table_stats.read().await;
616 guard.iter().map(|(name, stats)| {
617 (name.clone(), TableStatsForAdmin {
618 query_count: stats.total_queries,
619 avg_latency_ms: stats.avg_latency_us as f64 / 1000.0,
620 cache_hit_rate: stats.cache_hit_rate,
621 temperature: infer_temperature_from_count(stats.total_queries),
622 workload: infer_workload_from_stats(stats),
623 })
624 }).collect::<Vec<_>>()
625 });
626 stats
627 }
628 Err(_) => Vec::new(),
629 }
630 }
631
632 pub fn get_workload_stats_for_admin(&self) -> Vec<(WorkloadType, WorkloadStatsForAdmin)> {
634 let workload_stats = self.workload_stats.clone();
635 let handle = tokio::runtime::Handle::try_current();
636 match handle {
637 Ok(h) => {
638 h.block_on(async {
639 let guard = workload_stats.read().await;
640 guard.iter().map(|(workload, stats)| {
641 (*workload, WorkloadStatsForAdmin {
642 query_count: stats.total_queries,
643 avg_latency_ms: stats.avg_latency_us as f64 / 1000.0,
644 queries_to_primary: stats.routed_to_primary,
645 queries_to_replica: stats.routed_to_replica,
646 })
647 }).collect::<Vec<_>>()
648 })
649 }
650 Err(_) => Vec::new(),
651 }
652 }
653
654 pub fn get_ai_workload_stats(&self) -> AIWorkloadStatsForAdmin {
656 let ai_stats = self.ai_stats.clone();
657 let handle = tokio::runtime::Handle::try_current();
658 match handle {
659 Ok(h) => {
660 h.block_on(async {
661 let guard = ai_stats.read().await;
662 AIWorkloadStatsForAdmin {
663 embedding_retrieval_count: guard.embedding_retrieval,
664 context_lookup_count: guard.context_lookup,
665 knowledge_base_count: guard.knowledge_base,
666 tool_execution_count: guard.tool_execution,
667 avg_vector_dimensions: guard.avg_vector_dimensions as f64,
668 }
669 })
670 }
671 Err(_) => AIWorkloadStatsForAdmin::default(),
672 }
673 }
674
675 pub fn get_rag_stats_for_admin(&self) -> RAGStatsForAdmin {
677 let rag_stats = self.rag_stats.clone();
678 let handle = tokio::runtime::Handle::try_current();
679 match handle {
680 Ok(h) => {
681 h.block_on(async {
682 let guard = rag_stats.read().await;
683 RAGStatsForAdmin {
684 retrieval_count: guard.retrieval_count,
685 avg_retrieval_latency_ms: guard.avg_retrieval_latency_us as f64 / 1000.0,
686 fetch_count: guard.fetch_count,
687 avg_fetch_latency_ms: guard.avg_fetch_latency_us as f64 / 1000.0,
688 total_pipeline_executions: guard.total_queries,
689 avg_total_latency_ms: guard.avg_pipeline_latency_us as f64 / 1000.0,
690 }
691 })
692 }
693 Err(_) => RAGStatsForAdmin::default(),
694 }
695 }
696
697}
698
699fn infer_temperature_from_count(query_count: u64) -> DataTemperature {
701 if query_count > 10000 {
702 DataTemperature::Hot
703 } else if query_count > 1000 {
704 DataTemperature::Warm
705 } else if query_count > 100 {
706 DataTemperature::Cold
707 } else {
708 DataTemperature::Frozen
709 }
710}
711
712fn infer_workload_from_stats(stats: &TableStats) -> WorkloadType {
714 let olap_count = stats.by_access_pattern.get(&AccessPattern::FullScan).copied().unwrap_or(0);
715 let vector_count = stats.by_access_pattern.get(&AccessPattern::VectorSearch).copied().unwrap_or(0);
716 let point_count = stats.by_access_pattern.get(&AccessPattern::PointLookup).copied().unwrap_or(0);
717
718 if vector_count > stats.total_queries / 3 {
719 WorkloadType::Vector
720 } else if olap_count > stats.total_queries / 2 {
721 WorkloadType::OLAP
722 } else if point_count > stats.total_queries / 2 {
723 WorkloadType::OLTP
724 } else {
725 WorkloadType::Mixed
726 }
727}
728
/// Per-table statistics in the shape returned by `get_table_stats_for_admin`.
#[derive(Debug, Clone)]
pub struct TableStatsForAdmin {
    /// Total number of queries recorded for the table.
    pub query_count: u64,
    /// Smoothed average latency, converted from microseconds to milliseconds.
    pub avg_latency_ms: f64,
    /// Cache hit rate copied from the table statistics.
    pub cache_hit_rate: f64,
    /// Temperature inferred from the query count.
    pub temperature: DataTemperature,
    /// Workload inferred from the access-pattern counters.
    pub workload: WorkloadType,
}
738
/// Per-workload statistics in the shape returned by `get_workload_stats_for_admin`.
#[derive(Debug, Clone)]
pub struct WorkloadStatsForAdmin {
    /// Total number of queries recorded for the workload.
    pub query_count: u64,
    /// Smoothed average latency, converted from microseconds to milliseconds.
    pub avg_latency_ms: f64,
    /// Queries counted as routed to the primary.
    pub queries_to_primary: u64,
    /// Queries counted as routed to a replica.
    pub queries_to_replica: u64,
}
747
/// AI workload statistics in the shape returned by `get_ai_workload_stats`.
#[derive(Debug, Clone, Default)]
pub struct AIWorkloadStatsForAdmin {
    /// Number of embedding-retrieval queries.
    pub embedding_retrieval_count: u64,
    /// Number of context-lookup queries.
    pub context_lookup_count: u64,
    /// Number of knowledge-base queries.
    pub knowledge_base_count: u64,
    /// Number of tool-execution queries.
    pub tool_execution_count: u64,
    /// Smoothed average vector dimensionality.
    pub avg_vector_dimensions: f64,
}

impl AIWorkloadStatsForAdmin {
    /// Sum of the four per-type query counters.
    pub fn total_ai_queries(&self) -> u64 {
        [
            self.embedding_retrieval_count,
            self.context_lookup_count,
            self.knowledge_base_count,
            self.tool_execution_count,
        ]
        .iter()
        .sum()
    }
}
765
/// RAG statistics in the shape returned by `get_rag_stats_for_admin`.
#[derive(Debug, Clone, Default)]
pub struct RAGStatsForAdmin {
    /// Number of retrieval stages recorded.
    pub retrieval_count: u64,
    /// Smoothed average retrieval latency in milliseconds.
    pub avg_retrieval_latency_ms: f64,
    /// Number of fetch stages recorded.
    pub fetch_count: u64,
    /// Smoothed average fetch latency in milliseconds.
    pub avg_fetch_latency_ms: f64,
    /// Filled from `RAGStats::total_queries`, which counts every recorded
    /// stage rather than whole pipeline runs.
    pub total_pipeline_executions: u64,
    /// Average whole-pipeline latency in milliseconds.
    pub avg_total_latency_ms: f64,
}
776
/// Summary produced by `SchemaRoutingMetrics::generate_report`.
#[derive(Debug, Clone)]
pub struct MetricsReport {
    /// Time elapsed since the metrics collector was created.
    pub uptime: Duration,
    /// Total number of routing decisions recorded.
    pub total_queries: u64,
    /// Schema-aware routes as a percentage (0–100) of all queries.
    pub schema_aware_percentage: f64,
    /// Fraction (0–1) of classification lookups that were hits.
    pub classification_hit_rate: f64,
    /// Fraction (0–1) of primary routes among primary plus replica routes.
    pub primary_replica_ratio: f64,
    /// Number of tables with recorded statistics.
    pub table_count: usize,
    /// Number of nodes with recorded statistics.
    pub active_nodes: usize,
    /// AI query recordings as a percentage of all queries.
    pub ai_query_percentage: f64,
    /// RAG stage recordings as a percentage of all queries.
    pub rag_query_percentage: f64,
    /// Routing errors as a percentage of all queries.
    pub error_rate: f64,
}
801
// Unit tests for the metrics collector and the individual stats types.
#[cfg(test)]
mod tests {
    use super::*;
    use super::super::{RouteTarget, RoutingReason};

    /// Builds a decision that targets the primary and lists no shards.
    fn sample_decision() -> RoutingDecision {
        RoutingDecision {
            target: RouteTarget::Primary,
            reason: RoutingReason::WriteQuery,
            shards: vec![],
            branch: None,
            node_info: None,
        }
    }

    /// A fresh collector reports zero queries.
    #[tokio::test]
    async fn test_metrics_new() {
        let metrics = SchemaRoutingMetrics::new();
        assert_eq!(metrics.get_routing_stats().total_queries(), 0);
    }

    /// A primary-targeted decision bumps the total and the primary counter.
    #[tokio::test]
    async fn test_record_routing() {
        let metrics = SchemaRoutingMetrics::new();
        let decision = sample_decision();

        metrics.record_routing(&decision, 1000).await;

        assert_eq!(metrics.get_routing_stats().total_queries(), 1);
        assert_eq!(metrics.get_routing_stats().primary_routes.load(Ordering::Relaxed), 1);
    }

    /// Repeated queries against one table accumulate in a single entry.
    #[tokio::test]
    async fn test_record_table_query() {
        let metrics = SchemaRoutingMetrics::new();

        metrics.record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 500).await;
        metrics.record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 600).await;

        let stats = metrics.get_table_stats("users").await.unwrap();
        assert_eq!(stats.total_queries, 2);
        assert_eq!(*stats.by_access_pattern.get(&AccessPattern::PointLookup).unwrap(), 2);
    }

    /// Primary and replica routes are counted separately per workload.
    #[tokio::test]
    async fn test_record_workload() {
        let metrics = SchemaRoutingMetrics::new();

        metrics.record_workload(WorkloadType::OLTP, true, false, 100).await;
        metrics.record_workload(WorkloadType::OLTP, false, false, 200).await;

        let stats = metrics.get_workload_stats(WorkloadType::OLTP).await.unwrap();
        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.routed_to_primary, 1);
        assert_eq!(stats.routed_to_replica, 1);
    }

    /// AI queries are counted in total and per workload type.
    #[tokio::test]
    async fn test_record_ai_workload() {
        let metrics = SchemaRoutingMetrics::new();

        metrics.record_ai_workload(AIWorkloadType::EmbeddingRetrieval, Some(1536), Some(10)).await;
        metrics.record_ai_workload(AIWorkloadType::ContextLookup, None, None).await;

        let stats = metrics.get_ai_stats().await;
        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.embedding_retrieval, 1);
        assert_eq!(stats.context_lookup, 1);
    }

    /// Each recorded RAG stage bumps the total and its own counter.
    #[tokio::test]
    async fn test_record_rag_stage() {
        let metrics = SchemaRoutingMetrics::new();

        metrics.record_rag_stage(RAGStage::Retrieval, 5000).await;
        metrics.record_rag_stage(RAGStage::Fetch, 2000).await;

        let stats = metrics.get_rag_stats().await;
        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.retrieval_count, 1);
        assert_eq!(stats.fetch_count, 1);
    }

    /// Node-targeted decisions accumulate in that node's statistics.
    #[tokio::test]
    async fn test_record_node_stats() {
        let metrics = SchemaRoutingMetrics::new();

        let decision = RoutingDecision {
            target: RouteTarget::Node("node1".to_string()),
            reason: RoutingReason::LowLatency,
            shards: vec![],
            branch: None,
            node_info: None,
        };

        metrics.record_routing(&decision, 1000).await;
        metrics.record_routing(&decision, 2000).await;

        let stats = metrics.get_node_stats("node1").await.unwrap();
        assert_eq!(stats.total_queries, 2);
    }

    /// Shards listed in a decision get their query count incremented.
    #[tokio::test]
    async fn test_record_shard_stats() {
        let metrics = SchemaRoutingMetrics::new();

        let decision = RoutingDecision {
            target: RouteTarget::Shard(5),
            reason: RoutingReason::ShardKey,
            shards: vec![5],
            branch: None,
            node_info: None,
        };

        metrics.record_routing(&decision, 1000).await;

        let stats = metrics.get_shard_stats(5).await.unwrap();
        assert_eq!(stats.total_queries, 1);
    }

    /// Two hits and one miss give a hit rate of two thirds.
    #[tokio::test]
    async fn test_classification_lookup() {
        let metrics = SchemaRoutingMetrics::new();

        metrics.record_classification_lookup(true);
        metrics.record_classification_lookup(true);
        metrics.record_classification_lookup(false);

        assert_eq!(metrics.get_routing_stats().classification_hit_rate(), 2.0 / 3.0);
    }

    /// Every recorded error bumps the error counter.
    #[tokio::test]
    async fn test_record_error() {
        let metrics = SchemaRoutingMetrics::new();

        metrics.record_error();
        metrics.record_error();

        assert_eq!(metrics.get_routing_stats().routing_errors.load(Ordering::Relaxed), 2);
    }

    /// `reset` clears counters, table entries and AI statistics.
    #[tokio::test]
    async fn test_reset_metrics() {
        let metrics = SchemaRoutingMetrics::new();

        // Populate a few different kinds of metrics.
        metrics.record_routing(&sample_decision(), 1000).await;
        metrics.record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 500).await;
        metrics.record_ai_workload(AIWorkloadType::EmbeddingRetrieval, None, None).await;

        metrics.reset().await;

        // Everything recorded above must be gone.
        assert_eq!(metrics.get_routing_stats().total_queries(), 0);
        assert!(metrics.get_table_stats("users").await.is_none());
        assert_eq!(metrics.get_ai_stats().await.total_queries, 0);
    }

    /// The report reflects the recorded totals and the number of tables.
    #[tokio::test]
    async fn test_generate_report() {
        let metrics = SchemaRoutingMetrics::new();

        // Ten routing decisions plus one table, one AI and one RAG recording.
        for _ in 0..10 {
            metrics.record_routing(&sample_decision(), 1000).await;
        }
        metrics.record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 500).await;
        metrics.record_ai_workload(AIWorkloadType::EmbeddingRetrieval, None, None).await;
        metrics.record_rag_stage(RAGStage::Retrieval, 5000).await;

        let report = metrics.generate_report().await;

        assert_eq!(report.total_queries, 10);
        assert_eq!(report.table_count, 1);
    }

    /// Ratios are zero on empty counters and exact on round numbers.
    #[test]
    fn test_routing_stats_percentages() {
        let stats = RoutingStats::default();

        // Empty counters must not divide by zero.
        assert_eq!(stats.schema_aware_percentage(), 0.0);
        assert_eq!(stats.classification_hit_rate(), 0.0);
        assert_eq!(stats.primary_replica_ratio(), 0.0);

        // Set the counters directly to known values.
        stats.total_queries.store(100, Ordering::Relaxed);
        stats.schema_aware_routes.store(80, Ordering::Relaxed);
        stats.classification_hits.store(90, Ordering::Relaxed);
        stats.classification_misses.store(10, Ordering::Relaxed);
        stats.primary_routes.store(30, Ordering::Relaxed);
        stats.replica_routes.store(70, Ordering::Relaxed);

        assert_eq!(stats.schema_aware_percentage(), 80.0);
        assert_eq!(stats.classification_hit_rate(), 0.9);
        assert_eq!(stats.primary_replica_ratio(), 0.3);
    }

    /// `TableStats::record_query` counts per access pattern.
    #[test]
    fn test_table_stats_record() {
        let mut stats = TableStats::new("orders");

        stats.record_query(AccessPattern::PointLookup, WorkloadType::OLTP, 100);
        stats.record_query(AccessPattern::RangeScan, WorkloadType::OLTP, 200);

        assert_eq!(stats.total_queries, 2);
        assert_eq!(*stats.by_access_pattern.get(&AccessPattern::PointLookup).unwrap(), 1);
        assert_eq!(*stats.by_access_pattern.get(&AccessPattern::RangeScan).unwrap(), 1);
    }

    /// `WorkloadStats::record` sorts queries into primary, replica and scatter.
    #[test]
    fn test_workload_stats_record() {
        let mut stats = WorkloadStats::default();

        stats.record(true, false, 100);
        stats.record(false, false, 200);
        stats.record(false, true, 300);

        assert_eq!(stats.total_queries, 3);
        assert_eq!(stats.routed_to_primary, 1);
        assert_eq!(stats.routed_to_replica, 1);
        assert_eq!(stats.scatter_gather, 1);
    }

    /// `AIWorkloadStats::record` counts per workload type.
    #[test]
    fn test_ai_stats_record() {
        let mut stats = AIWorkloadStats::default();

        stats.record(AIWorkloadType::EmbeddingRetrieval, Some(1536), Some(10));
        stats.record(AIWorkloadType::KnowledgeBase, None, None);

        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.embedding_retrieval, 1);
        assert_eq!(stats.knowledge_base, 1);
    }

    /// `RAGStats::record_stage` counts per stage.
    #[test]
    fn test_rag_stats_record() {
        let mut stats = RAGStats::default();

        stats.record_stage(RAGStage::Retrieval, 5000);
        stats.record_stage(RAGStage::Fetch, 2000);
        stats.record_stage(RAGStage::Rerank, 1000);

        assert_eq!(stats.total_queries, 3);
        assert_eq!(stats.retrieval_count, 1);
        assert_eq!(stats.fetch_count, 1);
        assert_eq!(stats.rerank_count, 1);
    }
}