turbomcp_server/server/
core.rs

1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9#[cfg(feature = "http")]
10use tracing::warn;
11
12use crate::{
13    config::ServerConfig,
14    error::ServerResult,
15    lifecycle::{HealthStatus, ServerLifecycle},
16    metrics::ServerMetrics,
17    registry::HandlerRegistry,
18    routing::RequestRouter,
19    service::McpService,
20};
21
22#[cfg(feature = "middleware")]
23use crate::middleware::MiddlewareStack;
24
25use bytes::Bytes;
26use http::{Request, Response};
27use tokio::time::{Duration, sleep};
28use turbomcp_transport::Transport;
29use turbomcp_transport::core::TransportError;
30
31use super::shutdown::ShutdownHandle;
32
/// Check if logging should be enabled for STDIO transport
///
/// For MCP STDIO transport compliance, logging is disabled by default since stdout
/// must be reserved exclusively for JSON-RPC messages. Setting the
/// `TURBOMCP_FORCE_LOGGING` environment variable overrides this.
///
/// Returns `true` when the variable is present in the environment, whatever its
/// value (including an empty or non-Unicode value), and `false` otherwise.
pub(crate) fn should_log_for_stdio() -> bool {
    // `var_os` only tests for presence. `var(..).is_ok()` would report a variable
    // that is set to a non-Unicode value as if it were unset.
    std::env::var_os("TURBOMCP_FORCE_LOGGING").is_some()
}
41
/// Per-request JSON-RPC handler bundling the router with request metadata
///
/// Carries the headers, transport label and tenant id alongside the router so they
/// can be handed to `create_context` for each request without being stored on the
/// router itself. Used by both the HTTP and WebSocket transports, and compiled only
/// when the `http` or `websocket` feature is enabled.
#[cfg(any(feature = "http", feature = "websocket"))]
struct HttpHandlerWithHeaders {
    /// Router that creates the request context and routes the request.
    router: crate::routing::RequestRouter,
    /// Request headers forwarded to the context, when any were captured.
    headers: Option<std::collections::HashMap<String, String>>,
    /// Transport label handed to the context as `Some(transport)`.
    transport: &'static str,
    /// Tenant identifier forwarded to the context, when one is known.
    tenant_id: Option<String>,
}
52
#[cfg(any(feature = "http", feature = "websocket"))]
#[async_trait::async_trait]
impl turbomcp_protocol::JsonRpcHandler for HttpHandlerWithHeaders {
    /// Handle a single JSON-RPC request value and produce the response value.
    ///
    /// 1. Deserializes `req_value` into a `JsonRpcRequest`. The input is already
    ///    well-formed JSON at this point, so a failure here means the value is not
    ///    a valid request object; it is answered with JSON-RPC error `-32600`
    ///    (Invalid Request). The request `id` is echoed when it is a string or a
    ///    number, otherwise `null` is used.
    /// 2. Builds a request context from the stored headers, transport label and
    ///    tenant id, then routes the request through the router.
    /// 3. Serializes the routed response. If serialization fails, an error object
    ///    with code `-32603` (Internal error) carrying the response id is returned.
    async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
        use serde_json::Value;
        use turbomcp_protocol::jsonrpc::JsonRpcRequest;

        // Remember the id before `req_value` is consumed so an error reply can be
        // correlated by the client. Only string/number ids are echoed; anything
        // else is treated as undetectable and reported as null.
        let request_id = req_value
            .get("id")
            .filter(|id| id.is_string() || id.is_number())
            .cloned()
            .unwrap_or(Value::Null);

        // Parse the request
        let req: JsonRpcRequest = match serde_json::from_value(req_value) {
            Ok(r) => r,
            Err(e) => {
                // -32700 (Parse error) is reserved for invalid JSON text; a JSON
                // value that is not a valid request object is -32600.
                return serde_json::json!({
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32600,
                        "message": format!("Invalid Request: {}", e)
                    },
                    "id": request_id
                });
            }
        };

        // Create context with headers, transport type, and tenant_id
        // tenant_id is extracted from request extensions by the HTTP/WebSocket handlers
        // if TenantExtractionLayer middleware was applied
        let ctx = self.router.create_context(
            self.headers.clone(),
            Some(self.transport),
            self.tenant_id.clone(),
        );

        // Route the request
        let response = self.router.route(req, ctx).await;

        // Serialize response
        match serde_json::to_value(&response) {
            Ok(v) => v,
            Err(e) => {
                serde_json::json!({
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32603,
                        "message": format!("Internal error: failed to serialize response: {}", e)
                    },
                    "id": response.id
                })
            }
        }
    }

    /// Return the server identity reported by the wrapped router.
    fn server_info(&self) -> turbomcp_protocol::ServerInfo {
        self.router.server_info()
    }
}
106
107/// Main MCP server following the Axum/Tower Clone pattern
108///
109/// ## Sharing Pattern
110///
111/// `McpServer` implements `Clone` like Axum's `Router`. All heavy state is Arc-wrapped
112/// internally, making cloning cheap (just atomic reference count increments).
113///
114/// ```rust,no_run
115/// use turbomcp_server::ServerBuilder;
116///
117/// # async fn example() {
118/// let server = ServerBuilder::new().build();
119///
120/// // Clone for passing to functions (cheap - just Arc increments)
121/// let server1 = server.clone();
122/// let server2 = server.clone();
123///
124/// // Access config and health
125/// let config = server1.config();
126/// println!("Server: {}", config.name);
127///
128/// let health = server2.health().await;
129/// println!("Health: {:?}", health);
130/// # }
131/// ```
132///
133/// ## Architecture Notes
134///
135/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
136/// This is intentional and follows Tower's design - users clone the server instead of
137/// Arc-wrapping it.
138///
139/// **Architecture Note**: The service field provides tower::Service integration for
140/// advanced middleware patterns. The request processing pipeline currently uses the
141/// RequestRouter directly. Tower integration can be added via custom middleware layers
142/// when needed for specific use cases (e.g., custom rate limiting, advanced tracing).
143#[derive(Clone)]
144pub struct McpServer {
145    /// Server configuration (Clone-able)
146    pub(crate) config: ServerConfig,
147    /// Handler registry (Arc-wrapped for cheap cloning)
148    pub(crate) registry: Arc<HandlerRegistry>,
149    /// Request router (Arc-wrapped for cheap cloning)
150    pub(crate) router: Arc<RequestRouter>,
151    /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
152    ///
153    /// All requests flow through this service stack, which provides:
154    /// - Timeout enforcement
155    /// - Request validation
156    /// - Authorization checks
157    /// - Rate limiting
158    /// - Audit logging
159    /// - And more middleware layers as configured
160    ///
161    /// See `server/transport.rs` for integration with transport layer.
162    pub(crate) service:
163        tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
164    /// Server lifecycle (Arc-wrapped for cheap cloning)
165    pub(crate) lifecycle: Arc<ServerLifecycle>,
166    /// Server metrics (Arc-wrapped for cheap cloning)
167    pub(crate) metrics: Arc<ServerMetrics>,
168    /// Task storage for MCP Tasks API (SEP-1686)
169    #[cfg(feature = "mcp-tasks")]
170    pub(crate) task_storage: Arc<crate::task_storage::TaskStorage>,
171}
172
173impl std::fmt::Debug for McpServer {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        f.debug_struct("McpServer")
176            .field("config", &self.config)
177            .finish()
178    }
179}
180
181impl McpServer {
182    /// Build comprehensive Tower middleware stack (transport-agnostic)
183    ///
184    /// ## Architecture
185    ///
186    /// This creates a complete Tower service stack with conditional middleware layers:
187    /// - **Timeout Layer**: Request timeout enforcement (tower_http)
188    /// - **Validation Layer**: JSON-RPC structure validation
189    /// - **Authorization Layer**: Resource access control
190    /// - **Core Service**: JSON-RPC routing and handler execution
191    ///
192    /// All middleware is composed using Tower's ServiceBuilder pattern, which provides:
193    /// - Top-to-bottom execution order
194    /// - Type-safe layer composition
195    /// - Zero-cost abstractions
196    /// - Clone-able service instances
197    ///
198    /// ## Integration
199    ///
200    /// The resulting BoxCloneService is stored in `self.service` and called from
201    /// `server/transport.rs` for every incoming request. This ensures ALL requests
202    /// flow through the complete middleware pipeline before reaching handlers.
203    ///
204    /// ## Adding Middleware
205    ///
206    /// To add new middleware, update the match arms below to include your layer.
207    /// Follow the pattern of conditional inclusion based on config flags.
208    #[cfg(feature = "middleware")]
209    fn build_middleware_stack(
210        core_service: McpService,
211        stack: MiddlewareStack,
212    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
213        // COMPREHENSIVE TOWER COMPOSITION - Conditional Layer Stacking
214        //
215        // This approach builds the middleware stack incrementally, boxing at each step.
216        // While this has a small performance cost from multiple boxing operations,
217        // it provides several critical advantages:
218        //
219        // 1. **Maintainability**: No combinatorial explosion (8 match arms → simple chain)
220        // 2. **Extensibility**: Adding new middleware requires only one new block
221        // 3. **Clarity**: Each layer's purpose and configuration is explicit
222        // 4. **Type Safety**: BoxCloneService provides type erasure while preserving Clone
223        //
224        // Performance note: The boxing overhead is negligible compared to network I/O
225        // and handler execution time. Modern allocators make this essentially free.
226
227        // Start with core service as a boxed service for uniform type handling
228        let mut service: tower::util::BoxCloneService<
229            Request<Bytes>,
230            Response<Bytes>,
231            crate::ServerError,
232        > = tower::util::BoxCloneService::new(core_service);
233
234        // Authorization layer removed in 2.0.0 - handle at application layer
235
236        // Layer 2: Validation
237        // Validates request structure after auth but before processing
238        #[cfg(feature = "middleware")]
239        {
240            if let Some(validation_layer) = stack.validation_layer() {
241                service = tower::util::BoxCloneService::new(
242                    tower::ServiceBuilder::new()
243                        .layer(validation_layer)
244                        .service(service),
245                );
246            }
247        }
248
249        // Layer 3: Timeout (outermost)
250        // Applied last so it can enforce timeout on the entire request pipeline
251        #[cfg(feature = "middleware")]
252        {
253            if let Some(timeout_config) = stack.timeout_config
254                && timeout_config.enabled
255            {
256                service = tower::util::BoxCloneService::new(
257                    tower::ServiceBuilder::new()
258                        .layer(tower_http::timeout::TimeoutLayer::new(
259                            timeout_config.request_timeout,
260                        ))
261                        .service(service),
262                );
263            }
264        }
265
266        // Future middleware can be added here with similar if-let blocks:
267        // if let Some(auth_config) = stack.auth_config { ... }
268        // if let Some(audit_config) = stack.audit_config { ... }
269        // if let Some(rate_limit_config) = stack.rate_limit_config { ... }
270
271        service
272    }
273
274    /// Create a new server
275    #[must_use]
276    pub fn new(config: ServerConfig) -> Self {
277        Self::new_with_registry(config, HandlerRegistry::new())
278    }
279
280    /// Create a new server with an existing registry (used by ServerBuilder)
281    #[must_use]
282    pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
283        let registry = Arc::new(registry);
284        let metrics = Arc::new(ServerMetrics::new());
285
286        // Initialize task storage (SEP-1686)
287        #[cfg(feature = "mcp-tasks")]
288        let task_storage = {
289            use tokio::time::Duration;
290            let storage = crate::task_storage::TaskStorage::new(Duration::from_secs(60));
291            // Start background cleanup task
292            storage.start_cleanup();
293            Arc::new(storage)
294        };
295
296        let router = Arc::new(RequestRouter::new(
297            Arc::clone(&registry),
298            Arc::clone(&metrics),
299            config.clone(),
300            #[cfg(feature = "mcp-tasks")]
301            Some(Arc::clone(&task_storage)),
302        ));
303        // Build middleware stack configuration
304        #[cfg(feature = "middleware")]
305        #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
306        let mut stack = crate::middleware::MiddlewareStack::new();
307
308        // Auto-install rate limiting if enabled in config
309        #[cfg(feature = "rate-limiting")]
310        if config.rate_limiting.enabled {
311            use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
312            use std::num::NonZeroU32;
313            use std::time::Duration;
314
315            let rate_config = crate::middleware::RateLimitConfig {
316                strategy: RateLimitStrategy::Global,
317                limits: RateLimits {
318                    requests_per_period: NonZeroU32::new(
319                        config.rate_limiting.requests_per_second * 60,
320                    )
321                    .unwrap(), // Convert per-second to per-minute
322                    period: Duration::from_secs(60),
323                    burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
324                },
325                enabled: true,
326            };
327
328            stack = stack.with_rate_limit(rate_config);
329        }
330
331        // Create core MCP service
332        let core_service = McpService::new(
333            Arc::clone(&registry),
334            Arc::clone(&router),
335            Arc::clone(&metrics),
336        );
337
338        // COMPREHENSIVE TOWER SERVICE COMPOSITION
339        // Build the complete middleware stack with proper type erasure
340        //
341        // This service is called from server/transport.rs for EVERY incoming request:
342        // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
343        //
344        // The Tower middleware stack provides:
345        // ✓ Timeout enforcement (configurable per-request)
346        // ✓ Request validation (JSON-RPC structure)
347        // ✓ Authorization checks (resource access control)
348        // ✓ Rate limiting (if enabled in config)
349        // ✓ Audit logging (configurable)
350        // ✓ And more layers as configured
351        //
352        // BoxCloneService is Clone but !Sync - this is the Tower pattern
353        #[cfg(feature = "middleware")]
354        let service = Self::build_middleware_stack(core_service, stack);
355
356        #[cfg(not(feature = "middleware"))]
357        let service = tower::util::BoxCloneService::new(core_service);
358
359        let lifecycle = Arc::new(ServerLifecycle::new());
360
361        Self {
362            config,
363            registry,
364            router,
365            service,
366            lifecycle,
367            metrics,
368            #[cfg(feature = "mcp-tasks")]
369            task_storage,
370        }
371    }
372
373    /// Get server configuration
374    #[must_use]
375    pub const fn config(&self) -> &ServerConfig {
376        &self.config
377    }
378
379    /// Get handler registry
380    #[must_use]
381    pub const fn registry(&self) -> &Arc<HandlerRegistry> {
382        &self.registry
383    }
384
385    /// Get request router
386    #[must_use]
387    pub const fn router(&self) -> &Arc<RequestRouter> {
388        &self.router
389    }
390
391    /// Get server lifecycle
392    #[must_use]
393    pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
394        &self.lifecycle
395    }
396
397    /// Get server metrics
398    #[must_use]
399    pub const fn metrics(&self) -> &Arc<ServerMetrics> {
400        &self.metrics
401    }
402
403    /// Get task storage (MCP Tasks API - SEP-1686)
404    ///
405    /// Returns the task storage instance for managing long-running operations.
406    /// Only available when the `mcp-tasks` feature is enabled.
407    #[cfg(feature = "mcp-tasks")]
408    #[must_use]
409    pub const fn task_storage(&self) -> &Arc<crate::task_storage::TaskStorage> {
410        &self.task_storage
411    }
412
413    /// Get the Tower service stack (test accessor)
414    ///
415    /// **Note**: This is primarily for integration testing. Production code should
416    /// use the transport layer which calls the service internally via
417    /// `handle_transport_message()`.
418    ///
419    /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
420    /// is designed for cloning).
421    #[doc(hidden)]
422    pub fn service(
423        &self,
424    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
425        self.service.clone()
426    }
427
428    /// Get a shutdown handle for graceful server termination
429    ///
430    /// This handle enables external control over server shutdown, essential for:
431    /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
432    /// - **Container orchestration**: Kubernetes graceful pod termination
433    /// - **Load balancer integration**: Health check coordination
434    /// - **Multi-component systems**: Coordinated shutdown sequences
435    /// - **Maintenance operations**: Planned downtime and updates
436    ///
437    /// # Examples
438    ///
439    /// ## Basic shutdown coordination
440    /// ```no_run
441    /// # use turbomcp_server::ServerBuilder;
442    /// # #[tokio::main]
443    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
444    /// let server = ServerBuilder::new().build();
445    /// let shutdown_handle = server.shutdown_handle();
446    ///
447    /// // Coordinate with other services
448    /// tokio::spawn(async move {
449    ///     // Wait for external shutdown signal
450    ///     tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
451    ///     println!("Shutdown signal received, terminating gracefully...");
452    ///     shutdown_handle.shutdown().await;
453    /// });
454    ///
455    /// // Server will gracefully shut down when signaled
456    /// // server.run_stdio().await?;
457    /// # Ok(())
458    /// # }
459    /// ```
460    ///
461    /// ## Container/Kubernetes deployment
462    /// ```no_run
463    /// # use turbomcp_server::ServerBuilder;
464    /// # use std::sync::Arc;
465    /// # #[tokio::main]
466    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
467    /// let server = ServerBuilder::new().build();
468    /// let shutdown_handle = server.shutdown_handle();
469    /// let shutdown_handle_clone = shutdown_handle.clone();
470    ///
471    /// // Handle multiple signal types with proper platform support
472    /// tokio::spawn(async move {
473    ///     #[cfg(unix)]
474    ///     {
475    ///         use tokio::signal::unix::{signal, SignalKind};
476    ///         let mut sigterm = signal(SignalKind::terminate()).unwrap();
477    ///         tokio::select! {
478    ///             _ = tokio::signal::ctrl_c() => {
479    ///                 println!("SIGINT received");
480    ///             }
481    ///             _ = sigterm.recv() => {
482    ///                 println!("SIGTERM received");
483    ///             }
484    ///         }
485    ///     }
486    ///     #[cfg(not(unix))]
487    ///     {
488    ///         tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
489    ///         println!("SIGINT received");
490    ///     }
491    ///     shutdown_handle_clone.shutdown().await;
492    /// });
493    ///
494    /// // Server handles graceful shutdown automatically
495    /// // server.run_tcp("0.0.0.0:8080").await?;
496    /// # Ok(())
497    /// # }
498    /// ```
499    pub fn shutdown_handle(&self) -> ShutdownHandle {
500        ShutdownHandle::new(self.lifecycle.clone())
501    }
502
503    /// Run the server with STDIO transport
504    ///
505    /// # Errors
506    ///
507    /// Returns [`crate::ServerError::Transport`] if:
508    /// - STDIO transport connection fails
509    /// - Message sending/receiving fails
510    /// - Transport disconnection fails
511    #[tracing::instrument(skip(self), fields(
512        transport = "stdio",
513        service_name = %self.config.name,
514        service_version = %self.config.version
515    ))]
516    pub async fn run_stdio(mut self) -> ServerResult<()> {
517        // For STDIO transport, disable logging unless explicitly overridden
518        // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
519        if should_log_for_stdio() {
520            info!("Starting MCP server with STDIO transport");
521        }
522
523        // Start performance monitoring for STDIO server
524        let _perf_span = info_span!("server.run", transport = "stdio").entered();
525        info!("Initializing STDIO transport for MCP server");
526
527        self.lifecycle.start().await;
528
529        // BIDIRECTIONAL STDIO SETUP
530        // Create STDIO dispatcher for server-initiated requests (sampling, elicitation, roots, ping)
531        let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
532
533        // Use fully-qualified path to avoid ambiguity with the turbomcp crate's runtime module
534        let dispatcher = crate::runtime::StdioDispatcher::new(request_tx);
535
536        // Configure router's bidirectional support with the STDIO dispatcher
537        // SAFETY: We have &mut self, so we can safely get mutable access to the Arc'd router
538        // This is the CRITICAL STEP that was missing - without this, all server→client requests fail
539        let router = Arc::make_mut(&mut self.router);
540        router.set_server_request_dispatcher(dispatcher.clone());
541
542        // Run STDIO with full bidirectional support (MCP 2025-11-25 compliant)
543        // This uses the bidirectional-aware runtime that handles both:
544        // - Client→Server requests (tools, resources, prompts)
545        // - Server→Client requests (sampling, elicitation, roots, ping)
546        crate::runtime::run_stdio_bidirectional(self.router.clone(), dispatcher, request_rx)
547            .await
548            .map_err(|e| crate::ServerError::Handler {
549                message: format!("STDIO bidirectional runtime failed: {}", e),
550                context: Some("run_stdio".to_string()),
551            })
552    }
553
554    /// Get health status
555    pub async fn health(&self) -> HealthStatus {
556        self.lifecycle.health().await
557    }
558
559    /// Run server with HTTP transport using default configuration
560    ///
561    /// This provides a working HTTP server with:
562    /// - Standard HTTP POST/GET/DELETE for MCP protocol at `/mcp`
563    /// - Full MCP 2025-11-25 protocol compliance
564    /// - Graceful shutdown support
565    /// - Default rate limiting (100 req/60s)
566    /// - Default security settings (localhost allowed, CORS disabled)
567    ///
568    /// For custom configuration (rate limits, security, CORS), use `run_http_with_config`.
569    ///
570    /// # Examples
571    ///
572    /// ## Basic usage with default configuration
573    /// ```no_run
574    /// use turbomcp_server::ServerBuilder;
575    ///
576    /// #[tokio::main]
577    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
578    ///     let server = ServerBuilder::new()
579    ///         .name("my-server")
580    ///         .version("1.0.0")
581    ///         .build();
582    ///
583    ///     server.run_http("127.0.0.1:3000").await?;
584    ///     Ok(())
585    /// }
586    /// ```
587    ///
588    /// ## With custom configuration
589    /// ```no_run
590    /// use turbomcp_server::ServerBuilder;
591    /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
592    /// use std::time::Duration;
593    ///
594    /// #[tokio::main]
595    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
596    ///     let server = ServerBuilder::new()
597    ///         .name("my-server")
598    ///         .version("1.0.0")
599    ///         .build();
600    ///
601    ///     let config = StreamableHttpConfigBuilder::new()
602    ///         .with_bind_address("127.0.0.1:3000")
603    ///         .allow_any_origin(true)  // Enable CORS for development
604    ///         .build();
605    ///
606    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
607    ///     Ok(())
608    /// }
609    /// ```
610    ///
611    /// # Errors
612    ///
613    /// Returns [`crate::ServerError::Transport`] if:
614    /// - Address resolution fails
615    /// - HTTP server fails to start
616    /// - Transport disconnection fails
617    #[cfg(feature = "http")]
618    #[tracing::instrument(skip(self), fields(
619        transport = "http",
620        service_name = %self.config.name,
621        service_version = %self.config.version,
622        addr = ?addr
623    ))]
624    pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
625        self,
626        addr: A,
627    ) -> ServerResult<()> {
628        use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
629
630        // Build default configuration
631        let config = StreamableHttpConfigBuilder::new().build();
632
633        self.run_http_with_config(addr, config).await
634    }
635
636    /// Run server with HTTP transport and custom configuration
637    ///
638    /// This provides full control over HTTP server configuration including:
639    /// - Rate limiting (requests per time window, or disabled entirely)
640    /// - Security settings (CORS, origin validation, authentication)
641    /// - Network settings (bind address, endpoint path, keep-alive)
642    /// - Advanced settings (replay buffer size, etc.)
643    ///
644    /// # Bind Address Configuration
645    ///
646    /// **IMPORTANT**: The `addr` parameter takes precedence over `config.bind_addr`.
647    /// If they differ, a deprecation warning is logged.
648    ///
649    /// **Best Practice** (recommended for forward compatibility):
650    /// ```no_run
651    /// # use turbomcp_server::ServerBuilder;
652    /// # use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
653    /// # #[tokio::main]
654    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
655    /// let config = StreamableHttpConfigBuilder::new()
656    ///     .with_bind_address("127.0.0.1:3001")  // Set bind address in config
657    ///     .build();
658    ///
659    /// // Pass matching addr parameter (or use default "127.0.0.1:8080")
660    /// ServerBuilder::new()
661    ///     .build()
662    ///     .run_http_with_config("127.0.0.1:3001", config).await?;
663    /// # Ok(())
664    /// # }
665    /// ```
666    ///
667    /// **Deprecated** (will be removed in v3.x):
668    /// ```no_run
669    /// # use turbomcp_server::ServerBuilder;
670    /// # use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
671    /// # #[tokio::main]
672    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
673    /// // ⚠️ Avoid setting different addresses - causes deprecation warning
674    /// let config = StreamableHttpConfigBuilder::new()
675    ///     .with_bind_address("0.0.0.0:5000")  // This is ignored!
676    ///     .build();
677    ///
678    /// // The addr parameter wins (3001 is used, not 5000)
679    /// ServerBuilder::new()
680    ///     .build()
681    ///     .run_http_with_config("127.0.0.1:3001", config).await?;
682    /// # Ok(())
683    /// # }
684    /// ```
685    ///
686    /// **Future (v3.x)**: The `addr` parameter will be removed. Configure bind address
687    /// via `config.with_bind_address()` only.
688    ///
689    /// # Examples
690    ///
691    /// ## Custom configuration example
692    /// ```no_run
693    /// use turbomcp_server::ServerBuilder;
694    /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
695    /// use std::time::Duration;
696    ///
697    /// #[tokio::main]
698    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
699    ///     let server = ServerBuilder::new()
700    ///         .name("custom-server")
701    ///         .version("1.0.0")
702    ///         .build();
703    ///
704    ///     let config = StreamableHttpConfigBuilder::new()
705    ///         .with_bind_address("127.0.0.1:3000")
706    ///         .with_rate_limit(1000, Duration::from_secs(60))  // 1000 req/min
707    ///         .build();
708    ///
709    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
710    ///     Ok(())
711    /// }
712    /// ```
713    ///
714    /// ## Production configuration (secure, rate limited)
715    /// ```no_run
716    /// use turbomcp_server::ServerBuilder;
717    /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
718    /// use std::time::Duration;
719    ///
720    /// #[tokio::main]
721    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
722    ///     let server = ServerBuilder::new()
723    ///         .name("production-server")
724    ///         .version("1.0.0")
725    ///         .build();
726    ///
727    ///     let config = StreamableHttpConfigBuilder::new()
728    ///         .with_bind_address("127.0.0.1:3000")
729    ///         .with_rate_limit(1000, Duration::from_secs(60))  // 1000 req/min
730    ///         .allow_any_origin(false)  // Strict CORS
731    ///         .require_authentication(true)  // Require auth
732    ///         .build();
733    ///
734    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
735    ///     Ok(())
736    /// }
737    /// ```
738    ///
739    /// # Errors
740    ///
741    /// Returns [`crate::ServerError::Transport`] if:
742    /// - Address resolution fails
743    /// - HTTP server fails to start
744    /// - Transport disconnection fails
745    #[cfg(feature = "http")]
746    #[tracing::instrument(skip(self, config), fields(
747        transport = "http",
748        service_name = %self.config.name,
749        service_version = %self.config.version,
750        addr = ?addr
751    ))]
752    pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
753        self,
754        addr: A,
755        mut config: turbomcp_transport::streamable_http::StreamableHttpConfig,
756    ) -> ServerResult<()> {
757        use std::collections::HashMap;
758        use tokio::sync::{Mutex, RwLock};
759
760        // Sprint 2.6: Check for insecure 0.0.0.0 binding
761        crate::security_checks::check_binding_security(&addr);
762
763        info!("Starting MCP server with HTTP transport");
764
765        self.lifecycle.start().await;
766
767        // Resolve address to string
768        let socket_addr = addr
769            .to_socket_addrs()
770            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
771            .next()
772            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
773
774        info!("Resolved address: {}", socket_addr);
775
776        // Check for conflicting bind addresses and warn about deprecation
777        let socket_addr_str = socket_addr.to_string();
778        if config.bind_addr != socket_addr_str {
779            warn!(
780                addr_parameter = %socket_addr_str,
781                config_bind_addr = %config.bind_addr,
782                "⚠️  DEPRECATION WARNING: The `addr` parameter takes precedence over `config.bind_addr`"
783            );
784            warn!(
785                "⚠️  In TurboMCP v3.x, the `addr` parameter will be removed. Please use StreamableHttpConfigBuilder::new().with_bind_address(\"{}\").build() instead",
786                socket_addr_str
787            );
788            warn!(
789                "⚠️  Avoid setting both `addr` parameter and `config.bind_addr` to prevent confusion"
790            );
791
792            // Update config to single source of truth
793            config.bind_addr = socket_addr_str.clone();
794            config.base_url = format!("http://{}", socket_addr_str);
795        }
796
797        info!(
798            config = ?config,
799            "HTTP configuration (updated with resolved address)"
800        );
801
802        // BIDIRECTIONAL HTTP SETUP
803        // Create shared state for session management and bidirectional MCP
804        let sessions = Arc::new(RwLock::new(HashMap::new()));
805        let pending_requests = Arc::new(Mutex::new(HashMap::new()));
806
807        // Share router across all sessions (routing logic and handler registry)
808        let router = self.router.clone();
809
810        // Capture server identity for MCP protocol compliance
811        let server_info = turbomcp_protocol::ServerInfo {
812            name: self.config.name.clone(),
813            version: self.config.version.clone(),
814        };
815
816        // Factory pattern: create session-specific router for each HTTP request
817        // This is the clean architecture that HTTP requires - each session gets its own
818        // bidirectional dispatcher while sharing the routing logic
819        let sessions_for_factory = Arc::clone(&sessions);
820        let pending_for_factory = Arc::clone(&pending_requests);
821        let router_for_factory = Arc::clone(&router);
822
823        // Create a wrapper that converts headers and delegates to router
824        // This is cleaner than storing headers on the router itself
825        let handler_factory = move |session_id: Option<String>,
826                                    headers: Option<axum::http::HeaderMap>,
827                                    tenant_id: Option<String>| {
828            let session_id = session_id.unwrap_or_else(|| {
829                let new_id = uuid::Uuid::new_v4().to_string();
830                tracing::debug!(
831                    "HTTP POST without session ID - generating ephemeral ID for request: {}",
832                    new_id
833                );
834                new_id
835            });
836
837            tracing::debug!("Factory creating handler for session: {}", session_id);
838
839            // Create session-specific HTTP dispatcher (now local to turbomcp-server!)
840            let dispatcher = crate::runtime::http::HttpDispatcher::new(
841                session_id,
842                Arc::clone(&sessions_for_factory),
843                Arc::clone(&pending_for_factory),
844            );
845
846            // Clone the base router and configure with session-specific dispatcher
847            // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
848            let mut session_router = (*router_for_factory).clone();
849            session_router.set_server_request_dispatcher(dispatcher);
850
851            // Convert HeaderMap to HashMap<String, String> for passing to create_context
852            let headers_map = headers.map(|header_map| {
853                header_map
854                    .iter()
855                    .filter_map(|(name, value)| {
856                        value
857                            .to_str()
858                            .ok()
859                            .map(|v| (name.to_string(), v.to_string()))
860                    })
861                    .collect()
862            });
863
864            // Create wrapper that passes headers and tenant_id to create_context (HTTP transport)
865            HttpHandlerWithHeaders {
866                router: session_router,
867                headers: headers_map,
868                transport: "http",
869                tenant_id,
870            }
871        };
872
873        info!(
874            server_name = %server_info.name,
875            server_version = %server_info.version,
876            bind_addr = %socket_addr,
877            endpoint_path = %config.endpoint_path,
878            "HTTP server starting with full bidirectional support (elicitation, sampling, roots, ping)"
879        );
880
881        // Use factory-based HTTP server with full bidirectional support
882        use crate::runtime::http::run_http;
883        run_http(handler_factory, sessions, pending_requests, config)
884            .await
885            .map_err(|e| {
886                tracing::error!(error = %e, "HTTP server failed");
887                crate::ServerError::handler(e.to_string())
888            })?;
889
890        info!("HTTP server shutdown complete");
891        Ok(())
892    }
893
894    /// Run server with HTTP transport and Tower middleware
895    ///
896    /// This method enables advanced features like multi-tenancy, authentication, rate limiting,
897    /// and other cross-cutting concerns by allowing you to apply Tower middleware layers to the
898    /// HTTP router.
899    ///
900    /// # Multi-Tenancy Example
901    ///
902    /// ```no_run
903    /// use turbomcp_server::ServerBuilder;
904    /// use turbomcp_server::middleware::tenancy::{HeaderTenantExtractor, TenantExtractionLayer};
905    /// use tower::ServiceBuilder;
906    ///
907    /// #[tokio::main]
908    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
909    ///     let server = ServerBuilder::new()
910    ///         .name("multi-tenant-server")
911    ///         .version("1.0.0")
912    ///         .build();
913    ///
914    ///     // Create tenant extractor middleware
915    ///     let tenant_extractor = HeaderTenantExtractor::new("X-Tenant-ID");
916    ///     let middleware = ServiceBuilder::new()
917    ///         .layer(TenantExtractionLayer::new(tenant_extractor));
918    ///
919    ///     // Run server with middleware
920    ///     server.run_http_with_middleware(
921    ///         "127.0.0.1:3000",
922    ///         Box::new(move |router| router.layer(middleware))
923    ///     ).await?;
924    ///
925    ///     Ok(())
926    /// }
927    /// ```
928    ///
929    /// # Authentication Example
930    ///
931    /// ```no_run
932    /// use turbomcp_server::ServerBuilder;
933    /// use tower::ServiceBuilder;
934    /// use tower_http::auth::RequireAuthorizationLayer;
935    ///
936    /// # #[tokio::main]
937    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
938    /// let server = ServerBuilder::new()
939    ///     .name("auth-server")
940    ///     .version("1.0.0")
941    ///     .build();
942    ///
943    /// // Add bearer token authentication
944    /// let middleware = ServiceBuilder::new()
945    ///     .layer(RequireAuthorizationLayer::bearer("secret-token"));
946    ///
947    /// server.run_http_with_middleware(
948    ///     "127.0.0.1:3000",
949    ///     Box::new(move |router| router.layer(middleware))
950    /// ).await?;
951    /// # Ok(())
952    /// # }
953    /// ```
954    #[cfg(feature = "http")]
955    #[tracing::instrument(skip(self, middleware_fn), fields(
956        transport = "http",
957        service_name = %self.config.name,
958        service_version = %self.config.version,
959        addr = ?addr
960    ))]
961    pub async fn run_http_with_middleware<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
962        self,
963        addr: A,
964        middleware_fn: Box<dyn FnOnce(axum::Router) -> axum::Router + Send>,
965    ) -> ServerResult<()> {
966        use std::collections::HashMap;
967        use tokio::sync::{Mutex, RwLock};
968
969        // Sprint 2.6: Check for insecure 0.0.0.0 binding
970        crate::security_checks::check_binding_security(&addr);
971
972        info!("Starting MCP server with HTTP transport and custom middleware");
973
974        self.lifecycle.start().await;
975
976        // Resolve address to string
977        let socket_addr = addr
978            .to_socket_addrs()
979            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
980            .next()
981            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
982
983        info!("Resolved address: {}", socket_addr);
984
985        // BIDIRECTIONAL HTTP SETUP
986        let sessions = Arc::new(RwLock::new(HashMap::new()));
987        let pending_requests = Arc::new(Mutex::new(HashMap::new()));
988        let router = self.router.clone();
989
990        // Factory pattern for session-specific handlers
991        let sessions_for_factory = Arc::clone(&sessions);
992        let pending_for_factory = Arc::clone(&pending_requests);
993        let router_for_factory = Arc::clone(&router);
994
995        let handler_factory = move |session_id: Option<String>,
996                                    headers: Option<axum::http::HeaderMap>,
997                                    tenant_id: Option<String>| {
998            let session_id = session_id.unwrap_or_else(|| {
999                let new_id = uuid::Uuid::new_v4().to_string();
1000                tracing::debug!(
1001                    "HTTP POST without session ID - generating ephemeral ID for request: {}",
1002                    new_id
1003                );
1004                new_id
1005            });
1006
1007            tracing::debug!("Factory creating handler for session: {}", session_id);
1008
1009            let dispatcher = crate::runtime::http::HttpDispatcher::new(
1010                session_id,
1011                Arc::clone(&sessions_for_factory),
1012                Arc::clone(&pending_for_factory),
1013            );
1014
1015            let mut session_router = (*router_for_factory).clone();
1016            session_router.set_server_request_dispatcher(dispatcher);
1017
1018            let headers_map = headers.map(|header_map| {
1019                header_map
1020                    .iter()
1021                    .filter_map(|(name, value)| {
1022                        value
1023                            .to_str()
1024                            .ok()
1025                            .map(|v| (name.to_string(), v.to_string()))
1026                    })
1027                    .collect()
1028            });
1029
1030            HttpHandlerWithHeaders {
1031                router: session_router,
1032                headers: headers_map,
1033                transport: "http",
1034                tenant_id,
1035            }
1036        };
1037
1038        info!(
1039            server_name = %self.config.name,
1040            server_version = %self.config.version,
1041            bind_addr = %socket_addr,
1042            "HTTP server starting with custom middleware and bidirectional support"
1043        );
1044
1045        // Use run_http_with_middleware function
1046        use crate::runtime::http::run_http_with_middleware;
1047        use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
1048
1049        // Create default config with resolved address
1050        let config = StreamableHttpConfigBuilder::new()
1051            .with_bind_address(socket_addr.to_string())
1052            .with_endpoint_path("/mcp")
1053            .allow_localhost(true)
1054            .build();
1055
1056        run_http_with_middleware(
1057            handler_factory,
1058            sessions,
1059            pending_requests,
1060            config,
1061            Some(middleware_fn),
1062        )
1063        .await
1064        .map_err(|e| {
1065            tracing::error!(error = %e, "HTTP server with middleware failed");
1066            crate::ServerError::handler(e.to_string())
1067        })?;
1068
1069        info!("HTTP server shutdown complete");
1070        Ok(())
1071    }
1072
1073    /// Run server with WebSocket transport (full bidirectional support)
1074    ///
1075    /// This provides a simple API for WebSocket servers with sensible defaults:
1076    /// - Default endpoint: `/mcp/ws`
1077    /// - Full MCP 2025-11-25 compliance
1078    /// - Bidirectional communication
1079    /// - Elicitation support
1080    /// - Session management and middleware
1081    ///
1082    /// For custom configuration, use `run_websocket_with_config()`.
1083    ///
1084    /// # Example
1085    ///
1086    /// ```no_run
1087    /// use turbomcp_server::ServerBuilder;
1088    ///
1089    /// #[tokio::main]
1090    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
1091    ///     let server = ServerBuilder::new()
1092    ///         .name("ws-server")
1093    ///         .version("1.0.0")
1094    ///         .build();
1095    ///
1096    ///     server.run_websocket("127.0.0.1:8080").await?;
1097    ///     Ok(())
1098    /// }
1099    /// ```
1100    #[cfg(feature = "websocket")]
1101    #[tracing::instrument(skip(self), fields(
1102        transport = "websocket",
1103        service_name = %self.config.name,
1104        service_version = %self.config.version,
1105        addr = ?addr
1106    ))]
1107    pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
1108        self,
1109        addr: A,
1110    ) -> ServerResult<()> {
1111        use crate::config::WebSocketServerConfig;
1112
1113        // Build default configuration
1114        let config = WebSocketServerConfig::default();
1115
1116        self.run_websocket_with_config(addr, config).await
1117    }
1118
1119    /// Run server with WebSocket transport and custom configuration
1120    ///
1121    /// This provides full control over WebSocket server configuration including:
1122    /// - Custom endpoint path
1123    /// - MCP server settings (middleware, security, etc.)
1124    ///
1125    /// # Example
1126    ///
1127    /// ```no_run
1128    /// use turbomcp_server::{ServerBuilder, WebSocketServerConfig};
1129    ///
1130    /// #[tokio::main]
1131    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
1132    ///     let server = ServerBuilder::new()
1133    ///         .name("custom-ws-server")
1134    ///         .version("1.0.0")
1135    ///         .build();
1136    ///
1137    ///     let config = WebSocketServerConfig {
1138    ///         bind_addr: "127.0.0.1:8080".to_string(),
1139    ///         endpoint_path: "/custom/ws".to_string(),
1140    ///         max_concurrent_requests: 100,
1141    ///     };
1142    ///
1143    ///     server.run_websocket_with_config("127.0.0.1:8080", config).await?;
1144    ///     Ok(())
1145    /// }
1146    /// ```
1147    #[cfg(feature = "websocket")]
1148    #[tracing::instrument(skip(self, config), fields(
1149        transport = "websocket",
1150        service_name = %self.config.name,
1151        service_version = %self.config.version,
1152        addr = ?addr
1153    ))]
1154    pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
1155        self,
1156        addr: A,
1157        config: crate::config::WebSocketServerConfig,
1158    ) -> ServerResult<()> {
1159        use axum::{Router, middleware, routing::get};
1160        use turbomcp_transport::axum::{WebSocketFactoryState, websocket_handler_with_factory};
1161        use turbomcp_transport::tower::SessionInfo;
1162
1163        // Sprint 2.6: Check for insecure 0.0.0.0 binding
1164        crate::security_checks::check_binding_security(&addr);
1165
1166        info!("Starting MCP server with WebSocket transport");
1167        info!(config = ?config, "WebSocket configuration");
1168
1169        self.lifecycle.start().await;
1170
1171        // Resolve address to string
1172        let socket_addr = addr
1173            .to_socket_addrs()
1174            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
1175            .next()
1176            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
1177
1178        info!("Resolved address: {}", socket_addr);
1179
1180        // Capture server identity for MCP protocol compliance
1181        let server_info = turbomcp_protocol::ServerInfo {
1182            name: self.config.name.clone(),
1183            version: self.config.version.clone(),
1184        };
1185
1186        // Router for this server (shared across all connections)
1187        let router = (*self.router).clone();
1188
1189        // Factory: creates per-connection handler with bidirectional support
1190        // This is the unified architecture - transport layer handles WebSocket mechanics,
1191        // server layer provides MCP-specific handler logic
1192        let handler_factory =
1193            move |transport_dispatcher: turbomcp_transport::axum::WebSocketDispatcher,
1194                  headers: Option<std::collections::HashMap<String, String>>,
1195                  tenant_id: Option<String>| {
1196                // Wrap transport dispatcher with server layer adapter
1197                let server_dispatcher =
1198                    crate::routing::WebSocketDispatcherAdapter::new(transport_dispatcher);
1199
1200                // Clone router for this connection and configure with dispatcher
1201                let mut connection_router = router.clone();
1202                connection_router.set_server_request_dispatcher(server_dispatcher);
1203
1204                // Create wrapper that passes headers and tenant_id to create_context (WebSocket transport)
1205                // We can reuse HttpHandlerWithHeaders since it's generic
1206                Arc::new(HttpHandlerWithHeaders {
1207                    router: connection_router,
1208                    headers,
1209                    transport: "websocket",
1210                    tenant_id,
1211                }) as Arc<dyn turbomcp_protocol::JsonRpcHandler>
1212            };
1213
1214        info!(
1215            server_name = %server_info.name,
1216            server_version = %server_info.version,
1217            bind_addr = %socket_addr,
1218            endpoint_path = %config.endpoint_path,
1219            "WebSocket server starting with full bidirectional support (elicitation, sampling, roots, ping)"
1220        );
1221
1222        // Create factory state for transport layer
1223        let factory_state = WebSocketFactoryState::new(handler_factory);
1224
1225        // Session middleware to extract headers (same as HTTP transport)
1226        let session_middleware = |mut request: axum::extract::Request, next: middleware::Next| async move {
1227            let mut session = SessionInfo::new();
1228
1229            // Extract headers and store in session metadata
1230            for (name, value) in request.headers().iter() {
1231                if let Ok(value_str) = value.to_str() {
1232                    session
1233                        .metadata
1234                        .insert(name.to_string(), value_str.to_string());
1235                }
1236            }
1237
1238            // Extract specific useful headers
1239            if let Some(user_agent) = request
1240                .headers()
1241                .get("user-agent")
1242                .and_then(|v| v.to_str().ok())
1243            {
1244                session.user_agent = Some(user_agent.to_string());
1245            }
1246
1247            if let Some(remote_addr) = request
1248                .headers()
1249                .get("x-forwarded-for")
1250                .and_then(|v| v.to_str().ok())
1251            {
1252                session.remote_addr = Some(remote_addr.to_string());
1253            }
1254
1255            request.extensions_mut().insert(session);
1256            next.run(request).await
1257        };
1258
1259        // Build Axum router using transport layer
1260        let app = Router::new()
1261            .route(&config.endpoint_path, get(websocket_handler_with_factory))
1262            .with_state(factory_state)
1263            .layer(middleware::from_fn(session_middleware));
1264
1265        info!("WebSocket server bound to {}", socket_addr);
1266
1267        // Serve using Axum
1268        let listener = tokio::net::TcpListener::bind(socket_addr)
1269            .await
1270            .map_err(|e| crate::ServerError::configuration(format!("Failed to bind: {}", e)))?;
1271
1272        axum::serve(listener, app).await.map_err(|e| {
1273            tracing::error!(error = %e, "WebSocket server failed");
1274            crate::ServerError::handler(e.to_string())
1275        })?;
1276
1277        info!("WebSocket server shutdown complete");
1278        Ok(())
1279    }
1280
1281    /// Run server with TCP transport (progressive enhancement - runtime configuration)
1282    #[cfg(feature = "tcp")]
1283    #[tracing::instrument(skip(self), fields(
1284        transport = "tcp",
1285        service_name = %self.config.name,
1286        service_version = %self.config.version,
1287        addr = ?addr
1288    ))]
1289    pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
1290        mut self,
1291        addr: A,
1292    ) -> ServerResult<()> {
1293        use turbomcp_transport::TcpTransport;
1294
1295        // Sprint 2.6: Check for insecure 0.0.0.0 binding
1296        crate::security_checks::check_binding_security(&addr);
1297
1298        // Start performance monitoring for TCP server
1299        let _perf_span = info_span!("server.run", transport = "tcp").entered();
1300        info!(?addr, "Starting MCP server with TCP transport");
1301
1302        self.lifecycle.start().await;
1303
1304        // Convert ToSocketAddrs to SocketAddr
1305        let socket_addr = match addr.to_socket_addrs() {
1306            Ok(mut addrs) => match addrs.next() {
1307                Some(addr) => addr,
1308                None => {
1309                    tracing::error!("No socket address resolved from provided address");
1310                    self.lifecycle.shutdown().await;
1311                    return Err(crate::ServerError::configuration("Invalid socket address"));
1312                }
1313            },
1314            Err(e) => {
1315                tracing::error!(error = %e, "Failed to resolve socket address");
1316                self.lifecycle.shutdown().await;
1317                return Err(crate::ServerError::configuration(format!(
1318                    "Address resolution failed: {e}"
1319                )));
1320            }
1321        };
1322
1323        let transport = TcpTransport::new_server(socket_addr);
1324        if let Err(e) = transport.connect().await {
1325            tracing::error!(error = %e, "Failed to connect TCP transport");
1326            self.lifecycle.shutdown().await;
1327            return Err(e.into());
1328        }
1329
1330        // BIDIRECTIONAL TCP SETUP
1331        // Create generic transport dispatcher for server-initiated requests
1332        let dispatcher = crate::runtime::TransportDispatcher::new(transport);
1333
1334        // Configure router's bidirectional support with the TCP dispatcher
1335        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
1336        let router = Arc::make_mut(&mut self.router);
1337        router.set_server_request_dispatcher(dispatcher.clone());
1338
1339        // Run TCP with full bidirectional support (MCP 2025-11-25 compliant)
1340        // This uses the generic bidirectional runtime that handles both:
1341        // - Client→Server requests (tools, resources, prompts)
1342        // - Server→Client requests (sampling, elicitation, roots, ping)
1343        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
1344            .await
1345            .map_err(|e| crate::ServerError::Handler {
1346                message: format!("TCP bidirectional runtime failed: {}", e),
1347                context: Some("run_tcp".to_string()),
1348            })
1349    }
1350
1351    /// Run server with Unix socket transport (progressive enhancement - runtime configuration)
1352    #[cfg(all(feature = "unix", unix))]
1353    #[tracing::instrument(skip(self), fields(
1354        transport = "unix",
1355        service_name = %self.config.name,
1356        service_version = %self.config.version,
1357        path = ?path.as_ref()
1358    ))]
1359    pub async fn run_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> ServerResult<()> {
1360        use std::path::PathBuf;
1361        use turbomcp_transport::UnixTransport;
1362
1363        // Start performance monitoring for Unix server
1364        let _perf_span = info_span!("server.run", transport = "unix").entered();
1365        info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");
1366
1367        self.lifecycle.start().await;
1368
1369        let socket_path = PathBuf::from(path.as_ref());
1370        let transport = UnixTransport::new_server(socket_path);
1371        if let Err(e) = transport.connect().await {
1372            tracing::error!(error = %e, "Failed to connect Unix socket transport");
1373            self.lifecycle.shutdown().await;
1374            return Err(e.into());
1375        }
1376
1377        // BIDIRECTIONAL UNIX SOCKET SETUP
1378        // Create generic transport dispatcher for server-initiated requests
1379        let dispatcher = crate::runtime::TransportDispatcher::new(transport);
1380
1381        // Configure router's bidirectional support with the Unix socket dispatcher
1382        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
1383        let router = Arc::make_mut(&mut self.router);
1384        router.set_server_request_dispatcher(dispatcher.clone());
1385
1386        // Run Unix Socket with full bidirectional support (MCP 2025-11-25 compliant)
1387        // This uses the generic bidirectional runtime that handles both:
1388        // - Client→Server requests (tools, resources, prompts)
1389        // - Server→Client requests (sampling, elicitation, roots, ping)
1390        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
1391            .await
1392            .map_err(|e| crate::ServerError::Handler {
1393                message: format!("Unix socket bidirectional runtime failed: {}", e),
1394                context: Some("run_unix".to_string()),
1395            })
1396    }
1397
1398    /// Generic transport runner (DRY principle)
1399    /// Used by feature-gated transport methods (http, tcp, websocket, unix)
1400    #[allow(dead_code)]
1401    #[tracing::instrument(skip(self, transport), fields(
1402        service_name = %self.config.name,
1403        service_version = %self.config.version
1404    ))]
1405    async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
1406        // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
1407        let lifecycle_for_sigint = self.lifecycle.clone();
1408        tokio::spawn(async move {
1409            if let Err(e) = tokio::signal::ctrl_c().await {
1410                tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
1411                return;
1412            }
1413            tracing::info!("Ctrl+C received, initiating shutdown");
1414            lifecycle_for_sigint.shutdown().await;
1415        });
1416
1417        #[cfg(unix)]
1418        {
1419            let lifecycle_for_sigterm = self.lifecycle.clone();
1420            tokio::spawn(async move {
1421                use tokio::signal::unix::{SignalKind, signal};
1422                match signal(SignalKind::terminate()) {
1423                    Ok(mut sigterm) => {
1424                        sigterm.recv().await;
1425                        tracing::info!("SIGTERM received, initiating shutdown");
1426                        lifecycle_for_sigterm.shutdown().await;
1427                    }
1428                    Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
1429                }
1430            });
1431        }
1432
1433        // Shutdown signal
1434        let mut shutdown = self.lifecycle.shutdown_signal();
1435
1436        // Main message processing loop
1437        loop {
1438            tokio::select! {
1439                _ = shutdown.recv() => {
1440                    tracing::info!("Shutdown signal received");
1441                    break;
1442                }
1443                res = transport.receive() => {
1444                    match res {
1445                        Ok(Some(message)) => {
1446                            if let Err(e) = self.handle_transport_message(&mut transport, message).await {
1447                                tracing::warn!(error = %e, "Failed to handle transport message");
1448                            }
1449                        }
1450                        Ok(None) => {
1451                            // No message available; sleep briefly to avoid busy loop
1452                            sleep(Duration::from_millis(5)).await;
1453                        }
1454                        Err(e) => {
1455                            match e {
1456                                TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
1457                                    tracing::info!("Transport receive channel disconnected; shutting down");
1458                                    break;
1459                                }
1460                                _ => {
1461                                    tracing::error!(error = %e, "Transport receive failed");
1462                                    // Backoff on errors
1463                                    sleep(Duration::from_millis(50)).await;
1464                                }
1465                            }
1466                        }
1467                    }
1468                }
1469            }
1470        }
1471
1472        // Disconnect transport
1473        if let Err(e) = transport.disconnect().await {
1474            tracing::warn!(error = %e, "Error while disconnecting transport");
1475        }
1476
1477        tracing::info!("Server shutdown complete");
1478        Ok(())
1479    }
1480}
1481
1482// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1483// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1484// This is intentional and follows the Axum/Tower design pattern
1485#[allow(dead_code)]
1486const _: () = {
1487    const fn assert_send_clone<T: Send + Clone>() {}
1488    const fn check() {
1489        assert_send_clone::<crate::server::core::McpServer>();
1490    }
1491};