#![cfg(not(feature = "local"))]
use std::time::Duration;
use rmcp::transport::streamable_http_server::{
StreamableHttpServerConfig, StreamableHttpService,
session::{SessionId, local::LocalSessionManager},
};
use tokio_util::sync::CancellationToken;
mod common;
use common::calculator::Calculator;
#[tokio::test]
async fn test_priming_on_stream_start() -> anyhow::Result<()> {
let ct = CancellationToken::new();
let service: StreamableHttpService<Calculator, LocalSessionManager> =
StreamableHttpService::new(
|| Ok(Calculator::new()),
Default::default(),
StreamableHttpServerConfig::default()
.with_sse_keep_alive(None)
.with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = tcp_listener.local_addr()?;
let handle = tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
let client = reqwest::Client::new();
let response = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-11-25","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
.send()
.await?;
assert_eq!(response.status(), 200);
let body = response.text().await?;
let events: Vec<&str> = body.split("\n\n").filter(|e| !e.is_empty()).collect();
assert!(events.len() >= 2);
let priming_event = events[0];
assert!(priming_event.contains("id: 0"));
assert!(priming_event.contains("retry: 3000"));
assert!(priming_event.contains("data:"));
let response_event = events[1];
assert!(response_event.contains(r#""jsonrpc":"2.0""#));
assert!(response_event.contains(r#""id":1"#));
ct.cancel();
handle.await?;
Ok(())
}
#[tokio::test]
async fn test_request_wise_priming_includes_http_request_id() -> anyhow::Result<()> {
let ct = CancellationToken::new();
let service: StreamableHttpService<Calculator, LocalSessionManager> =
StreamableHttpService::new(
|| Ok(Calculator::new()),
Default::default(),
StreamableHttpServerConfig::default()
.with_sse_keep_alive(None)
.with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = tcp_listener.local_addr()?;
let handle = tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
let client = reqwest::Client::new();
let response = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-11-25","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
.send()
.await?;
assert_eq!(response.status(), 200);
let session_id: SessionId = response.headers()["mcp-session-id"].to_str()?.into();
let status = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)
.send()
.await?
.status();
assert_eq!(status, 202);
let body = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"sum","arguments":{"a":1,"b":2}}}"#)
.send()
.await?
.text()
.await?;
let events: Vec<&str> = body.split("\n\n").filter(|e| !e.is_empty()).collect();
assert!(
events.len() >= 2,
"expected priming + response, got: {body}"
);
let priming = events[0];
assert!(
priming.contains("id: 0/0"),
"first request priming should be 0/0, got: {priming}"
);
assert!(priming.contains("retry: 3000"));
let response_event = events[1];
assert!(
response_event.contains("id: 1/0"),
"first response event id should be 1/0, got: {response_event}"
);
assert!(response_event.contains(r#""id":2"#));
let body = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"sum","arguments":{"a":3,"b":4}}}"#)
.send()
.await?
.text()
.await?;
let events: Vec<&str> = body.split("\n\n").filter(|e| !e.is_empty()).collect();
assert!(
events.len() >= 2,
"expected priming + response, got: {body}"
);
let priming = events[0];
assert!(
priming.contains("id: 0/1"),
"second request priming should be 0/1, got: {priming}"
);
let response_event = events[1];
assert!(
response_event.contains("id: 1/1"),
"second response event id should be 1/1, got: {response_event}"
);
assert!(response_event.contains(r#""id":3"#));
ct.cancel();
handle.await?;
Ok(())
}
#[tokio::test]
async fn test_resume_after_request_wise_channel_completed() -> anyhow::Result<()> {
let ct = CancellationToken::new();
let service: StreamableHttpService<Calculator, LocalSessionManager> =
StreamableHttpService::new(
|| Ok(Calculator::new()),
Default::default(),
StreamableHttpServerConfig::default()
.with_sse_keep_alive(None)
.with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = tcp_listener.local_addr()?;
let handle = tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
let client = reqwest::Client::new();
let response = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-06-18","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
.send()
.await?;
assert_eq!(response.status(), 200);
let session_id: SessionId = response.headers()["mcp-session-id"].to_str()?.into();
let status = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)
.send()
.await?
.status();
assert_eq!(status, 202);
let body = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"sum","arguments":{"a":1,"b":2}}}"#)
.send()
.await?
.text()
.await?;
let events: Vec<&str> = body.split("\n\n").filter(|e| !e.is_empty()).collect();
assert!(
events.len() >= 2,
"expected priming + response, got: {body}"
);
assert!(events[0].contains("id: 0/0"));
assert!(events[1].contains(r#""id":2"#));
let resume = client
.get(format!("http://{addr}/mcp"))
.header("Accept", "text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.header("last-event-id", "0/0")
.send()
.await?;
assert_eq!(resume.status(), 200);
let resume_body = resume.text().await?;
assert!(
!resume_body.contains("standalone"),
"should not receive events from a different stream"
);
ct.cancel();
handle.await?;
Ok(())
}
#[tokio::test]
async fn test_completed_cache_ttl_eviction() -> anyhow::Result<()> {
use std::sync::Arc;
let ct = CancellationToken::new();
let mut session_manager = LocalSessionManager::default();
session_manager.session_config.completed_cache_ttl = Duration::from_millis(200);
let session_manager = Arc::new(session_manager);
let service = StreamableHttpService::new(
|| Ok(Calculator::new()),
session_manager.clone(),
StreamableHttpServerConfig::default()
.with_sse_keep_alive(None)
.with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = tcp_listener.local_addr()?;
let handle = tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
let client = reqwest::Client::new();
let response = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-06-18","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
.send()
.await?;
assert_eq!(response.status(), 200);
let session_id: SessionId = response.headers()["mcp-session-id"].to_str()?.into();
client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)
.send()
.await?;
let body = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"sum","arguments":{"a":1,"b":2}}}"#)
.send()
.await?
.text()
.await?;
assert!(body.contains(r#""id":2"#));
tokio::time::sleep(Duration::from_millis(400)).await;
client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)
.send()
.await?;
tokio::time::sleep(Duration::from_millis(50)).await;
let resume = client
.get(format!("http://{addr}/mcp"))
.header("Accept", "text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.header("last-event-id", "0/0")
.send()
.await?;
assert_eq!(resume.status(), 200);
let body = resume.text().await?;
assert!(
!body.contains(r#""id":2"#),
"should NOT contain the old tool response after eviction, got: {body}"
);
ct.cancel();
handle.await?;
Ok(())
}
#[tokio::test]
async fn test_priming_on_stream_close() -> anyhow::Result<()> {
use std::sync::Arc;
use rmcp::transport::streamable_http_server::session::SessionId;
let ct = CancellationToken::new();
let session_manager = Arc::new(LocalSessionManager::default());
let service = StreamableHttpService::new(
|| Ok(Calculator::new()),
session_manager.clone(),
StreamableHttpServerConfig::default()
.with_sse_keep_alive(None)
.with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = tcp_listener.local_addr()?;
let handle = tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
let client = reqwest::Client::new();
let response = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-11-25","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
.send()
.await?;
let session_id: SessionId = response.headers()["mcp-session-id"].to_str()?.into();
let response = client
.get(format!("http://{addr}/mcp"))
.header("Accept", "text/event-stream")
.header("mcp-session-id", session_id.to_string())
.send()
.await?;
assert_eq!(response.status(), 200);
let read_task = tokio::spawn(async move { response.text().await.unwrap() });
let sessions = session_manager.sessions.read().await;
let session = sessions.get(&session_id).unwrap();
session
.close_standalone_sse_stream(Some(Duration::from_secs(5)))
.await?;
drop(sessions);
let body = read_task.await?;
let events: Vec<&str> = body.split("\n\n").filter(|e| !e.is_empty()).collect();
assert_eq!(events.len(), 2);
let start_priming = events[0];
assert!(start_priming.contains("id:"));
assert!(start_priming.contains("retry: 3000"));
assert!(start_priming.contains("data:"));
let close_priming = events[1];
assert!(close_priming.contains("id:"));
assert!(close_priming.contains("retry: 5000"));
assert!(close_priming.contains("data:"));
ct.cancel();
handle.await?;
Ok(())
}