use clap::Parser;
use rmcp::{
model::{
ResourceUpdatedNotificationParam, ServerCapabilities, ServerInfo, SubscribeRequestParams,
},
service::{MaybeSendFuture, RequestContext},
transport::streamable_http_server::{
session::local::LocalSessionManager, StreamableHttpServerConfig, StreamableHttpService,
},
ErrorData as McpError, RoleServer, ServerHandler,
};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
// Command-line options for the subscribe test server.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive picks up
// `///` doc comments as help text, so adding them would change `--help` output.
#[derive(Parser)]
#[command(
name = "subscribe-test-server",
about = "Subscribe-capable MCP server for shell smoke tests"
)]
struct Cli {
// TCP port to bind on 127.0.0.1. The default of 0 asks the OS for a free
// port; the address actually bound is printed on stdout at startup.
#[arg(long, default_value_t = 0)]
port: u16,
// Period of the periodic tick task, in milliseconds. The default of 0 means
// the tick task is not started at all.
#[arg(long, default_value_t = 0)]
interval: u64,
}
/// MCP server handler used by the smoke tests.
///
/// Clones share one URI list through the `Arc`, so every handler instance
/// cloned from the same original appends to the same list.
#[derive(Clone)]
struct SubscribeTestServer {
/// URIs received by `subscribe`, in arrival order. Within this file entries
/// are only ever appended: nothing removes or de-duplicates them.
subscribed_uris: Arc<Mutex<Vec<String>>>,
}
impl SubscribeTestServer {
fn new() -> Self {
Self {
subscribed_uris: Arc::new(Mutex::new(Vec::new())),
}
}
}
/// MCP handler implementation: advertises resource support with subscriptions
/// and answers subscribe requests.
impl ServerHandler for SubscribeTestServer {
    /// Reports the server's capabilities: resources, with subscription enabled.
    fn get_info(&self) -> ServerInfo {
        ServerInfo::new(
            ServerCapabilities::builder()
                .enable_resources()
                .enable_resources_subscribe()
                .build(),
        )
    }

    /// Handles a subscribe request.
    ///
    /// Schedules a single "resource updated" notification for the requested URI,
    /// sent to the requesting peer about 50 ms later from a detached task, then
    /// appends the URI to the shared subscription list.
    ///
    /// The returned future always resolves to `Ok(())`. A failure to deliver the
    /// notification does not fail the request; it is only reported on stderr.
    fn subscribe(
        &self,
        request: SubscribeRequestParams,
        context: RequestContext<RoleServer>,
    ) -> impl std::future::Future<Output = Result<(), McpError>> + MaybeSendFuture + '_ {
        let peer = context.peer.clone();
        let uris = self.subscribed_uris.clone();
        async move {
            // The only copy needed: the detached task owns this one, while the
            // original string is moved into the subscription list below.
            let notify_uri = request.uri.clone();
            tokio::spawn(async move {
                // Small delay so the notification goes out after the subscribe
                // request has had time to be answered.
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                if let Err(e) = peer
                    .notify_resource_updated(ResourceUpdatedNotificationParam { uri: notify_uri })
                    .await
                {
                    // Best effort: the peer may already be gone. Report it rather
                    // than discarding the error silently.
                    eprintln!(
                        "[subscribe-test-server] resource-updated notification failed: {e}"
                    );
                }
            });
            uris.lock().await.push(request.uri);
            Ok(())
        }
    }
}
/// Runs the streamable-HTTP MCP server on `127.0.0.1:{port}` until a shutdown
/// signal arrives (see `wait_for_signal`), then shuts it down gracefully.
///
/// * `port` - TCP port to bind; `0` lets the OS choose a free port.
/// * `interval_ms` - period of the periodic tick task in milliseconds; `0`
///   disables the task.
///
/// Once the listener is bound, a line `SUBSCRIBE_TEST_SERVER_URL=http://<addr>/mcp`
/// is printed on stdout so callers can discover the actual address.
///
/// If binding or reading the local address fails, a message is written to stderr
/// and the process exits with status 1.
async fn run_http(port: u16, interval_ms: u64) {
    // Upper bound on how long shutdown waits for the HTTP server to drain.
    const SHUTDOWN_GRACE: std::time::Duration = std::time::Duration::from_secs(2);

    // Root token: cancelling it stops the HTTP service, the tick task and the
    // axum server together.
    let ct = CancellationToken::new();
    let config = StreamableHttpServerConfig::default()
        .with_sse_keep_alive(None)
        .with_cancellation_token(ct.child_token());
    let server = SubscribeTestServer::new();

    if interval_ms > 0 {
        let uris = server.subscribed_uris.clone();
        let ct_loop = ct.child_token();
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(std::time::Duration::from_millis(interval_ms));
            loop {
                tokio::select! {
                    _ = ct_loop.cancelled() => break,
                    _ = ticker.tick() => {
                        // Only the count is reported, so read the length under
                        // the lock instead of cloning the whole list.
                        let count = uris.lock().await.len();
                        if count > 0 {
                            eprintln!(
                                "[subscribe-test-server] periodic tick: {} subscribed uri(s) recorded \
                                 (peer-level periodic notify requires per-session peer handles)",
                                count
                            );
                        }
                    }
                }
            }
        });
    }

    // Each call of the factory hands out a handler clone; all clones share the
    // same subscription list.
    let service: StreamableHttpService<SubscribeTestServer, LocalSessionManager> =
        StreamableHttpService::new(move || Ok(server.clone()), Default::default(), config);
    let router = axum::Router::new().nest_service("/mcp", service);

    let addr = format!("127.0.0.1:{port}");
    let listener = match tokio::net::TcpListener::bind(&addr).await {
        Ok(l) => l,
        Err(e) => {
            eprintln!("[subscribe-test-server] failed to bind {addr}: {e}");
            std::process::exit(1);
        }
    };
    // Needed because `port` may have been 0: report the port actually bound.
    let bound_addr = match listener.local_addr() {
        Ok(a) => a,
        Err(e) => {
            eprintln!("[subscribe-test-server] local_addr failed: {e}");
            std::process::exit(1);
        }
    };
    println!("SUBSCRIBE_TEST_SERVER_URL=http://{bound_addr}/mcp");

    let ct_shutdown = ct.clone();
    let server_task = tokio::spawn(async move {
        if let Err(e) = axum::serve(listener, router)
            .with_graceful_shutdown(async move { ct_shutdown.cancelled_owned().await })
            .await
        {
            eprintln!("[subscribe-test-server] http server error: {e}");
        }
    });

    wait_for_signal().await;
    ct.cancel();

    // Wait for the server task so the graceful shutdown can actually finish;
    // returning straight away would let runtime teardown abort it mid-drain.
    // The wait is bounded so a stuck connection cannot keep the process alive.
    match tokio::time::timeout(SHUTDOWN_GRACE, server_task).await {
        Ok(Ok(())) => {}
        Ok(Err(e)) => eprintln!("[subscribe-test-server] http server task failed: {e}"),
        Err(_) => eprintln!(
            "[subscribe-test-server] graceful shutdown timed out after {}s",
            SHUTDOWN_GRACE.as_secs()
        ),
    }
}
/// Completes as soon as the process receives SIGINT or SIGTERM.
///
/// # Panics
///
/// Panics if either signal listener cannot be installed.
#[cfg(unix)]
async fn wait_for_signal() {
    use tokio::signal::unix::{signal, SignalKind};

    // Installs one listener, panicking with the given label on failure.
    let install = |kind: SignalKind, label: &str| signal(kind).expect(label);

    let mut interrupt = install(SignalKind::interrupt(), "SIGINT handler");
    let mut terminate = install(SignalKind::terminate(), "SIGTERM handler");

    // Whichever signal arrives first ends the wait; the received value is unused.
    tokio::select! {
        _ = interrupt.recv() => (),
        _ = terminate.recv() => (),
    }
}
/// Fallback for non-Unix targets: completes when Ctrl-C is received.
///
/// An error from the Ctrl-C listener is deliberately ignored, in which case
/// this function simply returns.
#[cfg(not(unix))]
async fn wait_for_signal() {
    tokio::signal::ctrl_c().await.ok();
}
#[tokio::main]
async fn main() {
let cli = Cli::parse();
run_http(cli.port, cli.interval).await;
}