1use std::collections::HashMap;
71use std::sync::Arc;
72use std::time::Duration;
73
74use axum::Router;
75use axum::extract::{Extension, Path};
76use axum::http::StatusCode;
77use axum::response::{IntoResponse, Json};
78use axum::routing::{delete, get, post};
79use chrono::{DateTime, Utc};
80use serde::{Deserialize, Serialize};
81use tokio::sync::RwLock;
82use tower_mcp::SessionHandle;
83use tower_mcp::proxy::McpProxy;
84
/// Shared state read by the admin API handlers.
///
/// Both collections sit behind `Arc<RwLock<..>>`, so clones of this struct
/// observe the same health data that the background checker writes.
#[derive(Clone)]
pub struct AdminState {
    /// Latest per-backend status snapshot; the health checker replaces the
    /// whole vector after every check round.
    health: Arc<RwLock<Vec<BackendStatus>>>,
    /// Recorded health transitions, trimmed to `MAX_HEALTH_HISTORY` entries.
    health_history: Arc<RwLock<Vec<HealthEvent>>>,
    /// Proxy name reported by the admin API.
    proxy_name: String,
    /// Proxy version reported by the admin API.
    proxy_version: String,
    /// Backend count supplied at construction; nothing in this module
    /// updates it afterwards.
    backend_count: usize,
}
94
/// A single recorded change in a backend's health state.
///
/// The health checker pushes one of these whenever a backend's `healthy`
/// flag differs from the previously observed value (including the first
/// observation of a backend).
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct HealthEvent {
    /// Namespace of the backend whose state changed.
    pub namespace: String,
    /// The new health state.
    pub healthy: bool,
    /// Time of the check round that observed the change.
    pub timestamp: DateTime<Utc>,
}
106
/// Maximum number of health transition events retained in the shared
/// history; the oldest entries are dropped first once this is exceeded.
const MAX_HEALTH_HISTORY: usize = 100;
109
110impl AdminState {
111 pub async fn health(&self) -> Vec<BackendStatus> {
113 self.health.read().await.clone()
114 }
115
116 pub fn proxy_name(&self) -> &str {
118 &self.proxy_name
119 }
120
121 pub fn proxy_version(&self) -> &str {
123 &self.proxy_version
124 }
125
126 pub fn backend_count(&self) -> usize {
128 self.backend_count
129 }
130}
131
132#[derive(Serialize, Clone)]
134#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
135pub struct BackendStatus {
136 pub namespace: String,
138 pub healthy: bool,
140 pub last_checked_at: Option<DateTime<Utc>>,
142 pub consecutive_failures: u32,
144 pub error: Option<String>,
146 pub transport: Option<String>,
148}
149
/// Response body for `GET /backends`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AdminBackendsResponse {
    /// Proxy-level metadata.
    proxy: ProxyInfo,
    /// Latest status of each backend.
    backends: Vec<BackendStatus>,
}
156
/// Proxy-level metadata embedded in the `/backends` response.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct ProxyInfo {
    /// Proxy name.
    name: String,
    /// Proxy version.
    version: String,
    /// Backend count as stored in `AdminState`.
    backend_count: usize,
    /// Session count reported by the session handle at request time.
    active_sessions: usize,
}
165
/// Static per-backend metadata handed to the health checker.
///
/// Derives `Debug` so the public type can be logged and inspected.
#[derive(Debug, Clone)]
pub struct BackendMeta {
    /// Transport label copied into each `BackendStatus` for this backend.
    pub transport: String,
}
172
173pub fn spawn_health_checker(
176 proxy: McpProxy,
177 proxy_name: String,
178 proxy_version: String,
179 backend_count: usize,
180 backend_meta: HashMap<String, BackendMeta>,
181) -> AdminState {
182 let health: Arc<RwLock<Vec<BackendStatus>>> = Arc::new(RwLock::new(Vec::new()));
183 let health_writer = Arc::clone(&health);
184 let health_history: Arc<RwLock<Vec<HealthEvent>>> = Arc::new(RwLock::new(Vec::new()));
185 let history_writer = Arc::clone(&health_history);
186
187 std::thread::spawn(move || {
188 let rt = tokio::runtime::Builder::new_current_thread()
189 .enable_all()
190 .build()
191 .expect("admin health check runtime");
192
193 let mut failure_counts: HashMap<String, u32> = HashMap::new();
194 let mut prev_healthy: HashMap<String, bool> = HashMap::new();
196
197 rt.block_on(async move {
198 loop {
199 let results = proxy.health_check().await;
200 let now = Utc::now();
201 let mut transitions = Vec::new();
202
203 let statuses: Vec<BackendStatus> = results
204 .into_iter()
205 .map(|h| {
206 let count = failure_counts.entry(h.namespace.clone()).or_insert(0);
207 if h.healthy {
208 *count = 0;
209 } else {
210 *count += 1;
211 }
212
213 let prev = prev_healthy.get(&h.namespace).copied();
215 if prev != Some(h.healthy) {
216 transitions.push(HealthEvent {
217 namespace: h.namespace.clone(),
218 healthy: h.healthy,
219 timestamp: now,
220 });
221 prev_healthy.insert(h.namespace.clone(), h.healthy);
222 }
223
224 let meta = backend_meta.get(&h.namespace);
225 BackendStatus {
226 namespace: h.namespace,
227 healthy: h.healthy,
228 last_checked_at: Some(now),
229 consecutive_failures: *count,
230 error: if h.healthy {
231 None
232 } else {
233 Some("ping failed".to_string())
234 },
235 transport: meta.map(|m| m.transport.clone()),
236 }
237 })
238 .collect();
239
240 *health_writer.write().await = statuses;
241
242 if !transitions.is_empty() {
244 let mut history = history_writer.write().await;
245 history.extend(transitions);
246 if history.len() > MAX_HEALTH_HISTORY {
248 let excess = history.len() - MAX_HEALTH_HISTORY;
249 history.drain(..excess);
250 }
251 }
252
253 tokio::time::sleep(Duration::from_secs(10)).await;
254 }
255 });
256 });
257
258 AdminState {
259 health,
260 health_history,
261 proxy_name,
262 proxy_version,
263 backend_count,
264 }
265}
266
267async fn handle_backends(
268 Extension(state): Extension<AdminState>,
269 Extension(session_handle): Extension<SessionHandle>,
270) -> Json<AdminBackendsResponse> {
271 let backends = state.health.read().await.clone();
272 let active_sessions = session_handle.session_count().await;
273
274 Json(AdminBackendsResponse {
275 proxy: ProxyInfo {
276 name: state.proxy_name,
277 version: state.proxy_version,
278 backend_count: state.backend_count,
279 active_sessions,
280 },
281 backends,
282 })
283}
284
285async fn handle_health(Extension(state): Extension<AdminState>) -> Json<HealthResponse> {
286 let backends = state.health.read().await;
287 let all_healthy = backends.iter().all(|b| b.healthy);
288 let unhealthy: Vec<String> = backends
289 .iter()
290 .filter(|b| !b.healthy)
291 .map(|b| b.namespace.clone())
292 .collect();
293 Json(HealthResponse {
294 status: if all_healthy { "healthy" } else { "degraded" }.to_string(),
295 unhealthy_backends: unhealthy,
296 })
297}
298
/// Response body for `GET /health`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct HealthResponse {
    /// `"healthy"` when every backend is healthy, otherwise `"degraded"`.
    status: String,
    /// Namespaces of the backends currently marked unhealthy.
    unhealthy_backends: Vec<String>,
}
305
/// `GET /metrics` (with the `metrics` feature): renders the Prometheus
/// handle's output, or an empty body when no handle was configured.
#[cfg(feature = "metrics")]
async fn handle_metrics(
    Extension(handle): Extension<Option<metrics_exporter_prometheus::PrometheusHandle>>,
) -> impl IntoResponse {
    if let Some(prometheus) = handle {
        prometheus.render()
    } else {
        String::new()
    }
}
315
/// `GET /metrics` (without the `metrics` feature): always an empty body.
#[cfg(not(feature = "metrics"))]
async fn handle_metrics() -> impl IntoResponse {
    String::new()
}
320
321async fn handle_cache_stats(
322 Extension(cache_handle): Extension<Option<crate::cache::CacheHandle>>,
323) -> Json<Vec<crate::cache::CacheStatsSnapshot>> {
324 match cache_handle {
325 Some(h) => Json(h.stats().await),
326 None => Json(vec![]),
327 }
328}
329
330async fn handle_cache_clear(
331 Extension(cache_handle): Extension<Option<crate::cache::CacheHandle>>,
332) -> &'static str {
333 if let Some(h) = cache_handle {
334 h.clear().await;
335 "caches cleared"
336 } else {
337 "no caches configured"
338 }
339}
340
/// Test-only constructor: builds an `AdminState` with the given statuses
/// and an empty health history, without starting a health checker.
#[cfg(test)]
pub(crate) fn test_admin_state(
    proxy_name: &str,
    proxy_version: &str,
    backend_count: usize,
    statuses: Vec<BackendStatus>,
) -> AdminState {
    let health = Arc::new(RwLock::new(statuses));
    let health_history = Arc::new(RwLock::new(Vec::new()));

    AdminState {
        health,
        health_history,
        proxy_name: String::from(proxy_name),
        proxy_version: String::from(proxy_version),
        backend_count,
    }
}
357
/// Metrics handle accepted by `admin_router`: an optional Prometheus handle
/// when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
pub type MetricsHandle = Option<metrics_exporter_prometheus::PrometheusHandle>;
/// Placeholder with the same shape when the `metrics` feature is disabled,
/// so `admin_router` keeps one signature in both configurations.
#[cfg(not(feature = "metrics"))]
pub type MetricsHandle = Option<()>;
364
/// Request body for `POST /backends/add`.
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AddBackendRequest {
    /// Name to register the backend under.
    name: String,
    /// URL passed to the HTTP client transport.
    url: String,
    /// Optional bearer token applied to the transport.
    bearer_token: Option<String>,
}
380
/// Generic outcome body shared by the mutating admin endpoints.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct BackendOpResponse {
    /// Whether the operation succeeded.
    ok: bool,
    /// Human-readable description of the outcome.
    message: String,
}
388
389async fn handle_add_backend(
390 Extension(proxy): Extension<McpProxy>,
391 Json(req): Json<AddBackendRequest>,
392) -> (StatusCode, Json<BackendOpResponse>) {
393 let mut transport = tower_mcp::client::HttpClientTransport::new(&req.url);
394 if let Some(token) = &req.bearer_token {
395 transport = transport.bearer_token(token);
396 }
397
398 match proxy.add_backend(&req.name, transport).await {
399 Ok(()) => (
400 StatusCode::CREATED,
401 Json(BackendOpResponse {
402 ok: true,
403 message: format!("Backend '{}' added", req.name),
404 }),
405 ),
406 Err(e) => (
407 StatusCode::CONFLICT,
408 Json(BackendOpResponse {
409 ok: false,
410 message: format!("Failed to add backend: {e}"),
411 }),
412 ),
413 }
414}
415
416async fn handle_remove_backend(
417 Extension(proxy): Extension<McpProxy>,
418 Path(name): Path<String>,
419) -> (StatusCode, Json<BackendOpResponse>) {
420 if proxy.remove_backend(&name).await {
421 (
422 StatusCode::OK,
423 Json(BackendOpResponse {
424 ok: true,
425 message: format!("Backend '{}' removed", name),
426 }),
427 )
428 } else {
429 (
430 StatusCode::NOT_FOUND,
431 Json(BackendOpResponse {
432 ok: false,
433 message: format!("Backend '{}' not found", name),
434 }),
435 )
436 }
437}
438
439async fn handle_get_config(
440 Extension(config_toml): Extension<std::sync::Arc<String>>,
441) -> impl IntoResponse {
442 config_toml.as_str().to_string()
443}
444
445async fn handle_validate_config(body: String) -> (StatusCode, Json<BackendOpResponse>) {
446 match crate::config::ProxyConfig::parse(&body) {
447 Ok(config) => (
448 StatusCode::OK,
449 Json(BackendOpResponse {
450 ok: true,
451 message: format!("Valid config with {} backends", config.backends.len()),
452 }),
453 ),
454 Err(e) => (
455 StatusCode::BAD_REQUEST,
456 Json(BackendOpResponse {
457 ok: false,
458 message: format!("Invalid config: {e}"),
459 }),
460 ),
461 }
462}
463
464async fn handle_list_sessions(
465 Extension(session_handle): Extension<SessionHandle>,
466) -> Json<SessionsResponse> {
467 Json(SessionsResponse {
468 active_sessions: session_handle.session_count().await,
469 })
470}
471
/// Response body for `GET /sessions`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct SessionsResponse {
    /// Session count reported by the session handle.
    active_sessions: usize,
}
477
478async fn handle_backend_health_history(
479 Extension(state): Extension<AdminState>,
480 Path(name): Path<String>,
481) -> Json<Vec<HealthEvent>> {
482 let history = state.health_history.read().await;
483 let filtered: Vec<HealthEvent> = history
484 .iter()
485 .filter(|e| {
486 e.namespace == name
487 || e.namespace == format!("{name}/")
488 || e.namespace.trim_end_matches('/') == name
489 })
490 .cloned()
491 .collect();
492 Json(filtered)
493}
494
495async fn handle_update_backend(
496 Extension(proxy): Extension<McpProxy>,
497 Path(name): Path<String>,
498 Json(req): Json<UpdateBackendRequest>,
499) -> (StatusCode, Json<BackendOpResponse>) {
500 if !proxy.remove_backend(&name).await {
502 return (
503 StatusCode::NOT_FOUND,
504 Json(BackendOpResponse {
505 ok: false,
506 message: format!("Backend '{name}' not found"),
507 }),
508 );
509 }
510
511 let mut transport = tower_mcp::client::HttpClientTransport::new(&req.url);
513 if let Some(token) = &req.bearer_token {
514 transport = transport.bearer_token(token);
515 }
516
517 match proxy.add_backend(&name, transport).await {
518 Ok(()) => (
519 StatusCode::OK,
520 Json(BackendOpResponse {
521 ok: true,
522 message: format!("Backend '{name}' updated"),
523 }),
524 ),
525 Err(e) => (
526 StatusCode::INTERNAL_SERVER_ERROR,
527 Json(BackendOpResponse {
528 ok: false,
529 message: format!("Failed to re-add backend after removal: {e}"),
530 }),
531 ),
532 }
533}
534
/// Request body for `PUT /backends/{name}`.
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct UpdateBackendRequest {
    /// New URL passed to the HTTP client transport.
    url: String,
    /// Optional bearer token applied to the new transport.
    bearer_token: Option<String>,
}
543
544async fn handle_single_backend_health(
545 Extension(state): Extension<AdminState>,
546 Path(name): Path<String>,
547) -> Result<Json<BackendStatus>, StatusCode> {
548 let backends = state.health.read().await;
549 backends
551 .iter()
552 .find(|b| {
553 b.namespace == name
554 || b.namespace == format!("{name}/")
555 || b.namespace.trim_end_matches('/') == name
556 })
557 .cloned()
558 .map(Json)
559 .ok_or(StatusCode::NOT_FOUND)
560}
561
562async fn handle_aggregate_stats(
563 Extension(state): Extension<AdminState>,
564 Extension(session_handle): Extension<SessionHandle>,
565) -> Json<AggregateStats> {
566 let backends = state.health.read().await;
567 let total = backends.len();
568 let healthy = backends.iter().filter(|b| b.healthy).count();
569 let active_sessions = session_handle.session_count().await;
570 Json(AggregateStats {
571 total_backends: total,
572 healthy_backends: healthy,
573 unhealthy_backends: total - healthy,
574 active_sessions,
575 })
576}
577
/// Response body for `GET /stats`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AggregateStats {
    /// Number of backends in the latest health snapshot.
    total_backends: usize,
    /// Backends marked healthy in that snapshot.
    healthy_backends: usize,
    /// `total_backends - healthy_backends`.
    unhealthy_backends: usize,
    /// Session count reported by the session handle.
    active_sessions: usize,
}
586
587async fn handle_list_sessions_detail(
588 Extension(session_handle): Extension<SessionHandle>,
589) -> Json<Vec<SessionInfoResponse>> {
590 let sessions = session_handle.list_sessions().await;
591 Json(
592 sessions
593 .into_iter()
594 .map(|s| SessionInfoResponse {
595 id: s.id,
596 uptime_seconds: s.created_at.as_secs(),
597 idle_seconds: s.last_activity.as_secs(),
598 })
599 .collect(),
600 )
601}
602
/// One entry in the `GET /sessions/detail` response.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct SessionInfoResponse {
    /// Session identifier.
    id: String,
    /// Whole seconds taken from the session's `created_at` value.
    uptime_seconds: u64,
    /// Whole seconds taken from the session's `last_activity` value.
    idle_seconds: u64,
}
610
611async fn handle_terminate_session(
612 Extension(session_handle): Extension<SessionHandle>,
613 Path(id): Path<String>,
614) -> (StatusCode, Json<BackendOpResponse>) {
615 if session_handle.terminate_session(&id).await {
616 (
617 StatusCode::OK,
618 Json(BackendOpResponse {
619 ok: true,
620 message: format!("Session '{id}' terminated"),
621 }),
622 )
623 } else {
624 (
625 StatusCode::NOT_FOUND,
626 Json(BackendOpResponse {
627 ok: false,
628 message: format!("Session '{id}' not found"),
629 }),
630 )
631 }
632}
633
634async fn handle_update_config(
635 Extension(config_path): Extension<Option<std::path::PathBuf>>,
636 body: String,
637) -> (StatusCode, Json<BackendOpResponse>) {
638 let is_yaml = config_path
640 .as_ref()
641 .and_then(|p| p.extension())
642 .is_some_and(|ext| ext == "yaml" || ext == "yml");
643
644 let config = if is_yaml {
645 #[cfg(feature = "yaml")]
646 {
647 crate::config::ProxyConfig::parse_yaml(&body)
648 }
649 #[cfg(not(feature = "yaml"))]
650 {
651 Err(anyhow::anyhow!("YAML support requires the 'yaml' feature"))
652 }
653 } else {
654 crate::config::ProxyConfig::parse(&body)
655 };
656
657 let config = match config {
658 Ok(c) => c,
659 Err(e) => {
660 return (
661 StatusCode::BAD_REQUEST,
662 Json(BackendOpResponse {
663 ok: false,
664 message: format!("Invalid config: {e}"),
665 }),
666 );
667 }
668 };
669
670 let Some(path) = config_path else {
672 return (
673 StatusCode::BAD_REQUEST,
674 Json(BackendOpResponse {
675 ok: false,
676 message: "No config file path available (running in --from-mcp-json mode?)"
677 .to_string(),
678 }),
679 );
680 };
681
682 if let Err(e) = std::fs::write(&path, &body) {
683 return (
684 StatusCode::INTERNAL_SERVER_ERROR,
685 Json(BackendOpResponse {
686 ok: false,
687 message: format!("Failed to write config: {e}"),
688 }),
689 );
690 }
691
692 (
693 StatusCode::OK,
694 Json(BackendOpResponse {
695 ok: true,
696 message: format!(
697 "Config updated ({} backends). Hot reload will apply changes if enabled.",
698 config.backends.len()
699 ),
700 }),
701 )
702}
703
704async fn handle_circuit_breakers(
705 Extension(handles): Extension<Arc<std::collections::HashMap<String, crate::proxy::CbHandle>>>,
706) -> Json<Vec<CircuitBreakerStatus>> {
707 let mut statuses = Vec::new();
708 for (name, handle) in handles.iter() {
709 statuses.push(CircuitBreakerStatus {
710 backend: name.clone(),
711 state: format!("{:?}", handle.state()),
712 health: handle.health_status().to_string(),
713 });
714 }
715 statuses.sort_by(|a, b| a.backend.cmp(&b.backend));
716 Json(statuses)
717}
718
/// One entry in the `GET /circuit-breakers` response.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct CircuitBreakerStatus {
    /// Backend name (the key in the circuit breaker handle map).
    backend: String,
    /// `Debug` rendering of the breaker's state.
    state: String,
    /// String rendering of the breaker's health status.
    health: String,
}
726
/// OpenAPI document for the admin API (only with the `openapi` feature).
///
/// Every type in this module that derives `utoipa::ToSchema` is registered
/// here so that the generated document describes all of them.
#[cfg(feature = "openapi")]
#[derive(utoipa::OpenApi)]
#[openapi(
    info(
        title = "mcp-proxy Admin API",
        description = "REST API for managing and monitoring the MCP proxy.",
        version = "0.1.0",
    ),
    components(schemas(
        AdminBackendsResponse,
        ProxyInfo,
        BackendStatus,
        HealthResponse,
        crate::cache::CacheStatsSnapshot,
        SessionsResponse,
        AddBackendRequest,
        BackendOpResponse,
        HealthEvent,
        UpdateBackendRequest,
        AggregateStats,
        SessionInfoResponse,
        CircuitBreakerStatus,
    ))
)]
struct ApiDoc;
750
/// `GET /openapi.json`: serves the generated OpenAPI document as JSON.
#[cfg(feature = "openapi")]
async fn handle_openapi() -> impl IntoResponse {
    use utoipa::OpenApi as _;

    let document = ApiDoc::openapi();
    Json(document)
}
755
756#[allow(clippy::too_many_arguments)]
758pub fn admin_router(
759 state: AdminState,
760 metrics_handle: MetricsHandle,
761 session_handle: SessionHandle,
762 cache_handle: Option<crate::cache::CacheHandle>,
763 proxy: McpProxy,
764 config: &crate::config::ProxyConfig,
765 config_path: Option<std::path::PathBuf>,
766 cb_handles: std::collections::HashMap<String, crate::proxy::CbHandle>,
767) -> Router {
768 let config_toml = std::sync::Arc::new(toml::to_string_pretty(config).unwrap_or_default());
769
770 let router = Router::new()
771 .route("/backends", get(handle_backends))
773 .route("/health", get(handle_health))
774 .route("/cache/stats", get(handle_cache_stats))
775 .route("/cache/clear", axum::routing::post(handle_cache_clear))
776 .route("/metrics", get(handle_metrics))
777 .route("/sessions", get(handle_list_sessions))
778 .route("/sessions/detail", get(handle_list_sessions_detail))
779 .route("/sessions/{id}", delete(handle_terminate_session))
780 .route("/stats", get(handle_aggregate_stats))
781 .route("/circuit-breakers", get(handle_circuit_breakers))
782 .route("/config", get(handle_get_config).put(handle_update_config))
783 .route("/config/validate", post(handle_validate_config))
784 .route("/backends/{name}/health", get(handle_single_backend_health))
786 .route(
787 "/backends/{name}/health/history",
788 get(handle_backend_health_history),
789 )
790 .route("/backends/add", post(handle_add_backend))
792 .route(
793 "/backends/{name}",
794 delete(handle_remove_backend).put(handle_update_backend),
795 )
796 .layer(Extension(state))
797 .layer(Extension(session_handle))
798 .layer(Extension(cache_handle))
799 .layer(Extension(proxy))
800 .layer(Extension(config_toml))
801 .layer(Extension(config_path))
802 .layer(Extension(Arc::new(cb_handles)));
803
804 #[cfg(feature = "metrics")]
805 let router = router.layer(Extension(metrics_handle));
806 #[cfg(not(feature = "metrics"))]
807 let _ = metrics_handle;
808
809 #[cfg(feature = "openapi")]
810 let router = router.route("/openapi.json", get(handle_openapi));
811
812 let admin_tokens = resolve_admin_tokens(config);
814 if !admin_tokens.is_empty() {
815 tracing::info!(token_count = admin_tokens.len(), "Admin API auth enabled");
816 let validator = tower_mcp::auth::StaticBearerValidator::new(admin_tokens);
817 let layer = tower_mcp::auth::AuthLayer::new(validator);
818 router.layer(layer)
819 } else {
820 router
821 }
822}
823
824fn resolve_admin_tokens(config: &crate::config::ProxyConfig) -> Vec<String> {
826 if let Some(ref token) = config.security.admin_token {
828 return vec![token.clone()];
829 }
830
831 match &config.auth {
833 Some(crate::config::AuthConfig::Bearer {
834 tokens,
835 scoped_tokens,
836 }) => {
837 let mut all: Vec<String> = tokens.clone();
838 all.extend(scoped_tokens.iter().map(|st| st.token.clone()));
839 all
840 }
841 _ => vec![],
842 }
843}
844
/// Router-level tests for the admin API.
///
/// Router construction and request plumbing live in the helpers
/// (`make_router`, `send`, `read_body`, `get_json`) so each test states
/// only its fixture data and assertions.
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use tower::ServiceExt;

    /// Admin state named `test-gw` / `1.0.0` holding the given statuses.
    fn make_state(statuses: Vec<BackendStatus>) -> AdminState {
        test_admin_state("test-gw", "1.0.0", statuses.len(), statuses)
    }

    /// Status fixture for a healthy HTTP backend.
    fn healthy_backend(name: &str) -> BackendStatus {
        BackendStatus {
            namespace: name.to_string(),
            healthy: true,
            last_checked_at: Some(Utc::now()),
            consecutive_failures: 0,
            error: None,
            transport: Some("http".to_string()),
        }
    }

    /// Status fixture for a stdio backend that has failed three checks.
    fn unhealthy_backend(name: &str) -> BackendStatus {
        BackendStatus {
            namespace: name.to_string(),
            healthy: false,
            last_checked_at: Some(Utc::now()),
            consecutive_failures: 3,
            error: Some("ping failed".to_string()),
            transport: Some("stdio".to_string()),
        }
    }

    /// Proxy with a single in-memory backend exposing a `ping` tool.
    async fn make_test_proxy() -> McpProxy {
        use tower_mcp::client::ChannelTransport;
        use tower_mcp::{CallToolResult, McpRouter, ToolBuilder};

        let router = McpRouter::new().server_info("test", "1.0.0").tool(
            ToolBuilder::new("ping")
                .description("Ping")
                .handler(|_: tower_mcp::NoParams| async move { Ok(CallToolResult::text("pong")) })
                .build(),
        );

        McpProxy::builder("test-proxy", "1.0.0")
            .backend("test", ChannelTransport::new(router))
            .await
            .build_strict()
            .await
            .unwrap()
    }

    /// Minimal config with one stdio backend and no auth settings.
    fn make_test_config() -> crate::config::ProxyConfig {
        crate::config::ProxyConfig::parse(
            r#"
            [proxy]
            name = "test"
            [proxy.listen]

            [[backends]]
            name = "echo"
            transport = "stdio"
            command = "echo"
            "#,
        )
        .unwrap()
    }

    /// Session handle backed by a stub service; it starts with no sessions.
    fn make_session_handle() -> SessionHandle {
        let svc = tower::util::BoxCloneService::new(tower::service_fn(
            |_req: tower_mcp::RouterRequest| async {
                Ok::<_, std::convert::Infallible>(tower_mcp::RouterResponse {
                    id: tower_mcp::protocol::RequestId::Number(1),
                    inner: Ok(tower_mcp::protocol::McpResponse::Pong(Default::default())),
                })
            },
        ));
        let (_, handle) =
            tower_mcp::transport::http::HttpTransport::from_service(svc).into_router_with_handle();
        handle
    }

    /// Admin router over `state` with the given session handle and test
    /// defaults for everything else (no metrics, cache, config path or
    /// circuit breakers).
    async fn make_router_with_sessions(state: AdminState, session_handle: SessionHandle) -> Router {
        admin_router(
            state,
            None,
            session_handle,
            None,
            make_test_proxy().await,
            &make_test_config(),
            None,
            std::collections::HashMap::new(),
        )
    }

    /// Admin router over `state` with a fresh, empty session handle.
    async fn make_router(state: AdminState) -> Router {
        make_router_with_sessions(state, make_session_handle()).await
    }

    /// Sends a body-less request through a clone of `router`.
    async fn send(router: &Router, method: &str, uri: &str) -> axum::response::Response {
        router
            .clone()
            .oneshot(
                Request::builder()
                    .method(method)
                    .uri(uri)
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap()
    }

    /// Collects the full response body.
    async fn read_body(resp: axum::response::Response) -> axum::body::Bytes {
        axum::body::to_bytes(resp.into_body(), usize::MAX)
            .await
            .unwrap()
    }

    /// Issues a `GET` and parses the response body as JSON.
    async fn get_json(router: &Router, path: &str) -> serde_json::Value {
        let body = read_body(send(router, "GET", path).await).await;
        serde_json::from_slice(&body).unwrap()
    }

    #[tokio::test]
    async fn test_admin_state_accessors() {
        let state = make_state(vec![healthy_backend("db/")]);
        assert_eq!(state.proxy_name(), "test-gw");
        assert_eq!(state.proxy_version(), "1.0.0");
        assert_eq!(state.backend_count(), 1);

        let health = state.health().await;
        assert_eq!(health.len(), 1);
        assert!(health[0].healthy);
    }

    #[tokio::test]
    async fn test_health_endpoint_all_healthy() {
        let state = make_state(vec![healthy_backend("db/"), healthy_backend("api/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/health").await;
        assert_eq!(json["status"], "healthy");
        assert!(json["unhealthy_backends"].as_array().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_health_endpoint_degraded() {
        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/health").await;
        assert_eq!(json["status"], "degraded");
        let unhealthy = json["unhealthy_backends"].as_array().unwrap();
        assert_eq!(unhealthy.len(), 1);
        assert_eq!(unhealthy[0], "flaky/");
    }

    #[tokio::test]
    async fn test_backends_endpoint() {
        let state = make_state(vec![healthy_backend("db/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/backends").await;
        assert_eq!(json["proxy"]["name"], "test-gw");
        assert_eq!(json["proxy"]["version"], "1.0.0");
        assert_eq!(json["proxy"]["backend_count"], 1);
        assert_eq!(json["backends"].as_array().unwrap().len(), 1);
        assert_eq!(json["backends"][0]["namespace"], "db/");
        assert!(json["backends"][0]["healthy"].as_bool().unwrap());
    }

    #[tokio::test]
    async fn test_cache_stats_no_cache() {
        let router = make_router(make_state(vec![])).await;

        let json = get_json(&router, "/cache/stats").await;
        assert!(json.as_array().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_cache_clear_no_cache() {
        let router = make_router(make_state(vec![])).await;

        let resp = send(&router, "POST", "/cache/clear").await;
        let body = read_body(resp).await;
        assert_eq!(body.as_ref(), b"no caches configured");
    }

    #[tokio::test]
    async fn test_metrics_endpoint_no_recorder() {
        let router = make_router(make_state(vec![])).await;

        let resp = send(&router, "GET", "/metrics").await;
        let body = read_body(resp).await;
        assert!(body.is_empty());
    }

    #[tokio::test]
    async fn test_single_backend_health() {
        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
        let router = make_router(state).await;

        // The path uses the name without the trailing slash of the namespace.
        let json = get_json(&router, "/backends/db/health").await;
        assert_eq!(json["namespace"], "db/");
        assert!(json["healthy"].as_bool().unwrap());
    }

    #[tokio::test]
    async fn test_single_backend_health_not_found() {
        let state = make_state(vec![healthy_backend("db/")]);
        let router = make_router(state).await;

        let resp = send(&router, "GET", "/backends/nonexistent/health").await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn test_aggregate_stats() {
        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/stats").await;
        assert_eq!(json["total_backends"], 2);
        assert_eq!(json["healthy_backends"], 1);
        assert_eq!(json["unhealthy_backends"], 1);
    }

    #[tokio::test]
    async fn test_health_history_empty() {
        let state = make_state(vec![healthy_backend("db/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/backends/db/health/history").await;
        assert!(json.as_array().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_health_history_with_events() {
        let state = make_state(vec![healthy_backend("db/")]);
        {
            // Seed two events for `db/` and one for an unrelated backend.
            let mut history = state.health_history.write().await;
            history.push(HealthEvent {
                namespace: "db/".to_string(),
                healthy: true,
                timestamp: Utc::now(),
            });
            history.push(HealthEvent {
                namespace: "db/".to_string(),
                healthy: false,
                timestamp: Utc::now(),
            });
            history.push(HealthEvent {
                namespace: "other/".to_string(),
                healthy: false,
                timestamp: Utc::now(),
            });
        }

        let router = make_router(state).await;

        let json = get_json(&router, "/backends/db/health/history").await;
        // Only the `db/` events come back, in insertion order.
        let events = json.as_array().unwrap();
        assert_eq!(events.len(), 2);
        assert!(events[0]["healthy"].as_bool().unwrap());
        assert!(!events[1]["healthy"].as_bool().unwrap());
    }

    #[tokio::test]
    async fn test_sessions_endpoint() {
        let router = make_router(make_state(vec![])).await;

        let json = get_json(&router, "/sessions").await;
        assert_eq!(json["active_sessions"], 0);
    }

    #[tokio::test]
    async fn test_sessions_detail_empty() {
        let router = make_router(make_state(vec![])).await;

        let json = get_json(&router, "/sessions/detail").await;
        let sessions = json.as_array().unwrap();
        assert!(sessions.is_empty());
    }

    #[tokio::test]
    async fn test_terminate_session_not_found() {
        let router = make_router(make_state(vec![])).await;

        let resp = send(&router, "DELETE", "/sessions/nonexistent-id").await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);

        let body = read_body(resp).await;
        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
        assert!(!json["ok"].as_bool().unwrap());
        assert!(json["message"].as_str().unwrap().contains("not found"));
    }

    #[tokio::test]
    async fn test_sessions_detail_with_active_session() {
        // Stub service that answers every request with an initialize result.
        let svc = tower::util::BoxCloneService::new(tower::service_fn(
            |_req: tower_mcp::RouterRequest| async {
                Ok::<_, std::convert::Infallible>(tower_mcp::RouterResponse {
                    id: tower_mcp::protocol::RequestId::Number(1),
                    inner: Ok(tower_mcp::protocol::McpResponse::Initialize(
                        tower_mcp::protocol::InitializeResult {
                            protocol_version: "2025-03-26".to_string(),
                            server_info: tower_mcp::protocol::Implementation {
                                name: "test".to_string(),
                                version: "1.0.0".to_string(),
                                ..Default::default()
                            },
                            capabilities: Default::default(),
                            instructions: None,
                            meta: None,
                        },
                    )),
                })
            },
        ));

        let (http_router, session_handle) =
            tower_mcp::transport::http::HttpTransport::from_service(svc).into_router_with_handle();

        // Create a session by sending an initialize request.
        let init_body = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2025-03-26",
                "clientInfo": { "name": "test", "version": "1.0.0" },
                "capabilities": {}
            }
        });

        let resp = http_router
            .clone()
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/")
                    .header("Content-Type", "application/json")
                    .body(Body::from(serde_json::to_string(&init_body).unwrap()))
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resp.status(), 200);

        let session_id = resp
            .headers()
            .get("mcp-session-id")
            .expect("initialize response should include mcp-session-id header")
            .to_str()
            .unwrap()
            .to_string();

        // The admin router shares the session handle with the transport.
        let admin = make_router_with_sessions(make_state(vec![]), session_handle.clone()).await;

        let json = get_json(&admin, "/sessions").await;
        assert_eq!(json["active_sessions"], 1);

        let json = get_json(&admin, "/sessions/detail").await;
        let sessions = json.as_array().unwrap();
        assert_eq!(sessions.len(), 1);
        assert!(!sessions[0]["id"].as_str().unwrap().is_empty());
        assert!(sessions[0]["uptime_seconds"].is_number());
        assert!(sessions[0]["idle_seconds"].is_number());

        // Terminating the session succeeds once...
        let session_uri = format!("/sessions/{}", session_id);
        let resp = send(&admin, "DELETE", &session_uri).await;
        assert_eq!(resp.status(), StatusCode::OK);

        let body = read_body(resp).await;
        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
        assert!(json["ok"].as_bool().unwrap());

        let json = get_json(&admin, "/sessions").await;
        assert_eq!(json["active_sessions"], 0);

        // ...and a second attempt reports that it is gone.
        let resp = send(&admin, "DELETE", &session_uri).await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }
}