1use dashmap::DashMap;
4use std::collections::HashMap;
5use std::sync::Arc;
6use turbomcp_core::RequestContext;
7use turbomcp_protocol::{
8 jsonrpc::{JsonRpcRequest, JsonRpcResponse, JsonRpcVersion},
9 types::{
10 CallToolRequest,
11 CompleteRequestParams,
12 CompletionResponse,
13 CreateMessageRequest,
14 ElicitRequest,
16 ElicitResult,
17 EmptyResult,
18 GetPromptRequest,
19 Implementation,
20 InitializeRequest,
21 InitializeResult,
22 ListPromptsResult,
23 ListResourceTemplatesRequest,
24 ListResourceTemplatesResult,
25 ListResourcesResult,
26 ListRootsResult,
27 ListToolsResult,
28 LoggingCapabilities,
29 PingRequest,
30 PingResult,
31 PromptsCapabilities,
32 ReadResourceRequest,
33 ResourcesCapabilities,
34 Root,
35 ServerCapabilities,
36 SetLevelRequest,
37 SubscribeRequest,
38 ToolsCapabilities,
39 UnsubscribeRequest,
40 },
41};
42
43use crate::registry::HandlerRegistry;
44use crate::{ServerError, ServerResult};
45use futures::stream::{self, StreamExt};
46use jsonschema::{Draft, JSONSchema};
47
48pub struct RequestRouter {
50 registry: Arc<HandlerRegistry>,
52 config: RouterConfig,
54 custom_routes: HashMap<String, Arc<dyn RouteHandler>>,
56 resource_subscriptions: DashMap<String, usize>,
58 server_request_dispatcher: Option<Arc<dyn ServerRequestDispatcher>>,
60}
61
62impl std::fmt::Debug for RequestRouter {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("RequestRouter")
65 .field("config", &self.config)
66 .field("custom_routes_count", &self.custom_routes.len())
67 .finish()
68 }
69}
70
71#[derive(Debug, Clone)]
73pub struct RouterConfig {
74 pub validate_requests: bool,
76 pub validate_responses: bool,
78 pub default_timeout_ms: u64,
80 pub enable_tracing: bool,
82 pub max_concurrent_requests: usize,
84 pub enable_bidirectional: bool,
86}
87
88impl Default for RouterConfig {
89 fn default() -> Self {
90 Self {
91 validate_requests: true,
92 validate_responses: true,
93 default_timeout_ms: 30_000,
94 enable_tracing: true,
95 max_concurrent_requests: 1000,
96 enable_bidirectional: true,
97 }
98 }
99}
100
101#[async_trait::async_trait]
103pub trait ServerRequestDispatcher: Send + Sync {
104 async fn send_elicitation(
106 &self,
107 request: ElicitRequest,
108 ctx: RequestContext,
109 ) -> ServerResult<ElicitResult>;
110
111 async fn send_ping(
113 &self,
114 request: PingRequest,
115 ctx: RequestContext,
116 ) -> ServerResult<PingResult>;
117
118 async fn send_create_message(
120 &self,
121 request: CreateMessageRequest,
122 ctx: RequestContext,
123 ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult>;
124
125 async fn send_list_roots(
127 &self,
128 request: turbomcp_protocol::types::ListRootsRequest,
129 ctx: RequestContext,
130 ) -> ServerResult<ListRootsResult>;
131
132 fn supports_bidirectional(&self) -> bool;
134
135 async fn get_client_capabilities(&self) -> ServerResult<Option<serde_json::Value>>;
137}
138
139#[async_trait::async_trait]
141pub trait RouteHandler: Send + Sync {
142 async fn handle(
144 &self,
145 request: JsonRpcRequest,
146 ctx: RequestContext,
147 ) -> ServerResult<JsonRpcResponse>;
148
149 fn can_handle(&self, method: &str) -> bool;
151
152 fn metadata(&self) -> RouteMetadata {
154 RouteMetadata::default()
155 }
156}
157
158#[derive(Debug, Clone)]
160pub struct RouteMetadata {
161 pub name: String,
163 pub description: Option<String>,
165 pub version: String,
167 pub methods: Vec<String>,
169 pub tags: Vec<String>,
171}
172
173impl Default for RouteMetadata {
174 fn default() -> Self {
175 Self {
176 name: "unknown".to_string(),
177 description: None,
178 version: "1.0.0".to_string(),
179 methods: Vec::new(),
180 tags: Vec::new(),
181 }
182 }
183}
184
185impl RequestRouter {
186 #[must_use]
188 pub fn new(registry: Arc<HandlerRegistry>) -> Self {
189 Self {
190 registry,
191 config: RouterConfig::default(),
192 custom_routes: HashMap::new(),
193 resource_subscriptions: DashMap::new(),
194 server_request_dispatcher: None,
195 }
196 }
197
198 #[must_use]
200 pub fn with_config(registry: Arc<HandlerRegistry>, config: RouterConfig) -> Self {
201 Self {
202 registry,
203 config,
204 custom_routes: HashMap::new(),
205 resource_subscriptions: DashMap::new(),
206 server_request_dispatcher: None,
207 }
208 }
209
210 pub fn set_server_request_dispatcher<D>(&mut self, dispatcher: D)
212 where
213 D: ServerRequestDispatcher + 'static,
214 {
215 self.server_request_dispatcher = Some(Arc::new(dispatcher));
216 }
217
218 pub fn get_server_request_dispatcher(&self) -> Option<&Arc<dyn ServerRequestDispatcher>> {
220 self.server_request_dispatcher.as_ref()
221 }
222
223 pub fn supports_bidirectional(&self) -> bool {
225 self.config.enable_bidirectional && self.server_request_dispatcher.is_some()
226 }
227
228 pub fn add_route<H>(&mut self, handler: H) -> ServerResult<()>
230 where
231 H: RouteHandler + 'static,
232 {
233 let metadata = handler.metadata();
234 let handler_arc: Arc<dyn RouteHandler> = Arc::new(handler);
235
236 for method in &metadata.methods {
237 if self.custom_routes.contains_key(method) {
238 return Err(ServerError::routing_with_method(
239 format!("Route for method '{method}' already exists"),
240 method.clone(),
241 ));
242 }
243 self.custom_routes
244 .insert(method.clone(), Arc::clone(&handler_arc));
245 }
246
247 Ok(())
248 }
249
250 pub async fn route(&self, request: JsonRpcRequest, ctx: RequestContext) -> JsonRpcResponse {
252 if self.config.validate_requests
254 && let Err(e) = self.validate_request(&request)
255 {
256 return self.error_response(&request, e);
257 }
258
259 let result = match request.method.as_str() {
261 "initialize" => self.handle_initialize(request, ctx).await,
263
264 "tools/list" => self.handle_list_tools(request, ctx).await,
266 "tools/call" => self.handle_call_tool(request, ctx).await,
267
268 "prompts/list" => self.handle_list_prompts(request, ctx).await,
270 "prompts/get" => self.handle_get_prompt(request, ctx).await,
271
272 "resources/list" => self.handle_list_resources(request, ctx).await,
274 "resources/read" => self.handle_read_resource(request, ctx).await,
275 "resources/subscribe" => self.handle_subscribe_resource(request, ctx).await,
276 "resources/unsubscribe" => self.handle_unsubscribe_resource(request, ctx).await,
277
278 "logging/setLevel" => self.handle_set_log_level(request, ctx).await,
280
281 "sampling/createMessage" => self.handle_create_message(request, ctx).await,
283
284 "roots/list" => self.handle_list_roots(request, ctx).await,
286
287 "elicit/request" => self.handle_elicitation(request, ctx).await,
289 "complete/request" => self.handle_completion(request, ctx).await,
290 "resources/templates/list" => self.handle_list_resource_templates(request, ctx).await,
291 "ping/request" => self.handle_ping(request, ctx).await,
292
293 method => {
295 if let Some(handler) = self.custom_routes.get(method) {
296 let request_clone = request.clone();
297 handler
298 .handle(request, ctx)
299 .await
300 .unwrap_or_else(|e| self.error_response(&request_clone, e))
301 } else {
302 self.method_not_found_response(&request)
303 }
304 }
305 };
306
307 if self.config.validate_responses
309 && let Err(e) = self.validate_response(&result)
310 {
311 tracing::warn!("Response validation failed: {}", e);
312 }
313
314 result
315 }
316
317 pub async fn route_batch(
319 &self,
320 requests: Vec<JsonRpcRequest>,
321 ctx: RequestContext,
322 ) -> Vec<JsonRpcResponse> {
323 let max_in_flight = self.config.max_concurrent_requests.max(1);
324 stream::iter(requests.into_iter())
325 .map(|req| {
326 let ctx_cloned = ctx.clone();
327 async move { self.route(req, ctx_cloned).await }
328 })
329 .buffer_unordered(max_in_flight)
330 .collect()
331 .await
332 }
333
334 async fn handle_initialize(
337 &self,
338 request: JsonRpcRequest,
339 _ctx: RequestContext,
340 ) -> JsonRpcResponse {
341 match self.parse_params::<InitializeRequest>(&request) {
342 Ok(_init_request) => {
343 let result = InitializeResult {
344 protocol_version: turbomcp_protocol::PROTOCOL_VERSION.to_string(),
345 server_info: Implementation {
346 name: crate::SERVER_NAME.to_string(),
347 title: Some("TurboMCP Server".to_string()),
348 version: crate::SERVER_VERSION.to_string(),
349 },
350 capabilities: self.get_server_capabilities(),
351 instructions: None,
352 };
353
354 self.success_response(&request, result)
355 }
356 Err(e) => self.error_response(&request, e),
357 }
358 }
359
360 async fn handle_list_tools(
361 &self,
362 request: JsonRpcRequest,
363 _ctx: RequestContext,
364 ) -> JsonRpcResponse {
365 let tools = self.registry.get_tool_definitions();
366 let result = ListToolsResult {
367 tools,
368 next_cursor: None,
369 };
370 self.success_response(&request, result)
371 }
372
373 async fn handle_call_tool(
374 &self,
375 request: JsonRpcRequest,
376 ctx: RequestContext,
377 ) -> JsonRpcResponse {
378 let ctx = ctx.with_server_capabilities(
380 Arc::new(self.clone()) as Arc<dyn turbomcp_core::ServerCapabilities>
381 );
382
383 match self.parse_params::<CallToolRequest>(&request) {
384 Ok(call_request) => {
385 let tool_name = &call_request.name;
386
387 if let Some(handler) = self.registry.get_tool(tool_name) {
388 if self.config.validate_requests
390 && let Some(required_roles) = handler.allowed_roles()
391 {
392 let has_role = ctx
393 .metadata
394 .get("auth")
395 .and_then(|v| v.get("roles"))
396 .and_then(|v| v.as_array())
397 .is_some_and(|arr| {
398 let user_set: std::collections::HashSet<String> = arr
399 .iter()
400 .filter_map(|v| {
401 v.as_str().map(std::string::ToString::to_string)
402 })
403 .collect();
404 required_roles.iter().any(|r| user_set.contains(r))
405 });
406 if !has_role {
407 return self.error_response(
408 &request,
409 ServerError::authentication(format!(
410 "Access denied for tool '{tool_name}'"
411 )),
412 );
413 }
414 }
415
416 if self.config.validate_requests
418 && let Some(arguments) = &call_request.arguments
419 {
420 let tool_def = handler.tool_definition();
422 if let Some(props) = tool_def.input_schema.properties.as_ref() {
423 let mut schema = serde_json::json!({
425 "type": "object",
426 "properties": {},
427 "additionalProperties": tool_def.input_schema.additional_properties.unwrap_or(true)
428 });
429 if let Some(obj) =
430 schema.get_mut("properties").and_then(|v| v.as_object_mut())
431 {
432 for (k, v) in props {
433 obj.insert(k.clone(), v.clone());
434 }
435 }
436 if let Some(required) = tool_def.input_schema.required.as_ref() {
437 schema.as_object_mut().unwrap().insert(
438 "required".to_string(),
439 serde_json::Value::Array(
440 required
441 .iter()
442 .map(|s| serde_json::Value::String(s.clone()))
443 .collect(),
444 ),
445 );
446 }
447
448 if let Ok(compiled) = JSONSchema::options()
450 .with_draft(Draft::Draft7)
451 .compile(&schema)
452 {
453 let instance = serde_json::Value::Object(
454 arguments.clone().into_iter().collect(),
455 );
456 let mut error_messages: Vec<String> = Vec::new();
457 if let Err(iter) = compiled.validate(&instance) {
458 for e in iter {
459 error_messages.push(format!("{}: {}", e.instance_path, e));
460 }
461 }
462 if !error_messages.is_empty() {
463 let joined = error_messages.join("; ");
464 let err = ServerError::routing_with_method(
465 format!("Argument validation failed: {joined}"),
466 "tools/call".to_string(),
467 );
468 return self.error_response(&request, err);
469 }
470 }
471 }
472 }
473 match handler.handle(call_request, ctx).await {
474 Ok(result) => self.success_response(&request, result),
475 Err(e) => self.error_response(&request, e),
476 }
477 } else {
478 let error = ServerError::not_found(format!("Tool '{tool_name}'"));
479 self.error_response(&request, error)
480 }
481 }
482 Err(e) => self.error_response(&request, e),
483 }
484 }
485
486 async fn handle_list_prompts(
487 &self,
488 request: JsonRpcRequest,
489 _ctx: RequestContext,
490 ) -> JsonRpcResponse {
491 let prompts = self.registry.get_prompt_definitions();
492 let result = ListPromptsResult {
493 prompts,
494 next_cursor: None,
495 };
496 self.success_response(&request, result)
497 }
498
499 async fn handle_get_prompt(
500 &self,
501 request: JsonRpcRequest,
502 ctx: RequestContext,
503 ) -> JsonRpcResponse {
504 match self.parse_params::<GetPromptRequest>(&request) {
505 Ok(prompt_request) => {
506 let prompt_name = &prompt_request.name;
507
508 if let Some(handler) = self.registry.get_prompt(prompt_name) {
509 match handler.handle(prompt_request, ctx).await {
510 Ok(result) => self.success_response(&request, result),
511 Err(e) => self.error_response(&request, e),
512 }
513 } else {
514 let error = ServerError::not_found(format!("Prompt '{prompt_name}'"));
515 self.error_response(&request, error)
516 }
517 }
518 Err(e) => self.error_response(&request, e),
519 }
520 }
521
522 async fn handle_list_resources(
523 &self,
524 request: JsonRpcRequest,
525 _ctx: RequestContext,
526 ) -> JsonRpcResponse {
527 let resources = self.registry.get_resource_definitions();
528 let result = ListResourcesResult {
529 resources,
530 next_cursor: None,
531 };
532 self.success_response(&request, result)
533 }
534
535 async fn handle_read_resource(
536 &self,
537 request: JsonRpcRequest,
538 ctx: RequestContext,
539 ) -> JsonRpcResponse {
540 match self.parse_params::<ReadResourceRequest>(&request) {
541 Ok(resource_request) => {
542 let resource_uri = &resource_request.uri;
543
544 for handler in &self.registry.resources {
546 let resource_def = handler.value().resource_definition();
547 if self.matches_uri_pattern(&resource_def.uri, resource_uri) {
548 match handler.value().handle(resource_request, ctx).await {
549 Ok(result) => return self.success_response(&request, result),
550 Err(e) => return self.error_response(&request, e),
551 }
552 }
553 }
554
555 let error = ServerError::not_found(format!("Resource '{resource_uri}'"));
556 self.error_response(&request, error)
557 }
558 Err(e) => self.error_response(&request, e),
559 }
560 }
561
562 async fn handle_subscribe_resource(
563 &self,
564 request: JsonRpcRequest,
565 _ctx: RequestContext,
566 ) -> JsonRpcResponse {
567 match self.parse_params::<SubscribeRequest>(&request) {
568 Ok(sub) => {
569 let uri = sub.uri;
570 let new_count_ref = self
571 .resource_subscriptions
572 .entry(uri.clone())
573 .and_modify(|c| *c += 1)
574 .or_insert(1usize);
575 let new_count: usize = *new_count_ref;
576 tracing::debug!(uri = %uri, count = new_count, "resource subscribed");
577 self.success_response(&request, EmptyResult {})
578 }
579 Err(e) => self.error_response(&request, e),
580 }
581 }
582
583 async fn handle_unsubscribe_resource(
584 &self,
585 request: JsonRpcRequest,
586 _ctx: RequestContext,
587 ) -> JsonRpcResponse {
588 match self.parse_params::<UnsubscribeRequest>(&request) {
589 Ok(unsub) => {
590 let uri = unsub.uri;
591 if let Some(mut entry) = self.resource_subscriptions.get_mut(&uri) {
592 let count = entry.value_mut();
593 if *count > 0 {
594 *count -= 1;
595 }
596 if *count == 0 {
597 drop(entry);
598 self.resource_subscriptions.remove(&uri);
599 }
600 tracing::debug!(uri = %uri, "resource unsubscribed");
601 }
602 self.success_response(&request, EmptyResult {})
603 }
604 Err(e) => self.error_response(&request, e),
605 }
606 }
607
608 async fn handle_set_log_level(
609 &self,
610 request: JsonRpcRequest,
611 ctx: RequestContext,
612 ) -> JsonRpcResponse {
613 match self.parse_params::<SetLevelRequest>(&request) {
614 Ok(level_request) => {
615 if let Some(handler_entry) = self.registry.logging.iter().next() {
617 match handler_entry.value().handle(level_request, ctx).await {
618 Ok(result) => self.success_response(&request, result),
619 Err(e) => self.error_response(&request, e),
620 }
621 } else {
622 let error = ServerError::not_found("No logging handler available");
623 self.error_response(&request, error)
624 }
625 }
626 Err(e) => self.error_response(&request, e),
627 }
628 }
629
630 async fn handle_create_message(
631 &self,
632 request: JsonRpcRequest,
633 ctx: RequestContext,
634 ) -> JsonRpcResponse {
635 match self.parse_params::<CreateMessageRequest>(&request) {
636 Ok(message_request) => {
637 if let Some(handler_entry) = self.registry.sampling.iter().next() {
639 match handler_entry.value().handle(message_request, ctx).await {
640 Ok(result) => self.success_response(&request, result),
641 Err(e) => self.error_response(&request, e),
642 }
643 } else {
644 let error = ServerError::not_found("No sampling handler available");
645 self.error_response(&request, error)
646 }
647 }
648 Err(e) => self.error_response(&request, e),
649 }
650 }
651
652 async fn handle_list_roots(
653 &self,
654 request: JsonRpcRequest,
655 _ctx: RequestContext,
656 ) -> JsonRpcResponse {
657 let roots = self.registry.get_roots();
659
660 let roots = if roots.is_empty() {
662 let mut default_roots: Vec<Root> = Vec::new();
663 #[cfg(target_os = "linux")]
664 {
665 default_roots.push(Root {
666 uri: "file:///".to_string(),
667 name: Some("root".to_string()),
668 });
669 }
670 #[cfg(target_os = "macos")]
671 {
672 default_roots.push(Root {
673 uri: "file:///".to_string(),
674 name: Some("root".to_string()),
675 });
676 default_roots.push(Root {
677 uri: "file:///Volumes".to_string(),
678 name: Some("Volumes".to_string()),
679 });
680 }
681 #[cfg(target_os = "windows")]
682 {
683 for drive in ['C', 'D', 'E', 'F', 'G', 'H'] {
685 default_roots.push(Root {
686 uri: format!("file:///{}:/", drive),
687 name: Some(format!("{}:", drive)),
688 });
689 }
690 }
691 default_roots
692 } else {
693 roots
694 };
695
696 let result = ListRootsResult { roots };
697 self.success_response(&request, result)
698 }
699
700 async fn handle_elicitation(
705 &self,
706 request: JsonRpcRequest,
707 _ctx: RequestContext,
708 ) -> JsonRpcResponse {
709 match self.parse_params::<ElicitRequest>(&request) {
710 Ok(_elicit_request) => {
711 let result = ElicitResult {
714 action: turbomcp_protocol::types::ElicitationAction::Decline,
715 content: None,
716 _meta: Some(serde_json::json!({
717 "message": "Elicitation not supported - no handler registered"
718 })),
719 };
720 self.success_response(&request, result)
721 }
722 Err(e) => self.error_response(&request, e),
723 }
724 }
725
726 async fn handle_completion(
727 &self,
728 request: JsonRpcRequest,
729 _ctx: RequestContext,
730 ) -> JsonRpcResponse {
731 match self.parse_params::<CompleteRequestParams>(&request) {
732 Ok(_complete_request) => {
733 let result = CompletionResponse {
736 values: Vec::new(),
737 has_more: Some(false),
738 total: Some(0),
739 };
740 self.success_response(&request, result)
741 }
742 Err(e) => self.error_response(&request, e),
743 }
744 }
745
746 async fn handle_list_resource_templates(
747 &self,
748 request: JsonRpcRequest,
749 _ctx: RequestContext,
750 ) -> JsonRpcResponse {
751 match self.parse_params::<ListResourceTemplatesRequest>(&request) {
752 Ok(_template_request) => {
753 let result = ListResourceTemplatesResult {
756 resource_templates: Vec::new(),
757 next_cursor: None,
758 _meta: Some(serde_json::json!({
759 "message": "Resource templates not supported - no handler registered"
760 })),
761 };
762 self.success_response(&request, result)
763 }
764 Err(e) => self.error_response(&request, e),
765 }
766 }
767
768 async fn handle_ping(&self, request: JsonRpcRequest, _ctx: RequestContext) -> JsonRpcResponse {
769 match self.parse_params::<PingRequest>(&request) {
770 Ok(_ping_request) => {
771 let result = PingResult {
773 _meta: Some(serde_json::json!({
774 "status": "healthy",
775 "timestamp": chrono::Utc::now().to_rfc3339(),
776 "server": "turbomcp-server",
777 })),
778 };
779 self.success_response(&request, result)
780 }
781 Err(e) => self.error_response(&request, e),
782 }
783 }
784
785 pub async fn send_elicitation_to_client(
791 &self,
792 request: ElicitRequest,
793 ctx: RequestContext,
794 ) -> ServerResult<ElicitResult> {
795 if let Some(dispatcher) = &self.server_request_dispatcher {
796 dispatcher.send_elicitation(request, ctx).await
797 } else {
798 Err(ServerError::Handler {
799 message: "Server request dispatcher not configured for bidirectional communication"
800 .to_string(),
801 context: Some("elicitation".to_string()),
802 })
803 }
804 }
805
806 pub async fn send_ping_to_client(
808 &self,
809 request: PingRequest,
810 ctx: RequestContext,
811 ) -> ServerResult<PingResult> {
812 if let Some(dispatcher) = &self.server_request_dispatcher {
813 dispatcher.send_ping(request, ctx).await
814 } else {
815 Err(ServerError::Handler {
816 message: "Server request dispatcher not configured for bidirectional communication"
817 .to_string(),
818 context: Some("ping".to_string()),
819 })
820 }
821 }
822
823 pub async fn send_create_message_to_client(
825 &self,
826 request: CreateMessageRequest,
827 ctx: RequestContext,
828 ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult> {
829 if let Some(dispatcher) = &self.server_request_dispatcher {
830 dispatcher.send_create_message(request, ctx).await
831 } else {
832 Err(ServerError::Handler {
833 message: "Server request dispatcher not configured for bidirectional communication"
834 .to_string(),
835 context: Some("create_message".to_string()),
836 })
837 }
838 }
839
840 pub async fn send_list_roots_to_client(
842 &self,
843 request: turbomcp_protocol::types::ListRootsRequest,
844 ctx: RequestContext,
845 ) -> ServerResult<ListRootsResult> {
846 if let Some(dispatcher) = &self.server_request_dispatcher {
847 dispatcher.send_list_roots(request, ctx).await
848 } else {
849 Err(ServerError::Handler {
850 message: "Server request dispatcher not configured for bidirectional communication"
851 .to_string(),
852 context: Some("list_roots".to_string()),
853 })
854 }
855 }
856
857 fn get_server_capabilities(&self) -> ServerCapabilities {
860 ServerCapabilities {
861 tools: if self.registry.tools.is_empty() {
862 None
863 } else {
864 Some(ToolsCapabilities::default())
865 },
866 prompts: if self.registry.prompts.is_empty() {
867 None
868 } else {
869 Some(PromptsCapabilities::default())
870 },
871 resources: if self.registry.resources.is_empty() {
872 None
873 } else {
874 Some(ResourcesCapabilities::default())
875 },
876 logging: if self.registry.logging.is_empty() {
877 None
878 } else {
879 Some(LoggingCapabilities)
880 },
881 completions: None, experimental: None,
883 }
884 }
885
886 fn parse_params<T>(&self, request: &JsonRpcRequest) -> ServerResult<T>
887 where
888 T: serde::de::DeserializeOwned,
889 {
890 match &request.params {
891 Some(params) => serde_json::from_value(params.clone()).map_err(|e| {
892 ServerError::routing_with_method(
893 format!("Invalid parameters: {e}"),
894 request.method.clone(),
895 )
896 }),
897 None => Err(ServerError::routing_with_method(
898 "Missing required parameters".to_string(),
899 request.method.clone(),
900 )),
901 }
902 }
903
904 fn success_response<T>(&self, request: &JsonRpcRequest, result: T) -> JsonRpcResponse
905 where
906 T: serde::Serialize,
907 {
908 JsonRpcResponse {
909 jsonrpc: JsonRpcVersion,
910 id: Some(request.id.clone()),
911 result: Some(serde_json::to_value(result).unwrap()),
912 error: None,
913 }
914 }
915
916 fn error_response(&self, request: &JsonRpcRequest, error: ServerError) -> JsonRpcResponse {
917 JsonRpcResponse {
918 jsonrpc: JsonRpcVersion,
919 id: Some(request.id.clone()),
920 result: None,
921 error: Some(turbomcp_protocol::jsonrpc::JsonRpcError {
922 code: error.error_code(),
923 message: error.to_string(),
924 data: None,
925 }),
926 }
927 }
928
929 fn method_not_found_response(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
930 JsonRpcResponse {
931 jsonrpc: JsonRpcVersion,
932 id: Some(request.id.clone()),
933 result: None,
934 error: Some(turbomcp_protocol::jsonrpc::JsonRpcError {
935 code: -32601,
936 message: format!("Method '{}' not found", request.method),
937 data: None,
938 }),
939 }
940 }
941
942 fn validate_request(&self, _request: &JsonRpcRequest) -> ServerResult<()> {
943 let validator = turbomcp_protocol::validation::ProtocolValidator::new();
945 match validator.validate_request(_request) {
946 turbomcp_protocol::validation::ValidationResult::Invalid(errors) => {
947 let msg = errors
948 .into_iter()
949 .map(|e| {
950 format!(
951 "{}: {}{}",
952 e.code,
953 e.message,
954 e.field_path
955 .map(|p| format!(" (@ {p})"))
956 .unwrap_or_default()
957 )
958 })
959 .collect::<Vec<_>>()
960 .join("; ");
961 Err(ServerError::routing_with_method(
962 format!("Request validation failed: {msg}"),
963 _request.method.clone(),
964 ))
965 }
966 _ => Ok(()),
967 }
968 }
969
970 fn validate_response(&self, _response: &JsonRpcResponse) -> ServerResult<()> {
971 let validator = turbomcp_protocol::validation::ProtocolValidator::new();
972 match validator.validate_response(_response) {
973 turbomcp_protocol::validation::ValidationResult::Invalid(errors) => {
974 let msg = errors
975 .into_iter()
976 .map(|e| {
977 format!(
978 "{}: {}{}",
979 e.code,
980 e.message,
981 e.field_path
982 .map(|p| format!(" (@ {p})"))
983 .unwrap_or_default()
984 )
985 })
986 .collect::<Vec<_>>()
987 .join("; ");
988 Err(ServerError::routing(format!(
989 "Response validation failed: {msg}"
990 )))
991 }
992 _ => Ok(()),
993 }
994 }
995
996 fn matches_uri_pattern(&self, pattern: &str, uri: &str) -> bool {
997 let mut regex_str = String::from("^");
1001 let mut chars = pattern.chars().peekable();
1002 while let Some(c) = chars.next() {
1003 match c {
1004 '*' => regex_str.push_str(".*"),
1005 '{' => {
1006 for nc in chars.by_ref() {
1008 if nc == '}' {
1009 break;
1010 }
1011 }
1012 regex_str.push_str("[^/]+");
1013 }
1014 '.' | '+' | '?' | '(' | ')' | '|' | '^' | '$' | '[' | ']' | '\\' => {
1015 regex_str.push('\\');
1016 regex_str.push(c);
1017 }
1018 other => regex_str.push(other),
1019 }
1020 }
1021 regex_str.push('$');
1022 let re = regex::Regex::new(®ex_str).unwrap_or_else(|_| regex::Regex::new("^$").unwrap());
1023 re.is_match(uri)
1024 }
1025}
1026
1027impl Clone for RequestRouter {
1028 fn clone(&self) -> Self {
1029 Self {
1030 registry: Arc::clone(&self.registry),
1031 config: self.config.clone(),
1032 custom_routes: self.custom_routes.clone(),
1033 resource_subscriptions: DashMap::new(),
1034 server_request_dispatcher: self.server_request_dispatcher.clone(),
1035 }
1036 }
1037}
1038
1039impl turbomcp_core::ServerCapabilities for RequestRouter {
1042 fn create_message(
1043 &self,
1044 request: serde_json::Value,
1045 ) -> futures::future::BoxFuture<
1046 '_,
1047 Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1048 > {
1049 Box::pin(async move {
1050 let request: CreateMessageRequest = serde_json::from_value(request)
1051 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1052 let ctx = turbomcp_core::RequestContext::new();
1053 let result = self
1054 .send_create_message_to_client(request, ctx)
1055 .await
1056 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1057 serde_json::to_value(result)
1058 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1059 })
1060 }
1061
1062 fn elicit(
1063 &self,
1064 request: serde_json::Value,
1065 ) -> futures::future::BoxFuture<
1066 '_,
1067 Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1068 > {
1069 Box::pin(async move {
1070 let request: ElicitRequest = serde_json::from_value(request)
1071 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1072 let ctx = turbomcp_core::RequestContext::new();
1073 let result = self
1074 .send_elicitation_to_client(request, ctx)
1075 .await
1076 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1077 serde_json::to_value(result)
1078 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1079 })
1080 }
1081
1082 fn list_roots(
1083 &self,
1084 ) -> futures::future::BoxFuture<
1085 '_,
1086 Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1087 > {
1088 Box::pin(async move {
1089 let ctx = turbomcp_core::RequestContext::new();
1090 let result = self
1091 .send_list_roots_to_client(turbomcp_protocol::types::ListRootsRequest {}, ctx)
1092 .await
1093 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1094 serde_json::to_value(result)
1095 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1096 })
1097 }
1098}
1099
1100#[derive(Clone)]
1102pub struct Route {
1103 pub method: String,
1105 pub handler: Arc<dyn RouteHandler>,
1107 pub metadata: RouteMetadata,
1109}
1110
1111impl std::fmt::Debug for Route {
1112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1113 f.debug_struct("Route")
1114 .field("method", &self.method)
1115 .field("metadata", &self.metadata)
1116 .finish()
1117 }
1118}
1119
1120pub type Router = RequestRouter;