use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{Value, json};
use tracing::debug;
use crate::resource::{McpResource, resource_to_descriptor};
use crate::{McpResult, SessionContext};
use turul_mcp_protocol::{McpError, WithMeta};
fn extract_limit_from_params(params: &Option<Value>) -> Option<usize> {
params.as_ref().and_then(|p| {
p.get("limit")
.or_else(|| p.get("_meta").and_then(|m| m.get("limit")))
.and_then(|v| v.as_u64())
.map(|n| n as usize)
})
}
#[async_trait]
pub trait McpHandler: Send + Sync {
async fn handle(&self, params: Option<Value>) -> McpResult<Value>;
async fn handle_with_session(
&self,
params: Option<Value>,
_session: Option<SessionContext>,
) -> McpResult<Value> {
self.handle(params).await
}
fn supported_methods(&self) -> Vec<String>;
}
pub struct PingHandler;
#[async_trait]
impl McpHandler for PingHandler {
async fn handle(&self, _params: Option<Value>) -> McpResult<Value> {
Ok(serde_json::json!({}))
}
fn supported_methods(&self) -> Vec<String> {
vec!["ping".to_string()]
}
}
pub struct CompletionHandler;
#[async_trait]
impl McpHandler for CompletionHandler {
async fn handle(&self, _params: Option<Value>) -> McpResult<Value> {
use turul_mcp_protocol::completion::{CompleteResult, CompletionResult};
let values = vec!["example1".to_string(), "example2".to_string()];
let completion_result = CompletionResult::new(values);
let response = CompleteResult::new(completion_result);
serde_json::to_value(response).map_err(McpError::from)
}
fn supported_methods(&self) -> Vec<String> {
vec!["completion/complete".to_string()]
}
}
pub struct PromptsListHandler {
prompts: HashMap<String, Arc<dyn McpPrompt>>,
}
impl Default for PromptsListHandler {
fn default() -> Self {
Self::new()
}
}
impl PromptsListHandler {
pub fn new() -> Self {
Self {
prompts: HashMap::new(),
}
}
pub fn add_prompt<P: McpPrompt + 'static>(mut self, prompt: P) -> Self {
self.prompts
.insert(prompt.name().to_string(), Arc::new(prompt));
self
}
pub fn add_prompt_arc(mut self, prompt: Arc<dyn McpPrompt>) -> Self {
self.prompts.insert(prompt.name().to_string(), prompt);
self
}
}
#[async_trait]
impl McpHandler for PromptsListHandler {
async fn handle(&self, params: Option<Value>) -> McpResult<Value> {
use turul_mcp_protocol::meta::{Cursor, PaginatedResponse};
use turul_mcp_protocol::prompts::{ListPromptsParams, ListPromptsResult, Prompt};
const DEFAULT_PAGE_SIZE: usize = 50;
const MAX_PAGE_SIZE: usize = 1000;
const MIN_PAGE_SIZE: usize = 1;
let page_size = match extract_limit_from_params(¶ms) {
Some(0) => {
return Err(McpError::InvalidParameters(
"limit must be at least 1 (zero would return empty pages forever)".to_string(),
));
}
Some(n) => n.clamp(MIN_PAGE_SIZE, MAX_PAGE_SIZE),
None => DEFAULT_PAGE_SIZE,
};
let list_params = if let Some(params_value) = params {
serde_json::from_value::<ListPromptsParams>(params_value).map_err(|e| {
McpError::InvalidParameters(format!("Invalid parameters for prompts/list: {}", e))
})?
} else {
ListPromptsParams::new()
};
let cursor = list_params.cursor;
debug!(
"Listing prompts with cursor: {:?}, limit: {}",
cursor, page_size
);
let mut all_prompts: Vec<Prompt> = self
.prompts
.values()
.map(|p| {
let mut prompt = Prompt::new(p.name());
if let Some(desc) = p.description() {
prompt = prompt.with_description(desc);
}
if let Some(args) = p.arguments() {
prompt = prompt.with_arguments(args.clone());
}
prompt
})
.collect();
all_prompts.sort_by(|a, b| a.name.cmp(&b.name));
let start_index = if let Some(cursor) = &cursor {
let cursor_name = cursor.as_str();
all_prompts
.iter()
.position(|p| p.name.as_str() > cursor_name)
.unwrap_or(all_prompts.len())
} else {
0 };
let end_index = std::cmp::min(start_index + page_size, all_prompts.len());
let page_prompts: Vec<Prompt> = all_prompts[start_index..end_index].to_vec();
let has_more = end_index < all_prompts.len();
let next_cursor = if has_more {
page_prompts.last().map(|p| Cursor::new(&p.name))
} else {
None
};
debug!(
"Prompt pagination: start={}, end={}, page_size={}, has_more={}, next_cursor={:?}",
start_index,
end_index,
page_prompts.len(),
has_more,
next_cursor
);
let mut base_response = ListPromptsResult::new(page_prompts);
if let Some(ref cursor) = next_cursor {
base_response = base_response.with_next_cursor(cursor.clone());
}
let total = Some(all_prompts.len() as u64);
let next_cursor_clone = next_cursor.clone();
let mut paginated_response =
PaginatedResponse::with_pagination(base_response, next_cursor, total, has_more);
if let Some(request_meta) = list_params.meta {
let mut response_meta = paginated_response.meta().cloned().unwrap_or_else(|| {
turul_mcp_protocol::meta::Meta::with_pagination(next_cursor_clone, total, has_more)
});
for (key, value) in request_meta {
response_meta.extra.insert(key, value);
}
paginated_response = paginated_response.with_meta(response_meta);
}
serde_json::to_value(paginated_response).map_err(McpError::from)
}
fn supported_methods(&self) -> Vec<String> {
vec!["prompts/list".to_string()]
}
}
pub struct PromptsGetHandler {
prompts: HashMap<String, Arc<dyn McpPrompt>>,
}
impl Default for PromptsGetHandler {
fn default() -> Self {
Self::new()
}
}
impl PromptsGetHandler {
pub fn new() -> Self {
Self {
prompts: HashMap::new(),
}
}
pub fn add_prompt<P: McpPrompt + 'static>(mut self, prompt: P) -> Self {
self.prompts
.insert(prompt.name().to_string(), Arc::new(prompt));
self
}
pub fn add_prompt_arc(mut self, prompt: Arc<dyn McpPrompt>) -> Self {
self.prompts.insert(prompt.name().to_string(), prompt);
self
}
}
#[async_trait]
impl McpHandler for PromptsGetHandler {
async fn handle(&self, params: Option<Value>) -> McpResult<Value> {
use std::collections::HashMap as StdHashMap;
use turul_mcp_protocol::prompts::{GetPromptParams, GetPromptResult};
let params = params.ok_or_else(|| McpError::missing_param("GetPromptParams"))?;
let get_params: GetPromptParams = serde_json::from_value(params)?;
debug!(
"Getting prompt: {} with arguments: {:?}",
get_params.name, get_params.arguments
);
let prompt = self.prompts.get(&get_params.name).ok_or_else(|| {
McpError::invalid_param_type("name", "existing prompt name", &get_params.name)
})?;
if let Some(prompt_arguments) = prompt.arguments() {
let empty_args = StdHashMap::new();
let provided_args = get_params.arguments.as_ref().unwrap_or(&empty_args);
for arg_def in prompt_arguments {
let is_required = arg_def.required.unwrap_or(false);
if is_required && !provided_args.contains_key(&arg_def.name) {
return Err(McpError::InvalidParameters(format!(
"Missing required argument '{}' for prompt '{}'",
arg_def.name, get_params.name
)));
}
}
}
let arguments = match get_params.arguments {
Some(args) => {
let mut value_args = StdHashMap::new();
for (key, value) in args {
value_args.insert(key, serde_json::Value::String(value));
}
value_args
}
None => StdHashMap::new(),
};
let messages = prompt.render(Some(arguments)).await?;
let mut response = GetPromptResult::new(messages);
if let Some(desc) = prompt.description() {
response = response.with_description(desc);
}
if let Some(meta) = get_params.meta {
response = response.with_meta(meta);
}
serde_json::to_value(response).map_err(McpError::from)
}
fn supported_methods(&self) -> Vec<String> {
vec!["prompts/get".to_string()]
}
}
pub type PromptsHandler = PromptsListHandler;
pub use crate::prompt::McpPrompt;
pub struct ResourcesListHandler {
resources: HashMap<String, Arc<dyn McpResource>>,
}
impl Default for ResourcesListHandler {
fn default() -> Self {
Self::new()
}
}
impl ResourcesListHandler {
pub fn new() -> Self {
Self {
resources: HashMap::new(),
}
}
pub fn add_resource<R: McpResource + 'static>(mut self, resource: R) -> Self {
self.resources
.insert(resource.uri().to_string(), Arc::new(resource));
self
}
pub fn add_resource_arc(mut self, resource: Arc<dyn McpResource>) -> Self {
self.resources.insert(resource.uri().to_string(), resource);
self
}
}
#[async_trait]
impl McpHandler for ResourcesListHandler {
async fn handle(&self, params: Option<Value>) -> McpResult<Value> {
use turul_mcp_protocol::meta::{Cursor, PaginatedResponse};
use turul_mcp_protocol::resources::{ListResourcesParams, ListResourcesResult, Resource};
const DEFAULT_PAGE_SIZE: usize = 50;
const MAX_PAGE_SIZE: usize = 1000;
const MIN_PAGE_SIZE: usize = 1;
let page_size = match extract_limit_from_params(¶ms) {
Some(0) => {
return Err(McpError::InvalidParameters(
"limit must be at least 1 (zero would return empty pages forever)".to_string(),
));
}
Some(n) => n.clamp(MIN_PAGE_SIZE, MAX_PAGE_SIZE),
None => DEFAULT_PAGE_SIZE,
};
let list_params = if let Some(params_value) = params {
serde_json::from_value::<ListResourcesParams>(params_value).map_err(|e| {
McpError::InvalidParameters(format!("Invalid parameters for resources/list: {}", e))
})?
} else {
ListResourcesParams::new()
};
let cursor = list_params.cursor;
debug!(
"Listing resources with cursor: {:?}, limit: {}",
cursor, page_size
);
let mut all_resources: Vec<Resource> = self
.resources
.values()
.map(|r| resource_to_descriptor(r.as_ref()))
.collect();
all_resources.sort_by(|a, b| a.uri.cmp(&b.uri));
let start_index = if let Some(cursor) = &cursor {
let cursor_uri = cursor.as_str();
all_resources
.iter()
.position(|r| r.uri.as_str() > cursor_uri)
.unwrap_or(all_resources.len())
} else {
0 };
let end_index = std::cmp::min(start_index + page_size, all_resources.len());
let page_resources: Vec<Resource> = all_resources[start_index..end_index].to_vec();
let has_more = end_index < all_resources.len();
let next_cursor = if has_more {
page_resources.last().map(|r| Cursor::new(&r.uri))
} else {
None
};
debug!(
"Resource pagination: start={}, end={}, page_size={}, has_more={}, next_cursor={:?}",
start_index,
end_index,
page_resources.len(),
has_more,
next_cursor
);
let mut base_response = ListResourcesResult::new(page_resources);
if let Some(ref cursor) = next_cursor {
base_response = base_response.with_next_cursor(cursor.clone());
}
let total = Some(all_resources.len() as u64);
let next_cursor_clone = next_cursor.clone();
let mut paginated_response =
PaginatedResponse::with_pagination(base_response, next_cursor, total, has_more);
if let Some(request_meta) = list_params.meta {
let mut response_meta = paginated_response.meta().cloned().unwrap_or_else(|| {
turul_mcp_protocol::meta::Meta::with_pagination(next_cursor_clone, total, has_more)
});
for (key, value) in request_meta {
response_meta.extra.insert(key, value);
}
paginated_response = paginated_response.with_meta(response_meta);
}
serde_json::to_value(paginated_response).map_err(McpError::from)
}
fn supported_methods(&self) -> Vec<String> {
vec!["resources/list".to_string()]
}
}
pub struct ResourcesReadHandler {
resources: HashMap<String, Arc<dyn McpResource>>,
uri_registry: Arc<crate::uri_template::UriTemplateRegistry>,
security_middleware: Option<Arc<crate::security::SecurityMiddleware>>,
}
impl Default for ResourcesReadHandler {
fn default() -> Self {
Self::new()
}
}
impl ResourcesReadHandler {
pub fn new() -> Self {
Self {
resources: HashMap::new(),
uri_registry: Arc::new(crate::uri_template::UriTemplateRegistry::new()),
security_middleware: Some(Arc::new(crate::security::SecurityMiddleware::default())),
}
}
pub fn with_security(mut self, middleware: Arc<crate::security::SecurityMiddleware>) -> Self {
self.security_middleware = Some(middleware);
self
}
pub fn without_security(mut self) -> Self {
self.security_middleware = None;
self
}
pub fn add_resource<R: McpResource + 'static>(mut self, resource: R) -> Self {
self.resources
.insert(resource.uri().to_string(), Arc::new(resource));
self
}
pub fn add_resource_arc(mut self, resource: Arc<dyn McpResource>) -> Self {
self.resources.insert(resource.uri().to_string(), resource);
self
}
pub fn add_template_resource<R: McpResource + 'static>(
mut self,
template: crate::uri_template::UriTemplate,
resource: R,
) -> Self {
Arc::get_mut(&mut self.uri_registry)
.expect("URI registry should not be shared yet")
.register(template.clone());
let pattern = template.pattern().to_string();
self.resources.insert(pattern, Arc::new(resource));
self
}
pub fn add_template_resource_arc(
mut self,
template: crate::uri_template::UriTemplate,
resource: Arc<dyn McpResource>,
) -> Self {
Arc::get_mut(&mut self.uri_registry)
.expect("URI registry should not be shared yet")
.register(template.clone());
let pattern = template.pattern().to_string();
self.resources.insert(pattern, resource);
self
}
}
#[async_trait]
impl McpHandler for ResourcesReadHandler {
async fn handle(&self, params: Option<Value>) -> McpResult<Value> {
self.handle_with_session(params, None).await
}
async fn handle_with_session(
&self,
params: Option<Value>,
session: Option<SessionContext>,
) -> McpResult<Value> {
use turul_mcp_protocol::resources::{ReadResourceParams, ReadResourceResult};
if let Some(security_middleware) = &self.security_middleware {
security_middleware.validate_request(
"resources/read",
params.as_ref(),
session.as_ref(),
)?;
}
let params = params.ok_or_else(|| McpError::missing_param("ReadResourceParams"))?;
let read_params: ReadResourceParams = serde_json::from_value(params)?;
debug!("Reading resource with URI: {}", read_params.uri);
if let Some(security_middleware) = &self.security_middleware {
let uri_params = serde_json::json!({"uri": read_params.uri});
security_middleware.validate_request(
"resources/read",
Some(&uri_params),
session.as_ref(),
)?;
}
if let Some(template) = self.uri_registry.find_matching(&read_params.uri) {
debug!("Found matching URI template: {}", template.pattern());
let template_vars = template.extract(&read_params.uri)?;
debug!("Extracted template variables: {:?}", template_vars);
let resource = self.resources.get(template.pattern()).ok_or_else(|| {
McpError::invalid_param_type(
"template",
"registered template pattern",
template.pattern(),
)
})?;
let mut enhanced_params = serde_json::to_value(&read_params)?;
if let Some(params_obj) = enhanced_params.as_object_mut() {
params_obj.insert(
"template_variables".to_string(),
serde_json::to_value(template_vars)?,
);
}
let contents = resource
.read(Some(enhanced_params), session.as_ref())
.await?;
if let Some(security_middleware) = &self.security_middleware {
for content in &contents {
match content {
turul_mcp_protocol::resources::ResourceContent::Text(text_content) => {
if let Some(mime_type) = &text_content.mime_type {
security_middleware
.resource_access_control()
.validate_mime_type(mime_type)?;
}
let size = text_content.text.len() as u64;
security_middleware
.resource_access_control()
.validate_size(size)?;
}
turul_mcp_protocol::resources::ResourceContent::Blob(blob_content) => {
if let Some(mime_type) = &blob_content.mime_type {
security_middleware
.resource_access_control()
.validate_mime_type(mime_type)?;
}
let size = blob_content.blob.len() as u64;
security_middleware
.resource_access_control()
.validate_size(size)?;
}
}
}
}
let response = ReadResourceResult::new(contents);
return serde_json::to_value(response).map_err(McpError::from);
}
let resource = self.resources.get(&read_params.uri).ok_or_else(|| {
McpError::invalid_param_type(
"uri",
"existing resource URI or template pattern",
&read_params.uri,
)
})?;
let params = Some(serde_json::to_value(&read_params)?);
let contents = resource.read(params, session.as_ref()).await?;
if let Some(security_middleware) = &self.security_middleware {
for content in &contents {
match content {
turul_mcp_protocol::resources::ResourceContent::Text(text_content) => {
if let Some(mime_type) = &text_content.mime_type {
security_middleware
.resource_access_control()
.validate_mime_type(mime_type)?;
}
let size = text_content.text.len() as u64;
security_middleware
.resource_access_control()
.validate_size(size)?;
}
turul_mcp_protocol::resources::ResourceContent::Blob(blob_content) => {
if let Some(mime_type) = &blob_content.mime_type {
security_middleware
.resource_access_control()
.validate_mime_type(mime_type)?;
}
let size = blob_content.blob.len() as u64;
security_middleware
.resource_access_control()
.validate_size(size)?;
}
}
}
}
let response = ReadResourceResult::new(contents);
serde_json::to_value(response).map_err(McpError::from)
}
fn supported_methods(&self) -> Vec<String> {
vec!["resources/read".to_string()]
}
}
pub type ResourcesHandler = ResourcesListHandler;
pub struct LoggingHandler;
#[async_trait]
impl McpHandler for LoggingHandler {
async fn handle(&self, params: Option<Value>) -> McpResult<Value> {
use turul_mcp_protocol::logging::SetLevelParams;
if let Some(params) = params {
let set_level_params: SetLevelParams = serde_json::from_value(params)?;
tracing::warn!(
"LoggingHandler.handle() called without session context - cannot store level per-session"
);
tracing::info!("Would set log level to: {:?}", set_level_params.level);
serde_json::to_value(json!({})).map_err(McpError::from)
} else {
Err(McpError::missing_param("SetLevelParams"))
}
}
async fn handle_with_session(
&self,
params: Option<Value>,
session: Option<SessionContext>,
) -> McpResult<Value> {
use turul_mcp_protocol::logging::SetLevelParams;
let params = params.ok_or_else(|| McpError::missing_param("params"))?;
let set_level_params: SetLevelParams = serde_json::from_value(params)?;
let session_ctx = session
.ok_or_else(|| McpError::configuration("Session required for logging/setLevel"))?;
if !(session_ctx.is_initialized)().await {
return Err(McpError::configuration(
"Session must be initialized before setting logging level",
));
}
session_ctx.set_logging_level(set_level_params.level).await;
tracing::debug!(
"đ¯ Set logging level for session {}: {:?}",
session_ctx.session_id,
set_level_params.level
);
let stored_level = session_ctx.get_logging_level().await;
if stored_level != set_level_params.level {
return Err(McpError::configuration(
"Failed to persist logging level in session storage",
));
}
session_ctx
.notify_log(
turul_mcp_protocol::logging::LoggingLevel::Info,
serde_json::json!(format!(
"Logging level changed to: {:?}",
set_level_params.level
)),
None,
None,
)
.await;
Ok(json!({}))
}
fn supported_methods(&self) -> Vec<String> {
vec!["logging/setLevel".to_string()]
}
}
pub struct RootsHandler {
roots: Vec<turul_mcp_protocol::roots::Root>,
}
impl Default for RootsHandler {
fn default() -> Self {
Self::new()
}
}
impl RootsHandler {
pub fn new() -> Self {
Self { roots: Vec::new() }
}
pub fn add_root(mut self, root: turul_mcp_protocol::roots::Root) -> Self {
self.roots.push(root);
self
}
}
#[async_trait]
impl McpHandler for RootsHandler {
async fn handle(&self, params: Option<Value>) -> McpResult<Value> {
use turul_mcp_protocol::meta::{Cursor, PaginatedResponse};
use turul_mcp_protocol::roots::ListRootsResult;
let cursor = params
.as_ref()
.and_then(|p| p.get("cursor"))
.and_then(|c| c.as_str())
.map(Cursor::from);
debug!("Listing roots with cursor: {:?}", cursor);
let mut all_roots = self.roots.clone();
all_roots.sort_by(|a, b| a.uri.cmp(&b.uri));
const DEFAULT_PAGE_SIZE: usize = 50; let page_size = DEFAULT_PAGE_SIZE;
let start_index = if let Some(cursor) = &cursor {
let cursor_uri = cursor.as_str();
all_roots
.iter()
.position(|r| r.uri.as_str() > cursor_uri)
.unwrap_or(all_roots.len())
} else {
0 };
let end_index = std::cmp::min(start_index + page_size, all_roots.len());
let page_roots = all_roots[start_index..end_index].to_vec();
let has_more = end_index < all_roots.len();
let next_cursor = if has_more {
page_roots.last().map(|r| Cursor::new(&r.uri))
} else {
None
};
debug!(
"Root pagination: start={}, end={}, page_size={}, has_more={}, next_cursor={:?}",
start_index,
end_index,
page_roots.len(),
has_more,
next_cursor
);
let base_response = ListRootsResult::new(page_roots);
let total = Some(all_roots.len() as u64);
let paginated_response =
PaginatedResponse::with_pagination(base_response, next_cursor, total, has_more);
serde_json::to_value(paginated_response).map_err(McpError::from)
}
fn supported_methods(&self) -> Vec<String> {
vec!["roots/list".to_string()]
}
}
pub struct SamplingHandler;
#[async_trait]
impl McpHandler for SamplingHandler {
async fn handle(&self, params: Option<Value>) -> McpResult<Value> {
use turul_mcp_protocol::meta::{ProgressResponse, ProgressToken};
use turul_mcp_protocol::sampling::CreateMessageResult;
let progress_token = params
.as_ref()
.and_then(|p| p.get("progressToken"))
.and_then(|t| t.as_str())
.map(ProgressToken::from);
let base_response = CreateMessageResult {
role: turul_mcp_protocol::sampling::Role::Assistant,
content: turul_mcp_protocol::prompts::ContentBlock::text(
"This is a sample message generated by the MCP server",
),
model: "mock-model-v1".to_string(),
stop_reason: Some("stop".to_string()),
meta: None,
};
let progress_response = ProgressResponse::with_progress(
base_response,
progress_token.or_else(|| Some(ProgressToken::new("sampling-default"))),
1.0, Some(1),
Some(1),
);
serde_json::to_value(progress_response).map_err(McpError::from)
}
fn supported_methods(&self) -> Vec<String> {
vec!["sampling/createMessage".to_string()]
}
}
pub struct ProvidedSamplingHandler {
providers: HashMap<String, Arc<dyn crate::McpSampling>>,
}
impl ProvidedSamplingHandler {
pub fn new(providers: HashMap<String, Arc<dyn crate::McpSampling>>) -> Self {
Self { providers }
}
}
#[async_trait]
impl McpHandler for ProvidedSamplingHandler {
async fn handle(&self, params: Option<Value>) -> McpResult<Value> {
use turul_mcp_protocol::meta::{ProgressResponse, ProgressToken};
use turul_mcp_protocol::sampling::{CreateMessageParams, CreateMessageRequest};
let progress_token = params
.as_ref()
.and_then(|p| p.get("progressToken"))
.and_then(|t| t.as_str())
.map(ProgressToken::from);
let message_params: CreateMessageParams =
serde_json::from_value(params.ok_or_else(|| McpError::missing_param("params"))?)?;
let request = CreateMessageRequest {
method: "sampling/createMessage".to_string(),
params: message_params,
};
let provider = self
.providers
.values()
.next()
.ok_or_else(|| McpError::configuration("No sampling provider available"))?;
provider.validate_request(&request).await?;
let result = provider.sample(request).await?;
let progress_response = ProgressResponse::with_progress(
result,
progress_token.or_else(|| Some(ProgressToken::new("sampling-provided"))),
1.0, Some(1),
Some(1),
);
serde_json::to_value(progress_response).map_err(McpError::from)
}
fn supported_methods(&self) -> Vec<String> {
vec!["sampling/createMessage".to_string()]
}
}
pub struct ResourceTemplatesHandler {
templates: Vec<(crate::uri_template::UriTemplate, Arc<dyn McpResource>)>,
}
impl Default for ResourceTemplatesHandler {
fn default() -> Self {
Self::new()
}
}
impl ResourceTemplatesHandler {
pub fn new() -> Self {
Self {
templates: Vec::new(),
}
}
pub fn with_templates(
mut self,
templates: Vec<(crate::uri_template::UriTemplate, Arc<dyn McpResource>)>,
) -> Self {
self.templates = templates;
self
}
}
#[async_trait]
impl McpHandler for ResourceTemplatesHandler {
async fn handle(&self, params: Option<Value>) -> McpResult<Value> {
use turul_mcp_protocol::meta::Cursor;
const DEFAULT_PAGE_SIZE: usize = 50;
const MAX_PAGE_SIZE: usize = 1000;
const MIN_PAGE_SIZE: usize = 1;
let page_size = match extract_limit_from_params(¶ms) {
Some(0) => {
return Err(McpError::InvalidParameters(
"limit must be at least 1 (zero would return empty pages forever)".to_string(),
));
}
Some(n) => n.clamp(MIN_PAGE_SIZE, MAX_PAGE_SIZE),
None => DEFAULT_PAGE_SIZE,
};
use turul_mcp_protocol::resources::ListResourceTemplatesParams;
let list_params = if let Some(params_value) = params {
serde_json::from_value::<ListResourceTemplatesParams>(params_value).map_err(|e| {
McpError::InvalidParameters(format!(
"Invalid parameters for resources/templates/list: {}",
e
))
})?
} else {
ListResourceTemplatesParams::new()
};
let cursor = list_params.cursor;
debug!(
"Listing resource templates with cursor: {:?}, limit: {}",
cursor, page_size
);
tracing::info!(
"Resource templates list requested - {} templates registered",
self.templates.len()
);
use turul_mcp_protocol::resources::{ListResourceTemplatesResult, ResourceTemplate};
let mut all_templates: Vec<ResourceTemplate> = self
.templates
.iter()
.map(|(uri_template, resource)| {
let template_name = resource.name();
let mut template = ResourceTemplate::new(template_name, uri_template.pattern());
if let Some(desc) = resource.description() {
template = template.with_description(desc);
}
if let Some(mime_type) = resource.mime_type() {
template = template.with_mime_type(mime_type);
}
template
})
.collect();
all_templates.sort_by(|a, b| a.uri_template.cmp(&b.uri_template));
let start_index = if let Some(cursor) = &cursor {
let cursor_template = cursor.as_str();
all_templates
.iter()
.position(|t| t.uri_template.as_str() > cursor_template)
.unwrap_or(all_templates.len())
} else {
0 };
let end_index = std::cmp::min(start_index + page_size, all_templates.len());
let page_templates = all_templates[start_index..end_index].to_vec();
let total = Some(all_templates.len() as u64);
let has_more = end_index < all_templates.len();
let next_cursor = if has_more {
page_templates.last().map(|t| Cursor::new(&t.uri_template))
} else {
None };
debug!(
"Resource template pagination: page_size={}, has_more={}, next_cursor={:?}",
page_templates.len(),
has_more,
next_cursor
);
let mut base_response = ListResourceTemplatesResult::new(page_templates);
if let Some(ref cursor) = next_cursor {
base_response = base_response.with_next_cursor(cursor.clone());
}
use turul_mcp_protocol::meta::PaginatedResponse;
let next_cursor_clone = next_cursor.clone();
let mut paginated_response =
PaginatedResponse::with_pagination(base_response, next_cursor, total, has_more);
if let Some(request_meta) = list_params.meta {
let mut response_meta = paginated_response.meta().cloned().unwrap_or_else(|| {
turul_mcp_protocol::meta::Meta::with_pagination(next_cursor_clone, total, has_more)
});
for (key, value) in request_meta {
response_meta.extra.insert(key, value);
}
paginated_response = paginated_response.with_meta(response_meta);
}
serde_json::to_value(paginated_response).map_err(McpError::from)
}
fn supported_methods(&self) -> Vec<String> {
vec!["resources/templates/list".to_string()]
}
}
#[async_trait]
pub trait ElicitationProvider: Send + Sync {
async fn elicit(
&self,
request: &turul_mcp_protocol::elicitation::ElicitCreateRequest,
) -> McpResult<turul_mcp_protocol::elicitation::ElicitResult>;
fn can_handle(&self, _request: &turul_mcp_protocol::elicitation::ElicitCreateRequest) -> bool {
true
}
}
pub struct MockElicitationProvider;
#[async_trait]
impl ElicitationProvider for MockElicitationProvider {
async fn elicit(
&self,
request: &turul_mcp_protocol::elicitation::ElicitCreateRequest,
) -> McpResult<turul_mcp_protocol::elicitation::ElicitResult> {
use turul_mcp_protocol::elicitation::ElicitResult;
let mut mock_data = std::collections::HashMap::new();
mock_data.insert("mock_response".to_string(), serde_json::json!(true));
mock_data.insert(
"message".to_string(),
serde_json::json!(&request.params.message),
);
mock_data.insert(
"note".to_string(),
serde_json::json!("This is a mock elicitation response for testing"),
);
match request.params.message.as_str() {
msg if msg.contains("cancel") => Ok(ElicitResult::cancel()),
msg if msg.contains("decline") => Ok(ElicitResult::decline()),
_ => Ok(ElicitResult::accept(mock_data)),
}
}
fn can_handle(&self, _request: &turul_mcp_protocol::elicitation::ElicitCreateRequest) -> bool {
true }
}
pub struct ElicitationHandler {
provider: Arc<dyn ElicitationProvider>,
}
impl ElicitationHandler {
pub fn new(provider: Arc<dyn ElicitationProvider>) -> Self {
Self { provider }
}
pub fn with_mock_provider() -> Self {
Self::new(Arc::new(MockElicitationProvider))
}
}
#[async_trait]
impl McpHandler for ElicitationHandler {
async fn handle(&self, params: Option<Value>) -> McpResult<Value> {
use turul_mcp_protocol::elicitation::ElicitCreateParams;
if let Some(params) = params {
let request_params: ElicitCreateParams = serde_json::from_value(params)?;
tracing::info!("Processing elicitation request: {}", request_params.message);
use turul_mcp_protocol::elicitation::ElicitCreateRequest;
let create_request = ElicitCreateRequest {
method: "elicitation/create".to_string(),
params: request_params.clone(),
};
if !self.provider.can_handle(&create_request) {
let error_response = turul_mcp_protocol::elicitation::ElicitResult::cancel();
return serde_json::to_value(error_response).map_err(McpError::from);
}
let result = self.provider.elicit(&create_request).await?;
serde_json::to_value(result).map_err(McpError::from)
} else {
Err(McpError::missing_param("ElicitCreateParams"))
}
}
fn supported_methods(&self) -> Vec<String> {
vec!["elicitation/create".to_string()]
}
}
use crate::session::SessionManager;
pub struct NotificationsHandler;
#[async_trait]
impl McpHandler for NotificationsHandler {
async fn handle(&self, params: Option<Value>) -> McpResult<Value> {
tracing::info!("Received notification: {:?}", params);
Ok(Value::Null)
}
fn supported_methods(&self) -> Vec<String> {
vec![
"notifications/message".to_string(),
"notifications/progress".to_string(),
"notifications/resources/list_changed".to_string(),
"notifications/tools/list_changed".to_string(),
"notifications/prompts/list_changed".to_string(),
"notifications/roots/list_changed".to_string(),
"notifications/resources/listChanged".to_string(),
"notifications/tools/listChanged".to_string(),
"notifications/prompts/listChanged".to_string(),
"notifications/roots/listChanged".to_string(),
"notifications/resources/updated".to_string(),
]
}
}
pub struct InitializedNotificationHandler {
session_manager: Arc<SessionManager>,
}
impl InitializedNotificationHandler {
pub fn new(session_manager: Arc<SessionManager>) -> Self {
Self { session_manager }
}
}
#[async_trait]
impl McpHandler for InitializedNotificationHandler {
async fn handle(&self, _params: Option<Value>) -> McpResult<Value> {
tracing::warn!("notifications/initialized received without session context");
Ok(Value::Null)
}
async fn handle_with_session(
&self,
params: Option<Value>,
session: Option<SessionContext>,
) -> McpResult<Value> {
tracing::debug!("đ¨ Received notifications/initialized: {:?}", params);
if let Some(session_ctx) = session {
tracing::debug!(
"đ Processing notifications/initialized for session: {}",
session_ctx.session_id
);
if self
.session_manager
.is_session_initialized(&session_ctx.session_id)
.await
{
tracing::info!(
"âšī¸ Session {} already initialized, ignoring duplicate notifications/initialized",
session_ctx.session_id
);
return Ok(Value::Null);
}
let client_info_value = self
.session_manager
.get_session_state(&session_ctx.session_id, "client_info")
.await;
let capabilities_value = self
.session_manager
.get_session_state(&session_ctx.session_id, "client_capabilities")
.await;
let negotiated_version_value = self
.session_manager
.get_session_state(&session_ctx.session_id, "negotiated_version")
.await;
if let (
Some(client_info_value),
Some(capabilities_value),
Some(negotiated_version_value),
) = (
client_info_value,
capabilities_value,
negotiated_version_value,
) {
use turul_mcp_protocol::{ClientCapabilities, Implementation, McpVersion};
if let (Ok(client_info), Ok(client_capabilities), Ok(negotiated_version)) = (
serde_json::from_value::<Implementation>(client_info_value),
serde_json::from_value::<ClientCapabilities>(capabilities_value),
serde_json::from_value::<McpVersion>(negotiated_version_value),
) {
if let Err(e) = self
.session_manager
.initialize_session_with_version(
&session_ctx.session_id,
client_info,
client_capabilities,
negotiated_version,
)
.await
{
tracing::error!(
"â Failed to initialize session {}: {}",
session_ctx.session_id,
e
);
return Err(turul_mcp_protocol::McpError::configuration(&format!(
"Failed to initialize session: {}",
e
)));
}
tracing::debug!(
"â
Session {} successfully initialized after receiving notifications/initialized",
session_ctx.session_id
);
} else {
tracing::error!(
"â Failed to deserialize stored client info/capabilities/version for session {}",
session_ctx.session_id
);
return Err(turul_mcp_protocol::McpError::configuration(
"Failed to deserialize stored client info",
));
}
} else {
tracing::error!(
"â Missing stored client info/capabilities/version for session {}",
session_ctx.session_id
);
return Err(turul_mcp_protocol::McpError::configuration(
"Missing stored client info - session must call initialize first",
));
}
} else {
tracing::warn!("â ī¸ notifications/initialized received without session context");
}
Ok(Value::Null)
}
fn supported_methods(&self) -> Vec<String> {
vec!["notifications/initialized".to_string()]
}
}