turbomcp_server/routing/
mod.rs

1//! Request routing and handler dispatch system
2//!
3//! This module provides a comprehensive routing system for MCP protocol requests,
4//! supporting all standard MCP methods with enterprise features like RBAC,
5//! JSON Schema validation, timeout management, and bidirectional communication.
6
7mod bidirectional;
8mod config;
9mod handlers;
10mod traits;
11mod utils;
12mod validation;
13
14// Re-export public types to maintain API compatibility
15pub use bidirectional::BidirectionalRouter;
16pub use config::RouterConfig;
17pub use traits::{Route, RouteHandler, RouteMetadata, ServerRequestDispatcher};
18
19use dashmap::DashMap;
20use futures::stream::{self, StreamExt};
21use std::collections::HashMap;
22use std::sync::Arc;
23use tracing::warn;
24use turbomcp_protocol::RequestContext;
25use turbomcp_protocol::{
26    jsonrpc::{JsonRpcRequest, JsonRpcResponse},
27    types::{
28        CreateMessageRequest, ElicitRequest, ElicitResult, ListRootsResult, PingRequest, PingResult,
29    },
30};
31
32use crate::capabilities::ServerToClientAdapter;
33use crate::metrics::ServerMetrics;
34use crate::registry::HandlerRegistry;
35use crate::{ServerError, ServerResult};
36
37use handlers::{HandlerContext, ProtocolHandlers};
38use turbomcp_protocol::context::capabilities::ServerToClientRequests;
39use utils::{error_response, method_not_found_response};
40use validation::{validate_request, validate_response};
41
42/// Request router for dispatching MCP requests to appropriate handlers
43pub struct RequestRouter {
44    /// Handler registry
45    registry: Arc<HandlerRegistry>,
46    /// Route configuration
47    config: RouterConfig,
48    /// Custom route handlers
49    custom_routes: HashMap<String, Arc<dyn RouteHandler>>,
50    /// Resource subscription counters by URI (reserved for future functionality)
51    #[allow(dead_code)]
52    resource_subscriptions: DashMap<String, usize>,
53    /// Bidirectional communication router
54    bidirectional: BidirectionalRouter,
55    /// Protocol handlers
56    handlers: ProtocolHandlers,
57    /// Server-to-client requests adapter for tool-initiated requests (sampling, elicitation, roots)
58    /// This is injected into RequestContext so tools can make server-initiated requests
59    server_to_client: Arc<dyn ServerToClientRequests>,
60}
61
62impl std::fmt::Debug for RequestRouter {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.debug_struct("RequestRouter")
65            .field("config", &self.config)
66            .field("custom_routes_count", &self.custom_routes.len())
67            .finish()
68    }
69}
70
71impl RequestRouter {
72    /// Create a new request router
73    #[must_use]
74    pub fn new(registry: Arc<HandlerRegistry>, _metrics: Arc<ServerMetrics>) -> Self {
75        // Timeout management is now handled by middleware
76        let config = RouterConfig::default();
77
78        let handler_context = HandlerContext::new(Arc::clone(&registry));
79
80        let bidirectional = BidirectionalRouter::new();
81
82        // Create the server-to-client adapter that bridges bidirectional router
83        // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
84        let server_to_client: Arc<dyn ServerToClientRequests> =
85            Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
86
87        Self {
88            registry,
89            config,
90            custom_routes: HashMap::new(),
91            resource_subscriptions: DashMap::new(),
92            bidirectional,
93            handlers: ProtocolHandlers::new(handler_context),
94            server_to_client,
95        }
96    }
97
98    /// Create a router with configuration
99    #[must_use]
100    pub fn with_config(
101        registry: Arc<HandlerRegistry>,
102        config: RouterConfig,
103        _metrics: Arc<ServerMetrics>,
104    ) -> Self {
105        // Timeout management is now handled by middleware
106
107        let handler_context = HandlerContext::new(Arc::clone(&registry));
108
109        let bidirectional = BidirectionalRouter::new();
110
111        // Create the server-to-client adapter that bridges bidirectional router
112        // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
113        let server_to_client: Arc<dyn ServerToClientRequests> =
114            Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
115
116        Self {
117            registry,
118            config,
119            custom_routes: HashMap::new(),
120            resource_subscriptions: DashMap::new(),
121            bidirectional,
122            handlers: ProtocolHandlers::new(handler_context),
123            server_to_client,
124        }
125    }
126
127    // Timeout configuration now handled by middleware - no longer needed
128
129    /// Set the server request dispatcher for bidirectional communication
130    ///
131    /// CRITICAL: This also refreshes the server_to_client adapter so it sees the new dispatcher.
132    /// Without this refresh, the adapter would still point to the old (empty) bidirectional router.
133    pub fn set_server_request_dispatcher<D>(&mut self, dispatcher: D)
134    where
135        D: ServerRequestDispatcher + 'static,
136    {
137        self.bidirectional.set_dispatcher(dispatcher);
138
139        // CRITICAL FIX: Recreate the adapter so it sees the new dispatcher
140        // The adapter was created with a clone of bidirectional BEFORE the dispatcher was set.
141        // Since BidirectionalRouter::set_dispatcher() replaces the Option rather than mutating
142        // through it, the adapter's clone still has dispatcher: None.
143        // By recreating it here, we ensure it gets a fresh clone that includes the dispatcher.
144        self.server_to_client = Arc::new(ServerToClientAdapter::new(self.bidirectional.clone()));
145    }
146
147    /// Get the server request dispatcher
148    pub fn get_server_request_dispatcher(&self) -> Option<&Arc<dyn ServerRequestDispatcher>> {
149        self.bidirectional.get_dispatcher()
150    }
151
152    /// Check if bidirectional routing is enabled and supported
153    pub fn supports_bidirectional(&self) -> bool {
154        self.config.enable_bidirectional && self.bidirectional.supports_bidirectional()
155    }
156
157    /// Add a custom route handler
158    ///
159    /// # Errors
160    ///
161    /// Returns [`ServerError::Routing`] if a route for the same method already exists.
162    pub fn add_route<H>(&mut self, handler: H) -> ServerResult<()>
163    where
164        H: RouteHandler + 'static,
165    {
166        let metadata = handler.metadata();
167        let handler_arc: Arc<dyn RouteHandler> = Arc::new(handler);
168
169        for method in &metadata.methods {
170            if self.custom_routes.contains_key(method) {
171                return Err(ServerError::routing_with_method(
172                    format!("Route for method '{method}' already exists"),
173                    method.clone(),
174                ));
175            }
176            self.custom_routes
177                .insert(method.clone(), Arc::clone(&handler_arc));
178        }
179
180        Ok(())
181    }
182
183    /// Create a properly configured RequestContext for this router
184    ///
185    /// This factory method creates a RequestContext with all necessary capabilities
186    /// pre-configured, including server-to-client communication for bidirectional
187    /// features (sampling, elicitation, roots).
188    ///
189    /// **Design Pattern**: Explicit Factory
190    /// - Context is valid from creation (no broken intermediate state)
191    /// - Router provides factory but doesn't modify contexts
192    /// - Follows Single Responsibility Principle
193    ///
194    /// # Example
195    /// ```rust,ignore
196    /// let ctx = router.create_context();
197    /// let response = router.route(request, ctx).await;
198    /// ```
199    #[must_use]
200    pub fn create_context(&self) -> RequestContext {
201        RequestContext::new().with_server_to_client(Arc::clone(&self.server_to_client))
202    }
203
204    /// Route a JSON-RPC request to the appropriate handler
205    ///
206    /// **IMPORTANT**: The context should be created using `create_context()` to ensure
207    /// it has all necessary capabilities configured. This method does NOT modify the
208    /// context - it only routes the request.
209    ///
210    /// # Design Pattern
211    /// This follows the Single Responsibility Principle:
212    /// - `create_context()`: Creates properly configured contexts
213    /// - `route()`: Routes requests to handlers
214    ///
215    /// Previously, `route()` was modifying the context (adding server_to_client),
216    /// which violated SRP and created invalid intermediate states.
217    pub async fn route(&self, request: JsonRpcRequest, ctx: RequestContext) -> JsonRpcResponse {
218        // Validate request if enabled
219        if self.config.validate_requests
220            && let Err(e) = validate_request(&request)
221        {
222            return error_response(&request, e);
223        }
224
225        // Handle the request
226        let result = match request.method.as_str() {
227            // Core protocol methods
228            "initialize" => self.handlers.handle_initialize(request, ctx).await,
229
230            // Tool methods
231            "tools/list" => self.handlers.handle_list_tools(request, ctx).await,
232            "tools/call" => self.handlers.handle_call_tool(request, ctx).await,
233
234            // Prompt methods
235            "prompts/list" => self.handlers.handle_list_prompts(request, ctx).await,
236            "prompts/get" => self.handlers.handle_get_prompt(request, ctx).await,
237
238            // Resource methods
239            "resources/list" => self.handlers.handle_list_resources(request, ctx).await,
240            "resources/read" => self.handlers.handle_read_resource(request, ctx).await,
241            "resources/subscribe" => self.handlers.handle_subscribe_resource(request, ctx).await,
242            "resources/unsubscribe" => {
243                self.handlers
244                    .handle_unsubscribe_resource(request, ctx)
245                    .await
246            }
247
248            // Logging methods
249            "logging/setLevel" => self.handlers.handle_set_log_level(request, ctx).await,
250
251            // Sampling methods
252            "sampling/createMessage" => self.handlers.handle_create_message(request, ctx).await,
253
254            // Roots methods
255            "roots/list" => self.handlers.handle_list_roots(request, ctx).await,
256
257            // Enhanced MCP features (MCP 2025-06-18 protocol methods)
258            "elicitation/create" => self.handlers.handle_elicitation(request, ctx).await,
259            "completion/complete" => self.handlers.handle_completion(request, ctx).await,
260            "resources/templates/list" => {
261                self.handlers
262                    .handle_list_resource_templates(request, ctx)
263                    .await
264            }
265            "ping" => self.handlers.handle_ping(request, ctx).await,
266
267            // Custom routes
268            method => {
269                if let Some(handler) = self.custom_routes.get(method) {
270                    let request_clone = request.clone();
271                    handler
272                        .handle(request, ctx)
273                        .await
274                        .unwrap_or_else(|e| error_response(&request_clone, e))
275                } else {
276                    method_not_found_response(&request)
277                }
278            }
279        };
280
281        // Validate response if enabled
282        if self.config.validate_responses
283            && let Err(e) = validate_response(&result)
284        {
285            warn!("Response validation failed: {}", e);
286        }
287
288        result
289    }
290
291    /// Handle batch requests
292    pub async fn route_batch(
293        &self,
294        requests: Vec<JsonRpcRequest>,
295        ctx: RequestContext,
296    ) -> Vec<JsonRpcResponse> {
297        // Note: Server capabilities are injected in route() for each request
298        let max_in_flight = self.config.max_concurrent_requests.max(1);
299        stream::iter(requests.into_iter())
300            .map(|req| {
301                let ctx_cloned = ctx.clone();
302                async move { self.route(req, ctx_cloned).await }
303            })
304            .buffer_unordered(max_in_flight)
305            .collect()
306            .await
307    }
308
309    /// Send an elicitation request to the client (server-initiated)
310    ///
311    /// # Errors
312    ///
313    /// Returns [`ServerError::Transport`] if:
314    /// - The bidirectional dispatcher is not configured
315    /// - The client request fails
316    /// - The client does not respond
317    pub async fn send_elicitation_to_client(
318        &self,
319        request: ElicitRequest,
320        ctx: RequestContext,
321    ) -> ServerResult<ElicitResult> {
322        self.bidirectional
323            .send_elicitation_to_client(request, ctx)
324            .await
325    }
326
327    /// Send a ping request to the client (server-initiated)
328    ///
329    /// # Errors
330    ///
331    /// Returns [`ServerError::Transport`] if:
332    /// - The bidirectional dispatcher is not configured
333    /// - The client request fails
334    /// - The client does not respond
335    pub async fn send_ping_to_client(
336        &self,
337        request: PingRequest,
338        ctx: RequestContext,
339    ) -> ServerResult<PingResult> {
340        self.bidirectional.send_ping_to_client(request, ctx).await
341    }
342
343    /// Send a create message request to the client (server-initiated)
344    ///
345    /// # Errors
346    ///
347    /// Returns [`ServerError::Transport`] if:
348    /// - The bidirectional dispatcher is not configured
349    /// - The client request fails
350    /// - The client does not support sampling
351    pub async fn send_create_message_to_client(
352        &self,
353        request: CreateMessageRequest,
354        ctx: RequestContext,
355    ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult> {
356        self.bidirectional
357            .send_create_message_to_client(request, ctx)
358            .await
359    }
360
361    /// Send a list roots request to the client (server-initiated)
362    ///
363    /// # Errors
364    ///
365    /// Returns [`ServerError::Transport`] if:
366    /// - The bidirectional dispatcher is not configured
367    /// - The client request fails
368    /// - The client does not support roots
369    pub async fn send_list_roots_to_client(
370        &self,
371        request: turbomcp_protocol::types::ListRootsRequest,
372        ctx: RequestContext,
373    ) -> ServerResult<ListRootsResult> {
374        self.bidirectional
375            .send_list_roots_to_client(request, ctx)
376            .await
377    }
378}
379
380impl Clone for RequestRouter {
381    fn clone(&self) -> Self {
382        Self {
383            registry: Arc::clone(&self.registry),
384            config: self.config.clone(),
385            custom_routes: self.custom_routes.clone(),
386            resource_subscriptions: DashMap::new(),
387            bidirectional: self.bidirectional.clone(),
388            handlers: ProtocolHandlers::new(HandlerContext::new(Arc::clone(&self.registry))),
389            server_to_client: Arc::clone(&self.server_to_client),
390        }
391    }
392}
393
394// Design Note: ServerCapabilities trait implementation
395//
396// RequestRouter currently uses BidirectionalRouter for server-initiated requests
397// (sampling, elicitation, roots) instead of directly implementing the ServerCapabilities
398// trait from turbomcp_protocol::context::capabilities.
399//
400// Current Pattern:
401// - RequestRouter contains BidirectionalRouter which handles server-to-client requests
402// - BidirectionalRouter uses ServerRequestDispatcher trait for transport-agnostic dispatch
403// - This pattern provides better separation of concerns and testability
404//
405// Alternative (not implemented):
406// - RequestRouter could implement ServerCapabilities trait directly
407// - This would allow passing router as &dyn ServerCapabilities to tools
408// - Current pattern is preferred as it keeps routing and bidirectional concerns separate
409//
410// See: crates/turbomcp-server/src/routing/bidirectional.rs for current implementation
411
412/// Router alias for convenience
413pub type Router = RequestRouter;
414
415// ===================================================================
416// JsonRpcHandler Implementation - For HTTP Transport Integration
417// ===================================================================
418
419#[async_trait::async_trait]
420impl turbomcp_protocol::JsonRpcHandler for RequestRouter {
421    /// Handle a JSON-RPC request via the HTTP transport
422    ///
423    /// This implementation enables `RequestRouter` to be used directly with
424    /// the HTTP transport layer (`run_server`), supporting the builder pattern
425    /// for programmatic server construction.
426    ///
427    /// # Architecture
428    ///
429    /// - Parses raw JSON into `JsonRpcRequest`
430    /// - Creates default `RequestContext` (no auth/session for HTTP)
431    /// - Routes through the existing `route()` method
432    /// - Serializes `JsonRpcResponse` back to JSON
433    ///
434    /// This provides the same request handling as the macro pattern but
435    /// allows runtime handler registration via `ServerBuilder`.
436    async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
437        // Parse the request
438        let req: JsonRpcRequest = match serde_json::from_value(req_value) {
439            Ok(r) => r,
440            Err(e) => {
441                return serde_json::json!({
442                    "jsonrpc": "2.0",
443                    "error": {
444                        "code": -32700,
445                        "message": format!("Parse error: {}", e)
446                    },
447                    "id": null
448                });
449            }
450        };
451
452        // Create properly configured context with server-to-client capabilities
453        // Note: For authenticated HTTP requests, middleware should add auth info via with_* methods
454        let ctx = self.create_context();
455
456        // Route the request through the standard routing system
457        let response = self.route(req, ctx).await;
458
459        // Serialize response
460        match serde_json::to_value(&response) {
461            Ok(v) => v,
462            Err(e) => {
463                serde_json::json!({
464                    "jsonrpc": "2.0",
465                    "error": {
466                        "code": -32603,
467                        "message": format!("Internal error: failed to serialize response: {}", e)
468                    },
469                    "id": response.id
470                })
471            }
472        }
473    }
474}
475
476// Comprehensive tests in separate file (tokio/axum pattern)
477#[cfg(test)]
478mod tests;