turbomcp_server/server/core.rs
1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9use crate::{
10 config::ServerConfig,
11 error::ServerResult,
12 lifecycle::{HealthStatus, ServerLifecycle},
13 metrics::ServerMetrics,
14 registry::HandlerRegistry,
15 routing::RequestRouter,
16 service::McpService,
17};
18
19#[cfg(feature = "middleware")]
20use crate::middleware::MiddlewareStack;
21
22use bytes::Bytes;
23use http::{Request, Response};
24use tokio::time::{Duration, sleep};
25use turbomcp_transport::Transport;
26use turbomcp_transport::core::TransportError;
27
28use super::shutdown::ShutdownHandle;
29
/// Decide whether log output is permitted while serving over the STDIO transport.
///
/// For MCP STDIO compliance, stdout is reserved for JSON-RPC messages, so logging is
/// off by default. Defining the `TURBOMCP_FORCE_LOGGING` environment variable turns it
/// back on. Only the variable's presence matters: any value enables logging, including
/// an empty one or one that is not valid Unicode.
///
/// # Returns
///
/// `true` when `TURBOMCP_FORCE_LOGGING` is set in the process environment.
pub(crate) fn should_log_for_stdio() -> bool {
    // `var_os` is a pure presence check. `var` would yield `Err` (and therefore `false`)
    // for a variable that is set but holds non-Unicode bytes.
    std::env::var_os("TURBOMCP_FORCE_LOGGING").is_some()
}
38
39/// Main MCP server following the Axum/Tower Clone pattern
40///
41/// ## Sharing Pattern
42///
43/// `McpServer` implements `Clone` like Axum's `Router`. All heavy state is Arc-wrapped
44/// internally, making cloning cheap (just atomic reference count increments).
45///
46/// ```rust,no_run
47/// use turbomcp_server::ServerBuilder;
48///
49/// # async fn example() {
50/// let server = ServerBuilder::new().build();
51///
52/// // Clone for passing to functions (cheap - just Arc increments)
53/// let server1 = server.clone();
54/// let server2 = server.clone();
55///
56/// // Access config and health
57/// let config = server1.config();
58/// println!("Server: {}", config.name);
59///
60/// let health = server2.health().await;
61/// println!("Health: {:?}", health);
62/// # }
63/// ```
64///
65/// ## Architecture Notes
66///
67/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
68/// This is intentional and follows Tower's design - users clone the server instead of
69/// Arc-wrapping it.
70///
71/// **Architecture Note**: The service field provides tower::Service integration for
72/// advanced middleware patterns. The request processing pipeline currently uses the
73/// RequestRouter directly. Tower integration can be added via custom middleware layers
74/// when needed for specific use cases (e.g., custom rate limiting, advanced tracing).
75#[derive(Clone)]
76pub struct McpServer {
77 /// Server configuration (Clone-able)
78 pub(crate) config: ServerConfig,
79 /// Handler registry (Arc-wrapped for cheap cloning)
80 pub(crate) registry: Arc<HandlerRegistry>,
81 /// Request router (Arc-wrapped for cheap cloning)
82 pub(crate) router: Arc<RequestRouter>,
83 /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
84 ///
85 /// All requests flow through this service stack, which provides:
86 /// - Timeout enforcement
87 /// - Request validation
88 /// - Authorization checks
89 /// - Rate limiting
90 /// - Audit logging
91 /// - And more middleware layers as configured
92 ///
93 /// See `server/transport.rs` for integration with transport layer.
94 pub(crate) service:
95 tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
96 /// Server lifecycle (Arc-wrapped for cheap cloning)
97 pub(crate) lifecycle: Arc<ServerLifecycle>,
98 /// Server metrics (Arc-wrapped for cheap cloning)
99 pub(crate) metrics: Arc<ServerMetrics>,
100}
101
102impl std::fmt::Debug for McpServer {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct("McpServer")
105 .field("config", &self.config)
106 .finish()
107 }
108}
109
110impl McpServer {
/// Wrap the core MCP service in the Tower layers enabled on `stack`.
///
/// ## Layer order
///
/// Layers are applied innermost-first and the result is re-boxed after every step, so
/// the pipeline keeps one concrete type no matter which layers are active:
///
/// 1. **Core service** - `core_service`, boxed unchanged.
/// 2. **Validation** - added when `stack.validation_layer()` returns a layer.
/// 3. **Timeout** (outermost) - added when `stack.timeout_config` is present and its
///    `enabled` flag is set, using `request_timeout`. Being outermost, it bounds the
///    whole pipeline.
///
/// No other layers are installed by this function. The authorization layer was removed
/// in 2.0.0 and is expected to be handled at the application layer.
///
/// ## Adding middleware
///
/// Append another conditional wrapping step before the final return, following the
/// same "wrap, then re-box" shape.
///
/// ## Returns
///
/// The composed `BoxCloneService`.
#[cfg(feature = "middleware")]
fn build_middleware_stack(
    core_service: McpService,
    stack: MiddlewareStack,
) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
    // Single erased type shared by every stage. Re-boxing per layer costs an allocation
    // each time but avoids one match arm per combination of enabled layers.
    type Pipeline =
        tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>;

    let mut pipeline: Pipeline = tower::util::BoxCloneService::new(core_service);

    // Validation sits directly around the core service.
    if let Some(validation_layer) = stack.validation_layer() {
        let validated = tower::ServiceBuilder::new()
            .layer(validation_layer)
            .service(pipeline);
        pipeline = tower::util::BoxCloneService::new(validated);
    }

    // Timeout goes on last so it covers everything wrapped so far.
    match stack.timeout_config {
        Some(timeout_config) if timeout_config.enabled => {
            let bounded = tower::ServiceBuilder::new()
                .layer(tower_http::timeout::TimeoutLayer::new(
                    timeout_config.request_timeout,
                ))
                .service(pipeline);
            pipeline = tower::util::BoxCloneService::new(bounded);
        }
        _ => {}
    }

    pipeline
}
202
203 /// Create a new server
204 #[must_use]
205 pub fn new(config: ServerConfig) -> Self {
206 Self::new_with_registry(config, HandlerRegistry::new())
207 }
208
209 /// Create a new server with an existing registry (used by ServerBuilder)
210 #[must_use]
211 pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
212 let registry = Arc::new(registry);
213 let metrics = Arc::new(ServerMetrics::new());
214 let router = Arc::new(RequestRouter::new(
215 Arc::clone(®istry),
216 Arc::clone(&metrics),
217 config.clone(),
218 ));
219 // Build middleware stack configuration
220 #[cfg(feature = "middleware")]
221 #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
222 let mut stack = crate::middleware::MiddlewareStack::new();
223
224 // Auto-install rate limiting if enabled in config
225 #[cfg(feature = "rate-limiting")]
226 if config.rate_limiting.enabled {
227 use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
228 use std::num::NonZeroU32;
229 use std::time::Duration;
230
231 let rate_config = crate::middleware::RateLimitConfig {
232 strategy: RateLimitStrategy::Global,
233 limits: RateLimits {
234 requests_per_period: NonZeroU32::new(
235 config.rate_limiting.requests_per_second * 60,
236 )
237 .unwrap(), // Convert per-second to per-minute
238 period: Duration::from_secs(60),
239 burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
240 },
241 enabled: true,
242 };
243
244 stack = stack.with_rate_limit(rate_config);
245 }
246
247 // Create core MCP service
248 let core_service = McpService::new(
249 Arc::clone(®istry),
250 Arc::clone(&router),
251 Arc::clone(&metrics),
252 );
253
254 // COMPREHENSIVE TOWER SERVICE COMPOSITION
255 // Build the complete middleware stack with proper type erasure
256 //
257 // This service is called from server/transport.rs for EVERY incoming request:
258 // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
259 //
260 // The Tower middleware stack provides:
261 // ✓ Timeout enforcement (configurable per-request)
262 // ✓ Request validation (JSON-RPC structure)
263 // ✓ Authorization checks (resource access control)
264 // ✓ Rate limiting (if enabled in config)
265 // ✓ Audit logging (configurable)
266 // ✓ And more layers as configured
267 //
268 // BoxCloneService is Clone but !Sync - this is the Tower pattern
269 #[cfg(feature = "middleware")]
270 let service = Self::build_middleware_stack(core_service, stack);
271
272 #[cfg(not(feature = "middleware"))]
273 let service = tower::util::BoxCloneService::new(core_service);
274
275 let lifecycle = Arc::new(ServerLifecycle::new());
276
277 Self {
278 config,
279 registry,
280 router,
281 service,
282 lifecycle,
283 metrics,
284 }
285 }
286
287 /// Get server configuration
288 #[must_use]
289 pub const fn config(&self) -> &ServerConfig {
290 &self.config
291 }
292
293 /// Get handler registry
294 #[must_use]
295 pub const fn registry(&self) -> &Arc<HandlerRegistry> {
296 &self.registry
297 }
298
299 /// Get request router
300 #[must_use]
301 pub const fn router(&self) -> &Arc<RequestRouter> {
302 &self.router
303 }
304
305 /// Get server lifecycle
306 #[must_use]
307 pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
308 &self.lifecycle
309 }
310
311 /// Get server metrics
312 #[must_use]
313 pub const fn metrics(&self) -> &Arc<ServerMetrics> {
314 &self.metrics
315 }
316
317 /// Get the Tower service stack (test accessor)
318 ///
319 /// **Note**: This is primarily for integration testing. Production code should
320 /// use the transport layer which calls the service internally via
321 /// `handle_transport_message()`.
322 ///
323 /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
324 /// is designed for cloning).
325 #[doc(hidden)]
326 pub fn service(
327 &self,
328 ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
329 self.service.clone()
330 }
331
332 /// Get a shutdown handle for graceful server termination
333 ///
334 /// This handle enables external control over server shutdown, essential for:
335 /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
336 /// - **Container orchestration**: Kubernetes graceful pod termination
337 /// - **Load balancer integration**: Health check coordination
338 /// - **Multi-component systems**: Coordinated shutdown sequences
339 /// - **Maintenance operations**: Planned downtime and updates
340 ///
341 /// # Examples
342 ///
343 /// ## Basic shutdown coordination
344 /// ```no_run
345 /// # use turbomcp_server::ServerBuilder;
346 /// # #[tokio::main]
347 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
348 /// let server = ServerBuilder::new().build();
349 /// let shutdown_handle = server.shutdown_handle();
350 ///
351 /// // Coordinate with other services
352 /// tokio::spawn(async move {
353 /// // Wait for external shutdown signal
354 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
355 /// println!("Shutdown signal received, terminating gracefully...");
356 /// shutdown_handle.shutdown().await;
357 /// });
358 ///
359 /// // Server will gracefully shut down when signaled
360 /// // server.run_stdio().await?;
361 /// # Ok(())
362 /// # }
363 /// ```
364 ///
365 /// ## Container/Kubernetes deployment
366 /// ```no_run
367 /// # use turbomcp_server::ServerBuilder;
368 /// # use std::sync::Arc;
369 /// # #[tokio::main]
370 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
371 /// let server = ServerBuilder::new().build();
372 /// let shutdown_handle = server.shutdown_handle();
373 /// let shutdown_handle_clone = shutdown_handle.clone();
374 ///
375 /// // Handle multiple signal types with proper platform support
376 /// tokio::spawn(async move {
377 /// #[cfg(unix)]
378 /// {
379 /// use tokio::signal::unix::{signal, SignalKind};
380 /// let mut sigterm = signal(SignalKind::terminate()).unwrap();
381 /// tokio::select! {
382 /// _ = tokio::signal::ctrl_c() => {
383 /// println!("SIGINT received");
384 /// }
385 /// _ = sigterm.recv() => {
386 /// println!("SIGTERM received");
387 /// }
388 /// }
389 /// }
390 /// #[cfg(not(unix))]
391 /// {
392 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
393 /// println!("SIGINT received");
394 /// }
395 /// shutdown_handle_clone.shutdown().await;
396 /// });
397 ///
398 /// // Server handles graceful shutdown automatically
399 /// // server.run_tcp("0.0.0.0:8080").await?;
400 /// # Ok(())
401 /// # }
402 /// ```
403 pub fn shutdown_handle(&self) -> ShutdownHandle {
404 ShutdownHandle::new(self.lifecycle.clone())
405 }
406
407 /// Run the server with STDIO transport
408 ///
409 /// # Errors
410 ///
411 /// Returns [`crate::ServerError::Transport`] if:
412 /// - STDIO transport connection fails
413 /// - Message sending/receiving fails
414 /// - Transport disconnection fails
415 #[tracing::instrument(skip(self), fields(
416 transport = "stdio",
417 service_name = %self.config.name,
418 service_version = %self.config.version
419 ))]
420 pub async fn run_stdio(mut self) -> ServerResult<()> {
421 // For STDIO transport, disable logging unless explicitly overridden
422 // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
423 if should_log_for_stdio() {
424 info!("Starting MCP server with STDIO transport");
425 }
426
427 // Start performance monitoring for STDIO server
428 let _perf_span = info_span!("server.run", transport = "stdio").entered();
429 info!("Initializing STDIO transport for MCP server");
430
431 self.lifecycle.start().await;
432
433 // BIDIRECTIONAL STDIO SETUP
434 // Create STDIO dispatcher for server-initiated requests (sampling, elicitation, roots, ping)
435 let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
436
437 // Use fully-qualified path to avoid ambiguity with the turbomcp crate's runtime module
438 let dispatcher = crate::runtime::StdioDispatcher::new(request_tx);
439
440 // Configure router's bidirectional support with the STDIO dispatcher
441 // SAFETY: We have &mut self, so we can safely get mutable access to the Arc'd router
442 // This is the CRITICAL STEP that was missing - without this, all server→client requests fail
443 let router = Arc::make_mut(&mut self.router);
444 router.set_server_request_dispatcher(dispatcher.clone());
445
446 // Run STDIO with full bidirectional support (MCP 2025-06-18 compliant)
447 // This uses the bidirectional-aware runtime that handles both:
448 // - Client→Server requests (tools, resources, prompts)
449 // - Server→Client requests (sampling, elicitation, roots, ping)
450 crate::runtime::run_stdio_bidirectional(self.router.clone(), dispatcher, request_rx)
451 .await
452 .map_err(|e| crate::ServerError::Handler {
453 message: format!("STDIO bidirectional runtime failed: {}", e),
454 context: Some("run_stdio".to_string()),
455 })
456 }
457
458 /// Get health status
459 pub async fn health(&self) -> HealthStatus {
460 self.lifecycle.health().await
461 }
462
/// Run the server over HTTP using the transport's default configuration.
///
/// This is a thin wrapper: it builds `StreamableHttpConfigBuilder::new().build()` and
/// forwards to `run_http_with_config`. According to the existing documentation those
/// defaults provide (verify against `StreamableHttpConfigBuilder`):
/// - Standard HTTP POST/GET/DELETE for the MCP protocol at `/mcp`
/// - Full MCP 2025-06-18 protocol compliance
/// - Graceful shutdown support
/// - Default rate limiting (100 req/60s)
/// - Default security settings (localhost allowed, CORS disabled)
///
/// For custom rate limits, security or CORS settings use `run_http_with_config`.
///
/// # Examples
///
/// ## Basic usage with default configuration
/// ```no_run
/// use turbomcp_server::ServerBuilder;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("my-server")
///         .version("1.0.0")
///         .build();
///
///     server.run_http("127.0.0.1:3000").await?;
///     Ok(())
/// }
/// ```
///
/// ## With custom configuration
/// ```no_run
/// use turbomcp_server::ServerBuilder;
/// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("my-server")
///         .version("1.0.0")
///         .build();
///
///     let config = StreamableHttpConfigBuilder::new()
///         .without_rate_limit()   // For benchmarking
///         .allow_any_origin(true) // Enable CORS
///         .build();
///
///     server.run_http_with_config("127.0.0.1:3000", config).await?;
///     Ok(())
/// }
/// ```
///
/// # Errors
///
/// Propagates whatever `run_http_with_config` returns, i.e. an error when the address
/// cannot be resolved or the HTTP server fails.
#[cfg(feature = "http")]
#[tracing::instrument(skip(self), fields(
    transport = "http",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    self,
    addr: A,
) -> ServerResult<()> {
    // Stock configuration straight from the transport crate's builder.
    let default_config =
        turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder::new().build();

    self.run_http_with_config(addr, default_config).await
}
539
/// Run the server over HTTP with a caller-supplied transport configuration.
///
/// The configuration controls (per the transport crate's builder):
/// - Rate limiting (requests per time window, or disabled entirely)
/// - Security settings (CORS, origin validation, authentication)
/// - Network settings (bind address, endpoint path, keep-alive)
/// - Advanced settings (replay buffer size, etc.)
///
/// What this method does with it: start the lifecycle, resolve `addr` to its first
/// socket address, create the shared session and pending-request maps, and start the
/// HTTP runtime with a factory that builds one router per session. Each session router
/// is a clone of the server's router with its own `HttpDispatcher`. Only
/// `config.endpoint_path` is forwarded to the runtime from here; the bind address
/// comes from `addr`.
///
/// # Examples
///
/// ## Benchmarking configuration (no rate limits)
/// ```no_run
/// use turbomcp_server::ServerBuilder;
/// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("benchmark-server")
///         .version("1.0.0")
///         .build();
///
///     let config = StreamableHttpConfigBuilder::new()
///         .without_rate_limit() // Disable rate limiting
///         .build();
///
///     server.run_http_with_config("127.0.0.1:3000", config).await?;
///     Ok(())
/// }
/// ```
///
/// ## Production configuration (secure, rate limited)
/// ```no_run
/// use turbomcp_server::ServerBuilder;
/// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("production-server")
///         .version("1.0.0")
///         .build();
///
///     let config = StreamableHttpConfigBuilder::new()
///         .with_rate_limit(1000, Duration::from_secs(60)) // 1000 req/min
///         .allow_any_origin(false)                        // Strict CORS
///         .require_authentication(true)                   // Require auth
///         .build();
///
///     server.run_http_with_config("127.0.0.1:3000", config).await?;
///     Ok(())
/// }
/// ```
///
/// # Errors
///
/// - A configuration error (`ServerError::configuration`) when `addr` cannot be
///   resolved or resolves to no address. The lifecycle is shut down again before
///   returning, so shared handles do not keep reporting a running server.
/// - A handler error (`ServerError::handler`) carrying the runtime's message when the
///   HTTP server fails.
#[cfg(feature = "http")]
#[tracing::instrument(skip(self, config), fields(
    transport = "http",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    self,
    addr: A,
    config: turbomcp_transport::streamable_http_v2::StreamableHttpConfig,
) -> ServerResult<()> {
    use crate::runtime::http::run_http;
    use std::collections::HashMap;
    use tokio::sync::{Mutex, RwLock};

    info!("Starting MCP server with HTTP transport");
    info!(
        config = ?config,
        "HTTP configuration loaded"
    );

    self.lifecycle.start().await;

    // Resolve synchronously into a plain `Result` so the address iterator is dropped
    // before any `.await` below.
    let resolution = addr
        .to_socket_addrs()
        .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))
        .and_then(|mut candidates| {
            candidates
                .next()
                .ok_or_else(|| crate::ServerError::configuration("No address resolved"))
        });

    let socket_addr = match resolution {
        Ok(resolved) => resolved,
        Err(config_err) => {
            // The lifecycle was already started; undo that before bailing out, the
            // same way `run_tcp` does on its start-up failures.
            self.lifecycle.shutdown().await;
            return Err(config_err);
        }
    };

    info!("Resolved address: {}", socket_addr);

    // BIDIRECTIONAL HTTP SETUP
    // Shared state for session management and server-initiated requests.
    let sessions = Arc::new(RwLock::new(HashMap::new()));
    let pending_requests = Arc::new(Mutex::new(HashMap::new()));

    // Handles moved into the factory; the originals are passed to the runtime below.
    let sessions_for_factory = Arc::clone(&sessions);
    let pending_for_factory = Arc::clone(&pending_requests);
    let router_for_factory = Arc::clone(&self.router);

    // Factory pattern: each session gets its own dispatcher while the routing logic
    // and handler registry come from the shared base router.
    let handler_factory = move |session_id: Option<String>| {
        let session_id = session_id.unwrap_or_else(|| {
            let new_id = uuid::Uuid::new_v4().to_string();
            tracing::warn!(
                "⚠️ Factory generating random session ID (no session ID provided): {}",
                new_id
            );
            new_id
        });

        tracing::debug!("Factory creating handler for session: {}", session_id);

        // Session-specific HTTP dispatcher.
        let dispatcher = crate::runtime::http::HttpDispatcher::new(
            session_id,
            Arc::clone(&sessions_for_factory),
            Arc::clone(&pending_for_factory),
        );

        // Clone the base router and bind it to this session's dispatcher.
        // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
        let mut session_router = (*router_for_factory).clone();
        session_router.set_server_request_dispatcher(dispatcher);

        session_router
    };

    info!(
        server_name = %self.config.name,
        server_version = %self.config.version,
        bind_addr = %socket_addr,
        endpoint_path = %config.endpoint_path,
        "HTTP server starting with full bidirectional support (elicitation, sampling, roots, ping)"
    );

    // Factory-based HTTP server with full bidirectional support.
    run_http(
        handler_factory,
        sessions,
        pending_requests,
        socket_addr.to_string(),
        config.endpoint_path.clone(),
    )
    .await
    .map_err(|e| {
        tracing::error!(error = %e, "HTTP server failed");
        crate::ServerError::handler(e.to_string())
    })?;

    info!("HTTP server shutdown complete");
    Ok(())
}
707
/// Run the server over WebSocket using `WebSocketServerConfig::default()`.
///
/// This is a thin wrapper around `run_websocket_with_config`. According to the
/// existing documentation the defaults provide (verify against
/// `WebSocketServerConfig`):
/// - Default endpoint: `/mcp/ws`
/// - Full MCP 2025-06-18 compliance
/// - Bidirectional communication
/// - Elicitation support
/// - Session management and middleware
///
/// For custom configuration, use `run_websocket_with_config()`.
///
/// # Example
///
/// ```no_run
/// use turbomcp_server::ServerBuilder;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("ws-server")
///         .version("1.0.0")
///         .build();
///
///     server.run_websocket("127.0.0.1:8080").await?;
///     Ok(())
/// }
/// ```
///
/// # Errors
///
/// Propagates whatever `run_websocket_with_config` returns.
#[cfg(feature = "websocket")]
#[tracing::instrument(skip(self), fields(
    transport = "websocket",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    self,
    addr: A,
) -> ServerResult<()> {
    // Stock configuration; the bind address is taken from `addr` by the callee.
    let default_config = crate::runtime::websocket::WebSocketServerConfig::default();

    self.run_websocket_with_config(addr, default_config).await
}
753
/// Run the server over WebSocket with a caller-supplied configuration.
///
/// `config.endpoint_path` and `config.max_concurrent_requests` are honoured as given.
/// `config.bind_addr` is **replaced** by the first socket address that `addr` resolves
/// to, so the `addr` argument decides where the server listens.
///
/// Every connection receives its own copy of the server's router, bound to that
/// connection's `WebSocketServerDispatcher`, so server-initiated requests
/// (elicitation, sampling, roots, ping) go to the right peer.
///
/// # Example
///
/// ```no_run
/// use turbomcp_server::{ServerBuilder, WebSocketServerConfig};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("custom-ws-server")
///         .version("1.0.0")
///         .build();
///
///     let config = WebSocketServerConfig {
///         endpoint_path: "/custom/ws".to_string(),
///         // `bind_addr` is overwritten with the resolved `addr` argument.
///         ..Default::default()
///     };
///
///     server.run_websocket_with_config("127.0.0.1:8080", config).await?;
///     Ok(())
/// }
/// ```
///
/// # Errors
///
/// - A configuration error (`ServerError::configuration`) when `addr` cannot be
///   resolved or resolves to no address. The lifecycle is shut down again before
///   returning.
/// - A handler error (`ServerError::handler`) carrying the runtime's message when the
///   WebSocket server fails.
#[cfg(feature = "websocket")]
#[tracing::instrument(skip(self, config), fields(
    transport = "websocket",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    self,
    addr: A,
    config: crate::runtime::websocket::WebSocketServerConfig,
) -> ServerResult<()> {
    use crate::runtime::websocket::run_websocket;

    info!("Starting MCP server with WebSocket transport");
    info!(config = ?config, "WebSocket configuration");

    self.lifecycle.start().await;

    // Resolve synchronously into a plain `Result` so the address iterator is dropped
    // before any `.await` below.
    let resolution = addr
        .to_socket_addrs()
        .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))
        .and_then(|mut candidates| {
            candidates
                .next()
                .ok_or_else(|| crate::ServerError::configuration("No address resolved"))
        });

    let socket_addr = match resolution {
        Ok(resolved) => resolved,
        Err(config_err) => {
            // The lifecycle was already started; undo that before bailing out, the
            // same way `run_tcp` does on its start-up failures.
            self.lifecycle.shutdown().await;
            return Err(config_err);
        }
    };

    info!("Resolved address: {}", socket_addr);

    // Base router handed to the runtime, which passes it to the factory below.
    let router = (*self.router).clone();

    // Wrapper factory: bind the router to the connection's dispatcher.
    // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
    let wrapper_factory =
        move |mut base_router: crate::routing::RequestRouter,
              dispatcher: crate::runtime::websocket::WebSocketServerDispatcher| {
            base_router.set_server_request_dispatcher(dispatcher);
            base_router
        };

    info!(
        server_name = %self.config.name,
        server_version = %self.config.version,
        bind_addr = %socket_addr,
        endpoint_path = %config.endpoint_path,
        "WebSocket server starting with full bidirectional support (elicitation, sampling, roots, ping)"
    );

    // Same settings as supplied, with the bind address replaced by the resolved one.
    let ws_config = crate::runtime::websocket::WebSocketServerConfig {
        bind_addr: socket_addr.to_string(),
        endpoint_path: config.endpoint_path.clone(),
        max_concurrent_requests: config.max_concurrent_requests,
    };

    run_websocket(router, wrapper_factory, ws_config)
        .await
        .map_err(|e| {
            tracing::error!(error = %e, "WebSocket server failed");
            crate::ServerError::handler(e.to_string())
        })?;

    info!("WebSocket server shutdown complete");
    Ok(())
}
856
/// Run the server over the TCP transport with bidirectional support.
///
/// Steps: start the lifecycle, resolve `addr` to its first socket address, create and
/// connect a server-side `TcpTransport`, install a `TransportDispatcher` on the router
/// (this is what enables `ctx.elicit()`, `ctx.create_message()`, `ctx.list_roots()`,
/// etc.), then run the generic bidirectional transport runtime until it finishes.
///
/// # Errors
///
/// - A configuration error when `addr` cannot be resolved or resolves to no address.
/// - The converted transport error when `TcpTransport::connect` fails.
/// - [`crate::ServerError::Handler`] (context `"run_tcp"`) when the bidirectional
///   runtime reports a failure.
///
/// In the first two cases the lifecycle is shut down again before returning.
#[cfg(feature = "tcp")]
#[tracing::instrument(skip(self), fields(
    transport = "tcp",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    mut self,
    addr: A,
) -> ServerResult<()> {
    use turbomcp_transport::TcpTransport;

    // Attach the span to the future instead of holding an `entered()` guard across
    // `.await` points: such a guard produces incorrect traces in async code and makes
    // the returned future `!Send`.
    let run_span = info_span!("server.run", transport = "tcp");
    let serve = async move {
        info!(?addr, "Starting MCP server with TCP transport");

        self.lifecycle.start().await;

        // Take the first candidate right away so the address iterator is not kept
        // alive across the `.await`s in the failure arms.
        let resolution = addr
            .to_socket_addrs()
            .map(|mut candidates| candidates.next());

        let socket_addr = match resolution {
            Ok(Some(resolved)) => resolved,
            Ok(None) => {
                tracing::error!("No socket address resolved from provided address");
                self.lifecycle.shutdown().await;
                return Err(crate::ServerError::configuration("Invalid socket address"));
            }
            Err(e) => {
                tracing::error!(error = %e, "Failed to resolve socket address");
                self.lifecycle.shutdown().await;
                return Err(crate::ServerError::configuration(format!(
                    "Address resolution failed: {e}"
                )));
            }
        };

        let transport = TcpTransport::new_server(socket_addr);
        if let Err(e) = transport.connect().await {
            tracing::error!(error = %e, "Failed to connect TCP transport");
            self.lifecycle.shutdown().await;
            return Err(crate::ServerError::from(e));
        }

        // BIDIRECTIONAL TCP SETUP
        // Generic transport dispatcher for server-initiated requests.
        let dispatcher = crate::runtime::TransportDispatcher::new(transport);

        // `Arc::make_mut` clones the router first when the `Arc` is shared, so the
        // dispatcher is installed on this server's own router instance.
        let router = Arc::make_mut(&mut self.router);
        router.set_server_request_dispatcher(dispatcher.clone());

        // Serve both directions:
        // - Client→Server requests (tools, resources, prompts)
        // - Server→Client requests (sampling, elicitation, roots, ping)
        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
            .await
            .map_err(|e| crate::ServerError::Handler {
                message: format!("TCP bidirectional runtime failed: {}", e),
                context: Some("run_tcp".to_string()),
            })
    };

    tracing::Instrument::instrument(serve, run_span).await
}
923
/// Run server with Unix socket transport (progressive enhancement - runtime configuration)
///
/// Creates a `UnixTransport` server for `path`, connects it, registers it as the
/// router's server-request dispatcher and then runs the bidirectional runtime
/// until it returns.
///
/// # Errors
///
/// - The converted transport error when connecting the Unix socket transport
///   fails; the lifecycle is shut down before returning.
/// - `ServerError::Handler` (context `"run_unix"`) when the bidirectional runtime fails.
#[cfg(all(feature = "unix", unix))]
#[tracing::instrument(skip(self), fields(
    transport = "unix",
    service_name = %self.config.name,
    service_version = %self.config.version,
    path = ?path.as_ref()
))]
pub async fn run_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> ServerResult<()> {
    use std::path::PathBuf;
    use tracing::Instrument;
    use turbomcp_transport::UnixTransport;

    // Performance-monitoring span for the Unix server. It is attached with
    // `Instrument` instead of an `.entered()` guard: a guard held across `.await`
    // stays entered on the worker thread while this task is suspended, which
    // attributes unrelated events to this span, and the guard is not `Send`.
    let run_span = info_span!("server.run", transport = "unix");

    async move {
        info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");

        self.lifecycle.start().await;

        let socket_path = PathBuf::from(path.as_ref());
        let transport = UnixTransport::new_server(socket_path);
        if let Err(e) = transport.connect().await {
            tracing::error!(error = %e, "Failed to connect Unix socket transport");
            self.lifecycle.shutdown().await;
            return Err(e.into());
        }

        // BIDIRECTIONAL UNIX SOCKET SETUP
        // Create generic transport dispatcher for server-initiated requests
        let dispatcher = crate::runtime::TransportDispatcher::new(transport);

        // Configure router's bidirectional support with the Unix socket dispatcher
        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
        let router = Arc::make_mut(&mut self.router);
        router.set_server_request_dispatcher(dispatcher.clone());

        // Run Unix Socket with full bidirectional support (MCP 2025-06-18 compliant)
        // This uses the generic bidirectional runtime that handles both:
        // - Client→Server requests (tools, resources, prompts)
        // - Server→Client requests (sampling, elicitation, roots, ping)
        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
            .await
            .map_err(|e| crate::ServerError::Handler {
                message: format!("Unix socket bidirectional runtime failed: {}", e),
                context: Some("run_unix".to_string()),
            })
    }
    .instrument(run_span)
    .await
}
970
971 /// Generic transport runner (DRY principle)
972 /// Used by feature-gated transport methods (http, tcp, websocket, unix)
973 #[allow(dead_code)]
974 #[tracing::instrument(skip(self, transport), fields(
975 service_name = %self.config.name,
976 service_version = %self.config.version
977 ))]
978 async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
979 // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
980 let lifecycle_for_sigint = self.lifecycle.clone();
981 tokio::spawn(async move {
982 if let Err(e) = tokio::signal::ctrl_c().await {
983 tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
984 return;
985 }
986 tracing::info!("Ctrl+C received, initiating shutdown");
987 lifecycle_for_sigint.shutdown().await;
988 });
989
990 #[cfg(unix)]
991 {
992 let lifecycle_for_sigterm = self.lifecycle.clone();
993 tokio::spawn(async move {
994 use tokio::signal::unix::{SignalKind, signal};
995 match signal(SignalKind::terminate()) {
996 Ok(mut sigterm) => {
997 sigterm.recv().await;
998 tracing::info!("SIGTERM received, initiating shutdown");
999 lifecycle_for_sigterm.shutdown().await;
1000 }
1001 Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
1002 }
1003 });
1004 }
1005
1006 // Shutdown signal
1007 let mut shutdown = self.lifecycle.shutdown_signal();
1008
1009 // Main message processing loop
1010 loop {
1011 tokio::select! {
1012 _ = shutdown.recv() => {
1013 tracing::info!("Shutdown signal received");
1014 break;
1015 }
1016 res = transport.receive() => {
1017 match res {
1018 Ok(Some(message)) => {
1019 if let Err(e) = self.handle_transport_message(&mut transport, message).await {
1020 tracing::warn!(error = %e, "Failed to handle transport message");
1021 }
1022 }
1023 Ok(None) => {
1024 // No message available; sleep briefly to avoid busy loop
1025 sleep(Duration::from_millis(5)).await;
1026 }
1027 Err(e) => {
1028 match e {
1029 TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
1030 tracing::info!("Transport receive channel disconnected; shutting down");
1031 break;
1032 }
1033 _ => {
1034 tracing::error!(error = %e, "Transport receive failed");
1035 // Backoff on errors
1036 sleep(Duration::from_millis(50)).await;
1037 }
1038 }
1039 }
1040 }
1041 }
1042 }
1043 }
1044
1045 // Disconnect transport
1046 if let Err(e) = transport.disconnect().await {
1047 tracing::warn!(error = %e, "Error while disconnecting transport");
1048 }
1049
1050 tracing::info!("Server shutdown complete");
1051 Ok(())
1052 }
1053}
1054
1055// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1056// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1057// This is intentional and follows the Axum/Tower design pattern
1058#[allow(dead_code)]
1059const _: () = {
1060 const fn assert_send_clone<T: Send + Clone>() {}
1061 const fn check() {
1062 assert_send_clone::<crate::server::core::McpServer>();
1063 }
1064};