turbomcp_server/routing/mod.rs
1//! Request routing and handler dispatch system
2//!
3//! This module provides a comprehensive routing system for MCP protocol requests,
4//! supporting all standard MCP methods with enterprise features like RBAC,
5//! JSON Schema validation, timeout management, and bidirectional communication.
6
7mod bidirectional;
8mod config;
9mod handlers;
10mod traits;
11mod utils;
12mod validation;
13#[cfg(feature = "websocket")]
14mod websocket_dispatcher_adapter;
15
16// Re-export public types to maintain API compatibility
17pub use bidirectional::BidirectionalRouter;
18pub use config::RouterConfig;
19pub use traits::{Route, RouteHandler, RouteMetadata, ServerRequestDispatcher};
20#[cfg(feature = "websocket")]
21pub use websocket_dispatcher_adapter::WebSocketDispatcherAdapter;
22
23use dashmap::DashMap;
24use futures::stream::{self, StreamExt};
25use std::collections::HashMap;
26use std::sync::Arc;
27use tracing::warn;
28use turbomcp_protocol::RequestContext;
29use turbomcp_protocol::{
30 jsonrpc::{JsonRpcRequest, JsonRpcResponse},
31 types::{
32 CreateMessageRequest, ElicitRequest, ElicitResult, ListRootsResult, PingRequest, PingResult,
33 },
34};
35
36use crate::capabilities::ServerToClientAdapter;
37use crate::metrics::ServerMetrics;
38use crate::registry::HandlerRegistry;
39use crate::{ServerError, ServerResult};
40
41use handlers::{HandlerContext, ProtocolHandlers};
42use turbomcp_protocol::context::capabilities::ServerToClientRequests;
43use utils::{error_response, method_not_found_response};
44use validation::{validate_request, validate_response};
45
46/// Request router for dispatching MCP requests to appropriate handlers
47pub struct RequestRouter {
48 /// Handler registry
49 registry: Arc<HandlerRegistry>,
50 /// Route configuration
51 config: RouterConfig,
52 /// Server configuration (for protocol responses)
53 server_config: crate::config::ServerConfig,
54 /// Custom route handlers
55 custom_routes: HashMap<String, Arc<dyn RouteHandler>>,
56 /// Resource subscription counters by URI (reserved for future functionality)
57 #[allow(dead_code)]
58 resource_subscriptions: DashMap<String, usize>,
59 /// Bidirectional communication router
60 bidirectional: BidirectionalRouter,
61 /// Protocol handlers
62 handlers: ProtocolHandlers,
63 /// Server-to-client requests adapter for tool-initiated requests (sampling, elicitation, roots)
64 /// This is injected into RequestContext so tools can make server-initiated requests
65 server_to_client: Arc<dyn ServerToClientRequests>,
66}
67
68impl std::fmt::Debug for RequestRouter {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("RequestRouter")
71 .field("config", &self.config)
72 .field("custom_routes_count", &self.custom_routes.len())
73 .finish()
74 }
75}
76
77impl RequestRouter {
78 /// Create a new request router
79 #[must_use]
80 pub fn new(
81 registry: Arc<HandlerRegistry>,
82 _metrics: Arc<ServerMetrics>,
83 server_config: crate::config::ServerConfig,
84 ) -> Self {
85 // Timeout management is now handled by middleware
86 let config = RouterConfig::default();
87
88 let handler_context = HandlerContext::new(Arc::clone(®istry), server_config.clone());
89
90 let bidirectional = BidirectionalRouter::new();
91
92 // Create the server-to-client adapter that bridges bidirectional router
93 // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
94 let server_to_client: Arc<dyn ServerToClientRequests> =
95 Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
96
97 Self {
98 registry,
99 config,
100 server_config,
101 custom_routes: HashMap::new(),
102 resource_subscriptions: DashMap::new(),
103 bidirectional,
104 handlers: ProtocolHandlers::new(handler_context),
105 server_to_client,
106 }
107 }
108
109 /// Create a router with configuration
110 #[must_use]
111 pub fn with_config(
112 registry: Arc<HandlerRegistry>,
113 config: RouterConfig,
114 _metrics: Arc<ServerMetrics>,
115 server_config: crate::config::ServerConfig,
116 ) -> Self {
117 // Timeout management is now handled by middleware
118
119 let handler_context = HandlerContext::new(Arc::clone(®istry), server_config.clone());
120
121 let bidirectional = BidirectionalRouter::new();
122
123 // Create the server-to-client adapter that bridges bidirectional router
124 // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
125 let server_to_client: Arc<dyn ServerToClientRequests> =
126 Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
127
128 Self {
129 registry,
130 config,
131 server_config,
132 custom_routes: HashMap::new(),
133 resource_subscriptions: DashMap::new(),
134 bidirectional,
135 handlers: ProtocolHandlers::new(handler_context),
136 server_to_client,
137 }
138 }
139
140 // Timeout configuration now handled by middleware - no longer needed
141
142 /// Set the server request dispatcher for bidirectional communication
143 ///
144 /// CRITICAL: This also refreshes the server_to_client adapter so it sees the new dispatcher.
145 /// Without this refresh, the adapter would still point to the old (empty) bidirectional router.
146 pub fn set_server_request_dispatcher<D>(&mut self, dispatcher: D)
147 where
148 D: ServerRequestDispatcher + 'static,
149 {
150 self.bidirectional.set_dispatcher(dispatcher);
151
152 // CRITICAL FIX: Recreate the adapter so it sees the new dispatcher
153 // The adapter was created with a clone of bidirectional BEFORE the dispatcher was set.
154 // Since BidirectionalRouter::set_dispatcher() replaces the Option rather than mutating
155 // through it, the adapter's clone still has dispatcher: None.
156 // By recreating it here, we ensure it gets a fresh clone that includes the dispatcher.
157 self.server_to_client = Arc::new(ServerToClientAdapter::new(self.bidirectional.clone()));
158 }
159
160 /// Get the server request dispatcher
161 pub fn get_server_request_dispatcher(&self) -> Option<&Arc<dyn ServerRequestDispatcher>> {
162 self.bidirectional.get_dispatcher()
163 }
164
165 /// Check if bidirectional routing is enabled and supported
166 pub fn supports_bidirectional(&self) -> bool {
167 self.config.enable_bidirectional && self.bidirectional.supports_bidirectional()
168 }
169
170 /// Add a custom route handler
171 ///
172 /// # Errors
173 ///
174 /// Returns [`ServerError::Routing`] if a route for the same method already exists.
175 pub fn add_route<H>(&mut self, handler: H) -> ServerResult<()>
176 where
177 H: RouteHandler + 'static,
178 {
179 let metadata = handler.metadata();
180 let handler_arc: Arc<dyn RouteHandler> = Arc::new(handler);
181
182 for method in &metadata.methods {
183 if self.custom_routes.contains_key(method) {
184 return Err(ServerError::routing_with_method(
185 format!("Route for method '{method}' already exists"),
186 method.clone(),
187 ));
188 }
189 self.custom_routes
190 .insert(method.clone(), Arc::clone(&handler_arc));
191 }
192
193 Ok(())
194 }
195
196 /// Create a properly configured RequestContext for this router
197 ///
198 /// This factory method creates a RequestContext with all necessary capabilities
199 /// pre-configured, including server-to-client communication for bidirectional
200 /// features (sampling, elicitation, roots).
201 ///
202 /// **Design Pattern**: Explicit Factory
203 /// - Context is valid from creation (no broken intermediate state)
204 /// - Router provides factory but doesn't modify contexts
205 /// - Follows Single Responsibility Principle
206 ///
207 /// # Example
208 /// ```rust,ignore
209 /// let ctx = router.create_context();
210 /// let response = router.route(request, ctx).await;
211 /// ```
212 #[must_use]
213 pub fn create_context(&self) -> RequestContext {
214 RequestContext::new().with_server_to_client(Arc::clone(&self.server_to_client))
215 }
216
217 /// Route a JSON-RPC request to the appropriate handler
218 ///
219 /// **IMPORTANT**: The context should be created using `create_context()` to ensure
220 /// it has all necessary capabilities configured. This method does NOT modify the
221 /// context - it only routes the request.
222 ///
223 /// # Design Pattern
224 /// This follows the Single Responsibility Principle:
225 /// - `create_context()`: Creates properly configured contexts
226 /// - `route()`: Routes requests to handlers
227 ///
228 /// Previously, `route()` was modifying the context (adding server_to_client),
229 /// which violated SRP and created invalid intermediate states.
230 pub async fn route(&self, request: JsonRpcRequest, ctx: RequestContext) -> JsonRpcResponse {
231 // Validate request if enabled
232 if self.config.validate_requests
233 && let Err(e) = validate_request(&request)
234 {
235 return error_response(&request, e);
236 }
237
238 // Handle the request
239 let result = match request.method.as_str() {
240 // Core protocol methods
241 "initialize" => self.handlers.handle_initialize(request, ctx).await,
242
243 // Tool methods
244 "tools/list" => self.handlers.handle_list_tools(request, ctx).await,
245 "tools/call" => self.handlers.handle_call_tool(request, ctx).await,
246
247 // Prompt methods
248 "prompts/list" => self.handlers.handle_list_prompts(request, ctx).await,
249 "prompts/get" => self.handlers.handle_get_prompt(request, ctx).await,
250
251 // Resource methods
252 "resources/list" => self.handlers.handle_list_resources(request, ctx).await,
253 "resources/read" => self.handlers.handle_read_resource(request, ctx).await,
254 "resources/subscribe" => self.handlers.handle_subscribe_resource(request, ctx).await,
255 "resources/unsubscribe" => {
256 self.handlers
257 .handle_unsubscribe_resource(request, ctx)
258 .await
259 }
260
261 // Logging methods
262 "logging/setLevel" => self.handlers.handle_set_log_level(request, ctx).await,
263
264 // Sampling methods
265 "sampling/createMessage" => self.handlers.handle_create_message(request, ctx).await,
266
267 // Roots methods
268 "roots/list" => self.handlers.handle_list_roots(request, ctx).await,
269
270 // Enhanced MCP features (MCP 2025-06-18 protocol methods)
271 "elicitation/create" => self.handlers.handle_elicitation(request, ctx).await,
272 "completion/complete" => self.handlers.handle_completion(request, ctx).await,
273 "resources/templates/list" => {
274 self.handlers
275 .handle_list_resource_templates(request, ctx)
276 .await
277 }
278 "ping" => self.handlers.handle_ping(request, ctx).await,
279
280 // Custom routes
281 method => {
282 if let Some(handler) = self.custom_routes.get(method) {
283 let request_clone = request.clone();
284 handler
285 .handle(request, ctx)
286 .await
287 .unwrap_or_else(|e| error_response(&request_clone, e))
288 } else {
289 method_not_found_response(&request)
290 }
291 }
292 };
293
294 // Validate response if enabled
295 if self.config.validate_responses
296 && let Err(e) = validate_response(&result)
297 {
298 warn!("Response validation failed: {}", e);
299 }
300
301 result
302 }
303
304 /// Handle batch requests
305 pub async fn route_batch(
306 &self,
307 requests: Vec<JsonRpcRequest>,
308 ctx: RequestContext,
309 ) -> Vec<JsonRpcResponse> {
310 // Note: Server capabilities are injected in route() for each request
311 let max_in_flight = self.config.max_concurrent_requests.max(1);
312 stream::iter(requests.into_iter())
313 .map(|req| {
314 let ctx_cloned = ctx.clone();
315 async move { self.route(req, ctx_cloned).await }
316 })
317 .buffer_unordered(max_in_flight)
318 .collect()
319 .await
320 }
321
322 /// Send an elicitation request to the client (server-initiated)
323 ///
324 /// # Errors
325 ///
326 /// Returns [`ServerError::Transport`] if:
327 /// - The bidirectional dispatcher is not configured
328 /// - The client request fails
329 /// - The client does not respond
330 pub async fn send_elicitation_to_client(
331 &self,
332 request: ElicitRequest,
333 ctx: RequestContext,
334 ) -> ServerResult<ElicitResult> {
335 self.bidirectional
336 .send_elicitation_to_client(request, ctx)
337 .await
338 }
339
340 /// Send a ping request to the client (server-initiated)
341 ///
342 /// # Errors
343 ///
344 /// Returns [`ServerError::Transport`] if:
345 /// - The bidirectional dispatcher is not configured
346 /// - The client request fails
347 /// - The client does not respond
348 pub async fn send_ping_to_client(
349 &self,
350 request: PingRequest,
351 ctx: RequestContext,
352 ) -> ServerResult<PingResult> {
353 self.bidirectional.send_ping_to_client(request, ctx).await
354 }
355
356 /// Send a create message request to the client (server-initiated)
357 ///
358 /// # Errors
359 ///
360 /// Returns [`ServerError::Transport`] if:
361 /// - The bidirectional dispatcher is not configured
362 /// - The client request fails
363 /// - The client does not support sampling
364 pub async fn send_create_message_to_client(
365 &self,
366 request: CreateMessageRequest,
367 ctx: RequestContext,
368 ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult> {
369 self.bidirectional
370 .send_create_message_to_client(request, ctx)
371 .await
372 }
373
374 /// Send a list roots request to the client (server-initiated)
375 ///
376 /// # Errors
377 ///
378 /// Returns [`ServerError::Transport`] if:
379 /// - The bidirectional dispatcher is not configured
380 /// - The client request fails
381 /// - The client does not support roots
382 pub async fn send_list_roots_to_client(
383 &self,
384 request: turbomcp_protocol::types::ListRootsRequest,
385 ctx: RequestContext,
386 ) -> ServerResult<ListRootsResult> {
387 self.bidirectional
388 .send_list_roots_to_client(request, ctx)
389 .await
390 }
391}
392
393impl Clone for RequestRouter {
394 fn clone(&self) -> Self {
395 Self {
396 registry: Arc::clone(&self.registry),
397 config: self.config.clone(),
398 server_config: self.server_config.clone(),
399 custom_routes: self.custom_routes.clone(),
400 resource_subscriptions: DashMap::new(),
401 bidirectional: self.bidirectional.clone(),
402 handlers: ProtocolHandlers::new(HandlerContext::new(
403 Arc::clone(&self.registry),
404 self.server_config.clone(),
405 )),
406 server_to_client: Arc::clone(&self.server_to_client),
407 }
408 }
409}
410
411// Design Note: ServerCapabilities trait implementation
412//
413// RequestRouter currently uses BidirectionalRouter for server-initiated requests
414// (sampling, elicitation, roots) instead of directly implementing the ServerCapabilities
415// trait from turbomcp_protocol::context::capabilities.
416//
417// Current Pattern:
418// - RequestRouter contains BidirectionalRouter which handles server-to-client requests
419// - BidirectionalRouter uses ServerRequestDispatcher trait for transport-agnostic dispatch
420// - This pattern provides better separation of concerns and testability
421//
422// Alternative (not implemented):
423// - RequestRouter could implement ServerCapabilities trait directly
424// - This would allow passing router as &dyn ServerCapabilities to tools
425// - Current pattern is preferred as it keeps routing and bidirectional concerns separate
426//
427// See: crates/turbomcp-server/src/routing/bidirectional.rs for current implementation
428
429/// Router alias for convenience
430pub type Router = RequestRouter;
431
432// ===================================================================
433// JsonRpcHandler Implementation - For HTTP Transport Integration
434// ===================================================================
435
436#[async_trait::async_trait]
437impl turbomcp_protocol::JsonRpcHandler for RequestRouter {
438 /// Handle a JSON-RPC request via the HTTP transport
439 ///
440 /// This implementation enables `RequestRouter` to be used directly with
441 /// the HTTP transport layer (`run_server`), supporting the builder pattern
442 /// for programmatic server construction.
443 ///
444 /// # Architecture
445 ///
446 /// - Parses raw JSON into `JsonRpcRequest`
447 /// - Creates default `RequestContext` (no auth/session for HTTP)
448 /// - Routes through the existing `route()` method
449 /// - Serializes `JsonRpcResponse` back to JSON
450 ///
451 /// This provides the same request handling as the macro pattern but
452 /// allows runtime handler registration via `ServerBuilder`.
453 async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
454 // Parse the request
455 let req: JsonRpcRequest = match serde_json::from_value(req_value) {
456 Ok(r) => r,
457 Err(e) => {
458 return serde_json::json!({
459 "jsonrpc": "2.0",
460 "error": {
461 "code": -32700,
462 "message": format!("Parse error: {}", e)
463 },
464 "id": null
465 });
466 }
467 };
468
469 // Create properly configured context with server-to-client capabilities
470 // Note: For authenticated HTTP requests, middleware should add auth info via with_* methods
471 let ctx = self.create_context();
472
473 // Route the request through the standard routing system
474 let response = self.route(req, ctx).await;
475
476 // Serialize response
477 match serde_json::to_value(&response) {
478 Ok(v) => v,
479 Err(e) => {
480 serde_json::json!({
481 "jsonrpc": "2.0",
482 "error": {
483 "code": -32603,
484 "message": format!("Internal error: failed to serialize response: {}", e)
485 },
486 "id": response.id
487 })
488 }
489 }
490 }
491}
492
493// Comprehensive tests in separate file (tokio/axum pattern)
494#[cfg(test)]
495mod tests;