Skip to main content

turbomcp_server/
builder.rs

1//! Server Builder - SOTA fluent API for MCP server configuration.
2//!
3//! This module provides a builder pattern for configuring and running MCP servers
4//! with full control over transport selection and server integration.
5//!
6//! # Design Principles
7//!
8//! 1. **Zero Configuration Required** - Sensible defaults for quick starts
9//! 2. **Transport Agnostic** - Choose transport at runtime, not compile time
10//! 3. **BYO Server Support** - Integrate with existing Axum/Tower infrastructure
11//! 4. **Platform Transparent** - Works on native and WASM without `#[cfg]` in user code
12//!
13//! # Examples
14//!
15//! ## Simplest Usage (STDIO default)
16//!
17//! ```rust,ignore
18//! use turbomcp::prelude::*;
19//!
20//! #[tokio::main]
21//! async fn main() {
22//!     MyServer.serve().await.unwrap();
23//! }
24//! ```
25//!
26//! ## Choose Transport at Runtime
27//!
28//! ```rust,ignore
29//! use turbomcp::prelude::*;
30//!
31//! #[tokio::main]
32//! async fn main() {
33//!     let transport = std::env::var("TRANSPORT").unwrap_or("stdio".into());
34//!
35//!     MyServer.builder()
36//!         .transport(match transport.as_str() {
37//!             "http" => Transport::http("0.0.0.0:8080"),
38//!             "tcp" => Transport::tcp("0.0.0.0:9000"),
39//!             _ => Transport::stdio(),
40//!         })
41//!         .serve()
42//!         .await
43//!         .unwrap();
44//! }
45//! ```
46//!
47//! ## Full Configuration
48//!
49//! ```rust,ignore
50//! use turbomcp::prelude::*;
51//!
52//! #[tokio::main]
53//! async fn main() {
54//!     MyServer.builder()
55//!         .transport(Transport::http("0.0.0.0:8080"))
56//!         .with_rate_limit(100, Duration::from_secs(1))
57//!         .with_connection_limit(1000)
58//!         .with_graceful_shutdown(Duration::from_secs(30))
59//!         .serve()
60//!         .await
61//!         .unwrap();
62//! }
63//! ```
64//!
65//! ## Bring Your Own Server (Axum Integration)
66//!
67//! ```rust,ignore
//! use axum::Router;
//! use axum::routing::get;
//! use turbomcp::prelude::*;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Get MCP routes as an Axum router
//!     let mcp_router = MyServer.builder().into_axum_router();
//!
//!     // Merge with your existing routes
//!     let app = Router::new()
//!         .route("/health", get(health_check))
//!         .merge(mcp_router);
//!
//!     // Use your own server
//!     let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
//!     axum::serve(listener, app).await?;
//!     Ok(())
//! }
85//! ```
86
87use std::time::Duration;
88
89use turbomcp_core::error::McpResult;
90use turbomcp_core::handler::McpHandler;
91
92use super::config::{ConnectionLimits, RateLimitConfig, ServerConfig, ServerConfigBuilder};
93
/// Transport configuration for the server.
///
/// Selects which transport `ServerBuilder::serve` runs. Every variant other
/// than [`Transport::Stdio`] exists only when the Cargo feature named in its
/// `#[cfg(feature = ...)]` attribute is enabled.
///
/// Use the associated functions to create transport configurations:
/// - `Transport::stdio()` - Standard I/O (default, works with Claude Desktop)
/// - `Transport::http(addr)` - HTTP JSON-RPC
/// - `Transport::websocket(addr)` - WebSocket bidirectional
/// - `Transport::tcp(addr)` - Raw TCP sockets
/// - `Transport::unix(path)` - Unix domain sockets
///
/// The type is plain data (unit variant or a single `String`), so it derives
/// `PartialEq`/`Eq` and values can be compared directly with `==` or
/// `assert_eq!` instead of going through `matches!`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum Transport {
    /// Standard I/O transport (line-based JSON-RPC).
    /// This is the default and works with Claude Desktop.
    #[default]
    Stdio,

    /// HTTP transport (JSON-RPC over HTTP POST).
    #[cfg(feature = "http")]
    Http {
        /// Bind address (e.g., "0.0.0.0:8080")
        addr: String,
    },

    /// WebSocket transport (bidirectional JSON-RPC).
    #[cfg(feature = "websocket")]
    WebSocket {
        /// Bind address (e.g., "0.0.0.0:8080")
        addr: String,
    },

    /// TCP transport (line-based JSON-RPC over TCP).
    #[cfg(feature = "tcp")]
    Tcp {
        /// Bind address (e.g., "0.0.0.0:9000")
        addr: String,
    },

    /// Unix domain socket transport (line-based JSON-RPC).
    #[cfg(feature = "unix")]
    Unix {
        /// Socket path (e.g., "/tmp/mcp.sock")
        path: String,
    },
}
137
138impl Transport {
139    /// Create STDIO transport configuration.
140    ///
141    /// This is the default transport that works with Claude Desktop
142    /// and other MCP clients that communicate via stdin/stdout.
143    #[must_use]
144    pub fn stdio() -> Self {
145        Self::Stdio
146    }
147
148    /// Create HTTP transport configuration.
149    ///
150    /// # Arguments
151    ///
152    /// * `addr` - Bind address (e.g., "0.0.0.0:8080" or "127.0.0.1:3000")
153    #[cfg(feature = "http")]
154    #[must_use]
155    pub fn http(addr: impl Into<String>) -> Self {
156        Self::Http { addr: addr.into() }
157    }
158
159    /// Create WebSocket transport configuration.
160    ///
161    /// # Arguments
162    ///
163    /// * `addr` - Bind address (e.g., "0.0.0.0:8080")
164    #[cfg(feature = "websocket")]
165    #[must_use]
166    pub fn websocket(addr: impl Into<String>) -> Self {
167        Self::WebSocket { addr: addr.into() }
168    }
169
170    /// Create TCP transport configuration.
171    ///
172    /// # Arguments
173    ///
174    /// * `addr` - Bind address (e.g., "0.0.0.0:9000")
175    #[cfg(feature = "tcp")]
176    #[must_use]
177    pub fn tcp(addr: impl Into<String>) -> Self {
178        Self::Tcp { addr: addr.into() }
179    }
180
181    /// Create Unix domain socket transport configuration.
182    ///
183    /// # Arguments
184    ///
185    /// * `path` - Socket path (e.g., "/tmp/mcp.sock")
186    #[cfg(feature = "unix")]
187    #[must_use]
188    pub fn unix(path: impl Into<String>) -> Self {
189        Self::Unix { path: path.into() }
190    }
191}
192
193/// Server builder for configuring and running MCP servers.
194///
195/// This builder provides a fluent API for:
196/// - Selecting transport at runtime
197/// - Configuring rate limits and connection limits
198/// - Setting up graceful shutdown
199/// - Integrating with existing server infrastructure
200///
201/// # Example
202///
203/// ```rust,ignore
204/// use turbomcp::prelude::*;
205///
206/// MyServer.builder()
207///     .transport(Transport::http("0.0.0.0:8080"))
208///     .with_rate_limit(100, Duration::from_secs(1))
209///     .serve()
210///     .await?;
211/// ```
212#[derive(Debug)]
213pub struct ServerBuilder<H: McpHandler> {
214    handler: H,
215    transport: Transport,
216    config: ServerConfigBuilder,
217    graceful_shutdown: Option<Duration>,
218}
219
220impl<H: McpHandler> ServerBuilder<H> {
221    /// Create a new server builder wrapping the given handler.
222    pub fn new(handler: H) -> Self {
223        Self {
224            handler,
225            transport: Transport::default(),
226            config: ServerConfig::builder(),
227            graceful_shutdown: None,
228        }
229    }
230
231    /// Set the transport for this server.
232    ///
233    /// # Example
234    ///
235    /// ```rust,ignore
236    /// builder.transport(Transport::http("0.0.0.0:8080"))
237    /// ```
238    #[must_use]
239    pub fn transport(mut self, transport: Transport) -> Self {
240        self.transport = transport;
241        self
242    }
243
244    /// Configure rate limiting.
245    ///
246    /// # Arguments
247    ///
248    /// * `requests` - Maximum requests allowed
249    /// * `per` - Time window for the limit
250    ///
251    /// # Example
252    ///
253    /// ```rust,ignore
254    /// // Allow 100 requests per second
255    /// builder.with_rate_limit(100, Duration::from_secs(1))
256    /// ```
257    #[must_use]
258    pub fn with_rate_limit(mut self, max_requests: u32, window: Duration) -> Self {
259        self.config = self.config.rate_limit(RateLimitConfig {
260            max_requests,
261            window,
262            per_client: true,
263        });
264        self
265    }
266
267    /// Configure maximum concurrent connections.
268    ///
269    /// This limit applies to TCP, HTTP, WebSocket, and Unix transports.
270    /// STDIO transport always has exactly one connection.
271    ///
272    /// # Example
273    ///
274    /// ```rust,ignore
275    /// builder.with_connection_limit(1000)
276    /// ```
277    #[must_use]
278    pub fn with_connection_limit(mut self, max: usize) -> Self {
279        self.config = self.config.connection_limits(ConnectionLimits {
280            max_tcp_connections: max,
281            max_websocket_connections: max,
282            max_http_concurrent: max,
283            max_unix_connections: max,
284        });
285        self
286    }
287
288    /// Configure graceful shutdown timeout.
289    ///
290    /// When the server receives a shutdown signal, it will wait up to
291    /// this duration for in-flight requests to complete.
292    ///
293    /// # Example
294    ///
295    /// ```rust,ignore
296    /// builder.with_graceful_shutdown(Duration::from_secs(30))
297    /// ```
298    #[must_use]
299    pub fn with_graceful_shutdown(mut self, timeout: Duration) -> Self {
300        self.graceful_shutdown = Some(timeout);
301        self
302    }
303
304    /// Configure maximum message size.
305    ///
306    /// Messages exceeding this size will be rejected.
307    /// Default: 10MB.
308    ///
309    /// # Example
310    ///
311    /// ```rust,ignore
312    /// // Limit messages to 1MB
313    /// builder.with_max_message_size(1024 * 1024)
314    /// ```
315    #[must_use]
316    pub fn with_max_message_size(mut self, size: usize) -> Self {
317        self.config = self.config.max_message_size(size);
318        self
319    }
320
321    /// Apply a custom server configuration.
322    ///
323    /// This replaces any previously set configuration options.
324    ///
325    /// # Example
326    ///
327    /// ```rust,ignore
328    /// let config = ServerConfig::builder()
329    ///     .rate_limit(rate_config)
330    ///     .connection_limits(limits)
331    ///     .build();
332    ///
333    /// builder.with_config(config)
334    /// ```
335    #[must_use]
336    pub fn with_config(mut self, config: ServerConfig) -> Self {
337        let mut builder = ServerConfig::builder()
338            .protocol(config.protocol)
339            .connection_limits(config.connection_limits)
340            .required_capabilities(config.required_capabilities)
341            .max_message_size(config.max_message_size);
342
343        if let Some(rate_limit) = config.rate_limit {
344            builder = builder.rate_limit(rate_limit);
345        }
346
347        self.config = builder;
348        self
349    }
350
351    /// Run the server with the configured transport.
352    ///
353    /// This is the main entry point that starts the server and blocks
354    /// until shutdown.
355    ///
356    /// # Example
357    ///
358    /// ```rust,ignore
359    /// MyServer.builder()
360    ///     .transport(Transport::http("0.0.0.0:8080"))
361    ///     .serve()
362    ///     .await?;
363    /// ```
364    #[allow(unused_variables)]
365    pub async fn serve(self) -> McpResult<()> {
366        // Config is used by transport-specific features (http, websocket, tcp, unix)
367        // STDIO doesn't use config, so this may be unused if only stdio is enabled
368        let config = self.config.build();
369
370        match self.transport {
371            Transport::Stdio => {
372                #[cfg(feature = "stdio")]
373                {
374                    super::transport::stdio::run(&self.handler).await
375                }
376                #[cfg(not(feature = "stdio"))]
377                {
378                    Err(turbomcp_core::error::McpError::internal(
379                        "STDIO transport not available. Enable the 'stdio' feature.",
380                    ))
381                }
382            }
383
384            #[cfg(feature = "http")]
385            Transport::Http { addr } => {
386                super::transport::http::run_with_config(&self.handler, &addr, &config).await
387            }
388
389            #[cfg(feature = "websocket")]
390            Transport::WebSocket { addr } => {
391                super::transport::websocket::run_with_config(&self.handler, &addr, &config).await
392            }
393
394            #[cfg(feature = "tcp")]
395            Transport::Tcp { addr } => {
396                super::transport::tcp::run_with_config(&self.handler, &addr, &config).await
397            }
398
399            #[cfg(feature = "unix")]
400            Transport::Unix { path } => {
401                super::transport::unix::run_with_config(&self.handler, &path, &config).await
402            }
403        }
404    }
405
406    /// Get the underlying handler.
407    ///
408    /// Useful for testing or custom integrations.
409    #[must_use]
410    pub fn handler(&self) -> &H {
411        &self.handler
412    }
413
414    /// Consume the builder and return the handler.
415    ///
416    /// Useful for custom integrations where you need ownership.
417    #[must_use]
418    pub fn into_handler(self) -> H {
419        self.handler
420    }
421
422    /// Convert to an Axum router for BYO server integration.
423    ///
424    /// This allows you to merge MCP routes with your existing Axum application.
425    /// Rate limiting configured via `with_rate_limit()` is applied to all requests.
426    ///
427    /// # Example
428    ///
429    /// ```rust,ignore
430    /// use axum::Router;
431    /// use axum::routing::get;
432    ///
433    /// let mcp_router = MyServer.builder()
434    ///     .with_rate_limit(100, Duration::from_secs(1))
435    ///     .into_axum_router();
436    ///
437    /// let app = Router::new()
438    ///     .route("/health", get(|| async { "OK" }))
439    ///     .merge(mcp_router);
440    ///
441    /// let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
442    /// axum::serve(listener, app).await?;
443    /// ```
444    #[cfg(feature = "http")]
445    pub fn into_axum_router(self) -> axum::Router {
446        use axum::{Router, routing::post};
447        use std::sync::Arc;
448
449        let config = self.config.build();
450        let handler = Arc::new(self.handler);
451        let rate_limiter = config
452            .rate_limit
453            .map(|cfg| Arc::new(crate::config::RateLimiter::new(cfg)));
454
455        Router::new()
456            .route("/", post(handle_json_rpc::<H>))
457            .route("/mcp", post(handle_json_rpc::<H>))
458            .with_state(AppState {
459                handler,
460                rate_limiter,
461            })
462    }
463
464    /// Convert to a Tower service for custom server integration.
465    ///
466    /// This returns a service that can be used with any Tower-compatible
467    /// HTTP server (Hyper, Axum, Warp, etc.).
468    ///
469    /// # Example
470    ///
471    /// ```rust,ignore
472    /// use hyper::server::conn::http1;
473    /// use hyper_util::rt::TokioIo;
474    ///
475    /// let service = MyServer.builder().into_service();
476    ///
477    /// let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
478    /// loop {
479    ///     let (stream, _) = listener.accept().await?;
480    ///     let service = service.clone();
481    ///     tokio::spawn(async move {
482    ///         http1::Builder::new()
483    ///             .serve_connection(TokioIo::new(stream), service)
484    ///             .await
485    ///     });
486    /// }
487    /// ```
488    #[cfg(feature = "http")]
489    pub fn into_service(
490        self,
491    ) -> impl tower::Service<
492        axum::http::Request<axum::body::Body>,
493        Response = axum::http::Response<axum::body::Body>,
494        Error = std::convert::Infallible,
495        Future = impl Future<
496            Output = Result<axum::http::Response<axum::body::Body>, std::convert::Infallible>,
497        > + Send,
498    > + Clone
499    + Send {
500        use tower::ServiceExt;
501        self.into_axum_router()
502            .into_service()
503            .map_err(|e| match e {})
504    }
505}
506
/// Shared state handed to every request served by the Axum routes.
///
/// Both fields sit behind `Arc`, so cloning the state clones pointers rather
/// than the handler or limiter themselves.
#[cfg(feature = "http")]
#[derive(Clone)]
struct AppState<H>
where
    H: McpHandler,
{
    /// Handler that requests are routed to.
    handler: std::sync::Arc<H>,
    /// Rate limiter, present only when a rate limit was configured.
    rate_limiter: Option<std::sync::Arc<crate::config::RateLimiter>>,
}
514
515/// JSON-RPC request handler for Axum.
516///
517/// Note: Rate limiting uses global rate limiting when used via `into_axum_router()`.
518/// For per-client rate limiting based on IP, use the full transport which includes
519/// `ConnectInfo` extraction.
520#[cfg(feature = "http")]
521async fn handle_json_rpc<H: McpHandler>(
522    axum::extract::State(state): axum::extract::State<AppState<H>>,
523    axum::Json(request): axum::Json<serde_json::Value>,
524) -> impl axum::response::IntoResponse {
525    use super::context::RequestContext;
526    use super::router::{parse_request, route_request, serialize_response};
527
528    // Check rate limit if configured (uses global rate limiting for BYO server)
529    if let Some(ref limiter) = state.rate_limiter
530        && !limiter.check(None)
531    {
532        return (
533            axum::http::StatusCode::TOO_MANY_REQUESTS,
534            axum::Json(serde_json::json!({
535                "jsonrpc": "2.0",
536                "error": {
537                    "code": -32000,
538                    "message": "Rate limit exceeded"
539                },
540                "id": null
541            })),
542        );
543    }
544
545    let request_str = match serde_json::to_string(&request) {
546        Ok(s) => s,
547        Err(e) => {
548            return (
549                axum::http::StatusCode::BAD_REQUEST,
550                axum::Json(serde_json::json!({
551                    "jsonrpc": "2.0",
552                    "error": {
553                        "code": -32700,
554                        "message": format!("Parse error: {}", e)
555                    },
556                    "id": null
557                })),
558            );
559        }
560    };
561
562    let parsed = match parse_request(&request_str) {
563        Ok(p) => p,
564        Err(e) => {
565            return (
566                axum::http::StatusCode::BAD_REQUEST,
567                axum::Json(serde_json::json!({
568                    "jsonrpc": "2.0",
569                    "error": {
570                        "code": -32700,
571                        "message": format!("Parse error: {}", e)
572                    },
573                    "id": null
574                })),
575            );
576        }
577    };
578
579    let ctx = RequestContext::http();
580    let core_ctx = ctx.to_core_context();
581    let response = route_request(&*state.handler, parsed, &core_ctx).await;
582
583    if !response.should_send() {
584        return (
585            axum::http::StatusCode::NO_CONTENT,
586            axum::Json(serde_json::json!(null)),
587        );
588    }
589
590    match serialize_response(&response) {
591        Ok(json_str) => {
592            let value: serde_json::Value = serde_json::from_str(&json_str).unwrap_or_default();
593            (axum::http::StatusCode::OK, axum::Json(value))
594        }
595        Err(e) => (
596            axum::http::StatusCode::INTERNAL_SERVER_ERROR,
597            axum::Json(serde_json::json!({
598                "jsonrpc": "2.0",
599                "error": {
600                    "code": -32603,
601                    "message": format!("Internal error: {}", e)
602                },
603                "id": null
604            })),
605        ),
606    }
607}
608
609/// Extension trait for creating server builders from handlers.
610///
611/// This trait provides the builder pattern for configurable server deployment.
612/// For simple cases, use `McpHandlerExt::run()` directly.
613///
614/// # Design Philosophy
615///
616/// - **Simple**: `handler.run()` → runs with STDIO (via `McpHandlerExt`)
617/// - **Configurable**: `handler.builder().transport(...).serve()` → full control
618///
619/// # Example
620///
621/// ```rust,ignore
622/// use turbomcp::prelude::*;
623///
624/// // Simple (no config needed)
625/// MyServer.run().await?;
626///
627/// // Configurable (builder pattern)
628/// MyServer.builder()
629///     .transport(Transport::http("0.0.0.0:8080"))
630///     .with_rate_limit(100, Duration::from_secs(1))
631///     .serve()
632///     .await?;
633///
634/// // BYO server (Axum integration)
635/// let mcp = MyServer.builder().into_axum_router();
636/// ```
637pub trait McpServerExt: McpHandler + Sized {
638    /// Create a server builder for this handler.
639    ///
640    /// The builder allows configuring transport, rate limits, connection
641    /// limits, and other server options before starting.
642    fn builder(self) -> ServerBuilder<Self> {
643        ServerBuilder::new(self)
644    }
645}
646
647/// Blanket implementation for all McpHandler types.
648impl<T: McpHandler> McpServerExt for T {}
649
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;
    use turbomcp_core::context::RequestContext as CoreRequestContext;
    use turbomcp_core::error::McpError;
    use turbomcp_types::{
        Prompt, PromptResult, Resource, ResourceResult, ServerInfo, Tool, ToolResult,
    };

    /// Minimal handler used to exercise the builder: one tool, no resources
    /// or prompts.
    #[derive(Clone)]
    struct TestHandler;

    // The async trait methods are spelled as `fn ... -> impl Future` to match
    // the trait's signatures, which clippy would otherwise ask to rewrite.
    #[allow(clippy::manual_async_fn)]
    impl McpHandler for TestHandler {
        fn server_info(&self) -> ServerInfo {
            ServerInfo::new("test", "1.0.0")
        }

        fn list_tools(&self) -> Vec<Tool> {
            vec![Tool::new("test", "Test tool")]
        }

        fn list_resources(&self) -> Vec<Resource> {
            Vec::new()
        }

        fn list_prompts(&self) -> Vec<Prompt> {
            Vec::new()
        }

        fn call_tool<'a>(
            &'a self,
            _name: &'a str,
            _args: Value,
            _ctx: &'a CoreRequestContext,
        ) -> impl std::future::Future<Output = McpResult<ToolResult>> + Send + 'a {
            async { Ok(ToolResult::text("ok")) }
        }

        fn read_resource<'a>(
            &'a self,
            uri: &'a str,
            _ctx: &'a CoreRequestContext,
        ) -> impl std::future::Future<Output = McpResult<ResourceResult>> + Send + 'a {
            // Own the URI so the future does not borrow the argument.
            let owned_uri = uri.to_string();
            async move { Err(McpError::resource_not_found(&owned_uri)) }
        }

        fn get_prompt<'a>(
            &'a self,
            name: &'a str,
            _args: Option<Value>,
            _ctx: &'a CoreRequestContext,
        ) -> impl std::future::Future<Output = McpResult<PromptResult>> + Send + 'a {
            // Own the name so the future does not borrow the argument.
            let owned_name = name.to_string();
            async move { Err(McpError::prompt_not_found(&owned_name)) }
        }
    }

    /// `Transport::default()` must be the STDIO variant.
    #[test]
    fn test_transport_default_is_stdio() {
        assert!(matches!(Transport::default(), Transport::Stdio));
    }

    /// A freshly created builder starts on STDIO.
    #[test]
    fn test_builder_creation() {
        let fresh = TestHandler.builder();
        assert!(matches!(fresh.transport, Transport::Stdio));
    }

    /// Explicitly selecting STDIO is reflected in the builder.
    #[test]
    fn test_builder_transport_selection() {
        let base = TestHandler;

        // Test STDIO
        let selected = base.clone().builder().transport(Transport::stdio());
        assert!(matches!(selected.transport, Transport::Stdio));
    }

    /// Selecting HTTP stores the HTTP variant.
    #[cfg(feature = "http")]
    #[test]
    fn test_builder_http_transport() {
        let selected = TestHandler
            .builder()
            .transport(Transport::http("0.0.0.0:8080"));
        assert!(matches!(selected.transport, Transport::Http { .. }));
    }

    /// `with_rate_limit` results in a built config that carries a rate limit.
    #[test]
    fn test_builder_rate_limit() {
        let limited = TestHandler
            .builder()
            .with_rate_limit(100, Duration::from_secs(1));

        let built = limited.config.build();
        assert!(built.rate_limit.is_some());
    }

    /// `with_connection_limit` writes the same cap to every transport limit.
    #[test]
    fn test_builder_connection_limit() {
        let capped = TestHandler.builder().with_connection_limit(500);

        let limits = capped.config.build().connection_limits;
        assert_eq!(limits.max_tcp_connections, 500);
        assert_eq!(limits.max_websocket_connections, 500);
        assert_eq!(limits.max_http_concurrent, 500);
        assert_eq!(limits.max_unix_connections, 500);
    }

    /// `with_graceful_shutdown` stores the given timeout.
    #[test]
    fn test_builder_graceful_shutdown() {
        let timeout = Duration::from_secs(30);
        let configured = TestHandler.builder().with_graceful_shutdown(timeout);

        assert_eq!(configured.graceful_shutdown, Some(Duration::from_secs(30)));
    }

    /// `into_handler` returns the handler that was wrapped.
    #[test]
    fn test_builder_into_handler() {
        let recovered = TestHandler.builder().into_handler();
        assert_eq!(recovered.server_info().name, "test");
    }
}