Skip to main content

heliosdb_proxy/schema_routing/
metrics.rs

1//! Schema Routing Metrics
2//!
3//! Provides metrics and statistics for schema-aware routing decisions.
4//! Tracks routing patterns, hit rates, and performance characteristics.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11
12use super::{
13    DataTemperature, WorkloadType, AccessPattern,
14    RoutingDecision, AIWorkloadType, RAGStage,
15};
16
17/// Schema routing metrics collector
18pub struct SchemaRoutingMetrics {
19    /// Routing statistics by table
20    table_stats: Arc<RwLock<HashMap<String, TableStats>>>,
21    /// Routing statistics by workload type
22    workload_stats: Arc<RwLock<HashMap<WorkloadType, WorkloadStats>>>,
23    /// Routing statistics by temperature
24    temperature_stats: Arc<RwLock<HashMap<DataTemperature, TemperatureStats>>>,
25    /// AI workload statistics
26    ai_stats: Arc<RwLock<AIWorkloadStats>>,
27    /// RAG pipeline statistics
28    rag_stats: Arc<RwLock<RAGStats>>,
29    /// Overall routing statistics
30    routing_stats: Arc<RoutingStats>,
31    /// Node distribution statistics
32    node_stats: Arc<RwLock<HashMap<String, NodeStats>>>,
33    /// Shard distribution statistics
34    shard_stats: Arc<RwLock<HashMap<u32, ShardStats>>>,
35    /// Start time for uptime calculation
36    start_time: Instant,
37}
38
39/// Statistics for a specific table
40#[derive(Debug, Clone, Default)]
41pub struct TableStats {
42    /// Table name
43    pub table_name: String,
44    /// Total queries routed
45    pub total_queries: u64,
46    /// Queries by access pattern
47    pub by_access_pattern: HashMap<AccessPattern, u64>,
48    /// Queries by workload type
49    pub by_workload: HashMap<WorkloadType, u64>,
50    /// Average latency in microseconds
51    pub avg_latency_us: u64,
52    /// P99 latency in microseconds
53    pub p99_latency_us: u64,
54    /// Shard hit rate (queries with shard key)
55    pub shard_hit_rate: f64,
56    /// Cache utilization
57    pub cache_hit_rate: f64,
58    /// Last query time
59    pub last_query_time: Option<Instant>,
60}
61
62impl TableStats {
63    /// Create new table stats
64    pub fn new(table_name: &str) -> Self {
65        Self {
66            table_name: table_name.to_string(),
67            ..Default::default()
68        }
69    }
70
71    /// Record a query
72    pub fn record_query(&mut self, pattern: AccessPattern, workload: WorkloadType, latency_us: u64) {
73        self.total_queries += 1;
74        *self.by_access_pattern.entry(pattern).or_insert(0) += 1;
75        *self.by_workload.entry(workload).or_insert(0) += 1;
76
77        // Update average latency (exponential moving average)
78        if self.avg_latency_us == 0 {
79            self.avg_latency_us = latency_us;
80        } else {
81            self.avg_latency_us = (self.avg_latency_us * 9 + latency_us) / 10;
82        }
83
84        self.last_query_time = Some(Instant::now());
85    }
86}
87
88/// Statistics by workload type
89#[derive(Debug, Clone, Default)]
90pub struct WorkloadStats {
91    /// Total queries for this workload
92    pub total_queries: u64,
93    /// Queries routed to primary
94    pub routed_to_primary: u64,
95    /// Queries routed to replicas
96    pub routed_to_replica: u64,
97    /// Queries scattered to all nodes
98    pub scatter_gather: u64,
99    /// Average latency in microseconds
100    pub avg_latency_us: u64,
101    /// Tables using this workload
102    pub tables: Vec<String>,
103}
104
105impl WorkloadStats {
106    /// Record a routing decision
107    pub fn record(&mut self, to_primary: bool, is_scatter: bool, latency_us: u64) {
108        self.total_queries += 1;
109
110        if is_scatter {
111            self.scatter_gather += 1;
112        } else if to_primary {
113            self.routed_to_primary += 1;
114        } else {
115            self.routed_to_replica += 1;
116        }
117
118        // Update average latency
119        if self.avg_latency_us == 0 {
120            self.avg_latency_us = latency_us;
121        } else {
122            self.avg_latency_us = (self.avg_latency_us * 9 + latency_us) / 10;
123        }
124    }
125}
126
127/// Statistics by temperature
128#[derive(Debug, Clone, Default)]
129pub struct TemperatureStats {
130    /// Total queries
131    pub total_queries: u64,
132    /// Total tables at this temperature
133    pub table_count: u64,
134    /// Total size in bytes
135    pub total_size_bytes: u64,
136    /// Cache hit rate
137    pub cache_hit_rate: f64,
138    /// Average query latency
139    pub avg_latency_us: u64,
140}
141
142/// AI workload statistics
143#[derive(Debug, Clone, Default)]
144pub struct AIWorkloadStats {
145    /// Total AI workload queries
146    pub total_queries: u64,
147    /// Queries by AI workload type
148    pub by_type: HashMap<String, u64>,
149    /// Embedding retrieval count
150    pub embedding_retrieval: u64,
151    /// Context lookup count
152    pub context_lookup: u64,
153    /// Knowledge base queries
154    pub knowledge_base: u64,
155    /// Tool execution queries
156    pub tool_execution: u64,
157    /// Average vector dimensions
158    pub avg_vector_dimensions: u64,
159    /// Average k (top-k results)
160    pub avg_top_k: u64,
161}
162
163impl AIWorkloadStats {
164    /// Record an AI workload query
165    pub fn record(&mut self, workload_type: AIWorkloadType, vector_dims: Option<u64>, top_k: Option<u64>) {
166        self.total_queries += 1;
167
168        match workload_type {
169            AIWorkloadType::EmbeddingRetrieval => {
170                self.embedding_retrieval += 1;
171                *self.by_type.entry("embedding_retrieval".to_string()).or_insert(0) += 1;
172            }
173            AIWorkloadType::ContextLookup => {
174                self.context_lookup += 1;
175                *self.by_type.entry("context_lookup".to_string()).or_insert(0) += 1;
176            }
177            AIWorkloadType::KnowledgeBase => {
178                self.knowledge_base += 1;
179                *self.by_type.entry("knowledge_base".to_string()).or_insert(0) += 1;
180            }
181            AIWorkloadType::ToolExecution => {
182                self.tool_execution += 1;
183                *self.by_type.entry("tool_execution".to_string()).or_insert(0) += 1;
184            }
185        }
186
187        if let Some(dims) = vector_dims {
188            self.avg_vector_dimensions = (self.avg_vector_dimensions * 9 + dims) / 10;
189        }
190
191        if let Some(k) = top_k {
192            self.avg_top_k = (self.avg_top_k * 9 + k) / 10;
193        }
194    }
195}
196
197/// RAG pipeline statistics
198#[derive(Debug, Clone, Default)]
199pub struct RAGStats {
200    /// Total RAG queries
201    pub total_queries: u64,
202    /// Retrieval stage count
203    pub retrieval_count: u64,
204    /// Fetch stage count
205    pub fetch_count: u64,
206    /// Rerank stage count
207    pub rerank_count: u64,
208    /// Generate stage count
209    pub generate_count: u64,
210    /// Average retrieval latency
211    pub avg_retrieval_latency_us: u64,
212    /// Average fetch latency
213    pub avg_fetch_latency_us: u64,
214    /// Average total pipeline latency
215    pub avg_pipeline_latency_us: u64,
216}
217
218impl RAGStats {
219    /// Record a RAG stage execution
220    pub fn record_stage(&mut self, stage: RAGStage, latency_us: u64) {
221        self.total_queries += 1;
222
223        match stage {
224            RAGStage::Retrieval => {
225                self.retrieval_count += 1;
226                if self.avg_retrieval_latency_us == 0 {
227                    self.avg_retrieval_latency_us = latency_us;
228                } else {
229                    self.avg_retrieval_latency_us = (self.avg_retrieval_latency_us * 9 + latency_us) / 10;
230                }
231            }
232            RAGStage::Fetch => {
233                self.fetch_count += 1;
234                if self.avg_fetch_latency_us == 0 {
235                    self.avg_fetch_latency_us = latency_us;
236                } else {
237                    self.avg_fetch_latency_us = (self.avg_fetch_latency_us * 9 + latency_us) / 10;
238                }
239            }
240            RAGStage::Rerank => {
241                self.rerank_count += 1;
242            }
243            RAGStage::Generate => {
244                self.generate_count += 1;
245            }
246        }
247    }
248}
249
250/// Overall routing statistics
251pub struct RoutingStats {
252    /// Total queries routed
253    pub total_queries: AtomicU64,
254    /// Schema-aware routing decisions
255    pub schema_aware_routes: AtomicU64,
256    /// Fallback routing decisions
257    pub fallback_routes: AtomicU64,
258    /// Shard-targeted queries
259    pub shard_targeted: AtomicU64,
260    /// Scatter-gather queries
261    pub scatter_gather: AtomicU64,
262    /// Primary routes
263    pub primary_routes: AtomicU64,
264    /// Replica routes
265    pub replica_routes: AtomicU64,
266    /// AI workload routes
267    pub ai_routes: AtomicU64,
268    /// RAG pipeline routes
269    pub rag_routes: AtomicU64,
270    /// Vector search routes
271    pub vector_routes: AtomicU64,
272    /// Classification cache hits
273    pub classification_hits: AtomicU64,
274    /// Classification cache misses
275    pub classification_misses: AtomicU64,
276    /// Routing errors
277    pub routing_errors: AtomicU64,
278}
279
280impl Default for RoutingStats {
281    fn default() -> Self {
282        Self {
283            total_queries: AtomicU64::new(0),
284            schema_aware_routes: AtomicU64::new(0),
285            fallback_routes: AtomicU64::new(0),
286            shard_targeted: AtomicU64::new(0),
287            scatter_gather: AtomicU64::new(0),
288            primary_routes: AtomicU64::new(0),
289            replica_routes: AtomicU64::new(0),
290            ai_routes: AtomicU64::new(0),
291            rag_routes: AtomicU64::new(0),
292            vector_routes: AtomicU64::new(0),
293            classification_hits: AtomicU64::new(0),
294            classification_misses: AtomicU64::new(0),
295            routing_errors: AtomicU64::new(0),
296        }
297    }
298}
299
300impl RoutingStats {
301    /// Get total queries count
302    pub fn total_queries(&self) -> u64 {
303        self.total_queries.load(Ordering::Relaxed)
304    }
305
306    /// Get schema-aware routing percentage
307    pub fn schema_aware_percentage(&self) -> f64 {
308        let total = self.total_queries.load(Ordering::Relaxed);
309        if total == 0 {
310            return 0.0;
311        }
312        let schema_aware = self.schema_aware_routes.load(Ordering::Relaxed);
313        (schema_aware as f64 / total as f64) * 100.0
314    }
315
316    /// Get classification cache hit rate
317    pub fn classification_hit_rate(&self) -> f64 {
318        let hits = self.classification_hits.load(Ordering::Relaxed);
319        let misses = self.classification_misses.load(Ordering::Relaxed);
320        let total = hits + misses;
321        if total == 0 {
322            return 0.0;
323        }
324        hits as f64 / total as f64
325    }
326
327    /// Get primary/replica distribution
328    pub fn primary_replica_ratio(&self) -> f64 {
329        let primary = self.primary_routes.load(Ordering::Relaxed);
330        let replica = self.replica_routes.load(Ordering::Relaxed);
331        let total = primary + replica;
332        if total == 0 {
333            return 0.0;
334        }
335        primary as f64 / total as f64
336    }
337}
338
339/// Node routing statistics
340#[derive(Debug, Clone, Default)]
341pub struct NodeStats {
342    /// Node identifier
343    pub node_id: String,
344    /// Total queries routed to this node
345    pub total_queries: u64,
346    /// Average latency
347    pub avg_latency_us: u64,
348    /// Error count
349    pub error_count: u64,
350    /// Load factor (0.0 - 1.0)
351    pub load_factor: f64,
352    /// Last query time
353    pub last_query_time: Option<Instant>,
354}
355
356/// Shard routing statistics
357#[derive(Debug, Clone, Default)]
358pub struct ShardStats {
359    /// Shard identifier
360    pub shard_id: u32,
361    /// Total queries
362    pub total_queries: u64,
363    /// Tables in this shard
364    pub tables: Vec<String>,
365    /// Estimated row count
366    pub estimated_rows: u64,
367    /// Size in bytes
368    pub size_bytes: u64,
369}
370
371impl SchemaRoutingMetrics {
372    /// Create a new metrics collector
373    pub fn new() -> Self {
374        Self {
375            table_stats: Arc::new(RwLock::new(HashMap::new())),
376            workload_stats: Arc::new(RwLock::new(HashMap::new())),
377            temperature_stats: Arc::new(RwLock::new(HashMap::new())),
378            ai_stats: Arc::new(RwLock::new(AIWorkloadStats::default())),
379            rag_stats: Arc::new(RwLock::new(RAGStats::default())),
380            routing_stats: Arc::new(RoutingStats::default()),
381            node_stats: Arc::new(RwLock::new(HashMap::new())),
382            shard_stats: Arc::new(RwLock::new(HashMap::new())),
383            start_time: Instant::now(),
384        }
385    }
386
387    /// Record a routing decision
388    pub async fn record_routing(&self, decision: &RoutingDecision, latency_us: u64) {
389        self.routing_stats.total_queries.fetch_add(1, Ordering::Relaxed);
390
391        match &decision.target {
392            super::RouteTarget::Primary => {
393                self.routing_stats.primary_routes.fetch_add(1, Ordering::Relaxed);
394            }
395            super::RouteTarget::Node(_) => {
396                self.routing_stats.replica_routes.fetch_add(1, Ordering::Relaxed);
397            }
398            super::RouteTarget::Shard(_) => {
399                self.routing_stats.shard_targeted.fetch_add(1, Ordering::Relaxed);
400            }
401            super::RouteTarget::ScatterGather => {
402                self.routing_stats.scatter_gather.fetch_add(1, Ordering::Relaxed);
403            }
404        }
405
406        // Update node stats if routed to specific node
407        if let super::RouteTarget::Node(node_id) = &decision.target {
408            let mut node_stats = self.node_stats.write().await;
409            let stats = node_stats.entry(node_id.clone())
410                .or_insert_with(|| NodeStats {
411                    node_id: node_id.clone(),
412                    ..Default::default()
413                });
414            stats.total_queries += 1;
415            stats.avg_latency_us = (stats.avg_latency_us * 9 + latency_us) / 10;
416            stats.last_query_time = Some(Instant::now());
417        }
418
419        // Update shard stats
420        if !decision.shards.is_empty() {
421            let mut shard_stats = self.shard_stats.write().await;
422            for shard_id in &decision.shards {
423                let stats = shard_stats.entry(*shard_id).or_default();
424                stats.shard_id = *shard_id;
425                stats.total_queries += 1;
426            }
427        }
428    }
429
430    /// Record a table query
431    pub async fn record_table_query(
432        &self,
433        table: &str,
434        pattern: AccessPattern,
435        workload: WorkloadType,
436        latency_us: u64,
437    ) {
438        let mut table_stats = self.table_stats.write().await;
439        let stats = table_stats.entry(table.to_string())
440            .or_insert_with(|| TableStats::new(table));
441        stats.record_query(pattern, workload, latency_us);
442    }
443
444    /// Record a workload routing
445    pub async fn record_workload(&self, workload: WorkloadType, to_primary: bool, is_scatter: bool, latency_us: u64) {
446        let mut workload_stats = self.workload_stats.write().await;
447        let stats = workload_stats.entry(workload).or_default();
448        stats.record(to_primary, is_scatter, latency_us);
449    }
450
451    /// Record an AI workload query
452    pub async fn record_ai_workload(&self, workload_type: AIWorkloadType, vector_dims: Option<u64>, top_k: Option<u64>) {
453        self.routing_stats.ai_routes.fetch_add(1, Ordering::Relaxed);
454        let mut ai_stats = self.ai_stats.write().await;
455        ai_stats.record(workload_type, vector_dims, top_k);
456    }
457
458    /// Record a RAG stage execution
459    pub async fn record_rag_stage(&self, stage: RAGStage, latency_us: u64) {
460        self.routing_stats.rag_routes.fetch_add(1, Ordering::Relaxed);
461        let mut rag_stats = self.rag_stats.write().await;
462        rag_stats.record_stage(stage, latency_us);
463    }
464
465    /// Record a classification cache hit or miss
466    pub fn record_classification_lookup(&self, hit: bool) {
467        if hit {
468            self.routing_stats.classification_hits.fetch_add(1, Ordering::Relaxed);
469        } else {
470            self.routing_stats.classification_misses.fetch_add(1, Ordering::Relaxed);
471        }
472    }
473
474    /// Record a routing error
475    pub fn record_error(&self) {
476        self.routing_stats.routing_errors.fetch_add(1, Ordering::Relaxed);
477    }
478
479    /// Get overall routing stats
480    pub fn get_routing_stats(&self) -> &RoutingStats {
481        &self.routing_stats
482    }
483
484    /// Get table statistics
485    pub async fn get_table_stats(&self, table: &str) -> Option<TableStats> {
486        let stats = self.table_stats.read().await;
487        stats.get(table).cloned()
488    }
489
490    /// Get all table statistics
491    pub async fn get_all_table_stats(&self) -> HashMap<String, TableStats> {
492        self.table_stats.read().await.clone()
493    }
494
495    /// Get workload statistics
496    pub async fn get_workload_stats(&self, workload: WorkloadType) -> Option<WorkloadStats> {
497        let stats = self.workload_stats.read().await;
498        stats.get(&workload).cloned()
499    }
500
501    /// Get all workload statistics
502    pub async fn get_all_workload_stats(&self) -> HashMap<WorkloadType, WorkloadStats> {
503        self.workload_stats.read().await.clone()
504    }
505
506    /// Get AI workload statistics
507    pub async fn get_ai_stats(&self) -> AIWorkloadStats {
508        self.ai_stats.read().await.clone()
509    }
510
511    /// Get RAG pipeline statistics
512    pub async fn get_rag_stats(&self) -> RAGStats {
513        self.rag_stats.read().await.clone()
514    }
515
516    /// Get node statistics
517    pub async fn get_node_stats(&self, node_id: &str) -> Option<NodeStats> {
518        let stats = self.node_stats.read().await;
519        stats.get(node_id).cloned()
520    }
521
522    /// Get all node statistics
523    pub async fn get_all_node_stats(&self) -> HashMap<String, NodeStats> {
524        self.node_stats.read().await.clone()
525    }
526
527    /// Get shard statistics
528    pub async fn get_shard_stats(&self, shard_id: u32) -> Option<ShardStats> {
529        let stats = self.shard_stats.read().await;
530        stats.get(&shard_id).cloned()
531    }
532
533    /// Get uptime
534    pub fn uptime(&self) -> Duration {
535        self.start_time.elapsed()
536    }
537
538    /// Reset all metrics
539    pub async fn reset(&self) {
540        self.table_stats.write().await.clear();
541        self.workload_stats.write().await.clear();
542        self.temperature_stats.write().await.clear();
543        *self.ai_stats.write().await = AIWorkloadStats::default();
544        *self.rag_stats.write().await = RAGStats::default();
545        self.node_stats.write().await.clear();
546        self.shard_stats.write().await.clear();
547
548        // Reset atomic counters
549        self.routing_stats.total_queries.store(0, Ordering::Relaxed);
550        self.routing_stats.schema_aware_routes.store(0, Ordering::Relaxed);
551        self.routing_stats.fallback_routes.store(0, Ordering::Relaxed);
552        self.routing_stats.shard_targeted.store(0, Ordering::Relaxed);
553        self.routing_stats.scatter_gather.store(0, Ordering::Relaxed);
554        self.routing_stats.primary_routes.store(0, Ordering::Relaxed);
555        self.routing_stats.replica_routes.store(0, Ordering::Relaxed);
556        self.routing_stats.ai_routes.store(0, Ordering::Relaxed);
557        self.routing_stats.rag_routes.store(0, Ordering::Relaxed);
558        self.routing_stats.vector_routes.store(0, Ordering::Relaxed);
559        self.routing_stats.classification_hits.store(0, Ordering::Relaxed);
560        self.routing_stats.classification_misses.store(0, Ordering::Relaxed);
561        self.routing_stats.routing_errors.store(0, Ordering::Relaxed);
562    }
563
564    /// Generate metrics report
565    pub async fn generate_report(&self) -> MetricsReport {
566        let routing = self.get_routing_stats();
567        let tables = self.get_all_table_stats().await;
568        let _workloads = self.get_all_workload_stats().await;
569        let ai = self.get_ai_stats().await;
570        let rag = self.get_rag_stats().await;
571        let nodes = self.get_all_node_stats().await;
572
573        MetricsReport {
574            uptime: self.uptime(),
575            total_queries: routing.total_queries(),
576            schema_aware_percentage: routing.schema_aware_percentage(),
577            classification_hit_rate: routing.classification_hit_rate(),
578            primary_replica_ratio: routing.primary_replica_ratio(),
579            table_count: tables.len(),
580            active_nodes: nodes.len(),
581            ai_query_percentage: if routing.total_queries() == 0 {
582                0.0
583            } else {
584                (ai.total_queries as f64 / routing.total_queries() as f64) * 100.0
585            },
586            rag_query_percentage: if routing.total_queries() == 0 {
587                0.0
588            } else {
589                (rag.total_queries as f64 / routing.total_queries() as f64) * 100.0
590            },
591            error_rate: if routing.total_queries() == 0 {
592                0.0
593            } else {
594                (routing.routing_errors.load(Ordering::Relaxed) as f64 / routing.total_queries() as f64) * 100.0
595            },
596        }
597    }
598}
599
600impl Default for SchemaRoutingMetrics {
601    fn default() -> Self {
602        Self::new()
603    }
604}
605
606impl SchemaRoutingMetrics {
607    /// Get table stats for admin API (blocking sync version)
608    pub fn get_table_stats_for_admin(&self) -> Vec<(String, TableStatsForAdmin)> {
609        // Use futures to block on the async lock
610        let table_stats = self.table_stats.clone();
611        let handle = tokio::runtime::Handle::try_current();
612        match handle {
613            Ok(h) => {
614                let stats = h.block_on(async {
615                    let guard = table_stats.read().await;
616                    guard.iter().map(|(name, stats)| {
617                        (name.clone(), TableStatsForAdmin {
618                            query_count: stats.total_queries,
619                            avg_latency_ms: stats.avg_latency_us as f64 / 1000.0,
620                            cache_hit_rate: stats.cache_hit_rate,
621                            temperature: infer_temperature_from_count(stats.total_queries),
622                            workload: infer_workload_from_stats(stats),
623                        })
624                    }).collect::<Vec<_>>()
625                });
626                stats
627            }
628            Err(_) => Vec::new(),
629        }
630    }
631
632    /// Get workload stats for admin API (blocking sync version)
633    pub fn get_workload_stats_for_admin(&self) -> Vec<(WorkloadType, WorkloadStatsForAdmin)> {
634        let workload_stats = self.workload_stats.clone();
635        let handle = tokio::runtime::Handle::try_current();
636        match handle {
637            Ok(h) => {
638                h.block_on(async {
639                    let guard = workload_stats.read().await;
640                    guard.iter().map(|(workload, stats)| {
641                        (*workload, WorkloadStatsForAdmin {
642                            query_count: stats.total_queries,
643                            avg_latency_ms: stats.avg_latency_us as f64 / 1000.0,
644                            queries_to_primary: stats.routed_to_primary,
645                            queries_to_replica: stats.routed_to_replica,
646                        })
647                    }).collect::<Vec<_>>()
648                })
649            }
650            Err(_) => Vec::new(),
651        }
652    }
653
654    /// Get AI workload stats for admin API (blocking sync version)
655    pub fn get_ai_workload_stats(&self) -> AIWorkloadStatsForAdmin {
656        let ai_stats = self.ai_stats.clone();
657        let handle = tokio::runtime::Handle::try_current();
658        match handle {
659            Ok(h) => {
660                h.block_on(async {
661                    let guard = ai_stats.read().await;
662                    AIWorkloadStatsForAdmin {
663                        embedding_retrieval_count: guard.embedding_retrieval,
664                        context_lookup_count: guard.context_lookup,
665                        knowledge_base_count: guard.knowledge_base,
666                        tool_execution_count: guard.tool_execution,
667                        avg_vector_dimensions: guard.avg_vector_dimensions as f64,
668                    }
669                })
670            }
671            Err(_) => AIWorkloadStatsForAdmin::default(),
672        }
673    }
674
675    /// Get RAG stats for admin API (blocking sync version)
676    pub fn get_rag_stats_for_admin(&self) -> RAGStatsForAdmin {
677        let rag_stats = self.rag_stats.clone();
678        let handle = tokio::runtime::Handle::try_current();
679        match handle {
680            Ok(h) => {
681                h.block_on(async {
682                    let guard = rag_stats.read().await;
683                    RAGStatsForAdmin {
684                        retrieval_count: guard.retrieval_count,
685                        avg_retrieval_latency_ms: guard.avg_retrieval_latency_us as f64 / 1000.0,
686                        fetch_count: guard.fetch_count,
687                        avg_fetch_latency_ms: guard.avg_fetch_latency_us as f64 / 1000.0,
688                        total_pipeline_executions: guard.total_queries,
689                        avg_total_latency_ms: guard.avg_pipeline_latency_us as f64 / 1000.0,
690                    }
691                })
692            }
693            Err(_) => RAGStatsForAdmin::default(),
694        }
695    }
696
697}
698
699/// Infer temperature from query count
700fn infer_temperature_from_count(query_count: u64) -> DataTemperature {
701    if query_count > 10000 {
702        DataTemperature::Hot
703    } else if query_count > 1000 {
704        DataTemperature::Warm
705    } else if query_count > 100 {
706        DataTemperature::Cold
707    } else {
708        DataTemperature::Frozen
709    }
710}
711
712/// Infer workload from stats
713fn infer_workload_from_stats(stats: &TableStats) -> WorkloadType {
714    let olap_count = stats.by_access_pattern.get(&AccessPattern::FullScan).copied().unwrap_or(0);
715    let vector_count = stats.by_access_pattern.get(&AccessPattern::VectorSearch).copied().unwrap_or(0);
716    let point_count = stats.by_access_pattern.get(&AccessPattern::PointLookup).copied().unwrap_or(0);
717
718    if vector_count > stats.total_queries / 3 {
719        WorkloadType::Vector
720    } else if olap_count > stats.total_queries / 2 {
721        WorkloadType::OLAP
722    } else if point_count > stats.total_queries / 2 {
723        WorkloadType::OLTP
724    } else {
725        WorkloadType::Mixed
726    }
727}
728
729/// Table stats for admin API
730#[derive(Debug, Clone)]
731pub struct TableStatsForAdmin {
732    pub query_count: u64,
733    pub avg_latency_ms: f64,
734    pub cache_hit_rate: f64,
735    pub temperature: DataTemperature,
736    pub workload: WorkloadType,
737}
738
739/// Workload stats for admin API
740#[derive(Debug, Clone)]
741pub struct WorkloadStatsForAdmin {
742    pub query_count: u64,
743    pub avg_latency_ms: f64,
744    pub queries_to_primary: u64,
745    pub queries_to_replica: u64,
746}
747
748/// AI workload stats for admin API
749#[derive(Debug, Clone, Default)]
750pub struct AIWorkloadStatsForAdmin {
751    pub embedding_retrieval_count: u64,
752    pub context_lookup_count: u64,
753    pub knowledge_base_count: u64,
754    pub tool_execution_count: u64,
755    pub avg_vector_dimensions: f64,
756}
757
758impl AIWorkloadStatsForAdmin {
759    /// Get total AI queries
760    pub fn total_ai_queries(&self) -> u64 {
761        self.embedding_retrieval_count + self.context_lookup_count +
762            self.knowledge_base_count + self.tool_execution_count
763    }
764}
765
766/// RAG stats for admin API
767#[derive(Debug, Clone, Default)]
768pub struct RAGStatsForAdmin {
769    pub retrieval_count: u64,
770    pub avg_retrieval_latency_ms: f64,
771    pub fetch_count: u64,
772    pub avg_fetch_latency_ms: f64,
773    pub total_pipeline_executions: u64,
774    pub avg_total_latency_ms: f64,
775}
776
777/// Summary metrics report
778#[derive(Debug, Clone)]
779pub struct MetricsReport {
780    /// Uptime duration
781    pub uptime: Duration,
782    /// Total queries routed
783    pub total_queries: u64,
784    /// Percentage of schema-aware routes
785    pub schema_aware_percentage: f64,
786    /// Classification cache hit rate
787    pub classification_hit_rate: f64,
788    /// Primary to replica ratio
789    pub primary_replica_ratio: f64,
790    /// Number of tracked tables
791    pub table_count: usize,
792    /// Number of active nodes
793    pub active_nodes: usize,
794    /// AI query percentage
795    pub ai_query_percentage: f64,
796    /// RAG query percentage
797    pub rag_query_percentage: f64,
798    /// Error rate
799    pub error_rate: f64,
800}
801
802#[cfg(test)]
803mod tests {
804    use super::*;
805    use super::super::{RouteTarget, RoutingReason};
806
807    fn sample_decision() -> RoutingDecision {
808        RoutingDecision {
809            target: RouteTarget::Primary,
810            reason: RoutingReason::WriteQuery,
811            shards: vec![],
812            branch: None,
813            node_info: None,
814        }
815    }
816
817    #[tokio::test]
818    async fn test_metrics_new() {
819        let metrics = SchemaRoutingMetrics::new();
820        assert_eq!(metrics.get_routing_stats().total_queries(), 0);
821    }
822
823    #[tokio::test]
824    async fn test_record_routing() {
825        let metrics = SchemaRoutingMetrics::new();
826        let decision = sample_decision();
827
828        metrics.record_routing(&decision, 1000).await;
829
830        assert_eq!(metrics.get_routing_stats().total_queries(), 1);
831        assert_eq!(metrics.get_routing_stats().primary_routes.load(Ordering::Relaxed), 1);
832    }
833
834    #[tokio::test]
835    async fn test_record_table_query() {
836        let metrics = SchemaRoutingMetrics::new();
837
838        metrics.record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 500).await;
839        metrics.record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 600).await;
840
841        let stats = metrics.get_table_stats("users").await.unwrap();
842        assert_eq!(stats.total_queries, 2);
843        assert_eq!(*stats.by_access_pattern.get(&AccessPattern::PointLookup).unwrap(), 2);
844    }
845
846    #[tokio::test]
847    async fn test_record_workload() {
848        let metrics = SchemaRoutingMetrics::new();
849
850        metrics.record_workload(WorkloadType::OLTP, true, false, 100).await;
851        metrics.record_workload(WorkloadType::OLTP, false, false, 200).await;
852
853        let stats = metrics.get_workload_stats(WorkloadType::OLTP).await.unwrap();
854        assert_eq!(stats.total_queries, 2);
855        assert_eq!(stats.routed_to_primary, 1);
856        assert_eq!(stats.routed_to_replica, 1);
857    }
858
859    #[tokio::test]
860    async fn test_record_ai_workload() {
861        let metrics = SchemaRoutingMetrics::new();
862
863        metrics.record_ai_workload(AIWorkloadType::EmbeddingRetrieval, Some(1536), Some(10)).await;
864        metrics.record_ai_workload(AIWorkloadType::ContextLookup, None, None).await;
865
866        let stats = metrics.get_ai_stats().await;
867        assert_eq!(stats.total_queries, 2);
868        assert_eq!(stats.embedding_retrieval, 1);
869        assert_eq!(stats.context_lookup, 1);
870    }
871
872    #[tokio::test]
873    async fn test_record_rag_stage() {
874        let metrics = SchemaRoutingMetrics::new();
875
876        metrics.record_rag_stage(RAGStage::Retrieval, 5000).await;
877        metrics.record_rag_stage(RAGStage::Fetch, 2000).await;
878
879        let stats = metrics.get_rag_stats().await;
880        assert_eq!(stats.total_queries, 2);
881        assert_eq!(stats.retrieval_count, 1);
882        assert_eq!(stats.fetch_count, 1);
883    }
884
885    #[tokio::test]
886    async fn test_record_node_stats() {
887        let metrics = SchemaRoutingMetrics::new();
888
889        let decision = RoutingDecision {
890            target: RouteTarget::Node("node1".to_string()),
891            reason: RoutingReason::LowLatency,
892            shards: vec![],
893            branch: None,
894            node_info: None,
895        };
896
897        metrics.record_routing(&decision, 1000).await;
898        metrics.record_routing(&decision, 2000).await;
899
900        let stats = metrics.get_node_stats("node1").await.unwrap();
901        assert_eq!(stats.total_queries, 2);
902    }
903
904    #[tokio::test]
905    async fn test_record_shard_stats() {
906        let metrics = SchemaRoutingMetrics::new();
907
908        let decision = RoutingDecision {
909            target: RouteTarget::Shard(5),
910            reason: RoutingReason::ShardKey,
911            shards: vec![5],
912            branch: None,
913            node_info: None,
914        };
915
916        metrics.record_routing(&decision, 1000).await;
917
918        let stats = metrics.get_shard_stats(5).await.unwrap();
919        assert_eq!(stats.total_queries, 1);
920    }
921
922    #[tokio::test]
923    async fn test_classification_lookup() {
924        let metrics = SchemaRoutingMetrics::new();
925
926        metrics.record_classification_lookup(true);
927        metrics.record_classification_lookup(true);
928        metrics.record_classification_lookup(false);
929
930        assert_eq!(metrics.get_routing_stats().classification_hit_rate(), 2.0 / 3.0);
931    }
932
933    #[tokio::test]
934    async fn test_record_error() {
935        let metrics = SchemaRoutingMetrics::new();
936
937        metrics.record_error();
938        metrics.record_error();
939
940        assert_eq!(metrics.get_routing_stats().routing_errors.load(Ordering::Relaxed), 2);
941    }
942
943    #[tokio::test]
944    async fn test_reset_metrics() {
945        let metrics = SchemaRoutingMetrics::new();
946
947        // Record some data
948        metrics.record_routing(&sample_decision(), 1000).await;
949        metrics.record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 500).await;
950        metrics.record_ai_workload(AIWorkloadType::EmbeddingRetrieval, None, None).await;
951
952        // Reset
953        metrics.reset().await;
954
955        // Verify reset
956        assert_eq!(metrics.get_routing_stats().total_queries(), 0);
957        assert!(metrics.get_table_stats("users").await.is_none());
958        assert_eq!(metrics.get_ai_stats().await.total_queries, 0);
959    }
960
961    #[tokio::test]
962    async fn test_generate_report() {
963        let metrics = SchemaRoutingMetrics::new();
964
965        // Record various metrics
966        for _ in 0..10 {
967            metrics.record_routing(&sample_decision(), 1000).await;
968        }
969        metrics.record_table_query("users", AccessPattern::PointLookup, WorkloadType::OLTP, 500).await;
970        metrics.record_ai_workload(AIWorkloadType::EmbeddingRetrieval, None, None).await;
971        metrics.record_rag_stage(RAGStage::Retrieval, 5000).await;
972
973        let report = metrics.generate_report().await;
974
975        assert_eq!(report.total_queries, 10);
976        assert_eq!(report.table_count, 1);
977    }
978
979    #[test]
980    fn test_routing_stats_percentages() {
981        let stats = RoutingStats::default();
982
983        // Empty stats
984        assert_eq!(stats.schema_aware_percentage(), 0.0);
985        assert_eq!(stats.classification_hit_rate(), 0.0);
986        assert_eq!(stats.primary_replica_ratio(), 0.0);
987
988        // With data
989        stats.total_queries.store(100, Ordering::Relaxed);
990        stats.schema_aware_routes.store(80, Ordering::Relaxed);
991        stats.classification_hits.store(90, Ordering::Relaxed);
992        stats.classification_misses.store(10, Ordering::Relaxed);
993        stats.primary_routes.store(30, Ordering::Relaxed);
994        stats.replica_routes.store(70, Ordering::Relaxed);
995
996        assert_eq!(stats.schema_aware_percentage(), 80.0);
997        assert_eq!(stats.classification_hit_rate(), 0.9);
998        assert_eq!(stats.primary_replica_ratio(), 0.3);
999    }
1000
1001    #[test]
1002    fn test_table_stats_record() {
1003        let mut stats = TableStats::new("orders");
1004
1005        stats.record_query(AccessPattern::PointLookup, WorkloadType::OLTP, 100);
1006        stats.record_query(AccessPattern::RangeScan, WorkloadType::OLTP, 200);
1007
1008        assert_eq!(stats.total_queries, 2);
1009        assert_eq!(*stats.by_access_pattern.get(&AccessPattern::PointLookup).unwrap(), 1);
1010        assert_eq!(*stats.by_access_pattern.get(&AccessPattern::RangeScan).unwrap(), 1);
1011    }
1012
1013    #[test]
1014    fn test_workload_stats_record() {
1015        let mut stats = WorkloadStats::default();
1016
1017        stats.record(true, false, 100);
1018        stats.record(false, false, 200);
1019        stats.record(false, true, 300);
1020
1021        assert_eq!(stats.total_queries, 3);
1022        assert_eq!(stats.routed_to_primary, 1);
1023        assert_eq!(stats.routed_to_replica, 1);
1024        assert_eq!(stats.scatter_gather, 1);
1025    }
1026
1027    #[test]
1028    fn test_ai_stats_record() {
1029        let mut stats = AIWorkloadStats::default();
1030
1031        stats.record(AIWorkloadType::EmbeddingRetrieval, Some(1536), Some(10));
1032        stats.record(AIWorkloadType::KnowledgeBase, None, None);
1033
1034        assert_eq!(stats.total_queries, 2);
1035        assert_eq!(stats.embedding_retrieval, 1);
1036        assert_eq!(stats.knowledge_base, 1);
1037    }
1038
1039    #[test]
1040    fn test_rag_stats_record() {
1041        let mut stats = RAGStats::default();
1042
1043        stats.record_stage(RAGStage::Retrieval, 5000);
1044        stats.record_stage(RAGStage::Fetch, 2000);
1045        stats.record_stage(RAGStage::Rerank, 1000);
1046
1047        assert_eq!(stats.total_queries, 3);
1048        assert_eq!(stats.retrieval_count, 1);
1049        assert_eq!(stats.fetch_count, 1);
1050        assert_eq!(stats.rerank_count, 1);
1051    }
1052}