Skip to main content

mcp_proxy/
admin.rs

1//! Admin HTTP API for proxy introspection and management.
2//!
3//! The admin API runs on the same HTTP server as the MCP endpoint and is
4//! mounted under the `/admin` path prefix. It provides read-only
5//! introspection endpoints and management operations for backends, sessions,
6//! caching, and configuration.
7//!
8//! # Endpoint reference
9//!
10//! | Method | Path | Description |
11//! |--------|------|-------------|
12//! | `GET` | `/admin/health` | Aggregated health status of all backends |
13//! | `GET` | `/admin/backends` | List all backends with metadata |
14//! | `GET` | `/admin/backends/{name}/health` | Health status for a single backend |
15//! | `GET` | `/admin/backends/{name}/health/history` | Health state transition history for a backend |
16//! | `POST` | `/admin/backends/add` | Add a new backend at runtime |
17//! | `PUT` | `/admin/backends/{name}` | Update an existing backend's configuration |
18//! | `DELETE` | `/admin/backends/{name}` | Remove a backend at runtime |
19//! | `GET` | `/admin/sessions` | List active MCP sessions (summary) |
20//! | `GET` | `/admin/sessions/detail` | List active MCP sessions (full detail) |
21//! | `DELETE` | `/admin/sessions/{id}` | Terminate a session by ID |
//! | `GET` | `/admin/stats` | Aggregated proxy statistics |
//! | `GET` | `/admin/circuit-breakers` | Circuit breaker state and health per backend |
23//! | `GET` | `/admin/cache/stats` | Cache hit/miss statistics |
24//! | `POST` | `/admin/cache/clear` | Flush the response cache |
25//! | `GET` | `/admin/config` | Current proxy configuration (sanitized) |
26//! | `PUT` | `/admin/config` | Hot-reload proxy configuration |
27//! | `POST` | `/admin/config/validate` | Validate a config without applying it |
28//! | `GET` | `/admin/metrics` | Prometheus metrics scrape endpoint (feature `metrics`) |
29//! | `GET` | `/admin/openapi.json` | OpenAPI spec (feature `openapi`) |
30//!
31//! # Health checking
32//!
33//! Backend health is monitored by a background task ([`spawn_health_checker`])
34//! that periodically sends `ping` requests to each backend and records
35//! pass/fail status. The cached results are stored in [`AdminState`] and
36//! served by the `/admin/health` endpoint without per-request latency.
37//!
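//! Health state transitions (healthy -> unhealthy or vice versa) are recorded
//! as [`HealthEvent`] entries. Each backend's initial state is also recorded the
//! first time it is checked. Only the most recent 100 events across all backends
//! are kept, and the oldest are discarded first. The per-backend history is
//! available at `/admin/backends/{name}/health/history`.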
41//!
42//! # Session management
43//!
//! The proxy tracks active MCP sessions (SSE and Streamable HTTP transports).
//! `/admin/sessions` reports the number of active sessions.
//! `/admin/sessions/detail` lists each session's ID, uptime and idle time.
//! `DELETE /admin/sessions/{id}` terminates a session by closing its transport.
48//!
49//! # Cache management
50//!
51//! When response caching is enabled, `/admin/cache/stats` reports hit/miss
52//! counts and `/admin/cache/clear` flushes all cached entries.
53//!
54//! # Authentication
55//!
56//! Admin endpoints are protected by a token-based auth scheme. The proxy
57//! resolves the admin token using a fallback chain:
58//!
59//! 1. `security.admin_token` -- explicit bearer token for admin access.
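//! 2. The proxy's inbound auth config, if it uses bearer tokens. Its `tokens`
//!    and `scoped_tokens` are accepted for admin access. Other inbound auth
//!    types are not reused.
//! 3. If neither yields a token, the admin API is open (suitable for local/dev use).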
62//!
63//! The token supports `${ENV_VAR}` syntax for environment variable expansion.
64
65use std::collections::HashMap;
66use std::sync::Arc;
67use std::time::Duration;
68
69use axum::Router;
70use axum::extract::{Extension, Path};
71use axum::http::StatusCode;
72use axum::response::{IntoResponse, Json};
73use axum::routing::{delete, get, post};
74use chrono::{DateTime, Utc};
75use serde::{Deserialize, Serialize};
76use tokio::sync::RwLock;
77use tower_mcp::SessionHandle;
78use tower_mcp::proxy::McpProxy;
79
/// Shared state for the admin API.
///
/// Holds two values written by the background task started in
/// [`spawn_health_checker`]:
/// - the cached backend health snapshot;
/// - the health transition history.
///
/// It also holds proxy metadata captured at construction. Cloning is cheap
/// because the mutable parts sit behind `Arc<RwLock<_>>`.
#[derive(Clone)]
pub struct AdminState {
    /// Latest health snapshot, replaced wholesale on every check cycle.
    health: Arc<RwLock<Vec<BackendStatus>>>,
    /// Health transitions, oldest first, capped at `MAX_HEALTH_HISTORY`.
    health_history: Arc<RwLock<Vec<HealthEvent>>>,
    /// Proxy name supplied at construction.
    proxy_name: String,
    /// Proxy version supplied at construction.
    proxy_version: String,
    /// Backend count supplied at construction. Runtime add/remove via the
    /// admin API does not update it.
    backend_count: usize,
}
89
/// A health state transition event recorded by the background health checker.
///
/// An event is also recorded the first time a backend namespace is observed,
/// so each backend's history starts with its initial state.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct HealthEvent {
    /// Backend namespace.
    pub namespace: String,
    /// Health state the backend transitioned into.
    pub healthy: bool,
    /// Time of the health check round that observed the transition.
    pub timestamp: DateTime<Utc>,
}
101
/// Maximum number of health history events retained across *all* backends.
/// The oldest events are dropped first once the cap is exceeded.
const MAX_HEALTH_HISTORY: usize = 100;
104
105impl AdminState {
106    /// Get a snapshot of backend health status.
107    pub async fn health(&self) -> Vec<BackendStatus> {
108        self.health.read().await.clone()
109    }
110
111    /// Proxy name from config.
112    pub fn proxy_name(&self) -> &str {
113        &self.proxy_name
114    }
115
116    /// Proxy version from config.
117    pub fn proxy_version(&self) -> &str {
118        &self.proxy_version
119    }
120
121    /// Number of configured backends.
122    pub fn backend_count(&self) -> usize {
123        self.backend_count
124    }
125}
126
/// Health status of a single backend, updated by the background health checker.
///
/// Serialized as-is in the `/admin/backends` and `/admin/backends/{name}/health`
/// responses.
#[derive(Serialize, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct BackendStatus {
    /// Backend namespace (e.g. "db/").
    pub namespace: String,
    /// Whether the backend responded to the last health check.
    pub healthy: bool,
    /// Timestamp of the last health check.
    pub last_checked_at: Option<DateTime<Utc>>,
    /// Number of consecutive failed health checks; reset to 0 on success.
    pub consecutive_failures: u32,
    /// Last error message from a failed health check. The background checker
    /// currently only reports a generic "ping failed".
    pub error: Option<String>,
    /// Transport type (e.g. "stdio", "http"), taken from [`BackendMeta`].
    /// `None` when no metadata was supplied for this namespace.
    pub transport: Option<String>,
}
144
/// Response body for `GET /admin/backends`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AdminBackendsResponse {
    /// Proxy-level metadata.
    proxy: ProxyInfo,
    /// Cached health of every backend from the last check round.
    backends: Vec<BackendStatus>,
}
151
/// Proxy-level metadata included in the `GET /admin/backends` response.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct ProxyInfo {
    /// Proxy name from [`AdminState`].
    name: String,
    /// Proxy version from [`AdminState`].
    version: String,
    /// Backend count recorded in [`AdminState`] at construction.
    backend_count: usize,
    /// Number of active MCP sessions at request time.
    active_sessions: usize,
}
160
/// Per-backend metadata passed in from config at startup.
///
/// [`spawn_health_checker`] looks entries up by backend namespace to fill in
/// [`BackendStatus::transport`].
#[derive(Clone)]
pub struct BackendMeta {
    /// Transport type string (e.g. "stdio", "http").
    pub transport: String,
}
167
168/// Spawn a background task that periodically health-checks backends.
169/// Returns the AdminState that admin endpoints read from.
170pub fn spawn_health_checker(
171    proxy: McpProxy,
172    proxy_name: String,
173    proxy_version: String,
174    backend_count: usize,
175    backend_meta: HashMap<String, BackendMeta>,
176) -> AdminState {
177    let health: Arc<RwLock<Vec<BackendStatus>>> = Arc::new(RwLock::new(Vec::new()));
178    let health_writer = Arc::clone(&health);
179    let health_history: Arc<RwLock<Vec<HealthEvent>>> = Arc::new(RwLock::new(Vec::new()));
180    let history_writer = Arc::clone(&health_history);
181
182    std::thread::spawn(move || {
183        let rt = tokio::runtime::Builder::new_current_thread()
184            .enable_all()
185            .build()
186            .expect("admin health check runtime");
187
188        let mut failure_counts: HashMap<String, u32> = HashMap::new();
189        // Track previous health state for transition detection
190        let mut prev_healthy: HashMap<String, bool> = HashMap::new();
191
192        rt.block_on(async move {
193            loop {
194                let results = proxy.health_check().await;
195                let now = Utc::now();
196                let mut transitions = Vec::new();
197
198                let statuses: Vec<BackendStatus> = results
199                    .into_iter()
200                    .map(|h| {
201                        let count = failure_counts.entry(h.namespace.clone()).or_insert(0);
202                        if h.healthy {
203                            *count = 0;
204                        } else {
205                            *count += 1;
206                        }
207
208                        // Detect health state transitions
209                        let prev = prev_healthy.get(&h.namespace).copied();
210                        if prev != Some(h.healthy) {
211                            transitions.push(HealthEvent {
212                                namespace: h.namespace.clone(),
213                                healthy: h.healthy,
214                                timestamp: now,
215                            });
216                            prev_healthy.insert(h.namespace.clone(), h.healthy);
217                        }
218
219                        let meta = backend_meta.get(&h.namespace);
220                        BackendStatus {
221                            namespace: h.namespace,
222                            healthy: h.healthy,
223                            last_checked_at: Some(now),
224                            consecutive_failures: *count,
225                            error: if h.healthy {
226                                None
227                            } else {
228                                Some("ping failed".to_string())
229                            },
230                            transport: meta.map(|m| m.transport.clone()),
231                        }
232                    })
233                    .collect();
234
235                *health_writer.write().await = statuses;
236
237                // Record transitions to history
238                if !transitions.is_empty() {
239                    let mut history = history_writer.write().await;
240                    history.extend(transitions);
241                    // Trim to max size
242                    if history.len() > MAX_HEALTH_HISTORY {
243                        let excess = history.len() - MAX_HEALTH_HISTORY;
244                        history.drain(..excess);
245                    }
246                }
247
248                tokio::time::sleep(Duration::from_secs(10)).await;
249            }
250        });
251    });
252
253    AdminState {
254        health,
255        health_history,
256        proxy_name,
257        proxy_version,
258        backend_count,
259    }
260}
261
262async fn handle_backends(
263    Extension(state): Extension<AdminState>,
264    Extension(session_handle): Extension<SessionHandle>,
265) -> Json<AdminBackendsResponse> {
266    let backends = state.health.read().await.clone();
267    let active_sessions = session_handle.session_count().await;
268
269    Json(AdminBackendsResponse {
270        proxy: ProxyInfo {
271            name: state.proxy_name,
272            version: state.proxy_version,
273            backend_count: state.backend_count,
274            active_sessions,
275        },
276        backends,
277    })
278}
279
280async fn handle_health(Extension(state): Extension<AdminState>) -> Json<HealthResponse> {
281    let backends = state.health.read().await;
282    let all_healthy = backends.iter().all(|b| b.healthy);
283    let unhealthy: Vec<String> = backends
284        .iter()
285        .filter(|b| !b.healthy)
286        .map(|b| b.namespace.clone())
287        .collect();
288    Json(HealthResponse {
289        status: if all_healthy { "healthy" } else { "degraded" }.to_string(),
290        unhealthy_backends: unhealthy,
291    })
292}
293
/// Response body for `GET /admin/health`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct HealthResponse {
    /// `"healthy"` when no backend failed its last check, otherwise `"degraded"`.
    status: String,
    /// Namespaces of backends that failed their last check.
    unhealthy_backends: Vec<String>,
}
300
301#[cfg(feature = "metrics")]
302async fn handle_metrics(
303    Extension(handle): Extension<Option<metrics_exporter_prometheus::PrometheusHandle>>,
304) -> impl IntoResponse {
305    match handle {
306        Some(h) => h.render(),
307        None => String::new(),
308    }
309}
310
311#[cfg(not(feature = "metrics"))]
312async fn handle_metrics() -> impl IntoResponse {
313    String::new()
314}
315
316async fn handle_cache_stats(
317    Extension(cache_handle): Extension<Option<crate::cache::CacheHandle>>,
318) -> Json<Vec<crate::cache::CacheStatsSnapshot>> {
319    match cache_handle {
320        Some(h) => Json(h.stats().await),
321        None => Json(vec![]),
322    }
323}
324
325async fn handle_cache_clear(
326    Extension(cache_handle): Extension<Option<crate::cache::CacheHandle>>,
327) -> &'static str {
328    if let Some(h) = cache_handle {
329        h.clear().await;
330        "caches cleared"
331    } else {
332        "no caches configured"
333    }
334}
335
/// Build an `AdminState` with a fixed health snapshot and empty history, for tests.
#[cfg(test)]
pub(crate) fn test_admin_state(
    proxy_name: &str,
    proxy_version: &str,
    backend_count: usize,
    statuses: Vec<BackendStatus>,
) -> AdminState {
    let health = Arc::new(RwLock::new(statuses));
    let health_history = Arc::new(RwLock::new(Vec::new()));
    AdminState {
        health,
        health_history,
        proxy_name: proxy_name.to_owned(),
        proxy_version: proxy_version.to_owned(),
        backend_count,
    }
}
352
/// Metrics handle type -- wraps the Prometheus handle when the feature is enabled.
///
/// `admin_router` installs it as an `Extension` for `GET /admin/metrics`.
/// A `None` value makes that endpoint return an empty body.
#[cfg(feature = "metrics")]
pub type MetricsHandle = Option<metrics_exporter_prometheus::PrometheusHandle>;
/// Metrics handle type -- no-op when the metrics feature is disabled.
///
/// `admin_router` still accepts it, so its signature is the same in both
/// builds, but ignores the value.
#[cfg(not(feature = "metrics"))]
pub type MetricsHandle = Option<()>;
359
360// ---------------------------------------------------------------------------
361// Management API handlers
362// ---------------------------------------------------------------------------
363
/// Request body for `POST /admin/backends/add`, which adds an HTTP backend.
///
/// NOTE(review): the derived `Debug` prints `bearer_token` in plain text, so
/// avoid logging this struct with `{:?}`.
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AddBackendRequest {
    /// Backend name (becomes the namespace prefix).
    name: String,
    /// Backend URL.
    url: String,
    /// Optional bearer token the proxy sends to the backend.
    bearer_token: Option<String>,
}
375
/// Generic result body for management operations: backend add/update/remove,
/// session termination, and config validation and update.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct BackendOpResponse {
    /// `true` if the operation succeeded.
    ok: bool,
    /// Human-readable outcome or error description.
    message: String,
}
383
384async fn handle_add_backend(
385    Extension(proxy): Extension<McpProxy>,
386    Json(req): Json<AddBackendRequest>,
387) -> (StatusCode, Json<BackendOpResponse>) {
388    let mut transport = tower_mcp::client::HttpClientTransport::new(&req.url);
389    if let Some(token) = &req.bearer_token {
390        transport = transport.bearer_token(token);
391    }
392
393    match proxy.add_backend(&req.name, transport).await {
394        Ok(()) => (
395            StatusCode::CREATED,
396            Json(BackendOpResponse {
397                ok: true,
398                message: format!("Backend '{}' added", req.name),
399            }),
400        ),
401        Err(e) => (
402            StatusCode::CONFLICT,
403            Json(BackendOpResponse {
404                ok: false,
405                message: format!("Failed to add backend: {e}"),
406            }),
407        ),
408    }
409}
410
411async fn handle_remove_backend(
412    Extension(proxy): Extension<McpProxy>,
413    Path(name): Path<String>,
414) -> (StatusCode, Json<BackendOpResponse>) {
415    if proxy.remove_backend(&name).await {
416        (
417            StatusCode::OK,
418            Json(BackendOpResponse {
419                ok: true,
420                message: format!("Backend '{}' removed", name),
421            }),
422        )
423    } else {
424        (
425            StatusCode::NOT_FOUND,
426            Json(BackendOpResponse {
427                ok: false,
428                message: format!("Backend '{}' not found", name),
429            }),
430        )
431    }
432}
433
434async fn handle_get_config(
435    Extension(config_toml): Extension<std::sync::Arc<String>>,
436) -> impl IntoResponse {
437    config_toml.as_str().to_string()
438}
439
440async fn handle_validate_config(body: String) -> (StatusCode, Json<BackendOpResponse>) {
441    match crate::config::ProxyConfig::parse(&body) {
442        Ok(config) => (
443            StatusCode::OK,
444            Json(BackendOpResponse {
445                ok: true,
446                message: format!("Valid config with {} backends", config.backends.len()),
447            }),
448        ),
449        Err(e) => (
450            StatusCode::BAD_REQUEST,
451            Json(BackendOpResponse {
452                ok: false,
453                message: format!("Invalid config: {e}"),
454            }),
455        ),
456    }
457}
458
459async fn handle_list_sessions(
460    Extension(session_handle): Extension<SessionHandle>,
461) -> Json<SessionsResponse> {
462    Json(SessionsResponse {
463        active_sessions: session_handle.session_count().await,
464    })
465}
466
/// Response body for `GET /admin/sessions`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct SessionsResponse {
    /// Number of active MCP sessions.
    active_sessions: usize,
}
472
473async fn handle_backend_health_history(
474    Extension(state): Extension<AdminState>,
475    Path(name): Path<String>,
476) -> Json<Vec<HealthEvent>> {
477    let history = state.health_history.read().await;
478    let filtered: Vec<HealthEvent> = history
479        .iter()
480        .filter(|e| {
481            e.namespace == name
482                || e.namespace == format!("{name}/")
483                || e.namespace.trim_end_matches('/') == name
484        })
485        .cloned()
486        .collect();
487    Json(filtered)
488}
489
490async fn handle_update_backend(
491    Extension(proxy): Extension<McpProxy>,
492    Path(name): Path<String>,
493    Json(req): Json<UpdateBackendRequest>,
494) -> (StatusCode, Json<BackendOpResponse>) {
495    // Remove existing backend
496    if !proxy.remove_backend(&name).await {
497        return (
498            StatusCode::NOT_FOUND,
499            Json(BackendOpResponse {
500                ok: false,
501                message: format!("Backend '{name}' not found"),
502            }),
503        );
504    }
505
506    // Re-add with new config
507    let mut transport = tower_mcp::client::HttpClientTransport::new(&req.url);
508    if let Some(token) = &req.bearer_token {
509        transport = transport.bearer_token(token);
510    }
511
512    match proxy.add_backend(&name, transport).await {
513        Ok(()) => (
514            StatusCode::OK,
515            Json(BackendOpResponse {
516                ok: true,
517                message: format!("Backend '{name}' updated"),
518            }),
519        ),
520        Err(e) => (
521            StatusCode::INTERNAL_SERVER_ERROR,
522            Json(BackendOpResponse {
523                ok: false,
524                message: format!("Failed to re-add backend after removal: {e}"),
525            }),
526        ),
527    }
528}
529
/// Request body for `PUT /admin/backends/{name}`.
///
/// NOTE(review): the derived `Debug` prints `bearer_token` in plain text, so
/// avoid logging this struct with `{:?}`.
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct UpdateBackendRequest {
    /// New backend URL.
    url: String,
    /// Optional bearer token for the backend.
    bearer_token: Option<String>,
}
538
539async fn handle_single_backend_health(
540    Extension(state): Extension<AdminState>,
541    Path(name): Path<String>,
542) -> Result<Json<BackendStatus>, StatusCode> {
543    let backends = state.health.read().await;
544    // Match by namespace (with or without trailing separator)
545    backends
546        .iter()
547        .find(|b| {
548            b.namespace == name
549                || b.namespace == format!("{name}/")
550                || b.namespace.trim_end_matches('/') == name
551        })
552        .cloned()
553        .map(Json)
554        .ok_or(StatusCode::NOT_FOUND)
555}
556
557async fn handle_aggregate_stats(
558    Extension(state): Extension<AdminState>,
559    Extension(session_handle): Extension<SessionHandle>,
560) -> Json<AggregateStats> {
561    let backends = state.health.read().await;
562    let total = backends.len();
563    let healthy = backends.iter().filter(|b| b.healthy).count();
564    let active_sessions = session_handle.session_count().await;
565    Json(AggregateStats {
566        total_backends: total,
567        healthy_backends: healthy,
568        unhealthy_backends: total - healthy,
569        active_sessions,
570    })
571}
572
/// Response body for `GET /admin/stats`.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct AggregateStats {
    /// Backends present in the last health check snapshot.
    total_backends: usize,
    /// Backends that passed their last check.
    healthy_backends: usize,
    /// Backends that failed their last check (`total - healthy`).
    unhealthy_backends: usize,
    /// Number of active MCP sessions.
    active_sessions: usize,
}
581
582async fn handle_list_sessions_detail(
583    Extension(session_handle): Extension<SessionHandle>,
584) -> Json<Vec<SessionInfoResponse>> {
585    let sessions = session_handle.list_sessions().await;
586    Json(
587        sessions
588            .into_iter()
589            .map(|s| SessionInfoResponse {
590                id: s.id,
591                uptime_seconds: s.created_at.as_secs(),
592                idle_seconds: s.last_activity.as_secs(),
593            })
594            .collect(),
595    )
596}
597
/// One session entry in the `GET /admin/sessions/detail` response.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct SessionInfoResponse {
    /// Session ID, usable with `DELETE /admin/sessions/{id}`.
    id: String,
    /// Whole seconds derived from the session's `created_at`.
    uptime_seconds: u64,
    /// Whole seconds derived from the session's `last_activity`.
    idle_seconds: u64,
}
605
606async fn handle_terminate_session(
607    Extension(session_handle): Extension<SessionHandle>,
608    Path(id): Path<String>,
609) -> (StatusCode, Json<BackendOpResponse>) {
610    if session_handle.terminate_session(&id).await {
611        (
612            StatusCode::OK,
613            Json(BackendOpResponse {
614                ok: true,
615                message: format!("Session '{id}' terminated"),
616            }),
617        )
618    } else {
619        (
620            StatusCode::NOT_FOUND,
621            Json(BackendOpResponse {
622                ok: false,
623                message: format!("Session '{id}' not found"),
624            }),
625        )
626    }
627}
628
629async fn handle_update_config(
630    Extension(config_path): Extension<Option<std::path::PathBuf>>,
631    body: String,
632) -> (StatusCode, Json<BackendOpResponse>) {
633    // Detect format from config file extension (or try TOML then YAML)
634    let is_yaml = config_path
635        .as_ref()
636        .and_then(|p| p.extension())
637        .is_some_and(|ext| ext == "yaml" || ext == "yml");
638
639    let config = if is_yaml {
640        #[cfg(feature = "yaml")]
641        {
642            crate::config::ProxyConfig::parse_yaml(&body)
643        }
644        #[cfg(not(feature = "yaml"))]
645        {
646            Err(anyhow::anyhow!("YAML support requires the 'yaml' feature"))
647        }
648    } else {
649        crate::config::ProxyConfig::parse(&body)
650    };
651
652    let config = match config {
653        Ok(c) => c,
654        Err(e) => {
655            return (
656                StatusCode::BAD_REQUEST,
657                Json(BackendOpResponse {
658                    ok: false,
659                    message: format!("Invalid config: {e}"),
660                }),
661            );
662        }
663    };
664
665    // Write to disk if we have a config path (triggers hot reload if enabled)
666    let Some(path) = config_path else {
667        return (
668            StatusCode::BAD_REQUEST,
669            Json(BackendOpResponse {
670                ok: false,
671                message: "No config file path available (running in --from-mcp-json mode?)"
672                    .to_string(),
673            }),
674        );
675    };
676
677    if let Err(e) = std::fs::write(&path, &body) {
678        return (
679            StatusCode::INTERNAL_SERVER_ERROR,
680            Json(BackendOpResponse {
681                ok: false,
682                message: format!("Failed to write config: {e}"),
683            }),
684        );
685    }
686
687    (
688        StatusCode::OK,
689        Json(BackendOpResponse {
690            ok: true,
691            message: format!(
692                "Config updated ({} backends). Hot reload will apply changes if enabled.",
693                config.backends.len()
694            ),
695        }),
696    )
697}
698
699async fn handle_circuit_breakers(
700    Extension(handles): Extension<Arc<std::collections::HashMap<String, crate::proxy::CbHandle>>>,
701) -> Json<Vec<CircuitBreakerStatus>> {
702    let mut statuses = Vec::new();
703    for (name, handle) in handles.iter() {
704        statuses.push(CircuitBreakerStatus {
705            backend: name.clone(),
706            state: format!("{:?}", handle.state()),
707            health: handle.health_status().to_string(),
708        });
709    }
710    statuses.sort_by(|a, b| a.backend.cmp(&b.backend));
711    Json(statuses)
712}
713
/// One entry in the `GET /admin/circuit-breakers` response.
#[derive(Serialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
struct CircuitBreakerStatus {
    /// Backend name, as keyed in the circuit-breaker handle map.
    backend: String,
    /// `Debug` rendering of the breaker's current state.
    state: String,
    /// `Display` rendering of the breaker's health status.
    health: String,
}
721
/// OpenAPI spec for the admin API.
///
/// Available at `GET /admin/openapi.json` when the `openapi` feature is enabled.
/// The spec registers every request and response schema used by the admin
/// handlers. No `paths` are declared because the handlers carry no
/// `#[utoipa::path]` annotations.
#[cfg(feature = "openapi")]
#[derive(utoipa::OpenApi)]
#[openapi(
    info(
        title = "mcp-proxy Admin API",
        description = "REST API for managing and monitoring the MCP proxy.",
        version = "0.1.0",
    ),
    components(schemas(
        AdminBackendsResponse,
        ProxyInfo,
        BackendStatus,
        HealthResponse,
        HealthEvent,
        crate::cache::CacheStatsSnapshot,
        SessionsResponse,
        SessionInfoResponse,
        AggregateStats,
        AddBackendRequest,
        UpdateBackendRequest,
        BackendOpResponse,
        CircuitBreakerStatus,
    ))
)]
struct ApiDoc;
745
/// `GET /admin/openapi.json`: serves the generated admin API specification as JSON.
#[cfg(feature = "openapi")]
async fn handle_openapi() -> impl IntoResponse {
    let spec = <ApiDoc as utoipa::OpenApi>::openapi();
    Json(spec)
}
750
751/// Build the admin API router.
752#[allow(clippy::too_many_arguments)]
753pub fn admin_router(
754    state: AdminState,
755    metrics_handle: MetricsHandle,
756    session_handle: SessionHandle,
757    cache_handle: Option<crate::cache::CacheHandle>,
758    proxy: McpProxy,
759    config: &crate::config::ProxyConfig,
760    config_path: Option<std::path::PathBuf>,
761    cb_handles: std::collections::HashMap<String, crate::proxy::CbHandle>,
762) -> Router {
763    let config_toml = std::sync::Arc::new(toml::to_string_pretty(config).unwrap_or_default());
764
765    let router = Router::new()
766        // Read-only endpoints
767        .route("/backends", get(handle_backends))
768        .route("/health", get(handle_health))
769        .route("/cache/stats", get(handle_cache_stats))
770        .route("/cache/clear", axum::routing::post(handle_cache_clear))
771        .route("/metrics", get(handle_metrics))
772        .route("/sessions", get(handle_list_sessions))
773        .route("/sessions/detail", get(handle_list_sessions_detail))
774        .route("/sessions/{id}", delete(handle_terminate_session))
775        .route("/stats", get(handle_aggregate_stats))
776        .route("/circuit-breakers", get(handle_circuit_breakers))
777        .route("/config", get(handle_get_config).put(handle_update_config))
778        .route("/config/validate", post(handle_validate_config))
779        // Per-backend endpoints
780        .route("/backends/{name}/health", get(handle_single_backend_health))
781        .route(
782            "/backends/{name}/health/history",
783            get(handle_backend_health_history),
784        )
785        // Management endpoints
786        .route("/backends/add", post(handle_add_backend))
787        .route(
788            "/backends/{name}",
789            delete(handle_remove_backend).put(handle_update_backend),
790        )
791        .layer(Extension(state))
792        .layer(Extension(session_handle))
793        .layer(Extension(cache_handle))
794        .layer(Extension(proxy))
795        .layer(Extension(config_toml))
796        .layer(Extension(config_path))
797        .layer(Extension(Arc::new(cb_handles)));
798
799    #[cfg(feature = "metrics")]
800    let router = router.layer(Extension(metrics_handle));
801    #[cfg(not(feature = "metrics"))]
802    let _ = metrics_handle;
803
804    #[cfg(feature = "openapi")]
805    let router = router.route("/openapi.json", get(handle_openapi));
806
807    // Admin API auth: admin_token takes priority, then fall back to proxy bearer tokens
808    let admin_tokens = resolve_admin_tokens(config);
809    if !admin_tokens.is_empty() {
810        tracing::info!(token_count = admin_tokens.len(), "Admin API auth enabled");
811        let validator = tower_mcp::auth::StaticBearerValidator::new(admin_tokens);
812        let layer = tower_mcp::auth::AuthLayer::new(validator);
813        router.layer(layer)
814    } else {
815        router
816    }
817}
818
819/// Resolve admin auth tokens: admin_token if set, otherwise proxy bearer tokens.
820fn resolve_admin_tokens(config: &crate::config::ProxyConfig) -> Vec<String> {
821    // Explicit admin token takes priority
822    if let Some(ref token) = config.security.admin_token {
823        return vec![token.clone()];
824    }
825
826    // Fall back to proxy bearer tokens
827    match &config.auth {
828        Some(crate::config::AuthConfig::Bearer {
829            tokens,
830            scoped_tokens,
831        }) => {
832            let mut all: Vec<String> = tokens.clone();
833            all.extend(scoped_tokens.iter().map(|st| st.token.clone()));
834            all
835        }
836        _ => vec![],
837    }
838}
839
840#[cfg(test)]
841mod tests {
842    use super::*;
843    use axum::body::Body;
844    use axum::http::Request;
845    use tower::ServiceExt;
846
847    fn make_state(statuses: Vec<BackendStatus>) -> AdminState {
848        test_admin_state("test-gw", "1.0.0", statuses.len(), statuses)
849    }
850
851    fn healthy_backend(name: &str) -> BackendStatus {
852        BackendStatus {
853            namespace: name.to_string(),
854            healthy: true,
855            last_checked_at: Some(Utc::now()),
856            consecutive_failures: 0,
857            error: None,
858            transport: Some("http".to_string()),
859        }
860    }
861
862    fn unhealthy_backend(name: &str) -> BackendStatus {
863        BackendStatus {
864            namespace: name.to_string(),
865            healthy: false,
866            last_checked_at: Some(Utc::now()),
867            consecutive_failures: 3,
868            error: Some("ping failed".to_string()),
869            transport: Some("stdio".to_string()),
870        }
871    }
872
873    async fn make_test_proxy() -> McpProxy {
874        use tower_mcp::client::ChannelTransport;
875        use tower_mcp::{CallToolResult, McpRouter, ToolBuilder};
876
877        let router = McpRouter::new().server_info("test", "1.0.0").tool(
878            ToolBuilder::new("ping")
879                .description("Ping")
880                .handler(|_: tower_mcp::NoParams| async move { Ok(CallToolResult::text("pong")) })
881                .build(),
882        );
883
884        McpProxy::builder("test-proxy", "1.0.0")
885            .backend("test", ChannelTransport::new(router))
886            .await
887            .build_strict()
888            .await
889            .unwrap()
890    }
891
892    fn make_test_config() -> crate::config::ProxyConfig {
893        crate::config::ProxyConfig::parse(
894            r#"
895            [proxy]
896            name = "test"
897            [proxy.listen]
898
899            [[backends]]
900            name = "echo"
901            transport = "stdio"
902            command = "echo"
903            "#,
904        )
905        .unwrap()
906    }
907
908    fn make_session_handle() -> SessionHandle {
909        // Create a session handle via HttpTransport (the only public way)
910        let svc = tower::util::BoxCloneService::new(tower::service_fn(
911            |_req: tower_mcp::RouterRequest| async {
912                Ok::<_, std::convert::Infallible>(tower_mcp::RouterResponse {
913                    id: tower_mcp::protocol::RequestId::Number(1),
914                    inner: Ok(tower_mcp::protocol::McpResponse::Pong(Default::default())),
915                })
916            },
917        ));
918        let (_, handle) =
919            tower_mcp::transport::http::HttpTransport::from_service(svc).into_router_with_handle();
920        handle
921    }
922
923    async fn get_json(router: &Router, path: &str) -> serde_json::Value {
924        let resp = router
925            .clone()
926            .oneshot(Request::builder().uri(path).body(Body::empty()).unwrap())
927            .await
928            .unwrap();
929
930        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
931            .await
932            .unwrap();
933        serde_json::from_slice(&body).unwrap()
934    }
935
936    #[tokio::test]
937    async fn test_admin_state_accessors() {
938        let state = make_state(vec![healthy_backend("db/")]);
939        assert_eq!(state.proxy_name(), "test-gw");
940        assert_eq!(state.proxy_version(), "1.0.0");
941        assert_eq!(state.backend_count(), 1);
942
943        let health = state.health().await;
944        assert_eq!(health.len(), 1);
945        assert!(health[0].healthy);
946    }
947
948    #[tokio::test]
949    async fn test_health_endpoint_all_healthy() {
950        let state = make_state(vec![healthy_backend("db/"), healthy_backend("api/")]);
951        let session_handle = make_session_handle();
952        let router = admin_router(
953            state,
954            None,
955            session_handle,
956            None,
957            make_test_proxy().await,
958            &make_test_config(),
959            None,
960            std::collections::HashMap::new(),
961        );
962
963        let json = get_json(&router, "/health").await;
964        assert_eq!(json["status"], "healthy");
965        assert!(json["unhealthy_backends"].as_array().unwrap().is_empty());
966    }
967
968    #[tokio::test]
969    async fn test_health_endpoint_degraded() {
970        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
971        let session_handle = make_session_handle();
972        let router = admin_router(
973            state,
974            None,
975            session_handle,
976            None,
977            make_test_proxy().await,
978            &make_test_config(),
979            None,
980            std::collections::HashMap::new(),
981        );
982
983        let json = get_json(&router, "/health").await;
984        assert_eq!(json["status"], "degraded");
985        let unhealthy = json["unhealthy_backends"].as_array().unwrap();
986        assert_eq!(unhealthy.len(), 1);
987        assert_eq!(unhealthy[0], "flaky/");
988    }
989
990    #[tokio::test]
991    async fn test_backends_endpoint() {
992        let state = make_state(vec![healthy_backend("db/")]);
993        let session_handle = make_session_handle();
994        let router = admin_router(
995            state,
996            None,
997            session_handle,
998            None,
999            make_test_proxy().await,
1000            &make_test_config(),
1001            None,
1002            std::collections::HashMap::new(),
1003        );
1004
1005        let json = get_json(&router, "/backends").await;
1006        assert_eq!(json["proxy"]["name"], "test-gw");
1007        assert_eq!(json["proxy"]["version"], "1.0.0");
1008        assert_eq!(json["proxy"]["backend_count"], 1);
1009        assert_eq!(json["backends"].as_array().unwrap().len(), 1);
1010        assert_eq!(json["backends"][0]["namespace"], "db/");
1011        assert!(json["backends"][0]["healthy"].as_bool().unwrap());
1012    }
1013
1014    #[tokio::test]
1015    async fn test_cache_stats_no_cache() {
1016        let state = make_state(vec![]);
1017        let session_handle = make_session_handle();
1018        let router = admin_router(
1019            state,
1020            None,
1021            session_handle,
1022            None,
1023            make_test_proxy().await,
1024            &make_test_config(),
1025            None,
1026            std::collections::HashMap::new(),
1027        );
1028
1029        let json = get_json(&router, "/cache/stats").await;
1030        assert!(json.as_array().unwrap().is_empty());
1031    }
1032
1033    #[tokio::test]
1034    async fn test_cache_clear_no_cache() {
1035        let state = make_state(vec![]);
1036        let session_handle = make_session_handle();
1037        let router = admin_router(
1038            state,
1039            None,
1040            session_handle,
1041            None,
1042            make_test_proxy().await,
1043            &make_test_config(),
1044            None,
1045            std::collections::HashMap::new(),
1046        );
1047
1048        let resp = router
1049            .clone()
1050            .oneshot(
1051                Request::builder()
1052                    .method("POST")
1053                    .uri("/cache/clear")
1054                    .body(Body::empty())
1055                    .unwrap(),
1056            )
1057            .await
1058            .unwrap();
1059
1060        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1061            .await
1062            .unwrap();
1063        assert_eq!(body.as_ref(), b"no caches configured");
1064    }
1065
1066    #[tokio::test]
1067    async fn test_metrics_endpoint_no_recorder() {
1068        let state = make_state(vec![]);
1069        let session_handle = make_session_handle();
1070        let router = admin_router(
1071            state,
1072            None,
1073            session_handle,
1074            None,
1075            make_test_proxy().await,
1076            &make_test_config(),
1077            None,
1078            std::collections::HashMap::new(),
1079        );
1080
1081        let resp = router
1082            .clone()
1083            .oneshot(
1084                Request::builder()
1085                    .uri("/metrics")
1086                    .body(Body::empty())
1087                    .unwrap(),
1088            )
1089            .await
1090            .unwrap();
1091
1092        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1093            .await
1094            .unwrap();
1095        assert!(body.is_empty());
1096    }
1097
1098    #[tokio::test]
1099    async fn test_single_backend_health() {
1100        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
1101        let session_handle = make_session_handle();
1102        let router = admin_router(
1103            state,
1104            None,
1105            session_handle,
1106            None,
1107            make_test_proxy().await,
1108            &make_test_config(),
1109            None,
1110            std::collections::HashMap::new(),
1111        );
1112
1113        let json = get_json(&router, "/backends/db/health").await;
1114        assert_eq!(json["namespace"], "db/");
1115        assert!(json["healthy"].as_bool().unwrap());
1116    }
1117
1118    #[tokio::test]
1119    async fn test_single_backend_health_not_found() {
1120        let state = make_state(vec![healthy_backend("db/")]);
1121        let session_handle = make_session_handle();
1122        let router = admin_router(
1123            state,
1124            None,
1125            session_handle,
1126            None,
1127            make_test_proxy().await,
1128            &make_test_config(),
1129            None,
1130            std::collections::HashMap::new(),
1131        );
1132
1133        let resp = router
1134            .clone()
1135            .oneshot(
1136                Request::builder()
1137                    .uri("/backends/nonexistent/health")
1138                    .body(Body::empty())
1139                    .unwrap(),
1140            )
1141            .await
1142            .unwrap();
1143        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1144    }
1145
1146    #[tokio::test]
1147    async fn test_aggregate_stats() {
1148        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
1149        let session_handle = make_session_handle();
1150        let router = admin_router(
1151            state,
1152            None,
1153            session_handle,
1154            None,
1155            make_test_proxy().await,
1156            &make_test_config(),
1157            None,
1158            std::collections::HashMap::new(),
1159        );
1160
1161        let json = get_json(&router, "/stats").await;
1162        assert_eq!(json["total_backends"], 2);
1163        assert_eq!(json["healthy_backends"], 1);
1164        assert_eq!(json["unhealthy_backends"], 1);
1165    }
1166
1167    #[tokio::test]
1168    async fn test_health_history_empty() {
1169        let state = make_state(vec![healthy_backend("db/")]);
1170        let session_handle = make_session_handle();
1171        let router = admin_router(
1172            state,
1173            None,
1174            session_handle,
1175            None,
1176            make_test_proxy().await,
1177            &make_test_config(),
1178            None,
1179            std::collections::HashMap::new(),
1180        );
1181
1182        let json = get_json(&router, "/backends/db/health/history").await;
1183        assert!(json.as_array().unwrap().is_empty());
1184    }
1185
1186    #[tokio::test]
1187    async fn test_health_history_with_events() {
1188        let state = make_state(vec![healthy_backend("db/")]);
1189        // Manually insert history events
1190        {
1191            let mut history = state.health_history.write().await;
1192            history.push(HealthEvent {
1193                namespace: "db/".to_string(),
1194                healthy: true,
1195                timestamp: Utc::now(),
1196            });
1197            history.push(HealthEvent {
1198                namespace: "db/".to_string(),
1199                healthy: false,
1200                timestamp: Utc::now(),
1201            });
1202            history.push(HealthEvent {
1203                namespace: "other/".to_string(),
1204                healthy: false,
1205                timestamp: Utc::now(),
1206            });
1207        }
1208
1209        let session_handle = make_session_handle();
1210        let router = admin_router(
1211            state,
1212            None,
1213            session_handle,
1214            None,
1215            make_test_proxy().await,
1216            &make_test_config(),
1217            None,
1218            std::collections::HashMap::new(),
1219        );
1220
1221        // Should only return events for "db"
1222        let json = get_json(&router, "/backends/db/health/history").await;
1223        let events = json.as_array().unwrap();
1224        assert_eq!(events.len(), 2);
1225        assert!(events[0]["healthy"].as_bool().unwrap());
1226        assert!(!events[1]["healthy"].as_bool().unwrap());
1227    }
1228
1229    #[tokio::test]
1230    async fn test_sessions_endpoint() {
1231        let state = make_state(vec![]);
1232        let session_handle = make_session_handle();
1233        let router = admin_router(
1234            state,
1235            None,
1236            session_handle,
1237            None,
1238            make_test_proxy().await,
1239            &make_test_config(),
1240            None,
1241            std::collections::HashMap::new(),
1242        );
1243
1244        let json = get_json(&router, "/sessions").await;
1245        assert_eq!(json["active_sessions"], 0);
1246    }
1247
1248    #[tokio::test]
1249    async fn test_sessions_detail_empty() {
1250        let state = make_state(vec![]);
1251        let session_handle = make_session_handle();
1252        let router = admin_router(
1253            state,
1254            None,
1255            session_handle,
1256            None,
1257            make_test_proxy().await,
1258            &make_test_config(),
1259            None,
1260            std::collections::HashMap::new(),
1261        );
1262
1263        let json = get_json(&router, "/sessions/detail").await;
1264        let sessions = json.as_array().unwrap();
1265        assert!(sessions.is_empty());
1266    }
1267
1268    #[tokio::test]
1269    async fn test_terminate_session_not_found() {
1270        let state = make_state(vec![]);
1271        let session_handle = make_session_handle();
1272        let router = admin_router(
1273            state,
1274            None,
1275            session_handle,
1276            None,
1277            make_test_proxy().await,
1278            &make_test_config(),
1279            None,
1280            std::collections::HashMap::new(),
1281        );
1282
1283        let resp = router
1284            .clone()
1285            .oneshot(
1286                Request::builder()
1287                    .method("DELETE")
1288                    .uri("/sessions/nonexistent-id")
1289                    .body(Body::empty())
1290                    .unwrap(),
1291            )
1292            .await
1293            .unwrap();
1294        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1295
1296        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1297            .await
1298            .unwrap();
1299        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1300        assert!(!json["ok"].as_bool().unwrap());
1301        assert!(json["message"].as_str().unwrap().contains("not found"));
1302    }
1303
1304    #[tokio::test]
1305    async fn test_sessions_detail_with_active_session() {
1306        // Create an HTTP transport and send an initialize request to establish a session
1307        let svc = tower::util::BoxCloneService::new(tower::service_fn(
1308            |_req: tower_mcp::RouterRequest| async {
1309                Ok::<_, std::convert::Infallible>(tower_mcp::RouterResponse {
1310                    id: tower_mcp::protocol::RequestId::Number(1),
1311                    inner: Ok(tower_mcp::protocol::McpResponse::Initialize(
1312                        tower_mcp::protocol::InitializeResult {
1313                            protocol_version: "2025-03-26".to_string(),
1314                            server_info: tower_mcp::protocol::Implementation {
1315                                name: "test".to_string(),
1316                                version: "1.0.0".to_string(),
1317                                ..Default::default()
1318                            },
1319                            capabilities: Default::default(),
1320                            instructions: None,
1321                            meta: None,
1322                        },
1323                    )),
1324                })
1325            },
1326        ));
1327
1328        let (http_router, session_handle) =
1329            tower_mcp::transport::http::HttpTransport::from_service(svc).into_router_with_handle();
1330
1331        // Send an initialize request to create a session
1332        let init_body = serde_json::json!({
1333            "jsonrpc": "2.0",
1334            "id": 1,
1335            "method": "initialize",
1336            "params": {
1337                "protocolVersion": "2025-03-26",
1338                "clientInfo": { "name": "test", "version": "1.0.0" },
1339                "capabilities": {}
1340            }
1341        });
1342
1343        let resp = http_router
1344            .clone()
1345            .oneshot(
1346                Request::builder()
1347                    .method("POST")
1348                    .uri("/")
1349                    .header("Content-Type", "application/json")
1350                    .body(Body::from(serde_json::to_string(&init_body).unwrap()))
1351                    .unwrap(),
1352            )
1353            .await
1354            .unwrap();
1355        assert_eq!(resp.status(), 200);
1356
1357        // Extract session ID from response header
1358        let session_id = resp
1359            .headers()
1360            .get("mcp-session-id")
1361            .expect("initialize response should include mcp-session-id header")
1362            .to_str()
1363            .unwrap()
1364            .to_string();
1365
1366        // Verify sessions are visible via admin endpoint
1367        let state = make_state(vec![]);
1368        let admin = admin_router(
1369            state,
1370            None,
1371            session_handle.clone(),
1372            None,
1373            make_test_proxy().await,
1374            &make_test_config(),
1375            None,
1376            std::collections::HashMap::new(),
1377        );
1378
1379        // Check session count
1380        let json = get_json(&admin, "/sessions").await;
1381        assert_eq!(json["active_sessions"], 1);
1382
1383        // Check session detail
1384        let json = get_json(&admin, "/sessions/detail").await;
1385        let sessions = json.as_array().unwrap();
1386        assert_eq!(sessions.len(), 1);
1387        assert!(!sessions[0]["id"].as_str().unwrap().is_empty());
1388        assert!(sessions[0]["uptime_seconds"].is_number());
1389        assert!(sessions[0]["idle_seconds"].is_number());
1390
1391        // Terminate the session
1392        let resp = admin
1393            .clone()
1394            .oneshot(
1395                Request::builder()
1396                    .method("DELETE")
1397                    .uri(format!("/sessions/{}", session_id))
1398                    .body(Body::empty())
1399                    .unwrap(),
1400            )
1401            .await
1402            .unwrap();
1403        assert_eq!(resp.status(), StatusCode::OK);
1404
1405        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1406            .await
1407            .unwrap();
1408        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1409        assert!(json["ok"].as_bool().unwrap());
1410
1411        // Verify session is gone
1412        let json = get_json(&admin, "/sessions").await;
1413        assert_eq!(json["active_sessions"], 0);
1414
1415        // Terminating again returns not found
1416        let resp = admin
1417            .clone()
1418            .oneshot(
1419                Request::builder()
1420                    .method("DELETE")
1421                    .uri(format!("/sessions/{}", session_id))
1422                    .body(Body::empty())
1423                    .unwrap(),
1424            )
1425            .await
1426            .unwrap();
1427        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1428    }
1429}