Skip to main content

fastmcp_server/
lib.rs

1//! MCP server implementation for FastMCP.
2//!
3//! This crate provides the server-side implementation:
4//! - Server builder pattern
5//! - Tool, resource, and prompt registration
6//! - Request routing and dispatching
7//! - Session management
8//!
9//! # Example
10//!
11//! ```ignore
12//! use fastmcp_rust::prelude::*;
13//!
14//! #[tool]
15//! async fn greet(ctx: &McpContext, name: String) -> String {
16//!     format!("Hello, {name}!")
17//! }
18//!
19//! fn main() {
20//!     Server::new("my-server", "1.0.0")
21//!         .tool(greet)
22//!         .run_stdio();
23//! }
24//! ```
25//!
26//! # Role in the System
27//!
28//! `fastmcp-server` is the **execution engine** for MCP servers. It ties
29//! together:
30//! - Protocol types (`fastmcp-protocol`) for requests and responses
31//! - Transports (`fastmcp-transport`) for stdio/SSE/WebSocket I/O
32//! - Core context + cancellation (`fastmcp-core`) for budgets and checkpoints
33//! - Console output (`fastmcp-console`) for human-friendly stderr rendering
34//!
35//! The façade crate `fastmcp` re-exports this API, so most users interact with
36//! `Server` via `fastmcp_rust::prelude::*`.
37
38#![forbid(unsafe_code)]
39#![allow(dead_code)]
40
41// Proc-macros (fastmcp-derive) reference this crate by its external name
42// (`fastmcp_server::...`). This alias makes those macros usable inside this crate too
43// (including in unit tests).
44extern crate self as fastmcp_server;
45
46mod auth;
47pub mod bidirectional;
48mod builder;
49pub mod caching;
50pub mod docket;
51mod handler;
52mod middleware;
53pub mod oauth;
54pub mod oidc;
55pub mod providers;
56mod proxy;
57pub mod rate_limiting;
58mod router;
59mod session;
60mod tasks;
61pub mod transform;
62
63#[cfg(test)]
64mod tests;
65
66#[cfg(feature = "jwt")]
67pub use auth::JwtTokenVerifier;
68pub use auth::{
69    AllowAllAuthProvider, AuthProvider, AuthRequest, StaticTokenVerifier, TokenAuthProvider,
70    TokenVerifier,
71};
72pub use builder::ServerBuilder;
73pub use fastmcp_console::config::{BannerStyle, ConsoleConfig, TrafficVerbosity};
74pub use fastmcp_console::stats::{ServerStats, StatsSnapshot};
75pub use handler::{
76    BidirectionalSenders, BoxFuture, ProgressNotificationSender, PromptHandler, ResourceHandler,
77    ToolHandler, create_context_with_progress, create_context_with_progress_and_senders,
78};
79pub use middleware::{Middleware, MiddlewareDecision};
80pub use proxy::{ProxyBackend, ProxyCatalog, ProxyClient};
81pub use router::{
82    MountResult, NotificationSender, Router, RouterResourceReader, RouterToolCaller, TagFilters,
83};
84pub use session::Session;
85pub use tasks::{SharedTaskManager, TaskManager};
86
87// Re-export bidirectional communication types
88pub use bidirectional::{
89    PendingRequests, RequestSender, TransportElicitationSender, TransportRootsProvider,
90    TransportSamplingSender,
91};
92
93use std::collections::HashMap;
94use std::io::{Read, Write};
95use std::sync::{Arc, Condvar, Mutex};
96use std::time::{Duration, Instant};
97
98use asupersync::time::wall_now;
99use asupersync::{Budget, CancelKind, Cx, RegionId};
100use fastmcp_console::client::RequestResponseRenderer;
101use fastmcp_console::logging::RichLoggerBuilder;
102use fastmcp_console::{banner::StartupBanner, console};
103use fastmcp_core::logging::{debug, error, info, targets};
104use fastmcp_core::{AuthContext, McpContext, McpError, McpErrorCode, McpResult};
105use fastmcp_protocol::{
106    CallToolParams, CancelTaskParams, CancelledParams, GetPromptParams, GetTaskParams,
107    InitializeParams, JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse,
108    ListPromptsParams, ListResourceTemplatesParams, ListResourcesParams, ListTasksParams,
109    ListToolsParams, LogLevel, LogMessageParams, Prompt, ReadResourceParams, RequestId, Resource,
110    ResourceTemplate, ServerCapabilities, ServerInfo, SetLogLevelParams, SubmitTaskParams,
111    SubscribeResourceParams, Tool, UnsubscribeResourceParams,
112};
113use fastmcp_transport::sse::SseServerTransport;
114use fastmcp_transport::websocket::WsTransport;
115use fastmcp_transport::{AsyncStdout, Codec, StdioTransport, Transport, TransportError};
116use log::{Level, LevelFilter};
117
/// Boxed one-shot callback run during server startup.
///
/// The callback may fail; an `Err` is reported by the server as a failed startup.
pub type StartupHook =
    Box<dyn FnOnce() -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send>;

/// Boxed one-shot callback run during server shutdown.
pub type ShutdownHook = Box<dyn FnOnce() + Send>;

/// Optional callbacks tied to the server lifecycle.
///
/// Custom initialization and cleanup logic can be attached at two
/// well-defined points:
///
/// - `on_startup`: Called before the server starts accepting connections
/// - `on_shutdown`: Called when the server is shutting down
///
/// # Example
///
/// ```ignore
/// use fastmcp_rust::prelude::*;
///
/// Server::new("demo", "1.0.0")
///     .on_startup(|| {
///         println!("Initializing...");
///         // Initialize database, caches, etc.
///         Ok(())
///     })
///     .on_shutdown(|| {
///         println!("Cleaning up...");
///         // Close connections, flush buffers, etc.
///     })
///     .run_stdio();
/// ```
#[derive(Default)]
pub struct LifespanHooks {
    /// Runs before the server starts accepting connections.
    pub on_startup: Option<StartupHook>,
    /// Runs while the server is shutting down.
    pub on_shutdown: Option<ShutdownHook>,
}

impl LifespanHooks {
    /// Builds a hook set with neither callback configured.
    #[must_use]
    pub fn new() -> Self {
        Self {
            on_startup: None,
            on_shutdown: None,
        }
    }
}
165
166/// Logging configuration for the server.
167#[derive(Debug, Clone)]
168pub struct LoggingConfig {
169    /// Minimum log level (default: INFO).
170    pub level: Level,
171    /// Show timestamps in logs (default: true).
172    pub timestamps: bool,
173    /// Show module targets in logs (default: true).
174    pub targets: bool,
175    /// Show file:line in logs (default: false).
176    pub file_line: bool,
177}
178
179impl Default for LoggingConfig {
180    fn default() -> Self {
181        Self {
182            level: Level::Info,
183            timestamps: true,
184            targets: true,
185            file_line: false,
186        }
187    }
188}
189
190impl LoggingConfig {
191    /// Create logging config from environment variables.
192    ///
193    /// Respects:
194    /// - `FASTMCP_LOG`: Log level (error, warn, info, debug, trace)
195    /// - `FASTMCP_LOG_TIMESTAMPS`: Show timestamps (0/false to disable)
196    /// - `FASTMCP_LOG_TARGETS`: Show targets (0/false to disable)
197    /// - `FASTMCP_LOG_FILE_LINE`: Show file:line (1/true to enable)
198    #[must_use]
199    pub fn from_env() -> Self {
200        let level = std::env::var("FASTMCP_LOG")
201            .ok()
202            .and_then(|s| match s.to_lowercase().as_str() {
203                "error" => Some(Level::Error),
204                "warn" | "warning" => Some(Level::Warn),
205                "info" => Some(Level::Info),
206                "debug" => Some(Level::Debug),
207                "trace" => Some(Level::Trace),
208                _ => None,
209            })
210            .unwrap_or(Level::Info);
211
212        let timestamps = std::env::var("FASTMCP_LOG_TIMESTAMPS")
213            .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
214            .unwrap_or(true);
215
216        let targets = std::env::var("FASTMCP_LOG_TARGETS")
217            .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
218            .unwrap_or(true);
219
220        let file_line = std::env::var("FASTMCP_LOG_FILE_LINE")
221            .map(|s| matches!(s.to_lowercase().as_str(), "1" | "true" | "yes"))
222            .unwrap_or(false);
223
224        Self {
225            level,
226            timestamps,
227            targets,
228            file_line,
229        }
230    }
231}
232
/// Policy applied when a component is registered under a name that is
/// already taken.
///
/// The policy covers duplicate tool, resource, and prompt names
/// encountered during registration.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum DuplicateBehavior {
    /// Fail the registration with an error.
    ///
    /// Suited to strict validation in production environments.
    Error,

    /// Keep the original component and log a warning.
    ///
    /// This is the default: duplicates stay visible while existing
    /// behavior remains backwards compatible.
    #[default]
    Warn,

    /// Let the new component take the place of the original.
    ///
    /// Suited to setups where later registrations should override earlier ones.
    Replace,

    /// Keep the original component without any diagnostics.
    ///
    /// Suited to setups where duplicates are expected and should be ignored.
    Ignore,
}
261
/// An MCP server instance.
///
/// Servers are built using [`ServerBuilder`] and can run on various
/// transports (stdio, SSE, WebSocket).
pub struct Server {
    /// Server identity; its name and version are shown in the startup banner.
    info: ServerInfo,
    /// Advertised capabilities; also backs the `has_tools`/`has_resources`/`has_prompts` checks.
    capabilities: ServerCapabilities,
    /// Router that backs the tool, resource, resource-template and prompt listings.
    router: Router,
    /// Optional instructions text; when non-empty it is used as the banner description.
    instructions: Option<String>,
    /// Request timeout in seconds (0 = no timeout).
    request_timeout_secs: u64,
    /// Runtime statistics collector (None = disabled).
    stats: Option<ServerStats>,
    /// Whether to mask internal error details in responses.
    mask_error_details: bool,
    /// Logging configuration.
    logging: LoggingConfig,
    /// Console configuration for rich output.
    console_config: ConsoleConfig,
    /// Lifecycle hooks (wrapped in Option so they can be taken once).
    lifespan: Mutex<Option<LifespanHooks>>,
    /// Optional authentication provider.
    auth_provider: Option<Arc<dyn AuthProvider>>,
    /// Registered middleware.
    middleware: Arc<Vec<Box<dyn crate::Middleware>>>,
    /// Active requests by JSON-RPC request ID.
    active_requests: Mutex<HashMap<RequestId, ActiveRequest>>,
    /// Optional task manager for background tasks (Docket/SEP-1686).
    task_manager: Option<SharedTaskManager>,
    /// Pending server-to-client requests (for bidirectional communication).
    pending_requests: Arc<bidirectional::PendingRequests>,
}
294
295impl Server {
296    /// Creates a new server builder.
297    #[must_use]
298    #[allow(clippy::new_ret_no_self)]
299    pub fn new(name: impl Into<String>, version: impl Into<String>) -> ServerBuilder {
300        ServerBuilder::new(name, version)
301    }
302
303    /// Returns the server info.
304    #[must_use]
305    pub fn info(&self) -> &ServerInfo {
306        &self.info
307    }
308
309    /// Returns the server capabilities.
310    #[must_use]
311    pub fn capabilities(&self) -> &ServerCapabilities {
312        &self.capabilities
313    }
314
315    /// Lists all registered tools.
316    #[must_use]
317    pub fn tools(&self) -> Vec<Tool> {
318        self.router.tools()
319    }
320
321    /// Lists all registered resources.
322    #[must_use]
323    pub fn resources(&self) -> Vec<Resource> {
324        self.router.resources()
325    }
326
327    /// Lists all registered resource templates.
328    #[must_use]
329    pub fn resource_templates(&self) -> Vec<ResourceTemplate> {
330        self.router.resource_templates()
331    }
332
333    /// Lists all registered prompts.
334    #[must_use]
335    pub fn prompts(&self) -> Vec<Prompt> {
336        self.router.prompts()
337    }
338
339    /// Returns the task manager, if configured.
340    ///
341    /// Returns `None` if background tasks are not enabled.
342    #[must_use]
343    pub fn task_manager(&self) -> Option<&SharedTaskManager> {
344        self.task_manager.as_ref()
345    }
346
347    /// Consumes the server and returns its router.
348    ///
349    /// This is used for mounting one server's components into another.
350    #[must_use]
351    pub fn into_router(self) -> Router {
352        self.router
353    }
354
355    /// Returns the capabilities this server provides.
356    ///
357    /// This is useful when determining what components a server has
358    /// before mounting.
359    #[must_use]
360    pub fn has_tools(&self) -> bool {
361        self.capabilities.tools.is_some()
362    }
363
364    /// Returns whether this server has resources.
365    #[must_use]
366    pub fn has_resources(&self) -> bool {
367        self.capabilities.resources.is_some()
368    }
369
370    /// Returns whether this server has prompts.
371    #[must_use]
372    pub fn has_prompts(&self) -> bool {
373        self.capabilities.prompts.is_some()
374    }
375
376    /// Returns a point-in-time snapshot of server statistics.
377    ///
378    /// Returns `None` if statistics collection is disabled.
379    #[must_use]
380    pub fn stats(&self) -> Option<StatsSnapshot> {
381        self.stats.as_ref().map(ServerStats::snapshot)
382    }
383
384    /// Returns the raw statistics collector.
385    ///
386    /// Useful for advanced scenarios where you need direct access.
387    /// Returns `None` if statistics collection is disabled.
388    #[must_use]
389    pub fn stats_collector(&self) -> Option<&ServerStats> {
390        self.stats.as_ref()
391    }
392
393    /// Renders a stats panel to stderr, if stats are enabled.
394    pub fn display_stats(&self) {
395        let Some(stats) = self.stats.as_ref() else {
396            return;
397        };
398
399        let snapshot = stats.snapshot();
400        let renderer = fastmcp_console::stats::StatsRenderer::detect();
401        renderer.render_panel(&snapshot, console());
402    }
403
404    /// Returns the console configuration.
405    #[must_use]
406    pub fn console_config(&self) -> &ConsoleConfig {
407        &self.console_config
408    }
409
410    /// Renders the startup banner based on console configuration.
411    fn render_startup_banner(&self) {
412        let render = || {
413            let mut banner = StartupBanner::new(&self.info.name, &self.info.version)
414                .tools(self.router.tools_count())
415                .resources(self.router.resources_count())
416                .prompts(self.router.prompts_count())
417                .transport("stdio");
418
419            if let Some(desc) = self.instructions.as_deref().filter(|d| !d.is_empty()) {
420                banner = banner.description(desc);
421            }
422
423            // Apply banner style from config
424            match self.console_config.banner_style {
425                BannerStyle::Full => banner.render(console()),
426                BannerStyle::Compact | BannerStyle::Minimal => {
427                    // Compact/Minimal: render without the large logo
428                    banner.no_logo().render(console());
429                }
430                BannerStyle::None => {} // Already checked show_banner, but be safe
431            }
432        };
433
434        if let Err(err) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(render)) {
435            eprintln!("Warning: banner rendering failed: {err:?}");
436        }
437    }
438
439    /// Initializes rich logging based on server configuration.
440    ///
441    /// This should be called early in the startup sequence, before any
442    /// log output is generated. If initialization fails (e.g., logger
443    /// already set), a warning is printed to stderr.
444    fn init_rich_logging(&self) {
445        let result = RichLoggerBuilder::new()
446            .level(self.logging.level)
447            .with_timestamps(self.logging.timestamps)
448            .with_targets(self.logging.targets)
449            .with_file_line(self.logging.file_line)
450            .init();
451
452        if let Err(e) = result {
453            // Logger already initialized (likely by user code), not an error
454            eprintln!("Note: Rich logging not initialized (logger already set): {e}");
455        }
456    }
457
458    /// Runs the server on stdio transport.
459    ///
460    /// This is the primary way to run MCP servers as subprocesses.
461    /// Creates a request-scoped Cx and runs the server loop.
462    pub fn run_stdio(self) -> ! {
463        // Top-level server loop Cx (non-test). Per-request budgets are applied in `handle_request`.
464        let cx = Cx::for_request();
465        self.run_stdio_with_cx(&cx)
466    }
467
468    /// Runs the server on stdio with a provided Cx.
469    ///
470    /// This allows integration with a real asupersync runtime.
471    pub fn run_stdio_with_cx(self, cx: &Cx) -> ! {
472        // Initialize rich logging first, before any log output
473        self.init_rich_logging();
474
475        let transport = StdioTransport::stdio();
476        let shared = SharedTransport::new(transport);
477
478        // Create a notification sender that writes to a separate stdout handle.
479        // This allows progress notifications to be sent during handler execution
480        // while the main transport is blocked on recv().
481        let notification_sender = create_notification_sender();
482
483        let shared_recv = shared.clone();
484        let shared_send = shared.clone();
485        self.run_loop(
486            cx,
487            move |cx| shared_recv.recv(cx),
488            move |cx, message| shared_send.send(cx, message),
489            notification_sender,
490        )
491    }
492
493    /// Runs the server on a custom transport with a request-scoped Cx.
494    ///
495    /// This is useful for SSE/WebSocket integrations where the transport is
496    /// provided by an external server framework.
497    pub fn run_transport<T>(self, transport: T) -> !
498    where
499        T: Transport + Send + 'static,
500    {
501        // Top-level server loop Cx (non-test). Per-request budgets are applied in `handle_request`.
502        let cx = Cx::for_request();
503        self.run_transport_with_cx(&cx, transport)
504    }
505
506    /// Runs the server on a custom transport with a provided Cx.
507    ///
508    /// This allows integration with a real asupersync runtime.
509    pub fn run_transport_with_cx<T>(self, cx: &Cx, transport: T) -> !
510    where
511        T: Transport + Send + 'static,
512    {
513        self.init_rich_logging();
514
515        let shared = SharedTransport::new(transport);
516        let notification_sender = create_transport_notification_sender(shared.clone());
517
518        let shared_recv = shared.clone();
519        let shared_send = shared;
520        self.run_loop(
521            cx,
522            move |cx| shared_recv.recv(cx),
523            move |cx, message| shared_send.send(cx, message),
524            notification_sender,
525        )
526    }
527
528    /// Runs the server on a custom transport and returns when the transport closes or the Cx is cancelled.
529    ///
530    /// Unlike [`run_transport_with_cx`](Self::run_transport_with_cx), this does not call
531    /// `std::process::exit` on shutdown. This is useful for tests and embedding where you need
532    /// the server loop to be joinable.
533    pub fn run_transport_returning_with_cx<T>(self, cx: &Cx, transport: T)
534    where
535        T: Transport + Send + 'static,
536    {
537        self.init_rich_logging();
538
539        let shared = SharedTransport::new(transport);
540        let notification_sender = create_transport_notification_sender(shared.clone());
541
542        let shared_recv = shared.clone();
543        let shared_send = shared;
544        self.run_loop_returning(
545            cx,
546            move |cx| shared_recv.recv(cx),
547            move |cx, message| shared_send.send(cx, message),
548            notification_sender,
549        );
550    }
551
552    /// Runs the server on a custom transport and returns when the transport closes.
553    ///
554    /// This uses a request-scoped [`Cx`], but unlike [`run_transport`](Self::run_transport) it does
555    /// not exit the process.
556    pub fn run_transport_returning<T>(self, transport: T)
557    where
558        T: Transport + Send + 'static,
559    {
560        // Top-level server loop Cx (non-test). Per-request budgets are applied in `handle_request`.
561        let cx = Cx::for_request();
562        self.run_transport_returning_with_cx(&cx, transport);
563    }
564
565    /// Runs the server using SSE transport with a testing Cx.
566    ///
567    /// This is a convenience wrapper around [`SseServerTransport`].
568    pub fn run_sse<W, R>(self, writer: W, request_source: R, endpoint_url: impl Into<String>) -> !
569    where
570        W: Write + Send + 'static,
571        R: Iterator<Item = JsonRpcRequest> + Send + 'static,
572    {
573        let transport = SseServerTransport::new(writer, request_source, endpoint_url);
574        self.run_transport(transport)
575    }
576
577    /// Runs the server using SSE transport with a provided Cx.
578    pub fn run_sse_with_cx<W, R>(
579        self,
580        cx: &Cx,
581        writer: W,
582        request_source: R,
583        endpoint_url: impl Into<String>,
584    ) -> !
585    where
586        W: Write + Send + 'static,
587        R: Iterator<Item = JsonRpcRequest> + Send + 'static,
588    {
589        let transport = SseServerTransport::new(writer, request_source, endpoint_url);
590        self.run_transport_with_cx(cx, transport)
591    }
592
593    /// Runs the server using WebSocket transport with a testing Cx.
594    ///
595    /// This is a convenience wrapper around [`WsTransport`].
596    pub fn run_websocket<R, W>(self, reader: R, writer: W) -> !
597    where
598        R: Read + Send + 'static,
599        W: Write + Send + 'static,
600    {
601        let transport = WsTransport::new(reader, writer);
602        self.run_transport(transport)
603    }
604
605    /// Runs the server using WebSocket transport with a provided Cx.
606    pub fn run_websocket_with_cx<R, W>(self, cx: &Cx, reader: R, writer: W) -> !
607    where
608        R: Read + Send + 'static,
609        W: Write + Send + 'static,
610    {
611        let transport = WsTransport::new(reader, writer);
612        self.run_transport_with_cx(cx, transport)
613    }
614
615    /// Runs the startup lifecycle hook, if configured.
616    ///
617    /// Returns `true` if startup succeeded (or no hook was configured),
618    /// `false` if the hook returned an error.
619    pub(crate) fn run_startup_hook(&self) -> bool {
620        let hook = {
621            let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
622                error!(target: targets::SERVER, "lifespan lock poisoned in run_startup_hook, recovering");
623                poisoned.into_inner()
624            });
625            guard.as_mut().and_then(|h| h.on_startup.take())
626        };
627
628        if let Some(hook) = hook {
629            debug!(target: targets::SERVER, "Running startup hook");
630            match hook() {
631                Ok(()) => {
632                    debug!(target: targets::SERVER, "Startup hook completed successfully");
633                    true
634                }
635                Err(e) => {
636                    error!(target: targets::SERVER, "Startup hook failed: {}", e);
637                    false
638                }
639            }
640        } else {
641            true
642        }
643    }
644
645    /// Runs the shutdown lifecycle hook, if configured.
646    pub(crate) fn run_shutdown_hook(&self) {
647        let hook = {
648            let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
649                error!(target: targets::SERVER, "lifespan lock poisoned in run_shutdown_hook, recovering");
650                poisoned.into_inner()
651            });
652            guard.as_mut().and_then(|h| h.on_shutdown.take())
653        };
654
655        if let Some(hook) = hook {
656            debug!(target: targets::SERVER, "Running shutdown hook");
657            hook();
658            debug!(target: targets::SERVER, "Shutdown hook completed");
659        }
660    }
661
662    /// Performs graceful shutdown: runs hook, closes stats, exits.
663    fn graceful_shutdown(&self, exit_code: i32) -> ! {
664        self.cancel_active_requests(CancelKind::Shutdown, true);
665        self.run_shutdown_hook();
666        if let Some(ref stats) = self.stats {
667            stats.connection_closed();
668        }
669        std::process::exit(exit_code)
670    }
671
672    /// Performs graceful shutdown without exiting the process.
673    ///
674    /// This is intended for embedding/testing scenarios where the server loop is
675    /// running on a thread and the caller wants to `join()` it.
676    fn graceful_shutdown_returning(&self) {
677        self.cancel_active_requests(CancelKind::Shutdown, true);
678        self.run_shutdown_hook();
679        if let Some(ref stats) = self.stats {
680            stats.connection_closed();
681        }
682    }
683
684    /// Shared server loop for any transport, using closure-based recv/send.
685    fn run_loop<R, S>(
686        self,
687        cx: &Cx,
688        mut recv: R,
689        send: S,
690        notification_sender: NotificationSender,
691    ) -> !
692    where
693        R: FnMut(&Cx) -> Result<JsonRpcMessage, TransportError>,
694        S: FnMut(&Cx, &JsonRpcMessage) -> Result<(), TransportError> + Send + Sync + 'static,
695    {
696        let mut session = Session::new(self.info.clone(), self.capabilities.clone());
697
698        // Wrap send in Arc<Mutex> for shared access from bidirectional requests
699        let send = Arc::new(Mutex::new(send));
700
701        // Create a RequestSender for bidirectional communication
702        let request_sender = {
703            let send_clone = send.clone();
704            let send_cx = cx.clone();
705            let send_fn: bidirectional::TransportSendFn = Arc::new(move |message| {
706                let mut guard = send_clone
707                    .lock()
708                    .map_err(|e| format!("Lock poisoned: {}", e))?;
709                guard(&send_cx, message).map_err(|e| format!("Send failed: {}", e))
710            });
711            bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
712        };
713
714        // Track connection opened
715        if let Some(ref stats) = self.stats {
716            stats.connection_opened();
717        }
718
719        // Render startup banner if enabled (respects both config and legacy env var)
720        if self.console_config.show_banner && !banner_suppressed() {
721            self.render_startup_banner();
722        }
723
724        // Run startup hook
725        if !self.run_startup_hook() {
726            error!(target: targets::SERVER, "Startup hook failed, exiting");
727            self.graceful_shutdown(1);
728        }
729
730        // Create traffic renderer if enabled
731        let traffic_renderer = if self.console_config.show_request_traffic {
732            let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
733            renderer.truncate_at = self.console_config.truncate_at;
734            match self.console_config.traffic_verbosity {
735                TrafficVerbosity::None => {} // Should not happen given the if check
736                TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
737                    renderer.show_params = false;
738                    renderer.show_result = false;
739                }
740                TrafficVerbosity::Full => {
741                    renderer.show_params = true;
742                    renderer.show_result = true;
743                }
744            }
745            Some(renderer)
746        } else {
747            None
748        };
749
750        // Main request loop
751        loop {
752            // Check for cancellation
753            if cx.is_cancel_requested() {
754                info!(target: targets::SERVER, "Cancellation requested, shutting down");
755                self.graceful_shutdown(0);
756            }
757
758            // Receive next message
759            let message = match recv(cx) {
760                Ok(msg) => msg,
761                Err(TransportError::Closed) => {
762                    // Clean shutdown - track connection close
763                    self.graceful_shutdown(0);
764                }
765                Err(TransportError::Cancelled) => {
766                    info!(target: targets::SERVER, "Transport cancelled");
767                    self.graceful_shutdown(0);
768                }
769                Err(e) => {
770                    error!(target: targets::TRANSPORT, "Transport error: {}", e);
771                    continue;
772                }
773            };
774
775            // Log request traffic
776            if let Some(renderer) = &traffic_renderer {
777                if let JsonRpcMessage::Request(req) = &message {
778                    renderer.render_request(req, console());
779                }
780            }
781
782            let start_time = Instant::now();
783
784            // Handle the message
785            let response_opt = match message {
786                JsonRpcMessage::Request(request) => {
787                    // Track bytes received (approximate from serialized request size)
788                    if let Some(ref stats) = self.stats {
789                        // Estimate request size by serializing back to JSON
790                        // This is approximate but accurate enough for statistics
791                        if let Ok(json) = serde_json::to_string(&request) {
792                            stats.add_bytes_received(json.len() as u64 + 1); // +1 for newline
793                        }
794                    }
795                    self.handle_request(
796                        cx,
797                        &mut session,
798                        request,
799                        &notification_sender,
800                        &request_sender,
801                    )
802                }
803                JsonRpcMessage::Response(response) => {
804                    // Route response to pending server-initiated request (bidirectional)
805                    if self.pending_requests.route_response(&response) {
806                        debug!(target: targets::SERVER, "Routed response to pending request");
807                    } else {
808                        debug!(target: targets::SERVER, "Received unexpected response: {:?}", response.id);
809                    }
810                    continue;
811                }
812            };
813
814            let duration = start_time.elapsed();
815
816            if let Some(response) = response_opt {
817                // Log response traffic
818                if let Some(renderer) = &traffic_renderer {
819                    renderer.render_response(&response, Some(duration), console());
820                }
821
822                // Track bytes sent (approximate from serialized response size)
823                if let Some(ref stats) = self.stats {
824                    if let Ok(json) = serde_json::to_string(&response) {
825                        stats.add_bytes_sent(json.len() as u64 + 1); // +1 for newline
826                    }
827                }
828
829                // Send response
830                let send_result = {
831                    let mut guard = match send.lock() {
832                        Ok(guard) => guard,
833                        Err(poisoned) => {
834                            error!(
835                                target: targets::TRANSPORT,
836                                "Send channel lock poisoned; continuing with inner guard"
837                            );
838                            poisoned.into_inner()
839                        }
840                    };
841                    guard(cx, &JsonRpcMessage::Response(response))
842                };
843                if let Err(e) = send_result {
844                    error!(target: targets::TRANSPORT, "Failed to send response: {}", e);
845                }
846            }
847        }
848    }
849
850    /// Shared server loop for embedding/testing, returning on shutdown instead of exiting.
851    ///
852    /// This is intentionally separate from [`run_loop`](Self::run_loop) because the primary server
853    /// entrypoints use `std::process::exit` on shutdown for subprocess use-cases.
854    #[allow(clippy::too_many_lines)]
855    fn run_loop_returning<R, S>(
856        self,
857        cx: &Cx,
858        mut recv: R,
859        send: S,
860        notification_sender: NotificationSender,
861    ) where
862        R: FnMut(&Cx) -> Result<JsonRpcMessage, TransportError>,
863        S: FnMut(&Cx, &JsonRpcMessage) -> Result<(), TransportError> + Send + Sync + 'static,
864    {
865        let mut session = Session::new(self.info.clone(), self.capabilities.clone());
866
867        // Wrap send in Arc<Mutex> for shared access from bidirectional requests
868        let send = Arc::new(Mutex::new(send));
869
870        // Create a RequestSender for bidirectional communication
871        let request_sender = {
872            let send_clone = send.clone();
873            let send_cx = cx.clone();
874            let send_fn: bidirectional::TransportSendFn = Arc::new(move |message| {
875                let mut guard = send_clone
876                    .lock()
877                    .map_err(|e| format!("Lock poisoned: {}", e))?;
878                guard(&send_cx, message).map_err(|e| format!("Send failed: {}", e))
879            });
880            bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
881        };
882
883        // Track connection opened
884        if let Some(ref stats) = self.stats {
885            stats.connection_opened();
886        }
887
888        // Render startup banner if enabled (respects both config and legacy env var)
889        if self.console_config.show_banner && !banner_suppressed() {
890            self.render_startup_banner();
891        }
892
893        // Run startup hook
894        if !self.run_startup_hook() {
895            error!(target: targets::SERVER, "Startup hook failed, stopping");
896            self.graceful_shutdown_returning();
897            return;
898        }
899
900        // Create traffic renderer if enabled
901        let traffic_renderer = if self.console_config.show_request_traffic {
902            let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
903            renderer.truncate_at = self.console_config.truncate_at;
904            match self.console_config.traffic_verbosity {
905                TrafficVerbosity::None => {} // Should not happen given the if check
906                TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
907                    renderer.show_params = false;
908                    renderer.show_result = false;
909                }
910                TrafficVerbosity::Full => {
911                    renderer.show_params = true;
912                    renderer.show_result = true;
913                }
914            }
915            Some(renderer)
916        } else {
917            None
918        };
919
920        // Main request loop
921        loop {
922            // Check for cancellation
923            if cx.is_cancel_requested() {
924                info!(target: targets::SERVER, "Cancellation requested, stopping");
925                self.graceful_shutdown_returning();
926                return;
927            }
928
929            // Receive next message
930            let message = match recv(cx) {
931                Ok(msg) => msg,
932                Err(TransportError::Closed) => {
933                    self.graceful_shutdown_returning();
934                    return;
935                }
936                Err(TransportError::Cancelled) => {
937                    info!(target: targets::SERVER, "Transport cancelled");
938                    self.graceful_shutdown_returning();
939                    return;
940                }
941                Err(e) => {
942                    error!(target: targets::TRANSPORT, "Transport error: {}", e);
943                    continue;
944                }
945            };
946
947            // Log request traffic
948            if let Some(renderer) = &traffic_renderer {
949                if let JsonRpcMessage::Request(req) = &message {
950                    renderer.render_request(req, console());
951                }
952            }
953
954            let start_time = Instant::now();
955
956            // Handle the message
957            let response_opt = match message {
958                JsonRpcMessage::Request(request) => {
959                    // Track bytes received (approximate from serialized request size)
960                    if let Some(ref stats) = self.stats {
961                        // Estimate request size by serializing back to JSON
962                        // This is approximate but accurate enough for statistics
963                        if let Ok(json) = serde_json::to_string(&request) {
964                            stats.add_bytes_received(json.len() as u64 + 1); // +1 for newline
965                        }
966                    }
967                    self.handle_request(
968                        cx,
969                        &mut session,
970                        request,
971                        &notification_sender,
972                        &request_sender,
973                    )
974                }
975                JsonRpcMessage::Response(response) => {
976                    // Route response to pending server-initiated request (bidirectional)
977                    if self.pending_requests.route_response(&response) {
978                        debug!(target: targets::SERVER, "Routed response to pending request");
979                    } else {
980                        debug!(
981                            target: targets::SERVER,
982                            "Received unexpected response: {:?}",
983                            response.id
984                        );
985                    }
986                    continue;
987                }
988            };
989
990            let duration = start_time.elapsed();
991
992            if let Some(response) = response_opt {
993                // Log response traffic
994                if let Some(renderer) = &traffic_renderer {
995                    renderer.render_response(&response, Some(duration), console());
996                }
997
998                // Track bytes sent (approximate from serialized response size)
999                if let Some(ref stats) = self.stats {
1000                    if let Ok(json) = serde_json::to_string(&response) {
1001                        stats.add_bytes_sent(json.len() as u64 + 1); // +1 for newline
1002                    }
1003                }
1004
1005                // Send response
1006                let send_result = {
1007                    let mut guard = match send.lock() {
1008                        Ok(guard) => guard,
1009                        Err(poisoned) => {
1010                            error!(
1011                                target: targets::TRANSPORT,
1012                                "Send channel lock poisoned; continuing with inner guard"
1013                            );
1014                            poisoned.into_inner()
1015                        }
1016                    };
1017                    guard(cx, &JsonRpcMessage::Response(response))
1018                };
1019                if let Err(e) = send_result {
1020                    error!(target: targets::TRANSPORT, "Failed to send response: {}", e);
1021                }
1022            }
1023        }
1024    }
1025
1026    /// Handles a single JSON-RPC request.
1027    fn handle_request(
1028        &self,
1029        cx: &Cx,
1030        session: &mut Session,
1031        request: JsonRpcRequest,
1032        notification_sender: &NotificationSender,
1033        request_sender: &bidirectional::RequestSender,
1034    ) -> Option<JsonRpcResponse> {
1035        let id = request.id.clone();
1036        let method = request.method.clone();
1037        let is_notification = id.is_none();
1038
1039        // Start timing for stats
1040        let start_time = Instant::now();
1041
1042        // Generate internal request ID for tracing
1043        let request_id = request_id_to_u64(id.as_ref());
1044
1045        // Create a budget for this request based on timeout configuration
1046        let budget = self.create_request_budget();
1047
1048        // Check if budget is already exhausted (should not happen, but be defensive)
1049        if budget.is_exhausted() {
1050            // Record failed request due to exhausted budget
1051            if let Some(ref stats) = self.stats {
1052                stats.record_request(&method, start_time.elapsed(), false);
1053            }
1054            // If it's a notification, we don't send an error response
1055            let response_id = id.clone()?;
1056            return Some(JsonRpcResponse::error(
1057                Some(response_id),
1058                JsonRpcError {
1059                    code: McpErrorCode::RequestCancelled.into(),
1060                    message: "Request budget exhausted".to_string(),
1061                    data: None,
1062                },
1063            ));
1064        }
1065
1066        let request_cx = if is_notification {
1067            cx.clone()
1068        } else {
1069            Cx::for_request_with_budget(budget)
1070        };
1071
1072        let _active_guard = id.clone().map(|request_id| {
1073            ActiveRequestGuard::new(&self.active_requests, request_id, request_cx.clone())
1074        });
1075
1076        // Dispatch based on method, passing the budget, notification sender, and request sender
1077        let result = self.dispatch_method(
1078            &request_cx,
1079            session,
1080            request,
1081            request_id,
1082            &budget,
1083            notification_sender,
1084            request_sender,
1085        );
1086
1087        // Record statistics
1088        let latency = start_time.elapsed();
1089        if let Some(ref stats) = self.stats {
1090            match &result {
1091                Ok(_) => stats.record_request(&method, latency, true),
1092                Err(e) if e.code == fastmcp_core::McpErrorCode::RequestCancelled => {
1093                    stats.record_cancelled(&method, latency);
1094                }
1095                Err(_) => stats.record_request(&method, latency, false),
1096            }
1097        }
1098
1099        // If it's a notification (no ID), we must not reply
1100        if is_notification {
1101            if let Err(e) = result {
1102                fastmcp_core::logging::error!(
1103                    target: targets::HANDLER,
1104                    "Notification '{}' failed: {}",
1105                    method,
1106                    e
1107                );
1108            }
1109            return None;
1110        }
1111
1112        // We only reach here if `is_notification` is false, which implies `id` is present.
1113        // Use `?` to avoid `unwrap()` and keep the control-flow explicit.
1114        let response_id = id.clone()?;
1115
1116        match result {
1117            Ok(value) => Some(JsonRpcResponse::success(response_id, value)),
1118            Err(e) => {
1119                // Log full error before masking if this is an internal error
1120                if self.mask_error_details && e.is_internal() {
1121                    fastmcp_core::logging::error!(
1122                        target: targets::HANDLER,
1123                        "Request '{}' failed (masked in response): {}",
1124                        method,
1125                        e
1126                    );
1127                }
1128
1129                // Apply masking if enabled
1130                let masked = e.masked(self.mask_error_details);
1131                Some(JsonRpcResponse::error(
1132                    id,
1133                    JsonRpcError {
1134                        code: masked.code.into(),
1135                        message: masked.message,
1136                        data: masked.data,
1137                    },
1138                ))
1139            }
1140        }
1141    }
1142
1143    /// Creates a budget for a new request based on server configuration.
1144    fn create_request_budget(&self) -> Budget {
1145        if self.request_timeout_secs == 0 {
1146            // No timeout - unlimited budget
1147            Budget::INFINITE
1148        } else {
1149            // Budget deadlines are absolute (`Time` since runtime epoch). We use `wall_now()`
1150            // so timeouts work even when running outside a full asupersync scheduler.
1151            let now = wall_now();
1152            let timeout_ns = self.request_timeout_secs.saturating_mul(1_000_000_000);
1153            let deadline = now.saturating_add_nanos(timeout_ns);
1154            Budget::new().with_deadline(deadline)
1155        }
1156    }
1157
1158    /// Dispatches a request to the appropriate handler.
1159    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1160    fn dispatch_method(
1161        &self,
1162        cx: &Cx,
1163        session: &mut Session,
1164        request: JsonRpcRequest,
1165        request_id: u64,
1166        budget: &Budget,
1167        notification_sender: &NotificationSender,
1168        request_sender: &bidirectional::RequestSender,
1169    ) -> Result<serde_json::Value, McpError> {
1170        // Check cancellation before dispatch
1171        if cx.is_cancel_requested() {
1172            return Err(McpError::request_cancelled());
1173        }
1174
1175        // Check budget before dispatch (for poll-based exhaustion)
1176        if budget.is_exhausted() {
1177            return Err(McpError::new(
1178                McpErrorCode::RequestCancelled,
1179                "Request budget exhausted",
1180            ));
1181        }
1182        // Deadline exhaustion is time-based and must be checked against current time.
1183        if budget.is_past_deadline(wall_now()) {
1184            cx.cancel_fast(CancelKind::Deadline);
1185            return Err(McpError::new(
1186                McpErrorCode::RequestCancelled,
1187                "Request timeout exceeded",
1188            ));
1189        }
1190
1191        // Check initialization state
1192        if !session.is_initialized() && request.method != "initialize" && request.method != "ping" {
1193            return Err(McpError::invalid_request(
1194                "Server not initialized. Client must send 'initialize' first.",
1195            ));
1196        }
1197
1198        if let Some(task_manager) = &self.task_manager {
1199            task_manager.set_notification_sender(Arc::clone(notification_sender));
1200        }
1201
1202        // Middleware: on_request
1203        // We use a temporary context derived from the request context for middleware
1204        // so they can access session state but share the request's lifecycle.
1205        let mw_ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
1206        let mut entered_middleware: Vec<&dyn crate::Middleware> = Vec::new();
1207
1208        for m in self.middleware.iter() {
1209            entered_middleware.push(m.as_ref());
1210            match m.on_request(&mw_ctx, &request) {
1211                Ok(crate::MiddlewareDecision::Continue) => {}
1212                Ok(crate::MiddlewareDecision::Respond(v)) => {
1213                    return self.apply_middleware_response(
1214                        &entered_middleware,
1215                        &mw_ctx,
1216                        &request,
1217                        v,
1218                    );
1219                }
1220                Err(e) => {
1221                    let err =
1222                        self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e);
1223                    return Err(err);
1224                }
1225            }
1226        }
1227
1228        // Everything after middleware entry must flow through `result` so that:
1229        // - `on_response` runs for successes in reverse middleware order
1230        // - `on_error` runs for *all* errors (including parsing/auth) in reverse middleware order
1231        //
1232        // Without this, `?` would early-return from `dispatch_method` and bypass middleware error
1233        // rewriting, contradicting the ordering semantics documented in `middleware.rs`.
1234        let result: Result<serde_json::Value, McpError> = (|| {
1235            if self.should_authenticate(&request.method) {
1236                let auth_request = AuthRequest {
1237                    method: &request.method,
1238                    params: request.params.as_ref(),
1239                    request_id,
1240                };
1241                self.authenticate_request(cx, request_id, session, auth_request)?;
1242            }
1243
1244            let method = &request.method;
1245            let params = request.params.clone();
1246
1247            // Create bidirectional senders based on client capabilities
1248            let bidirectional_senders = self.create_bidirectional_senders(session, request_sender);
1249
1250            match method.as_str() {
1251                "initialize" => {
1252                    let params: InitializeParams = parse_params(params)?;
1253                    let result = self.router.handle_initialize(
1254                        cx,
1255                        session,
1256                        params,
1257                        self.instructions.as_deref(),
1258                    )?;
1259                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1260                }
1261                "initialized" => {
1262                    // Notification, no response needed (but we send empty ok)
1263                    Ok(serde_json::Value::Null)
1264                }
1265                "notifications/cancelled" => {
1266                    let params: CancelledParams = parse_params(params)?;
1267                    self.handle_cancelled_notification(params);
1268                    Ok(serde_json::Value::Null)
1269                }
1270                "logging/setLevel" => {
1271                    let params: SetLogLevelParams = parse_params(params)?;
1272                    self.handle_set_log_level(session, params);
1273                    Ok(serde_json::Value::Null)
1274                }
1275                "tools/list" => {
1276                    let params: ListToolsParams = parse_params_or_default(params)?;
1277                    let result =
1278                        self.router
1279                            .handle_tools_list(cx, params, Some(session.state()))?;
1280                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1281                }
1282                "tools/call" => {
1283                    let params: CallToolParams = parse_params(params)?;
1284                    let result = self.router.handle_tools_call(
1285                        cx,
1286                        request_id,
1287                        params,
1288                        budget,
1289                        session.state().clone(),
1290                        Some(notification_sender),
1291                        bidirectional_senders.as_ref(),
1292                    )?;
1293                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1294                }
1295                "resources/list" => {
1296                    let params: ListResourcesParams = parse_params_or_default(params)?;
1297                    let result =
1298                        self.router
1299                            .handle_resources_list(cx, params, Some(session.state()))?;
1300                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1301                }
1302                "resources/templates/list" => {
1303                    let params: ListResourceTemplatesParams = parse_params_or_default(params)?;
1304                    let result = self.router.handle_resource_templates_list(
1305                        cx,
1306                        params,
1307                        Some(session.state()),
1308                    )?;
1309                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1310                }
1311                "resources/read" => {
1312                    let params: ReadResourceParams = parse_params(params)?;
1313                    let result = self.router.handle_resources_read(
1314                        cx,
1315                        request_id,
1316                        &params,
1317                        budget,
1318                        session.state().clone(),
1319                        Some(notification_sender),
1320                        bidirectional_senders.as_ref(),
1321                    )?;
1322                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1323                }
1324                "resources/subscribe" => {
1325                    let params: SubscribeResourceParams = parse_params(params)?;
1326                    if !self.router.resource_exists(&params.uri) {
1327                        return Err(McpError::resource_not_found(&params.uri));
1328                    }
1329                    session.subscribe_resource(params.uri);
1330                    Ok(serde_json::json!({}))
1331                }
1332                "resources/unsubscribe" => {
1333                    let params: UnsubscribeResourceParams = parse_params(params)?;
1334                    session.unsubscribe_resource(&params.uri);
1335                    Ok(serde_json::json!({}))
1336                }
1337                "prompts/list" => {
1338                    let params: ListPromptsParams = parse_params_or_default(params)?;
1339                    let result =
1340                        self.router
1341                            .handle_prompts_list(cx, params, Some(session.state()))?;
1342                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1343                }
1344                "prompts/get" => {
1345                    let params: GetPromptParams = parse_params(params)?;
1346                    let result = self.router.handle_prompts_get(
1347                        cx,
1348                        request_id,
1349                        params,
1350                        budget,
1351                        session.state().clone(),
1352                        Some(notification_sender),
1353                        bidirectional_senders.as_ref(),
1354                    )?;
1355                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1356                }
1357                "ping" => {
1358                    // Simple ping-pong for health checks
1359                    Ok(serde_json::json!({}))
1360                }
1361                // Task methods (Docket/SEP-1686)
1362                "tasks/list" => {
1363                    let params: ListTasksParams = parse_params_or_default(params)?;
1364                    let result =
1365                        self.router
1366                            .handle_tasks_list(cx, params, self.task_manager.as_ref())?;
1367                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1368                }
1369                "tasks/get" => {
1370                    let params: GetTaskParams = parse_params(params)?;
1371                    let result =
1372                        self.router
1373                            .handle_tasks_get(cx, params, self.task_manager.as_ref())?;
1374                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1375                }
1376                "tasks/cancel" => {
1377                    let params: CancelTaskParams = parse_params(params)?;
1378                    let result =
1379                        self.router
1380                            .handle_tasks_cancel(cx, params, self.task_manager.as_ref())?;
1381                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1382                }
1383                "tasks/submit" => {
1384                    let params: SubmitTaskParams = parse_params(params)?;
1385                    let result =
1386                        self.router
1387                            .handle_tasks_submit(cx, params, self.task_manager.as_ref())?;
1388                    Ok(serde_json::to_value(result).map_err(McpError::from)?)
1389                }
1390                _ => Err(McpError::method_not_found(method)),
1391            }
1392        })();
1393
1394        let final_result = match result {
1395            Ok(v) => self.apply_middleware_response(&entered_middleware, &mw_ctx, &request, v),
1396            Err(e) => Err(self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e)),
1397        };
1398
1399        self.maybe_emit_log_notification(
1400            session,
1401            notification_sender,
1402            &request.method,
1403            &final_result,
1404        );
1405
1406        final_result
1407    }
1408
1409    fn apply_middleware_response(
1410        &self,
1411        stack: &[&dyn crate::Middleware],
1412        ctx: &McpContext,
1413        request: &JsonRpcRequest,
1414        value: serde_json::Value,
1415    ) -> Result<serde_json::Value, McpError> {
1416        let mut response = value;
1417        for m in stack.iter().rev() {
1418            match m.on_response(ctx, request, response) {
1419                Ok(next) => response = next,
1420                Err(err) => {
1421                    let mapped = self.apply_middleware_error(stack, ctx, request, err);
1422                    return Err(mapped);
1423                }
1424            }
1425        }
1426        Ok(response)
1427    }
1428
1429    fn apply_middleware_error(
1430        &self,
1431        stack: &[&dyn crate::Middleware],
1432        ctx: &McpContext,
1433        request: &JsonRpcRequest,
1434        error: McpError,
1435    ) -> McpError {
1436        let mut err = error;
1437        for m in stack.iter().rev() {
1438            err = m.on_error(ctx, request, err);
1439        }
1440        err
1441    }
1442
1443    /// Creates bidirectional senders based on client capabilities.
1444    ///
1445    /// Returns `Some(BidirectionalSenders)` if the client supports any bidirectional
1446    /// features (sampling, elicitation), or `None` if no features are supported.
1447    fn create_bidirectional_senders(
1448        &self,
1449        session: &Session,
1450        request_sender: &bidirectional::RequestSender,
1451    ) -> Option<handler::BidirectionalSenders> {
1452        let supports_sampling = session.supports_sampling();
1453        let supports_elicitation = session.supports_elicitation();
1454
1455        if !supports_sampling && !supports_elicitation {
1456            return None;
1457        }
1458
1459        let mut senders = handler::BidirectionalSenders::new();
1460
1461        if supports_sampling {
1462            let sampling_sender: Arc<dyn fastmcp_core::SamplingSender> = Arc::new(
1463                bidirectional::TransportSamplingSender::new(request_sender.clone()),
1464            );
1465            senders = senders.with_sampling(sampling_sender);
1466        }
1467
1468        if supports_elicitation {
1469            let elicitation_sender: Arc<dyn fastmcp_core::ElicitationSender> = Arc::new(
1470                bidirectional::TransportElicitationSender::new(request_sender.clone()),
1471            );
1472            senders = senders.with_elicitation(elicitation_sender);
1473        }
1474
1475        Some(senders)
1476    }
1477
1478    fn should_authenticate(&self, method: &str) -> bool {
1479        !matches!(
1480            method,
1481            "initialize" | "initialized" | "notifications/cancelled" | "ping"
1482        )
1483    }
1484
1485    fn authenticate_request(
1486        &self,
1487        cx: &Cx,
1488        request_id: u64,
1489        session: &Session,
1490        request: AuthRequest<'_>,
1491    ) -> Result<AuthContext, McpError> {
1492        let Some(provider) = &self.auth_provider else {
1493            return Ok(AuthContext::anonymous());
1494        };
1495
1496        let ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
1497        let auth = provider.authenticate(&ctx, request)?;
1498        if !ctx.set_auth(auth.clone()) {
1499            debug!(
1500                target: targets::SESSION,
1501                "Auth context not stored (session state unavailable)"
1502            );
1503        }
1504        Ok(auth)
1505    }
1506
1507    fn handle_cancelled_notification(&self, params: CancelledParams) {
1508        let reason = params.reason.as_deref().unwrap_or("unspecified");
1509        let await_cleanup = params.await_cleanup.unwrap_or(false);
1510        info!(
1511            target: targets::SESSION,
1512            "Cancellation requested for requestId={} (reason: {}, await_cleanup={})",
1513            params.request_id,
1514            reason,
1515            await_cleanup
1516        );
1517        let active = {
1518            let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
1519                error!(target: targets::SERVER, "active_requests lock poisoned, recovering");
1520                poisoned.into_inner()
1521            });
1522            guard
1523                .get(&params.request_id)
1524                .map(|entry| (entry.cx.clone(), entry.region_id, entry.completion.clone()))
1525        };
1526        if let Some((cx, region_id, completion)) = active {
1527            cx.cancel_with(CancelKind::User, None);
1528            if await_cleanup {
1529                let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
1530                if !completed {
1531                    fastmcp_core::logging::warn!(
1532                        target: targets::SESSION,
1533                        "await_cleanup timed out for requestId={} (region={:?})",
1534                        params.request_id,
1535                        region_id
1536                    );
1537                }
1538            }
1539        } else {
1540            fastmcp_core::logging::warn!(
1541                target: targets::SESSION,
1542                "No active request found for cancellation requestId={}",
1543                params.request_id
1544            );
1545        }
1546    }
1547
1548    fn cancel_active_requests(&self, kind: CancelKind, await_cleanup: bool) {
1549        let active: Vec<(RequestId, RegionId, Cx, Arc<RequestCompletion>)> = {
1550            let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
1551                error!(target: targets::SERVER, "active_requests lock poisoned in cancel_active_requests, recovering");
1552                poisoned.into_inner()
1553            });
1554            guard
1555                .iter()
1556                .map(|(request_id, entry)| {
1557                    (
1558                        request_id.clone(),
1559                        entry.region_id,
1560                        entry.cx.clone(),
1561                        entry.completion.clone(),
1562                    )
1563                })
1564                .collect()
1565        };
1566        if active.is_empty() {
1567            return;
1568        }
1569        info!(
1570            target: targets::SESSION,
1571            "Cancelling {} active request(s) (kind={:?}, await_cleanup={})",
1572            active.len(),
1573            kind,
1574            await_cleanup
1575        );
1576        for (_, _, cx, _) in &active {
1577            cx.cancel_with(kind, None);
1578        }
1579
1580        if await_cleanup {
1581            for (request_id, region_id, _cx, completion) in active {
1582                let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
1583                if !completed {
1584                    fastmcp_core::logging::warn!(
1585                        target: targets::SESSION,
1586                        "Shutdown cancel timed out for requestId={} (region={:?})",
1587                        request_id,
1588                        region_id
1589                    );
1590                }
1591            }
1592        }
1593    }
1594
1595    fn handle_set_log_level(&self, session: &mut Session, params: SetLogLevelParams) {
1596        let requested = match params.level {
1597            LogLevel::Debug => LevelFilter::Debug,
1598            LogLevel::Info => LevelFilter::Info,
1599            LogLevel::Warning => LevelFilter::Warn,
1600            LogLevel::Error => LevelFilter::Error,
1601        };
1602
1603        let configured = self.logging.level.to_level_filter();
1604        let effective = if requested > configured {
1605            configured
1606        } else {
1607            requested
1608        };
1609
1610        log::set_max_level(effective);
1611
1612        let effective_level = match effective {
1613            LevelFilter::Debug => LogLevel::Debug,
1614            LevelFilter::Info => LogLevel::Info,
1615            LevelFilter::Warn => LogLevel::Warning,
1616            LevelFilter::Error => LogLevel::Error,
1617            _ => LogLevel::Info,
1618        };
1619        session.set_log_level(effective_level);
1620
1621        if effective != requested {
1622            fastmcp_core::logging::warn!(
1623                target: targets::SESSION,
1624                "Client requested log level {:?}; clamped to server level {:?}",
1625                params.level,
1626                effective
1627            );
1628        } else {
1629            info!(
1630                target: targets::SESSION,
1631                "Log level set to {:?}",
1632                params.level
1633            );
1634        }
1635    }
1636
1637    fn log_level_rank(level: LogLevel) -> u8 {
1638        match level {
1639            LogLevel::Debug => 1,
1640            LogLevel::Info => 2,
1641            LogLevel::Warning => 3,
1642            LogLevel::Error => 4,
1643        }
1644    }
1645
1646    fn emit_log_notification(
1647        &self,
1648        session: &Session,
1649        sender: &NotificationSender,
1650        level: LogLevel,
1651        message: impl Into<String>,
1652    ) {
1653        let Some(min_level) = session.log_level() else {
1654            return;
1655        };
1656        if Self::log_level_rank(level) < Self::log_level_rank(min_level) {
1657            return;
1658        }
1659
1660        let ts = chrono::Utc::now().to_rfc3339();
1661        let text = format!("{ts} {}", message.into());
1662        let params = LogMessageParams {
1663            level,
1664            logger: Some("fastmcp_rust::server".to_string()),
1665            data: serde_json::Value::String(text),
1666        };
1667        let payload = match serde_json::to_value(params) {
1668            Ok(value) => value,
1669            Err(err) => {
1670                fastmcp_core::logging::warn!(
1671                    target: targets::SESSION,
1672                    "Failed to serialize log message notification: {}",
1673                    err
1674                );
1675                return;
1676            }
1677        };
1678        sender(JsonRpcRequest::notification(
1679            "notifications/message",
1680            Some(payload),
1681        ));
1682    }
1683
1684    fn maybe_emit_log_notification(
1685        &self,
1686        session: &Session,
1687        sender: &NotificationSender,
1688        method: &str,
1689        result: &McpResult<serde_json::Value>,
1690    ) {
1691        if method.starts_with("notifications/") || method == "logging/setLevel" {
1692            return;
1693        }
1694        let level = if result.is_ok() {
1695            LogLevel::Info
1696        } else {
1697            LogLevel::Error
1698        };
1699        let message = if result.is_ok() {
1700            format!("Handled {}", method)
1701        } else {
1702            format!("Error handling {}", method)
1703        };
1704        self.emit_log_notification(session, sender, level, message);
1705    }
1706}
1707
/// Maximum time the cancellation paths block waiting for cancelled requests to
/// signal completion before they give up and log a warning.
const AWAIT_CLEANUP_TIMEOUT: Duration = Duration::from_secs(5);
1709
/// One-shot completion latch shared between a running request and anyone
/// waiting for it to finish.
///
/// The flag only ever moves from `false` to `true`. A poisoned mutex is
/// recovered rather than propagated, so none of the methods panic on poison.
struct RequestCompletion {
    /// `true` once `mark_done` has been called.
    done: Mutex<bool>,
    /// Notified when `done` flips to `true`.
    cv: Condvar,
}

impl RequestCompletion {
    /// Creates a latch in the not-done state.
    fn new() -> Self {
        Self {
            done: Mutex::new(false),
            cv: Condvar::new(),
        }
    }

    /// Locks the flag, recovering the guard if the mutex was poisoned.
    fn lock_done(&self) -> std::sync::MutexGuard<'_, bool> {
        self.done
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Marks the request as finished and wakes every waiter.
    ///
    /// Calling this more than once is harmless; only the first call notifies.
    fn mark_done(&self) {
        let mut done = self.lock_done();
        if !*done {
            *done = true;
            self.cv.notify_all();
        }
    }

    /// Blocks until the latch is done or `timeout` has elapsed.
    ///
    /// Returns `true` if the latch is done by the time this returns (including
    /// when it was already done on entry) and `false` on timeout.
    fn wait_timeout(&self, timeout: Duration) -> bool {
        // `wait_timeout_while` re-checks the predicate after every wakeup and
        // tracks the remaining time itself, so spurious wakeups neither return
        // early nor extend the total wait.
        let (done, _timed_out) = self
            .cv
            .wait_timeout_while(self.lock_done(), timeout, |done| !*done)
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *done
    }

    /// Reports whether `mark_done` has been called, without blocking on the
    /// condition variable.
    fn is_done(&self) -> bool {
        *self.lock_done()
    }
}
1772
1773struct ActiveRequest {
1774    cx: Cx,
1775    region_id: RegionId,
1776    completion: Arc<RequestCompletion>,
1777}
1778
1779impl ActiveRequest {
1780    fn new(cx: Cx, completion: Arc<RequestCompletion>) -> Self {
1781        let region_id = cx.region_id();
1782        Self {
1783            cx,
1784            region_id,
1785            completion,
1786        }
1787    }
1788}
1789
1790struct ActiveRequestGuard<'a> {
1791    map: &'a Mutex<HashMap<RequestId, ActiveRequest>>,
1792    id: RequestId,
1793    completion: Arc<RequestCompletion>,
1794}
1795
1796impl<'a> ActiveRequestGuard<'a> {
1797    fn new(map: &'a Mutex<HashMap<RequestId, ActiveRequest>>, id: RequestId, cx: Cx) -> Self {
1798        let completion = Arc::new(RequestCompletion::new());
1799        let entry = ActiveRequest::new(cx, completion.clone());
1800        let mut guard = map
1801            .lock()
1802            .unwrap_or_else(std::sync::PoisonError::into_inner);
1803        if guard.insert(id.clone(), entry).is_some() {
1804            fastmcp_core::logging::warn!(
1805                target: targets::SESSION,
1806                "Active request replaced for requestId={}",
1807                id
1808            );
1809        }
1810        Self {
1811            map,
1812            id,
1813            completion,
1814        }
1815    }
1816}
1817
1818impl Drop for ActiveRequestGuard<'_> {
1819    fn drop(&mut self) {
1820        {
1821            let mut guard = self
1822                .map
1823                .lock()
1824                .unwrap_or_else(std::sync::PoisonError::into_inner);
1825            match guard.get(&self.id) {
1826                Some(entry) if Arc::ptr_eq(&entry.completion, &self.completion) => {
1827                    guard.remove(&self.id);
1828                }
1829                Some(_) => {
1830                    fastmcp_core::logging::warn!(
1831                        target: targets::SESSION,
1832                        "Active request replaced before drop for requestId={}",
1833                        self.id
1834                    );
1835                }
1836                None => {
1837                    fastmcp_core::logging::warn!(
1838                        target: targets::SESSION,
1839                        "Active request missing on drop for requestId={}",
1840                        self.id
1841                    );
1842                }
1843            }
1844        }
1845        self.completion.mark_done();
1846    }
1847}
1848
/// Reports whether the `FASTMCP_NO_BANNER` environment variable asks for the
/// banner to be hidden.
///
/// The value is compared case-insensitively against `1`, `true` and `yes`.
/// An unset or unreadable variable counts as "not suppressed".
///
/// This is a legacy check. Prefer using `ConsoleConfig` for banner control.
fn banner_suppressed() -> bool {
    match std::env::var("FASTMCP_NO_BANNER") {
        Ok(raw) => {
            let normalized = raw.to_lowercase();
            ["1", "true", "yes"].contains(&normalized.as_str())
        }
        Err(_) => false,
    }
}
1857
1858/// Parses required parameters from JSON.
1859fn parse_params<T: serde::de::DeserializeOwned>(
1860    params: Option<serde_json::Value>,
1861) -> Result<T, McpError> {
1862    let value = params.ok_or_else(|| McpError::invalid_params("Missing required parameters"))?;
1863    serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
1864}
1865
1866/// Parses optional parameters from JSON, using default if not provided.
1867fn parse_params_or_default<T: serde::de::DeserializeOwned + Default>(
1868    params: Option<serde_json::Value>,
1869) -> Result<T, McpError> {
1870    match params {
1871        Some(value) => {
1872            serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
1873        }
1874        None => Ok(T::default()),
1875    }
1876}
1877
1878/// Converts a JSON-RPC RequestId to a u64 for internal tracking.
1879///
1880/// If the ID is a number, uses that number. If it's a string or absent,
1881/// uses a stable hash (string) or 0 (absent) as a fallback.
1882fn request_id_to_u64(id: Option<&RequestId>) -> u64 {
1883    match id {
1884        Some(RequestId::Number(n)) => *n as u64,
1885        Some(RequestId::String(s)) => stable_hash_request_id(s),
1886        None => 0,
1887    }
1888}
1889
/// Hashes a string request id with 64-bit FNV-1a.
///
/// The result is stable across runs and never 0: a hash of exactly 0 is
/// replaced by the FNV offset basis.
fn stable_hash_request_id(value: &str) -> u64 {
    const FNV_OFFSET: u64 = 0xcbf29ce484222325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = value.bytes().fold(FNV_OFFSET, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    match digest {
        0 => FNV_OFFSET,
        nonzero => nonzero,
    }
}
1900
/// Cloneable handle to a transport guarded by a mutex.
struct SharedTransport<T> {
    /// The wrapped transport; every clone points at the same instance.
    inner: Arc<Mutex<T>>,
}

// Implemented by hand so that cloning the handle does not require `T: Clone`.
impl<T> Clone for SharedTransport<T> {
    /// Returns another handle to the same underlying transport.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
1912
1913impl<T: Transport> SharedTransport<T> {
1914    fn new(transport: T) -> Self {
1915        Self {
1916            inner: Arc::new(Mutex::new(transport)),
1917        }
1918    }
1919
1920    fn recv(&self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
1921        let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
1922        guard.recv(cx)
1923    }
1924
1925    fn send(&self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
1926        let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
1927        guard.send(cx, message)
1928    }
1929}
1930
1931fn transport_lock_error() -> TransportError {
1932    TransportError::Io(std::io::Error::other("transport lock poisoned"))
1933}
1934
1935fn create_transport_notification_sender<T>(transport: SharedTransport<T>) -> NotificationSender
1936where
1937    T: Transport + Send + 'static,
1938{
1939    let cx = Cx::for_request();
1940
1941    Arc::new(move |request: JsonRpcRequest| {
1942        let message = JsonRpcMessage::Request(request);
1943        if let Err(e) = transport.send(&cx, &message) {
1944            log::error!(
1945                target: targets::TRANSPORT,
1946                "Failed to send notification: {}",
1947                e
1948            );
1949        }
1950    })
1951}
1952
1953/// Creates a notification sender that writes JSON-RPC notifications to stdout.
1954///
1955/// This creates a separate stdout handle for sending notifications, allowing
1956/// notifications (like progress updates) to be sent during handler execution
1957/// independently of the main transport.
1958///
1959/// The sender uses NDJSON format (newline-delimited JSON) to match the
1960/// standard MCP transport format.
1961fn create_notification_sender() -> NotificationSender {
1962    use std::sync::Mutex;
1963
1964    // Use AsyncStdout so notifications share the global stdout lock used by
1965    // the transport writer, preventing interleaved NDJSON writes.
1966    let stdout = Mutex::new(AsyncStdout::new());
1967    let codec = Codec::new();
1968
1969    Arc::new(move |request: JsonRpcRequest| {
1970        let bytes = match codec.encode_request(&request) {
1971            Ok(b) => b,
1972            Err(e) => {
1973                log::error!(target: targets::SERVER, "Failed to encode notification: {}", e);
1974                return;
1975            }
1976        };
1977
1978        if let Ok(mut stdout) = stdout.lock() {
1979            if let Err(e) = stdout.write_all_unchecked(&bytes) {
1980                log::error!(target: targets::TRANSPORT, "Failed to send notification: {}", e);
1981            }
1982            if let Err(e) = stdout.flush_unchecked() {
1983                log::error!(target: targets::TRANSPORT, "Failed to flush notification: {}", e);
1984            }
1985        } else {
1986            log::warn!(target: targets::SERVER, "Failed to acquire stdout lock for notification");
1987        }
1988    })
1989}
1990
/// Unit tests for the private helpers defined in this file.
#[cfg(test)]
mod lib_unit_tests {
    use super::*;

    // --- parse_params ---------------------------------------------------

    #[test]
    fn parse_params_none_returns_error() {
        let err = parse_params::<serde_json::Value>(None).unwrap_err();
        assert!(err.message.contains("Missing required parameters"));
    }

    #[test]
    fn parse_params_invalid_json_returns_error() {
        // A bare string cannot be deserialized into a params struct.
        let bogus = serde_json::json!("not_an_object");
        assert!(parse_params::<ListToolsParams>(Some(bogus)).is_err());
    }

    #[test]
    fn parse_params_valid_json_succeeds() {
        let raw = serde_json::json!({"uri": "x://y"});
        let parsed: ReadResourceParams = parse_params(Some(raw)).unwrap();
        assert_eq!(parsed.uri, "x://y");
    }

    // --- parse_params_or_default ----------------------------------------

    #[test]
    fn parse_params_or_default_none_returns_default() {
        let parsed: ListToolsParams = parse_params_or_default(None).unwrap();
        assert!(parsed.cursor.is_none());
    }

    #[test]
    fn parse_params_or_default_invalid_json_returns_error() {
        let bogus = serde_json::json!("bad_input");
        assert!(parse_params_or_default::<ListToolsParams>(Some(bogus)).is_err());
    }

    #[test]
    fn parse_params_or_default_valid_json_succeeds() {
        let raw = serde_json::json!({"cursor": "abc"});
        let parsed: ListToolsParams = parse_params_or_default(Some(raw)).unwrap();
        assert_eq!(parsed.cursor.as_deref(), Some("abc"));
    }

    // --- request_id_to_u64 ----------------------------------------------

    #[test]
    fn request_id_to_u64_number() {
        assert_eq!(request_id_to_u64(Some(&RequestId::Number(42))), 42);
    }

    #[test]
    fn request_id_to_u64_string() {
        let id = RequestId::String("req-123".to_string());
        assert_ne!(request_id_to_u64(Some(&id)), 0);
    }

    #[test]
    fn request_id_to_u64_none() {
        assert_eq!(request_id_to_u64(None), 0);
    }

    // --- stable_hash_request_id -----------------------------------------

    #[test]
    fn stable_hash_is_deterministic() {
        assert_eq!(
            stable_hash_request_id("test"),
            stable_hash_request_id("test")
        );
    }

    #[test]
    fn stable_hash_never_returns_zero() {
        // Neither the empty string nor a short input may hash to 0.
        for input in ["", "a"] {
            assert_ne!(stable_hash_request_id(input), 0);
        }
    }

    #[test]
    fn stable_hash_different_inputs_differ() {
        assert_ne!(
            stable_hash_request_id("alpha"),
            stable_hash_request_id("beta")
        );
    }

    // --- RequestCompletion ----------------------------------------------

    #[test]
    fn request_completion_new_is_not_done() {
        assert!(!RequestCompletion::new().is_done());
    }

    #[test]
    fn request_completion_mark_done_sets_done() {
        let latch = RequestCompletion::new();
        latch.mark_done();
        assert!(latch.is_done());
    }

    #[test]
    fn request_completion_mark_done_idempotent() {
        let latch = RequestCompletion::new();
        // The second call must be a harmless no-op.
        latch.mark_done();
        latch.mark_done();
        assert!(latch.is_done());
    }

    #[test]
    fn request_completion_wait_timeout_returns_true_if_done() {
        let latch = RequestCompletion::new();
        latch.mark_done();
        assert!(latch.wait_timeout(Duration::from_millis(10)));
    }

    #[test]
    fn request_completion_wait_timeout_returns_false_if_not_done() {
        let latch = RequestCompletion::new();
        assert!(!latch.wait_timeout(Duration::from_millis(10)));
    }

    // --- DuplicateBehavior ----------------------------------------------

    #[test]
    fn duplicate_behavior_default_is_warn() {
        assert_eq!(DuplicateBehavior::default(), DuplicateBehavior::Warn);
    }

    #[test]
    fn duplicate_behavior_debug_and_clone() {
        let original = DuplicateBehavior::Error;
        assert!(format!("{:?}", original).contains("Error"));
        let copied = original;
        assert_eq!(copied, DuplicateBehavior::Error);
    }

    #[test]
    fn duplicate_behavior_all_variants_are_distinct() {
        assert_ne!(DuplicateBehavior::Error, DuplicateBehavior::Warn);
        assert_ne!(DuplicateBehavior::Warn, DuplicateBehavior::Replace);
        assert_ne!(DuplicateBehavior::Replace, DuplicateBehavior::Ignore);
    }

    // --- LoggingConfig --------------------------------------------------

    #[test]
    fn logging_config_default_values() {
        let defaults = LoggingConfig::default();
        assert_eq!(defaults.level, Level::Info);
        assert!(defaults.timestamps);
        assert!(defaults.targets);
        assert!(!defaults.file_line);
    }

    // --- LifespanHooks --------------------------------------------------

    #[test]
    fn lifespan_hooks_new_has_no_hooks() {
        let hooks = LifespanHooks::new();
        assert!(hooks.on_startup.is_none());
        assert!(hooks.on_shutdown.is_none());
    }

    // --- log_level_rank -------------------------------------------------

    #[test]
    fn log_level_rank_ordering() {
        // Listed from least to most severe; ranks must be strictly increasing.
        let ranks = [
            LogLevel::Debug,
            LogLevel::Info,
            LogLevel::Warning,
            LogLevel::Error,
        ]
        .map(Server::log_level_rank);
        assert!(ranks.windows(2).all(|pair| pair[0] < pair[1]));
    }

    // --- ActiveRequestGuard ---------------------------------------------

    #[test]
    fn active_request_guard_removes_on_drop() {
        let active = Mutex::new(HashMap::new());
        let guard = ActiveRequestGuard::new(&active, RequestId::Number(1), Cx::for_testing());
        assert_eq!(active.lock().unwrap().len(), 1);

        // Dropping the guard must unregister the entry.
        drop(guard);
        assert_eq!(active.lock().unwrap().len(), 0);
    }

    // =====================================================================
    // Additional coverage tests (bd-cd79)
    // =====================================================================

    #[test]
    fn logging_config_debug_and_clone() {
        let config = LoggingConfig::default();
        let rendered = format!("{config:?}");
        assert!(rendered.contains("LoggingConfig"));
        assert!(rendered.contains("Info"));

        let copy = config.clone();
        assert_eq!(copy.level, Level::Info);
        assert_eq!(copy.timestamps, config.timestamps);
    }

    #[test]
    fn transport_lock_error_is_io() {
        match transport_lock_error() {
            TransportError::Io(io) => assert!(io.to_string().contains("poisoned")),
            other => panic!("expected Io variant, got: {other:?}"),
        }
    }

    #[test]
    fn lifespan_hooks_default_matches_new() {
        for hooks in [LifespanHooks::default(), LifespanHooks::new()] {
            assert!(hooks.on_startup.is_none());
            assert!(hooks.on_shutdown.is_none());
        }
    }

    #[test]
    fn request_completion_wait_resolves_on_concurrent_done() {
        let latch = RequestCompletion::new();

        std::thread::scope(|scope| {
            scope.spawn(|| {
                std::thread::sleep(Duration::from_millis(20));
                latch.mark_done();
            });

            // Should resolve within the timeout because the other thread marks done
            assert!(latch.wait_timeout(Duration::from_secs(2)));
        });
    }

    #[test]
    fn active_request_stores_region_id() {
        let cx = Cx::for_testing();
        let expected_region = cx.region_id();
        let entry = ActiveRequest::new(cx, Arc::new(RequestCompletion::new()));
        assert_eq!(entry.region_id, expected_region);
    }
}