turbomcp_server/server/core.rs
1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9#[cfg(feature = "http")]
10use tracing::warn;
11
12use crate::{
13 config::ServerConfig,
14 error::ServerResult,
15 lifecycle::{HealthStatus, ServerLifecycle},
16 metrics::ServerMetrics,
17 registry::HandlerRegistry,
18 routing::RequestRouter,
19 service::McpService,
20};
21
22#[cfg(feature = "middleware")]
23use crate::middleware::MiddlewareStack;
24
25use bytes::Bytes;
26use http::{Request, Response};
27use tokio::time::{Duration, sleep};
28use turbomcp_transport::Transport;
29use turbomcp_transport::core::TransportError;
30
31use super::shutdown::ShutdownHandle;
32
/// Decide whether log output is permitted while serving over STDIO.
///
/// The MCP STDIO transport reserves stdout for JSON-RPC messages, so logging
/// stays off by default. Defining the `TURBOMCP_FORCE_LOGGING` environment
/// variable (with any valid Unicode value) opts back in.
pub(crate) fn should_log_for_stdio() -> bool {
    matches!(std::env::var("TURBOMCP_FORCE_LOGGING"), Ok(_))
}
41
/// Main MCP server following the Axum/Tower Clone pattern
///
/// ## Sharing Pattern
///
/// `McpServer` implements `Clone` like Axum's `Router`. The registry, router,
/// lifecycle and metrics are `Arc`-wrapped, so cloning them is an atomic reference
/// count increment; `config` and `service` are cloned by value.
///
/// ```rust,no_run
/// use turbomcp_server::ServerBuilder;
///
/// # async fn example() {
/// let server = ServerBuilder::new().build();
///
/// // Clone for passing to functions
/// let server1 = server.clone();
/// let server2 = server.clone();
///
/// // Access config and health
/// let config = server1.config();
/// println!("Server: {}", config.name);
///
/// let health = server2.health().await;
/// println!("Health: {:?}", health);
/// # }
/// ```
///
/// ## Architecture Notes
///
/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
/// This is intentional and follows Tower's design - users clone the server instead of
/// Arc-wrapping it.
///
/// The `service` field provides `tower::Service` integration. The transport runners
/// visible in this file (`run_stdio`, `run_http_with_config`,
/// `run_websocket_with_config`) hand the `RequestRouter` to their runtime directly and
/// do not call `service` themselves.
#[derive(Clone)]
pub struct McpServer {
    /// Server configuration (cloned by value)
    pub(crate) config: ServerConfig,
    /// Handler registry (Arc-wrapped for cheap cloning)
    pub(crate) registry: Arc<HandlerRegistry>,
    /// Request router (Arc-wrapped for cheap cloning)
    pub(crate) router: Arc<RequestRouter>,
    /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
    ///
    /// With the `middleware` feature this is the core `McpService` wrapped by
    /// `build_middleware_stack`, which adds at most:
    /// - a request validation layer (when the stack provides one)
    /// - a timeout layer (when a timeout config is present and enabled)
    ///
    /// Without the feature it is the boxed core `McpService` alone.
    ///
    /// NOTE(review): earlier docs also listed authorization, rate limiting and audit
    /// logging here and pointed to `server/transport.rs` as the caller; neither is
    /// visible in this file - confirm before relying on it.
    pub(crate) service:
        tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
    /// Server lifecycle (Arc-wrapped for cheap cloning)
    pub(crate) lifecycle: Arc<ServerLifecycle>,
    /// Server metrics (Arc-wrapped for cheap cloning)
    pub(crate) metrics: Arc<ServerMetrics>,
}
104
105impl std::fmt::Debug for McpServer {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct("McpServer")
108 .field("config", &self.config)
109 .finish()
110 }
111}
112
113impl McpServer {
/// Assemble the Tower service stack that wraps the core MCP service.
///
/// ## Layers
///
/// Layers are applied from the inside out. After each step the service is
/// re-boxed into a `BoxCloneService`, so the resulting type is the same no matter
/// which layers are present:
///
/// 1. **Core service** - `core_service`, always present (innermost).
/// 2. **Validation** - added when `stack.validation_layer()` yields a layer.
/// 3. **Timeout** - added when `stack.timeout_config` is set and its `enabled`
///    flag is true. It is outermost, so it bounds the whole pipeline.
///
/// No other layer is installed here; the authorization layer was removed in
/// 2.0.0 and is expected to be handled at the application layer.
///
/// ## Adding Middleware
///
/// Add one more conditional wrapping step following the two below. Re-boxing per
/// step costs an allocation per layer at construction time but avoids a
/// combinatorial set of concrete service types.
#[cfg(feature = "middleware")]
fn build_middleware_stack(
    core_service: McpService,
    stack: MiddlewareStack,
) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
    // Single erased service type used at every step of the composition.
    type Boxed = tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>;

    // Innermost: the core service.
    let innermost: Boxed = Boxed::new(core_service);

    // Validation sits directly around the core service.
    let validated: Boxed = match stack.validation_layer() {
        Some(validation_layer) => Boxed::new(
            tower::ServiceBuilder::new()
                .layer(validation_layer)
                .service(innermost),
        ),
        None => innermost,
    };

    // Timeout is applied last so that it covers everything inside it.
    match stack.timeout_config.filter(|timeout| timeout.enabled) {
        Some(timeout) => Boxed::new(
            tower::ServiceBuilder::new()
                .layer(tower_http::timeout::TimeoutLayer::new(
                    timeout.request_timeout,
                ))
                .service(validated),
        ),
        None => validated,
    }
}
205
206 /// Create a new server
207 #[must_use]
208 pub fn new(config: ServerConfig) -> Self {
209 Self::new_with_registry(config, HandlerRegistry::new())
210 }
211
212 /// Create a new server with an existing registry (used by ServerBuilder)
213 #[must_use]
214 pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
215 let registry = Arc::new(registry);
216 let metrics = Arc::new(ServerMetrics::new());
217 let router = Arc::new(RequestRouter::new(
218 Arc::clone(®istry),
219 Arc::clone(&metrics),
220 config.clone(),
221 ));
222 // Build middleware stack configuration
223 #[cfg(feature = "middleware")]
224 #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
225 let mut stack = crate::middleware::MiddlewareStack::new();
226
227 // Auto-install rate limiting if enabled in config
228 #[cfg(feature = "rate-limiting")]
229 if config.rate_limiting.enabled {
230 use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
231 use std::num::NonZeroU32;
232 use std::time::Duration;
233
234 let rate_config = crate::middleware::RateLimitConfig {
235 strategy: RateLimitStrategy::Global,
236 limits: RateLimits {
237 requests_per_period: NonZeroU32::new(
238 config.rate_limiting.requests_per_second * 60,
239 )
240 .unwrap(), // Convert per-second to per-minute
241 period: Duration::from_secs(60),
242 burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
243 },
244 enabled: true,
245 };
246
247 stack = stack.with_rate_limit(rate_config);
248 }
249
250 // Create core MCP service
251 let core_service = McpService::new(
252 Arc::clone(®istry),
253 Arc::clone(&router),
254 Arc::clone(&metrics),
255 );
256
257 // COMPREHENSIVE TOWER SERVICE COMPOSITION
258 // Build the complete middleware stack with proper type erasure
259 //
260 // This service is called from server/transport.rs for EVERY incoming request:
261 // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
262 //
263 // The Tower middleware stack provides:
264 // ✓ Timeout enforcement (configurable per-request)
265 // ✓ Request validation (JSON-RPC structure)
266 // ✓ Authorization checks (resource access control)
267 // ✓ Rate limiting (if enabled in config)
268 // ✓ Audit logging (configurable)
269 // ✓ And more layers as configured
270 //
271 // BoxCloneService is Clone but !Sync - this is the Tower pattern
272 #[cfg(feature = "middleware")]
273 let service = Self::build_middleware_stack(core_service, stack);
274
275 #[cfg(not(feature = "middleware"))]
276 let service = tower::util::BoxCloneService::new(core_service);
277
278 let lifecycle = Arc::new(ServerLifecycle::new());
279
280 Self {
281 config,
282 registry,
283 router,
284 service,
285 lifecycle,
286 metrics,
287 }
288 }
289
290 /// Get server configuration
291 #[must_use]
292 pub const fn config(&self) -> &ServerConfig {
293 &self.config
294 }
295
296 /// Get handler registry
297 #[must_use]
298 pub const fn registry(&self) -> &Arc<HandlerRegistry> {
299 &self.registry
300 }
301
302 /// Get request router
303 #[must_use]
304 pub const fn router(&self) -> &Arc<RequestRouter> {
305 &self.router
306 }
307
308 /// Get server lifecycle
309 #[must_use]
310 pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
311 &self.lifecycle
312 }
313
314 /// Get server metrics
315 #[must_use]
316 pub const fn metrics(&self) -> &Arc<ServerMetrics> {
317 &self.metrics
318 }
319
320 /// Get the Tower service stack (test accessor)
321 ///
322 /// **Note**: This is primarily for integration testing. Production code should
323 /// use the transport layer which calls the service internally via
324 /// `handle_transport_message()`.
325 ///
326 /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
327 /// is designed for cloning).
328 #[doc(hidden)]
329 pub fn service(
330 &self,
331 ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
332 self.service.clone()
333 }
334
335 /// Get a shutdown handle for graceful server termination
336 ///
337 /// This handle enables external control over server shutdown, essential for:
338 /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
339 /// - **Container orchestration**: Kubernetes graceful pod termination
340 /// - **Load balancer integration**: Health check coordination
341 /// - **Multi-component systems**: Coordinated shutdown sequences
342 /// - **Maintenance operations**: Planned downtime and updates
343 ///
344 /// # Examples
345 ///
346 /// ## Basic shutdown coordination
347 /// ```no_run
348 /// # use turbomcp_server::ServerBuilder;
349 /// # #[tokio::main]
350 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
351 /// let server = ServerBuilder::new().build();
352 /// let shutdown_handle = server.shutdown_handle();
353 ///
354 /// // Coordinate with other services
355 /// tokio::spawn(async move {
356 /// // Wait for external shutdown signal
357 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
358 /// println!("Shutdown signal received, terminating gracefully...");
359 /// shutdown_handle.shutdown().await;
360 /// });
361 ///
362 /// // Server will gracefully shut down when signaled
363 /// // server.run_stdio().await?;
364 /// # Ok(())
365 /// # }
366 /// ```
367 ///
368 /// ## Container/Kubernetes deployment
369 /// ```no_run
370 /// # use turbomcp_server::ServerBuilder;
371 /// # use std::sync::Arc;
372 /// # #[tokio::main]
373 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
374 /// let server = ServerBuilder::new().build();
375 /// let shutdown_handle = server.shutdown_handle();
376 /// let shutdown_handle_clone = shutdown_handle.clone();
377 ///
378 /// // Handle multiple signal types with proper platform support
379 /// tokio::spawn(async move {
380 /// #[cfg(unix)]
381 /// {
382 /// use tokio::signal::unix::{signal, SignalKind};
383 /// let mut sigterm = signal(SignalKind::terminate()).unwrap();
384 /// tokio::select! {
385 /// _ = tokio::signal::ctrl_c() => {
386 /// println!("SIGINT received");
387 /// }
388 /// _ = sigterm.recv() => {
389 /// println!("SIGTERM received");
390 /// }
391 /// }
392 /// }
393 /// #[cfg(not(unix))]
394 /// {
395 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
396 /// println!("SIGINT received");
397 /// }
398 /// shutdown_handle_clone.shutdown().await;
399 /// });
400 ///
401 /// // Server handles graceful shutdown automatically
402 /// // server.run_tcp("0.0.0.0:8080").await?;
403 /// # Ok(())
404 /// # }
405 /// ```
406 pub fn shutdown_handle(&self) -> ShutdownHandle {
407 ShutdownHandle::new(self.lifecycle.clone())
408 }
409
410 /// Run the server with STDIO transport
411 ///
412 /// # Errors
413 ///
414 /// Returns [`crate::ServerError::Transport`] if:
415 /// - STDIO transport connection fails
416 /// - Message sending/receiving fails
417 /// - Transport disconnection fails
418 #[tracing::instrument(skip(self), fields(
419 transport = "stdio",
420 service_name = %self.config.name,
421 service_version = %self.config.version
422 ))]
423 pub async fn run_stdio(mut self) -> ServerResult<()> {
424 // For STDIO transport, disable logging unless explicitly overridden
425 // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
426 if should_log_for_stdio() {
427 info!("Starting MCP server with STDIO transport");
428 }
429
430 // Start performance monitoring for STDIO server
431 let _perf_span = info_span!("server.run", transport = "stdio").entered();
432 info!("Initializing STDIO transport for MCP server");
433
434 self.lifecycle.start().await;
435
436 // BIDIRECTIONAL STDIO SETUP
437 // Create STDIO dispatcher for server-initiated requests (sampling, elicitation, roots, ping)
438 let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
439
440 // Use fully-qualified path to avoid ambiguity with the turbomcp crate's runtime module
441 let dispatcher = crate::runtime::StdioDispatcher::new(request_tx);
442
443 // Configure router's bidirectional support with the STDIO dispatcher
444 // SAFETY: We have &mut self, so we can safely get mutable access to the Arc'd router
445 // This is the CRITICAL STEP that was missing - without this, all server→client requests fail
446 let router = Arc::make_mut(&mut self.router);
447 router.set_server_request_dispatcher(dispatcher.clone());
448
449 // Run STDIO with full bidirectional support (MCP 2025-06-18 compliant)
450 // This uses the bidirectional-aware runtime that handles both:
451 // - Client→Server requests (tools, resources, prompts)
452 // - Server→Client requests (sampling, elicitation, roots, ping)
453 crate::runtime::run_stdio_bidirectional(self.router.clone(), dispatcher, request_rx)
454 .await
455 .map_err(|e| crate::ServerError::Handler {
456 message: format!("STDIO bidirectional runtime failed: {}", e),
457 context: Some("run_stdio".to_string()),
458 })
459 }
460
461 /// Get health status
462 pub async fn health(&self) -> HealthStatus {
463 self.lifecycle.health().await
464 }
465
/// Run the server over HTTP using the default `StreamableHttpConfig`.
///
/// This is a convenience wrapper: it builds a configuration with
/// `StreamableHttpConfigBuilder::new().build()` and delegates to
/// [`Self::run_http_with_config`]. Whatever defaults that builder applies
/// (endpoint path, rate limiting, security settings) are used unchanged.
///
/// For custom configuration (rate limits, security, CORS), use `run_http_with_config`.
///
/// # Examples
///
/// ## Basic usage with default configuration
/// ```no_run
/// use turbomcp_server::ServerBuilder;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("my-server")
///         .version("1.0.0")
///         .build();
///
///     server.run_http("127.0.0.1:3000").await?;
///     Ok(())
/// }
/// ```
///
/// ## With custom configuration
/// ```no_run
/// use turbomcp_server::ServerBuilder;
/// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("my-server")
///         .version("1.0.0")
///         .build();
///
///     let config = StreamableHttpConfigBuilder::new()
///         .without_rate_limit() // For benchmarking
///         .allow_any_origin(true) // Enable CORS
///         .build();
///
///     server.run_http_with_config("127.0.0.1:3000", config).await?;
///     Ok(())
/// }
/// ```
///
/// # Errors
///
/// Propagates every error returned by [`Self::run_http_with_config`], i.e. address
/// resolution failures and HTTP runtime failures.
#[cfg(feature = "http")]
#[tracing::instrument(skip(self), fields(
    transport = "http",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    self,
    addr: A,
) -> ServerResult<()> {
    use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;

    // Defaults only; callers wanting control go through run_http_with_config.
    self.run_http_with_config(addr, StreamableHttpConfigBuilder::new().build())
        .await
}
542
/// Run server with HTTP transport and custom configuration
///
/// Starts the lifecycle, resolves `addr`, reconciles it with `config.bind_addr`,
/// and runs the factory-based HTTP runtime in which every session gets its own
/// router clone configured with a session-specific `HttpDispatcher`.
///
/// NOTE(review): within this function only the resolved address and
/// `config.endpoint_path` are forwarded to the HTTP runtime; confirm where the
/// remaining settings of `config` (rate limiting, CORS, authentication) are
/// consumed before relying on them.
///
/// # Bind Address Configuration
///
/// **IMPORTANT**: The `addr` parameter takes precedence over `config.bind_addr`.
/// If the resolved `addr` differs from `config.bind_addr`, deprecation warnings are
/// logged and `config.bind_addr` / `config.base_url` are overwritten with the
/// resolved address.
///
/// **Best Practice** (recommended for forward compatibility):
/// ```no_run
/// # use turbomcp_server::ServerBuilder;
/// # use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let config = StreamableHttpConfigBuilder::new()
///     .with_bind_address("127.0.0.1:3001") // Set bind address in config
///     .build();
///
/// // Pass the matching addr parameter
/// ServerBuilder::new()
///     .build()
///     .run_http_with_config("127.0.0.1:3001", config).await?;
/// # Ok(())
/// # }
/// ```
///
/// **Deprecated** (will be removed in v3.x):
/// ```no_run
/// # use turbomcp_server::ServerBuilder;
/// # use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Avoid setting different addresses - causes deprecation warning
/// let config = StreamableHttpConfigBuilder::new()
///     .with_bind_address("0.0.0.0:5000") // This is ignored!
///     .build();
///
/// // The addr parameter wins (3001 is used, not 5000)
/// ServerBuilder::new()
///     .build()
///     .run_http_with_config("127.0.0.1:3001", config).await?;
/// # Ok(())
/// # }
/// ```
///
/// **Future (v3.x)**: The `addr` parameter will be removed. Configure bind address
/// via `config.with_bind_address()` only.
///
/// # Examples
///
/// ## Benchmarking configuration (no rate limits)
/// ```no_run
/// use turbomcp_server::ServerBuilder;
/// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("benchmark-server")
///         .version("1.0.0")
///         .build();
///
///     let config = StreamableHttpConfigBuilder::new()
///         .with_bind_address("127.0.0.1:3000")
///         .without_rate_limit() // Disable rate limiting
///         .build();
///
///     server.run_http_with_config("127.0.0.1:3000", config).await?;
///     Ok(())
/// }
/// ```
///
/// ## Production configuration (secure, rate limited)
/// ```no_run
/// use turbomcp_server::ServerBuilder;
/// use turbomcp_transport::streamable_http_v2::StreamableHttpConfigBuilder;
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("production-server")
///         .version("1.0.0")
///         .build();
///
///     let config = StreamableHttpConfigBuilder::new()
///         .with_bind_address("127.0.0.1:3000")
///         .with_rate_limit(1000, Duration::from_secs(60)) // 1000 req/min
///         .allow_any_origin(false) // Strict CORS
///         .require_authentication(true) // Require auth
///         .build();
///
///     server.run_http_with_config("127.0.0.1:3000", config).await?;
///     Ok(())
/// }
/// ```
///
/// # Errors
///
/// - An error built with `ServerError::configuration` if `addr` cannot be resolved
///   or resolves to no address. The lifecycle is shut down again before returning,
///   matching `run_tcp`.
/// - An error built with `ServerError::handler` if the HTTP runtime fails.
#[cfg(feature = "http")]
#[tracing::instrument(skip(self, config), fields(
    transport = "http",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    self,
    addr: A,
    mut config: turbomcp_transport::streamable_http_v2::StreamableHttpConfig,
) -> ServerResult<()> {
    use crate::runtime::http::run_http;
    use std::collections::HashMap;
    use tokio::sync::{Mutex, RwLock};

    info!("Starting MCP server with HTTP transport");

    self.lifecycle.start().await;

    // Resolve the address. The lifecycle has already been started, so a failure
    // here must shut it down again rather than leave it marked as running.
    let resolution = addr.to_socket_addrs().map(|mut addrs| addrs.next());
    let socket_addr = match resolution {
        Ok(Some(socket_addr)) => socket_addr,
        Ok(None) => {
            self.lifecycle.shutdown().await;
            return Err(crate::ServerError::configuration("No address resolved"));
        }
        Err(e) => {
            self.lifecycle.shutdown().await;
            return Err(crate::ServerError::configuration(format!(
                "Invalid address: {}",
                e
            )));
        }
    };

    info!("Resolved address: {}", socket_addr);

    // Check for conflicting bind addresses and warn about deprecation
    let socket_addr_str = socket_addr.to_string();
    if config.bind_addr != socket_addr_str {
        warn!(
            addr_parameter = %socket_addr_str,
            config_bind_addr = %config.bind_addr,
            "⚠️ DEPRECATION WARNING: The `addr` parameter takes precedence over `config.bind_addr`"
        );
        warn!(
            "⚠️ In TurboMCP v3.x, the `addr` parameter will be removed. Please use StreamableHttpConfigBuilder::new().with_bind_address(\"{}\").build() instead",
            socket_addr_str
        );
        warn!(
            "⚠️ Avoid setting both `addr` parameter and `config.bind_addr` to prevent confusion"
        );

        // Update config to single source of truth
        config.bind_addr = socket_addr_str.clone();
        config.base_url = format!("http://{}", socket_addr_str);
    }

    info!(
        config = ?config,
        "HTTP configuration (updated with resolved address)"
    );

    // Shared state for session management and bidirectional MCP
    let sessions = Arc::new(RwLock::new(HashMap::new()));
    let pending_requests = Arc::new(Mutex::new(HashMap::new()));

    // Handles moved into the factory; the base router is shared by all sessions.
    let sessions_for_factory = Arc::clone(&sessions);
    let pending_for_factory = Arc::clone(&pending_requests);
    let router_for_factory = Arc::clone(&self.router);

    // Factory: each session gets its own router clone with a session-specific
    // dispatcher, while the routing logic itself comes from the shared base router.
    let handler_factory = move |session_id: Option<String>| {
        let session_id = session_id.unwrap_or_else(|| {
            let new_id = uuid::Uuid::new_v4().to_string();
            tracing::warn!(
                "⚠️ Factory generating random session ID (no session ID provided): {}",
                new_id
            );
            new_id
        });

        tracing::debug!("Factory creating handler for session: {}", session_id);

        let dispatcher = crate::runtime::http::HttpDispatcher::new(
            session_id,
            Arc::clone(&sessions_for_factory),
            Arc::clone(&pending_for_factory),
        );

        // Clone the base router and install the session-specific dispatcher.
        let mut session_router = (*router_for_factory).clone();
        session_router.set_server_request_dispatcher(dispatcher);

        session_router
    };

    info!(
        server_name = %self.config.name,
        server_version = %self.config.version,
        bind_addr = %socket_addr,
        endpoint_path = %config.endpoint_path,
        "HTTP server starting with full bidirectional support (elicitation, sampling, roots, ping)"
    );

    run_http(
        handler_factory,
        sessions,
        pending_requests,
        socket_addr_str,
        config.endpoint_path.clone(),
    )
    .await
    .map_err(|e| {
        tracing::error!(error = %e, "HTTP server failed");
        crate::ServerError::handler(e.to_string())
    })?;

    info!("HTTP server shutdown complete");
    Ok(())
}
779
/// Run the server over WebSocket using `WebSocketServerConfig::default()`.
///
/// This is a convenience wrapper around [`Self::run_websocket_with_config`]; the
/// endpoint path and other settings are whatever the default configuration
/// provides, and the server binds to the address resolved from `addr`.
///
/// For custom configuration, use `run_websocket_with_config()`.
///
/// # Example
///
/// ```no_run
/// use turbomcp_server::ServerBuilder;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("ws-server")
///         .version("1.0.0")
///         .build();
///
///     server.run_websocket("127.0.0.1:8080").await?;
///     Ok(())
/// }
/// ```
///
/// # Errors
///
/// Propagates every error returned by [`Self::run_websocket_with_config`].
#[cfg(feature = "websocket")]
#[tracing::instrument(skip(self), fields(
    transport = "websocket",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    self,
    addr: A,
) -> ServerResult<()> {
    use crate::runtime::websocket::WebSocketServerConfig;

    // Defaults only; callers wanting control go through run_websocket_with_config.
    let default_config = WebSocketServerConfig::default();
    self.run_websocket_with_config(addr, default_config).await
}
825
/// Run server with WebSocket transport and custom configuration
///
/// Starts the lifecycle, resolves `addr`, and runs the factory-based WebSocket
/// runtime in which every connection gets a router configured with its own
/// `WebSocketServerDispatcher`.
///
/// # Bind Address
///
/// The server binds to the address resolved from `addr`. `config.bind_addr` is
/// replaced by that resolved address without a warning; `config.endpoint_path` and
/// `config.max_concurrent_requests` are passed through unchanged.
///
/// # Example
///
/// ```no_run
/// use turbomcp_server::{ServerBuilder, WebSocketServerConfig};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("custom-ws-server")
///         .version("1.0.0")
///         .build();
///
///     let config = WebSocketServerConfig {
///         bind_addr: "127.0.0.1:8080".to_string(),
///         endpoint_path: "/custom/ws".to_string(),
///         ..Default::default()
///     };
///
///     server.run_websocket_with_config("127.0.0.1:8080", config).await?;
///     Ok(())
/// }
/// ```
///
/// # Errors
///
/// - An error built with `ServerError::configuration` if `addr` cannot be resolved
///   or resolves to no address. The lifecycle is shut down again before returning,
///   matching `run_tcp`.
/// - An error built with `ServerError::handler` if the WebSocket runtime fails.
#[cfg(feature = "websocket")]
#[tracing::instrument(skip(self, config), fields(
    transport = "websocket",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    self,
    addr: A,
    config: crate::runtime::websocket::WebSocketServerConfig,
) -> ServerResult<()> {
    use crate::runtime::websocket::run_websocket;

    info!("Starting MCP server with WebSocket transport");
    info!(config = ?config, "WebSocket configuration");

    self.lifecycle.start().await;

    // Resolve the address. The lifecycle has already been started, so a failure
    // here must shut it down again rather than leave it marked as running.
    let resolution = addr.to_socket_addrs().map(|mut addrs| addrs.next());
    let socket_addr = match resolution {
        Ok(Some(socket_addr)) => socket_addr,
        Ok(None) => {
            self.lifecycle.shutdown().await;
            return Err(crate::ServerError::configuration("No address resolved"));
        }
        Err(e) => {
            self.lifecycle.shutdown().await;
            return Err(crate::ServerError::configuration(format!(
                "Invalid address: {}",
                e
            )));
        }
    };

    info!("Resolved address: {}", socket_addr);

    // Base router handed to the runtime (cloned out of the shared Arc).
    let router = (*self.router).clone();

    // Wrapper factory: install the per-connection dispatcher on the router the
    // runtime supplies for that connection.
    let wrapper_factory =
        move |mut base_router: crate::routing::RequestRouter,
              dispatcher: crate::runtime::websocket::WebSocketServerDispatcher| {
            base_router.set_server_request_dispatcher(dispatcher);
            base_router
        };

    info!(
        server_name = %self.config.name,
        server_version = %self.config.version,
        bind_addr = %socket_addr,
        endpoint_path = %config.endpoint_path,
        "WebSocket server starting with full bidirectional support (elicitation, sampling, roots, ping)"
    );

    // Same settings as `config`, but bound to the resolved address.
    let ws_config = crate::runtime::websocket::WebSocketServerConfig {
        bind_addr: socket_addr.to_string(),
        endpoint_path: config.endpoint_path.clone(),
        max_concurrent_requests: config.max_concurrent_requests,
    };

    run_websocket(router, wrapper_factory, ws_config)
        .await
        .map_err(|e| {
            tracing::error!(error = %e, "WebSocket server failed");
            crate::ServerError::handler(e.to_string())
        })?;

    info!("WebSocket server shutdown complete");
    Ok(())
}
928
/// Run server with TCP transport (progressive enhancement - runtime configuration)
///
/// Starts the lifecycle, resolves `addr` to the first socket address it yields,
/// connects a server-side `TcpTransport` on it and then drives the generic
/// bidirectional runtime until that runtime returns.
///
/// # Errors
///
/// * A configuration error when `addr` cannot be resolved or resolves to nothing.
/// * The converted transport error when the TCP transport fails to connect.
/// * `ServerError::Handler` (context `"run_tcp"`) when the bidirectional runtime fails.
///
/// On the resolution/connect failure paths the lifecycle is shut down before returning.
#[cfg(feature = "tcp")]
#[tracing::instrument(skip(self), fields(
    transport = "tcp",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    mut self,
    addr: A,
) -> ServerResult<()> {
    use tracing::Instrument;
    use turbomcp_transport::TcpTransport;

    // Performance-monitoring span for the TCP server. It is attached with
    // `Instrument` rather than an `entered()` guard: a guard held across `.await`
    // points produces incorrect traces and makes the returned future `!Send`.
    let run_span = info_span!("server.run", transport = "tcp");

    async move {
        info!(?addr, "Starting MCP server with TCP transport");

        self.lifecycle.start().await;

        // Convert ToSocketAddrs to SocketAddr (first resolved candidate wins)
        let socket_addr = match addr.to_socket_addrs().map(|mut resolved| resolved.next()) {
            Ok(Some(socket_addr)) => socket_addr,
            Ok(None) => {
                tracing::error!("No socket address resolved from provided address");
                self.lifecycle.shutdown().await;
                return Err(crate::ServerError::configuration("Invalid socket address"));
            }
            Err(e) => {
                tracing::error!(error = %e, "Failed to resolve socket address");
                self.lifecycle.shutdown().await;
                return Err(crate::ServerError::configuration(format!(
                    "Address resolution failed: {e}"
                )));
            }
        };

        let transport = TcpTransport::new_server(socket_addr);
        if let Err(e) = transport.connect().await {
            tracing::error!(error = %e, "Failed to connect TCP transport");
            self.lifecycle.shutdown().await;
            return Err(e.into());
        }

        // BIDIRECTIONAL TCP SETUP
        // Create generic transport dispatcher for server-initiated requests
        let dispatcher = crate::runtime::TransportDispatcher::new(transport);

        // Configure router's bidirectional support with the TCP dispatcher
        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
        // `Arc::make_mut` clones the router first if the Arc is shared, so other
        // clones of this server keep their own dispatcher configuration.
        let router = Arc::make_mut(&mut self.router);
        router.set_server_request_dispatcher(dispatcher.clone());

        // Run TCP with full bidirectional support (MCP 2025-06-18 compliant)
        // This uses the generic bidirectional runtime that handles both:
        // - Client→Server requests (tools, resources, prompts)
        // - Server→Client requests (sampling, elicitation, roots, ping)
        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
            .await
            .map_err(|e| crate::ServerError::Handler {
                message: format!("TCP bidirectional runtime failed: {}", e),
                context: Some("run_tcp".to_string()),
            })
    }
    .instrument(run_span)
    .await
}
995
/// Run server with Unix socket transport (progressive enhancement - runtime configuration)
///
/// Starts the lifecycle, connects a server-side `UnixTransport` on `path` and then
/// drives the generic bidirectional runtime until that runtime returns.
///
/// # Errors
///
/// * The converted transport error when the Unix socket transport fails to connect
///   (the lifecycle is shut down before returning).
/// * `ServerError::Handler` (context `"run_unix"`) when the bidirectional runtime fails.
#[cfg(all(feature = "unix", unix))]
#[tracing::instrument(skip(self), fields(
    transport = "unix",
    service_name = %self.config.name,
    service_version = %self.config.version,
    path = ?path.as_ref()
))]
pub async fn run_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> ServerResult<()> {
    use std::path::PathBuf;
    use tracing::Instrument;
    use turbomcp_transport::UnixTransport;

    // Performance-monitoring span for the Unix server. It is attached with
    // `Instrument` rather than an `entered()` guard: a guard held across `.await`
    // points produces incorrect traces and makes the returned future `!Send`.
    let run_span = info_span!("server.run", transport = "unix");

    async move {
        info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");

        self.lifecycle.start().await;

        let socket_path = PathBuf::from(path.as_ref());
        let transport = UnixTransport::new_server(socket_path);
        if let Err(e) = transport.connect().await {
            tracing::error!(error = %e, "Failed to connect Unix socket transport");
            self.lifecycle.shutdown().await;
            return Err(e.into());
        }

        // BIDIRECTIONAL UNIX SOCKET SETUP
        // Create generic transport dispatcher for server-initiated requests
        let dispatcher = crate::runtime::TransportDispatcher::new(transport);

        // Configure router's bidirectional support with the Unix socket dispatcher
        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
        // `Arc::make_mut` clones the router first if the Arc is shared, so other
        // clones of this server keep their own dispatcher configuration.
        let router = Arc::make_mut(&mut self.router);
        router.set_server_request_dispatcher(dispatcher.clone());

        // Run Unix Socket with full bidirectional support (MCP 2025-06-18 compliant)
        // This uses the generic bidirectional runtime that handles both:
        // - Client→Server requests (tools, resources, prompts)
        // - Server→Client requests (sampling, elicitation, roots, ping)
        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
            .await
            .map_err(|e| crate::ServerError::Handler {
                message: format!("Unix socket bidirectional runtime failed: {}", e),
                context: Some("run_unix".to_string()),
            })
    }
    .instrument(run_span)
    .await
}
1042
1043 /// Generic transport runner (DRY principle)
1044 /// Used by feature-gated transport methods (http, tcp, websocket, unix)
1045 #[allow(dead_code)]
1046 #[tracing::instrument(skip(self, transport), fields(
1047 service_name = %self.config.name,
1048 service_version = %self.config.version
1049 ))]
1050 async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
1051 // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
1052 let lifecycle_for_sigint = self.lifecycle.clone();
1053 tokio::spawn(async move {
1054 if let Err(e) = tokio::signal::ctrl_c().await {
1055 tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
1056 return;
1057 }
1058 tracing::info!("Ctrl+C received, initiating shutdown");
1059 lifecycle_for_sigint.shutdown().await;
1060 });
1061
1062 #[cfg(unix)]
1063 {
1064 let lifecycle_for_sigterm = self.lifecycle.clone();
1065 tokio::spawn(async move {
1066 use tokio::signal::unix::{SignalKind, signal};
1067 match signal(SignalKind::terminate()) {
1068 Ok(mut sigterm) => {
1069 sigterm.recv().await;
1070 tracing::info!("SIGTERM received, initiating shutdown");
1071 lifecycle_for_sigterm.shutdown().await;
1072 }
1073 Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
1074 }
1075 });
1076 }
1077
1078 // Shutdown signal
1079 let mut shutdown = self.lifecycle.shutdown_signal();
1080
1081 // Main message processing loop
1082 loop {
1083 tokio::select! {
1084 _ = shutdown.recv() => {
1085 tracing::info!("Shutdown signal received");
1086 break;
1087 }
1088 res = transport.receive() => {
1089 match res {
1090 Ok(Some(message)) => {
1091 if let Err(e) = self.handle_transport_message(&mut transport, message).await {
1092 tracing::warn!(error = %e, "Failed to handle transport message");
1093 }
1094 }
1095 Ok(None) => {
1096 // No message available; sleep briefly to avoid busy loop
1097 sleep(Duration::from_millis(5)).await;
1098 }
1099 Err(e) => {
1100 match e {
1101 TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
1102 tracing::info!("Transport receive channel disconnected; shutting down");
1103 break;
1104 }
1105 _ => {
1106 tracing::error!(error = %e, "Transport receive failed");
1107 // Backoff on errors
1108 sleep(Duration::from_millis(50)).await;
1109 }
1110 }
1111 }
1112 }
1113 }
1114 }
1115 }
1116
1117 // Disconnect transport
1118 if let Err(e) = transport.disconnect().await {
1119 tracing::warn!(error = %e, "Error while disconnecting transport");
1120 }
1121
1122 tracing::info!("Server shutdown complete");
1123 Ok(())
1124 }
1125}
1126
1127// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1128// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1129// This is intentional and follows the Axum/Tower design pattern
1130#[allow(dead_code)]
1131const _: () = {
1132 const fn assert_send_clone<T: Send + Clone>() {}
1133 const fn check() {
1134 assert_send_clone::<crate::server::core::McpServer>();
1135 }
1136};