1use std::sync::Arc;
6use std::sync::atomic::Ordering;
7use serde::{Deserialize, Serialize};
8
9use super::{
10 SchemaRegistry, LearningClassifier, SchemaDiscovery,
11 DataTemperature, WorkloadType, AccessPattern,
12 SchemaRoutingMetrics,
13};
14use super::registry::{TableSchema, ColumnSchema, StorageType};
15use super::router::SchemaAwareRouter;
16
17pub struct SchemaRoutingAdmin {
19 pub registry: Arc<SchemaRegistry>,
20 pub router: Arc<SchemaAwareRouter>,
21 pub classifier: Arc<LearningClassifier>,
22 pub discovery: Arc<SchemaDiscovery>,
23 pub metrics: Arc<SchemaRoutingMetrics>,
24}
25
26impl SchemaRoutingAdmin {
27 pub fn new(
29 registry: Arc<SchemaRegistry>,
30 router: Arc<SchemaAwareRouter>,
31 classifier: Arc<LearningClassifier>,
32 discovery: Arc<SchemaDiscovery>,
33 metrics: Arc<SchemaRoutingMetrics>,
34 ) -> Self {
35 Self {
36 registry,
37 router,
38 classifier,
39 discovery,
40 metrics,
41 }
42 }
43
44 pub fn list_tables(&self) -> TablesResponse {
50 let tables = self.registry.list_tables();
51 TablesResponse {
52 tables: tables.into_iter().map(|t| TableSummary {
53 name: t.name.clone(),
54 temperature: format!("{:?}", t.temperature),
55 workload: format!("{:?}", t.workload),
56 access_pattern: format!("{:?}", t.access_pattern),
57 column_count: t.columns.len(),
58 shard_key: t.shard_key.clone(),
59 row_count_estimate: Some(t.estimated_rows),
60 }).collect(),
61 total: self.registry.table_count(),
62 }
63 }
64
65 pub fn get_table(&self, name: &str) -> Option<TableDetails> {
67 self.registry.get_table(name).map(|t| TableDetails {
68 name: t.name.clone(),
69 columns: t.columns.iter().map(|c| ColumnDetails {
70 name: c.name.clone(),
71 data_type: c.data_type.clone(),
72 nullable: c.nullable,
73 is_primary_key: c.is_primary_key,
74 is_indexed: c.is_indexed,
75 default_value: None, storage_type: Some(format!("{:?}", c.storage_type)),
77 }).collect(),
78 temperature: format!("{:?}", t.temperature),
79 workload: format!("{:?}", t.workload),
80 access_pattern: format!("{:?}", t.access_pattern),
81 primary_key: t.primary_key.clone(),
82 shard_key: t.shard_key.clone(),
83 row_count_estimate: Some(t.estimated_rows),
84 size_bytes: Some(t.avg_row_size as u64 * t.estimated_rows),
85 partition_key: t.partition_key.as_ref().map(|p| format!("{:?}", p)),
86 })
87 }
88
89 pub fn register_table(&self, request: RegisterTableRequest) -> Result<TableDetails, AdminError> {
91 let temperature = DataTemperature::from_str(&request.temperature)
92 .ok_or_else(|| AdminError::InvalidInput(format!("Invalid temperature: {}", request.temperature)))?;
93
94 let workload = WorkloadType::from_str(&request.workload)
95 .ok_or_else(|| AdminError::InvalidInput(format!("Invalid workload: {}", request.workload)))?;
96
97 let access_pattern = parse_access_pattern(&request.access_pattern)
98 .ok_or_else(|| AdminError::InvalidInput(format!("Invalid access pattern: {}", request.access_pattern)))?;
99
100 let columns: Vec<ColumnSchema> = request.columns.iter().map(|c| ColumnSchema {
101 name: c.name.clone(),
102 data_type: c.data_type.clone(),
103 nullable: c.nullable,
104 is_primary_key: c.is_primary_key,
105 is_indexed: c.is_indexed.unwrap_or(false),
106 storage_type: StorageType::Row,
107 }).collect();
108
109 let table = TableSchema {
110 name: request.name.clone(),
111 columns,
112 access_pattern,
113 temperature,
114 workload,
115 primary_key: request.primary_key.clone(),
116 shard_key: request.shard_key.clone(),
117 estimated_rows: request.row_count_estimate.unwrap_or(0),
118 avg_row_size: 0,
119 partition_key: None,
120 preferred_nodes: Vec::new(),
121 };
122
123 self.registry.register_table(table);
124
125 self.get_table(&request.name)
126 .ok_or_else(|| AdminError::InternalError("Failed to register table".to_string()))
127 }
128
129 pub fn remove_table(&self, name: &str) -> Result<(), AdminError> {
131 if self.registry.get_table(name).is_none() {
132 return Err(AdminError::NotFound(format!("Table not found: {}", name)));
133 }
134 self.registry.remove_table(name);
135 Ok(())
136 }
137
138 pub fn classify_table(&self, request: ClassifyRequest) -> Result<ClassificationResult, AdminError> {
144 let temperature = DataTemperature::from_str(&request.temperature)
145 .ok_or_else(|| AdminError::InvalidInput(format!("Invalid temperature: {}", request.temperature)))?;
146
147 let workload = WorkloadType::from_str(&request.workload)
148 .ok_or_else(|| AdminError::InvalidInput(format!("Invalid workload: {}", request.workload)))?;
149
150 let mut table = self.registry.get_table(&request.table_name)
152 .ok_or_else(|| AdminError::NotFound(format!("Table not found: {}", request.table_name)))?;
153
154 let old_temperature = table.temperature.clone();
156 let old_workload = table.workload.clone();
157
158 table.temperature = temperature.clone();
159 table.workload = workload.clone();
160
161 self.registry.register_table(table);
163
164 Ok(ClassificationResult {
165 table_name: request.table_name,
166 previous_temperature: format!("{:?}", old_temperature),
167 new_temperature: format!("{:?}", temperature),
168 previous_workload: format!("{:?}", old_workload),
169 new_workload: format!("{:?}", workload),
170 })
171 }
172
173 pub fn get_classification_suggestion(&self, table_name: &str) -> Result<ClassificationSuggestion, AdminError> {
175 let history = self.classifier.get_history(table_name);
177
178 if history.is_none() {
179 return Err(AdminError::NotFound(format!("No query history for table: {}", table_name)));
180 }
181
182 let hist = history.expect("history checked above");
183 let query_count = hist.count();
184 let suggested_temp = self.classifier.suggest_temperature(table_name);
185 let suggested_workload = self.classifier.suggest_workload(table_name);
186 let confidence = self.classifier.get_confidence(table_name);
187
188 Ok(ClassificationSuggestion {
189 table_name: table_name.to_string(),
190 query_count,
191 suggested_temperature: suggested_temp.map(|t| format!("{:?}", t)),
192 suggested_workload: suggested_workload.map(|w| format!("{:?}", w)),
193 confidence: confidence.unwrap_or(0.0),
194 sample_size_sufficient: query_count >= 100,
195 })
196 }
197
198 pub fn analyze_query(&self, request: AnalyzeRequest) -> AnalysisResult {
204 use super::QueryAnalyzer;
205
206 let query = request.query;
207 let analyzer = QueryAnalyzer::new(self.registry.clone());
208 let analysis = analyzer.analyze(&query);
209
210 let access_pattern = analysis.access_patterns.first()
212 .map(|p| format!("{:?}", p))
213 .unwrap_or_else(|| "Mixed".to_string());
214
215 let detected_workload = self.classifier.classify_query(&query)
216 .map(|w| format!("{:?}", w));
217
218 AnalysisResult {
219 query,
220 tables: analysis.tables.iter().map(|t| t.name.clone()).collect(),
221 access_pattern,
222 shard_keys: analysis.shard_keys.iter().map(|(k, v)| format!("{}={:?}", k, v)).collect(),
223 is_read_only: analysis.is_read_only,
224 estimated_complexity: analysis.complexity,
225 estimated_selectivity: analysis.selectivity,
226 has_aggregation: analysis.has_aggregations,
227 has_join: analysis.has_joins,
228 has_subquery: analysis.has_subqueries,
229 columns: Vec::new(), detected_workload,
231 }
232 }
233
234 pub fn route_query(&self, request: RouteRequest) -> RouteResult {
236 let decision = self.router.route(&request.query);
237
238 RouteResult {
239 query: request.query,
240 target_type: format!("{:?}", decision.target),
241 reason: format!("{:?}", decision.reason),
242 preferred_node: decision.node_info.as_ref().map(|n| n.name.clone()),
243 alternative_nodes: Vec::new(), estimated_latency_ms: decision.node_info.as_ref().map(|n| n.current_latency_ms),
245 }
246 }
247
248 pub fn get_stats(&self) -> RoutingStatsResponse {
254 let stats = self.metrics.get_routing_stats();
255
256 RoutingStatsResponse {
257 total_queries_routed: stats.total_queries.load(Ordering::Relaxed),
258 queries_to_primary: stats.primary_routes.load(Ordering::Relaxed),
259 queries_to_replica: stats.replica_routes.load(Ordering::Relaxed),
260 queries_scattered: stats.scatter_gather.load(Ordering::Relaxed),
261 avg_latency_ms: 0.0, cache_hit_rate: stats.classification_hit_rate(),
263 }
264 }
265
266 pub fn get_table_stats(&self) -> Vec<TableStatsResponse> {
268 let stats = self.metrics.get_table_stats_for_admin();
269
270 stats.into_iter().map(|(name, s)| TableStatsResponse {
271 table_name: name,
272 query_count: s.query_count,
273 avg_latency_ms: s.avg_latency_ms,
274 hit_rate: s.cache_hit_rate,
275 temperature: format!("{:?}", s.temperature),
276 workload: format!("{:?}", s.workload),
277 }).collect()
278 }
279
280 pub fn get_workload_stats(&self) -> Vec<WorkloadStatsResponse> {
282 let stats = self.metrics.get_workload_stats_for_admin();
283
284 stats.into_iter().map(|(workload, s)| WorkloadStatsResponse {
285 workload: format!("{:?}", workload),
286 query_count: s.query_count,
287 avg_latency_ms: s.avg_latency_ms,
288 queries_to_primary: s.queries_to_primary,
289 queries_to_replica: s.queries_to_replica,
290 }).collect()
291 }
292
293 pub async fn trigger_discovery(&self) -> Result<DiscoveryResult, AdminError> {
299 let tables = self.discovery.discover().await
300 .map_err(|e| AdminError::DiscoveryError(e.to_string()))?;
301
302 for table in &tables {
304 self.registry.register_table(table.clone());
305 }
306
307 Ok(DiscoveryResult {
308 tables_discovered: tables.len(),
309 table_names: tables.iter().map(|t| t.name.clone()).collect(),
310 })
311 }
312
313 pub async fn refresh_schema(&self) -> Result<RefreshResult, AdminError> {
315 self.discovery.refresh().await
316 .map_err(|e| AdminError::DiscoveryError(e.to_string()))?;
317
318 Ok(RefreshResult {
319 success: true,
320 message: "Schema cache refreshed successfully".to_string(),
321 })
322 }
323
324 pub fn get_ai_workload_stats(&self) -> AIWorkloadStatsResponse {
330 let stats = self.metrics.get_ai_workload_stats();
331
332 AIWorkloadStatsResponse {
333 embedding_queries: stats.embedding_retrieval_count,
334 context_lookups: stats.context_lookup_count,
335 knowledge_base_queries: stats.knowledge_base_count,
336 tool_executions: stats.tool_execution_count,
337 total_ai_queries: stats.total_ai_queries(),
338 avg_vector_dimensions: stats.avg_vector_dimensions,
339 }
340 }
341
342 pub fn get_rag_stats(&self) -> RAGStatsResponse {
344 let stats = self.metrics.get_rag_stats_for_admin();
345
346 RAGStatsResponse {
347 retrieval_count: stats.retrieval_count,
348 avg_retrieval_latency_ms: stats.avg_retrieval_latency_ms,
349 fetch_count: stats.fetch_count,
350 avg_fetch_latency_ms: stats.avg_fetch_latency_ms,
351 total_pipeline_executions: stats.total_pipeline_executions,
352 avg_total_latency_ms: stats.avg_total_latency_ms,
353 }
354 }
355}
356
357#[derive(Debug, Serialize)]
362pub struct TablesResponse {
363 pub tables: Vec<TableSummary>,
364 pub total: usize,
365}
366
367#[derive(Debug, Serialize)]
368pub struct TableSummary {
369 pub name: String,
370 pub temperature: String,
371 pub workload: String,
372 pub access_pattern: String,
373 pub column_count: usize,
374 pub shard_key: Option<String>,
375 pub row_count_estimate: Option<u64>,
376}
377
378#[derive(Debug, Serialize)]
379pub struct TableDetails {
380 pub name: String,
381 pub columns: Vec<ColumnDetails>,
382 pub temperature: String,
383 pub workload: String,
384 pub access_pattern: String,
385 pub primary_key: Vec<String>,
386 pub shard_key: Option<String>,
387 pub row_count_estimate: Option<u64>,
388 pub size_bytes: Option<u64>,
389 pub partition_key: Option<String>,
390}
391
392#[derive(Debug, Serialize)]
393pub struct ColumnDetails {
394 pub name: String,
395 pub data_type: String,
396 pub nullable: bool,
397 pub is_primary_key: bool,
398 pub is_indexed: bool,
399 pub default_value: Option<String>,
400 pub storage_type: Option<String>,
401}
402
403#[derive(Debug, Deserialize)]
404pub struct RegisterTableRequest {
405 pub name: String,
406 pub columns: Vec<ColumnRequest>,
407 pub temperature: String,
408 pub workload: String,
409 pub access_pattern: String,
410 pub primary_key: Vec<String>,
411 pub shard_key: Option<String>,
412 pub row_count_estimate: Option<u64>,
413}
414
415#[derive(Debug, Deserialize)]
416pub struct ColumnRequest {
417 pub name: String,
418 pub data_type: String,
419 pub nullable: bool,
420 pub is_primary_key: bool,
421 pub is_indexed: Option<bool>,
422 pub default_value: Option<String>,
423}
424
425#[derive(Debug, Deserialize)]
426pub struct ClassifyRequest {
427 pub table_name: String,
428 pub temperature: String,
429 pub workload: String,
430}
431
432#[derive(Debug, Serialize)]
433pub struct ClassificationResult {
434 pub table_name: String,
435 pub previous_temperature: String,
436 pub new_temperature: String,
437 pub previous_workload: String,
438 pub new_workload: String,
439}
440
441#[derive(Debug, Serialize)]
442pub struct ClassificationSuggestion {
443 pub table_name: String,
444 pub query_count: u64,
445 pub suggested_temperature: Option<String>,
446 pub suggested_workload: Option<String>,
447 pub confidence: f64,
448 pub sample_size_sufficient: bool,
449}
450
451#[derive(Debug, Deserialize)]
452pub struct AnalyzeRequest {
453 pub query: String,
454}
455
456#[derive(Debug, Serialize)]
457pub struct AnalysisResult {
458 pub query: String,
459 pub tables: Vec<String>,
460 pub access_pattern: String,
461 pub shard_keys: Vec<String>,
462 pub is_read_only: bool,
463 pub estimated_complexity: u32,
464 pub estimated_selectivity: f64,
465 pub has_aggregation: bool,
466 pub has_join: bool,
467 pub has_subquery: bool,
468 pub columns: Vec<String>,
469 pub detected_workload: Option<String>,
470}
471
472#[derive(Debug, Deserialize)]
473pub struct RouteRequest {
474 pub query: String,
475}
476
477#[derive(Debug, Serialize)]
478pub struct RouteResult {
479 pub query: String,
480 pub target_type: String,
481 pub reason: String,
482 pub preferred_node: Option<String>,
483 pub alternative_nodes: Vec<String>,
484 pub estimated_latency_ms: Option<u64>,
485}
486
487#[derive(Debug, Serialize)]
488pub struct RoutingStatsResponse {
489 pub total_queries_routed: u64,
490 pub queries_to_primary: u64,
491 pub queries_to_replica: u64,
492 pub queries_scattered: u64,
493 pub avg_latency_ms: f64,
494 pub cache_hit_rate: f64,
495}
496
497#[derive(Debug, Serialize)]
498pub struct TableStatsResponse {
499 pub table_name: String,
500 pub query_count: u64,
501 pub avg_latency_ms: f64,
502 pub hit_rate: f64,
503 pub temperature: String,
504 pub workload: String,
505}
506
507#[derive(Debug, Serialize)]
508pub struct WorkloadStatsResponse {
509 pub workload: String,
510 pub query_count: u64,
511 pub avg_latency_ms: f64,
512 pub queries_to_primary: u64,
513 pub queries_to_replica: u64,
514}
515
516#[derive(Debug, Serialize)]
517pub struct DiscoveryResult {
518 pub tables_discovered: usize,
519 pub table_names: Vec<String>,
520}
521
522#[derive(Debug, Serialize)]
523pub struct RefreshResult {
524 pub success: bool,
525 pub message: String,
526}
527
528#[derive(Debug, Serialize)]
529pub struct AIWorkloadStatsResponse {
530 pub embedding_queries: u64,
531 pub context_lookups: u64,
532 pub knowledge_base_queries: u64,
533 pub tool_executions: u64,
534 pub total_ai_queries: u64,
535 pub avg_vector_dimensions: f64,
536}
537
538#[derive(Debug, Serialize)]
539pub struct RAGStatsResponse {
540 pub retrieval_count: u64,
541 pub avg_retrieval_latency_ms: f64,
542 pub fetch_count: u64,
543 pub avg_fetch_latency_ms: f64,
544 pub total_pipeline_executions: u64,
545 pub avg_total_latency_ms: f64,
546}
547
548#[derive(Debug)]
553pub enum AdminError {
554 NotFound(String),
555 InvalidInput(String),
556 DiscoveryError(String),
557 InternalError(String),
558}
559
560impl std::fmt::Display for AdminError {
561 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
562 match self {
563 Self::NotFound(msg) => write!(f, "Not found: {}", msg),
564 Self::InvalidInput(msg) => write!(f, "Invalid input: {}", msg),
565 Self::DiscoveryError(msg) => write!(f, "Discovery error: {}", msg),
566 Self::InternalError(msg) => write!(f, "Internal error: {}", msg),
567 }
568 }
569}
570
571impl std::error::Error for AdminError {}
572
573fn parse_access_pattern(s: &str) -> Option<AccessPattern> {
578 match s.to_uppercase().as_str() {
579 "POINTLOOKUP" | "POINT_LOOKUP" => Some(AccessPattern::PointLookup),
580 "RANGESCAN" | "RANGE_SCAN" => Some(AccessPattern::RangeScan),
581 "FULLSCAN" | "FULL_SCAN" => Some(AccessPattern::FullScan),
582 "VECTORSEARCH" | "VECTOR_SEARCH" => Some(AccessPattern::VectorSearch),
583 "TIMESERIESAPPEND" | "TIME_SERIES_APPEND" => Some(AccessPattern::TimeSeriesAppend),
584 "MIXED" => Some(AccessPattern::Mixed),
585 _ => None,
586 }
587}
588
589#[cfg(test)]
590mod tests {
591 use super::*;
592
593 #[test]
594 fn test_parse_access_pattern() {
595 assert_eq!(parse_access_pattern("PointLookup"), Some(AccessPattern::PointLookup));
596 assert_eq!(parse_access_pattern("POINT_LOOKUP"), Some(AccessPattern::PointLookup));
597 assert_eq!(parse_access_pattern("RangeScan"), Some(AccessPattern::RangeScan));
598 assert_eq!(parse_access_pattern("VectorSearch"), Some(AccessPattern::VectorSearch));
599 assert_eq!(parse_access_pattern("Mixed"), Some(AccessPattern::Mixed));
600 assert_eq!(parse_access_pattern("Invalid"), None);
601 }
602
603 #[test]
604 fn test_admin_error_display() {
605 let err = AdminError::NotFound("users".to_string());
606 assert!(err.to_string().contains("Not found"));
607
608 let err = AdminError::InvalidInput("bad temp".to_string());
609 assert!(err.to_string().contains("Invalid input"));
610 }
611}