1#![forbid(unsafe_code)]
39#![allow(dead_code)]
40
41extern crate self as fastmcp_server;
45
46mod auth;
47pub mod bidirectional;
48mod builder;
49pub mod caching;
50pub mod docket;
51mod handler;
52mod middleware;
53pub mod oauth;
54pub mod oidc;
55pub mod providers;
56mod proxy;
57pub mod rate_limiting;
58mod router;
59mod session;
60mod tasks;
61pub mod transform;
62
63#[cfg(test)]
64mod tests;
65
66#[cfg(feature = "jwt")]
67pub use auth::JwtTokenVerifier;
68pub use auth::{
69 AllowAllAuthProvider, AuthProvider, AuthRequest, StaticTokenVerifier, TokenAuthProvider,
70 TokenVerifier,
71};
72pub use builder::ServerBuilder;
73pub use fastmcp_console::config::{BannerStyle, ConsoleConfig, TrafficVerbosity};
74pub use fastmcp_console::stats::{ServerStats, StatsSnapshot};
75pub use handler::{
76 BidirectionalSenders, BoxFuture, ProgressNotificationSender, PromptHandler, ResourceHandler,
77 ToolHandler, create_context_with_progress, create_context_with_progress_and_senders,
78};
79pub use middleware::{Middleware, MiddlewareDecision};
80pub use proxy::{ProxyBackend, ProxyCatalog, ProxyClient};
81pub use router::{
82 MountResult, NotificationSender, Router, RouterResourceReader, RouterToolCaller, TagFilters,
83};
84pub use session::Session;
85pub use tasks::{SharedTaskManager, TaskManager};
86
87pub use bidirectional::{
89 PendingRequests, RequestSender, TransportElicitationSender, TransportRootsProvider,
90 TransportSamplingSender,
91};
92
93use std::collections::HashMap;
94use std::io::{Read, Write};
95use std::sync::{Arc, Condvar, Mutex};
96use std::time::{Duration, Instant};
97
98use asupersync::time::wall_now;
99use asupersync::{Budget, CancelKind, Cx, RegionId};
100use fastmcp_console::client::RequestResponseRenderer;
101use fastmcp_console::logging::RichLoggerBuilder;
102use fastmcp_console::{banner::StartupBanner, console};
103use fastmcp_core::logging::{debug, error, info, targets};
104use fastmcp_core::{AuthContext, McpContext, McpError, McpErrorCode, McpResult};
105use fastmcp_protocol::{
106 CallToolParams, CancelTaskParams, CancelledParams, GetPromptParams, GetTaskParams,
107 InitializeParams, JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse,
108 ListPromptsParams, ListResourceTemplatesParams, ListResourcesParams, ListTasksParams,
109 ListToolsParams, LogLevel, LogMessageParams, Prompt, ReadResourceParams, RequestId, Resource,
110 ResourceTemplate, ServerCapabilities, ServerInfo, SetLogLevelParams, SubmitTaskParams,
111 SubscribeResourceParams, Tool, UnsubscribeResourceParams,
112};
113use fastmcp_transport::sse::SseServerTransport;
114use fastmcp_transport::websocket::WsTransport;
115use fastmcp_transport::{AsyncStdout, Codec, StdioTransport, Transport, TransportError};
116use log::{Level, LevelFilter};
117
118pub type StartupHook =
120 Box<dyn FnOnce() -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send>;
121
122pub type ShutdownHook = Box<dyn FnOnce() + Send>;
124
125#[derive(Default)]
151pub struct LifespanHooks {
152 pub on_startup: Option<StartupHook>,
154 pub on_shutdown: Option<ShutdownHook>,
156}
157
158impl LifespanHooks {
159 #[must_use]
161 pub fn new() -> Self {
162 Self::default()
163 }
164}
165
166#[derive(Debug, Clone)]
168pub struct LoggingConfig {
169 pub level: Level,
171 pub timestamps: bool,
173 pub targets: bool,
175 pub file_line: bool,
177}
178
179impl Default for LoggingConfig {
180 fn default() -> Self {
181 Self {
182 level: Level::Info,
183 timestamps: true,
184 targets: true,
185 file_line: false,
186 }
187 }
188}
189
190impl LoggingConfig {
191 #[must_use]
199 pub fn from_env() -> Self {
200 let level = std::env::var("FASTMCP_LOG")
201 .ok()
202 .and_then(|s| match s.to_lowercase().as_str() {
203 "error" => Some(Level::Error),
204 "warn" | "warning" => Some(Level::Warn),
205 "info" => Some(Level::Info),
206 "debug" => Some(Level::Debug),
207 "trace" => Some(Level::Trace),
208 _ => None,
209 })
210 .unwrap_or(Level::Info);
211
212 let timestamps = std::env::var("FASTMCP_LOG_TIMESTAMPS")
213 .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
214 .unwrap_or(true);
215
216 let targets = std::env::var("FASTMCP_LOG_TARGETS")
217 .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
218 .unwrap_or(true);
219
220 let file_line = std::env::var("FASTMCP_LOG_FILE_LINE")
221 .map(|s| matches!(s.to_lowercase().as_str(), "1" | "true" | "yes"))
222 .unwrap_or(false);
223
224 Self {
225 level,
226 timestamps,
227 targets,
228 file_line,
229 }
230 }
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
238pub enum DuplicateBehavior {
239 Error,
243
244 #[default]
249 Warn,
250
251 Replace,
255
256 Ignore,
260}
261
262pub struct Server {
267 info: ServerInfo,
268 capabilities: ServerCapabilities,
269 router: Router,
270 instructions: Option<String>,
271 request_timeout_secs: u64,
273 stats: Option<ServerStats>,
275 mask_error_details: bool,
277 logging: LoggingConfig,
279 console_config: ConsoleConfig,
281 lifespan: Mutex<Option<LifespanHooks>>,
283 auth_provider: Option<Arc<dyn AuthProvider>>,
285 middleware: Arc<Vec<Box<dyn crate::Middleware>>>,
287 active_requests: Mutex<HashMap<RequestId, ActiveRequest>>,
289 task_manager: Option<SharedTaskManager>,
291 pending_requests: Arc<bidirectional::PendingRequests>,
293}
294
295impl Server {
296 #[must_use]
298 #[allow(clippy::new_ret_no_self)]
299 pub fn new(name: impl Into<String>, version: impl Into<String>) -> ServerBuilder {
300 ServerBuilder::new(name, version)
301 }
302
303 #[must_use]
305 pub fn info(&self) -> &ServerInfo {
306 &self.info
307 }
308
309 #[must_use]
311 pub fn capabilities(&self) -> &ServerCapabilities {
312 &self.capabilities
313 }
314
315 #[must_use]
317 pub fn tools(&self) -> Vec<Tool> {
318 self.router.tools()
319 }
320
321 #[must_use]
323 pub fn resources(&self) -> Vec<Resource> {
324 self.router.resources()
325 }
326
327 #[must_use]
329 pub fn resource_templates(&self) -> Vec<ResourceTemplate> {
330 self.router.resource_templates()
331 }
332
333 #[must_use]
335 pub fn prompts(&self) -> Vec<Prompt> {
336 self.router.prompts()
337 }
338
339 #[must_use]
343 pub fn task_manager(&self) -> Option<&SharedTaskManager> {
344 self.task_manager.as_ref()
345 }
346
347 #[must_use]
351 pub fn into_router(self) -> Router {
352 self.router
353 }
354
355 #[must_use]
360 pub fn has_tools(&self) -> bool {
361 self.capabilities.tools.is_some()
362 }
363
364 #[must_use]
366 pub fn has_resources(&self) -> bool {
367 self.capabilities.resources.is_some()
368 }
369
370 #[must_use]
372 pub fn has_prompts(&self) -> bool {
373 self.capabilities.prompts.is_some()
374 }
375
376 #[must_use]
380 pub fn stats(&self) -> Option<StatsSnapshot> {
381 self.stats.as_ref().map(ServerStats::snapshot)
382 }
383
384 #[must_use]
389 pub fn stats_collector(&self) -> Option<&ServerStats> {
390 self.stats.as_ref()
391 }
392
393 pub fn display_stats(&self) {
395 let Some(stats) = self.stats.as_ref() else {
396 return;
397 };
398
399 let snapshot = stats.snapshot();
400 let renderer = fastmcp_console::stats::StatsRenderer::detect();
401 renderer.render_panel(&snapshot, console());
402 }
403
404 #[must_use]
406 pub fn console_config(&self) -> &ConsoleConfig {
407 &self.console_config
408 }
409
410 fn render_startup_banner(&self) {
412 let render = || {
413 let mut banner = StartupBanner::new(&self.info.name, &self.info.version)
414 .tools(self.router.tools_count())
415 .resources(self.router.resources_count())
416 .prompts(self.router.prompts_count())
417 .transport("stdio");
418
419 if let Some(desc) = self.instructions.as_deref().filter(|d| !d.is_empty()) {
420 banner = banner.description(desc);
421 }
422
423 match self.console_config.banner_style {
425 BannerStyle::Full => banner.render(console()),
426 BannerStyle::Compact | BannerStyle::Minimal => {
427 banner.no_logo().render(console());
429 }
430 BannerStyle::None => {} }
432 };
433
434 if let Err(err) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(render)) {
435 eprintln!("Warning: banner rendering failed: {err:?}");
436 }
437 }
438
439 fn init_rich_logging(&self) {
445 let result = RichLoggerBuilder::new()
446 .level(self.logging.level)
447 .with_timestamps(self.logging.timestamps)
448 .with_targets(self.logging.targets)
449 .with_file_line(self.logging.file_line)
450 .init();
451
452 if let Err(e) = result {
453 eprintln!("Note: Rich logging not initialized (logger already set): {e}");
455 }
456 }
457
458 pub fn run_stdio(self) -> ! {
463 let cx = Cx::for_request();
465 self.run_stdio_with_cx(&cx)
466 }
467
468 pub fn run_stdio_with_cx(self, cx: &Cx) -> ! {
472 self.init_rich_logging();
474
475 let transport = StdioTransport::stdio();
476 let shared = SharedTransport::new(transport);
477
478 let notification_sender = create_notification_sender();
482
483 let shared_recv = shared.clone();
484 let shared_send = shared.clone();
485 self.run_loop(
486 cx,
487 move |cx| shared_recv.recv(cx),
488 move |cx, message| shared_send.send(cx, message),
489 notification_sender,
490 )
491 }
492
493 pub fn run_transport<T>(self, transport: T) -> !
498 where
499 T: Transport + Send + 'static,
500 {
501 let cx = Cx::for_request();
503 self.run_transport_with_cx(&cx, transport)
504 }
505
506 pub fn run_transport_with_cx<T>(self, cx: &Cx, transport: T) -> !
510 where
511 T: Transport + Send + 'static,
512 {
513 self.init_rich_logging();
514
515 let shared = SharedTransport::new(transport);
516 let notification_sender = create_transport_notification_sender(shared.clone());
517
518 let shared_recv = shared.clone();
519 let shared_send = shared;
520 self.run_loop(
521 cx,
522 move |cx| shared_recv.recv(cx),
523 move |cx, message| shared_send.send(cx, message),
524 notification_sender,
525 )
526 }
527
528 pub fn run_transport_returning_with_cx<T>(self, cx: &Cx, transport: T)
534 where
535 T: Transport + Send + 'static,
536 {
537 self.init_rich_logging();
538
539 let shared = SharedTransport::new(transport);
540 let notification_sender = create_transport_notification_sender(shared.clone());
541
542 let shared_recv = shared.clone();
543 let shared_send = shared;
544 self.run_loop_returning(
545 cx,
546 move |cx| shared_recv.recv(cx),
547 move |cx, message| shared_send.send(cx, message),
548 notification_sender,
549 );
550 }
551
552 pub fn run_transport_returning<T>(self, transport: T)
557 where
558 T: Transport + Send + 'static,
559 {
560 let cx = Cx::for_request();
562 self.run_transport_returning_with_cx(&cx, transport);
563 }
564
565 pub fn run_sse<W, R>(self, writer: W, request_source: R, endpoint_url: impl Into<String>) -> !
569 where
570 W: Write + Send + 'static,
571 R: Iterator<Item = JsonRpcRequest> + Send + 'static,
572 {
573 let transport = SseServerTransport::new(writer, request_source, endpoint_url);
574 self.run_transport(transport)
575 }
576
577 pub fn run_sse_with_cx<W, R>(
579 self,
580 cx: &Cx,
581 writer: W,
582 request_source: R,
583 endpoint_url: impl Into<String>,
584 ) -> !
585 where
586 W: Write + Send + 'static,
587 R: Iterator<Item = JsonRpcRequest> + Send + 'static,
588 {
589 let transport = SseServerTransport::new(writer, request_source, endpoint_url);
590 self.run_transport_with_cx(cx, transport)
591 }
592
593 pub fn run_websocket<R, W>(self, reader: R, writer: W) -> !
597 where
598 R: Read + Send + 'static,
599 W: Write + Send + 'static,
600 {
601 let transport = WsTransport::new(reader, writer);
602 self.run_transport(transport)
603 }
604
605 pub fn run_websocket_with_cx<R, W>(self, cx: &Cx, reader: R, writer: W) -> !
607 where
608 R: Read + Send + 'static,
609 W: Write + Send + 'static,
610 {
611 let transport = WsTransport::new(reader, writer);
612 self.run_transport_with_cx(cx, transport)
613 }
614
615 pub(crate) fn run_startup_hook(&self) -> bool {
620 let hook = {
621 let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
622 error!(target: targets::SERVER, "lifespan lock poisoned in run_startup_hook, recovering");
623 poisoned.into_inner()
624 });
625 guard.as_mut().and_then(|h| h.on_startup.take())
626 };
627
628 if let Some(hook) = hook {
629 debug!(target: targets::SERVER, "Running startup hook");
630 match hook() {
631 Ok(()) => {
632 debug!(target: targets::SERVER, "Startup hook completed successfully");
633 true
634 }
635 Err(e) => {
636 error!(target: targets::SERVER, "Startup hook failed: {}", e);
637 false
638 }
639 }
640 } else {
641 true
642 }
643 }
644
645 pub(crate) fn run_shutdown_hook(&self) {
647 let hook = {
648 let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
649 error!(target: targets::SERVER, "lifespan lock poisoned in run_shutdown_hook, recovering");
650 poisoned.into_inner()
651 });
652 guard.as_mut().and_then(|h| h.on_shutdown.take())
653 };
654
655 if let Some(hook) = hook {
656 debug!(target: targets::SERVER, "Running shutdown hook");
657 hook();
658 debug!(target: targets::SERVER, "Shutdown hook completed");
659 }
660 }
661
662 fn graceful_shutdown(&self, exit_code: i32) -> ! {
664 self.cancel_active_requests(CancelKind::Shutdown, true);
665 self.run_shutdown_hook();
666 if let Some(ref stats) = self.stats {
667 stats.connection_closed();
668 }
669 std::process::exit(exit_code)
670 }
671
672 fn graceful_shutdown_returning(&self) {
677 self.cancel_active_requests(CancelKind::Shutdown, true);
678 self.run_shutdown_hook();
679 if let Some(ref stats) = self.stats {
680 stats.connection_closed();
681 }
682 }
683
684 fn run_loop<R, S>(
686 self,
687 cx: &Cx,
688 mut recv: R,
689 send: S,
690 notification_sender: NotificationSender,
691 ) -> !
692 where
693 R: FnMut(&Cx) -> Result<JsonRpcMessage, TransportError>,
694 S: FnMut(&Cx, &JsonRpcMessage) -> Result<(), TransportError> + Send + Sync + 'static,
695 {
696 let mut session = Session::new(self.info.clone(), self.capabilities.clone());
697
698 let send = Arc::new(Mutex::new(send));
700
701 let request_sender = {
703 let send_clone = send.clone();
704 let send_cx = cx.clone();
705 let send_fn: bidirectional::TransportSendFn = Arc::new(move |message| {
706 let mut guard = send_clone
707 .lock()
708 .map_err(|e| format!("Lock poisoned: {}", e))?;
709 guard(&send_cx, message).map_err(|e| format!("Send failed: {}", e))
710 });
711 bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
712 };
713
714 if let Some(ref stats) = self.stats {
716 stats.connection_opened();
717 }
718
719 if self.console_config.show_banner && !banner_suppressed() {
721 self.render_startup_banner();
722 }
723
724 if !self.run_startup_hook() {
726 error!(target: targets::SERVER, "Startup hook failed, exiting");
727 self.graceful_shutdown(1);
728 }
729
730 let traffic_renderer = if self.console_config.show_request_traffic {
732 let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
733 renderer.truncate_at = self.console_config.truncate_at;
734 match self.console_config.traffic_verbosity {
735 TrafficVerbosity::None => {} TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
737 renderer.show_params = false;
738 renderer.show_result = false;
739 }
740 TrafficVerbosity::Full => {
741 renderer.show_params = true;
742 renderer.show_result = true;
743 }
744 }
745 Some(renderer)
746 } else {
747 None
748 };
749
750 loop {
752 if cx.is_cancel_requested() {
754 info!(target: targets::SERVER, "Cancellation requested, shutting down");
755 self.graceful_shutdown(0);
756 }
757
758 let message = match recv(cx) {
760 Ok(msg) => msg,
761 Err(TransportError::Closed) => {
762 self.graceful_shutdown(0);
764 }
765 Err(TransportError::Cancelled) => {
766 info!(target: targets::SERVER, "Transport cancelled");
767 self.graceful_shutdown(0);
768 }
769 Err(e) => {
770 error!(target: targets::TRANSPORT, "Transport error: {}", e);
771 continue;
772 }
773 };
774
775 if let Some(renderer) = &traffic_renderer {
777 if let JsonRpcMessage::Request(req) = &message {
778 renderer.render_request(req, console());
779 }
780 }
781
782 let start_time = Instant::now();
783
784 let response_opt = match message {
786 JsonRpcMessage::Request(request) => {
787 if let Some(ref stats) = self.stats {
789 if let Ok(json) = serde_json::to_string(&request) {
792 stats.add_bytes_received(json.len() as u64 + 1); }
794 }
795 self.handle_request(
796 cx,
797 &mut session,
798 request,
799 ¬ification_sender,
800 &request_sender,
801 )
802 }
803 JsonRpcMessage::Response(response) => {
804 if self.pending_requests.route_response(&response) {
806 debug!(target: targets::SERVER, "Routed response to pending request");
807 } else {
808 debug!(target: targets::SERVER, "Received unexpected response: {:?}", response.id);
809 }
810 continue;
811 }
812 };
813
814 let duration = start_time.elapsed();
815
816 if let Some(response) = response_opt {
817 if let Some(renderer) = &traffic_renderer {
819 renderer.render_response(&response, Some(duration), console());
820 }
821
822 if let Some(ref stats) = self.stats {
824 if let Ok(json) = serde_json::to_string(&response) {
825 stats.add_bytes_sent(json.len() as u64 + 1); }
827 }
828
829 let send_result = {
831 let mut guard = match send.lock() {
832 Ok(guard) => guard,
833 Err(poisoned) => {
834 error!(
835 target: targets::TRANSPORT,
836 "Send channel lock poisoned; continuing with inner guard"
837 );
838 poisoned.into_inner()
839 }
840 };
841 guard(cx, &JsonRpcMessage::Response(response))
842 };
843 if let Err(e) = send_result {
844 error!(target: targets::TRANSPORT, "Failed to send response: {}", e);
845 }
846 }
847 }
848 }
849
850 #[allow(clippy::too_many_lines)]
855 fn run_loop_returning<R, S>(
856 self,
857 cx: &Cx,
858 mut recv: R,
859 send: S,
860 notification_sender: NotificationSender,
861 ) where
862 R: FnMut(&Cx) -> Result<JsonRpcMessage, TransportError>,
863 S: FnMut(&Cx, &JsonRpcMessage) -> Result<(), TransportError> + Send + Sync + 'static,
864 {
865 let mut session = Session::new(self.info.clone(), self.capabilities.clone());
866
867 let send = Arc::new(Mutex::new(send));
869
870 let request_sender = {
872 let send_clone = send.clone();
873 let send_cx = cx.clone();
874 let send_fn: bidirectional::TransportSendFn = Arc::new(move |message| {
875 let mut guard = send_clone
876 .lock()
877 .map_err(|e| format!("Lock poisoned: {}", e))?;
878 guard(&send_cx, message).map_err(|e| format!("Send failed: {}", e))
879 });
880 bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
881 };
882
883 if let Some(ref stats) = self.stats {
885 stats.connection_opened();
886 }
887
888 if self.console_config.show_banner && !banner_suppressed() {
890 self.render_startup_banner();
891 }
892
893 if !self.run_startup_hook() {
895 error!(target: targets::SERVER, "Startup hook failed, stopping");
896 self.graceful_shutdown_returning();
897 return;
898 }
899
900 let traffic_renderer = if self.console_config.show_request_traffic {
902 let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
903 renderer.truncate_at = self.console_config.truncate_at;
904 match self.console_config.traffic_verbosity {
905 TrafficVerbosity::None => {} TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
907 renderer.show_params = false;
908 renderer.show_result = false;
909 }
910 TrafficVerbosity::Full => {
911 renderer.show_params = true;
912 renderer.show_result = true;
913 }
914 }
915 Some(renderer)
916 } else {
917 None
918 };
919
920 loop {
922 if cx.is_cancel_requested() {
924 info!(target: targets::SERVER, "Cancellation requested, stopping");
925 self.graceful_shutdown_returning();
926 return;
927 }
928
929 let message = match recv(cx) {
931 Ok(msg) => msg,
932 Err(TransportError::Closed) => {
933 self.graceful_shutdown_returning();
934 return;
935 }
936 Err(TransportError::Cancelled) => {
937 info!(target: targets::SERVER, "Transport cancelled");
938 self.graceful_shutdown_returning();
939 return;
940 }
941 Err(e) => {
942 error!(target: targets::TRANSPORT, "Transport error: {}", e);
943 continue;
944 }
945 };
946
947 if let Some(renderer) = &traffic_renderer {
949 if let JsonRpcMessage::Request(req) = &message {
950 renderer.render_request(req, console());
951 }
952 }
953
954 let start_time = Instant::now();
955
956 let response_opt = match message {
958 JsonRpcMessage::Request(request) => {
959 if let Some(ref stats) = self.stats {
961 if let Ok(json) = serde_json::to_string(&request) {
964 stats.add_bytes_received(json.len() as u64 + 1); }
966 }
967 self.handle_request(
968 cx,
969 &mut session,
970 request,
971 ¬ification_sender,
972 &request_sender,
973 )
974 }
975 JsonRpcMessage::Response(response) => {
976 if self.pending_requests.route_response(&response) {
978 debug!(target: targets::SERVER, "Routed response to pending request");
979 } else {
980 debug!(
981 target: targets::SERVER,
982 "Received unexpected response: {:?}",
983 response.id
984 );
985 }
986 continue;
987 }
988 };
989
990 let duration = start_time.elapsed();
991
992 if let Some(response) = response_opt {
993 if let Some(renderer) = &traffic_renderer {
995 renderer.render_response(&response, Some(duration), console());
996 }
997
998 if let Some(ref stats) = self.stats {
1000 if let Ok(json) = serde_json::to_string(&response) {
1001 stats.add_bytes_sent(json.len() as u64 + 1); }
1003 }
1004
1005 let send_result = {
1007 let mut guard = match send.lock() {
1008 Ok(guard) => guard,
1009 Err(poisoned) => {
1010 error!(
1011 target: targets::TRANSPORT,
1012 "Send channel lock poisoned; continuing with inner guard"
1013 );
1014 poisoned.into_inner()
1015 }
1016 };
1017 guard(cx, &JsonRpcMessage::Response(response))
1018 };
1019 if let Err(e) = send_result {
1020 error!(target: targets::TRANSPORT, "Failed to send response: {}", e);
1021 }
1022 }
1023 }
1024 }
1025
1026 fn handle_request(
1028 &self,
1029 cx: &Cx,
1030 session: &mut Session,
1031 request: JsonRpcRequest,
1032 notification_sender: &NotificationSender,
1033 request_sender: &bidirectional::RequestSender,
1034 ) -> Option<JsonRpcResponse> {
1035 let id = request.id.clone();
1036 let method = request.method.clone();
1037 let is_notification = id.is_none();
1038
1039 let start_time = Instant::now();
1041
1042 let request_id = request_id_to_u64(id.as_ref());
1044
1045 let budget = self.create_request_budget();
1047
1048 if budget.is_exhausted() {
1050 if let Some(ref stats) = self.stats {
1052 stats.record_request(&method, start_time.elapsed(), false);
1053 }
1054 let response_id = id.clone()?;
1056 return Some(JsonRpcResponse::error(
1057 Some(response_id),
1058 JsonRpcError {
1059 code: McpErrorCode::RequestCancelled.into(),
1060 message: "Request budget exhausted".to_string(),
1061 data: None,
1062 },
1063 ));
1064 }
1065
1066 let request_cx = if is_notification {
1067 cx.clone()
1068 } else {
1069 Cx::for_request_with_budget(budget)
1070 };
1071
1072 let _active_guard = id.clone().map(|request_id| {
1073 ActiveRequestGuard::new(&self.active_requests, request_id, request_cx.clone())
1074 });
1075
1076 let result = self.dispatch_method(
1078 &request_cx,
1079 session,
1080 request,
1081 request_id,
1082 &budget,
1083 notification_sender,
1084 request_sender,
1085 );
1086
1087 let latency = start_time.elapsed();
1089 if let Some(ref stats) = self.stats {
1090 match &result {
1091 Ok(_) => stats.record_request(&method, latency, true),
1092 Err(e) if e.code == fastmcp_core::McpErrorCode::RequestCancelled => {
1093 stats.record_cancelled(&method, latency);
1094 }
1095 Err(_) => stats.record_request(&method, latency, false),
1096 }
1097 }
1098
1099 if is_notification {
1101 if let Err(e) = result {
1102 fastmcp_core::logging::error!(
1103 target: targets::HANDLER,
1104 "Notification '{}' failed: {}",
1105 method,
1106 e
1107 );
1108 }
1109 return None;
1110 }
1111
1112 let response_id = id.clone()?;
1115
1116 match result {
1117 Ok(value) => Some(JsonRpcResponse::success(response_id, value)),
1118 Err(e) => {
1119 if self.mask_error_details && e.is_internal() {
1121 fastmcp_core::logging::error!(
1122 target: targets::HANDLER,
1123 "Request '{}' failed (masked in response): {}",
1124 method,
1125 e
1126 );
1127 }
1128
1129 let masked = e.masked(self.mask_error_details);
1131 Some(JsonRpcResponse::error(
1132 id,
1133 JsonRpcError {
1134 code: masked.code.into(),
1135 message: masked.message,
1136 data: masked.data,
1137 },
1138 ))
1139 }
1140 }
1141 }
1142
1143 fn create_request_budget(&self) -> Budget {
1145 if self.request_timeout_secs == 0 {
1146 Budget::INFINITE
1148 } else {
1149 let now = wall_now();
1152 let timeout_ns = self.request_timeout_secs.saturating_mul(1_000_000_000);
1153 let deadline = now.saturating_add_nanos(timeout_ns);
1154 Budget::new().with_deadline(deadline)
1155 }
1156 }
1157
1158 #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1160 fn dispatch_method(
1161 &self,
1162 cx: &Cx,
1163 session: &mut Session,
1164 request: JsonRpcRequest,
1165 request_id: u64,
1166 budget: &Budget,
1167 notification_sender: &NotificationSender,
1168 request_sender: &bidirectional::RequestSender,
1169 ) -> Result<serde_json::Value, McpError> {
1170 if cx.is_cancel_requested() {
1172 return Err(McpError::request_cancelled());
1173 }
1174
1175 if budget.is_exhausted() {
1177 return Err(McpError::new(
1178 McpErrorCode::RequestCancelled,
1179 "Request budget exhausted",
1180 ));
1181 }
1182 if budget.is_past_deadline(wall_now()) {
1184 cx.cancel_fast(CancelKind::Deadline);
1185 return Err(McpError::new(
1186 McpErrorCode::RequestCancelled,
1187 "Request timeout exceeded",
1188 ));
1189 }
1190
1191 if !session.is_initialized() && request.method != "initialize" && request.method != "ping" {
1193 return Err(McpError::invalid_request(
1194 "Server not initialized. Client must send 'initialize' first.",
1195 ));
1196 }
1197
1198 if let Some(task_manager) = &self.task_manager {
1199 task_manager.set_notification_sender(Arc::clone(notification_sender));
1200 }
1201
1202 let mw_ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
1206 let mut entered_middleware: Vec<&dyn crate::Middleware> = Vec::new();
1207
1208 for m in self.middleware.iter() {
1209 entered_middleware.push(m.as_ref());
1210 match m.on_request(&mw_ctx, &request) {
1211 Ok(crate::MiddlewareDecision::Continue) => {}
1212 Ok(crate::MiddlewareDecision::Respond(v)) => {
1213 return self.apply_middleware_response(
1214 &entered_middleware,
1215 &mw_ctx,
1216 &request,
1217 v,
1218 );
1219 }
1220 Err(e) => {
1221 let err =
1222 self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e);
1223 return Err(err);
1224 }
1225 }
1226 }
1227
1228 let result: Result<serde_json::Value, McpError> = (|| {
1235 if self.should_authenticate(&request.method) {
1236 let auth_request = AuthRequest {
1237 method: &request.method,
1238 params: request.params.as_ref(),
1239 request_id,
1240 };
1241 self.authenticate_request(cx, request_id, session, auth_request)?;
1242 }
1243
1244 let method = &request.method;
1245 let params = request.params.clone();
1246
1247 let bidirectional_senders = self.create_bidirectional_senders(session, request_sender);
1249
1250 match method.as_str() {
1251 "initialize" => {
1252 let params: InitializeParams = parse_params(params)?;
1253 let result = self.router.handle_initialize(
1254 cx,
1255 session,
1256 params,
1257 self.instructions.as_deref(),
1258 )?;
1259 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1260 }
1261 "initialized" => {
1262 Ok(serde_json::Value::Null)
1264 }
1265 "notifications/cancelled" => {
1266 let params: CancelledParams = parse_params(params)?;
1267 self.handle_cancelled_notification(params);
1268 Ok(serde_json::Value::Null)
1269 }
1270 "logging/setLevel" => {
1271 let params: SetLogLevelParams = parse_params(params)?;
1272 self.handle_set_log_level(session, params);
1273 Ok(serde_json::Value::Null)
1274 }
1275 "tools/list" => {
1276 let params: ListToolsParams = parse_params_or_default(params)?;
1277 let result =
1278 self.router
1279 .handle_tools_list(cx, params, Some(session.state()))?;
1280 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1281 }
1282 "tools/call" => {
1283 let params: CallToolParams = parse_params(params)?;
1284 let result = self.router.handle_tools_call(
1285 cx,
1286 request_id,
1287 params,
1288 budget,
1289 session.state().clone(),
1290 Some(notification_sender),
1291 bidirectional_senders.as_ref(),
1292 )?;
1293 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1294 }
1295 "resources/list" => {
1296 let params: ListResourcesParams = parse_params_or_default(params)?;
1297 let result =
1298 self.router
1299 .handle_resources_list(cx, params, Some(session.state()))?;
1300 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1301 }
1302 "resources/templates/list" => {
1303 let params: ListResourceTemplatesParams = parse_params_or_default(params)?;
1304 let result = self.router.handle_resource_templates_list(
1305 cx,
1306 params,
1307 Some(session.state()),
1308 )?;
1309 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1310 }
1311 "resources/read" => {
1312 let params: ReadResourceParams = parse_params(params)?;
1313 let result = self.router.handle_resources_read(
1314 cx,
1315 request_id,
1316 ¶ms,
1317 budget,
1318 session.state().clone(),
1319 Some(notification_sender),
1320 bidirectional_senders.as_ref(),
1321 )?;
1322 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1323 }
1324 "resources/subscribe" => {
1325 let params: SubscribeResourceParams = parse_params(params)?;
1326 if !self.router.resource_exists(¶ms.uri) {
1327 return Err(McpError::resource_not_found(¶ms.uri));
1328 }
1329 session.subscribe_resource(params.uri);
1330 Ok(serde_json::json!({}))
1331 }
1332 "resources/unsubscribe" => {
1333 let params: UnsubscribeResourceParams = parse_params(params)?;
1334 session.unsubscribe_resource(¶ms.uri);
1335 Ok(serde_json::json!({}))
1336 }
1337 "prompts/list" => {
1338 let params: ListPromptsParams = parse_params_or_default(params)?;
1339 let result =
1340 self.router
1341 .handle_prompts_list(cx, params, Some(session.state()))?;
1342 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1343 }
1344 "prompts/get" => {
1345 let params: GetPromptParams = parse_params(params)?;
1346 let result = self.router.handle_prompts_get(
1347 cx,
1348 request_id,
1349 params,
1350 budget,
1351 session.state().clone(),
1352 Some(notification_sender),
1353 bidirectional_senders.as_ref(),
1354 )?;
1355 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1356 }
1357 "ping" => {
1358 Ok(serde_json::json!({}))
1360 }
1361 "tasks/list" => {
1363 let params: ListTasksParams = parse_params_or_default(params)?;
1364 let result =
1365 self.router
1366 .handle_tasks_list(cx, params, self.task_manager.as_ref())?;
1367 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1368 }
1369 "tasks/get" => {
1370 let params: GetTaskParams = parse_params(params)?;
1371 let result =
1372 self.router
1373 .handle_tasks_get(cx, params, self.task_manager.as_ref())?;
1374 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1375 }
1376 "tasks/cancel" => {
1377 let params: CancelTaskParams = parse_params(params)?;
1378 let result =
1379 self.router
1380 .handle_tasks_cancel(cx, params, self.task_manager.as_ref())?;
1381 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1382 }
1383 "tasks/submit" => {
1384 let params: SubmitTaskParams = parse_params(params)?;
1385 let result =
1386 self.router
1387 .handle_tasks_submit(cx, params, self.task_manager.as_ref())?;
1388 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1389 }
1390 _ => Err(McpError::method_not_found(method)),
1391 }
1392 })();
1393
1394 let final_result = match result {
1395 Ok(v) => self.apply_middleware_response(&entered_middleware, &mw_ctx, &request, v),
1396 Err(e) => Err(self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e)),
1397 };
1398
1399 self.maybe_emit_log_notification(
1400 session,
1401 notification_sender,
1402 &request.method,
1403 &final_result,
1404 );
1405
1406 final_result
1407 }
1408
1409 fn apply_middleware_response(
1410 &self,
1411 stack: &[&dyn crate::Middleware],
1412 ctx: &McpContext,
1413 request: &JsonRpcRequest,
1414 value: serde_json::Value,
1415 ) -> Result<serde_json::Value, McpError> {
1416 let mut response = value;
1417 for m in stack.iter().rev() {
1418 match m.on_response(ctx, request, response) {
1419 Ok(next) => response = next,
1420 Err(err) => {
1421 let mapped = self.apply_middleware_error(stack, ctx, request, err);
1422 return Err(mapped);
1423 }
1424 }
1425 }
1426 Ok(response)
1427 }
1428
1429 fn apply_middleware_error(
1430 &self,
1431 stack: &[&dyn crate::Middleware],
1432 ctx: &McpContext,
1433 request: &JsonRpcRequest,
1434 error: McpError,
1435 ) -> McpError {
1436 let mut err = error;
1437 for m in stack.iter().rev() {
1438 err = m.on_error(ctx, request, err);
1439 }
1440 err
1441 }
1442
1443 fn create_bidirectional_senders(
1448 &self,
1449 session: &Session,
1450 request_sender: &bidirectional::RequestSender,
1451 ) -> Option<handler::BidirectionalSenders> {
1452 let supports_sampling = session.supports_sampling();
1453 let supports_elicitation = session.supports_elicitation();
1454
1455 if !supports_sampling && !supports_elicitation {
1456 return None;
1457 }
1458
1459 let mut senders = handler::BidirectionalSenders::new();
1460
1461 if supports_sampling {
1462 let sampling_sender: Arc<dyn fastmcp_core::SamplingSender> = Arc::new(
1463 bidirectional::TransportSamplingSender::new(request_sender.clone()),
1464 );
1465 senders = senders.with_sampling(sampling_sender);
1466 }
1467
1468 if supports_elicitation {
1469 let elicitation_sender: Arc<dyn fastmcp_core::ElicitationSender> = Arc::new(
1470 bidirectional::TransportElicitationSender::new(request_sender.clone()),
1471 );
1472 senders = senders.with_elicitation(elicitation_sender);
1473 }
1474
1475 Some(senders)
1476 }
1477
1478 fn should_authenticate(&self, method: &str) -> bool {
1479 !matches!(
1480 method,
1481 "initialize" | "initialized" | "notifications/cancelled" | "ping"
1482 )
1483 }
1484
1485 fn authenticate_request(
1486 &self,
1487 cx: &Cx,
1488 request_id: u64,
1489 session: &Session,
1490 request: AuthRequest<'_>,
1491 ) -> Result<AuthContext, McpError> {
1492 let Some(provider) = &self.auth_provider else {
1493 return Ok(AuthContext::anonymous());
1494 };
1495
1496 let ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
1497 let auth = provider.authenticate(&ctx, request)?;
1498 if !ctx.set_auth(auth.clone()) {
1499 debug!(
1500 target: targets::SESSION,
1501 "Auth context not stored (session state unavailable)"
1502 );
1503 }
1504 Ok(auth)
1505 }
1506
1507 fn handle_cancelled_notification(&self, params: CancelledParams) {
1508 let reason = params.reason.as_deref().unwrap_or("unspecified");
1509 let await_cleanup = params.await_cleanup.unwrap_or(false);
1510 info!(
1511 target: targets::SESSION,
1512 "Cancellation requested for requestId={} (reason: {}, await_cleanup={})",
1513 params.request_id,
1514 reason,
1515 await_cleanup
1516 );
1517 let active = {
1518 let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
1519 error!(target: targets::SERVER, "active_requests lock poisoned, recovering");
1520 poisoned.into_inner()
1521 });
1522 guard
1523 .get(¶ms.request_id)
1524 .map(|entry| (entry.cx.clone(), entry.region_id, entry.completion.clone()))
1525 };
1526 if let Some((cx, region_id, completion)) = active {
1527 cx.cancel_with(CancelKind::User, None);
1528 if await_cleanup {
1529 let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
1530 if !completed {
1531 fastmcp_core::logging::warn!(
1532 target: targets::SESSION,
1533 "await_cleanup timed out for requestId={} (region={:?})",
1534 params.request_id,
1535 region_id
1536 );
1537 }
1538 }
1539 } else {
1540 fastmcp_core::logging::warn!(
1541 target: targets::SESSION,
1542 "No active request found for cancellation requestId={}",
1543 params.request_id
1544 );
1545 }
1546 }
1547
1548 fn cancel_active_requests(&self, kind: CancelKind, await_cleanup: bool) {
1549 let active: Vec<(RequestId, RegionId, Cx, Arc<RequestCompletion>)> = {
1550 let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
1551 error!(target: targets::SERVER, "active_requests lock poisoned in cancel_active_requests, recovering");
1552 poisoned.into_inner()
1553 });
1554 guard
1555 .iter()
1556 .map(|(request_id, entry)| {
1557 (
1558 request_id.clone(),
1559 entry.region_id,
1560 entry.cx.clone(),
1561 entry.completion.clone(),
1562 )
1563 })
1564 .collect()
1565 };
1566 if active.is_empty() {
1567 return;
1568 }
1569 info!(
1570 target: targets::SESSION,
1571 "Cancelling {} active request(s) (kind={:?}, await_cleanup={})",
1572 active.len(),
1573 kind,
1574 await_cleanup
1575 );
1576 for (_, _, cx, _) in &active {
1577 cx.cancel_with(kind, None);
1578 }
1579
1580 if await_cleanup {
1581 for (request_id, region_id, _cx, completion) in active {
1582 let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
1583 if !completed {
1584 fastmcp_core::logging::warn!(
1585 target: targets::SESSION,
1586 "Shutdown cancel timed out for requestId={} (region={:?})",
1587 request_id,
1588 region_id
1589 );
1590 }
1591 }
1592 }
1593 }
1594
1595 fn handle_set_log_level(&self, session: &mut Session, params: SetLogLevelParams) {
1596 let requested = match params.level {
1597 LogLevel::Debug => LevelFilter::Debug,
1598 LogLevel::Info => LevelFilter::Info,
1599 LogLevel::Warning => LevelFilter::Warn,
1600 LogLevel::Error => LevelFilter::Error,
1601 };
1602
1603 let configured = self.logging.level.to_level_filter();
1604 let effective = if requested > configured {
1605 configured
1606 } else {
1607 requested
1608 };
1609
1610 log::set_max_level(effective);
1611
1612 let effective_level = match effective {
1613 LevelFilter::Debug => LogLevel::Debug,
1614 LevelFilter::Info => LogLevel::Info,
1615 LevelFilter::Warn => LogLevel::Warning,
1616 LevelFilter::Error => LogLevel::Error,
1617 _ => LogLevel::Info,
1618 };
1619 session.set_log_level(effective_level);
1620
1621 if effective != requested {
1622 fastmcp_core::logging::warn!(
1623 target: targets::SESSION,
1624 "Client requested log level {:?}; clamped to server level {:?}",
1625 params.level,
1626 effective
1627 );
1628 } else {
1629 info!(
1630 target: targets::SESSION,
1631 "Log level set to {:?}",
1632 params.level
1633 );
1634 }
1635 }
1636
1637 fn log_level_rank(level: LogLevel) -> u8 {
1638 match level {
1639 LogLevel::Debug => 1,
1640 LogLevel::Info => 2,
1641 LogLevel::Warning => 3,
1642 LogLevel::Error => 4,
1643 }
1644 }
1645
1646 fn emit_log_notification(
1647 &self,
1648 session: &Session,
1649 sender: &NotificationSender,
1650 level: LogLevel,
1651 message: impl Into<String>,
1652 ) {
1653 let Some(min_level) = session.log_level() else {
1654 return;
1655 };
1656 if Self::log_level_rank(level) < Self::log_level_rank(min_level) {
1657 return;
1658 }
1659
1660 let ts = chrono::Utc::now().to_rfc3339();
1661 let text = format!("{ts} {}", message.into());
1662 let params = LogMessageParams {
1663 level,
1664 logger: Some("fastmcp_rust::server".to_string()),
1665 data: serde_json::Value::String(text),
1666 };
1667 let payload = match serde_json::to_value(params) {
1668 Ok(value) => value,
1669 Err(err) => {
1670 fastmcp_core::logging::warn!(
1671 target: targets::SESSION,
1672 "Failed to serialize log message notification: {}",
1673 err
1674 );
1675 return;
1676 }
1677 };
1678 sender(JsonRpcRequest::notification(
1679 "notifications/message",
1680 Some(payload),
1681 ));
1682 }
1683
1684 fn maybe_emit_log_notification(
1685 &self,
1686 session: &Session,
1687 sender: &NotificationSender,
1688 method: &str,
1689 result: &McpResult<serde_json::Value>,
1690 ) {
1691 if method.starts_with("notifications/") || method == "logging/setLevel" {
1692 return;
1693 }
1694 let level = if result.is_ok() {
1695 LogLevel::Info
1696 } else {
1697 LogLevel::Error
1698 };
1699 let message = if result.is_ok() {
1700 format!("Handled {}", method)
1701 } else {
1702 format!("Error handling {}", method)
1703 };
1704 self.emit_log_notification(session, sender, level, message);
1705 }
1706}
1707
1708const AWAIT_CLEANUP_TIMEOUT: Duration = Duration::from_secs(5);
1709
1710struct RequestCompletion {
1711 done: Mutex<bool>,
1712 cv: Condvar,
1713}
1714
1715impl RequestCompletion {
1716 fn new() -> Self {
1717 Self {
1718 done: Mutex::new(false),
1719 cv: Condvar::new(),
1720 }
1721 }
1722
1723 fn mark_done(&self) {
1724 let mut done = self
1725 .done
1726 .lock()
1727 .unwrap_or_else(std::sync::PoisonError::into_inner);
1728 if !*done {
1729 *done = true;
1730 self.cv.notify_all();
1731 }
1732 }
1733
1734 fn wait_timeout(&self, timeout: Duration) -> bool {
1735 let mut done = self
1736 .done
1737 .lock()
1738 .unwrap_or_else(std::sync::PoisonError::into_inner);
1739 if *done {
1740 return true;
1741 }
1742 let start = Instant::now();
1743 let mut remaining = timeout;
1744 loop {
1745 let (guard, result) = self
1746 .cv
1747 .wait_timeout(done, remaining)
1748 .unwrap_or_else(std::sync::PoisonError::into_inner);
1749 done = guard;
1750 if *done {
1751 return true;
1752 }
1753 if result.timed_out() {
1754 return false;
1755 }
1756 let elapsed = start.elapsed();
1757 remaining = match timeout.checked_sub(elapsed) {
1758 Some(left) if !left.is_zero() => left,
1759 _ => return false,
1760 };
1761 }
1762 }
1763
1764 fn is_done(&self) -> bool {
1765 let done = self
1766 .done
1767 .lock()
1768 .unwrap_or_else(std::sync::PoisonError::into_inner);
1769 *done
1770 }
1771}
1772
1773struct ActiveRequest {
1774 cx: Cx,
1775 region_id: RegionId,
1776 completion: Arc<RequestCompletion>,
1777}
1778
1779impl ActiveRequest {
1780 fn new(cx: Cx, completion: Arc<RequestCompletion>) -> Self {
1781 let region_id = cx.region_id();
1782 Self {
1783 cx,
1784 region_id,
1785 completion,
1786 }
1787 }
1788}
1789
1790struct ActiveRequestGuard<'a> {
1791 map: &'a Mutex<HashMap<RequestId, ActiveRequest>>,
1792 id: RequestId,
1793 completion: Arc<RequestCompletion>,
1794}
1795
1796impl<'a> ActiveRequestGuard<'a> {
1797 fn new(map: &'a Mutex<HashMap<RequestId, ActiveRequest>>, id: RequestId, cx: Cx) -> Self {
1798 let completion = Arc::new(RequestCompletion::new());
1799 let entry = ActiveRequest::new(cx, completion.clone());
1800 let mut guard = map
1801 .lock()
1802 .unwrap_or_else(std::sync::PoisonError::into_inner);
1803 if guard.insert(id.clone(), entry).is_some() {
1804 fastmcp_core::logging::warn!(
1805 target: targets::SESSION,
1806 "Active request replaced for requestId={}",
1807 id
1808 );
1809 }
1810 Self {
1811 map,
1812 id,
1813 completion,
1814 }
1815 }
1816}
1817
1818impl Drop for ActiveRequestGuard<'_> {
1819 fn drop(&mut self) {
1820 {
1821 let mut guard = self
1822 .map
1823 .lock()
1824 .unwrap_or_else(std::sync::PoisonError::into_inner);
1825 match guard.get(&self.id) {
1826 Some(entry) if Arc::ptr_eq(&entry.completion, &self.completion) => {
1827 guard.remove(&self.id);
1828 }
1829 Some(_) => {
1830 fastmcp_core::logging::warn!(
1831 target: targets::SESSION,
1832 "Active request replaced before drop for requestId={}",
1833 self.id
1834 );
1835 }
1836 None => {
1837 fastmcp_core::logging::warn!(
1838 target: targets::SESSION,
1839 "Active request missing on drop for requestId={}",
1840 self.id
1841 );
1842 }
1843 }
1844 }
1845 self.completion.mark_done();
1846 }
1847}
1848
1849fn banner_suppressed() -> bool {
1853 std::env::var("FASTMCP_NO_BANNER")
1854 .map(|value| matches!(value.to_lowercase().as_str(), "1" | "true" | "yes"))
1855 .unwrap_or(false)
1856}
1857
1858fn parse_params<T: serde::de::DeserializeOwned>(
1860 params: Option<serde_json::Value>,
1861) -> Result<T, McpError> {
1862 let value = params.ok_or_else(|| McpError::invalid_params("Missing required parameters"))?;
1863 serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
1864}
1865
1866fn parse_params_or_default<T: serde::de::DeserializeOwned + Default>(
1868 params: Option<serde_json::Value>,
1869) -> Result<T, McpError> {
1870 match params {
1871 Some(value) => {
1872 serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
1873 }
1874 None => Ok(T::default()),
1875 }
1876}
1877
1878fn request_id_to_u64(id: Option<&RequestId>) -> u64 {
1883 match id {
1884 Some(RequestId::Number(n)) => *n as u64,
1885 Some(RequestId::String(s)) => stable_hash_request_id(s),
1886 None => 0,
1887 }
1888}
1889
1890fn stable_hash_request_id(value: &str) -> u64 {
1891 const FNV_OFFSET: u64 = 0xcbf29ce484222325;
1892 const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
1893 let mut hash = FNV_OFFSET;
1894 for byte in value.as_bytes() {
1895 hash ^= u64::from(*byte);
1896 hash = hash.wrapping_mul(FNV_PRIME);
1897 }
1898 if hash == 0 { FNV_OFFSET } else { hash }
1899}
1900
1901struct SharedTransport<T> {
1902 inner: Arc<Mutex<T>>,
1903}
1904
1905impl<T> Clone for SharedTransport<T> {
1906 fn clone(&self) -> Self {
1907 Self {
1908 inner: Arc::clone(&self.inner),
1909 }
1910 }
1911}
1912
1913impl<T: Transport> SharedTransport<T> {
1914 fn new(transport: T) -> Self {
1915 Self {
1916 inner: Arc::new(Mutex::new(transport)),
1917 }
1918 }
1919
1920 fn recv(&self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
1921 let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
1922 guard.recv(cx)
1923 }
1924
1925 fn send(&self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
1926 let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
1927 guard.send(cx, message)
1928 }
1929}
1930
1931fn transport_lock_error() -> TransportError {
1932 TransportError::Io(std::io::Error::other("transport lock poisoned"))
1933}
1934
1935fn create_transport_notification_sender<T>(transport: SharedTransport<T>) -> NotificationSender
1936where
1937 T: Transport + Send + 'static,
1938{
1939 let cx = Cx::for_request();
1940
1941 Arc::new(move |request: JsonRpcRequest| {
1942 let message = JsonRpcMessage::Request(request);
1943 if let Err(e) = transport.send(&cx, &message) {
1944 log::error!(
1945 target: targets::TRANSPORT,
1946 "Failed to send notification: {}",
1947 e
1948 );
1949 }
1950 })
1951}
1952
1953fn create_notification_sender() -> NotificationSender {
1962 use std::sync::Mutex;
1963
1964 let stdout = Mutex::new(AsyncStdout::new());
1967 let codec = Codec::new();
1968
1969 Arc::new(move |request: JsonRpcRequest| {
1970 let bytes = match codec.encode_request(&request) {
1971 Ok(b) => b,
1972 Err(e) => {
1973 log::error!(target: targets::SERVER, "Failed to encode notification: {}", e);
1974 return;
1975 }
1976 };
1977
1978 if let Ok(mut stdout) = stdout.lock() {
1979 if let Err(e) = stdout.write_all_unchecked(&bytes) {
1980 log::error!(target: targets::TRANSPORT, "Failed to send notification: {}", e);
1981 }
1982 if let Err(e) = stdout.flush_unchecked() {
1983 log::error!(target: targets::TRANSPORT, "Failed to flush notification: {}", e);
1984 }
1985 } else {
1986 log::warn!(target: targets::SERVER, "Failed to acquire stdout lock for notification");
1987 }
1988 })
1989}
1990
1991#[cfg(test)]
1992mod lib_unit_tests {
1993 use super::*;
1994
1995 #[test]
1998 fn parse_params_none_returns_error() {
1999 let result = parse_params::<serde_json::Value>(None);
2000 let err = result.unwrap_err();
2001 assert!(err.message.contains("Missing required parameters"));
2002 }
2003
2004 #[test]
2005 fn parse_params_invalid_json_returns_error() {
2006 let result = parse_params::<ListToolsParams>(Some(serde_json::json!("not_an_object")));
2008 assert!(result.is_err());
2009 }
2010
2011 #[test]
2012 fn parse_params_valid_json_succeeds() {
2013 let result = parse_params::<ReadResourceParams>(Some(serde_json::json!({"uri": "x://y"})));
2014 let params = result.unwrap();
2015 assert_eq!(params.uri, "x://y");
2016 }
2017
2018 #[test]
2021 fn parse_params_or_default_none_returns_default() {
2022 let result = parse_params_or_default::<ListToolsParams>(None);
2023 let params = result.unwrap();
2024 assert!(params.cursor.is_none());
2025 }
2026
2027 #[test]
2028 fn parse_params_or_default_invalid_json_returns_error() {
2029 let result =
2030 parse_params_or_default::<ListToolsParams>(Some(serde_json::json!("bad_input")));
2031 assert!(result.is_err());
2032 }
2033
2034 #[test]
2035 fn parse_params_or_default_valid_json_succeeds() {
2036 let result =
2037 parse_params_or_default::<ListToolsParams>(Some(serde_json::json!({"cursor": "abc"})));
2038 let params = result.unwrap();
2039 assert_eq!(params.cursor.as_deref(), Some("abc"));
2040 }
2041
2042 #[test]
2045 fn request_id_to_u64_number() {
2046 let id = RequestId::Number(42);
2047 assert_eq!(request_id_to_u64(Some(&id)), 42);
2048 }
2049
2050 #[test]
2051 fn request_id_to_u64_string() {
2052 let id = RequestId::String("req-123".to_string());
2053 let result = request_id_to_u64(Some(&id));
2054 assert_ne!(result, 0);
2055 }
2056
2057 #[test]
2058 fn request_id_to_u64_none() {
2059 assert_eq!(request_id_to_u64(None), 0);
2060 }
2061
2062 #[test]
2065 fn stable_hash_is_deterministic() {
2066 let h1 = stable_hash_request_id("test");
2067 let h2 = stable_hash_request_id("test");
2068 assert_eq!(h1, h2);
2069 }
2070
2071 #[test]
2072 fn stable_hash_never_returns_zero() {
2073 assert_ne!(stable_hash_request_id(""), 0);
2075 assert_ne!(stable_hash_request_id("a"), 0);
2076 }
2077
2078 #[test]
2079 fn stable_hash_different_inputs_differ() {
2080 let h1 = stable_hash_request_id("alpha");
2081 let h2 = stable_hash_request_id("beta");
2082 assert_ne!(h1, h2);
2083 }
2084
2085 #[test]
2088 fn request_completion_new_is_not_done() {
2089 let rc = RequestCompletion::new();
2090 assert!(!rc.is_done());
2091 }
2092
2093 #[test]
2094 fn request_completion_mark_done_sets_done() {
2095 let rc = RequestCompletion::new();
2096 rc.mark_done();
2097 assert!(rc.is_done());
2098 }
2099
2100 #[test]
2101 fn request_completion_mark_done_idempotent() {
2102 let rc = RequestCompletion::new();
2103 rc.mark_done();
2104 rc.mark_done(); assert!(rc.is_done());
2106 }
2107
2108 #[test]
2109 fn request_completion_wait_timeout_returns_true_if_done() {
2110 let rc = RequestCompletion::new();
2111 rc.mark_done();
2112 assert!(rc.wait_timeout(Duration::from_millis(10)));
2113 }
2114
2115 #[test]
2116 fn request_completion_wait_timeout_returns_false_if_not_done() {
2117 let rc = RequestCompletion::new();
2118 assert!(!rc.wait_timeout(Duration::from_millis(10)));
2119 }
2120
2121 #[test]
2124 fn duplicate_behavior_default_is_warn() {
2125 assert_eq!(DuplicateBehavior::default(), DuplicateBehavior::Warn);
2126 }
2127
2128 #[test]
2129 fn duplicate_behavior_debug_and_clone() {
2130 let b = DuplicateBehavior::Error;
2131 let debug = format!("{:?}", b);
2132 assert!(debug.contains("Error"));
2133 let cloned = b;
2134 assert_eq!(cloned, DuplicateBehavior::Error);
2135 }
2136
2137 #[test]
2138 fn duplicate_behavior_all_variants_are_distinct() {
2139 assert_ne!(DuplicateBehavior::Error, DuplicateBehavior::Warn);
2140 assert_ne!(DuplicateBehavior::Warn, DuplicateBehavior::Replace);
2141 assert_ne!(DuplicateBehavior::Replace, DuplicateBehavior::Ignore);
2142 }
2143
2144 #[test]
2147 fn logging_config_default_values() {
2148 let config = LoggingConfig::default();
2149 assert_eq!(config.level, Level::Info);
2150 assert!(config.timestamps);
2151 assert!(config.targets);
2152 assert!(!config.file_line);
2153 }
2154
2155 #[test]
2158 fn lifespan_hooks_new_has_no_hooks() {
2159 let hooks = LifespanHooks::new();
2160 assert!(hooks.on_startup.is_none());
2161 assert!(hooks.on_shutdown.is_none());
2162 }
2163
2164 #[test]
2167 fn log_level_rank_ordering() {
2168 assert!(Server::log_level_rank(LogLevel::Debug) < Server::log_level_rank(LogLevel::Info));
2169 assert!(Server::log_level_rank(LogLevel::Info) < Server::log_level_rank(LogLevel::Warning));
2170 assert!(
2171 Server::log_level_rank(LogLevel::Warning) < Server::log_level_rank(LogLevel::Error)
2172 );
2173 }
2174
2175 #[test]
2178 fn active_request_guard_removes_on_drop() {
2179 let map = Mutex::new(HashMap::new());
2180 let cx = Cx::for_testing();
2181 let id = RequestId::Number(1);
2182 {
2183 let _guard = ActiveRequestGuard::new(&map, id.clone(), cx);
2184 assert_eq!(map.lock().unwrap().len(), 1);
2185 }
2186 assert_eq!(map.lock().unwrap().len(), 0);
2188 }
2189
2190 #[test]
2195 fn logging_config_debug_and_clone() {
2196 let config = LoggingConfig::default();
2197 let debug = format!("{config:?}");
2198 assert!(debug.contains("LoggingConfig"));
2199 assert!(debug.contains("Info"));
2200
2201 let cloned = config.clone();
2202 assert_eq!(cloned.level, Level::Info);
2203 assert_eq!(cloned.timestamps, config.timestamps);
2204 }
2205
2206 #[test]
2207 fn transport_lock_error_is_io() {
2208 let err = transport_lock_error();
2209 match err {
2210 TransportError::Io(io) => {
2211 assert!(io.to_string().contains("poisoned"));
2212 }
2213 other => panic!("expected Io variant, got: {other:?}"),
2214 }
2215 }
2216
2217 #[test]
2218 fn lifespan_hooks_default_matches_new() {
2219 let default_hooks = LifespanHooks::default();
2220 let new_hooks = LifespanHooks::new();
2221 assert!(default_hooks.on_startup.is_none());
2222 assert!(default_hooks.on_shutdown.is_none());
2223 assert!(new_hooks.on_startup.is_none());
2224 assert!(new_hooks.on_shutdown.is_none());
2225 }
2226
2227 #[test]
2228 fn request_completion_wait_resolves_on_concurrent_done() {
2229 use std::sync::Arc;
2230 use std::thread;
2231
2232 let rc = Arc::new(RequestCompletion::new());
2233 let rc_clone = rc.clone();
2234
2235 let handle = thread::spawn(move || {
2236 thread::sleep(Duration::from_millis(20));
2237 rc_clone.mark_done();
2238 });
2239
2240 assert!(rc.wait_timeout(Duration::from_secs(2)));
2242 handle.join().unwrap();
2243 }
2244
2245 #[test]
2246 fn active_request_stores_region_id() {
2247 let cx = Cx::for_testing();
2248 let expected_region = cx.region_id();
2249 let completion = Arc::new(RequestCompletion::new());
2250 let ar = ActiveRequest::new(cx, completion);
2251 assert_eq!(ar.region_id, expected_region);
2252 }
2253}