1#![forbid(unsafe_code)]
39#![allow(dead_code)]
40
41extern crate self as fastmcp_server;
45
46mod auth;
47pub mod bidirectional;
48mod builder;
49pub mod caching;
50pub mod docket;
51mod handler;
52mod middleware;
53pub mod oauth;
54pub mod oidc;
55pub mod providers;
56mod proxy;
57pub mod rate_limiting;
58mod router;
59mod session;
60mod tasks;
61pub mod transform;
62
63#[cfg(test)]
64mod tests;
65
66#[cfg(feature = "jwt")]
67pub use auth::JwtTokenVerifier;
68pub use auth::{
69 AllowAllAuthProvider, AuthProvider, AuthRequest, StaticTokenVerifier, TokenAuthProvider,
70 TokenVerifier,
71};
72pub use builder::ServerBuilder;
73pub use fastmcp_console::config::{BannerStyle, ConsoleConfig, TrafficVerbosity};
74pub use fastmcp_console::stats::{ServerStats, StatsSnapshot};
75pub use handler::{
76 BidirectionalSenders, BoxFuture, ProgressNotificationSender, PromptHandler, ResourceHandler,
77 ToolHandler, create_context_with_progress, create_context_with_progress_and_senders,
78};
79pub use middleware::{Middleware, MiddlewareDecision};
80pub use proxy::{ProxyBackend, ProxyCatalog, ProxyClient};
81pub use router::{
82 MountResult, NotificationSender, Router, RouterResourceReader, RouterToolCaller, TagFilters,
83};
84pub use session::Session;
85pub use tasks::{SharedTaskManager, TaskManager};
86
87pub use bidirectional::{
89 PendingRequests, RequestSender, TransportElicitationSender, TransportRootsProvider,
90 TransportSamplingSender,
91};
92
93use std::collections::HashMap;
94use std::io::{BufReader, BufWriter, Read, Write};
95use std::net::TcpListener;
96use std::sync::atomic::{AtomicUsize, Ordering};
97use std::sync::{Arc, Condvar, Mutex};
98use std::time::{Duration, Instant};
99
100use fastmcp_transport::http::{
101 HttpHandlerConfig, HttpMethod, HttpRequest, HttpRequestHandler, HttpResponse, HttpStatus,
102 HttpTransport,
103};
104
105use asupersync::time::wall_now;
106use asupersync::{Budget, CancelKind, Cx, RegionId};
107use fastmcp_console::client::RequestResponseRenderer;
108use fastmcp_console::logging::RichLoggerBuilder;
109use fastmcp_console::{banner::StartupBanner, console};
110use fastmcp_core::logging::{debug, error, info, targets};
111use fastmcp_core::{AuthContext, McpContext, McpError, McpErrorCode, McpResult, SessionState};
112use fastmcp_protocol::{
113 CallToolParams, CancelTaskParams, CancelledParams, GetPromptParams, GetTaskParams,
114 InitializeParams, JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse,
115 ListPromptsParams, ListResourceTemplatesParams, ListResourcesParams, ListTasksParams,
116 ListToolsParams, LogLevel, LogMessageParams, Prompt, ReadResourceParams, RequestId, Resource,
117 ResourceTemplate, ServerCapabilities, ServerInfo, SetLogLevelParams, SubmitTaskParams,
118 SubscribeResourceParams, Tool, UnsubscribeResourceParams,
119};
120use fastmcp_transport::sse::SseServerTransport;
121use fastmcp_transport::websocket::WsTransport;
122use fastmcp_transport::{AsyncStdout, Codec, StdioTransport, Transport, TransportError};
123use log::{Level, LevelFilter};
124
125pub type StartupHook =
127 Box<dyn FnOnce() -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send>;
128
129pub type ShutdownHook = Box<dyn FnOnce() + Send>;
131
132#[derive(Default)]
158pub struct LifespanHooks {
159 pub on_startup: Option<StartupHook>,
161 pub on_shutdown: Option<ShutdownHook>,
163}
164
165impl LifespanHooks {
166 #[must_use]
168 pub fn new() -> Self {
169 Self::default()
170 }
171}
172
173#[derive(Debug, Clone)]
175pub struct LoggingConfig {
176 pub level: Level,
178 pub timestamps: bool,
180 pub targets: bool,
182 pub file_line: bool,
184}
185
186impl Default for LoggingConfig {
187 fn default() -> Self {
188 Self {
189 level: Level::Info,
190 timestamps: true,
191 targets: true,
192 file_line: false,
193 }
194 }
195}
196
197impl LoggingConfig {
198 #[must_use]
206 pub fn from_env() -> Self {
207 let level = std::env::var("FASTMCP_LOG")
208 .ok()
209 .and_then(|s| match s.to_lowercase().as_str() {
210 "error" => Some(Level::Error),
211 "warn" | "warning" => Some(Level::Warn),
212 "info" => Some(Level::Info),
213 "debug" => Some(Level::Debug),
214 "trace" => Some(Level::Trace),
215 _ => None,
216 })
217 .unwrap_or(Level::Info);
218
219 let timestamps = std::env::var("FASTMCP_LOG_TIMESTAMPS")
220 .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
221 .unwrap_or(true);
222
223 let targets = std::env::var("FASTMCP_LOG_TARGETS")
224 .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
225 .unwrap_or(true);
226
227 let file_line = std::env::var("FASTMCP_LOG_FILE_LINE")
228 .map(|s| matches!(s.to_lowercase().as_str(), "1" | "true" | "yes"))
229 .unwrap_or(false);
230
231 Self {
232 level,
233 timestamps,
234 targets,
235 file_line,
236 }
237 }
238}
239
240#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
245pub enum DuplicateBehavior {
246 Error,
250
251 #[default]
256 Warn,
257
258 Replace,
262
263 Ignore,
267}
268
269#[derive(Debug, Clone)]
284pub struct HttpServerConfig {
285 pub mcp_path: String,
287 pub health_path: String,
289 pub max_connections: usize,
291 pub handler_config: HttpHandlerConfig,
293}
294
295#[derive(Debug, Clone, Copy, PartialEq, Eq)]
296enum HttpRequestExecutionMode {
297 ConcurrentReadOnly,
298 ExclusiveSession,
299}
300
301impl HttpRequestExecutionMode {
302 fn for_method(method: &str) -> Self {
303 match method {
304 "resources/read" | "prompts/get" => Self::ConcurrentReadOnly,
305 _ => Self::ExclusiveSession,
306 }
307 }
308
309 fn for_request(router: &Router, request: &JsonRpcRequest) -> Self {
310 match request.method.as_str() {
311 "tools/call" => request
312 .params
313 .as_ref()
314 .and_then(|params| params.get("name"))
315 .and_then(serde_json::Value::as_str)
316 .filter(|name| router.tool_is_read_only(name))
317 .map_or(Self::ExclusiveSession, |_| Self::ConcurrentReadOnly),
318 _ => Self::for_method(&request.method),
319 }
320 }
321}
322
323#[derive(Debug, Clone)]
324struct SessionView {
325 initialized: bool,
326 state: SessionState,
327 supports_sampling: bool,
328 supports_elicitation: bool,
329 log_level: Option<LogLevel>,
330}
331
332impl SessionView {
333 fn from_session(session: &Session) -> Self {
334 Self {
335 initialized: session.is_initialized(),
336 state: session.state().clone(),
337 supports_sampling: session.supports_sampling(),
338 supports_elicitation: session.supports_elicitation(),
339 log_level: session.log_level(),
340 }
341 }
342}
343
344impl Default for HttpServerConfig {
345 fn default() -> Self {
346 Self {
347 mcp_path: "/mcp".to_string(),
348 health_path: "/health".to_string(),
349 max_connections: 64,
350 handler_config: HttpHandlerConfig {
351 base_path: "/mcp".to_string(),
352 ..HttpHandlerConfig::default()
353 },
354 }
355 }
356}
357
358impl HttpServerConfig {
359 #[must_use]
361 pub fn new() -> Self {
362 Self::default()
363 }
364
365 #[must_use]
367 pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
368 self.mcp_path = path.into();
369 self
370 }
371
372 #[must_use]
374 pub fn health_path(mut self, path: impl Into<String>) -> Self {
375 self.health_path = path.into();
376 self
377 }
378
379 #[must_use]
381 pub fn max_connections(mut self, max: usize) -> Self {
382 self.max_connections = max;
383 self
384 }
385
386 #[must_use]
388 pub fn handler_config(mut self, config: HttpHandlerConfig) -> Self {
389 self.handler_config = config;
390 self
391 }
392}
393
394pub struct Server {
399 info: ServerInfo,
400 capabilities: ServerCapabilities,
401 router: Router,
402 instructions: Option<String>,
403 request_timeout_secs: u64,
405 stats: Option<ServerStats>,
407 mask_error_details: bool,
409 logging: LoggingConfig,
411 console_config: ConsoleConfig,
413 lifespan: Mutex<Option<LifespanHooks>>,
415 auth_provider: Option<Arc<dyn AuthProvider>>,
417 middleware: Arc<Vec<Box<dyn crate::Middleware>>>,
419 active_requests: Mutex<HashMap<RequestId, ActiveRequest>>,
421 task_manager: Option<SharedTaskManager>,
423 pending_requests: Arc<bidirectional::PendingRequests>,
425 http_config: HttpServerConfig,
427}
428
429impl Server {
430 #[must_use]
432 #[allow(clippy::new_ret_no_self)]
433 pub fn new(name: impl Into<String>, version: impl Into<String>) -> ServerBuilder {
434 ServerBuilder::new(name, version)
435 }
436
437 #[must_use]
439 pub fn info(&self) -> &ServerInfo {
440 &self.info
441 }
442
443 #[must_use]
445 pub fn capabilities(&self) -> &ServerCapabilities {
446 &self.capabilities
447 }
448
449 #[must_use]
451 pub fn tools(&self) -> Vec<Tool> {
452 self.router.tools()
453 }
454
455 #[must_use]
457 pub fn resources(&self) -> Vec<Resource> {
458 self.router.resources()
459 }
460
461 #[must_use]
463 pub fn resource_templates(&self) -> Vec<ResourceTemplate> {
464 self.router.resource_templates()
465 }
466
467 #[must_use]
469 pub fn prompts(&self) -> Vec<Prompt> {
470 self.router.prompts()
471 }
472
473 #[must_use]
477 pub fn task_manager(&self) -> Option<&SharedTaskManager> {
478 self.task_manager.as_ref()
479 }
480
481 #[must_use]
485 pub fn into_router(self) -> Router {
486 self.router
487 }
488
489 #[must_use]
494 pub fn has_tools(&self) -> bool {
495 self.capabilities.tools.is_some()
496 }
497
498 #[must_use]
500 pub fn has_resources(&self) -> bool {
501 self.capabilities.resources.is_some()
502 }
503
504 #[must_use]
506 pub fn has_prompts(&self) -> bool {
507 self.capabilities.prompts.is_some()
508 }
509
510 #[must_use]
514 pub fn stats(&self) -> Option<StatsSnapshot> {
515 self.stats.as_ref().map(ServerStats::snapshot)
516 }
517
518 #[must_use]
523 pub fn stats_collector(&self) -> Option<&ServerStats> {
524 self.stats.as_ref()
525 }
526
527 pub fn display_stats(&self) {
529 let Some(stats) = self.stats.as_ref() else {
530 return;
531 };
532
533 let snapshot = stats.snapshot();
534 let renderer = fastmcp_console::stats::StatsRenderer::detect();
535 renderer.render_panel(&snapshot, console());
536 }
537
538 #[must_use]
540 pub fn console_config(&self) -> &ConsoleConfig {
541 &self.console_config
542 }
543
544 fn render_startup_banner(&self) {
546 let render = || {
547 let mut banner = StartupBanner::new(&self.info.name, &self.info.version)
548 .tools(self.router.tools_count())
549 .resources(self.router.resources_count())
550 .prompts(self.router.prompts_count())
551 .transport("stdio");
552
553 if let Some(desc) = self.instructions.as_deref().filter(|d| !d.is_empty()) {
554 banner = banner.description(desc);
555 }
556
557 match self.console_config.banner_style {
559 BannerStyle::Full => banner.render(console()),
560 BannerStyle::Compact | BannerStyle::Minimal => {
561 banner.no_logo().render(console());
563 }
564 BannerStyle::None => {} }
566 };
567
568 if let Err(err) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(render)) {
569 eprintln!("Warning: banner rendering failed: {err:?}");
570 }
571 }
572
573 fn init_rich_logging(&self) {
579 let result = RichLoggerBuilder::new()
580 .level(self.logging.level)
581 .with_timestamps(self.logging.timestamps)
582 .with_targets(self.logging.targets)
583 .with_file_line(self.logging.file_line)
584 .init();
585
586 if let Err(e) = result {
587 eprintln!("Note: Rich logging not initialized (logger already set): {e}");
589 }
590 }
591
592 pub fn dispatch_request(
642 &self,
643 cx: &Cx,
644 session: &mut Session,
645 request: JsonRpcRequest,
646 notification_sender: &NotificationSender,
647 request_sender: &bidirectional::RequestSender,
648 ) -> Option<JsonRpcResponse> {
649 self.handle_request(cx, session, request, notification_sender, request_sender)
650 }
651
652 pub fn dispatch_request_concurrent(
713 &self,
714 cx: &Cx,
715 session: &Arc<Mutex<Session>>,
716 request: JsonRpcRequest,
717 notification_sender: &NotificationSender,
718 request_sender: &bidirectional::RequestSender,
719 ) -> Option<JsonRpcResponse> {
720 let execution_mode = HttpRequestExecutionMode::for_request(&self.router, &request);
721
722 match execution_mode {
723 HttpRequestExecutionMode::ConcurrentReadOnly => {
724 let session_view = {
725 let session_guard = match session.lock() {
726 Ok(g) => g,
727 Err(poisoned) => {
728 error!(target: targets::SERVER, "Session lock poisoned, recovering");
729 poisoned.into_inner()
730 }
731 };
732 SessionView::from_session(&session_guard)
733 };
734 self.handle_request_with_view(
735 cx,
736 &session_view,
737 request,
738 notification_sender,
739 request_sender,
740 )
741 }
742 HttpRequestExecutionMode::ExclusiveSession => {
743 let mut session_guard = match session.lock() {
744 Ok(g) => g,
745 Err(poisoned) => {
746 error!(target: targets::SERVER, "Session lock poisoned, recovering");
747 poisoned.into_inner()
748 }
749 };
750 self.handle_request(
751 cx,
752 &mut session_guard,
753 request,
754 notification_sender,
755 request_sender,
756 )
757 }
758 }
759 }
760
761 pub fn run_stdio(self) -> ! {
766 let cx = Cx::for_request();
768 self.run_stdio_with_cx(&cx)
769 }
770
771 pub fn run_stdio_with_cx(self, cx: &Cx) -> ! {
775 self.init_rich_logging();
777
778 let transport = StdioTransport::stdio();
779 let shared = SharedTransport::new(transport);
780
781 let notification_sender = create_notification_sender();
785
786 let shared_recv = shared.clone();
787 let shared_send = shared.clone();
788 self.run_loop(
789 cx,
790 move |cx| shared_recv.recv(cx),
791 move |cx, message| shared_send.send(cx, message),
792 notification_sender,
793 )
794 }
795
796 pub fn run_transport<T>(self, transport: T) -> !
801 where
802 T: Transport + Send + 'static,
803 {
804 let cx = Cx::for_request();
806 self.run_transport_with_cx(&cx, transport)
807 }
808
809 pub fn run_transport_with_cx<T>(self, cx: &Cx, transport: T) -> !
813 where
814 T: Transport + Send + 'static,
815 {
816 self.init_rich_logging();
817
818 let shared = SharedTransport::new(transport);
819 let notification_sender = create_transport_notification_sender(shared.clone());
820
821 let shared_recv = shared.clone();
822 let shared_send = shared;
823 self.run_loop(
824 cx,
825 move |cx| shared_recv.recv(cx),
826 move |cx, message| shared_send.send(cx, message),
827 notification_sender,
828 )
829 }
830
831 pub fn run_transport_returning_with_cx<T>(self, cx: &Cx, transport: T)
837 where
838 T: Transport + Send + 'static,
839 {
840 self.init_rich_logging();
841
842 let shared = SharedTransport::new(transport);
843 let notification_sender = create_transport_notification_sender(shared.clone());
844
845 let shared_recv = shared.clone();
846 let shared_send = shared;
847 self.run_loop_returning(
848 cx,
849 move |cx| shared_recv.recv(cx),
850 move |cx, message| shared_send.send(cx, message),
851 notification_sender,
852 );
853 }
854
855 pub fn run_transport_returning<T>(self, transport: T)
860 where
861 T: Transport + Send + 'static,
862 {
863 let cx = Cx::for_request();
865 self.run_transport_returning_with_cx(&cx, transport);
866 }
867
868 pub fn run_sse<W, R>(self, writer: W, request_source: R, endpoint_url: impl Into<String>) -> !
872 where
873 W: Write + Send + 'static,
874 R: Iterator<Item = JsonRpcRequest> + Send + 'static,
875 {
876 let transport = SseServerTransport::new(writer, request_source, endpoint_url);
877 self.run_transport(transport)
878 }
879
880 pub fn run_sse_with_cx<W, R>(
882 self,
883 cx: &Cx,
884 writer: W,
885 request_source: R,
886 endpoint_url: impl Into<String>,
887 ) -> !
888 where
889 W: Write + Send + 'static,
890 R: Iterator<Item = JsonRpcRequest> + Send + 'static,
891 {
892 let transport = SseServerTransport::new(writer, request_source, endpoint_url);
893 self.run_transport_with_cx(cx, transport)
894 }
895
896 pub fn run_websocket<R, W>(self, reader: R, writer: W) -> !
900 where
901 R: Read + Send + 'static,
902 W: Write + Send + 'static,
903 {
904 let transport = WsTransport::new(reader, writer);
905 self.run_transport(transport)
906 }
907
908 pub fn run_websocket_with_cx<R, W>(self, cx: &Cx, reader: R, writer: W) -> !
910 where
911 R: Read + Send + 'static,
912 W: Write + Send + 'static,
913 {
914 let transport = WsTransport::new(reader, writer);
915 self.run_transport_with_cx(cx, transport)
916 }
917
918 pub fn run_http(self, addr: impl Into<String>) -> ! {
945 let cx = Cx::for_request();
946 self.run_http_with_cx(&cx, addr)
947 }
948
949 pub fn run_http_with_cx(self, cx: &Cx, addr: impl Into<String>) -> ! {
953 self.run_http_accept_loop(cx, addr.into(), false);
954 unreachable!()
956 }
957
958 pub fn run_http_returning(self, addr: impl Into<String>) {
964 let cx = Cx::for_request();
965 self.run_http_returning_with_cx(&cx, addr);
966 }
967
968 pub fn run_http_returning_with_cx(self, cx: &Cx, addr: impl Into<String>) {
970 self.run_http_accept_loop(cx, addr.into(), true);
971 }
972
973 #[allow(clippy::too_many_lines)]
982 fn run_http_accept_loop(self, cx: &Cx, addr: String, returning: bool) {
983 self.init_rich_logging();
984
985 let listener = match TcpListener::bind(&addr) {
987 Ok(l) => l,
988 Err(e) => {
989 error!(target: targets::TRANSPORT, "Failed to bind HTTP listener on {}: {}", addr, e);
990 if returning {
991 return;
992 }
993 std::process::exit(1);
994 }
995 };
996
997 let _ = listener.set_nonblocking(true);
1000
1001 info!(target: targets::SERVER, "HTTP server listening on {}", addr);
1002
1003 let mcp_path = self.http_config.mcp_path.clone();
1005 let health_path = self.http_config.health_path.clone();
1006 let max_connections = self.http_config.max_connections;
1007
1008 let session = Arc::new(Mutex::new(Session::new(
1010 self.info.clone(),
1011 self.capabilities.clone(),
1012 )));
1013
1014 let notification_sender: NotificationSender = Arc::new(|request: JsonRpcRequest| {
1017 log::debug!(
1018 target: targets::SERVER,
1019 "HTTP notification (not deliverable to client): {}",
1020 request.method
1021 );
1022 });
1023
1024 let request_sender = Arc::new({
1026 let send_fn: bidirectional::TransportSendFn = Arc::new(|_message| {
1027 Err("HTTP transport does not support server-to-client requests".into())
1030 });
1031 bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
1032 });
1033
1034 if let Some(ref stats) = self.stats {
1036 stats.connection_opened();
1037 }
1038
1039 if self.console_config.show_banner && !banner_suppressed() {
1041 self.render_http_startup_banner(&addr);
1042 }
1043
1044 if !self.run_startup_hook() {
1046 error!(target: targets::SERVER, "Startup hook failed");
1047 if returning {
1048 self.graceful_shutdown_returning();
1049 return;
1050 }
1051 self.graceful_shutdown(1);
1052 }
1053
1054 let http_handler = Arc::new(HttpRequestHandler::new());
1056
1057 let traffic_renderer = Arc::new(if self.console_config.show_request_traffic {
1059 let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
1060 renderer.truncate_at = self.console_config.truncate_at;
1061 match self.console_config.traffic_verbosity {
1062 TrafficVerbosity::None => {}
1063 TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
1064 renderer.show_params = false;
1065 renderer.show_result = false;
1066 }
1067 TrafficVerbosity::Full => {
1068 renderer.show_params = true;
1069 renderer.show_result = true;
1070 }
1071 }
1072 Some(renderer)
1073 } else {
1074 None
1075 });
1076
1077 let server = Arc::new(self);
1079
1080 let active_connections = Arc::new(AtomicUsize::new(0));
1082
1083 loop {
1085 if cx.is_cancel_requested() {
1086 info!(target: targets::SERVER, "Cancellation requested, shutting down HTTP server");
1087 if returning {
1088 server.graceful_shutdown_returning();
1089 return;
1090 }
1091 server.graceful_shutdown(0);
1092 }
1093
1094 let (stream, peer_addr) = match listener.accept() {
1095 Ok(pair) => pair,
1096 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
1097 std::thread::sleep(Duration::from_millis(10));
1099 continue;
1100 }
1101 Err(e) => {
1102 error!(target: targets::TRANSPORT, "Failed to accept connection: {}", e);
1103 continue;
1104 }
1105 };
1106
1107 let current = active_connections.load(Ordering::Relaxed);
1109 if current >= max_connections {
1110 debug!(
1111 target: targets::TRANSPORT,
1112 "Rejecting connection from {} (max_connections {} reached)",
1113 peer_addr,
1114 max_connections
1115 );
1116 if let Ok(reader_stream) = stream.try_clone() {
1118 let mut http_transport =
1119 HttpTransport::new(BufReader::new(reader_stream), BufWriter::new(stream));
1120 let _ = http_transport.write_response(
1121 &HttpResponse::new(HttpStatus::SERVICE_UNAVAILABLE)
1122 .with_json(&serde_json::json!({"error": "too many connections"})),
1123 );
1124 }
1125 continue;
1126 }
1127
1128 debug!(
1129 target: targets::TRANSPORT,
1130 "Accepted HTTP connection from {}",
1131 peer_addr
1132 );
1133
1134 let _ = stream.set_nonblocking(false);
1138
1139 let server = Arc::clone(&server);
1141 let session = Arc::clone(&session);
1142 let notification_sender = Arc::clone(¬ification_sender);
1143 let request_sender = Arc::clone(&request_sender);
1144 let http_handler = Arc::clone(&http_handler);
1145 let traffic_renderer = Arc::clone(&traffic_renderer);
1146 let active_connections = Arc::clone(&active_connections);
1147 let mcp_path = mcp_path.clone();
1148 let health_path = health_path.clone();
1149 let conn_cx = cx.clone();
1150
1151 active_connections.fetch_add(1, Ordering::Relaxed);
1153
1154 std::thread::spawn(move || {
1156 struct ConnectionGuard(Arc<AtomicUsize>);
1158 impl Drop for ConnectionGuard {
1159 fn drop(&mut self) {
1160 self.0.fetch_sub(1, Ordering::Relaxed);
1161 }
1162 }
1163 let _guard = ConnectionGuard(active_connections);
1164
1165 let reader = BufReader::new(match stream.try_clone() {
1167 Ok(s) => s,
1168 Err(e) => {
1169 error!(target: targets::TRANSPORT, "Failed to clone TCP stream: {}", e);
1170 return;
1171 }
1172 });
1173 let writer = BufWriter::new(stream);
1174 let mut http_transport: HttpTransport<
1175 BufReader<std::net::TcpStream>,
1176 BufWriter<std::net::TcpStream>,
1177 > = HttpTransport::new(reader, writer);
1178
1179 let http_request = match http_transport.read_request() {
1180 Ok(req) => req,
1181 Err(e) => {
1182 debug!(target: targets::TRANSPORT, "Failed to read HTTP request: {}", e);
1183 return;
1184 }
1185 };
1186
1187 let response = if http_request.path == health_path
1189 && http_request.method == HttpMethod::Get
1190 {
1191 HttpResponse::ok().with_json(&serde_json::json!({"status": "ok"}))
1193 } else if http_request.path == mcp_path
1194 && http_request.method == HttpMethod::Options
1195 {
1196 http_handler.handle_options(&http_request)
1198 } else if http_request.path == mcp_path && http_request.method == HttpMethod::Post {
1199 server.handle_http_mcp_request(
1201 &conn_cx,
1202 &session,
1203 &http_handler,
1204 &http_request,
1205 ¬ification_sender,
1206 &request_sender,
1207 &traffic_renderer,
1208 )
1209 } else {
1210 HttpResponse::new(HttpStatus::NOT_FOUND)
1212 .with_json(&serde_json::json!({"error": "not found"}))
1213 };
1214
1215 if let Err(e) = http_transport.write_response(&response) {
1217 debug!(target: targets::TRANSPORT, "Failed to write HTTP response: {}", e);
1218 }
1219 });
1220 }
1221 }
1222
1223 fn handle_http_mcp_request(
1225 &self,
1226 cx: &Cx,
1227 session: &Arc<Mutex<Session>>,
1228 http_handler: &HttpRequestHandler,
1229 http_request: &HttpRequest,
1230 notification_sender: &NotificationSender,
1231 request_sender: &bidirectional::RequestSender,
1232 traffic_renderer: &Option<RequestResponseRenderer>,
1233 ) -> HttpResponse {
1234 let json_rpc = match http_handler.parse_request(http_request) {
1236 Ok(r) => r,
1237 Err(e) => {
1238 debug!(target: targets::TRANSPORT, "Invalid MCP request: {}", e);
1239 return http_handler
1240 .error_response(HttpStatus::BAD_REQUEST, &format!("Invalid request: {e}"));
1241 }
1242 };
1243
1244 if let Some(renderer) = traffic_renderer {
1246 renderer.render_request(&json_rpc, console());
1247 }
1248
1249 if let Some(ref stats) = self.stats {
1251 if let Ok(json) = serde_json::to_string(&json_rpc) {
1252 stats.add_bytes_received(json.len() as u64 + 1);
1253 }
1254 }
1255
1256 let start_time = Instant::now();
1257
1258 let execution_mode = HttpRequestExecutionMode::for_request(&self.router, &json_rpc);
1259
1260 let response_opt = match execution_mode {
1265 HttpRequestExecutionMode::ConcurrentReadOnly => {
1266 let session_view = {
1267 let session_guard = match session.lock() {
1268 Ok(g) => g,
1269 Err(poisoned) => {
1270 error!(target: targets::SERVER, "Session lock poisoned, recovering");
1271 poisoned.into_inner()
1272 }
1273 };
1274 SessionView::from_session(&session_guard)
1275 };
1276 self.handle_request_with_view(
1277 cx,
1278 &session_view,
1279 json_rpc,
1280 notification_sender,
1281 request_sender,
1282 )
1283 }
1284 HttpRequestExecutionMode::ExclusiveSession => {
1285 let mut session_guard = match session.lock() {
1286 Ok(g) => g,
1287 Err(poisoned) => {
1288 error!(target: targets::SERVER, "Session lock poisoned, recovering");
1289 poisoned.into_inner()
1290 }
1291 };
1292 self.handle_request(
1293 cx,
1294 &mut session_guard,
1295 json_rpc,
1296 notification_sender,
1297 request_sender,
1298 )
1299 }
1300 };
1301
1302 let duration = start_time.elapsed();
1303
1304 match response_opt {
1305 Some(json_rpc_response) => {
1306 if let Some(renderer) = traffic_renderer {
1308 renderer.render_response(&json_rpc_response, Some(duration), console());
1309 }
1310
1311 if let Some(ref stats) = self.stats {
1313 if let Ok(json) = serde_json::to_string(&json_rpc_response) {
1314 stats.add_bytes_sent(json.len() as u64 + 1);
1315 }
1316 }
1317
1318 let origin = http_request.header("origin");
1319 http_handler.create_response(&json_rpc_response, origin)
1320 }
1321 None => {
1322 HttpResponse::new(HttpStatus::ACCEPTED)
1324 }
1325 }
1326 }
1327
1328 fn render_http_startup_banner(&self, addr: &str) {
1330 let render = || {
1331 let transport_label = format!("http://{addr}");
1332 let mut banner = StartupBanner::new(&self.info.name, &self.info.version)
1333 .tools(self.router.tools_count())
1334 .resources(self.router.resources_count())
1335 .prompts(self.router.prompts_count())
1336 .transport(&transport_label);
1337
1338 if let Some(desc) = self.instructions.as_deref().filter(|d| !d.is_empty()) {
1339 banner = banner.description(desc);
1340 }
1341
1342 match self.console_config.banner_style {
1343 BannerStyle::Full => banner.render(console()),
1344 BannerStyle::Compact | BannerStyle::Minimal => {
1345 banner.no_logo().render(console());
1346 }
1347 BannerStyle::None => {}
1348 }
1349 };
1350
1351 if let Err(err) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(render)) {
1352 eprintln!("Warning: banner rendering failed: {err:?}");
1353 }
1354 }
1355
1356 pub(crate) fn run_startup_hook(&self) -> bool {
1361 let hook = {
1362 let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
1363 error!(target: targets::SERVER, "lifespan lock poisoned in run_startup_hook, recovering");
1364 poisoned.into_inner()
1365 });
1366 guard.as_mut().and_then(|h| h.on_startup.take())
1367 };
1368
1369 if let Some(hook) = hook {
1370 debug!(target: targets::SERVER, "Running startup hook");
1371 match hook() {
1372 Ok(()) => {
1373 debug!(target: targets::SERVER, "Startup hook completed successfully");
1374 true
1375 }
1376 Err(e) => {
1377 error!(target: targets::SERVER, "Startup hook failed: {}", e);
1378 false
1379 }
1380 }
1381 } else {
1382 true
1383 }
1384 }
1385
1386 pub(crate) fn run_shutdown_hook(&self) {
1388 let hook = {
1389 let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
1390 error!(target: targets::SERVER, "lifespan lock poisoned in run_shutdown_hook, recovering");
1391 poisoned.into_inner()
1392 });
1393 guard.as_mut().and_then(|h| h.on_shutdown.take())
1394 };
1395
1396 if let Some(hook) = hook {
1397 debug!(target: targets::SERVER, "Running shutdown hook");
1398 hook();
1399 debug!(target: targets::SERVER, "Shutdown hook completed");
1400 }
1401 }
1402
1403 fn graceful_shutdown(&self, exit_code: i32) -> ! {
1405 self.cancel_active_requests(CancelKind::Shutdown, true);
1406 self.run_shutdown_hook();
1407 if let Some(ref stats) = self.stats {
1408 stats.connection_closed();
1409 }
1410 std::process::exit(exit_code)
1411 }
1412
1413 fn graceful_shutdown_returning(&self) {
1418 self.cancel_active_requests(CancelKind::Shutdown, true);
1419 self.run_shutdown_hook();
1420 if let Some(ref stats) = self.stats {
1421 stats.connection_closed();
1422 }
1423 }
1424
1425 fn run_loop<R, S>(
1427 self,
1428 cx: &Cx,
1429 mut recv: R,
1430 send: S,
1431 notification_sender: NotificationSender,
1432 ) -> !
1433 where
1434 R: FnMut(&Cx) -> Result<JsonRpcMessage, TransportError>,
1435 S: FnMut(&Cx, &JsonRpcMessage) -> Result<(), TransportError> + Send + Sync + 'static,
1436 {
1437 let mut session = Session::new(self.info.clone(), self.capabilities.clone());
1438
1439 let send = Arc::new(Mutex::new(send));
1441
1442 let request_sender = {
1444 let send_clone = send.clone();
1445 let send_cx = cx.clone();
1446 let send_fn: bidirectional::TransportSendFn = Arc::new(move |message| {
1447 let mut guard = send_clone
1448 .lock()
1449 .map_err(|e| format!("Lock poisoned: {}", e))?;
1450 guard(&send_cx, message).map_err(|e| format!("Send failed: {}", e))
1451 });
1452 bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
1453 };
1454
1455 if let Some(ref stats) = self.stats {
1457 stats.connection_opened();
1458 }
1459
1460 if self.console_config.show_banner && !banner_suppressed() {
1462 self.render_startup_banner();
1463 }
1464
1465 if !self.run_startup_hook() {
1467 error!(target: targets::SERVER, "Startup hook failed, exiting");
1468 self.graceful_shutdown(1);
1469 }
1470
1471 let traffic_renderer = if self.console_config.show_request_traffic {
1473 let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
1474 renderer.truncate_at = self.console_config.truncate_at;
1475 match self.console_config.traffic_verbosity {
1476 TrafficVerbosity::None => {} TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
1478 renderer.show_params = false;
1479 renderer.show_result = false;
1480 }
1481 TrafficVerbosity::Full => {
1482 renderer.show_params = true;
1483 renderer.show_result = true;
1484 }
1485 }
1486 Some(renderer)
1487 } else {
1488 None
1489 };
1490
1491 loop {
1493 if cx.is_cancel_requested() {
1495 info!(target: targets::SERVER, "Cancellation requested, shutting down");
1496 self.graceful_shutdown(0);
1497 }
1498
1499 let message = match recv(cx) {
1501 Ok(msg) => msg,
1502 Err(TransportError::Closed) => {
1503 self.graceful_shutdown(0);
1505 }
1506 Err(TransportError::Cancelled) => {
1507 info!(target: targets::SERVER, "Transport cancelled");
1508 self.graceful_shutdown(0);
1509 }
1510 Err(e) => {
1511 error!(target: targets::TRANSPORT, "Transport error: {}", e);
1512 continue;
1513 }
1514 };
1515
1516 if let Some(renderer) = &traffic_renderer {
1518 if let JsonRpcMessage::Request(req) = &message {
1519 renderer.render_request(req, console());
1520 }
1521 }
1522
1523 let start_time = Instant::now();
1524
1525 let response_opt = match message {
1527 JsonRpcMessage::Request(request) => {
1528 if let Some(ref stats) = self.stats {
1530 if let Ok(json) = serde_json::to_string(&request) {
1533 stats.add_bytes_received(json.len() as u64 + 1); }
1535 }
1536 self.handle_request(
1537 cx,
1538 &mut session,
1539 request,
1540 ¬ification_sender,
1541 &request_sender,
1542 )
1543 }
1544 JsonRpcMessage::Response(response) => {
1545 if self.pending_requests.route_response(&response) {
1547 debug!(target: targets::SERVER, "Routed response to pending request");
1548 } else {
1549 debug!(target: targets::SERVER, "Received unexpected response: {:?}", response.id);
1550 }
1551 continue;
1552 }
1553 };
1554
1555 let duration = start_time.elapsed();
1556
1557 if let Some(response) = response_opt {
1558 if let Some(renderer) = &traffic_renderer {
1560 renderer.render_response(&response, Some(duration), console());
1561 }
1562
1563 if let Some(ref stats) = self.stats {
1565 if let Ok(json) = serde_json::to_string(&response) {
1566 stats.add_bytes_sent(json.len() as u64 + 1); }
1568 }
1569
1570 let send_result = {
1572 let mut guard = match send.lock() {
1573 Ok(guard) => guard,
1574 Err(poisoned) => {
1575 error!(
1576 target: targets::TRANSPORT,
1577 "Send channel lock poisoned; continuing with inner guard"
1578 );
1579 poisoned.into_inner()
1580 }
1581 };
1582 guard(cx, &JsonRpcMessage::Response(response))
1583 };
1584 if let Err(e) = send_result {
1585 error!(target: targets::TRANSPORT, "Failed to send response: {}", e);
1586 }
1587 }
1588 }
1589 }
1590
1591 #[allow(clippy::too_many_lines)]
1596 fn run_loop_returning<R, S>(
1597 self,
1598 cx: &Cx,
1599 mut recv: R,
1600 send: S,
1601 notification_sender: NotificationSender,
1602 ) where
1603 R: FnMut(&Cx) -> Result<JsonRpcMessage, TransportError>,
1604 S: FnMut(&Cx, &JsonRpcMessage) -> Result<(), TransportError> + Send + Sync + 'static,
1605 {
1606 let mut session = Session::new(self.info.clone(), self.capabilities.clone());
1607
1608 let send = Arc::new(Mutex::new(send));
1610
1611 let request_sender = {
1613 let send_clone = send.clone();
1614 let send_cx = cx.clone();
1615 let send_fn: bidirectional::TransportSendFn = Arc::new(move |message| {
1616 let mut guard = send_clone
1617 .lock()
1618 .map_err(|e| format!("Lock poisoned: {}", e))?;
1619 guard(&send_cx, message).map_err(|e| format!("Send failed: {}", e))
1620 });
1621 bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
1622 };
1623
1624 if let Some(ref stats) = self.stats {
1626 stats.connection_opened();
1627 }
1628
1629 if self.console_config.show_banner && !banner_suppressed() {
1631 self.render_startup_banner();
1632 }
1633
1634 if !self.run_startup_hook() {
1636 error!(target: targets::SERVER, "Startup hook failed, stopping");
1637 self.graceful_shutdown_returning();
1638 return;
1639 }
1640
1641 let traffic_renderer = if self.console_config.show_request_traffic {
1643 let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
1644 renderer.truncate_at = self.console_config.truncate_at;
1645 match self.console_config.traffic_verbosity {
1646 TrafficVerbosity::None => {} TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
1648 renderer.show_params = false;
1649 renderer.show_result = false;
1650 }
1651 TrafficVerbosity::Full => {
1652 renderer.show_params = true;
1653 renderer.show_result = true;
1654 }
1655 }
1656 Some(renderer)
1657 } else {
1658 None
1659 };
1660
1661 loop {
1663 if cx.is_cancel_requested() {
1665 info!(target: targets::SERVER, "Cancellation requested, stopping");
1666 self.graceful_shutdown_returning();
1667 return;
1668 }
1669
1670 let message = match recv(cx) {
1672 Ok(msg) => msg,
1673 Err(TransportError::Closed) => {
1674 self.graceful_shutdown_returning();
1675 return;
1676 }
1677 Err(TransportError::Cancelled) => {
1678 info!(target: targets::SERVER, "Transport cancelled");
1679 self.graceful_shutdown_returning();
1680 return;
1681 }
1682 Err(e) => {
1683 error!(target: targets::TRANSPORT, "Transport error: {}", e);
1684 continue;
1685 }
1686 };
1687
1688 if let Some(renderer) = &traffic_renderer {
1690 if let JsonRpcMessage::Request(req) = &message {
1691 renderer.render_request(req, console());
1692 }
1693 }
1694
1695 let start_time = Instant::now();
1696
1697 let response_opt = match message {
1699 JsonRpcMessage::Request(request) => {
1700 if let Some(ref stats) = self.stats {
1702 if let Ok(json) = serde_json::to_string(&request) {
1705 stats.add_bytes_received(json.len() as u64 + 1); }
1707 }
1708 self.handle_request(
1709 cx,
1710 &mut session,
1711 request,
1712 ¬ification_sender,
1713 &request_sender,
1714 )
1715 }
1716 JsonRpcMessage::Response(response) => {
1717 if self.pending_requests.route_response(&response) {
1719 debug!(target: targets::SERVER, "Routed response to pending request");
1720 } else {
1721 debug!(
1722 target: targets::SERVER,
1723 "Received unexpected response: {:?}",
1724 response.id
1725 );
1726 }
1727 continue;
1728 }
1729 };
1730
1731 let duration = start_time.elapsed();
1732
1733 if let Some(response) = response_opt {
1734 if let Some(renderer) = &traffic_renderer {
1736 renderer.render_response(&response, Some(duration), console());
1737 }
1738
1739 if let Some(ref stats) = self.stats {
1741 if let Ok(json) = serde_json::to_string(&response) {
1742 stats.add_bytes_sent(json.len() as u64 + 1); }
1744 }
1745
1746 let send_result = {
1748 let mut guard = match send.lock() {
1749 Ok(guard) => guard,
1750 Err(poisoned) => {
1751 error!(
1752 target: targets::TRANSPORT,
1753 "Send channel lock poisoned; continuing with inner guard"
1754 );
1755 poisoned.into_inner()
1756 }
1757 };
1758 guard(cx, &JsonRpcMessage::Response(response))
1759 };
1760 if let Err(e) = send_result {
1761 error!(target: targets::TRANSPORT, "Failed to send response: {}", e);
1762 }
1763 }
1764 }
1765 }
1766
1767 fn handle_request(
1769 &self,
1770 cx: &Cx,
1771 session: &mut Session,
1772 request: JsonRpcRequest,
1773 notification_sender: &NotificationSender,
1774 request_sender: &bidirectional::RequestSender,
1775 ) -> Option<JsonRpcResponse> {
1776 let id = request.id.clone();
1777 let method = request.method.clone();
1778 let is_notification = id.is_none();
1779
1780 let start_time = Instant::now();
1782
1783 let request_id = request_id_to_u64(id.as_ref());
1785
1786 let budget = self.create_request_budget();
1788
1789 if budget.is_exhausted() {
1791 if let Some(ref stats) = self.stats {
1793 stats.record_request(&method, start_time.elapsed(), false);
1794 }
1795 let response_id = id.clone()?;
1797 return Some(JsonRpcResponse::error(
1798 Some(response_id),
1799 JsonRpcError {
1800 code: McpErrorCode::RequestCancelled.into(),
1801 message: "Request budget exhausted".to_string(),
1802 data: None,
1803 },
1804 ));
1805 }
1806
1807 let request_cx = if is_notification {
1808 cx.clone()
1809 } else {
1810 Cx::for_request_with_budget(budget)
1811 };
1812
1813 let _active_guard = match id.clone() {
1814 Some(request_id) => {
1815 match ActiveRequestGuard::try_new(
1816 &self.active_requests,
1817 request_id.clone(),
1818 request_cx.clone(),
1819 ) {
1820 Ok(guard) => Some(guard),
1821 Err(duplicate_id) => {
1822 if let Some(ref stats) = self.stats {
1823 stats.record_request(&method, start_time.elapsed(), false);
1824 }
1825 let message = format!(
1826 "Request id {duplicate_id} is already active; wait for the earlier request to finish before reusing it"
1827 );
1828 return Some(JsonRpcResponse::error(
1829 Some(request_id),
1830 JsonRpcError {
1831 code: McpErrorCode::InvalidRequest.into(),
1832 message,
1833 data: None,
1834 },
1835 ));
1836 }
1837 }
1838 }
1839 None => None,
1840 };
1841
1842 let result = self.dispatch_method(
1844 &request_cx,
1845 session,
1846 request,
1847 request_id,
1848 &budget,
1849 notification_sender,
1850 request_sender,
1851 );
1852
1853 let latency = start_time.elapsed();
1855 if let Some(ref stats) = self.stats {
1856 match &result {
1857 Ok(_) => stats.record_request(&method, latency, true),
1858 Err(e) if e.code == fastmcp_core::McpErrorCode::RequestCancelled => {
1859 stats.record_cancelled(&method, latency);
1860 }
1861 Err(_) => stats.record_request(&method, latency, false),
1862 }
1863 }
1864
1865 if is_notification {
1867 if let Err(e) = result {
1868 fastmcp_core::logging::error!(
1869 target: targets::HANDLER,
1870 "Notification '{}' failed: {}",
1871 method,
1872 e
1873 );
1874 }
1875 return None;
1876 }
1877
1878 let response_id = id.clone()?;
1881
1882 match result {
1883 Ok(value) => Some(JsonRpcResponse::success(response_id, value)),
1884 Err(e) => {
1885 if self.mask_error_details && e.is_internal() {
1887 fastmcp_core::logging::error!(
1888 target: targets::HANDLER,
1889 "Request '{}' failed (masked in response): {}",
1890 method,
1891 e
1892 );
1893 }
1894
1895 let masked = e.masked(self.mask_error_details);
1897 Some(JsonRpcResponse::error(
1898 id,
1899 JsonRpcError {
1900 code: masked.code.into(),
1901 message: masked.message,
1902 data: masked.data,
1903 },
1904 ))
1905 }
1906 }
1907 }
1908
1909 fn handle_request_with_view(
1910 &self,
1911 cx: &Cx,
1912 session: &SessionView,
1913 request: JsonRpcRequest,
1914 notification_sender: &NotificationSender,
1915 request_sender: &bidirectional::RequestSender,
1916 ) -> Option<JsonRpcResponse> {
1917 let id = request.id.clone();
1918 let method = request.method.clone();
1919 let is_notification = id.is_none();
1920
1921 let start_time = Instant::now();
1922 let request_id = request_id_to_u64(id.as_ref());
1923 let budget = self.create_request_budget();
1924
1925 if budget.is_exhausted() {
1926 if let Some(ref stats) = self.stats {
1927 stats.record_request(&method, start_time.elapsed(), false);
1928 }
1929 let response_id = id.clone()?;
1930 return Some(JsonRpcResponse::error(
1931 Some(response_id),
1932 JsonRpcError {
1933 code: McpErrorCode::RequestCancelled.into(),
1934 message: "Request budget exhausted".to_string(),
1935 data: None,
1936 },
1937 ));
1938 }
1939
1940 let request_cx = if is_notification {
1941 cx.clone()
1942 } else {
1943 Cx::for_request_with_budget(budget)
1944 };
1945
1946 let _active_guard = match id.clone() {
1947 Some(request_id) => {
1948 match ActiveRequestGuard::try_new(
1949 &self.active_requests,
1950 request_id.clone(),
1951 request_cx.clone(),
1952 ) {
1953 Ok(guard) => Some(guard),
1954 Err(duplicate_id) => {
1955 if let Some(ref stats) = self.stats {
1956 stats.record_request(&method, start_time.elapsed(), false);
1957 }
1958 let message = format!(
1959 "Request id {duplicate_id} is already active; wait for the earlier request to finish before reusing it"
1960 );
1961 return Some(JsonRpcResponse::error(
1962 Some(request_id),
1963 JsonRpcError {
1964 code: McpErrorCode::InvalidRequest.into(),
1965 message,
1966 data: None,
1967 },
1968 ));
1969 }
1970 }
1971 }
1972 None => None,
1973 };
1974
1975 let result = self.dispatch_read_only_http_method(
1976 &request_cx,
1977 session,
1978 request,
1979 request_id,
1980 &budget,
1981 notification_sender,
1982 request_sender,
1983 );
1984
1985 let latency = start_time.elapsed();
1986 if let Some(ref stats) = self.stats {
1987 match &result {
1988 Ok(_) => stats.record_request(&method, latency, true),
1989 Err(e) if e.code == fastmcp_core::McpErrorCode::RequestCancelled => {
1990 stats.record_cancelled(&method, latency);
1991 }
1992 Err(_) => stats.record_request(&method, latency, false),
1993 }
1994 }
1995
1996 if is_notification {
1997 if let Err(e) = result {
1998 fastmcp_core::logging::error!(
1999 target: targets::HANDLER,
2000 "Notification '{}' failed: {}",
2001 method,
2002 e
2003 );
2004 }
2005 return None;
2006 }
2007
2008 let response_id = id.clone()?;
2009
2010 match result {
2011 Ok(value) => Some(JsonRpcResponse::success(response_id, value)),
2012 Err(e) => {
2013 if self.mask_error_details && e.is_internal() {
2014 fastmcp_core::logging::error!(
2015 target: targets::HANDLER,
2016 "Request '{}' failed (masked in response): {}",
2017 method,
2018 e
2019 );
2020 }
2021
2022 let masked = e.masked(self.mask_error_details);
2023 Some(JsonRpcResponse::error(
2024 id,
2025 JsonRpcError {
2026 code: masked.code.into(),
2027 message: masked.message,
2028 data: masked.data,
2029 },
2030 ))
2031 }
2032 }
2033 }
2034
2035 fn create_request_budget(&self) -> Budget {
2037 if self.request_timeout_secs == 0 {
2038 Budget::INFINITE
2040 } else {
2041 let now = wall_now();
2044 let timeout_ns = self.request_timeout_secs.saturating_mul(1_000_000_000);
2045 let deadline = now.saturating_add_nanos(timeout_ns);
2046 Budget::new().with_deadline(deadline)
2047 }
2048 }
2049
2050 #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
2052 fn dispatch_method(
2053 &self,
2054 cx: &Cx,
2055 session: &mut Session,
2056 request: JsonRpcRequest,
2057 request_id: u64,
2058 budget: &Budget,
2059 notification_sender: &NotificationSender,
2060 request_sender: &bidirectional::RequestSender,
2061 ) -> Result<serde_json::Value, McpError> {
2062 if cx.is_cancel_requested() {
2064 return Err(McpError::request_cancelled());
2065 }
2066
2067 if budget.is_exhausted() {
2069 return Err(McpError::new(
2070 McpErrorCode::RequestCancelled,
2071 "Request budget exhausted",
2072 ));
2073 }
2074 if budget.is_past_deadline(wall_now()) {
2076 cx.cancel_fast(CancelKind::Deadline);
2077 return Err(McpError::new(
2078 McpErrorCode::RequestCancelled,
2079 "Request timeout exceeded",
2080 ));
2081 }
2082
2083 if !session.is_initialized() && request.method != "initialize" && request.method != "ping" {
2085 return Err(McpError::invalid_request(
2086 "Server not initialized. Client must send 'initialize' first.",
2087 ));
2088 }
2089
2090 if let Some(task_manager) = &self.task_manager {
2091 task_manager.set_notification_sender(Arc::clone(notification_sender));
2092 }
2093
2094 let mut mw_ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
2095 let request_auth = if self.should_authenticate(&request.method) {
2096 let auth_request = AuthRequest {
2097 method: &request.method,
2098 params: request.params.as_ref(),
2099 request_id,
2100 };
2101 match self.authenticate_request(cx, request_id, session, auth_request) {
2102 Ok(auth) => Some(auth),
2103 Err(err) => {
2104 let err = self.apply_global_middleware_error(&mw_ctx, &request, err);
2105 let result = Err(err);
2106 self.maybe_emit_log_notification(
2107 session,
2108 notification_sender,
2109 &request.method,
2110 &result,
2111 );
2112 return result;
2113 }
2114 }
2115 } else {
2116 None
2117 };
2118 if let Some(auth) = request_auth.clone() {
2119 mw_ctx = mw_ctx.with_auth(auth);
2120 }
2121
2122 let mut entered_middleware: Vec<&dyn crate::Middleware> = Vec::new();
2126
2127 for m in self.middleware.iter() {
2128 entered_middleware.push(m.as_ref());
2129 match m.on_request(&mw_ctx, &request) {
2130 Ok(crate::MiddlewareDecision::Continue) => {}
2131 Ok(crate::MiddlewareDecision::Respond(v)) => {
2132 return self.apply_middleware_response(
2133 &entered_middleware,
2134 &mw_ctx,
2135 &request,
2136 v,
2137 );
2138 }
2139 Err(e) => {
2140 let err =
2141 self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e);
2142 return Err(err);
2143 }
2144 }
2145 }
2146
2147 let dispatch_auth = mw_ctx.auth();
2148
2149 let result: Result<serde_json::Value, McpError> = (|| {
2156 let method = &request.method;
2157 let params = request.params.clone();
2158
2159 let bidirectional_senders = self.create_bidirectional_senders(session, request_sender);
2161
2162 match method.as_str() {
2163 "initialize" => {
2164 let params: InitializeParams = parse_params(params)?;
2165 let result = self.router.handle_initialize(
2166 cx,
2167 session,
2168 params,
2169 self.instructions.as_deref(),
2170 )?;
2171 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2172 }
2173 "initialized" => {
2174 Ok(serde_json::Value::Null)
2176 }
2177 "notifications/cancelled" => {
2178 let params: CancelledParams = parse_params(params)?;
2179 self.handle_cancelled_notification(params);
2180 Ok(serde_json::Value::Null)
2181 }
2182 "logging/setLevel" => {
2183 let params: SetLogLevelParams = parse_params(params)?;
2184 self.handle_set_log_level(session, params);
2185 Ok(serde_json::Value::Null)
2186 }
2187 "tools/list" => {
2188 let params: ListToolsParams = parse_params_or_default(params)?;
2189 let result =
2190 self.router
2191 .handle_tools_list(cx, params, Some(session.state()))?;
2192 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2193 }
2194 "tools/call" => {
2195 let params: CallToolParams = parse_params(params)?;
2196 let result = self.router.handle_tools_call(
2197 cx,
2198 request_id,
2199 params,
2200 budget,
2201 session.state().clone(),
2202 dispatch_auth.clone(),
2203 Some(notification_sender),
2204 bidirectional_senders.as_ref(),
2205 )?;
2206 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2207 }
2208 "resources/list" => {
2209 let params: ListResourcesParams = parse_params_or_default(params)?;
2210 let result =
2211 self.router
2212 .handle_resources_list(cx, params, Some(session.state()))?;
2213 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2214 }
2215 "resources/templates/list" => {
2216 let params: ListResourceTemplatesParams = parse_params_or_default(params)?;
2217 let result = self.router.handle_resource_templates_list(
2218 cx,
2219 params,
2220 Some(session.state()),
2221 )?;
2222 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2223 }
2224 "resources/read" => {
2225 let params: ReadResourceParams = parse_params(params)?;
2226 let result = self.router.handle_resources_read(
2227 cx,
2228 request_id,
2229 ¶ms,
2230 budget,
2231 session.state().clone(),
2232 dispatch_auth.clone(),
2233 Some(notification_sender),
2234 bidirectional_senders.as_ref(),
2235 )?;
2236 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2237 }
2238 "resources/subscribe" => {
2239 let params: SubscribeResourceParams = parse_params(params)?;
2240 if !self.router.resource_exists(¶ms.uri) {
2241 return Err(McpError::resource_not_found(¶ms.uri));
2242 }
2243 session.subscribe_resource(params.uri);
2244 Ok(serde_json::json!({}))
2245 }
2246 "resources/unsubscribe" => {
2247 let params: UnsubscribeResourceParams = parse_params(params)?;
2248 session.unsubscribe_resource(¶ms.uri);
2249 Ok(serde_json::json!({}))
2250 }
2251 "prompts/list" => {
2252 let params: ListPromptsParams = parse_params_or_default(params)?;
2253 let result =
2254 self.router
2255 .handle_prompts_list(cx, params, Some(session.state()))?;
2256 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2257 }
2258 "prompts/get" => {
2259 let params: GetPromptParams = parse_params(params)?;
2260 let result = self.router.handle_prompts_get(
2261 cx,
2262 request_id,
2263 params,
2264 budget,
2265 session.state().clone(),
2266 dispatch_auth.clone(),
2267 Some(notification_sender),
2268 bidirectional_senders.as_ref(),
2269 )?;
2270 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2271 }
2272 "ping" => {
2273 Ok(serde_json::json!({}))
2275 }
2276 "tasks/list" => {
2278 let params: ListTasksParams = parse_params_or_default(params)?;
2279 let result =
2280 self.router
2281 .handle_tasks_list(cx, params, self.task_manager.as_ref())?;
2282 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2283 }
2284 "tasks/get" => {
2285 let params: GetTaskParams = parse_params(params)?;
2286 let result =
2287 self.router
2288 .handle_tasks_get(cx, params, self.task_manager.as_ref())?;
2289 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2290 }
2291 "tasks/cancel" => {
2292 let params: CancelTaskParams = parse_params(params)?;
2293 let result =
2294 self.router
2295 .handle_tasks_cancel(cx, params, self.task_manager.as_ref())?;
2296 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2297 }
2298 "tasks/submit" => {
2299 let params: SubmitTaskParams = parse_params(params)?;
2300 let result =
2301 self.router
2302 .handle_tasks_submit(cx, params, self.task_manager.as_ref())?;
2303 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2304 }
2305 _ => Err(McpError::method_not_found(method)),
2306 }
2307 })();
2308
2309 let final_result = match result {
2310 Ok(v) => self.apply_middleware_response(&entered_middleware, &mw_ctx, &request, v),
2311 Err(e) => Err(self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e)),
2312 };
2313
2314 self.maybe_emit_log_notification(
2315 session,
2316 notification_sender,
2317 &request.method,
2318 &final_result,
2319 );
2320
2321 final_result
2322 }
2323
2324 fn dispatch_read_only_http_method(
2325 &self,
2326 cx: &Cx,
2327 session: &SessionView,
2328 request: JsonRpcRequest,
2329 request_id: u64,
2330 budget: &Budget,
2331 notification_sender: &NotificationSender,
2332 request_sender: &bidirectional::RequestSender,
2333 ) -> Result<serde_json::Value, McpError> {
2334 if cx.is_cancel_requested() {
2335 return Err(McpError::request_cancelled());
2336 }
2337
2338 if budget.is_exhausted() {
2339 return Err(McpError::new(
2340 McpErrorCode::RequestCancelled,
2341 "Request budget exhausted",
2342 ));
2343 }
2344
2345 if budget.is_past_deadline(wall_now()) {
2346 cx.cancel_fast(CancelKind::Deadline);
2347 return Err(McpError::new(
2348 McpErrorCode::RequestCancelled,
2349 "Request timeout exceeded",
2350 ));
2351 }
2352
2353 if !session.initialized && request.method != "initialize" && request.method != "ping" {
2354 return Err(McpError::invalid_request(
2355 "Server not initialized. Client must send 'initialize' first.",
2356 ));
2357 }
2358
2359 if let Some(task_manager) = &self.task_manager {
2360 task_manager.set_notification_sender(Arc::clone(notification_sender));
2361 }
2362
2363 let mut mw_ctx = McpContext::with_state(cx.clone(), request_id, session.state.clone());
2364 let request_auth = if self.should_authenticate(&request.method) {
2365 let auth_request = AuthRequest {
2366 method: &request.method,
2367 params: request.params.as_ref(),
2368 request_id,
2369 };
2370 match self.authenticate_request_with_state(cx, request_id, &session.state, auth_request)
2371 {
2372 Ok(auth) => Some(auth),
2373 Err(err) => {
2374 let err = self.apply_global_middleware_error(&mw_ctx, &request, err);
2375 let result = Err(err);
2376 self.maybe_emit_log_notification_for_level(
2377 session.log_level,
2378 notification_sender,
2379 &request.method,
2380 &result,
2381 );
2382 return result;
2383 }
2384 }
2385 } else {
2386 None
2387 };
2388 if let Some(auth) = request_auth.clone() {
2389 mw_ctx = mw_ctx.with_auth(auth);
2390 }
2391
2392 let mut entered_middleware: Vec<&dyn crate::Middleware> = Vec::new();
2393
2394 for m in self.middleware.iter() {
2395 entered_middleware.push(m.as_ref());
2396 match m.on_request(&mw_ctx, &request) {
2397 Ok(crate::MiddlewareDecision::Continue) => {}
2398 Ok(crate::MiddlewareDecision::Respond(v)) => {
2399 let result =
2400 self.apply_middleware_response(&entered_middleware, &mw_ctx, &request, v);
2401 self.maybe_emit_log_notification_for_level(
2402 session.log_level,
2403 notification_sender,
2404 &request.method,
2405 &result,
2406 );
2407 return result;
2408 }
2409 Err(e) => {
2410 let err =
2411 self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e);
2412 let result = Err(err);
2413 self.maybe_emit_log_notification_for_level(
2414 session.log_level,
2415 notification_sender,
2416 &request.method,
2417 &result,
2418 );
2419 return result;
2420 }
2421 }
2422 }
2423
2424 let dispatch_auth = mw_ctx.auth();
2425
2426 let result: Result<serde_json::Value, McpError> = (|| {
2427 let method = &request.method;
2428 let params = request.params.clone();
2429 let bidirectional_senders =
2430 self.create_bidirectional_senders_from_view(session, request_sender);
2431
2432 match method.as_str() {
2433 "tools/call" => {
2434 let params: CallToolParams = parse_params(params)?;
2435 let result = self.router.handle_tools_call(
2436 cx,
2437 request_id,
2438 params,
2439 budget,
2440 session.state.clone(),
2441 dispatch_auth.clone(),
2442 Some(notification_sender),
2443 bidirectional_senders.as_ref(),
2444 )?;
2445 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2446 }
2447 "resources/read" => {
2448 let params: ReadResourceParams = parse_params(params)?;
2449 let result = self.router.handle_resources_read(
2450 cx,
2451 request_id,
2452 ¶ms,
2453 budget,
2454 session.state.clone(),
2455 dispatch_auth.clone(),
2456 Some(notification_sender),
2457 bidirectional_senders.as_ref(),
2458 )?;
2459 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2460 }
2461 "prompts/get" => {
2462 let params: GetPromptParams = parse_params(params)?;
2463 let result = self.router.handle_prompts_get(
2464 cx,
2465 request_id,
2466 params,
2467 budget,
2468 session.state.clone(),
2469 dispatch_auth.clone(),
2470 Some(notification_sender),
2471 bidirectional_senders.as_ref(),
2472 )?;
2473 Ok(serde_json::to_value(result).map_err(McpError::from)?)
2474 }
2475 _ => Err(McpError::method_not_found(method)),
2476 }
2477 })();
2478
2479 let final_result = match result {
2480 Ok(v) => self.apply_middleware_response(&entered_middleware, &mw_ctx, &request, v),
2481 Err(e) => Err(self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e)),
2482 };
2483
2484 self.maybe_emit_log_notification_for_level(
2485 session.log_level,
2486 notification_sender,
2487 &request.method,
2488 &final_result,
2489 );
2490
2491 final_result
2492 }
2493
2494 fn apply_middleware_response(
2495 &self,
2496 stack: &[&dyn crate::Middleware],
2497 ctx: &McpContext,
2498 request: &JsonRpcRequest,
2499 value: serde_json::Value,
2500 ) -> Result<serde_json::Value, McpError> {
2501 let mut response = value;
2502 for m in stack.iter().rev() {
2503 match m.on_response(ctx, request, response) {
2504 Ok(next) => response = next,
2505 Err(err) => {
2506 let mapped = self.apply_middleware_error(stack, ctx, request, err);
2507 return Err(mapped);
2508 }
2509 }
2510 }
2511 Ok(response)
2512 }
2513
2514 fn apply_middleware_error(
2515 &self,
2516 stack: &[&dyn crate::Middleware],
2517 ctx: &McpContext,
2518 request: &JsonRpcRequest,
2519 error: McpError,
2520 ) -> McpError {
2521 let mut err = error;
2522 for m in stack.iter().rev() {
2523 err = m.on_error(ctx, request, err);
2524 }
2525 err
2526 }
2527
2528 fn apply_global_middleware_error(
2529 &self,
2530 ctx: &McpContext,
2531 request: &JsonRpcRequest,
2532 error: McpError,
2533 ) -> McpError {
2534 let mut err = error;
2535 for m in self.middleware.iter().rev() {
2536 err = m.on_error(ctx, request, err);
2537 }
2538 err
2539 }
2540
2541 fn create_bidirectional_senders(
2546 &self,
2547 session: &Session,
2548 request_sender: &bidirectional::RequestSender,
2549 ) -> Option<handler::BidirectionalSenders> {
2550 self.create_bidirectional_senders_from_capabilities(
2551 session.supports_sampling(),
2552 session.supports_elicitation(),
2553 request_sender,
2554 )
2555 }
2556
2557 fn create_bidirectional_senders_from_view(
2558 &self,
2559 session: &SessionView,
2560 request_sender: &bidirectional::RequestSender,
2561 ) -> Option<handler::BidirectionalSenders> {
2562 self.create_bidirectional_senders_from_capabilities(
2563 session.supports_sampling,
2564 session.supports_elicitation,
2565 request_sender,
2566 )
2567 }
2568
2569 fn create_bidirectional_senders_from_capabilities(
2570 &self,
2571 supports_sampling: bool,
2572 supports_elicitation: bool,
2573 request_sender: &bidirectional::RequestSender,
2574 ) -> Option<handler::BidirectionalSenders> {
2575 if !supports_sampling && !supports_elicitation {
2576 return None;
2577 }
2578
2579 let mut senders = handler::BidirectionalSenders::new();
2580
2581 if supports_sampling {
2582 let sampling_sender: Arc<dyn fastmcp_core::SamplingSender> = Arc::new(
2583 bidirectional::TransportSamplingSender::new(request_sender.clone()),
2584 );
2585 senders = senders.with_sampling(sampling_sender);
2586 }
2587
2588 if supports_elicitation {
2589 let elicitation_sender: Arc<dyn fastmcp_core::ElicitationSender> = Arc::new(
2590 bidirectional::TransportElicitationSender::new(request_sender.clone()),
2591 );
2592 senders = senders.with_elicitation(elicitation_sender);
2593 }
2594
2595 Some(senders)
2596 }
2597
2598 fn should_authenticate(&self, method: &str) -> bool {
2599 !matches!(
2600 method,
2601 "initialize" | "initialized" | "notifications/cancelled" | "ping"
2602 )
2603 }
2604
2605 fn authenticate_request(
2606 &self,
2607 cx: &Cx,
2608 request_id: u64,
2609 session: &Session,
2610 request: AuthRequest<'_>,
2611 ) -> Result<AuthContext, McpError> {
2612 let Some(provider) = &self.auth_provider else {
2613 return Ok(AuthContext::anonymous());
2614 };
2615
2616 let ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
2617 let auth = provider.authenticate(&ctx, request)?;
2618 if !ctx.set_auth(auth.clone()) {
2619 debug!(
2620 target: targets::SESSION,
2621 "Auth context not stored (session state unavailable)"
2622 );
2623 }
2624 Ok(auth)
2625 }
2626
2627 fn authenticate_request_with_state(
2628 &self,
2629 cx: &Cx,
2630 request_id: u64,
2631 session_state: &SessionState,
2632 request: AuthRequest<'_>,
2633 ) -> Result<AuthContext, McpError> {
2634 let Some(provider) = &self.auth_provider else {
2635 return Ok(AuthContext::anonymous());
2636 };
2637
2638 let ctx = McpContext::with_state(cx.clone(), request_id, session_state.clone());
2639 let auth = provider.authenticate(&ctx, request)?;
2640 if !ctx.set_auth(auth.clone()) {
2641 debug!(
2642 target: targets::SESSION,
2643 "Auth context not stored (session state unavailable)"
2644 );
2645 }
2646 Ok(auth)
2647 }
2648
2649 fn handle_cancelled_notification(&self, params: CancelledParams) {
2650 let reason = params.reason.as_deref().unwrap_or("unspecified");
2651 let await_cleanup = params.await_cleanup.unwrap_or(false);
2652 info!(
2653 target: targets::SESSION,
2654 "Cancellation requested for requestId={} (reason: {}, await_cleanup={})",
2655 params.request_id,
2656 reason,
2657 await_cleanup
2658 );
2659 let active = {
2660 let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
2661 error!(target: targets::SERVER, "active_requests lock poisoned, recovering");
2662 poisoned.into_inner()
2663 });
2664 guard
2665 .get(¶ms.request_id)
2666 .map(|entry| (entry.cx.clone(), entry.region_id, entry.completion.clone()))
2667 };
2668 if let Some((cx, region_id, completion)) = active {
2669 cx.cancel_with(CancelKind::User, None);
2670 if await_cleanup {
2671 let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
2672 if !completed {
2673 fastmcp_core::logging::warn!(
2674 target: targets::SESSION,
2675 "await_cleanup timed out for requestId={} (region={:?})",
2676 params.request_id,
2677 region_id
2678 );
2679 }
2680 }
2681 } else {
2682 fastmcp_core::logging::warn!(
2683 target: targets::SESSION,
2684 "No active request found for cancellation requestId={}",
2685 params.request_id
2686 );
2687 }
2688 }
2689
2690 fn cancel_active_requests(&self, kind: CancelKind, await_cleanup: bool) {
2691 let active: Vec<(RequestId, RegionId, Cx, Arc<RequestCompletion>)> = {
2692 let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
2693 error!(target: targets::SERVER, "active_requests lock poisoned in cancel_active_requests, recovering");
2694 poisoned.into_inner()
2695 });
2696 guard
2697 .iter()
2698 .map(|(request_id, entry)| {
2699 (
2700 request_id.clone(),
2701 entry.region_id,
2702 entry.cx.clone(),
2703 entry.completion.clone(),
2704 )
2705 })
2706 .collect()
2707 };
2708 if active.is_empty() {
2709 return;
2710 }
2711 info!(
2712 target: targets::SESSION,
2713 "Cancelling {} active request(s) (kind={:?}, await_cleanup={})",
2714 active.len(),
2715 kind,
2716 await_cleanup
2717 );
2718 for (_, _, cx, _) in &active {
2719 cx.cancel_with(kind, None);
2720 }
2721
2722 if await_cleanup {
2723 for (request_id, region_id, _cx, completion) in active {
2724 let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
2725 if !completed {
2726 fastmcp_core::logging::warn!(
2727 target: targets::SESSION,
2728 "Shutdown cancel timed out for requestId={} (region={:?})",
2729 request_id,
2730 region_id
2731 );
2732 }
2733 }
2734 }
2735 }
2736
2737 fn handle_set_log_level(&self, session: &mut Session, params: SetLogLevelParams) {
2738 let requested = match params.level {
2739 LogLevel::Debug => LevelFilter::Debug,
2740 LogLevel::Info => LevelFilter::Info,
2741 LogLevel::Warning => LevelFilter::Warn,
2742 LogLevel::Error => LevelFilter::Error,
2743 };
2744
2745 let configured = self.logging.level.to_level_filter();
2746 let effective = if requested > configured {
2747 configured
2748 } else {
2749 requested
2750 };
2751
2752 log::set_max_level(effective);
2753
2754 let effective_level = match effective {
2755 LevelFilter::Debug => LogLevel::Debug,
2756 LevelFilter::Info => LogLevel::Info,
2757 LevelFilter::Warn => LogLevel::Warning,
2758 LevelFilter::Error => LogLevel::Error,
2759 _ => LogLevel::Info,
2760 };
2761 session.set_log_level(effective_level);
2762
2763 if effective != requested {
2764 fastmcp_core::logging::warn!(
2765 target: targets::SESSION,
2766 "Client requested log level {:?}; clamped to server level {:?}",
2767 params.level,
2768 effective
2769 );
2770 } else {
2771 info!(
2772 target: targets::SESSION,
2773 "Log level set to {:?}",
2774 params.level
2775 );
2776 }
2777 }
2778
2779 fn log_level_rank(level: LogLevel) -> u8 {
2780 match level {
2781 LogLevel::Debug => 1,
2782 LogLevel::Info => 2,
2783 LogLevel::Warning => 3,
2784 LogLevel::Error => 4,
2785 }
2786 }
2787
2788 fn emit_log_notification_for_level(
2789 &self,
2790 min_level: Option<LogLevel>,
2791 sender: &NotificationSender,
2792 level: LogLevel,
2793 message: impl Into<String>,
2794 ) {
2795 let Some(min_level) = min_level else {
2796 return;
2797 };
2798 if Self::log_level_rank(level) < Self::log_level_rank(min_level) {
2799 return;
2800 }
2801
2802 let ts = chrono::Utc::now().to_rfc3339();
2803 let text = format!("{ts} {}", message.into());
2804 let params = LogMessageParams {
2805 level,
2806 logger: Some("fastmcp_rust::server".to_string()),
2807 data: serde_json::Value::String(text),
2808 };
2809 let payload = match serde_json::to_value(params) {
2810 Ok(value) => value,
2811 Err(err) => {
2812 fastmcp_core::logging::warn!(
2813 target: targets::SESSION,
2814 "Failed to serialize log message notification: {}",
2815 err
2816 );
2817 return;
2818 }
2819 };
2820 sender(JsonRpcRequest::notification(
2821 "notifications/message",
2822 Some(payload),
2823 ));
2824 }
2825
2826 fn emit_log_notification(
2827 &self,
2828 session: &Session,
2829 sender: &NotificationSender,
2830 level: LogLevel,
2831 message: impl Into<String>,
2832 ) {
2833 self.emit_log_notification_for_level(session.log_level(), sender, level, message);
2834 }
2835
2836 fn maybe_emit_log_notification_for_level(
2837 &self,
2838 min_level: Option<LogLevel>,
2839 sender: &NotificationSender,
2840 method: &str,
2841 result: &McpResult<serde_json::Value>,
2842 ) {
2843 if method.starts_with("notifications/") || method == "logging/setLevel" {
2844 return;
2845 }
2846 let level = if result.is_ok() {
2847 LogLevel::Info
2848 } else {
2849 LogLevel::Error
2850 };
2851 let message = if result.is_ok() {
2852 format!("Handled {}", method)
2853 } else {
2854 format!("Error handling {}", method)
2855 };
2856 self.emit_log_notification_for_level(min_level, sender, level, message);
2857 }
2858
2859 fn maybe_emit_log_notification(
2860 &self,
2861 session: &Session,
2862 sender: &NotificationSender,
2863 method: &str,
2864 result: &McpResult<serde_json::Value>,
2865 ) {
2866 if method.starts_with("notifications/") || method == "logging/setLevel" {
2867 return;
2868 }
2869 let level = if result.is_ok() {
2870 LogLevel::Info
2871 } else {
2872 LogLevel::Error
2873 };
2874 let message = if result.is_ok() {
2875 format!("Handled {}", method)
2876 } else {
2877 format!("Error handling {}", method)
2878 };
2879 self.emit_log_notification(session, sender, level, message);
2880 }
2881}
2882
2883const AWAIT_CLEANUP_TIMEOUT: Duration = Duration::from_secs(5);
2884
2885struct RequestCompletion {
2886 done: Mutex<bool>,
2887 cv: Condvar,
2888}
2889
2890impl RequestCompletion {
2891 fn new() -> Self {
2892 Self {
2893 done: Mutex::new(false),
2894 cv: Condvar::new(),
2895 }
2896 }
2897
2898 fn mark_done(&self) {
2899 let mut done = self
2900 .done
2901 .lock()
2902 .unwrap_or_else(std::sync::PoisonError::into_inner);
2903 if !*done {
2904 *done = true;
2905 self.cv.notify_all();
2906 }
2907 }
2908
2909 fn wait_timeout(&self, timeout: Duration) -> bool {
2910 let mut done = self
2911 .done
2912 .lock()
2913 .unwrap_or_else(std::sync::PoisonError::into_inner);
2914 if *done {
2915 return true;
2916 }
2917 let start = Instant::now();
2918 let mut remaining = timeout;
2919 loop {
2920 let (guard, result) = self
2921 .cv
2922 .wait_timeout(done, remaining)
2923 .unwrap_or_else(std::sync::PoisonError::into_inner);
2924 done = guard;
2925 if *done {
2926 return true;
2927 }
2928 if result.timed_out() {
2929 return false;
2930 }
2931 let elapsed = start.elapsed();
2932 remaining = match timeout.checked_sub(elapsed) {
2933 Some(left) if !left.is_zero() => left,
2934 _ => return false,
2935 };
2936 }
2937 }
2938
2939 fn is_done(&self) -> bool {
2940 let done = self
2941 .done
2942 .lock()
2943 .unwrap_or_else(std::sync::PoisonError::into_inner);
2944 *done
2945 }
2946}
2947
2948struct ActiveRequest {
2949 cx: Cx,
2950 region_id: RegionId,
2951 completion: Arc<RequestCompletion>,
2952}
2953
2954impl ActiveRequest {
2955 fn new(cx: Cx, completion: Arc<RequestCompletion>) -> Self {
2956 let region_id = cx.region_id();
2957 Self {
2958 cx,
2959 region_id,
2960 completion,
2961 }
2962 }
2963}
2964
2965struct ActiveRequestGuard<'a> {
2966 map: &'a Mutex<HashMap<RequestId, ActiveRequest>>,
2967 id: RequestId,
2968 completion: Arc<RequestCompletion>,
2969}
2970
2971impl<'a> ActiveRequestGuard<'a> {
2972 fn try_new(
2973 map: &'a Mutex<HashMap<RequestId, ActiveRequest>>,
2974 id: RequestId,
2975 cx: Cx,
2976 ) -> Result<Self, RequestId> {
2977 let completion = Arc::new(RequestCompletion::new());
2978 let entry = ActiveRequest::new(cx, completion.clone());
2979 let mut guard = map
2980 .lock()
2981 .unwrap_or_else(std::sync::PoisonError::into_inner);
2982 if guard.contains_key(&id) {
2983 fastmcp_core::logging::warn!(
2984 target: targets::SESSION,
2985 "Duplicate active requestId={} rejected while an earlier request is still running",
2986 id
2987 );
2988 return Err(id);
2989 }
2990 guard.insert(id.clone(), entry);
2991 Ok(Self {
2992 map,
2993 id,
2994 completion,
2995 })
2996 }
2997}
2998
2999impl Drop for ActiveRequestGuard<'_> {
3000 fn drop(&mut self) {
3001 {
3002 let mut guard = self
3003 .map
3004 .lock()
3005 .unwrap_or_else(std::sync::PoisonError::into_inner);
3006 match guard.get(&self.id) {
3007 Some(entry) if Arc::ptr_eq(&entry.completion, &self.completion) => {
3008 guard.remove(&self.id);
3009 }
3010 Some(_) => {
3011 fastmcp_core::logging::warn!(
3012 target: targets::SESSION,
3013 "Active request replaced before drop for requestId={}",
3014 self.id
3015 );
3016 }
3017 None => {
3018 fastmcp_core::logging::warn!(
3019 target: targets::SESSION,
3020 "Active request missing on drop for requestId={}",
3021 self.id
3022 );
3023 }
3024 }
3025 }
3026 self.completion.mark_done();
3027 }
3028}
3029
3030fn banner_suppressed() -> bool {
3034 std::env::var("FASTMCP_NO_BANNER")
3035 .map(|value| matches!(value.to_lowercase().as_str(), "1" | "true" | "yes"))
3036 .unwrap_or(false)
3037}
3038
3039fn parse_params<T: serde::de::DeserializeOwned>(
3041 params: Option<serde_json::Value>,
3042) -> Result<T, McpError> {
3043 let value = params.ok_or_else(|| McpError::invalid_params("Missing required parameters"))?;
3044 serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
3045}
3046
3047fn parse_params_or_default<T: serde::de::DeserializeOwned + Default>(
3049 params: Option<serde_json::Value>,
3050) -> Result<T, McpError> {
3051 match params {
3052 Some(value) => {
3053 serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
3054 }
3055 None => Ok(T::default()),
3056 }
3057}
3058
3059fn request_id_to_u64(id: Option<&RequestId>) -> u64 {
3064 match id {
3065 Some(RequestId::Number(n)) => *n as u64,
3066 Some(RequestId::String(s)) => stable_hash_request_id(s),
3067 None => 0,
3068 }
3069}
3070
3071fn stable_hash_request_id(value: &str) -> u64 {
3072 const FNV_OFFSET: u64 = 0xcbf29ce484222325;
3073 const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
3074 let mut hash = FNV_OFFSET;
3075 for byte in value.as_bytes() {
3076 hash ^= u64::from(*byte);
3077 hash = hash.wrapping_mul(FNV_PRIME);
3078 }
3079 if hash == 0 { FNV_OFFSET } else { hash }
3080}
3081
3082struct SharedTransport<T> {
3083 inner: Arc<Mutex<T>>,
3084}
3085
3086impl<T> Clone for SharedTransport<T> {
3087 fn clone(&self) -> Self {
3088 Self {
3089 inner: Arc::clone(&self.inner),
3090 }
3091 }
3092}
3093
3094impl<T: Transport> SharedTransport<T> {
3095 fn new(transport: T) -> Self {
3096 Self {
3097 inner: Arc::new(Mutex::new(transport)),
3098 }
3099 }
3100
3101 fn recv(&self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
3102 let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
3103 guard.recv(cx)
3104 }
3105
3106 fn send(&self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
3107 let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
3108 guard.send(cx, message)
3109 }
3110}
3111
3112fn transport_lock_error() -> TransportError {
3113 TransportError::Io(std::io::Error::other("transport lock poisoned"))
3114}
3115
3116fn create_transport_notification_sender<T>(transport: SharedTransport<T>) -> NotificationSender
3117where
3118 T: Transport + Send + 'static,
3119{
3120 let cx = Cx::for_request();
3121
3122 Arc::new(move |request: JsonRpcRequest| {
3123 let message = JsonRpcMessage::Request(request);
3124 if let Err(e) = transport.send(&cx, &message) {
3125 log::error!(
3126 target: targets::TRANSPORT,
3127 "Failed to send notification: {}",
3128 e
3129 );
3130 }
3131 })
3132}
3133
3134fn create_notification_sender() -> NotificationSender {
3143 use std::sync::Mutex;
3144
3145 let stdout = Mutex::new(AsyncStdout::new());
3148 let codec = Codec::new();
3149
3150 Arc::new(move |request: JsonRpcRequest| {
3151 let bytes = match codec.encode_request(&request) {
3152 Ok(b) => b,
3153 Err(e) => {
3154 log::error!(target: targets::SERVER, "Failed to encode notification: {}", e);
3155 return;
3156 }
3157 };
3158
3159 if let Ok(mut stdout) = stdout.lock() {
3160 if let Err(e) = stdout.write_all_unchecked(&bytes) {
3161 log::error!(target: targets::TRANSPORT, "Failed to send notification: {}", e);
3162 }
3163 if let Err(e) = stdout.flush_unchecked() {
3164 log::error!(target: targets::TRANSPORT, "Failed to flush notification: {}", e);
3165 }
3166 } else {
3167 log::warn!(target: targets::SERVER, "Failed to acquire stdout lock for notification");
3168 }
3169 })
3170}
3171
3172#[cfg(test)]
3173mod lib_unit_tests {
3174 use super::*;
3175 use fastmcp_derive::tool;
3176 use fastmcp_protocol::{CallToolResult, Content};
3177 use std::sync::OnceLock;
3178 use std::sync::atomic::{AtomicUsize, Ordering};
3179 use std::thread;
3180 use std::time::Duration;
3181
3182 #[derive(Debug, Default)]
3183 struct HttpOverlapMetrics {
3184 current: AtomicUsize,
3185 max: AtomicUsize,
3186 }
3187
3188 static HTTP_OVERLAP_METRICS: OnceLock<HttpOverlapMetrics> = OnceLock::new();
3189 static HTTP_OVERLAP_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
3190
3191 fn http_overlap_metrics() -> &'static HttpOverlapMetrics {
3192 HTTP_OVERLAP_METRICS.get_or_init(HttpOverlapMetrics::default)
3193 }
3194
3195 fn http_overlap_lock() -> &'static Mutex<()> {
3196 HTTP_OVERLAP_LOCK.get_or_init(|| Mutex::new(()))
3197 }
3198
3199 fn reset_http_overlap_metrics() {
3200 let metrics = http_overlap_metrics();
3201 metrics.current.store(0, Ordering::SeqCst);
3202 metrics.max.store(0, Ordering::SeqCst);
3203 }
3204
3205 fn test_request_sender() -> RequestSender {
3206 let pending = Arc::new(PendingRequests::new());
3207 let send_fn: bidirectional::TransportSendFn =
3208 Arc::new(|message| Err(format!("unexpected outbound message in test: {message:?}")));
3209 RequestSender::new(pending, send_fn)
3210 }
3211
3212 fn http_json_request(method: &str, params: serde_json::Value, id: i64) -> HttpRequest {
3213 let request = JsonRpcRequest::new(method, Some(params), id);
3214 HttpRequest::new(HttpMethod::Post, "/mcp")
3215 .with_header("content-type", "application/json")
3216 .with_body(serde_json::to_vec(&request).expect("serialize JSON-RPC request"))
3217 }
3218
3219 #[tool(
3220 name = "http_overlap_tool",
3221 description = "Records concurrent overlap for HTTP tests",
3222 annotations(read_only)
3223 )]
3224 fn http_overlap_tool(_ctx: &McpContext) -> String {
3225 let metrics = http_overlap_metrics();
3226 let current = metrics.current.fetch_add(1, Ordering::SeqCst) + 1;
3227 metrics.max.fetch_max(current, Ordering::SeqCst);
3228 thread::sleep(Duration::from_millis(100));
3229 metrics.current.fetch_sub(1, Ordering::SeqCst);
3230 "overlap-ok".to_string()
3231 }
3232
3233 #[tool(
3234 name = "http_auth_echo_tool_runtime",
3235 description = "Returns the request-scoped auth subject while recording overlap",
3236 annotations(read_only)
3237 )]
3238 fn http_auth_echo_tool_runtime(ctx: &McpContext) -> String {
3239 let metrics = http_overlap_metrics();
3240 let current = metrics.current.fetch_add(1, Ordering::SeqCst) + 1;
3241 metrics.max.fetch_max(current, Ordering::SeqCst);
3242 thread::sleep(Duration::from_millis(100));
3243 metrics.current.fetch_sub(1, Ordering::SeqCst);
3244 ctx.auth()
3245 .and_then(|auth| auth.subject)
3246 .unwrap_or_else(|| "anonymous".to_string())
3247 }
3248
3249 #[tool(
3250 name = "http_stateful_increment_tool",
3251 description = "Increments a session counter across HTTP requests"
3252 )]
3253 fn http_stateful_increment_tool(ctx: &McpContext) -> String {
3254 let count: i32 = ctx.get_state("http_counter").unwrap_or(0);
3255 let next = count + 1;
3256 assert!(ctx.set_state("http_counter", next));
3257 format!("Counter: {next}")
3258 }
3259
3260 #[tool(
3261 name = "http_current_auth_subject_tool",
3262 description = "Returns the current request auth subject",
3263 annotations(read_only)
3264 )]
3265 fn http_current_auth_subject_tool(ctx: &McpContext) -> String {
3266 ctx.auth()
3267 .and_then(|auth| auth.subject)
3268 .unwrap_or_else(|| "anonymous".to_string())
3269 }
3270
3271 #[tool(
3272 name = "http_current_auth_subject_exclusive_tool",
3273 description = "Returns the current request auth subject from the exclusive path"
3274 )]
3275 fn http_current_auth_subject_exclusive_tool(ctx: &McpContext) -> String {
3276 ctx.auth()
3277 .and_then(|auth| auth.subject)
3278 .unwrap_or_else(|| "anonymous".to_string())
3279 }
3280
3281 #[derive(Debug, Clone)]
3282 struct CapturingAuthMiddleware {
3283 seen: Arc<Mutex<Vec<(String, Option<String>)>>>,
3284 }
3285
3286 impl Middleware for CapturingAuthMiddleware {
3287 fn on_request(
3288 &self,
3289 ctx: &McpContext,
3290 request: &JsonRpcRequest,
3291 ) -> McpResult<MiddlewareDecision> {
3292 self.seen
3293 .lock()
3294 .expect("captured auth middleware mutex should not be poisoned")
3295 .push((
3296 request.method.clone(),
3297 ctx.auth().and_then(|auth| auth.subject),
3298 ));
3299 Ok(MiddlewareDecision::Continue)
3300 }
3301 }
3302
3303 #[derive(Debug, Clone)]
3304 struct OverridingAuthMiddleware {
3305 subject: &'static str,
3306 }
3307
3308 impl Middleware for OverridingAuthMiddleware {
3309 fn on_request(
3310 &self,
3311 ctx: &McpContext,
3312 _request: &JsonRpcRequest,
3313 ) -> McpResult<MiddlewareDecision> {
3314 ctx.set_auth(AuthContext::with_subject(self.subject));
3315 Ok(MiddlewareDecision::Continue)
3316 }
3317 }
3318
3319 #[derive(Debug)]
3320 struct AlwaysFailAuthProvider;
3321
3322 impl AuthProvider for AlwaysFailAuthProvider {
3323 fn authenticate(
3324 &self,
3325 _ctx: &McpContext,
3326 _request: AuthRequest<'_>,
3327 ) -> McpResult<AuthContext> {
3328 Err(McpError::invalid_request("auth failed"))
3329 }
3330 }
3331
3332 #[derive(Debug, Clone)]
3333 struct RewritingErrorMiddleware;
3334
3335 impl Middleware for RewritingErrorMiddleware {
3336 fn on_error(
3337 &self,
3338 _ctx: &McpContext,
3339 _request: &JsonRpcRequest,
3340 error: McpError,
3341 ) -> McpError {
3342 McpError::new(error.code, format!("rewritten: {}", error.message))
3343 }
3344 }
3345
3346 #[test]
3349 fn parse_params_none_returns_error() {
3350 let result = parse_params::<serde_json::Value>(None);
3351 let err = result.unwrap_err();
3352 assert!(err.message.contains("Missing required parameters"));
3353 }
3354
3355 #[test]
3356 fn parse_params_invalid_json_returns_error() {
3357 let result = parse_params::<ListToolsParams>(Some(serde_json::json!("not_an_object")));
3359 assert!(result.is_err());
3360 }
3361
3362 #[test]
3363 fn parse_params_valid_json_succeeds() {
3364 let result = parse_params::<ReadResourceParams>(Some(serde_json::json!({"uri": "x://y"})));
3365 let params = result.unwrap();
3366 assert_eq!(params.uri, "x://y");
3367 }
3368
3369 #[test]
3372 fn parse_params_or_default_none_returns_default() {
3373 let result = parse_params_or_default::<ListToolsParams>(None);
3374 let params = result.unwrap();
3375 assert!(params.cursor.is_none());
3376 }
3377
3378 #[test]
3379 fn parse_params_or_default_invalid_json_returns_error() {
3380 let result =
3381 parse_params_or_default::<ListToolsParams>(Some(serde_json::json!("bad_input")));
3382 assert!(result.is_err());
3383 }
3384
3385 #[test]
3386 fn parse_params_or_default_valid_json_succeeds() {
3387 let result =
3388 parse_params_or_default::<ListToolsParams>(Some(serde_json::json!({"cursor": "abc"})));
3389 let params = result.unwrap();
3390 assert_eq!(params.cursor.as_deref(), Some("abc"));
3391 }
3392
3393 #[test]
3396 fn request_id_to_u64_number() {
3397 let id = RequestId::Number(42);
3398 assert_eq!(request_id_to_u64(Some(&id)), 42);
3399 }
3400
3401 #[test]
3402 fn request_id_to_u64_string() {
3403 let id = RequestId::String("req-123".to_string());
3404 let result = request_id_to_u64(Some(&id));
3405 assert_ne!(result, 0);
3406 }
3407
3408 #[test]
3409 fn request_id_to_u64_none() {
3410 assert_eq!(request_id_to_u64(None), 0);
3411 }
3412
3413 #[test]
3416 fn stable_hash_is_deterministic() {
3417 let h1 = stable_hash_request_id("test");
3418 let h2 = stable_hash_request_id("test");
3419 assert_eq!(h1, h2);
3420 }
3421
3422 #[test]
3423 fn stable_hash_never_returns_zero() {
3424 assert_ne!(stable_hash_request_id(""), 0);
3426 assert_ne!(stable_hash_request_id("a"), 0);
3427 }
3428
3429 #[test]
3430 fn stable_hash_different_inputs_differ() {
3431 let h1 = stable_hash_request_id("alpha");
3432 let h2 = stable_hash_request_id("beta");
3433 assert_ne!(h1, h2);
3434 }
3435
3436 #[test]
3439 fn request_completion_new_is_not_done() {
3440 let rc = RequestCompletion::new();
3441 assert!(!rc.is_done());
3442 }
3443
3444 #[test]
3445 fn request_completion_mark_done_sets_done() {
3446 let rc = RequestCompletion::new();
3447 rc.mark_done();
3448 assert!(rc.is_done());
3449 }
3450
3451 #[test]
3452 fn request_completion_mark_done_idempotent() {
3453 let rc = RequestCompletion::new();
3454 rc.mark_done();
3455 rc.mark_done(); assert!(rc.is_done());
3457 }
3458
3459 #[test]
3460 fn request_completion_wait_timeout_returns_true_if_done() {
3461 let rc = RequestCompletion::new();
3462 rc.mark_done();
3463 assert!(rc.wait_timeout(Duration::from_millis(10)));
3464 }
3465
3466 #[test]
3467 fn request_completion_wait_timeout_returns_false_if_not_done() {
3468 let rc = RequestCompletion::new();
3469 assert!(!rc.wait_timeout(Duration::from_millis(10)));
3470 }
3471
3472 #[test]
3475 fn duplicate_behavior_default_is_warn() {
3476 assert_eq!(DuplicateBehavior::default(), DuplicateBehavior::Warn);
3477 }
3478
3479 #[test]
3480 fn duplicate_behavior_debug_and_clone() {
3481 let b = DuplicateBehavior::Error;
3482 let debug = format!("{:?}", b);
3483 assert!(debug.contains("Error"));
3484 let cloned = b;
3485 assert_eq!(cloned, DuplicateBehavior::Error);
3486 }
3487
3488 #[test]
3489 fn duplicate_behavior_all_variants_are_distinct() {
3490 assert_ne!(DuplicateBehavior::Error, DuplicateBehavior::Warn);
3491 assert_ne!(DuplicateBehavior::Warn, DuplicateBehavior::Replace);
3492 assert_ne!(DuplicateBehavior::Replace, DuplicateBehavior::Ignore);
3493 }
3494
3495 #[test]
3498 fn logging_config_default_values() {
3499 let config = LoggingConfig::default();
3500 assert_eq!(config.level, Level::Info);
3501 assert!(config.timestamps);
3502 assert!(config.targets);
3503 assert!(!config.file_line);
3504 }
3505
3506 #[test]
3509 fn lifespan_hooks_new_has_no_hooks() {
3510 let hooks = LifespanHooks::new();
3511 assert!(hooks.on_startup.is_none());
3512 assert!(hooks.on_shutdown.is_none());
3513 }
3514
3515 #[test]
3518 fn log_level_rank_ordering() {
3519 assert!(Server::log_level_rank(LogLevel::Debug) < Server::log_level_rank(LogLevel::Info));
3520 assert!(Server::log_level_rank(LogLevel::Info) < Server::log_level_rank(LogLevel::Warning));
3521 assert!(
3522 Server::log_level_rank(LogLevel::Warning) < Server::log_level_rank(LogLevel::Error)
3523 );
3524 }
3525
3526 #[test]
3529 fn active_request_guard_removes_on_drop() {
3530 let map = Mutex::new(HashMap::new());
3531 let cx = Cx::for_testing();
3532 let id = RequestId::Number(1);
3533 {
3534 let _guard = ActiveRequestGuard::try_new(&map, id.clone(), cx).expect("insert guard");
3535 assert_eq!(map.lock().unwrap().len(), 1);
3536 }
3537 assert_eq!(map.lock().unwrap().len(), 0);
3539 }
3540
3541 #[test]
3542 fn active_request_guard_rejects_duplicate_request_id() {
3543 let map = Mutex::new(HashMap::new());
3544 let first = ActiveRequestGuard::try_new(&map, RequestId::Number(7), Cx::for_testing())
3545 .expect("first request should register");
3546 let duplicate = ActiveRequestGuard::try_new(&map, RequestId::Number(7), Cx::for_testing());
3547 assert!(
3548 duplicate.is_err(),
3549 "duplicate active request id must be rejected"
3550 );
3551 drop(first);
3552 assert!(map.lock().unwrap().is_empty());
3553 }
3554
3555 #[test]
3560 fn logging_config_debug_and_clone() {
3561 let config = LoggingConfig::default();
3562 let debug = format!("{config:?}");
3563 assert!(debug.contains("LoggingConfig"));
3564 assert!(debug.contains("Info"));
3565
3566 let cloned = config.clone();
3567 assert_eq!(cloned.level, Level::Info);
3568 assert_eq!(cloned.timestamps, config.timestamps);
3569 }
3570
3571 #[test]
3572 fn transport_lock_error_is_io() {
3573 let err = transport_lock_error();
3574 match err {
3575 TransportError::Io(io) => {
3576 assert!(io.to_string().contains("poisoned"));
3577 }
3578 other => panic!("expected Io variant, got: {other:?}"),
3579 }
3580 }
3581
3582 #[test]
3583 fn lifespan_hooks_default_matches_new() {
3584 let default_hooks = LifespanHooks::default();
3585 let new_hooks = LifespanHooks::new();
3586 assert!(default_hooks.on_startup.is_none());
3587 assert!(default_hooks.on_shutdown.is_none());
3588 assert!(new_hooks.on_startup.is_none());
3589 assert!(new_hooks.on_shutdown.is_none());
3590 }
3591
3592 #[test]
3593 fn request_completion_wait_resolves_on_concurrent_done() {
3594 use std::sync::Arc;
3595 use std::thread;
3596
3597 let rc = Arc::new(RequestCompletion::new());
3598 let rc_clone = rc.clone();
3599
3600 let handle = thread::spawn(move || {
3601 thread::sleep(Duration::from_millis(20));
3602 rc_clone.mark_done();
3603 });
3604
3605 assert!(rc.wait_timeout(Duration::from_secs(2)));
3607 handle.join().unwrap();
3608 }
3609
3610 #[test]
3611 fn active_request_stores_region_id() {
3612 let cx = Cx::for_testing();
3613 let expected_region = cx.region_id();
3614 let completion = Arc::new(RequestCompletion::new());
3615 let ar = ActiveRequest::new(cx, completion);
3616 assert_eq!(ar.region_id, expected_region);
3617 }
3618
3619 #[test]
3620 fn http_request_execution_mode_classifies_methods() {
3621 let mut router = Router::new();
3622 router.add_tool(HttpOverlapTool);
3623 router.add_tool(HttpStatefulIncrementTool);
3624
3625 assert_eq!(
3626 HttpRequestExecutionMode::for_request(
3627 &router,
3628 &JsonRpcRequest::new(
3629 "tools/call",
3630 Some(serde_json::json!({
3631 "name": "http_overlap_tool",
3632 "arguments": {}
3633 })),
3634 1,
3635 ),
3636 ),
3637 HttpRequestExecutionMode::ConcurrentReadOnly
3638 );
3639 assert_eq!(
3640 HttpRequestExecutionMode::for_request(
3641 &router,
3642 &JsonRpcRequest::new(
3643 "tools/call",
3644 Some(serde_json::json!({
3645 "name": "http_stateful_increment_tool",
3646 "arguments": {}
3647 })),
3648 2,
3649 ),
3650 ),
3651 HttpRequestExecutionMode::ExclusiveSession
3652 );
3653 assert_eq!(
3654 HttpRequestExecutionMode::for_request(
3655 &router,
3656 &JsonRpcRequest::new("resources/read", None, 3),
3657 ),
3658 HttpRequestExecutionMode::ConcurrentReadOnly
3659 );
3660 assert_eq!(
3661 HttpRequestExecutionMode::for_request(
3662 &router,
3663 &JsonRpcRequest::new("prompts/get", None, 4)
3664 ),
3665 HttpRequestExecutionMode::ConcurrentReadOnly
3666 );
3667
3668 assert_eq!(
3669 HttpRequestExecutionMode::for_request(
3670 &router,
3671 &JsonRpcRequest::new("initialize", None, 5)
3672 ),
3673 HttpRequestExecutionMode::ExclusiveSession
3674 );
3675 assert_eq!(
3676 HttpRequestExecutionMode::for_request(
3677 &router,
3678 &JsonRpcRequest::new("logging/setLevel", None, 6),
3679 ),
3680 HttpRequestExecutionMode::ExclusiveSession
3681 );
3682 assert_eq!(
3683 HttpRequestExecutionMode::for_request(
3684 &router,
3685 &JsonRpcRequest::new("resources/subscribe", None, 7),
3686 ),
3687 HttpRequestExecutionMode::ExclusiveSession
3688 );
3689 }
3690
3691 #[test]
3692 fn http_read_only_requests_can_overlap_without_session_mutex_serialization() {
3693 let _guard = http_overlap_lock()
3694 .lock()
3695 .expect("http overlap test lock poisoned");
3696 reset_http_overlap_metrics();
3697
3698 let server = Arc::new(
3699 Server::new("http-test-server", "1.0.0")
3700 .tool(HttpOverlapTool)
3701 .build(),
3702 );
3703 let session = Arc::new(Mutex::new(Session::new(
3704 server.info.clone(),
3705 server.capabilities.clone(),
3706 )));
3707 session.lock().expect("session lock poisoned").initialize(
3708 fastmcp_protocol::ClientInfo {
3709 name: "http-test-client".to_string(),
3710 version: "1.0.0".to_string(),
3711 },
3712 fastmcp_protocol::ClientCapabilities::default(),
3713 "2024-11-05".to_string(),
3714 );
3715
3716 let http_handler = Arc::new(HttpRequestHandler::new());
3717 let notification_sender: NotificationSender = Arc::new(|_| {});
3718 let request_sender = test_request_sender();
3719 let start = Arc::new(std::sync::Barrier::new(3));
3720
3721 let run_request = |id| {
3722 let server = Arc::clone(&server);
3723 let session = Arc::clone(&session);
3724 let http_handler = Arc::clone(&http_handler);
3725 let notification_sender = Arc::clone(¬ification_sender);
3726 let request_sender = request_sender.clone();
3727 let start = Arc::clone(&start);
3728 thread::spawn(move || {
3729 start.wait();
3730 let cx = Cx::for_testing();
3731 let request = http_json_request(
3732 "tools/call",
3733 serde_json::json!({
3734 "name": "http_overlap_tool",
3735 "arguments": {}
3736 }),
3737 id,
3738 );
3739 let traffic_renderer: Option<RequestResponseRenderer> = None;
3740 let response = server.handle_http_mcp_request(
3741 &cx,
3742 &session,
3743 &http_handler,
3744 &request,
3745 ¬ification_sender,
3746 &request_sender,
3747 &traffic_renderer,
3748 );
3749 assert_eq!(response.status, HttpStatus::OK);
3750 let json: JsonRpcResponse =
3751 serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
3752 assert!(
3753 json.error.is_none(),
3754 "unexpected error response: {:?}",
3755 json.error
3756 );
3757 })
3758 };
3759
3760 let first = run_request(1);
3761 let second = run_request(2);
3762 start.wait();
3763
3764 first.join().expect("first HTTP request thread panicked");
3765 second.join().expect("second HTTP request thread panicked");
3766
3767 let overlap = http_overlap_metrics().max.load(Ordering::SeqCst);
3768 assert!(
3769 overlap >= 2,
3770 "expected concurrent HTTP tools/call overlap, observed max overlap {overlap}"
3771 );
3772 }
3773
3774 #[test]
3775 fn http_read_only_requests_keep_request_auth_isolated() {
3776 #[derive(Debug)]
3777 struct EchoAuthProvider;
3778
3779 impl AuthProvider for EchoAuthProvider {
3780 fn authenticate(
3781 &self,
3782 _ctx: &McpContext,
3783 request: AuthRequest<'_>,
3784 ) -> McpResult<AuthContext> {
3785 let access = request
3786 .access_token()
3787 .ok_or_else(|| McpError::invalid_request("missing auth token"))?;
3788 Ok(AuthContext {
3789 subject: Some(access.token.clone()),
3790 token: Some(access),
3791 ..AuthContext::default()
3792 })
3793 }
3794 }
3795
3796 let guard = http_overlap_lock()
3797 .lock()
3798 .expect("http overlap test lock poisoned");
3799 reset_http_overlap_metrics();
3800
3801 let server = Arc::new(
3802 Server::new("http-auth-test-server", "1.0.0")
3803 .auth_provider(EchoAuthProvider)
3804 .tool(HttpAuthEchoToolRuntime)
3805 .build(),
3806 );
3807 let session = Arc::new(Mutex::new(Session::new(
3808 server.info.clone(),
3809 server.capabilities.clone(),
3810 )));
3811 session.lock().expect("session lock poisoned").initialize(
3812 fastmcp_protocol::ClientInfo {
3813 name: "http-auth-test-client".to_string(),
3814 version: "1.0.0".to_string(),
3815 },
3816 fastmcp_protocol::ClientCapabilities::default(),
3817 "2024-11-05".to_string(),
3818 );
3819
3820 let http_handler = Arc::new(HttpRequestHandler::new());
3821 let notification_sender: NotificationSender = Arc::new(|_| {});
3822 let request_sender = test_request_sender();
3823 let start = Arc::new(std::sync::Barrier::new(3));
3824
3825 let run_request = |id, token: &'static str| {
3826 let server = Arc::clone(&server);
3827 let session = Arc::clone(&session);
3828 let http_handler = Arc::clone(&http_handler);
3829 let notification_sender = Arc::clone(¬ification_sender);
3830 let request_sender = request_sender.clone();
3831 let start = Arc::clone(&start);
3832 thread::spawn(move || {
3833 start.wait();
3834 let cx = Cx::for_testing();
3835 let request = http_json_request(
3836 "tools/call",
3837 serde_json::json!({
3838 "name": "http_auth_echo_tool_runtime",
3839 "arguments": {},
3840 "auth": format!("Bearer {token}")
3841 }),
3842 id,
3843 );
3844 let traffic_renderer: Option<RequestResponseRenderer> = None;
3845 let response = server.handle_http_mcp_request(
3846 &cx,
3847 &session,
3848 &http_handler,
3849 &request,
3850 ¬ification_sender,
3851 &request_sender,
3852 &traffic_renderer,
3853 );
3854 assert_eq!(response.status, HttpStatus::OK);
3855 let json: JsonRpcResponse =
3856 serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
3857 let result = json.result.expect("authenticated request should succeed");
3858 let tool_result: CallToolResult =
3859 serde_json::from_value(result).expect("parse tool result payload");
3860 assert!(
3861 !tool_result.is_error,
3862 "auth echo tool unexpectedly returned an error payload"
3863 );
3864 match tool_result.content.as_slice() {
3865 [Content::Text { text }] => text.clone(),
3866 other => panic!("expected single text tool result, got {other:?}"),
3867 }
3868 })
3869 };
3870
3871 let first = run_request(1, "alpha");
3872 let second = run_request(2, "beta");
3873 start.wait();
3874
3875 let first_result = first.join();
3876 let second_result = second.join();
3877 let overlap = http_overlap_metrics().max.load(Ordering::SeqCst);
3878 drop(guard);
3879
3880 let first_subject = first_result.expect("first auth request thread panicked");
3881 let second_subject = second_result.expect("second auth request thread panicked");
3882
3883 assert_eq!(first_subject, "alpha");
3884 assert_eq!(second_subject, "beta");
3885 assert!(
3886 overlap >= 2,
3887 "expected authenticated requests to overlap, observed max overlap {overlap}"
3888 );
3889 }
3890
3891 #[test]
3892 fn http_stateful_tool_calls_preserve_session_state_updates() {
3893 let server = Arc::new(
3894 Server::new("http-state-test-server", "1.0.0")
3895 .tool(HttpStatefulIncrementTool)
3896 .build(),
3897 );
3898 let session = Arc::new(Mutex::new(Session::new(
3899 server.info.clone(),
3900 server.capabilities.clone(),
3901 )));
3902 session.lock().expect("session lock poisoned").initialize(
3903 fastmcp_protocol::ClientInfo {
3904 name: "http-state-test-client".to_string(),
3905 version: "1.0.0".to_string(),
3906 },
3907 fastmcp_protocol::ClientCapabilities::default(),
3908 "2024-11-05".to_string(),
3909 );
3910
3911 let http_handler = Arc::new(HttpRequestHandler::new());
3912 let notification_sender: NotificationSender = Arc::new(|_| {});
3913 let request_sender = test_request_sender();
3914
3915 let run_request = |id| {
3916 let cx = Cx::for_testing();
3917 let request = http_json_request(
3918 "tools/call",
3919 serde_json::json!({
3920 "name": "http_stateful_increment_tool",
3921 "arguments": {}
3922 }),
3923 id,
3924 );
3925 let traffic_renderer: Option<RequestResponseRenderer> = None;
3926 let response = server.handle_http_mcp_request(
3927 &cx,
3928 &session,
3929 &http_handler,
3930 &request,
3931 ¬ification_sender,
3932 &request_sender,
3933 &traffic_renderer,
3934 );
3935 assert_eq!(response.status, HttpStatus::OK);
3936 let json: JsonRpcResponse =
3937 serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
3938 let result = json.result.expect("stateful request should succeed");
3939 let tool_result: CallToolResult =
3940 serde_json::from_value(result).expect("parse tool result payload");
3941 assert!(!tool_result.is_error, "stateful tool unexpectedly errored");
3942 match tool_result.content.as_slice() {
3943 [Content::Text { text }] => text.clone(),
3944 other => panic!("expected single text tool result, got {other:?}"),
3945 }
3946 };
3947
3948 assert_eq!(run_request(1), "Counter: 1");
3949 assert_eq!(run_request(2), "Counter: 2");
3950 }
3951
3952 #[test]
3953 fn http_exclusive_requests_expose_request_auth_to_middleware() {
3954 #[derive(Debug)]
3955 struct EchoAuthProvider;
3956
3957 impl AuthProvider for EchoAuthProvider {
3958 fn authenticate(
3959 &self,
3960 _ctx: &McpContext,
3961 request: AuthRequest<'_>,
3962 ) -> McpResult<AuthContext> {
3963 let access = request
3964 .access_token()
3965 .ok_or_else(|| McpError::invalid_request("missing auth token"))?;
3966 Ok(AuthContext {
3967 subject: Some(access.token.clone()),
3968 token: Some(access),
3969 ..AuthContext::default()
3970 })
3971 }
3972 }
3973
3974 let seen = Arc::new(Mutex::new(Vec::new()));
3975 let middleware = CapturingAuthMiddleware {
3976 seen: Arc::clone(&seen),
3977 };
3978 let server = Server::new("http-middleware-auth-test-server", "1.0.0")
3979 .auth_provider(EchoAuthProvider)
3980 .middleware(middleware)
3981 .build();
3982 let session = Arc::new(Mutex::new(Session::new(
3983 server.info.clone(),
3984 server.capabilities.clone(),
3985 )));
3986 session.lock().expect("session lock poisoned").initialize(
3987 fastmcp_protocol::ClientInfo {
3988 name: "http-middleware-client".to_string(),
3989 version: "1.0.0".to_string(),
3990 },
3991 fastmcp_protocol::ClientCapabilities::default(),
3992 "2024-11-05".to_string(),
3993 );
3994
3995 let http_handler = Arc::new(HttpRequestHandler::new());
3996 let notification_sender: NotificationSender = Arc::new(|_| {});
3997 let request_sender = test_request_sender();
3998 let request = http_json_request(
3999 "tools/list",
4000 serde_json::json!({
4001 "auth": "Bearer alpha"
4002 }),
4003 1,
4004 );
4005 let traffic_renderer: Option<RequestResponseRenderer> = None;
4006 let response = server.handle_http_mcp_request(
4007 &Cx::for_testing(),
4008 &session,
4009 &http_handler,
4010 &request,
4011 ¬ification_sender,
4012 &request_sender,
4013 &traffic_renderer,
4014 );
4015 assert_eq!(response.status, HttpStatus::OK);
4016
4017 let observed = seen
4018 .lock()
4019 .expect("captured auth middleware mutex should not be poisoned")
4020 .clone();
4021 assert_eq!(
4022 observed,
4023 vec![("tools/list".to_string(), Some("alpha".to_string()))]
4024 );
4025 }
4026
4027 #[test]
4028 fn http_read_only_requests_expose_request_auth_to_middleware() {
4029 #[derive(Debug)]
4030 struct EchoAuthProvider;
4031
4032 impl AuthProvider for EchoAuthProvider {
4033 fn authenticate(
4034 &self,
4035 _ctx: &McpContext,
4036 request: AuthRequest<'_>,
4037 ) -> McpResult<AuthContext> {
4038 let access = request
4039 .access_token()
4040 .ok_or_else(|| McpError::invalid_request("missing auth token"))?;
4041 Ok(AuthContext {
4042 subject: Some(access.token.clone()),
4043 token: Some(access),
4044 ..AuthContext::default()
4045 })
4046 }
4047 }
4048
4049 let seen = Arc::new(Mutex::new(Vec::new()));
4050 let middleware = CapturingAuthMiddleware {
4051 seen: Arc::clone(&seen),
4052 };
4053 let server = Server::new("http-read-only-middleware-auth-test-server", "1.0.0")
4054 .auth_provider(EchoAuthProvider)
4055 .middleware(middleware)
4056 .tool(HttpCurrentAuthSubjectTool)
4057 .build();
4058 let session = Arc::new(Mutex::new(Session::new(
4059 server.info.clone(),
4060 server.capabilities.clone(),
4061 )));
4062 session.lock().expect("session lock poisoned").initialize(
4063 fastmcp_protocol::ClientInfo {
4064 name: "http-read-only-middleware-client".to_string(),
4065 version: "1.0.0".to_string(),
4066 },
4067 fastmcp_protocol::ClientCapabilities::default(),
4068 "2024-11-05".to_string(),
4069 );
4070
4071 let http_handler = Arc::new(HttpRequestHandler::new());
4072 let notification_sender: NotificationSender = Arc::new(|_| {});
4073 let request_sender = test_request_sender();
4074 let request = http_json_request(
4075 "tools/call",
4076 serde_json::json!({
4077 "name": "http_current_auth_subject_tool",
4078 "arguments": {},
4079 "auth": "Bearer beta"
4080 }),
4081 1,
4082 );
4083 let traffic_renderer: Option<RequestResponseRenderer> = None;
4084 let response = server.handle_http_mcp_request(
4085 &Cx::for_testing(),
4086 &session,
4087 &http_handler,
4088 &request,
4089 ¬ification_sender,
4090 &request_sender,
4091 &traffic_renderer,
4092 );
4093 assert_eq!(response.status, HttpStatus::OK);
4094
4095 let json: JsonRpcResponse =
4096 serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
4097 let result = json.result.expect("read-only auth request should succeed");
4098 let tool_result: CallToolResult =
4099 serde_json::from_value(result).expect("parse tool result payload");
4100 match tool_result.content.as_slice() {
4101 [Content::Text { text }] => assert_eq!(text, "beta"),
4102 other => panic!("expected single text tool result, got {other:?}"),
4103 }
4104
4105 let observed = seen
4106 .lock()
4107 .expect("captured auth middleware mutex should not be poisoned")
4108 .clone();
4109 assert_eq!(
4110 observed,
4111 vec![("tools/call".to_string(), Some("beta".to_string()))]
4112 );
4113 }
4114
4115 #[test]
4116 fn http_exclusive_middleware_auth_mutation_reaches_handler_dispatch() {
4117 let server = Server::new("http-exclusive-auth-override-test-server", "1.0.0")
4118 .middleware(OverridingAuthMiddleware {
4119 subject: "exclusive-override",
4120 })
4121 .tool(HttpCurrentAuthSubjectExclusiveTool)
4122 .build();
4123 let session = Arc::new(Mutex::new(Session::new(
4124 server.info.clone(),
4125 server.capabilities.clone(),
4126 )));
4127 session.lock().expect("session lock poisoned").initialize(
4128 fastmcp_protocol::ClientInfo {
4129 name: "http-exclusive-auth-override-client".to_string(),
4130 version: "1.0.0".to_string(),
4131 },
4132 fastmcp_protocol::ClientCapabilities::default(),
4133 "2024-11-05".to_string(),
4134 );
4135
4136 let http_handler = Arc::new(HttpRequestHandler::new());
4137 let notification_sender: NotificationSender = Arc::new(|_| {});
4138 let request_sender = test_request_sender();
4139 let request = http_json_request(
4140 "tools/call",
4141 serde_json::json!({
4142 "name": "http_current_auth_subject_exclusive_tool",
4143 "arguments": {}
4144 }),
4145 1,
4146 );
4147 let traffic_renderer: Option<RequestResponseRenderer> = None;
4148 let response = server.handle_http_mcp_request(
4149 &Cx::for_testing(),
4150 &session,
4151 &http_handler,
4152 &request,
4153 ¬ification_sender,
4154 &request_sender,
4155 &traffic_renderer,
4156 );
4157 assert_eq!(response.status, HttpStatus::OK);
4158
4159 let json: JsonRpcResponse =
4160 serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
4161 let result = json
4162 .result
4163 .expect("exclusive auth override request should succeed");
4164 let tool_result: CallToolResult =
4165 serde_json::from_value(result).expect("parse tool result payload");
4166 match tool_result.content.as_slice() {
4167 [Content::Text { text }] => assert_eq!(text, "exclusive-override"),
4168 other => panic!("expected single text tool result, got {other:?}"),
4169 }
4170 }
4171
4172 #[test]
4173 fn http_read_only_middleware_auth_mutation_reaches_handler_dispatch() {
4174 let server = Server::new("http-read-only-auth-override-test-server", "1.0.0")
4175 .middleware(OverridingAuthMiddleware {
4176 subject: "read-only-override",
4177 })
4178 .tool(HttpCurrentAuthSubjectTool)
4179 .build();
4180 let session = Arc::new(Mutex::new(Session::new(
4181 server.info.clone(),
4182 server.capabilities.clone(),
4183 )));
4184 session.lock().expect("session lock poisoned").initialize(
4185 fastmcp_protocol::ClientInfo {
4186 name: "http-read-only-auth-override-client".to_string(),
4187 version: "1.0.0".to_string(),
4188 },
4189 fastmcp_protocol::ClientCapabilities::default(),
4190 "2024-11-05".to_string(),
4191 );
4192
4193 let http_handler = Arc::new(HttpRequestHandler::new());
4194 let notification_sender: NotificationSender = Arc::new(|_| {});
4195 let request_sender = test_request_sender();
4196 let request = http_json_request(
4197 "tools/call",
4198 serde_json::json!({
4199 "name": "http_current_auth_subject_tool",
4200 "arguments": {}
4201 }),
4202 1,
4203 );
4204 let traffic_renderer: Option<RequestResponseRenderer> = None;
4205 let response = server.handle_http_mcp_request(
4206 &Cx::for_testing(),
4207 &session,
4208 &http_handler,
4209 &request,
4210 ¬ification_sender,
4211 &request_sender,
4212 &traffic_renderer,
4213 );
4214 assert_eq!(response.status, HttpStatus::OK);
4215
4216 let json: JsonRpcResponse =
4217 serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
4218 let result = json
4219 .result
4220 .expect("read-only auth override request should succeed");
4221 let tool_result: CallToolResult =
4222 serde_json::from_value(result).expect("parse tool result payload");
4223 match tool_result.content.as_slice() {
4224 [Content::Text { text }] => assert_eq!(text, "read-only-override"),
4225 other => panic!("expected single text tool result, got {other:?}"),
4226 }
4227 }
4228
4229 #[test]
4230 fn http_exclusive_auth_failures_flow_through_middleware_error_rewriting() {
4231 let server = Server::new("http-exclusive-auth-error-test-server", "1.0.0")
4232 .auth_provider(AlwaysFailAuthProvider)
4233 .middleware(RewritingErrorMiddleware)
4234 .build();
4235 let session = Arc::new(Mutex::new(Session::new(
4236 server.info.clone(),
4237 server.capabilities.clone(),
4238 )));
4239 session.lock().expect("session lock poisoned").initialize(
4240 fastmcp_protocol::ClientInfo {
4241 name: "http-exclusive-auth-error-client".to_string(),
4242 version: "1.0.0".to_string(),
4243 },
4244 fastmcp_protocol::ClientCapabilities::default(),
4245 "2024-11-05".to_string(),
4246 );
4247
4248 let http_handler = Arc::new(HttpRequestHandler::new());
4249 let notification_sender: NotificationSender = Arc::new(|_| {});
4250 let request_sender = test_request_sender();
4251 let request = http_json_request(
4252 "tools/list",
4253 serde_json::json!({
4254 "auth": "Bearer nope"
4255 }),
4256 1,
4257 );
4258 let traffic_renderer: Option<RequestResponseRenderer> = None;
4259 let response = server.handle_http_mcp_request(
4260 &Cx::for_testing(),
4261 &session,
4262 &http_handler,
4263 &request,
4264 ¬ification_sender,
4265 &request_sender,
4266 &traffic_renderer,
4267 );
4268 assert_eq!(response.status, HttpStatus::OK);
4269
4270 let json: JsonRpcResponse =
4271 serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
4272 let error = json
4273 .error
4274 .expect("auth failure should return JSON-RPC error");
4275 assert_eq!(error.message, "rewritten: auth failed");
4276 }
4277
4278 #[test]
4279 fn http_read_only_auth_failures_flow_through_middleware_error_rewriting() {
4280 let server = Server::new("http-read-only-auth-error-test-server", "1.0.0")
4281 .auth_provider(AlwaysFailAuthProvider)
4282 .middleware(RewritingErrorMiddleware)
4283 .tool(HttpCurrentAuthSubjectTool)
4284 .build();
4285 let session = Arc::new(Mutex::new(Session::new(
4286 server.info.clone(),
4287 server.capabilities.clone(),
4288 )));
4289 session.lock().expect("session lock poisoned").initialize(
4290 fastmcp_protocol::ClientInfo {
4291 name: "http-read-only-auth-error-client".to_string(),
4292 version: "1.0.0".to_string(),
4293 },
4294 fastmcp_protocol::ClientCapabilities::default(),
4295 "2024-11-05".to_string(),
4296 );
4297
4298 let http_handler = Arc::new(HttpRequestHandler::new());
4299 let notification_sender: NotificationSender = Arc::new(|_| {});
4300 let request_sender = test_request_sender();
4301 let request = http_json_request(
4302 "tools/call",
4303 serde_json::json!({
4304 "name": "http_current_auth_subject_tool",
4305 "arguments": {},
4306 "auth": "Bearer nope"
4307 }),
4308 1,
4309 );
4310 let traffic_renderer: Option<RequestResponseRenderer> = None;
4311 let response = server.handle_http_mcp_request(
4312 &Cx::for_testing(),
4313 &session,
4314 &http_handler,
4315 &request,
4316 ¬ification_sender,
4317 &request_sender,
4318 &traffic_renderer,
4319 );
4320 assert_eq!(response.status, HttpStatus::OK);
4321
4322 let json: JsonRpcResponse =
4323 serde_json::from_slice(&response.body).expect("parse HTTP JSON-RPC response");
4324 let error = json
4325 .error
4326 .expect("auth failure should return JSON-RPC error");
4327 assert_eq!(error.message, "rewritten: auth failed");
4328 }
4329
4330 #[test]
4331 fn router_tools_call_injects_explicit_request_auth() {
4332 let server = Server::new("router-auth-test-server", "1.0.0")
4333 .tool(HttpCurrentAuthSubjectTool)
4334 .build();
4335 let result = server
4336 .router
4337 .handle_tools_call(
4338 &Cx::for_testing(),
4339 41,
4340 CallToolParams {
4341 name: "http_current_auth_subject_tool".to_string(),
4342 arguments: Some(serde_json::json!({})),
4343 meta: None,
4344 },
4345 &Budget::INFINITE,
4346 SessionState::new(),
4347 Some(AuthContext::with_subject("alpha")),
4348 None,
4349 None,
4350 )
4351 .expect("tool call should succeed");
4352
4353 match result.content.as_slice() {
4354 [Content::Text { text }] => assert_eq!(text, "alpha"),
4355 other => panic!("expected single text tool result, got {other:?}"),
4356 }
4357 }
4358
4359 #[test]
4360 fn http_returning_server_honors_cancellation_without_needing_accept_wakeup() {
4361 let port_probe =
4362 std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port probe");
4363 let addr = port_probe
4364 .local_addr()
4365 .expect("discover ephemeral port probe address");
4366 drop(port_probe);
4367
4368 let cx = Cx::for_testing();
4369 let (done_tx, done_rx) = std::sync::mpsc::channel();
4370 let server_thread = thread::spawn({
4371 let server = Server::new("http-cancel-test", "1.0.0").build();
4372 let cx = cx.clone();
4373 let addr = addr.to_string();
4374 move || {
4375 server.run_http_returning_with_cx(&cx, addr);
4376 let _ = done_tx.send(());
4377 }
4378 });
4379
4380 std::thread::sleep(Duration::from_millis(50));
4381 cx.cancel_with(CancelKind::User, None);
4382
4383 let returned_before_wakeup = done_rx.recv_timeout(Duration::from_millis(300)).is_ok();
4384 if !returned_before_wakeup {
4385 let _ = std::net::TcpStream::connect(addr);
4386 let _ = done_rx.recv_timeout(Duration::from_secs(1));
4387 }
4388
4389 server_thread
4390 .join()
4391 .expect("HTTP returning server thread should not panic");
4392
4393 #[cfg(not(windows))]
4403 assert!(
4404 returned_before_wakeup,
4405 "run_http_returning_with_cx should stop promptly after cancellation without requiring an extra connection to wake accept()"
4406 );
4407 }
4408}