turbomcp_server/server/
core.rs

1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9#[cfg(feature = "http")]
10use tracing::warn;
11
12use crate::{
13    config::ServerConfig,
14    error::ServerResult,
15    lifecycle::{HealthStatus, ServerLifecycle},
16    metrics::ServerMetrics,
17    registry::HandlerRegistry,
18    routing::RequestRouter,
19    service::McpService,
20};
21
22#[cfg(feature = "middleware")]
23use crate::middleware::MiddlewareStack;
24
25use bytes::Bytes;
26use http::{Request, Response};
27use tokio::time::{Duration, sleep};
28use turbomcp_transport::Transport;
29use turbomcp_transport::core::TransportError;
30
31use super::shutdown::ShutdownHandle;
32
33/// Check if logging should be enabled for STDIO transport
34///
35/// For MCP STDIO transport compliance, logging is disabled by default since stdout
36/// must be reserved exclusively for JSON-RPC messages. This can be overridden by
37/// setting the TURBOMCP_FORCE_LOGGING environment variable.
38pub(crate) fn should_log_for_stdio() -> bool {
39    std::env::var("TURBOMCP_FORCE_LOGGING").is_ok()
40}
41
42/// Main MCP server following the Axum/Tower Clone pattern
43///
44/// ## Sharing Pattern
45///
46/// `McpServer` implements `Clone` like Axum's `Router`. All heavy state is Arc-wrapped
47/// internally, making cloning cheap (just atomic reference count increments).
48///
49/// ```rust,no_run
50/// use turbomcp_server::ServerBuilder;
51///
52/// # async fn example() {
53/// let server = ServerBuilder::new().build();
54///
55/// // Clone for passing to functions (cheap - just Arc increments)
56/// let server1 = server.clone();
57/// let server2 = server.clone();
58///
59/// // Access config and health
60/// let config = server1.config();
61/// println!("Server: {}", config.name);
62///
63/// let health = server2.health().await;
64/// println!("Health: {:?}", health);
65/// # }
66/// ```
67///
68/// ## Architecture Notes
69///
70/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
71/// This is intentional and follows Tower's design - users clone the server instead of
72/// Arc-wrapping it.
73///
74/// **Architecture Note**: The service field provides tower::Service integration for
75/// advanced middleware patterns. The request processing pipeline currently uses the
76/// RequestRouter directly. Tower integration can be added via custom middleware layers
77/// when needed for specific use cases (e.g., custom rate limiting, advanced tracing).
78#[derive(Clone)]
79pub struct McpServer {
80    /// Server configuration (Clone-able)
81    pub(crate) config: ServerConfig,
82    /// Handler registry (Arc-wrapped for cheap cloning)
83    pub(crate) registry: Arc<HandlerRegistry>,
84    /// Request router (Arc-wrapped for cheap cloning)
85    pub(crate) router: Arc<RequestRouter>,
86    /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
87    ///
88    /// All requests flow through this service stack, which provides:
89    /// - Timeout enforcement
90    /// - Request validation
91    /// - Authorization checks
92    /// - Rate limiting
93    /// - Audit logging
94    /// - And more middleware layers as configured
95    ///
96    /// See `server/transport.rs` for integration with transport layer.
97    pub(crate) service:
98        tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
99    /// Server lifecycle (Arc-wrapped for cheap cloning)
100    pub(crate) lifecycle: Arc<ServerLifecycle>,
101    /// Server metrics (Arc-wrapped for cheap cloning)
102    pub(crate) metrics: Arc<ServerMetrics>,
103}
104
105impl std::fmt::Debug for McpServer {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        f.debug_struct("McpServer")
108            .field("config", &self.config)
109            .finish()
110    }
111}
112
113impl McpServer {
114    /// Build comprehensive Tower middleware stack (transport-agnostic)
115    ///
116    /// ## Architecture
117    ///
118    /// This creates a complete Tower service stack with conditional middleware layers:
119    /// - **Timeout Layer**: Request timeout enforcement (tower_http)
120    /// - **Validation Layer**: JSON-RPC structure validation
121    /// - **Authorization Layer**: Resource access control
122    /// - **Core Service**: JSON-RPC routing and handler execution
123    ///
124    /// All middleware is composed using Tower's ServiceBuilder pattern, which provides:
125    /// - Top-to-bottom execution order
126    /// - Type-safe layer composition
127    /// - Zero-cost abstractions
128    /// - Clone-able service instances
129    ///
130    /// ## Integration
131    ///
132    /// The resulting BoxCloneService is stored in `self.service` and called from
133    /// `server/transport.rs` for every incoming request. This ensures ALL requests
134    /// flow through the complete middleware pipeline before reaching handlers.
135    ///
136    /// ## Adding Middleware
137    ///
138    /// To add new middleware, update the match arms below to include your layer.
139    /// Follow the pattern of conditional inclusion based on config flags.
140    #[cfg(feature = "middleware")]
141    fn build_middleware_stack(
142        core_service: McpService,
143        stack: MiddlewareStack,
144    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
145        // COMPREHENSIVE TOWER COMPOSITION - Conditional Layer Stacking
146        //
147        // This approach builds the middleware stack incrementally, boxing at each step.
148        // While this has a small performance cost from multiple boxing operations,
149        // it provides several critical advantages:
150        //
151        // 1. **Maintainability**: No combinatorial explosion (8 match arms → simple chain)
152        // 2. **Extensibility**: Adding new middleware requires only one new block
153        // 3. **Clarity**: Each layer's purpose and configuration is explicit
154        // 4. **Type Safety**: BoxCloneService provides type erasure while preserving Clone
155        //
156        // Performance note: The boxing overhead is negligible compared to network I/O
157        // and handler execution time. Modern allocators make this essentially free.
158
159        // Start with core service as a boxed service for uniform type handling
160        let mut service: tower::util::BoxCloneService<
161            Request<Bytes>,
162            Response<Bytes>,
163            crate::ServerError,
164        > = tower::util::BoxCloneService::new(core_service);
165
166        // Authorization layer removed in 2.0.0 - handle at application layer
167
168        // Layer 2: Validation
169        // Validates request structure after auth but before processing
170        #[cfg(feature = "middleware")]
171        {
172            if let Some(validation_layer) = stack.validation_layer() {
173                service = tower::util::BoxCloneService::new(
174                    tower::ServiceBuilder::new()
175                        .layer(validation_layer)
176                        .service(service),
177                );
178            }
179        }
180
181        // Layer 3: Timeout (outermost)
182        // Applied last so it can enforce timeout on the entire request pipeline
183        #[cfg(feature = "middleware")]
184        {
185            if let Some(timeout_config) = stack.timeout_config
186                && timeout_config.enabled
187            {
188                service = tower::util::BoxCloneService::new(
189                    tower::ServiceBuilder::new()
190                        .layer(tower_http::timeout::TimeoutLayer::new(
191                            timeout_config.request_timeout,
192                        ))
193                        .service(service),
194                );
195            }
196        }
197
198        // Future middleware can be added here with similar if-let blocks:
199        // if let Some(auth_config) = stack.auth_config { ... }
200        // if let Some(audit_config) = stack.audit_config { ... }
201        // if let Some(rate_limit_config) = stack.rate_limit_config { ... }
202
203        service
204    }
205
206    /// Create a new server
207    #[must_use]
208    pub fn new(config: ServerConfig) -> Self {
209        Self::new_with_registry(config, HandlerRegistry::new())
210    }
211
212    /// Create a new server with an existing registry (used by ServerBuilder)
213    #[must_use]
214    pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
215        let registry = Arc::new(registry);
216        let metrics = Arc::new(ServerMetrics::new());
217        let router = Arc::new(RequestRouter::new(
218            Arc::clone(&registry),
219            Arc::clone(&metrics),
220            config.clone(),
221        ));
222        // Build middleware stack configuration
223        #[cfg(feature = "middleware")]
224        #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
225        let mut stack = crate::middleware::MiddlewareStack::new();
226
227        // Auto-install rate limiting if enabled in config
228        #[cfg(feature = "rate-limiting")]
229        if config.rate_limiting.enabled {
230            use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
231            use std::num::NonZeroU32;
232            use std::time::Duration;
233
234            let rate_config = crate::middleware::RateLimitConfig {
235                strategy: RateLimitStrategy::Global,
236                limits: RateLimits {
237                    requests_per_period: NonZeroU32::new(
238                        config.rate_limiting.requests_per_second * 60,
239                    )
240                    .unwrap(), // Convert per-second to per-minute
241                    period: Duration::from_secs(60),
242                    burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
243                },
244                enabled: true,
245            };
246
247            stack = stack.with_rate_limit(rate_config);
248        }
249
250        // Create core MCP service
251        let core_service = McpService::new(
252            Arc::clone(&registry),
253            Arc::clone(&router),
254            Arc::clone(&metrics),
255        );
256
257        // COMPREHENSIVE TOWER SERVICE COMPOSITION
258        // Build the complete middleware stack with proper type erasure
259        //
260        // This service is called from server/transport.rs for EVERY incoming request:
261        // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
262        //
263        // The Tower middleware stack provides:
264        // ✓ Timeout enforcement (configurable per-request)
265        // ✓ Request validation (JSON-RPC structure)
266        // ✓ Authorization checks (resource access control)
267        // ✓ Rate limiting (if enabled in config)
268        // ✓ Audit logging (configurable)
269        // ✓ And more layers as configured
270        //
271        // BoxCloneService is Clone but !Sync - this is the Tower pattern
272        #[cfg(feature = "middleware")]
273        let service = Self::build_middleware_stack(core_service, stack);
274
275        #[cfg(not(feature = "middleware"))]
276        let service = tower::util::BoxCloneService::new(core_service);
277
278        let lifecycle = Arc::new(ServerLifecycle::new());
279
280        Self {
281            config,
282            registry,
283            router,
284            service,
285            lifecycle,
286            metrics,
287        }
288    }
289
290    /// Get server configuration
291    #[must_use]
292    pub const fn config(&self) -> &ServerConfig {
293        &self.config
294    }
295
296    /// Get handler registry
297    #[must_use]
298    pub const fn registry(&self) -> &Arc<HandlerRegistry> {
299        &self.registry
300    }
301
302    /// Get request router
303    #[must_use]
304    pub const fn router(&self) -> &Arc<RequestRouter> {
305        &self.router
306    }
307
308    /// Get server lifecycle
309    #[must_use]
310    pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
311        &self.lifecycle
312    }
313
314    /// Get server metrics
315    #[must_use]
316    pub const fn metrics(&self) -> &Arc<ServerMetrics> {
317        &self.metrics
318    }
319
320    /// Get the Tower service stack (test accessor)
321    ///
322    /// **Note**: This is primarily for integration testing. Production code should
323    /// use the transport layer which calls the service internally via
324    /// `handle_transport_message()`.
325    ///
326    /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
327    /// is designed for cloning).
328    #[doc(hidden)]
329    pub fn service(
330        &self,
331    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
332        self.service.clone()
333    }
334
335    /// Get a shutdown handle for graceful server termination
336    ///
337    /// This handle enables external control over server shutdown, essential for:
338    /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
339    /// - **Container orchestration**: Kubernetes graceful pod termination
340    /// - **Load balancer integration**: Health check coordination
341    /// - **Multi-component systems**: Coordinated shutdown sequences
342    /// - **Maintenance operations**: Planned downtime and updates
343    ///
344    /// # Examples
345    ///
346    /// ## Basic shutdown coordination
347    /// ```no_run
348    /// # use turbomcp_server::ServerBuilder;
349    /// # #[tokio::main]
350    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
351    /// let server = ServerBuilder::new().build();
352    /// let shutdown_handle = server.shutdown_handle();
353    ///
354    /// // Coordinate with other services
355    /// tokio::spawn(async move {
356    ///     // Wait for external shutdown signal
357    ///     tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
358    ///     println!("Shutdown signal received, terminating gracefully...");
359    ///     shutdown_handle.shutdown().await;
360    /// });
361    ///
362    /// // Server will gracefully shut down when signaled
363    /// // server.run_stdio().await?;
364    /// # Ok(())
365    /// # }
366    /// ```
367    ///
368    /// ## Container/Kubernetes deployment
369    /// ```no_run
370    /// # use turbomcp_server::ServerBuilder;
371    /// # use std::sync::Arc;
372    /// # #[tokio::main]
373    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
374    /// let server = ServerBuilder::new().build();
375    /// let shutdown_handle = server.shutdown_handle();
376    /// let shutdown_handle_clone = shutdown_handle.clone();
377    ///
378    /// // Handle multiple signal types with proper platform support
379    /// tokio::spawn(async move {
380    ///     #[cfg(unix)]
381    ///     {
382    ///         use tokio::signal::unix::{signal, SignalKind};
383    ///         let mut sigterm = signal(SignalKind::terminate()).unwrap();
384    ///         tokio::select! {
385    ///             _ = tokio::signal::ctrl_c() => {
386    ///                 println!("SIGINT received");
387    ///             }
388    ///             _ = sigterm.recv() => {
389    ///                 println!("SIGTERM received");
390    ///             }
391    ///         }
392    ///     }
393    ///     #[cfg(not(unix))]
394    ///     {
395    ///         tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
396    ///         println!("SIGINT received");
397    ///     }
398    ///     shutdown_handle_clone.shutdown().await;
399    /// });
400    ///
401    /// // Server handles graceful shutdown automatically
402    /// // server.run_tcp("0.0.0.0:8080").await?;
403    /// # Ok(())
404    /// # }
405    /// ```
406    pub fn shutdown_handle(&self) -> ShutdownHandle {
407        ShutdownHandle::new(self.lifecycle.clone())
408    }
409
410    /// Run the server with STDIO transport
411    ///
412    /// # Errors
413    ///
414    /// Returns [`crate::ServerError::Transport`] if:
415    /// - STDIO transport connection fails
416    /// - Message sending/receiving fails
417    /// - Transport disconnection fails
418    #[tracing::instrument(skip(self), fields(
419        transport = "stdio",
420        service_name = %self.config.name,
421        service_version = %self.config.version
422    ))]
423    pub async fn run_stdio(mut self) -> ServerResult<()> {
424        // For STDIO transport, disable logging unless explicitly overridden
425        // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
426        if should_log_for_stdio() {
427            info!("Starting MCP server with STDIO transport");
428        }
429
430        // Start performance monitoring for STDIO server
431        let _perf_span = info_span!("server.run", transport = "stdio").entered();
432        info!("Initializing STDIO transport for MCP server");
433
434        self.lifecycle.start().await;
435
436        // BIDIRECTIONAL STDIO SETUP
437        // Create STDIO dispatcher for server-initiated requests (sampling, elicitation, roots, ping)
438        let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
439
440        // Use fully-qualified path to avoid ambiguity with the turbomcp crate's runtime module
441        let dispatcher = crate::runtime::StdioDispatcher::new(request_tx);
442
443        // Configure router's bidirectional support with the STDIO dispatcher
444        // SAFETY: We have &mut self, so we can safely get mutable access to the Arc'd router
445        // This is the CRITICAL STEP that was missing - without this, all server→client requests fail
446        let router = Arc::make_mut(&mut self.router);
447        router.set_server_request_dispatcher(dispatcher.clone());
448
449        // Run STDIO with full bidirectional support (MCP 2025-06-18 compliant)
450        // This uses the bidirectional-aware runtime that handles both:
451        // - Client→Server requests (tools, resources, prompts)
452        // - Server→Client requests (sampling, elicitation, roots, ping)
453        crate::runtime::run_stdio_bidirectional(self.router.clone(), dispatcher, request_rx)
454            .await
455            .map_err(|e| crate::ServerError::Handler {
456                message: format!("STDIO bidirectional runtime failed: {}", e),
457                context: Some("run_stdio".to_string()),
458            })
459    }
460
461    /// Get health status
462    pub async fn health(&self) -> HealthStatus {
463        self.lifecycle.health().await
464    }
465
466    /// Run server with HTTP transport using default configuration
467    ///
468    /// This provides a working HTTP server with:
469    /// - Standard HTTP POST/GET/DELETE for MCP protocol at `/mcp`
470    /// - Full MCP 2025-06-18 protocol compliance
471    /// - Graceful shutdown support
472    /// - Default rate limiting (100 req/60s)
473    /// - Default security settings (localhost allowed, CORS disabled)
474    ///
475    /// For custom configuration (rate limits, security, CORS), use `run_http_with_config`.
476    ///
477    /// # Examples
478    ///
479    /// ## Basic usage with default configuration
480    /// ```no_run
481    /// use turbomcp_server::ServerBuilder;
482    ///
483    /// #[tokio::main]
484    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
485    ///     let server = ServerBuilder::new()
486    ///         .name("my-server")
487    ///         .version("1.0.0")
488    ///         .build();
489    ///
490    ///     server.run_http("127.0.0.1:3000").await?;
491    ///     Ok(())
492    /// }
493    /// ```
494    ///
495    /// ## With custom configuration
496    /// ```no_run
497    /// use turbomcp_server::ServerBuilder;
498    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
499    /// use std::time::Duration;
500    ///
501    /// #[tokio::main]
502    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
503    ///     let server = ServerBuilder::new()
504    ///         .name("my-server")
505    ///         .version("1.0.0")
506    ///         .build();
507    ///
508    ///     let config = StreamableHttpConfigBuilder::new()
509    ///         .without_rate_limit()  // For benchmarking
510    ///         .allow_any_origin(true)  // Enable CORS
511    ///         .build();
512    ///
513    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
514    ///     Ok(())
515    /// }
516    /// ```
517    ///
518    /// # Errors
519    ///
520    /// Returns [`crate::ServerError::Transport`] if:
521    /// - Address resolution fails
522    /// - HTTP server fails to start
523    /// - Transport disconnection fails
524    #[cfg(feature = "http")]
525    #[tracing::instrument(skip(self), fields(
526        transport = "http",
527        service_name = %self.config.name,
528        service_version = %self.config.version,
529        addr = ?addr
530    ))]
531    pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
532        self,
533        addr: A,
534    ) -> ServerResult<()> {
535        use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
536
537        // Build default configuration
538        let config = StreamableHttpConfigBuilder::new().build();
539
540        self.run_http_with_config(addr, config).await
541    }
542
543    /// Run server with HTTP transport and custom configuration
544    ///
545    /// This provides full control over HTTP server configuration including:
546    /// - Rate limiting (requests per time window, or disabled entirely)
547    /// - Security settings (CORS, origin validation, authentication)
548    /// - Network settings (bind address, endpoint path, keep-alive)
549    /// - Advanced settings (replay buffer size, etc.)
550    ///
551    /// # Bind Address Configuration
552    ///
553    /// **IMPORTANT**: The `addr` parameter takes precedence over `config.bind_addr`.
554    /// If they differ, a deprecation warning is logged.
555    ///
556    /// **Best Practice** (recommended for forward compatibility):
557    /// ```no_run
558    /// # use turbomcp_server::ServerBuilder;
559    /// # use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
560    /// # #[tokio::main]
561    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
562    /// let config = StreamableHttpConfigBuilder::new()
563    ///     .with_bind_address("127.0.0.1:3001")  // Set bind address in config
564    ///     .build();
565    ///
566    /// // Pass matching addr parameter (or use default "127.0.0.1:8080")
567    /// ServerBuilder::new()
568    ///     .build()
569    ///     .run_http_with_config("127.0.0.1:3001", config).await?;
570    /// # Ok(())
571    /// # }
572    /// ```
573    ///
574    /// **Deprecated** (will be removed in v3.x):
575    /// ```no_run
576    /// # use turbomcp_server::ServerBuilder;
577    /// # use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
578    /// # #[tokio::main]
579    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
580    /// // ⚠️ Avoid setting different addresses - causes deprecation warning
581    /// let config = StreamableHttpConfigBuilder::new()
582    ///     .with_bind_address("0.0.0.0:5000")  // This is ignored!
583    ///     .build();
584    ///
585    /// // The addr parameter wins (3001 is used, not 5000)
586    /// ServerBuilder::new()
587    ///     .build()
588    ///     .run_http_with_config("127.0.0.1:3001", config).await?;
589    /// # Ok(())
590    /// # }
591    /// ```
592    ///
593    /// **Future (v3.x)**: The `addr` parameter will be removed. Configure bind address
594    /// via `config.with_bind_address()` only.
595    ///
596    /// # Examples
597    ///
598    /// ## Benchmarking configuration (no rate limits)
599    /// ```no_run
600    /// use turbomcp_server::ServerBuilder;
601    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
602    ///
603    /// #[tokio::main]
604    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
605    ///     let server = ServerBuilder::new()
606    ///         .name("benchmark-server")
607    ///         .version("1.0.0")
608    ///         .build();
609    ///
610    ///     let config = StreamableHttpConfigBuilder::new()
611    ///         .with_bind_address("127.0.0.1:3000")
612    ///         .without_rate_limit()  // Disable rate limiting
613    ///         .build();
614    ///
615    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
616    ///     Ok(())
617    /// }
618    /// ```
619    ///
620    /// ## Production configuration (secure, rate limited)
621    /// ```no_run
622    /// use turbomcp_server::ServerBuilder;
623    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
624    /// use std::time::Duration;
625    ///
626    /// #[tokio::main]
627    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
628    ///     let server = ServerBuilder::new()
629    ///         .name("production-server")
630    ///         .version("1.0.0")
631    ///         .build();
632    ///
633    ///     let config = StreamableHttpConfigBuilder::new()
634    ///         .with_bind_address("127.0.0.1:3000")
635    ///         .with_rate_limit(1000, Duration::from_secs(60))  // 1000 req/min
636    ///         .allow_any_origin(false)  // Strict CORS
637    ///         .require_authentication(true)  // Require auth
638    ///         .build();
639    ///
640    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
641    ///     Ok(())
642    /// }
643    /// ```
644    ///
645    /// # Errors
646    ///
647    /// Returns [`crate::ServerError::Transport`] if:
648    /// - Address resolution fails
649    /// - HTTP server fails to start
650    /// - Transport disconnection fails
651    #[cfg(feature = "http")]
652    #[tracing::instrument(skip(self, config), fields(
653        transport = "http",
654        service_name = %self.config.name,
655        service_version = %self.config.version,
656        addr = ?addr
657    ))]
658    pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
659        self,
660        addr: A,
661        mut config: turbomcp_transport::streamable_http_v2::StreamableHttpConfig,
662    ) -> ServerResult<()> {
663        use std::collections::HashMap;
664        use tokio::sync::{Mutex, RwLock};
665
666        info!("Starting MCP server with HTTP transport");
667
668        self.lifecycle.start().await;
669
670        // Resolve address to string
671        let socket_addr = addr
672            .to_socket_addrs()
673            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
674            .next()
675            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
676
677        info!("Resolved address: {}", socket_addr);
678
679        // Check for conflicting bind addresses and warn about deprecation
680        let socket_addr_str = socket_addr.to_string();
681        if config.bind_addr != socket_addr_str {
682            warn!(
683                addr_parameter = %socket_addr_str,
684                config_bind_addr = %config.bind_addr,
685                "⚠️  DEPRECATION WARNING: The `addr` parameter takes precedence over `config.bind_addr`"
686            );
687            warn!(
688                "⚠️  In TurboMCP v3.x, the `addr` parameter will be removed. Please use StreamableHttpConfigBuilder::new().with_bind_address(\"{}\").build() instead",
689                socket_addr_str
690            );
691            warn!(
692                "⚠️  Avoid setting both `addr` parameter and `config.bind_addr` to prevent confusion"
693            );
694
695            // Update config to single source of truth
696            config.bind_addr = socket_addr_str.clone();
697            config.base_url = format!("http://{}", socket_addr_str);
698        }
699
700        info!(
701            config = ?config,
702            "HTTP configuration (updated with resolved address)"
703        );
704
705        // BIDIRECTIONAL HTTP SETUP
706        // Create shared state for session management and bidirectional MCP
707        let sessions = Arc::new(RwLock::new(HashMap::new()));
708        let pending_requests = Arc::new(Mutex::new(HashMap::new()));
709
710        // Share router across all sessions (routing logic and handler registry)
711        let router = self.router.clone();
712
713        // Capture server identity for MCP protocol compliance
714        let server_info = turbomcp_protocol::ServerInfo {
715            name: self.config.name.clone(),
716            version: self.config.version.clone(),
717        };
718
719        // Factory pattern: create session-specific router for each HTTP request
720        // This is the clean architecture that HTTP requires - each session gets its own
721        // bidirectional dispatcher while sharing the routing logic
722        let sessions_for_factory = Arc::clone(&sessions);
723        let pending_for_factory = Arc::clone(&pending_requests);
724        let router_for_factory = Arc::clone(&router);
725
726        let handler_factory = move |session_id: Option<String>| {
727            let session_id = session_id.unwrap_or_else(|| {
728                let new_id = uuid::Uuid::new_v4().to_string();
729                tracing::warn!(
730                    "⚠️ Factory generating random session ID (no session ID provided): {}",
731                    new_id
732                );
733                new_id
734            });
735
736            tracing::debug!("Factory creating handler for session: {}", session_id);
737
738            // Create session-specific HTTP dispatcher (now local to turbomcp-server!)
739            let dispatcher = crate::runtime::http::HttpDispatcher::new(
740                session_id,
741                Arc::clone(&sessions_for_factory),
742                Arc::clone(&pending_for_factory),
743            );
744
745            // Clone the base router and configure with session-specific dispatcher
746            // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
747            let mut session_router = (*router_for_factory).clone();
748            session_router.set_server_request_dispatcher(dispatcher);
749
750            session_router
751        };
752
753        info!(
754            server_name = %server_info.name,
755            server_version = %server_info.version,
756            bind_addr = %socket_addr,
757            endpoint_path = %config.endpoint_path,
758            "HTTP server starting with full bidirectional support (elicitation, sampling, roots, ping)"
759        );
760
761        // Use factory-based HTTP server with full bidirectional support
762        use crate::runtime::http::run_http;
763        run_http(
764            handler_factory,
765            sessions,
766            pending_requests,
767            socket_addr.to_string(),
768            config.endpoint_path.clone(),
769        )
770        .await
771        .map_err(|e| {
772            tracing::error!(error = %e, "HTTP server failed");
773            crate::ServerError::handler(e.to_string())
774        })?;
775
776        info!("HTTP server shutdown complete");
777        Ok(())
778    }
779
780    /// Run server with WebSocket transport (full bidirectional support)
781    ///
782    /// This provides a simple API for WebSocket servers with sensible defaults:
783    /// - Default endpoint: `/mcp/ws`
784    /// - Full MCP 2025-06-18 compliance
785    /// - Bidirectional communication
786    /// - Elicitation support
787    /// - Session management and middleware
788    ///
789    /// For custom configuration, use `run_websocket_with_config()`.
790    ///
791    /// # Example
792    ///
793    /// ```no_run
794    /// use turbomcp_server::ServerBuilder;
795    ///
796    /// #[tokio::main]
797    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
798    ///     let server = ServerBuilder::new()
799    ///         .name("ws-server")
800    ///         .version("1.0.0")
801    ///         .build();
802    ///
803    ///     server.run_websocket("127.0.0.1:8080").await?;
804    ///     Ok(())
805    /// }
806    /// ```
807    #[cfg(feature = "websocket")]
808    #[tracing::instrument(skip(self), fields(
809        transport = "websocket",
810        service_name = %self.config.name,
811        service_version = %self.config.version,
812        addr = ?addr
813    ))]
814    pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
815        self,
816        addr: A,
817    ) -> ServerResult<()> {
818        use crate::runtime::websocket::WebSocketServerConfig;
819
820        // Build default configuration
821        let config = WebSocketServerConfig::default();
822
823        self.run_websocket_with_config(addr, config).await
824    }
825
826    /// Run server with WebSocket transport and custom configuration
827    ///
828    /// This provides full control over WebSocket server configuration including:
829    /// - Custom endpoint path
830    /// - MCP server settings (middleware, security, etc.)
831    ///
832    /// # Example
833    ///
834    /// ```no_run
835    /// use turbomcp_server::{ServerBuilder, WebSocketServerConfig};
836    ///
837    /// #[tokio::main]
838    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
839    ///     let server = ServerBuilder::new()
840    ///         .name("custom-ws-server")
841    ///         .version("1.0.0")
842    ///         .build();
843    ///
844    ///     let config = WebSocketServerConfig {
845    ///         bind_addr: "0.0.0.0:8080".to_string(),
846    ///         endpoint_path: "/custom/ws".to_string(),
847    ///     };
848    ///
849    ///     server.run_websocket_with_config("127.0.0.1:8080", config).await?;
850    ///     Ok(())
851    /// }
852    /// ```
853    #[cfg(feature = "websocket")]
854    #[tracing::instrument(skip(self, config), fields(
855        transport = "websocket",
856        service_name = %self.config.name,
857        service_version = %self.config.version,
858        addr = ?addr
859    ))]
860    pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
861        self,
862        addr: A,
863        config: crate::runtime::websocket::WebSocketServerConfig,
864    ) -> ServerResult<()> {
865        info!("Starting MCP server with WebSocket transport");
866        info!(config = ?config, "WebSocket configuration");
867
868        self.lifecycle.start().await;
869
870        // Resolve address to string
871        let socket_addr = addr
872            .to_socket_addrs()
873            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
874            .next()
875            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
876
877        info!("Resolved address: {}", socket_addr);
878
879        // Capture server identity for MCP protocol compliance
880        let server_info = turbomcp_protocol::ServerInfo {
881            name: self.config.name.clone(),
882            version: self.config.version.clone(),
883        };
884
885        // Router for this server (shared across all connections)
886        let router = (*self.router).clone();
887
888        // Wrapper factory: configure router with per-connection dispatcher
889        // This is the same clean architecture as HTTP - each connection gets its own
890        // bidirectional dispatcher while sharing the routing logic
891        let wrapper_factory =
892            move |mut base_router: crate::routing::RequestRouter,
893                  dispatcher: crate::runtime::websocket::WebSocketServerDispatcher| {
894                // Clone the router for this connection and configure with dispatcher
895                // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
896                base_router.set_server_request_dispatcher(dispatcher);
897                base_router
898            };
899
900        info!(
901            server_name = %server_info.name,
902            server_version = %server_info.version,
903            bind_addr = %socket_addr,
904            endpoint_path = %config.endpoint_path,
905            "WebSocket server starting with full bidirectional support (elicitation, sampling, roots, ping)"
906        );
907
908        // Use factory-based WebSocket server with full bidirectional support
909        use crate::runtime::websocket::run_websocket;
910
911        // Update config with resolved bind address
912        let ws_config = crate::runtime::websocket::WebSocketServerConfig {
913            bind_addr: socket_addr.to_string(),
914            endpoint_path: config.endpoint_path.clone(),
915            max_concurrent_requests: config.max_concurrent_requests,
916        };
917
918        run_websocket(router, wrapper_factory, ws_config)
919            .await
920            .map_err(|e| {
921                tracing::error!(error = %e, "WebSocket server failed");
922                crate::ServerError::handler(e.to_string())
923            })?;
924
925        info!("WebSocket server shutdown complete");
926        Ok(())
927    }
928
929    /// Run server with TCP transport (progressive enhancement - runtime configuration)
930    #[cfg(feature = "tcp")]
931    #[tracing::instrument(skip(self), fields(
932        transport = "tcp",
933        service_name = %self.config.name,
934        service_version = %self.config.version,
935        addr = ?addr
936    ))]
937    pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
938        mut self,
939        addr: A,
940    ) -> ServerResult<()> {
941        use turbomcp_transport::TcpTransport;
942
943        // Start performance monitoring for TCP server
944        let _perf_span = info_span!("server.run", transport = "tcp").entered();
945        info!(?addr, "Starting MCP server with TCP transport");
946
947        self.lifecycle.start().await;
948
949        // Convert ToSocketAddrs to SocketAddr
950        let socket_addr = match addr.to_socket_addrs() {
951            Ok(mut addrs) => match addrs.next() {
952                Some(addr) => addr,
953                None => {
954                    tracing::error!("No socket address resolved from provided address");
955                    self.lifecycle.shutdown().await;
956                    return Err(crate::ServerError::configuration("Invalid socket address"));
957                }
958            },
959            Err(e) => {
960                tracing::error!(error = %e, "Failed to resolve socket address");
961                self.lifecycle.shutdown().await;
962                return Err(crate::ServerError::configuration(format!(
963                    "Address resolution failed: {e}"
964                )));
965            }
966        };
967
968        let transport = TcpTransport::new_server(socket_addr);
969        if let Err(e) = transport.connect().await {
970            tracing::error!(error = %e, "Failed to connect TCP transport");
971            self.lifecycle.shutdown().await;
972            return Err(e.into());
973        }
974
975        // BIDIRECTIONAL TCP SETUP
976        // Create generic transport dispatcher for server-initiated requests
977        let dispatcher = crate::runtime::TransportDispatcher::new(transport);
978
979        // Configure router's bidirectional support with the TCP dispatcher
980        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
981        let router = Arc::make_mut(&mut self.router);
982        router.set_server_request_dispatcher(dispatcher.clone());
983
984        // Run TCP with full bidirectional support (MCP 2025-06-18 compliant)
985        // This uses the generic bidirectional runtime that handles both:
986        // - Client→Server requests (tools, resources, prompts)
987        // - Server→Client requests (sampling, elicitation, roots, ping)
988        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
989            .await
990            .map_err(|e| crate::ServerError::Handler {
991                message: format!("TCP bidirectional runtime failed: {}", e),
992                context: Some("run_tcp".to_string()),
993            })
994    }
995
996    /// Run server with Unix socket transport (progressive enhancement - runtime configuration)
997    #[cfg(all(feature = "unix", unix))]
998    #[tracing::instrument(skip(self), fields(
999        transport = "unix",
1000        service_name = %self.config.name,
1001        service_version = %self.config.version,
1002        path = ?path.as_ref()
1003    ))]
1004    pub async fn run_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> ServerResult<()> {
1005        use std::path::PathBuf;
1006        use turbomcp_transport::UnixTransport;
1007
1008        // Start performance monitoring for Unix server
1009        let _perf_span = info_span!("server.run", transport = "unix").entered();
1010        info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");
1011
1012        self.lifecycle.start().await;
1013
1014        let socket_path = PathBuf::from(path.as_ref());
1015        let transport = UnixTransport::new_server(socket_path);
1016        if let Err(e) = transport.connect().await {
1017            tracing::error!(error = %e, "Failed to connect Unix socket transport");
1018            self.lifecycle.shutdown().await;
1019            return Err(e.into());
1020        }
1021
1022        // BIDIRECTIONAL UNIX SOCKET SETUP
1023        // Create generic transport dispatcher for server-initiated requests
1024        let dispatcher = crate::runtime::TransportDispatcher::new(transport);
1025
1026        // Configure router's bidirectional support with the Unix socket dispatcher
1027        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
1028        let router = Arc::make_mut(&mut self.router);
1029        router.set_server_request_dispatcher(dispatcher.clone());
1030
1031        // Run Unix Socket with full bidirectional support (MCP 2025-06-18 compliant)
1032        // This uses the generic bidirectional runtime that handles both:
1033        // - Client→Server requests (tools, resources, prompts)
1034        // - Server→Client requests (sampling, elicitation, roots, ping)
1035        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
1036            .await
1037            .map_err(|e| crate::ServerError::Handler {
1038                message: format!("Unix socket bidirectional runtime failed: {}", e),
1039                context: Some("run_unix".to_string()),
1040            })
1041    }
1042
1043    /// Generic transport runner (DRY principle)
1044    /// Used by feature-gated transport methods (http, tcp, websocket, unix)
1045    #[allow(dead_code)]
1046    #[tracing::instrument(skip(self, transport), fields(
1047        service_name = %self.config.name,
1048        service_version = %self.config.version
1049    ))]
1050    async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
1051        // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
1052        let lifecycle_for_sigint = self.lifecycle.clone();
1053        tokio::spawn(async move {
1054            if let Err(e) = tokio::signal::ctrl_c().await {
1055                tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
1056                return;
1057            }
1058            tracing::info!("Ctrl+C received, initiating shutdown");
1059            lifecycle_for_sigint.shutdown().await;
1060        });
1061
1062        #[cfg(unix)]
1063        {
1064            let lifecycle_for_sigterm = self.lifecycle.clone();
1065            tokio::spawn(async move {
1066                use tokio::signal::unix::{SignalKind, signal};
1067                match signal(SignalKind::terminate()) {
1068                    Ok(mut sigterm) => {
1069                        sigterm.recv().await;
1070                        tracing::info!("SIGTERM received, initiating shutdown");
1071                        lifecycle_for_sigterm.shutdown().await;
1072                    }
1073                    Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
1074                }
1075            });
1076        }
1077
1078        // Shutdown signal
1079        let mut shutdown = self.lifecycle.shutdown_signal();
1080
1081        // Main message processing loop
1082        loop {
1083            tokio::select! {
1084                _ = shutdown.recv() => {
1085                    tracing::info!("Shutdown signal received");
1086                    break;
1087                }
1088                res = transport.receive() => {
1089                    match res {
1090                        Ok(Some(message)) => {
1091                            if let Err(e) = self.handle_transport_message(&mut transport, message).await {
1092                                tracing::warn!(error = %e, "Failed to handle transport message");
1093                            }
1094                        }
1095                        Ok(None) => {
1096                            // No message available; sleep briefly to avoid busy loop
1097                            sleep(Duration::from_millis(5)).await;
1098                        }
1099                        Err(e) => {
1100                            match e {
1101                                TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
1102                                    tracing::info!("Transport receive channel disconnected; shutting down");
1103                                    break;
1104                                }
1105                                _ => {
1106                                    tracing::error!(error = %e, "Transport receive failed");
1107                                    // Backoff on errors
1108                                    sleep(Duration::from_millis(50)).await;
1109                                }
1110                            }
1111                        }
1112                    }
1113                }
1114            }
1115        }
1116
1117        // Disconnect transport
1118        if let Err(e) = transport.disconnect().await {
1119            tracing::warn!(error = %e, "Error while disconnecting transport");
1120        }
1121
1122        tracing::info!("Server shutdown complete");
1123        Ok(())
1124    }
1125}
1126
1127// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1128// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1129// This is intentional and follows the Axum/Tower design pattern
1130#[allow(dead_code)]
1131const _: () = {
1132    const fn assert_send_clone<T: Send + Clone>() {}
1133    const fn check() {
1134        assert_send_clone::<crate::server::core::McpServer>();
1135    }
1136};