turbomcp_server/
routing.rs

1//! Request routing and handler dispatch system
2
3use dashmap::DashMap;
4use std::collections::HashMap;
5use std::sync::Arc;
6use turbomcp_core::RequestContext;
7use turbomcp_protocol::{
8    jsonrpc::{JsonRpcRequest, JsonRpcResponse},
9    types::{
10        CallToolRequest,
11        CompleteRequestParams,
12        CompletionResponse,
13        CreateMessageRequest,
14        // New MCP feature types
15        ElicitRequest,
16        ElicitResult,
17        EmptyResult,
18        GetPromptRequest,
19        Implementation,
20        InitializeRequest,
21        InitializeResult,
22        ListPromptsResult,
23        ListResourceTemplatesRequest,
24        ListResourceTemplatesResult,
25        ListResourcesResult,
26        ListRootsResult,
27        ListToolsResult,
28        LoggingCapabilities,
29        PingRequest,
30        PingResult,
31        PromptsCapabilities,
32        ReadResourceRequest,
33        ResourcesCapabilities,
34        Root,
35        ServerCapabilities,
36        SetLevelRequest,
37        SubscribeRequest,
38        ToolsCapabilities,
39        UnsubscribeRequest,
40    },
41};
42
43use crate::registry::HandlerRegistry;
44use crate::{ServerError, ServerResult};
45use futures::stream::{self, StreamExt};
46use jsonschema::{Draft, JSONSchema};
47
48/// Request router for dispatching MCP requests to appropriate handlers
49pub struct RequestRouter {
50    /// Handler registry
51    registry: Arc<HandlerRegistry>,
52    /// Route configuration
53    config: RouterConfig,
54    /// Custom route handlers
55    custom_routes: HashMap<String, Arc<dyn RouteHandler>>,
56    /// Resource subscription counters by URI
57    resource_subscriptions: DashMap<String, usize>,
58    /// Server-initiated request dispatcher (for bidirectional communication)
59    server_request_dispatcher: Option<Arc<dyn ServerRequestDispatcher>>,
60}
61
62impl std::fmt::Debug for RequestRouter {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.debug_struct("RequestRouter")
65            .field("config", &self.config)
66            .field("custom_routes_count", &self.custom_routes.len())
67            .finish()
68    }
69}
70
/// Tunable settings for a request router.
#[derive(Debug, Clone)]
pub struct RouterConfig {
    /// Whether incoming requests are validated before dispatch.
    pub validate_requests: bool,
    /// Whether outgoing responses are validated after dispatch.
    pub validate_responses: bool,
    /// Default request timeout, expressed in milliseconds.
    pub default_timeout_ms: u64,
    /// Whether request tracing is enabled.
    pub enable_tracing: bool,
    /// Upper bound on the number of requests processed concurrently.
    pub max_concurrent_requests: usize,
    /// Whether server-initiated (bidirectional) requests are enabled.
    pub enable_bidirectional: bool,
}

impl Default for RouterConfig {
    /// Returns the stock configuration: every feature switch on, a 30 second
    /// timeout and at most 1000 concurrent requests.
    fn default() -> Self {
        let default_timeout_ms: u64 = 30_000;
        let max_concurrent_requests: usize = 1000;
        RouterConfig {
            validate_requests: true,
            validate_responses: true,
            default_timeout_ms,
            enable_tracing: true,
            max_concurrent_requests,
            enable_bidirectional: true,
        }
    }
}
100
101/// Server request dispatcher trait for server-initiated requests
102#[async_trait::async_trait]
103pub trait ServerRequestDispatcher: Send + Sync {
104    /// Send an elicitation request to the client
105    async fn send_elicitation(
106        &self,
107        request: ElicitRequest,
108        ctx: RequestContext,
109    ) -> ServerResult<ElicitResult>;
110
111    /// Send a ping request to the client
112    async fn send_ping(
113        &self,
114        request: PingRequest,
115        ctx: RequestContext,
116    ) -> ServerResult<PingResult>;
117
118    /// Send a sampling create message request to the client
119    async fn send_create_message(
120        &self,
121        request: CreateMessageRequest,
122        ctx: RequestContext,
123    ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult>;
124
125    /// Send a roots list request to the client  
126    async fn send_list_roots(
127        &self,
128        request: turbomcp_protocol::types::ListRootsRequest,
129        ctx: RequestContext,
130    ) -> ServerResult<ListRootsResult>;
131
132    /// Check if client supports bidirectional communication
133    fn supports_bidirectional(&self) -> bool;
134
135    /// Get client capabilities
136    async fn get_client_capabilities(&self) -> ServerResult<Option<serde_json::Value>>;
137}
138
139/// Route handler trait for custom routes
140#[async_trait::async_trait]
141pub trait RouteHandler: Send + Sync {
142    /// Handle the request
143    async fn handle(
144        &self,
145        request: JsonRpcRequest,
146        ctx: RequestContext,
147    ) -> ServerResult<JsonRpcResponse>;
148
149    /// Check if this handler can handle the request
150    fn can_handle(&self, method: &str) -> bool;
151
152    /// Get handler metadata
153    fn metadata(&self) -> RouteMetadata {
154        RouteMetadata::default()
155    }
156}
157
/// Descriptive information about a custom route.
#[derive(Debug, Clone)]
pub struct RouteMetadata {
    /// Name of the route.
    pub name: String,
    /// Optional free-form description of the route.
    pub description: Option<String>,
    /// Version string of the route.
    pub version: String,
    /// Method names the route supports.
    pub methods: Vec<String>,
    /// Tags attached to the route.
    pub tags: Vec<String>,
}

impl Default for RouteMetadata {
    /// Returns placeholder metadata: name `"unknown"`, version `"1.0.0"`, no
    /// description, no methods and no tags.
    fn default() -> Self {
        let name = String::from("unknown");
        let version = String::from("1.0.0");
        RouteMetadata {
            name,
            description: None,
            version,
            methods: vec![],
            tags: vec![],
        }
    }
}
184
185impl RequestRouter {
186    /// Create a new request router
187    #[must_use]
188    pub fn new(registry: Arc<HandlerRegistry>) -> Self {
189        Self {
190            registry,
191            config: RouterConfig::default(),
192            custom_routes: HashMap::new(),
193            resource_subscriptions: DashMap::new(),
194            server_request_dispatcher: None,
195        }
196    }
197
198    /// Create a router with configuration
199    #[must_use]
200    pub fn with_config(registry: Arc<HandlerRegistry>, config: RouterConfig) -> Self {
201        Self {
202            registry,
203            config,
204            custom_routes: HashMap::new(),
205            resource_subscriptions: DashMap::new(),
206            server_request_dispatcher: None,
207        }
208    }
209
210    /// Set the server request dispatcher for bidirectional communication
211    pub fn set_server_request_dispatcher<D>(&mut self, dispatcher: D)
212    where
213        D: ServerRequestDispatcher + 'static,
214    {
215        self.server_request_dispatcher = Some(Arc::new(dispatcher));
216    }
217
218    /// Get the server request dispatcher  
219    pub fn get_server_request_dispatcher(&self) -> Option<&Arc<dyn ServerRequestDispatcher>> {
220        self.server_request_dispatcher.as_ref()
221    }
222
223    /// Check if bidirectional routing is enabled and supported
224    pub fn supports_bidirectional(&self) -> bool {
225        self.config.enable_bidirectional && self.server_request_dispatcher.is_some()
226    }
227
228    /// Add a custom route handler
229    pub fn add_route<H>(&mut self, handler: H) -> ServerResult<()>
230    where
231        H: RouteHandler + 'static,
232    {
233        let metadata = handler.metadata();
234        let handler_arc: Arc<dyn RouteHandler> = Arc::new(handler);
235
236        for method in &metadata.methods {
237            if self.custom_routes.contains_key(method) {
238                return Err(ServerError::routing_with_method(
239                    format!("Route for method '{method}' already exists"),
240                    method.clone(),
241                ));
242            }
243            self.custom_routes
244                .insert(method.clone(), Arc::clone(&handler_arc));
245        }
246
247        Ok(())
248    }
249
250    /// Route a JSON-RPC request to the appropriate handler
251    pub async fn route(&self, request: JsonRpcRequest, ctx: RequestContext) -> JsonRpcResponse {
252        // Validate request if enabled
253        if self.config.validate_requests
254            && let Err(e) = self.validate_request(&request)
255        {
256            return self.error_response(&request, e);
257        }
258
259        // Handle the request
260        let result = match request.method.as_str() {
261            // Core protocol methods
262            "initialize" => self.handle_initialize(request, ctx).await,
263
264            // Tool methods
265            "tools/list" => self.handle_list_tools(request, ctx).await,
266            "tools/call" => self.handle_call_tool(request, ctx).await,
267
268            // Prompt methods
269            "prompts/list" => self.handle_list_prompts(request, ctx).await,
270            "prompts/get" => self.handle_get_prompt(request, ctx).await,
271
272            // Resource methods
273            "resources/list" => self.handle_list_resources(request, ctx).await,
274            "resources/read" => self.handle_read_resource(request, ctx).await,
275            "resources/subscribe" => self.handle_subscribe_resource(request, ctx).await,
276            "resources/unsubscribe" => self.handle_unsubscribe_resource(request, ctx).await,
277
278            // Logging methods
279            "logging/setLevel" => self.handle_set_log_level(request, ctx).await,
280
281            // Sampling methods
282            "sampling/createMessage" => self.handle_create_message(request, ctx).await,
283
284            // Roots methods
285            "roots/list" => self.handle_list_roots(request, ctx).await,
286
287            // Enhanced MCP features (new protocol methods)
288            "elicit/request" => self.handle_elicitation(request, ctx).await,
289            "complete/request" => self.handle_completion(request, ctx).await,
290            "resources/templates/list" => self.handle_list_resource_templates(request, ctx).await,
291            "ping/request" => self.handle_ping(request, ctx).await,
292
293            // Custom routes
294            method => {
295                if let Some(handler) = self.custom_routes.get(method) {
296                    let request_clone = request.clone();
297                    handler
298                        .handle(request, ctx)
299                        .await
300                        .unwrap_or_else(|e| self.error_response(&request_clone, e))
301                } else {
302                    self.method_not_found_response(&request)
303                }
304            }
305        };
306
307        // Validate response if enabled
308        if self.config.validate_responses
309            && let Err(e) = self.validate_response(&result)
310        {
311            tracing::warn!("Response validation failed: {}", e);
312        }
313
314        result
315    }
316
317    /// Handle batch requests
318    pub async fn route_batch(
319        &self,
320        requests: Vec<JsonRpcRequest>,
321        ctx: RequestContext,
322    ) -> Vec<JsonRpcResponse> {
323        let max_in_flight = self.config.max_concurrent_requests.max(1);
324        stream::iter(requests.into_iter())
325            .map(|req| {
326                let ctx_cloned = ctx.clone();
327                async move { self.route(req, ctx_cloned).await }
328            })
329            .buffer_unordered(max_in_flight)
330            .collect()
331            .await
332    }
333
334    // Protocol method handlers
335
336    async fn handle_initialize(
337        &self,
338        request: JsonRpcRequest,
339        _ctx: RequestContext,
340    ) -> JsonRpcResponse {
341        match self.parse_params::<InitializeRequest>(&request) {
342            Ok(_init_request) => {
343                let result = InitializeResult {
344                    protocol_version: turbomcp_protocol::PROTOCOL_VERSION.to_string(),
345                    server_info: Implementation {
346                        name: crate::SERVER_NAME.to_string(),
347                        title: Some("TurboMCP Server".to_string()),
348                        version: crate::SERVER_VERSION.to_string(),
349                    },
350                    capabilities: self.get_server_capabilities(),
351                    instructions: None,
352                    _meta: None,
353                };
354
355                self.success_response(&request, result)
356            }
357            Err(e) => self.error_response(&request, e),
358        }
359    }
360
361    async fn handle_list_tools(
362        &self,
363        request: JsonRpcRequest,
364        _ctx: RequestContext,
365    ) -> JsonRpcResponse {
366        let tools = self.registry.get_tool_definitions();
367        let result = ListToolsResult {
368            tools,
369            next_cursor: None,
370            _meta: None,
371        };
372        self.success_response(&request, result)
373    }
374
375    async fn handle_call_tool(
376        &self,
377        request: JsonRpcRequest,
378        ctx: RequestContext,
379    ) -> JsonRpcResponse {
380        // Inject the router as server capabilities for server-initiated requests
381        let ctx = ctx.with_server_capabilities(
382            Arc::new(self.clone()) as Arc<dyn turbomcp_core::ServerCapabilities>
383        );
384
385        match self.parse_params::<CallToolRequest>(&request) {
386            Ok(call_request) => {
387                let tool_name = &call_request.name;
388
389                if let Some(handler) = self.registry.get_tool(tool_name) {
390                    // RBAC: if handler metadata enforces allowed roles, check RequestContext
391                    if self.config.validate_requests
392                        && let Some(required_roles) = handler.allowed_roles()
393                    {
394                        let has_role = ctx
395                            .metadata
396                            .get("auth")
397                            .and_then(|v| v.get("roles"))
398                            .and_then(|v| v.as_array())
399                            .is_some_and(|arr| {
400                                let user_set: std::collections::HashSet<String> = arr
401                                    .iter()
402                                    .filter_map(|v| {
403                                        v.as_str().map(std::string::ToString::to_string)
404                                    })
405                                    .collect();
406                                required_roles.iter().any(|r| user_set.contains(r))
407                            });
408                        if !has_role {
409                            return self.error_response(
410                                &request,
411                                ServerError::authentication(format!(
412                                    "Access denied for tool '{tool_name}'"
413                                )),
414                            );
415                        }
416                    }
417
418                    // Optional input validation using tool definition schema if present
419                    if self.config.validate_requests
420                        && let Some(arguments) = &call_request.arguments
421                    {
422                        // Best-effort shape check against ToolInput.properties/required
423                        let tool_def = handler.tool_definition();
424                        if let Some(props) = tool_def.input_schema.properties.as_ref() {
425                            // Build a JSON Schema object dynamically from ToolInput
426                            let mut schema = serde_json::json!({
427                                "type": "object",
428                                "properties": {},
429                                "additionalProperties": tool_def.input_schema.additional_properties.unwrap_or(true)
430                            });
431                            if let Some(obj) =
432                                schema.get_mut("properties").and_then(|v| v.as_object_mut())
433                            {
434                                for (k, v) in props {
435                                    obj.insert(k.clone(), v.clone());
436                                }
437                            }
438                            if let Some(required) = tool_def.input_schema.required.as_ref() {
439                                schema.as_object_mut().unwrap().insert(
440                                    "required".to_string(),
441                                    serde_json::Value::Array(
442                                        required
443                                            .iter()
444                                            .map(|s| serde_json::Value::String(s.clone()))
445                                            .collect(),
446                                    ),
447                                );
448                            }
449
450                            // Compile and validate
451                            if let Ok(compiled) = JSONSchema::options()
452                                .with_draft(Draft::Draft7)
453                                .compile(&schema)
454                            {
455                                let instance = serde_json::Value::Object(
456                                    arguments.clone().into_iter().collect(),
457                                );
458                                let mut error_messages: Vec<String> = Vec::new();
459                                if let Err(iter) = compiled.validate(&instance) {
460                                    for e in iter {
461                                        error_messages.push(format!("{}: {}", e.instance_path, e));
462                                    }
463                                }
464                                if !error_messages.is_empty() {
465                                    let joined = error_messages.join("; ");
466                                    let err = ServerError::routing_with_method(
467                                        format!("Argument validation failed: {joined}"),
468                                        "tools/call".to_string(),
469                                    );
470                                    return self.error_response(&request, err);
471                                }
472                            }
473                        }
474                    }
475                    match handler.handle(call_request, ctx).await {
476                        Ok(result) => self.success_response(&request, result),
477                        Err(e) => self.error_response(&request, e),
478                    }
479                } else {
480                    let error = ServerError::not_found(format!("Tool '{tool_name}'"));
481                    self.error_response(&request, error)
482                }
483            }
484            Err(e) => self.error_response(&request, e),
485        }
486    }
487
488    async fn handle_list_prompts(
489        &self,
490        request: JsonRpcRequest,
491        _ctx: RequestContext,
492    ) -> JsonRpcResponse {
493        let prompts = self.registry.get_prompt_definitions();
494        let result = ListPromptsResult {
495            prompts,
496            next_cursor: None,
497            _meta: None,
498        };
499        self.success_response(&request, result)
500    }
501
502    async fn handle_get_prompt(
503        &self,
504        request: JsonRpcRequest,
505        ctx: RequestContext,
506    ) -> JsonRpcResponse {
507        match self.parse_params::<GetPromptRequest>(&request) {
508            Ok(prompt_request) => {
509                let prompt_name = &prompt_request.name;
510
511                if let Some(handler) = self.registry.get_prompt(prompt_name) {
512                    match handler.handle(prompt_request, ctx).await {
513                        Ok(result) => self.success_response(&request, result),
514                        Err(e) => self.error_response(&request, e),
515                    }
516                } else {
517                    let error = ServerError::not_found(format!("Prompt '{prompt_name}'"));
518                    self.error_response(&request, error)
519                }
520            }
521            Err(e) => self.error_response(&request, e),
522        }
523    }
524
525    async fn handle_list_resources(
526        &self,
527        request: JsonRpcRequest,
528        _ctx: RequestContext,
529    ) -> JsonRpcResponse {
530        let resources = self.registry.get_resource_definitions();
531        let result = ListResourcesResult {
532            resources,
533            next_cursor: None,
534            _meta: None,
535        };
536        self.success_response(&request, result)
537    }
538
539    async fn handle_read_resource(
540        &self,
541        request: JsonRpcRequest,
542        ctx: RequestContext,
543    ) -> JsonRpcResponse {
544        match self.parse_params::<ReadResourceRequest>(&request) {
545            Ok(resource_request) => {
546                let resource_uri = &resource_request.uri;
547
548                // Find handler by matching URI pattern
549                for handler in &self.registry.resources {
550                    let resource_def = handler.value().resource_definition();
551                    if self.matches_uri_pattern(&resource_def.uri, resource_uri) {
552                        match handler.value().handle(resource_request, ctx).await {
553                            Ok(result) => return self.success_response(&request, result),
554                            Err(e) => return self.error_response(&request, e),
555                        }
556                    }
557                }
558
559                let error = ServerError::not_found(format!("Resource '{resource_uri}'"));
560                self.error_response(&request, error)
561            }
562            Err(e) => self.error_response(&request, e),
563        }
564    }
565
566    async fn handle_subscribe_resource(
567        &self,
568        request: JsonRpcRequest,
569        _ctx: RequestContext,
570    ) -> JsonRpcResponse {
571        match self.parse_params::<SubscribeRequest>(&request) {
572            Ok(sub) => {
573                let uri = sub.uri;
574                let new_count_ref = self
575                    .resource_subscriptions
576                    .entry(uri.clone())
577                    .and_modify(|c| *c += 1)
578                    .or_insert(1usize);
579                let new_count: usize = *new_count_ref;
580                tracing::debug!(uri = %uri, count = new_count, "resource subscribed");
581                self.success_response(&request, EmptyResult {})
582            }
583            Err(e) => self.error_response(&request, e),
584        }
585    }
586
587    async fn handle_unsubscribe_resource(
588        &self,
589        request: JsonRpcRequest,
590        _ctx: RequestContext,
591    ) -> JsonRpcResponse {
592        match self.parse_params::<UnsubscribeRequest>(&request) {
593            Ok(unsub) => {
594                let uri = unsub.uri;
595                if let Some(mut entry) = self.resource_subscriptions.get_mut(&uri) {
596                    let count = entry.value_mut();
597                    if *count > 0 {
598                        *count -= 1;
599                    }
600                    if *count == 0 {
601                        drop(entry);
602                        self.resource_subscriptions.remove(&uri);
603                    }
604                    tracing::debug!(uri = %uri, "resource unsubscribed");
605                }
606                self.success_response(&request, EmptyResult {})
607            }
608            Err(e) => self.error_response(&request, e),
609        }
610    }
611
612    async fn handle_set_log_level(
613        &self,
614        request: JsonRpcRequest,
615        ctx: RequestContext,
616    ) -> JsonRpcResponse {
617        match self.parse_params::<SetLevelRequest>(&request) {
618            Ok(level_request) => {
619                // Use first available logging handler
620                if let Some(handler_entry) = self.registry.logging.iter().next() {
621                    match handler_entry.value().handle(level_request, ctx).await {
622                        Ok(result) => self.success_response(&request, result),
623                        Err(e) => self.error_response(&request, e),
624                    }
625                } else {
626                    let error = ServerError::not_found("No logging handler available");
627                    self.error_response(&request, error)
628                }
629            }
630            Err(e) => self.error_response(&request, e),
631        }
632    }
633
634    async fn handle_create_message(
635        &self,
636        request: JsonRpcRequest,
637        ctx: RequestContext,
638    ) -> JsonRpcResponse {
639        match self.parse_params::<CreateMessageRequest>(&request) {
640            Ok(message_request) => {
641                // Use first available sampling handler
642                if let Some(handler_entry) = self.registry.sampling.iter().next() {
643                    match handler_entry.value().handle(message_request, ctx).await {
644                        Ok(result) => self.success_response(&request, result),
645                        Err(e) => self.error_response(&request, e),
646                    }
647                } else {
648                    let error = ServerError::not_found("No sampling handler available");
649                    self.error_response(&request, error)
650                }
651            }
652            Err(e) => self.error_response(&request, e),
653        }
654    }
655
656    async fn handle_list_roots(
657        &self,
658        request: JsonRpcRequest,
659        _ctx: RequestContext,
660    ) -> JsonRpcResponse {
661        // Get configured roots from registry
662        let roots = self.registry.get_roots();
663
664        // If no roots configured, provide OS-specific defaults
665        let roots = if roots.is_empty() {
666            let mut default_roots: Vec<Root> = Vec::new();
667            #[cfg(target_os = "linux")]
668            {
669                default_roots.push(Root {
670                    uri: "file:///".to_string(),
671                    name: Some("root".to_string()),
672                });
673            }
674            #[cfg(target_os = "macos")]
675            {
676                default_roots.push(Root {
677                    uri: "file:///".to_string(),
678                    name: Some("root".to_string()),
679                });
680                default_roots.push(Root {
681                    uri: "file:///Volumes".to_string(),
682                    name: Some("Volumes".to_string()),
683                });
684            }
685            #[cfg(target_os = "windows")]
686            {
687                // Common drive letters; clients can probe for availability
688                for drive in ['C', 'D', 'E', 'F', 'G', 'H'] {
689                    default_roots.push(Root {
690                        uri: format!("file:///{}:/", drive),
691                        name: Some(format!("{}:", drive)),
692                    });
693                }
694            }
695            default_roots
696        } else {
697            roots
698        };
699
700        let result = ListRootsResult { roots, _meta: None };
701        self.success_response(&request, result)
702    }
703
704    // ========================================================================
705    // Enhanced MCP Feature Handlers
706    // ========================================================================
707
708    async fn handle_elicitation(
709        &self,
710        request: JsonRpcRequest,
711        _ctx: RequestContext,
712    ) -> JsonRpcResponse {
713        match self.parse_params::<ElicitRequest>(&request) {
714            Ok(_elicit_request) => {
715                // Default elicitation handler - returns decline action
716                // This should be overridden by applications with proper elicitation handlers
717                let result = ElicitResult {
718                    action: turbomcp_protocol::types::ElicitationAction::Decline,
719                    content: None,
720                    _meta: Some(serde_json::json!({
721                        "message": "Elicitation not supported - no handler registered"
722                    })),
723                };
724                self.success_response(&request, result)
725            }
726            Err(e) => self.error_response(&request, e),
727        }
728    }
729
730    async fn handle_completion(
731        &self,
732        request: JsonRpcRequest,
733        _ctx: RequestContext,
734    ) -> JsonRpcResponse {
735        match self.parse_params::<CompleteRequestParams>(&request) {
736            Ok(_complete_request) => {
737                // Default completion handler - returns empty completions
738                // This should be overridden by applications with proper completion handlers
739                let result = CompletionResponse {
740                    values: Vec::new(),
741                    has_more: Some(false),
742                    total: Some(0),
743                };
744                self.success_response(&request, result)
745            }
746            Err(e) => self.error_response(&request, e),
747        }
748    }
749
750    async fn handle_list_resource_templates(
751        &self,
752        request: JsonRpcRequest,
753        _ctx: RequestContext,
754    ) -> JsonRpcResponse {
755        match self.parse_params::<ListResourceTemplatesRequest>(&request) {
756            Ok(_template_request) => {
757                // Default resource template handler - returns empty templates
758                // This should be overridden by applications with proper resource template handlers
759                let result = ListResourceTemplatesResult {
760                    resource_templates: Vec::new(),
761                    next_cursor: None,
762                    _meta: Some(serde_json::json!({
763                        "message": "Resource templates not supported - no handler registered"
764                    })),
765                };
766                self.success_response(&request, result)
767            }
768            Err(e) => self.error_response(&request, e),
769        }
770    }
771
772    async fn handle_ping(&self, request: JsonRpcRequest, _ctx: RequestContext) -> JsonRpcResponse {
773        match self.parse_params::<PingRequest>(&request) {
774            Ok(_ping_request) => {
775                // Default ping handler - basic health check response
776                let result = PingResult {
777                    _meta: Some(serde_json::json!({
778                        "status": "healthy",
779                        "timestamp": chrono::Utc::now().to_rfc3339(),
780                        "server": "turbomcp-server",
781                    })),
782                };
783                self.success_response(&request, result)
784            }
785            Err(e) => self.error_response(&request, e),
786        }
787    }
788
789    // ========================================================================
790    // Server-Initiated Request Methods (Bidirectional Communication)
791    // ========================================================================
792
793    /// Send an elicitation request to the client (server-initiated)
794    pub async fn send_elicitation_to_client(
795        &self,
796        request: ElicitRequest,
797        ctx: RequestContext,
798    ) -> ServerResult<ElicitResult> {
799        if let Some(dispatcher) = &self.server_request_dispatcher {
800            dispatcher.send_elicitation(request, ctx).await
801        } else {
802            Err(ServerError::Handler {
803                message: "Server request dispatcher not configured for bidirectional communication"
804                    .to_string(),
805                context: Some("elicitation".to_string()),
806            })
807        }
808    }
809
810    /// Send a ping request to the client (server-initiated)
811    pub async fn send_ping_to_client(
812        &self,
813        request: PingRequest,
814        ctx: RequestContext,
815    ) -> ServerResult<PingResult> {
816        if let Some(dispatcher) = &self.server_request_dispatcher {
817            dispatcher.send_ping(request, ctx).await
818        } else {
819            Err(ServerError::Handler {
820                message: "Server request dispatcher not configured for bidirectional communication"
821                    .to_string(),
822                context: Some("ping".to_string()),
823            })
824        }
825    }
826
827    /// Send a create message request to the client (server-initiated)
828    pub async fn send_create_message_to_client(
829        &self,
830        request: CreateMessageRequest,
831        ctx: RequestContext,
832    ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult> {
833        if let Some(dispatcher) = &self.server_request_dispatcher {
834            dispatcher.send_create_message(request, ctx).await
835        } else {
836            Err(ServerError::Handler {
837                message: "Server request dispatcher not configured for bidirectional communication"
838                    .to_string(),
839                context: Some("create_message".to_string()),
840            })
841        }
842    }
843
844    /// Send a list roots request to the client (server-initiated)
845    pub async fn send_list_roots_to_client(
846        &self,
847        request: turbomcp_protocol::types::ListRootsRequest,
848        ctx: RequestContext,
849    ) -> ServerResult<ListRootsResult> {
850        if let Some(dispatcher) = &self.server_request_dispatcher {
851            dispatcher.send_list_roots(request, ctx).await
852        } else {
853            Err(ServerError::Handler {
854                message: "Server request dispatcher not configured for bidirectional communication"
855                    .to_string(),
856                context: Some("list_roots".to_string()),
857            })
858        }
859    }
860
861    // Helper methods
862
863    fn get_server_capabilities(&self) -> ServerCapabilities {
864        ServerCapabilities {
865            tools: if self.registry.tools.is_empty() {
866                None
867            } else {
868                Some(ToolsCapabilities::default())
869            },
870            prompts: if self.registry.prompts.is_empty() {
871                None
872            } else {
873                Some(PromptsCapabilities::default())
874            },
875            resources: if self.registry.resources.is_empty() {
876                None
877            } else {
878                Some(ResourcesCapabilities::default())
879            },
880            logging: if self.registry.logging.is_empty() {
881                None
882            } else {
883                Some(LoggingCapabilities)
884            },
885            completions: None, // Completion capabilities not enabled by default
886            experimental: None,
887        }
888    }
889
890    fn parse_params<T>(&self, request: &JsonRpcRequest) -> ServerResult<T>
891    where
892        T: serde::de::DeserializeOwned,
893    {
894        match &request.params {
895            Some(params) => serde_json::from_value(params.clone()).map_err(|e| {
896                ServerError::routing_with_method(
897                    format!("Invalid parameters: {e}"),
898                    request.method.clone(),
899                )
900            }),
901            None => Err(ServerError::routing_with_method(
902                "Missing required parameters".to_string(),
903                request.method.clone(),
904            )),
905        }
906    }
907
908    fn success_response<T>(&self, request: &JsonRpcRequest, result: T) -> JsonRpcResponse
909    where
910        T: serde::Serialize,
911    {
912        JsonRpcResponse::success(serde_json::to_value(result).unwrap(), request.id.clone())
913    }
914
915    fn error_response(&self, request: &JsonRpcRequest, error: ServerError) -> JsonRpcResponse {
916        JsonRpcResponse::error_response(
917            turbomcp_protocol::jsonrpc::JsonRpcError {
918                code: error.error_code(),
919                message: error.to_string(),
920                data: None,
921            },
922            request.id.clone(),
923        )
924    }
925
926    fn method_not_found_response(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
927        JsonRpcResponse::error_response(
928            turbomcp_protocol::jsonrpc::JsonRpcError {
929                code: -32601,
930                message: format!("Method '{}' not found", request.method),
931                data: None,
932            },
933            request.id.clone(),
934        )
935    }
936
937    fn validate_request(&self, _request: &JsonRpcRequest) -> ServerResult<()> {
938        // Lightweight structural validation using protocol validator
939        let validator = turbomcp_protocol::validation::ProtocolValidator::new();
940        match validator.validate_request(_request) {
941            turbomcp_protocol::validation::ValidationResult::Invalid(errors) => {
942                let msg = errors
943                    .into_iter()
944                    .map(|e| {
945                        format!(
946                            "{}: {}{}",
947                            e.code,
948                            e.message,
949                            e.field_path
950                                .map(|p| format!(" (@ {p})"))
951                                .unwrap_or_default()
952                        )
953                    })
954                    .collect::<Vec<_>>()
955                    .join("; ");
956                Err(ServerError::routing_with_method(
957                    format!("Request validation failed: {msg}"),
958                    _request.method.clone(),
959                ))
960            }
961            _ => Ok(()),
962        }
963    }
964
965    fn validate_response(&self, _response: &JsonRpcResponse) -> ServerResult<()> {
966        let validator = turbomcp_protocol::validation::ProtocolValidator::new();
967        match validator.validate_response(_response) {
968            turbomcp_protocol::validation::ValidationResult::Invalid(errors) => {
969                let msg = errors
970                    .into_iter()
971                    .map(|e| {
972                        format!(
973                            "{}: {}{}",
974                            e.code,
975                            e.message,
976                            e.field_path
977                                .map(|p| format!(" (@ {p})"))
978                                .unwrap_or_default()
979                        )
980                    })
981                    .collect::<Vec<_>>()
982                    .join("; ");
983                Err(ServerError::routing(format!(
984                    "Response validation failed: {msg}"
985                )))
986            }
987            _ => Ok(()),
988        }
989    }
990
991    fn matches_uri_pattern(&self, pattern: &str, uri: &str) -> bool {
992        // Convert simple templates to regex (very basic):
993        // - '*' => '.*'
994        // - '{param}' => '[^/]+'
995        let mut regex_str = String::from("^");
996        let mut chars = pattern.chars().peekable();
997        while let Some(c) = chars.next() {
998            match c {
999                '*' => regex_str.push_str(".*"),
1000                '{' => {
1001                    // consume until '}'
1002                    for nc in chars.by_ref() {
1003                        if nc == '}' {
1004                            break;
1005                        }
1006                    }
1007                    regex_str.push_str("[^/]+");
1008                }
1009                '.' | '+' | '?' | '(' | ')' | '|' | '^' | '$' | '[' | ']' | '\\' => {
1010                    regex_str.push('\\');
1011                    regex_str.push(c);
1012                }
1013                other => regex_str.push(other),
1014            }
1015        }
1016        regex_str.push('$');
1017        let re = regex::Regex::new(&regex_str).unwrap_or_else(|_| regex::Regex::new("^$").unwrap());
1018        re.is_match(uri)
1019    }
1020}
1021
1022impl Clone for RequestRouter {
1023    fn clone(&self) -> Self {
1024        Self {
1025            registry: Arc::clone(&self.registry),
1026            config: self.config.clone(),
1027            custom_routes: self.custom_routes.clone(),
1028            resource_subscriptions: DashMap::new(),
1029            server_request_dispatcher: self.server_request_dispatcher.clone(),
1030        }
1031    }
1032}
1033
1034// Implementation of ServerCapabilities for RequestRouter
1035// This enables tools to make server-initiated requests through the context
1036impl turbomcp_core::ServerCapabilities for RequestRouter {
1037    fn create_message(
1038        &self,
1039        request: serde_json::Value,
1040    ) -> futures::future::BoxFuture<
1041        '_,
1042        Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1043    > {
1044        Box::pin(async move {
1045            let request: CreateMessageRequest = serde_json::from_value(request)
1046                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1047            let ctx = turbomcp_core::RequestContext::new();
1048            let result = self
1049                .send_create_message_to_client(request, ctx)
1050                .await
1051                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1052            serde_json::to_value(result)
1053                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1054        })
1055    }
1056
1057    fn elicit(
1058        &self,
1059        request: serde_json::Value,
1060    ) -> futures::future::BoxFuture<
1061        '_,
1062        Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1063    > {
1064        Box::pin(async move {
1065            let request: ElicitRequest = serde_json::from_value(request)
1066                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1067            let ctx = turbomcp_core::RequestContext::new();
1068            let result = self
1069                .send_elicitation_to_client(request, ctx)
1070                .await
1071                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1072            serde_json::to_value(result)
1073                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1074        })
1075    }
1076
1077    fn list_roots(
1078        &self,
1079    ) -> futures::future::BoxFuture<
1080        '_,
1081        Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1082    > {
1083        Box::pin(async move {
1084            let ctx = turbomcp_core::RequestContext::new();
1085            let result = self
1086                .send_list_roots_to_client(turbomcp_protocol::types::ListRootsRequest {}, ctx)
1087                .await
1088                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1089            serde_json::to_value(result)
1090                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1091        })
1092    }
1093}
1094
1095/// Route definition for custom routing
1096#[derive(Clone)]
1097pub struct Route {
1098    /// Route method pattern
1099    pub method: String,
1100    /// Route handler
1101    pub handler: Arc<dyn RouteHandler>,
1102    /// Route metadata
1103    pub metadata: RouteMetadata,
1104}
1105
1106impl std::fmt::Debug for Route {
1107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1108        f.debug_struct("Route")
1109            .field("method", &self.method)
1110            .field("metadata", &self.metadata)
1111            .finish()
1112    }
1113}
1114
1115/// Router alias for convenience
1116pub type Router = RequestRouter;