1use std::net::SocketAddr;
9use std::sync::Arc;
10
11use http::{Request, Response, StatusCode};
12use http_body_util::BodyExt;
13use hyper::body::Incoming;
14use hyper::service::service_fn;
15use hyper_util::rt::{TokioIo, TokioTimer};
16use tokio::net::TcpListener;
17use tracing::{error, info, warn};
18
19use crate::config::parse_config;
20use crate::hoops::metrics::Metrics;
21use crate::server::AppState;
22use crate::{Body, ProxyError, full_body};
23
24pub async fn start_admin_server(
30 addr: SocketAddr,
31 state: Arc<AppState>,
32 metrics: Arc<Metrics>,
33) -> Result<(), ProxyError> {
34 let listener = TcpListener::bind(addr).await?;
35 info!(%addr, "admin API server listening");
36
37 loop {
38 let (stream, _client_addr) = match listener.accept().await {
39 Ok(conn) => conn,
40 Err(e) => {
41 error!("admin accept error: {e}");
42 continue;
43 }
44 };
45
46 let state = Arc::clone(&state);
47 let metrics = Arc::clone(&metrics);
48
49 tokio::spawn(async move {
50 let io = TokioIo::new(stream);
51 let service = service_fn(move |req: Request<Incoming>| {
52 let state = Arc::clone(&state);
53 let metrics = Arc::clone(&metrics);
54 async move { handle_admin_request(req, &state, &metrics).await }
55 });
56
57 if let Err(e) =
58 hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
59 .http1()
60 .timer(TokioTimer::new())
61 .serve_connection(io, service)
62 .await
63 {
64 error!("admin connection error: {e}");
65 }
66 });
67 }
68}
69
70async fn handle_admin_request(
72 req: Request<Incoming>,
73 state: &AppState,
74 metrics: &Metrics,
75) -> Result<Response<Body>, std::convert::Infallible> {
76 let config = state.config.load();
78 if let Some(ref token) = config.global.admin_auth_token
79 && !check_bearer_auth(&req, token)
80 {
81 return Ok(json_response(
82 StatusCode::UNAUTHORIZED,
83 r#"{"error":"unauthorized"}"#,
84 ));
85 }
86
87 let path = req.uri().path().to_string();
88 let method = req.method().clone();
89
90 let result = match (method.as_str(), path.as_str()) {
91 ("GET", "/config") => handle_get_config(state),
92 ("POST", "/config/reload") => handle_config_reload(state).await,
93 ("POST", "/config") => handle_post_config(req, state).await,
94 ("POST", "/config/test") => handle_test_config(req).await,
95 ("POST", "/stop") => handle_stop(state),
96 ("GET", "/health") => handle_health(),
97 ("GET", "/upstreams") => handle_upstreams(state),
98 ("GET", "/metrics") => handle_metrics(metrics),
99 ("GET", "/runtime") => handle_runtime(),
100 _ => json_response(StatusCode::NOT_FOUND, r#"{"error":"not found"}"#),
101 };
102
103 Ok(result)
104}
105
106fn handle_get_config(state: &AppState) -> Response<Body> {
112 let config = state.config.load();
113 match serde_json::to_string_pretty(&**config) {
114 Ok(json) => Response::builder()
115 .status(StatusCode::OK)
116 .header("Content-Type", "application/json")
117 .body(full_body(json))
118 .unwrap(),
119 Err(e) => json_response(
120 StatusCode::INTERNAL_SERVER_ERROR,
121 &format!(r#"{{"error":"serialization failed: {e}"}}"#),
122 ),
123 }
124}
125
126async fn handle_config_reload(state: &AppState) -> Response<Body> {
134 if let Some(ref config_path) = state.config_path {
138 match std::fs::read_to_string(config_path) {
139 Ok(config_str) => match crate::config::parse_config(&config_str) {
140 Ok(new_config) => {
141 state.reload(new_config).await;
142 json_response(StatusCode::OK, r#"{"status":"reloaded"}"#)
143 }
144 Err(e) => json_response(
145 StatusCode::BAD_REQUEST,
146 &format!(r#"{{"error":"config parse failed: {e}"}}"#),
147 ),
148 },
149 Err(e) => json_response(
150 StatusCode::INTERNAL_SERVER_ERROR,
151 &format!(r#"{{"error":"failed to read config file: {e}"}}"#),
152 ),
153 }
154 } else {
155 json_response(
156 StatusCode::OK,
157 r#"{"status":"reload_requested","note":"no config path set; reload must be triggered externally"}"#,
158 )
159 }
160}
161
162fn handle_health() -> Response<Body> {
164 json_response(StatusCode::OK, r#"{"status":"healthy"}"#)
165}
166
167fn handle_upstreams(state: &AppState) -> Response<Body> {
173 let config = state.config.load();
174 let mut upstreams = Vec::new();
175
176 for site in &config.sites {
177 for route in &site.routes {
178 if let crate::config::HandlerConfig::Proxy(ref proxy_cfg) = route.handler {
179 for upstream in &proxy_cfg.upstreams {
180 upstreams.push(serde_json::json!({
181 "site": site.host,
182 "route": route.path,
183 "address": upstream.addr,
184 "weight": upstream.weight,
185 }));
186 }
187 }
188 }
189 }
190
191 let json = serde_json::to_string_pretty(&upstreams).unwrap_or_else(|_| "[]".to_string());
192 Response::builder()
193 .status(StatusCode::OK)
194 .header("Content-Type", "application/json")
195 .body(full_body(json))
196 .unwrap()
197}
198
199fn handle_runtime() -> Response<Body> {
201 let rt = crate::runtime::info();
202 let json = format!(
203 r#"{{"runtime":"{}","io_uring":{},"send_tasks":{}}}"#,
204 rt.name, rt.io_uring, rt.send_tasks
205 );
206 json_response(StatusCode::OK, &json)
207}
208
209fn handle_metrics(metrics: &Metrics) -> Response<Body> {
211 let output = metrics.render_prometheus();
212 Response::builder()
213 .status(StatusCode::OK)
214 .header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
215 .body(full_body(output))
216 .unwrap()
217}
218
219async fn handle_post_config(req: Request<Incoming>, state: &AppState) -> Response<Body> {
221 let body_bytes = match req.into_body().collect().await {
222 Ok(collected) => collected.to_bytes(),
223 Err(e) => {
224 return json_response(
225 StatusCode::BAD_REQUEST,
226 &format!(r#"{{"error":"failed to read body: {e}"}}"#),
227 );
228 }
229 };
230
231 let config_str = match std::str::from_utf8(&body_bytes) {
232 Ok(s) => s,
233 Err(e) => {
234 return json_response(
235 StatusCode::BAD_REQUEST,
236 &format!(r#"{{"error":"invalid UTF-8: {e}"}}"#),
237 );
238 }
239 };
240
241 match parse_config(config_str) {
242 Ok(new_config) => {
243 let sites = new_config.sites.len();
244 let routes: usize = new_config.sites.iter().map(|s| s.routes.len()).sum();
245 state.reload(new_config).await;
246 info!("config applied via admin API");
247 json_response(
248 StatusCode::OK,
249 &format!(r#"{{"status":"applied","sites":{sites},"routes":{routes}}}"#),
250 )
251 }
252 Err(e) => json_response(
253 StatusCode::BAD_REQUEST,
254 &format!(r#"{{"error":"config parse failed: {e}"}}"#),
255 ),
256 }
257}
258
259async fn handle_test_config(req: Request<Incoming>) -> Response<Body> {
261 let body_bytes = match req.into_body().collect().await {
262 Ok(collected) => collected.to_bytes(),
263 Err(e) => {
264 return json_response(
265 StatusCode::BAD_REQUEST,
266 &format!(r#"{{"error":"failed to read body: {e}"}}"#),
267 );
268 }
269 };
270
271 let config_str = match std::str::from_utf8(&body_bytes) {
272 Ok(s) => s,
273 Err(e) => {
274 return json_response(
275 StatusCode::BAD_REQUEST,
276 &format!(r#"{{"error":"invalid UTF-8: {e}"}}"#),
277 );
278 }
279 };
280
281 match parse_config(config_str) {
282 Ok(config) => {
283 let sites = config.sites.len();
284 let routes: usize = config.sites.iter().map(|s| s.routes.len()).sum();
285 json_response(
286 StatusCode::OK,
287 &format!(r#"{{"status":"valid","sites":{sites},"routes":{routes}}}"#),
288 )
289 }
290 Err(e) => json_response(StatusCode::BAD_REQUEST, &format!(r#"{{"error":"{e}"}}"#)),
291 }
292}
293
294fn handle_stop(state: &AppState) -> Response<Body> {
296 warn!("shutdown requested via admin API");
297 state.shutdown.shutdown();
298 json_response(StatusCode::OK, r#"{"status":"stopping"}"#)
299}
300
301fn json_response(status: StatusCode, body: &str) -> Response<Body> {
307 Response::builder()
308 .status(status)
309 .header("Content-Type", "application/json")
310 .body(full_body(body.to_string()))
311 .unwrap()
312}
313
314fn check_bearer_auth(req: &Request<Incoming>, expected_token: &str) -> bool {
316 req.headers()
317 .get(http::header::AUTHORIZATION)
318 .and_then(|v| v.to_str().ok())
319 .map(|v| {
320 v.starts_with("Bearer ")
321 && crate::crypto::constant_time_eq(&v.as_bytes()[7..], expected_token.as_bytes())
322 })
323 .unwrap_or(false)
324}