systemprompt-mcp 0.6.1

Native Model Context Protocol (MCP) implementation for systemprompt.io. Orchestration, per-server OAuth2, RBAC middleware, and tool-call governance — the core of the AI governance pipeline.
Documentation
//! Native Model Context Protocol (MCP) implementation for systemprompt.io.
//!
//! This crate hosts the in-process MCP server runtime, the
//! registry/orchestrator that supervises per-tenant MCP child processes, the
//! OAuth2/RBAC middleware, and the tool/resource/UI-renderer abstractions used
//! across the platform.
//!
//! # Layered components
//!
//! - [`extension::McpExtension`] — `Extension` registration entry-point.
//! - [`services::McpManager`], [`services::orchestrator::McpOrchestrator`] —
//!   top-level service supervisors.
//! - [`services::registry::RegistryManager`] — registry of MCP servers
//!   configured via `services.yaml`.
//! - [`services::tool_provider::McpToolProvider`] — tool-discovery + execution
//!   facade.
//! - [`middleware::rbac`] — JWT/proxy-verified RBAC layer.
//! - [`orchestration`] — multi-server lifecycle/state management.
//! - [`repository`] — Postgres persistence for sessions, artifacts, tool usage.
//!
//! # Feature matrix
//!
//! This crate has no Cargo features today; it is built as a single unit. The
//! facade crate `systemprompt` gates this crate behind the `mcp` / `full`
//! features.
//!
//! # Errors
//!
//! All public APIs return [`McpDomainResult`] — a typed `Result` aliased over
//! [`McpDomainError`]. External error types (`sqlx`, `serde_json`, `io`,
//! `anyhow`) are composed via `#[from]` on the error enum.

pub(crate) mod capabilities;
pub(crate) mod cli;
pub(crate) mod error;
pub(crate) mod extension;
pub(crate) mod jobs;
pub mod middleware;
pub mod models;
pub mod orchestration;
pub(crate) mod progress;
pub mod repository;
pub(crate) mod resources;
pub(crate) mod response;
pub(crate) mod schema;
pub mod services;
pub(crate) mod tool;

pub use extension::McpExtension;

pub use error::{McpDomainError, McpDomainResult};
pub use rmcp::ErrorData as McpError;

pub const MCP_PROTOCOL_VERSION: &str = "2024-11-05";

pub type McpResult<T> = Result<T, McpError>;

pub use capabilities::{
    WEBSITE_URL, build_experimental_capabilities, default_tool_visibility, mcp_apps_ui_extension,
    model_only_visibility, tool_ui_meta, visibility_to_json,
};
pub use progress::{ProgressCallback, create_progress_callback};
pub use repository::{CreateMcpArtifact, McpArtifactRecord, McpArtifactRepository};
pub use resources::{
    ArtifactViewerConfig, build_artifact_viewer_resource, default_server_icons,
    read_artifact_viewer_resource,
};
pub use response::McpResponseBuilder;
pub use schema::McpOutputSchema;
pub use tool::{McpToolExecutor, McpToolHandler};

pub use systemprompt_models::mcp::{
    Deployment, DeploymentConfig, ERROR, McpAuthState, McpServerConfig, OAuthRequirement, RUNNING,
    STARTING, STOPPED, Settings,
};

pub use services::monitoring::health::HealthStatus;
pub use services::registry::McpServerRegistry;
pub use services::registry::trait_impl::McpDeploymentProviderImpl;
pub use services::tool_provider::McpToolProvider;
pub use services::{EventBus as McpEventBus, McpEvent, McpManager, ServiceManager};

pub use orchestration::{
    McpServerConnectionInfo, McpServerMetadata, McpServiceState, McpToolLoader, ServerStatus,
    ServiceStateManager, SkillLoadingResult,
};

pub use systemprompt_models::mcp::{
    DynMcpDeploymentProvider, DynMcpRegistry, DynMcpToolProvider, McpDeploymentProvider,
    McpProvider, McpRegistry, McpServerState,
};

pub fn mcp_protocol_version() -> String {
    ProtocolVersion::LATEST.to_string()
}

pub mod registry {
    pub use crate::services::registry::RegistryManager;
}

pub use cli::{list_services, show_status, start_services, stop_services};

pub(crate) mod state;

use std::time::Duration;

use axum::extract::Request;
use axum::middleware::Next;
use axum::response::Response;
use rmcp::ServerHandler;
pub use rmcp::model::ProtocolVersion;
use rmcp::transport::StreamableHttpService;
use rmcp::transport::streamable_http_server::StreamableHttpServerConfig;
use systemprompt_database::DbPool;

use crate::middleware::DatabaseSessionManager;

#[derive(Debug, Clone)]
pub struct McpHttpConfig {
    pub allowed_hosts: Option<Vec<String>>,
    pub allowed_origins: Vec<String>,
    pub session: SessionTimeouts,
}

#[derive(Debug, Clone, Copy, Default)]
pub struct SessionTimeouts {
    pub init: Option<Duration>,
    pub keep_alive: Option<Duration>,
}

impl Default for McpHttpConfig {
    fn default() -> Self {
        Self {
            allowed_hosts: Some(vec!["localhost".into(), "127.0.0.1".into(), "::1".into()]),
            allowed_origins: Vec::new(),
            session: SessionTimeouts::default(),
        }
    }
}

pub use state::McpState;

async fn mcp_request_logger(req: Request, next: Next) -> Response {
    let method = req.method().clone();
    let uri = req.uri().clone();
    let session_id = req
        .headers()
        .get("mcp-session-id")
        .and_then(|v| v.to_str().ok())
        .map(String::from);
    let has_auth = req.headers().get("authorization").is_some();
    let proxy_verified = req
        .headers()
        .get("x-proxy-verified")
        .and_then(|v| v.to_str().ok())
        .is_some_and(|v| v == "true");
    let accept = req
        .headers()
        .get("accept")
        .and_then(|v| v.to_str().ok())
        .map(String::from);

    tracing::info!(
        %method,
        %uri,
        session_id = ?session_id,
        has_auth,
        proxy_verified,
        accept = ?accept,
        "MCP request received"
    );

    let response = next.run(req).await;

    let status = response.status();
    if !status.is_success() {
        tracing::error!(
            %method,
            %uri,
            session_id = ?session_id,
            status = %status,
            "MCP request failed at transport level"
        );
    }

    response
}

pub fn create_router<S>(server: S, db_pool: &DbPool, http: McpHttpConfig) -> axum::Router
where
    S: ServerHandler + Clone + Send + Sync + 'static,
{
    let McpHttpConfig {
        allowed_hosts,
        allowed_origins,
        session,
    } = http;

    let host_policy = StreamableHttpServerConfig::default().with_allowed_origins(allowed_origins);
    let host_policy = match allowed_hosts {
        Some(hosts) => host_policy.with_allowed_hosts(hosts),
        None => host_policy.disable_allowed_hosts(),
    };
    let config = host_policy.with_sse_keep_alive(session.keep_alive);

    let session_manager = DatabaseSessionManager::with_timeouts(db_pool, session);

    let service =
        StreamableHttpService::new(move || Ok(server.clone()), session_manager.into(), config);

    axum::Router::new()
        .nest_service("/mcp", service)
        .layer(axum::middleware::from_fn(mcp_request_logger))
        .layer(axum::middleware::map_response(
            |mut response: http::Response<_>| async move {
                response
                    .headers_mut()
                    .insert("x-accel-buffering", http::HeaderValue::from_static("no"));
                response
            },
        ))
}