1use crate::config::mcp::{
7 McpAllowListConfig, McpClientConfig, McpProviderConfig, McpTransportConfig,
8};
9use anyhow::{Context, Result};
10use async_trait::async_trait;
11use parking_lot::RwLock;
12use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
13use rmcp::{
14 ServiceExt,
15 handler::client::ClientHandler,
16 model::{
17 CallToolRequestParam, CallToolResult, ClientCapabilities, ClientInfo, Implementation,
18 ListToolsResult, LoggingLevel, LoggingMessageNotificationParam, RootsCapabilities,
19 },
20 transport::TokioChildProcess,
21};
22use serde_json::{Map, Value};
23use std::collections::HashMap;
24use std::future;
25use std::sync::Arc;
26use tokio::process::Command;
27use tokio::sync::Mutex;
28use tracing::{Level, debug, error, info, warn};
29
30#[derive(Clone)]
31struct LoggingClientHandler {
32 provider_name: String,
33 info: ClientInfo,
34}
35
36impl LoggingClientHandler {
37 fn new(provider_name: &str) -> Self {
38 let mut info = ClientInfo::default();
39 info.capabilities = ClientCapabilities {
40 roots: Some(RootsCapabilities {
41 list_changed: Some(true),
42 }),
43 ..ClientCapabilities::default()
44 };
45 info.client_info = Implementation {
46 name: "vtcode".to_string(),
47 title: Some("VT Code MCP client".to_string()),
48 version: env!("CARGO_PKG_VERSION").to_string(),
49 icons: None,
50 website_url: Some("https://github.com/modelcontextprotocol".to_string()),
51 };
52
53 Self {
54 provider_name: provider_name.to_string(),
55 info,
56 }
57 }
58
59 fn handle_logging(&self, params: LoggingMessageNotificationParam) {
60 let payload = params.data;
61 let summary = payload
62 .get("message")
63 .and_then(Value::as_str)
64 .map(str::to_owned)
65 .unwrap_or_else(|| payload.to_string());
66
67 match params.level {
68 LoggingLevel::Debug => debug!(
69 provider = self.provider_name.as_str(),
70 summary = %summary,
71 payload = ?payload,
72 "MCP provider log"
73 ),
74 LoggingLevel::Info | LoggingLevel::Notice => info!(
75 provider = self.provider_name.as_str(),
76 summary = %summary,
77 payload = ?payload,
78 "MCP provider log"
79 ),
80 LoggingLevel::Warning => warn!(
81 provider = self.provider_name.as_str(),
82 summary = %summary,
83 payload = ?payload,
84 "MCP provider warning"
85 ),
86 LoggingLevel::Error
87 | LoggingLevel::Critical
88 | LoggingLevel::Alert
89 | LoggingLevel::Emergency => error!(
90 provider = self.provider_name.as_str(),
91 summary = %summary,
92 payload = ?payload,
93 "MCP provider error"
94 ),
95 }
96 }
97}
98
99impl ClientHandler for LoggingClientHandler {
100 fn on_logging_message(
101 &self,
102 params: LoggingMessageNotificationParam,
103 _context: rmcp::service::NotificationContext<rmcp::service::RoleClient>,
104 ) -> impl std::future::Future<Output = ()> + Send + '_ {
105 self.handle_logging(params);
106 future::ready(())
107 }
108
109 fn get_info(&self) -> ClientInfo {
110 self.info.clone()
111 }
112}
113
114pub struct McpClient {
116 config: McpClientConfig,
117 pub providers: HashMap<String, Arc<McpProvider>>,
118 active_connections: Arc<Mutex<HashMap<String, Arc<RunningMcpService>>>>,
119 allowlist: Arc<RwLock<McpAllowListConfig>>,
120 tool_provider_index: Arc<RwLock<HashMap<String, String>>>,
121}
122
123impl McpClient {
124 pub fn new(config: McpClientConfig) -> Self {
126 let allowlist = Arc::new(RwLock::new(config.allowlist.clone()));
127 Self {
128 config,
129 providers: HashMap::new(),
130 active_connections: Arc::new(Mutex::new(HashMap::new())),
131 allowlist,
132 tool_provider_index: Arc::new(RwLock::new(HashMap::new())),
133 }
134 }
135
136 fn record_tool_provider(&self, provider: &str, tool: &str) {
137 debug!("Recording tool '{}' -> provider '{}'", tool, provider);
138 self.tool_provider_index
139 .write()
140 .insert(tool.to_string(), provider.to_string());
141 }
142
143 pub fn provider_for_tool(&self, tool_name: &str) -> Option<String> {
145 let index = self.tool_provider_index.read();
146 if let Some(provider) = index.get(tool_name) {
147 if self.providers.contains_key(provider) {
149 debug!("Found tool '{}' in provider '{}'", tool_name, provider);
150 Some(provider.clone())
151 } else {
152 debug!(
153 "Tool '{}' references non-existent provider '{}'",
154 tool_name, provider
155 );
156 None
157 }
158 } else {
159 debug!("Tool '{}' not found in provider index", tool_name);
160 None
161 }
162 }
163
164 pub fn update_allowlist(&self, allowlist: McpAllowListConfig) {
166 *self.allowlist.write() = allowlist;
167 }
168
169 pub fn current_allowlist(&self) -> McpAllowListConfig {
171 self.allowlist.read().clone()
172 }
173
174 fn format_tool_result(
175 provider_name: &str,
176 tool_name: &str,
177 result: CallToolResult,
178 ) -> Result<Value> {
179 let is_error = result.is_error.unwrap_or(false);
180 let text_summary = result
181 .content
182 .iter()
183 .find_map(|content| content.as_text().map(|text| text.text.clone()));
184
185 if is_error {
186 let detail = result
187 .structured_content
188 .as_ref()
189 .and_then(|value| value.get("message").and_then(Value::as_str))
190 .map(str::to_owned)
191 .or_else(|| {
192 result
193 .structured_content
194 .as_ref()
195 .map(|value| value.to_string())
196 })
197 .or(text_summary)
198 .unwrap_or_else(|| "Unknown MCP tool error".to_string());
199
200 return Err(anyhow::anyhow!(
201 "MCP tool '{}' on provider '{}' reported an error: {}",
202 tool_name,
203 provider_name,
204 detail
205 ));
206 }
207
208 let mut payload = Map::new();
209 payload.insert("provider".into(), Value::String(provider_name.to_string()));
210 payload.insert("tool".into(), Value::String(tool_name.to_string()));
211
212 if let Some(meta) = result.meta {
213 if let Ok(meta_value) = serde_json::to_value(&meta) {
214 if !meta_value.is_null() {
215 payload.insert("meta".into(), meta_value);
216 }
217 }
218 }
219
220 if let Some(structured) = result.structured_content {
221 match structured {
222 Value::Object(mut object) => {
223 object
224 .entry("provider")
225 .or_insert_with(|| Value::String(provider_name.to_string()));
226 object
227 .entry("tool")
228 .or_insert_with(|| Value::String(tool_name.to_string()));
229
230 if let Some(meta_value) = payload.remove("meta") {
231 object.entry("meta").or_insert(meta_value);
232 }
233
234 return Ok(Value::Object(object));
235 }
236 other => {
237 payload.insert("structured_content".into(), other);
238 }
239 }
240 }
241
242 if let Some(summary) = text_summary {
243 payload.insert("message".into(), Value::String(summary));
244 }
245
246 if !result.content.is_empty() {
247 if let Ok(content_value) = serde_json::to_value(&result.content) {
248 payload.insert("content".into(), content_value);
249 }
250 }
251
252 Ok(Value::Object(payload))
253 }
254
255 pub async fn initialize(&mut self) -> Result<()> {
257 if !self.config.enabled {
258 info!("MCP client is disabled in configuration");
259 return Ok(());
260 }
261
262 info!(
263 "Initializing MCP client with {} configured providers",
264 self.config.providers.len()
265 );
266
267 for provider_config in &self.config.providers {
268 if provider_config.enabled {
269 info!("Initializing MCP provider '{}'", provider_config.name);
270
271 match McpProvider::new(provider_config.clone()).await {
272 Ok(provider) => {
273 let provider = Arc::new(provider);
274 self.providers
275 .insert(provider_config.name.clone(), provider);
276 info!(
277 "Successfully initialized MCP provider '{}'",
278 provider_config.name
279 );
280 self.audit_log(
281 Some(provider_config.name.as_str()),
282 "mcp.provider_initialized",
283 Level::INFO,
284 format!("Provider '{}' initialized", provider_config.name),
285 );
286 }
287 Err(e) => {
288 error!(
289 "Failed to initialize MCP provider '{}': {}",
290 provider_config.name, e
291 );
292 self.audit_log(
293 Some(provider_config.name.as_str()),
294 "mcp.provider_initialization_failed",
295 Level::WARN,
296 format!(
297 "Failed to initialize provider '{}' due to error: {}",
298 provider_config.name, e
299 ),
300 );
301 continue;
303 }
304 }
305 } else {
306 debug!(
307 "MCP provider '{}' is disabled, skipping",
308 provider_config.name
309 );
310 }
311 }
312
313 info!(
314 "MCP client initialization complete. Active providers: {}",
315 self.providers.len()
316 );
317
318 Ok(())
323 }
324
325 async fn kill_remaining_mcp_processes(&self) {
327 debug!("Checking for remaining MCP provider processes to clean up");
328
329 let process_cleanup_attempts = tokio::time::timeout(
332 tokio::time::Duration::from_secs(5),
333 self.attempt_process_cleanup(),
334 )
335 .await;
336
337 match process_cleanup_attempts {
338 Ok(Ok(cleaned_count)) => {
339 if cleaned_count > 0 {
340 info!(
341 "Cleaned up {} remaining MCP provider processes",
342 cleaned_count
343 );
344 self.audit_log(
345 None,
346 "mcp.process_cleanup",
347 Level::INFO,
348 format!(
349 "Cleaned up {} remaining MCP provider processes",
350 cleaned_count
351 ),
352 );
353 } else {
354 debug!("No remaining MCP provider processes to clean up");
355 }
356 }
357 Ok(Err(e)) => {
358 warn!("Error during MCP process cleanup (non-critical): {}", e);
359 self.audit_log(
360 None,
361 "mcp.process_cleanup_error",
362 Level::WARN,
363 format!("Error during MCP process cleanup: {}", e),
364 );
365 }
366 Err(_) => {
367 warn!("MCP process cleanup timed out (non-critical)");
368 self.audit_log(
369 None,
370 "mcp.process_cleanup_timeout",
371 Level::WARN,
372 "MCP process cleanup timed out".to_string(),
373 );
374 }
375 }
376 }
377
378 async fn attempt_process_cleanup(&self) -> Result<usize> {
380 use tokio::process::Command as TokioCommand;
381
382 let mut cleaned_count = 0;
383
384 let current_pid = std::process::id();
386
387 for provider_config in &self.config.providers {
390 if !provider_config.enabled {
391 continue;
392 }
393
394 let provider_name = &provider_config.name;
395 debug!("Attempting cleanup for MCP provider '{}'", provider_name);
396
397 let mut provider_cleaned = 0;
399
400 if let Ok(output) = TokioCommand::new("pgrep")
402 .args(["-f", &format!("mcp-server-{}", provider_name)])
403 .output()
404 .await
405 {
406 if output.status.success() {
407 let pids = String::from_utf8_lossy(&output.stdout);
408 for pid_str in pids.lines() {
409 if let Ok(pid) = pid_str.trim().parse::<u32>() {
410 if pid != current_pid && pid > 0 {
411 if self.kill_process_gracefully(pid).await {
412 provider_cleaned += 1;
413 }
414 }
415 }
416 }
417 }
418 }
419
420 if provider_cleaned == 0 {
422 if let Ok(output) = TokioCommand::new("ps").args(["aux"]).output().await {
423 if output.status.success() {
424 let processes = String::from_utf8_lossy(&output.stdout);
425 for line in processes.lines() {
426 if line.contains(provider_name)
428 && (line.contains("mcp")
429 || line.contains("node")
430 || line.contains("python"))
431 {
432 let parts: Vec<&str> = line.split_whitespace().collect();
434 if let Some(pid_str) = parts.first() {
435 if let Ok(pid) = pid_str.parse::<u32>() {
436 if pid != current_pid && pid > 0 {
437 if self.kill_process_gracefully(pid).await {
438 provider_cleaned += 1;
439 }
440 }
441 }
442 }
443 }
444 }
445 }
446 }
447 }
448
449 if provider_cleaned > 0 {
450 debug!(
451 "Cleaned up {} processes for MCP provider '{}'",
452 provider_cleaned, provider_name
453 );
454 cleaned_count += provider_cleaned;
455 self.tool_provider_index.write().clear();
457 }
458 }
459
460 Ok(cleaned_count)
461 }
462
463 async fn kill_process_gracefully(&self, pid: u32) -> bool {
465 debug!("Killing process {} gracefully", pid);
466
467 let _ = tokio::process::Command::new("kill")
469 .args(["-TERM", &pid.to_string()])
470 .output()
471 .await;
472
473 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
475
476 if let Ok(output) = tokio::process::Command::new("kill")
478 .args(["-0", &pid.to_string()]) .output()
480 .await
481 {
482 if output.status.success() {
483 debug!("Process {} still running, force killing", pid);
485 let _ = tokio::process::Command::new("kill")
486 .args(["-KILL", &pid.to_string()])
487 .output()
488 .await;
489 true
490 } else {
491 debug!("Process {} already terminated", pid);
493 true
494 }
495 } else {
496 debug!("Process {} check failed, assuming terminated", pid);
498 true
499 }
500 }
501
502 pub async fn cleanup_dead_providers(&self) -> Result<()> {
504 let mut dead_providers = Vec::new();
505
506 for (provider_name, provider) in &self.providers {
507 let provider_health_check = tokio::time::timeout(
509 tokio::time::Duration::from_secs(2),
510 provider.has_tool("ping"),
511 )
512 .await;
513
514 match provider_health_check {
515 Ok(Ok(_)) => {
516 debug!("MCP provider '{}' is healthy", provider_name);
518 }
519 Ok(Err(e)) => {
520 let error_msg = e.to_string();
521 if error_msg.contains("No such process") || error_msg.contains("ESRCH") {
522 warn!(
523 "MCP provider '{}' has terminated process, marking for cleanup",
524 provider_name
525 );
526 dead_providers.push(provider_name.clone());
527 } else {
528 debug!(
529 "MCP provider '{}' returned error but process may be alive: {}",
530 provider_name, e
531 );
532 }
533 }
534 Err(_timeout) => {
535 warn!(
536 "MCP provider '{}' health check timed out, may be unresponsive",
537 provider_name
538 );
539 }
541 }
542 }
543
544 if !dead_providers.is_empty() {
547 warn!(
548 "Found {} dead MCP providers: {:?}",
549 dead_providers.len(),
550 dead_providers
551 );
552 }
553
554 Ok(())
555 }
556
557 pub async fn list_tools(&self) -> Result<Vec<McpToolInfo>> {
559 if !self.config.enabled {
560 debug!("MCP client is disabled, returning empty tool list");
561 return Ok(Vec::new());
562 }
563
564 if self.providers.is_empty() {
565 debug!("No MCP providers configured, returning empty tool list");
566 return Ok(Vec::new());
567 }
568
569 let mut all_tools = Vec::new();
570 let mut errors = Vec::new();
571
572 let allowlist_snapshot = self.allowlist.read().clone();
573
574 for (provider_name, provider) in &self.providers {
575 let provider_id = provider_name.as_str();
576 match tokio::time::timeout(tokio::time::Duration::from_secs(15), provider.list_tools())
577 .await
578 {
579 Ok(Ok(tools)) => {
580 debug!(
581 "Provider '{}' has {} tools",
582 provider_name,
583 tools.tools.len()
584 );
585
586 for tool in tools.tools {
587 let tool_name = tool.name.as_ref();
588
589 if allowlist_snapshot.is_tool_allowed(provider_id, tool_name) {
590 self.record_tool_provider(provider_id, tool_name);
591 all_tools.push(McpToolInfo {
592 name: tool_name.to_string(),
593 description: tool.description.unwrap_or_default().to_string(),
594 provider: provider_name.clone(),
595 input_schema: serde_json::to_value(&*tool.input_schema)
596 .unwrap_or(Value::Null),
597 });
598 } else {
599 self.audit_log(
600 Some(provider_id),
601 "mcp.tool_filtered",
602 Level::DEBUG,
603 format!(
604 "Filtered tool '{}' from provider '{}' due to allow list",
605 tool_name, provider_id
606 ),
607 );
608 }
609 }
610 }
611 Ok(Err(e)) => {
612 let error_msg = e.to_string();
613 if error_msg.contains("No such process")
614 || error_msg.contains("ESRCH")
615 || error_msg.contains("EPIPE")
616 || error_msg.contains("Broken pipe")
617 || error_msg.contains("write EPIPE")
618 {
619 debug!(
620 "MCP provider '{}' process/pipe terminated during tool listing (normal during shutdown): {}",
621 provider_name, e
622 );
623 } else {
624 warn!(
625 "Failed to list tools for provider '{}': {}",
626 provider_name, e
627 );
628 }
629 let error_msg = format!(
630 "Failed to list tools for provider '{}': {}",
631 provider_name, e
632 );
633 errors.push(error_msg);
634 }
635 Err(_timeout) => {
636 warn!("MCP provider '{}' tool listing timed out", provider_name);
637 let error_msg =
638 format!("Tool listing timeout for provider '{}'", provider_name);
639 errors.push(error_msg);
640 }
641 }
642 }
643
644 if !errors.is_empty() {
645 warn!(
646 "Encountered {} errors while listing MCP tools: {:?}",
647 errors.len(),
648 errors
649 );
650 }
651
652 info!(
653 "Found {} total MCP tools across all providers",
654 all_tools.len()
655 );
656 Ok(all_tools)
657 }
658
659 pub async fn execute_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
661 if !self.config.enabled {
662 return Err(anyhow::anyhow!("MCP client is disabled"));
663 }
664
665 if self.providers.is_empty() {
666 return Err(anyhow::anyhow!("No MCP providers configured"));
667 }
668
669 let tool_name_owned = tool_name.to_string();
670 debug!("Executing MCP tool '{}' with args: {}", tool_name, args);
671
672 let provider_name = {
674 let mut found_provider = None;
675 let mut provider_errors = Vec::new();
676
677 for (name, provider) in &self.providers {
678 match provider.has_tool(&tool_name_owned).await {
679 Ok(true) => {
680 found_provider = Some(name.clone());
681 break;
682 }
683 Ok(false) => continue,
684 Err(e) => {
685 let error_msg = format!(
686 "Error checking tool availability for provider '{}': {}",
687 name, e
688 );
689 warn!("{}", error_msg);
690 provider_errors.push(error_msg);
691 }
692 }
693 }
694
695 found_provider.ok_or_else(|| {
696 let error_msg = format!(
697 "Tool '{}' not found in any MCP provider. Provider errors: {:?}",
698 tool_name, provider_errors
699 );
700 anyhow::anyhow!(error_msg)
701 })?
702 };
703
704 debug!("Found tool '{}' in provider '{}'", tool_name, provider_name);
705
706 if !self
707 .allowlist
708 .read()
709 .is_tool_allowed(provider_name.as_str(), tool_name)
710 {
711 let message = format!(
712 "Tool '{}' from provider '{}' is not permitted by the MCP allow list",
713 tool_name, provider_name
714 );
715 self.audit_log(
716 Some(provider_name.as_str()),
717 "mcp.tool_denied",
718 Level::WARN,
719 message.as_str(),
720 );
721 return Err(anyhow::anyhow!(message));
722 }
723
724 self.record_tool_provider(provider_name.as_str(), tool_name);
725
726 let provider = self.providers.get(&provider_name).ok_or_else(|| {
727 anyhow::anyhow!("Provider '{}' not found after discovery", provider_name)
728 })?;
729
730 let connection = match self.get_or_create_connection(provider).await {
732 Ok(conn) => conn,
733 Err(e) => {
734 error!(
735 "Failed to establish connection to provider '{}': {}",
736 provider_name, e
737 );
738 return Err(e);
739 }
740 };
741
742 match connection
744 .call_tool(CallToolRequestParam {
745 name: tool_name_owned.into(),
746 arguments: args.as_object().cloned(),
747 })
748 .await
749 {
750 Ok(result) => match Self::format_tool_result(provider_name.as_str(), tool_name, result)
751 {
752 Ok(serialized) => {
753 info!(
754 "Successfully executed MCP tool '{}' via provider '{}'",
755 tool_name, provider_name
756 );
757 self.audit_log(
758 Some(provider_name.as_str()),
759 "mcp.tool_execution",
760 Level::INFO,
761 format!(
762 "Successfully executed MCP tool '{}' via provider '{}'",
763 tool_name, provider_name
764 ),
765 );
766 Ok(serialized)
767 }
768 Err(err) => {
769 let err_message = err.to_string();
770 warn!(
771 "MCP tool '{}' via provider '{}' returned an error payload: {}",
772 tool_name, provider_name, err_message
773 );
774 self.audit_log(
775 Some(provider_name.as_str()),
776 "mcp.tool_failed",
777 Level::WARN,
778 format!(
779 "MCP tool '{}' via provider '{}' returned an error payload: {}",
780 tool_name, provider_name, err_message
781 ),
782 );
783 Err(err)
784 }
785 },
786 Err(e) => {
787 let error_message = e.to_string();
788
789 error!(
790 "MCP tool '{}' failed on provider '{}': {}",
791 tool_name, provider_name, error_message
792 );
793 self.audit_log(
794 Some(provider_name.as_str()),
795 "mcp.tool_failed",
796 Level::WARN,
797 format!(
798 "MCP tool '{}' failed on provider '{}': {}",
799 tool_name, provider_name, error_message
800 ),
801 );
802
803 if error_message.contains("EPIPE")
805 || error_message.contains("Broken pipe")
806 || error_message.contains("write EPIPE")
807 || error_message.contains("No such process")
808 || error_message.contains("ESRCH")
809 {
810 let mut connections = self.active_connections.lock().await;
812 connections.remove(&provider_name);
813 self.tool_provider_index
815 .write()
816 .retain(|_, provider| provider != &provider_name);
817
818 return Err(anyhow::anyhow!(
819 "MCP provider '{}' disconnected unexpectedly while executing '{}'. The provider process may have terminated. Try re-running the command to restart the provider.",
820 provider_name,
821 tool_name
822 ));
823 } else if error_message.contains("timeout") || error_message.contains("Timeout") {
824 let mut connections = self.active_connections.lock().await;
826 connections.remove(&provider_name);
827
828 return Err(anyhow::anyhow!(
829 "MCP tool '{}' execution timed out on provider '{}'. The provider may be unresponsive. Try re-running the command.",
830 tool_name,
831 provider_name
832 ));
833 } else if error_message.contains("permission")
834 || error_message.contains("Permission denied")
835 {
836 return Err(anyhow::anyhow!(
837 "Permission denied executing MCP tool '{}' on provider '{}': {}",
838 tool_name,
839 provider_name,
840 error_message
841 ));
842 } else if error_message.contains("network")
843 || error_message.contains("Connection refused")
844 {
845 return Err(anyhow::anyhow!(
846 "Network error executing MCP tool '{}' on provider '{}': {}",
847 tool_name,
848 provider_name,
849 error_message
850 ));
851 }
852
853 Err(anyhow::anyhow!(
854 "MCP tool execution failed: {}",
855 error_message
856 ))
857 }
858 }
859 }
860
861 async fn get_or_create_connection(
863 &self,
864 provider: &McpProvider,
865 ) -> Result<Arc<RunningMcpService>> {
866 let provider_name = &provider.config.name;
867 debug!("Getting connection for MCP provider '{}'", provider_name);
868
869 let mut connections = self.active_connections.lock().await;
870
871 if !connections.contains_key(provider_name) {
872 debug!("Creating new connection for provider '{}'", provider_name);
873
874 match tokio::time::timeout(tokio::time::Duration::from_secs(30), provider.connect())
876 .await
877 {
878 Ok(Ok(connection)) => {
879 let connection = Arc::new(connection);
880 connections.insert(provider_name.clone(), Arc::clone(&connection));
881 debug!(
882 "Successfully created connection for provider '{}'",
883 provider_name
884 );
885 Ok(connection)
886 }
887 Ok(Err(e)) => {
888 let error_msg = e.to_string();
889 if error_msg.contains("HTTP MCP server support") {
890 warn!(
891 "Provider '{}' uses HTTP transport which is not fully implemented: {}",
892 provider_name, e
893 );
894 Err(anyhow::anyhow!(
895 "HTTP MCP transport not fully implemented for provider '{}'. Consider using stdio transport instead.",
896 provider_name
897 ))
898 } else if error_msg.contains("command not found")
899 || error_msg.contains("No such file")
900 {
901 error!("Command not found for provider '{}': {}", provider_name, e);
902 Err(anyhow::anyhow!(
903 "Command '{}' not found for MCP provider '{}'. Please ensure the MCP server is installed and accessible.",
904 self.config
905 .providers
906 .iter()
907 .find(|p| p.name == *provider_name)
908 .map(|p| match &p.transport {
909 McpTransportConfig::Stdio(stdio) => stdio.command.as_str(),
910 _ => "unknown",
911 })
912 .unwrap_or("unknown"),
913 provider_name
914 ))
915 } else if error_msg.contains("permission")
916 || error_msg.contains("Permission denied")
917 {
918 error!(
919 "Permission denied creating connection for provider '{}': {}",
920 provider_name, e
921 );
922 Err(anyhow::anyhow!(
923 "Permission denied executing MCP server for provider '{}': {}",
924 provider_name,
925 error_msg
926 ))
927 } else {
928 error!(
929 "Failed to create connection for provider '{}': {}",
930 provider_name, e
931 );
932 Err(anyhow::anyhow!(
933 "Failed to create connection for MCP provider '{}': {}",
934 provider_name,
935 error_msg
936 ))
937 }
938 }
939 Err(_timeout) => {
940 error!(
941 "Connection creation timed out for provider '{}' after {} seconds",
942 provider_name, 30
943 );
944 Err(anyhow::anyhow!(
945 "Connection creation timed out for MCP provider '{}' after {} seconds. The provider may be slow to start or unresponsive.",
946 provider_name,
947 30
948 ))
949 }
950 }
951 } else {
952 let existing_connection = connections.get(provider_name).unwrap().clone();
954
955 if let Err(e) = self
957 .validate_connection(provider_name, &existing_connection)
958 .await
959 {
960 debug!(
961 "Existing connection for provider '{}' is unhealthy, creating new one: {}",
962 provider_name, e
963 );
964
965 connections.remove(provider_name);
967
968 match tokio::time::timeout(tokio::time::Duration::from_secs(30), provider.connect())
970 .await
971 {
972 Ok(Ok(connection)) => {
973 let connection = Arc::new(connection);
974 connections.insert(provider_name.clone(), Arc::clone(&connection));
975 debug!(
976 "Successfully created new connection for provider '{}'",
977 provider_name
978 );
979 Ok(connection)
980 }
981 Ok(Err(e)) => {
982 error!(
983 "Failed to create replacement connection for provider '{}': {}",
984 provider_name, e
985 );
986 Err(e)
987 }
988 Err(_timeout) => {
989 error!(
990 "Replacement connection creation timed out for provider '{}'",
991 provider_name
992 );
993 Err(anyhow::anyhow!(
994 "Replacement connection timeout for provider '{}'",
995 provider_name
996 ))
997 }
998 }
999 } else {
1000 debug!(
1001 "Reusing existing healthy connection for provider '{}'",
1002 provider_name
1003 );
1004 Ok(existing_connection)
1005 }
1006 }
1007 }
1008
1009 async fn validate_connection(
1011 &self,
1012 provider_name: &str,
1013 connection: &RunningMcpService,
1014 ) -> Result<()> {
1015 debug!(
1016 "Validating connection health for provider '{}'",
1017 provider_name
1018 );
1019
1020 match tokio::time::timeout(
1023 tokio::time::Duration::from_secs(2),
1024 connection.list_tools(Default::default()),
1025 )
1026 .await
1027 {
1028 Ok(Ok(_)) => {
1029 debug!(
1030 "Connection health check passed for provider '{}'",
1031 provider_name
1032 );
1033 Ok(())
1034 }
1035 Ok(Err(e)) => {
1036 let error_msg = e.to_string();
1037 debug!(
1038 "Connection health check failed for provider '{}': {}",
1039 provider_name, error_msg
1040 );
1041 Err(anyhow::anyhow!(
1042 "Connection health check failed for provider '{}': {}",
1043 provider_name,
1044 error_msg
1045 ))
1046 }
1047 Err(_) => {
1048 debug!(
1049 "Connection health check timed out for provider '{}'",
1050 provider_name
1051 );
1052 Err(anyhow::anyhow!(
1053 "Connection health check timed out for provider '{}'",
1054 provider_name
1055 ))
1056 }
1057 }
1058 }
1059
1060 fn audit_log(
1061 &self,
1062 provider: Option<&str>,
1063 channel: &str,
1064 level: Level,
1065 message: impl AsRef<str>,
1066 ) {
1067 let logging_allowed = {
1068 let allowlist = self.allowlist.read();
1069 allowlist.is_logging_channel_allowed(provider, channel)
1070 };
1071
1072 if !logging_allowed {
1073 return;
1074 }
1075
1076 let msg = message.as_ref();
1077 match level {
1078 Level::ERROR => error!(target: "mcp", "[{}] {}", channel, msg),
1079 Level::WARN => warn!(target: "mcp", "[{}] {}", channel, msg),
1080 Level::INFO => info!(target: "mcp", "[{}] {}", channel, msg),
1081 Level::DEBUG => debug!(target: "mcp", "[{}] {}", channel, msg),
1082 _ => debug!(target: "mcp", "[{}] {}", channel, msg),
1083 }
1084 }
1085
1086 pub async fn shutdown(&self) -> Result<()> {
1088 info!("Shutting down MCP client and all provider connections");
1089
1090 let mut connections = self.active_connections.lock().await;
1091
1092 if connections.is_empty() {
1093 info!("No active MCP connections to shutdown");
1094 return Ok(());
1095 }
1096
1097 info!(
1098 "Shutting down {} MCP provider connections",
1099 connections.len()
1100 );
1101
1102 let cancellation_tokens: Vec<(String, rmcp::service::RunningServiceCancellationToken)> =
1103 connections
1104 .iter()
1105 .map(|(provider_name, connection)| {
1106 debug!(
1107 "Initiating graceful shutdown for MCP provider '{}'",
1108 provider_name
1109 );
1110 (provider_name.clone(), connection.cancellation_token())
1111 })
1112 .collect();
1113
1114 for (provider_name, token) in cancellation_tokens {
1115 debug!(
1116 "Cancelling MCP provider '{}' via cancellation token",
1117 provider_name
1118 );
1119 token.cancel();
1120 }
1121
1122 let shutdown_timeout = tokio::time::Duration::from_secs(5);
1124 let shutdown_start = std::time::Instant::now();
1125
1126 while shutdown_start.elapsed() < shutdown_timeout && !connections.is_empty() {
1128 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1129
1130 connections.retain(|_, connection| {
1132 Arc::strong_count(connection) > 1 });
1135 }
1136
1137 let remaining_count = connections.len();
1139 if remaining_count > 0 {
1140 warn!(
1141 "{} MCP provider connections did not shutdown gracefully within timeout, forcing shutdown",
1142 remaining_count
1143 );
1144 }
1145
1146 let drained_connections: Vec<_> = connections.drain().collect();
1148 drop(connections);
1149
1150 for (provider_name, connection) in drained_connections {
1151 debug!("Force shutting down MCP provider '{}'", provider_name);
1152
1153 if let Ok(connection) = Arc::try_unwrap(connection) {
1154 debug!(
1155 "Awaiting MCP provider '{}' task cancellation after graceful request",
1156 provider_name
1157 );
1158
1159 match connection.cancel().await {
1160 Ok(quit_reason) => {
1161 debug!(
1162 "MCP provider '{}' cancellation completed with reason: {:?}",
1163 provider_name, quit_reason
1164 );
1165 }
1166 Err(err) => {
1167 debug!(
1168 "MCP provider '{}' cancellation join error (non-critical): {}",
1169 provider_name, err
1170 );
1171 }
1172 }
1173 } else {
1174 debug!(
1175 "Additional references exist for MCP provider '{}'; dropping without awaiting",
1176 provider_name
1177 );
1178 }
1179 }
1180
1181 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
1183
1184 self.kill_remaining_mcp_processes().await;
1187
1188 info!("MCP client shutdown complete");
1189 Ok(())
1190 }
1191}
1192
1193#[derive(Debug, Clone)]
1195pub struct McpToolInfo {
1196 pub name: String,
1197 pub description: String,
1198 pub provider: String,
1199 pub input_schema: Value,
1200}
1201
1202pub struct McpProvider {
1204 config: McpProviderConfig,
1205 tools_cache: Arc<Mutex<Option<ListToolsResult>>>,
1206}
1207
1208impl McpProvider {
1209 pub async fn new(config: McpProviderConfig) -> Result<Self> {
1211 Ok(Self {
1212 config,
1213 tools_cache: Arc::new(Mutex::new(None)),
1214 })
1215 }
1216
1217 pub async fn list_tools(&self) -> Result<ListToolsResult> {
1219 let provider_name = &self.config.name;
1220 debug!("Listing tools for MCP provider '{}'", provider_name);
1221
1222 {
1224 let cache = self.tools_cache.lock().await;
1225 if let Some(cached) = cache.as_ref() {
1226 debug!("Using cached tools for provider '{}'", provider_name);
1227 return Ok(cached.clone());
1228 }
1229 }
1230
1231 debug!("Connecting to provider '{}' to fetch tools", provider_name);
1232
1233 match self.connect().await {
1235 Ok(connection) => {
1236 match connection.list_tools(Default::default()).await {
1237 Ok(tools) => {
1238 debug!(
1239 "Found {} tools for provider '{}'",
1240 tools.tools.len(),
1241 provider_name
1242 );
1243
1244 {
1246 let mut cache = self.tools_cache.lock().await;
1247 *cache = Some(tools.clone());
1248 }
1249
1250 Ok(tools)
1251 }
1252 Err(e) => {
1253 error!(
1254 "Failed to list tools for provider '{}': {}",
1255 provider_name, e
1256 );
1257 Err(anyhow::anyhow!("Failed to list tools: {}", e))
1258 }
1259 }
1260 }
1261 Err(e) => {
1262 error!("Failed to connect to provider '{}': {}", provider_name, e);
1263 Err(e)
1264 }
1265 }
1266 }
1267
1268 pub async fn has_tool(&self, tool_name: &str) -> Result<bool> {
1270 let provider_name = &self.config.name;
1271 debug!(
1272 "Checking if provider '{}' has tool '{}'",
1273 provider_name, tool_name
1274 );
1275
1276 match tokio::time::timeout(tokio::time::Duration::from_secs(10), self.list_tools()).await {
1277 Ok(Ok(tools)) => {
1278 let has_tool = tools.tools.iter().any(|tool| tool.name == tool_name);
1279 debug!(
1280 "Provider '{}' {} tool '{}'",
1281 provider_name,
1282 if has_tool { "has" } else { "does not have" },
1283 tool_name
1284 );
1285 Ok(has_tool)
1286 }
1287 Ok(Err(e)) => {
1288 let error_msg = e.to_string();
1289 if error_msg.contains("No such process")
1290 || error_msg.contains("ESRCH")
1291 || error_msg.contains("EPIPE")
1292 || error_msg.contains("Broken pipe")
1293 || error_msg.contains("write EPIPE")
1294 {
1295 debug!(
1296 "MCP provider '{}' process/pipe terminated during tool check (normal during shutdown): {}",
1297 provider_name, e
1298 );
1299 } else {
1300 warn!(
1301 "Failed to check tool availability for provider '{}': {}",
1302 provider_name, e
1303 );
1304 }
1305 Err(e)
1306 }
1307 Err(_timeout) => {
1308 warn!("MCP provider '{}' tool check timed out", provider_name);
1309 Err(anyhow::anyhow!("Tool availability check timeout"))
1310 }
1311 }
1312 }
1313
1314 async fn connect(&self) -> Result<RunningMcpService> {
1316 let provider_name = &self.config.name;
1317 info!("Connecting to MCP provider '{}'", provider_name);
1318
1319 match &self.config.transport {
1320 McpTransportConfig::Stdio(stdio_config) => {
1321 debug!("Using stdio transport for provider '{}'", provider_name);
1322 self.connect_stdio(stdio_config).await
1323 }
1324 McpTransportConfig::Http(http_config) => {
1325 debug!("Using HTTP transport for provider '{}'", provider_name);
1326 self.connect_http(http_config).await
1327 }
1328 }
1329 }
1330
1331 async fn connect_http(
1333 &self,
1334 config: &crate::config::mcp::McpHttpServerConfig,
1335 ) -> Result<RunningMcpService> {
1336 let provider_name = &self.config.name;
1337 debug!(
1338 "Setting up HTTP connection for provider '{}'",
1339 provider_name
1340 );
1341
1342 let mut headers = HeaderMap::new();
1344 headers.insert("Content-Type", "application/json".parse().unwrap());
1345
1346 if let Some(api_key_env) = &config.api_key_env {
1348 if let Ok(api_key) = std::env::var(api_key_env) {
1349 headers.insert(
1350 "Authorization",
1351 format!("Bearer {}", api_key).parse().unwrap(),
1352 );
1353 } else {
1354 warn!(
1355 "API key environment variable '{}' not found for provider '{}'",
1356 api_key_env, provider_name
1357 );
1358 }
1359 }
1360
1361 for (key, value) in &config.headers {
1363 if let (Ok(header_name), Ok(header_value)) =
1364 (key.parse::<HeaderName>(), value.parse::<HeaderValue>())
1365 {
1366 headers.insert(header_name, header_value);
1367 }
1368 }
1369
1370 let client = reqwest::Client::builder()
1371 .default_headers(headers)
1372 .timeout(std::time::Duration::from_secs(30))
1373 .build()
1374 .context("Failed to build HTTP client")?;
1375
1376 debug!(
1378 "Testing HTTP MCP server connectivity at '{}'",
1379 config.endpoint
1380 );
1381
1382 match client.get(&config.endpoint).send().await {
1383 Ok(response) => {
1384 let status = response.status();
1385 if status.is_success() {
1386 debug!(
1387 "HTTP MCP server at '{}' is reachable (status: {})",
1388 config.endpoint, status
1389 );
1390
1391 let mcp_endpoint = if config.endpoint.ends_with('/') {
1394 format!("{}mcp", config.endpoint)
1395 } else {
1396 format!("{}/mcp", config.endpoint)
1397 };
1398
1399 debug!("Attempting to connect to MCP endpoint: {}", mcp_endpoint);
1400
1401 match client.get(&mcp_endpoint).send().await {
1403 Ok(mcp_response) => {
1404 if mcp_response.status().is_success() {
1405 debug!(
1406 "MCP endpoint '{}' is available (status: {})",
1407 mcp_endpoint,
1408 mcp_response.status()
1409 );
1410
1411 Err(anyhow::anyhow!(
1414 "HTTP MCP server detected at '{}' but full streamable HTTP implementation is required. \
1415 MCP endpoint is available at '{}'. \
1416 Consider using stdio transport or implement HTTP streaming support with Server-Sent Events.",
1417 config.endpoint,
1418 mcp_endpoint
1419 ))
1420 } else {
1421 debug!(
1422 "MCP endpoint '{}' returned status: {}",
1423 mcp_endpoint,
1424 mcp_response.status()
1425 );
1426 Err(anyhow::anyhow!(
1427 "HTTP MCP server at '{}' does not support MCP protocol. \
1428 Expected MCP endpoint at '{}' but got status: {}. \
1429 Consider using stdio transport instead.",
1430 config.endpoint,
1431 mcp_endpoint,
1432 mcp_response.status()
1433 ))
1434 }
1435 }
1436 Err(e) => {
1437 let error_msg = e.to_string();
1438 debug!(
1439 "Failed to connect to MCP endpoint '{}': {}",
1440 mcp_endpoint, error_msg
1441 );
1442
1443 Err(anyhow::anyhow!(
1444 "HTTP MCP server at '{}' does not properly support MCP protocol. \
1445 Could not connect to MCP endpoint at '{}': {}. \
1446 Consider using stdio transport instead.",
1447 config.endpoint,
1448 mcp_endpoint,
1449 error_msg
1450 ))
1451 }
1452 }
1453 } else {
1454 Err(anyhow::anyhow!(
1455 "HTTP MCP server returned error status: {} at endpoint: {}",
1456 status,
1457 config.endpoint
1458 ))
1459 }
1460 }
1461 Err(e) => {
1462 let error_msg = e.to_string();
1463 if error_msg.contains("dns") || error_msg.contains("Name resolution") {
1464 Err(anyhow::anyhow!(
1465 "HTTP MCP server DNS resolution failed for '{}': {}",
1466 config.endpoint,
1467 e
1468 ))
1469 } else if error_msg.contains("Connection refused") || error_msg.contains("connect")
1470 {
1471 Err(anyhow::anyhow!(
1472 "HTTP MCP server connection failed for '{}': {}",
1473 config.endpoint,
1474 e
1475 ))
1476 } else {
1477 Err(anyhow::anyhow!(
1478 "HTTP MCP server error for '{}': {}",
1479 config.endpoint,
1480 e
1481 ))
1482 }
1483 }
1484 }
1485 }
1486
1487 async fn connect_stdio(
1489 &self,
1490 config: &crate::config::mcp::McpStdioServerConfig,
1491 ) -> Result<RunningMcpService> {
1492 let provider_name = &self.config.name;
1493 debug!(
1494 "Setting up stdio connection for provider '{}'",
1495 provider_name
1496 );
1497
1498 debug!("Command: {} with args: {:?}", config.command, config.args);
1499
1500 let mut command = Command::new(&config.command);
1501 command.args(&config.args);
1502
1503 if let Some(working_dir) = &config.working_directory {
1505 debug!("Using working directory: {}", working_dir);
1506 command.current_dir(working_dir);
1507 }
1508
1509 if !self.config.env.is_empty() {
1511 debug!(
1512 "Setting environment variables for provider '{}'",
1513 provider_name
1514 );
1515 command.envs(&self.config.env);
1516 }
1517
1518 #[cfg(unix)]
1520 {
1521 #[allow(unused_imports)]
1522 use std::os::unix::process::CommandExt;
1523 command.process_group(0);
1524 }
1525
1526 debug!(
1527 "Creating TokioChildProcess for provider '{}'",
1528 provider_name
1529 );
1530
1531 match TokioChildProcess::new(command) {
1532 Ok(child_process) => {
1533 debug!(
1534 "Successfully created child process for provider '{}'",
1535 provider_name
1536 );
1537
1538 let handler = LoggingClientHandler::new(provider_name);
1540
1541 match tokio::time::timeout(
1542 tokio::time::Duration::from_secs(30),
1543 handler.serve(child_process),
1544 )
1545 .await
1546 {
1547 Ok(Ok(connection)) => {
1548 info!(
1549 "Successfully established connection to MCP provider '{}'",
1550 provider_name
1551 );
1552 Ok(connection)
1553 }
1554 Ok(Err(e)) => {
1555 let error_msg = e.to_string();
1557 if error_msg.contains("No such process")
1558 || error_msg.contains("ESRCH")
1559 || error_msg.contains("EPIPE")
1560 || error_msg.contains("Broken pipe")
1561 || error_msg.contains("write EPIPE")
1562 {
1563 debug!(
1564 "MCP provider '{}' pipe/process error during connection (normal during shutdown): {}",
1565 provider_name, e
1566 );
1567 Err(anyhow::anyhow!("MCP provider connection terminated: {}", e))
1568 } else {
1569 error!(
1570 "Failed to establish MCP connection for provider '{}': {}",
1571 provider_name, e
1572 );
1573 Err(anyhow::anyhow!("Failed to serve MCP connection: {}", e))
1574 }
1575 }
1576 Err(_timeout) => {
1577 warn!(
1578 "MCP provider '{}' connection timed out after 30 seconds",
1579 provider_name
1580 );
1581 Err(anyhow::anyhow!("MCP provider connection timeout"))
1582 }
1583 }
1584 }
1585 Err(e) => {
1586 let error_msg = e.to_string();
1588 if error_msg.contains("No such process") || error_msg.contains("ESRCH") {
1589 error!(
1590 "Failed to create child process for provider '{}' - process may have terminated: {}",
1591 provider_name, e
1592 );
1593 } else {
1594 error!(
1595 "Failed to create child process for provider '{}': {}",
1596 provider_name, e
1597 );
1598 }
1599 Err(anyhow::anyhow!("Failed to create child process: {}", e))
1600 }
1601 }
1602 }
1603}
1604
1605type RunningMcpService =
1607 rmcp::service::RunningService<rmcp::service::RoleClient, LoggingClientHandler>;
1608
1609#[derive(Debug, Clone)]
1611pub struct McpClientStatus {
1612 pub enabled: bool,
1613 pub provider_count: usize,
1614 pub active_connections: usize,
1615 pub configured_providers: Vec<String>,
1616}
1617
1618impl McpClient {
1619 pub fn get_status(&self) -> McpClientStatus {
1621 McpClientStatus {
1622 enabled: self.config.enabled,
1623 provider_count: self.providers.len(),
1624 active_connections: self
1625 .active_connections
1626 .try_lock()
1627 .map(|connections| connections.len())
1628 .unwrap_or(0),
1629 configured_providers: self.providers.keys().cloned().collect(),
1630 }
1631 }
1632}
1633
1634#[async_trait]
1636pub trait McpToolExecutor: Send + Sync {
1637 async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value>;
1639
1640 async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>>;
1642
1643 async fn has_mcp_tool(&self, tool_name: &str) -> Result<bool>;
1645
1646 fn get_status(&self) -> McpClientStatus;
1648}
1649
1650#[async_trait]
1651impl McpToolExecutor for McpClient {
1652 async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
1653 self.execute_tool(tool_name, args).await
1654 }
1655
1656 async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>> {
1657 self.list_tools().await
1658 }
1659
1660 async fn has_mcp_tool(&self, tool_name: &str) -> Result<bool> {
1661 if self.providers.is_empty() {
1662 return Ok(false);
1663 }
1664
1665 let mut provider_errors = Vec::new();
1666
1667 for (provider_name, provider) in &self.providers {
1668 let provider_id = provider_name.as_str();
1669 match provider.has_tool(tool_name).await {
1670 Ok(true) => {
1671 if self
1672 .allowlist
1673 .read()
1674 .is_tool_allowed(provider_id, tool_name)
1675 {
1676 self.record_tool_provider(provider_id, tool_name);
1677 return Ok(true);
1678 }
1679
1680 self.audit_log(
1681 Some(provider_id),
1682 "mcp.tool_denied",
1683 Level::DEBUG,
1684 format!(
1685 "Tool '{}' exists on provider '{}' but is blocked by allow list",
1686 tool_name, provider_id
1687 ),
1688 );
1689 }
1690 Ok(false) => continue,
1691 Err(e) => {
1692 let error_msg = format!("Error checking provider '{}': {}", provider_name, e);
1693 warn!("{}", error_msg);
1694 provider_errors.push(error_msg);
1695 }
1696 }
1697 }
1698
1699 if !provider_errors.is_empty() {
1700 debug!(
1701 "Encountered {} errors while checking tool availability: {:?}",
1702 provider_errors.len(),
1703 provider_errors
1704 );
1705
1706 let summary = provider_errors.join("; ");
1707 return Err(anyhow::anyhow!(
1708 "Failed to confirm MCP tool '{}' availability. {}",
1709 tool_name,
1710 summary
1711 ));
1712 }
1713
1714 Ok(false)
1715 }
1716
1717 fn get_status(&self) -> McpClientStatus {
1718 self.get_status()
1719 }
1720}
1721
1722#[cfg(test)]
1723mod tests {
1724 use super::*;
1725 use crate::config::mcp::{McpStdioServerConfig, McpTransportConfig};
1726 use rmcp::model::{Content, Meta};
1727 use serde_json::json;
1728
1729 #[test]
1730 fn test_mcp_client_creation() {
1731 let config = McpClientConfig::default();
1732 let client = McpClient::new(config);
1733 assert!(!client.config.enabled);
1734 assert!(client.providers.is_empty());
1735 }
1736
1737 #[test]
1738 fn test_mcp_tool_info() {
1739 let tool_info = McpToolInfo {
1740 name: "test_tool".to_string(),
1741 description: "A test tool".to_string(),
1742 provider: "test_provider".to_string(),
1743 input_schema: json!({
1744 "type": "object",
1745 "properties": {
1746 "input": {"type": "string"}
1747 }
1748 }),
1749 };
1750
1751 assert_eq!(tool_info.name, "test_tool");
1752 assert_eq!(tool_info.provider, "test_provider");
1753 }
1754
1755 #[test]
1756 fn test_provider_config() {
1757 let config = McpProviderConfig {
1758 name: "test".to_string(),
1759 transport: McpTransportConfig::Stdio(McpStdioServerConfig {
1760 command: "echo".to_string(),
1761 args: vec!["hello".to_string()],
1762 working_directory: None,
1763 }),
1764 env: HashMap::new(),
1765 enabled: true,
1766 max_concurrent_requests: 3,
1767 };
1768
1769 assert_eq!(config.name, "test");
1770 assert!(config.enabled);
1771 assert_eq!(config.max_concurrent_requests, 3);
1772 }
1773
1774 #[test]
1775 fn test_tool_info_creation() {
1776 let tool_info = McpToolInfo {
1777 name: "test_tool".to_string(),
1778 description: "A test tool".to_string(),
1779 provider: "test_provider".to_string(),
1780 input_schema: serde_json::json!({
1781 "type": "object",
1782 "properties": {
1783 "input": {"type": "string"}
1784 }
1785 }),
1786 };
1787
1788 assert_eq!(tool_info.name, "test_tool");
1789 assert_eq!(tool_info.provider, "test_provider");
1790 }
1791
1792 #[test]
1793 fn test_format_tool_result_success() {
1794 let mut result = CallToolResult::structured(json!({
1795 "value": 42,
1796 "status": "ok"
1797 }));
1798 let mut meta = Meta::new();
1799 meta.insert("query".to_string(), Value::String("tokio".to_string()));
1800 result.meta = Some(meta);
1801
1802 let serialized = McpClient::format_tool_result("test", "demo", result).unwrap();
1803 assert_eq!(
1804 serialized.get("provider").and_then(Value::as_str),
1805 Some("test")
1806 );
1807 assert_eq!(serialized.get("tool").and_then(Value::as_str), Some("demo"));
1808 assert_eq!(serialized.get("status").and_then(Value::as_str), Some("ok"));
1809 assert_eq!(serialized.get("value").and_then(Value::as_i64), Some(42));
1810 assert_eq!(
1811 serialized
1812 .get("meta")
1813 .and_then(Value::as_object)
1814 .and_then(|map| map.get("query"))
1815 .and_then(Value::as_str),
1816 Some("tokio")
1817 );
1818 }
1819
1820 #[test]
1821 fn test_format_tool_result_error_detection() {
1822 let result = CallToolResult::structured_error(json!({
1823 "message": "something went wrong"
1824 }));
1825
1826 let error = McpClient::format_tool_result("test", "demo", result).unwrap_err();
1827 assert!(error.to_string().contains("something went wrong"));
1828 }
1829
1830 #[test]
1831 fn test_format_tool_result_error_from_text_content() {
1832 let result = CallToolResult::error(vec![Content::text("plain failure")]);
1833
1834 let error = McpClient::format_tool_result("test", "demo", result).unwrap_err();
1835 assert!(error.to_string().contains("plain failure"));
1836 }
1837}