#![cfg(not(feature = "local"))]
use std::sync::Arc;
use std::time::Duration;
use futures::StreamExt;
use rmcp::{
RoleServer, ServerHandler,
model::{Implementation, ServerCapabilities, ServerInfo, ToolsCapability},
service::NotificationContext,
transport::streamable_http_server::{
StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
},
};
use serde_json::json;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
const ACCEPT_SSE: &str = "text/event-stream";
const ACCEPT_BOTH: &str = "text/event-stream, application/json";
#[derive(Clone)]
pub struct TestServer {
trigger: Arc<Notify>,
}
impl TestServer {
fn new(trigger: Arc<Notify>) -> Self {
Self { trigger }
}
}
impl ServerHandler for TestServer {
fn get_info(&self) -> ServerInfo {
ServerInfo::new(
ServerCapabilities::builder()
.enable_tools_with(ToolsCapability {
list_changed: Some(true),
})
.build(),
)
.with_server_info(Implementation::new("test-server", "1.0.0"))
}
async fn on_initialized(&self, context: NotificationContext<RoleServer>) {
let peer = context.peer.clone();
let trigger = self.trigger.clone();
tokio::spawn(async move {
trigger.notified().await;
let _ = peer.notify_tool_list_changed().await;
});
}
}
async fn start_test_server(ct: CancellationToken, trigger: Arc<Notify>) -> String {
let server = TestServer::new(trigger);
let service = StreamableHttpService::new(
move || Ok(server.clone()),
Arc::new(LocalSessionManager::default()),
StreamableHttpServerConfig::default().with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind");
let addr = listener.local_addr().unwrap();
let url = format!("http://127.0.0.1:{}/mcp", addr.port());
let ct_clone = ct.clone();
tokio::spawn(async move {
axum::serve(listener, router)
.with_graceful_shutdown(async move { ct_clone.cancelled().await })
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(100)).await;
url
}
async fn initialize_session(client: &reqwest::Client, url: &str) -> String {
let resp = client
.post(url)
.header("Accept", ACCEPT_BOTH)
.header("Content-Type", "application/json")
.json(&json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": { "name": "test-client", "version": "1.0.0" }
}
}))
.timeout(Duration::from_millis(500))
.send()
.await
.expect("POST initialize");
assert!(resp.status().is_success(), "initialize should succeed");
resp.headers()
.get("Mcp-Session-Id")
.expect("session ID header")
.to_str()
.unwrap()
.to_string()
}
async fn send_initialized_notification(client: &reqwest::Client, url: &str, session_id: &str) {
let resp = client
.post(url)
.header("Accept", ACCEPT_BOTH)
.header("Content-Type", "application/json")
.header("Mcp-Session-Id", session_id)
.json(&json!({
"jsonrpc": "2.0",
"method": "notifications/initialized"
}))
.send()
.await
.expect("POST notifications/initialized");
assert_eq!(
resp.status().as_u16(),
202,
"notifications/initialized should return 202 Accepted"
);
}
async fn open_standalone_get(
client: &reqwest::Client,
url: &str,
session_id: &str,
) -> reqwest::Response {
client
.get(url)
.header("Accept", ACCEPT_SSE)
.header("Mcp-Session-Id", session_id)
.send()
.await
.expect("GET SSE stream")
}
async fn open_resume_get(
client: &reqwest::Client,
url: &str,
session_id: &str,
last_event_id: &str,
) -> reqwest::Response {
client
.get(url)
.header("Accept", ACCEPT_SSE)
.header("Mcp-Session-Id", session_id)
.header("Last-Event-ID", last_event_id)
.send()
.await
.expect("GET SSE stream with Last-Event-ID")
}
async fn wait_for_sse_event(resp: reqwest::Response, needle: &str, timeout: Duration) -> bool {
let mut stream = resp.bytes_stream();
let result = tokio::time::timeout(timeout, async {
while let Some(Ok(chunk)) = stream.next().await {
let text = String::from_utf8_lossy(&chunk);
if text.contains(needle) {
return true;
}
}
false
})
.await;
matches!(result, Ok(true))
}
#[tokio::test]
async fn shadow_second_standalone_get_returns_200() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
let get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get1.status(), 200, "First GET should succeed");
tokio::time::sleep(Duration::from_millis(100)).await;
let get2 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(
get2.status(),
200,
"Second GET should return 200 (shadow stream), not 409 Conflict"
);
ct.cancel();
}
#[tokio::test]
async fn shadow_multiple_standalone_gets_all_succeed() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
let mut responses = Vec::new();
for i in 0..5 {
let resp = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(resp.status(), 200, "GET #{i} should succeed");
responses.push(resp);
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert_eq!(responses.len(), 5);
ct.cancel();
}
#[tokio::test]
async fn dead_primary_gets_replaced_by_next_get() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
let get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get1.status(), 200);
drop(get1);
tokio::time::sleep(Duration::from_millis(100)).await;
let get2 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(
get2.status(),
200,
"GET should succeed as new primary after old primary was dropped"
);
ct.cancel();
}
#[tokio::test]
async fn dead_primary_replacement_receives_notifications() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger.clone()).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
send_initialized_notification(&client, &url, &session_id).await;
tokio::time::sleep(Duration::from_millis(100)).await;
let get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get1.status(), 200);
drop(get1);
tokio::time::sleep(Duration::from_millis(100)).await;
let get2 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get2.status(), 200);
trigger.notify_one();
assert!(
wait_for_sse_event(get2, "tools/list_changed", Duration::from_secs(3)).await,
"Replacement primary should receive notifications"
);
ct.cancel();
}
#[tokio::test]
async fn dead_primary_can_be_replaced_multiple_times() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
for i in 0..3 {
let get = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get.status(), 200, "GET #{i} should succeed");
drop(get);
tokio::time::sleep(Duration::from_millis(100)).await;
}
let final_get = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(
final_get.status(),
200,
"GET after multiple replacements should succeed"
);
ct.cancel();
}
#[tokio::test]
async fn notification_reaches_primary_not_shadow() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger.clone()).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
send_initialized_notification(&client, &url, &session_id).await;
tokio::time::sleep(Duration::from_millis(200)).await;
let get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get1.status(), 200);
tokio::time::sleep(Duration::from_millis(100)).await;
let _get2 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(_get2.status(), 200);
tokio::time::sleep(Duration::from_millis(100)).await;
trigger.notify_one();
assert!(
wait_for_sse_event(get1, "tools/list_changed", Duration::from_secs(3)).await,
"Primary stream should receive notification even after shadow was created"
);
ct.cancel();
}
#[tokio::test]
async fn resume_completed_request_wise_creates_shadow() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
let _get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(_get1.status(), 200);
tokio::time::sleep(Duration::from_millis(100)).await;
let get_resume = open_resume_get(&client, &url, &session_id, "0/999").await;
assert_eq!(
get_resume.status(),
200,
"Resume of completed request-wise channel should return 200 (shadow)"
);
ct.cancel();
}
#[tokio::test]
async fn resume_common_while_primary_alive_creates_shadow() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
let _get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(_get1.status(), 200);
tokio::time::sleep(Duration::from_millis(100)).await;
let get_resume = open_resume_get(&client, &url, &session_id, "0").await;
assert_eq!(
get_resume.status(),
200,
"Common channel resume while primary alive should return 200 (shadow)"
);
ct.cancel();
}
#[tokio::test]
async fn resume_common_while_primary_dead_becomes_primary() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger.clone()).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
send_initialized_notification(&client, &url, &session_id).await;
tokio::time::sleep(Duration::from_millis(200)).await;
let get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get1.status(), 200);
drop(get1);
tokio::time::sleep(Duration::from_millis(100)).await;
let get_resume = open_resume_get(&client, &url, &session_id, "0").await;
assert_eq!(get_resume.status(), 200);
trigger.notify_one();
assert!(
wait_for_sse_event(get_resume, "tools/list_changed", Duration::from_secs(3)).await,
"Resumed stream that replaced dead primary should receive notifications"
);
ct.cancel();
}
#[tokio::test]
async fn post_reconnect_and_standalone_coexist() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
let _standalone = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(_standalone.status(), 200);
tokio::time::sleep(Duration::from_millis(100)).await;
let reconnect1 = open_resume_get(&client, &url, &session_id, "0/0").await;
assert_eq!(
reconnect1.status(),
200,
"POST reconnection should get shadow, not replace primary"
);
tokio::time::sleep(Duration::from_millis(100)).await;
let reconnect2 = open_resume_get(&client, &url, &session_id, "0/1").await;
assert_eq!(
reconnect2.status(),
200,
"Second POST reconnection should also succeed"
);
ct.cancel();
}
#[tokio::test]
async fn reconnect_after_stream_timeout() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger.clone()).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
send_initialized_notification(&client, &url, &session_id).await;
tokio::time::sleep(Duration::from_millis(200)).await;
let get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get1.status(), 200);
drop(get1);
tokio::time::sleep(Duration::from_millis(100)).await;
let get2 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get2.status(), 200);
trigger.notify_one();
assert!(
wait_for_sse_event(get2, "tools/list_changed", Duration::from_secs(3)).await,
"Reconnected stream should receive notifications"
);
ct.cancel();
}
#[tokio::test]
async fn get_without_valid_session_returns_404() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger).await;
let client = reqwest::Client::new();
let resp = client
.get(&url)
.header("Accept", ACCEPT_SSE)
.header("Mcp-Session-Id", "nonexistent-session-id")
.send()
.await
.expect("GET with invalid session");
assert_eq!(
resp.status().as_u16(),
404,
"GET with unknown session ID should return 404 Not Found per MCP spec"
);
ct.cancel();
}
#[tokio::test]
async fn get_without_session_id_header_returns_400() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger).await;
let client = reqwest::Client::new();
let resp = client
.get(&url)
.header("Accept", ACCEPT_SSE)
.send()
.await
.expect("GET without session ID");
assert_eq!(
resp.status().as_u16(),
400,
"GET without session ID should return 400 Bad Request per MCP spec"
);
ct.cancel();
}
#[tokio::test]
async fn shadow_stream_does_not_receive_notifications() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger.clone()).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
send_initialized_notification(&client, &url, &session_id).await;
tokio::time::sleep(Duration::from_millis(200)).await;
let _get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(_get1.status(), 200);
tokio::time::sleep(Duration::from_millis(100)).await;
let get2 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get2.status(), 200);
tokio::time::sleep(Duration::from_millis(100)).await;
trigger.notify_one();
let shadow_received =
wait_for_sse_event(get2, "tools/list_changed", Duration::from_millis(500)).await;
assert!(
!shadow_received,
"Shadow stream should NOT receive notifications"
);
ct.cancel();
}
#[tokio::test]
async fn dropping_shadows_does_not_affect_primary() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger.clone()).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
send_initialized_notification(&client, &url, &session_id).await;
tokio::time::sleep(Duration::from_millis(200)).await;
let get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get1.status(), 200);
tokio::time::sleep(Duration::from_millis(100)).await;
for _ in 0..3 {
let shadow = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(shadow.status(), 200);
drop(shadow);
tokio::time::sleep(Duration::from_millis(50)).await;
}
trigger.notify_one();
assert!(
wait_for_sse_event(get1, "tools/list_changed", Duration::from_secs(3)).await,
"Primary should still work after all shadows are dropped"
);
ct.cancel();
}
#[tokio::test]
async fn dead_primary_replacement_replays_cached_events() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger.clone()).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
send_initialized_notification(&client, &url, &session_id).await;
tokio::time::sleep(Duration::from_millis(200)).await;
let get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get1.status(), 200);
tokio::time::sleep(Duration::from_millis(100)).await;
trigger.notify_one();
tokio::time::sleep(Duration::from_millis(200)).await;
drop(get1);
tokio::time::sleep(Duration::from_millis(100)).await;
let get_resume = open_resume_get(&client, &url, &session_id, "0").await;
assert_eq!(get_resume.status(), 200);
assert!(
wait_for_sse_event(get_resume, "tools/list_changed", Duration::from_secs(3)).await,
"Replacement primary should receive cached notification via sync() replay"
);
ct.cancel();
}
#[tokio::test]
async fn shadow_stream_limit_drops_oldest() {
let ct = CancellationToken::new();
let trigger = Arc::new(Notify::new());
let url = start_test_server(ct.clone(), trigger.clone()).await;
let client = reqwest::Client::new();
let session_id = initialize_session(&client, &url).await;
send_initialized_notification(&client, &url, &session_id).await;
tokio::time::sleep(Duration::from_millis(200)).await;
let get1 = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(get1.status(), 200);
tokio::time::sleep(Duration::from_millis(100)).await;
let mut shadows = Vec::new();
for i in 0..35 {
let shadow = open_standalone_get(&client, &url, &session_id).await;
assert_eq!(shadow.status(), 200, "Shadow #{i} should succeed");
shadows.push(shadow);
}
trigger.notify_one();
assert!(
wait_for_sse_event(get1, "tools/list_changed", Duration::from_secs(3)).await,
"Primary should still work after exceeding shadow limit"
);
ct.cancel();
}