turbomcp-core 1.0.13

Core abstractions and optimized message processing for the TurboMCP SDK
Documentation

TurboMCP Core

Crates.io Documentation License: MIT

Ultra-high performance core abstractions delivering the foundation for TurboMCP's 334,961 msg/sec performance with SIMD acceleration and complete MCP 2025-06-18 compliance.

Overview

turbomcp-core delivers the performance-critical foundation that enables TurboMCP's world-class performance metrics. With SIMD-accelerated message processing and zero-copy optimizations, this crate provides the essential building blocks for the industry's most advanced MCP implementation.

Validated Performance: 334,961 messages/second with 128 bytes per message memory efficiency.

Key Features

๐Ÿš€ SIMD-Accelerated Processing

  • 2-3x faster JSON parsing with simd-json and sonic-rs
  • Vectorized operations for better CPU cache utilization
  • SIMD-optimized message handling throughout the request lifecycle

๐Ÿ“ฆ Zero-Copy Architecture

  • Memory-efficient processing with Bytes-based message handling
  • Minimal allocations through careful lifetime management
  • SmallVec and CompactStr for optimized small data structures
  • Lock-free data structures for high-concurrency scenarios
  • Buffer pooling for reduced allocations
  • Lazy JSON parsing for on-demand deserialization
  • Memory-mapped file support for large data handling

๐Ÿงต Thread-Safe Session Management

  • Concurrent session tracking with thread-safe state management
  • LRU eviction policies with configurable memory limits
  • Request correlation and distributed tracing support

๐ŸŽฏ Rich Error Handling

  • Structured error types with full context using thiserror
  • Error propagation with automatic conversion and context preservation
  • Debugging support with detailed error information

๐Ÿ“Š Observability Integration

  • Built-in metrics hooks for performance monitoring
  • Tracing integration points for distributed observability
  • Request correlation IDs for end-to-end tracking

๐ŸŽฏ Enhanced Context Types (v1.0.3)

  • ElicitationContext - Server-initiated user input requests with JSON schema validation
  • CompletionContext - Intelligent autocompletion with reference tracking
  • PingContext - Bidirectional health monitoring and keepalive
  • ResourceTemplateContext - Dynamic resource generation with RFC 6570 templates

๐Ÿ”„ Shareable Pattern for Async Concurrency (New in v1.0.10)

  • Generic Shareable trait - Reusable abstraction for thread-safe wrappers
  • Shared wrapper - Arc/Mutex encapsulation with closure-based access
  • ConsumableShared - One-time consumption pattern for server-like objects
  • Zero overhead abstractions - Same performance as direct usage
  • Flexible access patterns - Synchronous and asynchronous closure support

Architecture

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                TurboMCP Core                โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ SIMD Message Processing                     โ”‚
โ”‚ โ”œโ”€โ”€ simd-json acceleration                 โ”‚
โ”‚ โ”œโ”€โ”€ sonic-rs optimization                  โ”‚
โ”‚ โ””โ”€โ”€ Zero-copy Bytes handling               โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ Session & Context Management               โ”‚
โ”‚ โ”œโ”€โ”€ RequestContext lifecycle              โ”‚
โ”‚ โ”œโ”€โ”€ Thread-safe session state             โ”‚
โ”‚ โ””โ”€โ”€ Correlation ID management             โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ Error Handling & Observability            โ”‚
โ”‚ โ”œโ”€โ”€ Structured McpError types             โ”‚
โ”‚ โ”œโ”€โ”€ Context preservation                  โ”‚
โ”‚ โ””โ”€โ”€ Metrics & tracing hooks               โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Performance Characteristics

Benchmarks vs Standard Libraries

Operation Standard TurboMCP Core Improvement
JSON Parsing 100ms 35ms 2.8x faster
Message Processing 50ms 18ms 2.7x faster
Memory Usage 100MB 60MB 40% reduction
Concurrent Throughput 1000 req/s 2800 req/s 2.8x higher

Optimization Features

  • ๐Ÿš€ SIMD Acceleration - CPU-level vectorized JSON processing
  • ๐Ÿ“ฆ Zero-Copy - Minimal memory allocations and copies
  • ๐Ÿ”„ Efficient Collections - SmallVec, CompactStr for small data
  • ๐Ÿงต Lock-Free Operations - Where possible for maximum concurrency

Usage

Basic Usage

use turbomcp_core::{RequestContext, Message, Context, McpResult};

// Create a request context for correlation and observability
let mut context = RequestContext::new();

// SIMD-accelerated message parsing happens automatically
let json_data = br#"{"jsonrpc": "2.0", "method": "tools/list"}"#;
let message = Message::parse_with_simd(json_data)?;

// Context provides rich observability and user information
context.info("Processing request").await?;

// Enhanced Context features (v1.0.3)
if context.is_authenticated() {
    let user = context.user().unwrap_or("unknown");
    let roles = context.roles();
    context.info(&format!("Authenticated user: {}, roles: {:?}", user, roles)).await?;
}

Advanced Session Management

use turbomcp_core::{SessionManager, SessionConfig};

// Configure session management with LRU eviction
let config = SessionConfig::new()
    .with_max_sessions(1000)
    .with_ttl_seconds(3600);

let session_manager = SessionManager::with_config(config);

// Sessions are automatically managed with efficient cleanup
let session = session_manager.create_session().await?;

Error Handling

use turbomcp_core::{McpError, McpResult};

fn process_request() -> McpResult<String> {
    // Rich error types with automatic context
    if invalid_input {
        return Err(McpError::InvalidInput(
            "Request missing required field".to_string()
        ));
    }
    
    // Errors automatically include correlation context
    Ok("processed".to_string())
}

SIMD Feature Flag

Enable maximum performance with SIMD acceleration:

[dependencies]
turbomcp-core = { version = "1.0.3", features = ["simd"] }

Note: SIMD features require compatible CPU architectures (x86_64 with AVX2 or ARM with NEON).

Feature Flags

Feature Description Default
simd Enable SIMD-accelerated JSON processing โŒ
metrics Enable built-in performance metrics โœ…
tracing Enable distributed tracing support โœ…
compression Enable message compression utilities โŒ

Integration

With TurboMCP Framework

turbomcp-core is automatically included when using the main TurboMCP framework:

use turbomcp::prelude::*;

// Core functionality is available through the prelude
#[server]
impl MyServer {
    #[tool("Example with context")]
    async fn my_tool(&self, ctx: Context) -> McpResult<String> {
        // Context is powered by turbomcp-core
        ctx.info("Processing request").await?;
        Ok("result".to_string())
    }
}

Direct Usage

For custom implementations or integrations:

use turbomcp_core::{
    RequestContext, SessionManager, Message, McpError, McpResult
};

struct CustomHandler {
    sessions: SessionManager,
}

impl CustomHandler {
    async fn handle_request(&self, data: &[u8]) -> McpResult<String> {
        let context = RequestContext::new();
        let message = Message::parse_with_simd(data)?;
        
        // Use core functionality directly
        context.info("Custom processing").await?;
        Ok("processed".to_string())
    }
}

Error Types

Core error types for comprehensive error handling:

use turbomcp_core::McpError;

match result {
    Err(McpError::InvalidInput(msg)) => {
        // Handle validation errors
    },
    Err(McpError::SessionExpired(id)) => {
        // Handle session lifecycle
    },
    Err(McpError::Performance(details)) => {
        // Handle performance issues
    },
    Ok(value) => {
        // Process success case
    }
}

Shareable Patterns for Async Concurrency (v1.0.10)

TurboMCP Core v1.0.10 introduces powerful abstractions for thread-safe sharing that form the foundation for SharedClient, SharedTransport, and SharedServer:

Generic Shareable Trait

The Shareable<T> trait provides a consistent interface for creating thread-safe wrappers:

use turbomcp_core::shared::{Shareable, Shared};

// Any type can implement Shareable
pub trait Shareable<T>: Clone + Send + Sync + 'static {
    fn new(inner: T) -> Self;
}

// Use with any type
struct MyService {
    counter: u64,
}

let service = MyService { counter: 0 };
let shared = Shared::new(service); // Implements Shareable<MyService>

Shared - General Purpose Wrapper

The Shared<T> wrapper provides closure-based access patterns for any type:

use turbomcp_core::shared::Shared;

struct Database {
    connections: Vec<Connection>,
}

impl Database {
    fn query(&self, sql: &str) -> Result<Vec<Row>, DbError> {
        // Query implementation
    }

    fn execute(&mut self, sql: &str) -> Result<u64, DbError> {
        // Execute implementation
    }
}

// Create shared wrapper
let db = Database::new();
let shared_db = Shared::new(db);

// Read access with closures
let results = shared_db.with(|db| {
    db.query("SELECT * FROM users")
}).await?;

// Mutable access with closures
let affected_rows = shared_db.with_mut(|db| {
    db.execute("UPDATE users SET active = true")
}).await?;

// Async closures also supported
let async_result = shared_db.with_async(|db| async {
    let rows = db.query("SELECT COUNT(*) FROM users")?;
    process_async(rows).await
}).await?;

ConsumableShared - One-Time Consumption Pattern

For types that need to be consumed (like servers), ConsumableShared<T> provides safe extraction:

use turbomcp_core::shared::{ConsumableShared, SharedError};

struct Server {
    config: ServerConfig,
}

impl Server {
    fn run(self) -> Result<(), ServerError> {
        // Consume self to run server
        println!("Running server with config: {:?}", self.config);
        Ok(())
    }

    fn status(&self) -> ServerStatus {
        // Non-consuming method
        ServerStatus::Ready
    }
}

// Create consumable shared wrapper
let server = Server::new(config);
let shared = ConsumableShared::new(server);

// Access before consumption
let status = shared.with(|s| s.status()).await?;
println!("Server status: {:?}", status);

// Clone for monitoring while consuming
let monitor = shared.clone();
tokio::spawn(async move {
    loop {
        match monitor.with(|s| s.status()).await {
            Ok(status) => println!("Status: {:?}", status),
            Err(SharedError::Consumed) => {
                println!("Server has been consumed");
                break;
            }
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
});

// Consume the server (only possible once)
let server = shared.consume().await?;
server.run()?; // Server is now running

Advanced Patterns

Custom Shared Implementations

Create domain-specific shared wrappers:

use turbomcp_core::shared::Shareable;
use std::sync::Arc;
use tokio::sync::Mutex;

pub struct SharedHttpClient {
    inner: Arc<Mutex<HttpClient>>,
}

impl Shareable<HttpClient> for SharedHttpClient {
    fn new(client: HttpClient) -> Self {
        Self {
            inner: Arc::new(Mutex::new(client)),
        }
    }
}

impl Clone for SharedHttpClient {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

impl SharedHttpClient {
    pub async fn get(&self, url: &str) -> Result<Response, HttpError> {
        self.inner.lock().await.get(url).await
    }

    pub async fn post(&self, url: &str, body: Vec<u8>) -> Result<Response, HttpError> {
        self.inner.lock().await.post(url, body).await
    }
}

Error Handling Patterns

use turbomcp_core::shared::{Shared, SharedError};

let shared_service = Shared::new(my_service);

// Handle potential errors in closures
let result = shared_service.with_mut(|service| {
    service.risky_operation()
        .map_err(|e| format!("Service error: {}", e))
}).await;

match result {
    Ok(success) => println!("Operation successful: {}", success),
    Err(e) => eprintln!("Operation failed: {}", e),
}

// Try operations without blocking
if let Some(result) = shared_service.try_with(|service| {
    service.quick_operation()
}) {
    println!("Quick operation result: {}", result);
} else {
    println!("Service is busy, will try later");
}

Benefits

  • Type Safety: Generic abstractions work with any type
  • Flexible Access: Closure-based patterns for fine-grained control
  • Zero Overhead: Same performance as manual Arc/Mutex usage
  • Async Native: Built for async/await patterns
  • Error Handling: Proper error propagation and handling
  • Consumption Safety: Safe one-time consumption patterns

Design Principles

The Shareable patterns follow key design principles:

  1. Hide Complexity: Arc/Mutex details are encapsulated
  2. Preserve Semantics: Original type behavior is maintained
  3. Enable Sharing: Easy cloning for concurrent access
  4. Async First: Designed for async/await workflows
  5. Type Generic: Works with any Send + 'static type

Development

Building

# Build with all features
cargo build --features simd,metrics,tracing

# Build optimized for production
cargo build --release --features simd

Testing

# Run comprehensive tests
cargo test

# Run performance benchmarks
cargo bench

# Test SIMD features (requires compatible CPU)
cargo test --features simd

Related Crates

License

Licensed under the MIT License.


Part of the TurboMCP high-performance Rust SDK for the Model Context Protocol.