Skip to main content

gatel_core/admin/
mod.rs

1//! Admin REST API server.
2//!
3//! Provides a lightweight HTTP API for inspecting and managing the running
4//! proxy. Runs on a separate listener (configured via `global.admin_addr`)
5//! and exposes endpoints for health, configuration, upstream status, metrics,
6//! and hot-reload.
7
8use std::net::SocketAddr;
9use std::sync::Arc;
10
11use http::{Request, Response, StatusCode};
12use http_body_util::BodyExt;
13use hyper::body::Incoming;
14use hyper::service::service_fn;
15use hyper_util::rt::{TokioIo, TokioTimer};
16use tokio::net::TcpListener;
17use tracing::{error, info, warn};
18
19use crate::config::parse_config;
20use crate::hoops::metrics::Metrics;
21use crate::server::AppState;
22use crate::{Body, ProxyError, full_body};
23
24/// Start the admin HTTP server on the given address.
25///
26/// The server runs indefinitely and handles requests against the shared
27/// application state. It should be spawned as a background task alongside
28/// the main proxy listeners.
29pub async fn start_admin_server(
30    addr: SocketAddr,
31    state: Arc<AppState>,
32    metrics: Arc<Metrics>,
33) -> Result<(), ProxyError> {
34    let listener = TcpListener::bind(addr).await?;
35    info!(%addr, "admin API server listening");
36
37    loop {
38        let (stream, _client_addr) = match listener.accept().await {
39            Ok(conn) => conn,
40            Err(e) => {
41                error!("admin accept error: {e}");
42                continue;
43            }
44        };
45
46        let state = Arc::clone(&state);
47        let metrics = Arc::clone(&metrics);
48
49        tokio::spawn(async move {
50            let io = TokioIo::new(stream);
51            let service = service_fn(move |req: Request<Incoming>| {
52                let state = Arc::clone(&state);
53                let metrics = Arc::clone(&metrics);
54                async move { handle_admin_request(req, &state, &metrics).await }
55            });
56
57            if let Err(e) =
58                hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
59                    .http1()
60                    .timer(TokioTimer::new())
61                    .serve_connection(io, service)
62                    .await
63            {
64                error!("admin connection error: {e}");
65            }
66        });
67    }
68}
69
70/// Route an admin request to the appropriate handler.
71async fn handle_admin_request(
72    req: Request<Incoming>,
73    state: &AppState,
74    metrics: &Metrics,
75) -> Result<Response<Body>, std::convert::Infallible> {
76    // Enforce bearer token auth if configured.
77    let config = state.config.load();
78    if let Some(ref token) = config.global.admin_auth_token
79        && !check_bearer_auth(&req, token)
80    {
81        return Ok(json_response(
82            StatusCode::UNAUTHORIZED,
83            r#"{"error":"unauthorized"}"#,
84        ));
85    }
86
87    let path = req.uri().path().to_string();
88    let method = req.method().clone();
89
90    let result = match (method.as_str(), path.as_str()) {
91        ("GET", "/config") => handle_get_config(state),
92        ("POST", "/config/reload") => handle_config_reload(state).await,
93        ("POST", "/config") => handle_post_config(req, state).await,
94        ("POST", "/config/test") => handle_test_config(req).await,
95        ("POST", "/stop") => handle_stop(state),
96        ("GET", "/health") => handle_health(),
97        ("GET", "/upstreams") => handle_upstreams(state),
98        ("GET", "/metrics") => handle_metrics(metrics),
99        ("GET", "/runtime") => handle_runtime(),
100        _ => json_response(StatusCode::NOT_FOUND, r#"{"error":"not found"}"#),
101    };
102
103    Ok(result)
104}
105
106// ---------------------------------------------------------------------------
107// Endpoint handlers
108// ---------------------------------------------------------------------------
109
110/// `GET /config` — return the current configuration as JSON.
111fn handle_get_config(state: &AppState) -> Response<Body> {
112    let config = state.config.load();
113    match serde_json::to_string_pretty(&**config) {
114        Ok(json) => Response::builder()
115            .status(StatusCode::OK)
116            .header("Content-Type", "application/json")
117            .body(full_body(json))
118            .unwrap(),
119        Err(e) => json_response(
120            StatusCode::INTERNAL_SERVER_ERROR,
121            &format!(r#"{{"error":"serialization failed: {e}"}}"#),
122        ),
123    }
124}
125
126/// `POST /config/reload` — trigger a hot-reload from the config file.
127///
128/// Currently delegates to `AppState::reload` using the path stored in the
129/// existing config. In practice, the caller (main binary) would re-read the
130/// config file and call `state.reload(new_config)`. Here we just signal
131/// success to confirm the endpoint is wired up; the actual file re-read is
132/// done by the caller that owns the config path.
133async fn handle_config_reload(state: &AppState) -> Response<Body> {
134    // Read the config path from global state — the admin API can only signal
135    // that a reload was requested. The actual file-based reload requires the
136    // config path which is stored in AppState.config_path.
137    if let Some(ref config_path) = state.config_path {
138        match std::fs::read_to_string(config_path) {
139            Ok(config_str) => match crate::config::parse_config(&config_str) {
140                Ok(new_config) => {
141                    state.reload(new_config).await;
142                    json_response(StatusCode::OK, r#"{"status":"reloaded"}"#)
143                }
144                Err(e) => json_response(
145                    StatusCode::BAD_REQUEST,
146                    &format!(r#"{{"error":"config parse failed: {e}"}}"#),
147                ),
148            },
149            Err(e) => json_response(
150                StatusCode::INTERNAL_SERVER_ERROR,
151                &format!(r#"{{"error":"failed to read config file: {e}"}}"#),
152            ),
153        }
154    } else {
155        json_response(
156            StatusCode::OK,
157            r#"{"status":"reload_requested","note":"no config path set; reload must be triggered externally"}"#,
158        )
159    }
160}
161
162/// `GET /health` — simple health check endpoint.
163fn handle_health() -> Response<Body> {
164    json_response(StatusCode::OK, r#"{"status":"healthy"}"#)
165}
166
167/// `GET /upstreams` — return upstream backend status.
168///
169/// Iterates over all sites and their proxy routes to collect upstream info
170/// from the current configuration. This shows the configured addresses,
171/// health status, and active connections from the config perspective.
172fn handle_upstreams(state: &AppState) -> Response<Body> {
173    let config = state.config.load();
174    let mut upstreams = Vec::new();
175
176    for site in &config.sites {
177        for route in &site.routes {
178            if let crate::config::HandlerConfig::Proxy(ref proxy_cfg) = route.handler {
179                for upstream in &proxy_cfg.upstreams {
180                    upstreams.push(serde_json::json!({
181                        "site": site.host,
182                        "route": route.path,
183                        "address": upstream.addr,
184                        "weight": upstream.weight,
185                    }));
186                }
187            }
188        }
189    }
190
191    let json = serde_json::to_string_pretty(&upstreams).unwrap_or_else(|_| "[]".to_string());
192    Response::builder()
193        .status(StatusCode::OK)
194        .header("Content-Type", "application/json")
195        .body(full_body(json))
196        .unwrap()
197}
198
199/// `GET /runtime` — return runtime information.
200fn handle_runtime() -> Response<Body> {
201    let rt = crate::runtime::info();
202    let json = format!(
203        r#"{{"runtime":"{}","io_uring":{},"send_tasks":{}}}"#,
204        rt.name, rt.io_uring, rt.send_tasks
205    );
206    json_response(StatusCode::OK, &json)
207}
208
209/// `GET /metrics` — return Prometheus-format metrics.
210fn handle_metrics(metrics: &Metrics) -> Response<Body> {
211    let output = metrics.render_prometheus();
212    Response::builder()
213        .status(StatusCode::OK)
214        .header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
215        .body(full_body(output))
216        .unwrap()
217}
218
219/// `POST /config` — apply a new KDL configuration from the request body.
220async fn handle_post_config(req: Request<Incoming>, state: &AppState) -> Response<Body> {
221    let body_bytes = match req.into_body().collect().await {
222        Ok(collected) => collected.to_bytes(),
223        Err(e) => {
224            return json_response(
225                StatusCode::BAD_REQUEST,
226                &format!(r#"{{"error":"failed to read body: {e}"}}"#),
227            );
228        }
229    };
230
231    let config_str = match std::str::from_utf8(&body_bytes) {
232        Ok(s) => s,
233        Err(e) => {
234            return json_response(
235                StatusCode::BAD_REQUEST,
236                &format!(r#"{{"error":"invalid UTF-8: {e}"}}"#),
237            );
238        }
239    };
240
241    match parse_config(config_str) {
242        Ok(new_config) => {
243            let sites = new_config.sites.len();
244            let routes: usize = new_config.sites.iter().map(|s| s.routes.len()).sum();
245            state.reload(new_config).await;
246            info!("config applied via admin API");
247            json_response(
248                StatusCode::OK,
249                &format!(r#"{{"status":"applied","sites":{sites},"routes":{routes}}}"#),
250            )
251        }
252        Err(e) => json_response(
253            StatusCode::BAD_REQUEST,
254            &format!(r#"{{"error":"config parse failed: {e}"}}"#),
255        ),
256    }
257}
258
259/// `POST /config/test` — validate a KDL configuration without applying it.
260async fn handle_test_config(req: Request<Incoming>) -> Response<Body> {
261    let body_bytes = match req.into_body().collect().await {
262        Ok(collected) => collected.to_bytes(),
263        Err(e) => {
264            return json_response(
265                StatusCode::BAD_REQUEST,
266                &format!(r#"{{"error":"failed to read body: {e}"}}"#),
267            );
268        }
269    };
270
271    let config_str = match std::str::from_utf8(&body_bytes) {
272        Ok(s) => s,
273        Err(e) => {
274            return json_response(
275                StatusCode::BAD_REQUEST,
276                &format!(r#"{{"error":"invalid UTF-8: {e}"}}"#),
277            );
278        }
279    };
280
281    match parse_config(config_str) {
282        Ok(config) => {
283            let sites = config.sites.len();
284            let routes: usize = config.sites.iter().map(|s| s.routes.len()).sum();
285            json_response(
286                StatusCode::OK,
287                &format!(r#"{{"status":"valid","sites":{sites},"routes":{routes}}}"#),
288            )
289        }
290        Err(e) => json_response(StatusCode::BAD_REQUEST, &format!(r#"{{"error":"{e}"}}"#)),
291    }
292}
293
294/// `POST /stop` — initiate graceful shutdown.
295fn handle_stop(state: &AppState) -> Response<Body> {
296    warn!("shutdown requested via admin API");
297    state.shutdown.shutdown();
298    json_response(StatusCode::OK, r#"{"status":"stopping"}"#)
299}
300
301// ---------------------------------------------------------------------------
302// Helpers
303// ---------------------------------------------------------------------------
304
305/// Build a simple JSON response with the given status and body string.
306fn json_response(status: StatusCode, body: &str) -> Response<Body> {
307    Response::builder()
308        .status(status)
309        .header("Content-Type", "application/json")
310        .body(full_body(body.to_string()))
311        .unwrap()
312}
313
314/// Check for a valid `Authorization: Bearer <token>` header.
315fn check_bearer_auth(req: &Request<Incoming>, expected_token: &str) -> bool {
316    req.headers()
317        .get(http::header::AUTHORIZATION)
318        .and_then(|v| v.to_str().ok())
319        .map(|v| {
320            v.starts_with("Bearer ")
321                && crate::crypto::constant_time_eq(&v.as_bytes()[7..], expected_token.as_bytes())
322        })
323        .unwrap_or(false)
324}