1use dashmap::DashMap;
4use std::collections::HashMap;
5use std::sync::Arc;
6use turbomcp_core::RequestContext;
7use turbomcp_protocol::{
8 jsonrpc::{JsonRpcRequest, JsonRpcResponse},
9 types::{
10 CallToolRequest,
11 CompleteRequestParams,
12 CompletionResponse,
13 CreateMessageRequest,
14 ElicitRequest,
16 ElicitResult,
17 EmptyResult,
18 GetPromptRequest,
19 Implementation,
20 InitializeRequest,
21 InitializeResult,
22 ListPromptsResult,
23 ListResourceTemplatesRequest,
24 ListResourceTemplatesResult,
25 ListResourcesResult,
26 ListRootsResult,
27 ListToolsResult,
28 LoggingCapabilities,
29 PingRequest,
30 PingResult,
31 PromptsCapabilities,
32 ReadResourceRequest,
33 ResourcesCapabilities,
34 Root,
35 ServerCapabilities,
36 SetLevelRequest,
37 SubscribeRequest,
38 ToolsCapabilities,
39 UnsubscribeRequest,
40 },
41};
42
43use crate::registry::HandlerRegistry;
44use crate::{ServerError, ServerResult};
45use futures::stream::{self, StreamExt};
46use jsonschema::{Draft, JSONSchema};
47
48pub struct RequestRouter {
50 registry: Arc<HandlerRegistry>,
52 config: RouterConfig,
54 custom_routes: HashMap<String, Arc<dyn RouteHandler>>,
56 resource_subscriptions: DashMap<String, usize>,
58 server_request_dispatcher: Option<Arc<dyn ServerRequestDispatcher>>,
60}
61
62impl std::fmt::Debug for RequestRouter {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("RequestRouter")
65 .field("config", &self.config)
66 .field("custom_routes_count", &self.custom_routes.len())
67 .finish()
68 }
69}
70
71#[derive(Debug, Clone)]
73pub struct RouterConfig {
74 pub validate_requests: bool,
76 pub validate_responses: bool,
78 pub default_timeout_ms: u64,
80 pub enable_tracing: bool,
82 pub max_concurrent_requests: usize,
84 pub enable_bidirectional: bool,
86}
87
88impl Default for RouterConfig {
89 fn default() -> Self {
90 Self {
91 validate_requests: true,
92 validate_responses: true,
93 default_timeout_ms: 30_000,
94 enable_tracing: true,
95 max_concurrent_requests: 1000,
96 enable_bidirectional: true,
97 }
98 }
99}
100
101#[async_trait::async_trait]
103pub trait ServerRequestDispatcher: Send + Sync {
104 async fn send_elicitation(
106 &self,
107 request: ElicitRequest,
108 ctx: RequestContext,
109 ) -> ServerResult<ElicitResult>;
110
111 async fn send_ping(
113 &self,
114 request: PingRequest,
115 ctx: RequestContext,
116 ) -> ServerResult<PingResult>;
117
118 async fn send_create_message(
120 &self,
121 request: CreateMessageRequest,
122 ctx: RequestContext,
123 ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult>;
124
125 async fn send_list_roots(
127 &self,
128 request: turbomcp_protocol::types::ListRootsRequest,
129 ctx: RequestContext,
130 ) -> ServerResult<ListRootsResult>;
131
132 fn supports_bidirectional(&self) -> bool;
134
135 async fn get_client_capabilities(&self) -> ServerResult<Option<serde_json::Value>>;
137}
138
139#[async_trait::async_trait]
141pub trait RouteHandler: Send + Sync {
142 async fn handle(
144 &self,
145 request: JsonRpcRequest,
146 ctx: RequestContext,
147 ) -> ServerResult<JsonRpcResponse>;
148
149 fn can_handle(&self, method: &str) -> bool;
151
152 fn metadata(&self) -> RouteMetadata {
154 RouteMetadata::default()
155 }
156}
157
158#[derive(Debug, Clone)]
160pub struct RouteMetadata {
161 pub name: String,
163 pub description: Option<String>,
165 pub version: String,
167 pub methods: Vec<String>,
169 pub tags: Vec<String>,
171}
172
173impl Default for RouteMetadata {
174 fn default() -> Self {
175 Self {
176 name: "unknown".to_string(),
177 description: None,
178 version: "1.0.0".to_string(),
179 methods: Vec::new(),
180 tags: Vec::new(),
181 }
182 }
183}
184
185impl RequestRouter {
186 #[must_use]
188 pub fn new(registry: Arc<HandlerRegistry>) -> Self {
189 Self {
190 registry,
191 config: RouterConfig::default(),
192 custom_routes: HashMap::new(),
193 resource_subscriptions: DashMap::new(),
194 server_request_dispatcher: None,
195 }
196 }
197
198 #[must_use]
200 pub fn with_config(registry: Arc<HandlerRegistry>, config: RouterConfig) -> Self {
201 Self {
202 registry,
203 config,
204 custom_routes: HashMap::new(),
205 resource_subscriptions: DashMap::new(),
206 server_request_dispatcher: None,
207 }
208 }
209
210 pub fn set_server_request_dispatcher<D>(&mut self, dispatcher: D)
212 where
213 D: ServerRequestDispatcher + 'static,
214 {
215 self.server_request_dispatcher = Some(Arc::new(dispatcher));
216 }
217
218 pub fn get_server_request_dispatcher(&self) -> Option<&Arc<dyn ServerRequestDispatcher>> {
220 self.server_request_dispatcher.as_ref()
221 }
222
223 pub fn supports_bidirectional(&self) -> bool {
225 self.config.enable_bidirectional && self.server_request_dispatcher.is_some()
226 }
227
228 pub fn add_route<H>(&mut self, handler: H) -> ServerResult<()>
230 where
231 H: RouteHandler + 'static,
232 {
233 let metadata = handler.metadata();
234 let handler_arc: Arc<dyn RouteHandler> = Arc::new(handler);
235
236 for method in &metadata.methods {
237 if self.custom_routes.contains_key(method) {
238 return Err(ServerError::routing_with_method(
239 format!("Route for method '{method}' already exists"),
240 method.clone(),
241 ));
242 }
243 self.custom_routes
244 .insert(method.clone(), Arc::clone(&handler_arc));
245 }
246
247 Ok(())
248 }
249
250 pub async fn route(&self, request: JsonRpcRequest, ctx: RequestContext) -> JsonRpcResponse {
252 if self.config.validate_requests
254 && let Err(e) = self.validate_request(&request)
255 {
256 return self.error_response(&request, e);
257 }
258
259 let result = match request.method.as_str() {
261 "initialize" => self.handle_initialize(request, ctx).await,
263
264 "tools/list" => self.handle_list_tools(request, ctx).await,
266 "tools/call" => self.handle_call_tool(request, ctx).await,
267
268 "prompts/list" => self.handle_list_prompts(request, ctx).await,
270 "prompts/get" => self.handle_get_prompt(request, ctx).await,
271
272 "resources/list" => self.handle_list_resources(request, ctx).await,
274 "resources/read" => self.handle_read_resource(request, ctx).await,
275 "resources/subscribe" => self.handle_subscribe_resource(request, ctx).await,
276 "resources/unsubscribe" => self.handle_unsubscribe_resource(request, ctx).await,
277
278 "logging/setLevel" => self.handle_set_log_level(request, ctx).await,
280
281 "sampling/createMessage" => self.handle_create_message(request, ctx).await,
283
284 "roots/list" => self.handle_list_roots(request, ctx).await,
286
287 "elicit/request" => self.handle_elicitation(request, ctx).await,
289 "complete/request" => self.handle_completion(request, ctx).await,
290 "resources/templates/list" => self.handle_list_resource_templates(request, ctx).await,
291 "ping/request" => self.handle_ping(request, ctx).await,
292
293 method => {
295 if let Some(handler) = self.custom_routes.get(method) {
296 let request_clone = request.clone();
297 handler
298 .handle(request, ctx)
299 .await
300 .unwrap_or_else(|e| self.error_response(&request_clone, e))
301 } else {
302 self.method_not_found_response(&request)
303 }
304 }
305 };
306
307 if self.config.validate_responses
309 && let Err(e) = self.validate_response(&result)
310 {
311 tracing::warn!("Response validation failed: {}", e);
312 }
313
314 result
315 }
316
317 pub async fn route_batch(
319 &self,
320 requests: Vec<JsonRpcRequest>,
321 ctx: RequestContext,
322 ) -> Vec<JsonRpcResponse> {
323 let max_in_flight = self.config.max_concurrent_requests.max(1);
324 stream::iter(requests.into_iter())
325 .map(|req| {
326 let ctx_cloned = ctx.clone();
327 async move { self.route(req, ctx_cloned).await }
328 })
329 .buffer_unordered(max_in_flight)
330 .collect()
331 .await
332 }
333
334 async fn handle_initialize(
337 &self,
338 request: JsonRpcRequest,
339 _ctx: RequestContext,
340 ) -> JsonRpcResponse {
341 match self.parse_params::<InitializeRequest>(&request) {
342 Ok(_init_request) => {
343 let result = InitializeResult {
344 protocol_version: turbomcp_protocol::PROTOCOL_VERSION.to_string(),
345 server_info: Implementation {
346 name: crate::SERVER_NAME.to_string(),
347 title: Some("TurboMCP Server".to_string()),
348 version: crate::SERVER_VERSION.to_string(),
349 },
350 capabilities: self.get_server_capabilities(),
351 instructions: None,
352 _meta: None,
353 };
354
355 self.success_response(&request, result)
356 }
357 Err(e) => self.error_response(&request, e),
358 }
359 }
360
361 async fn handle_list_tools(
362 &self,
363 request: JsonRpcRequest,
364 _ctx: RequestContext,
365 ) -> JsonRpcResponse {
366 let tools = self.registry.get_tool_definitions();
367 let result = ListToolsResult {
368 tools,
369 next_cursor: None,
370 _meta: None,
371 };
372 self.success_response(&request, result)
373 }
374
375 async fn handle_call_tool(
376 &self,
377 request: JsonRpcRequest,
378 ctx: RequestContext,
379 ) -> JsonRpcResponse {
380 let ctx = ctx.with_server_capabilities(
382 Arc::new(self.clone()) as Arc<dyn turbomcp_core::ServerCapabilities>
383 );
384
385 match self.parse_params::<CallToolRequest>(&request) {
386 Ok(call_request) => {
387 let tool_name = &call_request.name;
388
389 if let Some(handler) = self.registry.get_tool(tool_name) {
390 if self.config.validate_requests
392 && let Some(required_roles) = handler.allowed_roles()
393 {
394 let has_role = ctx
395 .metadata
396 .get("auth")
397 .and_then(|v| v.get("roles"))
398 .and_then(|v| v.as_array())
399 .is_some_and(|arr| {
400 let user_set: std::collections::HashSet<String> = arr
401 .iter()
402 .filter_map(|v| {
403 v.as_str().map(std::string::ToString::to_string)
404 })
405 .collect();
406 required_roles.iter().any(|r| user_set.contains(r))
407 });
408 if !has_role {
409 return self.error_response(
410 &request,
411 ServerError::authentication(format!(
412 "Access denied for tool '{tool_name}'"
413 )),
414 );
415 }
416 }
417
418 if self.config.validate_requests
420 && let Some(arguments) = &call_request.arguments
421 {
422 let tool_def = handler.tool_definition();
424 if let Some(props) = tool_def.input_schema.properties.as_ref() {
425 let mut schema = serde_json::json!({
427 "type": "object",
428 "properties": {},
429 "additionalProperties": tool_def.input_schema.additional_properties.unwrap_or(true)
430 });
431 if let Some(obj) =
432 schema.get_mut("properties").and_then(|v| v.as_object_mut())
433 {
434 for (k, v) in props {
435 obj.insert(k.clone(), v.clone());
436 }
437 }
438 if let Some(required) = tool_def.input_schema.required.as_ref() {
439 schema.as_object_mut().unwrap().insert(
440 "required".to_string(),
441 serde_json::Value::Array(
442 required
443 .iter()
444 .map(|s| serde_json::Value::String(s.clone()))
445 .collect(),
446 ),
447 );
448 }
449
450 if let Ok(compiled) = JSONSchema::options()
452 .with_draft(Draft::Draft7)
453 .compile(&schema)
454 {
455 let instance = serde_json::Value::Object(
456 arguments.clone().into_iter().collect(),
457 );
458 let mut error_messages: Vec<String> = Vec::new();
459 if let Err(iter) = compiled.validate(&instance) {
460 for e in iter {
461 error_messages.push(format!("{}: {}", e.instance_path, e));
462 }
463 }
464 if !error_messages.is_empty() {
465 let joined = error_messages.join("; ");
466 let err = ServerError::routing_with_method(
467 format!("Argument validation failed: {joined}"),
468 "tools/call".to_string(),
469 );
470 return self.error_response(&request, err);
471 }
472 }
473 }
474 }
475 match handler.handle(call_request, ctx).await {
476 Ok(result) => self.success_response(&request, result),
477 Err(e) => self.error_response(&request, e),
478 }
479 } else {
480 let error = ServerError::not_found(format!("Tool '{tool_name}'"));
481 self.error_response(&request, error)
482 }
483 }
484 Err(e) => self.error_response(&request, e),
485 }
486 }
487
488 async fn handle_list_prompts(
489 &self,
490 request: JsonRpcRequest,
491 _ctx: RequestContext,
492 ) -> JsonRpcResponse {
493 let prompts = self.registry.get_prompt_definitions();
494 let result = ListPromptsResult {
495 prompts,
496 next_cursor: None,
497 _meta: None,
498 };
499 self.success_response(&request, result)
500 }
501
502 async fn handle_get_prompt(
503 &self,
504 request: JsonRpcRequest,
505 ctx: RequestContext,
506 ) -> JsonRpcResponse {
507 match self.parse_params::<GetPromptRequest>(&request) {
508 Ok(prompt_request) => {
509 let prompt_name = &prompt_request.name;
510
511 if let Some(handler) = self.registry.get_prompt(prompt_name) {
512 match handler.handle(prompt_request, ctx).await {
513 Ok(result) => self.success_response(&request, result),
514 Err(e) => self.error_response(&request, e),
515 }
516 } else {
517 let error = ServerError::not_found(format!("Prompt '{prompt_name}'"));
518 self.error_response(&request, error)
519 }
520 }
521 Err(e) => self.error_response(&request, e),
522 }
523 }
524
525 async fn handle_list_resources(
526 &self,
527 request: JsonRpcRequest,
528 _ctx: RequestContext,
529 ) -> JsonRpcResponse {
530 let resources = self.registry.get_resource_definitions();
531 let result = ListResourcesResult {
532 resources,
533 next_cursor: None,
534 _meta: None,
535 };
536 self.success_response(&request, result)
537 }
538
539 async fn handle_read_resource(
540 &self,
541 request: JsonRpcRequest,
542 ctx: RequestContext,
543 ) -> JsonRpcResponse {
544 match self.parse_params::<ReadResourceRequest>(&request) {
545 Ok(resource_request) => {
546 let resource_uri = &resource_request.uri;
547
548 for handler in &self.registry.resources {
550 let resource_def = handler.value().resource_definition();
551 if self.matches_uri_pattern(&resource_def.uri, resource_uri) {
552 match handler.value().handle(resource_request, ctx).await {
553 Ok(result) => return self.success_response(&request, result),
554 Err(e) => return self.error_response(&request, e),
555 }
556 }
557 }
558
559 let error = ServerError::not_found(format!("Resource '{resource_uri}'"));
560 self.error_response(&request, error)
561 }
562 Err(e) => self.error_response(&request, e),
563 }
564 }
565
566 async fn handle_subscribe_resource(
567 &self,
568 request: JsonRpcRequest,
569 _ctx: RequestContext,
570 ) -> JsonRpcResponse {
571 match self.parse_params::<SubscribeRequest>(&request) {
572 Ok(sub) => {
573 let uri = sub.uri;
574 let new_count_ref = self
575 .resource_subscriptions
576 .entry(uri.clone())
577 .and_modify(|c| *c += 1)
578 .or_insert(1usize);
579 let new_count: usize = *new_count_ref;
580 tracing::debug!(uri = %uri, count = new_count, "resource subscribed");
581 self.success_response(&request, EmptyResult {})
582 }
583 Err(e) => self.error_response(&request, e),
584 }
585 }
586
587 async fn handle_unsubscribe_resource(
588 &self,
589 request: JsonRpcRequest,
590 _ctx: RequestContext,
591 ) -> JsonRpcResponse {
592 match self.parse_params::<UnsubscribeRequest>(&request) {
593 Ok(unsub) => {
594 let uri = unsub.uri;
595 if let Some(mut entry) = self.resource_subscriptions.get_mut(&uri) {
596 let count = entry.value_mut();
597 if *count > 0 {
598 *count -= 1;
599 }
600 if *count == 0 {
601 drop(entry);
602 self.resource_subscriptions.remove(&uri);
603 }
604 tracing::debug!(uri = %uri, "resource unsubscribed");
605 }
606 self.success_response(&request, EmptyResult {})
607 }
608 Err(e) => self.error_response(&request, e),
609 }
610 }
611
612 async fn handle_set_log_level(
613 &self,
614 request: JsonRpcRequest,
615 ctx: RequestContext,
616 ) -> JsonRpcResponse {
617 match self.parse_params::<SetLevelRequest>(&request) {
618 Ok(level_request) => {
619 if let Some(handler_entry) = self.registry.logging.iter().next() {
621 match handler_entry.value().handle(level_request, ctx).await {
622 Ok(result) => self.success_response(&request, result),
623 Err(e) => self.error_response(&request, e),
624 }
625 } else {
626 let error = ServerError::not_found("No logging handler available");
627 self.error_response(&request, error)
628 }
629 }
630 Err(e) => self.error_response(&request, e),
631 }
632 }
633
634 async fn handle_create_message(
635 &self,
636 request: JsonRpcRequest,
637 ctx: RequestContext,
638 ) -> JsonRpcResponse {
639 match self.parse_params::<CreateMessageRequest>(&request) {
640 Ok(message_request) => {
641 if let Some(handler_entry) = self.registry.sampling.iter().next() {
643 match handler_entry.value().handle(message_request, ctx).await {
644 Ok(result) => self.success_response(&request, result),
645 Err(e) => self.error_response(&request, e),
646 }
647 } else {
648 let error = ServerError::not_found("No sampling handler available");
649 self.error_response(&request, error)
650 }
651 }
652 Err(e) => self.error_response(&request, e),
653 }
654 }
655
656 async fn handle_list_roots(
657 &self,
658 request: JsonRpcRequest,
659 _ctx: RequestContext,
660 ) -> JsonRpcResponse {
661 let roots = self.registry.get_roots();
663
664 let roots = if roots.is_empty() {
666 let mut default_roots: Vec<Root> = Vec::new();
667 #[cfg(target_os = "linux")]
668 {
669 default_roots.push(Root {
670 uri: "file:///".to_string(),
671 name: Some("root".to_string()),
672 });
673 }
674 #[cfg(target_os = "macos")]
675 {
676 default_roots.push(Root {
677 uri: "file:///".to_string(),
678 name: Some("root".to_string()),
679 });
680 default_roots.push(Root {
681 uri: "file:///Volumes".to_string(),
682 name: Some("Volumes".to_string()),
683 });
684 }
685 #[cfg(target_os = "windows")]
686 {
687 for drive in ['C', 'D', 'E', 'F', 'G', 'H'] {
689 default_roots.push(Root {
690 uri: format!("file:///{}:/", drive),
691 name: Some(format!("{}:", drive)),
692 });
693 }
694 }
695 default_roots
696 } else {
697 roots
698 };
699
700 let result = ListRootsResult { roots, _meta: None };
701 self.success_response(&request, result)
702 }
703
704 async fn handle_elicitation(
709 &self,
710 request: JsonRpcRequest,
711 _ctx: RequestContext,
712 ) -> JsonRpcResponse {
713 match self.parse_params::<ElicitRequest>(&request) {
714 Ok(_elicit_request) => {
715 let result = ElicitResult {
718 action: turbomcp_protocol::types::ElicitationAction::Decline,
719 content: None,
720 _meta: Some(serde_json::json!({
721 "message": "Elicitation not supported - no handler registered"
722 })),
723 };
724 self.success_response(&request, result)
725 }
726 Err(e) => self.error_response(&request, e),
727 }
728 }
729
730 async fn handle_completion(
731 &self,
732 request: JsonRpcRequest,
733 _ctx: RequestContext,
734 ) -> JsonRpcResponse {
735 match self.parse_params::<CompleteRequestParams>(&request) {
736 Ok(_complete_request) => {
737 let result = CompletionResponse {
740 values: Vec::new(),
741 has_more: Some(false),
742 total: Some(0),
743 };
744 self.success_response(&request, result)
745 }
746 Err(e) => self.error_response(&request, e),
747 }
748 }
749
750 async fn handle_list_resource_templates(
751 &self,
752 request: JsonRpcRequest,
753 _ctx: RequestContext,
754 ) -> JsonRpcResponse {
755 match self.parse_params::<ListResourceTemplatesRequest>(&request) {
756 Ok(_template_request) => {
757 let result = ListResourceTemplatesResult {
760 resource_templates: Vec::new(),
761 next_cursor: None,
762 _meta: Some(serde_json::json!({
763 "message": "Resource templates not supported - no handler registered"
764 })),
765 };
766 self.success_response(&request, result)
767 }
768 Err(e) => self.error_response(&request, e),
769 }
770 }
771
772 async fn handle_ping(&self, request: JsonRpcRequest, _ctx: RequestContext) -> JsonRpcResponse {
773 match self.parse_params::<PingRequest>(&request) {
774 Ok(_ping_request) => {
775 let result = PingResult {
777 _meta: Some(serde_json::json!({
778 "status": "healthy",
779 "timestamp": chrono::Utc::now().to_rfc3339(),
780 "server": "turbomcp-server",
781 })),
782 };
783 self.success_response(&request, result)
784 }
785 Err(e) => self.error_response(&request, e),
786 }
787 }
788
789 pub async fn send_elicitation_to_client(
795 &self,
796 request: ElicitRequest,
797 ctx: RequestContext,
798 ) -> ServerResult<ElicitResult> {
799 if let Some(dispatcher) = &self.server_request_dispatcher {
800 dispatcher.send_elicitation(request, ctx).await
801 } else {
802 Err(ServerError::Handler {
803 message: "Server request dispatcher not configured for bidirectional communication"
804 .to_string(),
805 context: Some("elicitation".to_string()),
806 })
807 }
808 }
809
810 pub async fn send_ping_to_client(
812 &self,
813 request: PingRequest,
814 ctx: RequestContext,
815 ) -> ServerResult<PingResult> {
816 if let Some(dispatcher) = &self.server_request_dispatcher {
817 dispatcher.send_ping(request, ctx).await
818 } else {
819 Err(ServerError::Handler {
820 message: "Server request dispatcher not configured for bidirectional communication"
821 .to_string(),
822 context: Some("ping".to_string()),
823 })
824 }
825 }
826
827 pub async fn send_create_message_to_client(
829 &self,
830 request: CreateMessageRequest,
831 ctx: RequestContext,
832 ) -> ServerResult<turbomcp_protocol::types::CreateMessageResult> {
833 if let Some(dispatcher) = &self.server_request_dispatcher {
834 dispatcher.send_create_message(request, ctx).await
835 } else {
836 Err(ServerError::Handler {
837 message: "Server request dispatcher not configured for bidirectional communication"
838 .to_string(),
839 context: Some("create_message".to_string()),
840 })
841 }
842 }
843
844 pub async fn send_list_roots_to_client(
846 &self,
847 request: turbomcp_protocol::types::ListRootsRequest,
848 ctx: RequestContext,
849 ) -> ServerResult<ListRootsResult> {
850 if let Some(dispatcher) = &self.server_request_dispatcher {
851 dispatcher.send_list_roots(request, ctx).await
852 } else {
853 Err(ServerError::Handler {
854 message: "Server request dispatcher not configured for bidirectional communication"
855 .to_string(),
856 context: Some("list_roots".to_string()),
857 })
858 }
859 }
860
861 fn get_server_capabilities(&self) -> ServerCapabilities {
864 ServerCapabilities {
865 tools: if self.registry.tools.is_empty() {
866 None
867 } else {
868 Some(ToolsCapabilities::default())
869 },
870 prompts: if self.registry.prompts.is_empty() {
871 None
872 } else {
873 Some(PromptsCapabilities::default())
874 },
875 resources: if self.registry.resources.is_empty() {
876 None
877 } else {
878 Some(ResourcesCapabilities::default())
879 },
880 logging: if self.registry.logging.is_empty() {
881 None
882 } else {
883 Some(LoggingCapabilities)
884 },
885 completions: None, experimental: None,
887 }
888 }
889
890 fn parse_params<T>(&self, request: &JsonRpcRequest) -> ServerResult<T>
891 where
892 T: serde::de::DeserializeOwned,
893 {
894 match &request.params {
895 Some(params) => serde_json::from_value(params.clone()).map_err(|e| {
896 ServerError::routing_with_method(
897 format!("Invalid parameters: {e}"),
898 request.method.clone(),
899 )
900 }),
901 None => Err(ServerError::routing_with_method(
902 "Missing required parameters".to_string(),
903 request.method.clone(),
904 )),
905 }
906 }
907
908 fn success_response<T>(&self, request: &JsonRpcRequest, result: T) -> JsonRpcResponse
909 where
910 T: serde::Serialize,
911 {
912 JsonRpcResponse::success(serde_json::to_value(result).unwrap(), request.id.clone())
913 }
914
915 fn error_response(&self, request: &JsonRpcRequest, error: ServerError) -> JsonRpcResponse {
916 JsonRpcResponse::error_response(
917 turbomcp_protocol::jsonrpc::JsonRpcError {
918 code: error.error_code(),
919 message: error.to_string(),
920 data: None,
921 },
922 request.id.clone(),
923 )
924 }
925
926 fn method_not_found_response(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
927 JsonRpcResponse::error_response(
928 turbomcp_protocol::jsonrpc::JsonRpcError {
929 code: -32601,
930 message: format!("Method '{}' not found", request.method),
931 data: None,
932 },
933 request.id.clone(),
934 )
935 }
936
937 fn validate_request(&self, _request: &JsonRpcRequest) -> ServerResult<()> {
938 let validator = turbomcp_protocol::validation::ProtocolValidator::new();
940 match validator.validate_request(_request) {
941 turbomcp_protocol::validation::ValidationResult::Invalid(errors) => {
942 let msg = errors
943 .into_iter()
944 .map(|e| {
945 format!(
946 "{}: {}{}",
947 e.code,
948 e.message,
949 e.field_path
950 .map(|p| format!(" (@ {p})"))
951 .unwrap_or_default()
952 )
953 })
954 .collect::<Vec<_>>()
955 .join("; ");
956 Err(ServerError::routing_with_method(
957 format!("Request validation failed: {msg}"),
958 _request.method.clone(),
959 ))
960 }
961 _ => Ok(()),
962 }
963 }
964
965 fn validate_response(&self, _response: &JsonRpcResponse) -> ServerResult<()> {
966 let validator = turbomcp_protocol::validation::ProtocolValidator::new();
967 match validator.validate_response(_response) {
968 turbomcp_protocol::validation::ValidationResult::Invalid(errors) => {
969 let msg = errors
970 .into_iter()
971 .map(|e| {
972 format!(
973 "{}: {}{}",
974 e.code,
975 e.message,
976 e.field_path
977 .map(|p| format!(" (@ {p})"))
978 .unwrap_or_default()
979 )
980 })
981 .collect::<Vec<_>>()
982 .join("; ");
983 Err(ServerError::routing(format!(
984 "Response validation failed: {msg}"
985 )))
986 }
987 _ => Ok(()),
988 }
989 }
990
991 fn matches_uri_pattern(&self, pattern: &str, uri: &str) -> bool {
992 let mut regex_str = String::from("^");
996 let mut chars = pattern.chars().peekable();
997 while let Some(c) = chars.next() {
998 match c {
999 '*' => regex_str.push_str(".*"),
1000 '{' => {
1001 for nc in chars.by_ref() {
1003 if nc == '}' {
1004 break;
1005 }
1006 }
1007 regex_str.push_str("[^/]+");
1008 }
1009 '.' | '+' | '?' | '(' | ')' | '|' | '^' | '$' | '[' | ']' | '\\' => {
1010 regex_str.push('\\');
1011 regex_str.push(c);
1012 }
1013 other => regex_str.push(other),
1014 }
1015 }
1016 regex_str.push('$');
1017 let re = regex::Regex::new(®ex_str).unwrap_or_else(|_| regex::Regex::new("^$").unwrap());
1018 re.is_match(uri)
1019 }
1020}
1021
1022impl Clone for RequestRouter {
1023 fn clone(&self) -> Self {
1024 Self {
1025 registry: Arc::clone(&self.registry),
1026 config: self.config.clone(),
1027 custom_routes: self.custom_routes.clone(),
1028 resource_subscriptions: DashMap::new(),
1029 server_request_dispatcher: self.server_request_dispatcher.clone(),
1030 }
1031 }
1032}
1033
1034impl turbomcp_core::ServerCapabilities for RequestRouter {
1037 fn create_message(
1038 &self,
1039 request: serde_json::Value,
1040 ) -> futures::future::BoxFuture<
1041 '_,
1042 Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1043 > {
1044 Box::pin(async move {
1045 let request: CreateMessageRequest = serde_json::from_value(request)
1046 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1047 let ctx = turbomcp_core::RequestContext::new();
1048 let result = self
1049 .send_create_message_to_client(request, ctx)
1050 .await
1051 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1052 serde_json::to_value(result)
1053 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1054 })
1055 }
1056
1057 fn elicit(
1058 &self,
1059 request: serde_json::Value,
1060 ) -> futures::future::BoxFuture<
1061 '_,
1062 Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1063 > {
1064 Box::pin(async move {
1065 let request: ElicitRequest = serde_json::from_value(request)
1066 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1067 let ctx = turbomcp_core::RequestContext::new();
1068 let result = self
1069 .send_elicitation_to_client(request, ctx)
1070 .await
1071 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1072 serde_json::to_value(result)
1073 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1074 })
1075 }
1076
1077 fn list_roots(
1078 &self,
1079 ) -> futures::future::BoxFuture<
1080 '_,
1081 Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
1082 > {
1083 Box::pin(async move {
1084 let ctx = turbomcp_core::RequestContext::new();
1085 let result = self
1086 .send_list_roots_to_client(turbomcp_protocol::types::ListRootsRequest {}, ctx)
1087 .await
1088 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1089 serde_json::to_value(result)
1090 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1091 })
1092 }
1093}
1094
1095#[derive(Clone)]
1097pub struct Route {
1098 pub method: String,
1100 pub handler: Arc<dyn RouteHandler>,
1102 pub metadata: RouteMetadata,
1104}
1105
1106impl std::fmt::Debug for Route {
1107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1108 f.debug_struct("Route")
1109 .field("method", &self.method)
1110 .field("metadata", &self.metadata)
1111 .finish()
1112 }
1113}
1114
1115pub type Router = RequestRouter;