// rmcp 1.0.0
//
// Rust SDK for Model Context Protocol
// Documentation
#![cfg(all(
    feature = "transport-streamable-http-client",
    feature = "transport-streamable-http-client-reqwest",
    feature = "transport-streamable-http-server"
))]

use std::{collections::HashMap, sync::Arc};

use rmcp::{
    model::{ClientJsonRpcMessage, ClientRequest, PingRequest, RequestId},
    transport::{
        streamable_http_client::{StreamableHttpClient, StreamableHttpError},
        streamable_http_server::{
            StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
        },
    },
};
use tokio_util::sync::CancellationToken;

mod common;
use common::calculator::Calculator;

/// Verifies how a request carrying an unknown session id is reported.
///
/// A stateful streamable-HTTP server backed by `Calculator` is started on an
/// ephemeral loopback port. A ping tagged with a session id that no earlier
/// request in this test established is then sent twice:
///
/// 1. through the `StreamableHttpClient::post_message` implementation on
///    `reqwest::Client`, whose result must be
///    `StreamableHttpError::UnexpectedServerResponse` with text mentioning
///    both `404` and "session not found" (case-insensitive);
/// 2. as a hand-built HTTP POST, whose status must be `404 Not Found`.
///
/// The server is shut down through the cancellation token before returning.
#[tokio::test]
async fn test_stale_session_id_returns_status_aware_error() -> anyhow::Result<()> {
    let ct = CancellationToken::new();

    // Stateful mode so that the server looks at the session id header.
    let server_config = StreamableHttpServerConfig {
        stateful_mode: true,
        sse_keep_alive: None,
        cancellation_token: ct.child_token(),
        ..Default::default()
    };
    let service: StreamableHttpService<Calculator, LocalSessionManager> =
        StreamableHttpService::new(|| Ok(Calculator::new()), Default::default(), server_config);

    let router = axum::Router::new().nest_service("/mcp", service);
    // Port 0: let the OS choose a free port, then read back the bound address.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
    let addr = listener.local_addr()?;

    // Serve in the background until the token is cancelled.
    let shutdown = ct.clone();
    let server_task = tokio::spawn(async move {
        let _ = axum::serve(listener, router)
            .with_graceful_shutdown(async move { shutdown.cancelled_owned().await })
            .await;
    });

    let stale_session = "stale-session-id";
    let uri: Arc<str> = format!("http://{addr}/mcp").into();
    let ping = ClientJsonRpcMessage::request(
        ClientRequest::PingRequest(PingRequest::default()),
        RequestId::Number(1),
    );

    // First attempt: through the SDK's client trait.
    let http_client = reqwest::Client::new();
    let result = http_client
        .post_message(
            Arc::clone(&uri),
            ping,
            Some(Arc::from(stale_session)),
            None,
            HashMap::new(),
        )
        .await;

    // Second attempt: the same request built by hand, to observe the raw HTTP status.
    let raw_response = reqwest::Client::new()
        .post(uri.as_ref())
        .header("accept", "application/json, text/event-stream")
        .header("content-type", "application/json")
        .header("mcp-session-id", stale_session)
        .body(r#"{"jsonrpc":"2.0","id":1,"method":"ping","params":{}}"#)
        .send()
        .await?;

    assert_eq!(raw_response.status(), reqwest::StatusCode::NOT_FOUND);

    // Anything other than `UnexpectedServerResponse` fails the test outright.
    let message = match result {
        Err(StreamableHttpError::UnexpectedServerResponse(detail)) => detail.to_string(),
        other => panic!("expected UnexpectedServerResponse, got: {other:?}"),
    };
    assert!(
        message.contains("404"),
        "error should include HTTP status code, got: {message}"
    );
    assert!(
        message.to_ascii_lowercase().contains("session not found"),
        "error should include session-not-found hint, got: {message}"
    );

    // Stop the server and wait for the background task to finish.
    ct.cancel();
    server_task.await?;

    Ok(())
}