turbomcp_server/server/
core.rs

1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9#[cfg(feature = "http")]
10use tracing::warn;
11
12use crate::{
13    config::ServerConfig,
14    error::ServerResult,
15    lifecycle::{HealthStatus, ServerLifecycle},
16    metrics::ServerMetrics,
17    registry::HandlerRegistry,
18    routing::RequestRouter,
19    service::McpService,
20};
21
22#[cfg(feature = "middleware")]
23use crate::middleware::MiddlewareStack;
24
25use bytes::Bytes;
26use http::{Request, Response};
27use tokio::time::{Duration, sleep};
28use turbomcp_transport::Transport;
29use turbomcp_transport::core::TransportError;
30
31use super::shutdown::ShutdownHandle;
32
/// Report whether logging is enabled while serving over the STDIO transport.
///
/// For MCP STDIO transport compliance, logging is disabled by default since stdout
/// must be reserved exclusively for JSON-RPC messages. Setting the
/// `TURBOMCP_FORCE_LOGGING` environment variable overrides that default; only the
/// variable's presence matters, so any value (including an empty or non-UTF-8 one)
/// enables logging.
///
/// Returns `true` when the variable is present in the process environment.
pub(crate) fn should_log_for_stdio() -> bool {
    // `var_os` tests for presence only. `var(..).is_ok()` would additionally
    // report "unset" whenever the value is not valid Unicode.
    std::env::var_os("TURBOMCP_FORCE_LOGGING").is_some()
}
41
/// Bundles a router with the request headers and a transport label so the trio can
/// act as a `JsonRpcHandler`.
///
/// Keeping the headers here lets them be passed to `create_context` per request
/// instead of being stored on the router itself.
/// Used by both HTTP and WebSocket transports.
#[cfg(any(feature = "http", feature = "websocket"))]
struct HttpHandlerWithHeaders {
    /// Router that creates the request context and routes each request.
    router: RequestRouter,
    /// Headers forwarded to `create_context`; `None` when none were supplied.
    headers: Option<std::collections::HashMap<String, String>>,
    /// Transport label forwarded to `create_context`.
    transport: &'static str,
}
51
#[cfg(any(feature = "http", feature = "websocket"))]
#[async_trait::async_trait]
impl turbomcp_protocol::JsonRpcHandler for HttpHandlerWithHeaders {
    /// Handle one JSON-RPC request given as an already-parsed JSON value.
    ///
    /// The value is deserialized into a `JsonRpcRequest`, routed through the wrapped
    /// router with a context built from the stored headers and transport label, and
    /// the router's response is serialized back to JSON.
    ///
    /// Failures are reported as JSON-RPC error objects rather than Rust errors:
    /// - `-32600` (Invalid Request) when the value is not a valid request object.
    ///   The input is already well-formed JSON at this point, so this is not a
    ///   `-32700` parse error. The request `id` is echoed when it is a string or a
    ///   number, otherwise `null` is used.
    /// - `-32603` (Internal error) when the response cannot be serialized.
    async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
        use turbomcp_protocol::jsonrpc::JsonRpcRequest;

        // Capture the caller's id before `req_value` is consumed so an error reply
        // can still be correlated with the request. Anything that is not a string
        // or number is treated as undetectable and reported as null.
        let echoed_id = match req_value.get("id") {
            Some(id) if id.is_string() || id.is_number() => id.clone(),
            _ => serde_json::Value::Null,
        };

        // Convert the JSON value into a typed request
        let req: JsonRpcRequest = match serde_json::from_value(req_value) {
            Ok(r) => r,
            Err(e) => {
                return serde_json::json!({
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32600,
                        "message": format!("Invalid Request: {}", e)
                    },
                    "id": echoed_id
                });
            }
        };

        // Create context with headers and transport type
        let ctx = self
            .router
            .create_context(self.headers.clone(), Some(self.transport));

        // Route the request
        let response = self.router.route(req, ctx).await;

        // Serialize response, falling back to an internal-error object that still
        // carries the response id
        match serde_json::to_value(&response) {
            Ok(v) => v,
            Err(e) => {
                serde_json::json!({
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32603,
                        "message": format!("Internal error: failed to serialize response: {}", e)
                    },
                    "id": response.id
                })
            }
        }
    }

    /// Return the server identity reported by the wrapped router.
    fn server_info(&self) -> turbomcp_protocol::ServerInfo {
        self.router.server_info()
    }
}
101
102/// Main MCP server following the Axum/Tower Clone pattern
103///
104/// ## Sharing Pattern
105///
106/// `McpServer` implements `Clone` like Axum's `Router`. All heavy state is Arc-wrapped
107/// internally, making cloning cheap (just atomic reference count increments).
108///
109/// ```rust,no_run
110/// use turbomcp_server::ServerBuilder;
111///
112/// # async fn example() {
113/// let server = ServerBuilder::new().build();
114///
115/// // Clone for passing to functions (cheap - just Arc increments)
116/// let server1 = server.clone();
117/// let server2 = server.clone();
118///
119/// // Access config and health
120/// let config = server1.config();
121/// println!("Server: {}", config.name);
122///
123/// let health = server2.health().await;
124/// println!("Health: {:?}", health);
125/// # }
126/// ```
127///
128/// ## Architecture Notes
129///
130/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
131/// This is intentional and follows Tower's design - users clone the server instead of
132/// Arc-wrapping it.
133///
134/// **Architecture Note**: The service field provides tower::Service integration for
135/// advanced middleware patterns. The request processing pipeline currently uses the
136/// RequestRouter directly. Tower integration can be added via custom middleware layers
137/// when needed for specific use cases (e.g., custom rate limiting, advanced tracing).
138#[derive(Clone)]
139pub struct McpServer {
140    /// Server configuration (Clone-able)
141    pub(crate) config: ServerConfig,
142    /// Handler registry (Arc-wrapped for cheap cloning)
143    pub(crate) registry: Arc<HandlerRegistry>,
144    /// Request router (Arc-wrapped for cheap cloning)
145    pub(crate) router: Arc<RequestRouter>,
146    /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
147    ///
148    /// All requests flow through this service stack, which provides:
149    /// - Timeout enforcement
150    /// - Request validation
151    /// - Authorization checks
152    /// - Rate limiting
153    /// - Audit logging
154    /// - And more middleware layers as configured
155    ///
156    /// See `server/transport.rs` for integration with transport layer.
157    pub(crate) service:
158        tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
159    /// Server lifecycle (Arc-wrapped for cheap cloning)
160    pub(crate) lifecycle: Arc<ServerLifecycle>,
161    /// Server metrics (Arc-wrapped for cheap cloning)
162    pub(crate) metrics: Arc<ServerMetrics>,
163}
164
165impl std::fmt::Debug for McpServer {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        f.debug_struct("McpServer")
168            .field("config", &self.config)
169            .finish()
170    }
171}
172
173impl McpServer {
174    /// Build comprehensive Tower middleware stack (transport-agnostic)
175    ///
176    /// ## Architecture
177    ///
178    /// This creates a complete Tower service stack with conditional middleware layers:
179    /// - **Timeout Layer**: Request timeout enforcement (tower_http)
180    /// - **Validation Layer**: JSON-RPC structure validation
181    /// - **Authorization Layer**: Resource access control
182    /// - **Core Service**: JSON-RPC routing and handler execution
183    ///
184    /// All middleware is composed using Tower's ServiceBuilder pattern, which provides:
185    /// - Top-to-bottom execution order
186    /// - Type-safe layer composition
187    /// - Zero-cost abstractions
188    /// - Clone-able service instances
189    ///
190    /// ## Integration
191    ///
192    /// The resulting BoxCloneService is stored in `self.service` and called from
193    /// `server/transport.rs` for every incoming request. This ensures ALL requests
194    /// flow through the complete middleware pipeline before reaching handlers.
195    ///
196    /// ## Adding Middleware
197    ///
198    /// To add new middleware, update the match arms below to include your layer.
199    /// Follow the pattern of conditional inclusion based on config flags.
200    #[cfg(feature = "middleware")]
201    fn build_middleware_stack(
202        core_service: McpService,
203        stack: MiddlewareStack,
204    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
205        // COMPREHENSIVE TOWER COMPOSITION - Conditional Layer Stacking
206        //
207        // This approach builds the middleware stack incrementally, boxing at each step.
208        // While this has a small performance cost from multiple boxing operations,
209        // it provides several critical advantages:
210        //
211        // 1. **Maintainability**: No combinatorial explosion (8 match arms → simple chain)
212        // 2. **Extensibility**: Adding new middleware requires only one new block
213        // 3. **Clarity**: Each layer's purpose and configuration is explicit
214        // 4. **Type Safety**: BoxCloneService provides type erasure while preserving Clone
215        //
216        // Performance note: The boxing overhead is negligible compared to network I/O
217        // and handler execution time. Modern allocators make this essentially free.
218
219        // Start with core service as a boxed service for uniform type handling
220        let mut service: tower::util::BoxCloneService<
221            Request<Bytes>,
222            Response<Bytes>,
223            crate::ServerError,
224        > = tower::util::BoxCloneService::new(core_service);
225
226        // Authorization layer removed in 2.0.0 - handle at application layer
227
228        // Layer 2: Validation
229        // Validates request structure after auth but before processing
230        #[cfg(feature = "middleware")]
231        {
232            if let Some(validation_layer) = stack.validation_layer() {
233                service = tower::util::BoxCloneService::new(
234                    tower::ServiceBuilder::new()
235                        .layer(validation_layer)
236                        .service(service),
237                );
238            }
239        }
240
241        // Layer 3: Timeout (outermost)
242        // Applied last so it can enforce timeout on the entire request pipeline
243        #[cfg(feature = "middleware")]
244        {
245            if let Some(timeout_config) = stack.timeout_config
246                && timeout_config.enabled
247            {
248                service = tower::util::BoxCloneService::new(
249                    tower::ServiceBuilder::new()
250                        .layer(tower_http::timeout::TimeoutLayer::new(
251                            timeout_config.request_timeout,
252                        ))
253                        .service(service),
254                );
255            }
256        }
257
258        // Future middleware can be added here with similar if-let blocks:
259        // if let Some(auth_config) = stack.auth_config { ... }
260        // if let Some(audit_config) = stack.audit_config { ... }
261        // if let Some(rate_limit_config) = stack.rate_limit_config { ... }
262
263        service
264    }
265
266    /// Create a new server
267    #[must_use]
268    pub fn new(config: ServerConfig) -> Self {
269        Self::new_with_registry(config, HandlerRegistry::new())
270    }
271
272    /// Create a new server with an existing registry (used by ServerBuilder)
273    #[must_use]
274    pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
275        let registry = Arc::new(registry);
276        let metrics = Arc::new(ServerMetrics::new());
277        let router = Arc::new(RequestRouter::new(
278            Arc::clone(&registry),
279            Arc::clone(&metrics),
280            config.clone(),
281        ));
282        // Build middleware stack configuration
283        #[cfg(feature = "middleware")]
284        #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
285        let mut stack = crate::middleware::MiddlewareStack::new();
286
287        // Auto-install rate limiting if enabled in config
288        #[cfg(feature = "rate-limiting")]
289        if config.rate_limiting.enabled {
290            use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
291            use std::num::NonZeroU32;
292            use std::time::Duration;
293
294            let rate_config = crate::middleware::RateLimitConfig {
295                strategy: RateLimitStrategy::Global,
296                limits: RateLimits {
297                    requests_per_period: NonZeroU32::new(
298                        config.rate_limiting.requests_per_second * 60,
299                    )
300                    .unwrap(), // Convert per-second to per-minute
301                    period: Duration::from_secs(60),
302                    burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
303                },
304                enabled: true,
305            };
306
307            stack = stack.with_rate_limit(rate_config);
308        }
309
310        // Create core MCP service
311        let core_service = McpService::new(
312            Arc::clone(&registry),
313            Arc::clone(&router),
314            Arc::clone(&metrics),
315        );
316
317        // COMPREHENSIVE TOWER SERVICE COMPOSITION
318        // Build the complete middleware stack with proper type erasure
319        //
320        // This service is called from server/transport.rs for EVERY incoming request:
321        // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
322        //
323        // The Tower middleware stack provides:
324        // ✓ Timeout enforcement (configurable per-request)
325        // ✓ Request validation (JSON-RPC structure)
326        // ✓ Authorization checks (resource access control)
327        // ✓ Rate limiting (if enabled in config)
328        // ✓ Audit logging (configurable)
329        // ✓ And more layers as configured
330        //
331        // BoxCloneService is Clone but !Sync - this is the Tower pattern
332        #[cfg(feature = "middleware")]
333        let service = Self::build_middleware_stack(core_service, stack);
334
335        #[cfg(not(feature = "middleware"))]
336        let service = tower::util::BoxCloneService::new(core_service);
337
338        let lifecycle = Arc::new(ServerLifecycle::new());
339
340        Self {
341            config,
342            registry,
343            router,
344            service,
345            lifecycle,
346            metrics,
347        }
348    }
349
350    /// Get server configuration
351    #[must_use]
352    pub const fn config(&self) -> &ServerConfig {
353        &self.config
354    }
355
356    /// Get handler registry
357    #[must_use]
358    pub const fn registry(&self) -> &Arc<HandlerRegistry> {
359        &self.registry
360    }
361
362    /// Get request router
363    #[must_use]
364    pub const fn router(&self) -> &Arc<RequestRouter> {
365        &self.router
366    }
367
368    /// Get server lifecycle
369    #[must_use]
370    pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
371        &self.lifecycle
372    }
373
374    /// Get server metrics
375    #[must_use]
376    pub const fn metrics(&self) -> &Arc<ServerMetrics> {
377        &self.metrics
378    }
379
380    /// Get the Tower service stack (test accessor)
381    ///
382    /// **Note**: This is primarily for integration testing. Production code should
383    /// use the transport layer which calls the service internally via
384    /// `handle_transport_message()`.
385    ///
386    /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
387    /// is designed for cloning).
388    #[doc(hidden)]
389    pub fn service(
390        &self,
391    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
392        self.service.clone()
393    }
394
395    /// Get a shutdown handle for graceful server termination
396    ///
397    /// This handle enables external control over server shutdown, essential for:
398    /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
399    /// - **Container orchestration**: Kubernetes graceful pod termination
400    /// - **Load balancer integration**: Health check coordination
401    /// - **Multi-component systems**: Coordinated shutdown sequences
402    /// - **Maintenance operations**: Planned downtime and updates
403    ///
404    /// # Examples
405    ///
406    /// ## Basic shutdown coordination
407    /// ```no_run
408    /// # use turbomcp_server::ServerBuilder;
409    /// # #[tokio::main]
410    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
411    /// let server = ServerBuilder::new().build();
412    /// let shutdown_handle = server.shutdown_handle();
413    ///
414    /// // Coordinate with other services
415    /// tokio::spawn(async move {
416    ///     // Wait for external shutdown signal
417    ///     tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
418    ///     println!("Shutdown signal received, terminating gracefully...");
419    ///     shutdown_handle.shutdown().await;
420    /// });
421    ///
422    /// // Server will gracefully shut down when signaled
423    /// // server.run_stdio().await?;
424    /// # Ok(())
425    /// # }
426    /// ```
427    ///
428    /// ## Container/Kubernetes deployment
429    /// ```no_run
430    /// # use turbomcp_server::ServerBuilder;
431    /// # use std::sync::Arc;
432    /// # #[tokio::main]
433    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
434    /// let server = ServerBuilder::new().build();
435    /// let shutdown_handle = server.shutdown_handle();
436    /// let shutdown_handle_clone = shutdown_handle.clone();
437    ///
438    /// // Handle multiple signal types with proper platform support
439    /// tokio::spawn(async move {
440    ///     #[cfg(unix)]
441    ///     {
442    ///         use tokio::signal::unix::{signal, SignalKind};
443    ///         let mut sigterm = signal(SignalKind::terminate()).unwrap();
444    ///         tokio::select! {
445    ///             _ = tokio::signal::ctrl_c() => {
446    ///                 println!("SIGINT received");
447    ///             }
448    ///             _ = sigterm.recv() => {
449    ///                 println!("SIGTERM received");
450    ///             }
451    ///         }
452    ///     }
453    ///     #[cfg(not(unix))]
454    ///     {
455    ///         tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
456    ///         println!("SIGINT received");
457    ///     }
458    ///     shutdown_handle_clone.shutdown().await;
459    /// });
460    ///
461    /// // Server handles graceful shutdown automatically
462    /// // server.run_tcp("0.0.0.0:8080").await?;
463    /// # Ok(())
464    /// # }
465    /// ```
466    pub fn shutdown_handle(&self) -> ShutdownHandle {
467        ShutdownHandle::new(self.lifecycle.clone())
468    }
469
470    /// Run the server with STDIO transport
471    ///
472    /// # Errors
473    ///
474    /// Returns [`crate::ServerError::Transport`] if:
475    /// - STDIO transport connection fails
476    /// - Message sending/receiving fails
477    /// - Transport disconnection fails
478    #[tracing::instrument(skip(self), fields(
479        transport = "stdio",
480        service_name = %self.config.name,
481        service_version = %self.config.version
482    ))]
483    pub async fn run_stdio(mut self) -> ServerResult<()> {
484        // For STDIO transport, disable logging unless explicitly overridden
485        // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
486        if should_log_for_stdio() {
487            info!("Starting MCP server with STDIO transport");
488        }
489
490        // Start performance monitoring for STDIO server
491        let _perf_span = info_span!("server.run", transport = "stdio").entered();
492        info!("Initializing STDIO transport for MCP server");
493
494        self.lifecycle.start().await;
495
496        // BIDIRECTIONAL STDIO SETUP
497        // Create STDIO dispatcher for server-initiated requests (sampling, elicitation, roots, ping)
498        let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
499
500        // Use fully-qualified path to avoid ambiguity with the turbomcp crate's runtime module
501        let dispatcher = crate::runtime::StdioDispatcher::new(request_tx);
502
503        // Configure router's bidirectional support with the STDIO dispatcher
504        // SAFETY: We have &mut self, so we can safely get mutable access to the Arc'd router
505        // This is the CRITICAL STEP that was missing - without this, all server→client requests fail
506        let router = Arc::make_mut(&mut self.router);
507        router.set_server_request_dispatcher(dispatcher.clone());
508
509        // Run STDIO with full bidirectional support (MCP 2025-06-18 compliant)
510        // This uses the bidirectional-aware runtime that handles both:
511        // - Client→Server requests (tools, resources, prompts)
512        // - Server→Client requests (sampling, elicitation, roots, ping)
513        crate::runtime::run_stdio_bidirectional(self.router.clone(), dispatcher, request_rx)
514            .await
515            .map_err(|e| crate::ServerError::Handler {
516                message: format!("STDIO bidirectional runtime failed: {}", e),
517                context: Some("run_stdio".to_string()),
518            })
519    }
520
521    /// Get health status
522    pub async fn health(&self) -> HealthStatus {
523        self.lifecycle.health().await
524    }
525
526    /// Run server with HTTP transport using default configuration
527    ///
528    /// This provides a working HTTP server with:
529    /// - Standard HTTP POST/GET/DELETE for MCP protocol at `/mcp`
530    /// - Full MCP 2025-06-18 protocol compliance
531    /// - Graceful shutdown support
532    /// - Default rate limiting (100 req/60s)
533    /// - Default security settings (localhost allowed, CORS disabled)
534    ///
535    /// For custom configuration (rate limits, security, CORS), use `run_http_with_config`.
536    ///
537    /// # Examples
538    ///
539    /// ## Basic usage with default configuration
540    /// ```no_run
541    /// use turbomcp_server::ServerBuilder;
542    ///
543    /// #[tokio::main]
544    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
545    ///     let server = ServerBuilder::new()
546    ///         .name("my-server")
547    ///         .version("1.0.0")
548    ///         .build();
549    ///
550    ///     server.run_http("127.0.0.1:3000").await?;
551    ///     Ok(())
552    /// }
553    /// ```
554    ///
555    /// ## With custom configuration
556    /// ```no_run
557    /// use turbomcp_server::ServerBuilder;
558    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
559    /// use std::time::Duration;
560    ///
561    /// #[tokio::main]
562    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
563    ///     let server = ServerBuilder::new()
564    ///         .name("my-server")
565    ///         .version("1.0.0")
566    ///         .build();
567    ///
568    ///     let config = StreamableHttpConfigBuilder::new()
569    ///         .with_bind_address("127.0.0.1:3000")
570    ///         .allow_any_origin(true)  // Enable CORS for development
571    ///         .build();
572    ///
573    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
574    ///     Ok(())
575    /// }
576    /// ```
577    ///
578    /// # Errors
579    ///
580    /// Returns [`crate::ServerError::Transport`] if:
581    /// - Address resolution fails
582    /// - HTTP server fails to start
583    /// - Transport disconnection fails
584    #[cfg(feature = "http")]
585    #[tracing::instrument(skip(self), fields(
586        transport = "http",
587        service_name = %self.config.name,
588        service_version = %self.config.version,
589        addr = ?addr
590    ))]
591    pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
592        self,
593        addr: A,
594    ) -> ServerResult<()> {
595        use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
596
597        // Build default configuration
598        let config = StreamableHttpConfigBuilder::new().build();
599
600        self.run_http_with_config(addr, config).await
601    }
602
603    /// Run server with HTTP transport and custom configuration
604    ///
605    /// This provides full control over HTTP server configuration including:
606    /// - Rate limiting (requests per time window, or disabled entirely)
607    /// - Security settings (CORS, origin validation, authentication)
608    /// - Network settings (bind address, endpoint path, keep-alive)
609    /// - Advanced settings (replay buffer size, etc.)
610    ///
611    /// # Bind Address Configuration
612    ///
613    /// **IMPORTANT**: The `addr` parameter takes precedence over `config.bind_addr`.
614    /// If they differ, a deprecation warning is logged.
615    ///
616    /// **Best Practice** (recommended for forward compatibility):
617    /// ```no_run
618    /// # use turbomcp_server::ServerBuilder;
619    /// # use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
620    /// # #[tokio::main]
621    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
622    /// let config = StreamableHttpConfigBuilder::new()
623    ///     .with_bind_address("127.0.0.1:3001")  // Set bind address in config
624    ///     .build();
625    ///
626    /// // Pass matching addr parameter (or use default "127.0.0.1:8080")
627    /// ServerBuilder::new()
628    ///     .build()
629    ///     .run_http_with_config("127.0.0.1:3001", config).await?;
630    /// # Ok(())
631    /// # }
632    /// ```
633    ///
634    /// **Deprecated** (will be removed in v3.x):
635    /// ```no_run
636    /// # use turbomcp_server::ServerBuilder;
637    /// # use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
638    /// # #[tokio::main]
639    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
640    /// // ⚠️ Avoid setting different addresses - causes deprecation warning
641    /// let config = StreamableHttpConfigBuilder::new()
642    ///     .with_bind_address("0.0.0.0:5000")  // This is ignored!
643    ///     .build();
644    ///
645    /// // The addr parameter wins (3001 is used, not 5000)
646    /// ServerBuilder::new()
647    ///     .build()
648    ///     .run_http_with_config("127.0.0.1:3001", config).await?;
649    /// # Ok(())
650    /// # }
651    /// ```
652    ///
653    /// **Future (v3.x)**: The `addr` parameter will be removed. Configure bind address
654    /// via `config.with_bind_address()` only.
655    ///
656    /// # Examples
657    ///
658    /// ## Custom configuration example
659    /// ```no_run
660    /// use turbomcp_server::ServerBuilder;
661    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
662    /// use std::time::Duration;
663    ///
664    /// #[tokio::main]
665    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
666    ///     let server = ServerBuilder::new()
667    ///         .name("custom-server")
668    ///         .version("1.0.0")
669    ///         .build();
670    ///
671    ///     let config = StreamableHttpConfigBuilder::new()
672    ///         .with_bind_address("127.0.0.1:3000")
673    ///         .with_rate_limit(1000, Duration::from_secs(60))  // 1000 req/min
674    ///         .build();
675    ///
676    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
677    ///     Ok(())
678    /// }
679    /// ```
680    ///
681    /// ## Production configuration (secure, rate limited)
682    /// ```no_run
683    /// use turbomcp_server::ServerBuilder;
684    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
685    /// use std::time::Duration;
686    ///
687    /// #[tokio::main]
688    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
689    ///     let server = ServerBuilder::new()
690    ///         .name("production-server")
691    ///         .version("1.0.0")
692    ///         .build();
693    ///
694    ///     let config = StreamableHttpConfigBuilder::new()
695    ///         .with_bind_address("127.0.0.1:3000")
696    ///         .with_rate_limit(1000, Duration::from_secs(60))  // 1000 req/min
697    ///         .allow_any_origin(false)  // Strict CORS
698    ///         .require_authentication(true)  // Require auth
699    ///         .build();
700    ///
701    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
702    ///     Ok(())
703    /// }
704    /// ```
705    ///
706    /// # Errors
707    ///
708    /// Returns [`crate::ServerError::Transport`] if:
709    /// - Address resolution fails
710    /// - HTTP server fails to start
711    /// - Transport disconnection fails
712    #[cfg(feature = "http")]
713    #[tracing::instrument(skip(self, config), fields(
714        transport = "http",
715        service_name = %self.config.name,
716        service_version = %self.config.version,
717        addr = ?addr
718    ))]
719    pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
720        self,
721        addr: A,
722        mut config: turbomcp_transport::streamable_http_v2::StreamableHttpConfig,
723    ) -> ServerResult<()> {
724        use std::collections::HashMap;
725        use tokio::sync::{Mutex, RwLock};
726
727        // Sprint 2.6: Check for insecure 0.0.0.0 binding
728        crate::security_checks::check_binding_security(&addr);
729
730        info!("Starting MCP server with HTTP transport");
731
732        self.lifecycle.start().await;
733
734        // Resolve address to string
735        let socket_addr = addr
736            .to_socket_addrs()
737            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
738            .next()
739            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
740
741        info!("Resolved address: {}", socket_addr);
742
743        // Check for conflicting bind addresses and warn about deprecation
744        let socket_addr_str = socket_addr.to_string();
745        if config.bind_addr != socket_addr_str {
746            warn!(
747                addr_parameter = %socket_addr_str,
748                config_bind_addr = %config.bind_addr,
749                "⚠️  DEPRECATION WARNING: The `addr` parameter takes precedence over `config.bind_addr`"
750            );
751            warn!(
752                "⚠️  In TurboMCP v3.x, the `addr` parameter will be removed. Please use StreamableHttpConfigBuilder::new().with_bind_address(\"{}\").build() instead",
753                socket_addr_str
754            );
755            warn!(
756                "⚠️  Avoid setting both `addr` parameter and `config.bind_addr` to prevent confusion"
757            );
758
759            // Update config to single source of truth
760            config.bind_addr = socket_addr_str.clone();
761            config.base_url = format!("http://{}", socket_addr_str);
762        }
763
764        info!(
765            config = ?config,
766            "HTTP configuration (updated with resolved address)"
767        );
768
769        // BIDIRECTIONAL HTTP SETUP
770        // Create shared state for session management and bidirectional MCP
771        let sessions = Arc::new(RwLock::new(HashMap::new()));
772        let pending_requests = Arc::new(Mutex::new(HashMap::new()));
773
774        // Share router across all sessions (routing logic and handler registry)
775        let router = self.router.clone();
776
777        // Capture server identity for MCP protocol compliance
778        let server_info = turbomcp_protocol::ServerInfo {
779            name: self.config.name.clone(),
780            version: self.config.version.clone(),
781        };
782
783        // Factory pattern: create session-specific router for each HTTP request
784        // This is the clean architecture that HTTP requires - each session gets its own
785        // bidirectional dispatcher while sharing the routing logic
786        let sessions_for_factory = Arc::clone(&sessions);
787        let pending_for_factory = Arc::clone(&pending_requests);
788        let router_for_factory = Arc::clone(&router);
789
790        // Create a wrapper that converts headers and delegates to router
791        // This is cleaner than storing headers on the router itself
792        let handler_factory =
793            move |session_id: Option<String>, headers: Option<axum::http::HeaderMap>| {
794                let session_id = session_id.unwrap_or_else(|| {
795                    let new_id = uuid::Uuid::new_v4().to_string();
796                    tracing::debug!(
797                        "HTTP POST without session ID - generating ephemeral ID for request: {}",
798                        new_id
799                    );
800                    new_id
801                });
802
803                tracing::debug!("Factory creating handler for session: {}", session_id);
804
805                // Create session-specific HTTP dispatcher (now local to turbomcp-server!)
806                let dispatcher = crate::runtime::http::HttpDispatcher::new(
807                    session_id,
808                    Arc::clone(&sessions_for_factory),
809                    Arc::clone(&pending_for_factory),
810                );
811
812                // Clone the base router and configure with session-specific dispatcher
813                // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
814                let mut session_router = (*router_for_factory).clone();
815                session_router.set_server_request_dispatcher(dispatcher);
816
817                // Convert HeaderMap to HashMap<String, String> for passing to create_context
818                let headers_map = headers.map(|header_map| {
819                    header_map
820                        .iter()
821                        .filter_map(|(name, value)| {
822                            value
823                                .to_str()
824                                .ok()
825                                .map(|v| (name.to_string(), v.to_string()))
826                        })
827                        .collect()
828                });
829
830                // Create wrapper that passes headers to create_context (HTTP transport)
831                HttpHandlerWithHeaders {
832                    router: session_router,
833                    headers: headers_map,
834                    transport: "http",
835                }
836            };
837
838        info!(
839            server_name = %server_info.name,
840            server_version = %server_info.version,
841            bind_addr = %socket_addr,
842            endpoint_path = %config.endpoint_path,
843            "HTTP server starting with full bidirectional support (elicitation, sampling, roots, ping)"
844        );
845
846        // Use factory-based HTTP server with full bidirectional support
847        use crate::runtime::http::run_http;
848        run_http(
849            handler_factory,
850            sessions,
851            pending_requests,
852            socket_addr.to_string(),
853            config.endpoint_path.clone(),
854        )
855        .await
856        .map_err(|e| {
857            tracing::error!(error = %e, "HTTP server failed");
858            crate::ServerError::handler(e.to_string())
859        })?;
860
861        info!("HTTP server shutdown complete");
862        Ok(())
863    }
864
865    /// Run server with WebSocket transport (full bidirectional support)
866    ///
867    /// This provides a simple API for WebSocket servers with sensible defaults:
868    /// - Default endpoint: `/mcp/ws`
869    /// - Full MCP 2025-06-18 compliance
870    /// - Bidirectional communication
871    /// - Elicitation support
872    /// - Session management and middleware
873    ///
874    /// For custom configuration, use `run_websocket_with_config()`.
875    ///
876    /// # Example
877    ///
878    /// ```no_run
879    /// use turbomcp_server::ServerBuilder;
880    ///
881    /// #[tokio::main]
882    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
883    ///     let server = ServerBuilder::new()
884    ///         .name("ws-server")
885    ///         .version("1.0.0")
886    ///         .build();
887    ///
888    ///     server.run_websocket("127.0.0.1:8080").await?;
889    ///     Ok(())
890    /// }
891    /// ```
892    #[cfg(feature = "websocket")]
893    #[tracing::instrument(skip(self), fields(
894        transport = "websocket",
895        service_name = %self.config.name,
896        service_version = %self.config.version,
897        addr = ?addr
898    ))]
899    pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
900        self,
901        addr: A,
902    ) -> ServerResult<()> {
903        use crate::config::WebSocketServerConfig;
904
905        // Build default configuration
906        let config = WebSocketServerConfig::default();
907
908        self.run_websocket_with_config(addr, config).await
909    }
910
911    /// Run server with WebSocket transport and custom configuration
912    ///
913    /// This provides full control over WebSocket server configuration including:
914    /// - Custom endpoint path
915    /// - MCP server settings (middleware, security, etc.)
916    ///
917    /// # Example
918    ///
919    /// ```no_run
920    /// use turbomcp_server::{ServerBuilder, WebSocketServerConfig};
921    ///
922    /// #[tokio::main]
923    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
924    ///     let server = ServerBuilder::new()
925    ///         .name("custom-ws-server")
926    ///         .version("1.0.0")
927    ///         .build();
928    ///
929    ///     let config = WebSocketServerConfig {
930    ///         bind_addr: "127.0.0.1:8080".to_string(),
931    ///         endpoint_path: "/custom/ws".to_string(),
932    ///         max_concurrent_requests: 100,
933    ///     };
934    ///
935    ///     server.run_websocket_with_config("127.0.0.1:8080", config).await?;
936    ///     Ok(())
937    /// }
938    /// ```
939    #[cfg(feature = "websocket")]
940    #[tracing::instrument(skip(self, config), fields(
941        transport = "websocket",
942        service_name = %self.config.name,
943        service_version = %self.config.version,
944        addr = ?addr
945    ))]
946    pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
947        self,
948        addr: A,
949        config: crate::config::WebSocketServerConfig,
950    ) -> ServerResult<()> {
951        use axum::{Router, middleware, routing::get};
952        use turbomcp_transport::axum::{WebSocketFactoryState, websocket_handler_with_factory};
953        use turbomcp_transport::tower::SessionInfo;
954
955        // Sprint 2.6: Check for insecure 0.0.0.0 binding
956        crate::security_checks::check_binding_security(&addr);
957
958        info!("Starting MCP server with WebSocket transport");
959        info!(config = ?config, "WebSocket configuration");
960
961        self.lifecycle.start().await;
962
963        // Resolve address to string
964        let socket_addr = addr
965            .to_socket_addrs()
966            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
967            .next()
968            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
969
970        info!("Resolved address: {}", socket_addr);
971
972        // Capture server identity for MCP protocol compliance
973        let server_info = turbomcp_protocol::ServerInfo {
974            name: self.config.name.clone(),
975            version: self.config.version.clone(),
976        };
977
978        // Router for this server (shared across all connections)
979        let router = (*self.router).clone();
980
981        // Factory: creates per-connection handler with bidirectional support
982        // This is the unified architecture - transport layer handles WebSocket mechanics,
983        // server layer provides MCP-specific handler logic
984        let handler_factory =
985            move |transport_dispatcher: turbomcp_transport::axum::WebSocketDispatcher,
986                  headers: Option<std::collections::HashMap<String, String>>| {
987                // Wrap transport dispatcher with server layer adapter
988                let server_dispatcher =
989                    crate::routing::WebSocketDispatcherAdapter::new(transport_dispatcher);
990
991                // Clone router for this connection and configure with dispatcher
992                let mut connection_router = router.clone();
993                connection_router.set_server_request_dispatcher(server_dispatcher);
994
995                // Create wrapper that passes headers to create_context (WebSocket transport)
996                // We can reuse HttpHandlerWithHeaders since it's generic
997                Arc::new(HttpHandlerWithHeaders {
998                    router: connection_router,
999                    headers,
1000                    transport: "websocket",
1001                }) as Arc<dyn turbomcp_protocol::JsonRpcHandler>
1002            };
1003
1004        info!(
1005            server_name = %server_info.name,
1006            server_version = %server_info.version,
1007            bind_addr = %socket_addr,
1008            endpoint_path = %config.endpoint_path,
1009            "WebSocket server starting with full bidirectional support (elicitation, sampling, roots, ping)"
1010        );
1011
1012        // Create factory state for transport layer
1013        let factory_state = WebSocketFactoryState::new(handler_factory);
1014
1015        // Session middleware to extract headers (same as HTTP transport)
1016        let session_middleware = |mut request: axum::extract::Request, next: middleware::Next| async move {
1017            let mut session = SessionInfo::new();
1018
1019            // Extract headers and store in session metadata
1020            for (name, value) in request.headers().iter() {
1021                if let Ok(value_str) = value.to_str() {
1022                    session
1023                        .metadata
1024                        .insert(name.to_string(), value_str.to_string());
1025                }
1026            }
1027
1028            // Extract specific useful headers
1029            if let Some(user_agent) = request
1030                .headers()
1031                .get("user-agent")
1032                .and_then(|v| v.to_str().ok())
1033            {
1034                session.user_agent = Some(user_agent.to_string());
1035            }
1036
1037            if let Some(remote_addr) = request
1038                .headers()
1039                .get("x-forwarded-for")
1040                .and_then(|v| v.to_str().ok())
1041            {
1042                session.remote_addr = Some(remote_addr.to_string());
1043            }
1044
1045            request.extensions_mut().insert(session);
1046            next.run(request).await
1047        };
1048
1049        // Build Axum router using transport layer
1050        let app = Router::new()
1051            .route(&config.endpoint_path, get(websocket_handler_with_factory))
1052            .with_state(factory_state)
1053            .layer(middleware::from_fn(session_middleware));
1054
1055        info!("WebSocket server bound to {}", socket_addr);
1056
1057        // Serve using Axum
1058        let listener = tokio::net::TcpListener::bind(socket_addr)
1059            .await
1060            .map_err(|e| crate::ServerError::configuration(format!("Failed to bind: {}", e)))?;
1061
1062        axum::serve(listener, app).await.map_err(|e| {
1063            tracing::error!(error = %e, "WebSocket server failed");
1064            crate::ServerError::handler(e.to_string())
1065        })?;
1066
1067        info!("WebSocket server shutdown complete");
1068        Ok(())
1069    }
1070
1071    /// Run server with TCP transport (progressive enhancement - runtime configuration)
1072    #[cfg(feature = "tcp")]
1073    #[tracing::instrument(skip(self), fields(
1074        transport = "tcp",
1075        service_name = %self.config.name,
1076        service_version = %self.config.version,
1077        addr = ?addr
1078    ))]
1079    pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
1080        mut self,
1081        addr: A,
1082    ) -> ServerResult<()> {
1083        use turbomcp_transport::TcpTransport;
1084
1085        // Sprint 2.6: Check for insecure 0.0.0.0 binding
1086        crate::security_checks::check_binding_security(&addr);
1087
1088        // Start performance monitoring for TCP server
1089        let _perf_span = info_span!("server.run", transport = "tcp").entered();
1090        info!(?addr, "Starting MCP server with TCP transport");
1091
1092        self.lifecycle.start().await;
1093
1094        // Convert ToSocketAddrs to SocketAddr
1095        let socket_addr = match addr.to_socket_addrs() {
1096            Ok(mut addrs) => match addrs.next() {
1097                Some(addr) => addr,
1098                None => {
1099                    tracing::error!("No socket address resolved from provided address");
1100                    self.lifecycle.shutdown().await;
1101                    return Err(crate::ServerError::configuration("Invalid socket address"));
1102                }
1103            },
1104            Err(e) => {
1105                tracing::error!(error = %e, "Failed to resolve socket address");
1106                self.lifecycle.shutdown().await;
1107                return Err(crate::ServerError::configuration(format!(
1108                    "Address resolution failed: {e}"
1109                )));
1110            }
1111        };
1112
1113        let transport = TcpTransport::new_server(socket_addr);
1114        if let Err(e) = transport.connect().await {
1115            tracing::error!(error = %e, "Failed to connect TCP transport");
1116            self.lifecycle.shutdown().await;
1117            return Err(e.into());
1118        }
1119
1120        // BIDIRECTIONAL TCP SETUP
1121        // Create generic transport dispatcher for server-initiated requests
1122        let dispatcher = crate::runtime::TransportDispatcher::new(transport);
1123
1124        // Configure router's bidirectional support with the TCP dispatcher
1125        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
1126        let router = Arc::make_mut(&mut self.router);
1127        router.set_server_request_dispatcher(dispatcher.clone());
1128
1129        // Run TCP with full bidirectional support (MCP 2025-06-18 compliant)
1130        // This uses the generic bidirectional runtime that handles both:
1131        // - Client→Server requests (tools, resources, prompts)
1132        // - Server→Client requests (sampling, elicitation, roots, ping)
1133        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
1134            .await
1135            .map_err(|e| crate::ServerError::Handler {
1136                message: format!("TCP bidirectional runtime failed: {}", e),
1137                context: Some("run_tcp".to_string()),
1138            })
1139    }
1140
1141    /// Run server with Unix socket transport (progressive enhancement - runtime configuration)
1142    #[cfg(all(feature = "unix", unix))]
1143    #[tracing::instrument(skip(self), fields(
1144        transport = "unix",
1145        service_name = %self.config.name,
1146        service_version = %self.config.version,
1147        path = ?path.as_ref()
1148    ))]
1149    pub async fn run_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> ServerResult<()> {
1150        use std::path::PathBuf;
1151        use turbomcp_transport::UnixTransport;
1152
1153        // Start performance monitoring for Unix server
1154        let _perf_span = info_span!("server.run", transport = "unix").entered();
1155        info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");
1156
1157        self.lifecycle.start().await;
1158
1159        let socket_path = PathBuf::from(path.as_ref());
1160        let transport = UnixTransport::new_server(socket_path);
1161        if let Err(e) = transport.connect().await {
1162            tracing::error!(error = %e, "Failed to connect Unix socket transport");
1163            self.lifecycle.shutdown().await;
1164            return Err(e.into());
1165        }
1166
1167        // BIDIRECTIONAL UNIX SOCKET SETUP
1168        // Create generic transport dispatcher for server-initiated requests
1169        let dispatcher = crate::runtime::TransportDispatcher::new(transport);
1170
1171        // Configure router's bidirectional support with the Unix socket dispatcher
1172        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
1173        let router = Arc::make_mut(&mut self.router);
1174        router.set_server_request_dispatcher(dispatcher.clone());
1175
1176        // Run Unix Socket with full bidirectional support (MCP 2025-06-18 compliant)
1177        // This uses the generic bidirectional runtime that handles both:
1178        // - Client→Server requests (tools, resources, prompts)
1179        // - Server→Client requests (sampling, elicitation, roots, ping)
1180        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
1181            .await
1182            .map_err(|e| crate::ServerError::Handler {
1183                message: format!("Unix socket bidirectional runtime failed: {}", e),
1184                context: Some("run_unix".to_string()),
1185            })
1186    }
1187
1188    /// Generic transport runner (DRY principle)
1189    /// Used by feature-gated transport methods (http, tcp, websocket, unix)
1190    #[allow(dead_code)]
1191    #[tracing::instrument(skip(self, transport), fields(
1192        service_name = %self.config.name,
1193        service_version = %self.config.version
1194    ))]
1195    async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
1196        // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
1197        let lifecycle_for_sigint = self.lifecycle.clone();
1198        tokio::spawn(async move {
1199            if let Err(e) = tokio::signal::ctrl_c().await {
1200                tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
1201                return;
1202            }
1203            tracing::info!("Ctrl+C received, initiating shutdown");
1204            lifecycle_for_sigint.shutdown().await;
1205        });
1206
1207        #[cfg(unix)]
1208        {
1209            let lifecycle_for_sigterm = self.lifecycle.clone();
1210            tokio::spawn(async move {
1211                use tokio::signal::unix::{SignalKind, signal};
1212                match signal(SignalKind::terminate()) {
1213                    Ok(mut sigterm) => {
1214                        sigterm.recv().await;
1215                        tracing::info!("SIGTERM received, initiating shutdown");
1216                        lifecycle_for_sigterm.shutdown().await;
1217                    }
1218                    Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
1219                }
1220            });
1221        }
1222
1223        // Shutdown signal
1224        let mut shutdown = self.lifecycle.shutdown_signal();
1225
1226        // Main message processing loop
1227        loop {
1228            tokio::select! {
1229                _ = shutdown.recv() => {
1230                    tracing::info!("Shutdown signal received");
1231                    break;
1232                }
1233                res = transport.receive() => {
1234                    match res {
1235                        Ok(Some(message)) => {
1236                            if let Err(e) = self.handle_transport_message(&mut transport, message).await {
1237                                tracing::warn!(error = %e, "Failed to handle transport message");
1238                            }
1239                        }
1240                        Ok(None) => {
1241                            // No message available; sleep briefly to avoid busy loop
1242                            sleep(Duration::from_millis(5)).await;
1243                        }
1244                        Err(e) => {
1245                            match e {
1246                                TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
1247                                    tracing::info!("Transport receive channel disconnected; shutting down");
1248                                    break;
1249                                }
1250                                _ => {
1251                                    tracing::error!(error = %e, "Transport receive failed");
1252                                    // Backoff on errors
1253                                    sleep(Duration::from_millis(50)).await;
1254                                }
1255                            }
1256                        }
1257                    }
1258                }
1259            }
1260        }
1261
1262        // Disconnect transport
1263        if let Err(e) = transport.disconnect().await {
1264            tracing::warn!(error = %e, "Error while disconnecting transport");
1265        }
1266
1267        tracing::info!("Server shutdown complete");
1268        Ok(())
1269    }
1270}
1271
1272// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1273// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1274// This is intentional and follows the Axum/Tower design pattern
1275#[allow(dead_code)]
1276const _: () = {
1277    const fn assert_send_clone<T: Send + Clone>() {}
1278    const fn check() {
1279        assert_send_clone::<crate::server::core::McpServer>();
1280    }
1281};