Skip to main content

heliosdb_proxy/
admin.rs

1//! Admin API
2//!
3//! REST API for proxy management, monitoring, and configuration.
4//! Includes HTTP SQL API for transparent write routing (TWR) and load balancing.
5
6#[cfg(feature = "anomaly-detection")]
7use crate::anomaly::AnomalyDetector;
8use crate::config::{NodeConfig, NodeRole, ProxyConfig};
9#[cfg(feature = "edge-proxy")]
10use crate::edge::{EdgeCache, EdgeRegistry, InvalidationEvent};
11#[cfg(feature = "wasm-plugins")]
12use crate::plugins::PluginManager;
13#[cfg(feature = "ha-tr")]
14use crate::replay::{ReplayEngine, TimeTravelRequest};
15use crate::server::{NodeHealth, ServerMetricsSnapshot};
16use crate::{ProxyError, Result};
17#[cfg(feature = "ha-tr")]
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::net::SocketAddr;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
25use tokio::net::{TcpListener, TcpStream};
26use tokio::sync::{broadcast, RwLock};
27
28/// Static admin UI (vanilla HTML + JS). Compiled into the binary via
29/// `include_str!` so deployments are a single binary — no extra file
30/// serving or asset bundling. Served at `GET /` and `GET /ui`.
31const ADMIN_UI_HTML: &str = include_str!("admin_ui.html");
32
33/// Admin API server
34pub struct AdminServer {
35    /// Listen address
36    listen_address: String,
37    /// Shared state with proxy
38    state: Arc<AdminState>,
39    /// Shutdown channel
40    shutdown_tx: broadcast::Sender<()>,
41}
42
43/// Shared admin state
44pub struct AdminState {
45    /// Node health status
46    pub node_health: RwLock<HashMap<String, NodeHealth>>,
47    /// Server metrics
48    pub metrics: RwLock<ServerMetricsSnapshot>,
49    /// Active sessions count
50    pub active_sessions: RwLock<u64>,
51    /// Configuration (read-only)
52    pub config_snapshot: RwLock<ConfigSnapshot>,
53    /// Full proxy config (for SQL routing)
54    pub proxy_config: RwLock<Option<ProxyConfig>>,
55    /// Round-robin counter for read load balancing
56    read_lb_counter: AtomicUsize,
57    /// Registered command handlers
58    commands: RwLock<HashMap<String, CommandHandler>>,
59    /// Time-travel replay engine. Optional so test fixtures don't have
60    /// to wire a backend template; production startup attaches it via
61    /// `with_replay_engine`. Endpoint returns 503 when missing.
62    #[cfg(feature = "ha-tr")]
63    pub replay_engine: RwLock<Option<Arc<ReplayEngine>>>,
64    /// WASM plugin manager. None when the proxy started without
65    /// plugins (or with a different feature set). `/plugins`
66    /// endpoint returns 503 when missing; UI panel says "no plugin
67    /// manager attached".
68    #[cfg(feature = "wasm-plugins")]
69    pub plugin_manager: RwLock<Option<Arc<PluginManager>>>,
70    /// Chaos-mode overrides: per-node-address marker that the chaos
71    /// system (POST /api/chaos) has forced this node to a particular
72    /// state. Lets the UI distinguish "operationally disabled" from
73    /// "chaos-injected fault" and lets `Reset` restore everything.
74    pub chaos_overrides: RwLock<HashMap<String, ChaosOverride>>,
75    /// Anomaly detector — same Arc the server populates from the
76    /// query path. /api/anomalies polls this for the recent-events
77    /// ring buffer.
78    #[cfg(feature = "anomaly-detection")]
79    pub anomaly_detector: RwLock<Option<Arc<AnomalyDetector>>>,
80    /// Edge proxy cache + registry. Cache surfaces stats; registry
81    /// is the home-side fanout for invalidations.
82    #[cfg(feature = "edge-proxy")]
83    pub edge_cache: RwLock<Option<Arc<EdgeCache>>>,
84    #[cfg(feature = "edge-proxy")]
85    pub edge_registry: RwLock<Option<Arc<EdgeRegistry>>>,
86}
87
88/// Chaos override applied to a single node. Today only the
89/// `ForceUnhealthy` flavour is implemented — `inject_query_delay`
90/// is the natural follow-up but wants per-query interception that
91/// lives in the server message loop, not here.
92#[derive(Debug, Clone, Serialize)]
93pub struct ChaosOverride {
94    /// Wall-clock when the override was applied (RFC 3339).
95    pub since: String,
96    /// "force_unhealthy" | "delay_ms"
97    pub kind: String,
98    /// Free-form description shown in admin UI.
99    pub note: String,
100}
101
102
103/// Command handler type
104type CommandHandler = Arc<dyn Fn(&[&str]) -> Result<String> + Send + Sync>;
105
106/// Configuration snapshot for admin API
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct ConfigSnapshot {
109    pub listen_address: String,
110    pub admin_address: String,
111    pub tr_enabled: bool,
112    pub tr_mode: String,
113    pub pool_min_connections: usize,
114    pub pool_max_connections: usize,
115    pub nodes: Vec<NodeSnapshot>,
116}
117
118/// Node configuration snapshot
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct NodeSnapshot {
121    pub address: String,
122    pub role: String,
123    pub weight: u32,
124    pub enabled: bool,
125}
126
127impl AdminServer {
128    /// Create a new admin server
129    pub fn new(listen_address: String, state: Arc<AdminState>) -> Self {
130        let (shutdown_tx, _) = broadcast::channel(1);
131
132        Self {
133            listen_address,
134            state,
135            shutdown_tx,
136        }
137    }
138
139    /// Run the admin server
140    pub async fn run(&self) -> Result<()> {
141        let listener = TcpListener::bind(&self.listen_address)
142            .await
143            .map_err(|e| ProxyError::Network(format!("Failed to bind admin: {}", e)))?;
144
145        tracing::info!("Admin API listening on {}", self.listen_address);
146
147        let mut shutdown_rx = self.shutdown_tx.subscribe();
148
149        loop {
150            tokio::select! {
151                accept_result = listener.accept() => {
152                    match accept_result {
153                        Ok((stream, addr)) => {
154                            let state = self.state.clone();
155                            tokio::spawn(async move {
156                                if let Err(e) = Self::handle_connection(stream, addr, state).await {
157                                    tracing::error!("Admin connection error: {}", e);
158                                }
159                            });
160                        }
161                        Err(e) => {
162                            tracing::error!("Admin accept error: {}", e);
163                        }
164                    }
165                }
166                _ = shutdown_rx.recv() => {
167                    tracing::info!("Admin server shutting down");
168                    break;
169                }
170            }
171        }
172
173        Ok(())
174    }
175
176    /// Handle an admin connection
177    async fn handle_connection(
178        mut stream: TcpStream,
179        addr: SocketAddr,
180        state: Arc<AdminState>,
181    ) -> Result<()> {
182        tracing::debug!("Admin connection from {}", addr);
183
184        let (reader, mut writer) = stream.split();
185        let mut reader = BufReader::new(reader);
186        let mut line = String::new();
187
188        // Read HTTP request headers
189        let mut headers = Vec::new();
190        let mut content_length: usize = 0;
191
192        loop {
193            line.clear();
194            let bytes_read = reader
195                .read_line(&mut line)
196                .await
197                .map_err(|e| ProxyError::Network(format!("Read error: {}", e)))?;
198
199            if bytes_read == 0 || line == "\r\n" {
200                break;
201            }
202
203            // Parse Content-Length header
204            let trimmed = line.trim();
205            if trimmed.to_lowercase().starts_with("content-length:") {
206                if let Some(len_str) = trimmed.split(':').nth(1) {
207                    content_length = len_str.trim().parse().unwrap_or(0);
208                }
209            }
210            headers.push(trimmed.to_string());
211        }
212
213        if headers.is_empty() {
214            return Ok(());
215        }
216
217        // Parse request line
218        let request_line = &headers[0];
219        let parts: Vec<&str> = request_line.split_whitespace().collect();
220
221        if parts.len() < 2 {
222            Self::send_response(&mut writer, 400, "Bad Request", "Invalid request line").await?;
223            return Ok(());
224        }
225
226        let method = parts[0];
227        let path = parts[1];
228
229        // Read request body for POST/PUT requests
230        let body = if content_length > 0 && (method == "POST" || method == "PUT") {
231            let mut body_buf = vec![0u8; content_length];
232            reader.read_exact(&mut body_buf).await
233                .map_err(|e| ProxyError::Network(format!("Body read error: {}", e)))?;
234            Some(String::from_utf8_lossy(&body_buf).to_string())
235        } else {
236            None
237        };
238
239        // Static admin UI — single HTML file compiled into the binary.
240        // Served at `/` and `/ui`; all other routes remain JSON.
241        if method == "GET" && (path == "/" || path == "/ui" || path == "/ui/") {
242            Self::send_html_response(&mut writer, 200, ADMIN_UI_HTML).await?;
243            return Ok(());
244        }
245
246        // Route request
247        let response = Self::route_request(method, path, body.as_deref(), &state).await;
248
249        match response {
250            Ok((status, body)) => {
251                Self::send_json_response(&mut writer, status, &body).await?;
252            }
253            Err(e) => {
254                let error = ErrorResponse {
255                    error: e.to_string(),
256                };
257                Self::send_json_response(&mut writer, 500, &error).await?;
258            }
259        }
260
261        Ok(())
262    }
263
264    /// Serve a text/html HTTP response. Used by the admin UI route.
265    async fn send_html_response(
266        writer: &mut tokio::net::tcp::WriteHalf<'_>,
267        status: u16,
268        html: &str,
269    ) -> Result<()> {
270        let status_text = match status {
271            200 => "OK",
272            404 => "Not Found",
273            _ => "Unknown",
274        };
275        let response = format!(
276            "HTTP/1.1 {} {}\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
277            status,
278            status_text,
279            html.len(),
280            html
281        );
282        writer
283            .write_all(response.as_bytes())
284            .await
285            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
286        Ok(())
287    }
288
289    /// Route a request to the appropriate handler
290    async fn route_request(
291        method: &str,
292        path: &str,
293        body: Option<&str>,
294        state: &Arc<AdminState>,
295    ) -> Result<(u16, serde_json::Value)> {
296        match (method, path) {
297            // SQL API - Execute SQL with TWR (Transparent Write Routing)
298            ("POST", "/api/sql") => {
299                Self::handle_sql_request(body, state).await
300            }
301
302            // Health endpoints
303            ("GET", "/health") => {
304                let health = HealthResponse { status: "ok" };
305                Ok((200, serde_json::to_value(health)?))
306            }
307            ("GET", "/health/ready") => {
308                let ready = Self::check_readiness(state).await;
309                let response = ReadinessResponse {
310                    ready,
311                    message: if ready {
312                        "Proxy is ready"
313                    } else {
314                        "Proxy is not ready"
315                    },
316                };
317                let status = if ready { 200 } else { 503 };
318                Ok((status, serde_json::to_value(response)?))
319            }
320            ("GET", "/health/live") => {
321                let response = LivenessResponse { alive: true };
322                Ok((200, serde_json::to_value(response)?))
323            }
324
325            // Metrics
326            ("GET", "/metrics") => {
327                let metrics = state.metrics.read().await.clone();
328                Ok((200, serde_json::to_value(MetricsResponse::from(metrics))?))
329            }
330            ("GET", "/metrics/prometheus") => {
331                let metrics = state.metrics.read().await.clone();
332                let prometheus = Self::format_prometheus_metrics(&metrics);
333                Ok((200, serde_json::json!({ "text": prometheus })))
334            }
335
336            // Node management
337            ("GET", "/nodes") => {
338                let health = state.node_health.read().await;
339                let nodes: Vec<NodeHealthResponse> = health
340                    .values()
341                    .map(|h| NodeHealthResponse::from(h.clone()))
342                    .collect();
343                Ok((200, serde_json::to_value(nodes)?))
344            }
345            ("GET", path) if path.starts_with("/nodes/") => {
346                let node_addr = path.trim_start_matches("/nodes/");
347                let health = state.node_health.read().await;
348                match health.get(node_addr) {
349                    Some(h) => Ok((200, serde_json::to_value(NodeHealthResponse::from(h.clone()))?)),
350                    None => Ok((404, serde_json::json!({ "error": "Node not found" }))),
351                }
352            }
353            ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/enable") => {
354                let node_addr = path
355                    .trim_start_matches("/nodes/")
356                    .trim_end_matches("/enable");
357                Self::set_node_enabled(state, node_addr, true).await?;
358                Ok((200, serde_json::json!({ "status": "enabled" })))
359            }
360            ("POST", path) if path.starts_with("/nodes/") && path.ends_with("/disable") => {
361                let node_addr = path
362                    .trim_start_matches("/nodes/")
363                    .trim_end_matches("/disable");
364                Self::set_node_enabled(state, node_addr, false).await?;
365                Ok((200, serde_json::json!({ "status": "disabled" })))
366            }
367
368            // Topology — joins config (role) with node_health (healthy)
369            // so external controllers (operator, ops dashboards) can
370            // populate `currentPrimary` / `healthyNodes` /
371            // `unhealthyNodes` in one round-trip. Designed for
372            // poll-friendly use; never blocks.
373            ("GET", "/topology") => {
374                let topo = Self::compute_topology(state).await;
375                Ok((200, serde_json::to_value(topo)?))
376            }
377
378            // Time-travel replay — replays a journal window against a
379            // target backend (typically a staging DB). Body shape is
380            // `ReplayRequestBody` below.
381            #[cfg(feature = "ha-tr")]
382            ("POST", "/api/replay") => Self::handle_replay_request(body, state).await,
383            #[cfg(not(feature = "ha-tr"))]
384            ("POST", "/api/replay") => Ok((
385                503,
386                serde_json::json!({ "error": "ha-tr feature not compiled in" }),
387            )),
388
389            // Shadow execution (T3.4) — runs a query against a source
390            // backend AND a shadow backend, diffs the result. Used for
391            // major-version upgrade validation, schema-migration
392            // canaries, replica-drift detection. Body is
393            // `ShadowRequestBody`.
394            #[cfg(feature = "ha-tr")]
395            ("POST", "/api/shadow") => Self::handle_shadow_request(body).await,
396            #[cfg(not(feature = "ha-tr"))]
397            ("POST", "/api/shadow") => Ok((
398                503,
399                serde_json::json!({ "error": "ha-tr feature not compiled in" }),
400            )),
401
402            // Loaded WASM plugins — name, version, hooks, state,
403            // invocation count. Returns 503 when no plugin manager
404            // is attached (proxy started without --features
405            // wasm-plugins or with plugins disabled in config).
406            ("GET", "/plugins") => Self::handle_plugins_list(state).await,
407
408            // Anomaly detector recent-events feed (T3.1). Optional
409            // ?limit query string clamps the response size; default
410            // is 100 events newest-first.
411            #[cfg(feature = "anomaly-detection")]
412            ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => {
413                Self::handle_anomalies_list(p, state).await
414            }
415            #[cfg(not(feature = "anomaly-detection"))]
416            ("GET", p) if p == "/anomalies" || p.starts_with("/anomalies?") => Ok((
417                503,
418                serde_json::json!({ "error": "anomaly-detection feature not compiled in" }),
419            )),
420
421            // Edge mode (T3.2). Stats panel for the home; the home's
422            // registered edges + cache stats; and a manual
423            // invalidation endpoint for ops drills.
424            #[cfg(feature = "edge-proxy")]
425            ("GET", "/api/edge") => Self::handle_edge_status(state).await,
426            #[cfg(feature = "edge-proxy")]
427            ("POST", "/api/edge/register") => {
428                Self::handle_edge_register(body, state).await
429            }
430            #[cfg(feature = "edge-proxy")]
431            ("POST", "/api/edge/invalidate") => {
432                Self::handle_edge_invalidate(body, state).await
433            }
434            #[cfg(not(feature = "edge-proxy"))]
435            ("GET", "/api/edge")
436            | ("POST", "/api/edge/register")
437            | ("POST", "/api/edge/invalidate") => Ok((
438                503,
439                serde_json::json!({ "error": "edge-proxy feature not compiled in" }),
440            )),
441
442            // Chaos engineering — controlled fault injection for HA
443            // testing. Body is `ChaosRequestBody`; supported actions
444            // are `force_unhealthy` / `restore` / `reset`.
445            ("POST", "/api/chaos") => Self::handle_chaos_request(body, state).await,
446            // Read current overrides so the UI can show "what's
447            // currently broken on purpose".
448            ("GET", "/api/chaos") => {
449                let overrides = state.chaos_overrides.read().await.clone();
450                Ok((200, serde_json::to_value(overrides)?))
451            }
452
453            // Configuration
454            ("GET", "/config") => {
455                let config = state.config_snapshot.read().await.clone();
456                Ok((200, serde_json::to_value(config)?))
457            }
458
459            // Sessions
460            ("GET", "/sessions") => {
461                let count = *state.active_sessions.read().await;
462                let response = SessionsResponse {
463                    active_sessions: count,
464                };
465                Ok((200, serde_json::to_value(response)?))
466            }
467
468            // Pools
469            ("GET", "/pools") => {
470                let pools = Self::get_pool_stats(state).await;
471                Ok((200, serde_json::to_value(pools)?))
472            }
473
474            // Version
475            ("GET", "/version") => {
476                let response = VersionResponse {
477                    version: crate::VERSION.to_string(),
478                    build_time: env!("CARGO_PKG_VERSION").to_string(),
479                };
480                Ok((200, serde_json::to_value(response)?))
481            }
482
483            // Not found
484            _ => Ok((404, serde_json::json!({ "error": "Not found" }))),
485        }
486    }
487
488    /// Handle SQL execution request with TWR (Transparent Write Routing)
489    async fn handle_sql_request(
490        body: Option<&str>,
491        state: &Arc<AdminState>,
492    ) -> Result<(u16, serde_json::Value)> {
493        // Parse request body
494        let body = body.ok_or_else(|| ProxyError::Internal("Missing request body".to_string()))?;
495        let request: SqlRequest = serde_json::from_str(body)
496            .map_err(|e| ProxyError::Internal(format!("Invalid JSON: {}", e)))?;
497
498        let sql = request.query.trim();
499        if sql.is_empty() {
500            return Ok((400, serde_json::json!({ "error": "Empty query" })));
501        }
502
503        // Classify query as read or write
504        let is_write = Self::is_write_query(sql);
505        let query_type = if is_write { "write" } else { "read" };
506
507        // Get proxy config
508        let proxy_config = state.proxy_config.read().await;
509        let config = proxy_config.as_ref()
510            .ok_or_else(|| ProxyError::Internal("Proxy config not initialized".to_string()))?;
511
512        // Get node health
513        let health = state.node_health.read().await;
514
515        // Select target node based on query type
516        let target_node = if is_write {
517            // Write queries always go to primary
518            Self::select_primary_node(config, &health)?
519        } else {
520            // Read queries can go to any healthy node with load balancing
521            Self::select_read_node(config, &health, state)?
522        };
523
524        let target_address = format!("{}:{}", target_node.host, target_node.port);
525        // Use HTTP port from node config (defaults to 8080)
526        let http_port = target_node.http_port;
527        let http_url = format!("http://{}:{}/api/sql", target_node.host, http_port);
528
529        tracing::debug!(
530            "Routing {} query to {} ({})",
531            query_type,
532            target_address,
533            match target_node.role {
534                NodeRole::Primary => "primary",
535                NodeRole::Standby => "standby",
536                NodeRole::ReadReplica => "replica",
537            }
538        );
539
540        // Forward request to backend node
541        let result = Self::forward_sql_request(&http_url, sql).await?;
542
543        // Return result with routing metadata
544        let response = SqlResponse {
545            query_type: query_type.to_string(),
546            routed_to: target_address,
547            node_role: format!("{:?}", target_node.role).to_lowercase(),
548            result,
549        };
550
551        Ok((200, serde_json::to_value(response)?))
552    }
553
554    /// Determine if a query is a write operation
555    fn is_write_query(sql: &str) -> bool {
556        let upper = sql.trim().to_uppercase();
557
558        // Write operations
559        if upper.starts_with("INSERT")
560            || upper.starts_with("UPDATE")
561            || upper.starts_with("DELETE")
562            || upper.starts_with("CREATE")
563            || upper.starts_with("ALTER")
564            || upper.starts_with("DROP")
565            || upper.starts_with("TRUNCATE")
566            || upper.starts_with("GRANT")
567            || upper.starts_with("REVOKE")
568            || upper.starts_with("VACUUM")
569            || upper.starts_with("REINDEX")
570            || upper.starts_with("MERGE")
571            || upper.starts_with("UPSERT")
572        {
573            return true;
574        }
575
576        // Transaction control that might contain writes
577        if upper.starts_with("BEGIN")
578            || upper.starts_with("COMMIT")
579            || upper.starts_with("ROLLBACK")
580            || upper.starts_with("SAVEPOINT")
581        {
582            // Transaction control goes to primary for safety
583            return true;
584        }
585
586        // Read operations
587        false
588    }
589
590    /// Select primary node for write queries
591    fn select_primary_node<'a>(
592        config: &'a ProxyConfig,
593        health: &HashMap<String, NodeHealth>,
594    ) -> Result<&'a NodeConfig> {
595        config.nodes.iter()
596            .find(|n| {
597                n.role == NodeRole::Primary
598                    && n.enabled
599                    && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false)
600            })
601            .ok_or_else(|| ProxyError::Internal("No healthy primary node available".to_string()))
602    }
603
604    /// Select node for read queries with load balancing
605    fn select_read_node<'a>(
606        config: &'a ProxyConfig,
607        health: &HashMap<String, NodeHealth>,
608        state: &AdminState,
609    ) -> Result<&'a NodeConfig> {
610        // Get all healthy nodes (primary, standby, or replica)
611        let healthy_nodes: Vec<&NodeConfig> = config.nodes.iter()
612            .filter(|n| n.enabled && health.get(&n.address()).map(|h| h.healthy).unwrap_or(false))
613            .collect();
614
615        if healthy_nodes.is_empty() {
616            return Err(ProxyError::Internal("No healthy nodes available".to_string()));
617        }
618
619        // If read/write splitting is enabled and there are standbys, prefer them
620        if config.load_balancer.read_write_split {
621            let read_nodes: Vec<&NodeConfig> = healthy_nodes.iter()
622                .filter(|n| n.role == NodeRole::Standby || n.role == NodeRole::ReadReplica)
623                .copied()
624                .collect();
625
626            if !read_nodes.is_empty() {
627                // Round-robin across read nodes
628                let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
629                let index = counter % read_nodes.len();
630                return Ok(read_nodes[index]);
631            }
632        }
633
634        // Fall back to round-robin across all healthy nodes
635        let counter = state.read_lb_counter.fetch_add(1, Ordering::Relaxed);
636        let index = counter % healthy_nodes.len();
637        Ok(healthy_nodes[index])
638    }
639
640    /// Forward SQL request to backend node's HTTP API
641    async fn forward_sql_request(url: &str, sql: &str) -> Result<serde_json::Value> {
642        // Build HTTP request
643        let request_body = serde_json::json!({ "query": sql });
644        let body_bytes = serde_json::to_vec(&request_body)
645            .map_err(|e| ProxyError::Internal(format!("JSON serialization error: {}", e)))?;
646
647        // Parse URL
648        let url_parts: Vec<&str> = url.trim_start_matches("http://").splitn(2, '/').collect();
649        if url_parts.is_empty() {
650            return Err(ProxyError::Internal("Invalid URL".to_string()));
651        }
652
653        let host_port = url_parts[0];
654        let path = if url_parts.len() > 1 {
655            format!("/{}", url_parts[1])
656        } else {
657            "/".to_string()
658        };
659
660        // Connect to backend
661        let stream = TcpStream::connect(host_port).await
662            .map_err(|e| ProxyError::Network(format!("Failed to connect to {}: {}", host_port, e)))?;
663
664        let (reader, mut writer) = stream.into_split();
665        let mut reader = BufReader::new(reader);
666
667        // Send HTTP request
668        let request = format!(
669            "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
670            path,
671            host_port,
672            body_bytes.len()
673        );
674
675        writer.write_all(request.as_bytes()).await
676            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
677        writer.write_all(&body_bytes).await
678            .map_err(|e| ProxyError::Network(format!("Write body error: {}", e)))?;
679
680        // Read response headers
681        let mut response_headers = Vec::new();
682        let mut line = String::new();
683        let mut content_length: usize = 0;
684
685        loop {
686            line.clear();
687            let bytes_read = reader.read_line(&mut line).await
688                .map_err(|e| ProxyError::Network(format!("Response read error: {}", e)))?;
689
690            if bytes_read == 0 || line == "\r\n" {
691                break;
692            }
693
694            let trimmed = line.trim();
695            if trimmed.to_lowercase().starts_with("content-length:") {
696                if let Some(len_str) = trimmed.split(':').nth(1) {
697                    content_length = len_str.trim().parse().unwrap_or(0);
698                }
699            }
700            response_headers.push(trimmed.to_string());
701        }
702
703        // Read response body
704        let mut body_buf = vec![0u8; content_length];
705        if content_length > 0 {
706            reader.read_exact(&mut body_buf).await
707                .map_err(|e| ProxyError::Network(format!("Response body read error: {}", e)))?;
708        }
709
710        let response_body = String::from_utf8_lossy(&body_buf);
711
712        // Parse JSON response
713        serde_json::from_str(&response_body)
714            .map_err(|e| ProxyError::Internal(format!("Invalid JSON response: {} - body: {}", e, response_body)))
715    }
716
717    /// Check if proxy is ready to accept connections
718    async fn check_readiness(state: &Arc<AdminState>) -> bool {
719        let health = state.node_health.read().await;
720
721        // Need at least one healthy primary
722        health.values().any(|h| h.healthy)
723    }
724
725    /// Set node enabled status
726    async fn set_node_enabled(state: &Arc<AdminState>, node_addr: &str, enabled: bool) -> Result<()> {
727        let mut health = state.node_health.write().await;
728
729        if let Some(node_health) = health.get_mut(node_addr) {
730            node_health.healthy = enabled;
731            Ok(())
732        } else {
733            Err(ProxyError::Config(format!("Node not found: {}", node_addr)))
734        }
735    }
736
737    /// Get pool statistics
738    async fn get_pool_stats(_state: &Arc<AdminState>) -> Vec<PoolStatsResponse> {
739        // Placeholder - in real implementation would query pool state
740        Vec::new()
741    }
742
743    /// Handle `POST /api/replay`. Body is a JSON `ReplayRequestBody`.
744    /// Returns 503 when no replay engine is attached, 400 on a malformed
745    /// body or inverted window, 200 with `ReplaySummary` on success.
746    #[cfg(feature = "ha-tr")]
747    async fn handle_replay_request(
748        body: Option<&str>,
749        state: &Arc<AdminState>,
750    ) -> Result<(u16, serde_json::Value)> {
751        let raw = body.ok_or_else(|| {
752            ProxyError::Internal("replay: empty request body".to_string())
753        })?;
754        let req: ReplayRequestBody = match serde_json::from_str(raw) {
755            Ok(r) => r,
756            Err(e) => {
757                return Ok((
758                    400,
759                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
760                ));
761            }
762        };
763        let engine = match state.replay_engine.read().await.clone() {
764            Some(e) => e,
765            None => {
766                return Ok((
767                    503,
768                    serde_json::json!({ "error": "replay engine not attached" }),
769                ));
770            }
771        };
772        let tt = TimeTravelRequest {
773            from: req.from,
774            to: req.to,
775            target_host: req.target_host,
776            target_port: req.target_port,
777            target_user: req.target_user,
778            target_password: req.target_password,
779            target_database: req.target_database,
780        };
781        match engine.replay_window(&tt).await {
782            Ok(summary) => Ok((200, serde_json::to_value(summary)?)),
783            Err(e) => Ok((
784                500,
785                serde_json::json!({ "error": format!("replay failed: {}", e) }),
786            )),
787        }
788    }
789
790    /// `GET /api/edge` — surfaces edge-mode state: cache stats +
791    /// the list of registered edges (when running in home mode).
792    #[cfg(feature = "edge-proxy")]
793    async fn handle_edge_status(
794        state: &Arc<AdminState>,
795    ) -> Result<(u16, serde_json::Value)> {
796        let cache_stats = match state.edge_cache.read().await.clone() {
797            Some(c) => Some(c.stats()),
798            None => None,
799        };
800        let edges = match state.edge_registry.read().await.clone() {
801            Some(r) => r.list(),
802            None => Vec::new(),
803        };
804        Ok((200, serde_json::json!({
805            "cache":          cache_stats,
806            "registered":     edges,
807            "edge_count":     edges.len(),
808        })))
809    }
810
811    /// `POST /api/edge/register` — edges call this once at boot to
812    /// announce themselves to the home. Body shape:
813    /// `{"edge_id":"e1","region":"us-east","base_url":"https://e1"}`.
814    /// Returns 201 with the assigned slot, 503 when registry full.
815    #[cfg(feature = "edge-proxy")]
816    async fn handle_edge_register(
817        body: Option<&str>,
818        state: &Arc<AdminState>,
819    ) -> Result<(u16, serde_json::Value)> {
820        let raw = body.ok_or_else(|| {
821            ProxyError::Internal("edge register: empty body".to_string())
822        })?;
823        let req: EdgeRegisterBody = match serde_json::from_str(raw) {
824            Ok(r) => r,
825            Err(e) => {
826                return Ok((
827                    400,
828                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
829                ));
830            }
831        };
832        let registry = match state.edge_registry.read().await.clone() {
833            Some(r) => r,
834            None => {
835                return Ok((
836                    503,
837                    serde_json::json!({ "error": "edge registry not attached" }),
838                ));
839            }
840        };
841        let now = chrono::Utc::now().to_rfc3339();
842        match registry.register(&req.edge_id, &req.region, &req.base_url, &now) {
843            Ok(_rx) => {
844                // Receiver dropped here — in production the SSE
845                // handler keeps it alive for the connection's
846                // lifetime. For the JSON endpoint, we acknowledge
847                // the registration and the edge polls /api/edge for
848                // invalidations until SSE is wired.
849                Ok((201, serde_json::json!({
850                    "edge_id":  req.edge_id,
851                    "region":   req.region,
852                    "base_url": req.base_url,
853                    "registered_at": now,
854                })))
855            }
856            Err(e) => Ok((
857                503,
858                serde_json::json!({ "error": e.to_string() }),
859            )),
860        }
861    }
862
863    /// `POST /api/edge/invalidate` — manual invalidation for ops
864    /// drills. The proxy normally fans out invalidations
865    /// automatically on writes; this endpoint is for "I just ran
866    /// a migration outside the proxy, please drop caches".
867    /// Body: `{"tables":["users"],"up_to_version":null}` — null
868    /// version means "use the cache's current version" (drop all).
869    #[cfg(feature = "edge-proxy")]
870    async fn handle_edge_invalidate(
871        body: Option<&str>,
872        state: &Arc<AdminState>,
873    ) -> Result<(u16, serde_json::Value)> {
874        let raw = body.ok_or_else(|| {
875            ProxyError::Internal("edge invalidate: empty body".to_string())
876        })?;
877        let req: EdgeInvalidateBody = match serde_json::from_str(raw) {
878            Ok(r) => r,
879            Err(e) => {
880                return Ok((
881                    400,
882                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
883                ));
884            }
885        };
886        let cache = match state.edge_cache.read().await.clone() {
887            Some(c) => c,
888            None => {
889                return Ok((
890                    503,
891                    serde_json::json!({ "error": "edge cache not attached" }),
892                ));
893            }
894        };
895        let registry = match state.edge_registry.read().await.clone() {
896            Some(r) => r,
897            None => {
898                return Ok((
899                    503,
900                    serde_json::json!({ "error": "edge registry not attached" }),
901                ));
902            }
903        };
904        let version = req.up_to_version.unwrap_or_else(|| cache.next_version());
905        // Local cache invalidation (home-side cache, if any).
906        let dropped_local = cache.invalidate(version, &req.tables);
907        // Fan out to every registered edge.
908        let ev = InvalidationEvent {
909            up_to_version: version,
910            tables: req.tables.clone(),
911            committed_at: chrono::Utc::now().to_rfc3339(),
912        };
913        let (sent, pruned) = registry.broadcast(ev).await;
914        Ok((200, serde_json::json!({
915            "version":         version,
916            "tables":          req.tables,
917            "dropped_local":   dropped_local,
918            "edges_notified":  sent,
919            "edges_pruned":    pruned,
920        })))
921    }
922
923    /// Handle `GET /anomalies`. Returns the anomaly detector's
924    /// recent-events ring buffer as JSON. Optional `?limit=N`
925    /// query string clamps the response size (default 100, max 1024).
926    /// Returns 503 when the detector hasn't been attached.
927    #[cfg(feature = "anomaly-detection")]
928    async fn handle_anomalies_list(
929        path: &str,
930        state: &Arc<AdminState>,
931    ) -> Result<(u16, serde_json::Value)> {
932        let limit = parse_limit_query(path, 100, 1024);
933        let det = match state.anomaly_detector.read().await.clone() {
934            Some(d) => d,
935            None => {
936                return Ok((
937                    503,
938                    serde_json::json!({ "error": "anomaly detector not attached" }),
939                ));
940            }
941        };
942        let events = det.recent_events(limit);
943        Ok((200, serde_json::json!({
944            "count":     events.len(),
945            "limit":     limit,
946            "events":    events,
947            "buffer_total": det.event_count(),
948        })))
949    }
950
951    /// Handle `POST /api/shadow`. Body is a JSON `ShadowRequestBody`.
952    /// Connects to both source and shadow backends, runs the SQL on
953    /// each, returns a `ShadowExecuteReport` with the diff.
954    ///
955    /// Status codes:
956    ///   200 — both sides ran (report carries pass/fail details)
957    ///   400 — malformed body
958    ///   500 — source connect failure (shadow connect failures end up
959    ///         in the report rather than the HTTP status)
960    #[cfg(feature = "ha-tr")]
961    async fn handle_shadow_request(
962        body: Option<&str>,
963    ) -> Result<(u16, serde_json::Value)> {
964        use crate::backend::{tls::default_client_config, BackendClient, BackendConfig, ParamValue, TlsMode};
965        use crate::shadow_execute::shadow_execute;
966
967        let raw = body.ok_or_else(|| {
968            ProxyError::Internal("shadow: empty request body".to_string())
969        })?;
970        let req: ShadowRequestBody = match serde_json::from_str(raw) {
971            Ok(r) => r,
972            Err(e) => {
973                return Ok((
974                    400,
975                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
976                ));
977            }
978        };
979
980        // Build the two configs from the request. TLS off + 5s
981        // connect / 30s query timeouts mirror the replay defaults.
982        let mk_cfg = |host: String, port: u16, user: Option<String>, password: Option<String>, database: Option<String>| BackendConfig {
983            host,
984            port,
985            user: user.unwrap_or_else(|| "postgres".into()),
986            password,
987            database,
988            application_name: Some("heliosdb-proxy-shadow".into()),
989            tls_mode: TlsMode::Disable,
990            connect_timeout: std::time::Duration::from_secs(5),
991            query_timeout: std::time::Duration::from_secs(30),
992            tls_config: default_client_config(),
993        };
994        let source_cfg = mk_cfg(
995            req.source_host,
996            req.source_port,
997            req.source_user,
998            req.source_password,
999            req.source_database,
1000        );
1001        let shadow_cfg = mk_cfg(
1002            req.shadow_host,
1003            req.shadow_port,
1004            req.shadow_user,
1005            req.shadow_password,
1006            req.shadow_database,
1007        );
1008
1009        // Connect to source. Connect failure here is a real HTTP
1010        // error since we can't even attempt the diff; shadow connect
1011        // failures land inside the report as `shadow_error`.
1012        let mut source = match BackendClient::connect(&source_cfg).await {
1013            Ok(c) => c,
1014            Err(e) => {
1015                return Ok((
1016                    500,
1017                    serde_json::json!({ "error": format!("source connect: {}", e) }),
1018                ));
1019            }
1020        };
1021
1022        let params: Vec<ParamValue> = req
1023            .params
1024            .unwrap_or_default()
1025            .into_iter()
1026            .map(|s| ParamValue::Text(s))
1027            .collect();
1028
1029        let outcome = shadow_execute(&mut source, &shadow_cfg, &req.sql, &params).await;
1030        source.close().await;
1031
1032        match outcome {
1033            Ok((_qr, report)) => Ok((200, serde_json::json!({
1034                "sql":                report.sql,
1035                "both_succeeded":     report.both_succeeded,
1036                "row_count_match":    report.row_count_match,
1037                "row_hash_match":     report.row_hash_match,
1038                "primary_elapsed_us": report.primary_elapsed_us,
1039                "shadow_elapsed_us":  report.shadow_elapsed_us,
1040                "primary_error":      report.primary_error,
1041                "shadow_error":       report.shadow_error,
1042                "is_clean":           report.is_clean(),
1043            }))),
1044            Err(e) => Ok((
1045                500,
1046                serde_json::json!({ "error": format!("shadow execute: {}", e) }),
1047            )),
1048        }
1049    }
1050
1051    /// Handle `POST /api/chaos`. Body is a JSON `ChaosRequestBody`.
1052    ///
1053    /// Supported actions (intentionally narrow — the goal is "test
1054    /// the failover machinery without external chaos tooling", not
1055    /// "ship a kitchen-sink fault injector"):
1056    ///
1057    ///   force_unhealthy { target_node }  — flip the node's health flag
1058    ///                                      to false; the failover
1059    ///                                      controller observes this and
1060    ///                                      reroutes traffic.
1061    ///   restore         { target_node }  — flip the node's health flag
1062    ///                                      back to true and clear the
1063    ///                                      override entry.
1064    ///   reset                            — restore every overridden
1065    ///                                      node in one call.
1066    async fn handle_chaos_request(
1067        body: Option<&str>,
1068        state: &Arc<AdminState>,
1069    ) -> Result<(u16, serde_json::Value)> {
1070        let raw = body.ok_or_else(|| {
1071            ProxyError::Internal("chaos: empty request body".to_string())
1072        })?;
1073        let action: ChaosAction = match serde_json::from_str(raw) {
1074            Ok(a) => a,
1075            Err(e) => {
1076                return Ok((
1077                    400,
1078                    serde_json::json!({ "error": format!("invalid body: {}", e) }),
1079                ));
1080            }
1081        };
1082        match action {
1083            ChaosAction::ForceUnhealthy { target_node } => {
1084                if let Err(e) = Self::set_node_enabled(state, &target_node, false).await {
1085                    return Ok((
1086                        404,
1087                        serde_json::json!({ "error": e.to_string() }),
1088                    ));
1089                }
1090                state.chaos_overrides.write().await.insert(
1091                    target_node.clone(),
1092                    ChaosOverride {
1093                        since: chrono::Utc::now().to_rfc3339(),
1094                        kind: "force_unhealthy".to_string(),
1095                        note: format!("forced unhealthy via chaos endpoint"),
1096                    },
1097                );
1098                Ok((200, serde_json::json!({
1099                    "applied":     "force_unhealthy",
1100                    "target_node": target_node,
1101                })))
1102            }
1103            ChaosAction::Restore { target_node } => {
1104                if let Err(e) = Self::set_node_enabled(state, &target_node, true).await {
1105                    return Ok((
1106                        404,
1107                        serde_json::json!({ "error": e.to_string() }),
1108                    ));
1109                }
1110                state.chaos_overrides.write().await.remove(&target_node);
1111                Ok((200, serde_json::json!({
1112                    "restored":    target_node,
1113                })))
1114            }
1115            ChaosAction::Reset => {
1116                let overrides: Vec<String> =
1117                    state.chaos_overrides.read().await.keys().cloned().collect();
1118                let mut restored = Vec::with_capacity(overrides.len());
1119                for addr in overrides {
1120                    let _ = Self::set_node_enabled(state, &addr, true).await;
1121                    restored.push(addr);
1122                }
1123                state.chaos_overrides.write().await.clear();
1124                Ok((200, serde_json::json!({
1125                    "reset":      true,
1126                    "restored":   restored,
1127                })))
1128            }
1129        }
1130    }
1131
1132    /// Handle `GET /plugins`. Returns 503 when no plugin manager is
1133    /// attached, 200 with `Vec<PluginListEntry>` otherwise. Building
1134    /// the response in admin.rs (rather than serialising
1135    /// `plugins::PluginInfo` directly) keeps the plugins module
1136    /// independent of serde — only the wire shape lives here.
1137    #[cfg(feature = "wasm-plugins")]
1138    async fn handle_plugins_list(state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1139        let pm = match state.plugin_manager.read().await.clone() {
1140            Some(p) => p,
1141            None => {
1142                return Ok((
1143                    503,
1144                    serde_json::json!({ "error": "plugin manager not attached" }),
1145                ));
1146            }
1147        };
1148        let plugins: Vec<PluginListEntry> = pm
1149            .list_plugins()
1150            .into_iter()
1151            .map(|info| PluginListEntry {
1152                name: info.name,
1153                version: info.version,
1154                description: info.description,
1155                hooks: info
1156                    .hooks
1157                    .iter()
1158                    .map(|h| h.export_name().to_string())
1159                    .collect(),
1160                state: format!("{:?}", info.state),
1161                invocations: info.stats.total_calls,
1162                errors: info.stats.error_count,
1163            })
1164            .collect();
1165        Ok((200, serde_json::to_value(plugins)?))
1166    }
1167
1168    #[cfg(not(feature = "wasm-plugins"))]
1169    async fn handle_plugins_list(_state: &Arc<AdminState>) -> Result<(u16, serde_json::Value)> {
1170        Ok((
1171            503,
1172            serde_json::json!({ "error": "wasm-plugins feature not compiled in" }),
1173        ))
1174    }
1175
1176    /// Compute the joined topology view used by `/topology`.
1177    ///
1178    /// `currentPrimary` is the address of the first node whose role
1179    /// is "primary" and whose health entry is `healthy = true`. None
1180    /// is the legitimate answer when failover is in progress.
1181    async fn compute_topology(state: &Arc<AdminState>) -> TopologyResponse {
1182        let health = state.node_health.read().await;
1183        let cfg = state.config_snapshot.read().await;
1184
1185        let mut current_primary: Option<String> = None;
1186        for n in &cfg.nodes {
1187            if n.role.eq_ignore_ascii_case("primary") {
1188                let healthy = health.get(&n.address).map(|h| h.healthy).unwrap_or(false);
1189                if healthy {
1190                    current_primary = Some(n.address.clone());
1191                    break;
1192                }
1193            }
1194        }
1195
1196        let healthy_nodes = health.values().filter(|h| h.healthy).count() as u32;
1197        let unhealthy_nodes = health.values().filter(|h| !h.healthy).count() as u32;
1198        let total_nodes = cfg.nodes.len() as u32;
1199
1200        TopologyResponse {
1201            current_primary,
1202            healthy_nodes,
1203            unhealthy_nodes,
1204            total_nodes,
1205            last_failover_at: None,
1206        }
1207    }
1208
1209    /// Format metrics as Prometheus text format
1210    fn format_prometheus_metrics(metrics: &ServerMetricsSnapshot) -> String {
1211        let mut output = String::new();
1212
1213        output.push_str("# HELP heliosdb_proxy_connections_total Total connections accepted\n");
1214        output.push_str("# TYPE heliosdb_proxy_connections_total counter\n");
1215        output.push_str(&format!(
1216            "heliosdb_proxy_connections_total {}\n",
1217            metrics.connections_accepted
1218        ));
1219
1220        output.push_str("# HELP heliosdb_proxy_connections_closed Total connections closed\n");
1221        output.push_str("# TYPE heliosdb_proxy_connections_closed counter\n");
1222        output.push_str(&format!(
1223            "heliosdb_proxy_connections_closed {}\n",
1224            metrics.connections_closed
1225        ));
1226
1227        output.push_str("# HELP heliosdb_proxy_queries_total Total queries processed\n");
1228        output.push_str("# TYPE heliosdb_proxy_queries_total counter\n");
1229        output.push_str(&format!(
1230            "heliosdb_proxy_queries_total {}\n",
1231            metrics.queries_processed
1232        ));
1233
1234        output.push_str("# HELP heliosdb_proxy_bytes_received_total Total bytes received\n");
1235        output.push_str("# TYPE heliosdb_proxy_bytes_received_total counter\n");
1236        output.push_str(&format!(
1237            "heliosdb_proxy_bytes_received_total {}\n",
1238            metrics.bytes_received
1239        ));
1240
1241        output.push_str("# HELP heliosdb_proxy_bytes_sent_total Total bytes sent\n");
1242        output.push_str("# TYPE heliosdb_proxy_bytes_sent_total counter\n");
1243        output.push_str(&format!(
1244            "heliosdb_proxy_bytes_sent_total {}\n",
1245            metrics.bytes_sent
1246        ));
1247
1248        output.push_str("# HELP heliosdb_proxy_failovers_total Total failovers\n");
1249        output.push_str("# TYPE heliosdb_proxy_failovers_total counter\n");
1250        output.push_str(&format!(
1251            "heliosdb_proxy_failovers_total {}\n",
1252            metrics.failovers
1253        ));
1254
1255        output
1256    }
1257
1258    /// Send HTTP response
1259    async fn send_response(
1260        writer: &mut tokio::net::tcp::WriteHalf<'_>,
1261        status: u16,
1262        status_text: &str,
1263        body: &str,
1264    ) -> Result<()> {
1265        let response = format!(
1266            "HTTP/1.1 {} {}\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1267            status,
1268            status_text,
1269            body.len(),
1270            body
1271        );
1272
1273        writer
1274            .write_all(response.as_bytes())
1275            .await
1276            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1277
1278        Ok(())
1279    }
1280
1281    /// Send JSON HTTP response
1282    async fn send_json_response<T: Serialize>(
1283        writer: &mut tokio::net::tcp::WriteHalf<'_>,
1284        status: u16,
1285        body: &T,
1286    ) -> Result<()> {
1287        let json = serde_json::to_string(body)
1288            .map_err(|e| ProxyError::Internal(format!("JSON error: {}", e)))?;
1289
1290        let status_text = match status {
1291            200 => "OK",
1292            400 => "Bad Request",
1293            404 => "Not Found",
1294            500 => "Internal Server Error",
1295            503 => "Service Unavailable",
1296            _ => "Unknown",
1297        };
1298
1299        let response = format!(
1300            "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1301            status,
1302            status_text,
1303            json.len(),
1304            json
1305        );
1306
1307        writer
1308            .write_all(response.as_bytes())
1309            .await
1310            .map_err(|e| ProxyError::Network(format!("Write error: {}", e)))?;
1311
1312        Ok(())
1313    }
1314
1315    /// Shutdown the admin server
1316    pub fn shutdown(&self) {
1317        let _ = self.shutdown_tx.send(());
1318    }
1319}
1320
1321impl AdminState {
1322    /// Create new admin state
1323    pub fn new() -> Self {
1324        Self {
1325            node_health: RwLock::new(HashMap::new()),
1326            metrics: RwLock::new(ServerMetricsSnapshot {
1327                connections_accepted: 0,
1328                connections_closed: 0,
1329                queries_processed: 0,
1330                bytes_received: 0,
1331                bytes_sent: 0,
1332                failovers: 0,
1333            }),
1334            active_sessions: RwLock::new(0),
1335            config_snapshot: RwLock::new(ConfigSnapshot {
1336                listen_address: String::new(),
1337                admin_address: String::new(),
1338                tr_enabled: false,
1339                tr_mode: String::new(),
1340                pool_min_connections: 0,
1341                pool_max_connections: 0,
1342                nodes: Vec::new(),
1343            }),
1344            proxy_config: RwLock::new(None),
1345            read_lb_counter: AtomicUsize::new(0),
1346            commands: RwLock::new(HashMap::new()),
1347            #[cfg(feature = "ha-tr")]
1348            replay_engine: RwLock::new(None),
1349            #[cfg(feature = "wasm-plugins")]
1350            plugin_manager: RwLock::new(None),
1351            chaos_overrides: RwLock::new(HashMap::new()),
1352            #[cfg(feature = "anomaly-detection")]
1353            anomaly_detector: RwLock::new(None),
1354            #[cfg(feature = "edge-proxy")]
1355            edge_cache: RwLock::new(None),
1356            #[cfg(feature = "edge-proxy")]
1357            edge_registry: RwLock::new(None),
1358        }
1359    }
1360
1361    /// Attach an anomaly detector. Mirror of with_replay_engine /
1362    /// with_plugin_manager — wired by the server at startup.
1363    #[cfg(feature = "anomaly-detection")]
1364    pub async fn with_anomaly_detector(&self, detector: Arc<AnomalyDetector>) {
1365        *self.anomaly_detector.write().await = Some(detector);
1366    }
1367
1368    /// Attach edge cache + registry. Server calls this once at
1369    /// startup; both Arcs are the same instances ServerState holds.
1370    #[cfg(feature = "edge-proxy")]
1371    pub async fn with_edge(&self, cache: Arc<EdgeCache>, registry: Arc<EdgeRegistry>) {
1372        *self.edge_cache.write().await = Some(cache);
1373        *self.edge_registry.write().await = Some(registry);
1374    }
1375
1376    /// Attach a time-travel replay engine. Production startup calls
1377    /// this once with a `ReplayEngine` constructed from the proxy's
1378    /// shared `TransactionJournal` + a `BackendConfig` template; the
1379    /// `/api/replay` endpoint returns 503 until this is set.
1380    #[cfg(feature = "ha-tr")]
1381    pub async fn with_replay_engine(&self, engine: Arc<ReplayEngine>) {
1382        *self.replay_engine.write().await = Some(engine);
1383    }
1384
1385    /// Attach a WASM plugin manager. Production startup calls this
1386    /// once with the same Arc held by ProxyServer; the `/plugins`
1387    /// endpoint returns 503 until set.
1388    #[cfg(feature = "wasm-plugins")]
1389    pub async fn with_plugin_manager(&self, manager: Arc<PluginManager>) {
1390        *self.plugin_manager.write().await = Some(manager);
1391    }
1392
1393    /// Set the proxy configuration for SQL routing
1394    pub async fn set_proxy_config(&self, config: ProxyConfig) {
1395        let mut proxy_config = self.proxy_config.write().await;
1396        *proxy_config = Some(config);
1397    }
1398
1399    /// Register a command handler
1400    pub async fn register_command<F>(&self, name: &str, handler: F)
1401    where
1402        F: Fn(&[&str]) -> Result<String> + Send + Sync + 'static,
1403    {
1404        let mut commands = self.commands.write().await;
1405        commands.insert(name.to_string(), Arc::new(handler));
1406    }
1407
1408    /// Execute a command
1409    pub async fn execute_command(&self, name: &str, args: &[&str]) -> Result<String> {
1410        let commands = self.commands.read().await;
1411        match commands.get(name) {
1412            Some(handler) => handler(args),
1413            None => Err(ProxyError::Internal(format!("Unknown command: {}", name))),
1414        }
1415    }
1416}
1417
1418impl Default for AdminState {
1419    fn default() -> Self {
1420        Self::new()
1421    }
1422}
1423
1424// Request and Response types
1425
1426/// SQL execution request
1427#[derive(Debug, Deserialize)]
1428struct SqlRequest {
1429    /// SQL query to execute
1430    query: String,
1431}
1432
1433/// SQL execution response
1434#[derive(Debug, Serialize)]
1435struct SqlResponse {
1436    /// Query type (read/write)
1437    query_type: String,
1438    /// Node the query was routed to
1439    routed_to: String,
1440    /// Role of the target node
1441    node_role: String,
1442    /// Query result from backend
1443    result: serde_json::Value,
1444}
1445
1446#[derive(Serialize)]
1447struct HealthResponse {
1448    status: &'static str,
1449}
1450
1451#[derive(Serialize)]
1452struct ReadinessResponse {
1453    ready: bool,
1454    message: &'static str,
1455}
1456
1457#[derive(Serialize)]
1458struct LivenessResponse {
1459    alive: bool,
1460}
1461
1462#[derive(Serialize)]
1463struct ErrorResponse {
1464    error: String,
1465}
1466
1467#[derive(Serialize)]
1468struct MetricsResponse {
1469    connections_accepted: u64,
1470    connections_closed: u64,
1471    connections_active: u64,
1472    queries_processed: u64,
1473    bytes_received: u64,
1474    bytes_sent: u64,
1475    failovers: u64,
1476}
1477
1478impl From<ServerMetricsSnapshot> for MetricsResponse {
1479    fn from(m: ServerMetricsSnapshot) -> Self {
1480        Self {
1481            connections_accepted: m.connections_accepted,
1482            connections_closed: m.connections_closed,
1483            connections_active: m.connections_accepted.saturating_sub(m.connections_closed),
1484            queries_processed: m.queries_processed,
1485            bytes_received: m.bytes_received,
1486            bytes_sent: m.bytes_sent,
1487            failovers: m.failovers,
1488        }
1489    }
1490}
1491
1492#[derive(Serialize)]
1493struct NodeHealthResponse {
1494    address: String,
1495    healthy: bool,
1496    last_check: String,
1497    failure_count: u32,
1498    last_error: Option<String>,
1499    latency_ms: f64,
1500    replication_lag_bytes: Option<u64>,
1501}
1502
1503impl From<NodeHealth> for NodeHealthResponse {
1504    fn from(h: NodeHealth) -> Self {
1505        Self {
1506            address: h.address,
1507            healthy: h.healthy,
1508            last_check: h.last_check.to_rfc3339(),
1509            failure_count: h.failure_count,
1510            last_error: h.last_error,
1511            latency_ms: h.latency_ms,
1512            replication_lag_bytes: h.replication_lag_bytes,
1513        }
1514    }
1515}
1516
1517#[derive(Serialize)]
1518struct SessionsResponse {
1519    active_sessions: u64,
1520}
1521
/// Request body of `POST /api/edge/register`. All three fields are
/// required and are passed on to the edge registry.
#[cfg(feature = "edge-proxy")]
#[derive(Debug, Deserialize)]
struct EdgeRegisterBody {
    /// Identifier the edge registers under.
    edge_id: String,
    /// Region label of the edge.
    region: String,
    /// Base URL of the edge.
    base_url: String,
}
1530
/// Request body of `POST /api/edge/invalidate`. Both fields may be
/// omitted.
#[cfg(feature = "edge-proxy")]
#[derive(Debug, Deserialize)]
struct EdgeInvalidateBody {
    /// Tables to invalidate; defaults to an empty list.
    #[serde(default)]
    tables: Vec<String>,
    /// Version bound for the invalidation. When `None`, the handler
    /// takes the cache's `next_version()` instead.
    #[serde(default)]
    up_to_version: Option<u64>,
}
1542
1543/// Parse `?limit=N` from a path. Returns clamped value, or `default`
1544/// when the param is missing / unparseable.
1545#[cfg(feature = "anomaly-detection")]
fn parse_limit_query(path: &str, default: usize, max: usize) -> usize {
    // Everything after the first `?` is the query string; without one
    // there is nothing to parse.
    let query = match path.split_once('?') {
        Some((_, query)) => query,
        None => return default,
    };

    // The first `limit=<number>` pair that parses wins; pairs without
    // `=` and unparseable values are skipped. `default` itself is
    // returned as-is, without being capped at `max`.
    query
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .filter(|(key, _)| *key == "limit")
        .find_map(|(_, value)| value.parse::<usize>().ok())
        .map(|n| n.min(max))
        .unwrap_or(default)
}
1563
/// Request body of `POST /api/shadow`: one SQL statement plus the
/// connection details of the two backends it is run against.
#[cfg(feature = "ha-tr")]
#[derive(Debug, Deserialize)]
struct ShadowRequestBody {
    /// SQL to execute on both sides.
    sql: String,
    /// Optional text-format parameters for `sql`. None or an empty
    /// list runs as a simple_query.
    #[serde(default)]
    params: Option<Vec<String>>,

    /// Host of the source backend (the side whose result the
    /// application would see in production).
    source_host: String,
    /// Port of the source backend.
    source_port: u16,
    /// Source user; the handler falls back to "postgres" when absent.
    #[serde(default)]
    source_user: Option<String>,
    /// Source password, if any.
    #[serde(default)]
    source_password: Option<String>,
    /// Source database name, if any.
    #[serde(default)]
    source_database: Option<String>,

    /// Host of the shadow backend (the side being validated —
    /// typically a new-version replica or post-migration schema).
    shadow_host: String,
    /// Port of the shadow backend.
    shadow_port: u16,
    /// Shadow user; the handler falls back to "postgres" when absent.
    #[serde(default)]
    shadow_user: Option<String>,
    /// Shadow password, if any.
    #[serde(default)]
    shadow_password: Option<String>,
    /// Shadow database name, if any.
    #[serde(default)]
    shadow_database: Option<String>,
}
1597
/// Chaos actions the proxy supports today. Forward-compatible —
/// unknown actions deserialise as an error.
///
/// Wire shape: `{"action":"force_unhealthy","target_node":"..."}`.
/// The `action` key selects the variant (internally tagged, names in
/// snake_case); `reset` takes no further keys: `{"action":"reset"}`.
#[derive(Debug, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum ChaosAction {
    /// Mark a node unhealthy until restored (or until reset is
    /// called). Triggers the failover path the same way a real
    /// health-check failure would.
    ForceUnhealthy { target_node: String },
    /// Mark a previously-overridden node healthy again.
    Restore { target_node: String },
    /// Reset every chaos override in one call. Idempotent.
    Reset,
}
1614
/// JSON entry returned by `GET /plugins`. Built in admin.rs so the
/// plugins module doesn't need a serde dep.
///
/// Every field is an owned string or plain counter, so the entry can
/// be serialised without borrowing from the plugin manager.
#[cfg(feature = "wasm-plugins")]
#[derive(Serialize)]
struct PluginListEntry {
    /// Plugin name.
    name: String,
    /// Plugin version string.
    version: String,
    /// Free-form plugin description.
    description: String,
    /// Hook export names (`pre_query`, `post_query`, `route`, ...).
    hooks: Vec<String>,
    /// `Loading` | `Running` | `Paused` | `Error(...)` | `Unloading`.
    state: String,
    /// Invocation counter — presumably total hook calls; confirm
    /// against the code that fills this entry.
    invocations: u64,
    /// Error counter — presumably failed hook calls; confirm against
    /// the code that fills this entry.
    errors: u64,
}
1630
/// JSON body for `POST /api/replay`.
///
/// The window bounds and the target host/port are required; the
/// three credential fields may be omitted from the payload.
#[cfg(feature = "ha-tr")]
#[derive(Debug, Deserialize)]
struct ReplayRequestBody {
    /// RFC 3339 inclusive window start.
    from: DateTime<Utc>,
    /// RFC 3339 inclusive window end.
    to: DateTime<Utc>,
    /// Target backend host (typically a staging DB).
    target_host: String,
    /// Target backend port.
    target_port: u16,
    /// Optional credential overrides — when omitted, the engine uses
    /// the template values set at server startup. Production callers
    /// targeting a separate staging DB pass these explicitly so the
    /// proxy doesn't need to hold staging credentials in its own
    /// config.
    #[serde(default)]
    target_user: Option<String>,
    /// Password override for the target connection (see the note on
    /// `target_user`).
    #[serde(default)]
    target_password: Option<String>,
    /// Database override for the target connection (see the note on
    /// `target_user`).
    #[serde(default)]
    target_database: Option<String>,
}
1655
1656/// Joined view exposed at `/topology`. Field names use camelCase so
1657/// they map cleanly into the Kubernetes operator's CRD status
1658/// (`HeliosProxyStatus.currentPrimary`, etc).
1659#[derive(Serialize)]
1660struct TopologyResponse {
1661    #[serde(rename = "currentPrimary")]
1662    current_primary: Option<String>,
1663    #[serde(rename = "healthyNodes")]
1664    healthy_nodes: u32,
1665    #[serde(rename = "unhealthyNodes")]
1666    unhealthy_nodes: u32,
1667    #[serde(rename = "totalNodes")]
1668    total_nodes: u32,
1669    /// RFC 3339 timestamp of the last detected primary change.
1670    /// `None` when the proxy hasn't observed a failover since boot.
1671    #[serde(rename = "lastFailoverAt")]
1672    last_failover_at: Option<String>,
1673}
1674
/// Connection-pool counters for a single node, serialised to JSON.
///
/// NOTE(review): the per-field descriptions below are inferred from
/// the field names only — confirm against the code that builds this
/// response.
#[derive(Serialize)]
struct PoolStatsResponse {
    /// Node the counters belong to.
    node: String,
    /// Connections currently in use (presumably checked out).
    active_connections: u64,
    /// Connections currently idle (presumably available for reuse).
    idle_connections: u64,
    /// Requests pending (presumably waiting for a connection).
    pending_requests: u64,
    /// Running total of connections created.
    total_connections_created: u64,
    /// Running total of connections closed.
    total_connections_closed: u64,
}
1684
/// Version information serialised to JSON.
///
/// NOTE(review): how `build_time` is formatted is not visible from
/// this struct — confirm against the code that fills it.
#[derive(Serialize)]
struct VersionResponse {
    /// Version string of the proxy.
    version: String,
    /// Build time of the binary, as a string.
    build_time: String,
}
1690
/// Unit tests for the admin API: state bookkeeping, readiness,
/// topology computation, chaos overrides and the feature-gated
/// replay / plugin / shadow / anomaly / edge handlers.
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `NodeHealth` record for `addr` with the given health
    /// flag. All other fields get the neutral values every test in
    /// this module uses: no failures, no error, 1 ms latency and no
    /// replication-lag reading.
    fn node_health(addr: &str, healthy: bool) -> NodeHealth {
        NodeHealth {
            address: addr.to_string(),
            healthy,
            last_check: chrono::Utc::now(),
            failure_count: 0,
            last_error: None,
            latency_ms: 1.0,
            replication_lag_bytes: None,
        }
    }

    /// Insert (or overwrite) `addr` in the state's health map. The
    /// write guard is a temporary, so it is released before this
    /// function returns.
    async fn seed_node(state: &AdminState, addr: &str, healthy: bool) {
        state
            .node_health
            .write()
            .await
            .insert(addr.to_string(), node_health(addr, healthy));
    }

    /// JSON body asking the chaos endpoint to force `addr` unhealthy.
    fn force_unhealthy_body(addr: &str) -> String {
        format!(r#"{{"action":"force_unhealthy","target_node":"{}"}}"#, addr)
    }

    #[tokio::test]
    async fn test_admin_state_creation() {
        let state = AdminState::new();
        let sessions = state.active_sessions.read().await;
        assert_eq!(*sessions, 0);
    }

    #[tokio::test]
    async fn test_readiness_check_no_nodes() {
        let state = Arc::new(AdminState::new());
        let ready = AdminServer::check_readiness(&state).await;
        assert!(!ready);
    }

    #[tokio::test]
    async fn test_readiness_check_with_healthy_node() {
        let state = Arc::new(AdminState::new());
        seed_node(&state, "localhost:5432", true).await;

        let ready = AdminServer::check_readiness(&state).await;
        assert!(ready);
    }

    #[tokio::test]
    async fn test_command_registration() {
        let state = AdminState::new();

        state
            .register_command("test", |args| {
                Ok(format!("Test command with {} args", args.len()))
            })
            .await;

        let result = state.execute_command("test", &["arg1", "arg2"]).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "Test command with 2 args");
    }

    #[tokio::test]
    async fn test_unknown_command() {
        let state = AdminState::new();
        let result = state.execute_command("unknown", &[]).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_prometheus_metrics_format() {
        let metrics = ServerMetricsSnapshot {
            connections_accepted: 100,
            connections_closed: 50,
            queries_processed: 1000,
            bytes_received: 50000,
            bytes_sent: 100000,
            failovers: 2,
        };

        let output = AdminServer::format_prometheus_metrics(&metrics);
        assert!(output.contains("heliosdb_proxy_connections_total 100"));
        assert!(output.contains("heliosdb_proxy_queries_total 1000"));
        assert!(output.contains("heliosdb_proxy_failovers_total 2"));
    }

    #[test]
    fn test_metrics_response_active_connections() {
        let snapshot = ServerMetricsSnapshot {
            connections_accepted: 100,
            connections_closed: 30,
            queries_processed: 500,
            bytes_received: 10000,
            bytes_sent: 20000,
            failovers: 1,
        };

        // Active connections are derived: accepted (100) - closed (30).
        let response = MetricsResponse::from(snapshot);
        assert_eq!(response.connections_active, 70);
    }

    /// Helper: build an AdminState with the given (address, role,
    /// healthy) tuples seeded into config + node_health.
    async fn topology_state(nodes: &[(&str, &str, bool)]) -> Arc<AdminState> {
        let state = Arc::new(AdminState::new());
        {
            let mut cfg = state.config_snapshot.write().await;
            cfg.nodes = nodes
                .iter()
                .map(|(addr, role, _)| NodeSnapshot {
                    address: (*addr).to_string(),
                    role: (*role).to_string(),
                    weight: 100,
                    enabled: true,
                })
                .collect();
        }
        {
            // One write lock for the whole batch of health records.
            let mut health = state.node_health.write().await;
            health.extend(
                nodes
                    .iter()
                    .map(|(addr, _, healthy)| ((*addr).to_string(), node_health(addr, *healthy))),
            );
        }
        state
    }

    #[tokio::test]
    async fn test_topology_returns_healthy_primary() {
        let state = topology_state(&[
            ("primary.svc:5432", "primary", true),
            ("standby-a.svc:5432", "standby", true),
            ("standby-b.svc:5432", "standby", false),
        ])
        .await;

        let topo = AdminServer::compute_topology(&state).await;
        assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
        assert_eq!(topo.healthy_nodes, 2);
        assert_eq!(topo.unhealthy_nodes, 1);
        assert_eq!(topo.total_nodes, 3);
    }

    #[tokio::test]
    async fn test_topology_no_primary_when_primary_unhealthy() {
        // Failover in progress: the configured primary is sick and
        // no other node has been promoted yet.
        let state = topology_state(&[
            ("primary.svc:5432", "primary", false),
            ("standby.svc:5432", "standby", true),
        ])
        .await;

        let topo = AdminServer::compute_topology(&state).await;
        assert_eq!(topo.current_primary, None);
        assert_eq!(topo.healthy_nodes, 1);
        assert_eq!(topo.unhealthy_nodes, 1);
    }

    #[tokio::test]
    async fn test_topology_handles_empty_cluster() {
        let state = Arc::new(AdminState::new());
        let topo = AdminServer::compute_topology(&state).await;
        assert_eq!(topo.current_primary, None);
        assert_eq!(topo.healthy_nodes, 0);
        assert_eq!(topo.unhealthy_nodes, 0);
        assert_eq!(topo.total_nodes, 0);
    }

    #[tokio::test]
    async fn test_topology_role_match_is_case_insensitive() {
        let state = topology_state(&[("primary.svc:5432", "PRIMARY", true)]).await;
        let topo = AdminServer::compute_topology(&state).await;
        assert_eq!(topo.current_primary.as_deref(), Some("primary.svc:5432"));
    }

    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_replay_returns_503_when_engine_unattached() {
        let state = Arc::new(AdminState::new());
        let body = r#"{
            "from": "2026-04-25T10:00:00Z",
            "to":   "2026-04-25T11:00:00Z",
            "target_host": "127.0.0.1",
            "target_port": 5432
        }"#;
        let (status, value) = AdminServer::handle_replay_request(Some(body), &state)
            .await
            .expect("handler returns Ok with status code");
        assert_eq!(status, 503);
        assert_eq!(value["error"], "replay engine not attached");
    }

    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_replay_400_on_malformed_body() {
        let state = Arc::new(AdminState::new());
        let (status, _) = AdminServer::handle_replay_request(Some("not json"), &state)
            .await
            .expect("handler returns Ok with status code");
        assert_eq!(status, 400);
    }

    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_replay_errors_on_empty_body() {
        let state = Arc::new(AdminState::new());
        let err = AdminServer::handle_replay_request(None, &state).await;
        assert!(err.is_err(), "empty body must surface as Err");
    }

    #[cfg(feature = "wasm-plugins")]
    #[tokio::test]
    async fn test_plugins_list_returns_503_when_manager_unattached() {
        let state = Arc::new(AdminState::new());
        let (status, value) = AdminServer::handle_plugins_list(&state)
            .await
            .expect("handler returns Ok with status code");
        assert_eq!(status, 503);
        assert_eq!(value["error"], "plugin manager not attached");
    }

    #[cfg(not(feature = "wasm-plugins"))]
    #[tokio::test]
    async fn test_plugins_list_503_without_feature() {
        let state = Arc::new(AdminState::new());
        let (status, _) = AdminServer::handle_plugins_list(&state)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 503);
    }

    /// Helper: state with a single healthy node seeded into health.
    async fn chaos_state_with_node(addr: &str) -> Arc<AdminState> {
        let state = Arc::new(AdminState::new());
        seed_node(&state, addr, true).await;
        state
    }

    #[tokio::test]
    async fn test_chaos_force_unhealthy_flips_node_and_records_override() {
        let state = chaos_state_with_node("primary.svc:5432").await;
        let body = force_unhealthy_body("primary.svc:5432");
        let (status, value) = AdminServer::handle_chaos_request(Some(&body), &state)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 200);
        assert_eq!(value["applied"], "force_unhealthy");
        // Health flag flipped.
        assert!(!state.node_health.read().await["primary.svc:5432"].healthy);
        // Override recorded.
        assert!(state.chaos_overrides.read().await.contains_key("primary.svc:5432"));
    }

    #[tokio::test]
    async fn test_chaos_restore_clears_override_and_flips_back() {
        let state = chaos_state_with_node("primary.svc:5432").await;
        let body = force_unhealthy_body("primary.svc:5432");
        let _ = AdminServer::handle_chaos_request(Some(&body), &state)
            .await
            .unwrap();
        let (status, _) = AdminServer::handle_chaos_request(
            Some(r#"{"action":"restore","target_node":"primary.svc:5432"}"#),
            &state,
        )
        .await
        .unwrap();
        assert_eq!(status, 200);
        assert!(state.node_health.read().await["primary.svc:5432"].healthy);
        assert!(state.chaos_overrides.read().await.is_empty());
    }

    #[tokio::test]
    async fn test_chaos_reset_restores_all_overrides() {
        let nodes = ["a:5432", "b:5432"];
        let state = chaos_state_with_node(nodes[0]).await;
        seed_node(&state, nodes[1], true).await;
        // Force both nodes unhealthy so the reset has two overrides
        // to undo.
        for addr in &nodes {
            let body = force_unhealthy_body(addr);
            let _ = AdminServer::handle_chaos_request(Some(&body), &state)
                .await
                .unwrap();
        }
        let (status, value) = AdminServer::handle_chaos_request(
            Some(r#"{"action":"reset"}"#),
            &state,
        )
        .await
        .unwrap();
        assert_eq!(status, 200);
        assert_eq!(value["reset"], true);
        let restored = value["restored"].as_array().unwrap();
        assert_eq!(restored.len(), 2);
        // Both nodes back to healthy + overrides cleared.
        for addr in &nodes {
            assert!(state.node_health.read().await[*addr].healthy);
        }
        assert!(state.chaos_overrides.read().await.is_empty());
    }

    #[tokio::test]
    async fn test_chaos_force_unhealthy_404s_when_node_unknown() {
        let state = Arc::new(AdminState::new());
        let body = force_unhealthy_body("missing.svc:5432");
        let (status, _) = AdminServer::handle_chaos_request(Some(&body), &state)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 404);
    }

    #[tokio::test]
    async fn test_chaos_400_on_malformed_body() {
        let state = Arc::new(AdminState::new());
        let (status, _) = AdminServer::handle_chaos_request(Some("not json"), &state)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 400);
    }

    #[tokio::test]
    async fn test_chaos_400_on_unknown_action() {
        let state = Arc::new(AdminState::new());
        let body = r#"{"action":"format_disk","target_node":"x"}"#;
        let (status, _) = AdminServer::handle_chaos_request(Some(body), &state)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 400);
    }

    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_shadow_400_on_malformed_body() {
        let (status, _) = AdminServer::handle_shadow_request(Some("not json"))
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 400);
    }

    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_shadow_500_on_source_unreachable() {
        // Address that nothing is listening on (port 1 = tcpmux,
        // refused by everything reasonable).
        let body = r#"{
            "sql": "SELECT 1",
            "source_host": "127.0.0.1",
            "source_port": 1,
            "shadow_host": "127.0.0.1",
            "shadow_port": 1
        }"#;
        let (status, value) = AdminServer::handle_shadow_request(Some(body))
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 500);
        let err = value["error"].as_str().expect("error field");
        assert!(
            err.contains("source connect"),
            "expected source connect error, got {}",
            err
        );
    }

    #[cfg(feature = "ha-tr")]
    #[tokio::test]
    async fn test_shadow_errors_on_empty_body() {
        let err = AdminServer::handle_shadow_request(None).await;
        assert!(err.is_err(), "empty body must surface as Err");
    }

    #[cfg(feature = "anomaly-detection")]
    #[tokio::test]
    async fn test_anomalies_returns_503_when_detector_unattached() {
        let state = Arc::new(AdminState::new());
        let (status, value) =
            AdminServer::handle_anomalies_list("/anomalies", &state)
                .await
                .expect("handler returns Ok");
        assert_eq!(status, 503);
        assert_eq!(value["error"], "anomaly detector not attached");
    }

    #[cfg(feature = "anomaly-detection")]
    #[tokio::test]
    async fn test_anomalies_returns_attached_detector_events() {
        use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
        let state = Arc::new(AdminState::new());
        let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
        // Seed a SQL injection event into the detector.
        let _ = det.record_query(&QueryObservation {
            tenant: "test".into(),
            fingerprint: "fp".into(),
            sql: "SELECT * FROM users WHERE id = 1 OR 1=1 --".into(),
            timestamp: std::time::Instant::now(),
            iso_timestamp: "ts".into(),
        });
        state.with_anomaly_detector(det.clone()).await;

        let (status, value) =
            AdminServer::handle_anomalies_list("/anomalies", &state)
                .await
                .expect("handler returns Ok");
        assert_eq!(status, 200);
        let count = value["count"].as_u64().expect("count field");
        assert!(count > 0, "expected at least one event, got {}", count);
    }

    #[cfg(feature = "anomaly-detection")]
    #[tokio::test]
    async fn test_anomalies_limit_query_string_respected() {
        use crate::anomaly::{AnomalyConfig, AnomalyDetector, QueryObservation};
        let state = Arc::new(AdminState::new());
        let det = Arc::new(AnomalyDetector::new(AnomalyConfig::default()));
        // Record 50 observations with distinct fingerprints so the
        // `limit=5` request below has more than five events to trim.
        for i in 0..50 {
            let fp = format!("fp{}", i);
            let _ = det.record_query(&QueryObservation {
                tenant: "test".into(),
                fingerprint: fp,
                sql: "SELECT 1".into(),
                timestamp: std::time::Instant::now(),
                iso_timestamp: "ts".into(),
            });
        }
        state.with_anomaly_detector(det).await;

        let (status, value) =
            AdminServer::handle_anomalies_list("/anomalies?limit=5", &state)
                .await
                .expect("handler returns Ok");
        assert_eq!(status, 200);
        assert_eq!(value["limit"].as_u64().unwrap(), 5);
        assert_eq!(value["events"].as_array().unwrap().len(), 5);
    }

    #[cfg(feature = "anomaly-detection")]
    #[test]
    fn test_parse_limit_query_helper() {
        assert_eq!(parse_limit_query("/anomalies", 100, 1024), 100);
        assert_eq!(parse_limit_query("/anomalies?limit=42", 100, 1024), 42);
        assert_eq!(parse_limit_query("/anomalies?limit=99999", 100, 1024), 1024);
        assert_eq!(parse_limit_query("/anomalies?limit=abc", 100, 1024), 100);
        assert_eq!(parse_limit_query("/anomalies?other=x&limit=7", 100, 1024), 7);
    }

    /// Helper: state with an empty edge cache and edge registry
    /// attached.
    #[cfg(feature = "edge-proxy")]
    async fn edge_state() -> Arc<AdminState> {
        use crate::edge::{EdgeCache, EdgeRegistry};
        use std::time::Duration;
        let s = Arc::new(AdminState::new());
        let cache = Arc::new(EdgeCache::new(100));
        let registry = Arc::new(EdgeRegistry::new(8, Duration::from_secs(60)));
        s.with_edge(cache, registry).await;
        s
    }

    #[cfg(feature = "edge-proxy")]
    #[tokio::test]
    async fn test_edge_status_returns_empty_lists_initially() {
        let s = edge_state().await;
        let (status, value) = AdminServer::handle_edge_status(&s)
            .await
            .expect("handler returns Ok");
        assert_eq!(status, 200);
        assert_eq!(value["edge_count"].as_u64().unwrap(), 0);
        assert_eq!(value["registered"].as_array().unwrap().len(), 0);
        assert!(value["cache"].is_object(), "cache stats present");
    }

    #[cfg(feature = "edge-proxy")]
    #[tokio::test]
    async fn test_edge_register_then_status_lists_edge() {
        let s = edge_state().await;
        let body = r#"{"edge_id":"e1","region":"us-east","base_url":"https://e1.svc"}"#;
        let (status, _) = AdminServer::handle_edge_register(Some(body), &s)
            .await
            .expect("handler ok");
        assert_eq!(status, 201);
        let (status2, value2) = AdminServer::handle_edge_status(&s).await.unwrap();
        assert_eq!(status2, 200);
        assert_eq!(value2["edge_count"].as_u64().unwrap(), 1);
        assert_eq!(
            value2["registered"][0]["edge_id"].as_str().unwrap(),
            "e1"
        );
    }

    #[cfg(feature = "edge-proxy")]
    #[tokio::test]
    async fn test_edge_register_400_on_malformed_body() {
        let s = edge_state().await;
        let (status, _) = AdminServer::handle_edge_register(Some("not json"), &s)
            .await
            .expect("handler ok");
        assert_eq!(status, 400);
    }

    #[cfg(feature = "edge-proxy")]
    #[tokio::test]
    async fn test_edge_invalidate_drops_local_cache_entries() {
        use crate::edge::{CacheEntry, CacheKey};
        use std::time::{Duration, Instant};
        let s = edge_state().await;
        // Seed an entry into the local cache.
        let cache = s.edge_cache.read().await.clone().unwrap();
        cache.insert(
            CacheKey::new("fp1", "p1"),
            CacheEntry {
                version: 1,
                response_bytes: b"row".to_vec(),
                tables: vec!["users".into()],
                expires_at: Instant::now() + Duration::from_secs(60),
            },
        );
        assert!(cache.get(&CacheKey::new("fp1", "p1")).is_some());

        let body = r#"{"tables":["users"]}"#;
        let (status, value) = AdminServer::handle_edge_invalidate(Some(body), &s)
            .await
            .expect("handler ok");
        assert_eq!(status, 200);
        assert_eq!(value["dropped_local"].as_u64().unwrap(), 1);
        assert!(cache.get(&CacheKey::new("fp1", "p1")).is_none());
    }

    #[cfg(feature = "edge-proxy")]
    #[tokio::test]
    async fn test_edge_invalidate_503_when_cache_unattached() {
        let s = Arc::new(AdminState::new());
        let body = r#"{"tables":["users"]}"#;
        let (status, _) = AdminServer::handle_edge_invalidate(Some(body), &s)
            .await
            .expect("handler ok");
        assert_eq!(status, 503);
    }
}