1#[cfg(feature = "anomaly-detection")]
7use crate::anomaly::AnomalyDetector;
8use crate::config::{NodeConfig, NodeRole, ProxyConfig};
9#[cfg(feature = "edge-proxy")]
10use crate::edge::{EdgeCache, EdgeRegistry, InvalidationEvent};
11#[cfg(feature = "wasm-plugins")]
12use crate::plugins::PluginManager;
13#[cfg(feature = "ha-tr")]
14use crate::replay::{ReplayEngine, TimeTravelRequest};
15use crate::server::{NodeHealth, ServerMetricsSnapshot};
16use crate::{ProxyError, Result};
17#[cfg(feature = "ha-tr")]
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::net::SocketAddr;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
25use tokio::net::TcpStream;
26use tokio::sync::{broadcast, RwLock};
27
28const ADMIN_UI_HTML: &str = include_str!("admin_ui.html");
32
33fn constant_time_eq_str(a: &str, b: &str) -> bool {
36 let (a, b) = (a.as_bytes(), b.as_bytes());
37 if a.len() != b.len() {
38 return false;
39 }
40 let mut diff = 0u8;
41 for i in 0..a.len() {
42 diff |= a[i] ^ b[i];
43 }
44 diff == 0
45}
46
47pub struct AdminServer {
48 listen_address: String,
50 state: Arc<AdminState>,
52 shutdown_tx: broadcast::Sender<()>,
54}
55
56pub struct AdminState {
58 pub node_health: RwLock<HashMap<String, NodeHealth>>,
60 pub metrics: RwLock<ServerMetricsSnapshot>,
62 pub active_sessions: RwLock<u64>,
64 pub config_snapshot: RwLock<ConfigSnapshot>,
66 pub proxy_config: RwLock<Option<ProxyConfig>>,
68 read_lb_counter: AtomicUsize,
70 commands: RwLock<HashMap<String, CommandHandler>>,
72 #[cfg(feature = "ha-tr")]
76 pub replay_engine: RwLock<Option<Arc<ReplayEngine>>>,
77 #[cfg(feature = "wasm-plugins")]
82 pub plugin_manager: RwLock<Option<Arc<PluginManager>>>,
83 pub chaos_overrides: RwLock<HashMap<String, ChaosOverride>>,
88 #[cfg(feature = "anomaly-detection")]
92 pub anomaly_detector: RwLock<Option<Arc<AnomalyDetector>>>,
93 #[cfg(feature = "edge-proxy")]
96 pub edge_cache: RwLock<Option<Arc<EdgeCache>>>,
97 #[cfg(feature = "edge-proxy")]
98 pub edge_registry: RwLock<Option<Arc<EdgeRegistry>>>,
99 pub auth_token: RwLock<Option<String>>,
102 pub migration: RwLock<Option<MigrationInfo>>,
105 pub branch: RwLock<Option<crate::config::BranchConfig>>,
108}
109
110#[derive(Clone)]
113pub struct MigrationInfo {
114 pub target: String,
115 pub writes_only: bool,
116 pub metrics: Arc<crate::mirror::MirrorMetrics>,
117 pub config: crate::config::MirrorConfig,
119 pub cutover: Arc<arc_swap::ArcSwap<Option<Arc<crate::mirror::CutoverTarget>>>>,
121 pub cutover_target: crate::mirror::CutoverTarget,
122}
123
124#[derive(Debug, Clone, Serialize)]
129pub struct ChaosOverride {
130 pub since: String,
132 pub kind: String,
134 pub note: String,
136}
137
138
139type CommandHandler = Arc<dyn Fn(&[&str]) -> Result<String> + Send + Sync>;
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct ConfigSnapshot {
145 pub listen_address: String,
146 pub admin_address: String,
147 pub tr_enabled: bool,
148 pub tr_mode: String,
149 pub pool_min_connections: usize,
150 pub pool_max_connections: usize,
151 pub nodes: Vec<NodeSnapshot>,
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct NodeSnapshot {
157 pub address: String,
158 pub role: String,
159 pub weight: u32,
160 pub enabled: bool,
161}
162
163impl AdminServer {
164 pub fn new(listen_address: String, state: Arc<AdminState>) -> Self {
166 let (shutdown_tx, _) = broadcast::channel(1);
167
168 Self {
169 listen_address,
170 state,
171 shutdown_tx,
172 }
173 }
174
175 pub async fn run(&self) -> Result<()> {
177 let listener = crate::server::bind_reuseport(&self.listen_address)?;
180
181 tracing::info!("Admin API listening on {} (SO_REUSEPORT)", self.listen_address);
182
183 let mut shutdown_rx = self.shutdown_tx.subscribe();
184
185 loop {
186 tokio::select! {
187 accept_result = listener.accept() => {
188 match accept_result {
189 Ok((stream, addr)) => {
190 let state = self.state.clone();
191 tokio::spawn(async move {
192 if let Err(e) = Self::handle_connection(stream, addr, state).await {
193 tracing::error!("Admin connection error: {}", e);
194 }
195 });
196 }
197 Err(e) => {
198 tracing::error!("Admin accept error: {}", e);
199 }
200 }
201 }
202 _ = shutdown_rx.recv() => {
203 tracing::info!("Admin server shutting down");
204 break;
205 }
206 }
207 }
208
209 Ok(())
210 }
211
212 async fn handle_connection(
214 mut stream: TcpStream,
215 addr: SocketAddr,
216 state: Arc<AdminState>,
217 ) -> Result<()> {
218 tracing::debug!("Admin connection from {}", addr);
219
220 let (reader, mut writer) = stream.split();
221 let mut reader = BufReader::new(reader);
222 let mut line = String::new();
223
224 let mut headers = Vec::new();
226 let mut content_length: usize = 0;
227
228 loop {
229 line.clear();
230 let bytes_read = reader
231 .read_line(&mut line)
232 .await
233 .map_err(|e| ProxyError::Network(format!("Read error: {}", e)))?;
234
235 if bytes_read == 0 || line == "\r\n" {
236 break;
237 }
238
239 let trimmed = line.trim();
241 if trimmed.to_lowercase().starts_with("content-length:") {
242 if let Some(len_str) = trimmed.split(':').nth(1) {
243 content_length = len_str.trim().parse().unwrap_or(0);
244 }
245 }
246 headers.push(trimmed.to_string());
247 }
248
249 if headers.is_empty() {
250 return Ok(());
251 }
252
253 let request_line = &headers[0];
255 let parts: Vec<&str> = request_line.split_whitespace().collect();
256
257 if parts.len() < 2 {
258 Self::send_response(&mut writer, 400, "Bad Request", "Invalid request line").await?;
259 return Ok(());
260 }
261
262 let method = parts[0];
263 let path = parts[1];
264
265 {
269 let required = state.auth_token.read().await.clone();
270 if let Some(token) = required {
271 let path_only = path.split('?').next().unwrap_or(path);
272 let is_liveness = method == "GET"
273 && matches!(path_only, "/health" | "/healthz" | "/livez" | "/readyz");
274 if !is_liveness && !Self::admin_authorized(&headers, &token) {
275 Self::send_response(
276 &mut writer,
277 401,
278 "Unauthorized",
279 "{\"error\":\"missing or invalid admin bearer token\"}",
280 )
281 .await?;
282 return Ok(());
283 }
284 }
285 }
286
287 let body = if content_length > 0 && (method == "POST" || method == "PUT") {
289 let mut body_buf = vec![0u8; content_length];
290 reader.read_exact(&mut body_buf).await
291 .map_err(|e| ProxyError::Network(format!("Body read error: {}", e)))?;
292 Some(String::from_utf8_lossy(&body_buf).to_string())
293 } else {
294 None
295 };
296
297 if method == "GET" && (path == "/" || path == "/ui" || path == "/ui/") {
300 Self::send_html_response(&mut writer, 200, ADMIN_UI_HTML).await?;
301 return Ok(());
302 }
303
304 let response = Self::route_request(method, path, body.as_deref(), &state).await;
306
307 match response {
308 Ok((status, body)) => {
309 Self::send_json_response(&mut writer, status, &body).await?;
310 }
311 Err(e) => {
312 let error = ErrorResponse {
313 error: e.to_string(),
314 };
315 Self::send_json_response(&mut writer, 500, &error).await?;
316 }
317 }
318
319 Ok(())
320 }
321
322 fn admin_authorized(headers: &[String], token: &str) -> bool {
325 let expected = format!("Bearer {}", token);
326 for h in headers {
327 let mut sp = h.splitn(2, ':');
328 let name = sp.next().unwrap_or("").trim();
329 if name.eq_ignore_ascii_case("authorization") {
330 let value = sp.next().unwrap_or("").trim();
331 return constant_time_eq_str(value, &expected);
332 }
333 }
334 false
335 }
336
337 async fn send_html_response(
339 writer: &mut tokio::net::tcp::WriteHalf<'_>,
340 status: u16,
341 html: &str,
342 ) -> Result<()> {
343 let status_text = match status {
344 200 => "OK",
345 404 => "Not Found",
346 _ => "Unknown",
347 };
348 let response = format!(
349 "HTTP/1.1 {} {}\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
350 status,
351 status_text,
352 html.len(),
353 html
354 );
355 writer
356 .write_all(response.as_bytes())
357 .await
358 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
359 Ok(())
360 }
361
362 async fn route_request(
364 method: &str,
365 path: &str,
366 body: Option<&str>,
367 state: &Arc<AdminState>,
368 ) -> Result<(u16, serde_json::Value)> {
369 match (method, path) {
370 ("POST", "/api/sql") => {
372 Self::handle_sql_request(body, state).await
373 }
374
375 ("GET", "/health") => {
377 let health = HealthResponse { status: "ok" };
378 Ok((200, serde_json::to_value(health)?))
379 }
380 ("GET", "/health/ready") => {
381 let ready = Self::check_readiness(state).await;
382 let response = ReadinessResponse {
383 ready,
384 message: if ready {
385 "Proxy is ready"
386 } else {
387 "Proxy is not ready"
388 },
389 };
390 let status = if ready { 200 } else { 503 };
391 Ok((status, serde_json::to_value(response)?))
392 }
393 ("GET", "/health/live") => {
394 let response = LivenessResponse { alive: true };
395 Ok((200, serde_json::to_value(response)?))
396 }
397
398 ("GET", "/metrics") => {
400 let metrics = state.metrics.read().await.clone();
401 Ok((200, serde_json::to_value(MetricsResponse::from(metrics))?))
402 }
403 ("GET", "/metrics/prometheus") => {
404 let metrics = state.metrics.read().await.clone();
405 let prometheus = Self::format_prometheus_metrics(&metrics);
406 Ok((200, serde_json::json!({ "text": prometheus })))
407 }
408
409 ("GET", "/nodes") => {
411 let health = state.node_health.read().await;
412 let nodes: Vec<NodeHealthResponse> = health
413 .values()
414 .map(|h| NodeHealthResponse::from(h.clone()))
415 .collect();
416 Ok((200, serde_json::to_value(nodes)?))
417 }
418 ("GET", path) if path.starts_with("/nodes/") => {
419 let node_addr = path.trim_start_matches("/nodes/");
420 let health = state.node_health.read().await;
421 match health.get(node_addr) {
422 Some(h) => Ok((200, serde_json::to_value(NodeHealthResponse::from(h.clone()))?)),
423 None => Ok((404, serde_json::json!({ "error": "Node not found" }))),
424 }
425 }
426 ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/enable") => {
427 let node_addr = path
428 .trim_start_matches("/nodes/")
429 .trim_end_matches("/enable");
430 Self::set_node_enabled(state, node_addr, true).await?;
431 Ok((200, serde_json::json!({ "status": "enabled" })))
432 }
433 ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/disable") => {
434 let node_addr = path
435 .trim_start_matches("/nodes/")
436 .trim_end_matches("/disable");
437 Self::set_node_enabled(state, node_addr, false).await?;
438 Ok((200, serde_json::json!({ "status": "disabled" })))
439 }
440
441 ("GET", "/topology") => {
447 let topo = Self::compute_topology(state).await;
448 Ok((200, serde_json::to_value(topo)?))
449 }
450
451 #[cfg(feature = "ha-tr")]
455 ("POST", "/api/replay") => Self::handle_replay_request(body, state).await,
456 #[cfg(not(feature = "ha-tr"))]
457 ("POST", "/api/replay") => Ok((
458 503,
459 serde_json::json!({ "error": "ha-tr feature not compiled in" }),
460 )),
461
462 #[cfg(feature = "ha-tr")]
468 ("POST", "/api/shadow") => Self::handle_shadow_request(body).await,
469 #[cfg(not(feature = "ha-tr"))]
470 ("POST", "/api/shadow") => Ok((
471 503,
472 serde_json::json!({ "error": "ha-tr feature not compiled in" }),
473 )),
474
475 ("GET", "/plugins") => Self::handle_plugins_list(state).await,
480
481 #[cfg(feature = "anomaly-detection")]
485 ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => {
486 Self::handle_anomalies_list(p, state).await
487 }
488 #[cfg(not(feature = "anomaly-detection"))]
489 ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => Ok((
490 503,
491 serde_json::json!({ "error": "anomaly-detection feature not compiled in" }),
492 )),
493
494 #[cfg(feature = "edge-proxy")]
498 ("GET", "/api/edge") => Self::handle_edge_status(state).await,
499 #[cfg(feature = "edge-proxy")]
500 ("POST", "/api/edge/register") => {
501 Self::handle_edge_register(body, state).await
502 }
503 #[cfg(feature = "edge-proxy")]
504 ("POST", "/api/edge/invalidate") => {
505 Self::handle_edge_invalidate(body, state).await
506 }
507 #[cfg(not(feature = "edge-proxy"))]
508 ("GET", "/api/edge")
509 | ("POST", "/api/edge/register")
510 | ("POST", "/api/edge/invalidate") => Ok((
511 503,
512 serde_json::json!({ "error": "edge-proxy feature not compiled in" }),
513 )),
514
515 ("POST", "/api/chaos") => Self::handle_chaos_request(body, state).await,
519 ("GET", "/api/chaos") => {
522 let overrides = state.chaos_overrides.read().await.clone();
523 Ok((200, serde_json::to_value(overrides)?))
524 }
525
526 ("GET", "/api/migration/status") | ("GET", "/migration/status") => {
528 match state.migration.read().await.as_ref() {
529 Some(info) => {
530 let st = crate::mirror::status(&info.target, info.writes_only, &info.metrics);
531 let mut v = serde_json::to_value(st)?;
532 let cut = info.cutover.load_full().is_some();
533 v["cutover_active"] = serde_json::json!(cut);
534 Ok((200, v))
535 }
536 None => Ok((503, serde_json::json!({ "error": "traffic mirroring not enabled" }))),
537 }
538 }
539
540 ("POST", "/api/migration/cutover") | ("POST", "/migration/cutover") => {
542 let info = state.migration.read().await.clone();
543 let Some(info) = info else {
544 return Ok((503, serde_json::json!({ "error": "traffic mirroring not enabled" })));
545 };
546 let force = path.contains("force=true")
547 || body.map(|b| b.contains("\"force\":true")).unwrap_or(false);
548 let st = crate::mirror::status(&info.target, info.writes_only, &info.metrics);
549 if !st.migration_ready && !force {
550 return Ok((409, serde_json::json!({
551 "ok": false,
552 "error": "not migration_ready (backlog/drops present); pass force=true to override",
553 "status": st,
554 })));
555 }
556 info.cutover.store(Arc::new(Some(Arc::new(info.cutover_target.clone()))));
557 tracing::warn!(target = %info.cutover_target.addr, "migration cutover: new connections now route to the promoted target");
558 Ok((200, serde_json::json!({ "ok": true, "promoted_to": info.cutover_target.addr })))
559 }
560
561 ("POST", "/api/migration/cutover/rollback") | ("POST", "/migration/cutover/rollback") => {
563 let info = state.migration.read().await.clone();
564 let Some(info) = info else {
565 return Ok((503, serde_json::json!({ "error": "traffic mirroring not enabled" })));
566 };
567 info.cutover.store(Arc::new(None));
568 Ok((200, serde_json::json!({ "ok": true, "rolled_back": true })))
569 }
570
571 ("POST", "/api/migration/snapshot") | ("POST", "/migration/snapshot") => {
573 let info = state.migration.read().await.clone();
574 let Some(info) = info else {
575 return Ok((503, serde_json::json!({ "error": "traffic mirroring not enabled" })));
576 };
577 let body = body.unwrap_or("{}");
578 let req: serde_json::Value = serde_json::from_str(body)
579 .map_err(|e| ProxyError::Internal(format!("invalid JSON: {}", e)))?;
580 let tables: Vec<String> = req
581 .get("tables")
582 .and_then(|t| t.as_array())
583 .map(|a| a.iter().filter_map(|v| v.as_str().map(String::from)).collect())
584 .unwrap_or_default();
585 if tables.is_empty() {
586 return Ok((400, serde_json::json!({ "error": "provide a non-empty 'tables' array" })));
587 }
588 match crate::mirror::snapshot_tables(&info.config, &tables).await {
589 Ok(rep) => {
590 let total: u64 = rep.iter().map(|t| t.copied).sum();
591 Ok((200, serde_json::json!({ "ok": true, "tables": rep, "rows_copied": total })))
592 }
593 Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
594 }
595 }
596
597 ("GET", p) if p == "/api/branch" || p == "/branch" || p.starts_with("/api/branch?") => {
599 let cfg = state.branch.read().await.clone();
600 let Some(cfg) = cfg else {
601 return Ok((503, serde_json::json!({ "error": "branch databases not enabled" })));
602 };
603 match crate::branch::list(&cfg).await {
604 Ok(branches) => Ok((200, serde_json::json!({ "branches": branches }))),
605 Err(e) => Ok((500, serde_json::json!({ "error": e }))),
606 }
607 }
608 ("POST", p) if p == "/api/branch" || p == "/branch" => {
609 let cfg = state.branch.read().await.clone();
610 let Some(cfg) = cfg else {
611 return Ok((503, serde_json::json!({ "error": "branch databases not enabled" })));
612 };
613 let req: serde_json::Value =
614 serde_json::from_str(body.unwrap_or("{}")).map_err(|e| ProxyError::Internal(format!("invalid JSON: {}", e)))?;
615 let name = req.get("name").and_then(|v| v.as_str()).unwrap_or("");
616 if name.is_empty() {
617 return Ok((400, serde_json::json!({ "error": "provide 'name'" })));
618 }
619 let base = req.get("base").and_then(|v| v.as_str());
620 match crate::branch::create(&cfg, name, base).await {
621 Ok(()) => Ok((200, serde_json::json!({ "ok": true, "branch": name,
622 "base": base.unwrap_or(&cfg.base_database) }))),
623 Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
624 }
625 }
626 ("DELETE", p) if p.starts_with("/api/branch") || p.starts_with("/branch") => {
627 let cfg = state.branch.read().await.clone();
628 let Some(cfg) = cfg else {
629 return Ok((503, serde_json::json!({ "error": "branch databases not enabled" })));
630 };
631 let name = p.find("name=").map(|i| &p[i + 5..]).unwrap_or("");
632 if name.is_empty() {
633 return Ok((400, serde_json::json!({ "error": "provide ?name=<branch>" })));
634 }
635 match crate::branch::drop(&cfg, name).await {
636 Ok(()) => Ok((200, serde_json::json!({ "ok": true, "dropped": name }))),
637 Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
638 }
639 }
640
641 ("GET", "/config") => {
643 let config = state.config_snapshot.read().await.clone();
644 Ok((200, serde_json::to_value(config)?))
645 }
646
647 ("GET", "/sessions") => {
649 let count = *state.active_sessions.read().await;
650 let response = SessionsResponse {
651 active_sessions: count,
652 };
653 Ok((200, serde_json::to_value(response)?))
654 }
655
656 ("GET", "/pools") => {
658 let pools = Self::get_pool_stats(state).await;
659 Ok((200, serde_json::to_value(pools)?))
660 }
661
662 ("GET", "/version") => {
664 let response = VersionResponse {
665 version: crate::VERSION.to_string(),
666 build_time: env!("CARGO_PKG_VERSION").to_string(),
667 };
668 Ok((200, serde_json::to_value(response)?))
669 }
670
671 _ => Ok((404, serde_json::json!({ "error": "Not found" }))),
673 }
674 }
675
676 async fn handle_sql_request(
678 body: Option<&str>,
679 state: &Arc<AdminState>,
680 ) -> Result<(u16, serde_json::Value)> {
681 let body = body.ok_or_else(|| ProxyError::Internal("Missing request body".to_string()))?;
683 let request: SqlRequest = serde_json::from_str(body)
684 .map_err(|e| ProxyError::Internal(format!("Invalid JSON: {}", e)))?;
685
686 let sql = request.query.trim();
687 if sql.is_empty() {
688 return Ok((400, serde_json::json!({ "error": "Empty query" })));
689 }
690
691 let is_write = Self::is_write_query(sql);
693 let query_type = if is_write { "write" } else { "read" };
694
695 let proxy_config = state.proxy_config.read().await;
697 let config = proxy_config.as_ref()
698 .ok_or_else(|| ProxyError::Internal("Proxy config not initialized".to_string()))?;
699
700 let health = state.node_health.read().await;
702
703 let target_node = if is_write {
705 Self::select_primary_node(config, &health)?
707 } else {
708 Self::select_read_node(config, &health, state)?
710 };
711
712 let target_address = format!("{}:{}", target_node.host, target_node.port);
713 let http_port = target_node.http_port;
715 let http_url = format!("http://{}:{}/api/sql", target_node.host, http_port);
716
717 tracing::debug!(
718 "Routing {} query to {} ({})",
719 query_type,
720 target_address,
721 match target_node.role {
722 NodeRole::Primary => "primary",
723 NodeRole::Standby => "standby",
724 NodeRole::ReadReplica => "replica",
725 }
726 );
727
728 let result = Self::forward_sql_request(&http_url, sql).await?;
730
731 let response = SqlResponse {
733 query_type: query_type.to_string(),
734 routed_to: target_address,
735 node_role: format!("{:?}", target_node.role).to_lowercase(),
736 result,
737 };
738
739 Ok((200, serde_json::to_value(response)?))
740 }
741
742 fn is_write_query(sql: &str) -> bool {
744 let upper = sql.trim().to_uppercase();
745
746 if upper.starts_with("INSERT")
748 || upper.starts_with("UPDATE")
749 || upper.starts_with("DELETE")
750 || upper.starts_with("CREATE")
751 || upper.starts_with("ALTER")
752 || upper.starts_with("DROP")
753 || upper.starts_with("TRUNCATE")
754 || upper.starts_with("GRANT")
755 || upper.starts_with("REVOKE")
756 || upper.starts_with("VACUUM")
757 || upper.starts_with("REINDEX")
758 || upper.starts_with("MERGE")
759 || upper.starts_with("UPSERT")
760 {
761 return true;
762 }
763
764 if upper.starts_with("BEGIN")
766 || upper.starts_with("COMMIT")
767 || upper.starts_with("ROLLBACK")
768 || upper.starts_with("SAVEPOINT")
769 {
770 return true;
772 }
773
774 false
776 }
777
778 fn select_primary_node<'a>(
780 config: &'a ProxyConfig,
781 health: &HashMap<String, NodeHealth>,
782 ) -> Result<&'a NodeConfig> {
783 config.nodes.iter()
784 .find(|n| {
785 n.role == NodeRole::Primary
786 && n.enabled
787 && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false)
788 })
789 .ok_or_else(|| ProxyError::Internal("No healthy primary node available".to_string()))
790 }
791
792 fn select_read_node<'a>(
794 config: &'a ProxyConfig,
795 health: &HashMap<String, NodeHealth>,
796 state: &AdminState,
797 ) -> Result<&'a NodeConfig> {
798 let healthy_nodes: Vec<&NodeConfig> = config.nodes.iter()
800 .filter(|n| n.enabled && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false))
801 .collect();
802
803 if healthy_nodes.is_empty() {
804 return Err(ProxyError::Internal("No healthy nodes available".to_string()));
805 }
806
807 if config.load_balancer.read_write_split {
809 let read_nodes: Vec<&NodeConfig> = healthy_nodes.iter()
810 .filter(|n| n.role == NodeRole::Standby || n.role == NodeRole::ReadReplica)
811 .copied()
812 .collect();
813
814 if !read_nodes.is_empty() {
815 let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
817 let index = counter % read_nodes.len();
818 return Ok(read_nodes[index]);
819 }
820 }
821
822 let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
824 let index = counter % healthy_nodes.len();
825 Ok(healthy_nodes[index])
826 }
827
828 async fn forward_sql_request(url: &str, sql: &str) -> Result<serde_json::Value> {
830 let request_body = serde_json::json!({ "query": sql });
832 let body_bytes = serde_json::to_vec(&request_body)
833 .map_err(|e| ProxyError::Internal(format!("JSON serialization error: {}", e)))?;
834
835 let url_parts: Vec<&str> = url.trim_start_matches("http://").splitn(2, '/').collect();
837 if url_parts.is_empty() {
838 return Err(ProxyError::Internal("Invalid URL".to_string()));
839 }
840
841 let host_port = url_parts[0];
842 let path = if url_parts.len() > 1 {
843 format!("/{}", url_parts[1])
844 } else {
845 "/".to_string()
846 };
847
848 let stream = TcpStream::connect(host_port).await
850 .map_err(|e| ProxyError::Network(format!("Failed to connect to {}: {}", host_port, e)))?;
851
852 let (reader, mut writer) = stream.into_split();
853 let mut reader = BufReader::new(reader);
854
855 let request = format!(
857 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
858 path,
859 host_port,
860 body_bytes.len()
861 );
862
863 writer.write_all(request.as_bytes()).await
864 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
865 writer.write_all(&body_bytes).await
866 .map_err(|e| ProxyError::Network(format!("Write body error: {}", e)))?;
867
868 let mut response_headers = Vec::new();
870 let mut line = String::new();
871 let mut content_length: usize = 0;
872
873 loop {
874 line.clear();
875 let bytes_read = reader.read_line(&mut line).await
876 .map_err(|e| ProxyError::Network(format!("Response read error: {}", e)))?;
877
878 if bytes_read == 0 || line == "\r\n" {
879 break;
880 }
881
882 let trimmed = line.trim();
883 if trimmed.to_lowercase().starts_with("content-length:") {
884 if let Some(len_str) = trimmed.split(':').nth(1) {
885 content_length = len_str.trim().parse().unwrap_or(0);
886 }
887 }
888 response_headers.push(trimmed.to_string());
889 }
890
891 let mut body_buf = vec![0u8; content_length];
893 if content_length > 0 {
894 reader.read_exact(&mut body_buf).await
895 .map_err(|e| ProxyError::Network(format!("Response body read error: {}", e)))?;
896 }
897
898 let response_body = String::from_utf8_lossy(&body_buf);
899
900 serde_json::from_str(&response_body)
902 .map_err(|e| ProxyError::Internal(format!("Invalid JSON response: {} - body: {}", e, response_body)))
903 }
904
905 async fn check_readiness(state: &Arc<AdminState>) -> bool {
907 let health = state.node_health.read().await;
908
909 health.values().any(|h| h.healthy)
911 }
912
913 async fn set_node_enabled(state: &Arc<AdminState>, node_addr: &str, enabled: bool) -> Result<()> {
915 let mut health = state.node_health.write().await;
916
917 if let Some(node_health) = health.get_mut(node_addr) {
918 node_health.healthy = enabled;
919 Ok(())
920 } else {
921 Err(ProxyError::Config(format!("Node not found: {}", node_addr)))
922 }
923 }
924
925 async fn get_pool_stats(_state: &Arc<AdminState>) -> Vec<PoolStatsResponse> {
927 Vec::new()
929 }
930
931 #[cfg(feature = "ha-tr")]
935 async fn handle_replay_request(
936 body: Option<&str>,
937 state: &Arc<AdminState>,
938 ) -> Result<(u16, serde_json::Value)> {
939 let raw = body.ok_or_else(|| {
940 ProxyError::Internal("replay: empty request body".to_string())
941 })?;
942 let req: ReplayRequestBody = match serde_json::from_str(raw) {
943 Ok(r) => r,
944 Err(e) => {
945 return Ok((
946 400,
947 serde_json::json!({ "error": format!("invalid body: {}", e) }),
948 ));
949 }
950 };
951 let engine = match state.replay_engine.read().await.clone() {
952 Some(e) => e,
953 None => {
954 return Ok((
955 503,
956 serde_json::json!({ "error": "replay engine not attached" }),
957 ));
958 }
959 };
960 let tt = TimeTravelRequest {
961 from: req.from,
962 to: req.to,
963 target_host: req.target_host,
964 target_port: req.target_port,
965 target_user: req.target_user,
966 target_password: req.target_password,
967 target_database: req.target_database,
968 };
969 match engine.replay_window(&tt).await {
970 Ok(summary) => Ok((200, serde_json::to_value(summary)?)),
971 Err(e) => Ok((
972 500,
973 serde_json::json!({ "error": format!("replay failed: {}", e) }),
974 )),
975 }
976 }
977
978 #[cfg(feature = "edge-proxy")]
981 async fn handle_edge_status(
982 state: &Arc<AdminState>,
983 ) -> Result<(u16, serde_json::Value)> {
984 let cache_stats = match state.edge_cache.read().await.clone() {
985 Some(c) => Some(c.stats()),
986 None => None,
987 };
988 let edges = match state.edge_registry.read().await.clone() {
989 Some(r) => r.list(),
990 None => Vec::new(),
991 };
992 Ok((200, serde_json::json!({
993 "cache": cache_stats,
994 "registered": edges,
995 "edge_count": edges.len(),
996 })))
997 }
998
999 #[cfg(feature = "edge-proxy")]
1004 async fn handle_edge_register(
1005 body: Option<&str>,
1006 state: &Arc<AdminState>,
1007 ) -> Result<(u16, serde_json::Value)> {
1008 let raw = body.ok_or_else(|| {
1009 ProxyError::Internal("edge register: empty body".to_string())
1010 })?;
1011 let req: EdgeRegisterBody = match serde_json::from_str(raw) {
1012 Ok(r) => r,
1013 Err(e) => {
1014 return Ok((
1015 400,
1016 serde_json::json!({ "error": format!("invalid body: {}", e) }),
1017 ));
1018 }
1019 };
1020 let registry = match state.edge_registry.read().await.clone() {
1021 Some(r) => r,
1022 None => {
1023 return Ok((
1024 503,
1025 serde_json::json!({ "error": "edge registry not attached" }),
1026 ));
1027 }
1028 };
1029 let now = chrono::Utc::now().to_rfc3339();
1030 match registry.register(&req.edge_id, &req.region, &req.base_url, &now) {
1031 Ok(_rx) => {
1032 Ok((201, serde_json::json!({
1038 "edge_id": req.edge_id,
1039 "region": req.region,
1040 "base_url": req.base_url,
1041 "registered_at": now,
1042 })))
1043 }
1044 Err(e) => Ok((
1045 503,
1046 serde_json::json!({ "error": e.to_string() }),
1047 )),
1048 }
1049 }
1050
1051 #[cfg(feature = "edge-proxy")]
1058 async fn handle_edge_invalidate(
1059 body: Option<&str>,
1060 state: &Arc<AdminState>,
1061 ) -> Result<(u16, serde_json::Value)> {
1062 let raw = body.ok_or_else(|| {
1063 ProxyError::Internal("edge invalidate: empty body".to_string())
1064 })?;
1065 let req: EdgeInvalidateBody = match serde_json::from_str(raw) {
1066 Ok(r) => r,
1067 Err(e) => {
1068 return Ok((
1069 400,
1070 serde_json::json!({ "error": format!("invalid body: {}", e) }),
1071 ));
1072 }
1073 };
1074 let cache = match state.edge_cache.read().await.clone() {
1075 Some(c) => c,
1076 None => {
1077 return Ok((
1078 503,
1079 serde_json::json!({ "error": "edge cache not attached" }),
1080 ));
1081 }
1082 };
1083 let registry = match state.edge_registry.read().await.clone() {
1084 Some(r) => r,
1085 None => {
1086 return Ok((
1087 503,
1088 serde_json::json!({ "error": "edge registry not attached" }),
1089 ));
1090 }
1091 };
1092 let version = req.up_to_version.unwrap_or_else(|| cache.next_version());
1093 let dropped_local = cache.invalidate(version, &req.tables);
1095 let ev = InvalidationEvent {
1097 up_to_version: version,
1098 tables: req.tables.clone(),
1099 committed_at: chrono::Utc::now().to_rfc3339(),
1100 };
1101 let (sent, pruned) = registry.broadcast(ev).await;
1102 Ok((200, serde_json::json!({
1103 "version": version,
1104 "tables": req.tables,
1105 "dropped_local": dropped_local,
1106 "edges_notified": sent,
1107 "edges_pruned": pruned,
1108 })))
1109 }
1110
1111 #[cfg(feature = "anomaly-detection")]
1116 async fn handle_anomalies_list(
1117 path: &str,
1118 state: &Arc<AdminState>,
1119 ) -> Result<(u16, serde_json::Value)> {
1120 let limit = parse_limit_query(path, 100, 1024);
1121 let det = match state.anomaly_detector.read().await.clone() {
1122 Some(d) => d,
1123 None => {
1124 return Ok((
1125 503,
1126 serde_json::json!({ "error": "anomaly detector not attached" }),
1127 ));
1128 }
1129 };
1130 let events = det.recent_events(limit);
1131 Ok((200, serde_json::json!({
1132 "count": events.len(),
1133 "limit": limit,
1134 "events": events,
1135 "buffer_total": det.event_count(),
1136 })))
1137 }
1138
1139 #[cfg(feature = "ha-tr")]
1149 async fn handle_shadow_request(
1150 body: Option<&str>,
1151 ) -> Result<(u16, serde_json::Value)> {
1152 use crate::backend::{tls::default_client_config, BackendClient, BackendConfig, ParamValue, TlsMode};
1153 use crate::shadow_execute::shadow_execute;
1154
1155 let raw = body.ok_or_else(|| {
1156 ProxyError::Internal("shadow: empty request body".to_string())
1157 })?;
1158 let req: ShadowRequestBody = match serde_json::from_str(raw) {
1159 Ok(r) => r,
1160 Err(e) => {
1161 return Ok((
1162 400,
1163 serde_json::json!({ "error": format!("invalid body: {}", e) }),
1164 ));
1165 }
1166 };
1167
1168 let mk_cfg = |host: String, port: u16, user: Option<String>, password: Option<String>, database: Option<String>| BackendConfig {
1171 host,
1172 port,
1173 user: user.unwrap_or_else(|| "postgres".into()),
1174 password,
1175 database,
1176 application_name: Some("heliosdb-proxy-shadow".into()),
1177 tls_mode: TlsMode::Disable,
1178 connect_timeout: std::time::Duration::from_secs(5),
1179 query_timeout: std::time::Duration::from_secs(30),
1180 tls_config: default_client_config(),
1181 };
1182 let source_cfg = mk_cfg(
1183 req.source_host,
1184 req.source_port,
1185 req.source_user,
1186 req.source_password,
1187 req.source_database,
1188 );
1189 let shadow_cfg = mk_cfg(
1190 req.shadow_host,
1191 req.shadow_port,
1192 req.shadow_user,
1193 req.shadow_password,
1194 req.shadow_database,
1195 );
1196
1197 let mut source = match BackendClient::connect(&source_cfg).await {
1201 Ok(c) => c,
1202 Err(e) => {
1203 return Ok((
1204 500,
1205 serde_json::json!({ "error": format!("source connect: {}", e) }),
1206 ));
1207 }
1208 };
1209
1210 let params: Vec<ParamValue> = req
1211 .params
1212 .unwrap_or_default()
1213 .into_iter()
1214 .map(|s| ParamValue::Text(s))
1215 .collect();
1216
1217 let outcome = shadow_execute(&mut source, &shadow_cfg, &req.sql, ¶ms).await;
1218 source.close().await;
1219
1220 match outcome {
1221 Ok((_qr, report)) => Ok((200, serde_json::json!({
1222 "sql": report.sql,
1223 "both_succeeded": report.both_succeeded,
1224 "row_count_match": report.row_count_match,
1225 "row_hash_match": report.row_hash_match,
1226 "primary_elapsed_us": report.primary_elapsed_us,
1227 "shadow_elapsed_us": report.shadow_elapsed_us,
1228 "primary_error": report.primary_error,
1229 "shadow_error": report.shadow_error,
1230 "is_clean": report.is_clean(),
1231 }))),
1232 Err(e) => Ok((
1233 500,
1234 serde_json::json!({ "error": format!("shadow execute: {}", e) }),
1235 )),
1236 }
1237 }
1238
1239 async fn handle_chaos_request(
1255 body: Option<&str>,
1256 state: &Arc<AdminState>,
1257 ) -> Result<(u16, serde_json::Value)> {
1258 let raw = body.ok_or_else(|| {
1259 ProxyError::Internal("chaos: empty request body".to_string())
1260 })?;
1261 let action: ChaosAction = match serde_json::from_str(raw) {
1262 Ok(a) => a,
1263 Err(e) => {
1264 return Ok((
1265 400,
1266 serde_json::json!({ "error": format!("invalid body: {}", e) }),
1267 ));
1268 }
1269 };
1270 match action {
1271 ChaosAction::ForceUnhealthy { target_node } => {
1272 if let Err(e) = Self::set_node_enabled(state, &target_node, false).await {
1273 return Ok((
1274 404,
1275 serde_json::json!({ "error": e.to_string() }),
1276 ));
1277 }
1278 state.chaos_overrides.write().await.insert(
1279 target_node.clone(),
1280 ChaosOverride {
1281 since: chrono::Utc::now().to_rfc3339(),
1282 kind: "force_unhealthy".to_string(),
1283 note: format!("forced unhealthy via chaos endpoint"),
1284 },
1285 );
1286 Ok((200, serde_json::json!({
1287 "applied": "force_unhealthy",
1288 "target_node": target_node,
1289 })))
1290 }
1291 ChaosAction::Restore { target_node } => {
1292 if let Err(e) = Self::set_node_enabled(state, &target_node, true).await {
1293 return Ok((
1294 404,
1295 serde_json::json!({ "error": e.to_string() }),
1296 ));
1297 }
1298 state.chaos_overrides.write().await.remove(&target_node);
1299 Ok((200, serde_json::json!({
1300 "restored": target_node,
1301 })))
1302 }
1303 ChaosAction::Reset => {
1304 let overrides: Vec<String> =
1305 state.chaos_overrides.read().await.keys().cloned().collect();
1306 let mut restored = Vec::with_capacity(overrides.len());
1307 for addr in overrides {
1308 let _ = Self::set_node_enabled(state, &addr, true).await;
1309 restored.push(addr);
1310 }
1311 state.chaos_overrides.write().await.clear();
1312 Ok((200, serde_json::json!({
1313 "reset": true,
1314 "restored": restored,
1315 })))
1316 }
1317 }
1318 }
1319
1320 #[cfg(feature = "wasm-plugins")]
1326 async fn handle_plugins_list(state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1327 let pm = match state.plugin_manager.read().await.clone() {
1328 Some(p) => p,
1329 None => {
1330 return Ok((
1331 503,
1332 serde_json::json!({ "error": "plugin manager not attached" }),
1333 ));
1334 }
1335 };
1336 let plugins: Vec<PluginListEntry> = pm
1337 .list_plugins()
1338 .into_iter()
1339 .map(|info| PluginListEntry {
1340 name: info.name,
1341 version: info.version,
1342 description: info.description,
1343 hooks: info
1344 .hooks
1345 .iter()
1346 .map(|h| h.export_name().to_string())
1347 .collect(),
1348 state: format!("{:?}", info.state),
1349 invocations: info.stats.total_calls,
1350 errors: info.stats.error_count,
1351 })
1352 .collect();
1353 Ok((200, serde_json::to_value(plugins)?))
1354 }
1355
1356 #[cfg(not(feature = "wasm-plugins"))]
1357 async fn handle_plugins_list(_state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1358 Ok((
1359 503,
1360 serde_json::json!({ "error": "wasm-plugins feature not compiled in" }),
1361 ))
1362 }
1363
1364 async fn compute_topology(state: &Arc<AdminState>) -> TopologyResponse {
1370 let health = state.node_health.read().await;
1371 let cfg = state.config_snapshot.read().await;
1372
1373 let mut current_primary: Option<String> = None;
1374 for n in &cfg.nodes {
1375 if n.role.eq_ignore_ascii_case("primary") {
1376 let healthy = health.get(&n.address).map(|h| h.healthy).unwrap_or(false);
1377 if healthy {
1378 current_primary = Some(n.address.clone());
1379 break;
1380 }
1381 }
1382 }
1383
1384 let healthy_nodes = health.values().filter(|h| h.healthy).count() as u32;
1385 let unhealthy_nodes = health.values().filter(|h| !h.healthy).count() as u32;
1386 let total_nodes = cfg.nodes.len() as u32;
1387
1388 TopologyResponse {
1389 current_primary,
1390 healthy_nodes,
1391 unhealthy_nodes,
1392 total_nodes,
1393 last_failover_at: None,
1394 }
1395 }
1396
1397 fn format_prometheus_metrics(metrics: &ServerMetricsSnapshot) -> String {
1399 let mut output = String::new();
1400
1401 output.push_str("# HELP heliosdb_proxy_connections_total Total connections accepted\n");
1402 output.push_str("# TYPE heliosdb_proxy_connections_total counter\n");
1403 output.push_str(&format!(
1404 "heliosdb_proxy_connections_total {}\n",
1405 metrics.connections_accepted
1406 ));
1407
1408 output.push_str("# HELP heliosdb_proxy_connections_closed Total connections closed\n");
1409 output.push_str("# TYPE heliosdb_proxy_connections_closed counter\n");
1410 output.push_str(&format!(
1411 "heliosdb_proxy_connections_closed {}\n",
1412 metrics.connections_closed
1413 ));
1414
1415 output.push_str("# HELP heliosdb_proxy_queries_total Total queries processed\n");
1416 output.push_str("# TYPE heliosdb_proxy_queries_total counter\n");
1417 output.push_str(&format!(
1418 "heliosdb_proxy_queries_total {}\n",
1419 metrics.queries_processed
1420 ));
1421
1422 output.push_str("# HELP heliosdb_proxy_bytes_received_total Total bytes received\n");
1423 output.push_str("# TYPE heliosdb_proxy_bytes_received_total counter\n");
1424 output.push_str(&format!(
1425 "heliosdb_proxy_bytes_received_total {}\n",
1426 metrics.bytes_received
1427 ));
1428
1429 output.push_str("# HELP heliosdb_proxy_bytes_sent_total Total bytes sent\n");
1430 output.push_str("# TYPE heliosdb_proxy_bytes_sent_total counter\n");
1431 output.push_str(&format!(
1432 "heliosdb_proxy_bytes_sent_total {}\n",
1433 metrics.bytes_sent
1434 ));
1435
1436 output.push_str("# HELP heliosdb_proxy_failovers_total Total failovers\n");
1437 output.push_str("# TYPE heliosdb_proxy_failovers_total counter\n");
1438 output.push_str(&format!(
1439 "heliosdb_proxy_failovers_total {}\n",
1440 metrics.failovers
1441 ));
1442
1443 output
1444 }
1445
1446 async fn send_response(
1448 writer: &mut tokio::net::tcp::WriteHalf<'_>,
1449 status: u16,
1450 status_text: &str,
1451 body: &str,
1452 ) -> Result<()> {
1453 let response = format!(
1454 "HTTP/1.1 {} {}\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1455 status,
1456 status_text,
1457 body.len(),
1458 body
1459 );
1460
1461 writer
1462 .write_all(response.as_bytes())
1463 .await
1464 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1465
1466 Ok(())
1467 }
1468
1469 async fn send_json_response<T: Serialize>(
1471 writer: &mut tokio::net::tcp::WriteHalf<'_>,
1472 status: u16,
1473 body: &T,
1474 ) -> Result<()> {
1475 let json = serde_json::to_string(body)
1476 .map_err(|e| ProxyError::Internal(format!("JSON error: {}", e)))?;
1477
1478 let status_text = match status {
1479 200 => "OK",
1480 400 => "Bad Request",
1481 404 => "Not Found",
1482 500 => "Internal Server Error",
1483 503 => "Service Unavailable",
1484 _ => "Unknown",
1485 };
1486
1487 let response = format!(
1488 "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1489 status,
1490 status_text,
1491 json.len(),
1492 json
1493 );
1494
1495 writer
1496 .write_all(response.as_bytes())
1497 .await
1498 .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1499
1500 Ok(())
1501 }
1502
1503 pub fn shutdown(&self) {
1505 let _ = self.shutdown_tx.send(());
1506 }
1507}
1508
1509impl AdminState {
1510 pub fn new() -> Self {
1512 Self {
1513 node_health: RwLock::new(HashMap::new()),
1514 metrics: RwLock::new(ServerMetricsSnapshot {
1515 connections_accepted: 0,
1516 connections_closed: 0,
1517 queries_processed: 0,
1518 bytes_received: 0,
1519 bytes_sent: 0,
1520 failovers: 0,
1521 }),
1522 active_sessions: RwLock::new(0),
1523 config_snapshot: RwLock::new(ConfigSnapshot {
1524 listen_address: String::new(),
1525 admin_address: String::new(),
1526 tr_enabled: false,
1527 tr_mode: String::new(),
1528 pool_min_connections: 0,
1529 pool_max_connections: 0,
1530 nodes: Vec::new(),
1531 }),
1532 proxy_config: RwLock::new(None),
1533 read_lb_counter: AtomicUsize::new(0),
1534 commands: RwLock::new(HashMap::new()),
1535 #[cfg(feature = "ha-tr")]
1536 replay_engine: RwLock::new(None),
1537 #[cfg(feature = "wasm-plugins")]
1538 plugin_manager: RwLock::new(None),
1539 chaos_overrides: RwLock::new(HashMap::new()),
1540 #[cfg(feature = "anomaly-detection")]
1541 anomaly_detector: RwLock::new(None),
1542 #[cfg(feature = "edge-proxy")]
1543 edge_cache: RwLock::new(None),
1544 #[cfg(feature = "edge-proxy")]
1545 edge_registry: RwLock::new(None),
1546 auth_token: RwLock::new(None),
1547 migration: RwLock::new(None),
1548 branch: RwLock::new(None),
1549 }
1550 }
1551
1552 pub async fn with_auth_token(&self, token: Option<String>) {
1554 *self.auth_token.write().await = token;
1555 }
1556
1557 pub async fn with_migration(&self, info: MigrationInfo) {
1559 *self.migration.write().await = Some(info);
1560 }
1561
1562 pub async fn with_branch(&self, cfg: crate::config::BranchConfig) {
1564 *self.branch.write().await = Some(cfg);
1565 }
1566
1567 #[cfg(feature = "anomaly-detection")]
1570 pub async fn with_anomaly_detector(&self, detector: Arc<AnomalyDetector>) {
1571 *self.anomaly_detector.write().await = Some(detector);
1572 }
1573
1574 #[cfg(feature = "edge-proxy")]
1577 pub async fn with_edge(&self, cache: Arc<EdgeCache>, registry: Arc<EdgeRegistry>) {
1578 *self.edge_cache.write().await = Some(cache);
1579 *self.edge_registry.write().await = Some(registry);
1580 }
1581
1582 #[cfg(feature = "ha-tr")]
1587 pub async fn with_replay_engine(&self, engine: Arc<ReplayEngine>) {
1588 *self.replay_engine.write().await = Some(engine);
1589 }
1590
1591 #[cfg(feature = "wasm-plugins")]
1595 pub async fn with_plugin_manager(&self, manager: Arc<PluginManager>) {
1596 *self.plugin_manager.write().await = Some(manager);
1597 }
1598
1599 pub async fn set_proxy_config(&self, config: ProxyConfig) {
1601 let mut proxy_config = self.proxy_config.write().await;
1602 *proxy_config = Some(config);
1603 }
1604
1605 pub async fn register_command<F>(&self, name: &str, handler: F)
1607 where
1608 F: Fn(&[&str]) -> Result<String> + Send + Sync + 'static,
1609 {
1610 let mut commands = self.commands.write().await;
1611 commands.insert(name.to_string(), Arc::new(handler));
1612 }
1613
1614 pub async fn execute_command(&self, name: &str, args: &[&str]) -> Result<String> {
1616 let commands = self.commands.read().await;
1617 match commands.get(name) {
1618 Some(handler) => handler(args),
1619 None => Err(ProxyError::Internal(format!("Unknown command: {}", name))),
1620 }
1621 }
1622}
1623
1624impl Default for AdminState {
1625 fn default() -> Self {
1626 Self::new()
1627 }
1628}
1629
1630#[derive(Debug, Deserialize)]
1634struct SqlRequest {
1635 query: String,
1637}
1638
1639#[derive(Debug, Serialize)]
1641struct SqlResponse {
1642 query_type: String,
1644 routed_to: String,
1646 node_role: String,
1648 result: serde_json::Value,
1650}
1651
1652#[derive(Serialize)]
1653struct HealthResponse {
1654 status: &'static str,
1655}
1656
1657#[derive(Serialize)]
1658struct ReadinessResponse {
1659 ready: bool,
1660 message: &'static str,
1661}
1662
1663#[derive(Serialize)]
1664struct LivenessResponse {
1665 alive: bool,
1666}
1667
1668#[derive(Serialize)]
1669struct ErrorResponse {
1670 error: String,
1671}
1672
1673#[derive(Serialize)]
1674struct MetricsResponse {
1675 connections_accepted: u64,
1676 connections_closed: u64,
1677 connections_active: u64,
1678 queries_processed: u64,
1679 bytes_received: u64,
1680 bytes_sent: u64,
1681 failovers: u64,
1682}
1683
1684impl From<ServerMetricsSnapshot> for MetricsResponse {
1685 fn from(m: ServerMetricsSnapshot) -> Self {
1686 Self {
1687 connections_accepted: m.connections_accepted,
1688 connections_closed: m.connections_closed,
1689 connections_active: m.connections_accepted.saturating_sub(m.connections_closed),
1690 queries_processed: m.queries_processed,
1691 bytes_received: m.bytes_received,
1692 bytes_sent: m.bytes_sent,
1693 failovers: m.failovers,
1694 }
1695 }
1696}
1697
1698#[derive(Serialize)]
1699struct NodeHealthResponse {
1700 address: String,
1701 healthy: bool,
1702 last_check: String,
1703 failure_count: u32,
1704 last_error: Option<String>,
1705 latency_ms: f64,
1706 replication_lag_bytes: Option<u64>,
1707}
1708
1709impl From<NodeHealth> for NodeHealthResponse {
1710 fn from(h: NodeHealth) -> Self {
1711 Self {
1712 address: h.address,
1713 healthy: h.healthy,
1714 last_check: h.last_check.to_rfc3339(),
1715 failure_count: h.failure_count,
1716 last_error: h.last_error,
1717 latency_ms: h.latency_ms,
1718 replication_lag_bytes: h.replication_lag_bytes,
1719 }
1720 }
1721}
1722
1723#[derive(Serialize)]
1724struct SessionsResponse {
1725 active_sessions: u64,
1726}
1727
1728#[cfg(feature = "edge-proxy")]
1730#[derive(Debug, Deserialize)]
1731struct EdgeRegisterBody {
1732 edge_id: String,
1733 region: String,
1734 base_url: String,
1735}
1736
1737#[cfg(feature = "edge-proxy")]
1741#[derive(Debug, Deserialize)]
1742struct EdgeInvalidateBody {
1743 #[serde(default)]
1744 tables: Vec<String>,
1745 #[serde(default)]
1746 up_to_version: Option<u64>,
1747}
1748
1749#[cfg(feature = "anomaly-detection")]
1752fn parse_limit_query(path: &str, default: usize, max: usize) -> usize {
1753 let q = match path.find('?') {
1754 Some(i) => &path[i + 1..],
1755 None => return default,
1756 };
1757 for kv in q.split('&') {
1758 let mut it = kv.splitn(2, '=');
1759 if let (Some(k), Some(v)) = (it.next(), it.next()) {
1760 if k == "limit" {
1761 if let Ok(n) = v.parse::<usize>() {
1762 return n.min(max);
1763 }
1764 }
1765 }
1766 }
1767 default
1768}
1769
1770#[cfg(feature = "ha-tr")]
1772#[derive(Debug, Deserialize)]
1773struct ShadowRequestBody {
1774 sql: String,
1776 #[serde(default)]
1779 params: Option<Vec<String>>,
1780
1781 source_host: String,
1784 source_port: u16,
1785 #[serde(default)]
1786 source_user: Option<String>,
1787 #[serde(default)]
1788 source_password: Option<String>,
1789 #[serde(default)]
1790 source_database: Option<String>,
1791
1792 shadow_host: String,
1795 shadow_port: u16,
1796 #[serde(default)]
1797 shadow_user: Option<String>,
1798 #[serde(default)]
1799 shadow_password: Option<String>,
1800 #[serde(default)]
1801 shadow_database: Option<String>,
1802}
1803
1804#[derive(Debug, Deserialize)]
1809#[serde(tag = "action", rename_all = "snake_case")]
1810enum ChaosAction {
1811 ForceUnhealthy { target_node: String },
1815 Restore { target_node: String },
1817 Reset,
1819}
1820
1821#[cfg(feature = "wasm-plugins")]
1824#[derive(Serialize)]
1825struct PluginListEntry {
1826 name: String,
1827 version: String,
1828 description: String,
1829 hooks: Vec<String>,
1831 state: String,
1833 invocations: u64,
1834 errors: u64,
1835}
1836
1837#[cfg(feature = "ha-tr")]
1839#[derive(Debug, Deserialize)]
1840struct ReplayRequestBody {
1841 from: DateTime<Utc>,
1843 to: DateTime<Utc>,
1845 target_host: String,
1847 target_port: u16,
1849 #[serde(default)]
1855 target_user: Option<String>,
1856 #[serde(default)]
1857 target_password: Option<String>,
1858 #[serde(default)]
1859 target_database: Option<String>,
1860}
1861
1862#[derive(Serialize)]
1866struct TopologyResponse {
1867 #[serde(rename = "currentPrimary")]
1868 current_primary: Option<String>,
1869 #[serde(rename = "healthyNodes")]
1870 healthy_nodes: u32,
1871 #[serde(rename = "unhealthyNodes")]
1872 unhealthy_nodes: u32,
1873 #[serde(rename = "totalNodes")]
1874 total_nodes: u32,
1875 #[serde(rename = "lastFailoverAt")]
1878 last_failover_at: Option<String>,
1879}
1880
1881#[derive(Serialize)]
1882struct PoolStatsResponse {
1883 node: String,
1884 active_connections: u64,
1885 idle_connections: u64,
1886 pending_requests: u64,
1887 total_connections_created: u64,
1888 total_connections_closed: u64,
1889}
1890
1891#[derive(Serialize)]
1892struct VersionResponse {
1893 version: String,
1894 build_time: String,
1895}
1896
1897#[cfg(test)]
1898mod tests {
1899 use super::*;
1900
1901 #[tokio::test]
1902 async fn test_admin_state_creation() {
1903 let state = AdminState::new();
1904 let sessions = state.active_sessions.read().await;
1905 assert_eq!(*sessions, 0);
1906 }
1907
1908 #[test]
1909 fn test_admin_authorized() {
1910 let h = |s: &str| vec!["GET /topology HTTP/1.1".to_string(), s.to_string()];
1911 assert!(AdminServer::admin_authorized(&h("Authorization: Bearer s3cret"), "s3cret"));
1912 assert!(AdminServer::admin_authorized(&h("authorization: Bearer s3cret"), "s3cret"));
1914 assert!(!AdminServer::admin_authorized(&h("Authorization: Bearer nope"), "s3cret"));
1916 assert!(!AdminServer::admin_authorized(&h("Authorization: Basic s3cret"), "s3cret"));
1917 assert!(!AdminServer::admin_authorized(&["GET / HTTP/1.1".to_string()], "s3cret"));
1918 }
1919
1920 #[test]
1921 fn test_constant_time_eq_str() {
1922 assert!(constant_time_eq_str("abc", "abc"));
1923 assert!(!constant_time_eq_str("abc", "abd"));
1924 assert!(!constant_time_eq_str("abc", "abcd"));
1925 }
1926
1927 #[tokio::test]
1928 async fn test_readiness_check_no_nodes() {
1929 let state = Arc::new(AdminState::new());
1930 let ready = AdminServer::check_readiness(&state).await;
1931 assert!(!ready);
1932 }
1933
1934 #[tokio::test]
1935 async fn test_readiness_check_with_healthy_node() {
1936 let state = Arc::new(AdminState::new());
1937
1938 {
1939 let mut health = state.node_health.write().await;
1940 health.insert(
1941 "localhost:5432".to_string(),
1942 NodeHealth {
1943 address: "localhost:5432".to_string(),
1944 healthy: true,
1945 last_check: chrono::Utc::now(),
1946 failure_count: 0,
1947 last_error: None,
1948 latency_ms: 1.0,
1949 replication_lag_bytes: None,
1950 },
1951 );
1952 }
1953
1954 let ready = AdminServer::check_readiness(&state).await;
1955 assert!(ready);
1956 }
1957
1958 #[tokio::test]
1959 async fn test_command_registration() {
1960 let state = AdminState::new();
1961
1962 state
1963 .register_command("test", |args| {
1964 Ok(format!("Test command with {} args", args.len()))
1965 })
1966 .await;
1967
1968 let result = state.execute_command("test", &["arg1", "arg2"]).await;
1969 assert!(result.is_ok());
1970 assert_eq!(result.unwrap(), "Test command with 2 args");
1971 }
1972
1973 #[tokio::test]
1974 async fn test_unknown_command() {
1975 let state = AdminState::new();
1976 let result = state.execute_command("unknown", &[]).await;
1977 assert!(result.is_err());
1978 }
1979
1980 #[test]
1981 fn test_prometheus_metrics_format() {
1982 let metrics = ServerMetricsSnapshot {
1983 connections_accepted: 100,
1984 connections_closed: 50,
1985 queries_processed: 1000,
1986 bytes_received: 50000,
1987 bytes_sent: 100000,
1988 failovers: 2,
1989 };
1990
1991 let output = AdminServer::format_prometheus_metrics(&metrics);
1992 assert!(output.contains("heliosdb_proxy_connections_total 100"));
1993 assert!(output.contains("heliosdb_proxy_queries_total 1000"));
1994 assert!(output.contains("heliosdb_proxy_failovers_total 2"));
1995 }
1996
1997 #[test]
1998 fn test_metrics_response_active_connections() {
1999 let snapshot = ServerMetricsSnapshot {
2000 connections_accepted: 100,
2001 connections_closed: 30,
2002 queries_processed: 500,
2003 bytes_received: 10000,
2004 bytes_sent: 20000,
2005 failovers: 1,
2006 };
2007
2008 let response = MetricsResponse::from(snapshot);
2009 assert_eq!(response.connections_active, 70);
2010 }
2011
2012 async fn topology_state(
2015 nodes: &[(&str, &str, bool)],
2016 ) -> Arc<AdminState> {
2017 let state = Arc::new(AdminState::new());
2018 {
2019 let mut cfg = state.config_snapshot.write().await;
2020 cfg.nodes = nodes
2021 .iter()
2022 .map(|(addr, role, _)| NodeSnapshot {
2023 address: (*addr).to_string(),
2024 role: (*role).to_string(),
2025 weight: 100,
2026 enabled: true,
2027 })
2028 .collect();
2029 }
2030 {
2031 let mut health = state.node_health.write().await;
2032 for (addr, _, healthy) in nodes {
2033 health.insert(
2034 (*addr).to_string(),
2035 NodeHealth {
2036 address: (*addr).to_string(),
2037 healthy: *healthy,
2038 last_check: chrono::Utc::now(),
2039 failure_count: 0,
2040 last_error: None,
2041 latency_ms: 1.0,
2042 replication_lag_bytes: None,
2043 },
2044 );
2045 }
2046 }
2047 state
2048 }
2049
2050 #[tokio::test]
2051 async fn test_topology_returns_healthy_primary() {
2052 let state = topology_state(&[
2053 ("primary.svc:5432", "primary", true),
2054 ("standby-a.svc:5432", "standby", true),
2055 ("standby-b.svc:5432", "standby", false),
2056 ])
2057 .await;
2058
2059 let topo = AdminServer::compute_topology(&state).await;
2060 assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
2061 assert_eq!(topo.healthy_nodes, 2);
2062 assert_eq!(topo.unhealthy_nodes, 1);
2063 assert_eq!(topo.total_nodes, 3);
2064 }
2065
2066 #[tokio::test]
2067 async fn test_topology_no_primary_when_primary_unhealthy() {
2068 let state = topology_state(&[
2071 ("primary.svc:5432", "primary", false),
2072 ("standby.svc:5432", "standby", true),
2073 ])
2074 .await;
2075
2076 let topo = AdminServer::compute_topology(&state).await;
2077 assert_eq!(topo.current_primary, None);
2078 assert_eq!(topo.healthy_nodes, 1);
2079 assert_eq!(topo.unhealthy_nodes, 1);
2080 }
2081
2082 #[tokio::test]
2083 async fn test_topology_handles_empty_cluster() {
2084 let state = Arc::new(AdminState::new());
2085 let topo = AdminServer::compute_topology(&state).await;
2086 assert_eq!(topo.current_primary, None);
2087 assert_eq!(topo.healthy_nodes, 0);
2088 assert_eq!(topo.unhealthy_nodes, 0);
2089 assert_eq!(topo.total_nodes, 0);
2090 }
2091
2092 #[tokio::test]
2093 async fn test_topology_role_match_is_case_insensitive() {
2094 let state = topology_state(&[
2095 ("primary.svc:5432", "PRIMARY", true),
2096 ])
2097 .await;
2098 let topo = AdminServer::compute_topology(&state).await;
2099 assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
2100 }
2101
2102 #[cfg(feature = "ha-tr")]
2103 #[tokio::test]
2104 async fn test_replay_returns_503_when_engine_unattached() {
2105 let state = Arc::new(AdminState::new());
2106 let body = r#"{
2107 "from": "2026-04-25T10:00:00Z",
2108 "to": "2026-04-25T11:00:00Z",
2109 "target_host": "127.0.0.1",
2110 "target_port": 5432
2111 }"#;
2112 let (status, value) = AdminServer::handle_replay_request(Some(body), &state)
2113 .await
2114 .expect("handler returns Ok with status code");
2115 assert_eq!(status, 503);
2116 assert_eq!(value["error"], "replay engine not attached");
2117 }
2118
2119 #[cfg(feature = "ha-tr")]
2120 #[tokio::test]
2121 async fn test_replay_400_on_malformed_body() {
2122 let state = Arc::new(AdminState::new());
2123 let (status, _) = AdminServer::handle_replay_request(Some("not json"), &state)
2124 .await
2125 .expect("handler returns Ok with status code");
2126 assert_eq!(status, 400);
2127 }
2128
2129 #[cfg(feature = "ha-tr")]
2130 #[tokio::test]
2131 async fn test_replay_errors_on_empty_body() {
2132 let state = Arc::new(AdminState::new());
2133 let err = AdminServer::handle_replay_request(None, &state).await;
2134 assert!(err.is_err(), "empty body must surface as Err");
2135 }
2136
2137 #[cfg(feature = "wasm-plugins")]
2138 #[tokio::test]
2139 async fn test_plugins_list_returns_503_when_manager_unattached() {
2140 let state = Arc::new(AdminState::new());
2141 let (status, value) = AdminServer::handle_plugins_list(&state)
2142 .await
2143 .expect("handler returns Ok with status code");
2144 assert_eq!(status, 503);
2145 assert_eq!(value["error"], "plugin manager not attached");
2146 }
2147
2148 #[cfg(not(feature = "wasm-plugins"))]
2149 #[tokio::test]
2150 async fn test_plugins_list_503_without_feature() {
2151 let state = Arc::new(AdminState::new());
2152 let (status, _) = AdminServer::handle_plugins_list(&state)
2153 .await
2154 .expect("handler returns Ok");
2155 assert_eq!(status, 503);
2156 }
2157
2158 async fn chaos_state_with_node(addr: &str) -> Arc<AdminState> {
2160 let state = Arc::new(AdminState::new());
2161 state.node_health.write().await.insert(
2162 addr.to_string(),
2163 NodeHealth {
2164 address: addr.to_string(),
2165 healthy: true,
2166 last_check: chrono::Utc::now(),
2167 failure_count: 0,
2168 last_error: None,
2169 latency_ms: 1.0,
2170 replication_lag_bytes: None,
2171 },
2172 );
2173 state
2174 }
2175
2176 #[tokio::test]
2177 async fn test_chaos_force_unhealthy_flips_node_and_records_override() {
2178 let state = chaos_state_with_node("primary.svc:5432").await;
2179 let body = r#"{"action":"force_unhealthy","target_node":"primary.svc:5432"}"#;
2180 let (status, value) = AdminServer::handle_chaos_request(Some(body), &state)
2181 .await
2182 .expect("handler returns Ok");
2183 assert_eq!(status, 200);
2184 assert_eq!(value["applied"], "force_unhealthy");
2185 assert!(!state.node_health.read().await["primary.svc:5432"].healthy);
2187 assert!(state.chaos_overrides.read().await.contains_key("primary.svc:5432"));
2189 }
2190
2191 #[tokio::test]
2192 async fn test_chaos_restore_clears_override_and_flips_back() {
2193 let state = chaos_state_with_node("primary.svc:5432").await;
2194 let _ = AdminServer::handle_chaos_request(
2195 Some(r#"{"action":"force_unhealthy","target_node":"primary.svc:5432"}"#),
2196 &state,
2197 )
2198 .await
2199 .unwrap();
2200 let (status, _) = AdminServer::handle_chaos_request(
2201 Some(r#"{"action":"restore","target_node":"primary.svc:5432"}"#),
2202 &state,
2203 )
2204 .await
2205 .unwrap();
2206 assert_eq!(status, 200);
2207 assert!(state.node_health.read().await["primary.svc:5432"].healthy);
2208 assert!(state.chaos_overrides.read().await.is_empty());
2209 }
2210
2211 #[tokio::test]
2212 async fn test_chaos_reset_restores_all_overrides() {
2213 let state = chaos_state_with_node("a:5432").await;
2214 state.node_health.write().await.insert(
2215 "b:5432".to_string(),
2216 NodeHealth {
2217 address: "b:5432".to_string(),
2218 healthy: true,
2219 last_check: chrono::Utc::now(),
2220 failure_count: 0,
2221 last_error: None,
2222 latency_ms: 1.0,
2223 replication_lag_bytes: None,
2224 },
2225 );
2226 for addr in &["a:5432", "b:5432"] {
2227 let body = format!(r#"{{"action":"force_unhealthy","target_node":"{}"}}"#, addr);
2228 let _ = AdminServer::handle_chaos_request(Some(&body), &state)
2229 .await
2230 .unwrap();
2231 }
2232 let (status, value) = AdminServer::handle_chaos_request(
2233 Some(r#"{"action":"reset"}"#),
2234 &state,
2235 )
2236 .await
2237 .unwrap();
2238 assert_eq!(status, 200);
2239 assert_eq!(value["reset"], true);
2240 let restored = value["restored"].as_array().unwrap();
2241 assert_eq!(restored.len(), 2);
2242 for addr in &["a:5432", "b:5432"] {
2244 assert!(state.node_health.read().await[*addr].healthy);
2245 }
2246 assert!(state.chaos_overrides.read().await.is_empty());
2247 }
2248
2249 #[tokio::test]
2250 async fn test_chaos_force_unhealthy_404s_when_node_unknown() {
2251 let state = Arc::new(AdminState::new());
2252 let body = r#"{"action":"force_unhealthy","target_node":"missing.svc:5432"}"#;
2253 let (status, _) = AdminServer::handle_chaos_request(Some(body), &state)
2254 .await
2255 .expect("handler returns Ok");
2256 assert_eq!(status, 404);
2257 }
2258
2259 #[tokio::test]
2260 async fn test_chaos_400_on_malformed_body() {
2261 let state = Arc::new(AdminState::new());
2262 let (status, _) = AdminServer::handle_chaos_request(Some("not json"), &state)
2263 .await
2264 .expect("handler returns Ok");
2265 assert_eq!(status, 400);
2266 }
2267
2268 #[tokio::test]
2269 async fn test_chaos_400_on_unknown_action() {
2270 let state = Arc::new(AdminState::new());
2271 let body = r#"{"action":"format_disk","target_node":"x"}"#;
2272 let (status, _) = AdminServer::handle_chaos_request(Some(body), &state)
2273 .await
2274 .expect("handler returns Ok");
2275 assert_eq!(status, 400);
2276 }
2277
2278 #[cfg(feature = "ha-tr")]
2279 #[tokio::test]
2280 async fn test_shadow_400_on_malformed_body() {
2281 let (status, _) = AdminServer::handle_shadow_request(Some("not json"))
2282 .await
2283 .expect("handler returns Ok");
2284 assert_eq!(status, 400);
2285 }
2286
2287 #[cfg(feature = "ha-tr")]
2288 #[tokio::test]
2289 async fn test_shadow_500_on_source_unreachable() {
2290 let body = r#"{
2293 "sql": "SELECT 1",
2294 "source_host": "127.0.0.1",
2295 "source_port": 1,
2296 "shadow_host": "127.0.0.1",
2297 "shadow_port": 1
2298 }"#;
2299 let (status, value) = AdminServer::handle_shadow_request(Some(body))
2300 .await
2301 .expect("handler returns Ok");
2302 assert_eq!(status, 500);
2303 let err = value["error"].as_str().expect("error field");
2304 assert!(
2305 err.contains("source connect"),
2306 "expected source connect error, got {}",
2307 err
2308 );
2309 }
2310
2311 #[cfg(feature = "ha-tr")]
2312 #[tokio::test]
2313 async fn test_shadow_errors_on_empty_body() {
2314 let err = AdminServer::handle_shadow_request(None).await;
2315 assert!(err.is_err(), "empty body must surface as Err");
2316 }
2317
2318 #[cfg(feature = "anomaly-detection")]
2319 #[tokio::test]
2320 async fn test_anomalies_returns_503_when_detector_unattached() {
2321 let state = Arc::new(AdminState::new());
2322 let (status, value) =
2323 AdminServer::handle_anomalies_list("/anomalies", &state)
2324 .await
2325 .expect("handler returns Ok");
2326 assert_eq!(status, 503);
2327 assert_eq!(value["error"], "anomaly detector not attached");
2328 }
2329
2330 #[cfg(feature = "anomaly-detection")]
2331 #[tokio::test]
2332 async fn test_anomalies_returns_attached_detector_events() {
2333 use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
2334 let state = Arc::new(AdminState::new());
2335 let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
2336 let _ = det.record_query(&QueryObservation {
2338 tenant: "test".into(),
2339 fingerprint: "fp".into(),
2340 sql: "SELECT * FROM users WHERE id = 1 OR 1=1 --".into(),
2341 timestamp: std::time::Instant::now(),
2342 });
2343 state.with_anomaly_detector(det.clone()).await;
2344
2345 let (status, value) =
2346 AdminServer::handle_anomalies_list("/anomalies", &state)
2347 .await
2348 .expect("handler returns Ok");
2349 assert_eq!(status, 200);
2350 let count = value["count"].as_u64().expect("count field");
2351 assert!(count > 0, "expected at least one event, got {}", count);
2352 }
2353
2354 #[cfg(feature = "anomaly-detection")]
2355 #[tokio::test]
2356 async fn test_anomalies_limit_query_string_respected() {
2357 use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
2358 let state = Arc::new(AdminState::new());
2359 let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
2360 for i in 0..50 {
2361 let fp = format!("fp{}", i);
2362 let _ = det.record_query(&QueryObservation {
2363 tenant: "test".into(),
2364 fingerprint: fp,
2365 sql: "SELECT 1".into(),
2366 timestamp: std::time::Instant::now(),
2367 });
2368 }
2369 state.with_anomaly_detector(det).await;
2370
2371 let (status, value) =
2372 AdminServer::handle_anomalies_list("/anomalies?limit=5", &state)
2373 .await
2374 .expect("handler returns Ok");
2375 assert_eq!(status, 200);
2376 assert_eq!(value["limit"].as_u64().unwrap(), 5);
2377 assert_eq!(value["events"].as_array().unwrap().len(), 5);
2378 }
2379
2380 #[cfg(feature = "anomaly-detection")]
2381 #[test]
2382 fn test_parse_limit_query_helper() {
2383 assert_eq!(parse_limit_query("/anomalies", 100, 1024), 100);
2384 assert_eq!(parse_limit_query("/anomalies?limit=42", 100, 1024), 42);
2385 assert_eq!(parse_limit_query("/anomalies?limit=99999", 100, 1024), 1024);
2386 assert_eq!(parse_limit_query("/anomalies?limit=abc", 100, 1024), 100);
2387 assert_eq!(parse_limit_query("/anomalies?other=x&limit=7", 100, 1024), 7);
2388 }
2389
2390 #[cfg(feature = "edge-proxy")]
2391 async fn edge_state() -> Arc<AdminState> {
2392 use crate::edge::{EdgeCache, EdgeRegistry};
2393 use std::time::Duration;
2394 let s = Arc::new(AdminState::new());
2395 let cache = Arc::new(EdgeCache::new(100));
2396 let registry = Arc::new(EdgeRegistry::new(8, Duration::from_secs(60)));
2397 s.with_edge(cache, registry).await;
2398 s
2399 }
2400
2401 #[cfg(feature = "edge-proxy")]
2402 #[tokio::test]
2403 async fn test_edge_status_returns_empty_lists_initially() {
2404 let s = edge_state().await;
2405 let (status, value) = AdminServer::handle_edge_status(&s)
2406 .await
2407 .expect("handler returns Ok");
2408 assert_eq!(status, 200);
2409 assert_eq!(value["edge_count"].as_u64().unwrap(), 0);
2410 assert_eq!(value["registered"].as_array().unwrap().len(), 0);
2411 assert!(value["cache"].is_object(), "cache stats present");
2412 }
2413
2414 #[cfg(feature = "edge-proxy")]
2415 #[tokio::test]
2416 async fn test_edge_register_then_status_lists_edge() {
2417 let s = edge_state().await;
2418 let body = r#"{"edge_id":"e1","region":"us-east","base_url":"https://e1.svc"}"#;
2419 let (status, _) = AdminServer::handle_edge_register(Some(body), &s)
2420 .await
2421 .expect("handler ok");
2422 assert_eq!(status, 201);
2423 let (status2, value2) = AdminServer::handle_edge_status(&s).await.unwrap();
2424 assert_eq!(status2, 200);
2425 assert_eq!(value2["edge_count"].as_u64().unwrap(), 1);
2426 assert_eq!(
2427 value2["registered"][0]["edge_id"].as_str().unwrap(),
2428 "e1"
2429 );
2430 }
2431
2432 #[cfg(feature = "edge-proxy")]
2433 #[tokio::test]
2434 async fn test_edge_register_400_on_malformed_body() {
2435 let s = edge_state().await;
2436 let (status, _) = AdminServer::handle_edge_register(Some("not json"), &s)
2437 .await
2438 .expect("handler ok");
2439 assert_eq!(status, 400);
2440 }
2441
2442 #[cfg(feature = "edge-proxy")]
2443 #[tokio::test]
2444 async fn test_edge_invalidate_drops_local_cache_entries() {
2445 use crate::edge::{CacheEntry, CacheKey};
2446 use std::time::{Duration, Instant};
2447 let s = edge_state().await;
2448 let cache = s.edge_cache.read().await.clone().unwrap();
2450 cache.insert(
2451 CacheKey::new("fp1", "p1"),
2452 CacheEntry {
2453 version: 1,
2454 response_bytes: b"row".to_vec(),
2455 tables: vec!["users".into()],
2456 expires_at: Instant::now() + Duration::from_secs(60),
2457 },
2458 );
2459 assert!(cache.get(&CacheKey::new("fp1", "p1")).is_some());
2460
2461 let body = r#"{"tables":["users"]}"#;
2462 let (status, value) = AdminServer::handle_edge_invalidate(Some(body), &s)
2463 .await
2464 .expect("handler ok");
2465 assert_eq!(status, 200);
2466 assert_eq!(value["dropped_local"].as_u64().unwrap(), 1);
2467 assert!(cache.get(&CacheKey::new("fp1", "p1")).is_none());
2468 }
2469
2470 #[cfg(feature = "edge-proxy")]
2471 #[tokio::test]
2472 async fn test_edge_invalidate_503_when_cache_unattached() {
2473 let s = Arc::new(AdminState::new());
2474 let body = r#"{"tables":["users"]}"#;
2475 let (status, _) = AdminServer::handle_edge_invalidate(Some(body), &s)
2476 .await
2477 .expect("handler ok");
2478 assert_eq!(status, 503);
2479 }
2480}