Skip to main content

heliosdb_proxy/
admin.rs

1//! Admin API
2//!
3//! REST API for proxy management, monitoring, and configuration.
4//! Includes HTTP SQL API for transparent write routing (TWR) and load balancing.
5
6#[cfg(feature = "anomaly-detection")]
7use crate::anomaly::AnomalyDetector;
8use crate::config::{NodeConfig, NodeRole, ProxyConfig};
9#[cfg(feature = "edge-proxy")]
10use crate::edge::{EdgeCache, EdgeRegistry, InvalidationEvent};
11#[cfg(feature = "wasm-plugins")]
12use crate::plugins::PluginManager;
13#[cfg(feature = "ha-tr")]
14use crate::replay::{ReplayEngine, TimeTravelRequest};
15use crate::server::{NodeHealth, ServerMetricsSnapshot};
16use crate::{ProxyError, Result};
17#[cfg(feature = "ha-tr")]
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::net::SocketAddr;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
25use tokio::net::TcpStream;
26use tokio::sync::{broadcast, RwLock};
27
/// Static admin UI (vanilla HTML + JS). Compiled into the binary via
/// `include_str!` so deployments are a single binary — no extra file
/// serving or asset bundling. Served for `GET /`, `GET /ui` and
/// `GET /ui/`; every other route answers with JSON.
const ADMIN_UI_HTML: &str = include_str!("admin_ui.html");
32
/// Compare two strings without stopping at the first differing byte.
///
/// Used for the admin bearer-token check. Inputs of different lengths are
/// rejected up front (so the length itself is not hidden). For equal
/// lengths every byte pair is XOR-ed and OR-folded into one accumulator,
/// so the work done does not depend on where the strings differ.
fn constant_time_eq_str(a: &str, b: &str) -> bool {
    let lhs = a.as_bytes();
    let rhs = b.as_bytes();
    if lhs.len() != rhs.len() {
        return false;
    }
    // Any differing bit in any position leaves the accumulator non-zero.
    let acc = lhs.iter().zip(rhs).fold(0u8, |acc, (x, y)| acc | (x ^ y));
    acc == 0
}
46
/// Admin API server.
///
/// Holds the address to listen on, the state shared with the proxy, and
/// the sender side of the broadcast channel whose receiver `run` waits on
/// to stop its accept loop.
pub struct AdminServer {
    /// Listen address
    listen_address: String,
    /// Shared state with proxy
    state: Arc<AdminState>,
    /// Shutdown channel
    shutdown_tx: broadcast::Sender<()>,
}
55
/// Shared admin state.
///
/// One instance is shared (behind an `Arc`) between the admin server and
/// the proxy. Every mutable field sits behind its own async `RwLock`, and
/// optional subsystems are `Option`s that stay `None` until attached.
pub struct AdminState {
    /// Node health status
    pub node_health: RwLock<HashMap<String, NodeHealth>>,
    /// Server metrics
    pub metrics: RwLock<ServerMetricsSnapshot>,
    /// Active sessions count
    pub active_sessions: RwLock<u64>,
    /// Configuration (read-only)
    pub config_snapshot: RwLock<ConfigSnapshot>,
    /// Full proxy config (for SQL routing)
    pub proxy_config: RwLock<Option<ProxyConfig>>,
    /// Round-robin counter for read load balancing
    read_lb_counter: AtomicUsize,
    /// Registered command handlers
    commands: RwLock<HashMap<String, CommandHandler>>,
    /// Connection pool manager (Session/Transaction/Statement modes).
    /// Attached at startup; `/api/pools` returns real per-node pool
    /// stats when present, an empty list otherwise.
    #[cfg(feature = "pool-modes")]
    pub pool_manager: RwLock<Option<Arc<crate::pool::ConnectionPoolManager>>>,
    /// Circuit-breaker manager. Attached at startup; `/api/circuit` reports
    /// each node's live circuit state (closed / open / half-open).
    #[cfg(feature = "circuit-breaker")]
    pub circuit_breaker: RwLock<Option<Arc<crate::circuit_breaker::CircuitBreakerManager>>>,
    /// Time-travel replay engine. Optional so test fixtures don't have
    /// to wire a backend template; production startup attaches it via
    /// `with_replay_engine`. Endpoint returns 503 when missing.
    #[cfg(feature = "ha-tr")]
    pub replay_engine: RwLock<Option<Arc<ReplayEngine>>>,
    /// WASM plugin manager. None when the proxy started without
    /// plugins (or with a different feature set). `/plugins`
    /// endpoint returns 503 when missing; UI panel says "no plugin
    /// manager attached".
    #[cfg(feature = "wasm-plugins")]
    pub plugin_manager: RwLock<Option<Arc<PluginManager>>>,
    /// Chaos-mode overrides: per-node-address marker that the chaos
    /// system (POST /api/chaos) has forced this node to a particular
    /// state. Lets the UI distinguish "operationally disabled" from
    /// "chaos-injected fault" and lets `Reset` restore everything.
    /// `GET /api/chaos` returns a clone of this map as JSON.
    pub chaos_overrides: RwLock<HashMap<String, ChaosOverride>>,
    /// Anomaly detector — same Arc the server populates from the
    /// query path. /api/anomalies polls this for the recent-events
    /// ring buffer.
    #[cfg(feature = "anomaly-detection")]
    pub anomaly_detector: RwLock<Option<Arc<AnomalyDetector>>>,
    /// Query-analytics engine — same Arc the server records on from the query
    /// path. `/api/analytics` reads top queries + slow-query log from it.
    #[cfg(feature = "query-analytics")]
    pub analytics: RwLock<Option<Arc<crate::analytics::QueryAnalytics>>>,
    /// Edge proxy cache + registry. Cache surfaces stats; registry
    /// is the home-side fanout for invalidations.
    #[cfg(feature = "edge-proxy")]
    pub edge_cache: RwLock<Option<Arc<EdgeCache>>>,
    /// Edge registry half of the cache + registry pair described on
    /// `edge_cache`.
    #[cfg(feature = "edge-proxy")]
    pub edge_registry: RwLock<Option<Arc<EdgeRegistry>>>,
    /// Bearer token required on admin requests (except liveness probes).
    /// `None` = open. Set once at startup from `config.admin_token`.
    pub auth_token: RwLock<Option<String>>,
    /// Traffic-mirror / migration info for `/api/migration/status`. `Some`
    /// when `[mirror] enabled`.
    pub migration: RwLock<Option<MigrationInfo>>,
    /// Branch-database config for `/api/branch`. `Some` when `[branch]
    /// enabled`.
    pub branch: RwLock<Option<crate::config::BranchConfig>>,
}
122
/// What the admin API needs to report migration status, without owning the
/// mirror worker.
///
/// Cloned out of `AdminState::migration` by the migration endpoints, so all
/// shared parts are `Arc`s.
#[derive(Clone)]
pub struct MigrationInfo {
    /// Mirror target, passed to `crate::mirror::status` when building the
    /// status report.
    pub target: String,
    /// Passed through to `crate::mirror::status`.
    /// NOTE(review): presumably "mirror writes only" — confirm against the
    /// mirror module.
    pub writes_only: bool,
    /// Mirror metrics handle, read by `crate::mirror::status`.
    pub metrics: Arc<crate::mirror::MirrorMetrics>,
    /// Mirror config (source + target) for snapshot bootstrap.
    pub config: crate::config::MirrorConfig,
    /// The proxy's cutover switch and the target to promote to.
    /// Holds `Some(target)` while a cutover is active and `None` otherwise;
    /// the cutover endpoint stores into it and the rollback endpoint clears it.
    pub cutover: Arc<arc_swap::ArcSwap<Option<Arc<crate::mirror::CutoverTarget>>>>,
    /// Target that the cutover endpoint clones into `cutover` on promotion.
    pub cutover_target: crate::mirror::CutoverTarget,
}
136
/// Chaos override applied to a single node. Today only the
/// `ForceUnhealthy` flavour is implemented — `inject_query_delay`
/// is the natural follow-up but wants per-query interception that
/// lives in the server message loop, not here.
///
/// Serialized to JSON as-is by `GET /api/chaos`, which returns the whole
/// override map.
#[derive(Debug, Clone, Serialize)]
pub struct ChaosOverride {
    /// Wall-clock when the override was applied (RFC 3339).
    pub since: String,
    /// "force_unhealthy" | "delay_ms"
    pub kind: String,
    /// Free-form description shown in admin UI.
    pub note: String,
}
150
/// Command handler type: a shared (`Arc`), `Send + Sync` callable that
/// receives a slice of string arguments and returns a `String` on success
/// or the crate error on failure. Stored by name in `AdminState::commands`.
type CommandHandler = Arc<dyn Fn(&[&str]) -> Result<String> + Send + Sync>;
153
/// Configuration snapshot for admin API.
///
/// Kept in `AdminState::config_snapshot` and returned as JSON by
/// `GET /config`. All fields are plain owned values, so the snapshot can be
/// cloned and serialized independently of the live `ProxyConfig`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigSnapshot {
    // NOTE(review): field meanings below are taken from the names only —
    // confirm against wherever the snapshot is built from `ProxyConfig`.
    pub listen_address: String,
    pub admin_address: String,
    pub tr_enabled: bool,
    pub tr_mode: String,
    pub pool_min_connections: usize,
    pub pool_max_connections: usize,
    /// One entry per configured backend node.
    pub nodes: Vec<NodeSnapshot>,
}
165
/// Node configuration snapshot.
///
/// Element type of `ConfigSnapshot::nodes`; serialized as part of the
/// `GET /config` response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeSnapshot {
    /// Node address as a string.
    pub address: String,
    /// Node role rendered as text.
    /// NOTE(review): presumably derived from `NodeRole` — confirm.
    pub role: String,
    pub weight: u32,
    pub enabled: bool,
}
174
175impl AdminServer {
176    /// Create a new admin server
177    pub fn new(listen_address: String, state: Arc<AdminState>) -> Self {
178        let (shutdown_tx, _) = broadcast::channel(1);
179
180        Self {
181            listen_address,
182            state,
183            shutdown_tx,
184        }
185    }
186
187    /// Run the admin server
188    pub async fn run(&self) -> Result<()> {
189        // SO_REUSEPORT like the client listener, so a binary handoff can re-bind
190        // the admin address concurrently while the old process drains (Batch H).
191        let listener = crate::server::bind_reuseport(&self.listen_address)?;
192
193        tracing::info!(
194            "Admin API listening on {} (SO_REUSEPORT)",
195            self.listen_address
196        );
197
198        let mut shutdown_rx = self.shutdown_tx.subscribe();
199
200        loop {
201            tokio::select! {
202                accept_result = listener.accept() => {
203                    match accept_result {
204                        Ok((stream, addr)) => {
205                            let state = self.state.clone();
206                            tokio::spawn(async move {
207                                if let Err(e) = Self::handle_connection(stream, addr, state).await {
208                                    tracing::error!("Admin connection error: {}", e);
209                                }
210                            });
211                        }
212                        Err(e) => {
213                            tracing::error!("Admin accept error: {}", e);
214                        }
215                    }
216                }
217                _ = shutdown_rx.recv() => {
218                    tracing::info!("Admin server shutting down");
219                    break;
220                }
221            }
222        }
223
224        Ok(())
225    }
226
227    /// Handle an admin connection
228    async fn handle_connection(
229        mut stream: TcpStream,
230        addr: SocketAddr,
231        state: Arc<AdminState>,
232    ) -> Result<()> {
233        tracing::debug!("Admin connection from {}", addr);
234
235        let (reader, mut writer) = stream.split();
236        let mut reader = BufReader::new(reader);
237        let mut line = String::new();
238
239        // Read HTTP request headers
240        let mut headers = Vec::new();
241        let mut content_length: usize = 0;
242
243        loop {
244            line.clear();
245            let bytes_read = reader
246                .read_line(&mut line)
247                .await
248                .map_err(|e| ProxyError::Network(format!("Read error: {}", e)))?;
249
250            if bytes_read == 0 || line == "\r\n" {
251                break;
252            }
253
254            // Parse Content-Length header
255            let trimmed = line.trim();
256            if trimmed.to_lowercase().starts_with("content-length:") {
257                if let Some(len_str) = trimmed.split(':').nth(1) {
258                    content_length = len_str.trim().parse().unwrap_or(0);
259                }
260            }
261            headers.push(trimmed.to_string());
262        }
263
264        if headers.is_empty() {
265            return Ok(());
266        }
267
268        // Parse request line
269        let request_line = &headers[0];
270        let parts: Vec<&str> = request_line.split_whitespace().collect();
271
272        if parts.len() < 2 {
273            Self::send_response(&mut writer, 400, "Bad Request", "Invalid request line").await?;
274            return Ok(());
275        }
276
277        let method = parts[0];
278        let path = parts[1];
279
280        // Bearer-token gate. Liveness probes stay open so orchestrators can
281        // health-check without the token; everything else is rejected with
282        // 401 unless `Authorization: Bearer <token>` matches.
283        {
284            let required = state.auth_token.read().await.clone();
285            if let Some(token) = required {
286                let path_only = path.split('?').next().unwrap_or(path);
287                let is_liveness = method == "GET"
288                    && matches!(path_only, "/health" | "/healthz" | "/livez" | "/readyz");
289                if !is_liveness && !Self::admin_authorized(&headers, &token) {
290                    Self::send_response(
291                        &mut writer,
292                        401,
293                        "Unauthorized",
294                        "{\"error\":\"missing or invalid admin bearer token\"}",
295                    )
296                    .await?;
297                    return Ok(());
298                }
299            }
300        }
301
302        // Read request body for POST/PUT requests
303        let body = if content_length > 0 && (method == "POST" || method == "PUT") {
304            let mut body_buf = vec![0u8; content_length];
305            reader
306                .read_exact(&mut body_buf)
307                .await
308                .map_err(|e| ProxyError::Network(format!("Body read error: {}", e)))?;
309            Some(String::from_utf8_lossy(&body_buf).to_string())
310        } else {
311            None
312        };
313
314        // Static admin UI — single HTML file compiled into the binary.
315        // Served at `/` and `/ui`; all other routes remain JSON.
316        if method == "GET" && (path == "/" || path == "/ui" || path == "/ui/") {
317            Self::send_html_response(&mut writer, 200, ADMIN_UI_HTML).await?;
318            return Ok(());
319        }
320
321        // Route request
322        let response = Self::route_request(method, path, body.as_deref(), &state).await;
323
324        match response {
325            Ok((status, body)) => {
326                Self::send_json_response(&mut writer, status, &body).await?;
327            }
328            Err(e) => {
329                let error = ErrorResponse {
330                    error: e.to_string(),
331                };
332                Self::send_json_response(&mut writer, 500, &error).await?;
333            }
334        }
335
336        Ok(())
337    }
338
339    /// True if the request carries `Authorization: Bearer <token>` matching
340    /// the configured admin token (constant-time compare).
341    fn admin_authorized(headers: &[String], token: &str) -> bool {
342        let expected = format!("Bearer {}", token);
343        for h in headers {
344            let mut sp = h.splitn(2, ':');
345            let name = sp.next().unwrap_or("").trim();
346            if name.eq_ignore_ascii_case("authorization") {
347                let value = sp.next().unwrap_or("").trim();
348                return constant_time_eq_str(value, &expected);
349            }
350        }
351        false
352    }
353
354    /// Serve a text/html HTTP response. Used by the admin UI route.
355    async fn send_html_response(
356        writer: &mut tokio::net::tcp::WriteHalf<'_>,
357        status: u16,
358        html: &str,
359    ) -> Result<()> {
360        let status_text = match status {
361            200 => "OK",
362            404 => "Not Found",
363            _ => "Unknown",
364        };
365        let response = format!(
366            "HTTP/1.1 {} {}\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
367            status,
368            status_text,
369            html.len(),
370            html
371        );
372        writer
373            .write_all(response.as_bytes())
374            .await
375            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
376        Ok(())
377    }
378
379    /// Route a request to the appropriate handler
380    async fn route_request(
381        method: &str,
382        path: &str,
383        body: Option<&str>,
384        state: &Arc<AdminState>,
385    ) -> Result<(u16, serde_json::Value)> {
386        match (method, path) {
387            // SQL API - Execute SQL with TWR (Transparent Write Routing)
388            ("POST", "/api/sql") => Self::handle_sql_request(body, state).await,
389
390            // Health endpoints
391            ("GET", "/health") => {
392                let health = HealthResponse { status: "ok" };
393                Ok((200, serde_json::to_value(health)?))
394            }
395            ("GET", "/health/ready") => {
396                let ready = Self::check_readiness(state).await;
397                let response = ReadinessResponse {
398                    ready,
399                    message: if ready {
400                        "Proxy is ready"
401                    } else {
402                        "Proxy is not ready"
403                    },
404                };
405                let status = if ready { 200 } else { 503 };
406                Ok((status, serde_json::to_value(response)?))
407            }
408            ("GET", "/health/live") => {
409                let response = LivenessResponse { alive: true };
410                Ok((200, serde_json::to_value(response)?))
411            }
412
413            // Metrics
414            ("GET", "/metrics") => {
415                let metrics = state.metrics.read().await.clone();
416                Ok((200, serde_json::to_value(MetricsResponse::from(metrics))?))
417            }
418            ("GET", "/metrics/prometheus") => {
419                let metrics = state.metrics.read().await.clone();
420                let prometheus = Self::format_prometheus_metrics(&metrics);
421                Ok((200, serde_json::json!({ "text": prometheus })))
422            }
423
424            // Node management
425            ("GET", "/nodes") => {
426                let health = state.node_health.read().await;
427                let nodes: Vec<NodeHealthResponse> = health
428                    .values()
429                    .map(|h| NodeHealthResponse::from(h.clone()))
430                    .collect();
431                Ok((200, serde_json::to_value(nodes)?))
432            }
433            ("GET", path) if path.starts_with("/nodes/") => {
434                let node_addr = path.trim_start_matches("/nodes/");
435                let health = state.node_health.read().await;
436                match health.get(node_addr) {
437                    Some(h) => Ok((
438                        200,
439                        serde_json::to_value(NodeHealthResponse::from(h.clone()))?,
440                    )),
441                    None => Ok((404, serde_json::json!({ "error": "Node not found" }))),
442                }
443            }
444            ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/enable") => {
445                let node_addr = path
446                    .trim_start_matches("/nodes/")
447                    .trim_end_matches("/enable");
448                Self::set_node_enabled(state, node_addr, true).await?;
449                Ok((200, serde_json::json!({ "status": "enabled" })))
450            }
451            ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/disable") => {
452                let node_addr = path
453                    .trim_start_matches("/nodes/")
454                    .trim_end_matches("/disable");
455                Self::set_node_enabled(state, node_addr, false).await?;
456                Ok((200, serde_json::json!({ "status": "disabled" })))
457            }
458
459            // Topology — joins config (role) with node_health (healthy)
460            // so external controllers (operator, ops dashboards) can
461            // populate `currentPrimary` / `healthyNodes` /
462            // `unhealthyNodes` in one round-trip. Designed for
463            // poll-friendly use; never blocks.
464            ("GET", "/topology") => {
465                let topo = Self::compute_topology(state).await;
466                Ok((200, serde_json::to_value(topo)?))
467            }
468
469            // Time-travel replay — replays a journal window against a
470            // target backend (typically a staging DB). Body shape is
471            // `ReplayRequestBody` below.
472            #[cfg(feature = "ha-tr")]
473            ("POST", "/api/replay") => Self::handle_replay_request(body, state).await,
474            #[cfg(not(feature = "ha-tr"))]
475            ("POST", "/api/replay") => Ok((
476                503,
477                serde_json::json!({ "error": "ha-tr feature not compiled in" }),
478            )),
479
480            // Shadow execution (T3.4) — runs a query against a source
481            // backend AND a shadow backend, diffs the result. Used for
482            // major-version upgrade validation, schema-migration
483            // canaries, replica-drift detection. Body is
484            // `ShadowRequestBody`.
485            #[cfg(feature = "ha-tr")]
486            ("POST", "/api/shadow") => Self::handle_shadow_request(body).await,
487            #[cfg(not(feature = "ha-tr"))]
488            ("POST", "/api/shadow") => Ok((
489                503,
490                serde_json::json!({ "error": "ha-tr feature not compiled in" }),
491            )),
492
493            // Loaded WASM plugins — name, version, hooks, state,
494            // invocation count. Returns 503 when no plugin manager
495            // is attached (proxy started without --features
496            // wasm-plugins or with plugins disabled in config).
497            ("GET", "/plugins") => Self::handle_plugins_list(state).await,
498
499            // Anomaly detector recent-events feed (T3.1). Optional
500            // ?limit query string clamps the response size; default
501            // is 100 events newest-first.
502            #[cfg(feature = "anomaly-detection")]
503            ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => {
504                Self::handle_anomalies_list(p, state).await
505            }
506            #[cfg(not(feature = "anomaly-detection"))]
507            ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => Ok((
508                503,
509                serde_json::json!({ "error": "anomaly-detection feature not compiled in" }),
510            )),
511
512            // Query analytics: top queries by call count + slow-query log.
513            #[cfg(feature = "query-analytics")]
514            ("GET", p)
515                if p == "/api/analytics"
516                    || p == "/analytics"
517                    || p.starts_with("/api/analytics?")
518                    || p.starts_with("/analytics?") =>
519            {
520                Self::handle_analytics(p, state).await
521            }
522            #[cfg(not(feature = "query-analytics"))]
523            ("GET", p)
524                if p == "/api/analytics"
525                    || p == "/analytics"
526                    || p.starts_with("/api/analytics?")
527                    || p.starts_with("/analytics?") =>
528            {
529                Ok((
530                    503,
531                    serde_json::json!({ "error": "query-analytics feature not compiled in" }),
532                ))
533            }
534
535            // Edge mode (T3.2). Stats panel for the home; the home's
536            // registered edges + cache stats; and a manual
537            // invalidation endpoint for ops drills.
538            #[cfg(feature = "edge-proxy")]
539            ("GET", "/api/edge") => Self::handle_edge_status(state).await,
540            #[cfg(feature = "edge-proxy")]
541            ("POST", "/api/edge/register") => Self::handle_edge_register(body, state).await,
542            #[cfg(feature = "edge-proxy")]
543            ("POST", "/api/edge/invalidate") => Self::handle_edge_invalidate(body, state).await,
544            #[cfg(not(feature = "edge-proxy"))]
545            ("GET", "/api/edge")
546            | ("POST", "/api/edge/register")
547            | ("POST", "/api/edge/invalidate") => Ok((
548                503,
549                serde_json::json!({ "error": "edge-proxy feature not compiled in" }),
550            )),
551
552            // Chaos engineering — controlled fault injection for HA
553            // testing. Body is `ChaosRequestBody`; supported actions
554            // are `force_unhealthy` / `restore` / `reset`.
555            ("POST", "/api/chaos") => Self::handle_chaos_request(body, state).await,
556            // Read current overrides so the UI can show "what's
557            // currently broken on purpose".
558            ("GET", "/api/chaos") => {
559                let overrides = state.chaos_overrides.read().await.clone();
560                Ok((200, serde_json::to_value(overrides)?))
561            }
562
563            // Live per-node circuit-breaker state (closed / open / half-open)
564            // so an operator can see which backends the breaker has tripped.
565            #[cfg(feature = "circuit-breaker")]
566            ("GET", "/api/circuit") => Self::handle_circuit_status(state).await,
567            #[cfg(not(feature = "circuit-breaker"))]
568            ("GET", "/api/circuit") => Ok((
569                503,
570                serde_json::json!({ "error": "circuit-breaker feature not enabled" }),
571            )),
572
573            // Migration / traffic-mirror status
574            ("GET", "/api/migration/status") | ("GET", "/migration/status") => {
575                match state.migration.read().await.as_ref() {
576                    Some(info) => {
577                        let st =
578                            crate::mirror::status(&info.target, info.writes_only, &info.metrics);
579                        let mut v = serde_json::to_value(st)?;
580                        let cut = info.cutover.load_full().is_some();
581                        v["cutover_active"] = serde_json::json!(cut);
582                        Ok((200, v))
583                    }
584                    None => Ok((
585                        503,
586                        serde_json::json!({ "error": "traffic mirroring not enabled" }),
587                    )),
588                }
589            }
590
591            // Promote the mirror target to primary: new connections route there.
592            ("POST", "/api/migration/cutover") | ("POST", "/migration/cutover") => {
593                let info = state.migration.read().await.clone();
594                let Some(info) = info else {
595                    return Ok((
596                        503,
597                        serde_json::json!({ "error": "traffic mirroring not enabled" }),
598                    ));
599                };
600                let force = path.contains("force=true")
601                    || body.map(|b| b.contains("\"force\":true")).unwrap_or(false);
602                let st = crate::mirror::status(&info.target, info.writes_only, &info.metrics);
603                if !st.migration_ready && !force {
604                    return Ok((
605                        409,
606                        serde_json::json!({
607                            "ok": false,
608                            "error": "not migration_ready (backlog/drops present); pass force=true to override",
609                            "status": st,
610                        }),
611                    ));
612                }
613                info.cutover
614                    .store(Arc::new(Some(Arc::new(info.cutover_target.clone()))));
615                tracing::warn!(target = %info.cutover_target.addr, "migration cutover: new connections now route to the promoted target");
616                Ok((
617                    200,
618                    serde_json::json!({ "ok": true, "promoted_to": info.cutover_target.addr }),
619                ))
620            }
621
622            // Roll a cutover back to the original primary.
623            ("POST", "/api/migration/cutover/rollback")
624            | ("POST", "/migration/cutover/rollback") => {
625                let info = state.migration.read().await.clone();
626                let Some(info) = info else {
627                    return Ok((
628                        503,
629                        serde_json::json!({ "error": "traffic mirroring not enabled" }),
630                    ));
631                };
632                info.cutover.store(Arc::new(None));
633                Ok((200, serde_json::json!({ "ok": true, "rolled_back": true })))
634            }
635
636            // Snapshot-bootstrap named tables from the source into the mirror.
637            ("POST", "/api/migration/snapshot") | ("POST", "/migration/snapshot") => {
638                let info = state.migration.read().await.clone();
639                let Some(info) = info else {
640                    return Ok((
641                        503,
642                        serde_json::json!({ "error": "traffic mirroring not enabled" }),
643                    ));
644                };
645                let body = body.unwrap_or("{}");
646                let req: serde_json::Value = serde_json::from_str(body)
647                    .map_err(|e| ProxyError::Internal(format!("invalid JSON: {}", e)))?;
648                let tables: Vec<String> = req
649                    .get("tables")
650                    .and_then(|t| t.as_array())
651                    .map(|a| {
652                        a.iter()
653                            .filter_map(|v| v.as_str().map(String::from))
654                            .collect()
655                    })
656                    .unwrap_or_default();
657                if tables.is_empty() {
658                    return Ok((
659                        400,
660                        serde_json::json!({ "error": "provide a non-empty 'tables' array" }),
661                    ));
662                }
663                match crate::mirror::snapshot_tables(&info.config, &tables).await {
664                    Ok(rep) => {
665                        let total: u64 = rep.iter().map(|t| t.copied).sum();
666                        Ok((
667                            200,
668                            serde_json::json!({ "ok": true, "tables": rep, "rows_copied": total }),
669                        ))
670                    }
671                    Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
672                }
673            }
674
675            // Branch databases: list / create / drop.
676            ("GET", p) if p == "/api/branch" || p == "/branch" || p.starts_with("/api/branch?") => {
677                let cfg = state.branch.read().await.clone();
678                let Some(cfg) = cfg else {
679                    return Ok((
680                        503,
681                        serde_json::json!({ "error": "branch databases not enabled" }),
682                    ));
683                };
684                match crate::branch::list(&cfg).await {
685                    Ok(branches) => Ok((200, serde_json::json!({ "branches": branches }))),
686                    Err(e) => Ok((500, serde_json::json!({ "error": e }))),
687                }
688            }
689            ("POST", p) if p == "/api/branch" || p == "/branch" => {
690                let cfg = state.branch.read().await.clone();
691                let Some(cfg) = cfg else {
692                    return Ok((
693                        503,
694                        serde_json::json!({ "error": "branch databases not enabled" }),
695                    ));
696                };
697                let req: serde_json::Value = serde_json::from_str(body.unwrap_or("{}"))
698                    .map_err(|e| ProxyError::Internal(format!("invalid JSON: {}", e)))?;
699                let name = req.get("name").and_then(|v| v.as_str()).unwrap_or("");
700                if name.is_empty() {
701                    return Ok((400, serde_json::json!({ "error": "provide 'name'" })));
702                }
703                let base = req.get("base").and_then(|v| v.as_str());
704                match crate::branch::create(&cfg, name, base).await {
705                    Ok(()) => Ok((
706                        200,
707                        serde_json::json!({ "ok": true, "branch": name,
708                        "base": base.unwrap_or(&cfg.base_database) }),
709                    )),
710                    Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
711                }
712            }
713            ("DELETE", p) if p.starts_with("/api/branch") || p.starts_with("/branch") => {
714                let cfg = state.branch.read().await.clone();
715                let Some(cfg) = cfg else {
716                    return Ok((
717                        503,
718                        serde_json::json!({ "error": "branch databases not enabled" }),
719                    ));
720                };
721                let name = p.find("name=").map(|i| &p[i + 5..]).unwrap_or("");
722                if name.is_empty() {
723                    return Ok((
724                        400,
725                        serde_json::json!({ "error": "provide ?name=<branch>" }),
726                    ));
727                }
728                match crate::branch::drop(&cfg, name).await {
729                    Ok(()) => Ok((200, serde_json::json!({ "ok": true, "dropped": name }))),
730                    Err(e) => Ok((500, serde_json::json!({ "ok": false, "error": e }))),
731                }
732            }
733
734            // Configuration
735            ("GET", "/config") => {
736                let config = state.config_snapshot.read().await.clone();
737                Ok((200, serde_json::to_value(config)?))
738            }
739
740            // Sessions
741            ("GET", "/sessions") => {
742                let count = *state.active_sessions.read().await;
743                let response = SessionsResponse {
744                    active_sessions: count,
745                };
746                Ok((200, serde_json::to_value(response)?))
747            }
748
749            // Pools
750            ("GET", "/pools") => {
751                let pools = Self::get_pool_stats(state).await;
752                Ok((200, serde_json::to_value(pools)?))
753            }
754
755            // Version
756            ("GET", "/version") => {
757                let response = VersionResponse {
758                    version: crate::VERSION.to_string(),
759                    build_time: env!("CARGO_PKG_VERSION").to_string(),
760                };
761                Ok((200, serde_json::to_value(response)?))
762            }
763
764            // Not found
765            _ => Ok((404, serde_json::json!({ "error": "Not found" }))),
766        }
767    }
768
769    /// Handle SQL execution request with TWR (Transparent Write Routing)
770    async fn handle_sql_request(
771        body: Option<&str>,
772        state: &Arc<AdminState>,
773    ) -> Result<(u16, serde_json::Value)> {
774        // Parse request body
775        let body = body.ok_or_else(|| ProxyError::Internal("Missing request body".to_string()))?;
776        let request: SqlRequest = serde_json::from_str(body)
777            .map_err(|e| ProxyError::Internal(format!("Invalid JSON: {}", e)))?;
778
779        let sql = request.query.trim();
780        if sql.is_empty() {
781            return Ok((400, serde_json::json!({ "error": "Empty query" })));
782        }
783
784        // Classify query as read or write
785        let is_write = Self::is_write_query(sql);
786        let query_type = if is_write { "write" } else { "read" };
787
788        // Get proxy config
789        let proxy_config = state.proxy_config.read().await;
790        let config = proxy_config
791            .as_ref()
792            .ok_or_else(|| ProxyError::Internal("Proxy config not initialized".to_string()))?;
793
794        // Get node health
795        let health = state.node_health.read().await;
796
797        // Select target node based on query type
798        let target_node = if is_write {
799            // Write queries always go to primary
800            Self::select_primary_node(config, &health)?
801        } else {
802            // Read queries can go to any healthy node with load balancing
803            Self::select_read_node(config, &health, state)?
804        };
805
806        let target_address = format!("{}:{}", target_node.host, target_node.port);
807        // Use HTTP port from node config (defaults to 8080)
808        let http_port = target_node.http_port;
809        let http_url = format!("http://{}:{}/api/sql", target_node.host, http_port);
810
811        tracing::debug!(
812            "Routing {} query to {} ({})",
813            query_type,
814            target_address,
815            match target_node.role {
816                NodeRole::Primary => "primary",
817                NodeRole::Standby => "standby",
818                NodeRole::ReadReplica => "replica",
819            }
820        );
821
822        // Forward request to backend node
823        let result = Self::forward_sql_request(&http_url, sql).await?;
824
825        // Return result with routing metadata
826        let response = SqlResponse {
827            query_type: query_type.to_string(),
828            routed_to: target_address,
829            node_role: format!("{:?}", target_node.role).to_lowercase(),
830            result,
831        };
832
833        Ok((200, serde_json::to_value(response)?))
834    }
835
836    /// Determine if a query is a write operation
837    fn is_write_query(sql: &str) -> bool {
838        let upper = sql.trim().to_uppercase();
839
840        // Write operations
841        if upper.starts_with("INSERT")
842            || upper.starts_with("UPDATE")
843            || upper.starts_with("DELETE")
844            || upper.starts_with("CREATE")
845            || upper.starts_with("ALTER")
846            || upper.starts_with("DROP")
847            || upper.starts_with("TRUNCATE")
848            || upper.starts_with("GRANT")
849            || upper.starts_with("REVOKE")
850            || upper.starts_with("VACUUM")
851            || upper.starts_with("REINDEX")
852            || upper.starts_with("MERGE")
853            || upper.starts_with("UPSERT")
854        {
855            return true;
856        }
857
858        // Transaction control that might contain writes
859        if upper.starts_with("BEGIN")
860            || upper.starts_with("COMMIT")
861            || upper.starts_with("ROLLBACK")
862            || upper.starts_with("SAVEPOINT")
863        {
864            // Transaction control goes to primary for safety
865            return true;
866        }
867
868        // Read operations
869        false
870    }
871
872    /// Select primary node for write queries
873    fn select_primary_node<'a>(
874        config: &'a ProxyConfig,
875        health: &HashMap<String, NodeHealth>,
876    ) -> Result<&'a NodeConfig> {
877        config
878            .nodes
879            .iter()
880            .find(|n| {
881                n.role == NodeRole::Primary
882                    && n.enabled
883                    && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false)
884            })
885            .ok_or_else(|| ProxyError::Internal("No healthy primary node available".to_string()))
886    }
887
888    /// Select node for read queries with load balancing
889    fn select_read_node<'a>(
890        config: &'a ProxyConfig,
891        health: &HashMap<String, NodeHealth>,
892        state: &AdminState,
893    ) -> Result<&'a NodeConfig> {
894        // Get all healthy nodes (primary, standby, or replica)
895        let healthy_nodes: Vec<&NodeConfig> = config
896            .nodes
897            .iter()
898            .filter(|n| n.enabled && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false))
899            .collect();
900
901        if healthy_nodes.is_empty() {
902            return Err(ProxyError::Internal(
903                "No healthy nodes available".to_string(),
904            ));
905        }
906
907        // If read/write splitting is enabled and there are standbys, prefer them
908        if config.load_balancer.read_write_split {
909            let read_nodes: Vec<&NodeConfig> = healthy_nodes
910                .iter()
911                .filter(|n| n.role == NodeRole::Standby || n.role == NodeRole::ReadReplica)
912                .copied()
913                .collect();
914
915            if !read_nodes.is_empty() {
916                // Round-robin across read nodes
917                let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
918                let index = counter % read_nodes.len();
919                return Ok(read_nodes[index]);
920            }
921        }
922
923        // Fall back to round-robin across all healthy nodes
924        let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
925        let index = counter % healthy_nodes.len();
926        Ok(healthy_nodes[index])
927    }
928
929    /// Forward SQL request to backend node's HTTP API
930    async fn forward_sql_request(url: &str, sql: &str) -> Result<serde_json::Value> {
931        // Build HTTP request
932        let request_body = serde_json::json!({ "query": sql });
933        let body_bytes = serde_json::to_vec(&request_body)
934            .map_err(|e| ProxyError::Internal(format!("JSON serialization error: {}", e)))?;
935
936        // Parse URL
937        let url_parts: Vec<&str> = url.trim_start_matches("http://").splitn(2, '/').collect();
938        if url_parts.is_empty() {
939            return Err(ProxyError::Internal("Invalid URL".to_string()));
940        }
941
942        let host_port = url_parts[0];
943        let path = if url_parts.len() > 1 {
944            format!("/{}", url_parts[1])
945        } else {
946            "/".to_string()
947        };
948
949        // Connect to backend
950        let stream = TcpStream::connect(host_port).await.map_err(|e| {
951            ProxyError::Network(format!("Failed to connect to {}: {}", host_port, e))
952        })?;
953
954        let (reader, mut writer) = stream.into_split();
955        let mut reader = BufReader::new(reader);
956
957        // Send HTTP request
958        let request = format!(
959            "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
960            path,
961            host_port,
962            body_bytes.len()
963        );
964
965        writer
966            .write_all(request.as_bytes())
967            .await
968            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
969        writer
970            .write_all(&body_bytes)
971            .await
972            .map_err(|e| ProxyError::Network(format!("Write body error: {}", e)))?;
973
974        // Read response headers
975        let mut response_headers = Vec::new();
976        let mut line = String::new();
977        let mut content_length: usize = 0;
978
979        loop {
980            line.clear();
981            let bytes_read = reader
982                .read_line(&mut line)
983                .await
984                .map_err(|e| ProxyError::Network(format!("Response read error: {}", e)))?;
985
986            if bytes_read == 0 || line == "\r\n" {
987                break;
988            }
989
990            let trimmed = line.trim();
991            if trimmed.to_lowercase().starts_with("content-length:") {
992                if let Some(len_str) = trimmed.split(':').nth(1) {
993                    content_length = len_str.trim().parse().unwrap_or(0);
994                }
995            }
996            response_headers.push(trimmed.to_string());
997        }
998
999        // Read response body
1000        let mut body_buf = vec![0u8; content_length];
1001        if content_length > 0 {
1002            reader
1003                .read_exact(&mut body_buf)
1004                .await
1005                .map_err(|e| ProxyError::Network(format!("Response body read error: {}", e)))?;
1006        }
1007
1008        let response_body = String::from_utf8_lossy(&body_buf);
1009
1010        // Parse JSON response
1011        serde_json::from_str(&response_body).map_err(|e| {
1012            ProxyError::Internal(format!(
1013                "Invalid JSON response: {} - body: {}",
1014                e, response_body
1015            ))
1016        })
1017    }
1018
1019    /// Check if proxy is ready to accept connections
1020    async fn check_readiness(state: &Arc<AdminState>) -> bool {
1021        let health = state.node_health.read().await;
1022
1023        // Need at least one healthy primary
1024        health.values().any(|h| h.healthy)
1025    }
1026
1027    /// Set node enabled status
1028    async fn set_node_enabled(
1029        state: &Arc<AdminState>,
1030        node_addr: &str,
1031        enabled: bool,
1032    ) -> Result<()> {
1033        let mut health = state.node_health.write().await;
1034
1035        if let Some(node_health) = health.get_mut(node_addr) {
1036            node_health.healthy = enabled;
1037            Ok(())
1038        } else {
1039            Err(ProxyError::Config(format!("Node not found: {}", node_addr)))
1040        }
1041    }
1042
1043    /// Get pool statistics
1044    async fn get_pool_stats(_state: &Arc<AdminState>) -> Vec<PoolStatsResponse> {
1045        // Real per-node pool stats from the attached pool manager. Returns
1046        // an empty list when pool-modes is off or no manager is attached.
1047        #[cfg(feature = "pool-modes")]
1048        if let Some(mgr) = _state.pool_manager.read().await.clone() {
1049            let stats = mgr.get_stats().await;
1050            return stats
1051                .node_stats
1052                .into_iter()
1053                .map(|ns| PoolStatsResponse {
1054                    node: ns.node_id.0.to_string(),
1055                    active_connections: ns.active as u64,
1056                    idle_connections: ns.idle as u64,
1057                    // Per-node pending/created/closed counters are not tracked
1058                    // separately by the manager; total is the live pool size.
1059                    pending_requests: 0,
1060                    total_connections_created: ns.total as u64,
1061                    total_connections_closed: 0,
1062                })
1063                .collect();
1064        }
1065        Vec::new()
1066    }
1067
1068    /// Handle `POST /api/replay`. Body is a JSON `ReplayRequestBody`.
1069    /// Returns 503 when no replay engine is attached, 400 on a malformed
1070    /// body or inverted window, 200 with `ReplaySummary` on success.
1071    #[cfg(feature = "ha-tr")]
1072    async fn handle_replay_request(
1073        body: Option<&str>,
1074        state: &Arc<AdminState>,
1075    ) -> Result<(u16, serde_json::Value)> {
1076        let raw =
1077            body.ok_or_else(|| ProxyError::Internal("replay: empty request body".to_string()))?;
1078        let req: ReplayRequestBody = match serde_json::from_str(raw) {
1079            Ok(r) => r,
1080            Err(e) => {
1081                return Ok((
1082                    400,
1083                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
1084                ));
1085            }
1086        };
1087        let engine = match state.replay_engine.read().await.clone() {
1088            Some(e) => e,
1089            None => {
1090                return Ok((
1091                    503,
1092                    serde_json::json!({ "error": "replay engine not attached" }),
1093                ));
1094            }
1095        };
1096        let tt = TimeTravelRequest {
1097            from: req.from,
1098            to: req.to,
1099            target_host: req.target_host,
1100            target_port: req.target_port,
1101            target_user: req.target_user,
1102            target_password: req.target_password,
1103            target_database: req.target_database,
1104        };
1105        match engine.replay_window(&tt).await {
1106            Ok(summary) => Ok((200, serde_json::to_value(summary)?)),
1107            Err(e) => Ok((
1108                500,
1109                serde_json::json!({ "error": format!("replay failed: {}", e) }),
1110            )),
1111        }
1112    }
1113
1114    /// `GET /api/edge` — surfaces edge-mode state: cache stats +
1115    /// the list of registered edges (when running in home mode).
1116    #[cfg(feature = "edge-proxy")]
1117    async fn handle_edge_status(state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1118        let cache_stats = state.edge_cache.read().await.clone().map(|c| c.stats());
1119        let edges = match state.edge_registry.read().await.clone() {
1120            Some(r) => r.list(),
1121            None => Vec::new(),
1122        };
1123        Ok((
1124            200,
1125            serde_json::json!({
1126                "cache":          cache_stats,
1127                "registered":     edges,
1128                "edge_count":     edges.len(),
1129            }),
1130        ))
1131    }
1132
1133    /// `POST /api/edge/register` — edges call this once at boot to
1134    /// announce themselves to the home. Body shape:
1135    /// `{"edge_id":"e1","region":"us-east","base_url":"https://e1"}`.
1136    /// Returns 201 with the assigned slot, 503 when registry full.
1137    #[cfg(feature = "edge-proxy")]
1138    async fn handle_edge_register(
1139        body: Option<&str>,
1140        state: &Arc<AdminState>,
1141    ) -> Result<(u16, serde_json::Value)> {
1142        let raw =
1143            body.ok_or_else(|| ProxyError::Internal("edge register: empty body".to_string()))?;
1144        let req: EdgeRegisterBody = match serde_json::from_str(raw) {
1145            Ok(r) => r,
1146            Err(e) => {
1147                return Ok((
1148                    400,
1149                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
1150                ));
1151            }
1152        };
1153        let registry = match state.edge_registry.read().await.clone() {
1154            Some(r) => r,
1155            None => {
1156                return Ok((
1157                    503,
1158                    serde_json::json!({ "error": "edge registry not attached" }),
1159                ));
1160            }
1161        };
1162        let now = chrono::Utc::now().to_rfc3339();
1163        match registry.register(&req.edge_id, &req.region, &req.base_url, &now) {
1164            Ok(_rx) => {
1165                // Receiver dropped here — in production the SSE
1166                // handler keeps it alive for the connection's
1167                // lifetime. For the JSON endpoint, we acknowledge
1168                // the registration and the edge polls /api/edge for
1169                // invalidations until SSE is wired.
1170                Ok((
1171                    201,
1172                    serde_json::json!({
1173                        "edge_id":  req.edge_id,
1174                        "region":   req.region,
1175                        "base_url": req.base_url,
1176                        "registered_at": now,
1177                    }),
1178                ))
1179            }
1180            Err(e) => Ok((503, serde_json::json!({ "error": e.to_string() }))),
1181        }
1182    }
1183
1184    /// `POST /api/edge/invalidate` — manual invalidation for ops
1185    /// drills. The proxy normally fans out invalidations
1186    /// automatically on writes; this endpoint is for "I just ran
1187    /// a migration outside the proxy, please drop caches".
1188    /// Body: `{"tables":["users"],"up_to_version":null}` — null
1189    /// version means "use the cache's current version" (drop all).
1190    #[cfg(feature = "edge-proxy")]
1191    async fn handle_edge_invalidate(
1192        body: Option<&str>,
1193        state: &Arc<AdminState>,
1194    ) -> Result<(u16, serde_json::Value)> {
1195        let raw =
1196            body.ok_or_else(|| ProxyError::Internal("edge invalidate: empty body".to_string()))?;
1197        let req: EdgeInvalidateBody = match serde_json::from_str(raw) {
1198            Ok(r) => r,
1199            Err(e) => {
1200                return Ok((
1201                    400,
1202                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
1203                ));
1204            }
1205        };
1206        let cache = match state.edge_cache.read().await.clone() {
1207            Some(c) => c,
1208            None => {
1209                return Ok((
1210                    503,
1211                    serde_json::json!({ "error": "edge cache not attached" }),
1212                ));
1213            }
1214        };
1215        let registry = match state.edge_registry.read().await.clone() {
1216            Some(r) => r,
1217            None => {
1218                return Ok((
1219                    503,
1220                    serde_json::json!({ "error": "edge registry not attached" }),
1221                ));
1222            }
1223        };
1224        let version = req.up_to_version.unwrap_or_else(|| cache.next_version());
1225        // Local cache invalidation (home-side cache, if any).
1226        let dropped_local = cache.invalidate(version, &req.tables);
1227        // Fan out to every registered edge.
1228        let ev = InvalidationEvent {
1229            up_to_version: version,
1230            tables: req.tables.clone(),
1231            committed_at: chrono::Utc::now().to_rfc3339(),
1232        };
1233        let (sent, pruned) = registry.broadcast(ev).await;
1234        Ok((
1235            200,
1236            serde_json::json!({
1237                "version":         version,
1238                "tables":          req.tables,
1239                "dropped_local":   dropped_local,
1240                "edges_notified":  sent,
1241                "edges_pruned":    pruned,
1242            }),
1243        ))
1244    }
1245
1246    /// Handle `GET /anomalies`. Returns the anomaly detector's
1247    /// recent-events ring buffer as JSON. Optional `?limit=N`
1248    /// query string clamps the response size (default 100, max 1024).
1249    /// Returns 503 when the detector hasn't been attached.
1250    #[cfg(feature = "anomaly-detection")]
1251    async fn handle_anomalies_list(
1252        path: &str,
1253        state: &Arc<AdminState>,
1254    ) -> Result<(u16, serde_json::Value)> {
1255        let limit = parse_limit_query(path, 100, 1024);
1256        let det = match state.anomaly_detector.read().await.clone() {
1257            Some(d) => d,
1258            None => {
1259                return Ok((
1260                    503,
1261                    serde_json::json!({ "error": "anomaly detector not attached" }),
1262                ));
1263            }
1264        };
1265        let events = det.recent_events(limit);
1266        Ok((
1267            200,
1268            serde_json::json!({
1269                "count":     events.len(),
1270                "limit":     limit,
1271                "events":    events,
1272                "buffer_total": det.event_count(),
1273            }),
1274        ))
1275    }
1276
1277    /// `GET /api/analytics` — top queries by call count plus the slow-query
1278    /// count. Returns 503 when analytics is not attached/enabled.
1279    #[cfg(feature = "query-analytics")]
1280    async fn handle_analytics(
1281        path: &str,
1282        state: &Arc<AdminState>,
1283    ) -> Result<(u16, serde_json::Value)> {
1284        use crate::analytics::OrderBy;
1285        let limit = parse_limit_query(path, 50, 1024);
1286        let a = match state.analytics.read().await.clone() {
1287            Some(a) => a,
1288            None => {
1289                return Ok((503, serde_json::json!({ "error": "analytics not enabled" })));
1290            }
1291        };
1292        let top: Vec<serde_json::Value> = a
1293            .top_queries(OrderBy::Calls, limit)
1294            .into_iter()
1295            .map(|s| {
1296                serde_json::json!({
1297                    "fingerprint": s.fingerprint_hash,
1298                    "normalized":  s.normalized,
1299                    "calls":       s.calls,
1300                    "avg_ms":      s.avg_time.as_secs_f64() * 1000.0,
1301                    "p99_ms":      s.p99.as_secs_f64() * 1000.0,
1302                    "rows":        s.rows,
1303                    "errors":      s.errors,
1304                })
1305            })
1306            .collect();
1307        let slow_count = a.slow_queries(limit).len();
1308        Ok((
1309            200,
1310            serde_json::json!({
1311                "limit":            limit,
1312                "top_queries":      top,
1313                "slow_query_count": slow_count,
1314            }),
1315        ))
1316    }
1317
1318    /// Handle `POST /api/shadow`. Body is a JSON `ShadowRequestBody`.
1319    /// Connects to both source and shadow backends, runs the SQL on
1320    /// each, returns a `ShadowExecuteReport` with the diff.
1321    ///
1322    /// Status codes:
1323    ///   200 — both sides ran (report carries pass/fail details)
1324    ///   400 — malformed body
1325    ///   500 — source connect failure (shadow connect failures end up
1326    ///         in the report rather than the HTTP status)
1327    #[cfg(feature = "ha-tr")]
1328    async fn handle_shadow_request(body: Option<&str>) -> Result<(u16, serde_json::Value)> {
1329        use crate::backend::{
1330            tls::default_client_config, BackendClient, BackendConfig, ParamValue, TlsMode,
1331        };
1332        use crate::shadow_execute::shadow_execute;
1333
1334        let raw =
1335            body.ok_or_else(|| ProxyError::Internal("shadow: empty request body".to_string()))?;
1336        let req: ShadowRequestBody = match serde_json::from_str(raw) {
1337            Ok(r) => r,
1338            Err(e) => {
1339                return Ok((
1340                    400,
1341                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
1342                ));
1343            }
1344        };
1345
1346        // Build the two configs from the request. TLS off + 5s
1347        // connect / 30s query timeouts mirror the replay defaults.
1348        let mk_cfg = |host: String,
1349                      port: u16,
1350                      user: Option<String>,
1351                      password: Option<String>,
1352                      database: Option<String>| BackendConfig {
1353            host,
1354            port,
1355            user: user.unwrap_or_else(|| "postgres".into()),
1356            password,
1357            database,
1358            application_name: Some("heliosdb-proxy-shadow".into()),
1359            tls_mode: TlsMode::Disable,
1360            connect_timeout: std::time::Duration::from_secs(5),
1361            query_timeout: std::time::Duration::from_secs(30),
1362            tls_config: default_client_config(),
1363        };
1364        let source_cfg = mk_cfg(
1365            req.source_host,
1366            req.source_port,
1367            req.source_user,
1368            req.source_password,
1369            req.source_database,
1370        );
1371        let shadow_cfg = mk_cfg(
1372            req.shadow_host,
1373            req.shadow_port,
1374            req.shadow_user,
1375            req.shadow_password,
1376            req.shadow_database,
1377        );
1378
1379        // Connect to source. Connect failure here is a real HTTP
1380        // error since we can't even attempt the diff; shadow connect
1381        // failures land inside the report as `shadow_error`.
1382        let mut source = match BackendClient::connect(&source_cfg).await {
1383            Ok(c) => c,
1384            Err(e) => {
1385                return Ok((
1386                    500,
1387                    serde_json::json!({ "error": format!("source connect: {}", e) }),
1388                ));
1389            }
1390        };
1391
1392        let params: Vec<ParamValue> = req
1393            .params
1394            .unwrap_or_default()
1395            .into_iter()
1396            .map(ParamValue::Text)
1397            .collect();
1398
1399        let outcome = shadow_execute(&mut source, &shadow_cfg, &req.sql, &params).await;
1400        source.close().await;
1401
1402        match outcome {
1403            Ok((_qr, report)) => Ok((
1404                200,
1405                serde_json::json!({
1406                    "sql":                report.sql,
1407                    "both_succeeded":     report.both_succeeded,
1408                    "row_count_match":    report.row_count_match,
1409                    "row_hash_match":     report.row_hash_match,
1410                    "primary_elapsed_us": report.primary_elapsed_us,
1411                    "shadow_elapsed_us":  report.shadow_elapsed_us,
1412                    "primary_error":      report.primary_error,
1413                    "shadow_error":       report.shadow_error,
1414                    "is_clean":           report.is_clean(),
1415                }),
1416            )),
1417            Err(e) => Ok((
1418                500,
1419                serde_json::json!({ "error": format!("shadow execute: {}", e) }),
1420            )),
1421        }
1422    }
1423
1424    /// Handle `POST /api/chaos`. Body is a JSON `ChaosRequestBody`.
1425    ///
1426    /// Supported actions (intentionally narrow — the goal is "test
1427    /// the failover machinery without external chaos tooling", not
1428    /// "ship a kitchen-sink fault injector"):
1429    ///
1430    ///   force_unhealthy { target_node }  — flip the node's health flag
1431    ///                                      to false; the failover
1432    ///                                      controller observes this and
1433    ///                                      reroutes traffic.
1434    ///   restore         { target_node }  — flip the node's health flag
1435    ///                                      back to true and clear the
1436    ///                                      override entry.
1437    ///   reset                            — restore every overridden
1438    ///                                      node in one call.
1439    /// `GET /api/circuit` — live per-node circuit-breaker state. Reports each
1440    /// configured node's breaker as `closed` / `open` / `half_open` so an
1441    /// operator can see which backends the breaker has tripped out of rotation.
1442    #[cfg(feature = "circuit-breaker")]
1443    async fn handle_circuit_status(state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1444        let mgr = match state.circuit_breaker.read().await.clone() {
1445            Some(m) => m,
1446            None => {
1447                return Ok((
1448                    503,
1449                    serde_json::json!({ "error": "circuit breaker not attached" }),
1450                ))
1451            }
1452        };
1453        let nodes = state.config_snapshot.read().await.nodes.clone();
1454        let circuits: Vec<serde_json::Value> = nodes
1455            .iter()
1456            .map(|n| {
1457                let st = mgr.get_breaker(&n.address).get_state();
1458                serde_json::json!({
1459                    "node": n.address,
1460                    "state": format!("{:?}", st).to_lowercase(),
1461                })
1462            })
1463            .collect();
1464        Ok((200, serde_json::json!({ "circuits": circuits })))
1465    }
1466
1467    async fn handle_chaos_request(
1468        body: Option<&str>,
1469        state: &Arc<AdminState>,
1470    ) -> Result<(u16, serde_json::Value)> {
1471        let raw =
1472            body.ok_or_else(|| ProxyError::Internal("chaos: empty request body".to_string()))?;
1473        let action: ChaosAction = match serde_json::from_str(raw) {
1474            Ok(a) => a,
1475            Err(e) => {
1476                return Ok((
1477                    400,
1478                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
1479                ));
1480            }
1481        };
1482        match action {
1483            ChaosAction::ForceUnhealthy { target_node } => {
1484                if let Err(e) = Self::set_node_enabled(state, &target_node, false).await {
1485                    return Ok((404, serde_json::json!({ "error": e.to_string() })));
1486                }
1487                state.chaos_overrides.write().await.insert(
1488                    target_node.clone(),
1489                    ChaosOverride {
1490                        since: chrono::Utc::now().to_rfc3339(),
1491                        kind: "force_unhealthy".to_string(),
1492                        note: "forced unhealthy via chaos endpoint".to_string(),
1493                    },
1494                );
1495                Ok((
1496                    200,
1497                    serde_json::json!({
1498                        "applied":     "force_unhealthy",
1499                        "target_node": target_node,
1500                    }),
1501                ))
1502            }
1503            ChaosAction::Restore { target_node } => {
1504                if let Err(e) = Self::set_node_enabled(state, &target_node, true).await {
1505                    return Ok((404, serde_json::json!({ "error": e.to_string() })));
1506                }
1507                state.chaos_overrides.write().await.remove(&target_node);
1508                Ok((
1509                    200,
1510                    serde_json::json!({
1511                        "restored":    target_node,
1512                    }),
1513                ))
1514            }
1515            ChaosAction::Reset => {
1516                let overrides: Vec<String> =
1517                    state.chaos_overrides.read().await.keys().cloned().collect();
1518                let mut restored = Vec::with_capacity(overrides.len());
1519                for addr in overrides {
1520                    let _ = Self::set_node_enabled(state, &addr, true).await;
1521                    restored.push(addr);
1522                }
1523                state.chaos_overrides.write().await.clear();
1524                Ok((
1525                    200,
1526                    serde_json::json!({
1527                        "reset":      true,
1528                        "restored":   restored,
1529                    }),
1530                ))
1531            }
1532        }
1533    }
1534
1535    /// Handle `GET /plugins`. Returns 503 when no plugin manager is
1536    /// attached, 200 with `Vec<PluginListEntry>` otherwise. Building
1537    /// the response in admin.rs (rather than serialising
1538    /// `plugins::PluginInfo` directly) keeps the plugins module
1539    /// independent of serde — only the wire shape lives here.
1540    #[cfg(feature = "wasm-plugins")]
1541    async fn handle_plugins_list(state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1542        let pm = match state.plugin_manager.read().await.clone() {
1543            Some(p) => p,
1544            None => {
1545                return Ok((
1546                    503,
1547                    serde_json::json!({ "error": "plugin manager not attached" }),
1548                ));
1549            }
1550        };
1551        let plugins: Vec<PluginListEntry> = pm
1552            .list_plugins()
1553            .into_iter()
1554            .map(|info| PluginListEntry {
1555                name: info.name,
1556                version: info.version,
1557                description: info.description,
1558                hooks: info
1559                    .hooks
1560                    .iter()
1561                    .map(|h| h.export_name().to_string())
1562                    .collect(),
1563                state: format!("{:?}", info.state),
1564                invocations: info.stats.total_calls,
1565                errors: info.stats.error_count,
1566            })
1567            .collect();
1568        Ok((200, serde_json::to_value(plugins)?))
1569    }
1570
1571    #[cfg(not(feature = "wasm-plugins"))]
1572    async fn handle_plugins_list(_state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1573        Ok((
1574            503,
1575            serde_json::json!({ "error": "wasm-plugins feature not compiled in" }),
1576        ))
1577    }
1578
1579    /// Compute the joined topology view used by `/topology`.
1580    ///
1581    /// `currentPrimary` is the address of the first node whose role
1582    /// is "primary" and whose health entry is `healthy = true`. None
1583    /// is the legitimate answer when failover is in progress.
1584    async fn compute_topology(state: &Arc<AdminState>) -> TopologyResponse {
1585        let health = state.node_health.read().await;
1586        let cfg = state.config_snapshot.read().await;
1587
1588        let mut current_primary: Option<String> = None;
1589        for n in &cfg.nodes {
1590            if n.role.eq_ignore_ascii_case("primary") {
1591                let healthy = health.get(&n.address).map(|h| h.healthy).unwrap_or(false);
1592                if healthy {
1593                    current_primary = Some(n.address.clone());
1594                    break;
1595                }
1596            }
1597        }
1598
1599        let healthy_nodes = health.values().filter(|h| h.healthy).count() as u32;
1600        let unhealthy_nodes = health.values().filter(|h| !h.healthy).count() as u32;
1601        let total_nodes = cfg.nodes.len() as u32;
1602
1603        TopologyResponse {
1604            current_primary,
1605            healthy_nodes,
1606            unhealthy_nodes,
1607            total_nodes,
1608            last_failover_at: None,
1609        }
1610    }
1611
1612    /// Format metrics as Prometheus text format
1613    fn format_prometheus_metrics(metrics: &ServerMetricsSnapshot) -> String {
1614        let mut output = String::new();
1615
1616        output.push_str("# HELP heliosdb_proxy_connections_total Total connections accepted\n");
1617        output.push_str("# TYPE heliosdb_proxy_connections_total counter\n");
1618        output.push_str(&format!(
1619            "heliosdb_proxy_connections_total {}\n",
1620            metrics.connections_accepted
1621        ));
1622
1623        output.push_str("# HELP heliosdb_proxy_connections_closed Total connections closed\n");
1624        output.push_str("# TYPE heliosdb_proxy_connections_closed counter\n");
1625        output.push_str(&format!(
1626            "heliosdb_proxy_connections_closed {}\n",
1627            metrics.connections_closed
1628        ));
1629
1630        output.push_str("# HELP heliosdb_proxy_queries_total Total queries processed\n");
1631        output.push_str("# TYPE heliosdb_proxy_queries_total counter\n");
1632        output.push_str(&format!(
1633            "heliosdb_proxy_queries_total {}\n",
1634            metrics.queries_processed
1635        ));
1636
1637        output.push_str("# HELP heliosdb_proxy_bytes_received_total Total bytes received\n");
1638        output.push_str("# TYPE heliosdb_proxy_bytes_received_total counter\n");
1639        output.push_str(&format!(
1640            "heliosdb_proxy_bytes_received_total {}\n",
1641            metrics.bytes_received
1642        ));
1643
1644        output.push_str("# HELP heliosdb_proxy_bytes_sent_total Total bytes sent\n");
1645        output.push_str("# TYPE heliosdb_proxy_bytes_sent_total counter\n");
1646        output.push_str(&format!(
1647            "heliosdb_proxy_bytes_sent_total {}\n",
1648            metrics.bytes_sent
1649        ));
1650
1651        output.push_str("# HELP heliosdb_proxy_failovers_total Total failovers\n");
1652        output.push_str("# TYPE heliosdb_proxy_failovers_total counter\n");
1653        output.push_str(&format!(
1654            "heliosdb_proxy_failovers_total {}\n",
1655            metrics.failovers
1656        ));
1657
1658        output
1659    }
1660
1661    /// Send HTTP response
1662    async fn send_response(
1663        writer: &mut tokio::net::tcp::WriteHalf<'_>,
1664        status: u16,
1665        status_text: &str,
1666        body: &str,
1667    ) -> Result<()> {
1668        let response = format!(
1669            "HTTP/1.1 {} {}\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1670            status,
1671            status_text,
1672            body.len(),
1673            body
1674        );
1675
1676        writer
1677            .write_all(response.as_bytes())
1678            .await
1679            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1680
1681        Ok(())
1682    }
1683
1684    /// Send JSON HTTP response
1685    async fn send_json_response<T: Serialize>(
1686        writer: &mut tokio::net::tcp::WriteHalf<'_>,
1687        status: u16,
1688        body: &T,
1689    ) -> Result<()> {
1690        let json = serde_json::to_string(body)
1691            .map_err(|e| ProxyError::Internal(format!("JSON error: {}", e)))?;
1692
1693        let status_text = match status {
1694            200 => "OK",
1695            400 => "Bad Request",
1696            404 => "Not Found",
1697            500 => "Internal Server Error",
1698            503 => "Service Unavailable",
1699            _ => "Unknown",
1700        };
1701
1702        let response = format!(
1703            "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1704            status,
1705            status_text,
1706            json.len(),
1707            json
1708        );
1709
1710        writer
1711            .write_all(response.as_bytes())
1712            .await
1713            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1714
1715        Ok(())
1716    }
1717
1718    /// Shutdown the admin server
1719    pub fn shutdown(&self) {
1720        let _ = self.shutdown_tx.send(());
1721    }
1722}
1723
1724impl AdminState {
1725    /// Create new admin state
1726    pub fn new() -> Self {
1727        Self {
1728            node_health: RwLock::new(HashMap::new()),
1729            metrics: RwLock::new(ServerMetricsSnapshot {
1730                connections_accepted: 0,
1731                connections_closed: 0,
1732                queries_processed: 0,
1733                bytes_received: 0,
1734                bytes_sent: 0,
1735                failovers: 0,
1736            }),
1737            active_sessions: RwLock::new(0),
1738            config_snapshot: RwLock::new(ConfigSnapshot {
1739                listen_address: String::new(),
1740                admin_address: String::new(),
1741                tr_enabled: false,
1742                tr_mode: String::new(),
1743                pool_min_connections: 0,
1744                pool_max_connections: 0,
1745                nodes: Vec::new(),
1746            }),
1747            proxy_config: RwLock::new(None),
1748            read_lb_counter: AtomicUsize::new(0),
1749            commands: RwLock::new(HashMap::new()),
1750            #[cfg(feature = "pool-modes")]
1751            pool_manager: RwLock::new(None),
1752            #[cfg(feature = "circuit-breaker")]
1753            circuit_breaker: RwLock::new(None),
1754            #[cfg(feature = "ha-tr")]
1755            replay_engine: RwLock::new(None),
1756            #[cfg(feature = "wasm-plugins")]
1757            plugin_manager: RwLock::new(None),
1758            chaos_overrides: RwLock::new(HashMap::new()),
1759            #[cfg(feature = "anomaly-detection")]
1760            anomaly_detector: RwLock::new(None),
1761            #[cfg(feature = "query-analytics")]
1762            analytics: RwLock::new(None),
1763            #[cfg(feature = "edge-proxy")]
1764            edge_cache: RwLock::new(None),
1765            #[cfg(feature = "edge-proxy")]
1766            edge_registry: RwLock::new(None),
1767            auth_token: RwLock::new(None),
1768            migration: RwLock::new(None),
1769            branch: RwLock::new(None),
1770        }
1771    }
1772
1773    /// Set the admin Bearer token (wired by the server at startup).
1774    pub async fn with_auth_token(&self, token: Option<String>) {
1775        *self.auth_token.write().await = token;
1776    }
1777
1778    /// Attach traffic-mirror info so `/api/migration/status` can report it.
1779    pub async fn with_migration(&self, info: MigrationInfo) {
1780        *self.migration.write().await = Some(info);
1781    }
1782
1783    /// Attach branch-database config so `/api/branch` can provision.
1784    pub async fn with_branch(&self, cfg: crate::config::BranchConfig) {
1785        *self.branch.write().await = Some(cfg);
1786    }
1787
1788    /// Attach the connection pool manager so `/api/pools` reports real
1789    /// per-node pool statistics. Wired by the server at startup.
1790    #[cfg(feature = "pool-modes")]
1791    pub async fn with_pool_manager(&self, manager: Arc<crate::pool::ConnectionPoolManager>) {
1792        *self.pool_manager.write().await = Some(manager);
1793    }
1794
1795    /// Attach the circuit-breaker manager so `/api/circuit` reports live
1796    /// per-node circuit state. Wired by the server at startup.
1797    #[cfg(feature = "circuit-breaker")]
1798    pub async fn with_circuit_breaker(
1799        &self,
1800        manager: Arc<crate::circuit_breaker::CircuitBreakerManager>,
1801    ) {
1802        *self.circuit_breaker.write().await = Some(manager);
1803    }
1804
1805    /// Attach an anomaly detector. Mirror of with_replay_engine /
1806    /// with_plugin_manager — wired by the server at startup.
1807    #[cfg(feature = "anomaly-detection")]
1808    pub async fn with_anomaly_detector(&self, detector: Arc<AnomalyDetector>) {
1809        *self.anomaly_detector.write().await = Some(detector);
1810    }
1811
1812    /// Attach the query-analytics engine so `/api/analytics` can read it.
1813    #[cfg(feature = "query-analytics")]
1814    pub async fn with_analytics(&self, analytics: Arc<crate::analytics::QueryAnalytics>) {
1815        *self.analytics.write().await = Some(analytics);
1816    }
1817
1818    /// Attach edge cache + registry. Server calls this once at
1819    /// startup; both Arcs are the same instances ServerState holds.
1820    #[cfg(feature = "edge-proxy")]
1821    pub async fn with_edge(&self, cache: Arc<EdgeCache>, registry: Arc<EdgeRegistry>) {
1822        *self.edge_cache.write().await = Some(cache);
1823        *self.edge_registry.write().await = Some(registry);
1824    }
1825
1826    /// Attach a time-travel replay engine. Production startup calls
1827    /// this once with a `ReplayEngine` constructed from the proxy's
1828    /// shared `TransactionJournal` + a `BackendConfig` template; the
1829    /// `/api/replay` endpoint returns 503 until this is set.
1830    #[cfg(feature = "ha-tr")]
1831    pub async fn with_replay_engine(&self, engine: Arc<ReplayEngine>) {
1832        *self.replay_engine.write().await = Some(engine);
1833    }
1834
1835    /// Attach a WASM plugin manager. Production startup calls this
1836    /// once with the same Arc held by ProxyServer; the `/plugins`
1837    /// endpoint returns 503 until set.
1838    #[cfg(feature = "wasm-plugins")]
1839    pub async fn with_plugin_manager(&self, manager: Arc<PluginManager>) {
1840        *self.plugin_manager.write().await = Some(manager);
1841    }
1842
1843    /// Set the proxy configuration for SQL routing
1844    pub async fn set_proxy_config(&self, config: ProxyConfig) {
1845        let mut proxy_config = self.proxy_config.write().await;
1846        *proxy_config = Some(config);
1847    }
1848
1849    /// Register a command handler
1850    pub async fn register_command<F>(&self, name: &str, handler: F)
1851    where
1852        F: Fn(&[&str]) -> Result<String> + Send + Sync + 'static,
1853    {
1854        let mut commands = self.commands.write().await;
1855        commands.insert(name.to_string(), Arc::new(handler));
1856    }
1857
1858    /// Execute a command
1859    pub async fn execute_command(&self, name: &str, args: &[&str]) -> Result<String> {
1860        let commands = self.commands.read().await;
1861        match commands.get(name) {
1862            Some(handler) => handler(args),
1863            None => Err(ProxyError::Internal(format!("Unknown command: {}", name))),
1864        }
1865    }
1866}
1867
1868impl Default for AdminState {
1869    fn default() -> Self {
1870        Self::new()
1871    }
1872}
1873
1874// Request and Response types
1875
/// SQL execution request.
///
/// Deserialized input with a single required field, `query`.
/// NOTE(review): presumably the body of the HTTP SQL endpoint — the
/// handler is outside this section, confirm there.
#[derive(Debug, Deserialize)]
struct SqlRequest {
    /// SQL query to execute
    query: String,
}
1882
/// SQL execution response.
///
/// Serialized output describing how a query was classified and routed,
/// together with the backend's result as an untyped JSON value.
#[derive(Debug, Serialize)]
struct SqlResponse {
    /// Query type (read/write)
    query_type: String,
    /// Node the query was routed to
    routed_to: String,
    /// Role of the target node
    node_role: String,
    /// Query result from backend
    result: serde_json::Value,
}
1895
/// Serializable body carrying a single static status string.
/// NOTE(review): presumably returned by the health endpoint — the
/// handler is outside this section.
#[derive(Serialize)]
struct HealthResponse {
    /// Status label; always a `'static` string chosen by the handler.
    status: &'static str,
}
1900
/// Serializable readiness report: a flag plus a static explanatory
/// message.
/// NOTE(review): presumably filled from `check_readiness` — the
/// handler is outside this section.
#[derive(Serialize)]
struct ReadinessResponse {
    /// Whether the proxy reports itself as ready.
    ready: bool,
    /// Static human-readable message accompanying the flag.
    message: &'static str,
}
1906
/// Serializable liveness report consisting of a single flag.
#[derive(Serialize)]
struct LivenessResponse {
    /// Whether the proxy reports itself as alive.
    alive: bool,
}
1911
/// Serializable error body of the shape `{"error": "..."}`.
#[derive(Serialize)]
struct ErrorResponse {
    /// Human-readable error message.
    error: String,
}
1916
/// Serializable view of the server metrics.
///
/// Carries the six counters of `ServerMetricsSnapshot` plus
/// `connections_active`, which the `From<ServerMetricsSnapshot>`
/// conversion derives rather than copies.
#[derive(Serialize)]
struct MetricsResponse {
    connections_accepted: u64,
    connections_closed: u64,
    /// Accepted minus closed connections, saturating at zero.
    connections_active: u64,
    queries_processed: u64,
    bytes_received: u64,
    bytes_sent: u64,
    failovers: u64,
}
1927
1928impl From<ServerMetricsSnapshot> for MetricsResponse {
1929    fn from(m: ServerMetricsSnapshot) -> Self {
1930        Self {
1931            connections_accepted: m.connections_accepted,
1932            connections_closed: m.connections_closed,
1933            connections_active: m.connections_accepted.saturating_sub(m.connections_closed),
1934            queries_processed: m.queries_processed,
1935            bytes_received: m.bytes_received,
1936            bytes_sent: m.bytes_sent,
1937            failovers: m.failovers,
1938        }
1939    }
1940}
1941
/// Serializable view of one node's health record.
///
/// Built from `NodeHealth` by the `From` conversion, which copies every
/// field and renders `last_check` as text.
#[derive(Serialize)]
struct NodeHealthResponse {
    address: String,
    healthy: bool,
    /// RFC 3339 timestamp (produced with `to_rfc3339()`).
    last_check: String,
    failure_count: u32,
    last_error: Option<String>,
    latency_ms: f64,
    replication_lag_bytes: Option<u64>,
}
1952
1953impl From<NodeHealth> for NodeHealthResponse {
1954    fn from(h: NodeHealth) -> Self {
1955        Self {
1956            address: h.address,
1957            healthy: h.healthy,
1958            last_check: h.last_check.to_rfc3339(),
1959            failure_count: h.failure_count,
1960            last_error: h.last_error,
1961            latency_ms: h.latency_ms,
1962            replication_lag_bytes: h.replication_lag_bytes,
1963        }
1964    }
1965}
1966
/// Serializable body reporting the number of active sessions.
#[derive(Serialize)]
struct SessionsResponse {
    /// Current active-session count.
    active_sessions: u64,
}
1971
/// JSON body for `POST /api/edge/register`.
///
/// All three fields are required: none carries `#[serde(default)]`, so
/// a body missing any of them fails to deserialize.
#[cfg(feature = "edge-proxy")]
#[derive(Debug, Deserialize)]
struct EdgeRegisterBody {
    edge_id: String,
    region: String,
    base_url: String,
}
1980
/// JSON body for `POST /api/edge/invalidate`. `up_to_version` is
/// optional — when None, the cache mints the next version stamp
/// (effectively "drop everything matching `tables`").
///
/// Both fields may be omitted from the body: `tables` then defaults to
/// an empty list and `up_to_version` to `None`.
#[cfg(feature = "edge-proxy")]
#[derive(Debug, Deserialize)]
struct EdgeInvalidateBody {
    /// Table names to invalidate; empty when omitted.
    #[serde(default)]
    tables: Vec<String>,
    /// Optional version bound; `None` when omitted.
    #[serde(default)]
    up_to_version: Option<u64>,
}
1992
1993/// Parse `?limit=N` from a path. Returns clamped value, or `default`
1994/// when the param is missing / unparseable.
1995///
1996/// Shared by the `/anomalies` (`anomaly-detection`) and `/api/analytics`
1997/// (`query-analytics`) handlers, so it must compile whenever *either* feature
1998/// is enabled — not just `anomaly-detection`.
1999#[cfg(any(feature = "anomaly-detection", feature = "query-analytics"))]
fn parse_limit_query(path: &str, default: usize, max: usize) -> usize {
    // Everything after the first '?' is the query string; a path without
    // one carries no parameters at all.
    let query = match path.split_once('?') {
        Some((_, rest)) => rest,
        None => return default,
    };

    // The first `limit=<n>` pair whose value parses as `usize` wins.
    // Pairs without '=' and unparseable values are skipped, so a later
    // valid `limit` can still be picked up.
    query
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .filter(|(key, _)| *key == "limit")
        .find_map(|(_, raw)| raw.parse::<usize>().ok())
        .map_or(default, |requested| requested.min(max))
}
2017
/// JSON body for `POST /api/shadow`.
///
/// `sql` and the host/port of both backends are required; `params` and
/// the per-side user / password / database are optional and default to
/// `None` when omitted.
#[cfg(feature = "ha-tr")]
#[derive(Debug, Deserialize)]
struct ShadowRequestBody {
    /// SQL to execute on both sides.
    sql: String,
    /// Optional text-format parameters interpolated into `sql`. None
    /// or empty list runs as a simple_query.
    #[serde(default)]
    params: Option<Vec<String>>,

    /// Source backend (the side whose result the application would
    /// see in production).
    source_host: String,
    /// Source backend port.
    source_port: u16,
    /// Optional source user; `None` when omitted.
    #[serde(default)]
    source_user: Option<String>,
    /// Optional source password; `None` when omitted.
    #[serde(default)]
    source_password: Option<String>,
    /// Optional source database; `None` when omitted.
    #[serde(default)]
    source_database: Option<String>,

    /// Shadow backend (the side being validated — typically a
    /// new-version replica or post-migration schema).
    shadow_host: String,
    /// Shadow backend port.
    shadow_port: u16,
    /// Optional shadow user; `None` when omitted.
    #[serde(default)]
    shadow_user: Option<String>,
    /// Optional shadow password; `None` when omitted.
    #[serde(default)]
    shadow_password: Option<String>,
    /// Optional shadow database; `None` when omitted.
    #[serde(default)]
    shadow_database: Option<String>,
}
2051
/// Chaos actions the proxy supports today. The set is closed: a body
/// whose `action` is not one of the variants below fails to
/// deserialise, so adding an action means adding a variant here.
///
/// The enum is internally tagged on `action`, with variant names in
/// snake_case. Wire shapes:
/// * `{"action":"force_unhealthy","target_node":"..."}`
/// * `{"action":"restore","target_node":"..."}`
/// * `{"action":"reset"}`
#[derive(Debug, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum ChaosAction {
    /// Mark a node unhealthy until restored (or until reset is
    /// called). Triggers the failover path the same way a real
    /// health-check failure would.
    ForceUnhealthy { target_node: String },
    /// Mark a previously-overridden node healthy again.
    Restore { target_node: String },
    /// Reset every chaos override in one call. Idempotent.
    Reset,
}
2068
/// JSON entry returned by `GET /plugins`. Built in admin.rs so the
/// plugins module doesn't need a serde dep.
#[cfg(feature = "wasm-plugins")]
#[derive(Serialize)]
struct PluginListEntry {
    /// Plugin name as reported by the plugin manager.
    name: String,
    /// Plugin version as reported by the plugin manager.
    version: String,
    /// Plugin description as reported by the plugin manager.
    description: String,
    /// Hook export names (`pre_query`, `post_query`, `route`, ...).
    hooks: Vec<String>,
    /// `Loading` | `Running` | `Paused` | `Error(...)` | `Unloading`.
    /// This is the `Debug` rendering of the plugin state.
    state: String,
    /// Filled from the plugin's `stats.total_calls`.
    invocations: u64,
    /// Filled from the plugin's `stats.error_count`.
    errors: u64,
}
2084
/// JSON body for `POST /api/replay`.
///
/// The window bounds and the target host/port are required; the three
/// credential overrides may be omitted and then deserialize as `None`.
#[cfg(feature = "ha-tr")]
#[derive(Debug, Deserialize)]
struct ReplayRequestBody {
    /// RFC 3339 inclusive window start.
    from: DateTime<Utc>,
    /// RFC 3339 inclusive window end.
    to: DateTime<Utc>,
    /// Target backend host (typically a staging DB).
    target_host: String,
    /// Target backend port.
    target_port: u16,
    /// Optional credential overrides — when omitted, the engine uses
    /// the template values set at server startup. Production callers
    /// targeting a separate staging DB pass these explicitly so the
    /// proxy doesn't need to hold staging credentials in its own
    /// config.
    #[serde(default)]
    target_user: Option<String>,
    /// Optional password override; see `target_user`.
    #[serde(default)]
    target_password: Option<String>,
    /// Optional database override; see `target_user`.
    #[serde(default)]
    target_database: Option<String>,
}
2109
2110/// Joined view exposed at `/topology`. Field names use camelCase so
2111/// they map cleanly into the Kubernetes operator's CRD status
2112/// (`HeliosProxyStatus.currentPrimary`, etc).
2113#[derive(Serialize)]
2114struct TopologyResponse {
2115    #[serde(rename = "currentPrimary")]
2116    current_primary: Option<String>,
2117    #[serde(rename = "healthyNodes")]
2118    healthy_nodes: u32,
2119    #[serde(rename = "unhealthyNodes")]
2120    unhealthy_nodes: u32,
2121    #[serde(rename = "totalNodes")]
2122    total_nodes: u32,
2123    /// RFC 3339 timestamp of the last detected primary change.
2124    /// `None` when the proxy hasn't observed a failover since boot.
2125    #[serde(rename = "lastFailoverAt")]
2126    last_failover_at: Option<String>,
2127}
2128
/// Serializable per-node connection-pool statistics.
/// NOTE(review): presumably the element type of the `/api/pools`
/// response — the handler is outside this section.
#[derive(Serialize)]
struct PoolStatsResponse {
    /// Node these statistics belong to.
    node: String,
    active_connections: u64,
    idle_connections: u64,
    pending_requests: u64,
    total_connections_created: u64,
    total_connections_closed: u64,
}
2138
/// Serializable version report: a version string and a build-time
/// string.
/// NOTE(review): where these values come from is not visible in this
/// section.
#[derive(Serialize)]
struct VersionResponse {
    version: String,
    build_time: String,
}
2144
2145#[cfg(test)]
2146mod tests {
2147    use super::*;
2148
2149    #[tokio::test]
2150    async fn test_admin_state_creation() {
2151        let state = AdminState::new();
2152        let sessions = state.active_sessions.read().await;
2153        assert_eq!(*sessions, 0);
2154    }
2155
2156    #[test]
2157    fn test_admin_authorized() {
2158        let h = |s: &str| vec!["GET /topology HTTP/1.1".to_string(), s.to_string()];
2159        assert!(AdminServer::admin_authorized(
2160            &h("Authorization: Bearer s3cret"),
2161            "s3cret"
2162        ));
2163        // case-insensitive header name, exact token
2164        assert!(AdminServer::admin_authorized(
2165            &h("authorization: Bearer s3cret"),
2166            "s3cret"
2167        ));
2168        // wrong token / wrong scheme / missing header all rejected
2169        assert!(!AdminServer::admin_authorized(
2170            &h("Authorization: Bearer nope"),
2171            "s3cret"
2172        ));
2173        assert!(!AdminServer::admin_authorized(
2174            &h("Authorization: Basic s3cret"),
2175            "s3cret"
2176        ));
2177        assert!(!AdminServer::admin_authorized(
2178            &["GET / HTTP/1.1".to_string()],
2179            "s3cret"
2180        ));
2181    }
2182
2183    #[test]
2184    fn test_constant_time_eq_str() {
2185        assert!(constant_time_eq_str("abc", "abc"));
2186        assert!(!constant_time_eq_str("abc", "abd"));
2187        assert!(!constant_time_eq_str("abc", "abcd"));
2188    }
2189
2190    #[tokio::test]
2191    async fn test_readiness_check_no_nodes() {
2192        let state = Arc::new(AdminState::new());
2193        let ready = AdminServer::check_readiness(&state).await;
2194        assert!(!ready);
2195    }
2196
2197    #[tokio::test]
2198    async fn test_readiness_check_with_healthy_node() {
2199        let state = Arc::new(AdminState::new());
2200
2201        {
2202            let mut health = state.node_health.write().await;
2203            health.insert(
2204                "localhost:5432".to_string(),
2205                NodeHealth {
2206                    address: "localhost:5432".to_string(),
2207                    healthy: true,
2208                    last_check: chrono::Utc::now(),
2209                    failure_count: 0,
2210                    last_error: None,
2211                    latency_ms: 1.0,
2212                    replication_lag_bytes: None,
2213                },
2214            );
2215        }
2216
2217        let ready = AdminServer::check_readiness(&state).await;
2218        assert!(ready);
2219    }
2220
2221    #[tokio::test]
2222    async fn test_command_registration() {
2223        let state = AdminState::new();
2224
2225        state
2226            .register_command("test", |args| {
2227                Ok(format!("Test command with {} args", args.len()))
2228            })
2229            .await;
2230
2231        let result = state.execute_command("test", &["arg1", "arg2"]).await;
2232        assert!(result.is_ok());
2233        assert_eq!(result.unwrap(), "Test command with 2 args");
2234    }
2235
2236    #[tokio::test]
2237    async fn test_unknown_command() {
2238        let state = AdminState::new();
2239        let result = state.execute_command("unknown", &[]).await;
2240        assert!(result.is_err());
2241    }
2242
2243    #[test]
2244    fn test_prometheus_metrics_format() {
2245        let metrics = ServerMetricsSnapshot {
2246            connections_accepted: 100,
2247            connections_closed: 50,
2248            queries_processed: 1000,
2249            bytes_received: 50000,
2250            bytes_sent: 100000,
2251            failovers: 2,
2252        };
2253
2254        let output = AdminServer::format_prometheus_metrics(&metrics);
2255        assert!(output.contains("heliosdb_proxy_connections_total 100"));
2256        assert!(output.contains("heliosdb_proxy_queries_total 1000"));
2257        assert!(output.contains("heliosdb_proxy_failovers_total 2"));
2258    }
2259
2260    #[test]
2261    fn test_metrics_response_active_connections() {
2262        let snapshot = ServerMetricsSnapshot {
2263            connections_accepted: 100,
2264            connections_closed: 30,
2265            queries_processed: 500,
2266            bytes_received: 10000,
2267            bytes_sent: 20000,
2268            failovers: 1,
2269        };
2270
2271        let response = MetricsResponse::from(snapshot);
2272        assert_eq!(response.connections_active, 70);
2273    }
2274
2275    /// Helper: build an AdminState with the given (address, role,
2276    /// healthy) tuples seeded into config + node_health.
2277    async fn topology_state(nodes: &[(&str, &str, bool)]) -> Arc<AdminState> {
2278        let state = Arc::new(AdminState::new());
2279        {
2280            let mut cfg = state.config_snapshot.write().await;
2281            cfg.nodes = nodes
2282                .iter()
2283                .map(|(addr, role, _)| NodeSnapshot {
2284                    address: (*addr).to_string(),
2285                    role: (*role).to_string(),
2286                    weight: 100,
2287                    enabled: true,
2288                })
2289                .collect();
2290        }
2291        {
2292            let mut health = state.node_health.write().await;
2293            for (addr, _, healthy) in nodes {
2294                health.insert(
2295                    (*addr).to_string(),
2296                    NodeHealth {
2297                        address: (*addr).to_string(),
2298                        healthy: *healthy,
2299                        last_check: chrono::Utc::now(),
2300                        failure_count: 0,
2301                        last_error: None,
2302                        latency_ms: 1.0,
2303                        replication_lag_bytes: None,
2304                    },
2305                );
2306            }
2307        }
2308        state
2309    }
2310
2311    #[tokio::test]
2312    async fn test_topology_returns_healthy_primary() {
2313        let state = topology_state(&[
2314            ("primary.svc:5432", "primary", true),
2315            ("standby-a.svc:5432", "standby", true),
2316            ("standby-b.svc:5432", "standby", false),
2317        ])
2318        .await;
2319
2320        let topo = AdminServer::compute_topology(&state).await;
2321        assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
2322        assert_eq!(topo.healthy_nodes, 2);
2323        assert_eq!(topo.unhealthy_nodes, 1);
2324        assert_eq!(topo.total_nodes, 3);
2325    }
2326
2327    #[tokio::test]
2328    async fn test_topology_no_primary_when_primary_unhealthy() {
2329        // Failover in progress: the configured primary is sick and
2330        // no other node has been promoted yet.
2331        let state = topology_state(&[
2332            ("primary.svc:5432", "primary", false),
2333            ("standby.svc:5432", "standby", true),
2334        ])
2335        .await;
2336
2337        let topo = AdminServer::compute_topology(&state).await;
2338        assert_eq!(topo.current_primary, None);
2339        assert_eq!(topo.healthy_nodes, 1);
2340        assert_eq!(topo.unhealthy_nodes, 1);
2341    }
2342
2343    #[tokio::test]
2344    async fn test_topology_handles_empty_cluster() {
2345        let state = Arc::new(AdminState::new());
2346        let topo = AdminServer::compute_topology(&state).await;
2347        assert_eq!(topo.current_primary, None);
2348        assert_eq!(topo.healthy_nodes, 0);
2349        assert_eq!(topo.unhealthy_nodes, 0);
2350        assert_eq!(topo.total_nodes, 0);
2351    }
2352
2353    #[tokio::test]
2354    async fn test_topology_role_match_is_case_insensitive() {
2355        let state = topology_state(&[("primary.svc:5432", "PRIMARY", true)]).await;
2356        let topo = AdminServer::compute_topology(&state).await;
2357        assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
2358    }
2359
2360    #[cfg(feature = "ha-tr")]
2361    #[tokio::test]
2362    async fn test_replay_returns_503_when_engine_unattached() {
2363        let state = Arc::new(AdminState::new());
2364        let body = r#"{
2365            "from": "2026-04-25T10:00:00Z",
2366            "to":   "2026-04-25T11:00:00Z",
2367            "target_host": "127.0.0.1",
2368            "target_port": 5432
2369        }"#;
2370        let (status, value) = AdminServer::handle_replay_request(Some(body), &state)
2371            .await
2372            .expect("handler returns Ok with status code");
2373        assert_eq!(status, 503);
2374        assert_eq!(value["error"], "replay engine not attached");
2375    }
2376
2377    #[cfg(feature = "ha-tr")]
2378    #[tokio::test]
2379    async fn test_replay_400_on_malformed_body() {
2380        let state = Arc::new(AdminState::new());
2381        let (status, _) = AdminServer::handle_replay_request(Some("not json"), &state)
2382            .await
2383            .expect("handler returns Ok with status code");
2384        assert_eq!(status, 400);
2385    }
2386
2387    #[cfg(feature = "ha-tr")]
2388    #[tokio::test]
2389    async fn test_replay_errors_on_empty_body() {
2390        let state = Arc::new(AdminState::new());
2391        let err = AdminServer::handle_replay_request(None, &state).await;
2392        assert!(err.is_err(), "empty body must surface as Err");
2393    }
2394
2395    #[cfg(feature = "wasm-plugins")]
2396    #[tokio::test]
2397    async fn test_plugins_list_returns_503_when_manager_unattached() {
2398        let state = Arc::new(AdminState::new());
2399        let (status, value) = AdminServer::handle_plugins_list(&state)
2400            .await
2401            .expect("handler returns Ok with status code");
2402        assert_eq!(status, 503);
2403        assert_eq!(value["error"], "plugin manager not attached");
2404    }
2405
2406    #[cfg(not(feature = "wasm-plugins"))]
2407    #[tokio::test]
2408    async fn test_plugins_list_503_without_feature() {
2409        let state = Arc::new(AdminState::new());
2410        let (status, _) = AdminServer::handle_plugins_list(&state)
2411            .await
2412            .expect("handler returns Ok");
2413        assert_eq!(status, 503);
2414    }
2415
2416    /// Helper: state with a single healthy node seeded into health.
2417    async fn chaos_state_with_node(addr: &str) -> Arc<AdminState> {
2418        let state = Arc::new(AdminState::new());
2419        state.node_health.write().await.insert(
2420            addr.to_string(),
2421            NodeHealth {
2422                address: addr.to_string(),
2423                healthy: true,
2424                last_check: chrono::Utc::now(),
2425                failure_count: 0,
2426                last_error: None,
2427                latency_ms: 1.0,
2428                replication_lag_bytes: None,
2429            },
2430        );
2431        state
2432    }
2433
2434    #[tokio::test]
2435    async fn test_chaos_force_unhealthy_flips_node_and_records_override() {
2436        let state = chaos_state_with_node("primary.svc:5432").await;
2437        let body = r#"{"action":"force_unhealthy","target_node":"primary.svc:5432"}"#;
2438        let (status, value) = AdminServer::handle_chaos_request(Some(body), &state)
2439            .await
2440            .expect("handler returns Ok");
2441        assert_eq!(status, 200);
2442        assert_eq!(value["applied"], "force_unhealthy");
2443        // Health flag flipped.
2444        assert!(!state.node_health.read().await["primary.svc:5432"].healthy);
2445        // Override recorded.
2446        assert!(state
2447            .chaos_overrides
2448            .read()
2449            .await
2450            .contains_key("primary.svc:5432"));
2451    }
2452
2453    #[tokio::test]
2454    async fn test_chaos_restore_clears_override_and_flips_back() {
2455        let state = chaos_state_with_node("primary.svc:5432").await;
2456        let _ = AdminServer::handle_chaos_request(
2457            Some(r#"{"action":"force_unhealthy","target_node":"primary.svc:5432"}"#),
2458            &state,
2459        )
2460        .await
2461        .unwrap();
2462        let (status, _) = AdminServer::handle_chaos_request(
2463            Some(r#"{"action":"restore","target_node":"primary.svc:5432"}"#),
2464            &state,
2465        )
2466        .await
2467        .unwrap();
2468        assert_eq!(status, 200);
2469        assert!(state.node_health.read().await["primary.svc:5432"].healthy);
2470        assert!(state.chaos_overrides.read().await.is_empty());
2471    }
2472
2473    #[tokio::test]
2474    async fn test_chaos_reset_restores_all_overrides() {
2475        let state = chaos_state_with_node("a:5432").await;
2476        state.node_health.write().await.insert(
2477            "b:5432".to_string(),
2478            NodeHealth {
2479                address: "b:5432".to_string(),
2480                healthy: true,
2481                last_check: chrono::Utc::now(),
2482                failure_count: 0,
2483                last_error: None,
2484                latency_ms: 1.0,
2485                replication_lag_bytes: None,
2486            },
2487        );
2488        for addr in &["a:5432", "b:5432"] {
2489            let body = format!(r#"{{"action":"force_unhealthy","target_node":"{}"}}"#, addr);
2490            let _ = AdminServer::handle_chaos_request(Some(&body), &state)
2491                .await
2492                .unwrap();
2493        }
2494        let (status, value) =
2495            AdminServer::handle_chaos_request(Some(r#"{"action":"reset"}"#), &state)
2496                .await
2497                .unwrap();
2498        assert_eq!(status, 200);
2499        assert_eq!(value["reset"], true);
2500        let restored = value["restored"].as_array().unwrap();
2501        assert_eq!(restored.len(), 2);
2502        // Both nodes back to healthy + overrides cleared.
2503        for addr in &["a:5432", "b:5432"] {
2504            assert!(state.node_health.read().await[*addr].healthy);
2505        }
2506        assert!(state.chaos_overrides.read().await.is_empty());
2507    }
2508
2509    #[tokio::test]
2510    async fn test_chaos_force_unhealthy_404s_when_node_unknown() {
2511        let state = Arc::new(AdminState::new());
2512        let body = r#"{"action":"force_unhealthy","target_node":"missing.svc:5432"}"#;
2513        let (status, _) = AdminServer::handle_chaos_request(Some(body), &state)
2514            .await
2515            .expect("handler returns Ok");
2516        assert_eq!(status, 404);
2517    }
2518
2519    #[tokio::test]
2520    async fn test_chaos_400_on_malformed_body() {
2521        let state = Arc::new(AdminState::new());
2522        let (status, _) = AdminServer::handle_chaos_request(Some("not json"), &state)
2523            .await
2524            .expect("handler returns Ok");
2525        assert_eq!(status, 400);
2526    }
2527
2528    #[tokio::test]
2529    async fn test_chaos_400_on_unknown_action() {
2530        let state = Arc::new(AdminState::new());
2531        let body = r#"{"action":"format_disk","target_node":"x"}"#;
2532        let (status, _) = AdminServer::handle_chaos_request(Some(body), &state)
2533            .await
2534            .expect("handler returns Ok");
2535        assert_eq!(status, 400);
2536    }
2537
2538    #[cfg(feature = "ha-tr")]
2539    #[tokio::test]
2540    async fn test_shadow_400_on_malformed_body() {
2541        let (status, _) = AdminServer::handle_shadow_request(Some("not json"))
2542            .await
2543            .expect("handler returns Ok");
2544        assert_eq!(status, 400);
2545    }
2546
2547    #[cfg(feature = "ha-tr")]
2548    #[tokio::test]
2549    async fn test_shadow_500_on_source_unreachable() {
2550        // Address that nothing is listening on (port 1 = tcpmux,
2551        // refused by everything reasonable).
2552        let body = r#"{
2553            "sql": "SELECT 1",
2554            "source_host": "127.0.0.1",
2555            "source_port": 1,
2556            "shadow_host": "127.0.0.1",
2557            "shadow_port": 1
2558        }"#;
2559        let (status, value) = AdminServer::handle_shadow_request(Some(body))
2560            .await
2561            .expect("handler returns Ok");
2562        assert_eq!(status, 500);
2563        let err = value["error"].as_str().expect("error field");
2564        assert!(
2565            err.contains("source connect"),
2566            "expected source connect error, got {}",
2567            err
2568        );
2569    }
2570
2571    #[cfg(feature = "ha-tr")]
2572    #[tokio::test]
2573    async fn test_shadow_errors_on_empty_body() {
2574        let err = AdminServer::handle_shadow_request(None).await;
2575        assert!(err.is_err(), "empty body must surface as Err");
2576    }
2577
2578    #[cfg(feature = "anomaly-detection")]
2579    #[tokio::test]
2580    async fn test_anomalies_returns_503_when_detector_unattached() {
2581        let state = Arc::new(AdminState::new());
2582        let (status, value) = AdminServer::handle_anomalies_list("/anomalies", &state)
2583            .await
2584            .expect("handler returns Ok");
2585        assert_eq!(status, 503);
2586        assert_eq!(value["error"], "anomaly detector not attached");
2587    }
2588
2589    #[cfg(feature = "anomaly-detection")]
2590    #[tokio::test]
2591    async fn test_anomalies_returns_attached_detector_events() {
2592        use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
2593        let state = Arc::new(AdminState::new());
2594        let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
2595        // Seed a SQL injection event into the detector.
2596        let _ = det.record_query(&QueryObservation {
2597            tenant: "test".into(),
2598            fingerprint: "fp".into(),
2599            sql: "SELECT * FROM users WHERE id = 1 OR 1=1 --".into(),
2600            timestamp: std::time::Instant::now(),
2601        });
2602        state.with_anomaly_detector(det.clone()).await;
2603
2604        let (status, value) = AdminServer::handle_anomalies_list("/anomalies", &state)
2605            .await
2606            .expect("handler returns Ok");
2607        assert_eq!(status, 200);
2608        let count = value["count"].as_u64().expect("count field");
2609        assert!(count > 0, "expected at least one event, got {}", count);
2610    }
2611
2612    #[cfg(feature = "anomaly-detection")]
2613    #[tokio::test]
2614    async fn test_anomalies_limit_query_string_respected() {
2615        use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
2616        let state = Arc::new(AdminState::new());
2617        let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
2618        for i in 0..50 {
2619            let fp = format!("fp{}", i);
2620            let _ = det.record_query(&QueryObservation {
2621                tenant: "test".into(),
2622                fingerprint: fp,
2623                sql: "SELECT 1".into(),
2624                timestamp: std::time::Instant::now(),
2625            });
2626        }
2627        state.with_anomaly_detector(det).await;
2628
2629        let (status, value) = AdminServer::handle_anomalies_list("/anomalies?limit=5", &state)
2630            .await
2631            .expect("handler returns Ok");
2632        assert_eq!(status, 200);
2633        assert_eq!(value["limit"].as_u64().unwrap(), 5);
2634        assert_eq!(value["events"].as_array().unwrap().len(), 5);
2635    }
2636
2637    #[cfg(any(feature = "anomaly-detection", feature = "query-analytics"))]
2638    #[test]
2639    fn test_parse_limit_query_helper() {
2640        assert_eq!(parse_limit_query("/anomalies", 100, 1024), 100);
2641        assert_eq!(parse_limit_query("/anomalies?limit=42", 100, 1024), 42);
2642        assert_eq!(parse_limit_query("/anomalies?limit=99999", 100, 1024), 1024);
2643        assert_eq!(parse_limit_query("/anomalies?limit=abc", 100, 1024), 100);
2644        assert_eq!(
2645            parse_limit_query("/anomalies?other=x&limit=7", 100, 1024),
2646            7
2647        );
2648    }
2649
2650    #[cfg(feature = "edge-proxy")]
2651    async fn edge_state() -> Arc<AdminState> {
2652        use crate::edge::{EdgeCache, EdgeRegistry};
2653        use std::time::Duration;
2654        let s = Arc::new(AdminState::new());
2655        let cache = Arc::new(EdgeCache::new(100));
2656        let registry = Arc::new(EdgeRegistry::new(8, Duration::from_secs(60)));
2657        s.with_edge(cache, registry).await;
2658        s
2659    }
2660
2661    #[cfg(feature = "edge-proxy")]
2662    #[tokio::test]
2663    async fn test_edge_status_returns_empty_lists_initially() {
2664        let s = edge_state().await;
2665        let (status, value) = AdminServer::handle_edge_status(&s)
2666            .await
2667            .expect("handler returns Ok");
2668        assert_eq!(status, 200);
2669        assert_eq!(value["edge_count"].as_u64().unwrap(), 0);
2670        assert_eq!(value["registered"].as_array().unwrap().len(), 0);
2671        assert!(value["cache"].is_object(), "cache stats present");
2672    }
2673
2674    #[cfg(feature = "edge-proxy")]
2675    #[tokio::test]
2676    async fn test_edge_register_then_status_lists_edge() {
2677        let s = edge_state().await;
2678        let body = r#"{"edge_id":"e1","region":"us-east","base_url":"https://e1.svc"}"#;
2679        let (status, _) = AdminServer::handle_edge_register(Some(body), &s)
2680            .await
2681            .expect("handler ok");
2682        assert_eq!(status, 201);
2683        let (status2, value2) = AdminServer::handle_edge_status(&s).await.unwrap();
2684        assert_eq!(status2, 200);
2685        assert_eq!(value2["edge_count"].as_u64().unwrap(), 1);
2686        assert_eq!(value2["registered"][0]["edge_id"].as_str().unwrap(), "e1");
2687    }
2688
2689    #[cfg(feature = "edge-proxy")]
2690    #[tokio::test]
2691    async fn test_edge_register_400_on_malformed_body() {
2692        let s = edge_state().await;
2693        let (status, _) = AdminServer::handle_edge_register(Some("not json"), &s)
2694            .await
2695            .expect("handler ok");
2696        assert_eq!(status, 400);
2697    }
2698
2699    #[cfg(feature = "edge-proxy")]
2700    #[tokio::test]
2701    async fn test_edge_invalidate_drops_local_cache_entries() {
2702        use crate::edge::{CacheEntry, CacheKey};
2703        use std::time::{Duration, Instant};
2704        let s = edge_state().await;
2705        // Seed an entry into the local cache.
2706        let cache = s.edge_cache.read().await.clone().unwrap();
2707        cache.insert(
2708            CacheKey::new("fp1", "p1"),
2709            CacheEntry {
2710                version: 1,
2711                response_bytes: b"row".to_vec(),
2712                tables: vec!["users".into()],
2713                expires_at: Instant::now() + Duration::from_secs(60),
2714            },
2715        );
2716        assert!(cache.get(&CacheKey::new("fp1", "p1")).is_some());
2717
2718        let body = r#"{"tables":["users"]}"#;
2719        let (status, value) = AdminServer::handle_edge_invalidate(Some(body), &s)
2720            .await
2721            .expect("handler ok");
2722        assert_eq!(status, 200);
2723        assert_eq!(value["dropped_local"].as_u64().unwrap(), 1);
2724        assert!(cache.get(&CacheKey::new("fp1", "p1")).is_none());
2725    }
2726
2727    #[cfg(feature = "edge-proxy")]
2728    #[tokio::test]
2729    async fn test_edge_invalidate_503_when_cache_unattached() {
2730        let s = Arc::new(AdminState::new());
2731        let body = r#"{"tables":["users"]}"#;
2732        let (status, _) = AdminServer::handle_edge_invalidate(Some(body), &s)
2733            .await
2734            .expect("handler ok");
2735        assert_eq!(status, 503);
2736    }
2737}