use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tracing::{debug, error, info, warn};
use turbomcp_protocol::MessageId;
use turbomcp_protocol::jsonrpc::JsonRpcError;
use turbomcp_protocol::types::LogLevel;
pub use turbomcp_protocol::types::{
CancelledNotification, LoggingNotification, ProgressNotification, ResourceUpdatedNotification, };
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum HandlerError {
#[error("User cancelled the operation")]
UserCancelled,
#[error("Handler operation timed out after {timeout_seconds} seconds")]
Timeout { timeout_seconds: u64 },
#[error("Invalid input: {details}")]
InvalidInput { details: String },
#[error("Handler configuration error: {message}")]
Configuration { message: String },
#[error("Handler error: {message}")]
Generic { message: String },
#[error("External system error: {source}")]
External {
#[from]
source: Box<dyn std::error::Error + Send + Sync>,
},
}
impl HandlerError {
#[must_use]
pub fn into_jsonrpc_error(&self) -> JsonRpcError {
let (code, message) = match self {
HandlerError::UserCancelled => (-1, "User rejected sampling request".to_string()),
HandlerError::Timeout { timeout_seconds } => (
-32801,
format!(
"Handler operation timed out after {} seconds",
timeout_seconds
),
),
HandlerError::InvalidInput { details } => {
(-32602, format!("Invalid input: {}", details))
}
HandlerError::Configuration { message } => {
(-32601, format!("Handler configuration error: {}", message))
}
HandlerError::Generic { message } => (-32603, format!("Handler error: {}", message)),
HandlerError::External { source } => {
(-32603, format!("External system error: {}", source))
}
};
JsonRpcError {
code,
message,
data: None,
}
}
}
pub type HandlerResult<T> = Result<T, HandlerError>;
#[derive(Debug, Clone)]
pub struct ElicitationRequest {
id: MessageId,
inner: turbomcp_protocol::types::ElicitRequest,
}
impl ElicitationRequest {
#[must_use]
pub fn new(id: MessageId, request: turbomcp_protocol::types::ElicitRequest) -> Self {
Self { id, inner: request }
}
#[must_use]
pub fn id(&self) -> &MessageId {
&self.id
}
#[must_use]
pub fn message(&self) -> &str {
self.inner.params.message()
}
#[must_use]
pub fn schema(&self) -> Option<&turbomcp_protocol::types::ElicitationSchema> {
#[allow(unreachable_patterns)]
match &self.inner.params {
turbomcp_protocol::types::ElicitRequestParams::Form(form) => Some(&form.schema),
_ => None, }
}
#[must_use]
pub fn timeout(&self) -> Option<Duration> {
#[allow(unreachable_patterns)]
match &self.inner.params {
turbomcp_protocol::types::ElicitRequestParams::Form(form) => {
form.timeout_ms.map(|ms| Duration::from_millis(ms as u64))
}
_ => None, }
}
#[must_use]
pub fn is_cancellable(&self) -> bool {
#[allow(unreachable_patterns)]
match &self.inner.params {
turbomcp_protocol::types::ElicitRequestParams::Form(form) => {
form.cancellable.unwrap_or(false)
}
_ => false, }
}
#[must_use]
pub fn as_protocol(&self) -> &turbomcp_protocol::types::ElicitRequest {
&self.inner
}
#[must_use]
pub fn into_protocol(self) -> turbomcp_protocol::types::ElicitRequest {
self.inner
}
}
pub use turbomcp_protocol::types::ElicitationAction;
#[derive(Debug, Clone)]
pub struct ElicitationResponse {
inner: turbomcp_protocol::types::ElicitResult,
}
impl ElicitationResponse {
#[must_use]
pub fn accept(content: HashMap<String, serde_json::Value>) -> Self {
Self {
inner: turbomcp_protocol::types::ElicitResult {
action: ElicitationAction::Accept,
content: Some(content),
_meta: None,
},
}
}
#[must_use]
pub fn decline() -> Self {
Self {
inner: turbomcp_protocol::types::ElicitResult {
action: ElicitationAction::Decline,
content: None,
_meta: None,
},
}
}
#[must_use]
pub fn cancel() -> Self {
Self {
inner: turbomcp_protocol::types::ElicitResult {
action: ElicitationAction::Cancel,
content: None,
_meta: None,
},
}
}
#[must_use]
pub fn action(&self) -> ElicitationAction {
self.inner.action
}
#[must_use]
pub fn content(&self) -> Option<&HashMap<String, serde_json::Value>> {
self.inner.content.as_ref()
}
pub(crate) fn into_protocol(self) -> turbomcp_protocol::types::ElicitResult {
self.inner
}
}
pub trait ElicitationHandler: Send + Sync + std::fmt::Debug {
fn handle_elicitation(
&self,
request: ElicitationRequest,
) -> Pin<Box<dyn Future<Output = HandlerResult<ElicitationResponse>> + Send + '_>>;
}
pub trait LogHandler: Send + Sync + std::fmt::Debug {
fn handle_log(
&self,
log: LoggingNotification,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>>;
}
pub trait ResourceUpdateHandler: Send + Sync + std::fmt::Debug {
fn handle_resource_update(
&self,
notification: ResourceUpdatedNotification,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>>;
}
pub trait RootsHandler: Send + Sync + std::fmt::Debug {
fn handle_roots_request(
&self,
) -> Pin<Box<dyn Future<Output = HandlerResult<Vec<turbomcp_protocol::types::Root>>> + Send + '_>>;
}
pub trait CancellationHandler: Send + Sync + std::fmt::Debug {
fn handle_cancellation(
&self,
notification: CancelledNotification,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>>;
}
pub trait ResourceListChangedHandler: Send + Sync + std::fmt::Debug {
fn handle_resource_list_changed(
&self,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>>;
}
pub trait PromptListChangedHandler: Send + Sync + std::fmt::Debug {
fn handle_prompt_list_changed(
&self,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>>;
}
pub trait ToolListChangedHandler: Send + Sync + std::fmt::Debug {
fn handle_tool_list_changed(
&self,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>>;
}
pub trait ProgressHandler: Send + Sync + std::fmt::Debug {
fn handle_progress(
&self,
notification: ProgressNotification,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>>;
}
#[derive(Debug, Default)]
pub struct HandlerRegistry {
pub roots: Option<Arc<dyn RootsHandler>>,
pub elicitation: Option<Arc<dyn ElicitationHandler>>,
pub log: Option<Arc<dyn LogHandler>>,
pub resource_update: Option<Arc<dyn ResourceUpdateHandler>>,
pub cancellation: Option<Arc<dyn CancellationHandler>>,
pub resource_list_changed: Option<Arc<dyn ResourceListChangedHandler>>,
pub prompt_list_changed: Option<Arc<dyn PromptListChangedHandler>>,
pub tool_list_changed: Option<Arc<dyn ToolListChangedHandler>>,
pub progress: Option<Arc<dyn ProgressHandler>>,
}
impl HandlerRegistry {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn set_roots_handler(&mut self, handler: Arc<dyn RootsHandler>) {
debug!("Registering roots handler");
self.roots = Some(handler);
}
pub fn set_elicitation_handler(&mut self, handler: Arc<dyn ElicitationHandler>) {
debug!("Registering elicitation handler");
self.elicitation = Some(handler);
}
pub fn set_log_handler(&mut self, handler: Arc<dyn LogHandler>) {
debug!("Registering log handler");
self.log = Some(handler);
}
pub fn set_resource_update_handler(&mut self, handler: Arc<dyn ResourceUpdateHandler>) {
debug!("Registering resource update handler");
self.resource_update = Some(handler);
}
pub fn set_cancellation_handler(&mut self, handler: Arc<dyn CancellationHandler>) {
debug!("Registering cancellation handler");
self.cancellation = Some(handler);
}
pub fn set_resource_list_changed_handler(
&mut self,
handler: Arc<dyn ResourceListChangedHandler>,
) {
debug!("Registering resource list changed handler");
self.resource_list_changed = Some(handler);
}
pub fn set_prompt_list_changed_handler(&mut self, handler: Arc<dyn PromptListChangedHandler>) {
debug!("Registering prompt list changed handler");
self.prompt_list_changed = Some(handler);
}
pub fn set_tool_list_changed_handler(&mut self, handler: Arc<dyn ToolListChangedHandler>) {
debug!("Registering tool list changed handler");
self.tool_list_changed = Some(handler);
}
pub fn set_progress_handler(&mut self, handler: Arc<dyn ProgressHandler>) {
debug!("Registering progress handler");
self.progress = Some(handler);
}
#[must_use]
pub fn has_roots_handler(&self) -> bool {
self.roots.is_some()
}
#[must_use]
pub fn has_elicitation_handler(&self) -> bool {
self.elicitation.is_some()
}
#[must_use]
pub fn has_log_handler(&self) -> bool {
self.log.is_some()
}
#[must_use]
pub fn has_resource_update_handler(&self) -> bool {
self.resource_update.is_some()
}
#[must_use]
pub fn get_log_handler(&self) -> Option<Arc<dyn LogHandler>> {
self.log.clone()
}
#[must_use]
pub fn get_resource_update_handler(&self) -> Option<Arc<dyn ResourceUpdateHandler>> {
self.resource_update.clone()
}
#[must_use]
pub fn get_cancellation_handler(&self) -> Option<Arc<dyn CancellationHandler>> {
self.cancellation.clone()
}
#[must_use]
pub fn get_resource_list_changed_handler(&self) -> Option<Arc<dyn ResourceListChangedHandler>> {
self.resource_list_changed.clone()
}
#[must_use]
pub fn get_prompt_list_changed_handler(&self) -> Option<Arc<dyn PromptListChangedHandler>> {
self.prompt_list_changed.clone()
}
#[must_use]
pub fn get_tool_list_changed_handler(&self) -> Option<Arc<dyn ToolListChangedHandler>> {
self.tool_list_changed.clone()
}
#[must_use]
pub fn has_progress_handler(&self) -> bool {
self.progress.is_some()
}
#[must_use]
pub fn get_progress_handler(&self) -> Option<Arc<dyn ProgressHandler>> {
self.progress.clone()
}
pub async fn handle_roots_request(&self) -> HandlerResult<Vec<turbomcp_protocol::types::Root>> {
match &self.roots {
Some(handler) => {
info!("Processing roots/list request from server");
handler.handle_roots_request().await
}
None => {
warn!("No roots handler registered, returning empty roots list");
Ok(Vec::new())
}
}
}
pub async fn handle_elicitation(
&self,
request: ElicitationRequest,
) -> HandlerResult<ElicitationResponse> {
match &self.elicitation {
Some(handler) => {
info!("Processing elicitation request: {}", request.id);
handler.handle_elicitation(request).await
}
None => {
warn!("No elicitation handler registered, declining request");
Err(HandlerError::Configuration {
message: "No elicitation handler registered".to_string(),
})
}
}
}
pub async fn handle_log(&self, log: LoggingNotification) -> HandlerResult<()> {
match &self.log {
Some(handler) => handler.handle_log(log).await,
None => {
debug!("No log handler registered, ignoring log message");
Ok(())
}
}
}
pub async fn handle_resource_update(
&self,
notification: ResourceUpdatedNotification,
) -> HandlerResult<()> {
match &self.resource_update {
Some(handler) => {
debug!("Processing resource update: {}", notification.uri);
handler.handle_resource_update(notification).await
}
None => {
debug!("No resource update handler registered, ignoring notification");
Ok(())
}
}
}
}
#[derive(Debug)]
pub struct DeclineElicitationHandler;
impl ElicitationHandler for DeclineElicitationHandler {
fn handle_elicitation(
&self,
request: ElicitationRequest,
) -> Pin<Box<dyn Future<Output = HandlerResult<ElicitationResponse>> + Send + '_>> {
Box::pin(async move {
warn!("Declining elicitation request: {}", request.message());
Ok(ElicitationResponse::decline())
})
}
}
#[derive(Debug)]
pub struct TracingLogHandler;
impl LogHandler for TracingLogHandler {
fn handle_log(
&self,
log: LoggingNotification,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>> {
Box::pin(async move {
let logger_prefix = log.logger.as_deref().unwrap_or("server");
let message = log.data.to_string();
match log.level {
LogLevel::Error => error!("[{}] {}", logger_prefix, message),
LogLevel::Warning => warn!("[{}] {}", logger_prefix, message),
LogLevel::Info => info!("[{}] {}", logger_prefix, message),
LogLevel::Debug => debug!("[{}] {}", logger_prefix, message),
LogLevel::Notice => info!("[{}] [NOTICE] {}", logger_prefix, message),
LogLevel::Critical => error!("[{}] [CRITICAL] {}", logger_prefix, message),
LogLevel::Alert => error!("[{}] [ALERT] {}", logger_prefix, message),
LogLevel::Emergency => error!("[{}] [EMERGENCY] {}", logger_prefix, message),
}
Ok(())
})
}
}
#[derive(Debug)]
pub struct LoggingResourceUpdateHandler;
impl ResourceUpdateHandler for LoggingResourceUpdateHandler {
fn handle_resource_update(
&self,
notification: ResourceUpdatedNotification,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>> {
Box::pin(async move {
info!("Resource {} was updated", notification.uri);
Ok(())
})
}
}
#[derive(Debug)]
pub struct LoggingCancellationHandler;
impl CancellationHandler for LoggingCancellationHandler {
fn handle_cancellation(
&self,
notification: CancelledNotification,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>> {
Box::pin(async move {
if let Some(reason) = ¬ification.reason {
info!(
"Request {} was cancelled: {}",
notification.request_id, reason
);
} else {
info!("Request {} was cancelled", notification.request_id);
}
Ok(())
})
}
}
#[derive(Debug)]
pub struct LoggingResourceListChangedHandler;
impl ResourceListChangedHandler for LoggingResourceListChangedHandler {
fn handle_resource_list_changed(
&self,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>> {
Box::pin(async move {
info!("Server's resource list changed");
Ok(())
})
}
}
#[derive(Debug)]
pub struct LoggingPromptListChangedHandler;
impl PromptListChangedHandler for LoggingPromptListChangedHandler {
fn handle_prompt_list_changed(
&self,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>> {
Box::pin(async move {
info!("Server's prompt list changed");
Ok(())
})
}
}
#[derive(Debug)]
pub struct LoggingToolListChangedHandler;
impl ToolListChangedHandler for LoggingToolListChangedHandler {
fn handle_tool_list_changed(
&self,
) -> Pin<Box<dyn Future<Output = HandlerResult<()>> + Send + '_>> {
Box::pin(async move {
info!("Server's tool list changed");
Ok(())
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use tokio;
#[derive(Debug)]
struct TestElicitationHandler;
impl ElicitationHandler for TestElicitationHandler {
fn handle_elicitation(
&self,
_request: ElicitationRequest,
) -> Pin<Box<dyn Future<Output = HandlerResult<ElicitationResponse>> + Send + '_>> {
Box::pin(async move {
let mut content = HashMap::new();
content.insert("test".to_string(), json!("response"));
Ok(ElicitationResponse::accept(content))
})
}
}
#[tokio::test]
async fn test_handler_registry_creation() {
let registry = HandlerRegistry::new();
assert!(!registry.has_elicitation_handler());
assert!(!registry.has_log_handler());
assert!(!registry.has_resource_update_handler());
}
#[tokio::test]
async fn test_elicitation_handler_registration() {
let mut registry = HandlerRegistry::new();
let handler = Arc::new(TestElicitationHandler);
registry.set_elicitation_handler(handler);
assert!(registry.has_elicitation_handler());
}
#[tokio::test]
async fn test_elicitation_request_handling() {
let mut registry = HandlerRegistry::new();
let handler = Arc::new(TestElicitationHandler);
registry.set_elicitation_handler(handler);
let proto_request = turbomcp_protocol::types::ElicitRequest {
params: turbomcp_protocol::types::ElicitRequestParams::form(
"Test prompt".to_string(),
turbomcp_protocol::types::ElicitationSchema::new(),
None,
None,
),
task: None,
_meta: None,
};
let request = ElicitationRequest::new(
turbomcp_protocol::MessageId::String("test-123".to_string()),
proto_request,
);
let response = registry.handle_elicitation(request).await.unwrap();
assert_eq!(response.action(), ElicitationAction::Accept);
assert!(response.content().is_some());
}
#[tokio::test]
async fn test_default_handlers() {
let decline_handler = DeclineElicitationHandler;
let proto_request = turbomcp_protocol::types::ElicitRequest {
params: turbomcp_protocol::types::ElicitRequestParams::form(
"Test".to_string(),
turbomcp_protocol::types::ElicitationSchema::new(),
None,
None,
),
task: None,
_meta: None,
};
let request = ElicitationRequest::new(
turbomcp_protocol::MessageId::String("test".to_string()),
proto_request,
);
let response = decline_handler.handle_elicitation(request).await.unwrap();
assert_eq!(response.action(), ElicitationAction::Decline);
}
#[tokio::test]
async fn test_handler_error_types() {
let error = HandlerError::UserCancelled;
assert!(error.to_string().contains("User cancelled"));
let timeout_error = HandlerError::Timeout {
timeout_seconds: 30,
};
assert!(timeout_error.to_string().contains("30 seconds"));
}
#[test]
fn test_user_cancelled_error_mapping() {
let error = HandlerError::UserCancelled;
let jsonrpc_error = error.into_jsonrpc_error();
assert_eq!(
jsonrpc_error.code, -1,
"User cancelled should map to -1 per current MCP spec"
);
assert!(jsonrpc_error.message.contains("User rejected"));
assert!(jsonrpc_error.data.is_none());
}
#[test]
fn test_timeout_error_mapping() {
let error = HandlerError::Timeout {
timeout_seconds: 30,
};
let jsonrpc_error = error.into_jsonrpc_error();
assert_eq!(jsonrpc_error.code, -32801, "Timeout should map to -32801");
assert!(jsonrpc_error.message.contains("30 seconds"));
assert!(jsonrpc_error.data.is_none());
}
#[test]
fn test_invalid_input_error_mapping() {
let error = HandlerError::InvalidInput {
details: "Missing required field".to_string(),
};
let jsonrpc_error = error.into_jsonrpc_error();
assert_eq!(
jsonrpc_error.code, -32602,
"Invalid input should map to -32602"
);
assert!(jsonrpc_error.message.contains("Invalid input"));
assert!(jsonrpc_error.message.contains("Missing required field"));
assert!(jsonrpc_error.data.is_none());
}
#[test]
fn test_configuration_error_mapping() {
let error = HandlerError::Configuration {
message: "Handler not registered".to_string(),
};
let jsonrpc_error = error.into_jsonrpc_error();
assert_eq!(
jsonrpc_error.code, -32601,
"Configuration error should map to -32601"
);
assert!(
jsonrpc_error
.message
.contains("Handler configuration error")
);
assert!(jsonrpc_error.message.contains("Handler not registered"));
assert!(jsonrpc_error.data.is_none());
}
#[test]
fn test_generic_error_mapping() {
let error = HandlerError::Generic {
message: "Something went wrong".to_string(),
};
let jsonrpc_error = error.into_jsonrpc_error();
assert_eq!(
jsonrpc_error.code, -32603,
"Generic error should map to -32603"
);
assert!(jsonrpc_error.message.contains("Handler error"));
assert!(jsonrpc_error.message.contains("Something went wrong"));
assert!(jsonrpc_error.data.is_none());
}
#[test]
fn test_external_error_mapping() {
let external_err = Box::new(std::io::Error::other("Database connection failed"));
let error = HandlerError::External {
source: external_err,
};
let jsonrpc_error = error.into_jsonrpc_error();
assert_eq!(
jsonrpc_error.code, -32603,
"External error should map to -32603"
);
assert!(jsonrpc_error.message.contains("External system error"));
assert!(jsonrpc_error.message.contains("Database connection failed"));
assert!(jsonrpc_error.data.is_none());
}
#[test]
fn test_error_code_uniqueness() {
let user_cancelled = HandlerError::UserCancelled.into_jsonrpc_error().code;
let timeout = HandlerError::Timeout { timeout_seconds: 1 }
.into_jsonrpc_error()
.code;
let invalid_input = HandlerError::InvalidInput {
details: "test".to_string(),
}
.into_jsonrpc_error()
.code;
let configuration = HandlerError::Configuration {
message: "test".to_string(),
}
.into_jsonrpc_error()
.code;
assert_ne!(user_cancelled, timeout);
assert_ne!(user_cancelled, invalid_input);
assert_ne!(user_cancelled, configuration);
assert_ne!(timeout, invalid_input);
assert_ne!(timeout, configuration);
assert_ne!(invalid_input, configuration);
}
#[test]
fn test_error_messages_are_informative() {
let errors = vec![
HandlerError::UserCancelled,
HandlerError::Timeout {
timeout_seconds: 42,
},
HandlerError::InvalidInput {
details: "test detail".to_string(),
},
HandlerError::Configuration {
message: "test config".to_string(),
},
HandlerError::Generic {
message: "test generic".to_string(),
},
];
for error in errors {
let jsonrpc_error = error.into_jsonrpc_error();
assert!(
!jsonrpc_error.message.is_empty(),
"Error message should not be empty"
);
assert!(
jsonrpc_error.message.len() > 10,
"Error message should be descriptive"
);
}
}
}