turbomcp_server/routing/
mod.rs

1//! Request routing and handler dispatch system
2//!
3//! This module provides a comprehensive routing system for MCP protocol requests,
4//! supporting all standard MCP methods with enterprise features like RBAC,
5//! JSON Schema validation, timeout management, and bidirectional communication.
6
7mod bidirectional;
8mod config;
9mod handlers;
10mod traits;
11mod utils;
12mod validation;
13
14// Re-export public types to maintain API compatibility
15pub use bidirectional::BidirectionalRouter;
16pub use config::RouterConfig;
17pub use traits::{Route, RouteHandler, RouteMetadata, ServerRequestDispatcher};
18
19use dashmap::DashMap;
20use futures::stream::{self, StreamExt};
21use std::collections::HashMap;
22use std::sync::Arc;
23use tracing::warn;
24use turbomcp_protocol::RequestContext;
25use turbomcp_protocol::{
26    jsonrpc::{JsonRpcRequest, JsonRpcResponse},
27    types::{
28        CreateMessageRequest, ElicitRequest, ElicitResult, ListRootsResult, PingRequest, PingResult,
29    },
30};
31
32use crate::capabilities::ServerToClientAdapter;
33use crate::metrics::ServerMetrics;
34use crate::registry::HandlerRegistry;
35use crate::{ServerError, ServerResult};
36
37use handlers::{HandlerContext, ProtocolHandlers};
38use turbomcp_protocol::context::capabilities::ServerToClientRequests;
39use utils::{error_response, method_not_found_response};
40use validation::{validate_request, validate_response};
41
/// Request router for dispatching MCP requests to appropriate handlers
///
/// Built-in MCP methods are served by `handlers`; any other method name is
/// looked up in `custom_routes`. The router also owns the bidirectional
/// machinery used for server-initiated (server-to-client) requests.
pub struct RequestRouter {
    /// Handler registry shared with the protocol handlers
    registry: Arc<HandlerRegistry>,
    /// Route configuration (request/response validation, bidirectional toggle,
    /// batch concurrency limit)
    config: RouterConfig,
    /// Custom route handlers, keyed by JSON-RPC method name
    custom_routes: HashMap<String, Arc<dyn RouteHandler>>,
    /// Resource subscription counters by URI (reserved for future functionality)
    // Never read or written by this module yet, hence the lint suppression.
    #[allow(dead_code)]
    resource_subscriptions: DashMap<String, usize>,
    /// Bidirectional communication router
    bidirectional: BidirectionalRouter,
    /// Protocol handlers for the built-in MCP methods
    handlers: ProtocolHandlers,
    /// Server-to-client requests adapter for tool-initiated requests (sampling, elicitation, roots)
    /// This is injected into RequestContext so tools can make server-initiated requests
    server_to_client: Arc<dyn ServerToClientRequests>,
}
61
62impl std::fmt::Debug for RequestRouter {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.debug_struct("RequestRouter")
65            .field("config", &self.config)
66            .field("custom_routes_count", &self.custom_routes.len())
67            .finish()
68    }
69}
70
71impl RequestRouter {
72    /// Create a new request router
73    #[must_use]
74    pub fn new(registry: Arc<HandlerRegistry>, _metrics: Arc<ServerMetrics>) -> Self {
75        // Timeout management is now handled by middleware
76        let config = RouterConfig::default();
77
78        let handler_context = HandlerContext::new(Arc::clone(&registry));
79
80        let bidirectional = BidirectionalRouter::new();
81
82        // Create the server-to-client adapter that bridges bidirectional router
83        // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
84        let server_to_client: Arc<dyn ServerToClientRequests> =
85            Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
86
87        Self {
88            registry,
89            config,
90            custom_routes: HashMap::new(),
91            resource_subscriptions: DashMap::new(),
92            bidirectional,
93            handlers: ProtocolHandlers::new(handler_context),
94            server_to_client,
95        }
96    }
97
98    /// Create a router with configuration
99    #[must_use]
100    pub fn with_config(
101        registry: Arc<HandlerRegistry>,
102        config: RouterConfig,
103        _metrics: Arc<ServerMetrics>,
104    ) -> Self {
105        // Timeout management is now handled by middleware
106
107        let handler_context = HandlerContext::new(Arc::clone(&registry));
108
109        let bidirectional = BidirectionalRouter::new();
110
111        // Create the server-to-client adapter that bridges bidirectional router
112        // to the ServerToClientRequests trait (type-safe, zero-cost abstraction)
113        let server_to_client: Arc<dyn ServerToClientRequests> =
114            Arc::new(ServerToClientAdapter::new(bidirectional.clone()));
115
116        Self {
117            registry,
118            config,
119            custom_routes: HashMap::new(),
120            resource_subscriptions: DashMap::new(),
121            bidirectional,
122            handlers: ProtocolHandlers::new(handler_context),
123            server_to_client,
124        }
125    }
126
127    // Timeout configuration now handled by middleware - no longer needed
128
129    /// Set the server request dispatcher for bidirectional communication
130    pub fn set_server_request_dispatcher<D>(&mut self, dispatcher: D)
131    where
132        D: ServerRequestDispatcher + 'static,
133    {
134        self.bidirectional.set_dispatcher(dispatcher);
135    }
136
137    /// Get the server request dispatcher
138    pub fn get_server_request_dispatcher(&self) -> Option<&Arc<dyn ServerRequestDispatcher>> {
139        self.bidirectional.get_dispatcher()
140    }
141
142    /// Check if bidirectional routing is enabled and supported
143    pub fn supports_bidirectional(&self) -> bool {
144        self.config.enable_bidirectional && self.bidirectional.supports_bidirectional()
145    }
146
147    /// Add a custom route handler
148    ///
149    /// # Errors
150    ///
151    /// Returns [`ServerError::Routing`] if a route for the same method already exists.
152    pub fn add_route<H>(&mut self, handler: H) -> ServerResult<()>
153    where
154        H: RouteHandler + 'static,
155    {
156        let metadata = handler.metadata();
157        let handler_arc: Arc<dyn RouteHandler> = Arc::new(handler);
158
159        for method in &metadata.methods {
160            if self.custom_routes.contains_key(method) {
161                return Err(ServerError::routing_with_method(
162                    format!("Route for method '{method}' already exists"),
163                    method.clone(),
164                ));
165            }
166            self.custom_routes
167                .insert(method.clone(), Arc::clone(&handler_arc));
168        }
169
170        Ok(())
171    }
172
173    /// Route a JSON-RPC request to the appropriate handler
174    pub async fn route(&self, request: JsonRpcRequest, ctx: RequestContext) -> JsonRpcResponse {
175        // Inject server-to-client capabilities into context for tool-initiated requests
176        // Enables type-safe sampling, elicitation, and roots listing with full context propagation
177        let ctx = ctx.with_server_to_client(Arc::clone(&self.server_to_client));
178
179        // Validate request if enabled
180        if self.config.validate_requests
181            && let Err(e) = validate_request(&request)
182        {
183            return error_response(&request, e);
184        }
185
186        // Handle the request
187        let result = match request.method.as_str() {
188            // Core protocol methods
189            "initialize" => self.handlers.handle_initialize(request, ctx).await,
190
191            // Tool methods
192            "tools/list" => self.handlers.handle_list_tools(request, ctx).await,
193            "tools/call" => self.handlers.handle_call_tool(request, ctx).await,
194
195            // Prompt methods
196            "prompts/list" => self.handlers.handle_list_prompts(request, ctx).await,
197            "prompts/get" => self.handlers.handle_get_prompt(request, ctx).await,
198
199            // Resource methods
200            "resources/list" => self.handlers.handle_list_resources(request, ctx).await,
201            "resources/read" => self.handlers.handle_read_resource(request, ctx).await,
202            "resources/subscribe" => self.handlers.handle_subscribe_resource(request, ctx).await,
203            "resources/unsubscribe" => {
204                self.handlers
205                    .handle_unsubscribe_resource(request, ctx)
206                    .await
207            }
208
209            // Logging methods
210            "logging/setLevel" => self.handlers.handle_set_log_level(request, ctx).await,
211
212            // Sampling methods
213            "sampling/createMessage" => self.handlers.handle_create_message(request, ctx).await,
214
215            // Roots methods
216            "roots/list" => self.handlers.handle_list_roots(request, ctx).await,
217
218            // Enhanced MCP features (MCP 2025-06-18 protocol methods)
219            "elicitation/create" => self.handlers.handle_elicitation(request, ctx).await,
220            "completion/complete" => self.handlers.handle_completion(request, ctx).await,
221            "resources/templates/list" => {
222                self.handlers
223                    .handle_list_resource_templates(request, ctx)
224                    .await
225            }
226            "ping" => self.handlers.handle_ping(request, ctx).await,
227
228            // Custom routes
229            method => {
230                if let Some(handler) = self.custom_routes.get(method) {
231                    let request_clone = request.clone();
232                    handler
233                        .handle(request, ctx)
234                        .await
235                        .unwrap_or_else(|e| error_response(&request_clone, e))
236                } else {
237                    method_not_found_response(&request)
238                }
239            }
240        };
241
242        // Validate response if enabled
243        if self.config.validate_responses
244            && let Err(e) = validate_response(&result)
245        {
246            warn!("Response validation failed: {}", e);
247        }
248
249        result
250    }
251
252    /// Handle batch requests
253    pub async fn route_batch(
254        &self,
255        requests: Vec<JsonRpcRequest>,
256        ctx: RequestContext,
257    ) -> Vec<JsonRpcResponse> {
258        // Note: Server capabilities are injected in route() for each request
259        let max_in_flight = self.config.max_concurrent_requests.max(1);
260        stream::iter(requests.into_iter())
261            .map(|req| {
262                let ctx_cloned = ctx.clone();
263                async move { self.route(req, ctx_cloned).await }
264            })
265            .buffer_unordered(max_in_flight)
266            .collect()
267            .await
268    }
269
270    /// Send an elicitation request to the client (server-initiated)
271    ///
272    /// # Errors
273    ///
274    /// Returns [`ServerError::Transport`] if:
275    /// - The bidirectional dispatcher is not configured
276    /// - The client request fails
277    /// - The client does not respond
278    pub async fn send_elicitation_to_client(
279        &self,
280        request: ElicitRequest,
281        ctx: RequestContext,
282    ) -> ServerResult<ElicitResult> {
283        self.bidirectional
284            .send_elicitation_to_client(request, ctx)
285            .await
286    }
287
288    /// Send a ping request to the client (server-initiated)
289    ///
290    /// # Errors
291    ///
292    /// Returns [`ServerError::Transport`] if:
293    /// - The bidirectional dispatcher is not configured
294    /// - The client request fails
295    /// - The client does not respond
296    pub async fn send_ping_to_client(
297        &self,
298        request: PingRequest,
299        ctx: RequestContext,
300    ) -> ServerResult<PingResult> {
301        self.bidirectional.send_ping_to_client(request, ctx).await
302    }
303
304    /// Send a create message request to the client (server-initiated)
305    ///
306    /// # Errors
307    ///
308    /// Returns [`ServerError::Transport`] if:
309    /// - The bidirectional dispatcher is not configured
310    /// - The client request fails
311    /// - The client does not support sampling
312    pub async fn send_create_message_to_client(
313        &self,
314        request: CreateMessageRequest,
315        ctx: RequestContext,
316    ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult> {
317        self.bidirectional
318            .send_create_message_to_client(request, ctx)
319            .await
320    }
321
322    /// Send a list roots request to the client (server-initiated)
323    ///
324    /// # Errors
325    ///
326    /// Returns [`ServerError::Transport`] if:
327    /// - The bidirectional dispatcher is not configured
328    /// - The client request fails
329    /// - The client does not support roots
330    pub async fn send_list_roots_to_client(
331        &self,
332        request: turbomcp_protocol::types::ListRootsRequest,
333        ctx: RequestContext,
334    ) -> ServerResult<ListRootsResult> {
335        self.bidirectional
336            .send_list_roots_to_client(request, ctx)
337            .await
338    }
339}
340
341impl Clone for RequestRouter {
342    fn clone(&self) -> Self {
343        Self {
344            registry: Arc::clone(&self.registry),
345            config: self.config.clone(),
346            custom_routes: self.custom_routes.clone(),
347            resource_subscriptions: DashMap::new(),
348            bidirectional: self.bidirectional.clone(),
349            handlers: ProtocolHandlers::new(HandlerContext::new(Arc::clone(&self.registry))),
350            server_to_client: Arc::clone(&self.server_to_client),
351        }
352    }
353}
354
355// Design Note: ServerCapabilities trait implementation
356//
357// RequestRouter currently uses BidirectionalRouter for server-initiated requests
358// (sampling, elicitation, roots) instead of directly implementing the ServerCapabilities
359// trait from turbomcp_protocol::context::capabilities.
360//
361// Current Pattern:
362// - RequestRouter contains BidirectionalRouter which handles server-to-client requests
363// - BidirectionalRouter uses ServerRequestDispatcher trait for transport-agnostic dispatch
364// - This pattern provides better separation of concerns and testability
365//
366// Alternative (not implemented):
367// - RequestRouter could implement ServerCapabilities trait directly
368// - This would allow passing router as &dyn ServerCapabilities to tools
369// - Current pattern is preferred as it keeps routing and bidirectional concerns separate
370//
371// See: crates/turbomcp-server/src/routing/bidirectional.rs for current implementation
372
/// Convenience alias: `Router` is another name for [`RequestRouter`]
pub type Router = RequestRouter;
375
376// ===================================================================
377// JsonRpcHandler Implementation - For HTTP Transport Integration
378// ===================================================================
379
380#[async_trait::async_trait]
381impl turbomcp_protocol::JsonRpcHandler for RequestRouter {
382    /// Handle a JSON-RPC request via the HTTP transport
383    ///
384    /// This implementation enables `RequestRouter` to be used directly with
385    /// the HTTP transport layer (`run_server`), supporting the builder pattern
386    /// for programmatic server construction.
387    ///
388    /// # Architecture
389    ///
390    /// - Parses raw JSON into `JsonRpcRequest`
391    /// - Creates default `RequestContext` (no auth/session for HTTP)
392    /// - Routes through the existing `route()` method
393    /// - Serializes `JsonRpcResponse` back to JSON
394    ///
395    /// This provides the same request handling as the macro pattern but
396    /// allows runtime handler registration via `ServerBuilder`.
397    async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
398        // Parse the request
399        let req: JsonRpcRequest = match serde_json::from_value(req_value) {
400            Ok(r) => r,
401            Err(e) => {
402                return serde_json::json!({
403                    "jsonrpc": "2.0",
404                    "error": {
405                        "code": -32700,
406                        "message": format!("Parse error: {}", e)
407                    },
408                    "id": null
409                });
410            }
411        };
412
413        // Create default context for HTTP requests
414        // Note: For authenticated HTTP requests, middleware should inject auth info
415        let ctx = RequestContext::default();
416
417        // Route the request through the standard routing system
418        let response = self.route(req, ctx).await;
419
420        // Serialize response
421        match serde_json::to_value(&response) {
422            Ok(v) => v,
423            Err(e) => {
424                serde_json::json!({
425                    "jsonrpc": "2.0",
426                    "error": {
427                        "code": -32603,
428                        "message": format!("Internal error: failed to serialize response: {}", e)
429                    },
430                    "id": response.id
431                })
432            }
433        }
434    }
435}
436
437// Comprehensive tests in separate file (tokio/axum pattern)
438#[cfg(test)]
439mod tests;