turbomcp_server/server/
core.rs

1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9#[cfg(feature = "http")]
10use tracing::warn;
11
12use crate::{
13    config::ServerConfig,
14    error::ServerResult,
15    lifecycle::{HealthStatus, ServerLifecycle},
16    metrics::ServerMetrics,
17    registry::HandlerRegistry,
18    routing::RequestRouter,
19    service::McpService,
20};
21
22#[cfg(feature = "middleware")]
23use crate::middleware::MiddlewareStack;
24
25use bytes::Bytes;
26#[cfg(feature = "middleware")]
27use http::StatusCode;
28use http::{Request, Response};
29use tokio::time::{Duration, sleep};
30use turbomcp_transport::Transport;
31use turbomcp_transport::core::TransportError;
32
33use super::shutdown::ShutdownHandle;
34
35/// Check if logging should be enabled for STDIO transport
36///
37/// For MCP STDIO transport compliance, logging is disabled by default since stdout
38/// must be reserved exclusively for JSON-RPC messages. This can be overridden by
39/// setting the TURBOMCP_FORCE_LOGGING environment variable.
40pub(crate) fn should_log_for_stdio() -> bool {
41    std::env::var("TURBOMCP_FORCE_LOGGING").is_ok()
42}
43
44/// Wrapper that holds router + headers + tenant_id and implements JsonRpcHandler
45/// This allows us to pass headers and tenant info to create_context without storing them on the router.
46/// Used by both HTTP and WebSocket transports.
47#[cfg(any(feature = "http", feature = "websocket"))]
48struct HttpHandlerWithHeaders {
49    router: crate::routing::RequestRouter,
50    headers: Option<std::collections::HashMap<String, String>>,
51    transport: &'static str,
52    tenant_id: Option<String>,
53}
54
55#[cfg(any(feature = "http", feature = "websocket"))]
56#[async_trait::async_trait]
57impl turbomcp_protocol::JsonRpcHandler for HttpHandlerWithHeaders {
58    async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
59        use turbomcp_protocol::jsonrpc::JsonRpcRequest;
60
61        // Parse the request
62        let req: JsonRpcRequest = match serde_json::from_value(req_value) {
63            Ok(r) => r,
64            Err(e) => {
65                return serde_json::json!({
66                    "jsonrpc": "2.0",
67                    "error": {
68                        "code": -32700,
69                        "message": format!("Parse error: {}", e)
70                    },
71                    "id": null
72                });
73            }
74        };
75
76        // Create context with headers, transport type, and tenant_id
77        // tenant_id is extracted from request extensions by the HTTP/WebSocket handlers
78        // if TenantExtractionLayer middleware was applied
79        let ctx = self.router.create_context(
80            self.headers.clone(),
81            Some(self.transport),
82            self.tenant_id.clone(),
83        );
84
85        // Route the request
86        let response = self.router.route(req, ctx).await;
87
88        // Serialize response
89        match serde_json::to_value(&response) {
90            Ok(v) => v,
91            Err(e) => {
92                serde_json::json!({
93                    "jsonrpc": "2.0",
94                    "error": {
95                        "code": -32603,
96                        "message": format!("Internal error: failed to serialize response: {}", e)
97                    },
98                    "id": response.id
99                })
100            }
101        }
102    }
103
104    fn server_info(&self) -> turbomcp_protocol::ServerInfo {
105        self.router.server_info()
106    }
107}
108
109/// Main MCP server following the Axum/Tower Clone pattern
110///
111/// ## Sharing Pattern
112///
113/// `McpServer` implements `Clone` like Axum's `Router`. All heavy state is Arc-wrapped
114/// internally, making cloning cheap (just atomic reference count increments).
115///
116/// ```rust,no_run
117/// use turbomcp_server::ServerBuilder;
118///
119/// # async fn example() {
120/// let server = ServerBuilder::new().build();
121///
122/// // Clone for passing to functions (cheap - just Arc increments)
123/// let server1 = server.clone();
124/// let server2 = server.clone();
125///
126/// // Access config and health
127/// let config = server1.config();
128/// println!("Server: {}", config.name);
129///
130/// let health = server2.health().await;
131/// println!("Health: {:?}", health);
132/// # }
133/// ```
134///
135/// ## Architecture Notes
136///
137/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
138/// This is intentional and follows Tower's design - users clone the server instead of
139/// Arc-wrapping it.
140///
141/// **Architecture Note**: The service field provides tower::Service integration for
142/// advanced middleware patterns. The request processing pipeline currently uses the
143/// RequestRouter directly. Tower integration can be added via custom middleware layers
144/// when needed for specific use cases (e.g., custom rate limiting, advanced tracing).
145#[derive(Clone)]
146pub struct McpServer {
147    /// Server configuration (Clone-able)
148    pub(crate) config: ServerConfig,
149    /// Handler registry (Arc-wrapped for cheap cloning)
150    pub(crate) registry: Arc<HandlerRegistry>,
151    /// Request router (Arc-wrapped for cheap cloning)
152    pub(crate) router: Arc<RequestRouter>,
153    /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
154    ///
155    /// All requests flow through this service stack, which provides:
156    /// - Timeout enforcement
157    /// - Request validation
158    /// - Authorization checks
159    /// - Rate limiting
160    /// - Audit logging
161    /// - And more middleware layers as configured
162    ///
163    /// See `server/transport.rs` for integration with transport layer.
164    pub(crate) service:
165        tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
166    /// Server lifecycle (Arc-wrapped for cheap cloning)
167    pub(crate) lifecycle: Arc<ServerLifecycle>,
168    /// Server metrics (Arc-wrapped for cheap cloning)
169    pub(crate) metrics: Arc<ServerMetrics>,
170    /// Task storage for MCP Tasks API (SEP-1686)
171    #[cfg(feature = "mcp-tasks")]
172    pub(crate) task_storage: Arc<crate::task_storage::TaskStorage>,
173}
174
175impl std::fmt::Debug for McpServer {
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        f.debug_struct("McpServer")
178            .field("config", &self.config)
179            .finish()
180    }
181}
182
183impl McpServer {
184    /// Build comprehensive Tower middleware stack (transport-agnostic)
185    ///
186    /// ## Architecture
187    ///
188    /// This creates a complete Tower service stack with conditional middleware layers:
189    /// - **Timeout Layer**: Request timeout enforcement (tower_http)
190    /// - **Validation Layer**: JSON-RPC structure validation
191    /// - **Authorization Layer**: Resource access control
192    /// - **Core Service**: JSON-RPC routing and handler execution
193    ///
194    /// All middleware is composed using Tower's ServiceBuilder pattern, which provides:
195    /// - Top-to-bottom execution order
196    /// - Type-safe layer composition
197    /// - Zero-cost abstractions
198    /// - Clone-able service instances
199    ///
200    /// ## Integration
201    ///
202    /// The resulting BoxCloneService is stored in `self.service` and called from
203    /// `server/transport.rs` for every incoming request. This ensures ALL requests
204    /// flow through the complete middleware pipeline before reaching handlers.
205    ///
206    /// ## Adding Middleware
207    ///
208    /// To add new middleware, update the match arms below to include your layer.
209    /// Follow the pattern of conditional inclusion based on config flags.
210    #[cfg(feature = "middleware")]
211    fn build_middleware_stack(
212        core_service: McpService,
213        stack: MiddlewareStack,
214    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
215        // COMPREHENSIVE TOWER COMPOSITION - Conditional Layer Stacking
216        //
217        // This approach builds the middleware stack incrementally, boxing at each step.
218        // While this has a small performance cost from multiple boxing operations,
219        // it provides several critical advantages:
220        //
221        // 1. **Maintainability**: No combinatorial explosion (8 match arms → simple chain)
222        // 2. **Extensibility**: Adding new middleware requires only one new block
223        // 3. **Clarity**: Each layer's purpose and configuration is explicit
224        // 4. **Type Safety**: BoxCloneService provides type erasure while preserving Clone
225        //
226        // Performance note: The boxing overhead is negligible compared to network I/O
227        // and handler execution time. Modern allocators make this essentially free.
228
229        // Start with core service as a boxed service for uniform type handling
230        let mut service: tower::util::BoxCloneService<
231            Request<Bytes>,
232            Response<Bytes>,
233            crate::ServerError,
234        > = tower::util::BoxCloneService::new(core_service);
235
236        // Authorization layer removed in 2.0.0 - handle at application layer
237
238        // Layer 2: Validation
239        // Validates request structure after auth but before processing
240        #[cfg(feature = "middleware")]
241        {
242            if let Some(validation_layer) = stack.validation_layer() {
243                service = tower::util::BoxCloneService::new(
244                    tower::ServiceBuilder::new()
245                        .layer(validation_layer)
246                        .service(service),
247                );
248            }
249        }
250
251        // Layer 3: Timeout (outermost)
252        // Applied last so it can enforce timeout on the entire request pipeline
253        #[cfg(feature = "middleware")]
254        {
255            if let Some(timeout_config) = stack.timeout_config
256                && timeout_config.enabled
257            {
258                service = tower::util::BoxCloneService::new(
259                    tower::ServiceBuilder::new()
260                        .layer(tower_http::timeout::TimeoutLayer::with_status_code(
261                            StatusCode::REQUEST_TIMEOUT,
262                            timeout_config.request_timeout,
263                        ))
264                        .service(service),
265                );
266            }
267        }
268
269        // Future middleware can be added here with similar if-let blocks:
270        // if let Some(auth_config) = stack.auth_config { ... }
271        // if let Some(audit_config) = stack.audit_config { ... }
272        // if let Some(rate_limit_config) = stack.rate_limit_config { ... }
273
274        service
275    }
276
277    /// Create a new server
278    #[must_use]
279    pub fn new(config: ServerConfig) -> Self {
280        Self::new_with_registry(config, HandlerRegistry::new())
281    }
282
283    /// Create a new server with an existing registry (used by ServerBuilder)
284    #[must_use]
285    pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
286        let registry = Arc::new(registry);
287        let metrics = Arc::new(ServerMetrics::new());
288
289        // Initialize task storage (SEP-1686)
290        #[cfg(feature = "mcp-tasks")]
291        let task_storage = {
292            use tokio::time::Duration;
293            let storage = crate::task_storage::TaskStorage::new(Duration::from_secs(60));
294            // Start background cleanup task
295            storage.start_cleanup();
296            Arc::new(storage)
297        };
298
299        let router = Arc::new(RequestRouter::new(
300            Arc::clone(&registry),
301            Arc::clone(&metrics),
302            config.clone(),
303            #[cfg(feature = "mcp-tasks")]
304            Some(Arc::clone(&task_storage)),
305        ));
306        // Build middleware stack configuration
307        #[cfg(feature = "middleware")]
308        #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
309        let mut stack = crate::middleware::MiddlewareStack::new();
310
311        // Auto-install rate limiting if enabled in config
312        #[cfg(feature = "rate-limiting")]
313        if config.rate_limiting.enabled {
314            use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
315            use std::num::NonZeroU32;
316            use std::time::Duration;
317
318            let rate_config = crate::middleware::RateLimitConfig {
319                strategy: RateLimitStrategy::Global,
320                limits: RateLimits {
321                    requests_per_period: NonZeroU32::new(
322                        config.rate_limiting.requests_per_second * 60,
323                    )
324                    .unwrap(), // Convert per-second to per-minute
325                    period: Duration::from_secs(60),
326                    burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
327                },
328                enabled: true,
329            };
330
331            stack = stack.with_rate_limit(rate_config);
332        }
333
334        // Create core MCP service
335        let core_service = McpService::new(
336            Arc::clone(&registry),
337            Arc::clone(&router),
338            Arc::clone(&metrics),
339        );
340
341        // COMPREHENSIVE TOWER SERVICE COMPOSITION
342        // Build the complete middleware stack with proper type erasure
343        //
344        // This service is called from server/transport.rs for EVERY incoming request:
345        // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
346        //
347        // The Tower middleware stack provides:
348        // ✓ Timeout enforcement (configurable per-request)
349        // ✓ Request validation (JSON-RPC structure)
350        // ✓ Authorization checks (resource access control)
351        // ✓ Rate limiting (if enabled in config)
352        // ✓ Audit logging (configurable)
353        // ✓ And more layers as configured
354        //
355        // BoxCloneService is Clone but !Sync - this is the Tower pattern
356        #[cfg(feature = "middleware")]
357        let service = Self::build_middleware_stack(core_service, stack);
358
359        #[cfg(not(feature = "middleware"))]
360        let service = tower::util::BoxCloneService::new(core_service);
361
362        let lifecycle = Arc::new(ServerLifecycle::new());
363
364        Self {
365            config,
366            registry,
367            router,
368            service,
369            lifecycle,
370            metrics,
371            #[cfg(feature = "mcp-tasks")]
372            task_storage,
373        }
374    }
375
376    /// Get server configuration
377    #[must_use]
378    pub const fn config(&self) -> &ServerConfig {
379        &self.config
380    }
381
382    /// Get handler registry
383    #[must_use]
384    pub const fn registry(&self) -> &Arc<HandlerRegistry> {
385        &self.registry
386    }
387
388    /// Get request router
389    #[must_use]
390    pub const fn router(&self) -> &Arc<RequestRouter> {
391        &self.router
392    }
393
394    /// Get server lifecycle
395    #[must_use]
396    pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
397        &self.lifecycle
398    }
399
400    /// Get server metrics
401    #[must_use]
402    pub const fn metrics(&self) -> &Arc<ServerMetrics> {
403        &self.metrics
404    }
405
406    /// Get task storage (MCP Tasks API - SEP-1686)
407    ///
408    /// Returns the task storage instance for managing long-running operations.
409    /// Only available when the `mcp-tasks` feature is enabled.
410    #[cfg(feature = "mcp-tasks")]
411    #[must_use]
412    pub const fn task_storage(&self) -> &Arc<crate::task_storage::TaskStorage> {
413        &self.task_storage
414    }
415
416    /// Get the Tower service stack (test accessor)
417    ///
418    /// **Note**: This is primarily for integration testing. Production code should
419    /// use the transport layer which calls the service internally via
420    /// `handle_transport_message()`.
421    ///
422    /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
423    /// is designed for cloning).
424    #[doc(hidden)]
425    pub fn service(
426        &self,
427    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
428        self.service.clone()
429    }
430
431    /// Get a shutdown handle for graceful server termination
432    ///
433    /// This handle enables external control over server shutdown, essential for:
434    /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
435    /// - **Container orchestration**: Kubernetes graceful pod termination
436    /// - **Load balancer integration**: Health check coordination
437    /// - **Multi-component systems**: Coordinated shutdown sequences
438    /// - **Maintenance operations**: Planned downtime and updates
439    ///
440    /// # Examples
441    ///
442    /// ## Basic shutdown coordination
443    /// ```no_run
444    /// # use turbomcp_server::ServerBuilder;
445    /// # #[tokio::main]
446    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
447    /// let server = ServerBuilder::new().build();
448    /// let shutdown_handle = server.shutdown_handle();
449    ///
450    /// // Coordinate with other services
451    /// tokio::spawn(async move {
452    ///     // Wait for external shutdown signal
453    ///     tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
454    ///     println!("Shutdown signal received, terminating gracefully...");
455    ///     shutdown_handle.shutdown().await;
456    /// });
457    ///
458    /// // Server will gracefully shut down when signaled
459    /// // server.run_stdio().await?;
460    /// # Ok(())
461    /// # }
462    /// ```
463    ///
464    /// ## Container/Kubernetes deployment
465    /// ```no_run
466    /// # use turbomcp_server::ServerBuilder;
467    /// # use std::sync::Arc;
468    /// # #[tokio::main]
469    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
470    /// let server = ServerBuilder::new().build();
471    /// let shutdown_handle = server.shutdown_handle();
472    /// let shutdown_handle_clone = shutdown_handle.clone();
473    ///
474    /// // Handle multiple signal types with proper platform support
475    /// tokio::spawn(async move {
476    ///     #[cfg(unix)]
477    ///     {
478    ///         use tokio::signal::unix::{signal, SignalKind};
479    ///         let mut sigterm = signal(SignalKind::terminate()).unwrap();
480    ///         tokio::select! {
481    ///             _ = tokio::signal::ctrl_c() => {
482    ///                 println!("SIGINT received");
483    ///             }
484    ///             _ = sigterm.recv() => {
485    ///                 println!("SIGTERM received");
486    ///             }
487    ///         }
488    ///     }
489    ///     #[cfg(not(unix))]
490    ///     {
491    ///         tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
492    ///         println!("SIGINT received");
493    ///     }
494    ///     shutdown_handle_clone.shutdown().await;
495    /// });
496    ///
497    /// // Server handles graceful shutdown automatically
498    /// // server.run_tcp("0.0.0.0:8080").await?;
499    /// # Ok(())
500    /// # }
501    /// ```
502    pub fn shutdown_handle(&self) -> ShutdownHandle {
503        ShutdownHandle::new(self.lifecycle.clone())
504    }
505
506    /// Run the server with STDIO transport
507    ///
508    /// # Errors
509    ///
510    /// Returns [`crate::ServerError::Transport`] if:
511    /// - STDIO transport connection fails
512    /// - Message sending/receiving fails
513    /// - Transport disconnection fails
514    #[tracing::instrument(skip(self), fields(
515        transport = "stdio",
516        service_name = %self.config.name,
517        service_version = %self.config.version
518    ))]
519    pub async fn run_stdio(mut self) -> ServerResult<()> {
520        // For STDIO transport, disable logging unless explicitly overridden
521        // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
522        if should_log_for_stdio() {
523            info!("Starting MCP server with STDIO transport");
524        }
525
526        // Start performance monitoring for STDIO server
527        let _perf_span = info_span!("server.run", transport = "stdio").entered();
528        info!("Initializing STDIO transport for MCP server");
529
530        self.lifecycle.start().await;
531
532        // BIDIRECTIONAL STDIO SETUP
533        // Create STDIO dispatcher for server-initiated requests (sampling, elicitation, roots, ping)
534        let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
535
536        // Use fully-qualified path to avoid ambiguity with the turbomcp crate's runtime module
537        let dispatcher = crate::runtime::StdioDispatcher::new(request_tx);
538
539        // Configure router's bidirectional support with the STDIO dispatcher
540        // SAFETY: We have &mut self, so we can safely get mutable access to the Arc'd router
541        // This is the CRITICAL STEP that was missing - without this, all server→client requests fail
542        let router = Arc::make_mut(&mut self.router);
543        router.set_server_request_dispatcher(dispatcher.clone());
544
545        // Run STDIO with full bidirectional support (MCP 2025-11-25 compliant)
546        // This uses the bidirectional-aware runtime that handles both:
547        // - Client→Server requests (tools, resources, prompts)
548        // - Server→Client requests (sampling, elicitation, roots, ping)
549        crate::runtime::run_stdio_bidirectional(self.router.clone(), dispatcher, request_rx)
550            .await
551            .map_err(|e| crate::ServerError::Handler {
552                message: format!("STDIO bidirectional runtime failed: {}", e),
553                context: Some("run_stdio".to_string()),
554            })
555    }
556
557    /// Get health status
558    pub async fn health(&self) -> HealthStatus {
559        self.lifecycle.health().await
560    }
561
562    /// Run server with HTTP transport using default configuration
563    ///
564    /// This provides a working HTTP server with:
565    /// - Standard HTTP POST/GET/DELETE for MCP protocol at `/mcp`
566    /// - Full MCP 2025-11-25 protocol compliance
567    /// - Graceful shutdown support
568    /// - Default rate limiting (100 req/60s)
569    /// - Default security settings (localhost allowed, CORS disabled)
570    ///
571    /// For custom configuration (rate limits, security, CORS), use `run_http_with_config`.
572    ///
573    /// # Examples
574    ///
575    /// ## Basic usage with default configuration
576    /// ```no_run
577    /// use turbomcp_server::ServerBuilder;
578    ///
579    /// #[tokio::main]
580    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
581    ///     let server = ServerBuilder::new()
582    ///         .name("my-server")
583    ///         .version("1.0.0")
584    ///         .build();
585    ///
586    ///     server.run_http("127.0.0.1:3000").await?;
587    ///     Ok(())
588    /// }
589    /// ```
590    ///
591    /// ## With custom configuration
592    /// ```no_run
593    /// use turbomcp_server::ServerBuilder;
594    /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
595    /// use std::time::Duration;
596    ///
597    /// #[tokio::main]
598    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
599    ///     let server = ServerBuilder::new()
600    ///         .name("my-server")
601    ///         .version("1.0.0")
602    ///         .build();
603    ///
604    ///     let config = StreamableHttpConfigBuilder::new()
605    ///         .with_bind_address("127.0.0.1:3000")
606    ///         .allow_any_origin(true)  // Enable CORS for development
607    ///         .build();
608    ///
609    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
610    ///     Ok(())
611    /// }
612    /// ```
613    ///
614    /// # Errors
615    ///
616    /// Returns [`crate::ServerError::Transport`] if:
617    /// - Address resolution fails
618    /// - HTTP server fails to start
619    /// - Transport disconnection fails
620    #[cfg(feature = "http")]
621    #[tracing::instrument(skip(self), fields(
622        transport = "http",
623        service_name = %self.config.name,
624        service_version = %self.config.version,
625        addr = ?addr
626    ))]
627    pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
628        self,
629        addr: A,
630    ) -> ServerResult<()> {
631        use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
632
633        // Build default configuration
634        let config = StreamableHttpConfigBuilder::new().build();
635
636        self.run_http_with_config(addr, config).await
637    }
638
639    /// Run server with HTTP transport and custom configuration
640    ///
641    /// This provides full control over HTTP server configuration including:
642    /// - Rate limiting (requests per time window, or disabled entirely)
643    /// - Security settings (CORS, origin validation, authentication)
644    /// - Network settings (bind address, endpoint path, keep-alive)
645    /// - Advanced settings (replay buffer size, etc.)
646    ///
647    /// # Bind Address Configuration
648    ///
649    /// **IMPORTANT**: The `addr` parameter takes precedence over `config.bind_addr`.
650    /// If they differ, a deprecation warning is logged.
651    ///
652    /// **Best Practice** (recommended for forward compatibility):
653    /// ```no_run
654    /// # use turbomcp_server::ServerBuilder;
655    /// # use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
656    /// # #[tokio::main]
657    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
658    /// let config = StreamableHttpConfigBuilder::new()
659    ///     .with_bind_address("127.0.0.1:3001")  // Set bind address in config
660    ///     .build();
661    ///
662    /// // Pass matching addr parameter (or use default "127.0.0.1:8080")
663    /// ServerBuilder::new()
664    ///     .build()
665    ///     .run_http_with_config("127.0.0.1:3001", config).await?;
666    /// # Ok(())
667    /// # }
668    /// ```
669    ///
670    /// **Deprecated** (will be removed in v3.x):
671    /// ```no_run
672    /// # use turbomcp_server::ServerBuilder;
673    /// # use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
674    /// # #[tokio::main]
675    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
676    /// // ⚠️ Avoid setting different addresses - causes deprecation warning
677    /// let config = StreamableHttpConfigBuilder::new()
678    ///     .with_bind_address("0.0.0.0:5000")  // This is ignored!
679    ///     .build();
680    ///
681    /// // The addr parameter wins (3001 is used, not 5000)
682    /// ServerBuilder::new()
683    ///     .build()
684    ///     .run_http_with_config("127.0.0.1:3001", config).await?;
685    /// # Ok(())
686    /// # }
687    /// ```
688    ///
689    /// **Future (v3.x)**: The `addr` parameter will be removed. Configure bind address
690    /// via `config.with_bind_address()` only.
691    ///
692    /// # Examples
693    ///
694    /// ## Custom configuration example
695    /// ```no_run
696    /// use turbomcp_server::ServerBuilder;
697    /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
698    /// use std::time::Duration;
699    ///
700    /// #[tokio::main]
701    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
702    ///     let server = ServerBuilder::new()
703    ///         .name("custom-server")
704    ///         .version("1.0.0")
705    ///         .build();
706    ///
707    ///     let config = StreamableHttpConfigBuilder::new()
708    ///         .with_bind_address("127.0.0.1:3000")
709    ///         .with_rate_limit(1000, Duration::from_secs(60))  // 1000 req/min
710    ///         .build();
711    ///
712    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
713    ///     Ok(())
714    /// }
715    /// ```
716    ///
717    /// ## Production configuration (secure, rate limited)
718    /// ```no_run
719    /// use turbomcp_server::ServerBuilder;
720    /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
721    /// use std::time::Duration;
722    ///
723    /// #[tokio::main]
724    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
725    ///     let server = ServerBuilder::new()
726    ///         .name("production-server")
727    ///         .version("1.0.0")
728    ///         .build();
729    ///
730    ///     let config = StreamableHttpConfigBuilder::new()
731    ///         .with_bind_address("127.0.0.1:3000")
732    ///         .with_rate_limit(1000, Duration::from_secs(60))  // 1000 req/min
733    ///         .allow_any_origin(false)  // Strict CORS
734    ///         .require_authentication(true)  // Require auth
735    ///         .build();
736    ///
737    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
738    ///     Ok(())
739    /// }
740    /// ```
741    ///
742    /// # Errors
743    ///
744    /// Returns [`crate::ServerError::Transport`] if:
745    /// - Address resolution fails
746    /// - HTTP server fails to start
747    /// - Transport disconnection fails
748    #[cfg(feature = "http")]
749    #[tracing::instrument(skip(self, config), fields(
750        transport = "http",
751        service_name = %self.config.name,
752        service_version = %self.config.version,
753        addr = ?addr
754    ))]
755    pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
756        self,
757        addr: A,
758        mut config: turbomcp_transport::streamable_http::StreamableHttpConfig,
759    ) -> ServerResult<()> {
760        use std::collections::HashMap;
761        use tokio::sync::{Mutex, RwLock};
762
763        // Sprint 2.6: Check for insecure 0.0.0.0 binding
764        crate::security_checks::check_binding_security(&addr);
765
766        info!("Starting MCP server with HTTP transport");
767
768        self.lifecycle.start().await;
769
770        // Resolve address to string
771        let socket_addr = addr
772            .to_socket_addrs()
773            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
774            .next()
775            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
776
777        info!("Resolved address: {}", socket_addr);
778
779        // Check for conflicting bind addresses and warn about deprecation
780        let socket_addr_str = socket_addr.to_string();
781        if config.bind_addr != socket_addr_str {
782            warn!(
783                addr_parameter = %socket_addr_str,
784                config_bind_addr = %config.bind_addr,
785                "⚠️  DEPRECATION WARNING: The `addr` parameter takes precedence over `config.bind_addr`"
786            );
787            warn!(
788                "⚠️  In TurboMCP v3.x, the `addr` parameter will be removed. Please use StreamableHttpConfigBuilder::new().with_bind_address(\"{}\").build() instead",
789                socket_addr_str
790            );
791            warn!(
792                "⚠️  Avoid setting both `addr` parameter and `config.bind_addr` to prevent confusion"
793            );
794
795            // Update config to single source of truth
796            config.bind_addr = socket_addr_str.clone();
797            config.base_url = format!("http://{}", socket_addr_str);
798        }
799
800        info!(
801            config = ?config,
802            "HTTP configuration (updated with resolved address)"
803        );
804
805        // BIDIRECTIONAL HTTP SETUP
806        // Create shared state for session management and bidirectional MCP
807        let sessions = Arc::new(RwLock::new(HashMap::new()));
808        let pending_requests = Arc::new(Mutex::new(HashMap::new()));
809
810        // Share router across all sessions (routing logic and handler registry)
811        let router = self.router.clone();
812
813        // Capture server identity for MCP protocol compliance
814        let server_info = turbomcp_protocol::ServerInfo {
815            name: self.config.name.clone(),
816            version: self.config.version.clone(),
817        };
818
819        // Factory pattern: create session-specific router for each HTTP request
820        // This is the clean architecture that HTTP requires - each session gets its own
821        // bidirectional dispatcher while sharing the routing logic
822        let sessions_for_factory = Arc::clone(&sessions);
823        let pending_for_factory = Arc::clone(&pending_requests);
824        let router_for_factory = Arc::clone(&router);
825
826        // Create a wrapper that converts headers and delegates to router
827        // This is cleaner than storing headers on the router itself
828        let handler_factory = move |session_id: Option<String>,
829                                    headers: Option<axum::http::HeaderMap>,
830                                    tenant_id: Option<String>| {
831            let session_id = session_id.unwrap_or_else(|| {
832                let new_id = uuid::Uuid::new_v4().to_string();
833                tracing::debug!(
834                    "HTTP POST without session ID - generating ephemeral ID for request: {}",
835                    new_id
836                );
837                new_id
838            });
839
840            tracing::debug!("Factory creating handler for session: {}", session_id);
841
842            // Create session-specific HTTP dispatcher (now local to turbomcp-server!)
843            let dispatcher = crate::runtime::http::HttpDispatcher::new(
844                session_id,
845                Arc::clone(&sessions_for_factory),
846                Arc::clone(&pending_for_factory),
847            );
848
849            // Clone the base router and configure with session-specific dispatcher
850            // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
851            let mut session_router = (*router_for_factory).clone();
852            session_router.set_server_request_dispatcher(dispatcher);
853
854            // Convert HeaderMap to HashMap<String, String> for passing to create_context
855            let headers_map = headers.map(|header_map| {
856                header_map
857                    .iter()
858                    .filter_map(|(name, value)| {
859                        value
860                            .to_str()
861                            .ok()
862                            .map(|v| (name.to_string(), v.to_string()))
863                    })
864                    .collect()
865            });
866
867            // Create wrapper that passes headers and tenant_id to create_context (HTTP transport)
868            HttpHandlerWithHeaders {
869                router: session_router,
870                headers: headers_map,
871                transport: "http",
872                tenant_id,
873            }
874        };
875
876        info!(
877            server_name = %server_info.name,
878            server_version = %server_info.version,
879            bind_addr = %socket_addr,
880            endpoint_path = %config.endpoint_path,
881            "HTTP server starting with full bidirectional support (elicitation, sampling, roots, ping)"
882        );
883
884        // Use factory-based HTTP server with full bidirectional support
885        use crate::runtime::http::run_http;
886        run_http(handler_factory, sessions, pending_requests, config)
887            .await
888            .map_err(|e| {
889                tracing::error!(error = %e, "HTTP server failed");
890                crate::ServerError::handler(e.to_string())
891            })?;
892
893        info!("HTTP server shutdown complete");
894        Ok(())
895    }
896
897    /// Run server with HTTP transport and Tower middleware
898    ///
899    /// This method enables advanced features like multi-tenancy, authentication, rate limiting,
900    /// and other cross-cutting concerns by allowing you to apply Tower middleware layers to the
901    /// HTTP router.
902    ///
903    /// # Multi-Tenancy Example
904    ///
905    /// ```no_run
906    /// use turbomcp_server::ServerBuilder;
907    /// use turbomcp_server::middleware::tenancy::{HeaderTenantExtractor, TenantExtractionLayer};
908    /// use tower::ServiceBuilder;
909    ///
910    /// #[tokio::main]
911    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
912    ///     let server = ServerBuilder::new()
913    ///         .name("multi-tenant-server")
914    ///         .version("1.0.0")
915    ///         .build();
916    ///
917    ///     // Create tenant extractor middleware
918    ///     let tenant_extractor = HeaderTenantExtractor::new("X-Tenant-ID");
919    ///     let middleware = ServiceBuilder::new()
920    ///         .layer(TenantExtractionLayer::new(tenant_extractor));
921    ///
922    ///     // Run server with middleware
923    ///     server.run_http_with_middleware(
924    ///         "127.0.0.1:3000",
925    ///         Box::new(move |router| router.layer(middleware))
926    ///     ).await?;
927    ///
928    ///     Ok(())
929    /// }
930    /// ```
931    ///
932    /// # Authentication Example
933    ///
934    /// ```no_run
935    /// use turbomcp_server::ServerBuilder;
936    /// use tower::ServiceBuilder;
937    /// use tower_http::auth::RequireAuthorizationLayer;
938    ///
939    /// # #[tokio::main]
940    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
941    /// let server = ServerBuilder::new()
942    ///     .name("auth-server")
943    ///     .version("1.0.0")
944    ///     .build();
945    ///
946    /// // Add bearer token authentication
947    /// let middleware = ServiceBuilder::new()
948    ///     .layer(RequireAuthorizationLayer::bearer("secret-token"));
949    ///
950    /// server.run_http_with_middleware(
951    ///     "127.0.0.1:3000",
952    ///     Box::new(move |router| router.layer(middleware))
953    /// ).await?;
954    /// # Ok(())
955    /// # }
956    /// ```
957    #[cfg(feature = "http")]
958    #[tracing::instrument(skip(self, middleware_fn), fields(
959        transport = "http",
960        service_name = %self.config.name,
961        service_version = %self.config.version,
962        addr = ?addr
963    ))]
964    pub async fn run_http_with_middleware<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
965        self,
966        addr: A,
967        middleware_fn: Box<dyn FnOnce(axum::Router) -> axum::Router + Send>,
968    ) -> ServerResult<()> {
969        use std::collections::HashMap;
970        use tokio::sync::{Mutex, RwLock};
971
972        // Sprint 2.6: Check for insecure 0.0.0.0 binding
973        crate::security_checks::check_binding_security(&addr);
974
975        info!("Starting MCP server with HTTP transport and custom middleware");
976
977        self.lifecycle.start().await;
978
979        // Resolve address to string
980        let socket_addr = addr
981            .to_socket_addrs()
982            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
983            .next()
984            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
985
986        info!("Resolved address: {}", socket_addr);
987
988        // BIDIRECTIONAL HTTP SETUP
989        let sessions = Arc::new(RwLock::new(HashMap::new()));
990        let pending_requests = Arc::new(Mutex::new(HashMap::new()));
991        let router = self.router.clone();
992
993        // Factory pattern for session-specific handlers
994        let sessions_for_factory = Arc::clone(&sessions);
995        let pending_for_factory = Arc::clone(&pending_requests);
996        let router_for_factory = Arc::clone(&router);
997
998        let handler_factory = move |session_id: Option<String>,
999                                    headers: Option<axum::http::HeaderMap>,
1000                                    tenant_id: Option<String>| {
1001            let session_id = session_id.unwrap_or_else(|| {
1002                let new_id = uuid::Uuid::new_v4().to_string();
1003                tracing::debug!(
1004                    "HTTP POST without session ID - generating ephemeral ID for request: {}",
1005                    new_id
1006                );
1007                new_id
1008            });
1009
1010            tracing::debug!("Factory creating handler for session: {}", session_id);
1011
1012            let dispatcher = crate::runtime::http::HttpDispatcher::new(
1013                session_id,
1014                Arc::clone(&sessions_for_factory),
1015                Arc::clone(&pending_for_factory),
1016            );
1017
1018            let mut session_router = (*router_for_factory).clone();
1019            session_router.set_server_request_dispatcher(dispatcher);
1020
1021            let headers_map = headers.map(|header_map| {
1022                header_map
1023                    .iter()
1024                    .filter_map(|(name, value)| {
1025                        value
1026                            .to_str()
1027                            .ok()
1028                            .map(|v| (name.to_string(), v.to_string()))
1029                    })
1030                    .collect()
1031            });
1032
1033            HttpHandlerWithHeaders {
1034                router: session_router,
1035                headers: headers_map,
1036                transport: "http",
1037                tenant_id,
1038            }
1039        };
1040
1041        info!(
1042            server_name = %self.config.name,
1043            server_version = %self.config.version,
1044            bind_addr = %socket_addr,
1045            "HTTP server starting with custom middleware and bidirectional support"
1046        );
1047
1048        // Use run_http_with_middleware function
1049        use crate::runtime::http::run_http_with_middleware;
1050        use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
1051
1052        // Create default config with resolved address
1053        let config = StreamableHttpConfigBuilder::new()
1054            .with_bind_address(socket_addr.to_string())
1055            .with_endpoint_path("/mcp")
1056            .allow_localhost(true)
1057            .build();
1058
1059        run_http_with_middleware(
1060            handler_factory,
1061            sessions,
1062            pending_requests,
1063            config,
1064            Some(middleware_fn),
1065        )
1066        .await
1067        .map_err(|e| {
1068            tracing::error!(error = %e, "HTTP server with middleware failed");
1069            crate::ServerError::handler(e.to_string())
1070        })?;
1071
1072        info!("HTTP server shutdown complete");
1073        Ok(())
1074    }
1075
1076    /// Run server with WebSocket transport (full bidirectional support)
1077    ///
1078    /// This provides a simple API for WebSocket servers with sensible defaults:
1079    /// - Default endpoint: `/mcp/ws`
1080    /// - Full MCP 2025-11-25 compliance
1081    /// - Bidirectional communication
1082    /// - Elicitation support
1083    /// - Session management and middleware
1084    ///
1085    /// For custom configuration, use `run_websocket_with_config()`.
1086    ///
1087    /// # Example
1088    ///
1089    /// ```no_run
1090    /// use turbomcp_server::ServerBuilder;
1091    ///
1092    /// #[tokio::main]
1093    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
1094    ///     let server = ServerBuilder::new()
1095    ///         .name("ws-server")
1096    ///         .version("1.0.0")
1097    ///         .build();
1098    ///
1099    ///     server.run_websocket("127.0.0.1:8080").await?;
1100    ///     Ok(())
1101    /// }
1102    /// ```
1103    #[cfg(feature = "websocket")]
1104    #[tracing::instrument(skip(self), fields(
1105        transport = "websocket",
1106        service_name = %self.config.name,
1107        service_version = %self.config.version,
1108        addr = ?addr
1109    ))]
1110    pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
1111        self,
1112        addr: A,
1113    ) -> ServerResult<()> {
1114        use crate::config::WebSocketServerConfig;
1115
1116        // Build default configuration
1117        let config = WebSocketServerConfig::default();
1118
1119        self.run_websocket_with_config(addr, config).await
1120    }
1121
1122    /// Run server with WebSocket transport and custom configuration
1123    ///
1124    /// This provides full control over WebSocket server configuration including:
1125    /// - Custom endpoint path
1126    /// - MCP server settings (middleware, security, etc.)
1127    ///
1128    /// # Example
1129    ///
1130    /// ```no_run
1131    /// use turbomcp_server::{ServerBuilder, WebSocketServerConfig};
1132    ///
1133    /// #[tokio::main]
1134    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
1135    ///     let server = ServerBuilder::new()
1136    ///         .name("custom-ws-server")
1137    ///         .version("1.0.0")
1138    ///         .build();
1139    ///
1140    ///     let config = WebSocketServerConfig {
1141    ///         bind_addr: "127.0.0.1:8080".to_string(),
1142    ///         endpoint_path: "/custom/ws".to_string(),
1143    ///         max_concurrent_requests: 100,
1144    ///     };
1145    ///
1146    ///     server.run_websocket_with_config("127.0.0.1:8080", config).await?;
1147    ///     Ok(())
1148    /// }
1149    /// ```
1150    #[cfg(feature = "websocket")]
1151    #[tracing::instrument(skip(self, config), fields(
1152        transport = "websocket",
1153        service_name = %self.config.name,
1154        service_version = %self.config.version,
1155        addr = ?addr
1156    ))]
1157    pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
1158        self,
1159        addr: A,
1160        config: crate::config::WebSocketServerConfig,
1161    ) -> ServerResult<()> {
1162        use axum::{Router, middleware, routing::get};
1163        use turbomcp_transport::axum::{WebSocketFactoryState, websocket_handler_with_factory};
1164        use turbomcp_transport::tower::SessionInfo;
1165
1166        // Sprint 2.6: Check for insecure 0.0.0.0 binding
1167        crate::security_checks::check_binding_security(&addr);
1168
1169        info!("Starting MCP server with WebSocket transport");
1170        info!(config = ?config, "WebSocket configuration");
1171
1172        self.lifecycle.start().await;
1173
1174        // Resolve address to string
1175        let socket_addr = addr
1176            .to_socket_addrs()
1177            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
1178            .next()
1179            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
1180
1181        info!("Resolved address: {}", socket_addr);
1182
1183        // Capture server identity for MCP protocol compliance
1184        let server_info = turbomcp_protocol::ServerInfo {
1185            name: self.config.name.clone(),
1186            version: self.config.version.clone(),
1187        };
1188
1189        // Router for this server (shared across all connections)
1190        let router = (*self.router).clone();
1191
1192        // Factory: creates per-connection handler with bidirectional support
1193        // This is the unified architecture - transport layer handles WebSocket mechanics,
1194        // server layer provides MCP-specific handler logic
1195        let handler_factory =
1196            move |transport_dispatcher: turbomcp_transport::axum::WebSocketDispatcher,
1197                  headers: Option<std::collections::HashMap<String, String>>,
1198                  tenant_id: Option<String>| {
1199                // Wrap transport dispatcher with server layer adapter
1200                let server_dispatcher =
1201                    crate::routing::WebSocketDispatcherAdapter::new(transport_dispatcher);
1202
1203                // Clone router for this connection and configure with dispatcher
1204                let mut connection_router = router.clone();
1205                connection_router.set_server_request_dispatcher(server_dispatcher);
1206
1207                // Create wrapper that passes headers and tenant_id to create_context (WebSocket transport)
1208                // We can reuse HttpHandlerWithHeaders since it's generic
1209                Arc::new(HttpHandlerWithHeaders {
1210                    router: connection_router,
1211                    headers,
1212                    transport: "websocket",
1213                    tenant_id,
1214                }) as Arc<dyn turbomcp_protocol::JsonRpcHandler>
1215            };
1216
1217        info!(
1218            server_name = %server_info.name,
1219            server_version = %server_info.version,
1220            bind_addr = %socket_addr,
1221            endpoint_path = %config.endpoint_path,
1222            "WebSocket server starting with full bidirectional support (elicitation, sampling, roots, ping)"
1223        );
1224
1225        // Create factory state for transport layer
1226        let factory_state = WebSocketFactoryState::new(handler_factory);
1227
1228        // Session middleware to extract headers (same as HTTP transport)
1229        let session_middleware = |mut request: axum::extract::Request, next: middleware::Next| async move {
1230            let mut session = SessionInfo::new();
1231
1232            // Extract headers and store in session metadata
1233            for (name, value) in request.headers().iter() {
1234                if let Ok(value_str) = value.to_str() {
1235                    session
1236                        .metadata
1237                        .insert(name.to_string(), value_str.to_string());
1238                }
1239            }
1240
1241            // Extract specific useful headers
1242            if let Some(user_agent) = request
1243                .headers()
1244                .get("user-agent")
1245                .and_then(|v| v.to_str().ok())
1246            {
1247                session.user_agent = Some(user_agent.to_string());
1248            }
1249
1250            if let Some(remote_addr) = request
1251                .headers()
1252                .get("x-forwarded-for")
1253                .and_then(|v| v.to_str().ok())
1254            {
1255                session.remote_addr = Some(remote_addr.to_string());
1256            }
1257
1258            request.extensions_mut().insert(session);
1259            next.run(request).await
1260        };
1261
1262        // Build Axum router using transport layer
1263        let app = Router::new()
1264            .route(&config.endpoint_path, get(websocket_handler_with_factory))
1265            .with_state(factory_state)
1266            .layer(middleware::from_fn(session_middleware));
1267
1268        info!("WebSocket server bound to {}", socket_addr);
1269
1270        // Serve using Axum
1271        let listener = tokio::net::TcpListener::bind(socket_addr)
1272            .await
1273            .map_err(|e| crate::ServerError::configuration(format!("Failed to bind: {}", e)))?;
1274
1275        axum::serve(listener, app).await.map_err(|e| {
1276            tracing::error!(error = %e, "WebSocket server failed");
1277            crate::ServerError::handler(e.to_string())
1278        })?;
1279
1280        info!("WebSocket server shutdown complete");
1281        Ok(())
1282    }
1283
1284    /// Run server with TCP transport (progressive enhancement - runtime configuration)
1285    #[cfg(feature = "tcp")]
1286    #[tracing::instrument(skip(self), fields(
1287        transport = "tcp",
1288        service_name = %self.config.name,
1289        service_version = %self.config.version,
1290        addr = ?addr
1291    ))]
1292    pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
1293        mut self,
1294        addr: A,
1295    ) -> ServerResult<()> {
1296        use turbomcp_transport::TcpTransport;
1297
1298        // Sprint 2.6: Check for insecure 0.0.0.0 binding
1299        crate::security_checks::check_binding_security(&addr);
1300
1301        // Start performance monitoring for TCP server
1302        let _perf_span = info_span!("server.run", transport = "tcp").entered();
1303        info!(?addr, "Starting MCP server with TCP transport");
1304
1305        self.lifecycle.start().await;
1306
1307        // Convert ToSocketAddrs to SocketAddr
1308        let socket_addr = match addr.to_socket_addrs() {
1309            Ok(mut addrs) => match addrs.next() {
1310                Some(addr) => addr,
1311                None => {
1312                    tracing::error!("No socket address resolved from provided address");
1313                    self.lifecycle.shutdown().await;
1314                    return Err(crate::ServerError::configuration("Invalid socket address"));
1315                }
1316            },
1317            Err(e) => {
1318                tracing::error!(error = %e, "Failed to resolve socket address");
1319                self.lifecycle.shutdown().await;
1320                return Err(crate::ServerError::configuration(format!(
1321                    "Address resolution failed: {e}"
1322                )));
1323            }
1324        };
1325
1326        let transport = TcpTransport::new_server(socket_addr);
1327        if let Err(e) = transport.connect().await {
1328            tracing::error!(error = %e, "Failed to connect TCP transport");
1329            self.lifecycle.shutdown().await;
1330            return Err(e.into());
1331        }
1332
1333        // BIDIRECTIONAL TCP SETUP
1334        // Create generic transport dispatcher for server-initiated requests
1335        let dispatcher = crate::runtime::TransportDispatcher::new(transport);
1336
1337        // Configure router's bidirectional support with the TCP dispatcher
1338        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
1339        let router = Arc::make_mut(&mut self.router);
1340        router.set_server_request_dispatcher(dispatcher.clone());
1341
1342        // Run TCP with full bidirectional support (MCP 2025-11-25 compliant)
1343        // This uses the generic bidirectional runtime that handles both:
1344        // - Client→Server requests (tools, resources, prompts)
1345        // - Server→Client requests (sampling, elicitation, roots, ping)
1346        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
1347            .await
1348            .map_err(|e| crate::ServerError::Handler {
1349                message: format!("TCP bidirectional runtime failed: {}", e),
1350                context: Some("run_tcp".to_string()),
1351            })
1352    }
1353
1354    /// Run server with Unix socket transport (progressive enhancement - runtime configuration)
1355    #[cfg(all(feature = "unix", unix))]
1356    #[tracing::instrument(skip(self), fields(
1357        transport = "unix",
1358        service_name = %self.config.name,
1359        service_version = %self.config.version,
1360        path = ?path.as_ref()
1361    ))]
1362    pub async fn run_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> ServerResult<()> {
1363        use std::path::PathBuf;
1364        use turbomcp_transport::UnixTransport;
1365
1366        // Start performance monitoring for Unix server
1367        let _perf_span = info_span!("server.run", transport = "unix").entered();
1368        info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");
1369
1370        self.lifecycle.start().await;
1371
1372        let socket_path = PathBuf::from(path.as_ref());
1373        let transport = UnixTransport::new_server(socket_path);
1374        if let Err(e) = transport.connect().await {
1375            tracing::error!(error = %e, "Failed to connect Unix socket transport");
1376            self.lifecycle.shutdown().await;
1377            return Err(e.into());
1378        }
1379
1380        // BIDIRECTIONAL UNIX SOCKET SETUP
1381        // Create generic transport dispatcher for server-initiated requests
1382        let dispatcher = crate::runtime::TransportDispatcher::new(transport);
1383
1384        // Configure router's bidirectional support with the Unix socket dispatcher
1385        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
1386        let router = Arc::make_mut(&mut self.router);
1387        router.set_server_request_dispatcher(dispatcher.clone());
1388
1389        // Run Unix Socket with full bidirectional support (MCP 2025-11-25 compliant)
1390        // This uses the generic bidirectional runtime that handles both:
1391        // - Client→Server requests (tools, resources, prompts)
1392        // - Server→Client requests (sampling, elicitation, roots, ping)
1393        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
1394            .await
1395            .map_err(|e| crate::ServerError::Handler {
1396                message: format!("Unix socket bidirectional runtime failed: {}", e),
1397                context: Some("run_unix".to_string()),
1398            })
1399    }
1400
1401    /// Generic transport runner (DRY principle)
1402    /// Used by feature-gated transport methods (http, tcp, websocket, unix)
1403    #[allow(dead_code)]
1404    #[tracing::instrument(skip(self, transport), fields(
1405        service_name = %self.config.name,
1406        service_version = %self.config.version
1407    ))]
1408    async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
1409        // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
1410        let lifecycle_for_sigint = self.lifecycle.clone();
1411        tokio::spawn(async move {
1412            if let Err(e) = tokio::signal::ctrl_c().await {
1413                tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
1414                return;
1415            }
1416            tracing::info!("Ctrl+C received, initiating shutdown");
1417            lifecycle_for_sigint.shutdown().await;
1418        });
1419
1420        #[cfg(unix)]
1421        {
1422            let lifecycle_for_sigterm = self.lifecycle.clone();
1423            tokio::spawn(async move {
1424                use tokio::signal::unix::{SignalKind, signal};
1425                match signal(SignalKind::terminate()) {
1426                    Ok(mut sigterm) => {
1427                        sigterm.recv().await;
1428                        tracing::info!("SIGTERM received, initiating shutdown");
1429                        lifecycle_for_sigterm.shutdown().await;
1430                    }
1431                    Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
1432                }
1433            });
1434        }
1435
1436        // Shutdown signal
1437        let mut shutdown = self.lifecycle.shutdown_signal();
1438
1439        // Main message processing loop
1440        loop {
1441            tokio::select! {
1442                _ = shutdown.recv() => {
1443                    tracing::info!("Shutdown signal received");
1444                    break;
1445                }
1446                res = transport.receive() => {
1447                    match res {
1448                        Ok(Some(message)) => {
1449                            if let Err(e) = self.handle_transport_message(&mut transport, message).await {
1450                                tracing::warn!(error = %e, "Failed to handle transport message");
1451                            }
1452                        }
1453                        Ok(None) => {
1454                            // No message available; sleep briefly to avoid busy loop
1455                            sleep(Duration::from_millis(5)).await;
1456                        }
1457                        Err(e) => {
1458                            match e {
1459                                TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
1460                                    tracing::info!("Transport receive channel disconnected; shutting down");
1461                                    break;
1462                                }
1463                                _ => {
1464                                    tracing::error!(error = %e, "Transport receive failed");
1465                                    // Backoff on errors
1466                                    sleep(Duration::from_millis(50)).await;
1467                                }
1468                            }
1469                        }
1470                    }
1471                }
1472            }
1473        }
1474
1475        // Disconnect transport
1476        if let Err(e) = transport.disconnect().await {
1477            tracing::warn!(error = %e, "Error while disconnecting transport");
1478        }
1479
1480        tracing::info!("Server shutdown complete");
1481        Ok(())
1482    }
1483}
1484
1485// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1486// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1487// This is intentional and follows the Axum/Tower design pattern
1488#[allow(dead_code)]
1489const _: () = {
1490    const fn assert_send_clone<T: Send + Clone>() {}
1491    const fn check() {
1492        assert_send_clone::<crate::server::core::McpServer>();
1493    }
1494};