1use std::collections::HashMap;
66use std::sync::Arc;
67use std::time::Duration;
68
69use axum::Router;
70use axum::extract::{Extension, Path};
71use axum::http::StatusCode;
72use axum::response::{IntoResponse, Json};
73use axum::routing::{delete, get, post};
74use chrono::{DateTime, Utc};
75use serde::{Deserialize, Serialize};
76use tokio::sync::RwLock;
77use tower_mcp::SessionHandle;
78use tower_mcp::proxy::McpProxy;
79
/// Shared state backing the admin API handlers.
///
/// Cloning is cheap for the health data: both collections live behind
/// `Arc<RwLock<..>>`, so clones observe the same snapshot and history.
#[derive(Clone)]
pub struct AdminState {
    /// Most recent per-backend health snapshot; replaced wholesale by the
    /// background task started in `spawn_health_checker`.
    health: Arc<RwLock<Vec<BackendStatus>>>,
    /// Health transition events for all backends, capped at
    /// `MAX_HEALTH_HISTORY` entries by the health checker.
    health_history: Arc<RwLock<Vec<HealthEvent>>>,
    /// Proxy name reported by `GET /backends`.
    proxy_name: String,
    /// Proxy version reported by `GET /backends`.
    proxy_version: String,
    /// Backend count supplied at construction time (not derived from `health`).
    backend_count: usize,
}
89
/// A recorded health observation for one backend.
///
/// The health checker appends an event whenever a backend's `healthy` flag
/// differs from the previously recorded value, including the first time the
/// backend is observed.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct HealthEvent {
    /// Namespace of the backend the event refers to.
    pub namespace: String,
    /// Health state observed at `timestamp`.
    pub healthy: bool,
    /// UTC time of the health check that produced this event.
    pub timestamp: DateTime<Utc>,
}
101
/// Maximum number of [`HealthEvent`]s retained in the shared history.
///
/// The cap applies to the single history buffer shared by all backends; the
/// oldest entries are drained first once it is exceeded.
const MAX_HEALTH_HISTORY: usize = 100;
104
105impl AdminState {
106 pub async fn health(&self) -> Vec<BackendStatus> {
108 self.health.read().await.clone()
109 }
110
111 pub fn proxy_name(&self) -> &str {
113 &self.proxy_name
114 }
115
116 pub fn proxy_version(&self) -> &str {
118 &self.proxy_version
119 }
120
121 pub fn backend_count(&self) -> usize {
123 self.backend_count
124 }
125}
126
127#[derive(Serialize, Clone)]
129#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
130pub struct BackendStatus {
131 pub namespace: String,
133 pub healthy: bool,
135 pub last_checked_at: Option<DateTime<Utc>>,
137 pub consecutive_failures: u32,
139 pub error: Option<String>,
141 pub transport: Option<String>,
143}
144
/// Response body of `GET /backends`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AdminBackendsResponse {
    /// Summary information about the proxy itself.
    proxy: ProxyInfo,
    /// Latest health snapshot, one entry per checked backend.
    backends: Vec<BackendStatus>,
}
151
/// Proxy summary embedded in [`AdminBackendsResponse`].
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct ProxyInfo {
    /// Proxy name from [`AdminState`].
    name: String,
    /// Proxy version from [`AdminState`].
    version: String,
    /// Backend count from [`AdminState`].
    backend_count: usize,
    /// Session count reported by the session handle at request time.
    active_sessions: usize,
}
160
/// Static metadata about a backend that the health checker attaches to each
/// [`BackendStatus`].
///
/// Derives `Debug` so the public type can be logged and inspected in tests.
#[derive(Debug, Clone)]
pub struct BackendMeta {
    /// Name of the transport the backend uses; copied into
    /// `BackendStatus::transport`.
    pub transport: String,
}
167
168pub fn spawn_health_checker(
171 proxy: McpProxy,
172 proxy_name: String,
173 proxy_version: String,
174 backend_count: usize,
175 backend_meta: HashMap<String, BackendMeta>,
176) -> AdminState {
177 let health: Arc<RwLock<Vec<BackendStatus>>> = Arc::new(RwLock::new(Vec::new()));
178 let health_writer = Arc::clone(&health);
179 let health_history: Arc<RwLock<Vec<HealthEvent>>> = Arc::new(RwLock::new(Vec::new()));
180 let history_writer = Arc::clone(&health_history);
181
182 std::thread::spawn(move || {
183 let rt = tokio::runtime::Builder::new_current_thread()
184 .enable_all()
185 .build()
186 .expect("admin health check runtime");
187
188 let mut failure_counts: HashMap<String, u32> = HashMap::new();
189 let mut prev_healthy: HashMap<String, bool> = HashMap::new();
191
192 rt.block_on(async move {
193 loop {
194 let results = proxy.health_check().await;
195 let now = Utc::now();
196 let mut transitions = Vec::new();
197
198 let statuses: Vec<BackendStatus> = results
199 .into_iter()
200 .map(|h| {
201 let count = failure_counts.entry(h.namespace.clone()).or_insert(0);
202 if h.healthy {
203 *count = 0;
204 } else {
205 *count += 1;
206 }
207
208 let prev = prev_healthy.get(&h.namespace).copied();
210 if prev != Some(h.healthy) {
211 transitions.push(HealthEvent {
212 namespace: h.namespace.clone(),
213 healthy: h.healthy,
214 timestamp: now,
215 });
216 prev_healthy.insert(h.namespace.clone(), h.healthy);
217 }
218
219 let meta = backend_meta.get(&h.namespace);
220 BackendStatus {
221 namespace: h.namespace,
222 healthy: h.healthy,
223 last_checked_at: Some(now),
224 consecutive_failures: *count,
225 error: if h.healthy {
226 None
227 } else {
228 Some("ping failed".to_string())
229 },
230 transport: meta.map(|m| m.transport.clone()),
231 }
232 })
233 .collect();
234
235 *health_writer.write().await = statuses;
236
237 if !transitions.is_empty() {
239 let mut history = history_writer.write().await;
240 history.extend(transitions);
241 if history.len() > MAX_HEALTH_HISTORY {
243 let excess = history.len() - MAX_HEALTH_HISTORY;
244 history.drain(..excess);
245 }
246 }
247
248 tokio::time::sleep(Duration::from_secs(10)).await;
249 }
250 });
251 });
252
253 AdminState {
254 health,
255 health_history,
256 proxy_name,
257 proxy_version,
258 backend_count,
259 }
260}
261
262async fn handle_backends(
263 Extension(state): Extension<AdminState>,
264 Extension(session_handle): Extension<SessionHandle>,
265) -> Json<AdminBackendsResponse> {
266 let backends = state.health.read().await.clone();
267 let active_sessions = session_handle.session_count().await;
268
269 Json(AdminBackendsResponse {
270 proxy: ProxyInfo {
271 name: state.proxy_name,
272 version: state.proxy_version,
273 backend_count: state.backend_count,
274 active_sessions,
275 },
276 backends,
277 })
278}
279
280async fn handle_health(Extension(state): Extension<AdminState>) -> Json<HealthResponse> {
281 let backends = state.health.read().await;
282 let all_healthy = backends.iter().all(|b| b.healthy);
283 let unhealthy: Vec<String> = backends
284 .iter()
285 .filter(|b| !b.healthy)
286 .map(|b| b.namespace.clone())
287 .collect();
288 Json(HealthResponse {
289 status: if all_healthy { "healthy" } else { "degraded" }.to_string(),
290 unhealthy_backends: unhealthy,
291 })
292}
293
/// Response body of `GET /health`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct HealthResponse {
    /// `"healthy"` when no backend is unhealthy, otherwise `"degraded"`.
    status: String,
    /// Namespaces of the backends currently marked unhealthy.
    unhealthy_backends: Vec<String>,
}
300
/// `GET /metrics` — renders the Prometheus metrics, or returns an empty body
/// when no Prometheus handle was provided.
#[cfg(feature = "metrics")]
async fn handle_metrics(
    Extension(handle): Extension<Option<metrics_exporter_prometheus::PrometheusHandle>>,
) -> impl IntoResponse {
    if let Some(prometheus) = handle {
        prometheus.render()
    } else {
        String::new()
    }
}
310
311#[cfg(not(feature = "metrics"))]
312async fn handle_metrics() -> impl IntoResponse {
313 String::new()
314}
315
316async fn handle_cache_stats(
317 Extension(cache_handle): Extension<Option<crate::cache::CacheHandle>>,
318) -> Json<Vec<crate::cache::CacheStatsSnapshot>> {
319 match cache_handle {
320 Some(h) => Json(h.stats().await),
321 None => Json(vec![]),
322 }
323}
324
325async fn handle_cache_clear(
326 Extension(cache_handle): Extension<Option<crate::cache::CacheHandle>>,
327) -> &'static str {
328 if let Some(h) = cache_handle {
329 h.clear().await;
330 "caches cleared"
331 } else {
332 "no caches configured"
333 }
334}
335
/// Test-only constructor: builds an [`AdminState`] pre-populated with
/// `statuses` and an empty health history, without starting a health checker.
#[cfg(test)]
pub(crate) fn test_admin_state(
    proxy_name: &str,
    proxy_version: &str,
    backend_count: usize,
    statuses: Vec<BackendStatus>,
) -> AdminState {
    let health = Arc::new(RwLock::new(statuses));
    let health_history = Arc::new(RwLock::new(Vec::new()));

    AdminState {
        health,
        health_history,
        proxy_name: proxy_name.to_owned(),
        proxy_version: proxy_version.to_owned(),
        backend_count,
    }
}
352
/// Metrics handle accepted by [`admin_router`]: an optional Prometheus handle
/// when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
pub type MetricsHandle = Option<metrics_exporter_prometheus::PrometheusHandle>;
/// Metrics handle accepted by [`admin_router`]: a unit placeholder when the
/// `metrics` feature is disabled, so the router signature stays the same.
#[cfg(not(feature = "metrics"))]
pub type MetricsHandle = Option<()>;
359
/// JSON request body of `POST /backends/add`.
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AddBackendRequest {
    /// Name under which the backend is registered with the proxy.
    name: String,
    /// URL handed to the HTTP client transport.
    url: String,
    /// Optional bearer token applied to the transport when present.
    bearer_token: Option<String>,
}
375
/// Generic JSON result returned by the mutating admin endpoints (backend,
/// session, and config operations).
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct BackendOpResponse {
    /// `true` when the operation succeeded.
    ok: bool,
    /// Human-readable description of the outcome.
    message: String,
}
383
384async fn handle_add_backend(
385 Extension(proxy): Extension<McpProxy>,
386 Json(req): Json<AddBackendRequest>,
387) -> (StatusCode, Json<BackendOpResponse>) {
388 let mut transport = tower_mcp::client::HttpClientTransport::new(&req.url);
389 if let Some(token) = &req.bearer_token {
390 transport = transport.bearer_token(token);
391 }
392
393 match proxy.add_backend(&req.name, transport).await {
394 Ok(()) => (
395 StatusCode::CREATED,
396 Json(BackendOpResponse {
397 ok: true,
398 message: format!("Backend '{}' added", req.name),
399 }),
400 ),
401 Err(e) => (
402 StatusCode::CONFLICT,
403 Json(BackendOpResponse {
404 ok: false,
405 message: format!("Failed to add backend: {e}"),
406 }),
407 ),
408 }
409}
410
411async fn handle_remove_backend(
412 Extension(proxy): Extension<McpProxy>,
413 Path(name): Path<String>,
414) -> (StatusCode, Json<BackendOpResponse>) {
415 if proxy.remove_backend(&name).await {
416 (
417 StatusCode::OK,
418 Json(BackendOpResponse {
419 ok: true,
420 message: format!("Backend '{}' removed", name),
421 }),
422 )
423 } else {
424 (
425 StatusCode::NOT_FOUND,
426 Json(BackendOpResponse {
427 ok: false,
428 message: format!("Backend '{}' not found", name),
429 }),
430 )
431 }
432}
433
434async fn handle_get_config(
435 Extension(config_toml): Extension<std::sync::Arc<String>>,
436) -> impl IntoResponse {
437 config_toml.as_str().to_string()
438}
439
440async fn handle_validate_config(body: String) -> (StatusCode, Json<BackendOpResponse>) {
441 match crate::config::ProxyConfig::parse(&body) {
442 Ok(config) => (
443 StatusCode::OK,
444 Json(BackendOpResponse {
445 ok: true,
446 message: format!("Valid config with {} backends", config.backends.len()),
447 }),
448 ),
449 Err(e) => (
450 StatusCode::BAD_REQUEST,
451 Json(BackendOpResponse {
452 ok: false,
453 message: format!("Invalid config: {e}"),
454 }),
455 ),
456 }
457}
458
459async fn handle_list_sessions(
460 Extension(session_handle): Extension<SessionHandle>,
461) -> Json<SessionsResponse> {
462 Json(SessionsResponse {
463 active_sessions: session_handle.session_count().await,
464 })
465}
466
/// Response body of `GET /sessions`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct SessionsResponse {
    /// Session count reported by the session handle at request time.
    active_sessions: usize,
}
472
473async fn handle_backend_health_history(
474 Extension(state): Extension<AdminState>,
475 Path(name): Path<String>,
476) -> Json<Vec<HealthEvent>> {
477 let history = state.health_history.read().await;
478 let filtered: Vec<HealthEvent> = history
479 .iter()
480 .filter(|e| {
481 e.namespace == name
482 || e.namespace == format!("{name}/")
483 || e.namespace.trim_end_matches('/') == name
484 })
485 .cloned()
486 .collect();
487 Json(filtered)
488}
489
490async fn handle_update_backend(
491 Extension(proxy): Extension<McpProxy>,
492 Path(name): Path<String>,
493 Json(req): Json<UpdateBackendRequest>,
494) -> (StatusCode, Json<BackendOpResponse>) {
495 if !proxy.remove_backend(&name).await {
497 return (
498 StatusCode::NOT_FOUND,
499 Json(BackendOpResponse {
500 ok: false,
501 message: format!("Backend '{name}' not found"),
502 }),
503 );
504 }
505
506 let mut transport = tower_mcp::client::HttpClientTransport::new(&req.url);
508 if let Some(token) = &req.bearer_token {
509 transport = transport.bearer_token(token);
510 }
511
512 match proxy.add_backend(&name, transport).await {
513 Ok(()) => (
514 StatusCode::OK,
515 Json(BackendOpResponse {
516 ok: true,
517 message: format!("Backend '{name}' updated"),
518 }),
519 ),
520 Err(e) => (
521 StatusCode::INTERNAL_SERVER_ERROR,
522 Json(BackendOpResponse {
523 ok: false,
524 message: format!("Failed to re-add backend after removal: {e}"),
525 }),
526 ),
527 }
528}
529
/// JSON request body of `PUT /backends/{name}`.
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct UpdateBackendRequest {
    /// URL handed to the replacement HTTP client transport.
    url: String,
    /// Optional bearer token applied to the replacement transport.
    bearer_token: Option<String>,
}
538
539async fn handle_single_backend_health(
540 Extension(state): Extension<AdminState>,
541 Path(name): Path<String>,
542) -> Result<Json<BackendStatus>, StatusCode> {
543 let backends = state.health.read().await;
544 backends
546 .iter()
547 .find(|b| {
548 b.namespace == name
549 || b.namespace == format!("{name}/")
550 || b.namespace.trim_end_matches('/') == name
551 })
552 .cloned()
553 .map(Json)
554 .ok_or(StatusCode::NOT_FOUND)
555}
556
557async fn handle_aggregate_stats(
558 Extension(state): Extension<AdminState>,
559 Extension(session_handle): Extension<SessionHandle>,
560) -> Json<AggregateStats> {
561 let backends = state.health.read().await;
562 let total = backends.len();
563 let healthy = backends.iter().filter(|b| b.healthy).count();
564 let active_sessions = session_handle.session_count().await;
565 Json(AggregateStats {
566 total_backends: total,
567 healthy_backends: healthy,
568 unhealthy_backends: total - healthy,
569 active_sessions,
570 })
571}
572
/// Response body of `GET /stats`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AggregateStats {
    /// Number of backends in the latest health snapshot.
    total_backends: usize,
    /// Number of backends marked healthy in the snapshot.
    healthy_backends: usize,
    /// `total_backends - healthy_backends`.
    unhealthy_backends: usize,
    /// Session count reported by the session handle at request time.
    active_sessions: usize,
}
581
582async fn handle_list_sessions_detail(
583 Extension(session_handle): Extension<SessionHandle>,
584) -> Json<Vec<SessionInfoResponse>> {
585 let sessions = session_handle.list_sessions().await;
586 Json(
587 sessions
588 .into_iter()
589 .map(|s| SessionInfoResponse {
590 id: s.id,
591 uptime_seconds: s.created_at.as_secs(),
592 idle_seconds: s.last_activity.as_secs(),
593 })
594 .collect(),
595 )
596}
597
/// One entry of the `GET /sessions/detail` response.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct SessionInfoResponse {
    /// Session identifier.
    id: String,
    /// Whole seconds of the session's `created_at` value.
    // NOTE(review): presumably the elapsed time since creation — confirm
    // against the session handle's documentation.
    uptime_seconds: u64,
    /// Whole seconds of the session's `last_activity` value.
    // NOTE(review): presumably the time since the last activity — confirm.
    idle_seconds: u64,
}
605
606async fn handle_terminate_session(
607 Extension(session_handle): Extension<SessionHandle>,
608 Path(id): Path<String>,
609) -> (StatusCode, Json<BackendOpResponse>) {
610 if session_handle.terminate_session(&id).await {
611 (
612 StatusCode::OK,
613 Json(BackendOpResponse {
614 ok: true,
615 message: format!("Session '{id}' terminated"),
616 }),
617 )
618 } else {
619 (
620 StatusCode::NOT_FOUND,
621 Json(BackendOpResponse {
622 ok: false,
623 message: format!("Session '{id}' not found"),
624 }),
625 )
626 }
627}
628
629async fn handle_update_config(
630 Extension(config_path): Extension<Option<std::path::PathBuf>>,
631 body: String,
632) -> (StatusCode, Json<BackendOpResponse>) {
633 let is_yaml = config_path
635 .as_ref()
636 .and_then(|p| p.extension())
637 .is_some_and(|ext| ext == "yaml" || ext == "yml");
638
639 let config = if is_yaml {
640 #[cfg(feature = "yaml")]
641 {
642 crate::config::ProxyConfig::parse_yaml(&body)
643 }
644 #[cfg(not(feature = "yaml"))]
645 {
646 Err(anyhow::anyhow!("YAML support requires the 'yaml' feature"))
647 }
648 } else {
649 crate::config::ProxyConfig::parse(&body)
650 };
651
652 let config = match config {
653 Ok(c) => c,
654 Err(e) => {
655 return (
656 StatusCode::BAD_REQUEST,
657 Json(BackendOpResponse {
658 ok: false,
659 message: format!("Invalid config: {e}"),
660 }),
661 );
662 }
663 };
664
665 let Some(path) = config_path else {
667 return (
668 StatusCode::BAD_REQUEST,
669 Json(BackendOpResponse {
670 ok: false,
671 message: "No config file path available (running in --from-mcp-json mode?)"
672 .to_string(),
673 }),
674 );
675 };
676
677 if let Err(e) = std::fs::write(&path, &body) {
678 return (
679 StatusCode::INTERNAL_SERVER_ERROR,
680 Json(BackendOpResponse {
681 ok: false,
682 message: format!("Failed to write config: {e}"),
683 }),
684 );
685 }
686
687 (
688 StatusCode::OK,
689 Json(BackendOpResponse {
690 ok: true,
691 message: format!(
692 "Config updated ({} backends). Hot reload will apply changes if enabled.",
693 config.backends.len()
694 ),
695 }),
696 )
697}
698
699async fn handle_circuit_breakers(
700 Extension(handles): Extension<Arc<std::collections::HashMap<String, crate::proxy::CbHandle>>>,
701) -> Json<Vec<CircuitBreakerStatus>> {
702 let mut statuses = Vec::new();
703 for (name, handle) in handles.iter() {
704 statuses.push(CircuitBreakerStatus {
705 backend: name.clone(),
706 state: format!("{:?}", handle.state()),
707 health: handle.health_status().to_string(),
708 });
709 }
710 statuses.sort_by(|a, b| a.backend.cmp(&b.backend));
711 Json(statuses)
712}
713
/// One entry of the `GET /circuit-breakers` response.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct CircuitBreakerStatus {
    /// Backend name the circuit breaker handle is registered under.
    backend: String,
    /// `Debug` rendering of the handle's current state.
    state: String,
    /// String form of the handle's health status.
    health: String,
}
721
/// OpenAPI document for the admin API, served at `/openapi.json` when the
/// `openapi` feature is enabled.
///
/// Every request/response type of this module that derives `ToSchema` is
/// registered here so the generated document describes all endpoints' bodies.
#[cfg(feature = "openapi")]
#[derive(utoipa::OpenApi)]
#[openapi(
    info(
        title = "mcp-proxy Admin API",
        description = "REST API for managing and monitoring the MCP proxy.",
        version = "0.1.0",
    ),
    components(schemas(
        AdminBackendsResponse,
        ProxyInfo,
        BackendStatus,
        HealthResponse,
        HealthEvent,
        crate::cache::CacheStatsSnapshot,
        SessionsResponse,
        SessionInfoResponse,
        AggregateStats,
        CircuitBreakerStatus,
        AddBackendRequest,
        UpdateBackendRequest,
        BackendOpResponse,
    ))
)]
struct ApiDoc;
745
/// `GET /openapi.json` — serves the generated OpenAPI document as JSON.
#[cfg(feature = "openapi")]
async fn handle_openapi() -> impl IntoResponse {
    let spec = <ApiDoc as utoipa::OpenApi>::openapi();
    axum::Json(spec)
}
750
751#[allow(clippy::too_many_arguments)]
753pub fn admin_router(
754 state: AdminState,
755 metrics_handle: MetricsHandle,
756 session_handle: SessionHandle,
757 cache_handle: Option<crate::cache::CacheHandle>,
758 proxy: McpProxy,
759 config: &crate::config::ProxyConfig,
760 config_path: Option<std::path::PathBuf>,
761 cb_handles: std::collections::HashMap<String, crate::proxy::CbHandle>,
762) -> Router {
763 let config_toml = std::sync::Arc::new(toml::to_string_pretty(config).unwrap_or_default());
764
765 let router = Router::new()
766 .route("/backends", get(handle_backends))
768 .route("/health", get(handle_health))
769 .route("/cache/stats", get(handle_cache_stats))
770 .route("/cache/clear", axum::routing::post(handle_cache_clear))
771 .route("/metrics", get(handle_metrics))
772 .route("/sessions", get(handle_list_sessions))
773 .route("/sessions/detail", get(handle_list_sessions_detail))
774 .route("/sessions/{id}", delete(handle_terminate_session))
775 .route("/stats", get(handle_aggregate_stats))
776 .route("/circuit-breakers", get(handle_circuit_breakers))
777 .route("/config", get(handle_get_config).put(handle_update_config))
778 .route("/config/validate", post(handle_validate_config))
779 .route("/backends/{name}/health", get(handle_single_backend_health))
781 .route(
782 "/backends/{name}/health/history",
783 get(handle_backend_health_history),
784 )
785 .route("/backends/add", post(handle_add_backend))
787 .route(
788 "/backends/{name}",
789 delete(handle_remove_backend).put(handle_update_backend),
790 )
791 .layer(Extension(state))
792 .layer(Extension(session_handle))
793 .layer(Extension(cache_handle))
794 .layer(Extension(proxy))
795 .layer(Extension(config_toml))
796 .layer(Extension(config_path))
797 .layer(Extension(Arc::new(cb_handles)));
798
799 #[cfg(feature = "metrics")]
800 let router = router.layer(Extension(metrics_handle));
801 #[cfg(not(feature = "metrics"))]
802 let _ = metrics_handle;
803
804 #[cfg(feature = "openapi")]
805 let router = router.route("/openapi.json", get(handle_openapi));
806
807 let admin_tokens = resolve_admin_tokens(config);
809 if !admin_tokens.is_empty() {
810 tracing::info!(token_count = admin_tokens.len(), "Admin API auth enabled");
811 let validator = tower_mcp::auth::StaticBearerValidator::new(admin_tokens);
812 let layer = tower_mcp::auth::AuthLayer::new(validator);
813 router.layer(layer)
814 } else {
815 router
816 }
817}
818
819fn resolve_admin_tokens(config: &crate::config::ProxyConfig) -> Vec<String> {
821 if let Some(ref token) = config.security.admin_token {
823 return vec![token.clone()];
824 }
825
826 match &config.auth {
828 Some(crate::config::AuthConfig::Bearer {
829 tokens,
830 scoped_tokens,
831 }) => {
832 let mut all: Vec<String> = tokens.clone();
833 all.extend(scoped_tokens.iter().map(|st| st.token.clone()));
834 all
835 }
836 _ => vec![],
837 }
838}
839
/// Router-level tests for the admin API.
///
/// Router construction and request/body plumbing live in small helpers so each
/// test contains only its scenario and assertions.
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use tower::ServiceExt;

    /// Builds an [`AdminState`] whose backend count equals `statuses.len()`.
    fn make_state(statuses: Vec<BackendStatus>) -> AdminState {
        test_admin_state("test-gw", "1.0.0", statuses.len(), statuses)
    }

    /// A healthy HTTP backend status for `name`.
    fn healthy_backend(name: &str) -> BackendStatus {
        BackendStatus {
            namespace: name.to_string(),
            healthy: true,
            last_checked_at: Some(Utc::now()),
            consecutive_failures: 0,
            error: None,
            transport: Some("http".to_string()),
        }
    }

    /// An unhealthy stdio backend status for `name` with three failures.
    fn unhealthy_backend(name: &str) -> BackendStatus {
        BackendStatus {
            namespace: name.to_string(),
            healthy: false,
            last_checked_at: Some(Utc::now()),
            consecutive_failures: 3,
            error: Some("ping failed".to_string()),
            transport: Some("stdio".to_string()),
        }
    }

    /// Builds a proxy with a single in-process backend exposing a `ping` tool.
    async fn make_test_proxy() -> McpProxy {
        use tower_mcp::client::ChannelTransport;
        use tower_mcp::{CallToolResult, McpRouter, ToolBuilder};

        let router = McpRouter::new().server_info("test", "1.0.0").tool(
            ToolBuilder::new("ping")
                .description("Ping")
                .handler(|_: tower_mcp::NoParams| async move { Ok(CallToolResult::text("pong")) })
                .build(),
        );

        McpProxy::builder("test-proxy", "1.0.0")
            .backend("test", ChannelTransport::new(router))
            .await
            .build_strict()
            .await
            .unwrap()
    }

    /// Minimal config with one stdio backend and no auth.
    fn make_test_config() -> crate::config::ProxyConfig {
        crate::config::ProxyConfig::parse(
            r#"
            [proxy]
            name = "test"
            [proxy.listen]

            [[backends]]
            name = "echo"
            transport = "stdio"
            command = "echo"
            "#,
        )
        .unwrap()
    }

    /// Creates a session handle backed by a service that always answers `Pong`.
    fn make_session_handle() -> SessionHandle {
        let svc = tower::util::BoxCloneService::new(tower::service_fn(
            |_req: tower_mcp::RouterRequest| async {
                Ok::<_, std::convert::Infallible>(tower_mcp::RouterResponse {
                    id: tower_mcp::protocol::RequestId::Number(1),
                    inner: Ok(tower_mcp::protocol::McpResponse::Pong(Default::default())),
                })
            },
        ));
        let (_, handle) =
            tower_mcp::transport::http::HttpTransport::from_service(svc).into_router_with_handle();
        handle
    }

    /// Builds an admin router for `state` using the given session handle and
    /// no metrics, cache, config path, or circuit breakers.
    async fn make_router_with(state: AdminState, session_handle: SessionHandle) -> Router {
        admin_router(
            state,
            None,
            session_handle,
            None,
            make_test_proxy().await,
            &make_test_config(),
            None,
            std::collections::HashMap::new(),
        )
    }

    /// Builds an admin router for `state` with a fresh, empty session handle.
    async fn make_router(state: AdminState) -> Router {
        make_router_with(state, make_session_handle()).await
    }

    /// Sends a body-less request with the given method and returns the response.
    async fn send(router: &Router, method: &str, uri: &str) -> axum::response::Response {
        router
            .clone()
            .oneshot(
                Request::builder()
                    .method(method)
                    .uri(uri)
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap()
    }

    /// Reads the whole response body.
    async fn read_body(resp: axum::response::Response) -> axum::body::Bytes {
        axum::body::to_bytes(resp.into_body(), usize::MAX)
            .await
            .unwrap()
    }

    /// Issues a `GET` and parses the response body as JSON.
    async fn get_json(router: &Router, path: &str) -> serde_json::Value {
        let resp = send(router, "GET", path).await;
        let body = read_body(resp).await;
        serde_json::from_slice(&body).unwrap()
    }

    #[tokio::test]
    async fn test_admin_state_accessors() {
        let state = make_state(vec![healthy_backend("db/")]);
        assert_eq!(state.proxy_name(), "test-gw");
        assert_eq!(state.proxy_version(), "1.0.0");
        assert_eq!(state.backend_count(), 1);

        let health = state.health().await;
        assert_eq!(health.len(), 1);
        assert!(health[0].healthy);
    }

    #[tokio::test]
    async fn test_health_endpoint_all_healthy() {
        let state = make_state(vec![healthy_backend("db/"), healthy_backend("api/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/health").await;
        assert_eq!(json["status"], "healthy");
        assert!(json["unhealthy_backends"].as_array().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_health_endpoint_degraded() {
        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/health").await;
        assert_eq!(json["status"], "degraded");
        let unhealthy = json["unhealthy_backends"].as_array().unwrap();
        assert_eq!(unhealthy.len(), 1);
        assert_eq!(unhealthy[0], "flaky/");
    }

    #[tokio::test]
    async fn test_backends_endpoint() {
        let state = make_state(vec![healthy_backend("db/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/backends").await;
        assert_eq!(json["proxy"]["name"], "test-gw");
        assert_eq!(json["proxy"]["version"], "1.0.0");
        assert_eq!(json["proxy"]["backend_count"], 1);
        assert_eq!(json["backends"].as_array().unwrap().len(), 1);
        assert_eq!(json["backends"][0]["namespace"], "db/");
        assert!(json["backends"][0]["healthy"].as_bool().unwrap());
    }

    #[tokio::test]
    async fn test_cache_stats_no_cache() {
        let router = make_router(make_state(vec![])).await;

        let json = get_json(&router, "/cache/stats").await;
        assert!(json.as_array().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_cache_clear_no_cache() {
        let router = make_router(make_state(vec![])).await;

        let resp = send(&router, "POST", "/cache/clear").await;
        let body = read_body(resp).await;
        assert_eq!(body.as_ref(), b"no caches configured");
    }

    #[tokio::test]
    async fn test_metrics_endpoint_no_recorder() {
        let router = make_router(make_state(vec![])).await;

        let resp = send(&router, "GET", "/metrics").await;
        let body = read_body(resp).await;
        assert!(body.is_empty());
    }

    #[tokio::test]
    async fn test_single_backend_health() {
        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
        let router = make_router(state).await;

        // The path segment has no trailing slash; the namespace does.
        let json = get_json(&router, "/backends/db/health").await;
        assert_eq!(json["namespace"], "db/");
        assert!(json["healthy"].as_bool().unwrap());
    }

    #[tokio::test]
    async fn test_single_backend_health_not_found() {
        let state = make_state(vec![healthy_backend("db/")]);
        let router = make_router(state).await;

        let resp = send(&router, "GET", "/backends/nonexistent/health").await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn test_aggregate_stats() {
        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/stats").await;
        assert_eq!(json["total_backends"], 2);
        assert_eq!(json["healthy_backends"], 1);
        assert_eq!(json["unhealthy_backends"], 1);
    }

    #[tokio::test]
    async fn test_health_history_empty() {
        let state = make_state(vec![healthy_backend("db/")]);
        let router = make_router(state).await;

        let json = get_json(&router, "/backends/db/health/history").await;
        assert!(json.as_array().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_health_history_with_events() {
        let state = make_state(vec![healthy_backend("db/")]);
        {
            // Seed two events for `db/` and one for an unrelated backend.
            let mut history = state.health_history.write().await;
            history.push(HealthEvent {
                namespace: "db/".to_string(),
                healthy: true,
                timestamp: Utc::now(),
            });
            history.push(HealthEvent {
                namespace: "db/".to_string(),
                healthy: false,
                timestamp: Utc::now(),
            });
            history.push(HealthEvent {
                namespace: "other/".to_string(),
                healthy: false,
                timestamp: Utc::now(),
            });
        }

        let router = make_router(state).await;

        let json = get_json(&router, "/backends/db/health/history").await;
        let events = json.as_array().unwrap();
        assert_eq!(events.len(), 2);
        assert!(events[0]["healthy"].as_bool().unwrap());
        assert!(!events[1]["healthy"].as_bool().unwrap());
    }

    #[tokio::test]
    async fn test_sessions_endpoint() {
        let router = make_router(make_state(vec![])).await;

        let json = get_json(&router, "/sessions").await;
        assert_eq!(json["active_sessions"], 0);
    }

    #[tokio::test]
    async fn test_sessions_detail_empty() {
        let router = make_router(make_state(vec![])).await;

        let json = get_json(&router, "/sessions/detail").await;
        let sessions = json.as_array().unwrap();
        assert!(sessions.is_empty());
    }

    #[tokio::test]
    async fn test_terminate_session_not_found() {
        let router = make_router(make_state(vec![])).await;

        let resp = send(&router, "DELETE", "/sessions/nonexistent-id").await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);

        let body = read_body(resp).await;
        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
        assert!(!json["ok"].as_bool().unwrap());
        assert!(json["message"].as_str().unwrap().contains("not found"));
    }

    #[tokio::test]
    async fn test_sessions_detail_with_active_session() {
        let svc = tower::util::BoxCloneService::new(tower::service_fn(
            |_req: tower_mcp::RouterRequest| async {
                Ok::<_, std::convert::Infallible>(tower_mcp::RouterResponse {
                    id: tower_mcp::protocol::RequestId::Number(1),
                    inner: Ok(tower_mcp::protocol::McpResponse::Initialize(
                        tower_mcp::protocol::InitializeResult {
                            protocol_version: "2025-03-26".to_string(),
                            server_info: tower_mcp::protocol::Implementation {
                                name: "test".to_string(),
                                version: "1.0.0".to_string(),
                                ..Default::default()
                            },
                            capabilities: Default::default(),
                            instructions: None,
                            meta: None,
                        },
                    )),
                })
            },
        ));

        let (http_router, session_handle) =
            tower_mcp::transport::http::HttpTransport::from_service(svc).into_router_with_handle();

        // Create a session by sending an `initialize` request to the transport.
        let init_body = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2025-03-26",
                "clientInfo": { "name": "test", "version": "1.0.0" },
                "capabilities": {}
            }
        });

        let resp = http_router
            .clone()
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/")
                    .header("Content-Type", "application/json")
                    .body(Body::from(serde_json::to_string(&init_body).unwrap()))
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resp.status(), 200);

        let session_id = resp
            .headers()
            .get("mcp-session-id")
            .expect("initialize response should include mcp-session-id header")
            .to_str()
            .unwrap()
            .to_string();

        // The admin router shares the session handle with the transport.
        let admin = make_router_with(make_state(vec![]), session_handle.clone()).await;

        let json = get_json(&admin, "/sessions").await;
        assert_eq!(json["active_sessions"], 1);

        let json = get_json(&admin, "/sessions/detail").await;
        let sessions = json.as_array().unwrap();
        assert_eq!(sessions.len(), 1);
        assert!(!sessions[0]["id"].as_str().unwrap().is_empty());
        assert!(sessions[0]["uptime_seconds"].is_number());
        assert!(sessions[0]["idle_seconds"].is_number());

        // Terminating the session succeeds once...
        let session_uri = format!("/sessions/{}", session_id);
        let resp = send(&admin, "DELETE", &session_uri).await;
        assert_eq!(resp.status(), StatusCode::OK);

        let body = read_body(resp).await;
        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
        assert!(json["ok"].as_bool().unwrap());

        let json = get_json(&admin, "/sessions").await;
        assert_eq!(json["active_sessions"], 0);

        // ...and a second attempt reports that it no longer exists.
        let resp = send(&admin, "DELETE", &session_uri).await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }
}
1429}