1#![forbid(unsafe_code)]
39#![allow(dead_code)]
40
41mod auth;
42pub mod bidirectional;
43mod builder;
44pub mod caching;
45pub mod docket;
46mod handler;
47mod middleware;
48pub mod oauth;
49pub mod oidc;
50pub mod providers;
51mod proxy;
52pub mod rate_limiting;
53mod router;
54mod session;
55mod tasks;
56pub mod transform;
57
58#[cfg(test)]
59mod tests;
60
61#[cfg(feature = "jwt")]
62pub use auth::JwtTokenVerifier;
63pub use auth::{
64 AllowAllAuthProvider, AuthProvider, AuthRequest, StaticTokenVerifier, TokenAuthProvider,
65 TokenVerifier,
66};
67pub use builder::ServerBuilder;
68pub use fastmcp_console::config::{BannerStyle, ConsoleConfig, TrafficVerbosity};
69pub use fastmcp_console::stats::{ServerStats, StatsSnapshot};
70pub use handler::{
71 BidirectionalSenders, BoxFuture, ProgressNotificationSender, PromptHandler, ResourceHandler,
72 ToolHandler, create_context_with_progress, create_context_with_progress_and_senders,
73};
74pub use middleware::{Middleware, MiddlewareDecision};
75pub use proxy::{ProxyBackend, ProxyCatalog, ProxyClient};
76pub use router::{
77 MountResult, NotificationSender, Router, RouterResourceReader, RouterToolCaller, TagFilters,
78};
79pub use session::Session;
80pub use tasks::{SharedTaskManager, TaskManager};
81
82pub use bidirectional::{
84 PendingRequests, RequestSender, TransportElicitationSender, TransportRootsProvider,
85 TransportSamplingSender,
86};
87
88use std::collections::HashMap;
89use std::io::{Read, Write};
90use std::sync::{Arc, Condvar, Mutex};
91use std::time::{Duration, Instant};
92
93use asupersync::{Budget, CancelKind, Cx, RegionId};
94use fastmcp_console::client::RequestResponseRenderer;
95use fastmcp_console::logging::RichLoggerBuilder;
96use fastmcp_console::{banner::StartupBanner, console};
97use fastmcp_core::logging::{debug, error, info, targets};
98use fastmcp_core::{AuthContext, McpContext, McpError, McpErrorCode, McpResult};
99use fastmcp_protocol::{
100 CallToolParams, CancelTaskParams, CancelledParams, GetPromptParams, GetTaskParams,
101 InitializeParams, JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse,
102 ListPromptsParams, ListResourceTemplatesParams, ListResourcesParams, ListTasksParams,
103 ListToolsParams, LogLevel, LogMessageParams, Prompt, ReadResourceParams, RequestId, Resource,
104 ResourceTemplate, ServerCapabilities, ServerInfo, SetLogLevelParams, SubmitTaskParams,
105 SubscribeResourceParams, Tool, UnsubscribeResourceParams,
106};
107use fastmcp_transport::sse::SseServerTransport;
108use fastmcp_transport::websocket::WsTransport;
109use fastmcp_transport::{AsyncStdout, Codec, StdioTransport, Transport, TransportError};
110use log::{Level, LevelFilter};
111
112pub type StartupHook =
114 Box<dyn FnOnce() -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send>;
115
116pub type ShutdownHook = Box<dyn FnOnce() + Send>;
118
119#[derive(Default)]
145pub struct LifespanHooks {
146 pub on_startup: Option<StartupHook>,
148 pub on_shutdown: Option<ShutdownHook>,
150}
151
152impl LifespanHooks {
153 #[must_use]
155 pub fn new() -> Self {
156 Self::default()
157 }
158}
159
160#[derive(Debug, Clone)]
162pub struct LoggingConfig {
163 pub level: Level,
165 pub timestamps: bool,
167 pub targets: bool,
169 pub file_line: bool,
171}
172
173impl Default for LoggingConfig {
174 fn default() -> Self {
175 Self {
176 level: Level::Info,
177 timestamps: true,
178 targets: true,
179 file_line: false,
180 }
181 }
182}
183
184impl LoggingConfig {
185 #[must_use]
193 pub fn from_env() -> Self {
194 let level = std::env::var("FASTMCP_LOG")
195 .ok()
196 .and_then(|s| match s.to_lowercase().as_str() {
197 "error" => Some(Level::Error),
198 "warn" | "warning" => Some(Level::Warn),
199 "info" => Some(Level::Info),
200 "debug" => Some(Level::Debug),
201 "trace" => Some(Level::Trace),
202 _ => None,
203 })
204 .unwrap_or(Level::Info);
205
206 let timestamps = std::env::var("FASTMCP_LOG_TIMESTAMPS")
207 .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
208 .unwrap_or(true);
209
210 let targets = std::env::var("FASTMCP_LOG_TARGETS")
211 .map(|s| !matches!(s.to_lowercase().as_str(), "0" | "false" | "no"))
212 .unwrap_or(true);
213
214 let file_line = std::env::var("FASTMCP_LOG_FILE_LINE")
215 .map(|s| matches!(s.to_lowercase().as_str(), "1" | "true" | "yes"))
216 .unwrap_or(false);
217
218 Self {
219 level,
220 timestamps,
221 targets,
222 file_line,
223 }
224 }
225}
226
227#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
232pub enum DuplicateBehavior {
233 Error,
237
238 #[default]
243 Warn,
244
245 Replace,
249
250 Ignore,
254}
255
256pub struct Server {
261 info: ServerInfo,
262 capabilities: ServerCapabilities,
263 router: Router,
264 instructions: Option<String>,
265 request_timeout_secs: u64,
267 stats: Option<ServerStats>,
269 mask_error_details: bool,
271 logging: LoggingConfig,
273 console_config: ConsoleConfig,
275 lifespan: Mutex<Option<LifespanHooks>>,
277 auth_provider: Option<Arc<dyn AuthProvider>>,
279 middleware: Arc<Vec<Box<dyn crate::Middleware>>>,
281 active_requests: Mutex<HashMap<RequestId, ActiveRequest>>,
283 task_manager: Option<SharedTaskManager>,
285 pending_requests: Arc<bidirectional::PendingRequests>,
287}
288
289impl Server {
290 #[must_use]
292 #[allow(clippy::new_ret_no_self)]
293 pub fn new(name: impl Into<String>, version: impl Into<String>) -> ServerBuilder {
294 ServerBuilder::new(name, version)
295 }
296
297 #[must_use]
299 pub fn info(&self) -> &ServerInfo {
300 &self.info
301 }
302
303 #[must_use]
305 pub fn capabilities(&self) -> &ServerCapabilities {
306 &self.capabilities
307 }
308
309 #[must_use]
311 pub fn tools(&self) -> Vec<Tool> {
312 self.router.tools()
313 }
314
315 #[must_use]
317 pub fn resources(&self) -> Vec<Resource> {
318 self.router.resources()
319 }
320
321 #[must_use]
323 pub fn resource_templates(&self) -> Vec<ResourceTemplate> {
324 self.router.resource_templates()
325 }
326
327 #[must_use]
329 pub fn prompts(&self) -> Vec<Prompt> {
330 self.router.prompts()
331 }
332
333 #[must_use]
337 pub fn task_manager(&self) -> Option<&SharedTaskManager> {
338 self.task_manager.as_ref()
339 }
340
341 #[must_use]
345 pub fn into_router(self) -> Router {
346 self.router
347 }
348
349 #[must_use]
354 pub fn has_tools(&self) -> bool {
355 self.capabilities.tools.is_some()
356 }
357
358 #[must_use]
360 pub fn has_resources(&self) -> bool {
361 self.capabilities.resources.is_some()
362 }
363
364 #[must_use]
366 pub fn has_prompts(&self) -> bool {
367 self.capabilities.prompts.is_some()
368 }
369
370 #[must_use]
374 pub fn stats(&self) -> Option<StatsSnapshot> {
375 self.stats.as_ref().map(ServerStats::snapshot)
376 }
377
378 #[must_use]
383 pub fn stats_collector(&self) -> Option<&ServerStats> {
384 self.stats.as_ref()
385 }
386
387 pub fn display_stats(&self) {
389 let Some(stats) = self.stats.as_ref() else {
390 return;
391 };
392
393 let snapshot = stats.snapshot();
394 let renderer = fastmcp_console::stats::StatsRenderer::detect();
395 renderer.render_panel(&snapshot, console());
396 }
397
398 #[must_use]
400 pub fn console_config(&self) -> &ConsoleConfig {
401 &self.console_config
402 }
403
404 fn render_startup_banner(&self) {
406 let render = || {
407 let mut banner = StartupBanner::new(&self.info.name, &self.info.version)
408 .tools(self.router.tools_count())
409 .resources(self.router.resources_count())
410 .prompts(self.router.prompts_count())
411 .transport("stdio");
412
413 if let Some(desc) = self.instructions.as_deref().filter(|d| !d.is_empty()) {
414 banner = banner.description(desc);
415 }
416
417 match self.console_config.banner_style {
419 BannerStyle::Full => banner.render(console()),
420 BannerStyle::Compact | BannerStyle::Minimal => {
421 banner.no_logo().render(console());
423 }
424 BannerStyle::None => {} }
426 };
427
428 if let Err(err) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(render)) {
429 eprintln!("Warning: banner rendering failed: {err:?}");
430 }
431 }
432
433 fn init_rich_logging(&self) {
439 let result = RichLoggerBuilder::new()
440 .level(self.logging.level)
441 .with_timestamps(self.logging.timestamps)
442 .with_targets(self.logging.targets)
443 .with_file_line(self.logging.file_line)
444 .init();
445
446 if let Err(e) = result {
447 eprintln!("Note: Rich logging not initialized (logger already set): {e}");
449 }
450 }
451
452 pub fn run_stdio(self) -> ! {
457 let cx = Cx::for_testing();
459 self.run_stdio_with_cx(&cx)
460 }
461
462 pub fn run_stdio_with_cx(self, cx: &Cx) -> ! {
466 self.init_rich_logging();
468
469 let transport = StdioTransport::stdio();
470 let shared = SharedTransport::new(transport);
471
472 let notification_sender = create_notification_sender();
476
477 let shared_recv = shared.clone();
478 let shared_send = shared.clone();
479 self.run_loop(
480 cx,
481 move |cx| shared_recv.recv(cx),
482 move |cx, message| shared_send.send(cx, message),
483 notification_sender,
484 )
485 }
486
487 pub fn run_transport<T>(self, transport: T) -> !
492 where
493 T: Transport + Send + 'static,
494 {
495 let cx = Cx::for_testing();
496 self.run_transport_with_cx(&cx, transport)
497 }
498
499 pub fn run_transport_with_cx<T>(self, cx: &Cx, transport: T) -> !
503 where
504 T: Transport + Send + 'static,
505 {
506 self.init_rich_logging();
507
508 let shared = SharedTransport::new(transport);
509 let notification_sender = create_transport_notification_sender(shared.clone());
510
511 let shared_recv = shared.clone();
512 let shared_send = shared;
513 self.run_loop(
514 cx,
515 move |cx| shared_recv.recv(cx),
516 move |cx, message| shared_send.send(cx, message),
517 notification_sender,
518 )
519 }
520
521 pub fn run_sse<W, R>(self, writer: W, request_source: R, endpoint_url: impl Into<String>) -> !
525 where
526 W: Write + Send + 'static,
527 R: Iterator<Item = JsonRpcRequest> + Send + 'static,
528 {
529 let transport = SseServerTransport::new(writer, request_source, endpoint_url);
530 self.run_transport(transport)
531 }
532
533 pub fn run_sse_with_cx<W, R>(
535 self,
536 cx: &Cx,
537 writer: W,
538 request_source: R,
539 endpoint_url: impl Into<String>,
540 ) -> !
541 where
542 W: Write + Send + 'static,
543 R: Iterator<Item = JsonRpcRequest> + Send + 'static,
544 {
545 let transport = SseServerTransport::new(writer, request_source, endpoint_url);
546 self.run_transport_with_cx(cx, transport)
547 }
548
549 pub fn run_websocket<R, W>(self, reader: R, writer: W) -> !
553 where
554 R: Read + Send + 'static,
555 W: Write + Send + 'static,
556 {
557 let transport = WsTransport::new(reader, writer);
558 self.run_transport(transport)
559 }
560
561 pub fn run_websocket_with_cx<R, W>(self, cx: &Cx, reader: R, writer: W) -> !
563 where
564 R: Read + Send + 'static,
565 W: Write + Send + 'static,
566 {
567 let transport = WsTransport::new(reader, writer);
568 self.run_transport_with_cx(cx, transport)
569 }
570
571 pub(crate) fn run_startup_hook(&self) -> bool {
576 let hook = {
577 let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
578 error!(target: targets::SERVER, "lifespan lock poisoned in run_startup_hook, recovering");
579 poisoned.into_inner()
580 });
581 guard.as_mut().and_then(|h| h.on_startup.take())
582 };
583
584 if let Some(hook) = hook {
585 debug!(target: targets::SERVER, "Running startup hook");
586 match hook() {
587 Ok(()) => {
588 debug!(target: targets::SERVER, "Startup hook completed successfully");
589 true
590 }
591 Err(e) => {
592 error!(target: targets::SERVER, "Startup hook failed: {}", e);
593 false
594 }
595 }
596 } else {
597 true
598 }
599 }
600
601 pub(crate) fn run_shutdown_hook(&self) {
603 let hook = {
604 let mut guard = self.lifespan.lock().unwrap_or_else(|poisoned| {
605 error!(target: targets::SERVER, "lifespan lock poisoned in run_shutdown_hook, recovering");
606 poisoned.into_inner()
607 });
608 guard.as_mut().and_then(|h| h.on_shutdown.take())
609 };
610
611 if let Some(hook) = hook {
612 debug!(target: targets::SERVER, "Running shutdown hook");
613 hook();
614 debug!(target: targets::SERVER, "Shutdown hook completed");
615 }
616 }
617
618 fn graceful_shutdown(&self, exit_code: i32) -> ! {
620 self.cancel_active_requests(CancelKind::Shutdown, true);
621 self.run_shutdown_hook();
622 if let Some(ref stats) = self.stats {
623 stats.connection_closed();
624 }
625 std::process::exit(exit_code)
626 }
627
628 fn run_loop<R, S>(
630 self,
631 cx: &Cx,
632 mut recv: R,
633 send: S,
634 notification_sender: NotificationSender,
635 ) -> !
636 where
637 R: FnMut(&Cx) -> Result<JsonRpcMessage, TransportError>,
638 S: FnMut(&Cx, &JsonRpcMessage) -> Result<(), TransportError> + Send + Sync + 'static,
639 {
640 let mut session = Session::new(self.info.clone(), self.capabilities.clone());
641
642 let send = Arc::new(Mutex::new(send));
644
645 let request_sender = {
647 let send_clone = send.clone();
648 let send_fn: bidirectional::TransportSendFn = Arc::new(move |message| {
649 let mut guard = send_clone
650 .lock()
651 .map_err(|e| format!("Lock poisoned: {}", e))?;
652 let cx = Cx::for_testing();
654 guard(&cx, message).map_err(|e| format!("Send failed: {}", e))
655 });
656 bidirectional::RequestSender::new(self.pending_requests.clone(), send_fn)
657 };
658
659 if let Some(ref stats) = self.stats {
661 stats.connection_opened();
662 }
663
664 if self.console_config.show_banner && !banner_suppressed() {
666 self.render_startup_banner();
667 }
668
669 if !self.run_startup_hook() {
671 error!(target: targets::SERVER, "Startup hook failed, exiting");
672 self.graceful_shutdown(1);
673 }
674
675 let traffic_renderer = if self.console_config.show_request_traffic {
677 let mut renderer = RequestResponseRenderer::new(self.console_config.resolve_context());
678 renderer.truncate_at = self.console_config.truncate_at;
679 match self.console_config.traffic_verbosity {
680 TrafficVerbosity::None => {} TrafficVerbosity::Summary | TrafficVerbosity::Headers => {
682 renderer.show_params = false;
683 renderer.show_result = false;
684 }
685 TrafficVerbosity::Full => {
686 renderer.show_params = true;
687 renderer.show_result = true;
688 }
689 }
690 Some(renderer)
691 } else {
692 None
693 };
694
695 loop {
697 if cx.is_cancel_requested() {
699 info!(target: targets::SERVER, "Cancellation requested, shutting down");
700 self.graceful_shutdown(0);
701 }
702
703 let message = match recv(cx) {
705 Ok(msg) => msg,
706 Err(TransportError::Closed) => {
707 self.graceful_shutdown(0);
709 }
710 Err(TransportError::Cancelled) => {
711 info!(target: targets::SERVER, "Transport cancelled");
712 self.graceful_shutdown(0);
713 }
714 Err(e) => {
715 error!(target: targets::TRANSPORT, "Transport error: {}", e);
716 continue;
717 }
718 };
719
720 if let Some(renderer) = &traffic_renderer {
722 if let JsonRpcMessage::Request(req) = &message {
723 renderer.render_request(req, console());
724 }
725 }
726
727 let start_time = Instant::now();
728
729 let response_opt = match message {
731 JsonRpcMessage::Request(request) => {
732 if let Some(ref stats) = self.stats {
734 if let Ok(json) = serde_json::to_string(&request) {
737 stats.add_bytes_received(json.len() as u64 + 1); }
739 }
740 self.handle_request(
741 cx,
742 &mut session,
743 request,
744 ¬ification_sender,
745 &request_sender,
746 )
747 }
748 JsonRpcMessage::Response(response) => {
749 if self.pending_requests.route_response(&response) {
751 debug!(target: targets::SERVER, "Routed response to pending request");
752 } else {
753 debug!(target: targets::SERVER, "Received unexpected response: {:?}", response.id);
754 }
755 continue;
756 }
757 };
758
759 let duration = start_time.elapsed();
760
761 if let Some(response) = response_opt {
762 if let Some(renderer) = &traffic_renderer {
764 renderer.render_response(&response, Some(duration), console());
765 }
766
767 if let Some(ref stats) = self.stats {
769 if let Ok(json) = serde_json::to_string(&response) {
770 stats.add_bytes_sent(json.len() as u64 + 1); }
772 }
773
774 let send_result = {
776 let mut guard = send.lock().unwrap();
777 guard(cx, &JsonRpcMessage::Response(response))
778 };
779 if let Err(e) = send_result {
780 error!(target: targets::TRANSPORT, "Failed to send response: {}", e);
781 }
782 }
783 }
784 }
785
786 fn handle_request(
788 &self,
789 cx: &Cx,
790 session: &mut Session,
791 request: JsonRpcRequest,
792 notification_sender: &NotificationSender,
793 request_sender: &bidirectional::RequestSender,
794 ) -> Option<JsonRpcResponse> {
795 let id = request.id.clone();
796 let method = request.method.clone();
797 let is_notification = id.is_none();
798
799 let start_time = Instant::now();
801
802 let request_id = request_id_to_u64(id.as_ref());
804
805 let budget = self.create_request_budget();
807
808 if budget.is_exhausted() {
810 if let Some(ref stats) = self.stats {
812 stats.record_request(&method, start_time.elapsed(), false);
813 }
814 let response_id = id.clone()?;
816 return Some(JsonRpcResponse::error(
817 Some(response_id),
818 JsonRpcError {
819 code: McpErrorCode::RequestCancelled.into(),
820 message: "Request budget exhausted".to_string(),
821 data: None,
822 },
823 ));
824 }
825
826 let request_cx = if is_notification {
827 cx.clone()
828 } else {
829 Cx::for_request_with_budget(budget)
830 };
831
832 let _active_guard = id.clone().map(|request_id| {
833 ActiveRequestGuard::new(&self.active_requests, request_id, request_cx.clone())
834 });
835
836 let result = self.dispatch_method(
838 &request_cx,
839 session,
840 request,
841 request_id,
842 &budget,
843 notification_sender,
844 request_sender,
845 );
846
847 let latency = start_time.elapsed();
849 if let Some(ref stats) = self.stats {
850 match &result {
851 Ok(_) => stats.record_request(&method, latency, true),
852 Err(e) if e.code == fastmcp_core::McpErrorCode::RequestCancelled => {
853 stats.record_cancelled(&method, latency);
854 }
855 Err(_) => stats.record_request(&method, latency, false),
856 }
857 }
858
859 if is_notification {
861 if let Err(e) = result {
862 fastmcp_core::logging::error!(
863 target: targets::HANDLER,
864 "Notification '{}' failed: {}",
865 method,
866 e
867 );
868 }
869 return None;
870 }
871
872 let response_id = id.clone().unwrap();
875
876 match result {
877 Ok(value) => Some(JsonRpcResponse::success(response_id, value)),
878 Err(e) => {
879 if self.mask_error_details && e.is_internal() {
881 fastmcp_core::logging::error!(
882 target: targets::HANDLER,
883 "Request '{}' failed (masked in response): {}",
884 method,
885 e
886 );
887 }
888
889 let masked = e.masked(self.mask_error_details);
891 Some(JsonRpcResponse::error(
892 id,
893 JsonRpcError {
894 code: masked.code.into(),
895 message: masked.message,
896 data: masked.data,
897 },
898 ))
899 }
900 }
901 }
902
903 fn create_request_budget(&self) -> Budget {
905 if self.request_timeout_secs == 0 {
906 Budget::INFINITE
908 } else {
909 Budget::with_deadline_secs(self.request_timeout_secs)
911 }
912 }
913
914 #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
916 fn dispatch_method(
917 &self,
918 cx: &Cx,
919 session: &mut Session,
920 request: JsonRpcRequest,
921 request_id: u64,
922 budget: &Budget,
923 notification_sender: &NotificationSender,
924 request_sender: &bidirectional::RequestSender,
925 ) -> Result<serde_json::Value, McpError> {
926 if cx.is_cancel_requested() {
928 return Err(McpError::request_cancelled());
929 }
930
931 if budget.is_exhausted() {
933 return Err(McpError::new(
934 McpErrorCode::RequestCancelled,
935 "Request budget exhausted",
936 ));
937 }
938
939 if !session.is_initialized() && request.method != "initialize" && request.method != "ping" {
941 return Err(McpError::invalid_request(
942 "Server not initialized. Client must send 'initialize' first.",
943 ));
944 }
945
946 if let Some(task_manager) = &self.task_manager {
947 task_manager.set_notification_sender(Arc::clone(notification_sender));
948 }
949
950 let mw_ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
954 let mut entered_middleware: Vec<&dyn crate::Middleware> = Vec::new();
955
956 for m in self.middleware.iter() {
957 entered_middleware.push(m.as_ref());
958 match m.on_request(&mw_ctx, &request) {
959 Ok(crate::MiddlewareDecision::Continue) => {}
960 Ok(crate::MiddlewareDecision::Respond(v)) => {
961 return self.apply_middleware_response(
962 &entered_middleware,
963 &mw_ctx,
964 &request,
965 v,
966 );
967 }
968 Err(e) => {
969 let err =
970 self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e);
971 return Err(err);
972 }
973 }
974 }
975
976 if self.should_authenticate(&request.method) {
977 let auth_request = AuthRequest {
978 method: &request.method,
979 params: request.params.as_ref(),
980 request_id,
981 };
982 self.authenticate_request(cx, request_id, session, auth_request)?;
983 }
984
985 let method = &request.method;
986 let params = request.params.clone();
987
988 let bidirectional_senders = self.create_bidirectional_senders(session, request_sender);
990
991 let result = match method.as_str() {
992 "initialize" => {
993 let params: InitializeParams = parse_params(params)?;
994 let result = self.router.handle_initialize(
995 cx,
996 session,
997 params,
998 self.instructions.as_deref(),
999 )?;
1000 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1001 }
1002 "initialized" => {
1003 Ok(serde_json::Value::Null)
1005 }
1006 "notifications/cancelled" => {
1007 let params: CancelledParams = parse_params(params)?;
1008 self.handle_cancelled_notification(params);
1009 Ok(serde_json::Value::Null)
1010 }
1011 "logging/setLevel" => {
1012 let params: SetLogLevelParams = parse_params(params)?;
1013 self.handle_set_log_level(session, params);
1014 Ok(serde_json::Value::Null)
1015 }
1016 "tools/list" => {
1017 let params: ListToolsParams = parse_params_or_default(params)?;
1018 let result = self
1019 .router
1020 .handle_tools_list(cx, params, Some(session.state()))?;
1021 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1022 }
1023 "tools/call" => {
1024 let params: CallToolParams = parse_params(params)?;
1025 let result = self.router.handle_tools_call(
1026 cx,
1027 request_id,
1028 params,
1029 budget,
1030 session.state().clone(),
1031 Some(notification_sender),
1032 bidirectional_senders.as_ref(),
1033 )?;
1034 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1035 }
1036 "resources/list" => {
1037 let params: ListResourcesParams = parse_params_or_default(params)?;
1038 let result =
1039 self.router
1040 .handle_resources_list(cx, params, Some(session.state()))?;
1041 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1042 }
1043 "resources/templates/list" => {
1044 let params: ListResourceTemplatesParams = parse_params_or_default(params)?;
1045 let result = self.router.handle_resource_templates_list(
1046 cx,
1047 params,
1048 Some(session.state()),
1049 )?;
1050 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1051 }
1052 "resources/read" => {
1053 let params: ReadResourceParams = parse_params(params)?;
1054 let result = self.router.handle_resources_read(
1055 cx,
1056 request_id,
1057 ¶ms,
1058 budget,
1059 session.state().clone(),
1060 Some(notification_sender),
1061 bidirectional_senders.as_ref(),
1062 )?;
1063 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1064 }
1065 "resources/subscribe" => {
1066 let params: SubscribeResourceParams = parse_params(params)?;
1067 if !self.router.resource_exists(¶ms.uri) {
1068 return Err(McpError::resource_not_found(¶ms.uri));
1069 }
1070 session.subscribe_resource(params.uri);
1071 Ok(serde_json::json!({}))
1072 }
1073 "resources/unsubscribe" => {
1074 let params: UnsubscribeResourceParams = parse_params(params)?;
1075 session.unsubscribe_resource(¶ms.uri);
1076 Ok(serde_json::json!({}))
1077 }
1078 "prompts/list" => {
1079 let params: ListPromptsParams = parse_params_or_default(params)?;
1080 let result = self
1081 .router
1082 .handle_prompts_list(cx, params, Some(session.state()))?;
1083 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1084 }
1085 "prompts/get" => {
1086 let params: GetPromptParams = parse_params(params)?;
1087 let result = self.router.handle_prompts_get(
1088 cx,
1089 request_id,
1090 params,
1091 budget,
1092 session.state().clone(),
1093 Some(notification_sender),
1094 bidirectional_senders.as_ref(),
1095 )?;
1096 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1097 }
1098 "ping" => {
1099 Ok(serde_json::json!({}))
1101 }
1102 "tasks/list" => {
1104 let params: ListTasksParams = parse_params_or_default(params)?;
1105 let result =
1106 self.router
1107 .handle_tasks_list(cx, params, self.task_manager.as_ref())?;
1108 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1109 }
1110 "tasks/get" => {
1111 let params: GetTaskParams = parse_params(params)?;
1112 let result =
1113 self.router
1114 .handle_tasks_get(cx, params, self.task_manager.as_ref())?;
1115 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1116 }
1117 "tasks/cancel" => {
1118 let params: CancelTaskParams = parse_params(params)?;
1119 let result =
1120 self.router
1121 .handle_tasks_cancel(cx, params, self.task_manager.as_ref())?;
1122 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1123 }
1124 "tasks/submit" => {
1125 let params: SubmitTaskParams = parse_params(params)?;
1126 let result =
1127 self.router
1128 .handle_tasks_submit(cx, params, self.task_manager.as_ref())?;
1129 Ok(serde_json::to_value(result).map_err(McpError::from)?)
1130 }
1131 _ => Err(McpError::method_not_found(method)),
1132 };
1133
1134 let final_result = match result {
1135 Ok(v) => self.apply_middleware_response(&entered_middleware, &mw_ctx, &request, v),
1136 Err(e) => Err(self.apply_middleware_error(&entered_middleware, &mw_ctx, &request, e)),
1137 };
1138
1139 self.maybe_emit_log_notification(session, notification_sender, method, &final_result);
1140
1141 final_result
1142 }
1143
1144 fn apply_middleware_response(
1145 &self,
1146 stack: &[&dyn crate::Middleware],
1147 ctx: &McpContext,
1148 request: &JsonRpcRequest,
1149 value: serde_json::Value,
1150 ) -> Result<serde_json::Value, McpError> {
1151 let mut response = value;
1152 for m in stack.iter().rev() {
1153 match m.on_response(ctx, request, response) {
1154 Ok(next) => response = next,
1155 Err(err) => {
1156 let mapped = self.apply_middleware_error(stack, ctx, request, err);
1157 return Err(mapped);
1158 }
1159 }
1160 }
1161 Ok(response)
1162 }
1163
1164 fn apply_middleware_error(
1165 &self,
1166 stack: &[&dyn crate::Middleware],
1167 ctx: &McpContext,
1168 request: &JsonRpcRequest,
1169 error: McpError,
1170 ) -> McpError {
1171 let mut err = error;
1172 for m in stack.iter().rev() {
1173 err = m.on_error(ctx, request, err);
1174 }
1175 err
1176 }
1177
1178 fn create_bidirectional_senders(
1183 &self,
1184 session: &Session,
1185 request_sender: &bidirectional::RequestSender,
1186 ) -> Option<handler::BidirectionalSenders> {
1187 let supports_sampling = session.supports_sampling();
1188 let supports_elicitation = session.supports_elicitation();
1189
1190 if !supports_sampling && !supports_elicitation {
1191 return None;
1192 }
1193
1194 let mut senders = handler::BidirectionalSenders::new();
1195
1196 if supports_sampling {
1197 let sampling_sender: Arc<dyn fastmcp_core::SamplingSender> = Arc::new(
1198 bidirectional::TransportSamplingSender::new(request_sender.clone()),
1199 );
1200 senders = senders.with_sampling(sampling_sender);
1201 }
1202
1203 if supports_elicitation {
1204 let elicitation_sender: Arc<dyn fastmcp_core::ElicitationSender> = Arc::new(
1205 bidirectional::TransportElicitationSender::new(request_sender.clone()),
1206 );
1207 senders = senders.with_elicitation(elicitation_sender);
1208 }
1209
1210 Some(senders)
1211 }
1212
1213 fn should_authenticate(&self, method: &str) -> bool {
1214 !matches!(
1215 method,
1216 "initialize" | "initialized" | "notifications/cancelled" | "ping"
1217 )
1218 }
1219
1220 fn authenticate_request(
1221 &self,
1222 cx: &Cx,
1223 request_id: u64,
1224 session: &Session,
1225 request: AuthRequest<'_>,
1226 ) -> Result<AuthContext, McpError> {
1227 let Some(provider) = &self.auth_provider else {
1228 return Ok(AuthContext::anonymous());
1229 };
1230
1231 let ctx = McpContext::with_state(cx.clone(), request_id, session.state().clone());
1232 let auth = provider.authenticate(&ctx, request)?;
1233 if !ctx.set_auth(auth.clone()) {
1234 debug!(
1235 target: targets::SESSION,
1236 "Auth context not stored (session state unavailable)"
1237 );
1238 }
1239 Ok(auth)
1240 }
1241
1242 fn handle_cancelled_notification(&self, params: CancelledParams) {
1243 let reason = params.reason.as_deref().unwrap_or("unspecified");
1244 let await_cleanup = params.await_cleanup.unwrap_or(false);
1245 info!(
1246 target: targets::SESSION,
1247 "Cancellation requested for requestId={} (reason: {}, await_cleanup={})",
1248 params.request_id,
1249 reason,
1250 await_cleanup
1251 );
1252 let active = {
1253 let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
1254 error!(target: targets::SERVER, "active_requests lock poisoned, recovering");
1255 poisoned.into_inner()
1256 });
1257 guard
1258 .get(¶ms.request_id)
1259 .map(|entry| (entry.cx.clone(), entry.region_id, entry.completion.clone()))
1260 };
1261 if let Some((cx, region_id, completion)) = active {
1262 cx.cancel_with(CancelKind::User, None);
1263 if await_cleanup {
1264 let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
1265 if !completed {
1266 fastmcp_core::logging::warn!(
1267 target: targets::SESSION,
1268 "await_cleanup timed out for requestId={} (region={:?})",
1269 params.request_id,
1270 region_id
1271 );
1272 }
1273 }
1274 } else {
1275 fastmcp_core::logging::warn!(
1276 target: targets::SESSION,
1277 "No active request found for cancellation requestId={}",
1278 params.request_id
1279 );
1280 }
1281 }
1282
1283 fn cancel_active_requests(&self, kind: CancelKind, await_cleanup: bool) {
1284 let active: Vec<(RequestId, RegionId, Cx, Arc<RequestCompletion>)> = {
1285 let guard = self.active_requests.lock().unwrap_or_else(|poisoned| {
1286 error!(target: targets::SERVER, "active_requests lock poisoned in cancel_active_requests, recovering");
1287 poisoned.into_inner()
1288 });
1289 guard
1290 .iter()
1291 .map(|(request_id, entry)| {
1292 (
1293 request_id.clone(),
1294 entry.region_id,
1295 entry.cx.clone(),
1296 entry.completion.clone(),
1297 )
1298 })
1299 .collect()
1300 };
1301 if active.is_empty() {
1302 return;
1303 }
1304 info!(
1305 target: targets::SESSION,
1306 "Cancelling {} active request(s) (kind={:?}, await_cleanup={})",
1307 active.len(),
1308 kind,
1309 await_cleanup
1310 );
1311 for (_, _, cx, _) in &active {
1312 cx.cancel_with(kind, None);
1313 }
1314
1315 if await_cleanup {
1316 for (request_id, region_id, _cx, completion) in active {
1317 let completed = completion.wait_timeout(AWAIT_CLEANUP_TIMEOUT);
1318 if !completed {
1319 fastmcp_core::logging::warn!(
1320 target: targets::SESSION,
1321 "Shutdown cancel timed out for requestId={} (region={:?})",
1322 request_id,
1323 region_id
1324 );
1325 }
1326 }
1327 }
1328 }
1329
1330 fn handle_set_log_level(&self, session: &mut Session, params: SetLogLevelParams) {
1331 let requested = match params.level {
1332 LogLevel::Debug => LevelFilter::Debug,
1333 LogLevel::Info => LevelFilter::Info,
1334 LogLevel::Warning => LevelFilter::Warn,
1335 LogLevel::Error => LevelFilter::Error,
1336 };
1337
1338 let configured = self.logging.level.to_level_filter();
1339 let effective = if requested > configured {
1340 configured
1341 } else {
1342 requested
1343 };
1344
1345 log::set_max_level(effective);
1346
1347 let effective_level = match effective {
1348 LevelFilter::Debug => LogLevel::Debug,
1349 LevelFilter::Info => LogLevel::Info,
1350 LevelFilter::Warn => LogLevel::Warning,
1351 LevelFilter::Error => LogLevel::Error,
1352 _ => LogLevel::Info,
1353 };
1354 session.set_log_level(effective_level);
1355
1356 if effective != requested {
1357 fastmcp_core::logging::warn!(
1358 target: targets::SESSION,
1359 "Client requested log level {:?}; clamped to server level {:?}",
1360 params.level,
1361 effective
1362 );
1363 } else {
1364 info!(
1365 target: targets::SESSION,
1366 "Log level set to {:?}",
1367 params.level
1368 );
1369 }
1370 }
1371
1372 fn log_level_rank(level: LogLevel) -> u8 {
1373 match level {
1374 LogLevel::Debug => 1,
1375 LogLevel::Info => 2,
1376 LogLevel::Warning => 3,
1377 LogLevel::Error => 4,
1378 }
1379 }
1380
1381 fn emit_log_notification(
1382 &self,
1383 session: &Session,
1384 sender: &NotificationSender,
1385 level: LogLevel,
1386 message: impl Into<String>,
1387 ) {
1388 let Some(min_level) = session.log_level() else {
1389 return;
1390 };
1391 if Self::log_level_rank(level) < Self::log_level_rank(min_level) {
1392 return;
1393 }
1394
1395 let ts = chrono::Utc::now().to_rfc3339();
1396 let text = format!("{ts} {}", message.into());
1397 let params = LogMessageParams {
1398 level,
1399 logger: Some("fastmcp::server".to_string()),
1400 data: serde_json::Value::String(text),
1401 };
1402 let payload = match serde_json::to_value(params) {
1403 Ok(value) => value,
1404 Err(err) => {
1405 fastmcp_core::logging::warn!(
1406 target: targets::SESSION,
1407 "Failed to serialize log message notification: {}",
1408 err
1409 );
1410 return;
1411 }
1412 };
1413 sender(JsonRpcRequest::notification(
1414 "notifications/message",
1415 Some(payload),
1416 ));
1417 }
1418
1419 fn maybe_emit_log_notification(
1420 &self,
1421 session: &Session,
1422 sender: &NotificationSender,
1423 method: &str,
1424 result: &McpResult<serde_json::Value>,
1425 ) {
1426 if method.starts_with("notifications/") || method == "logging/setLevel" {
1427 return;
1428 }
1429 let level = if result.is_ok() {
1430 LogLevel::Info
1431 } else {
1432 LogLevel::Error
1433 };
1434 let message = if result.is_ok() {
1435 format!("Handled {}", method)
1436 } else {
1437 format!("Error handling {}", method)
1438 };
1439 self.emit_log_notification(session, sender, level, message);
1440 }
1441}
1442
1443const AWAIT_CLEANUP_TIMEOUT: Duration = Duration::from_secs(5);
1444
1445struct RequestCompletion {
1446 done: Mutex<bool>,
1447 cv: Condvar,
1448}
1449
1450impl RequestCompletion {
1451 fn new() -> Self {
1452 Self {
1453 done: Mutex::new(false),
1454 cv: Condvar::new(),
1455 }
1456 }
1457
1458 fn mark_done(&self) {
1459 let mut done = self
1460 .done
1461 .lock()
1462 .unwrap_or_else(std::sync::PoisonError::into_inner);
1463 if !*done {
1464 *done = true;
1465 self.cv.notify_all();
1466 }
1467 }
1468
1469 fn wait_timeout(&self, timeout: Duration) -> bool {
1470 let mut done = self
1471 .done
1472 .lock()
1473 .unwrap_or_else(std::sync::PoisonError::into_inner);
1474 if *done {
1475 return true;
1476 }
1477 let start = Instant::now();
1478 let mut remaining = timeout;
1479 loop {
1480 let (guard, result) = self
1481 .cv
1482 .wait_timeout(done, remaining)
1483 .unwrap_or_else(std::sync::PoisonError::into_inner);
1484 done = guard;
1485 if *done {
1486 return true;
1487 }
1488 if result.timed_out() {
1489 return false;
1490 }
1491 let elapsed = start.elapsed();
1492 remaining = match timeout.checked_sub(elapsed) {
1493 Some(left) if !left.is_zero() => left,
1494 _ => return false,
1495 };
1496 }
1497 }
1498
1499 fn is_done(&self) -> bool {
1500 let done = self
1501 .done
1502 .lock()
1503 .unwrap_or_else(std::sync::PoisonError::into_inner);
1504 *done
1505 }
1506}
1507
1508struct ActiveRequest {
1509 cx: Cx,
1510 region_id: RegionId,
1511 completion: Arc<RequestCompletion>,
1512}
1513
1514impl ActiveRequest {
1515 fn new(cx: Cx, completion: Arc<RequestCompletion>) -> Self {
1516 let region_id = cx.region_id();
1517 Self {
1518 cx,
1519 region_id,
1520 completion,
1521 }
1522 }
1523}
1524
1525struct ActiveRequestGuard<'a> {
1526 map: &'a Mutex<HashMap<RequestId, ActiveRequest>>,
1527 id: RequestId,
1528 completion: Arc<RequestCompletion>,
1529}
1530
1531impl<'a> ActiveRequestGuard<'a> {
1532 fn new(map: &'a Mutex<HashMap<RequestId, ActiveRequest>>, id: RequestId, cx: Cx) -> Self {
1533 let completion = Arc::new(RequestCompletion::new());
1534 let entry = ActiveRequest::new(cx, completion.clone());
1535 let mut guard = map
1536 .lock()
1537 .unwrap_or_else(std::sync::PoisonError::into_inner);
1538 if guard.insert(id.clone(), entry).is_some() {
1539 fastmcp_core::logging::warn!(
1540 target: targets::SESSION,
1541 "Active request replaced for requestId={}",
1542 id
1543 );
1544 }
1545 Self {
1546 map,
1547 id,
1548 completion,
1549 }
1550 }
1551}
1552
1553impl Drop for ActiveRequestGuard<'_> {
1554 fn drop(&mut self) {
1555 {
1556 let mut guard = self
1557 .map
1558 .lock()
1559 .unwrap_or_else(std::sync::PoisonError::into_inner);
1560 match guard.get(&self.id) {
1561 Some(entry) if Arc::ptr_eq(&entry.completion, &self.completion) => {
1562 guard.remove(&self.id);
1563 }
1564 Some(_) => {
1565 fastmcp_core::logging::warn!(
1566 target: targets::SESSION,
1567 "Active request replaced before drop for requestId={}",
1568 self.id
1569 );
1570 }
1571 None => {
1572 fastmcp_core::logging::warn!(
1573 target: targets::SESSION,
1574 "Active request missing on drop for requestId={}",
1575 self.id
1576 );
1577 }
1578 }
1579 }
1580 self.completion.mark_done();
1581 }
1582}
1583
1584fn banner_suppressed() -> bool {
1588 std::env::var("FASTMCP_NO_BANNER")
1589 .map(|value| matches!(value.to_lowercase().as_str(), "1" | "true" | "yes"))
1590 .unwrap_or(false)
1591}
1592
1593fn parse_params<T: serde::de::DeserializeOwned>(
1595 params: Option<serde_json::Value>,
1596) -> Result<T, McpError> {
1597 let value = params.ok_or_else(|| McpError::invalid_params("Missing required parameters"))?;
1598 serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
1599}
1600
1601fn parse_params_or_default<T: serde::de::DeserializeOwned + Default>(
1603 params: Option<serde_json::Value>,
1604) -> Result<T, McpError> {
1605 match params {
1606 Some(value) => {
1607 serde_json::from_value(value).map_err(|e| McpError::invalid_params(e.to_string()))
1608 }
1609 None => Ok(T::default()),
1610 }
1611}
1612
1613fn request_id_to_u64(id: Option<&RequestId>) -> u64 {
1618 match id {
1619 Some(RequestId::Number(n)) => *n as u64,
1620 Some(RequestId::String(s)) => stable_hash_request_id(s),
1621 None => 0,
1622 }
1623}
1624
1625fn stable_hash_request_id(value: &str) -> u64 {
1626 const FNV_OFFSET: u64 = 0xcbf29ce484222325;
1627 const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
1628 let mut hash = FNV_OFFSET;
1629 for byte in value.as_bytes() {
1630 hash ^= u64::from(*byte);
1631 hash = hash.wrapping_mul(FNV_PRIME);
1632 }
1633 if hash == 0 { FNV_OFFSET } else { hash }
1634}
1635
1636struct SharedTransport<T> {
1637 inner: Arc<Mutex<T>>,
1638}
1639
1640impl<T> Clone for SharedTransport<T> {
1641 fn clone(&self) -> Self {
1642 Self {
1643 inner: Arc::clone(&self.inner),
1644 }
1645 }
1646}
1647
1648impl<T: Transport> SharedTransport<T> {
1649 fn new(transport: T) -> Self {
1650 Self {
1651 inner: Arc::new(Mutex::new(transport)),
1652 }
1653 }
1654
1655 fn recv(&self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
1656 let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
1657 guard.recv(cx)
1658 }
1659
1660 fn send(&self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
1661 let mut guard = self.inner.lock().map_err(|_| transport_lock_error())?;
1662 guard.send(cx, message)
1663 }
1664}
1665
1666fn transport_lock_error() -> TransportError {
1667 TransportError::Io(std::io::Error::other("transport lock poisoned"))
1668}
1669
1670fn create_transport_notification_sender<T>(transport: SharedTransport<T>) -> NotificationSender
1671where
1672 T: Transport + Send + 'static,
1673{
1674 let cx = Cx::for_testing();
1675
1676 Arc::new(move |request: JsonRpcRequest| {
1677 let message = JsonRpcMessage::Request(request);
1678 if let Err(e) = transport.send(&cx, &message) {
1679 log::error!(
1680 target: targets::TRANSPORT,
1681 "Failed to send notification: {}",
1682 e
1683 );
1684 }
1685 })
1686}
1687
1688fn create_notification_sender() -> NotificationSender {
1697 use std::sync::Mutex;
1698
1699 let stdout = Mutex::new(AsyncStdout::new());
1702 let codec = Codec::new();
1703
1704 Arc::new(move |request: JsonRpcRequest| {
1705 let bytes = match codec.encode_request(&request) {
1706 Ok(b) => b,
1707 Err(e) => {
1708 log::error!(target: targets::SERVER, "Failed to encode notification: {}", e);
1709 return;
1710 }
1711 };
1712
1713 if let Ok(mut stdout) = stdout.lock() {
1714 if let Err(e) = stdout.write_all_unchecked(&bytes) {
1715 log::error!(target: targets::TRANSPORT, "Failed to send notification: {}", e);
1716 }
1717 if let Err(e) = stdout.flush_unchecked() {
1718 log::error!(target: targets::TRANSPORT, "Failed to flush notification: {}", e);
1719 }
1720 } else {
1721 log::warn!(target: targets::SERVER, "Failed to acquire stdout lock for notification");
1722 }
1723 })
1724}