use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use tracing::{debug, error, info, warn};
use crate::handlers::McpHandler;
use crate::session::SessionEventDispatcher;
/// Adapter that lets the session layer push events through the HTTP
/// transport's stream manager.
///
/// Only compiled when the `http` feature is enabled.
#[cfg(feature = "http")]
struct StreamManagerEventDispatcher {
/// Shared stream manager that `dispatch_to_session` broadcasts through.
stream_manager: Arc<turul_http_mcp_server::StreamManager>,
}
#[cfg(feature = "http")]
#[async_trait]
impl SessionEventDispatcher for StreamManagerEventDispatcher {
    /// Broadcasts a single event to `session_id` through the stream manager.
    ///
    /// The success payload of the broadcast is discarded. A failure is
    /// returned as the error's `to_string()` rendering.
    async fn dispatch_to_session(
        &self,
        session_id: &str,
        event_type: String,
        data: serde_json::Value,
    ) -> std::result::Result<(), String> {
        let outcome = self
            .stream_manager
            .broadcast_to_session(session_id, event_type, data)
            .await;
        match outcome {
            Ok(_) => Ok(()),
            Err(cause) => Err(cause.to_string()),
        }
    }
}
use crate::session::{SessionContext, SessionManager};
use crate::{McpServerBuilder, McpTool, Result, tool::tool_to_descriptor};
use turul_mcp_json_rpc_server::JsonRpcHandler;
use turul_mcp_protocol::McpError;
use turul_mcp_protocol::*;
/// MCP server instance: identity, advertised capabilities, registered tools
/// and handlers, session management, and (with the `http` feature) the HTTP
/// transport settings.
pub struct McpServer {
/// Server identity; `run_http` logs its `name` and `version` at startup.
pub implementation: Implementation,
/// Capabilities handed to the session manager, the initialize handler and
/// the HTTP server builder.
pub capabilities: ServerCapabilities,
/// Registered tools, keyed by a `String` (presumably the tool name —
/// confirm against the builder).
tools: HashMap<String, Arc<dyn McpTool>>,
/// Additional handlers; each key is registered as a JSON-RPC method name
/// when the HTTP server is assembled.
handlers: HashMap<String, Arc<dyn McpHandler>>,
/// Session manager created in `new` and shared with every handler.
session_manager: Arc<SessionManager>,
/// Optional session storage backend as passed to `new`; read by
/// `session_storage_info`.
session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
/// Optional instructions forwarded to the initialize handler.
instructions: Option<String>,
/// When `true`, handlers reject requests for sessions that have not been
/// marked initialized yet.
strict_lifecycle: bool,
/// Middleware stack; a clone is handed to the HTTP server builder.
middleware_stack: crate::middleware::MiddlewareStack,
/// Optional task runtime; used for stuck-task recovery at startup and
/// attached to the tool handler.
task_runtime: Option<Arc<crate::task::runtime::TaskRuntime>>,
/// Route registry handed to the HTTP server builder.
route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
/// Tool fingerprint forwarded to the initialize handler and the HTTP
/// server builder.
tool_fingerprint: String,
/// Dynamic tool registry; `Some` only when `new` was called with
/// `dynamic_tools == true`.
#[cfg(feature = "dynamic-tools")]
tool_registry: Option<Arc<crate::tool_registry::ToolRegistry>>,
/// `true` when the server-state backend reports the name "PostgreSQL" or
/// "DynamoDB"; gates storage sync and polling at startup.
#[cfg(feature = "dynamic-tools")]
coordination_enabled: bool,
/// Socket address passed to the HTTP server builder.
#[cfg(feature = "http")]
bind_address: SocketAddr,
/// MCP endpoint path passed to the HTTP server builder.
#[cfg(feature = "http")]
mcp_path: String,
/// CORS switch passed to the HTTP server builder.
#[cfg(feature = "http")]
enable_cors: bool,
/// SSE switch; also decides whether the event dispatcher and SSE bridge
/// are installed in `run_http`.
#[cfg(feature = "http")]
enable_sse: bool,
/// When `Some`, forwarded to the builder's `allow_unauthenticated_ping`;
/// when `None`, the builder's own setting is left untouched.
#[cfg(feature = "http")]
allow_unauthenticated_ping: Option<bool>,
}
impl McpServer {
/// Creates a server from fully-resolved builder configuration.
///
/// Session manager selection:
/// - storage and both timeout values: `with_storage_and_timeouts` with the
///   supplied values
/// - storage only: `with_storage_and_timeouts` with 30 minutes / 60 seconds
/// - both timeout values, no storage: `with_timeouts`
/// - otherwise: `SessionManager::new`
///
/// Custom timeouts are honoured only when BOTH `session_timeout_minutes` and
/// `session_cleanup_interval_seconds` are `Some`; a lone value is ignored.
///
/// With the `dynamic-tools` feature, a tool registry is created only when
/// `dynamic_tools` is `true`, and coordination is enabled only when the
/// server-state backend reports the name "PostgreSQL" or "DynamoDB".
// The argument list mirrors the builder's fields one-to-one, hence the allow.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
    implementation: Implementation,
    capabilities: ServerCapabilities,
    tools: HashMap<String, Arc<dyn McpTool>>,
    handlers: HashMap<String, Arc<dyn McpHandler>>,
    instructions: Option<String>,
    session_timeout_minutes: Option<u64>,
    session_cleanup_interval_seconds: Option<u64>,
    session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
    task_runtime: Option<Arc<crate::task::runtime::TaskRuntime>>,
    strict_lifecycle: bool,
    middleware_stack: crate::middleware::MiddlewareStack,
    route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
    tool_fingerprint: String,
    #[cfg(feature = "dynamic-tools")] dynamic_tools: bool,
    #[cfg(feature = "dynamic-tools")]
    server_state_storage: Option<Arc<dyn turul_mcp_server_state_storage::ServerStateStorage>>,
    #[cfg(feature = "http")] bind_address: SocketAddr,
    #[cfg(feature = "http")] mcp_path: String,
    #[cfg(feature = "http")] enable_cors: bool,
    #[cfg(feature = "http")] enable_sse: bool,
    #[cfg(feature = "http")] allow_unauthenticated_ping: Option<bool>,
) -> Self {
    // Resolve the optional custom timeouts once. `saturating_mul` clamps a
    // very large minute count (e.g. a "never expire" sentinel) instead of
    // overflowing, which would panic in debug builds and wrap in release.
    let custom_timeouts = match (session_timeout_minutes, session_cleanup_interval_seconds) {
        (Some(timeout_mins), Some(cleanup_secs)) => Some((
            std::time::Duration::from_secs(timeout_mins.saturating_mul(60)),
            std::time::Duration::from_secs(cleanup_secs),
        )),
        _ => None,
    };

    let session_manager = Arc::new(match (&session_storage, custom_timeouts) {
        (Some(storage), Some((timeout, cleanup_interval))) => {
            SessionManager::with_storage_and_timeouts(
                Arc::clone(storage),
                capabilities.clone(),
                timeout,
                cleanup_interval,
            )
        }
        // Storage without a complete timeout pair: 30 minute expiry, 60 second cleanup.
        (Some(storage), None) => SessionManager::with_storage_and_timeouts(
            Arc::clone(storage),
            capabilities.clone(),
            std::time::Duration::from_secs(30 * 60),
            std::time::Duration::from_secs(60),
        ),
        (None, Some((timeout, cleanup_interval))) => {
            SessionManager::with_timeouts(capabilities.clone(), timeout, cleanup_interval)
        }
        (None, None) => SessionManager::new(capabilities.clone()),
    });

    if let Some(storage) = &session_storage {
        debug!(
            "McpServer configured with session storage backend: {:p}",
            storage
        );
    } else {
        debug!("McpServer configured without session storage");
    }

    // Must be computed before `server_state_storage` is moved into the registry.
    #[cfg(feature = "dynamic-tools")]
    let coordination_enabled = server_state_storage
        .as_ref()
        .map(|s| matches!(s.backend_name(), "PostgreSQL" | "DynamoDB"))
        .unwrap_or(false);

    #[cfg(feature = "dynamic-tools")]
    let tool_registry = if dynamic_tools {
        Some(Arc::new(Self::create_tool_registry(
            tools.clone(),
            session_manager.clone(),
            server_state_storage,
        )))
    } else {
        None
    };

    Self {
        implementation,
        capabilities,
        tools,
        handlers,
        session_manager,
        session_storage,
        task_runtime,
        instructions,
        strict_lifecycle,
        middleware_stack,
        route_registry,
        tool_fingerprint,
        #[cfg(feature = "dynamic-tools")]
        tool_registry,
        #[cfg(feature = "dynamic-tools")]
        coordination_enabled,
        #[cfg(feature = "http")]
        bind_address,
        #[cfg(feature = "http")]
        mcp_path,
        #[cfg(feature = "http")]
        enable_cors,
        #[cfg(feature = "http")]
        enable_sse,
        #[cfg(feature = "http")]
        allow_unauthenticated_ping,
    }
}
/// Returns a fresh `McpServerBuilder` (equivalent to `McpServerBuilder::new()`).
pub fn builder() -> McpServerBuilder {
McpServerBuilder::new()
}
/// Builds the dynamic tool registry from the compiled-in tools.
///
/// Uses the supplied server-state storage when present; otherwise falls back
/// to a new `InMemoryServerStateStorage`.
#[cfg(feature = "dynamic-tools")]
fn create_tool_registry(
    compiled_tools: HashMap<String, Arc<dyn McpTool>>,
    session_manager: Arc<SessionManager>,
    server_state_storage: Option<
        Arc<dyn turul_mcp_server_state_storage::ServerStateStorage>,
    >,
) -> crate::tool_registry::ToolRegistry {
    let backing_store: Arc<dyn turul_mcp_server_state_storage::ServerStateStorage> =
        match server_state_storage {
            Some(provided) => provided,
            None => Arc::new(turul_mcp_server_state_storage::InMemoryServerStateStorage::new()),
        };
    crate::tool_registry::ToolRegistry::new(compiled_tools, session_manager, backing_store)
}
/// Activates the tool called `name` through the dynamic tool registry.
///
/// # Errors
/// Returns `ToolRegistryError::NotCompiled` when the server has no tool
/// registry; otherwise returns whatever the registry reports.
#[cfg(feature = "dynamic-tools")]
pub async fn activate_tool(
    &self,
    name: &str,
) -> std::result::Result<bool, crate::tool_registry::ToolRegistryError> {
    let Some(registry) = &self.tool_registry else {
        let reason = format!(
            "Cannot activate tool '{}': server is not in Dynamic mode",
            name
        );
        return Err(crate::tool_registry::ToolRegistryError::NotCompiled(reason));
    };
    registry.activate_tool(name).await
}
/// Deactivates the tool called `name` through the dynamic tool registry.
///
/// # Errors
/// Returns `ToolRegistryError::NotCompiled` when the server has no tool
/// registry; otherwise returns whatever the registry reports.
#[cfg(feature = "dynamic-tools")]
pub async fn deactivate_tool(
    &self,
    name: &str,
) -> std::result::Result<bool, crate::tool_registry::ToolRegistryError> {
    let Some(registry) = &self.tool_registry else {
        let reason = format!(
            "Cannot deactivate tool '{}': server is not in Dynamic mode",
            name
        );
        return Err(crate::tool_registry::ToolRegistryError::NotCompiled(reason));
    };
    registry.deactivate_tool(name).await
}
/// Returns the dynamic tool registry, or `None` when the server was built
/// without one.
#[cfg(feature = "dynamic-tools")]
pub fn tool_registry(&self) -> Option<&Arc<crate::tool_registry::ToolRegistry>> {
self.tool_registry.as_ref()
}
/// Returns a reference to the server's configured capabilities.
pub fn capabilities(&self) -> &turul_mcp_protocol::ServerCapabilities {
&self.capabilities
}
/// Returns the task runtime, or `None` when none was configured.
pub fn task_runtime(&self) -> Option<&Arc<crate::task::runtime::TaskRuntime>> {
self.task_runtime.as_ref()
}
/// Runs the server on the default transport.
///
/// With the `http` feature this delegates to `run_http`. Without it no
/// transport is compiled in and a configuration error is returned.
pub async fn run(&self) -> Result<()> {
#[cfg(feature = "http")]
{
self.run_http().await
}
#[cfg(not(feature = "http"))]
{
Err(McpError::configuration(
"No transport available. Enable the 'http' feature to use HTTP transport.",
))
}
}
/// Assembles the HTTP transport from this server's configuration and serves
/// until the transport's `run` future completes.
///
/// Startup sequence, in order:
/// 1. start the session cleanup task;
/// 2. if a task runtime is configured, attempt stuck-task recovery (a
///    failure is logged and startup continues);
/// 3. with `dynamic-tools` and coordination enabled, sync the tool registry
///    with storage and start polling;
/// 4. build the `initialize`, `tools/list` and `tools/call` handlers, bridge
///    every custom handler, and register `notifications/initialized`;
/// 5. when SSE is enabled, install the event dispatcher and the SSE bridge;
/// 6. run the HTTP server.
///
/// # Errors
/// Any `HttpMcpError` returned by the transport is mapped to the matching
/// `McpError` variant.
#[cfg(feature = "http")]
pub async fn run_http(&self) -> Result<()> {
info!(
"Starting MCP server: {} v{}",
self.implementation.name, self.implementation.version
);
info!("Session management: enabled with automatic cleanup");
if self.enable_sse {
info!("SSE notifications: enabled at GET {}", self.mcp_path);
}
// The returned handle stays bound until this function returns.
// NOTE(review): what dropping it does to the cleanup task is not visible here.
let _cleanup_task = self.session_manager.clone().start_cleanup_task();
// Stuck-task recovery is best-effort: an error is only logged.
if let Some(ref runtime) = self.task_runtime {
match runtime.recover_stuck_tasks().await {
Ok(recovered) if !recovered.is_empty() => {
info!(
count = recovered.len(),
"Recovered stuck tasks from previous session"
);
}
Err(e) => {
warn!(error = %e, "Failed to recover stuck tasks on startup");
}
_ => {}
}
}
// Shared-storage coordination; a sync failure is logged and the server
// continues with its local tool state.
#[cfg(feature = "dynamic-tools")]
if self.coordination_enabled {
if let Some(ref registry) = self.tool_registry {
match registry.sync_from_storage().await {
Ok(crate::tool_registry::SyncResult::InitializedStorage) => {
info!("Dynamic: initialized shared storage with local tool state");
}
Ok(crate::tool_registry::SyncResult::InSync) => {
info!("Dynamic: local tools match shared storage");
}
Ok(crate::tool_registry::SyncResult::UpdatedStorage { old_fingerprint }) => {
warn!("Dynamic: updated shared storage (old fingerprint: {}). Other running instances may be serving stale tools.", old_fingerprint);
}
Err(e) => {
error!("Dynamic: failed to sync with shared storage: {}. Continuing with local state.", e);
}
}
let poll_interval = registry.check_ttl();
// NOTE(review): the handle goes out of scope at the end of this block;
// presumably polling keeps running after the drop — confirm.
let _poll_handle = registry.start_polling(poll_interval);
debug!("Dynamic: started background polling (interval: {:?})", poll_interval);
}
}
// tools/call handler, optionally extended with the task runtime and registry.
let mut tool_handler = SessionAwareToolHandler::new(
self.tools.clone(),
self.session_manager.clone(),
self.strict_lifecycle,
);
if let Some(ref runtime) = self.task_runtime {
tool_handler = tool_handler.with_task_runtime(Arc::clone(runtime));
}
#[cfg(feature = "dynamic-tools")]
if let Some(ref registry) = self.tool_registry {
tool_handler = tool_handler.with_tool_registry(Arc::clone(registry));
}
// initialize handler.
let mut init_handler = SessionAwareInitializeHandler::new(
self.implementation.clone(),
self.capabilities.clone(),
self.instructions.clone(),
self.session_manager.clone(),
self.strict_lifecycle,
self.tool_fingerprint.clone(),
);
#[cfg(feature = "dynamic-tools")]
if let Some(ref registry) = self.tool_registry {
init_handler = init_handler.with_tool_registry(Arc::clone(registry));
}
// The HTTP server is built on the storage exposed by the session manager.
let session_storage = self.session_manager.get_storage();
debug!("Configuring HTTP MCP server with session storage backend");
let mut builder =
turul_http_mcp_server::HttpMcpServer::builder_with_storage(session_storage)
.bind_address(self.bind_address)
.mcp_path(&self.mcp_path)
.cors(self.enable_cors)
.get_sse(self.enable_sse) .server_capabilities(self.capabilities.clone()) .with_middleware_stack(Arc::new(self.middleware_stack.clone())) .route_registry(Arc::clone(&self.route_registry)) .tool_fingerprint(self.tool_fingerprint.clone())
.register_handler(vec!["initialize".to_string()], init_handler)
.register_handler(vec!["tools/list".to_string()], {
let mut lth = ListToolsHandler::new_with_session_manager(
self.tools.clone(),
self.session_manager.clone(),
self.strict_lifecycle,
self.task_runtime.is_some(),
);
#[cfg(feature = "dynamic-tools")]
if let Some(ref registry) = self.tool_registry {
lth = lth.with_tool_registry(Arc::clone(registry));
}
lth
})
.register_handler(vec!["tools/call".to_string()], tool_handler);
// Only override the builder's ping policy when one was explicitly configured.
if let Some(allow) = self.allow_unauthenticated_ping {
builder = builder.allow_unauthenticated_ping(allow);
}
// Every custom handler is wrapped in a session-aware bridge and registered
// under its map key as the method name.
for (method, handler) in &self.handlers {
let bridge_handler = SessionAwareMcpHandlerBridge::new(
handler.clone(),
self.session_manager.clone(),
self.strict_lifecycle,
);
builder = builder.register_handler(vec![method.clone()], bridge_handler);
}
// Registered after the loop above.
// NOTE(review): whether this replaces a custom handler registered for
// the same method depends on `register_handler` — confirm.
use crate::handlers::InitializedNotificationHandler;
let initialized_handler = InitializedNotificationHandler::new(self.session_manager.clone());
let initialized_bridge = SessionAwareMcpHandlerBridge::new(
Arc::new(initialized_handler),
self.session_manager.clone(),
self.strict_lifecycle,
);
builder = builder.register_handler(
vec!["notifications/initialized".to_string()],
initialized_bridge,
);
let http_server = builder.build();
if self.enable_sse {
debug!("SSE support enabled with integrated session management");
self.install_event_dispatcher(&http_server).await;
self.setup_sse_event_bridge().await;
}
// Translate transport-level errors into the crate's `McpError`.
http_server.run().await.map_err(|http_err| match http_err {
turul_http_mcp_server::HttpMcpError::Mcp(mcp_err) => mcp_err,
turul_http_mcp_server::HttpMcpError::Http(http_err) => {
McpError::transport(&http_err.to_string())
}
turul_http_mcp_server::HttpMcpError::JsonRpc(rpc_err) => {
McpError::json_rpc_protocol(&rpc_err.to_string())
}
turul_http_mcp_server::HttpMcpError::Serialization(ser_err) => {
McpError::SerializationError(ser_err)
}
turul_http_mcp_server::HttpMcpError::Io(io_err) => McpError::IoError(io_err),
turul_http_mcp_server::HttpMcpError::InvalidRequest(msg) => {
McpError::InvalidParameters(msg)
}
})?;
Ok(())
}
/// Wires the HTTP server's stream manager into the session manager as its
/// event dispatcher.
#[cfg(feature = "http")]
async fn install_event_dispatcher(
    &self,
    http_server: &turul_http_mcp_server::HttpMcpServer,
) {
    let dispatcher = StreamManagerEventDispatcher {
        stream_manager: http_server.get_stream_manager(),
    };
    self.session_manager
        .set_event_dispatcher(Arc::new(dispatcher))
        .await;
    debug!("Event dispatcher installed (guaranteed persistence for Custom events)");
}
/// Spawns a background task that observes every session event and logs it.
///
/// The task only logs: custom events are reported as observed, every other
/// event is reported as skipped. Nothing is forwarded from here.
///
/// The task ends on the first `Err` from the receiver.
/// NOTE(review): if the receiver can report a recoverable error (for example
/// a lagged broadcast receiver), the bridge stops on it — confirm that is
/// intended.
async fn setup_sse_event_bridge(&self) {
    debug!("🌉 Setting up SSE event bridge (observer-only for Custom events)");
    let mut event_feed = self.session_manager.subscribe_all_session_events();
    tokio::spawn(async move {
        debug!("🌐 SSE Event Bridge: Started listening for session events");
        loop {
            let (session_id, event) = match event_feed.recv().await {
                Ok(received) => received,
                Err(_) => break,
            };
            debug!(
                "📡 SSE Bridge: Received event from session {}: {:?}",
                session_id, event
            );
            if let crate::session::SessionEvent::Custom { event_type, .. } = &event {
                debug!(
                    "📡 SSE Bridge: observed custom event '{}' for session {} (dispatcher handles persistence)",
                    event_type, session_id
                );
            } else {
                debug!("⏭ SSE Bridge: Skipping non-custom event: {:?}", event);
            }
        }
        debug!("🚫 SSE Event Bridge: Global event receiver closed");
    });
    info!("✅ SSE event bridge established successfully");
}
/// Builds the HTTP server exactly as `run_http` does, but runs it on a
/// spawned Tokio task and returns immediately.
///
/// Returns the built `HttpMcpServer` together with the `JoinHandle` of the
/// task executing a clone of it; the handle yields the transport's own
/// `Result` when the server stops.
///
/// NOTE(review): unlike `run_http`, this path never calls
/// `install_event_dispatcher` or `setup_sse_event_bridge`, even when
/// `enable_sse` is `true` — confirm that callers are expected to wire
/// event dispatching themselves.
#[cfg(feature = "http")]
pub async fn run_with_sse_access(
&self,
) -> Result<(
turul_http_mcp_server::HttpMcpServer,
tokio::task::JoinHandle<turul_http_mcp_server::Result<()>>,
)> {
info!(
"Starting MCP server: {} v{}",
self.implementation.name, self.implementation.version
);
info!("Session management: enabled with automatic cleanup");
if self.enable_sse {
info!("SSE notifications: enabled - SSE manager available for notifications");
}
// NOTE(review): this handle is dropped when the function returns, while the
// spawned server keeps running — confirm the cleanup task survives the drop.
let _cleanup_task = self.session_manager.clone().start_cleanup_task();
// Stuck-task recovery is best-effort: an error is only logged.
if let Some(ref runtime) = self.task_runtime {
match runtime.recover_stuck_tasks().await {
Ok(recovered) if !recovered.is_empty() => {
info!(
count = recovered.len(),
"Recovered stuck tasks from previous session"
);
}
Err(e) => {
warn!(error = %e, "Failed to recover stuck tasks on startup");
}
_ => {}
}
}
// Shared-storage coordination; a sync failure is logged and the server
// continues with its local tool state.
#[cfg(feature = "dynamic-tools")]
if self.coordination_enabled {
if let Some(ref registry) = self.tool_registry {
match registry.sync_from_storage().await {
Ok(crate::tool_registry::SyncResult::InitializedStorage) => {
info!("Dynamic: initialized shared storage with local tool state");
}
Ok(crate::tool_registry::SyncResult::InSync) => {
info!("Dynamic: local tools match shared storage");
}
Ok(crate::tool_registry::SyncResult::UpdatedStorage { old_fingerprint }) => {
warn!("Dynamic: updated shared storage (old fingerprint: {}). Other running instances may be serving stale tools.", old_fingerprint);
}
Err(e) => {
error!("Dynamic: failed to sync with shared storage: {}. Continuing with local state.", e);
}
}
let poll_interval = registry.check_ttl();
let _poll_handle = registry.start_polling(poll_interval);
debug!("Dynamic: started background polling (interval: {:?})", poll_interval);
}
}
// tools/call handler, optionally extended with the task runtime and registry.
let mut tool_handler = SessionAwareToolHandler::new(
self.tools.clone(),
self.session_manager.clone(),
self.strict_lifecycle,
);
if let Some(ref runtime) = self.task_runtime {
tool_handler = tool_handler.with_task_runtime(Arc::clone(runtime));
}
#[cfg(feature = "dynamic-tools")]
if let Some(ref registry) = self.tool_registry {
tool_handler = tool_handler.with_tool_registry(Arc::clone(registry));
}
// initialize handler.
let mut init_handler = SessionAwareInitializeHandler::new(
self.implementation.clone(),
self.capabilities.clone(),
self.instructions.clone(),
self.session_manager.clone(),
self.strict_lifecycle,
self.tool_fingerprint.clone(),
);
#[cfg(feature = "dynamic-tools")]
if let Some(ref registry) = self.tool_registry {
init_handler = init_handler.with_tool_registry(Arc::clone(registry));
}
// The HTTP server is built on the storage exposed by the session manager.
let session_storage = self.session_manager.get_storage();
debug!("Configuring HTTP MCP server with session storage backend");
let mut builder =
turul_http_mcp_server::HttpMcpServer::builder_with_storage(session_storage)
.bind_address(self.bind_address)
.mcp_path(&self.mcp_path)
.cors(self.enable_cors)
.get_sse(self.enable_sse) .server_capabilities(self.capabilities.clone()) .with_middleware_stack(Arc::new(self.middleware_stack.clone())) .route_registry(Arc::clone(&self.route_registry)) .tool_fingerprint(self.tool_fingerprint.clone())
.register_handler(vec!["initialize".to_string()], init_handler)
.register_handler(vec!["tools/list".to_string()], {
let mut lth = ListToolsHandler::new_with_session_manager(
self.tools.clone(),
self.session_manager.clone(),
self.strict_lifecycle,
self.task_runtime.is_some(),
);
#[cfg(feature = "dynamic-tools")]
if let Some(ref registry) = self.tool_registry {
lth = lth.with_tool_registry(Arc::clone(registry));
}
lth
})
.register_handler(vec!["tools/call".to_string()], tool_handler);
// Only override the builder's ping policy when one was explicitly configured.
if let Some(allow) = self.allow_unauthenticated_ping {
builder = builder.allow_unauthenticated_ping(allow);
}
// Every custom handler is wrapped in a session-aware bridge and registered
// under its map key as the method name.
for (method, handler) in &self.handlers {
let bridge_handler = SessionAwareMcpHandlerBridge::new(
handler.clone(),
self.session_manager.clone(),
self.strict_lifecycle,
);
builder = builder.register_handler(vec![method.clone()], bridge_handler);
}
use crate::handlers::InitializedNotificationHandler;
let initialized_handler = InitializedNotificationHandler::new(self.session_manager.clone());
let initialized_bridge = SessionAwareMcpHandlerBridge::new(
Arc::new(initialized_handler),
self.session_manager.clone(),
self.strict_lifecycle,
);
builder = builder.register_handler(
vec!["notifications/initialized".to_string()],
initialized_bridge,
);
let http_server = builder.build();
// Run a clone on a background task so the caller keeps `http_server`.
let server_task = {
let server = http_server.clone();
tokio::spawn(async move { server.run().await })
};
Ok((http_server, server_task))
}
/// Returns a fixed description of whether a session storage backend was
/// supplied: "Backend configured" or "No backend configured".
///
/// When a backend is present, its pointer is also written to the debug log.
pub fn session_storage_info(&self) -> &str {
    match &self.session_storage {
        Some(backend) => {
            debug!(
                "Accessing session storage for info - backend is configured: {:p}",
                backend
            );
            "Backend configured"
        }
        None => "No backend configured",
    }
}
}
/// Exposes an `McpHandler` as a `JsonRpcHandler`, attaching session context
/// and applying the strict-lifecycle check before delegating.
pub struct SessionAwareMcpHandlerBridge {
/// The wrapped handler that receives the request.
handler: Arc<dyn McpHandler>,
/// Session manager used to build session contexts and to query whether a
/// session is initialized.
session_manager: Arc<SessionManager>,
/// When `true`, requests for sessions that are not yet initialized are rejected.
strict_lifecycle: bool,
}
impl SessionAwareMcpHandlerBridge {
/// Creates a bridge around `handler`, storing the three arguments as given.
pub fn new(
handler: Arc<dyn McpHandler>,
session_manager: Arc<SessionManager>,
strict_lifecycle: bool,
) -> Self {
Self {
handler,
session_manager,
strict_lifecycle,
}
}
}
#[async_trait]
impl JsonRpcHandler for SessionAwareMcpHandlerBridge {
type Error = McpError;
async fn handle(
&self,
method: &str,
params: Option<turul_mcp_json_rpc_server::RequestParams>,
session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
) -> std::result::Result<serde_json::Value, McpError> {
debug!("Handling {} request via session-aware bridge", method);
let mcp_session_context = if let Some(json_rpc_ctx) = session_context {
debug!(
"Converting JSON-RPC session context: session_id={}",
json_rpc_ctx.session_id
);
Some(SessionContext::from_json_rpc_with_broadcaster(
json_rpc_ctx,
self.session_manager.get_storage(),
))
} else {
let session_id = extract_session_id_from_params(¶ms);
if let Some(sid) = session_id {
debug!("Fallback: extracted session_id from params: {}", sid);
self.session_manager.create_session_context(&sid)
} else {
None
}
};
if self.strict_lifecycle
&& method != "initialize"
&& method != "notifications/initialized"
&& let Some(ref session_ctx) = mcp_session_context
{
let session_initialized = self
.session_manager
.is_session_initialized(&session_ctx.session_id)
.await;
if !session_initialized {
debug!(
"🚫 STRICT MODE: Rejecting {} request for session {} - session not yet initialized (waiting for notifications/initialized)",
method, session_ctx.session_id
);
return Err(McpError::SessionError(
"Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string()
));
}
}
let mcp_params = params.map(|p| p.to_value());
match self
.handler
.handle_with_session(mcp_params, mcp_session_context)
.await
{
Ok(result) => Ok(result),
Err(error) => {
error!("MCP handler error: {}", error);
Err(error) }
}
}
async fn handle_notification(
&self,
method: &str,
params: Option<turul_mcp_json_rpc_server::RequestParams>,
session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
) -> std::result::Result<(), McpError> {
debug!("Handling {} notification via session-aware bridge", method);
let mcp_session_context = session_context.map(|json_rpc_ctx| {
SessionContext::from_json_rpc_with_broadcaster(
json_rpc_ctx,
self.session_manager.get_storage(),
)
});
if self.strict_lifecycle
&& method != "notifications/initialized"
&& let Some(ref session_ctx) = mcp_session_context
{
let session_initialized = self
.session_manager
.is_session_initialized(&session_ctx.session_id)
.await;
if !session_initialized {
tracing::debug!(
"🚫 STRICT MODE: Rejecting notification {} for session {} - session not yet initialized",
method,
session_ctx.session_id
);
return Err(McpError::SessionError(
"Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string()
));
}
}
let mcp_params = params.map(|p| p.to_value());
match self
.handler
.handle_with_session(mcp_params, mcp_session_context)
.await
{
Ok(_result) => Ok(()), Err(error) => {
tracing::error!("MCP notification handler error: {}", error);
Err(error)
}
}
}
fn supported_methods(&self) -> Vec<String> {
self.handler.supported_methods()
}
}
/// Fallback lookup of a session id inside request parameters.
///
/// Currently a stub: the parameters are ignored and `None` is always
/// returned, so the fallback branch in the bridge never yields a session.
fn extract_session_id_from_params(
_params: &Option<turul_mcp_json_rpc_server::RequestParams>,
) -> Option<String> {
None
}
/// JSON-RPC handler for the `initialize` request that also records the
/// negotiated state in the session.
pub struct SessionAwareInitializeHandler {
/// Server identity returned in the initialize result.
implementation: Implementation,
/// Capabilities returned in the initialize result.
capabilities: ServerCapabilities,
/// Optional instructions attached to the initialize result.
instructions: Option<String>,
/// Session manager used to create or load sessions and store their state.
session_manager: Arc<SessionManager>,
/// When `false`, the session is marked initialized immediately during
/// `initialize`; when `true`, it is left uninitialized at that point.
strict_lifecycle: bool,
/// Static tool fingerprint; stored in the session when non-empty.
tool_fingerprint: String,
/// Optional dynamic registry; when set, its fingerprint is stored instead
/// of the static one.
#[cfg(feature = "dynamic-tools")]
tool_registry: Option<Arc<crate::tool_registry::ToolRegistry>>,
}
impl SessionAwareInitializeHandler {
    /// Creates an initialize handler with no tool registry attached.
    pub fn new(
        implementation: Implementation,
        capabilities: ServerCapabilities,
        instructions: Option<String>,
        session_manager: Arc<SessionManager>,
        strict_lifecycle: bool,
        tool_fingerprint: String,
    ) -> Self {
        Self {
            implementation,
            capabilities,
            instructions,
            session_manager,
            strict_lifecycle,
            tool_fingerprint,
            #[cfg(feature = "dynamic-tools")]
            tool_registry: None,
        }
    }

    /// Attaches a dynamic tool registry and returns the updated handler.
    #[cfg(feature = "dynamic-tools")]
    pub fn with_tool_registry(mut self, registry: Arc<crate::tool_registry::ToolRegistry>) -> Self {
        self.tool_registry = Some(registry);
        self
    }

    /// Chooses the protocol version to answer a client request with.
    ///
    /// A version string that parses is used as the requested version. One
    /// that does not parse is treated as follows:
    /// - not exactly two `-` characters: "Invalid protocol version format";
    /// - sorts after the latest known version: treated as the latest version;
    /// - sorts before "2024-11-05": "Cannot negotiate compatible version";
    /// - anything else: "Unknown protocol version".
    ///
    /// The requested version is returned when it is in the supported list.
    /// Otherwise the highest supported version not above it is returned, or
    /// an error when there is none. String comparisons are lexicographic.
    fn negotiate_version(&self, client_version: &str) -> std::result::Result<McpVersion, String> {
        use turul_mcp_protocol::version::McpVersion;

        let requested_version = if let Ok(parsed) = client_version.parse::<McpVersion>() {
            parsed
        } else if client_version.matches('-').count() != 2 {
            return Err(format!(
                "Invalid protocol version format: {}",
                client_version
            ));
        } else if client_version > McpVersion::LATEST.as_str() {
            McpVersion::LATEST
        } else if client_version < "2024-11-05" {
            return Err(format!(
                "Cannot negotiate compatible version with client version {} (server requires at least {})",
                client_version, "2024-11-05"
            ));
        } else {
            return Err(format!("Unknown protocol version: {}", client_version));
        };

        let supported_versions = [
            McpVersion::V2024_11_05,
            McpVersion::V2025_03_26,
            McpVersion::V2025_06_18,
            McpVersion::V2025_11_25,
        ];
        if supported_versions.contains(&requested_version) {
            return Ok(requested_version);
        }

        // Fall back to the newest supported version that is not newer than
        // the one the client asked for.
        let best_match = supported_versions
            .iter()
            .copied()
            .filter(|candidate| *candidate <= requested_version)
            .max();
        match best_match {
            Some(best_version) => Ok(best_version),
            None => Err(format!(
                "Cannot negotiate compatible version with client version {} (server requires at least {})",
                client_version,
                supported_versions.iter().min().unwrap()
            )),
        }
    }

    /// Returns the capabilities to advertise for `version`.
    ///
    /// Currently this is an unmodified clone of the configured capabilities;
    /// the version is only used in the debug log.
    fn adjust_capabilities_for_version(&self, version: McpVersion) -> ServerCapabilities {
        let advertised = self.capabilities.clone();
        debug!(
            "Server capabilities adjusted for protocol version {}",
            version
        );
        debug!(
            "Capabilities: logging={}, tools={}, resources={}, prompts={}",
            advertised.logging.is_some(),
            advertised.tools.is_some(),
            advertised.resources.is_some(),
            advertised.prompts.is_some()
        );
        advertised
    }
}
#[async_trait]
impl JsonRpcHandler for SessionAwareInitializeHandler {
type Error = McpError;
/// Handles the `initialize` request.
///
/// Steps, in order:
/// 1. reject any method other than `initialize` and require parameters;
/// 2. negotiate the protocol version;
/// 3. resolve the session: reuse the one from the transport context
///    (loading it from storage or creating it in the cache if needed), or
///    create a new session when no context is supplied;
/// 4. store `client_info`, `client_capabilities`, `negotiated_version`,
///    `mcp:tool_fingerprint` (only when the fingerprint is non-empty) and
///    `mcp_version` in the session state;
/// 5. in lenient mode, mark the session initialized immediately;
/// 6. return the serialized `InitializeResult`.
///
/// # Errors
/// - `InvalidParameters` for a wrong method or undecodable parameters;
/// - `MissingParameter` when no parameters are given;
/// - `ConfigurationError` when version negotiation fails;
/// - `SessionError` when lenient-mode initialization fails;
/// - `SerializationError` when a value cannot be converted to JSON.
async fn handle(
&self,
method: &str,
params: Option<turul_mcp_json_rpc_server::RequestParams>,
session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
) -> std::result::Result<serde_json::Value, McpError> {
debug!("Handling {} request with session support", method);
if method != "initialize" {
return Err(McpError::InvalidParameters(format!(
"Method not supported: {}",
method
)));
}
let request = if let Some(params) = params {
let params_value = params.to_value();
serde_json::from_value::<InitializeRequest>(params_value).map_err(|e| {
McpError::InvalidParameters(format!("Invalid initialize request: {}", e))
})?
} else {
return Err(McpError::MissingParameter(
"Missing parameters for initialize".to_string(),
));
};
let negotiated_version = match self.negotiate_version(&request.protocol_version) {
Ok(version) => {
info!(
"Protocol version negotiated: {} (client requested: {})",
version, request.protocol_version
);
version
}
Err(e) => {
error!("Protocol version negotiation failed: {}", e);
return Err(McpError::ConfigurationError(format!(
"Version negotiation failed: {}",
e
)));
}
};
// Resolve the session id, making sure the session is present in the cache.
let session_id = if let Some(ctx) = &session_context {
debug!("Using session from context: {}", ctx.session_id);
let cache_exists = self
.session_manager
.session_exists_in_cache(&ctx.session_id)
.await;
debug!(
"Session {} exists in cache: {}",
ctx.session_id, cache_exists
);
if !cache_exists {
debug!("Session {} not in cache, checking storage", ctx.session_id);
match self
.session_manager
.load_session_from_storage(&ctx.session_id)
.await
{
Ok(true) => {
debug!(
"Session {} loaded from storage with preserved capabilities",
ctx.session_id
);
}
// Not in storage: create a cache entry with the default capabilities.
Ok(false) => {
warn!(
"Session {} not found in storage, creating with defaults",
ctx.session_id
);
self.session_manager
.add_session_to_cache(
ctx.session_id.clone(),
self.session_manager.get_default_capabilities(),
)
.await;
}
// A storage failure is logged and handled like a missing session.
Err(e) => {
error!(
"Failed to load session {} from storage: {}",
ctx.session_id, e
);
self.session_manager
.add_session_to_cache(
ctx.session_id.clone(),
self.session_manager.get_default_capabilities(),
)
.await;
}
}
} else {
debug!("Session {} already exists in cache", ctx.session_id);
}
ctx.session_id.clone()
} else {
debug!("No session context provided, creating new session");
self.session_manager.create_session().await
};
// Record what the client sent and what was negotiated in the session state.
self.session_manager
.set_session_state(
&session_id,
"client_info",
serde_json::to_value(&request.client_info).map_err(McpError::SerializationError)?,
)
.await;
self.session_manager
.set_session_state(
&session_id,
"client_capabilities",
serde_json::to_value(&request.capabilities)
.map_err(McpError::SerializationError)?,
)
.await;
self.session_manager
.set_session_state(
&session_id,
"negotiated_version",
serde_json::to_value(negotiated_version).map_err(McpError::SerializationError)?,
)
.await;
// The fingerprint is stored only when the static one is non-empty; with a
// registry attached, the registry's fingerprint is stored instead.
if !self.tool_fingerprint.is_empty() {
#[cfg(feature = "dynamic-tools")]
let fingerprint = if let Some(ref registry) = self.tool_registry {
registry.fingerprint().await
} else {
self.tool_fingerprint.clone()
};
#[cfg(not(feature = "dynamic-tools"))]
let fingerprint = self.tool_fingerprint.clone();
self.session_manager
.set_session_state(
&session_id,
"mcp:tool_fingerprint",
serde_json::json!(fingerprint),
)
.await;
}
self.session_manager
.set_session_state(
&session_id,
"mcp_version",
serde_json::json!(negotiated_version.as_str()),
)
.await;
// Lenient mode initializes the session now; strict mode leaves it
// uninitialized at this point.
if !self.strict_lifecycle {
debug!(
"📝 LENIENT MODE: Immediately initializing session {} (strict_lifecycle=false)",
session_id
);
if let Err(e) = self
.session_manager
.initialize_session_with_version(
&session_id,
request.client_info,
request.capabilities,
negotiated_version,
)
.await
{
error!("❌ Failed to initialize session {}: {}", session_id, e);
return Err(McpError::SessionError(format!(
"Failed to initialize session: {}",
e
)));
}
info!(
"✅ Session {} created and immediately initialized with protocol version {} (lenient mode)",
session_id, negotiated_version
);
} else {
debug!(
"⏳ Session {} created and ready for client with protocol version {} (strict mode - waiting for notifications/initialized)",
session_id, negotiated_version
);
}
let adjusted_capabilities = self.adjust_capabilities_for_version(negotiated_version);
let mut response = InitializeResult::new(
negotiated_version,
adjusted_capabilities,
self.implementation.clone(),
);
if let Some(instructions) = &self.instructions {
response = response.with_instructions(instructions.clone());
}
serde_json::to_value(response).map_err(McpError::SerializationError)
}
/// This handler serves only the `initialize` method.
fn supported_methods(&self) -> Vec<String> {
vec!["initialize".to_string()]
}
}
/// JSON-RPC handler for `tools/list` with cursor-based pagination.
pub struct ListToolsHandler {
/// Compiled-in tools; listed when no dynamic registry is attached.
tools: HashMap<String, Arc<dyn McpTool>>,
/// Session manager for the strict-lifecycle check; `None` disables the check.
session_manager: Option<Arc<SessionManager>>,
/// When `true` (and a session manager and session context are present),
/// requests for uninitialized sessions are rejected.
strict_lifecycle: bool,
/// When `false`, the `execution` field of every listed tool is cleared.
has_tasks: bool,
/// Optional dynamic registry; when set, its active tools are listed instead.
#[cfg(feature = "dynamic-tools")]
tool_registry: Option<Arc<crate::tool_registry::ToolRegistry>>,
}
impl ListToolsHandler {
    /// Creates a handler without a session manager: no strict-lifecycle
    /// checks are performed and no tool registry is attached.
    pub fn new(tools: HashMap<String, Arc<dyn McpTool>>, has_tasks: bool) -> Self {
        Self {
            tools,
            session_manager: None,
            strict_lifecycle: false,
            has_tasks,
            #[cfg(feature = "dynamic-tools")]
            tool_registry: None,
        }
    }

    /// Creates a handler that can enforce the strict lifecycle through
    /// `session_manager`. No tool registry is attached.
    pub fn new_with_session_manager(
        tools: HashMap<String, Arc<dyn McpTool>>,
        session_manager: Arc<SessionManager>,
        strict_lifecycle: bool,
        has_tasks: bool,
    ) -> Self {
        // Start from the session-less handler and override the two
        // session-related fields.
        Self {
            session_manager: Some(session_manager),
            strict_lifecycle,
            ..Self::new(tools, has_tasks)
        }
    }

    /// Attaches a dynamic tool registry and returns the updated handler.
    #[cfg(feature = "dynamic-tools")]
    pub fn with_tool_registry(mut self, registry: Arc<crate::tool_registry::ToolRegistry>) -> Self {
        self.tool_registry = Some(registry);
        self
    }
}
#[async_trait]
impl JsonRpcHandler for ListToolsHandler {
type Error = McpError;
async fn handle(
&self,
method: &str,
params: Option<turul_mcp_json_rpc_server::RequestParams>,
session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
) -> std::result::Result<serde_json::Value, McpError> {
use turul_mcp_protocol::meta::{Cursor, PaginatedResponse};
debug!("Handling {} request", method);
if self.strict_lifecycle
&& let (Some(session_manager), Some(session_ctx)) =
(&self.session_manager, &session_context)
{
let session_initialized = session_manager
.is_session_initialized(&session_ctx.session_id)
.await;
if !session_initialized {
debug!(
"🚫 STRICT MODE: Rejecting {} request for session {} - session not yet initialized (waiting for notifications/initialized)",
method, session_ctx.session_id
);
return Err(McpError::SessionError(
"Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string()
));
}
}
if method != "tools/list" {
return Err(McpError::InvalidParameters(format!(
"Method '{}' not supported by tools/list handler",
method
)));
}
use turul_mcp_protocol::tools::{ListToolsParams, ListToolsResult};
let list_params = if let Some(params_value) = params {
serde_json::from_value::<ListToolsParams>(params_value.to_value()).map_err(|e| {
McpError::InvalidParameters(format!("Invalid parameters for tools/list: {}", e))
})?
} else {
ListToolsParams::new()
};
let cursor = list_params.cursor;
debug!("Listing tools with cursor: {:?}", cursor);
#[cfg(feature = "dynamic-tools")]
let mut tools: Vec<Tool> = if let Some(ref registry) = self.tool_registry {
registry.list_active_tools().await
} else {
let mut t: Vec<Tool> = self
.tools
.values()
.map(|tool| tool_to_descriptor(tool.as_ref()))
.collect();
t.sort_by(|a, b| a.name.cmp(&b.name));
t
};
#[cfg(not(feature = "dynamic-tools"))]
let mut tools: Vec<Tool> = {
let mut t: Vec<Tool> = self
.tools
.values()
.map(|tool| tool_to_descriptor(tool.as_ref()))
.collect();
t.sort_by(|a, b| a.name.cmp(&b.name));
t
};
if !self.has_tasks {
for tool in &mut tools {
tool.execution = None;
}
}
const DEFAULT_PAGE_SIZE: usize = 50; const MAX_LIMIT: u32 = 100;
if let Some(limit) = list_params.limit
&& limit == 0
{
return Err(McpError::InvalidParameters(
"limit must be a positive integer (> 0)".to_string(),
));
}
let page_size = list_params
.limit
.map(|l| std::cmp::min(l, MAX_LIMIT) as usize)
.unwrap_or(DEFAULT_PAGE_SIZE);
let start_index = if let Some(cursor) = &cursor {
let cursor_name = cursor.as_str();
tools
.iter()
.position(|t| t.name.as_str() > cursor_name)
.unwrap_or(tools.len())
} else {
0 };
let end_index = std::cmp::min(start_index + page_size, tools.len());
let page_tools: Vec<Tool> = tools[start_index..end_index].to_vec();
let has_more = end_index < tools.len();
let next_cursor = if has_more {
page_tools.last().map(|t| Cursor::new(&t.name))
} else {
None
};
debug!(
"Tool pagination: start={}, end={}, page_size={}, has_more={}, next_cursor={:?}",
start_index,
end_index,
page_tools.len(),
has_more,
next_cursor
);
let mut base_response = ListToolsResult::new(page_tools);
let total = Some(tools.len() as u64);
if let Some(ref cursor) = next_cursor {
base_response = base_response.with_next_cursor(cursor.clone());
}
let next_cursor_clone = next_cursor.clone();
let mut paginated_response =
PaginatedResponse::with_pagination(base_response, next_cursor, total, has_more);
if let Some(request_meta) = list_params.meta {
let mut response_meta = paginated_response.meta().cloned().unwrap_or_else(|| {
turul_mcp_protocol::meta::Meta::with_pagination(next_cursor_clone, total, has_more)
});
for (key, value) in request_meta {
response_meta.extra.insert(key, value);
}
paginated_response = paginated_response.with_meta(response_meta);
}
serde_json::to_value(paginated_response).map_err(McpError::SerializationError)
}
/// Names the single JSON-RPC method this handler serves: `tools/list`.
fn supported_methods(&self) -> Vec<String> {
    vec![String::from("tools/list")]
}
}
/// JSON-RPC handler for `tools/call` that forwards the caller's session context to tools.
pub struct SessionAwareToolHandler {
/// Statically registered tools, keyed by the name clients pass in `tools/call`.
tools: HashMap<String, Arc<dyn McpTool>>,
/// Consulted for the strict-lifecycle initialization check and for the storage handle
/// used when building a tool's session context.
session_manager: Arc<SessionManager>,
/// When `true`, calls from a session that is not yet initialized are rejected.
strict_lifecycle: bool,
/// Task runtime used for task-augmented calls; such calls are rejected while this is `None`.
task_runtime: Option<Arc<crate::task::runtime::TaskRuntime>>,
/// Optional dynamic registry; when set, tool lookup goes through it instead of `tools`.
#[cfg(feature = "dynamic-tools")]
tool_registry: Option<Arc<crate::tool_registry::ToolRegistry>>,
}
impl SessionAwareToolHandler {
    /// Builds a handler over a fixed tool map.
    ///
    /// The handler starts without a task runtime and (with the `dynamic-tools` feature)
    /// without a tool registry; attach them with the `with_*` builder methods.
    pub fn new(
        tools: HashMap<String, Arc<dyn McpTool>>,
        session_manager: Arc<SessionManager>,
        strict_lifecycle: bool,
    ) -> Self {
        Self {
            task_runtime: None,
            #[cfg(feature = "dynamic-tools")]
            tool_registry: None,
            strict_lifecycle,
            session_manager,
            tools,
        }
    }

    /// Returns the handler with `runtime` installed as its task runtime.
    pub fn with_task_runtime(self, runtime: Arc<crate::task::runtime::TaskRuntime>) -> Self {
        let mut handler = self;
        handler.task_runtime = Some(runtime);
        handler
    }

    /// Returns the handler with `registry` installed as its dynamic tool registry.
    #[cfg(feature = "dynamic-tools")]
    pub fn with_tool_registry(self, registry: Arc<crate::tool_registry::ToolRegistry>) -> Self {
        let mut handler = self;
        handler.tool_registry = Some(registry);
        handler
    }
}
#[async_trait]
impl JsonRpcHandler for SessionAwareToolHandler {
type Error = McpError;
async fn handle(
&self,
method: &str,
params: Option<turul_mcp_json_rpc_server::RequestParams>,
session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
) -> std::result::Result<serde_json::Value, McpError> {
debug!("Handling {} request with session support", method);
if method != "tools/call" {
return Err(McpError::InvalidParameters(format!(
"Method '{}' not supported by tools/call handler",
method
)));
}
if self.strict_lifecycle {
if let Some(ref session_ctx) = session_context {
let session_initialized = self
.session_manager
.is_session_initialized(&session_ctx.session_id)
.await;
if !session_initialized {
debug!(
"🚫 STRICT MODE: Rejecting {} request for session {} - session not yet initialized (waiting for notifications/initialized)",
method, session_ctx.session_id
);
return Err(McpError::SessionError(
"Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string(),
));
}
debug!(
"✅ STRICT MODE: Session {} is initialized - allowing {} request",
session_ctx.session_id, method
);
}
} else {
debug!(
"📝 LENIENT MODE: Allowing {} request without lifecycle check (strict_lifecycle=false)",
method
);
}
let params =
params.ok_or_else(|| McpError::MissingParameter("CallToolRequest".to_string()))?;
use turul_mcp_protocol::param_extraction::extract_params;
let call_params: turul_mcp_protocol::tools::CallToolParams = extract_params(params)?;
#[cfg(feature = "dynamic-tools")]
let tool: Arc<dyn McpTool> = if let Some(ref registry) = self.tool_registry {
registry
.get_tool(&call_params.name)
.await
.ok_or_else(|| McpError::ToolNotFound(call_params.name.clone()))?
} else {
Arc::clone(
self.tools
.get(&call_params.name)
.ok_or_else(|| McpError::ToolNotFound(call_params.name.clone()))?,
)
};
#[cfg(not(feature = "dynamic-tools"))]
let tool: Arc<dyn McpTool> = Arc::clone(
self.tools
.get(&call_params.name)
.ok_or_else(|| McpError::ToolNotFound(call_params.name.clone()))?,
);
let mcp_session_context = if let Some(json_rpc_ctx) = session_context {
debug!(
"Converting JSON-RPC session context for tool call: session_id={}",
json_rpc_ctx.session_id
);
Some(SessionContext::from_json_rpc_with_broadcaster(
json_rpc_ctx,
self.session_manager.get_storage(),
))
} else {
debug!("No session context provided for tool call");
None
};
let args = call_params
.arguments
.map(|hashmap| {
serde_json::to_value(hashmap)
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()))
})
.unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
{
use turul_mcp_protocol::tools::TaskSupport;
let tool_descriptor = tool.to_tool();
if let Some(ref exec) = tool_descriptor.execution {
if call_params.task.is_some() && exec.task_support == Some(TaskSupport::Forbidden) {
return Err(McpError::InvalidParameters(format!(
"Tool '{}' has taskSupport=forbidden; task-augmented requests are not allowed",
call_params.name
)));
}
if call_params.task.is_none() && exec.task_support == Some(TaskSupport::Required) {
return Err(McpError::InvalidParameters(format!(
"Tool '{}' has taskSupport=required; requests must include task augmentation",
call_params.name
)));
}
} else if call_params.task.is_some() {
return Err(McpError::InvalidParameters(format!(
"Tool '{}' does not declare task support; task-augmented requests are not allowed",
call_params.name
)));
}
}
if call_params.task.is_some() && self.task_runtime.is_none() {
return Err(McpError::InvalidParameters(
"Task-augmented tool calls require the server to have task support configured"
.into(),
));
}
if let (Some(task_meta), Some(runtime)) = (call_params.task, self.task_runtime.as_ref()) {
use turul_mcp_protocol::tasks::{CreateTaskResult, Task};
use turul_mcp_task_storage::{TaskOutcome, TaskRecord};
let task_id = uuid::Uuid::now_v7().as_simple().to_string();
let now = chrono::Utc::now().to_rfc3339();
let session_id = mcp_session_context
.as_ref()
.map(|ctx| ctx.session_id.to_string());
let record = TaskRecord {
task_id: task_id.clone(),
session_id: session_id.clone(),
status: turul_mcp_protocol::TaskStatus::Working,
status_message: Some("Executing tool".to_string()),
created_at: now.clone(),
last_updated_at: now,
ttl: task_meta.ttl.map(|t| t as i64),
poll_interval: Some(1_000),
original_method: "tools/call".to_string(),
original_params: Some(serde_json::json!({
"name": call_params.name,
"arguments": &args,
})),
result: None,
meta: None,
};
let created = runtime.register_task(record).await.map_err(|e| {
McpError::ToolExecutionError(format!("Failed to create task: {}", e))
})?;
let tool = Arc::clone(&tool);
let runtime_for_work = Arc::clone(runtime);
let task_id_for_work = task_id.clone();
let work: crate::task::executor::BoxedTaskWork = Box::new(move || {
Box::pin(async move {
let outcome = match tool.call(args, mcp_session_context).await {
Ok(result) => match serde_json::to_value(&result) {
Ok(value) => TaskOutcome::Success(value),
Err(e) => TaskOutcome::Error {
code: -32603,
message: format!("Serialization error: {}", e),
data: None,
},
},
Err(mcp_err) => TaskOutcome::Error {
code: -32603, message: mcp_err.to_string(),
data: None,
},
};
let terminal_status = match &outcome {
TaskOutcome::Success(_) => turul_mcp_protocol::TaskStatus::Completed,
TaskOutcome::Error { .. } => turul_mcp_protocol::TaskStatus::Failed,
};
if let Err(e) = runtime_for_work
.complete_task(&task_id_for_work, outcome.clone(), terminal_status, None)
.await
{
error!(task_id = %task_id_for_work, error = %e, "Failed to persist task result");
}
outcome
})
});
let _handle = runtime
.executor()
.start_task(&task_id, work)
.await
.map_err(|e| {
McpError::ToolExecutionError(format!("Failed to start task execution: {}", e))
})?;
let task = Task {
task_id: created.task_id.clone(),
status: created.status,
created_at: created.created_at.clone(),
last_updated_at: created.last_updated_at.clone(),
status_message: created.status_message.clone(),
ttl: created.ttl,
poll_interval: created.poll_interval,
meta: None,
};
let result = CreateTaskResult { task, meta: None };
serde_json::to_value(result).map_err(McpError::SerializationError)
} else {
match tool.call(args, mcp_session_context).await {
Ok(response) => {
serde_json::to_value(response).map_err(McpError::SerializationError)
}
Err(error_msg) => {
error!("Tool execution error: {}", error_msg);
Err(error_msg)
}
}
}
}
fn supported_methods(&self) -> Vec<String> {
vec!["tools/call".to_string()]
}
}
/// Abbreviated `Debug` output for [`McpServer`].
///
/// Only the implementation info, capabilities, tool count and instructions are printed;
/// the remaining fields are omitted, which the trailing `..` in the output makes explicit.
impl std::fmt::Debug for McpServer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("McpServer")
            .field("implementation", &self.implementation)
            .field("capabilities", &self.capabilities)
            // `format_args!` renders the summary directly, without allocating a `String`
            // and without the quotes a `String`'s own `Debug` impl would add.
            .field(
                "tools",
                &format_args!("HashMap with {} tools", self.tools.len()),
            )
            .field("instructions", &self.instructions)
            .finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::McpTool;
    use async_trait::async_trait;
    use serde_json::Value;
    use std::collections::HashMap;
    use turul_mcp_builders::prelude::*;
    use turul_mcp_protocol::ToolSchema;
    use turul_mcp_protocol::tools::{CallToolResult, ToolResult};

    /// Minimal tool named `test` that always answers with the text `test result`.
    struct TestTool {
        input_schema: ToolSchema,
    }

    impl TestTool {
        fn new() -> Self {
            Self {
                input_schema: ToolSchema::object(),
            }
        }
    }

    impl HasBaseMetadata for TestTool {
        fn name(&self) -> &str {
            "test"
        }
        fn title(&self) -> Option<&str> {
            Some("Test Tool")
        }
    }

    impl HasDescription for TestTool {
        fn description(&self) -> Option<&str> {
            Some("Test tool for unit tests")
        }
    }

    impl HasInputSchema for TestTool {
        fn input_schema(&self) -> &ToolSchema {
            &self.input_schema
        }
    }

    impl HasOutputSchema for TestTool {
        fn output_schema(&self) -> Option<&ToolSchema> {
            None
        }
    }

    impl HasAnnotations for TestTool {
        fn annotations(&self) -> Option<&turul_mcp_protocol::tools::ToolAnnotations> {
            None
        }
    }

    impl HasToolMeta for TestTool {
        fn tool_meta(&self) -> Option<&HashMap<String, Value>> {
            None
        }
    }

    // Empty impls: these rely entirely on the traits' default methods.
    impl HasIcons for TestTool {}
    impl HasExecution for TestTool {}

    #[async_trait]
    impl McpTool for TestTool {
        async fn call(
            &self,
            _args: Value,
            _session: Option<crate::SessionContext>,
        ) -> crate::McpResult<CallToolResult> {
            Ok(CallToolResult::success(vec![ToolResult::text(
                "test result",
            )]))
        }
    }

    /// The builder carries name, version and registered tools through to the server.
    #[test]
    fn test_server_creation() {
        let server = McpServer::builder()
            .name("test-server")
            .version("1.0.0")
            .tool(TestTool::new())
            .build()
            .unwrap();
        assert_eq!(server.implementation.name, "test-server");
        assert_eq!(server.implementation.version, "1.0.0");
        assert_eq!(server.tools.len(), 1);
    }

    /// `tools/list` without params returns the single registered tool.
    #[tokio::test]
    async fn test_list_tools_handler() {
        let mut tools: HashMap<String, Arc<dyn McpTool>> = HashMap::new();
        tools.insert("test".to_string(), Arc::new(TestTool::new()));
        let handler = ListToolsHandler::new(tools, false);
        let result = handler.handle("tools/list", None, None).await.unwrap();
        let response: ListToolsResult = serde_json::from_value(result).unwrap();
        assert_eq!(response.tools.len(), 1);
        assert_eq!(response.tools[0].name, "test");
    }

    /// `tools/call` without a session runs the tool inline and returns its text content.
    #[tokio::test]
    async fn test_tool_handler() {
        let mut tools: HashMap<String, Arc<dyn McpTool>> = HashMap::new();
        tools.insert("test".to_string(), Arc::new(TestTool::new()));
        let session_manager = Arc::new(SessionManager::new(ServerCapabilities::default()));
        let handler = SessionAwareToolHandler::new(tools, session_manager, false);
        let params = turul_mcp_json_rpc_server::RequestParams::Object(
            [
                ("name".to_string(), serde_json::json!("test")),
                ("arguments".to_string(), serde_json::json!({})),
            ]
            .into_iter()
            .collect(),
        );
        let result = handler
            .handle("tools/call", Some(params), None)
            .await
            .unwrap();
        let response: CallToolResult = serde_json::from_value(result).unwrap();
        assert_eq!(response.content.len(), 1);
        // Fail loudly on a non-text result; an `if let` here would skip the assertion
        // and let the test pass without checking anything.
        let ToolResult::Text { text, .. } = &response.content[0] else {
            panic!("expected the tool to return a text result");
        };
        assert_eq!(text, "test result");
    }
}