turbomcp_server/server/core.rs
1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9use crate::{
10 config::ServerConfig,
11 error::ServerResult,
12 lifecycle::{HealthStatus, ServerLifecycle},
13 metrics::ServerMetrics,
14 registry::HandlerRegistry,
15 routing::RequestRouter,
16 service::McpService,
17};
18
19#[cfg(feature = "middleware")]
20use crate::middleware::MiddlewareStack;
21
22use bytes::Bytes;
23use http::{Request, Response};
24use tokio::time::{Duration, sleep};
25use turbomcp_transport::core::TransportError;
26use turbomcp_transport::{StdioTransport, Transport};
27
28use super::shutdown::ShutdownHandle;
29
30/// Check if logging should be enabled for STDIO transport
31///
32/// For MCP STDIO transport compliance, logging is disabled by default since stdout
33/// must be reserved exclusively for JSON-RPC messages. This can be overridden by
34/// setting the TURBOMCP_FORCE_LOGGING environment variable.
35pub(crate) fn should_log_for_stdio() -> bool {
36 std::env::var("TURBOMCP_FORCE_LOGGING").is_ok()
37}
38
39/// Main MCP server following the Axum/Tower Clone pattern
40///
41/// ## Sharing Pattern
42///
43/// `McpServer` implements `Clone` like Axum's `Router`. All heavy state is Arc-wrapped
44/// internally, making cloning cheap (just atomic reference count increments).
45///
46/// ```rust,no_run
47/// use turbomcp_server::ServerBuilder;
48///
49/// # async fn example() {
50/// let server = ServerBuilder::new().build();
51///
52/// // Clone for passing to functions (cheap - just Arc increments)
53/// let server1 = server.clone();
54/// let server2 = server.clone();
55///
56/// // Access config and health
57/// let config = server1.config();
58/// println!("Server: {}", config.name);
59///
60/// let health = server2.health().await;
61/// println!("Health: {:?}", health);
62/// # }
63/// ```
64///
65/// ## Architecture Notes
66///
67/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
68/// This is intentional and follows Tower's design - users clone the server instead of
69/// Arc-wrapping it.
70///
71/// **Architecture Note**: The service field provides tower::Service integration for
72/// advanced middleware patterns. The request processing pipeline currently uses the
73/// RequestRouter directly. Tower integration can be added via custom middleware layers
74/// when needed for specific use cases (e.g., custom rate limiting, advanced tracing).
75#[derive(Clone)]
76pub struct McpServer {
77 /// Server configuration (Clone-able)
78 pub(crate) config: ServerConfig,
79 /// Handler registry (Arc-wrapped for cheap cloning)
80 pub(crate) registry: Arc<HandlerRegistry>,
81 /// Request router (Arc-wrapped for cheap cloning)
82 pub(crate) router: Arc<RequestRouter>,
83 /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
84 ///
85 /// All requests flow through this service stack, which provides:
86 /// - Timeout enforcement
87 /// - Request validation
88 /// - Authorization checks
89 /// - Rate limiting
90 /// - Audit logging
91 /// - And more middleware layers as configured
92 ///
93 /// See `server/transport.rs` for integration with transport layer.
94 pub(crate) service:
95 tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
96 /// Server lifecycle (Arc-wrapped for cheap cloning)
97 pub(crate) lifecycle: Arc<ServerLifecycle>,
98 /// Server metrics (Arc-wrapped for cheap cloning)
99 pub(crate) metrics: Arc<ServerMetrics>,
100}
101
102impl std::fmt::Debug for McpServer {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct("McpServer")
105 .field("config", &self.config)
106 .finish()
107 }
108}
109
110impl McpServer {
111 /// Build comprehensive Tower middleware stack (transport-agnostic)
112 ///
113 /// ## Architecture
114 ///
115 /// This creates a complete Tower service stack with conditional middleware layers:
116 /// - **Timeout Layer**: Request timeout enforcement (tower_http)
117 /// - **Validation Layer**: JSON-RPC structure validation
118 /// - **Authorization Layer**: Resource access control
119 /// - **Core Service**: JSON-RPC routing and handler execution
120 ///
121 /// All middleware is composed using Tower's ServiceBuilder pattern, which provides:
122 /// - Top-to-bottom execution order
123 /// - Type-safe layer composition
124 /// - Zero-cost abstractions
125 /// - Clone-able service instances
126 ///
127 /// ## Integration
128 ///
129 /// The resulting BoxCloneService is stored in `self.service` and called from
130 /// `server/transport.rs` for every incoming request. This ensures ALL requests
131 /// flow through the complete middleware pipeline before reaching handlers.
132 ///
133 /// ## Adding Middleware
134 ///
135 /// To add new middleware, update the match arms below to include your layer.
136 /// Follow the pattern of conditional inclusion based on config flags.
137 #[cfg(feature = "middleware")]
138 fn build_middleware_stack(
139 core_service: McpService,
140 stack: MiddlewareStack,
141 ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
142 // COMPREHENSIVE TOWER COMPOSITION - Conditional Layer Stacking
143 //
144 // This approach builds the middleware stack incrementally, boxing at each step.
145 // While this has a small performance cost from multiple boxing operations,
146 // it provides several critical advantages:
147 //
148 // 1. **Maintainability**: No combinatorial explosion (8 match arms → simple chain)
149 // 2. **Extensibility**: Adding new middleware requires only one new block
150 // 3. **Clarity**: Each layer's purpose and configuration is explicit
151 // 4. **Type Safety**: BoxCloneService provides type erasure while preserving Clone
152 //
153 // Performance note: The boxing overhead is negligible compared to network I/O
154 // and handler execution time. Modern allocators make this essentially free.
155
156 // Start with core service as a boxed service for uniform type handling
157 let mut service: tower::util::BoxCloneService<
158 Request<Bytes>,
159 Response<Bytes>,
160 crate::ServerError,
161 > = tower::util::BoxCloneService::new(core_service);
162
163 // Authorization layer removed in 2.0.0 - handle at application layer
164
165 // Layer 2: Validation
166 // Validates request structure after auth but before processing
167 #[cfg(feature = "middleware")]
168 {
169 if let Some(validation_layer) = stack.validation_layer() {
170 service = tower::util::BoxCloneService::new(
171 tower::ServiceBuilder::new()
172 .layer(validation_layer)
173 .service(service),
174 );
175 }
176 }
177
178 // Layer 3: Timeout (outermost)
179 // Applied last so it can enforce timeout on the entire request pipeline
180 #[cfg(feature = "middleware")]
181 {
182 if let Some(timeout_config) = stack.timeout_config
183 && timeout_config.enabled
184 {
185 service = tower::util::BoxCloneService::new(
186 tower::ServiceBuilder::new()
187 .layer(tower_http::timeout::TimeoutLayer::new(
188 timeout_config.request_timeout,
189 ))
190 .service(service),
191 );
192 }
193 }
194
195 // Future middleware can be added here with similar if-let blocks:
196 // if let Some(auth_config) = stack.auth_config { ... }
197 // if let Some(audit_config) = stack.audit_config { ... }
198 // if let Some(rate_limit_config) = stack.rate_limit_config { ... }
199
200 service
201 }
202
203 /// Create a new server
204 #[must_use]
205 pub fn new(config: ServerConfig) -> Self {
206 Self::new_with_registry(config, HandlerRegistry::new())
207 }
208
209 /// Create a new server with an existing registry (used by ServerBuilder)
210 #[must_use]
211 pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
212 let registry = Arc::new(registry);
213 let metrics = Arc::new(ServerMetrics::new());
214 let router = Arc::new(RequestRouter::new(
215 Arc::clone(®istry),
216 Arc::clone(&metrics),
217 ));
218 // Build middleware stack configuration
219 #[cfg(feature = "middleware")]
220 #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
221 let mut stack = crate::middleware::MiddlewareStack::new();
222
223 // Auto-install rate limiting if enabled in config
224 #[cfg(feature = "rate-limiting")]
225 if config.rate_limiting.enabled {
226 use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
227 use std::num::NonZeroU32;
228 use std::time::Duration;
229
230 let rate_config = crate::middleware::RateLimitConfig {
231 strategy: RateLimitStrategy::Global,
232 limits: RateLimits {
233 requests_per_period: NonZeroU32::new(
234 config.rate_limiting.requests_per_second * 60,
235 )
236 .unwrap(), // Convert per-second to per-minute
237 period: Duration::from_secs(60),
238 burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
239 },
240 enabled: true,
241 };
242
243 stack = stack.with_rate_limit(rate_config);
244 }
245
246 // Create core MCP service
247 let core_service = McpService::new(
248 Arc::clone(®istry),
249 Arc::clone(&router),
250 Arc::clone(&metrics),
251 );
252
253 // COMPREHENSIVE TOWER SERVICE COMPOSITION
254 // Build the complete middleware stack with proper type erasure
255 //
256 // This service is called from server/transport.rs for EVERY incoming request:
257 // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
258 //
259 // The Tower middleware stack provides:
260 // ✓ Timeout enforcement (configurable per-request)
261 // ✓ Request validation (JSON-RPC structure)
262 // ✓ Authorization checks (resource access control)
263 // ✓ Rate limiting (if enabled in config)
264 // ✓ Audit logging (configurable)
265 // ✓ And more layers as configured
266 //
267 // BoxCloneService is Clone but !Sync - this is the Tower pattern
268 #[cfg(feature = "middleware")]
269 let service = Self::build_middleware_stack(core_service, stack);
270
271 #[cfg(not(feature = "middleware"))]
272 let service = tower::util::BoxCloneService::new(core_service);
273
274 let lifecycle = Arc::new(ServerLifecycle::new());
275
276 Self {
277 config,
278 registry,
279 router,
280 service,
281 lifecycle,
282 metrics,
283 }
284 }
285
286 /// Get server configuration
287 #[must_use]
288 pub const fn config(&self) -> &ServerConfig {
289 &self.config
290 }
291
292 /// Get handler registry
293 #[must_use]
294 pub const fn registry(&self) -> &Arc<HandlerRegistry> {
295 &self.registry
296 }
297
298 /// Get request router
299 #[must_use]
300 pub const fn router(&self) -> &Arc<RequestRouter> {
301 &self.router
302 }
303
304 /// Get server lifecycle
305 #[must_use]
306 pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
307 &self.lifecycle
308 }
309
310 /// Get server metrics
311 #[must_use]
312 pub const fn metrics(&self) -> &Arc<ServerMetrics> {
313 &self.metrics
314 }
315
316 /// Get the Tower service stack (test accessor)
317 ///
318 /// **Note**: This is primarily for integration testing. Production code should
319 /// use the transport layer which calls the service internally via
320 /// `handle_transport_message()`.
321 ///
322 /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
323 /// is designed for cloning).
324 #[doc(hidden)]
325 pub fn service(
326 &self,
327 ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
328 self.service.clone()
329 }
330
331 /// Get a shutdown handle for graceful server termination
332 ///
333 /// This handle enables external control over server shutdown, essential for:
334 /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
335 /// - **Container orchestration**: Kubernetes graceful pod termination
336 /// - **Load balancer integration**: Health check coordination
337 /// - **Multi-component systems**: Coordinated shutdown sequences
338 /// - **Maintenance operations**: Planned downtime and updates
339 ///
340 /// # Examples
341 ///
342 /// ## Basic shutdown coordination
343 /// ```no_run
344 /// # use turbomcp_server::ServerBuilder;
345 /// # #[tokio::main]
346 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
347 /// let server = ServerBuilder::new().build();
348 /// let shutdown_handle = server.shutdown_handle();
349 ///
350 /// // Coordinate with other services
351 /// tokio::spawn(async move {
352 /// // Wait for external shutdown signal
353 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
354 /// println!("Shutdown signal received, terminating gracefully...");
355 /// shutdown_handle.shutdown().await;
356 /// });
357 ///
358 /// // Server will gracefully shut down when signaled
359 /// // server.run_stdio().await?;
360 /// # Ok(())
361 /// # }
362 /// ```
363 ///
364 /// ## Container/Kubernetes deployment
365 /// ```no_run
366 /// # use turbomcp_server::ServerBuilder;
367 /// # use std::sync::Arc;
368 /// # #[tokio::main]
369 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
370 /// let server = ServerBuilder::new().build();
371 /// let shutdown_handle = server.shutdown_handle();
372 /// let shutdown_handle_clone = shutdown_handle.clone();
373 ///
374 /// // Handle multiple signal types with proper platform support
375 /// tokio::spawn(async move {
376 /// #[cfg(unix)]
377 /// {
378 /// use tokio::signal::unix::{signal, SignalKind};
379 /// let mut sigterm = signal(SignalKind::terminate()).unwrap();
380 /// tokio::select! {
381 /// _ = tokio::signal::ctrl_c() => {
382 /// println!("SIGINT received");
383 /// }
384 /// _ = sigterm.recv() => {
385 /// println!("SIGTERM received");
386 /// }
387 /// }
388 /// }
389 /// #[cfg(not(unix))]
390 /// {
391 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
392 /// println!("SIGINT received");
393 /// }
394 /// shutdown_handle_clone.shutdown().await;
395 /// });
396 ///
397 /// // Server handles graceful shutdown automatically
398 /// // server.run_tcp("0.0.0.0:8080").await?;
399 /// # Ok(())
400 /// # }
401 /// ```
402 pub fn shutdown_handle(&self) -> ShutdownHandle {
403 ShutdownHandle::new(self.lifecycle.clone())
404 }
405
406 /// Run the server with STDIO transport
407 ///
408 /// # Errors
409 ///
410 /// Returns [`crate::ServerError::Transport`] if:
411 /// - STDIO transport connection fails
412 /// - Message sending/receiving fails
413 /// - Transport disconnection fails
414 #[tracing::instrument(skip(self), fields(
415 transport = "stdio",
416 service_name = %self.config.name,
417 service_version = %self.config.version
418 ))]
419 pub async fn run_stdio(self) -> ServerResult<()> {
420 // For STDIO transport, disable logging unless explicitly overridden
421 // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
422 if should_log_for_stdio() {
423 info!("Starting MCP server with STDIO transport");
424 }
425
426 // Start performance monitoring for STDIO server
427 let _perf_span = info_span!("server.run", transport = "stdio").entered();
428 info!("Initializing STDIO transport for MCP server");
429
430 self.lifecycle.start().await;
431
432 // Initialize STDIO transport
433 let transport = StdioTransport::new();
434 if let Err(e) = transport.connect().await {
435 if should_log_for_stdio() {
436 tracing::error!(error = %e, "Failed to connect stdio transport");
437 } else {
438 // Critical errors can go to stderr for debugging
439 eprintln!("TurboMCP STDIO transport failed to connect: {}", e);
440 }
441 self.lifecycle.shutdown().await;
442 return Err(e.into());
443 }
444
445 self.run_with_transport_stdio_aware(transport).await
446 }
447
448 /// Get health status
449 pub async fn health(&self) -> HealthStatus {
450 self.lifecycle.health().await
451 }
452
453 /// Run server with HTTP transport using default configuration
454 ///
455 /// This provides a working HTTP server with:
456 /// - Standard HTTP POST/GET/DELETE for MCP protocol at `/mcp`
457 /// - Full MCP 2025-06-18 protocol compliance
458 /// - Graceful shutdown support
459 /// - Default rate limiting (100 req/60s)
460 /// - Default security settings (localhost allowed, CORS disabled)
461 ///
462 /// For custom configuration (rate limits, security, CORS), use `run_http_with_config`.
463 ///
464 /// # Examples
465 ///
466 /// ## Basic usage with default configuration
467 /// ```no_run
468 /// use turbomcp_server::ServerBuilder;
469 ///
470 /// #[tokio::main]
471 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
472 /// let server = ServerBuilder::new()
473 /// .name("my-server")
474 /// .version("1.0.0")
475 /// .build();
476 ///
477 /// server.run_http("127.0.0.1:3000").await?;
478 /// Ok(())
479 /// }
480 /// ```
481 ///
482 /// ## With custom configuration
483 /// ```no_run
484 /// use turbomcp_server::ServerBuilder;
485 /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
486 /// use std::time::Duration;
487 ///
488 /// #[tokio::main]
489 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
490 /// let server = ServerBuilder::new()
491 /// .name("my-server")
492 /// .version("1.0.0")
493 /// .build();
494 ///
495 /// let config = StreamableHttpConfigBuilder::new()
496 /// .without_rate_limit() // For benchmarking
497 /// .allow_any_origin(true) // Enable CORS
498 /// .build();
499 ///
500 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
501 /// Ok(())
502 /// }
503 /// ```
504 ///
505 /// # Errors
506 ///
507 /// Returns [`crate::ServerError::Transport`] if:
508 /// - Address resolution fails
509 /// - HTTP server fails to start
510 /// - Transport disconnection fails
511 #[cfg(feature = "http")]
512 #[tracing::instrument(skip(self), fields(
513 transport = "http",
514 service_name = %self.config.name,
515 service_version = %self.config.version,
516 addr = ?addr
517 ))]
518 pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
519 self,
520 addr: A,
521 ) -> ServerResult<()> {
522 use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
523
524 // Build default configuration
525 let config = StreamableHttpConfigBuilder::new().build();
526
527 self.run_http_with_config(addr, config).await
528 }
529
530 /// Run server with HTTP transport and custom configuration
531 ///
532 /// This provides full control over HTTP server configuration including:
533 /// - Rate limiting (requests per time window, or disabled entirely)
534 /// - Security settings (CORS, origin validation, authentication)
535 /// - Network settings (bind address, endpoint path, keep-alive)
536 /// - Advanced settings (replay buffer size, etc.)
537 ///
538 /// # Examples
539 ///
540 /// ## Benchmarking configuration (no rate limits)
541 /// ```no_run
542 /// use turbomcp_server::ServerBuilder;
543 /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
544 ///
545 /// #[tokio::main]
546 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
547 /// let server = ServerBuilder::new()
548 /// .name("benchmark-server")
549 /// .version("1.0.0")
550 /// .build();
551 ///
552 /// let config = StreamableHttpConfigBuilder::new()
553 /// .without_rate_limit() // Disable rate limiting
554 /// .build();
555 ///
556 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
557 /// Ok(())
558 /// }
559 /// ```
560 ///
561 /// ## Production configuration (secure, rate limited)
562 /// ```no_run
563 /// use turbomcp_server::ServerBuilder;
564 /// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
565 /// use std::time::Duration;
566 ///
567 /// #[tokio::main]
568 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
569 /// let server = ServerBuilder::new()
570 /// .name("production-server")
571 /// .version("1.0.0")
572 /// .build();
573 ///
574 /// let config = StreamableHttpConfigBuilder::new()
575 /// .with_rate_limit(1000, Duration::from_secs(60)) // 1000 req/min
576 /// .allow_any_origin(false) // Strict CORS
577 /// .require_authentication(true) // Require auth
578 /// .build();
579 ///
580 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
581 /// Ok(())
582 /// }
583 /// ```
584 ///
585 /// # Errors
586 ///
587 /// Returns [`crate::ServerError::Transport`] if:
588 /// - Address resolution fails
589 /// - HTTP server fails to start
590 /// - Transport disconnection fails
591 #[cfg(feature = "http")]
592 #[tracing::instrument(skip(self, config), fields(
593 transport = "http",
594 service_name = %self.config.name,
595 service_version = %self.config.version,
596 addr = ?addr
597 ))]
598 pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
599 self,
600 addr: A,
601 config: turbomcp_transport::streamable_http_v2::StreamableHttpConfig,
602 ) -> ServerResult<()> {
603 use turbomcp_transport::streamable_http_v2::run_server;
604
605 info!("Starting MCP server with HTTP transport");
606 info!(
607 config = ?config,
608 "HTTP configuration loaded"
609 );
610
611 self.lifecycle.start().await;
612
613 // Resolve address to string
614 let socket_addr = addr
615 .to_socket_addrs()
616 .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
617 .next()
618 .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
619
620 info!("Resolved address: {}", socket_addr);
621
622 // Use provided config but override bind_addr with resolved address
623 let mut final_config = config;
624 final_config.bind_addr = socket_addr.to_string();
625
626 info!(
627 bind_addr = %final_config.bind_addr,
628 endpoint_path = %final_config.endpoint_path,
629 "HTTP server configuration finalized"
630 );
631
632 // Run HTTP server with the router
633 // The router implements the required handler trait for HTTP transport
634 run_server(final_config, self.router.clone())
635 .await
636 .map_err(|e| {
637 tracing::error!(error = %e, "HTTP server failed");
638 crate::ServerError::handler(e.to_string())
639 })?;
640
641 info!("HTTP server shutdown complete");
642 Ok(())
643 }
644
645 /// Run server with WebSocket transport (full bidirectional support)
646 ///
647 /// This provides a simple API for WebSocket servers with sensible defaults:
648 /// - Default endpoint: `/mcp/ws`
649 /// - Full MCP 2025-06-18 compliance
650 /// - Bidirectional communication
651 /// - Elicitation support
652 /// - Session management and middleware
653 ///
654 /// For custom configuration, use `run_websocket_with_config()`.
655 ///
656 /// # Example
657 ///
658 /// ```no_run
659 /// use turbomcp_server::ServerBuilder;
660 ///
661 /// #[tokio::main]
662 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
663 /// let server = ServerBuilder::new()
664 /// .name("ws-server")
665 /// .version("1.0.0")
666 /// .build();
667 ///
668 /// server.run_websocket("127.0.0.1:8080").await?;
669 /// Ok(())
670 /// }
671 /// ```
672 #[cfg(all(feature = "websocket", feature = "http"))]
673 #[tracing::instrument(skip(self), fields(
674 transport = "websocket",
675 service_name = %self.config.name,
676 service_version = %self.config.version,
677 addr = ?addr
678 ))]
679 pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
680 self,
681 addr: A,
682 ) -> ServerResult<()> {
683 use turbomcp_transport::websocket_server::WebSocketServerConfig;
684
685 // Build default configuration
686 let config = WebSocketServerConfig::default();
687
688 self.run_websocket_with_config(addr, config).await
689 }
690
691 /// Run server with WebSocket transport and custom configuration
692 ///
693 /// This provides full control over WebSocket server configuration including:
694 /// - Custom endpoint path
695 /// - MCP server settings (middleware, security, etc.)
696 ///
697 /// # Example
698 ///
699 /// ```no_run
700 /// use turbomcp_server::ServerBuilder;
701 /// use turbomcp_transport::websocket_server::WebSocketServerConfig;
702 ///
703 /// #[tokio::main]
704 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
705 /// let server = ServerBuilder::new()
706 /// .name("custom-ws-server")
707 /// .version("1.0.0")
708 /// .build();
709 ///
710 /// let config = WebSocketServerConfig {
711 /// bind_addr: "0.0.0.0:8080".to_string(),
712 /// endpoint_path: "/custom/ws".to_string(),
713 /// };
714 ///
715 /// server.run_websocket_with_config("127.0.0.1:8080", config).await?;
716 /// Ok(())
717 /// }
718 /// ```
719 #[cfg(all(feature = "websocket", feature = "http"))]
720 #[tracing::instrument(skip(self, config), fields(
721 transport = "websocket",
722 service_name = %self.config.name,
723 service_version = %self.config.version,
724 addr = ?addr
725 ))]
726 pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
727 self,
728 addr: A,
729 config: turbomcp_transport::websocket_server::WebSocketServerConfig,
730 ) -> ServerResult<()> {
731 use turbomcp_transport::websocket_server::run_websocket_server_with_config;
732
733 info!("Starting MCP server with WebSocket transport");
734 info!(config = ?config, "WebSocket configuration");
735
736 // Resolve address to string
737 let socket_addr = addr
738 .to_socket_addrs()
739 .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
740 .next()
741 .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
742
743 info!("Resolved address: {}", socket_addr);
744
745 // Use provided config but override bind_addr with resolved address
746 let mut final_config = config;
747 final_config.bind_addr = socket_addr.to_string();
748
749 // Run WebSocket server with the router
750 run_websocket_server_with_config(final_config, self.router.clone())
751 .await
752 .map_err(|e| {
753 tracing::error!(error = %e, "WebSocket server failed");
754 crate::ServerError::handler(e.to_string())
755 })?;
756
757 info!("WebSocket server shutdown complete");
758 Ok(())
759 }
760
761 /// Run server with TCP transport (progressive enhancement - runtime configuration)
762 #[cfg(feature = "tcp")]
763 #[tracing::instrument(skip(self), fields(
764 transport = "tcp",
765 service_name = %self.config.name,
766 service_version = %self.config.version,
767 addr = ?addr
768 ))]
769 pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
770 self,
771 addr: A,
772 ) -> ServerResult<()> {
773 use turbomcp_transport::TcpTransport;
774
775 // Start performance monitoring for TCP server
776 let _perf_span = info_span!("server.run", transport = "tcp").entered();
777 info!(?addr, "Starting MCP server with TCP transport");
778
779 self.lifecycle.start().await;
780
781 // Convert ToSocketAddrs to SocketAddr
782 let socket_addr = match addr.to_socket_addrs() {
783 Ok(mut addrs) => match addrs.next() {
784 Some(addr) => addr,
785 None => {
786 tracing::error!("No socket address resolved from provided address");
787 self.lifecycle.shutdown().await;
788 return Err(crate::ServerError::configuration("Invalid socket address"));
789 }
790 },
791 Err(e) => {
792 tracing::error!(error = %e, "Failed to resolve socket address");
793 self.lifecycle.shutdown().await;
794 return Err(crate::ServerError::configuration(format!(
795 "Address resolution failed: {e}"
796 )));
797 }
798 };
799
800 let transport = TcpTransport::new_server(socket_addr);
801 if let Err(e) = transport.connect().await {
802 tracing::error!(error = %e, "Failed to connect TCP transport");
803 self.lifecycle.shutdown().await;
804 return Err(e.into());
805 }
806
807 self.run_with_transport(transport).await
808 }
809
810 /// Run server with Unix socket transport (progressive enhancement - runtime configuration)
811 #[cfg(all(feature = "unix", unix))]
812 #[tracing::instrument(skip(self), fields(
813 transport = "unix",
814 service_name = %self.config.name,
815 service_version = %self.config.version,
816 path = ?path.as_ref()
817 ))]
818 pub async fn run_unix<P: AsRef<std::path::Path>>(self, path: P) -> ServerResult<()> {
819 use std::path::PathBuf;
820 use turbomcp_transport::UnixTransport;
821
822 // Start performance monitoring for Unix server
823 let _perf_span = info_span!("server.run", transport = "unix").entered();
824 info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");
825
826 self.lifecycle.start().await;
827
828 let socket_path = PathBuf::from(path.as_ref());
829 let transport = UnixTransport::new_server(socket_path);
830 if let Err(e) = transport.connect().await {
831 tracing::error!(error = %e, "Failed to connect Unix socket transport");
832 self.lifecycle.shutdown().await;
833 return Err(e.into());
834 }
835
836 self.run_with_transport(transport).await
837 }
838
839 /// Generic transport runner (DRY principle)
840 /// Used by feature-gated transport methods (http, tcp, websocket, unix)
841 #[allow(dead_code)]
842 #[tracing::instrument(skip(self, transport), fields(
843 service_name = %self.config.name,
844 service_version = %self.config.version
845 ))]
846 async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
847 // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
848 let lifecycle_for_sigint = self.lifecycle.clone();
849 tokio::spawn(async move {
850 if let Err(e) = tokio::signal::ctrl_c().await {
851 tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
852 return;
853 }
854 tracing::info!("Ctrl+C received, initiating shutdown");
855 lifecycle_for_sigint.shutdown().await;
856 });
857
858 #[cfg(unix)]
859 {
860 let lifecycle_for_sigterm = self.lifecycle.clone();
861 tokio::spawn(async move {
862 use tokio::signal::unix::{SignalKind, signal};
863 match signal(SignalKind::terminate()) {
864 Ok(mut sigterm) => {
865 sigterm.recv().await;
866 tracing::info!("SIGTERM received, initiating shutdown");
867 lifecycle_for_sigterm.shutdown().await;
868 }
869 Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
870 }
871 });
872 }
873
874 // Shutdown signal
875 let mut shutdown = self.lifecycle.shutdown_signal();
876
877 // Main message processing loop
878 loop {
879 tokio::select! {
880 _ = shutdown.recv() => {
881 tracing::info!("Shutdown signal received");
882 break;
883 }
884 res = transport.receive() => {
885 match res {
886 Ok(Some(message)) => {
887 if let Err(e) = self.handle_transport_message(&mut transport, message).await {
888 tracing::warn!(error = %e, "Failed to handle transport message");
889 }
890 }
891 Ok(None) => {
892 // No message available; sleep briefly to avoid busy loop
893 sleep(Duration::from_millis(5)).await;
894 }
895 Err(e) => {
896 match e {
897 TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
898 tracing::info!("Transport receive channel disconnected; shutting down");
899 break;
900 }
901 _ => {
902 tracing::error!(error = %e, "Transport receive failed");
903 // Backoff on errors
904 sleep(Duration::from_millis(50)).await;
905 }
906 }
907 }
908 }
909 }
910 }
911 }
912
913 // Disconnect transport
914 if let Err(e) = transport.disconnect().await {
915 tracing::warn!(error = %e, "Error while disconnecting transport");
916 }
917
918 tracing::info!("Server shutdown complete");
919 Ok(())
920 }
921
922 /// STDIO-aware transport runner that respects MCP protocol logging requirements
923 #[tracing::instrument(skip(self, transport), fields(
924 service_name = %self.config.name,
925 service_version = %self.config.version,
926 transport = "stdio"
927 ))]
928 async fn run_with_transport_stdio_aware<T: Transport>(
929 &self,
930 mut transport: T,
931 ) -> ServerResult<()> {
932 // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
933 let lifecycle_for_sigint = self.lifecycle.clone();
934 tokio::spawn(async move {
935 if let Err(e) = tokio::signal::ctrl_c().await {
936 if should_log_for_stdio() {
937 tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
938 }
939 return;
940 }
941 if should_log_for_stdio() {
942 tracing::info!("Ctrl+C received, initiating shutdown");
943 }
944 lifecycle_for_sigint.shutdown().await;
945 });
946
947 #[cfg(unix)]
948 {
949 let lifecycle_for_sigterm = self.lifecycle.clone();
950 tokio::spawn(async move {
951 use tokio::signal::unix::{SignalKind, signal};
952 match signal(SignalKind::terminate()) {
953 Ok(mut sigterm) => {
954 sigterm.recv().await;
955 if should_log_for_stdio() {
956 tracing::info!("SIGTERM received, initiating shutdown");
957 }
958 lifecycle_for_sigterm.shutdown().await;
959 }
960 Err(e) => {
961 if should_log_for_stdio() {
962 tracing::warn!(error = %e, "Failed to install SIGTERM handler");
963 }
964 }
965 }
966 });
967 }
968
969 // Shutdown signal
970 let mut shutdown = self.lifecycle.shutdown_signal();
971
972 // Main message processing loop
973 loop {
974 tokio::select! {
975 _ = shutdown.recv() => {
976 if should_log_for_stdio() {
977 tracing::info!("Shutdown signal received");
978 }
979 break;
980 }
981 res = transport.receive() => {
982 match res {
983 Ok(Some(message)) => {
984 if let Err(e) = self.handle_transport_message_stdio_aware(&mut transport, message).await
985 && should_log_for_stdio() {
986 tracing::warn!(error = %e, "Failed to handle transport message");
987 }
988 }
989 Ok(None) => {
990 // No message available; sleep briefly to avoid busy loop
991 sleep(Duration::from_millis(5)).await;
992 }
993 Err(e) => {
994 match e {
995 TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
996 if should_log_for_stdio() {
997 tracing::info!("Transport receive channel disconnected; shutting down");
998 }
999 break;
1000 }
1001 _ => {
1002 if should_log_for_stdio() {
1003 tracing::error!(error = %e, "Transport receive failed");
1004 }
1005 // Backoff on errors
1006 sleep(Duration::from_millis(50)).await;
1007 }
1008 }
1009 }
1010 }
1011 }
1012 }
1013 }
1014
1015 // Disconnect transport
1016 if let Err(e) = transport.disconnect().await
1017 && should_log_for_stdio()
1018 {
1019 tracing::warn!(error = %e, "Error while disconnecting transport");
1020 }
1021
1022 if should_log_for_stdio() {
1023 tracing::info!("Server shutdown complete");
1024 }
1025 Ok(())
1026 }
1027}
1028
1029// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1030// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1031// This is intentional and follows the Axum/Tower design pattern
1032#[allow(dead_code)]
1033const _: () = {
1034 const fn assert_send_clone<T: Send + Clone>() {}
1035 const fn check() {
1036 assert_send_clone::<crate::server::core::McpServer>();
1037 }
1038};