turbomcp_server/server/
core.rs

1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9use crate::{
10    config::ServerConfig,
11    error::ServerResult,
12    lifecycle::{HealthStatus, ServerLifecycle},
13    metrics::ServerMetrics,
14    registry::HandlerRegistry,
15    routing::RequestRouter,
16    service::McpService,
17};
18
19#[cfg(feature = "middleware")]
20use crate::middleware::MiddlewareStack;
21
22use bytes::Bytes;
23use http::{Request, Response};
24use tokio::time::{Duration, sleep};
25use turbomcp_transport::Transport;
26use turbomcp_transport::core::TransportError;
27
28use super::shutdown::ShutdownHandle;
29
30/// Check if logging should be enabled for STDIO transport
31///
32/// For MCP STDIO transport compliance, logging is disabled by default since stdout
33/// must be reserved exclusively for JSON-RPC messages. This can be overridden by
34/// setting the TURBOMCP_FORCE_LOGGING environment variable.
35pub(crate) fn should_log_for_stdio() -> bool {
36    std::env::var("TURBOMCP_FORCE_LOGGING").is_ok()
37}
38
39/// Main MCP server following the Axum/Tower Clone pattern
40///
41/// ## Sharing Pattern
42///
43/// `McpServer` implements `Clone` like Axum's `Router`. All heavy state is Arc-wrapped
44/// internally, making cloning cheap (just atomic reference count increments).
45///
46/// ```rust,no_run
47/// use turbomcp_server::ServerBuilder;
48///
49/// # async fn example() {
50/// let server = ServerBuilder::new().build();
51///
52/// // Clone for passing to functions (cheap - just Arc increments)
53/// let server1 = server.clone();
54/// let server2 = server.clone();
55///
56/// // Access config and health
57/// let config = server1.config();
58/// println!("Server: {}", config.name);
59///
60/// let health = server2.health().await;
61/// println!("Health: {:?}", health);
62/// # }
63/// ```
64///
65/// ## Architecture Notes
66///
67/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
68/// This is intentional and follows Tower's design - users clone the server instead of
69/// Arc-wrapping it.
70///
71/// **Architecture Note**: The service field provides tower::Service integration for
72/// advanced middleware patterns. The request processing pipeline currently uses the
73/// RequestRouter directly. Tower integration can be added via custom middleware layers
74/// when needed for specific use cases (e.g., custom rate limiting, advanced tracing).
75#[derive(Clone)]
76pub struct McpServer {
77    /// Server configuration (Clone-able)
78    pub(crate) config: ServerConfig,
79    /// Handler registry (Arc-wrapped for cheap cloning)
80    pub(crate) registry: Arc<HandlerRegistry>,
81    /// Request router (Arc-wrapped for cheap cloning)
82    pub(crate) router: Arc<RequestRouter>,
83    /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
84    ///
85    /// All requests flow through this service stack, which provides:
86    /// - Timeout enforcement
87    /// - Request validation
88    /// - Authorization checks
89    /// - Rate limiting
90    /// - Audit logging
91    /// - And more middleware layers as configured
92    ///
93    /// See `server/transport.rs` for integration with transport layer.
94    pub(crate) service:
95        tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
96    /// Server lifecycle (Arc-wrapped for cheap cloning)
97    pub(crate) lifecycle: Arc<ServerLifecycle>,
98    /// Server metrics (Arc-wrapped for cheap cloning)
99    pub(crate) metrics: Arc<ServerMetrics>,
100}
101
102impl std::fmt::Debug for McpServer {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("McpServer")
105            .field("config", &self.config)
106            .finish()
107    }
108}
109
110impl McpServer {
111    /// Build comprehensive Tower middleware stack (transport-agnostic)
112    ///
113    /// ## Architecture
114    ///
115    /// This creates a complete Tower service stack with conditional middleware layers:
116    /// - **Timeout Layer**: Request timeout enforcement (tower_http)
117    /// - **Validation Layer**: JSON-RPC structure validation
118    /// - **Authorization Layer**: Resource access control
119    /// - **Core Service**: JSON-RPC routing and handler execution
120    ///
121    /// All middleware is composed using Tower's ServiceBuilder pattern, which provides:
122    /// - Top-to-bottom execution order
123    /// - Type-safe layer composition
124    /// - Zero-cost abstractions
125    /// - Clone-able service instances
126    ///
127    /// ## Integration
128    ///
129    /// The resulting BoxCloneService is stored in `self.service` and called from
130    /// `server/transport.rs` for every incoming request. This ensures ALL requests
131    /// flow through the complete middleware pipeline before reaching handlers.
132    ///
133    /// ## Adding Middleware
134    ///
135    /// To add new middleware, update the match arms below to include your layer.
136    /// Follow the pattern of conditional inclusion based on config flags.
137    #[cfg(feature = "middleware")]
138    fn build_middleware_stack(
139        core_service: McpService,
140        stack: MiddlewareStack,
141    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
142        // COMPREHENSIVE TOWER COMPOSITION - Conditional Layer Stacking
143        //
144        // This approach builds the middleware stack incrementally, boxing at each step.
145        // While this has a small performance cost from multiple boxing operations,
146        // it provides several critical advantages:
147        //
148        // 1. **Maintainability**: No combinatorial explosion (8 match arms → simple chain)
149        // 2. **Extensibility**: Adding new middleware requires only one new block
150        // 3. **Clarity**: Each layer's purpose and configuration is explicit
151        // 4. **Type Safety**: BoxCloneService provides type erasure while preserving Clone
152        //
153        // Performance note: The boxing overhead is negligible compared to network I/O
154        // and handler execution time. Modern allocators make this essentially free.
155
156        // Start with core service as a boxed service for uniform type handling
157        let mut service: tower::util::BoxCloneService<
158            Request<Bytes>,
159            Response<Bytes>,
160            crate::ServerError,
161        > = tower::util::BoxCloneService::new(core_service);
162
163        // Authorization layer removed in 2.0.0 - handle at application layer
164
165        // Layer 2: Validation
166        // Validates request structure after auth but before processing
167        #[cfg(feature = "middleware")]
168        {
169            if let Some(validation_layer) = stack.validation_layer() {
170                service = tower::util::BoxCloneService::new(
171                    tower::ServiceBuilder::new()
172                        .layer(validation_layer)
173                        .service(service),
174                );
175            }
176        }
177
178        // Layer 3: Timeout (outermost)
179        // Applied last so it can enforce timeout on the entire request pipeline
180        #[cfg(feature = "middleware")]
181        {
182            if let Some(timeout_config) = stack.timeout_config
183                && timeout_config.enabled
184            {
185                service = tower::util::BoxCloneService::new(
186                    tower::ServiceBuilder::new()
187                        .layer(tower_http::timeout::TimeoutLayer::new(
188                            timeout_config.request_timeout,
189                        ))
190                        .service(service),
191                );
192            }
193        }
194
195        // Future middleware can be added here with similar if-let blocks:
196        // if let Some(auth_config) = stack.auth_config { ... }
197        // if let Some(audit_config) = stack.audit_config { ... }
198        // if let Some(rate_limit_config) = stack.rate_limit_config { ... }
199
200        service
201    }
202
203    /// Create a new server
204    #[must_use]
205    pub fn new(config: ServerConfig) -> Self {
206        Self::new_with_registry(config, HandlerRegistry::new())
207    }
208
209    /// Create a new server with an existing registry (used by ServerBuilder)
210    #[must_use]
211    pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
212        let registry = Arc::new(registry);
213        let metrics = Arc::new(ServerMetrics::new());
214        let router = Arc::new(RequestRouter::new(
215            Arc::clone(&registry),
216            Arc::clone(&metrics),
217            config.clone(),
218        ));
219        // Build middleware stack configuration
220        #[cfg(feature = "middleware")]
221        #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
222        let mut stack = crate::middleware::MiddlewareStack::new();
223
224        // Auto-install rate limiting if enabled in config
225        #[cfg(feature = "rate-limiting")]
226        if config.rate_limiting.enabled {
227            use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
228            use std::num::NonZeroU32;
229            use std::time::Duration;
230
231            let rate_config = crate::middleware::RateLimitConfig {
232                strategy: RateLimitStrategy::Global,
233                limits: RateLimits {
234                    requests_per_period: NonZeroU32::new(
235                        config.rate_limiting.requests_per_second * 60,
236                    )
237                    .unwrap(), // Convert per-second to per-minute
238                    period: Duration::from_secs(60),
239                    burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
240                },
241                enabled: true,
242            };
243
244            stack = stack.with_rate_limit(rate_config);
245        }
246
247        // Create core MCP service
248        let core_service = McpService::new(
249            Arc::clone(&registry),
250            Arc::clone(&router),
251            Arc::clone(&metrics),
252        );
253
254        // COMPREHENSIVE TOWER SERVICE COMPOSITION
255        // Build the complete middleware stack with proper type erasure
256        //
257        // This service is called from server/transport.rs for EVERY incoming request:
258        // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
259        //
260        // The Tower middleware stack provides:
261        // ✓ Timeout enforcement (configurable per-request)
262        // ✓ Request validation (JSON-RPC structure)
263        // ✓ Authorization checks (resource access control)
264        // ✓ Rate limiting (if enabled in config)
265        // ✓ Audit logging (configurable)
266        // ✓ And more layers as configured
267        //
268        // BoxCloneService is Clone but !Sync - this is the Tower pattern
269        #[cfg(feature = "middleware")]
270        let service = Self::build_middleware_stack(core_service, stack);
271
272        #[cfg(not(feature = "middleware"))]
273        let service = tower::util::BoxCloneService::new(core_service);
274
275        let lifecycle = Arc::new(ServerLifecycle::new());
276
277        Self {
278            config,
279            registry,
280            router,
281            service,
282            lifecycle,
283            metrics,
284        }
285    }
286
287    /// Get server configuration
288    #[must_use]
289    pub const fn config(&self) -> &ServerConfig {
290        &self.config
291    }
292
293    /// Get handler registry
294    #[must_use]
295    pub const fn registry(&self) -> &Arc<HandlerRegistry> {
296        &self.registry
297    }
298
299    /// Get request router
300    #[must_use]
301    pub const fn router(&self) -> &Arc<RequestRouter> {
302        &self.router
303    }
304
305    /// Get server lifecycle
306    #[must_use]
307    pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
308        &self.lifecycle
309    }
310
311    /// Get server metrics
312    #[must_use]
313    pub const fn metrics(&self) -> &Arc<ServerMetrics> {
314        &self.metrics
315    }
316
317    /// Get the Tower service stack (test accessor)
318    ///
319    /// **Note**: This is primarily for integration testing. Production code should
320    /// use the transport layer which calls the service internally via
321    /// `handle_transport_message()`.
322    ///
323    /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
324    /// is designed for cloning).
325    #[doc(hidden)]
326    pub fn service(
327        &self,
328    ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
329        self.service.clone()
330    }
331
332    /// Get a shutdown handle for graceful server termination
333    ///
334    /// This handle enables external control over server shutdown, essential for:
335    /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
336    /// - **Container orchestration**: Kubernetes graceful pod termination
337    /// - **Load balancer integration**: Health check coordination
338    /// - **Multi-component systems**: Coordinated shutdown sequences
339    /// - **Maintenance operations**: Planned downtime and updates
340    ///
341    /// # Examples
342    ///
343    /// ## Basic shutdown coordination
344    /// ```no_run
345    /// # use turbomcp_server::ServerBuilder;
346    /// # #[tokio::main]
347    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
348    /// let server = ServerBuilder::new().build();
349    /// let shutdown_handle = server.shutdown_handle();
350    ///
351    /// // Coordinate with other services
352    /// tokio::spawn(async move {
353    ///     // Wait for external shutdown signal
354    ///     tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
355    ///     println!("Shutdown signal received, terminating gracefully...");
356    ///     shutdown_handle.shutdown().await;
357    /// });
358    ///
359    /// // Server will gracefully shut down when signaled
360    /// // server.run_stdio().await?;
361    /// # Ok(())
362    /// # }
363    /// ```
364    ///
365    /// ## Container/Kubernetes deployment
366    /// ```no_run
367    /// # use turbomcp_server::ServerBuilder;
368    /// # use std::sync::Arc;
369    /// # #[tokio::main]
370    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
371    /// let server = ServerBuilder::new().build();
372    /// let shutdown_handle = server.shutdown_handle();
373    /// let shutdown_handle_clone = shutdown_handle.clone();
374    ///
375    /// // Handle multiple signal types with proper platform support
376    /// tokio::spawn(async move {
377    ///     #[cfg(unix)]
378    ///     {
379    ///         use tokio::signal::unix::{signal, SignalKind};
380    ///         let mut sigterm = signal(SignalKind::terminate()).unwrap();
381    ///         tokio::select! {
382    ///             _ = tokio::signal::ctrl_c() => {
383    ///                 println!("SIGINT received");
384    ///             }
385    ///             _ = sigterm.recv() => {
386    ///                 println!("SIGTERM received");
387    ///             }
388    ///         }
389    ///     }
390    ///     #[cfg(not(unix))]
391    ///     {
392    ///         tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
393    ///         println!("SIGINT received");
394    ///     }
395    ///     shutdown_handle_clone.shutdown().await;
396    /// });
397    ///
398    /// // Server handles graceful shutdown automatically
399    /// // server.run_tcp("0.0.0.0:8080").await?;
400    /// # Ok(())
401    /// # }
402    /// ```
403    pub fn shutdown_handle(&self) -> ShutdownHandle {
404        ShutdownHandle::new(self.lifecycle.clone())
405    }
406
407    /// Run the server with STDIO transport
408    ///
409    /// # Errors
410    ///
411    /// Returns [`crate::ServerError::Transport`] if:
412    /// - STDIO transport connection fails
413    /// - Message sending/receiving fails
414    /// - Transport disconnection fails
415    #[tracing::instrument(skip(self), fields(
416        transport = "stdio",
417        service_name = %self.config.name,
418        service_version = %self.config.version
419    ))]
420    pub async fn run_stdio(mut self) -> ServerResult<()> {
421        // For STDIO transport, disable logging unless explicitly overridden
422        // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
423        if should_log_for_stdio() {
424            info!("Starting MCP server with STDIO transport");
425        }
426
427        // Start performance monitoring for STDIO server
428        let _perf_span = info_span!("server.run", transport = "stdio").entered();
429        info!("Initializing STDIO transport for MCP server");
430
431        self.lifecycle.start().await;
432
433        // BIDIRECTIONAL STDIO SETUP
434        // Create STDIO dispatcher for server-initiated requests (sampling, elicitation, roots, ping)
435        let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
436
437        // Use fully-qualified path to avoid ambiguity with the turbomcp crate's runtime module
438        let dispatcher = crate::runtime::StdioDispatcher::new(request_tx);
439
440        // Configure router's bidirectional support with the STDIO dispatcher
441        // SAFETY: We have &mut self, so we can safely get mutable access to the Arc'd router
442        // This is the CRITICAL STEP that was missing - without this, all server→client requests fail
443        let router = Arc::make_mut(&mut self.router);
444        router.set_server_request_dispatcher(dispatcher.clone());
445
446        // Run STDIO with full bidirectional support (MCP 2025-06-18 compliant)
447        // This uses the bidirectional-aware runtime that handles both:
448        // - Client→Server requests (tools, resources, prompts)
449        // - Server→Client requests (sampling, elicitation, roots, ping)
450        crate::runtime::run_stdio_bidirectional(self.router.clone(), dispatcher, request_rx)
451            .await
452            .map_err(|e| crate::ServerError::Handler {
453                message: format!("STDIO bidirectional runtime failed: {}", e),
454                context: Some("run_stdio".to_string()),
455            })
456    }
457
458    /// Get health status
459    pub async fn health(&self) -> HealthStatus {
460        self.lifecycle.health().await
461    }
462
463    /// Run server with HTTP transport using default configuration
464    ///
465    /// This provides a working HTTP server with:
466    /// - Standard HTTP POST/GET/DELETE for MCP protocol at `/mcp`
467    /// - Full MCP 2025-06-18 protocol compliance
468    /// - Graceful shutdown support
469    /// - Default rate limiting (100 req/60s)
470    /// - Default security settings (localhost allowed, CORS disabled)
471    ///
472    /// For custom configuration (rate limits, security, CORS), use `run_http_with_config`.
473    ///
474    /// # Examples
475    ///
476    /// ## Basic usage with default configuration
477    /// ```no_run
478    /// use turbomcp_server::ServerBuilder;
479    ///
480    /// #[tokio::main]
481    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
482    ///     let server = ServerBuilder::new()
483    ///         .name("my-server")
484    ///         .version("1.0.0")
485    ///         .build();
486    ///
487    ///     server.run_http("127.0.0.1:3000").await?;
488    ///     Ok(())
489    /// }
490    /// ```
491    ///
492    /// ## With custom configuration
493    /// ```no_run
494    /// use turbomcp_server::ServerBuilder;
495    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
496    /// use std::time::Duration;
497    ///
498    /// #[tokio::main]
499    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
500    ///     let server = ServerBuilder::new()
501    ///         .name("my-server")
502    ///         .version("1.0.0")
503    ///         .build();
504    ///
505    ///     let config = StreamableHttpConfigBuilder::new()
506    ///         .without_rate_limit()  // For benchmarking
507    ///         .allow_any_origin(true)  // Enable CORS
508    ///         .build();
509    ///
510    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
511    ///     Ok(())
512    /// }
513    /// ```
514    ///
515    /// # Errors
516    ///
517    /// Returns [`crate::ServerError::Transport`] if:
518    /// - Address resolution fails
519    /// - HTTP server fails to start
520    /// - Transport disconnection fails
521    #[cfg(feature = "http")]
522    #[tracing::instrument(skip(self), fields(
523        transport = "http",
524        service_name = %self.config.name,
525        service_version = %self.config.version,
526        addr = ?addr
527    ))]
528    pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
529        self,
530        addr: A,
531    ) -> ServerResult<()> {
532        use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
533
534        // Build default configuration
535        let config = StreamableHttpConfigBuilder::new().build();
536
537        self.run_http_with_config(addr, config).await
538    }
539
540    /// Run server with HTTP transport and custom configuration
541    ///
542    /// This provides full control over HTTP server configuration including:
543    /// - Rate limiting (requests per time window, or disabled entirely)
544    /// - Security settings (CORS, origin validation, authentication)
545    /// - Network settings (bind address, endpoint path, keep-alive)
546    /// - Advanced settings (replay buffer size, etc.)
547    ///
548    /// # Examples
549    ///
550    /// ## Benchmarking configuration (no rate limits)
551    /// ```no_run
552    /// use turbomcp_server::ServerBuilder;
553    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
554    ///
555    /// #[tokio::main]
556    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
557    ///     let server = ServerBuilder::new()
558    ///         .name("benchmark-server")
559    ///         .version("1.0.0")
560    ///         .build();
561    ///
562    ///     let config = StreamableHttpConfigBuilder::new()
563    ///         .without_rate_limit()  // Disable rate limiting
564    ///         .build();
565    ///
566    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
567    ///     Ok(())
568    /// }
569    /// ```
570    ///
571    /// ## Production configuration (secure, rate limited)
572    /// ```no_run
573    /// use turbomcp_server::ServerBuilder;
574    /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
575    /// use std::time::Duration;
576    ///
577    /// #[tokio::main]
578    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
579    ///     let server = ServerBuilder::new()
580    ///         .name("production-server")
581    ///         .version("1.0.0")
582    ///         .build();
583    ///
584    ///     let config = StreamableHttpConfigBuilder::new()
585    ///         .with_rate_limit(1000, Duration::from_secs(60))  // 1000 req/min
586    ///         .allow_any_origin(false)  // Strict CORS
587    ///         .require_authentication(true)  // Require auth
588    ///         .build();
589    ///
590    ///     server.run_http_with_config("127.0.0.1:3000", config).await?;
591    ///     Ok(())
592    /// }
593    /// ```
594    ///
595    /// # Errors
596    ///
597    /// Returns [`crate::ServerError::Transport`] if:
598    /// - Address resolution fails
599    /// - HTTP server fails to start
600    /// - Transport disconnection fails
601    #[cfg(feature = "http")]
602    #[tracing::instrument(skip(self, config), fields(
603        transport = "http",
604        service_name = %self.config.name,
605        service_version = %self.config.version,
606        addr = ?addr
607    ))]
608    pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
609        self,
610        addr: A,
611        config: turbomcp_transport::streamable_http_v2::StreamableHttpConfig,
612    ) -> ServerResult<()> {
613        use std::collections::HashMap;
614        use tokio::sync::{Mutex, RwLock};
615
616        info!("Starting MCP server with HTTP transport");
617        info!(
618            config = ?config,
619            "HTTP configuration loaded"
620        );
621
622        self.lifecycle.start().await;
623
624        // Resolve address to string
625        let socket_addr = addr
626            .to_socket_addrs()
627            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
628            .next()
629            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
630
631        info!("Resolved address: {}", socket_addr);
632
633        // BIDIRECTIONAL HTTP SETUP
634        // Create shared state for session management and bidirectional MCP
635        let sessions = Arc::new(RwLock::new(HashMap::new()));
636        let pending_requests = Arc::new(Mutex::new(HashMap::new()));
637
638        // Share router across all sessions (routing logic and handler registry)
639        let router = self.router.clone();
640
641        // Capture server identity for MCP protocol compliance
642        let server_info = turbomcp_protocol::ServerInfo {
643            name: self.config.name.clone(),
644            version: self.config.version.clone(),
645        };
646
647        // Factory pattern: create session-specific router for each HTTP request
648        // This is the clean architecture that HTTP requires - each session gets its own
649        // bidirectional dispatcher while sharing the routing logic
650        let sessions_for_factory = Arc::clone(&sessions);
651        let pending_for_factory = Arc::clone(&pending_requests);
652        let router_for_factory = Arc::clone(&router);
653
654        let handler_factory = move |session_id: Option<String>| {
655            let session_id = session_id.unwrap_or_else(|| {
656                let new_id = uuid::Uuid::new_v4().to_string();
657                tracing::warn!(
658                    "⚠️ Factory generating random session ID (no session ID provided): {}",
659                    new_id
660                );
661                new_id
662            });
663
664            tracing::debug!("Factory creating handler for session: {}", session_id);
665
666            // Create session-specific HTTP dispatcher (now local to turbomcp-server!)
667            let dispatcher = crate::runtime::http::HttpDispatcher::new(
668                session_id,
669                Arc::clone(&sessions_for_factory),
670                Arc::clone(&pending_for_factory),
671            );
672
673            // Clone the base router and configure with session-specific dispatcher
674            // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
675            let mut session_router = (*router_for_factory).clone();
676            session_router.set_server_request_dispatcher(dispatcher);
677
678            session_router
679        };
680
681        info!(
682            server_name = %server_info.name,
683            server_version = %server_info.version,
684            bind_addr = %socket_addr,
685            endpoint_path = %config.endpoint_path,
686            "HTTP server starting with full bidirectional support (elicitation, sampling, roots, ping)"
687        );
688
689        // Use factory-based HTTP server with full bidirectional support
690        use crate::runtime::http::run_http;
691        run_http(
692            handler_factory,
693            sessions,
694            pending_requests,
695            socket_addr.to_string(),
696            config.endpoint_path.clone(),
697        )
698        .await
699        .map_err(|e| {
700            tracing::error!(error = %e, "HTTP server failed");
701            crate::ServerError::handler(e.to_string())
702        })?;
703
704        info!("HTTP server shutdown complete");
705        Ok(())
706    }
707
708    /// Run server with WebSocket transport (full bidirectional support)
709    ///
710    /// This provides a simple API for WebSocket servers with sensible defaults:
711    /// - Default endpoint: `/mcp/ws`
712    /// - Full MCP 2025-06-18 compliance
713    /// - Bidirectional communication
714    /// - Elicitation support
715    /// - Session management and middleware
716    ///
717    /// For custom configuration, use `run_websocket_with_config()`.
718    ///
719    /// # Example
720    ///
721    /// ```no_run
722    /// use turbomcp_server::ServerBuilder;
723    ///
724    /// #[tokio::main]
725    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
726    ///     let server = ServerBuilder::new()
727    ///         .name("ws-server")
728    ///         .version("1.0.0")
729    ///         .build();
730    ///
731    ///     server.run_websocket("127.0.0.1:8080").await?;
732    ///     Ok(())
733    /// }
734    /// ```
735    #[cfg(feature = "websocket")]
736    #[tracing::instrument(skip(self), fields(
737        transport = "websocket",
738        service_name = %self.config.name,
739        service_version = %self.config.version,
740        addr = ?addr
741    ))]
742    pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
743        self,
744        addr: A,
745    ) -> ServerResult<()> {
746        use crate::runtime::websocket::WebSocketServerConfig;
747
748        // Build default configuration
749        let config = WebSocketServerConfig::default();
750
751        self.run_websocket_with_config(addr, config).await
752    }
753
754    /// Run server with WebSocket transport and custom configuration
755    ///
756    /// This provides full control over WebSocket server configuration including:
757    /// - Custom endpoint path
758    /// - MCP server settings (middleware, security, etc.)
759    ///
760    /// # Example
761    ///
762    /// ```no_run
763    /// use turbomcp_server::{ServerBuilder, WebSocketServerConfig};
764    ///
765    /// #[tokio::main]
766    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
767    ///     let server = ServerBuilder::new()
768    ///         .name("custom-ws-server")
769    ///         .version("1.0.0")
770    ///         .build();
771    ///
772    ///     let config = WebSocketServerConfig {
773    ///         bind_addr: "0.0.0.0:8080".to_string(),
774    ///         endpoint_path: "/custom/ws".to_string(),
775    ///     };
776    ///
777    ///     server.run_websocket_with_config("127.0.0.1:8080", config).await?;
778    ///     Ok(())
779    /// }
780    /// ```
781    #[cfg(feature = "websocket")]
782    #[tracing::instrument(skip(self, config), fields(
783        transport = "websocket",
784        service_name = %self.config.name,
785        service_version = %self.config.version,
786        addr = ?addr
787    ))]
788    pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
789        self,
790        addr: A,
791        config: crate::runtime::websocket::WebSocketServerConfig,
792    ) -> ServerResult<()> {
793        info!("Starting MCP server with WebSocket transport");
794        info!(config = ?config, "WebSocket configuration");
795
796        self.lifecycle.start().await;
797
798        // Resolve address to string
799        let socket_addr = addr
800            .to_socket_addrs()
801            .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
802            .next()
803            .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
804
805        info!("Resolved address: {}", socket_addr);
806
807        // Capture server identity for MCP protocol compliance
808        let server_info = turbomcp_protocol::ServerInfo {
809            name: self.config.name.clone(),
810            version: self.config.version.clone(),
811        };
812
813        // Router for this server (shared across all connections)
814        let router = (*self.router).clone();
815
816        // Wrapper factory: configure router with per-connection dispatcher
817        // This is the same clean architecture as HTTP - each connection gets its own
818        // bidirectional dispatcher while sharing the routing logic
819        let wrapper_factory =
820            move |mut base_router: crate::routing::RequestRouter,
821                  dispatcher: crate::runtime::websocket::WebSocketServerDispatcher| {
822                // Clone the router for this connection and configure with dispatcher
823                // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
824                base_router.set_server_request_dispatcher(dispatcher);
825                base_router
826            };
827
828        info!(
829            server_name = %server_info.name,
830            server_version = %server_info.version,
831            bind_addr = %socket_addr,
832            endpoint_path = %config.endpoint_path,
833            "WebSocket server starting with full bidirectional support (elicitation, sampling, roots, ping)"
834        );
835
836        // Use factory-based WebSocket server with full bidirectional support
837        use crate::runtime::websocket::run_websocket;
838
839        // Update config with resolved bind address
840        let ws_config = crate::runtime::websocket::WebSocketServerConfig {
841            bind_addr: socket_addr.to_string(),
842            endpoint_path: config.endpoint_path.clone(),
843            max_concurrent_requests: config.max_concurrent_requests,
844        };
845
846        run_websocket(router, wrapper_factory, ws_config)
847            .await
848            .map_err(|e| {
849                tracing::error!(error = %e, "WebSocket server failed");
850                crate::ServerError::handler(e.to_string())
851            })?;
852
853        info!("WebSocket server shutdown complete");
854        Ok(())
855    }
856
857    /// Run server with TCP transport (progressive enhancement - runtime configuration)
858    #[cfg(feature = "tcp")]
859    #[tracing::instrument(skip(self), fields(
860        transport = "tcp",
861        service_name = %self.config.name,
862        service_version = %self.config.version,
863        addr = ?addr
864    ))]
865    pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
866        mut self,
867        addr: A,
868    ) -> ServerResult<()> {
869        use turbomcp_transport::TcpTransport;
870
871        // Start performance monitoring for TCP server
872        let _perf_span = info_span!("server.run", transport = "tcp").entered();
873        info!(?addr, "Starting MCP server with TCP transport");
874
875        self.lifecycle.start().await;
876
877        // Convert ToSocketAddrs to SocketAddr
878        let socket_addr = match addr.to_socket_addrs() {
879            Ok(mut addrs) => match addrs.next() {
880                Some(addr) => addr,
881                None => {
882                    tracing::error!("No socket address resolved from provided address");
883                    self.lifecycle.shutdown().await;
884                    return Err(crate::ServerError::configuration("Invalid socket address"));
885                }
886            },
887            Err(e) => {
888                tracing::error!(error = %e, "Failed to resolve socket address");
889                self.lifecycle.shutdown().await;
890                return Err(crate::ServerError::configuration(format!(
891                    "Address resolution failed: {e}"
892                )));
893            }
894        };
895
896        let transport = TcpTransport::new_server(socket_addr);
897        if let Err(e) = transport.connect().await {
898            tracing::error!(error = %e, "Failed to connect TCP transport");
899            self.lifecycle.shutdown().await;
900            return Err(e.into());
901        }
902
903        // BIDIRECTIONAL TCP SETUP
904        // Create generic transport dispatcher for server-initiated requests
905        let dispatcher = crate::runtime::TransportDispatcher::new(transport);
906
907        // Configure router's bidirectional support with the TCP dispatcher
908        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
909        let router = Arc::make_mut(&mut self.router);
910        router.set_server_request_dispatcher(dispatcher.clone());
911
912        // Run TCP with full bidirectional support (MCP 2025-06-18 compliant)
913        // This uses the generic bidirectional runtime that handles both:
914        // - Client→Server requests (tools, resources, prompts)
915        // - Server→Client requests (sampling, elicitation, roots, ping)
916        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
917            .await
918            .map_err(|e| crate::ServerError::Handler {
919                message: format!("TCP bidirectional runtime failed: {}", e),
920                context: Some("run_tcp".to_string()),
921            })
922    }
923
924    /// Run server with Unix socket transport (progressive enhancement - runtime configuration)
925    #[cfg(all(feature = "unix", unix))]
926    #[tracing::instrument(skip(self), fields(
927        transport = "unix",
928        service_name = %self.config.name,
929        service_version = %self.config.version,
930        path = ?path.as_ref()
931    ))]
932    pub async fn run_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> ServerResult<()> {
933        use std::path::PathBuf;
934        use turbomcp_transport::UnixTransport;
935
936        // Start performance monitoring for Unix server
937        let _perf_span = info_span!("server.run", transport = "unix").entered();
938        info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");
939
940        self.lifecycle.start().await;
941
942        let socket_path = PathBuf::from(path.as_ref());
943        let transport = UnixTransport::new_server(socket_path);
944        if let Err(e) = transport.connect().await {
945            tracing::error!(error = %e, "Failed to connect Unix socket transport");
946            self.lifecycle.shutdown().await;
947            return Err(e.into());
948        }
949
950        // BIDIRECTIONAL UNIX SOCKET SETUP
951        // Create generic transport dispatcher for server-initiated requests
952        let dispatcher = crate::runtime::TransportDispatcher::new(transport);
953
954        // Configure router's bidirectional support with the Unix socket dispatcher
955        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
956        let router = Arc::make_mut(&mut self.router);
957        router.set_server_request_dispatcher(dispatcher.clone());
958
959        // Run Unix Socket with full bidirectional support (MCP 2025-06-18 compliant)
960        // This uses the generic bidirectional runtime that handles both:
961        // - Client→Server requests (tools, resources, prompts)
962        // - Server→Client requests (sampling, elicitation, roots, ping)
963        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
964            .await
965            .map_err(|e| crate::ServerError::Handler {
966                message: format!("Unix socket bidirectional runtime failed: {}", e),
967                context: Some("run_unix".to_string()),
968            })
969    }
970
971    /// Generic transport runner (DRY principle)
972    /// Used by feature-gated transport methods (http, tcp, websocket, unix)
973    #[allow(dead_code)]
974    #[tracing::instrument(skip(self, transport), fields(
975        service_name = %self.config.name,
976        service_version = %self.config.version
977    ))]
978    async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
979        // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
980        let lifecycle_for_sigint = self.lifecycle.clone();
981        tokio::spawn(async move {
982            if let Err(e) = tokio::signal::ctrl_c().await {
983                tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
984                return;
985            }
986            tracing::info!("Ctrl+C received, initiating shutdown");
987            lifecycle_for_sigint.shutdown().await;
988        });
989
990        #[cfg(unix)]
991        {
992            let lifecycle_for_sigterm = self.lifecycle.clone();
993            tokio::spawn(async move {
994                use tokio::signal::unix::{SignalKind, signal};
995                match signal(SignalKind::terminate()) {
996                    Ok(mut sigterm) => {
997                        sigterm.recv().await;
998                        tracing::info!("SIGTERM received, initiating shutdown");
999                        lifecycle_for_sigterm.shutdown().await;
1000                    }
1001                    Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
1002                }
1003            });
1004        }
1005
1006        // Shutdown signal
1007        let mut shutdown = self.lifecycle.shutdown_signal();
1008
1009        // Main message processing loop
1010        loop {
1011            tokio::select! {
1012                _ = shutdown.recv() => {
1013                    tracing::info!("Shutdown signal received");
1014                    break;
1015                }
1016                res = transport.receive() => {
1017                    match res {
1018                        Ok(Some(message)) => {
1019                            if let Err(e) = self.handle_transport_message(&mut transport, message).await {
1020                                tracing::warn!(error = %e, "Failed to handle transport message");
1021                            }
1022                        }
1023                        Ok(None) => {
1024                            // No message available; sleep briefly to avoid busy loop
1025                            sleep(Duration::from_millis(5)).await;
1026                        }
1027                        Err(e) => {
1028                            match e {
1029                                TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
1030                                    tracing::info!("Transport receive channel disconnected; shutting down");
1031                                    break;
1032                                }
1033                                _ => {
1034                                    tracing::error!(error = %e, "Transport receive failed");
1035                                    // Backoff on errors
1036                                    sleep(Duration::from_millis(50)).await;
1037                                }
1038                            }
1039                        }
1040                    }
1041                }
1042            }
1043        }
1044
1045        // Disconnect transport
1046        if let Err(e) = transport.disconnect().await {
1047            tracing::warn!(error = %e, "Error while disconnecting transport");
1048        }
1049
1050        tracing::info!("Server shutdown complete");
1051        Ok(())
1052    }
1053}
1054
1055// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1056// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1057// This is intentional and follows the Axum/Tower design pattern
1058#[allow(dead_code)]
1059const _: () = {
1060    const fn assert_send_clone<T: Send + Clone>() {}
1061    const fn check() {
1062        assert_send_clone::<crate::server::core::McpServer>();
1063    }
1064};