// heliosdb_proxy/schema_routing/admin.rs

//! Schema Routing Admin API
//!
//! REST API endpoints for managing schema-aware routing.
4
5use std::sync::Arc;
6use std::sync::atomic::Ordering;
7use serde::{Deserialize, Serialize};
8
9use super::{
10    SchemaRegistry, LearningClassifier, SchemaDiscovery,
11    DataTemperature, WorkloadType, AccessPattern,
12    SchemaRoutingMetrics,
13};
14use super::registry::{TableSchema, ColumnSchema, StorageType};
15use super::router::SchemaAwareRouter;
16
17/// Admin API for schema routing
18pub struct SchemaRoutingAdmin {
19    pub registry: Arc<SchemaRegistry>,
20    pub router: Arc<SchemaAwareRouter>,
21    pub classifier: Arc<LearningClassifier>,
22    pub discovery: Arc<SchemaDiscovery>,
23    pub metrics: Arc<SchemaRoutingMetrics>,
24}
25
26impl SchemaRoutingAdmin {
27    /// Create a new admin API instance
28    pub fn new(
29        registry: Arc<SchemaRegistry>,
30        router: Arc<SchemaAwareRouter>,
31        classifier: Arc<LearningClassifier>,
32        discovery: Arc<SchemaDiscovery>,
33        metrics: Arc<SchemaRoutingMetrics>,
34    ) -> Self {
35        Self {
36            registry,
37            router,
38            classifier,
39            discovery,
40            metrics,
41        }
42    }
43
44    // =========================================================================
45    // TABLE ENDPOINTS
46    // =========================================================================
47
48    /// GET /schema/tables - List all registered tables
49    pub fn list_tables(&self) -> TablesResponse {
50        let tables = self.registry.list_tables();
51        TablesResponse {
52            tables: tables.into_iter().map(|t| TableSummary {
53                name: t.name.clone(),
54                temperature: format!("{:?}", t.temperature),
55                workload: format!("{:?}", t.workload),
56                access_pattern: format!("{:?}", t.access_pattern),
57                column_count: t.columns.len(),
58                shard_key: t.shard_key.clone(),
59                row_count_estimate: Some(t.estimated_rows),
60            }).collect(),
61            total: self.registry.table_count(),
62        }
63    }
64
65    /// GET /schema/tables/:name - Get details for a specific table
66    pub fn get_table(&self, name: &str) -> Option<TableDetails> {
67        self.registry.get_table(name).map(|t| TableDetails {
68            name: t.name.clone(),
69            columns: t.columns.iter().map(|c| ColumnDetails {
70                name: c.name.clone(),
71                data_type: c.data_type.clone(),
72                nullable: c.nullable,
73                is_primary_key: c.is_primary_key,
74                is_indexed: c.is_indexed,
75                default_value: None, // ColumnSchema doesn't have default_value
76                storage_type: Some(format!("{:?}", c.storage_type)),
77            }).collect(),
78            temperature: format!("{:?}", t.temperature),
79            workload: format!("{:?}", t.workload),
80            access_pattern: format!("{:?}", t.access_pattern),
81            primary_key: t.primary_key.clone(),
82            shard_key: t.shard_key.clone(),
83            row_count_estimate: Some(t.estimated_rows),
84            size_bytes: Some(t.avg_row_size as u64 * t.estimated_rows),
85            partition_key: t.partition_key.as_ref().map(|p| format!("{:?}", p)),
86        })
87    }
88
89    /// POST /schema/tables - Register a new table
90    pub fn register_table(&self, request: RegisterTableRequest) -> Result<TableDetails, AdminError> {
91        let temperature = DataTemperature::from_str(&request.temperature)
92            .ok_or_else(|| AdminError::InvalidInput(format!("Invalid temperature: {}", request.temperature)))?;
93
94        let workload = WorkloadType::from_str(&request.workload)
95            .ok_or_else(|| AdminError::InvalidInput(format!("Invalid workload: {}", request.workload)))?;
96
97        let access_pattern = parse_access_pattern(&request.access_pattern)
98            .ok_or_else(|| AdminError::InvalidInput(format!("Invalid access pattern: {}", request.access_pattern)))?;
99
100        let columns: Vec<ColumnSchema> = request.columns.iter().map(|c| ColumnSchema {
101            name: c.name.clone(),
102            data_type: c.data_type.clone(),
103            nullable: c.nullable,
104            is_primary_key: c.is_primary_key,
105            is_indexed: c.is_indexed.unwrap_or(false),
106            storage_type: StorageType::Row,
107        }).collect();
108
109        let table = TableSchema {
110            name: request.name.clone(),
111            columns,
112            access_pattern,
113            temperature,
114            workload,
115            primary_key: request.primary_key.clone(),
116            shard_key: request.shard_key.clone(),
117            estimated_rows: request.row_count_estimate.unwrap_or(0),
118            avg_row_size: 0,
119            partition_key: None,
120            preferred_nodes: Vec::new(),
121        };
122
123        self.registry.register_table(table);
124
125        self.get_table(&request.name)
126            .ok_or_else(|| AdminError::InternalError("Failed to register table".to_string()))
127    }
128
129    /// DELETE /schema/tables/:name - Remove a table from routing
130    pub fn remove_table(&self, name: &str) -> Result<(), AdminError> {
131        if self.registry.get_table(name).is_none() {
132            return Err(AdminError::NotFound(format!("Table not found: {}", name)));
133        }
134        self.registry.remove_table(name);
135        Ok(())
136    }
137
138    // =========================================================================
139    // CLASSIFICATION ENDPOINTS
140    // =========================================================================
141
142    /// POST /schema/classify - Manually classify a table
143    pub fn classify_table(&self, request: ClassifyRequest) -> Result<ClassificationResult, AdminError> {
144        let temperature = DataTemperature::from_str(&request.temperature)
145            .ok_or_else(|| AdminError::InvalidInput(format!("Invalid temperature: {}", request.temperature)))?;
146
147        let workload = WorkloadType::from_str(&request.workload)
148            .ok_or_else(|| AdminError::InvalidInput(format!("Invalid workload: {}", request.workload)))?;
149
150        // Get existing table
151        let mut table = self.registry.get_table(&request.table_name)
152            .ok_or_else(|| AdminError::NotFound(format!("Table not found: {}", request.table_name)))?;
153
154        // Update classifications
155        let old_temperature = table.temperature.clone();
156        let old_workload = table.workload.clone();
157
158        table.temperature = temperature.clone();
159        table.workload = workload.clone();
160
161        // Re-register with new classification
162        self.registry.register_table(table);
163
164        Ok(ClassificationResult {
165            table_name: request.table_name,
166            previous_temperature: format!("{:?}", old_temperature),
167            new_temperature: format!("{:?}", temperature),
168            previous_workload: format!("{:?}", old_workload),
169            new_workload: format!("{:?}", workload),
170        })
171    }
172
173    /// GET /schema/classify/:table - Get classifier suggestions
174    pub fn get_classification_suggestion(&self, table_name: &str) -> Result<ClassificationSuggestion, AdminError> {
175        // Get history from classifier
176        let history = self.classifier.get_history(table_name);
177
178        if history.is_none() {
179            return Err(AdminError::NotFound(format!("No query history for table: {}", table_name)));
180        }
181
182        let hist = history.expect("history checked above");
183        let query_count = hist.count();
184        let suggested_temp = self.classifier.suggest_temperature(table_name);
185        let suggested_workload = self.classifier.suggest_workload(table_name);
186        let confidence = self.classifier.get_confidence(table_name);
187
188        Ok(ClassificationSuggestion {
189            table_name: table_name.to_string(),
190            query_count,
191            suggested_temperature: suggested_temp.map(|t| format!("{:?}", t)),
192            suggested_workload: suggested_workload.map(|w| format!("{:?}", w)),
193            confidence: confidence.unwrap_or(0.0),
194            sample_size_sufficient: query_count >= 100,
195        })
196    }
197
198    // =========================================================================
199    // ANALYSIS ENDPOINTS
200    // =========================================================================
201
202    /// POST /schema/analyze - Analyze a query
203    pub fn analyze_query(&self, request: AnalyzeRequest) -> AnalysisResult {
204        use super::QueryAnalyzer;
205
206        let query = request.query;
207        let analyzer = QueryAnalyzer::new(self.registry.clone());
208        let analysis = analyzer.analyze(&query);
209
210        // Get primary access pattern from the list
211        let access_pattern = analysis.access_patterns.first()
212            .map(|p| format!("{:?}", p))
213            .unwrap_or_else(|| "Mixed".to_string());
214
215        let detected_workload = self.classifier.classify_query(&query)
216            .map(|w| format!("{:?}", w));
217
218        AnalysisResult {
219            query,
220            tables: analysis.tables.iter().map(|t| t.name.clone()).collect(),
221            access_pattern,
222            shard_keys: analysis.shard_keys.iter().map(|(k, v)| format!("{}={:?}", k, v)).collect(),
223            is_read_only: analysis.is_read_only,
224            estimated_complexity: analysis.complexity,
225            estimated_selectivity: analysis.selectivity,
226            has_aggregation: analysis.has_aggregations,
227            has_join: analysis.has_joins,
228            has_subquery: analysis.has_subqueries,
229            columns: Vec::new(), // Not available in QueryAnalysis
230            detected_workload,
231        }
232    }
233
234    /// POST /schema/route - Get routing decision for a query (dry-run)
235    pub fn route_query(&self, request: RouteRequest) -> RouteResult {
236        let decision = self.router.route(&request.query);
237
238        RouteResult {
239            query: request.query,
240            target_type: format!("{:?}", decision.target),
241            reason: format!("{:?}", decision.reason),
242            preferred_node: decision.node_info.as_ref().map(|n| n.name.clone()),
243            alternative_nodes: Vec::new(), // Not available in current RoutingDecision
244            estimated_latency_ms: decision.node_info.as_ref().map(|n| n.current_latency_ms),
245        }
246    }
247
248    // =========================================================================
249    // ROUTING STATS ENDPOINTS
250    // =========================================================================
251
252    /// GET /schema/stats - Get overall routing statistics
253    pub fn get_stats(&self) -> RoutingStatsResponse {
254        let stats = self.metrics.get_routing_stats();
255
256        RoutingStatsResponse {
257            total_queries_routed: stats.total_queries.load(Ordering::Relaxed),
258            queries_to_primary: stats.primary_routes.load(Ordering::Relaxed),
259            queries_to_replica: stats.replica_routes.load(Ordering::Relaxed),
260            queries_scattered: stats.scatter_gather.load(Ordering::Relaxed),
261            avg_latency_ms: 0.0, // Not tracked globally in RoutingStats
262            cache_hit_rate: stats.classification_hit_rate(),
263        }
264    }
265
266    /// GET /schema/stats/tables - Get per-table statistics
267    pub fn get_table_stats(&self) -> Vec<TableStatsResponse> {
268        let stats = self.metrics.get_table_stats_for_admin();
269
270        stats.into_iter().map(|(name, s)| TableStatsResponse {
271            table_name: name,
272            query_count: s.query_count,
273            avg_latency_ms: s.avg_latency_ms,
274            hit_rate: s.cache_hit_rate,
275            temperature: format!("{:?}", s.temperature),
276            workload: format!("{:?}", s.workload),
277        }).collect()
278    }
279
280    /// GET /schema/stats/workloads - Get per-workload statistics
281    pub fn get_workload_stats(&self) -> Vec<WorkloadStatsResponse> {
282        let stats = self.metrics.get_workload_stats_for_admin();
283
284        stats.into_iter().map(|(workload, s)| WorkloadStatsResponse {
285            workload: format!("{:?}", workload),
286            query_count: s.query_count,
287            avg_latency_ms: s.avg_latency_ms,
288            queries_to_primary: s.queries_to_primary,
289            queries_to_replica: s.queries_to_replica,
290        }).collect()
291    }
292
293    // =========================================================================
294    // DISCOVERY ENDPOINTS
295    // =========================================================================
296
297    /// POST /schema/discover - Trigger schema discovery
298    pub async fn trigger_discovery(&self) -> Result<DiscoveryResult, AdminError> {
299        let tables = self.discovery.discover().await
300            .map_err(|e| AdminError::DiscoveryError(e.to_string()))?;
301
302        // Register discovered tables
303        for table in &tables {
304            self.registry.register_table(table.clone());
305        }
306
307        Ok(DiscoveryResult {
308            tables_discovered: tables.len(),
309            table_names: tables.iter().map(|t| t.name.clone()).collect(),
310        })
311    }
312
313    /// POST /schema/refresh - Refresh schema cache
314    pub async fn refresh_schema(&self) -> Result<RefreshResult, AdminError> {
315        self.discovery.refresh().await
316            .map_err(|e| AdminError::DiscoveryError(e.to_string()))?;
317
318        Ok(RefreshResult {
319            success: true,
320            message: "Schema cache refreshed successfully".to_string(),
321        })
322    }
323
324    // =========================================================================
325    // AI/AGENT ENDPOINTS
326    // =========================================================================
327
328    /// GET /schema/ai/workloads - Get AI workload statistics
329    pub fn get_ai_workload_stats(&self) -> AIWorkloadStatsResponse {
330        let stats = self.metrics.get_ai_workload_stats();
331
332        AIWorkloadStatsResponse {
333            embedding_queries: stats.embedding_retrieval_count,
334            context_lookups: stats.context_lookup_count,
335            knowledge_base_queries: stats.knowledge_base_count,
336            tool_executions: stats.tool_execution_count,
337            total_ai_queries: stats.total_ai_queries(),
338            avg_vector_dimensions: stats.avg_vector_dimensions,
339        }
340    }
341
342    /// GET /schema/rag/stats - Get RAG pipeline statistics
343    pub fn get_rag_stats(&self) -> RAGStatsResponse {
344        let stats = self.metrics.get_rag_stats_for_admin();
345
346        RAGStatsResponse {
347            retrieval_count: stats.retrieval_count,
348            avg_retrieval_latency_ms: stats.avg_retrieval_latency_ms,
349            fetch_count: stats.fetch_count,
350            avg_fetch_latency_ms: stats.avg_fetch_latency_ms,
351            total_pipeline_executions: stats.total_pipeline_executions,
352            avg_total_latency_ms: stats.avg_total_latency_ms,
353        }
354    }
355}
356
357// =============================================================================
358// REQUEST/RESPONSE TYPES
359// =============================================================================
360
361#[derive(Debug, Serialize)]
362pub struct TablesResponse {
363    pub tables: Vec<TableSummary>,
364    pub total: usize,
365}
366
367#[derive(Debug, Serialize)]
368pub struct TableSummary {
369    pub name: String,
370    pub temperature: String,
371    pub workload: String,
372    pub access_pattern: String,
373    pub column_count: usize,
374    pub shard_key: Option<String>,
375    pub row_count_estimate: Option<u64>,
376}
377
378#[derive(Debug, Serialize)]
379pub struct TableDetails {
380    pub name: String,
381    pub columns: Vec<ColumnDetails>,
382    pub temperature: String,
383    pub workload: String,
384    pub access_pattern: String,
385    pub primary_key: Vec<String>,
386    pub shard_key: Option<String>,
387    pub row_count_estimate: Option<u64>,
388    pub size_bytes: Option<u64>,
389    pub partition_key: Option<String>,
390}
391
392#[derive(Debug, Serialize)]
393pub struct ColumnDetails {
394    pub name: String,
395    pub data_type: String,
396    pub nullable: bool,
397    pub is_primary_key: bool,
398    pub is_indexed: bool,
399    pub default_value: Option<String>,
400    pub storage_type: Option<String>,
401}
402
403#[derive(Debug, Deserialize)]
404pub struct RegisterTableRequest {
405    pub name: String,
406    pub columns: Vec<ColumnRequest>,
407    pub temperature: String,
408    pub workload: String,
409    pub access_pattern: String,
410    pub primary_key: Vec<String>,
411    pub shard_key: Option<String>,
412    pub row_count_estimate: Option<u64>,
413}
414
415#[derive(Debug, Deserialize)]
416pub struct ColumnRequest {
417    pub name: String,
418    pub data_type: String,
419    pub nullable: bool,
420    pub is_primary_key: bool,
421    pub is_indexed: Option<bool>,
422    pub default_value: Option<String>,
423}
424
425#[derive(Debug, Deserialize)]
426pub struct ClassifyRequest {
427    pub table_name: String,
428    pub temperature: String,
429    pub workload: String,
430}
431
432#[derive(Debug, Serialize)]
433pub struct ClassificationResult {
434    pub table_name: String,
435    pub previous_temperature: String,
436    pub new_temperature: String,
437    pub previous_workload: String,
438    pub new_workload: String,
439}
440
441#[derive(Debug, Serialize)]
442pub struct ClassificationSuggestion {
443    pub table_name: String,
444    pub query_count: u64,
445    pub suggested_temperature: Option<String>,
446    pub suggested_workload: Option<String>,
447    pub confidence: f64,
448    pub sample_size_sufficient: bool,
449}
450
451#[derive(Debug, Deserialize)]
452pub struct AnalyzeRequest {
453    pub query: String,
454}
455
456#[derive(Debug, Serialize)]
457pub struct AnalysisResult {
458    pub query: String,
459    pub tables: Vec<String>,
460    pub access_pattern: String,
461    pub shard_keys: Vec<String>,
462    pub is_read_only: bool,
463    pub estimated_complexity: u32,
464    pub estimated_selectivity: f64,
465    pub has_aggregation: bool,
466    pub has_join: bool,
467    pub has_subquery: bool,
468    pub columns: Vec<String>,
469    pub detected_workload: Option<String>,
470}
471
472#[derive(Debug, Deserialize)]
473pub struct RouteRequest {
474    pub query: String,
475}
476
477#[derive(Debug, Serialize)]
478pub struct RouteResult {
479    pub query: String,
480    pub target_type: String,
481    pub reason: String,
482    pub preferred_node: Option<String>,
483    pub alternative_nodes: Vec<String>,
484    pub estimated_latency_ms: Option<u64>,
485}
486
487#[derive(Debug, Serialize)]
488pub struct RoutingStatsResponse {
489    pub total_queries_routed: u64,
490    pub queries_to_primary: u64,
491    pub queries_to_replica: u64,
492    pub queries_scattered: u64,
493    pub avg_latency_ms: f64,
494    pub cache_hit_rate: f64,
495}
496
497#[derive(Debug, Serialize)]
498pub struct TableStatsResponse {
499    pub table_name: String,
500    pub query_count: u64,
501    pub avg_latency_ms: f64,
502    pub hit_rate: f64,
503    pub temperature: String,
504    pub workload: String,
505}
506
507#[derive(Debug, Serialize)]
508pub struct WorkloadStatsResponse {
509    pub workload: String,
510    pub query_count: u64,
511    pub avg_latency_ms: f64,
512    pub queries_to_primary: u64,
513    pub queries_to_replica: u64,
514}
515
516#[derive(Debug, Serialize)]
517pub struct DiscoveryResult {
518    pub tables_discovered: usize,
519    pub table_names: Vec<String>,
520}
521
522#[derive(Debug, Serialize)]
523pub struct RefreshResult {
524    pub success: bool,
525    pub message: String,
526}
527
528#[derive(Debug, Serialize)]
529pub struct AIWorkloadStatsResponse {
530    pub embedding_queries: u64,
531    pub context_lookups: u64,
532    pub knowledge_base_queries: u64,
533    pub tool_executions: u64,
534    pub total_ai_queries: u64,
535    pub avg_vector_dimensions: f64,
536}
537
538#[derive(Debug, Serialize)]
539pub struct RAGStatsResponse {
540    pub retrieval_count: u64,
541    pub avg_retrieval_latency_ms: f64,
542    pub fetch_count: u64,
543    pub avg_fetch_latency_ms: f64,
544    pub total_pipeline_executions: u64,
545    pub avg_total_latency_ms: f64,
546}
547
548// =============================================================================
549// ERRORS
550// =============================================================================
551
/// Errors returned by the schema-routing admin operations.
///
/// Every variant carries a human-readable detail message. The type derives
/// `Clone`, `PartialEq` and `Eq` so callers and tests can compare and
/// duplicate errors directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdminError {
    /// The requested table or history does not exist.
    NotFound(String),
    /// A request field could not be parsed or validated.
    InvalidInput(String),
    /// Schema discovery or refresh failed.
    DiscoveryError(String),
    /// An unexpected internal condition occurred.
    InternalError(String),
}

impl std::fmt::Display for AdminError {
    /// Writes `"<category>: <detail>"`, e.g. `Not found: users`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            Self::NotFound(msg) => ("Not found", msg),
            Self::InvalidInput(msg) => ("Invalid input", msg),
            Self::DiscoveryError(msg) => ("Discovery error", msg),
            Self::InternalError(msg) => ("Internal error", msg),
        };
        write!(f, "{}: {}", label, detail)
    }
}

impl std::error::Error for AdminError {}
572
573// =============================================================================
574// HELPER FUNCTIONS
575// =============================================================================
576
577fn parse_access_pattern(s: &str) -> Option<AccessPattern> {
578    match s.to_uppercase().as_str() {
579        "POINTLOOKUP" | "POINT_LOOKUP" => Some(AccessPattern::PointLookup),
580        "RANGESCAN" | "RANGE_SCAN" => Some(AccessPattern::RangeScan),
581        "FULLSCAN" | "FULL_SCAN" => Some(AccessPattern::FullScan),
582        "VECTORSEARCH" | "VECTOR_SEARCH" => Some(AccessPattern::VectorSearch),
583        "TIMESERIESAPPEND" | "TIME_SERIES_APPEND" => Some(AccessPattern::TimeSeriesAppend),
584        "MIXED" => Some(AccessPattern::Mixed),
585        _ => None,
586    }
587}
588
#[cfg(test)]
mod tests {
    use super::*;

    /// Mixed-case and underscore spellings map to the expected variants.
    #[test]
    fn test_parse_access_pattern() {
        assert_eq!(parse_access_pattern("PointLookup"), Some(AccessPattern::PointLookup));
        assert_eq!(parse_access_pattern("POINT_LOOKUP"), Some(AccessPattern::PointLookup));
        assert_eq!(parse_access_pattern("RangeScan"), Some(AccessPattern::RangeScan));
        assert_eq!(parse_access_pattern("VectorSearch"), Some(AccessPattern::VectorSearch));
        assert_eq!(parse_access_pattern("Mixed"), Some(AccessPattern::Mixed));
        assert_eq!(parse_access_pattern("Invalid"), None);
    }

    /// Every variant is reachable through both accepted spellings,
    /// regardless of letter case.
    #[test]
    fn test_parse_access_pattern_all_variants() {
        assert_eq!(parse_access_pattern("pointlookup"), Some(AccessPattern::PointLookup));
        assert_eq!(parse_access_pattern("range_scan"), Some(AccessPattern::RangeScan));
        assert_eq!(parse_access_pattern("FullScan"), Some(AccessPattern::FullScan));
        assert_eq!(parse_access_pattern("full_scan"), Some(AccessPattern::FullScan));
        assert_eq!(parse_access_pattern("vector_search"), Some(AccessPattern::VectorSearch));
        assert_eq!(parse_access_pattern("TimeSeriesAppend"), Some(AccessPattern::TimeSeriesAppend));
        assert_eq!(parse_access_pattern("time_series_append"), Some(AccessPattern::TimeSeriesAppend));
        assert_eq!(parse_access_pattern("mixed"), Some(AccessPattern::Mixed));
    }

    /// Empty input and spellings with other separators are rejected.
    #[test]
    fn test_parse_access_pattern_rejects_unknown_spellings() {
        assert_eq!(parse_access_pattern(""), None);
        assert_eq!(parse_access_pattern("point lookup"), None);
        assert_eq!(parse_access_pattern("point-lookup"), None);
    }

    /// Each error variant renders as "<category>: <detail>".
    #[test]
    fn test_admin_error_display() {
        let err = AdminError::NotFound("users".to_string());
        assert_eq!(err.to_string(), "Not found: users");

        let err = AdminError::InvalidInput("bad temp".to_string());
        assert_eq!(err.to_string(), "Invalid input: bad temp");

        let err = AdminError::DiscoveryError("timeout".to_string());
        assert_eq!(err.to_string(), "Discovery error: timeout");

        let err = AdminError::InternalError("boom".to_string());
        assert_eq!(err.to_string(), "Internal error: boom");
    }
}
607
608        let err = AdminError::InvalidInput("bad temp".to_string());
609        assert!(err.to_string().contains("Invalid input"));
610    }
611}