1use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Duration;
8
9use axum::Router;
10use axum::extract::{Extension, Path};
11use axum::http::StatusCode;
12use axum::response::{IntoResponse, Json};
13use axum::routing::{delete, get, post};
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use tokio::sync::RwLock;
17use tower_mcp::SessionHandle;
18use tower_mcp::proxy::McpProxy;
19
/// Shared state behind the admin API handlers.
///
/// Both collections sit behind `Arc`, so every clone of this struct observes the same
/// backend statuses and health history.
#[derive(Clone)]
pub struct AdminState {
    /// Latest status of each backend; replaced as a whole after every health-check pass.
    health: Arc<RwLock<Vec<BackendStatus>>>,
    /// Recorded health transitions, oldest first, capped at `MAX_HEALTH_HISTORY` entries.
    health_history: Arc<RwLock<Vec<HealthEvent>>>,
    /// Proxy name reported by the `/backends` endpoint.
    proxy_name: String,
    /// Proxy version reported by the `/backends` endpoint.
    proxy_version: String,
    /// Backend count supplied when the state was created; nothing in this file updates it.
    backend_count: usize,
}
29
/// A change in a backend's health, as recorded by the background health checker.
///
/// An event is stored whenever a backend's health differs from the previously observed
/// value, which includes the first time a backend is observed at all.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct HealthEvent {
    /// Namespace of the backend whose health changed.
    pub namespace: String,
    /// Health of the backend after the change.
    pub healthy: bool,
    /// Time of the health-check pass that observed the change.
    pub timestamp: DateTime<Utc>,
}
41
/// Maximum number of health events kept in the history (shared by all backends);
/// the oldest entries are dropped first once the limit is exceeded.
const MAX_HEALTH_HISTORY: usize = 100;
44
45impl AdminState {
46 pub async fn health(&self) -> Vec<BackendStatus> {
48 self.health.read().await.clone()
49 }
50
51 pub fn proxy_name(&self) -> &str {
53 &self.proxy_name
54 }
55
56 pub fn proxy_version(&self) -> &str {
58 &self.proxy_version
59 }
60
61 pub fn backend_count(&self) -> usize {
63 self.backend_count
64 }
65}
66
/// Health snapshot of a single backend, as exposed by the admin API.
#[derive(Serialize, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct BackendStatus {
    /// Namespace identifying the backend.
    pub namespace: String,
    /// Result of the most recent health check.
    pub healthy: bool,
    /// Time of the most recent health check, if any.
    pub last_checked_at: Option<DateTime<Utc>>,
    /// Number of failed checks in a row; reset to zero by a successful check.
    pub consecutive_failures: u32,
    /// Failure description; the health checker sets `"ping failed"` when unhealthy.
    pub error: Option<String>,
    /// Transport label taken from the backend's `BackendMeta`, when one is known.
    pub transport: Option<String>,
}
84
/// Response body of `GET /backends`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AdminBackendsResponse {
    /// Summary information about the proxy itself.
    proxy: ProxyInfo,
    /// Latest known status of every backend.
    backends: Vec<BackendStatus>,
}
91
/// Proxy summary embedded in the `GET /backends` response.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct ProxyInfo {
    /// Proxy name taken from `AdminState`.
    name: String,
    /// Proxy version taken from `AdminState`.
    version: String,
    /// Backend count taken from `AdminState`.
    backend_count: usize,
    /// Session count reported by the session handle at request time.
    active_sessions: usize,
}
100
/// Static per-backend metadata handed to `spawn_health_checker`, keyed by namespace.
#[derive(Clone)]
pub struct BackendMeta {
    /// Transport label copied into `BackendStatus::transport`.
    pub transport: String,
}
107
108pub fn spawn_health_checker(
111 proxy: McpProxy,
112 proxy_name: String,
113 proxy_version: String,
114 backend_count: usize,
115 backend_meta: HashMap<String, BackendMeta>,
116) -> AdminState {
117 let health: Arc<RwLock<Vec<BackendStatus>>> = Arc::new(RwLock::new(Vec::new()));
118 let health_writer = Arc::clone(&health);
119 let health_history: Arc<RwLock<Vec<HealthEvent>>> = Arc::new(RwLock::new(Vec::new()));
120 let history_writer = Arc::clone(&health_history);
121
122 std::thread::spawn(move || {
123 let rt = tokio::runtime::Builder::new_current_thread()
124 .enable_all()
125 .build()
126 .expect("admin health check runtime");
127
128 let mut failure_counts: HashMap<String, u32> = HashMap::new();
129 let mut prev_healthy: HashMap<String, bool> = HashMap::new();
131
132 rt.block_on(async move {
133 loop {
134 let results = proxy.health_check().await;
135 let now = Utc::now();
136 let mut transitions = Vec::new();
137
138 let statuses: Vec<BackendStatus> = results
139 .into_iter()
140 .map(|h| {
141 let count = failure_counts.entry(h.namespace.clone()).or_insert(0);
142 if h.healthy {
143 *count = 0;
144 } else {
145 *count += 1;
146 }
147
148 let prev = prev_healthy.get(&h.namespace).copied();
150 if prev != Some(h.healthy) {
151 transitions.push(HealthEvent {
152 namespace: h.namespace.clone(),
153 healthy: h.healthy,
154 timestamp: now,
155 });
156 prev_healthy.insert(h.namespace.clone(), h.healthy);
157 }
158
159 let meta = backend_meta.get(&h.namespace);
160 BackendStatus {
161 namespace: h.namespace,
162 healthy: h.healthy,
163 last_checked_at: Some(now),
164 consecutive_failures: *count,
165 error: if h.healthy {
166 None
167 } else {
168 Some("ping failed".to_string())
169 },
170 transport: meta.map(|m| m.transport.clone()),
171 }
172 })
173 .collect();
174
175 *health_writer.write().await = statuses;
176
177 if !transitions.is_empty() {
179 let mut history = history_writer.write().await;
180 history.extend(transitions);
181 if history.len() > MAX_HEALTH_HISTORY {
183 let excess = history.len() - MAX_HEALTH_HISTORY;
184 history.drain(..excess);
185 }
186 }
187
188 tokio::time::sleep(Duration::from_secs(10)).await;
189 }
190 });
191 });
192
193 AdminState {
194 health,
195 health_history,
196 proxy_name,
197 proxy_version,
198 backend_count,
199 }
200}
201
202async fn handle_backends(
203 Extension(state): Extension<AdminState>,
204 Extension(session_handle): Extension<SessionHandle>,
205) -> Json<AdminBackendsResponse> {
206 let backends = state.health.read().await.clone();
207 let active_sessions = session_handle.session_count().await;
208
209 Json(AdminBackendsResponse {
210 proxy: ProxyInfo {
211 name: state.proxy_name,
212 version: state.proxy_version,
213 backend_count: state.backend_count,
214 active_sessions,
215 },
216 backends,
217 })
218}
219
220async fn handle_health(Extension(state): Extension<AdminState>) -> Json<HealthResponse> {
221 let backends = state.health.read().await;
222 let all_healthy = backends.iter().all(|b| b.healthy);
223 let unhealthy: Vec<String> = backends
224 .iter()
225 .filter(|b| !b.healthy)
226 .map(|b| b.namespace.clone())
227 .collect();
228 Json(HealthResponse {
229 status: if all_healthy { "healthy" } else { "degraded" }.to_string(),
230 unhealthy_backends: unhealthy,
231 })
232}
233
/// Response body of `GET /health`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct HealthResponse {
    /// `"healthy"` when every backend is healthy, `"degraded"` otherwise.
    status: String,
    /// Namespaces of the backends currently reported as unhealthy.
    unhealthy_backends: Vec<String>,
}
240
241#[cfg(feature = "metrics")]
242async fn handle_metrics(
243 Extension(handle): Extension<Option<metrics_exporter_prometheus::PrometheusHandle>>,
244) -> impl IntoResponse {
245 match handle {
246 Some(h) => h.render(),
247 None => String::new(),
248 }
249}
250
251#[cfg(not(feature = "metrics"))]
252async fn handle_metrics() -> impl IntoResponse {
253 String::new()
254}
255
256async fn handle_cache_stats(
257 Extension(cache_handle): Extension<Option<crate::cache::CacheHandle>>,
258) -> Json<Vec<crate::cache::CacheStatsSnapshot>> {
259 match cache_handle {
260 Some(h) => Json(h.stats().await),
261 None => Json(vec![]),
262 }
263}
264
265async fn handle_cache_clear(
266 Extension(cache_handle): Extension<Option<crate::cache::CacheHandle>>,
267) -> &'static str {
268 if let Some(h) = cache_handle {
269 h.clear().await;
270 "caches cleared"
271 } else {
272 "no caches configured"
273 }
274}
275
/// Test-only constructor: builds an [`AdminState`] pre-populated with `statuses` and an
/// empty health history, without starting the background health checker.
#[cfg(test)]
fn test_admin_state(
    proxy_name: &str,
    proxy_version: &str,
    backend_count: usize,
    statuses: Vec<BackendStatus>,
) -> AdminState {
    let health = Arc::new(RwLock::new(statuses));
    let health_history = Arc::new(RwLock::new(Vec::new()));
    AdminState {
        health,
        health_history,
        proxy_name: String::from(proxy_name),
        proxy_version: String::from(proxy_version),
        backend_count,
    }
}
292
/// Optional Prometheus handle passed to `admin_router` when the `metrics` feature is on.
#[cfg(feature = "metrics")]
pub type MetricsHandle = Option<metrics_exporter_prometheus::PrometheusHandle>;
/// Placeholder with the same shape when the `metrics` feature is off, so that
/// `admin_router` keeps one signature in both configurations.
#[cfg(not(feature = "metrics"))]
pub type MetricsHandle = Option<()>;
299
/// JSON request body of `POST /backends/add`.
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AddBackendRequest {
    /// Name under which the backend is registered with the proxy.
    name: String,
    /// URL used to construct the backend's HTTP client transport.
    url: String,
    /// Optional bearer token configured on the transport.
    bearer_token: Option<String>,
}
315
/// Generic outcome body returned by the mutating and validating admin endpoints.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct BackendOpResponse {
    /// `true` when the operation succeeded.
    ok: bool,
    /// Human-readable description of the outcome.
    message: String,
}
323
324async fn handle_add_backend(
325 Extension(proxy): Extension<McpProxy>,
326 Json(req): Json<AddBackendRequest>,
327) -> (StatusCode, Json<BackendOpResponse>) {
328 let mut transport = tower_mcp::client::HttpClientTransport::new(&req.url);
329 if let Some(token) = &req.bearer_token {
330 transport = transport.bearer_token(token);
331 }
332
333 match proxy.add_backend(&req.name, transport).await {
334 Ok(()) => (
335 StatusCode::CREATED,
336 Json(BackendOpResponse {
337 ok: true,
338 message: format!("Backend '{}' added", req.name),
339 }),
340 ),
341 Err(e) => (
342 StatusCode::CONFLICT,
343 Json(BackendOpResponse {
344 ok: false,
345 message: format!("Failed to add backend: {e}"),
346 }),
347 ),
348 }
349}
350
351async fn handle_remove_backend(
352 Extension(proxy): Extension<McpProxy>,
353 Path(name): Path<String>,
354) -> (StatusCode, Json<BackendOpResponse>) {
355 if proxy.remove_backend(&name).await {
356 (
357 StatusCode::OK,
358 Json(BackendOpResponse {
359 ok: true,
360 message: format!("Backend '{}' removed", name),
361 }),
362 )
363 } else {
364 (
365 StatusCode::NOT_FOUND,
366 Json(BackendOpResponse {
367 ok: false,
368 message: format!("Backend '{}' not found", name),
369 }),
370 )
371 }
372}
373
374async fn handle_get_config(
375 Extension(config_toml): Extension<std::sync::Arc<String>>,
376) -> impl IntoResponse {
377 config_toml.as_str().to_string()
378}
379
380async fn handle_validate_config(body: String) -> (StatusCode, Json<BackendOpResponse>) {
381 match crate::config::ProxyConfig::parse(&body) {
382 Ok(config) => (
383 StatusCode::OK,
384 Json(BackendOpResponse {
385 ok: true,
386 message: format!("Valid config with {} backends", config.backends.len()),
387 }),
388 ),
389 Err(e) => (
390 StatusCode::BAD_REQUEST,
391 Json(BackendOpResponse {
392 ok: false,
393 message: format!("Invalid config: {e}"),
394 }),
395 ),
396 }
397}
398
399async fn handle_list_sessions(
400 Extension(session_handle): Extension<SessionHandle>,
401) -> Json<SessionsResponse> {
402 Json(SessionsResponse {
403 active_sessions: session_handle.session_count().await,
404 })
405}
406
/// Response body of `GET /sessions`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct SessionsResponse {
    /// Session count reported by the session handle at request time.
    active_sessions: usize,
}
412
413async fn handle_backend_health_history(
414 Extension(state): Extension<AdminState>,
415 Path(name): Path<String>,
416) -> Json<Vec<HealthEvent>> {
417 let history = state.health_history.read().await;
418 let filtered: Vec<HealthEvent> = history
419 .iter()
420 .filter(|e| {
421 e.namespace == name
422 || e.namespace == format!("{name}/")
423 || e.namespace.trim_end_matches('/') == name
424 })
425 .cloned()
426 .collect();
427 Json(filtered)
428}
429
430async fn handle_update_backend(
431 Extension(proxy): Extension<McpProxy>,
432 Path(name): Path<String>,
433 Json(req): Json<UpdateBackendRequest>,
434) -> (StatusCode, Json<BackendOpResponse>) {
435 if !proxy.remove_backend(&name).await {
437 return (
438 StatusCode::NOT_FOUND,
439 Json(BackendOpResponse {
440 ok: false,
441 message: format!("Backend '{name}' not found"),
442 }),
443 );
444 }
445
446 let mut transport = tower_mcp::client::HttpClientTransport::new(&req.url);
448 if let Some(token) = &req.bearer_token {
449 transport = transport.bearer_token(token);
450 }
451
452 match proxy.add_backend(&name, transport).await {
453 Ok(()) => (
454 StatusCode::OK,
455 Json(BackendOpResponse {
456 ok: true,
457 message: format!("Backend '{name}' updated"),
458 }),
459 ),
460 Err(e) => (
461 StatusCode::INTERNAL_SERVER_ERROR,
462 Json(BackendOpResponse {
463 ok: false,
464 message: format!("Failed to re-add backend after removal: {e}"),
465 }),
466 ),
467 }
468}
469
/// JSON request body of `PUT /backends/{name}`.
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct UpdateBackendRequest {
    /// URL used to construct the replacement HTTP client transport.
    url: String,
    /// Optional bearer token configured on the replacement transport.
    bearer_token: Option<String>,
}
478
479async fn handle_single_backend_health(
480 Extension(state): Extension<AdminState>,
481 Path(name): Path<String>,
482) -> Result<Json<BackendStatus>, StatusCode> {
483 let backends = state.health.read().await;
484 backends
486 .iter()
487 .find(|b| {
488 b.namespace == name
489 || b.namespace == format!("{name}/")
490 || b.namespace.trim_end_matches('/') == name
491 })
492 .cloned()
493 .map(Json)
494 .ok_or(StatusCode::NOT_FOUND)
495}
496
497async fn handle_aggregate_stats(
498 Extension(state): Extension<AdminState>,
499 Extension(session_handle): Extension<SessionHandle>,
500) -> Json<AggregateStats> {
501 let backends = state.health.read().await;
502 let total = backends.len();
503 let healthy = backends.iter().filter(|b| b.healthy).count();
504 let active_sessions = session_handle.session_count().await;
505 Json(AggregateStats {
506 total_backends: total,
507 healthy_backends: healthy,
508 unhealthy_backends: total - healthy,
509 active_sessions,
510 })
511}
512
/// Response body of `GET /stats`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AggregateStats {
    /// Number of backends in the latest health snapshot.
    total_backends: usize,
    /// Number of those backends reported as healthy.
    healthy_backends: usize,
    /// `total_backends` minus `healthy_backends`.
    unhealthy_backends: usize,
    /// Session count reported by the session handle at request time.
    active_sessions: usize,
}
521
522async fn handle_list_sessions_detail(
523 Extension(session_handle): Extension<SessionHandle>,
524) -> Json<Vec<SessionInfoResponse>> {
525 let sessions = session_handle.list_sessions().await;
526 Json(
527 sessions
528 .into_iter()
529 .map(|s| SessionInfoResponse {
530 id: s.id,
531 uptime_seconds: s.created_at.as_secs(),
532 idle_seconds: s.last_activity.as_secs(),
533 })
534 .collect(),
535 )
536}
537
/// One entry of the `GET /sessions/detail` response.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct SessionInfoResponse {
    /// Session identifier; also accepted by `DELETE /sessions/{id}`.
    id: String,
    /// Whole seconds taken from the session's `created_at` value.
    uptime_seconds: u64,
    /// Whole seconds taken from the session's `last_activity` value.
    idle_seconds: u64,
}
545
546async fn handle_terminate_session(
547 Extension(session_handle): Extension<SessionHandle>,
548 Path(id): Path<String>,
549) -> (StatusCode, Json<BackendOpResponse>) {
550 if session_handle.terminate_session(&id).await {
551 (
552 StatusCode::OK,
553 Json(BackendOpResponse {
554 ok: true,
555 message: format!("Session '{id}' terminated"),
556 }),
557 )
558 } else {
559 (
560 StatusCode::NOT_FOUND,
561 Json(BackendOpResponse {
562 ok: false,
563 message: format!("Session '{id}' not found"),
564 }),
565 )
566 }
567}
568
// OpenAPI description of the admin API, served at `/openapi.json` when the `openapi`
// feature is enabled. Every request/response type in this file that derives
// `utoipa::ToSchema` is registered below so the generated document is complete.
#[cfg(feature = "openapi")]
#[derive(utoipa::OpenApi)]
#[openapi(
    info(
        title = "mcp-proxy Admin API",
        description = "REST API for managing and monitoring the MCP proxy.",
        version = "0.1.0",
    ),
    components(schemas(
        AdminBackendsResponse,
        ProxyInfo,
        BackendStatus,
        HealthEvent,
        HealthResponse,
        crate::cache::CacheStatsSnapshot,
        SessionsResponse,
        SessionInfoResponse,
        AggregateStats,
        AddBackendRequest,
        UpdateBackendRequest,
        BackendOpResponse,
    ))
)]
struct ApiDoc;
592
/// `GET /openapi.json`: the generated OpenAPI document as JSON.
#[cfg(feature = "openapi")]
async fn handle_openapi() -> impl IntoResponse {
    let document = <ApiDoc as utoipa::OpenApi>::openapi();
    Json(document)
}
597
598pub fn admin_router(
600 state: AdminState,
601 metrics_handle: MetricsHandle,
602 session_handle: SessionHandle,
603 cache_handle: Option<crate::cache::CacheHandle>,
604 proxy: McpProxy,
605 config: &crate::config::ProxyConfig,
606) -> Router {
607 let config_toml = std::sync::Arc::new(toml::to_string_pretty(config).unwrap_or_default());
608
609 let router = Router::new()
610 .route("/backends", get(handle_backends))
612 .route("/health", get(handle_health))
613 .route("/cache/stats", get(handle_cache_stats))
614 .route("/cache/clear", axum::routing::post(handle_cache_clear))
615 .route("/metrics", get(handle_metrics))
616 .route("/sessions", get(handle_list_sessions))
617 .route("/sessions/detail", get(handle_list_sessions_detail))
618 .route("/sessions/{id}", delete(handle_terminate_session))
619 .route("/stats", get(handle_aggregate_stats))
620 .route("/config", get(handle_get_config))
621 .route("/config/validate", post(handle_validate_config))
622 .route("/backends/{name}/health", get(handle_single_backend_health))
624 .route(
625 "/backends/{name}/health/history",
626 get(handle_backend_health_history),
627 )
628 .route("/backends/add", post(handle_add_backend))
630 .route(
631 "/backends/{name}",
632 delete(handle_remove_backend).put(handle_update_backend),
633 )
634 .layer(Extension(state))
635 .layer(Extension(session_handle))
636 .layer(Extension(cache_handle))
637 .layer(Extension(proxy))
638 .layer(Extension(config_toml));
639
640 #[cfg(feature = "metrics")]
641 let router = router.layer(Extension(metrics_handle));
642 #[cfg(not(feature = "metrics"))]
643 let _ = metrics_handle;
644
645 #[cfg(feature = "openapi")]
646 let router = router.route("/openapi.json", get(handle_openapi));
647
648 router
649}
650
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use tower::ServiceExt;

    /// Admin state for a proxy named "test-gw" whose backend count equals `statuses.len()`.
    fn make_state(statuses: Vec<BackendStatus>) -> AdminState {
        test_admin_state("test-gw", "1.0.0", statuses.len(), statuses)
    }

    /// A healthy HTTP backend status with the given namespace.
    fn healthy_backend(name: &str) -> BackendStatus {
        BackendStatus {
            namespace: name.to_string(),
            healthy: true,
            last_checked_at: Some(Utc::now()),
            consecutive_failures: 0,
            error: None,
            transport: Some("http".to_string()),
        }
    }

    /// An unhealthy stdio backend status with three consecutive failures.
    fn unhealthy_backend(name: &str) -> BackendStatus {
        BackendStatus {
            namespace: name.to_string(),
            healthy: false,
            last_checked_at: Some(Utc::now()),
            consecutive_failures: 3,
            error: Some("ping failed".to_string()),
            transport: Some("stdio".to_string()),
        }
    }

    /// A proxy with a single in-process backend exposing a `ping` tool.
    async fn make_test_proxy() -> McpProxy {
        use tower_mcp::client::ChannelTransport;
        use tower_mcp::{CallToolResult, McpRouter, ToolBuilder};

        let router = McpRouter::new().server_info("test", "1.0.0").tool(
            ToolBuilder::new("ping")
                .description("Ping")
                .handler(|_: tower_mcp::NoParams| async move { Ok(CallToolResult::text("pong")) })
                .build(),
        );

        McpProxy::builder("test-proxy", "1.0.0")
            .backend("test", ChannelTransport::new(router))
            .await
            .build_strict()
            .await
            .unwrap()
    }

    /// A minimal proxy configuration with one stdio backend.
    fn make_test_config() -> crate::config::ProxyConfig {
        crate::config::ProxyConfig::parse(
            r#"
            [proxy]
            name = "test"
            [proxy.listen]

            [[backends]]
            name = "echo"
            transport = "stdio"
            command = "echo"
            "#,
        )
        .unwrap()
    }

    /// A session handle backed by a stub service that answers every request with `Pong`.
    fn make_session_handle() -> SessionHandle {
        let svc = tower::util::BoxCloneService::new(tower::service_fn(
            |_req: tower_mcp::RouterRequest| async {
                Ok::<_, std::convert::Infallible>(tower_mcp::RouterResponse {
                    id: tower_mcp::protocol::RequestId::Number(1),
                    inner: Ok(tower_mcp::protocol::McpResponse::Pong(Default::default())),
                })
            },
        ));
        let (_, handle) =
            tower_mcp::transport::http::HttpTransport::from_service(svc).into_router_with_handle();
        handle
    }

    /// Admin router around `state`, with no metrics recorder and no cache configured.
    async fn make_router(state: AdminState) -> Router {
        admin_router(
            state,
            None,
            make_session_handle(),
            None,
            make_test_proxy().await,
            &make_test_config(),
        )
    }

    /// Sends a body-less request with the given method and path to a clone of `router`.
    async fn send(router: &Router, method: &str, path: &str) -> axum::response::Response {
        let request = Request::builder()
            .method(method)
            .uri(path)
            .body(Body::empty())
            .unwrap();
        router.clone().oneshot(request).await.unwrap()
    }

    /// Collects the complete response body.
    async fn body_bytes(resp: axum::response::Response) -> axum::body::Bytes {
        axum::body::to_bytes(resp.into_body(), usize::MAX)
            .await
            .unwrap()
    }

    /// Performs a `GET`, requires a `200 OK`, and parses the body as JSON.
    async fn get_json(router: &Router, path: &str) -> serde_json::Value {
        let resp = send(router, "GET", path).await;
        assert_eq!(resp.status(), StatusCode::OK, "GET {path}");
        let body = body_bytes(resp).await;
        serde_json::from_slice(&body).unwrap()
    }

    #[tokio::test]
    async fn test_admin_state_accessors() {
        let state = make_state(vec![healthy_backend("db/")]);
        assert_eq!(state.proxy_name(), "test-gw");
        assert_eq!(state.proxy_version(), "1.0.0");
        assert_eq!(state.backend_count(), 1);

        let health = state.health().await;
        assert_eq!(health.len(), 1);
        assert!(health[0].healthy);
    }

    #[tokio::test]
    async fn test_health_endpoint_all_healthy() {
        let state = make_state(vec![healthy_backend("db/"), healthy_backend("api/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/health").await;
        assert_eq!(json["status"], "healthy");
        assert!(json["unhealthy_backends"].as_array().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_health_endpoint_degraded() {
        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/health").await;
        assert_eq!(json["status"], "degraded");
        let unhealthy = json["unhealthy_backends"].as_array().unwrap();
        assert_eq!(unhealthy.len(), 1);
        assert_eq!(unhealthy[0], "flaky/");
    }

    #[tokio::test]
    async fn test_backends_endpoint() {
        let state = make_state(vec![healthy_backend("db/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/backends").await;
        assert_eq!(json["proxy"]["name"], "test-gw");
        assert_eq!(json["proxy"]["version"], "1.0.0");
        assert_eq!(json["proxy"]["backend_count"], 1);
        assert_eq!(json["backends"].as_array().unwrap().len(), 1);
        assert_eq!(json["backends"][0]["namespace"], "db/");
        assert!(json["backends"][0]["healthy"].as_bool().unwrap());
    }

    #[tokio::test]
    async fn test_cache_stats_no_cache() {
        let router = make_router(make_state(vec![])).await;

        let json = get_json(&router, "/cache/stats").await;
        assert!(json.as_array().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_cache_clear_no_cache() {
        let router = make_router(make_state(vec![])).await;

        let resp = send(&router, "POST", "/cache/clear").await;
        let body = body_bytes(resp).await;
        assert_eq!(body.as_ref(), b"no caches configured");
    }

    #[tokio::test]
    async fn test_metrics_endpoint_no_recorder() {
        let router = make_router(make_state(vec![])).await;

        let resp = send(&router, "GET", "/metrics").await;
        let body = body_bytes(resp).await;
        assert!(body.is_empty());
    }

    #[tokio::test]
    async fn test_single_backend_health() {
        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
        let router = make_router(state).await;

        // The path segment "db" must match the namespace "db/".
        let json = get_json(&router, "/backends/db/health").await;
        assert_eq!(json["namespace"], "db/");
        assert!(json["healthy"].as_bool().unwrap());
    }

    #[tokio::test]
    async fn test_single_backend_health_not_found() {
        let state = make_state(vec![healthy_backend("db/")]);
        let router = make_router(state).await;

        let resp = send(&router, "GET", "/backends/nonexistent/health").await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn test_aggregate_stats() {
        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/stats").await;
        assert_eq!(json["total_backends"], 2);
        assert_eq!(json["healthy_backends"], 1);
        assert_eq!(json["unhealthy_backends"], 1);
    }

    #[tokio::test]
    async fn test_health_history_empty() {
        let state = make_state(vec![healthy_backend("db/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/backends/db/health/history").await;
        assert!(json.as_array().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_health_history_with_events() {
        let state = make_state(vec![healthy_backend("db/")]);
        {
            // Seed two events for "db/" and one for another backend.
            let mut history = state.health_history.write().await;
            history.push(HealthEvent {
                namespace: "db/".to_string(),
                healthy: true,
                timestamp: Utc::now(),
            });
            history.push(HealthEvent {
                namespace: "db/".to_string(),
                healthy: false,
                timestamp: Utc::now(),
            });
            history.push(HealthEvent {
                namespace: "other/".to_string(),
                healthy: false,
                timestamp: Utc::now(),
            });
        }

        let router = make_router(state).await;

        // Only the "db/" events are returned, in insertion order.
        let json = get_json(&router, "/backends/db/health/history").await;
        let events = json.as_array().unwrap();
        assert_eq!(events.len(), 2);
        assert!(events[0]["healthy"].as_bool().unwrap());
        assert!(!events[1]["healthy"].as_bool().unwrap());
    }
}