turbomcp_server/routing/mod.rs
1//! Request routing and handler dispatch system
2//!
3//! This module provides a comprehensive routing system for MCP protocol requests,
4//! supporting all standard MCP methods with enterprise features like RBAC,
5//! JSON Schema validation, timeout management, and bidirectional communication.
6
7mod bidirectional;
8mod config;
9mod handlers;
10mod traits;
11mod utils;
12mod validation;
13
14// Re-export public types to maintain API compatibility
15pub use bidirectional::BidirectionalRouter;
16pub use config::RouterConfig;
17pub use traits::{Route, RouteHandler, RouteMetadata, ServerRequestDispatcher};
18
19use dashmap::DashMap;
20use futures::stream::{self, StreamExt};
21use std::collections::HashMap;
22use std::sync::Arc;
23use tracing::warn;
24use turbomcp_protocol::RequestContext;
25use turbomcp_protocol::{
26 jsonrpc::{JsonRpcRequest, JsonRpcResponse},
27 types::{
28 CreateMessageRequest, ElicitRequest, ElicitResult, ListRootsResult, PingRequest, PingResult,
29 },
30};
31
32use crate::capabilities::ServerToClientAdapter;
33use crate::metrics::ServerMetrics;
34use crate::registry::HandlerRegistry;
35use crate::{ServerError, ServerResult};
36
37use handlers::{HandlerContext, ProtocolHandlers};
38use turbomcp_protocol::context::capabilities::ServerToClientRequests;
39use utils::{error_response, method_not_found_response};
40use validation::{validate_request, validate_response};
41
42/// Request router for dispatching MCP requests to appropriate handlers
43pub struct RequestRouter {
44 /// Handler registry
45 registry: Arc<HandlerRegistry>,
46 /// Route configuration
47 config: RouterConfig,
48 /// Server configuration (for protocol responses)
49 server_config: crate::config::ServerConfig,
50 /// Custom route handlers
51 custom_routes: HashMap<String, Arc<dyn RouteHandler>>,
52 /// Resource subscription counters by URI (reserved for future functionality)
53 #[allow(dead_code)]
54 resource_subscriptions: DashMap<String, usize>,
55 /// Bidirectional communication router
56 bidirectional: BidirectionalRouter,
57 /// Protocol handlers
58 handlers: ProtocolHandlers,
59 /// Server-to-client requests adapter for tool-initiated requests (sampling, elicitation, roots)
60 /// This is injected into RequestContext so tools can make server-initiated requests
61 server_to_client: Arc<dyn ServerToClientRequests>,
62}
63
64impl std::fmt::Debug for RequestRouter {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 f.debug_struct("RequestRouter")
67 .field("config", &self.config)
68 .field("custom_routes_count", &self.custom_routes.len())
69 .finish()
70 }
71}
72
73impl RequestRouter {
74 /// Create a new request router
75 #[must_use]
76 pub fn new(
77 registry: Arc<HandlerRegistry>,
78 _metrics: Arc<ServerMetrics>,
79 server_config: crate::config::ServerConfig,
80 ) -> Self {
81 // Timeout management is now handled by middleware
82 let config = RouterConfig::default();
83
84 let handler_context = HandlerContext::new(Arc::clone(®istry), server_config.clone());
85
86 let bidirectional = BidirectionalRouter::new();
87
88 // Create the server-to-client adapter that bridges bidirectional router
89 // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
90 let server_to_client: Arc<dyn ServerToClientRequests> =
91 Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
92
93 Self {
94 registry,
95 config,
96 server_config,
97 custom_routes: HashMap::new(),
98 resource_subscriptions: DashMap::new(),
99 bidirectional,
100 handlers: ProtocolHandlers::new(handler_context),
101 server_to_client,
102 }
103 }
104
105 /// Create a router with configuration
106 #[must_use]
107 pub fn with_config(
108 registry: Arc<HandlerRegistry>,
109 config: RouterConfig,
110 _metrics: Arc<ServerMetrics>,
111 server_config: crate::config::ServerConfig,
112 ) -> Self {
113 // Timeout management is now handled by middleware
114
115 let handler_context = HandlerContext::new(Arc::clone(®istry), server_config.clone());
116
117 let bidirectional = BidirectionalRouter::new();
118
119 // Create the server-to-client adapter that bridges bidirectional router
120 // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
121 let server_to_client: Arc<dyn ServerToClientRequests> =
122 Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
123
124 Self {
125 registry,
126 config,
127 server_config,
128 custom_routes: HashMap::new(),
129 resource_subscriptions: DashMap::new(),
130 bidirectional,
131 handlers: ProtocolHandlers::new(handler_context),
132 server_to_client,
133 }
134 }
135
136 // Timeout configuration now handled by middleware - no longer needed
137
138 /// Set the server request dispatcher for bidirectional communication
139 ///
140 /// CRITICAL: This also refreshes the server_to_client adapter so it sees the new dispatcher.
141 /// Without this refresh, the adapter would still point to the old (empty) bidirectional router.
142 pub fn set_server_request_dispatcher<D>(&mut self, dispatcher: D)
143 where
144 D: ServerRequestDispatcher + 'static,
145 {
146 self.bidirectional.set_dispatcher(dispatcher);
147
148 // CRITICAL FIX: Recreate the adapter so it sees the new dispatcher
149 // The adapter was created with a clone of bidirectional BEFORE the dispatcher was set.
150 // Since BidirectionalRouter::set_dispatcher() replaces the Option rather than mutating
151 // through it, the adapter's clone still has dispatcher: None.
152 // By recreating it here, we ensure it gets a fresh clone that includes the dispatcher.
153 self.server_to_client = Arc::new(ServerToClientAdapter::new(self.bidirectional.clone()));
154 }
155
156 /// Get the server request dispatcher
157 pub fn get_server_request_dispatcher(&self) -> Option<&Arc<dyn ServerRequestDispatcher>> {
158 self.bidirectional.get_dispatcher()
159 }
160
161 /// Check if bidirectional routing is enabled and supported
162 pub fn supports_bidirectional(&self) -> bool {
163 self.config.enable_bidirectional && self.bidirectional.supports_bidirectional()
164 }
165
166 /// Add a custom route handler
167 ///
168 /// # Errors
169 ///
170 /// Returns [`ServerError::Routing`] if a route for the same method already exists.
171 pub fn add_route<H>(&mut self, handler: H) -> ServerResult<()>
172 where
173 H: RouteHandler + 'static,
174 {
175 let metadata = handler.metadata();
176 let handler_arc: Arc<dyn RouteHandler> = Arc::new(handler);
177
178 for method in &metadata.methods {
179 if self.custom_routes.contains_key(method) {
180 return Err(ServerError::routing_with_method(
181 format!("Route for method '{method}' already exists"),
182 method.clone(),
183 ));
184 }
185 self.custom_routes
186 .insert(method.clone(), Arc::clone(&handler_arc));
187 }
188
189 Ok(())
190 }
191
192 /// Create a properly configured RequestContext for this router
193 ///
194 /// This factory method creates a RequestContext with all necessary capabilities
195 /// pre-configured, including server-to-client communication for bidirectional
196 /// features (sampling, elicitation, roots).
197 ///
198 /// **Design Pattern**: Explicit Factory
199 /// - Context is valid from creation (no broken intermediate state)
200 /// - Router provides factory but doesn't modify contexts
201 /// - Follows Single Responsibility Principle
202 ///
203 /// # Example
204 /// ```rust,ignore
205 /// let ctx = router.create_context();
206 /// let response = router.route(request, ctx).await;
207 /// ```
208 #[must_use]
209 pub fn create_context(&self) -> RequestContext {
210 RequestContext::new().with_server_to_client(Arc::clone(&self.server_to_client))
211 }
212
213 /// Route a JSON-RPC request to the appropriate handler
214 ///
215 /// **IMPORTANT**: The context should be created using `create_context()` to ensure
216 /// it has all necessary capabilities configured. This method does NOT modify the
217 /// context - it only routes the request.
218 ///
219 /// # Design Pattern
220 /// This follows the Single Responsibility Principle:
221 /// - `create_context()`: Creates properly configured contexts
222 /// - `route()`: Routes requests to handlers
223 ///
224 /// Previously, `route()` was modifying the context (adding server_to_client),
225 /// which violated SRP and created invalid intermediate states.
226 pub async fn route(&self, request: JsonRpcRequest, ctx: RequestContext) -> JsonRpcResponse {
227 // Validate request if enabled
228 if self.config.validate_requests
229 && let Err(e) = validate_request(&request)
230 {
231 return error_response(&request, e);
232 }
233
234 // Handle the request
235 let result = match request.method.as_str() {
236 // Core protocol methods
237 "initialize" => self.handlers.handle_initialize(request, ctx).await,
238
239 // Tool methods
240 "tools/list" => self.handlers.handle_list_tools(request, ctx).await,
241 "tools/call" => self.handlers.handle_call_tool(request, ctx).await,
242
243 // Prompt methods
244 "prompts/list" => self.handlers.handle_list_prompts(request, ctx).await,
245 "prompts/get" => self.handlers.handle_get_prompt(request, ctx).await,
246
247 // Resource methods
248 "resources/list" => self.handlers.handle_list_resources(request, ctx).await,
249 "resources/read" => self.handlers.handle_read_resource(request, ctx).await,
250 "resources/subscribe" => self.handlers.handle_subscribe_resource(request, ctx).await,
251 "resources/unsubscribe" => {
252 self.handlers
253 .handle_unsubscribe_resource(request, ctx)
254 .await
255 }
256
257 // Logging methods
258 "logging/setLevel" => self.handlers.handle_set_log_level(request, ctx).await,
259
260 // Sampling methods
261 "sampling/createMessage" => self.handlers.handle_create_message(request, ctx).await,
262
263 // Roots methods
264 "roots/list" => self.handlers.handle_list_roots(request, ctx).await,
265
266 // Enhanced MCP features (MCP 2025-06-18 protocol methods)
267 "elicitation/create" => self.handlers.handle_elicitation(request, ctx).await,
268 "completion/complete" => self.handlers.handle_completion(request, ctx).await,
269 "resources/templates/list" => {
270 self.handlers
271 .handle_list_resource_templates(request, ctx)
272 .await
273 }
274 "ping" => self.handlers.handle_ping(request, ctx).await,
275
276 // Custom routes
277 method => {
278 if let Some(handler) = self.custom_routes.get(method) {
279 let request_clone = request.clone();
280 handler
281 .handle(request, ctx)
282 .await
283 .unwrap_or_else(|e| error_response(&request_clone, e))
284 } else {
285 method_not_found_response(&request)
286 }
287 }
288 };
289
290 // Validate response if enabled
291 if self.config.validate_responses
292 && let Err(e) = validate_response(&result)
293 {
294 warn!("Response validation failed: {}", e);
295 }
296
297 result
298 }
299
300 /// Handle batch requests
301 pub async fn route_batch(
302 &self,
303 requests: Vec<JsonRpcRequest>,
304 ctx: RequestContext,
305 ) -> Vec<JsonRpcResponse> {
306 // Note: Server capabilities are injected in route() for each request
307 let max_in_flight = self.config.max_concurrent_requests.max(1);
308 stream::iter(requests.into_iter())
309 .map(|req| {
310 let ctx_cloned = ctx.clone();
311 async move { self.route(req, ctx_cloned).await }
312 })
313 .buffer_unordered(max_in_flight)
314 .collect()
315 .await
316 }
317
318 /// Send an elicitation request to the client (server-initiated)
319 ///
320 /// # Errors
321 ///
322 /// Returns [`ServerError::Transport`] if:
323 /// - The bidirectional dispatcher is not configured
324 /// - The client request fails
325 /// - The client does not respond
326 pub async fn send_elicitation_to_client(
327 &self,
328 request: ElicitRequest,
329 ctx: RequestContext,
330 ) -> ServerResult<ElicitResult> {
331 self.bidirectional
332 .send_elicitation_to_client(request, ctx)
333 .await
334 }
335
336 /// Send a ping request to the client (server-initiated)
337 ///
338 /// # Errors
339 ///
340 /// Returns [`ServerError::Transport`] if:
341 /// - The bidirectional dispatcher is not configured
342 /// - The client request fails
343 /// - The client does not respond
344 pub async fn send_ping_to_client(
345 &self,
346 request: PingRequest,
347 ctx: RequestContext,
348 ) -> ServerResult<PingResult> {
349 self.bidirectional.send_ping_to_client(request, ctx).await
350 }
351
352 /// Send a create message request to the client (server-initiated)
353 ///
354 /// # Errors
355 ///
356 /// Returns [`ServerError::Transport`] if:
357 /// - The bidirectional dispatcher is not configured
358 /// - The client request fails
359 /// - The client does not support sampling
360 pub async fn send_create_message_to_client(
361 &self,
362 request: CreateMessageRequest,
363 ctx: RequestContext,
364 ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult> {
365 self.bidirectional
366 .send_create_message_to_client(request, ctx)
367 .await
368 }
369
370 /// Send a list roots request to the client (server-initiated)
371 ///
372 /// # Errors
373 ///
374 /// Returns [`ServerError::Transport`] if:
375 /// - The bidirectional dispatcher is not configured
376 /// - The client request fails
377 /// - The client does not support roots
378 pub async fn send_list_roots_to_client(
379 &self,
380 request: turbomcp_protocol::types::ListRootsRequest,
381 ctx: RequestContext,
382 ) -> ServerResult<ListRootsResult> {
383 self.bidirectional
384 .send_list_roots_to_client(request, ctx)
385 .await
386 }
387}
388
389impl Clone for RequestRouter {
390 fn clone(&self) -> Self {
391 Self {
392 registry: Arc::clone(&self.registry),
393 config: self.config.clone(),
394 server_config: self.server_config.clone(),
395 custom_routes: self.custom_routes.clone(),
396 resource_subscriptions: DashMap::new(),
397 bidirectional: self.bidirectional.clone(),
398 handlers: ProtocolHandlers::new(HandlerContext::new(
399 Arc::clone(&self.registry),
400 self.server_config.clone(),
401 )),
402 server_to_client: Arc::clone(&self.server_to_client),
403 }
404 }
405}
406
407// Design Note: ServerCapabilities trait implementation
408//
409// RequestRouter currently uses BidirectionalRouter for server-initiated requests
410// (sampling, elicitation, roots) instead of directly implementing the ServerCapabilities
411// trait from turbomcp_protocol::context::capabilities.
412//
413// Current Pattern:
414// - RequestRouter contains BidirectionalRouter which handles server-to-client requests
415// - BidirectionalRouter uses ServerRequestDispatcher trait for transport-agnostic dispatch
416// - This pattern provides better separation of concerns and testability
417//
418// Alternative (not implemented):
419// - RequestRouter could implement ServerCapabilities trait directly
420// - This would allow passing router as &dyn ServerCapabilities to tools
421// - Current pattern is preferred as it keeps routing and bidirectional concerns separate
422//
423// See: crates/turbomcp-server/src/routing/bidirectional.rs for current implementation
424
425/// Router alias for convenience
426pub type Router = RequestRouter;
427
428// ===================================================================
429// JsonRpcHandler Implementation - For HTTP Transport Integration
430// ===================================================================
431
432#[async_trait::async_trait]
433impl turbomcp_protocol::JsonRpcHandler for RequestRouter {
434 /// Handle a JSON-RPC request via the HTTP transport
435 ///
436 /// This implementation enables `RequestRouter` to be used directly with
437 /// the HTTP transport layer (`run_server`), supporting the builder pattern
438 /// for programmatic server construction.
439 ///
440 /// # Architecture
441 ///
442 /// - Parses raw JSON into `JsonRpcRequest`
443 /// - Creates default `RequestContext` (no auth/session for HTTP)
444 /// - Routes through the existing `route()` method
445 /// - Serializes `JsonRpcResponse` back to JSON
446 ///
447 /// This provides the same request handling as the macro pattern but
448 /// allows runtime handler registration via `ServerBuilder`.
449 async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
450 // Parse the request
451 let req: JsonRpcRequest = match serde_json::from_value(req_value) {
452 Ok(r) => r,
453 Err(e) => {
454 return serde_json::json!({
455 "jsonrpc": "2.0",
456 "error": {
457 "code": -32700,
458 "message": format!("Parse error: {}", e)
459 },
460 "id": null
461 });
462 }
463 };
464
465 // Create properly configured context with server-to-client capabilities
466 // Note: For authenticated HTTP requests, middleware should add auth info via with_* methods
467 let ctx = self.create_context();
468
469 // Route the request through the standard routing system
470 let response = self.route(req, ctx).await;
471
472 // Serialize response
473 match serde_json::to_value(&response) {
474 Ok(v) => v,
475 Err(e) => {
476 serde_json::json!({
477 "jsonrpc": "2.0",
478 "error": {
479 "code": -32603,
480 "message": format!("Internal error: failed to serialize response: {}", e)
481 },
482 "id": response.id
483 })
484 }
485 }
486 }
487}
488
489// Comprehensive tests in separate file (tokio/axum pattern)
490#[cfg(test)]
491mod tests;