vtcode_core/
mcp_client.rs

1//! MCP Client implementation
2//!
3//! This module provides a high-level abstraction over the rmcp library,
4//! managing MCP provider connections and tool execution.
5
6use crate::config::mcp::{
7    McpAllowListConfig, McpClientConfig, McpProviderConfig, McpTransportConfig,
8};
9use anyhow::{Context, Result};
10use async_trait::async_trait;
11use parking_lot::RwLock;
12use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
13use rmcp::{
14    ServiceExt,
15    handler::client::ClientHandler,
16    model::{
17        CallToolRequestParam, CallToolResult, ClientCapabilities, ClientInfo, Implementation,
18        ListToolsResult, LoggingLevel, LoggingMessageNotificationParam, RootsCapabilities,
19    },
20    transport::TokioChildProcess,
21};
22use serde_json::{Map, Value};
23use std::collections::HashMap;
24use std::future;
25use std::sync::Arc;
26use tokio::process::Command;
27use tokio::sync::Mutex;
28use tracing::{Level, debug, error, info, warn};
29
30#[derive(Clone)]
31struct LoggingClientHandler {
32    provider_name: String,
33    info: ClientInfo,
34}
35
36impl LoggingClientHandler {
37    fn new(provider_name: &str) -> Self {
38        let mut info = ClientInfo::default();
39        info.capabilities = ClientCapabilities {
40            roots: Some(RootsCapabilities {
41                list_changed: Some(true),
42            }),
43            ..ClientCapabilities::default()
44        };
45        info.client_info = Implementation {
46            name: "vtcode".to_string(),
47            title: Some("VT Code MCP client".to_string()),
48            version: env!("CARGO_PKG_VERSION").to_string(),
49            icons: None,
50            website_url: Some("https://github.com/modelcontextprotocol".to_string()),
51        };
52
53        Self {
54            provider_name: provider_name.to_string(),
55            info,
56        }
57    }
58
59    fn handle_logging(&self, params: LoggingMessageNotificationParam) {
60        let payload = params.data;
61        let summary = payload
62            .get("message")
63            .and_then(Value::as_str)
64            .map(str::to_owned)
65            .unwrap_or_else(|| payload.to_string());
66
67        match params.level {
68            LoggingLevel::Debug => debug!(
69                provider = self.provider_name.as_str(),
70                summary = %summary,
71                payload = ?payload,
72                "MCP provider log"
73            ),
74            LoggingLevel::Info | LoggingLevel::Notice => info!(
75                provider = self.provider_name.as_str(),
76                summary = %summary,
77                payload = ?payload,
78                "MCP provider log"
79            ),
80            LoggingLevel::Warning => warn!(
81                provider = self.provider_name.as_str(),
82                summary = %summary,
83                payload = ?payload,
84                "MCP provider warning"
85            ),
86            LoggingLevel::Error
87            | LoggingLevel::Critical
88            | LoggingLevel::Alert
89            | LoggingLevel::Emergency => error!(
90                provider = self.provider_name.as_str(),
91                summary = %summary,
92                payload = ?payload,
93                "MCP provider error"
94            ),
95        }
96    }
97}
98
99impl ClientHandler for LoggingClientHandler {
100    fn on_logging_message(
101        &self,
102        params: LoggingMessageNotificationParam,
103        _context: rmcp::service::NotificationContext<rmcp::service::RoleClient>,
104    ) -> impl std::future::Future<Output = ()> + Send + '_ {
105        self.handle_logging(params);
106        future::ready(())
107    }
108
109    fn get_info(&self) -> ClientInfo {
110        self.info.clone()
111    }
112}
113
114/// High-level MCP client that manages multiple providers
115pub struct McpClient {
116    config: McpClientConfig,
117    pub providers: HashMap<String, Arc<McpProvider>>,
118    active_connections: Arc<Mutex<HashMap<String, Arc<RunningMcpService>>>>,
119    allowlist: Arc<RwLock<McpAllowListConfig>>,
120    tool_provider_index: Arc<RwLock<HashMap<String, String>>>,
121}
122
123impl McpClient {
124    /// Create a new MCP client with the given configuration
125    pub fn new(config: McpClientConfig) -> Self {
126        let allowlist = Arc::new(RwLock::new(config.allowlist.clone()));
127        Self {
128            config,
129            providers: HashMap::new(),
130            active_connections: Arc::new(Mutex::new(HashMap::new())),
131            allowlist,
132            tool_provider_index: Arc::new(RwLock::new(HashMap::new())),
133        }
134    }
135
136    fn record_tool_provider(&self, provider: &str, tool: &str) {
137        debug!("Recording tool '{}' -> provider '{}'", tool, provider);
138        self.tool_provider_index
139            .write()
140            .insert(tool.to_string(), provider.to_string());
141    }
142
143    /// Retrieve provider reference for a known tool name
144    pub fn provider_for_tool(&self, tool_name: &str) -> Option<String> {
145        let index = self.tool_provider_index.read();
146        if let Some(provider) = index.get(tool_name) {
147            // Validate that the provider still exists and is enabled
148            if self.providers.contains_key(provider) {
149                debug!("Found tool '{}' in provider '{}'", tool_name, provider);
150                Some(provider.clone())
151            } else {
152                debug!(
153                    "Tool '{}' references non-existent provider '{}'",
154                    tool_name, provider
155                );
156                None
157            }
158        } else {
159            debug!("Tool '{}' not found in provider index", tool_name);
160            None
161        }
162    }
163
164    /// Replace the in-memory MCP allow list with the provided configuration
165    pub fn update_allowlist(&self, allowlist: McpAllowListConfig) {
166        *self.allowlist.write() = allowlist;
167    }
168
169    /// Get a clone of the current allow list
170    pub fn current_allowlist(&self) -> McpAllowListConfig {
171        self.allowlist.read().clone()
172    }
173
174    fn format_tool_result(
175        provider_name: &str,
176        tool_name: &str,
177        result: CallToolResult,
178    ) -> Result<Value> {
179        let is_error = result.is_error.unwrap_or(false);
180        let text_summary = result
181            .content
182            .iter()
183            .find_map(|content| content.as_text().map(|text| text.text.clone()));
184
185        if is_error {
186            let detail = result
187                .structured_content
188                .as_ref()
189                .and_then(|value| value.get("message").and_then(Value::as_str))
190                .map(str::to_owned)
191                .or_else(|| {
192                    result
193                        .structured_content
194                        .as_ref()
195                        .map(|value| value.to_string())
196                })
197                .or(text_summary)
198                .unwrap_or_else(|| "Unknown MCP tool error".to_string());
199
200            return Err(anyhow::anyhow!(
201                "MCP tool '{}' on provider '{}' reported an error: {}",
202                tool_name,
203                provider_name,
204                detail
205            ));
206        }
207
208        let mut payload = Map::new();
209        payload.insert("provider".into(), Value::String(provider_name.to_string()));
210        payload.insert("tool".into(), Value::String(tool_name.to_string()));
211
212        if let Some(meta) = result.meta {
213            if let Ok(meta_value) = serde_json::to_value(&meta) {
214                if !meta_value.is_null() {
215                    payload.insert("meta".into(), meta_value);
216                }
217            }
218        }
219
220        if let Some(structured) = result.structured_content {
221            match structured {
222                Value::Object(mut object) => {
223                    object
224                        .entry("provider")
225                        .or_insert_with(|| Value::String(provider_name.to_string()));
226                    object
227                        .entry("tool")
228                        .or_insert_with(|| Value::String(tool_name.to_string()));
229
230                    if let Some(meta_value) = payload.remove("meta") {
231                        object.entry("meta").or_insert(meta_value);
232                    }
233
234                    return Ok(Value::Object(object));
235                }
236                other => {
237                    payload.insert("structured_content".into(), other);
238                }
239            }
240        }
241
242        if let Some(summary) = text_summary {
243            payload.insert("message".into(), Value::String(summary));
244        }
245
246        if !result.content.is_empty() {
247            if let Ok(content_value) = serde_json::to_value(&result.content) {
248                payload.insert("content".into(), content_value);
249            }
250        }
251
252        Ok(Value::Object(payload))
253    }
254
255    /// Initialize the MCP client and connect to configured providers
256    pub async fn initialize(&mut self) -> Result<()> {
257        if !self.config.enabled {
258            info!("MCP client is disabled in configuration");
259            return Ok(());
260        }
261
262        info!(
263            "Initializing MCP client with {} configured providers",
264            self.config.providers.len()
265        );
266
267        for provider_config in &self.config.providers {
268            if provider_config.enabled {
269                info!("Initializing MCP provider '{}'", provider_config.name);
270
271                match McpProvider::new(provider_config.clone()).await {
272                    Ok(provider) => {
273                        let provider = Arc::new(provider);
274                        self.providers
275                            .insert(provider_config.name.clone(), provider);
276                        info!(
277                            "Successfully initialized MCP provider '{}'",
278                            provider_config.name
279                        );
280                        self.audit_log(
281                            Some(provider_config.name.as_str()),
282                            "mcp.provider_initialized",
283                            Level::INFO,
284                            format!("Provider '{}' initialized", provider_config.name),
285                        );
286                    }
287                    Err(e) => {
288                        error!(
289                            "Failed to initialize MCP provider '{}': {}",
290                            provider_config.name, e
291                        );
292                        self.audit_log(
293                            Some(provider_config.name.as_str()),
294                            "mcp.provider_initialization_failed",
295                            Level::WARN,
296                            format!(
297                                "Failed to initialize provider '{}' due to error: {}",
298                                provider_config.name, e
299                            ),
300                        );
301                        // Continue with other providers instead of failing completely
302                        continue;
303                    }
304                }
305            } else {
306                debug!(
307                    "MCP provider '{}' is disabled, skipping",
308                    provider_config.name
309                );
310            }
311        }
312
313        info!(
314            "MCP client initialization complete. Active providers: {}",
315            self.providers.len()
316        );
317
318        // Note: We don't call cleanup_dead_providers() here because no connections
319        // have been established yet during initialization. Cleanup will happen
320        // naturally when connections are first established and fail.
321
322        Ok(())
323    }
324
325    /// Kill any remaining MCP provider processes that may not have terminated properly
326    async fn kill_remaining_mcp_processes(&self) {
327        debug!("Checking for remaining MCP provider processes to clean up");
328
329        // Try to find and kill any remaining MCP provider processes
330        // This is a fallback for cases where the rmcp library doesn't properly terminate processes
331        let process_cleanup_attempts = tokio::time::timeout(
332            tokio::time::Duration::from_secs(5),
333            self.attempt_process_cleanup(),
334        )
335        .await;
336
337        match process_cleanup_attempts {
338            Ok(Ok(cleaned_count)) => {
339                if cleaned_count > 0 {
340                    info!(
341                        "Cleaned up {} remaining MCP provider processes",
342                        cleaned_count
343                    );
344                    self.audit_log(
345                        None,
346                        "mcp.process_cleanup",
347                        Level::INFO,
348                        format!(
349                            "Cleaned up {} remaining MCP provider processes",
350                            cleaned_count
351                        ),
352                    );
353                } else {
354                    debug!("No remaining MCP provider processes to clean up");
355                }
356            }
357            Ok(Err(e)) => {
358                warn!("Error during MCP process cleanup (non-critical): {}", e);
359                self.audit_log(
360                    None,
361                    "mcp.process_cleanup_error",
362                    Level::WARN,
363                    format!("Error during MCP process cleanup: {}", e),
364                );
365            }
366            Err(_) => {
367                warn!("MCP process cleanup timed out (non-critical)");
368                self.audit_log(
369                    None,
370                    "mcp.process_cleanup_timeout",
371                    Level::WARN,
372                    "MCP process cleanup timed out".to_string(),
373                );
374            }
375        }
376    }
377
378    /// Attempt to clean up MCP provider processes by finding and killing them
379    async fn attempt_process_cleanup(&self) -> Result<usize> {
380        use tokio::process::Command as TokioCommand;
381
382        let mut cleaned_count = 0;
383
384        // Get current process ID to avoid killing ourselves
385        let current_pid = std::process::id();
386
387        // Try to find MCP provider processes and kill them
388        // This is a best-effort cleanup for processes that may have escaped proper termination
389        for provider_config in &self.config.providers {
390            if !provider_config.enabled {
391                continue;
392            }
393
394            let provider_name = &provider_config.name;
395            debug!("Attempting cleanup for MCP provider '{}'", provider_name);
396
397            // Try multiple approaches to find and kill processes
398            let mut provider_cleaned = 0;
399
400            // Approach 1: Use pgrep with command pattern
401            if let Ok(output) = TokioCommand::new("pgrep")
402                .args(["-f", &format!("mcp-server-{}", provider_name)])
403                .output()
404                .await
405            {
406                if output.status.success() {
407                    let pids = String::from_utf8_lossy(&output.stdout);
408                    for pid_str in pids.lines() {
409                        if let Ok(pid) = pid_str.trim().parse::<u32>() {
410                            if pid != current_pid && pid > 0 {
411                                if self.kill_process_gracefully(pid).await {
412                                    provider_cleaned += 1;
413                                }
414                            }
415                        }
416                    }
417                }
418            }
419
420            // Approach 2: If pgrep failed, try ps with grep
421            if provider_cleaned == 0 {
422                if let Ok(output) = TokioCommand::new("ps").args(["aux"]).output().await {
423                    if output.status.success() {
424                        let processes = String::from_utf8_lossy(&output.stdout);
425                        for line in processes.lines() {
426                            // Look for lines containing the provider name and MCP-related terms
427                            if line.contains(provider_name)
428                                && (line.contains("mcp")
429                                    || line.contains("node")
430                                    || line.contains("python"))
431                            {
432                                // Extract PID from ps output (first column)
433                                let parts: Vec<&str> = line.split_whitespace().collect();
434                                if let Some(pid_str) = parts.first() {
435                                    if let Ok(pid) = pid_str.parse::<u32>() {
436                                        if pid != current_pid && pid > 0 {
437                                            if self.kill_process_gracefully(pid).await {
438                                                provider_cleaned += 1;
439                                            }
440                                        }
441                                    }
442                                }
443                            }
444                        }
445                    }
446                }
447            }
448
449            if provider_cleaned > 0 {
450                debug!(
451                    "Cleaned up {} processes for MCP provider '{}'",
452                    provider_cleaned, provider_name
453                );
454                cleaned_count += provider_cleaned;
455                // Clear the tool provider index when we kill processes
456                self.tool_provider_index.write().clear();
457            }
458        }
459
460        Ok(cleaned_count)
461    }
462
463    /// Kill a process gracefully with TERM first, then KILL if needed
464    async fn kill_process_gracefully(&self, pid: u32) -> bool {
465        debug!("Killing process {} gracefully", pid);
466
467        // Try graceful termination first
468        let _ = tokio::process::Command::new("kill")
469            .args(["-TERM", &pid.to_string()])
470            .output()
471            .await;
472
473        // Give it a moment to terminate gracefully
474        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
475
476        // Check if process is still running
477        if let Ok(output) = tokio::process::Command::new("kill")
478            .args(["-0", &pid.to_string()]) // Check if process exists
479            .output()
480            .await
481        {
482            if output.status.success() {
483                // Process still exists, force kill it
484                debug!("Process {} still running, force killing", pid);
485                let _ = tokio::process::Command::new("kill")
486                    .args(["-KILL", &pid.to_string()])
487                    .output()
488                    .await;
489                true
490            } else {
491                // Process already terminated
492                debug!("Process {} already terminated", pid);
493                true
494            }
495        } else {
496            // kill -0 command failed, assume process doesn't exist
497            debug!("Process {} check failed, assuming terminated", pid);
498            true
499        }
500    }
501
502    /// Clean up providers with terminated processes
503    pub async fn cleanup_dead_providers(&self) -> Result<()> {
504        let mut dead_providers = Vec::new();
505
506        for (provider_name, provider) in &self.providers {
507            // Try to check if provider is still alive by attempting a quick operation
508            let provider_health_check = tokio::time::timeout(
509                tokio::time::Duration::from_secs(2),
510                provider.has_tool("ping"),
511            )
512            .await;
513
514            match provider_health_check {
515                Ok(Ok(_)) => {
516                    // Provider is responsive
517                    debug!("MCP provider '{}' is healthy", provider_name);
518                }
519                Ok(Err(e)) => {
520                    let error_msg = e.to_string();
521                    if error_msg.contains("No such process") || error_msg.contains("ESRCH") {
522                        warn!(
523                            "MCP provider '{}' has terminated process, marking for cleanup",
524                            provider_name
525                        );
526                        dead_providers.push(provider_name.clone());
527                    } else {
528                        debug!(
529                            "MCP provider '{}' returned error but process may be alive: {}",
530                            provider_name, e
531                        );
532                    }
533                }
534                Err(_timeout) => {
535                    warn!(
536                        "MCP provider '{}' health check timed out, may be unresponsive",
537                        provider_name
538                    );
539                    // Don't mark as dead on timeout, might just be slow
540                }
541            }
542        }
543
544        // Note: In a real implementation, we'd want to remove dead providers from the providers map
545        // For now, we'll just log them
546        if !dead_providers.is_empty() {
547            warn!(
548                "Found {} dead MCP providers: {:?}",
549                dead_providers.len(),
550                dead_providers
551            );
552        }
553
554        Ok(())
555    }
556
557    /// List all available MCP tools across all providers
558    pub async fn list_tools(&self) -> Result<Vec<McpToolInfo>> {
559        if !self.config.enabled {
560            debug!("MCP client is disabled, returning empty tool list");
561            return Ok(Vec::new());
562        }
563
564        if self.providers.is_empty() {
565            debug!("No MCP providers configured, returning empty tool list");
566            return Ok(Vec::new());
567        }
568
569        let mut all_tools = Vec::new();
570        let mut errors = Vec::new();
571
572        let allowlist_snapshot = self.allowlist.read().clone();
573
574        for (provider_name, provider) in &self.providers {
575            let provider_id = provider_name.as_str();
576            match tokio::time::timeout(tokio::time::Duration::from_secs(15), provider.list_tools())
577                .await
578            {
579                Ok(Ok(tools)) => {
580                    debug!(
581                        "Provider '{}' has {} tools",
582                        provider_name,
583                        tools.tools.len()
584                    );
585
586                    for tool in tools.tools {
587                        let tool_name = tool.name.as_ref();
588
589                        if allowlist_snapshot.is_tool_allowed(provider_id, tool_name) {
590                            self.record_tool_provider(provider_id, tool_name);
591                            all_tools.push(McpToolInfo {
592                                name: tool_name.to_string(),
593                                description: tool.description.unwrap_or_default().to_string(),
594                                provider: provider_name.clone(),
595                                input_schema: serde_json::to_value(&*tool.input_schema)
596                                    .unwrap_or(Value::Null),
597                            });
598                        } else {
599                            self.audit_log(
600                                Some(provider_id),
601                                "mcp.tool_filtered",
602                                Level::DEBUG,
603                                format!(
604                                    "Filtered tool '{}' from provider '{}' due to allow list",
605                                    tool_name, provider_id
606                                ),
607                            );
608                        }
609                    }
610                }
611                Ok(Err(e)) => {
612                    let error_msg = e.to_string();
613                    if error_msg.contains("No such process")
614                        || error_msg.contains("ESRCH")
615                        || error_msg.contains("EPIPE")
616                        || error_msg.contains("Broken pipe")
617                        || error_msg.contains("write EPIPE")
618                    {
619                        debug!(
620                            "MCP provider '{}' process/pipe terminated during tool listing (normal during shutdown): {}",
621                            provider_name, e
622                        );
623                    } else {
624                        warn!(
625                            "Failed to list tools for provider '{}': {}",
626                            provider_name, e
627                        );
628                    }
629                    let error_msg = format!(
630                        "Failed to list tools for provider '{}': {}",
631                        provider_name, e
632                    );
633                    errors.push(error_msg);
634                }
635                Err(_timeout) => {
636                    warn!("MCP provider '{}' tool listing timed out", provider_name);
637                    let error_msg =
638                        format!("Tool listing timeout for provider '{}'", provider_name);
639                    errors.push(error_msg);
640                }
641            }
642        }
643
644        if !errors.is_empty() {
645            warn!(
646                "Encountered {} errors while listing MCP tools: {:?}",
647                errors.len(),
648                errors
649            );
650        }
651
652        info!(
653            "Found {} total MCP tools across all providers",
654            all_tools.len()
655        );
656        Ok(all_tools)
657    }
658
659    /// Execute a tool call on the appropriate MCP provider
660    pub async fn execute_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
661        if !self.config.enabled {
662            return Err(anyhow::anyhow!("MCP client is disabled"));
663        }
664
665        if self.providers.is_empty() {
666            return Err(anyhow::anyhow!("No MCP providers configured"));
667        }
668
669        let tool_name_owned = tool_name.to_string();
670        debug!("Executing MCP tool '{}' with args: {}", tool_name, args);
671
672        // Find the provider that has this tool
673        let provider_name = {
674            let mut found_provider = None;
675            let mut provider_errors = Vec::new();
676
677            for (name, provider) in &self.providers {
678                match provider.has_tool(&tool_name_owned).await {
679                    Ok(true) => {
680                        found_provider = Some(name.clone());
681                        break;
682                    }
683                    Ok(false) => continue,
684                    Err(e) => {
685                        let error_msg = format!(
686                            "Error checking tool availability for provider '{}': {}",
687                            name, e
688                        );
689                        warn!("{}", error_msg);
690                        provider_errors.push(error_msg);
691                    }
692                }
693            }
694
695            found_provider.ok_or_else(|| {
696                let error_msg = format!(
697                    "Tool '{}' not found in any MCP provider. Provider errors: {:?}",
698                    tool_name, provider_errors
699                );
700                anyhow::anyhow!(error_msg)
701            })?
702        };
703
704        debug!("Found tool '{}' in provider '{}'", tool_name, provider_name);
705
706        if !self
707            .allowlist
708            .read()
709            .is_tool_allowed(provider_name.as_str(), tool_name)
710        {
711            let message = format!(
712                "Tool '{}' from provider '{}' is not permitted by the MCP allow list",
713                tool_name, provider_name
714            );
715            self.audit_log(
716                Some(provider_name.as_str()),
717                "mcp.tool_denied",
718                Level::WARN,
719                message.as_str(),
720            );
721            return Err(anyhow::anyhow!(message));
722        }
723
724        self.record_tool_provider(provider_name.as_str(), tool_name);
725
726        let provider = self.providers.get(&provider_name).ok_or_else(|| {
727            anyhow::anyhow!("Provider '{}' not found after discovery", provider_name)
728        })?;
729
730        // Get or create connection for this provider
731        let connection = match self.get_or_create_connection(provider).await {
732            Ok(conn) => conn,
733            Err(e) => {
734                error!(
735                    "Failed to establish connection to provider '{}': {}",
736                    provider_name, e
737                );
738                return Err(e);
739            }
740        };
741
742        // Execute the tool call
743        match connection
744            .call_tool(CallToolRequestParam {
745                name: tool_name_owned.into(),
746                arguments: args.as_object().cloned(),
747            })
748            .await
749        {
750            Ok(result) => match Self::format_tool_result(provider_name.as_str(), tool_name, result)
751            {
752                Ok(serialized) => {
753                    info!(
754                        "Successfully executed MCP tool '{}' via provider '{}'",
755                        tool_name, provider_name
756                    );
757                    self.audit_log(
758                        Some(provider_name.as_str()),
759                        "mcp.tool_execution",
760                        Level::INFO,
761                        format!(
762                            "Successfully executed MCP tool '{}' via provider '{}'",
763                            tool_name, provider_name
764                        ),
765                    );
766                    Ok(serialized)
767                }
768                Err(err) => {
769                    let err_message = err.to_string();
770                    warn!(
771                        "MCP tool '{}' via provider '{}' returned an error payload: {}",
772                        tool_name, provider_name, err_message
773                    );
774                    self.audit_log(
775                        Some(provider_name.as_str()),
776                        "mcp.tool_failed",
777                        Level::WARN,
778                        format!(
779                            "MCP tool '{}' via provider '{}' returned an error payload: {}",
780                            tool_name, provider_name, err_message
781                        ),
782                    );
783                    Err(err)
784                }
785            },
786            Err(e) => {
787                let error_message = e.to_string();
788
789                error!(
790                    "MCP tool '{}' failed on provider '{}': {}",
791                    tool_name, provider_name, error_message
792                );
793                self.audit_log(
794                    Some(provider_name.as_str()),
795                    "mcp.tool_failed",
796                    Level::WARN,
797                    format!(
798                        "MCP tool '{}' failed on provider '{}': {}",
799                        tool_name, provider_name, error_message
800                    ),
801                );
802
803                // Handle different types of connection errors
804                if error_message.contains("EPIPE")
805                    || error_message.contains("Broken pipe")
806                    || error_message.contains("write EPIPE")
807                    || error_message.contains("No such process")
808                    || error_message.contains("ESRCH")
809                {
810                    // Drop the stale connection so a fresh process can be created next time
811                    let mut connections = self.active_connections.lock().await;
812                    connections.remove(&provider_name);
813                    // Remove cached tool-provider mapping so it is refreshed on reconnect
814                    self.tool_provider_index
815                        .write()
816                        .retain(|_, provider| provider != &provider_name);
817
818                    return Err(anyhow::anyhow!(
819                        "MCP provider '{}' disconnected unexpectedly while executing '{}'. The provider process may have terminated. Try re-running the command to restart the provider.",
820                        provider_name,
821                        tool_name
822                    ));
823                } else if error_message.contains("timeout") || error_message.contains("Timeout") {
824                    // Drop the stale connection on timeout
825                    let mut connections = self.active_connections.lock().await;
826                    connections.remove(&provider_name);
827
828                    return Err(anyhow::anyhow!(
829                        "MCP tool '{}' execution timed out on provider '{}'. The provider may be unresponsive. Try re-running the command.",
830                        tool_name,
831                        provider_name
832                    ));
833                } else if error_message.contains("permission")
834                    || error_message.contains("Permission denied")
835                {
836                    return Err(anyhow::anyhow!(
837                        "Permission denied executing MCP tool '{}' on provider '{}': {}",
838                        tool_name,
839                        provider_name,
840                        error_message
841                    ));
842                } else if error_message.contains("network")
843                    || error_message.contains("Connection refused")
844                {
845                    return Err(anyhow::anyhow!(
846                        "Network error executing MCP tool '{}' on provider '{}': {}",
847                        tool_name,
848                        provider_name,
849                        error_message
850                    ));
851                }
852
853                Err(anyhow::anyhow!(
854                    "MCP tool execution failed: {}",
855                    error_message
856                ))
857            }
858        }
859    }
860
861    /// Get or create a connection to the specified provider
862    async fn get_or_create_connection(
863        &self,
864        provider: &McpProvider,
865    ) -> Result<Arc<RunningMcpService>> {
866        let provider_name = &provider.config.name;
867        debug!("Getting connection for MCP provider '{}'", provider_name);
868
869        let mut connections = self.active_connections.lock().await;
870
871        if !connections.contains_key(provider_name) {
872            debug!("Creating new connection for provider '{}'", provider_name);
873
874            // Add timeout for connection creation
875            match tokio::time::timeout(tokio::time::Duration::from_secs(30), provider.connect())
876                .await
877            {
878                Ok(Ok(connection)) => {
879                    let connection = Arc::new(connection);
880                    connections.insert(provider_name.clone(), Arc::clone(&connection));
881                    debug!(
882                        "Successfully created connection for provider '{}'",
883                        provider_name
884                    );
885                    Ok(connection)
886                }
887                Ok(Err(e)) => {
888                    let error_msg = e.to_string();
889                    if error_msg.contains("HTTP MCP server support") {
890                        warn!(
891                            "Provider '{}' uses HTTP transport which is not fully implemented: {}",
892                            provider_name, e
893                        );
894                        Err(anyhow::anyhow!(
895                            "HTTP MCP transport not fully implemented for provider '{}'. Consider using stdio transport instead.",
896                            provider_name
897                        ))
898                    } else if error_msg.contains("command not found")
899                        || error_msg.contains("No such file")
900                    {
901                        error!("Command not found for provider '{}': {}", provider_name, e);
902                        Err(anyhow::anyhow!(
903                            "Command '{}' not found for MCP provider '{}'. Please ensure the MCP server is installed and accessible.",
904                            self.config
905                                .providers
906                                .iter()
907                                .find(|p| p.name == *provider_name)
908                                .map(|p| match &p.transport {
909                                    McpTransportConfig::Stdio(stdio) => stdio.command.as_str(),
910                                    _ => "unknown",
911                                })
912                                .unwrap_or("unknown"),
913                            provider_name
914                        ))
915                    } else if error_msg.contains("permission")
916                        || error_msg.contains("Permission denied")
917                    {
918                        error!(
919                            "Permission denied creating connection for provider '{}': {}",
920                            provider_name, e
921                        );
922                        Err(anyhow::anyhow!(
923                            "Permission denied executing MCP server for provider '{}': {}",
924                            provider_name,
925                            error_msg
926                        ))
927                    } else {
928                        error!(
929                            "Failed to create connection for provider '{}': {}",
930                            provider_name, e
931                        );
932                        Err(anyhow::anyhow!(
933                            "Failed to create connection for MCP provider '{}': {}",
934                            provider_name,
935                            error_msg
936                        ))
937                    }
938                }
939                Err(_timeout) => {
940                    error!(
941                        "Connection creation timed out for provider '{}' after {} seconds",
942                        provider_name, 30
943                    );
944                    Err(anyhow::anyhow!(
945                        "Connection creation timed out for MCP provider '{}' after {} seconds. The provider may be slow to start or unresponsive.",
946                        provider_name,
947                        30
948                    ))
949                }
950            }
951        } else {
952            // Validate existing connection is still healthy
953            let existing_connection = connections.get(provider_name).unwrap().clone();
954
955            // Quick health check - try to use the connection
956            if let Err(e) = self
957                .validate_connection(provider_name, &existing_connection)
958                .await
959            {
960                debug!(
961                    "Existing connection for provider '{}' is unhealthy, creating new one: {}",
962                    provider_name, e
963                );
964
965                // Remove the unhealthy connection
966                connections.remove(provider_name);
967
968                // Create new connection
969                match tokio::time::timeout(tokio::time::Duration::from_secs(30), provider.connect())
970                    .await
971                {
972                    Ok(Ok(connection)) => {
973                        let connection = Arc::new(connection);
974                        connections.insert(provider_name.clone(), Arc::clone(&connection));
975                        debug!(
976                            "Successfully created new connection for provider '{}'",
977                            provider_name
978                        );
979                        Ok(connection)
980                    }
981                    Ok(Err(e)) => {
982                        error!(
983                            "Failed to create replacement connection for provider '{}': {}",
984                            provider_name, e
985                        );
986                        Err(e)
987                    }
988                    Err(_timeout) => {
989                        error!(
990                            "Replacement connection creation timed out for provider '{}'",
991                            provider_name
992                        );
993                        Err(anyhow::anyhow!(
994                            "Replacement connection timeout for provider '{}'",
995                            provider_name
996                        ))
997                    }
998                }
999            } else {
1000                debug!(
1001                    "Reusing existing healthy connection for provider '{}'",
1002                    provider_name
1003                );
1004                Ok(existing_connection)
1005            }
1006        }
1007    }
1008
1009    /// Validate that an existing connection is still healthy
1010    async fn validate_connection(
1011        &self,
1012        provider_name: &str,
1013        connection: &RunningMcpService,
1014    ) -> Result<()> {
1015        debug!(
1016            "Validating connection health for provider '{}'",
1017            provider_name
1018        );
1019
1020        // Try to ping the connection with a simple tool check
1021        // Use a very short timeout to avoid blocking
1022        match tokio::time::timeout(
1023            tokio::time::Duration::from_secs(2),
1024            connection.list_tools(Default::default()),
1025        )
1026        .await
1027        {
1028            Ok(Ok(_)) => {
1029                debug!(
1030                    "Connection health check passed for provider '{}'",
1031                    provider_name
1032                );
1033                Ok(())
1034            }
1035            Ok(Err(e)) => {
1036                let error_msg = e.to_string();
1037                debug!(
1038                    "Connection health check failed for provider '{}': {}",
1039                    provider_name, error_msg
1040                );
1041                Err(anyhow::anyhow!(
1042                    "Connection health check failed for provider '{}': {}",
1043                    provider_name,
1044                    error_msg
1045                ))
1046            }
1047            Err(_) => {
1048                debug!(
1049                    "Connection health check timed out for provider '{}'",
1050                    provider_name
1051                );
1052                Err(anyhow::anyhow!(
1053                    "Connection health check timed out for provider '{}'",
1054                    provider_name
1055                ))
1056            }
1057        }
1058    }
1059
1060    fn audit_log(
1061        &self,
1062        provider: Option<&str>,
1063        channel: &str,
1064        level: Level,
1065        message: impl AsRef<str>,
1066    ) {
1067        let logging_allowed = {
1068            let allowlist = self.allowlist.read();
1069            allowlist.is_logging_channel_allowed(provider, channel)
1070        };
1071
1072        if !logging_allowed {
1073            return;
1074        }
1075
1076        let msg = message.as_ref();
1077        match level {
1078            Level::ERROR => error!(target: "mcp", "[{}] {}", channel, msg),
1079            Level::WARN => warn!(target: "mcp", "[{}] {}", channel, msg),
1080            Level::INFO => info!(target: "mcp", "[{}] {}", channel, msg),
1081            Level::DEBUG => debug!(target: "mcp", "[{}] {}", channel, msg),
1082            _ => debug!(target: "mcp", "[{}] {}", channel, msg),
1083        }
1084    }
1085
1086    /// Shutdown all MCP connections
1087    pub async fn shutdown(&self) -> Result<()> {
1088        info!("Shutting down MCP client and all provider connections");
1089
1090        let mut connections = self.active_connections.lock().await;
1091
1092        if connections.is_empty() {
1093            info!("No active MCP connections to shutdown");
1094            return Ok(());
1095        }
1096
1097        info!(
1098            "Shutting down {} MCP provider connections",
1099            connections.len()
1100        );
1101
1102        let cancellation_tokens: Vec<(String, rmcp::service::RunningServiceCancellationToken)> =
1103            connections
1104                .iter()
1105                .map(|(provider_name, connection)| {
1106                    debug!(
1107                        "Initiating graceful shutdown for MCP provider '{}'",
1108                        provider_name
1109                    );
1110                    (provider_name.clone(), connection.cancellation_token())
1111                })
1112                .collect();
1113
1114        for (provider_name, token) in cancellation_tokens {
1115            debug!(
1116                "Cancelling MCP provider '{}' via cancellation token",
1117                provider_name
1118            );
1119            token.cancel();
1120        }
1121
1122        // Give connections a grace period to shutdown cleanly
1123        let shutdown_timeout = tokio::time::Duration::from_secs(5);
1124        let shutdown_start = std::time::Instant::now();
1125
1126        // Wait for graceful shutdown or timeout
1127        while shutdown_start.elapsed() < shutdown_timeout && !connections.is_empty() {
1128            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1129
1130            // Remove any connections that have been dropped
1131            connections.retain(|_, connection| {
1132                // Check if the connection is still valid
1133                Arc::strong_count(connection) > 1 // At least our reference and possibly others
1134            });
1135        }
1136
1137        // Force shutdown any remaining connections
1138        let remaining_count = connections.len();
1139        if remaining_count > 0 {
1140            warn!(
1141                "{} MCP provider connections did not shutdown gracefully within timeout, forcing shutdown",
1142                remaining_count
1143            );
1144        }
1145
1146        // Clear all connections (this will drop them and should terminate processes)
1147        let drained_connections: Vec<_> = connections.drain().collect();
1148        drop(connections);
1149
1150        for (provider_name, connection) in drained_connections {
1151            debug!("Force shutting down MCP provider '{}'", provider_name);
1152
1153            if let Ok(connection) = Arc::try_unwrap(connection) {
1154                debug!(
1155                    "Awaiting MCP provider '{}' task cancellation after graceful request",
1156                    provider_name
1157                );
1158
1159                match connection.cancel().await {
1160                    Ok(quit_reason) => {
1161                        debug!(
1162                            "MCP provider '{}' cancellation completed with reason: {:?}",
1163                            provider_name, quit_reason
1164                        );
1165                    }
1166                    Err(err) => {
1167                        debug!(
1168                            "MCP provider '{}' cancellation join error (non-critical): {}",
1169                            provider_name, err
1170                        );
1171                    }
1172                }
1173            } else {
1174                debug!(
1175                    "Additional references exist for MCP provider '{}'; dropping without awaiting",
1176                    provider_name
1177                );
1178            }
1179        }
1180
1181        // Give processes time to terminate gracefully
1182        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
1183
1184        // Additional cleanup: try to kill any remaining MCP provider processes
1185        // This handles cases where the rmcp library doesn't properly terminate processes
1186        self.kill_remaining_mcp_processes().await;
1187
1188        info!("MCP client shutdown complete");
1189        Ok(())
1190    }
1191}
1192
1193/// Information about an MCP tool
1194#[derive(Debug, Clone)]
1195pub struct McpToolInfo {
1196    pub name: String,
1197    pub description: String,
1198    pub provider: String,
1199    pub input_schema: Value,
1200}
1201
1202/// Individual MCP provider wrapper
1203pub struct McpProvider {
1204    config: McpProviderConfig,
1205    tools_cache: Arc<Mutex<Option<ListToolsResult>>>,
1206}
1207
1208impl McpProvider {
1209    /// Create a new MCP provider
1210    pub async fn new(config: McpProviderConfig) -> Result<Self> {
1211        Ok(Self {
1212            config,
1213            tools_cache: Arc::new(Mutex::new(None)),
1214        })
1215    }
1216
1217    /// List tools available from this provider
1218    pub async fn list_tools(&self) -> Result<ListToolsResult> {
1219        let provider_name = &self.config.name;
1220        debug!("Listing tools for MCP provider '{}'", provider_name);
1221
1222        // Check cache first
1223        {
1224            let cache = self.tools_cache.lock().await;
1225            if let Some(cached) = cache.as_ref() {
1226                debug!("Using cached tools for provider '{}'", provider_name);
1227                return Ok(cached.clone());
1228            }
1229        }
1230
1231        debug!("Connecting to provider '{}' to fetch tools", provider_name);
1232
1233        // Connect and get tools
1234        match self.connect().await {
1235            Ok(connection) => {
1236                match connection.list_tools(Default::default()).await {
1237                    Ok(tools) => {
1238                        debug!(
1239                            "Found {} tools for provider '{}'",
1240                            tools.tools.len(),
1241                            provider_name
1242                        );
1243
1244                        // Cache the result
1245                        {
1246                            let mut cache = self.tools_cache.lock().await;
1247                            *cache = Some(tools.clone());
1248                        }
1249
1250                        Ok(tools)
1251                    }
1252                    Err(e) => {
1253                        error!(
1254                            "Failed to list tools for provider '{}': {}",
1255                            provider_name, e
1256                        );
1257                        Err(anyhow::anyhow!("Failed to list tools: {}", e))
1258                    }
1259                }
1260            }
1261            Err(e) => {
1262                error!("Failed to connect to provider '{}': {}", provider_name, e);
1263                Err(e)
1264            }
1265        }
1266    }
1267
1268    /// Check if this provider has a specific tool
1269    pub async fn has_tool(&self, tool_name: &str) -> Result<bool> {
1270        let provider_name = &self.config.name;
1271        debug!(
1272            "Checking if provider '{}' has tool '{}'",
1273            provider_name, tool_name
1274        );
1275
1276        match tokio::time::timeout(tokio::time::Duration::from_secs(10), self.list_tools()).await {
1277            Ok(Ok(tools)) => {
1278                let has_tool = tools.tools.iter().any(|tool| tool.name == tool_name);
1279                debug!(
1280                    "Provider '{}' {} tool '{}'",
1281                    provider_name,
1282                    if has_tool { "has" } else { "does not have" },
1283                    tool_name
1284                );
1285                Ok(has_tool)
1286            }
1287            Ok(Err(e)) => {
1288                let error_msg = e.to_string();
1289                if error_msg.contains("No such process")
1290                    || error_msg.contains("ESRCH")
1291                    || error_msg.contains("EPIPE")
1292                    || error_msg.contains("Broken pipe")
1293                    || error_msg.contains("write EPIPE")
1294                {
1295                    debug!(
1296                        "MCP provider '{}' process/pipe terminated during tool check (normal during shutdown): {}",
1297                        provider_name, e
1298                    );
1299                } else {
1300                    warn!(
1301                        "Failed to check tool availability for provider '{}': {}",
1302                        provider_name, e
1303                    );
1304                }
1305                Err(e)
1306            }
1307            Err(_timeout) => {
1308                warn!("MCP provider '{}' tool check timed out", provider_name);
1309                Err(anyhow::anyhow!("Tool availability check timeout"))
1310            }
1311        }
1312    }
1313
1314    /// Connect to this MCP provider
1315    async fn connect(&self) -> Result<RunningMcpService> {
1316        let provider_name = &self.config.name;
1317        info!("Connecting to MCP provider '{}'", provider_name);
1318
1319        match &self.config.transport {
1320            McpTransportConfig::Stdio(stdio_config) => {
1321                debug!("Using stdio transport for provider '{}'", provider_name);
1322                self.connect_stdio(stdio_config).await
1323            }
1324            McpTransportConfig::Http(http_config) => {
1325                debug!("Using HTTP transport for provider '{}'", provider_name);
1326                self.connect_http(http_config).await
1327            }
1328        }
1329    }
1330
1331    /// Connect using HTTP transport
1332    async fn connect_http(
1333        &self,
1334        config: &crate::config::mcp::McpHttpServerConfig,
1335    ) -> Result<RunningMcpService> {
1336        let provider_name = &self.config.name;
1337        debug!(
1338            "Setting up HTTP connection for provider '{}'",
1339            provider_name
1340        );
1341
1342        // Build the HTTP client with proper headers
1343        let mut headers = HeaderMap::new();
1344        headers.insert("Content-Type", "application/json".parse().unwrap());
1345
1346        // Add API key if provided
1347        if let Some(api_key_env) = &config.api_key_env {
1348            if let Ok(api_key) = std::env::var(api_key_env) {
1349                headers.insert(
1350                    "Authorization",
1351                    format!("Bearer {}", api_key).parse().unwrap(),
1352                );
1353            } else {
1354                warn!(
1355                    "API key environment variable '{}' not found for provider '{}'",
1356                    api_key_env, provider_name
1357                );
1358            }
1359        }
1360
1361        // Add custom headers
1362        for (key, value) in &config.headers {
1363            if let (Ok(header_name), Ok(header_value)) =
1364                (key.parse::<HeaderName>(), value.parse::<HeaderValue>())
1365            {
1366                headers.insert(header_name, header_value);
1367            }
1368        }
1369
1370        let client = reqwest::Client::builder()
1371            .default_headers(headers)
1372            .timeout(std::time::Duration::from_secs(30))
1373            .build()
1374            .context("Failed to build HTTP client")?;
1375
1376        // Test basic connectivity first
1377        debug!(
1378            "Testing HTTP MCP server connectivity at '{}'",
1379            config.endpoint
1380        );
1381
1382        match client.get(&config.endpoint).send().await {
1383            Ok(response) => {
1384                let status = response.status();
1385                if status.is_success() {
1386                    debug!(
1387                        "HTTP MCP server at '{}' is reachable (status: {})",
1388                        config.endpoint, status
1389                    );
1390
1391                    // Check if the server supports MCP by looking for the MCP endpoint
1392                    // According to MCP spec, servers should expose tools at /mcp endpoint
1393                    let mcp_endpoint = if config.endpoint.ends_with('/') {
1394                        format!("{}mcp", config.endpoint)
1395                    } else {
1396                        format!("{}/mcp", config.endpoint)
1397                    };
1398
1399                    debug!("Attempting to connect to MCP endpoint: {}", mcp_endpoint);
1400
1401                    // Try to connect to the MCP endpoint
1402                    match client.get(&mcp_endpoint).send().await {
1403                        Ok(mcp_response) => {
1404                            if mcp_response.status().is_success() {
1405                                debug!(
1406                                    "MCP endpoint '{}' is available (status: {})",
1407                                    mcp_endpoint,
1408                                    mcp_response.status()
1409                                );
1410
1411                                // For now, return an error indicating this needs full streamable HTTP implementation
1412                                // A complete implementation would use Server-Sent Events (SSE) for streaming MCP
1413                                Err(anyhow::anyhow!(
1414                                    "HTTP MCP server detected at '{}' but full streamable HTTP implementation is required. \
1415                                     MCP endpoint is available at '{}'. \
1416                                     Consider using stdio transport or implement HTTP streaming support with Server-Sent Events.",
1417                                    config.endpoint,
1418                                    mcp_endpoint
1419                                ))
1420                            } else {
1421                                debug!(
1422                                    "MCP endpoint '{}' returned status: {}",
1423                                    mcp_endpoint,
1424                                    mcp_response.status()
1425                                );
1426                                Err(anyhow::anyhow!(
1427                                    "HTTP MCP server at '{}' does not support MCP protocol. \
1428                                     Expected MCP endpoint at '{}' but got status: {}. \
1429                                     Consider using stdio transport instead.",
1430                                    config.endpoint,
1431                                    mcp_endpoint,
1432                                    mcp_response.status()
1433                                ))
1434                            }
1435                        }
1436                        Err(e) => {
1437                            let error_msg = e.to_string();
1438                            debug!(
1439                                "Failed to connect to MCP endpoint '{}': {}",
1440                                mcp_endpoint, error_msg
1441                            );
1442
1443                            Err(anyhow::anyhow!(
1444                                "HTTP MCP server at '{}' does not properly support MCP protocol. \
1445                                 Could not connect to MCP endpoint at '{}': {}. \
1446                                 Consider using stdio transport instead.",
1447                                config.endpoint,
1448                                mcp_endpoint,
1449                                error_msg
1450                            ))
1451                        }
1452                    }
1453                } else {
1454                    Err(anyhow::anyhow!(
1455                        "HTTP MCP server returned error status: {} at endpoint: {}",
1456                        status,
1457                        config.endpoint
1458                    ))
1459                }
1460            }
1461            Err(e) => {
1462                let error_msg = e.to_string();
1463                if error_msg.contains("dns") || error_msg.contains("Name resolution") {
1464                    Err(anyhow::anyhow!(
1465                        "HTTP MCP server DNS resolution failed for '{}': {}",
1466                        config.endpoint,
1467                        e
1468                    ))
1469                } else if error_msg.contains("Connection refused") || error_msg.contains("connect")
1470                {
1471                    Err(anyhow::anyhow!(
1472                        "HTTP MCP server connection failed for '{}': {}",
1473                        config.endpoint,
1474                        e
1475                    ))
1476                } else {
1477                    Err(anyhow::anyhow!(
1478                        "HTTP MCP server error for '{}': {}",
1479                        config.endpoint,
1480                        e
1481                    ))
1482                }
1483            }
1484        }
1485    }
1486
1487    /// Connect using stdio transport
1488    async fn connect_stdio(
1489        &self,
1490        config: &crate::config::mcp::McpStdioServerConfig,
1491    ) -> Result<RunningMcpService> {
1492        let provider_name = &self.config.name;
1493        debug!(
1494            "Setting up stdio connection for provider '{}'",
1495            provider_name
1496        );
1497
1498        debug!("Command: {} with args: {:?}", config.command, config.args);
1499
1500        let mut command = Command::new(&config.command);
1501        command.args(&config.args);
1502
1503        // Set working directory if specified
1504        if let Some(working_dir) = &config.working_directory {
1505            debug!("Using working directory: {}", working_dir);
1506            command.current_dir(working_dir);
1507        }
1508
1509        // Set environment variables if specified
1510        if !self.config.env.is_empty() {
1511            debug!(
1512                "Setting environment variables for provider '{}'",
1513                provider_name
1514            );
1515            command.envs(&self.config.env);
1516        }
1517
1518        // Create new process group to ensure proper cleanup (Unix only)
1519        #[cfg(unix)]
1520        {
1521            #[allow(unused_imports)]
1522            use std::os::unix::process::CommandExt;
1523            command.process_group(0);
1524        }
1525
1526        debug!(
1527            "Creating TokioChildProcess for provider '{}'",
1528            provider_name
1529        );
1530
1531        match TokioChildProcess::new(command) {
1532            Ok(child_process) => {
1533                debug!(
1534                    "Successfully created child process for provider '{}'",
1535                    provider_name
1536                );
1537
1538                // Add timeout and better error handling for the MCP service
1539                let handler = LoggingClientHandler::new(provider_name);
1540
1541                match tokio::time::timeout(
1542                    tokio::time::Duration::from_secs(30),
1543                    handler.serve(child_process),
1544                )
1545                .await
1546                {
1547                    Ok(Ok(connection)) => {
1548                        info!(
1549                            "Successfully established connection to MCP provider '{}'",
1550                            provider_name
1551                        );
1552                        Ok(connection)
1553                    }
1554                    Ok(Err(e)) => {
1555                        // Check if this is a process-related error
1556                        let error_msg = e.to_string();
1557                        if error_msg.contains("No such process")
1558                            || error_msg.contains("ESRCH")
1559                            || error_msg.contains("EPIPE")
1560                            || error_msg.contains("Broken pipe")
1561                            || error_msg.contains("write EPIPE")
1562                        {
1563                            debug!(
1564                                "MCP provider '{}' pipe/process error during connection (normal during shutdown): {}",
1565                                provider_name, e
1566                            );
1567                            Err(anyhow::anyhow!("MCP provider connection terminated: {}", e))
1568                        } else {
1569                            error!(
1570                                "Failed to establish MCP connection for provider '{}': {}",
1571                                provider_name, e
1572                            );
1573                            Err(anyhow::anyhow!("Failed to serve MCP connection: {}", e))
1574                        }
1575                    }
1576                    Err(_timeout) => {
1577                        warn!(
1578                            "MCP provider '{}' connection timed out after 30 seconds",
1579                            provider_name
1580                        );
1581                        Err(anyhow::anyhow!("MCP provider connection timeout"))
1582                    }
1583                }
1584            }
1585            Err(e) => {
1586                // Check if this is a process creation error
1587                let error_msg = e.to_string();
1588                if error_msg.contains("No such process") || error_msg.contains("ESRCH") {
1589                    error!(
1590                        "Failed to create child process for provider '{}' - process may have terminated: {}",
1591                        provider_name, e
1592                    );
1593                } else {
1594                    error!(
1595                        "Failed to create child process for provider '{}': {}",
1596                        provider_name, e
1597                    );
1598                }
1599                Err(anyhow::anyhow!("Failed to create child process: {}", e))
1600            }
1601        }
1602    }
1603}
1604
1605/// Type alias for running MCP service
1606type RunningMcpService =
1607    rmcp::service::RunningService<rmcp::service::RoleClient, LoggingClientHandler>;
1608
/// Snapshot of the MCP client's configuration and connection state.
///
/// Plain data, so it derives `PartialEq`/`Eq` and callers can compare snapshots.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct McpClientStatus {
    /// Whether MCP support is enabled in the client configuration.
    pub enabled: bool,
    /// Number of providers registered with the client.
    pub provider_count: usize,
    /// Number of currently tracked active connections.
    pub active_connections: usize,
    /// Names of the registered providers.
    pub configured_providers: Vec<String>,
}
1617
1618impl McpClient {
1619    /// Get MCP client status information
1620    pub fn get_status(&self) -> McpClientStatus {
1621        McpClientStatus {
1622            enabled: self.config.enabled,
1623            provider_count: self.providers.len(),
1624            active_connections: self
1625                .active_connections
1626                .try_lock()
1627                .map(|connections| connections.len())
1628                .unwrap_or(0),
1629            configured_providers: self.providers.keys().cloned().collect(),
1630        }
1631    }
1632}
1633
1634/// Trait for MCP tool execution
1635#[async_trait]
1636pub trait McpToolExecutor: Send + Sync {
1637    /// Execute an MCP tool
1638    async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value>;
1639
1640    /// List available MCP tools
1641    async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>>;
1642
1643    /// Check if an MCP tool exists
1644    async fn has_mcp_tool(&self, tool_name: &str) -> Result<bool>;
1645
1646    /// Get MCP client status information
1647    fn get_status(&self) -> McpClientStatus;
1648}
1649
1650#[async_trait]
1651impl McpToolExecutor for McpClient {
1652    async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
1653        self.execute_tool(tool_name, args).await
1654    }
1655
1656    async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>> {
1657        self.list_tools().await
1658    }
1659
1660    async fn has_mcp_tool(&self, tool_name: &str) -> Result<bool> {
1661        if self.providers.is_empty() {
1662            return Ok(false);
1663        }
1664
1665        let mut provider_errors = Vec::new();
1666
1667        for (provider_name, provider) in &self.providers {
1668            let provider_id = provider_name.as_str();
1669            match provider.has_tool(tool_name).await {
1670                Ok(true) => {
1671                    if self
1672                        .allowlist
1673                        .read()
1674                        .is_tool_allowed(provider_id, tool_name)
1675                    {
1676                        self.record_tool_provider(provider_id, tool_name);
1677                        return Ok(true);
1678                    }
1679
1680                    self.audit_log(
1681                        Some(provider_id),
1682                        "mcp.tool_denied",
1683                        Level::DEBUG,
1684                        format!(
1685                            "Tool '{}' exists on provider '{}' but is blocked by allow list",
1686                            tool_name, provider_id
1687                        ),
1688                    );
1689                }
1690                Ok(false) => continue,
1691                Err(e) => {
1692                    let error_msg = format!("Error checking provider '{}': {}", provider_name, e);
1693                    warn!("{}", error_msg);
1694                    provider_errors.push(error_msg);
1695                }
1696            }
1697        }
1698
1699        if !provider_errors.is_empty() {
1700            debug!(
1701                "Encountered {} errors while checking tool availability: {:?}",
1702                provider_errors.len(),
1703                provider_errors
1704            );
1705
1706            let summary = provider_errors.join("; ");
1707            return Err(anyhow::anyhow!(
1708                "Failed to confirm MCP tool '{}' availability. {}",
1709                tool_name,
1710                summary
1711            ));
1712        }
1713
1714        Ok(false)
1715    }
1716
1717    fn get_status(&self) -> McpClientStatus {
1718        self.get_status()
1719    }
1720}
1721
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::mcp::{McpStdioServerConfig, McpTransportConfig};
    use rmcp::model::{Content, Meta};
    use serde_json::json;

    /// Shared `McpToolInfo` fixture for the tool-info tests.
    fn sample_tool_info() -> McpToolInfo {
        McpToolInfo {
            name: "test_tool".to_string(),
            description: "A test tool".to_string(),
            provider: "test_provider".to_string(),
            input_schema: json!({
                "type": "object",
                "properties": {
                    "input": {"type": "string"}
                }
            }),
        }
    }

    #[test]
    fn test_mcp_client_creation() {
        // A default configuration yields a disabled client with no providers.
        let client = McpClient::new(McpClientConfig::default());
        assert!(!client.config.enabled);
        assert!(client.providers.is_empty());
    }

    #[test]
    fn test_mcp_tool_info() {
        let info = sample_tool_info();
        assert_eq!(info.name, "test_tool");
        assert_eq!(info.provider, "test_provider");
    }

    #[test]
    fn test_provider_config() {
        let stdio = McpStdioServerConfig {
            command: "echo".to_string(),
            args: vec!["hello".to_string()],
            working_directory: None,
        };
        let config = McpProviderConfig {
            name: "test".to_string(),
            transport: McpTransportConfig::Stdio(stdio),
            env: HashMap::new(),
            enabled: true,
            max_concurrent_requests: 3,
        };

        assert_eq!(config.name, "test");
        assert!(config.enabled);
        assert_eq!(config.max_concurrent_requests, 3);
    }

    #[test]
    fn test_tool_info_creation() {
        let McpToolInfo { name, provider, .. } = sample_tool_info();
        assert_eq!(name, "test_tool");
        assert_eq!(provider, "test_provider");
    }

    #[test]
    fn test_format_tool_result_success() {
        let mut meta = Meta::new();
        meta.insert("query".to_string(), Value::String("tokio".to_string()));

        let mut result = CallToolResult::structured(json!({
            "value": 42,
            "status": "ok"
        }));
        result.meta = Some(meta);

        let formatted = McpClient::format_tool_result("test", "demo", result).unwrap();
        let text = |key: &str| formatted.get(key).and_then(Value::as_str);

        assert_eq!(text("provider"), Some("test"));
        assert_eq!(text("tool"), Some("demo"));
        assert_eq!(text("status"), Some("ok"));
        assert_eq!(formatted.get("value").and_then(Value::as_i64), Some(42));

        let query = formatted
            .get("meta")
            .and_then(Value::as_object)
            .and_then(|entries| entries.get("query"))
            .and_then(Value::as_str);
        assert_eq!(query, Some("tokio"));
    }

    #[test]
    fn test_format_tool_result_error_detection() {
        let failing = CallToolResult::structured_error(json!({
            "message": "something went wrong"
        }));

        let err = McpClient::format_tool_result("test", "demo", failing).unwrap_err();
        assert!(err.to_string().contains("something went wrong"));
    }

    #[test]
    fn test_format_tool_result_error_from_text_content() {
        let failing = CallToolResult::error(vec![Content::text("plain failure")]);

        let err = McpClient::format_tool_result("test", "demo", failing).unwrap_err();
        assert!(err.to_string().contains("plain failure"));
    }
}