Skip to main content

heliosdb_proxy/
admin.rs

1//! Admin API
2//!
3//! REST API for proxy management, monitoring, and configuration.
4//! Includes HTTP SQL API for transparent write routing (TWR) and load balancing.
5
6#[cfg(feature = "anomaly-detection")]
7use crate::anomaly::AnomalyDetector;
8use crate::config::{NodeConfig, NodeRole, ProxyConfig};
9#[cfg(feature = "edge-proxy")]
10use crate::edge::{EdgeCache, EdgeRegistry, InvalidationEvent};
11#[cfg(feature = "wasm-plugins")]
12use crate::plugins::PluginManager;
13#[cfg(feature = "ha-tr")]
14use crate::replay::{ReplayEngine, TimeTravelRequest};
15use crate::server::{NodeHealth, ServerMetricsSnapshot};
16use crate::{ProxyError, Result};
17#[cfg(feature = "ha-tr")]
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::net::SocketAddr;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
25use tokio::net::TcpStream;
26use tokio::sync::{broadcast, RwLock};
27
/// Static admin UI (vanilla HTML + JS).
///
/// It is compiled into the binary via `include_str!` (the path is resolved
/// relative to this source file), so a deployment is a single binary with no
/// extra file serving or asset bundling. `handle_connection` serves it for
/// `GET /`, `GET /ui` and `GET /ui/`; every other route returns JSON.
const ADMIN_UI_HTML: &str = include_str!("admin_ui.html");
32
/// Constant-time string comparison, used for the admin bearer-token check.
///
/// Every byte pair is inspected even after a mismatch is found, so response
/// timing does not reveal how much of a guessed token was correct.
///
/// Note: unequal lengths still return `false` immediately. The token's
/// *length* is therefore not hidden; only its contents are.
fn constant_time_eq_str(a: &str, b: &str) -> bool {
    let (a, b) = (a.as_bytes(), b.as_bytes());
    if a.len() != b.len() {
        return false;
    }
    // OR every XOR-ed byte pair into one accumulator; no data-dependent
    // early exit.
    a.iter().zip(b).fold(0u8, |diff, (x, y)| diff | (x ^ y)) == 0
}
46
/// Admin API server.
///
/// Accepts HTTP connections on `listen_address` and serves the embedded admin
/// UI plus the JSON management endpoints, backed by the shared
/// [`AdminState`].
pub struct AdminServer {
    /// Listen address; passed to `bind_reuseport` by `run`.
    listen_address: String,
    /// Shared state with the proxy; an `Arc` clone goes to every connection task.
    state: Arc<AdminState>,
    /// Shutdown channel; `run` subscribes and leaves its accept loop when
    /// `recv` on that subscription completes.
    shutdown_tx: broadcast::Sender<()>,
}
55
/// Shared admin state.
///
/// This is the state shared between the proxy and the admin API. Fields sit
/// behind `tokio::sync::RwLock`s (or an atomic), so connection tasks can read
/// them concurrently. Optional subsystems are `Option`s; for example, the
/// migration and branch endpoints answer `503` while theirs is unset.
pub struct AdminState {
    /// Node health status, keyed by node address (the key `/nodes/<addr>` looks up).
    pub node_health: RwLock<HashMap<String, NodeHealth>>,
    /// Server metrics, served by `/metrics` and `/metrics/prometheus`.
    pub metrics: RwLock<ServerMetricsSnapshot>,
    /// Active sessions count, served by `/sessions`.
    pub active_sessions: RwLock<u64>,
    /// Configuration (read-only), served by `/config`.
    pub config_snapshot: RwLock<ConfigSnapshot>,
    /// Full proxy config (for SQL routing); `/api/sql` fails while this is `None`.
    pub proxy_config: RwLock<Option<ProxyConfig>>,
    /// Round-robin counter for read load balancing.
    read_lb_counter: AtomicUsize,
    /// Registered command handlers (presumably keyed by command name — confirm
    /// at the registration site).
    commands: RwLock<HashMap<String, CommandHandler>>,
    /// Time-travel replay engine. Optional so test fixtures don't have
    /// to wire a backend template; production startup attaches it via
    /// `with_replay_engine`. Endpoint returns 503 when missing.
    #[cfg(feature = "ha-tr")]
    pub replay_engine: RwLock<Option<Arc<ReplayEngine>>>,
    /// WASM plugin manager. None when the proxy started without
    /// plugins (or with a different feature set). `/plugins`
    /// endpoint returns 503 when missing; UI panel says "no plugin
    /// manager attached".
    #[cfg(feature = "wasm-plugins")]
    pub plugin_manager: RwLock<Option<Arc<PluginManager>>>,
    /// Chaos-mode overrides: per-node-address marker that the chaos
    /// system (POST /api/chaos) has forced this node to a particular
    /// state. Lets the UI distinguish "operationally disabled" from
    /// "chaos-injected fault" and lets `Reset` restore everything.
    /// `GET /api/chaos` returns this map as-is.
    pub chaos_overrides: RwLock<HashMap<String, ChaosOverride>>,
    /// Anomaly detector — same Arc the server populates from the
    /// query path. /api/anomalies polls this for the recent-events
    /// ring buffer.
    #[cfg(feature = "anomaly-detection")]
    pub anomaly_detector: RwLock<Option<Arc<AnomalyDetector>>>,
    /// Edge proxy cache + registry. Cache surfaces stats; registry
    /// is the home-side fanout for invalidations.
    #[cfg(feature = "edge-proxy")]
    pub edge_cache: RwLock<Option<Arc<EdgeCache>>>,
    #[cfg(feature = "edge-proxy")]
    pub edge_registry: RwLock<Option<Arc<EdgeRegistry>>>,
    /// Bearer token required on admin requests (except liveness probes).
    /// `None` = open. Set once at startup from `config.admin_token`.
    pub auth_token: RwLock<Option<String>>,
    /// Traffic-mirror / migration info for `/api/migration/status` and the
    /// cutover / snapshot endpoints. `Some` when `[mirror] enabled`.
    pub migration: RwLock<Option<MigrationInfo>>,
    /// Branch-database config for `/api/branch`. `Some` when `[branch]
    /// enabled`.
    pub branch: RwLock<Option<crate::config::BranchConfig>>,
}
109
/// What the admin API needs to report migration status and drive cutover,
/// without owning the mirror worker.
#[derive(Clone)]
pub struct MigrationInfo {
    /// Mirror target, passed to `mirror::status` for the status report.
    pub target: String,
    /// Passed through to `mirror::status` (presumably "mirror writes only" —
    /// confirm in `MirrorConfig`).
    pub writes_only: bool,
    /// Shared mirror counters; `mirror::status` derives the reported status
    /// (including `migration_ready`) from them.
    pub metrics: Arc<crate::mirror::MirrorMetrics>,
    /// Mirror config (source + target) for snapshot bootstrap.
    pub config: crate::config::MirrorConfig,
    /// The proxy's cutover switch. Cutover stores `Some(target)` here and
    /// rollback stores `None`.
    pub cutover: Arc<arc_swap::ArcSwap<Option<Arc<crate::mirror::CutoverTarget>>>>,
    /// The target to promote to; cloned into `cutover` on cutover.
    pub cutover_target: crate::mirror::CutoverTarget,
}
123
/// Chaos override applied to a single node. Today only the
/// `ForceUnhealthy` flavour is implemented — `inject_query_delay`
/// is the natural follow-up but wants per-query interception that
/// lives in the server message loop, not here.
///
/// Stored per node address in `AdminState::chaos_overrides` and returned
/// verbatim by `GET /api/chaos`.
#[derive(Debug, Clone, Serialize)]
pub struct ChaosOverride {
    /// Wall-clock time when the override was applied (RFC 3339).
    pub since: String,
    /// Override kind: `"force_unhealthy"`, or `"delay_ms"` (reserved; per the
    /// note above, query delay is not implemented yet).
    pub kind: String,
    /// Free-form description shown in the admin UI.
    pub note: String,
}
137
138
/// Command handler type: a callback that receives a slice of string
/// arguments and returns text output or an error. It is `Send + Sync`
/// behind an `Arc`, so a single handler can be shared across connection tasks.
type CommandHandler = Arc<dyn Fn(&[&str]) -> Result<String> + Send + Sync>;
141
/// Configuration snapshot for the admin API.
///
/// This is a serializable copy of the proxy settings, served as JSON by
/// `GET /config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigSnapshot {
    /// Client-facing listen address.
    pub listen_address: String,
    /// Admin API listen address.
    pub admin_address: String,
    pub tr_enabled: bool,
    pub tr_mode: String,
    /// Connection-pool bounds.
    pub pool_min_connections: usize,
    pub pool_max_connections: usize,
    /// Per-node configuration.
    pub nodes: Vec<NodeSnapshot>,
}
153
/// Node configuration snapshot (one entry of `ConfigSnapshot::nodes`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeSnapshot {
    /// Backend address.
    pub address: String,
    /// Node role rendered as a string.
    pub role: String,
    /// Presumably the load-balancing weight — confirm against `NodeConfig`.
    pub weight: u32,
    /// Whether the node is enabled.
    pub enabled: bool,
}
162
163impl AdminServer {
164    /// Create a new admin server
165    pub fn new(listen_address: String, state: Arc<AdminState>) -> Self {
166        let (shutdown_tx, _) = broadcast::channel(1);
167
168        Self {
169            listen_address,
170            state,
171            shutdown_tx,
172        }
173    }
174
175    /// Run the admin server
176    pub async fn run(&self) -> Result<()> {
177        // SO_REUSEPORT like the client listener, so a binary handoff can re-bind
178        // the admin address concurrently while the old process drains (Batch H).
179        let listener = crate::server::bind_reuseport(&self.listen_address)?;
180
181        tracing::info!("Admin API listening on {} (SO_REUSEPORT)", self.listen_address);
182
183        let mut shutdown_rx = self.shutdown_tx.subscribe();
184
185        loop {
186            tokio::select! {
187                accept_result = listener.accept() => {
188                    match accept_result {
189                        Ok((stream, addr)) => {
190                            let state = self.state.clone();
191                            tokio::spawn(async move {
192                                if let Err(e) = Self::handle_connection(stream, addr, state).await {
193                                    tracing::error!("Admin connection error: {}", e);
194                                }
195                            });
196                        }
197                        Err(e) => {
198                            tracing::error!("Admin accept error: {}", e);
199                        }
200                    }
201                }
202                _ = shutdown_rx.recv() => {
203                    tracing::info!("Admin server shutting down");
204                    break;
205                }
206            }
207        }
208
209        Ok(())
210    }
211
212    /// Handle an admin connection
213    async fn handle_connection(
214        mut stream: TcpStream,
215        addr: SocketAddr,
216        state: Arc<AdminState>,
217    ) -> Result<()> {
218        tracing::debug!("Admin connection from {}", addr);
219
220        let (reader, mut writer) = stream.split();
221        let mut reader = BufReader::new(reader);
222        let mut line = String::new();
223
224        // Read HTTP request headers
225        let mut headers = Vec::new();
226        let mut content_length: usize = 0;
227
228        loop {
229            line.clear();
230            let bytes_read = reader
231                .read_line(&mut line)
232                .await
233                .map_err(|e| ProxyError::Network(format!("Read error: {}", e)))?;
234
235            if bytes_read == 0 || line == "\r\n" {
236                break;
237            }
238
239            // Parse Content-Length header
240            let trimmed = line.trim();
241            if trimmed.to_lowercase().starts_with("content-length:") {
242                if let Some(len_str) = trimmed.split(':').nth(1) {
243                    content_length = len_str.trim().parse().unwrap_or(0);
244                }
245            }
246            headers.push(trimmed.to_string());
247        }
248
249        if headers.is_empty() {
250            return Ok(());
251        }
252
253        // Parse request line
254        let request_line = &headers[0];
255        let parts: Vec<&str> = request_line.split_whitespace().collect();
256
257        if parts.len() < 2 {
258            Self::send_response(&mut writer, 400, "Bad Request", "Invalid request line").await?;
259            return Ok(());
260        }
261
262        let method = parts[0];
263        let path = parts[1];
264
265        // Bearer-token gate. Liveness probes stay open so orchestrators can
266        // health-check without the token; everything else is rejected with
267        // 401 unless `Authorization: Bearer <token>` matches.
268        {
269            let required = state.auth_token.read().await.clone();
270            if let Some(token) = required {
271                let path_only = path.split('?').next().unwrap_or(path);
272                let is_liveness = method == "GET"
273                    && matches!(path_only, "/health" | "/healthz" | "/livez" | "/readyz");
274                if !is_liveness && !Self::admin_authorized(&headers, &token) {
275                    Self::send_response(
276                        &mut writer,
277                        401,
278                        "Unauthorized",
279                        "{\"error\":\"missing or invalid admin bearer token\"}",
280                    )
281                    .await?;
282                    return Ok(());
283                }
284            }
285        }
286
287        // Read request body for POST/PUT requests
288        let body = if content_length > 0 && (method == "POST" || method == "PUT") {
289            let mut body_buf = vec![0u8; content_length];
290            reader.read_exact(&mut body_buf).await
291                .map_err(|e| ProxyError::Network(format!("Body read error: {}", e)))?;
292            Some(String::from_utf8_lossy(&body_buf).to_string())
293        } else {
294            None
295        };
296
297        // Static admin UI — single HTML file compiled into the binary.
298        // Served at `/` and `/ui`; all other routes remain JSON.
299        if method == "GET" && (path == "/" || path == "/ui" || path == "/ui/") {
300            Self::send_html_response(&mut writer, 200, ADMIN_UI_HTML).await?;
301            return Ok(());
302        }
303
304        // Route request
305        let response = Self::route_request(method, path, body.as_deref(), &state).await;
306
307        match response {
308            Ok((status, body)) => {
309                Self::send_json_response(&mut writer, status, &body).await?;
310            }
311            Err(e) => {
312                let error = ErrorResponse {
313                    error: e.to_string(),
314                };
315                Self::send_json_response(&mut writer, 500, &error).await?;
316            }
317        }
318
319        Ok(())
320    }
321
322    /// True if the request carries `Authorization: Bearer <token>` matching
323    /// the configured admin token (constant-time compare).
324    fn admin_authorized(headers: &[String], token: &str) -> bool {
325        let expected = format!("Bearer {}", token);
326        for h in headers {
327            let mut sp = h.splitn(2, ':');
328            let name = sp.next().unwrap_or("").trim();
329            if name.eq_ignore_ascii_case("authorization") {
330                let value = sp.next().unwrap_or("").trim();
331                return constant_time_eq_str(value, &expected);
332            }
333        }
334        false
335    }
336
337    /// Serve a text/html HTTP response. Used by the admin UI route.
338    async fn send_html_response(
339        writer: &mut tokio::net::tcp::WriteHalf<'_>,
340        status: u16,
341        html: &str,
342    ) -> Result<()> {
343        let status_text = match status {
344            200 => "OK",
345            404 => "Not Found",
346            _ => "Unknown",
347        };
348        let response = format!(
349            "HTTP/1.1 {} {}\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
350            status,
351            status_text,
352            html.len(),
353            html
354        );
355        writer
356            .write_all(response.as_bytes())
357            .await
358            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
359        Ok(())
360    }
361
362    /// Route a request to the appropriate handler
363    async fn route_request(
364        method: &str,
365        path: &str,
366        body: Option<&str>,
367        state: &Arc<AdminState>,
368    ) -> Result<(u16, serde_json::Value)> {
369        match (method, path) {
370            // SQL API - Execute SQL with TWR (Transparent Write Routing)
371            ("POST", "/api/sql") => {
372                Self::handle_sql_request(body, state).await
373            }
374
375            // Health endpoints
376            ("GET", "/health") => {
377                let health = HealthResponse { status: "ok" };
378                Ok((200, serde_json::to_value(health)?))
379            }
380            ("GET", "/health/ready") => {
381                let ready = Self::check_readiness(state).await;
382                let response = ReadinessResponse {
383                    ready,
384                    message: if ready {
385                        "Proxy is ready"
386                    } else {
387                        "Proxy is not ready"
388                    },
389                };
390                let status = if ready { 200 } else { 503 };
391                Ok((status, serde_json::to_value(response)?))
392            }
393            ("GET", "/health/live") => {
394                let response = LivenessResponse { alive: true };
395                Ok((200, serde_json::to_value(response)?))
396            }
397
398            // Metrics
399            ("GET", "/metrics") => {
400                let metrics = state.metrics.read().await.clone();
401                Ok((200, serde_json::to_value(MetricsResponse::from(metrics))?))
402            }
403            ("GET", "/metrics/prometheus") => {
404                let metrics = state.metrics.read().await.clone();
405                let prometheus = Self::format_prometheus_metrics(&metrics);
406                Ok((200, serde_json::json!({ "text": prometheus })))
407            }
408
409            // Node management
410            ("GET", "/nodes") => {
411                let health = state.node_health.read().await;
412                let nodes: Vec<NodeHealthResponse> = health
413                    .values()
414                    .map(|h| NodeHealthResponse::from(h.clone()))
415                    .collect();
416                Ok((200, serde_json::to_value(nodes)?))
417            }
418            ("GET", path) if path.starts_with("/nodes/") => {
419                let node_addr = path.trim_start_matches("/nodes/");
420                let health = state.node_health.read().await;
421                match health.get(node_addr) {
422                    Some(h) => Ok((200, serde_json::to_value(NodeHealthResponse::from(h.clone()))?)),
423                    None => Ok((404, serde_json::json!({ "error": "Node not found" }))),
424                }
425            }
426            ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/enable") => {
427                let node_addr = path
428                    .trim_start_matches("/nodes/")
429                    .trim_end_matches("/enable");
430                Self::set_node_enabled(state, node_addr, true).await?;
431                Ok((200, serde_json::json!({ "status": "enabled" })))
432            }
433            ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/disable") => {
434                let node_addr = path
435                    .trim_start_matches("/nodes/")
436                    .trim_end_matches("/disable");
437                Self::set_node_enabled(state, node_addr, false).await?;
438                Ok((200, serde_json::json!({ "status": "disabled" })))
439            }
440
441            // Topology — joins config (role) with node_health (healthy)
442            // so external controllers (operator, ops dashboards) can
443            // populate `currentPrimary` / `healthyNodes` /
444            // `unhealthyNodes` in one round-trip. Designed for
445            // poll-friendly use; never blocks.
446            ("GET", "/topology") => {
447                let topo = Self::compute_topology(state).await;
448                Ok((200, serde_json::to_value(topo)?))
449            }
450
451            // Time-travel replay — replays a journal window against a
452            // target backend (typically a staging DB). Body shape is
453            // `ReplayRequestBody` below.
454            #[cfg(feature = "ha-tr")]
455            ("POST", "/api/replay") => Self::handle_replay_request(body, state).await,
456            #[cfg(not(feature = "ha-tr"))]
457            ("POST", "/api/replay") => Ok((
458                503,
459                serde_json::json!({ "error": "ha-tr feature not compiled in" }),
460            )),
461
462            // Shadow execution (T3.4) — runs a query against a source
463            // backend AND a shadow backend, diffs the result. Used for
464            // major-version upgrade validation, schema-migration
465            // canaries, replica-drift detection. Body is
466            // `ShadowRequestBody`.
467            #[cfg(feature = "ha-tr")]
468            ("POST", "/api/shadow") => Self::handle_shadow_request(body).await,
469            #[cfg(not(feature = "ha-tr"))]
470            ("POST", "/api/shadow") => Ok((
471                503,
472                serde_json::json!({ "error": "ha-tr feature not compiled in" }),
473            )),
474
475            // Loaded WASM plugins — name, version, hooks, state,
476            // invocation count. Returns 503 when no plugin manager
477            // is attached (proxy started without --features
478            // wasm-plugins or with plugins disabled in config).
479            ("GET", "/plugins") => Self::handle_plugins_list(state).await,
480
481            // Anomaly detector recent-events feed (T3.1). Optional
482            // ?limit query string clamps the response size; default
483            // is 100 events newest-first.
484            #[cfg(feature = "anomaly-detection")]
485            ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => {
486                Self::handle_anomalies_list(p, state).await
487            }
488            #[cfg(not(feature = "anomaly-detection"))]
489            ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => Ok((
490                503,
491                serde_json::json!({ "error": "anomaly-detection feature not compiled in" }),
492            )),
493
494            // Edge mode (T3.2). Stats panel for the home; the home's
495            // registered edges + cache stats; and a manual
496            // invalidation endpoint for ops drills.
497            #[cfg(feature = "edge-proxy")]
498            ("GET", "/api/edge") => Self::handle_edge_status(state).await,
499            #[cfg(feature = "edge-proxy")]
500            ("POST", "/api/edge/register") => {
501                Self::handle_edge_register(body, state).await
502            }
503            #[cfg(feature = "edge-proxy")]
504            ("POST", "/api/edge/invalidate") => {
505                Self::handle_edge_invalidate(body, state).await
506            }
507            #[cfg(not(feature = "edge-proxy"))]
508            ("GET", "/api/edge")
509            | ("POST", "/api/edge/register")
510            | ("POST", "/api/edge/invalidate") => Ok((
511                503,
512                serde_json::json!({ "error": "edge-proxy feature not compiled in" }),
513            )),
514
515            // Chaos engineering — controlled fault injection for HA
516            // testing. Body is `ChaosRequestBody`; supported actions
517            // are `force_unhealthy` / `restore` / `reset`.
518            ("POST", "/api/chaos") => Self::handle_chaos_request(body, state).await,
519            // Read current overrides so the UI can show "what's
520            // currently broken on purpose".
521            ("GET", "/api/chaos") => {
522                let overrides = state.chaos_overrides.read().await.clone();
523                Ok((200, serde_json::to_value(overrides)?))
524            }
525
526            // Migration / traffic-mirror status
527            ("GET", "/api/migration/status") | ("GET", "/migration/status") => {
528                match state.migration.read().await.as_ref() {
529                    Some(info) => {
530                        let st = crate::mirror::status(&info.target, info.writes_only, &info.metrics);
531                        let mut v = serde_json::to_value(st)?;
532                        let cut = info.cutover.load_full().is_some();
533                        v["cutover_active"] = serde_json::json!(cut);
534                        Ok((200, v))
535                    }
536                    None => Ok((503, serde_json::json!({ "error": "traffic mirroring not enabled" }))),
537                }
538            }
539
540            // Promote the mirror target to primary: new connections route there.
541            ("POST", "/api/migration/cutover") | ("POST", "/migration/cutover") => {
542                let info = state.migration.read().await.clone();
543                let Some(info) = info else {
544                    return Ok((503, serde_json::json!({ "error": "traffic mirroring not enabled" })));
545                };
546                let force = path.contains("force=true")
547                    || body.map(|b| b.contains("\"force\":true")).unwrap_or(false);
548                let st = crate::mirror::status(&info.target, info.writes_only, &info.metrics);
549                if !st.migration_ready && !force {
550                    return Ok((409, serde_json::json!({
551                        "ok": false,
552                        "error": "not migration_ready (backlog/drops present); pass force=true to override",
553                        "status": st,
554                    })));
555                }
556                info.cutover.store(Arc::new(Some(Arc::new(info.cutover_target.clone()))));
557                tracing::warn!(target = %info.cutover_target.addr, "migration cutover: new connections now route to the promoted target");
558                Ok((200, serde_json::json!({ "ok": true, "promoted_to": info.cutover_target.addr })))
559            }
560
561            // Roll a cutover back to the original primary.
562            ("POST", "/api/migration/cutover/rollback") | ("POST", "/migration/cutover/rollback") => {
563                let info = state.migration.read().await.clone();
564                let Some(info) = info else {
565                    return Ok((503, serde_json::json!({ "error": "traffic mirroring not enabled" })));
566                };
567                info.cutover.store(Arc::new(None));
568                Ok((200, serde_json::json!({ "ok": true, "rolled_back": true })))
569            }
570
571            // Snapshot-bootstrap named tables from the source into the mirror.
572            ("POST", "/api/migration/snapshot") | ("POST", "/migration/snapshot") => {
573                let info = state.migration.read().await.clone();
574                let Some(info) = info else {
575                    return Ok((503, serde_json::json!({ "error": "traffic mirroring not enabled" })));
576                };
577                let body = body.unwrap_or("{}");
578                let req: serde_json::Value = serde_json::from_str(body)
579                    .map_err(|e| ProxyError::Internal(format!("invalid JSON: {}", e)))?;
580                let tables: Vec<String> = req
581                    .get("tables")
582                    .and_then(|t| t.as_array())
583                    .map(|a| a.iter().filter_map(|v| v.as_str().map(String::from)).collect())
584                    .unwrap_or_default();
585                if tables.is_empty() {
586                    return Ok((400, serde_json::json!({ "error": "provide a non-empty 'tables' array" })));
587                }
588                match crate::mirror::snapshot_tables(&info.config, &tables).await {
589                    Ok(rep) => {
590                        let total: u64 = rep.iter().map(|t| t.copied).sum();
591                        Ok((200, serde_json::json!({ "ok": true, "tables": rep, "rows_copied": total })))
592                    }
593                    Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
594                }
595            }
596
597            // Branch databases: list / create / drop.
598            ("GET", p) if p == "/api/branch" || p == "/branch" || p.starts_with("/api/branch?") => {
599                let cfg = state.branch.read().await.clone();
600                let Some(cfg) = cfg else {
601                    return Ok((503, serde_json::json!({ "error": "branch databases not enabled" })));
602                };
603                match crate::branch::list(&cfg).await {
604                    Ok(branches) => Ok((200, serde_json::json!({ "branches": branches }))),
605                    Err(e) => Ok((500, serde_json::json!({ "error": e }))),
606                }
607            }
608            ("POST", p) if p == "/api/branch" || p == "/branch" => {
609                let cfg = state.branch.read().await.clone();
610                let Some(cfg) = cfg else {
611                    return Ok((503, serde_json::json!({ "error": "branch databases not enabled" })));
612                };
613                let req: serde_json::Value =
614                    serde_json::from_str(body.unwrap_or("{}")).map_err(|e| ProxyError::Internal(format!("invalid JSON: {}", e)))?;
615                let name = req.get("name").and_then(|v| v.as_str()).unwrap_or("");
616                if name.is_empty() {
617                    return Ok((400, serde_json::json!({ "error": "provide 'name'" })));
618                }
619                let base = req.get("base").and_then(|v| v.as_str());
620                match crate::branch::create(&cfg, name, base).await {
621                    Ok(()) => Ok((200, serde_json::json!({ "ok": true, "branch": name,
622                        "base": base.unwrap_or(&cfg.base_database) }))),
623                    Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
624                }
625            }
626            ("DELETE", p) if p.starts_with("/api/branch") || p.starts_with("/branch") => {
627                let cfg = state.branch.read().await.clone();
628                let Some(cfg) = cfg else {
629                    return Ok((503, serde_json::json!({ "error": "branch databases not enabled" })));
630                };
631                let name = p.find("name=").map(|i| &p[i + 5..]).unwrap_or("");
632                if name.is_empty() {
633                    return Ok((400, serde_json::json!({ "error": "provide ?name=<branch>" })));
634                }
635                match crate::branch::drop(&cfg, name).await {
636                    Ok(()) => Ok((200, serde_json::json!({ "ok": true, "dropped": name }))),
637                    Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
638                }
639            }
640
641            // Configuration
642            ("GET", "/config") => {
643                let config = state.config_snapshot.read().await.clone();
644                Ok((200, serde_json::to_value(config)?))
645            }
646
647            // Sessions
648            ("GET", "/sessions") => {
649                let count = *state.active_sessions.read().await;
650                let response = SessionsResponse {
651                    active_sessions: count,
652                };
653                Ok((200, serde_json::to_value(response)?))
654            }
655
656            // Pools
657            ("GET", "/pools") => {
658                let pools = Self::get_pool_stats(state).await;
659                Ok((200, serde_json::to_value(pools)?))
660            }
661
662            // Version
663            ("GET", "/version") => {
664                let response = VersionResponse {
665                    version: crate::VERSION.to_string(),
666                    build_time: env!("CARGO_PKG_VERSION").to_string(),
667                };
668                Ok((200, serde_json::to_value(response)?))
669            }
670
671            // Not found
672            _ => Ok((404, serde_json::json!({ "error": "Not found" }))),
673        }
674    }
675
676    /// Handle SQL execution request with TWR (Transparent Write Routing)
677    async fn handle_sql_request(
678        body: Option<&str>,
679        state: &Arc<AdminState>,
680    ) -> Result<(u16, serde_json::Value)> {
681        // Parse request body
682        let body = body.ok_or_else(|| ProxyError::Internal("Missing request body".to_string()))?;
683        let request: SqlRequest = serde_json::from_str(body)
684            .map_err(|e| ProxyError::Internal(format!("Invalid JSON: {}", e)))?;
685
686        let sql = request.query.trim();
687        if sql.is_empty() {
688            return Ok((400, serde_json::json!({ "error": "Empty query" })));
689        }
690
691        // Classify query as read or write
692        let is_write = Self::is_write_query(sql);
693        let query_type = if is_write { "write" } else { "read" };
694
695        // Get proxy config
696        let proxy_config = state.proxy_config.read().await;
697        let config = proxy_config.as_ref()
698            .ok_or_else(|| ProxyError::Internal("Proxy config not initialized".to_string()))?;
699
700        // Get node health
701        let health = state.node_health.read().await;
702
703        // Select target node based on query type
704        let target_node = if is_write {
705            // Write queries always go to primary
706            Self::select_primary_node(config, &health)?
707        } else {
708            // Read queries can go to any healthy node with load balancing
709            Self::select_read_node(config, &health, state)?
710        };
711
712        let target_address = format!("{}:{}", target_node.host, target_node.port);
713        // Use HTTP port from node config (defaults to 8080)
714        let http_port = target_node.http_port;
715        let http_url = format!("http://{}:{}/api/sql", target_node.host, http_port);
716
717        tracing::debug!(
718            "Routing {} query to {} ({})",
719            query_type,
720            target_address,
721            match target_node.role {
722                NodeRole::Primary => "primary",
723                NodeRole::Standby => "standby",
724                NodeRole::ReadReplica => "replica",
725            }
726        );
727
728        // Forward request to backend node
729        let result = Self::forward_sql_request(&http_url, sql).await?;
730
731        // Return result with routing metadata
732        let response = SqlResponse {
733            query_type: query_type.to_string(),
734            routed_to: target_address,
735            node_role: format!("{:?}", target_node.role).to_lowercase(),
736            result,
737        };
738
739        Ok((200, serde_json::to_value(response)?))
740    }
741
742    /// Determine if a query is a write operation
743    fn is_write_query(sql: &str) -> bool {
744        let upper = sql.trim().to_uppercase();
745
746        // Write operations
747        if upper.starts_with("INSERT")
748            || upper.starts_with("UPDATE")
749            || upper.starts_with("DELETE")
750            || upper.starts_with("CREATE")
751            || upper.starts_with("ALTER")
752            || upper.starts_with("DROP")
753            || upper.starts_with("TRUNCATE")
754            || upper.starts_with("GRANT")
755            || upper.starts_with("REVOKE")
756            || upper.starts_with("VACUUM")
757            || upper.starts_with("REINDEX")
758            || upper.starts_with("MERGE")
759            || upper.starts_with("UPSERT")
760        {
761            return true;
762        }
763
764        // Transaction control that might contain writes
765        if upper.starts_with("BEGIN")
766            || upper.starts_with("COMMIT")
767            || upper.starts_with("ROLLBACK")
768            || upper.starts_with("SAVEPOINT")
769        {
770            // Transaction control goes to primary for safety
771            return true;
772        }
773
774        // Read operations
775        false
776    }
777
778    /// Select primary node for write queries
779    fn select_primary_node<'a>(
780        config: &'a ProxyConfig,
781        health: &HashMap<String, NodeHealth>,
782    ) -> Result<&'a NodeConfig> {
783        config.nodes.iter()
784            .find(|n| {
785                n.role == NodeRole::Primary
786                    && n.enabled
787                    && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false)
788            })
789            .ok_or_else(|| ProxyError::Internal("No healthy primary node available".to_string()))
790    }
791
792    /// Select node for read queries with load balancing
793    fn select_read_node<'a>(
794        config: &'a ProxyConfig,
795        health: &HashMap<String, NodeHealth>,
796        state: &AdminState,
797    ) -> Result<&'a NodeConfig> {
798        // Get all healthy nodes (primary, standby, or replica)
799        let healthy_nodes: Vec<&NodeConfig> = config.nodes.iter()
800            .filter(|n| n.enabled && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false))
801            .collect();
802
803        if healthy_nodes.is_empty() {
804            return Err(ProxyError::Internal("No healthy nodes available".to_string()));
805        }
806
807        // If read/write splitting is enabled and there are standbys, prefer them
808        if config.load_balancer.read_write_split {
809            let read_nodes: Vec<&NodeConfig> = healthy_nodes.iter()
810                .filter(|n| n.role == NodeRole::Standby || n.role == NodeRole::ReadReplica)
811                .copied()
812                .collect();
813
814            if !read_nodes.is_empty() {
815                // Round-robin across read nodes
816                let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
817                let index = counter % read_nodes.len();
818                return Ok(read_nodes[index]);
819            }
820        }
821
822        // Fall back to round-robin across all healthy nodes
823        let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
824        let index = counter % healthy_nodes.len();
825        Ok(healthy_nodes[index])
826    }
827
828    /// Forward SQL request to backend node's HTTP API
829    async fn forward_sql_request(url: &str, sql: &str) -> Result<serde_json::Value> {
830        // Build HTTP request
831        let request_body = serde_json::json!({ "query": sql });
832        let body_bytes = serde_json::to_vec(&request_body)
833            .map_err(|e| ProxyError::Internal(format!("JSON serialization error: {}", e)))?;
834
835        // Parse URL
836        let url_parts: Vec<&str> = url.trim_start_matches("http://").splitn(2, '/').collect();
837        if url_parts.is_empty() {
838            return Err(ProxyError::Internal("Invalid URL".to_string()));
839        }
840
841        let host_port = url_parts[0];
842        let path = if url_parts.len() > 1 {
843            format!("/{}", url_parts[1])
844        } else {
845            "/".to_string()
846        };
847
848        // Connect to backend
849        let stream = TcpStream::connect(host_port).await
850            .map_err(|e| ProxyError::Network(format!("Failed to connect to {}: {}", host_port, e)))?;
851
852        let (reader, mut writer) = stream.into_split();
853        let mut reader = BufReader::new(reader);
854
855        // Send HTTP request
856        let request = format!(
857            "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
858            path,
859            host_port,
860            body_bytes.len()
861        );
862
863        writer.write_all(request.as_bytes()).await
864            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
865        writer.write_all(&body_bytes).await
866            .map_err(|e| ProxyError::Network(format!("Write body error: {}", e)))?;
867
868        // Read response headers
869        let mut response_headers = Vec::new();
870        let mut line = String::new();
871        let mut content_length: usize = 0;
872
873        loop {
874            line.clear();
875            let bytes_read = reader.read_line(&mut line).await
876                .map_err(|e| ProxyError::Network(format!("Response read error: {}", e)))?;
877
878            if bytes_read == 0 || line == "\r\n" {
879                break;
880            }
881
882            let trimmed = line.trim();
883            if trimmed.to_lowercase().starts_with("content-length:") {
884                if let Some(len_str) = trimmed.split(':').nth(1) {
885                    content_length = len_str.trim().parse().unwrap_or(0);
886                }
887            }
888            response_headers.push(trimmed.to_string());
889        }
890
891        // Read response body
892        let mut body_buf = vec![0u8; content_length];
893        if content_length > 0 {
894            reader.read_exact(&mut body_buf).await
895                .map_err(|e| ProxyError::Network(format!("Response body read error: {}", e)))?;
896        }
897
898        let response_body = String::from_utf8_lossy(&body_buf);
899
900        // Parse JSON response
901        serde_json::from_str(&response_body)
902            .map_err(|e| ProxyError::Internal(format!("Invalid JSON response: {} - body: {}", e, response_body)))
903    }
904
905    /// Check if proxy is ready to accept connections
906    async fn check_readiness(state: &Arc<AdminState>) -> bool {
907        let health = state.node_health.read().await;
908
909        // Need at least one healthy primary
910        health.values().any(|h| h.healthy)
911    }
912
913    /// Set node enabled status
914    async fn set_node_enabled(state: &Arc<AdminState>, node_addr: &str, enabled: bool) -> Result<()> {
915        let mut health = state.node_health.write().await;
916
917        if let Some(node_health) = health.get_mut(node_addr) {
918            node_health.healthy = enabled;
919            Ok(())
920        } else {
921            Err(ProxyError::Config(format!("Node not found: {}", node_addr)))
922        }
923    }
924
925    /// Get pool statistics
926    async fn get_pool_stats(_state: &Arc<AdminState>) -> Vec<PoolStatsResponse> {
927        // Placeholder - in real implementation would query pool state
928        Vec::new()
929    }
930
931    /// Handle `POST /api/replay`. Body is a JSON `ReplayRequestBody`.
932    /// Returns 503 when no replay engine is attached, 400 on a malformed
933    /// body or inverted window, 200 with `ReplaySummary` on success.
934    #[cfg(feature = "ha-tr")]
935    async fn handle_replay_request(
936        body: Option<&str>,
937        state: &Arc<AdminState>,
938    ) -> Result<(u16, serde_json::Value)> {
939        let raw = body.ok_or_else(|| {
940            ProxyError::Internal("replay: empty request body".to_string())
941        })?;
942        let req: ReplayRequestBody = match serde_json::from_str(raw) {
943            Ok(r) => r,
944            Err(e) => {
945                return Ok((
946                    400,
947                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
948                ));
949            }
950        };
951        let engine = match state.replay_engine.read().await.clone() {
952            Some(e) => e,
953            None => {
954                return Ok((
955                    503,
956                    serde_json::json!({ "error": "replay engine not attached" }),
957                ));
958            }
959        };
960        let tt = TimeTravelRequest {
961            from: req.from,
962            to: req.to,
963            target_host: req.target_host,
964            target_port: req.target_port,
965            target_user: req.target_user,
966            target_password: req.target_password,
967            target_database: req.target_database,
968        };
969        match engine.replay_window(&tt).await {
970            Ok(summary) => Ok((200, serde_json::to_value(summary)?)),
971            Err(e) => Ok((
972                500,
973                serde_json::json!({ "error": format!("replay failed: {}", e) }),
974            )),
975        }
976    }
977
978    /// `GET /api/edge` — surfaces edge-mode state: cache stats +
979    /// the list of registered edges (when running in home mode).
980    #[cfg(feature = "edge-proxy")]
981    async fn handle_edge_status(
982        state: &Arc<AdminState>,
983    ) -> Result<(u16, serde_json::Value)> {
984        let cache_stats = match state.edge_cache.read().await.clone() {
985            Some(c) => Some(c.stats()),
986            None => None,
987        };
988        let edges = match state.edge_registry.read().await.clone() {
989            Some(r) => r.list(),
990            None => Vec::new(),
991        };
992        Ok((200, serde_json::json!({
993            "cache":          cache_stats,
994            "registered":     edges,
995            "edge_count":     edges.len(),
996        })))
997    }
998
999    /// `POST /api/edge/register` — edges call this once at boot to
1000    /// announce themselves to the home. Body shape:
1001    /// `{"edge_id":"e1","region":"us-east","base_url":"https://e1"}`.
1002    /// Returns 201 with the assigned slot, 503 when registry full.
1003    #[cfg(feature = "edge-proxy")]
1004    async fn handle_edge_register(
1005        body: Option<&str>,
1006        state: &Arc<AdminState>,
1007    ) -> Result<(u16, serde_json::Value)> {
1008        let raw = body.ok_or_else(|| {
1009            ProxyError::Internal("edge register: empty body".to_string())
1010        })?;
1011        let req: EdgeRegisterBody = match serde_json::from_str(raw) {
1012            Ok(r) => r,
1013            Err(e) => {
1014                return Ok((
1015                    400,
1016                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
1017                ));
1018            }
1019        };
1020        let registry = match state.edge_registry.read().await.clone() {
1021            Some(r) => r,
1022            None => {
1023                return Ok((
1024                    503,
1025                    serde_json::json!({ "error": "edge registry not attached" }),
1026                ));
1027            }
1028        };
1029        let now = chrono::Utc::now().to_rfc3339();
1030        match registry.register(&req.edge_id, &req.region, &req.base_url, &now) {
1031            Ok(_rx) => {
1032                // Receiver dropped here — in production the SSE
1033                // handler keeps it alive for the connection's
1034                // lifetime. For the JSON endpoint, we acknowledge
1035                // the registration and the edge polls /api/edge for
1036                // invalidations until SSE is wired.
1037                Ok((201, serde_json::json!({
1038                    "edge_id":  req.edge_id,
1039                    "region":   req.region,
1040                    "base_url": req.base_url,
1041                    "registered_at": now,
1042                })))
1043            }
1044            Err(e) => Ok((
1045                503,
1046                serde_json::json!({ "error": e.to_string() }),
1047            )),
1048        }
1049    }
1050
1051    /// `POST /api/edge/invalidate` — manual invalidation for ops
1052    /// drills. The proxy normally fans out invalidations
1053    /// automatically on writes; this endpoint is for "I just ran
1054    /// a migration outside the proxy, please drop caches".
1055    /// Body: `{"tables":["users"],"up_to_version":null}` — null
1056    /// version means "use the cache's current version" (drop all).
1057    #[cfg(feature = "edge-proxy")]
1058    async fn handle_edge_invalidate(
1059        body: Option<&str>,
1060        state: &Arc<AdminState>,
1061    ) -> Result<(u16, serde_json::Value)> {
1062        let raw = body.ok_or_else(|| {
1063            ProxyError::Internal("edge invalidate: empty body".to_string())
1064        })?;
1065        let req: EdgeInvalidateBody = match serde_json::from_str(raw) {
1066            Ok(r) => r,
1067            Err(e) => {
1068                return Ok((
1069                    400,
1070                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
1071                ));
1072            }
1073        };
1074        let cache = match state.edge_cache.read().await.clone() {
1075            Some(c) => c,
1076            None => {
1077                return Ok((
1078                    503,
1079                    serde_json::json!({ "error": "edge cache not attached" }),
1080                ));
1081            }
1082        };
1083        let registry = match state.edge_registry.read().await.clone() {
1084            Some(r) => r,
1085            None => {
1086                return Ok((
1087                    503,
1088                    serde_json::json!({ "error": "edge registry not attached" }),
1089                ));
1090            }
1091        };
1092        let version = req.up_to_version.unwrap_or_else(|| cache.next_version());
1093        // Local cache invalidation (home-side cache, if any).
1094        let dropped_local = cache.invalidate(version, &req.tables);
1095        // Fan out to every registered edge.
1096        let ev = InvalidationEvent {
1097            up_to_version: version,
1098            tables: req.tables.clone(),
1099            committed_at: chrono::Utc::now().to_rfc3339(),
1100        };
1101        let (sent, pruned) = registry.broadcast(ev).await;
1102        Ok((200, serde_json::json!({
1103            "version":         version,
1104            "tables":          req.tables,
1105            "dropped_local":   dropped_local,
1106            "edges_notified":  sent,
1107            "edges_pruned":    pruned,
1108        })))
1109    }
1110
1111    /// Handle `GET /anomalies`. Returns the anomaly detector's
1112    /// recent-events ring buffer as JSON. Optional `?limit=N`
1113    /// query string clamps the response size (default 100, max 1024).
1114    /// Returns 503 when the detector hasn't been attached.
1115    #[cfg(feature = "anomaly-detection")]
1116    async fn handle_anomalies_list(
1117        path: &str,
1118        state: &Arc<AdminState>,
1119    ) -> Result<(u16, serde_json::Value)> {
1120        let limit = parse_limit_query(path, 100, 1024);
1121        let det = match state.anomaly_detector.read().await.clone() {
1122            Some(d) => d,
1123            None => {
1124                return Ok((
1125                    503,
1126                    serde_json::json!({ "error": "anomaly detector not attached" }),
1127                ));
1128            }
1129        };
1130        let events = det.recent_events(limit);
1131        Ok((200, serde_json::json!({
1132            "count":     events.len(),
1133            "limit":     limit,
1134            "events":    events,
1135            "buffer_total": det.event_count(),
1136        })))
1137    }
1138
1139    /// Handle `POST /api/shadow`. Body is a JSON `ShadowRequestBody`.
1140    /// Connects to both source and shadow backends, runs the SQL on
1141    /// each, returns a `ShadowExecuteReport` with the diff.
1142    ///
1143    /// Status codes:
1144    ///   200 — both sides ran (report carries pass/fail details)
1145    ///   400 — malformed body
1146    ///   500 — source connect failure (shadow connect failures end up
1147    ///         in the report rather than the HTTP status)
1148    #[cfg(feature = "ha-tr")]
1149    async fn handle_shadow_request(
1150        body: Option<&str>,
1151    ) -> Result<(u16, serde_json::Value)> {
1152        use crate::backend::{tls::default_client_config, BackendClient, BackendConfig, ParamValue, TlsMode};
1153        use crate::shadow_execute::shadow_execute;
1154
1155        let raw = body.ok_or_else(|| {
1156            ProxyError::Internal("shadow: empty request body".to_string())
1157        })?;
1158        let req: ShadowRequestBody = match serde_json::from_str(raw) {
1159            Ok(r) => r,
1160            Err(e) => {
1161                return Ok((
1162                    400,
1163                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
1164                ));
1165            }
1166        };
1167
1168        // Build the two configs from the request. TLS off + 5s
1169        // connect / 30s query timeouts mirror the replay defaults.
1170        let mk_cfg = |host: String, port: u16, user: Option<String>, password: Option<String>, database: Option<String>| BackendConfig {
1171            host,
1172            port,
1173            user: user.unwrap_or_else(|| "postgres".into()),
1174            password,
1175            database,
1176            application_name: Some("heliosdb-proxy-shadow".into()),
1177            tls_mode: TlsMode::Disable,
1178            connect_timeout: std::time::Duration::from_secs(5),
1179            query_timeout: std::time::Duration::from_secs(30),
1180            tls_config: default_client_config(),
1181        };
1182        let source_cfg = mk_cfg(
1183            req.source_host,
1184            req.source_port,
1185            req.source_user,
1186            req.source_password,
1187            req.source_database,
1188        );
1189        let shadow_cfg = mk_cfg(
1190            req.shadow_host,
1191            req.shadow_port,
1192            req.shadow_user,
1193            req.shadow_password,
1194            req.shadow_database,
1195        );
1196
1197        // Connect to source. Connect failure here is a real HTTP
1198        // error since we can't even attempt the diff; shadow connect
1199        // failures land inside the report as `shadow_error`.
1200        let mut source = match BackendClient::connect(&source_cfg).await {
1201            Ok(c) => c,
1202            Err(e) => {
1203                return Ok((
1204                    500,
1205                    serde_json::json!({ "error": format!("source connect: {}", e) }),
1206                ));
1207            }
1208        };
1209
1210        let params: Vec<ParamValue> = req
1211            .params
1212            .unwrap_or_default()
1213            .into_iter()
1214            .map(|s| ParamValue::Text(s))
1215            .collect();
1216
1217        let outcome = shadow_execute(&mut source, &shadow_cfg, &req.sql, &params).await;
1218        source.close().await;
1219
1220        match outcome {
1221            Ok((_qr, report)) => Ok((200, serde_json::json!({
1222                "sql":                report.sql,
1223                "both_succeeded":     report.both_succeeded,
1224                "row_count_match":    report.row_count_match,
1225                "row_hash_match":     report.row_hash_match,
1226                "primary_elapsed_us": report.primary_elapsed_us,
1227                "shadow_elapsed_us":  report.shadow_elapsed_us,
1228                "primary_error":      report.primary_error,
1229                "shadow_error":       report.shadow_error,
1230                "is_clean":           report.is_clean(),
1231            }))),
1232            Err(e) => Ok((
1233                500,
1234                serde_json::json!({ "error": format!("shadow execute: {}", e) }),
1235            )),
1236        }
1237    }
1238
1239    /// Handle `POST /api/chaos`. Body is a JSON `ChaosRequestBody`.
1240    ///
1241    /// Supported actions (intentionally narrow — the goal is "test
1242    /// the failover machinery without external chaos tooling", not
1243    /// "ship a kitchen-sink fault injector"):
1244    ///
1245    ///   force_unhealthy { target_node }  — flip the node's health flag
1246    ///                                      to false; the failover
1247    ///                                      controller observes this and
1248    ///                                      reroutes traffic.
1249    ///   restore         { target_node }  — flip the node's health flag
1250    ///                                      back to true and clear the
1251    ///                                      override entry.
1252    ///   reset                            — restore every overridden
1253    ///                                      node in one call.
1254    async fn handle_chaos_request(
1255        body: Option<&str>,
1256        state: &Arc<AdminState>,
1257    ) -> Result<(u16, serde_json::Value)> {
1258        let raw = body.ok_or_else(|| {
1259            ProxyError::Internal("chaos: empty request body".to_string())
1260        })?;
1261        let action: ChaosAction = match serde_json::from_str(raw) {
1262            Ok(a) => a,
1263            Err(e) => {
1264                return Ok((
1265                    400,
1266                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
1267                ));
1268            }
1269        };
1270        match action {
1271            ChaosAction::ForceUnhealthy { target_node } => {
1272                if let Err(e) = Self::set_node_enabled(state, &target_node, false).await {
1273                    return Ok((
1274                        404,
1275                        serde_json::json!({ "error": e.to_string() }),
1276                    ));
1277                }
1278                state.chaos_overrides.write().await.insert(
1279                    target_node.clone(),
1280                    ChaosOverride {
1281                        since: chrono::Utc::now().to_rfc3339(),
1282                        kind: "force_unhealthy".to_string(),
1283                        note: format!("forced unhealthy via chaos endpoint"),
1284                    },
1285                );
1286                Ok((200, serde_json::json!({
1287                    "applied":     "force_unhealthy",
1288                    "target_node": target_node,
1289                })))
1290            }
1291            ChaosAction::Restore { target_node } => {
1292                if let Err(e) = Self::set_node_enabled(state, &target_node, true).await {
1293                    return Ok((
1294                        404,
1295                        serde_json::json!({ "error": e.to_string() }),
1296                    ));
1297                }
1298                state.chaos_overrides.write().await.remove(&target_node);
1299                Ok((200, serde_json::json!({
1300                    "restored":    target_node,
1301                })))
1302            }
1303            ChaosAction::Reset => {
1304                let overrides: Vec<String> =
1305                    state.chaos_overrides.read().await.keys().cloned().collect();
1306                let mut restored = Vec::with_capacity(overrides.len());
1307                for addr in overrides {
1308                    let _ = Self::set_node_enabled(state, &addr, true).await;
1309                    restored.push(addr);
1310                }
1311                state.chaos_overrides.write().await.clear();
1312                Ok((200, serde_json::json!({
1313                    "reset":      true,
1314                    "restored":   restored,
1315                })))
1316            }
1317        }
1318    }
1319
1320    /// Handle `GET /plugins`. Returns 503 when no plugin manager is
1321    /// attached, 200 with `Vec<PluginListEntry>` otherwise. Building
1322    /// the response in admin.rs (rather than serialising
1323    /// `plugins::PluginInfo` directly) keeps the plugins module
1324    /// independent of serde — only the wire shape lives here.
1325    #[cfg(feature = "wasm-plugins")]
1326    async fn handle_plugins_list(state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1327        let pm = match state.plugin_manager.read().await.clone() {
1328            Some(p) => p,
1329            None => {
1330                return Ok((
1331                    503,
1332                    serde_json::json!({ "error": "plugin manager not attached" }),
1333                ));
1334            }
1335        };
1336        let plugins: Vec<PluginListEntry> = pm
1337            .list_plugins()
1338            .into_iter()
1339            .map(|info| PluginListEntry {
1340                name: info.name,
1341                version: info.version,
1342                description: info.description,
1343                hooks: info
1344                    .hooks
1345                    .iter()
1346                    .map(|h| h.export_name().to_string())
1347                    .collect(),
1348                state: format!("{:?}", info.state),
1349                invocations: info.stats.total_calls,
1350                errors: info.stats.error_count,
1351            })
1352            .collect();
1353        Ok((200, serde_json::to_value(plugins)?))
1354    }
1355
1356    #[cfg(not(feature = "wasm-plugins"))]
1357    async fn handle_plugins_list(_state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1358        Ok((
1359            503,
1360            serde_json::json!({ "error": "wasm-plugins feature not compiled in" }),
1361        ))
1362    }
1363
1364    /// Compute the joined topology view used by `/topology`.
1365    ///
1366    /// `currentPrimary` is the address of the first node whose role
1367    /// is "primary" and whose health entry is `healthy = true`. None
1368    /// is the legitimate answer when failover is in progress.
1369    async fn compute_topology(state: &Arc<AdminState>) -> TopologyResponse {
1370        let health = state.node_health.read().await;
1371        let cfg = state.config_snapshot.read().await;
1372
1373        let mut current_primary: Option<String> = None;
1374        for n in &cfg.nodes {
1375            if n.role.eq_ignore_ascii_case("primary") {
1376                let healthy = health.get(&n.address).map(|h| h.healthy).unwrap_or(false);
1377                if healthy {
1378                    current_primary = Some(n.address.clone());
1379                    break;
1380                }
1381            }
1382        }
1383
1384        let healthy_nodes = health.values().filter(|h| h.healthy).count() as u32;
1385        let unhealthy_nodes = health.values().filter(|h| !h.healthy).count() as u32;
1386        let total_nodes = cfg.nodes.len() as u32;
1387
1388        TopologyResponse {
1389            current_primary,
1390            healthy_nodes,
1391            unhealthy_nodes,
1392            total_nodes,
1393            last_failover_at: None,
1394        }
1395    }
1396
1397    /// Format metrics as Prometheus text format
1398    fn format_prometheus_metrics(metrics: &ServerMetricsSnapshot) -> String {
1399        let mut output = String::new();
1400
1401        output.push_str("# HELP heliosdb_proxy_connections_total Total connections accepted\n");
1402        output.push_str("# TYPE heliosdb_proxy_connections_total counter\n");
1403        output.push_str(&format!(
1404            "heliosdb_proxy_connections_total {}\n",
1405            metrics.connections_accepted
1406        ));
1407
1408        output.push_str("# HELP heliosdb_proxy_connections_closed Total connections closed\n");
1409        output.push_str("# TYPE heliosdb_proxy_connections_closed counter\n");
1410        output.push_str(&format!(
1411            "heliosdb_proxy_connections_closed {}\n",
1412            metrics.connections_closed
1413        ));
1414
1415        output.push_str("# HELP heliosdb_proxy_queries_total Total queries processed\n");
1416        output.push_str("# TYPE heliosdb_proxy_queries_total counter\n");
1417        output.push_str(&format!(
1418            "heliosdb_proxy_queries_total {}\n",
1419            metrics.queries_processed
1420        ));
1421
1422        output.push_str("# HELP heliosdb_proxy_bytes_received_total Total bytes received\n");
1423        output.push_str("# TYPE heliosdb_proxy_bytes_received_total counter\n");
1424        output.push_str(&format!(
1425            "heliosdb_proxy_bytes_received_total {}\n",
1426            metrics.bytes_received
1427        ));
1428
1429        output.push_str("# HELP heliosdb_proxy_bytes_sent_total Total bytes sent\n");
1430        output.push_str("# TYPE heliosdb_proxy_bytes_sent_total counter\n");
1431        output.push_str(&format!(
1432            "heliosdb_proxy_bytes_sent_total {}\n",
1433            metrics.bytes_sent
1434        ));
1435
1436        output.push_str("# HELP heliosdb_proxy_failovers_total Total failovers\n");
1437        output.push_str("# TYPE heliosdb_proxy_failovers_total counter\n");
1438        output.push_str(&format!(
1439            "heliosdb_proxy_failovers_total {}\n",
1440            metrics.failovers
1441        ));
1442
1443        output
1444    }
1445
1446    /// Send HTTP response
1447    async fn send_response(
1448        writer: &mut tokio::net::tcp::WriteHalf<'_>,
1449        status: u16,
1450        status_text: &str,
1451        body: &str,
1452    ) -> Result<()> {
1453        let response = format!(
1454            "HTTP/1.1 {} {}\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1455            status,
1456            status_text,
1457            body.len(),
1458            body
1459        );
1460
1461        writer
1462            .write_all(response.as_bytes())
1463            .await
1464            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1465
1466        Ok(())
1467    }
1468
1469    /// Send JSON HTTP response
1470    async fn send_json_response<T: Serialize>(
1471        writer: &mut tokio::net::tcp::WriteHalf<'_>,
1472        status: u16,
1473        body: &T,
1474    ) -> Result<()> {
1475        let json = serde_json::to_string(body)
1476            .map_err(|e| ProxyError::Internal(format!("JSON error: {}", e)))?;
1477
1478        let status_text = match status {
1479            200 => "OK",
1480            400 => "Bad Request",
1481            404 => "Not Found",
1482            500 => "Internal Server Error",
1483            503 => "Service Unavailable",
1484            _ => "Unknown",
1485        };
1486
1487        let response = format!(
1488            "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1489            status,
1490            status_text,
1491            json.len(),
1492            json
1493        );
1494
1495        writer
1496            .write_all(response.as_bytes())
1497            .await
1498            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1499
1500        Ok(())
1501    }
1502
1503    /// Shutdown the admin server
1504    pub fn shutdown(&self) {
1505        let _ = self.shutdown_tx.send(());
1506    }
1507}
1508
1509impl AdminState {
1510    /// Create new admin state
1511    pub fn new() -> Self {
1512        Self {
1513            node_health: RwLock::new(HashMap::new()),
1514            metrics: RwLock::new(ServerMetricsSnapshot {
1515                connections_accepted: 0,
1516                connections_closed: 0,
1517                queries_processed: 0,
1518                bytes_received: 0,
1519                bytes_sent: 0,
1520                failovers: 0,
1521            }),
1522            active_sessions: RwLock::new(0),
1523            config_snapshot: RwLock::new(ConfigSnapshot {
1524                listen_address: String::new(),
1525                admin_address: String::new(),
1526                tr_enabled: false,
1527                tr_mode: String::new(),
1528                pool_min_connections: 0,
1529                pool_max_connections: 0,
1530                nodes: Vec::new(),
1531            }),
1532            proxy_config: RwLock::new(None),
1533            read_lb_counter: AtomicUsize::new(0),
1534            commands: RwLock::new(HashMap::new()),
1535            #[cfg(feature = "ha-tr")]
1536            replay_engine: RwLock::new(None),
1537            #[cfg(feature = "wasm-plugins")]
1538            plugin_manager: RwLock::new(None),
1539            chaos_overrides: RwLock::new(HashMap::new()),
1540            #[cfg(feature = "anomaly-detection")]
1541            anomaly_detector: RwLock::new(None),
1542            #[cfg(feature = "edge-proxy")]
1543            edge_cache: RwLock::new(None),
1544            #[cfg(feature = "edge-proxy")]
1545            edge_registry: RwLock::new(None),
1546            auth_token: RwLock::new(None),
1547            migration: RwLock::new(None),
1548            branch: RwLock::new(None),
1549        }
1550    }
1551
1552    /// Set the admin Bearer token (wired by the server at startup).
1553    pub async fn with_auth_token(&self, token: Option<String>) {
1554        *self.auth_token.write().await = token;
1555    }
1556
1557    /// Attach traffic-mirror info so `/api/migration/status` can report it.
1558    pub async fn with_migration(&self, info: MigrationInfo) {
1559        *self.migration.write().await = Some(info);
1560    }
1561
1562    /// Attach branch-database config so `/api/branch` can provision.
1563    pub async fn with_branch(&self, cfg: crate::config::BranchConfig) {
1564        *self.branch.write().await = Some(cfg);
1565    }
1566
1567    /// Attach an anomaly detector. Mirror of with_replay_engine /
1568    /// with_plugin_manager — wired by the server at startup.
1569    #[cfg(feature = "anomaly-detection")]
1570    pub async fn with_anomaly_detector(&self, detector: Arc<AnomalyDetector>) {
1571        *self.anomaly_detector.write().await = Some(detector);
1572    }
1573
1574    /// Attach edge cache + registry. Server calls this once at
1575    /// startup; both Arcs are the same instances ServerState holds.
1576    #[cfg(feature = "edge-proxy")]
1577    pub async fn with_edge(&self, cache: Arc<EdgeCache>, registry: Arc<EdgeRegistry>) {
1578        *self.edge_cache.write().await = Some(cache);
1579        *self.edge_registry.write().await = Some(registry);
1580    }
1581
1582    /// Attach a time-travel replay engine. Production startup calls
1583    /// this once with a `ReplayEngine` constructed from the proxy's
1584    /// shared `TransactionJournal` + a `BackendConfig` template; the
1585    /// `/api/replay` endpoint returns 503 until this is set.
1586    #[cfg(feature = "ha-tr")]
1587    pub async fn with_replay_engine(&self, engine: Arc<ReplayEngine>) {
1588        *self.replay_engine.write().await = Some(engine);
1589    }
1590
1591    /// Attach a WASM plugin manager. Production startup calls this
1592    /// once with the same Arc held by ProxyServer; the `/plugins`
1593    /// endpoint returns 503 until set.
1594    #[cfg(feature = "wasm-plugins")]
1595    pub async fn with_plugin_manager(&self, manager: Arc<PluginManager>) {
1596        *self.plugin_manager.write().await = Some(manager);
1597    }
1598
1599    /// Set the proxy configuration for SQL routing
1600    pub async fn set_proxy_config(&self, config: ProxyConfig) {
1601        let mut proxy_config = self.proxy_config.write().await;
1602        *proxy_config = Some(config);
1603    }
1604
1605    /// Register a command handler
1606    pub async fn register_command<F>(&self, name: &str, handler: F)
1607    where
1608        F: Fn(&[&str]) -> Result<String> + Send + Sync + 'static,
1609    {
1610        let mut commands = self.commands.write().await;
1611        commands.insert(name.to_string(), Arc::new(handler));
1612    }
1613
1614    /// Execute a command
1615    pub async fn execute_command(&self, name: &str, args: &[&str]) -> Result<String> {
1616        let commands = self.commands.read().await;
1617        match commands.get(name) {
1618            Some(handler) => handler(args),
1619            None => Err(ProxyError::Internal(format!("Unknown command: {}", name))),
1620        }
1621    }
1622}
1623
1624impl Default for AdminState {
1625    fn default() -> Self {
1626        Self::new()
1627    }
1628}
1629
1630// Request and Response types
1631
/// SQL execution request for the HTTP SQL API.
///
/// The JSON shape is `{"query": "..."}`; the field name is wire format.
#[derive(Debug, Deserialize)]
struct SqlRequest {
    /// SQL statement to execute. The response reports whether it was
    /// treated as a read or a write and which node it was routed to.
    query: String,
}
1638
/// SQL execution response for the HTTP SQL API.
///
/// Field names are serialized as-is (snake_case).
#[derive(Debug, Serialize)]
struct SqlResponse {
    /// Query classification (read/write).
    query_type: String,
    /// Node the query was routed to.
    routed_to: String,
    /// Role of the target node.
    node_role: String,
    /// Query result from the backend, as arbitrary JSON.
    result: serde_json::Value,
}
1651
/// JSON body for the basic health-check response.
#[derive(Serialize)]
struct HealthResponse {
    /// Status label; `&'static str`, so only compile-time literals are used.
    status: &'static str,
}
1656
/// JSON body for the readiness-probe response.
#[derive(Serialize)]
struct ReadinessResponse {
    /// Whether the proxy reports itself ready to serve traffic.
    ready: bool,
    /// Human-readable explanation accompanying `ready`.
    message: &'static str,
}
1662
/// JSON body for the liveness-probe response.
#[derive(Serialize)]
struct LivenessResponse {
    /// Whether the proxy process reports itself alive.
    alive: bool,
}
1667
/// Generic JSON error body: `{"error": "..."}`.
#[derive(Serialize)]
struct ErrorResponse {
    /// Human-readable error description.
    error: String,
}
1672
/// JSON view of the proxy's traffic counters.
///
/// Built from a `ServerMetricsSnapshot` via the `From` impl, which also
/// derives `connections_active`.
#[derive(Serialize)]
struct MetricsResponse {
    /// Total client connections accepted.
    connections_accepted: u64,
    /// Total client connections closed.
    connections_closed: u64,
    /// Derived: `connections_accepted - connections_closed`, saturating at 0.
    connections_active: u64,
    /// Total queries processed.
    queries_processed: u64,
    /// Total bytes received.
    bytes_received: u64,
    /// Total bytes sent.
    bytes_sent: u64,
    /// Total failovers.
    failovers: u64,
}
1683
1684impl From<ServerMetricsSnapshot> for MetricsResponse {
1685    fn from(m: ServerMetricsSnapshot) -> Self {
1686        Self {
1687            connections_accepted: m.connections_accepted,
1688            connections_closed: m.connections_closed,
1689            connections_active: m.connections_accepted.saturating_sub(m.connections_closed),
1690            queries_processed: m.queries_processed,
1691            bytes_received: m.bytes_received,
1692            bytes_sent: m.bytes_sent,
1693            failovers: m.failovers,
1694        }
1695    }
1696}
1697
/// JSON view of one backend node's health record.
#[derive(Serialize)]
struct NodeHealthResponse {
    /// Node address (`host:port`).
    address: String,
    /// Current health verdict.
    healthy: bool,
    /// Time of the last health check, formatted as RFC 3339.
    last_check: String,
    /// Failure count carried over from the health record.
    failure_count: u32,
    /// Most recent error message, if any.
    last_error: Option<String>,
    /// Check latency in milliseconds.
    latency_ms: f64,
    /// Replication lag in bytes, when known.
    replication_lag_bytes: Option<u64>,
}
1708
1709impl From<NodeHealth> for NodeHealthResponse {
1710    fn from(h: NodeHealth) -> Self {
1711        Self {
1712            address: h.address,
1713            healthy: h.healthy,
1714            last_check: h.last_check.to_rfc3339(),
1715            failure_count: h.failure_count,
1716            last_error: h.last_error,
1717            latency_ms: h.latency_ms,
1718            replication_lag_bytes: h.replication_lag_bytes,
1719        }
1720    }
1721}
1722
/// JSON body reporting the number of active client sessions.
#[derive(Serialize)]
struct SessionsResponse {
    /// Current active-session count.
    active_sessions: u64,
}
1727
/// JSON body for `POST /api/edge/register`.
///
/// All three fields are required; a missing key fails deserialization.
#[cfg(feature = "edge-proxy")]
#[derive(Debug, Deserialize)]
struct EdgeRegisterBody {
    /// Identifier of the edge node being registered.
    edge_id: String,
    /// Region label for the edge node.
    region: String,
    /// Base URL at which the edge node can be reached (format not
    /// validated here — presumably an absolute HTTP(S) URL; confirm).
    base_url: String,
}
1736
/// JSON body for `POST /api/edge/invalidate`.
///
/// Both fields are optional. `tables` defaults to an empty list. When
/// `up_to_version` is `None`, the cache mints the next version stamp, which
/// effectively drops everything matching `tables`.
#[cfg(feature = "edge-proxy")]
#[derive(Debug, Deserialize)]
struct EdgeInvalidateBody {
    /// Tables whose cached entries should be invalidated.
    #[serde(default)]
    tables: Vec<String>,
    /// Upper version bound for the invalidation, if the caller supplies one.
    #[serde(default)]
    up_to_version: Option<u64>,
}
1748
/// Extract the `limit` query parameter from a request path.
///
/// Only the part after the first `?` is inspected. Among `key=value` pairs
/// separated by `&`, the first `limit` whose value parses as `usize` wins,
/// and it is capped at `max`. When there is no query string, no `limit`
/// key, or no parseable value, `default` is returned as-is (not clamped).
#[cfg(feature = "anomaly-detection")]
fn parse_limit_query(path: &str, default: usize, max: usize) -> usize {
    path.split_once('?')
        .and_then(|(_, query)| {
            query
                .split('&')
                .filter_map(|pair| pair.split_once('='))
                .filter(|&(key, _)| key == "limit")
                .find_map(|(_, value)| value.parse::<usize>().ok())
        })
        .map_or(default, |requested| requested.min(max))
}
1769
/// JSON body for `POST /api/shadow`.
///
/// `Debug` is implemented by hand rather than derived. A derived impl would
/// print `source_password` / `shadow_password` verbatim into any log line
/// that formats this request with `{:?}`.
#[cfg(feature = "ha-tr")]
#[derive(Deserialize)]
struct ShadowRequestBody {
    /// SQL to execute on both sides.
    sql: String,
    /// Optional text-format parameters interpolated into `sql`. `None` or
    /// an empty list runs the statement as a simple query.
    #[serde(default)]
    params: Option<Vec<String>>,

    /// Source backend: the side whose result the application would see
    /// in production.
    source_host: String,
    source_port: u16,
    #[serde(default)]
    source_user: Option<String>,
    #[serde(default)]
    source_password: Option<String>,
    #[serde(default)]
    source_database: Option<String>,

    /// Shadow backend: the side being validated (typically a new-version
    /// replica or a post-migration schema).
    shadow_host: String,
    shadow_port: u16,
    #[serde(default)]
    shadow_user: Option<String>,
    #[serde(default)]
    shadow_password: Option<String>,
    #[serde(default)]
    shadow_database: Option<String>,
}

#[cfg(feature = "ha-tr")]
impl std::fmt::Debug for ShadowRequestBody {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show only whether a password was supplied, never its value.
        let redact = |secret: &Option<String>| secret.as_ref().map(|_| "<redacted>");
        f.debug_struct("ShadowRequestBody")
            .field("sql", &self.sql)
            .field("params", &self.params)
            .field("source_host", &self.source_host)
            .field("source_port", &self.source_port)
            .field("source_user", &self.source_user)
            .field("source_password", &redact(&self.source_password))
            .field("source_database", &self.source_database)
            .field("shadow_host", &self.shadow_host)
            .field("shadow_port", &self.shadow_port)
            .field("shadow_user", &self.shadow_user)
            .field("shadow_password", &redact(&self.shadow_password))
            .field("shadow_database", &self.shadow_database)
            .finish()
    }
}
1803
/// Fault-injection (chaos) actions the proxy supports.
///
/// This is deserialized as an internally tagged enum. The `action` key
/// selects the variant (in snake_case) and the remaining keys are that
/// variant's fields, for example
/// `{"action":"force_unhealthy","target_node":"..."}` or
/// `{"action":"reset"}`. The set is closed: an unrecognised `action` fails
/// deserialization, and the handler answers 400. New actions must be added
/// here explicitly.
#[derive(Debug, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum ChaosAction {
    /// Mark a node unhealthy and record an override until it is restored
    /// (or `reset` is called). This triggers the failover path the same way
    /// a real health-check failure would. The handler answers 404 when the
    /// node has no entry in the health table.
    ForceUnhealthy { target_node: String },
    /// Mark a previously overridden node healthy again and drop its
    /// override.
    Restore { target_node: String },
    /// Restore every overridden node and clear all overrides in one call.
    /// Idempotent.
    Reset,
}
1820
/// One JSON entry in the `GET /plugins` listing.
///
/// This is defined in admin.rs so the plugins module does not need a serde
/// dependency.
#[cfg(feature = "wasm-plugins")]
#[derive(Serialize)]
struct PluginListEntry {
    /// Plugin name.
    name: String,
    /// Plugin version string.
    version: String,
    /// Free-form plugin description.
    description: String,
    /// Hook export names (`pre_query`, `post_query`, `route`, ...).
    hooks: Vec<String>,
    /// Lifecycle state: `Loading` | `Running` | `Paused` | `Error(...)` | `Unloading`.
    state: String,
    /// Number of hook invocations recorded for this plugin.
    invocations: u64,
    /// Number of failed invocations recorded for this plugin.
    errors: u64,
}
1836
/// JSON body for `POST /api/replay`.
///
/// `Debug` is implemented by hand rather than derived, so that
/// `target_password` is redacted. A derived impl would print the staging
/// credential verbatim into any `{:?}` log line.
#[cfg(feature = "ha-tr")]
#[derive(Deserialize)]
struct ReplayRequestBody {
    /// RFC 3339 inclusive window start.
    from: DateTime<Utc>,
    /// RFC 3339 inclusive window end.
    to: DateTime<Utc>,
    /// Target backend host (typically a staging DB).
    target_host: String,
    /// Target backend port.
    target_port: u16,
    /// Optional credential overrides. When omitted, the engine uses the
    /// template values set at server startup. Callers targeting a separate
    /// staging DB pass these explicitly, so the proxy does not need to hold
    /// staging credentials in its own config.
    #[serde(default)]
    target_user: Option<String>,
    #[serde(default)]
    target_password: Option<String>,
    #[serde(default)]
    target_database: Option<String>,
}

#[cfg(feature = "ha-tr")]
impl std::fmt::Debug for ReplayRequestBody {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ReplayRequestBody")
            .field("from", &self.from)
            .field("to", &self.to)
            .field("target_host", &self.target_host)
            .field("target_port", &self.target_port)
            .field("target_user", &self.target_user)
            // Show only whether a password was supplied, never its value.
            .field(
                "target_password",
                &self.target_password.as_ref().map(|_| "<redacted>"),
            )
            .field("target_database", &self.target_database)
            .finish()
    }
}
1861
1862/// Joined view exposed at `/topology`. Field names use camelCase so
1863/// they map cleanly into the Kubernetes operator's CRD status
1864/// (`HeliosProxyStatus.currentPrimary`, etc).
1865#[derive(Serialize)]
1866struct TopologyResponse {
1867    #[serde(rename = "currentPrimary")]
1868    current_primary: Option<String>,
1869    #[serde(rename = "healthyNodes")]
1870    healthy_nodes: u32,
1871    #[serde(rename = "unhealthyNodes")]
1872    unhealthy_nodes: u32,
1873    #[serde(rename = "totalNodes")]
1874    total_nodes: u32,
1875    /// RFC 3339 timestamp of the last detected primary change.
1876    /// `None` when the proxy hasn't observed a failover since boot.
1877    #[serde(rename = "lastFailoverAt")]
1878    last_failover_at: Option<String>,
1879}
1880
/// JSON view of connection-pool statistics for a single backend node.
#[derive(Serialize)]
struct PoolStatsResponse {
    /// Backend node the stats belong to.
    node: String,
    /// Connections currently checked out.
    active_connections: u64,
    /// Connections currently idle in the pool.
    idle_connections: u64,
    /// Requests waiting for a connection.
    pending_requests: u64,
    /// Total connections the pool has opened.
    total_connections_created: u64,
    /// Total connections the pool has closed.
    total_connections_closed: u64,
}
1890
/// JSON body reporting the proxy build version.
#[derive(Serialize)]
struct VersionResponse {
    /// Version string of the running binary.
    version: String,
    /// Build timestamp (format set by whoever fills it in — confirm at the
    /// construction site).
    build_time: String,
}
1896
1897#[cfg(test)]
1898mod tests {
1899    use super::*;
1900
1901    #[tokio::test]
1902    async fn test_admin_state_creation() {
1903        let state = AdminState::new();
1904        let sessions = state.active_sessions.read().await;
1905        assert_eq!(*sessions, 0);
1906    }
1907
1908    #[test]
1909    fn test_admin_authorized() {
1910        let h = |s: &str| vec!["GET /topology HTTP/1.1".to_string(), s.to_string()];
1911        assert!(AdminServer::admin_authorized(&h("Authorization: Bearer s3cret"), "s3cret"));
1912        // case-insensitive header name, exact token
1913        assert!(AdminServer::admin_authorized(&h("authorization: Bearer s3cret"), "s3cret"));
1914        // wrong token / wrong scheme / missing header all rejected
1915        assert!(!AdminServer::admin_authorized(&h("Authorization: Bearer nope"), "s3cret"));
1916        assert!(!AdminServer::admin_authorized(&h("Authorization: Basic s3cret"), "s3cret"));
1917        assert!(!AdminServer::admin_authorized(&["GET / HTTP/1.1".to_string()], "s3cret"));
1918    }
1919
1920    #[test]
1921    fn test_constant_time_eq_str() {
1922        assert!(constant_time_eq_str("abc", "abc"));
1923        assert!(!constant_time_eq_str("abc", "abd"));
1924        assert!(!constant_time_eq_str("abc", "abcd"));
1925    }
1926
1927    #[tokio::test]
1928    async fn test_readiness_check_no_nodes() {
1929        let state = Arc::new(AdminState::new());
1930        let ready = AdminServer::check_readiness(&state).await;
1931        assert!(!ready);
1932    }
1933
1934    #[tokio::test]
1935    async fn test_readiness_check_with_healthy_node() {
1936        let state = Arc::new(AdminState::new());
1937
1938        {
1939            let mut health = state.node_health.write().await;
1940            health.insert(
1941                "localhost:5432".to_string(),
1942                NodeHealth {
1943                    address: "localhost:5432".to_string(),
1944                    healthy: true,
1945                    last_check: chrono::Utc::now(),
1946                    failure_count: 0,
1947                    last_error: None,
1948                    latency_ms: 1.0,
1949                    replication_lag_bytes: None,
1950                },
1951            );
1952        }
1953
1954        let ready = AdminServer::check_readiness(&state).await;
1955        assert!(ready);
1956    }
1957
1958    #[tokio::test]
1959    async fn test_command_registration() {
1960        let state = AdminState::new();
1961
1962        state
1963            .register_command("test", |args| {
1964                Ok(format!("Test command with {} args", args.len()))
1965            })
1966            .await;
1967
1968        let result = state.execute_command("test", &["arg1", "arg2"]).await;
1969        assert!(result.is_ok());
1970        assert_eq!(result.unwrap(), "Test command with 2 args");
1971    }
1972
1973    #[tokio::test]
1974    async fn test_unknown_command() {
1975        let state = AdminState::new();
1976        let result = state.execute_command("unknown", &[]).await;
1977        assert!(result.is_err());
1978    }
1979
1980    #[test]
1981    fn test_prometheus_metrics_format() {
1982        let metrics = ServerMetricsSnapshot {
1983            connections_accepted: 100,
1984            connections_closed: 50,
1985            queries_processed: 1000,
1986            bytes_received: 50000,
1987            bytes_sent: 100000,
1988            failovers: 2,
1989        };
1990
1991        let output = AdminServer::format_prometheus_metrics(&metrics);
1992        assert!(output.contains("heliosdb_proxy_connections_total 100"));
1993        assert!(output.contains("heliosdb_proxy_queries_total 1000"));
1994        assert!(output.contains("heliosdb_proxy_failovers_total 2"));
1995    }
1996
1997    #[test]
1998    fn test_metrics_response_active_connections() {
1999        let snapshot = ServerMetricsSnapshot {
2000            connections_accepted: 100,
2001            connections_closed: 30,
2002            queries_processed: 500,
2003            bytes_received: 10000,
2004            bytes_sent: 20000,
2005            failovers: 1,
2006        };
2007
2008        let response = MetricsResponse::from(snapshot);
2009        assert_eq!(response.connections_active, 70);
2010    }
2011
2012    /// Helper: build an AdminState with the given (address, role,
2013    /// healthy) tuples seeded into config + node_health.
2014    async fn topology_state(
2015        nodes: &[(&str, &str, bool)],
2016    ) -> Arc<AdminState> {
2017        let state = Arc::new(AdminState::new());
2018        {
2019            let mut cfg = state.config_snapshot.write().await;
2020            cfg.nodes = nodes
2021                .iter()
2022                .map(|(addr, role, _)| NodeSnapshot {
2023                    address: (*addr).to_string(),
2024                    role: (*role).to_string(),
2025                    weight: 100,
2026                    enabled: true,
2027                })
2028                .collect();
2029        }
2030        {
2031            let mut health = state.node_health.write().await;
2032            for (addr, _, healthy) in nodes {
2033                health.insert(
2034                    (*addr).to_string(),
2035                    NodeHealth {
2036                        address: (*addr).to_string(),
2037                        healthy: *healthy,
2038                        last_check: chrono::Utc::now(),
2039                        failure_count: 0,
2040                        last_error: None,
2041                        latency_ms: 1.0,
2042                        replication_lag_bytes: None,
2043                    },
2044                );
2045            }
2046        }
2047        state
2048    }
2049
2050    #[tokio::test]
2051    async fn test_topology_returns_healthy_primary() {
2052        let state = topology_state(&[
2053            ("primary.svc:5432", "primary", true),
2054            ("standby-a.svc:5432", "standby", true),
2055            ("standby-b.svc:5432", "standby", false),
2056        ])
2057        .await;
2058
2059        let topo = AdminServer::compute_topology(&state).await;
2060        assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
2061        assert_eq!(topo.healthy_nodes, 2);
2062        assert_eq!(topo.unhealthy_nodes, 1);
2063        assert_eq!(topo.total_nodes, 3);
2064    }
2065
2066    #[tokio::test]
2067    async fn test_topology_no_primary_when_primary_unhealthy() {
2068        // Failover in progress: the configured primary is sick and
2069        // no other node has been promoted yet.
2070        let state = topology_state(&[
2071            ("primary.svc:5432", "primary", false),
2072            ("standby.svc:5432", "standby", true),
2073        ])
2074        .await;
2075
2076        let topo = AdminServer::compute_topology(&state).await;
2077        assert_eq!(topo.current_primary, None);
2078        assert_eq!(topo.healthy_nodes, 1);
2079        assert_eq!(topo.unhealthy_nodes, 1);
2080    }
2081
2082    #[tokio::test]
2083    async fn test_topology_handles_empty_cluster() {
2084        let state = Arc::new(AdminState::new());
2085        let topo = AdminServer::compute_topology(&state).await;
2086        assert_eq!(topo.current_primary, None);
2087        assert_eq!(topo.healthy_nodes, 0);
2088        assert_eq!(topo.unhealthy_nodes, 0);
2089        assert_eq!(topo.total_nodes, 0);
2090    }
2091
2092    #[tokio::test]
2093    async fn test_topology_role_match_is_case_insensitive() {
2094        let state = topology_state(&[
2095            ("primary.svc:5432", "PRIMARY", true),
2096        ])
2097        .await;
2098        let topo = AdminServer::compute_topology(&state).await;
2099        assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
2100    }
2101
2102    #[cfg(feature = "ha-tr")]
2103    #[tokio::test]
2104    async fn test_replay_returns_503_when_engine_unattached() {
2105        let state = Arc::new(AdminState::new());
2106        let body = r#"{
2107            "from": "2026-04-25T10:00:00Z",
2108            "to":   "2026-04-25T11:00:00Z",
2109            "target_host": "127.0.0.1",
2110            "target_port": 5432
2111        }"#;
2112        let (status, value) = AdminServer::handle_replay_request(Some(body), &state)
2113            .await
2114            .expect("handler returns Ok with status code");
2115        assert_eq!(status, 503);
2116        assert_eq!(value["error"], "replay engine not attached");
2117    }
2118
2119    #[cfg(feature = "ha-tr")]
2120    #[tokio::test]
2121    async fn test_replay_400_on_malformed_body() {
2122        let state = Arc::new(AdminState::new());
2123        let (status, _) = AdminServer::handle_replay_request(Some("not json"), &state)
2124            .await
2125            .expect("handler returns Ok with status code");
2126        assert_eq!(status, 400);
2127    }
2128
2129    #[cfg(feature = "ha-tr")]
2130    #[tokio::test]
2131    async fn test_replay_errors_on_empty_body() {
2132        let state = Arc::new(AdminState::new());
2133        let err = AdminServer::handle_replay_request(None, &state).await;
2134        assert!(err.is_err(), "empty body must surface as Err");
2135    }
2136
2137    #[cfg(feature = "wasm-plugins")]
2138    #[tokio::test]
2139    async fn test_plugins_list_returns_503_when_manager_unattached() {
2140        let state = Arc::new(AdminState::new());
2141        let (status, value) = AdminServer::handle_plugins_list(&state)
2142            .await
2143            .expect("handler returns Ok with status code");
2144        assert_eq!(status, 503);
2145        assert_eq!(value["error"], "plugin manager not attached");
2146    }
2147
2148    #[cfg(not(feature = "wasm-plugins"))]
2149    #[tokio::test]
2150    async fn test_plugins_list_503_without_feature() {
2151        let state = Arc::new(AdminState::new());
2152        let (status, _) = AdminServer::handle_plugins_list(&state)
2153            .await
2154            .expect("handler returns Ok");
2155        assert_eq!(status, 503);
2156    }
2157
2158    /// Helper: state with a single healthy node seeded into health.
2159    async fn chaos_state_with_node(addr: &str) -> Arc<AdminState> {
2160        let state = Arc::new(AdminState::new());
2161        state.node_health.write().await.insert(
2162            addr.to_string(),
2163            NodeHealth {
2164                address: addr.to_string(),
2165                healthy: true,
2166                last_check: chrono::Utc::now(),
2167                failure_count: 0,
2168                last_error: None,
2169                latency_ms: 1.0,
2170                replication_lag_bytes: None,
2171            },
2172        );
2173        state
2174    }
2175
2176    #[tokio::test]
2177    async fn test_chaos_force_unhealthy_flips_node_and_records_override() {
2178        let state = chaos_state_with_node("primary.svc:5432").await;
2179        let body = r#"{"action":"force_unhealthy","target_node":"primary.svc:5432"}"#;
2180        let (status, value) = AdminServer::handle_chaos_request(Some(body), &state)
2181            .await
2182            .expect("handler returns Ok");
2183        assert_eq!(status, 200);
2184        assert_eq!(value["applied"], "force_unhealthy");
2185        // Health flag flipped.
2186        assert!(!state.node_health.read().await["primary.svc:5432"].healthy);
2187        // Override recorded.
2188        assert!(state.chaos_overrides.read().await.contains_key("primary.svc:5432"));
2189    }
2190
2191    #[tokio::test]
2192    async fn test_chaos_restore_clears_override_and_flips_back() {
2193        let state = chaos_state_with_node("primary.svc:5432").await;
2194        let _ = AdminServer::handle_chaos_request(
2195            Some(r#"{"action":"force_unhealthy","target_node":"primary.svc:5432"}"#),
2196            &state,
2197        )
2198        .await
2199        .unwrap();
2200        let (status, _) = AdminServer::handle_chaos_request(
2201            Some(r#"{"action":"restore","target_node":"primary.svc:5432"}"#),
2202            &state,
2203        )
2204        .await
2205        .unwrap();
2206        assert_eq!(status, 200);
2207        assert!(state.node_health.read().await["primary.svc:5432"].healthy);
2208        assert!(state.chaos_overrides.read().await.is_empty());
2209    }
2210
2211    #[tokio::test]
2212    async fn test_chaos_reset_restores_all_overrides() {
2213        let state = chaos_state_with_node("a:5432").await;
2214        state.node_health.write().await.insert(
2215            "b:5432".to_string(),
2216            NodeHealth {
2217                address: "b:5432".to_string(),
2218                healthy: true,
2219                last_check: chrono::Utc::now(),
2220                failure_count: 0,
2221                last_error: None,
2222                latency_ms: 1.0,
2223                replication_lag_bytes: None,
2224            },
2225        );
2226        for addr in &["a:5432", "b:5432"] {
2227            let body = format!(r#"{{"action":"force_unhealthy","target_node":"{}"}}"#, addr);
2228            let _ = AdminServer::handle_chaos_request(Some(&body), &state)
2229                .await
2230                .unwrap();
2231        }
2232        let (status, value) = AdminServer::handle_chaos_request(
2233            Some(r#"{"action":"reset"}"#),
2234            &state,
2235        )
2236        .await
2237        .unwrap();
2238        assert_eq!(status, 200);
2239        assert_eq!(value["reset"], true);
2240        let restored = value["restored"].as_array().unwrap();
2241        assert_eq!(restored.len(), 2);
2242        // Both nodes back to healthy + overrides cleared.
2243        for addr in &["a:5432", "b:5432"] {
2244            assert!(state.node_health.read().await[*addr].healthy);
2245        }
2246        assert!(state.chaos_overrides.read().await.is_empty());
2247    }
2248
2249    #[tokio::test]
2250    async fn test_chaos_force_unhealthy_404s_when_node_unknown() {
2251        let state = Arc::new(AdminState::new());
2252        let body = r#"{"action":"force_unhealthy","target_node":"missing.svc:5432"}"#;
2253        let (status, _) = AdminServer::handle_chaos_request(Some(body), &state)
2254            .await
2255            .expect("handler returns Ok");
2256        assert_eq!(status, 404);
2257    }
2258
2259    #[tokio::test]
2260    async fn test_chaos_400_on_malformed_body() {
2261        let state = Arc::new(AdminState::new());
2262        let (status, _) = AdminServer::handle_chaos_request(Some("not json"), &state)
2263            .await
2264            .expect("handler returns Ok");
2265        assert_eq!(status, 400);
2266    }
2267
2268    #[tokio::test]
2269    async fn test_chaos_400_on_unknown_action() {
2270        let state = Arc::new(AdminState::new());
2271        let body = r#"{"action":"format_disk","target_node":"x"}"#;
2272        let (status, _) = AdminServer::handle_chaos_request(Some(body), &state)
2273            .await
2274            .expect("handler returns Ok");
2275        assert_eq!(status, 400);
2276    }
2277
2278    #[cfg(feature = "ha-tr")]
2279    #[tokio::test]
2280    async fn test_shadow_400_on_malformed_body() {
2281        let (status, _) = AdminServer::handle_shadow_request(Some("not json"))
2282            .await
2283            .expect("handler returns Ok");
2284        assert_eq!(status, 400);
2285    }
2286
2287    #[cfg(feature = "ha-tr")]
2288    #[tokio::test]
2289    async fn test_shadow_500_on_source_unreachable() {
2290        // Address that nothing is listening on (port 1 = tcpmux,
2291        // refused by everything reasonable).
2292        let body = r#"{
2293            "sql": "SELECT 1",
2294            "source_host": "127.0.0.1",
2295            "source_port": 1,
2296            "shadow_host": "127.0.0.1",
2297            "shadow_port": 1
2298        }"#;
2299        let (status, value) = AdminServer::handle_shadow_request(Some(body))
2300            .await
2301            .expect("handler returns Ok");
2302        assert_eq!(status, 500);
2303        let err = value["error"].as_str().expect("error field");
2304        assert!(
2305            err.contains("source connect"),
2306            "expected source connect error, got {}",
2307            err
2308        );
2309    }
2310
2311    #[cfg(feature = "ha-tr")]
2312    #[tokio::test]
2313    async fn test_shadow_errors_on_empty_body() {
2314        let err = AdminServer::handle_shadow_request(None).await;
2315        assert!(err.is_err(), "empty body must surface as Err");
2316    }
2317
2318    #[cfg(feature = "anomaly-detection")]
2319    #[tokio::test]
2320    async fn test_anomalies_returns_503_when_detector_unattached() {
2321        let state = Arc::new(AdminState::new());
2322        let (status, value) =
2323            AdminServer::handle_anomalies_list("/anomalies", &state)
2324                .await
2325                .expect("handler returns Ok");
2326        assert_eq!(status, 503);
2327        assert_eq!(value["error"], "anomaly detector not attached");
2328    }
2329
2330    #[cfg(feature = "anomaly-detection")]
2331    #[tokio::test]
2332    async fn test_anomalies_returns_attached_detector_events() {
2333        use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
2334        let state = Arc::new(AdminState::new());
2335        let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
2336        // Seed a SQL injection event into the detector.
2337        let _ = det.record_query(&QueryObservation {
2338            tenant: "test".into(),
2339            fingerprint: "fp".into(),
2340            sql: "SELECT * FROM users WHERE id = 1 OR 1=1 --".into(),
2341            timestamp: std::time::Instant::now(),
2342        });
2343        state.with_anomaly_detector(det.clone()).await;
2344
2345        let (status, value) =
2346            AdminServer::handle_anomalies_list("/anomalies", &state)
2347                .await
2348                .expect("handler returns Ok");
2349        assert_eq!(status, 200);
2350        let count = value["count"].as_u64().expect("count field");
2351        assert!(count > 0, "expected at least one event, got {}", count);
2352    }
2353
2354    #[cfg(feature = "anomaly-detection")]
2355    #[tokio::test]
2356    async fn test_anomalies_limit_query_string_respected() {
2357        use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
2358        let state = Arc::new(AdminState::new());
2359        let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
2360        for i in 0..50 {
2361            let fp = format!("fp{}", i);
2362            let _ = det.record_query(&QueryObservation {
2363                tenant: "test".into(),
2364                fingerprint: fp,
2365                sql: "SELECT 1".into(),
2366                timestamp: std::time::Instant::now(),
2367            });
2368        }
2369        state.with_anomaly_detector(det).await;
2370
2371        let (status, value) =
2372            AdminServer::handle_anomalies_list("/anomalies?limit=5", &state)
2373                .await
2374                .expect("handler returns Ok");
2375        assert_eq!(status, 200);
2376        assert_eq!(value["limit"].as_u64().unwrap(), 5);
2377        assert_eq!(value["events"].as_array().unwrap().len(), 5);
2378    }
2379
2380    #[cfg(feature = "anomaly-detection")]
2381    #[test]
2382    fn test_parse_limit_query_helper() {
2383        assert_eq!(parse_limit_query("/anomalies", 100, 1024), 100);
2384        assert_eq!(parse_limit_query("/anomalies?limit=42", 100, 1024), 42);
2385        assert_eq!(parse_limit_query("/anomalies?limit=99999", 100, 1024), 1024);
2386        assert_eq!(parse_limit_query("/anomalies?limit=abc", 100, 1024), 100);
2387        assert_eq!(parse_limit_query("/anomalies?other=x&limit=7", 100, 1024), 7);
2388    }
2389
2390    #[cfg(feature = "edge-proxy")]
2391    async fn edge_state() -> Arc<AdminState> {
2392        use crate::edge::{EdgeCache, EdgeRegistry};
2393        use std::time::Duration;
2394        let s = Arc::new(AdminState::new());
2395        let cache = Arc::new(EdgeCache::new(100));
2396        let registry = Arc::new(EdgeRegistry::new(8, Duration::from_secs(60)));
2397        s.with_edge(cache, registry).await;
2398        s
2399    }
2400
2401    #[cfg(feature = "edge-proxy")]
2402    #[tokio::test]
2403    async fn test_edge_status_returns_empty_lists_initially() {
2404        let s = edge_state().await;
2405        let (status, value) = AdminServer::handle_edge_status(&s)
2406            .await
2407            .expect("handler returns Ok");
2408        assert_eq!(status, 200);
2409        assert_eq!(value["edge_count"].as_u64().unwrap(), 0);
2410        assert_eq!(value["registered"].as_array().unwrap().len(), 0);
2411        assert!(value["cache"].is_object(), "cache stats present");
2412    }
2413
2414    #[cfg(feature = "edge-proxy")]
2415    #[tokio::test]
2416    async fn test_edge_register_then_status_lists_edge() {
2417        let s = edge_state().await;
2418        let body = r#"{"edge_id":"e1","region":"us-east","base_url":"https://e1.svc"}"#;
2419        let (status, _) = AdminServer::handle_edge_register(Some(body), &s)
2420            .await
2421            .expect("handler ok");
2422        assert_eq!(status, 201);
2423        let (status2, value2) = AdminServer::handle_edge_status(&s).await.unwrap();
2424        assert_eq!(status2, 200);
2425        assert_eq!(value2["edge_count"].as_u64().unwrap(), 1);
2426        assert_eq!(
2427            value2["registered"][0]["edge_id"].as_str().unwrap(),
2428            "e1"
2429        );
2430    }
2431
2432    #[cfg(feature = "edge-proxy")]
2433    #[tokio::test]
2434    async fn test_edge_register_400_on_malformed_body() {
2435        let s = edge_state().await;
2436        let (status, _) = AdminServer::handle_edge_register(Some("not json"), &s)
2437            .await
2438            .expect("handler ok");
2439        assert_eq!(status, 400);
2440    }
2441
2442    #[cfg(feature = "edge-proxy")]
2443    #[tokio::test]
2444    async fn test_edge_invalidate_drops_local_cache_entries() {
2445        use crate::edge::{CacheEntry, CacheKey};
2446        use std::time::{Duration, Instant};
2447        let s = edge_state().await;
2448        // Seed an entry into the local cache.
2449        let cache = s.edge_cache.read().await.clone().unwrap();
2450        cache.insert(
2451            CacheKey::new("fp1", "p1"),
2452            CacheEntry {
2453                version: 1,
2454                response_bytes: b"row".to_vec(),
2455                tables: vec!["users".into()],
2456                expires_at: Instant::now() + Duration::from_secs(60),
2457            },
2458        );
2459        assert!(cache.get(&CacheKey::new("fp1", "p1")).is_some());
2460
2461        let body = r#"{"tables":["users"]}"#;
2462        let (status, value) = AdminServer::handle_edge_invalidate(Some(body), &s)
2463            .await
2464            .expect("handler ok");
2465        assert_eq!(status, 200);
2466        assert_eq!(value["dropped_local"].as_u64().unwrap(), 1);
2467        assert!(cache.get(&CacheKey::new("fp1", "p1")).is_none());
2468    }
2469
2470    #[cfg(feature = "edge-proxy")]
2471    #[tokio::test]
2472    async fn test_edge_invalidate_503_when_cache_unattached() {
2473        let s = Arc::new(AdminState::new());
2474        let body = r#"{"tables":["users"]}"#;
2475        let (status, _) = AdminServer::handle_edge_invalidate(Some(body), &s)
2476            .await
2477            .expect("handler ok");
2478        assert_eq!(status, 503);
2479    }
2480}