turbomcp_server/routing/mod.rs
1//! Request routing and handler dispatch system
2//!
3//! This module provides a comprehensive routing system for MCP protocol requests,
4//! supporting all standard MCP methods with enterprise features like RBAC,
5//! JSON Schema validation, timeout management, and bidirectional communication.
6
7mod bidirectional;
8mod config;
9mod handlers;
10mod traits;
11mod utils;
12mod validation;
13
14// Re-export public types to maintain API compatibility
15pub use bidirectional::BidirectionalRouter;
16pub use config::RouterConfig;
17pub use traits::{Route, RouteHandler, RouteMetadata, ServerRequestDispatcher};
18
19use dashmap::DashMap;
20use futures::stream::{self, StreamExt};
21use std::collections::HashMap;
22use std::sync::Arc;
23use tracing::warn;
24use turbomcp_protocol::RequestContext;
25use turbomcp_protocol::{
26 jsonrpc::{JsonRpcRequest, JsonRpcResponse},
27 types::{
28 CreateMessageRequest, ElicitRequest, ElicitResult, ListRootsResult, PingRequest, PingResult,
29 },
30};
31
32use crate::capabilities::ServerToClientAdapter;
33use crate::metrics::ServerMetrics;
34use crate::registry::HandlerRegistry;
35use crate::{ServerError, ServerResult};
36
37use handlers::{HandlerContext, ProtocolHandlers};
38use turbomcp_protocol::context::capabilities::ServerToClientRequests;
39use utils::{error_response, method_not_found_response};
40use validation::{validate_request, validate_response};
41
42/// Request router for dispatching MCP requests to appropriate handlers
43pub struct RequestRouter {
44 /// Handler registry
45 registry: Arc<HandlerRegistry>,
46 /// Route configuration
47 config: RouterConfig,
48 /// Custom route handlers
49 custom_routes: HashMap<String, Arc<dyn RouteHandler>>,
50 /// Resource subscription counters by URI (reserved for future functionality)
51 #[allow(dead_code)]
52 resource_subscriptions: DashMap<String, usize>,
53 /// Bidirectional communication router
54 bidirectional: BidirectionalRouter,
55 /// Protocol handlers
56 handlers: ProtocolHandlers,
57 /// Server-to-client requests adapter for tool-initiated requests (sampling, elicitation, roots)
58 /// This is injected into RequestContext so tools can make server-initiated requests
59 server_to_client: Arc<dyn ServerToClientRequests>,
60}
61
62impl std::fmt::Debug for RequestRouter {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("RequestRouter")
65 .field("config", &self.config)
66 .field("custom_routes_count", &self.custom_routes.len())
67 .finish()
68 }
69}
70
71impl RequestRouter {
72 /// Create a new request router
73 #[must_use]
74 pub fn new(registry: Arc<HandlerRegistry>, _metrics: Arc<ServerMetrics>) -> Self {
75 // Timeout management is now handled by middleware
76 let config = RouterConfig::default();
77
78 let handler_context = HandlerContext::new(Arc::clone(®istry));
79
80 let bidirectional = BidirectionalRouter::new();
81
82 // Create the server-to-client adapter that bridges bidirectional router
83 // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
84 let server_to_client: Arc<dyn ServerToClientRequests> =
85 Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
86
87 Self {
88 registry,
89 config,
90 custom_routes: HashMap::new(),
91 resource_subscriptions: DashMap::new(),
92 bidirectional,
93 handlers: ProtocolHandlers::new(handler_context),
94 server_to_client,
95 }
96 }
97
98 /// Create a router with configuration
99 #[must_use]
100 pub fn with_config(
101 registry: Arc<HandlerRegistry>,
102 config: RouterConfig,
103 _metrics: Arc<ServerMetrics>,
104 ) -> Self {
105 // Timeout management is now handled by middleware
106
107 let handler_context = HandlerContext::new(Arc::clone(®istry));
108
109 let bidirectional = BidirectionalRouter::new();
110
111 // Create the server-to-client adapter that bridges bidirectional router
112 // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
113 let server_to_client: Arc<dyn ServerToClientRequests> =
114 Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
115
116 Self {
117 registry,
118 config,
119 custom_routes: HashMap::new(),
120 resource_subscriptions: DashMap::new(),
121 bidirectional,
122 handlers: ProtocolHandlers::new(handler_context),
123 server_to_client,
124 }
125 }
126
127 // Timeout configuration now handled by middleware - no longer needed
128
129 /// Set the server request dispatcher for bidirectional communication
130 ///
131 /// CRITICAL: This also refreshes the server_to_client adapter so it sees the new dispatcher.
132 /// Without this refresh, the adapter would still point to the old (empty) bidirectional router.
133 pub fn set_server_request_dispatcher<D>(&mut self, dispatcher: D)
134 where
135 D: ServerRequestDispatcher + 'static,
136 {
137 self.bidirectional.set_dispatcher(dispatcher);
138
139 // CRITICAL FIX: Recreate the adapter so it sees the new dispatcher
140 // The adapter was created with a clone of bidirectional BEFORE the dispatcher was set.
141 // Since BidirectionalRouter::set_dispatcher() replaces the Option rather than mutating
142 // through it, the adapter's clone still has dispatcher: None.
143 // By recreating it here, we ensure it gets a fresh clone that includes the dispatcher.
144 self.server_to_client = Arc::new(ServerToClientAdapter::new(self.bidirectional.clone()));
145 }
146
147 /// Get the server request dispatcher
148 pub fn get_server_request_dispatcher(&self) -> Option<&Arc<dyn ServerRequestDispatcher>> {
149 self.bidirectional.get_dispatcher()
150 }
151
152 /// Check if bidirectional routing is enabled and supported
153 pub fn supports_bidirectional(&self) -> bool {
154 self.config.enable_bidirectional && self.bidirectional.supports_bidirectional()
155 }
156
157 /// Add a custom route handler
158 ///
159 /// # Errors
160 ///
161 /// Returns [`ServerError::Routing`] if a route for the same method already exists.
162 pub fn add_route<H>(&mut self, handler: H) -> ServerResult<()>
163 where
164 H: RouteHandler + 'static,
165 {
166 let metadata = handler.metadata();
167 let handler_arc: Arc<dyn RouteHandler> = Arc::new(handler);
168
169 for method in &metadata.methods {
170 if self.custom_routes.contains_key(method) {
171 return Err(ServerError::routing_with_method(
172 format!("Route for method '{method}' already exists"),
173 method.clone(),
174 ));
175 }
176 self.custom_routes
177 .insert(method.clone(), Arc::clone(&handler_arc));
178 }
179
180 Ok(())
181 }
182
183 /// Create a properly configured RequestContext for this router
184 ///
185 /// This factory method creates a RequestContext with all necessary capabilities
186 /// pre-configured, including server-to-client communication for bidirectional
187 /// features (sampling, elicitation, roots).
188 ///
189 /// **Design Pattern**: Explicit Factory
190 /// - Context is valid from creation (no broken intermediate state)
191 /// - Router provides factory but doesn't modify contexts
192 /// - Follows Single Responsibility Principle
193 ///
194 /// # Example
195 /// ```rust,ignore
196 /// let ctx = router.create_context();
197 /// let response = router.route(request, ctx).await;
198 /// ```
199 #[must_use]
200 pub fn create_context(&self) -> RequestContext {
201 RequestContext::new().with_server_to_client(Arc::clone(&self.server_to_client))
202 }
203
204 /// Route a JSON-RPC request to the appropriate handler
205 ///
206 /// **IMPORTANT**: The context should be created using `create_context()` to ensure
207 /// it has all necessary capabilities configured. This method does NOT modify the
208 /// context - it only routes the request.
209 ///
210 /// # Design Pattern
211 /// This follows the Single Responsibility Principle:
212 /// - `create_context()`: Creates properly configured contexts
213 /// - `route()`: Routes requests to handlers
214 ///
215 /// Previously, `route()` was modifying the context (adding server_to_client),
216 /// which violated SRP and created invalid intermediate states.
217 pub async fn route(&self, request: JsonRpcRequest, ctx: RequestContext) -> JsonRpcResponse {
218 // Validate request if enabled
219 if self.config.validate_requests
220 && let Err(e) = validate_request(&request)
221 {
222 return error_response(&request, e);
223 }
224
225 // Handle the request
226 let result = match request.method.as_str() {
227 // Core protocol methods
228 "initialize" => self.handlers.handle_initialize(request, ctx).await,
229
230 // Tool methods
231 "tools/list" => self.handlers.handle_list_tools(request, ctx).await,
232 "tools/call" => self.handlers.handle_call_tool(request, ctx).await,
233
234 // Prompt methods
235 "prompts/list" => self.handlers.handle_list_prompts(request, ctx).await,
236 "prompts/get" => self.handlers.handle_get_prompt(request, ctx).await,
237
238 // Resource methods
239 "resources/list" => self.handlers.handle_list_resources(request, ctx).await,
240 "resources/read" => self.handlers.handle_read_resource(request, ctx).await,
241 "resources/subscribe" => self.handlers.handle_subscribe_resource(request, ctx).await,
242 "resources/unsubscribe" => {
243 self.handlers
244 .handle_unsubscribe_resource(request, ctx)
245 .await
246 }
247
248 // Logging methods
249 "logging/setLevel" => self.handlers.handle_set_log_level(request, ctx).await,
250
251 // Sampling methods
252 "sampling/createMessage" => self.handlers.handle_create_message(request, ctx).await,
253
254 // Roots methods
255 "roots/list" => self.handlers.handle_list_roots(request, ctx).await,
256
257 // Enhanced MCP features (MCP 2025-06-18 protocol methods)
258 "elicitation/create" => self.handlers.handle_elicitation(request, ctx).await,
259 "completion/complete" => self.handlers.handle_completion(request, ctx).await,
260 "resources/templates/list" => {
261 self.handlers
262 .handle_list_resource_templates(request, ctx)
263 .await
264 }
265 "ping" => self.handlers.handle_ping(request, ctx).await,
266
267 // Custom routes
268 method => {
269 if let Some(handler) = self.custom_routes.get(method) {
270 let request_clone = request.clone();
271 handler
272 .handle(request, ctx)
273 .await
274 .unwrap_or_else(|e| error_response(&request_clone, e))
275 } else {
276 method_not_found_response(&request)
277 }
278 }
279 };
280
281 // Validate response if enabled
282 if self.config.validate_responses
283 && let Err(e) = validate_response(&result)
284 {
285 warn!("Response validation failed: {}", e);
286 }
287
288 result
289 }
290
291 /// Handle batch requests
292 pub async fn route_batch(
293 &self,
294 requests: Vec<JsonRpcRequest>,
295 ctx: RequestContext,
296 ) -> Vec<JsonRpcResponse> {
297 // Note: Server capabilities are injected in route() for each request
298 let max_in_flight = self.config.max_concurrent_requests.max(1);
299 stream::iter(requests.into_iter())
300 .map(|req| {
301 let ctx_cloned = ctx.clone();
302 async move { self.route(req, ctx_cloned).await }
303 })
304 .buffer_unordered(max_in_flight)
305 .collect()
306 .await
307 }
308
309 /// Send an elicitation request to the client (server-initiated)
310 ///
311 /// # Errors
312 ///
313 /// Returns [`ServerError::Transport`] if:
314 /// - The bidirectional dispatcher is not configured
315 /// - The client request fails
316 /// - The client does not respond
317 pub async fn send_elicitation_to_client(
318 &self,
319 request: ElicitRequest,
320 ctx: RequestContext,
321 ) -> ServerResult<ElicitResult> {
322 self.bidirectional
323 .send_elicitation_to_client(request, ctx)
324 .await
325 }
326
327 /// Send a ping request to the client (server-initiated)
328 ///
329 /// # Errors
330 ///
331 /// Returns [`ServerError::Transport`] if:
332 /// - The bidirectional dispatcher is not configured
333 /// - The client request fails
334 /// - The client does not respond
335 pub async fn send_ping_to_client(
336 &self,
337 request: PingRequest,
338 ctx: RequestContext,
339 ) -> ServerResult<PingResult> {
340 self.bidirectional.send_ping_to_client(request, ctx).await
341 }
342
343 /// Send a create message request to the client (server-initiated)
344 ///
345 /// # Errors
346 ///
347 /// Returns [`ServerError::Transport`] if:
348 /// - The bidirectional dispatcher is not configured
349 /// - The client request fails
350 /// - The client does not support sampling
351 pub async fn send_create_message_to_client(
352 &self,
353 request: CreateMessageRequest,
354 ctx: RequestContext,
355 ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult> {
356 self.bidirectional
357 .send_create_message_to_client(request, ctx)
358 .await
359 }
360
361 /// Send a list roots request to the client (server-initiated)
362 ///
363 /// # Errors
364 ///
365 /// Returns [`ServerError::Transport`] if:
366 /// - The bidirectional dispatcher is not configured
367 /// - The client request fails
368 /// - The client does not support roots
369 pub async fn send_list_roots_to_client(
370 &self,
371 request: turbomcp_protocol::types::ListRootsRequest,
372 ctx: RequestContext,
373 ) -> ServerResult<ListRootsResult> {
374 self.bidirectional
375 .send_list_roots_to_client(request, ctx)
376 .await
377 }
378}
379
380impl Clone for RequestRouter {
381 fn clone(&self) -> Self {
382 Self {
383 registry: Arc::clone(&self.registry),
384 config: self.config.clone(),
385 custom_routes: self.custom_routes.clone(),
386 resource_subscriptions: DashMap::new(),
387 bidirectional: self.bidirectional.clone(),
388 handlers: ProtocolHandlers::new(HandlerContext::new(Arc::clone(&self.registry))),
389 server_to_client: Arc::clone(&self.server_to_client),
390 }
391 }
392}
393
394// Design Note: ServerCapabilities trait implementation
395//
396// RequestRouter currently uses BidirectionalRouter for server-initiated requests
397// (sampling, elicitation, roots) instead of directly implementing the ServerCapabilities
398// trait from turbomcp_protocol::context::capabilities.
399//
400// Current Pattern:
401// - RequestRouter contains BidirectionalRouter which handles server-to-client requests
402// - BidirectionalRouter uses ServerRequestDispatcher trait for transport-agnostic dispatch
403// - This pattern provides better separation of concerns and testability
404//
405// Alternative (not implemented):
406// - RequestRouter could implement ServerCapabilities trait directly
407// - This would allow passing router as &dyn ServerCapabilities to tools
408// - Current pattern is preferred as it keeps routing and bidirectional concerns separate
409//
410// See: crates/turbomcp-server/src/routing/bidirectional.rs for current implementation
411
412/// Router alias for convenience
413pub type Router = RequestRouter;
414
415// ===================================================================
416// JsonRpcHandler Implementation - For HTTP Transport Integration
417// ===================================================================
418
419#[async_trait::async_trait]
420impl turbomcp_protocol::JsonRpcHandler for RequestRouter {
421 /// Handle a JSON-RPC request via the HTTP transport
422 ///
423 /// This implementation enables `RequestRouter` to be used directly with
424 /// the HTTP transport layer (`run_server`), supporting the builder pattern
425 /// for programmatic server construction.
426 ///
427 /// # Architecture
428 ///
429 /// - Parses raw JSON into `JsonRpcRequest`
430 /// - Creates default `RequestContext` (no auth/session for HTTP)
431 /// - Routes through the existing `route()` method
432 /// - Serializes `JsonRpcResponse` back to JSON
433 ///
434 /// This provides the same request handling as the macro pattern but
435 /// allows runtime handler registration via `ServerBuilder`.
436 async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
437 // Parse the request
438 let req: JsonRpcRequest = match serde_json::from_value(req_value) {
439 Ok(r) => r,
440 Err(e) => {
441 return serde_json::json!({
442 "jsonrpc": "2.0",
443 "error": {
444 "code": -32700,
445 "message": format!("Parse error: {}", e)
446 },
447 "id": null
448 });
449 }
450 };
451
452 // Create properly configured context with server-to-client capabilities
453 // Note: For authenticated HTTP requests, middleware should add auth info via with_* methods
454 let ctx = self.create_context();
455
456 // Route the request through the standard routing system
457 let response = self.route(req, ctx).await;
458
459 // Serialize response
460 match serde_json::to_value(&response) {
461 Ok(v) => v,
462 Err(e) => {
463 serde_json::json!({
464 "jsonrpc": "2.0",
465 "error": {
466 "code": -32603,
467 "message": format!("Internal error: failed to serialize response: {}", e)
468 },
469 "id": response.id
470 })
471 }
472 }
473 }
474}
475
476// Comprehensive tests in separate file (tokio/axum pattern)
477#[cfg(test)]
478mod tests;