Skip to main content

soli_proxy/admin/
mod.rs

1pub mod handlers;
2
3use crate::app::AppManager;
4use crate::circuit_breaker::SharedCircuitBreaker;
5use crate::config::ConfigManager;
6use crate::metrics::SharedMetrics;
7use anyhow::Result;
8use bytes::Bytes;
9use http_body_util::BodyExt;
10use hyper::body::Incoming;
11use hyper::service::service_fn;
12use hyper::{Method, Request, Response};
13use hyper_util::client::legacy::connect::HttpConnector;
14use hyper_util::client::legacy::Client;
15use hyper_util::rt::{TokioExecutor, TokioIo};
16use std::sync::Arc;
17use std::time::Instant;
18use tokio::io::{AsyncReadExt, AsyncWriteExt};
19use tokio::net::{TcpListener, TcpStream};
20
21type BoxBody = http_body_util::combinators::BoxBody<Bytes, std::convert::Infallible>;
22
23pub struct AdminState {
24    pub config_manager: Arc<ConfigManager>,
25    pub metrics: SharedMetrics,
26    pub start_time: Instant,
27    pub circuit_breaker: SharedCircuitBreaker,
28    pub app_manager: Option<Arc<AppManager>>,
29}
30
31pub(crate) fn cors_headers(
32    builder: hyper::http::response::Builder,
33) -> hyper::http::response::Builder {
34    builder
35        .header("Access-Control-Allow-Origin", "*")
36        .header(
37            "Access-Control-Allow-Methods",
38            "GET, POST, PUT, DELETE, OPTIONS",
39        )
40        .header("Access-Control-Allow-Headers", "Content-Type, X-Api-Key")
41}
42
43fn json_response(status: u16, body: serde_json::Value) -> Response<BoxBody> {
44    let bytes = Bytes::from(serde_json::to_string(&body).unwrap());
45    cors_headers(Response::builder())
46        .status(status)
47        .header("Content-Type", "application/json")
48        .body(http_body_util::Full::new(bytes).boxed())
49        .unwrap()
50}
51
52fn ok_response(data: serde_json::Value) -> Response<BoxBody> {
53    json_response(200, serde_json::json!({ "ok": true, "data": data }))
54}
55
56fn created_response(data: serde_json::Value) -> Response<BoxBody> {
57    json_response(201, serde_json::json!({ "ok": true, "data": data }))
58}
59
60fn no_content_response() -> Response<BoxBody> {
61    cors_headers(Response::builder())
62        .status(204)
63        .body(http_body_util::Full::new(Bytes::new()).boxed())
64        .unwrap()
65}
66
67fn preflight_response() -> Response<BoxBody> {
68    cors_headers(Response::builder())
69        .status(204)
70        .header("Access-Control-Max-Age", "86400")
71        .body(http_body_util::Full::new(Bytes::new()).boxed())
72        .unwrap()
73}
74
75fn error_response(status: u16, message: &str) -> Response<BoxBody> {
76    json_response(status, serde_json::json!({ "ok": false, "error": message }))
77}
78
79fn unauthorized_response(use_basic_auth: bool) -> Response<BoxBody> {
80    let body = http_body_util::Full::new(Bytes::from("Unauthorized")).boxed();
81    let mut builder = Response::builder()
82        .status(401)
83        .header("Content-Type", "application/json");
84    if use_basic_auth {
85        builder = builder.header("WWW-Authenticate", "Basic realm=\"Admin\"");
86    }
87    builder.body(body).unwrap()
88}
89
90fn check_auth(
91    req: &Request<Incoming>,
92    api_key: &Option<String>,
93    username: &Option<String>,
94    password_hash: &Option<String>,
95) -> bool {
96    if username.is_none() && password_hash.is_none() && api_key.is_none() {
97        return true;
98    }
99
100    if let Some(key) = api_key {
101        if !key.is_empty()
102            && req
103                .headers()
104                .get("X-Api-Key")
105                .and_then(|v| v.to_str().ok())
106                .is_some_and(|v| v == key)
107        {
108            return true;
109        }
110    }
111
112    if let (Some(user), Some(hash)) = (username, password_hash) {
113        if let Some(auth_header) = req.headers().get("authorization") {
114            if let Ok(header_value) = auth_header.to_str() {
115                if let Some(encoded) = header_value.strip_prefix("Basic ") {
116                    if let Ok(decoded) =
117                        base64::Engine::decode(&base64::engine::general_purpose::STANDARD, encoded)
118                    {
119                        let creds = String::from_utf8_lossy(&decoded);
120                        if let Some((u, p)) = creds.split_once(':') {
121                            if u == user && crate::auth::verify_password(p, hash) {
122                                return true;
123                            }
124                        }
125                    }
126                }
127            }
128        }
129    }
130
131    false
132}
133
134/// Extract route index from path like /api/v1/routes/3
135fn extract_route_index(path: &str) -> Option<usize> {
136    path.strip_prefix("/api/v1/routes/")
137        .and_then(|s| s.parse::<usize>().ok())
138}
139
140async fn handle_admin_request(
141    req: Request<Incoming>,
142    state: Arc<AdminState>,
143) -> Result<Response<BoxBody>, std::convert::Infallible> {
144    let admin_config = &state.config_manager.get_config().admin;
145    let api_key = admin_config.api_key.clone();
146    let username = admin_config.username.clone();
147    let password_hash = admin_config.password_hash.clone();
148    let use_basic_auth = username.is_some() && password_hash.is_some();
149    if !check_auth(&req, &api_key, &username, &password_hash) {
150        return Ok(unauthorized_response(use_basic_auth));
151    }
152
153    let method = req.method().clone();
154    let path = req.uri().path().to_string();
155
156    // Handle CORS preflight
157    if method == Method::OPTIONS {
158        return Ok(preflight_response());
159    }
160
161    let response = match (method.clone(), path.as_str()) {
162        // Phase 1: Read-only endpoints
163        (Method::GET, "/api/v1/status") => handlers::get_status(&state).await,
164        (Method::GET, "/api/v1/config") => handlers::get_config(&state),
165        (Method::GET, "/api/v1/routes") => handlers::get_routes(&state),
166        (Method::GET, "/api/v1/metrics") => handlers::get_metrics(&state),
167        (Method::GET, "/api/v1/app-metrics") => handlers::get_all_app_metrics(&state),
168        (Method::GET, "/api/v1/app-metrics/system") => {
169            handlers::get_app_system_metrics(&state).await
170        }
171        (Method::GET, "/api/v1/events/apps") => handlers::sse_app_events(state.clone()).await,
172        (Method::POST, "/api/v1/reload") => handlers::post_reload(&state).await,
173
174        // App management endpoints
175        (Method::GET, "/api/v1/apps") => handlers::get_apps(&state).await,
176        (Method::GET, "/api/v1/apps/by-domain") => handlers::get_apps_by_domain(&state).await,
177        (_, p) if p.starts_with("/api/v1/apps/") => {
178            let app_name = p.strip_prefix("/api/v1/apps/").unwrap_or("");
179            if method == Method::GET && !app_name.is_empty() && !app_name.contains('/') {
180                handlers::get_app(&state, app_name).await
181            } else if method == Method::GET && app_name.ends_with("/metrics") {
182                let name = app_name.strip_suffix("/metrics").unwrap_or("");
183                if name.is_empty() {
184                    error_response(400, "Invalid app name")
185                } else {
186                    handlers::get_app_metrics(&state, name).await
187                }
188            } else if app_name.ends_with("/deploy") {
189                let name = app_name.strip_suffix("/deploy").unwrap_or("");
190                if name.is_empty() {
191                    error_response(400, "Invalid app name")
192                } else {
193                    handlers::post_app_deploy(&state, name).await
194                }
195            } else if app_name.ends_with("/restart") {
196                let name = app_name.strip_suffix("/restart").unwrap_or("");
197                if name.is_empty() {
198                    error_response(400, "Invalid app name")
199                } else {
200                    handlers::post_app_restart(&state, name).await
201                }
202            } else if app_name.ends_with("/rollback") {
203                let name = app_name.strip_suffix("/rollback").unwrap_or("");
204                if name.is_empty() {
205                    error_response(400, "Invalid app name")
206                } else {
207                    handlers::post_app_rollback(&state, name).await
208                }
209            } else if app_name.ends_with("/stop") {
210                let name = app_name.strip_suffix("/stop").unwrap_or("");
211                if name.is_empty() {
212                    error_response(400, "Invalid app name")
213                } else {
214                    handlers::post_app_stop(&state, name).await
215                }
216            } else if app_name.ends_with("/logs") {
217                let name = app_name.strip_suffix("/logs").unwrap_or("");
218                if name.is_empty() {
219                    error_response(400, "Invalid app name")
220                } else {
221                    handlers::get_app_logs(&state, name).await
222                }
223            } else {
224                error_response(404, "Not found")
225            }
226        }
227
228        // Phase 2: Mutation endpoints
229        (Method::POST, "/api/v1/routes") => {
230            let body = read_body(req).await;
231            handlers::post_route(&state, &body)
232        }
233        (Method::PUT, "/api/v1/config") => {
234            let body = read_body(req).await;
235            handlers::put_config(&state, &body)
236        }
237
238        // Routes with index parameter
239        (Method::GET, p) if p.starts_with("/api/v1/routes/") => match extract_route_index(p) {
240            Some(idx) => handlers::get_route(&state, idx),
241            None => error_response(400, "Invalid route index"),
242        },
243        (Method::PUT, p) if p.starts_with("/api/v1/routes/") => match extract_route_index(p) {
244            Some(idx) => {
245                let body = read_body(req).await;
246                handlers::put_route(&state, idx, &body)
247            }
248            None => error_response(400, "Invalid route index"),
249        },
250        (Method::DELETE, p) if p.starts_with("/api/v1/routes/") => match extract_route_index(p) {
251            Some(idx) => handlers::delete_route(&state, idx),
252            None => error_response(400, "Invalid route index"),
253        },
254
255        // Circuit breaker endpoints
256        (Method::GET, "/api/v1/circuit-breaker") => handlers::get_circuit_breaker(&state),
257        (Method::POST, "/api/v1/circuit-breaker/reset") => handlers::reset_circuit_breaker(&state),
258
259        // Utility endpoints
260        (Method::POST, "/api/v1/hash-password") => {
261            let body = read_body(req).await;
262            handlers::post_hash_password(&state, &body)
263        }
264
265        // Everything else → proxy to _admin app (UI, static assets, etc.)
266        _ if state.app_manager.is_some() => {
267            let is_ws = req
268                .headers()
269                .get("upgrade")
270                .and_then(|v| v.to_str().ok())
271                .is_some_and(|v| v.eq_ignore_ascii_case("websocket"));
272            if is_ws {
273                proxy_websocket_to_admin_app(req, &state).await
274            } else {
275                proxy_to_admin_app(req, &state).await
276            }
277        }
278        _ => error_response(404, "Not found"),
279    };
280
281    Ok(response)
282}
283
284async fn proxy_to_admin_app(req: Request<Incoming>, state: &Arc<AdminState>) -> Response<BoxBody> {
285    let port = match resolve_admin_port(state).await {
286        Ok(p) => p,
287        Err(resp) => return resp,
288    };
289
290    let path = req.uri().path();
291    let query = req
292        .uri()
293        .query()
294        .map(|q| format!("?{}", q))
295        .unwrap_or_default();
296    let target_uri = format!("http://localhost:{}{}{}", port, path, query);
297
298    let (mut parts, body) = req.into_parts();
299    parts.uri = match target_uri.parse() {
300        Ok(uri) => uri,
301        Err(_) => return error_response(500, "Failed to build proxy URI"),
302    };
303
304    let proxy_req = Request::from_parts(parts, body);
305
306    let mut connector = HttpConnector::new();
307    connector.set_connect_timeout(Some(std::time::Duration::from_secs(3)));
308
309    let client: Client<HttpConnector, Incoming> =
310        Client::builder(TokioExecutor::new()).build(connector);
311
312    match client.request(proxy_req).await {
313        Ok(resp) => {
314            let (parts, body) = resp.into_parts();
315            let mapped = body.map_err(|_| -> std::convert::Infallible { unreachable!() });
316            Response::from_parts(parts, mapped.boxed())
317        }
318        Err(e) => {
319            tracing::error!("Failed to proxy to _admin app: {}", e);
320            error_response(502, &format!("Admin app not reachable on port {} — deploy it first: POST /api/v1/apps/_admin/deploy", port))
321        }
322    }
323}
324
325/// Resolve the _admin app's backend port, or return an error response.
326async fn resolve_admin_port(state: &Arc<AdminState>) -> Result<u16, Response<BoxBody>> {
327    let app_manager = match &state.app_manager {
328        Some(m) => m,
329        None => return Err(error_response(501, "App management not configured")),
330    };
331
332    let app = match app_manager.get_app("_admin").await {
333        Some(a) => a,
334        None => return Err(error_response(502, "_admin app not found")),
335    };
336
337    let port = if app.current_slot == "blue" {
338        app.blue.port
339    } else {
340        app.green.port
341    };
342
343    if port == 0 {
344        return Err(error_response(502, "_admin app not deployed"));
345    }
346
347    Ok(port)
348}
349
350async fn proxy_websocket_to_admin_app(
351    req: Request<Incoming>,
352    state: &Arc<AdminState>,
353) -> Response<BoxBody> {
354    let port = match resolve_admin_port(state).await {
355        Ok(p) => p,
356        Err(resp) => return resp,
357    };
358
359    let path = req.uri().path().to_string();
360    let query = req
361        .uri()
362        .query()
363        .map(|q| format!("?{}", q))
364        .unwrap_or_default();
365
366    // Build the raw HTTP upgrade request to send to the backend
367    let ws_key = req
368        .headers()
369        .get("sec-websocket-key")
370        .and_then(|v| v.to_str().ok())
371        .unwrap_or("")
372        .to_string();
373    let ws_version = req
374        .headers()
375        .get("sec-websocket-version")
376        .and_then(|v| v.to_str().ok())
377        .unwrap_or("13")
378        .to_string();
379    let ws_protocol = req
380        .headers()
381        .get("sec-websocket-protocol")
382        .and_then(|v| v.to_str().ok())
383        .map(|s| s.to_string());
384
385    // Connect to the backend
386    let backend = match TcpStream::connect(format!("127.0.0.1:{}", port)).await {
387        Ok(s) => s,
388        Err(e) => {
389            tracing::error!("Failed to connect to _admin backend for WebSocket: {}", e);
390            return error_response(502, "Admin app not reachable");
391        }
392    };
393
394    // Send the upgrade request to the backend
395    let mut handshake = format!(
396        "GET {}{} HTTP/1.1\r\n\
397         Host: 127.0.0.1:{}\r\n\
398         Upgrade: websocket\r\n\
399         Connection: Upgrade\r\n\
400         Sec-WebSocket-Key: {}\r\n\
401         Sec-WebSocket-Version: {}\r\n",
402        path, query, port, ws_key, ws_version,
403    );
404    if let Some(proto) = &ws_protocol {
405        handshake.push_str(&format!("Sec-WebSocket-Protocol: {}\r\n", proto));
406    }
407    handshake.push_str("\r\n");
408
409    let (mut backend_read, mut backend_write) = backend.into_split();
410    if let Err(e) = backend_write.write_all(handshake.as_bytes()).await {
411        tracing::error!("Failed to send WebSocket handshake to backend: {}", e);
412        return error_response(502, "Failed to initiate WebSocket with backend");
413    }
414
415    // Read the backend's 101 response
416    let mut response_buf = vec![0u8; 4096];
417    let n = match backend_read.read(&mut response_buf).await {
418        Ok(n) if n > 0 => n,
419        _ => {
420            tracing::error!("No response from backend for WebSocket upgrade");
421            return error_response(502, "Backend did not respond to WebSocket upgrade");
422        }
423    };
424
425    let response_str = String::from_utf8_lossy(&response_buf[..n]);
426    if !response_str.contains("101") {
427        tracing::error!(
428            "Backend rejected WebSocket upgrade: {}",
429            response_str.lines().next().unwrap_or("")
430        );
431        return error_response(502, "Backend rejected WebSocket upgrade");
432    }
433
434    // Extract headers from backend 101 response to forward to client
435    let mut accept_key = String::new();
436    let mut resp_protocol = None;
437    for line in response_str.lines().skip(1) {
438        if line.trim().is_empty() {
439            break;
440        }
441        if let Some((name, value)) = line.split_once(':') {
442            let name_lower = name.trim().to_lowercase();
443            let value = value.trim().to_string();
444            if name_lower == "sec-websocket-accept" {
445                accept_key = value;
446            } else if name_lower == "sec-websocket-protocol" {
447                resp_protocol = Some(value);
448            }
449        }
450    }
451
452    // Use hyper::upgrade::on to get the client-side stream after we return 101
453    let client_upgrade = hyper::upgrade::on(req);
454
455    // Reunite the backend halves
456    let backend_stream = backend_read.reunite(backend_write).unwrap();
457
458    // Spawn the bidirectional copy task
459    tokio::spawn(async move {
460        match client_upgrade.await {
461            Ok(upgraded) => {
462                let mut client_stream = TokioIo::new(upgraded);
463                let (mut br, mut bw) = tokio::io::split(backend_stream);
464                let (mut cr, mut cw) = tokio::io::split(&mut client_stream);
465                let _ = tokio::join!(
466                    tokio::io::copy(&mut br, &mut cw),
467                    tokio::io::copy(&mut cr, &mut bw),
468                );
469            }
470            Err(e) => {
471                tracing::error!("WebSocket client upgrade failed: {}", e);
472            }
473        }
474    });
475
476    // Return 101 Switching Protocols to the client
477    let mut resp = Response::builder()
478        .status(101)
479        .header("Upgrade", "websocket")
480        .header("Connection", "Upgrade")
481        .header("Sec-WebSocket-Accept", accept_key);
482    if let Some(proto) = resp_protocol {
483        resp = resp.header("Sec-WebSocket-Protocol", proto);
484    }
485    resp.body(http_body_util::Full::new(Bytes::new()).boxed())
486        .unwrap()
487}
488
489async fn read_body(req: Request<Incoming>) -> String {
490    match req.into_body().collect().await {
491        Ok(collected) => String::from_utf8_lossy(&collected.to_bytes()).to_string(),
492        Err(_) => String::new(),
493    }
494}
495
496pub async fn run_admin_server(state: Arc<AdminState>) -> Result<()> {
497    let bind = state.config_manager.get_config().admin.bind.clone();
498    let addr: std::net::SocketAddr = bind.parse()?;
499    let listener = TcpListener::bind(addr).await?;
500
501    tracing::info!("Admin API listening on {}", addr);
502
503    loop {
504        match listener.accept().await {
505            Ok((stream, _)) => {
506                let state = state.clone();
507                tokio::spawn(async move {
508                    let io = TokioIo::new(stream);
509                    let svc = service_fn(move |req| {
510                        let state = state.clone();
511                        async move { handle_admin_request(req, state).await }
512                    });
513                    if let Err(e) = hyper::server::conn::http1::Builder::new()
514                        .serve_connection(io, svc)
515                        .with_upgrades()
516                        .await
517                    {
518                        tracing::debug!("Admin connection error: {}", e);
519                    }
520                });
521            }
522            Err(e) => {
523                tracing::error!("Admin accept error: {}", e);
524            }
525        }
526    }
527}