turbomcp-server 2.3.3

Fast MCP server implementation with middleware, routing, and lifecycle management

TurboMCP Server

MCP server framework with OAuth 2.1 MCP compliance, middleware pipeline, and lifecycle management.

Overview

turbomcp-server provides a comprehensive server framework for Model Context Protocol implementations. It handles all server-side concerns including request routing, authentication, middleware processing, session management, and production lifecycle operations.

Security Hardened

  • Zero Known Vulnerabilities - Comprehensive security audit and hardening
  • Dependency Security - Eliminated all vulnerable dependency paths
  • MIT-Compatible Licensing - Strict open-source license compliance

Key Features

Handler Registry & Routing

  • Type-safe registration - Compile-time handler validation and automatic discovery
  • Efficient routing - O(1) method dispatch with parameter injection
  • Schema generation - Automatic JSON schema creation from handler signatures
  • Hot reloading - Dynamic handler registration and updates (development mode)
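
The O(1) dispatch mentioned above typically comes down to a hash-map lookup keyed by method name. The following is a minimal sketch under that assumption, using only the standard library. `Registry`, `Handler`, and the string-based handler signature are illustrative names and are not turbomcp-server's actual types.

```rust
use std::collections::HashMap;

/// Hypothetical handler signature: raw argument string in, result string out.
type Handler = Box<dyn Fn(&str) -> Result<String, String> + Send + Sync>;

/// Illustrative registry keyed by method name (not the crate's real type).
#[derive(Default)]
pub struct Registry {
    handlers: HashMap<String, Handler>,
}

impl Registry {
    /// Registers a handler and rejects duplicate names.
    pub fn register<F>(&mut self, name: &str, handler: F) -> Result<(), String>
    where
        F: Fn(&str) -> Result<String, String> + Send + Sync + 'static,
    {
        if self.handlers.contains_key(name) {
            return Err(format!("handler '{name}' already registered"));
        }
        self.handlers.insert(name.to_string(), Box::new(handler));
        Ok(())
    }

    /// Finds the handler with one hash lookup (average O(1)) and invokes it.
    pub fn dispatch(&self, name: &str, args: &str) -> Result<String, String> {
        match self.handlers.get(name) {
            Some(handler) => handler(args),
            None => Err(format!("unknown method '{name}'")),
        }
    }
}
```

Rejecting duplicates at registration time keeps dispatch itself free of any conflict handling.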

OAuth 2.1 MCP Compliance

  • Multiple providers - Google, GitHub, Microsoft, and custom OAuth 2.1 providers
  • PKCE security - Proof Key for Code Exchange enabled by default
  • All OAuth flows - Authorization Code, Client Credentials, Device Code
  • Session management - Secure user session tracking with automatic cleanup

Middleware Pipeline

  • Request processing - Configurable middleware chain with error handling
  • Security middleware - CORS, CSP, rate limiting, security headers
  • Authentication - JWT validation, API key, OAuth token verification
  • Observability - Request logging, metrics collection, distributed tracing
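
A middleware chain of this kind can be pictured as a list of functions that each receive the request plus a `next` callback. The sketch below is a simplified, synchronous stand-in written against the standard library only; the real stack is Tower-based, and `Request`, `Middleware`, `run_chain`, `require_api_key`, and `audit` are hypothetical names chosen for illustration.

```rust
/// Hypothetical request type, for illustration only.
pub struct Request {
    pub method: String,
    pub api_key: Option<String>,
}

pub type Response = Result<String, String>;

/// The rest of the chain, ending in the handler.
pub type Next<'a> = &'a dyn Fn(&Request) -> Response;

/// A middleware inspects the request and decides whether to call `next`.
pub type Middleware = Box<dyn for<'a> Fn(&Request, Next<'a>) -> Response + Send + Sync>;

/// Runs `req` through `chain` in order, ending at `handler`.
pub fn run_chain(
    chain: &[Middleware],
    req: &Request,
    handler: &dyn Fn(&Request) -> Response,
) -> Response {
    match chain.split_first() {
        None => handler(req),
        Some((first, rest)) => first(req, &|r: &Request| run_chain(rest, r, handler)),
    }
}

/// Short-circuits with an error when no API key is present.
pub fn require_api_key() -> Middleware {
    Box::new(|req: &Request, next: Next<'_>| {
        if req.api_key.is_none() {
            return Err("unauthorized".to_string());
        }
        next(req)
    })
}

/// Post-processes successful responses after the inner layers have run.
pub fn audit() -> Middleware {
    Box::new(|req: &Request, next: Next<'_>| {
        next(req).map(|body| format!("[audited] {body}"))
    })
}
```

Order matters: a layer placed earlier in the chain wraps everything after it, so an error raised by an inner layer still passes back through the outer ones.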

Health & Metrics

  • Health endpoints - Readiness, liveness, and custom health checks
  • Performance metrics - Request timing, error rates, resource utilization
  • Prometheus integration - Standard metrics format with custom labels
  • Circuit breaker status - Transport and dependency health monitoring

Graceful Shutdown

  • Signal handling - SIGTERM/SIGINT graceful shutdown with timeout
  • Connection draining - Active request completion before shutdown
  • Resource cleanup - Proper cleanup of connections, files, and threads
  • Health status - Shutdown status reporting for load balancers
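
Connection draining with a timeout can be sketched with two shared atomics: a shutdown flag and an in-flight counter. This is an illustration of the idea using standard-library threads, not the server's actual implementation; `Drain` and `RequestGuard` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Tracks in-flight requests and whether shutdown has started (illustrative).
#[derive(Clone, Default)]
pub struct Drain {
    shutting_down: Arc<AtomicBool>,
    in_flight: Arc<AtomicUsize>,
}

/// Decrements the in-flight counter when the request finishes.
pub struct RequestGuard {
    in_flight: Arc<AtomicUsize>,
}

impl Drop for RequestGuard {
    fn drop(&mut self) {
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}

impl Drain {
    /// Admits a request unless shutdown has begun.
    pub fn try_begin(&self) -> Option<RequestGuard> {
        // Count first, then check the flag, so shutdown never misses a request.
        self.in_flight.fetch_add(1, Ordering::SeqCst);
        let guard = RequestGuard { in_flight: Arc::clone(&self.in_flight) };
        if self.shutting_down.load(Ordering::SeqCst) {
            return None; // guard is dropped here, undoing the increment
        }
        Some(guard)
    }

    /// Stops admitting requests, then waits for active ones up to `timeout`.
    /// Returns true if everything drained in time.
    pub fn shutdown(&self, timeout: Duration) -> bool {
        self.shutting_down.store(true, Ordering::SeqCst);
        let deadline = Instant::now() + timeout;
        while self.in_flight.load(Ordering::SeqCst) > 0 {
            if Instant::now() >= deadline {
                return false;
            }
            std::thread::sleep(Duration::from_millis(5));
        }
        true
    }
}
```

A `false` return from `shutdown` is the point at which a server would typically force-close the remaining connections.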

Clone Pattern for Server Sharing (Axum/Tower Standard)

  • Cheap cloning - All heavy state is Arc-wrapped (just atomic increments)
  • Tower compatible - Same pattern as Axum's Router and Tower services
  • No wrapper types - Server is directly Clone (no Arc needed)
  • Concurrent access - Share across multiple async tasks for monitoring
  • Zero overhead - Same performance as direct server usage
  • Type safe - Same type whether cloned or not

Architecture

┌─────────────────────────────────────────────┐
│              TurboMCP Server                │
├─────────────────────────────────────────────┤
│ Request Processing Pipeline                │
│ ├── Middleware chain                       │
│ ├── Authentication layer                   │
│ ├── Request routing                        │
│ └── Handler execution                      │
├─────────────────────────────────────────────┤
│ Handler Registry                           │
│ ├── Type-safe registration                 │
│ ├── Schema generation                      │
│ ├── Parameter validation                   │
│ └── Response serialization                 │
├─────────────────────────────────────────────┤
│ Authentication & Session                   │
│ ├── OAuth 2.1 providers                    │
│ ├── JWT token validation                   │
│ ├── Session lifecycle                      │
│ └── Security middleware                    │
├─────────────────────────────────────────────┤
│ Observability & Lifecycle                 │
│ ├── Health check endpoints                 │
│ ├── Metrics collection                     │
│ ├── Graceful shutdown                      │
│ └── Resource management                    │
└─────────────────────────────────────────────┘

Server Builder

Basic Server Setup

use turbomcp_server::{ServerBuilder, McpServer};

// Simple server creation
let server = ServerBuilder::new()
    .name("MyMCPServer")
    .version("1.0.0")
    .build();

// Run with STDIO transport
server.run_stdio().await?;

Production Server with Handlers

use turbomcp_server::ServerBuilder;
use turbomcp_protocol::types::Root;

let server = ServerBuilder::new()
    .name("ProductionMCPServer")
    .version("1.0.0")
    .description("Enterprise MCP server with comprehensive tooling")

    // Register filesystem roots
    .root("file:///workspace", Some("Workspace".to_string()))
    .root("file:///tmp", Some("Temp".to_string()))

    // Register tool handlers (types implementing ToolHandler)
    .tool("calculate", calculate_tool)?
    .tool("search", search_tool)?

    // Register resource handlers
    .resource("config://settings", config_resource)?
    .resource("db://users/*", user_resource)?

    // Register prompt handlers
    .prompt("code_review", code_review_prompt)?

    .build();

// Middleware, auth, and observability are configured separately
// via the MiddlewareStack (see Middleware System section)

Handler Registry

Handler Traits

Handlers implement trait interfaces for type-safe registration:

use turbomcp_server::{ServerBuilder, ToolHandler, ServerResult};
use turbomcp_protocol::{RequestContext, types::{CallToolRequest, CallToolResult, Tool, ContentBlock, TextContent}};
use async_trait::async_trait;
use serde_json::json;

// Example tool handler
struct CalculateTool;

#[async_trait]
impl ToolHandler for CalculateTool {
    async fn handle(
        &self,
        request: CallToolRequest,
        _ctx: RequestContext,
    ) -> ServerResult<CallToolResult> {
        let a: f64 = request.arguments.get("a")
            .and_then(|v| v.as_f64())
            .ok_or_else(|| turbomcp_server::ServerError::InvalidToolInput("Missing 'a' parameter".into()))?;
        let b: f64 = request.arguments.get("b")
            .and_then(|v| v.as_f64())
            .ok_or_else(|| turbomcp_server::ServerError::InvalidToolInput("Missing 'b' parameter".into()))?;

        let result = a + b;

        Ok(CallToolResult {
            content: vec![ContentBlock::Text(TextContent {
                text: format!("Result: {}", result),
                annotations: None,
            })],
            is_error: Some(false),
            structured_content: None,
            _meta: None,
        })
    }

    fn tool_definition(&self) -> Tool {
        Tool::new("calculate")
            .with_description("Add two numbers")
            .with_input_schema(
                turbomcp_protocol::types::ToolInputSchema::empty()
                    .add_property("a".to_string(), json!({"type": "number"}))
                    .add_property("b".to_string(), json!({"type": "number"}))
                    .require_property("a".to_string())
                    .require_property("b".to_string())
            )
    }
}

// Register via builder
let server = ServerBuilder::new()
    .tool("calculate", CalculateTool)?
    .build();

Authentication with turbomcp-auth

The server integrates with the turbomcp-auth crate for comprehensive authentication:

OAuth 2.1 Setup

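use std::collections::HashMap;
use turbomcp_auth::{AuthManager, AuthConfig, OAuth2Config, AuthProviderType};
// OAuth2FlowType, SecurityLevel, AuthProviderConfig, SessionConfig, and
// AuthorizationConfig must also be in scope; import them from turbomcp-auth
// (exact module paths depend on the crate version).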

// Configure OAuth 2.1
let oauth_config = OAuth2Config {
    client_id: std::env::var("GOOGLE_CLIENT_ID")?,
    client_secret: std::env::var("GOOGLE_CLIENT_SECRET")?,
    auth_url: "https://accounts.google.com/o/oauth2/v2/auth".to_string(),
    token_url: "https://www.googleapis.com/oauth2/v4/token".to_string(),
    redirect_uri: "https://myapp.com/auth/callback".to_string(),
    scopes: vec!["openid".to_string(), "profile".to_string(), "email".to_string()],
    flow_type: OAuth2FlowType::AuthorizationCode,
    additional_params: HashMap::new(),
    security_level: SecurityLevel::Standard,
    #[cfg(feature = "dpop")]
    dpop_config: None,
    mcp_resource_uri: Some("https://myapp.com/mcp".to_string()),
    auto_resource_indicators: true,
};

// Create auth manager
let mut settings_map = HashMap::new();
// Serialize OAuth config to Value, then extract as object
if let serde_json::Value::Object(map) = serde_json::to_value(&oauth_config)? {
    for (key, value) in map {
        settings_map.insert(key, value);
    }
}

let auth_config = AuthConfig {
    enabled: true,
    providers: vec![AuthProviderConfig {
        name: "google".to_string(),
        provider_type: AuthProviderType::OAuth2,
        settings: settings_map,
        enabled: true,
        priority: 1,
    }],
    session: SessionConfig::default(),
    authorization: AuthorizationConfig::default(),
};

let auth_manager = AuthManager::new(auth_config);

// Add to your server implementation
// (integration varies based on transport type)

JWT Authentication via Middleware

For JWT-only authentication, use the server's built-in middleware:

use turbomcp_server::middleware::AuthConfig;
use secrecy::Secret;
use jsonwebtoken::Algorithm;

// JWT authentication is available via middleware (feature: auth)
#[cfg(feature = "auth")]
let auth_config = AuthConfig {
    secret: Secret::new(std::env::var("JWT_SECRET")?),
    algorithm: Algorithm::HS256,
    issuer: Some("your-issuer".to_string()),
    audience: Some("your-audience".to_string()),
    leeway: 60,
    validate_exp: true,
    validate_nbf: true,
};

Middleware System

The server uses a Tower-based middleware stack for cross-cutting concerns:

use turbomcp_server::middleware::{
    MiddlewareStack, SecurityConfig, CorsConfig, CorsOrigins, ValidationConfig,
    AuthConfig, RateLimitConfig, RateLimitStrategy, RateLimits,
    AuditConfig, AuditLogLevel, TimeoutConfig
};
use http::Method;
use std::time::Duration;

// Build a comprehensive middleware stack
let middleware = MiddlewareStack::new()
    .with_security(SecurityConfig {
        cors: CorsConfig {
            allowed_origins: CorsOrigins::List(vec![
                "https://app.example.com".to_string()
            ]),
            allowed_methods: vec![Method::GET, Method::POST],
            max_age: Some(Duration::from_secs(86400)),
            ..Default::default()
        },
        ..Default::default()
    })
    .with_validation(ValidationConfig {
        schemas: Default::default(),
        validate_requests: true,
        validate_responses: false,
        strict_mode: true,
    })
    .with_timeout(TimeoutConfig {
        request_timeout: Duration::from_secs(30),
        enabled: true,
    })
    .with_audit(AuditConfig {
        log_success: true,
        log_failures: true,
        log_auth_events: true,
        log_authz_events: true,
        log_level: AuditLogLevel::Info,
    });

// With the auth feature enabled. The cfg attribute sits on the `let` itself:
// rebinding inside a `{ ... }` block would be dropped at the closing brace.
#[cfg(feature = "auth")]
let middleware = middleware.with_auth(AuthConfig {
    // Placeholder secret; load it from the environment in real deployments
    secret: secrecy::Secret::new("your-secret-key".to_string()),
    algorithm: jsonwebtoken::Algorithm::HS256,
    issuer: Some("your-app".to_string()),
    audience: Some("your-api".to_string()),
    leeway: 60,
    validate_exp: true,
    validate_nbf: true,
});

// With the rate-limiting feature enabled:
#[cfg(feature = "rate-limiting")]
let middleware = middleware.with_rate_limit(RateLimitConfig {
    strategy: RateLimitStrategy::PerIp,
    limits: RateLimits {
        requests_per_period: std::num::NonZeroU32::new(100).unwrap(),
        period: Duration::from_secs(60), // 100 requests per minute
        burst_size: Some(std::num::NonZeroU32::new(20).unwrap()),
    },
    enabled: true,
});

// The middleware stack is automatically applied by the server
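
To make the rate-limit numbers above concrete (100 requests per 60 seconds with a burst of 20), here is a small token-bucket sketch. It is one common way to implement such limits and is not necessarily how the crate's rate-limiting middleware works internally; `TokenBucket` is a hypothetical type, and time is passed in explicitly so the behaviour is deterministic.

```rust
use std::time::Duration;

/// Illustrative token bucket; `now` is measured from an arbitrary start.
pub struct TokenBucket {
    capacity: f64,       // burst size
    tokens: f64,         // tokens currently available
    refill_per_sec: f64, // requests_per_period / period
    last: Duration,      // time of the most recent refill
}

impl TokenBucket {
    pub fn new(requests_per_period: u32, period: Duration, burst: u32) -> Self {
        Self {
            capacity: burst as f64,
            tokens: burst as f64,
            refill_per_sec: requests_per_period as f64 / period.as_secs_f64(),
            last: Duration::ZERO,
        }
    }

    /// Returns true if a request arriving at `now` is allowed.
    pub fn try_acquire(&mut self, now: Duration) -> bool {
        // Refill in proportion to elapsed time, capped at the burst size.
        let elapsed = now.saturating_sub(self.last).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last = self.last.max(now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

With these settings a client can send 20 requests back to back, after which it is held to roughly 1.67 requests per second. A per-IP strategy would keep one bucket per client address.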

Session Management with turbomcp-protocol

Session management is provided by the turbomcp-protocol crate:

use turbomcp_protocol::{SessionManager, SessionConfig};
use chrono::Duration;
use std::time::Duration as StdDuration;

// Configure session management
let session_config = SessionConfig {
    max_sessions: 1000,                           // Maximum concurrent sessions
    session_timeout: Duration::hours(24),         // Session lifetime
    max_request_history: 1000,                    // Request analytics depth
    max_requests_per_session: Some(10000),        // Rate limiting per session
    cleanup_interval: StdDuration::from_secs(300), // 5 minutes
    enable_analytics: true,
};

// Create session manager
let session_manager = SessionManager::new(session_config);

// Session manager handles:
// - Session lifecycle (create, update, expire)
// - Request analytics and client behavior tracking
// - Automatic cleanup of expired sessions
// - Elicitation and completion tracking
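
The lifecycle and cleanup behaviour listed above can be approximated with a map from session ID to last-activity time. This is a sketch of the general technique only; `Sessions`, `touch`, and `cleanup` are hypothetical names and do not reflect the SessionManager API.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Illustrative session table; times are measured from an arbitrary start.
pub struct Sessions {
    max_sessions: usize,
    timeout: Duration,
    last_seen: HashMap<String, Duration>, // session id -> last activity
}

impl Sessions {
    pub fn new(max_sessions: usize, timeout: Duration) -> Self {
        Self { max_sessions, timeout, last_seen: HashMap::new() }
    }

    /// Creates or refreshes a session; fails when the table is full.
    pub fn touch(&mut self, id: &str, now: Duration) -> Result<(), String> {
        let is_new = !self.last_seen.contains_key(id);
        if is_new && self.last_seen.len() >= self.max_sessions {
            return Err("session limit reached".to_string());
        }
        self.last_seen.insert(id.to_string(), now);
        Ok(())
    }

    /// Removes sessions idle for longer than the timeout; returns how many.
    pub fn cleanup(&mut self, now: Duration) -> usize {
        let before = self.last_seen.len();
        let timeout = self.timeout;
        self.last_seen.retain(|_, seen| now.saturating_sub(*seen) <= timeout);
        before - self.last_seen.len()
    }
}
```

A background task would call `cleanup` once per `cleanup_interval`, which frees capacity for new sessions.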

Health & Lifecycle

The server provides built-in health status and graceful shutdown:

use turbomcp_server::ServerBuilder;

let server = ServerBuilder::new()
    .name("MyServer")
    .version("2.0.0")
    .build();

// Get health status
let health = server.health().await;
if health.healthy {
    println!("Server is healthy (checked at {:?})", health.timestamp);
    for check in &health.details {
        println!("  - {}: {}", check.name, if check.healthy { "OK" } else { "FAILED" });
    }
} else {
    eprintln!("Server is unhealthy!");
}

// Graceful shutdown
let shutdown_handle = server.shutdown_handle();
tokio::spawn(async move {
    tokio::signal::ctrl_c().await.ok();
    shutdown_handle.shutdown().await;
});

server.run_stdio().await?;

Metrics & Observability

The server provides built-in production-grade metrics collection with lock-free atomic operations:

use turbomcp_server::{ServerBuilder, ServerMetrics};
use std::sync::atomic::Ordering;

let server = ServerBuilder::new()
    .name("MyServer")
    .version("2.0.0")
    .build();

// Access server metrics
let metrics = server.metrics();

// Metrics are automatically collected:
println!("Total requests: {}", metrics.requests_total.load(Ordering::Relaxed));
println!("Successful: {}", metrics.requests_successful.load(Ordering::Relaxed));
println!("Failed: {}", metrics.requests_failed.load(Ordering::Relaxed));
println!("In flight: {}", metrics.requests_in_flight.load(Ordering::Relaxed));

// Error metrics
println!("Total errors: {}", metrics.errors_total.load(Ordering::Relaxed));
println!("Validation errors: {}", metrics.errors_validation.load(Ordering::Relaxed));
println!("Auth errors: {}", metrics.errors_auth.load(Ordering::Relaxed));

// Tool execution metrics
println!("Tool calls: {}", metrics.tool_calls_total.load(Ordering::Relaxed));
println!("Tool timeouts: {}", metrics.tool_timeouts_total.load(Ordering::Relaxed));

// Connection metrics
println!("Active connections: {}", metrics.connections_active.load(Ordering::Relaxed));
println!("Total connections: {}", metrics.connections_total.load(Ordering::Relaxed));

// Response time statistics
let avg_response_time_us = metrics.total_response_time_us.load(Ordering::Relaxed)
    / metrics.requests_total.load(Ordering::Relaxed).max(1);
println!("Avg response time: {}μs", avg_response_time_us);

// Custom metrics (use the RwLock-protected HashMap)
{
    let mut custom = metrics.custom.write();
    custom.insert("my_metric".to_string(), 42.0);
}
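
For readers unfamiliar with lock-free counters, the following sketch shows how such metrics can be recorded with atomics and rendered in the Prometheus text exposition format. The `Metrics` struct here is a stand-in with a few fields named after those above; `record` and `render_prometheus` are hypothetical helpers, not methods of ServerMetrics.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Stand-in for a metrics struct built from atomic counters.
#[derive(Default)]
pub struct Metrics {
    pub requests_total: AtomicU64,
    pub requests_failed: AtomicU64,
    pub total_response_time_us: AtomicU64,
}

impl Metrics {
    /// Records one finished request without taking any lock.
    pub fn record(&self, elapsed_us: u64, ok: bool) {
        self.requests_total.fetch_add(1, Ordering::Relaxed);
        self.total_response_time_us.fetch_add(elapsed_us, Ordering::Relaxed);
        if !ok {
            self.requests_failed.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Mean response time; `max(1)` avoids dividing by zero before any request.
    pub fn avg_response_time_us(&self) -> u64 {
        let total = self.requests_total.load(Ordering::Relaxed);
        self.total_response_time_us.load(Ordering::Relaxed) / total.max(1)
    }

    /// Renders the counters in the Prometheus text exposition format.
    pub fn render_prometheus(&self) -> String {
        format!(
            "# TYPE requests_total counter\nrequests_total {}\n\
             # TYPE requests_failed_total counter\nrequests_failed_total {}\n",
            self.requests_total.load(Ordering::Relaxed),
            self.requests_failed.load(Ordering::Relaxed),
        )
    }
}
```

Relaxed ordering is enough here because each counter is read independently; the values are not used to synchronize other memory.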

Integration Examples

With TurboMCP Framework

Server functionality is automatically provided when using the framework:

use turbomcp::prelude::*;

#[derive(Clone)]
struct ProductionServer {
    database: Database,
    cache: Cache,
}

#[server]
impl ProductionServer {
    #[tool("Process user data")]
    async fn process_user(&self, ctx: Context, target_user_id: String) -> McpResult<User> {
        // Example: Use Context API for authentication
        if !ctx.is_authenticated() {
            return Err(McpError::Unauthorized("Authentication required".to_string()));
        }

        let current_user = ctx.user_id.as_deref().unwrap_or("anonymous");
        let roles = ctx.roles();

        tracing::info!("User {} accessing profile for {}", current_user, target_user_id);

        // Check permissions: admins may read any profile, others only their own
        if !roles.contains(&"admin".to_string()) && current_user != target_user_id {
            return Err(McpError::Unauthorized("Insufficient permissions".to_string()));
        }

        // Context provides:
        // - Authentication info: ctx.is_authenticated(), ctx.roles()
        // - Request correlation: ctx.request_id
        // - User identity: ctx.user_id
        // - Session tracking: ctx.session_id

        let user = self.database.get_user(&target_user_id).await?;
        Ok(user)
    }
}

#[tokio::main]
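async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connection strings come from the environment (variable names are placeholders)
    let database_url = std::env::var("DATABASE_URL")?;
    let redis_url = std::env::var("REDIS_URL")?;

    let server = ProductionServer {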
        database: Database::connect(&database_url).await?,
        cache: Cache::connect(&redis_url).await?,
    };
    
    // Server infrastructure handled automatically
    server.run_http("0.0.0.0:8080").await?;
    Ok(())
}

Direct Server Usage

For advanced server customization:

use turbomcp_server::{McpServer, ServerConfig, HandlerRegistry};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
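    // auth_config, middleware_stack, and observability_config are assumed to be
    // built beforehand, as in the sections above
    let config = ServerConfig::production()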
        .with_authentication(auth_config)
        .with_middleware_stack(middleware_stack)
        .with_observability(observability_config);
    
    let mut server = McpServer::with_config(config);
    
    // Manual handler registration
    server.register_tool_handler("advanced_tool", |params| async {
        // Custom tool implementation
        Ok(serde_json::json!({"status": "processed"}))
    }).await?;
    
    // Start server with graceful shutdown
    let (server, shutdown_handle) = server.with_graceful_shutdown();
    
    let server_task = tokio::spawn(async move {
        server.run_http("0.0.0.0:8080").await
    });
    
    tokio::signal::ctrl_c().await?;
    tracing::info!("Shutdown signal received");
    
    shutdown_handle.shutdown().await;
    server_task.await??;
    
    Ok(())
}

Feature Flags

Feature            Description
stdio              Enable STDIO transport
http               Enable HTTP transport
websocket          Enable WebSocket transport
tcp                Enable TCP transport
unix               Enable Unix socket transport
auth               Enable JWT authentication middleware
metrics            Enable metrics collection
health-checks      Enable health check endpoints
middleware         Enable middleware stack (CORS, security headers, etc.)
rate-limiting      Enable rate limiting middleware
multi-tenancy      Enable multi-tenant SaaS features (tenant extraction, per-tenant config, metrics)
graceful-shutdown  DEPRECATED - Graceful shutdown is now always enabled
observability      Enable observability features
hot-reload         Enable hot reloading of handlers
all-transports     Enable all transport types
full               Enable all features

Server Sharing with Clone (Axum/Tower Pattern)

TurboMCP follows the Axum/Tower Clone pattern for sharing server instances across tasks and threads. All heavy state is Arc-wrapped internally, making cloning cheap (just atomic reference count increments).

Basic Server Cloning

use turbomcp_server::{ServerBuilder, ServerConfig};
use std::time::Duration;

// Create server (Clone-able)
let server = ServerBuilder::new()
    .name("MyServer")
    .version("2.0.0")
    .build();

// Clone for monitoring tasks (cheap - just Arc increments)
let monitor1 = server.clone();
let monitor2 = server.clone();

// Concurrent monitoring operations
let health_task = tokio::spawn(async move {
    loop {
        let health = monitor1.health().await;
        println!("Server health: {:?}", health);
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
});

let metrics_task = tokio::spawn(async move {
    loop {
        let metrics = monitor2.metrics();
        println!("Server metrics: request_count={}",
            metrics.requests_total.load(std::sync::atomic::Ordering::Relaxed));
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
});

// Run the server
server.run_stdio().await?;

Advanced Server Monitoring

use turbomcp_server::{ServerBuilder, HealthStatus};
use std::sync::Arc;
use tokio::sync::Notify;
use std::time::Duration;

let server = ServerBuilder::new().build();
let shutdown_notify = Arc::new(Notify::new());

// Health monitoring task
let monitor = server.clone();
let notify = shutdown_notify.clone();
let health_task = tokio::spawn(async move {
    loop {
        let health_status = monitor.health().await;
        if health_status.healthy {
            println!("✅ Server healthy ({} checks passed)", health_status.details.len());
        } else {
            println!("❌ Server unhealthy");
            for check in &health_status.details {
                if !check.healthy {
                    println!("  Failed: {}", check.name);
                    if let Some(msg) = &check.message {
                        println!("    Reason: {}", msg);
                    }
                }
            }
            notify.notify_one();
            break;
        }
        tokio::time::sleep(Duration::from_secs(30)).await;
    }
});

// Metrics collection task
let metrics_monitor = server.clone();
let metrics_task = tokio::spawn(async move {
    loop {
        let metrics = metrics_monitor.metrics();
        send_to_prometheus(metrics).await; // assumed to be a user-defined exporter
        tokio::time::sleep(Duration::from_secs(60)).await;
    }
});

// Run server with monitoring
let server_task = tokio::spawn(async move {
    server.run_stdio().await
});

// Wait for shutdown signal or server completion
tokio::select! {
    _ = shutdown_notify.notified() => {
        println!("Shutting down due to health check failure");
    }
    result = server_task => {
        println!("Server completed: {:?}", result);
    }
}

Benefits of the Clone Pattern

  • Cheap Cloning: Just atomic reference count increments (Arc-wrapped state)
  • Tower Compatible: Follows the same pattern as Axum's Router
  • No Arc Wrappers: No need for Arc<McpServer> - server is directly Clone
  • Type Safe: Same type whether cloned or not (no wrapper types)
  • Zero Overhead: Same performance as direct server usage
  • Ecosystem Standard: Matches Axum, Tower, Hyper conventions
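
The pattern itself is easy to reproduce: keep shared state behind a single Arc and derive Clone on the outer handle. The sketch below is a minimal illustration with standard-library threads; `Server` and `Inner` here are toy types, not the crate's McpServer.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Shared state lives behind one Arc (toy example).
struct Inner {
    name: String,
    requests_total: AtomicU64,
}

/// Cloning copies only the Arc pointer and bumps its reference count.
#[derive(Clone)]
pub struct Server {
    inner: Arc<Inner>,
}

impl Server {
    pub fn new(name: &str) -> Self {
        Self {
            inner: Arc::new(Inner {
                name: name.to_string(),
                requests_total: AtomicU64::new(0),
            }),
        }
    }

    pub fn name(&self) -> &str {
        &self.inner.name
    }

    pub fn record_request(&self) {
        self.inner.requests_total.fetch_add(1, Ordering::Relaxed);
    }

    pub fn requests_total(&self) -> u64 {
        self.inner.requests_total.load(Ordering::Relaxed)
    }

    /// Number of live handles sharing the same state.
    pub fn handle_count(&self) -> usize {
        Arc::strong_count(&self.inner)
    }
}
```

Every clone observes the same counters, which is what lets a monitoring task read metrics while another task runs the server.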

Development

Building

# Build with all features
cargo build --all-features

# Build minimal server (STDIO transport only)
cargo build --no-default-features --features stdio

# Build with JWT authentication middleware only
cargo build --no-default-features --features auth

Testing

# Run server tests
cargo test

# Test with OAuth providers (requires environment variables)
GOOGLE_CLIENT_ID=test GOOGLE_CLIENT_SECRET=test cargo test oauth

# Integration tests
cargo test --test integration

# Test graceful shutdown
cargo test graceful_shutdown

Development Server

# Run development server with hot reloading
cargo run --example dev_server

# Run with debug logging
RUST_LOG=debug cargo run --example production_server

Multi-Tenancy Security Best Practices

When building multi-tenant SaaS applications with TurboMCP, follow these security practices to ensure robust tenant isolation:

1. Always Validate Tenant Ownership

Critical: Before accessing any tenant-scoped resource, validate that the requesting tenant owns it:

#[tool("Get user data")]
async fn get_user_data(
    &self,
    ctx: Context,
    user_id: String,
) -> McpResult<UserData> {
    // ✅ REQUIRED: Extract tenant ID
    let tenant_id = ctx.require_tenant()?;

    // Fetch resource from database
    let user = self.db.get_user(&user_id).await?;

    // ✅ CRITICAL: Validate tenant owns this resource
    ctx.validate_tenant_ownership(&user.tenant_id)?;

    // Now safe to return data
    Ok(user.data)
}

Never skip validation:

// ❌ INSECURE: No tenant validation
async fn bad_get_user(&self, user_id: String) -> McpResult<UserData> {
    self.db.get_user(&user_id).await  // Any tenant can access any user!
}
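
The two checks used above, `require_tenant` and `validate_tenant_ownership`, amount to a presence check followed by an equality check. The sketch below shows that logic on a stand-in context type; `Ctx` and `TenantError` are hypothetical and only mirror the method names from the example.

```rust
/// Errors deliberately carry no tenant IDs, so nothing leaks in messages.
#[derive(Debug, PartialEq)]
pub enum TenantError {
    MissingTenant,
    NotOwner,
}

/// Stand-in for a request context that may carry a tenant ID.
pub struct Ctx {
    pub tenant_id: Option<String>,
}

impl Ctx {
    /// Fails when the request has no tenant attached.
    pub fn require_tenant(&self) -> Result<&str, TenantError> {
        self.tenant_id.as_deref().ok_or(TenantError::MissingTenant)
    }

    /// Fails unless the resource belongs to the requesting tenant.
    pub fn validate_tenant_ownership(&self, resource_tenant: &str) -> Result<(), TenantError> {
        if self.require_tenant()? == resource_tenant {
            Ok(())
        } else {
            Err(TenantError::NotOwner)
        }
    }
}
```

Running the ownership check after the fetch, as in the tool above, means the comparison uses the tenant ID stored with the resource rather than anything supplied by the caller.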

2. Use Database Row-Level Security (RLS)

Implement defense-in-depth with database-level isolation:

-- PostgreSQL Row-Level Security example
ALTER TABLE users ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON users
    USING (tenant_id = current_setting('app.current_tenant')::text);

-- Set tenant context in connection
SET app.current_tenant = 'acme-corp';

3. Encrypt Tenant Credentials

Store per-tenant API keys and OAuth tokens encrypted:

use aes_gcm::{Aes256Gcm, Key, Nonce};

async fn store_credentials(&self, tenant_id: &str, credentials: Credentials) -> Result<()> {
    // Per-tenant encryption key (stored in KMS/Vault)
    let encryption_key = self.kms.get_tenant_key(tenant_id).await?;

    // Encrypt before storing
    let encrypted = self.encrypt(&credentials, &encryption_key)?;
    self.db.store_encrypted_credentials(tenant_id, encrypted).await
}

4. Implement Per-Tenant Rate Limiting

Prevent one tenant from consuming all resources:

use turbomcp_server::config::multi_tenant::TenantConfig;

let tenant_config = TenantConfig {
    rate_limit_per_second: Some(100),  // 100 req/sec per tenant
    max_concurrent_requests: Some(10),  // 10 concurrent per tenant
    ..Default::default()
};
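
As an illustration of what a per-tenant concurrency cap such as `max_concurrent_requests` implies, the sketch below counts active requests per tenant behind a mutex. It is not the crate's enforcement code; `TenantLimiter` and its methods are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Illustrative per-tenant concurrency limiter.
pub struct TenantLimiter {
    max_concurrent: usize,
    active: Mutex<HashMap<String, usize>>, // tenant id -> requests in progress
}

impl TenantLimiter {
    pub fn new(max_concurrent: usize) -> Self {
        Self { max_concurrent, active: Mutex::new(HashMap::new()) }
    }

    /// Reserves a slot for `tenant`; returns false when the tenant is at its cap.
    pub fn try_acquire(&self, tenant: &str) -> bool {
        let mut active = self.active.lock().unwrap();
        let count = active.entry(tenant.to_string()).or_insert(0);
        if *count >= self.max_concurrent {
            return false;
        }
        *count += 1;
        true
    }

    /// Frees a slot previously reserved with `try_acquire`.
    pub fn release(&self, tenant: &str) {
        let mut active = self.active.lock().unwrap();
        if let Some(count) = active.get_mut(tenant) {
            *count = count.saturating_sub(1);
        }
    }
}
```

Because the counts are keyed by tenant, one tenant reaching its cap has no effect on the slots available to others.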

5. Audit Logging for Compliance

Log all tenant operations for security audits:

#[tool("Delete user")]
async fn delete_user(&self, ctx: Context, user_id: String) -> McpResult<()> {
    let tenant_id = ctx.require_tenant()?;

    // Audit critical operations
    tracing::warn!(
        tenant_id = %tenant_id,
        user_id = %user_id,
        action = "delete_user",
        "AUDIT: User deletion requested"
    );

    // ... perform deletion

    Ok(())
}

6. Tenant Extraction Strategies

Use multiple extraction methods with fallback:

use turbomcp_server::middleware::tenancy::*;

let extractor = CompositeTenantExtractor::new(vec![
    // 1. Explicit header (highest priority)
    Box::new(HeaderTenantExtractor::new("X-Tenant-ID")),

    // 2. API key prefix (sk_tenant_secret)
    Box::new(ApiKeyTenantExtractor::new('_', 1).with_prefix("sk_")),

    // 3. JWT claim (requires validation first)
    Box::new(JwtTenantExtractor::new("tenant_id")),

    // 4. Subdomain (tenant.api.example.com)
    Box::new(SubdomainTenantExtractor::new("api.example.com")),
]);
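
The fallback behaviour of a composite extractor is "first strategy that yields a tenant wins". The sketch below shows that idea with a simplified trait over a header map. All names (`TenantExtractor`, `HeaderExtractor`, `SubdomainExtractor`, `Composite`) and the lowercase header keys are assumptions for illustration and do not match the signatures in turbomcp_server::middleware::tenancy.

```rust
use std::collections::HashMap;

/// Simplified header map with lowercase keys (assumption for this sketch).
pub type Headers = HashMap<String, String>;

pub trait TenantExtractor {
    fn extract(&self, headers: &Headers) -> Option<String>;
}

/// Reads the tenant from an explicit header.
pub struct HeaderExtractor {
    pub header: String,
}

impl TenantExtractor for HeaderExtractor {
    fn extract(&self, headers: &Headers) -> Option<String> {
        headers.get(&self.header).cloned()
    }
}

/// Reads the tenant from the subdomain in front of `base_domain`.
pub struct SubdomainExtractor {
    pub base_domain: String,
}

impl TenantExtractor for SubdomainExtractor {
    fn extract(&self, headers: &Headers) -> Option<String> {
        let host = headers.get("host")?;
        let prefix = host.strip_suffix(self.base_domain.as_str())?.strip_suffix('.')?;
        if prefix.is_empty() { None } else { Some(prefix.to_string()) }
    }
}

/// Tries each extractor in priority order and returns the first match.
pub struct Composite {
    extractors: Vec<Box<dyn TenantExtractor>>,
}

impl Composite {
    pub fn new(extractors: Vec<Box<dyn TenantExtractor>>) -> Self {
        Self { extractors }
    }

    pub fn extract(&self, headers: &Headers) -> Option<String> {
        self.extractors.iter().find_map(|e| e.extract(headers))
    }
}
```

Whatever the extractor returns is still a client-supplied claim; it should be checked against the authenticated identity before it is trusted.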

7. Migration from Single to Multi-Tenant

Gradual migration path:

-- Phase 1 (SQL): Add tenant_id column (nullable)
ALTER TABLE users ADD COLUMN tenant_id TEXT;

-- Phase 2 (SQL): Backfill existing data with default tenant
UPDATE users SET tenant_id = 'default' WHERE tenant_id IS NULL;

-- Phase 3 (SQL): Make non-nullable
ALTER TABLE users ALTER COLUMN tenant_id SET NOT NULL;

// Phase 4 (Rust): Add tenant validation to code
ctx.validate_tenant_ownership(&resource.tenant_id)?;

8. Testing Multi-Tenancy

Test tenant isolation thoroughly:

#[tokio::test]
async fn test_tenant_isolation() -> Result<(), Box<dyn std::error::Error>> {
    let server = create_test_server().await;

    // Tenant A creates resource
    let ctx_a = RequestContext::new().with_tenant_id("tenant-a");
    let resource_id = server.create_resource(ctx_a, "data").await?;

    // Tenant B tries to access (should fail)
    let ctx_b = RequestContext::new().with_tenant_id("tenant-b");
    let result = server.get_resource(ctx_b, resource_id).await;
    assert!(result.is_err());  // ✅ Access denied

    // Returning a Result lets the test use `?` above
    Ok(())
}

9. Security Checklist

Before deploying multi-tenant applications:

  • All resource access validates ctx.validate_tenant_ownership()
  • Database has row-level security (RLS) enabled
  • Credentials encrypted per-tenant with separate keys
  • Rate limiting configured per-tenant
  • Audit logging enabled for all mutations
  • Tenant extraction middleware configured
  • Integration tests verify tenant isolation
  • No tenant ID leakage in error messages
  • Background jobs scoped to correct tenant
  • Admin endpoints require super-user auth

10. Common Pitfalls

  ❌ Don't trust client-provided tenant IDs without validation
  ❌ Don't use global caches without tenant keys
  ❌ Don't expose tenant IDs in URLs (use opaque resource IDs)
  ❌ Don't log sensitive tenant data
  ❌ Don't share database connections across tenants without context

  ✅ Do validate tenant ownership before every resource access
  ✅ Do use database-level isolation (RLS)
  ✅ Do encrypt credentials per-tenant
  ✅ Do implement comprehensive audit logging
  ✅ Do test tenant isolation thoroughly
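
The cache pitfall is worth a concrete illustration: if the tenant ID is part of the cache key, one tenant can never read another tenant's entry even when the resource keys collide. The following is a minimal sketch with a hypothetical `TenantCache` type, not a component of TurboMCP.

```rust
use std::collections::HashMap;

/// Illustrative cache whose key always includes the tenant ID.
#[derive(Default)]
pub struct TenantCache {
    entries: HashMap<(String, String), String>, // (tenant id, key) -> value
}

impl TenantCache {
    pub fn put(&mut self, tenant: &str, key: &str, value: &str) {
        self.entries
            .insert((tenant.to_string(), key.to_string()), value.to_string());
    }

    /// Looks up `key` within `tenant` only.
    pub fn get(&self, tenant: &str, key: &str) -> Option<&str> {
        self.entries
            .get(&(tenant.to_string(), key.to_string()))
            .map(String::as_str)
    }
}
```

Making the tenant a required argument of `get` and `put` means a missing tenant shows up as a compile error rather than a data leak.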

Example: Complete Secure Tool

#[tool("Process payment (multi-tenant secure)")]
async fn process_payment(
    &self,
    ctx: Context,
    payment_id: String,
) -> McpResult<PaymentResult> {
    // 1. Extract tenant
    let tenant_id = ctx.require_tenant()?;

    // 2. Check tenant is active
    let tenant_config = self.tenant_configs.get_config(tenant_id).await
        .ok_or_else(|| mcp_error!("Tenant not found"))?;

    if !tenant_config.is_active() {
        return Err(mcp_error!("Account suspended"));
    }

    // 3. Get payment from database (with RLS)
    let payment = self.db.get_payment(&payment_id).await?;

    // 4. CRITICAL: Validate tenant ownership
    ctx.validate_tenant_ownership(&payment.tenant_id)?;

    // 5. Check tenant quota
    tenant_config.check_quota("payments")?;

    // 6. Audit log
    tracing::info!(
        tenant_id = %tenant_id,
        payment_id = %payment_id,
        amount = %payment.amount,
        "Processing payment"
    );

    // 7. Process payment with tenant-specific credentials
    let credentials = self.credential_store
        .get_encrypted(tenant_id, "stripe")
        .await?;

    let result = self.payment_processor
        .process(payment, credentials)
        .await?;

    // 8. Update metrics
    self.metrics.record_request_success(tenant_id, ctx.elapsed());

    Ok(result)
}

For a complete working example, see examples/multi_tenant_server.rs in the TurboMCP repository.

Related Crates

Note: In v2.0.0, turbomcp-core was merged into turbomcp-protocol to eliminate circular dependencies.

License

Licensed under the MIT License.


Part of the TurboMCP Rust SDK for the Model Context Protocol.