turbomcp_server/server/core.rs
1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9#[cfg(feature = "http")]
10use tracing::warn;
11
12use crate::{
13 config::ServerConfig,
14 error::ServerResult,
15 lifecycle::{HealthStatus, ServerLifecycle},
16 metrics::ServerMetrics,
17 registry::HandlerRegistry,
18 routing::RequestRouter,
19 service::McpService,
20};
21
22#[cfg(feature = "middleware")]
23use crate::middleware::MiddlewareStack;
24
25use bytes::Bytes;
26use http::{Request, Response};
27use tokio::time::{Duration, sleep};
28use turbomcp_transport::Transport;
29use turbomcp_transport::core::TransportError;
30
31use super::shutdown::ShutdownHandle;
32
/// Reports whether logging has been explicitly forced on for the STDIO transport.
///
/// With STDIO, stdout carries the JSON-RPC message stream, so the server stays
/// quiet by default. This returns `true` only when the `TURBOMCP_FORCE_LOGGING`
/// environment variable is present and holds valid Unicode; the value itself is
/// not inspected.
pub(crate) fn should_log_for_stdio() -> bool {
    matches!(std::env::var("TURBOMCP_FORCE_LOGGING"), Ok(_))
}
41
/// Per-request JSON-RPC handler that bundles a router with request metadata.
///
/// The header map, transport label and tenant id travel alongside the router so
/// they can be handed to `create_context` for each request instead of being
/// stored on the router itself. Shared by the HTTP and WebSocket transports.
#[cfg(any(feature = "http", feature = "websocket"))]
struct HttpHandlerWithHeaders {
    /// Router used to build the request context and dispatch the request.
    router: RequestRouter,
    /// Request headers as name/value pairs, when the transport supplied them.
    headers: Option<std::collections::HashMap<String, String>>,
    /// Transport label forwarded to `create_context` (for example `"http"`).
    transport: &'static str,
    /// Tenant id supplied by the transport layer, if any.
    tenant_id: Option<String>,
}
52
#[cfg(any(feature = "http", feature = "websocket"))]
#[async_trait::async_trait]
impl turbomcp_protocol::JsonRpcHandler for HttpHandlerWithHeaders {
    /// Handles one JSON-RPC request value and returns the JSON-RPC response value.
    ///
    /// The value is deserialized into a `JsonRpcRequest`, routed through the
    /// wrapped router with a context built from the stored headers, transport
    /// label and tenant id, and the router's response is serialized back to JSON.
    ///
    /// Failures are reported as JSON-RPC error objects rather than Rust errors:
    /// - `-32600` (Invalid Request) when the value is not a valid request object.
    ///   The input is already parsed JSON at this point, so this is not a parse
    ///   error (`-32700`). The request `id` is echoed when it is a string or a
    ///   number, and is `null` otherwise.
    /// - `-32603` (Internal error) when the response cannot be serialized.
    async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
        use turbomcp_protocol::jsonrpc::JsonRpcRequest;

        // Capture a usable id before `req_value` is consumed, so an Invalid
        // Request error can still be correlated by the client.
        let request_id = req_value
            .get("id")
            .filter(|id| id.is_string() || id.is_number())
            .cloned()
            .unwrap_or(serde_json::Value::Null);

        // Validate the request structure
        let req: JsonRpcRequest = match serde_json::from_value(req_value) {
            Ok(r) => r,
            Err(e) => {
                return serde_json::json!({
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32600,
                        "message": format!("Invalid Request: {}", e)
                    },
                    "id": request_id
                });
            }
        };

        // Build the context from the per-request metadata held by this wrapper.
        let ctx = self.router.create_context(
            self.headers.clone(),
            Some(self.transport),
            self.tenant_id.clone(),
        );

        // Route the request
        let response = self.router.route(req, ctx).await;

        // Serialize the response, falling back to an internal-error object that
        // still carries the response id.
        match serde_json::to_value(&response) {
            Ok(v) => v,
            Err(e) => {
                serde_json::json!({
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32603,
                        "message": format!("Internal error: failed to serialize response: {}", e)
                    },
                    "id": response.id
                })
            }
        }
    }

    /// Returns the server info reported by the wrapped router.
    fn server_info(&self) -> turbomcp_protocol::ServerInfo {
        self.router.server_info()
    }
}
106
107/// Main MCP server following the Axum/Tower Clone pattern
108///
109/// ## Sharing Pattern
110///
111/// `McpServer` implements `Clone` like Axum's `Router`. All heavy state is Arc-wrapped
112/// internally, making cloning cheap (just atomic reference count increments).
113///
114/// ```rust,no_run
115/// use turbomcp_server::ServerBuilder;
116///
117/// # async fn example() {
118/// let server = ServerBuilder::new().build();
119///
120/// // Clone for passing to functions (cheap - just Arc increments)
121/// let server1 = server.clone();
122/// let server2 = server.clone();
123///
124/// // Access config and health
125/// let config = server1.config();
126/// println!("Server: {}", config.name);
127///
128/// let health = server2.health().await;
129/// println!("Health: {:?}", health);
130/// # }
131/// ```
132///
133/// ## Architecture Notes
134///
135/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
136/// This is intentional and follows Tower's design - users clone the server instead of
137/// Arc-wrapping it.
138///
139/// **Architecture Note**: The service field provides tower::Service integration for
140/// advanced middleware patterns. The request processing pipeline currently uses the
141/// RequestRouter directly. Tower integration can be added via custom middleware layers
142/// when needed for specific use cases (e.g., custom rate limiting, advanced tracing).
143#[derive(Clone)]
144pub struct McpServer {
145 /// Server configuration (Clone-able)
146 pub(crate) config: ServerConfig,
147 /// Handler registry (Arc-wrapped for cheap cloning)
148 pub(crate) registry: Arc<HandlerRegistry>,
149 /// Request router (Arc-wrapped for cheap cloning)
150 pub(crate) router: Arc<RequestRouter>,
151 /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
152 ///
153 /// All requests flow through this service stack, which provides:
154 /// - Timeout enforcement
155 /// - Request validation
156 /// - Authorization checks
157 /// - Rate limiting
158 /// - Audit logging
159 /// - And more middleware layers as configured
160 ///
161 /// See `server/transport.rs` for integration with transport layer.
162 pub(crate) service:
163 tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
164 /// Server lifecycle (Arc-wrapped for cheap cloning)
165 pub(crate) lifecycle: Arc<ServerLifecycle>,
166 /// Server metrics (Arc-wrapped for cheap cloning)
167 pub(crate) metrics: Arc<ServerMetrics>,
168 /// Task storage for MCP Tasks API (SEP-1686)
169 #[cfg(feature = "mcp-tasks")]
170 pub(crate) task_storage: Arc<crate::task_storage::TaskStorage>,
171}
172
173impl std::fmt::Debug for McpServer {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 f.debug_struct("McpServer")
176 .field("config", &self.config)
177 .finish()
178 }
179}
180
181impl McpServer {
182 /// Build comprehensive Tower middleware stack (transport-agnostic)
183 ///
184 /// ## Architecture
185 ///
186 /// This creates a complete Tower service stack with conditional middleware layers:
187 /// - **Timeout Layer**: Request timeout enforcement (tower_http)
188 /// - **Validation Layer**: JSON-RPC structure validation
189 /// - **Authorization Layer**: Resource access control
190 /// - **Core Service**: JSON-RPC routing and handler execution
191 ///
192 /// All middleware is composed using Tower's ServiceBuilder pattern, which provides:
193 /// - Top-to-bottom execution order
194 /// - Type-safe layer composition
195 /// - Zero-cost abstractions
196 /// - Clone-able service instances
197 ///
198 /// ## Integration
199 ///
200 /// The resulting BoxCloneService is stored in `self.service` and called from
201 /// `server/transport.rs` for every incoming request. This ensures ALL requests
202 /// flow through the complete middleware pipeline before reaching handlers.
203 ///
204 /// ## Adding Middleware
205 ///
206 /// To add new middleware, update the match arms below to include your layer.
207 /// Follow the pattern of conditional inclusion based on config flags.
208 #[cfg(feature = "middleware")]
209 fn build_middleware_stack(
210 core_service: McpService,
211 stack: MiddlewareStack,
212 ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
213 // COMPREHENSIVE TOWER COMPOSITION - Conditional Layer Stacking
214 //
215 // This approach builds the middleware stack incrementally, boxing at each step.
216 // While this has a small performance cost from multiple boxing operations,
217 // it provides several critical advantages:
218 //
219 // 1. **Maintainability**: No combinatorial explosion (8 match arms → simple chain)
220 // 2. **Extensibility**: Adding new middleware requires only one new block
221 // 3. **Clarity**: Each layer's purpose and configuration is explicit
222 // 4. **Type Safety**: BoxCloneService provides type erasure while preserving Clone
223 //
224 // Performance note: The boxing overhead is negligible compared to network I/O
225 // and handler execution time. Modern allocators make this essentially free.
226
227 // Start with core service as a boxed service for uniform type handling
228 let mut service: tower::util::BoxCloneService<
229 Request<Bytes>,
230 Response<Bytes>,
231 crate::ServerError,
232 > = tower::util::BoxCloneService::new(core_service);
233
234 // Authorization layer removed in 2.0.0 - handle at application layer
235
236 // Layer 2: Validation
237 // Validates request structure after auth but before processing
238 #[cfg(feature = "middleware")]
239 {
240 if let Some(validation_layer) = stack.validation_layer() {
241 service = tower::util::BoxCloneService::new(
242 tower::ServiceBuilder::new()
243 .layer(validation_layer)
244 .service(service),
245 );
246 }
247 }
248
249 // Layer 3: Timeout (outermost)
250 // Applied last so it can enforce timeout on the entire request pipeline
251 #[cfg(feature = "middleware")]
252 {
253 if let Some(timeout_config) = stack.timeout_config
254 && timeout_config.enabled
255 {
256 service = tower::util::BoxCloneService::new(
257 tower::ServiceBuilder::new()
258 .layer(tower_http::timeout::TimeoutLayer::new(
259 timeout_config.request_timeout,
260 ))
261 .service(service),
262 );
263 }
264 }
265
266 // Future middleware can be added here with similar if-let blocks:
267 // if let Some(auth_config) = stack.auth_config { ... }
268 // if let Some(audit_config) = stack.audit_config { ... }
269 // if let Some(rate_limit_config) = stack.rate_limit_config { ... }
270
271 service
272 }
273
274 /// Create a new server
275 #[must_use]
276 pub fn new(config: ServerConfig) -> Self {
277 Self::new_with_registry(config, HandlerRegistry::new())
278 }
279
280 /// Create a new server with an existing registry (used by ServerBuilder)
281 #[must_use]
282 pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
283 let registry = Arc::new(registry);
284 let metrics = Arc::new(ServerMetrics::new());
285
286 // Initialize task storage (SEP-1686)
287 #[cfg(feature = "mcp-tasks")]
288 let task_storage = {
289 use tokio::time::Duration;
290 let storage = crate::task_storage::TaskStorage::new(Duration::from_secs(60));
291 // Start background cleanup task
292 storage.start_cleanup();
293 Arc::new(storage)
294 };
295
296 let router = Arc::new(RequestRouter::new(
297 Arc::clone(®istry),
298 Arc::clone(&metrics),
299 config.clone(),
300 #[cfg(feature = "mcp-tasks")]
301 Some(Arc::clone(&task_storage)),
302 ));
303 // Build middleware stack configuration
304 #[cfg(feature = "middleware")]
305 #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
306 let mut stack = crate::middleware::MiddlewareStack::new();
307
308 // Auto-install rate limiting if enabled in config
309 #[cfg(feature = "rate-limiting")]
310 if config.rate_limiting.enabled {
311 use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
312 use std::num::NonZeroU32;
313 use std::time::Duration;
314
315 let rate_config = crate::middleware::RateLimitConfig {
316 strategy: RateLimitStrategy::Global,
317 limits: RateLimits {
318 requests_per_period: NonZeroU32::new(
319 config.rate_limiting.requests_per_second * 60,
320 )
321 .unwrap(), // Convert per-second to per-minute
322 period: Duration::from_secs(60),
323 burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
324 },
325 enabled: true,
326 };
327
328 stack = stack.with_rate_limit(rate_config);
329 }
330
331 // Create core MCP service
332 let core_service = McpService::new(
333 Arc::clone(®istry),
334 Arc::clone(&router),
335 Arc::clone(&metrics),
336 );
337
338 // COMPREHENSIVE TOWER SERVICE COMPOSITION
339 // Build the complete middleware stack with proper type erasure
340 //
341 // This service is called from server/transport.rs for EVERY incoming request:
342 // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
343 //
344 // The Tower middleware stack provides:
345 // ✓ Timeout enforcement (configurable per-request)
346 // ✓ Request validation (JSON-RPC structure)
347 // ✓ Authorization checks (resource access control)
348 // ✓ Rate limiting (if enabled in config)
349 // ✓ Audit logging (configurable)
350 // ✓ And more layers as configured
351 //
352 // BoxCloneService is Clone but !Sync - this is the Tower pattern
353 #[cfg(feature = "middleware")]
354 let service = Self::build_middleware_stack(core_service, stack);
355
356 #[cfg(not(feature = "middleware"))]
357 let service = tower::util::BoxCloneService::new(core_service);
358
359 let lifecycle = Arc::new(ServerLifecycle::new());
360
361 Self {
362 config,
363 registry,
364 router,
365 service,
366 lifecycle,
367 metrics,
368 #[cfg(feature = "mcp-tasks")]
369 task_storage,
370 }
371 }
372
373 /// Get server configuration
374 #[must_use]
375 pub const fn config(&self) -> &ServerConfig {
376 &self.config
377 }
378
379 /// Get handler registry
380 #[must_use]
381 pub const fn registry(&self) -> &Arc<HandlerRegistry> {
382 &self.registry
383 }
384
385 /// Get request router
386 #[must_use]
387 pub const fn router(&self) -> &Arc<RequestRouter> {
388 &self.router
389 }
390
391 /// Get server lifecycle
392 #[must_use]
393 pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
394 &self.lifecycle
395 }
396
397 /// Get server metrics
398 #[must_use]
399 pub const fn metrics(&self) -> &Arc<ServerMetrics> {
400 &self.metrics
401 }
402
403 /// Get task storage (MCP Tasks API - SEP-1686)
404 ///
405 /// Returns the task storage instance for managing long-running operations.
406 /// Only available when the `mcp-tasks` feature is enabled.
407 #[cfg(feature = "mcp-tasks")]
408 #[must_use]
409 pub const fn task_storage(&self) -> &Arc<crate::task_storage::TaskStorage> {
410 &self.task_storage
411 }
412
413 /// Get the Tower service stack (test accessor)
414 ///
415 /// **Note**: This is primarily for integration testing. Production code should
416 /// use the transport layer which calls the service internally via
417 /// `handle_transport_message()`.
418 ///
419 /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
420 /// is designed for cloning).
421 #[doc(hidden)]
422 pub fn service(
423 &self,
424 ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
425 self.service.clone()
426 }
427
428 /// Get a shutdown handle for graceful server termination
429 ///
430 /// This handle enables external control over server shutdown, essential for:
431 /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
432 /// - **Container orchestration**: Kubernetes graceful pod termination
433 /// - **Load balancer integration**: Health check coordination
434 /// - **Multi-component systems**: Coordinated shutdown sequences
435 /// - **Maintenance operations**: Planned downtime and updates
436 ///
437 /// # Examples
438 ///
439 /// ## Basic shutdown coordination
440 /// ```no_run
441 /// # use turbomcp_server::ServerBuilder;
442 /// # #[tokio::main]
443 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
444 /// let server = ServerBuilder::new().build();
445 /// let shutdown_handle = server.shutdown_handle();
446 ///
447 /// // Coordinate with other services
448 /// tokio::spawn(async move {
449 /// // Wait for external shutdown signal
450 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
451 /// println!("Shutdown signal received, terminating gracefully...");
452 /// shutdown_handle.shutdown().await;
453 /// });
454 ///
455 /// // Server will gracefully shut down when signaled
456 /// // server.run_stdio().await?;
457 /// # Ok(())
458 /// # }
459 /// ```
460 ///
461 /// ## Container/Kubernetes deployment
462 /// ```no_run
463 /// # use turbomcp_server::ServerBuilder;
464 /// # use std::sync::Arc;
465 /// # #[tokio::main]
466 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
467 /// let server = ServerBuilder::new().build();
468 /// let shutdown_handle = server.shutdown_handle();
469 /// let shutdown_handle_clone = shutdown_handle.clone();
470 ///
471 /// // Handle multiple signal types with proper platform support
472 /// tokio::spawn(async move {
473 /// #[cfg(unix)]
474 /// {
475 /// use tokio::signal::unix::{signal, SignalKind};
476 /// let mut sigterm = signal(SignalKind::terminate()).unwrap();
477 /// tokio::select! {
478 /// _ = tokio::signal::ctrl_c() => {
479 /// println!("SIGINT received");
480 /// }
481 /// _ = sigterm.recv() => {
482 /// println!("SIGTERM received");
483 /// }
484 /// }
485 /// }
486 /// #[cfg(not(unix))]
487 /// {
488 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
489 /// println!("SIGINT received");
490 /// }
491 /// shutdown_handle_clone.shutdown().await;
492 /// });
493 ///
494 /// // Server handles graceful shutdown automatically
495 /// // server.run_tcp("0.0.0.0:8080").await?;
496 /// # Ok(())
497 /// # }
498 /// ```
499 pub fn shutdown_handle(&self) -> ShutdownHandle {
500 ShutdownHandle::new(self.lifecycle.clone())
501 }
502
503 /// Run the server with STDIO transport
504 ///
505 /// # Errors
506 ///
507 /// Returns [`crate::ServerError::Transport`] if:
508 /// - STDIO transport connection fails
509 /// - Message sending/receiving fails
510 /// - Transport disconnection fails
511 #[tracing::instrument(skip(self), fields(
512 transport = "stdio",
513 service_name = %self.config.name,
514 service_version = %self.config.version
515 ))]
516 pub async fn run_stdio(mut self) -> ServerResult<()> {
517 // For STDIO transport, disable logging unless explicitly overridden
518 // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
519 if should_log_for_stdio() {
520 info!("Starting MCP server with STDIO transport");
521 }
522
523 // Start performance monitoring for STDIO server
524 let _perf_span = info_span!("server.run", transport = "stdio").entered();
525 info!("Initializing STDIO transport for MCP server");
526
527 self.lifecycle.start().await;
528
529 // BIDIRECTIONAL STDIO SETUP
530 // Create STDIO dispatcher for server-initiated requests (sampling, elicitation, roots, ping)
531 let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
532
533 // Use fully-qualified path to avoid ambiguity with the turbomcp crate's runtime module
534 let dispatcher = crate::runtime::StdioDispatcher::new(request_tx);
535
536 // Configure router's bidirectional support with the STDIO dispatcher
537 // SAFETY: We have &mut self, so we can safely get mutable access to the Arc'd router
538 // This is the CRITICAL STEP that was missing - without this, all server→client requests fail
539 let router = Arc::make_mut(&mut self.router);
540 router.set_server_request_dispatcher(dispatcher.clone());
541
542 // Run STDIO with full bidirectional support (MCP 2025-11-25 compliant)
543 // This uses the bidirectional-aware runtime that handles both:
544 // - Client→Server requests (tools, resources, prompts)
545 // - Server→Client requests (sampling, elicitation, roots, ping)
546 crate::runtime::run_stdio_bidirectional(self.router.clone(), dispatcher, request_rx)
547 .await
548 .map_err(|e| crate::ServerError::Handler {
549 message: format!("STDIO bidirectional runtime failed: {}", e),
550 context: Some("run_stdio".to_string()),
551 })
552 }
553
554 /// Get health status
555 pub async fn health(&self) -> HealthStatus {
556 self.lifecycle.health().await
557 }
558
559 /// Run server with HTTP transport using default configuration
560 ///
561 /// This provides a working HTTP server with:
562 /// - Standard HTTP POST/GET/DELETE for MCP protocol at `/mcp`
563 /// - Full MCP 2025-11-25 protocol compliance
564 /// - Graceful shutdown support
565 /// - Default rate limiting (100 req/60s)
566 /// - Default security settings (localhost allowed, CORS disabled)
567 ///
568 /// For custom configuration (rate limits, security, CORS), use `run_http_with_config`.
569 ///
570 /// # Examples
571 ///
572 /// ## Basic usage with default configuration
573 /// ```no_run
574 /// use turbomcp_server::ServerBuilder;
575 ///
576 /// #[tokio::main]
577 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
578 /// let server = ServerBuilder::new()
579 /// .name("my-server")
580 /// .version("1.0.0")
581 /// .build();
582 ///
583 /// server.run_http("127.0.0.1:3000").await?;
584 /// Ok(())
585 /// }
586 /// ```
587 ///
588 /// ## With custom configuration
589 /// ```no_run
590 /// use turbomcp_server::ServerBuilder;
591 /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
592 /// use std::time::Duration;
593 ///
594 /// #[tokio::main]
595 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
596 /// let server = ServerBuilder::new()
597 /// .name("my-server")
598 /// .version("1.0.0")
599 /// .build();
600 ///
601 /// let config = StreamableHttpConfigBuilder::new()
602 /// .with_bind_address("127.0.0.1:3000")
603 /// .allow_any_origin(true) // Enable CORS for development
604 /// .build();
605 ///
606 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
607 /// Ok(())
608 /// }
609 /// ```
610 ///
611 /// # Errors
612 ///
613 /// Returns [`crate::ServerError::Transport`] if:
614 /// - Address resolution fails
615 /// - HTTP server fails to start
616 /// - Transport disconnection fails
617 #[cfg(feature = "http")]
618 #[tracing::instrument(skip(self), fields(
619 transport = "http",
620 service_name = %self.config.name,
621 service_version = %self.config.version,
622 addr = ?addr
623 ))]
624 pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
625 self,
626 addr: A,
627 ) -> ServerResult<()> {
628 use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
629
630 // Build default configuration
631 let config = StreamableHttpConfigBuilder::new().build();
632
633 self.run_http_with_config(addr, config).await
634 }
635
636 /// Run server with HTTP transport and custom configuration
637 ///
638 /// This provides full control over HTTP server configuration including:
639 /// - Rate limiting (requests per time window, or disabled entirely)
640 /// - Security settings (CORS, origin validation, authentication)
641 /// - Network settings (bind address, endpoint path, keep-alive)
642 /// - Advanced settings (replay buffer size, etc.)
643 ///
644 /// # Bind Address Configuration
645 ///
646 /// **IMPORTANT**: The `addr` parameter takes precedence over `config.bind_addr`.
647 /// If they differ, a deprecation warning is logged.
648 ///
649 /// **Best Practice** (recommended for forward compatibility):
650 /// ```no_run
651 /// # use turbomcp_server::ServerBuilder;
652 /// # use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
653 /// # #[tokio::main]
654 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
655 /// let config = StreamableHttpConfigBuilder::new()
656 /// .with_bind_address("127.0.0.1:3001") // Set bind address in config
657 /// .build();
658 ///
659 /// // Pass matching addr parameter (or use default "127.0.0.1:8080")
660 /// ServerBuilder::new()
661 /// .build()
662 /// .run_http_with_config("127.0.0.1:3001", config).await?;
663 /// # Ok(())
664 /// # }
665 /// ```
666 ///
667 /// **Deprecated** (will be removed in v3.x):
668 /// ```no_run
669 /// # use turbomcp_server::ServerBuilder;
670 /// # use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
671 /// # #[tokio::main]
672 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
673 /// // ⚠️ Avoid setting different addresses - causes deprecation warning
674 /// let config = StreamableHttpConfigBuilder::new()
675 /// .with_bind_address("0.0.0.0:5000") // This is ignored!
676 /// .build();
677 ///
678 /// // The addr parameter wins (3001 is used, not 5000)
679 /// ServerBuilder::new()
680 /// .build()
681 /// .run_http_with_config("127.0.0.1:3001", config).await?;
682 /// # Ok(())
683 /// # }
684 /// ```
685 ///
686 /// **Future (v3.x)**: The `addr` parameter will be removed. Configure bind address
687 /// via `config.with_bind_address()` only.
688 ///
689 /// # Examples
690 ///
691 /// ## Custom configuration example
692 /// ```no_run
693 /// use turbomcp_server::ServerBuilder;
694 /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
695 /// use std::time::Duration;
696 ///
697 /// #[tokio::main]
698 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
699 /// let server = ServerBuilder::new()
700 /// .name("custom-server")
701 /// .version("1.0.0")
702 /// .build();
703 ///
704 /// let config = StreamableHttpConfigBuilder::new()
705 /// .with_bind_address("127.0.0.1:3000")
706 /// .with_rate_limit(1000, Duration::from_secs(60)) // 1000 req/min
707 /// .build();
708 ///
709 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
710 /// Ok(())
711 /// }
712 /// ```
713 ///
714 /// ## Production configuration (secure, rate limited)
715 /// ```no_run
716 /// use turbomcp_server::ServerBuilder;
717 /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
718 /// use std::time::Duration;
719 ///
720 /// #[tokio::main]
721 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
722 /// let server = ServerBuilder::new()
723 /// .name("production-server")
724 /// .version("1.0.0")
725 /// .build();
726 ///
727 /// let config = StreamableHttpConfigBuilder::new()
728 /// .with_bind_address("127.0.0.1:3000")
729 /// .with_rate_limit(1000, Duration::from_secs(60)) // 1000 req/min
730 /// .allow_any_origin(false) // Strict CORS
731 /// .require_authentication(true) // Require auth
732 /// .build();
733 ///
734 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
735 /// Ok(())
736 /// }
737 /// ```
738 ///
739 /// # Errors
740 ///
741 /// Returns [`crate::ServerError::Transport`] if:
742 /// - Address resolution fails
743 /// - HTTP server fails to start
744 /// - Transport disconnection fails
745 #[cfg(feature = "http")]
746 #[tracing::instrument(skip(self, config), fields(
747 transport = "http",
748 service_name = %self.config.name,
749 service_version = %self.config.version,
750 addr = ?addr
751 ))]
752 pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
753 self,
754 addr: A,
755 mut config: turbomcp_transport::streamable_http::StreamableHttpConfig,
756 ) -> ServerResult<()> {
757 use std::collections::HashMap;
758 use tokio::sync::{Mutex, RwLock};
759
760 // Sprint 2.6: Check for insecure 0.0.0.0 binding
761 crate::security_checks::check_binding_security(&addr);
762
763 info!("Starting MCP server with HTTP transport");
764
765 self.lifecycle.start().await;
766
767 // Resolve address to string
768 let socket_addr = addr
769 .to_socket_addrs()
770 .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
771 .next()
772 .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
773
774 info!("Resolved address: {}", socket_addr);
775
776 // Check for conflicting bind addresses and warn about deprecation
777 let socket_addr_str = socket_addr.to_string();
778 if config.bind_addr != socket_addr_str {
779 warn!(
780 addr_parameter = %socket_addr_str,
781 config_bind_addr = %config.bind_addr,
782 "⚠️ DEPRECATION WARNING: The `addr` parameter takes precedence over `config.bind_addr`"
783 );
784 warn!(
785 "⚠️ In TurboMCP v3.x, the `addr` parameter will be removed. Please use StreamableHttpConfigBuilder::new().with_bind_address(\"{}\").build() instead",
786 socket_addr_str
787 );
788 warn!(
789 "⚠️ Avoid setting both `addr` parameter and `config.bind_addr` to prevent confusion"
790 );
791
792 // Update config to single source of truth
793 config.bind_addr = socket_addr_str.clone();
794 config.base_url = format!("http://{}", socket_addr_str);
795 }
796
797 info!(
798 config = ?config,
799 "HTTP configuration (updated with resolved address)"
800 );
801
802 // BIDIRECTIONAL HTTP SETUP
803 // Create shared state for session management and bidirectional MCP
804 let sessions = Arc::new(RwLock::new(HashMap::new()));
805 let pending_requests = Arc::new(Mutex::new(HashMap::new()));
806
807 // Share router across all sessions (routing logic and handler registry)
808 let router = self.router.clone();
809
810 // Capture server identity for MCP protocol compliance
811 let server_info = turbomcp_protocol::ServerInfo {
812 name: self.config.name.clone(),
813 version: self.config.version.clone(),
814 };
815
816 // Factory pattern: create session-specific router for each HTTP request
817 // This is the clean architecture that HTTP requires - each session gets its own
818 // bidirectional dispatcher while sharing the routing logic
819 let sessions_for_factory = Arc::clone(&sessions);
820 let pending_for_factory = Arc::clone(&pending_requests);
821 let router_for_factory = Arc::clone(&router);
822
823 // Create a wrapper that converts headers and delegates to router
824 // This is cleaner than storing headers on the router itself
825 let handler_factory = move |session_id: Option<String>,
826 headers: Option<axum::http::HeaderMap>,
827 tenant_id: Option<String>| {
828 let session_id = session_id.unwrap_or_else(|| {
829 let new_id = uuid::Uuid::new_v4().to_string();
830 tracing::debug!(
831 "HTTP POST without session ID - generating ephemeral ID for request: {}",
832 new_id
833 );
834 new_id
835 });
836
837 tracing::debug!("Factory creating handler for session: {}", session_id);
838
839 // Create session-specific HTTP dispatcher (now local to turbomcp-server!)
840 let dispatcher = crate::runtime::http::HttpDispatcher::new(
841 session_id,
842 Arc::clone(&sessions_for_factory),
843 Arc::clone(&pending_for_factory),
844 );
845
846 // Clone the base router and configure with session-specific dispatcher
847 // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
848 let mut session_router = (*router_for_factory).clone();
849 session_router.set_server_request_dispatcher(dispatcher);
850
851 // Convert HeaderMap to HashMap<String, String> for passing to create_context
852 let headers_map = headers.map(|header_map| {
853 header_map
854 .iter()
855 .filter_map(|(name, value)| {
856 value
857 .to_str()
858 .ok()
859 .map(|v| (name.to_string(), v.to_string()))
860 })
861 .collect()
862 });
863
864 // Create wrapper that passes headers and tenant_id to create_context (HTTP transport)
865 HttpHandlerWithHeaders {
866 router: session_router,
867 headers: headers_map,
868 transport: "http",
869 tenant_id,
870 }
871 };
872
873 info!(
874 server_name = %server_info.name,
875 server_version = %server_info.version,
876 bind_addr = %socket_addr,
877 endpoint_path = %config.endpoint_path,
878 "HTTP server starting with full bidirectional support (elicitation, sampling, roots, ping)"
879 );
880
881 // Use factory-based HTTP server with full bidirectional support
882 use crate::runtime::http::run_http;
883 run_http(handler_factory, sessions, pending_requests, config)
884 .await
885 .map_err(|e| {
886 tracing::error!(error = %e, "HTTP server failed");
887 crate::ServerError::handler(e.to_string())
888 })?;
889
890 info!("HTTP server shutdown complete");
891 Ok(())
892 }
893
/// Serve MCP over HTTP, letting the caller wrap the Axum router in Tower middleware
///
/// `middleware_fn` receives the HTTP router and returns the router that is actually
/// served, which makes it the hook for cross-cutting layers such as multi-tenancy,
/// authentication, or rate limiting.
///
/// The server binds to the first socket address that `addr` resolves to. The MCP
/// endpoint path is fixed to `/mcp` and localhost access is allowed. Every request
/// gets its own handler with a session-specific dispatcher; a request that arrives
/// without a session ID is assigned a freshly generated UUID.
///
/// # Errors
///
/// Returns a configuration error when `addr` cannot be resolved or resolves to no
/// address, and a handler error when the underlying HTTP server fails.
///
/// # Multi-Tenancy Example
///
/// ```no_run
/// use turbomcp_server::ServerBuilder;
/// use turbomcp_server::middleware::tenancy::{HeaderTenantExtractor, TenantExtractionLayer};
/// use tower::ServiceBuilder;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("multi-tenant-server")
///         .version("1.0.0")
///         .build();
///
///     // Create tenant extractor middleware
///     let tenant_extractor = HeaderTenantExtractor::new("X-Tenant-ID");
///     let middleware = ServiceBuilder::new()
///         .layer(TenantExtractionLayer::new(tenant_extractor));
///
///     // Run server with middleware
///     server.run_http_with_middleware(
///         "127.0.0.1:3000",
///         Box::new(move |router| router.layer(middleware))
///     ).await?;
///
///     Ok(())
/// }
/// ```
///
/// # Authentication Example
///
/// ```no_run
/// use turbomcp_server::ServerBuilder;
/// use tower::ServiceBuilder;
/// use tower_http::auth::RequireAuthorizationLayer;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let server = ServerBuilder::new()
///     .name("auth-server")
///     .version("1.0.0")
///     .build();
///
/// // Add bearer token authentication
/// let middleware = ServiceBuilder::new()
///     .layer(RequireAuthorizationLayer::bearer("secret-token"));
///
/// server.run_http_with_middleware(
///     "127.0.0.1:3000",
///     Box::new(move |router| router.layer(middleware))
/// ).await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "http")]
#[tracing::instrument(skip(self, middleware_fn), fields(
    transport = "http",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_http_with_middleware<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    self,
    addr: A,
    middleware_fn: Box<dyn FnOnce(axum::Router) -> axum::Router + Send>,
) -> ServerResult<()> {
    use crate::runtime::http::run_http_with_middleware;
    use std::collections::HashMap;
    use tokio::sync::{Mutex, RwLock};
    use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;

    // Sprint 2.6: Check for insecure 0.0.0.0 binding
    crate::security_checks::check_binding_security(&addr);

    info!("Starting MCP server with HTTP transport and custom middleware");

    self.lifecycle.start().await;

    // Take the first concrete socket address the input resolves to. The resolver
    // iterator is consumed inside the closure so it never outlives this statement.
    let socket_addr = match addr.to_socket_addrs().map(|mut candidates| candidates.next()) {
        Ok(Some(resolved)) => resolved,
        Ok(None) => return Err(crate::ServerError::configuration("No address resolved")),
        Err(e) => {
            return Err(crate::ServerError::configuration(format!(
                "Invalid address: {}",
                e
            )));
        }
    };

    info!("Resolved address: {}", socket_addr);

    // Shared state for bidirectional HTTP: live sessions and in-flight requests
    let sessions = Arc::new(RwLock::new(HashMap::new()));
    let pending_requests = Arc::new(Mutex::new(HashMap::new()));

    // Handles owned by the factory closure
    let factory_sessions = Arc::clone(&sessions);
    let factory_pending = Arc::clone(&pending_requests);
    let factory_router = Arc::clone(&self.router);

    // Builds one handler per request, wired to a session-specific dispatcher
    let handler_factory = move |session_id: Option<String>,
                                headers: Option<axum::http::HeaderMap>,
                                tenant_id: Option<String>| {
        let session_id = match session_id {
            Some(existing) => existing,
            None => {
                let generated = uuid::Uuid::new_v4().to_string();
                tracing::debug!(
                    "HTTP POST without session ID - generating ephemeral ID for request: {}",
                    generated
                );
                generated
            }
        };

        tracing::debug!("Factory creating handler for session: {}", session_id);

        let dispatcher = crate::runtime::http::HttpDispatcher::new(
            session_id,
            Arc::clone(&factory_sessions),
            Arc::clone(&factory_pending),
        );

        // Each handler gets its own router copy carrying the session dispatcher
        let mut session_router = (*factory_router).clone();
        session_router.set_server_request_dispatcher(dispatcher);

        // Flatten the HeaderMap into string pairs; values that are not valid
        // visible ASCII (`to_str` fails) are dropped.
        let headers_map = headers.map(|raw_headers| {
            let mut flattened = HashMap::new();
            for (name, value) in raw_headers.iter() {
                if let Ok(text) = value.to_str() {
                    flattened.insert(name.to_string(), text.to_string());
                }
            }
            flattened
        });

        HttpHandlerWithHeaders {
            router: session_router,
            headers: headers_map,
            transport: "http",
            tenant_id,
        }
    };

    info!(
        server_name = %self.config.name,
        server_version = %self.config.version,
        bind_addr = %socket_addr,
        "HTTP server starting with custom middleware and bidirectional support"
    );

    // Default streamable-HTTP settings, pinned to the resolved address
    let config = StreamableHttpConfigBuilder::new()
        .with_bind_address(socket_addr.to_string())
        .with_endpoint_path("/mcp")
        .allow_localhost(true)
        .build();

    let served = run_http_with_middleware(
        handler_factory,
        sessions,
        pending_requests,
        config,
        Some(middleware_fn),
    )
    .await;

    if let Err(e) = served {
        tracing::error!(error = %e, "HTTP server with middleware failed");
        return Err(crate::ServerError::handler(e.to_string()));
    }

    info!("HTTP server shutdown complete");
    Ok(())
}
1072
/// Serve MCP over WebSocket using the default WebSocket configuration
///
/// Convenience entry point that forwards to `run_websocket_with_config()` with
/// `WebSocketServerConfig::default()`. As documented for the defaults, this gives:
/// - Default endpoint: `/mcp/ws`
/// - Full MCP 2025-11-25 compliance
/// - Bidirectional communication
/// - Elicitation support
/// - Session management and middleware
///
/// Call `run_websocket_with_config()` directly when the defaults do not fit.
///
/// # Example
///
/// ```no_run
/// use turbomcp_server::ServerBuilder;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("ws-server")
///         .version("1.0.0")
///         .build();
///
///     server.run_websocket("127.0.0.1:8080").await?;
///     Ok(())
/// }
/// ```
#[cfg(feature = "websocket")]
#[tracing::instrument(skip(self), fields(
    transport = "websocket",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    self,
    addr: A,
) -> ServerResult<()> {
    // Delegate with the stock configuration; all real work happens downstream.
    self.run_websocket_with_config(addr, crate::config::WebSocketServerConfig::default())
        .await
}
1118
/// Run server with WebSocket transport and custom configuration
///
/// This provides full control over WebSocket server configuration including:
/// - Custom endpoint path
/// - MCP server settings (middleware, security, etc.)
///
/// The listener is bound to the first socket address that `addr` resolves to.
/// Within this method `config` is only logged and used for `endpoint_path`;
/// `config.bind_addr` is not read here, so the `addr` argument decides where the
/// server listens.
///
/// Each WebSocket connection gets its own handler: the server router is cloned and
/// wired to a dispatcher wrapping that connection, and the request headers collected
/// by the session middleware are passed along with the tenant ID.
///
/// # Errors
///
/// Returns a configuration error when `addr` cannot be resolved, resolves to no
/// address, or cannot be bound, and a handler error when serving fails.
///
/// # Example
///
/// ```no_run
/// use turbomcp_server::{ServerBuilder, WebSocketServerConfig};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let server = ServerBuilder::new()
///         .name("custom-ws-server")
///         .version("1.0.0")
///         .build();
///
///     let config = WebSocketServerConfig {
///         bind_addr: "127.0.0.1:8080".to_string(),
///         endpoint_path: "/custom/ws".to_string(),
///         max_concurrent_requests: 100,
///     };
///
///     server.run_websocket_with_config("127.0.0.1:8080", config).await?;
///     Ok(())
/// }
/// ```
#[cfg(feature = "websocket")]
#[tracing::instrument(skip(self, config), fields(
    transport = "websocket",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    self,
    addr: A,
    config: crate::config::WebSocketServerConfig,
) -> ServerResult<()> {
    use axum::{Router, middleware, routing::get};
    use turbomcp_transport::axum::{WebSocketFactoryState, websocket_handler_with_factory};
    use turbomcp_transport::tower::SessionInfo;

    // Sprint 2.6: Check for insecure 0.0.0.0 binding
    crate::security_checks::check_binding_security(&addr);

    info!("Starting MCP server with WebSocket transport");
    info!(config = ?config, "WebSocket configuration");

    self.lifecycle.start().await;

    // Resolve the caller-supplied address to its first concrete socket address
    let socket_addr = addr
        .to_socket_addrs()
        .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
        .next()
        .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;

    info!("Resolved address: {}", socket_addr);

    // Capture server identity for MCP protocol compliance
    let server_info = turbomcp_protocol::ServerInfo {
        name: self.config.name.clone(),
        version: self.config.version.clone(),
    };

    // Router for this server (cloned once here, then once more per connection)
    let router = (*self.router).clone();

    // Factory: creates per-connection handler with bidirectional support
    // This is the unified architecture - transport layer handles WebSocket mechanics,
    // server layer provides MCP-specific handler logic
    let handler_factory =
        move |transport_dispatcher: turbomcp_transport::axum::WebSocketDispatcher,
              headers: Option<std::collections::HashMap<String, String>>,
              tenant_id: Option<String>| {
            // Wrap transport dispatcher with server layer adapter
            let server_dispatcher =
                crate::routing::WebSocketDispatcherAdapter::new(transport_dispatcher);

            // Clone router for this connection and configure with dispatcher
            let mut connection_router = router.clone();
            connection_router.set_server_request_dispatcher(server_dispatcher);

            // Create wrapper that passes headers and tenant_id to create_context
            // (WebSocket transport). HttpHandlerWithHeaders is reused because it only
            // carries a transport label alongside the router.
            Arc::new(HttpHandlerWithHeaders {
                router: connection_router,
                headers,
                transport: "websocket",
                tenant_id,
            }) as Arc<dyn turbomcp_protocol::JsonRpcHandler>
        };

    info!(
        server_name = %server_info.name,
        server_version = %server_info.version,
        bind_addr = %socket_addr,
        endpoint_path = %config.endpoint_path,
        "WebSocket server starting with full bidirectional support (elicitation, sampling, roots, ping)"
    );

    // Create factory state for transport layer
    let factory_state = WebSocketFactoryState::new(handler_factory);

    // Session middleware: copies request headers into a SessionInfo stored in the
    // request extensions so the WebSocket handler can read them.
    let session_middleware = |mut request: axum::extract::Request, next: middleware::Next| async move {
        let mut session = SessionInfo::new();

        // Store every header whose value is valid text in the session metadata
        for (name, value) in request.headers().iter() {
            if let Ok(value_str) = value.to_str() {
                session
                    .metadata
                    .insert(name.to_string(), value_str.to_string());
            }
        }

        // Extract specific useful headers
        if let Some(user_agent) = request
            .headers()
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
        {
            session.user_agent = Some(user_agent.to_string());
        }

        // NOTE(review): `x-forwarded-for` is client-controlled unless a trusted
        // proxy overwrites it; treat this value as informational only.
        if let Some(remote_addr) = request
            .headers()
            .get("x-forwarded-for")
            .and_then(|v| v.to_str().ok())
        {
            session.remote_addr = Some(remote_addr.to_string());
        }

        request.extensions_mut().insert(session);
        next.run(request).await
    };

    // Build Axum router using transport layer
    let app = Router::new()
        .route(&config.endpoint_path, get(websocket_handler_with_factory))
        .with_state(factory_state)
        .layer(middleware::from_fn(session_middleware));

    // Bind before announcing it, so the "bound" log is only emitted on success.
    let listener = tokio::net::TcpListener::bind(socket_addr)
        .await
        .map_err(|e| crate::ServerError::configuration(format!("Failed to bind: {}", e)))?;

    // Report the address the OS actually assigned (differs from the request when
    // port 0 was asked for); fall back to the requested address if it is unavailable.
    let bound_addr = listener.local_addr().unwrap_or(socket_addr);
    info!("WebSocket server bound to {}", bound_addr);

    // Serve using Axum
    axum::serve(listener, app).await.map_err(|e| {
        tracing::error!(error = %e, "WebSocket server failed");
        crate::ServerError::handler(e.to_string())
    })?;

    info!("WebSocket server shutdown complete");
    Ok(())
}
1280
/// Run server with TCP transport (progressive enhancement - runtime configuration)
///
/// Resolves `addr` to its first socket address, connects a server-side
/// `TcpTransport` on it, installs a transport dispatcher on the router so handlers
/// can issue server-initiated requests, and then drives the bidirectional runtime
/// until it returns.
///
/// The work runs inside a `server.run` span attached with `Instrument::instrument`.
/// A guard from `Span::entered()` must not be held across `.await` points: it
/// produces incorrect traces and makes the future `!Send`.
///
/// # Errors
///
/// Returns a configuration error when `addr` cannot be resolved or resolves to no
/// address, the converted transport error when connecting fails (the lifecycle is
/// shut down first in both cases), and a handler error when the bidirectional
/// runtime fails.
#[cfg(feature = "tcp")]
#[tracing::instrument(skip(self), fields(
    transport = "tcp",
    service_name = %self.config.name,
    service_version = %self.config.version,
    addr = ?addr
))]
pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
    mut self,
    addr: A,
) -> ServerResult<()> {
    use tracing::Instrument;
    use turbomcp_transport::TcpTransport;

    // Sprint 2.6: Check for insecure 0.0.0.0 binding
    crate::security_checks::check_binding_security(&addr);

    // Performance-monitoring span for the TCP server; attached to the future below
    // instead of being entered, so it is only active while this future is polled.
    let perf_span = info_span!("server.run", transport = "tcp");

    async move {
        info!(?addr, "Starting MCP server with TCP transport");

        self.lifecycle.start().await;

        // Convert ToSocketAddrs to SocketAddr. The resolver iterator is consumed
        // inside the closure so it is not kept alive across the awaits below.
        let resolved = addr
            .to_socket_addrs()
            .map(|mut candidates| candidates.next());
        let socket_addr = match resolved {
            Ok(Some(found)) => found,
            Ok(None) => {
                tracing::error!("No socket address resolved from provided address");
                self.lifecycle.shutdown().await;
                return Err(crate::ServerError::configuration("Invalid socket address"));
            }
            Err(e) => {
                tracing::error!(error = %e, "Failed to resolve socket address");
                self.lifecycle.shutdown().await;
                return Err(crate::ServerError::configuration(format!(
                    "Address resolution failed: {e}"
                )));
            }
        };

        let transport = TcpTransport::new_server(socket_addr);
        if let Err(e) = transport.connect().await {
            tracing::error!(error = %e, "Failed to connect TCP transport");
            self.lifecycle.shutdown().await;
            return Err::<(), crate::ServerError>(e.into());
        }

        // BIDIRECTIONAL TCP SETUP
        // Create generic transport dispatcher for server-initiated requests
        let dispatcher = crate::runtime::TransportDispatcher::new(transport);

        // Configure router's bidirectional support with the TCP dispatcher
        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
        let router = Arc::make_mut(&mut self.router);
        router.set_server_request_dispatcher(dispatcher.clone());

        // Run TCP with full bidirectional support (MCP 2025-11-25 compliant)
        // This uses the generic bidirectional runtime that handles both:
        // - Client→Server requests (tools, resources, prompts)
        // - Server→Client requests (sampling, elicitation, roots, ping)
        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
            .await
            .map_err(|e| crate::ServerError::Handler {
                message: format!("TCP bidirectional runtime failed: {}", e),
                context: Some("run_tcp".to_string()),
            })
    }
    .instrument(perf_span)
    .await
}
1350
/// Run server with Unix socket transport (progressive enhancement - runtime configuration)
///
/// Connects a server-side `UnixTransport` at `path`, installs a transport
/// dispatcher on the router so handlers can issue server-initiated requests, and
/// then drives the bidirectional runtime until it returns.
///
/// The work runs inside a `server.run` span attached with `Instrument::instrument`.
/// A guard from `Span::entered()` must not be held across `.await` points: it
/// produces incorrect traces and makes the future `!Send`.
///
/// # Errors
///
/// Returns the converted transport error when connecting fails (the lifecycle is
/// shut down first) and a handler error when the bidirectional runtime fails.
#[cfg(all(feature = "unix", unix))]
#[tracing::instrument(skip(self), fields(
    transport = "unix",
    service_name = %self.config.name,
    service_version = %self.config.version,
    path = ?path.as_ref()
))]
pub async fn run_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> ServerResult<()> {
    use std::path::PathBuf;
    use tracing::Instrument;
    use turbomcp_transport::UnixTransport;

    // Performance-monitoring span for the Unix server; attached to the future below
    // instead of being entered, so it is only active while this future is polled.
    let perf_span = info_span!("server.run", transport = "unix");

    async move {
        info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");

        self.lifecycle.start().await;

        let socket_path = PathBuf::from(path.as_ref());
        let transport = UnixTransport::new_server(socket_path);
        if let Err(e) = transport.connect().await {
            tracing::error!(error = %e, "Failed to connect Unix socket transport");
            self.lifecycle.shutdown().await;
            return Err::<(), crate::ServerError>(e.into());
        }

        // BIDIRECTIONAL UNIX SOCKET SETUP
        // Create generic transport dispatcher for server-initiated requests
        let dispatcher = crate::runtime::TransportDispatcher::new(transport);

        // Configure router's bidirectional support with the Unix socket dispatcher
        // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
        let router = Arc::make_mut(&mut self.router);
        router.set_server_request_dispatcher(dispatcher.clone());

        // Run Unix Socket with full bidirectional support (MCP 2025-11-25 compliant)
        // This uses the generic bidirectional runtime that handles both:
        // - Client→Server requests (tools, resources, prompts)
        // - Server→Client requests (sampling, elicitation, roots, ping)
        crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
            .await
            .map_err(|e| crate::ServerError::Handler {
                message: format!("Unix socket bidirectional runtime failed: {}", e),
                context: Some("run_unix".to_string()),
            })
    }
    .instrument(perf_span)
    .await
}
1397
1398 /// Generic transport runner (DRY principle)
1399 /// Used by feature-gated transport methods (http, tcp, websocket, unix)
1400 #[allow(dead_code)]
1401 #[tracing::instrument(skip(self, transport), fields(
1402 service_name = %self.config.name,
1403 service_version = %self.config.version
1404 ))]
1405 async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
1406 // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
1407 let lifecycle_for_sigint = self.lifecycle.clone();
1408 tokio::spawn(async move {
1409 if let Err(e) = tokio::signal::ctrl_c().await {
1410 tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
1411 return;
1412 }
1413 tracing::info!("Ctrl+C received, initiating shutdown");
1414 lifecycle_for_sigint.shutdown().await;
1415 });
1416
1417 #[cfg(unix)]
1418 {
1419 let lifecycle_for_sigterm = self.lifecycle.clone();
1420 tokio::spawn(async move {
1421 use tokio::signal::unix::{SignalKind, signal};
1422 match signal(SignalKind::terminate()) {
1423 Ok(mut sigterm) => {
1424 sigterm.recv().await;
1425 tracing::info!("SIGTERM received, initiating shutdown");
1426 lifecycle_for_sigterm.shutdown().await;
1427 }
1428 Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
1429 }
1430 });
1431 }
1432
1433 // Shutdown signal
1434 let mut shutdown = self.lifecycle.shutdown_signal();
1435
1436 // Main message processing loop
1437 loop {
1438 tokio::select! {
1439 _ = shutdown.recv() => {
1440 tracing::info!("Shutdown signal received");
1441 break;
1442 }
1443 res = transport.receive() => {
1444 match res {
1445 Ok(Some(message)) => {
1446 if let Err(e) = self.handle_transport_message(&mut transport, message).await {
1447 tracing::warn!(error = %e, "Failed to handle transport message");
1448 }
1449 }
1450 Ok(None) => {
1451 // No message available; sleep briefly to avoid busy loop
1452 sleep(Duration::from_millis(5)).await;
1453 }
1454 Err(e) => {
1455 match e {
1456 TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
1457 tracing::info!("Transport receive channel disconnected; shutting down");
1458 break;
1459 }
1460 _ => {
1461 tracing::error!(error = %e, "Transport receive failed");
1462 // Backoff on errors
1463 sleep(Duration::from_millis(50)).await;
1464 }
1465 }
1466 }
1467 }
1468 }
1469 }
1470 }
1471
1472 // Disconnect transport
1473 if let Err(e) = transport.disconnect().await {
1474 tracing::warn!(error = %e, "Error while disconnecting transport");
1475 }
1476
1477 tracing::info!("Server shutdown complete");
1478 Ok(())
1479 }
1480}
1481
1482// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1483// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1484// This is intentional and follows the Axum/Tower design pattern
1485#[allow(dead_code)]
1486const _: () = {
1487 const fn assert_send_clone<T: Send + Clone>() {}
1488 const fn check() {
1489 assert_send_clone::<crate::server::core::McpServer>();
1490 }
1491};