Skip to main content

fastmcp_server/
lib.rs

1//! MCP server implementation for FastMCP.
2//!
3//! This crate provides the server-side implementation:
4//! - Server builder pattern
5//! - Tool, resource, and prompt registration
6//! - Request routing and dispatching
7//! - Session management
8//!
9//! # Example
10//!
11//! ```ignore
12//! use fastmcp::prelude::*;
13//!
14//! #[tool]
15//! async fn greet(ctx: &McpContext, name: String) -> String {
16//!     format!("Hello, {name}!")
17//! }
18//!
19//! fn main() {
20//!     Server::new("my-server", "1.0.0")
21//!         .tool(greet)
22//!         .run_stdio();
23//! }
24//! ```
25//!
26//! # Role in the System
27//!
28//! `fastmcp-server` is the **execution engine** for MCP servers. It ties
29//! together:
30//! - Protocol types (`fastmcp-protocol`) for requests and responses
31//! - Transports (`fastmcp-transport`) for stdio/SSE/WebSocket I/O
32//! - Core context + cancellation (`fastmcp-core`) for budgets and checkpoints
33//! - Console output (`fastmcp-console`) for human-friendly stderr rendering
34//!
35//! The façade crate `fastmcp` re-exports this API, so most users interact with
36//! `Server` via `fastmcp::prelude::*`.
37
38#![forbid(unsafe_code)]
39#![allow(dead_code)]
40
41mod auth;
42pub mod bidirectional;
43mod builder;
44pub mod caching;
45pub mod docket;
46mod handler;
47mod middleware;
48pub mod oauth;
49pub mod oidc;
50pub mod providers;
51mod proxy;
52pub mod rate_limiting;
53mod router;
54mod session;
55mod tasks;
56pub mod transform;
57
58#[cfg(test)]
59mod tests;
60
61#[cfg(feature = "jwt")]
62pub use auth::JwtTokenVerifier;
63pub use auth::{
64    AllowAllAuthProvider, AuthProvider, AuthRequest, StaticTokenVerifier, TokenAuthProvider,
65    TokenVerifier,
66};
67pub use builder::ServerBuilder;
68pub use fastmcp_console::config::{BannerStyle, ConsoleConfig, TrafficVerbosity};
69pub use fastmcp_console::stats::{ServerStats, StatsSnapshot};
70pub use handler::{
71    BidirectionalSenders, BoxFuture, ProgressNotificationSender, PromptHandler, ResourceHandler,
72    ToolHandler, create_context_with_progress, create_context_with_progress_and_senders,
73};
74pub use middleware::{Middleware, MiddlewareDecision};
75pub use proxy::{ProxyBackend, ProxyCatalog, ProxyClient};
76pub use router::{
77    MountResult, NotificationSender, Router, RouterResourceReader, RouterToolCaller, TagFilters,
78};
79pub use session::Session;
80pub use tasks::{SharedTaskManager, TaskManager};
81
82// Re-export bidirectional communication types
83pub use bidirectional::{
84    PendingRequests, RequestSender, TransportElicitationSender, TransportRootsProvider,
85    TransportSamplingSender,
86};
87
88use std::collections::HashMap;
89use std::io::{Read, Write};
90use std::sync::{Arc, Condvar, Mutex};
91use std::time::{Duration, Instant};
92
93use asupersync::{Budget, CancelKind, Cx, RegionId};
94use fastmcp_console::client::RequestResponseRenderer;
95use fastmcp_console::logging::RichLoggerBuilder;
96use fastmcp_console::{banner::StartupBanner, console};
97use fastmcp_core::logging::{debug, error, info, targets};
98use fastmcp_core::{AuthContext, McpContext, McpError, McpErrorCode, McpResult};
99use fastmcp_protocol::{
100    CallToolParams, CancelTaskParams, CancelledParams, GetPromptParams, GetTaskParams,
101    InitializeParams, JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse,
102    ListPromptsParams, ListResourceTemplatesParams, ListResourcesParams, ListTasksParams,
103    ListToolsParams, LogLevel, LogMessageParams, Prompt, ReadResourceParams, RequestId, Resource,
104    ResourceTemplate, ServerCapabilities, ServerInfo, SetLogLevelParams, SubmitTaskParams,
105    SubscribeResourceParams, Tool, UnsubscribeResourceParams,
106};
107use fastmcp_transport::sse::SseServerTransport;
108use fastmcp_transport::websocket::WsTransport;
109use fastmcp_transport::{AsyncStdout, Codec, StdioTransport, Transport, TransportError};
110use log::{Level, LevelFilter};
111
/// Boxed one-shot callback executed at startup.
///
/// Returning `Err` signals that startup failed; the error only needs to be
/// a thread-safe `std::error::Error`.
pub type StartupHook =
    Box<dyn FnOnce() -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send>;

/// Boxed one-shot callback executed at shutdown. It cannot fail.
pub type ShutdownHook = Box<dyn FnOnce() + Send>;

/// Optional callbacks tied to the server lifecycle.
///
/// Two extension points are available:
///
/// - `on_startup`: runs before the server starts accepting connections
/// - `on_shutdown`: runs while the server is shutting down
///
/// Both hooks are `FnOnce`, so each can be invoked at most one time.
///
/// # Example
///
/// ```ignore
/// use fastmcp::prelude::*;
///
/// Server::new("demo", "1.0.0")
///     .on_startup(|| {
///         println!("Initializing...");
///         // Initialize database, caches, etc.
///         Ok(())
///     })
///     .on_shutdown(|| {
///         println!("Cleaning up...");
///         // Close connections, flush buffers, etc.
///     })
///     .run_stdio();
/// ```
#[derive(Default)]
pub struct LifespanHooks {
    /// Callback to run before the server starts accepting connections.
    pub on_startup: Option<StartupHook>,
    /// Callback to run when the server is shutting down.
    pub on_shutdown: Option<ShutdownHook>,
}

impl LifespanHooks {
    /// Builds a hook set with neither callback configured.
    ///
    /// Equivalent to [`LifespanHooks::default`].
    #[must_use]
    pub fn new() -> Self {
        Self {
            on_startup: None,
            on_shutdown: None,
        }
    }
}
159
160/// Logging configuration for the server.
161#[derive(Debug, Clone)]
162pub struct LoggingConfig {
163    /// Minimum log level (default: INFO).
164    pub level: Level,
165    /// Show timestamps in logs (default: true).
166    pub timestamps: bool,
167    /// Show module targets in logs (default: true).
168    pub targets: bool,
169    /// Show file:line in logs (default: false).
170    pub file_line: bool,
171}
172
173impl Default for LoggingConfig {
174    fn default() -> Self {
175        Self {
176            level: Level::Info,
177            timestamps: true,
178            targets: true,
179            file_line: false,
180        }
181    }
182}
183
184impl LoggingConfig {
185    /// Create logging config from environment variables.
186    ///
187    /// Respects:
188    /// - `FASTMCP_LOG`: Log level (error, warn, info, debug, trace)
189    /// - `FASTMCP_LOG_TIMESTAMPS`: Show timestamps (0/false to disable)
190    /// - `FASTMCP_LOG_TARGETS`: Show targets (0/false to disable)
191    /// - `FASTMCP_LOG_FILE_LINE`: Show file:line (1/true to enable)
192    #[must_use]
193    pub fn from_env() -> Self {
194        let level = std::env::var("FASTMCP_LOG")
195            .ok()
196            .and_then(|s| match s.to_lowercase().as_str() {
197                "error" => Some(Level::Error),
198                "warn" | "warning" => Some(Level::Warn),
199                "info" => Some(Level::Info),
200                "debug" => Some(Level::Debug),
201                "trace" => Some(Level::Trace),
202                _ => None,
203            })
204            .unwrap_or(Level::Info);
205
206        let timestamps = std::env::var("FASTMCP_LOG_TIMESTAMPS")
207            .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
208            .unwrap_or(true);
209
210        let targets = std::env::var("FASTMCP_LOG_TARGETS")
211            .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
212            .unwrap_or(true);
213
214        let file_line = std::env::var("FASTMCP_LOG_FILE_LINE")
215            .map(|s| matches!(s.to_lowercase().as_str(), "1" | "true" | "yes"))
216            .unwrap_or(false);
217
218        Self {
219            level,
220            timestamps,
221            targets,
222            file_line,
223        }
224    }
225}
226
/// Policy applied when a component is registered under a name that is
/// already taken.
///
/// The same policy covers duplicate tool, resource, and prompt names.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DuplicateBehavior {
    /// Fail the registration with an error.
    ///
    /// Suited to production setups that want strict validation.
    Error,

    /// Keep the component registered first and log a warning.
    ///
    /// This is the default: duplicates stay visible without breaking
    /// existing setups.
    #[default]
    Warn,

    /// Let the new component take the place of the original one.
    ///
    /// Suited to setups where later registrations should win.
    Replace,

    /// Keep the component registered first and say nothing.
    ///
    /// Suited to setups where duplicates are expected.
    Ignore,
}
255
/// An MCP server instance.
///
/// Servers are built using [`ServerBuilder`] and can run on various
/// transports (stdio, SSE, WebSocket).
pub struct Server {
    /// Server identity; its name and version are shown in the startup
    /// banner and a copy is handed to each new session.
    info: ServerInfo,
    /// Capabilities of this server; a copy is handed to each new session.
    capabilities: ServerCapabilities,
    /// Router holding the registered tools, resources, and prompts.
    router: Router,
    /// Optional instructions text; when non-empty it is used as the
    /// description line of the startup banner.
    instructions: Option<String>,
    /// Request timeout in seconds (0 = no timeout).
    request_timeout_secs: u64,
    /// Runtime statistics collector (None = disabled).
    stats: Option<ServerStats>,
    /// Whether to mask internal error details in responses.
    mask_error_details: bool,
    /// Logging configuration.
    logging: LoggingConfig,
    /// Console configuration for rich output.
    console_config: ConsoleConfig,
    /// Lifecycle hooks (wrapped in Option so they can be taken once).
    lifespan: Mutex<Option<LifespanHooks>>,
    /// Optional authentication provider.
    auth_provider: Option<Arc<dyn AuthProvider>>,
    /// Registered middleware.
    middleware: Arc<Vec<Box<dyn crate::Middleware>>>,
    /// Active requests by JSON-RPC request ID.
    active_requests: Mutex<HashMap<RequestId, ActiveRequest>>,
    /// Optional task manager for background tasks (Docket/SEP-1686).
    task_manager: Option<SharedTaskManager>,
    /// Pending server-to-client requests (for bidirectional communication).
    pending_requests: Arc<bidirectional::PendingRequests>,
}
288
289impl Server {
290    /// Creates a new server builder.
291    #[must_use]
292    #[allow(clippy::new_ret_no_self)]
293    pub fn new(name: impl Into<String>, version: impl Into<String>) -> ServerBuilder {
294        ServerBuilder::new(name, version)
295    }
296
297    /// Returns the server info.
298    #[must_use]
299    pub fn info(&self) -> &ServerInfo {
300        &self.info
301    }
302
303    /// Returns the server capabilities.
304    #[must_use]
305    pub fn capabilities(&self) -> &ServerCapabilities {
306        &self.capabilities
307    }
308
309    /// Lists all registered tools.
310    #[must_use]
311    pub fn tools(&self) -> Vec<Tool> {
312        self.router.tools()
313    }
314
315    /// Lists all registered resources.
316    #[must_use]
317    pub fn resources(&self) -> Vec<Resource> {
318        self.router.resources()
319    }
320
321    /// Lists all registered resource templates.
322    #[must_use]
323    pub fn resource_templates(&self) -> Vec<ResourceTemplate> {
324        self.router.resource_templates()
325    }
326
327    /// Lists all registered prompts.
328    #[must_use]
329    pub fn prompts(&self) -> Vec<Prompt> {
330        self.router.prompts()
331    }
332
333    /// Returns the task manager, if configured.
334    ///
335    /// Returns `None` if background tasks are not enabled.
336    #[must_use]
337    pub fn task_manager(&self) -> Option<&SharedTaskManager> {
338        self.task_manager.as_ref()
339    }
340
341    /// Consumes the server and returns its router.
342    ///
343    /// This is used for mounting one server's components into another.
344    #[must_use]
345    pub fn into_router(self) -> Router {
346        self.router
347    }
348
349    /// Returns the capabilities this server provides.
350    ///
351    /// This is useful when determining what components a server has
352    /// before mounting.
353    #[must_use]
354    pub fn has_tools(&self) -> bool {
355        self.capabilities.tools.is_some()
356    }
357
358    /// Returns whether this server has resources.
359    #[must_use]
360    pub fn has_resources(&self) -> bool {
361        self.capabilities.resources.is_some()
362    }
363
364    /// Returns whether this server has prompts.
365    #[must_use]
366    pub fn has_prompts(&self) -> bool {
367        self.capabilities.prompts.is_some()
368    }
369
370    /// Returns a point-in-time snapshot of server statistics.
371    ///
372    /// Returns `None` if statistics collection is disabled.
373    #[must_use]
374    pub fn stats(&self) -> Option<StatsSnapshot> {
375        self.stats.as_ref().map(ServerStats::snapshot)
376    }
377
378    /// Returns the raw statistics collector.
379    ///
380    /// Useful for advanced scenarios where you need direct access.
381    /// Returns `None` if statistics collection is disabled.
382    #[must_use]
383    pub fn stats_collector(&self) -> Option<&ServerStats> {
384        self.stats.as_ref()
385    }
386
387    /// Renders a stats panel to stderr, if stats are enabled.
388    pub fn display_stats(&self) {
389        let Some(stats) = self.stats.as_ref() else {
390            return;
391        };
392
393        let snapshot = stats.snapshot();
394        let renderer = fastmcp_console::stats::StatsRenderer::detect();
395        renderer.render_panel(&snapshot, console());
396    }
397
398    /// Returns the console configuration.
399    #[must_use]
400    pub fn console_config(&self) -> &ConsoleConfig {
401        &self.console_config
402    }
403
404    /// Renders the startup banner based on console configuration.
405    fn render_startup_banner(&self) {
406        let render = || {
407            let mut banner = StartupBanner::new(&self.info.name, &self.info.version)
408                .tools(self.router.tools_count())
409                .resources(self.router.resources_count())
410                .prompts(self.router.prompts_count())
411                .transport("stdio");
412
413            if let Some(desc) = self.instructions.as_deref().filter(|d| !d.is_empty()) {
414                banner = banner.description(desc);
415            }
416
417            // Apply banner style from config
418            match self.console_config.banner_style {
419                BannerStyle::Full => banner.render(console()),
420                BannerStyle::Compact | BannerStyle::Minimal => {
421                    // Compact/Minimal: render without the large logo
422                    banner.no_logo().render(console());
423                }
424                BannerStyle::None => {} // Already checked show_banner, but be safe
425            }
426        };
427
428        if let Err(err) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(render)) {
429            eprintln!("Warning: banner rendering failed: {err:?}");
430        }
431    }
432
433    /// Initializes rich logging based on server configuration.
434    ///
435    /// This should be called early in the startup sequence, before any
436    /// log output is generated. If initialization fails (e.g., logger
437    /// already set), a warning is printed to stderr.
438    fn init_rich_logging(&self) {
439        let result = RichLoggerBuilder::new()
440            .level(self.logging.level)
441            .with_timestamps(self.logging.timestamps)
442            .with_targets(self.logging.targets)
443            .with_file_line(self.logging.file_line)
444            .init();
445
446        if let Err(e) = result {
447            // Logger already initialized (likely by user code), not an error
448            eprintln!("Note: Rich logging not initialized (logger already set): {e}");
449        }
450    }
451
452    /// Runs the server on stdio transport.
453    ///
454    /// This is the primary way to run MCP servers as subprocesses.
455    /// Creates a testing Cx and runs the server loop.
456    pub fn run_stdio(self) -> ! {
457        // Create a Cx for the server (for now, use testing Cx)
458        let cx = Cx::for_testing();
459        self.run_stdio_with_cx(&cx)
460    }
461
462    /// Runs the server on stdio with a provided Cx.
463    ///
464    /// This allows integration with a real asupersync runtime.
465    pub fn run_stdio_with_cx(self, cx: &Cx) -> ! {
466        // Initialize rich logging first, before any log output
467        self.init_rich_logging();
468
469        let transport = StdioTransport::stdio();
470        let shared = SharedTransport::new(transport);
471
472        // Create a notification sender that writes to a separate stdout handle.
473        // This allows progress notifications to be sent during handler execution
474        // while the main transport is blocked on recv().
475        let notification_sender = create_notification_sender();
476
477        let shared_recv = shared.clone();
478        let shared_send = shared.clone();
479        self.run_loop(
480            cx,
481            move |cx| shared_recv.recv(cx),
482            move |cx, message| shared_send.send(cx, message),
483            notification_sender,
484        )
485    }
486
487    /// Runs the server on a custom transport with a testing Cx.
488    ///
489    /// This is useful for SSE/WebSocket integrations where the transport is
490    /// provided by an external server framework.
491    pub fn run_transport<T>(self, transport: T) -> !
492    where
493        T: Transport + Send + 'static,
494    {
495        let cx = Cx::for_testing();
496        self.run_transport_with_cx(&cx, transport)
497    }
498
499    /// Runs the server on a custom transport with a provided Cx.
500    ///
501    /// This allows integration with a real asupersync runtime.
502    pub fn run_transport_with_cx<T>(self, cx: &Cx, transport: T) -> !
503    where
504        T: Transport + Send + 'static,
505    {
506        self.init_rich_logging();
507
508        let shared = SharedTransport::new(transport);
509        let notification_sender = create_transport_notification_sender(shared.clone());
510
511        let shared_recv = shared.clone();
512        let shared_send = shared;
513        self.run_loop(
514            cx,
515            move |cx| shared_recv.recv(cx),
516            move |cx, message| shared_send.send(cx, message),
517            notification_sender,
518        )
519    }
520
521    /// Runs the server using SSE transport with a testing Cx.
522    ///
523    /// This is a convenience wrapper around [`SseServerTransport`].
524    pub fn run_sse<W, R>(self, writer: W, request_source: R, endpoint_url: impl Into<String>) -> !
525    where
526        W: Write + Send + 'static,
527        R: Iterator<Item = JsonRpcRequest> + Send + 'static,
528    {
529        let transport = SseServerTransport::new(writer, request_source, endpoint_url);
530        self.run_transport(transport)
531    }
532
533    /// Runs the server using SSE transport with a provided Cx.
534    pub fn run_sse_with_cx<W, R>(
535        self,
536        cx: &Cx,
537        writer: W,
538        request_source: R,
539        endpoint_url: impl Into<String>,
540    ) -> !
541    where
542        W: Write + Send + 'static,
543        R: Iterator<Item = JsonRpcRequest> + Send + 'static,
544    {
545        let transport = SseServerTransport::new(writer, request_source, endpoint_url);
546        self.run_transport_with_cx(cx, transport)
547    }
548
549    /// Runs the server using WebSocket transport with a testing Cx.
550    ///
551    /// This is a convenience wrapper around [`WsTransport`].
552    pub fn run_websocket<R, W>(self, reader: R, writer: W) -> !
553    where
554        R: Read + Send + 'static,
555        W: Write + Send + 'static,
556    {
557        let transport = WsTransport::new(reader, writer);
558        self.run_transport(transport)
559    }
560
561    /// Runs the server using WebSocket transport with a provided Cx.
562    pub fn run_websocket_with_cx<R, W>(self, cx: &Cx, reader: R, writer: W) -> !
563    where
564        R: Read + Send + 'static,
565        W: Write + Send + 'static,
566    {
567        let transport = WsTransport::new(reader, writer);
568        self.run_transport_with_cx(cx, transport)
569    }
570
571    /// Runs the startup lifecycle hook, if configured.
572    ///
573    /// Returns `true` if startup succeeded (or no hook was configured),
574    /// `false` if the hook returned an error.
575    pub(crate) fn run_startup_hook(&self) -> bool {
576        let hook = {
577            let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
578                error!(target: targets::SERVER, "lifespan lock poisoned in run_startup_hook, recovering");
579                poisoned.into_inner()
580            });
581            guard.as_mut().and_then(|h| h.on_startup.take())
582        };
583
584        if let Some(hook) = hook {
585            debug!(target: targets::SERVER, "Running startup hook");
586            match hook() {
587                Ok(()) => {
588                    debug!(target: targets::SERVER, "Startup hook completed successfully");
589                    true
590                }
591                Err(e) => {
592                    error!(target: targets::SERVER, "Startup hook failed: {}", e);
593                    false
594                }
595            }
596        } else {
597            true
598        }
599    }
600
601    /// Runs the shutdown lifecycle hook, if configured.
602    pub(crate) fn run_shutdown_hook(&self) {
603        let hook = {
604            let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
605                error!(target: targets::SERVER, "lifespan lock poisoned in run_shutdown_hook, recovering");
606                poisoned.into_inner()
607            });
608            guard.as_mut().and_then(|h| h.on_shutdown.take())
609        };
610
611        if let Some(hook) = hook {
612            debug!(target: targets::SERVER, "Running shutdown hook");
613            hook();
614            debug!(target: targets::SERVER, "Shutdown hook completed");
615        }
616    }
617
618    /// Performs graceful shutdown: runs hook, closes stats, exits.
619    fn graceful_shutdown(&self, exit_code: i32) -> ! {
620        self.cancel_active_requests(CancelKind::Shutdown, true);
621        self.run_shutdown_hook();
622        if let Some(ref stats) = self.stats {
623            stats.connection_closed();
624        }
625        std::process::exit(exit_code)
626    }
627
628    /// Shared server loop for any transport, using closure-based recv/send.
629    fn run_loop<R, S>(
630        self,
631        cx: &Cx,
632        mut recv: R,
633        send: S,
634        notification_sender: NotificationSender,
635    ) -> !
636    where
637        R: FnMut(&Cx) -> Result<JsonRpcMessage, TransportError>,
638        S: FnMut(&Cx, &JsonRpcMessage) -> Result<(), TransportError> + Send + Sync + 'static,
639    {
640        let mut session = Session::new(self.info.clone(), self.capabilities.clone());
641
642        // Wrap send in Arc<Mutex> for shared access from bidirectional requests
643        let send = Arc::new(Mutex::new(send));
644
645        // Create a RequestSender for bidirectional communication
646        let request_sender = {
647            let send_clone = send.clone();
648            let send_fn: bidirectional::TransportSendFn = Arc::new(move |message| {
649                let mut guard = send_clone
650                    .lock()
651                    .map_err(|e| format!("Lock poisoned: {}", e))?;
652                // We need a Cx for the send call, but we're sending async so use a basic one
653                let cx = Cx::for_testing();
654                guard(&cx, message).map_err(|e| format!("Send failed: {}", e))
655            });
656            bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
657        };
658
659        // Track connection opened
660        if let Some(ref stats) = self.stats {
661            stats.connection_opened();
662        }
663
664        // Render startup banner if enabled (respects both config and legacy env var)
665        if self.console_config.show_banner && !banner_suppressed() {
666            self.render_startup_banner();
667        }
668
669        // Run startup hook
670        if !self.run_startup_hook() {
671            error!(target: targets::SERVER, "Startup hook failed, exiting");
672            self.graceful_shutdown(1);
673        }
674
675        // Create traffic renderer if enabled
676        let traffic_renderer = if self.console_config.show_request_traffic {
677            let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
678            renderer.truncate_at = self.console_config.truncate_at;
679            match self.console_config.traffic_verbosity {
680                TrafficVerbosity::None => {} // Should not happen given the if check
681                TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
682                    renderer.show_params = false;
683                    renderer.show_result = false;
684                }
685                TrafficVerbosity::Full => {
686                    renderer.show_params = true;
687                    renderer.show_result = true;
688                }
689            }
690            Some(renderer)
691        } else {
692            None
693        };
694
695        // Main request loop
696        loop {
697            // Check for cancellation
698            if cx.is_cancel_requested() {
699                info!(target: targets::SERVER, "Cancellation requested, shutting down");
700                self.graceful_shutdown(0);
701            }
702
703            // Receive next message
704            let message = match recv(cx) {
705                Ok(msg) => msg,
706                Err(TransportError::Closed) => {
707                    // Clean shutdown - track connection close
708                    self.graceful_shutdown(0);
709                }
710                Err(TransportError::Cancelled) => {
711                    info!(target: targets::SERVER, "Transport cancelled");
712                    self.graceful_shutdown(0);
713                }
714                Err(e) => {
715                    error!(target: targets::TRANSPORT, "Transport error: {}", e);
716                    continue;
717                }
718            };
719
720            // Log request traffic
721            if let Some(renderer) = &traffic_renderer {
722                if let JsonRpcMessage::Request(req) = &message {
723                    renderer.render_request(req, console());
724                }
725            }
726
727            let start_time = Instant::now();
728
729            // Handle the message
730            let response_opt = match message {
731                JsonRpcMessage::Request(request) => {
732                    // Track bytes received (approximate from serialized request size)
733                    if let Some(ref stats) = self.stats {
734                        // Estimate request size by serializing back to JSON
735                        // This is approximate but accurate enough for statistics
736                        if let Ok(json) = serde_json::to_string(&request) {
737                            stats.add_bytes_received(json.len() as u64 + 1); // +1 for newline
738                        }
739                    }
740                    self.handle_request(
741                        cx,
742                        &mut session,
743                        request,
744                        &notification_sender,
745                        &request_sender,
746                    )
747                }
748                JsonRpcMessage::Response(response) => {
749                    // Route response to pending server-initiated request (bidirectional)
750                    if self.pending_requests.route_response(&response) {
751                        debug!(target: targets::SERVER, "Routed response to pending request");
752                    } else {
753                        debug!(target: targets::SERVER, "Received unexpected response: {:?}", response.id);
754                    }
755                    continue;
756                }
757            };
758
759            let duration = start_time.elapsed();
760
761            if let Some(response) = response_opt {
762                // Log response traffic
763                if let Some(renderer) = &traffic_renderer {
764                    renderer.render_response(&response, Some(duration), console());
765                }
766
767                // Track bytes sent (approximate from serialized response size)
768                if let Some(ref stats) = self.stats {
769                    if let Ok(json) = serde_json::to_string(&response) {
770                        stats.add_bytes_sent(json.len() as u64 + 1); // +1 for newline
771                    }
772                }
773
774                // Send response
775                let send_result = {
776                    let mut guard = send.lock().unwrap();
777                    guard(cx, &JsonRpcMessage::Response(response))
778                };
779                if let Err(e) = send_result {
780                    error!(target: targets::TRANSPORT, "Failed to send response: {}", e);
781                }
782            }
783        }
784    }
785
786    /// Handles a single JSON-RPC request.
787    fn handle_request(
788        &self,
789        cx: &Cx,
790        session: &mut Session,
791        request: JsonRpcRequest,
792        notification_sender: &NotificationSender,
793        request_sender: &bidirectional::RequestSender,
794    ) -> Option<JsonRpcResponse> {
795        let id = request.id.clone();
796        let method = request.method.clone();
797        let is_notification = id.is_none();
798
799        // Start timing for stats
800        let start_time = Instant::now();
801
802        // Generate internal request ID for tracing
803        let request_id = request_id_to_u64(id.as_ref());
804
805        // Create a budget for this request based on timeout configuration
806        let budget = self.create_request_budget();
807
808        // Check if budget is already exhausted (should not happen, but be defensive)
809        if budget.is_exhausted() {
810            // Record failed request due to exhausted budget
811            if let Some(ref stats) = self.stats {
812                stats.record_request(&method, start_time.elapsed(), false);
813            }
814            // If it's a notification, we don't send an error response
815            let response_id = id.clone()?;
816            return Some(JsonRpcResponse::error(
817                Some(response_id),
818                JsonRpcError {
819                    code: McpErrorCode::RequestCancelled.into(),
820                    message: "Request budget exhausted".to_string(),
821                    data: None,
822                },
823            ));
824        }
825
826        let request_cx = if is_notification {
827            cx.clone()
828        } else {
829            Cx::for_request_with_budget(budget)
830        };
831
832        let _active_guard = id.clone().map(|request_id| {
833            ActiveRequestGuard::new(&self.active_requests, request_id, request_cx.clone())
834        });
835
836        // Dispatch based on method, passing the budget, notification sender, and request sender
837        let result = self.dispatch_method(
838            &request_cx,
839            session,
840            request,
841            request_id,
842            &budget,
843            notification_sender,
844            request_sender,
845        );
846
847        // Record statistics
848        let latency = start_time.elapsed();
849        if let Some(ref stats) = self.stats {
850            match &result {
851                Ok(_) => stats.record_request(&method, latency, true),
852                Err(e) if e.code == fastmcp_core::McpErrorCode::RequestCancelled => {
853                    stats.record_cancelled(&method, latency);
854                }
855                Err(_) => stats.record_request(&method, latency, false),
856            }
857        }
858
859        // If it's a notification (no ID), we must not reply
860        if is_notification {
861            if let Err(e) = result {
862                fastmcp_core::logging::error!(
863                    target: targets::HANDLER,
864                    "Notification '{}' failed: {}",
865                    method,
866                    e
867                );
868            }
869            return None;
870        }
871
872        // For success, we need a non-None id (checked above, so unwrap is safe-ish, but let's be correct)
873        // We only reach here if id is Some.
874        let response_id = id.clone().unwrap();
875
876        match result {
877            Ok(value) => Some(JsonRpcResponse::success(response_id, value)),
878            Err(e) => {
879                // Log full error before masking if this is an internal error
880                if self.mask_error_details && e.is_internal() {
881                    fastmcp_core::logging::error!(
882                        target: targets::HANDLER,
883                        "Request '{}' failed (masked in response): {}",
884                        method,
885                        e
886                    );
887                }
888
889                // Apply masking if enabled
890                let masked = e.masked(self.mask_error_details);
891                Some(JsonRpcResponse::error(
892                    id,
893                    JsonRpcError {
894                        code: masked.code.into(),
895                        message: masked.message,
896                        data: masked.data,
897                    },
898                ))
899            }
900        }
901    }
902
903    /// Creates a budget for a new request based on server configuration.
904    fn create_request_budget(&self) -> Budget {
905        if self.request_timeout_secs == 0 {
906            // No timeout - unlimited budget
907            Budget::INFINITE
908        } else {
909            // Create budget with deadline
910            Budget::with_deadline_secs(self.request_timeout_secs)
911        }
912    }
913
914    /// Dispatches a request to the appropriate handler.
915    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
916    fn dispatch_method(
917        &self,
918        cx: &Cx,
919        session: &mut Session,
920        request: JsonRpcRequest,
921        request_id: u64,
922        budget: &Budget,
923        notification_sender: &NotificationSender,
924        request_sender: &bidirectional::RequestSender,
925    ) -> Result<serde_json::Value, McpError> {
926        // Check cancellation before dispatch
927        if cx.is_cancel_requested() {
928            return Err(McpError::request_cancelled());
929        }
930
931        // Check budget before dispatch (for poll-based exhaustion)
932        if budget.is_exhausted() {
933            return Err(McpError::new(
934                McpErrorCode::RequestCancelled,
935                "Request budget exhausted",
936            ));
937        }
938
939        // Check initialization state
940        if !session.is_initialized() && request.method != "initialize" && request.method != "ping" {
941            return Err(McpError::invalid_request(
942                "Server not initialized. Client must send 'initialize' first.",
943            ));
944        }
945
946        if let Some(task_manager) = &self.task_manager {
947            task_manager.set_notification_sender(Arc::clone(notification_sender));
948        }
949
950        // Middleware: on_request
951        // We use a temporary context derived from the request context for middleware
952        // so they can access session state but share the request's lifecycle.
953        let mw_ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
954        let mut entered_middleware: Vec<&dyn crate::Middleware> = Vec::new();
955
956        for m in self.middleware.iter() {
957            entered_middleware.push(m.as_ref());
958            match m.on_request(&mw_ctx, &request) {
959                Ok(crate::MiddlewareDecision::Continue) => {}
960                Ok(crate::MiddlewareDecision::Respond(v)) => {
961                    return self.apply_middleware_response(
962                        &entered_middleware,
963                        &mw_ctx,
964                        &request,
965                        v,
966                    );
967                }
968                Err(e) => {
969                    let err =
970                        self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e);
971                    return Err(err);
972                }
973            }
974        }
975
976        if self.should_authenticate(&request.method) {
977            let auth_request = AuthRequest {
978                method: &request.method,
979                params: request.params.as_ref(),
980                request_id,
981            };
982            self.authenticate_request(cx, request_id, session, auth_request)?;
983        }
984
985        let method = &request.method;
986        let params = request.params.clone();
987
988        // Create bidirectional senders based on client capabilities
989        let bidirectional_senders = self.create_bidirectional_senders(session, request_sender);
990
991        let result = match method.as_str() {
992            "initialize" => {
993                let params: InitializeParams = parse_params(params)?;
994                let result = self.router.handle_initialize(
995                    cx,
996                    session,
997                    params,
998                    self.instructions.as_deref(),
999                )?;
1000                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1001            }
1002            "initialized" => {
1003                // Notification, no response needed (but we send empty ok)
1004                Ok(serde_json::Value::Null)
1005            }
1006            "notifications/cancelled" => {
1007                let params: CancelledParams = parse_params(params)?;
1008                self.handle_cancelled_notification(params);
1009                Ok(serde_json::Value::Null)
1010            }
1011            "logging/setLevel" => {
1012                let params: SetLogLevelParams = parse_params(params)?;
1013                self.handle_set_log_level(session, params);
1014                Ok(serde_json::Value::Null)
1015            }
1016            "tools/list" => {
1017                let params: ListToolsParams = parse_params_or_default(params)?;
1018                let result = self
1019                    .router
1020                    .handle_tools_list(cx, params, Some(session.state()))?;
1021                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1022            }
1023            "tools/call" => {
1024                let params: CallToolParams = parse_params(params)?;
1025                let result = self.router.handle_tools_call(
1026                    cx,
1027                    request_id,
1028                    params,
1029                    budget,
1030                    session.state().clone(),
1031                    Some(notification_sender),
1032                    bidirectional_senders.as_ref(),
1033                )?;
1034                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1035            }
1036            "resources/list" => {
1037                let params: ListResourcesParams = parse_params_or_default(params)?;
1038                let result =
1039                    self.router
1040                        .handle_resources_list(cx, params, Some(session.state()))?;
1041                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1042            }
1043            "resources/templates/list" => {
1044                let params: ListResourceTemplatesParams = parse_params_or_default(params)?;
1045                let result = self.router.handle_resource_templates_list(
1046                    cx,
1047                    params,
1048                    Some(session.state()),
1049                )?;
1050                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1051            }
1052            "resources/read" => {
1053                let params: ReadResourceParams = parse_params(params)?;
1054                let result = self.router.handle_resources_read(
1055                    cx,
1056                    request_id,
1057                    &params,
1058                    budget,
1059                    session.state().clone(),
1060                    Some(notification_sender),
1061                    bidirectional_senders.as_ref(),
1062                )?;
1063                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1064            }
1065            "resources/subscribe" => {
1066                let params: SubscribeResourceParams = parse_params(params)?;
1067                if !self.router.resource_exists(&params.uri) {
1068                    return Err(McpError::resource_not_found(&params.uri));
1069                }
1070                session.subscribe_resource(params.uri);
1071                Ok(serde_json::json!({}))
1072            }
1073            "resources/unsubscribe" => {
1074                let params: UnsubscribeResourceParams = parse_params(params)?;
1075                session.unsubscribe_resource(&params.uri);
1076                Ok(serde_json::json!({}))
1077            }
1078            "prompts/list" => {
1079                let params: ListPromptsParams = parse_params_or_default(params)?;
1080                let result = self
1081                    .router
1082                    .handle_prompts_list(cx, params, Some(session.state()))?;
1083                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1084            }
1085            "prompts/get" => {
1086                let params: GetPromptParams = parse_params(params)?;
1087                let result = self.router.handle_prompts_get(
1088                    cx,
1089                    request_id,
1090                    params,
1091                    budget,
1092                    session.state().clone(),
1093                    Some(notification_sender),
1094                    bidirectional_senders.as_ref(),
1095                )?;
1096                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1097            }
1098            "ping" => {
1099                // Simple ping-pong for health checks
1100                Ok(serde_json::json!({}))
1101            }
1102            // Task methods (Docket/SEP-1686)
1103            "tasks/list" => {
1104                let params: ListTasksParams = parse_params_or_default(params)?;
1105                let result =
1106                    self.router
1107                        .handle_tasks_list(cx, params, self.task_manager.as_ref())?;
1108                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1109            }
1110            "tasks/get" => {
1111                let params: GetTaskParams = parse_params(params)?;
1112                let result =
1113                    self.router
1114                        .handle_tasks_get(cx, params, self.task_manager.as_ref())?;
1115                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1116            }
1117            "tasks/cancel" => {
1118                let params: CancelTaskParams = parse_params(params)?;
1119                let result =
1120                    self.router
1121                        .handle_tasks_cancel(cx, params, self.task_manager.as_ref())?;
1122                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1123            }
1124            "tasks/submit" => {
1125                let params: SubmitTaskParams = parse_params(params)?;
1126                let result =
1127                    self.router
1128                        .handle_tasks_submit(cx, params, self.task_manager.as_ref())?;
1129                Ok(serde_json::to_value(result).map_err(McpError::from)?)
1130            }
1131            _ => Err(McpError::method_not_found(method)),
1132        };
1133
1134        let final_result = match result {
1135            Ok(v) => self.apply_middleware_response(&entered_middleware, &mw_ctx, &request, v),
1136            Err(e) => Err(self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e)),
1137        };
1138
1139        self.maybe_emit_log_notification(session, notification_sender, method, &final_result);
1140
1141        final_result
1142    }
1143
1144    fn apply_middleware_response(
1145        &self,
1146        stack: &[&dyn crate::Middleware],
1147        ctx: &McpContext,
1148        request: &JsonRpcRequest,
1149        value: serde_json::Value,
1150    ) -> Result<serde_json::Value, McpError> {
1151        let mut response = value;
1152        for m in stack.iter().rev() {
1153            match m.on_response(ctx, request, response) {
1154                Ok(next) => response = next,
1155                Err(err) => {
1156                    let mapped = self.apply_middleware_error(stack, ctx, request, err);
1157                    return Err(mapped);
1158                }
1159            }
1160        }
1161        Ok(response)
1162    }
1163
1164    fn apply_middleware_error(
1165        &self,
1166        stack: &[&dyn crate::Middleware],
1167        ctx: &McpContext,
1168        request: &JsonRpcRequest,
1169        error: McpError,
1170    ) -> McpError {
1171        let mut err = error;
1172        for m in stack.iter().rev() {
1173            err = m.on_error(ctx, request, err);
1174        }
1175        err
1176    }
1177
1178    /// Creates bidirectional senders based on client capabilities.
1179    ///
1180    /// Returns `Some(BidirectionalSenders)` if the client supports any bidirectional
1181    /// features (sampling, elicitation), or `None` if no features are supported.
1182    fn create_bidirectional_senders(
1183        &self,
1184        session: &Session,
1185        request_sender: &bidirectional::RequestSender,
1186    ) -> Option<handler::BidirectionalSenders> {
1187        let supports_sampling = session.supports_sampling();
1188        let supports_elicitation = session.supports_elicitation();
1189
1190        if !supports_sampling && !supports_elicitation {
1191            return None;
1192        }
1193
1194        let mut senders = handler::BidirectionalSenders::new();
1195
1196        if supports_sampling {
1197            let sampling_sender: Arc<dyn fastmcp_core::SamplingSender> = Arc::new(
1198                bidirectional::TransportSamplingSender::new(request_sender.clone()),
1199            );
1200            senders = senders.with_sampling(sampling_sender);
1201        }
1202
1203        if supports_elicitation {
1204            let elicitation_sender: Arc<dyn fastmcp_core::ElicitationSender> = Arc::new(
1205                bidirectional::TransportElicitationSender::new(request_sender.clone()),
1206            );
1207            senders = senders.with_elicitation(elicitation_sender);
1208        }
1209
1210        Some(senders)
1211    }
1212
1213    fn should_authenticate(&self, method: &str) -> bool {
1214        !matches!(
1215            method,
1216            "initialize" | "initialized" | "notifications/cancelled" | "ping"
1217        )
1218    }
1219
1220    fn authenticate_request(
1221        &self,
1222        cx: &Cx,
1223        request_id: u64,
1224        session: &Session,
1225        request: AuthRequest<'_>,
1226    ) -> Result<AuthContext, McpError> {
1227        let Some(provider) = &self.auth_provider else {
1228            return Ok(AuthContext::anonymous());
1229        };
1230
1231        let ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
1232        let auth = provider.authenticate(&ctx, request)?;
1233        if !ctx.set_auth(auth.clone()) {
1234            debug!(
1235                target: targets::SESSION,
1236                "Auth context not stored (session state unavailable)"
1237            );
1238        }
1239        Ok(auth)
1240    }
1241
1242    fn handle_cancelled_notification(&self, params: CancelledParams) {
1243        let reason = params.reason.as_deref().unwrap_or("unspecified");
1244        let await_cleanup = params.await_cleanup.unwrap_or(false);
1245        info!(
1246            target: targets::SESSION,
1247            "Cancellation requested for requestId={} (reason: {}, await_cleanup={})",
1248            params.request_id,
1249            reason,
1250            await_cleanup
1251        );
1252        let active = {
1253            let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
1254                error!(target: targets::SERVER, "active_requests lock poisoned, recovering");
1255                poisoned.into_inner()
1256            });
1257            guard
1258                .get(&params.request_id)
1259                .map(|entry| (entry.cx.clone(), entry.region_id, entry.completion.clone()))
1260        };
1261        if let Some((cx, region_id, completion)) = active {
1262            cx.cancel_with(CancelKind::User, None);
1263            if await_cleanup {
1264                let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
1265                if !completed {
1266                    fastmcp_core::logging::warn!(
1267                        target: targets::SESSION,
1268                        "await_cleanup timed out for requestId={} (region={:?})",
1269                        params.request_id,
1270                        region_id
1271                    );
1272                }
1273            }
1274        } else {
1275            fastmcp_core::logging::warn!(
1276                target: targets::SESSION,
1277                "No active request found for cancellation requestId={}",
1278                params.request_id
1279            );
1280        }
1281    }
1282
1283    fn cancel_active_requests(&self, kind: CancelKind, await_cleanup: bool) {
1284        let active: Vec<(RequestId, RegionId, Cx, Arc<RequestCompletion>)> = {
1285            let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
1286                error!(target: targets::SERVER, "active_requests lock poisoned in cancel_active_requests, recovering");
1287                poisoned.into_inner()
1288            });
1289            guard
1290                .iter()
1291                .map(|(request_id, entry)| {
1292                    (
1293                        request_id.clone(),
1294                        entry.region_id,
1295                        entry.cx.clone(),
1296                        entry.completion.clone(),
1297                    )
1298                })
1299                .collect()
1300        };
1301        if active.is_empty() {
1302            return;
1303        }
1304        info!(
1305            target: targets::SESSION,
1306            "Cancelling {} active request(s) (kind={:?}, await_cleanup={})",
1307            active.len(),
1308            kind,
1309            await_cleanup
1310        );
1311        for (_, _, cx, _) in &active {
1312            cx.cancel_with(kind, None);
1313        }
1314
1315        if await_cleanup {
1316            for (request_id, region_id, _cx, completion) in active {
1317                let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
1318                if !completed {
1319                    fastmcp_core::logging::warn!(
1320                        target: targets::SESSION,
1321                        "Shutdown cancel timed out for requestId={} (region={:?})",
1322                        request_id,
1323                        region_id
1324                    );
1325                }
1326            }
1327        }
1328    }
1329
1330    fn handle_set_log_level(&self, session: &mut Session, params: SetLogLevelParams) {
1331        let requested = match params.level {
1332            LogLevel::Debug => LevelFilter::Debug,
1333            LogLevel::Info => LevelFilter::Info,
1334            LogLevel::Warning => LevelFilter::Warn,
1335            LogLevel::Error => LevelFilter::Error,
1336        };
1337
1338        let configured = self.logging.level.to_level_filter();
1339        let effective = if requested > configured {
1340            configured
1341        } else {
1342            requested
1343        };
1344
1345        log::set_max_level(effective);
1346
1347        let effective_level = match effective {
1348            LevelFilter::Debug => LogLevel::Debug,
1349            LevelFilter::Info => LogLevel::Info,
1350            LevelFilter::Warn => LogLevel::Warning,
1351            LevelFilter::Error => LogLevel::Error,
1352            _ => LogLevel::Info,
1353        };
1354        session.set_log_level(effective_level);
1355
1356        if effective != requested {
1357            fastmcp_core::logging::warn!(
1358                target: targets::SESSION,
1359                "Client requested log level {:?}; clamped to server level {:?}",
1360                params.level,
1361                effective
1362            );
1363        } else {
1364            info!(
1365                target: targets::SESSION,
1366                "Log level set to {:?}",
1367                params.level
1368            );
1369        }
1370    }
1371
1372    fn log_level_rank(level: LogLevel) -> u8 {
1373        match level {
1374            LogLevel::Debug => 1,
1375            LogLevel::Info => 2,
1376            LogLevel::Warning => 3,
1377            LogLevel::Error => 4,
1378        }
1379    }
1380
1381    fn emit_log_notification(
1382        &self,
1383        session: &Session,
1384        sender: &NotificationSender,
1385        level: LogLevel,
1386        message: impl Into<String>,
1387    ) {
1388        let Some(min_level) = session.log_level() else {
1389            return;
1390        };
1391        if Self::log_level_rank(level) < Self::log_level_rank(min_level) {
1392            return;
1393        }
1394
1395        let ts = chrono::Utc::now().to_rfc3339();
1396        let text = format!("{ts} {}", message.into());
1397        let params = LogMessageParams {
1398            level,
1399            logger: Some("fastmcp::server".to_string()),
1400            data: serde_json::Value::String(text),
1401        };
1402        let payload = match serde_json::to_value(params) {
1403            Ok(value) => value,
1404            Err(err) => {
1405                fastmcp_core::logging::warn!(
1406                    target: targets::SESSION,
1407                    "Failed to serialize log message notification: {}",
1408                    err
1409                );
1410                return;
1411            }
1412        };
1413        sender(JsonRpcRequest::notification(
1414            "notifications/message",
1415            Some(payload),
1416        ));
1417    }
1418
1419    fn maybe_emit_log_notification(
1420        &self,
1421        session: &Session,
1422        sender: &NotificationSender,
1423        method: &str,
1424        result: &McpResult<serde_json::Value>,
1425    ) {
1426        if method.starts_with("notifications/") || method == "logging/setLevel" {
1427            return;
1428        }
1429        let level = if result.is_ok() {
1430            LogLevel::Info
1431        } else {
1432            LogLevel::Error
1433        };
1434        let message = if result.is_ok() {
1435            format!("Handled {}", method)
1436        } else {
1437            format!("Error handling {}", method)
1438        };
1439        self.emit_log_notification(session, sender, level, message);
1440    }
1441}
1442
/// Upper bound on how long a cancellation path blocks waiting for one
/// cancelled request to finish when cleanup is awaited.
const AWAIT_CLEANUP_TIMEOUT: Duration = Duration::from_secs(5);
1444
/// One-shot completion latch for an in-flight request.
///
/// One side calls [`RequestCompletion::mark_done`] once the request has
/// finished; other threads can block on [`RequestCompletion::wait_timeout`]
/// until that happens. A poisoned mutex is recovered rather than propagated.
struct RequestCompletion {
    done: Mutex<bool>,
    cv: Condvar,
}

impl RequestCompletion {
    /// Creates a latch in the "not done" state.
    fn new() -> Self {
        Self {
            done: Mutex::new(false),
            cv: Condvar::new(),
        }
    }

    /// Locks the flag, recovering the guard if the mutex was poisoned.
    fn lock_flag(&self) -> std::sync::MutexGuard<'_, bool> {
        self.done
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Marks the request as finished and wakes all waiters.
    ///
    /// Repeated calls are no-ops: waiters are only notified on the first
    /// transition to "done".
    fn mark_done(&self) {
        let mut flag = self.lock_flag();
        let was_done = std::mem::replace(&mut *flag, true);
        if !was_done {
            self.cv.notify_all();
        }
    }

    /// Blocks until the request is done or `timeout` has elapsed.
    ///
    /// Returns `true` if the request is done, `false` if the wait timed out
    /// first. Spurious wake-ups do not extend the total wait.
    fn wait_timeout(&self, timeout: Duration) -> bool {
        let (flag, _timed_out) = self
            .cv
            .wait_timeout_while(self.lock_flag(), timeout, |done| !*done)
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *flag
    }

    /// Returns whether the request has been marked done.
    fn is_done(&self) -> bool {
        *self.lock_flag()
    }
}
1507
1508struct ActiveRequest {
1509    cx: Cx,
1510    region_id: RegionId,
1511    completion: Arc<RequestCompletion>,
1512}
1513
1514impl ActiveRequest {
1515    fn new(cx: Cx, completion: Arc<RequestCompletion>) -> Self {
1516        let region_id = cx.region_id();
1517        Self {
1518            cx,
1519            region_id,
1520            completion,
1521        }
1522    }
1523}
1524
1525struct ActiveRequestGuard<'a> {
1526    map: &'a Mutex<HashMap<RequestId, ActiveRequest>>,
1527    id: RequestId,
1528    completion: Arc<RequestCompletion>,
1529}
1530
1531impl<'a> ActiveRequestGuard<'a> {
1532    fn new(map: &'a Mutex<HashMap<RequestId, ActiveRequest>>, id: RequestId, cx: Cx) -> Self {
1533        let completion = Arc::new(RequestCompletion::new());
1534        let entry = ActiveRequest::new(cx, completion.clone());
1535        let mut guard = map
1536            .lock()
1537            .unwrap_or_else(std::sync::PoisonError::into_inner);
1538        if guard.insert(id.clone(), entry).is_some() {
1539            fastmcp_core::logging::warn!(
1540                target: targets::SESSION,
1541                "Active request replaced for requestId={}",
1542                id
1543            );
1544        }
1545        Self {
1546            map,
1547            id,
1548            completion,
1549        }
1550    }
1551}
1552
1553impl Drop for ActiveRequestGuard<'_> {
1554    fn drop(&mut self) {
1555        {
1556            let mut guard = self
1557                .map
1558                .lock()
1559                .unwrap_or_else(std::sync::PoisonError::into_inner);
1560            match guard.get(&self.id) {
1561                Some(entry) if Arc::ptr_eq(&entry.completion, &self.completion) => {
1562                    guard.remove(&self.id);
1563                }
1564                Some(_) => {
1565                    fastmcp_core::logging::warn!(
1566                        target: targets::SESSION,
1567                        "Active request replaced before drop for requestId={}",
1568                        self.id
1569                    );
1570                }
1571                None => {
1572                    fastmcp_core::logging::warn!(
1573                        target: targets::SESSION,
1574                        "Active request missing on drop for requestId={}",
1575                        self.id
1576                    );
1577                }
1578            }
1579        }
1580        self.completion.mark_done();
1581    }
1582}
1583
/// Reports whether the `FASTMCP_NO_BANNER` environment variable asks for the
/// banner to be hidden.
///
/// The values `1`, `true` and `yes` (case-insensitive) suppress the banner; an
/// unset, unreadable, or any other value does not.
///
/// This is a legacy check. Prefer using `ConsoleConfig` for banner control.
fn banner_suppressed() -> bool {
    match std::env::var("FASTMCP_NO_BANNER") {
        Ok(raw) => matches!(raw.to_lowercase().as_str(), "1" | "true" | "yes"),
        Err(_) => false,
    }
}
1592
1593/// Parses required parameters from JSON.
1594fn parse_params<T: serde::de::DeserializeOwned>(
1595    params: Option<serde_json::Value>,
1596) -> Result<T, McpError> {
1597    let value = params.ok_or_else(|| McpError::invalid_params("Missing required parameters"))?;
1598    serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
1599}
1600
1601/// Parses optional parameters from JSON, using default if not provided.
1602fn parse_params_or_default<T: serde::de::DeserializeOwned + Default>(
1603    params: Option<serde_json::Value>,
1604) -> Result<T, McpError> {
1605    match params {
1606        Some(value) => {
1607            serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
1608        }
1609        None => Ok(T::default()),
1610    }
1611}
1612
1613/// Converts a JSON-RPC RequestId to a u64 for internal tracking.
1614///
1615/// If the ID is a number, uses that number. If it's a string or absent,
1616/// uses a stable hash (string) or 0 (absent) as a fallback.
1617fn request_id_to_u64(id: Option<&RequestId>) -> u64 {
1618    match id {
1619        Some(RequestId::Number(n)) => *n as u64,
1620        Some(RequestId::String(s)) => stable_hash_request_id(s),
1621        None => 0,
1622    }
1623}
1624
/// Hashes a string request id with 64-bit FNV-1a.
///
/// The result does not vary between runs or platforms. A digest of `0` is
/// replaced by the FNV offset basis, so the function never returns `0`.
fn stable_hash_request_id(value: &str) -> u64 {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = value.bytes().fold(FNV_OFFSET, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });

    match digest {
        0 => FNV_OFFSET,
        nonzero => nonzero,
    }
}
1635
1636struct SharedTransport<T> {
1637    inner: Arc<Mutex<T>>,
1638}
1639
1640impl<T> Clone for SharedTransport<T> {
1641    fn clone(&self) -> Self {
1642        Self {
1643            inner: Arc::clone(&self.inner),
1644        }
1645    }
1646}
1647
1648impl<T: Transport> SharedTransport<T> {
1649    fn new(transport: T) -> Self {
1650        Self {
1651            inner: Arc::new(Mutex::new(transport)),
1652        }
1653    }
1654
1655    fn recv(&self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
1656        let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
1657        guard.recv(cx)
1658    }
1659
1660    fn send(&self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
1661        let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
1662        guard.send(cx, message)
1663    }
1664}
1665
1666fn transport_lock_error() -> TransportError {
1667    TransportError::Io(std::io::Error::other("transport lock poisoned"))
1668}
1669
1670fn create_transport_notification_sender<T>(transport: SharedTransport<T>) -> NotificationSender
1671where
1672    T: Transport + Send + 'static,
1673{
1674    let cx = Cx::for_testing();
1675
1676    Arc::new(move |request: JsonRpcRequest| {
1677        let message = JsonRpcMessage::Request(request);
1678        if let Err(e) = transport.send(&cx, &message) {
1679            log::error!(
1680                target: targets::TRANSPORT,
1681                "Failed to send notification: {}",
1682                e
1683            );
1684        }
1685    })
1686}
1687
1688/// Creates a notification sender that writes JSON-RPC notifications to stdout.
1689///
1690/// This creates a separate stdout handle for sending notifications, allowing
1691/// notifications (like progress updates) to be sent during handler execution
1692/// independently of the main transport.
1693///
1694/// The sender uses NDJSON format (newline-delimited JSON) to match the
1695/// standard MCP transport format.
1696fn create_notification_sender() -> NotificationSender {
1697    use std::sync::Mutex;
1698
1699    // Use AsyncStdout so notifications share the global stdout lock used by
1700    // the transport writer, preventing interleaved NDJSON writes.
1701    let stdout = Mutex::new(AsyncStdout::new());
1702    let codec = Codec::new();
1703
1704    Arc::new(move |request: JsonRpcRequest| {
1705        let bytes = match codec.encode_request(&request) {
1706            Ok(b) => b,
1707            Err(e) => {
1708                log::error!(target: targets::SERVER, "Failed to encode notification: {}", e);
1709                return;
1710            }
1711        };
1712
1713        if let Ok(mut stdout) = stdout.lock() {
1714            if let Err(e) = stdout.write_all_unchecked(&bytes) {
1715                log::error!(target: targets::TRANSPORT, "Failed to send notification: {}", e);
1716            }
1717            if let Err(e) = stdout.flush_unchecked() {
1718                log::error!(target: targets::TRANSPORT, "Failed to flush notification: {}", e);
1719            }
1720        } else {
1721            log::warn!(target: targets::SERVER, "Failed to acquire stdout lock for notification");
1722        }
1723    })
1724}