1#[cfg(feature = "anomaly-detection")]
7use crate::anomaly::AnomalyDetector;
8use crate::config::{NodeConfig, NodeRole, ProxyConfig};
9#[cfg(feature = "edge-proxy")]
10use crate::edge::{EdgeCache, EdgeRegistry, InvalidationEvent};
11#[cfg(feature = "wasm-plugins")]
12use crate::plugins::PluginManager;
13#[cfg(feature = "ha-tr")]
14use crate::replay::{ReplayEngine, TimeTravelRequest};
15use crate::server::{NodeHealth, ServerMetricsSnapshot};
16use crate::{ProxyError, Result};
17#[cfg(feature = "ha-tr")]
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::net::SocketAddr;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
25use tokio::net::{TcpListener, TcpStream};
26use tokio::sync::{broadcast, RwLock};
27
28const ADMIN_UI_HTML: &str = include_str!("admin_ui.html");
32
33pub struct AdminServer {
35 listen_address: String,
37 state: Arc<AdminState>,
39 shutdown_tx: broadcast::Sender<()>,
41}
42
43pub struct AdminState {
45 pub node_health: RwLock<HashMap<String, NodeHealth>>,
47 pub metrics: RwLock<ServerMetricsSnapshot>,
49 pub active_sessions: RwLock<u64>,
51 pub config_snapshot: RwLock<ConfigSnapshot>,
53 pub proxy_config: RwLock<Option<ProxyConfig>>,
55 read_lb_counter: AtomicUsize,
57 commands: RwLock<HashMap<String, CommandHandler>>,
59 #[cfg(feature = "ha-tr")]
63 pub replay_engine: RwLock<Option<Arc<ReplayEngine>>>,
64 #[cfg(feature = "wasm-plugins")]
69 pub plugin_manager: RwLock<Option<Arc<PluginManager>>>,
70 pub chaos_overrides: RwLock<HashMap<String, ChaosOverride>>,
75 #[cfg(feature = "anomaly-detection")]
79 pub anomaly_detector: RwLock<Option<Arc<AnomalyDetector>>>,
80 #[cfg(feature = "edge-proxy")]
83 pub edge_cache: RwLock<Option<Arc<EdgeCache>>>,
84 #[cfg(feature = "edge-proxy")]
85 pub edge_registry: RwLock<Option<Arc<EdgeRegistry>>>,
86}
87
88#[derive(Debug, Clone, Serialize)]
93pub struct ChaosOverride {
94 pub since: String,
96 pub kind: String,
98 pub note: String,
100}
101
102
103type CommandHandler = Arc<dyn Fn(&[&str]) -> Result<String> + Send + Sync>;
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct ConfigSnapshot {
109 pub listen_address: String,
110 pub admin_address: String,
111 pub tr_enabled: bool,
112 pub tr_mode: String,
113 pub pool_min_connections: usize,
114 pub pool_max_connections: usize,
115 pub nodes: Vec<NodeSnapshot>,
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct NodeSnapshot {
121 pub address: String,
122 pub role: String,
123 pub weight: u32,
124 pub enabled: bool,
125}
126
127impl AdminServer {
128 pub fn new(listen_address: String, state: Arc<AdminState>) -> Self {
130 let (shutdown_tx, _) = broadcast::channel(1);
131
132 Self {
133 listen_address,
134 state,
135 shutdown_tx,
136 }
137 }
138
139 pub async fn run(&self) -> Result<()> {
141 let listener = TcpListener::bind(&self.listen_address)
142 .await
143 .map_err(|e| ProxyError::Network(format!("Failed to bind admin: {}", e)))?;
144
145 tracing::info!("Admin API listening on {}", self.listen_address);
146
147 let mut shutdown_rx = self.shutdown_tx.subscribe();
148
149 loop {
150 tokio::select! {
151 accept_result = listener.accept() => {
152 match accept_result {
153 Ok((stream, addr)) => {
154 let state = self.state.clone();
155 tokio::spawn(async move {
156 if let Err(e) = Self::handle_connection(stream, addr, state).await {
157 tracing::error!("Admin connection error: {}", e);
158 }
159 });
160 }
161 Err(e) => {
162 tracing::error!("Admin accept error: {}", e);
163 }
164 }
165 }
166 _ = shutdown_rx.recv() => {
167 tracing::info!("Admin server shutting down");
168 break;
169 }
170 }
171 }
172
173 Ok(())
174 }
175
176 async fn handle_connection(
178 mut stream: TcpStream,
179 addr: SocketAddr,
180 state: Arc<AdminState>,
181 ) -> Result<()> {
182 tracing::debug!("Admin connection from {}", addr);
183
184 let (reader, mut writer) = stream.split();
185 let mut reader = BufReader::new(reader);
186 let mut line = String::new();
187
188 let mut headers = Vec::new();
190 let mut content_length: usize = 0;
191
192 loop {
193 line.clear();
194 let bytes_read = reader
195 .read_line(&mut line)
196 .await
197 .map_err(|e| ProxyError::Network(format!("Read error: {}", e)))?;
198
199 if bytes_read == 0 || line == "\r\n" {
200 break;
201 }
202
203 let trimmed = line.trim();
205 if trimmed.to_lowercase().starts_with("content-length:") {
206 if let Some(len_str) = trimmed.split(':').nth(1) {
207 content_length = len_str.trim().parse().unwrap_or(0);
208 }
209 }
210 headers.push(trimmed.to_string());
211 }
212
213 if headers.is_empty() {
214 return Ok(());
215 }
216
217 let request_line = &headers[0];
219 let parts: Vec<&str> = request_line.split_whitespace().collect();
220
221 if parts.len() < 2 {
222 Self::send_response(&mut writer, 400, "Bad Request", "Invalid request line").await?;
223 return Ok(());
224 }
225
226 let method = parts[0];
227 let path = parts[1];
228
229 let body = if content_length > 0 && (method == "POST" || method == "PUT") {
231 let mut body_buf = vec![0u8; content_length];
232 reader.read_exact(&mut body_buf).await
233 .map_err(|e| ProxyError::Network(format!("Body read error: {}", e)))?;
234 Some(String::from_utf8_lossy(&body_buf).to_string())
235 } else {
236 None
237 };
238
239 if method == "GET" && (path == "/" || path == "/ui" || path == "/ui/") {
242 Self::send_html_response(&mut writer, 200, ADMIN_UI_HTML).await?;
243 return Ok(());
244 }
245
246 let response = Self::route_request(method, path, body.as_deref(), &state).await;
248
249 match response {
250 Ok((status, body)) => {
251 Self::send_json_response(&mut writer, status, &body).await?;
252 }
253 Err(e) => {
254 let error = ErrorResponse {
255 error: e.to_string(),
256 };
257 Self::send_json_response(&mut writer, 500, &error).await?;
258 }
259 }
260
261 Ok(())
262 }
263
264 async fn send_html_response(
266 writer: &mut tokio::net::tcp::WriteHalf<'_>,
267 status: u16,
268 html: &str,
269 ) -> Result<()> {
270 let status_text = match status {
271 200 => "OK",
272 404 => "Not Found",
273 _ => "Unknown",
274 };
275 let response = format!(
276 "HTTP/1.1 {} {}\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
277 status,
278 status_text,
279 html.len(),
280 html
281 );
282 writer
283 .write_all(response.as_bytes())
284 .await
285 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
286 Ok(())
287 }
288
289 async fn route_request(
291 method: &str,
292 path: &str,
293 body: Option<&str>,
294 state: &Arc<AdminState>,
295 ) -> Result<(u16, serde_json::Value)> {
296 match (method, path) {
297 ("POST", "/api/sql") => {
299 Self::handle_sql_request(body, state).await
300 }
301
302 ("GET", "/health") => {
304 let health = HealthResponse { status: "ok" };
305 Ok((200, serde_json::to_value(health)?))
306 }
307 ("GET", "/health/ready") => {
308 let ready = Self::check_readiness(state).await;
309 let response = ReadinessResponse {
310 ready,
311 message: if ready {
312 "Proxy is ready"
313 } else {
314 "Proxy is not ready"
315 },
316 };
317 let status = if ready { 200 } else { 503 };
318 Ok((status, serde_json::to_value(response)?))
319 }
320 ("GET", "/health/live") => {
321 let response = LivenessResponse { alive: true };
322 Ok((200, serde_json::to_value(response)?))
323 }
324
325 ("GET", "/metrics") => {
327 let metrics = state.metrics.read().await.clone();
328 Ok((200, serde_json::to_value(MetricsResponse::from(metrics))?))
329 }
330 ("GET", "/metrics/prometheus") => {
331 let metrics = state.metrics.read().await.clone();
332 let prometheus = Self::format_prometheus_metrics(&metrics);
333 Ok((200, serde_json::json!({ "text": prometheus })))
334 }
335
336 ("GET", "/nodes") => {
338 let health = state.node_health.read().await;
339 let nodes: Vec<NodeHealthResponse> = health
340 .values()
341 .map(|h| NodeHealthResponse::from(h.clone()))
342 .collect();
343 Ok((200, serde_json::to_value(nodes)?))
344 }
345 ("GET", path) if path.starts_with("/nodes/") => {
346 let node_addr = path.trim_start_matches("/nodes/");
347 let health = state.node_health.read().await;
348 match health.get(node_addr) {
349 Some(h) => Ok((200, serde_json::to_value(NodeHealthResponse::from(h.clone()))?)),
350 None => Ok((404, serde_json::json!({ "error": "Node not found" }))),
351 }
352 }
353 ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/enable") => {
354 let node_addr = path
355 .trim_start_matches("/nodes/")
356 .trim_end_matches("/enable");
357 Self::set_node_enabled(state, node_addr, true).await?;
358 Ok((200, serde_json::json!({ "status": "enabled" })))
359 }
360 ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/disable") => {
361 let node_addr = path
362 .trim_start_matches("/nodes/")
363 .trim_end_matches("/disable");
364 Self::set_node_enabled(state, node_addr, false).await?;
365 Ok((200, serde_json::json!({ "status": "disabled" })))
366 }
367
368 ("GET", "/topology") => {
374 let topo = Self::compute_topology(state).await;
375 Ok((200, serde_json::to_value(topo)?))
376 }
377
378 #[cfg(feature = "ha-tr")]
382 ("POST", "/api/replay") => Self::handle_replay_request(body, state).await,
383 #[cfg(not(feature = "ha-tr"))]
384 ("POST", "/api/replay") => Ok((
385 503,
386 serde_json::json!({ "error": "ha-tr feature not compiled in" }),
387 )),
388
389 #[cfg(feature = "ha-tr")]
395 ("POST", "/api/shadow") => Self::handle_shadow_request(body).await,
396 #[cfg(not(feature = "ha-tr"))]
397 ("POST", "/api/shadow") => Ok((
398 503,
399 serde_json::json!({ "error": "ha-tr feature not compiled in" }),
400 )),
401
402 ("GET", "/plugins") => Self::handle_plugins_list(state).await,
407
408 #[cfg(feature = "anomaly-detection")]
412 ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => {
413 Self::handle_anomalies_list(p, state).await
414 }
415 #[cfg(not(feature = "anomaly-detection"))]
416 ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => Ok((
417 503,
418 serde_json::json!({ "error": "anomaly-detection feature not compiled in" }),
419 )),
420
421 #[cfg(feature = "edge-proxy")]
425 ("GET", "/api/edge") => Self::handle_edge_status(state).await,
426 #[cfg(feature = "edge-proxy")]
427 ("POST", "/api/edge/register") => {
428 Self::handle_edge_register(body, state).await
429 }
430 #[cfg(feature = "edge-proxy")]
431 ("POST", "/api/edge/invalidate") => {
432 Self::handle_edge_invalidate(body, state).await
433 }
434 #[cfg(not(feature = "edge-proxy"))]
435 ("GET", "/api/edge")
436 | ("POST", "/api/edge/register")
437 | ("POST", "/api/edge/invalidate") => Ok((
438 503,
439 serde_json::json!({ "error": "edge-proxy feature not compiled in" }),
440 )),
441
442 ("POST", "/api/chaos") => Self::handle_chaos_request(body, state).await,
446 ("GET", "/api/chaos") => {
449 let overrides = state.chaos_overrides.read().await.clone();
450 Ok((200, serde_json::to_value(overrides)?))
451 }
452
453 ("GET", "/config") => {
455 let config = state.config_snapshot.read().await.clone();
456 Ok((200, serde_json::to_value(config)?))
457 }
458
459 ("GET", "/sessions") => {
461 let count = *state.active_sessions.read().await;
462 let response = SessionsResponse {
463 active_sessions: count,
464 };
465 Ok((200, serde_json::to_value(response)?))
466 }
467
468 ("GET", "/pools") => {
470 let pools = Self::get_pool_stats(state).await;
471 Ok((200, serde_json::to_value(pools)?))
472 }
473
474 ("GET", "/version") => {
476 let response = VersionResponse {
477 version: crate::VERSION.to_string(),
478 build_time: env!("CARGO_PKG_VERSION").to_string(),
479 };
480 Ok((200, serde_json::to_value(response)?))
481 }
482
483 _ => Ok((404, serde_json::json!({ "error": "Not found" }))),
485 }
486 }
487
488 async fn handle_sql_request(
490 body: Option<&str>,
491 state: &Arc<AdminState>,
492 ) -> Result<(u16, serde_json::Value)> {
493 let body = body.ok_or_else(|| ProxyError::Internal("Missing request body".to_string()))?;
495 let request: SqlRequest = serde_json::from_str(body)
496 .map_err(|e| ProxyError::Internal(format!("Invalid JSON: {}", e)))?;
497
498 let sql = request.query.trim();
499 if sql.is_empty() {
500 return Ok((400, serde_json::json!({ "error": "Empty query" })));
501 }
502
503 let is_write = Self::is_write_query(sql);
505 let query_type = if is_write { "write" } else { "read" };
506
507 let proxy_config = state.proxy_config.read().await;
509 let config = proxy_config.as_ref()
510 .ok_or_else(|| ProxyError::Internal("Proxy config not initialized".to_string()))?;
511
512 let health = state.node_health.read().await;
514
515 let target_node = if is_write {
517 Self::select_primary_node(config, &health)?
519 } else {
520 Self::select_read_node(config, &health, state)?
522 };
523
524 let target_address = format!("{}:{}", target_node.host, target_node.port);
525 let http_port = target_node.http_port;
527 let http_url = format!("http://{}:{}/api/sql", target_node.host, http_port);
528
529 tracing::debug!(
530 "Routing {} query to {} ({})",
531 query_type,
532 target_address,
533 match target_node.role {
534 NodeRole::Primary => "primary",
535 NodeRole::Standby => "standby",
536 NodeRole::ReadReplica => "replica",
537 }
538 );
539
540 let result = Self::forward_sql_request(&http_url, sql).await?;
542
543 let response = SqlResponse {
545 query_type: query_type.to_string(),
546 routed_to: target_address,
547 node_role: format!("{:?}", target_node.role).to_lowercase(),
548 result,
549 };
550
551 Ok((200, serde_json::to_value(response)?))
552 }
553
554 fn is_write_query(sql: &str) -> bool {
556 let upper = sql.trim().to_uppercase();
557
558 if upper.starts_with("INSERT")
560 || upper.starts_with("UPDATE")
561 || upper.starts_with("DELETE")
562 || upper.starts_with("CREATE")
563 || upper.starts_with("ALTER")
564 || upper.starts_with("DROP")
565 || upper.starts_with("TRUNCATE")
566 || upper.starts_with("GRANT")
567 || upper.starts_with("REVOKE")
568 || upper.starts_with("VACUUM")
569 || upper.starts_with("REINDEX")
570 || upper.starts_with("MERGE")
571 || upper.starts_with("UPSERT")
572 {
573 return true;
574 }
575
576 if upper.starts_with("BEGIN")
578 || upper.starts_with("COMMIT")
579 || upper.starts_with("ROLLBACK")
580 || upper.starts_with("SAVEPOINT")
581 {
582 return true;
584 }
585
586 false
588 }
589
590 fn select_primary_node<'a>(
592 config: &'a ProxyConfig,
593 health: &HashMap<String, NodeHealth>,
594 ) -> Result<&'a NodeConfig> {
595 config.nodes.iter()
596 .find(|n| {
597 n.role == NodeRole::Primary
598 && n.enabled
599 && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false)
600 })
601 .ok_or_else(|| ProxyError::Internal("No healthy primary node available".to_string()))
602 }
603
604 fn select_read_node<'a>(
606 config: &'a ProxyConfig,
607 health: &HashMap<String, NodeHealth>,
608 state: &AdminState,
609 ) -> Result<&'a NodeConfig> {
610 let healthy_nodes: Vec<&NodeConfig> = config.nodes.iter()
612 .filter(|n| n.enabled && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false))
613 .collect();
614
615 if healthy_nodes.is_empty() {
616 return Err(ProxyError::Internal("No healthy nodes available".to_string()));
617 }
618
619 if config.load_balancer.read_write_split {
621 let read_nodes: Vec<&NodeConfig> = healthy_nodes.iter()
622 .filter(|n| n.role == NodeRole::Standby || n.role == NodeRole::ReadReplica)
623 .copied()
624 .collect();
625
626 if !read_nodes.is_empty() {
627 let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
629 let index = counter % read_nodes.len();
630 return Ok(read_nodes[index]);
631 }
632 }
633
634 let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
636 let index = counter % healthy_nodes.len();
637 Ok(healthy_nodes[index])
638 }
639
640 async fn forward_sql_request(url: &str, sql: &str) -> Result<serde_json::Value> {
642 let request_body = serde_json::json!({ "query": sql });
644 let body_bytes = serde_json::to_vec(&request_body)
645 .map_err(|e| ProxyError::Internal(format!("JSON serialization error: {}", e)))?;
646
647 let url_parts: Vec<&str> = url.trim_start_matches("http://").splitn(2, '/').collect();
649 if url_parts.is_empty() {
650 return Err(ProxyError::Internal("Invalid URL".to_string()));
651 }
652
653 let host_port = url_parts[0];
654 let path = if url_parts.len() > 1 {
655 format!("/{}", url_parts[1])
656 } else {
657 "/".to_string()
658 };
659
660 let stream = TcpStream::connect(host_port).await
662 .map_err(|e| ProxyError::Network(format!("Failed to connect to {}: {}", host_port, e)))?;
663
664 let (reader, mut writer) = stream.into_split();
665 let mut reader = BufReader::new(reader);
666
667 let request = format!(
669 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
670 path,
671 host_port,
672 body_bytes.len()
673 );
674
675 writer.write_all(request.as_bytes()).await
676 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
677 writer.write_all(&body_bytes).await
678 .map_err(|e| ProxyError::Network(format!("Write body error: {}", e)))?;
679
680 let mut response_headers = Vec::new();
682 let mut line = String::new();
683 let mut content_length: usize = 0;
684
685 loop {
686 line.clear();
687 let bytes_read = reader.read_line(&mut line).await
688 .map_err(|e| ProxyError::Network(format!("Response read error: {}", e)))?;
689
690 if bytes_read == 0 || line == "\r\n" {
691 break;
692 }
693
694 let trimmed = line.trim();
695 if trimmed.to_lowercase().starts_with("content-length:") {
696 if let Some(len_str) = trimmed.split(':').nth(1) {
697 content_length = len_str.trim().parse().unwrap_or(0);
698 }
699 }
700 response_headers.push(trimmed.to_string());
701 }
702
703 let mut body_buf = vec![0u8; content_length];
705 if content_length > 0 {
706 reader.read_exact(&mut body_buf).await
707 .map_err(|e| ProxyError::Network(format!("Response body read error: {}", e)))?;
708 }
709
710 let response_body = String::from_utf8_lossy(&body_buf);
711
712 serde_json::from_str(&response_body)
714 .map_err(|e| ProxyError::Internal(format!("Invalid JSON response: {} - body: {}", e, response_body)))
715 }
716
717 async fn check_readiness(state: &Arc<AdminState>) -> bool {
719 let health = state.node_health.read().await;
720
721 health.values().any(|h| h.healthy)
723 }
724
725 async fn set_node_enabled(state: &Arc<AdminState>, node_addr: &str, enabled: bool) -> Result<()> {
727 let mut health = state.node_health.write().await;
728
729 if let Some(node_health) = health.get_mut(node_addr) {
730 node_health.healthy = enabled;
731 Ok(())
732 } else {
733 Err(ProxyError::Config(format!("Node not found: {}", node_addr)))
734 }
735 }
736
737 async fn get_pool_stats(_state: &Arc<AdminState>) -> Vec<PoolStatsResponse> {
739 Vec::new()
741 }
742
743 #[cfg(feature = "ha-tr")]
747 async fn handle_replay_request(
748 body: Option<&str>,
749 state: &Arc<AdminState>,
750 ) -> Result<(u16, serde_json::Value)> {
751 let raw = body.ok_or_else(|| {
752 ProxyError::Internal("replay: empty request body".to_string())
753 })?;
754 let req: ReplayRequestBody = match serde_json::from_str(raw) {
755 Ok(r) => r,
756 Err(e) => {
757 return Ok((
758 400,
759 serde_json::json!({ "error": format!("invalid body: {}", e) }),
760 ));
761 }
762 };
763 let engine = match state.replay_engine.read().await.clone() {
764 Some(e) => e,
765 None => {
766 return Ok((
767 503,
768 serde_json::json!({ "error": "replay engine not attached" }),
769 ));
770 }
771 };
772 let tt = TimeTravelRequest {
773 from: req.from,
774 to: req.to,
775 target_host: req.target_host,
776 target_port: req.target_port,
777 target_user: req.target_user,
778 target_password: req.target_password,
779 target_database: req.target_database,
780 };
781 match engine.replay_window(&tt).await {
782 Ok(summary) => Ok((200, serde_json::to_value(summary)?)),
783 Err(e) => Ok((
784 500,
785 serde_json::json!({ "error": format!("replay failed: {}", e) }),
786 )),
787 }
788 }
789
790 #[cfg(feature = "edge-proxy")]
793 async fn handle_edge_status(
794 state: &Arc<AdminState>,
795 ) -> Result<(u16, serde_json::Value)> {
796 let cache_stats = match state.edge_cache.read().await.clone() {
797 Some(c) => Some(c.stats()),
798 None => None,
799 };
800 let edges = match state.edge_registry.read().await.clone() {
801 Some(r) => r.list(),
802 None => Vec::new(),
803 };
804 Ok((200, serde_json::json!({
805 "cache": cache_stats,
806 "registered": edges,
807 "edge_count": edges.len(),
808 })))
809 }
810
811 #[cfg(feature = "edge-proxy")]
816 async fn handle_edge_register(
817 body: Option<&str>,
818 state: &Arc<AdminState>,
819 ) -> Result<(u16, serde_json::Value)> {
820 let raw = body.ok_or_else(|| {
821 ProxyError::Internal("edge register: empty body".to_string())
822 })?;
823 let req: EdgeRegisterBody = match serde_json::from_str(raw) {
824 Ok(r) => r,
825 Err(e) => {
826 return Ok((
827 400,
828 serde_json::json!({ "error": format!("invalid body: {}", e) }),
829 ));
830 }
831 };
832 let registry = match state.edge_registry.read().await.clone() {
833 Some(r) => r,
834 None => {
835 return Ok((
836 503,
837 serde_json::json!({ "error": "edge registry not attached" }),
838 ));
839 }
840 };
841 let now = chrono::Utc::now().to_rfc3339();
842 match registry.register(&req.edge_id, &req.region, &req.base_url, &now) {
843 Ok(_rx) => {
844 Ok((201, serde_json::json!({
850 "edge_id": req.edge_id,
851 "region": req.region,
852 "base_url": req.base_url,
853 "registered_at": now,
854 })))
855 }
856 Err(e) => Ok((
857 503,
858 serde_json::json!({ "error": e.to_string() }),
859 )),
860 }
861 }
862
863 #[cfg(feature = "edge-proxy")]
870 async fn handle_edge_invalidate(
871 body: Option<&str>,
872 state: &Arc<AdminState>,
873 ) -> Result<(u16, serde_json::Value)> {
874 let raw = body.ok_or_else(|| {
875 ProxyError::Internal("edge invalidate: empty body".to_string())
876 })?;
877 let req: EdgeInvalidateBody = match serde_json::from_str(raw) {
878 Ok(r) => r,
879 Err(e) => {
880 return Ok((
881 400,
882 serde_json::json!({ "error": format!("invalid body: {}", e) }),
883 ));
884 }
885 };
886 let cache = match state.edge_cache.read().await.clone() {
887 Some(c) => c,
888 None => {
889 return Ok((
890 503,
891 serde_json::json!({ "error": "edge cache not attached" }),
892 ));
893 }
894 };
895 let registry = match state.edge_registry.read().await.clone() {
896 Some(r) => r,
897 None => {
898 return Ok((
899 503,
900 serde_json::json!({ "error": "edge registry not attached" }),
901 ));
902 }
903 };
904 let version = req.up_to_version.unwrap_or_else(|| cache.next_version());
905 let dropped_local = cache.invalidate(version, &req.tables);
907 let ev = InvalidationEvent {
909 up_to_version: version,
910 tables: req.tables.clone(),
911 committed_at: chrono::Utc::now().to_rfc3339(),
912 };
913 let (sent, pruned) = registry.broadcast(ev).await;
914 Ok((200, serde_json::json!({
915 "version": version,
916 "tables": req.tables,
917 "dropped_local": dropped_local,
918 "edges_notified": sent,
919 "edges_pruned": pruned,
920 })))
921 }
922
923 #[cfg(feature = "anomaly-detection")]
928 async fn handle_anomalies_list(
929 path: &str,
930 state: &Arc<AdminState>,
931 ) -> Result<(u16, serde_json::Value)> {
932 let limit = parse_limit_query(path, 100, 1024);
933 let det = match state.anomaly_detector.read().await.clone() {
934 Some(d) => d,
935 None => {
936 return Ok((
937 503,
938 serde_json::json!({ "error": "anomaly detector not attached" }),
939 ));
940 }
941 };
942 let events = det.recent_events(limit);
943 Ok((200, serde_json::json!({
944 "count": events.len(),
945 "limit": limit,
946 "events": events,
947 "buffer_total": det.event_count(),
948 })))
949 }
950
951 #[cfg(feature = "ha-tr")]
961 async fn handle_shadow_request(
962 body: Option<&str>,
963 ) -> Result<(u16, serde_json::Value)> {
964 use crate::backend::{tls::default_client_config, BackendClient, BackendConfig, ParamValue, TlsMode};
965 use crate::shadow_execute::shadow_execute;
966
967 let raw = body.ok_or_else(|| {
968 ProxyError::Internal("shadow: empty request body".to_string())
969 })?;
970 let req: ShadowRequestBody = match serde_json::from_str(raw) {
971 Ok(r) => r,
972 Err(e) => {
973 return Ok((
974 400,
975 serde_json::json!({ "error": format!("invalid body: {}", e) }),
976 ));
977 }
978 };
979
980 let mk_cfg = |host: String, port: u16, user: Option<String>, password: Option<String>, database: Option<String>| BackendConfig {
983 host,
984 port,
985 user: user.unwrap_or_else(|| "postgres".into()),
986 password,
987 database,
988 application_name: Some("heliosdb-proxy-shadow".into()),
989 tls_mode: TlsMode::Disable,
990 connect_timeout: std::time::Duration::from_secs(5),
991 query_timeout: std::time::Duration::from_secs(30),
992 tls_config: default_client_config(),
993 };
994 let source_cfg = mk_cfg(
995 req.source_host,
996 req.source_port,
997 req.source_user,
998 req.source_password,
999 req.source_database,
1000 );
1001 let shadow_cfg = mk_cfg(
1002 req.shadow_host,
1003 req.shadow_port,
1004 req.shadow_user,
1005 req.shadow_password,
1006 req.shadow_database,
1007 );
1008
1009 let mut source = match BackendClient::connect(&source_cfg).await {
1013 Ok(c) => c,
1014 Err(e) => {
1015 return Ok((
1016 500,
1017 serde_json::json!({ "error": format!("source connect: {}", e) }),
1018 ));
1019 }
1020 };
1021
1022 let params: Vec<ParamValue> = req
1023 .params
1024 .unwrap_or_default()
1025 .into_iter()
1026 .map(|s| ParamValue::Text(s))
1027 .collect();
1028
1029 let outcome = shadow_execute(&mut source, &shadow_cfg, &req.sql, ¶ms).await;
1030 source.close().await;
1031
1032 match outcome {
1033 Ok((_qr, report)) => Ok((200, serde_json::json!({
1034 "sql": report.sql,
1035 "both_succeeded": report.both_succeeded,
1036 "row_count_match": report.row_count_match,
1037 "row_hash_match": report.row_hash_match,
1038 "primary_elapsed_us": report.primary_elapsed_us,
1039 "shadow_elapsed_us": report.shadow_elapsed_us,
1040 "primary_error": report.primary_error,
1041 "shadow_error": report.shadow_error,
1042 "is_clean": report.is_clean(),
1043 }))),
1044 Err(e) => Ok((
1045 500,
1046 serde_json::json!({ "error": format!("shadow execute: {}", e) }),
1047 )),
1048 }
1049 }
1050
1051 async fn handle_chaos_request(
1067 body: Option<&str>,
1068 state: &Arc<AdminState>,
1069 ) -> Result<(u16, serde_json::Value)> {
1070 let raw = body.ok_or_else(|| {
1071 ProxyError::Internal("chaos: empty request body".to_string())
1072 })?;
1073 let action: ChaosAction = match serde_json::from_str(raw) {
1074 Ok(a) => a,
1075 Err(e) => {
1076 return Ok((
1077 400,
1078 serde_json::json!({ "error": format!("invalid body: {}", e) }),
1079 ));
1080 }
1081 };
1082 match action {
1083 ChaosAction::ForceUnhealthy { target_node } => {
1084 if let Err(e) = Self::set_node_enabled(state, &target_node, false).await {
1085 return Ok((
1086 404,
1087 serde_json::json!({ "error": e.to_string() }),
1088 ));
1089 }
1090 state.chaos_overrides.write().await.insert(
1091 target_node.clone(),
1092 ChaosOverride {
1093 since: chrono::Utc::now().to_rfc3339(),
1094 kind: "force_unhealthy".to_string(),
1095 note: format!("forced unhealthy via chaos endpoint"),
1096 },
1097 );
1098 Ok((200, serde_json::json!({
1099 "applied": "force_unhealthy",
1100 "target_node": target_node,
1101 })))
1102 }
1103 ChaosAction::Restore { target_node } => {
1104 if let Err(e) = Self::set_node_enabled(state, &target_node, true).await {
1105 return Ok((
1106 404,
1107 serde_json::json!({ "error": e.to_string() }),
1108 ));
1109 }
1110 state.chaos_overrides.write().await.remove(&target_node);
1111 Ok((200, serde_json::json!({
1112 "restored": target_node,
1113 })))
1114 }
1115 ChaosAction::Reset => {
1116 let overrides: Vec<String> =
1117 state.chaos_overrides.read().await.keys().cloned().collect();
1118 let mut restored = Vec::with_capacity(overrides.len());
1119 for addr in overrides {
1120 let _ = Self::set_node_enabled(state, &addr, true).await;
1121 restored.push(addr);
1122 }
1123 state.chaos_overrides.write().await.clear();
1124 Ok((200, serde_json::json!({
1125 "reset": true,
1126 "restored": restored,
1127 })))
1128 }
1129 }
1130 }
1131
1132 #[cfg(feature = "wasm-plugins")]
1138 async fn handle_plugins_list(state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1139 let pm = match state.plugin_manager.read().await.clone() {
1140 Some(p) => p,
1141 None => {
1142 return Ok((
1143 503,
1144 serde_json::json!({ "error": "plugin manager not attached" }),
1145 ));
1146 }
1147 };
1148 let plugins: Vec<PluginListEntry> = pm
1149 .list_plugins()
1150 .into_iter()
1151 .map(|info| PluginListEntry {
1152 name: info.name,
1153 version: info.version,
1154 description: info.description,
1155 hooks: info
1156 .hooks
1157 .iter()
1158 .map(|h| h.export_name().to_string())
1159 .collect(),
1160 state: format!("{:?}", info.state),
1161 invocations: info.stats.total_calls,
1162 errors: info.stats.error_count,
1163 })
1164 .collect();
1165 Ok((200, serde_json::to_value(plugins)?))
1166 }
1167
1168 #[cfg(not(feature = "wasm-plugins"))]
1169 async fn handle_plugins_list(_state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1170 Ok((
1171 503,
1172 serde_json::json!({ "error": "wasm-plugins feature not compiled in" }),
1173 ))
1174 }
1175
1176 async fn compute_topology(state: &Arc<AdminState>) -> TopologyResponse {
1182 let health = state.node_health.read().await;
1183 let cfg = state.config_snapshot.read().await;
1184
1185 let mut current_primary: Option<String> = None;
1186 for n in &cfg.nodes {
1187 if n.role.eq_ignore_ascii_case("primary") {
1188 let healthy = health.get(&n.address).map(|h| h.healthy).unwrap_or(false);
1189 if healthy {
1190 current_primary = Some(n.address.clone());
1191 break;
1192 }
1193 }
1194 }
1195
1196 let healthy_nodes = health.values().filter(|h| h.healthy).count() as u32;
1197 let unhealthy_nodes = health.values().filter(|h| !h.healthy).count() as u32;
1198 let total_nodes = cfg.nodes.len() as u32;
1199
1200 TopologyResponse {
1201 current_primary,
1202 healthy_nodes,
1203 unhealthy_nodes,
1204 total_nodes,
1205 last_failover_at: None,
1206 }
1207 }
1208
1209 fn format_prometheus_metrics(metrics: &ServerMetricsSnapshot) -> String {
1211 let mut output = String::new();
1212
1213 output.push_str("# HELP heliosdb_proxy_connections_total Total connections accepted\n");
1214 output.push_str("# TYPE heliosdb_proxy_connections_total counter\n");
1215 output.push_str(&format!(
1216 "heliosdb_proxy_connections_total {}\n",
1217 metrics.connections_accepted
1218 ));
1219
1220 output.push_str("# HELP heliosdb_proxy_connections_closed Total connections closed\n");
1221 output.push_str("# TYPE heliosdb_proxy_connections_closed counter\n");
1222 output.push_str(&format!(
1223 "heliosdb_proxy_connections_closed {}\n",
1224 metrics.connections_closed
1225 ));
1226
1227 output.push_str("# HELP heliosdb_proxy_queries_total Total queries processed\n");
1228 output.push_str("# TYPE heliosdb_proxy_queries_total counter\n");
1229 output.push_str(&format!(
1230 "heliosdb_proxy_queries_total {}\n",
1231 metrics.queries_processed
1232 ));
1233
1234 output.push_str("# HELP heliosdb_proxy_bytes_received_total Total bytes received\n");
1235 output.push_str("# TYPE heliosdb_proxy_bytes_received_total counter\n");
1236 output.push_str(&format!(
1237 "heliosdb_proxy_bytes_received_total {}\n",
1238 metrics.bytes_received
1239 ));
1240
1241 output.push_str("# HELP heliosdb_proxy_bytes_sent_total Total bytes sent\n");
1242 output.push_str("# TYPE heliosdb_proxy_bytes_sent_total counter\n");
1243 output.push_str(&format!(
1244 "heliosdb_proxy_bytes_sent_total {}\n",
1245 metrics.bytes_sent
1246 ));
1247
1248 output.push_str("# HELP heliosdb_proxy_failovers_total Total failovers\n");
1249 output.push_str("# TYPE heliosdb_proxy_failovers_total counter\n");
1250 output.push_str(&format!(
1251 "heliosdb_proxy_failovers_total {}\n",
1252 metrics.failovers
1253 ));
1254
1255 output
1256 }
1257
1258 async fn send_response(
1260 writer: &mut tokio::net::tcp::WriteHalf<'_>,
1261 status: u16,
1262 status_text: &str,
1263 body: &str,
1264 ) -> Result<()> {
1265 let response = format!(
1266 "HTTP/1.1 {} {}\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1267 status,
1268 status_text,
1269 body.len(),
1270 body
1271 );
1272
1273 writer
1274 .write_all(response.as_bytes())
1275 .await
1276 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1277
1278 Ok(())
1279 }
1280
1281 async fn send_json_response<T: Serialize>(
1283 writer: &mut tokio::net::tcp::WriteHalf<'_>,
1284 status: u16,
1285 body: &T,
1286 ) -> Result<()> {
1287 let json = serde_json::to_string(body)
1288 .map_err(|e| ProxyError::Internal(format!("JSON error: {}", e)))?;
1289
1290 let status_text = match status {
1291 200 => "OK",
1292 400 => "Bad Request",
1293 404 => "Not Found",
1294 500 => "Internal Server Error",
1295 503 => "Service Unavailable",
1296 _ => "Unknown",
1297 };
1298
1299 let response = format!(
1300 "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1301 status,
1302 status_text,
1303 json.len(),
1304 json
1305 );
1306
1307 writer
1308 .write_all(response.as_bytes())
1309 .await
1310 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1311
1312 Ok(())
1313 }
1314
1315 pub fn shutdown(&self) {
1317 let _ = self.shutdown_tx.send(());
1318 }
1319}
1320
1321impl AdminState {
1322 pub fn new() -> Self {
1324 Self {
1325 node_health: RwLock::new(HashMap::new()),
1326 metrics: RwLock::new(ServerMetricsSnapshot {
1327 connections_accepted: 0,
1328 connections_closed: 0,
1329 queries_processed: 0,
1330 bytes_received: 0,
1331 bytes_sent: 0,
1332 failovers: 0,
1333 }),
1334 active_sessions: RwLock::new(0),
1335 config_snapshot: RwLock::new(ConfigSnapshot {
1336 listen_address: String::new(),
1337 admin_address: String::new(),
1338 tr_enabled: false,
1339 tr_mode: String::new(),
1340 pool_min_connections: 0,
1341 pool_max_connections: 0,
1342 nodes: Vec::new(),
1343 }),
1344 proxy_config: RwLock::new(None),
1345 read_lb_counter: AtomicUsize::new(0),
1346 commands: RwLock::new(HashMap::new()),
1347 #[cfg(feature = "ha-tr")]
1348 replay_engine: RwLock::new(None),
1349 #[cfg(feature = "wasm-plugins")]
1350 plugin_manager: RwLock::new(None),
1351 chaos_overrides: RwLock::new(HashMap::new()),
1352 #[cfg(feature = "anomaly-detection")]
1353 anomaly_detector: RwLock::new(None),
1354 #[cfg(feature = "edge-proxy")]
1355 edge_cache: RwLock::new(None),
1356 #[cfg(feature = "edge-proxy")]
1357 edge_registry: RwLock::new(None),
1358 }
1359 }
1360
1361 #[cfg(feature = "anomaly-detection")]
1364 pub async fn with_anomaly_detector(&self, detector: Arc<AnomalyDetector>) {
1365 *self.anomaly_detector.write().await = Some(detector);
1366 }
1367
1368 #[cfg(feature = "edge-proxy")]
1371 pub async fn with_edge(&self, cache: Arc<EdgeCache>, registry: Arc<EdgeRegistry>) {
1372 *self.edge_cache.write().await = Some(cache);
1373 *self.edge_registry.write().await = Some(registry);
1374 }
1375
1376 #[cfg(feature = "ha-tr")]
1381 pub async fn with_replay_engine(&self, engine: Arc<ReplayEngine>) {
1382 *self.replay_engine.write().await = Some(engine);
1383 }
1384
1385 #[cfg(feature = "wasm-plugins")]
1389 pub async fn with_plugin_manager(&self, manager: Arc<PluginManager>) {
1390 *self.plugin_manager.write().await = Some(manager);
1391 }
1392
1393 pub async fn set_proxy_config(&self, config: ProxyConfig) {
1395 let mut proxy_config = self.proxy_config.write().await;
1396 *proxy_config = Some(config);
1397 }
1398
1399 pub async fn register_command<F>(&self, name: &str, handler: F)
1401 where
1402 F: Fn(&[&str]) -> Result<String> + Send + Sync + 'static,
1403 {
1404 let mut commands = self.commands.write().await;
1405 commands.insert(name.to_string(), Arc::new(handler));
1406 }
1407
1408 pub async fn execute_command(&self, name: &str, args: &[&str]) -> Result<String> {
1410 let commands = self.commands.read().await;
1411 match commands.get(name) {
1412 Some(handler) => handler(args),
1413 None => Err(ProxyError::Internal(format!("Unknown command: {}", name))),
1414 }
1415 }
1416}
1417
1418impl Default for AdminState {
1419 fn default() -> Self {
1420 Self::new()
1421 }
1422}
1423
/// Request body carrying a SQL statement to the admin server.
///
/// NOTE(review): presumably parsed from a JSON request body with
/// `serde_json`; confirm against the route handler.
#[derive(Debug, Deserialize)]
struct SqlRequest {
    /// SQL text submitted by the client.
    query: String,
}
1432
/// Response describing how a submitted SQL statement was classified and
/// where it was routed.
#[derive(Debug, Serialize)]
struct SqlResponse {
    /// Classification of the statement; the set of values is chosen by the handler.
    query_type: String,
    /// Target the statement was routed to (presumably a node address; confirm).
    routed_to: String,
    /// Role of the node named in `routed_to`.
    node_role: String,
    /// Free-form JSON result payload.
    result: serde_json::Value,
}
1445
/// Body of the health endpoint: a single static status string.
#[derive(Serialize)]
struct HealthResponse {
    status: &'static str,
}
1450
/// Body of the readiness endpoint.
#[derive(Serialize)]
struct ReadinessResponse {
    /// Whether the proxy reports itself ready.
    ready: bool,
    /// Static human-readable explanation of `ready`.
    message: &'static str,
}
1456
/// Body of the liveness endpoint.
#[derive(Serialize)]
struct LivenessResponse {
    alive: bool,
}
1461
/// Generic error body; serializes as `{"error": "<message>"}`.
#[derive(Serialize)]
struct ErrorResponse {
    error: String,
}
1466
/// JSON view of a `ServerMetricsSnapshot`.
///
/// Every counter is copied verbatim, plus a derived `connections_active`
/// field (see the `From<ServerMetricsSnapshot>` impl).
#[derive(Serialize)]
struct MetricsResponse {
    connections_accepted: u64,
    connections_closed: u64,
    /// `connections_accepted - connections_closed`, saturating at zero.
    connections_active: u64,
    queries_processed: u64,
    bytes_received: u64,
    bytes_sent: u64,
    failovers: u64,
}
1477
1478impl From<ServerMetricsSnapshot> for MetricsResponse {
1479 fn from(m: ServerMetricsSnapshot) -> Self {
1480 Self {
1481 connections_accepted: m.connections_accepted,
1482 connections_closed: m.connections_closed,
1483 connections_active: m.connections_accepted.saturating_sub(m.connections_closed),
1484 queries_processed: m.queries_processed,
1485 bytes_received: m.bytes_received,
1486 bytes_sent: m.bytes_sent,
1487 failovers: m.failovers,
1488 }
1489 }
1490}
1491
/// JSON view of a `NodeHealth` record (see the `From<NodeHealth>` impl).
#[derive(Serialize)]
struct NodeHealthResponse {
    /// Node address, copied from `NodeHealth::address`.
    address: String,
    healthy: bool,
    /// Time of the last health check, rendered with `to_rfc3339()`.
    last_check: String,
    failure_count: u32,
    last_error: Option<String>,
    /// Measured latency; milliseconds per the field name.
    latency_ms: f64,
    /// Replication lag, when known.
    replication_lag_bytes: Option<u64>,
}
1502
1503impl From<NodeHealth> for NodeHealthResponse {
1504 fn from(h: NodeHealth) -> Self {
1505 Self {
1506 address: h.address,
1507 healthy: h.healthy,
1508 last_check: h.last_check.to_rfc3339(),
1509 failure_count: h.failure_count,
1510 last_error: h.last_error,
1511 latency_ms: h.latency_ms,
1512 replication_lag_bytes: h.replication_lag_bytes,
1513 }
1514 }
1515}
1516
/// Body of the sessions endpoint: the current active-session count.
#[derive(Serialize)]
struct SessionsResponse {
    active_sessions: u64,
}
1521
#[cfg(feature = "edge-proxy")]
/// Request body for registering an edge node.
///
/// Example taken from this module's tests:
/// `{"edge_id":"e1","region":"us-east","base_url":"https://e1.svc"}`.
#[derive(Debug, Deserialize)]
struct EdgeRegisterBody {
    /// Identifier of the edge node.
    edge_id: String,
    /// Region label of the edge node.
    region: String,
    /// Base URL of the edge (presumably where it serves traffic; confirm).
    base_url: String,
}
1530
#[cfg(feature = "edge-proxy")]
/// Request body for an edge-cache invalidation.
///
/// Both fields are optional in the JSON body.
#[derive(Debug, Deserialize)]
struct EdgeInvalidateBody {
    /// Tables whose cached entries should be invalidated; empty when omitted.
    #[serde(default)]
    tables: Vec<String>,
    /// Optional version bound, `None` when omitted.
    /// NOTE(review): exact semantics live in the invalidate handler; confirm.
    #[serde(default)]
    up_to_version: Option<u64>,
}
1542
1543#[cfg(feature = "anomaly-detection")]
/// Extracts a `limit=<usize>` parameter from the query string of `path`.
///
/// Behavior:
/// - Only the part after the first `?` is examined.
/// - Pairs are separated by `&`, and key and value are split at the first `=`.
/// - The first `limit` pair whose value parses as `usize` wins, clamped to `max`.
/// - `default` is returned when there is no query string or no parseable
///   `limit` (`default` itself is not clamped).
fn parse_limit_query(path: &str, default: usize, max: usize) -> usize {
    let query = match path.split_once('?') {
        Some((_, query)) => query,
        None => return default,
    };

    query
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .filter(|&(key, _)| key == "limit")
        .find_map(|(_, value)| value.parse::<usize>().ok())
        .map_or(default, |limit| limit.min(max))
}
1563
#[cfg(feature = "ha-tr")]
/// Request body for a shadow-execution request.
///
/// It carries one SQL statement plus connection parameters for a "source"
/// server and a "shadow" server. The tests show the handler reporting a
/// "source connect" error when the source is unreachable.
/// NOTE(review): presumably the statement is run on both servers and the
/// results compared; confirm in `handle_shadow_request`.
#[derive(Debug, Deserialize)]
struct ShadowRequestBody {
    /// SQL statement to execute.
    sql: String,
    /// Optional positional parameters, as strings.
    #[serde(default)]
    params: Option<Vec<String>>,

    /// Source server host.
    source_host: String,
    /// Source server port.
    source_port: u16,
    /// Optional source credentials and database; `None` when omitted.
    #[serde(default)]
    source_user: Option<String>,
    #[serde(default)]
    source_password: Option<String>,
    #[serde(default)]
    source_database: Option<String>,

    /// Shadow server host.
    shadow_host: String,
    /// Shadow server port.
    shadow_port: u16,
    /// Optional shadow credentials and database; `None` when omitted.
    #[serde(default)]
    shadow_user: Option<String>,
    #[serde(default)]
    shadow_password: Option<String>,
    #[serde(default)]
    shadow_database: Option<String>,
}
1597
/// Chaos-testing command, decoded from a JSON body whose `"action"` field
/// selects the variant: `"force_unhealthy"`, `"restore"` or `"reset"`.
///
/// Any other action value fails to deserialize. This module's tests expect
/// the chaos handler to answer 400 in that case.
#[derive(Debug, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum ChaosAction {
    /// Per the tests, marks `target_node` unhealthy and records a chaos
    /// override for it (404 if the node is unknown).
    ForceUnhealthy { target_node: String },
    /// Per the tests, flips `target_node` back to healthy and clears its override.
    Restore { target_node: String },
    /// Per the tests, restores every overridden node and clears all overrides.
    Reset,
}
1614
#[cfg(feature = "wasm-plugins")]
/// One entry in the plugin listing response.
#[derive(Serialize)]
struct PluginListEntry {
    name: String,
    version: String,
    description: String,
    /// Names of the hooks the plugin is attached to.
    hooks: Vec<String>,
    /// Plugin state rendered as a string (presumably its lifecycle state; confirm).
    state: String,
    /// Invocation and error counters for the plugin.
    invocations: u64,
    errors: u64,
}
1630
#[cfg(feature = "ha-tr")]
/// Request body for a time-travel replay.
///
/// It gives a `from`..`to` time window (RFC 3339 timestamps in the tests,
/// e.g. `"2026-04-25T10:00:00Z"`) and the server to replay against.
#[derive(Debug, Deserialize)]
struct ReplayRequestBody {
    /// Start of the replay window.
    from: DateTime<Utc>,
    /// End of the replay window.
    to: DateTime<Utc>,
    /// Host of the replay target.
    target_host: String,
    /// Port of the replay target.
    target_port: u16,
    /// Optional target credentials and database; `None` when omitted.
    #[serde(default)]
    target_user: Option<String>,
    #[serde(default)]
    target_password: Option<String>,
    #[serde(default)]
    target_database: Option<String>,
}
1655
1656#[derive(Serialize)]
1660struct TopologyResponse {
1661 #[serde(rename = "currentPrimary")]
1662 current_primary: Option<String>,
1663 #[serde(rename = "healthyNodes")]
1664 healthy_nodes: u32,
1665 #[serde(rename = "unhealthyNodes")]
1666 unhealthy_nodes: u32,
1667 #[serde(rename = "totalNodes")]
1668 total_nodes: u32,
1669 #[serde(rename = "lastFailoverAt")]
1672 last_failover_at: Option<String>,
1673}
1674
/// Connection-pool statistics for a single backend node.
#[derive(Serialize)]
struct PoolStatsResponse {
    /// Node the statistics belong to.
    node: String,
    active_connections: u64,
    idle_connections: u64,
    pending_requests: u64,
    /// Lifetime counts of connections created and closed.
    total_connections_created: u64,
    total_connections_closed: u64,
}
1684
/// Body of the version endpoint.
#[derive(Serialize)]
struct VersionResponse {
    version: String,
    /// Build timestamp string; its format is chosen by whoever fills it in.
    build_time: String,
}
1690
// Unit tests for admin state, the command registry, metric formatting,
// topology computation and the individual admin route handlers.
// Tests for feature-gated handlers compile only when that feature is enabled.
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly constructed state starts with zero active sessions.
    #[tokio::test]
    async fn test_admin_state_creation() {
        let state = AdminState::new();
        let sessions = state.active_sessions.read().await;
        assert_eq!(*sessions, 0);
    }

    // With no node-health entries, readiness reports not ready.
    #[tokio::test]
    async fn test_readiness_check_no_nodes() {
        let state = Arc::new(AdminState::new());
        let ready = AdminServer::check_readiness(&state).await;
        assert!(!ready);
    }

    // A single healthy node is enough for readiness to report ready.
    #[tokio::test]
    async fn test_readiness_check_with_healthy_node() {
        let state = Arc::new(AdminState::new());

        {
            // Scoped so the write guard is released before check_readiness is called.
            let mut health = state.node_health.write().await;
            health.insert(
                "localhost:5432".to_string(),
                NodeHealth {
                    address: "localhost:5432".to_string(),
                    healthy: true,
                    last_check: chrono::Utc::now(),
                    failure_count: 0,
                    last_error: None,
                    latency_ms: 1.0,
                    replication_lag_bytes: None,
                },
            );
        }

        let ready = AdminServer::check_readiness(&state).await;
        assert!(ready);
    }

    // A registered handler is invoked with the supplied arguments.
    #[tokio::test]
    async fn test_command_registration() {
        let state = AdminState::new();

        state
            .register_command("test", |args| {
                Ok(format!("Test command with {} args", args.len()))
            })
            .await;

        let result = state.execute_command("test", &["arg1", "arg2"]).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "Test command with 2 args");
    }

    // Executing an unregistered command is an error.
    #[tokio::test]
    async fn test_unknown_command() {
        let state = AdminState::new();
        let result = state.execute_command("unknown", &[]).await;
        assert!(result.is_err());
    }

    // Spot-checks a few sample lines of the Prometheus text output.
    #[test]
    fn test_prometheus_metrics_format() {
        let metrics = ServerMetricsSnapshot {
            connections_accepted: 100,
            connections_closed: 50,
            queries_processed: 1000,
            bytes_received: 50000,
            bytes_sent: 100000,
            failovers: 2,
        };

        let output = AdminServer::format_prometheus_metrics(&metrics);
        assert!(output.contains("heliosdb_proxy_connections_total 100"));
        assert!(output.contains("heliosdb_proxy_queries_total 1000"));
        assert!(output.contains("heliosdb_proxy_failovers_total 2"));
    }

    // connections_active is derived as accepted minus closed.
    #[test]
    fn test_metrics_response_active_connections() {
        let snapshot = ServerMetricsSnapshot {
            connections_accepted: 100,
            connections_closed: 30,
            queries_processed: 500,
            bytes_received: 10000,
            bytes_sent: 20000,
            failovers: 1,
        };

        let response = MetricsResponse::from(snapshot);
        assert_eq!(response.connections_active, 70);
    }

    // Builds a state from `(address, role, healthy)` triples.
    // Each node is added to the config snapshot with weight 100 and enabled,
    // and to `node_health` with the given `healthy` flag.
    async fn topology_state(
        nodes: &[(&str, &str, bool)],
    ) -> Arc<AdminState> {
        let state = Arc::new(AdminState::new());
        {
            let mut cfg = state.config_snapshot.write().await;
            cfg.nodes = nodes
                .iter()
                .map(|(addr, role, _)| NodeSnapshot {
                    address: (*addr).to_string(),
                    role: (*role).to_string(),
                    weight: 100,
                    enabled: true,
                })
                .collect();
        }
        {
            let mut health = state.node_health.write().await;
            for (addr, _, healthy) in nodes {
                health.insert(
                    (*addr).to_string(),
                    NodeHealth {
                        address: (*addr).to_string(),
                        healthy: *healthy,
                        last_check: chrono::Utc::now(),
                        failure_count: 0,
                        last_error: None,
                        latency_ms: 1.0,
                        replication_lag_bytes: None,
                    },
                );
            }
        }
        state
    }

    // A healthy primary is reported, and health counts cover all nodes.
    #[tokio::test]
    async fn test_topology_returns_healthy_primary() {
        let state = topology_state(&[
            ("primary.svc:5432", "primary", true),
            ("standby-a.svc:5432", "standby", true),
            ("standby-b.svc:5432", "standby", false),
        ])
        .await;

        let topo = AdminServer::compute_topology(&state).await;
        assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
        assert_eq!(topo.healthy_nodes, 2);
        assert_eq!(topo.unhealthy_nodes, 1);
        assert_eq!(topo.total_nodes, 3);
    }

    // An unhealthy primary is not reported as the current primary, even
    // when a healthy standby exists.
    #[tokio::test]
    async fn test_topology_no_primary_when_primary_unhealthy() {
        let state = topology_state(&[
            ("primary.svc:5432", "primary", false),
            ("standby.svc:5432", "standby", true),
        ])
        .await;

        let topo = AdminServer::compute_topology(&state).await;
        assert_eq!(topo.current_primary, None);
        assert_eq!(topo.healthy_nodes, 1);
        assert_eq!(topo.unhealthy_nodes, 1);
    }

    // An empty cluster yields no primary and all-zero counts.
    #[tokio::test]
    async fn test_topology_handles_empty_cluster() {
        let state = Arc::new(AdminState::new());
        let topo = AdminServer::compute_topology(&state).await;
        assert_eq!(topo.current_primary, None);
        assert_eq!(topo.healthy_nodes, 0);
        assert_eq!(topo.unhealthy_nodes, 0);
        assert_eq!(topo.total_nodes, 0);
    }

    // The "primary" role is matched regardless of case.
    #[tokio::test]
    async fn test_topology_role_match_is_case_insensitive() {
        let state = topology_state(&[
            ("primary.svc:5432", "PRIMARY", true),
        ])
        .await;
        let topo = AdminServer::compute_topology(&state).await;
        assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
    }

    // A valid replay body with no attached engine yields 503 with an error message.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_replay_returns_503_when_engine_unattached() {
        let state = Arc::new(AdminState::new());
        let body = r#"{
            "from": "2026-04-25T10:00:00Z",
            "to": "2026-04-25T11:00:00Z",
            "target_host": "127.0.0.1",
            "target_port": 5432
        }"#;
        let (status, value) = AdminServer::handle_replay_request(Some(body), &state)
            .await
            .expect("handler returns Ok with status code");
        assert_eq!(status, 503);
        assert_eq!(value["error"], "replay engine not attached");
    }

    // A body that is not JSON yields 400.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_replay_400_on_malformed_body() {
        let state = Arc::new(AdminState::new());
        let (status, _) = AdminServer::handle_replay_request(Some("not json"), &state)
            .await
            .expect("handler returns Ok with status code");
        assert_eq!(status, 400);
    }

    // A missing body is reported as Err rather than as a status code.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_replay_errors_on_empty_body() {
        let state = Arc::new(AdminState::new());
        let err = AdminServer::handle_replay_request(None, &state).await;
        assert!(err.is_err(), "empty body must surface as Err");
    }

    // Listing plugins with no attached manager yields 503.
    #[cfg(feature = "wasm-plugins")]
    #[tokio::test]
    async fn test_plugins_list_returns_503_when_manager_unattached() {
        let state = Arc::new(AdminState::new());
        let (status, value) = AdminServer::handle_plugins_list(&state)
            .await
            .expect("handler returns Ok with status code");
        assert_eq!(status, 503);
        assert_eq!(value["error"], "plugin manager not attached");
    }

    // Without the wasm-plugins feature, the plugin listing still answers 503.
    #[cfg(not(feature = "wasm-plugins"))]
    #[tokio::test]
    async fn test_plugins_list_503_without_feature() {
        let state = Arc::new(AdminState::new());
        let (status, _) = AdminServer::handle_plugins_list(&state)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 503);
    }

    // Builds a state containing one healthy node-health entry for `addr`.
    async fn chaos_state_with_node(addr: &str) -> Arc<AdminState> {
        let state = Arc::new(AdminState::new());
        state.node_health.write().await.insert(
            addr.to_string(),
            NodeHealth {
                address: addr.to_string(),
                healthy: true,
                last_check: chrono::Utc::now(),
                failure_count: 0,
                last_error: None,
                latency_ms: 1.0,
                replication_lag_bytes: None,
            },
        );
        state
    }

    // force_unhealthy flips the node's health flag and records an override.
    #[tokio::test]
    async fn test_chaos_force_unhealthy_flips_node_and_records_override() {
        let state = chaos_state_with_node("primary.svc:5432").await;
        let body = r#"{"action":"force_unhealthy","target_node":"primary.svc:5432"}"#;
        let (status, value) = AdminServer::handle_chaos_request(Some(body), &state)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 200);
        assert_eq!(value["applied"], "force_unhealthy");
        // The node itself must now be unhealthy...
        assert!(!state.node_health.read().await["primary.svc:5432"].healthy);
        // ...and the override must be tracked so it can be restored later.
        assert!(state.chaos_overrides.read().await.contains_key("primary.svc:5432"));
    }

    // restore undoes a force_unhealthy and clears the override.
    #[tokio::test]
    async fn test_chaos_restore_clears_override_and_flips_back() {
        let state = chaos_state_with_node("primary.svc:5432").await;
        let _ = AdminServer::handle_chaos_request(
            Some(r#"{"action":"force_unhealthy","target_node":"primary.svc:5432"}"#),
            &state,
        )
        .await
        .unwrap();
        let (status, _) = AdminServer::handle_chaos_request(
            Some(r#"{"action":"restore","target_node":"primary.svc:5432"}"#),
            &state,
        )
        .await
        .unwrap();
        assert_eq!(status, 200);
        assert!(state.node_health.read().await["primary.svc:5432"].healthy);
        assert!(state.chaos_overrides.read().await.is_empty());
    }

    // reset restores every overridden node and reports which were restored.
    #[tokio::test]
    async fn test_chaos_reset_restores_all_overrides() {
        let state = chaos_state_with_node("a:5432").await;
        state.node_health.write().await.insert(
            "b:5432".to_string(),
            NodeHealth {
                address: "b:5432".to_string(),
                healthy: true,
                last_check: chrono::Utc::now(),
                failure_count: 0,
                last_error: None,
                latency_ms: 1.0,
                replication_lag_bytes: None,
            },
        );
        for addr in &["a:5432", "b:5432"] {
            let body = format!(r#"{{"action":"force_unhealthy","target_node":"{}"}}"#, addr);
            let _ = AdminServer::handle_chaos_request(Some(&body), &state)
                .await
                .unwrap();
        }
        let (status, value) = AdminServer::handle_chaos_request(
            Some(r#"{"action":"reset"}"#),
            &state,
        )
        .await
        .unwrap();
        assert_eq!(status, 200);
        assert_eq!(value["reset"], true);
        let restored = value["restored"].as_array().unwrap();
        assert_eq!(restored.len(), 2);
        for addr in &["a:5432", "b:5432"] {
            // Both previously forced nodes must be healthy again.
            assert!(state.node_health.read().await[*addr].healthy);
        }
        assert!(state.chaos_overrides.read().await.is_empty());
    }

    // Targeting a node with no health entry yields 404.
    #[tokio::test]
    async fn test_chaos_force_unhealthy_404s_when_node_unknown() {
        let state = Arc::new(AdminState::new());
        let body = r#"{"action":"force_unhealthy","target_node":"missing.svc:5432"}"#;
        let (status, _) = AdminServer::handle_chaos_request(Some(body), &state)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 404);
    }

    // A body that is not JSON yields 400.
    #[tokio::test]
    async fn test_chaos_400_on_malformed_body() {
        let state = Arc::new(AdminState::new());
        let (status, _) = AdminServer::handle_chaos_request(Some("not json"), &state)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 400);
    }

    // An action not listed in ChaosAction yields 400.
    #[tokio::test]
    async fn test_chaos_400_on_unknown_action() {
        let state = Arc::new(AdminState::new());
        let body = r#"{"action":"format_disk","target_node":"x"}"#;
        let (status, _) = AdminServer::handle_chaos_request(Some(body), &state)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 400);
    }

    // A body that is not JSON yields 400.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_shadow_400_on_malformed_body() {
        let (status, _) = AdminServer::handle_shadow_request(Some("not json"))
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 400);
    }

    // Port 1 on localhost is used as an unreachable source. The handler must
    // answer 500 with an error mentioning the source connection.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_shadow_500_on_source_unreachable() {
        let body = r#"{
            "sql": "SELECT 1",
            "source_host": "127.0.0.1",
            "source_port": 1,
            "shadow_host": "127.0.0.1",
            "shadow_port": 1
        }"#;
        let (status, value) = AdminServer::handle_shadow_request(Some(body))
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 500);
        let err = value["error"].as_str().expect("error field");
        assert!(
            err.contains("source connect"),
            "expected source connect error, got {}",
            err
        );
    }

    // A missing body is reported as Err rather than as a status code.
    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_shadow_errors_on_empty_body() {
        let err = AdminServer::handle_shadow_request(None).await;
        assert!(err.is_err(), "empty body must surface as Err");
    }

    // Listing anomalies with no attached detector yields 503.
    #[cfg(feature = "anomaly-detection")]
    #[tokio::test]
    async fn test_anomalies_returns_503_when_detector_unattached() {
        let state = Arc::new(AdminState::new());
        let (status, value) =
            AdminServer::handle_anomalies_list("/anomalies", &state)
                .await
                .expect("handler returns Ok");
        assert_eq!(status, 503);
        assert_eq!(value["error"], "anomaly detector not attached");
    }

    // A tautology-style SQL observation is expected to produce at least one event.
    #[cfg(feature = "anomaly-detection")]
    #[tokio::test]
    async fn test_anomalies_returns_attached_detector_events() {
        use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
        let state = Arc::new(AdminState::new());
        let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
        let _ = det.record_query(&QueryObservation {
            // NOTE(review): assumes the default config flags this SQL shape; confirm.
            tenant: "test".into(),
            fingerprint: "fp".into(),
            sql: "SELECT * FROM users WHERE id = 1 OR 1=1 --".into(),
            timestamp: std::time::Instant::now(),
            iso_timestamp: "ts".into(),
        });
        state.with_anomaly_detector(det.clone()).await;

        let (status, value) =
            AdminServer::handle_anomalies_list("/anomalies", &state)
                .await
                .expect("handler returns Ok");
        assert_eq!(status, 200);
        let count = value["count"].as_u64().expect("count field");
        assert!(count > 0, "expected at least one event, got {}", count);
    }

    // `?limit=5` is echoed back and caps the returned event list.
    // NOTE(review): relies on 50 distinct fingerprints producing at least 5
    // events under the default config; confirm.
    #[cfg(feature = "anomaly-detection")]
    #[tokio::test]
    async fn test_anomalies_limit_query_string_respected() {
        use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
        let state = Arc::new(AdminState::new());
        let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
        for i in 0..50 {
            let fp = format!("fp{}", i);
            let _ = det.record_query(&QueryObservation {
                tenant: "test".into(),
                fingerprint: fp,
                sql: "SELECT 1".into(),
                timestamp: std::time::Instant::now(),
                iso_timestamp: "ts".into(),
            });
        }
        state.with_anomaly_detector(det).await;

        let (status, value) =
            AdminServer::handle_anomalies_list("/anomalies?limit=5", &state)
                .await
                .expect("handler returns Ok");
        assert_eq!(status, 200);
        assert_eq!(value["limit"].as_u64().unwrap(), 5);
        assert_eq!(value["events"].as_array().unwrap().len(), 5);
    }

    // Cases covered: default, explicit value, clamping to max, unparsable
    // value, and `limit` after another key.
    #[cfg(feature = "anomaly-detection")]
    #[test]
    fn test_parse_limit_query_helper() {
        assert_eq!(parse_limit_query("/anomalies", 100, 1024), 100);
        assert_eq!(parse_limit_query("/anomalies?limit=42", 100, 1024), 42);
        assert_eq!(parse_limit_query("/anomalies?limit=99999", 100, 1024), 1024);
        assert_eq!(parse_limit_query("/anomalies?limit=abc", 100, 1024), 100);
        assert_eq!(parse_limit_query("/anomalies?other=x&limit=7", 100, 1024), 7);
    }

    // Builds a state with an EdgeCache and EdgeRegistry attached.
    #[cfg(feature = "edge-proxy")]
    async fn edge_state() -> Arc<AdminState> {
        use crate::edge::{EdgeCache, EdgeRegistry};
        use std::time::Duration;
        let s = Arc::new(AdminState::new());
        let cache = Arc::new(EdgeCache::new(100));
        let registry = Arc::new(EdgeRegistry::new(8, Duration::from_secs(60)));
        s.with_edge(cache, registry).await;
        s
    }

    // Before any registration, status shows zero edges and includes a cache-stats object.
    #[cfg(feature = "edge-proxy")]
    #[tokio::test]
    async fn test_edge_status_returns_empty_lists_initially() {
        let s = edge_state().await;
        let (status, value) = AdminServer::handle_edge_status(&s)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 200);
        assert_eq!(value["edge_count"].as_u64().unwrap(), 0);
        assert_eq!(value["registered"].as_array().unwrap().len(), 0);
        assert!(value["cache"].is_object(), "cache stats present");
    }

    // Registering an edge returns 201, and the edge then appears in status.
    #[cfg(feature = "edge-proxy")]
    #[tokio::test]
    async fn test_edge_register_then_status_lists_edge() {
        let s = edge_state().await;
        let body = r#"{"edge_id":"e1","region":"us-east","base_url":"https://e1.svc"}"#;
        let (status, _) = AdminServer::handle_edge_register(Some(body), &s)
            .await
            .expect("handler ok");
        assert_eq!(status, 201);
        let (status2, value2) = AdminServer::handle_edge_status(&s).await.unwrap();
        assert_eq!(status2, 200);
        assert_eq!(value2["edge_count"].as_u64().unwrap(), 1);
        assert_eq!(
            value2["registered"][0]["edge_id"].as_str().unwrap(),
            "e1"
        );
    }

    // A registration body that is not JSON yields 400.
    #[cfg(feature = "edge-proxy")]
    #[tokio::test]
    async fn test_edge_register_400_on_malformed_body() {
        let s = edge_state().await;
        let (status, _) = AdminServer::handle_edge_register(Some("not json"), &s)
            .await
            .expect("handler ok");
        assert_eq!(status, 400);
    }

    // Invalidating a table drops the local cache entries tagged with it.
    #[cfg(feature = "edge-proxy")]
    #[tokio::test]
    async fn test_edge_invalidate_drops_local_cache_entries() {
        use crate::edge::{CacheEntry, CacheKey};
        use std::time::{Duration, Instant};
        let s = edge_state().await;
        let cache = s.edge_cache.read().await.clone().unwrap();
        // Seed one entry tagged with the "users" table.
        cache.insert(
            CacheKey::new("fp1", "p1"),
            CacheEntry {
                version: 1,
                response_bytes: b"row".to_vec(),
                tables: vec!["users".into()],
                expires_at: Instant::now() + Duration::from_secs(60),
            },
        );
        assert!(cache.get(&CacheKey::new("fp1", "p1")).is_some());

        let body = r#"{"tables":["users"]}"#;
        let (status, value) = AdminServer::handle_edge_invalidate(Some(body), &s)
            .await
            .expect("handler ok");
        assert_eq!(status, 200);
        assert_eq!(value["dropped_local"].as_u64().unwrap(), 1);
        assert!(cache.get(&CacheKey::new("fp1", "p1")).is_none());
    }

    // Invalidation with no attached cache yields 503.
    #[cfg(feature = "edge-proxy")]
    #[tokio::test]
    async fn test_edge_invalidate_503_when_cache_unattached() {
        let s = Arc::new(AdminState::new());
        let body = r#"{"tables":["users"]}"#;
        let (status, _) = AdminServer::handle_edge_invalidate(Some(body), &s)
            .await
            .expect("handler ok");
        assert_eq!(status, 503);
    }
}