Skip to main content

heliosdb_proxy/schema_routing/
metrics.rs

1//! Schema Routing Metrics
2//!
3//! Provides metrics and statistics for schema-aware routing decisions.
4//! Tracks routing patterns, hit rates, and performance characteristics.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11
12use super::{
13    AIWorkloadType, AccessPattern, DataTemperature, RAGStage, RoutingDecision, WorkloadType,
14};
15
16/// Schema routing metrics collector
17pub struct SchemaRoutingMetrics {
18    /// Routing statistics by table
19    table_stats: Arc<RwLock<HashMap<String, TableStats>>>,
20    /// Routing statistics by workload type
21    workload_stats: Arc<RwLock<HashMap<WorkloadType, WorkloadStats>>>,
22    /// Routing statistics by temperature
23    temperature_stats: Arc<RwLock<HashMap<DataTemperature, TemperatureStats>>>,
24    /// AI workload statistics
25    ai_stats: Arc<RwLock<AIWorkloadStats>>,
26    /// RAG pipeline statistics
27    rag_stats: Arc<RwLock<RAGStats>>,
28    /// Overall routing statistics
29    routing_stats: Arc<RoutingStats>,
30    /// Node distribution statistics
31    node_stats: Arc<RwLock<HashMap<String, NodeStats>>>,
32    /// Shard distribution statistics
33    shard_stats: Arc<RwLock<HashMap<u32, ShardStats>>>,
34    /// Start time for uptime calculation
35    start_time: Instant,
36}
37
38/// Statistics for a specific table
39#[derive(Debug, Clone, Default)]
40pub struct TableStats {
41    /// Table name
42    pub table_name: String,
43    /// Total queries routed
44    pub total_queries: u64,
45    /// Queries by access pattern
46    pub by_access_pattern: HashMap<AccessPattern, u64>,
47    /// Queries by workload type
48    pub by_workload: HashMap<WorkloadType, u64>,
49    /// Average latency in microseconds
50    pub avg_latency_us: u64,
51    /// P99 latency in microseconds
52    pub p99_latency_us: u64,
53    /// Shard hit rate (queries with shard key)
54    pub shard_hit_rate: f64,
55    /// Cache utilization
56    pub cache_hit_rate: f64,
57    /// Last query time
58    pub last_query_time: Option<Instant>,
59}
60
61impl TableStats {
62    /// Create new table stats
63    pub fn new(table_name: &str) -> Self {
64        Self {
65            table_name: table_name.to_string(),
66            ..Default::default()
67        }
68    }
69
70    /// Record a query
71    pub fn record_query(
72        &mut self,
73        pattern: AccessPattern,
74        workload: WorkloadType,
75        latency_us: u64,
76    ) {
77        self.total_queries += 1;
78        *self.by_access_pattern.entry(pattern).or_insert(0) += 1;
79        *self.by_workload.entry(workload).or_insert(0) += 1;
80
81        // Update average latency (exponential moving average)
82        if self.avg_latency_us == 0 {
83            self.avg_latency_us = latency_us;
84        } else {
85            self.avg_latency_us = (self.avg_latency_us * 9 + latency_us) / 10;
86        }
87
88        self.last_query_time = Some(Instant::now());
89    }
90}
91
/// Routing statistics accumulated for one workload type.
#[derive(Debug, Clone, Default)]
pub struct WorkloadStats {
    /// Number of routing decisions recorded for this workload.
    pub total_queries: u64,
    /// Decisions that went to the primary.
    pub routed_to_primary: u64,
    /// Decisions that went to a replica.
    pub routed_to_replica: u64,
    /// Decisions that were scattered to all nodes.
    pub scatter_gather: u64,
    /// Smoothed latency in microseconds (exponential moving average in which
    /// each new sample carries a weight of 1/10).
    pub avg_latency_us: u64,
    /// Tables using this workload. Not maintained by `record`.
    pub tables: Vec<String>,
}

impl WorkloadStats {
    /// Record one routing decision.
    ///
    /// `is_scatter` takes precedence over `to_primary`: a scatter-gather
    /// decision is counted only as scatter-gather. `latency_us` is folded into
    /// the smoothed latency; the very first decision seeds it directly.
    pub fn record(&mut self, to_primary: bool, is_scatter: bool, latency_us: u64) {
        self.total_queries += 1;

        if is_scatter {
            self.scatter_gather += 1;
        } else if to_primary {
            self.routed_to_primary += 1;
        } else {
            self.routed_to_replica += 1;
        }

        // Seed on the first sample (decided by the counter, not by
        // `avg == 0`, so an average that legitimately reaches zero is not
        // re-seeded by the next sample).
        self.avg_latency_us = if self.total_queries == 1 {
            latency_us
        } else {
            Self::smooth(self.avg_latency_us, latency_us)
        };
    }

    /// Exponential moving average step: `(current * 9 + sample) / 10`.
    fn smooth(current: u64, sample: u64) -> u64 {
        // Widen to u128 so `current * 9 + sample` cannot overflow. The result
        // never exceeds max(current, sample), so narrowing back is lossless.
        ((u128::from(current) * 9 + u128::from(sample)) / 10) as u64
    }
}
130
/// Aggregate statistics for all tables at one data temperature.
#[derive(Clone, Debug, Default)]
pub struct TemperatureStats {
    /// Number of queries at this temperature.
    pub total_queries: u64,
    /// Number of tables at this temperature.
    pub table_count: u64,
    /// Combined size in bytes.
    pub total_size_bytes: u64,
    /// Cache hit rate.
    pub cache_hit_rate: f64,
    /// Average query latency in microseconds.
    pub avg_latency_us: u64,
}
145
146/// AI workload statistics
147#[derive(Debug, Clone, Default)]
148pub struct AIWorkloadStats {
149    /// Total AI workload queries
150    pub total_queries: u64,
151    /// Queries by AI workload type
152    pub by_type: HashMap<String, u64>,
153    /// Embedding retrieval count
154    pub embedding_retrieval: u64,
155    /// Context lookup count
156    pub context_lookup: u64,
157    /// Knowledge base queries
158    pub knowledge_base: u64,
159    /// Tool execution queries
160    pub tool_execution: u64,
161    /// Average vector dimensions
162    pub avg_vector_dimensions: u64,
163    /// Average k (top-k results)
164    pub avg_top_k: u64,
165}
166
167impl AIWorkloadStats {
168    /// Record an AI workload query
169    pub fn record(
170        &mut self,
171        workload_type: AIWorkloadType,
172        vector_dims: Option<u64>,
173        top_k: Option<u64>,
174    ) {
175        self.total_queries += 1;
176
177        match workload_type {
178            AIWorkloadType::EmbeddingRetrieval => {
179                self.embedding_retrieval += 1;
180                *self
181                    .by_type
182                    .entry("embedding_retrieval".to_string())
183                    .or_insert(0) += 1;
184            }
185            AIWorkloadType::ContextLookup => {
186                self.context_lookup += 1;
187                *self
188                    .by_type
189                    .entry("context_lookup".to_string())
190                    .or_insert(0) += 1;
191            }
192            AIWorkloadType::KnowledgeBase => {
193                self.knowledge_base += 1;
194                *self
195                    .by_type
196                    .entry("knowledge_base".to_string())
197                    .or_insert(0) += 1;
198            }
199            AIWorkloadType::ToolExecution => {
200                self.tool_execution += 1;
201                *self
202                    .by_type
203                    .entry("tool_execution".to_string())
204                    .or_insert(0) += 1;
205            }
206        }
207
208        if let Some(dims) = vector_dims {
209            self.avg_vector_dimensions = (self.avg_vector_dimensions * 9 + dims) / 10;
210        }
211
212        if let Some(k) = top_k {
213            self.avg_top_k = (self.avg_top_k * 9 + k) / 10;
214        }
215    }
216}
217
218/// RAG pipeline statistics
219#[derive(Debug, Clone, Default)]
220pub struct RAGStats {
221    /// Total RAG queries
222    pub total_queries: u64,
223    /// Retrieval stage count
224    pub retrieval_count: u64,
225    /// Fetch stage count
226    pub fetch_count: u64,
227    /// Rerank stage count
228    pub rerank_count: u64,
229    /// Generate stage count
230    pub generate_count: u64,
231    /// Average retrieval latency
232    pub avg_retrieval_latency_us: u64,
233    /// Average fetch latency
234    pub avg_fetch_latency_us: u64,
235    /// Average total pipeline latency
236    pub avg_pipeline_latency_us: u64,
237}
238
239impl RAGStats {
240    /// Record a RAG stage execution
241    pub fn record_stage(&mut self, stage: RAGStage, latency_us: u64) {
242        self.total_queries += 1;
243
244        match stage {
245            RAGStage::Retrieval => {
246                self.retrieval_count += 1;
247                if self.avg_retrieval_latency_us == 0 {
248                    self.avg_retrieval_latency_us = latency_us;
249                } else {
250                    self.avg_retrieval_latency_us =
251                        (self.avg_retrieval_latency_us * 9 + latency_us) / 10;
252                }
253            }
254            RAGStage::Fetch => {
255                self.fetch_count += 1;
256                if self.avg_fetch_latency_us == 0 {
257                    self.avg_fetch_latency_us = latency_us;
258                } else {
259                    self.avg_fetch_latency_us = (self.avg_fetch_latency_us * 9 + latency_us) / 10;
260                }
261            }
262            RAGStage::Rerank => {
263                self.rerank_count += 1;
264            }
265            RAGStage::Generate => {
266                self.generate_count += 1;
267            }
268        }
269    }
270}
271
/// Global routing counters.
///
/// Every field is an atomic so the counters can be updated through a shared
/// reference. `Default` yields all counters at zero.
#[derive(Debug, Default)]
pub struct RoutingStats {
    /// Total queries routed
    pub total_queries: AtomicU64,
    /// Schema-aware routing decisions
    pub schema_aware_routes: AtomicU64,
    /// Fallback routing decisions
    pub fallback_routes: AtomicU64,
    /// Shard-targeted queries
    pub shard_targeted: AtomicU64,
    /// Scatter-gather queries
    pub scatter_gather: AtomicU64,
    /// Primary routes
    pub primary_routes: AtomicU64,
    /// Replica routes
    pub replica_routes: AtomicU64,
    /// AI workload routes
    pub ai_routes: AtomicU64,
    /// RAG pipeline routes
    pub rag_routes: AtomicU64,
    /// Vector search routes
    pub vector_routes: AtomicU64,
    /// Classification cache hits
    pub classification_hits: AtomicU64,
    /// Classification cache misses
    pub classification_misses: AtomicU64,
    /// Routing errors
    pub routing_errors: AtomicU64,
}

impl RoutingStats {
    /// Current value of the total-queries counter.
    pub fn total_queries(&self) -> u64 {
        self.total_queries.load(Ordering::Relaxed)
    }

    /// Share of queries routed schema-aware, as a percentage in `0.0..=100.0`
    /// when the counters are consistent. Returns `0.0` when no query has been
    /// counted.
    pub fn schema_aware_percentage(&self) -> f64 {
        let schema_aware = self.schema_aware_routes.load(Ordering::Relaxed);
        Self::fraction(schema_aware, self.total_queries()) * 100.0
    }

    /// Classification cache hit rate as a fraction in `0.0..=1.0`
    /// (`hits / (hits + misses)`). Returns `0.0` when nothing was looked up.
    pub fn classification_hit_rate(&self) -> f64 {
        let hits = self.classification_hits.load(Ordering::Relaxed);
        let misses = self.classification_misses.load(Ordering::Relaxed);
        Self::fraction(hits, hits.saturating_add(misses))
    }

    /// Share of primary routes among primary + replica routes, as a fraction
    /// in `0.0..=1.0`.
    ///
    /// Despite the name this is `primary / (primary + replica)`, not
    /// `primary / replica`. Returns `0.0` when neither counter has been
    /// incremented.
    pub fn primary_replica_ratio(&self) -> f64 {
        let primary = self.primary_routes.load(Ordering::Relaxed);
        let replica = self.replica_routes.load(Ordering::Relaxed);
        Self::fraction(primary, primary.saturating_add(replica))
    }

    /// `part / whole` as `f64`, defined as `0.0` for an empty `whole`.
    fn fraction(part: u64, whole: u64) -> f64 {
        if whole == 0 {
            0.0
        } else {
            part as f64 / whole as f64
        }
    }
}
360
/// Routing statistics accumulated for a single node.
#[derive(Clone, Debug, Default)]
pub struct NodeStats {
    /// Identifier of the node.
    pub node_id: String,
    /// Number of queries routed to the node.
    pub total_queries: u64,
    /// Average latency in microseconds.
    pub avg_latency_us: u64,
    /// Number of errors.
    pub error_count: u64,
    /// Load factor (0.0 - 1.0).
    pub load_factor: f64,
    /// When the most recent query was routed to the node, if any.
    pub last_query_time: Option<Instant>,
}
377
/// Routing statistics accumulated for a single shard.
#[derive(Clone, Debug, Default)]
pub struct ShardStats {
    /// Identifier of the shard.
    pub shard_id: u32,
    /// Number of queries that touched the shard.
    pub total_queries: u64,
    /// Tables stored in the shard.
    pub tables: Vec<String>,
    /// Estimated number of rows.
    pub estimated_rows: u64,
    /// Size in bytes.
    pub size_bytes: u64,
}
392
393impl SchemaRoutingMetrics {
394    /// Create a new metrics collector
395    pub fn new() -> Self {
396        Self {
397            table_stats: Arc::new(RwLock::new(HashMap::new())),
398            workload_stats: Arc::new(RwLock::new(HashMap::new())),
399            temperature_stats: Arc::new(RwLock::new(HashMap::new())),
400            ai_stats: Arc::new(RwLock::new(AIWorkloadStats::default())),
401            rag_stats: Arc::new(RwLock::new(RAGStats::default())),
402            routing_stats: Arc::new(RoutingStats::default()),
403            node_stats: Arc::new(RwLock::new(HashMap::new())),
404            shard_stats: Arc::new(RwLock::new(HashMap::new())),
405            start_time: Instant::now(),
406        }
407    }
408
409    /// Record a routing decision
410    pub async fn record_routing(&self, decision: &RoutingDecision, latency_us: u64) {
411        self.routing_stats
412            .total_queries
413            .fetch_add(1, Ordering::Relaxed);
414
415        match &decision.target {
416            super::RouteTarget::Primary => {
417                self.routing_stats
418                    .primary_routes
419                    .fetch_add(1, Ordering::Relaxed);
420            }
421            super::RouteTarget::Node(_) => {
422                self.routing_stats
423                    .replica_routes
424                    .fetch_add(1, Ordering::Relaxed);
425            }
426            super::RouteTarget::Shard(_) => {
427                self.routing_stats
428                    .shard_targeted
429                    .fetch_add(1, Ordering::Relaxed);
430            }
431            super::RouteTarget::ScatterGather => {
432                self.routing_stats
433                    .scatter_gather
434                    .fetch_add(1, Ordering::Relaxed);
435            }
436        }
437
438        // Update node stats if routed to specific node
439        if let super::RouteTarget::Node(node_id) = &decision.target {
440            let mut node_stats = self.node_stats.write().await;
441            let stats = node_stats
442                .entry(node_id.clone())
443                .or_insert_with(|| NodeStats {
444                    node_id: node_id.clone(),
445                    ..Default::default()
446                });
447            stats.total_queries += 1;
448            stats.avg_latency_us = (stats.avg_latency_us * 9 + latency_us) / 10;
449            stats.last_query_time = Some(Instant::now());
450        }
451
452        // Update shard stats
453        if !decision.shards.is_empty() {
454            let mut shard_stats = self.shard_stats.write().await;
455            for shard_id in &decision.shards {
456                let stats = shard_stats.entry(*shard_id).or_default();
457                stats.shard_id = *shard_id;
458                stats.total_queries += 1;
459            }
460        }
461    }
462
463    /// Record a table query
464    pub async fn record_table_query(
465        &self,
466        table: &str,
467        pattern: AccessPattern,
468        workload: WorkloadType,
469        latency_us: u64,
470    ) {
471        let mut table_stats = self.table_stats.write().await;
472        let stats = table_stats
473            .entry(table.to_string())
474            .or_insert_with(|| TableStats::new(table));
475        stats.record_query(pattern, workload, latency_us);
476    }
477
478    /// Record a workload routing
479    pub async fn record_workload(
480        &self,
481        workload: WorkloadType,
482        to_primary: bool,
483        is_scatter: bool,
484        latency_us: u64,
485    ) {
486        let mut workload_stats = self.workload_stats.write().await;
487        let stats = workload_stats.entry(workload).or_default();
488        stats.record(to_primary, is_scatter, latency_us);
489    }
490
491    /// Record an AI workload query
492    pub async fn record_ai_workload(
493        &self,
494        workload_type: AIWorkloadType,
495        vector_dims: Option<u64>,
496        top_k: Option<u64>,
497    ) {
498        self.routing_stats.ai_routes.fetch_add(1, Ordering::Relaxed);
499        let mut ai_stats = self.ai_stats.write().await;
500        ai_stats.record(workload_type, vector_dims, top_k);
501    }
502
503    /// Record a RAG stage execution
504    pub async fn record_rag_stage(&self, stage: RAGStage, latency_us: u64) {
505        self.routing_stats
506            .rag_routes
507            .fetch_add(1, Ordering::Relaxed);
508        let mut rag_stats = self.rag_stats.write().await;
509        rag_stats.record_stage(stage, latency_us);
510    }
511
512    /// Record a classification cache hit or miss
513    pub fn record_classification_lookup(&self, hit: bool) {
514        if hit {
515            self.routing_stats
516                .classification_hits
517                .fetch_add(1, Ordering::Relaxed);
518        } else {
519            self.routing_stats
520                .classification_misses
521                .fetch_add(1, Ordering::Relaxed);
522        }
523    }
524
525    /// Record a routing error
526    pub fn record_error(&self) {
527        self.routing_stats
528            .routing_errors
529            .fetch_add(1, Ordering::Relaxed);
530    }
531
532    /// Get overall routing stats
533    pub fn get_routing_stats(&self) -> &RoutingStats {
534        &self.routing_stats
535    }
536
537    /// Get table statistics
538    pub async fn get_table_stats(&self, table: &str) -> Option<TableStats> {
539        let stats = self.table_stats.read().await;
540        stats.get(table).cloned()
541    }
542
543    /// Get all table statistics
544    pub async fn get_all_table_stats(&self) -> HashMap<String, TableStats> {
545        self.table_stats.read().await.clone()
546    }
547
548    /// Get workload statistics
549    pub async fn get_workload_stats(&self, workload: WorkloadType) -> Option<WorkloadStats> {
550        let stats = self.workload_stats.read().await;
551        stats.get(&workload).cloned()
552    }
553
554    /// Get all workload statistics
555    pub async fn get_all_workload_stats(&self) -> HashMap<WorkloadType, WorkloadStats> {
556        self.workload_stats.read().await.clone()
557    }
558
559    /// Get AI workload statistics
560    pub async fn get_ai_stats(&self) -> AIWorkloadStats {
561        self.ai_stats.read().await.clone()
562    }
563
564    /// Get RAG pipeline statistics
565    pub async fn get_rag_stats(&self) -> RAGStats {
566        self.rag_stats.read().await.clone()
567    }
568
569    /// Get node statistics
570    pub async fn get_node_stats(&self, node_id: &str) -> Option<NodeStats> {
571        let stats = self.node_stats.read().await;
572        stats.get(node_id).cloned()
573    }
574
575    /// Get all node statistics
576    pub async fn get_all_node_stats(&self) -> HashMap<String, NodeStats> {
577        self.node_stats.read().await.clone()
578    }
579
580    /// Get shard statistics
581    pub async fn get_shard_stats(&self, shard_id: u32) -> Option<ShardStats> {
582        let stats = self.shard_stats.read().await;
583        stats.get(&shard_id).cloned()
584    }
585
586    /// Get uptime
587    pub fn uptime(&self) -> Duration {
588        self.start_time.elapsed()
589    }
590
591    /// Reset all metrics
592    pub async fn reset(&self) {
593        self.table_stats.write().await.clear();
594        self.workload_stats.write().await.clear();
595        self.temperature_stats.write().await.clear();
596        *self.ai_stats.write().await = AIWorkloadStats::default();
597        *self.rag_stats.write().await = RAGStats::default();
598        self.node_stats.write().await.clear();
599        self.shard_stats.write().await.clear();
600
601        // Reset atomic counters
602        self.routing_stats.total_queries.store(0, Ordering::Relaxed);
603        self.routing_stats
604            .schema_aware_routes
605            .store(0, Ordering::Relaxed);
606        self.routing_stats
607            .fallback_routes
608            .store(0, Ordering::Relaxed);
609        self.routing_stats
610            .shard_targeted
611            .store(0, Ordering::Relaxed);
612        self.routing_stats
613            .scatter_gather
614            .store(0, Ordering::Relaxed);
615        self.routing_stats
616            .primary_routes
617            .store(0, Ordering::Relaxed);
618        self.routing_stats
619            .replica_routes
620            .store(0, Ordering::Relaxed);
621        self.routing_stats.ai_routes.store(0, Ordering::Relaxed);
622        self.routing_stats.rag_routes.store(0, Ordering::Relaxed);
623        self.routing_stats.vector_routes.store(0, Ordering::Relaxed);
624        self.routing_stats
625            .classification_hits
626            .store(0, Ordering::Relaxed);
627        self.routing_stats
628            .classification_misses
629            .store(0, Ordering::Relaxed);
630        self.routing_stats
631            .routing_errors
632            .store(0, Ordering::Relaxed);
633    }
634
635    /// Generate metrics report
636    pub async fn generate_report(&self) -> MetricsReport {
637        let routing = self.get_routing_stats();
638        let tables = self.get_all_table_stats().await;
639        let _workloads = self.get_all_workload_stats().await;
640        let ai = self.get_ai_stats().await;
641        let rag = self.get_rag_stats().await;
642        let nodes = self.get_all_node_stats().await;
643
644        MetricsReport {
645            uptime: self.uptime(),
646            total_queries: routing.total_queries(),
647            schema_aware_percentage: routing.schema_aware_percentage(),
648            classification_hit_rate: routing.classification_hit_rate(),
649            primary_replica_ratio: routing.primary_replica_ratio(),
650            table_count: tables.len(),
651            active_nodes: nodes.len(),
652            ai_query_percentage: if routing.total_queries() == 0 {
653                0.0
654            } else {
655                (ai.total_queries as f64 / routing.total_queries() as f64) * 100.0
656            },
657            rag_query_percentage: if routing.total_queries() == 0 {
658                0.0
659            } else {
660                (rag.total_queries as f64 / routing.total_queries() as f64) * 100.0
661            },
662            error_rate: if routing.total_queries() == 0 {
663                0.0
664            } else {
665                (routing.routing_errors.load(Ordering::Relaxed) as f64
666                    / routing.total_queries() as f64)
667                    * 100.0
668            },
669        }
670    }
671}
672
673impl Default for SchemaRoutingMetrics {
674    fn default() -> Self {
675        Self::new()
676    }
677}
678
679impl SchemaRoutingMetrics {
680    /// Get table stats for admin API (blocking sync version)
681    pub fn get_table_stats_for_admin(&self) -> Vec<(String, TableStatsForAdmin)> {
682        // Use futures to block on the async lock
683        let table_stats = self.table_stats.clone();
684        let handle = tokio::runtime::Handle::try_current();
685        match handle {
686            Ok(h) => {
687                let stats = h.block_on(async {
688                    let guard = table_stats.read().await;
689                    guard
690                        .iter()
691                        .map(|(name, stats)| {
692                            (
693                                name.clone(),
694                                TableStatsForAdmin {
695                                    query_count: stats.total_queries,
696                                    avg_latency_ms: stats.avg_latency_us as f64 / 1000.0,
697                                    cache_hit_rate: stats.cache_hit_rate,
698                                    temperature: infer_temperature_from_count(stats.total_queries),
699                                    workload: infer_workload_from_stats(stats),
700                                },
701                            )
702                        })
703                        .collect::<Vec<_>>()
704                });
705                stats
706            }
707            Err(_) => Vec::new(),
708        }
709    }
710
711    /// Get workload stats for admin API (blocking sync version)
712    pub fn get_workload_stats_for_admin(&self) -> Vec<(WorkloadType, WorkloadStatsForAdmin)> {
713        let workload_stats = self.workload_stats.clone();
714        let handle = tokio::runtime::Handle::try_current();
715        match handle {
716            Ok(h) => h.block_on(async {
717                let guard = workload_stats.read().await;
718                guard
719                    .iter()
720                    .map(|(workload, stats)| {
721                        (
722                            *workload,
723                            WorkloadStatsForAdmin {
724                                query_count: stats.total_queries,
725                                avg_latency_ms: stats.avg_latency_us as f64 / 1000.0,
726                                queries_to_primary: stats.routed_to_primary,
727                                queries_to_replica: stats.routed_to_replica,
728                            },
729                        )
730                    })
731                    .collect::<Vec<_>>()
732            }),
733            Err(_) => Vec::new(),
734        }
735    }
736
737    /// Get AI workload stats for admin API (blocking sync version)
738    pub fn get_ai_workload_stats(&self) -> AIWorkloadStatsForAdmin {
739        let ai_stats = self.ai_stats.clone();
740        let handle = tokio::runtime::Handle::try_current();
741        match handle {
742            Ok(h) => h.block_on(async {
743                let guard = ai_stats.read().await;
744                AIWorkloadStatsForAdmin {
745                    embedding_retrieval_count: guard.embedding_retrieval,
746                    context_lookup_count: guard.context_lookup,
747                    knowledge_base_count: guard.knowledge_base,
748                    tool_execution_count: guard.tool_execution,
749                    avg_vector_dimensions: guard.avg_vector_dimensions as f64,
750                }
751            }),
752            Err(_) => AIWorkloadStatsForAdmin::default(),
753        }
754    }
755
756    /// Get RAG stats for admin API (blocking sync version)
757    pub fn get_rag_stats_for_admin(&self) -> RAGStatsForAdmin {
758        let rag_stats = self.rag_stats.clone();
759        let handle = tokio::runtime::Handle::try_current();
760        match handle {
761            Ok(h) => h.block_on(async {
762                let guard = rag_stats.read().await;
763                RAGStatsForAdmin {
764                    retrieval_count: guard.retrieval_count,
765                    avg_retrieval_latency_ms: guard.avg_retrieval_latency_us as f64 / 1000.0,
766                    fetch_count: guard.fetch_count,
767                    avg_fetch_latency_ms: guard.avg_fetch_latency_us as f64 / 1000.0,
768                    total_pipeline_executions: guard.total_queries,
769                    avg_total_latency_ms: guard.avg_pipeline_latency_us as f64 / 1000.0,
770                }
771            }),
772            Err(_) => RAGStatsForAdmin::default(),
773        }
774    }
775}
776
777/// Infer temperature from query count
778fn infer_temperature_from_count(query_count: u64) -> DataTemperature {
779    if query_count > 10000 {
780        DataTemperature::Hot
781    } else if query_count > 1000 {
782        DataTemperature::Warm
783    } else if query_count > 100 {
784        DataTemperature::Cold
785    } else {
786        DataTemperature::Frozen
787    }
788}
789
790/// Infer workload from stats
791fn infer_workload_from_stats(stats: &TableStats) -> WorkloadType {
792    let olap_count = stats
793        .by_access_pattern
794        .get(&AccessPattern::FullScan)
795        .copied()
796        .unwrap_or(0);
797    let vector_count = stats
798        .by_access_pattern
799        .get(&AccessPattern::VectorSearch)
800        .copied()
801        .unwrap_or(0);
802    let point_count = stats
803        .by_access_pattern
804        .get(&AccessPattern::PointLookup)
805        .copied()
806        .unwrap_or(0);
807
808    if vector_count > stats.total_queries / 3 {
809        WorkloadType::Vector
810    } else if olap_count > stats.total_queries / 2 {
811        WorkloadType::OLAP
812    } else if point_count > stats.total_queries / 2 {
813        WorkloadType::OLTP
814    } else {
815        WorkloadType::Mixed
816    }
817}
818
819/// Table stats for admin API
820#[derive(Debug, Clone)]
821pub struct TableStatsForAdmin {
822    pub query_count: u64,
823    pub avg_latency_ms: f64,
824    pub cache_hit_rate: f64,
825    pub temperature: DataTemperature,
826    pub workload: WorkloadType,
827}
828
/// Per-workload statistics in the shape exposed by the admin API.
///
/// Derives `Default` (all zeros) like the sibling `AIWorkloadStatsForAdmin`
/// and `RAGStatsForAdmin` types.
#[derive(Debug, Clone, Default)]
pub struct WorkloadStatsForAdmin {
    /// Number of routing decisions recorded for the workload.
    pub query_count: u64,
    /// Average latency in milliseconds.
    pub avg_latency_ms: f64,
    /// Decisions routed to the primary.
    pub queries_to_primary: u64,
    /// Decisions routed to a replica.
    pub queries_to_replica: u64,
}
837
/// AI workload statistics in the shape exposed by the admin API.
#[derive(Clone, Debug, Default)]
pub struct AIWorkloadStatsForAdmin {
    /// Number of embedding retrieval queries.
    pub embedding_retrieval_count: u64,
    /// Number of context lookup queries.
    pub context_lookup_count: u64,
    /// Number of knowledge base queries.
    pub knowledge_base_count: u64,
    /// Number of tool execution queries.
    pub tool_execution_count: u64,
    /// Average vector dimensionality.
    pub avg_vector_dimensions: f64,
}

impl AIWorkloadStatsForAdmin {
    /// Sum of the four per-type query counters.
    pub fn total_ai_queries(&self) -> u64 {
        [
            self.embedding_retrieval_count,
            self.context_lookup_count,
            self.knowledge_base_count,
            self.tool_execution_count,
        ]
        .iter()
        .sum()
    }
}
857
/// RAG pipeline statistics in the shape exposed by the admin API.
#[derive(Clone, Debug, Default)]
pub struct RAGStatsForAdmin {
    /// Number of retrieval stage executions.
    pub retrieval_count: u64,
    /// Average retrieval latency in milliseconds.
    pub avg_retrieval_latency_ms: f64,
    /// Number of fetch stage executions.
    pub fetch_count: u64,
    /// Average fetch latency in milliseconds.
    pub avg_fetch_latency_ms: f64,
    /// Total pipeline executions.
    pub total_pipeline_executions: u64,
    /// Average total latency in milliseconds.
    pub avg_total_latency_ms: f64,
}
868
/// Point-in-time summary of the routing metrics.
#[derive(Clone, Debug)]
pub struct MetricsReport {
    /// Time elapsed since the collector was created.
    pub uptime: Duration,
    /// Total number of queries routed.
    pub total_queries: u64,
    /// Percentage of routes that were schema-aware.
    pub schema_aware_percentage: f64,
    /// Hit rate of the classification cache.
    pub classification_hit_rate: f64,
    /// Primary to replica ratio.
    pub primary_replica_ratio: f64,
    /// Number of tables with recorded statistics.
    pub table_count: usize,
    /// Number of nodes with recorded statistics.
    pub active_nodes: usize,
    /// AI queries as a percentage of all queries.
    pub ai_query_percentage: f64,
    /// RAG queries as a percentage of all queries.
    pub rag_query_percentage: f64,
    /// Error rate.
    pub error_rate: f64,
}
893
/// Unit tests for the metrics collector and the individual stats records.
#[cfg(test)]
mod tests {
    use super::super::{RouteTarget, RoutingReason};
    use super::*;

    /// Absolute tolerance used when comparing computed floating-point rates.
    const FLOAT_TOLERANCE: f64 = 1e-9;

    /// Asserts that `actual` is within [`FLOAT_TOLERANCE`] of `expected`.
    ///
    /// Rates and percentages are produced by floating-point arithmetic, so an
    /// exact `==` would tie the tests to one particular evaluation order.
    /// A NaN on either side fails the assertion.
    #[track_caller]
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() <= FLOAT_TOLERANCE,
            "expected {}, got {}",
            expected,
            actual
        );
    }

    /// Builds a decision for `target`/`reason` with no shards, branch, or node info.
    fn decision_for(target: RouteTarget, reason: RoutingReason) -> RoutingDecision {
        RoutingDecision {
            target,
            reason,
            shards: vec![],
            branch: None,
            node_info: None,
        }
    }

    /// A write query routed to the primary.
    fn sample_decision() -> RoutingDecision {
        decision_for(RouteTarget::Primary, RoutingReason::WriteQuery)
    }

    #[tokio::test]
    async fn test_metrics_new() {
        let metrics = SchemaRoutingMetrics::new();
        assert_eq!(metrics.get_routing_stats().total_queries(), 0);
    }

    #[tokio::test]
    async fn test_record_routing() {
        let metrics = SchemaRoutingMetrics::new();
        let decision = sample_decision();

        metrics.record_routing(&decision, 1000).await;

        let routing = metrics.get_routing_stats();
        assert_eq!(routing.total_queries(), 1);
        assert_eq!(routing.primary_routes.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn test_record_table_query() {
        let metrics = SchemaRoutingMetrics::new();

        for latency_us in [500, 600] {
            metrics
                .record_table_query(
                    "users",
                    AccessPattern::PointLookup,
                    WorkloadType::OLTP,
                    latency_us,
                )
                .await;
        }

        let stats = metrics
            .get_table_stats("users")
            .await
            .expect("stats for `users` should exist after recording queries");
        assert_eq!(stats.total_queries, 2);
        assert_eq!(
            stats
                .by_access_pattern
                .get(&AccessPattern::PointLookup)
                .copied(),
            Some(2)
        );
    }

    #[tokio::test]
    async fn test_record_workload() {
        let metrics = SchemaRoutingMetrics::new();

        metrics
            .record_workload(WorkloadType::OLTP, true, false, 100)
            .await;
        metrics
            .record_workload(WorkloadType::OLTP, false, false, 200)
            .await;

        let stats = metrics
            .get_workload_stats(WorkloadType::OLTP)
            .await
            .expect("OLTP workload stats should exist after recording");
        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.routed_to_primary, 1);
        assert_eq!(stats.routed_to_replica, 1);
    }

    #[tokio::test]
    async fn test_record_ai_workload() {
        let metrics = SchemaRoutingMetrics::new();

        metrics
            .record_ai_workload(AIWorkloadType::EmbeddingRetrieval, Some(1536), Some(10))
            .await;
        metrics
            .record_ai_workload(AIWorkloadType::ContextLookup, None, None)
            .await;

        let stats = metrics.get_ai_stats().await;
        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.embedding_retrieval, 1);
        assert_eq!(stats.context_lookup, 1);
    }

    #[tokio::test]
    async fn test_record_rag_stage() {
        let metrics = SchemaRoutingMetrics::new();

        metrics.record_rag_stage(RAGStage::Retrieval, 5000).await;
        metrics.record_rag_stage(RAGStage::Fetch, 2000).await;

        let stats = metrics.get_rag_stats().await;
        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.retrieval_count, 1);
        assert_eq!(stats.fetch_count, 1);
    }

    #[tokio::test]
    async fn test_record_node_stats() {
        let metrics = SchemaRoutingMetrics::new();
        let decision = decision_for(
            RouteTarget::Node("node1".to_string()),
            RoutingReason::LowLatency,
        );

        metrics.record_routing(&decision, 1000).await;
        metrics.record_routing(&decision, 2000).await;

        let stats = metrics
            .get_node_stats("node1")
            .await
            .expect("stats for `node1` should exist after routing to it");
        assert_eq!(stats.total_queries, 2);
    }

    #[tokio::test]
    async fn test_record_shard_stats() {
        let metrics = SchemaRoutingMetrics::new();
        let mut decision = decision_for(RouteTarget::Shard(5), RoutingReason::ShardKey);
        decision.shards = vec![5];

        metrics.record_routing(&decision, 1000).await;

        let stats = metrics
            .get_shard_stats(5)
            .await
            .expect("stats for shard 5 should exist after routing to it");
        assert_eq!(stats.total_queries, 1);
    }

    #[tokio::test]
    async fn test_classification_lookup() {
        let metrics = SchemaRoutingMetrics::new();

        // Two hits and one miss.
        metrics.record_classification_lookup(true);
        metrics.record_classification_lookup(true);
        metrics.record_classification_lookup(false);

        assert_close(
            metrics.get_routing_stats().classification_hit_rate(),
            2.0 / 3.0,
        );
    }

    #[tokio::test]
    async fn test_record_error() {
        let metrics = SchemaRoutingMetrics::new();

        metrics.record_error();
        metrics.record_error();

        assert_eq!(
            metrics
                .get_routing_stats()
                .routing_errors
                .load(Ordering::Relaxed),
            2
        );
    }

    #[tokio::test]
    async fn test_reset_metrics() {
        let metrics = SchemaRoutingMetrics::new();

        // Record some data
        metrics.record_routing(&sample_decision(), 1000).await;
        metrics
            .record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 500)
            .await;
        metrics
            .record_ai_workload(AIWorkloadType::EmbeddingRetrieval, None, None)
            .await;

        // Reset
        metrics.reset().await;

        // Verify reset
        assert_eq!(metrics.get_routing_stats().total_queries(), 0);
        assert!(metrics.get_table_stats("users").await.is_none());
        assert_eq!(metrics.get_ai_stats().await.total_queries, 0);
    }

    #[tokio::test]
    async fn test_generate_report() {
        let metrics = SchemaRoutingMetrics::new();

        // Record various metrics
        for _ in 0..10 {
            metrics.record_routing(&sample_decision(), 1000).await;
        }
        metrics
            .record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 500)
            .await;
        metrics
            .record_ai_workload(AIWorkloadType::EmbeddingRetrieval, None, None)
            .await;
        metrics.record_rag_stage(RAGStage::Retrieval, 5000).await;

        let report = metrics.generate_report().await;

        assert_eq!(report.total_queries, 10);
        assert_eq!(report.table_count, 1);
    }

    #[test]
    fn test_routing_stats_percentages() {
        let stats = RoutingStats::default();

        // Empty stats
        assert_close(stats.schema_aware_percentage(), 0.0);
        assert_close(stats.classification_hit_rate(), 0.0);
        assert_close(stats.primary_replica_ratio(), 0.0);

        // With data
        stats.total_queries.store(100, Ordering::Relaxed);
        stats.schema_aware_routes.store(80, Ordering::Relaxed);
        stats.classification_hits.store(90, Ordering::Relaxed);
        stats.classification_misses.store(10, Ordering::Relaxed);
        stats.primary_routes.store(30, Ordering::Relaxed);
        stats.replica_routes.store(70, Ordering::Relaxed);

        assert_close(stats.schema_aware_percentage(), 80.0);
        assert_close(stats.classification_hit_rate(), 0.9);
        assert_close(stats.primary_replica_ratio(), 0.3);
    }

    #[test]
    fn test_table_stats_record() {
        let mut stats = TableStats::new("orders");

        stats.record_query(AccessPattern::PointLookup, WorkloadType::OLTP, 100);
        stats.record_query(AccessPattern::RangeScan, WorkloadType::OLTP, 200);

        assert_eq!(stats.total_queries, 2);
        assert_eq!(
            stats
                .by_access_pattern
                .get(&AccessPattern::PointLookup)
                .copied(),
            Some(1)
        );
        assert_eq!(
            stats
                .by_access_pattern
                .get(&AccessPattern::RangeScan)
                .copied(),
            Some(1)
        );
    }

    #[test]
    fn test_workload_stats_record() {
        let mut stats = WorkloadStats::default();

        stats.record(true, false, 100);
        stats.record(false, false, 200);
        stats.record(false, true, 300);

        assert_eq!(stats.total_queries, 3);
        assert_eq!(stats.routed_to_primary, 1);
        assert_eq!(stats.routed_to_replica, 1);
        assert_eq!(stats.scatter_gather, 1);
    }

    #[test]
    fn test_ai_stats_record() {
        let mut stats = AIWorkloadStats::default();

        stats.record(AIWorkloadType::EmbeddingRetrieval, Some(1536), Some(10));
        stats.record(AIWorkloadType::KnowledgeBase, None, None);

        assert_eq!(stats.total_queries, 2);
        assert_eq!(stats.embedding_retrieval, 1);
        assert_eq!(stats.knowledge_base, 1);
    }

    #[test]
    fn test_rag_stats_record() {
        let mut stats = RAGStats::default();

        stats.record_stage(RAGStage::Retrieval, 5000);
        stats.record_stage(RAGStage::Fetch, 2000);
        stats.record_stage(RAGStage::Rerank, 1000);

        assert_eq!(stats.total_queries, 3);
        assert_eq!(stats.retrieval_count, 1);
        assert_eq!(stats.fetch_count, 1);
        assert_eq!(stats.rerank_count, 1);
    }
}