1use std::sync::Arc;
4use tokio::sync::RwLock;
5
6use crate::{
7 config::ServerConfig,
8 error::ServerResult,
9 handlers::{PromptHandler, ResourceHandler, ToolHandler},
10 lifecycle::{HealthStatus, ServerLifecycle},
11 metrics::ServerMetrics,
12 middleware::{KeyExtractor, MiddlewareStack, RateLimitConfig, RateLimitMiddleware},
13 registry::HandlerRegistry,
14 routing::RequestRouter,
15};
16
17use bytes::Bytes;
18use tokio::time::{Duration, sleep};
19use turbomcp_core::RequestContext;
20use turbomcp_protocol::jsonrpc::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse};
21use turbomcp_transport::StdioTransport;
22use turbomcp_transport::core::{TransportError, TransportMessageMetadata};
23use turbomcp_transport::{Transport, TransportMessage};
24
25fn should_log_for_stdio() -> bool {
31 std::env::var("TURBOMCP_FORCE_LOGGING").is_ok()
32}
33
34#[derive(Debug, Clone)]
46pub struct ShutdownHandle {
47 lifecycle: Arc<ServerLifecycle>,
48}
49
50impl ShutdownHandle {
51 pub async fn shutdown(&self) {
53 self.lifecycle.shutdown().await;
54 }
55
56 pub async fn is_shutting_down(&self) -> bool {
58 use crate::lifecycle::ServerState;
59 matches!(
60 self.lifecycle.state().await,
61 ServerState::ShuttingDown | ServerState::Stopped
62 )
63 }
64}
65
66pub struct McpServer {
68 pub(crate) config: ServerConfig,
70 pub(crate) registry: Arc<HandlerRegistry>,
72 pub(crate) router: Arc<RequestRouter>,
74 #[allow(dead_code)]
76 pub(crate) middleware: Arc<RwLock<MiddlewareStack>>,
77 pub(crate) lifecycle: Arc<ServerLifecycle>,
79 pub(crate) metrics: Arc<ServerMetrics>,
81}
82
83impl std::fmt::Debug for McpServer {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("McpServer")
86 .field("config", &self.config)
87 .finish()
88 }
89}
90
91impl McpServer {
92 #[must_use]
94 pub fn new(config: ServerConfig) -> Self {
95 let registry = Arc::new(HandlerRegistry::new());
96 let router = Arc::new(RequestRouter::new(Arc::clone(®istry)));
97 let mut stack = MiddlewareStack::new();
98 if config.rate_limiting.enabled {
100 #[cfg(test)]
101 let rate_middleware = RateLimitMiddleware::new_for_testing(RateLimitConfig {
102 requests_per_second: config.rate_limiting.requests_per_second,
103 burst_capacity: config.rate_limiting.burst_capacity,
104 key_extractor: KeyExtractor::Global,
105 });
106
107 #[cfg(not(test))]
108 let rate_middleware = RateLimitMiddleware::new(RateLimitConfig {
109 requests_per_second: config.rate_limiting.requests_per_second,
110 burst_capacity: config.rate_limiting.burst_capacity,
111 key_extractor: KeyExtractor::Global,
112 });
113
114 stack.add(rate_middleware);
115 }
116 let middleware = Arc::new(RwLock::new(stack));
117 let lifecycle = Arc::new(ServerLifecycle::new());
118 let metrics = Arc::new(ServerMetrics::new());
119
120 Self {
121 config,
122 registry,
123 router,
124 middleware,
125 lifecycle,
126 metrics,
127 }
128 }
129
130 #[must_use]
132 pub const fn config(&self) -> &ServerConfig {
133 &self.config
134 }
135
136 #[must_use]
138 pub const fn registry(&self) -> &Arc<HandlerRegistry> {
139 &self.registry
140 }
141
142 #[must_use]
144 pub const fn router(&self) -> &Arc<RequestRouter> {
145 &self.router
146 }
147
148 #[must_use]
150 pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
151 &self.lifecycle
152 }
153
154 #[must_use]
156 pub const fn metrics(&self) -> &Arc<ServerMetrics> {
157 &self.metrics
158 }
159
160 pub fn shutdown_handle(&self) -> ShutdownHandle {
232 ShutdownHandle {
233 lifecycle: self.lifecycle.clone(),
234 }
235 }
236
237 pub async fn run_stdio(self) -> ServerResult<()> {
239 if should_log_for_stdio() {
242 tracing::info!("Starting MCP server with STDIO transport");
243 }
244 self.lifecycle.start().await;
245
246 let mut transport = StdioTransport::new();
248 if let Err(e) = transport.connect().await {
249 if should_log_for_stdio() {
250 tracing::error!(error = %e, "Failed to connect stdio transport");
251 } else {
252 eprintln!("TurboMCP STDIO transport failed to connect: {}", e);
254 }
255 self.lifecycle.shutdown().await;
256 return Err(e.into());
257 }
258
259 self.run_with_transport_stdio_aware(transport).await
260 }
261
262 pub async fn health(&self) -> HealthStatus {
264 self.lifecycle.health().await
265 }
266
267 #[cfg(feature = "http")]
278 pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
279 self,
280 _addr: A,
281 ) -> ServerResult<()> {
282 Err(crate::ServerError::configuration(
285 "Direct HTTP support has been replaced with compile-time routing. \
286 Use the #[server] macro which generates into_router() and run_http_direct() methods \
287 with zero lifetime issues and maximum performance.",
288 ))
289 }
290
291 #[cfg(feature = "websocket")]
295 pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
296 self,
297 addr: A,
298 ) -> ServerResult<()> {
299 tracing::info!(
300 ?addr,
301 "WebSocket transport server mode not implemented - WebSocket transport is client-oriented"
302 );
303 tracing::info!(
304 "Consider using ServerBuilder with WebSocket middleware for WebSocket server functionality"
305 );
306 Err(crate::ServerError::configuration(
307 "WebSocket server transport not supported - use ServerBuilder with middleware",
308 ))
309 }
310
311 #[cfg(feature = "tcp")]
313 pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
314 self,
315 addr: A,
316 ) -> ServerResult<()> {
317 use turbomcp_transport::TcpTransport;
318
319 tracing::info!(?addr, "Starting MCP server with TCP transport");
320 self.lifecycle.start().await;
321
322 let socket_addr = match addr.to_socket_addrs() {
324 Ok(mut addrs) => match addrs.next() {
325 Some(addr) => addr,
326 None => {
327 tracing::error!("No socket address resolved from provided address");
328 self.lifecycle.shutdown().await;
329 return Err(crate::ServerError::configuration("Invalid socket address"));
330 }
331 },
332 Err(e) => {
333 tracing::error!(error = %e, "Failed to resolve socket address");
334 self.lifecycle.shutdown().await;
335 return Err(crate::ServerError::configuration(format!(
336 "Address resolution failed: {e}"
337 )));
338 }
339 };
340
341 let mut transport = TcpTransport::new_server(socket_addr);
342 if let Err(e) = transport.connect().await {
343 tracing::error!(error = %e, "Failed to connect TCP transport");
344 self.lifecycle.shutdown().await;
345 return Err(e.into());
346 }
347
348 self.run_with_transport(transport).await
349 }
350
351 #[cfg(all(feature = "unix", unix))]
353 pub async fn run_unix<P: AsRef<std::path::Path>>(self, path: P) -> ServerResult<()> {
354 use std::path::PathBuf;
355 use turbomcp_transport::UnixTransport;
356
357 tracing::info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");
358 self.lifecycle.start().await;
359
360 let socket_path = PathBuf::from(path.as_ref());
361 let mut transport = UnixTransport::new_server(socket_path);
362 if let Err(e) = transport.connect().await {
363 tracing::error!(error = %e, "Failed to connect Unix socket transport");
364 self.lifecycle.shutdown().await;
365 return Err(e.into());
366 }
367
368 self.run_with_transport(transport).await
369 }
370
371 async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
373 let lifecycle_for_sigint = self.lifecycle.clone();
375 tokio::spawn(async move {
376 if let Err(e) = tokio::signal::ctrl_c().await {
377 tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
378 return;
379 }
380 tracing::info!("Ctrl+C received, initiating shutdown");
381 lifecycle_for_sigint.shutdown().await;
382 });
383
384 #[cfg(unix)]
385 {
386 let lifecycle_for_sigterm = self.lifecycle.clone();
387 tokio::spawn(async move {
388 use tokio::signal::unix::{SignalKind, signal};
389 match signal(SignalKind::terminate()) {
390 Ok(mut sigterm) => {
391 sigterm.recv().await;
392 tracing::info!("SIGTERM received, initiating shutdown");
393 lifecycle_for_sigterm.shutdown().await;
394 }
395 Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
396 }
397 });
398 }
399
400 let mut shutdown = self.lifecycle.shutdown_signal();
402
403 loop {
405 tokio::select! {
406 _ = shutdown.recv() => {
407 tracing::info!("Shutdown signal received");
408 break;
409 }
410 res = transport.receive() => {
411 match res {
412 Ok(Some(message)) => {
413 if let Err(e) = self.handle_transport_message(&mut transport, message).await {
414 tracing::warn!(error = %e, "Failed to handle transport message");
415 }
416 }
417 Ok(None) => {
418 sleep(Duration::from_millis(5)).await;
420 }
421 Err(e) => {
422 match e {
423 TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
424 tracing::info!("Transport receive channel disconnected; shutting down");
425 break;
426 }
427 _ => {
428 tracing::error!(error = %e, "Transport receive failed");
429 sleep(Duration::from_millis(50)).await;
431 }
432 }
433 }
434 }
435 }
436 }
437 }
438
439 if let Err(e) = transport.disconnect().await {
441 tracing::warn!(error = %e, "Error while disconnecting transport");
442 }
443
444 tracing::info!("Server shutdown complete");
445 Ok(())
446 }
447
448 async fn run_with_transport_stdio_aware<T: Transport>(
450 &self,
451 mut transport: T,
452 ) -> ServerResult<()> {
453 let lifecycle_for_sigint = self.lifecycle.clone();
455 tokio::spawn(async move {
456 if let Err(e) = tokio::signal::ctrl_c().await {
457 if should_log_for_stdio() {
458 tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
459 }
460 return;
461 }
462 if should_log_for_stdio() {
463 tracing::info!("Ctrl+C received, initiating shutdown");
464 }
465 lifecycle_for_sigint.shutdown().await;
466 });
467
468 #[cfg(unix)]
469 {
470 let lifecycle_for_sigterm = self.lifecycle.clone();
471 tokio::spawn(async move {
472 use tokio::signal::unix::{SignalKind, signal};
473 match signal(SignalKind::terminate()) {
474 Ok(mut sigterm) => {
475 sigterm.recv().await;
476 if should_log_for_stdio() {
477 tracing::info!("SIGTERM received, initiating shutdown");
478 }
479 lifecycle_for_sigterm.shutdown().await;
480 }
481 Err(e) => {
482 if should_log_for_stdio() {
483 tracing::warn!(error = %e, "Failed to install SIGTERM handler");
484 }
485 }
486 }
487 });
488 }
489
490 let mut shutdown = self.lifecycle.shutdown_signal();
492
493 loop {
495 tokio::select! {
496 _ = shutdown.recv() => {
497 if should_log_for_stdio() {
498 tracing::info!("Shutdown signal received");
499 }
500 break;
501 }
502 res = transport.receive() => {
503 match res {
504 Ok(Some(message)) => {
505 if let Err(e) = self.handle_transport_message_stdio_aware(&mut transport, message).await
506 && should_log_for_stdio() {
507 tracing::warn!(error = %e, "Failed to handle transport message");
508 }
509 }
510 Ok(None) => {
511 sleep(Duration::from_millis(5)).await;
513 }
514 Err(e) => {
515 match e {
516 TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
517 if should_log_for_stdio() {
518 tracing::info!("Transport receive channel disconnected; shutting down");
519 }
520 break;
521 }
522 _ => {
523 if should_log_for_stdio() {
524 tracing::error!(error = %e, "Transport receive failed");
525 }
526 sleep(Duration::from_millis(50)).await;
528 }
529 }
530 }
531 }
532 }
533 }
534 }
535
536 if let Err(e) = transport.disconnect().await
538 && should_log_for_stdio()
539 {
540 tracing::warn!(error = %e, "Error while disconnecting transport");
541 }
542
543 if should_log_for_stdio() {
544 tracing::info!("Server shutdown complete");
545 }
546 Ok(())
547 }
548}
549
550impl McpServer {
551 async fn handle_transport_message(
552 &self,
553 transport: &mut dyn Transport,
554 message: TransportMessage,
555 ) -> ServerResult<()> {
556 let json_str = match std::str::from_utf8(&message.payload) {
558 Ok(s) => s,
559 Err(e) => {
560 tracing::warn!(error = %e, "Invalid UTF-8 in incoming message");
561 return Ok(());
562 }
563 };
564
565 let parsed = serde_json::from_str::<JsonRpcMessage>(json_str);
567 let response_json = match parsed {
568 Ok(JsonRpcMessage::Request(req)) => {
569 let ctx = RequestContext::new().with_metadata("transport", "stdio");
570 let (req, ctx) = match self.middleware.read().await.process_request(req, ctx).await
572 {
573 Ok(tuple) => tuple,
574 Err(e) => {
575 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
577 code: e.error_code(),
578 message: e.to_string(),
579 data: None,
580 };
581 let response = turbomcp_protocol::jsonrpc::JsonRpcResponse {
582 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
583 id: turbomcp_protocol::jsonrpc::ResponseId::null(),
584 payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
585 error,
586 },
587 };
588 let reply = TransportMessage::with_metadata(
589 message.id,
590 Bytes::from(
591 serde_json::to_string(&response)
592 .unwrap_or_else(|_| "{}".to_string()),
593 ),
594 TransportMessageMetadata::with_content_type("application/json"),
595 );
596 let _ = transport.send(reply).await;
597 return Ok(());
598 }
599 };
600 let (processed_req, updated_ctx) = match self
602 .middleware
603 .read()
604 .await
605 .process_request(req, ctx.clone())
606 .await
607 {
608 Ok(r) => r,
609 Err(e) => {
610 let error_response = turbomcp_protocol::jsonrpc::JsonRpcResponse {
612 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
613 id: turbomcp_protocol::jsonrpc::ResponseId::null(),
614 payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
615 error: turbomcp_protocol::jsonrpc::JsonRpcError {
616 code: -32603,
617 message: format!("Middleware error: {e}"),
618 data: None,
619 },
620 },
621 };
622 let mut reply = TransportMessage::new(
623 turbomcp_core::MessageId::from("error"),
624 Bytes::from(
625 serde_json::to_string(&error_response)
626 .unwrap_or_else(|_| "{}".to_string()),
627 ),
628 );
629 reply.metadata =
630 TransportMessageMetadata::with_content_type("application/json");
631 let _ = transport.send(reply).await;
632 return Ok(());
633 }
634 };
635
636 let mut resp: JsonRpcResponse =
637 self.router.route(processed_req, updated_ctx.clone()).await;
638 resp = match self
640 .middleware
641 .read()
642 .await
643 .process_response(resp, &updated_ctx)
644 .await
645 {
646 Ok(r) => r,
647 Err(e) => turbomcp_protocol::jsonrpc::JsonRpcResponse {
648 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
649 id: turbomcp_protocol::jsonrpc::ResponseId::null(),
650 payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
651 error: turbomcp_protocol::jsonrpc::JsonRpcError {
652 code: e.error_code(),
653 message: e.to_string(),
654 data: None,
655 },
656 },
657 },
658 };
659
660 serde_json::to_string(&resp).ok()
661 }
662 Ok(JsonRpcMessage::RequestBatch(batch)) => {
663 let requests: Vec<JsonRpcRequest> = batch.items;
665 let ctx = RequestContext::new().with_metadata("transport", "stdio");
666 let responses = self.router.route_batch(requests, ctx).await;
668 serde_json::to_string(&responses).ok()
669 }
670 Ok(JsonRpcMessage::Notification(_note)) => {
671 None
673 }
674 Ok(
676 JsonRpcMessage::Response(_)
677 | JsonRpcMessage::ResponseBatch(_)
678 | JsonRpcMessage::MessageBatch(_),
679 ) => None,
680 Err(e) => {
681 tracing::warn!(error = %e, "Failed to parse JSON-RPC message");
682 let error_response = turbomcp_protocol::jsonrpc::JsonRpcResponse::parse_error(
684 Some(format!("Invalid JSON-RPC: {}", e)),
685 );
686 serde_json::to_string(&error_response).ok()
687 }
688 };
689
690 if let Some(resp_str) = response_json {
691 let reply = TransportMessage::with_metadata(
692 message.id,
693 Bytes::from(resp_str),
694 TransportMessageMetadata::with_content_type("application/json"),
695 );
696 if let Err(e) = transport.send(reply).await {
697 tracing::warn!(error = %e, "Failed to send response over transport");
698 }
699 }
700
701 Ok(())
702 }
703
704 async fn handle_transport_message_stdio_aware(
706 &self,
707 transport: &mut dyn Transport,
708 message: TransportMessage,
709 ) -> ServerResult<()> {
710 let json_str = match std::str::from_utf8(&message.payload) {
712 Ok(s) => s,
713 Err(e) => {
714 if should_log_for_stdio() {
715 tracing::warn!(error = %e, "Invalid UTF-8 in incoming message");
716 }
717 return Ok(());
718 }
719 };
720
721 let parsed = serde_json::from_str::<JsonRpcMessage>(json_str);
723 let response_json = match parsed {
724 Ok(JsonRpcMessage::Request(req)) => {
725 let ctx = RequestContext::new().with_metadata("transport", "stdio");
726 let (req, ctx) = match self.middleware.read().await.process_request(req, ctx).await
728 {
729 Ok(tuple) => tuple,
730 Err(e) => {
731 let error = turbomcp_protocol::jsonrpc::JsonRpcError {
733 code: e.error_code(),
734 message: e.to_string(),
735 data: None,
736 };
737 let response = turbomcp_protocol::jsonrpc::JsonRpcResponse {
738 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
739 id: turbomcp_protocol::jsonrpc::ResponseId::null(),
740 payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
741 error,
742 },
743 };
744 let reply = TransportMessage::with_metadata(
745 message.id,
746 Bytes::from(
747 serde_json::to_string(&response)
748 .unwrap_or_else(|_| "{}".to_string()),
749 ),
750 TransportMessageMetadata::with_content_type("application/json"),
751 );
752 let _ = transport.send(reply).await;
753 return Ok(());
754 }
755 };
756 let (processed_req, updated_ctx) = match self
758 .middleware
759 .read()
760 .await
761 .process_request(req, ctx.clone())
762 .await
763 {
764 Ok(r) => r,
765 Err(e) => {
766 let error_response = turbomcp_protocol::jsonrpc::JsonRpcResponse {
768 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
769 id: turbomcp_protocol::jsonrpc::ResponseId::null(),
770 payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
771 error: turbomcp_protocol::jsonrpc::JsonRpcError {
772 code: -32603,
773 message: format!("Middleware error: {e}"),
774 data: None,
775 },
776 },
777 };
778 let mut reply = TransportMessage::new(
779 turbomcp_core::MessageId::from("error"),
780 Bytes::from(
781 serde_json::to_string(&error_response)
782 .unwrap_or_else(|_| "{}".to_string()),
783 ),
784 );
785 reply.metadata =
786 TransportMessageMetadata::with_content_type("application/json");
787 let _ = transport.send(reply).await;
788 return Ok(());
789 }
790 };
791
792 let mut resp: JsonRpcResponse =
793 self.router.route(processed_req, updated_ctx.clone()).await;
794 resp = match self
796 .middleware
797 .read()
798 .await
799 .process_response(resp, &updated_ctx)
800 .await
801 {
802 Ok(r) => r,
803 Err(e) => turbomcp_protocol::jsonrpc::JsonRpcResponse {
804 jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
805 id: turbomcp_protocol::jsonrpc::ResponseId::null(),
806 payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
807 error: turbomcp_protocol::jsonrpc::JsonRpcError {
808 code: e.error_code(),
809 message: e.to_string(),
810 data: None,
811 },
812 },
813 },
814 };
815
816 serde_json::to_string(&resp).ok()
817 }
818 Ok(JsonRpcMessage::RequestBatch(batch)) => {
819 let requests: Vec<JsonRpcRequest> = batch.items;
821 let ctx = RequestContext::new().with_metadata("transport", "stdio");
822 let responses = self.router.route_batch(requests, ctx).await;
824 serde_json::to_string(&responses).ok()
825 }
826 Ok(JsonRpcMessage::Notification(_note)) => {
827 None
829 }
830 Ok(
832 JsonRpcMessage::Response(_)
833 | JsonRpcMessage::ResponseBatch(_)
834 | JsonRpcMessage::MessageBatch(_),
835 ) => None,
836 Err(e) => {
837 if should_log_for_stdio() {
838 tracing::warn!(error = %e, "Failed to parse JSON-RPC message");
839 }
840 let error_response = turbomcp_protocol::jsonrpc::JsonRpcResponse::parse_error(
842 Some(format!("Invalid JSON-RPC: {}", e)),
843 );
844 serde_json::to_string(&error_response).ok()
845 }
846 };
847
848 if let Some(resp_str) = response_json {
849 let reply = TransportMessage::with_metadata(
850 message.id,
851 Bytes::from(resp_str),
852 TransportMessageMetadata::with_content_type("application/json"),
853 );
854 if let Err(e) = transport.send(reply).await
855 && should_log_for_stdio()
856 {
857 tracing::warn!(error = %e, "Failed to send response over transport");
858 }
859 }
860
861 Ok(())
862 }
863}
864
865pub struct ServerBuilder {
867 config: ServerConfig,
869 registry: HandlerRegistry,
871}
872
873impl std::fmt::Debug for ServerBuilder {
874 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
875 f.debug_struct("ServerBuilder")
876 .field("config", &self.config)
877 .finish()
878 }
879}
880
881impl ServerBuilder {
882 #[must_use]
884 pub fn new() -> Self {
885 Self {
886 config: ServerConfig::default(),
887 registry: HandlerRegistry::new(),
888 }
889 }
890
891 pub fn name(mut self, name: impl Into<String>) -> Self {
893 self.config.name = name.into();
894 self
895 }
896
897 pub fn version(mut self, version: impl Into<String>) -> Self {
899 self.config.version = version.into();
900 self
901 }
902
903 pub fn description(mut self, description: impl Into<String>) -> Self {
905 self.config.description = Some(description.into());
906 self
907 }
908
909 pub fn tool<T>(self, name: impl Into<String>, handler: T) -> ServerResult<Self>
911 where
912 T: ToolHandler + 'static,
913 {
914 self.registry.register_tool(name, handler)?;
915 Ok(self)
916 }
917
918 pub fn prompt<P>(self, name: impl Into<String>, handler: P) -> ServerResult<Self>
920 where
921 P: PromptHandler + 'static,
922 {
923 self.registry.register_prompt(name, handler)?;
924 Ok(self)
925 }
926
927 pub fn resource<R>(self, name: impl Into<String>, handler: R) -> ServerResult<Self>
929 where
930 R: ResourceHandler + 'static,
931 {
932 self.registry.register_resource(name, handler)?;
933 Ok(self)
934 }
935
936 pub fn root(self, uri: impl Into<String>, name: Option<String>) -> Self {
938 use turbomcp_protocol::types::Root;
939 self.registry.add_root(Root {
940 uri: uri.into(),
941 name,
942 });
943 self
944 }
945
946 pub fn roots(self, roots: Vec<turbomcp_protocol::types::Root>) -> Self {
948 self.registry.set_roots(roots);
949 self
950 }
951
952 #[must_use]
954 pub fn build(self) -> McpServer {
955 let mut server = McpServer::new(self.config);
956 server.registry = Arc::new(self.registry);
957 server.router = Arc::new(RequestRouter::new(Arc::clone(&server.registry)));
958 server
959 }
960}
961
962impl Default for ServerBuilder {
963 fn default() -> Self {
964 Self::new()
965 }
966}
967
968#[cfg(test)]
969mod tests {
970 use serde_json::Value;
971
972 #[test]
975 fn test_jsonrpc_parse_error_response() {
976 let invalid_json = r#"{"id": 1, "method": "tools/list"}"#; let parse_result =
981 serde_json::from_str::<turbomcp_protocol::jsonrpc::JsonRpcMessage>(invalid_json);
982
983 assert!(
985 parse_result.is_err(),
986 "Invalid JSON-RPC should fail to parse"
987 );
988
989 let error_response = turbomcp_protocol::jsonrpc::JsonRpcResponse::parse_error(None);
991
992 let serialized = serde_json::to_string(&error_response);
994 assert!(
995 serialized.is_ok(),
996 "Error response should serialize correctly"
997 );
998
999 let response_json = serialized.unwrap();
1000 let parsed_response: Value = serde_json::from_str(&response_json).unwrap();
1001
1002 assert_eq!(parsed_response["jsonrpc"], "2.0");
1004 assert_eq!(parsed_response["error"]["code"], -32700);
1005 assert_eq!(parsed_response["error"]["message"], "Parse error");
1006 assert!(parsed_response["error"]["data"].is_null());
1007 }
1008}