Skip to main content

mcp_proxy/
admin.rs

1//! Admin HTTP API for proxy introspection and management.
2//!
3//! The admin API runs on the same HTTP server as the MCP endpoint and is
4//! mounted under the `/admin` path prefix. It provides read-only
5//! introspection endpoints and management operations for backends, sessions,
6//! caching, and configuration.
7//!
8//! # Endpoint reference
9//!
10//! | Method | Path | Description |
11//! |--------|------|-------------|
12//! | `GET` | `/admin/health` | Aggregated health status of all backends |
13//! | `GET` | `/admin/backends` | List all backends with metadata |
14//! | `GET` | `/admin/backends/{name}/health` | Health status for a single backend |
15//! | `GET` | `/admin/backends/{name}/health/history` | Health state transition history for a backend |
16//! | `POST` | `/admin/backends/add` | Add a new backend at runtime |
17//! | `PUT` | `/admin/backends/{name}` | Update an existing backend's configuration |
18//! | `DELETE` | `/admin/backends/{name}` | Remove a backend at runtime |
19//! | `GET` | `/admin/sessions` | List active MCP sessions (summary) |
20//! | `GET` | `/admin/sessions/detail` | List active MCP sessions (full detail) |
21//! | `DELETE` | `/admin/sessions/{id}` | Terminate a session by ID |
22//! | `GET` | `/admin/stats` | Aggregated proxy statistics |
23//! | `GET` | `/admin/cache/stats` | Cache hit/miss statistics |
24//! | `POST` | `/admin/cache/clear` | Flush the response cache |
25//! | `GET` | `/admin/config` | Current proxy configuration (sanitized) |
26//! | `PUT` | `/admin/config` | Hot-reload proxy configuration |
27//! | `POST` | `/admin/config/validate` | Validate a config without applying it |
28//! | `GET` | `/admin/metrics` | Prometheus metrics scrape endpoint (feature `metrics`) |
29//! | `GET` | `/admin/openapi.json` | OpenAPI spec (feature `openapi`) |
30//!
31//! # Health checking
32//!
33//! Backend health is monitored by a background task ([`spawn_health_checker`])
34//! that periodically sends `ping` requests to each backend and records
35//! pass/fail status. The cached results are stored in [`AdminState`] and
36//! served by the `/admin/health` endpoint without per-request latency.
37//!
38//! Health state transitions (healthy -> unhealthy or vice versa) are recorded
39//! in a ring buffer of [`HealthEvent`] entries (capped at 100 events). The
40//! per-backend history is available at `/admin/backends/{name}/health/history`.
41//!
42//! # Session management
43//!
44//! The proxy tracks active MCP sessions (SSE and Streamable HTTP transports).
45//! The `/admin/sessions` endpoint lists session IDs and metadata, while
46//! `/admin/sessions/{id}` (DELETE) terminates a session by closing its
47//! transport.
48//!
49//! # Cache management
50//!
51//! When response caching is enabled, `/admin/cache/stats` reports hit/miss
52//! counts and `/admin/cache/clear` flushes all cached entries.
53//!
54//! # Authentication
55//!
56//! Admin endpoints are protected by a token-based auth scheme. The proxy
57//! resolves the admin token using a fallback chain:
58//!
59//! 1. `security.admin_token` -- explicit bearer token for admin access.
60//! 2. Proxy's bearer auth tokens -- reused when `auth.type = "bearer"`.
61//! 3. If neither is set, the admin API is open (suitable for local/dev use).
62//!
63//! When `auth.type` is `jwt` or `oauth` there is no static-token fallback
64//! (step 2 only applies to bearer auth), so `security.admin_token` is
65//! **required** -- config validation rejects its absence to avoid leaving the
66//! admin plane unauthenticated.
67//!
68//! The token supports `${ENV_VAR}` syntax for environment variable expansion.
69
70use std::collections::HashMap;
71use std::sync::Arc;
72use std::time::Duration;
73
74use axum::Router;
75use axum::extract::{Extension, Path};
76use axum::http::StatusCode;
77use axum::response::{IntoResponse, Json};
78use axum::routing::{delete, get, post};
79use chrono::{DateTime, Utc};
80use serde::{Deserialize, Serialize};
81use tokio::sync::RwLock;
82use tower_mcp::SessionHandle;
83use tower_mcp::proxy::McpProxy;
84
85/// Cached health status, updated periodically by a background task.
86#[derive(Clone)]
87pub struct AdminState {
88    health: Arc<RwLock<Vec<BackendStatus>>>,
89    health_history: Arc<RwLock<Vec<HealthEvent>>>,
90    proxy_name: String,
91    proxy_version: String,
92    backend_count: usize,
93}
94
95/// A health state transition event.
96#[derive(Debug, Clone, Serialize)]
97#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
98pub struct HealthEvent {
99    /// Backend namespace.
100    pub namespace: String,
101    /// New health state.
102    pub healthy: bool,
103    /// When the transition occurred.
104    pub timestamp: DateTime<Utc>,
105}
106
107/// Maximum health history events to retain.
108const MAX_HEALTH_HISTORY: usize = 100;
109
110impl AdminState {
111    /// Get a snapshot of backend health status.
112    pub async fn health(&self) -> Vec<BackendStatus> {
113        self.health.read().await.clone()
114    }
115
116    /// Proxy name from config.
117    pub fn proxy_name(&self) -> &str {
118        &self.proxy_name
119    }
120
121    /// Proxy version from config.
122    pub fn proxy_version(&self) -> &str {
123        &self.proxy_version
124    }
125
126    /// Number of configured backends.
127    pub fn backend_count(&self) -> usize {
128        self.backend_count
129    }
130}
131
132/// Health status of a single backend, updated by the background health checker.
133#[derive(Serialize, Clone)]
134#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
135pub struct BackendStatus {
136    /// Backend namespace (e.g. "db/").
137    pub namespace: String,
138    /// Whether the backend responded to the last health check.
139    pub healthy: bool,
140    /// Timestamp of the last health check.
141    pub last_checked_at: Option<DateTime<Utc>>,
142    /// Number of consecutive failed health checks.
143    pub consecutive_failures: u32,
144    /// Last error message from a failed health check.
145    pub error: Option<String>,
146    /// Transport type (e.g. "stdio", "http").
147    pub transport: Option<String>,
148}
149
150#[derive(Serialize)]
151#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
152struct AdminBackendsResponse {
153    proxy: ProxyInfo,
154    backends: Vec<BackendStatus>,
155}
156
157#[derive(Serialize)]
158#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
159struct ProxyInfo {
160    name: String,
161    version: String,
162    backend_count: usize,
163    active_sessions: usize,
164}
165
166/// Per-backend metadata passed in from config at startup.
167#[derive(Clone)]
168pub struct BackendMeta {
169    /// Transport type string (e.g. "stdio", "http").
170    pub transport: String,
171}
172
173/// Spawn a background task that periodically health-checks backends.
174/// Returns the AdminState that admin endpoints read from.
175pub fn spawn_health_checker(
176    proxy: McpProxy,
177    proxy_name: String,
178    proxy_version: String,
179    backend_count: usize,
180    backend_meta: HashMap<String, BackendMeta>,
181) -> AdminState {
182    let health: Arc<RwLock<Vec<BackendStatus>>> = Arc::new(RwLock::new(Vec::new()));
183    let health_writer = Arc::clone(&health);
184    let health_history: Arc<RwLock<Vec<HealthEvent>>> = Arc::new(RwLock::new(Vec::new()));
185    let history_writer = Arc::clone(&health_history);
186
187    std::thread::spawn(move || {
188        let rt = tokio::runtime::Builder::new_current_thread()
189            .enable_all()
190            .build()
191            .expect("admin health check runtime");
192
193        let mut failure_counts: HashMap<String, u32> = HashMap::new();
194        // Track previous health state for transition detection
195        let mut prev_healthy: HashMap<String, bool> = HashMap::new();
196
197        rt.block_on(async move {
198            loop {
199                let results = proxy.health_check().await;
200                let now = Utc::now();
201                let mut transitions = Vec::new();
202
203                let statuses: Vec<BackendStatus> = results
204                    .into_iter()
205                    .map(|h| {
206                        let count = failure_counts.entry(h.namespace.clone()).or_insert(0);
207                        if h.healthy {
208                            *count = 0;
209                        } else {
210                            *count += 1;
211                        }
212
213                        // Detect health state transitions
214                        let prev = prev_healthy.get(&h.namespace).copied();
215                        if prev != Some(h.healthy) {
216                            transitions.push(HealthEvent {
217                                namespace: h.namespace.clone(),
218                                healthy: h.healthy,
219                                timestamp: now,
220                            });
221                            prev_healthy.insert(h.namespace.clone(), h.healthy);
222                        }
223
224                        let meta = backend_meta.get(&h.namespace);
225                        BackendStatus {
226                            namespace: h.namespace,
227                            healthy: h.healthy,
228                            last_checked_at: Some(now),
229                            consecutive_failures: *count,
230                            error: if h.healthy {
231                                None
232                            } else {
233                                Some("ping failed".to_string())
234                            },
235                            transport: meta.map(|m| m.transport.clone()),
236                        }
237                    })
238                    .collect();
239
240                *health_writer.write().await = statuses;
241
242                // Record transitions to history
243                if !transitions.is_empty() {
244                    let mut history = history_writer.write().await;
245                    history.extend(transitions);
246                    // Trim to max size
247                    if history.len() > MAX_HEALTH_HISTORY {
248                        let excess = history.len() - MAX_HEALTH_HISTORY;
249                        history.drain(..excess);
250                    }
251                }
252
253                tokio::time::sleep(Duration::from_secs(10)).await;
254            }
255        });
256    });
257
258    AdminState {
259        health,
260        health_history,
261        proxy_name,
262        proxy_version,
263        backend_count,
264    }
265}
266
267async fn handle_backends(
268    Extension(state): Extension<AdminState>,
269    Extension(session_handle): Extension<SessionHandle>,
270) -> Json<AdminBackendsResponse> {
271    let backends = state.health.read().await.clone();
272    let active_sessions = session_handle.session_count().await;
273
274    Json(AdminBackendsResponse {
275        proxy: ProxyInfo {
276            name: state.proxy_name,
277            version: state.proxy_version,
278            backend_count: state.backend_count,
279            active_sessions,
280        },
281        backends,
282    })
283}
284
285async fn handle_health(Extension(state): Extension<AdminState>) -> Json<HealthResponse> {
286    let backends = state.health.read().await;
287    let all_healthy = backends.iter().all(|b| b.healthy);
288    let unhealthy: Vec<String> = backends
289        .iter()
290        .filter(|b| !b.healthy)
291        .map(|b| b.namespace.clone())
292        .collect();
293    Json(HealthResponse {
294        status: if all_healthy { "healthy" } else { "degraded" }.to_string(),
295        unhealthy_backends: unhealthy,
296    })
297}
298
299#[derive(Serialize)]
300#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
301struct HealthResponse {
302    status: String,
303    unhealthy_backends: Vec<String>,
304}
305
306#[cfg(feature = "metrics")]
307async fn handle_metrics(
308    Extension(handle): Extension<Option<metrics_exporter_prometheus::PrometheusHandle>>,
309) -> impl IntoResponse {
310    match handle {
311        Some(h) => h.render(),
312        None => String::new(),
313    }
314}
315
316#[cfg(not(feature = "metrics"))]
317async fn handle_metrics() -> impl IntoResponse {
318    String::new()
319}
320
321async fn handle_cache_stats(
322    Extension(cache_handle): Extension<Option<crate::cache::CacheHandle>>,
323) -> Json<Vec<crate::cache::CacheStatsSnapshot>> {
324    match cache_handle {
325        Some(h) => Json(h.stats().await),
326        None => Json(vec![]),
327    }
328}
329
330async fn handle_cache_clear(
331    Extension(cache_handle): Extension<Option<crate::cache::CacheHandle>>,
332) -> &'static str {
333    if let Some(h) = cache_handle {
334        h.clear().await;
335        "caches cleared"
336    } else {
337        "no caches configured"
338    }
339}
340
341/// Create an `AdminState` directly for testing.
342#[cfg(test)]
343pub(crate) fn test_admin_state(
344    proxy_name: &str,
345    proxy_version: &str,
346    backend_count: usize,
347    statuses: Vec<BackendStatus>,
348) -> AdminState {
349    AdminState {
350        health: Arc::new(RwLock::new(statuses)),
351        health_history: Arc::new(RwLock::new(Vec::new())),
352        proxy_name: proxy_name.to_string(),
353        proxy_version: proxy_version.to_string(),
354        backend_count,
355    }
356}
357
358/// Metrics handle type -- wraps the Prometheus handle when the feature is enabled.
359#[cfg(feature = "metrics")]
360pub type MetricsHandle = Option<metrics_exporter_prometheus::PrometheusHandle>;
361/// Metrics handle type -- no-op when the metrics feature is disabled.
362#[cfg(not(feature = "metrics"))]
363pub type MetricsHandle = Option<()>;
364
365// ---------------------------------------------------------------------------
366// Management API handlers
367// ---------------------------------------------------------------------------
368
369/// Request body for adding an HTTP backend via REST.
370#[derive(Debug, Deserialize)]
371#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
372struct AddBackendRequest {
373    /// Backend name (becomes the namespace prefix).
374    name: String,
375    /// Backend URL.
376    url: String,
377    /// Optional bearer token for the backend.
378    bearer_token: Option<String>,
379}
380
381/// Response body for backend operations.
382#[derive(Serialize)]
383#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
384struct BackendOpResponse {
385    ok: bool,
386    message: String,
387}
388
389async fn handle_add_backend(
390    Extension(proxy): Extension<McpProxy>,
391    Json(req): Json<AddBackendRequest>,
392) -> (StatusCode, Json<BackendOpResponse>) {
393    let mut transport = tower_mcp::client::HttpClientTransport::new(&req.url);
394    if let Some(token) = &req.bearer_token {
395        transport = transport.bearer_token(token);
396    }
397
398    match proxy.add_backend(&req.name, transport).await {
399        Ok(()) => (
400            StatusCode::CREATED,
401            Json(BackendOpResponse {
402                ok: true,
403                message: format!("Backend '{}' added", req.name),
404            }),
405        ),
406        Err(e) => (
407            StatusCode::CONFLICT,
408            Json(BackendOpResponse {
409                ok: false,
410                message: format!("Failed to add backend: {e}"),
411            }),
412        ),
413    }
414}
415
416async fn handle_remove_backend(
417    Extension(proxy): Extension<McpProxy>,
418    Path(name): Path<String>,
419) -> (StatusCode, Json<BackendOpResponse>) {
420    if proxy.remove_backend(&name).await {
421        (
422            StatusCode::OK,
423            Json(BackendOpResponse {
424                ok: true,
425                message: format!("Backend '{}' removed", name),
426            }),
427        )
428    } else {
429        (
430            StatusCode::NOT_FOUND,
431            Json(BackendOpResponse {
432                ok: false,
433                message: format!("Backend '{}' not found", name),
434            }),
435        )
436    }
437}
438
439async fn handle_get_config(
440    Extension(config_toml): Extension<std::sync::Arc<String>>,
441) -> impl IntoResponse {
442    config_toml.as_str().to_string()
443}
444
445async fn handle_validate_config(body: String) -> (StatusCode, Json<BackendOpResponse>) {
446    match crate::config::ProxyConfig::parse(&body) {
447        Ok(config) => (
448            StatusCode::OK,
449            Json(BackendOpResponse {
450                ok: true,
451                message: format!("Valid config with {} backends", config.backends.len()),
452            }),
453        ),
454        Err(e) => (
455            StatusCode::BAD_REQUEST,
456            Json(BackendOpResponse {
457                ok: false,
458                message: format!("Invalid config: {e}"),
459            }),
460        ),
461    }
462}
463
464async fn handle_list_sessions(
465    Extension(session_handle): Extension<SessionHandle>,
466) -> Json<SessionsResponse> {
467    Json(SessionsResponse {
468        active_sessions: session_handle.session_count().await,
469    })
470}
471
472#[derive(Serialize)]
473#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
474struct SessionsResponse {
475    active_sessions: usize,
476}
477
478async fn handle_backend_health_history(
479    Extension(state): Extension<AdminState>,
480    Path(name): Path<String>,
481) -> Json<Vec<HealthEvent>> {
482    let history = state.health_history.read().await;
483    let filtered: Vec<HealthEvent> = history
484        .iter()
485        .filter(|e| {
486            e.namespace == name
487                || e.namespace == format!("{name}/")
488                || e.namespace.trim_end_matches('/') == name
489        })
490        .cloned()
491        .collect();
492    Json(filtered)
493}
494
495async fn handle_update_backend(
496    Extension(proxy): Extension<McpProxy>,
497    Path(name): Path<String>,
498    Json(req): Json<UpdateBackendRequest>,
499) -> (StatusCode, Json<BackendOpResponse>) {
500    // Remove existing backend
501    if !proxy.remove_backend(&name).await {
502        return (
503            StatusCode::NOT_FOUND,
504            Json(BackendOpResponse {
505                ok: false,
506                message: format!("Backend '{name}' not found"),
507            }),
508        );
509    }
510
511    // Re-add with new config
512    let mut transport = tower_mcp::client::HttpClientTransport::new(&req.url);
513    if let Some(token) = &req.bearer_token {
514        transport = transport.bearer_token(token);
515    }
516
517    match proxy.add_backend(&name, transport).await {
518        Ok(()) => (
519            StatusCode::OK,
520            Json(BackendOpResponse {
521                ok: true,
522                message: format!("Backend '{name}' updated"),
523            }),
524        ),
525        Err(e) => (
526            StatusCode::INTERNAL_SERVER_ERROR,
527            Json(BackendOpResponse {
528                ok: false,
529                message: format!("Failed to re-add backend after removal: {e}"),
530            }),
531        ),
532    }
533}
534
535#[derive(Debug, Deserialize)]
536#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
537struct UpdateBackendRequest {
538    /// New backend URL.
539    url: String,
540    /// Optional bearer token for the backend.
541    bearer_token: Option<String>,
542}
543
544async fn handle_single_backend_health(
545    Extension(state): Extension<AdminState>,
546    Path(name): Path<String>,
547) -> Result<Json<BackendStatus>, StatusCode> {
548    let backends = state.health.read().await;
549    // Match by namespace (with or without trailing separator)
550    backends
551        .iter()
552        .find(|b| {
553            b.namespace == name
554                || b.namespace == format!("{name}/")
555                || b.namespace.trim_end_matches('/') == name
556        })
557        .cloned()
558        .map(Json)
559        .ok_or(StatusCode::NOT_FOUND)
560}
561
562async fn handle_aggregate_stats(
563    Extension(state): Extension<AdminState>,
564    Extension(session_handle): Extension<SessionHandle>,
565) -> Json<AggregateStats> {
566    let backends = state.health.read().await;
567    let total = backends.len();
568    let healthy = backends.iter().filter(|b| b.healthy).count();
569    let active_sessions = session_handle.session_count().await;
570    Json(AggregateStats {
571        total_backends: total,
572        healthy_backends: healthy,
573        unhealthy_backends: total - healthy,
574        active_sessions,
575    })
576}
577
578#[derive(Serialize)]
579#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
580struct AggregateStats {
581    total_backends: usize,
582    healthy_backends: usize,
583    unhealthy_backends: usize,
584    active_sessions: usize,
585}
586
587async fn handle_list_sessions_detail(
588    Extension(session_handle): Extension<SessionHandle>,
589) -> Json<Vec<SessionInfoResponse>> {
590    let sessions = session_handle.list_sessions().await;
591    Json(
592        sessions
593            .into_iter()
594            .map(|s| SessionInfoResponse {
595                id: s.id,
596                uptime_seconds: s.created_at.as_secs(),
597                idle_seconds: s.last_activity.as_secs(),
598            })
599            .collect(),
600    )
601}
602
603#[derive(Serialize)]
604#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
605struct SessionInfoResponse {
606    id: String,
607    uptime_seconds: u64,
608    idle_seconds: u64,
609}
610
611async fn handle_terminate_session(
612    Extension(session_handle): Extension<SessionHandle>,
613    Path(id): Path<String>,
614) -> (StatusCode, Json<BackendOpResponse>) {
615    if session_handle.terminate_session(&id).await {
616        (
617            StatusCode::OK,
618            Json(BackendOpResponse {
619                ok: true,
620                message: format!("Session '{id}' terminated"),
621            }),
622        )
623    } else {
624        (
625            StatusCode::NOT_FOUND,
626            Json(BackendOpResponse {
627                ok: false,
628                message: format!("Session '{id}' not found"),
629            }),
630        )
631    }
632}
633
634async fn handle_update_config(
635    Extension(config_path): Extension<Option<std::path::PathBuf>>,
636    body: String,
637) -> (StatusCode, Json<BackendOpResponse>) {
638    // Detect format from config file extension (or try TOML then YAML)
639    let is_yaml = config_path
640        .as_ref()
641        .and_then(|p| p.extension())
642        .is_some_and(|ext| ext == "yaml" || ext == "yml");
643
644    let config = if is_yaml {
645        #[cfg(feature = "yaml")]
646        {
647            crate::config::ProxyConfig::parse_yaml(&body)
648        }
649        #[cfg(not(feature = "yaml"))]
650        {
651            Err(anyhow::anyhow!("YAML support requires the 'yaml' feature"))
652        }
653    } else {
654        crate::config::ProxyConfig::parse(&body)
655    };
656
657    let config = match config {
658        Ok(c) => c,
659        Err(e) => {
660            return (
661                StatusCode::BAD_REQUEST,
662                Json(BackendOpResponse {
663                    ok: false,
664                    message: format!("Invalid config: {e}"),
665                }),
666            );
667        }
668    };
669
670    // Write to disk if we have a config path (triggers hot reload if enabled)
671    let Some(path) = config_path else {
672        return (
673            StatusCode::BAD_REQUEST,
674            Json(BackendOpResponse {
675                ok: false,
676                message: "No config file path available (running in --from-mcp-json mode?)"
677                    .to_string(),
678            }),
679        );
680    };
681
682    if let Err(e) = std::fs::write(&path, &body) {
683        return (
684            StatusCode::INTERNAL_SERVER_ERROR,
685            Json(BackendOpResponse {
686                ok: false,
687                message: format!("Failed to write config: {e}"),
688            }),
689        );
690    }
691
692    (
693        StatusCode::OK,
694        Json(BackendOpResponse {
695            ok: true,
696            message: format!(
697                "Config updated ({} backends). Hot reload will apply changes if enabled.",
698                config.backends.len()
699            ),
700        }),
701    )
702}
703
704async fn handle_circuit_breakers(
705    Extension(handles): Extension<Arc<std::collections::HashMap<String, crate::proxy::CbHandle>>>,
706) -> Json<Vec<CircuitBreakerStatus>> {
707    let mut statuses = Vec::new();
708    for (name, handle) in handles.iter() {
709        statuses.push(CircuitBreakerStatus {
710            backend: name.clone(),
711            state: format!("{:?}", handle.state()),
712            health: handle.health_status().to_string(),
713        });
714    }
715    statuses.sort_by(|a, b| a.backend.cmp(&b.backend));
716    Json(statuses)
717}
718
719#[derive(Serialize)]
720#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
721struct CircuitBreakerStatus {
722    backend: String,
723    state: String,
724    health: String,
725}
726
727/// OpenAPI spec for the admin API.
728///
729/// Available at `GET /admin/openapi.json` when the `openapi` feature is enabled.
730#[cfg(feature = "openapi")]
731#[derive(utoipa::OpenApi)]
732#[openapi(
733    info(
734        title = "mcp-proxy Admin API",
735        description = "REST API for managing and monitoring the MCP proxy.",
736        version = "0.1.0",
737    ),
738    components(schemas(
739        AdminBackendsResponse,
740        ProxyInfo,
741        BackendStatus,
742        HealthResponse,
743        crate::cache::CacheStatsSnapshot,
744        SessionsResponse,
745        AddBackendRequest,
746        BackendOpResponse,
747    ))
748)]
749struct ApiDoc;
750
751#[cfg(feature = "openapi")]
752async fn handle_openapi() -> impl IntoResponse {
753    axum::Json(<ApiDoc as utoipa::OpenApi>::openapi())
754}
755
756/// Build the admin API router.
757#[allow(clippy::too_many_arguments)]
758pub fn admin_router(
759    state: AdminState,
760    metrics_handle: MetricsHandle,
761    session_handle: SessionHandle,
762    cache_handle: Option<crate::cache::CacheHandle>,
763    proxy: McpProxy,
764    config: &crate::config::ProxyConfig,
765    config_path: Option<std::path::PathBuf>,
766    cb_handles: std::collections::HashMap<String, crate::proxy::CbHandle>,
767) -> Router {
768    let config_toml = std::sync::Arc::new(toml::to_string_pretty(config).unwrap_or_default());
769
770    let router = Router::new()
771        // Read-only endpoints
772        .route("/backends", get(handle_backends))
773        .route("/health", get(handle_health))
774        .route("/cache/stats", get(handle_cache_stats))
775        .route("/cache/clear", axum::routing::post(handle_cache_clear))
776        .route("/metrics", get(handle_metrics))
777        .route("/sessions", get(handle_list_sessions))
778        .route("/sessions/detail", get(handle_list_sessions_detail))
779        .route("/sessions/{id}", delete(handle_terminate_session))
780        .route("/stats", get(handle_aggregate_stats))
781        .route("/circuit-breakers", get(handle_circuit_breakers))
782        .route("/config", get(handle_get_config).put(handle_update_config))
783        .route("/config/validate", post(handle_validate_config))
784        // Per-backend endpoints
785        .route("/backends/{name}/health", get(handle_single_backend_health))
786        .route(
787            "/backends/{name}/health/history",
788            get(handle_backend_health_history),
789        )
790        // Management endpoints
791        .route("/backends/add", post(handle_add_backend))
792        .route(
793            "/backends/{name}",
794            delete(handle_remove_backend).put(handle_update_backend),
795        )
796        .layer(Extension(state))
797        .layer(Extension(session_handle))
798        .layer(Extension(cache_handle))
799        .layer(Extension(proxy))
800        .layer(Extension(config_toml))
801        .layer(Extension(config_path))
802        .layer(Extension(Arc::new(cb_handles)));
803
804    #[cfg(feature = "metrics")]
805    let router = router.layer(Extension(metrics_handle));
806    #[cfg(not(feature = "metrics"))]
807    let _ = metrics_handle;
808
809    #[cfg(feature = "openapi")]
810    let router = router.route("/openapi.json", get(handle_openapi));
811
812    // Admin API auth: admin_token takes priority, then fall back to proxy bearer tokens
813    let admin_tokens = resolve_admin_tokens(config);
814    if !admin_tokens.is_empty() {
815        tracing::info!(token_count = admin_tokens.len(), "Admin API auth enabled");
816        let validator = tower_mcp::auth::StaticBearerValidator::new(admin_tokens);
817        let layer = tower_mcp::auth::AuthLayer::new(validator);
818        router.layer(layer)
819    } else {
820        router
821    }
822}
823
824/// Resolve admin auth tokens: admin_token if set, otherwise proxy bearer tokens.
825fn resolve_admin_tokens(config: &crate::config::ProxyConfig) -> Vec<String> {
826    // Explicit admin token takes priority
827    if let Some(ref token) = config.security.admin_token {
828        return vec![token.clone()];
829    }
830
831    // Fall back to proxy bearer tokens
832    match &config.auth {
833        Some(crate::config::AuthConfig::Bearer {
834            tokens,
835            scoped_tokens,
836        }) => {
837            let mut all: Vec<String> = tokens.clone();
838            all.extend(scoped_tokens.iter().map(|st| st.token.clone()));
839            all
840        }
841        _ => vec![],
842    }
843}
844
845#[cfg(test)]
846mod tests {
847    use super::*;
848    use axum::body::Body;
849    use axum::http::Request;
850    use tower::ServiceExt;
851
852    fn make_state(statuses: Vec<BackendStatus>) -> AdminState {
853        test_admin_state("test-gw", "1.0.0", statuses.len(), statuses)
854    }
855
856    fn healthy_backend(name: &str) -> BackendStatus {
857        BackendStatus {
858            namespace: name.to_string(),
859            healthy: true,
860            last_checked_at: Some(Utc::now()),
861            consecutive_failures: 0,
862            error: None,
863            transport: Some("http".to_string()),
864        }
865    }
866
867    fn unhealthy_backend(name: &str) -> BackendStatus {
868        BackendStatus {
869            namespace: name.to_string(),
870            healthy: false,
871            last_checked_at: Some(Utc::now()),
872            consecutive_failures: 3,
873            error: Some("ping failed".to_string()),
874            transport: Some("stdio".to_string()),
875        }
876    }
877
878    async fn make_test_proxy() -> McpProxy {
879        use tower_mcp::client::ChannelTransport;
880        use tower_mcp::{CallToolResult, McpRouter, ToolBuilder};
881
882        let router = McpRouter::new().server_info("test", "1.0.0").tool(
883            ToolBuilder::new("ping")
884                .description("Ping")
885                .handler(|_: tower_mcp::NoParams| async move { Ok(CallToolResult::text("pong")) })
886                .build(),
887        );
888
889        McpProxy::builder("test-proxy", "1.0.0")
890            .backend("test", ChannelTransport::new(router))
891            .await
892            .build_strict()
893            .await
894            .unwrap()
895    }
896
897    fn make_test_config() -> crate::config::ProxyConfig {
898        crate::config::ProxyConfig::parse(
899            r#"
900            [proxy]
901            name = "test"
902            [proxy.listen]
903
904            [[backends]]
905            name = "echo"
906            transport = "stdio"
907            command = "echo"
908            "#,
909        )
910        .unwrap()
911    }
912
913    fn make_session_handle() -> SessionHandle {
914        // Create a session handle via HttpTransport (the only public way)
915        let svc = tower::util::BoxCloneService::new(tower::service_fn(
916            |_req: tower_mcp::RouterRequest| async {
917                Ok::<_, std::convert::Infallible>(tower_mcp::RouterResponse {
918                    id: tower_mcp::protocol::RequestId::Number(1),
919                    inner: Ok(tower_mcp::protocol::McpResponse::Pong(Default::default())),
920                })
921            },
922        ));
923        let (_, handle) =
924            tower_mcp::transport::http::HttpTransport::from_service(svc).into_router_with_handle();
925        handle
926    }
927
928    async fn get_json(router: &Router, path: &str) -> serde_json::Value {
929        let resp = router
930            .clone()
931            .oneshot(Request::builder().uri(path).body(Body::empty()).unwrap())
932            .await
933            .unwrap();
934
935        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
936            .await
937            .unwrap();
938        serde_json::from_slice(&body).unwrap()
939    }
940
941    #[tokio::test]
942    async fn test_admin_state_accessors() {
943        let state = make_state(vec![healthy_backend("db/")]);
944        assert_eq!(state.proxy_name(), "test-gw");
945        assert_eq!(state.proxy_version(), "1.0.0");
946        assert_eq!(state.backend_count(), 1);
947
948        let health = state.health().await;
949        assert_eq!(health.len(), 1);
950        assert!(health[0].healthy);
951    }
952
953    #[tokio::test]
954    async fn test_health_endpoint_all_healthy() {
955        let state = make_state(vec![healthy_backend("db/"), healthy_backend("api/")]);
956        let session_handle = make_session_handle();
957        let router = admin_router(
958            state,
959            None,
960            session_handle,
961            None,
962            make_test_proxy().await,
963            &make_test_config(),
964            None,
965            std::collections::HashMap::new(),
966        );
967
968        let json = get_json(&router, "/health").await;
969        assert_eq!(json["status"], "healthy");
970        assert!(json["unhealthy_backends"].as_array().unwrap().is_empty());
971    }
972
973    #[tokio::test]
974    async fn test_health_endpoint_degraded() {
975        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
976        let session_handle = make_session_handle();
977        let router = admin_router(
978            state,
979            None,
980            session_handle,
981            None,
982            make_test_proxy().await,
983            &make_test_config(),
984            None,
985            std::collections::HashMap::new(),
986        );
987
988        let json = get_json(&router, "/health").await;
989        assert_eq!(json["status"], "degraded");
990        let unhealthy = json["unhealthy_backends"].as_array().unwrap();
991        assert_eq!(unhealthy.len(), 1);
992        assert_eq!(unhealthy[0], "flaky/");
993    }
994
995    #[tokio::test]
996    async fn test_backends_endpoint() {
997        let state = make_state(vec![healthy_backend("db/")]);
998        let session_handle = make_session_handle();
999        let router = admin_router(
1000            state,
1001            None,
1002            session_handle,
1003            None,
1004            make_test_proxy().await,
1005            &make_test_config(),
1006            None,
1007            std::collections::HashMap::new(),
1008        );
1009
1010        let json = get_json(&router, "/backends").await;
1011        assert_eq!(json["proxy"]["name"], "test-gw");
1012        assert_eq!(json["proxy"]["version"], "1.0.0");
1013        assert_eq!(json["proxy"]["backend_count"], 1);
1014        assert_eq!(json["backends"].as_array().unwrap().len(), 1);
1015        assert_eq!(json["backends"][0]["namespace"], "db/");
1016        assert!(json["backends"][0]["healthy"].as_bool().unwrap());
1017    }
1018
1019    #[tokio::test]
1020    async fn test_cache_stats_no_cache() {
1021        let state = make_state(vec![]);
1022        let session_handle = make_session_handle();
1023        let router = admin_router(
1024            state,
1025            None,
1026            session_handle,
1027            None,
1028            make_test_proxy().await,
1029            &make_test_config(),
1030            None,
1031            std::collections::HashMap::new(),
1032        );
1033
1034        let json = get_json(&router, "/cache/stats").await;
1035        assert!(json.as_array().unwrap().is_empty());
1036    }
1037
1038    #[tokio::test]
1039    async fn test_cache_clear_no_cache() {
1040        let state = make_state(vec![]);
1041        let session_handle = make_session_handle();
1042        let router = admin_router(
1043            state,
1044            None,
1045            session_handle,
1046            None,
1047            make_test_proxy().await,
1048            &make_test_config(),
1049            None,
1050            std::collections::HashMap::new(),
1051        );
1052
1053        let resp = router
1054            .clone()
1055            .oneshot(
1056                Request::builder()
1057                    .method("POST")
1058                    .uri("/cache/clear")
1059                    .body(Body::empty())
1060                    .unwrap(),
1061            )
1062            .await
1063            .unwrap();
1064
1065        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1066            .await
1067            .unwrap();
1068        assert_eq!(body.as_ref(), b"no caches configured");
1069    }
1070
1071    #[tokio::test]
1072    async fn test_metrics_endpoint_no_recorder() {
1073        let state = make_state(vec![]);
1074        let session_handle = make_session_handle();
1075        let router = admin_router(
1076            state,
1077            None,
1078            session_handle,
1079            None,
1080            make_test_proxy().await,
1081            &make_test_config(),
1082            None,
1083            std::collections::HashMap::new(),
1084        );
1085
1086        let resp = router
1087            .clone()
1088            .oneshot(
1089                Request::builder()
1090                    .uri("/metrics")
1091                    .body(Body::empty())
1092                    .unwrap(),
1093            )
1094            .await
1095            .unwrap();
1096
1097        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1098            .await
1099            .unwrap();
1100        assert!(body.is_empty());
1101    }
1102
1103    #[tokio::test]
1104    async fn test_single_backend_health() {
1105        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
1106        let session_handle = make_session_handle();
1107        let router = admin_router(
1108            state,
1109            None,
1110            session_handle,
1111            None,
1112            make_test_proxy().await,
1113            &make_test_config(),
1114            None,
1115            std::collections::HashMap::new(),
1116        );
1117
1118        let json = get_json(&router, "/backends/db/health").await;
1119        assert_eq!(json["namespace"], "db/");
1120        assert!(json["healthy"].as_bool().unwrap());
1121    }
1122
1123    #[tokio::test]
1124    async fn test_single_backend_health_not_found() {
1125        let state = make_state(vec![healthy_backend("db/")]);
1126        let session_handle = make_session_handle();
1127        let router = admin_router(
1128            state,
1129            None,
1130            session_handle,
1131            None,
1132            make_test_proxy().await,
1133            &make_test_config(),
1134            None,
1135            std::collections::HashMap::new(),
1136        );
1137
1138        let resp = router
1139            .clone()
1140            .oneshot(
1141                Request::builder()
1142                    .uri("/backends/nonexistent/health")
1143                    .body(Body::empty())
1144                    .unwrap(),
1145            )
1146            .await
1147            .unwrap();
1148        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1149    }
1150
1151    #[tokio::test]
1152    async fn test_aggregate_stats() {
1153        let state = make_state(vec![healthy_backend("db/"), unhealthy_backend("flaky/")]);
1154        let session_handle = make_session_handle();
1155        let router = admin_router(
1156            state,
1157            None,
1158            session_handle,
1159            None,
1160            make_test_proxy().await,
1161            &make_test_config(),
1162            None,
1163            std::collections::HashMap::new(),
1164        );
1165
1166        let json = get_json(&router, "/stats").await;
1167        assert_eq!(json["total_backends"], 2);
1168        assert_eq!(json["healthy_backends"], 1);
1169        assert_eq!(json["unhealthy_backends"], 1);
1170    }
1171
1172    #[tokio::test]
1173    async fn test_health_history_empty() {
1174        let state = make_state(vec![healthy_backend("db/")]);
1175        let session_handle = make_session_handle();
1176        let router = admin_router(
1177            state,
1178            None,
1179            session_handle,
1180            None,
1181            make_test_proxy().await,
1182            &make_test_config(),
1183            None,
1184            std::collections::HashMap::new(),
1185        );
1186
1187        let json = get_json(&router, "/backends/db/health/history").await;
1188        assert!(json.as_array().unwrap().is_empty());
1189    }
1190
1191    #[tokio::test]
1192    async fn test_health_history_with_events() {
1193        let state = make_state(vec![healthy_backend("db/")]);
1194        // Manually insert history events
1195        {
1196            let mut history = state.health_history.write().await;
1197            history.push(HealthEvent {
1198                namespace: "db/".to_string(),
1199                healthy: true,
1200                timestamp: Utc::now(),
1201            });
1202            history.push(HealthEvent {
1203                namespace: "db/".to_string(),
1204                healthy: false,
1205                timestamp: Utc::now(),
1206            });
1207            history.push(HealthEvent {
1208                namespace: "other/".to_string(),
1209                healthy: false,
1210                timestamp: Utc::now(),
1211            });
1212        }
1213
1214        let session_handle = make_session_handle();
1215        let router = admin_router(
1216            state,
1217            None,
1218            session_handle,
1219            None,
1220            make_test_proxy().await,
1221            &make_test_config(),
1222            None,
1223            std::collections::HashMap::new(),
1224        );
1225
1226        // Should only return events for "db"
1227        let json = get_json(&router, "/backends/db/health/history").await;
1228        let events = json.as_array().unwrap();
1229        assert_eq!(events.len(), 2);
1230        assert!(events[0]["healthy"].as_bool().unwrap());
1231        assert!(!events[1]["healthy"].as_bool().unwrap());
1232    }
1233
1234    #[tokio::test]
1235    async fn test_sessions_endpoint() {
1236        let state = make_state(vec![]);
1237        let session_handle = make_session_handle();
1238        let router = admin_router(
1239            state,
1240            None,
1241            session_handle,
1242            None,
1243            make_test_proxy().await,
1244            &make_test_config(),
1245            None,
1246            std::collections::HashMap::new(),
1247        );
1248
1249        let json = get_json(&router, "/sessions").await;
1250        assert_eq!(json["active_sessions"], 0);
1251    }
1252
1253    #[tokio::test]
1254    async fn test_sessions_detail_empty() {
1255        let state = make_state(vec![]);
1256        let session_handle = make_session_handle();
1257        let router = admin_router(
1258            state,
1259            None,
1260            session_handle,
1261            None,
1262            make_test_proxy().await,
1263            &make_test_config(),
1264            None,
1265            std::collections::HashMap::new(),
1266        );
1267
1268        let json = get_json(&router, "/sessions/detail").await;
1269        let sessions = json.as_array().unwrap();
1270        assert!(sessions.is_empty());
1271    }
1272
1273    #[tokio::test]
1274    async fn test_terminate_session_not_found() {
1275        let state = make_state(vec![]);
1276        let session_handle = make_session_handle();
1277        let router = admin_router(
1278            state,
1279            None,
1280            session_handle,
1281            None,
1282            make_test_proxy().await,
1283            &make_test_config(),
1284            None,
1285            std::collections::HashMap::new(),
1286        );
1287
1288        let resp = router
1289            .clone()
1290            .oneshot(
1291                Request::builder()
1292                    .method("DELETE")
1293                    .uri("/sessions/nonexistent-id")
1294                    .body(Body::empty())
1295                    .unwrap(),
1296            )
1297            .await
1298            .unwrap();
1299        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1300
1301        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1302            .await
1303            .unwrap();
1304        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1305        assert!(!json["ok"].as_bool().unwrap());
1306        assert!(json["message"].as_str().unwrap().contains("not found"));
1307    }
1308
1309    #[tokio::test]
1310    async fn test_sessions_detail_with_active_session() {
1311        // Create an HTTP transport and send an initialize request to establish a session
1312        let svc = tower::util::BoxCloneService::new(tower::service_fn(
1313            |_req: tower_mcp::RouterRequest| async {
1314                Ok::<_, std::convert::Infallible>(tower_mcp::RouterResponse {
1315                    id: tower_mcp::protocol::RequestId::Number(1),
1316                    inner: Ok(tower_mcp::protocol::McpResponse::Initialize(
1317                        tower_mcp::protocol::InitializeResult {
1318                            protocol_version: "2025-03-26".to_string(),
1319                            server_info: tower_mcp::protocol::Implementation {
1320                                name: "test".to_string(),
1321                                version: "1.0.0".to_string(),
1322                                ..Default::default()
1323                            },
1324                            capabilities: Default::default(),
1325                            instructions: None,
1326                            meta: None,
1327                        },
1328                    )),
1329                })
1330            },
1331        ));
1332
1333        let (http_router, session_handle) =
1334            tower_mcp::transport::http::HttpTransport::from_service(svc).into_router_with_handle();
1335
1336        // Send an initialize request to create a session
1337        let init_body = serde_json::json!({
1338            "jsonrpc": "2.0",
1339            "id": 1,
1340            "method": "initialize",
1341            "params": {
1342                "protocolVersion": "2025-03-26",
1343                "clientInfo": { "name": "test", "version": "1.0.0" },
1344                "capabilities": {}
1345            }
1346        });
1347
1348        let resp = http_router
1349            .clone()
1350            .oneshot(
1351                Request::builder()
1352                    .method("POST")
1353                    .uri("/")
1354                    .header("Content-Type", "application/json")
1355                    .body(Body::from(serde_json::to_string(&init_body).unwrap()))
1356                    .unwrap(),
1357            )
1358            .await
1359            .unwrap();
1360        assert_eq!(resp.status(), 200);
1361
1362        // Extract session ID from response header
1363        let session_id = resp
1364            .headers()
1365            .get("mcp-session-id")
1366            .expect("initialize response should include mcp-session-id header")
1367            .to_str()
1368            .unwrap()
1369            .to_string();
1370
1371        // Verify sessions are visible via admin endpoint
1372        let state = make_state(vec![]);
1373        let admin = admin_router(
1374            state,
1375            None,
1376            session_handle.clone(),
1377            None,
1378            make_test_proxy().await,
1379            &make_test_config(),
1380            None,
1381            std::collections::HashMap::new(),
1382        );
1383
1384        // Check session count
1385        let json = get_json(&admin, "/sessions").await;
1386        assert_eq!(json["active_sessions"], 1);
1387
1388        // Check session detail
1389        let json = get_json(&admin, "/sessions/detail").await;
1390        let sessions = json.as_array().unwrap();
1391        assert_eq!(sessions.len(), 1);
1392        assert!(!sessions[0]["id"].as_str().unwrap().is_empty());
1393        assert!(sessions[0]["uptime_seconds"].is_number());
1394        assert!(sessions[0]["idle_seconds"].is_number());
1395
1396        // Terminate the session
1397        let resp = admin
1398            .clone()
1399            .oneshot(
1400                Request::builder()
1401                    .method("DELETE")
1402                    .uri(format!("/sessions/{}", session_id))
1403                    .body(Body::empty())
1404                    .unwrap(),
1405            )
1406            .await
1407            .unwrap();
1408        assert_eq!(resp.status(), StatusCode::OK);
1409
1410        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1411            .await
1412            .unwrap();
1413        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1414        assert!(json["ok"].as_bool().unwrap());
1415
1416        // Verify session is gone
1417        let json = get_json(&admin, "/sessions").await;
1418        assert_eq!(json["active_sessions"], 0);
1419
1420        // Terminating again returns not found
1421        let resp = admin
1422            .clone()
1423            .oneshot(
1424                Request::builder()
1425                    .method("DELETE")
1426                    .uri(format!("/sessions/{}", session_id))
1427                    .body(Body::empty())
1428                    .unwrap(),
1429            )
1430            .await
1431            .unwrap();
1432        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1433    }
1434}