turbomcp_server/routing/
mod.rs

1//! Request routing and handler dispatch system
2//!
3//! This module provides a comprehensive routing system for MCP protocol requests,
4//! supporting all standard MCP methods with enterprise features like RBAC,
5//! JSON Schema validation, timeout management, and bidirectional communication.
6
7mod bidirectional;
8mod config;
9mod handlers;
10mod traits;
11mod utils;
12mod validation;
13
14// Re-export public types to maintain API compatibility
15pub use bidirectional::BidirectionalRouter;
16pub use config::RouterConfig;
17pub use traits::{Route, RouteHandler, RouteMetadata, ServerRequestDispatcher};
18
19use dashmap::DashMap;
20use futures::stream::{self, StreamExt};
21use std::collections::HashMap;
22use std::sync::Arc;
23use tracing::warn;
24use turbomcp_protocol::RequestContext;
25use turbomcp_protocol::{
26    jsonrpc::{JsonRpcRequest, JsonRpcResponse},
27    types::{
28        CreateMessageRequest, ElicitRequest, ElicitResult, ListRootsResult, PingRequest, PingResult,
29    },
30};
31
32use crate::capabilities::ServerToClientAdapter;
33use crate::metrics::ServerMetrics;
34use crate::registry::HandlerRegistry;
35use crate::{ServerError, ServerResult};
36
37use handlers::{HandlerContext, ProtocolHandlers};
38use turbomcp_protocol::context::capabilities::ServerToClientRequests;
39use utils::{error_response, method_not_found_response};
40use validation::{validate_request, validate_response};
41
42/// Request router for dispatching MCP requests to appropriate handlers
43pub struct RequestRouter {
44    /// Handler registry
45    registry: Arc<HandlerRegistry>,
46    /// Route configuration
47    config: RouterConfig,
48    /// Server configuration (for protocol responses)
49    server_config: crate::config::ServerConfig,
50    /// Custom route handlers
51    custom_routes: HashMap<String, Arc<dyn RouteHandler>>,
52    /// Resource subscription counters by URI (reserved for future functionality)
53    #[allow(dead_code)]
54    resource_subscriptions: DashMap<String, usize>,
55    /// Bidirectional communication router
56    bidirectional: BidirectionalRouter,
57    /// Protocol handlers
58    handlers: ProtocolHandlers,
59    /// Server-to-client requests adapter for tool-initiated requests (sampling, elicitation, roots)
60    /// This is injected into RequestContext so tools can make server-initiated requests
61    server_to_client: Arc<dyn ServerToClientRequests>,
62}
63
64impl std::fmt::Debug for RequestRouter {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("RequestRouter")
67            .field("config", &self.config)
68            .field("custom_routes_count", &self.custom_routes.len())
69            .finish()
70    }
71}
72
73impl RequestRouter {
74    /// Create a new request router
75    #[must_use]
76    pub fn new(
77        registry: Arc<HandlerRegistry>,
78        _metrics: Arc<ServerMetrics>,
79        server_config: crate::config::ServerConfig,
80    ) -> Self {
81        // Timeout management is now handled by middleware
82        let config = RouterConfig::default();
83
84        let handler_context = HandlerContext::new(Arc::clone(&registry), server_config.clone());
85
86        let bidirectional = BidirectionalRouter::new();
87
88        // Create the server-to-client adapter that bridges bidirectional router
89        // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
90        let server_to_client: Arc<dyn ServerToClientRequests> =
91            Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
92
93        Self {
94            registry,
95            config,
96            server_config,
97            custom_routes: HashMap::new(),
98            resource_subscriptions: DashMap::new(),
99            bidirectional,
100            handlers: ProtocolHandlers::new(handler_context),
101            server_to_client,
102        }
103    }
104
105    /// Create a router with configuration
106    #[must_use]
107    pub fn with_config(
108        registry: Arc<HandlerRegistry>,
109        config: RouterConfig,
110        _metrics: Arc<ServerMetrics>,
111        server_config: crate::config::ServerConfig,
112    ) -> Self {
113        // Timeout management is now handled by middleware
114
115        let handler_context = HandlerContext::new(Arc::clone(&registry), server_config.clone());
116
117        let bidirectional = BidirectionalRouter::new();
118
119        // Create the server-to-client adapter that bridges bidirectional router
120        // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
121        let server_to_client: Arc<dyn ServerToClientRequests> =
122            Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
123
124        Self {
125            registry,
126            config,
127            server_config,
128            custom_routes: HashMap::new(),
129            resource_subscriptions: DashMap::new(),
130            bidirectional,
131            handlers: ProtocolHandlers::new(handler_context),
132            server_to_client,
133        }
134    }
135
136    // Timeout configuration now handled by middleware - no longer needed
137
138    /// Set the server request dispatcher for bidirectional communication
139    ///
140    /// CRITICAL: This also refreshes the server_to_client adapter so it sees the new dispatcher.
141    /// Without this refresh, the adapter would still point to the old (empty) bidirectional router.
142    pub fn set_server_request_dispatcher<D>(&mut self, dispatcher: D)
143    where
144        D: ServerRequestDispatcher + 'static,
145    {
146        self.bidirectional.set_dispatcher(dispatcher);
147
148        // CRITICAL FIX: Recreate the adapter so it sees the new dispatcher
149        // The adapter was created with a clone of bidirectional BEFORE the dispatcher was set.
150        // Since BidirectionalRouter::set_dispatcher() replaces the Option rather than mutating
151        // through it, the adapter's clone still has dispatcher: None.
152        // By recreating it here, we ensure it gets a fresh clone that includes the dispatcher.
153        self.server_to_client = Arc::new(ServerToClientAdapter::new(self.bidirectional.clone()));
154    }
155
156    /// Get the server request dispatcher
157    pub fn get_server_request_dispatcher(&self) -> Option<&Arc<dyn ServerRequestDispatcher>> {
158        self.bidirectional.get_dispatcher()
159    }
160
161    /// Check if bidirectional routing is enabled and supported
162    pub fn supports_bidirectional(&self) -> bool {
163        self.config.enable_bidirectional && self.bidirectional.supports_bidirectional()
164    }
165
166    /// Add a custom route handler
167    ///
168    /// # Errors
169    ///
170    /// Returns [`ServerError::Routing`] if a route for the same method already exists.
171    pub fn add_route<H>(&mut self, handler: H) -> ServerResult<()>
172    where
173        H: RouteHandler + 'static,
174    {
175        let metadata = handler.metadata();
176        let handler_arc: Arc<dyn RouteHandler> = Arc::new(handler);
177
178        for method in &metadata.methods {
179            if self.custom_routes.contains_key(method) {
180                return Err(ServerError::routing_with_method(
181                    format!("Route for method '{method}' already exists"),
182                    method.clone(),
183                ));
184            }
185            self.custom_routes
186                .insert(method.clone(), Arc::clone(&handler_arc));
187        }
188
189        Ok(())
190    }
191
192    /// Create a properly configured RequestContext for this router
193    ///
194    /// This factory method creates a RequestContext with all necessary capabilities
195    /// pre-configured, including server-to-client communication for bidirectional
196    /// features (sampling, elicitation, roots).
197    ///
198    /// **Design Pattern**: Explicit Factory
199    /// - Context is valid from creation (no broken intermediate state)
200    /// - Router provides factory but doesn't modify contexts
201    /// - Follows Single Responsibility Principle
202    ///
203    /// # Example
204    /// ```rust,ignore
205    /// let ctx = router.create_context();
206    /// let response = router.route(request, ctx).await;
207    /// ```
208    #[must_use]
209    pub fn create_context(&self) -> RequestContext {
210        RequestContext::new().with_server_to_client(Arc::clone(&self.server_to_client))
211    }
212
213    /// Route a JSON-RPC request to the appropriate handler
214    ///
215    /// **IMPORTANT**: The context should be created using `create_context()` to ensure
216    /// it has all necessary capabilities configured. This method does NOT modify the
217    /// context - it only routes the request.
218    ///
219    /// # Design Pattern
220    /// This follows the Single Responsibility Principle:
221    /// - `create_context()`: Creates properly configured contexts
222    /// - `route()`: Routes requests to handlers
223    ///
224    /// Previously, `route()` was modifying the context (adding server_to_client),
225    /// which violated SRP and created invalid intermediate states.
226    pub async fn route(&self, request: JsonRpcRequest, ctx: RequestContext) -> JsonRpcResponse {
227        // Validate request if enabled
228        if self.config.validate_requests
229            && let Err(e) = validate_request(&request)
230        {
231            return error_response(&request, e);
232        }
233
234        // Handle the request
235        let result = match request.method.as_str() {
236            // Core protocol methods
237            "initialize" => self.handlers.handle_initialize(request, ctx).await,
238
239            // Tool methods
240            "tools/list" => self.handlers.handle_list_tools(request, ctx).await,
241            "tools/call" => self.handlers.handle_call_tool(request, ctx).await,
242
243            // Prompt methods
244            "prompts/list" => self.handlers.handle_list_prompts(request, ctx).await,
245            "prompts/get" => self.handlers.handle_get_prompt(request, ctx).await,
246
247            // Resource methods
248            "resources/list" => self.handlers.handle_list_resources(request, ctx).await,
249            "resources/read" => self.handlers.handle_read_resource(request, ctx).await,
250            "resources/subscribe" => self.handlers.handle_subscribe_resource(request, ctx).await,
251            "resources/unsubscribe" => {
252                self.handlers
253                    .handle_unsubscribe_resource(request, ctx)
254                    .await
255            }
256
257            // Logging methods
258            "logging/setLevel" => self.handlers.handle_set_log_level(request, ctx).await,
259
260            // Sampling methods
261            "sampling/createMessage" => self.handlers.handle_create_message(request, ctx).await,
262
263            // Roots methods
264            "roots/list" => self.handlers.handle_list_roots(request, ctx).await,
265
266            // Enhanced MCP features (MCP 2025-06-18 protocol methods)
267            "elicitation/create" => self.handlers.handle_elicitation(request, ctx).await,
268            "completion/complete" => self.handlers.handle_completion(request, ctx).await,
269            "resources/templates/list" => {
270                self.handlers
271                    .handle_list_resource_templates(request, ctx)
272                    .await
273            }
274            "ping" => self.handlers.handle_ping(request, ctx).await,
275
276            // Custom routes
277            method => {
278                if let Some(handler) = self.custom_routes.get(method) {
279                    let request_clone = request.clone();
280                    handler
281                        .handle(request, ctx)
282                        .await
283                        .unwrap_or_else(|e| error_response(&request_clone, e))
284                } else {
285                    method_not_found_response(&request)
286                }
287            }
288        };
289
290        // Validate response if enabled
291        if self.config.validate_responses
292            && let Err(e) = validate_response(&result)
293        {
294            warn!("Response validation failed: {}", e);
295        }
296
297        result
298    }
299
300    /// Handle batch requests
301    pub async fn route_batch(
302        &self,
303        requests: Vec<JsonRpcRequest>,
304        ctx: RequestContext,
305    ) -> Vec<JsonRpcResponse> {
306        // Note: Server capabilities are injected in route() for each request
307        let max_in_flight = self.config.max_concurrent_requests.max(1);
308        stream::iter(requests.into_iter())
309            .map(|req| {
310                let ctx_cloned = ctx.clone();
311                async move { self.route(req, ctx_cloned).await }
312            })
313            .buffer_unordered(max_in_flight)
314            .collect()
315            .await
316    }
317
318    /// Send an elicitation request to the client (server-initiated)
319    ///
320    /// # Errors
321    ///
322    /// Returns [`ServerError::Transport`] if:
323    /// - The bidirectional dispatcher is not configured
324    /// - The client request fails
325    /// - The client does not respond
326    pub async fn send_elicitation_to_client(
327        &self,
328        request: ElicitRequest,
329        ctx: RequestContext,
330    ) -> ServerResult<ElicitResult> {
331        self.bidirectional
332            .send_elicitation_to_client(request, ctx)
333            .await
334    }
335
336    /// Send a ping request to the client (server-initiated)
337    ///
338    /// # Errors
339    ///
340    /// Returns [`ServerError::Transport`] if:
341    /// - The bidirectional dispatcher is not configured
342    /// - The client request fails
343    /// - The client does not respond
344    pub async fn send_ping_to_client(
345        &self,
346        request: PingRequest,
347        ctx: RequestContext,
348    ) -> ServerResult<PingResult> {
349        self.bidirectional.send_ping_to_client(request, ctx).await
350    }
351
352    /// Send a create message request to the client (server-initiated)
353    ///
354    /// # Errors
355    ///
356    /// Returns [`ServerError::Transport`] if:
357    /// - The bidirectional dispatcher is not configured
358    /// - The client request fails
359    /// - The client does not support sampling
360    pub async fn send_create_message_to_client(
361        &self,
362        request: CreateMessageRequest,
363        ctx: RequestContext,
364    ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult> {
365        self.bidirectional
366            .send_create_message_to_client(request, ctx)
367            .await
368    }
369
370    /// Send a list roots request to the client (server-initiated)
371    ///
372    /// # Errors
373    ///
374    /// Returns [`ServerError::Transport`] if:
375    /// - The bidirectional dispatcher is not configured
376    /// - The client request fails
377    /// - The client does not support roots
378    pub async fn send_list_roots_to_client(
379        &self,
380        request: turbomcp_protocol::types::ListRootsRequest,
381        ctx: RequestContext,
382    ) -> ServerResult<ListRootsResult> {
383        self.bidirectional
384            .send_list_roots_to_client(request, ctx)
385            .await
386    }
387}
388
389impl Clone for RequestRouter {
390    fn clone(&self) -> Self {
391        Self {
392            registry: Arc::clone(&self.registry),
393            config: self.config.clone(),
394            server_config: self.server_config.clone(),
395            custom_routes: self.custom_routes.clone(),
396            resource_subscriptions: DashMap::new(),
397            bidirectional: self.bidirectional.clone(),
398            handlers: ProtocolHandlers::new(HandlerContext::new(
399                Arc::clone(&self.registry),
400                self.server_config.clone(),
401            )),
402            server_to_client: Arc::clone(&self.server_to_client),
403        }
404    }
405}
406
407// Design Note: ServerCapabilities trait implementation
408//
409// RequestRouter currently uses BidirectionalRouter for server-initiated requests
410// (sampling, elicitation, roots) instead of directly implementing the ServerCapabilities
411// trait from turbomcp_protocol::context::capabilities.
412//
413// Current Pattern:
414// - RequestRouter contains BidirectionalRouter which handles server-to-client requests
415// - BidirectionalRouter uses ServerRequestDispatcher trait for transport-agnostic dispatch
416// - This pattern provides better separation of concerns and testability
417//
418// Alternative (not implemented):
419// - RequestRouter could implement ServerCapabilities trait directly
420// - This would allow passing router as &dyn ServerCapabilities to tools
421// - Current pattern is preferred as it keeps routing and bidirectional concerns separate
422//
423// See: crates/turbomcp-server/src/routing/bidirectional.rs for current implementation
424
425/// Router alias for convenience
426pub type Router = RequestRouter;
427
428// ===================================================================
429// JsonRpcHandler Implementation - For HTTP Transport Integration
430// ===================================================================
431
432#[async_trait::async_trait]
433impl turbomcp_protocol::JsonRpcHandler for RequestRouter {
434    /// Handle a JSON-RPC request via the HTTP transport
435    ///
436    /// This implementation enables `RequestRouter` to be used directly with
437    /// the HTTP transport layer (`run_server`), supporting the builder pattern
438    /// for programmatic server construction.
439    ///
440    /// # Architecture
441    ///
442    /// - Parses raw JSON into `JsonRpcRequest`
443    /// - Creates default `RequestContext` (no auth/session for HTTP)
444    /// - Routes through the existing `route()` method
445    /// - Serializes `JsonRpcResponse` back to JSON
446    ///
447    /// This provides the same request handling as the macro pattern but
448    /// allows runtime handler registration via `ServerBuilder`.
449    async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
450        // Parse the request
451        let req: JsonRpcRequest = match serde_json::from_value(req_value) {
452            Ok(r) => r,
453            Err(e) => {
454                return serde_json::json!({
455                    "jsonrpc": "2.0",
456                    "error": {
457                        "code": -32700,
458                        "message": format!("Parse error: {}", e)
459                    },
460                    "id": null
461                });
462            }
463        };
464
465        // Create properly configured context with server-to-client capabilities
466        // Note: For authenticated HTTP requests, middleware should add auth info via with_* methods
467        let ctx = self.create_context();
468
469        // Route the request through the standard routing system
470        let response = self.route(req, ctx).await;
471
472        // Serialize response
473        match serde_json::to_value(&response) {
474            Ok(v) => v,
475            Err(e) => {
476                serde_json::json!({
477                    "jsonrpc": "2.0",
478                    "error": {
479                        "code": -32603,
480                        "message": format!("Internal error: failed to serialize response: {}", e)
481                    },
482                    "id": response.id
483                })
484            }
485        }
486    }
487}
488
489// Comprehensive tests in separate file (tokio/axum pattern)
490#[cfg(test)]
491mod tests;