1pub mod handlers;
2
3use crate::app::AppManager;
4use crate::circuit_breaker::SharedCircuitBreaker;
5use crate::config::ConfigManager;
6use crate::metrics::SharedMetrics;
7use anyhow::Result;
8use bytes::Bytes;
9use http_body_util::BodyExt;
10use hyper::body::Incoming;
11use hyper::service::service_fn;
12use hyper::{Method, Request, Response};
13use hyper_util::client::legacy::connect::HttpConnector;
14use hyper_util::client::legacy::Client;
15use hyper_util::rt::{TokioExecutor, TokioIo};
16use std::sync::Arc;
17use std::time::Instant;
18use tokio::io::{AsyncReadExt, AsyncWriteExt};
19use tokio::net::{TcpListener, TcpStream};
20
21type BoxBody = http_body_util::combinators::BoxBody<Bytes, std::convert::Infallible>;
22
23pub struct AdminState {
24 pub config_manager: Arc<ConfigManager>,
25 pub metrics: SharedMetrics,
26 pub start_time: Instant,
27 pub circuit_breaker: SharedCircuitBreaker,
28 pub app_manager: Option<Arc<AppManager>>,
29}
30
31pub(crate) fn cors_headers(
32 builder: hyper::http::response::Builder,
33) -> hyper::http::response::Builder {
34 builder
35 .header("Access-Control-Allow-Origin", "*")
36 .header(
37 "Access-Control-Allow-Methods",
38 "GET, POST, PUT, DELETE, OPTIONS",
39 )
40 .header("Access-Control-Allow-Headers", "Content-Type, X-Api-Key")
41}
42
43fn json_response(status: u16, body: serde_json::Value) -> Response<BoxBody> {
44 let bytes = Bytes::from(serde_json::to_string(&body).unwrap());
45 cors_headers(Response::builder())
46 .status(status)
47 .header("Content-Type", "application/json")
48 .body(http_body_util::Full::new(bytes).boxed())
49 .unwrap()
50}
51
52fn ok_response(data: serde_json::Value) -> Response<BoxBody> {
53 json_response(200, serde_json::json!({ "ok": true, "data": data }))
54}
55
56fn created_response(data: serde_json::Value) -> Response<BoxBody> {
57 json_response(201, serde_json::json!({ "ok": true, "data": data }))
58}
59
60fn no_content_response() -> Response<BoxBody> {
61 cors_headers(Response::builder())
62 .status(204)
63 .body(http_body_util::Full::new(Bytes::new()).boxed())
64 .unwrap()
65}
66
67fn preflight_response() -> Response<BoxBody> {
68 cors_headers(Response::builder())
69 .status(204)
70 .header("Access-Control-Max-Age", "86400")
71 .body(http_body_util::Full::new(Bytes::new()).boxed())
72 .unwrap()
73}
74
75fn error_response(status: u16, message: &str) -> Response<BoxBody> {
76 json_response(status, serde_json::json!({ "ok": false, "error": message }))
77}
78
79fn check_auth(req: &Request<Incoming>, api_key: &Option<String>) -> bool {
80 match api_key {
81 None => true, Some(key) if key.is_empty() => true,
83 Some(key) => req
84 .headers()
85 .get("X-Api-Key")
86 .and_then(|v| v.to_str().ok())
87 .is_some_and(|v| v == key),
88 }
89}
90
/// Parses the numeric route index out of a `/api/v1/routes/{index}` path.
///
/// Returns `None` when the prefix is missing or the remainder is not a valid
/// `usize`.
fn extract_route_index(path: &str) -> Option<usize> {
    let index_text = path.strip_prefix("/api/v1/routes/")?;
    index_text.parse().ok()
}
96
97async fn handle_admin_request(
98 req: Request<Incoming>,
99 state: Arc<AdminState>,
100) -> Result<Response<BoxBody>, std::convert::Infallible> {
101 let api_key = state.config_manager.get_config().admin.api_key.clone();
102 if !check_auth(&req, &api_key) {
103 return Ok(error_response(401, "Unauthorized"));
104 }
105
106 let method = req.method().clone();
107 let path = req.uri().path().to_string();
108
109 if method == Method::OPTIONS {
111 return Ok(preflight_response());
112 }
113
114 let response = match (method.clone(), path.as_str()) {
115 (Method::GET, "/api/v1/status") => handlers::get_status(&state).await,
117 (Method::GET, "/api/v1/config") => handlers::get_config(&state),
118 (Method::GET, "/api/v1/routes") => handlers::get_routes(&state),
119 (Method::GET, "/api/v1/metrics") => handlers::get_metrics(&state),
120 (Method::GET, "/api/v1/app-metrics") => handlers::get_all_app_metrics(&state),
121 (Method::GET, "/api/v1/app-metrics/system") => {
122 handlers::get_app_system_metrics(&state).await
123 }
124 (Method::GET, "/api/v1/events/apps") => handlers::sse_app_events(state.clone()).await,
125 (Method::POST, "/api/v1/reload") => handlers::post_reload(&state).await,
126
127 (Method::GET, "/api/v1/apps") => handlers::get_apps(&state).await,
129 (Method::GET, "/api/v1/apps/by-domain") => handlers::get_apps_by_domain(&state).await,
130 (_, p) if p.starts_with("/api/v1/apps/") => {
131 let app_name = p.strip_prefix("/api/v1/apps/").unwrap_or("");
132 if method == Method::GET && !app_name.is_empty() && !app_name.contains('/') {
133 handlers::get_app(&state, app_name).await
134 } else if method == Method::GET && app_name.ends_with("/metrics") {
135 let name = app_name.strip_suffix("/metrics").unwrap_or("");
136 if name.is_empty() {
137 error_response(400, "Invalid app name")
138 } else {
139 handlers::get_app_metrics(&state, name).await
140 }
141 } else if app_name.ends_with("/deploy") {
142 let name = app_name.strip_suffix("/deploy").unwrap_or("");
143 if name.is_empty() {
144 error_response(400, "Invalid app name")
145 } else {
146 handlers::post_app_deploy(&state, name).await
147 }
148 } else if app_name.ends_with("/restart") {
149 let name = app_name.strip_suffix("/restart").unwrap_or("");
150 if name.is_empty() {
151 error_response(400, "Invalid app name")
152 } else {
153 handlers::post_app_restart(&state, name).await
154 }
155 } else if app_name.ends_with("/rollback") {
156 let name = app_name.strip_suffix("/rollback").unwrap_or("");
157 if name.is_empty() {
158 error_response(400, "Invalid app name")
159 } else {
160 handlers::post_app_rollback(&state, name).await
161 }
162 } else if app_name.ends_with("/stop") {
163 let name = app_name.strip_suffix("/stop").unwrap_or("");
164 if name.is_empty() {
165 error_response(400, "Invalid app name")
166 } else {
167 handlers::post_app_stop(&state, name).await
168 }
169 } else if app_name.ends_with("/logs") {
170 let name = app_name.strip_suffix("/logs").unwrap_or("");
171 if name.is_empty() {
172 error_response(400, "Invalid app name")
173 } else {
174 handlers::get_app_logs(&state, name).await
175 }
176 } else {
177 error_response(404, "Not found")
178 }
179 }
180
181 (Method::POST, "/api/v1/routes") => {
183 let body = read_body(req).await;
184 handlers::post_route(&state, &body)
185 }
186 (Method::PUT, "/api/v1/config") => {
187 let body = read_body(req).await;
188 handlers::put_config(&state, &body)
189 }
190
191 (Method::GET, p) if p.starts_with("/api/v1/routes/") => match extract_route_index(p) {
193 Some(idx) => handlers::get_route(&state, idx),
194 None => error_response(400, "Invalid route index"),
195 },
196 (Method::PUT, p) if p.starts_with("/api/v1/routes/") => match extract_route_index(p) {
197 Some(idx) => {
198 let body = read_body(req).await;
199 handlers::put_route(&state, idx, &body)
200 }
201 None => error_response(400, "Invalid route index"),
202 },
203 (Method::DELETE, p) if p.starts_with("/api/v1/routes/") => match extract_route_index(p) {
204 Some(idx) => handlers::delete_route(&state, idx),
205 None => error_response(400, "Invalid route index"),
206 },
207
208 (Method::GET, "/api/v1/circuit-breaker") => handlers::get_circuit_breaker(&state),
210 (Method::POST, "/api/v1/circuit-breaker/reset") => handlers::reset_circuit_breaker(&state),
211
212 (Method::POST, "/api/v1/hash-password") => {
214 let body = read_body(req).await;
215 handlers::post_hash_password(&state, &body)
216 }
217
218 _ if state.app_manager.is_some() => {
220 let is_ws = req
221 .headers()
222 .get("upgrade")
223 .and_then(|v| v.to_str().ok())
224 .is_some_and(|v| v.eq_ignore_ascii_case("websocket"));
225 if is_ws {
226 proxy_websocket_to_admin_app(req, &state).await
227 } else {
228 proxy_to_admin_app(req, &state).await
229 }
230 }
231 _ => error_response(404, "Not found"),
232 };
233
234 Ok(response)
235}
236
237async fn proxy_to_admin_app(req: Request<Incoming>, state: &Arc<AdminState>) -> Response<BoxBody> {
238 let port = match resolve_admin_port(state).await {
239 Ok(p) => p,
240 Err(resp) => return resp,
241 };
242
243 let path = req.uri().path();
244 let query = req
245 .uri()
246 .query()
247 .map(|q| format!("?{}", q))
248 .unwrap_or_default();
249 let target_uri = format!("http://localhost:{}{}{}", port, path, query);
250
251 let (mut parts, body) = req.into_parts();
252 parts.uri = match target_uri.parse() {
253 Ok(uri) => uri,
254 Err(_) => return error_response(500, "Failed to build proxy URI"),
255 };
256
257 let proxy_req = Request::from_parts(parts, body);
258
259 let mut connector = HttpConnector::new();
260 connector.set_connect_timeout(Some(std::time::Duration::from_secs(3)));
261
262 let client: Client<HttpConnector, Incoming> =
263 Client::builder(TokioExecutor::new()).build(connector);
264
265 match client.request(proxy_req).await {
266 Ok(resp) => {
267 let (parts, body) = resp.into_parts();
268 let mapped = body.map_err(|_| -> std::convert::Infallible { unreachable!() });
269 Response::from_parts(parts, mapped.boxed())
270 }
271 Err(e) => {
272 tracing::error!("Failed to proxy to _admin app: {}", e);
273 error_response(502, &format!("Admin app not reachable on port {} — deploy it first: POST /api/v1/apps/_admin/deploy", port))
274 }
275 }
276}
277
278async fn resolve_admin_port(state: &Arc<AdminState>) -> Result<u16, Response<BoxBody>> {
280 let app_manager = match &state.app_manager {
281 Some(m) => m,
282 None => return Err(error_response(501, "App management not configured")),
283 };
284
285 let app = match app_manager.get_app("_admin").await {
286 Some(a) => a,
287 None => return Err(error_response(502, "_admin app not found")),
288 };
289
290 let port = if app.current_slot == "blue" {
291 app.blue.port
292 } else {
293 app.green.port
294 };
295
296 if port == 0 {
297 return Err(error_response(502, "_admin app not deployed"));
298 }
299
300 Ok(port)
301}
302
303async fn proxy_websocket_to_admin_app(
304 req: Request<Incoming>,
305 state: &Arc<AdminState>,
306) -> Response<BoxBody> {
307 let port = match resolve_admin_port(state).await {
308 Ok(p) => p,
309 Err(resp) => return resp,
310 };
311
312 let path = req.uri().path().to_string();
313 let query = req
314 .uri()
315 .query()
316 .map(|q| format!("?{}", q))
317 .unwrap_or_default();
318
319 let ws_key = req
321 .headers()
322 .get("sec-websocket-key")
323 .and_then(|v| v.to_str().ok())
324 .unwrap_or("")
325 .to_string();
326 let ws_version = req
327 .headers()
328 .get("sec-websocket-version")
329 .and_then(|v| v.to_str().ok())
330 .unwrap_or("13")
331 .to_string();
332 let ws_protocol = req
333 .headers()
334 .get("sec-websocket-protocol")
335 .and_then(|v| v.to_str().ok())
336 .map(|s| s.to_string());
337
338 let backend = match TcpStream::connect(format!("127.0.0.1:{}", port)).await {
340 Ok(s) => s,
341 Err(e) => {
342 tracing::error!("Failed to connect to _admin backend for WebSocket: {}", e);
343 return error_response(502, "Admin app not reachable");
344 }
345 };
346
347 let mut handshake = format!(
349 "GET {}{} HTTP/1.1\r\n\
350 Host: 127.0.0.1:{}\r\n\
351 Upgrade: websocket\r\n\
352 Connection: Upgrade\r\n\
353 Sec-WebSocket-Key: {}\r\n\
354 Sec-WebSocket-Version: {}\r\n",
355 path, query, port, ws_key, ws_version,
356 );
357 if let Some(proto) = &ws_protocol {
358 handshake.push_str(&format!("Sec-WebSocket-Protocol: {}\r\n", proto));
359 }
360 handshake.push_str("\r\n");
361
362 let (mut backend_read, mut backend_write) = backend.into_split();
363 if let Err(e) = backend_write.write_all(handshake.as_bytes()).await {
364 tracing::error!("Failed to send WebSocket handshake to backend: {}", e);
365 return error_response(502, "Failed to initiate WebSocket with backend");
366 }
367
368 let mut response_buf = vec![0u8; 4096];
370 let n = match backend_read.read(&mut response_buf).await {
371 Ok(n) if n > 0 => n,
372 _ => {
373 tracing::error!("No response from backend for WebSocket upgrade");
374 return error_response(502, "Backend did not respond to WebSocket upgrade");
375 }
376 };
377
378 let response_str = String::from_utf8_lossy(&response_buf[..n]);
379 if !response_str.contains("101") {
380 tracing::error!(
381 "Backend rejected WebSocket upgrade: {}",
382 response_str.lines().next().unwrap_or("")
383 );
384 return error_response(502, "Backend rejected WebSocket upgrade");
385 }
386
387 let mut accept_key = String::new();
389 let mut resp_protocol = None;
390 for line in response_str.lines().skip(1) {
391 if line.trim().is_empty() {
392 break;
393 }
394 if let Some((name, value)) = line.split_once(':') {
395 let name_lower = name.trim().to_lowercase();
396 let value = value.trim().to_string();
397 if name_lower == "sec-websocket-accept" {
398 accept_key = value;
399 } else if name_lower == "sec-websocket-protocol" {
400 resp_protocol = Some(value);
401 }
402 }
403 }
404
405 let client_upgrade = hyper::upgrade::on(req);
407
408 let backend_stream = backend_read.reunite(backend_write).unwrap();
410
411 tokio::spawn(async move {
413 match client_upgrade.await {
414 Ok(upgraded) => {
415 let mut client_stream = TokioIo::new(upgraded);
416 let (mut br, mut bw) = tokio::io::split(backend_stream);
417 let (mut cr, mut cw) = tokio::io::split(&mut client_stream);
418 let _ = tokio::join!(
419 tokio::io::copy(&mut br, &mut cw),
420 tokio::io::copy(&mut cr, &mut bw),
421 );
422 }
423 Err(e) => {
424 tracing::error!("WebSocket client upgrade failed: {}", e);
425 }
426 }
427 });
428
429 let mut resp = Response::builder()
431 .status(101)
432 .header("Upgrade", "websocket")
433 .header("Connection", "Upgrade")
434 .header("Sec-WebSocket-Accept", accept_key);
435 if let Some(proto) = resp_protocol {
436 resp = resp.header("Sec-WebSocket-Protocol", proto);
437 }
438 resp.body(http_body_util::Full::new(Bytes::new()).boxed())
439 .unwrap()
440}
441
442async fn read_body(req: Request<Incoming>) -> String {
443 match req.into_body().collect().await {
444 Ok(collected) => String::from_utf8_lossy(&collected.to_bytes()).to_string(),
445 Err(_) => String::new(),
446 }
447}
448
449pub async fn run_admin_server(state: Arc<AdminState>) -> Result<()> {
450 let bind = state.config_manager.get_config().admin.bind.clone();
451 let addr: std::net::SocketAddr = bind.parse()?;
452 let listener = TcpListener::bind(addr).await?;
453
454 tracing::info!("Admin API listening on {}", addr);
455
456 loop {
457 match listener.accept().await {
458 Ok((stream, _)) => {
459 let state = state.clone();
460 tokio::spawn(async move {
461 let io = TokioIo::new(stream);
462 let svc = service_fn(move |req| {
463 let state = state.clone();
464 async move { handle_admin_request(req, state).await }
465 });
466 if let Err(e) = hyper::server::conn::http1::Builder::new()
467 .serve_connection(io, svc)
468 .with_upgrades()
469 .await
470 {
471 tracing::debug!("Admin connection error: {}", e);
472 }
473 });
474 }
475 Err(e) => {
476 tracing::error!("Admin accept error: {}", e);
477 }
478 }
479 }
480}