1use tracing::{debug, info, warn, error};
2use rmcp::{
6 ErrorData, RoleClient, RoleServer, ServerHandler,
7 model::{
8 CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
9 PaginatedRequestParam, ServerInfo,
10 },
11 service::{NotificationContext, Peer, RequestContext, RunningService},
12};
13use std::sync::Arc;
14use arc_swap::ArcSwapOption;
15pub use mcp_common::ToolFilter;
16
17#[derive(Debug)]
20struct PeerInner {
21 peer: Peer<RoleClient>,
23 #[allow(dead_code)]
25 _running: Arc<RunningService<RoleClient, ClientInfo>>,
26}
27
28#[derive(Clone, Debug)]
33pub struct SseHandler {
34 peer: Arc<ArcSwapOption<PeerInner>>,
37 cached_info: ServerInfo,
39 mcp_id: String,
41 tool_filter: ToolFilter,
43}
44
45impl ServerHandler for SseHandler {
46 fn get_info(&self) -> ServerInfo {
47 self.cached_info.clone()
48 }
49
50 #[tracing::instrument(skip(self, request, context), fields(
51 mcp_id = %self.mcp_id,
52 request = ?request,
53 ))]
54 async fn list_tools(
55 &self,
56 request: Option<PaginatedRequestParam>,
57 context: RequestContext<RoleServer>,
58 ) -> Result<ListToolsResult, ErrorData> {
59 let inner_guard = self.peer.load();
61 let inner = inner_guard.as_ref().ok_or_else(|| {
62 error!("Backend connection is not available (reconnecting)");
63 ErrorData::internal_error(
64 "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
65 None,
66 )
67 })?;
68
69 if inner.peer.is_transport_closed() {
71 error!("Backend transport is closed");
72 return Err(ErrorData::internal_error(
73 "Backend connection closed, please retry".to_string(),
74 None,
75 ));
76 }
77
78 match self.capabilities().tools {
80 Some(_) => {
81 tokio::select! {
83 result = inner.peer.list_tools(request) => {
84 match result {
85 Ok(result) => {
86 let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
88 result
89 .tools
90 .into_iter()
91 .filter(|tool| self.tool_filter.is_allowed(&tool.name))
92 .collect()
93 } else {
94 result.tools
95 };
96
97 info!(
99 "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
100 self.mcp_id,
101 filtered_tools.len(),
102 if self.tool_filter.is_enabled() {
103 " (已过滤)"
104 } else {
105 ""
106 }
107 );
108
109 debug!(
110 "Proxying list_tools response with {} tools",
111 filtered_tools.len()
112 );
113 Ok(ListToolsResult {
114 tools: filtered_tools,
115 next_cursor: result.next_cursor,
116 })
117 }
118 Err(err) => {
119 error!("Error listing tools: {:?}", err);
120 Err(ErrorData::internal_error(
121 format!("Error listing tools: {err}"),
122 None,
123 ))
124 }
125 }
126 }
127 _ = context.ct.cancelled() => {
128 info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
129 Err(ErrorData::internal_error(
130 "Request cancelled".to_string(),
131 None,
132 ))
133 }
134 }
135 }
136 None => {
137 warn!("Server doesn't support tools capability");
139 Ok(ListToolsResult::default())
140 }
141 }
142 }
143
144 #[tracing::instrument(skip(self, request, context), fields(
145 mcp_id = %self.mcp_id,
146 tool_name = %request.name,
147 tool_arguments = ?request.arguments,
148 ))]
149 async fn call_tool(
150 &self,
151 request: CallToolRequestParam,
152 context: RequestContext<RoleServer>,
153 ) -> Result<CallToolResult, ErrorData> {
154 if !self.tool_filter.is_allowed(&request.name) {
156 info!(
157 "[call_tool] 工具被过滤 - MCP ID: {}, 工具: {}",
158 self.mcp_id, request.name
159 );
160 return Ok(CallToolResult::error(vec![Content::text(format!(
161 "Tool '{}' is not allowed by filter configuration",
162 request.name
163 ))]));
164 }
165
166 let inner_guard = self.peer.load();
168 let inner = match inner_guard.as_ref() {
169 Some(inner) => inner,
170 None => {
171 error!("Backend connection is not available (reconnecting)");
172 return Ok(CallToolResult::error(vec![Content::text(
173 "Backend connection is not available, reconnecting..."
174 )]));
175 }
176 };
177
178 if inner.peer.is_transport_closed() {
180 error!("Backend transport is closed");
181 return Ok(CallToolResult::error(vec![Content::text(
182 "Backend connection closed, please retry",
183 )]));
184 }
185
186 match self.capabilities().tools {
188 Some(_) => {
189 tokio::select! {
191 result = inner.peer.call_tool(request.clone()) => {
192 match result {
193 Ok(result) => {
194 info!(
196 "[call_tool] 工具调用成功 - MCP ID: {}, 工具: {}",
197 self.mcp_id, request.name
198 );
199
200 debug!("Tool call succeeded");
201 Ok(result)
202 }
203 Err(err) => {
204 error!("Error calling tool: {:?}", err);
205 Ok(CallToolResult::error(vec![Content::text(format!(
207 "Error: {err}"
208 ))]))
209 }
210 }
211 }
212 _ = context.ct.cancelled() => {
213 info!(
214 "[call_tool] 请求被取消 - MCP ID: {}, 工具: {}",
215 self.mcp_id, request.name
216 );
217 Ok(CallToolResult::error(vec![Content::text(
218 "Request cancelled"
219 )]))
220 }
221 }
222 }
223 None => {
224 error!("Server doesn't support tools capability");
225 Ok(CallToolResult::error(vec![Content::text(
226 "Server doesn't support tools capability",
227 )]))
228 }
229 }
230 }
231
232 async fn list_resources(
233 &self,
234 request: Option<PaginatedRequestParam>,
235 context: RequestContext<RoleServer>,
236 ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
237 let inner_guard = self.peer.load();
239 let inner = inner_guard.as_ref().ok_or_else(|| {
240 error!("Backend connection is not available (reconnecting)");
241 ErrorData::internal_error(
242 "Backend connection is not available, reconnecting...".to_string(),
243 None,
244 )
245 })?;
246
247 if inner.peer.is_transport_closed() {
249 error!("Backend transport is closed");
250 return Err(ErrorData::internal_error(
251 "Backend connection closed, please retry".to_string(),
252 None,
253 ));
254 }
255
256 match self.capabilities().resources {
258 Some(_) => {
259 tokio::select! {
260 result = inner.peer.list_resources(request) => {
261 match result {
262 Ok(result) => {
263 info!(
265 "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
266 self.mcp_id,
267 result.resources.len()
268 );
269
270 debug!("Proxying list_resources response");
271 Ok(result)
272 }
273 Err(err) => {
274 error!("Error listing resources: {:?}", err);
275 Err(ErrorData::internal_error(
276 format!("Error listing resources: {err}"),
277 None,
278 ))
279 }
280 }
281 }
282 _ = context.ct.cancelled() => {
283 info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
284 Err(ErrorData::internal_error(
285 "Request cancelled".to_string(),
286 None,
287 ))
288 }
289 }
290 }
291 None => {
292 warn!("Server doesn't support resources capability");
294 Ok(rmcp::model::ListResourcesResult::default())
295 }
296 }
297 }
298
299 async fn read_resource(
300 &self,
301 request: rmcp::model::ReadResourceRequestParam,
302 context: RequestContext<RoleServer>,
303 ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
304 let inner_guard = self.peer.load();
306 let inner = inner_guard.as_ref().ok_or_else(|| {
307 error!("Backend connection is not available (reconnecting)");
308 ErrorData::internal_error(
309 "Backend connection is not available, reconnecting...".to_string(),
310 None,
311 )
312 })?;
313
314 if inner.peer.is_transport_closed() {
316 error!("Backend transport is closed");
317 return Err(ErrorData::internal_error(
318 "Backend connection closed, please retry".to_string(),
319 None,
320 ));
321 }
322
323 match self.capabilities().resources {
325 Some(_) => {
326 tokio::select! {
327 result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
328 uri: request.uri.clone(),
329 }) => {
330 match result {
331 Ok(result) => {
332 info!(
334 "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
335 self.mcp_id, request.uri
336 );
337
338 debug!("Proxying read_resource response for {}", request.uri);
339 Ok(result)
340 }
341 Err(err) => {
342 error!("Error reading resource: {:?}", err);
343 Err(ErrorData::internal_error(
344 format!("Error reading resource: {err}"),
345 None,
346 ))
347 }
348 }
349 }
350 _ = context.ct.cancelled() => {
351 info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
352 Err(ErrorData::internal_error(
353 "Request cancelled".to_string(),
354 None,
355 ))
356 }
357 }
358 }
359 None => {
360 error!("Server doesn't support resources capability");
362 Ok(rmcp::model::ReadResourceResult {
363 contents: Vec::new(),
364 })
365 }
366 }
367 }
368
369 async fn list_resource_templates(
370 &self,
371 request: Option<PaginatedRequestParam>,
372 context: RequestContext<RoleServer>,
373 ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
374 let inner_guard = self.peer.load();
376 let inner = inner_guard.as_ref().ok_or_else(|| {
377 error!("Backend connection is not available (reconnecting)");
378 ErrorData::internal_error(
379 "Backend connection is not available, reconnecting...".to_string(),
380 None,
381 )
382 })?;
383
384 if inner.peer.is_transport_closed() {
386 error!("Backend transport is closed");
387 return Err(ErrorData::internal_error(
388 "Backend connection closed, please retry".to_string(),
389 None,
390 ));
391 }
392
393 match self.capabilities().resources {
395 Some(_) => {
396 tokio::select! {
397 result = inner.peer.list_resource_templates(request) => {
398 match result {
399 Ok(result) => {
400 debug!("Proxying list_resource_templates response");
401 Ok(result)
402 }
403 Err(err) => {
404 error!("Error listing resource templates: {:?}", err);
405 Err(ErrorData::internal_error(
406 format!("Error listing resource templates: {err}"),
407 None,
408 ))
409 }
410 }
411 }
412 _ = context.ct.cancelled() => {
413 info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
414 Err(ErrorData::internal_error(
415 "Request cancelled".to_string(),
416 None,
417 ))
418 }
419 }
420 }
421 None => {
422 warn!("Server doesn't support resources capability");
424 Ok(rmcp::model::ListResourceTemplatesResult::default())
425 }
426 }
427 }
428
429 async fn list_prompts(
430 &self,
431 request: Option<PaginatedRequestParam>,
432 context: RequestContext<RoleServer>,
433 ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
434 let inner_guard = self.peer.load();
436 let inner = inner_guard.as_ref().ok_or_else(|| {
437 error!("Backend connection is not available (reconnecting)");
438 ErrorData::internal_error(
439 "Backend connection is not available, reconnecting...".to_string(),
440 None,
441 )
442 })?;
443
444 if inner.peer.is_transport_closed() {
446 error!("Backend transport is closed");
447 return Err(ErrorData::internal_error(
448 "Backend connection closed, please retry".to_string(),
449 None,
450 ));
451 }
452
453 match self.capabilities().prompts {
455 Some(_) => {
456 tokio::select! {
457 result = inner.peer.list_prompts(request) => {
458 match result {
459 Ok(result) => {
460 debug!("Proxying list_prompts response");
461 Ok(result)
462 }
463 Err(err) => {
464 error!("Error listing prompts: {:?}", err);
465 Err(ErrorData::internal_error(
466 format!("Error listing prompts: {err}"),
467 None,
468 ))
469 }
470 }
471 }
472 _ = context.ct.cancelled() => {
473 info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
474 Err(ErrorData::internal_error(
475 "Request cancelled".to_string(),
476 None,
477 ))
478 }
479 }
480 }
481 None => {
482 warn!("Server doesn't support prompts capability");
484 Ok(rmcp::model::ListPromptsResult::default())
485 }
486 }
487 }
488
489 async fn get_prompt(
490 &self,
491 request: rmcp::model::GetPromptRequestParam,
492 context: RequestContext<RoleServer>,
493 ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
494 let inner_guard = self.peer.load();
496 let inner = inner_guard.as_ref().ok_or_else(|| {
497 error!("Backend connection is not available (reconnecting)");
498 ErrorData::internal_error(
499 "Backend connection is not available, reconnecting...".to_string(),
500 None,
501 )
502 })?;
503
504 if inner.peer.is_transport_closed() {
506 error!("Backend transport is closed");
507 return Err(ErrorData::internal_error(
508 "Backend connection closed, please retry".to_string(),
509 None,
510 ));
511 }
512
513 match self.capabilities().prompts {
515 Some(_) => {
516 tokio::select! {
517 result = inner.peer.get_prompt(request.clone()) => {
518 match result {
519 Ok(result) => {
520 debug!("Proxying get_prompt response");
521 Ok(result)
522 }
523 Err(err) => {
524 error!("Error getting prompt: {:?}", err);
525 Err(ErrorData::internal_error(
526 format!("Error getting prompt: {err}"),
527 None,
528 ))
529 }
530 }
531 }
532 _ = context.ct.cancelled() => {
533 info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
534 Err(ErrorData::internal_error(
535 "Request cancelled".to_string(),
536 None,
537 ))
538 }
539 }
540 }
541 None => {
542 warn!("Server doesn't support prompts capability");
544 Ok(rmcp::model::GetPromptResult {
545 description: None,
546 messages: Vec::new(),
547 })
548 }
549 }
550 }
551
552 async fn complete(
553 &self,
554 request: rmcp::model::CompleteRequestParam,
555 context: RequestContext<RoleServer>,
556 ) -> Result<rmcp::model::CompleteResult, ErrorData> {
557 let inner_guard = self.peer.load();
559 let inner = inner_guard.as_ref().ok_or_else(|| {
560 error!("Backend connection is not available (reconnecting)");
561 ErrorData::internal_error(
562 "Backend connection is not available, reconnecting...".to_string(),
563 None,
564 )
565 })?;
566
567 if inner.peer.is_transport_closed() {
569 error!("Backend transport is closed");
570 return Err(ErrorData::internal_error(
571 "Backend connection closed, please retry".to_string(),
572 None,
573 ));
574 }
575
576 tokio::select! {
577 result = inner.peer.complete(request) => {
578 match result {
579 Ok(result) => {
580 debug!("Proxying complete response");
581 Ok(result)
582 }
583 Err(err) => {
584 error!("Error completing: {:?}", err);
585 Err(ErrorData::internal_error(
586 format!("Error completing: {err}"),
587 None,
588 ))
589 }
590 }
591 }
592 _ = context.ct.cancelled() => {
593 info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
594 Err(ErrorData::internal_error(
595 "Request cancelled".to_string(),
596 None,
597 ))
598 }
599 }
600 }
601
602 async fn on_progress(
603 &self,
604 notification: rmcp::model::ProgressNotificationParam,
605 _context: NotificationContext<RoleServer>,
606 ) {
607 let inner_guard = self.peer.load();
609 let inner = match inner_guard.as_ref() {
610 Some(inner) => inner,
611 None => {
612 error!("Backend connection is not available, cannot forward progress notification");
613 return;
614 }
615 };
616
617 if inner.peer.is_transport_closed() {
619 error!("Backend transport is closed, cannot forward progress notification");
620 return;
621 }
622
623 match inner.peer.notify_progress(notification).await {
624 Ok(_) => {
625 debug!("Proxying progress notification");
626 }
627 Err(err) => {
628 error!("Error notifying progress: {:?}", err);
629 }
630 }
631 }
632
633 async fn on_cancelled(
634 &self,
635 notification: rmcp::model::CancelledNotificationParam,
636 _context: NotificationContext<RoleServer>,
637 ) {
638 let inner_guard = self.peer.load();
640 let inner = match inner_guard.as_ref() {
641 Some(inner) => inner,
642 None => {
643 error!("Backend connection is not available, cannot forward cancelled notification");
644 return;
645 }
646 };
647
648 if inner.peer.is_transport_closed() {
650 error!("Backend transport is closed, cannot forward cancelled notification");
651 return;
652 }
653
654 match inner.peer.notify_cancelled(notification).await {
655 Ok(_) => {
656 debug!("Proxying cancelled notification");
657 }
658 Err(err) => {
659 error!("Error notifying cancelled: {:?}", err);
660 }
661 }
662 }
663}
664
665impl SseHandler {
666 #[inline]
668 fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
669 &self.cached_info.capabilities
670 }
671
672 fn default_server_info(mcp_id: &str) -> ServerInfo {
674 warn!("[SseHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
675 ServerInfo {
676 protocol_version: Default::default(),
677 server_info: Implementation {
678 name: "MCP Proxy".to_string(),
679 version: "0.1.0".to_string(),
680 title: None,
681 website_url: None,
682 icons: None,
683 },
684 instructions: None,
685 capabilities: Default::default(),
686 }
687 }
688
689 fn extract_server_info(client: &RunningService<RoleClient, ClientInfo>, mcp_id: &str) -> ServerInfo {
691 client.peer_info().map(|peer_info| ServerInfo {
692 protocol_version: peer_info.protocol_version.clone(),
693 server_info: Implementation {
694 name: peer_info.server_info.name.clone(),
695 version: peer_info.server_info.version.clone(),
696 title: None,
697 website_url: None,
698 icons: None,
699 },
700 instructions: peer_info.instructions.clone(),
701 capabilities: peer_info.capabilities.clone(),
702 }).unwrap_or_else(|| Self::default_server_info(mcp_id))
703 }
704
705 pub fn new_disconnected(mcp_id: String, tool_filter: ToolFilter, default_info: ServerInfo) -> Self {
708 info!("[SseHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);
709
710 if tool_filter.is_enabled() {
712 if let Some(ref allow_list) = tool_filter.allow_tools {
713 info!(
714 "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
715 mcp_id, allow_list
716 );
717 }
718 if let Some(ref deny_list) = tool_filter.deny_tools {
719 info!(
720 "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
721 mcp_id, deny_list
722 );
723 }
724 }
725
726 Self {
727 peer: Arc::new(ArcSwapOption::empty()),
728 cached_info: default_info,
729 mcp_id,
730 tool_filter,
731 }
732 }
733
734 pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
735 Self::with_mcp_id(client, "unknown".to_string())
736 }
737
738 pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
739 Self::with_tool_filter(client, mcp_id, ToolFilter::default())
740 }
741
742 pub fn with_tool_filter(
744 client: RunningService<RoleClient, ClientInfo>,
745 mcp_id: String,
746 tool_filter: ToolFilter,
747 ) -> Self {
748 use std::ops::Deref;
749
750 let cached_info = Self::extract_server_info(&client, &mcp_id);
752
753 let peer = client.deref().clone();
755
756 if tool_filter.is_enabled() {
758 if let Some(ref allow_list) = tool_filter.allow_tools {
759 info!(
760 "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
761 mcp_id, allow_list
762 );
763 }
764 if let Some(ref deny_list) = tool_filter.deny_tools {
765 info!(
766 "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
767 mcp_id, deny_list
768 );
769 }
770 }
771
772 let inner = PeerInner {
774 peer,
775 _running: Arc::new(client),
776 };
777
778 Self {
779 peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
780 cached_info,
781 mcp_id,
782 tool_filter,
783 }
784 }
785
786 pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
790 use std::ops::Deref;
791
792 match new_client {
793 Some(client) => {
794 let peer = client.deref().clone();
795 let inner = PeerInner {
796 peer,
797 _running: Arc::new(client),
798 };
799 self.peer.store(Some(Arc::new(inner)));
800 info!("[SseHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
801 }
802 None => {
803 self.peer.store(None);
804 info!("[SseHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
805 }
806 }
807 }
808
809 pub fn is_backend_available(&self) -> bool {
811 let inner_guard = self.peer.load();
812 match inner_guard.as_ref() {
813 Some(inner) => !inner.peer.is_transport_closed(),
814 None => false,
815 }
816 }
817
818 pub async fn is_mcp_server_ready(&self) -> bool {
820 !self.is_terminated_async().await
821 }
822
823 pub fn is_terminated(&self) -> bool {
825 !self.is_backend_available()
826 }
827
828 pub async fn is_terminated_async(&self) -> bool {
830 let inner_guard = self.peer.load();
832 let inner = match inner_guard.as_ref() {
833 Some(inner) => inner,
834 None => return true,
835 };
836
837 if inner.peer.is_transport_closed() {
839 return true;
840 }
841
842 match inner.peer.list_tools(None).await {
844 Ok(_) => {
845 debug!("后端连接状态检查: 正常");
846 false
847 }
848 Err(e) => {
849 info!("后端连接状态检查: 已断开,原因: {e}");
850 true
851 }
852 }
853 }
854
855 pub fn mcp_id(&self) -> &str {
857 &self.mcp_id
858 }
859
860 pub fn swap_backend_from_connection(&self, conn: Option<crate::client::SseClientConnection>) {
869 match conn {
870 Some(c) => {
871 let running = c.into_running_service();
872 self.swap_backend(Some(running));
873 }
874 None => {
875 self.swap_backend(None);
876 }
877 }
878 }
879}