turbomcp_server/builder.rs
1//! Server Builder - SOTA fluent API for MCP server configuration.
2//!
3//! This module provides a builder pattern for configuring and running MCP servers
4//! with full control over transport selection and server integration.
5//!
6//! # Design Principles
7//!
8//! 1. **Zero Configuration Required** - Sensible defaults for quick starts
9//! 2. **Transport Agnostic** - Choose transport at runtime, not compile time
10//! 3. **BYO Server Support** - Integrate with existing Axum/Tower infrastructure
11//! 4. **Platform Transparent** - Works on native and WASM without `#[cfg]` in user code
12//!
13//! # Examples
14//!
15//! ## Simplest Usage (STDIO default)
16//!
17//! ```rust,ignore
18//! use turbomcp::prelude::*;
19//!
20//! #[tokio::main]
21//! async fn main() {
22//! MyServer.serve().await.unwrap();
23//! }
24//! ```
25//!
26//! ## Choose Transport at Runtime
27//!
28//! ```rust,ignore
29//! use turbomcp::prelude::*;
30//!
31//! #[tokio::main]
32//! async fn main() {
33//! let transport = std::env::var("TRANSPORT").unwrap_or("stdio".into());
34//!
35//! MyServer.builder()
36//! .transport(match transport.as_str() {
37//! "http" => Transport::http("0.0.0.0:8080"),
38//! "tcp" => Transport::tcp("0.0.0.0:9000"),
39//! _ => Transport::stdio(),
40//! })
41//! .serve()
42//! .await
43//! .unwrap();
44//! }
45//! ```
46//!
47//! ## Full Configuration
48//!
49//! ```rust,ignore
50//! use turbomcp::prelude::*;
51//!
52//! #[tokio::main]
53//! async fn main() {
54//! MyServer.builder()
55//! .transport(Transport::http("0.0.0.0:8080"))
56//! .with_rate_limit(100, Duration::from_secs(1))
57//! .with_connection_limit(1000)
58//! .with_graceful_shutdown(Duration::from_secs(30))
59//! .serve()
60//! .await
61//! .unwrap();
62//! }
63//! ```
64//!
65//! ## Bring Your Own Server (Axum Integration)
66//!
//! ```rust,ignore
//! use axum::{Router, routing::get};
//! use turbomcp::prelude::*;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Get MCP routes as an Axum router
//!     let mcp_router = MyServer.builder().into_axum_router();
//!
//!     // Merge with your existing routes
//!     let app = Router::new()
//!         .route("/health", get(health_check))
//!         .merge(mcp_router);
//!
//!     // Use your own server
//!     let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
//!     axum::serve(listener, app).await?;
//!     Ok(())
//! }
//! ```
86
87use std::time::Duration;
88
89use turbomcp_core::error::McpResult;
90use turbomcp_core::handler::McpHandler;
91
92use super::config::{ConnectionLimits, RateLimitConfig, ServerConfig, ServerConfigBuilder};
93
/// Selects which transport the server listens on.
///
/// Every variant except [`Transport::Stdio`] is gated behind a Cargo feature
/// of the same name, so the set of available variants depends on how the
/// crate was built. Prefer the constructor functions over building variants
/// by hand:
///
/// - `Transport::stdio()` - Standard I/O (the default)
/// - `Transport::http(addr)` - JSON-RPC over HTTP
/// - `Transport::websocket(addr)` - bidirectional WebSocket
/// - `Transport::tcp(addr)` - raw TCP sockets
/// - `Transport::unix(path)` - Unix domain sockets
#[derive(Clone, Debug, Default)]
pub enum Transport {
    /// Line-based JSON-RPC over stdin/stdout.
    ///
    /// This is the `Default` variant and the one used by Claude Desktop.
    #[default]
    Stdio,

    /// JSON-RPC carried in HTTP POST requests.
    #[cfg(feature = "http")]
    Http {
        /// Address to bind, e.g. `"0.0.0.0:8080"`.
        addr: String,
    },

    /// Bidirectional JSON-RPC over WebSocket.
    #[cfg(feature = "websocket")]
    WebSocket {
        /// Address to bind, e.g. `"0.0.0.0:8080"`.
        addr: String,
    },

    /// Line-based JSON-RPC over a TCP socket.
    #[cfg(feature = "tcp")]
    Tcp {
        /// Address to bind, e.g. `"0.0.0.0:9000"`.
        addr: String,
    },

    /// Line-based JSON-RPC over a Unix domain socket.
    #[cfg(feature = "unix")]
    Unix {
        /// Filesystem path of the socket, e.g. `"/tmp/mcp.sock"`.
        path: String,
    },
}
137
138impl Transport {
139 /// Create STDIO transport configuration.
140 ///
141 /// This is the default transport that works with Claude Desktop
142 /// and other MCP clients that communicate via stdin/stdout.
143 #[must_use]
144 pub fn stdio() -> Self {
145 Self::Stdio
146 }
147
148 /// Create HTTP transport configuration.
149 ///
150 /// # Arguments
151 ///
152 /// * `addr` - Bind address (e.g., "0.0.0.0:8080" or "127.0.0.1:3000")
153 #[cfg(feature = "http")]
154 #[must_use]
155 pub fn http(addr: impl Into<String>) -> Self {
156 Self::Http { addr: addr.into() }
157 }
158
159 /// Create WebSocket transport configuration.
160 ///
161 /// # Arguments
162 ///
163 /// * `addr` - Bind address (e.g., "0.0.0.0:8080")
164 #[cfg(feature = "websocket")]
165 #[must_use]
166 pub fn websocket(addr: impl Into<String>) -> Self {
167 Self::WebSocket { addr: addr.into() }
168 }
169
170 /// Create TCP transport configuration.
171 ///
172 /// # Arguments
173 ///
174 /// * `addr` - Bind address (e.g., "0.0.0.0:9000")
175 #[cfg(feature = "tcp")]
176 #[must_use]
177 pub fn tcp(addr: impl Into<String>) -> Self {
178 Self::Tcp { addr: addr.into() }
179 }
180
181 /// Create Unix domain socket transport configuration.
182 ///
183 /// # Arguments
184 ///
185 /// * `path` - Socket path (e.g., "/tmp/mcp.sock")
186 #[cfg(feature = "unix")]
187 #[must_use]
188 pub fn unix(path: impl Into<String>) -> Self {
189 Self::Unix { path: path.into() }
190 }
191}
192
193/// Server builder for configuring and running MCP servers.
194///
195/// This builder provides a fluent API for:
196/// - Selecting transport at runtime
197/// - Configuring rate limits and connection limits
198/// - Setting up graceful shutdown
199/// - Integrating with existing server infrastructure
200///
201/// # Example
202///
203/// ```rust,ignore
204/// use turbomcp::prelude::*;
205///
206/// MyServer.builder()
207/// .transport(Transport::http("0.0.0.0:8080"))
208/// .with_rate_limit(100, Duration::from_secs(1))
209/// .serve()
210/// .await?;
211/// ```
212#[derive(Debug)]
213pub struct ServerBuilder<H: McpHandler> {
214 handler: H,
215 transport: Transport,
216 config: ServerConfigBuilder,
217 graceful_shutdown: Option<Duration>,
218}
219
220impl<H: McpHandler> ServerBuilder<H> {
221 /// Create a new server builder wrapping the given handler.
222 pub fn new(handler: H) -> Self {
223 Self {
224 handler,
225 transport: Transport::default(),
226 config: ServerConfig::builder(),
227 graceful_shutdown: None,
228 }
229 }
230
231 /// Set the transport for this server.
232 ///
233 /// # Example
234 ///
235 /// ```rust,ignore
236 /// builder.transport(Transport::http("0.0.0.0:8080"))
237 /// ```
238 #[must_use]
239 pub fn transport(mut self, transport: Transport) -> Self {
240 self.transport = transport;
241 self
242 }
243
244 /// Configure rate limiting.
245 ///
246 /// # Arguments
247 ///
248 /// * `requests` - Maximum requests allowed
249 /// * `per` - Time window for the limit
250 ///
251 /// # Example
252 ///
253 /// ```rust,ignore
254 /// // Allow 100 requests per second
255 /// builder.with_rate_limit(100, Duration::from_secs(1))
256 /// ```
257 #[must_use]
258 pub fn with_rate_limit(mut self, max_requests: u32, window: Duration) -> Self {
259 self.config = self.config.rate_limit(RateLimitConfig {
260 max_requests,
261 window,
262 per_client: true,
263 });
264 self
265 }
266
267 /// Configure maximum concurrent connections.
268 ///
269 /// This limit applies to TCP, HTTP, WebSocket, and Unix transports.
270 /// STDIO transport always has exactly one connection.
271 ///
272 /// # Example
273 ///
274 /// ```rust,ignore
275 /// builder.with_connection_limit(1000)
276 /// ```
277 #[must_use]
278 pub fn with_connection_limit(mut self, max: usize) -> Self {
279 self.config = self.config.connection_limits(ConnectionLimits {
280 max_tcp_connections: max,
281 max_websocket_connections: max,
282 max_http_concurrent: max,
283 max_unix_connections: max,
284 });
285 self
286 }
287
288 /// Configure graceful shutdown timeout.
289 ///
290 /// When the server receives a shutdown signal, it will wait up to
291 /// this duration for in-flight requests to complete.
292 ///
293 /// # Example
294 ///
295 /// ```rust,ignore
296 /// builder.with_graceful_shutdown(Duration::from_secs(30))
297 /// ```
298 #[must_use]
299 pub fn with_graceful_shutdown(mut self, timeout: Duration) -> Self {
300 self.graceful_shutdown = Some(timeout);
301 self
302 }
303
304 /// Configure maximum message size.
305 ///
306 /// Messages exceeding this size will be rejected.
307 /// Default: 10MB.
308 ///
309 /// # Example
310 ///
311 /// ```rust,ignore
312 /// // Limit messages to 1MB
313 /// builder.with_max_message_size(1024 * 1024)
314 /// ```
315 #[must_use]
316 pub fn with_max_message_size(mut self, size: usize) -> Self {
317 self.config = self.config.max_message_size(size);
318 self
319 }
320
321 /// Apply a custom server configuration.
322 ///
323 /// This replaces any previously set configuration options.
324 ///
325 /// # Example
326 ///
327 /// ```rust,ignore
328 /// let config = ServerConfig::builder()
329 /// .rate_limit(rate_config)
330 /// .connection_limits(limits)
331 /// .build();
332 ///
333 /// builder.with_config(config)
334 /// ```
335 #[must_use]
336 pub fn with_config(mut self, config: ServerConfig) -> Self {
337 let mut builder = ServerConfig::builder()
338 .protocol(config.protocol)
339 .connection_limits(config.connection_limits)
340 .required_capabilities(config.required_capabilities)
341 .max_message_size(config.max_message_size);
342
343 if let Some(rate_limit) = config.rate_limit {
344 builder = builder.rate_limit(rate_limit);
345 }
346
347 self.config = builder;
348 self
349 }
350
351 /// Run the server with the configured transport.
352 ///
353 /// This is the main entry point that starts the server and blocks
354 /// until shutdown.
355 ///
356 /// # Example
357 ///
358 /// ```rust,ignore
359 /// MyServer.builder()
360 /// .transport(Transport::http("0.0.0.0:8080"))
361 /// .serve()
362 /// .await?;
363 /// ```
364 #[allow(unused_variables)]
365 pub async fn serve(self) -> McpResult<()> {
366 // Config is used by transport-specific features (http, websocket, tcp, unix)
367 // STDIO doesn't use config, so this may be unused if only stdio is enabled
368 let config = self.config.build();
369
370 match self.transport {
371 Transport::Stdio => {
372 #[cfg(feature = "stdio")]
373 {
374 super::transport::stdio::run(&self.handler).await
375 }
376 #[cfg(not(feature = "stdio"))]
377 {
378 Err(turbomcp_core::error::McpError::internal(
379 "STDIO transport not available. Enable the 'stdio' feature.",
380 ))
381 }
382 }
383
384 #[cfg(feature = "http")]
385 Transport::Http { addr } => {
386 super::transport::http::run_with_config(&self.handler, &addr, &config).await
387 }
388
389 #[cfg(feature = "websocket")]
390 Transport::WebSocket { addr } => {
391 super::transport::websocket::run_with_config(&self.handler, &addr, &config).await
392 }
393
394 #[cfg(feature = "tcp")]
395 Transport::Tcp { addr } => {
396 super::transport::tcp::run_with_config(&self.handler, &addr, &config).await
397 }
398
399 #[cfg(feature = "unix")]
400 Transport::Unix { path } => {
401 super::transport::unix::run_with_config(&self.handler, &path, &config).await
402 }
403 }
404 }
405
406 /// Get the underlying handler.
407 ///
408 /// Useful for testing or custom integrations.
409 #[must_use]
410 pub fn handler(&self) -> &H {
411 &self.handler
412 }
413
414 /// Consume the builder and return the handler.
415 ///
416 /// Useful for custom integrations where you need ownership.
417 #[must_use]
418 pub fn into_handler(self) -> H {
419 self.handler
420 }
421
422 /// Convert to an Axum router for BYO server integration.
423 ///
424 /// This allows you to merge MCP routes with your existing Axum application.
425 /// Rate limiting configured via `with_rate_limit()` is applied to all requests.
426 ///
427 /// # Example
428 ///
429 /// ```rust,ignore
430 /// use axum::Router;
431 /// use axum::routing::get;
432 ///
433 /// let mcp_router = MyServer.builder()
434 /// .with_rate_limit(100, Duration::from_secs(1))
435 /// .into_axum_router();
436 ///
437 /// let app = Router::new()
438 /// .route("/health", get(|| async { "OK" }))
439 /// .merge(mcp_router);
440 ///
441 /// let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
442 /// axum::serve(listener, app).await?;
443 /// ```
444 #[cfg(feature = "http")]
445 pub fn into_axum_router(self) -> axum::Router {
446 use axum::{Router, routing::post};
447 use std::sync::Arc;
448
449 let config = self.config.build();
450 let handler = Arc::new(self.handler);
451 let rate_limiter = config
452 .rate_limit
453 .map(|cfg| Arc::new(crate::config::RateLimiter::new(cfg)));
454
455 Router::new()
456 .route("/", post(handle_json_rpc::<H>))
457 .route("/mcp", post(handle_json_rpc::<H>))
458 .with_state(AppState {
459 handler,
460 rate_limiter,
461 })
462 }
463
464 /// Convert to a Tower service for custom server integration.
465 ///
466 /// This returns a service that can be used with any Tower-compatible
467 /// HTTP server (Hyper, Axum, Warp, etc.).
468 ///
469 /// # Example
470 ///
471 /// ```rust,ignore
472 /// use hyper::server::conn::http1;
473 /// use hyper_util::rt::TokioIo;
474 ///
475 /// let service = MyServer.builder().into_service();
476 ///
477 /// let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
478 /// loop {
479 /// let (stream, _) = listener.accept().await?;
480 /// let service = service.clone();
481 /// tokio::spawn(async move {
482 /// http1::Builder::new()
483 /// .serve_connection(TokioIo::new(stream), service)
484 /// .await
485 /// });
486 /// }
487 /// ```
488 #[cfg(feature = "http")]
489 pub fn into_service(
490 self,
491 ) -> impl tower::Service<
492 axum::http::Request<axum::body::Body>,
493 Response = axum::http::Response<axum::body::Body>,
494 Error = std::convert::Infallible,
495 Future = impl Future<
496 Output = Result<axum::http::Response<axum::body::Body>, std::convert::Infallible>,
497 > + Send,
498 > + Clone
499 + Send {
500 use tower::ServiceExt;
501 self.into_axum_router()
502 .into_service()
503 .map_err(|e| match e {})
504 }
505}
506
/// Shared state handed to every invocation of the Axum JSON-RPC handler.
///
/// Both members sit behind `Arc`, so cloning the state only bumps reference
/// counts.
#[cfg(feature = "http")]
#[derive(Clone)]
struct AppState<H>
where
    H: McpHandler,
{
    /// The MCP handler requests are routed to.
    handler: std::sync::Arc<H>,
    /// Rate limiter, present only when a rate limit was configured.
    rate_limiter: Option<std::sync::Arc<crate::config::RateLimiter>>,
}
514
/// JSON-RPC request handler for Axum.
///
/// Processing steps:
/// 1. If a rate limiter is configured and rejects the request, answer
///    `429 Too Many Requests` with JSON-RPC error `-32000`.
/// 2. Re-serialize the JSON body and feed it to `parse_request`; a failure in
///    either step answers `400 Bad Request` with JSON-RPC error `-32700`.
/// 3. Route the parsed request to the handler.
/// 4. If the response reports `should_send() == false`, answer
///    `204 No Content`.
/// 5. Otherwise serialize the response and answer `200 OK`. Any failure to
///    serialize it, or to turn the serialized text back into JSON, answers
///    `500 Internal Server Error` with JSON-RPC error `-32603` rather than
///    being silently replaced by `null`.
///
/// Note: Rate limiting uses global rate limiting when used via `into_axum_router()`
/// (the limiter is queried with `None` as the client key).
/// For per-client rate limiting based on IP, use the full transport which includes
/// `ConnectInfo` extraction.
#[cfg(feature = "http")]
async fn handle_json_rpc<H: McpHandler>(
    axum::extract::State(state): axum::extract::State<AppState<H>>,
    axum::Json(request): axum::Json<serde_json::Value>,
) -> impl axum::response::IntoResponse {
    use super::context::RequestContext;
    use super::router::{parse_request, route_request, serialize_response};
    use axum::http::StatusCode;

    // Check rate limit if configured (uses global rate limiting for BYO server)
    let limited = state
        .rate_limiter
        .as_ref()
        .is_some_and(|limiter| !limiter.check(None));
    if limited {
        return json_rpc_error(
            StatusCode::TOO_MANY_REQUESTS,
            -32000,
            "Rate limit exceeded".to_owned(),
        );
    }

    // `parse_request` works on text, so the already-decoded body is
    // serialized back to a string first.
    let request_str = match serde_json::to_string(&request) {
        Ok(text) => text,
        Err(e) => {
            return json_rpc_error(
                StatusCode::BAD_REQUEST,
                -32700,
                format!("Parse error: {}", e),
            );
        }
    };

    let parsed = match parse_request(&request_str) {
        Ok(parsed) => parsed,
        Err(e) => {
            return json_rpc_error(
                StatusCode::BAD_REQUEST,
                -32700,
                format!("Parse error: {}", e),
            );
        }
    };

    let ctx = RequestContext::http();
    let core_ctx = ctx.to_core_context();
    let response = route_request(&*state.handler, parsed, &core_ctx).await;

    if !response.should_send() {
        return (
            StatusCode::NO_CONTENT,
            axum::Json(serde_json::json!(null)),
        );
    }

    // Serialize, then re-parse into a `Value` so the reply can go through
    // `axum::Json`. Both steps are fallible and both are reported.
    let body = serialize_response(&response)
        .map_err(|e| e.to_string())
        .and_then(|json_str| {
            serde_json::from_str::<serde_json::Value>(&json_str).map_err(|e| e.to_string())
        });

    match body {
        Ok(value) => (StatusCode::OK, axum::Json(value)),
        Err(reason) => json_rpc_error(
            StatusCode::INTERNAL_SERVER_ERROR,
            -32603,
            format!("Internal error: {}", reason),
        ),
    }
}

/// Build an HTTP reply carrying a JSON-RPC 2.0 error object with a `null` id.
#[cfg(feature = "http")]
fn json_rpc_error(
    status: axum::http::StatusCode,
    code: i32,
    message: String,
) -> (axum::http::StatusCode, axum::Json<serde_json::Value>) {
    (
        status,
        axum::Json(serde_json::json!({
            "jsonrpc": "2.0",
            "error": {
                "code": code,
                "message": message
            },
            "id": null
        })),
    )
}
608
609/// Extension trait for creating server builders from handlers.
610///
611/// This trait provides the builder pattern for configurable server deployment.
612/// For simple cases, use `McpHandlerExt::run()` directly.
613///
614/// # Design Philosophy
615///
616/// - **Simple**: `handler.run()` → runs with STDIO (via `McpHandlerExt`)
617/// - **Configurable**: `handler.builder().transport(...).serve()` → full control
618///
619/// # Example
620///
621/// ```rust,ignore
622/// use turbomcp::prelude::*;
623///
624/// // Simple (no config needed)
625/// MyServer.run().await?;
626///
627/// // Configurable (builder pattern)
628/// MyServer.builder()
629/// .transport(Transport::http("0.0.0.0:8080"))
630/// .with_rate_limit(100, Duration::from_secs(1))
631/// .serve()
632/// .await?;
633///
634/// // BYO server (Axum integration)
635/// let mcp = MyServer.builder().into_axum_router();
636/// ```
637pub trait McpServerExt: McpHandler + Sized {
638 /// Create a server builder for this handler.
639 ///
640 /// The builder allows configuring transport, rate limits, connection
641 /// limits, and other server options before starting.
642 fn builder(self) -> ServerBuilder<Self> {
643 ServerBuilder::new(self)
644 }
645}
646
647/// Blanket implementation for all McpHandler types.
648impl<T: McpHandler> McpServerExt for T {}
649
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;
    use turbomcp_core::context::RequestContext as CoreRequestContext;
    use turbomcp_core::error::McpError;
    use turbomcp_types::{
        Prompt, PromptResult, Resource, ResourceResult, ServerInfo, Tool, ToolResult,
    };

    /// Minimal handler: one tool, no resources, no prompts.
    #[derive(Clone)]
    struct TestHandler;

    // The trait methods are declared as `fn ... -> impl Future`, so the
    // implementations mirror that shape instead of using `async fn`.
    #[allow(clippy::manual_async_fn)]
    impl McpHandler for TestHandler {
        fn server_info(&self) -> ServerInfo {
            ServerInfo::new("test", "1.0.0")
        }

        fn list_tools(&self) -> Vec<Tool> {
            let only_tool = Tool::new("test", "Test tool");
            vec![only_tool]
        }

        fn list_resources(&self) -> Vec<Resource> {
            Vec::new()
        }

        fn list_prompts(&self) -> Vec<Prompt> {
            Vec::new()
        }

        fn call_tool<'a>(
            &'a self,
            _name: &'a str,
            _args: Value,
            _ctx: &'a CoreRequestContext,
        ) -> impl std::future::Future<Output = McpResult<ToolResult>> + Send + 'a {
            // Every tool call succeeds with the same text payload.
            async { Ok(ToolResult::text("ok")) }
        }

        fn read_resource<'a>(
            &'a self,
            uri: &'a str,
            _ctx: &'a CoreRequestContext,
        ) -> impl std::future::Future<Output = McpResult<ResourceResult>> + Send + 'a {
            // No resources exist, so every lookup is "not found".
            let requested = uri.to_owned();
            async move { Err(McpError::resource_not_found(&requested)) }
        }

        fn get_prompt<'a>(
            &'a self,
            name: &'a str,
            _args: Option<Value>,
            _ctx: &'a CoreRequestContext,
        ) -> impl std::future::Future<Output = McpResult<PromptResult>> + Send + 'a {
            // No prompts exist, so every lookup is "not found".
            let requested = name.to_owned();
            async move { Err(McpError::prompt_not_found(&requested)) }
        }
    }

    /// `Transport::default()` must be the STDIO variant.
    #[test]
    fn test_transport_default_is_stdio() {
        assert!(matches!(Transport::default(), Transport::Stdio));
    }

    /// A fresh builder starts on the STDIO transport.
    #[test]
    fn test_builder_creation() {
        let fresh = TestHandler.builder();
        assert!(matches!(fresh.transport, Transport::Stdio));
    }

    /// Explicitly selecting STDIO is reflected in the builder.
    #[test]
    fn test_builder_transport_selection() {
        let original = TestHandler;

        // Test STDIO
        let configured = original.clone().builder().transport(Transport::stdio());
        assert!(matches!(configured.transport, Transport::Stdio));
    }

    /// Selecting HTTP stores the HTTP variant.
    #[cfg(feature = "http")]
    #[test]
    fn test_builder_http_transport() {
        let chosen = Transport::http("0.0.0.0:8080");
        let configured = TestHandler.builder().transport(chosen);
        assert!(matches!(configured.transport, Transport::Http { .. }));
    }

    /// `with_rate_limit` results in a rate limit in the built config.
    #[test]
    fn test_builder_rate_limit() {
        let one_second = Duration::from_secs(1);
        let built = TestHandler
            .builder()
            .with_rate_limit(100, one_second)
            .config
            .build();

        assert!(built.rate_limit.is_some());
    }

    /// `with_connection_limit` writes the same cap to every transport limit.
    #[test]
    fn test_builder_connection_limit() {
        let built = TestHandler
            .builder()
            .with_connection_limit(500)
            .config
            .build();
        let limits = &built.connection_limits;

        assert_eq!(limits.max_tcp_connections, 500);
        assert_eq!(limits.max_websocket_connections, 500);
        assert_eq!(limits.max_http_concurrent, 500);
        assert_eq!(limits.max_unix_connections, 500);
    }

    /// `with_graceful_shutdown` records the timeout on the builder.
    #[test]
    fn test_builder_graceful_shutdown() {
        let timeout = Duration::from_secs(30);
        let configured = TestHandler.builder().with_graceful_shutdown(timeout);

        assert_eq!(configured.graceful_shutdown, Some(Duration::from_secs(30)));
    }

    /// `into_handler` gives back the handler the builder was created from.
    #[test]
    fn test_builder_into_handler() {
        let recovered = TestHandler.builder().into_handler();
        assert_eq!(recovered.server_info().name, "test");
    }
}
780}