1use std::sync::Arc;
25
26use turbomcp_core::context::RequestContext;
27use turbomcp_core::error::{McpError, McpResult};
28use turbomcp_core::handler::McpHandler;
29use turbomcp_types::{
30 Prompt, PromptResult, Resource, ResourceResult, ServerInfo, Tool, ToolResult,
31};
32
33#[derive(Clone)]
49pub struct CompositeHandler {
50 name: String,
51 version: String,
52 description: Option<String>,
53 handlers: Arc<Vec<MountedHandler>>,
54}
55
56impl std::fmt::Debug for CompositeHandler {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("CompositeHandler")
59 .field("name", &self.name)
60 .field("version", &self.version)
61 .field("description", &self.description)
62 .field("handler_count", &self.handlers.len())
63 .finish()
64 }
65}
66
67struct HandlerWrapper<H: McpHandler> {
69 handler: H,
70}
71
72impl<H: McpHandler> HandlerWrapper<H> {
73 fn new(handler: H) -> Self {
74 Self { handler }
75 }
76
77 fn list_tools(&self) -> Vec<Tool> {
78 self.handler.list_tools()
79 }
80
81 fn list_resources(&self) -> Vec<Resource> {
82 self.handler.list_resources()
83 }
84
85 fn list_prompts(&self) -> Vec<Prompt> {
86 self.handler.list_prompts()
87 }
88
89 async fn call_tool(
90 &self,
91 name: &str,
92 args: serde_json::Value,
93 ctx: &RequestContext,
94 ) -> McpResult<ToolResult> {
95 self.handler.call_tool(name, args, ctx).await
96 }
97
98 async fn read_resource(&self, uri: &str, ctx: &RequestContext) -> McpResult<ResourceResult> {
99 self.handler.read_resource(uri, ctx).await
100 }
101
102 async fn get_prompt(
103 &self,
104 name: &str,
105 args: Option<serde_json::Value>,
106 ctx: &RequestContext,
107 ) -> McpResult<PromptResult> {
108 self.handler.get_prompt(name, args, ctx).await
109 }
110}
111
112impl<H: McpHandler> Clone for HandlerWrapper<H> {
113 fn clone(&self) -> Self {
114 Self {
115 handler: self.handler.clone(),
116 }
117 }
118}
119
120trait DynHandler: Send + Sync {
122 fn dyn_clone(&self) -> Box<dyn DynHandler>;
123 fn dyn_list_tools(&self) -> Vec<Tool>;
124 fn dyn_list_resources(&self) -> Vec<Resource>;
125 fn dyn_list_prompts(&self) -> Vec<Prompt>;
126 fn dyn_call_tool<'a>(
127 &'a self,
128 name: &'a str,
129 args: serde_json::Value,
130 ctx: &'a RequestContext,
131 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = McpResult<ToolResult>> + Send + 'a>>;
132 fn dyn_read_resource<'a>(
133 &'a self,
134 uri: &'a str,
135 ctx: &'a RequestContext,
136 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = McpResult<ResourceResult>> + Send + 'a>>;
137 fn dyn_get_prompt<'a>(
138 &'a self,
139 name: &'a str,
140 args: Option<serde_json::Value>,
141 ctx: &'a RequestContext,
142 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = McpResult<PromptResult>> + Send + 'a>>;
143}
144
145impl<H: McpHandler> DynHandler for HandlerWrapper<H> {
146 fn dyn_clone(&self) -> Box<dyn DynHandler> {
147 Box::new(self.clone())
148 }
149
150 fn dyn_list_tools(&self) -> Vec<Tool> {
151 self.list_tools()
152 }
153
154 fn dyn_list_resources(&self) -> Vec<Resource> {
155 self.list_resources()
156 }
157
158 fn dyn_list_prompts(&self) -> Vec<Prompt> {
159 self.list_prompts()
160 }
161
162 fn dyn_call_tool<'a>(
163 &'a self,
164 name: &'a str,
165 args: serde_json::Value,
166 ctx: &'a RequestContext,
167 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = McpResult<ToolResult>> + Send + 'a>>
168 {
169 Box::pin(self.call_tool(name, args, ctx))
170 }
171
172 fn dyn_read_resource<'a>(
173 &'a self,
174 uri: &'a str,
175 ctx: &'a RequestContext,
176 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = McpResult<ResourceResult>> + Send + 'a>>
177 {
178 Box::pin(self.read_resource(uri, ctx))
179 }
180
181 fn dyn_get_prompt<'a>(
182 &'a self,
183 name: &'a str,
184 args: Option<serde_json::Value>,
185 ctx: &'a RequestContext,
186 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = McpResult<PromptResult>> + Send + 'a>>
187 {
188 Box::pin(self.get_prompt(name, args, ctx))
189 }
190}
191
192struct MountedHandler {
194 prefix: String,
195 handler: Box<dyn DynHandler>,
196}
197
198impl Clone for MountedHandler {
199 fn clone(&self) -> Self {
200 Self {
201 prefix: self.prefix.clone(),
202 handler: self.handler.dyn_clone(),
203 }
204 }
205}
206
207impl CompositeHandler {
208 pub fn new(name: impl Into<String>, version: impl Into<String>) -> Self {
216 Self {
217 name: name.into(),
218 version: version.into(),
219 description: None,
220 handlers: Arc::new(Vec::new()),
221 }
222 }
223
224 #[must_use]
226 pub fn with_description(mut self, description: impl Into<String>) -> Self {
227 self.description = Some(description.into());
228 self
229 }
230
231 #[must_use]
250 pub fn mount<H: McpHandler>(mut self, handler: H, prefix: impl Into<String>) -> Self {
251 let prefix = prefix.into();
252
253 if self.handlers.iter().any(|h| h.prefix == prefix) {
255 panic!(
256 "CompositeHandler: duplicate prefix '{}' - each mounted handler must have a unique prefix",
257 prefix
258 );
259 }
260
261 let handlers = Arc::make_mut(&mut self.handlers);
262 handlers.push(MountedHandler {
263 prefix,
264 handler: Box::new(HandlerWrapper::new(handler)),
265 });
266 self
267 }
268
269 pub fn try_mount<H: McpHandler>(
278 mut self,
279 handler: H,
280 prefix: impl Into<String>,
281 ) -> Result<Self, String> {
282 let prefix = prefix.into();
283
284 if self.handlers.iter().any(|h| h.prefix == prefix) {
285 return Err(format!(
286 "duplicate prefix '{}' - each mounted handler must have a unique prefix",
287 prefix
288 ));
289 }
290
291 let handlers = Arc::make_mut(&mut self.handlers);
292 handlers.push(MountedHandler {
293 prefix,
294 handler: Box::new(HandlerWrapper::new(handler)),
295 });
296 Ok(self)
297 }
298
299 pub fn handler_count(&self) -> usize {
301 self.handlers.len()
302 }
303
304 pub fn prefixes(&self) -> Vec<&str> {
306 self.handlers.iter().map(|h| h.prefix.as_str()).collect()
307 }
308
309 fn prefix_tool_name(prefix: &str, name: &str) -> String {
313 format!("{}_{}", prefix, name)
314 }
315
316 fn prefix_resource_uri(prefix: &str, uri: &str) -> String {
318 format!("{}://{}", prefix, uri)
319 }
320
321 fn prefix_prompt_name(prefix: &str, name: &str) -> String {
323 format!("{}_{}", prefix, name)
324 }
325
326 fn parse_prefixed_tool(name: &str) -> Option<(&str, &str)> {
328 name.split_once('_')
329 }
330
331 fn parse_prefixed_uri(uri: &str) -> Option<(&str, &str)> {
333 uri.split_once("://")
334 }
335
336 fn parse_prefixed_prompt(name: &str) -> Option<(&str, &str)> {
338 name.split_once('_')
339 }
340
341 fn find_handler(&self, prefix: &str) -> Option<&MountedHandler> {
343 self.handlers.iter().find(|h| h.prefix == prefix)
344 }
345}
346
347#[allow(clippy::manual_async_fn)]
348impl McpHandler for CompositeHandler {
349 fn server_info(&self) -> ServerInfo {
350 let mut info = ServerInfo::new(&self.name, &self.version);
351 if let Some(ref desc) = self.description {
352 info = info.with_description(desc);
353 }
354 info
355 }
356
357 fn list_tools(&self) -> Vec<Tool> {
358 let mut tools = Vec::new();
359 for mounted in self.handlers.iter() {
360 for mut tool in mounted.handler.dyn_list_tools() {
361 tool.name = Self::prefix_tool_name(&mounted.prefix, &tool.name);
362 tools.push(tool);
363 }
364 }
365 tools
366 }
367
368 fn list_resources(&self) -> Vec<Resource> {
369 let mut resources = Vec::new();
370 for mounted in self.handlers.iter() {
371 for mut resource in mounted.handler.dyn_list_resources() {
372 resource.uri = Self::prefix_resource_uri(&mounted.prefix, &resource.uri);
373 resources.push(resource);
374 }
375 }
376 resources
377 }
378
379 fn list_prompts(&self) -> Vec<Prompt> {
380 let mut prompts = Vec::new();
381 for mounted in self.handlers.iter() {
382 for mut prompt in mounted.handler.dyn_list_prompts() {
383 prompt.name = Self::prefix_prompt_name(&mounted.prefix, &prompt.name);
384 prompts.push(prompt);
385 }
386 }
387 prompts
388 }
389
390 fn call_tool<'a>(
391 &'a self,
392 name: &'a str,
393 args: serde_json::Value,
394 ctx: &'a RequestContext,
395 ) -> impl std::future::Future<Output = McpResult<ToolResult>> + turbomcp_core::marker::MaybeSend + 'a
396 {
397 async move {
398 let (prefix, original_name) =
399 Self::parse_prefixed_tool(name).ok_or_else(|| McpError::tool_not_found(name))?;
400
401 let handler = self
402 .find_handler(prefix)
403 .ok_or_else(|| McpError::tool_not_found(name))?;
404
405 handler
406 .handler
407 .dyn_call_tool(original_name, args, ctx)
408 .await
409 }
410 }
411
412 fn read_resource<'a>(
413 &'a self,
414 uri: &'a str,
415 ctx: &'a RequestContext,
416 ) -> impl std::future::Future<Output = McpResult<ResourceResult>>
417 + turbomcp_core::marker::MaybeSend
418 + 'a {
419 async move {
420 let (prefix, original_uri) =
421 Self::parse_prefixed_uri(uri).ok_or_else(|| McpError::resource_not_found(uri))?;
422
423 let handler = self
424 .find_handler(prefix)
425 .ok_or_else(|| McpError::resource_not_found(uri))?;
426
427 handler.handler.dyn_read_resource(original_uri, ctx).await
428 }
429 }
430
431 fn get_prompt<'a>(
432 &'a self,
433 name: &'a str,
434 args: Option<serde_json::Value>,
435 ctx: &'a RequestContext,
436 ) -> impl std::future::Future<Output = McpResult<PromptResult>> + turbomcp_core::marker::MaybeSend + 'a
437 {
438 async move {
439 let (prefix, original_name) = Self::parse_prefixed_prompt(name)
440 .ok_or_else(|| McpError::prompt_not_found(name))?;
441
442 let handler = self
443 .find_handler(prefix)
444 .ok_or_else(|| McpError::prompt_not_found(name))?;
445
446 handler
447 .handler
448 .dyn_get_prompt(original_name, args, ctx)
449 .await
450 }
451 }
452}
453
454#[cfg(test)]
455#[allow(clippy::manual_async_fn)]
456mod tests {
457 use super::*;
458 use core::future::Future;
459 use turbomcp_core::marker::MaybeSend;
460
461 #[derive(Clone)]
462 struct WeatherHandler;
463
464 impl McpHandler for WeatherHandler {
465 fn server_info(&self) -> ServerInfo {
466 ServerInfo::new("weather", "1.0.0")
467 }
468
469 fn list_tools(&self) -> Vec<Tool> {
470 vec![Tool::new("get_forecast", "Get weather forecast")]
471 }
472
473 fn list_resources(&self) -> Vec<Resource> {
474 vec![Resource::new("api/current", "Current weather")]
475 }
476
477 fn list_prompts(&self) -> Vec<Prompt> {
478 vec![Prompt::new("forecast_prompt", "Weather forecast prompt")]
479 }
480
481 fn call_tool<'a>(
482 &'a self,
483 name: &'a str,
484 _args: serde_json::Value,
485 _ctx: &'a RequestContext,
486 ) -> impl Future<Output = McpResult<ToolResult>> + MaybeSend + 'a {
487 async move {
488 match name {
489 "get_forecast" => Ok(ToolResult::text("Sunny, 72°F")),
490 _ => Err(McpError::tool_not_found(name)),
491 }
492 }
493 }
494
495 fn read_resource<'a>(
496 &'a self,
497 uri: &'a str,
498 _ctx: &'a RequestContext,
499 ) -> impl Future<Output = McpResult<ResourceResult>> + MaybeSend + 'a {
500 let uri = uri.to_string();
501 async move {
502 if uri == "api/current" {
503 Ok(ResourceResult::text(&uri, "Temperature: 72°F"))
504 } else {
505 Err(McpError::resource_not_found(&uri))
506 }
507 }
508 }
509
510 fn get_prompt<'a>(
511 &'a self,
512 name: &'a str,
513 _args: Option<serde_json::Value>,
514 _ctx: &'a RequestContext,
515 ) -> impl Future<Output = McpResult<PromptResult>> + MaybeSend + 'a {
516 let name = name.to_string();
517 async move {
518 if name == "forecast_prompt" {
519 Ok(PromptResult::user("What is the weather forecast?"))
520 } else {
521 Err(McpError::prompt_not_found(&name))
522 }
523 }
524 }
525 }
526
527 #[derive(Clone)]
528 struct NewsHandler;
529
530 impl McpHandler for NewsHandler {
531 fn server_info(&self) -> ServerInfo {
532 ServerInfo::new("news", "1.0.0")
533 }
534
535 fn list_tools(&self) -> Vec<Tool> {
536 vec![Tool::new("get_headlines", "Get news headlines")]
537 }
538
539 fn list_resources(&self) -> Vec<Resource> {
540 vec![Resource::new("feed/top", "Top news feed")]
541 }
542
543 fn list_prompts(&self) -> Vec<Prompt> {
544 vec![Prompt::new("summary_prompt", "News summary prompt")]
545 }
546
547 fn call_tool<'a>(
548 &'a self,
549 name: &'a str,
550 _args: serde_json::Value,
551 _ctx: &'a RequestContext,
552 ) -> impl Future<Output = McpResult<ToolResult>> + MaybeSend + 'a {
553 async move {
554 match name {
555 "get_headlines" => Ok(ToolResult::text("Breaking: AI advances continue")),
556 _ => Err(McpError::tool_not_found(name)),
557 }
558 }
559 }
560
561 fn read_resource<'a>(
562 &'a self,
563 uri: &'a str,
564 _ctx: &'a RequestContext,
565 ) -> impl Future<Output = McpResult<ResourceResult>> + MaybeSend + 'a {
566 let uri = uri.to_string();
567 async move {
568 if uri == "feed/top" {
569 Ok(ResourceResult::text(&uri, "Top news stories"))
570 } else {
571 Err(McpError::resource_not_found(&uri))
572 }
573 }
574 }
575
576 fn get_prompt<'a>(
577 &'a self,
578 name: &'a str,
579 _args: Option<serde_json::Value>,
580 _ctx: &'a RequestContext,
581 ) -> impl Future<Output = McpResult<PromptResult>> + MaybeSend + 'a {
582 let name = name.to_string();
583 async move {
584 if name == "summary_prompt" {
585 Ok(PromptResult::user("Summarize the news"))
586 } else {
587 Err(McpError::prompt_not_found(&name))
588 }
589 }
590 }
591 }
592
593 #[test]
594 fn test_composite_server_info() {
595 let server = CompositeHandler::new("main", "1.0.0").with_description("Main server");
596
597 let info = server.server_info();
598 assert_eq!(info.name, "main");
599 assert_eq!(info.version, "1.0.0");
600 }
601
602 #[test]
603 fn test_mount_handlers() {
604 let server = CompositeHandler::new("main", "1.0.0")
605 .mount(WeatherHandler, "weather")
606 .mount(NewsHandler, "news");
607
608 assert_eq!(server.handler_count(), 2);
609 assert_eq!(server.prefixes(), vec!["weather", "news"]);
610 }
611
612 #[test]
613 fn test_list_tools_prefixed() {
614 let server = CompositeHandler::new("main", "1.0.0")
615 .mount(WeatherHandler, "weather")
616 .mount(NewsHandler, "news");
617
618 let tools = server.list_tools();
619 assert_eq!(tools.len(), 2);
620
621 let tool_names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect();
622 assert!(tool_names.contains(&"weather_get_forecast"));
623 assert!(tool_names.contains(&"news_get_headlines"));
624 }
625
626 #[test]
627 fn test_list_resources_prefixed() {
628 let server = CompositeHandler::new("main", "1.0.0")
629 .mount(WeatherHandler, "weather")
630 .mount(NewsHandler, "news");
631
632 let resources = server.list_resources();
633 assert_eq!(resources.len(), 2);
634
635 let uris: Vec<&str> = resources.iter().map(|r| r.uri.as_str()).collect();
636 assert!(uris.contains(&"weather://api/current"));
637 assert!(uris.contains(&"news://feed/top"));
638 }
639
640 #[test]
641 fn test_list_prompts_prefixed() {
642 let server = CompositeHandler::new("main", "1.0.0")
643 .mount(WeatherHandler, "weather")
644 .mount(NewsHandler, "news");
645
646 let prompts = server.list_prompts();
647 assert_eq!(prompts.len(), 2);
648
649 let prompt_names: Vec<&str> = prompts.iter().map(|p| p.name.as_str()).collect();
650 assert!(prompt_names.contains(&"weather_forecast_prompt"));
651 assert!(prompt_names.contains(&"news_summary_prompt"));
652 }
653
654 #[tokio::test]
655 async fn test_call_tool_routed() {
656 let server = CompositeHandler::new("main", "1.0.0")
657 .mount(WeatherHandler, "weather")
658 .mount(NewsHandler, "news");
659
660 let ctx = RequestContext::default();
661
662 let result = server
664 .call_tool("weather_get_forecast", serde_json::json!({}), &ctx)
665 .await
666 .unwrap();
667 assert_eq!(result.first_text(), Some("Sunny, 72°F"));
668
669 let result = server
671 .call_tool("news_get_headlines", serde_json::json!({}), &ctx)
672 .await
673 .unwrap();
674 assert_eq!(result.first_text(), Some("Breaking: AI advances continue"));
675 }
676
677 #[tokio::test]
678 async fn test_call_tool_not_found() {
679 let server = CompositeHandler::new("main", "1.0.0").mount(WeatherHandler, "weather");
680
681 let ctx = RequestContext::default();
682
683 let result = server
685 .call_tool("unknown_tool", serde_json::json!({}), &ctx)
686 .await;
687 assert!(result.is_err());
688
689 let result = server
691 .call_tool("notool", serde_json::json!({}), &ctx)
692 .await;
693 assert!(result.is_err());
694 }
695
696 #[tokio::test]
697 async fn test_read_resource_routed() {
698 let server = CompositeHandler::new("main", "1.0.0")
699 .mount(WeatherHandler, "weather")
700 .mount(NewsHandler, "news");
701
702 let ctx = RequestContext::default();
703
704 let result = server
706 .read_resource("weather://api/current", &ctx)
707 .await
708 .unwrap();
709 assert!(!result.contents.is_empty());
710
711 let result = server.read_resource("news://feed/top", &ctx).await.unwrap();
713 assert!(!result.contents.is_empty());
714 }
715
716 #[tokio::test]
717 async fn test_get_prompt_routed() {
718 let server = CompositeHandler::new("main", "1.0.0")
719 .mount(WeatherHandler, "weather")
720 .mount(NewsHandler, "news");
721
722 let ctx = RequestContext::default();
723
724 let result = server
726 .get_prompt("weather_forecast_prompt", None, &ctx)
727 .await
728 .unwrap();
729 assert!(!result.messages.is_empty());
730
731 let result = server
733 .get_prompt("news_summary_prompt", None, &ctx)
734 .await
735 .unwrap();
736 assert!(!result.messages.is_empty());
737 }
738
739 #[test]
740 #[should_panic(expected = "duplicate prefix 'weather'")]
741 fn test_duplicate_prefix_panics() {
742 let _server = CompositeHandler::new("main", "1.0.0")
743 .mount(WeatherHandler, "weather")
744 .mount(NewsHandler, "weather"); }
746
747 #[test]
748 fn test_try_mount_duplicate_returns_error() {
749 let server = CompositeHandler::new("main", "1.0.0").mount(WeatherHandler, "weather");
750
751 let result = server.try_mount(NewsHandler, "weather");
752 assert!(result.is_err());
753 assert!(result.unwrap_err().contains("duplicate prefix"));
754 }
755
756 #[test]
757 fn test_try_mount_success() {
758 let server = CompositeHandler::new("main", "1.0.0")
759 .try_mount(WeatherHandler, "weather")
760 .unwrap()
761 .try_mount(NewsHandler, "news")
762 .unwrap();
763
764 assert_eq!(server.handler_count(), 2);
765 }
766}