1#[cfg(feature = "anomaly-detection")]
7use crate::anomaly::AnomalyDetector;
8use crate::config::{NodeConfig, NodeRole, ProxyConfig};
9#[cfg(feature = "edge-proxy")]
10use crate::edge::{EdgeCache, EdgeRegistry, InvalidationEvent};
11#[cfg(feature = "wasm-plugins")]
12use crate::plugins::PluginManager;
13#[cfg(feature = "ha-tr")]
14use crate::replay::{ReplayEngine, TimeTravelRequest};
15use crate::server::{NodeHealth, ServerMetricsSnapshot};
16use crate::{ProxyError, Result};
17#[cfg(feature = "ha-tr")]
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::net::SocketAddr;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
25use tokio::net::TcpStream;
26use tokio::sync::{broadcast, RwLock};
27
28const ADMIN_UI_HTML: &str = include_str!("admin_ui.html");
32
33fn constant_time_eq_str(a: &str, b: &str) -> bool {
36 let (a, b) = (a.as_bytes(), b.as_bytes());
37 if a.len() != b.len() {
38 return false;
39 }
40 let mut diff = 0u8;
41 for i in 0..a.len() {
42 diff |= a[i] ^ b[i];
43 }
44 diff == 0
45}
46
47pub struct AdminServer {
48 listen_address: String,
50 state: Arc<AdminState>,
52 shutdown_tx: broadcast::Sender<()>,
54}
55
56pub struct AdminState {
58 pub node_health: RwLock<HashMap<String, NodeHealth>>,
60 pub metrics: RwLock<ServerMetricsSnapshot>,
62 pub active_sessions: RwLock<u64>,
64 pub config_snapshot: RwLock<ConfigSnapshot>,
66 pub proxy_config: RwLock<Option<ProxyConfig>>,
68 read_lb_counter: AtomicUsize,
70 commands: RwLock<HashMap<String, CommandHandler>>,
72 #[cfg(feature = "pool-modes")]
76 pub pool_manager: RwLock<Option<Arc<crate::pool::ConnectionPoolManager>>>,
77 #[cfg(feature = "circuit-breaker")]
80 pub circuit_breaker: RwLock<Option<Arc<crate::circuit_breaker::CircuitBreakerManager>>>,
81 #[cfg(feature = "ha-tr")]
85 pub replay_engine: RwLock<Option<Arc<ReplayEngine>>>,
86 #[cfg(feature = "wasm-plugins")]
91 pub plugin_manager: RwLock<Option<Arc<PluginManager>>>,
92 pub chaos_overrides: RwLock<HashMap<String, ChaosOverride>>,
97 #[cfg(feature = "anomaly-detection")]
101 pub anomaly_detector: RwLock<Option<Arc<AnomalyDetector>>>,
102 #[cfg(feature = "query-analytics")]
105 pub analytics: RwLock<Option<Arc<crate::analytics::QueryAnalytics>>>,
106 #[cfg(feature = "edge-proxy")]
109 pub edge_cache: RwLock<Option<Arc<EdgeCache>>>,
110 #[cfg(feature = "edge-proxy")]
111 pub edge_registry: RwLock<Option<Arc<EdgeRegistry>>>,
112 pub auth_token: RwLock<Option<String>>,
115 pub migration: RwLock<Option<MigrationInfo>>,
118 pub branch: RwLock<Option<crate::config::BranchConfig>>,
121}
122
123#[derive(Clone)]
126pub struct MigrationInfo {
127 pub target: String,
128 pub writes_only: bool,
129 pub metrics: Arc<crate::mirror::MirrorMetrics>,
130 pub config: crate::config::MirrorConfig,
132 pub cutover: Arc<arc_swap::ArcSwap<Option<Arc<crate::mirror::CutoverTarget>>>>,
134 pub cutover_target: crate::mirror::CutoverTarget,
135}
136
137#[derive(Debug, Clone, Serialize)]
142pub struct ChaosOverride {
143 pub since: String,
145 pub kind: String,
147 pub note: String,
149}
150
151type CommandHandler = Arc<dyn Fn(&[&str]) -> Result<String> + Send + Sync>;
153
154#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct ConfigSnapshot {
157 pub listen_address: String,
158 pub admin_address: String,
159 pub tr_enabled: bool,
160 pub tr_mode: String,
161 pub pool_min_connections: usize,
162 pub pool_max_connections: usize,
163 pub nodes: Vec<NodeSnapshot>,
164}
165
166#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct NodeSnapshot {
169 pub address: String,
170 pub role: String,
171 pub weight: u32,
172 pub enabled: bool,
173}
174
175impl AdminServer {
176 pub fn new(listen_address: String, state: Arc<AdminState>) -> Self {
178 let (shutdown_tx, _) = broadcast::channel(1);
179
180 Self {
181 listen_address,
182 state,
183 shutdown_tx,
184 }
185 }
186
187 pub async fn run(&self) -> Result<()> {
189 let listener = crate::server::bind_reuseport(&self.listen_address)?;
192
193 tracing::info!(
194 "Admin API listening on {} (SO_REUSEPORT)",
195 self.listen_address
196 );
197
198 let mut shutdown_rx = self.shutdown_tx.subscribe();
199
200 loop {
201 tokio::select! {
202 accept_result = listener.accept() => {
203 match accept_result {
204 Ok((stream, addr)) => {
205 let state = self.state.clone();
206 tokio::spawn(async move {
207 if let Err(e) = Self::handle_connection(stream, addr, state).await {
208 tracing::error!("Admin connection error: {}", e);
209 }
210 });
211 }
212 Err(e) => {
213 tracing::error!("Admin accept error: {}", e);
214 }
215 }
216 }
217 _ = shutdown_rx.recv() => {
218 tracing::info!("Admin server shutting down");
219 break;
220 }
221 }
222 }
223
224 Ok(())
225 }
226
227 async fn handle_connection(
229 mut stream: TcpStream,
230 addr: SocketAddr,
231 state: Arc<AdminState>,
232 ) -> Result<()> {
233 tracing::debug!("Admin connection from {}", addr);
234
235 let (reader, mut writer) = stream.split();
236 let mut reader = BufReader::new(reader);
237 let mut line = String::new();
238
239 let mut headers = Vec::new();
241 let mut content_length: usize = 0;
242
243 loop {
244 line.clear();
245 let bytes_read = reader
246 .read_line(&mut line)
247 .await
248 .map_err(|e| ProxyError::Network(format!("Read error: {}", e)))?;
249
250 if bytes_read == 0 || line == "\r\n" {
251 break;
252 }
253
254 let trimmed = line.trim();
256 if trimmed.to_lowercase().starts_with("content-length:") {
257 if let Some(len_str) = trimmed.split(':').nth(1) {
258 content_length = len_str.trim().parse().unwrap_or(0);
259 }
260 }
261 headers.push(trimmed.to_string());
262 }
263
264 if headers.is_empty() {
265 return Ok(());
266 }
267
268 let request_line = &headers[0];
270 let parts: Vec<&str> = request_line.split_whitespace().collect();
271
272 if parts.len() < 2 {
273 Self::send_response(&mut writer, 400, "Bad Request", "Invalid request line").await?;
274 return Ok(());
275 }
276
277 let method = parts[0];
278 let path = parts[1];
279
280 {
284 let required = state.auth_token.read().await.clone();
285 if let Some(token) = required {
286 let path_only = path.split('?').next().unwrap_or(path);
287 let is_liveness = method == "GET"
288 && matches!(path_only, "/health" | "/healthz" | "/livez" | "/readyz");
289 if !is_liveness && !Self::admin_authorized(&headers, &token) {
290 Self::send_response(
291 &mut writer,
292 401,
293 "Unauthorized",
294 "{\"error\":\"missing or invalid admin bearer token\"}",
295 )
296 .await?;
297 return Ok(());
298 }
299 }
300 }
301
302 let body = if content_length > 0 && (method == "POST" || method == "PUT") {
304 let mut body_buf = vec![0u8; content_length];
305 reader
306 .read_exact(&mut body_buf)
307 .await
308 .map_err(|e| ProxyError::Network(format!("Body read error: {}", e)))?;
309 Some(String::from_utf8_lossy(&body_buf).to_string())
310 } else {
311 None
312 };
313
314 if method == "GET" && (path == "/" || path == "/ui" || path == "/ui/") {
317 Self::send_html_response(&mut writer, 200, ADMIN_UI_HTML).await?;
318 return Ok(());
319 }
320
321 let response = Self::route_request(method, path, body.as_deref(), &state).await;
323
324 match response {
325 Ok((status, body)) => {
326 Self::send_json_response(&mut writer, status, &body).await?;
327 }
328 Err(e) => {
329 let error = ErrorResponse {
330 error: e.to_string(),
331 };
332 Self::send_json_response(&mut writer, 500, &error).await?;
333 }
334 }
335
336 Ok(())
337 }
338
339 fn admin_authorized(headers: &[String], token: &str) -> bool {
342 let expected = format!("Bearer {}", token);
343 for h in headers {
344 let mut sp = h.splitn(2, ':');
345 let name = sp.next().unwrap_or("").trim();
346 if name.eq_ignore_ascii_case("authorization") {
347 let value = sp.next().unwrap_or("").trim();
348 return constant_time_eq_str(value, &expected);
349 }
350 }
351 false
352 }
353
354 async fn send_html_response(
356 writer: &mut tokio::net::tcp::WriteHalf<'_>,
357 status: u16,
358 html: &str,
359 ) -> Result<()> {
360 let status_text = match status {
361 200 => "OK",
362 404 => "Not Found",
363 _ => "Unknown",
364 };
365 let response = format!(
366 "HTTP/1.1 {} {}\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
367 status,
368 status_text,
369 html.len(),
370 html
371 );
372 writer
373 .write_all(response.as_bytes())
374 .await
375 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
376 Ok(())
377 }
378
379 async fn route_request(
381 method: &str,
382 path: &str,
383 body: Option<&str>,
384 state: &Arc<AdminState>,
385 ) -> Result<(u16, serde_json::Value)> {
386 match (method, path) {
387 ("POST", "/api/sql") => Self::handle_sql_request(body, state).await,
389
390 ("GET", "/health") => {
392 let health = HealthResponse { status: "ok" };
393 Ok((200, serde_json::to_value(health)?))
394 }
395 ("GET", "/health/ready") => {
396 let ready = Self::check_readiness(state).await;
397 let response = ReadinessResponse {
398 ready,
399 message: if ready {
400 "Proxy is ready"
401 } else {
402 "Proxy is not ready"
403 },
404 };
405 let status = if ready { 200 } else { 503 };
406 Ok((status, serde_json::to_value(response)?))
407 }
408 ("GET", "/health/live") => {
409 let response = LivenessResponse { alive: true };
410 Ok((200, serde_json::to_value(response)?))
411 }
412
413 ("GET", "/metrics") => {
415 let metrics = state.metrics.read().await.clone();
416 Ok((200, serde_json::to_value(MetricsResponse::from(metrics))?))
417 }
418 ("GET", "/metrics/prometheus") => {
419 let metrics = state.metrics.read().await.clone();
420 let prometheus = Self::format_prometheus_metrics(&metrics);
421 Ok((200, serde_json::json!({ "text": prometheus })))
422 }
423
424 ("GET", "/nodes") => {
426 let health = state.node_health.read().await;
427 let nodes: Vec<NodeHealthResponse> = health
428 .values()
429 .map(|h| NodeHealthResponse::from(h.clone()))
430 .collect();
431 Ok((200, serde_json::to_value(nodes)?))
432 }
433 ("GET", path) if path.starts_with("/nodes/") => {
434 let node_addr = path.trim_start_matches("/nodes/");
435 let health = state.node_health.read().await;
436 match health.get(node_addr) {
437 Some(h) => Ok((
438 200,
439 serde_json::to_value(NodeHealthResponse::from(h.clone()))?,
440 )),
441 None => Ok((404, serde_json::json!({ "error": "Node not found" }))),
442 }
443 }
444 ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/enable") => {
445 let node_addr = path
446 .trim_start_matches("/nodes/")
447 .trim_end_matches("/enable");
448 Self::set_node_enabled(state, node_addr, true).await?;
449 Ok((200, serde_json::json!({ "status": "enabled" })))
450 }
451 ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/disable") => {
452 let node_addr = path
453 .trim_start_matches("/nodes/")
454 .trim_end_matches("/disable");
455 Self::set_node_enabled(state, node_addr, false).await?;
456 Ok((200, serde_json::json!({ "status": "disabled" })))
457 }
458
459 ("GET", "/topology") => {
465 let topo = Self::compute_topology(state).await;
466 Ok((200, serde_json::to_value(topo)?))
467 }
468
469 #[cfg(feature = "ha-tr")]
473 ("POST", "/api/replay") => Self::handle_replay_request(body, state).await,
474 #[cfg(not(feature = "ha-tr"))]
475 ("POST", "/api/replay") => Ok((
476 503,
477 serde_json::json!({ "error": "ha-tr feature not compiled in" }),
478 )),
479
480 #[cfg(feature = "ha-tr")]
486 ("POST", "/api/shadow") => Self::handle_shadow_request(body).await,
487 #[cfg(not(feature = "ha-tr"))]
488 ("POST", "/api/shadow") => Ok((
489 503,
490 serde_json::json!({ "error": "ha-tr feature not compiled in" }),
491 )),
492
493 ("GET", "/plugins") => Self::handle_plugins_list(state).await,
498
499 #[cfg(feature = "anomaly-detection")]
503 ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => {
504 Self::handle_anomalies_list(p, state).await
505 }
506 #[cfg(not(feature = "anomaly-detection"))]
507 ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => Ok((
508 503,
509 serde_json::json!({ "error": "anomaly-detection feature not compiled in" }),
510 )),
511
512 #[cfg(feature = "query-analytics")]
514 ("GET", p)
515 if p == "/api/analytics"
516 || p == "/analytics"
517 || p.starts_with("/api/analytics?")
518 || p.starts_with("/analytics?") =>
519 {
520 Self::handle_analytics(p, state).await
521 }
522 #[cfg(not(feature = "query-analytics"))]
523 ("GET", p)
524 if p == "/api/analytics"
525 || p == "/analytics"
526 || p.starts_with("/api/analytics?")
527 || p.starts_with("/analytics?") =>
528 {
529 Ok((
530 503,
531 serde_json::json!({ "error": "query-analytics feature not compiled in" }),
532 ))
533 }
534
535 #[cfg(feature = "edge-proxy")]
539 ("GET", "/api/edge") => Self::handle_edge_status(state).await,
540 #[cfg(feature = "edge-proxy")]
541 ("POST", "/api/edge/register") => Self::handle_edge_register(body, state).await,
542 #[cfg(feature = "edge-proxy")]
543 ("POST", "/api/edge/invalidate") => Self::handle_edge_invalidate(body, state).await,
544 #[cfg(not(feature = "edge-proxy"))]
545 ("GET", "/api/edge")
546 | ("POST", "/api/edge/register")
547 | ("POST", "/api/edge/invalidate") => Ok((
548 503,
549 serde_json::json!({ "error": "edge-proxy feature not compiled in" }),
550 )),
551
552 ("POST", "/api/chaos") => Self::handle_chaos_request(body, state).await,
556 ("GET", "/api/chaos") => {
559 let overrides = state.chaos_overrides.read().await.clone();
560 Ok((200, serde_json::to_value(overrides)?))
561 }
562
563 #[cfg(feature = "circuit-breaker")]
566 ("GET", "/api/circuit") => Self::handle_circuit_status(state).await,
567 #[cfg(not(feature = "circuit-breaker"))]
568 ("GET", "/api/circuit") => Ok((
569 503,
570 serde_json::json!({ "error": "circuit-breaker feature not enabled" }),
571 )),
572
573 ("GET", "/api/migration/status") | ("GET", "/migration/status") => {
575 match state.migration.read().await.as_ref() {
576 Some(info) => {
577 let st =
578 crate::mirror::status(&info.target, info.writes_only, &info.metrics);
579 let mut v = serde_json::to_value(st)?;
580 let cut = info.cutover.load_full().is_some();
581 v["cutover_active"] = serde_json::json!(cut);
582 Ok((200, v))
583 }
584 None => Ok((
585 503,
586 serde_json::json!({ "error": "traffic mirroring not enabled" }),
587 )),
588 }
589 }
590
591 ("POST", "/api/migration/cutover") | ("POST", "/migration/cutover") => {
593 let info = state.migration.read().await.clone();
594 let Some(info) = info else {
595 return Ok((
596 503,
597 serde_json::json!({ "error": "traffic mirroring not enabled" }),
598 ));
599 };
600 let force = path.contains("force=true")
601 || body.map(|b| b.contains("\"force\":true")).unwrap_or(false);
602 let st = crate::mirror::status(&info.target, info.writes_only, &info.metrics);
603 if !st.migration_ready && !force {
604 return Ok((
605 409,
606 serde_json::json!({
607 "ok": false,
608 "error": "not migration_ready (backlog/drops present); pass force=true to override",
609 "status": st,
610 }),
611 ));
612 }
613 info.cutover
614 .store(Arc::new(Some(Arc::new(info.cutover_target.clone()))));
615 tracing::warn!(target = %info.cutover_target.addr, "migration cutover: new connections now route to the promoted target");
616 Ok((
617 200,
618 serde_json::json!({ "ok": true, "promoted_to": info.cutover_target.addr }),
619 ))
620 }
621
622 ("POST", "/api/migration/cutover/rollback")
624 | ("POST", "/migration/cutover/rollback") => {
625 let info = state.migration.read().await.clone();
626 let Some(info) = info else {
627 return Ok((
628 503,
629 serde_json::json!({ "error": "traffic mirroring not enabled" }),
630 ));
631 };
632 info.cutover.store(Arc::new(None));
633 Ok((200, serde_json::json!({ "ok": true, "rolled_back": true })))
634 }
635
636 ("POST", "/api/migration/snapshot") | ("POST", "/migration/snapshot") => {
638 let info = state.migration.read().await.clone();
639 let Some(info) = info else {
640 return Ok((
641 503,
642 serde_json::json!({ "error": "traffic mirroring not enabled" }),
643 ));
644 };
645 let body = body.unwrap_or("{}");
646 let req: serde_json::Value = serde_json::from_str(body)
647 .map_err(|e| ProxyError::Internal(format!("invalid JSON: {}", e)))?;
648 let tables: Vec<String> = req
649 .get("tables")
650 .and_then(|t| t.as_array())
651 .map(|a| {
652 a.iter()
653 .filter_map(|v| v.as_str().map(String::from))
654 .collect()
655 })
656 .unwrap_or_default();
657 if tables.is_empty() {
658 return Ok((
659 400,
660 serde_json::json!({ "error": "provide a non-empty 'tables' array" }),
661 ));
662 }
663 match crate::mirror::snapshot_tables(&info.config, &tables).await {
664 Ok(rep) => {
665 let total: u64 = rep.iter().map(|t| t.copied).sum();
666 Ok((
667 200,
668 serde_json::json!({ "ok": true, "tables": rep, "rows_copied": total }),
669 ))
670 }
671 Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
672 }
673 }
674
675 ("GET", p) if p == "/api/branch" || p == "/branch" || p.starts_with("/api/branch?") => {
677 let cfg = state.branch.read().await.clone();
678 let Some(cfg) = cfg else {
679 return Ok((
680 503,
681 serde_json::json!({ "error": "branch databases not enabled" }),
682 ));
683 };
684 match crate::branch::list(&cfg).await {
685 Ok(branches) => Ok((200, serde_json::json!({ "branches": branches }))),
686 Err(e) => Ok((500, serde_json::json!({ "error": e }))),
687 }
688 }
689 ("POST", p) if p == "/api/branch" || p == "/branch" => {
690 let cfg = state.branch.read().await.clone();
691 let Some(cfg) = cfg else {
692 return Ok((
693 503,
694 serde_json::json!({ "error": "branch databases not enabled" }),
695 ));
696 };
697 let req: serde_json::Value = serde_json::from_str(body.unwrap_or("{}"))
698 .map_err(|e| ProxyError::Internal(format!("invalid JSON: {}", e)))?;
699 let name = req.get("name").and_then(|v| v.as_str()).unwrap_or("");
700 if name.is_empty() {
701 return Ok((400, serde_json::json!({ "error": "provide 'name'" })));
702 }
703 let base = req.get("base").and_then(|v| v.as_str());
704 match crate::branch::create(&cfg, name, base).await {
705 Ok(()) => Ok((
706 200,
707 serde_json::json!({ "ok": true, "branch": name,
708 "base": base.unwrap_or(&cfg.base_database) }),
709 )),
710 Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
711 }
712 }
713 ("DELETE", p) if p.starts_with("/api/branch") || p.starts_with("/branch") => {
714 let cfg = state.branch.read().await.clone();
715 let Some(cfg) = cfg else {
716 return Ok((
717 503,
718 serde_json::json!({ "error": "branch databases not enabled" }),
719 ));
720 };
721 let name = p.find("name=").map(|i| &p[i + 5..]).unwrap_or("");
722 if name.is_empty() {
723 return Ok((
724 400,
725 serde_json::json!({ "error": "provide ?name=<branch>" }),
726 ));
727 }
728 match crate::branch::drop(&cfg, name).await {
729 Ok(()) => Ok((200, serde_json::json!({ "ok": true, "dropped": name }))),
730 Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
731 }
732 }
733
734 ("GET", "/config") => {
736 let config = state.config_snapshot.read().await.clone();
737 Ok((200, serde_json::to_value(config)?))
738 }
739
740 ("GET", "/sessions") => {
742 let count = *state.active_sessions.read().await;
743 let response = SessionsResponse {
744 active_sessions: count,
745 };
746 Ok((200, serde_json::to_value(response)?))
747 }
748
749 ("GET", "/pools") => {
751 let pools = Self::get_pool_stats(state).await;
752 Ok((200, serde_json::to_value(pools)?))
753 }
754
755 ("GET", "/version") => {
757 let response = VersionResponse {
758 version: crate::VERSION.to_string(),
759 build_time: env!("CARGO_PKG_VERSION").to_string(),
760 };
761 Ok((200, serde_json::to_value(response)?))
762 }
763
764 _ => Ok((404, serde_json::json!({ "error": "Not found" }))),
766 }
767 }
768
769 async fn handle_sql_request(
771 body: Option<&str>,
772 state: &Arc<AdminState>,
773 ) -> Result<(u16, serde_json::Value)> {
774 let body = body.ok_or_else(|| ProxyError::Internal("Missing request body".to_string()))?;
776 let request: SqlRequest = serde_json::from_str(body)
777 .map_err(|e| ProxyError::Internal(format!("Invalid JSON: {}", e)))?;
778
779 let sql = request.query.trim();
780 if sql.is_empty() {
781 return Ok((400, serde_json::json!({ "error": "Empty query" })));
782 }
783
784 let is_write = Self::is_write_query(sql);
786 let query_type = if is_write { "write" } else { "read" };
787
788 let proxy_config = state.proxy_config.read().await;
790 let config = proxy_config
791 .as_ref()
792 .ok_or_else(|| ProxyError::Internal("Proxy config not initialized".to_string()))?;
793
794 let health = state.node_health.read().await;
796
797 let target_node = if is_write {
799 Self::select_primary_node(config, &health)?
801 } else {
802 Self::select_read_node(config, &health, state)?
804 };
805
806 let target_address = format!("{}:{}", target_node.host, target_node.port);
807 let http_port = target_node.http_port;
809 let http_url = format!("http://{}:{}/api/sql", target_node.host, http_port);
810
811 tracing::debug!(
812 "Routing {} query to {} ({})",
813 query_type,
814 target_address,
815 match target_node.role {
816 NodeRole::Primary => "primary",
817 NodeRole::Standby => "standby",
818 NodeRole::ReadReplica => "replica",
819 }
820 );
821
822 let result = Self::forward_sql_request(&http_url, sql).await?;
824
825 let response = SqlResponse {
827 query_type: query_type.to_string(),
828 routed_to: target_address,
829 node_role: format!("{:?}", target_node.role).to_lowercase(),
830 result,
831 };
832
833 Ok((200, serde_json::to_value(response)?))
834 }
835
836 fn is_write_query(sql: &str) -> bool {
838 let upper = sql.trim().to_uppercase();
839
840 if upper.starts_with("INSERT")
842 || upper.starts_with("UPDATE")
843 || upper.starts_with("DELETE")
844 || upper.starts_with("CREATE")
845 || upper.starts_with("ALTER")
846 || upper.starts_with("DROP")
847 || upper.starts_with("TRUNCATE")
848 || upper.starts_with("GRANT")
849 || upper.starts_with("REVOKE")
850 || upper.starts_with("VACUUM")
851 || upper.starts_with("REINDEX")
852 || upper.starts_with("MERGE")
853 || upper.starts_with("UPSERT")
854 {
855 return true;
856 }
857
858 if upper.starts_with("BEGIN")
860 || upper.starts_with("COMMIT")
861 || upper.starts_with("ROLLBACK")
862 || upper.starts_with("SAVEPOINT")
863 {
864 return true;
866 }
867
868 false
870 }
871
872 fn select_primary_node<'a>(
874 config: &'a ProxyConfig,
875 health: &HashMap<String, NodeHealth>,
876 ) -> Result<&'a NodeConfig> {
877 config
878 .nodes
879 .iter()
880 .find(|n| {
881 n.role == NodeRole::Primary
882 && n.enabled
883 && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false)
884 })
885 .ok_or_else(|| ProxyError::Internal("No healthy primary node available".to_string()))
886 }
887
888 fn select_read_node<'a>(
890 config: &'a ProxyConfig,
891 health: &HashMap<String, NodeHealth>,
892 state: &AdminState,
893 ) -> Result<&'a NodeConfig> {
894 let healthy_nodes: Vec<&NodeConfig> = config
896 .nodes
897 .iter()
898 .filter(|n| n.enabled && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false))
899 .collect();
900
901 if healthy_nodes.is_empty() {
902 return Err(ProxyError::Internal(
903 "No healthy nodes available".to_string(),
904 ));
905 }
906
907 if config.load_balancer.read_write_split {
909 let read_nodes: Vec<&NodeConfig> = healthy_nodes
910 .iter()
911 .filter(|n| n.role == NodeRole::Standby || n.role == NodeRole::ReadReplica)
912 .copied()
913 .collect();
914
915 if !read_nodes.is_empty() {
916 let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
918 let index = counter % read_nodes.len();
919 return Ok(read_nodes[index]);
920 }
921 }
922
923 let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
925 let index = counter % healthy_nodes.len();
926 Ok(healthy_nodes[index])
927 }
928
929 async fn forward_sql_request(url: &str, sql: &str) -> Result<serde_json::Value> {
931 let request_body = serde_json::json!({ "query": sql });
933 let body_bytes = serde_json::to_vec(&request_body)
934 .map_err(|e| ProxyError::Internal(format!("JSON serialization error: {}", e)))?;
935
936 let url_parts: Vec<&str> = url.trim_start_matches("http://").splitn(2, '/').collect();
938 if url_parts.is_empty() {
939 return Err(ProxyError::Internal("Invalid URL".to_string()));
940 }
941
942 let host_port = url_parts[0];
943 let path = if url_parts.len() > 1 {
944 format!("/{}", url_parts[1])
945 } else {
946 "/".to_string()
947 };
948
949 let stream = TcpStream::connect(host_port).await.map_err(|e| {
951 ProxyError::Network(format!("Failed to connect to {}: {}", host_port, e))
952 })?;
953
954 let (reader, mut writer) = stream.into_split();
955 let mut reader = BufReader::new(reader);
956
957 let request = format!(
959 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
960 path,
961 host_port,
962 body_bytes.len()
963 );
964
965 writer
966 .write_all(request.as_bytes())
967 .await
968 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
969 writer
970 .write_all(&body_bytes)
971 .await
972 .map_err(|e| ProxyError::Network(format!("Write body error: {}", e)))?;
973
974 let mut response_headers = Vec::new();
976 let mut line = String::new();
977 let mut content_length: usize = 0;
978
979 loop {
980 line.clear();
981 let bytes_read = reader
982 .read_line(&mut line)
983 .await
984 .map_err(|e| ProxyError::Network(format!("Response read error: {}", e)))?;
985
986 if bytes_read == 0 || line == "\r\n" {
987 break;
988 }
989
990 let trimmed = line.trim();
991 if trimmed.to_lowercase().starts_with("content-length:") {
992 if let Some(len_str) = trimmed.split(':').nth(1) {
993 content_length = len_str.trim().parse().unwrap_or(0);
994 }
995 }
996 response_headers.push(trimmed.to_string());
997 }
998
999 let mut body_buf = vec![0u8; content_length];
1001 if content_length > 0 {
1002 reader
1003 .read_exact(&mut body_buf)
1004 .await
1005 .map_err(|e| ProxyError::Network(format!("Response body read error: {}", e)))?;
1006 }
1007
1008 let response_body = String::from_utf8_lossy(&body_buf);
1009
1010 serde_json::from_str(&response_body).map_err(|e| {
1012 ProxyError::Internal(format!(
1013 "Invalid JSON response: {} - body: {}",
1014 e, response_body
1015 ))
1016 })
1017 }
1018
1019 async fn check_readiness(state: &Arc<AdminState>) -> bool {
1021 let health = state.node_health.read().await;
1022
1023 health.values().any(|h| h.healthy)
1025 }
1026
1027 async fn set_node_enabled(
1029 state: &Arc<AdminState>,
1030 node_addr: &str,
1031 enabled: bool,
1032 ) -> Result<()> {
1033 let mut health = state.node_health.write().await;
1034
1035 if let Some(node_health) = health.get_mut(node_addr) {
1036 node_health.healthy = enabled;
1037 Ok(())
1038 } else {
1039 Err(ProxyError::Config(format!("Node not found: {}", node_addr)))
1040 }
1041 }
1042
1043 async fn get_pool_stats(_state: &Arc<AdminState>) -> Vec<PoolStatsResponse> {
1045 #[cfg(feature = "pool-modes")]
1048 if let Some(mgr) = _state.pool_manager.read().await.clone() {
1049 let stats = mgr.get_stats().await;
1050 return stats
1051 .node_stats
1052 .into_iter()
1053 .map(|ns| PoolStatsResponse {
1054 node: ns.node_id.0.to_string(),
1055 active_connections: ns.active as u64,
1056 idle_connections: ns.idle as u64,
1057 pending_requests: 0,
1060 total_connections_created: ns.total as u64,
1061 total_connections_closed: 0,
1062 })
1063 .collect();
1064 }
1065 Vec::new()
1066 }
1067
1068 #[cfg(feature = "ha-tr")]
1072 async fn handle_replay_request(
1073 body: Option<&str>,
1074 state: &Arc<AdminState>,
1075 ) -> Result<(u16, serde_json::Value)> {
1076 let raw =
1077 body.ok_or_else(|| ProxyError::Internal("replay: empty request body".to_string()))?;
1078 let req: ReplayRequestBody = match serde_json::from_str(raw) {
1079 Ok(r) => r,
1080 Err(e) => {
1081 return Ok((
1082 400,
1083 serde_json::json!({ "error": format!("invalid body: {}", e) }),
1084 ));
1085 }
1086 };
1087 let engine = match state.replay_engine.read().await.clone() {
1088 Some(e) => e,
1089 None => {
1090 return Ok((
1091 503,
1092 serde_json::json!({ "error": "replay engine not attached" }),
1093 ));
1094 }
1095 };
1096 let tt = TimeTravelRequest {
1097 from: req.from,
1098 to: req.to,
1099 target_host: req.target_host,
1100 target_port: req.target_port,
1101 target_user: req.target_user,
1102 target_password: req.target_password,
1103 target_database: req.target_database,
1104 };
1105 match engine.replay_window(&tt).await {
1106 Ok(summary) => Ok((200, serde_json::to_value(summary)?)),
1107 Err(e) => Ok((
1108 500,
1109 serde_json::json!({ "error": format!("replay failed: {}", e) }),
1110 )),
1111 }
1112 }
1113
1114 #[cfg(feature = "edge-proxy")]
1117 async fn handle_edge_status(state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1118 let cache_stats = state.edge_cache.read().await.clone().map(|c| c.stats());
1119 let edges = match state.edge_registry.read().await.clone() {
1120 Some(r) => r.list(),
1121 None => Vec::new(),
1122 };
1123 Ok((
1124 200,
1125 serde_json::json!({
1126 "cache": cache_stats,
1127 "registered": edges,
1128 "edge_count": edges.len(),
1129 }),
1130 ))
1131 }
1132
1133 #[cfg(feature = "edge-proxy")]
1138 async fn handle_edge_register(
1139 body: Option<&str>,
1140 state: &Arc<AdminState>,
1141 ) -> Result<(u16, serde_json::Value)> {
1142 let raw =
1143 body.ok_or_else(|| ProxyError::Internal("edge register: empty body".to_string()))?;
1144 let req: EdgeRegisterBody = match serde_json::from_str(raw) {
1145 Ok(r) => r,
1146 Err(e) => {
1147 return Ok((
1148 400,
1149 serde_json::json!({ "error": format!("invalid body: {}", e) }),
1150 ));
1151 }
1152 };
1153 let registry = match state.edge_registry.read().await.clone() {
1154 Some(r) => r,
1155 None => {
1156 return Ok((
1157 503,
1158 serde_json::json!({ "error": "edge registry not attached" }),
1159 ));
1160 }
1161 };
1162 let now = chrono::Utc::now().to_rfc3339();
1163 match registry.register(&req.edge_id, &req.region, &req.base_url, &now) {
1164 Ok(_rx) => {
1165 Ok((
1171 201,
1172 serde_json::json!({
1173 "edge_id": req.edge_id,
1174 "region": req.region,
1175 "base_url": req.base_url,
1176 "registered_at": now,
1177 }),
1178 ))
1179 }
1180 Err(e) => Ok((503, serde_json::json!({ "error": e.to_string() }))),
1181 }
1182 }
1183
1184 #[cfg(feature = "edge-proxy")]
1191 async fn handle_edge_invalidate(
1192 body: Option<&str>,
1193 state: &Arc<AdminState>,
1194 ) -> Result<(u16, serde_json::Value)> {
1195 let raw =
1196 body.ok_or_else(|| ProxyError::Internal("edge invalidate: empty body".to_string()))?;
1197 let req: EdgeInvalidateBody = match serde_json::from_str(raw) {
1198 Ok(r) => r,
1199 Err(e) => {
1200 return Ok((
1201 400,
1202 serde_json::json!({ "error": format!("invalid body: {}", e) }),
1203 ));
1204 }
1205 };
1206 let cache = match state.edge_cache.read().await.clone() {
1207 Some(c) => c,
1208 None => {
1209 return Ok((
1210 503,
1211 serde_json::json!({ "error": "edge cache not attached" }),
1212 ));
1213 }
1214 };
1215 let registry = match state.edge_registry.read().await.clone() {
1216 Some(r) => r,
1217 None => {
1218 return Ok((
1219 503,
1220 serde_json::json!({ "error": "edge registry not attached" }),
1221 ));
1222 }
1223 };
1224 let version = req.up_to_version.unwrap_or_else(|| cache.next_version());
1225 let dropped_local = cache.invalidate(version, &req.tables);
1227 let ev = InvalidationEvent {
1229 up_to_version: version,
1230 tables: req.tables.clone(),
1231 committed_at: chrono::Utc::now().to_rfc3339(),
1232 };
1233 let (sent, pruned) = registry.broadcast(ev).await;
1234 Ok((
1235 200,
1236 serde_json::json!({
1237 "version": version,
1238 "tables": req.tables,
1239 "dropped_local": dropped_local,
1240 "edges_notified": sent,
1241 "edges_pruned": pruned,
1242 }),
1243 ))
1244 }
1245
1246 #[cfg(feature = "anomaly-detection")]
1251 async fn handle_anomalies_list(
1252 path: &str,
1253 state: &Arc<AdminState>,
1254 ) -> Result<(u16, serde_json::Value)> {
1255 let limit = parse_limit_query(path, 100, 1024);
1256 let det = match state.anomaly_detector.read().await.clone() {
1257 Some(d) => d,
1258 None => {
1259 return Ok((
1260 503,
1261 serde_json::json!({ "error": "anomaly detector not attached" }),
1262 ));
1263 }
1264 };
1265 let events = det.recent_events(limit);
1266 Ok((
1267 200,
1268 serde_json::json!({
1269 "count": events.len(),
1270 "limit": limit,
1271 "events": events,
1272 "buffer_total": det.event_count(),
1273 }),
1274 ))
1275 }
1276
1277 #[cfg(feature = "query-analytics")]
1280 async fn handle_analytics(
1281 path: &str,
1282 state: &Arc<AdminState>,
1283 ) -> Result<(u16, serde_json::Value)> {
1284 use crate::analytics::OrderBy;
1285 let limit = parse_limit_query(path, 50, 1024);
1286 let a = match state.analytics.read().await.clone() {
1287 Some(a) => a,
1288 None => {
1289 return Ok((503, serde_json::json!({ "error": "analytics not enabled" })));
1290 }
1291 };
1292 let top: Vec<serde_json::Value> = a
1293 .top_queries(OrderBy::Calls, limit)
1294 .into_iter()
1295 .map(|s| {
1296 serde_json::json!({
1297 "fingerprint": s.fingerprint_hash,
1298 "normalized": s.normalized,
1299 "calls": s.calls,
1300 "avg_ms": s.avg_time.as_secs_f64() * 1000.0,
1301 "p99_ms": s.p99.as_secs_f64() * 1000.0,
1302 "rows": s.rows,
1303 "errors": s.errors,
1304 })
1305 })
1306 .collect();
1307 let slow_count = a.slow_queries(limit).len();
1308 Ok((
1309 200,
1310 serde_json::json!({
1311 "limit": limit,
1312 "top_queries": top,
1313 "slow_query_count": slow_count,
1314 }),
1315 ))
1316 }
1317
1318 #[cfg(feature = "ha-tr")]
1328 async fn handle_shadow_request(body: Option<&str>) -> Result<(u16, serde_json::Value)> {
1329 use crate::backend::{
1330 tls::default_client_config, BackendClient, BackendConfig, ParamValue, TlsMode,
1331 };
1332 use crate::shadow_execute::shadow_execute;
1333
1334 let raw =
1335 body.ok_or_else(|| ProxyError::Internal("shadow: empty request body".to_string()))?;
1336 let req: ShadowRequestBody = match serde_json::from_str(raw) {
1337 Ok(r) => r,
1338 Err(e) => {
1339 return Ok((
1340 400,
1341 serde_json::json!({ "error": format!("invalid body: {}", e) }),
1342 ));
1343 }
1344 };
1345
1346 let mk_cfg = |host: String,
1349 port: u16,
1350 user: Option<String>,
1351 password: Option<String>,
1352 database: Option<String>| BackendConfig {
1353 host,
1354 port,
1355 user: user.unwrap_or_else(|| "postgres".into()),
1356 password,
1357 database,
1358 application_name: Some("heliosdb-proxy-shadow".into()),
1359 tls_mode: TlsMode::Disable,
1360 connect_timeout: std::time::Duration::from_secs(5),
1361 query_timeout: std::time::Duration::from_secs(30),
1362 tls_config: default_client_config(),
1363 };
1364 let source_cfg = mk_cfg(
1365 req.source_host,
1366 req.source_port,
1367 req.source_user,
1368 req.source_password,
1369 req.source_database,
1370 );
1371 let shadow_cfg = mk_cfg(
1372 req.shadow_host,
1373 req.shadow_port,
1374 req.shadow_user,
1375 req.shadow_password,
1376 req.shadow_database,
1377 );
1378
1379 let mut source = match BackendClient::connect(&source_cfg).await {
1383 Ok(c) => c,
1384 Err(e) => {
1385 return Ok((
1386 500,
1387 serde_json::json!({ "error": format!("source connect: {}", e) }),
1388 ));
1389 }
1390 };
1391
1392 let params: Vec<ParamValue> = req
1393 .params
1394 .unwrap_or_default()
1395 .into_iter()
1396 .map(ParamValue::Text)
1397 .collect();
1398
1399 let outcome = shadow_execute(&mut source, &shadow_cfg, &req.sql, ¶ms).await;
1400 source.close().await;
1401
1402 match outcome {
1403 Ok((_qr, report)) => Ok((
1404 200,
1405 serde_json::json!({
1406 "sql": report.sql,
1407 "both_succeeded": report.both_succeeded,
1408 "row_count_match": report.row_count_match,
1409 "row_hash_match": report.row_hash_match,
1410 "primary_elapsed_us": report.primary_elapsed_us,
1411 "shadow_elapsed_us": report.shadow_elapsed_us,
1412 "primary_error": report.primary_error,
1413 "shadow_error": report.shadow_error,
1414 "is_clean": report.is_clean(),
1415 }),
1416 )),
1417 Err(e) => Ok((
1418 500,
1419 serde_json::json!({ "error": format!("shadow execute: {}", e) }),
1420 )),
1421 }
1422 }
1423
1424 #[cfg(feature = "circuit-breaker")]
1443 async fn handle_circuit_status(state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1444 let mgr = match state.circuit_breaker.read().await.clone() {
1445 Some(m) => m,
1446 None => {
1447 return Ok((
1448 503,
1449 serde_json::json!({ "error": "circuit breaker not attached" }),
1450 ))
1451 }
1452 };
1453 let nodes = state.config_snapshot.read().await.nodes.clone();
1454 let circuits: Vec<serde_json::Value> = nodes
1455 .iter()
1456 .map(|n| {
1457 let st = mgr.get_breaker(&n.address).get_state();
1458 serde_json::json!({
1459 "node": n.address,
1460 "state": format!("{:?}", st).to_lowercase(),
1461 })
1462 })
1463 .collect();
1464 Ok((200, serde_json::json!({ "circuits": circuits })))
1465 }
1466
1467 async fn handle_chaos_request(
1468 body: Option<&str>,
1469 state: &Arc<AdminState>,
1470 ) -> Result<(u16, serde_json::Value)> {
1471 let raw =
1472 body.ok_or_else(|| ProxyError::Internal("chaos: empty request body".to_string()))?;
1473 let action: ChaosAction = match serde_json::from_str(raw) {
1474 Ok(a) => a,
1475 Err(e) => {
1476 return Ok((
1477 400,
1478 serde_json::json!({ "error": format!("invalid body: {}", e) }),
1479 ));
1480 }
1481 };
1482 match action {
1483 ChaosAction::ForceUnhealthy { target_node } => {
1484 if let Err(e) = Self::set_node_enabled(state, &target_node, false).await {
1485 return Ok((404, serde_json::json!({ "error": e.to_string() })));
1486 }
1487 state.chaos_overrides.write().await.insert(
1488 target_node.clone(),
1489 ChaosOverride {
1490 since: chrono::Utc::now().to_rfc3339(),
1491 kind: "force_unhealthy".to_string(),
1492 note: "forced unhealthy via chaos endpoint".to_string(),
1493 },
1494 );
1495 Ok((
1496 200,
1497 serde_json::json!({
1498 "applied": "force_unhealthy",
1499 "target_node": target_node,
1500 }),
1501 ))
1502 }
1503 ChaosAction::Restore { target_node } => {
1504 if let Err(e) = Self::set_node_enabled(state, &target_node, true).await {
1505 return Ok((404, serde_json::json!({ "error": e.to_string() })));
1506 }
1507 state.chaos_overrides.write().await.remove(&target_node);
1508 Ok((
1509 200,
1510 serde_json::json!({
1511 "restored": target_node,
1512 }),
1513 ))
1514 }
1515 ChaosAction::Reset => {
1516 let overrides: Vec<String> =
1517 state.chaos_overrides.read().await.keys().cloned().collect();
1518 let mut restored = Vec::with_capacity(overrides.len());
1519 for addr in overrides {
1520 let _ = Self::set_node_enabled(state, &addr, true).await;
1521 restored.push(addr);
1522 }
1523 state.chaos_overrides.write().await.clear();
1524 Ok((
1525 200,
1526 serde_json::json!({
1527 "reset": true,
1528 "restored": restored,
1529 }),
1530 ))
1531 }
1532 }
1533 }
1534
1535 #[cfg(feature = "wasm-plugins")]
1541 async fn handle_plugins_list(state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1542 let pm = match state.plugin_manager.read().await.clone() {
1543 Some(p) => p,
1544 None => {
1545 return Ok((
1546 503,
1547 serde_json::json!({ "error": "plugin manager not attached" }),
1548 ));
1549 }
1550 };
1551 let plugins: Vec<PluginListEntry> = pm
1552 .list_plugins()
1553 .into_iter()
1554 .map(|info| PluginListEntry {
1555 name: info.name,
1556 version: info.version,
1557 description: info.description,
1558 hooks: info
1559 .hooks
1560 .iter()
1561 .map(|h| h.export_name().to_string())
1562 .collect(),
1563 state: format!("{:?}", info.state),
1564 invocations: info.stats.total_calls,
1565 errors: info.stats.error_count,
1566 })
1567 .collect();
1568 Ok((200, serde_json::to_value(plugins)?))
1569 }
1570
1571 #[cfg(not(feature = "wasm-plugins"))]
1572 async fn handle_plugins_list(_state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1573 Ok((
1574 503,
1575 serde_json::json!({ "error": "wasm-plugins feature not compiled in" }),
1576 ))
1577 }
1578
1579 async fn compute_topology(state: &Arc<AdminState>) -> TopologyResponse {
1585 let health = state.node_health.read().await;
1586 let cfg = state.config_snapshot.read().await;
1587
1588 let mut current_primary: Option<String> = None;
1589 for n in &cfg.nodes {
1590 if n.role.eq_ignore_ascii_case("primary") {
1591 let healthy = health.get(&n.address).map(|h| h.healthy).unwrap_or(false);
1592 if healthy {
1593 current_primary = Some(n.address.clone());
1594 break;
1595 }
1596 }
1597 }
1598
1599 let healthy_nodes = health.values().filter(|h| h.healthy).count() as u32;
1600 let unhealthy_nodes = health.values().filter(|h| !h.healthy).count() as u32;
1601 let total_nodes = cfg.nodes.len() as u32;
1602
1603 TopologyResponse {
1604 current_primary,
1605 healthy_nodes,
1606 unhealthy_nodes,
1607 total_nodes,
1608 last_failover_at: None,
1609 }
1610 }
1611
1612 fn format_prometheus_metrics(metrics: &ServerMetricsSnapshot) -> String {
1614 let mut output = String::new();
1615
1616 output.push_str("# HELP heliosdb_proxy_connections_total Total connections accepted\n");
1617 output.push_str("# TYPE heliosdb_proxy_connections_total counter\n");
1618 output.push_str(&format!(
1619 "heliosdb_proxy_connections_total {}\n",
1620 metrics.connections_accepted
1621 ));
1622
1623 output.push_str("# HELP heliosdb_proxy_connections_closed Total connections closed\n");
1624 output.push_str("# TYPE heliosdb_proxy_connections_closed counter\n");
1625 output.push_str(&format!(
1626 "heliosdb_proxy_connections_closed {}\n",
1627 metrics.connections_closed
1628 ));
1629
1630 output.push_str("# HELP heliosdb_proxy_queries_total Total queries processed\n");
1631 output.push_str("# TYPE heliosdb_proxy_queries_total counter\n");
1632 output.push_str(&format!(
1633 "heliosdb_proxy_queries_total {}\n",
1634 metrics.queries_processed
1635 ));
1636
1637 output.push_str("# HELP heliosdb_proxy_bytes_received_total Total bytes received\n");
1638 output.push_str("# TYPE heliosdb_proxy_bytes_received_total counter\n");
1639 output.push_str(&format!(
1640 "heliosdb_proxy_bytes_received_total {}\n",
1641 metrics.bytes_received
1642 ));
1643
1644 output.push_str("# HELP heliosdb_proxy_bytes_sent_total Total bytes sent\n");
1645 output.push_str("# TYPE heliosdb_proxy_bytes_sent_total counter\n");
1646 output.push_str(&format!(
1647 "heliosdb_proxy_bytes_sent_total {}\n",
1648 metrics.bytes_sent
1649 ));
1650
1651 output.push_str("# HELP heliosdb_proxy_failovers_total Total failovers\n");
1652 output.push_str("# TYPE heliosdb_proxy_failovers_total counter\n");
1653 output.push_str(&format!(
1654 "heliosdb_proxy_failovers_total {}\n",
1655 metrics.failovers
1656 ));
1657
1658 output
1659 }
1660
1661 async fn send_response(
1663 writer: &mut tokio::net::tcp::WriteHalf<'_>,
1664 status: u16,
1665 status_text: &str,
1666 body: &str,
1667 ) -> Result<()> {
1668 let response = format!(
1669 "HTTP/1.1 {} {}\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1670 status,
1671 status_text,
1672 body.len(),
1673 body
1674 );
1675
1676 writer
1677 .write_all(response.as_bytes())
1678 .await
1679 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1680
1681 Ok(())
1682 }
1683
1684 async fn send_json_response<T: Serialize>(
1686 writer: &mut tokio::net::tcp::WriteHalf<'_>,
1687 status: u16,
1688 body: &T,
1689 ) -> Result<()> {
1690 let json = serde_json::to_string(body)
1691 .map_err(|e| ProxyError::Internal(format!("JSON error: {}", e)))?;
1692
1693 let status_text = match status {
1694 200 => "OK",
1695 400 => "Bad Request",
1696 404 => "Not Found",
1697 500 => "Internal Server Error",
1698 503 => "Service Unavailable",
1699 _ => "Unknown",
1700 };
1701
1702 let response = format!(
1703 "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1704 status,
1705 status_text,
1706 json.len(),
1707 json
1708 );
1709
1710 writer
1711 .write_all(response.as_bytes())
1712 .await
1713 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1714
1715 Ok(())
1716 }
1717
1718 pub fn shutdown(&self) {
1720 let _ = self.shutdown_tx.send(());
1721 }
1722}
1723
1724impl AdminState {
1725 pub fn new() -> Self {
1727 Self {
1728 node_health: RwLock::new(HashMap::new()),
1729 metrics: RwLock::new(ServerMetricsSnapshot {
1730 connections_accepted: 0,
1731 connections_closed: 0,
1732 queries_processed: 0,
1733 bytes_received: 0,
1734 bytes_sent: 0,
1735 failovers: 0,
1736 }),
1737 active_sessions: RwLock::new(0),
1738 config_snapshot: RwLock::new(ConfigSnapshot {
1739 listen_address: String::new(),
1740 admin_address: String::new(),
1741 tr_enabled: false,
1742 tr_mode: String::new(),
1743 pool_min_connections: 0,
1744 pool_max_connections: 0,
1745 nodes: Vec::new(),
1746 }),
1747 proxy_config: RwLock::new(None),
1748 read_lb_counter: AtomicUsize::new(0),
1749 commands: RwLock::new(HashMap::new()),
1750 #[cfg(feature = "pool-modes")]
1751 pool_manager: RwLock::new(None),
1752 #[cfg(feature = "circuit-breaker")]
1753 circuit_breaker: RwLock::new(None),
1754 #[cfg(feature = "ha-tr")]
1755 replay_engine: RwLock::new(None),
1756 #[cfg(feature = "wasm-plugins")]
1757 plugin_manager: RwLock::new(None),
1758 chaos_overrides: RwLock::new(HashMap::new()),
1759 #[cfg(feature = "anomaly-detection")]
1760 anomaly_detector: RwLock::new(None),
1761 #[cfg(feature = "query-analytics")]
1762 analytics: RwLock::new(None),
1763 #[cfg(feature = "edge-proxy")]
1764 edge_cache: RwLock::new(None),
1765 #[cfg(feature = "edge-proxy")]
1766 edge_registry: RwLock::new(None),
1767 auth_token: RwLock::new(None),
1768 migration: RwLock::new(None),
1769 branch: RwLock::new(None),
1770 }
1771 }
1772
1773 pub async fn with_auth_token(&self, token: Option<String>) {
1775 *self.auth_token.write().await = token;
1776 }
1777
1778 pub async fn with_migration(&self, info: MigrationInfo) {
1780 *self.migration.write().await = Some(info);
1781 }
1782
1783 pub async fn with_branch(&self, cfg: crate::config::BranchConfig) {
1785 *self.branch.write().await = Some(cfg);
1786 }
1787
1788 #[cfg(feature = "pool-modes")]
1791 pub async fn with_pool_manager(&self, manager: Arc<crate::pool::ConnectionPoolManager>) {
1792 *self.pool_manager.write().await = Some(manager);
1793 }
1794
1795 #[cfg(feature = "circuit-breaker")]
1798 pub async fn with_circuit_breaker(
1799 &self,
1800 manager: Arc<crate::circuit_breaker::CircuitBreakerManager>,
1801 ) {
1802 *self.circuit_breaker.write().await = Some(manager);
1803 }
1804
1805 #[cfg(feature = "anomaly-detection")]
1808 pub async fn with_anomaly_detector(&self, detector: Arc<AnomalyDetector>) {
1809 *self.anomaly_detector.write().await = Some(detector);
1810 }
1811
1812 #[cfg(feature = "query-analytics")]
1814 pub async fn with_analytics(&self, analytics: Arc<crate::analytics::QueryAnalytics>) {
1815 *self.analytics.write().await = Some(analytics);
1816 }
1817
1818 #[cfg(feature = "edge-proxy")]
1821 pub async fn with_edge(&self, cache: Arc<EdgeCache>, registry: Arc<EdgeRegistry>) {
1822 *self.edge_cache.write().await = Some(cache);
1823 *self.edge_registry.write().await = Some(registry);
1824 }
1825
1826 #[cfg(feature = "ha-tr")]
1831 pub async fn with_replay_engine(&self, engine: Arc<ReplayEngine>) {
1832 *self.replay_engine.write().await = Some(engine);
1833 }
1834
1835 #[cfg(feature = "wasm-plugins")]
1839 pub async fn with_plugin_manager(&self, manager: Arc<PluginManager>) {
1840 *self.plugin_manager.write().await = Some(manager);
1841 }
1842
1843 pub async fn set_proxy_config(&self, config: ProxyConfig) {
1845 let mut proxy_config = self.proxy_config.write().await;
1846 *proxy_config = Some(config);
1847 }
1848
1849 pub async fn register_command<F>(&self, name: &str, handler: F)
1851 where
1852 F: Fn(&[&str]) -> Result<String> + Send + Sync + 'static,
1853 {
1854 let mut commands = self.commands.write().await;
1855 commands.insert(name.to_string(), Arc::new(handler));
1856 }
1857
1858 pub async fn execute_command(&self, name: &str, args: &[&str]) -> Result<String> {
1860 let commands = self.commands.read().await;
1861 match commands.get(name) {
1862 Some(handler) => handler(args),
1863 None => Err(ProxyError::Internal(format!("Unknown command: {}", name))),
1864 }
1865 }
1866}
1867
1868impl Default for AdminState {
1869 fn default() -> Self {
1870 Self::new()
1871 }
1872}
1873
1874#[derive(Debug, Deserialize)]
1878struct SqlRequest {
1879 query: String,
1881}
1882
1883#[derive(Debug, Serialize)]
1885struct SqlResponse {
1886 query_type: String,
1888 routed_to: String,
1890 node_role: String,
1892 result: serde_json::Value,
1894}
1895
1896#[derive(Serialize)]
1897struct HealthResponse {
1898 status: &'static str,
1899}
1900
1901#[derive(Serialize)]
1902struct ReadinessResponse {
1903 ready: bool,
1904 message: &'static str,
1905}
1906
1907#[derive(Serialize)]
1908struct LivenessResponse {
1909 alive: bool,
1910}
1911
1912#[derive(Serialize)]
1913struct ErrorResponse {
1914 error: String,
1915}
1916
1917#[derive(Serialize)]
1918struct MetricsResponse {
1919 connections_accepted: u64,
1920 connections_closed: u64,
1921 connections_active: u64,
1922 queries_processed: u64,
1923 bytes_received: u64,
1924 bytes_sent: u64,
1925 failovers: u64,
1926}
1927
1928impl From<ServerMetricsSnapshot> for MetricsResponse {
1929 fn from(m: ServerMetricsSnapshot) -> Self {
1930 Self {
1931 connections_accepted: m.connections_accepted,
1932 connections_closed: m.connections_closed,
1933 connections_active: m.connections_accepted.saturating_sub(m.connections_closed),
1934 queries_processed: m.queries_processed,
1935 bytes_received: m.bytes_received,
1936 bytes_sent: m.bytes_sent,
1937 failovers: m.failovers,
1938 }
1939 }
1940}
1941
1942#[derive(Serialize)]
1943struct NodeHealthResponse {
1944 address: String,
1945 healthy: bool,
1946 last_check: String,
1947 failure_count: u32,
1948 last_error: Option<String>,
1949 latency_ms: f64,
1950 replication_lag_bytes: Option<u64>,
1951}
1952
1953impl From<NodeHealth> for NodeHealthResponse {
1954 fn from(h: NodeHealth) -> Self {
1955 Self {
1956 address: h.address,
1957 healthy: h.healthy,
1958 last_check: h.last_check.to_rfc3339(),
1959 failure_count: h.failure_count,
1960 last_error: h.last_error,
1961 latency_ms: h.latency_ms,
1962 replication_lag_bytes: h.replication_lag_bytes,
1963 }
1964 }
1965}
1966
1967#[derive(Serialize)]
1968struct SessionsResponse {
1969 active_sessions: u64,
1970}
1971
1972#[cfg(feature = "edge-proxy")]
1974#[derive(Debug, Deserialize)]
1975struct EdgeRegisterBody {
1976 edge_id: String,
1977 region: String,
1978 base_url: String,
1979}
1980
1981#[cfg(feature = "edge-proxy")]
1985#[derive(Debug, Deserialize)]
1986struct EdgeInvalidateBody {
1987 #[serde(default)]
1988 tables: Vec<String>,
1989 #[serde(default)]
1990 up_to_version: Option<u64>,
1991}
1992
1993#[cfg(any(feature = "anomaly-detection", feature = "query-analytics"))]
2000fn parse_limit_query(path: &str, default: usize, max: usize) -> usize {
2001 let q = match path.find('?') {
2002 Some(i) => &path[i + 1..],
2003 None => return default,
2004 };
2005 for kv in q.split('&') {
2006 let mut it = kv.splitn(2, '=');
2007 if let (Some(k), Some(v)) = (it.next(), it.next()) {
2008 if k == "limit" {
2009 if let Ok(n) = v.parse::<usize>() {
2010 return n.min(max);
2011 }
2012 }
2013 }
2014 }
2015 default
2016}
2017
2018#[cfg(feature = "ha-tr")]
2020#[derive(Debug, Deserialize)]
2021struct ShadowRequestBody {
2022 sql: String,
2024 #[serde(default)]
2027 params: Option<Vec<String>>,
2028
2029 source_host: String,
2032 source_port: u16,
2033 #[serde(default)]
2034 source_user: Option<String>,
2035 #[serde(default)]
2036 source_password: Option<String>,
2037 #[serde(default)]
2038 source_database: Option<String>,
2039
2040 shadow_host: String,
2043 shadow_port: u16,
2044 #[serde(default)]
2045 shadow_user: Option<String>,
2046 #[serde(default)]
2047 shadow_password: Option<String>,
2048 #[serde(default)]
2049 shadow_database: Option<String>,
2050}
2051
2052#[derive(Debug, Deserialize)]
2057#[serde(tag = "action", rename_all = "snake_case")]
2058enum ChaosAction {
2059 ForceUnhealthy { target_node: String },
2063 Restore { target_node: String },
2065 Reset,
2067}
2068
2069#[cfg(feature = "wasm-plugins")]
2072#[derive(Serialize)]
2073struct PluginListEntry {
2074 name: String,
2075 version: String,
2076 description: String,
2077 hooks: Vec<String>,
2079 state: String,
2081 invocations: u64,
2082 errors: u64,
2083}
2084
2085#[cfg(feature = "ha-tr")]
2087#[derive(Debug, Deserialize)]
2088struct ReplayRequestBody {
2089 from: DateTime<Utc>,
2091 to: DateTime<Utc>,
2093 target_host: String,
2095 target_port: u16,
2097 #[serde(default)]
2103 target_user: Option<String>,
2104 #[serde(default)]
2105 target_password: Option<String>,
2106 #[serde(default)]
2107 target_database: Option<String>,
2108}
2109
2110#[derive(Serialize)]
2114struct TopologyResponse {
2115 #[serde(rename = "currentPrimary")]
2116 current_primary: Option<String>,
2117 #[serde(rename = "healthyNodes")]
2118 healthy_nodes: u32,
2119 #[serde(rename = "unhealthyNodes")]
2120 unhealthy_nodes: u32,
2121 #[serde(rename = "totalNodes")]
2122 total_nodes: u32,
2123 #[serde(rename = "lastFailoverAt")]
2126 last_failover_at: Option<String>,
2127}
2128
2129#[derive(Serialize)]
2130struct PoolStatsResponse {
2131 node: String,
2132 active_connections: u64,
2133 idle_connections: u64,
2134 pending_requests: u64,
2135 total_connections_created: u64,
2136 total_connections_closed: u64,
2137}
2138
2139#[derive(Serialize)]
2140struct VersionResponse {
2141 version: String,
2142 build_time: String,
2143}
2144
2145#[cfg(test)]
2146mod tests {
2147 use super::*;
2148
2149 #[tokio::test]
2150 async fn test_admin_state_creation() {
2151 let state = AdminState::new();
2152 let sessions = state.active_sessions.read().await;
2153 assert_eq!(*sessions, 0);
2154 }
2155
2156 #[test]
2157 fn test_admin_authorized() {
2158 let h = |s: &str| vec!["GET /topology HTTP/1.1".to_string(), s.to_string()];
2159 assert!(AdminServer::admin_authorized(
2160 &h("Authorization: Bearer s3cret"),
2161 "s3cret"
2162 ));
2163 assert!(AdminServer::admin_authorized(
2165 &h("authorization: Bearer s3cret"),
2166 "s3cret"
2167 ));
2168 assert!(!AdminServer::admin_authorized(
2170 &h("Authorization: Bearer nope"),
2171 "s3cret"
2172 ));
2173 assert!(!AdminServer::admin_authorized(
2174 &h("Authorization: Basic s3cret"),
2175 "s3cret"
2176 ));
2177 assert!(!AdminServer::admin_authorized(
2178 &["GET / HTTP/1.1".to_string()],
2179 "s3cret"
2180 ));
2181 }
2182
2183 #[test]
2184 fn test_constant_time_eq_str() {
2185 assert!(constant_time_eq_str("abc", "abc"));
2186 assert!(!constant_time_eq_str("abc", "abd"));
2187 assert!(!constant_time_eq_str("abc", "abcd"));
2188 }
2189
2190 #[tokio::test]
2191 async fn test_readiness_check_no_nodes() {
2192 let state = Arc::new(AdminState::new());
2193 let ready = AdminServer::check_readiness(&state).await;
2194 assert!(!ready);
2195 }
2196
2197 #[tokio::test]
2198 async fn test_readiness_check_with_healthy_node() {
2199 let state = Arc::new(AdminState::new());
2200
2201 {
2202 let mut health = state.node_health.write().await;
2203 health.insert(
2204 "localhost:5432".to_string(),
2205 NodeHealth {
2206 address: "localhost:5432".to_string(),
2207 healthy: true,
2208 last_check: chrono::Utc::now(),
2209 failure_count: 0,
2210 last_error: None,
2211 latency_ms: 1.0,
2212 replication_lag_bytes: None,
2213 },
2214 );
2215 }
2216
2217 let ready = AdminServer::check_readiness(&state).await;
2218 assert!(ready);
2219 }
2220
2221 #[tokio::test]
2222 async fn test_command_registration() {
2223 let state = AdminState::new();
2224
2225 state
2226 .register_command("test", |args| {
2227 Ok(format!("Test command with {} args", args.len()))
2228 })
2229 .await;
2230
2231 let result = state.execute_command("test", &["arg1", "arg2"]).await;
2232 assert!(result.is_ok());
2233 assert_eq!(result.unwrap(), "Test command with 2 args");
2234 }
2235
2236 #[tokio::test]
2237 async fn test_unknown_command() {
2238 let state = AdminState::new();
2239 let result = state.execute_command("unknown", &[]).await;
2240 assert!(result.is_err());
2241 }
2242
2243 #[test]
2244 fn test_prometheus_metrics_format() {
2245 let metrics = ServerMetricsSnapshot {
2246 connections_accepted: 100,
2247 connections_closed: 50,
2248 queries_processed: 1000,
2249 bytes_received: 50000,
2250 bytes_sent: 100000,
2251 failovers: 2,
2252 };
2253
2254 let output = AdminServer::format_prometheus_metrics(&metrics);
2255 assert!(output.contains("heliosdb_proxy_connections_total 100"));
2256 assert!(output.contains("heliosdb_proxy_queries_total 1000"));
2257 assert!(output.contains("heliosdb_proxy_failovers_total 2"));
2258 }
2259
2260 #[test]
2261 fn test_metrics_response_active_connections() {
2262 let snapshot = ServerMetricsSnapshot {
2263 connections_accepted: 100,
2264 connections_closed: 30,
2265 queries_processed: 500,
2266 bytes_received: 10000,
2267 bytes_sent: 20000,
2268 failovers: 1,
2269 };
2270
2271 let response = MetricsResponse::from(snapshot);
2272 assert_eq!(response.connections_active, 70);
2273 }
2274
2275 async fn topology_state(nodes: &[(&str, &str, bool)]) -> Arc<AdminState> {
2278 let state = Arc::new(AdminState::new());
2279 {
2280 let mut cfg = state.config_snapshot.write().await;
2281 cfg.nodes = nodes
2282 .iter()
2283 .map(|(addr, role, _)| NodeSnapshot {
2284 address: (*addr).to_string(),
2285 role: (*role).to_string(),
2286 weight: 100,
2287 enabled: true,
2288 })
2289 .collect();
2290 }
2291 {
2292 let mut health = state.node_health.write().await;
2293 for (addr, _, healthy) in nodes {
2294 health.insert(
2295 (*addr).to_string(),
2296 NodeHealth {
2297 address: (*addr).to_string(),
2298 healthy: *healthy,
2299 last_check: chrono::Utc::now(),
2300 failure_count: 0,
2301 last_error: None,
2302 latency_ms: 1.0,
2303 replication_lag_bytes: None,
2304 },
2305 );
2306 }
2307 }
2308 state
2309 }
2310
2311 #[tokio::test]
2312 async fn test_topology_returns_healthy_primary() {
2313 let state = topology_state(&[
2314 ("primary.svc:5432", "primary", true),
2315 ("standby-a.svc:5432", "standby", true),
2316 ("standby-b.svc:5432", "standby", false),
2317 ])
2318 .await;
2319
2320 let topo = AdminServer::compute_topology(&state).await;
2321 assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
2322 assert_eq!(topo.healthy_nodes, 2);
2323 assert_eq!(topo.unhealthy_nodes, 1);
2324 assert_eq!(topo.total_nodes, 3);
2325 }
2326
2327 #[tokio::test]
2328 async fn test_topology_no_primary_when_primary_unhealthy() {
2329 let state = topology_state(&[
2332 ("primary.svc:5432", "primary", false),
2333 ("standby.svc:5432", "standby", true),
2334 ])
2335 .await;
2336
2337 let topo = AdminServer::compute_topology(&state).await;
2338 assert_eq!(topo.current_primary, None);
2339 assert_eq!(topo.healthy_nodes, 1);
2340 assert_eq!(topo.unhealthy_nodes, 1);
2341 }
2342
2343 #[tokio::test]
2344 async fn test_topology_handles_empty_cluster() {
2345 let state = Arc::new(AdminState::new());
2346 let topo = AdminServer::compute_topology(&state).await;
2347 assert_eq!(topo.current_primary, None);
2348 assert_eq!(topo.healthy_nodes, 0);
2349 assert_eq!(topo.unhealthy_nodes, 0);
2350 assert_eq!(topo.total_nodes, 0);
2351 }
2352
2353 #[tokio::test]
2354 async fn test_topology_role_match_is_case_insensitive() {
2355 let state = topology_state(&[("primary.svc:5432", "PRIMARY", true)]).await;
2356 let topo = AdminServer::compute_topology(&state).await;
2357 assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
2358 }
2359
2360 #[cfg(feature = "ha-tr")]
2361 #[tokio::test]
2362 async fn test_replay_returns_503_when_engine_unattached() {
2363 let state = Arc::new(AdminState::new());
2364 let body = r#"{
2365 "from": "2026-04-25T10:00:00Z",
2366 "to": "2026-04-25T11:00:00Z",
2367 "target_host": "127.0.0.1",
2368 "target_port": 5432
2369 }"#;
2370 let (status, value) = AdminServer::handle_replay_request(Some(body), &state)
2371 .await
2372 .expect("handler returns Ok with status code");
2373 assert_eq!(status, 503);
2374 assert_eq!(value["error"], "replay engine not attached");
2375 }
2376
2377 #[cfg(feature = "ha-tr")]
2378 #[tokio::test]
2379 async fn test_replay_400_on_malformed_body() {
2380 let state = Arc::new(AdminState::new());
2381 let (status, _) = AdminServer::handle_replay_request(Some("not json"), &state)
2382 .await
2383 .expect("handler returns Ok with status code");
2384 assert_eq!(status, 400);
2385 }
2386
2387 #[cfg(feature = "ha-tr")]
2388 #[tokio::test]
2389 async fn test_replay_errors_on_empty_body() {
2390 let state = Arc::new(AdminState::new());
2391 let err = AdminServer::handle_replay_request(None, &state).await;
2392 assert!(err.is_err(), "empty body must surface as Err");
2393 }
2394
2395 #[cfg(feature = "wasm-plugins")]
2396 #[tokio::test]
2397 async fn test_plugins_list_returns_503_when_manager_unattached() {
2398 let state = Arc::new(AdminState::new());
2399 let (status, value) = AdminServer::handle_plugins_list(&state)
2400 .await
2401 .expect("handler returns Ok with status code");
2402 assert_eq!(status, 503);
2403 assert_eq!(value["error"], "plugin manager not attached");
2404 }
2405
2406 #[cfg(not(feature = "wasm-plugins"))]
2407 #[tokio::test]
2408 async fn test_plugins_list_503_without_feature() {
2409 let state = Arc::new(AdminState::new());
2410 let (status, _) = AdminServer::handle_plugins_list(&state)
2411 .await
2412 .expect("handler returns Ok");
2413 assert_eq!(status, 503);
2414 }
2415
2416 async fn chaos_state_with_node(addr: &str) -> Arc<AdminState> {
2418 let state = Arc::new(AdminState::new());
2419 state.node_health.write().await.insert(
2420 addr.to_string(),
2421 NodeHealth {
2422 address: addr.to_string(),
2423 healthy: true,
2424 last_check: chrono::Utc::now(),
2425 failure_count: 0,
2426 last_error: None,
2427 latency_ms: 1.0,
2428 replication_lag_bytes: None,
2429 },
2430 );
2431 state
2432 }
2433
2434 #[tokio::test]
2435 async fn test_chaos_force_unhealthy_flips_node_and_records_override() {
2436 let state = chaos_state_with_node("primary.svc:5432").await;
2437 let body = r#"{"action":"force_unhealthy","target_node":"primary.svc:5432"}"#;
2438 let (status, value) = AdminServer::handle_chaos_request(Some(body), &state)
2439 .await
2440 .expect("handler returns Ok");
2441 assert_eq!(status, 200);
2442 assert_eq!(value["applied"], "force_unhealthy");
2443 assert!(!state.node_health.read().await["primary.svc:5432"].healthy);
2445 assert!(state
2447 .chaos_overrides
2448 .read()
2449 .await
2450 .contains_key("primary.svc:5432"));
2451 }
2452
2453 #[tokio::test]
2454 async fn test_chaos_restore_clears_override_and_flips_back() {
2455 let state = chaos_state_with_node("primary.svc:5432").await;
2456 let _ = AdminServer::handle_chaos_request(
2457 Some(r#"{"action":"force_unhealthy","target_node":"primary.svc:5432"}"#),
2458 &state,
2459 )
2460 .await
2461 .unwrap();
2462 let (status, _) = AdminServer::handle_chaos_request(
2463 Some(r#"{"action":"restore","target_node":"primary.svc:5432"}"#),
2464 &state,
2465 )
2466 .await
2467 .unwrap();
2468 assert_eq!(status, 200);
2469 assert!(state.node_health.read().await["primary.svc:5432"].healthy);
2470 assert!(state.chaos_overrides.read().await.is_empty());
2471 }
2472
2473 #[tokio::test]
2474 async fn test_chaos_reset_restores_all_overrides() {
2475 let state = chaos_state_with_node("a:5432").await;
2476 state.node_health.write().await.insert(
2477 "b:5432".to_string(),
2478 NodeHealth {
2479 address: "b:5432".to_string(),
2480 healthy: true,
2481 last_check: chrono::Utc::now(),
2482 failure_count: 0,
2483 last_error: None,
2484 latency_ms: 1.0,
2485 replication_lag_bytes: None,
2486 },
2487 );
2488 for addr in &["a:5432", "b:5432"] {
2489 let body = format!(r#"{{"action":"force_unhealthy","target_node":"{}"}}"#, addr);
2490 let _ = AdminServer::handle_chaos_request(Some(&body), &state)
2491 .await
2492 .unwrap();
2493 }
2494 let (status, value) =
2495 AdminServer::handle_chaos_request(Some(r#"{"action":"reset"}"#), &state)
2496 .await
2497 .unwrap();
2498 assert_eq!(status, 200);
2499 assert_eq!(value["reset"], true);
2500 let restored = value["restored"].as_array().unwrap();
2501 assert_eq!(restored.len(), 2);
2502 for addr in &["a:5432", "b:5432"] {
2504 assert!(state.node_health.read().await[*addr].healthy);
2505 }
2506 assert!(state.chaos_overrides.read().await.is_empty());
2507 }
2508
2509 #[tokio::test]
2510 async fn test_chaos_force_unhealthy_404s_when_node_unknown() {
2511 let state = Arc::new(AdminState::new());
2512 let body = r#"{"action":"force_unhealthy","target_node":"missing.svc:5432"}"#;
2513 let (status, _) = AdminServer::handle_chaos_request(Some(body), &state)
2514 .await
2515 .expect("handler returns Ok");
2516 assert_eq!(status, 404);
2517 }
2518
2519 #[tokio::test]
2520 async fn test_chaos_400_on_malformed_body() {
2521 let state = Arc::new(AdminState::new());
2522 let (status, _) = AdminServer::handle_chaos_request(Some("not json"), &state)
2523 .await
2524 .expect("handler returns Ok");
2525 assert_eq!(status, 400);
2526 }
2527
2528 #[tokio::test]
2529 async fn test_chaos_400_on_unknown_action() {
2530 let state = Arc::new(AdminState::new());
2531 let body = r#"{"action":"format_disk","target_node":"x"}"#;
2532 let (status, _) = AdminServer::handle_chaos_request(Some(body), &state)
2533 .await
2534 .expect("handler returns Ok");
2535 assert_eq!(status, 400);
2536 }
2537
2538 #[cfg(feature = "ha-tr")]
2539 #[tokio::test]
2540 async fn test_shadow_400_on_malformed_body() {
2541 let (status, _) = AdminServer::handle_shadow_request(Some("not json"))
2542 .await
2543 .expect("handler returns Ok");
2544 assert_eq!(status, 400);
2545 }
2546
2547 #[cfg(feature = "ha-tr")]
2548 #[tokio::test]
2549 async fn test_shadow_500_on_source_unreachable() {
2550 let body = r#"{
2553 "sql": "SELECT 1",
2554 "source_host": "127.0.0.1",
2555 "source_port": 1,
2556 "shadow_host": "127.0.0.1",
2557 "shadow_port": 1
2558 }"#;
2559 let (status, value) = AdminServer::handle_shadow_request(Some(body))
2560 .await
2561 .expect("handler returns Ok");
2562 assert_eq!(status, 500);
2563 let err = value["error"].as_str().expect("error field");
2564 assert!(
2565 err.contains("source connect"),
2566 "expected source connect error, got {}",
2567 err
2568 );
2569 }
2570
2571 #[cfg(feature = "ha-tr")]
2572 #[tokio::test]
2573 async fn test_shadow_errors_on_empty_body() {
2574 let err = AdminServer::handle_shadow_request(None).await;
2575 assert!(err.is_err(), "empty body must surface as Err");
2576 }
2577
2578 #[cfg(feature = "anomaly-detection")]
2579 #[tokio::test]
2580 async fn test_anomalies_returns_503_when_detector_unattached() {
2581 let state = Arc::new(AdminState::new());
2582 let (status, value) = AdminServer::handle_anomalies_list("/anomalies", &state)
2583 .await
2584 .expect("handler returns Ok");
2585 assert_eq!(status, 503);
2586 assert_eq!(value["error"], "anomaly detector not attached");
2587 }
2588
2589 #[cfg(feature = "anomaly-detection")]
2590 #[tokio::test]
2591 async fn test_anomalies_returns_attached_detector_events() {
2592 use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
2593 let state = Arc::new(AdminState::new());
2594 let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
2595 let _ = det.record_query(&QueryObservation {
2597 tenant: "test".into(),
2598 fingerprint: "fp".into(),
2599 sql: "SELECT * FROM users WHERE id = 1 OR 1=1 --".into(),
2600 timestamp: std::time::Instant::now(),
2601 });
2602 state.with_anomaly_detector(det.clone()).await;
2603
2604 let (status, value) = AdminServer::handle_anomalies_list("/anomalies", &state)
2605 .await
2606 .expect("handler returns Ok");
2607 assert_eq!(status, 200);
2608 let count = value["count"].as_u64().expect("count field");
2609 assert!(count > 0, "expected at least one event, got {}", count);
2610 }
2611
2612 #[cfg(feature = "anomaly-detection")]
2613 #[tokio::test]
2614 async fn test_anomalies_limit_query_string_respected() {
2615 use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
2616 let state = Arc::new(AdminState::new());
2617 let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
2618 for i in 0..50 {
2619 let fp = format!("fp{}", i);
2620 let _ = det.record_query(&QueryObservation {
2621 tenant: "test".into(),
2622 fingerprint: fp,
2623 sql: "SELECT 1".into(),
2624 timestamp: std::time::Instant::now(),
2625 });
2626 }
2627 state.with_anomaly_detector(det).await;
2628
2629 let (status, value) = AdminServer::handle_anomalies_list("/anomalies?limit=5", &state)
2630 .await
2631 .expect("handler returns Ok");
2632 assert_eq!(status, 200);
2633 assert_eq!(value["limit"].as_u64().unwrap(), 5);
2634 assert_eq!(value["events"].as_array().unwrap().len(), 5);
2635 }
2636
2637 #[cfg(any(feature = "anomaly-detection", feature = "query-analytics"))]
2638 #[test]
2639 fn test_parse_limit_query_helper() {
2640 assert_eq!(parse_limit_query("/anomalies", 100, 1024), 100);
2641 assert_eq!(parse_limit_query("/anomalies?limit=42", 100, 1024), 42);
2642 assert_eq!(parse_limit_query("/anomalies?limit=99999", 100, 1024), 1024);
2643 assert_eq!(parse_limit_query("/anomalies?limit=abc", 100, 1024), 100);
2644 assert_eq!(
2645 parse_limit_query("/anomalies?other=x&limit=7", 100, 1024),
2646 7
2647 );
2648 }
2649
2650 #[cfg(feature = "edge-proxy")]
2651 async fn edge_state() -> Arc<AdminState> {
2652 use crate::edge::{EdgeCache, EdgeRegistry};
2653 use std::time::Duration;
2654 let s = Arc::new(AdminState::new());
2655 let cache = Arc::new(EdgeCache::new(100));
2656 let registry = Arc::new(EdgeRegistry::new(8, Duration::from_secs(60)));
2657 s.with_edge(cache, registry).await;
2658 s
2659 }
2660
2661 #[cfg(feature = "edge-proxy")]
2662 #[tokio::test]
2663 async fn test_edge_status_returns_empty_lists_initially() {
2664 let s = edge_state().await;
2665 let (status, value) = AdminServer::handle_edge_status(&s)
2666 .await
2667 .expect("handler returns Ok");
2668 assert_eq!(status, 200);
2669 assert_eq!(value["edge_count"].as_u64().unwrap(), 0);
2670 assert_eq!(value["registered"].as_array().unwrap().len(), 0);
2671 assert!(value["cache"].is_object(), "cache stats present");
2672 }
2673
2674 #[cfg(feature = "edge-proxy")]
2675 #[tokio::test]
2676 async fn test_edge_register_then_status_lists_edge() {
2677 let s = edge_state().await;
2678 let body = r#"{"edge_id":"e1","region":"us-east","base_url":"https://e1.svc"}"#;
2679 let (status, _) = AdminServer::handle_edge_register(Some(body), &s)
2680 .await
2681 .expect("handler ok");
2682 assert_eq!(status, 201);
2683 let (status2, value2) = AdminServer::handle_edge_status(&s).await.unwrap();
2684 assert_eq!(status2, 200);
2685 assert_eq!(value2["edge_count"].as_u64().unwrap(), 1);
2686 assert_eq!(value2["registered"][0]["edge_id"].as_str().unwrap(), "e1");
2687 }
2688
2689 #[cfg(feature = "edge-proxy")]
2690 #[tokio::test]
2691 async fn test_edge_register_400_on_malformed_body() {
2692 let s = edge_state().await;
2693 let (status, _) = AdminServer::handle_edge_register(Some("not json"), &s)
2694 .await
2695 .expect("handler ok");
2696 assert_eq!(status, 400);
2697 }
2698
2699 #[cfg(feature = "edge-proxy")]
2700 #[tokio::test]
2701 async fn test_edge_invalidate_drops_local_cache_entries() {
2702 use crate::edge::{CacheEntry, CacheKey};
2703 use std::time::{Duration, Instant};
2704 let s = edge_state().await;
2705 let cache = s.edge_cache.read().await.clone().unwrap();
2707 cache.insert(
2708 CacheKey::new("fp1", "p1"),
2709 CacheEntry {
2710 version: 1,
2711 response_bytes: b"row".to_vec(),
2712 tables: vec!["users".into()],
2713 expires_at: Instant::now() + Duration::from_secs(60),
2714 },
2715 );
2716 assert!(cache.get(&CacheKey::new("fp1", "p1")).is_some());
2717
2718 let body = r#"{"tables":["users"]}"#;
2719 let (status, value) = AdminServer::handle_edge_invalidate(Some(body), &s)
2720 .await
2721 .expect("handler ok");
2722 assert_eq!(status, 200);
2723 assert_eq!(value["dropped_local"].as_u64().unwrap(), 1);
2724 assert!(cache.get(&CacheKey::new("fp1", "p1")).is_none());
2725 }
2726
2727 #[cfg(feature = "edge-proxy")]
2728 #[tokio::test]
2729 async fn test_edge_invalidate_503_when_cache_unattached() {
2730 let s = Arc::new(AdminState::new());
2731 let body = r#"{"tables":["users"]}"#;
2732 let (status, _) = AdminServer::handle_edge_invalidate(Some(body), &s)
2733 .await
2734 .expect("handler ok");
2735 assert_eq!(status, 503);
2736 }
2737}