// turbomcp_server/server/core.rs
1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9#[cfg(feature = "http")]
10use tracing::warn;
11
12use crate::{
13 config::ServerConfig,
14 error::ServerResult,
15 lifecycle::{HealthStatus, ServerLifecycle},
16 metrics::ServerMetrics,
17 registry::HandlerRegistry,
18 routing::RequestRouter,
19 service::McpService,
20};
21
22#[cfg(feature = "middleware")]
23use crate::middleware::MiddlewareStack;
24
25use bytes::Bytes;
26use http::{Request, Response};
27use tokio::time::{Duration, sleep};
28use turbomcp_transport::Transport;
29use turbomcp_transport::core::TransportError;
30
31use super::shutdown::ShutdownHandle;
32
/// Check if logging should be enabled for STDIO transport
///
/// For MCP STDIO transport compliance, logging is disabled by default since stdout
/// must be reserved exclusively for JSON-RPC messages. This can be overridden by
/// setting the TURBOMCP_FORCE_LOGGING environment variable (any value enables it;
/// only presence is checked, not content). A variable set to a non-Unicode value
/// yields `Err` from `env::var` and therefore counts as "not set".
pub(crate) fn should_log_for_stdio() -> bool {
    std::env::var("TURBOMCP_FORCE_LOGGING").is_ok()
}
41
/// Wrapper that holds router + headers and implements JsonRpcHandler
/// This allows us to pass headers to create_context without storing them on the router.
/// Used by both HTTP and WebSocket transports.
#[cfg(any(feature = "http", feature = "websocket"))]
struct HttpHandlerWithHeaders {
    /// Router (possibly a session-specific clone) that performs the actual dispatch.
    router: crate::routing::RequestRouter,
    /// Transport-level headers forwarded into `create_context`; `None` when absent.
    headers: Option<std::collections::HashMap<String, String>>,
    /// Static transport tag (e.g. "http") recorded on each request context.
    transport: &'static str,
}
51
#[cfg(any(feature = "http", feature = "websocket"))]
#[async_trait::async_trait]
impl turbomcp_protocol::JsonRpcHandler for HttpHandlerWithHeaders {
    /// Handle a single JSON-RPC request value end-to-end.
    ///
    /// Parses the incoming value, routes it through the wrapped router using a
    /// context carrying the captured headers and transport tag, then serializes
    /// the response back to a `serde_json::Value`. Parse failures are reported
    /// as JSON-RPC error objects (-32700) and serialization failures as internal
    /// errors (-32603) rather than being propagated.
    async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
        use turbomcp_protocol::jsonrpc::JsonRpcRequest;

        // Reject malformed payloads with a JSON-RPC parse error.
        let request: JsonRpcRequest = match serde_json::from_value(req_value) {
            Ok(parsed) => parsed,
            Err(parse_err) => {
                return serde_json::json!({
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32700,
                        "message": format!("Parse error: {}", parse_err)
                    },
                    "id": null
                });
            }
        };

        // Build a per-request context carrying the transport headers,
        // then dispatch through the shared router.
        let ctx = self
            .router
            .create_context(self.headers.clone(), Some(self.transport));
        let response = self.router.route(request, ctx).await;

        // Serializing a routed response should not fail; if it somehow does,
        // surface an internal error while preserving the response id.
        match serde_json::to_value(&response) {
            Ok(value) => value,
            Err(ser_err) => serde_json::json!({
                "jsonrpc": "2.0",
                "error": {
                    "code": -32603,
                    "message": format!("Internal error: failed to serialize response: {}", ser_err)
                },
                "id": response.id
            }),
        }
    }

    fn server_info(&self) -> turbomcp_protocol::ServerInfo {
        self.router.server_info()
    }
}
101
/// Main MCP server following the Axum/Tower Clone pattern
///
/// ## Sharing Pattern
///
/// `McpServer` implements `Clone` like Axum's `Router`. All heavy state is Arc-wrapped
/// internally, making cloning cheap (just atomic reference count increments).
///
/// ```rust,no_run
/// use turbomcp_server::ServerBuilder;
///
/// # async fn example() {
/// let server = ServerBuilder::new().build();
///
/// // Clone for passing to functions (cheap - just Arc increments)
/// let server1 = server.clone();
/// let server2 = server.clone();
///
/// // Access config and health
/// let config = server1.config();
/// println!("Server: {}", config.name);
///
/// let health = server2.health().await;
/// println!("Health: {:?}", health);
/// # }
/// ```
///
/// ## Architecture Notes
///
/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
/// This is intentional and follows Tower's design - users clone the server instead of
/// Arc-wrapping it.
///
/// **Architecture Note**: The service field provides tower::Service integration for
/// advanced middleware patterns. The request processing pipeline currently uses the
/// RequestRouter directly. Tower integration can be added via custom middleware layers
/// when needed for specific use cases (e.g., custom rate limiting, advanced tracing).
#[derive(Clone)]
pub struct McpServer {
    /// Server configuration (Clone-able; each clone carries a full copy)
    pub(crate) config: ServerConfig,
    /// Handler registry (Arc-wrapped for cheap cloning; shared across clones)
    pub(crate) registry: Arc<HandlerRegistry>,
    /// Request router (Arc-wrapped for cheap cloning; shared across clones)
    pub(crate) router: Arc<RequestRouter>,
    /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
    ///
    /// All requests flow through this service stack, which provides:
    /// - Timeout enforcement
    /// - Request validation
    /// - Authorization checks
    /// - Rate limiting
    /// - Audit logging
    /// - And more middleware layers as configured
    ///
    /// See `server/transport.rs` for integration with transport layer.
    pub(crate) service:
        tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
    /// Server lifecycle (Arc-wrapped for cheap cloning; shared shutdown/health state)
    pub(crate) lifecycle: Arc<ServerLifecycle>,
    /// Server metrics (Arc-wrapped for cheap cloning; shared counters across clones)
    pub(crate) metrics: Arc<ServerMetrics>,
}
164
165impl std::fmt::Debug for McpServer {
166 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 f.debug_struct("McpServer")
168 .field("config", &self.config)
169 .finish()
170 }
171}
172
173impl McpServer {
174 /// Build comprehensive Tower middleware stack (transport-agnostic)
175 ///
176 /// ## Architecture
177 ///
178 /// This creates a complete Tower service stack with conditional middleware layers:
179 /// - **Timeout Layer**: Request timeout enforcement (tower_http)
180 /// - **Validation Layer**: JSON-RPC structure validation
181 /// - **Authorization Layer**: Resource access control
182 /// - **Core Service**: JSON-RPC routing and handler execution
183 ///
184 /// All middleware is composed using Tower's ServiceBuilder pattern, which provides:
185 /// - Top-to-bottom execution order
186 /// - Type-safe layer composition
187 /// - Zero-cost abstractions
188 /// - Clone-able service instances
189 ///
190 /// ## Integration
191 ///
192 /// The resulting BoxCloneService is stored in `self.service` and called from
193 /// `server/transport.rs` for every incoming request. This ensures ALL requests
194 /// flow through the complete middleware pipeline before reaching handlers.
195 ///
196 /// ## Adding Middleware
197 ///
198 /// To add new middleware, update the match arms below to include your layer.
199 /// Follow the pattern of conditional inclusion based on config flags.
200 #[cfg(feature = "middleware")]
201 fn build_middleware_stack(
202 core_service: McpService,
203 stack: MiddlewareStack,
204 ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
205 // COMPREHENSIVE TOWER COMPOSITION - Conditional Layer Stacking
206 //
207 // This approach builds the middleware stack incrementally, boxing at each step.
208 // While this has a small performance cost from multiple boxing operations,
209 // it provides several critical advantages:
210 //
211 // 1. **Maintainability**: No combinatorial explosion (8 match arms → simple chain)
212 // 2. **Extensibility**: Adding new middleware requires only one new block
213 // 3. **Clarity**: Each layer's purpose and configuration is explicit
214 // 4. **Type Safety**: BoxCloneService provides type erasure while preserving Clone
215 //
216 // Performance note: The boxing overhead is negligible compared to network I/O
217 // and handler execution time. Modern allocators make this essentially free.
218
219 // Start with core service as a boxed service for uniform type handling
220 let mut service: tower::util::BoxCloneService<
221 Request<Bytes>,
222 Response<Bytes>,
223 crate::ServerError,
224 > = tower::util::BoxCloneService::new(core_service);
225
226 // Authorization layer removed in 2.0.0 - handle at application layer
227
228 // Layer 2: Validation
229 // Validates request structure after auth but before processing
230 #[cfg(feature = "middleware")]
231 {
232 if let Some(validation_layer) = stack.validation_layer() {
233 service = tower::util::BoxCloneService::new(
234 tower::ServiceBuilder::new()
235 .layer(validation_layer)
236 .service(service),
237 );
238 }
239 }
240
241 // Layer 3: Timeout (outermost)
242 // Applied last so it can enforce timeout on the entire request pipeline
243 #[cfg(feature = "middleware")]
244 {
245 if let Some(timeout_config) = stack.timeout_config
246 && timeout_config.enabled
247 {
248 service = tower::util::BoxCloneService::new(
249 tower::ServiceBuilder::new()
250 .layer(tower_http::timeout::TimeoutLayer::new(
251 timeout_config.request_timeout,
252 ))
253 .service(service),
254 );
255 }
256 }
257
258 // Future middleware can be added here with similar if-let blocks:
259 // if let Some(auth_config) = stack.auth_config { ... }
260 // if let Some(audit_config) = stack.audit_config { ... }
261 // if let Some(rate_limit_config) = stack.rate_limit_config { ... }
262
263 service
264 }
265
266 /// Create a new server
267 #[must_use]
268 pub fn new(config: ServerConfig) -> Self {
269 Self::new_with_registry(config, HandlerRegistry::new())
270 }
271
272 /// Create a new server with an existing registry (used by ServerBuilder)
273 #[must_use]
274 pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
275 let registry = Arc::new(registry);
276 let metrics = Arc::new(ServerMetrics::new());
277 let router = Arc::new(RequestRouter::new(
278 Arc::clone(®istry),
279 Arc::clone(&metrics),
280 config.clone(),
281 ));
282 // Build middleware stack configuration
283 #[cfg(feature = "middleware")]
284 #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
285 let mut stack = crate::middleware::MiddlewareStack::new();
286
287 // Auto-install rate limiting if enabled in config
288 #[cfg(feature = "rate-limiting")]
289 if config.rate_limiting.enabled {
290 use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
291 use std::num::NonZeroU32;
292 use std::time::Duration;
293
294 let rate_config = crate::middleware::RateLimitConfig {
295 strategy: RateLimitStrategy::Global,
296 limits: RateLimits {
297 requests_per_period: NonZeroU32::new(
298 config.rate_limiting.requests_per_second * 60,
299 )
300 .unwrap(), // Convert per-second to per-minute
301 period: Duration::from_secs(60),
302 burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
303 },
304 enabled: true,
305 };
306
307 stack = stack.with_rate_limit(rate_config);
308 }
309
310 // Create core MCP service
311 let core_service = McpService::new(
312 Arc::clone(®istry),
313 Arc::clone(&router),
314 Arc::clone(&metrics),
315 );
316
317 // COMPREHENSIVE TOWER SERVICE COMPOSITION
318 // Build the complete middleware stack with proper type erasure
319 //
320 // This service is called from server/transport.rs for EVERY incoming request:
321 // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
322 //
323 // The Tower middleware stack provides:
324 // ✓ Timeout enforcement (configurable per-request)
325 // ✓ Request validation (JSON-RPC structure)
326 // ✓ Authorization checks (resource access control)
327 // ✓ Rate limiting (if enabled in config)
328 // ✓ Audit logging (configurable)
329 // ✓ And more layers as configured
330 //
331 // BoxCloneService is Clone but !Sync - this is the Tower pattern
332 #[cfg(feature = "middleware")]
333 let service = Self::build_middleware_stack(core_service, stack);
334
335 #[cfg(not(feature = "middleware"))]
336 let service = tower::util::BoxCloneService::new(core_service);
337
338 let lifecycle = Arc::new(ServerLifecycle::new());
339
340 Self {
341 config,
342 registry,
343 router,
344 service,
345 lifecycle,
346 metrics,
347 }
348 }
349
    /// Get server configuration
    #[must_use]
    pub const fn config(&self) -> &ServerConfig {
        &self.config
    }

    /// Get handler registry (shared; clone the `Arc` to retain a reference)
    #[must_use]
    pub const fn registry(&self) -> &Arc<HandlerRegistry> {
        &self.registry
    }

    /// Get request router (shared; clone the `Arc` to retain a reference)
    #[must_use]
    pub const fn router(&self) -> &Arc<RequestRouter> {
        &self.router
    }

    /// Get server lifecycle (shared shutdown/health state)
    #[must_use]
    pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
        &self.lifecycle
    }

    /// Get server metrics (shared counters across all clones of this server)
    #[must_use]
    pub const fn metrics(&self) -> &Arc<ServerMetrics> {
        &self.metrics
    }
379
    /// Get the Tower service stack (test accessor)
    ///
    /// **Note**: This is primarily for integration testing. Production code should
    /// use the transport layer which calls the service internally via
    /// `handle_transport_message()`.
    ///
    /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
    /// is designed for cloning).
    #[doc(hidden)]
    pub fn service(
        &self,
    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
        // BoxCloneService clones are cheap by design (boxed clone of the stack).
        self.service.clone()
    }
394
395 /// Get a shutdown handle for graceful server termination
396 ///
397 /// This handle enables external control over server shutdown, essential for:
398 /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
399 /// - **Container orchestration**: Kubernetes graceful pod termination
400 /// - **Load balancer integration**: Health check coordination
401 /// - **Multi-component systems**: Coordinated shutdown sequences
402 /// - **Maintenance operations**: Planned downtime and updates
403 ///
404 /// # Examples
405 ///
406 /// ## Basic shutdown coordination
407 /// ```no_run
408 /// # use turbomcp_server::ServerBuilder;
409 /// # #[tokio::main]
410 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
411 /// let server = ServerBuilder::new().build();
412 /// let shutdown_handle = server.shutdown_handle();
413 ///
414 /// // Coordinate with other services
415 /// tokio::spawn(async move {
416 /// // Wait for external shutdown signal
417 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
418 /// println!("Shutdown signal received, terminating gracefully...");
419 /// shutdown_handle.shutdown().await;
420 /// });
421 ///
422 /// // Server will gracefully shut down when signaled
423 /// // server.run_stdio().await?;
424 /// # Ok(())
425 /// # }
426 /// ```
427 ///
428 /// ## Container/Kubernetes deployment
429 /// ```no_run
430 /// # use turbomcp_server::ServerBuilder;
431 /// # use std::sync::Arc;
432 /// # #[tokio::main]
433 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
434 /// let server = ServerBuilder::new().build();
435 /// let shutdown_handle = server.shutdown_handle();
436 /// let shutdown_handle_clone = shutdown_handle.clone();
437 ///
438 /// // Handle multiple signal types with proper platform support
439 /// tokio::spawn(async move {
440 /// #[cfg(unix)]
441 /// {
442 /// use tokio::signal::unix::{signal, SignalKind};
443 /// let mut sigterm = signal(SignalKind::terminate()).unwrap();
444 /// tokio::select! {
445 /// _ = tokio::signal::ctrl_c() => {
446 /// println!("SIGINT received");
447 /// }
448 /// _ = sigterm.recv() => {
449 /// println!("SIGTERM received");
450 /// }
451 /// }
452 /// }
453 /// #[cfg(not(unix))]
454 /// {
455 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
456 /// println!("SIGINT received");
457 /// }
458 /// shutdown_handle_clone.shutdown().await;
459 /// });
460 ///
461 /// // Server handles graceful shutdown automatically
462 /// // server.run_tcp("0.0.0.0:8080").await?;
463 /// # Ok(())
464 /// # }
465 /// ```
466 pub fn shutdown_handle(&self) -> ShutdownHandle {
467 ShutdownHandle::new(self.lifecycle.clone())
468 }
469
470 /// Run the server with STDIO transport
471 ///
472 /// # Errors
473 ///
474 /// Returns [`crate::ServerError::Transport`] if:
475 /// - STDIO transport connection fails
476 /// - Message sending/receiving fails
477 /// - Transport disconnection fails
478 #[tracing::instrument(skip(self), fields(
479 transport = "stdio",
480 service_name = %self.config.name,
481 service_version = %self.config.version
482 ))]
483 pub async fn run_stdio(mut self) -> ServerResult<()> {
484 // For STDIO transport, disable logging unless explicitly overridden
485 // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
486 if should_log_for_stdio() {
487 info!("Starting MCP server with STDIO transport");
488 }
489
490 // Start performance monitoring for STDIO server
491 let _perf_span = info_span!("server.run", transport = "stdio").entered();
492 info!("Initializing STDIO transport for MCP server");
493
494 self.lifecycle.start().await;
495
496 // BIDIRECTIONAL STDIO SETUP
497 // Create STDIO dispatcher for server-initiated requests (sampling, elicitation, roots, ping)
498 let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
499
500 // Use fully-qualified path to avoid ambiguity with the turbomcp crate's runtime module
501 let dispatcher = crate::runtime::StdioDispatcher::new(request_tx);
502
503 // Configure router's bidirectional support with the STDIO dispatcher
504 // SAFETY: We have &mut self, so we can safely get mutable access to the Arc'd router
505 // This is the CRITICAL STEP that was missing - without this, all server→client requests fail
506 let router = Arc::make_mut(&mut self.router);
507 router.set_server_request_dispatcher(dispatcher.clone());
508
509 // Run STDIO with full bidirectional support (MCP 2025-06-18 compliant)
510 // This uses the bidirectional-aware runtime that handles both:
511 // - Client→Server requests (tools, resources, prompts)
512 // - Server→Client requests (sampling, elicitation, roots, ping)
513 crate::runtime::run_stdio_bidirectional(self.router.clone(), dispatcher, request_rx)
514 .await
515 .map_err(|e| crate::ServerError::Handler {
516 message: format!("STDIO bidirectional runtime failed: {}", e),
517 context: Some("run_stdio".to_string()),
518 })
519 }
520
    /// Get health status
    ///
    /// Delegates to the shared server lifecycle; safe to call from any clone
    /// of the server.
    pub async fn health(&self) -> HealthStatus {
        self.lifecycle.health().await
    }
525
526 /// Run server with HTTP transport using default configuration
527 ///
528 /// This provides a working HTTP server with:
529 /// - Standard HTTP POST/GET/DELETE for MCP protocol at `/mcp`
530 /// - Full MCP 2025-06-18 protocol compliance
531 /// - Graceful shutdown support
532 /// - Default rate limiting (100 req/60s)
533 /// - Default security settings (localhost allowed, CORS disabled)
534 ///
535 /// For custom configuration (rate limits, security, CORS), use `run_http_with_config`.
536 ///
537 /// # Examples
538 ///
539 /// ## Basic usage with default configuration
540 /// ```no_run
541 /// use turbomcp_server::ServerBuilder;
542 ///
543 /// #[tokio::main]
544 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
545 /// let server = ServerBuilder::new()
546 /// .name("my-server")
547 /// .version("1.0.0")
548 /// .build();
549 ///
550 /// server.run_http("127.0.0.1:3000").await?;
551 /// Ok(())
552 /// }
553 /// ```
554 ///
555 /// ## With custom configuration
556 /// ```no_run
557 /// use turbomcp_server::ServerBuilder;
558 /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
559 /// use std::time::Duration;
560 ///
561 /// #[tokio::main]
562 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
563 /// let server = ServerBuilder::new()
564 /// .name("my-server")
565 /// .version("1.0.0")
566 /// .build();
567 ///
568 /// let config = StreamableHttpConfigBuilder::new()
569 /// .with_bind_address("127.0.0.1:3000")
570 /// .allow_any_origin(true) // Enable CORS for development
571 /// .build();
572 ///
573 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
574 /// Ok(())
575 /// }
576 /// ```
577 ///
578 /// # Errors
579 ///
580 /// Returns [`crate::ServerError::Transport`] if:
581 /// - Address resolution fails
582 /// - HTTP server fails to start
583 /// - Transport disconnection fails
584 #[cfg(feature = "http")]
585 #[tracing::instrument(skip(self), fields(
586 transport = "http",
587 service_name = %self.config.name,
588 service_version = %self.config.version,
589 addr = ?addr
590 ))]
591 pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
592 self,
593 addr: A,
594 ) -> ServerResult<()> {
595 use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
596
597 // Build default configuration
598 let config = StreamableHttpConfigBuilder::new().build();
599
600 self.run_http_with_config(addr, config).await
601 }
602
603 /// Run server with HTTP transport and custom configuration
604 ///
605 /// This provides full control over HTTP server configuration including:
606 /// - Rate limiting (requests per time window, or disabled entirely)
607 /// - Security settings (CORS, origin validation, authentication)
608 /// - Network settings (bind address, endpoint path, keep-alive)
609 /// - Advanced settings (replay buffer size, etc.)
610 ///
611 /// # Bind Address Configuration
612 ///
613 /// **IMPORTANT**: The `addr` parameter takes precedence over `config.bind_addr`.
614 /// If they differ, a deprecation warning is logged.
615 ///
616 /// **Best Practice** (recommended for forward compatibility):
617 /// ```no_run
618 /// # use turbomcp_server::ServerBuilder;
619 /// # use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
620 /// # #[tokio::main]
621 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
622 /// let config = StreamableHttpConfigBuilder::new()
623 /// .with_bind_address("127.0.0.1:3001") // Set bind address in config
624 /// .build();
625 ///
626 /// // Pass matching addr parameter (or use default "127.0.0.1:8080")
627 /// ServerBuilder::new()
628 /// .build()
629 /// .run_http_with_config("127.0.0.1:3001", config).await?;
630 /// # Ok(())
631 /// # }
632 /// ```
633 ///
634 /// **Deprecated** (will be removed in v3.x):
635 /// ```no_run
636 /// # use turbomcp_server::ServerBuilder;
637 /// # use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
638 /// # #[tokio::main]
639 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
640 /// // ⚠️ Avoid setting different addresses - causes deprecation warning
641 /// let config = StreamableHttpConfigBuilder::new()
642 /// .with_bind_address("0.0.0.0:5000") // This is ignored!
643 /// .build();
644 ///
645 /// // The addr parameter wins (3001 is used, not 5000)
646 /// ServerBuilder::new()
647 /// .build()
648 /// .run_http_with_config("127.0.0.1:3001", config).await?;
649 /// # Ok(())
650 /// # }
651 /// ```
652 ///
653 /// **Future (v3.x)**: The `addr` parameter will be removed. Configure bind address
654 /// via `config.with_bind_address()` only.
655 ///
656 /// # Examples
657 ///
658 /// ## Custom configuration example
659 /// ```no_run
660 /// use turbomcp_server::ServerBuilder;
661 /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
662 /// use std::time::Duration;
663 ///
664 /// #[tokio::main]
665 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
666 /// let server = ServerBuilder::new()
667 /// .name("custom-server")
668 /// .version("1.0.0")
669 /// .build();
670 ///
671 /// let config = StreamableHttpConfigBuilder::new()
672 /// .with_bind_address("127.0.0.1:3000")
673 /// .with_rate_limit(1000, Duration::from_secs(60)) // 1000 req/min
674 /// .build();
675 ///
676 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
677 /// Ok(())
678 /// }
679 /// ```
680 ///
681 /// ## Production configuration (secure, rate limited)
682 /// ```no_run
683 /// use turbomcp_server::ServerBuilder;
684 /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
685 /// use std::time::Duration;
686 ///
687 /// #[tokio::main]
688 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
689 /// let server = ServerBuilder::new()
690 /// .name("production-server")
691 /// .version("1.0.0")
692 /// .build();
693 ///
694 /// let config = StreamableHttpConfigBuilder::new()
695 /// .with_bind_address("127.0.0.1:3000")
696 /// .with_rate_limit(1000, Duration::from_secs(60)) // 1000 req/min
697 /// .allow_any_origin(false) // Strict CORS
698 /// .require_authentication(true) // Require auth
699 /// .build();
700 ///
701 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
702 /// Ok(())
703 /// }
704 /// ```
705 ///
706 /// # Errors
707 ///
708 /// Returns [`crate::ServerError::Transport`] if:
709 /// - Address resolution fails
710 /// - HTTP server fails to start
711 /// - Transport disconnection fails
    #[cfg(feature = "http")]
    #[tracing::instrument(skip(self, config), fields(
        transport = "http",
        service_name = %self.config.name,
        service_version = %self.config.version,
        addr = ?addr
    ))]
    pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
        self,
        addr: A,
        mut config: turbomcp_transport::streamable_http_v2::StreamableHttpConfig,
    ) -> ServerResult<()> {
        use std::collections::HashMap;
        use tokio::sync::{Mutex, RwLock};

        // Sprint 2.6: Check for insecure 0.0.0.0 binding
        crate::security_checks::check_binding_security(&addr);

        info!("Starting MCP server with HTTP transport");

        self.lifecycle.start().await;

        // Resolve address to string — only the FIRST resolved SocketAddr is used.
        let socket_addr = addr
            .to_socket_addrs()
            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
            .next()
            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;

        info!("Resolved address: {}", socket_addr);

        // Check for conflicting bind addresses and warn about deprecation.
        // The `addr` parameter takes precedence over `config.bind_addr` (see method docs).
        let socket_addr_str = socket_addr.to_string();
        if config.bind_addr != socket_addr_str {
            warn!(
                addr_parameter = %socket_addr_str,
                config_bind_addr = %config.bind_addr,
                "⚠️ DEPRECATION WARNING: The `addr` parameter takes precedence over `config.bind_addr`"
            );
            warn!(
                "⚠️ In TurboMCP v3.x, the `addr` parameter will be removed. Please use StreamableHttpConfigBuilder::new().with_bind_address(\"{}\").build() instead",
                socket_addr_str
            );
            warn!(
                "⚠️ Avoid setting both `addr` parameter and `config.bind_addr` to prevent confusion"
            );

            // Update config to single source of truth
            config.bind_addr = socket_addr_str.clone();
            config.base_url = format!("http://{}", socket_addr_str);
        }

        info!(
            config = ?config,
            "HTTP configuration (updated with resolved address)"
        );

        // BIDIRECTIONAL HTTP SETUP
        // Create shared state for session management and bidirectional MCP
        let sessions = Arc::new(RwLock::new(HashMap::new()));
        let pending_requests = Arc::new(Mutex::new(HashMap::new()));

        // Share router across all sessions (routing logic and handler registry)
        let router = self.router.clone();

        // Capture server identity for MCP protocol compliance
        let server_info = turbomcp_protocol::ServerInfo {
            name: self.config.name.clone(),
            version: self.config.version.clone(),
        };

        // Factory pattern: create session-specific router for each HTTP request.
        // Each session gets its own bidirectional dispatcher while sharing the
        // routing logic; these clones move into the factory closure below.
        let sessions_for_factory = Arc::clone(&sessions);
        let pending_for_factory = Arc::clone(&pending_requests);
        let router_for_factory = Arc::clone(&router);

        // Create a wrapper that converts headers and delegates to router
        // This is cleaner than storing headers on the router itself
        let handler_factory =
            move |session_id: Option<String>, headers: Option<axum::http::HeaderMap>| {
                // No session header present: mint an ephemeral UUID for this request only.
                let session_id = session_id.unwrap_or_else(|| {
                    let new_id = uuid::Uuid::new_v4().to_string();
                    tracing::debug!(
                        "HTTP POST without session ID - generating ephemeral ID for request: {}",
                        new_id
                    );
                    new_id
                });

                tracing::debug!("Factory creating handler for session: {}", session_id);

                // Create session-specific HTTP dispatcher (now local to turbomcp-server!)
                let dispatcher = crate::runtime::http::HttpDispatcher::new(
                    session_id,
                    Arc::clone(&sessions_for_factory),
                    Arc::clone(&pending_for_factory),
                );

                // Clone the base router and configure with session-specific dispatcher
                // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
                let mut session_router = (*router_for_factory).clone();
                session_router.set_server_request_dispatcher(dispatcher);

                // Convert HeaderMap to HashMap<String, String> for passing to create_context.
                // Header values that are not valid UTF-8 are silently dropped (`to_str().ok()`).
                let headers_map = headers.map(|header_map| {
                    header_map
                        .iter()
                        .filter_map(|(name, value)| {
                            value
                                .to_str()
                                .ok()
                                .map(|v| (name.to_string(), v.to_string()))
                        })
                        .collect()
                });

                // Create wrapper that passes headers to create_context (HTTP transport)
                HttpHandlerWithHeaders {
                    router: session_router,
                    headers: headers_map,
                    transport: "http",
                }
            };

        info!(
            server_name = %server_info.name,
            server_version = %server_info.version,
            bind_addr = %socket_addr,
            endpoint_path = %config.endpoint_path,
            "HTTP server starting with full bidirectional support (elicitation, sampling, roots, ping)"
        );

        // Use factory-based HTTP server with full bidirectional support.
        // This call blocks until the HTTP server shuts down.
        use crate::runtime::http::run_http;
        run_http(
            handler_factory,
            sessions,
            pending_requests,
            socket_addr.to_string(),
            config.endpoint_path.clone(),
        )
        .await
        .map_err(|e| {
            tracing::error!(error = %e, "HTTP server failed");
            crate::ServerError::handler(e.to_string())
        })?;

        info!("HTTP server shutdown complete");
        Ok(())
    }
864
865 /// Run server with WebSocket transport (full bidirectional support)
866 ///
867 /// This provides a simple API for WebSocket servers with sensible defaults:
868 /// - Default endpoint: `/mcp/ws`
869 /// - Full MCP 2025-06-18 compliance
870 /// - Bidirectional communication
871 /// - Elicitation support
872 /// - Session management and middleware
873 ///
874 /// For custom configuration, use `run_websocket_with_config()`.
875 ///
876 /// # Example
877 ///
878 /// ```no_run
879 /// use turbomcp_server::ServerBuilder;
880 ///
881 /// #[tokio::main]
882 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
883 /// let server = ServerBuilder::new()
884 /// .name("ws-server")
885 /// .version("1.0.0")
886 /// .build();
887 ///
888 /// server.run_websocket("127.0.0.1:8080").await?;
889 /// Ok(())
890 /// }
891 /// ```
892 #[cfg(feature = "websocket")]
893 #[tracing::instrument(skip(self), fields(
894 transport = "websocket",
895 service_name = %self.config.name,
896 service_version = %self.config.version,
897 addr = ?addr
898 ))]
899 pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
900 self,
901 addr: A,
902 ) -> ServerResult<()> {
903 use crate::config::WebSocketServerConfig;
904
905 // Build default configuration
906 let config = WebSocketServerConfig::default();
907
908 self.run_websocket_with_config(addr, config).await
909 }
910
911 /// Run server with WebSocket transport and custom configuration
912 ///
913 /// This provides full control over WebSocket server configuration including:
914 /// - Custom endpoint path
915 /// - MCP server settings (middleware, security, etc.)
916 ///
917 /// # Example
918 ///
919 /// ```no_run
920 /// use turbomcp_server::{ServerBuilder, WebSocketServerConfig};
921 ///
922 /// #[tokio::main]
923 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
924 /// let server = ServerBuilder::new()
925 /// .name("custom-ws-server")
926 /// .version("1.0.0")
927 /// .build();
928 ///
929 /// let config = WebSocketServerConfig {
930 /// bind_addr: "127.0.0.1:8080".to_string(),
931 /// endpoint_path: "/custom/ws".to_string(),
932 /// max_concurrent_requests: 100,
933 /// };
934 ///
935 /// server.run_websocket_with_config("127.0.0.1:8080", config).await?;
936 /// Ok(())
937 /// }
938 /// ```
939 #[cfg(feature = "websocket")]
940 #[tracing::instrument(skip(self, config), fields(
941 transport = "websocket",
942 service_name = %self.config.name,
943 service_version = %self.config.version,
944 addr = ?addr
945 ))]
946 pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
947 self,
948 addr: A,
949 config: crate::config::WebSocketServerConfig,
950 ) -> ServerResult<()> {
951 use axum::{Router, middleware, routing::get};
952 use turbomcp_transport::axum::{WebSocketFactoryState, websocket_handler_with_factory};
953 use turbomcp_transport::tower::SessionInfo;
954
955 // Sprint 2.6: Check for insecure 0.0.0.0 binding
956 crate::security_checks::check_binding_security(&addr);
957
958 info!("Starting MCP server with WebSocket transport");
959 info!(config = ?config, "WebSocket configuration");
960
961 self.lifecycle.start().await;
962
963 // Resolve address to string
964 let socket_addr = addr
965 .to_socket_addrs()
966 .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
967 .next()
968 .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
969
970 info!("Resolved address: {}", socket_addr);
971
972 // Capture server identity for MCP protocol compliance
973 let server_info = turbomcp_protocol::ServerInfo {
974 name: self.config.name.clone(),
975 version: self.config.version.clone(),
976 };
977
978 // Router for this server (shared across all connections)
979 let router = (*self.router).clone();
980
981 // Factory: creates per-connection handler with bidirectional support
982 // This is the unified architecture - transport layer handles WebSocket mechanics,
983 // server layer provides MCP-specific handler logic
984 let handler_factory =
985 move |transport_dispatcher: turbomcp_transport::axum::WebSocketDispatcher,
986 headers: Option<std::collections::HashMap<String, String>>| {
987 // Wrap transport dispatcher with server layer adapter
988 let server_dispatcher =
989 crate::routing::WebSocketDispatcherAdapter::new(transport_dispatcher);
990
991 // Clone router for this connection and configure with dispatcher
992 let mut connection_router = router.clone();
993 connection_router.set_server_request_dispatcher(server_dispatcher);
994
995 // Create wrapper that passes headers to create_context (WebSocket transport)
996 // We can reuse HttpHandlerWithHeaders since it's generic
997 Arc::new(HttpHandlerWithHeaders {
998 router: connection_router,
999 headers,
1000 transport: "websocket",
1001 }) as Arc<dyn turbomcp_protocol::JsonRpcHandler>
1002 };
1003
1004 info!(
1005 server_name = %server_info.name,
1006 server_version = %server_info.version,
1007 bind_addr = %socket_addr,
1008 endpoint_path = %config.endpoint_path,
1009 "WebSocket server starting with full bidirectional support (elicitation, sampling, roots, ping)"
1010 );
1011
1012 // Create factory state for transport layer
1013 let factory_state = WebSocketFactoryState::new(handler_factory);
1014
1015 // Session middleware to extract headers (same as HTTP transport)
1016 let session_middleware = |mut request: axum::extract::Request, next: middleware::Next| async move {
1017 let mut session = SessionInfo::new();
1018
1019 // Extract headers and store in session metadata
1020 for (name, value) in request.headers().iter() {
1021 if let Ok(value_str) = value.to_str() {
1022 session
1023 .metadata
1024 .insert(name.to_string(), value_str.to_string());
1025 }
1026 }
1027
1028 // Extract specific useful headers
1029 if let Some(user_agent) = request
1030 .headers()
1031 .get("user-agent")
1032 .and_then(|v| v.to_str().ok())
1033 {
1034 session.user_agent = Some(user_agent.to_string());
1035 }
1036
1037 if let Some(remote_addr) = request
1038 .headers()
1039 .get("x-forwarded-for")
1040 .and_then(|v| v.to_str().ok())
1041 {
1042 session.remote_addr = Some(remote_addr.to_string());
1043 }
1044
1045 request.extensions_mut().insert(session);
1046 next.run(request).await
1047 };
1048
1049 // Build Axum router using transport layer
1050 let app = Router::new()
1051 .route(&config.endpoint_path, get(websocket_handler_with_factory))
1052 .with_state(factory_state)
1053 .layer(middleware::from_fn(session_middleware));
1054
1055 info!("WebSocket server bound to {}", socket_addr);
1056
1057 // Serve using Axum
1058 let listener = tokio::net::TcpListener::bind(socket_addr)
1059 .await
1060 .map_err(|e| crate::ServerError::configuration(format!("Failed to bind: {}", e)))?;
1061
1062 axum::serve(listener, app).await.map_err(|e| {
1063 tracing::error!(error = %e, "WebSocket server failed");
1064 crate::ServerError::handler(e.to_string())
1065 })?;
1066
1067 info!("WebSocket server shutdown complete");
1068 Ok(())
1069 }
1070
1071 /// Run server with TCP transport (progressive enhancement - runtime configuration)
1072 #[cfg(feature = "tcp")]
1073 #[tracing::instrument(skip(self), fields(
1074 transport = "tcp",
1075 service_name = %self.config.name,
1076 service_version = %self.config.version,
1077 addr = ?addr
1078 ))]
1079 pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
1080 mut self,
1081 addr: A,
1082 ) -> ServerResult<()> {
1083 use turbomcp_transport::TcpTransport;
1084
1085 // Sprint 2.6: Check for insecure 0.0.0.0 binding
1086 crate::security_checks::check_binding_security(&addr);
1087
1088 // Start performance monitoring for TCP server
1089 let _perf_span = info_span!("server.run", transport = "tcp").entered();
1090 info!(?addr, "Starting MCP server with TCP transport");
1091
1092 self.lifecycle.start().await;
1093
1094 // Convert ToSocketAddrs to SocketAddr
1095 let socket_addr = match addr.to_socket_addrs() {
1096 Ok(mut addrs) => match addrs.next() {
1097 Some(addr) => addr,
1098 None => {
1099 tracing::error!("No socket address resolved from provided address");
1100 self.lifecycle.shutdown().await;
1101 return Err(crate::ServerError::configuration("Invalid socket address"));
1102 }
1103 },
1104 Err(e) => {
1105 tracing::error!(error = %e, "Failed to resolve socket address");
1106 self.lifecycle.shutdown().await;
1107 return Err(crate::ServerError::configuration(format!(
1108 "Address resolution failed: {e}"
1109 )));
1110 }
1111 };
1112
1113 let transport = TcpTransport::new_server(socket_addr);
1114 if let Err(e) = transport.connect().await {
1115 tracing::error!(error = %e, "Failed to connect TCP transport");
1116 self.lifecycle.shutdown().await;
1117 return Err(e.into());
1118 }
1119
1120 // BIDIRECTIONAL TCP SETUP
1121 // Create generic transport dispatcher for server-initiated requests
1122 let dispatcher = crate::runtime::TransportDispatcher::new(transport);
1123
1124 // Configure router's bidirectional support with the TCP dispatcher
1125 // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
1126 let router = Arc::make_mut(&mut self.router);
1127 router.set_server_request_dispatcher(dispatcher.clone());
1128
1129 // Run TCP with full bidirectional support (MCP 2025-06-18 compliant)
1130 // This uses the generic bidirectional runtime that handles both:
1131 // - Client→Server requests (tools, resources, prompts)
1132 // - Server→Client requests (sampling, elicitation, roots, ping)
1133 crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
1134 .await
1135 .map_err(|e| crate::ServerError::Handler {
1136 message: format!("TCP bidirectional runtime failed: {}", e),
1137 context: Some("run_tcp".to_string()),
1138 })
1139 }
1140
1141 /// Run server with Unix socket transport (progressive enhancement - runtime configuration)
1142 #[cfg(all(feature = "unix", unix))]
1143 #[tracing::instrument(skip(self), fields(
1144 transport = "unix",
1145 service_name = %self.config.name,
1146 service_version = %self.config.version,
1147 path = ?path.as_ref()
1148 ))]
1149 pub async fn run_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> ServerResult<()> {
1150 use std::path::PathBuf;
1151 use turbomcp_transport::UnixTransport;
1152
1153 // Start performance monitoring for Unix server
1154 let _perf_span = info_span!("server.run", transport = "unix").entered();
1155 info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");
1156
1157 self.lifecycle.start().await;
1158
1159 let socket_path = PathBuf::from(path.as_ref());
1160 let transport = UnixTransport::new_server(socket_path);
1161 if let Err(e) = transport.connect().await {
1162 tracing::error!(error = %e, "Failed to connect Unix socket transport");
1163 self.lifecycle.shutdown().await;
1164 return Err(e.into());
1165 }
1166
1167 // BIDIRECTIONAL UNIX SOCKET SETUP
1168 // Create generic transport dispatcher for server-initiated requests
1169 let dispatcher = crate::runtime::TransportDispatcher::new(transport);
1170
1171 // Configure router's bidirectional support with the Unix socket dispatcher
1172 // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
1173 let router = Arc::make_mut(&mut self.router);
1174 router.set_server_request_dispatcher(dispatcher.clone());
1175
1176 // Run Unix Socket with full bidirectional support (MCP 2025-06-18 compliant)
1177 // This uses the generic bidirectional runtime that handles both:
1178 // - Client→Server requests (tools, resources, prompts)
1179 // - Server→Client requests (sampling, elicitation, roots, ping)
1180 crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
1181 .await
1182 .map_err(|e| crate::ServerError::Handler {
1183 message: format!("Unix socket bidirectional runtime failed: {}", e),
1184 context: Some("run_unix".to_string()),
1185 })
1186 }
1187
    /// Generic transport runner (DRY principle)
    /// Used by feature-gated transport methods (http, tcp, websocket, unix)
    ///
    /// Installs Ctrl+C (and, on Unix, SIGTERM) handlers that trigger lifecycle
    /// shutdown, then polls the transport in a `tokio::select!` loop until a
    /// shutdown signal arrives or the receive channel disconnects. Always
    /// attempts to disconnect the transport before returning `Ok(())`.
    #[allow(dead_code)]
    #[tracing::instrument(skip(self, transport), fields(
        service_name = %self.config.name,
        service_version = %self.config.version
    ))]
    async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
        // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM).
        // NOTE(review): these tasks are spawned per call and live until the
        // signal fires or the runtime stops — fine for a single run, but
        // repeated calls on one runtime would stack handlers.
        let lifecycle_for_sigint = self.lifecycle.clone();
        tokio::spawn(async move {
            if let Err(e) = tokio::signal::ctrl_c().await {
                tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
                return;
            }
            tracing::info!("Ctrl+C received, initiating shutdown");
            lifecycle_for_sigint.shutdown().await;
        });

        #[cfg(unix)]
        {
            let lifecycle_for_sigterm = self.lifecycle.clone();
            tokio::spawn(async move {
                use tokio::signal::unix::{SignalKind, signal};
                match signal(SignalKind::terminate()) {
                    Ok(mut sigterm) => {
                        sigterm.recv().await;
                        tracing::info!("SIGTERM received, initiating shutdown");
                        lifecycle_for_sigterm.shutdown().await;
                    }
                    // Installation failure is logged, not fatal: the server can
                    // still be stopped via Ctrl+C or the lifecycle API.
                    Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
                }
            });
        }

        // Shutdown signal
        let mut shutdown = self.lifecycle.shutdown_signal();

        // Main message processing loop.
        // NOTE(review): assumes `transport.receive()` is cancel-safe when raced
        // against the shutdown signal in select! — confirm per transport impl.
        loop {
            tokio::select! {
                _ = shutdown.recv() => {
                    tracing::info!("Shutdown signal received");
                    break;
                }
                res = transport.receive() => {
                    match res {
                        Ok(Some(message)) => {
                            // Per-message failures are logged but do not stop the loop.
                            if let Err(e) = self.handle_transport_message(&mut transport, message).await {
                                tracing::warn!(error = %e, "Failed to handle transport message");
                            }
                        }
                        Ok(None) => {
                            // No message available; sleep briefly to avoid busy loop
                            sleep(Duration::from_millis(5)).await;
                        }
                        Err(e) => {
                            match e {
                                // NOTE(review): disconnect detection relies on a substring
                                // match of the error text — fragile if the transport's
                                // error wording ever changes.
                                TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
                                    tracing::info!("Transport receive channel disconnected; shutting down");
                                    break;
                                }
                                _ => {
                                    tracing::error!(error = %e, "Transport receive failed");
                                    // Backoff on errors
                                    sleep(Duration::from_millis(50)).await;
                                }
                            }
                        }
                    }
                }
            }
        }

        // Disconnect transport; failure here is logged, not propagated,
        // because shutdown has already been decided.
        if let Err(e) = transport.disconnect().await {
            tracing::warn!(error = %e, "Error while disconnecting transport");
        }

        tracing::info!("Server shutdown complete");
        Ok(())
    }
1270}
1271
1272// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1273// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1274// This is intentional and follows the Axum/Tower design pattern
1275#[allow(dead_code)]
1276const _: () = {
1277 const fn assert_send_clone<T: Send + Clone>() {}
1278 const fn check() {
1279 assert_send_clone::<crate::server::core::McpServer>();
1280 }
1281};