1use arc_swap::ArcSwapOption;
2pub use mcp_common::ToolFilter;
3use rmcp::{
7 ErrorData, RoleClient, RoleServer, ServerHandler,
8 model::{
9 CallToolRequestParams, CallToolResult, ClientInfo, Content, Implementation,
10 ListToolsResult, PaginatedRequestParams, ServerInfo,
11 },
12 service::{NotificationContext, Peer, RequestContext, RunningService},
13};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Instant;
17use tracing::{debug, error, info, warn};
18
19static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);
21
22#[derive(Debug)]
25struct PeerInner {
26 peer: Peer<RoleClient>,
28 #[allow(dead_code)]
30 _running: Arc<RunningService<RoleClient, ClientInfo>>,
31}
32
33#[derive(Clone, Debug)]
40pub struct ProxyHandler {
41 peer: Arc<ArcSwapOption<PeerInner>>,
44 cached_info: ServerInfo,
46 mcp_id: String,
48 tool_filter: ToolFilter,
50 backend_version: Arc<AtomicU64>,
53}
54
55impl ServerHandler for ProxyHandler {
56 fn get_info(&self) -> ServerInfo {
57 self.cached_info.clone()
58 }
59
60 #[tracing::instrument(skip(self, request, context), fields(
61 mcp_id = %self.mcp_id,
62 request = ?request,
63 ))]
64 async fn list_tools(
65 &self,
66 request: Option<PaginatedRequestParams>,
67 context: RequestContext<RoleServer>,
68 ) -> Result<ListToolsResult, ErrorData> {
69 let inner_guard = self.peer.load();
71 let inner = inner_guard.as_ref().ok_or_else(|| {
72 error!("Backend connection is not available (reconnecting)");
73 ErrorData::internal_error(
74 "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
75 None,
76 )
77 })?;
78
79 if inner.peer.is_transport_closed() {
81 error!("Backend transport is closed");
82 return Err(ErrorData::internal_error(
83 "Backend connection closed, please retry".to_string(),
84 None,
85 ));
86 }
87
88 match self.capabilities().tools {
90 Some(_) => {
91 tokio::select! {
93 result = inner.peer.list_tools(request) => {
94 match result {
95 Ok(result) => {
96 let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
98 result
99 .tools
100 .into_iter()
101 .filter(|tool| self.tool_filter.is_allowed(&tool.name))
102 .collect()
103 } else {
104 result.tools
105 };
106
107 info!(
109 "[list_tools] Tool list results - MCP ID: {}, number of tools: {}{}",
110 self.mcp_id,
111 filtered_tools.len(),
112 if self.tool_filter.is_enabled() {
113 " (filtered)"
114 } else {
115 ""
116 }
117 );
118
119 debug!(
120 "Proxying list_tools response with {} tools",
121 filtered_tools.len()
122 );
123 Ok(ListToolsResult {
124 tools: filtered_tools,
125 next_cursor: result.next_cursor,
126 meta: result.meta, })
128 }
129 Err(err) => {
130 error!("Error listing tools: {:?}", err);
131 Err(ErrorData::internal_error(
132 format!("Error listing tools: {err}"),
133 None,
134 ))
135 }
136 }
137 }
138 _ = context.ct.cancelled() => {
139 info!("[list_tools] Request canceled - MCP ID: {}", self.mcp_id);
140 Err(ErrorData::internal_error(
141 "Request cancelled".to_string(),
142 None,
143 ))
144 }
145 }
146 }
147 None => {
148 warn!("Server doesn't support tools capability");
150 Ok(ListToolsResult::default())
151 }
152 }
153 }
154
155 #[tracing::instrument(skip(self, request, context), fields(
156 mcp_id = %self.mcp_id,
157 tool_name = %request.name,
158 tool_arguments = ?request.arguments,
159 ))]
160 async fn call_tool(
161 &self,
162 request: CallToolRequestParams,
163 context: RequestContext<RoleServer>,
164 ) -> Result<CallToolResult, ErrorData> {
165 let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
167 let start = Instant::now();
168
169 info!(
170 "[call_tool:{}] Start - Tool: {}, MCP ID: {}",
171 request_id, request.name, self.mcp_id
172 );
173
174 if !self.tool_filter.is_allowed(&request.name) {
176 info!(
177 "[call_tool:{}] Tool is filtered - MCP ID: {}, Tool: {}",
178 request_id, self.mcp_id, request.name
179 );
180 return Ok(CallToolResult::error(vec![Content::text(format!(
181 "Tool '{}' is not allowed by filter configuration",
182 request.name
183 ))]));
184 }
185
186 let inner_guard = self.peer.load();
188 let inner = match inner_guard.as_ref() {
189 Some(inner) => {
190 let transport_closed = inner.peer.is_transport_closed();
191 info!(
192 "[call_tool:{}] Backend connection exists - transport_closed: {}",
193 request_id, transport_closed
194 );
195 inner
196 }
197 None => {
198 error!(
199 "[call_tool:{}] Backend connection unavailable (reconnecting) - MCP ID: {}",
200 request_id, self.mcp_id
201 );
202 return Ok(CallToolResult::error(vec![Content::text(
203 "Backend connection is not available, reconnecting...",
204 )]));
205 }
206 };
207
208 if inner.peer.is_transport_closed() {
210 error!(
211 "[call_tool:{}] Backend transport is closed - MCP ID: {}",
212 request_id, self.mcp_id
213 );
214 return Ok(CallToolResult::error(vec![Content::text(
215 "Backend connection closed, please retry",
216 )]));
217 }
218
219 let result = match self.capabilities().tools {
221 Some(_) => {
222 info!(
224 "[call_tool:{}] Send request to backend... - Tool: {}, Elapsed time: {}ms",
225 request_id,
226 request.name,
227 start.elapsed().as_millis()
228 );
229
230 let call_future = inner.peer.call_tool(request.clone());
232 tokio::pin!(call_future);
233
234 const HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
236 let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
237 heartbeat_interval.tick().await;
239
240 let call_result = loop {
242 tokio::select! {
243 biased;
245
246 result = &mut call_future => {
247 break result;
248 }
249 _ = context.ct.cancelled() => {
250 let elapsed = start.elapsed();
251 warn!(
252 "[call_tool:{}] Request canceled - Tool: {}, Time taken: {}ms, MCP ID: {}",
253 request_id, request.name, elapsed.as_millis(), self.mcp_id
254 );
255 return Ok(CallToolResult::error(vec![Content::text(
256 "Request cancelled"
257 )]));
258 }
259 _ = heartbeat_interval.tick() => {
260 let elapsed = start.elapsed();
262 let transport_closed = inner.peer.is_transport_closed();
263 info!(
264 "[call_tool:{}] Waiting for backend response... - Tool: {}, Waiting: {}ms, \\ transport_closed: {}, MCP ID: {}",
265 request_id, request.name, elapsed.as_millis(),
266 transport_closed, self.mcp_id
267 );
268 }
269 }
270 };
271
272 let elapsed = start.elapsed();
273 match &call_result {
274 Ok(call_result) => {
275 let is_error = call_result.is_error.unwrap_or(false);
277 info!(
278 "[call_tool:{}] Response received - tool: {}, time taken: {}ms, is_error: {}, MCP ID: {}",
279 request_id,
280 request.name,
281 elapsed.as_millis(),
282 is_error,
283 self.mcp_id
284 );
285 if is_error {
286 debug!(
287 "[call_tool:{}] Error response content: {:?}",
288 request_id, call_result.content
289 );
290 }
291 Ok(call_result.clone())
292 }
293 Err(err) => {
294 error!(
295 "[call_tool:{}] Backend returns error - Tool: {}, Time: {}ms, Error: {:?}, MCP ID: {}",
296 request_id,
297 request.name,
298 elapsed.as_millis(),
299 err,
300 self.mcp_id
301 );
302 Ok(CallToolResult::error(vec![Content::text(format!(
304 "Error: {err}"
305 ))]))
306 }
307 }
308 }
309 None => {
310 error!(
311 "[call_tool:{}] The server does not support tools capability - MCP ID: {}",
312 request_id, self.mcp_id
313 );
314 Ok(CallToolResult::error(vec![Content::text(
315 "Server doesn't support tools capability",
316 )]))
317 }
318 };
319
320 let total_elapsed = start.elapsed();
321 info!(
322 "[call_tool:{}] Completed - Tool: {}, total time taken: {}ms",
323 request_id,
324 request.name,
325 total_elapsed.as_millis()
326 );
327 result
328 }
329
330 async fn list_resources(
331 &self,
332 request: Option<PaginatedRequestParams>,
333 context: RequestContext<RoleServer>,
334 ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
335 let inner_guard = self.peer.load();
337 let inner = inner_guard.as_ref().ok_or_else(|| {
338 error!("Backend connection is not available (reconnecting)");
339 ErrorData::internal_error(
340 "Backend connection is not available, reconnecting...".to_string(),
341 None,
342 )
343 })?;
344
345 if inner.peer.is_transport_closed() {
347 error!("Backend transport is closed");
348 return Err(ErrorData::internal_error(
349 "Backend connection closed, please retry".to_string(),
350 None,
351 ));
352 }
353
354 match self.capabilities().resources {
356 Some(_) => {
357 tokio::select! {
358 result = inner.peer.list_resources(request) => {
359 match result {
360 Ok(result) => {
361 info!(
363 "[list_resources] Resource list results - MCP ID: {}, resource quantity: {}",
364 self.mcp_id,
365 result.resources.len()
366 );
367
368 debug!("Proxying list_resources response");
369 Ok(result)
370 }
371 Err(err) => {
372 error!("Error listing resources: {:?}", err);
373 Err(ErrorData::internal_error(
374 format!("Error listing resources: {err}"),
375 None,
376 ))
377 }
378 }
379 }
380 _ = context.ct.cancelled() => {
381 info!("[list_resources] Request canceled - MCP ID: {}", self.mcp_id);
382 Err(ErrorData::internal_error(
383 "Request cancelled".to_string(),
384 None,
385 ))
386 }
387 }
388 }
389 None => {
390 warn!("Server doesn't support resources capability");
392 Ok(rmcp::model::ListResourcesResult::default())
393 }
394 }
395 }
396
397 async fn read_resource(
398 &self,
399 request: rmcp::model::ReadResourceRequestParams,
400 context: RequestContext<RoleServer>,
401 ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
402 let inner_guard = self.peer.load();
404 let inner = inner_guard.as_ref().ok_or_else(|| {
405 error!("Backend connection is not available (reconnecting)");
406 ErrorData::internal_error(
407 "Backend connection is not available, reconnecting...".to_string(),
408 None,
409 )
410 })?;
411
412 if inner.peer.is_transport_closed() {
414 error!("Backend transport is closed");
415 return Err(ErrorData::internal_error(
416 "Backend connection closed, please retry".to_string(),
417 None,
418 ));
419 }
420
421 match self.capabilities().resources {
423 Some(_) => {
424 tokio::select! {
425 result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParams::new(request.uri.clone())) => {
426 match result {
427 Ok(result) => {
428 info!(
430 "[read_resource] Resource read result - MCP ID: {}, URI: {}",
431 self.mcp_id, request.uri
432 );
433
434 debug!("Proxying read_resource response for {}", request.uri);
435 Ok(result)
436 }
437 Err(err) => {
438 error!("Error reading resource: {:?}", err);
439 Err(ErrorData::internal_error(
440 format!("Error reading resource: {err}"),
441 None,
442 ))
443 }
444 }
445 }
446 _ = context.ct.cancelled() => {
447 info!("[read_resource] Request canceled - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
448 Err(ErrorData::internal_error(
449 "Request cancelled".to_string(),
450 None,
451 ))
452 }
453 }
454 }
455 None => {
456 error!("Server doesn't support resources capability");
458 Ok(rmcp::model::ReadResourceResult::new(vec![]))
459 }
460 }
461 }
462
463 async fn list_resource_templates(
464 &self,
465 request: Option<PaginatedRequestParams>,
466 context: RequestContext<RoleServer>,
467 ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
468 let inner_guard = self.peer.load();
470 let inner = inner_guard.as_ref().ok_or_else(|| {
471 error!("Backend connection is not available (reconnecting)");
472 ErrorData::internal_error(
473 "Backend connection is not available, reconnecting...".to_string(),
474 None,
475 )
476 })?;
477
478 if inner.peer.is_transport_closed() {
480 error!("Backend transport is closed");
481 return Err(ErrorData::internal_error(
482 "Backend connection closed, please retry".to_string(),
483 None,
484 ));
485 }
486
487 match self.capabilities().resources {
489 Some(_) => {
490 tokio::select! {
491 result = inner.peer.list_resource_templates(request) => {
492 match result {
493 Ok(result) => {
494 debug!("Proxying list_resource_templates response");
495 Ok(result)
496 }
497 Err(err) => {
498 error!("Error listing resource templates: {:?}", err);
499 Err(ErrorData::internal_error(
500 format!("Error listing resource templates: {err}"),
501 None,
502 ))
503 }
504 }
505 }
506 _ = context.ct.cancelled() => {
507 info!("[list_resource_templates] request canceled - MCP ID: {}", self.mcp_id);
508 Err(ErrorData::internal_error(
509 "Request cancelled".to_string(),
510 None,
511 ))
512 }
513 }
514 }
515 None => {
516 warn!("Server doesn't support resources capability");
518 Ok(rmcp::model::ListResourceTemplatesResult::default())
519 }
520 }
521 }
522
523 async fn list_prompts(
524 &self,
525 request: Option<PaginatedRequestParams>,
526 context: RequestContext<RoleServer>,
527 ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
528 let inner_guard = self.peer.load();
530 let inner = inner_guard.as_ref().ok_or_else(|| {
531 error!("Backend connection is not available (reconnecting)");
532 ErrorData::internal_error(
533 "Backend connection is not available, reconnecting...".to_string(),
534 None,
535 )
536 })?;
537
538 if inner.peer.is_transport_closed() {
540 error!("Backend transport is closed");
541 return Err(ErrorData::internal_error(
542 "Backend connection closed, please retry".to_string(),
543 None,
544 ));
545 }
546
547 match self.capabilities().prompts {
549 Some(_) => {
550 tokio::select! {
551 result = inner.peer.list_prompts(request) => {
552 match result {
553 Ok(result) => {
554 debug!("Proxying list_prompts response");
555 Ok(result)
556 }
557 Err(err) => {
558 error!("Error listing prompts: {:?}", err);
559 Err(ErrorData::internal_error(
560 format!("Error listing prompts: {err}"),
561 None,
562 ))
563 }
564 }
565 }
566 _ = context.ct.cancelled() => {
567 info!("[list_prompts] Request canceled - MCP ID: {}", self.mcp_id);
568 Err(ErrorData::internal_error(
569 "Request cancelled".to_string(),
570 None,
571 ))
572 }
573 }
574 }
575 None => {
576 warn!("Server doesn't support prompts capability");
578 Ok(rmcp::model::ListPromptsResult::default())
579 }
580 }
581 }
582
583 async fn get_prompt(
584 &self,
585 request: rmcp::model::GetPromptRequestParams,
586 context: RequestContext<RoleServer>,
587 ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
588 let inner_guard = self.peer.load();
590 let inner = inner_guard.as_ref().ok_or_else(|| {
591 error!("Backend connection is not available (reconnecting)");
592 ErrorData::internal_error(
593 "Backend connection is not available, reconnecting...".to_string(),
594 None,
595 )
596 })?;
597
598 if inner.peer.is_transport_closed() {
600 error!("Backend transport is closed");
601 return Err(ErrorData::internal_error(
602 "Backend connection closed, please retry".to_string(),
603 None,
604 ));
605 }
606
607 match self.capabilities().prompts {
609 Some(_) => {
610 tokio::select! {
611 result = inner.peer.get_prompt(request.clone()) => {
612 match result {
613 Ok(result) => {
614 debug!("Proxying get_prompt response");
615 Ok(result)
616 }
617 Err(err) => {
618 error!("Error getting prompt: {:?}", err);
619 Err(ErrorData::internal_error(
620 format!("Error getting prompt: {err}"),
621 None,
622 ))
623 }
624 }
625 }
626 _ = context.ct.cancelled() => {
627 info!("[get_prompt] Request canceled - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
628 Err(ErrorData::internal_error(
629 "Request cancelled".to_string(),
630 None,
631 ))
632 }
633 }
634 }
635 None => {
636 warn!("Server doesn't support prompts capability");
638 let messages = Vec::new();
639 Ok(rmcp::model::GetPromptResult::new(messages))
640 }
641 }
642 }
643
644 async fn complete(
645 &self,
646 request: rmcp::model::CompleteRequestParams,
647 context: RequestContext<RoleServer>,
648 ) -> Result<rmcp::model::CompleteResult, ErrorData> {
649 let inner_guard = self.peer.load();
651 let inner = inner_guard.as_ref().ok_or_else(|| {
652 error!("Backend connection is not available (reconnecting)");
653 ErrorData::internal_error(
654 "Backend connection is not available, reconnecting...".to_string(),
655 None,
656 )
657 })?;
658
659 if inner.peer.is_transport_closed() {
661 error!("Backend transport is closed");
662 return Err(ErrorData::internal_error(
663 "Backend connection closed, please retry".to_string(),
664 None,
665 ));
666 }
667
668 tokio::select! {
669 result = inner.peer.complete(request) => {
670 match result {
671 Ok(result) => {
672 debug!("Proxying complete response");
673 Ok(result)
674 }
675 Err(err) => {
676 error!("Error completing: {:?}", err);
677 Err(ErrorData::internal_error(
678 format!("Error completing: {err}"),
679 None,
680 ))
681 }
682 }
683 }
684 _ = context.ct.cancelled() => {
685 info!("[complete] Request canceled - MCP ID: {}", self.mcp_id);
686 Err(ErrorData::internal_error(
687 "Request cancelled".to_string(),
688 None,
689 ))
690 }
691 }
692 }
693
694 async fn on_progress(
695 &self,
696 notification: rmcp::model::ProgressNotificationParam,
697 _context: NotificationContext<RoleServer>,
698 ) {
699 let inner_guard = self.peer.load();
701 let inner = match inner_guard.as_ref() {
702 Some(inner) => inner,
703 None => {
704 error!("Backend connection is not available, cannot forward progress notification");
705 return;
706 }
707 };
708
709 if inner.peer.is_transport_closed() {
711 error!("Backend transport is closed, cannot forward progress notification");
712 return;
713 }
714
715 match inner.peer.notify_progress(notification).await {
716 Ok(_) => {
717 debug!("Proxying progress notification");
718 }
719 Err(err) => {
720 error!("Error notifying progress: {:?}", err);
721 }
722 }
723 }
724
725 async fn on_cancelled(
726 &self,
727 notification: rmcp::model::CancelledNotificationParam,
728 _context: NotificationContext<RoleServer>,
729 ) {
730 let inner_guard = self.peer.load();
732 let inner = match inner_guard.as_ref() {
733 Some(inner) => inner,
734 None => {
735 error!(
736 "Backend connection is not available, cannot forward cancelled notification"
737 );
738 return;
739 }
740 };
741
742 if inner.peer.is_transport_closed() {
744 error!("Backend transport is closed, cannot forward cancelled notification");
745 return;
746 }
747
748 match inner.peer.notify_cancelled(notification).await {
749 Ok(_) => {
750 debug!("Proxying cancelled notification");
751 }
752 Err(err) => {
753 error!("Error notifying cancelled: {:?}", err);
754 }
755 }
756 }
757}
758
759impl ProxyHandler {
760 #[inline]
762 fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
763 &self.cached_info.capabilities
764 }
765
766 fn default_server_info(mcp_id: &str) -> ServerInfo {
768 warn!(
769 "[ProxyHandler] Create default ServerInfo - MCP ID: {}",
770 mcp_id
771 );
772 ServerInfo::new(rmcp::model::ServerCapabilities::default())
773 .with_server_info(Implementation::new("MCP Proxy", "0.1.0"))
774 }
775
776 fn extract_server_info(
778 client: &RunningService<RoleClient, ClientInfo>,
779 mcp_id: &str,
780 ) -> ServerInfo {
781 client
782 .peer_info()
783 .map(|peer_info| {
784 ServerInfo::new(peer_info.capabilities.clone())
785 .with_protocol_version(peer_info.protocol_version.clone())
786 .with_server_info(Implementation::new(
787 peer_info.server_info.name.clone(),
788 peer_info.server_info.version.clone(),
789 ))
790 .with_instructions(peer_info.instructions.clone().unwrap_or_default())
791 })
792 .unwrap_or_else(|| Self::default_server_info(mcp_id))
793 }
794
795 pub fn new_disconnected(
798 mcp_id: String,
799 tool_filter: ToolFilter,
800 default_info: ServerInfo,
801 ) -> Self {
802 info!(
803 "[ProxyHandler] Create a disconnected handler - MCP ID: {}",
804 mcp_id
805 );
806
807 if tool_filter.is_enabled() {
809 if let Some(ref allow_list) = tool_filter.allow_tools {
810 info!(
811 "[ProxyHandler] Tool whitelist enabled - MCP ID: {}, allowed tools: {:?}",
812 mcp_id, allow_list
813 );
814 }
815 if let Some(ref deny_list) = tool_filter.deny_tools {
816 info!(
817 "[ProxyHandler] Tool blacklist enabled - MCP ID: {}, excluded tools: {:?}",
818 mcp_id, deny_list
819 );
820 }
821 }
822
823 Self {
824 peer: Arc::new(ArcSwapOption::empty()),
825 cached_info: default_info,
826 mcp_id,
827 tool_filter,
828 backend_version: Arc::new(AtomicU64::new(0)), }
830 }
831
832 pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
833 Self::with_mcp_id(client, "unknown".to_string())
834 }
835
836 pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
837 Self::with_tool_filter(client, mcp_id, ToolFilter::default())
838 }
839
840 pub fn with_tool_filter(
842 client: RunningService<RoleClient, ClientInfo>,
843 mcp_id: String,
844 tool_filter: ToolFilter,
845 ) -> Self {
846 use std::ops::Deref;
847
848 let cached_info = Self::extract_server_info(&client, &mcp_id);
850
851 let peer = client.deref().clone();
853
854 if tool_filter.is_enabled() {
856 if let Some(ref allow_list) = tool_filter.allow_tools {
857 info!(
858 "[ProxyHandler] Tool whitelist enabled - MCP ID: {}, allowed tools: {:?}",
859 mcp_id, allow_list
860 );
861 }
862 if let Some(ref deny_list) = tool_filter.deny_tools {
863 info!(
864 "[ProxyHandler] Tool blacklist enabled - MCP ID: {}, excluded tools: {:?}",
865 mcp_id, deny_list
866 );
867 }
868 }
869
870 let inner = PeerInner {
872 peer,
873 _running: Arc::new(client),
874 };
875
876 Self {
877 peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
878 cached_info,
879 mcp_id,
880 tool_filter,
881 backend_version: Arc::new(AtomicU64::new(1)), }
883 }
884
885 pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
891 use std::ops::Deref;
892
893 match new_client {
894 Some(client) => {
895 let peer = client.deref().clone();
896 let inner = PeerInner {
897 peer,
898 _running: Arc::new(client),
899 };
900 self.peer.store(Some(Arc::new(inner)));
901 info!(
902 "[ProxyHandler] Backend connection updated - MCP ID: {}",
903 self.mcp_id
904 );
905 }
906 None => {
907 self.peer.store(None);
908 info!(
909 "[ProxyHandler] Backend connection disconnected - MCP ID: {}",
910 self.mcp_id
911 );
912 }
913 }
914
915 let new_version = self.backend_version.fetch_add(1, Ordering::SeqCst) + 1;
917 info!(
918 "[ProxyHandler] Backend version update: {} - MCP ID: {}",
919 new_version, self.mcp_id
920 );
921 }
922
923 pub fn is_backend_available(&self) -> bool {
925 let inner_guard = self.peer.load();
926 match inner_guard.as_ref() {
927 Some(inner) => !inner.peer.is_transport_closed(),
928 None => false,
929 }
930 }
931
932 pub async fn is_mcp_server_ready(&self) -> bool {
934 !self.is_terminated_async().await
935 }
936
937 pub fn is_terminated(&self) -> bool {
939 !self.is_backend_available()
940 }
941
942 pub async fn is_terminated_async(&self) -> bool {
944 let inner_guard = self.peer.load();
946 let inner = match inner_guard.as_ref() {
947 Some(inner) => inner,
948 None => return true,
949 };
950
951 if inner.peer.is_transport_closed() {
953 return true;
954 }
955
956 match inner.peer.list_tools(None).await {
958 Ok(_) => {
959 debug!("Backend connection status check: OK");
960 false
961 }
962 Err(e) => {
963 info!("Backend connection status check: Disconnected, reason: {e}");
964 true
965 }
966 }
967 }
968
969 pub fn mcp_id(&self) -> &str {
971 &self.mcp_id
972 }
973
974 pub fn get_backend_version(&self) -> u64 {
982 self.backend_version.load(Ordering::SeqCst)
983 }
984
985 pub fn swap_backend_from_connection(
994 &self,
995 conn: Option<crate::client::StreamClientConnection>,
996 ) {
997 match conn {
998 Some(c) => {
999 let running = c.into_running_service();
1000 self.swap_backend(Some(running));
1001 }
1002 None => {
1003 self.swap_backend(None);
1004 }
1005 }
1006 }
1007}