1use tracing::{debug, info, warn, error};
2use std::time::{Instant, SystemTime};
3use rmcp::{
7 ErrorData, RoleClient, RoleServer, ServerHandler,
8 model::{
9 CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
10 PaginatedRequestParam, ServerInfo,
11 },
12 service::{NotificationContext, Peer, RequestContext, RunningService},
13};
14use std::sync::Arc;
15use arc_swap::ArcSwapOption;
16pub use mcp_common::ToolFilter;
17
18#[derive(Debug)]
21struct PeerInner {
22 peer: Peer<RoleClient>,
24 #[allow(dead_code)]
26 _running: Arc<RunningService<RoleClient, ClientInfo>>,
27}
28
29#[derive(Clone, Debug)]
34pub struct SseHandler {
35 peer: Arc<ArcSwapOption<PeerInner>>,
38 cached_info: ServerInfo,
40 mcp_id: String,
42 tool_filter: ToolFilter,
44}
45
46impl ServerHandler for SseHandler {
47 fn get_info(&self) -> ServerInfo {
48 self.cached_info.clone()
49 }
50
51 #[tracing::instrument(skip(self, request, context), fields(
52 mcp_id = %self.mcp_id,
53 request = ?request,
54 ))]
55 async fn list_tools(
56 &self,
57 request: Option<PaginatedRequestParam>,
58 context: RequestContext<RoleServer>,
59 ) -> Result<ListToolsResult, ErrorData> {
60 let inner_guard = self.peer.load();
62 let inner = inner_guard.as_ref().ok_or_else(|| {
63 error!("Backend connection is not available (reconnecting)");
64 ErrorData::internal_error(
65 "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
66 None,
67 )
68 })?;
69
70 if inner.peer.is_transport_closed() {
72 error!("Backend transport is closed");
73 return Err(ErrorData::internal_error(
74 "Backend connection closed, please retry".to_string(),
75 None,
76 ));
77 }
78
79 match self.capabilities().tools {
81 Some(_) => {
82 tokio::select! {
84 result = inner.peer.list_tools(request) => {
85 match result {
86 Ok(result) => {
87 let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
89 result
90 .tools
91 .into_iter()
92 .filter(|tool| self.tool_filter.is_allowed(&tool.name))
93 .collect()
94 } else {
95 result.tools
96 };
97
98 info!(
100 "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
101 self.mcp_id,
102 filtered_tools.len(),
103 if self.tool_filter.is_enabled() {
104 " (已过滤)"
105 } else {
106 ""
107 }
108 );
109
110 debug!(
111 "Proxying list_tools response with {} tools",
112 filtered_tools.len()
113 );
114 Ok(ListToolsResult {
115 tools: filtered_tools,
116 next_cursor: result.next_cursor,
117 })
118 }
119 Err(err) => {
120 error!("Error listing tools: {:?}", err);
121 Err(ErrorData::internal_error(
122 format!("Error listing tools: {err}"),
123 None,
124 ))
125 }
126 }
127 }
128 _ = context.ct.cancelled() => {
129 info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
130 Err(ErrorData::internal_error(
131 "Request cancelled".to_string(),
132 None,
133 ))
134 }
135 }
136 }
137 None => {
138 warn!("Server doesn't support tools capability");
140 Ok(ListToolsResult::default())
141 }
142 }
143 }
144
145 #[tracing::instrument(skip(self, request, context), fields(
146 mcp_id = %self.mcp_id,
147 tool_name = %request.name,
148 tool_arguments = ?request.arguments,
149 ))]
150 async fn call_tool(
151 &self,
152 request: CallToolRequestParam,
153 context: RequestContext<RoleServer>,
154 ) -> Result<CallToolResult, ErrorData> {
155 let start = Instant::now();
156 let start_time = SystemTime::now();
157 info!("🔧 开始执行工具: {}, 时间: {:?}", request.name, start_time);
158
159 if !self.tool_filter.is_allowed(&request.name) {
161 info!(
162 "[call_tool] 工具被过滤 - MCP ID: {}, 工具: {}",
163 self.mcp_id, request.name
164 );
165 return Ok(CallToolResult::error(vec![Content::text(format!(
166 "Tool '{}' is not allowed by filter configuration",
167 request.name
168 ))]));
169 }
170
171 let inner_guard = self.peer.load();
173 let inner = match inner_guard.as_ref() {
174 Some(inner) => inner,
175 None => {
176 error!("Backend connection is not available (reconnecting)");
177 return Ok(CallToolResult::error(vec![Content::text(
178 "Backend connection is not available, reconnecting..."
179 )]));
180 }
181 };
182
183 if inner.peer.is_transport_closed() {
185 error!("Backend transport is closed");
186 return Ok(CallToolResult::error(vec![Content::text(
187 "Backend connection closed, please retry",
188 )]));
189 }
190
191 let result = match self.capabilities().tools {
193 Some(_) => {
194 tokio::select! {
196 result = inner.peer.call_tool(request.clone()) => {
197 match result {
198 Ok(result) => {
199 let elapsed = start.elapsed();
201 info!(
202 "[call_tool] 工具调用成功 - MCP ID: {}, 工具: {}, 耗时: {}ms",
203 self.mcp_id, request.name, elapsed.as_millis()
204 );
205
206 debug!("Tool call succeeded");
207 Ok(result)
208 }
209 Err(err) => {
210 let elapsed = start.elapsed();
211 error!("Error calling tool: {:?}, 耗时: {}ms", err, elapsed.as_millis());
212 Ok(CallToolResult::error(vec![Content::text(format!(
214 "Error: {err}"
215 ))]))
216 }
217 }
218 }
219 _ = context.ct.cancelled() => {
220 let elapsed = start.elapsed();
221 info!(
222 "[call_tool] 请求被取消 - MCP ID: {}, 工具: {}, 耗时: {}ms",
223 self.mcp_id, request.name, elapsed.as_millis()
224 );
225 Ok(CallToolResult::error(vec![Content::text(
226 "Request cancelled"
227 )]))
228 }
229 }
230 }
231 None => {
232 error!("Server doesn't support tools capability");
233 Ok(CallToolResult::error(vec![Content::text(
234 "Server doesn't support tools capability",
235 )]))
236 }
237 };
238
239 let total_elapsed = start.elapsed();
240 info!("✅ 工具执行完成: {}, 总耗时: {}ms", request.name, total_elapsed.as_millis());
241 result
242 }
243
244 async fn list_resources(
245 &self,
246 request: Option<PaginatedRequestParam>,
247 context: RequestContext<RoleServer>,
248 ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
249 let inner_guard = self.peer.load();
251 let inner = inner_guard.as_ref().ok_or_else(|| {
252 error!("Backend connection is not available (reconnecting)");
253 ErrorData::internal_error(
254 "Backend connection is not available, reconnecting...".to_string(),
255 None,
256 )
257 })?;
258
259 if inner.peer.is_transport_closed() {
261 error!("Backend transport is closed");
262 return Err(ErrorData::internal_error(
263 "Backend connection closed, please retry".to_string(),
264 None,
265 ));
266 }
267
268 match self.capabilities().resources {
270 Some(_) => {
271 tokio::select! {
272 result = inner.peer.list_resources(request) => {
273 match result {
274 Ok(result) => {
275 info!(
277 "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
278 self.mcp_id,
279 result.resources.len()
280 );
281
282 debug!("Proxying list_resources response");
283 Ok(result)
284 }
285 Err(err) => {
286 error!("Error listing resources: {:?}", err);
287 Err(ErrorData::internal_error(
288 format!("Error listing resources: {err}"),
289 None,
290 ))
291 }
292 }
293 }
294 _ = context.ct.cancelled() => {
295 info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
296 Err(ErrorData::internal_error(
297 "Request cancelled".to_string(),
298 None,
299 ))
300 }
301 }
302 }
303 None => {
304 warn!("Server doesn't support resources capability");
306 Ok(rmcp::model::ListResourcesResult::default())
307 }
308 }
309 }
310
311 async fn read_resource(
312 &self,
313 request: rmcp::model::ReadResourceRequestParam,
314 context: RequestContext<RoleServer>,
315 ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
316 let inner_guard = self.peer.load();
318 let inner = inner_guard.as_ref().ok_or_else(|| {
319 error!("Backend connection is not available (reconnecting)");
320 ErrorData::internal_error(
321 "Backend connection is not available, reconnecting...".to_string(),
322 None,
323 )
324 })?;
325
326 if inner.peer.is_transport_closed() {
328 error!("Backend transport is closed");
329 return Err(ErrorData::internal_error(
330 "Backend connection closed, please retry".to_string(),
331 None,
332 ));
333 }
334
335 match self.capabilities().resources {
337 Some(_) => {
338 tokio::select! {
339 result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
340 uri: request.uri.clone(),
341 }) => {
342 match result {
343 Ok(result) => {
344 info!(
346 "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
347 self.mcp_id, request.uri
348 );
349
350 debug!("Proxying read_resource response for {}", request.uri);
351 Ok(result)
352 }
353 Err(err) => {
354 error!("Error reading resource: {:?}", err);
355 Err(ErrorData::internal_error(
356 format!("Error reading resource: {err}"),
357 None,
358 ))
359 }
360 }
361 }
362 _ = context.ct.cancelled() => {
363 info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
364 Err(ErrorData::internal_error(
365 "Request cancelled".to_string(),
366 None,
367 ))
368 }
369 }
370 }
371 None => {
372 error!("Server doesn't support resources capability");
374 Ok(rmcp::model::ReadResourceResult {
375 contents: Vec::new(),
376 })
377 }
378 }
379 }
380
381 async fn list_resource_templates(
382 &self,
383 request: Option<PaginatedRequestParam>,
384 context: RequestContext<RoleServer>,
385 ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
386 let inner_guard = self.peer.load();
388 let inner = inner_guard.as_ref().ok_or_else(|| {
389 error!("Backend connection is not available (reconnecting)");
390 ErrorData::internal_error(
391 "Backend connection is not available, reconnecting...".to_string(),
392 None,
393 )
394 })?;
395
396 if inner.peer.is_transport_closed() {
398 error!("Backend transport is closed");
399 return Err(ErrorData::internal_error(
400 "Backend connection closed, please retry".to_string(),
401 None,
402 ));
403 }
404
405 match self.capabilities().resources {
407 Some(_) => {
408 tokio::select! {
409 result = inner.peer.list_resource_templates(request) => {
410 match result {
411 Ok(result) => {
412 debug!("Proxying list_resource_templates response");
413 Ok(result)
414 }
415 Err(err) => {
416 error!("Error listing resource templates: {:?}", err);
417 Err(ErrorData::internal_error(
418 format!("Error listing resource templates: {err}"),
419 None,
420 ))
421 }
422 }
423 }
424 _ = context.ct.cancelled() => {
425 info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
426 Err(ErrorData::internal_error(
427 "Request cancelled".to_string(),
428 None,
429 ))
430 }
431 }
432 }
433 None => {
434 warn!("Server doesn't support resources capability");
436 Ok(rmcp::model::ListResourceTemplatesResult::default())
437 }
438 }
439 }
440
441 async fn list_prompts(
442 &self,
443 request: Option<PaginatedRequestParam>,
444 context: RequestContext<RoleServer>,
445 ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
446 let inner_guard = self.peer.load();
448 let inner = inner_guard.as_ref().ok_or_else(|| {
449 error!("Backend connection is not available (reconnecting)");
450 ErrorData::internal_error(
451 "Backend connection is not available, reconnecting...".to_string(),
452 None,
453 )
454 })?;
455
456 if inner.peer.is_transport_closed() {
458 error!("Backend transport is closed");
459 return Err(ErrorData::internal_error(
460 "Backend connection closed, please retry".to_string(),
461 None,
462 ));
463 }
464
465 match self.capabilities().prompts {
467 Some(_) => {
468 tokio::select! {
469 result = inner.peer.list_prompts(request) => {
470 match result {
471 Ok(result) => {
472 debug!("Proxying list_prompts response");
473 Ok(result)
474 }
475 Err(err) => {
476 error!("Error listing prompts: {:?}", err);
477 Err(ErrorData::internal_error(
478 format!("Error listing prompts: {err}"),
479 None,
480 ))
481 }
482 }
483 }
484 _ = context.ct.cancelled() => {
485 info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
486 Err(ErrorData::internal_error(
487 "Request cancelled".to_string(),
488 None,
489 ))
490 }
491 }
492 }
493 None => {
494 warn!("Server doesn't support prompts capability");
496 Ok(rmcp::model::ListPromptsResult::default())
497 }
498 }
499 }
500
501 async fn get_prompt(
502 &self,
503 request: rmcp::model::GetPromptRequestParam,
504 context: RequestContext<RoleServer>,
505 ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
506 let inner_guard = self.peer.load();
508 let inner = inner_guard.as_ref().ok_or_else(|| {
509 error!("Backend connection is not available (reconnecting)");
510 ErrorData::internal_error(
511 "Backend connection is not available, reconnecting...".to_string(),
512 None,
513 )
514 })?;
515
516 if inner.peer.is_transport_closed() {
518 error!("Backend transport is closed");
519 return Err(ErrorData::internal_error(
520 "Backend connection closed, please retry".to_string(),
521 None,
522 ));
523 }
524
525 match self.capabilities().prompts {
527 Some(_) => {
528 tokio::select! {
529 result = inner.peer.get_prompt(request.clone()) => {
530 match result {
531 Ok(result) => {
532 debug!("Proxying get_prompt response");
533 Ok(result)
534 }
535 Err(err) => {
536 error!("Error getting prompt: {:?}", err);
537 Err(ErrorData::internal_error(
538 format!("Error getting prompt: {err}"),
539 None,
540 ))
541 }
542 }
543 }
544 _ = context.ct.cancelled() => {
545 info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
546 Err(ErrorData::internal_error(
547 "Request cancelled".to_string(),
548 None,
549 ))
550 }
551 }
552 }
553 None => {
554 warn!("Server doesn't support prompts capability");
556 Ok(rmcp::model::GetPromptResult {
557 description: None,
558 messages: Vec::new(),
559 })
560 }
561 }
562 }
563
564 async fn complete(
565 &self,
566 request: rmcp::model::CompleteRequestParam,
567 context: RequestContext<RoleServer>,
568 ) -> Result<rmcp::model::CompleteResult, ErrorData> {
569 let inner_guard = self.peer.load();
571 let inner = inner_guard.as_ref().ok_or_else(|| {
572 error!("Backend connection is not available (reconnecting)");
573 ErrorData::internal_error(
574 "Backend connection is not available, reconnecting...".to_string(),
575 None,
576 )
577 })?;
578
579 if inner.peer.is_transport_closed() {
581 error!("Backend transport is closed");
582 return Err(ErrorData::internal_error(
583 "Backend connection closed, please retry".to_string(),
584 None,
585 ));
586 }
587
588 tokio::select! {
589 result = inner.peer.complete(request) => {
590 match result {
591 Ok(result) => {
592 debug!("Proxying complete response");
593 Ok(result)
594 }
595 Err(err) => {
596 error!("Error completing: {:?}", err);
597 Err(ErrorData::internal_error(
598 format!("Error completing: {err}"),
599 None,
600 ))
601 }
602 }
603 }
604 _ = context.ct.cancelled() => {
605 info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
606 Err(ErrorData::internal_error(
607 "Request cancelled".to_string(),
608 None,
609 ))
610 }
611 }
612 }
613
614 async fn on_progress(
615 &self,
616 notification: rmcp::model::ProgressNotificationParam,
617 _context: NotificationContext<RoleServer>,
618 ) {
619 let inner_guard = self.peer.load();
621 let inner = match inner_guard.as_ref() {
622 Some(inner) => inner,
623 None => {
624 error!("Backend connection is not available, cannot forward progress notification");
625 return;
626 }
627 };
628
629 if inner.peer.is_transport_closed() {
631 error!("Backend transport is closed, cannot forward progress notification");
632 return;
633 }
634
635 match inner.peer.notify_progress(notification).await {
636 Ok(_) => {
637 debug!("Proxying progress notification");
638 }
639 Err(err) => {
640 error!("Error notifying progress: {:?}", err);
641 }
642 }
643 }
644
645 async fn on_cancelled(
646 &self,
647 notification: rmcp::model::CancelledNotificationParam,
648 _context: NotificationContext<RoleServer>,
649 ) {
650 let inner_guard = self.peer.load();
652 let inner = match inner_guard.as_ref() {
653 Some(inner) => inner,
654 None => {
655 error!("Backend connection is not available, cannot forward cancelled notification");
656 return;
657 }
658 };
659
660 if inner.peer.is_transport_closed() {
662 error!("Backend transport is closed, cannot forward cancelled notification");
663 return;
664 }
665
666 match inner.peer.notify_cancelled(notification).await {
667 Ok(_) => {
668 debug!("Proxying cancelled notification");
669 }
670 Err(err) => {
671 error!("Error notifying cancelled: {:?}", err);
672 }
673 }
674 }
675}
676
677impl SseHandler {
678 #[inline]
680 fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
681 &self.cached_info.capabilities
682 }
683
684 fn default_server_info(mcp_id: &str) -> ServerInfo {
686 warn!("[SseHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
687 ServerInfo {
688 protocol_version: Default::default(),
689 server_info: Implementation {
690 name: "MCP Proxy".to_string(),
691 version: "0.1.0".to_string(),
692 title: None,
693 website_url: None,
694 icons: None,
695 },
696 instructions: None,
697 capabilities: Default::default(),
698 }
699 }
700
701 fn extract_server_info(client: &RunningService<RoleClient, ClientInfo>, mcp_id: &str) -> ServerInfo {
703 client.peer_info().map(|peer_info| ServerInfo {
704 protocol_version: peer_info.protocol_version.clone(),
705 server_info: Implementation {
706 name: peer_info.server_info.name.clone(),
707 version: peer_info.server_info.version.clone(),
708 title: None,
709 website_url: None,
710 icons: None,
711 },
712 instructions: peer_info.instructions.clone(),
713 capabilities: peer_info.capabilities.clone(),
714 }).unwrap_or_else(|| Self::default_server_info(mcp_id))
715 }
716
717 pub fn new_disconnected(mcp_id: String, tool_filter: ToolFilter, default_info: ServerInfo) -> Self {
720 info!("[SseHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);
721
722 if tool_filter.is_enabled() {
724 if let Some(ref allow_list) = tool_filter.allow_tools {
725 info!(
726 "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
727 mcp_id, allow_list
728 );
729 }
730 if let Some(ref deny_list) = tool_filter.deny_tools {
731 info!(
732 "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
733 mcp_id, deny_list
734 );
735 }
736 }
737
738 Self {
739 peer: Arc::new(ArcSwapOption::empty()),
740 cached_info: default_info,
741 mcp_id,
742 tool_filter,
743 }
744 }
745
746 pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
747 Self::with_mcp_id(client, "unknown".to_string())
748 }
749
750 pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
751 Self::with_tool_filter(client, mcp_id, ToolFilter::default())
752 }
753
754 pub fn with_tool_filter(
756 client: RunningService<RoleClient, ClientInfo>,
757 mcp_id: String,
758 tool_filter: ToolFilter,
759 ) -> Self {
760 use std::ops::Deref;
761
762 let cached_info = Self::extract_server_info(&client, &mcp_id);
764
765 let peer = client.deref().clone();
767
768 if tool_filter.is_enabled() {
770 if let Some(ref allow_list) = tool_filter.allow_tools {
771 info!(
772 "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
773 mcp_id, allow_list
774 );
775 }
776 if let Some(ref deny_list) = tool_filter.deny_tools {
777 info!(
778 "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
779 mcp_id, deny_list
780 );
781 }
782 }
783
784 let inner = PeerInner {
786 peer,
787 _running: Arc::new(client),
788 };
789
790 Self {
791 peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
792 cached_info,
793 mcp_id,
794 tool_filter,
795 }
796 }
797
798 pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
802 use std::ops::Deref;
803
804 match new_client {
805 Some(client) => {
806 let peer = client.deref().clone();
807 let inner = PeerInner {
808 peer,
809 _running: Arc::new(client),
810 };
811 self.peer.store(Some(Arc::new(inner)));
812 info!("[SseHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
813 }
814 None => {
815 self.peer.store(None);
816 info!("[SseHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
817 }
818 }
819 }
820
821 pub fn is_backend_available(&self) -> bool {
823 let inner_guard = self.peer.load();
824 match inner_guard.as_ref() {
825 Some(inner) => !inner.peer.is_transport_closed(),
826 None => false,
827 }
828 }
829
830 pub async fn is_mcp_server_ready(&self) -> bool {
832 !self.is_terminated_async().await
833 }
834
835 pub fn is_terminated(&self) -> bool {
837 !self.is_backend_available()
838 }
839
840 pub async fn is_terminated_async(&self) -> bool {
842 let inner_guard = self.peer.load();
844 let inner = match inner_guard.as_ref() {
845 Some(inner) => inner,
846 None => return true,
847 };
848
849 if inner.peer.is_transport_closed() {
851 return true;
852 }
853
854 match inner.peer.list_tools(None).await {
856 Ok(_) => {
857 debug!("后端连接状态检查: 正常");
858 false
859 }
860 Err(e) => {
861 info!("后端连接状态检查: 已断开,原因: {e}");
862 true
863 }
864 }
865 }
866
867 pub fn mcp_id(&self) -> &str {
869 &self.mcp_id
870 }
871
872 pub fn swap_backend_from_connection(&self, conn: Option<crate::client::SseClientConnection>) {
881 match conn {
882 Some(c) => {
883 let running = c.into_running_service();
884 self.swap_backend(Some(running));
885 }
886 None => {
887 self.swap_backend(None);
888 }
889 }
890 }
891}