Skip to main content

fastmcp_server/
lib.rs

1//! MCP server implementation for FastMCP.
2//!
3//! This crate provides the server-side implementation:
4//! - Server builder pattern
5//! - Tool, resource, and prompt registration
6//! - Request routing and dispatching
7//! - Session management
8//!
9//! # Example
10//!
11//! ```ignore
12//! use fastmcp_rust::prelude::*;
13//!
14//! #[tool]
15//! async fn greet(ctx: &McpContext, name: String) -> String {
16//!     format!("Hello, {name}!")
17//! }
18//!
19//! fn main() {
20//!     Server::new("my-server", "1.0.0")
21//!         .tool(greet)
22//!         .run_stdio();
23//! }
24//! ```
25//!
26//! # Role in the System
27//!
28//! `fastmcp-server` is the **execution engine** for MCP servers. It ties
29//! together:
30//! - Protocol types (`fastmcp-protocol`) for requests and responses
31//! - Transports (`fastmcp-transport`) for stdio/SSE/WebSocket I/O
32//! - Core context + cancellation (`fastmcp-core`) for budgets and checkpoints
33//! - Console output (`fastmcp-console`) for human-friendly stderr rendering
34//!
35//! The façade crate `fastmcp` re-exports this API, so most users interact with
36//! `Server` via `fastmcp_rust::prelude::*`.
37
38#![forbid(unsafe_code)]
39#![allow(dead_code)]
40
41// Proc-macros (fastmcp-derive) reference this crate by its external name
42// (`fastmcp_server::...`). This alias makes those macros usable inside this crate too
43// (including in unit tests).
44extern crate self as fastmcp_server;
45
46mod auth;
47pub mod bidirectional;
48mod builder;
49pub mod caching;
50pub mod docket;
51mod handler;
52mod middleware;
53pub mod oauth;
54pub mod oidc;
55pub mod providers;
56mod proxy;
57pub mod rate_limiting;
58mod router;
59mod session;
60mod tasks;
61pub mod transform;
62
63#[cfg(test)]
64mod tests;
65
66#[cfg(feature = "jwt")]
67pub use auth::JwtTokenVerifier;
68pub use auth::{
69    AllowAllAuthProvider, AuthProvider, AuthRequest, StaticTokenVerifier, TokenAuthProvider,
70    TokenVerifier,
71};
72pub use builder::ServerBuilder;
73pub use fastmcp_console::config::{BannerStyle, ConsoleConfig, TrafficVerbosity};
74pub use fastmcp_console::stats::{ServerStats, StatsSnapshot};
75pub use handler::{
76    BidirectionalSenders, BoxFuture, ProgressNotificationSender, PromptHandler, ResourceHandler,
77    ToolHandler, create_context_with_progress, create_context_with_progress_and_senders,
78};
79pub use middleware::{Middleware, MiddlewareDecision};
80pub use proxy::{ProxyBackend, ProxyCatalog, ProxyClient};
81pub use router::{
82    MountResult, NotificationSender, Router, RouterResourceReader, RouterToolCaller, TagFilters,
83};
84pub use session::Session;
85pub use tasks::{SharedTaskManager, TaskManager};
86
87// Re-export bidirectional communication types
88pub use bidirectional::{
89    PendingRequests, RequestSender, TransportElicitationSender, TransportRootsProvider,
90    TransportSamplingSender,
91};
92
93use std::collections::HashMap;
94use std::io::{BufReader, BufWriter, Read, Write};
95use std::net::TcpListener;
96use std::sync::atomic::{AtomicUsize, Ordering};
97use std::sync::{Arc, Condvar, Mutex};
98use std::time::{Duration, Instant};
99
100use fastmcp_transport::http::{
101    HttpHandlerConfig, HttpMethod, HttpRequest, HttpRequestHandler, HttpResponse, HttpStatus,
102    HttpTransport,
103};
104
105use asupersync::time::wall_now;
106use asupersync::{Budget, CancelKind, Cx, RegionId};
107use fastmcp_console::client::RequestResponseRenderer;
108use fastmcp_console::logging::RichLoggerBuilder;
109use fastmcp_console::{banner::StartupBanner, console};
110use fastmcp_core::logging::{debug, error, info, targets};
111use fastmcp_core::{AuthContext, McpContext, McpError, McpErrorCode, McpResult, SessionState};
112use fastmcp_protocol::{
113    CallToolParams, CancelTaskParams, CancelledParams, GetPromptParams, GetTaskParams,
114    InitializeParams, JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse,
115    ListPromptsParams, ListResourceTemplatesParams, ListResourcesParams, ListTasksParams,
116    ListToolsParams, LogLevel, LogMessageParams, Prompt, ReadResourceParams, RequestId, Resource,
117    ResourceTemplate, ServerCapabilities, ServerInfo, SetLogLevelParams, SubmitTaskParams,
118    SubscribeResourceParams, Tool, UnsubscribeResourceParams,
119};
120use fastmcp_transport::sse::SseServerTransport;
121use fastmcp_transport::websocket::WsTransport;
122use fastmcp_transport::{AsyncStdout, Codec, StdioTransport, Transport, TransportError};
123use log::{Level, LevelFilter};
124
125/// Type alias for startup hook function.
126pub type StartupHook =
127    Box<dyn FnOnce() -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send>;
128
129/// Type alias for shutdown hook function.
130pub type ShutdownHook = Box<dyn FnOnce() + Send>;
131
132/// Lifecycle hooks for server startup and shutdown.
133///
134/// These hooks allow custom initialization and cleanup logic to run
135/// at well-defined points in the server lifecycle:
136///
137/// - `on_startup`: Called before the server starts accepting connections
138/// - `on_shutdown`: Called when the server is shutting down
139///
140/// # Example
141///
142/// ```ignore
143/// use fastmcp_rust::prelude::*;
144///
145/// Server::new("demo", "1.0.0")
146///     .on_startup(|| {
147///         println!("Initializing...");
148///         // Initialize database, caches, etc.
149///         Ok(())
150///     })
151///     .on_shutdown(|| {
152///         println!("Cleaning up...");
153///         // Close connections, flush buffers, etc.
154///     })
155///     .run_stdio();
156/// ```
157#[derive(Default)]
158pub struct LifespanHooks {
159    /// Hook called before the server starts accepting connections.
160    pub on_startup: Option<StartupHook>,
161    /// Hook called when the server is shutting down.
162    pub on_shutdown: Option<ShutdownHook>,
163}
164
165impl LifespanHooks {
166    /// Creates empty lifecycle hooks.
167    #[must_use]
168    pub fn new() -> Self {
169        Self::default()
170    }
171}
172
173/// Logging configuration for the server.
174#[derive(Debug, Clone)]
175pub struct LoggingConfig {
176    /// Minimum log level (default: INFO).
177    pub level: Level,
178    /// Show timestamps in logs (default: true).
179    pub timestamps: bool,
180    /// Show module targets in logs (default: true).
181    pub targets: bool,
182    /// Show file:line in logs (default: false).
183    pub file_line: bool,
184}
185
186impl Default for LoggingConfig {
187    fn default() -> Self {
188        Self {
189            level: Level::Info,
190            timestamps: true,
191            targets: true,
192            file_line: false,
193        }
194    }
195}
196
197impl LoggingConfig {
198    /// Create logging config from environment variables.
199    ///
200    /// Respects:
201    /// - `FASTMCP_LOG`: Log level (error, warn, info, debug, trace)
202    /// - `FASTMCP_LOG_TIMESTAMPS`: Show timestamps (0/false to disable)
203    /// - `FASTMCP_LOG_TARGETS`: Show targets (0/false to disable)
204    /// - `FASTMCP_LOG_FILE_LINE`: Show file:line (1/true to enable)
205    #[must_use]
206    pub fn from_env() -> Self {
207        let level = std::env::var("FASTMCP_LOG")
208            .ok()
209            .and_then(|s| match s.to_lowercase().as_str() {
210                "error" => Some(Level::Error),
211                "warn" | "warning" => Some(Level::Warn),
212                "info" => Some(Level::Info),
213                "debug" => Some(Level::Debug),
214                "trace" => Some(Level::Trace),
215                _ => None,
216            })
217            .unwrap_or(Level::Info);
218
219        let timestamps = std::env::var("FASTMCP_LOG_TIMESTAMPS")
220            .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
221            .unwrap_or(true);
222
223        let targets = std::env::var("FASTMCP_LOG_TARGETS")
224            .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
225            .unwrap_or(true);
226
227        let file_line = std::env::var("FASTMCP_LOG_FILE_LINE")
228            .map(|s| matches!(s.to_lowercase().as_str(), "1" | "true" | "yes"))
229            .unwrap_or(false);
230
231        Self {
232            level,
233            timestamps,
234            targets,
235            file_line,
236        }
237    }
238}
239
240/// Behavior when registering a component with a name that already exists.
241///
242/// This setting controls how the server handles duplicate tool, resource,
243/// or prompt names during registration.
244#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
245pub enum DuplicateBehavior {
246    /// Raise an error and fail registration.
247    ///
248    /// Use this for strict validation in production environments.
249    Error,
250
251    /// Log a warning and keep the original component.
252    ///
253    /// This is the default behavior, providing visibility into duplicates
254    /// while maintaining backwards compatibility.
255    #[default]
256    Warn,
257
258    /// Replace the original component with the new one.
259    ///
260    /// Use this when you want later registrations to override earlier ones.
261    Replace,
262
263    /// Silently keep the original component.
264    ///
265    /// Use this when duplicates are expected and should be ignored.
266    Ignore,
267}
268
269/// Configuration for the turnkey HTTP server started by [`Server::run_http`].
270///
271/// All fields have sensible defaults. Use the builder methods to customise.
272///
273/// # Example
274///
275/// ```ignore
276/// use fastmcp_server::HttpServerConfig;
277///
278/// let config = HttpServerConfig::new()
279///     .mcp_path("/api/mcp")
280///     .health_path("/healthz")
281///     .max_connections(128);
282/// ```
283#[derive(Debug, Clone)]
284pub struct HttpServerConfig {
285    /// Path where MCP JSON-RPC requests are accepted (default: `"/mcp"`).
286    pub mcp_path: String,
287    /// Path for the health-check endpoint (default: `"/health"`).
288    pub health_path: String,
289    /// Maximum number of concurrent connections (default: 64).
290    pub max_connections: usize,
291    /// Inner HTTP handler configuration (CORS, body size, timeouts).
292    pub handler_config: HttpHandlerConfig,
293}
294
295#[derive(Debug, Clone, Copy, PartialEq, Eq)]
296enum HttpRequestExecutionMode {
297    ConcurrentReadOnly,
298    ExclusiveSession,
299}
300
301impl HttpRequestExecutionMode {
302    fn for_method(method: &str) -> Self {
303        match method {
304            "resources/read" | "prompts/get" => Self::ConcurrentReadOnly,
305            _ => Self::ExclusiveSession,
306        }
307    }
308
309    fn for_request(router: &Router, request: &JsonRpcRequest) -> Self {
310        match request.method.as_str() {
311            "tools/call" => request
312                .params
313                .as_ref()
314                .and_then(|params| params.get("name"))
315                .and_then(serde_json::Value::as_str)
316                .filter(|name| router.tool_is_read_only(name))
317                .map_or(Self::ExclusiveSession, |_| Self::ConcurrentReadOnly),
318            _ => Self::for_method(&request.method),
319        }
320    }
321}
322
323#[derive(Debug, Clone)]
324struct SessionView {
325    initialized: bool,
326    state: SessionState,
327    supports_sampling: bool,
328    supports_elicitation: bool,
329    log_level: Option<LogLevel>,
330}
331
332impl SessionView {
333    fn from_session(session: &Session) -> Self {
334        Self {
335            initialized: session.is_initialized(),
336            state: session.state().clone(),
337            supports_sampling: session.supports_sampling(),
338            supports_elicitation: session.supports_elicitation(),
339            log_level: session.log_level(),
340        }
341    }
342}
343
344impl Default for HttpServerConfig {
345    fn default() -> Self {
346        Self {
347            mcp_path: "/mcp".to_string(),
348            health_path: "/health".to_string(),
349            max_connections: 64,
350            handler_config: HttpHandlerConfig {
351                base_path: "/mcp".to_string(),
352                ..HttpHandlerConfig::default()
353            },
354        }
355    }
356}
357
358impl HttpServerConfig {
359    /// Creates a new configuration with default values.
360    #[must_use]
361    pub fn new() -> Self {
362        Self::default()
363    }
364
365    /// Sets the MCP endpoint path.
366    #[must_use]
367    pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
368        self.mcp_path = path.into();
369        self
370    }
371
372    /// Sets the health-check endpoint path.
373    #[must_use]
374    pub fn health_path(mut self, path: impl Into<String>) -> Self {
375        self.health_path = path.into();
376        self
377    }
378
379    /// Sets the maximum number of concurrent connections.
380    #[must_use]
381    pub fn max_connections(mut self, max: usize) -> Self {
382        self.max_connections = max;
383        self
384    }
385
386    /// Sets the inner HTTP handler configuration.
387    #[must_use]
388    pub fn handler_config(mut self, config: HttpHandlerConfig) -> Self {
389        self.handler_config = config;
390        self
391    }
392}
393
394/// An MCP server instance.
395///
396/// Servers are built using [`ServerBuilder`] and can run on various
397/// transports (stdio, SSE, WebSocket).
398pub struct Server {
399    info: ServerInfo,
400    capabilities: ServerCapabilities,
401    router: Router,
402    instructions: Option<String>,
403    /// Request timeout in seconds (0 = no timeout).
404    request_timeout_secs: u64,
405    /// Runtime statistics collector (None = disabled).
406    stats: Option<ServerStats>,
407    /// Whether to mask internal error details in responses.
408    mask_error_details: bool,
409    /// Logging configuration.
410    logging: LoggingConfig,
411    /// Console configuration for rich output.
412    console_config: ConsoleConfig,
413    /// Lifecycle hooks (wrapped in Option so they can be taken once).
414    lifespan: Mutex<Option<LifespanHooks>>,
415    /// Optional authentication provider.
416    auth_provider: Option<Arc<dyn AuthProvider>>,
417    /// Registered middleware.
418    middleware: Arc<Vec<Box<dyn crate::Middleware>>>,
419    /// Active requests by JSON-RPC request ID.
420    active_requests: Mutex<HashMap<RequestId, ActiveRequest>>,
421    /// Optional task manager for background tasks (Docket/SEP-1686).
422    task_manager: Option<SharedTaskManager>,
423    /// Pending server-to-client requests (for bidirectional communication).
424    pending_requests: Arc<bidirectional::PendingRequests>,
425    /// HTTP server configuration (paths, max_connections, CORS).
426    http_config: HttpServerConfig,
427}
428
429impl Server {
430    /// Creates a new server builder.
431    #[must_use]
432    #[allow(clippy::new_ret_no_self)]
433    pub fn new(name: impl Into<String>, version: impl Into<String>) -> ServerBuilder {
434        ServerBuilder::new(name, version)
435    }
436
437    /// Returns the server info.
438    #[must_use]
439    pub fn info(&self) -> &ServerInfo {
440        &self.info
441    }
442
443    /// Returns the server capabilities.
444    #[must_use]
445    pub fn capabilities(&self) -> &ServerCapabilities {
446        &self.capabilities
447    }
448
449    /// Lists all registered tools.
450    #[must_use]
451    pub fn tools(&self) -> Vec<Tool> {
452        self.router.tools()
453    }
454
455    /// Lists all registered resources.
456    #[must_use]
457    pub fn resources(&self) -> Vec<Resource> {
458        self.router.resources()
459    }
460
461    /// Lists all registered resource templates.
462    #[must_use]
463    pub fn resource_templates(&self) -> Vec<ResourceTemplate> {
464        self.router.resource_templates()
465    }
466
467    /// Lists all registered prompts.
468    #[must_use]
469    pub fn prompts(&self) -> Vec<Prompt> {
470        self.router.prompts()
471    }
472
473    /// Returns the task manager, if configured.
474    ///
475    /// Returns `None` if background tasks are not enabled.
476    #[must_use]
477    pub fn task_manager(&self) -> Option<&SharedTaskManager> {
478        self.task_manager.as_ref()
479    }
480
481    /// Consumes the server and returns its router.
482    ///
483    /// This is used for mounting one server's components into another.
484    #[must_use]
485    pub fn into_router(self) -> Router {
486        self.router
487    }
488
489    /// Returns the capabilities this server provides.
490    ///
491    /// This is useful when determining what components a server has
492    /// before mounting.
493    #[must_use]
494    pub fn has_tools(&self) -> bool {
495        self.capabilities.tools.is_some()
496    }
497
498    /// Returns whether this server has resources.
499    #[must_use]
500    pub fn has_resources(&self) -> bool {
501        self.capabilities.resources.is_some()
502    }
503
504    /// Returns whether this server has prompts.
505    #[must_use]
506    pub fn has_prompts(&self) -> bool {
507        self.capabilities.prompts.is_some()
508    }
509
510    /// Returns a point-in-time snapshot of server statistics.
511    ///
512    /// Returns `None` if statistics collection is disabled.
513    #[must_use]
514    pub fn stats(&self) -> Option<StatsSnapshot> {
515        self.stats.as_ref().map(ServerStats::snapshot)
516    }
517
518    /// Returns the raw statistics collector.
519    ///
520    /// Useful for advanced scenarios where you need direct access.
521    /// Returns `None` if statistics collection is disabled.
522    #[must_use]
523    pub fn stats_collector(&self) -> Option<&ServerStats> {
524        self.stats.as_ref()
525    }
526
527    /// Renders a stats panel to stderr, if stats are enabled.
528    pub fn display_stats(&self) {
529        let Some(stats) = self.stats.as_ref() else {
530            return;
531        };
532
533        let snapshot = stats.snapshot();
534        let renderer = fastmcp_console::stats::StatsRenderer::detect();
535        renderer.render_panel(&snapshot, console());
536    }
537
538    /// Returns the console configuration.
539    #[must_use]
540    pub fn console_config(&self) -> &ConsoleConfig {
541        &self.console_config
542    }
543
544    /// Renders the startup banner based on console configuration.
545    fn render_startup_banner(&self) {
546        let render = || {
547            let mut banner = StartupBanner::new(&self.info.name, &self.info.version)
548                .tools(self.router.tools_count())
549                .resources(self.router.resources_count())
550                .prompts(self.router.prompts_count())
551                .transport("stdio");
552
553            if let Some(desc) = self.instructions.as_deref().filter(|d| !d.is_empty()) {
554                banner = banner.description(desc);
555            }
556
557            // Apply banner style from config
558            match self.console_config.banner_style {
559                BannerStyle::Full => banner.render(console()),
560                BannerStyle::Compact | BannerStyle::Minimal => {
561                    // Compact/Minimal: render without the large logo
562                    banner.no_logo().render(console());
563                }
564                BannerStyle::None => {} // Already checked show_banner, but be safe
565            }
566        };
567
568        if let Err(err) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(render)) {
569            eprintln!("Warning: banner rendering failed: {err:?}");
570        }
571    }
572
573    /// Initializes rich logging based on server configuration.
574    ///
575    /// This should be called early in the startup sequence, before any
576    /// log output is generated. If initialization fails (e.g., logger
577    /// already set), a warning is printed to stderr.
578    fn init_rich_logging(&self) {
579        let result = RichLoggerBuilder::new()
580            .level(self.logging.level)
581            .with_timestamps(self.logging.timestamps)
582            .with_targets(self.logging.targets)
583            .with_file_line(self.logging.file_line)
584            .init();
585
586        if let Err(e) = result {
587            // Logger already initialized (likely by user code), not an error
588            eprintln!("Note: Rich logging not initialized (logger already set): {e}");
589        }
590    }
591
592    /// Processes a single JSON-RPC request through the full server dispatch
593    /// pipeline (initialization checks, middleware, routing, tool/resource/prompt
594    /// execution, error masking, and statistics recording).
595    ///
596    /// This is the public equivalent of the internal `handle_request` method that
597    /// the built-in transports (`run_stdio`, `run_http`, etc.) use. It allows
598    /// external code to drive the server from a custom transport or embedding
599    /// without going through a `Transport` abstraction.
600    ///
601    /// # Parameters
602    ///
603    /// - `cx` — The cancellation / budget context for this request.
604    /// - `session` — Mutable reference to the session for this connection.
605    ///   The caller is responsible for session lifecycle (creation, sharing,
606    ///   locking if shared across threads).
607    /// - `request` — The incoming JSON-RPC request (or notification).
608    /// - `notification_sender` — Callback used to push server-initiated
609    ///   notifications (e.g. progress) back to the client.
610    /// - `request_sender` — Sender for server-to-client requests
611    ///   (sampling, elicitation, roots).
612    ///
613    /// # Returns
614    ///
615    /// `Some(JsonRpcResponse)` for normal requests, or `None` for
616    /// notifications (JSON-RPC messages without an `id`).
617    ///
618    /// # Example
619    ///
620    /// ```ignore
621    /// use std::sync::Arc;
622    /// use fastmcp_rust::{
623    ///     Server, Session, JsonRpcRequest, NotificationSender,
624    ///     bidirectional::RequestSender,
625    /// };
626    /// use fastmcp_core::Cx;
627    ///
628    /// let server = Arc::new(
629    ///     Server::new("my-server", "1.0.0").build(),
630    /// );
631    /// let mut session = Session::new();
632    /// let cx = Cx::for_request();
633    /// let notify: NotificationSender = Arc::new(|_| {});
634    /// let req_sender = RequestSender::noop();
635    ///
636    /// let request: JsonRpcRequest = /* ... */;
637    /// let response = server.dispatch_request(
638    ///     &cx, &mut session, request, &notify, &req_sender,
639    /// );
640    /// ```
641    pub fn dispatch_request(
642        &self,
643        cx: &Cx,
644        session: &mut Session,
645        request: JsonRpcRequest,
646        notification_sender: &NotificationSender,
647        request_sender: &bidirectional::RequestSender,
648    ) -> Option<JsonRpcResponse> {
649        self.handle_request(cx, session, request, notification_sender, request_sender)
650    }
651
652    /// Processes a single JSON-RPC request with automatic concurrency
653    /// classification, allowing read-only requests to execute without holding
654    /// the session mutex for the duration of the handler.
655    ///
656    /// This is the concurrent counterpart of [`dispatch_request`](Self::dispatch_request).
657    /// Use it when the session is shared across threads behind an
658    /// `Arc<Mutex<Session>>` (e.g. in HTTP or WebSocket transports where
659    /// multiple requests may arrive simultaneously).
660    ///
661    /// Internally, each request is classified into one of two execution modes:
662    ///
663    /// - **`ConcurrentReadOnly`** (`resources/read`, `prompts/get`, and
664    ///   `tools/call` on read-only tools): a lightweight session snapshot
665    ///   is taken and the mutex is released immediately, so other
666    ///   requests can proceed in parallel.
667    /// - **`ExclusiveSession`** (everything else): the mutex is held for the
668    ///   full duration of the handler, guaranteeing exclusive access to
669    ///   session state.
670    ///
671    /// Mutex poisoning is recovered from automatically (the poisoned inner
672    /// value is used), matching the behaviour of the internal HTTP handler.
673    ///
674    /// # Parameters
675    ///
676    /// - `cx` — The cancellation / budget context for this request.
677    /// - `session` — Shared, mutex-protected session for this connection.
678    /// - `request` — The incoming JSON-RPC request (or notification).
679    /// - `notification_sender` — Callback used to push server-initiated
680    ///   notifications (e.g. progress) back to the client.
681    /// - `request_sender` — Sender for server-to-client requests
682    ///   (sampling, elicitation, roots).
683    ///
684    /// # Returns
685    ///
686    /// `Some(JsonRpcResponse)` for normal requests, or `None` for
687    /// notifications (JSON-RPC messages without an `id`).
688    ///
689    /// # Example
690    ///
691    /// ```ignore
692    /// use std::sync::{Arc, Mutex};
693    /// use fastmcp_rust::{
694    ///     Server, Session, JsonRpcRequest, NotificationSender,
695    ///     bidirectional::RequestSender,
696    /// };
697    /// use fastmcp_core::Cx;
698    ///
699    /// let server = Arc::new(
700    ///     Server::new("my-server", "1.0.0").build(),
701    /// );
702    /// let session = Arc::new(Mutex::new(Session::new()));
703    /// let cx = Cx::for_request();
704    /// let notify: NotificationSender = Arc::new(|_| {});
705    /// let req_sender = RequestSender::noop();
706    ///
707    /// let request: JsonRpcRequest = /* ... */;
708    /// let response = server.dispatch_request_concurrent(
709    ///     &cx, &session, request, &notify, &req_sender,
710    /// );
711    /// ```
712    pub fn dispatch_request_concurrent(
713        &self,
714        cx: &Cx,
715        session: &Arc<Mutex<Session>>,
716        request: JsonRpcRequest,
717        notification_sender: &NotificationSender,
718        request_sender: &bidirectional::RequestSender,
719    ) -> Option<JsonRpcResponse> {
720        let execution_mode = HttpRequestExecutionMode::for_request(&self.router, &request);
721
722        match execution_mode {
723            HttpRequestExecutionMode::ConcurrentReadOnly => {
724                let session_view = {
725                    let session_guard = match session.lock() {
726                        Ok(g) => g,
727                        Err(poisoned) => {
728                            error!(target: targets::SERVER, "Session lock poisoned, recovering");
729                            poisoned.into_inner()
730                        }
731                    };
732                    SessionView::from_session(&session_guard)
733                };
734                self.handle_request_with_view(
735                    cx,
736                    &session_view,
737                    request,
738                    notification_sender,
739                    request_sender,
740                )
741            }
742            HttpRequestExecutionMode::ExclusiveSession => {
743                let mut session_guard = match session.lock() {
744                    Ok(g) => g,
745                    Err(poisoned) => {
746                        error!(target: targets::SERVER, "Session lock poisoned, recovering");
747                        poisoned.into_inner()
748                    }
749                };
750                self.handle_request(
751                    cx,
752                    &mut session_guard,
753                    request,
754                    notification_sender,
755                    request_sender,
756                )
757            }
758        }
759    }
760
761    /// Runs the server on stdio transport.
762    ///
763    /// This is the primary way to run MCP servers as subprocesses.
764    /// Creates a request-scoped Cx and runs the server loop.
765    pub fn run_stdio(self) -> ! {
766        // Top-level server loop Cx (non-test). Per-request budgets are applied in `handle_request`.
767        let cx = Cx::for_request();
768        self.run_stdio_with_cx(&cx)
769    }
770
771    /// Runs the server on stdio with a provided Cx.
772    ///
773    /// This allows integration with a real asupersync runtime.
774    pub fn run_stdio_with_cx(self, cx: &Cx) -> ! {
775        // Initialize rich logging first, before any log output
776        self.init_rich_logging();
777
778        let transport = StdioTransport::stdio();
779        let shared = SharedTransport::new(transport);
780
781        // Create a notification sender that writes to a separate stdout handle.
782        // This allows progress notifications to be sent during handler execution
783        // while the main transport is blocked on recv().
784        let notification_sender = create_notification_sender();
785
786        let shared_recv = shared.clone();
787        let shared_send = shared.clone();
788        self.run_loop(
789            cx,
790            move |cx| shared_recv.recv(cx),
791            move |cx, message| shared_send.send(cx, message),
792            notification_sender,
793        )
794    }
795
796    /// Runs the server on a custom transport with a request-scoped Cx.
797    ///
798    /// This is useful for SSE/WebSocket integrations where the transport is
799    /// provided by an external server framework.
800    pub fn run_transport<T>(self, transport: T) -> !
801    where
802        T: Transport + Send + 'static,
803    {
804        // Top-level server loop Cx (non-test). Per-request budgets are applied in `handle_request`.
805        let cx = Cx::for_request();
806        self.run_transport_with_cx(&cx, transport)
807    }
808
809    /// Runs the server on a custom transport with a provided Cx.
810    ///
811    /// This allows integration with a real asupersync runtime.
812    pub fn run_transport_with_cx<T>(self, cx: &Cx, transport: T) -> !
813    where
814        T: Transport + Send + 'static,
815    {
816        self.init_rich_logging();
817
818        let shared = SharedTransport::new(transport);
819        let notification_sender = create_transport_notification_sender(shared.clone());
820
821        let shared_recv = shared.clone();
822        let shared_send = shared;
823        self.run_loop(
824            cx,
825            move |cx| shared_recv.recv(cx),
826            move |cx, message| shared_send.send(cx, message),
827            notification_sender,
828        )
829    }
830
831    /// Runs the server on a custom transport and returns when the transport closes or the Cx is cancelled.
832    ///
833    /// Unlike [`run_transport_with_cx`](Self::run_transport_with_cx), this does not call
834    /// `std::process::exit` on shutdown. This is useful for tests and embedding where you need
835    /// the server loop to be joinable.
836    pub fn run_transport_returning_with_cx<T>(self, cx: &Cx, transport: T)
837    where
838        T: Transport + Send + 'static,
839    {
840        self.init_rich_logging();
841
842        let shared = SharedTransport::new(transport);
843        let notification_sender = create_transport_notification_sender(shared.clone());
844
845        let shared_recv = shared.clone();
846        let shared_send = shared;
847        self.run_loop_returning(
848            cx,
849            move |cx| shared_recv.recv(cx),
850            move |cx, message| shared_send.send(cx, message),
851            notification_sender,
852        );
853    }
854
855    /// Runs the server on a custom transport and returns when the transport closes.
856    ///
857    /// This uses a request-scoped [`Cx`], but unlike [`run_transport`](Self::run_transport) it does
858    /// not exit the process.
859    pub fn run_transport_returning<T>(self, transport: T)
860    where
861        T: Transport + Send + 'static,
862    {
863        // Top-level server loop Cx (non-test). Per-request budgets are applied in `handle_request`.
864        let cx = Cx::for_request();
865        self.run_transport_returning_with_cx(&cx, transport);
866    }
867
868    /// Runs the server using SSE transport with a testing Cx.
869    ///
870    /// This is a convenience wrapper around [`SseServerTransport`].
871    pub fn run_sse<W, R>(self, writer: W, request_source: R, endpoint_url: impl Into<String>) -> !
872    where
873        W: Write + Send + 'static,
874        R: Iterator<Item = JsonRpcRequest> + Send + 'static,
875    {
876        let transport = SseServerTransport::new(writer, request_source, endpoint_url);
877        self.run_transport(transport)
878    }
879
880    /// Runs the server using SSE transport with a provided Cx.
881    pub fn run_sse_with_cx<W, R>(
882        self,
883        cx: &Cx,
884        writer: W,
885        request_source: R,
886        endpoint_url: impl Into<String>,
887    ) -> !
888    where
889        W: Write + Send + 'static,
890        R: Iterator<Item = JsonRpcRequest> + Send + 'static,
891    {
892        let transport = SseServerTransport::new(writer, request_source, endpoint_url);
893        self.run_transport_with_cx(cx, transport)
894    }
895
896    /// Runs the server using WebSocket transport with a testing Cx.
897    ///
898    /// This is a convenience wrapper around [`WsTransport`].
899    pub fn run_websocket<R, W>(self, reader: R, writer: W) -> !
900    where
901        R: Read + Send + 'static,
902        W: Write + Send + 'static,
903    {
904        let transport = WsTransport::new(reader, writer);
905        self.run_transport(transport)
906    }
907
908    /// Runs the server using WebSocket transport with a provided Cx.
909    pub fn run_websocket_with_cx<R, W>(self, cx: &Cx, reader: R, writer: W) -> !
910    where
911        R: Read + Send + 'static,
912        W: Write + Send + 'static,
913    {
914        let transport = WsTransport::new(reader, writer);
915        self.run_transport_with_cx(cx, transport)
916    }
917
918    // =========================================================================
919    // HTTP Server — turnkey Streamable HTTP transport
920    // =========================================================================
921
922    /// Runs the server on HTTP transport, listening on the given address.
923    ///
924    /// This is the HTTP equivalent of [`run_stdio`](Self::run_stdio). It binds a
925    /// TCP listener, accepts connections, and dispatches MCP JSON-RPC requests
926    /// over HTTP.
927    ///
928    /// # Routes
929    ///
930    /// | Method  | Path       | Description                              |
931    /// |---------|------------|------------------------------------------|
932    /// | POST    | `/mcp`     | MCP JSON-RPC handler                     |
933    /// | GET     | `/health`  | Health check (`{"status": "ok"}`)        |
934    /// | OPTIONS | `/mcp`     | CORS preflight (via `HttpRequestHandler`) |
935    ///
936    /// # Example
937    ///
938    /// ```ignore
939    /// Server::new("my-server", "1.0.0")
940    ///     .tool(my_tool)
941    ///     .build()
942    ///     .run_http("0.0.0.0:3000");
943    /// ```
944    pub fn run_http(self, addr: impl Into<String>) -> ! {
945        let cx = Cx::for_request();
946        self.run_http_with_cx(&cx, addr)
947    }
948
949    /// Runs the server on HTTP with a provided [`Cx`].
950    ///
951    /// This allows integration with a real asupersync runtime.
952    pub fn run_http_with_cx(self, cx: &Cx, addr: impl Into<String>) -> ! {
953        self.run_http_accept_loop(cx, addr.into(), false);
954        // run_http_accept_loop only returns when `returning` is true.
955        unreachable!()
956    }
957
958    /// Runs the server on HTTP and returns when the listener closes or the [`Cx`]
959    /// is cancelled.
960    ///
961    /// Unlike [`run_http`](Self::run_http), this does **not** call
962    /// `std::process::exit` on shutdown. This is useful for tests and embedding.
963    pub fn run_http_returning(self, addr: impl Into<String>) {
964        let cx = Cx::for_request();
965        self.run_http_returning_with_cx(&cx, addr);
966    }
967
968    /// Full control: custom [`Cx`] + returns on shutdown.
969    pub fn run_http_returning_with_cx(self, cx: &Cx, addr: impl Into<String>) {
970        self.run_http_accept_loop(cx, addr.into(), true);
971    }
972
973    /// Core HTTP accept loop shared by all `run_http*` variants.
974    ///
975    /// When `returning` is `false` the method calls `std::process::exit` on
976    /// shutdown (matching the behaviour of the stdio/transport family).
977    ///
978    /// Each accepted connection is handled in its own thread, enabling
979    /// concurrent tool dispatch across HTTP connections. The number of
980    /// concurrent connections is bounded by `HttpServerConfig::max_connections`.
981    #[allow(clippy::too_many_lines)]
982    fn run_http_accept_loop(self, cx: &Cx, addr: String, returning: bool) {
983        self.init_rich_logging();
984
985        // Bind the TCP listener.
986        let listener = match TcpListener::bind(&addr) {
987            Ok(l) => l,
988            Err(e) => {
989                error!(target: targets::TRANSPORT, "Failed to bind HTTP listener on {}: {}", addr, e);
990                if returning {
991                    return;
992                }
993                std::process::exit(1);
994            }
995        };
996
997        // Poll accept in nonblocking mode so cancellation/shutdown can be
998        // observed promptly even when no clients are connecting.
999        let _ = listener.set_nonblocking(true);
1000
1001        info!(target: targets::SERVER, "HTTP server listening on {}", addr);
1002
1003        // Extract http_config paths before wrapping self in Arc.
1004        let mcp_path = self.http_config.mcp_path.clone();
1005        let health_path = self.http_config.health_path.clone();
1006        let max_connections = self.http_config.max_connections;
1007
1008        // Set up per-server state shared across connections.
1009        let session = Arc::new(Mutex::new(Session::new(
1010            self.info.clone(),
1011            self.capabilities.clone(),
1012        )));
1013
1014        // Notification sender — for HTTP we log notifications since there is no
1015        // persistent outbound channel per connection.
1016        let notification_sender: NotificationSender = Arc::new(|request: JsonRpcRequest| {
1017            log::debug!(
1018                target: targets::SERVER,
1019                "HTTP notification (not deliverable to client): {}",
1020                request.method
1021            );
1022        });
1023
1024        // Request sender for bidirectional communication.
1025        let request_sender = Arc::new({
1026            let send_fn: bidirectional::TransportSendFn = Arc::new(|_message| {
1027                // HTTP is request/response — server-to-client requests are not
1028                // deliverable.  Log and drop.
1029                Err("HTTP transport does not support server-to-client requests".into())
1030            });
1031            bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
1032        });
1033
1034        // Track connection opened.
1035        if let Some(ref stats) = self.stats {
1036            stats.connection_opened();
1037        }
1038
1039        // Render startup banner (with HTTP transport name).
1040        if self.console_config.show_banner && !banner_suppressed() {
1041            self.render_http_startup_banner(&addr);
1042        }
1043
1044        // Run startup hook.
1045        if !self.run_startup_hook() {
1046            error!(target: targets::SERVER, "Startup hook failed");
1047            if returning {
1048                self.graceful_shutdown_returning();
1049                return;
1050            }
1051            self.graceful_shutdown(1);
1052        }
1053
1054        // Build the HTTP request handler with the server's config (or default).
1055        let http_handler = Arc::new(HttpRequestHandler::new());
1056
1057        // Traffic renderer.
1058        let traffic_renderer = Arc::new(if self.console_config.show_request_traffic {
1059            let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
1060            renderer.truncate_at = self.console_config.truncate_at;
1061            match self.console_config.traffic_verbosity {
1062                TrafficVerbosity::None => {}
1063                TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
1064                    renderer.show_params = false;
1065                    renderer.show_result = false;
1066                }
1067                TrafficVerbosity::Full => {
1068                    renderer.show_params = true;
1069                    renderer.show_result = true;
1070                }
1071            }
1072            Some(renderer)
1073        } else {
1074            None
1075        });
1076
1077        // Wrap self in Arc for sharing across connection handler threads.
1078        let server = Arc::new(self);
1079
1080        // Active connection counter for max_connections enforcement.
1081        let active_connections = Arc::new(AtomicUsize::new(0));
1082
1083        // Accept loop — each connection is handled in its own thread.
1084        loop {
1085            if cx.is_cancel_requested() {
1086                info!(target: targets::SERVER, "Cancellation requested, shutting down HTTP server");
1087                if returning {
1088                    server.graceful_shutdown_returning();
1089                    return;
1090                }
1091                server.graceful_shutdown(0);
1092            }
1093
1094            let (stream, peer_addr) = match listener.accept() {
1095                Ok(pair) => pair,
1096                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
1097                    // Non-blocking timeout — check cancellation and retry.
1098                    std::thread::sleep(Duration::from_millis(10));
1099                    continue;
1100                }
1101                Err(e) => {
1102                    error!(target: targets::TRANSPORT, "Failed to accept connection: {}", e);
1103                    continue;
1104                }
1105            };
1106
1107            // Enforce max_connections.
1108            let current = active_connections.load(Ordering::Relaxed);
1109            if current >= max_connections {
1110                debug!(
1111                    target: targets::TRANSPORT,
1112                    "Rejecting connection from {} (max_connections {} reached)",
1113                    peer_addr,
1114                    max_connections
1115                );
1116                // Write a 503 Service Unavailable and close.
1117                if let Ok(reader_stream) = stream.try_clone() {
1118                    let mut http_transport =
1119                        HttpTransport::new(BufReader::new(reader_stream), BufWriter::new(stream));
1120                    let _ = http_transport.write_response(
1121                        &HttpResponse::new(HttpStatus::SERVICE_UNAVAILABLE)
1122                            .with_json(&serde_json::json!({"error": "too many connections"})),
1123                    );
1124                }
1125                continue;
1126            }
1127
1128            debug!(
1129                target: targets::TRANSPORT,
1130                "Accepted HTTP connection from {}",
1131                peer_addr
1132            );
1133
1134            // The listener is nonblocking so the accepted socket may inherit
1135            // that mode on some platforms; force blocking reads for
1136            // HttpTransport's request/response flow.
1137            let _ = stream.set_nonblocking(false);
1138
1139            // Clone shared state for the connection handler thread.
1140            let server = Arc::clone(&server);
1141            let session = Arc::clone(&session);
1142            let notification_sender = Arc::clone(&notification_sender);
1143            let request_sender = Arc::clone(&request_sender);
1144            let http_handler = Arc::clone(&http_handler);
1145            let traffic_renderer = Arc::clone(&traffic_renderer);
1146            let active_connections = Arc::clone(&active_connections);
1147            let mcp_path = mcp_path.clone();
1148            let health_path = health_path.clone();
1149            let conn_cx = cx.clone();
1150
1151            // Increment active connection count.
1152            active_connections.fetch_add(1, Ordering::Relaxed);
1153
1154            // Spawn a thread to handle this connection concurrently.
1155            std::thread::spawn(move || {
1156                // Ensure connection count is decremented when this thread exits.
1157                struct ConnectionGuard(Arc<AtomicUsize>);
1158                impl Drop for ConnectionGuard {
1159                    fn drop(&mut self) {
1160                        self.0.fetch_sub(1, Ordering::Relaxed);
1161                    }
1162                }
1163                let _guard = ConnectionGuard(active_connections);
1164
1165                // Read the HTTP request from the connection.
1166                let reader = BufReader::new(match stream.try_clone() {
1167                    Ok(s) => s,
1168                    Err(e) => {
1169                        error!(target: targets::TRANSPORT, "Failed to clone TCP stream: {}", e);
1170                        return;
1171                    }
1172                });
1173                let writer = BufWriter::new(stream);
1174                let mut http_transport: HttpTransport<
1175                    BufReader<std::net::TcpStream>,
1176                    BufWriter<std::net::TcpStream>,
1177                > = HttpTransport::new(reader, writer);
1178
1179                let http_request = match http_transport.read_request() {
1180                    Ok(req) => req,
1181                    Err(e) => {
1182                        debug!(target: targets::TRANSPORT, "Failed to read HTTP request: {}", e);
1183                        return;
1184                    }
1185                };
1186
1187                // Route by path and method.
1188                let response = if http_request.path == health_path
1189                    && http_request.method == HttpMethod::Get
1190                {
1191                    // Health-check endpoint.
1192                    HttpResponse::ok().with_json(&serde_json::json!({"status": "ok"}))
1193                } else if http_request.path == mcp_path
1194                    && http_request.method == HttpMethod::Options
1195                {
1196                    // CORS preflight.
1197                    http_handler.handle_options(&http_request)
1198                } else if http_request.path == mcp_path && http_request.method == HttpMethod::Post {
1199                    // MCP JSON-RPC handler.
1200                    server.handle_http_mcp_request(
1201                        &conn_cx,
1202                        &session,
1203                        &http_handler,
1204                        &http_request,
1205                        &notification_sender,
1206                        &request_sender,
1207                        &traffic_renderer,
1208                    )
1209                } else {
1210                    // 404 for anything else.
1211                    HttpResponse::new(HttpStatus::NOT_FOUND)
1212                        .with_json(&serde_json::json!({"error": "not found"}))
1213                };
1214
1215                // Write the response.
1216                if let Err(e) = http_transport.write_response(&response) {
1217                    debug!(target: targets::TRANSPORT, "Failed to write HTTP response: {}", e);
1218                }
1219            });
1220        }
1221    }
1222
1223    /// Processes a single MCP JSON-RPC request received over HTTP.
1224    fn handle_http_mcp_request(
1225        &self,
1226        cx: &Cx,
1227        session: &Arc<Mutex<Session>>,
1228        http_handler: &HttpRequestHandler,
1229        http_request: &HttpRequest,
1230        notification_sender: &NotificationSender,
1231        request_sender: &bidirectional::RequestSender,
1232        traffic_renderer: &Option<RequestResponseRenderer>,
1233    ) -> HttpResponse {
1234        // Parse the JSON-RPC request from the HTTP body.
1235        let json_rpc = match http_handler.parse_request(http_request) {
1236            Ok(r) => r,
1237            Err(e) => {
1238                debug!(target: targets::TRANSPORT, "Invalid MCP request: {}", e);
1239                return http_handler
1240                    .error_response(HttpStatus::BAD_REQUEST, &format!("Invalid request: {e}"));
1241            }
1242        };
1243
1244        // Log request traffic.
1245        if let Some(renderer) = traffic_renderer {
1246            renderer.render_request(&json_rpc, console());
1247        }
1248
1249        // Track bytes received.
1250        if let Some(ref stats) = self.stats {
1251            if let Ok(json) = serde_json::to_string(&json_rpc) {
1252                stats.add_bytes_received(json.len() as u64 + 1);
1253            }
1254        }
1255
1256        let start_time = Instant::now();
1257
1258        let execution_mode = HttpRequestExecutionMode::for_request(&self.router, &json_rpc);
1259
1260        // Dispatch through the server's handler. Concurrent HTTP methods take
1261        // a lock-free view of session metadata while sharing live session
1262        // state; request-local auth now lives on McpContext instead of being
1263        // written into SessionState.
1264        let response_opt = match execution_mode {
1265            HttpRequestExecutionMode::ConcurrentReadOnly => {
1266                let session_view = {
1267                    let session_guard = match session.lock() {
1268                        Ok(g) => g,
1269                        Err(poisoned) => {
1270                            error!(target: targets::SERVER, "Session lock poisoned, recovering");
1271                            poisoned.into_inner()
1272                        }
1273                    };
1274                    SessionView::from_session(&session_guard)
1275                };
1276                self.handle_request_with_view(
1277                    cx,
1278                    &session_view,
1279                    json_rpc,
1280                    notification_sender,
1281                    request_sender,
1282                )
1283            }
1284            HttpRequestExecutionMode::ExclusiveSession => {
1285                let mut session_guard = match session.lock() {
1286                    Ok(g) => g,
1287                    Err(poisoned) => {
1288                        error!(target: targets::SERVER, "Session lock poisoned, recovering");
1289                        poisoned.into_inner()
1290                    }
1291                };
1292                self.handle_request(
1293                    cx,
1294                    &mut session_guard,
1295                    json_rpc,
1296                    notification_sender,
1297                    request_sender,
1298                )
1299            }
1300        };
1301
1302        let duration = start_time.elapsed();
1303
1304        match response_opt {
1305            Some(json_rpc_response) => {
1306                // Log response traffic.
1307                if let Some(renderer) = traffic_renderer {
1308                    renderer.render_response(&json_rpc_response, Some(duration), console());
1309                }
1310
1311                // Track bytes sent.
1312                if let Some(ref stats) = self.stats {
1313                    if let Ok(json) = serde_json::to_string(&json_rpc_response) {
1314                        stats.add_bytes_sent(json.len() as u64 + 1);
1315                    }
1316                }
1317
1318                let origin = http_request.header("origin");
1319                http_handler.create_response(&json_rpc_response, origin)
1320            }
1321            None => {
1322                // Notification (no response expected) — return 202 Accepted.
1323                HttpResponse::new(HttpStatus::ACCEPTED)
1324            }
1325        }
1326    }
1327
1328    /// Renders the HTTP-specific startup banner.
1329    fn render_http_startup_banner(&self, addr: &str) {
1330        let render = || {
1331            let transport_label = format!("http://{addr}");
1332            let mut banner = StartupBanner::new(&self.info.name, &self.info.version)
1333                .tools(self.router.tools_count())
1334                .resources(self.router.resources_count())
1335                .prompts(self.router.prompts_count())
1336                .transport(&transport_label);
1337
1338            if let Some(desc) = self.instructions.as_deref().filter(|d| !d.is_empty()) {
1339                banner = banner.description(desc);
1340            }
1341
1342            match self.console_config.banner_style {
1343                BannerStyle::Full => banner.render(console()),
1344                BannerStyle::Compact | BannerStyle::Minimal => {
1345                    banner.no_logo().render(console());
1346                }
1347                BannerStyle::None => {}
1348            }
1349        };
1350
1351        if let Err(err) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(render)) {
1352            eprintln!("Warning: banner rendering failed: {err:?}");
1353        }
1354    }
1355
1356    /// Runs the startup lifecycle hook, if configured.
1357    ///
1358    /// Returns `true` if startup succeeded (or no hook was configured),
1359    /// `false` if the hook returned an error.
1360    pub(crate) fn run_startup_hook(&self) -> bool {
1361        let hook = {
1362            let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
1363                error!(target: targets::SERVER, "lifespan lock poisoned in run_startup_hook, recovering");
1364                poisoned.into_inner()
1365            });
1366            guard.as_mut().and_then(|h| h.on_startup.take())
1367        };
1368
1369        if let Some(hook) = hook {
1370            debug!(target: targets::SERVER, "Running startup hook");
1371            match hook() {
1372                Ok(()) => {
1373                    debug!(target: targets::SERVER, "Startup hook completed successfully");
1374                    true
1375                }
1376                Err(e) => {
1377                    error!(target: targets::SERVER, "Startup hook failed: {}", e);
1378                    false
1379                }
1380            }
1381        } else {
1382            true
1383        }
1384    }
1385
1386    /// Runs the shutdown lifecycle hook, if configured.
1387    pub(crate) fn run_shutdown_hook(&self) {
1388        let hook = {
1389            let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
1390                error!(target: targets::SERVER, "lifespan lock poisoned in run_shutdown_hook, recovering");
1391                poisoned.into_inner()
1392            });
1393            guard.as_mut().and_then(|h| h.on_shutdown.take())
1394        };
1395
1396        if let Some(hook) = hook {
1397            debug!(target: targets::SERVER, "Running shutdown hook");
1398            hook();
1399            debug!(target: targets::SERVER, "Shutdown hook completed");
1400        }
1401    }
1402
1403    /// Performs graceful shutdown: runs hook, closes stats, exits.
1404    fn graceful_shutdown(&self, exit_code: i32) -> ! {
1405        self.cancel_active_requests(CancelKind::Shutdown, true);
1406        self.run_shutdown_hook();
1407        if let Some(ref stats) = self.stats {
1408            stats.connection_closed();
1409        }
1410        std::process::exit(exit_code)
1411    }
1412
1413    /// Performs graceful shutdown without exiting the process.
1414    ///
1415    /// This is intended for embedding/testing scenarios where the server loop is
1416    /// running on a thread and the caller wants to `join()` it.
1417    fn graceful_shutdown_returning(&self) {
1418        self.cancel_active_requests(CancelKind::Shutdown, true);
1419        self.run_shutdown_hook();
1420        if let Some(ref stats) = self.stats {
1421            stats.connection_closed();
1422        }
1423    }
1424
1425    /// Shared server loop for any transport, using closure-based recv/send.
1426    fn run_loop<R, S>(
1427        self,
1428        cx: &Cx,
1429        mut recv: R,
1430        send: S,
1431        notification_sender: NotificationSender,
1432    ) -> !
1433    where
1434        R: FnMut(&Cx) -> Result<JsonRpcMessage, TransportError>,
1435        S: FnMut(&Cx, &JsonRpcMessage) -> Result<(), TransportError> + Send + Sync + 'static,
1436    {
1437        let mut session = Session::new(self.info.clone(), self.capabilities.clone());
1438
1439        // Wrap send in Arc<Mutex> for shared access from bidirectional requests
1440        let send = Arc::new(Mutex::new(send));
1441
1442        // Create a RequestSender for bidirectional communication
1443        let request_sender = {
1444            let send_clone = send.clone();
1445            let send_cx = cx.clone();
1446            let send_fn: bidirectional::TransportSendFn = Arc::new(move |message| {
1447                let mut guard = send_clone
1448                    .lock()
1449                    .map_err(|e| format!("Lock poisoned: {}", e))?;
1450                guard(&send_cx, message).map_err(|e| format!("Send failed: {}", e))
1451            });
1452            bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
1453        };
1454
1455        // Track connection opened
1456        if let Some(ref stats) = self.stats {
1457            stats.connection_opened();
1458        }
1459
1460        // Render startup banner if enabled (respects both config and legacy env var)
1461        if self.console_config.show_banner && !banner_suppressed() {
1462            self.render_startup_banner();
1463        }
1464
1465        // Run startup hook
1466        if !self.run_startup_hook() {
1467            error!(target: targets::SERVER, "Startup hook failed, exiting");
1468            self.graceful_shutdown(1);
1469        }
1470
1471        // Create traffic renderer if enabled
1472        let traffic_renderer = if self.console_config.show_request_traffic {
1473            let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
1474            renderer.truncate_at = self.console_config.truncate_at;
1475            match self.console_config.traffic_verbosity {
1476                TrafficVerbosity::None => {} // Should not happen given the if check
1477                TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
1478                    renderer.show_params = false;
1479                    renderer.show_result = false;
1480                }
1481                TrafficVerbosity::Full => {
1482                    renderer.show_params = true;
1483                    renderer.show_result = true;
1484                }
1485            }
1486            Some(renderer)
1487        } else {
1488            None
1489        };
1490
1491        // Main request loop
1492        loop {
1493            // Check for cancellation
1494            if cx.is_cancel_requested() {
1495                info!(target: targets::SERVER, "Cancellation requested, shutting down");
1496                self.graceful_shutdown(0);
1497            }
1498
1499            // Receive next message
1500            let message = match recv(cx) {
1501                Ok(msg) => msg,
1502                Err(TransportError::Closed) => {
1503                    // Clean shutdown - track connection close
1504                    self.graceful_shutdown(0);
1505                }
1506                Err(TransportError::Cancelled) => {
1507                    info!(target: targets::SERVER, "Transport cancelled");
1508                    self.graceful_shutdown(0);
1509                }
1510                Err(e) => {
1511                    error!(target: targets::TRANSPORT, "Transport error: {}", e);
1512                    continue;
1513                }
1514            };
1515
1516            // Log request traffic
1517            if let Some(renderer) = &traffic_renderer {
1518                if let JsonRpcMessage::Request(req) = &message {
1519                    renderer.render_request(req, console());
1520                }
1521            }
1522
1523            let start_time = Instant::now();
1524
1525            // Handle the message
1526            let response_opt = match message {
1527                JsonRpcMessage::Request(request) => {
1528                    // Track bytes received (approximate from serialized request size)
1529                    if let Some(ref stats) = self.stats {
1530                        // Estimate request size by serializing back to JSON
1531                        // This is approximate but accurate enough for statistics
1532                        if let Ok(json) = serde_json::to_string(&request) {
1533                            stats.add_bytes_received(json.len() as u64 + 1); // +1 for newline
1534                        }
1535                    }
1536                    self.handle_request(
1537                        cx,
1538                        &mut session,
1539                        request,
1540                        &notification_sender,
1541                        &request_sender,
1542                    )
1543                }
1544                JsonRpcMessage::Response(response) => {
1545                    // Route response to pending server-initiated request (bidirectional)
1546                    if self.pending_requests.route_response(&response) {
1547                        debug!(target: targets::SERVER, "Routed response to pending request");
1548                    } else {
1549                        debug!(target: targets::SERVER, "Received unexpected response: {:?}", response.id);
1550                    }
1551                    continue;
1552                }
1553            };
1554
1555            let duration = start_time.elapsed();
1556
1557            if let Some(response) = response_opt {
1558                // Log response traffic
1559                if let Some(renderer) = &traffic_renderer {
1560                    renderer.render_response(&response, Some(duration), console());
1561                }
1562
1563                // Track bytes sent (approximate from serialized response size)
1564                if let Some(ref stats) = self.stats {
1565                    if let Ok(json) = serde_json::to_string(&response) {
1566                        stats.add_bytes_sent(json.len() as u64 + 1); // +1 for newline
1567                    }
1568                }
1569
1570                // Send response
1571                let send_result = {
1572                    let mut guard = match send.lock() {
1573                        Ok(guard) => guard,
1574                        Err(poisoned) => {
1575                            error!(
1576                                target: targets::TRANSPORT,
1577                                "Send channel lock poisoned; continuing with inner guard"
1578                            );
1579                            poisoned.into_inner()
1580                        }
1581                    };
1582                    guard(cx, &JsonRpcMessage::Response(response))
1583                };
1584                if let Err(e) = send_result {
1585                    error!(target: targets::TRANSPORT, "Failed to send response: {}", e);
1586                }
1587            }
1588        }
1589    }
1590
1591    /// Shared server loop for embedding/testing, returning on shutdown instead of exiting.
1592    ///
1593    /// This is intentionally separate from [`run_loop`](Self::run_loop) because the primary server
1594    /// entrypoints use `std::process::exit` on shutdown for subprocess use-cases.
1595    #[allow(clippy::too_many_lines)]
1596    fn run_loop_returning<R, S>(
1597        self,
1598        cx: &Cx,
1599        mut recv: R,
1600        send: S,
1601        notification_sender: NotificationSender,
1602    ) where
1603        R: FnMut(&Cx) -> Result<JsonRpcMessage, TransportError>,
1604        S: FnMut(&Cx, &JsonRpcMessage) -> Result<(), TransportError> + Send + Sync + 'static,
1605    {
1606        let mut session = Session::new(self.info.clone(), self.capabilities.clone());
1607
1608        // Wrap send in Arc<Mutex> for shared access from bidirectional requests
1609        let send = Arc::new(Mutex::new(send));
1610
1611        // Create a RequestSender for bidirectional communication
1612        let request_sender = {
1613            let send_clone = send.clone();
1614            let send_cx = cx.clone();
1615            let send_fn: bidirectional::TransportSendFn = Arc::new(move |message| {
1616                let mut guard = send_clone
1617                    .lock()
1618                    .map_err(|e| format!("Lock poisoned: {}", e))?;
1619                guard(&send_cx, message).map_err(|e| format!("Send failed: {}", e))
1620            });
1621            bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
1622        };
1623
1624        // Track connection opened
1625        if let Some(ref stats) = self.stats {
1626            stats.connection_opened();
1627        }
1628
1629        // Render startup banner if enabled (respects both config and legacy env var)
1630        if self.console_config.show_banner && !banner_suppressed() {
1631            self.render_startup_banner();
1632        }
1633
1634        // Run startup hook
1635        if !self.run_startup_hook() {
1636            error!(target: targets::SERVER, "Startup hook failed, stopping");
1637            self.graceful_shutdown_returning();
1638            return;
1639        }
1640
1641        // Create traffic renderer if enabled
1642        let traffic_renderer = if self.console_config.show_request_traffic {
1643            let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
1644            renderer.truncate_at = self.console_config.truncate_at;
1645            match self.console_config.traffic_verbosity {
1646                TrafficVerbosity::None => {} // Should not happen given the if check
1647                TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
1648                    renderer.show_params = false;
1649                    renderer.show_result = false;
1650                }
1651                TrafficVerbosity::Full => {
1652                    renderer.show_params = true;
1653                    renderer.show_result = true;
1654                }
1655            }
1656            Some(renderer)
1657        } else {
1658            None
1659        };
1660
1661        // Main request loop
1662        loop {
1663            // Check for cancellation
1664            if cx.is_cancel_requested() {
1665                info!(target: targets::SERVER, "Cancellation requested, stopping");
1666                self.graceful_shutdown_returning();
1667                return;
1668            }
1669
1670            // Receive next message
1671            let message = match recv(cx) {
1672                Ok(msg) => msg,
1673                Err(TransportError::Closed) => {
1674                    self.graceful_shutdown_returning();
1675                    return;
1676                }
1677                Err(TransportError::Cancelled) => {
1678                    info!(target: targets::SERVER, "Transport cancelled");
1679                    self.graceful_shutdown_returning();
1680                    return;
1681                }
1682                Err(e) => {
1683                    error!(target: targets::TRANSPORT, "Transport error: {}", e);
1684                    continue;
1685                }
1686            };
1687
1688            // Log request traffic
1689            if let Some(renderer) = &traffic_renderer {
1690                if let JsonRpcMessage::Request(req) = &message {
1691                    renderer.render_request(req, console());
1692                }
1693            }
1694
1695            let start_time = Instant::now();
1696
1697            // Handle the message
1698            let response_opt = match message {
1699                JsonRpcMessage::Request(request) => {
1700                    // Track bytes received (approximate from serialized request size)
1701                    if let Some(ref stats) = self.stats {
1702                        // Estimate request size by serializing back to JSON
1703                        // This is approximate but accurate enough for statistics
1704                        if let Ok(json) = serde_json::to_string(&request) {
1705                            stats.add_bytes_received(json.len() as u64 + 1); // +1 for newline
1706                        }
1707                    }
1708                    self.handle_request(
1709                        cx,
1710                        &mut session,
1711                        request,
1712                        &notification_sender,
1713                        &request_sender,
1714                    )
1715                }
1716                JsonRpcMessage::Response(response) => {
1717                    // Route response to pending server-initiated request (bidirectional)
1718                    if self.pending_requests.route_response(&response) {
1719                        debug!(target: targets::SERVER, "Routed response to pending request");
1720                    } else {
1721                        debug!(
1722                            target: targets::SERVER,
1723                            "Received unexpected response: {:?}",
1724                            response.id
1725                        );
1726                    }
1727                    continue;
1728                }
1729            };
1730
1731            let duration = start_time.elapsed();
1732
1733            if let Some(response) = response_opt {
1734                // Log response traffic
1735                if let Some(renderer) = &traffic_renderer {
1736                    renderer.render_response(&response, Some(duration), console());
1737                }
1738
1739                // Track bytes sent (approximate from serialized response size)
1740                if let Some(ref stats) = self.stats {
1741                    if let Ok(json) = serde_json::to_string(&response) {
1742                        stats.add_bytes_sent(json.len() as u64 + 1); // +1 for newline
1743                    }
1744                }
1745
1746                // Send response
1747                let send_result = {
1748                    let mut guard = match send.lock() {
1749                        Ok(guard) => guard,
1750                        Err(poisoned) => {
1751                            error!(
1752                                target: targets::TRANSPORT,
1753                                "Send channel lock poisoned; continuing with inner guard"
1754                            );
1755                            poisoned.into_inner()
1756                        }
1757                    };
1758                    guard(cx, &JsonRpcMessage::Response(response))
1759                };
1760                if let Err(e) = send_result {
1761                    error!(target: targets::TRANSPORT, "Failed to send response: {}", e);
1762                }
1763            }
1764        }
1765    }
1766
1767    /// Handles a single JSON-RPC request.
1768    fn handle_request(
1769        &self,
1770        cx: &Cx,
1771        session: &mut Session,
1772        request: JsonRpcRequest,
1773        notification_sender: &NotificationSender,
1774        request_sender: &bidirectional::RequestSender,
1775    ) -> Option<JsonRpcResponse> {
1776        let id = request.id.clone();
1777        let method = request.method.clone();
1778        let is_notification = id.is_none();
1779
1780        // Start timing for stats
1781        let start_time = Instant::now();
1782
1783        // Generate internal request ID for tracing
1784        let request_id = request_id_to_u64(id.as_ref());
1785
1786        // Create a budget for this request based on timeout configuration
1787        let budget = self.create_request_budget();
1788
1789        // Check if budget is already exhausted (should not happen, but be defensive)
1790        if budget.is_exhausted() {
1791            // Record failed request due to exhausted budget
1792            if let Some(ref stats) = self.stats {
1793                stats.record_request(&method, start_time.elapsed(), false);
1794            }
1795            // If it's a notification, we don't send an error response
1796            let response_id = id.clone()?;
1797            return Some(JsonRpcResponse::error(
1798                Some(response_id),
1799                JsonRpcError {
1800                    code: McpErrorCode::RequestCancelled.into(),
1801                    message: "Request budget exhausted".to_string(),
1802                    data: None,
1803                },
1804            ));
1805        }
1806
1807        let request_cx = if is_notification {
1808            cx.clone()
1809        } else {
1810            Cx::for_request_with_budget(budget)
1811        };
1812
1813        let _active_guard = match id.clone() {
1814            Some(request_id) => {
1815                match ActiveRequestGuard::try_new(
1816                    &self.active_requests,
1817                    request_id.clone(),
1818                    request_cx.clone(),
1819                ) {
1820                    Ok(guard) => Some(guard),
1821                    Err(duplicate_id) => {
1822                        if let Some(ref stats) = self.stats {
1823                            stats.record_request(&method, start_time.elapsed(), false);
1824                        }
1825                        let message = format!(
1826                            "Request id {duplicate_id} is already active; wait for the earlier request to finish before reusing it"
1827                        );
1828                        return Some(JsonRpcResponse::error(
1829                            Some(request_id),
1830                            JsonRpcError {
1831                                code: McpErrorCode::InvalidRequest.into(),
1832                                message,
1833                                data: None,
1834                            },
1835                        ));
1836                    }
1837                }
1838            }
1839            None => None,
1840        };
1841
1842        // Dispatch based on method, passing the budget, notification sender, and request sender
1843        let result = self.dispatch_method(
1844            &request_cx,
1845            session,
1846            request,
1847            request_id,
1848            &budget,
1849            notification_sender,
1850            request_sender,
1851        );
1852
1853        // Record statistics
1854        let latency = start_time.elapsed();
1855        if let Some(ref stats) = self.stats {
1856            match &result {
1857                Ok(_) => stats.record_request(&method, latency, true),
1858                Err(e) if e.code == fastmcp_core::McpErrorCode::RequestCancelled => {
1859                    stats.record_cancelled(&method, latency);
1860                }
1861                Err(_) => stats.record_request(&method, latency, false),
1862            }
1863        }
1864
1865        // If it's a notification (no ID), we must not reply
1866        if is_notification {
1867            if let Err(e) = result {
1868                fastmcp_core::logging::error!(
1869                    target: targets::HANDLER,
1870                    "Notification '{}' failed: {}",
1871                    method,
1872                    e
1873                );
1874            }
1875            return None;
1876        }
1877
1878        // We only reach here if `is_notification` is false, which implies `id` is present.
1879        // Use `?` to avoid `unwrap()` and keep the control-flow explicit.
1880        let response_id = id.clone()?;
1881
1882        match result {
1883            Ok(value) => Some(JsonRpcResponse::success(response_id, value)),
1884            Err(e) => {
1885                // Log full error before masking if this is an internal error
1886                if self.mask_error_details && e.is_internal() {
1887                    fastmcp_core::logging::error!(
1888                        target: targets::HANDLER,
1889                        "Request '{}' failed (masked in response): {}",
1890                        method,
1891                        e
1892                    );
1893                }
1894
1895                // Apply masking if enabled
1896                let masked = e.masked(self.mask_error_details);
1897                Some(JsonRpcResponse::error(
1898                    id,
1899                    JsonRpcError {
1900                        code: masked.code.into(),
1901                        message: masked.message,
1902                        data: masked.data,
1903                    },
1904                ))
1905            }
1906        }
1907    }
1908
1909    fn handle_request_with_view(
1910        &self,
1911        cx: &Cx,
1912        session: &SessionView,
1913        request: JsonRpcRequest,
1914        notification_sender: &NotificationSender,
1915        request_sender: &bidirectional::RequestSender,
1916    ) -> Option<JsonRpcResponse> {
1917        let id = request.id.clone();
1918        let method = request.method.clone();
1919        let is_notification = id.is_none();
1920
1921        let start_time = Instant::now();
1922        let request_id = request_id_to_u64(id.as_ref());
1923        let budget = self.create_request_budget();
1924
1925        if budget.is_exhausted() {
1926            if let Some(ref stats) = self.stats {
1927                stats.record_request(&method, start_time.elapsed(), false);
1928            }
1929            let response_id = id.clone()?;
1930            return Some(JsonRpcResponse::error(
1931                Some(response_id),
1932                JsonRpcError {
1933                    code: McpErrorCode::RequestCancelled.into(),
1934                    message: "Request budget exhausted".to_string(),
1935                    data: None,
1936                },
1937            ));
1938        }
1939
1940        let request_cx = if is_notification {
1941            cx.clone()
1942        } else {
1943            Cx::for_request_with_budget(budget)
1944        };
1945
1946        let _active_guard = match id.clone() {
1947            Some(request_id) => {
1948                match ActiveRequestGuard::try_new(
1949                    &self.active_requests,
1950                    request_id.clone(),
1951                    request_cx.clone(),
1952                ) {
1953                    Ok(guard) => Some(guard),
1954                    Err(duplicate_id) => {
1955                        if let Some(ref stats) = self.stats {
1956                            stats.record_request(&method, start_time.elapsed(), false);
1957                        }
1958                        let message = format!(
1959                            "Request id {duplicate_id} is already active; wait for the earlier request to finish before reusing it"
1960                        );
1961                        return Some(JsonRpcResponse::error(
1962                            Some(request_id),
1963                            JsonRpcError {
1964                                code: McpErrorCode::InvalidRequest.into(),
1965                                message,
1966                                data: None,
1967                            },
1968                        ));
1969                    }
1970                }
1971            }
1972            None => None,
1973        };
1974
1975        let result = self.dispatch_read_only_http_method(
1976            &request_cx,
1977            session,
1978            request,
1979            request_id,
1980            &budget,
1981            notification_sender,
1982            request_sender,
1983        );
1984
1985        let latency = start_time.elapsed();
1986        if let Some(ref stats) = self.stats {
1987            match &result {
1988                Ok(_) => stats.record_request(&method, latency, true),
1989                Err(e) if e.code == fastmcp_core::McpErrorCode::RequestCancelled => {
1990                    stats.record_cancelled(&method, latency);
1991                }
1992                Err(_) => stats.record_request(&method, latency, false),
1993            }
1994        }
1995
1996        if is_notification {
1997            if let Err(e) = result {
1998                fastmcp_core::logging::error!(
1999                    target: targets::HANDLER,
2000                    "Notification '{}' failed: {}",
2001                    method,
2002                    e
2003                );
2004            }
2005            return None;
2006        }
2007
2008        let response_id = id.clone()?;
2009
2010        match result {
2011            Ok(value) => Some(JsonRpcResponse::success(response_id, value)),
2012            Err(e) => {
2013                if self.mask_error_details && e.is_internal() {
2014                    fastmcp_core::logging::error!(
2015                        target: targets::HANDLER,
2016                        "Request '{}' failed (masked in response): {}",
2017                        method,
2018                        e
2019                    );
2020                }
2021
2022                let masked = e.masked(self.mask_error_details);
2023                Some(JsonRpcResponse::error(
2024                    id,
2025                    JsonRpcError {
2026                        code: masked.code.into(),
2027                        message: masked.message,
2028                        data: masked.data,
2029                    },
2030                ))
2031            }
2032        }
2033    }
2034
2035    /// Creates a budget for a new request based on server configuration.
2036    fn create_request_budget(&self) -> Budget {
2037        if self.request_timeout_secs == 0 {
2038            // No timeout - unlimited budget
2039            Budget::INFINITE
2040        } else {
2041            // Budget deadlines are absolute (`Time` since runtime epoch). We use `wall_now()`
2042            // so timeouts work even when running outside a full asupersync scheduler.
2043            let now = wall_now();
2044            let timeout_ns = self.request_timeout_secs.saturating_mul(1_000_000_000);
2045            let deadline = now.saturating_add_nanos(timeout_ns);
2046            Budget::new().with_deadline(deadline)
2047        }
2048    }
2049
2050    /// Dispatches a request to the appropriate handler.
2051    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
2052    fn dispatch_method(
2053        &self,
2054        cx: &Cx,
2055        session: &mut Session,
2056        request: JsonRpcRequest,
2057        request_id: u64,
2058        budget: &Budget,
2059        notification_sender: &NotificationSender,
2060        request_sender: &bidirectional::RequestSender,
2061    ) -> Result<serde_json::Value, McpError> {
2062        // Check cancellation before dispatch
2063        if cx.is_cancel_requested() {
2064            return Err(McpError::request_cancelled());
2065        }
2066
2067        // Check budget before dispatch (for poll-based exhaustion)
2068        if budget.is_exhausted() {
2069            return Err(McpError::new(
2070                McpErrorCode::RequestCancelled,
2071                "Request budget exhausted",
2072            ));
2073        }
2074        // Deadline exhaustion is time-based and must be checked against current time.
2075        if budget.is_past_deadline(wall_now()) {
2076            cx.cancel_fast(CancelKind::Deadline);
2077            return Err(McpError::new(
2078                McpErrorCode::RequestCancelled,
2079                "Request timeout exceeded",
2080            ));
2081        }
2082
2083        // Check initialization state
2084        if !session.is_initialized() && request.method != "initialize" && request.method != "ping" {
2085            return Err(McpError::invalid_request(
2086                "Server not initialized. Client must send 'initialize' first.",
2087            ));
2088        }
2089
2090        if let Some(task_manager) = &self.task_manager {
2091            task_manager.set_notification_sender(Arc::clone(notification_sender));
2092        }
2093
2094        let mut mw_ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
2095        let request_auth = if self.should_authenticate(&request.method) {
2096            let auth_request = AuthRequest {
2097                method: &request.method,
2098                params: request.params.as_ref(),
2099                request_id,
2100            };
2101            match self.authenticate_request(cx, request_id, session, auth_request) {
2102                Ok(auth) => Some(auth),
2103                Err(err) => {
2104                    let err = self.apply_global_middleware_error(&mw_ctx, &request, err);
2105                    let result = Err(err);
2106                    self.maybe_emit_log_notification(
2107                        session,
2108                        notification_sender,
2109                        &request.method,
2110                        &result,
2111                    );
2112                    return result;
2113                }
2114            }
2115        } else {
2116            None
2117        };
2118        if let Some(auth) = request_auth.clone() {
2119            mw_ctx = mw_ctx.with_auth(auth);
2120        }
2121
2122        // Middleware: on_request
2123        // We use a temporary context derived from the request context for middleware
2124        // so they can access session state, request auth, and share the request's lifecycle.
2125        let mut entered_middleware: Vec<&dyn crate::Middleware> = Vec::new();
2126
2127        for m in self.middleware.iter() {
2128            entered_middleware.push(m.as_ref());
2129            match m.on_request(&mw_ctx, &request) {
2130                Ok(crate::MiddlewareDecision::Continue) => {}
2131                Ok(crate::MiddlewareDecision::Respond(v)) => {
2132                    return self.apply_middleware_response(
2133                        &entered_middleware,
2134                        &mw_ctx,
2135                        &request,
2136                        v,
2137                    );
2138                }
2139                Err(e) => {
2140                    let err =
2141                        self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e);
2142                    return Err(err);
2143                }
2144            }
2145        }
2146
2147        let dispatch_auth = mw_ctx.auth();
2148
2149        // Everything after middleware entry must flow through `result` so that:
2150        // - `on_response` runs for successes in reverse middleware order
2151        // - `on_error` runs for handler/middleware errors in reverse middleware order
2152        //
2153        // Without this, `?` would early-return from `dispatch_method` and bypass middleware error
2154        // rewriting, contradicting the ordering semantics documented in `middleware.rs`.
2155        let result: Result<serde_json::Value, McpError> = (|| {
2156            let method = &request.method;
2157            let params = request.params.clone();
2158
2159            // Create bidirectional senders based on client capabilities
2160            let bidirectional_senders = self.create_bidirectional_senders(session, request_sender);
2161
2162            match method.as_str() {
2163                "initialize" => {
2164                    let params: InitializeParams = parse_params(params)?;
2165                    let result = self.router.handle_initialize(
2166                        cx,
2167                        session,
2168                        params,
2169                        self.instructions.as_deref(),
2170                    )?;
2171                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2172                }
2173                "initialized" => {
2174                    // Notification, no response needed (but we send empty ok)
2175                    Ok(serde_json::Value::Null)
2176                }
2177                "notifications/cancelled" => {
2178                    let params: CancelledParams = parse_params(params)?;
2179                    self.handle_cancelled_notification(params);
2180                    Ok(serde_json::Value::Null)
2181                }
2182                "logging/setLevel" => {
2183                    let params: SetLogLevelParams = parse_params(params)?;
2184                    self.handle_set_log_level(session, params);
2185                    Ok(serde_json::Value::Null)
2186                }
2187                "tools/list" => {
2188                    let params: ListToolsParams = parse_params_or_default(params)?;
2189                    let result =
2190                        self.router
2191                            .handle_tools_list(cx, params, Some(session.state()))?;
2192                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2193                }
2194                "tools/call" => {
2195                    let params: CallToolParams = parse_params(params)?;
2196                    let result = self.router.handle_tools_call(
2197                        cx,
2198                        request_id,
2199                        params,
2200                        budget,
2201                        session.state().clone(),
2202                        dispatch_auth.clone(),
2203                        Some(notification_sender),
2204                        bidirectional_senders.as_ref(),
2205                    )?;
2206                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2207                }
2208                "resources/list" => {
2209                    let params: ListResourcesParams = parse_params_or_default(params)?;
2210                    let result =
2211                        self.router
2212                            .handle_resources_list(cx, params, Some(session.state()))?;
2213                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2214                }
2215                "resources/templates/list" => {
2216                    let params: ListResourceTemplatesParams = parse_params_or_default(params)?;
2217                    let result = self.router.handle_resource_templates_list(
2218                        cx,
2219                        params,
2220                        Some(session.state()),
2221                    )?;
2222                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2223                }
2224                "resources/read" => {
2225                    let params: ReadResourceParams = parse_params(params)?;
2226                    let result = self.router.handle_resources_read(
2227                        cx,
2228                        request_id,
2229                        &params,
2230                        budget,
2231                        session.state().clone(),
2232                        dispatch_auth.clone(),
2233                        Some(notification_sender),
2234                        bidirectional_senders.as_ref(),
2235                    )?;
2236                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2237                }
2238                "resources/subscribe" => {
2239                    let params: SubscribeResourceParams = parse_params(params)?;
2240                    if !self.router.resource_exists(&params.uri) {
2241                        return Err(McpError::resource_not_found(&params.uri));
2242                    }
2243                    session.subscribe_resource(params.uri);
2244                    Ok(serde_json::json!({}))
2245                }
2246                "resources/unsubscribe" => {
2247                    let params: UnsubscribeResourceParams = parse_params(params)?;
2248                    session.unsubscribe_resource(&params.uri);
2249                    Ok(serde_json::json!({}))
2250                }
2251                "prompts/list" => {
2252                    let params: ListPromptsParams = parse_params_or_default(params)?;
2253                    let result =
2254                        self.router
2255                            .handle_prompts_list(cx, params, Some(session.state()))?;
2256                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2257                }
2258                "prompts/get" => {
2259                    let params: GetPromptParams = parse_params(params)?;
2260                    let result = self.router.handle_prompts_get(
2261                        cx,
2262                        request_id,
2263                        params,
2264                        budget,
2265                        session.state().clone(),
2266                        dispatch_auth.clone(),
2267                        Some(notification_sender),
2268                        bidirectional_senders.as_ref(),
2269                    )?;
2270                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2271                }
2272                "ping" => {
2273                    // Simple ping-pong for health checks
2274                    Ok(serde_json::json!({}))
2275                }
2276                // Task methods (Docket/SEP-1686)
2277                "tasks/list" => {
2278                    let params: ListTasksParams = parse_params_or_default(params)?;
2279                    let result =
2280                        self.router
2281                            .handle_tasks_list(cx, params, self.task_manager.as_ref())?;
2282                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2283                }
2284                "tasks/get" => {
2285                    let params: GetTaskParams = parse_params(params)?;
2286                    let result =
2287                        self.router
2288                            .handle_tasks_get(cx, params, self.task_manager.as_ref())?;
2289                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2290                }
2291                "tasks/cancel" => {
2292                    let params: CancelTaskParams = parse_params(params)?;
2293                    let result =
2294                        self.router
2295                            .handle_tasks_cancel(cx, params, self.task_manager.as_ref())?;
2296                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2297                }
2298                "tasks/submit" => {
2299                    let params: SubmitTaskParams = parse_params(params)?;
2300                    let result =
2301                        self.router
2302                            .handle_tasks_submit(cx, params, self.task_manager.as_ref())?;
2303                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2304                }
2305                _ => Err(McpError::method_not_found(method)),
2306            }
2307        })();
2308
2309        let final_result = match result {
2310            Ok(v) => self.apply_middleware_response(&entered_middleware, &mw_ctx, &request, v),
2311            Err(e) => Err(self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e)),
2312        };
2313
2314        self.maybe_emit_log_notification(
2315            session,
2316            notification_sender,
2317            &request.method,
2318            &final_result,
2319        );
2320
2321        final_result
2322    }
2323
2324    fn dispatch_read_only_http_method(
2325        &self,
2326        cx: &Cx,
2327        session: &SessionView,
2328        request: JsonRpcRequest,
2329        request_id: u64,
2330        budget: &Budget,
2331        notification_sender: &NotificationSender,
2332        request_sender: &bidirectional::RequestSender,
2333    ) -> Result<serde_json::Value, McpError> {
2334        if cx.is_cancel_requested() {
2335            return Err(McpError::request_cancelled());
2336        }
2337
2338        if budget.is_exhausted() {
2339            return Err(McpError::new(
2340                McpErrorCode::RequestCancelled,
2341                "Request budget exhausted",
2342            ));
2343        }
2344
2345        if budget.is_past_deadline(wall_now()) {
2346            cx.cancel_fast(CancelKind::Deadline);
2347            return Err(McpError::new(
2348                McpErrorCode::RequestCancelled,
2349                "Request timeout exceeded",
2350            ));
2351        }
2352
2353        if !session.initialized && request.method != "initialize" && request.method != "ping" {
2354            return Err(McpError::invalid_request(
2355                "Server not initialized. Client must send 'initialize' first.",
2356            ));
2357        }
2358
2359        if let Some(task_manager) = &self.task_manager {
2360            task_manager.set_notification_sender(Arc::clone(notification_sender));
2361        }
2362
2363        let mut mw_ctx = McpContext::with_state(cx.clone(), request_id, session.state.clone());
2364        let request_auth = if self.should_authenticate(&request.method) {
2365            let auth_request = AuthRequest {
2366                method: &request.method,
2367                params: request.params.as_ref(),
2368                request_id,
2369            };
2370            match self.authenticate_request_with_state(cx, request_id, &session.state, auth_request)
2371            {
2372                Ok(auth) => Some(auth),
2373                Err(err) => {
2374                    let err = self.apply_global_middleware_error(&mw_ctx, &request, err);
2375                    let result = Err(err);
2376                    self.maybe_emit_log_notification_for_level(
2377                        session.log_level,
2378                        notification_sender,
2379                        &request.method,
2380                        &result,
2381                    );
2382                    return result;
2383                }
2384            }
2385        } else {
2386            None
2387        };
2388        if let Some(auth) = request_auth.clone() {
2389            mw_ctx = mw_ctx.with_auth(auth);
2390        }
2391
2392        let mut entered_middleware: Vec<&dyn crate::Middleware> = Vec::new();
2393
2394        for m in self.middleware.iter() {
2395            entered_middleware.push(m.as_ref());
2396            match m.on_request(&mw_ctx, &request) {
2397                Ok(crate::MiddlewareDecision::Continue) => {}
2398                Ok(crate::MiddlewareDecision::Respond(v)) => {
2399                    let result =
2400                        self.apply_middleware_response(&entered_middleware, &mw_ctx, &request, v);
2401                    self.maybe_emit_log_notification_for_level(
2402                        session.log_level,
2403                        notification_sender,
2404                        &request.method,
2405                        &result,
2406                    );
2407                    return result;
2408                }
2409                Err(e) => {
2410                    let err =
2411                        self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e);
2412                    let result = Err(err);
2413                    self.maybe_emit_log_notification_for_level(
2414                        session.log_level,
2415                        notification_sender,
2416                        &request.method,
2417                        &result,
2418                    );
2419                    return result;
2420                }
2421            }
2422        }
2423
2424        let dispatch_auth = mw_ctx.auth();
2425
2426        let result: Result<serde_json::Value, McpError> = (|| {
2427            let method = &request.method;
2428            let params = request.params.clone();
2429            let bidirectional_senders =
2430                self.create_bidirectional_senders_from_view(session, request_sender);
2431
2432            match method.as_str() {
2433                "tools/call" => {
2434                    let params: CallToolParams = parse_params(params)?;
2435                    let result = self.router.handle_tools_call(
2436                        cx,
2437                        request_id,
2438                        params,
2439                        budget,
2440                        session.state.clone(),
2441                        dispatch_auth.clone(),
2442                        Some(notification_sender),
2443                        bidirectional_senders.as_ref(),
2444                    )?;
2445                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2446                }
2447                "resources/read" => {
2448                    let params: ReadResourceParams = parse_params(params)?;
2449                    let result = self.router.handle_resources_read(
2450                        cx,
2451                        request_id,
2452                        &params,
2453                        budget,
2454                        session.state.clone(),
2455                        dispatch_auth.clone(),
2456                        Some(notification_sender),
2457                        bidirectional_senders.as_ref(),
2458                    )?;
2459                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2460                }
2461                "prompts/get" => {
2462                    let params: GetPromptParams = parse_params(params)?;
2463                    let result = self.router.handle_prompts_get(
2464                        cx,
2465                        request_id,
2466                        params,
2467                        budget,
2468                        session.state.clone(),
2469                        dispatch_auth.clone(),
2470                        Some(notification_sender),
2471                        bidirectional_senders.as_ref(),
2472                    )?;
2473                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
2474                }
2475                _ => Err(McpError::method_not_found(method)),
2476            }
2477        })();
2478
2479        let final_result = match result {
2480            Ok(v) => self.apply_middleware_response(&entered_middleware, &mw_ctx, &request, v),
2481            Err(e) => Err(self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e)),
2482        };
2483
2484        self.maybe_emit_log_notification_for_level(
2485            session.log_level,
2486            notification_sender,
2487            &request.method,
2488            &final_result,
2489        );
2490
2491        final_result
2492    }
2493
2494    fn apply_middleware_response(
2495        &self,
2496        stack: &[&dyn crate::Middleware],
2497        ctx: &McpContext,
2498        request: &JsonRpcRequest,
2499        value: serde_json::Value,
2500    ) -> Result<serde_json::Value, McpError> {
2501        let mut response = value;
2502        for m in stack.iter().rev() {
2503            match m.on_response(ctx, request, response) {
2504                Ok(next) => response = next,
2505                Err(err) => {
2506                    let mapped = self.apply_middleware_error(stack, ctx, request, err);
2507                    return Err(mapped);
2508                }
2509            }
2510        }
2511        Ok(response)
2512    }
2513
2514    fn apply_middleware_error(
2515        &self,
2516        stack: &[&dyn crate::Middleware],
2517        ctx: &McpContext,
2518        request: &JsonRpcRequest,
2519        error: McpError,
2520    ) -> McpError {
2521        let mut err = error;
2522        for m in stack.iter().rev() {
2523            err = m.on_error(ctx, request, err);
2524        }
2525        err
2526    }
2527
2528    fn apply_global_middleware_error(
2529        &self,
2530        ctx: &McpContext,
2531        request: &JsonRpcRequest,
2532        error: McpError,
2533    ) -> McpError {
2534        let mut err = error;
2535        for m in self.middleware.iter().rev() {
2536            err = m.on_error(ctx, request, err);
2537        }
2538        err
2539    }
2540
2541    /// Creates bidirectional senders based on client capabilities.
2542    ///
2543    /// Returns `Some(BidirectionalSenders)` if the client supports any bidirectional
2544    /// features (sampling, elicitation), or `None` if no features are supported.
2545    fn create_bidirectional_senders(
2546        &self,
2547        session: &Session,
2548        request_sender: &bidirectional::RequestSender,
2549    ) -> Option<handler::BidirectionalSenders> {
2550        self.create_bidirectional_senders_from_capabilities(
2551            session.supports_sampling(),
2552            session.supports_elicitation(),
2553            request_sender,
2554        )
2555    }
2556
2557    fn create_bidirectional_senders_from_view(
2558        &self,
2559        session: &SessionView,
2560        request_sender: &bidirectional::RequestSender,
2561    ) -> Option<handler::BidirectionalSenders> {
2562        self.create_bidirectional_senders_from_capabilities(
2563            session.supports_sampling,
2564            session.supports_elicitation,
2565            request_sender,
2566        )
2567    }
2568
2569    fn create_bidirectional_senders_from_capabilities(
2570        &self,
2571        supports_sampling: bool,
2572        supports_elicitation: bool,
2573        request_sender: &bidirectional::RequestSender,
2574    ) -> Option<handler::BidirectionalSenders> {
2575        if !supports_sampling && !supports_elicitation {
2576            return None;
2577        }
2578
2579        let mut senders = handler::BidirectionalSenders::new();
2580
2581        if supports_sampling {
2582            let sampling_sender: Arc<dyn fastmcp_core::SamplingSender> = Arc::new(
2583                bidirectional::TransportSamplingSender::new(request_sender.clone()),
2584            );
2585            senders = senders.with_sampling(sampling_sender);
2586        }
2587
2588        if supports_elicitation {
2589            let elicitation_sender: Arc<dyn fastmcp_core::ElicitationSender> = Arc::new(
2590                bidirectional::TransportElicitationSender::new(request_sender.clone()),
2591            );
2592            senders = senders.with_elicitation(elicitation_sender);
2593        }
2594
2595        Some(senders)
2596    }
2597
2598    fn should_authenticate(&self, method: &str) -> bool {
2599        !matches!(
2600            method,
2601            "initialize" | "initialized" | "notifications/cancelled" | "ping"
2602        )
2603    }
2604
2605    fn authenticate_request(
2606        &self,
2607        cx: &Cx,
2608        request_id: u64,
2609        session: &Session,
2610        request: AuthRequest<'_>,
2611    ) -> Result<AuthContext, McpError> {
2612        let Some(provider) = &self.auth_provider else {
2613            return Ok(AuthContext::anonymous());
2614        };
2615
2616        let ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
2617        let auth = provider.authenticate(&ctx, request)?;
2618        if !ctx.set_auth(auth.clone()) {
2619            debug!(
2620                target: targets::SESSION,
2621                "Auth context not stored (session state unavailable)"
2622            );
2623        }
2624        Ok(auth)
2625    }
2626
2627    fn authenticate_request_with_state(
2628        &self,
2629        cx: &Cx,
2630        request_id: u64,
2631        session_state: &SessionState,
2632        request: AuthRequest<'_>,
2633    ) -> Result<AuthContext, McpError> {
2634        let Some(provider) = &self.auth_provider else {
2635            return Ok(AuthContext::anonymous());
2636        };
2637
2638        let ctx = McpContext::with_state(cx.clone(), request_id, session_state.clone());
2639        let auth = provider.authenticate(&ctx, request)?;
2640        if !ctx.set_auth(auth.clone()) {
2641            debug!(
2642                target: targets::SESSION,
2643                "Auth context not stored (session state unavailable)"
2644            );
2645        }
2646        Ok(auth)
2647    }
2648
2649    fn handle_cancelled_notification(&self, params: CancelledParams) {
2650        let reason = params.reason.as_deref().unwrap_or("unspecified");
2651        let await_cleanup = params.await_cleanup.unwrap_or(false);
2652        info!(
2653            target: targets::SESSION,
2654            "Cancellation requested for requestId={} (reason: {}, await_cleanup={})",
2655            params.request_id,
2656            reason,
2657            await_cleanup
2658        );
2659        let active = {
2660            let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
2661                error!(target: targets::SERVER, "active_requests lock poisoned, recovering");
2662                poisoned.into_inner()
2663            });
2664            guard
2665                .get(&params.request_id)
2666                .map(|entry| (entry.cx.clone(), entry.region_id, entry.completion.clone()))
2667        };
2668        if let Some((cx, region_id, completion)) = active {
2669            cx.cancel_with(CancelKind::User, None);
2670            if await_cleanup {
2671                let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
2672                if !completed {
2673                    fastmcp_core::logging::warn!(
2674                        target: targets::SESSION,
2675                        "await_cleanup timed out for requestId={} (region={:?})",
2676                        params.request_id,
2677                        region_id
2678                    );
2679                }
2680            }
2681        } else {
2682            fastmcp_core::logging::warn!(
2683                target: targets::SESSION,
2684                "No active request found for cancellation requestId={}",
2685                params.request_id
2686            );
2687        }
2688    }
2689
2690    fn cancel_active_requests(&self, kind: CancelKind, await_cleanup: bool) {
2691        let active: Vec<(RequestId, RegionId, Cx, Arc<RequestCompletion>)> = {
2692            let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
2693                error!(target: targets::SERVER, "active_requests lock poisoned in cancel_active_requests, recovering");
2694                poisoned.into_inner()
2695            });
2696            guard
2697                .iter()
2698                .map(|(request_id, entry)| {
2699                    (
2700                        request_id.clone(),
2701                        entry.region_id,
2702                        entry.cx.clone(),
2703                        entry.completion.clone(),
2704                    )
2705                })
2706                .collect()
2707        };
2708        if active.is_empty() {
2709            return;
2710        }
2711        info!(
2712            target: targets::SESSION,
2713            "Cancelling {} active request(s) (kind={:?}, await_cleanup={})",
2714            active.len(),
2715            kind,
2716            await_cleanup
2717        );
2718        for (_, _, cx, _) in &active {
2719            cx.cancel_with(kind, None);
2720        }
2721
2722        if await_cleanup {
2723            for (request_id, region_id, _cx, completion) in active {
2724                let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
2725                if !completed {
2726                    fastmcp_core::logging::warn!(
2727                        target: targets::SESSION,
2728                        "Shutdown cancel timed out for requestId={} (region={:?})",
2729                        request_id,
2730                        region_id
2731                    );
2732                }
2733            }
2734        }
2735    }
2736
2737    fn handle_set_log_level(&self, session: &mut Session, params: SetLogLevelParams) {
2738        let requested = match params.level {
2739            LogLevel::Debug => LevelFilter::Debug,
2740            LogLevel::Info => LevelFilter::Info,
2741            LogLevel::Warning => LevelFilter::Warn,
2742            LogLevel::Error => LevelFilter::Error,
2743        };
2744
2745        let configured = self.logging.level.to_level_filter();
2746        let effective = if requested > configured {
2747            configured
2748        } else {
2749            requested
2750        };
2751
2752        log::set_max_level(effective);
2753
2754        let effective_level = match effective {
2755            LevelFilter::Debug => LogLevel::Debug,
2756            LevelFilter::Info => LogLevel::Info,
2757            LevelFilter::Warn => LogLevel::Warning,
2758            LevelFilter::Error => LogLevel::Error,
2759            _ => LogLevel::Info,
2760        };
2761        session.set_log_level(effective_level);
2762
2763        if effective != requested {
2764            fastmcp_core::logging::warn!(
2765                target: targets::SESSION,
2766                "Client requested log level {:?}; clamped to server level {:?}",
2767                params.level,
2768                effective
2769            );
2770        } else {
2771            info!(
2772                target: targets::SESSION,
2773                "Log level set to {:?}",
2774                params.level
2775            );
2776        }
2777    }
2778
2779    fn log_level_rank(level: LogLevel) -> u8 {
2780        match level {
2781            LogLevel::Debug => 1,
2782            LogLevel::Info => 2,
2783            LogLevel::Warning => 3,
2784            LogLevel::Error => 4,
2785        }
2786    }
2787
2788    fn emit_log_notification_for_level(
2789        &self,
2790        min_level: Option<LogLevel>,
2791        sender: &NotificationSender,
2792        level: LogLevel,
2793        message: impl Into<String>,
2794    ) {
2795        let Some(min_level) = min_level else {
2796            return;
2797        };
2798        if Self::log_level_rank(level) < Self::log_level_rank(min_level) {
2799            return;
2800        }
2801
2802        let ts = chrono::Utc::now().to_rfc3339();
2803        let text = format!("{ts} {}", message.into());
2804        let params = LogMessageParams {
2805            level,
2806            logger: Some("fastmcp_rust::server".to_string()),
2807            data: serde_json::Value::String(text),
2808        };
2809        let payload = match serde_json::to_value(params) {
2810            Ok(value) => value,
2811            Err(err) => {
2812                fastmcp_core::logging::warn!(
2813                    target: targets::SESSION,
2814                    "Failed to serialize log message notification: {}",
2815                    err
2816                );
2817                return;
2818            }
2819        };
2820        sender(JsonRpcRequest::notification(
2821            "notifications/message",
2822            Some(payload),
2823        ));
2824    }
2825
2826    fn emit_log_notification(
2827        &self,
2828        session: &Session,
2829        sender: &NotificationSender,
2830        level: LogLevel,
2831        message: impl Into<String>,
2832    ) {
2833        self.emit_log_notification_for_level(session.log_level(), sender, level, message);
2834    }
2835
2836    fn maybe_emit_log_notification_for_level(
2837        &self,
2838        min_level: Option<LogLevel>,
2839        sender: &NotificationSender,
2840        method: &str,
2841        result: &McpResult<serde_json::Value>,
2842    ) {
2843        if method.starts_with("notifications/") || method == "logging/setLevel" {
2844            return;
2845        }
2846        let level = if result.is_ok() {
2847            LogLevel::Info
2848        } else {
2849            LogLevel::Error
2850        };
2851        let message = if result.is_ok() {
2852            format!("Handled {}", method)
2853        } else {
2854            format!("Error handling {}", method)
2855        };
2856        self.emit_log_notification_for_level(min_level, sender, level, message);
2857    }
2858
2859    fn maybe_emit_log_notification(
2860        &self,
2861        session: &Session,
2862        sender: &NotificationSender,
2863        method: &str,
2864        result: &McpResult<serde_json::Value>,
2865    ) {
2866        if method.starts_with("notifications/") || method == "logging/setLevel" {
2867            return;
2868        }
2869        let level = if result.is_ok() {
2870            LogLevel::Info
2871        } else {
2872            LogLevel::Error
2873        };
2874        let message = if result.is_ok() {
2875            format!("Handled {}", method)
2876        } else {
2877            format!("Error handling {}", method)
2878        };
2879        self.emit_log_notification(session, sender, level, message);
2880    }
2881}
2882
2883const AWAIT_CLEANUP_TIMEOUT: Duration = Duration::from_secs(5);
2884
2885struct RequestCompletion {
2886    done: Mutex<bool>,
2887    cv: Condvar,
2888}
2889
2890impl RequestCompletion {
2891    fn new() -> Self {
2892        Self {
2893            done: Mutex::new(false),
2894            cv: Condvar::new(),
2895        }
2896    }
2897
2898    fn mark_done(&self) {
2899        let mut done = self
2900            .done
2901            .lock()
2902            .unwrap_or_else(std::sync::PoisonError::into_inner);
2903        if !*done {
2904            *done = true;
2905            self.cv.notify_all();
2906        }
2907    }
2908
2909    fn wait_timeout(&self, timeout: Duration) -> bool {
2910        let mut done = self
2911            .done
2912            .lock()
2913            .unwrap_or_else(std::sync::PoisonError::into_inner);
2914        if *done {
2915            return true;
2916        }
2917        let start = Instant::now();
2918        let mut remaining = timeout;
2919        loop {
2920            let (guard, result) = self
2921                .cv
2922                .wait_timeout(done, remaining)
2923                .unwrap_or_else(std::sync::PoisonError::into_inner);
2924            done = guard;
2925            if *done {
2926                return true;
2927            }
2928            if result.timed_out() {
2929                return false;
2930            }
2931            let elapsed = start.elapsed();
2932            remaining = match timeout.checked_sub(elapsed) {
2933                Some(left) if !left.is_zero() => left,
2934                _ => return false,
2935            };
2936        }
2937    }
2938
2939    fn is_done(&self) -> bool {
2940        let done = self
2941            .done
2942            .lock()
2943            .unwrap_or_else(std::sync::PoisonError::into_inner);
2944        *done
2945    }
2946}
2947
2948struct ActiveRequest {
2949    cx: Cx,
2950    region_id: RegionId,
2951    completion: Arc<RequestCompletion>,
2952}
2953
2954impl ActiveRequest {
2955    fn new(cx: Cx, completion: Arc<RequestCompletion>) -> Self {
2956        let region_id = cx.region_id();
2957        Self {
2958            cx,
2959            region_id,
2960            completion,
2961        }
2962    }
2963}
2964
2965struct ActiveRequestGuard<'a> {
2966    map: &'a Mutex<HashMap<RequestId, ActiveRequest>>,
2967    id: RequestId,
2968    completion: Arc<RequestCompletion>,
2969}
2970
2971impl<'a> ActiveRequestGuard<'a> {
2972    fn try_new(
2973        map: &'a Mutex<HashMap<RequestId, ActiveRequest>>,
2974        id: RequestId,
2975        cx: Cx,
2976    ) -> Result<Self, RequestId> {
2977        let completion = Arc::new(RequestCompletion::new());
2978        let entry = ActiveRequest::new(cx, completion.clone());
2979        let mut guard = map
2980            .lock()
2981            .unwrap_or_else(std::sync::PoisonError::into_inner);
2982        if guard.contains_key(&id) {
2983            fastmcp_core::logging::warn!(
2984                target: targets::SESSION,
2985                "Duplicate active requestId={} rejected while an earlier request is still running",
2986                id
2987            );
2988            return Err(id);
2989        }
2990        guard.insert(id.clone(), entry);
2991        Ok(Self {
2992            map,
2993            id,
2994            completion,
2995        })
2996    }
2997}
2998
2999impl Drop for ActiveRequestGuard<'_> {
3000    fn drop(&mut self) {
3001        {
3002            let mut guard = self
3003                .map
3004                .lock()
3005                .unwrap_or_else(std::sync::PoisonError::into_inner);
3006            match guard.get(&self.id) {
3007                Some(entry) if Arc::ptr_eq(&entry.completion, &self.completion) => {
3008                    guard.remove(&self.id);
3009                }
3010                Some(_) => {
3011                    fastmcp_core::logging::warn!(
3012                        target: targets::SESSION,
3013                        "Active request replaced before drop for requestId={}",
3014                        self.id
3015                    );
3016                }
3017                None => {
3018                    fastmcp_core::logging::warn!(
3019                        target: targets::SESSION,
3020                        "Active request missing on drop for requestId={}",
3021                        self.id
3022                    );
3023                }
3024            }
3025        }
3026        self.completion.mark_done();
3027    }
3028}
3029
3030/// Checks if banner should be suppressed via environment variable.
3031///
3032/// This is a legacy check. Prefer using `ConsoleConfig` for banner control.
3033fn banner_suppressed() -> bool {
3034    std::env::var("FASTMCP_NO_BANNER")
3035        .map(|value| matches!(value.to_lowercase().as_str(), "1" | "true" | "yes"))
3036        .unwrap_or(false)
3037}
3038
3039/// Parses required parameters from JSON.
3040fn parse_params<T: serde::de::DeserializeOwned>(
3041    params: Option<serde_json::Value>,
3042) -> Result<T, McpError> {
3043    let value = params.ok_or_else(|| McpError::invalid_params("Missing required parameters"))?;
3044    serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
3045}
3046
3047/// Parses optional parameters from JSON, using default if not provided.
3048fn parse_params_or_default<T: serde::de::DeserializeOwned + Default>(
3049    params: Option<serde_json::Value>,
3050) -> Result<T, McpError> {
3051    match params {
3052        Some(value) => {
3053            serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
3054        }
3055        None => Ok(T::default()),
3056    }
3057}
3058
3059/// Converts a JSON-RPC RequestId to a u64 for internal tracking.
3060///
3061/// If the ID is a number, uses that number. If it's a string or absent,
3062/// uses a stable hash (string) or 0 (absent) as a fallback.
3063fn request_id_to_u64(id: Option<&RequestId>) -> u64 {
3064    match id {
3065        Some(RequestId::Number(n)) => *n as u64,
3066        Some(RequestId::String(s)) => stable_hash_request_id(s),
3067        None => 0,
3068    }
3069}
3070
3071fn stable_hash_request_id(value: &str) -> u64 {
3072    const FNV_OFFSET: u64 = 0xcbf29ce484222325;
3073    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
3074    let mut hash = FNV_OFFSET;
3075    for byte in value.as_bytes() {
3076        hash ^= u64::from(*byte);
3077        hash = hash.wrapping_mul(FNV_PRIME);
3078    }
3079    if hash == 0 { FNV_OFFSET } else { hash }
3080}
3081
3082struct SharedTransport<T> {
3083    inner: Arc<Mutex<T>>,
3084}
3085
3086impl<T> Clone for SharedTransport<T> {
3087    fn clone(&self) -> Self {
3088        Self {
3089            inner: Arc::clone(&self.inner),
3090        }
3091    }
3092}
3093
3094impl<T: Transport> SharedTransport<T> {
3095    fn new(transport: T) -> Self {
3096        Self {
3097            inner: Arc::new(Mutex::new(transport)),
3098        }
3099    }
3100
3101    fn recv(&self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
3102        let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
3103        guard.recv(cx)
3104    }
3105
3106    fn send(&self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
3107        let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
3108        guard.send(cx, message)
3109    }
3110}
3111
3112fn transport_lock_error() -> TransportError {
3113    TransportError::Io(std::io::Error::other("transport lock poisoned"))
3114}
3115
3116fn create_transport_notification_sender<T>(transport: SharedTransport<T>) -> NotificationSender
3117where
3118    T: Transport + Send + 'static,
3119{
3120    let cx = Cx::for_request();
3121
3122    Arc::new(move |request: JsonRpcRequest| {
3123        let message = JsonRpcMessage::Request(request);
3124        if let Err(e) = transport.send(&cx, &message) {
3125            log::error!(
3126                target: targets::TRANSPORT,
3127                "Failed to send notification: {}",
3128                e
3129            );
3130        }
3131    })
3132}
3133
3134/// Creates a notification sender that writes JSON-RPC notifications to stdout.
3135///
3136/// This creates a separate stdout handle for sending notifications, allowing
3137/// notifications (like progress updates) to be sent during handler execution
3138/// independently of the main transport.
3139///
3140/// The sender uses NDJSON format (newline-delimited JSON) to match the
3141/// standard MCP transport format.
3142fn create_notification_sender() -> NotificationSender {
3143    use std::sync::Mutex;
3144
3145    // Use AsyncStdout so notifications share the global stdout lock used by
3146    // the transport writer, preventing interleaved NDJSON writes.
3147    let stdout = Mutex::new(AsyncStdout::new());
3148    let codec = Codec::new();
3149
3150    Arc::new(move |request: JsonRpcRequest| {
3151        let bytes = match codec.encode_request(&request) {
3152            Ok(b) => b,
3153            Err(e) => {
3154                log::error!(target: targets::SERVER, "Failed to encode notification: {}", e);
3155                return;
3156            }
3157        };
3158
3159        if let Ok(mut stdout) = stdout.lock() {
3160            if let Err(e) = stdout.write_all_unchecked(&bytes) {
3161                log::error!(target: targets::TRANSPORT, "Failed to send notification: {}", e);
3162            }
3163            if let Err(e) = stdout.flush_unchecked() {
3164                log::error!(target: targets::TRANSPORT, "Failed to flush notification: {}", e);
3165            }
3166        } else {
3167            log::warn!(target: targets::SERVER, "Failed to acquire stdout lock for notification");
3168        }
3169    })
3170}
3171
3172#[cfg(test)]
3173mod lib_unit_tests {
3174    use super::*;
3175    use fastmcp_derive::tool;
3176    use fastmcp_protocol::{CallToolResult, Content};
3177    use std::sync::OnceLock;
3178    use std::sync::atomic::{AtomicUsize, Ordering};
3179    use std::thread;
3180    use std::time::Duration;
3181
3182    #[derive(Debug, Default)]
3183    struct HttpOverlapMetrics {
3184        current: AtomicUsize,
3185        max: AtomicUsize,
3186    }
3187
3188    static HTTP_OVERLAP_METRICS: OnceLock<HttpOverlapMetrics> = OnceLock::new();
3189    static HTTP_OVERLAP_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
3190
3191    fn http_overlap_metrics() -> &'static HttpOverlapMetrics {
3192        HTTP_OVERLAP_METRICS.get_or_init(HttpOverlapMetrics::default)
3193    }
3194
3195    fn http_overlap_lock() -> &'static Mutex<()> {
3196        HTTP_OVERLAP_LOCK.get_or_init(|| Mutex::new(()))
3197    }
3198
3199    fn reset_http_overlap_metrics() {
3200        let metrics = http_overlap_metrics();
3201        metrics.current.store(0, Ordering::SeqCst);
3202        metrics.max.store(0, Ordering::SeqCst);
3203    }
3204
3205    fn test_request_sender() -> RequestSender {
3206        let pending = Arc::new(PendingRequests::new());
3207        let send_fn: bidirectional::TransportSendFn =
3208            Arc::new(|message| Err(format!("unexpected outbound message in test: {message:?}")));
3209        RequestSender::new(pending, send_fn)
3210    }
3211
3212    fn http_json_request(method: &str, params: serde_json::Value, id: i64) -> HttpRequest {
3213        let request = JsonRpcRequest::new(method, Some(params), id);
3214        HttpRequest::new(HttpMethod::Post, "/mcp")
3215            .with_header("content-type", "application/json")
3216            .with_body(serde_json::to_vec(&request).expect("serialize JSON-RPC request"))
3217    }
3218
3219    #[tool(
3220        name = "http_overlap_tool",
3221        description = "Records concurrent overlap for HTTP tests",
3222        annotations(read_only)
3223    )]
3224    fn http_overlap_tool(_ctx: &McpContext) -> String {
3225        let metrics = http_overlap_metrics();
3226        let current = metrics.current.fetch_add(1, Ordering::SeqCst) + 1;
3227        metrics.max.fetch_max(current, Ordering::SeqCst);
3228        thread::sleep(Duration::from_millis(100));
3229        metrics.current.fetch_sub(1, Ordering::SeqCst);
3230        "overlap-ok".to_string()
3231    }
3232
3233    #[tool(
3234        name = "http_auth_echo_tool_runtime",
3235        description = "Returns the request-scoped auth subject while recording overlap",
3236        annotations(read_only)
3237    )]
3238    fn http_auth_echo_tool_runtime(ctx: &McpContext) -> String {
3239        let metrics = http_overlap_metrics();
3240        let current = metrics.current.fetch_add(1, Ordering::SeqCst) + 1;
3241        metrics.max.fetch_max(current, Ordering::SeqCst);
3242        thread::sleep(Duration::from_millis(100));
3243        metrics.current.fetch_sub(1, Ordering::SeqCst);
3244        ctx.auth()
3245            .and_then(|auth| auth.subject)
3246            .unwrap_or_else(|| "anonymous".to_string())
3247    }
3248
3249    #[tool(
3250        name = "http_stateful_increment_tool",
3251        description = "Increments a session counter across HTTP requests"
3252    )]
3253    fn http_stateful_increment_tool(ctx: &McpContext) -> String {
3254        let count: i32 = ctx.get_state("http_counter").unwrap_or(0);
3255        let next = count + 1;
3256        assert!(ctx.set_state("http_counter", next));
3257        format!("Counter: {next}")
3258    }
3259
3260    #[tool(
3261        name = "http_current_auth_subject_tool",
3262        description = "Returns the current request auth subject",
3263        annotations(read_only)
3264    )]
3265    fn http_current_auth_subject_tool(ctx: &McpContext) -> String {
3266        ctx.auth()
3267            .and_then(|auth| auth.subject)
3268            .unwrap_or_else(|| "anonymous".to_string())
3269    }
3270
3271    #[tool(
3272        name = "http_current_auth_subject_exclusive_tool",
3273        description = "Returns the current request auth subject from the exclusive path"
3274    )]
3275    fn http_current_auth_subject_exclusive_tool(ctx: &McpContext) -> String {
3276        ctx.auth()
3277            .and_then(|auth| auth.subject)
3278            .unwrap_or_else(|| "anonymous".to_string())
3279    }
3280
3281    #[derive(Debug, Clone)]
3282    struct CapturingAuthMiddleware {
3283        seen: Arc<Mutex<Vec<(String, Option<String>)>>>,
3284    }
3285
3286    impl Middleware for CapturingAuthMiddleware {
3287        fn on_request(
3288            &self,
3289            ctx: &McpContext,
3290            request: &JsonRpcRequest,
3291        ) -> McpResult<MiddlewareDecision> {
3292            self.seen
3293                .lock()
3294                .expect("captured auth middleware mutex should not be poisoned")
3295                .push((
3296                    request.method.clone(),
3297                    ctx.auth().and_then(|auth| auth.subject),
3298                ));
3299            Ok(MiddlewareDecision::Continue)
3300        }
3301    }
3302
3303    #[derive(Debug, Clone)]
3304    struct OverridingAuthMiddleware {
3305        subject: &'static str,
3306    }
3307
3308    impl Middleware for OverridingAuthMiddleware {
3309        fn on_request(
3310            &self,
3311            ctx: &McpContext,
3312            _request: &JsonRpcRequest,
3313        ) -> McpResult<MiddlewareDecision> {
3314            ctx.set_auth(AuthContext::with_subject(self.subject));
3315            Ok(MiddlewareDecision::Continue)
3316        }
3317    }
3318
3319    #[derive(Debug)]
3320    struct AlwaysFailAuthProvider;
3321
3322    impl AuthProvider for AlwaysFailAuthProvider {
3323        fn authenticate(
3324            &self,
3325            _ctx: &McpContext,
3326            _request: AuthRequest<'_>,
3327        ) -> McpResult<AuthContext> {
3328            Err(McpError::invalid_request("auth failed"))
3329        }
3330    }
3331
3332    #[derive(Debug, Clone)]
3333    struct RewritingErrorMiddleware;
3334
3335    impl Middleware for RewritingErrorMiddleware {
3336        fn on_error(
3337            &self,
3338            _ctx: &McpContext,
3339            _request: &JsonRpcRequest,
3340            error: McpError,
3341        ) -> McpError {
3342            McpError::new(error.code, format!("rewritten: {}", error.message))
3343        }
3344    }
3345
3346    // ── parse_params ────────────────────────────────────────────────
3347
3348    #[test]
3349    fn parse_params_none_returns_error() {
3350        let result = parse_params::<serde_json::Value>(None);
3351        let err = result.unwrap_err();
3352        assert!(err.message.contains("Missing required parameters"));
3353    }
3354
3355    #[test]
3356    fn parse_params_invalid_json_returns_error() {
3357        // Pass a string where a struct is expected
3358        let result = parse_params::<ListToolsParams>(Some(serde_json::json!("not_an_object")));
3359        assert!(result.is_err());
3360    }
3361
3362    #[test]
3363    fn parse_params_valid_json_succeeds() {
3364        let result = parse_params::<ReadResourceParams>(Some(serde_json::json!({"uri": "x://y"})));
3365        let params = result.unwrap();
3366        assert_eq!(params.uri, "x://y");
3367    }
3368
3369    // ── parse_params_or_default ─────────────────────────────────────
3370
3371    #[test]
3372    fn parse_params_or_default_none_returns_default() {
3373        let result = parse_params_or_default::<ListToolsParams>(None);
3374        let params = result.unwrap();
3375        assert!(params.cursor.is_none());
3376    }
3377
3378    #[test]
3379    fn parse_params_or_default_invalid_json_returns_error() {
3380        let result =
3381            parse_params_or_default::<ListToolsParams>(Some(serde_json::json!("bad_input")));
3382        assert!(result.is_err());
3383    }
3384
3385    #[test]
3386    fn parse_params_or_default_valid_json_succeeds() {
3387        let result =
3388            parse_params_or_default::<ListToolsParams>(Some(serde_json::json!({"cursor": "abc"})));
3389        let params = result.unwrap();
3390        assert_eq!(params.cursor.as_deref(), Some("abc"));
3391    }
3392
3393    // ── request_id_to_u64 ───────────────────────────────────────────
3394
3395    #[test]
3396    fn request_id_to_u64_number() {
3397        let id = RequestId::Number(42);
3398        assert_eq!(request_id_to_u64(Some(&id)), 42);
3399    }
3400
3401    #[test]
3402    fn request_id_to_u64_string() {
3403        let id = RequestId::String("req-123".to_string());
3404        let result = request_id_to_u64(Some(&id));
3405        assert_ne!(result, 0);
3406    }
3407
3408    #[test]
3409    fn request_id_to_u64_none() {
3410        assert_eq!(request_id_to_u64(None), 0);
3411    }
3412
3413    // ── stable_hash_request_id ──────────────────────────────────────
3414
3415    #[test]
3416    fn stable_hash_is_deterministic() {
3417        let h1 = stable_hash_request_id("test");
3418        let h2 = stable_hash_request_id("test");
3419        assert_eq!(h1, h2);
3420    }
3421
3422    #[test]
3423    fn stable_hash_never_returns_zero() {
3424        // Empty string and various inputs should never produce 0
3425        assert_ne!(stable_hash_request_id(""), 0);
3426        assert_ne!(stable_hash_request_id("a"), 0);
3427    }
3428
3429    #[test]
3430    fn stable_hash_different_inputs_differ() {
3431        let h1 = stable_hash_request_id("alpha");
3432        let h2 = stable_hash_request_id("beta");
3433        assert_ne!(h1, h2);
3434    }
3435
3436    // ── RequestCompletion ───────────────────────────────────────────
3437
3438    #[test]
3439    fn request_completion_new_is_not_done() {
3440        let rc = RequestCompletion::new();
3441        assert!(!rc.is_done());
3442    }
3443
3444    #[test]
3445    fn request_completion_mark_done_sets_done() {
3446        let rc = RequestCompletion::new();
3447        rc.mark_done();
3448        assert!(rc.is_done());
3449    }
3450
3451    #[test]
3452    fn request_completion_mark_done_idempotent() {
3453        let rc = RequestCompletion::new();
3454        rc.mark_done();
3455        rc.mark_done(); // should not panic
3456        assert!(rc.is_done());
3457    }
3458
3459    #[test]
3460    fn request_completion_wait_timeout_returns_true_if_done() {
3461        let rc = RequestCompletion::new();
3462        rc.mark_done();
3463        assert!(rc.wait_timeout(Duration::from_millis(10)));
3464    }
3465
3466    #[test]
3467    fn request_completion_wait_timeout_returns_false_if_not_done() {
3468        let rc = RequestCompletion::new();
3469        assert!(!rc.wait_timeout(Duration::from_millis(10)));
3470    }
3471
3472    // ── DuplicateBehavior ───────────────────────────────────────────
3473
3474    #[test]
3475    fn duplicate_behavior_default_is_warn() {
3476        assert_eq!(DuplicateBehavior::default(), DuplicateBehavior::Warn);
3477    }
3478
3479    #[test]
3480    fn duplicate_behavior_debug_and_clone() {
3481        let b = DuplicateBehavior::Error;
3482        let debug = format!("{:?}", b);
3483        assert!(debug.contains("Error"));
3484        let cloned = b;
3485        assert_eq!(cloned, DuplicateBehavior::Error);
3486    }
3487
3488    #[test]
3489    fn duplicate_behavior_all_variants_are_distinct() {
3490        assert_ne!(DuplicateBehavior::Error, DuplicateBehavior::Warn);
3491        assert_ne!(DuplicateBehavior::Warn, DuplicateBehavior::Replace);
3492        assert_ne!(DuplicateBehavior::Replace, DuplicateBehavior::Ignore);
3493    }
3494
3495    // ── LoggingConfig ───────────────────────────────────────────────
3496
3497    #[test]
3498    fn logging_config_default_values() {
3499        let config = LoggingConfig::default();
3500        assert_eq!(config.level, Level::Info);
3501        assert!(config.timestamps);
3502        assert!(config.targets);
3503        assert!(!config.file_line);
3504    }
3505
3506    // ── LifespanHooks ───────────────────────────────────────────────
3507
3508    #[test]
3509    fn lifespan_hooks_new_has_no_hooks() {
3510        let hooks = LifespanHooks::new();
3511        assert!(hooks.on_startup.is_none());
3512        assert!(hooks.on_shutdown.is_none());
3513    }
3514
3515    // ── log_level_rank ──────────────────────────────────────────────
3516
3517    #[test]
3518    fn log_level_rank_ordering() {
3519        assert!(Server::log_level_rank(LogLevel::Debug) < Server::log_level_rank(LogLevel::Info));
3520        assert!(Server::log_level_rank(LogLevel::Info) < Server::log_level_rank(LogLevel::Warning));
3521        assert!(
3522            Server::log_level_rank(LogLevel::Warning) < Server::log_level_rank(LogLevel::Error)
3523        );
3524    }
3525
3526    // ── ActiveRequestGuard ──────────────────────────────────────────
3527
3528    #[test]
3529    fn active_request_guard_removes_on_drop() {
3530        let map = Mutex::new(HashMap::new());
3531        let cx = Cx::for_testing();
3532        let id = RequestId::Number(1);
3533        {
3534            let _guard = ActiveRequestGuard::try_new(&map, id.clone(), cx).expect("insert guard");
3535            assert_eq!(map.lock().unwrap().len(), 1);
3536        }
3537        // After drop, the entry should be removed
3538        assert_eq!(map.lock().unwrap().len(), 0);
3539    }
3540
3541    #[test]
3542    fn active_request_guard_rejects_duplicate_request_id() {
3543        let map = Mutex::new(HashMap::new());
3544        let first = ActiveRequestGuard::try_new(&map, RequestId::Number(7), Cx::for_testing())
3545            .expect("first request should register");
3546        let duplicate = ActiveRequestGuard::try_new(&map, RequestId::Number(7), Cx::for_testing());
3547        assert!(
3548            duplicate.is_err(),
3549            "duplicate active request id must be rejected"
3550        );
3551        drop(first);
3552        assert!(map.lock().unwrap().is_empty());
3553    }
3554
3555    // =========================================================================
3556    // Additional coverage tests (bd-cd79)
3557    // =========================================================================
3558
3559    #[test]
3560    fn logging_config_debug_and_clone() {
3561        let config = LoggingConfig::default();
3562        let debug = format!("{config:?}");
3563        assert!(debug.contains("LoggingConfig"));
3564        assert!(debug.contains("Info"));
3565
3566        let cloned = config.clone();
3567        assert_eq!(cloned.level, Level::Info);
3568        assert_eq!(cloned.timestamps, config.timestamps);
3569    }
3570
3571    #[test]
3572    fn transport_lock_error_is_io() {
3573        let err = transport_lock_error();
3574        match err {
3575            TransportError::Io(io) => {
3576                assert!(io.to_string().contains("poisoned"));
3577            }
3578            other => panic!("expected Io variant, got: {other:?}"),
3579        }
3580    }
3581
3582    #[test]
3583    fn lifespan_hooks_default_matches_new() {
3584        let default_hooks = LifespanHooks::default();
3585        let new_hooks = LifespanHooks::new();
3586        assert!(default_hooks.on_startup.is_none());
3587        assert!(default_hooks.on_shutdown.is_none());
3588        assert!(new_hooks.on_startup.is_none());
3589        assert!(new_hooks.on_shutdown.is_none());
3590    }
3591
3592    #[test]
3593    fn request_completion_wait_resolves_on_concurrent_done() {
3594        use std::sync::Arc;
3595        use std::thread;
3596
3597        let rc = Arc::new(RequestCompletion::new());
3598        let rc_clone = rc.clone();
3599
3600        let handle = thread::spawn(move || {
3601            thread::sleep(Duration::from_millis(20));
3602            rc_clone.mark_done();
3603        });
3604
3605        // Should resolve within the timeout because the other thread marks done
3606        assert!(rc.wait_timeout(Duration::from_secs(2)));
3607        handle.join().unwrap();
3608    }
3609
3610    #[test]
3611    fn active_request_stores_region_id() {
3612        let cx = Cx::for_testing();
3613        let expected_region = cx.region_id();
3614        let completion = Arc::new(RequestCompletion::new());
3615        let ar = ActiveRequest::new(cx, completion);
3616        assert_eq!(ar.region_id, expected_region);
3617    }
3618
3619    #[test]
3620    fn http_request_execution_mode_classifies_methods() {
3621        let mut router = Router::new();
3622        router.add_tool(HttpOverlapTool);
3623        router.add_tool(HttpStatefulIncrementTool);
3624
3625        assert_eq!(
3626            HttpRequestExecutionMode::for_request(
3627                &router,
3628                &JsonRpcRequest::new(
3629                    "tools/call",
3630                    Some(serde_json::json!({
3631                        "name": "http_overlap_tool",
3632                        "arguments": {}
3633                    })),
3634                    1,
3635                ),
3636            ),
3637            HttpRequestExecutionMode::ConcurrentReadOnly
3638        );
3639        assert_eq!(
3640            HttpRequestExecutionMode::for_request(
3641                &router,
3642                &JsonRpcRequest::new(
3643                    "tools/call",
3644                    Some(serde_json::json!({
3645                        "name": "http_stateful_increment_tool",
3646                        "arguments": {}
3647                    })),
3648                    2,
3649                ),
3650            ),
3651            HttpRequestExecutionMode::ExclusiveSession
3652        );
3653        assert_eq!(
3654            HttpRequestExecutionMode::for_request(
3655                &router,
3656                &JsonRpcRequest::new("resources/read", None, 3),
3657            ),
3658            HttpRequestExecutionMode::ConcurrentReadOnly
3659        );
3660        assert_eq!(
3661            HttpRequestExecutionMode::for_request(
3662                &router,
3663                &JsonRpcRequest::new("prompts/get", None, 4)
3664            ),
3665            HttpRequestExecutionMode::ConcurrentReadOnly
3666        );
3667
3668        assert_eq!(
3669            HttpRequestExecutionMode::for_request(
3670                &router,
3671                &JsonRpcRequest::new("initialize", None, 5)
3672            ),
3673            HttpRequestExecutionMode::ExclusiveSession
3674        );
3675        assert_eq!(
3676            HttpRequestExecutionMode::for_request(
3677                &router,
3678                &JsonRpcRequest::new("logging/setLevel", None, 6),
3679            ),
3680            HttpRequestExecutionMode::ExclusiveSession
3681        );
3682        assert_eq!(
3683            HttpRequestExecutionMode::for_request(
3684                &router,
3685                &JsonRpcRequest::new("resources/subscribe", None, 7),
3686            ),
3687            HttpRequestExecutionMode::ExclusiveSession
3688        );
3689    }
3690
3691    #[test]
3692    fn http_read_only_requests_can_overlap_without_session_mutex_serialization() {
3693        let _guard = http_overlap_lock()
3694            .lock()
3695            .expect("http overlap test lock poisoned");
3696        reset_http_overlap_metrics();
3697
3698        let server = Arc::new(
3699            Server::new("http-test-server", "1.0.0")
3700                .tool(HttpOverlapTool)
3701                .build(),
3702        );
3703        let session = Arc::new(Mutex::new(Session::new(
3704            server.info.clone(),
3705            server.capabilities.clone(),
3706        )));
3707        session.lock().expect("session lock poisoned").initialize(
3708            fastmcp_protocol::ClientInfo {
3709                name: "http-test-client".to_string(),
3710                version: "1.0.0".to_string(),
3711            },
3712            fastmcp_protocol::ClientCapabilities::default(),
3713            "2024-11-05".to_string(),
3714        );
3715
3716        let http_handler = Arc::new(HttpRequestHandler::new());
3717        let notification_sender: NotificationSender = Arc::new(|_| {});
3718        let request_sender = test_request_sender();
3719        let start = Arc::new(std::sync::Barrier::new(3));
3720
3721        let run_request = |id| {
3722            let server = Arc::clone(&server);
3723            let session = Arc::clone(&session);
3724            let http_handler = Arc::clone(&http_handler);
3725            let notification_sender = Arc::clone(&notification_sender);
3726            let request_sender = request_sender.clone();
3727            let start = Arc::clone(&start);
3728            thread::spawn(move || {
3729                start.wait();
3730                let cx = Cx::for_testing();
3731                let request = http_json_request(
3732                    "tools/call",
3733                    serde_json::json!({
3734                        "name": "http_overlap_tool",
3735                        "arguments": {}
3736                    }),
3737                    id,
3738                );
3739                let traffic_renderer: Option<RequestResponseRenderer> = None;
3740                let response = server.handle_http_mcp_request(
3741                    &cx,
3742                    &session,
3743                    &http_handler,
3744                    &request,
3745                    &notification_sender,
3746                    &request_sender,
3747                    &traffic_renderer,
3748                );
3749                assert_eq!(response.status, HttpStatus::OK);
3750                let json: JsonRpcResponse =
3751                    serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
3752                assert!(
3753                    json.error.is_none(),
3754                    "unexpected error response: {:?}",
3755                    json.error
3756                );
3757            })
3758        };
3759
3760        let first = run_request(1);
3761        let second = run_request(2);
3762        start.wait();
3763
3764        first.join().expect("first HTTP request thread panicked");
3765        second.join().expect("second HTTP request thread panicked");
3766
3767        let overlap = http_overlap_metrics().max.load(Ordering::SeqCst);
3768        assert!(
3769            overlap >= 2,
3770            "expected concurrent HTTP tools/call overlap, observed max overlap {overlap}"
3771        );
3772    }
3773
3774    #[test]
3775    fn http_read_only_requests_keep_request_auth_isolated() {
3776        #[derive(Debug)]
3777        struct EchoAuthProvider;
3778
3779        impl AuthProvider for EchoAuthProvider {
3780            fn authenticate(
3781                &self,
3782                _ctx: &McpContext,
3783                request: AuthRequest<'_>,
3784            ) -> McpResult<AuthContext> {
3785                let access = request
3786                    .access_token()
3787                    .ok_or_else(|| McpError::invalid_request("missing auth token"))?;
3788                Ok(AuthContext {
3789                    subject: Some(access.token.clone()),
3790                    token: Some(access),
3791                    ..AuthContext::default()
3792                })
3793            }
3794        }
3795
3796        let guard = http_overlap_lock()
3797            .lock()
3798            .expect("http overlap test lock poisoned");
3799        reset_http_overlap_metrics();
3800
3801        let server = Arc::new(
3802            Server::new("http-auth-test-server", "1.0.0")
3803                .auth_provider(EchoAuthProvider)
3804                .tool(HttpAuthEchoToolRuntime)
3805                .build(),
3806        );
3807        let session = Arc::new(Mutex::new(Session::new(
3808            server.info.clone(),
3809            server.capabilities.clone(),
3810        )));
3811        session.lock().expect("session lock poisoned").initialize(
3812            fastmcp_protocol::ClientInfo {
3813                name: "http-auth-test-client".to_string(),
3814                version: "1.0.0".to_string(),
3815            },
3816            fastmcp_protocol::ClientCapabilities::default(),
3817            "2024-11-05".to_string(),
3818        );
3819
3820        let http_handler = Arc::new(HttpRequestHandler::new());
3821        let notification_sender: NotificationSender = Arc::new(|_| {});
3822        let request_sender = test_request_sender();
3823        let start = Arc::new(std::sync::Barrier::new(3));
3824
3825        let run_request = |id, token: &'static str| {
3826            let server = Arc::clone(&server);
3827            let session = Arc::clone(&session);
3828            let http_handler = Arc::clone(&http_handler);
3829            let notification_sender = Arc::clone(&notification_sender);
3830            let request_sender = request_sender.clone();
3831            let start = Arc::clone(&start);
3832            thread::spawn(move || {
3833                start.wait();
3834                let cx = Cx::for_testing();
3835                let request = http_json_request(
3836                    "tools/call",
3837                    serde_json::json!({
3838                        "name": "http_auth_echo_tool_runtime",
3839                        "arguments": {},
3840                        "auth": format!("Bearer {token}")
3841                    }),
3842                    id,
3843                );
3844                let traffic_renderer: Option<RequestResponseRenderer> = None;
3845                let response = server.handle_http_mcp_request(
3846                    &cx,
3847                    &session,
3848                    &http_handler,
3849                    &request,
3850                    &notification_sender,
3851                    &request_sender,
3852                    &traffic_renderer,
3853                );
3854                assert_eq!(response.status, HttpStatus::OK);
3855                let json: JsonRpcResponse =
3856                    serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
3857                let result = json.result.expect("authenticated request should succeed");
3858                let tool_result: CallToolResult =
3859                    serde_json::from_value(result).expect("parse tool result payload");
3860                assert!(
3861                    !tool_result.is_error,
3862                    "auth echo tool unexpectedly returned an error payload"
3863                );
3864                match tool_result.content.as_slice() {
3865                    [Content::Text { text }] => text.clone(),
3866                    other => panic!("expected single text tool result, got {other:?}"),
3867                }
3868            })
3869        };
3870
3871        let first = run_request(1, "alpha");
3872        let second = run_request(2, "beta");
3873        start.wait();
3874
3875        let first_result = first.join();
3876        let second_result = second.join();
3877        let overlap = http_overlap_metrics().max.load(Ordering::SeqCst);
3878        drop(guard);
3879
3880        let first_subject = first_result.expect("first auth request thread panicked");
3881        let second_subject = second_result.expect("second auth request thread panicked");
3882
3883        assert_eq!(first_subject, "alpha");
3884        assert_eq!(second_subject, "beta");
3885        assert!(
3886            overlap >= 2,
3887            "expected authenticated requests to overlap, observed max overlap {overlap}"
3888        );
3889    }
3890
3891    #[test]
3892    fn http_stateful_tool_calls_preserve_session_state_updates() {
3893        let server = Arc::new(
3894            Server::new("http-state-test-server", "1.0.0")
3895                .tool(HttpStatefulIncrementTool)
3896                .build(),
3897        );
3898        let session = Arc::new(Mutex::new(Session::new(
3899            server.info.clone(),
3900            server.capabilities.clone(),
3901        )));
3902        session.lock().expect("session lock poisoned").initialize(
3903            fastmcp_protocol::ClientInfo {
3904                name: "http-state-test-client".to_string(),
3905                version: "1.0.0".to_string(),
3906            },
3907            fastmcp_protocol::ClientCapabilities::default(),
3908            "2024-11-05".to_string(),
3909        );
3910
3911        let http_handler = Arc::new(HttpRequestHandler::new());
3912        let notification_sender: NotificationSender = Arc::new(|_| {});
3913        let request_sender = test_request_sender();
3914
3915        let run_request = |id| {
3916            let cx = Cx::for_testing();
3917            let request = http_json_request(
3918                "tools/call",
3919                serde_json::json!({
3920                    "name": "http_stateful_increment_tool",
3921                    "arguments": {}
3922                }),
3923                id,
3924            );
3925            let traffic_renderer: Option<RequestResponseRenderer> = None;
3926            let response = server.handle_http_mcp_request(
3927                &cx,
3928                &session,
3929                &http_handler,
3930                &request,
3931                &notification_sender,
3932                &request_sender,
3933                &traffic_renderer,
3934            );
3935            assert_eq!(response.status, HttpStatus::OK);
3936            let json: JsonRpcResponse =
3937                serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
3938            let result = json.result.expect("stateful request should succeed");
3939            let tool_result: CallToolResult =
3940                serde_json::from_value(result).expect("parse tool result payload");
3941            assert!(!tool_result.is_error, "stateful tool unexpectedly errored");
3942            match tool_result.content.as_slice() {
3943                [Content::Text { text }] => text.clone(),
3944                other => panic!("expected single text tool result, got {other:?}"),
3945            }
3946        };
3947
3948        assert_eq!(run_request(1), "Counter: 1");
3949        assert_eq!(run_request(2), "Counter: 2");
3950    }
3951
3952    #[test]
3953    fn http_exclusive_requests_expose_request_auth_to_middleware() {
3954        #[derive(Debug)]
3955        struct EchoAuthProvider;
3956
3957        impl AuthProvider for EchoAuthProvider {
3958            fn authenticate(
3959                &self,
3960                _ctx: &McpContext,
3961                request: AuthRequest<'_>,
3962            ) -> McpResult<AuthContext> {
3963                let access = request
3964                    .access_token()
3965                    .ok_or_else(|| McpError::invalid_request("missing auth token"))?;
3966                Ok(AuthContext {
3967                    subject: Some(access.token.clone()),
3968                    token: Some(access),
3969                    ..AuthContext::default()
3970                })
3971            }
3972        }
3973
3974        let seen = Arc::new(Mutex::new(Vec::new()));
3975        let middleware = CapturingAuthMiddleware {
3976            seen: Arc::clone(&seen),
3977        };
3978        let server = Server::new("http-middleware-auth-test-server", "1.0.0")
3979            .auth_provider(EchoAuthProvider)
3980            .middleware(middleware)
3981            .build();
3982        let session = Arc::new(Mutex::new(Session::new(
3983            server.info.clone(),
3984            server.capabilities.clone(),
3985        )));
3986        session.lock().expect("session lock poisoned").initialize(
3987            fastmcp_protocol::ClientInfo {
3988                name: "http-middleware-client".to_string(),
3989                version: "1.0.0".to_string(),
3990            },
3991            fastmcp_protocol::ClientCapabilities::default(),
3992            "2024-11-05".to_string(),
3993        );
3994
3995        let http_handler = Arc::new(HttpRequestHandler::new());
3996        let notification_sender: NotificationSender = Arc::new(|_| {});
3997        let request_sender = test_request_sender();
3998        let request = http_json_request(
3999            "tools/list",
4000            serde_json::json!({
4001                "auth": "Bearer alpha"
4002            }),
4003            1,
4004        );
4005        let traffic_renderer: Option<RequestResponseRenderer> = None;
4006        let response = server.handle_http_mcp_request(
4007            &Cx::for_testing(),
4008            &session,
4009            &http_handler,
4010            &request,
4011            &notification_sender,
4012            &request_sender,
4013            &traffic_renderer,
4014        );
4015        assert_eq!(response.status, HttpStatus::OK);
4016
4017        let observed = seen
4018            .lock()
4019            .expect("captured auth middleware mutex should not be poisoned")
4020            .clone();
4021        assert_eq!(
4022            observed,
4023            vec![("tools/list".to_string(), Some("alpha".to_string()))]
4024        );
4025    }
4026
4027    #[test]
4028    fn http_read_only_requests_expose_request_auth_to_middleware() {
4029        #[derive(Debug)]
4030        struct EchoAuthProvider;
4031
4032        impl AuthProvider for EchoAuthProvider {
4033            fn authenticate(
4034                &self,
4035                _ctx: &McpContext,
4036                request: AuthRequest<'_>,
4037            ) -> McpResult<AuthContext> {
4038                let access = request
4039                    .access_token()
4040                    .ok_or_else(|| McpError::invalid_request("missing auth token"))?;
4041                Ok(AuthContext {
4042                    subject: Some(access.token.clone()),
4043                    token: Some(access),
4044                    ..AuthContext::default()
4045                })
4046            }
4047        }
4048
4049        let seen = Arc::new(Mutex::new(Vec::new()));
4050        let middleware = CapturingAuthMiddleware {
4051            seen: Arc::clone(&seen),
4052        };
4053        let server = Server::new("http-read-only-middleware-auth-test-server", "1.0.0")
4054            .auth_provider(EchoAuthProvider)
4055            .middleware(middleware)
4056            .tool(HttpCurrentAuthSubjectTool)
4057            .build();
4058        let session = Arc::new(Mutex::new(Session::new(
4059            server.info.clone(),
4060            server.capabilities.clone(),
4061        )));
4062        session.lock().expect("session lock poisoned").initialize(
4063            fastmcp_protocol::ClientInfo {
4064                name: "http-read-only-middleware-client".to_string(),
4065                version: "1.0.0".to_string(),
4066            },
4067            fastmcp_protocol::ClientCapabilities::default(),
4068            "2024-11-05".to_string(),
4069        );
4070
4071        let http_handler = Arc::new(HttpRequestHandler::new());
4072        let notification_sender: NotificationSender = Arc::new(|_| {});
4073        let request_sender = test_request_sender();
4074        let request = http_json_request(
4075            "tools/call",
4076            serde_json::json!({
4077                "name": "http_current_auth_subject_tool",
4078                "arguments": {},
4079                "auth": "Bearer beta"
4080            }),
4081            1,
4082        );
4083        let traffic_renderer: Option<RequestResponseRenderer> = None;
4084        let response = server.handle_http_mcp_request(
4085            &Cx::for_testing(),
4086            &session,
4087            &http_handler,
4088            &request,
4089            &notification_sender,
4090            &request_sender,
4091            &traffic_renderer,
4092        );
4093        assert_eq!(response.status, HttpStatus::OK);
4094
4095        let json: JsonRpcResponse =
4096            serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
4097        let result = json.result.expect("read-only auth request should succeed");
4098        let tool_result: CallToolResult =
4099            serde_json::from_value(result).expect("parse tool result payload");
4100        match tool_result.content.as_slice() {
4101            [Content::Text { text }] => assert_eq!(text, "beta"),
4102            other => panic!("expected single text tool result, got {other:?}"),
4103        }
4104
4105        let observed = seen
4106            .lock()
4107            .expect("captured auth middleware mutex should not be poisoned")
4108            .clone();
4109        assert_eq!(
4110            observed,
4111            vec![("tools/call".to_string(), Some("beta".to_string()))]
4112        );
4113    }
4114
4115    #[test]
4116    fn http_exclusive_middleware_auth_mutation_reaches_handler_dispatch() {
4117        let server = Server::new("http-exclusive-auth-override-test-server", "1.0.0")
4118            .middleware(OverridingAuthMiddleware {
4119                subject: "exclusive-override",
4120            })
4121            .tool(HttpCurrentAuthSubjectExclusiveTool)
4122            .build();
4123        let session = Arc::new(Mutex::new(Session::new(
4124            server.info.clone(),
4125            server.capabilities.clone(),
4126        )));
4127        session.lock().expect("session lock poisoned").initialize(
4128            fastmcp_protocol::ClientInfo {
4129                name: "http-exclusive-auth-override-client".to_string(),
4130                version: "1.0.0".to_string(),
4131            },
4132            fastmcp_protocol::ClientCapabilities::default(),
4133            "2024-11-05".to_string(),
4134        );
4135
4136        let http_handler = Arc::new(HttpRequestHandler::new());
4137        let notification_sender: NotificationSender = Arc::new(|_| {});
4138        let request_sender = test_request_sender();
4139        let request = http_json_request(
4140            "tools/call",
4141            serde_json::json!({
4142                "name": "http_current_auth_subject_exclusive_tool",
4143                "arguments": {}
4144            }),
4145            1,
4146        );
4147        let traffic_renderer: Option<RequestResponseRenderer> = None;
4148        let response = server.handle_http_mcp_request(
4149            &Cx::for_testing(),
4150            &session,
4151            &http_handler,
4152            &request,
4153            &notification_sender,
4154            &request_sender,
4155            &traffic_renderer,
4156        );
4157        assert_eq!(response.status, HttpStatus::OK);
4158
4159        let json: JsonRpcResponse =
4160            serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
4161        let result = json
4162            .result
4163            .expect("exclusive auth override request should succeed");
4164        let tool_result: CallToolResult =
4165            serde_json::from_value(result).expect("parse tool result payload");
4166        match tool_result.content.as_slice() {
4167            [Content::Text { text }] => assert_eq!(text, "exclusive-override"),
4168            other => panic!("expected single text tool result, got {other:?}"),
4169        }
4170    }
4171
4172    #[test]
4173    fn http_read_only_middleware_auth_mutation_reaches_handler_dispatch() {
4174        let server = Server::new("http-read-only-auth-override-test-server", "1.0.0")
4175            .middleware(OverridingAuthMiddleware {
4176                subject: "read-only-override",
4177            })
4178            .tool(HttpCurrentAuthSubjectTool)
4179            .build();
4180        let session = Arc::new(Mutex::new(Session::new(
4181            server.info.clone(),
4182            server.capabilities.clone(),
4183        )));
4184        session.lock().expect("session lock poisoned").initialize(
4185            fastmcp_protocol::ClientInfo {
4186                name: "http-read-only-auth-override-client".to_string(),
4187                version: "1.0.0".to_string(),
4188            },
4189            fastmcp_protocol::ClientCapabilities::default(),
4190            "2024-11-05".to_string(),
4191        );
4192
4193        let http_handler = Arc::new(HttpRequestHandler::new());
4194        let notification_sender: NotificationSender = Arc::new(|_| {});
4195        let request_sender = test_request_sender();
4196        let request = http_json_request(
4197            "tools/call",
4198            serde_json::json!({
4199                "name": "http_current_auth_subject_tool",
4200                "arguments": {}
4201            }),
4202            1,
4203        );
4204        let traffic_renderer: Option<RequestResponseRenderer> = None;
4205        let response = server.handle_http_mcp_request(
4206            &Cx::for_testing(),
4207            &session,
4208            &http_handler,
4209            &request,
4210            &notification_sender,
4211            &request_sender,
4212            &traffic_renderer,
4213        );
4214        assert_eq!(response.status, HttpStatus::OK);
4215
4216        let json: JsonRpcResponse =
4217            serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
4218        let result = json
4219            .result
4220            .expect("read-only auth override request should succeed");
4221        let tool_result: CallToolResult =
4222            serde_json::from_value(result).expect("parse tool result payload");
4223        match tool_result.content.as_slice() {
4224            [Content::Text { text }] => assert_eq!(text, "read-only-override"),
4225            other => panic!("expected single text tool result, got {other:?}"),
4226        }
4227    }
4228
4229    #[test]
4230    fn http_exclusive_auth_failures_flow_through_middleware_error_rewriting() {
4231        let server = Server::new("http-exclusive-auth-error-test-server", "1.0.0")
4232            .auth_provider(AlwaysFailAuthProvider)
4233            .middleware(RewritingErrorMiddleware)
4234            .build();
4235        let session = Arc::new(Mutex::new(Session::new(
4236            server.info.clone(),
4237            server.capabilities.clone(),
4238        )));
4239        session.lock().expect("session lock poisoned").initialize(
4240            fastmcp_protocol::ClientInfo {
4241                name: "http-exclusive-auth-error-client".to_string(),
4242                version: "1.0.0".to_string(),
4243            },
4244            fastmcp_protocol::ClientCapabilities::default(),
4245            "2024-11-05".to_string(),
4246        );
4247
4248        let http_handler = Arc::new(HttpRequestHandler::new());
4249        let notification_sender: NotificationSender = Arc::new(|_| {});
4250        let request_sender = test_request_sender();
4251        let request = http_json_request(
4252            "tools/list",
4253            serde_json::json!({
4254                "auth": "Bearer nope"
4255            }),
4256            1,
4257        );
4258        let traffic_renderer: Option<RequestResponseRenderer> = None;
4259        let response = server.handle_http_mcp_request(
4260            &Cx::for_testing(),
4261            &session,
4262            &http_handler,
4263            &request,
4264            &notification_sender,
4265            &request_sender,
4266            &traffic_renderer,
4267        );
4268        assert_eq!(response.status, HttpStatus::OK);
4269
4270        let json: JsonRpcResponse =
4271            serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
4272        let error = json
4273            .error
4274            .expect("auth failure should return JSON-RPC error");
4275        assert_eq!(error.message, "rewritten: auth failed");
4276    }
4277
4278    #[test]
4279    fn http_read_only_auth_failures_flow_through_middleware_error_rewriting() {
4280        let server = Server::new("http-read-only-auth-error-test-server", "1.0.0")
4281            .auth_provider(AlwaysFailAuthProvider)
4282            .middleware(RewritingErrorMiddleware)
4283            .tool(HttpCurrentAuthSubjectTool)
4284            .build();
4285        let session = Arc::new(Mutex::new(Session::new(
4286            server.info.clone(),
4287            server.capabilities.clone(),
4288        )));
4289        session.lock().expect("session lock poisoned").initialize(
4290            fastmcp_protocol::ClientInfo {
4291                name: "http-read-only-auth-error-client".to_string(),
4292                version: "1.0.0".to_string(),
4293            },
4294            fastmcp_protocol::ClientCapabilities::default(),
4295            "2024-11-05".to_string(),
4296        );
4297
4298        let http_handler = Arc::new(HttpRequestHandler::new());
4299        let notification_sender: NotificationSender = Arc::new(|_| {});
4300        let request_sender = test_request_sender();
4301        let request = http_json_request(
4302            "tools/call",
4303            serde_json::json!({
4304                "name": "http_current_auth_subject_tool",
4305                "arguments": {},
4306                "auth": "Bearer nope"
4307            }),
4308            1,
4309        );
4310        let traffic_renderer: Option<RequestResponseRenderer> = None;
4311        let response = server.handle_http_mcp_request(
4312            &Cx::for_testing(),
4313            &session,
4314            &http_handler,
4315            &request,
4316            &notification_sender,
4317            &request_sender,
4318            &traffic_renderer,
4319        );
4320        assert_eq!(response.status, HttpStatus::OK);
4321
4322        let json: JsonRpcResponse =
4323            serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
4324        let error = json
4325            .error
4326            .expect("auth failure should return JSON-RPC error");
4327        assert_eq!(error.message, "rewritten: auth failed");
4328    }
4329
4330    #[test]
4331    fn router_tools_call_injects_explicit_request_auth() {
4332        let server = Server::new("router-auth-test-server", "1.0.0")
4333            .tool(HttpCurrentAuthSubjectTool)
4334            .build();
4335        let result = server
4336            .router
4337            .handle_tools_call(
4338                &Cx::for_testing(),
4339                41,
4340                CallToolParams {
4341                    name: "http_current_auth_subject_tool".to_string(),
4342                    arguments: Some(serde_json::json!({})),
4343                    meta: None,
4344                },
4345                &Budget::INFINITE,
4346                SessionState::new(),
4347                Some(AuthContext::with_subject("alpha")),
4348                None,
4349                None,
4350            )
4351            .expect("tool call should succeed");
4352
4353        match result.content.as_slice() {
4354            [Content::Text { text }] => assert_eq!(text, "alpha"),
4355            other => panic!("expected single text tool result, got {other:?}"),
4356        }
4357    }
4358
4359    #[test]
4360    fn http_returning_server_honors_cancellation_without_needing_accept_wakeup() {
4361        let port_probe =
4362            std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port probe");
4363        let addr = port_probe
4364            .local_addr()
4365            .expect("discover ephemeral port probe address");
4366        drop(port_probe);
4367
4368        let cx = Cx::for_testing();
4369        let (done_tx, done_rx) = std::sync::mpsc::channel();
4370        let server_thread = thread::spawn({
4371            let server = Server::new("http-cancel-test", "1.0.0").build();
4372            let cx = cx.clone();
4373            let addr = addr.to_string();
4374            move || {
4375                server.run_http_returning_with_cx(&cx, addr);
4376                let _ = done_tx.send(());
4377            }
4378        });
4379
4380        std::thread::sleep(Duration::from_millis(50));
4381        cx.cancel_with(CancelKind::User, None);
4382
4383        let returned_before_wakeup = done_rx.recv_timeout(Duration::from_millis(300)).is_ok();
4384        if !returned_before_wakeup {
4385            let _ = std::net::TcpStream::connect(addr);
4386            let _ = done_rx.recv_timeout(Duration::from_secs(1));
4387        }
4388
4389        server_thread
4390            .join()
4391            .expect("HTTP returning server thread should not panic");
4392        assert!(
4393            returned_before_wakeup,
4394            "run_http_returning_with_cx should stop promptly after cancellation without requiring an extra connection to wake accept()"
4395        );
4396    }
4397}