use crate::app::{collection::Collection, handler::RequestHandler};
#[cfg(feature = "http-server")]
use crate::transport::HttpServer;
use crate::transport::{StdIoServer, TransportProto};
use dashmap::{DashMap, DashSet};
use std::fmt::{Debug, Formatter};
use std::{sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
use crate::middleware::{Middleware, Middlewares};
use crate::PROTOCOL_VERSIONS;
use crate::types::{
Cursor, Implementation, Prompt, PromptsCapability, ReadResourceResult, RequestId, Resource,
ResourceTemplate, ResourcesCapability, Tool, ToolsCapability, Uri,
resource::{Route, route::ResourceHandler},
};
#[cfg(feature = "tasks")]
use crate::shared::{TaskHandle, TaskTracker};
#[cfg(feature = "tracing")]
use crate::types::notification::LoggingLevel;
#[cfg(feature = "tasks")]
use crate::types::{ServerTasksCapability, Task, TaskPayload};
#[cfg(feature = "tracing")]
use tracing_subscriber::{Registry, filter::LevelFilter, reload::Handle};
#[cfg(any(feature = "tracing", feature = "tasks"))]
use crate::error::Error;
#[cfg(feature = "tracing")]
use crate::error::ErrorCode;
pub type RuntimeMcpOptions = Arc<McpOptions>;
pub struct McpOptions {
pub(crate) implementation: Implementation,
pub(crate) request_timeout: Duration,
pub(super) tools: Collection<Tool>,
pub(super) prompts: Collection<Prompt>,
pub(super) resources: Collection<Resource>,
pub(super) resources_templates: Collection<ResourceTemplate>,
pub(super) resource_subscriptions: DashSet<Uri>,
pub(super) middlewares: Option<Middlewares>,
tools_capability: Option<ToolsCapability>,
resources_capability: Option<ResourcesCapability>,
prompts_capability: Option<PromptsCapability>,
#[cfg(feature = "tasks")]
tasks_capability: Option<ServerTasksCapability>,
#[cfg(feature = "tracing")]
log_level: Option<Handle<LevelFilter, Registry>>,
protocol_ver: Option<&'static str>,
proto: Option<TransportProto>,
resource_routes: Route,
requests: DashMap<RequestId, CancellationToken>,
#[cfg(feature = "tasks")]
pub(super) tasks: TaskTracker,
}
impl Debug for McpOptions {
#[inline]
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut binding = f.debug_struct("McpOptions");
let dbg = binding
.field("implementation", &self.implementation)
.field("request_timeout", &self.request_timeout)
.field("tools_capability", &self.tools_capability)
.field("resources_capability", &self.resources_capability)
.field("prompts_capability", &self.prompts_capability)
.field("protocol_ver", &self.protocol_ver);
#[cfg(feature = "tasks")]
dbg.field("tasks_capability", &self.tasks_capability);
#[cfg(feature = "tracing")]
dbg.field("log_level", &self.log_level);
dbg.finish()
}
}
impl Default for McpOptions {
#[inline]
fn default() -> Self {
Self {
implementation: Default::default(),
request_timeout: Duration::from_secs(10),
tools: Collection::new(),
resources: Collection::new(),
prompts: Collection::new(),
resources_templates: Collection::new(),
proto: Default::default(),
protocol_ver: Default::default(),
tools_capability: Default::default(),
resources_capability: Default::default(),
prompts_capability: Default::default(),
#[cfg(feature = "tasks")]
tasks_capability: Default::default(),
resource_routes: Default::default(),
requests: Default::default(),
resource_subscriptions: Default::default(),
middlewares: None,
#[cfg(feature = "tracing")]
log_level: Default::default(),
#[cfg(feature = "tasks")]
tasks: TaskTracker::new(),
}
}
}
impl McpOptions {
pub fn with_stdio(mut self) -> Self {
self.proto = Some(TransportProto::StdIoServer(StdIoServer::new()));
self
}
#[cfg(feature = "http-server")]
pub fn set_http(mut self, http: HttpServer) -> Self {
self.proto = Some(TransportProto::HttpServer(Box::new(http)));
self
}
#[cfg(feature = "http-server")]
pub fn with_http<F: FnOnce(HttpServer) -> HttpServer>(mut self, config: F) -> Self {
self.proto = Some(TransportProto::HttpServer(Box::new(config(
HttpServer::default(),
))));
self
}
#[cfg(feature = "http-server")]
pub fn with_default_http(self) -> Self {
self.with_http(|http| http)
}
pub fn with_name(mut self, name: &str) -> Self {
self.implementation.name = name.into();
self
}
pub fn with_version(mut self, ver: &str) -> Self {
self.implementation.version = ver.into();
self
}
pub fn with_mcp_version(mut self, ver: &'static str) -> Self {
self.protocol_ver = Some(ver);
self
}
pub fn with_tools<F>(mut self, config: F) -> Self
where
F: FnOnce(ToolsCapability) -> ToolsCapability,
{
self.tools_capability = Some(config(Default::default()));
self
}
pub fn with_resources<F>(mut self, config: F) -> Self
where
F: FnOnce(ResourcesCapability) -> ResourcesCapability,
{
self.resources_capability = Some(config(Default::default()));
self
}
pub fn with_prompts<F>(mut self, config: F) -> Self
where
F: FnOnce(PromptsCapability) -> PromptsCapability,
{
self.prompts_capability = Some(config(Default::default()));
self
}
#[cfg(feature = "tasks")]
pub fn with_tasks<F>(mut self, config: F) -> Self
where
F: FnOnce(ServerTasksCapability) -> ServerTasksCapability,
{
self.tasks_capability = Some(config(Default::default()));
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
#[cfg(feature = "tracing")]
pub fn with_logging(mut self, log_handle: Handle<LevelFilter, Registry>) -> Self {
self.log_level = Some(log_handle);
self
}
#[cfg(feature = "tracing")]
pub fn set_log_level(&self, level: LoggingLevel) -> Result<(), Error> {
if let Some(handle) = &self.log_level {
handle
.modify(|current| *current = level.into())
.map_err(|e| Error::new(ErrorCode::InternalError, e.to_string()))?;
}
Ok(())
}
#[cfg(feature = "tracing")]
pub(crate) fn log_level(&self) -> Option<LoggingLevel> {
match &self.log_level {
None => None,
Some(handle) => handle.clone_current().map(|x| x.into()),
}
}
pub(crate) fn track_request(&self, req_id: &RequestId) -> CancellationToken {
let token = CancellationToken::new();
self.requests.insert(req_id.clone(), token.clone());
token
}
pub(crate) fn cancel_request(&self, req_id: &RequestId) {
if let Some((_, token)) = self.requests.remove(req_id) {
token.cancel();
}
}
pub(crate) fn complete_request(&self, req_id: &RequestId) {
self.requests.remove(req_id);
}
#[cfg(feature = "tasks")]
pub(crate) fn list_tasks(&self) -> Vec<Task> {
self.tasks.tasks()
}
#[cfg(feature = "tasks")]
pub(crate) fn track_task(&self, task: Task) -> TaskHandle {
self.tasks.track(task)
}
#[cfg(feature = "tasks")]
pub(crate) fn cancel_task(&self, task_id: &str) -> Result<Task, Error> {
self.tasks.cancel(task_id)
}
#[cfg(feature = "tasks")]
pub(crate) fn get_task_status(&self, task_id: &str) -> Result<Task, Error> {
self.tasks.get_status(task_id)
}
#[cfg(feature = "tasks")]
pub(crate) async fn get_task_result(&self, task_id: &str) -> Result<TaskPayload, Error> {
self.tasks.get_result(task_id).await
}
pub(crate) fn add_tool(&mut self, tool: Tool) -> &mut Tool {
self.tools_capability.get_or_insert_default();
self.tools.as_mut().entry(tool.name.clone()).or_insert(tool)
}
pub(crate) fn add_resource(&mut self, resource: Resource) -> &mut Resource {
self.resources_capability.get_or_insert_default();
self.resources
.as_mut()
.entry(resource.uri.to_string())
.or_insert(resource)
}
pub(crate) fn add_resource_template(
&mut self,
template: ResourceTemplate,
handler: RequestHandler<ReadResourceResult>,
) -> &mut ResourceTemplate {
self.resources_capability.get_or_insert_default();
let name = template.name.clone();
self.resource_routes
.insert(&template.uri_template, name.clone(), handler);
self.resources_templates
.as_mut()
.entry(name)
.or_insert(template)
}
pub(crate) fn add_prompt(&mut self, prompt: Prompt) -> &mut Prompt {
self.prompts_capability.get_or_insert_default();
self.prompts
.as_mut()
.entry(prompt.name.clone())
.or_insert(prompt)
}
#[inline]
pub(crate) fn add_middleware(&mut self, middleware: Middleware) {
self.middlewares
.get_or_insert_with(Middlewares::new)
.add(middleware);
}
#[inline]
pub(crate) fn protocol_ver(&self) -> &'static str {
match self.protocol_ver {
Some(ver) => ver,
None => PROTOCOL_VERSIONS.last().unwrap(),
}
}
pub(crate) fn transport(&mut self) -> TransportProto {
let transport = self.proto.take();
transport.unwrap_or_default()
}
pub(super) fn transport_label(&self) -> String {
match &self.proto {
Some(TransportProto::StdIoServer(_)) => "stdio".to_owned(),
#[cfg(feature = "http-server")]
Some(TransportProto::HttpServer(http)) => http.url_label(),
_ => "(none)".to_owned(),
}
}
#[inline]
pub(crate) async fn get_tool(&self, name: &str) -> Option<Tool> {
self.tools.get(name).await
}
#[inline]
pub(crate) async fn list_tools_page(
&self,
cursor: Option<Cursor>,
page_size: usize,
) -> (Vec<Tool>, Option<Cursor>) {
self.tools.page_values(cursor, page_size).await
}
#[inline]
pub(crate) fn read_resource(&self, uri: &Uri) -> Option<(&ResourceHandler, Box<[String]>)> {
self.resource_routes.find(uri)
}
#[inline]
pub(crate) async fn list_resources_page(
&self,
cursor: Option<Cursor>,
page_size: usize,
) -> (Vec<Resource>, Option<Cursor>) {
self.resources.page_values(cursor, page_size).await
}
#[inline]
pub(crate) async fn list_resource_templates_page(
&self,
cursor: Option<Cursor>,
page_size: usize,
) -> (Vec<ResourceTemplate>, Option<Cursor>) {
self.resources_templates
.page_values(cursor, page_size)
.await
}
#[inline]
pub(crate) async fn get_prompt(&self, name: &str) -> Option<Prompt> {
self.prompts.get(name).await
}
#[inline]
pub(crate) async fn list_prompts_page(
&self,
cursor: Option<Cursor>,
page_size: usize,
) -> (Vec<Prompt>, Option<Cursor>) {
self.prompts.page_values(cursor, page_size).await
}
pub(crate) fn tools_capability(&self) -> Option<ToolsCapability> {
self.tools_capability.clone()
}
pub(crate) fn resources_capability(&self) -> Option<ResourcesCapability> {
self.resources_capability.clone()
}
pub(crate) fn prompts_capability(&self) -> Option<PromptsCapability> {
self.prompts_capability.clone()
}
#[cfg(feature = "tasks")]
pub(crate) fn tasks_capability(&self) -> Option<ServerTasksCapability> {
self.tasks_capability.clone()
}
#[inline]
pub(crate) fn is_resource_subscription_supported(&self) -> bool {
self.resources_capability
.as_ref()
.is_some_and(|res| res.subscribe)
}
#[inline]
pub(crate) fn is_resource_list_changed_supported(&self) -> bool {
self.resources_capability
.as_ref()
.is_some_and(|res| res.list_changed)
}
#[inline]
pub(crate) fn is_tools_list_changed_supported(&self) -> bool {
self.tools_capability
.as_ref()
.is_some_and(|tool| tool.list_changed)
}
#[inline]
pub(crate) fn is_prompts_list_changed_supported(&self) -> bool {
self.prompts_capability
.as_ref()
.is_some_and(|prompt| prompt.list_changed)
}
#[inline]
#[cfg(feature = "tasks")]
pub(crate) fn is_tasks_list_supported(&self) -> bool {
self.tasks_capability
.as_ref()
.is_some_and(|tasks| tasks.list.is_some())
}
#[inline]
#[cfg(feature = "tasks")]
pub(crate) fn is_tasks_cancellation_supported(&self) -> bool {
self.tasks_capability
.as_ref()
.is_some_and(|tasks| tasks.cancel.is_some())
}
#[inline]
#[cfg(feature = "tasks")]
pub(crate) fn is_task_augmented_tool_call_supported(&self) -> bool {
self.tasks_capability
.as_ref()
.and_then(|tasks| tasks.requests.as_ref())
.and_then(|req| req.tools.as_ref())
.is_some_and(|tools| tools.call.is_some())
}
pub(crate) fn into_runtime(mut self) -> RuntimeMcpOptions {
self.tools = self.tools.into_runtime();
self.prompts = self.prompts.into_runtime();
self.resources = self.resources.into_runtime();
self.resources_templates = self.resources_templates.into_runtime();
Arc::new(self)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::SDK_NAME;
use crate::error::{Error, ErrorCode};
use crate::types::resource::Uri;
use crate::types::resource::template::ResourceFunc;
use crate::types::{
GetPromptRequestParams, PromptMessage, ReadResourceRequestParams, ResourceContents, Role,
};
#[test]
fn it_creates_default_options() {
let options = McpOptions::default();
assert_eq!(options.implementation.name, SDK_NAME);
assert_eq!(options.implementation.version, env!("CARGO_PKG_VERSION"));
assert_eq!(options.tools.as_ref().len(), 0);
assert_eq!(options.resources.as_ref().len(), 0);
assert_eq!(options.resources_templates.as_ref().len(), 0);
assert_eq!(options.prompts.as_ref().len(), 0);
assert!(options.proto.is_none());
}
#[test]
fn it_takes_none_transport_by_default() {
let mut options = McpOptions::default();
let transport = options.transport();
assert!(matches!(transport, TransportProto::None));
}
#[test]
fn it_sets_and_takes_stdio_transport() {
let mut options = McpOptions::default().with_stdio();
let transport = options.transport();
assert!(matches!(transport, TransportProto::StdIoServer(_)));
}
#[test]
fn it_sets_server_name() {
let options = McpOptions::default().with_name("name");
assert_eq!(options.implementation.name, "name");
}
#[test]
fn it_sets_server_version() {
let options = McpOptions::default().with_version("1");
assert_eq!(options.implementation.version, "1");
}
#[tokio::test]
async fn it_adds_and_gets_tool() {
let mut options = McpOptions::default();
options.add_tool(Tool::new("tool", || async { "test" }));
let tool = options.get_tool("tool").await.unwrap();
assert_eq!(tool.name, "tool");
}
#[tokio::test]
async fn it_returns_tools() {
let mut options = McpOptions::default();
options.add_tool(Tool::new("tool", || async { "test" }));
let (tools, next_cursor) = options.list_tools_page(None, 10).await;
assert_eq!(tools.len(), 1);
assert_eq!(next_cursor, None);
}
#[tokio::test]
async fn it_returns_resources() {
let mut options = McpOptions::default();
options.add_resource(Resource::new("res://res", "res"));
let (resources, next_cursor) = options.list_resources_page(None, 10).await;
assert_eq!(resources.len(), 1);
assert_eq!(next_cursor, None);
}
#[tokio::test]
async fn it_adds_and_reads_resource_template() {
let mut options = McpOptions::default();
let handler = |uri: Uri| async move {
ResourceContents::new(uri)
.with_mime("text/plain")
.with_text("some text")
};
options.add_resource_template(
ResourceTemplate::new("res://res", "test"),
ResourceFunc::new(handler),
);
let req = ReadResourceRequestParams {
uri: "res://res".into(),
meta: None,
args: None,
};
let res = options.read_resource(&req.uri).unwrap();
let res = res.0.call(req.into()).await.unwrap();
assert_eq!(res.contents.len(), 1);
}
#[tokio::test]
async fn it_adds_and_reads_resource_template_with_err() {
let mut options = McpOptions::default();
let handler = |_: Uri| async move {
Err::<ResourceContents, _>(Error::from(ErrorCode::ResourceNotFound))
};
options.add_resource_template(
ResourceTemplate::new("res://res", "test"),
ResourceFunc::new(handler),
);
let req = ReadResourceRequestParams {
uri: "res://res".into(),
meta: None,
args: None,
};
let res = options.read_resource(&req.uri).unwrap();
let res = res.0.call(req.into()).await;
assert!(res.is_err());
}
#[tokio::test]
async fn it_returns_resource_templates() {
let mut options = McpOptions::default();
let handler = |uri: Uri| async move {
ResourceContents::new(uri)
.with_mime("text/plain")
.with_text("some text")
};
options.add_resource_template(
ResourceTemplate::new("res://res", "test"),
ResourceFunc::new(handler),
);
let (resources, next_cursor) = options.list_resource_templates_page(None, 10).await;
assert_eq!(resources.len(), 1);
assert_eq!(next_cursor, None);
}
#[tokio::test]
async fn it_adds_and_gets_prompt() {
let mut options = McpOptions::default();
options.add_prompt(Prompt::new("test", || async { [("test", Role::User)] }));
let prompt = options.get_prompt("test").await.unwrap();
assert_eq!(prompt.name, "test");
let req = GetPromptRequestParams {
name: "test".into(),
args: None,
meta: None,
};
let result = prompt.call(req.into()).await.unwrap();
let msg = result.messages.first().unwrap();
assert_eq!(msg.role, Role::User)
}
#[tokio::test]
async fn it_adds_and_gets_prompt_with_error() {
let mut options = McpOptions::default();
options.add_prompt(Prompt::new("test", || async {
Err::<PromptMessage, _>(Error::from(ErrorCode::InternalError))
}));
let prompt = options.get_prompt("test").await.unwrap();
assert_eq!(prompt.name, "test");
let req = GetPromptRequestParams {
name: "test".into(),
args: None,
meta: None,
};
let result = prompt.call(req.into()).await;
assert!(result.is_err())
}
#[tokio::test]
async fn it_returns_prompts() {
let mut options = McpOptions::default();
options.add_prompt(Prompt::new("test", || async { [("test", Role::User)] }));
let (prompts, next_cursor) = options.list_prompts_page(None, 10).await;
assert_eq!(prompts.len(), 1);
assert_eq!(next_cursor, None);
}
#[test]
fn it_returns_some_tool_capabilities_if_configured() {
let options = McpOptions::default().with_tools(|tools| tools.with_list_changed());
let tools_capability = options.tools_capability().unwrap();
assert!(tools_capability.list_changed);
}
#[test]
fn it_returns_some_tool_capabilities_if_there_are_tools() {
let mut options = McpOptions::default();
options.add_tool(Tool::new("tool", || async { "test" }));
let tools_capability = options.tools_capability().unwrap();
assert!(!tools_capability.list_changed);
}
#[test]
fn it_returns_none_tool_capabilities() {
let options = McpOptions::default();
assert!(options.tools_capability().is_none());
}
#[test]
fn it_returns_some_resource_capabilities_if_configured() {
let options = McpOptions::default().with_resources(|res| res.with_list_changed());
let resources_capability = options.resources_capability().unwrap();
assert!(resources_capability.list_changed);
}
#[test]
fn it_returns_some_resources_capability_if_there_are_resources() {
let mut options = McpOptions::default();
options.add_resource(Resource::new("res", "test"));
let resources_capability = options.resources_capability().unwrap();
assert!(!resources_capability.list_changed);
}
#[test]
fn it_returns_some_resources_capability_if_there_are_resource_templates() {
let mut options = McpOptions::default();
let handler = |_: Uri| async move {
Err::<ResourceContents, _>(Error::from(ErrorCode::ResourceNotFound))
};
options.add_resource_template(
ResourceTemplate::new("res://test", "test"),
ResourceFunc::new(handler),
);
let resources_capability = options.resources_capability().unwrap();
assert!(!resources_capability.list_changed);
}
#[test]
fn it_returns_none_resources_capability() {
let options = McpOptions::default();
assert!(options.resources_capability().is_none());
}
#[test]
fn it_returns_some_prompts_capability_if_configured() {
let options = McpOptions::default().with_prompts(|prompts| prompts.with_list_changed());
let prompts_capability = options.prompts_capability().unwrap();
assert!(prompts_capability.list_changed);
}
#[test]
fn it_returns_some_prompts_capability_if_there_are_tools() {
let mut options = McpOptions::default();
options.add_prompt(Prompt::new("test", || async {
Err::<PromptMessage, _>(Error::from(ErrorCode::InternalError))
}));
let prompts_capability = options.prompts_capability().unwrap();
assert!(!prompts_capability.list_changed);
}
#[test]
fn it_returns_none_prompts_capability() {
let options = McpOptions::default();
assert!(options.prompts_capability().is_none());
}
#[test]
fn it_returns_stdio_label() {
let options = McpOptions::default().with_stdio();
assert_eq!(options.transport_label(), "stdio");
}
#[test]
fn it_returns_none_label_when_no_transport() {
let options = McpOptions::default();
assert_eq!(options.transport_label(), "(none)");
}
#[cfg(feature = "http-server")]
#[test]
fn it_returns_http_label_when_http_transport() {
let options = McpOptions::default().with_default_http();
assert_eq!(options.transport_label(), "http://127.0.0.1:3000/mcp");
}
}