1use crate::config::mcp::{
7 McpAllowListConfig, McpClientConfig, McpProviderConfig, McpTransportConfig,
8};
9use anyhow::{Context, Result};
10use async_trait::async_trait;
11use parking_lot::RwLock;
12use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
13use rmcp::{
14 ServiceExt,
15 handler::client::ClientHandler,
16 model::{
17 CallToolRequestParam, CallToolResult, ClientCapabilities, ClientInfo, Implementation,
18 ListToolsResult, LoggingLevel, LoggingMessageNotificationParam, RootsCapabilities,
19 },
20 transport::TokioChildProcess,
21};
22use serde_json::{Map, Value};
23use std::collections::HashMap;
24use std::future;
25use std::sync::Arc;
26use tokio::process::Command;
27use tokio::sync::Mutex;
28use tracing::{Level, debug, error, info, warn};
29
30#[derive(Clone)]
31struct LoggingClientHandler {
32 provider_name: String,
33 info: ClientInfo,
34}
35
36impl LoggingClientHandler {
37 fn new(provider_name: &str) -> Self {
38 let mut info = ClientInfo::default();
39 info.capabilities = ClientCapabilities {
40 roots: Some(RootsCapabilities {
41 list_changed: Some(true),
42 }),
43 ..ClientCapabilities::default()
44 };
45 info.client_info = Implementation {
46 name: "vtcode".to_string(),
47 title: Some("VT Code MCP client".to_string()),
48 version: env!("CARGO_PKG_VERSION").to_string(),
49 icons: None,
50 website_url: Some("https://github.com/modelcontextprotocol".to_string()),
51 };
52
53 Self {
54 provider_name: provider_name.to_string(),
55 info,
56 }
57 }
58
59 fn handle_logging(&self, params: LoggingMessageNotificationParam) {
60 let payload = params.data;
61 let summary = payload
62 .get("message")
63 .and_then(Value::as_str)
64 .map(str::to_owned)
65 .unwrap_or_else(|| payload.to_string());
66
67 match params.level {
68 LoggingLevel::Debug => debug!(
69 provider = self.provider_name.as_str(),
70 summary = %summary,
71 payload = ?payload,
72 "MCP provider log"
73 ),
74 LoggingLevel::Info | LoggingLevel::Notice => info!(
75 provider = self.provider_name.as_str(),
76 summary = %summary,
77 payload = ?payload,
78 "MCP provider log"
79 ),
80 LoggingLevel::Warning => warn!(
81 provider = self.provider_name.as_str(),
82 summary = %summary,
83 payload = ?payload,
84 "MCP provider warning"
85 ),
86 LoggingLevel::Error
87 | LoggingLevel::Critical
88 | LoggingLevel::Alert
89 | LoggingLevel::Emergency => error!(
90 provider = self.provider_name.as_str(),
91 summary = %summary,
92 payload = ?payload,
93 "MCP provider error"
94 ),
95 }
96 }
97}
98
99impl ClientHandler for LoggingClientHandler {
100 fn on_logging_message(
101 &self,
102 params: LoggingMessageNotificationParam,
103 _context: rmcp::service::NotificationContext<rmcp::service::RoleClient>,
104 ) -> impl std::future::Future<Output = ()> + Send + '_ {
105 self.handle_logging(params);
106 future::ready(())
107 }
108
109 fn get_info(&self) -> ClientInfo {
110 self.info.clone()
111 }
112}
113
114pub struct McpClient {
116 config: McpClientConfig,
117 pub providers: HashMap<String, Arc<McpProvider>>,
118 active_connections: Arc<Mutex<HashMap<String, Arc<RunningMcpService>>>>,
119 allowlist: Arc<RwLock<McpAllowListConfig>>,
120 tool_provider_index: Arc<RwLock<HashMap<String, String>>>,
121}
122
123impl McpClient {
124 pub fn new(config: McpClientConfig) -> Self {
126 let allowlist = Arc::new(RwLock::new(config.allowlist.clone()));
127 Self {
128 config,
129 providers: HashMap::new(),
130 active_connections: Arc::new(Mutex::new(HashMap::new())),
131 allowlist,
132 tool_provider_index: Arc::new(RwLock::new(HashMap::new())),
133 }
134 }
135
136 fn record_tool_provider(&self, provider: &str, tool: &str) {
137 debug!("Recording tool '{}' -> provider '{}'", tool, provider);
138 self.tool_provider_index
139 .write()
140 .insert(tool.to_string(), provider.to_string());
141 }
142
143 pub fn provider_for_tool(&self, tool_name: &str) -> Option<String> {
145 let index = self.tool_provider_index.read();
146 if let Some(provider) = index.get(tool_name) {
147 if self.providers.contains_key(provider) {
149 debug!("Found tool '{}' in provider '{}'", tool_name, provider);
150 Some(provider.clone())
151 } else {
152 debug!(
153 "Tool '{}' references non-existent provider '{}'",
154 tool_name, provider
155 );
156 None
157 }
158 } else {
159 debug!("Tool '{}' not found in provider index", tool_name);
160 None
161 }
162 }
163
164 pub fn update_allowlist(&self, allowlist: McpAllowListConfig) {
166 *self.allowlist.write() = allowlist;
167 }
168
169 pub fn current_allowlist(&self) -> McpAllowListConfig {
171 self.allowlist.read().clone()
172 }
173
174 fn format_tool_result(
175 provider_name: &str,
176 tool_name: &str,
177 result: CallToolResult,
178 ) -> Result<Value> {
179 let is_error = result.is_error.unwrap_or(false);
180 let text_summary = result
181 .content
182 .iter()
183 .find_map(|content| content.as_text().map(|text| text.text.clone()));
184
185 if is_error {
186 let detail = result
187 .structured_content
188 .as_ref()
189 .and_then(|value| value.get("message").and_then(Value::as_str))
190 .map(str::to_owned)
191 .or_else(|| {
192 result
193 .structured_content
194 .as_ref()
195 .map(|value| value.to_string())
196 })
197 .or(text_summary)
198 .unwrap_or_else(|| "Unknown MCP tool error".to_string());
199
200 return Err(anyhow::anyhow!(
201 "MCP tool '{}' on provider '{}' reported an error: {}",
202 tool_name,
203 provider_name,
204 detail
205 ));
206 }
207
208 let mut payload = Map::new();
209 payload.insert("provider".into(), Value::String(provider_name.to_string()));
210 payload.insert("tool".into(), Value::String(tool_name.to_string()));
211
212 if let Some(meta) = result.meta {
213 if let Ok(meta_value) = serde_json::to_value(&meta) {
214 if !meta_value.is_null() {
215 payload.insert("meta".into(), meta_value);
216 }
217 }
218 }
219
220 if let Some(structured) = result.structured_content {
221 match structured {
222 Value::Object(mut object) => {
223 object
224 .entry("provider")
225 .or_insert_with(|| Value::String(provider_name.to_string()));
226 object
227 .entry("tool")
228 .or_insert_with(|| Value::String(tool_name.to_string()));
229
230 if let Some(meta_value) = payload.remove("meta") {
231 object.entry("meta").or_insert(meta_value);
232 }
233
234 return Ok(Value::Object(object));
235 }
236 other => {
237 payload.insert("structured_content".into(), other);
238 }
239 }
240 }
241
242 if let Some(summary) = text_summary {
243 payload.insert("message".into(), Value::String(summary));
244 }
245
246 if !result.content.is_empty() {
247 if let Ok(content_value) = serde_json::to_value(&result.content) {
248 payload.insert("content".into(), content_value);
249 }
250 }
251
252 Ok(Value::Object(payload))
253 }
254
255 pub async fn initialize(&mut self) -> Result<()> {
257 if !self.config.enabled {
258 info!("MCP client is disabled in configuration");
259 return Ok(());
260 }
261
262 info!(
263 "Initializing MCP client with {} configured providers",
264 self.config.providers.len()
265 );
266
267 for provider_config in &self.config.providers {
268 if provider_config.enabled {
269 info!("Initializing MCP provider '{}'", provider_config.name);
270
271 match McpProvider::new(provider_config.clone()).await {
272 Ok(provider) => {
273 let provider = Arc::new(provider);
274 self.providers
275 .insert(provider_config.name.clone(), provider);
276 info!(
277 "Successfully initialized MCP provider '{}'",
278 provider_config.name
279 );
280 self.audit_log(
281 Some(provider_config.name.as_str()),
282 "mcp.provider_initialized",
283 Level::INFO,
284 format!("Provider '{}' initialized", provider_config.name),
285 );
286 }
287 Err(e) => {
288 error!(
289 "Failed to initialize MCP provider '{}': {}",
290 provider_config.name, e
291 );
292 self.audit_log(
293 Some(provider_config.name.as_str()),
294 "mcp.provider_initialization_failed",
295 Level::WARN,
296 format!(
297 "Failed to initialize provider '{}' due to error: {}",
298 provider_config.name, e
299 ),
300 );
301 continue;
303 }
304 }
305 } else {
306 debug!(
307 "MCP provider '{}' is disabled, skipping",
308 provider_config.name
309 );
310 }
311 }
312
313 info!(
314 "MCP client initialization complete. Active providers: {}",
315 self.providers.len()
316 );
317
318 let _ = self.cleanup_dead_providers().await;
320
321 Ok(())
322 }
323
324 async fn kill_remaining_mcp_processes(&self) {
326 debug!("Checking for remaining MCP provider processes to clean up");
327
328 let process_cleanup_attempts = tokio::time::timeout(
331 tokio::time::Duration::from_secs(5),
332 self.attempt_process_cleanup(),
333 )
334 .await;
335
336 match process_cleanup_attempts {
337 Ok(Ok(cleaned_count)) => {
338 if cleaned_count > 0 {
339 info!(
340 "Cleaned up {} remaining MCP provider processes",
341 cleaned_count
342 );
343 self.audit_log(
344 None,
345 "mcp.process_cleanup",
346 Level::INFO,
347 format!(
348 "Cleaned up {} remaining MCP provider processes",
349 cleaned_count
350 ),
351 );
352 } else {
353 debug!("No remaining MCP provider processes to clean up");
354 }
355 }
356 Ok(Err(e)) => {
357 warn!("Error during MCP process cleanup (non-critical): {}", e);
358 self.audit_log(
359 None,
360 "mcp.process_cleanup_error",
361 Level::WARN,
362 format!("Error during MCP process cleanup: {}", e),
363 );
364 }
365 Err(_) => {
366 warn!("MCP process cleanup timed out (non-critical)");
367 self.audit_log(
368 None,
369 "mcp.process_cleanup_timeout",
370 Level::WARN,
371 "MCP process cleanup timed out".to_string(),
372 );
373 }
374 }
375 }
376
377 async fn attempt_process_cleanup(&self) -> Result<usize> {
379 use tokio::process::Command as TokioCommand;
380
381 let mut cleaned_count = 0;
382
383 let current_pid = std::process::id();
385
386 for provider_config in &self.config.providers {
389 if !provider_config.enabled {
390 continue;
391 }
392
393 let provider_name = &provider_config.name;
394 debug!("Attempting cleanup for MCP provider '{}'", provider_name);
395
396 let mut provider_cleaned = 0;
398
399 if let Ok(output) = TokioCommand::new("pgrep")
401 .args(["-f", &format!("mcp-server-{}", provider_name)])
402 .output()
403 .await
404 {
405 if output.status.success() {
406 let pids = String::from_utf8_lossy(&output.stdout);
407 for pid_str in pids.lines() {
408 if let Ok(pid) = pid_str.trim().parse::<u32>() {
409 if pid != current_pid && pid > 0 {
410 if self.kill_process_gracefully(pid).await {
411 provider_cleaned += 1;
412 }
413 }
414 }
415 }
416 }
417 }
418
419 if provider_cleaned == 0 {
421 if let Ok(output) = TokioCommand::new("ps").args(["aux"]).output().await {
422 if output.status.success() {
423 let processes = String::from_utf8_lossy(&output.stdout);
424 for line in processes.lines() {
425 if line.contains(provider_name)
427 && (line.contains("mcp")
428 || line.contains("node")
429 || line.contains("python"))
430 {
431 let parts: Vec<&str> = line.split_whitespace().collect();
433 if let Some(pid_str) = parts.first() {
434 if let Ok(pid) = pid_str.parse::<u32>() {
435 if pid != current_pid && pid > 0 {
436 if self.kill_process_gracefully(pid).await {
437 provider_cleaned += 1;
438 }
439 }
440 }
441 }
442 }
443 }
444 }
445 }
446 }
447
448 if provider_cleaned > 0 {
449 debug!(
450 "Cleaned up {} processes for MCP provider '{}'",
451 provider_cleaned, provider_name
452 );
453 cleaned_count += provider_cleaned;
454 self.tool_provider_index.write().clear();
456 }
457 }
458
459 Ok(cleaned_count)
460 }
461
462 async fn kill_process_gracefully(&self, pid: u32) -> bool {
464 debug!("Killing process {} gracefully", pid);
465
466 let _ = tokio::process::Command::new("kill")
468 .args(["-TERM", &pid.to_string()])
469 .output()
470 .await;
471
472 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
474
475 if let Ok(output) = tokio::process::Command::new("kill")
477 .args(["-0", &pid.to_string()]) .output()
479 .await
480 {
481 if output.status.success() {
482 debug!("Process {} still running, force killing", pid);
484 let _ = tokio::process::Command::new("kill")
485 .args(["-KILL", &pid.to_string()])
486 .output()
487 .await;
488 true
489 } else {
490 debug!("Process {} already terminated", pid);
492 true
493 }
494 } else {
495 debug!("Process {} check failed, assuming terminated", pid);
497 true
498 }
499 }
500
501 pub async fn cleanup_dead_providers(&self) -> Result<()> {
503 let mut dead_providers = Vec::new();
504
505 for (provider_name, provider) in &self.providers {
506 let provider_health_check = tokio::time::timeout(
508 tokio::time::Duration::from_secs(2),
509 provider.has_tool("ping"),
510 )
511 .await;
512
513 match provider_health_check {
514 Ok(Ok(_)) => {
515 debug!("MCP provider '{}' is healthy", provider_name);
517 }
518 Ok(Err(e)) => {
519 let error_msg = e.to_string();
520 if error_msg.contains("No such process") || error_msg.contains("ESRCH") {
521 warn!(
522 "MCP provider '{}' has terminated process, marking for cleanup",
523 provider_name
524 );
525 dead_providers.push(provider_name.clone());
526 } else {
527 debug!(
528 "MCP provider '{}' returned error but process may be alive: {}",
529 provider_name, e
530 );
531 }
532 }
533 Err(_timeout) => {
534 warn!(
535 "MCP provider '{}' health check timed out, may be unresponsive",
536 provider_name
537 );
538 }
540 }
541 }
542
543 if !dead_providers.is_empty() {
546 warn!(
547 "Found {} dead MCP providers: {:?}",
548 dead_providers.len(),
549 dead_providers
550 );
551 }
552
553 Ok(())
554 }
555
556 pub async fn list_tools(&self) -> Result<Vec<McpToolInfo>> {
558 if !self.config.enabled {
559 debug!("MCP client is disabled, returning empty tool list");
560 return Ok(Vec::new());
561 }
562
563 if self.providers.is_empty() {
564 debug!("No MCP providers configured, returning empty tool list");
565 return Ok(Vec::new());
566 }
567
568 let mut all_tools = Vec::new();
569 let mut errors = Vec::new();
570
571 let allowlist_snapshot = self.allowlist.read().clone();
572
573 for (provider_name, provider) in &self.providers {
574 let provider_id = provider_name.as_str();
575 match tokio::time::timeout(tokio::time::Duration::from_secs(15), provider.list_tools())
576 .await
577 {
578 Ok(Ok(tools)) => {
579 debug!(
580 "Provider '{}' has {} tools",
581 provider_name,
582 tools.tools.len()
583 );
584
585 for tool in tools.tools {
586 let tool_name = tool.name.as_ref();
587
588 if allowlist_snapshot.is_tool_allowed(provider_id, tool_name) {
589 self.record_tool_provider(provider_id, tool_name);
590 all_tools.push(McpToolInfo {
591 name: tool_name.to_string(),
592 description: tool.description.unwrap_or_default().to_string(),
593 provider: provider_name.clone(),
594 input_schema: serde_json::to_value(&*tool.input_schema)
595 .unwrap_or(Value::Null),
596 });
597 } else {
598 self.audit_log(
599 Some(provider_id),
600 "mcp.tool_filtered",
601 Level::DEBUG,
602 format!(
603 "Filtered tool '{}' from provider '{}' due to allow list",
604 tool_name, provider_id
605 ),
606 );
607 }
608 }
609 }
610 Ok(Err(e)) => {
611 let error_msg = e.to_string();
612 if error_msg.contains("No such process")
613 || error_msg.contains("ESRCH")
614 || error_msg.contains("EPIPE")
615 || error_msg.contains("Broken pipe")
616 || error_msg.contains("write EPIPE")
617 {
618 debug!(
619 "MCP provider '{}' process/pipe terminated during tool listing (normal during shutdown): {}",
620 provider_name, e
621 );
622 } else {
623 warn!(
624 "Failed to list tools for provider '{}': {}",
625 provider_name, e
626 );
627 }
628 let error_msg = format!(
629 "Failed to list tools for provider '{}': {}",
630 provider_name, e
631 );
632 errors.push(error_msg);
633 }
634 Err(_timeout) => {
635 warn!("MCP provider '{}' tool listing timed out", provider_name);
636 let error_msg =
637 format!("Tool listing timeout for provider '{}'", provider_name);
638 errors.push(error_msg);
639 }
640 }
641 }
642
643 if !errors.is_empty() {
644 warn!(
645 "Encountered {} errors while listing MCP tools: {:?}",
646 errors.len(),
647 errors
648 );
649 }
650
651 info!(
652 "Found {} total MCP tools across all providers",
653 all_tools.len()
654 );
655 Ok(all_tools)
656 }
657
658 pub async fn execute_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
660 if !self.config.enabled {
661 return Err(anyhow::anyhow!("MCP client is disabled"));
662 }
663
664 if self.providers.is_empty() {
665 return Err(anyhow::anyhow!("No MCP providers configured"));
666 }
667
668 let tool_name_owned = tool_name.to_string();
669 debug!("Executing MCP tool '{}' with args: {}", tool_name, args);
670
671 let provider_name = {
673 let mut found_provider = None;
674 let mut provider_errors = Vec::new();
675
676 for (name, provider) in &self.providers {
677 match provider.has_tool(&tool_name_owned).await {
678 Ok(true) => {
679 found_provider = Some(name.clone());
680 break;
681 }
682 Ok(false) => continue,
683 Err(e) => {
684 let error_msg = format!(
685 "Error checking tool availability for provider '{}': {}",
686 name, e
687 );
688 warn!("{}", error_msg);
689 provider_errors.push(error_msg);
690 }
691 }
692 }
693
694 found_provider.ok_or_else(|| {
695 let error_msg = format!(
696 "Tool '{}' not found in any MCP provider. Provider errors: {:?}",
697 tool_name, provider_errors
698 );
699 anyhow::anyhow!(error_msg)
700 })?
701 };
702
703 debug!("Found tool '{}' in provider '{}'", tool_name, provider_name);
704
705 if !self
706 .allowlist
707 .read()
708 .is_tool_allowed(provider_name.as_str(), tool_name)
709 {
710 let message = format!(
711 "Tool '{}' from provider '{}' is not permitted by the MCP allow list",
712 tool_name, provider_name
713 );
714 self.audit_log(
715 Some(provider_name.as_str()),
716 "mcp.tool_denied",
717 Level::WARN,
718 message.as_str(),
719 );
720 return Err(anyhow::anyhow!(message));
721 }
722
723 self.record_tool_provider(provider_name.as_str(), tool_name);
724
725 let provider = self.providers.get(&provider_name).ok_or_else(|| {
726 anyhow::anyhow!("Provider '{}' not found after discovery", provider_name)
727 })?;
728
729 let connection = match self.get_or_create_connection(provider).await {
731 Ok(conn) => conn,
732 Err(e) => {
733 error!(
734 "Failed to establish connection to provider '{}': {}",
735 provider_name, e
736 );
737 return Err(e);
738 }
739 };
740
741 match connection
743 .call_tool(CallToolRequestParam {
744 name: tool_name_owned.into(),
745 arguments: args.as_object().cloned(),
746 })
747 .await
748 {
749 Ok(result) => match Self::format_tool_result(provider_name.as_str(), tool_name, result)
750 {
751 Ok(serialized) => {
752 info!(
753 "Successfully executed MCP tool '{}' via provider '{}'",
754 tool_name, provider_name
755 );
756 self.audit_log(
757 Some(provider_name.as_str()),
758 "mcp.tool_execution",
759 Level::INFO,
760 format!(
761 "Successfully executed MCP tool '{}' via provider '{}'",
762 tool_name, provider_name
763 ),
764 );
765 Ok(serialized)
766 }
767 Err(err) => {
768 let err_message = err.to_string();
769 warn!(
770 "MCP tool '{}' via provider '{}' returned an error payload: {}",
771 tool_name, provider_name, err_message
772 );
773 self.audit_log(
774 Some(provider_name.as_str()),
775 "mcp.tool_failed",
776 Level::WARN,
777 format!(
778 "MCP tool '{}' via provider '{}' returned an error payload: {}",
779 tool_name, provider_name, err_message
780 ),
781 );
782 Err(err)
783 }
784 },
785 Err(e) => {
786 let error_message = e.to_string();
787
788 error!(
789 "MCP tool '{}' failed on provider '{}': {}",
790 tool_name, provider_name, error_message
791 );
792 self.audit_log(
793 Some(provider_name.as_str()),
794 "mcp.tool_failed",
795 Level::WARN,
796 format!(
797 "MCP tool '{}' failed on provider '{}': {}",
798 tool_name, provider_name, error_message
799 ),
800 );
801
802 if error_message.contains("EPIPE")
804 || error_message.contains("Broken pipe")
805 || error_message.contains("write EPIPE")
806 || error_message.contains("No such process")
807 || error_message.contains("ESRCH")
808 {
809 let mut connections = self.active_connections.lock().await;
811 connections.remove(&provider_name);
812 self.tool_provider_index
814 .write()
815 .retain(|_, provider| provider != &provider_name);
816
817 return Err(anyhow::anyhow!(
818 "MCP provider '{}' disconnected unexpectedly while executing '{}'. The provider process may have terminated. Try re-running the command to restart the provider.",
819 provider_name,
820 tool_name
821 ));
822 } else if error_message.contains("timeout") || error_message.contains("Timeout") {
823 let mut connections = self.active_connections.lock().await;
825 connections.remove(&provider_name);
826
827 return Err(anyhow::anyhow!(
828 "MCP tool '{}' execution timed out on provider '{}'. The provider may be unresponsive. Try re-running the command.",
829 tool_name,
830 provider_name
831 ));
832 } else if error_message.contains("permission")
833 || error_message.contains("Permission denied")
834 {
835 return Err(anyhow::anyhow!(
836 "Permission denied executing MCP tool '{}' on provider '{}': {}",
837 tool_name,
838 provider_name,
839 error_message
840 ));
841 } else if error_message.contains("network")
842 || error_message.contains("Connection refused")
843 {
844 return Err(anyhow::anyhow!(
845 "Network error executing MCP tool '{}' on provider '{}': {}",
846 tool_name,
847 provider_name,
848 error_message
849 ));
850 }
851
852 Err(anyhow::anyhow!(
853 "MCP tool execution failed: {}",
854 error_message
855 ))
856 }
857 }
858 }
859
860 async fn get_or_create_connection(
862 &self,
863 provider: &McpProvider,
864 ) -> Result<Arc<RunningMcpService>> {
865 let provider_name = &provider.config.name;
866 debug!("Getting connection for MCP provider '{}'", provider_name);
867
868 let mut connections = self.active_connections.lock().await;
869
870 if !connections.contains_key(provider_name) {
871 debug!("Creating new connection for provider '{}'", provider_name);
872
873 match tokio::time::timeout(tokio::time::Duration::from_secs(30), provider.connect())
875 .await
876 {
877 Ok(Ok(connection)) => {
878 let connection = Arc::new(connection);
879 connections.insert(provider_name.clone(), Arc::clone(&connection));
880 debug!(
881 "Successfully created connection for provider '{}'",
882 provider_name
883 );
884 Ok(connection)
885 }
886 Ok(Err(e)) => {
887 let error_msg = e.to_string();
888 if error_msg.contains("HTTP MCP server support") {
889 warn!(
890 "Provider '{}' uses HTTP transport which is not fully implemented: {}",
891 provider_name, e
892 );
893 Err(anyhow::anyhow!(
894 "HTTP MCP transport not fully implemented for provider '{}'. Consider using stdio transport instead.",
895 provider_name
896 ))
897 } else if error_msg.contains("command not found")
898 || error_msg.contains("No such file")
899 {
900 error!("Command not found for provider '{}': {}", provider_name, e);
901 Err(anyhow::anyhow!(
902 "Command '{}' not found for MCP provider '{}'. Please ensure the MCP server is installed and accessible.",
903 self.config
904 .providers
905 .iter()
906 .find(|p| p.name == *provider_name)
907 .map(|p| match &p.transport {
908 McpTransportConfig::Stdio(stdio) => stdio.command.as_str(),
909 _ => "unknown",
910 })
911 .unwrap_or("unknown"),
912 provider_name
913 ))
914 } else if error_msg.contains("permission")
915 || error_msg.contains("Permission denied")
916 {
917 error!(
918 "Permission denied creating connection for provider '{}': {}",
919 provider_name, e
920 );
921 Err(anyhow::anyhow!(
922 "Permission denied executing MCP server for provider '{}': {}",
923 provider_name,
924 error_msg
925 ))
926 } else {
927 error!(
928 "Failed to create connection for provider '{}': {}",
929 provider_name, e
930 );
931 Err(anyhow::anyhow!(
932 "Failed to create connection for MCP provider '{}': {}",
933 provider_name,
934 error_msg
935 ))
936 }
937 }
938 Err(_timeout) => {
939 error!(
940 "Connection creation timed out for provider '{}' after {} seconds",
941 provider_name, 30
942 );
943 Err(anyhow::anyhow!(
944 "Connection creation timed out for MCP provider '{}' after {} seconds. The provider may be slow to start or unresponsive.",
945 provider_name,
946 30
947 ))
948 }
949 }
950 } else {
951 let existing_connection = connections.get(provider_name).unwrap().clone();
953
954 if let Err(e) = self
956 .validate_connection(provider_name, &existing_connection)
957 .await
958 {
959 debug!(
960 "Existing connection for provider '{}' is unhealthy, creating new one: {}",
961 provider_name, e
962 );
963
964 connections.remove(provider_name);
966
967 match tokio::time::timeout(tokio::time::Duration::from_secs(30), provider.connect())
969 .await
970 {
971 Ok(Ok(connection)) => {
972 let connection = Arc::new(connection);
973 connections.insert(provider_name.clone(), Arc::clone(&connection));
974 debug!(
975 "Successfully created new connection for provider '{}'",
976 provider_name
977 );
978 Ok(connection)
979 }
980 Ok(Err(e)) => {
981 error!(
982 "Failed to create replacement connection for provider '{}': {}",
983 provider_name, e
984 );
985 Err(e)
986 }
987 Err(_timeout) => {
988 error!(
989 "Replacement connection creation timed out for provider '{}'",
990 provider_name
991 );
992 Err(anyhow::anyhow!(
993 "Replacement connection timeout for provider '{}'",
994 provider_name
995 ))
996 }
997 }
998 } else {
999 debug!(
1000 "Reusing existing healthy connection for provider '{}'",
1001 provider_name
1002 );
1003 Ok(existing_connection)
1004 }
1005 }
1006 }
1007
1008 async fn validate_connection(
1010 &self,
1011 provider_name: &str,
1012 connection: &RunningMcpService,
1013 ) -> Result<()> {
1014 debug!(
1015 "Validating connection health for provider '{}'",
1016 provider_name
1017 );
1018
1019 match tokio::time::timeout(
1022 tokio::time::Duration::from_secs(2),
1023 connection.list_tools(Default::default()),
1024 )
1025 .await
1026 {
1027 Ok(Ok(_)) => {
1028 debug!(
1029 "Connection health check passed for provider '{}'",
1030 provider_name
1031 );
1032 Ok(())
1033 }
1034 Ok(Err(e)) => {
1035 let error_msg = e.to_string();
1036 debug!(
1037 "Connection health check failed for provider '{}': {}",
1038 provider_name, error_msg
1039 );
1040 Err(anyhow::anyhow!(
1041 "Connection health check failed for provider '{}': {}",
1042 provider_name,
1043 error_msg
1044 ))
1045 }
1046 Err(_) => {
1047 debug!(
1048 "Connection health check timed out for provider '{}'",
1049 provider_name
1050 );
1051 Err(anyhow::anyhow!(
1052 "Connection health check timed out for provider '{}'",
1053 provider_name
1054 ))
1055 }
1056 }
1057 }
1058
1059 fn audit_log(
1060 &self,
1061 provider: Option<&str>,
1062 channel: &str,
1063 level: Level,
1064 message: impl AsRef<str>,
1065 ) {
1066 let logging_allowed = {
1067 let allowlist = self.allowlist.read();
1068 allowlist.is_logging_channel_allowed(provider, channel)
1069 };
1070
1071 if !logging_allowed {
1072 return;
1073 }
1074
1075 let msg = message.as_ref();
1076 match level {
1077 Level::ERROR => error!(target: "mcp", "[{}] {}", channel, msg),
1078 Level::WARN => warn!(target: "mcp", "[{}] {}", channel, msg),
1079 Level::INFO => info!(target: "mcp", "[{}] {}", channel, msg),
1080 Level::DEBUG => debug!(target: "mcp", "[{}] {}", channel, msg),
1081 _ => debug!(target: "mcp", "[{}] {}", channel, msg),
1082 }
1083 }
1084
1085 pub async fn shutdown(&self) -> Result<()> {
1087 info!("Shutting down MCP client and all provider connections");
1088
1089 let mut connections = self.active_connections.lock().await;
1090
1091 if connections.is_empty() {
1092 info!("No active MCP connections to shutdown");
1093 return Ok(());
1094 }
1095
1096 info!(
1097 "Shutting down {} MCP provider connections",
1098 connections.len()
1099 );
1100
1101 let cancellation_tokens: Vec<(String, rmcp::service::RunningServiceCancellationToken)> =
1102 connections
1103 .iter()
1104 .map(|(provider_name, connection)| {
1105 debug!(
1106 "Initiating graceful shutdown for MCP provider '{}'",
1107 provider_name
1108 );
1109 (provider_name.clone(), connection.cancellation_token())
1110 })
1111 .collect();
1112
1113 for (provider_name, token) in cancellation_tokens {
1114 debug!(
1115 "Cancelling MCP provider '{}' via cancellation token",
1116 provider_name
1117 );
1118 token.cancel();
1119 }
1120
1121 let shutdown_timeout = tokio::time::Duration::from_secs(5);
1123 let shutdown_start = std::time::Instant::now();
1124
1125 while shutdown_start.elapsed() < shutdown_timeout && !connections.is_empty() {
1127 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1128
1129 connections.retain(|_, connection| {
1131 Arc::strong_count(connection) > 1 });
1134 }
1135
1136 let remaining_count = connections.len();
1138 if remaining_count > 0 {
1139 warn!(
1140 "{} MCP provider connections did not shutdown gracefully within timeout, forcing shutdown",
1141 remaining_count
1142 );
1143 }
1144
1145 let drained_connections: Vec<_> = connections.drain().collect();
1147 drop(connections);
1148
1149 for (provider_name, connection) in drained_connections {
1150 debug!("Force shutting down MCP provider '{}'", provider_name);
1151
1152 if let Ok(connection) = Arc::try_unwrap(connection) {
1153 debug!(
1154 "Awaiting MCP provider '{}' task cancellation after graceful request",
1155 provider_name
1156 );
1157
1158 match connection.cancel().await {
1159 Ok(quit_reason) => {
1160 debug!(
1161 "MCP provider '{}' cancellation completed with reason: {:?}",
1162 provider_name, quit_reason
1163 );
1164 }
1165 Err(err) => {
1166 debug!(
1167 "MCP provider '{}' cancellation join error (non-critical): {}",
1168 provider_name, err
1169 );
1170 }
1171 }
1172 } else {
1173 debug!(
1174 "Additional references exist for MCP provider '{}'; dropping without awaiting",
1175 provider_name
1176 );
1177 }
1178 }
1179
1180 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
1182
1183 self.kill_remaining_mcp_processes().await;
1186
1187 info!("MCP client shutdown complete");
1188 Ok(())
1189 }
1190}
1191
1192#[derive(Debug, Clone)]
1194pub struct McpToolInfo {
1195 pub name: String,
1196 pub description: String,
1197 pub provider: String,
1198 pub input_schema: Value,
1199}
1200
1201pub struct McpProvider {
1203 config: McpProviderConfig,
1204 tools_cache: Arc<Mutex<Option<ListToolsResult>>>,
1205}
1206
1207impl McpProvider {
1208 pub async fn new(config: McpProviderConfig) -> Result<Self> {
1210 Ok(Self {
1211 config,
1212 tools_cache: Arc::new(Mutex::new(None)),
1213 })
1214 }
1215
1216 pub async fn list_tools(&self) -> Result<ListToolsResult> {
1218 let provider_name = &self.config.name;
1219 debug!("Listing tools for MCP provider '{}'", provider_name);
1220
1221 {
1223 let cache = self.tools_cache.lock().await;
1224 if let Some(cached) = cache.as_ref() {
1225 debug!("Using cached tools for provider '{}'", provider_name);
1226 return Ok(cached.clone());
1227 }
1228 }
1229
1230 debug!("Connecting to provider '{}' to fetch tools", provider_name);
1231
1232 match self.connect().await {
1234 Ok(connection) => {
1235 match connection.list_tools(Default::default()).await {
1236 Ok(tools) => {
1237 debug!(
1238 "Found {} tools for provider '{}'",
1239 tools.tools.len(),
1240 provider_name
1241 );
1242
1243 {
1245 let mut cache = self.tools_cache.lock().await;
1246 *cache = Some(tools.clone());
1247 }
1248
1249 Ok(tools)
1250 }
1251 Err(e) => {
1252 error!(
1253 "Failed to list tools for provider '{}': {}",
1254 provider_name, e
1255 );
1256 Err(anyhow::anyhow!("Failed to list tools: {}", e))
1257 }
1258 }
1259 }
1260 Err(e) => {
1261 error!("Failed to connect to provider '{}': {}", provider_name, e);
1262 Err(e)
1263 }
1264 }
1265 }
1266
1267 pub async fn has_tool(&self, tool_name: &str) -> Result<bool> {
1269 let provider_name = &self.config.name;
1270 debug!(
1271 "Checking if provider '{}' has tool '{}'",
1272 provider_name, tool_name
1273 );
1274
1275 match tokio::time::timeout(tokio::time::Duration::from_secs(10), self.list_tools()).await {
1276 Ok(Ok(tools)) => {
1277 let has_tool = tools.tools.iter().any(|tool| tool.name == tool_name);
1278 debug!(
1279 "Provider '{}' {} tool '{}'",
1280 provider_name,
1281 if has_tool { "has" } else { "does not have" },
1282 tool_name
1283 );
1284 Ok(has_tool)
1285 }
1286 Ok(Err(e)) => {
1287 let error_msg = e.to_string();
1288 if error_msg.contains("No such process")
1289 || error_msg.contains("ESRCH")
1290 || error_msg.contains("EPIPE")
1291 || error_msg.contains("Broken pipe")
1292 || error_msg.contains("write EPIPE")
1293 {
1294 debug!(
1295 "MCP provider '{}' process/pipe terminated during tool check (normal during shutdown): {}",
1296 provider_name, e
1297 );
1298 } else {
1299 warn!(
1300 "Failed to check tool availability for provider '{}': {}",
1301 provider_name, e
1302 );
1303 }
1304 Err(e)
1305 }
1306 Err(_timeout) => {
1307 warn!("MCP provider '{}' tool check timed out", provider_name);
1308 Err(anyhow::anyhow!("Tool availability check timeout"))
1309 }
1310 }
1311 }
1312
1313 async fn connect(&self) -> Result<RunningMcpService> {
1315 let provider_name = &self.config.name;
1316 info!("Connecting to MCP provider '{}'", provider_name);
1317
1318 match &self.config.transport {
1319 McpTransportConfig::Stdio(stdio_config) => {
1320 debug!("Using stdio transport for provider '{}'", provider_name);
1321 self.connect_stdio(stdio_config).await
1322 }
1323 McpTransportConfig::Http(http_config) => {
1324 debug!("Using HTTP transport for provider '{}'", provider_name);
1325 self.connect_http(http_config).await
1326 }
1327 }
1328 }
1329
1330 async fn connect_http(
1332 &self,
1333 config: &crate::config::mcp::McpHttpServerConfig,
1334 ) -> Result<RunningMcpService> {
1335 let provider_name = &self.config.name;
1336 debug!(
1337 "Setting up HTTP connection for provider '{}'",
1338 provider_name
1339 );
1340
1341 let mut headers = HeaderMap::new();
1343 headers.insert("Content-Type", "application/json".parse().unwrap());
1344
1345 if let Some(api_key_env) = &config.api_key_env {
1347 if let Ok(api_key) = std::env::var(api_key_env) {
1348 headers.insert(
1349 "Authorization",
1350 format!("Bearer {}", api_key).parse().unwrap(),
1351 );
1352 } else {
1353 warn!(
1354 "API key environment variable '{}' not found for provider '{}'",
1355 api_key_env, provider_name
1356 );
1357 }
1358 }
1359
1360 for (key, value) in &config.headers {
1362 if let (Ok(header_name), Ok(header_value)) =
1363 (key.parse::<HeaderName>(), value.parse::<HeaderValue>())
1364 {
1365 headers.insert(header_name, header_value);
1366 }
1367 }
1368
1369 let client = reqwest::Client::builder()
1370 .default_headers(headers)
1371 .timeout(std::time::Duration::from_secs(30))
1372 .build()
1373 .context("Failed to build HTTP client")?;
1374
1375 debug!(
1377 "Testing HTTP MCP server connectivity at '{}'",
1378 config.endpoint
1379 );
1380
1381 match client.get(&config.endpoint).send().await {
1382 Ok(response) => {
1383 let status = response.status();
1384 if status.is_success() {
1385 debug!(
1386 "HTTP MCP server at '{}' is reachable (status: {})",
1387 config.endpoint, status
1388 );
1389
1390 let mcp_endpoint = if config.endpoint.ends_with('/') {
1393 format!("{}mcp", config.endpoint)
1394 } else {
1395 format!("{}/mcp", config.endpoint)
1396 };
1397
1398 debug!("Attempting to connect to MCP endpoint: {}", mcp_endpoint);
1399
1400 match client.get(&mcp_endpoint).send().await {
1402 Ok(mcp_response) => {
1403 if mcp_response.status().is_success() {
1404 debug!(
1405 "MCP endpoint '{}' is available (status: {})",
1406 mcp_endpoint,
1407 mcp_response.status()
1408 );
1409
1410 Err(anyhow::anyhow!(
1413 "HTTP MCP server detected at '{}' but full streamable HTTP implementation is required. \
1414 MCP endpoint is available at '{}'. \
1415 Consider using stdio transport or implement HTTP streaming support with Server-Sent Events.",
1416 config.endpoint,
1417 mcp_endpoint
1418 ))
1419 } else {
1420 debug!(
1421 "MCP endpoint '{}' returned status: {}",
1422 mcp_endpoint,
1423 mcp_response.status()
1424 );
1425 Err(anyhow::anyhow!(
1426 "HTTP MCP server at '{}' does not support MCP protocol. \
1427 Expected MCP endpoint at '{}' but got status: {}. \
1428 Consider using stdio transport instead.",
1429 config.endpoint,
1430 mcp_endpoint,
1431 mcp_response.status()
1432 ))
1433 }
1434 }
1435 Err(e) => {
1436 let error_msg = e.to_string();
1437 debug!(
1438 "Failed to connect to MCP endpoint '{}': {}",
1439 mcp_endpoint, error_msg
1440 );
1441
1442 Err(anyhow::anyhow!(
1443 "HTTP MCP server at '{}' does not properly support MCP protocol. \
1444 Could not connect to MCP endpoint at '{}': {}. \
1445 Consider using stdio transport instead.",
1446 config.endpoint,
1447 mcp_endpoint,
1448 error_msg
1449 ))
1450 }
1451 }
1452 } else {
1453 Err(anyhow::anyhow!(
1454 "HTTP MCP server returned error status: {} at endpoint: {}",
1455 status,
1456 config.endpoint
1457 ))
1458 }
1459 }
1460 Err(e) => {
1461 let error_msg = e.to_string();
1462 if error_msg.contains("dns") || error_msg.contains("Name resolution") {
1463 Err(anyhow::anyhow!(
1464 "HTTP MCP server DNS resolution failed for '{}': {}",
1465 config.endpoint,
1466 e
1467 ))
1468 } else if error_msg.contains("Connection refused") || error_msg.contains("connect")
1469 {
1470 Err(anyhow::anyhow!(
1471 "HTTP MCP server connection failed for '{}': {}",
1472 config.endpoint,
1473 e
1474 ))
1475 } else {
1476 Err(anyhow::anyhow!(
1477 "HTTP MCP server error for '{}': {}",
1478 config.endpoint,
1479 e
1480 ))
1481 }
1482 }
1483 }
1484 }
1485
1486 async fn connect_stdio(
1488 &self,
1489 config: &crate::config::mcp::McpStdioServerConfig,
1490 ) -> Result<RunningMcpService> {
1491 let provider_name = &self.config.name;
1492 debug!(
1493 "Setting up stdio connection for provider '{}'",
1494 provider_name
1495 );
1496
1497 debug!("Command: {} with args: {:?}", config.command, config.args);
1498
1499 let mut command = Command::new(&config.command);
1500 command.args(&config.args);
1501
1502 if let Some(working_dir) = &config.working_directory {
1504 debug!("Using working directory: {}", working_dir);
1505 command.current_dir(working_dir);
1506 }
1507
1508 if !self.config.env.is_empty() {
1510 debug!(
1511 "Setting environment variables for provider '{}'",
1512 provider_name
1513 );
1514 command.envs(&self.config.env);
1515 }
1516
1517 command.process_group(0);
1519
1520 debug!(
1521 "Creating TokioChildProcess for provider '{}'",
1522 provider_name
1523 );
1524
1525 match TokioChildProcess::new(command) {
1526 Ok(child_process) => {
1527 debug!(
1528 "Successfully created child process for provider '{}'",
1529 provider_name
1530 );
1531
1532 let handler = LoggingClientHandler::new(provider_name);
1534
1535 match tokio::time::timeout(
1536 tokio::time::Duration::from_secs(30),
1537 handler.serve(child_process),
1538 )
1539 .await
1540 {
1541 Ok(Ok(connection)) => {
1542 info!(
1543 "Successfully established connection to MCP provider '{}'",
1544 provider_name
1545 );
1546 Ok(connection)
1547 }
1548 Ok(Err(e)) => {
1549 let error_msg = e.to_string();
1551 if error_msg.contains("No such process")
1552 || error_msg.contains("ESRCH")
1553 || error_msg.contains("EPIPE")
1554 || error_msg.contains("Broken pipe")
1555 || error_msg.contains("write EPIPE")
1556 {
1557 debug!(
1558 "MCP provider '{}' pipe/process error during connection (normal during shutdown): {}",
1559 provider_name, e
1560 );
1561 Err(anyhow::anyhow!("MCP provider connection terminated: {}", e))
1562 } else {
1563 error!(
1564 "Failed to establish MCP connection for provider '{}': {}",
1565 provider_name, e
1566 );
1567 Err(anyhow::anyhow!("Failed to serve MCP connection: {}", e))
1568 }
1569 }
1570 Err(_timeout) => {
1571 warn!(
1572 "MCP provider '{}' connection timed out after 30 seconds",
1573 provider_name
1574 );
1575 Err(anyhow::anyhow!("MCP provider connection timeout"))
1576 }
1577 }
1578 }
1579 Err(e) => {
1580 let error_msg = e.to_string();
1582 if error_msg.contains("No such process") || error_msg.contains("ESRCH") {
1583 error!(
1584 "Failed to create child process for provider '{}' - process may have terminated: {}",
1585 provider_name, e
1586 );
1587 } else {
1588 error!(
1589 "Failed to create child process for provider '{}': {}",
1590 provider_name, e
1591 );
1592 }
1593 Err(anyhow::anyhow!("Failed to create child process: {}", e))
1594 }
1595 }
1596 }
1597}
1598
1599type RunningMcpService =
1601 rmcp::service::RunningService<rmcp::service::RoleClient, LoggingClientHandler>;
1602
1603#[derive(Debug, Clone)]
1605pub struct McpClientStatus {
1606 pub enabled: bool,
1607 pub provider_count: usize,
1608 pub active_connections: usize,
1609 pub configured_providers: Vec<String>,
1610}
1611
1612impl McpClient {
1613 pub fn get_status(&self) -> McpClientStatus {
1615 McpClientStatus {
1616 enabled: self.config.enabled,
1617 provider_count: self.providers.len(),
1618 active_connections: self
1619 .active_connections
1620 .try_lock()
1621 .map(|connections| connections.len())
1622 .unwrap_or(0),
1623 configured_providers: self.providers.keys().cloned().collect(),
1624 }
1625 }
1626}
1627
1628#[async_trait]
1630pub trait McpToolExecutor: Send + Sync {
1631 async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value>;
1633
1634 async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>>;
1636
1637 async fn has_mcp_tool(&self, tool_name: &str) -> Result<bool>;
1639
1640 fn get_status(&self) -> McpClientStatus;
1642}
1643
1644#[async_trait]
1645impl McpToolExecutor for McpClient {
1646 async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
1647 self.execute_tool(tool_name, args).await
1648 }
1649
1650 async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>> {
1651 self.list_tools().await
1652 }
1653
1654 async fn has_mcp_tool(&self, tool_name: &str) -> Result<bool> {
1655 if self.providers.is_empty() {
1656 return Ok(false);
1657 }
1658
1659 let mut provider_errors = Vec::new();
1660
1661 for (provider_name, provider) in &self.providers {
1662 let provider_id = provider_name.as_str();
1663 match provider.has_tool(tool_name).await {
1664 Ok(true) => {
1665 if self
1666 .allowlist
1667 .read()
1668 .is_tool_allowed(provider_id, tool_name)
1669 {
1670 self.record_tool_provider(provider_id, tool_name);
1671 return Ok(true);
1672 }
1673
1674 self.audit_log(
1675 Some(provider_id),
1676 "mcp.tool_denied",
1677 Level::DEBUG,
1678 format!(
1679 "Tool '{}' exists on provider '{}' but is blocked by allow list",
1680 tool_name, provider_id
1681 ),
1682 );
1683 }
1684 Ok(false) => continue,
1685 Err(e) => {
1686 let error_msg = format!("Error checking provider '{}': {}", provider_name, e);
1687 warn!("{}", error_msg);
1688 provider_errors.push(error_msg);
1689 }
1690 }
1691 }
1692
1693 if !provider_errors.is_empty() {
1694 debug!(
1695 "Encountered {} errors while checking tool availability: {:?}",
1696 provider_errors.len(),
1697 provider_errors
1698 );
1699
1700 let summary = provider_errors.join("; ");
1701 return Err(anyhow::anyhow!(
1702 "Failed to confirm MCP tool '{}' availability. {}",
1703 tool_name,
1704 summary
1705 ));
1706 }
1707
1708 Ok(false)
1709 }
1710
1711 fn get_status(&self) -> McpClientStatus {
1712 self.get_status()
1713 }
1714}
1715
1716#[cfg(test)]
1717mod tests {
1718 use super::*;
1719 use crate::config::mcp::{McpStdioServerConfig, McpTransportConfig};
1720 use rmcp::model::{Content, Meta};
1721 use serde_json::json;
1722
1723 #[test]
1724 fn test_mcp_client_creation() {
1725 let config = McpClientConfig::default();
1726 let client = McpClient::new(config);
1727 assert!(!client.config.enabled);
1728 assert!(client.providers.is_empty());
1729 }
1730
1731 #[test]
1732 fn test_mcp_tool_info() {
1733 let tool_info = McpToolInfo {
1734 name: "test_tool".to_string(),
1735 description: "A test tool".to_string(),
1736 provider: "test_provider".to_string(),
1737 input_schema: json!({
1738 "type": "object",
1739 "properties": {
1740 "input": {"type": "string"}
1741 }
1742 }),
1743 };
1744
1745 assert_eq!(tool_info.name, "test_tool");
1746 assert_eq!(tool_info.provider, "test_provider");
1747 }
1748
1749 #[test]
1750 fn test_provider_config() {
1751 let config = McpProviderConfig {
1752 name: "test".to_string(),
1753 transport: McpTransportConfig::Stdio(McpStdioServerConfig {
1754 command: "echo".to_string(),
1755 args: vec!["hello".to_string()],
1756 working_directory: None,
1757 }),
1758 env: HashMap::new(),
1759 enabled: true,
1760 max_concurrent_requests: 3,
1761 };
1762
1763 assert_eq!(config.name, "test");
1764 assert!(config.enabled);
1765 assert_eq!(config.max_concurrent_requests, 3);
1766 }
1767
1768 #[test]
1769 fn test_tool_info_creation() {
1770 let tool_info = McpToolInfo {
1771 name: "test_tool".to_string(),
1772 description: "A test tool".to_string(),
1773 provider: "test_provider".to_string(),
1774 input_schema: serde_json::json!({
1775 "type": "object",
1776 "properties": {
1777 "input": {"type": "string"}
1778 }
1779 }),
1780 };
1781
1782 assert_eq!(tool_info.name, "test_tool");
1783 assert_eq!(tool_info.provider, "test_provider");
1784 }
1785
1786 #[test]
1787 fn test_format_tool_result_success() {
1788 let mut result = CallToolResult::structured(json!({
1789 "value": 42,
1790 "status": "ok"
1791 }));
1792 let mut meta = Meta::new();
1793 meta.insert("query".to_string(), Value::String("tokio".to_string()));
1794 result.meta = Some(meta);
1795
1796 let serialized = McpClient::format_tool_result("test", "demo", result).unwrap();
1797 assert_eq!(
1798 serialized.get("provider").and_then(Value::as_str),
1799 Some("test")
1800 );
1801 assert_eq!(serialized.get("tool").and_then(Value::as_str), Some("demo"));
1802 assert_eq!(serialized.get("status").and_then(Value::as_str), Some("ok"));
1803 assert_eq!(serialized.get("value").and_then(Value::as_i64), Some(42));
1804 assert_eq!(
1805 serialized
1806 .get("meta")
1807 .and_then(Value::as_object)
1808 .and_then(|map| map.get("query"))
1809 .and_then(Value::as_str),
1810 Some("tokio")
1811 );
1812 }
1813
1814 #[test]
1815 fn test_format_tool_result_error_detection() {
1816 let result = CallToolResult::structured_error(json!({
1817 "message": "something went wrong"
1818 }));
1819
1820 let error = McpClient::format_tool_result("test", "demo", result).unwrap_err();
1821 assert!(error.to_string().contains("something went wrong"));
1822 }
1823
1824 #[test]
1825 fn test_format_tool_result_error_from_text_content() {
1826 let result = CallToolResult::error(vec![Content::text("plain failure")]);
1827
1828 let error = McpClient::format_tool_result("test", "demo", result).unwrap_err();
1829 assert!(error.to_string().contains("plain failure"));
1830 }
1831}