turbomcp_server/
routing.rs

1//! Request routing and handler dispatch system
2
3use dashmap::DashMap;
4use std::collections::HashMap;
5use std::sync::Arc;
6use turbomcp_core::RequestContext;
7use turbomcp_protocol::{
8    jsonrpc::{JsonRpcRequest, JsonRpcResponse, JsonRpcVersion},
9    types::{
10        CallToolRequest,
11        CompleteRequestParams,
12        CompletionResponse,
13        CreateMessageRequest,
14        // New MCP feature types
15        ElicitRequest,
16        ElicitResult,
17        EmptyResult,
18        GetPromptRequest,
19        Implementation,
20        InitializeRequest,
21        InitializeResult,
22        ListPromptsResult,
23        ListResourceTemplatesRequest,
24        ListResourceTemplatesResult,
25        ListResourcesResult,
26        ListRootsResult,
27        ListToolsResult,
28        LoggingCapabilities,
29        PingRequest,
30        PingResult,
31        PromptsCapabilities,
32        ReadResourceRequest,
33        ResourcesCapabilities,
34        Root,
35        ServerCapabilities,
36        SetLevelRequest,
37        SubscribeRequest,
38        ToolsCapabilities,
39        UnsubscribeRequest,
40    },
41};
42
43use crate::registry::HandlerRegistry;
44use crate::{ServerError, ServerResult};
45use futures::stream::{self, StreamExt};
46use jsonschema::{Draft, JSONSchema};
47
48/// Request router for dispatching MCP requests to appropriate handlers
49pub struct RequestRouter {
50    /// Handler registry
51    registry: Arc<HandlerRegistry>,
52    /// Route configuration
53    config: RouterConfig,
54    /// Custom route handlers
55    custom_routes: HashMap<String, Arc<dyn RouteHandler>>,
56    /// Resource subscription counters by URI
57    resource_subscriptions: DashMap<String, usize>,
58    /// Server-initiated request dispatcher (for bidirectional communication)
59    server_request_dispatcher: Option<Arc<dyn ServerRequestDispatcher>>,
60}
61
62impl std::fmt::Debug for RequestRouter {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.debug_struct("RequestRouter")
65            .field("config", &self.config)
66            .field("custom_routes_count", &self.custom_routes.len())
67            .finish()
68    }
69}
70
71/// Router configuration
72#[derive(Debug, Clone)]
73pub struct RouterConfig {
74    /// Enable request validation
75    pub validate_requests: bool,
76    /// Enable response validation
77    pub validate_responses: bool,
78    /// Default request timeout in milliseconds
79    pub default_timeout_ms: u64,
80    /// Enable request tracing
81    pub enable_tracing: bool,
82    /// Maximum concurrent requests
83    pub max_concurrent_requests: usize,
84    /// Enable bidirectional routing (server-initiated requests)
85    pub enable_bidirectional: bool,
86}
87
88impl Default for RouterConfig {
89    fn default() -> Self {
90        Self {
91            validate_requests: true,
92            validate_responses: true,
93            default_timeout_ms: 30_000,
94            enable_tracing: true,
95            max_concurrent_requests: 1000,
96            enable_bidirectional: true,
97        }
98    }
99}
100
101/// Server request dispatcher trait for server-initiated requests
102#[async_trait::async_trait]
103pub trait ServerRequestDispatcher: Send + Sync {
104    /// Send an elicitation request to the client
105    async fn send_elicitation(
106        &self,
107        request: ElicitRequest,
108        ctx: RequestContext,
109    ) -> ServerResult<ElicitResult>;
110
111    /// Send a ping request to the client
112    async fn send_ping(
113        &self,
114        request: PingRequest,
115        ctx: RequestContext,
116    ) -> ServerResult<PingResult>;
117
118    /// Send a sampling create message request to the client
119    async fn send_create_message(
120        &self,
121        request: CreateMessageRequest,
122        ctx: RequestContext,
123    ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult>;
124
125    /// Send a roots list request to the client  
126    async fn send_list_roots(
127        &self,
128        request: turbomcp_protocol::types::ListRootsRequest,
129        ctx: RequestContext,
130    ) -> ServerResult<ListRootsResult>;
131
132    /// Check if client supports bidirectional communication
133    fn supports_bidirectional(&self) -> bool;
134
135    /// Get client capabilities
136    async fn get_client_capabilities(&self) -> ServerResult<Option<serde_json::Value>>;
137}
138
139/// Route handler trait for custom routes
140#[async_trait::async_trait]
141pub trait RouteHandler: Send + Sync {
142    /// Handle the request
143    async fn handle(
144        &self,
145        request: JsonRpcRequest,
146        ctx: RequestContext,
147    ) -> ServerResult<JsonRpcResponse>;
148
149    /// Check if this handler can handle the request
150    fn can_handle(&self, method: &str) -> bool;
151
152    /// Get handler metadata
153    fn metadata(&self) -> RouteMetadata {
154        RouteMetadata::default()
155    }
156}
157
158/// Route metadata
159#[derive(Debug, Clone)]
160pub struct RouteMetadata {
161    /// Route name
162    pub name: String,
163    /// Route description
164    pub description: Option<String>,
165    /// Route version
166    pub version: String,
167    /// Supported methods
168    pub methods: Vec<String>,
169    /// Route tags
170    pub tags: Vec<String>,
171}
172
173impl Default for RouteMetadata {
174    fn default() -> Self {
175        Self {
176            name: "unknown".to_string(),
177            description: None,
178            version: "1.0.0".to_string(),
179            methods: Vec::new(),
180            tags: Vec::new(),
181        }
182    }
183}
184
185impl RequestRouter {
186    /// Create a new request router
187    #[must_use]
188    pub fn new(registry: Arc<HandlerRegistry>) -> Self {
189        Self {
190            registry,
191            config: RouterConfig::default(),
192            custom_routes: HashMap::new(),
193            resource_subscriptions: DashMap::new(),
194            server_request_dispatcher: None,
195        }
196    }
197
198    /// Create a router with configuration
199    #[must_use]
200    pub fn with_config(registry: Arc<HandlerRegistry>, config: RouterConfig) -> Self {
201        Self {
202            registry,
203            config,
204            custom_routes: HashMap::new(),
205            resource_subscriptions: DashMap::new(),
206            server_request_dispatcher: None,
207        }
208    }
209
210    /// Set the server request dispatcher for bidirectional communication
211    pub fn set_server_request_dispatcher<D>(&mut self, dispatcher: D)
212    where
213        D: ServerRequestDispatcher + 'static,
214    {
215        self.server_request_dispatcher = Some(Arc::new(dispatcher));
216    }
217
218    /// Get the server request dispatcher  
219    pub fn get_server_request_dispatcher(&self) -> Option<&Arc<dyn ServerRequestDispatcher>> {
220        self.server_request_dispatcher.as_ref()
221    }
222
223    /// Check if bidirectional routing is enabled and supported
224    pub fn supports_bidirectional(&self) -> bool {
225        self.config.enable_bidirectional && self.server_request_dispatcher.is_some()
226    }
227
228    /// Add a custom route handler
229    pub fn add_route<H>(&mut self, handler: H) -> ServerResult<()>
230    where
231        H: RouteHandler + 'static,
232    {
233        let metadata = handler.metadata();
234        let handler_arc: Arc<dyn RouteHandler> = Arc::new(handler);
235
236        for method in &metadata.methods {
237            if self.custom_routes.contains_key(method) {
238                return Err(ServerError::routing_with_method(
239                    format!("Route for method '{method}' already exists"),
240                    method.clone(),
241                ));
242            }
243            self.custom_routes
244                .insert(method.clone(), Arc::clone(&handler_arc));
245        }
246
247        Ok(())
248    }
249
250    /// Route a JSON-RPC request to the appropriate handler
251    pub async fn route(&self, request: JsonRpcRequest, ctx: RequestContext) -> JsonRpcResponse {
252        // Validate request if enabled
253        if self.config.validate_requests
254            && let Err(e) = self.validate_request(&request)
255        {
256            return self.error_response(&request, e);
257        }
258
259        // Handle the request
260        let result = match request.method.as_str() {
261            // Core protocol methods
262            "initialize" => self.handle_initialize(request, ctx).await,
263
264            // Tool methods
265            "tools/list" => self.handle_list_tools(request, ctx).await,
266            "tools/call" => self.handle_call_tool(request, ctx).await,
267
268            // Prompt methods
269            "prompts/list" => self.handle_list_prompts(request, ctx).await,
270            "prompts/get" => self.handle_get_prompt(request, ctx).await,
271
272            // Resource methods
273            "resources/list" => self.handle_list_resources(request, ctx).await,
274            "resources/read" => self.handle_read_resource(request, ctx).await,
275            "resources/subscribe" => self.handle_subscribe_resource(request, ctx).await,
276            "resources/unsubscribe" => self.handle_unsubscribe_resource(request, ctx).await,
277
278            // Logging methods
279            "logging/setLevel" => self.handle_set_log_level(request, ctx).await,
280
281            // Sampling methods
282            "sampling/createMessage" => self.handle_create_message(request, ctx).await,
283
284            // Roots methods
285            "roots/list" => self.handle_list_roots(request, ctx).await,
286
287            // Enhanced MCP features (new protocol methods)
288            "elicit/request" => self.handle_elicitation(request, ctx).await,
289            "complete/request" => self.handle_completion(request, ctx).await,
290            "resources/templates/list" => self.handle_list_resource_templates(request, ctx).await,
291            "ping/request" => self.handle_ping(request, ctx).await,
292
293            // Custom routes
294            method => {
295                if let Some(handler) = self.custom_routes.get(method) {
296                    let request_clone = request.clone();
297                    handler
298                        .handle(request, ctx)
299                        .await
300                        .unwrap_or_else(|e| self.error_response(&request_clone, e))
301                } else {
302                    self.method_not_found_response(&request)
303                }
304            }
305        };
306
307        // Validate response if enabled
308        if self.config.validate_responses
309            && let Err(e) = self.validate_response(&result)
310        {
311            tracing::warn!("Response validation failed: {}", e);
312        }
313
314        result
315    }
316
317    /// Handle batch requests
318    pub async fn route_batch(
319        &self,
320        requests: Vec<JsonRpcRequest>,
321        ctx: RequestContext,
322    ) -> Vec<JsonRpcResponse> {
323        let max_in_flight = self.config.max_concurrent_requests.max(1);
324        stream::iter(requests.into_iter())
325            .map(|req| {
326                let ctx_cloned = ctx.clone();
327                async move { self.route(req, ctx_cloned).await }
328            })
329            .buffer_unordered(max_in_flight)
330            .collect()
331            .await
332    }
333
334    // Protocol method handlers
335
336    async fn handle_initialize(
337        &self,
338        request: JsonRpcRequest,
339        _ctx: RequestContext,
340    ) -> JsonRpcResponse {
341        match self.parse_params::<InitializeRequest>(&request) {
342            Ok(_init_request) => {
343                let result = InitializeResult {
344                    protocol_version: turbomcp_protocol::PROTOCOL_VERSION.to_string(),
345                    server_info: Implementation {
346                        name: crate::SERVER_NAME.to_string(),
347                        title: Some("TurboMCP Server".to_string()),
348                        version: crate::SERVER_VERSION.to_string(),
349                    },
350                    capabilities: self.get_server_capabilities(),
351                    instructions: None,
352                };
353
354                self.success_response(&request, result)
355            }
356            Err(e) => self.error_response(&request, e),
357        }
358    }
359
360    async fn handle_list_tools(
361        &self,
362        request: JsonRpcRequest,
363        _ctx: RequestContext,
364    ) -> JsonRpcResponse {
365        let tools = self.registry.get_tool_definitions();
366        let result = ListToolsResult {
367            tools,
368            next_cursor: None,
369        };
370        self.success_response(&request, result)
371    }
372
373    async fn handle_call_tool(
374        &self,
375        request: JsonRpcRequest,
376        ctx: RequestContext,
377    ) -> JsonRpcResponse {
378        // Inject the router as server capabilities for server-initiated requests
379        let ctx = ctx.with_server_capabilities(
380            Arc::new(self.clone()) as Arc<dyn turbomcp_core::ServerCapabilities>
381        );
382
383        match self.parse_params::<CallToolRequest>(&request) {
384            Ok(call_request) => {
385                let tool_name = &call_request.name;
386
387                if let Some(handler) = self.registry.get_tool(tool_name) {
388                    // RBAC: if handler metadata enforces allowed roles, check RequestContext
389                    if self.config.validate_requests
390                        && let Some(required_roles) = handler.allowed_roles()
391                    {
392                        let has_role = ctx
393                            .metadata
394                            .get("auth")
395                            .and_then(|v| v.get("roles"))
396                            .and_then(|v| v.as_array())
397                            .is_some_and(|arr| {
398                                let user_set: std::collections::HashSet<String> = arr
399                                    .iter()
400                                    .filter_map(|v| {
401                                        v.as_str().map(std::string::ToString::to_string)
402                                    })
403                                    .collect();
404                                required_roles.iter().any(|r| user_set.contains(r))
405                            });
406                        if !has_role {
407                            return self.error_response(
408                                &request,
409                                ServerError::authentication(format!(
410                                    "Access denied for tool '{tool_name}'"
411                                )),
412                            );
413                        }
414                    }
415
416                    // Optional input validation using tool definition schema if present
417                    if self.config.validate_requests
418                        && let Some(arguments) = &call_request.arguments
419                    {
420                        // Best-effort shape check against ToolInput.properties/required
421                        let tool_def = handler.tool_definition();
422                        if let Some(props) = tool_def.input_schema.properties.as_ref() {
423                            // Build a JSON Schema object dynamically from ToolInput
424                            let mut schema = serde_json::json!({
425                                "type": "object",
426                                "properties": {},
427                                "additionalProperties": tool_def.input_schema.additional_properties.unwrap_or(true)
428                            });
429                            if let Some(obj) =
430                                schema.get_mut("properties").and_then(|v| v.as_object_mut())
431                            {
432                                for (k, v) in props {
433                                    obj.insert(k.clone(), v.clone());
434                                }
435                            }
436                            if let Some(required) = tool_def.input_schema.required.as_ref() {
437                                schema.as_object_mut().unwrap().insert(
438                                    "required".to_string(),
439                                    serde_json::Value::Array(
440                                        required
441                                            .iter()
442                                            .map(|s| serde_json::Value::String(s.clone()))
443                                            .collect(),
444                                    ),
445                                );
446                            }
447
448                            // Compile and validate
449                            if let Ok(compiled) = JSONSchema::options()
450                                .with_draft(Draft::Draft7)
451                                .compile(&schema)
452                            {
453                                let instance = serde_json::Value::Object(
454                                    arguments.clone().into_iter().collect(),
455                                );
456                                let mut error_messages: Vec<String> = Vec::new();
457                                if let Err(iter) = compiled.validate(&instance) {
458                                    for e in iter {
459                                        error_messages.push(format!("{}: {}", e.instance_path, e));
460                                    }
461                                }
462                                if !error_messages.is_empty() {
463                                    let joined = error_messages.join("; ");
464                                    let err = ServerError::routing_with_method(
465                                        format!("Argument validation failed: {joined}"),
466                                        "tools/call".to_string(),
467                                    );
468                                    return self.error_response(&request, err);
469                                }
470                            }
471                        }
472                    }
473                    match handler.handle(call_request, ctx).await {
474                        Ok(result) => self.success_response(&request, result),
475                        Err(e) => self.error_response(&request, e),
476                    }
477                } else {
478                    let error = ServerError::not_found(format!("Tool '{tool_name}'"));
479                    self.error_response(&request, error)
480                }
481            }
482            Err(e) => self.error_response(&request, e),
483        }
484    }
485
486    async fn handle_list_prompts(
487        &self,
488        request: JsonRpcRequest,
489        _ctx: RequestContext,
490    ) -> JsonRpcResponse {
491        let prompts = self.registry.get_prompt_definitions();
492        let result = ListPromptsResult {
493            prompts,
494            next_cursor: None,
495        };
496        self.success_response(&request, result)
497    }
498
499    async fn handle_get_prompt(
500        &self,
501        request: JsonRpcRequest,
502        ctx: RequestContext,
503    ) -> JsonRpcResponse {
504        match self.parse_params::<GetPromptRequest>(&request) {
505            Ok(prompt_request) => {
506                let prompt_name = &prompt_request.name;
507
508                if let Some(handler) = self.registry.get_prompt(prompt_name) {
509                    match handler.handle(prompt_request, ctx).await {
510                        Ok(result) => self.success_response(&request, result),
511                        Err(e) => self.error_response(&request, e),
512                    }
513                } else {
514                    let error = ServerError::not_found(format!("Prompt '{prompt_name}'"));
515                    self.error_response(&request, error)
516                }
517            }
518            Err(e) => self.error_response(&request, e),
519        }
520    }
521
522    async fn handle_list_resources(
523        &self,
524        request: JsonRpcRequest,
525        _ctx: RequestContext,
526    ) -> JsonRpcResponse {
527        let resources = self.registry.get_resource_definitions();
528        let result = ListResourcesResult {
529            resources,
530            next_cursor: None,
531        };
532        self.success_response(&request, result)
533    }
534
535    async fn handle_read_resource(
536        &self,
537        request: JsonRpcRequest,
538        ctx: RequestContext,
539    ) -> JsonRpcResponse {
540        match self.parse_params::<ReadResourceRequest>(&request) {
541            Ok(resource_request) => {
542                let resource_uri = &resource_request.uri;
543
544                // Find handler by matching URI pattern
545                for handler in &self.registry.resources {
546                    let resource_def = handler.value().resource_definition();
547                    if self.matches_uri_pattern(&resource_def.uri, resource_uri) {
548                        match handler.value().handle(resource_request, ctx).await {
549                            Ok(result) => return self.success_response(&request, result),
550                            Err(e) => return self.error_response(&request, e),
551                        }
552                    }
553                }
554
555                let error = ServerError::not_found(format!("Resource '{resource_uri}'"));
556                self.error_response(&request, error)
557            }
558            Err(e) => self.error_response(&request, e),
559        }
560    }
561
562    async fn handle_subscribe_resource(
563        &self,
564        request: JsonRpcRequest,
565        _ctx: RequestContext,
566    ) -> JsonRpcResponse {
567        match self.parse_params::<SubscribeRequest>(&request) {
568            Ok(sub) => {
569                let uri = sub.uri;
570                let new_count_ref = self
571                    .resource_subscriptions
572                    .entry(uri.clone())
573                    .and_modify(|c| *c += 1)
574                    .or_insert(1usize);
575                let new_count: usize = *new_count_ref;
576                tracing::debug!(uri = %uri, count = new_count, "resource subscribed");
577                self.success_response(&request, EmptyResult {})
578            }
579            Err(e) => self.error_response(&request, e),
580        }
581    }
582
583    async fn handle_unsubscribe_resource(
584        &self,
585        request: JsonRpcRequest,
586        _ctx: RequestContext,
587    ) -> JsonRpcResponse {
588        match self.parse_params::<UnsubscribeRequest>(&request) {
589            Ok(unsub) => {
590                let uri = unsub.uri;
591                if let Some(mut entry) = self.resource_subscriptions.get_mut(&uri) {
592                    let count = entry.value_mut();
593                    if *count > 0 {
594                        *count -= 1;
595                    }
596                    if *count == 0 {
597                        drop(entry);
598                        self.resource_subscriptions.remove(&uri);
599                    }
600                    tracing::debug!(uri = %uri, "resource unsubscribed");
601                }
602                self.success_response(&request, EmptyResult {})
603            }
604            Err(e) => self.error_response(&request, e),
605        }
606    }
607
608    async fn handle_set_log_level(
609        &self,
610        request: JsonRpcRequest,
611        ctx: RequestContext,
612    ) -> JsonRpcResponse {
613        match self.parse_params::<SetLevelRequest>(&request) {
614            Ok(level_request) => {
615                // Use first available logging handler
616                if let Some(handler_entry) = self.registry.logging.iter().next() {
617                    match handler_entry.value().handle(level_request, ctx).await {
618                        Ok(result) => self.success_response(&request, result),
619                        Err(e) => self.error_response(&request, e),
620                    }
621                } else {
622                    let error = ServerError::not_found("No logging handler available");
623                    self.error_response(&request, error)
624                }
625            }
626            Err(e) => self.error_response(&request, e),
627        }
628    }
629
630    async fn handle_create_message(
631        &self,
632        request: JsonRpcRequest,
633        ctx: RequestContext,
634    ) -> JsonRpcResponse {
635        match self.parse_params::<CreateMessageRequest>(&request) {
636            Ok(message_request) => {
637                // Use first available sampling handler
638                if let Some(handler_entry) = self.registry.sampling.iter().next() {
639                    match handler_entry.value().handle(message_request, ctx).await {
640                        Ok(result) => self.success_response(&request, result),
641                        Err(e) => self.error_response(&request, e),
642                    }
643                } else {
644                    let error = ServerError::not_found("No sampling handler available");
645                    self.error_response(&request, error)
646                }
647            }
648            Err(e) => self.error_response(&request, e),
649        }
650    }
651
652    async fn handle_list_roots(
653        &self,
654        request: JsonRpcRequest,
655        _ctx: RequestContext,
656    ) -> JsonRpcResponse {
657        // Get configured roots from registry
658        let roots = self.registry.get_roots();
659
660        // If no roots configured, provide OS-specific defaults
661        let roots = if roots.is_empty() {
662            let mut default_roots: Vec<Root> = Vec::new();
663            #[cfg(target_os = "linux")]
664            {
665                default_roots.push(Root {
666                    uri: "file:///".to_string(),
667                    name: Some("root".to_string()),
668                });
669            }
670            #[cfg(target_os = "macos")]
671            {
672                default_roots.push(Root {
673                    uri: "file:///".to_string(),
674                    name: Some("root".to_string()),
675                });
676                default_roots.push(Root {
677                    uri: "file:///Volumes".to_string(),
678                    name: Some("Volumes".to_string()),
679                });
680            }
681            #[cfg(target_os = "windows")]
682            {
683                // Common drive letters; clients can probe for availability
684                for drive in ['C', 'D', 'E', 'F', 'G', 'H'] {
685                    default_roots.push(Root {
686                        uri: format!("file:///{}:/", drive),
687                        name: Some(format!("{}:", drive)),
688                    });
689                }
690            }
691            default_roots
692        } else {
693            roots
694        };
695
696        let result = ListRootsResult { roots };
697        self.success_response(&request, result)
698    }
699
700    // ========================================================================
701    // Enhanced MCP Feature Handlers
702    // ========================================================================
703
704    async fn handle_elicitation(
705        &self,
706        request: JsonRpcRequest,
707        _ctx: RequestContext,
708    ) -> JsonRpcResponse {
709        match self.parse_params::<ElicitRequest>(&request) {
710            Ok(_elicit_request) => {
711                // Default elicitation handler - returns decline action
712                // This should be overridden by applications with proper elicitation handlers
713                let result = ElicitResult {
714                    action: turbomcp_protocol::types::ElicitationAction::Decline,
715                    content: None,
716                    _meta: Some(serde_json::json!({
717                        "message": "Elicitation not supported - no handler registered"
718                    })),
719                };
720                self.success_response(&request, result)
721            }
722            Err(e) => self.error_response(&request, e),
723        }
724    }
725
726    async fn handle_completion(
727        &self,
728        request: JsonRpcRequest,
729        _ctx: RequestContext,
730    ) -> JsonRpcResponse {
731        match self.parse_params::<CompleteRequestParams>(&request) {
732            Ok(_complete_request) => {
733                // Default completion handler - returns empty completions
734                // This should be overridden by applications with proper completion handlers
735                let result = CompletionResponse {
736                    values: Vec::new(),
737                    has_more: Some(false),
738                    total: Some(0),
739                };
740                self.success_response(&request, result)
741            }
742            Err(e) => self.error_response(&request, e),
743        }
744    }
745
746    async fn handle_list_resource_templates(
747        &self,
748        request: JsonRpcRequest,
749        _ctx: RequestContext,
750    ) -> JsonRpcResponse {
751        match self.parse_params::<ListResourceTemplatesRequest>(&request) {
752            Ok(_template_request) => {
753                // Default resource template handler - returns empty templates
754                // This should be overridden by applications with proper resource template handlers
755                let result = ListResourceTemplatesResult {
756                    resource_templates: Vec::new(),
757                    next_cursor: None,
758                    _meta: Some(serde_json::json!({
759                        "message": "Resource templates not supported - no handler registered"
760                    })),
761                };
762                self.success_response(&request, result)
763            }
764            Err(e) => self.error_response(&request, e),
765        }
766    }
767
768    async fn handle_ping(&self, request: JsonRpcRequest, _ctx: RequestContext) -> JsonRpcResponse {
769        match self.parse_params::<PingRequest>(&request) {
770            Ok(_ping_request) => {
771                // Default ping handler - basic health check response
772                let result = PingResult {
773                    _meta: Some(serde_json::json!({
774                        "status": "healthy",
775                        "timestamp": chrono::Utc::now().to_rfc3339(),
776                        "server": "turbomcp-server",
777                    })),
778                };
779                self.success_response(&request, result)
780            }
781            Err(e) => self.error_response(&request, e),
782        }
783    }
784
785    // ========================================================================
786    // Server-Initiated Request Methods (Bidirectional Communication)
787    // ========================================================================
788
789    /// Send an elicitation request to the client (server-initiated)
790    pub async fn send_elicitation_to_client(
791        &self,
792        request: ElicitRequest,
793        ctx: RequestContext,
794    ) -> ServerResult<ElicitResult> {
795        if let Some(dispatcher) = &self.server_request_dispatcher {
796            dispatcher.send_elicitation(request, ctx).await
797        } else {
798            Err(ServerError::Handler {
799                message: "Server request dispatcher not configured for bidirectional communication"
800                    .to_string(),
801                context: Some("elicitation".to_string()),
802            })
803        }
804    }
805
806    /// Send a ping request to the client (server-initiated)
807    pub async fn send_ping_to_client(
808        &self,
809        request: PingRequest,
810        ctx: RequestContext,
811    ) -> ServerResult<PingResult> {
812        if let Some(dispatcher) = &self.server_request_dispatcher {
813            dispatcher.send_ping(request, ctx).await
814        } else {
815            Err(ServerError::Handler {
816                message: "Server request dispatcher not configured for bidirectional communication"
817                    .to_string(),
818                context: Some("ping".to_string()),
819            })
820        }
821    }
822
823    /// Send a create message request to the client (server-initiated)
824    pub async fn send_create_message_to_client(
825        &self,
826        request: CreateMessageRequest,
827        ctx: RequestContext,
828    ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult> {
829        if let Some(dispatcher) = &self.server_request_dispatcher {
830            dispatcher.send_create_message(request, ctx).await
831        } else {
832            Err(ServerError::Handler {
833                message: "Server request dispatcher not configured for bidirectional communication"
834                    .to_string(),
835                context: Some("create_message".to_string()),
836            })
837        }
838    }
839
840    /// Send a list roots request to the client (server-initiated)
841    pub async fn send_list_roots_to_client(
842        &self,
843        request: turbomcp_protocol::types::ListRootsRequest,
844        ctx: RequestContext,
845    ) -> ServerResult<ListRootsResult> {
846        if let Some(dispatcher) = &self.server_request_dispatcher {
847            dispatcher.send_list_roots(request, ctx).await
848        } else {
849            Err(ServerError::Handler {
850                message: "Server request dispatcher not configured for bidirectional communication"
851                    .to_string(),
852                context: Some("list_roots".to_string()),
853            })
854        }
855    }
856
857    // Helper methods
858
859    fn get_server_capabilities(&self) -> ServerCapabilities {
860        ServerCapabilities {
861            tools: if self.registry.tools.is_empty() {
862                None
863            } else {
864                Some(ToolsCapabilities::default())
865            },
866            prompts: if self.registry.prompts.is_empty() {
867                None
868            } else {
869                Some(PromptsCapabilities::default())
870            },
871            resources: if self.registry.resources.is_empty() {
872                None
873            } else {
874                Some(ResourcesCapabilities::default())
875            },
876            logging: if self.registry.logging.is_empty() {
877                None
878            } else {
879                Some(LoggingCapabilities)
880            },
881            completions: None, // Completion capabilities not enabled by default
882            experimental: None,
883        }
884    }
885
886    fn parse_params<T>(&self, request: &JsonRpcRequest) -> ServerResult<T>
887    where
888        T: serde::de::DeserializeOwned,
889    {
890        match &request.params {
891            Some(params) => serde_json::from_value(params.clone()).map_err(|e| {
892                ServerError::routing_with_method(
893                    format!("Invalid parameters: {e}"),
894                    request.method.clone(),
895                )
896            }),
897            None => Err(ServerError::routing_with_method(
898                "Missing required parameters".to_string(),
899                request.method.clone(),
900            )),
901        }
902    }
903
904    fn success_response<T>(&self, request: &JsonRpcRequest, result: T) -> JsonRpcResponse
905    where
906        T: serde::Serialize,
907    {
908        JsonRpcResponse {
909            jsonrpc: JsonRpcVersion,
910            id: Some(request.id.clone()),
911            result: Some(serde_json::to_value(result).unwrap()),
912            error: None,
913        }
914    }
915
916    fn error_response(&self, request: &JsonRpcRequest, error: ServerError) -> JsonRpcResponse {
917        JsonRpcResponse {
918            jsonrpc: JsonRpcVersion,
919            id: Some(request.id.clone()),
920            result: None,
921            error: Some(turbomcp_protocol::jsonrpc::JsonRpcError {
922                code: error.error_code(),
923                message: error.to_string(),
924                data: None,
925            }),
926        }
927    }
928
929    fn method_not_found_response(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
930        JsonRpcResponse {
931            jsonrpc: JsonRpcVersion,
932            id: Some(request.id.clone()),
933            result: None,
934            error: Some(turbomcp_protocol::jsonrpc::JsonRpcError {
935                code: -32601,
936                message: format!("Method '{}' not found", request.method),
937                data: None,
938            }),
939        }
940    }
941
942    fn validate_request(&self, _request: &JsonRpcRequest) -> ServerResult<()> {
943        // Lightweight structural validation using protocol validator
944        let validator = turbomcp_protocol::validation::ProtocolValidator::new();
945        match validator.validate_request(_request) {
946            turbomcp_protocol::validation::ValidationResult::Invalid(errors) => {
947                let msg = errors
948                    .into_iter()
949                    .map(|e| {
950                        format!(
951                            "{}: {}{}",
952                            e.code,
953                            e.message,
954                            e.field_path
955                                .map(|p| format!(" (@ {p})"))
956                                .unwrap_or_default()
957                        )
958                    })
959                    .collect::<Vec<_>>()
960                    .join("; ");
961                Err(ServerError::routing_with_method(
962                    format!("Request validation failed: {msg}"),
963                    _request.method.clone(),
964                ))
965            }
966            _ => Ok(()),
967        }
968    }
969
970    fn validate_response(&self, _response: &JsonRpcResponse) -> ServerResult<()> {
971        let validator = turbomcp_protocol::validation::ProtocolValidator::new();
972        match validator.validate_response(_response) {
973            turbomcp_protocol::validation::ValidationResult::Invalid(errors) => {
974                let msg = errors
975                    .into_iter()
976                    .map(|e| {
977                        format!(
978                            "{}: {}{}",
979                            e.code,
980                            e.message,
981                            e.field_path
982                                .map(|p| format!(" (@ {p})"))
983                                .unwrap_or_default()
984                        )
985                    })
986                    .collect::<Vec<_>>()
987                    .join("; ");
988                Err(ServerError::routing(format!(
989                    "Response validation failed: {msg}"
990                )))
991            }
992            _ => Ok(()),
993        }
994    }
995
996    fn matches_uri_pattern(&self, pattern: &str, uri: &str) -> bool {
997        // Convert simple templates to regex (very basic):
998        // - '*' => '.*'
999        // - '{param}' => '[^/]+'
1000        let mut regex_str = String::from("^");
1001        let mut chars = pattern.chars().peekable();
1002        while let Some(c) = chars.next() {
1003            match c {
1004                '*' => regex_str.push_str(".*"),
1005                '{' => {
1006                    // consume until '}'
1007                    for nc in chars.by_ref() {
1008                        if nc == '}' {
1009                            break;
1010                        }
1011                    }
1012                    regex_str.push_str("[^/]+");
1013                }
1014                '.' | '+' | '?' | '(' | ')' | '|' | '^' | '$' | '[' | ']' | '\\' => {
1015                    regex_str.push('\\');
1016                    regex_str.push(c);
1017                }
1018                other => regex_str.push(other),
1019            }
1020        }
1021        regex_str.push('$');
1022        let re = regex::Regex::new(&regex_str).unwrap_or_else(|_| regex::Regex::new("^$").unwrap());
1023        re.is_match(uri)
1024    }
1025}
1026
1027impl Clone for RequestRouter {
1028    fn clone(&self) -> Self {
1029        Self {
1030            registry: Arc::clone(&self.registry),
1031            config: self.config.clone(),
1032            custom_routes: self.custom_routes.clone(),
1033            resource_subscriptions: DashMap::new(),
1034            server_request_dispatcher: self.server_request_dispatcher.clone(),
1035        }
1036    }
1037}
1038
1039// Implementation of ServerCapabilities for RequestRouter
1040// This enables tools to make server-initiated requests through the context
1041impl turbomcp_core::ServerCapabilities for RequestRouter {
1042    fn create_message(
1043        &self,
1044        request: serde_json::Value,
1045    ) -> futures::future::BoxFuture<
1046        '_,
1047        Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1048    > {
1049        Box::pin(async move {
1050            let request: CreateMessageRequest = serde_json::from_value(request)
1051                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1052            let ctx = turbomcp_core::RequestContext::new();
1053            let result = self
1054                .send_create_message_to_client(request, ctx)
1055                .await
1056                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1057            serde_json::to_value(result)
1058                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1059        })
1060    }
1061
1062    fn elicit(
1063        &self,
1064        request: serde_json::Value,
1065    ) -> futures::future::BoxFuture<
1066        '_,
1067        Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1068    > {
1069        Box::pin(async move {
1070            let request: ElicitRequest = serde_json::from_value(request)
1071                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1072            let ctx = turbomcp_core::RequestContext::new();
1073            let result = self
1074                .send_elicitation_to_client(request, ctx)
1075                .await
1076                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1077            serde_json::to_value(result)
1078                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1079        })
1080    }
1081
1082    fn list_roots(
1083        &self,
1084    ) -> futures::future::BoxFuture<
1085        '_,
1086        Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1087    > {
1088        Box::pin(async move {
1089            let ctx = turbomcp_core::RequestContext::new();
1090            let result = self
1091                .send_list_roots_to_client(turbomcp_protocol::types::ListRootsRequest {}, ctx)
1092                .await
1093                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1094            serde_json::to_value(result)
1095                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1096        })
1097    }
1098}
1099
1100/// Route definition for custom routing
1101#[derive(Clone)]
1102pub struct Route {
1103    /// Route method pattern
1104    pub method: String,
1105    /// Route handler
1106    pub handler: Arc<dyn RouteHandler>,
1107    /// Route metadata
1108    pub metadata: RouteMetadata,
1109}
1110
1111impl std::fmt::Debug for Route {
1112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1113        f.debug_struct("Route")
1114            .field("method", &self.method)
1115            .field("metadata", &self.metadata)
1116            .finish()
1117    }
1118}
1119
1120/// Router alias for convenience
1121pub type Router = RequestRouter;