turbomcp_server/server/core.rs
1//! Core MCP server implementation
2//!
3//! Contains the main McpServer struct and its core functionality including
4//! middleware building, lifecycle management, and server construction.
5
6use std::sync::Arc;
7use tracing::{info, info_span};
8
9#[cfg(feature = "http")]
10use tracing::warn;
11
12use crate::{
13 config::ServerConfig,
14 error::ServerResult,
15 lifecycle::{HealthStatus, ServerLifecycle},
16 metrics::ServerMetrics,
17 registry::HandlerRegistry,
18 routing::RequestRouter,
19 service::McpService,
20};
21
22#[cfg(feature = "middleware")]
23use crate::middleware::MiddlewareStack;
24
25use bytes::Bytes;
26#[cfg(feature = "middleware")]
27use http::StatusCode;
28use http::{Request, Response};
29use tokio::time::{Duration, sleep};
30use turbomcp_transport::Transport;
31use turbomcp_transport::core::TransportError;
32
33use super::shutdown::ShutdownHandle;
34
/// Report whether logging has been force-enabled for the STDIO transport.
///
/// For MCP STDIO transport compliance, logging is disabled by default because
/// stdout must be reserved exclusively for JSON-RPC messages. Setting the
/// `TURBOMCP_FORCE_LOGGING` environment variable (to any value, including an
/// empty or non-Unicode one) overrides that default.
///
/// # Returns
///
/// `true` when `TURBOMCP_FORCE_LOGGING` is present in the process environment.
pub(crate) fn should_log_for_stdio() -> bool {
    // `var_os` only checks presence; `var` would report "not set" for a value
    // that is present but not valid Unicode.
    std::env::var_os("TURBOMCP_FORCE_LOGGING").is_some()
}
43
/// Per-request handler that bundles a router with request metadata.
///
/// Implements `JsonRpcHandler` by forwarding the stored headers, transport
/// label and tenant id to `RequestRouter::create_context`, so none of that
/// per-request data has to live on the router itself. Compiled only when the
/// `http` or `websocket` feature is enabled.
#[cfg(any(feature = "http", feature = "websocket"))]
struct HttpHandlerWithHeaders {
    /// Router used to build the request context and route the request.
    router: RequestRouter,
    /// Request headers as name/value strings, if the transport supplied any.
    headers: Option<std::collections::HashMap<String, String>>,
    /// Transport label handed to `create_context` (e.g. `"http"`).
    transport: &'static str,
    /// Tenant identifier handed to `create_context`, if one was provided.
    tenant_id: Option<String>,
}
54
#[cfg(any(feature = "http", feature = "websocket"))]
#[async_trait::async_trait]
impl turbomcp_protocol::JsonRpcHandler for HttpHandlerWithHeaders {
    /// Handle one JSON-RPC message that has already been parsed into a JSON value.
    ///
    /// The value is deserialized into a `JsonRpcRequest`, routed through the
    /// wrapped router with a context built from the stored headers, transport
    /// label and tenant id, and the router's response is returned as JSON.
    ///
    /// # Returns
    ///
    /// - The serialized router response on success.
    /// - A JSON-RPC error object with code `-32600` (Invalid Request) and a
    ///   `null` id when the value is not a valid request object.
    /// - A JSON-RPC error object with code `-32603` (Internal error) carrying
    ///   the response id when the router's response cannot be serialized.
    async fn handle_request(&self, req_value: serde_json::Value) -> serde_json::Value {
        use turbomcp_protocol::jsonrpc::JsonRpcRequest;

        // The input is already well-formed JSON, so a failure here means the
        // value is not a valid Request object. JSON-RPC 2.0 assigns -32600 to
        // that case; -32700 is reserved for unparseable JSON text.
        let request: JsonRpcRequest = match serde_json::from_value(req_value) {
            Ok(request) => request,
            Err(e) => {
                return serde_json::json!({
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32600,
                        "message": format!("Invalid Request: {}", e)
                    },
                    "id": null
                });
            }
        };

        // Build the request context from the metadata captured when this
        // handler was created.
        let ctx = self.router.create_context(
            self.headers.clone(),
            Some(self.transport),
            self.tenant_id.clone(),
        );

        let response = self.router.route(request, ctx).await;

        // Serialize the response; fall back to an Internal error that still
        // carries the response id so the caller can correlate it.
        match serde_json::to_value(&response) {
            Ok(v) => v,
            Err(e) => {
                serde_json::json!({
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32603,
                        "message": format!("Internal error: failed to serialize response: {}", e)
                    },
                    "id": response.id
                })
            }
        }
    }

    /// Return the server identity reported by the wrapped router.
    fn server_info(&self) -> turbomcp_protocol::ServerInfo {
        self.router.server_info()
    }
}
108
109/// Main MCP server following the Axum/Tower Clone pattern
110///
111/// ## Sharing Pattern
112///
113/// `McpServer` implements `Clone` like Axum's `Router`. All heavy state is Arc-wrapped
114/// internally, making cloning cheap (just atomic reference count increments).
115///
116/// ```rust,no_run
117/// use turbomcp_server::ServerBuilder;
118///
119/// # async fn example() {
120/// let server = ServerBuilder::new().build();
121///
122/// // Clone for passing to functions (cheap - just Arc increments)
123/// let server1 = server.clone();
124/// let server2 = server.clone();
125///
126/// // Access config and health
127/// let config = server1.config();
128/// println!("Server: {}", config.name);
129///
130/// let health = server2.health().await;
131/// println!("Health: {:?}", health);
132/// # }
133/// ```
134///
135/// ## Architecture Notes
136///
137/// The `service` field contains `BoxCloneService` which is `Send + Clone` but NOT `Sync`.
138/// This is intentional and follows Tower's design - users clone the server instead of
139/// Arc-wrapping it.
140///
141/// **Architecture Note**: The service field provides tower::Service integration for
142/// advanced middleware patterns. The request processing pipeline currently uses the
143/// RequestRouter directly. Tower integration can be added via custom middleware layers
144/// when needed for specific use cases (e.g., custom rate limiting, advanced tracing).
145#[derive(Clone)]
146pub struct McpServer {
147 /// Server configuration (Clone-able)
148 pub(crate) config: ServerConfig,
149 /// Handler registry (Arc-wrapped for cheap cloning)
150 pub(crate) registry: Arc<HandlerRegistry>,
151 /// Request router (Arc-wrapped for cheap cloning)
152 pub(crate) router: Arc<RequestRouter>,
153 /// Tower middleware service stack (Clone but !Sync - this is the Tower pattern)
154 ///
155 /// All requests flow through this service stack, which provides:
156 /// - Timeout enforcement
157 /// - Request validation
158 /// - Authorization checks
159 /// - Rate limiting
160 /// - Audit logging
161 /// - And more middleware layers as configured
162 ///
163 /// See `server/transport.rs` for integration with transport layer.
164 pub(crate) service:
165 tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError>,
166 /// Server lifecycle (Arc-wrapped for cheap cloning)
167 pub(crate) lifecycle: Arc<ServerLifecycle>,
168 /// Server metrics (Arc-wrapped for cheap cloning)
169 pub(crate) metrics: Arc<ServerMetrics>,
170 /// Task storage for MCP Tasks API (SEP-1686)
171 #[cfg(feature = "mcp-tasks")]
172 pub(crate) task_storage: Arc<crate::task_storage::TaskStorage>,
173}
174
175impl std::fmt::Debug for McpServer {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 f.debug_struct("McpServer")
178 .field("config", &self.config)
179 .finish()
180 }
181}
182
183impl McpServer {
184 /// Build comprehensive Tower middleware stack (transport-agnostic)
185 ///
186 /// ## Architecture
187 ///
188 /// This creates a complete Tower service stack with conditional middleware layers:
189 /// - **Timeout Layer**: Request timeout enforcement (tower_http)
190 /// - **Validation Layer**: JSON-RPC structure validation
191 /// - **Authorization Layer**: Resource access control
192 /// - **Core Service**: JSON-RPC routing and handler execution
193 ///
194 /// All middleware is composed using Tower's ServiceBuilder pattern, which provides:
195 /// - Top-to-bottom execution order
196 /// - Type-safe layer composition
197 /// - Zero-cost abstractions
198 /// - Clone-able service instances
199 ///
200 /// ## Integration
201 ///
202 /// The resulting BoxCloneService is stored in `self.service` and called from
203 /// `server/transport.rs` for every incoming request. This ensures ALL requests
204 /// flow through the complete middleware pipeline before reaching handlers.
205 ///
206 /// ## Adding Middleware
207 ///
208 /// To add new middleware, update the match arms below to include your layer.
209 /// Follow the pattern of conditional inclusion based on config flags.
210 #[cfg(feature = "middleware")]
211 fn build_middleware_stack(
212 core_service: McpService,
213 stack: MiddlewareStack,
214 ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
215 // COMPREHENSIVE TOWER COMPOSITION - Conditional Layer Stacking
216 //
217 // This approach builds the middleware stack incrementally, boxing at each step.
218 // While this has a small performance cost from multiple boxing operations,
219 // it provides several critical advantages:
220 //
221 // 1. **Maintainability**: No combinatorial explosion (8 match arms → simple chain)
222 // 2. **Extensibility**: Adding new middleware requires only one new block
223 // 3. **Clarity**: Each layer's purpose and configuration is explicit
224 // 4. **Type Safety**: BoxCloneService provides type erasure while preserving Clone
225 //
226 // Performance note: The boxing overhead is negligible compared to network I/O
227 // and handler execution time. Modern allocators make this essentially free.
228
229 // Start with core service as a boxed service for uniform type handling
230 let mut service: tower::util::BoxCloneService<
231 Request<Bytes>,
232 Response<Bytes>,
233 crate::ServerError,
234 > = tower::util::BoxCloneService::new(core_service);
235
236 // Authorization layer removed in 2.0.0 - handle at application layer
237
238 // Layer 2: Validation
239 // Validates request structure after auth but before processing
240 #[cfg(feature = "middleware")]
241 {
242 if let Some(validation_layer) = stack.validation_layer() {
243 service = tower::util::BoxCloneService::new(
244 tower::ServiceBuilder::new()
245 .layer(validation_layer)
246 .service(service),
247 );
248 }
249 }
250
251 // Layer 3: Timeout (outermost)
252 // Applied last so it can enforce timeout on the entire request pipeline
253 #[cfg(feature = "middleware")]
254 {
255 if let Some(timeout_config) = stack.timeout_config
256 && timeout_config.enabled
257 {
258 service = tower::util::BoxCloneService::new(
259 tower::ServiceBuilder::new()
260 .layer(tower_http::timeout::TimeoutLayer::with_status_code(
261 StatusCode::REQUEST_TIMEOUT,
262 timeout_config.request_timeout,
263 ))
264 .service(service),
265 );
266 }
267 }
268
269 // Future middleware can be added here with similar if-let blocks:
270 // if let Some(auth_config) = stack.auth_config { ... }
271 // if let Some(audit_config) = stack.audit_config { ... }
272 // if let Some(rate_limit_config) = stack.rate_limit_config { ... }
273
274 service
275 }
276
277 /// Create a new server
278 #[must_use]
279 pub fn new(config: ServerConfig) -> Self {
280 Self::new_with_registry(config, HandlerRegistry::new())
281 }
282
283 /// Create a new server with an existing registry (used by ServerBuilder)
284 #[must_use]
285 pub(crate) fn new_with_registry(config: ServerConfig, registry: HandlerRegistry) -> Self {
286 let registry = Arc::new(registry);
287 let metrics = Arc::new(ServerMetrics::new());
288
289 // Initialize task storage (SEP-1686)
290 #[cfg(feature = "mcp-tasks")]
291 let task_storage = {
292 use tokio::time::Duration;
293 let storage = crate::task_storage::TaskStorage::new(Duration::from_secs(60));
294 // Start background cleanup task
295 storage.start_cleanup();
296 Arc::new(storage)
297 };
298
299 let router = Arc::new(RequestRouter::new(
300 Arc::clone(®istry),
301 Arc::clone(&metrics),
302 config.clone(),
303 #[cfg(feature = "mcp-tasks")]
304 Some(Arc::clone(&task_storage)),
305 ));
306 // Build middleware stack configuration
307 #[cfg(feature = "middleware")]
308 #[cfg_attr(not(feature = "rate-limiting"), allow(unused_mut))]
309 let mut stack = crate::middleware::MiddlewareStack::new();
310
311 // Auto-install rate limiting if enabled in config
312 #[cfg(feature = "rate-limiting")]
313 if config.rate_limiting.enabled {
314 use crate::middleware::rate_limit::{RateLimitStrategy, RateLimits};
315 use std::num::NonZeroU32;
316 use std::time::Duration;
317
318 let rate_config = crate::middleware::RateLimitConfig {
319 strategy: RateLimitStrategy::Global,
320 limits: RateLimits {
321 requests_per_period: NonZeroU32::new(
322 config.rate_limiting.requests_per_second * 60,
323 )
324 .unwrap(), // Convert per-second to per-minute
325 period: Duration::from_secs(60),
326 burst_size: Some(NonZeroU32::new(config.rate_limiting.burst_capacity).unwrap()),
327 },
328 enabled: true,
329 };
330
331 stack = stack.with_rate_limit(rate_config);
332 }
333
334 // Create core MCP service
335 let core_service = McpService::new(
336 Arc::clone(®istry),
337 Arc::clone(&router),
338 Arc::clone(&metrics),
339 );
340
341 // COMPREHENSIVE TOWER SERVICE COMPOSITION
342 // Build the complete middleware stack with proper type erasure
343 //
344 // This service is called from server/transport.rs for EVERY incoming request:
345 // TransportMessage -> http::Request -> service.call() -> http::Response -> TransportMessage
346 //
347 // The Tower middleware stack provides:
348 // ✓ Timeout enforcement (configurable per-request)
349 // ✓ Request validation (JSON-RPC structure)
350 // ✓ Authorization checks (resource access control)
351 // ✓ Rate limiting (if enabled in config)
352 // ✓ Audit logging (configurable)
353 // ✓ And more layers as configured
354 //
355 // BoxCloneService is Clone but !Sync - this is the Tower pattern
356 #[cfg(feature = "middleware")]
357 let service = Self::build_middleware_stack(core_service, stack);
358
359 #[cfg(not(feature = "middleware"))]
360 let service = tower::util::BoxCloneService::new(core_service);
361
362 let lifecycle = Arc::new(ServerLifecycle::new());
363
364 Self {
365 config,
366 registry,
367 router,
368 service,
369 lifecycle,
370 metrics,
371 #[cfg(feature = "mcp-tasks")]
372 task_storage,
373 }
374 }
375
376 /// Get server configuration
377 #[must_use]
378 pub const fn config(&self) -> &ServerConfig {
379 &self.config
380 }
381
382 /// Get handler registry
383 #[must_use]
384 pub const fn registry(&self) -> &Arc<HandlerRegistry> {
385 &self.registry
386 }
387
388 /// Get request router
389 #[must_use]
390 pub const fn router(&self) -> &Arc<RequestRouter> {
391 &self.router
392 }
393
394 /// Get server lifecycle
395 #[must_use]
396 pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
397 &self.lifecycle
398 }
399
400 /// Get server metrics
401 #[must_use]
402 pub const fn metrics(&self) -> &Arc<ServerMetrics> {
403 &self.metrics
404 }
405
406 /// Get task storage (MCP Tasks API - SEP-1686)
407 ///
408 /// Returns the task storage instance for managing long-running operations.
409 /// Only available when the `mcp-tasks` feature is enabled.
410 #[cfg(feature = "mcp-tasks")]
411 #[must_use]
412 pub const fn task_storage(&self) -> &Arc<crate::task_storage::TaskStorage> {
413 &self.task_storage
414 }
415
416 /// Get the Tower service stack (test accessor)
417 ///
418 /// **Note**: This is primarily for integration testing. Production code should
419 /// use the transport layer which calls the service internally via
420 /// `handle_transport_message()`.
421 ///
422 /// Returns a clone of the Tower service stack, which is cheap (BoxCloneService
423 /// is designed for cloning).
424 #[doc(hidden)]
425 pub fn service(
426 &self,
427 ) -> tower::util::BoxCloneService<Request<Bytes>, Response<Bytes>, crate::ServerError> {
428 self.service.clone()
429 }
430
431 /// Get a shutdown handle for graceful server termination
432 ///
433 /// This handle enables external control over server shutdown, essential for:
434 /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
435 /// - **Container orchestration**: Kubernetes graceful pod termination
436 /// - **Load balancer integration**: Health check coordination
437 /// - **Multi-component systems**: Coordinated shutdown sequences
438 /// - **Maintenance operations**: Planned downtime and updates
439 ///
440 /// # Examples
441 ///
442 /// ## Basic shutdown coordination
443 /// ```no_run
444 /// # use turbomcp_server::ServerBuilder;
445 /// # #[tokio::main]
446 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
447 /// let server = ServerBuilder::new().build();
448 /// let shutdown_handle = server.shutdown_handle();
449 ///
450 /// // Coordinate with other services
451 /// tokio::spawn(async move {
452 /// // Wait for external shutdown signal
453 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
454 /// println!("Shutdown signal received, terminating gracefully...");
455 /// shutdown_handle.shutdown().await;
456 /// });
457 ///
458 /// // Server will gracefully shut down when signaled
459 /// // server.run_stdio().await?;
460 /// # Ok(())
461 /// # }
462 /// ```
463 ///
464 /// ## Container/Kubernetes deployment
465 /// ```no_run
466 /// # use turbomcp_server::ServerBuilder;
467 /// # use std::sync::Arc;
468 /// # #[tokio::main]
469 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
470 /// let server = ServerBuilder::new().build();
471 /// let shutdown_handle = server.shutdown_handle();
472 /// let shutdown_handle_clone = shutdown_handle.clone();
473 ///
474 /// // Handle multiple signal types with proper platform support
475 /// tokio::spawn(async move {
476 /// #[cfg(unix)]
477 /// {
478 /// use tokio::signal::unix::{signal, SignalKind};
479 /// let mut sigterm = signal(SignalKind::terminate()).unwrap();
480 /// tokio::select! {
481 /// _ = tokio::signal::ctrl_c() => {
482 /// println!("SIGINT received");
483 /// }
484 /// _ = sigterm.recv() => {
485 /// println!("SIGTERM received");
486 /// }
487 /// }
488 /// }
489 /// #[cfg(not(unix))]
490 /// {
491 /// tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
492 /// println!("SIGINT received");
493 /// }
494 /// shutdown_handle_clone.shutdown().await;
495 /// });
496 ///
497 /// // Server handles graceful shutdown automatically
498 /// // server.run_tcp("0.0.0.0:8080").await?;
499 /// # Ok(())
500 /// # }
501 /// ```
502 pub fn shutdown_handle(&self) -> ShutdownHandle {
503 ShutdownHandle::new(self.lifecycle.clone())
504 }
505
506 /// Run the server with STDIO transport
507 ///
508 /// # Errors
509 ///
510 /// Returns [`crate::ServerError::Transport`] if:
511 /// - STDIO transport connection fails
512 /// - Message sending/receiving fails
513 /// - Transport disconnection fails
514 #[tracing::instrument(skip(self), fields(
515 transport = "stdio",
516 service_name = %self.config.name,
517 service_version = %self.config.version
518 ))]
519 pub async fn run_stdio(mut self) -> ServerResult<()> {
520 // For STDIO transport, disable logging unless explicitly overridden
521 // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
522 if should_log_for_stdio() {
523 info!("Starting MCP server with STDIO transport");
524 }
525
526 // Start performance monitoring for STDIO server
527 let _perf_span = info_span!("server.run", transport = "stdio").entered();
528 info!("Initializing STDIO transport for MCP server");
529
530 self.lifecycle.start().await;
531
532 // BIDIRECTIONAL STDIO SETUP
533 // Create STDIO dispatcher for server-initiated requests (sampling, elicitation, roots, ping)
534 let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
535
536 // Use fully-qualified path to avoid ambiguity with the turbomcp crate's runtime module
537 let dispatcher = crate::runtime::StdioDispatcher::new(request_tx);
538
539 // Configure router's bidirectional support with the STDIO dispatcher
540 // SAFETY: We have &mut self, so we can safely get mutable access to the Arc'd router
541 // This is the CRITICAL STEP that was missing - without this, all server→client requests fail
542 let router = Arc::make_mut(&mut self.router);
543 router.set_server_request_dispatcher(dispatcher.clone());
544
545 // Run STDIO with full bidirectional support (MCP 2025-11-25 compliant)
546 // This uses the bidirectional-aware runtime that handles both:
547 // - Client→Server requests (tools, resources, prompts)
548 // - Server→Client requests (sampling, elicitation, roots, ping)
549 crate::runtime::run_stdio_bidirectional(self.router.clone(), dispatcher, request_rx)
550 .await
551 .map_err(|e| crate::ServerError::Handler {
552 message: format!("STDIO bidirectional runtime failed: {}", e),
553 context: Some("run_stdio".to_string()),
554 })
555 }
556
557 /// Get health status
558 pub async fn health(&self) -> HealthStatus {
559 self.lifecycle.health().await
560 }
561
562 /// Run server with HTTP transport using default configuration
563 ///
564 /// This provides a working HTTP server with:
565 /// - Standard HTTP POST/GET/DELETE for MCP protocol at `/mcp`
566 /// - Full MCP 2025-11-25 protocol compliance
567 /// - Graceful shutdown support
568 /// - Default rate limiting (100 req/60s)
569 /// - Default security settings (localhost allowed, CORS disabled)
570 ///
571 /// For custom configuration (rate limits, security, CORS), use `run_http_with_config`.
572 ///
573 /// # Examples
574 ///
575 /// ## Basic usage with default configuration
576 /// ```no_run
577 /// use turbomcp_server::ServerBuilder;
578 ///
579 /// #[tokio::main]
580 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
581 /// let server = ServerBuilder::new()
582 /// .name("my-server")
583 /// .version("1.0.0")
584 /// .build();
585 ///
586 /// server.run_http("127.0.0.1:3000").await?;
587 /// Ok(())
588 /// }
589 /// ```
590 ///
591 /// ## With custom configuration
592 /// ```no_run
593 /// use turbomcp_server::ServerBuilder;
594 /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
595 /// use std::time::Duration;
596 ///
597 /// #[tokio::main]
598 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
599 /// let server = ServerBuilder::new()
600 /// .name("my-server")
601 /// .version("1.0.0")
602 /// .build();
603 ///
604 /// let config = StreamableHttpConfigBuilder::new()
605 /// .with_bind_address("127.0.0.1:3000")
606 /// .allow_any_origin(true) // Enable CORS for development
607 /// .build();
608 ///
609 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
610 /// Ok(())
611 /// }
612 /// ```
613 ///
614 /// # Errors
615 ///
616 /// Returns [`crate::ServerError::Transport`] if:
617 /// - Address resolution fails
618 /// - HTTP server fails to start
619 /// - Transport disconnection fails
620 #[cfg(feature = "http")]
621 #[tracing::instrument(skip(self), fields(
622 transport = "http",
623 service_name = %self.config.name,
624 service_version = %self.config.version,
625 addr = ?addr
626 ))]
627 pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
628 self,
629 addr: A,
630 ) -> ServerResult<()> {
631 use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
632
633 // Build default configuration
634 let config = StreamableHttpConfigBuilder::new().build();
635
636 self.run_http_with_config(addr, config).await
637 }
638
639 /// Run server with HTTP transport and custom configuration
640 ///
641 /// This provides full control over HTTP server configuration including:
642 /// - Rate limiting (requests per time window, or disabled entirely)
643 /// - Security settings (CORS, origin validation, authentication)
644 /// - Network settings (bind address, endpoint path, keep-alive)
645 /// - Advanced settings (replay buffer size, etc.)
646 ///
647 /// # Bind Address Configuration
648 ///
649 /// **IMPORTANT**: The `addr` parameter takes precedence over `config.bind_addr`.
650 /// If they differ, a deprecation warning is logged.
651 ///
652 /// **Best Practice** (recommended for forward compatibility):
653 /// ```no_run
654 /// # use turbomcp_server::ServerBuilder;
655 /// # use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
656 /// # #[tokio::main]
657 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
658 /// let config = StreamableHttpConfigBuilder::new()
659 /// .with_bind_address("127.0.0.1:3001") // Set bind address in config
660 /// .build();
661 ///
662 /// // Pass matching addr parameter (or use default "127.0.0.1:8080")
663 /// ServerBuilder::new()
664 /// .build()
665 /// .run_http_with_config("127.0.0.1:3001", config).await?;
666 /// # Ok(())
667 /// # }
668 /// ```
669 ///
670 /// **Deprecated** (will be removed in v3.x):
671 /// ```no_run
672 /// # use turbomcp_server::ServerBuilder;
673 /// # use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
674 /// # #[tokio::main]
675 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
676 /// // ⚠️ Avoid setting different addresses - causes deprecation warning
677 /// let config = StreamableHttpConfigBuilder::new()
678 /// .with_bind_address("0.0.0.0:5000") // This is ignored!
679 /// .build();
680 ///
681 /// // The addr parameter wins (3001 is used, not 5000)
682 /// ServerBuilder::new()
683 /// .build()
684 /// .run_http_with_config("127.0.0.1:3001", config).await?;
685 /// # Ok(())
686 /// # }
687 /// ```
688 ///
689 /// **Future (v3.x)**: The `addr` parameter will be removed. Configure bind address
690 /// via `config.with_bind_address()` only.
691 ///
692 /// # Examples
693 ///
694 /// ## Custom configuration example
695 /// ```no_run
696 /// use turbomcp_server::ServerBuilder;
697 /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
698 /// use std::time::Duration;
699 ///
700 /// #[tokio::main]
701 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
702 /// let server = ServerBuilder::new()
703 /// .name("custom-server")
704 /// .version("1.0.0")
705 /// .build();
706 ///
707 /// let config = StreamableHttpConfigBuilder::new()
708 /// .with_bind_address("127.0.0.1:3000")
709 /// .with_rate_limit(1000, Duration::from_secs(60)) // 1000 req/min
710 /// .build();
711 ///
712 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
713 /// Ok(())
714 /// }
715 /// ```
716 ///
717 /// ## Production configuration (secure, rate limited)
718 /// ```no_run
719 /// use turbomcp_server::ServerBuilder;
720 /// use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
721 /// use std::time::Duration;
722 ///
723 /// #[tokio::main]
724 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
725 /// let server = ServerBuilder::new()
726 /// .name("production-server")
727 /// .version("1.0.0")
728 /// .build();
729 ///
730 /// let config = StreamableHttpConfigBuilder::new()
731 /// .with_bind_address("127.0.0.1:3000")
732 /// .with_rate_limit(1000, Duration::from_secs(60)) // 1000 req/min
733 /// .allow_any_origin(false) // Strict CORS
734 /// .require_authentication(true) // Require auth
735 /// .build();
736 ///
737 /// server.run_http_with_config("127.0.0.1:3000", config).await?;
738 /// Ok(())
739 /// }
740 /// ```
741 ///
742 /// # Errors
743 ///
744 /// Returns [`crate::ServerError::Transport`] if:
745 /// - Address resolution fails
746 /// - HTTP server fails to start
747 /// - Transport disconnection fails
748 #[cfg(feature = "http")]
749 #[tracing::instrument(skip(self, config), fields(
750 transport = "http",
751 service_name = %self.config.name,
752 service_version = %self.config.version,
753 addr = ?addr
754 ))]
755 pub async fn run_http_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
756 self,
757 addr: A,
758 mut config: turbomcp_transport::streamable_http::StreamableHttpConfig,
759 ) -> ServerResult<()> {
760 use std::collections::HashMap;
761 use tokio::sync::{Mutex, RwLock};
762
763 // Sprint 2.6: Check for insecure 0.0.0.0 binding
764 crate::security_checks::check_binding_security(&addr);
765
766 info!("Starting MCP server with HTTP transport");
767
768 self.lifecycle.start().await;
769
770 // Resolve address to string
771 let socket_addr = addr
772 .to_socket_addrs()
773 .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
774 .next()
775 .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
776
777 info!("Resolved address: {}", socket_addr);
778
779 // Check for conflicting bind addresses and warn about deprecation
780 let socket_addr_str = socket_addr.to_string();
781 if config.bind_addr != socket_addr_str {
782 warn!(
783 addr_parameter = %socket_addr_str,
784 config_bind_addr = %config.bind_addr,
785 "⚠️ DEPRECATION WARNING: The `addr` parameter takes precedence over `config.bind_addr`"
786 );
787 warn!(
788 "⚠️ In TurboMCP v3.x, the `addr` parameter will be removed. Please use StreamableHttpConfigBuilder::new().with_bind_address(\"{}\").build() instead",
789 socket_addr_str
790 );
791 warn!(
792 "⚠️ Avoid setting both `addr` parameter and `config.bind_addr` to prevent confusion"
793 );
794
795 // Update config to single source of truth
796 config.bind_addr = socket_addr_str.clone();
797 config.base_url = format!("http://{}", socket_addr_str);
798 }
799
800 info!(
801 config = ?config,
802 "HTTP configuration (updated with resolved address)"
803 );
804
805 // BIDIRECTIONAL HTTP SETUP
806 // Create shared state for session management and bidirectional MCP
807 let sessions = Arc::new(RwLock::new(HashMap::new()));
808 let pending_requests = Arc::new(Mutex::new(HashMap::new()));
809
810 // Share router across all sessions (routing logic and handler registry)
811 let router = self.router.clone();
812
813 // Capture server identity for MCP protocol compliance
814 let server_info = turbomcp_protocol::ServerInfo {
815 name: self.config.name.clone(),
816 version: self.config.version.clone(),
817 };
818
819 // Factory pattern: create session-specific router for each HTTP request
820 // This is the clean architecture that HTTP requires - each session gets its own
821 // bidirectional dispatcher while sharing the routing logic
822 let sessions_for_factory = Arc::clone(&sessions);
823 let pending_for_factory = Arc::clone(&pending_requests);
824 let router_for_factory = Arc::clone(&router);
825
826 // Create a wrapper that converts headers and delegates to router
827 // This is cleaner than storing headers on the router itself
828 let handler_factory = move |session_id: Option<String>,
829 headers: Option<axum::http::HeaderMap>,
830 tenant_id: Option<String>| {
831 let session_id = session_id.unwrap_or_else(|| {
832 let new_id = uuid::Uuid::new_v4().to_string();
833 tracing::debug!(
834 "HTTP POST without session ID - generating ephemeral ID for request: {}",
835 new_id
836 );
837 new_id
838 });
839
840 tracing::debug!("Factory creating handler for session: {}", session_id);
841
842 // Create session-specific HTTP dispatcher (now local to turbomcp-server!)
843 let dispatcher = crate::runtime::http::HttpDispatcher::new(
844 session_id,
845 Arc::clone(&sessions_for_factory),
846 Arc::clone(&pending_for_factory),
847 );
848
849 // Clone the base router and configure with session-specific dispatcher
850 // CRITICAL: set_server_request_dispatcher also recreates server_to_client adapter
851 let mut session_router = (*router_for_factory).clone();
852 session_router.set_server_request_dispatcher(dispatcher);
853
854 // Convert HeaderMap to HashMap<String, String> for passing to create_context
855 let headers_map = headers.map(|header_map| {
856 header_map
857 .iter()
858 .filter_map(|(name, value)| {
859 value
860 .to_str()
861 .ok()
862 .map(|v| (name.to_string(), v.to_string()))
863 })
864 .collect()
865 });
866
867 // Create wrapper that passes headers and tenant_id to create_context (HTTP transport)
868 HttpHandlerWithHeaders {
869 router: session_router,
870 headers: headers_map,
871 transport: "http",
872 tenant_id,
873 }
874 };
875
876 info!(
877 server_name = %server_info.name,
878 server_version = %server_info.version,
879 bind_addr = %socket_addr,
880 endpoint_path = %config.endpoint_path,
881 "HTTP server starting with full bidirectional support (elicitation, sampling, roots, ping)"
882 );
883
884 // Use factory-based HTTP server with full bidirectional support
885 use crate::runtime::http::run_http;
886 run_http(handler_factory, sessions, pending_requests, config)
887 .await
888 .map_err(|e| {
889 tracing::error!(error = %e, "HTTP server failed");
890 crate::ServerError::handler(e.to_string())
891 })?;
892
893 info!("HTTP server shutdown complete");
894 Ok(())
895 }
896
897 /// Run server with HTTP transport and Tower middleware
898 ///
899 /// This method enables advanced features like multi-tenancy, authentication, rate limiting,
900 /// and other cross-cutting concerns by allowing you to apply Tower middleware layers to the
901 /// HTTP router.
902 ///
903 /// # Multi-Tenancy Example
904 ///
905 /// ```no_run
906 /// use turbomcp_server::ServerBuilder;
907 /// use turbomcp_server::middleware::tenancy::{HeaderTenantExtractor, TenantExtractionLayer};
908 /// use tower::ServiceBuilder;
909 ///
910 /// #[tokio::main]
911 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
912 /// let server = ServerBuilder::new()
913 /// .name("multi-tenant-server")
914 /// .version("1.0.0")
915 /// .build();
916 ///
917 /// // Create tenant extractor middleware
918 /// let tenant_extractor = HeaderTenantExtractor::new("X-Tenant-ID");
919 /// let middleware = ServiceBuilder::new()
920 /// .layer(TenantExtractionLayer::new(tenant_extractor));
921 ///
922 /// // Run server with middleware
923 /// server.run_http_with_middleware(
924 /// "127.0.0.1:3000",
925 /// Box::new(move |router| router.layer(middleware))
926 /// ).await?;
927 ///
928 /// Ok(())
929 /// }
930 /// ```
931 ///
932 /// # Authentication Example
933 ///
934 /// ```no_run
935 /// use turbomcp_server::ServerBuilder;
936 /// use tower::ServiceBuilder;
937 /// use tower_http::auth::RequireAuthorizationLayer;
938 ///
939 /// # #[tokio::main]
940 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
941 /// let server = ServerBuilder::new()
942 /// .name("auth-server")
943 /// .version("1.0.0")
944 /// .build();
945 ///
946 /// // Add bearer token authentication
947 /// let middleware = ServiceBuilder::new()
948 /// .layer(RequireAuthorizationLayer::bearer("secret-token"));
949 ///
950 /// server.run_http_with_middleware(
951 /// "127.0.0.1:3000",
952 /// Box::new(move |router| router.layer(middleware))
953 /// ).await?;
954 /// # Ok(())
955 /// # }
956 /// ```
957 #[cfg(feature = "http")]
958 #[tracing::instrument(skip(self, middleware_fn), fields(
959 transport = "http",
960 service_name = %self.config.name,
961 service_version = %self.config.version,
962 addr = ?addr
963 ))]
964 pub async fn run_http_with_middleware<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
965 self,
966 addr: A,
967 middleware_fn: Box<dyn FnOnce(axum::Router) -> axum::Router + Send>,
968 ) -> ServerResult<()> {
969 use std::collections::HashMap;
970 use tokio::sync::{Mutex, RwLock};
971
972 // Sprint 2.6: Check for insecure 0.0.0.0 binding
973 crate::security_checks::check_binding_security(&addr);
974
975 info!("Starting MCP server with HTTP transport and custom middleware");
976
977 self.lifecycle.start().await;
978
979 // Resolve address to string
980 let socket_addr = addr
981 .to_socket_addrs()
982 .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
983 .next()
984 .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
985
986 info!("Resolved address: {}", socket_addr);
987
988 // BIDIRECTIONAL HTTP SETUP
989 let sessions = Arc::new(RwLock::new(HashMap::new()));
990 let pending_requests = Arc::new(Mutex::new(HashMap::new()));
991 let router = self.router.clone();
992
993 // Factory pattern for session-specific handlers
994 let sessions_for_factory = Arc::clone(&sessions);
995 let pending_for_factory = Arc::clone(&pending_requests);
996 let router_for_factory = Arc::clone(&router);
997
998 let handler_factory = move |session_id: Option<String>,
999 headers: Option<axum::http::HeaderMap>,
1000 tenant_id: Option<String>| {
1001 let session_id = session_id.unwrap_or_else(|| {
1002 let new_id = uuid::Uuid::new_v4().to_string();
1003 tracing::debug!(
1004 "HTTP POST without session ID - generating ephemeral ID for request: {}",
1005 new_id
1006 );
1007 new_id
1008 });
1009
1010 tracing::debug!("Factory creating handler for session: {}", session_id);
1011
1012 let dispatcher = crate::runtime::http::HttpDispatcher::new(
1013 session_id,
1014 Arc::clone(&sessions_for_factory),
1015 Arc::clone(&pending_for_factory),
1016 );
1017
1018 let mut session_router = (*router_for_factory).clone();
1019 session_router.set_server_request_dispatcher(dispatcher);
1020
1021 let headers_map = headers.map(|header_map| {
1022 header_map
1023 .iter()
1024 .filter_map(|(name, value)| {
1025 value
1026 .to_str()
1027 .ok()
1028 .map(|v| (name.to_string(), v.to_string()))
1029 })
1030 .collect()
1031 });
1032
1033 HttpHandlerWithHeaders {
1034 router: session_router,
1035 headers: headers_map,
1036 transport: "http",
1037 tenant_id,
1038 }
1039 };
1040
1041 info!(
1042 server_name = %self.config.name,
1043 server_version = %self.config.version,
1044 bind_addr = %socket_addr,
1045 "HTTP server starting with custom middleware and bidirectional support"
1046 );
1047
1048 // Use run_http_with_middleware function
1049 use crate::runtime::http::run_http_with_middleware;
1050 use turbomcp_transport::streamable_http::StreamableHttpConfigBuilder;
1051
1052 // Create default config with resolved address
1053 let config = StreamableHttpConfigBuilder::new()
1054 .with_bind_address(socket_addr.to_string())
1055 .with_endpoint_path("/mcp")
1056 .allow_localhost(true)
1057 .build();
1058
1059 run_http_with_middleware(
1060 handler_factory,
1061 sessions,
1062 pending_requests,
1063 config,
1064 Some(middleware_fn),
1065 )
1066 .await
1067 .map_err(|e| {
1068 tracing::error!(error = %e, "HTTP server with middleware failed");
1069 crate::ServerError::handler(e.to_string())
1070 })?;
1071
1072 info!("HTTP server shutdown complete");
1073 Ok(())
1074 }
1075
1076 /// Run server with WebSocket transport (full bidirectional support)
1077 ///
1078 /// This provides a simple API for WebSocket servers with sensible defaults:
1079 /// - Default endpoint: `/mcp/ws`
1080 /// - Full MCP 2025-11-25 compliance
1081 /// - Bidirectional communication
1082 /// - Elicitation support
1083 /// - Session management and middleware
1084 ///
1085 /// For custom configuration, use `run_websocket_with_config()`.
1086 ///
1087 /// # Example
1088 ///
1089 /// ```no_run
1090 /// use turbomcp_server::ServerBuilder;
1091 ///
1092 /// #[tokio::main]
1093 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
1094 /// let server = ServerBuilder::new()
1095 /// .name("ws-server")
1096 /// .version("1.0.0")
1097 /// .build();
1098 ///
1099 /// server.run_websocket("127.0.0.1:8080").await?;
1100 /// Ok(())
1101 /// }
1102 /// ```
1103 #[cfg(feature = "websocket")]
1104 #[tracing::instrument(skip(self), fields(
1105 transport = "websocket",
1106 service_name = %self.config.name,
1107 service_version = %self.config.version,
1108 addr = ?addr
1109 ))]
1110 pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
1111 self,
1112 addr: A,
1113 ) -> ServerResult<()> {
1114 use crate::config::WebSocketServerConfig;
1115
1116 // Build default configuration
1117 let config = WebSocketServerConfig::default();
1118
1119 self.run_websocket_with_config(addr, config).await
1120 }
1121
1122 /// Run server with WebSocket transport and custom configuration
1123 ///
1124 /// This provides full control over WebSocket server configuration including:
1125 /// - Custom endpoint path
1126 /// - MCP server settings (middleware, security, etc.)
1127 ///
1128 /// # Example
1129 ///
1130 /// ```no_run
1131 /// use turbomcp_server::{ServerBuilder, WebSocketServerConfig};
1132 ///
1133 /// #[tokio::main]
1134 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
1135 /// let server = ServerBuilder::new()
1136 /// .name("custom-ws-server")
1137 /// .version("1.0.0")
1138 /// .build();
1139 ///
1140 /// let config = WebSocketServerConfig {
1141 /// bind_addr: "127.0.0.1:8080".to_string(),
1142 /// endpoint_path: "/custom/ws".to_string(),
1143 /// max_concurrent_requests: 100,
1144 /// };
1145 ///
1146 /// server.run_websocket_with_config("127.0.0.1:8080", config).await?;
1147 /// Ok(())
1148 /// }
1149 /// ```
1150 #[cfg(feature = "websocket")]
1151 #[tracing::instrument(skip(self, config), fields(
1152 transport = "websocket",
1153 service_name = %self.config.name,
1154 service_version = %self.config.version,
1155 addr = ?addr
1156 ))]
1157 pub async fn run_websocket_with_config<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
1158 self,
1159 addr: A,
1160 config: crate::config::WebSocketServerConfig,
1161 ) -> ServerResult<()> {
1162 use axum::{Router, middleware, routing::get};
1163 use turbomcp_transport::axum::{WebSocketFactoryState, websocket_handler_with_factory};
1164 use turbomcp_transport::tower::SessionInfo;
1165
1166 // Sprint 2.6: Check for insecure 0.0.0.0 binding
1167 crate::security_checks::check_binding_security(&addr);
1168
1169 info!("Starting MCP server with WebSocket transport");
1170 info!(config = ?config, "WebSocket configuration");
1171
1172 self.lifecycle.start().await;
1173
1174 // Resolve address to string
1175 let socket_addr = addr
1176 .to_socket_addrs()
1177 .map_err(|e| crate::ServerError::configuration(format!("Invalid address: {}", e)))?
1178 .next()
1179 .ok_or_else(|| crate::ServerError::configuration("No address resolved"))?;
1180
1181 info!("Resolved address: {}", socket_addr);
1182
1183 // Capture server identity for MCP protocol compliance
1184 let server_info = turbomcp_protocol::ServerInfo {
1185 name: self.config.name.clone(),
1186 version: self.config.version.clone(),
1187 };
1188
1189 // Router for this server (shared across all connections)
1190 let router = (*self.router).clone();
1191
1192 // Factory: creates per-connection handler with bidirectional support
1193 // This is the unified architecture - transport layer handles WebSocket mechanics,
1194 // server layer provides MCP-specific handler logic
1195 let handler_factory =
1196 move |transport_dispatcher: turbomcp_transport::axum::WebSocketDispatcher,
1197 headers: Option<std::collections::HashMap<String, String>>,
1198 tenant_id: Option<String>| {
1199 // Wrap transport dispatcher with server layer adapter
1200 let server_dispatcher =
1201 crate::routing::WebSocketDispatcherAdapter::new(transport_dispatcher);
1202
1203 // Clone router for this connection and configure with dispatcher
1204 let mut connection_router = router.clone();
1205 connection_router.set_server_request_dispatcher(server_dispatcher);
1206
1207 // Create wrapper that passes headers and tenant_id to create_context (WebSocket transport)
1208 // We can reuse HttpHandlerWithHeaders since it's generic
1209 Arc::new(HttpHandlerWithHeaders {
1210 router: connection_router,
1211 headers,
1212 transport: "websocket",
1213 tenant_id,
1214 }) as Arc<dyn turbomcp_protocol::JsonRpcHandler>
1215 };
1216
1217 info!(
1218 server_name = %server_info.name,
1219 server_version = %server_info.version,
1220 bind_addr = %socket_addr,
1221 endpoint_path = %config.endpoint_path,
1222 "WebSocket server starting with full bidirectional support (elicitation, sampling, roots, ping)"
1223 );
1224
1225 // Create factory state for transport layer
1226 let factory_state = WebSocketFactoryState::new(handler_factory);
1227
1228 // Session middleware to extract headers (same as HTTP transport)
1229 let session_middleware = |mut request: axum::extract::Request, next: middleware::Next| async move {
1230 let mut session = SessionInfo::new();
1231
1232 // Extract headers and store in session metadata
1233 for (name, value) in request.headers().iter() {
1234 if let Ok(value_str) = value.to_str() {
1235 session
1236 .metadata
1237 .insert(name.to_string(), value_str.to_string());
1238 }
1239 }
1240
1241 // Extract specific useful headers
1242 if let Some(user_agent) = request
1243 .headers()
1244 .get("user-agent")
1245 .and_then(|v| v.to_str().ok())
1246 {
1247 session.user_agent = Some(user_agent.to_string());
1248 }
1249
1250 if let Some(remote_addr) = request
1251 .headers()
1252 .get("x-forwarded-for")
1253 .and_then(|v| v.to_str().ok())
1254 {
1255 session.remote_addr = Some(remote_addr.to_string());
1256 }
1257
1258 request.extensions_mut().insert(session);
1259 next.run(request).await
1260 };
1261
1262 // Build Axum router using transport layer
1263 let app = Router::new()
1264 .route(&config.endpoint_path, get(websocket_handler_with_factory))
1265 .with_state(factory_state)
1266 .layer(middleware::from_fn(session_middleware));
1267
1268 info!("WebSocket server bound to {}", socket_addr);
1269
1270 // Serve using Axum
1271 let listener = tokio::net::TcpListener::bind(socket_addr)
1272 .await
1273 .map_err(|e| crate::ServerError::configuration(format!("Failed to bind: {}", e)))?;
1274
1275 axum::serve(listener, app).await.map_err(|e| {
1276 tracing::error!(error = %e, "WebSocket server failed");
1277 crate::ServerError::handler(e.to_string())
1278 })?;
1279
1280 info!("WebSocket server shutdown complete");
1281 Ok(())
1282 }
1283
1284 /// Run server with TCP transport (progressive enhancement - runtime configuration)
1285 #[cfg(feature = "tcp")]
1286 #[tracing::instrument(skip(self), fields(
1287 transport = "tcp",
1288 service_name = %self.config.name,
1289 service_version = %self.config.version,
1290 addr = ?addr
1291 ))]
1292 pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
1293 mut self,
1294 addr: A,
1295 ) -> ServerResult<()> {
1296 use turbomcp_transport::TcpTransport;
1297
1298 // Sprint 2.6: Check for insecure 0.0.0.0 binding
1299 crate::security_checks::check_binding_security(&addr);
1300
1301 // Start performance monitoring for TCP server
1302 let _perf_span = info_span!("server.run", transport = "tcp").entered();
1303 info!(?addr, "Starting MCP server with TCP transport");
1304
1305 self.lifecycle.start().await;
1306
1307 // Convert ToSocketAddrs to SocketAddr
1308 let socket_addr = match addr.to_socket_addrs() {
1309 Ok(mut addrs) => match addrs.next() {
1310 Some(addr) => addr,
1311 None => {
1312 tracing::error!("No socket address resolved from provided address");
1313 self.lifecycle.shutdown().await;
1314 return Err(crate::ServerError::configuration("Invalid socket address"));
1315 }
1316 },
1317 Err(e) => {
1318 tracing::error!(error = %e, "Failed to resolve socket address");
1319 self.lifecycle.shutdown().await;
1320 return Err(crate::ServerError::configuration(format!(
1321 "Address resolution failed: {e}"
1322 )));
1323 }
1324 };
1325
1326 let transport = TcpTransport::new_server(socket_addr);
1327 if let Err(e) = transport.connect().await {
1328 tracing::error!(error = %e, "Failed to connect TCP transport");
1329 self.lifecycle.shutdown().await;
1330 return Err(e.into());
1331 }
1332
1333 // BIDIRECTIONAL TCP SETUP
1334 // Create generic transport dispatcher for server-initiated requests
1335 let dispatcher = crate::runtime::TransportDispatcher::new(transport);
1336
1337 // Configure router's bidirectional support with the TCP dispatcher
1338 // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
1339 let router = Arc::make_mut(&mut self.router);
1340 router.set_server_request_dispatcher(dispatcher.clone());
1341
1342 // Run TCP with full bidirectional support (MCP 2025-11-25 compliant)
1343 // This uses the generic bidirectional runtime that handles both:
1344 // - Client→Server requests (tools, resources, prompts)
1345 // - Server→Client requests (sampling, elicitation, roots, ping)
1346 crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
1347 .await
1348 .map_err(|e| crate::ServerError::Handler {
1349 message: format!("TCP bidirectional runtime failed: {}", e),
1350 context: Some("run_tcp".to_string()),
1351 })
1352 }
1353
1354 /// Run server with Unix socket transport (progressive enhancement - runtime configuration)
1355 #[cfg(all(feature = "unix", unix))]
1356 #[tracing::instrument(skip(self), fields(
1357 transport = "unix",
1358 service_name = %self.config.name,
1359 service_version = %self.config.version,
1360 path = ?path.as_ref()
1361 ))]
1362 pub async fn run_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> ServerResult<()> {
1363 use std::path::PathBuf;
1364 use turbomcp_transport::UnixTransport;
1365
1366 // Start performance monitoring for Unix server
1367 let _perf_span = info_span!("server.run", transport = "unix").entered();
1368 info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");
1369
1370 self.lifecycle.start().await;
1371
1372 let socket_path = PathBuf::from(path.as_ref());
1373 let transport = UnixTransport::new_server(socket_path);
1374 if let Err(e) = transport.connect().await {
1375 tracing::error!(error = %e, "Failed to connect Unix socket transport");
1376 self.lifecycle.shutdown().await;
1377 return Err(e.into());
1378 }
1379
1380 // BIDIRECTIONAL UNIX SOCKET SETUP
1381 // Create generic transport dispatcher for server-initiated requests
1382 let dispatcher = crate::runtime::TransportDispatcher::new(transport);
1383
1384 // Configure router's bidirectional support with the Unix socket dispatcher
1385 // This enables ctx.elicit(), ctx.create_message(), ctx.list_roots(), etc.
1386 let router = Arc::make_mut(&mut self.router);
1387 router.set_server_request_dispatcher(dispatcher.clone());
1388
1389 // Run Unix Socket with full bidirectional support (MCP 2025-11-25 compliant)
1390 // This uses the generic bidirectional runtime that handles both:
1391 // - Client→Server requests (tools, resources, prompts)
1392 // - Server→Client requests (sampling, elicitation, roots, ping)
1393 crate::runtime::run_transport_bidirectional(self.router.clone(), dispatcher)
1394 .await
1395 .map_err(|e| crate::ServerError::Handler {
1396 message: format!("Unix socket bidirectional runtime failed: {}", e),
1397 context: Some("run_unix".to_string()),
1398 })
1399 }
1400
1401 /// Generic transport runner (DRY principle)
1402 /// Used by feature-gated transport methods (http, tcp, websocket, unix)
1403 #[allow(dead_code)]
1404 #[tracing::instrument(skip(self, transport), fields(
1405 service_name = %self.config.name,
1406 service_version = %self.config.version
1407 ))]
1408 async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
1409 // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
1410 let lifecycle_for_sigint = self.lifecycle.clone();
1411 tokio::spawn(async move {
1412 if let Err(e) = tokio::signal::ctrl_c().await {
1413 tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
1414 return;
1415 }
1416 tracing::info!("Ctrl+C received, initiating shutdown");
1417 lifecycle_for_sigint.shutdown().await;
1418 });
1419
1420 #[cfg(unix)]
1421 {
1422 let lifecycle_for_sigterm = self.lifecycle.clone();
1423 tokio::spawn(async move {
1424 use tokio::signal::unix::{SignalKind, signal};
1425 match signal(SignalKind::terminate()) {
1426 Ok(mut sigterm) => {
1427 sigterm.recv().await;
1428 tracing::info!("SIGTERM received, initiating shutdown");
1429 lifecycle_for_sigterm.shutdown().await;
1430 }
1431 Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
1432 }
1433 });
1434 }
1435
1436 // Shutdown signal
1437 let mut shutdown = self.lifecycle.shutdown_signal();
1438
1439 // Main message processing loop
1440 loop {
1441 tokio::select! {
1442 _ = shutdown.recv() => {
1443 tracing::info!("Shutdown signal received");
1444 break;
1445 }
1446 res = transport.receive() => {
1447 match res {
1448 Ok(Some(message)) => {
1449 if let Err(e) = self.handle_transport_message(&mut transport, message).await {
1450 tracing::warn!(error = %e, "Failed to handle transport message");
1451 }
1452 }
1453 Ok(None) => {
1454 // No message available; sleep briefly to avoid busy loop
1455 sleep(Duration::from_millis(5)).await;
1456 }
1457 Err(e) => {
1458 match e {
1459 TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
1460 tracing::info!("Transport receive channel disconnected; shutting down");
1461 break;
1462 }
1463 _ => {
1464 tracing::error!(error = %e, "Transport receive failed");
1465 // Backoff on errors
1466 sleep(Duration::from_millis(50)).await;
1467 }
1468 }
1469 }
1470 }
1471 }
1472 }
1473 }
1474
1475 // Disconnect transport
1476 if let Err(e) = transport.disconnect().await {
1477 tracing::warn!(error = %e, "Error while disconnecting transport");
1478 }
1479
1480 tracing::info!("Server shutdown complete");
1481 Ok(())
1482 }
1483}
1484
1485// Compile-time assertion that McpServer is Send + Clone (Tower pattern)
1486// Note: McpServer is Clone but NOT Sync (due to BoxCloneService being !Sync)
1487// This is intentional and follows the Axum/Tower design pattern
1488#[allow(dead_code)]
1489const _: () = {
1490 const fn assert_send_clone<T: Send + Clone>() {}
1491 const fn check() {
1492 assert_send_clone::<crate::server::core::McpServer>();
1493 }
1494};