Skip to main content

soli_proxy/admin/
mod.rs

1pub mod handlers;
2
3use crate::app::AppManager;
4use crate::circuit_breaker::SharedCircuitBreaker;
5use crate::config::ConfigManager;
6use crate::metrics::SharedMetrics;
7use anyhow::Result;
8use bytes::Bytes;
9use http_body_util::BodyExt;
10use hyper::body::Incoming;
11use hyper::service::service_fn;
12use hyper::{Method, Request, Response};
13use hyper_util::client::legacy::connect::HttpConnector;
14use hyper_util::client::legacy::Client;
15use hyper_util::rt::{TokioExecutor, TokioIo};
16use std::sync::Arc;
17use std::time::Instant;
18use tokio::io::{AsyncReadExt, AsyncWriteExt};
19use tokio::net::{TcpListener, TcpStream};
20
21type BoxBody = http_body_util::combinators::BoxBody<Bytes, std::convert::Infallible>;
22
23pub struct AdminState {
24    pub config_manager: Arc<ConfigManager>,
25    pub metrics: SharedMetrics,
26    pub start_time: Instant,
27    pub circuit_breaker: SharedCircuitBreaker,
28    pub app_manager: Option<Arc<AppManager>>,
29}
30
31pub(crate) fn cors_headers(
32    builder: hyper::http::response::Builder,
33) -> hyper::http::response::Builder {
34    builder
35        .header("Access-Control-Allow-Origin", "*")
36        .header(
37            "Access-Control-Allow-Methods",
38            "GET, POST, PUT, DELETE, OPTIONS",
39        )
40        .header("Access-Control-Allow-Headers", "Content-Type, X-Api-Key")
41}
42
43fn json_response(status: u16, body: serde_json::Value) -> Response<BoxBody> {
44    let bytes = Bytes::from(serde_json::to_string(&body).unwrap());
45    cors_headers(Response::builder())
46        .status(status)
47        .header("Content-Type", "application/json")
48        .body(http_body_util::Full::new(bytes).boxed())
49        .unwrap()
50}
51
52fn ok_response(data: serde_json::Value) -> Response<BoxBody> {
53    json_response(200, serde_json::json!({ "ok": true, "data": data }))
54}
55
56fn created_response(data: serde_json::Value) -> Response<BoxBody> {
57    json_response(201, serde_json::json!({ "ok": true, "data": data }))
58}
59
60fn no_content_response() -> Response<BoxBody> {
61    cors_headers(Response::builder())
62        .status(204)
63        .body(http_body_util::Full::new(Bytes::new()).boxed())
64        .unwrap()
65}
66
67fn preflight_response() -> Response<BoxBody> {
68    cors_headers(Response::builder())
69        .status(204)
70        .header("Access-Control-Max-Age", "86400")
71        .body(http_body_util::Full::new(Bytes::new()).boxed())
72        .unwrap()
73}
74
75fn error_response(status: u16, message: &str) -> Response<BoxBody> {
76    json_response(status, serde_json::json!({ "ok": false, "error": message }))
77}
78
79fn check_auth(req: &Request<Incoming>, api_key: &Option<String>) -> bool {
80    match api_key {
81        None => true, // No auth configured = dev mode
82        Some(key) if key.is_empty() => true,
83        Some(key) => req
84            .headers()
85            .get("X-Api-Key")
86            .and_then(|v| v.to_str().ok())
87            .is_some_and(|v| v == key),
88    }
89}
90
91/// Extract route index from path like /api/v1/routes/3
92fn extract_route_index(path: &str) -> Option<usize> {
93    path.strip_prefix("/api/v1/routes/")
94        .and_then(|s| s.parse::<usize>().ok())
95}
96
97async fn handle_admin_request(
98    req: Request<Incoming>,
99    state: Arc<AdminState>,
100) -> Result<Response<BoxBody>, std::convert::Infallible> {
101    let api_key = state.config_manager.get_config().admin.api_key.clone();
102    if !check_auth(&req, &api_key) {
103        return Ok(error_response(401, "Unauthorized"));
104    }
105
106    let method = req.method().clone();
107    let path = req.uri().path().to_string();
108
109    // Handle CORS preflight
110    if method == Method::OPTIONS {
111        return Ok(preflight_response());
112    }
113
114    let response = match (method.clone(), path.as_str()) {
115        // Phase 1: Read-only endpoints
116        (Method::GET, "/api/v1/status") => handlers::get_status(&state),
117        (Method::GET, "/api/v1/config") => handlers::get_config(&state),
118        (Method::GET, "/api/v1/routes") => handlers::get_routes(&state),
119        (Method::GET, "/api/v1/metrics") => handlers::get_metrics(&state),
120        (Method::GET, "/api/v1/app-metrics") => handlers::get_all_app_metrics(&state),
121        (Method::POST, "/api/v1/reload") => handlers::post_reload(&state).await,
122
123        // App management endpoints
124        (Method::GET, "/api/v1/apps") => handlers::get_apps(&state).await,
125        (_, p) if p.starts_with("/api/v1/apps/") => {
126            let app_name = p.strip_prefix("/api/v1/apps/").unwrap_or("");
127            if method == Method::GET && !app_name.is_empty() && !app_name.contains('/') {
128                handlers::get_app(&state, app_name).await
129            } else if method == Method::GET && app_name.ends_with("/metrics") {
130                let name = app_name.strip_suffix("/metrics").unwrap_or("");
131                if name.is_empty() {
132                    error_response(400, "Invalid app name")
133                } else {
134                    handlers::get_app_metrics(&state, name).await
135                }
136            } else if app_name.ends_with("/deploy") {
137                let name = app_name.strip_suffix("/deploy").unwrap_or("");
138                if name.is_empty() {
139                    error_response(400, "Invalid app name")
140                } else {
141                    handlers::post_app_deploy(&state, name).await
142                }
143            } else if app_name.ends_with("/restart") {
144                let name = app_name.strip_suffix("/restart").unwrap_or("");
145                if name.is_empty() {
146                    error_response(400, "Invalid app name")
147                } else {
148                    handlers::post_app_restart(&state, name).await
149                }
150            } else if app_name.ends_with("/rollback") {
151                let name = app_name.strip_suffix("/rollback").unwrap_or("");
152                if name.is_empty() {
153                    error_response(400, "Invalid app name")
154                } else {
155                    handlers::post_app_rollback(&state, name).await
156                }
157            } else if app_name.ends_with("/stop") {
158                let name = app_name.strip_suffix("/stop").unwrap_or("");
159                if name.is_empty() {
160                    error_response(400, "Invalid app name")
161                } else {
162                    handlers::post_app_stop(&state, name).await
163                }
164            } else if app_name.ends_with("/logs") {
165                let name = app_name.strip_suffix("/logs").unwrap_or("");
166                if name.is_empty() {
167                    error_response(400, "Invalid app name")
168                } else {
169                    handlers::get_app_logs(&state, name).await
170                }
171            } else {
172                error_response(404, "Not found")
173            }
174        }
175
176        // Phase 2: Mutation endpoints
177        (Method::POST, "/api/v1/routes") => {
178            let body = read_body(req).await;
179            handlers::post_route(&state, &body)
180        }
181        (Method::PUT, "/api/v1/config") => {
182            let body = read_body(req).await;
183            handlers::put_config(&state, &body)
184        }
185
186        // Routes with index parameter
187        (Method::GET, p) if p.starts_with("/api/v1/routes/") => match extract_route_index(p) {
188            Some(idx) => handlers::get_route(&state, idx),
189            None => error_response(400, "Invalid route index"),
190        },
191        (Method::PUT, p) if p.starts_with("/api/v1/routes/") => match extract_route_index(p) {
192            Some(idx) => {
193                let body = read_body(req).await;
194                handlers::put_route(&state, idx, &body)
195            }
196            None => error_response(400, "Invalid route index"),
197        },
198        (Method::DELETE, p) if p.starts_with("/api/v1/routes/") => match extract_route_index(p) {
199            Some(idx) => handlers::delete_route(&state, idx),
200            None => error_response(400, "Invalid route index"),
201        },
202
203        // Circuit breaker endpoints
204        (Method::GET, "/api/v1/circuit-breaker") => handlers::get_circuit_breaker(&state),
205        (Method::POST, "/api/v1/circuit-breaker/reset") => handlers::reset_circuit_breaker(&state),
206
207        // Utility endpoints
208        (Method::POST, "/api/v1/hash-password") => {
209            let body = read_body(req).await;
210            handlers::post_hash_password(&state, &body)
211        }
212
213        // Everything else → proxy to _admin app (UI, static assets, etc.)
214        _ if state.app_manager.is_some() => {
215            let is_ws = req
216                .headers()
217                .get("upgrade")
218                .and_then(|v| v.to_str().ok())
219                .is_some_and(|v| v.eq_ignore_ascii_case("websocket"));
220            if is_ws {
221                proxy_websocket_to_admin_app(req, &state).await
222            } else {
223                proxy_to_admin_app(req, &state).await
224            }
225        }
226        _ => error_response(404, "Not found"),
227    };
228
229    Ok(response)
230}
231
232async fn proxy_to_admin_app(req: Request<Incoming>, state: &Arc<AdminState>) -> Response<BoxBody> {
233    let port = match resolve_admin_port(state).await {
234        Ok(p) => p,
235        Err(resp) => return resp,
236    };
237
238    let path = req.uri().path();
239    let query = req
240        .uri()
241        .query()
242        .map(|q| format!("?{}", q))
243        .unwrap_or_default();
244    let target_uri = format!("http://localhost:{}{}{}", port, path, query);
245
246    let (mut parts, body) = req.into_parts();
247    parts.uri = match target_uri.parse() {
248        Ok(uri) => uri,
249        Err(_) => return error_response(500, "Failed to build proxy URI"),
250    };
251
252    let proxy_req = Request::from_parts(parts, body);
253
254    let mut connector = HttpConnector::new();
255    connector.set_connect_timeout(Some(std::time::Duration::from_secs(3)));
256
257    let client: Client<HttpConnector, Incoming> =
258        Client::builder(TokioExecutor::new()).build(connector);
259
260    match client.request(proxy_req).await {
261        Ok(resp) => {
262            let (parts, body) = resp.into_parts();
263            let mapped = body.map_err(|_| -> std::convert::Infallible { unreachable!() });
264            Response::from_parts(parts, mapped.boxed())
265        }
266        Err(e) => {
267            tracing::error!("Failed to proxy to _admin app: {}", e);
268            error_response(502, &format!("Admin app not reachable on port {} — deploy it first: POST /api/v1/apps/_admin/deploy", port))
269        }
270    }
271}
272
273/// Resolve the _admin app's backend port, or return an error response.
274async fn resolve_admin_port(state: &Arc<AdminState>) -> Result<u16, Response<BoxBody>> {
275    let app_manager = match &state.app_manager {
276        Some(m) => m,
277        None => return Err(error_response(501, "App management not configured")),
278    };
279
280    let app = match app_manager.get_app("_admin").await {
281        Some(a) => a,
282        None => return Err(error_response(502, "_admin app not found")),
283    };
284
285    let port = if app.current_slot == "blue" {
286        app.blue.port
287    } else {
288        app.green.port
289    };
290
291    if port == 0 {
292        return Err(error_response(502, "_admin app not deployed"));
293    }
294
295    Ok(port)
296}
297
298async fn proxy_websocket_to_admin_app(
299    req: Request<Incoming>,
300    state: &Arc<AdminState>,
301) -> Response<BoxBody> {
302    let port = match resolve_admin_port(state).await {
303        Ok(p) => p,
304        Err(resp) => return resp,
305    };
306
307    let path = req.uri().path().to_string();
308    let query = req
309        .uri()
310        .query()
311        .map(|q| format!("?{}", q))
312        .unwrap_or_default();
313
314    // Build the raw HTTP upgrade request to send to the backend
315    let ws_key = req
316        .headers()
317        .get("sec-websocket-key")
318        .and_then(|v| v.to_str().ok())
319        .unwrap_or("")
320        .to_string();
321    let ws_version = req
322        .headers()
323        .get("sec-websocket-version")
324        .and_then(|v| v.to_str().ok())
325        .unwrap_or("13")
326        .to_string();
327    let ws_protocol = req
328        .headers()
329        .get("sec-websocket-protocol")
330        .and_then(|v| v.to_str().ok())
331        .map(|s| s.to_string());
332
333    // Connect to the backend
334    let backend = match TcpStream::connect(format!("127.0.0.1:{}", port)).await {
335        Ok(s) => s,
336        Err(e) => {
337            tracing::error!("Failed to connect to _admin backend for WebSocket: {}", e);
338            return error_response(502, "Admin app not reachable");
339        }
340    };
341
342    // Send the upgrade request to the backend
343    let mut handshake = format!(
344        "GET {}{} HTTP/1.1\r\n\
345         Host: 127.0.0.1:{}\r\n\
346         Upgrade: websocket\r\n\
347         Connection: Upgrade\r\n\
348         Sec-WebSocket-Key: {}\r\n\
349         Sec-WebSocket-Version: {}\r\n",
350        path, query, port, ws_key, ws_version,
351    );
352    if let Some(proto) = &ws_protocol {
353        handshake.push_str(&format!("Sec-WebSocket-Protocol: {}\r\n", proto));
354    }
355    handshake.push_str("\r\n");
356
357    let (mut backend_read, mut backend_write) = backend.into_split();
358    if let Err(e) = backend_write.write_all(handshake.as_bytes()).await {
359        tracing::error!("Failed to send WebSocket handshake to backend: {}", e);
360        return error_response(502, "Failed to initiate WebSocket with backend");
361    }
362
363    // Read the backend's 101 response
364    let mut response_buf = vec![0u8; 4096];
365    let n = match backend_read.read(&mut response_buf).await {
366        Ok(n) if n > 0 => n,
367        _ => {
368            tracing::error!("No response from backend for WebSocket upgrade");
369            return error_response(502, "Backend did not respond to WebSocket upgrade");
370        }
371    };
372
373    let response_str = String::from_utf8_lossy(&response_buf[..n]);
374    if !response_str.contains("101") {
375        tracing::error!(
376            "Backend rejected WebSocket upgrade: {}",
377            response_str.lines().next().unwrap_or("")
378        );
379        return error_response(502, "Backend rejected WebSocket upgrade");
380    }
381
382    // Extract headers from backend 101 response to forward to client
383    let mut accept_key = String::new();
384    let mut resp_protocol = None;
385    for line in response_str.lines().skip(1) {
386        if line.trim().is_empty() {
387            break;
388        }
389        if let Some((name, value)) = line.split_once(':') {
390            let name_lower = name.trim().to_lowercase();
391            let value = value.trim().to_string();
392            if name_lower == "sec-websocket-accept" {
393                accept_key = value;
394            } else if name_lower == "sec-websocket-protocol" {
395                resp_protocol = Some(value);
396            }
397        }
398    }
399
400    // Use hyper::upgrade::on to get the client-side stream after we return 101
401    let client_upgrade = hyper::upgrade::on(req);
402
403    // Reunite the backend halves
404    let backend_stream = backend_read.reunite(backend_write).unwrap();
405
406    // Spawn the bidirectional copy task
407    tokio::spawn(async move {
408        match client_upgrade.await {
409            Ok(upgraded) => {
410                let mut client_stream = TokioIo::new(upgraded);
411                let (mut br, mut bw) = tokio::io::split(backend_stream);
412                let (mut cr, mut cw) = tokio::io::split(&mut client_stream);
413                let _ = tokio::join!(
414                    tokio::io::copy(&mut br, &mut cw),
415                    tokio::io::copy(&mut cr, &mut bw),
416                );
417            }
418            Err(e) => {
419                tracing::error!("WebSocket client upgrade failed: {}", e);
420            }
421        }
422    });
423
424    // Return 101 Switching Protocols to the client
425    let mut resp = Response::builder()
426        .status(101)
427        .header("Upgrade", "websocket")
428        .header("Connection", "Upgrade")
429        .header("Sec-WebSocket-Accept", accept_key);
430    if let Some(proto) = resp_protocol {
431        resp = resp.header("Sec-WebSocket-Protocol", proto);
432    }
433    resp.body(http_body_util::Full::new(Bytes::new()).boxed())
434        .unwrap()
435}
436
437async fn read_body(req: Request<Incoming>) -> String {
438    match req.into_body().collect().await {
439        Ok(collected) => String::from_utf8_lossy(&collected.to_bytes()).to_string(),
440        Err(_) => String::new(),
441    }
442}
443
444pub async fn run_admin_server(state: Arc<AdminState>) -> Result<()> {
445    let bind = state.config_manager.get_config().admin.bind.clone();
446    let addr: std::net::SocketAddr = bind.parse()?;
447    let listener = TcpListener::bind(addr).await?;
448
449    tracing::info!("Admin API listening on {}", addr);
450
451    loop {
452        match listener.accept().await {
453            Ok((stream, _)) => {
454                let state = state.clone();
455                tokio::spawn(async move {
456                    let io = TokioIo::new(stream);
457                    let svc = service_fn(move |req| {
458                        let state = state.clone();
459                        async move { handle_admin_request(req, state).await }
460                    });
461                    if let Err(e) = hyper::server::conn::http1::Builder::new()
462                        .serve_connection(io, svc)
463                        .with_upgrades()
464                        .await
465                    {
466                        tracing::debug!("Admin connection error: {}", e);
467                    }
468                });
469            }
470            Err(e) => {
471                tracing::error!("Admin accept error: {}", e);
472            }
473        }
474    }
475}