turbomcp_server/server/
core.rs

1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9use crate::{
10    config::ServerConfig,
11    error::ServerResult,
12    lifecycle::{HealthStatus, ServerLifecycle},
13    metrics::ServerMetrics,
14    registry::HandlerRegistry,
15    routing::RequestRouter,
16    service::McpService,
17};
18
19#[cfg(feature = "middleware")]
20use crate::middleware::MiddlewareStack;
21
22use bytes::Bytes;
23use http::{Request, Response};
24use tokio::time::{Duration, sleep};
25use turbomcp_transport::core::TransportError;
26use turbomcp_transport::{StdioTransport, Transport};
27
28use super::shutdown::ShutdownHandle;
29
30/// Check if logging should be enabled for STDIO transport
31///
32/// For MCP STDIO transport compliance, logging is disabled by default since stdout
33/// must be reserved exclusively for JSON-RPC messages. This can be overridden by
34/// setting the TURBOMCP_FORCE_LOGGING environment variable.
35pub(crate) fn should_log_for_stdio() -> bool {
36    std::env::var("TURBOMCP_FORCE_LOGGING").is_ok()
37}
38
39/// Main MCP server following the Axum/Tower Clone pattern
40///
41/// ## Sharing Pattern
42///
43/// `McpServer` implements `Clone` like Axum's `Router`. All heavy state is Arc-wrapped
44/// internally, making cloning cheap (just atomic reference count increments).
45///
46/// ```rust,no_run
47/// use turbomcp_server::ServerBuilder;
48///
49/// # async fn example() {
50/// let server = ServerBuilder::new().build();
51///
52/// // Clone for passing to functions (cheap - just Arc increments)
53/// let server1 = server.clone();
54/// let server2 = server.clone();
55///
56/// // Access config and health
57/// let config = server1.config();
58/// println!("Server: {}", config.name);
59///
60/// let health = server2.health().await;
61/// println!("Health: {:?}", health);
62/// # }
63/// ```
64///
65/// ## Architecture Notes
66///
67/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
68/// This is intentional and follows Tower's design - users clone the server instead of
69/// Arc-wrapping it.
70///
71/// **Architecture Note**: The service field provides tower::Service integration for
72/// advanced middleware patterns. The request processing pipeline currently uses the
73/// RequestRouter directly. Tower integration can be added via custom middleware layers
74/// when needed for specific use cases (e.g., custom rate limiting, advanced tracing).
75#[derive(Clone)]
76pub struct McpServer {
77    /// Server configuration (Clone-able)
78    pub(crate) config: ServerConfig,
79    /// Handler registry (Arc-wrapped for cheap cloning)
80    pub(crate) registry: Arc<HandlerRegistry>,
81    /// Request router (Arc-wrapped for cheap cloning)
82    pub(crate) router: Arc<RequestRouter>,
83    /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
84    ///
85    /// All requests flow through this service stack, which provides:
86    /// - Timeout enforcement
87    /// - Request validation
88    /// - Authorization checks
89    /// - Rate limiting
90    /// - Audit logging
91    /// - And more middleware layers as configured
92    ///
93    /// See `server/transport.rs` for integration with transport layer.
94    pub(crate) service:
95        tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
96    /// Server lifecycle (Arc-wrapped for cheap cloning)
97    pub(crate) lifecycle: Arc<ServerLifecycle>,
98    /// Server metrics (Arc-wrapped for cheap cloning)
99    pub(crate) metrics: Arc<ServerMetrics>,
100}
101
102impl std::fmt::Debug for McpServer {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("McpServer")
105            .field("config", &self.config)
106            .finish()
107    }
108}
109
110impl McpServer {
111    /// Build comprehensive Tower middleware stack (transport-agnostic)
112    ///
113    /// ## Architecture
114    ///
115    /// This creates a complete Tower service stack with conditional middleware layers:
116    /// - **Timeout Layer**: Request timeout enforcement (tower_http)
117    /// - **Validation Layer**: JSON-RPC structure validation
118    /// - **Authorization Layer**: Resource access control
119    /// - **Core Service**: JSON-RPC routing and handler execution
120    ///
121    /// All middleware is composed using Tower's ServiceBuilder pattern, which provides:
122    /// - Top-to-bottom execution order
123    /// - Type-safe layer composition
124    /// - Zero-cost abstractions
125    /// - Clone-able service instances
126    ///
127    /// ## Integration
128    ///
129    /// The resulting BoxCloneService is stored in `self.service` and called from
130    /// `server/transport.rs` for every incoming request. This ensures ALL requests
131    /// flow through the complete middleware pipeline before reaching handlers.
132    ///
133    /// ## Adding Middleware
134    ///
135    /// To add new middleware, update the match arms below to include your layer.
136    /// Follow the pattern of conditional inclusion based on config flags.
137    #[cfg(feature = "middleware")]
138    fn build_middleware_stack(
139        core_service: McpService,
140        stack: MiddlewareStack,
141    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
142        // COMPREHENSIVE TOWER COMPOSITION - Conditional Layer Stacking
143        //
144        // This approach builds the middleware stack incrementally, boxing at each step.
145        // While this has a small performance cost from multiple boxing operations,
146        // it provides several critical advantages:
147        //
148        // 1. **Maintainability**: No combinatorial explosion (8 match arms → simple chain)
149        // 2. **Extensibility**: Adding new middleware requires only one new block
150        // 3. **Clarity**: Each layer's purpose and configuration is explicit
151        // 4. **Type Safety**: BoxCloneService provides type erasure while preserving Clone
152        //
153        // Performance note: The boxing overhead is negligible compared to network I/O
154        // and handler execution time. Modern allocators make this essentially free.
155
156        // Start with core service as a boxed service for uniform type handling
157        let mut service: tower::util::BoxCloneService<
158            Request<Bytes>,
159            Response<Bytes>,
160            crate::ServerError,
161        > = tower::util::BoxCloneService::new(core_service);
162
163        // Authorization layer removed in 2.0.0 - handle at application layer
164
165        // Layer 2: Validation
166        // Validates request structure after auth but before processing
167        #[cfg(feature = "middleware")]
168        {
169            if let Some(validation_layer) = stack.validation_layer() {
170                service = tower::util::BoxCloneService::new(
171                    tower::ServiceBuilder::new()
172                        .layer(validation_layer)
173                        .service(service),
174                );
175            }
176        }
177
178        // Layer 3: Timeout (outermost)
179        // Applied last so it can enforce timeout on the entire request pipeline
180        #[cfg(feature = "middleware")]
181        {
182            if let Some(timeout_config) = stack.timeout_config
183                && timeout_config.enabled
184            {
185                service = tower::util::BoxCloneService::new(
186                    tower::ServiceBuilder::new()
187                        .layer(tower_http::timeout::TimeoutLayer::new(
188                            timeout_config.request_timeout,
189                        ))
190                        .service(service),
191                );
192            }
193        }
194
195        // Future middleware can be added here with similar if-let blocks:
196        // if let Some(auth_config) = stack.auth_config { ... }
197        // if let Some(audit_config) = stack.audit_config { ... }
198        // if let Some(rate_limit_config) = stack.rate_limit_config { ... }
199
200        service
201    }
202
203    /// Create a new server
204    #[must_use]
205    pub fn new(config: ServerConfig) -> Self {
206        Self::new_with_registry(config, HandlerRegistry::new())
207    }
208
209    /// Create a new server with an existing registry (used by ServerBuilder)
210    #[must_use]
211    pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
212        let registry = Arc::new(registry);
213        let metrics = Arc::new(ServerMetrics::new());
214        let router = Arc::new(RequestRouter::new(
215            Arc::clone(&registry),
216            Arc::clone(&metrics),
217        ));
218        // Build middleware stack configuration
219        #[cfg(feature = "middleware")]
220        #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
221        let mut stack = crate::middleware::MiddlewareStack::new();
222
223        // Auto-install rate limiting if enabled in config
224        #[cfg(feature = "rate-limiting")]
225        if config.rate_limiting.enabled {
226            use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
227            use std::num::NonZeroU32;
228            use std::time::Duration;
229
230            let rate_config = crate::middleware::RateLimitConfig {
231                strategy: RateLimitStrategy::Global,
232                limits: RateLimits {
233                    requests_per_period: NonZeroU32::new(
234                        config.rate_limiting.requests_per_second * 60,
235                    )
236                    .unwrap(), // Convert per-second to per-minute
237                    period: Duration::from_secs(60),
238                    burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
239                },
240                enabled: true,
241            };
242
243            stack = stack.with_rate_limit(rate_config);
244        }
245
246        // Create core MCP service
247        let core_service = McpService::new(
248            Arc::clone(&registry),
249            Arc::clone(&router),
250            Arc::clone(&metrics),
251        );
252
253        // COMPREHENSIVE TOWER SERVICE COMPOSITION
254        // Build the complete middleware stack with proper type erasure
255        //
256        // This service is called from server/transport.rs for EVERY incoming request:
257        // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
258        //
259        // The Tower middleware stack provides:
260        // ✓ Timeout enforcement (configurable per-request)
261        // ✓ Request validation (JSON-RPC structure)
262        // ✓ Authorization checks (resource access control)
263        // ✓ Rate limiting (if enabled in config)
264        // ✓ Audit logging (configurable)
265        // ✓ And more layers as configured
266        //
267        // BoxCloneService is Clone but !Sync - this is the Tower pattern
268        #[cfg(feature = "middleware")]
269        let service = Self::build_middleware_stack(core_service, stack);
270
271        #[cfg(not(feature = "middleware"))]
272        let service = tower::util::BoxCloneService::new(core_service);
273
274        let lifecycle = Arc::new(ServerLifecycle::new());
275
276        Self {
277            config,
278            registry,
279            router,
280            service,
281            lifecycle,
282            metrics,
283        }
284    }
285
286    /// Get server configuration
287    #[must_use]
288    pub const fn config(&self) -> &ServerConfig {
289        &self.config
290    }
291
292    /// Get handler registry
293    #[must_use]
294    pub const fn registry(&self) -> &Arc<HandlerRegistry> {
295        &self.registry
296    }
297
298    /// Get request router
299    #[must_use]
300    pub const fn router(&self) -> &Arc<RequestRouter> {
301        &self.router
302    }
303
304    /// Get server lifecycle
305    #[must_use]
306    pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
307        &self.lifecycle
308    }
309
310    /// Get server metrics
311    #[must_use]
312    pub const fn metrics(&self) -> &Arc<ServerMetrics> {
313        &self.metrics
314    }
315
316    /// Get the Tower service stack (test accessor)
317    ///
318    /// **Note**: This is primarily for integration testing. Production code should
319    /// use the transport layer which calls the service internally via
320    /// `handle_transport_message()`.
321    ///
322    /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
323    /// is designed for cloning).
324    #[doc(hidden)]
325    pub fn service(
326        &self,
327    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
328        self.service.clone()
329    }
330
331    /// Get a shutdown handle for graceful server termination
332    ///
333    /// This handle enables external control over server shutdown, essential for:
334    /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
335    /// - **Container orchestration**: Kubernetes graceful pod termination
336    /// - **Load balancer integration**: Health check coordination
337    /// - **Multi-component systems**: Coordinated shutdown sequences
338    /// - **Maintenance operations**: Planned downtime and updates
339    ///
340    /// # Examples
341    ///
342    /// ## Basic shutdown coordination
343    /// ```no_run
344    /// # use turbomcp_server::ServerBuilder;
345    /// # #[tokio::main]
346    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
347    /// let server = ServerBuilder::new().build();
348    /// let shutdown_handle = server.shutdown_handle();
349    ///
350    /// // Coordinate with other services
351    /// tokio::spawn(async move {
352    ///     // Wait for external shutdown signal
353    ///     tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
354    ///     println!("Shutdown signal received, terminating gracefully...");
355    ///     shutdown_handle.shutdown().await;
356    /// });
357    ///
358    /// // Server will gracefully shut down when signaled
359    /// // server.run_stdio().await?;
360    /// # Ok(())
361    /// # }
362    /// ```
363    ///
364    /// ## Container/Kubernetes deployment
365    /// ```no_run
366    /// # use turbomcp_server::ServerBuilder;
367    /// # use std::sync::Arc;
368    /// # #[tokio::main]
369    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
370    /// let server = ServerBuilder::new().build();
371    /// let shutdown_handle = server.shutdown_handle();
372    /// let shutdown_handle_clone = shutdown_handle.clone();
373    ///
374    /// // Handle multiple signal types with proper platform support
375    /// tokio::spawn(async move {
376    ///     #[cfg(unix)]
377    ///     {
378    ///         use tokio::signal::unix::{signal, SignalKind};
379    ///         let mut sigterm = signal(SignalKind::terminate()).unwrap();
380    ///         tokio::select! {
381    ///             _ = tokio::signal::ctrl_c() => {
382    ///                 println!("SIGINT received");
383    ///             }
384    ///             _ = sigterm.recv() => {
385    ///                 println!("SIGTERM received");
386    ///             }
387    ///         }
388    ///     }
389    ///     #[cfg(not(unix))]
390    ///     {
391    ///         tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
392    ///         println!("SIGINT received");
393    ///     }
394    ///     shutdown_handle_clone.shutdown().await;
395    /// });
396    ///
397    /// // Server handles graceful shutdown automatically
398    /// // server.run_tcp("0.0.0.0:8080").await?;
399    /// # Ok(())
400    /// # }
401    /// ```
402    pub fn shutdown_handle(&self) -> ShutdownHandle {
403        ShutdownHandle::new(self.lifecycle.clone())
404    }
405
406    /// Run the server with STDIO transport
407    ///
408    /// # Errors
409    ///
410    /// Returns [`crate::ServerError::Transport`] if:
411    /// - STDIO transport connection fails
412    /// - Message sending/receiving fails
413    /// - Transport disconnection fails
414    #[tracing::instrument(skip(self), fields(
415        transport = "stdio",
416        service_name = %self.config.name,
417        service_version = %self.config.version
418    ))]
419    pub async fn run_stdio(self) -> ServerResult<()> {
420        // For STDIO transport, disable logging unless explicitly overridden
421        // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
422        if should_log_for_stdio() {
423            info!("Starting MCP server with STDIO transport");
424        }
425
426        // Start performance monitoring for STDIO server
427        let _perf_span = info_span!("server.run", transport = "stdio").entered();
428        info!("Initializing STDIO transport for MCP server");
429
430        self.lifecycle.start().await;
431
432        // Initialize STDIO transport
433        let transport = StdioTransport::new();
434        if let Err(e) = transport.connect().await {
435            if should_log_for_stdio() {
436                tracing::error!(error = %e, "Failed to connect stdio transport");
437            } else {
438                // Critical errors can go to stderr for debugging
439                eprintln!("TurboMCP STDIO transport failed to connect: {}", e);
440            }
441            self.lifecycle.shutdown().await;
442            return Err(e.into());
443        }
444
445        self.run_with_transport_stdio_aware(transport).await
446    }
447
448    /// Get health status
449    pub async fn health(&self) -> HealthStatus {
450        self.lifecycle.health().await
451    }
452
453    /// Run server with HTTP transport using default configuration
454    ///
455    /// This provides a working HTTP server with:
456    /// - Standard HTTP POST/GET/DELETE for MCP protocol at `/mcp`
457    /// - Full MCP 2025-06-18 protocol compliance
458    /// - Graceful shutdown support
459    /// - Default rate limiting (100 req/60s)
460    /// - Default security settings (localhost allowed, CORS disabled)
461    ///
462    /// For custom configuration (rate limits, security, CORS), use `run_http_with_config`.
463    ///
464    /// # Examples
465    ///
466    /// ## Basic usage with default configuration
467    /// ```no_run
468    /// use turbomcp_server::ServerBuilder;
469    ///
470    /// #[tokio::main]
471    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
472    ///     let server = ServerBuilder::new()
473    ///         .name("my-server")
474    ///         .version("1.0.0")
475    ///         .build();
476    ///
477    ///     server.run_http("127.0.0.1:3000").await?;
478    ///     Ok(())
479    /// }
480    /// ```
481    ///
482    /// ## With custom configuration
483    /// ```no_run
484    /// use turbomcp_server::ServerBuilder;
485    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
486    /// use std::time::Duration;
487    ///
488    /// #[tokio::main]
489    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
490    ///     let server = ServerBuilder::new()
491    ///         .name("my-server")
492    ///         .version("1.0.0")
493    ///         .build();
494    ///
495    ///     let config = StreamableHttpConfigBuilder::new()
496    ///         .without_rate_limit()  // For benchmarking
497    ///         .allow_any_origin(true)  // Enable CORS
498    ///         .build();
499    ///
500    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
501    ///     Ok(())
502    /// }
503    /// ```
504    ///
505    /// # Errors
506    ///
507    /// Returns [`crate::ServerError::Transport`] if:
508    /// - Address resolution fails
509    /// - HTTP server fails to start
510    /// - Transport disconnection fails
511    #[cfg(feature = "http")]
512    #[tracing::instrument(skip(self), fields(
513        transport = "http",
514        service_name = %self.config.name,
515        service_version = %self.config.version,
516        addr = ?addr
517    ))]
518    pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
519        self,
520        addr: A,
521    ) -> ServerResult<()> {
522        use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
523
524        // Build default configuration
525        let config = StreamableHttpConfigBuilder::new().build();
526
527        self.run_http_with_config(addr, config).await
528    }
529
530    /// Run server with HTTP transport and custom configuration
531    ///
532    /// This provides full control over HTTP server configuration including:
533    /// - Rate limiting (requests per time window, or disabled entirely)
534    /// - Security settings (CORS, origin validation, authentication)
535    /// - Network settings (bind address, endpoint path, keep-alive)
536    /// - Advanced settings (replay buffer size, etc.)
537    ///
538    /// # Examples
539    ///
540    /// ## Benchmarking configuration (no rate limits)
541    /// ```no_run
542    /// use turbomcp_server::ServerBuilder;
543    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
544    ///
545    /// #[tokio::main]
546    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
547    ///     let server = ServerBuilder::new()
548    ///         .name("benchmark-server")
549    ///         .version("1.0.0")
550    ///         .build();
551    ///
552    ///     let config = StreamableHttpConfigBuilder::new()
553    ///         .without_rate_limit()  // Disable rate limiting
554    ///         .build();
555    ///
556    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
557    ///     Ok(())
558    /// }
559    /// ```
560    ///
561    /// ## Production configuration (secure, rate limited)
562    /// ```no_run
563    /// use turbomcp_server::ServerBuilder;
564    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
565    /// use std::time::Duration;
566    ///
567    /// #[tokio::main]
568    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
569    ///     let server = ServerBuilder::new()
570    ///         .name("production-server")
571    ///         .version("1.0.0")
572    ///         .build();
573    ///
574    ///     let config = StreamableHttpConfigBuilder::new()
575    ///         .with_rate_limit(1000, Duration::from_secs(60))  // 1000 req/min
576    ///         .allow_any_origin(false)  // Strict CORS
577    ///         .require_authentication(true)  // Require auth
578    ///         .build();
579    ///
580    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
581    ///     Ok(())
582    /// }
583    /// ```
584    ///
585    /// # Errors
586    ///
587    /// Returns [`crate::ServerError::Transport`] if:
588    /// - Address resolution fails
589    /// - HTTP server fails to start
590    /// - Transport disconnection fails
591    #[cfg(feature = "http")]
592    #[tracing::instrument(skip(self, config), fields(
593        transport = "http",
594        service_name = %self.config.name,
595        service_version = %self.config.version,
596        addr = ?addr
597    ))]
598    pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
599        self,
600        addr: A,
601        config: turbomcp_transport::streamable_http_v2::StreamableHttpConfig,
602    ) -> ServerResult<()> {
603        use turbomcp_transport::streamable_http_v2::run_server;
604
605        info!("Starting MCP server with HTTP transport");
606        info!(
607            config = ?config,
608            "HTTP configuration loaded"
609        );
610
611        self.lifecycle.start().await;
612
613        // Resolve address to string
614        let socket_addr = addr
615            .to_socket_addrs()
616            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
617            .next()
618            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
619
620        info!("Resolved address: {}", socket_addr);
621
622        // Use provided config but override bind_addr with resolved address
623        let mut final_config = config;
624        final_config.bind_addr = socket_addr.to_string();
625
626        info!(
627            bind_addr = %final_config.bind_addr,
628            endpoint_path = %final_config.endpoint_path,
629            "HTTP server configuration finalized"
630        );
631
632        // Run HTTP server with the router
633        // The router implements the required handler trait for HTTP transport
634        run_server(final_config, self.router.clone())
635            .await
636            .map_err(|e| {
637                tracing::error!(error = %e, "HTTP server failed");
638                crate::ServerError::handler(e.to_string())
639            })?;
640
641        info!("HTTP server shutdown complete");
642        Ok(())
643    }
644
645    /// Run server with WebSocket transport (full bidirectional support)
646    ///
647    /// This provides a simple API for WebSocket servers with sensible defaults:
648    /// - Default endpoint: `/mcp/ws`
649    /// - Full MCP 2025-06-18 compliance
650    /// - Bidirectional communication
651    /// - Elicitation support
652    /// - Session management and middleware
653    ///
654    /// For custom configuration, use `run_websocket_with_config()`.
655    ///
656    /// # Example
657    ///
658    /// ```no_run
659    /// use turbomcp_server::ServerBuilder;
660    ///
661    /// #[tokio::main]
662    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
663    ///     let server = ServerBuilder::new()
664    ///         .name("ws-server")
665    ///         .version("1.0.0")
666    ///         .build();
667    ///
668    ///     server.run_websocket("127.0.0.1:8080").await?;
669    ///     Ok(())
670    /// }
671    /// ```
672    #[cfg(all(feature = "websocket", feature = "http"))]
673    #[tracing::instrument(skip(self), fields(
674        transport = "websocket",
675        service_name = %self.config.name,
676        service_version = %self.config.version,
677        addr = ?addr
678    ))]
679    pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
680        self,
681        addr: A,
682    ) -> ServerResult<()> {
683        use turbomcp_transport::websocket_server::WebSocketServerConfig;
684
685        // Build default configuration
686        let config = WebSocketServerConfig::default();
687
688        self.run_websocket_with_config(addr, config).await
689    }
690
691    /// Run server with WebSocket transport and custom configuration
692    ///
693    /// This provides full control over WebSocket server configuration including:
694    /// - Custom endpoint path
695    /// - MCP server settings (middleware, security, etc.)
696    ///
697    /// # Example
698    ///
699    /// ```no_run
700    /// use turbomcp_server::ServerBuilder;
701    /// use turbomcp_transport::websocket_server::WebSocketServerConfig;
702    ///
703    /// #[tokio::main]
704    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
705    ///     let server = ServerBuilder::new()
706    ///         .name("custom-ws-server")
707    ///         .version("1.0.0")
708    ///         .build();
709    ///
710    ///     let config = WebSocketServerConfig {
711    ///         bind_addr: "0.0.0.0:8080".to_string(),
712    ///         endpoint_path: "/custom/ws".to_string(),
713    ///     };
714    ///
715    ///     server.run_websocket_with_config("127.0.0.1:8080", config).await?;
716    ///     Ok(())
717    /// }
718    /// ```
719    #[cfg(all(feature = "websocket", feature = "http"))]
720    #[tracing::instrument(skip(self, config), fields(
721        transport = "websocket",
722        service_name = %self.config.name,
723        service_version = %self.config.version,
724        addr = ?addr
725    ))]
726    pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
727        self,
728        addr: A,
729        config: turbomcp_transport::websocket_server::WebSocketServerConfig,
730    ) -> ServerResult<()> {
731        use turbomcp_transport::websocket_server::run_websocket_server_with_config;
732
733        info!("Starting MCP server with WebSocket transport");
734        info!(config = ?config, "WebSocket configuration");
735
736        // Resolve address to string
737        let socket_addr = addr
738            .to_socket_addrs()
739            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
740            .next()
741            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
742
743        info!("Resolved address: {}", socket_addr);
744
745        // Use provided config but override bind_addr with resolved address
746        let mut final_config = config;
747        final_config.bind_addr = socket_addr.to_string();
748
749        // Run WebSocket server with the router
750        run_websocket_server_with_config(final_config, self.router.clone())
751            .await
752            .map_err(|e| {
753                tracing::error!(error = %e, "WebSocket server failed");
754                crate::ServerError::handler(e.to_string())
755            })?;
756
757        info!("WebSocket server shutdown complete");
758        Ok(())
759    }
760
761    /// Run server with TCP transport (progressive enhancement - runtime configuration)
762    #[cfg(feature = "tcp")]
763    #[tracing::instrument(skip(self), fields(
764        transport = "tcp",
765        service_name = %self.config.name,
766        service_version = %self.config.version,
767        addr = ?addr
768    ))]
769    pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
770        self,
771        addr: A,
772    ) -> ServerResult<()> {
773        use turbomcp_transport::TcpTransport;
774
775        // Start performance monitoring for TCP server
776        let _perf_span = info_span!("server.run", transport = "tcp").entered();
777        info!(?addr, "Starting MCP server with TCP transport");
778
779        self.lifecycle.start().await;
780
781        // Convert ToSocketAddrs to SocketAddr
782        let socket_addr = match addr.to_socket_addrs() {
783            Ok(mut addrs) => match addrs.next() {
784                Some(addr) => addr,
785                None => {
786                    tracing::error!("No socket address resolved from provided address");
787                    self.lifecycle.shutdown().await;
788                    return Err(crate::ServerError::configuration("Invalid socket address"));
789                }
790            },
791            Err(e) => {
792                tracing::error!(error = %e, "Failed to resolve socket address");
793                self.lifecycle.shutdown().await;
794                return Err(crate::ServerError::configuration(format!(
795                    "Address resolution failed: {e}"
796                )));
797            }
798        };
799
800        let transport = TcpTransport::new_server(socket_addr);
801        if let Err(e) = transport.connect().await {
802            tracing::error!(error = %e, "Failed to connect TCP transport");
803            self.lifecycle.shutdown().await;
804            return Err(e.into());
805        }
806
807        self.run_with_transport(transport).await
808    }
809
810    /// Run server with Unix socket transport (progressive enhancement - runtime configuration)
811    #[cfg(all(feature = "unix", unix))]
812    #[tracing::instrument(skip(self), fields(
813        transport = "unix",
814        service_name = %self.config.name,
815        service_version = %self.config.version,
816        path = ?path.as_ref()
817    ))]
818    pub async fn run_unix<P: AsRef<std::path::Path>>(self, path: P) -> ServerResult<()> {
819        use std::path::PathBuf;
820        use turbomcp_transport::UnixTransport;
821
822        // Start performance monitoring for Unix server
823        let _perf_span = info_span!("server.run", transport = "unix").entered();
824        info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");
825
826        self.lifecycle.start().await;
827
828        let socket_path = PathBuf::from(path.as_ref());
829        let transport = UnixTransport::new_server(socket_path);
830        if let Err(e) = transport.connect().await {
831            tracing::error!(error = %e, "Failed to connect Unix socket transport");
832            self.lifecycle.shutdown().await;
833            return Err(e.into());
834        }
835
836        self.run_with_transport(transport).await
837    }
838
839    /// Generic transport runner (DRY principle)
840    /// Used by feature-gated transport methods (http, tcp, websocket, unix)
841    #[allow(dead_code)]
842    #[tracing::instrument(skip(self, transport), fields(
843        service_name = %self.config.name,
844        service_version = %self.config.version
845    ))]
846    async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
847        // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
848        let lifecycle_for_sigint = self.lifecycle.clone();
849        tokio::spawn(async move {
850            if let Err(e) = tokio::signal::ctrl_c().await {
851                tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
852                return;
853            }
854            tracing::info!("Ctrl+C received, initiating shutdown");
855            lifecycle_for_sigint.shutdown().await;
856        });
857
858        #[cfg(unix)]
859        {
860            let lifecycle_for_sigterm = self.lifecycle.clone();
861            tokio::spawn(async move {
862                use tokio::signal::unix::{SignalKind, signal};
863                match signal(SignalKind::terminate()) {
864                    Ok(mut sigterm) => {
865                        sigterm.recv().await;
866                        tracing::info!("SIGTERM received, initiating shutdown");
867                        lifecycle_for_sigterm.shutdown().await;
868                    }
869                    Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
870                }
871            });
872        }
873
874        // Shutdown signal
875        let mut shutdown = self.lifecycle.shutdown_signal();
876
877        // Main message processing loop
878        loop {
879            tokio::select! {
880                _ = shutdown.recv() => {
881                    tracing::info!("Shutdown signal received");
882                    break;
883                }
884                res = transport.receive() => {
885                    match res {
886                        Ok(Some(message)) => {
887                            if let Err(e) = self.handle_transport_message(&mut transport, message).await {
888                                tracing::warn!(error = %e, "Failed to handle transport message");
889                            }
890                        }
891                        Ok(None) => {
892                            // No message available; sleep briefly to avoid busy loop
893                            sleep(Duration::from_millis(5)).await;
894                        }
895                        Err(e) => {
896                            match e {
897                                TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
898                                    tracing::info!("Transport receive channel disconnected; shutting down");
899                                    break;
900                                }
901                                _ => {
902                                    tracing::error!(error = %e, "Transport receive failed");
903                                    // Backoff on errors
904                                    sleep(Duration::from_millis(50)).await;
905                                }
906                            }
907                        }
908                    }
909                }
910            }
911        }
912
913        // Disconnect transport
914        if let Err(e) = transport.disconnect().await {
915            tracing::warn!(error = %e, "Error while disconnecting transport");
916        }
917
918        tracing::info!("Server shutdown complete");
919        Ok(())
920    }
921
922    /// STDIO-aware transport runner that respects MCP protocol logging requirements
923    #[tracing::instrument(skip(self, transport), fields(
924        service_name = %self.config.name,
925        service_version = %self.config.version,
926        transport = "stdio"
927    ))]
928    async fn run_with_transport_stdio_aware<T: Transport>(
929        &self,
930        mut transport: T,
931    ) -> ServerResult<()> {
932        // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
933        let lifecycle_for_sigint = self.lifecycle.clone();
934        tokio::spawn(async move {
935            if let Err(e) = tokio::signal::ctrl_c().await {
936                if should_log_for_stdio() {
937                    tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
938                }
939                return;
940            }
941            if should_log_for_stdio() {
942                tracing::info!("Ctrl+C received, initiating shutdown");
943            }
944            lifecycle_for_sigint.shutdown().await;
945        });
946
947        #[cfg(unix)]
948        {
949            let lifecycle_for_sigterm = self.lifecycle.clone();
950            tokio::spawn(async move {
951                use tokio::signal::unix::{SignalKind, signal};
952                match signal(SignalKind::terminate()) {
953                    Ok(mut sigterm) => {
954                        sigterm.recv().await;
955                        if should_log_for_stdio() {
956                            tracing::info!("SIGTERM received, initiating shutdown");
957                        }
958                        lifecycle_for_sigterm.shutdown().await;
959                    }
960                    Err(e) => {
961                        if should_log_for_stdio() {
962                            tracing::warn!(error = %e, "Failed to install SIGTERM handler");
963                        }
964                    }
965                }
966            });
967        }
968
969        // Shutdown signal
970        let mut shutdown = self.lifecycle.shutdown_signal();
971
972        // Main message processing loop
973        loop {
974            tokio::select! {
975                _ = shutdown.recv() => {
976                    if should_log_for_stdio() {
977                        tracing::info!("Shutdown signal received");
978                    }
979                    break;
980                }
981                res = transport.receive() => {
982                    match res {
983                        Ok(Some(message)) => {
984                            if let Err(e) = self.handle_transport_message_stdio_aware(&mut transport, message).await
985                                && should_log_for_stdio() {
986                                    tracing::warn!(error = %e, "Failed to handle transport message");
987                                }
988                        }
989                        Ok(None) => {
990                            // No message available; sleep briefly to avoid busy loop
991                            sleep(Duration::from_millis(5)).await;
992                        }
993                        Err(e) => {
994                            match e {
995                                TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
996                                    if should_log_for_stdio() {
997                                        tracing::info!("Transport receive channel disconnected; shutting down");
998                                    }
999                                    break;
1000                                }
1001                                _ => {
1002                                    if should_log_for_stdio() {
1003                                        tracing::error!(error = %e, "Transport receive failed");
1004                                    }
1005                                    // Backoff on errors
1006                                    sleep(Duration::from_millis(50)).await;
1007                                }
1008                            }
1009                        }
1010                    }
1011                }
1012            }
1013        }
1014
1015        // Disconnect transport
1016        if let Err(e) = transport.disconnect().await
1017            && should_log_for_stdio()
1018        {
1019            tracing::warn!(error = %e, "Error while disconnecting transport");
1020        }
1021
1022        if should_log_for_stdio() {
1023            tracing::info!("Server shutdown complete");
1024        }
1025        Ok(())
1026    }
1027}
1028
1029// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1030// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1031// This is intentional and follows the Axum/Tower design pattern
1032#[allow(dead_code)]
1033const _: () = {
1034    const fn assert_send_clone<T: Send + Clone>() {}
1035    const fn check() {
1036        assert_send_clone::<crate::server::core::McpServer>();
1037    }
1038};